In [0]:
from pyspark.sql.functions import col ,year, month, dayofmonth, weekofyear,to_date ,sum as spark_sum, isnan
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType


In [0]:

# Читаємо останню дату в Silver рівні
latest_date_row = spark.read.format("delta").load("/FileStore/delta/silver_stock") \
    .selectExpr("max(Date) as latest_date").collect()

# Перевіряємо, чи результат порожній
latest_date = latest_date_row[0]["latest_date"] if latest_date_row and latest_date_row[0]["latest_date"] else None

# Встановлюємо порогову дату для історичних даних
historical_threshold = datetime.strptime("2025-06-10", "%Y-%m-%d").date()

# Логіка завантаження Bronze даних
if latest_date is None or latest_date < historical_threshold:
    # Завантажуємо ВСІ історичні дані
    bronze_df = spark.read.format("delta").load("/FileStore/delta/bronze_stock")
else:
    # Завантажуємо ТІЛЬКИ нові записи
    bronze_df = spark.read.format("delta").load("/FileStore/delta/bronze_stock") \
        .filter(col("Date") > latest_date)

In [0]:
#Перетворення типів стовпців та видалення дублікатів

silver_df = (
    bronze_df
    .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))\
    .withColumn("Company", col("Company").cast("string"))\
    .withColumn("Open", col("Open").cast("double"))\
    .withColumn("High", col("High").cast("double"))\
    .withColumn("Low", col("Low").cast("double"))\
    .withColumn("Close", col("Close").cast("double"))\
    .withColumn("Volume", col("Volume").cast("long"))\
    .dropDuplicates(["Date", "Company"])) #Видалення дублікатів
    
    
    
    



In [0]:
#Заміна NaN на 0
cols_to_fill = ["Open", "Close", "Volume"]
silver_df = silver_df.fillna(0, subset=cols_to_fill)

In [0]:
#Додавання додаткових стовпців з датою
silver_df = (silver_df
    .withColumn("Year", year(col("Date")))\
    .withColumn("Month", month(col("Date")))\
    .withColumn("Day", dayofmonth(col("Date")))\
    .withColumn("Week", weekofyear(col("Date"))))

In [0]:
#Запис даних у форматі delta у таблицю "silver_stock"
silver_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .partitionBy("Year", "Month") \
    .save("/FileStore/delta/silver_stock")

In [0]:
# Підрахунок кількості записів до видалення дублікатів
record_before = bronze_df.count()
#  Підрахунок кількості записів після видалення дублікатів
record_after = silver_df.count()
duplicates_removed = record_before - record_after


In [0]:
#Підрахунок пропущених значень Null та NaN

nan_counts_df = bronze_df.select([
    spark_sum((col(c).isNull() | isnan(col(c))).cast("int")).alias(c) for c in bronze_df.columns
])

null_total = nan_counts_df.selectExpr(
    "stack(" + str(len(nan_counts_df.columns)) + "," +
    ",".join([f"'{c}', `{c}`" for c in nan_counts_df.columns]) +
    ") as (column_name, null_count)"
).agg({"null_count": "sum"}).collect()[0][0]


In [0]:
#Додає дані до лог-таблиці
log_data = [(
    datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    "silver_stock_data",
    "/FileStore/delta/bronze_stock",
    record_after,
    null_total,
    duplicates_removed,
    None,
    None,
    None
)]

log_silver_df = spark.createDataFrame(log_data, schema=log_schema)
#Запис даних до лог-таблиці "etl_log" у форматі delta
log_silver_df.write.format("delta").mode("append").save("/FileStore/delta/etl_log")