## Загрузка плагинов

In [48]:
!wget https://jdbc.postgresql.org/download/postgresql-42.6.0.jar -O /home/jovyan/postgresql-jdbc.jar

--2025-05-20 18:37:24--  https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1081604 (1.0M) [application/java-archive]
Saving to: ‘/home/jovyan/postgresql-jdbc.jar’


2025-05-20 18:37:29 (268 KB/s) - ‘/home/jovyan/postgresql-jdbc.jar’ saved [1081604/1081604]



## Создание сессии

In [49]:
from pyspark.sql import SparkSession

DB_URL = "jdbc:postgresql://postgres:5432/spark_db"
POSTGRES_USER = "spark_user"
POSTGRES_PASSWORD = "spark_password"

spark = (
    SparkSession.builder.appName("PostgreSQL-Integration")
    .config("spark.jars", "/home/jovyan/postgresql-jdbc.jar")
    .config("spark.driver.extraClassPath", "/home/jovyan/postgresql-jdbc.jar")
    .config("spark.executor.extraClassPath", "/home/jovyan/postgresql-jdbc.jar")
    .getOrCreate()
)

## Предобработка данных

In [50]:
from datetime import datetime

from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.window import Window


def process_data():
    df_src = (
        spark.read.format("jdbc")
        .option("url", DB_URL)
        .option("dbtable", "mock")
        .options(user=POSTGRES_USER, password=POSTGRES_PASSWORD)
        .load()
    )

    # 1. Dim_Customer
    df_dim_customer = df_src.select(
        col("customer_first_name").alias("first_name"),
        col("customer_last_name").alias("last_name"),
        col("customer_age").alias("age"),
        col("customer_email").alias("email"),
        col("customer_country").alias("country"),
        col("customer_postal_code").alias("postal_code"),
        col("customer_pet_type").alias("pet_type"),
        col("customer_pet_name").alias("pet_name"),
        col("customer_pet_breed").alias("pet_breed"),
    ).distinct()

    # Handle email conflicts (PySpark equivalent of ON CONFLICT DO NOTHING)
    windowSpec = Window.partitionBy("email").orderBy("email")
    df_dim_customer = df_dim_customer.withColumn(
        "row_num", row_number().over(windowSpec)
    )
    df_dim_customer = df_dim_customer.filter(col("row_num") == 1).drop("row_num")
    df_dim_customer = df_dim_customer.withColumn(
        "customer_id", monotonically_increasing_id()
    )

    # 2. Dim_Seller
    df_dim_seller = df_src.select(
        col("seller_first_name").alias("first_name"),
        col("seller_last_name").alias("last_name"),
        col("seller_email").alias("email"),
        col("seller_country").alias("country"),
        col("seller_postal_code").alias("postal_code"),
    ).distinct()

    # Handle email conflicts
    windowSpec = Window.partitionBy("email").orderBy("email")
    df_dim_seller = df_dim_seller.withColumn("row_num", row_number().over(windowSpec))
    df_dim_seller = df_dim_seller.filter(col("row_num") == 1).drop("row_num")
    df_dim_seller = df_dim_seller.withColumn("seller_id", monotonically_increasing_id())

    # 3. Dim_Product
    df_dim_product = df_src.select(
        col("product_name").alias("name"),
        col("product_category").alias("category"),
        col("product_price").alias("price"),
        col("product_weight").alias("weight"),
        col("product_color").alias("color"),
        col("product_size").alias("size"),
        col("product_brand").alias("brand"),
        col("product_material").alias("material"),
        col("product_description").alias("description"),
        col("product_rating").alias("rating"),
        col("product_reviews").alias("reviews"),
        col("product_release_date").alias("release_date"),
        col("product_expiry_date").alias("expiry_date"),
        col("pet_category").alias("pet_category"),
    ).distinct()

    # Handle (name, category) conflicts
    windowSpec = Window.partitionBy("name", "category").orderBy("name")
    df_dim_product = df_dim_product.withColumn("row_num", row_number().over(windowSpec))
    df_dim_product = df_dim_product.filter(col("row_num") == 1).drop("row_num")
    df_dim_product = df_dim_product.withColumn(
        "product_id", monotonically_increasing_id()
    )

    # 4. Dim_Store
    df_dim_store = df_src.select(
        col("store_name").alias("name"),
        col("store_location").alias("location"),
        col("store_city").alias("city"),
        col("store_state").alias("state"),
        col("store_country").alias("country"),
        col("store_phone").alias("phone"),
        col("store_email").alias("email"),
    ).distinct()

    # Handle (name, location) conflicts
    windowSpec = Window.partitionBy("name", "location").orderBy("name")
    df_dim_store = df_dim_store.withColumn("row_num", row_number().over(windowSpec))
    df_dim_store = df_dim_store.filter(col("row_num") == 1).drop("row_num")
    df_dim_store = df_dim_store.withColumn("store_id", monotonically_increasing_id())

    # 5. Dim_Supplier
    df_dim_supplier = df_src.select(
        col("supplier_name").alias("name"),
        col("supplier_contact").alias("contact"),
        col("supplier_email").alias("email"),
        col("supplier_phone").alias("phone"),
        col("supplier_address").alias("address"),
        col("supplier_city").alias("city"),
        col("supplier_country").alias("country"),
    ).distinct()

    # Handle email conflicts
    windowSpec = Window.partitionBy("email").orderBy("email")
    df_dim_supplier = df_dim_supplier.withColumn(
        "row_num", row_number().over(windowSpec)
    )
    df_dim_supplier = df_dim_supplier.filter(col("row_num") == 1).drop("row_num")
    df_dim_supplier = df_dim_supplier.withColumn(
        "supplier_id", monotonically_increasing_id()
    )

    # Fact Table Creation
    df_fact_sales = (
        df_src.join(
            df_dim_customer, df_src.customer_email == df_dim_customer.email, "inner"
        )
        .join(df_dim_seller, df_src.seller_email == df_dim_seller.email, "inner")
        .join(
            df_dim_product,
            (df_src.product_name == df_dim_product.name)
            & (df_src.product_category == df_dim_product.category),
            "inner",
        )
        .join(
            df_dim_store,
            (df_src.store_name == df_dim_store.name)
            & (df_src.store_location == df_dim_store.location),
            "inner",
        )
        .join(df_dim_supplier, df_src.supplier_email == df_dim_supplier.email, "inner")
        .select(
            df_dim_customer["customer_id"],
            df_dim_seller["seller_id"],
            df_dim_product["product_id"],
            df_dim_store["store_id"],
            df_dim_supplier["supplier_id"],
            col("sale_date"),
            col("sale_quantity").alias("quantity"),
            col("sale_total_price").alias("total_price"),
            col("product_quantity"),
        )
        .withColumn("sale_id", monotonically_increasing_id())
    )

    return {
        "dim_seller": df_dim_seller,
        "dim_product": df_dim_product,
        "dim_customer": df_dim_customer,
        "dim_store": df_dim_store,
        "dim_supplier": df_dim_supplier,
        "fact_sales": df_fact_sales,
    }


def save_to_postgres(df, table_name):
    df.write.format("jdbc").option("url", DB_URL).option("dbtable", table_name).option(
        "user", POSTGRES_USER
    ).option("password", POSTGRES_PASSWORD).option(
        "driver", "org.postgresql.Driver"
    ).mode(
        "overwrite"
    ).save()


def update_data():
    try:
        dataframes = process_data()
        for table_name, df in dataframes.items():
            save_to_postgres(df, table_name)
        print(f"{datetime.now()} - Data update completed successfully")
    except Exception as e:
        print(f"Error during data update: {str(e)}")

In [51]:
update_data()

2025-05-20 18:37:30.985382 - Data update completed successfully


## Составление отчетов

In [52]:
from pyspark.sql.functions import month, year

# === 1. Чтение таблиц из PostgreSQL ===
dim_customer = (
    spark.read.format("jdbc")
    .option("url", DB_URL)
    .option("dbtable", "dim_customer")
    .options(user=POSTGRES_USER, password=POSTGRES_PASSWORD)
    .load()
)
dim_seller = (
    spark.read.format("jdbc")
    .option("url", DB_URL)
    .option("dbtable", "dim_seller")
    .options(user=POSTGRES_USER, password=POSTGRES_PASSWORD)
    .load()
)
dim_product = (
    spark.read.format("jdbc")
    .option("url", DB_URL)
    .option("dbtable", "dim_product")
    .options(user=POSTGRES_USER, password=POSTGRES_PASSWORD)
    .load()
)
dim_store = (
    spark.read.format("jdbc")
    .option("url", DB_URL)
    .option("dbtable", "dim_store")
    .options(user=POSTGRES_USER, password=POSTGRES_PASSWORD)
    .load()
)
dim_supplier = (
    spark.read.format("jdbc")
    .option("url", DB_URL)
    .option("dbtable", "dim_supplier")
    .options(user=POSTGRES_USER, password=POSTGRES_PASSWORD)
    .load()
)
fact_sales = (
    spark.read.format("jdbc")
    .option("url", DB_URL)
    .option("dbtable", "fact_sales")
    .options(user=POSTGRES_USER, password=POSTGRES_PASSWORD)
    .load()
)

### Витрина 1: Продажи по продуктам

In [53]:
from pyspark.sql.functions import avg, col, desc, row_number, sum
from pyspark.sql.window import Window

# === 1. Агрегация по продуктам ===
product_stats = (
    fact_sales.alias("fs")
    .join(dim_product.alias("p"), col("fs.product_id") == col("p.product_id"))
    .groupBy("fs.product_id", "p.name", "p.category", "p.rating", "p.reviews")
    .agg(
        sum("fs.quantity").alias("total_sold"),
        sum("fs.total_price").alias("total_revenue"),
    )
)

# === 2. Выбор топ-10 по количеству продаж ===
window = Window.orderBy(desc("total_sold"))
top_10_products = (
    product_stats.withColumn("row_number", row_number().over(window))
    .filter(col("row_number") <= 10)
    .drop("row_number")
)

# === 3. Агрегация общей выручки по категориям ===
category_revenue = (
    fact_sales.alias("fs")
    .join(dim_product.alias("p"), col("fs.product_id") == col("p.product_id"))
    .groupBy("p.category")
    .agg(sum("fs.total_price").alias("category_total_revenue"))
)

# === 4. Джойн для добавления общей выручки категории ===
report_products = (
    top_10_products.alias("t10")
    .join(category_revenue.alias("cat"), col("t10.category") == col("cat.category"))
    .select(
        col("t10.product_id"),
        col("t10.name").alias("product_name"),
        col("t10.category").alias("product_category"),
        col("t10.total_sold"),
        col("t10.total_revenue"),
        col("cat.category_total_revenue"),
        col("t10.rating").alias("avg_rating"),
        col("t10.reviews").alias("total_reviews"),
    )
    .orderBy(desc("total_sold"))
)

# Просмотр таблицы
report_products.toPandas()

Unnamed: 0,product_id,product_name,product_category,total_sold,total_revenue,category_total_revenue,avg_rating,total_reviews
0,8,Dog Food,Toy,6346,293603.41,868101.63,1.1,778
1,2,Bird Cage,Toy,6261,296563.92,868101.63,3.5,642
2,0,Bird Cage,Cage,6234,279530.37,831117.94,1.9,151
3,7,Dog Food,Food,6196,283588.88,830632.55,4.7,249
4,3,Cat Toy,Cage,6067,280212.67,831117.94,2.2,589
5,5,Cat Toy,Toy,6028,277934.3,868101.63,4.9,511
6,4,Cat Toy,Food,6025,275659.91,830632.55,1.1,291
7,6,Dog Food,Cage,5756,271374.9,831117.94,4.9,265
8,1,Bird Cage,Food,5710,271383.76,830632.55,2.5,500


### Витрина 2: Продажи по клиентам

In [54]:
from pyspark.sql.functions import avg, col, desc, row_number, sum
from pyspark.sql.window import Window

# === 1. Агрегация по клиентам ===
customer_stats = (
    fact_sales.alias("fs")
    .join(dim_customer.alias("c"), col("fs.customer_id") == col("c.customer_id"))
    .groupBy("fs.customer_id", "c.email", "c.first_name", "c.last_name", "c.country")
    .agg(
        sum("fs.total_price").alias("total_spent"),
        avg("fs.total_price").alias("avg_check"),
    )
)

# === 2. Отбор топ-10 клиентов по общей сумме покупок ===
window = Window.orderBy(desc("total_spent"))
report_customers = (
    customer_stats.withColumn("row_number", row_number().over(window))
    .filter(col("row_number") <= 10)
    .drop("row_number")
    .orderBy(desc("total_spent"))
)

# Просмотр результата
report_customers.toPandas()

Unnamed: 0,customer_id,email,first_name,last_name,country,total_spent,avg_check
0,1024,bfeasby57@youku.com,Gus,Hartshorn,Albania,499.85,499.85
1,8867,sstappardbp@businessweek.com,Hayes,McKain,Portugal,499.8,499.8
2,2850,dsorea0@geocities.com,Ava,Lomas,China,499.76,499.76
3,8047,rivattspm@un.org,Dawna,Impey,Indonesia,499.76,499.76
4,7669,previllh3@tinyurl.com,Lavinia,Horsburgh,Poland,499.73,499.73
5,5218,jthurnhamqe@sourceforge.net,Dame,Auchinleck,Indonesia,499.71,499.71
6,1338,bselewayi0@chron.com,Isahella,Colley,Russia,499.69,499.69
7,3845,gcoupman2@bigcartel.com,Nicky,Lattie,Mexico,499.62,499.62
8,9779,wpulmano6@loc.gov,Sisely,Bonevant,China,499.62,499.62
9,8915,svispof9@t.co,Eran,Cotes,China,499.59,499.59


### Витрина 3: Продажи по времени

In [55]:
from pyspark.sql.functions import concat_ws, lpad

report_time = (
    fact_sales.withColumn("sale_month", month("sale_date"))
    .withColumn("sale_year", year("sale_date"))
    .withColumn("month_padded", lpad(col("sale_month"), 2, "0"))
    .withColumn("year_month", concat_ws("-", col("sale_year"), col("month_padded")))
    .groupBy("sale_year", "sale_month", "year_month")
    .agg(
        sum("total_price").alias("monthly_revenue"),
        avg("total_price").alias("avg_order_value"),
    )
    .orderBy("sale_year", "sale_month")
)
report_time.toPandas()

Unnamed: 0,sale_year,sale_month,year_month,monthly_revenue,avg_order_value
0,2021,1,2021-01,224158.54,256.474302
1,2021,2,2021-02,192348.31,260.281881
2,2021,3,2021-03,207282.2,245.886358
3,2021,4,2021-04,206592.82,246.825352
4,2021,5,2021-05,211764.86,255.754662
5,2021,6,2021-06,215042.8,261.609246
6,2021,7,2021-07,220496.51,256.988939
7,2021,8,2021-08,221275.78,246.684259
8,2021,9,2021-09,210623.43,251.041037
9,2021,10,2021-10,228743.32,256.4387


### Витрина 4: Продажи по магазинам

In [56]:
from pyspark.sql.functions import avg, col, count, desc, row_number, sum
from pyspark.sql.window import Window

# === 1. Агрегация по магазинам (основа для витрины) ===
store_stats = (
    fact_sales.alias("fs")
    .join(dim_store.alias("s"), col("fs.store_id") == col("s.store_id"))
    .groupBy("fs.store_id", "s.name", "s.city", "s.country")
    .agg(
        sum("fs.total_price").alias("store_revenue"),
        avg("fs.total_price").alias("avg_check"),
    )
)

# === 2. Отбор топ-5 по выручке ===
window = Window.orderBy(desc("store_revenue"))
top5_stores = (
    store_stats.withColumn("row_number", row_number().over(window))
    .filter(col("row_number") <= 5)
    .drop("row_number")
)

# === 3. Расчёт общего количества продаж по городам и странам ===
sales_by_city = (
    fact_sales.alias("fs")
    .join(dim_store.alias("s"), col("fs.store_id") == col("s.store_id"))
    .groupBy("s.city")
    .agg(count("*").alias("sales_in_city"))
)

sales_by_country = (
    fact_sales.alias("fs")
    .join(dim_store.alias("s"), col("fs.store_id") == col("s.store_id"))
    .groupBy("s.country")
    .agg(count("*").alias("sales_in_country"))
)

# === 4. Финальный джойн всех частей ===
report_stores = (
    top5_stores.alias("t5")
    .join(sales_by_city.alias("c"), col("t5.city") == col("c.city"))
    .join(sales_by_country.alias("cn"), col("t5.country") == col("cn.country"))
    .select(
        col("t5.store_id"),
        col("t5.name").alias("store_name"),
        col("t5.city"),
        col("t5.country"),
        col("t5.store_revenue"),
        col("t5.avg_check"),
        col("c.sales_in_city"),
        col("cn.sales_in_country"),
    )
    .orderBy(desc("store_revenue"))
)
report_stores.toPandas()

Unnamed: 0,store_id,store_name,city,country,store_revenue,avg_check,sales_in_city,sales_in_country
0,9251,Youspan,Duran,Russia,1317.2,439.066667,3,555
1,5094,Ntag,Maojia,China,1212.27,303.0675,4,1863
2,5873,Quimba,Butou,Indonesia,1197.64,239.528,5,1113
3,5797,Quatz,Suncheon,Russia,1069.75,356.583333,3,555
4,5970,Quire,Harian,Italy,1005.19,335.063333,4,20


### Витрина 5: Продажи по поставщикам

In [57]:
from pyspark.sql.functions import avg, col, count, desc, row_number, sum
from pyspark.sql.window import Window

# === 1. Агрегация по поставщикам ===
supplier_stats = (
    fact_sales.alias("fs")
    .join(dim_supplier.alias("sup"), col("fs.supplier_id") == col("sup.supplier_id"))
    .join(dim_product.alias("p"), col("fs.product_id") == col("p.product_id"))
    .groupBy("fs.supplier_id", "sup.name", "sup.country")
    .agg(
        sum("fs.total_price").alias("supplier_revenue"),
        avg("p.price").alias("avg_price"),
    )
)

# === 2. Топ-5 поставщиков по выручке ===
window = Window.orderBy(desc("supplier_revenue"))
top5_suppliers = (
    supplier_stats.withColumn("row_number", row_number().over(window))
    .filter(col("row_number") <= 5)
    .drop("row_number")
)

# === 3. Продажи каждого поставщика в его стране ===
sales_per_supplier_country = (
    fact_sales.alias("fs")
    .join(dim_supplier.alias("sup"), col("fs.supplier_id") == col("sup.supplier_id"))
    .groupBy("sup.supplier_id", "sup.country")
    .agg(count("*").alias("sales_in_country"))
)

# === 4. Финальная витрина ===
report_suppliers = (
    top5_suppliers.alias("top")
    .join(
        sales_per_supplier_country.alias("sc"),
        (col("top.supplier_id") == col("sc.supplier_id"))
        & (col("top.country") == col("sc.country")),
    )
    .select(
        col("top.supplier_id"),
        col("top.name").alias("supplier_name"),
        col("top.country"),
        col("top.supplier_revenue"),
        col("top.avg_price"),
        col("sc.sales_in_country"),
    )
    .orderBy(desc("supplier_revenue"))
)
report_suppliers.toPandas()

Unnamed: 0,supplier_id,supplier_name,country,supplier_revenue,avg_price,sales_in_country
0,1024,Brainverse,Ireland,499.85,61.2,1
1,8867,Jamia,Russia,499.8,27.83,1
2,2850,Eabox,Portugal,499.76,27.83,1
3,8047,Demimbu,China,499.76,39.92,1
4,7669,Browsezoom,Argentina,499.73,61.2,1


### Витрина 6: Качество продукции

In [58]:
from pyspark.sql.functions import avg, col, desc, row_number, sum
from pyspark.sql.window import Window

# === 1. Базовая агрегация по продуктам ===
product_quality_stats = (
    fact_sales.alias("fs")
    .join(dim_product.alias("p"), col("fs.product_id") == col("p.product_id"))
    .groupBy("fs.product_id", "p.name", "p.category")
    .agg(
        avg("p.rating").alias("avg_rating"),
        sum("p.reviews").alias("total_reviews"),
        sum("fs.quantity").alias("total_sold"),
    )
)

# === 2. Окна для флагов ===
window_rating_desc = Window.orderBy(desc("avg_rating"))
window_rating_asc = Window.orderBy(col("avg_rating"))
window_reviews = Window.orderBy(desc("total_reviews"))

# === 3. Добавляем флаги топов ===
report_quality = (
    product_quality_stats.withColumn("rank_top", row_number().over(window_rating_desc))
    .withColumn("rank_low", row_number().over(window_rating_asc))
    .withColumn("rank_reviews", row_number().over(window_reviews))
    .withColumn("is_top_rated", col("rank_top") <= 5)
    .withColumn("is_low_rated", col("rank_low") <= 5)
    .withColumn("is_most_reviewed", col("rank_reviews") <= 5)
    .drop("rank_top", "rank_low", "rank_reviews")
    .orderBy(desc("avg_rating"))
)
report_quality.toPandas()

Unnamed: 0,product_id,name,category,avg_rating,total_reviews,total_sold,is_top_rated,is_low_rated,is_most_reviewed
0,5,Cat Toy,Toy,4.9,566188,6028,True,False,True
1,6,Dog Food,Cage,4.9,286200,5756,True,False,False
2,7,Dog Food,Food,4.7,275643,6196,True,False,False
3,2,Bird Cage,Toy,3.5,735732,6261,True,False,True
4,1,Bird Cage,Food,2.5,533000,5710,True,True,True
5,3,Cat Toy,Cage,2.2,652023,6067,False,True,True
6,0,Bird Cage,Cage,1.9,172140,6234,False,True,False
7,8,Dog Food,Toy,1.1,896256,6346,False,True,True
8,4,Cat Toy,Food,1.1,318354,6025,False,True,False


## Загрузка и проверка отчетов

In [59]:
table_dataframes = {
    "report_products": report_products,
    "report_customers": report_customers,
    "report_time": report_time,
    "report_stores": report_stores,
    "report_suppliers": report_suppliers,
    "report_quality": report_quality,
}

### ClickHouse

In [60]:
%pip install clickhouse_connect

Note: you may need to restart the kernel to use updated packages.


In [61]:
import clickhouse_connect
import pandas as pd

# === 1. Подключение к ClickHouse ===
client = clickhouse_connect.get_client(
    host="clickhouse", port=8123, username="custom_user", password="custom_password"
)

# === 2. Определения таблиц ClickHouse ===
table_defs = {
    "report_products": """
DROP TABLE IF EXISTS report_products;
CREATE TABLE report_products (
    product_id UInt32,
    product_name String,
    product_category String,
    total_sold UInt32,
    total_revenue Float64,
    category_total_revenue Float64,
    avg_rating Float32,
    total_reviews UInt32
) ENGINE = MergeTree()
ORDER BY product_id;
    """,
    "report_customers": """
DROP TABLE IF EXISTS report_customers;
CREATE TABLE report_customers (
    customer_id UInt32,
    email String,
    first_name String,
    last_name String,
    country String,
    total_spent Float64,
    avg_check Float64
) ENGINE = MergeTree()
ORDER BY customer_id;
    """,
    "report_time": """
DROP TABLE IF EXISTS report_time;
CREATE TABLE report_time (
    sale_year UInt16,
    sale_month UInt8,
    year_month String,
    monthly_revenue Float64,
    avg_order_value Float64
) ENGINE = MergeTree()
ORDER BY (sale_year, sale_month);
    """,
    "report_stores": """
DROP TABLE IF EXISTS report_stores;
CREATE TABLE report_stores (
    store_id UInt32,
    store_name String,
    city String,
    country String,
    store_revenue Float64,
    avg_check Float64,
    sales_in_city UInt32,
    sales_in_country UInt32
) ENGINE = MergeTree()
ORDER BY store_id;
    """,
    "report_suppliers": """
DROP TABLE IF EXISTS report_suppliers;
CREATE TABLE report_suppliers (
    supplier_id UInt32,
    supplier_name String,
    country String,
    supplier_revenue Float64,
    avg_price Float64,
    sales_in_country UInt32
) ENGINE = MergeTree()
ORDER BY supplier_id;
    """,
    "report_quality": """
DROP TABLE IF EXISTS report_quality;
CREATE TABLE report_quality (
    product_id UInt32,
    name String,
    category String,
    avg_rating Float32,
    total_reviews UInt32,
    total_sold UInt32,
    is_top_rated UInt8,
    is_low_rated UInt8,
    is_most_reviewed UInt8
) ENGINE = MergeTree()
ORDER BY product_id;
    """,
}


# === 3. Создание таблиц ===
for table_name, ddl in table_defs.items():
    # Разделяем на строки и выполняем по отдельности
    statements = ddl.strip().split(";")
    for statement in statements:
        stmt = statement.strip()
        if stmt:
            client.command(stmt)

# === 4. Запись данных в ClickHouse ===
for table_name, df in table_dataframes.items():
    pdf = df.toPandas()

    # Приводим к списку списков по колонкам
    data_columns = [pdf[col].tolist() for col in pdf.columns]

    client.insert(
        table_name,
        data=data_columns,
        column_names=list(pdf.columns),
        column_oriented=True,
    )

print("✅ Все 6 витрин успешно загружены в ClickHouse.")

# === 5. Проверка: выгрузка и вывод 10 строк из каждой таблицы ===
for table_name in table_dataframes.keys():
    print(f"\n🔍 {table_name} — первые 10 строк:")
    result = client.query(f"SELECT * FROM {table_name} LIMIT 10")
    df_check = pd.DataFrame(result.result_set, columns=result.column_names)
    display(df_check)

✅ Все 6 витрин успешно загружены в ClickHouse.

🔍 report_products — первые 10 строк:


Unnamed: 0,product_id,product_name,product_category,total_sold,total_revenue,category_total_revenue,avg_rating,total_reviews
0,0,Bird Cage,Cage,6234,279530.37,831117.94,1.9,151
1,1,Bird Cage,Food,5710,271383.76,830632.55,2.5,500
2,2,Bird Cage,Toy,6261,296563.92,868101.63,3.5,642
3,3,Cat Toy,Cage,6067,280212.67,831117.94,2.2,589
4,4,Cat Toy,Food,6025,275659.91,830632.55,1.1,291
5,5,Cat Toy,Toy,6028,277934.3,868101.63,4.9,511
6,6,Dog Food,Cage,5756,271374.9,831117.94,4.9,265
7,7,Dog Food,Food,6196,283588.88,830632.55,4.7,249
8,8,Dog Food,Toy,6346,293603.41,868101.63,1.1,778



🔍 report_customers — первые 10 строк:


Unnamed: 0,customer_id,email,first_name,last_name,country,total_spent,avg_check
0,1024,bfeasby57@youku.com,Gus,Hartshorn,Albania,499.85,499.85
1,1338,bselewayi0@chron.com,Isahella,Colley,Russia,499.69,499.69
2,2850,dsorea0@geocities.com,Ava,Lomas,China,499.76,499.76
3,3845,gcoupman2@bigcartel.com,Nicky,Lattie,Mexico,499.62,499.62
4,5218,jthurnhamqe@sourceforge.net,Dame,Auchinleck,Indonesia,499.71,499.71
5,7669,previllh3@tinyurl.com,Lavinia,Horsburgh,Poland,499.73,499.73
6,8047,rivattspm@un.org,Dawna,Impey,Indonesia,499.76,499.76
7,8867,sstappardbp@businessweek.com,Hayes,McKain,Portugal,499.8,499.8
8,8915,svispof9@t.co,Eran,Cotes,China,499.59,499.59
9,9779,wpulmano6@loc.gov,Sisely,Bonevant,China,499.62,499.62



🔍 report_time — первые 10 строк:


Unnamed: 0,sale_year,sale_month,year_month,monthly_revenue,avg_order_value
0,2021,1,2021-01,224158.54,256.474302
1,2021,2,2021-02,192348.31,260.281881
2,2021,3,2021-03,207282.2,245.886358
3,2021,4,2021-04,206592.82,246.825352
4,2021,5,2021-05,211764.86,255.754662
5,2021,6,2021-06,215042.8,261.609246
6,2021,7,2021-07,220496.51,256.988939
7,2021,8,2021-08,221275.78,246.684259
8,2021,9,2021-09,210623.43,251.041037
9,2021,10,2021-10,228743.32,256.4387



🔍 report_stores — первые 10 строк:


Unnamed: 0,store_id,store_name,city,country,store_revenue,avg_check,sales_in_city,sales_in_country
0,5094,Ntag,Maojia,China,1212.27,303.0675,4,1863
1,5797,Quatz,Suncheon,Russia,1069.75,356.583333,3,555
2,5873,Quimba,Butou,Indonesia,1197.64,239.528,5,1113
3,5970,Quire,Harian,Italy,1005.19,335.063333,4,20
4,9251,Youspan,Duran,Russia,1317.2,439.066667,3,555



🔍 report_suppliers — первые 10 строк:


Unnamed: 0,supplier_id,supplier_name,country,supplier_revenue,avg_price,sales_in_country
0,1024,Brainverse,Ireland,499.85,61.2,1
1,2850,Eabox,Portugal,499.76,27.83,1
2,7669,Browsezoom,Argentina,499.73,61.2,1
3,8047,Demimbu,China,499.76,39.92,1
4,8867,Jamia,Russia,499.8,27.83,1



🔍 report_quality — первые 10 строк:


Unnamed: 0,product_id,name,category,avg_rating,total_reviews,total_sold,is_top_rated,is_low_rated,is_most_reviewed
0,0,Bird Cage,Cage,1.9,172140,6234,0,1,0
1,1,Bird Cage,Food,2.5,533000,5710,1,1,1
2,2,Bird Cage,Toy,3.5,735732,6261,1,0,1
3,3,Cat Toy,Cage,2.2,652023,6067,0,1,1
4,4,Cat Toy,Food,1.1,318354,6025,0,1,0
5,5,Cat Toy,Toy,4.9,566188,6028,1,0,1
6,6,Dog Food,Cage,4.9,286200,5756,1,0,0
7,7,Dog Food,Food,4.7,275643,6196,1,0,0
8,8,Dog Food,Toy,1.1,896256,6346,0,1,1
