In [24]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL_Pipeline") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.27") \
    .getOrCreate()

jdbc_url = "jdbc:postgresql://host.docker.internal:5432/postgres"
properties = {
    "user": "postgres",
    "password": "Rbkkth3920",
    "driver": "org.postgresql.Driver"
}

df = spark.read.jdbc(url=jdbc_url, table="mock_data", properties=properties)
df.show(4)

+-------------------+------------------+------------+--------------------+------------------+--------------------+-----------------+-----------------+------------------+-----------------+----------------+--------------------+--------------+------------------+------------+----------------+-------------+----------------+---------+-------------+----------------+----------+--------------+-------------------+-----------+-------------+------------+--------------------+------------+--------------+-------------+------------+-------------+----------------+--------------------+--------------+---------------+--------------------+-------------------+-------------+------------------+--------------------+--------------+----------------+-------------+----------------+----+----------------+--------------+---------------+
|customer_first_name|customer_last_name|customer_age|      customer_email|  customer_country|customer_postal_code|customer_pet_type|customer_pet_name|customer_pet_breed|seller_first_n

In [25]:
dim_customer = df.select(
    df.sale_customer_id.alias("customer_id"),
    df.customer_first_name.alias("first_name"),
    df.customer_last_name.alias("last_name"),
    df.customer_age.alias("age"),
    df.customer_email.alias("email"),
    df.customer_country.alias("country_name"),
    df.customer_postal_code.alias("postal_code")
).dropDuplicates(["customer_id"])

dim_customer.show(10)


+-----------+----------+---------+---+--------------------+------------+-----------+
|customer_id|first_name|last_name|age|               email|country_name|postal_code|
+-----------+----------+---------+---+--------------------+------------+-----------+
|          1|    Ardith|    Haack| 69|fjagglikl@trellia...|   Indonesia|           |
|          2|     Julie|    Leete| 45|cfeehan9i@instagr...|       China|           |
|          3|    Selene|  Le Page| 83|msheplandcu@yello...|       Egypt|           |
|          4|     Ibbie|  Hannent| 53|pclowneydd@netvib...|       China|           |
|          5|  Tarrance|   Panons| 44|kweldonr0@creativ...|      Poland|     74-120|
|          6|    Shanda|  Monkley| 47|mstickensju@delic...|     Comoros|           |
|          7|   Myrlene|  Fillery| 76|   abartlema@pbs.org|        Peru|           |
|          8|     Costa|    Happs| 42| ppalkh0@storify.com|       China|           |
|          9|       Dev|   McLese| 71| jlutzo0@4shared.com| Nethe

In [26]:
dim_seller = df.select(
    df.sale_seller_id.alias("seller_id"),  
    df.seller_first_name.alias("first_name"),
    df.seller_last_name.alias("last_name"),
    df.seller_email.alias("email"),
    df.seller_country.alias("country_name"),
    df.seller_postal_code.alias("postal_code")
).distinct()

dim_seller.show(10)


+---------+----------+---------+--------------------+------------+-----------+
|seller_id|first_name|last_name|               email|country_name|postal_code|
+---------+----------+---------+--------------------+------------+-----------+
|      960|   Kirsten|   Grouse|    kgrouse9@tiny.cc|       Japan|   990-0711|
|     6753|    Tanney|   Robker|trobkerkd@columbi...|    Honduras|           |
|     1504|      Erin| Bradwell|ebradwelldn@fastc...|   Indonesia|           |
|      458|     Pavia|Guerreiro|pguerreirocm@scri...|    Portugal|   4830-433|
|      556|   Steward|  Wayvill| swayvillfb@time.com|      Russia|     618593|
|      603|   Chloris|  Ostrich|costrichgn@devhub...|   Argentina|       5442|
|      707|       Lee|     Shah|lshahji@sciencedi...| Philippines|       9611|
|     1211|     Naomi|  Postill| npostill5i@ucoz.com|       China|           |
|     1460|      Tris|    Czaja|tczajacg@comsenz.com|     Belarus|           |
|     1467|    Morgen|  Himpson| mhimpsoncn@narod.ru

In [27]:
dim_product = df.select(
    df.sale_product_id.alias("product_id"),
    df.product_name.alias("name"),
    df.product_category.alias("category_name"),
    df.product_price.alias("price"),
    df.product_quantity.alias("quantity"),
    df.product_weight.alias("weight"),
    df.product_brand.alias("brand_name"),
    df.product_description.alias("description"),
    df.product_rating.alias("rating"),
    df.product_reviews.alias("reviews"),
    df.product_release_date.alias("release_date"),
    df.product_expiry_date.alias("expiry_date")
).distinct()

dim_product.show(10)


+----------+---------+-------------+-----+--------+------+----------+--------------------+------+-------+------------+-----------+
|product_id|     name|category_name|price|quantity|weight|brand_name|         description|rating|reviews|release_date|expiry_date|
+----------+---------+-------------+-----+--------+------+----------+--------------------+------+-------+------------+-----------+
|        27|Bird Cage|         Food|32.07|       1|  34.7|     Quatz|Nullam sit amet t...|   4.1|    924|  12/24/2017|  7/11/2030|
|       361|Bird Cage|          Toy|47.73|      44|  17.8|     Skiba|Sed sagittis. Nam...|   4.6|    874|    6/5/2022|   7/9/2030|
|       495|  Cat Toy|         Food| 3.61|      37|  32.0|Shuffletag|Fusce consequat. ...|   1.5|    855|    7/5/2011|  4/30/2025|
|       953| Dog Food|         Food|28.08|      69|  34.8|   Meembee|Proin leo odio, p...|   2.6|    798|    1/2/2016| 10/10/2026|
|      1174|Bird Cage|         Cage|34.86|      69|  32.7|      Kazu|Praesent bland

In [28]:
from pyspark.sql import functions as F

dim_supplier = df.select(
    df.supplier_name.alias("name"),
    df.supplier_contact.alias("contact"),
    df.supplier_email.alias("email"),
    df.supplier_phone.alias("phone"),
    df.supplier_address.alias("address"),
    df.supplier_city.alias("city"),
    df.supplier_country.alias("country_name")
).distinct().withColumn("supplier_id", F.monotonically_increasing_id())

dim_supplier.show(10)


+----------+--------------------+--------------------+------------+------------+---------------+--------------+-----------+
|      name|             contact|               email|       phone|     address|           city|  country_name|supplier_id|
+----------+--------------------+--------------------+------------+------------+---------------+--------------+-----------+
|  Feedspan|Massimiliano Burt...|mburtwhistle5m@ho...|814-659-8974|    Room 376|       Tønsberg|      Honduras|          0|
|  Edgeblab|     Binni Gristwood|bgristwoodmy@hous...|855-220-8231|PO Box 95444|        Chonchi|Czech Republic|          1|
|Divanoodle|    Rustin Wetherald|rwetherald5k@wire...|705-797-7921|    Suite 22|       Mandiana| United States|          2|
|    Wikivu|      Franni MacAree|fmacareec7@homest...|784-851-8096|PO Box 49983|        Hongmei|         China|          3|
|      Jayo|    Vannie Skitterel|vskitterelmz@spri...|460-844-3280|   Room 1988|      Três Rios|         China|          4|
|   Oyon

In [29]:
dim_supplier.select("*").show(5, truncate=False)

+----------+------------------------+----------------------------+------------+------------+---------+--------------+-----------+
|name      |contact                 |email                       |phone       |address     |city     |country_name  |supplier_id|
+----------+------------------------+----------------------------+------------+------------+---------+--------------+-----------+
|Feedspan  |Massimiliano Burtwhistle|mburtwhistle5m@homestead.com|814-659-8974|Room 376    |Tønsberg |Honduras      |0          |
|Edgeblab  |Binni Gristwood         |bgristwoodmy@house.gov      |855-220-8231|PO Box 95444|Chonchi  |Czech Republic|1          |
|Divanoodle|Rustin Wetherald        |rwetherald5k@wired.com      |705-797-7921|Suite 22    |Mandiana |United States |2          |
|Wikivu    |Franni MacAree          |fmacareec7@homestead.com    |784-851-8096|PO Box 49983|Hongmei  |China         |3          |
|Jayo      |Vannie Skitterel        |vskitterelmz@springer.com   |460-844-3280|Room 1988  

In [30]:
from pyspark.sql import functions as F

dim_store = df.select(
    F.col("store_name").alias("store_name"),
    F.col("store_location").alias("location"),
    F.col("store_city").alias("city"),
    F.col("store_state").alias("state"),
    F.col("store_country").alias("country_name"),
    F.col("store_phone").alias("phone"),
    F.col("store_email").alias("email")
).distinct().withColumn("store_id", F.monotonically_increasing_id())


dim_store.show(10)


+------------+------------+---------------+-----+--------------------+------------+--------------------+--------+
|  store_name|    location|           city|state|        country_name|       phone|               email|store_id|
+------------+------------+---------------+-----+--------------------+------------+--------------------+--------+
|    Topdrive|  20th Floor|    Loma Bonita|  COA|             Myanmar|726-607-2362|mletessier9h@sees...|       0|
|    Edgeblab|  20th Floor|     Haz-Zebbug|     |            Ethiopia|872-702-2152|  cdeh7@virginia.edu|       1|
|  Flashpoint|    Suite 58|         Błędów|     |               China|955-466-2242|dkilleenj4@mozill...|       2|
|        Katz|    Suite 96|        Dongjia|     |      Czech Republic|160-526-6710| klauganu@oakley.com|       3|
|      Roodel|    Apt 1379|         Potoru|     |              Canada|291-585-2726|chewkinqn@indiati...|       4|
|Thoughtworks|    Suite 91|Lagny-sur-Marne|   A8|              Poland|988-582-6510| lswa

In [31]:
dim_customer_pet = df.select(
    df.sale_customer_id.alias("customer_id"),
    df.customer_pet_name.alias("pet_name"),
    df.customer_pet_type.alias("pet_type_name"),
    df.customer_pet_breed.alias("pet_breed_name"),
    df.pet_category.alias("pet_category_name")
).distinct()

dim_customer_pet.show(10)


+-----------+--------+-------------+------------------+-----------------+
|customer_id|pet_name|pet_type_name|    pet_breed_name|pet_category_name|
+-----------+--------+-------------+------------------+-----------------+
|        135|Henrieta|          dog|          Parakeet|             Fish|
|        219|  Alysia|         bird|           Siamese|             Cats|
|       1349|   Grace|         bird|           Siamese|             Dogs|
|       1808|Ludovika|          cat|          Parakeet|             Fish|
|       1814|  Nolana|         bird|           Siamese|             Fish|
|       2147|    Ulla|         bird|Labrador Retriever|             Fish|
|       2191| Maureen|          cat|          Parakeet|             Fish|
|       2274|   Meggi|          cat|Labrador Retriever|         Reptiles|
|       2797|  Enrica|         bird|           Siamese|         Reptiles|
|       2941|   Jason|          cat|           Siamese|            Birds|
+-----------+--------+-------------+--

In [32]:
from pyspark.sql import functions as F

fact_sales = df.select(
    df.sale_customer_id.alias("customer_id"),
    df.sale_seller_id.alias("seller_id"),
    df.sale_product_id.alias("product_id"),
    df.id.alias("supplier_id"),       
    df.store_name.alias("store_name"), 
    df.sale_date.alias("sale_date"),
    df.sale_quantity.alias("quantity"),
    df.sale_total_price.alias("total_price")
).distinct().withColumn("sale_id", F.monotonically_increasing_id())

fact_sales.show(10)


+-----------+---------+----------+-----------+----------+----------+--------+-----------+-------+
|customer_id|seller_id|product_id|supplier_id|store_name| sale_date|quantity|total_price|sale_id|
+-----------+---------+----------+-----------+----------+----------+--------+-----------+-------+
|        468|      454|       496|        454|    Wikido| 11/1/2021|       2|     344.19|      0|
|        442|      428|       470|        429|  Kanoodle| 1/13/2021|       6|     327.28|      1|
|       1664|     1658|      1698|       1651|   Pixoboo| 12/3/2021|       6|      39.17|      2|
|       2008|     9846|      2038|       1994|     Oyoyo| 10/6/2021|       2|     453.37|      3|
|       2103|     2099|      2137|       2090|   Youopia| 6/19/2021|       3|     472.31|      4|
|       2833|     2829|      2867|       2818|     Voomm|12/10/2021|       3|     346.03|      5|
|       2982|     2978|      3016|       2968|     Npath|  8/4/2021|       4|     188.28|      6|
|       3001|     29

In [33]:
from pyspark.sql import functions as F

dim_supplier_temp = dim_supplier
df_raw = spark.read.jdbc(url=jdbc_url, table="mock_data", properties=properties)

dim_store = df_raw.select("store_name").distinct() \
    .withColumn("store_id", F.monotonically_increasing_id())

dim_store.show()

dim_supplier_temp = df_raw.select("supplier_name", "supplier_email").distinct() \
    .withColumn("supplier_id", F.monotonically_increasing_id())

fact_sales = df_raw.join(
    dim_supplier_temp,
    (df_raw.supplier_name == dim_supplier_temp.supplier_name) & (df_raw.supplier_email == dim_supplier_temp.supplier_email),
    "left"
).join(
    dim_store,
    df_raw.store_name == dim_store.store_name,
    "left"
).select(
    df_raw.sale_customer_id.alias("customer_id"),
    df_raw.sale_seller_id.alias("seller_id"),
    df_raw.sale_product_id.alias("product_id"),
    dim_supplier_temp.supplier_id,
    dim_store.store_id,
    df_raw.sale_date.alias("sale_date"),
    df_raw.sale_quantity.alias("quantity"),
    df_raw.sale_total_price.alias("total_price")
).distinct() \
 .withColumn("sale_id", F.monotonically_increasing_id())

fact_sales.show(10)


+------------+--------+
|  store_name|store_id|
+------------+--------+
|     Jetwire|       0|
|    Jaxworks|       1|
|   Reallinks|       2|
|    Snaptags|       3|
| Brainlounge|       4|
|    Feedfish|       5|
|    Skipfire|       6|
|       Kamba|       7|
|      Quimba|       8|
|       Quaxo|       9|
|      Oyondu|      10|
|    Realfire|      11|
|Thoughtworks|      12|
|  Browsezoom|      13|
|      BlogXS|      14|
|     Voonder|      15|
|   Babbleset|      16|
|       Yabox|      17|
|       Einti|      18|
|   Photofeed|      19|
+------------+--------+
only showing top 20 rows

+-----------+---------+----------+-----------+--------+----------+--------+-----------+-------+
|customer_id|seller_id|product_id|supplier_id|store_id| sale_date|quantity|total_price|sale_id|
+-----------+---------+----------+-----------+--------+----------+--------+-----------+-------+
|       8742|     8763|      8784|        336|     374|12/28/2021|       2|     119.77|      0|
|       5759| 

In [91]:
jdbc_url = "jdbc:postgresql://host.docker.internal:5432/postgres"
properties = {
    "user": "postgres",
    "password": "Rbkkth3920",
    "driver": "org.postgresql.Driver"
}
dim_customer.write.jdbc(
    url=jdbc_url,
    table="dim_customer",
    mode="append",
    properties=properties
)

dim_customer_pet.write.jdbc(
    url=jdbc_url,
    table="dim_customer_pet",
    mode="append",
    properties=properties
)

dim_seller.write.jdbc(
    url=jdbc_url,
    table="dim_seller",
    mode="append",
    properties=properties
)

dim_product.write.jdbc(
    url=jdbc_url,
    table="dim_product",
    mode="append",
    properties=properties
)

dim_supplier = dim_supplier.withColumnRenamed("supplier_name", "name")
dim_supplier = dim_supplier.withColumnRenamed("supplier_email", "email")

dim_supplier.write.jdbc(
    url=jdbc_url,
    table="dim_supplier",
    mode="append",
    properties=properties
)

dim_store = dim_store.withColumnRenamed("store_name", "name")

dim_store.write.jdbc(
    url=jdbc_url,
    table="dim_store",
    mode="append",
    properties=properties
)

fact_sales.write.jdbc(
    url=jdbc_url,
    table="fact_sales",
    mode="append",
    properties=properties
)

# ClickHouse

In [34]:
clickhouse_url = "jdbc:clickhouse://clickhouse:8123/clickhouse"

clickhouse_props = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": "clickhouse",
    "password": "Rbkkth3920",
}

In [35]:
!curl http://clickhouse:8123

Ok.


# 1. Витрина продаж по продуктам
## Цель: Анализ выручки, количества продаж и популярности продуктов.

# Топ-10 самых продаваемых продуктов.

In [36]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

top_10_products = fact_sales.groupBy("product_id") \
    .agg(
        F.sum("quantity").alias("total_quantity"),
        F.sum("total_price").alias("total_revenue")
    )

top_10_products = top_10_products.join(
    dim_product.select(
        "product_id",
        "name",           
        "category_name",  
        "price",          
        "quantity",      
        "weight",       
        "brand_name",     
        "description",    
        "rating",       
        "reviews",       
        "release_date",   
        "expiry_date"     
    ),
    on="product_id",
    how="left"
)

top_10_products = top_10_products.orderBy(
    F.desc("total_quantity"),
    F.desc("total_revenue")
).limit(10)

window_spec = Window.orderBy(
    F.desc("total_quantity"),
    F.desc("total_revenue")
)
top_10_products = top_10_products.withColumn(
    "rank",
    F.row_number().over(window_spec)
)

top_10_products.show(10)


+----------+--------------+------------------+---------+-------------+-----+--------+------+------------+--------------------+------+-------+------------+-----------+----+
|product_id|total_quantity|     total_revenue|     name|category_name|price|quantity|weight|  brand_name|         description|rating|reviews|release_date|expiry_date|rank|
+----------+--------------+------------------+---------+-------------+-----+--------+------+------------+--------------------+------+-------+------------+-----------+----+
|      8668|            10| 499.7300109863281|  Cat Toy|          Toy|58.24|      40|  24.3|       Lazzy|Vestibulum quam s...|   1.6|    290|    4/2/2017|  2/24/2030|   1|
|      3601|            10| 499.5899963378906| Dog Food|          Toy| 6.89|      87|  18.6|    Realblab|Praesent id massa...|   1.9|    769|   1/20/2012|  4/18/2028|   2|
|      1535|            10| 498.9599914550781|Bird Cage|          Toy|55.78|      38|  39.2|   Browsebug|Duis bibendum. Mo...|   3.3|    842

In [37]:
top_10_products.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (rank)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="top_10_products", properties=clickhouse_props)

# Общая выручка по категориям продуктов

In [38]:
from pyspark.sql import functions as F

total_revenue_by_category = fact_sales.join(
    dim_product.select("product_id", "category_name"),
    "product_id"
).groupBy("category_name").agg(
    F.sum("total_price").alias("total_revenue")
)

total_revenue_by_category.show(5)

+-------------+-----------------+
|category_name|    total_revenue|
+-------------+-----------------+
|         Cage|831117.9398345947|
|         Food|830632.5497875214|
|          Toy|868101.6302814484|
+-------------+-----------------+



In [39]:
clickhouse_url = "jdbc:clickhouse://clickhouse:8123/clickhouse"

clickhouse_props = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": "clickhouse",
    "password": "Rbkkth3920",
}

total_revenue_by_category.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (total_revenue)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="total_revenue_by_category", properties=clickhouse_props)

# Средний рейтинг и количество отзывов для каждого продукта

In [40]:
from pyspark.sql import functions as F

avg_product_rating = dim_product.groupBy('name').agg(
    F.avg('rating').alias('avg_rating'),
    F.sum('reviews').alias('total_reviews')
)

avg_product_rating.show(5)


+---------+------------------+-------------+
|     name|        avg_rating|total_reviews|
+---------+------------------+-------------+
|Bird Cage|3.0001491633975137|      1682260|
| Dog Food| 3.018298893783149|      1653413|
|  Cat Toy| 3.006860081398008|      1676222|
+---------+------------------+-------------+



In [41]:
avg_product_rating.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (name)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_product_rating", properties=clickhouse_props)

# 2. Витрина продаж по клиентам

## Цель: Анализ покупательского поведения и сегментация клиентов.

# Топ-10 клиентов с наибольшей общей суммой покупок

In [42]:
from pyspark.sql import functions as F

top_10_customers = fact_sales.join(dim_customer, "customer_id") \
    .groupBy("customer_id", "first_name", "last_name") \
    .agg(F.sum("total_price").alias("total_spent")) \
    .orderBy(F.desc("total_spent")) \
    .limit(10)

top_10_customers.show(5)

top_10_customers.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (total_spent)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="top_10_customers", properties=clickhouse_props)


+-----------+----------+---------+------------------+
|customer_id|first_name|last_name|       total_spent|
+-----------+----------+---------+------------------+
|       4206|       Gus|Hartshorn| 499.8500061035156|
|       6436|     Hayes|   McKain|499.79998779296875|
|       6371|       Ava|    Lomas|  499.760009765625|
|       5932|     Dawna|    Impey|  499.760009765625|
|       8628|   Lavinia|Horsburgh| 499.7300109863281|
+-----------+----------+---------+------------------+
only showing top 5 rows



# Распределение клиентов по странам

In [43]:
from pyspark.sql import functions as F

clients_by_country = dim_customer.groupBy("country_name") \
    .agg(F.count("customer_id").alias("num_customers"))

clients_by_country.show(5)

clients_by_country.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (country_name)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="clients_by_country", properties=clickhouse_props)


+-------------------+-------------+
|       country_name|num_customers|
+-------------------+-------------+
|               Chad|            5|
|             Russia|          628|
|           Paraguay|           18|
|              Yemen|           39|
|U.S. Virgin Islands|            1|
+-------------------+-------------+
only showing top 5 rows



# Средний чек для каждого клиента

In [44]:
from pyspark.sql import functions as F

avg_receipt_per_customer = fact_sales.join(dim_customer, "customer_id") \
    .groupBy("customer_id", "first_name", "last_name") \
    .agg(F.avg("total_price").alias("avg_receipt"))

avg_receipt_per_customer.show(5)

avg_receipt_per_customer.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (customer_id)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_receipt_per_customer", properties=clickhouse_props)


+-----------+----------+---------+------------------+
|customer_id|first_name|last_name|       avg_receipt|
+-----------+----------+---------+------------------+
|       5313|  Ethelred|   Briton| 380.8800048828125|
|       6865|     Ethan|Phillipps| 54.59000015258789|
|       3310|      Rene|   Harden| 340.4599914550781|
|       1733|    Belvia|     Earl|246.44000244140625|
|       5838|     Davey|     Jurs| 479.6300048828125|
+-----------+----------+---------+------------------+
only showing top 5 rows



# 3. Витрина продаж по времени

## Цель: Анализ сезонности и трендов продаж.

# Месячные и годовые тренды продаж

In [45]:
from pyspark.sql import functions as F

fact_sales = fact_sales.withColumn(
    "sale_date_parsed",
    F.to_date("sale_date", "M/d/yyyy")
)

time_trends = fact_sales.withColumn("year", F.year("sale_date_parsed")) \
    .withColumn("month", F.month("sale_date_parsed"))

time_trends = time_trends.filter(F.col("sale_date_parsed").isNotNull())

time_trends = time_trends.groupBy("year", "month") \
    .agg(
        F.sum("total_price").alias("monthly_revenue"),
        F.count("sale_id").alias("orders_count")
    )

from pyspark.sql.window import Window
window_spec = Window.partitionBy("year")
time_trends = time_trends.withColumn(
    "yearly_revenue",
    F.sum("monthly_revenue").over(window_spec)
)

time_trends.show(5)

time_trends.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (year, month)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="time_trends", properties=clickhouse_props)


+----+-----+------------------+------------+------------------+
|year|month|   monthly_revenue|orders_count|    yearly_revenue|
+----+-----+------------------+------------+------------------+
|2021|    8| 221275.7799243927|         897|2529852.1199035645|
|2021|    6| 215042.7997379303|         822|2529852.1199035645|
|2021|    5|211764.86022472382|         828|2529852.1199035645|
|2021|   10| 228743.3197774887|         892|2529852.1199035645|
|2021|   11|200154.69006824493|         801|2529852.1199035645|
+----+-----+------------------+------------+------------------+
only showing top 5 rows



In [46]:
from pyspark.sql import functions as F

cleaned_sales = fact_sales.withColumn(
    "sale_date_parsed",
    F.to_date(F.col("sale_date"), "M/d/yyyy")
).filter(F.col("sale_date_parsed").isNotNull())

yearly_revenue = cleaned_sales.withColumn("year", F.year("sale_date_parsed")) \
    .groupBy("year") \
    .agg(F.sum("total_price").alias("total_revenue"))

yearly_revenue.show(5)

yearly_revenue.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (year)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="yearly_revenue", properties=clickhouse_props)


+----+------------------+
|year|     total_revenue|
+----+------------------+
|2021|2529852.1199035645|
+----+------------------+



# Средний размер заказа по месяцам

In [47]:
from pyspark.sql import functions as F

fact_sales = fact_sales.withColumn(
    "sale_date_parsed",
    F.to_date("sale_date", "M/d/yyyy")
).filter(F.col("sale_date_parsed").isNotNull())

avg_order_monthly = fact_sales.withColumn("month", F.month("sale_date_parsed")) \
    .groupBy("month") \
    .agg(F.avg("quantity").alias("avg_quantity"))

avg_order_monthly.show(5)

avg_order_monthly.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (month)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_order_monthly", properties=clickhouse_props)


+-----+------------------+
|month|      avg_quantity|
+-----+------------------+
|   12|  5.62987012987013|
|    1| 5.556064073226545|
|    6| 5.399026763990268|
|    3|5.4104389086595495|
|    5| 5.375603864734299|
+-----+------------------+
only showing top 5 rows



# 4. Витрина продаж по магазинам
# Цель: Анализ эффективности магазинов.

# Топ-5 магазинов с наибольшей выручкой

In [48]:
from pyspark.sql import functions as F

dim_store_renamed = dim_store.withColumnRenamed("store_name", "name")

top_5_stores = (
    fact_sales
      .join(dim_store_renamed, "store_id")              
      .groupBy("store_id", "name")                     
      .agg(F.sum("total_price").alias("total_revenue"))  
      .orderBy(F.desc("total_revenue"))                
      .limit(5)                                         
)

top_5_stores.show(5)

top_5_stores.write \
    .mode("overwrite") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (total_revenue)") \
    .jdbc(
        url=clickhouse_url,
        table="top_5_stores",
        properties=clickhouse_props
    )


+--------+--------+------------------+
|store_id|    name|     total_revenue|
+--------+--------+------------------+
|     200|   Mynte|15751.709976196289|
|     176|   Quatz|15176.640044212341|
|     217|    Jayo|13976.010055541992|
|     212|   Quinu|13952.879922866821|
|     340|Realcube|13700.770052909851|
+--------+--------+------------------+



# Распределение продаж по городам и странам

In [49]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

dim_store_loc = df.select(
    F.monotonically_increasing_id().alias("store_id"),
    F.col("store_city").alias("city"),
    F.col("store_country").alias("country_name")
).distinct()

sales_by_location = (
    fact_sales
    .join(dim_store_loc, "store_id")
    .groupBy("city", "country_name")
    .agg(F.sum("total_price").alias("city_revenue"))
    .withColumn("country_revenue", F.sum("city_revenue").over(Window.partitionBy("country_name")))
)

sales_by_location.show(5)

sales_by_location.write \
    .mode("overwrite") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (country_name, city)") \
    .jdbc(url=clickhouse_url, table="sales_by_location", properties=clickhouse_props)


+-------------+------------+------------------+------------------+
|         city|country_name|      city_revenue|   country_revenue|
+-------------+------------+------------------+------------------+
|Sumberngerjat| Afghanistan|7717.1599197387695|7717.1599197387695|
|      Słubice|     Albania| 6181.639970779419| 6181.639970779419|
|     Västerås|   Argentina| 5358.200012207031|32627.120027542114|
|      Xilanqi|   Argentina| 5585.280012130737|32627.120027542114|
|    Sacapulas|   Argentina| 6575.219997406006|32627.120027542114|
+-------------+------------+------------------+------------------+
only showing top 5 rows



# Средний чек для каждого магазина

In [50]:
from pyspark.sql import functions as F

avg_receipt_per_store = (
    fact_sales
    .join(dim_store.select("store_id", "store_name"), "store_id")
    .groupBy("store_id", "store_name")
    .agg(F.avg("total_price").alias("avg_receipt"))
)

avg_receipt_per_store.show(5)

avg_receipt_per_store.write \
    .mode("overwrite") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (store_id)") \
    .jdbc(url=clickhouse_url, table="avg_receipt_per_store", properties=clickhouse_props)


+--------+----------+------------------+
|store_id|store_name|       avg_receipt|
+--------+----------+------------------+
|      27|    JumpXS|257.14406329393387|
|     108|  Photobug|235.87957483656862|
|     226|    Voolia|256.01947362799393|
|     267|    Rhynyx| 275.6128542763846|
|     355|     Aimbo| 264.4278566496713|
+--------+----------+------------------+
only showing top 5 rows



# 5. Витрина продаж по поставщикам
## Цель: Анализ эффективности поставщиков.

# Топ-5 поставщиков с наибольшей выручкой

In [52]:
from pyspark.sql import functions as F

top_5_suppliers = (
    fact_sales
    .join(dim_supplier.select("supplier_id", "name"), "supplier_id")
    .groupBy("supplier_id", "name")
    .agg(F.sum("total_price").alias("total_revenue"))
    .orderBy(F.desc("total_revenue"))
    .limit(5)
)

top_5_suppliers.show(5)

top_5_suppliers.write \
    .mode("overwrite") \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (total_revenue)") \
    .jdbc(url=clickhouse_url, table="top_5_suppliers", properties=clickhouse_props)


+-----------+----------+------------------+
|supplier_id|      name|     total_revenue|
+-----------+----------+------------------+
|       9251|   Blogtag| 499.8500061035156|
|       3482| Flashspan|499.79998779296875|
|        803|    Skyndu|  499.760009765625|
|       9416|Photospace|  499.760009765625|
|       7725|  Gigazoom| 499.7300109863281|
+-----------+----------+------------------+



# Средняя цена товаров от каждого поставщика

In [54]:
from pyspark.sql import functions as F

fact = fact_sales.alias("f")
product = dim_product.alias("p")
supplier = dim_supplier.alias("s")

avg_price_per_supplier = fact \
    .join(product, F.col("p.product_id") == F.col("f.product_id")) \
    .join(supplier, F.col("s.supplier_id") == F.col("f.supplier_id")) \
    .groupBy(F.col("s.supplier_id"), F.col("s.name").alias("supplier_name")) \
    .agg(F.avg(F.col("p.price")).alias("avg_price"))

avg_price_per_supplier.show(5)

avg_price_per_supplier.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (supplier_id)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_price_per_supplier", properties=clickhouse_props)


+-----------+-------------+------------------+
|supplier_id|supplier_name|         avg_price|
+-----------+-------------+------------------+
|       4838|        Einti|11.329999923706055|
|       3748|     Zoombeat|60.369998931884766|
|       5672|     Photobug| 63.11000061035156|
|       4924|       Yakijo|3.4200000762939453|
|       2158|        Voomm| 6.940000057220459|
+-----------+-------------+------------------+
only showing top 5 rows



# Распределение продаж по странам поставщиков

In [55]:
from pyspark.sql import functions as F

sales_by_supplier_country = fact_sales \
    .join(dim_supplier, "supplier_id") \
    .groupBy("country_name") \
    .agg(F.sum("total_price").alias("total_revenue"))

sales_by_supplier_country.show(5)

sales_by_supplier_country.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (country_name)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="sales_by_supplier_country", properties=clickhouse_props)


+------------+------------------+
|country_name|     total_revenue|
+------------+------------------+
|        Chad| 1694.580008506775|
|      Russia|  149108.400100708|
|    Paraguay| 3071.669984817505|
|       Yemen|10493.579959869385|
|     Senegal|2279.1099996566772|
+------------+------------------+
only showing top 5 rows



# 6. Витрина качества продукции
## Цель: Анализ отзывов и рейтингов товаров.

# Продукты с наивысшим и наименьшим рейтингом

In [55]:
from pyspark.sql import functions as F

# Выборка и сортировка продуктов по рейтингу
extreme_ratings = dim_product.select("product_id", "name", "rating") \
    .orderBy("rating")

# Просмотр первых 5 записей
extreme_ratings.show(5)

# Запись в ClickHouse
extreme_ratings.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (rating)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="extreme_ratings", properties=clickhouse_props)


+----------+---------+------+
|product_id|     name|rating|
+----------+---------+------+
|      5890|Bird Cage|   1.0|
|      9139|  Cat Toy|   1.0|
|      5099|Bird Cage|   1.0|
|      5930| Dog Food|   1.0|
|      4561|Bird Cage|   1.0|
+----------+---------+------+
only showing top 5 rows



# Корреляция между рейтингом и объемом продаж

In [59]:
from pyspark.sql import functions as F

fact_sales_clean = fact_sales.select("product_id", "quantity") \
    .withColumnRenamed("quantity", "sale_quantity")

dim_product_clean = dim_product.select("product_id", "rating")

rating_sales_corr = fact_sales_clean.join(dim_product_clean, "product_id") \
    .groupBy("product_id", "rating") \
    .agg(F.sum("sale_quantity").alias("total_quantity"))

rating_sales_corr.show(5)

rating_sales_corr.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (rating)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="rating_sales_corr", properties=clickhouse_props)


+----------+------+--------------+
|product_id|rating|total_quantity|
+----------+------+--------------+
|      2460|   1.2|             5|
|      8888|   1.2|             4|
|      6801|   4.1|             1|
|      8365|   2.0|             5|
|      4844|   4.8|             4|
+----------+------+--------------+
only showing top 5 rows



# Продукты с наибольшим количеством отзывов

In [60]:
from pyspark.sql import functions as F

top_reviewed_products = dim_product.select("product_id", "name", "reviews") \
    .orderBy(F.desc("reviews"))

top_reviewed_products.show(5)

top_reviewed_products.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (reviews)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="top_reviewed_products", properties=clickhouse_props)


+----------+--------+-------+
|product_id|    name|reviews|
+----------+--------+-------+
|      2436|Dog Food|   1000|
|      2974| Cat Toy|   1000|
|      7427| Cat Toy|   1000|
|       919|Dog Food|   1000|
|      4075| Cat Toy|   1000|
+----------+--------+-------+
only showing top 5 rows

