In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

def create_spark_session():
    return SparkSession.builder \
        .appName("ClickHouse_DataMart") \
        .config("spark.jars", 
                "/opt/spark/jars/clickhouse-jdbc-0.9.2-all-dependencies.jar,/opt/spark/jars/postgresql-42.7.8.jar") \
        .getOrCreate()


def read_postgres_tables(spark):
    """Чтение таблиц из PostgreSQL"""
    
    # Конфигурация PostgreSQL
    pg_properties = {
        "driver": "org.postgresql.Driver",
        "user": "postgres",
        "password": "highload"
    }
    pg_url = "jdbc:postgresql://highload_db:5432/highload_db"
    
    # Чтение таблиц измерений
    dim_customers = spark.read.jdbc(url=pg_url, table="dim_customers", properties=pg_properties)
    dim_products = spark.read.jdbc(url=pg_url, table="dim_products", properties=pg_properties)
    dim_stores = spark.read.jdbc(url=pg_url, table="dim_stores", properties=pg_properties)
    dim_suppliers = spark.read.jdbc(url=pg_url, table="dim_suppliers", properties=pg_properties)
    
    # Чтение таблицы фактов
    fact_sales = spark.read.jdbc(url=pg_url, table="fact_sales", properties=pg_properties)
    # Кэширование часто используемых таблиц
    fact_sales.cache()
    dim_products.cache()
        
    return dim_customers, dim_products, dim_stores, dim_suppliers, fact_sales

def create_product_sales_mart(dim_products, fact_sales):
    """Витрина продаж по продуктам - исправленная версия"""
    
    # Исправление: используем правильные имена столбцов
    product_sales = fact_sales.groupBy("product_id") \
        .agg(
            sum("quantity").alias("total_quantity"),
            sum("total_price").alias("total_revenue"),
            count("sale_id").alias("sales_count")  # Исправлено: было unique_sales_count
        )
    
    product_mart = product_sales.join(dim_products, "product_id", "left") \
        .select(
            "product_id",
            "name",
            "category",
            "total_quantity",
            "total_revenue",
            "sales_count",  # Исправлено: было unique_sales_count
            "rating",
            "reviews"
        ).fillna({"rating": 0, "reviews": 0, "total_quantity": 0, "total_revenue": 0})
    
    # Топ-10 самых продаваемых продуктов по количеству
    top_10_products = product_mart.orderBy(desc("total_quantity")).limit(10)
    
    # Общая выручка по категориям продуктов
    category_revenue = product_mart.groupBy("category") \
        .agg(
            sum("total_revenue").alias("category_revenue"),
            sum("total_quantity").alias("category_total_quantity"),  # Исправлено имя
            count("product_id").alias("products_count")
        )
    
    # НОВАЯ ВИТРИНА: Средний рейтинг и количество отзывов для каждого продукта
    product_ratings_mart = dim_products.select(
        "product_id",
        "name", 
        "category",
        "rating",
        "reviews"
    ).fillna({"rating": 0, "reviews": 0})
    
    return product_mart, top_10_products, category_revenue, product_ratings_mart



def create_customer_sales_mart(dim_customers, fact_sales):
    """Витрина продаж по клиентам - исправленная версия"""
    
    customer_sales = fact_sales.groupBy("customer_id") \
        .agg(
            sum("total_price").alias("total_spent"),
            count("sale_id").alias("purchase_count"),
            avg("total_price").alias("avg_order_value")
        )
    
    # Исправление: убираем email и другие отсутствующие столбцы
    customer_mart = customer_sales.join(dim_customers, "customer_id", "left") \
        .select(
            "customer_id",
            "first_name",
            "last_name",
            "country",
            "total_spent",
            "purchase_count",
            "avg_order_value"
        ).fillna({"country": "Unknown"})
    
    # Топ-10 клиентов
    top_10_customers = customer_mart.orderBy(desc("total_spent")).limit(10)
    
    # Распределение по странам (упрощенное)
    country_distribution = customer_mart.groupBy("country") \
        .agg(
            count("customer_id").alias("customer_count"),
            sum("total_spent").alias("total_spent_by_country")
        ).orderBy(desc("total_spent_by_country"))
    
    return customer_mart, top_10_customers, country_distribution

def create_time_sales_mart(fact_sales):
    """Витрина продаж по времени - исправленная версия"""
    
    time_mart = fact_sales \
        .filter(col("sale_date").isNotNull()) \
        .withColumn("year", year("sale_date")) \
        .withColumn("month", month("sale_date")) \
        .withColumn("year_month", date_format("sale_date", "yyyy-MM"))
    
    # Месячные тренды (без unique_customers)
    monthly_trends = time_mart.groupBy("year", "month", "year_month") \
        .agg(
            sum("total_price").alias("monthly_revenue"),
            count("sale_id").alias("monthly_orders"),
            avg("total_price").alias("avg_order_size_monthly")
        ).orderBy("year", "month")
    
    # Годовые тренды (без unique_customers)
    yearly_trends = time_mart.groupBy("year") \
        .agg(
            sum("total_price").alias("yearly_revenue"),
            count("sale_id").alias("yearly_orders"),
            avg("total_price").alias("avg_order_size_yearly")
        ).orderBy("year")
    
    # Сравнение периодов (упрощенное)
    period_comparison = monthly_trends.groupBy("month") \
        .agg(
            avg("monthly_revenue").alias("avg_monthly_revenue"),
            avg("monthly_orders").alias("avg_monthly_orders")
        )
    
    return monthly_trends, yearly_trends, period_comparison

def create_store_sales_mart(dim_stores, fact_sales):
    """Витрина продаж по магазинам - исправленная версия"""
    
    store_sales = fact_sales.groupBy("store_id") \
        .agg(
            sum("total_price").alias("total_revenue"),
            countDistinct("sale_id").alias("total_orders"),
            countDistinct("customer_id").alias("unique_customers"),
            sum("quantity").alias("total_quantity_sold"),
            avg("total_price").alias("avg_order_value")
        )
    
    store_mart = store_sales.join(dim_stores, "store_id", "left") \
        .select(
            "store_id",
            "name",
            "city",
            "country",
            "total_revenue",
            "total_orders",
            "unique_customers",
            "total_quantity_sold",
            "avg_order_value"
        ).fillna({"country": "Unknown", "city": "Unknown"})
    
    # Топ-5 магазинов по выручке
    top_5_stores = store_mart.orderBy(desc("total_revenue")).limit(5)
    
    # Распределение по городам и странам с агрегацией
    geo_distribution = store_mart.groupBy("country", "city") \
        .agg(
            sum("total_revenue").alias("total_revenue_by_location"),
            count("store_id").alias("store_count"),
            sum("total_orders").alias("total_orders_by_location"),
            avg("avg_order_value").alias("avg_order_value_by_location")
        ).orderBy(desc("total_revenue_by_location"))
    
    return store_mart, top_5_stores, geo_distribution

def create_supplier_sales_mart(dim_suppliers, fact_sales, dim_products):
    """Витрина продаж по поставщикам - исправленная версия"""
    
    # Используем готовый supplier_id из fact_sales (согласно структуре таблиц)
    sales_with_supplier = fact_sales.filter(col("supplier_id").isNotNull())

    supplier_sales = sales_with_supplier.groupBy("supplier_id") \
        .agg(
            sum("total_price").alias("total_revenue"),
            countDistinct("sale_id").alias("total_sales"),
            countDistinct("product_id").alias("unique_products_sold"),
            sum("quantity").alias("total_quantity_sold"),
            avg("unit_price").alias("avg_product_price"),
            min("unit_price").alias("min_product_price"),
            max("unit_price").alias("max_product_price")
        )
    
    supplier_mart = supplier_sales.join(dim_suppliers, "supplier_id", "left") \
        .select(
            "supplier_id",
            "name",
            "country",
            "contact",
            "total_revenue",
            "total_sales",
            "unique_products_sold",
            "total_quantity_sold",
            "avg_product_price",
            "min_product_price",
            "max_product_price"
        ).fillna({"country": "Unknown"})
    
    # Топ-5 поставщиков по выручке
    top_5_suppliers = supplier_mart.orderBy(desc("total_revenue")).limit(5)
    
    # Распределение по странам
    supplier_country_dist = supplier_mart.groupBy("country") \
        .agg(
            sum("total_revenue").alias("total_revenue_by_country"),
            count("supplier_id").alias("supplier_count"),
            sum("total_sales").alias("total_sales_by_country"),
            avg("avg_product_price").alias("avg_product_price_by_country")
        ).orderBy(desc("total_revenue_by_country"))
    
    return supplier_mart, top_5_suppliers, supplier_country_dist


def create_product_quality_mart(dim_products, fact_sales):
    """Витрина качества продукции - исправленная версия"""
    
    # Статистика продаж по продуктам
    product_sales_stats = fact_sales.groupBy("product_id") \
        .agg(
            sum("quantity").alias("total_sold"),
            sum("total_price").alias("total_revenue"),
            countDistinct("sale_id").alias("sales_count"),
            avg("unit_price").alias("avg_sale_price")
        )
    
    quality_mart = product_sales_stats.join(dim_products, "product_id", "left") \
        .select(
            "product_id",
            "name",
            "category",
            "rating",
            "reviews",
            "total_sold",
            "total_revenue",
            "sales_count",
            "avg_sale_price"
        ).fillna({"rating": 0, "reviews": 0, "total_sold": 0, "total_revenue": 0})
    
    # Продукты с наивысшим рейтингом (минимум 10 отзывов)
    highest_rated = quality_mart.filter(col("reviews") >= 10) \
                               .orderBy(desc("rating")).limit(10)
    
    # Продукты с наименьшим рейтингом (минимум 1 отзыв)
    lowest_rated = quality_mart.filter(col("reviews") >= 1) \
                              .orderBy("rating").limit(10)
    
    # Продукты с наибольшим количеством отзывов
    most_reviewed = quality_mart.orderBy(desc("reviews")).limit(10)
    
    # Анализ корреляции между рейтингом и продажами
    correlation_analysis = quality_mart.filter(col("reviews") > 0) \
        .agg(
            corr("rating", "total_sold").alias("corr_rating_sales"),
            corr("rating", "total_revenue").alias("corr_rating_revenue"),
            corr("rating", "reviews").alias("corr_rating_reviews")
        )
    
    return quality_mart, highest_rated, lowest_rated, most_reviewed, correlation_analysis

def write_to_clickhouse(df, table_name):
    """Запись DataFrame в ClickHouse с использованием правильного драйвера"""
    
    try:
        df.write \
            .format("jdbc") \
            .option("url", "jdbc:clickhouse://clickhouse:8123/mydb") \
            .option("dbtable", table_name) \
            .option("user", "admin") \
            .option("password", "password") \
            .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
            .mode("append") \
            .save()
        #logger.info(f"Данные успешно записаны в таблицу {table_name}")
    except Exception as e:
        print(e)
        #logger.error(f"Ошибка при записи в таблицу {table_name}: {str(e)}")

In [3]:
spark = create_spark_session()

# Читаем данные из PostgreSQL
dim_customers, dim_products, dim_stores, dim_suppliers, fact_sales = read_postgres_tables(spark)

# Создаем витрины

# 1. Витрина продаж по продуктам
# product_mart, top_10_products, category_revenue, category_ratings = create_product_sales_mart(dim_products, fact_sales)    

# write_to_clickhouse(category_ratings, "product_ratings_mart")
# write_to_clickhouse(top_10_products, "top_10_products")
# write_to_clickhouse(category_revenue, "category_revenue_mart")

25/10/29 21:41:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# 2. Витрина продаж по клиентам
customer_mart, top_10_customers, country_distribution = create_customer_sales_mart(dim_customers, fact_sales)
write_to_clickhouse(customer_mart, "customer_sales_mart")
write_to_clickhouse(top_10_customers, "top_10_customers")
write_to_clickhouse(country_distribution, "customer_country_distribution")

25/10/29 21:23:24 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:27 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:27 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:27 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:27 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:27 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.Cl

25/10/29 21:23:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:23:38 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:38 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:38 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:38 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:38 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:38 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:38 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:23:42 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:42 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:42 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:42 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:42 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:42 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:42 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:23:47 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:47 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:47 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:47 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:47 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:47 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:47 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:23:53 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:53 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:53 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:53 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:53 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:53 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:53 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:23:58 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:58 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:58 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:58 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:23:58 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:23:58 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.Cl

25/10/29 21:24:55 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:55 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:55 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:55 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:55 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:55 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:55 WARN ClientConfigProperties: Unknown and unmap

25/10/29 21:24:55 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:55 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:55 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:55 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:55 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:55 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:55 WARN ClientConfigProperties: Unknown and unmap

25/10/29 21:24:56 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:56 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:56 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:56 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:56 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:56 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:56 WARN JdbcUtils: Requested isolation level 1, b

25/10/29 21:24:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:57 WARN ClientConfigProperties: Unknown and unmap

25/10/29 21:24:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=customer_country_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:24:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:24:57 WARN ClientConfigProperties: Unknown and unmap

In [4]:
# 3. Витрина продаж по времени
monthly_trends, yearly_trends, period_comparison = create_time_sales_mart(fact_sales)
write_to_clickhouse(monthly_trends, "monthly_trends_mart")
write_to_clickhouse(yearly_trends, "yearly_trends_mart")
write_to_clickhouse(period_comparison, "monthly_comparison_mart")

In [4]:
# 4. Витрина продаж по магазинам
store_mart, top_5_stores, geo_distribution = create_store_sales_mart(dim_stores, fact_sales)
# write_to_clickhouse(store_mart, "store_sales_mart")
# write_to_clickhouse(top_5_stores, "top_5_stores")
write_to_clickhouse(geo_distribution, "store_geo_distribution")

25/10/29 21:42:05 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:33 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:33 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:33 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:33 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:33 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickh

25/10/29 21:43:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:34 WARN JdbcUtils: Requested isolation level 1, but transactions are u

25/10/29 21:43:35 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:35 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:35 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:35 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:35 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:35 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:35 WARN JdbcUtils: Requested isolation level 1, but transactions are u

25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are u

25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are u

25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:43:36 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=store_geo_distribution, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:43:36 WARN JdbcUtils: Requested isolation level 1, but transactions are u

In [5]:
# 5. Витрина продаж по поставщикам
supplier_mart, top_5_suppliers, supplier_country_dist = create_supplier_sales_mart(dim_suppliers, fact_sales, dim_products)
write_to_clickhouse(supplier_mart, "supplier_sales_mart")
# write_to_clickhouse(top_5_suppliers, "top_5_suppliers")
# write_to_clickhouse(supplier_country_dist, "supplier_country_distribution")

25/10/29 21:43:38 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:09 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:09 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:09 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:09 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:09 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.Cl

25/10/29 21:44:14 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:14 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:14 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:14 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:14 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:14 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:14 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:44:17 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:17 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:17 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:17 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:17 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:17 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:17 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:44:20 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:20 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:20 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:20 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:20 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:20 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:20 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:44:23 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:23 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:23 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:23 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:23 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:23 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:23 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

25/10/29 21:44:25 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:25 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:25 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:25 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:25 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:25 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.Cl

25/10/29 21:44:27 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:27 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:27 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:27 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:27 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=supplier_sales_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:44:27 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:44:27 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtabl

In [4]:
# 6. Витрина качества продукции
quality_mart, highest_rated, lowest_rated, most_reviewed, correlation_analysis = create_product_quality_mart(dim_products, fact_sales)    
write_to_clickhouse(quality_mart, "product_quality_mart")
write_to_clickhouse(highest_rated, "highest_rated_products")
write_to_clickhouse(lowest_rated, "lowest_rated_products")
write_to_clickhouse(most_reviewed, "most_reviewed_products")
write_to_clickhouse(correlation_analysis, "correlation_analysis_mart")

25/10/29 21:32:43 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:25 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:25 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:25 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:25 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:25 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdb

25/10/29 21:33:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:34 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:34 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbt

25/10/29 21:33:37 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:37 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:37 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:37 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:37 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:38 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:38 WARN JdbcUtils: Requested isolation level 1, but transactions are unsuppo

25/10/29 21:33:42 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:42 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:42 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:42 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:42 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:42 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:42 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbt

25/10/29 21:33:48 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:48 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:48 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:48 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:48 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:48 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:48 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbt

25/10/29 21:33:50 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:51 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:51 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:52 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:52 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:52 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:52 WARN JdbcUtils: Requested isolation level 1, but transactions are unsuppo

25/10/29 21:33:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=product_quality_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:33:57 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:33:57 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbt

25/10/29 21:35:30 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=correlation_analysis_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}
25/10/29 21:35:30 WARN JdbcUtils: Requested isolation level 1, but transactions are unsupported
25/10/29 21:35:30 WARN ClientConfigProperties: Unknown and unmapped config properties: {dbtable=correlation_analysis_mart, url=jdbc:clickhouse://clickhouse:8123/mydb, driver=com.clickhouse.jdbc.ClickHouseDriver}


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

def create_spark_session():
    return SparkSession.builder \
        .appName("ClickHouse_DataMart") \
        .config("spark.jars", 
                "/opt/spark/jars/clickhouse-jdbc-0.9.2-all-dependencies.jar,/opt/spark/jars/postgresql-42.7.8.jar") \
        .getOrCreate()

def read_postgres_tables(spark):
    """Чтение таблиц из PostgreSQL"""
    
    # Конфигурация PostgreSQL
    pg_properties = {
        "driver": "org.postgresql.Driver",
        "user": "postgres",
        "password": "highload"
    }
    pg_url = "jdbc:postgresql://highload_db:5432/highload_db"
    
    # Чтение таблиц измерений
    dim_customers = spark.read.jdbc(url=pg_url, table="dim_customers", properties=pg_properties)
    dim_products = spark.read.jdbc(url=pg_url, table="dim_products", properties=pg_properties)
    dim_stores = spark.read.jdbc(url=pg_url, table="dim_stores", properties=pg_properties)
    dim_suppliers = spark.read.jdbc(url=pg_url, table="dim_suppliers", properties=pg_properties)
    
    # Чтение таблицы фактов
    fact_sales = spark.read.jdbc(url=pg_url, table="fact_sales", properties=pg_properties)
    
    return dim_customers, dim_products, dim_stores, dim_suppliers, fact_sales

def create_product_sales_mart(dim_products, fact_sales):
    """Витрина продаж по продуктам"""
    
    # Топ-10 самых продаваемых продуктов
    product_sales = fact_sales.groupBy("product_id") \
        .agg(
            sum("quantity").alias("total_quantity"),
            sum("total_price").alias("total_revenue"),
            count("sale_id").alias("sales_count")
        )
    
    product_mart = product_sales.join(dim_products, "product_id") \
        .select(
            "product_id",
            "name",
            "category",
            "total_quantity",
            "total_revenue",
            "sales_count",
            "rating",
            "reviews"
        )
    
    # Топ-10 продуктов по количеству продаж
    top_10_products = product_mart.orderBy(desc("total_quantity")).limit(10)
    
    # Общая выручка по категориям
    category_revenue = product_mart.groupBy("category") \
        .agg(sum("total_revenue").alias("category_revenue"))
    
    return product_mart, top_10_products, category_revenue

def create_customer_sales_mart(dim_customers, fact_sales):
    """Витрина продаж по клиентам"""
    
    customer_sales = fact_sales.groupBy("customer_id") \
        .agg(
            sum("total_price").alias("total_spent"),
            count("sale_id").alias("purchase_count"),
            avg("total_price").alias("avg_order_value")
        )
    
    customer_mart = customer_sales.join(dim_customers, "customer_id") \
        .select(
            "customer_id",
            "first_name",
            "last_name",
            "country",
            "total_spent",
            "purchase_count",
            "avg_order_value"
        )
    
    # Топ-10 клиентов
    top_10_customers = customer_mart.orderBy(desc("total_spent")).limit(10)
    
    # Распределение по странам
    country_distribution = customer_mart.groupBy("country") \
        .agg(
            count("customer_id").alias("customer_count"),
            sum("total_spent").alias("total_spent_by_country")
        )
    
    return customer_mart, top_10_customers, country_distribution

def create_time_sales_mart(fact_sales):
    """Витрина продаж по времени"""
    
    time_mart = fact_sales \
        .withColumn("year", year("sale_date")) \
        .withColumn("month", month("sale_date")) \
        .withColumn("year_month", date_format("sale_date", "yyyy-MM"))
    
    # Месячные и годовые тренды
    monthly_trends = time_mart.groupBy("year", "month", "year_month") \
        .agg(
            sum("total_price").alias("monthly_revenue"),
            count("sale_id").alias("monthly_orders"),
            avg("total_price").alias("avg_order_size_monthly")
        )
    
    yearly_trends = time_mart.groupBy("year") \
        .agg(
            sum("total_price").alias("yearly_revenue"),
            count("sale_id").alias("yearly_orders"),
            avg("total_price").alias("avg_order_size_yearly")
        )
    
    # Сравнение периодов
    period_comparison = monthly_trends.groupBy("month") \
        .agg(
            avg("monthly_revenue").alias("avg_monthly_revenue"),
            avg("monthly_orders").alias("avg_monthly_orders")
        )
    
    return monthly_trends, yearly_trends, period_comparison

def create_store_sales_mart(dim_stores, fact_sales):
    """Витрина продаж по магазинам"""
    
    store_sales = fact_sales.groupBy("store_id") \
        .agg(
            sum("total_price").alias("total_revenue"),
            count("sale_id").alias("total_orders"),
            avg("total_price").alias("avg_order_value")
        )
    
    store_mart = store_sales.join(dim_stores, "store_id") \
        .select(
            "store_id",
            "name",
            "city",
            "country",
            "total_revenue",
            "total_orders",
            "avg_order_value"
        )
    
    # Топ-5 магазинов
    top_5_stores = store_mart.orderBy(desc("total_revenue")).limit(5)
    
    # Распределение по городам и странам
    geo_distribution = store_mart.groupBy("country", "city") \
        .agg(
            sum("total_revenue").alias("total_revenue_by_location"),
            count("store_id").alias("store_count")
        )
    
    return store_mart, top_5_stores, geo_distribution

def create_supplier_sales_mart(dim_suppliers, fact_sales, dim_products):
    """Витрина продаж по поставщикам"""
    
    # Объединяем факты продаж с продуктами для получения supplier_id
    sales_with_supplier = fact_sales.join(dim_products, "product_id")
    
    supplier_sales = sales_with_supplier.groupBy("supplier_id") \
        .agg(
            sum("total_price").alias("total_revenue"),
            count("sale_id").alias("total_sales"),
            avg("unit_price").alias("avg_product_price")
        )
    
    supplier_mart = supplier_sales.join(dim_suppliers, "supplier_id") \
        .select(
            "supplier_id",
            "name",
            "country",
            "total_revenue",
            "total_sales",
            "avg_product_price"
        )
    
    # Топ-5 поставщиков
    top_5_suppliers = supplier_mart.orderBy(desc("total_revenue")).limit(5)
    
    # Распределение по странам
    supplier_country_dist = supplier_mart.groupBy("country") \
        .agg(
            sum("total_revenue").alias("total_revenue_by_country"),
            count("supplier_id").alias("supplier_count")
        )
    
    return supplier_mart, top_5_suppliers, supplier_country_dist

def create_product_quality_mart(dim_products, fact_sales):
    """Витрина качества продукции"""
    
    product_sales_stats = fact_sales.groupBy("product_id") \
        .agg(
            sum("quantity").alias("total_sold"),
            sum("total_price").alias("total_revenue")
        )
    
    quality_mart = product_sales_stats.join(dim_products, "product_id") \
        .select(
            "product_id",
            "name",
            "category",
            "rating",
            "reviews",
            "total_sold",
            "total_revenue"
        )
    
    # Продукты с наивысшим и наименьшим рейтингом
    highest_rated = quality_mart.orderBy(desc("rating")).limit(10)
    lowest_rated = quality_mart.orderBy("rating").limit(10)
    
    # Продукты с наибольшим количеством отзывов
    most_reviewed = quality_mart.orderBy(desc("reviews")).limit(10)
    
    return quality_mart, highest_rated, lowest_rated, most_reviewed

def write_to_clickhouse(df, table_name):
    """Запись DataFrame в ClickHouse с использованием правильного драйвера"""
    
    try:
        df.write \
            .format("jdbc") \
            .option("url", "jdbc:clickhouse://clickhouse:8123/mydb") \
            .option("dbtable", table_name) \
            .option("user", "admin") \
            .option("password", "password") \
            .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
            .mode("append") \
            .save()
        #logger.info(f"Данные успешно записаны в таблицу {table_name}")
    except Exception as e:
        print(e)
        #logger.error(f"Ошибка при записи в таблицу {table_name}: {str(e)}")

def main():
    print('Создаем Spark сессию')
    spark = create_spark_session()
    print('OK')
    try:
        # Читаем данные из PostgreSQL
        dim_customers, dim_products, dim_stores, dim_suppliers, fact_sales = read_postgres_tables(spark)
        print('postgres')
        # Создаем витрины
        
        # 1. Витрина продаж по продуктам
        product_mart, top_10_products, category_revenue = create_product_sales_mart(dim_products, fact_sales)
        print('1')
        write_to_clickhouse(product_mart, "product_sales_mart")
        print('11')
        write_to_clickhouse(top_10_products, "top_10_products")
        write_to_clickhouse(category_revenue, "product_category_revenue")
        print('11')
        
        # 2. Витрина продаж по клиентам
        customer_mart, top_10_customers, country_distribution = create_customer_sales_mart(dim_customers, fact_sales)
        write_to_clickhouse(customer_mart, "customer_sales_mart")
        write_to_clickhouse(top_10_customers, "top_10_customers")
        write_to_clickhouse(country_distribution, "customer_country_distribution")
        
        # 3. Витрина продаж по времени
        monthly_trends, yearly_trends, period_comparison = create_time_sales_mart(fact_sales)
        write_to_clickhouse(monthly_trends, "sales_monthly_trends")
        write_to_clickhouse(yearly_trends, "sales_yearly_trends")
        write_to_clickhouse(period_comparison, "sales_period_comparison")
        
        # 4. Витрина продаж по магазинам
        store_mart, top_5_stores, geo_distribution = create_store_sales_mart(dim_stores, fact_sales)
        write_to_clickhouse(store_mart, "store_sales_mart")
        write_to_clickhouse(top_5_stores, "top_5_stores")
        write_to_clickhouse(geo_distribution, "store_geo_distribution")
        
        # 5. Витрина продаж по поставщикам
        supplier_mart, top_5_suppliers, supplier_country_dist = create_supplier_sales_mart(dim_suppliers, fact_sales, dim_products)
        write_to_clickhouse(supplier_mart, "supplier_sales_mart")
        write_to_clickhouse(top_5_suppliers, "top_5_suppliers")
        write_to_clickhouse(supplier_country_dist, "supplier_country_distribution")
        
        # 6. Витрина качества продукции
        quality_mart, highest_rated, lowest_rated, most_reviewed = create_product_quality_mart(dim_products, fact_sales)
        write_to_clickhouse(quality_mart, "product_quality_mart")
        write_to_clickhouse(highest_rated, "highest_rated_products")
        write_to_clickhouse(lowest_rated, "lowest_rated_products")
        write_to_clickhouse(most_reviewed, "most_reviewed_products")
        
        print("Все витрины успешно созданы и загружены в ClickHouse!")
        
    except Exception as e:
        print(f"Произошла ошибка: {str(e)}")
        raise e
    finally:
        spark.stop()

if __name__ == "__main__":
    main()