In [17]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# List the contents of /opt/spark/jars/ to check that the JDBC driver jars
# referenced by the Spark session config are present.
!ls -l /opt/spark/jars/

total 23640
-rwxrwxrwx 1 root root 23089094 Dec  9 10:15 clickhouse-jdbc-0.4.6-all.jar
-rwxrwxrwx 1 root root  1116727 Dec  9 17:15 postgresql-42.7.8.jar


In [18]:
# Spark session attached to the standalone master. Both JDBC driver jars
# (PostgreSQL source, ClickHouse sink) are registered through spark.jars.
spark = (
    SparkSession.builder
    .appName("Postgres_to_ClickHouse_Marts")
    .master("spark://spark-master:7077")
    .config("spark.jars",
            "/opt/spark/jars/postgresql-42.7.8.jar,/opt/spark/jars/clickhouse-jdbc-0.4.6-all.jar")
    .getOrCreate()
)

# PostgreSQL (source) connection settings.
# Credentials are read from the environment so real secrets do not have to be
# stored in the notebook. The fallbacks are the previous hard-coded values,
# so the notebook behaves exactly as before when the variables are unset.
pg_url = "jdbc:postgresql://postgres:5432/lab2_db_postgres"
pg_props = {
    "user": os.environ.get("LAB2_PG_USER", "user"),
    "password": os.environ.get("LAB2_PG_PASSWORD", "user"),
    "driver": "org.postgresql.Driver",
}

# ClickHouse (sink) connection settings. No user/password is supplied here.
ch_url = "jdbc:clickhouse://clickhouse:8123/default"
ch_props = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    # Appended by Spark's JDBC writer to the CREATE TABLE statement it issues
    # for a mart table, so the table is created with an explicit engine.
    "createTableOptions": "ENGINE = Log",
}

In [19]:
# Load the fact table and the four dimension tables from PostgreSQL.
# The names on the left are bound in the same order as the table names below.
fact_sales, dim_product, dim_customer, dim_store, dim_supplier = (
    spark.read.jdbc(pg_url, source_table, properties=pg_props)
    for source_table in (
        "fact_sales",
        "dim_product",
        "dim_customer",
        "dim_store",
        "dim_supplier",
    )
)

# Витрина продаж по продуктам

In [20]:
# Sales rows joined to their product attributes.
sales_prod = fact_sales.join(dim_product, on="product_id", how="inner")

# Top 10 best-selling products, ranked by total units sold.
top10_products = (
    sales_prod
    .groupBy("product_id", "name")
    .agg(
        F.sum("quantity").alias("total_qty"),
        F.sum("total_price").alias("revenue"),
    )
    .sort(F.col("total_qty").desc())
    .limit(10)
)
top10_products.write.jdbc(
    url=ch_url, table="mart_top10_products", mode="overwrite", properties=ch_props
)

# Total revenue per product category.
revenue_by_category = (
    sales_prod
    .groupBy("category")
    .agg(F.sum("total_price").alias("revenue"))
)
revenue_by_category.write.jdbc(
    url=ch_url, table="mart_revenue_by_category", mode="overwrite", properties=ch_props
)

# Rating and review count for each product, taken directly from the
# product dimension (no aggregation is applied here).
rating_reviews = dim_product.select(["product_id", "name", "rating", "reviews"])
rating_reviews.write.jdbc(
    url=ch_url, table="mart_product_rating_reviews", mode="overwrite", properties=ch_props
)

# Витрина продаж по клиентам

In [21]:
# Sales rows joined to their customer attributes.
sales_cust = fact_sales.join(dim_customer, on="customer_id", how="inner")

# Top 10 customers by total amount spent.
top10_customers = (
    sales_cust
    .groupBy("customer_id", "first_name", "last_name", "email", "country")
    .agg(F.sum("total_price").alias("total_spent"))
    .sort(F.col("total_spent").desc())
    .limit(10)
)
top10_customers.write.jdbc(
    url=ch_url, table="mart_top10_customers", mode="overwrite", properties=ch_props
)

# Number of distinct customers per country (customer dimension only).
customers_by_country = (
    dim_customer
    .groupBy("country")
    .agg(F.countDistinct("customer_id").alias("customers_cnt"))
)
customers_by_country.write.jdbc(
    url=ch_url, table="mart_customers_by_country", mode="overwrite", properties=ch_props
)

# Average check per customer: total spent divided by the number of
# distinct sales_id values for that customer.
avg_check_customer = (
    sales_cust
    .groupBy("customer_id", "first_name", "last_name", "email")
    .agg(
        F.sum("total_price").alias("total_spent"),
        F.countDistinct("sales_id").alias("orders_cnt"),
    )
    .withColumn("avg_check", F.col("total_spent") / F.col("orders_cnt"))
)
avg_check_customer.write.jdbc(
    url=ch_url, table="mart_avg_check_customer", mode="overwrite", properties=ch_props
)

# Витрина продаж по времени

In [23]:
# Sales rows tagged with the calendar year and a "yyyy-MM" month label
# derived from sale_date.
sales_time = (
    fact_sales
    .withColumn("year", F.year(F.col("sale_date")))
    .withColumn("month", F.date_format(F.col("sale_date"), "yyyy-MM"))
)

# Monthly trend: revenue and units sold per "yyyy-MM" month.
monthly_trend = sales_time.groupBy("month").agg(
    F.sum("total_price").alias("revenue"),
    F.sum("quantity").alias("qty"),
)
monthly_trend.write.jdbc(
    url=ch_url, table="mart_monthly_trend", mode="overwrite", properties=ch_props
)

# Yearly trend: revenue and units sold per calendar year.
yearly_trend = sales_time.groupBy("year").agg(
    F.sum("total_price").alias("revenue"),
    F.sum("quantity").alias("qty"),
)
yearly_trend.write.jdbc(
    url=ch_url, table="mart_yearly_trend", mode="overwrite", properties=ch_props
)

# Revenue comparison across periods: each month against the same month of
# the previous year.
sales_with_periods = (
    fact_sales
    .withColumn("year", F.year("sale_date"))
    .withColumn("month", F.month("sale_date"))
    .withColumn("month_str", F.date_format("sale_date", "yyyy-MM"))
)

# Revenue per (year, month); month_str carries the "yyyy-MM" label.
revenue_by_year_month = (
    sales_with_periods
    .groupBy("year", "month", "month_str")
    .agg(F.sum("total_price").alias("revenue"))
    .orderBy("year", "month")
)

# The same aggregate under two aliases so it can be joined to itself.
current_year = revenue_by_year_month.alias("cy")
previous_year = revenue_by_year_month.alias("py")

# Match a row with the row for the same month number exactly one year earlier.
same_month = F.col("cy.month") == F.col("py.month")
one_year_later = F.col("cy.year") == F.col("py.year") + 1
revenue_delta = F.col("cy.revenue") - F.col("py.revenue")

yoy_comparison = (
    current_year
    .join(previous_year, on=same_month & one_year_later, how="left")
    .select(
        F.col("cy.year").alias("current_year"),
        F.col("cy.month").alias("month"),
        F.col("cy.month_str").alias("current_month"),
        F.col("cy.revenue").alias("current_revenue"),
        F.col("py.revenue").alias("previous_year_revenue"),
        revenue_delta.alias("revenue_difference"),
        F.round(revenue_delta / F.col("py.revenue") * 100, 2).alias("growth_percent"),
    )
    .orderBy("current_year", "month")
)

# The left join leaves the py.* values null when there is no row for the
# previous year; those nulls (and the values derived from them) become 0.
# NOTE(review): a 0 here is indistinguishable from a genuine zero
# difference / zero growth - confirm this is the intended representation.
yoy_comparison = yoy_comparison.na.fill(0.0)

yoy_comparison.write.jdbc(
    url=ch_url, table="mart_by_month_comparison", mode="overwrite", properties=ch_props
)

# Average order value per month: monthly revenue divided by the number of
# distinct sales_id values in that month.
avg_order_month = (
    sales_time
    .groupBy("month")
    .agg(
        F.sum("total_price").alias("total_revenue"),
        F.countDistinct("sales_id").alias("orders_cnt"),
    )
    .withColumn("avg_order_value", F.col("total_revenue") / F.col("orders_cnt"))
)
avg_order_month.write.jdbc(
    url=ch_url, table="mart_avg_order_month", mode="overwrite", properties=ch_props
)

# Витрина продаж по магазинам

In [7]:
# Sales rows joined to their store attributes.
sales_store = fact_sales.join(dim_store, on="store_id", how="inner")

# Top 5 stores by revenue.
top5_stores = (
    sales_store
    .groupBy("store_id", "name", "city", "country")
    .agg(F.sum("total_price").alias("revenue"))
    .sort(F.col("revenue").desc())
    .limit(5)
)
top5_stores.write.jdbc(
    url=ch_url, table="mart_top5_stores", mode="overwrite", properties=ch_props
)

# Revenue broken down by store city and country.
sales_by_city_country = (
    sales_store
    .groupBy("city", "country")
    .agg(F.sum("total_price").alias("revenue"))
)
sales_by_city_country.write.jdbc(
    url=ch_url, table="mart_sales_by_city_country", mode="overwrite", properties=ch_props
)

# Average check per store: store revenue divided by the number of distinct
# sales_id values for that store.
avg_check_store = (
    sales_store
    .groupBy("store_id", "name")
    .agg(
        F.sum("total_price").alias("total_revenue"),
        F.countDistinct("sales_id").alias("orders_cnt"),
    )
    .withColumn("avg_check", F.col("total_revenue") / F.col("orders_cnt"))
)
avg_check_store.write.jdbc(
    url=ch_url, table="mart_avg_check_store", mode="overwrite", properties=ch_props
)

# Витрина продаж по поставщикам

In [8]:
# Sales rows joined to their supplier attributes.
sales_supplier = fact_sales.join(dim_supplier, on="supplier_id", how="inner")

# Top 5 suppliers by revenue.
top5_suppliers = (
    sales_supplier
    .groupBy("supplier_id", "name", "country")
    .agg(F.sum("total_price").alias("revenue"))
    .sort(F.col("revenue").desc())
    .limit(5)
)
top5_suppliers.write.jdbc(
    url=ch_url, table="mart_top5_suppliers", mode="overwrite", properties=ch_props
)

# Average unit price per supplier: the per-sale unit price
# (total_price / quantity) averaged over that supplier's sales rows.
unit_price = F.col("total_price") / F.col("quantity")
avg_price_supplier = (
    sales_supplier
    .groupBy("supplier_id", "name")
    .agg(F.avg(unit_price).alias("avg_unit_price"))
)
avg_price_supplier.write.jdbc(
    url=ch_url, table="mart_avg_price_supplier", mode="overwrite", properties=ch_props
)

# Revenue broken down by supplier country.
sales_by_supplier_country = (
    sales_supplier
    .groupBy("country")
    .agg(F.sum("total_price").alias("revenue"))
)
sales_by_supplier_country.write.jdbc(
    url=ch_url, table="mart_sales_by_supplier_country", mode="overwrite", properties=ch_props
)

# Витрина качества продукции

In [9]:
from pyspark.sql.functions import col

In [10]:
# Sales rows joined to their product attributes.
sales_prod = fact_sales.join(dim_product, "product_id")

# Highest-rated products (top 10).
# Spark's descending order already places NULLs last; desc_nulls_last() just
# makes that explicit so it mirrors the lowest-rated query below.
product_ratings_highest = (
    dim_product
    .orderBy(F.col("rating").desc_nulls_last())
    .limit(10)
    .select("product_id", "name", "rating", "reviews")
)

product_ratings_highest.write.mode("overwrite").jdbc(url=ch_url, table="mart_product_ratings_highest", properties=ch_props)

# Lowest-rated products (bottom 10).
# A plain ascending sort puts NULLs first in Spark, so products without a
# rating would crowd out the genuinely lowest-rated ones. asc_nulls_last()
# ranks rated products first.
product_ratings_lowest = (
    dim_product
    .orderBy(F.col("rating").asc_nulls_last())
    .limit(10)
    .select("product_id", "name", "rating", "reviews")
)

product_ratings_lowest.write.mode("overwrite").jdbc(url=ch_url, table="mart_product_ratings_lowest", properties=ch_props)

# Input for a rating vs. sales-volume correlation: one row per product with
# its rating and total units sold. The correlation itself is not computed here.
rating_sales_corr = (sales_prod
    .groupBy("product_id", "name", "rating")
    .agg(F.sum("quantity").alias("total_qty"))
)
rating_sales_corr.write.mode("overwrite").jdbc(url=ch_url, table="mart_rating_sales_corr", properties=ch_props)

# Products with the most reviews (top 100).
most_reviews = (dim_product
    .orderBy(F.desc("reviews"))
    .limit(100)
    .select("product_id", "name", "reviews", "rating")
)
most_reviews.write.mode("overwrite").jdbc(url=ch_url, table="mart_most_reviews", properties=ch_props)

In [11]:
# Stop the Spark session created at the top of the notebook.
spark.stop()