In [1]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    current_timestamp, current_date, date_format, col, sum, avg, count, lit, expr,
    row_number, desc, when, median
)
from datetime import date

spark = SparkSession.builder \
    .appName("Postgres-Spark") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.4") \
    .getOrCreate()


jdbc_url = "jdbc:postgresql://postgres:5432/postgres_db"
connection_properties = {
    "user": "postgres_user",
    "password": "postgres_password",
    "driver": "org.postgresql.Driver"
}

# get data from Postgres
def read_from_postgres(schema, table):
    return spark.read.format("jdbc") \
        .option("url", jdbc_url) \
        .option("driver", connection_properties["driver"]) \
        .option("dbtable", f"{schema}.{table}") \
        .option("user", connection_properties["user"]) \
        .option("password", connection_properties["password"]) \
        .load()

# Take into Postgres
def write_to_postgres(df, schema, table):
    df.write.format("jdbc") \
        .option("url", jdbc_url) \
        .option("driver", connection_properties["driver"]) \
        .option("dbtable", f"{schema}.{table}") \
        .option("user", connection_properties["user"]) \
        .option("password", connection_properties["password"]) \
        .mode("append") \
        .save()

# =========================
# 1. Read data
# =========================
source1_df = read_from_postgres("source1", "craft_market_wide")
source2_masters_products = read_from_postgres("source2", "craft_market_masters_products")
source2_orders_customers = read_from_postgres("source2", "craft_market_orders_customers")
source3_orders = read_from_postgres("source3", "craft_market_orders")
source3_craftsmans = read_from_postgres("source3", "craft_market_craftsmans")
source3_customers = read_from_postgres("source3", "craft_market_customers")

all_columns = [
    'order_id',
    'order_created_date',
    'order_completion_date',
    'order_status',
    'craftsman_id',
    'craftsman_name',
    'craftsman_address',
    'craftsman_birthday',
    'craftsman_email',
    'product_id',
    'product_name',
    'product_description',
    'product_type',
    'product_price',
    'customer_id',
    'customer_name',
    'customer_address',
    'customer_birthday',
    'customer_email'
]

# 1st source
source1_df = source1_df.select(all_columns)

# 2nd source
source2_df = source2_masters_products.join(
    source2_orders_customers,
    (source2_masters_products.product_id == source2_orders_customers.product_id) &
    (source2_masters_products.craftsman_id == source2_orders_customers.craftsman_id)
).select(
    source2_masters_products.craftsman_id,
    source2_masters_products.craftsman_name,
    source2_masters_products.craftsman_address,
    source2_masters_products.craftsman_birthday,
    source2_masters_products.craftsman_email,
    source2_masters_products.product_id,
    source2_masters_products.product_name,
    source2_masters_products.product_description,
    source2_masters_products.product_type,
    source2_masters_products.product_price,
    source2_orders_customers.customer_id,
    source2_orders_customers.customer_name,
    source2_orders_customers.customer_address,
    source2_orders_customers.customer_birthday,
    source2_orders_customers.customer_email,
    source2_orders_customers.order_id,
    source2_orders_customers.order_created_date,
    source2_orders_customers.order_completion_date,
    source2_orders_customers.order_status
).select(all_columns)

# 3rd source
source3_df = source3_orders.join(
    source3_craftsmans,
    source3_orders.craftsman_id == source3_craftsmans.craftsman_id
).join(
    source3_customers,
    source3_orders.customer_id == source3_customers.customer_id
).select(
    source3_orders.order_id,
    source3_orders.order_created_date,
    source3_orders.order_completion_date,
    source3_orders.order_status,
    source3_craftsmans.craftsman_id,
    source3_craftsmans.craftsman_name,
    source3_craftsmans.craftsman_address,
    source3_craftsmans.craftsman_birthday,
    source3_craftsmans.craftsman_email,
    source3_orders.product_id,
    source3_orders.product_name,
    source3_orders.product_description,
    source3_orders.product_type,
    source3_orders.product_price,
    source3_customers.customer_id,
    source3_customers.customer_name,
    source3_customers.customer_address,
    source3_customers.customer_birthday,
    source3_customers.customer_email
).select(all_columns)

all_sources_df = (source1_df.union(source2_df).union(source3_df)).distinct()

# =========================
# 2. d-customers
# =========================
d_customers_df = read_from_postgres("dwh", "d_customers")
customers_columns = ['customer_name', 'customer_address', 'customer_birthday', 'customer_email']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_customers_df = (
    all_sources_df.select(customers_columns)
    .distinct()
    .exceptAll(d_customers_df.select(customers_columns))
    .withColumn("load_dttm", current_timestamp())
).cache()

# Записываем оставшиеся записи в таблицу измерений
write_to_postgres(new_customers_df, "dwh", "d_customers")

d_customers_df = read_from_postgres("dwh", "d_customers")

# Присоединяем customer_id из оригинальной таблицы d_customers_df
new_customers_df = new_customers_df.alias("new_customers").join(
    d_customers_df.alias("d_customers"),
    (
        (col("new_customers.customer_name") == col("d_customers.customer_name")) &
        (col("new_customers.customer_address") == col("d_customers.customer_address")) &
        (col("new_customers.customer_birthday") == col("d_customers.customer_birthday")) &
        (col("new_customers.customer_email") == col("d_customers.customer_email"))
    ),
    how='left'
).select(
    col("d_customers.customer_id").alias("customer_id"),
    col("new_customers.customer_name"),
    col("new_customers.customer_address"),
    col("new_customers.customer_birthday"),
    col("new_customers.customer_email"),
    col("new_customers.load_dttm")
)

# обновление customer_id на фактический
all_sources_df = all_sources_df.alias("all_sources_df").join(
    d_customers_df.alias("d_customers_df"),
    (
        (col("all_sources_df.customer_name") == col("d_customers_df.customer_name")) &
        (col("all_sources_df.customer_address") == col("d_customers_df.customer_address")) &
        (col("all_sources_df.customer_birthday") == col("d_customers_df.customer_birthday")) &
        (col("all_sources_df.customer_email") == col("d_customers_df.customer_email"))
    ),
    how='left'
).select(
    col("d_customers_df.customer_id").alias("customer_id"),
    col('all_sources_df.order_id'),
    col('all_sources_df.order_created_date'),
    col('all_sources_df.order_completion_date'),
    col('all_sources_df.order_status'),
    col('all_sources_df.craftsman_id'),
    col('all_sources_df.craftsman_name'),
    col('all_sources_df.craftsman_address'),
    col('all_sources_df.craftsman_birthday'),
    col('all_sources_df.craftsman_email'),
    col('all_sources_df.product_id'),
    col('all_sources_df.product_name'),
    col('all_sources_df.product_description'),
    col('all_sources_df.product_type'),
    col('all_sources_df.product_price'),
    col('all_sources_df.customer_name'),
    col('all_sources_df.customer_address'),
    col('all_sources_df.customer_birthday'),
    col('all_sources_df.customer_email')
)

# =========================
# 3. d-products
# =========================
d_products_df = read_from_postgres("dwh", "d_products")
products_columns = ['product_name', 'product_description', 'product_type', 'product_price']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_products_df = (
    all_sources_df.select(products_columns)
    .distinct()
    .exceptAll(d_products_df.select(products_columns))
    .withColumn("load_dttm", current_timestamp())
).cache()

# Записываем оставшиеся записи в таблицу измерений
write_to_postgres(new_products_df, "dwh", "d_products")
d_products_df = read_from_postgres("dwh", "d_products")

# Присоединяем product_id из оригинальной таблицы d_products_df
new_products_df = new_products_df.alias("new_products").join(
    d_products_df.alias("d_products"),
    (
        (col("new_products.product_name") == col("d_products.product_name")) &
        (col("new_products.product_description") == col("d_products.product_description")) &
        (col("new_products.product_type") == col("d_products.product_type")) &
        (col("new_products.product_price") == col("d_products.product_price"))
    ),
    how='left'
).select(
    col("d_products.product_id").alias("product_id"),
    col("new_products.product_name"),
    col("new_products.product_description"),
    col("new_products.product_type"),
    col("new_products.product_price")
)

# обновление product_id на фактический
all_sources_df = all_sources_df.alias("all_sources_df").join(
    d_products_df.alias("d_products_df"),
    (
        (col("all_sources_df.product_name") == col("d_products_df.product_name")) &
        (col("all_sources_df.product_description") == col("d_products_df.product_description")) &
        (col("all_sources_df.product_type") == col("d_products_df.product_type")) &
        (col("all_sources_df.product_price") == col("d_products_df.product_price"))
    ),
    how='left'
).select(
    col("d_products_df.product_id").alias("product_id"),
    col('all_sources_df.order_id'),
    col('all_sources_df.order_created_date'),
    col('all_sources_df.order_completion_date'),
    col('all_sources_df.order_status'),
    col('all_sources_df.craftsman_id'),
    col('all_sources_df.craftsman_name'),
    col('all_sources_df.craftsman_address'),
    col('all_sources_df.craftsman_birthday'),
    col('all_sources_df.craftsman_email'),
    col('all_sources_df.customer_id'),
    col('all_sources_df.product_name'),
    col('all_sources_df.product_description'),
    col('all_sources_df.product_type'),
    col('all_sources_df.product_price'),
    col('all_sources_df.customer_name'),
    col('all_sources_df.customer_address'),
    col('all_sources_df.customer_birthday'),
    col('all_sources_df.customer_email')
)

# =========================
# 4. d-craftsmans
# =========================
d_craftsmans_df = read_from_postgres("dwh", "d_craftsmans")
craftsmans_columns = ['craftsman_name', 'craftsman_address', 'craftsman_birthday', 'craftsman_email']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_craftsmans_df = (
    all_sources_df.select(craftsmans_columns)
    .distinct()
    .exceptAll(d_craftsmans_df.select(craftsmans_columns))
    .withColumn("load_dttm", current_timestamp())
).cache()

# Записываем оставшиеся записи в таблицу измерений
write_to_postgres(new_craftsmans_df, "dwh", "d_craftsmans")
d_craftsmans_df = read_from_postgres("dwh", "d_craftsmans")

# Присоединяем craftsman_id из оригинальной таблицы d_craftsmans_df
new_craftsmans_df = new_craftsmans_df.alias("new_craftsmans").join(
    d_craftsmans_df.alias("d_craftsmans"),
    (
        (col("new_craftsmans.craftsman_name") == col("d_craftsmans.craftsman_name")) &
        (col("new_craftsmans.craftsman_address") == col("d_craftsmans.craftsman_address")) &
        (col("new_craftsmans.craftsman_birthday") == col("d_craftsmans.craftsman_birthday")) &
        (col("new_craftsmans.craftsman_email") == col("d_craftsmans.craftsman_email"))
    ),
    how='left'
).select(
    col("d_craftsmans.craftsman_id").alias("craftsman_id"),
    col("new_craftsmans.craftsman_name"),
    col("new_craftsmans.craftsman_address"),
    col("new_craftsmans.craftsman_birthday"),
    col("new_craftsmans.craftsman_email")
)

# обновление craftsman_id на фактический
all_sources_df = all_sources_df.alias("all_sources_df").join(
    d_craftsmans_df.alias("d_craftsmans_df"),
    (
        (col("all_sources_df.craftsman_name") == col("d_craftsmans_df.craftsman_name")) &
        (col("all_sources_df.craftsman_address") == col("d_craftsmans_df.craftsman_address")) &
        (col("all_sources_df.craftsman_birthday") == col("d_craftsmans_df.craftsman_birthday")) &
        (col("all_sources_df.craftsman_email") == col("d_craftsmans_df.craftsman_email"))
    ),
    how='left'
).select(
    col("d_craftsmans_df.craftsman_id").alias("craftsman_id"),
    col('all_sources_df.order_id'),
    col('all_sources_df.order_created_date'),
    col('all_sources_df.order_completion_date'),
    col('all_sources_df.order_status'),
    col('all_sources_df.product_id'),
    col('all_sources_df.craftsman_name'),
    col('all_sources_df.craftsman_address'),
    col('all_sources_df.craftsman_birthday'),
    col('all_sources_df.craftsman_email'),
    col('all_sources_df.customer_id'),
    col('all_sources_df.product_name'),
    col('all_sources_df.product_description'),
    col('all_sources_df.product_type'),
    col('all_sources_df.product_price'),
    col('all_sources_df.customer_name'),
    col('all_sources_df.customer_address'),
    col('all_sources_df.customer_birthday'),
    col('all_sources_df.customer_email')
)

# =========================
# 5. f-orders
# =========================
f_orders_df = read_from_postgres("dwh", "f_orders")
orders_columns = ['product_id', 'craftsman_id', 'customer_id', 'order_created_date', 'order_completion_date', 'order_status']

# Удаляем из объединенного DataFrame записи, которые уже существуют в таблице измерений
new_orders_df = (
    all_sources_df.select(orders_columns)
    .distinct()
    .exceptAll(f_orders_df.select(orders_columns))
    .withColumn("load_dttm", current_timestamp())
).cache()

# Записываем оставшиеся записи в таблицу измерений
write_to_postgres(new_orders_df, "dwh", "f_orders")
f_orders_df = read_from_postgres("dwh", "f_orders")

# Присоединяем order_id из оригинальной таблицы f_orders_df
new_orders_df = new_orders_df.alias("new_orders").join(
    f_orders_df.alias("f_orders"),
    (
        (col("new_orders.product_id") == col("f_orders.product_id")) &
        (col("new_orders.craftsman_id") == col("f_orders.craftsman_id")) &
        (col("new_orders.customer_id") == col("f_orders.customer_id")) &
        (col("new_orders.order_created_date") == col("f_orders.order_created_date")) &
        (col("new_orders.order_completion_date") == col("f_orders.order_completion_date")) &
        (col("new_orders.order_status") == col("f_orders.order_status"))
    ),
    how='left'
).select(
    col("f_orders.order_id").alias("order_id"),
    col("new_orders.product_id"),
    col("new_orders.craftsman_id"),
    col("new_orders.customer_id"),
    col("new_orders.order_created_date"),
    col("new_orders.order_completion_date"),
    col("new_orders.order_status")
)

# обновление order_id на фактический
all_sources_df = all_sources_df.alias("all_sources_df").join(
    f_orders_df.alias("f_orders_df"),
    (
        (col("all_sources_df.product_id") == col("f_orders_df.product_id")) &
        (col("all_sources_df.craftsman_id") == col("f_orders_df.craftsman_id")) &
        (col("all_sources_df.customer_id") == col("f_orders_df.customer_id")) &
        (col("all_sources_df.order_created_date") == col("f_orders_df.order_created_date")) &
        (col("all_sources_df.order_completion_date") == col("f_orders_df.order_completion_date")) &
        (col("all_sources_df.order_status") == col("f_orders_df.order_status"))
    ),
    how='left'
).select(
    col("f_orders_df.order_id").alias("order_id"),
    col('all_sources_df.craftsman_id'),
    col('all_sources_df.order_created_date'),
    col('all_sources_df.order_completion_date'),
    col('all_sources_df.order_status'),
    col('all_sources_df.product_id'),
    col('all_sources_df.craftsman_name'),
    col('all_sources_df.craftsman_address'),
    col('all_sources_df.craftsman_birthday'),
    col('all_sources_df.craftsman_email'),
    col('all_sources_df.customer_id'),
    col('all_sources_df.product_name'),
    col('all_sources_df.product_description'),
    col('all_sources_df.product_type'),
    col('all_sources_df.product_price'),
    col('all_sources_df.customer_name'),
    col('all_sources_df.customer_address'),
    col('all_sources_df.customer_birthday'),
    col('all_sources_df.customer_email')
)

# =========================
# 6. Инкрементальная таблица
# =========================
# Добавляем отчетный период в таблицу заказов
new_orders_df = new_orders_df.withColumn("report_period", date_format("order_created_date", "yyyy-MM"))

# Создаем окно для нахождения самой популярной категории
window_spec = Window.partitionBy("craftsman_id", "report_period").orderBy(desc("count_category"))

# Считаем количество товаров каждой категории для каждого мастера и периода
product_category_counts = (
    new_orders_df.join(
        new_products_df.alias("products"),
        new_orders_df.product_id == col("products.product_id")
    )
    .groupBy("craftsman_id", "report_period", "products.product_type")
    .agg(count("products.product_type").alias("count_category"))
)

# Находим топ-1 категорию для каждого мастера и периода
top_categories = (
    product_category_counts
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .select("craftsman_id", "report_period", "product_type")
)

# Добавляем расчет агрегатов
new_craftsman_report_datamart_df = (
    new_orders_df.join(
        new_craftsmans_df.alias("craftsmans"),
        new_orders_df.craftsman_id == col("craftsmans.craftsman_id")
    )
    .join(
        new_products_df.alias("products"),
        new_orders_df.product_id == col("products.product_id")
    )
    .join(
        new_customers_df.alias("customers"),
        new_orders_df.customer_id == col("customers.customer_id")
    )
    .groupBy(
        col("craftsmans.craftsman_id"),
        col("craftsmans.craftsman_name"),
        col("craftsmans.craftsman_address"),
        col("craftsmans.craftsman_birthday"),
        col("craftsmans.craftsman_email"),
        col("report_period")
    )
    .agg(
        sum(col("products.product_price") * 0.9).alias("craftsman_money"),
        sum(col("products.product_price") * 0.1).alias("platform_money"),
        count(new_orders_df.order_id).alias("count_order"),
        avg(col("products.product_price")).alias("avg_price_order"),
        avg(expr("DATEDIFF(current_date(), customers.customer_birthday) / 365.25")).alias("avg_age_customer"),
        median(expr("DATEDIFF(order_completion_date, order_created_date)")).alias("median_time_order_completed"),
        sum(when(new_orders_df.order_status == "created", 1).otherwise(0)).alias("count_order_created"),
        sum(when(new_orders_df.order_status == "in_progress", 1).otherwise(0)).alias("count_order_in_progress"),
        sum(when(new_orders_df.order_status == "delivery", 1).otherwise(0)).alias("count_order_delivery"),
        sum(when(new_orders_df.order_status == "done", 1).otherwise(0)).alias("count_order_done"),
        sum(when(new_orders_df.order_status != "done", 1).otherwise(0)).alias("count_order_not_done")
    )
)

# Присоединяем топ-1 категорию товаров
new_craftsman_report_datamart_df = new_craftsman_report_datamart_df.join(
    top_categories,
    ["craftsman_id", "report_period"],
    "left"
).withColumnRenamed("product_type", "top_product_category").withColumn("load_dttm", current_timestamp())

# Check
new_craftsman_report_datamart_df.toPandas().head(5)


Unnamed: 0,craftsman_id,report_period,craftsman_name,craftsman_address,craftsman_birthday,craftsman_email,craftsman_money,platform_money,count_order,avg_price_order,avg_age_customer,median_time_order_completed,count_order_created,count_order_in_progress,count_order_delivery,count_order_done,count_order_not_done,top_product_category,load_dttm
0,100,2021-01,Rafe Torbeck,1296 Farragut Plaza,1998-02-12,htillerjq@adobe.com,143.1,15.9,0,159.0,26.836413,,1,0,0,0,1,clothes,2025-01-01 23:41:44.193389
1,143,2020-02,Myriam Knocker,2 Paget Center,1998-12-17,cdecreuzeqp@sciencedaily.com,226.8,25.2,0,252.0,21.434634,,0,0,0,0,1,Beauty & Hygiene,2025-01-01 23:41:44.193389
2,241,2021-08,Hart Elintune,07 Rusk Parkway,2002-02-16,mropkins2q@biblegateway.com,91.8,10.2,0,102.0,24.492813,,0,0,1,0,1,clothes,2025-01-01 23:41:44.193389
3,854,2022-11,Myron Sinnock,90698 Rusk Way,1996-07-30,jharloweg6@youku.com,51.3,5.7,0,57.0,20.845996,,0,0,1,0,1,clothes,2025-01-01 23:41:44.193389
4,1279,2022-07,Pryce Gilbard,21 Westend Alley,1991-11-07,ljudkinfw@upenn.edu,65.7,7.3,1,73.0,28.528405,3.0,0,0,0,1,0,clothes,2025-01-01 23:41:44.193389


In [2]:
craftsman_report_datamart = read_from_postgres("dwh", "craftsman_report_datamart")

# Ключевые столбцы для идентификации строки
key_columns = ["craftsman_id", "report_period"]

# 1. Определяем новые строки (INSERT)
new_rows_df = new_craftsman_report_datamart_df.join(
    craftsman_report_datamart.select(*key_columns),
    key_columns,
    how="left_anti"
)

# 2. Определяем обновленные строки (UPDATE)
updated_rows_df = new_craftsman_report_datamart_df.alias("new").join(
    craftsman_report_datamart.alias("existing"),
    key_columns,
    how="inner"
).filter(
    # Проверяем, есть ли расхождения в значимых столбцах
    (col("new.craftsman_money") != col("existing.craftsman_money")) |
    (col("new.platform_money") != col("existing.platform_money")) |
    (col("new.count_order") != col("existing.count_order")) |
    (col("new.avg_price_order") != col("existing.avg_price_order")) |
    (col("new.avg_age_customer") != col("existing.avg_age_customer")) |
    (col("new.median_time_order_completed") != col("existing.median_time_order_completed")) |
    (col("new.count_order_created") != col("existing.count_order_created")) |
    (col("new.count_order_in_progress") != col("existing.count_order_in_progress")) |
    (col("new.count_order_delivery") != col("existing.count_order_delivery")) |
    (col("new.count_order_done") != col("existing.count_order_done")) |
    (col("new.count_order_not_done") != col("existing.count_order_not_done")) |
    (col("new.top_product_category") != col("existing.top_product_category"))
).select("new.*")

In [3]:
!pip install psycopg2-binary
import psycopg2
from psycopg2.extras import execute_values
from pyspark.sql import SparkSession, Window, Row


def update_existing_rows(df, schema, table):
    # Преобразуем DataFrame в список словарей
    rows = df.collect()
    update_query = f"""
    UPDATE {schema}.{table} AS target
    SET craftsman_money = data.craftsman_money,
        platform_money = data.platform_money,
        count_order = data.count_order,
        avg_price_order = data.avg_price_order,
        avg_age_customer = data.avg_age_customer,
        median_time_order_completed = data.median_time_order_completed,
        count_order_created = data.count_order_created,
        count_order_in_progress = data.count_order_in_progress,
        count_order_delivery = data.count_order_delivery,
        count_order_done = data.count_order_done,
        count_order_not_done = data.count_order_not_done,
        top_product_category = data.top_product_category,
        load_dttm = data.load_dttm
    FROM (VALUES %s) AS data (
        craftsman_id, report_period, craftsman_money, platform_money, count_order, avg_price_order,
        avg_age_customer, median_time_order_completed, count_order_created, count_order_in_progress,
        count_order_delivery, count_order_done, count_order_not_done, top_product_category, load_dttm
    )
    WHERE target.craftsman_id = data.craftsman_id AND target.report_period = data.report_period
    """
    conn = psycopg2.connect(
        dbname="postgres",
        user="postgres_user",
        password="postgres_password",
        host="postgres",
        port="5432"
    )
    with conn.cursor() as cursor:
        values = [
            (
                row["craftsman_id"], row["report_period"], row["craftsman_money"], row["platform_money"],
                row["count_order"], row["avg_price_order"], row["avg_age_customer"], 
                row["median_time_order_completed"], row["count_order_created"],
                row["count_order_in_progress"], row["count_order_delivery"], row["count_order_done"],
                row["count_order_not_done"], row["top_product_category"], row["load_dttm"]
            )
            for row in rows
        ]
        execute_values(cursor, update_query, values)
    conn.commit()
    conn.close()


Collecting psycopg2-binary
  Using cached psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Using cached psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10


In [4]:
write_to_postgres(new_rows_df.drop("load_dttm"), "dwh", "craftsman_report_datamart")

In [8]:
# Обновление таблицы инкрементальных загрузок
load_dates_dma = read_from_postgres("dwh", "load_dates_craftsman_report_datamart")

load_dates_data = [Row(load_dttm=date.today())]  # Используем текущую дату из модуля datetime
load_dates_df = spark.createDataFrame(load_dates_data).exceptAll(load_dates_dma.select('load_dttm'))

# Запись в таблицу
write_to_postgres(load_dates_df, "dwh", "load_dates_craftsman_report_datamart")

read_from_postgres("dwh", "load_dates_craftsman_report_datamart").toPandas().sort_values(by='load_dttm', ascending=False).head(10)

Unnamed: 0,id,load_dttm
0,1,2025-01-01


In [9]:
import gc
gc.collect()

749