In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, avg, count, desc, month, year

# Get or create the Spark session for this notebook. The PostgreSQL and
# ClickHouse JDBC drivers are requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Sales report to ClickHouse")
    .config(
        "spark.jars.packages",
        "org.postgresql:postgresql:42.2.27,com.clickhouse:clickhouse-jdbc:0.4.6",
    )
    .getOrCreate()
)



# Connection settings for the source (PostgreSQL) and target (ClickHouse)
# databases. Every URL and credential can be overridden through an environment
# variable so that secrets do not have to live in the notebook; the literals
# below are only fallbacks that keep the existing local setup working.
pg_url = os.environ.get("PG_URL", "jdbc:postgresql://postgres:5432/lab2BDA")
pg_properties = {
    "user": os.environ.get("PG_USER", "zloyaloha"),
    "password": os.environ.get("PG_PASSWORD", "12341234"),
    "driver": "org.postgresql.Driver",
}

ch_url = os.environ.get("CH_URL", "jdbc:clickhouse://clickhouse-server:8123/default")
ch_properties = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": os.environ.get("CH_USER", "zloyaloha"),
    "password": os.environ.get("CH_PASSWORD", "12341234"),
}

In [None]:
def _read_pg_table(table_name):
    """Return the PostgreSQL table `table_name` as a Spark DataFrame (via JDBC)."""
    return spark.read.jdbc(pg_url, table_name, properties=pg_properties)


# Load the six source tables used by the reports below.
sales = _read_pg_table("f_sale")
products = _read_pg_table("d_product")
customers = _read_pg_table("d_customer")
stores = _read_pg_table("d_store")
suppliers = _read_pg_table("d_supplier")
ratings = _read_pg_table("d_product_rating")

In [None]:
# Ten best-selling products: units sold and revenue per (name, brand, category),
# ranked by units sold.
product_dims = products.select("product_id", "name", "brand", "category")
sales_per_product = (
    sales.join(product_dims, on="product_id", how="inner")
    .groupBy("name", "brand", "category")
    .agg(
        spark_sum("quantity").alias("total_quantity"),
        spark_sum("total_price").alias("total_revenue"),
    )
)
top10_products = sales_per_product.sort(col("total_quantity").desc()).limit(10)

top10_products.show()

In [None]:
# Total revenue for each product category.
category_lookup = products.select("product_id", "category")
revenue_by_category = (
    sales
    .join(category_lookup, on="product_id", how="inner")
    .groupBy("category")
    .agg(spark_sum("total_price").alias("total_revenue"))
)

revenue_by_category.show()

In [None]:
# Mean rating and number of non-null `reviews` values per product
# (name, category, brand).
# NOTE(review): count("reviews") counts rows; if the `reviews` column already
# stores a per-product review total, a sum may be what is intended — confirm
# against the d_product_rating schema.
product_attrs = products.select("product_id", "name", "category", "brand")
ratings_with_product = ratings.join(product_attrs, on="product_id", how="inner")

avg_rating_reviews = (
    ratings_with_product
    .groupBy("name", "category", "brand")
    .agg(
        avg("rating").alias("avg_rating"),
        count("reviews").alias("review_count"),
    )
)

avg_rating_reviews.show()

In [None]:
# Append the product reports to their ClickHouse tables.
for report_table, report_df in (
    ("product_top10", top10_products),
    ("revenue_by_category", revenue_by_category),
    ("avg_rating_reviews", avg_rating_reviews),
):
    report_df.write.jdbc(ch_url, report_table, mode="append", properties=ch_properties)

In [None]:
# Attach customer attributes to every sale.
sales_with_customers = sales.join(customers, on="customer_id")

# Top 10 customers by total purchase amount.
# Group on customer_id as well as the name so that two different customers who
# happen to share a first/last name are not merged into a single row. The id is
# projected away again by the final select, so the output columns are unchanged.
top10_customers = (
    sales_with_customers
    .groupBy("customer_id", "first_name", "last_name")
    .agg(spark_sum("total_price").alias("total_spent"))
    .orderBy(col("total_spent").desc())
    .limit(10)
    .select("first_name", "last_name", "total_spent")
)

top10_customers.show()


In [None]:
# Number of customers (non-null customer_id values) per country, largest first.
customers_per_country = customers.groupBy("country").agg(
    count("customer_id").alias("customer_count")
)
country_distribution = customers_per_country.sort(desc("customer_count"))

country_distribution.show()

In [None]:
# Average receipt per customer (first name, last name, email):
# total amount spent divided by the number of sales.
customer_totals = sales_with_customers.groupBy("first_name", "last_name", "email").agg(
    spark_sum("total_price").alias("total_spent"),
    count("sale_id").alias("purchase_count"),
)
avg_receipt = customer_totals.withColumn(
    "avg_receipt",
    col("total_spent") / col("purchase_count"),
)
avg_receipt.show()

In [None]:
# Append the customer reports to their ClickHouse tables.
for report_table, report_df in (
    ("top10_customers", top10_customers),
    ("country_distribution", country_distribution),
    ("avg_receipt", avg_receipt),
):
    report_df.write.jdbc(ch_url, report_table, mode="append", properties=ch_properties)

In [None]:
from pyspark.sql.functions import year, month, to_date

# Convert sale_date to a date and derive calendar `year` / `month` columns.
# `sales` is rebound here: the yearly/monthly reports that follow read the
# added columns from it.
sales = (
    sales
    .withColumn("sale_date", to_date("sale_date"))
    .withColumn("year", year("sale_date"))
    .withColumn("month", month("sale_date"))
)

# Revenue per calendar month, in chronological order.
revenue_per_month = sales.groupBy("year", "month").agg(
    spark_sum("total_price").alias("total_revenue")
)
monthly_sales = revenue_per_month.sort("year", "month")

monthly_sales.show()

In [None]:
# Revenue per calendar year, in chronological order.
revenue_per_year = sales.groupBy("year").agg(
    spark_sum("total_price").alias("total_revenue")
)
yearly_sales = revenue_per_year.sort("year")

yearly_sales.show()

In [None]:
# Per calendar month: number of sale rows, total revenue and mean sale value.
monthly_order_stats = sales.groupBy("year", "month").agg(
    count("*").alias("order_count"),
    spark_sum("total_price").alias("total_revenue"),
    avg("total_price").alias("avg_order_value"),
)
monthly_avg_order = monthly_order_stats.sort("year", "month")

monthly_avg_order.show()

In [None]:
# Append the time-based reports to their ClickHouse tables.
for report_table, report_df in (
    ("monthly_sales", monthly_sales),
    ("yearly_sales", yearly_sales),
    ("monthly_avg_order", monthly_avg_order),
):
    report_df.write.jdbc(ch_url, report_table, mode="append", properties=ch_properties)

In [None]:
# Attach store attributes to every sale (inner join on store_id).
same_store = sales["store_id"] == stores["store_id"]
sales_with_store = sales.join(stores, on=same_store, how="inner")

# Five stores (name, location, city) with the highest total revenue.
revenue_per_store = sales_with_store.groupBy("name", "location", "city").agg(
    spark_sum("total_price").alias("total_revenue")
)
top5_stores = revenue_per_store.sort(col("total_revenue").desc()).limit(5)

top5_stores.show()

In [None]:
# Revenue and number of sales (non-null sale_id values) per (city, country),
# highest revenue first.
location_totals = sales_with_store.groupBy("city", "country").agg(
    spark_sum("total_price").alias("total_revenue"),
    count("sale_id").alias("total_sales"),
)
sales_by_location = location_totals.sort(col("total_revenue").desc())

sales_by_location.show()

In [None]:
# Mean sale value per store (name, location, city), highest first.
store_avg_price = sales_with_store.groupBy("name", "location", "city").agg(
    avg("total_price").alias("avg_receipt")
)
avg_check_per_store = store_avg_price.sort(col("avg_receipt").desc())

avg_check_per_store.show()

In [None]:
# Append the store reports to their ClickHouse tables.
for report_table, report_df in (
    ("top5_stores", top5_stores),
    ("sales_by_location", sales_by_location),
    ("avg_check_per_store", avg_check_per_store),
):
    report_df.write.jdbc(ch_url, report_table, mode="append", properties=ch_properties)

In [None]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Attach supplier attributes to every sale. The left join keeps sales whose
# supplier_id has no match in `suppliers`.
supplier_fields = ["supplier_id", "name", "contact", "email", "country"]
supplier_sales = sales.join(
    suppliers.select(*supplier_fields),
    on="supplier_id",
    how="left",
)

# Per supplier: total revenue and the mean of the per-sale unit price
# (total_price / quantity).
unit_price = col("total_price") / col("quantity")
supplier_agg = (
    supplier_sales
    .groupBy(*supplier_fields)
    .agg(
        spark_sum("total_price").alias("total_revenue"),
        avg(unit_price).alias("avg_unit_price"),
    )
)

supplier_agg.show()

In [None]:
# Five suppliers with the highest total revenue.
# A plain sort + limit replaces the previous row_number() window, which had no
# partitionBy and therefore moved every row into a single partition just to
# rank them. This also matches the top-N pattern used by the other reports
# (orderBy(...).limit(n)). The output columns are unchanged.
top5_suppliers = (
    supplier_agg
    .orderBy(col("total_revenue").desc())
    .limit(5)
    .select("supplier_id", "name", "contact", "email", "total_revenue")
)
top5_suppliers.show()

In [None]:
# Total supplier revenue rolled up by supplier country.
sales_by_country = (
    supplier_agg
    .groupBy("country")
    .agg(spark_sum("total_revenue").alias("total_revenue_by_country"))
)

sales_by_country.show()

In [None]:
# Append the supplier reports to their ClickHouse tables.
for report_table, report_df in (
    ("avg_unit_price_per_supplier", supplier_agg),
    ("top5_suppliers", top5_suppliers),
    ("sales_by_country", sales_by_country),
):
    report_df.write.jdbc(ch_url, report_table, mode="append", properties=ch_properties)

In [None]:
# Product attributes shared by both enrichments below.
product_info = products.select("product_id", "name", "brand", "category")

# Left joins keep every rating / sale row even when its product_id has no
# match in `products`.
reviews_enriched = ratings.join(product_info, on="product_id", how="left")
sales_enriched = sales.join(product_info, on="product_id", how="left")

In [None]:
# Per-product review metrics: mean rating and number of rating rows.
review_metrics = reviews_enriched.groupBy("name", "brand", "category").agg(
    avg("rating").alias("avg_rating"),
    count("*").alias("review_count")
)

# Per-product sales metric: total units sold.
sales_metrics = sales_enriched.groupBy("name", "brand", "category").agg(
    spark_sum("quantity").alias("total_quantity_sold")
)

# The outer join keeps products that appear on only one side, so avg_rating /
# review_count or total_quantity_sold can be NULL in the result.
product_quality = review_metrics.join(
    sales_metrics,
    on=["name", "brand", "category"],
    how="outer"
)

# Ascending sorts place NULLs first by default, which would fill the
# "lowest rated" list with products that have no rating at all. Order NULLs
# last explicitly in both directions so only rated products can be selected
# ahead of unrated ones.
top_rated = product_quality.orderBy(col("avg_rating").desc_nulls_last()).limit(5)
lowest_rated = product_quality.orderBy(col("avg_rating").asc_nulls_last()).limit(5)

top_rated.show()
lowest_rated.show()

In [None]:
from pyspark.sql import Row

# Pearson correlation between average rating and units sold.
# product_quality comes from an outer join, so either metric can be NULL for a
# product; restrict the computation to products that have both values so that
# missing data cannot influence the coefficient.
rated_and_sold = product_quality.dropna(subset=["avg_rating", "total_quantity_sold"])
correlation = rated_and_sold.stat.corr("avg_rating", "total_quantity_sold")

# Wrap the scalar in a one-row DataFrame so it can be written out like the
# other reports.
corr_df = spark.createDataFrame([Row(correlation=correlation)])
corr_df.show()

In [None]:
# Five products with the most and with the fewest reviews. For the ascending
# list NULL review counts are sorted last.
review_count_col = col("review_count")
most_reviewed = product_quality.sort(review_count_col.desc()).limit(5)
least_reviewed = product_quality.sort(review_count_col.asc_nulls_last()).limit(5)

most_reviewed.show()
least_reviewed.show()

In [None]:
# Append the product-quality reports to their ClickHouse tables.
for report_table, report_df in (
    ("most_reviewed", most_reviewed),
    ("least_reviewed", least_reviewed),
    ("correlation", corr_df),
    ("lowest_rated", lowest_rated),
    ("top_rated", top_rated),
):
    report_df.write.jdbc(ch_url, report_table, mode="append", properties=ch_properties)