In [34]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DWH_Spark_Postgres").config("spark.jars", "/mnt/driver/postgresql-42.7.4.jar").getOrCreate()


In [35]:
# Параметры подключения PostgreSQL
url = "jdbc:postgresql://my_postgres:5432/dwh"
properties = {
    "user": "admin",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}

In [48]:
# Чтение таблиц измерений и фактов из DWH
d_craftsmans_df = spark.read.jdbc(url=url, table="dwh.d_craftsmans", properties=properties)
d_products_df = spark.read.jdbc(url=url, table="dwh.d_products", properties=properties)
f_orders_df = spark.read.jdbc(url=url, table="dwh.f_orders", properties=properties)

# Проверка данных
d_craftsmans_df.show(5)
d_products_df.show(5)
f_orders_df.show(5)


+------------+------------------+--------------------+------------------+--------------------+--------------------+
|craftsman_id|    craftsman_name|   craftsman_address|craftsman_birthday|     craftsman_email|           load_dttm|
+------------+------------------+--------------------+------------------+--------------------+--------------------+
|           1|   Griffith MacVay|   3370 Laurel Point|        1994-02-15|drickwood1@soundc...|2024-12-24 10:46:...|
|           2|       Deeyn Jaggs|85 Stone Corner C...|        1996-12-26| clanghorn2@cnbc.com|2024-12-24 10:46:...|
|           3|    Shawn Alentyev| 36752 Lyons Terrace|        1993-01-18|lmcdougall3@trell...|2024-12-24 10:46:...|
|           4|Jorrie Brigginshaw|  5693 Boyd Junction|        2001-12-28|dfettes4@youtube.com|2024-12-24 10:46:...|
|           5|      Fae Winscomb|    2705 Elgar Trail|        2004-07-15|     vlapree5@360.cn|2024-12-24 10:46:...|
+------------+------------------+--------------------+------------------

In [49]:
# Чтение данных из таблиц DWH
f_orders_df = spark.read.jdbc(
    url=url,
    table="dwh.f_orders",
    properties=properties
)

d_craftsmans_df = spark.read.jdbc(
    url=url,
    table="dwh.d_craftsmans",
    properties=properties
)

d_products_df = spark.read.jdbc(
    url=url,
    table="dwh.d_products",
    properties=properties
)

# Проверяем количество строк в каждом DataFrame
print(f"Количество строк в таблице f_orders: {f_orders_df.count()}")
print(f"Количество строк в таблице d_craftsmans: {d_craftsmans_df.count()}")
print(f"Количество строк в таблице d_products: {d_products_df.count()}")


Количество строк в таблице f_orders: 999
Количество строк в таблице d_craftsmans: 999
Количество строк в таблице d_products: 999


In [51]:
from pyspark.sql.functions import col, count, sum, max as spark_max, current_timestamp

# Формирование витрины данных
craftsman_report_df = (
    f_orders_df
    .filter(col("order_status") == "done")  # Учитываем только завершенные заказы
    .join(d_craftsmans_df, on="craftsman_id", how="inner")  # Объединение с таблицей мастеров
    .join(d_products_df, on="product_id", how="inner")  # Объединение с таблицей продуктов
    .groupBy(
        "craftsman_id",  # Группируем по мастеру
        "craftsman_name",
        "product_type"  # Тип продукта
    )
    .agg(
        count("*").alias("total_orders"),                # Количество заказов
        sum("product_price").alias("total_sales"),       # Общая сумма продаж
        spark_max("order_created_date").alias("last_sale_date")  # Дата последнего заказа
    )
    .withColumnRenamed("product_type", "product_category")  # Переименовываем столбец
    .withColumn("load_dttm", current_timestamp())  # Добавляем метку времени загрузки
)

# Вывод схемы витрины данных
craftsman_report_df.printSchema()

# Вывод примера данных
craftsman_report_df.show(20)


root
 |-- craftsman_id: long (nullable = true)
 |-- craftsman_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- total_orders: long (nullable = false)
 |-- total_sales: long (nullable = true)
 |-- last_sale_date: date (nullable = true)
 |-- load_dttm: timestamp (nullable = false)

+------------+------------------+----------------+------------+-----------+--------------+--------------------+
|craftsman_id|    craftsman_name|product_category|total_orders|total_sales|last_sale_date|           load_dttm|
+------------+------------------+----------------+------------+-----------+--------------+--------------------+
|         558|     Brian Glashby|         clothes|           1|        100|    2018-01-19|2024-12-24 16:55:...|
|         222|  Glynda Ravenshaw|Beauty & Hygiene|           1|         87|    2020-05-07|2024-12-24 16:55:...|
|         730|   Tammara Parratt|         clothes|           1|        140|    2020-05-10|2024-12-24 16:55:...|
|         442

In [52]:
# Запись витрины данных в таблицу DWH
craftsman_report_df.write.jdbc(
    url=url,
    table="dwh.craftsman_report_datamart",
    mode="overwrite",  # Перезаписываем данные
    properties=properties
)
