### Настройка подключения

In [1]:
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import (
    current_timestamp, current_date, date_format, col, sum, avg, median, count, lit, expr, row_number, desc, when
)
from datetime import date

import os

# Spark session for the notebook; the PostgreSQL JDBC driver jar is put on the classpath.
spark = (
    SparkSession.builder
    .appName("Postgres-Spark")
    .config("spark.jars", "postgresql-42.7.4.jar")
    .getOrCreate()
)

# JDBC endpoint and credentials used by every read/write below.
# Each value can be overridden through an environment variable so that real
# credentials do not have to be committed with the notebook; the defaults are
# the values the notebook used originally.
jdbc_url = os.environ.get("DWH_JDBC_URL", "jdbc:postgresql://postgres:5432/dwh")
connection_properties = {
    "user": os.environ.get("DWH_USER", "postgres"),
    "password": os.environ.get("DWH_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}

### Загрузка источников

In [2]:
def read_table(schema, table):
    """Return a Spark DataFrame backed by the Postgres table ``schema.table``.

    The table is read over JDBC using the notebook-level ``spark`` session,
    ``jdbc_url`` and ``connection_properties``.
    """
    qualified_name = f"{schema}.{table}"
    reader = spark.read
    return reader.jdbc(url=jdbc_url, table=qualified_name, properties=connection_properties)

def write_table(df, schema, table, mode="append"):
    """Write ``df`` to the Postgres table ``schema.table`` over JDBC.

    ``mode`` is handed to the Spark writer unchanged; by default rows are
    appended. Uses the notebook-level ``jdbc_url`` and ``connection_properties``.
    """
    target_name = f"{schema}.{table}"
    writer = df.write
    writer.jdbc(url=jdbc_url, table=target_name, mode=mode, properties=connection_properties)

In [3]:

# One DataFrame per source table, keyed by the name used in combine_sources().
# Each triple is (dictionary key, schema, table).
df_sources = {
    key: read_table(schema, table)
    for key, schema, table in (
        ("source1", "source1", "craft_market_wide"),
        ("source2_masters_products", "source2", "craft_market_masters_products"),
        ("source2_orders_customers", "source2", "craft_market_orders_customers"),
        ("source3_orders", "source3", "craft_market_orders"),
        ("source3_craftsmans", "source3", "craft_market_craftsmans"),
        ("source3_customers", "source3", "craft_market_customers"),
    )
}

def combine_sources():
    """Merge all sources in ``df_sources`` into one de-duplicated wide DataFrame.

    source1 is used as is; source2 is the inner join of its two tables on
    ``product_id`` and ``craftsman_id``; source3 is its orders table inner-joined
    to craftsmans and customers. Every source is projected onto the same ordered
    column list before the positional ``union``, and ``distinct()`` is applied to
    the final result.
    """
    wide_columns = [
        'order_id', 'order_created_date', 'order_completion_date', 'order_status',
        'craftsman_id', 'craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email',
        'product_id', 'product_name', 'product_description', 'product_type', 'product_price',
        'customer_id', 'customer_name', 'customer_address', 'customer_birthday', 'customer_email',
    ]

    source1_wide = df_sources["source1"]

    source2_wide = df_sources["source2_masters_products"].join(
        df_sources["source2_orders_customers"],
        on=["product_id", "craftsman_id"],
        how="inner",
    )

    source3_wide = (
        df_sources["source3_orders"]
        .join(df_sources["source3_craftsmans"], on="craftsman_id", how="inner")
        .join(df_sources["source3_customers"], on="customer_id", how="inner")
    )

    # union() matches columns by position, so every frame is projected first.
    combined = None
    for frame in (source1_wide, source2_wide, source3_wide):
        projected = frame.select(wide_columns)
        combined = projected if combined is None else combined.union(projected)

    return combined.distinct()

# Unified wide DataFrame built from all sources; the dimension and fact cells below select from it.
combined_df = combine_sources()

# Preview a single row.
combined_df.show(1)

+--------+------------------+---------------------+------------+------------+--------------+-------------------+------------------+-------------------+----------+--------------------+--------------------+------------+-------------+-----------+----------------+----------------+-----------------+-----------------+
|order_id|order_created_date|order_completion_date|order_status|craftsman_id|craftsman_name|  craftsman_address|craftsman_birthday|    craftsman_email|product_id|        product_name| product_description|product_type|product_price|customer_id|   customer_name|customer_address|customer_birthday|   customer_email|
+--------+------------------+---------------------+------------+------------+--------------+-------------------+------------------+-------------------+----------+--------------------+--------------------+------------+-------------+-----------+----------------+----------------+-----------------+-----------------+
|      86|        2022-07-18|           2022-07-21|       

### Формирование таблиц измерений и фактов в DWH

In [4]:
from pyspark.sql import functions as F

# Current contents of the three dimension tables and the fact table in the DWH.
# NOTE(review): none of these four DataFrames is referenced again in the visible
# part of the notebook (later cells re-read the tables under other names).
dwh_customers_df, dwh_products_df, dwh_craftsmans_df, dwh_orders_df = (
    read_table("dwh", dwh_table)
    for dwh_table in ("d_customers", "d_products", "d_craftsmans", "f_orders")
)


In [5]:
from pyspark.sql import functions as F

# Customer dimension as stored in the DWH.
customers_data_df = read_table("dwh", "d_customers")

# Columns used to match a source customer against the dimension.
fields_customers = ['customer_name', 'customer_address', 'customer_birthday', 'customer_email']

# Distinct customers present in the combined source data.
new_customers_df = combined_df.select(fields_customers).distinct()

# Keep only customers that have no match in the dimension, stamp the load
# time and mark the result for caching.
new_unique_customers = (
    new_customers_df
    .join(customers_data_df.select(fields_customers), on=fields_customers, how='left_anti')
    .withColumn("load_dttm", F.current_timestamp())
    .cache()
)

# Append the new customers to the DWH.
write_table(new_unique_customers, "dwh", "d_customers")

# Show the dimension after the load.
updated_customers = read_table("dwh", "d_customers")
updated_customers.show(5)

# Attach customer_id to the new customers by matching every field in
# fields_customers against the dimension table.
final_customers_df = (
    new_unique_customers.alias("new_customers")
    .join(
        customers_data_df.alias("existing_customers"),
        on=[
            F.col(f"new_customers.{field}") == F.col(f"existing_customers.{field}")
            for field in fields_customers
        ],
        how='left',
    )
    .select(
        F.col("existing_customers.customer_id").alias("customer_id"),
        *[F.col(f"new_customers.{field}") for field in fields_customers],
        F.col("new_customers.load_dttm"),
    )
)

# Preview the result with customer_id attached.
final_customers_df.show(1)

+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|customer_id|  customer_name|    customer_address|customer_birthday|      customer_email|           load_dttm|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
|          1|Ariel Hidderley|3616 American Ash...|       1995-05-21|ahidderleyik@tiny...|2025-01-22 13:02:...|
|          2|    Del Kindred|220 Grasskamp Par...|       1993-06-03|     dkindredki@g.co|2025-01-22 13:02:...|
|          3|    Dynah Lough|286 Mitchell Terrace|       1999-04-17|dloughlz@blogspot...|2025-01-22 13:02:...|
|          4| Goldina Napper|    592 Nova Parkway|       2004-11-08|gnapperp7@hatena....|2025-01-22 13:02:...|
|          5|  Dinny McGlynn|      1 Sommers Hill|       2000-10-15|dmcglynn6i@archiv...|2025-01-22 13:02:...|
+-----------+---------------+--------------------+-----------------+--------------------+--------------------+
o

In [6]:
# Product dimension as stored in the DWH.
products_data_df = read_table("dwh", "d_products")

# Columns used to match a source product against the dimension.
fields_products = ['product_name', 'product_description', 'product_type', 'product_price']

# Distinct products present in the combined source data.
new_products_df = combined_df.select(fields_products).distinct()

# Keep only products that have no match in the dimension, stamp the load
# time and mark the result for caching.
new_unique_products = (
    new_products_df
    .join(products_data_df.select(fields_products), on=fields_products, how='left_anti')
    .withColumn("load_dttm", F.current_timestamp())
    .cache()
)

# Append the new products to the DWH.
write_table(new_unique_products, "dwh", "d_products")

# Show the dimension after the load.
updated_products = read_table("dwh", "d_products")
updated_products.show(5)

# Attach product_id to the new products by matching every field in
# fields_products against the dimension table.
final_products_df = (
    new_unique_products.alias("new_products")
    .join(
        products_data_df.alias("existing_products"),
        on=[
            F.col(f"new_products.{field}") == F.col(f"existing_products.{field}")
            for field in fields_products
        ],
        how='left',
    )
    .select(
        F.col("existing_products.product_id").alias("product_id"),
        *[F.col(f"new_products.{field}") for field in fields_products],
        F.col("new_products.load_dttm"),
    )
)

# Preview the result with product_id attached.
final_products_df.show(1)

+----------+--------------------+--------------------+--------------------+-------------+--------------------+
|product_id|        product_name| product_description|        product_type|product_price|           load_dttm|
+----------+--------------------+--------------------+--------------------+-------------+--------------------+
|         1|        Pathiri Podi|Pathiri is a trad...|Foodgrains, Oil &...|           12|2025-01-22 13:02:...|
|         2|HandMade Gold-Pla...|Gold-plated and b...|             clothes|           10|2025-01-22 13:02:...|
|         3|HandMade Plus Siz...|Blue and beige la...|             clothes|           19|2025-01-22 13:02:...|
|         4|Foot Pumice Paddl...|HandMade foot pum...|    Beauty & Hygiene|           25|2025-01-22 13:02:...|
|         5|Bergamot & Cedarw...|HandMade Luxury S...|    Beauty & Hygiene|           31|2025-01-22 13:02:...|
+----------+--------------------+--------------------+--------------------+-------------+--------------------+
o

In [7]:
# Craftsman dimension as stored in the DWH.
craftsman_data_df = read_table("dwh", "d_craftsmans")

# Columns used to match a source craftsman against the dimension.
fields_craftsmans = ['craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email']

# Distinct craftsmen present in the combined source data.
new_craftsman_df = combined_df.select(fields_craftsmans).distinct()

# Keep only craftsmen that have no match in the dimension, stamp the load
# time and mark the result for caching.
new_unique_craftsmans = (
    new_craftsman_df
    .join(craftsman_data_df.select(fields_craftsmans), on=fields_craftsmans, how='left_anti')
    .withColumn("load_dttm", F.current_timestamp())
    .cache()
)

# Append the new craftsmen to the DWH.
write_table(new_unique_craftsmans, "dwh", "d_craftsmans")

# Show the dimension after the load.
updated_craftsmans = read_table("dwh", "d_craftsmans")
updated_craftsmans.show(5)

# Attach craftsman_id to the new craftsmen by matching every field in
# fields_craftsmans against the dimension table.
final_craftsmans_df = (
    new_unique_craftsmans.alias("new_craftsmans")
    .join(
        craftsman_data_df.alias("existing_craftsmans"),
        on=[
            F.col(f"new_craftsmans.{field}") == F.col(f"existing_craftsmans.{field}")
            for field in fields_craftsmans
        ],
        how='left',
    )
    .select(
        F.col("existing_craftsmans.craftsman_id").alias("craftsman_id"),
        *[F.col(f"new_craftsmans.{field}") for field in fields_craftsmans],
        F.col("new_craftsmans.load_dttm"),
    )
)

# Preview the result with craftsman_id attached.
final_craftsmans_df.show(1)

+------------+---------------+--------------------+------------------+--------------------+--------------------+
|craftsman_id| craftsman_name|   craftsman_address|craftsman_birthday|     craftsman_email|           load_dttm|
+------------+---------------+--------------------+------------------+--------------------+--------------------+
|           1| Khalil Heining|  83956 Manley Plaza|        1998-02-08|ymcwhorter17@inte...|2025-01-22 13:02:...|
|           2|     Jake Draye|      2 Bluestem Way|        2003-08-01|dwannes8v@newsvin...|2025-01-22 13:02:...|
|           3|  Gustave Irwin|   54700 Swallow Way|        1992-09-02|jwherritc1@cornel...|2025-01-22 13:02:...|
|           4|  Zelma Scarffe|5560 Blackbird Plaza|        2000-05-24|ccripinh2@list-ma...|2025-01-22 13:02:...|
|           5|Katlin Guilloud|       9659 8th Lane|        1992-01-08|bsheberjf@pcworld...|2025-01-22 13:02:...|
+------------+---------------+--------------------+------------------+--------------------+-----

In [8]:
from pyspark.sql import functions as F

# Orders fact table as stored in the DWH.
orders_data_df = read_table("dwh", "f_orders")

# Columns used to match a source order against the fact table.
order_fields = [
    'product_id', 'craftsman_id', 'customer_id',
    'order_created_date', 'order_completion_date', 'order_status',
]

# Distinct orders present in the combined source data.
new_orders_df = combined_df.select(order_fields).distinct()

# Keep only orders that have no match in the fact table, stamp the load
# time and mark the result for caching.
new_unique_orders = (
    new_orders_df
    .join(orders_data_df.select(order_fields), on=order_fields, how='left_anti')
    .withColumn("load_dttm", F.current_timestamp())
    .cache()
)

# Append the new orders to the DWH.
write_table(new_unique_orders, "dwh", "f_orders")

# Show the fact table after the load.
updated_orders = read_table("dwh", "f_orders")
updated_orders.show(5)

# Attach order_id to the new orders by matching every field in order_fields
# against the fact table.
final_orders_df = (
    new_unique_orders.alias("new_orders")
    .join(
        orders_data_df.alias("existing_orders"),
        on=[
            F.col(f"new_orders.{field}") == F.col(f"existing_orders.{field}")
            for field in order_fields
        ],
        how='left',
    )
    .select(
        F.col("existing_orders.order_id").alias("order_id"),
        *[F.col(f"new_orders.{field}") for field in order_fields],
        F.col("new_orders.load_dttm"),
    )
)

# Preview the result with order_id attached.
final_orders_df.show(1)

+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|order_id|product_id|craftsman_id|customer_id|order_created_date|order_completion_date|order_status|           load_dttm|
+--------+----------+------------+-----------+------------------+---------------------+------------+--------------------+
|       1|       157|         157|        157|        2022-06-25|           2022-06-29|        done|2025-01-22 13:02:...|
|       2|       805|         805|        805|        2022-09-05|           2022-09-07|        done|2025-01-22 13:02:...|
|       3|       122|         122|        122|        2022-07-09|           2022-07-11|        done|2025-01-22 13:02:...|
|       4|       630|         630|        630|        2022-08-16|           2022-08-17|        done|2025-01-22 13:02:...|
|       5|        11|          11|         11|        2022-05-12|           2022-05-14|        done|2025-01-22 13:02:...|
+--------+----------+---

### Заполнение витрины

In [9]:
# Add the report period (year and month of order creation, formatted "yyyy-MM").
final_orders_df = final_orders_df.withColumn("report_period", F.date_format("order_created_date", "yyyy-MM"))

# Window used to rank product categories per craftsman and period by order count.
# product_type is a secondary sort key so that ties on the count always resolve
# to the same category instead of an arbitrary one.
window_spec = Window.partitionBy("craftsman_id", "report_period").orderBy(
    F.desc("count_category"), F.asc("product_type")
)

# Number of ordered products per category for every craftsman and period.
product_category_counts = final_orders_df.join(
    final_products_df.alias("products"),
    final_orders_df["product_id"] == F.col("products.product_id")
).groupBy("craftsman_id", "report_period", "products.product_type") \
    .agg(F.count("products.product_type").alias("count_category"))

# Top-1 category for every craftsman and period.
top_categories = product_category_counts.withColumn(
    "row_num", F.row_number().over(window_spec)
).filter(F.col("row_num") == 1).select("craftsman_id", "report_period", "product_type")

# Per-craftsman, per-period report: orders joined to craftsmen, products and
# customers, then aggregated.
new_craftsman_report_datamart_df = final_orders_df.join(
    final_craftsmans_df.alias("craftsmans"),
    final_orders_df.craftsman_id == F.col("craftsmans.craftsman_id")
).join(
    final_products_df.alias("products"),
    final_orders_df.product_id == F.col("products.product_id")
).join(
    final_customers_df.alias("customers"),
    final_orders_df.customer_id == F.col("customers.customer_id")
).groupBy(
    F.col("craftsmans.craftsman_id"),
    F.col("craftsmans.craftsman_name"),
    F.col("craftsmans.craftsman_address"),
    F.col("craftsmans.craftsman_birthday"),
    F.col("craftsmans.craftsman_email"),
    F.col("report_period")
).agg(
    # The product price is split 90% / 10% between craftsman and platform.
    F.sum(F.col("products.product_price") * 0.9).alias("craftsman_money"),
    F.sum(F.col("products.product_price") * 0.1).alias("platform_money"),
    F.count(final_orders_df.order_id).alias("count_order"),
    F.avg(F.col("products.product_price")).alias("avg_price_order"),
    # Customer age in years as of the current date.
    F.avg(F.expr("DATEDIFF(current_date(), customers.customer_birthday) / 365.25")).alias("avg_age_customer"),
    # Median number of days between order creation and completion.
    F.median(F.expr("DATEDIFF(order_completion_date, order_created_date)")).alias("median_time_order_completed"),
    F.sum(F.when(final_orders_df.order_status == "created", 1).otherwise(0)).alias("count_order_created"),
    F.sum(F.when(final_orders_df.order_status == "in_progress", 1).otherwise(0)).alias("count_order_in_progress"),
    F.sum(F.when(final_orders_df.order_status == "delivery", 1).otherwise(0)).alias("count_order_delivery"),
    F.sum(F.when(final_orders_df.order_status == "done", 1).otherwise(0)).alias("count_order_done"),
    F.sum(F.when(final_orders_df.order_status != "done", 1).otherwise(0)).alias("count_order_not_done")
)

# Attach the top-1 product category.
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.join(
    top_categories,
    ["craftsman_id", "report_period"],
    "left"
).withColumnRenamed("product_type", "top_product_category")

# Stamp the rows with the current load timestamp.
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.withColumn("load_dttm", F.current_timestamp())

# Preview the result; the limit is applied in Spark so that only five rows are
# collected to the driver instead of the whole DataFrame.
new_craftsman_report_datamart_df.limit(5).toPandas()

Unnamed: 0,craftsman_id,report_period,craftsman_name,craftsman_address,craftsman_birthday,craftsman_email,craftsman_money,platform_money,count_order,avg_price_order,avg_age_customer,median_time_order_completed,count_order_created,count_order_in_progress,count_order_delivery,count_order_done,count_order_not_done,top_product_category,load_dttm


In [10]:
# Existing contents of the craftsman_report_datamart table.
craftsman_report_datamart = read_table("dwh", "craftsman_report_datamart")

# Columns that identify a datamart row.
key_columns = ["craftsman_id", "report_period"]

# 1. Rows to INSERT: present in the freshly computed DataFrame but absent
#    from the stored table (left_anti join on the key columns).
new_rows_df = new_craftsman_report_datamart_df.join(
    craftsman_report_datamart.select(*key_columns),
    key_columns,
    how="left_anti"
)

# 2. Rows to UPDATE: present in both, with at least one differing metric.
compared_columns = [
    "craftsman_money",
    "platform_money",
    "count_order",
    "avg_price_order",
    "avg_age_customer",
    "median_time_order_completed",
    "count_order_created",
    "count_order_in_progress",
    "count_order_delivery",
    "count_order_done",
    "count_order_not_done",
    "top_product_category",
]

# A plain `!=` evaluates to NULL when either side is NULL, and filter() drops
# such rows, so a metric changing to or from NULL would go unnoticed.
# A negated eqNullSafe treats NULL as an ordinary comparable value.
changed_condition = None
for column_name in compared_columns:
    differs = ~col(f"new.{column_name}").eqNullSafe(col(f"existing.{column_name}"))
    changed_condition = differs if changed_condition is None else (changed_condition | differs)

updated_rows_df = new_craftsman_report_datamart_df.alias("new").join(
    craftsman_report_datamart.alias("existing"),
    key_columns,
    how="inner"
).filter(changed_condition).select("new.*")


In [11]:
# Display the DataFrame repr (column names and types) of the computed datamart.
new_craftsman_report_datamart_df

DataFrame[craftsman_id: bigint, report_period: string, craftsman_name: string, craftsman_address: string, craftsman_birthday: date, craftsman_email: string, craftsman_money: double, platform_money: double, count_order: bigint, avg_price_order: double, avg_age_customer: decimal(22,10), median_time_order_completed: double, count_order_created: bigint, count_order_in_progress: bigint, count_order_delivery: bigint, count_order_done: bigint, count_order_not_done: bigint, top_product_category: string, load_dttm: timestamp]

In [12]:
pip install psycopg2-binary

Collecting psycopg2-binary
  Using cached psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10
Note: you may need to restart the kernel to use updated packages.


In [13]:
import psycopg2
from psycopg2.extras import execute_values

def update_existing_rows(df, schema, table):
    """Apply the rows of ``df`` as one bulk UPDATE against ``schema.table``.

    Rows are matched on ``craftsman_id`` and ``report_period``; every other
    listed column of the matching target row is overwritten with the value
    from ``df``.

    The connection target (host, port, database) is taken from the notebook's
    ``jdbc_url`` and the credentials from ``connection_properties``, so the
    update goes to the same database that the Spark JDBC reads and writes use.

    Args:
        df: Spark DataFrame containing at least the key and value columns
            listed below. It is collected to the driver in full.
        schema: Target schema name (interpolated into the SQL text as is).
        table: Target table name (interpolated into the SQL text as is).

    Returns:
        None. When ``df`` has no rows, nothing is executed and no connection
        is opened.

    NOTE(review): the statement also sets ``load_dttm``; confirm that the
    target table actually has this column.
    """
    key_columns = ("craftsman_id", "report_period")
    value_columns = (
        "craftsman_money", "platform_money", "count_order", "avg_price_order",
        "avg_age_customer", "median_time_order_completed", "count_order_created",
        "count_order_in_progress", "count_order_delivery", "count_order_done",
        "count_order_not_done", "top_product_category", "load_dttm",
    )
    all_columns = key_columns + value_columns

    rows = df.collect()
    if not rows:
        # Nothing to update: skip the database round trip entirely.
        return

    from urllib.parse import urlparse

    # The SET list, the VALUES alias list and the value tuples are all derived
    # from the same column tuples so they cannot drift apart.
    set_clause = ",\n        ".join(f"{name} = data.{name}" for name in value_columns)
    update_query = f"""
    UPDATE {schema}.{table} AS target
    SET {set_clause}
    FROM (VALUES %s) AS data ({", ".join(all_columns)})
    WHERE target.craftsman_id = data.craftsman_id AND target.report_period = data.report_period
    """
    values = [tuple(row[name] for name in all_columns) for row in rows]

    # "jdbc:postgresql://host:port/db" -> host, port, db
    plain_url = jdbc_url[len("jdbc:"):] if jdbc_url.startswith("jdbc:") else jdbc_url
    target = urlparse(plain_url)

    conn = psycopg2.connect(
        dbname=target.path.lstrip("/"),
        user=connection_properties["user"],
        password=connection_properties["password"],
        host=target.hostname,
        port=target.port or 5432,
    )
    try:
        # `with conn` commits on success and rolls back on an exception;
        # it does not close the connection, hence the explicit finally.
        with conn:
            with conn.cursor() as cursor:
                execute_values(cursor, update_query, values)
    finally:
        conn.close()

In [14]:
# Append the newly detected rows to the datamart table (the load_dttm column is dropped before writing).
# NOTE(review): updated_rows_df is not written anywhere in the visible notebook — confirm whether
# update_existing_rows(updated_rows_df, ...) was meant to be called here.
write_table(new_rows_df.drop("load_dttm"), "dwh", "craftsman_report_datamart")

In [15]:

# Load dates already registered for the datamart.
load_dates_dma = read_table("dwh", "load_dates_craftsman_report_datamart")

# Single-row candidate holding today's date (from the datetime module).
load_dates_data = [Row(load_dttm=date.today())]

# Drop the candidate if the same date is already stored, so an existing
# date is not appended again.
load_dates_df = (
    spark.createDataFrame(load_dates_data)
    .exceptAll(load_dates_dma.select('load_dttm'))
)

write_table(load_dates_df, "dwh", "load_dates_craftsman_report_datamart")

# Show the table after the write.
read_table("dwh", "load_dates_craftsman_report_datamart").show()

+---+----------+
| id| load_dttm|
+---+----------+
|  1|2025-01-22|
+---+----------+

