In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import monotonically_increasing_id, col, row_number, lit, count, desc, regexp_replace, sum, avg, year, month, asc
from pyspark.sql.types import *
from pyspark.sql.window import Window


KeyboardInterrupt: 

In [None]:
# Create (or reuse) the notebook's Spark session. The PostgreSQL and
# ClickHouse JDBC driver jars are handed to Spark through `spark.jars`.
spark = (
    SparkSession.builder
    .appName("Spark SQL with PostgreSQL and ClickHouse")
    .config("spark.jars", "postgresql-42.6.0.jar,clickhouse-jdbc-0.4.6.jar")
    .getOrCreate()
)

In [None]:

# Explicit schema describing one row of the flat source data.
# `id` is the only non-nullable field; every other field keeps StructField's
# default nullability.
# NOTE(review): no later cell visible in this notebook references `schema`.
schema = StructType(
    [StructField("id", IntegerType(), nullable=False)]
    + [
        StructField(field_name, field_type)
        for field_name, field_type in (
            # Customer fields
            ("customer_first_name", StringType()),
            ("customer_last_name", StringType()),
            ("customer_age", IntegerType()),
            ("customer_email", StringType()),
            ("customer_country", StringType()),
            ("customer_postal_code", StringType()),
            ("customer_pet_type", StringType()),
            ("customer_pet_name", StringType()),
            ("customer_pet_breed", StringType()),
            # Seller fields
            ("seller_first_name", StringType()),
            ("seller_last_name", StringType()),
            ("seller_email", StringType()),
            ("seller_country", StringType()),
            ("seller_postal_code", StringType()),
            # Product fields
            ("product_name", StringType()),
            ("product_category", StringType()),
            # Decimal price; the original note says it maps to PostgreSQL money.
            ("product_price", DecimalType(10, 2)),
            ("product_quantity", IntegerType()),
            # Sale fields
            ("sale_date", DateType()),
            ("sale_customer_id", IntegerType()),
            ("sale_seller_id", IntegerType()),
            ("sale_product_id", IntegerType()),
            ("sale_quantity", IntegerType()),
            # Decimal total; noted as PostgreSQL money in the original.
            ("sale_total_price", DecimalType(10, 2)),
            # Store fields
            ("store_name", StringType()),
            ("store_location", StringType()),
            ("store_city", StringType()),
            ("store_state", StringType()),
            ("store_country", StringType()),
            ("store_phone", StringType()),
            ("store_email", StringType()),
            # Additional product details
            ("pet_category", StringType()),
            ("product_weight", FloatType()),
            ("product_color", StringType()),
            ("product_size", StringType()),
            ("product_brand", StringType()),
            ("product_material", StringType()),
            ("product_description", StringType()),
            ("product_rating", FloatType()),
            ("product_reviews", IntegerType()),
            ("product_release_date", DateType()),
            ("product_expiry_date", DateType()),
            # Supplier fields
            ("supplier_name", StringType()),
            ("supplier_contact", StringType()),
            ("supplier_email", StringType()),
            ("supplier_phone", StringType()),
            ("supplier_address", StringType()),
            ("supplier_city", StringType()),
            ("supplier_country", StringType()),
        )
    ]
)

In [None]:
# PostgreSQL connection settings used by every JDBC read/write below.
# The URL and credentials can be overridden through environment variables so
# real secrets never have to be committed with the notebook; the defaults are
# the values this notebook has always used.
jdbc_url = os.environ.get("SPARK_PG_URL", "jdbc:postgresql://postgres:5432/spark_db")
properties = {
    "user": os.environ.get("SPARK_PG_USER", "spark_user"),
    "password": os.environ.get("SPARK_PG_PASSWORD", "spark_password"),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Load the `mock_data` table from PostgreSQL over JDBC into a DataFrame.
data = spark.read.jdbc(url=jdbc_url, table="mock_data", properties=properties)

In [None]:
# Display the schema Spark derived for the JDBC source table.
data.schema


In [None]:
# Customer dimension: one row per distinct combination of the customer
# attributes present in `data`.
customer = data.select(
    'customer_pet_type',
    'customer_pet_name',
    'customer_pet_breed',
    'customer_country',
    'customer_postal_code',
    'customer_first_name',
    'customer_last_name',
    'customer_age',
    'customer_email'
).distinct()

# row_number() is only reproducible over a total ordering. customer_first_name
# alone need not be unique, so ordering by it alone lets Spark hand out
# different ids each time this lazy DataFrame is evaluated (fact join vs. JDBC
# write). The rows are distinct across all selected columns, so ordering by
# all of them (first name first, as before) gives every row one stable id.
window = Window.orderBy(
    'customer_first_name',
    *[name for name in customer.columns if name != 'customer_first_name']
)
customer = customer.withColumn("id", row_number().over(window))
customer.show()

In [None]:
# Seller dimension: one row per distinct combination of seller attributes.
seller = data.select(
    'seller_first_name',
    'seller_last_name',
    'seller_country',
    'seller_postal_code',
    'seller_email'
).distinct()

# Order by every column (first name first) so that row_number() sees a total
# ordering; ordering by the non-unique first name alone would let the ids
# change between evaluations of this lazy DataFrame.
window = Window.orderBy(
    'seller_first_name',
    *[name for name in seller.columns if name != 'seller_first_name']
)
seller = seller.withColumn("id", row_number().over(window))
seller.show()

In [None]:
# Store dimension: one row per distinct combination of store attributes.
store = data.select(
    'store_location',
    'store_city',
    'store_state',
    'store_country',
    'store_phone',
    'store_email',
    'store_name',
).distinct()

# Order by every column (store name first) so that row_number() sees a total
# ordering; stores sharing a name would otherwise get ids that can change
# between evaluations of this lazy DataFrame.
window = Window.orderBy(
    'store_name',
    *[name for name in store.columns if name != 'store_name']
)
store = store.withColumn("id", row_number().over(window))
store.show()

In [None]:
# Product dimension: one row per distinct combination of product attributes.
product = data.select(
    'product_weight',
    'product_color',
    'product_size',
    'product_material',
    'product_description',
    'product_rating',
    'product_reviews',
    'product_brand',
    'product_name',
    'product_category',
    'product_price',
    'product_quantity',
    'product_release_date',
    'product_expiry_date',
).distinct()

# Order by every column (product name first) so that row_number() sees a
# total ordering; products sharing a name would otherwise get ids that can
# change between evaluations of this lazy DataFrame.
window = Window.orderBy(
    'product_name',
    *[name for name in product.columns if name != 'product_name']
)
product = product.withColumn("id", row_number().over(window))
product.show()

In [None]:
# Sale dimension. The total price is cleaned (dollar sign stripped, cast to
# double) BEFORE de-duplicating, so rows that only become identical after the
# cleaning collapse into one row instead of receiving two different ids.
sale = data.select(
    'sale_quantity',
    'sale_total_price',
    'sale_date',
    'sale_customer_id',
    'sale_seller_id',
    'sale_product_id',
).withColumn(
    "sale_total_price",
    regexp_replace("sale_total_price", "\\$", "").cast("double")
).distinct()

# Order by every column (sale date first) so that row_number() sees a total
# ordering; many sales can share a date, and ordering by the date alone would
# let the ids change between evaluations of this lazy DataFrame.
window = Window.orderBy(
    'sale_date',
    *[name for name in sale.columns if name != 'sale_date']
)
sale = sale.withColumn("id", row_number().over(window))
sale.show()

In [None]:
# Supplier dimension: one row per distinct combination of supplier attributes.
supplier = data.select(
    'supplier_name',
    'supplier_contact',
    'supplier_email',
    'supplier_phone',
    'supplier_address',
    'supplier_city',
    'supplier_country'
).distinct()

# Order by every column (supplier name first) so that row_number() sees a
# total ordering; suppliers sharing a name would otherwise get ids that can
# change between evaluations of this lazy DataFrame.
window = Window.orderBy(
    'supplier_name',
    *[name for name in supplier.columns if name != 'supplier_name']
)
supplier = supplier.withColumn("id", row_number().over(window))
supplier.show()

In [None]:
def _dimension_join_keys(dimension, overrides=None):
    """Build the join condition between `data` and one dimension table.

    Every column of `dimension` except its surrogate `id` is compared with the
    same-named column of `data`. Deriving the list from the dimension itself
    guarantees that no attribute is forgotten or listed twice (the hand-written
    product join compared `product_reviews` twice and never compared
    `product_price`).

    The comparison is null-safe: `distinct()` keeps dimension rows that contain
    NULLs, but a plain `==` never matches NULL against NULL, which would leave
    those source rows without a dimension id after the left join.

    Args:
        dimension: dimension DataFrame with attribute columns plus `id`.
        overrides: optional dict mapping a column name to the expression to use
            on the `data` side instead of `data[name]`.

    Returns:
        A list of Column predicates suitable for `DataFrame.join(on=...)`.
    """
    overrides = overrides or {}
    return [
        overrides.get(name, data[name]).eqNullSafe(dimension[name])
        for name in dimension.columns
        if name != "id"
    ]


# `sale.sale_total_price` was cleaned (dollar sign stripped, cast to double)
# when the sale dimension was built; apply the same cleaning to the raw value
# so both sides of the comparison are in the same representation.
raw_sale_total = regexp_replace(data.sale_total_price, "\\$", "").cast("double")

# Fact table: the source row id plus the surrogate id of each dimension row
# that the source row matches. Left joins keep every source row.
fact = (
    data
    .join(customer, on=_dimension_join_keys(customer), how='left')
    .join(seller, on=_dimension_join_keys(seller), how='left')
    .join(store, on=_dimension_join_keys(store), how='left')
    .join(product, on=_dimension_join_keys(product), how='left')
    .join(
        sale,
        on=_dimension_join_keys(sale, {"sale_total_price": raw_sale_total}),
        how='left'
    )
    .join(supplier, on=_dimension_join_keys(supplier), how='left')
    .select(
        data['id'],
        customer['id'].alias('customer_id'),
        seller['id'].alias('seller_id'),
        store['id'].alias('store_id'),
        product['id'].alias('product_id'),
        sale['id'].alias('sale_id'),
        supplier['id'].alias('supplier_id')
    )
)
fact.show()


In [None]:
# Number of rows in the fact table.
fact.count()

In [None]:
# Persist every dimension and the fact table to PostgreSQL.
# mode="overwrite" replaces a table left behind by an earlier run; with the
# default save mode the cell raises as soon as the first table already exists,
# so the notebook could not be re-run from the top.
for table_name, frame in [
    ("customer", customer),
    ("seller", seller),
    ("store", store),
    ("product", product),
    ("sale", sale),
    ("supplier", supplier),
    ("fact", fact),
]:
    frame.write.jdbc(url=jdbc_url, table=table_name, mode="overwrite", properties=properties)

In [None]:
# Print the schema of the sale dimension.
print(sale.schema)

In [None]:
# Top 10 products by units sold, with revenue, summed reviews and mean rating.
product_report = (
    fact.join(product, on=[fact.product_id == product.id])
    .join(sale, on=[fact.sale_id == sale.id])
    .groupBy("product_id")
    .agg(
        # Units sold is the plain sum of sale quantities. The previous
        # count(...) * sum(...) multiplied that total by the number of sale
        # rows and inflated it for every product sold more than once.
        sum(col('sale_quantity')).alias("quantity sold"),
        sum(col("sale_total_price")),
        sum(col('product_reviews')),
        avg(col('product_rating'))
    )
    .orderBy(desc('quantity sold'))
    .limit(10)
)
product_report.show()

In [None]:
# Ten customers with the highest total spend, with their country and the
# average amount per sale (total spend divided by the number of sale ids).
customer_report = (
    fact
    .join(customer, on=fact.customer_id == customer.id, how="inner")
    .join(sale, on=fact.sale_id == sale.id, how="inner")
    .groupBy("customer_id", "customer_country")
    .agg(
        sum(col("sale_total_price")).alias("total_spent"),
        (sum(col("sale_total_price")) / count(col("sale_id"))).alias("average_check"),
    )
    .orderBy(col("total_spent").desc())
    .limit(10)
)
customer_report.show()

In [None]:
# Revenue, number of sales and average sale amount per calendar month,
# listed chronologically.
time_report = (
    fact
    .join(sale, on=fact.sale_id == sale.id, how="inner")
    .withColumn("year", year(col("sale_date")))
    .withColumn("month", month(col("sale_date")))
    .groupBy("year", "month")
    .agg(
        sum(col("sale_total_price")).alias("monthly_revenue"),
        count(col("sale_id")).alias("number_of_sales"),
        avg(col("sale_total_price")).alias("average_order_size"),
    )
    .orderBy(col("year").asc(), col("month").asc())
)
time_report.show()

In [None]:
# Five stores with the highest revenue, with their sale count and average
# sale amount.
store_report = (
    fact
    .join(store, on=fact.store_id == store.id, how="inner")
    .join(sale, on=fact.sale_id == sale.id, how="inner")
    .groupBy("store_id", "store_name", "store_city", "store_country")
    .agg(
        sum(col("sale_total_price")).alias("total_revenue"),
        count(col("sale_id")).alias("number_of_sales"),
        avg(col("sale_total_price")).alias("average_check"),
    )
    .orderBy(col("total_revenue").desc())
    .limit(5)
)
store_report.show()

In [None]:
# Five suppliers with the highest revenue, with the mean price of the products
# on their sale rows and their sale count.
supplier_report = (
    fact
    .join(supplier, on=fact.supplier_id == supplier.id, how="inner")
    .join(product, on=fact.product_id == product.id, how="inner")
    .join(sale, on=fact.sale_id == sale.id, how="inner")
    .groupBy("supplier_id", "supplier_name", "supplier_country")
    .agg(
        sum(col("sale_total_price")).alias("total_revenue"),
        avg(col("product_price")).alias("average_product_price"),
        count(col("sale_id")).alias("number_of_sales"),
    )
    .orderBy(col("total_revenue").desc())
    .limit(5)
)

supplier_report.show()

In [None]:
# Per-product quality figures: mean rating, summed reviews, summed units sold
# and the ratio of the two sums, ordered from best to worst mean rating.
quality_report = (
    fact
    .join(product, on=fact.product_id == product.id, how="inner")
    .join(sale, on=fact.sale_id == sale.id, how="inner")
    .groupBy("product_id", "product_name", "product_category")
    .agg(
        avg(col("product_rating")).alias("average_rating"),
        sum(col("product_reviews")).alias("total_reviews"),
        sum(col("sale_quantity")).alias("total_quantity_sold"),
        (sum(col("product_reviews")) / sum(col("sale_quantity"))).alias("reviews_per_sale"),
    )
    .orderBy(col("average_rating").desc())
)
quality_report.show()

In [None]:
# ClickHouse connection settings. The URL and credentials can be overridden
# through environment variables so real secrets never have to be committed
# with the notebook; the defaults are the values this notebook has always used.
# NOTE(review): this rebinds `properties`, which held the PostgreSQL settings
# until now; re-running an earlier PostgreSQL cell after this one would pick
# up the ClickHouse settings.
ch_jdbc_url = os.environ.get("SPARK_CH_URL", "jdbc:clickhouse://clickhouse:8123/default")
properties = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": os.environ.get("SPARK_CH_USER", "custom_user"),
    "password": os.environ.get("SPARK_CH_PASSWORD", "custom_password"),
}


In [None]:
# Stop the SparkSession.
# NOTE(review): the cells after this one still build DataFrames and pass them
# to write_to_clickhouse; confirm that stopping the session here is intended.
spark.stop()

In [None]:

def write_to_clickhouse(
    df,
    table_name,
    url="jdbc:clickhouse://clickhouse:8123/default",
    user=None,
    password=None,
    mode="overwrite",
    create_table_options=None,
):
    """Write a DataFrame to a ClickHouse table through Spark's JDBC writer.

    Called with only `df` and `table_name`, this behaves exactly as before:
    fixed URL, ClickHouse JDBC driver, no credentials, overwrite mode. The
    keyword arguments allow a different server, authentication and
    table-creation clauses without editing the function.

    Args:
        df: DataFrame to persist.
        table_name: name of the target table (JDBC `dbtable` option).
        url: JDBC URL of the ClickHouse server.
        user: optional user name; the option is omitted when None.
        password: optional password; the option is omitted when None.
        mode: Spark save mode passed to the writer.
        create_table_options: optional text for the JDBC `createTableOptions`
            option (for example an ENGINE clause); omitted when None.

    Returns:
        None.
    """
    writer = (
        df.write
        .format("jdbc")
        .option("url", url)
        .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
        .option("dbtable", table_name)
    )
    # Only send the optional settings that were actually supplied, so the
    # default call issues the same options as before.
    optional_settings = (
        ("user", user),
        ("password", password),
        ("createTableOptions", create_table_options),
    )
    for option_name, option_value in optional_settings:
        if option_value is not None:
            writer = writer.option(option_name, option_value)
    writer.mode(mode).save()


In [None]:
# Sum sales_amount per product, keep the ten largest totals and store them in
# the `top_10_products` table.
# NOTE(review): `fact_sales` is not defined in any cell visible in this
# notebook; confirm where it comes from.
top_10_products = (
    fact_sales
    .groupBy('product_id')
    .agg(F.sum('sales_amount').alias('total_sales'))
    .orderBy(F.col('total_sales').desc())
    .limit(10)
)
write_to_clickhouse(top_10_products, 'top_10_products')

In [None]:
# Total sales_amount per product category, stored in `revenue_by_category`.
# NOTE(review): `dim_product` is not defined in any cell visible in this
# notebook; confirm where it comes from.
revenue_by_category = (
    fact_sales
    .join(dim_product, on='product_id')
    .groupBy('category')
    .agg(F.sum('sales_amount').alias('total_revenue'))
)
write_to_clickhouse(revenue_by_category, 'revenue_by_category')

In [None]:
# Mean rating and number of review ids per product, stored in
# `product_reviews`.
product_reviews = (
    dim_product
    .groupBy('product_id')
    .agg(
        F.avg('rating').alias('avg_rating'),
        F.count('review_id').alias('review_count'),
    )
)
write_to_clickhouse(product_reviews, 'product_reviews')

In [None]:
# Sum sales_amount per customer, keep the ten largest totals and store them in
# the `top_10_customers` table.
top_10_customers = (
    fact_sales
    .groupBy('customer_id')
    .agg(F.sum('sales_amount').alias('total_spent'))
    .orderBy(F.col('total_spent').desc())
    .limit(10)
)
write_to_clickhouse(top_10_customers, 'top_10_customers')

In [None]:
# Number of customer ids per country, stored in
# `customer_country_distribution`.
# NOTE(review): `dim_customer` is not defined in any cell visible in this
# notebook; confirm where it comes from.
customer_country_distribution = (
    dim_customer
    .groupBy('country')
    .agg(F.count('customer_id').alias('customer_count'))
)
write_to_clickhouse(customer_country_distribution, 'customer_country_distribution')

In [None]:
# Mean sales_amount per customer, stored in `average_receipt_per_customer`.
avg_receipt = (
    fact_sales
    .groupBy('customer_id')
    .agg(F.avg('sales_amount').alias('avg_receipt'))
)
write_to_clickhouse(avg_receipt, 'average_receipt_per_customer')

In [None]:
# Total sales_amount per (year, month), stored in `monthly_yearly_sales`.
# NOTE(review): `dim_time` is not defined in any cell visible in this
# notebook; confirm where it comes from.
monthly_yearly_sales = (
    fact_sales
    .join(dim_time, on='time_id')
    .groupBy('year', 'month')
    .agg(F.sum('sales_amount').alias('monthly_sales'))
)
write_to_clickhouse(monthly_yearly_sales, 'monthly_yearly_sales')

In [None]:
# Total sales_amount per year, stored in `revenue_comparison`.
revenue_comparison = (
    fact_sales
    .join(dim_time, on='time_id')
    .groupBy('year')
    .agg(F.sum('sales_amount').alias('yearly_revenue'))
)
write_to_clickhouse(revenue_comparison, 'revenue_comparison')

In [None]:
# Mean sales_amount per month value, stored in `monthly_avg_order_size`.
monthly_avg_order = (
    fact_sales
    .join(dim_time, on='time_id')
    .groupBy('month')
    .agg(F.avg('sales_amount').alias('avg_order'))
)
write_to_clickhouse(monthly_avg_order, 'monthly_avg_order_size')

In [None]:
# Sum sales_amount per store, keep the five largest totals and store them in
# the `top_5_stores` table.
top_5_stores = (
    fact_sales
    .groupBy('store_id')
    .agg(F.sum('sales_amount').alias('store_revenue'))
    .orderBy(F.col('store_revenue').desc())
    .limit(5)
)
write_to_clickhouse(top_5_stores, 'top_5_stores')

In [None]:
# Total sales_amount per (city, country) of the store, stored in
# `sales_by_location`.
# NOTE(review): `dim_store` is not defined in any cell visible in this
# notebook; confirm where it comes from.
sales_by_location = (
    fact_sales
    .join(dim_store, on='store_id')
    .groupBy('city', 'country')
    .agg(F.sum('sales_amount').alias('sales'))
)
write_to_clickhouse(sales_by_location, 'sales_by_location')

In [None]:
# Mean sales_amount per store, stored in `store_average_check`.
store_avg_check = (
    fact_sales
    .groupBy('store_id')
    .agg(F.avg('sales_amount').alias('avg_check'))
)
write_to_clickhouse(store_avg_check, 'store_average_check')

In [None]:
# Sum sales_amount per supplier (reached through the product dimension), keep
# the five largest totals and store them in the `top_5_suppliers` table.
top_5_suppliers = (
    fact_sales
    .join(dim_product, on='product_id')
    .groupBy('supplier_id')
    .agg(F.sum('sales_amount').alias('supplier_revenue'))
    .orderBy(F.col('supplier_revenue').desc())
    .limit(5)
)
write_to_clickhouse(top_5_suppliers, 'top_5_suppliers')

In [None]:
# Mean product price per supplier, stored in
# `avg_product_price_per_supplier`.
avg_price = (
    dim_product
    .groupBy('supplier_id')
    .agg(F.avg('price').alias('avg_price'))
)
write_to_clickhouse(avg_price, 'avg_product_price_per_supplier')

In [None]:
# Total sales_amount per `country` after joining sales to products and then
# to suppliers, stored in `sales_by_supplier_country`.
# NOTE(review): `dim_supplier` is not defined in any cell visible in this
# notebook; confirm where it comes from.
sales_by_supplier_country = (
    fact_sales
    .join(dim_product, on='product_id')
    .join(dim_supplier, on='supplier_id')
    .groupBy('country')
    .agg(F.sum('sales_amount').alias('revenue'))
)
write_to_clickhouse(sales_by_supplier_country, 'sales_by_supplier_country')

In [None]:
# Product ids with their rating, sorted from highest to lowest rating.
rated = (
    dim_product
    .select('product_id', 'rating')
    .orderBy(F.col('rating').desc())
)
# First ten rows of the descending list -> `top_rated_products`.
write_to_clickhouse(rated.limit(10), 'top_rated_products')
# Re-sort ascending and take ten rows -> `worst_rated_products`.
write_to_clickhouse(
    rated.orderBy('rating').limit(10),
    'worst_rated_products'
)

In [None]:
# (rating, sales_amount) pairs for every sale joined to its product, stored in
# `rating_sales_correlation`. No correlation is computed in this cell; only
# the paired columns are written.
rating_sales = (
    fact_sales
    .join(dim_product, on='product_id')
    .select('rating', 'sales_amount')
)
write_to_clickhouse(rating_sales, 'rating_sales_correlation')

In [None]:
# Number of review ids per product, largest count first, stored in
# `most_reviewed_products`.
most_reviewed = (
    dim_product
    .groupBy('product_id')
    .agg(F.count('review_id').alias('review_count'))
    .orderBy(F.col('review_count').desc())
)
write_to_clickhouse(most_reviewed, 'most_reviewed_products')