In [1]:
import time
from datetime import date, datetime
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import DoubleType

In [2]:
# Local Spark session on all available cores; getOrCreate() returns the
# already-active session if this cell is re-run.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("App")
    .getOrCreate()
)

In [5]:
from pyspark.sql.types import StructType, StructField, DateType, IntegerType, StringType, TimestampType, FloatType

In [6]:
# Explicit schemas for the CSV extracts and for the load log.
# Amounts, balances and exchange rates use DoubleType: FloatType is a 32-bit
# float (~7 significant digits) and would silently round values such as
# 12345678.91 -> 12345679.0. Consider DecimalType once precision/scale are known.
schema_ft_posting_f = StructType([
    StructField("oper_date", DateType(), nullable=False),
    StructField("credit_account_rk", IntegerType(), nullable=False),
    StructField("debet_account_rk", IntegerType(), nullable=False),
    StructField("credit_amount", DoubleType()),
    StructField("debet_amount", DoubleType())
])

schema_ft_balance_f = StructType([
    StructField("on_date", DateType(), nullable=False),
    StructField("account_rk", IntegerType(), nullable=False),
    StructField("currency_rk", IntegerType(), nullable=False),
    StructField("balance_out", DoubleType(), nullable=False)
])

schema_md_exchange_rate_d = StructType([
    StructField("data_actual_date", DateType(), nullable=False),
    StructField("currency_rk", IntegerType(), nullable=False),
    StructField("reduced_rate", DoubleType()),
    StructField("code_iso_num", StringType())
])

schema_md_ledger_account_s = StructType([
    StructField("ledger_account", StringType(), nullable=False),
    StructField("start_date", DateType(), nullable=False),
    StructField("end_date", DateType()),
    StructField("chapter", StringType()),
    StructField("account_name", StringType())
])

schema_md_account_d = StructType([
    StructField("data_actual_date", DateType(), nullable=False),
    StructField("data_actual_end_date", DateType(), nullable=False),
    StructField("account_rk", IntegerType(), nullable=False),
    StructField("account_number", StringType(), nullable=False),
    StructField("char_type", StringType(), nullable=False),
    StructField("currency_rk", IntegerType(), nullable=False),
    StructField("currency_code", StringType(), nullable=False)
])

# One row per processed table: when its load started/ended and its name.
log_schema = StructType([
    StructField("datetime_start", TimestampType(), nullable=False),
    StructField("datetime_end", TimestampType(), nullable=True),
    StructField("name_table", StringType(), nullable=False)
])

In [7]:
# Initialise the load-log table from the previously saved CSV.
# On a fresh environment logs.csv may not exist yet (the load loop writes to
# logs_tmp, not logs.csv), so start from an empty table with the same schema
# instead of failing at read time.
logs_path = "/home/jovyan/practice/logs.csv"
if Path(logs_path).exists():
    logs_table = spark.read.csv(
        logs_path,
        header=True,
        schema=log_schema
    )
else:
    logs_table = spark.createDataFrame([], schema=log_schema)

In [17]:
# Source CSV extracts; each is written out as a parquet directory named after
# the file stem (e.g. .../data/ft_balance_f).
table_paths = [
    "/home/jovyan/practice/data/md_account_d.csv",
    "/home/jovyan/practice/data/md_currency_d.csv",
    "/home/jovyan/practice/data/md_exchange_rate_d.csv",
    "/home/jovyan/practice/data/ft_balance_f.csv",
    "/home/jovyan/practice/data/ft_posting_f.csv",
    "/home/jovyan/practice/data/md_ledger_account_s.csv"
]

# Explicit schema per table, keyed by file stem. A table without an entry
# (md_currency_d has no schema defined) falls back to schema inference instead
# of silently reusing whatever schema the previous iteration left behind.
# TODO: define schema_md_currency_d and add it here.
schemas_by_table = {
    "md_account_d": schema_md_account_d,
    "md_exchange_rate_d": schema_md_exchange_rate_d,
    "ft_balance_f": schema_ft_balance_f,
    "ft_posting_f": schema_ft_posting_f,
    "md_ledger_account_s": schema_md_ledger_account_s,
}

# Log rows produced by this run. Local to the cell so re-running it does not
# carry rows over from a previous execution.
run_log_rows = []

for source_path in table_paths:
    # Table name = file name without extension.
    target_table_name = Path(source_path.rstrip('/')).stem
    schema = schemas_by_table.get(target_table_name)

    # Start time taken on the driver; no Spark job is needed to read the clock.
    time_start = datetime.now()

    # Read the semicolon-separated extract. With an explicit schema this is
    # lazy: the rows are actually read when the parquet write below runs.
    df = spark.read.csv(
        source_path,
        schema=schema,
        inferSchema=schema is None,
        header=True,
        sep=";"
    )

    # Simulated processing delay.
    time.sleep(5)

    # Persist as parquet; this action executes the read above.
    output_path = f"/home/jovyan/practice/data/{target_table_name}"
    df.write.parquet(output_path, mode="overwrite")

    # End time only after the data has really been written, so the logged
    # duration covers the actual load.
    time_end = datetime.now()

    # Rewrite the log as: previously stored logs + every row of this run so far.
    # Overwrite rather than append: appending the full union on every iteration
    # duplicated the whole log history once per table.
    run_log_rows.append((time_start, time_end, target_table_name))
    updated_logs_table = logs_table.union(
        spark.createDataFrame(run_log_rows, schema=log_schema)
    )
    updated_logs_table.coalesce(1).write.csv(
        "/home/jovyan/practice/logs_tmp",
        header=True,
        mode="overwrite"
    )

    print(f"Обработка таблицы {target_table_name} завершена")

Обработка таблицы md_account_d завершена
Обработка таблицы md_currency_d завершена
Обработка таблицы md_exchange_rate_d завершена
Обработка таблицы ft_balance_f завершена
Обработка таблицы ft_posting_f завершена
Обработка таблицы md_ledger_account_s завершена


In [None]:
# Load the md_account_d parquet directory written by the load loop.
parquet_df = spark.read.format("parquet").load("/home/jovyan/practice/data/md_account_d")

In [None]:
# Print the column names and types of the loaded parquet table.
parquet_df.printSchema()

In [None]:
# Preview the first five rows.
parquet_df.show(n=5)

In [None]:
# Стратегия обновления (upsert): заменяем строки md_account_d по ключу account_rk

In [None]:
# Current state of md_account_d that the updates will be merged into.
existing_df = spark.read.format("parquet").load("/home/jovyan/practice/data/md_account_d")

In [None]:
# Upsert key: account_rk.
# Replacement row: set currency_code to "228" for account_rk 34156787.
# The row is built with existing_df's own schema so names, date columns and
# integer widths match exactly. With string dates and inferred bigint ids,
# unionByName would have to coerce the parquet date/int columns (e.g. date +
# string -> string), changing the table's schema.
# Tuple order assumes existing_df columns follow schema_md_account_d:
# data_actual_date, data_actual_end_date, account_rk, account_number,
# char_type, currency_rk, currency_code.
updates_df = spark.createDataFrame(
    [
        (date(2018, 1, 1), date(2018, 1, 31), 34156787, "30236826400000009001", "П", 31, "228"),
    ],
    schema=existing_df.schema,
)

In [None]:
# Upsert by key: keep every existing row whose account_rk is not being updated,
# then append the replacement rows (columns matched by name, not position).
untouched_rows = existing_df.join(updates_df, on="account_rk", how="left_anti")
df_combined = untouched_rows.unionByName(updates_df)

In [None]:
# Verify the upsert: the account should now appear once, with the replacement row's values.
df_combined.where(F.col("account_rk") == 34156787).show()