In [125]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, sum as spark_sum, expr, datediff, when, first, max as spark_max
from pyspark.sql.functions import current_date
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
import pyspark.sql.functions as F

In [126]:
# Инициализация сессии Spark
spark = SparkSession.builder \
    .appName("DWH Data Pipeline - Craftsman Marketplace") \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.6.0.jar") \
    .getOrCreate()

# Параметры подключения к PostgreSQL
jdbc_url = "jdbc:postgresql://postgres_container:5432/postgres" # JDBC-URL для Postgres
jdbc_props = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

In [127]:
# ==========Чтение исходных таблиц-источников==========#
# Читаем первую исходную таблицу (source1.craft_market_wide) из PostgreSQL
df_source1 = spark.read.jdbc(
    url=jdbc_url,
    table="source1.craft_market_wide",
    properties=jdbc_props
)

# Читаем таблицы из source2
df_source2_masters_products = spark.read.jdbc(
    url=jdbc_url,
    table="source2.craft_market_masters_products",
    properties=jdbc_props
)
df_source2_orders_customers = spark.read.jdbc(
    url=jdbc_url,
    table="source2.craft_market_orders_customers",
    properties=jdbc_props
)
# Соединяем две таблицы source2 по полю craftsman_id
df_source2 = df_source2_orders_customers.join(
    df_source2_masters_products,
    on="craftsman_id",
    how="inner"
)

# Читаем таблицы из source3
df_source3_craftsmans = spark.read.jdbc(
    url=jdbc_url,
    table="source3.craft_market_craftsmans",
    properties=jdbc_props
)
df_source3_customers = spark.read.jdbc(
    url=jdbc_url,
    table="source3.craft_market_customers",
    properties=jdbc_props
)
df_source3_orders = spark.read.jdbc(
    url=jdbc_url,
    table="source3.craft_market_orders",
    properties=jdbc_props
)
# Объединяем три таблицы source3 по полям craftsman_id и customer_id
df_source3 = df_source3_orders \
    .join(df_source3_craftsmans, on="craftsman_id", how="inner") \
    .join(df_source3_customers, on="customer_id", how="inner")

In [128]:
# ==========объединение данных из трех источников==========#
# Список колонок, которые должны быть общими для всех источников
common_columns = [
    'craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email',
    'product_name', 'product_description', 'product_type', 'product_price',
    'order_created_date', 'order_completion_date', 'order_status',
    'customer_name', 'customer_address', 'customer_birthday', 'customer_email'
]

# Собираем все DataFrame источников в список
list_dfs = [df_source1, df_source2, df_source3]
# Создаём пустой список для выравненных (aligned) DataFrame
list_dfs_aligned = []
for df_item in list_dfs:
    # Проверяем каждую колонку из common_columns:
    # если её нет в df_item, используем lit(None) вместо неё
    new_df = df_item.select([
        col(c) if c in df_item.columns else lit(None).alias(c)
        for c in common_columns
    ])
    # Добавляем готовый aligned_df в список list_dfs_aligned
    list_dfs_aligned.append(new_df)

# Объединяем (union) все три приведённых DataFrame в один df_unified
df_unified = list_dfs_aligned[0].union(list_dfs_aligned[1]).union(list_dfs_aligned[2])

In [129]:
# ========== 3. Формирование таблиц измерений (d_*) и таблицы фактов (f_orders) ==========

# Создаём окно для ранжирования мастеров (d_craftsmans)
w_craftsman = Window.orderBy("craftsman_name", "craftsman_birthday")
# Формируем таблицу измерения мастеров, уникализируем по (craftsman_name, craftsman_birthday)
df_dim_craftsmans = df_unified.select(
    "craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email"
).dropDuplicates(["craftsman_name", "craftsman_birthday"]) \
 .withColumn("craftsman_id", row_number().over(w_craftsman))

# Аналогичное окно для клиентов
w_customer = Window.orderBy("customer_name", "customer_birthday")
df_dim_customers = df_unified.select(
    "customer_name", "customer_address", "customer_birthday", "customer_email"
).dropDuplicates(["customer_name", "customer_birthday"]) \
 .withColumn("customer_id", row_number().over(w_customer))

# Аналогичное окно для продуктов
w_product = Window.orderBy("product_name", "product_price")
df_dim_products = df_unified.select(
    "product_name", "product_description", "product_type", "product_price"
).dropDuplicates(["product_name", "product_price"]) \
 .withColumn("product_id", row_number().over(w_product))

# Факт: Заказы (f_orders)
# Создаём окно для фактов (f_orders), будем генерировать order_id
w_fact_orders = Window.orderBy("product_id", "craftsman_id", "customer_id")
# Формируем таблицу фактов, связываем по ключам
df_fact_orders = df_unified \
    .join(df_dim_products, ["product_name", "product_price"], "left") \
    .join(df_dim_craftsmans, ["craftsman_name", "craftsman_birthday"], "left") \
    .join(df_dim_customers, ["customer_name", "customer_birthday"], "left") \
    .select(
        row_number().over(w_fact_orders).alias("order_id"), # Генерируем surrogate key для заказов
        col("product_id"),
        col("craftsman_id"),
        col("customer_id"),
        col("order_created_date"),
        col("order_completion_date"),
        col("order_status")
    )

# Добавляем столбец загрузки (load_dttm)
load_timestamp = current_timestamp()
df_dim_craftsmans = df_dim_craftsmans.withColumn("load_dttm", load_timestamp)
df_dim_customers = df_dim_customers.withColumn("load_dttm", load_timestamp)
df_dim_products = df_dim_products.withColumn("load_dttm", load_timestamp)
df_fact_orders = df_fact_orders.withColumn("load_dttm", load_timestamp)

# Печатаем размеры (кол-во строк) для контроля
print("Final unified DataFrame count:", df_unified.count())
print("Dim Craftsman count:", df_dim_craftsmans.count())
print("Dim Customers count:", df_dim_customers.count())
print("Dim Products count:", df_dim_products.count())
print("Fact Orders count:", df_fact_orders.count())

Final unified DataFrame count: 2997
Dim Craftsman count: 2995
Dim Customers count: 2997
Dim Products count: 2990
Fact Orders count: 2997


In [130]:
# ==========функция записи уникальных данных в DWH (чтобы не дублировать записи) ==========#
 
#  Читает значения существующих ключей из таблицы DWH, 
#   исключает дубликаты, после чего дозаписывает (append) в целевую таблицу.
def write_unique_data(df_data, table_name, key_col, jdbc_url, jdbc_props):

    # Читаем уже существующие ключи
    existing_key_df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=jdbc_props)
    existing_keys = existing_key_df.select(key_col).rdd.flatMap(lambda x: x).collect()

    # Фильтруем новые записи по тому, что ключ не входит в уже имеющиеся
    df_filtered = df_data.filter(~df_data[key_col].isin(existing_keys))

    # Запись только уникальных значений
    df_filtered.write.jdbc(url=jdbc_url, table=table_name, mode="append", properties=jdbc_props)

In [131]:
# ==========запись в DWH: измерения и факт ==========

write_unique_data(
    df_data=df_dim_craftsmans,
    table_name="dwh.d_craftsmans",
    key_col="craftsman_id",
    jdbc_url=jdbc_url,
    jdbc_props=jdbc_props
)

write_unique_data(
    df_data=df_dim_customers,
    table_name="dwh.d_customers",
    key_col="customer_id",
    jdbc_url=jdbc_url,
    jdbc_props=jdbc_props
)

write_unique_data(
    df_data=df_dim_products,
    table_name="dwh.d_products",
    key_col="product_id",
    jdbc_url=jdbc_url,
    jdbc_props=jdbc_props
)

write_unique_data(
    df_data=df_fact_orders,
    table_name="dwh.f_orders",
    key_col="order_id",
    jdbc_url=jdbc_url,
    jdbc_props=jdbc_props
)

In [132]:
# ==========Организация инкрементальной загрузки в витрину dwh.craftsman_report_datamart ==========#

# Читаем таблицы из DWH (уже заполненные измерения и факт)
df_d_craftsmans = spark.read.jdbc(jdbc_url, "dwh.d_craftsmans", properties=jdbc_props)
df_d_customers = spark.read.jdbc(jdbc_url, "dwh.d_customers", properties=jdbc_props)
df_d_products = spark.read.jdbc(jdbc_url, "dwh.d_products", properties=jdbc_props)
df_f_orders = spark.read.jdbc(jdbc_url, "dwh.f_orders", properties=jdbc_props)

# Читаем таблицу с датами загрузок
df_load_dates = spark.read.jdbc(jdbc_url, "dwh.load_dates_craftsman_report_datamart", properties=jdbc_props)

# Получаем последнюю дату загрузки
max_date_row = df_load_dates.agg(spark_max("load_dttm")).collect()[0][0]
last_load_date = max_date_row if max_date_row is not None else None

# Фильтрация измерений и фактов по load_dttm (если last_load_date не None)
if last_load_date:
    df_d_craftsmans = df_d_craftsmans.filter(col("load_dttm") > last_load_date)
    df_d_customers = df_d_customers.filter(col("load_dttm") > last_load_date)
    df_d_products = df_d_products.filter(col("load_dttm") > last_load_date)
    df_f_orders = df_f_orders.filter(col("load_dttm") > last_load_date)

# Объединяем таблицы измерений и фактов в один df_joined
df_joined = (
    df_f_orders
    .join(df_d_products, "product_id")
    .join(df_d_craftsmans, "craftsman_id")
    .join(df_d_customers, "customer_id")
)

# Добавляем "report_period" по дате создания заказа
df_joined = df_joined.withColumn(
    "report_period",
    expr("date_format(order_created_date, 'yyyy-MM')")
)

# Считаем длительность выполнения заказа (completion - created)
df_joined = df_joined.withColumn(
    "days_to_complete",
    datediff(col("order_completion_date"), col("order_created_date"))
)

# Определяем приблизительный возраст клиента
df_joined = df_joined.withColumn(
    "customer_age",
    datediff(current_date(), col("customer_birthday")) / 365.25
)

# Агрегируем для витрины
df_agg = df_joined.groupBy(
    "craftsman_id",
    "craftsman_name",
    "craftsman_address",
    "craftsman_birthday",
    "craftsman_email",
    "report_period"
).agg(
    spark_sum(col("product_price") * 0.9).alias("craftsman_money"),
    spark_sum(col("product_price") * 0.1).alias("platform_money"),
    count("order_id").alias("count_order"),
    avg("product_price").alias("avg_price_order"),
    avg("customer_age").alias("avg_age_customer"),
    expr("percentile_approx(days_to_complete, 0.5)").alias("median_time_order_completed"),
    count(when(col("order_status") == "created", True)).alias("count_order_created"),
    count(when(col("order_status") == "in progress", True)).alias("count_order_in_progress"),
    count(when(col("order_status") == "delivery", True)).alias("count_order_delivery"),
    count(when(col("order_status") == "done", True)).alias("count_order_done"),
    count(when(col("order_status").isin("created", "in progress", "delivery"), True)).alias("count_order_not_done")
)

# Определяем самую популярную категорию товара для каждого мастера и периода
df_product_category_count = df_joined.groupBy(
    "craftsman_id",
    "report_period",
    "product_type"
).agg(
    count("product_id").alias("product_count")
)

# Задаём окно для сортировки product_type по убыванию product_count
w_category = Window.partitionBy("craftsman_id", "report_period").orderBy(col("product_count").desc())
# Находим категорию, которая идёт на 1-м месте (rank = 1)
df_top_category = (
    df_product_category_count
    .withColumn("rank", row_number().over(w_category))
    .filter(col("rank") == 1)
    .select(
        "craftsman_id",
        "report_period",
        col("product_type").alias("top_product_category")
    )
)

# Склеиваем итоговый датафрейм для витрины
df_final_report = df_agg.join(df_top_category, ["craftsman_id", "report_period"], how="left")

# Записываем витрину в таблицу DWH
df_final_report.write.jdbc(
    url=jdbc_url,
    table="dwh.craftsman_report_datamart",
    mode="append",
    properties=jdbc_props
)

# Обновляем таблицу с датой последней загрузки,
df_new_load_date = spark.range(1).select(current_date().alias("load_dttm"))

df_new_load_date.write.jdbc(
    url=jdbc_url,
    table="dwh.load_dates_craftsman_report_datamart",
    mode="append",
    properties=jdbc_props
)

print("All done!")

All done!
