In [1]:
from pyspark import SparkContext

# Print the SparkContext currently active in this kernel (prints None when no
# context has been created yet).
active_context = SparkContext._active_spark_context
print(active_context)

None


In [2]:
import os

from pyspark.sql import SparkSession

# Spark session with the PostgreSQL and ClickHouse JDBC driver jars on the classpath.
# NOTE: the jar paths are relative — presumably resolved against the driver's
# working directory; confirm when running outside the notebook's folder.
spark = (
    SparkSession.builder
    .appName("ETL Star Schema Reports to ClickHouse")
    .config("spark.jars", "postgresql-42.6.0.jar,clickhouse-jdbc-0.4.6.jar")
    .getOrCreate()
)

# Connection settings are read from the environment so credentials do not have to
# live in the notebook. The previous hard-coded values remain only as fallbacks for
# backward compatibility. TODO: drop the fallbacks once every environment sets them.

# PostgreSQL (source star schema)
pg_url = os.environ.get("PG_JDBC_URL", "jdbc:postgresql://postgres:5432/spark_db")
pg_props = {
    "user": os.environ.get("PG_USER", "spark_user"),
    "password": os.environ.get("PG_PASSWORD", "spark_password"),
    "driver": "org.postgresql.Driver",
}

# ClickHouse (destination of the report tables)
ch_url = os.environ.get("CH_JDBC_URL", "jdbc:clickhouse://clickhouse:8123/default")
ch_props = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": os.environ.get("CH_USER", "custom_user"),
    "password": os.environ.get("CH_PASSWORD", "custom_password"),
}

print("‚úîÔ∏è SparkSession –∏ JDBC –Ω–∞—Å—Ç—Ä–æ–µ–Ω—ã")


✔️ SparkSession и JDBC настроены


In [3]:
from pyspark.sql import functions as F


def _read_pg_table(table_name):
    """Return a DataFrame backed by `table_name` in the PostgreSQL source database."""
    return spark.read.jdbc(url=pg_url, table=table_name, properties=pg_props)


# Fact table of the star schema.
fact_sales = _read_pg_table("fact_sales")

# Dimension tables, read in the same order as before.
dim_products = _read_pg_table("dim_products")
dim_customers = _read_pg_table("dim_customers")
dim_dates = _read_pg_table("dim_dates")
dim_stores = _read_pg_table("dim_stores")
dim_suppliers = _read_pg_table("dim_suppliers")
dim_countries = _read_pg_table("dim_countries")
dim_cities = _read_pg_table("dim_cities")

print("‚úîÔ∏è –î–∞–Ω–Ω—ã–µ –∏–∑ PostgreSQL –∑–∞–≥—Ä—É–∂–µ–Ω—ã")


✔️ Данные из PostgreSQL загружены


In [14]:
# Cell 3a: sales_by_product — base data mart
from pyspark.sql.window import Window

# Sale lines joined with the attributes of the sold product.
prod_metrics = (
    fact_sales
      .join(dim_products, "product_id")
      .select("product_id","product_name","category_id","quantity","total_price","rating","reviews")
)

# 1) Top-10 best-selling products (by units sold)
top10_products = (
    prod_metrics
      .groupBy("product_id","product_name")
      .agg(F.sum("quantity").alias("units_sold"))
      .orderBy(F.col("units_sold").desc())
      .limit(10)
)

# 2) Total revenue per product category
revenue_by_category = (
    prod_metrics
      .groupBy("category_id")
      .agg(F.sum("total_price").alias("revenue"))
)

# 3) Average rating and number of reviews per product.
# NOTE(review): if `reviews` is a per-product attribute of dim_products (rather than
# a per-sale value), summing it over sale lines multiplies it by the product's
# number of sales — confirm the intended semantics.
rating_reviews = (
    prod_metrics
      .groupBy("product_id","product_name")
      .agg(
         F.avg("rating").alias("avg_rating"),
         F.sum("reviews").alias("total_reviews")
      )
)

# Write all three reports to ClickHouse as MergeTree tables.
for tbl, df, order_cols in [
    ("top10_products",      top10_products,      ["units_sold"]),
    ("revenue_by_category", revenue_by_category, ["category_id"]),
    ("rating_reviews",      rating_reviews,      ["product_id"])
]:
    # Cache the report so the JDBC write reuses the rows materialised by count()
    # instead of re-running the whole plan (including the PostgreSQL reads) a
    # second time just to report the row count.
    df = df.persist()
    try:
        row_count = df.count()
        (
            df.write.format("jdbc")
              .option("url", ch_url)
              .option("dbtable", tbl)
              .options(**ch_props)
              .option("createTableOptions",
                      f"ENGINE = MergeTree() ORDER BY ({','.join(order_cols)})")
              .mode("overwrite")
              .save()
        )
    finally:
        df.unpersist()
    print(f"‚úîÔ∏è {tbl}: {row_count} —Å—Ç—Ä–æ–∫")


✔️ top10_products: 3 строк
✔️ revenue_by_category: 2 строк
✔️ rating_reviews: 3 строк


In [15]:
# Cell 4a: sales_by_customer — all metrics
# 1) Top-10 customers by total amount spent
top10_customers = (
    fact_sales
      .join(dim_customers, "customer_id")
      .groupBy("customer_id","first_name","last_name")
      .agg(F.sum("total_price").alias("total_spent"))
      .orderBy(F.col("total_spent").desc())
      .limit(10)
)

# 2) Customers per country: distinct customers appearing in fact_sales, grouped by
#    country name. The left join keeps customers whose country_id has no match in
#    dim_countries (their country_name is null).
customers_by_country = (
    fact_sales
      .join(dim_customers, "customer_id")
      .join(dim_countries, ["country_id"], "left")
      .groupBy("country_name")
      .agg(F.countDistinct("customer_id").alias("unique_customers"))
)

# 3) Average check per customer: total revenue divided by the number of
#    fact_sales rows of that customer.
avg_check_per_customer = (
    fact_sales
      .groupBy("customer_id")
      .agg((F.sum("total_price")/F.count("*")).alias("avg_check"))
)

# Write all three reports to ClickHouse as MergeTree tables.
for tbl, df, order_cols in [
    ("top10_customers",        top10_customers,        ["total_spent"]),
    ("customers_by_country",   customers_by_country,   ["country_name"]),
    ("avg_check_per_customer", avg_check_per_customer, ["customer_id"])
]:
    # Cache the report so the JDBC write reuses the rows materialised by count()
    # instead of re-running the whole plan (including the PostgreSQL reads) a
    # second time just to report the row count.
    df = df.persist()
    try:
        row_count = df.count()
        (
            df.write.format("jdbc")
              .option("url", ch_url)
              .option("dbtable", tbl)
              .options(**ch_props)
              .option("createTableOptions",
                      f"ENGINE = MergeTree() ORDER BY ({','.join(order_cols)})")
              .mode("overwrite")
              .save()
        )
    finally:
        df.unpersist()
    print(f"‚úîÔ∏è {tbl}: {row_count} —Å—Ç—Ä–æ–∫")


✔️ top10_customers: 10 строк
✔️ customers_by_country: 204 строк
✔️ avg_check_per_customer: 10000 строк


In [16]:
# Cell 5a: sales_by_time — all metrics
# Sale lines with their calendar date (dim_dates.full_date exposed as sale_date)
# and the derived year / month; shared by the monthly reports below so the date
# join is defined only once.
sales_with_dates = (
    fact_sales
      .join(dim_dates.withColumnRenamed("full_date","sale_date"), "date_id")
      .withColumn("year",  F.year("sale_date"))
      .withColumn("month", F.month("sale_date"))
)

# 1) Monthly (year + month) revenue trend
monthly_trends = (
    sales_with_dates
      .groupBy("year","month")
      .agg(F.sum("total_price").alias("revenue"))
)

# 2) Revenue per year, for period-over-period (e.g. year-over-year) comparison.
#    Only the yearly totals are stored; no growth rate is computed here.
yearly_revenue = (
    monthly_trends
      .groupBy("year")
      .agg(F.sum("revenue").alias("yearly_revenue"))
)

# 3) Average order size per month: revenue divided by the number of fact_sales rows.
avg_order_size_by_month = (
    sales_with_dates
      .groupBy("year","month")
      .agg((F.sum("total_price")/F.count("*")).alias("avg_order_size"))
)

# Write all three reports to ClickHouse as MergeTree tables.
for tbl, df, order_cols in [
    ("monthly_trends",          monthly_trends,          ["year","month"]),
    ("yearly_revenue",          yearly_revenue,          ["year"]),
    ("avg_order_size_by_month", avg_order_size_by_month, ["year","month"])
]:
    # Cache the report so the JDBC write reuses the rows materialised by count()
    # instead of re-running the whole plan (including the PostgreSQL reads) a
    # second time just to report the row count.
    df = df.persist()
    try:
        row_count = df.count()
        (
            df.write.format("jdbc")
              .option("url", ch_url)
              .option("dbtable", tbl)
              .options(**ch_props)
              .option("createTableOptions",
                      f"ENGINE = MergeTree() ORDER BY ({','.join(order_cols)})")
              .mode("overwrite")
              .save()
        )
    finally:
        df.unpersist()
    print(f"‚úîÔ∏è {tbl}: {row_count} —Å—Ç—Ä–æ–∫")


✔️ monthly_trends: 12 строк
✔️ yearly_revenue: 1 строк
✔️ avg_order_size_by_month: 12 строк


In [17]:
# Cell 6a: sales_by_store — all metrics
# 1) Top-5 stores by revenue
top5_stores = (
    fact_sales
      .join(dim_stores, "store_id")
      .groupBy("store_id","store_name")
      .agg(F.sum("total_price").alias("revenue"))
      .orderBy(F.col("revenue").desc())
      .limit(5)
)

# 2) Revenue by (city, country). The left joins keep sales whose city/country
#    lookup has no match (city_name / country_name are then null).
sales_by_city_country = (
    fact_sales
      .join(dim_stores, "store_id")
      .join(dim_cities,    ["city_id"],    "left")
      .join(dim_countries, ["country_id"], "left")
      .groupBy("city_name","country_name")
      .agg(F.sum("total_price").alias("revenue"))
)

# 3) Average check per store: total revenue divided by the number of
#    fact_sales rows of that store.
avg_check_per_store = (
    fact_sales
      .groupBy("store_id")
      .agg((F.sum("total_price")/F.count("*")).alias("avg_check"))
)

# Write all three reports to ClickHouse as MergeTree tables.
for tbl, df, order_cols in [
    ("top5_stores",           top5_stores,           ["revenue"]),
    ("sales_by_city_country", sales_by_city_country, ["city_name"]),
    ("avg_check_per_store",   avg_check_per_store,   ["store_id"])
]:
    # Cache the report so the JDBC write reuses the rows materialised by count()
    # instead of re-running the whole plan (including the PostgreSQL reads) a
    # second time just to report the row count.
    df = df.persist()
    try:
        row_count = df.count()
        (
            df.write.format("jdbc")
              .option("url", ch_url)
              .option("dbtable", tbl)
              .options(**ch_props)
              .option("createTableOptions",
                      f"ENGINE = MergeTree() ORDER BY ({','.join(order_cols)})")
              .mode("overwrite")
              .save()
        )
    finally:
        df.unpersist()
    print(f"‚úîÔ∏è {tbl}: {row_count} —Å—Ç—Ä–æ–∫")


✔️ top5_stores: 5 строк
✔️ sales_by_city_country: 383 строк
✔️ avg_check_per_store: 383 строк


In [18]:
# Cell 7a: sales_by_supplier — all metrics
# Sale lines tagged with the supplier_id of the sold product.
sales_with_supp = fact_sales.join(dim_products.select("product_id","supplier_id"), "product_id")

# 1) Top-5 suppliers by revenue
top5_suppliers = (
    sales_with_supp
      .groupBy("supplier_id")
      .agg(F.sum("total_price").alias("revenue"))
      .orderBy(F.col("revenue").desc())
      .limit(5)
)

# 2) Average unit price per supplier, weighted by quantity sold
#    (total revenue / total units).
avg_price_by_supplier = (
    sales_with_supp
      .groupBy("supplier_id")
      .agg((F.sum("total_price")/F.sum("quantity")).alias("avg_price"))
)

# 3) Revenue by supplier country (left join keeps unmatched countries as null).
sales_by_supplier_country = (
    sales_with_supp
      .join(dim_suppliers, "supplier_id")
      .join(dim_countries, ["country_id"], "left")
      .groupBy("country_name")
      .agg(F.sum("total_price").alias("revenue"))
)

# Write all three reports to ClickHouse as MergeTree tables.
for tbl, df, order_cols in [
    ("top5_suppliers",            top5_suppliers,            ["revenue"]),
    ("avg_price_by_supplier",     avg_price_by_supplier,     ["supplier_id"]),
    ("sales_by_supplier_country", sales_by_supplier_country, ["country_name"])
]:
    # Cache the report so the JDBC write reuses the rows materialised by count()
    # instead of re-running the whole plan (including the PostgreSQL reads) a
    # second time just to report the row count.
    df = df.persist()
    try:
        row_count = df.count()
        (
            df.write.format("jdbc")
              .option("url", ch_url)
              .option("dbtable", tbl)
              .options(**ch_props)
              .option("createTableOptions",
                      f"ENGINE = MergeTree() ORDER BY ({','.join(order_cols)})")
              .mode("overwrite")
              .save()
        )
    finally:
        df.unpersist()
    print(f"‚úîÔ∏è {tbl}: {row_count} —Å—Ç—Ä–æ–∫")


✔️ top5_suppliers: 3 строк
✔️ avg_price_by_supplier: 3 строк
✔️ sales_by_supplier_country: 3 строк


In [20]:
# Cell 8a: product_quality — rating extremes, most-reviewed products and the
# rating/quantity correlation.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# NOTE: Window / row_number are no longer used below; the imports are kept so the
# names remain available to any other cell that expects them.

# One row per sale line with the sold product's name, rating and review count.
quality = (
    fact_sales
      .join(dim_products, "product_id")
      .select("product_id","product_name","rating","reviews","quantity")
)

# Highest- and lowest-rated row via a global sort + limit(1). This replaces the
# un-partitioned row_number() windows, which moved every row into one partition.
# Nulls are sorted LAST in both directions: the previous ascending window used
# Spark's default NULLS FIRST, so a product with a null rating would have been
# reported as the lowest rated. product_id is a secondary sort key so ties
# between different products resolve predictably.
highest_rating = (
    quality
      .orderBy(F.col("rating").desc_nulls_last(), F.col("product_id").asc())
      .limit(1)
)
lowest_rating = (
    quality
      .orderBy(F.col("rating").asc_nulls_last(), F.col("product_id").asc())
      .limit(1)
)

# Pearson correlation between rating and quantity over individual sale lines.
# NOTE(review): the printed label says "units_sold", but this uses the per-line
# `quantity`, not units summed per product — aggregate per product first if that
# is what the report is meant to show.
corr_val = quality.stat.corr("rating","quantity")

# Top-10 products by summed review count.
# NOTE(review): if `reviews` is a per-product attribute of dim_products, summing it
# over sale lines multiplies it by the product's number of sales — confirm.
most_reviewed = (
    quality
      .groupBy("product_id","product_name")
      .agg(F.sum("reviews").alias("total_reviews"))
      .orderBy(F.col("total_reviews").desc())
      .limit(10)
)

# Write all three reports to ClickHouse as MergeTree tables.
for tbl, df, order_cols in [
    ("highest_rating", highest_rating, ["rating"]),
    ("lowest_rating",  lowest_rating,  ["rating"]),
    ("most_reviewed",  most_reviewed,  ["total_reviews"])
]:
    # Cache the report so the JDBC write reuses the rows materialised by count()
    # instead of re-running the whole plan (including the PostgreSQL reads) a
    # second time just to report the row count.
    df = df.persist()
    try:
        row_count = df.count()
        (
            df.write.format("jdbc")
              .option("url", ch_url)
              .option("dbtable", tbl)
              .options(**ch_props)
              .option("createTableOptions",
                      f"ENGINE = MergeTree() ORDER BY ({','.join(order_cols)})")
              .mode("overwrite")
              .save()
        )
    finally:
        df.unpersist()
    print(f"‚úîÔ∏è {tbl}: {row_count} —Å—Ç—Ä–æ–∫")

print(f"üßÆ –ö–æ—Ä—Ä–µ–ª—è—Ü–∏—è rating‚Üîunits_sold = {corr_val:.4f}")


✔️ highest_rating: 1 строк
✔️ lowest_rating: 1 строк
✔️ most_reviewed: 3 строк
🧮 Корреляция rating↔units_sold = 0.0076
