In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    concat_ws,
    corr,
    count,
    dayofmonth,
    desc,
    first,
    lit,
    max as _max,
    month,
    quarter,
    row_number,
    sum as _sum,
    year,
)
from pyspark.sql.window import Window

In [None]:
# Create (or reuse, via getOrCreate) the SparkSession on the standalone cluster.
# spark.jars ships the PostgreSQL and ClickHouse JDBC drivers used later in the notebook.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("ETL to Star")
    .config("spark.jars", "/opt/spark/jars/postgresql-42.7.4.jar,/opt/spark/jars/clickhouse-jdbc-0.6.0.jar")
    .getOrCreate()
)

# Source database. Credentials may be overridden through the PG_USER / PG_PASSWORD
# environment variables instead of living in the notebook; the defaults are the
# previously hard-coded values, so behaviour is unchanged when they are unset.
pg_url = "jdbc:postgresql://postgres:5432/bober_db"
pg_properties = {
    "user": os.environ.get("PG_USER", "bober"),
    "password": os.environ.get("PG_PASSWORD", "bober"),
    "driver": "org.postgresql.Driver",
}
df = spark.read.jdbc(url=pg_url, table="mock_data", properties=pg_properties)

# Smoke check: fetch the first row to confirm the read works (displayed as cell output).
df.head(1)

# Снежинка

In [None]:
# Date dimension: one row per distinct, non-null sale date. date_id is numbered
# 1..N in chronological order; year/month/day/quarter are derived from the date.
# The window has no partitionBy, so Spark evaluates it over a single partition.
date_order = Window.orderBy("full_date")
distinct_dates = (
    df.select(col("sale_date").alias("full_date"))
    .distinct()
    .filter(col("full_date").isNotNull())
)
dim_date = (
    distinct_dates
    .withColumn("date_id", row_number().over(date_order))
    .withColumn("year", year("full_date"))
    .withColumn("month", month("full_date"))
    .withColumn("day", dayofmonth("full_date"))
    .withColumn("quarter", quarter("full_date"))
)

dim_date.write.jdbc(url=pg_url, table="dim_date", mode="overwrite", properties=pg_properties)

# Customer dimension: distinct customer attribute combinations, keyed by the
# source system's customer id (source column -> dimension column below).
# NOTE(review): distinct() runs over ALL columns, so customer_id is unique only if
# every source row for a customer carries identical attributes — confirm against
# the data; a repeated customer_id would multiply rows in joins on customer_id.
customer_column_map = [
    ("sale_customer_id", "customer_id"),
    ("customer_first_name", "first_name"),
    ("customer_last_name", "last_name"),
    ("customer_age", "age"),
    ("customer_email", "email"),
    ("customer_country", "country"),
    ("customer_postal_code", "postal_code"),
]
dim_customer = df.select(
    *[col(source).alias(target) for source, target in customer_column_map]
).distinct()

dim_customer.write.jdbc(url=pg_url, table="dim_customer", mode="overwrite", properties=pg_properties)

# Seller dimension: distinct seller attribute combinations, keyed by the source
# system's seller id (source column -> dimension column below).
# NOTE(review): as with customers, seller_id is unique only if the seller's
# attributes never vary between source rows — confirm against the data.
seller_column_map = [
    ("sale_seller_id", "seller_id"),
    ("seller_first_name", "first_name"),
    ("seller_last_name", "last_name"),
    ("seller_email", "email"),
    ("seller_country", "country"),
    ("seller_postal_code", "postal_code"),
]
dim_seller = df.select(
    *[col(source).alias(target) for source, target in seller_column_map]
).distinct()

dim_seller.write.jdbc(url=pg_url, table="dim_seller", mode="overwrite", properties=pg_properties)

# Product dimension: distinct product attribute combinations keyed by the source
# product id. Every attribute column is named product_<attr> in the source and is
# stored as <attr> in the dimension.
# NOTE(review): product_id is unique only if a product's attributes never vary
# between source rows — confirm; the marts below join on product_id.
product_attributes = [
    "name", "category", "price", "weight", "color", "size", "brand",
    "material", "description", "rating", "reviews", "release_date", "expiry_date",
]
dim_product = df.select(
    col("sale_product_id").alias("product_id"),
    *[col(f"product_{attr}").alias(attr) for attr in product_attributes],
).distinct()

dim_product.write.jdbc(url=pg_url, table="dim_product", mode="overwrite", properties=pg_properties)

# Store dimension: one row per distinct combination of store attributes, with a
# surrogate key store_id assigned by row_number().
dim_store_raw = df.select(
    "store_name", "store_location", "store_city",
    "store_state", "store_country", "store_phone", "store_email"
).distinct()

# Order by EVERY column of the distinct rows so no two rows tie and the numbering
# is repeatable. Ordering by (name, city, country) alone let stores that differ only
# in location/state/phone/email tie, and dim_store is a lazy, uncached plan that is
# recomputed by the fact-table join — tied rows could then receive different
# store_ids there than in the table written here.
store_window = Window.orderBy(
    "store_name", "store_city", "store_country",
    "store_location", "store_state", "store_phone", "store_email",
)
dim_store = dim_store_raw.withColumn("store_id", row_number().over(store_window))
dim_store.write.jdbc(url=pg_url, table="dim_store", mode="overwrite", properties=pg_properties)

# Supplier dimension: one row per distinct combination of supplier attributes, with
# a surrogate key supplier_id assigned by row_number().
#
# The window orders by EVERY column of the distinct rows so no two rows tie.
# Ordering by (name, city, country) alone let suppliers that differ only in
# contact/email/phone/address tie, and because dim_supplier is recomputed lazily
# by the fact-table join, tied rows could get different ids there than in the
# table written here.
supplier_window = Window.orderBy(
    "supplier_name", "supplier_city", "supplier_country",
    "contact", "supplier_email", "supplier_phone", "supplier_address",
)
dim_supplier = df.select(
    "supplier_name",
    col("supplier_contact").alias("contact"),
    "supplier_email",
    "supplier_phone",
    "supplier_address",
    "supplier_city",
    "supplier_country"
).distinct() \
    .withColumn("supplier_id", row_number().over(supplier_window))

dim_supplier.write.jdbc(url=pg_url, table="dim_supplier", mode="overwrite", properties=pg_properties)

# Pet dimension: one row per distinct (owner, pet) attribute combination, with a
# surrogate key pet_id assigned by row_number().
dim_pet_raw = df.select(
    col("sale_customer_id").alias("customer_id"),
    col("customer_pet_type").alias("pet_type"),
    col("customer_pet_name").alias("pet_name"),
    col("customer_pet_breed").alias("pet_breed"),
    col("pet_category").alias("pet_category")
).distinct()

# The window is defined AFTER select + alias so it can use the new column names.
# It orders by EVERY column of the distinct rows: with only (customer_id, pet_name,
# pet_type), pets differing only in breed/category tie, and since dim_pet is a lazy
# plan recomputed by the fact-table join, tied rows could be numbered differently
# there than in the table written here.
pet_window = Window.orderBy("customer_id", "pet_name", "pet_type", "pet_breed", "pet_category")
dim_pet = dim_pet_raw.withColumn("pet_id", row_number().over(pet_window))
dim_pet.write.jdbc(url=pg_url, table="dim_pet", mode="overwrite", properties=pg_properties)

# Fact table: one row per source sale, with foreign keys resolved by left-joining
# the dimension DataFrames built above.
#
# Why the joins look like this:
#   * dim_store / dim_supplier / dim_pet are projections of `df` itself, so
#     `df.x == dim_store.x` refers to the same underlying attribute on both sides
#     (an ambiguous self-join that Spark may reject or mis-resolve). Every side is
#     given an explicit alias and referenced as "<alias>.<column>".
#   * Each of those dimensions is distinct() over ALL of its attribute columns, so
#     comparing all of them guarantees at most one match per sale. Joining supplier
#     on 3 of 7 columns, or pet on 3 of 5, fanned one sale out into several fact
#     rows whenever two dimension rows shared that subset.
#   * eqNullSafe (<=>) is used because plain == never matches NULL, which left the
#     key empty for any sale with a NULL attribute (e.g. a missing store phone).
#     dim_date filters out NULL dates, so plain equality is correct there.


def _null_safe_key(left_alias, right_alias, column_pairs):
    """AND together `left.src <=> right.dst` for every (src, dst) column pair."""
    condition = None
    for source_column, dim_column in column_pairs:
        clause = col(f"{left_alias}.{source_column}").eqNullSafe(col(f"{right_alias}.{dim_column}"))
        condition = clause if condition is None else condition & clause
    return condition


store_key = [(name, name) for name in (
    "store_name", "store_location", "store_city", "store_state",
    "store_country", "store_phone", "store_email",
)]
supplier_key = [
    ("supplier_name", "supplier_name"),
    ("supplier_contact", "contact"),
    ("supplier_email", "supplier_email"),
    ("supplier_phone", "supplier_phone"),
    ("supplier_address", "supplier_address"),
    ("supplier_city", "supplier_city"),
    ("supplier_country", "supplier_country"),
]
pet_key = [
    ("sale_customer_id", "customer_id"),
    ("customer_pet_type", "pet_type"),
    ("customer_pet_name", "pet_name"),
    ("customer_pet_breed", "pet_breed"),
    ("pet_category", "pet_category"),
]

sales = df.alias("s")
fact_sales = (
    sales
    .join(dim_date.alias("d"), col("s.sale_date") == col("d.full_date"), "left")
    .join(dim_store.alias("st"), _null_safe_key("s", "st", store_key), "left")
    .join(dim_supplier.alias("sup"), _null_safe_key("s", "sup", supplier_key), "left")
    .join(dim_pet.alias("p"), _null_safe_key("s", "p", pet_key), "left")
    .select(
        col("s.id").alias("sale_id"),
        col("s.sale_customer_id").alias("customer_id"),
        col("p.pet_id"),
        col("s.sale_seller_id").alias("seller_id"),
        col("s.sale_product_id").alias("product_id"),
        col("st.store_id"),
        col("sup.supplier_id"),
        col("d.date_id"),
        col("s.sale_quantity").alias("sale_quantity"),
        col("s.sale_total_price").alias("sale_total_price"),
    )
)

fact_sales.write.jdbc(url=pg_url, table="fact_sales", mode="overwrite", properties=pg_properties)

print("Звёздная схема успешно построена! Проверить можно в DBeaver: SELECT * FROM fact_sales LIMIT 5;")


In [None]:
# ClickHouse target for the data marts (JDBC over the HTTP interface, port 8123).
ch_url = "jdbc:clickhouse://clickhouse:8123/default"

# Original connection settings, kept unchanged for anything that still reads them.
ch_options = {
    "host": "clickhouse",
    "port": "8123",
    "user": "default",
    "password": "",
    "database": "default"
}

# JDBC properties passed to every `.write.jdbc(..., properties=ch_properties)` call
# in the marts cell; that name was previously never defined (NameError).
#   * user/password: overridable via CLICKHOUSE_USER / CLICKHOUSE_PASSWORD,
#     defaulting to the values in ch_options.
#   * driver: explicit driver class of the clickhouse-jdbc jar on spark.jars.
#   * createTableOptions: Spark appends this to the CREATE TABLE it issues in
#     mode="overwrite"; ClickHouse rejects a table without ENGINE unless
#     default_table_engine is configured. ORDER BY tuple() means "no sorting key".
ch_properties = {
    "user": os.environ.get("CLICKHOUSE_USER", ch_options["user"]),
    "password": os.environ.get("CLICKHOUSE_PASSWORD", ch_options["password"]),
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "createTableOptions": "ENGINE = MergeTree() ORDER BY tuple()",
}


25/11/17 19:24:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Load the star-schema tables back from PostgreSQL. These assignments replace the
# in-memory DataFrames of the same names created by the ETL cell.
def read_pg_table(table_name):
    """Return a DataFrame for one PostgreSQL table using the shared pg_url / pg_properties."""
    return spark.read.jdbc(url=pg_url, table=table_name, properties=pg_properties)


fact = read_pg_table("fact_sales")
dim_product = read_pg_table("dim_product")
dim_customer = read_pg_table("dim_customer")
dim_store = read_pg_table("dim_store")
dim_supplier = read_pg_table("dim_supplier")
dim_date = read_pg_table("dim_date")

In [None]:
# --- Sales by product --------------------------------------------------------
# Per-product sales totals plus the product's rating and review count.
# rating/reviews are dim_product attributes repeated on every joined sale row.
# They were taken with first(), which Spark documents as non-deterministic (it
# depends on row order after a shuffle), while the output column is called
# avg_rating. avg()/max() give the same value when a product has a single
# rating/review count and a deterministic one otherwise.
product_vitrina = (
    fact.join(dim_product, fact.product_id == dim_product.product_id)
    .groupBy(dim_product.product_id, dim_product.name, dim_product.category)
    .agg(
        _sum("sale_quantity").alias("total_quantity"),
        _sum("sale_total_price").alias("total_revenue"),
        avg("rating").alias("avg_rating"),
        _max("reviews").alias("review_count"),
    )
)
product_vitrina.write.jdbc(url=ch_url, table="vitrina_product_sales", mode="overwrite", properties=ch_properties)

# Top 10 best-selling products (separate table for easy checking).
top10_products = product_vitrina.orderBy(desc("total_quantity")).limit(10)
top10_products.write.jdbc(url=ch_url, table="top10_sold_products", mode="overwrite", properties=ch_properties)

# Revenue per category (separate table).
category_revenue = (
    product_vitrina.groupBy("category")
    .agg(_sum("total_revenue").alias("category_revenue"))
)
category_revenue.write.jdbc(url=ch_url, table="category_revenue", mode="overwrite", properties=ch_properties)

# --- Sales by customer -------------------------------------------------------
# Totals per customer, then the first and last name are merged into customer_name.
# NOTE(review): dim_customer was built with distinct() over all customer attributes,
# so customer_id is unique only if those attributes never vary per customer —
# confirm; otherwise this join multiplies fact rows and inflates the totals.
customer_totals = (
    fact.join(dim_customer, fact.customer_id == dim_customer.customer_id)
    .groupBy(dim_customer.customer_id, dim_customer.first_name, dim_customer.last_name, dim_customer.country)
    .agg(
        _sum("sale_total_price").alias("total_spent"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check"),
    )
)
customer_vitrina = customer_totals.select(
    "customer_id",
    concat_ws(" ", col("first_name"), col("last_name")).alias("customer_name"),
    "country",
    "total_spent",
    "order_count",
    "avg_check",
)
customer_vitrina.write.jdbc(url=ch_url, table="vitrina_customer_sales", mode="overwrite", properties=ch_properties)

# Top 10 customers by amount spent.
top10_customers = customer_vitrina.orderBy(desc("total_spent")).limit(10)
top10_customers.write.jdbc(url=ch_url, table="top10_customers_by_spent", mode="overwrite", properties=ch_properties)

# Spend and number of customers per country (separate table).
country_aggregates = [
    _sum("total_spent").alias("total_spent_by_country"),
    count("*").alias("customer_count"),
]
customer_country_dist = customer_vitrina.groupBy("country").agg(*country_aggregates)
customer_country_dist.write.jdbc(url=ch_url, table="customer_country_distribution", mode="overwrite", properties=ch_properties)

# --- Sales over time (year x month) -----------------------------------------
monthly_totals = (
    fact.join(dim_date, fact.date_id == dim_date.date_id)
    .groupBy(dim_date.year, dim_date.month)
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        _sum("sale_quantity").alias("total_quantity"),
        count("*").alias("order_count"),
    )
)
# Per-order averages derived from the monthly totals.
time_vitrina = monthly_totals.select(
    "*",
    (col("total_revenue") / col("order_count")).alias("avg_check"),
    (col("total_quantity") / col("order_count")).alias("avg_order_size"),
)
time_vitrina.write.jdbc(url=ch_url, table="vitrina_time_sales", mode="overwrite", properties=ch_properties)

# --- Sales by store ----------------------------------------------------------
# dim_store has no `city` / `country` columns — they are store_city / store_country —
# so `dim_store.city` raised AttributeError. Group by the real columns and alias
# them to the intended city / country output names.
store_vitrina = (
    fact.join(dim_store, fact.store_id == dim_store.store_id)
    .groupBy(dim_store.store_id, dim_store.store_name, dim_store.store_city, dim_store.store_country)
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        count("*").alias("order_count"),
        avg("sale_total_price").alias("avg_check"),
    )
    .select(
        "store_id",
        "store_name",
        col("store_city").alias("city"),
        col("store_country").alias("country"),
        "total_revenue",
        "order_count",
        "avg_check",
    )
)
store_vitrina.write.jdbc(url=ch_url, table="vitrina_store_sales", mode="overwrite", properties=ch_properties)

# Top 5 stores by revenue.
top5_stores = store_vitrina.orderBy(desc("total_revenue")).limit(5)
top5_stores.write.jdbc(url=ch_url, table="top5_stores_by_revenue", mode="overwrite", properties=ch_properties)

# --- Sales by supplier -------------------------------------------------------
# Revenue per supplier and the quantity-weighted average list price of what they
# sold: sum(price * quantity) / sum(quantity).
# dim_supplier has no `country` column — it is supplier_country — so
# `dim_supplier.country` raised AttributeError; it is aliased to country on output.
# NOTE(review): if dim_product holds several rows per product_id, the price join
# multiplies fact rows — confirm product_id is unique there.
product_prices = dim_product.select("product_id", "price")
supplier_vitrina = (
    fact.join(product_prices, fact.product_id == product_prices.product_id)
    .join(dim_supplier, fact.supplier_id == dim_supplier.supplier_id)
    .groupBy(dim_supplier.supplier_id, dim_supplier.supplier_name, dim_supplier.supplier_country)
    .agg(
        _sum("sale_total_price").alias("total_revenue"),
        _sum(col("price") * col("sale_quantity")).alias("weighted_price_sum"),
        _sum("sale_quantity").alias("total_quantity"),
    )
    .withColumn("avg_price", col("weighted_price_sum") / col("total_quantity"))
    .select(
        "supplier_id",
        "supplier_name",
        col("supplier_country").alias("country"),
        "total_revenue",
        "avg_price",
    )
)
supplier_vitrina.write.jdbc(url=ch_url, table="vitrina_supplier_sales", mode="overwrite", properties=ch_properties)

# Top 5 suppliers by revenue.
top5_suppliers = supplier_vitrina.orderBy(desc("total_revenue")).limit(5)
top5_suppliers.write.jdbc(url=ch_url, table="top5_suppliers_by_revenue", mode="overwrite", properties=ch_properties)

# --- Product quality ---------------------------------------------------------
# Rating and review count per product next to its sales totals.
# rating/reviews are dim_product attributes repeated on every joined sale row;
# avg()/max() replace first(), which is non-deterministic in Spark. They agree with
# first() when a product has a single value and are deterministic otherwise.
quality_vitrina = (
    fact.join(dim_product, fact.product_id == dim_product.product_id)
    .groupBy(dim_product.product_id, dim_product.name)
    .agg(
        avg("rating").alias("rating"),
        _max("reviews").alias("review_count"),
        _sum("sale_quantity").alias("total_quantity"),
        _sum("sale_total_price").alias("total_revenue"),
    )
)
quality_vitrina.write.jdbc(url=ch_url, table="vitrina_product_quality", mode="overwrite", properties=ch_properties)

# Pearson correlation of rating with revenue and with quantity, across products
# (a single-row table).
correlation = quality_vitrina.agg(
    corr("rating", "total_revenue").alias("corr_rating_revenue"),
    corr("rating", "total_quantity").alias("corr_rating_quantity")
).withColumn("description", lit("Correlation between rating and sales"))
correlation.write.jdbc(url=ch_url, table="product_quality_correlation", mode="overwrite", properties=ch_properties)


# Final summary plus example queries for checking the marts by hand.
for report_line in (
    "Все 6 витрин + топы + корреляция успешно загружены в ClickHouse!",
    "Проверить можно в DBeaver или clickhouse-client:",
    "SELECT * FROM vitrina_product_sales LIMIT 10;",
    "SELECT corr(rating, total_quantity) FROM vitrina_product_quality;",
):
    print(report_line)

In [13]:
# Shut down the Spark session (stops the underlying SparkContext); run last.
spark.stop()