 Bronze → Silver ETL (Incremental with Merge)

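Loads events from the Bronze table into the Silver tables, using settings from a YAML config file. New data is merged on `event_id`, so re-running the load does not duplicate events already in Silver. Note that each run re-reads the full Bronze table rather than only newly arrived records.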

In [0]:
import yaml
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, to_timestamp, date_format
from pyspark.sql.types import ArrayType, StringType
from delta.tables import DeltaTable


# getOrCreate() hands back the already-active SparkSession when one exists
# (as in a notebook) and only builds a new one otherwise.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)


In [0]:
# Location of the YAML file describing the Bronze source, the Silver target and load options.
config_path = "/Workspace/Users/pablo.sanchez.armas@gmail.com/lingokids/configs/silver/events.yaml"


# Read with an explicit encoding so parsing does not depend on the platform default.
# safe_load only builds plain Python objects (no arbitrary object construction).
with open(config_path, "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# safe_load returns None for an empty file (or a scalar/list for malformed content);
# fail here with a clear message instead of a TypeError on the lookups below.
if not isinstance(config, dict):
    raise ValueError(f"Config file {config_path} is empty or is not a YAML mapping")

missing_sections = [name for name in ("source", "target", "load") if name not in config]
if missing_sections:
    raise ValueError(f"Config file {config_path} is missing section(s): {missing_sections}")

source_cfg = config["source"]
target_cfg = config["target"]
load_cfg = config["load"]

# Fully qualified catalog.schema.table names for the Bronze source and the main Silver target.
bronze_table = f"{source_cfg['catalog']}.{source_cfg['schema']}.{source_cfg['table']}"
silver_table = f"{target_cfg['catalog']}.{target_cfg['schema']}.{target_cfg['table']}"

# Column(s) used to match incoming rows against existing Silver rows during MERGE.
merge_key = load_cfg.get("merge_key")
if not merge_key:
    raise ValueError("merge_key must be defined in the config file")

print(f"Source: {bronze_table}")
print(f"Target: {silver_table}")
print(f"Merge Key: {merge_key}")

In [0]:
# Read from bronze.
# NOTE: every run re-reads the whole Bronze table; duplicates are kept out of Silver
# by the MERGE below, not by reading only new records.
df_bronze = spark.read.table(bronze_table)

# The config's merge_key may be a single column name or a list (composite key).
merge_keys = [merge_key] if isinstance(merge_key, str) else list(merge_key)

# Keep one row per merge key: Delta MERGE raises an error when several source rows
# match the same target row, and the first (overwrite) load would otherwise persist
# the duplicates. Which of the duplicate rows survives is not deterministic.
df_bronze = df_bronze.dropDuplicates(merge_keys)

# Add processing timestamp
df_bronze = df_bronze.withColumn("updated_at", current_timestamp())

# Parse occurred_at (ISO-8601 with milliseconds and a zone offset) and format it as a
# yyyyMMdd date key
df_bronze = df_bronze.withColumn(
    "date_key",
    date_format(to_timestamp("occurred_at", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), "yyyyMMdd"),
)

# -------------------------
# Split the "data" struct into its own DataFrame, one column per field,
# keyed by the merge key(s)
# -------------------------
df_data = df_bronze.select(
    *merge_keys,
    col("data.*"),
    "updated_at",
)

# -------------------------
# Split "context" into a separate DataFrame and flatten nested structs:
# device fields keep their own names, os/app fields get an os_/app_ prefix
# -------------------------
df_context = df_bronze.select(
    *merge_keys,
    col("context.device.*"),
    col("context.os.name").alias("os_name"),
    col("context.app.build").alias("app_build"),
    col("context.os.version").alias("os_version"),
    col("context.app.version").alias("app_version"),
    "updated_at",
)

# The main events table keeps every column except the two structs split out above
df_bronze = df_bronze.drop("data", "context")

# -------------------------
# Dictionary of DataFrames and their target Silver table names
# -------------------------
silver_dfs = {
    silver_table: df_bronze,  # main events Silver table
    f"{target_cfg['catalog']}.{target_cfg['schema']}.events_data": df_data,  # data table
    f"{target_cfg['catalog']}.{target_cfg['schema']}.events_context": df_context,  # context table
}

# MERGE join condition built from the configured key(s); backticks quote the column names.
merge_condition = " AND ".join(f"t.`{key}` = s.`{key}`" for key in merge_keys)

# -------------------------
# Create each table on first run; afterwards upsert into it on the merge key(s)
# -------------------------
for table_name, df in silver_dfs.items():
    if not spark.catalog.tableExists(table_name):
        print(f"Creating Silver table {table_name}")
        df.write.format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(table_name)
    else:
        print(f"Merging new data into {table_name}")
        delta_table = DeltaTable.forName(spark, table_name)
        # NOTE(review): updated_at changes on every run, so every matched row is
        # rewritten each time; add a condition to whenMatchedUpdateAll if that is too costly.
        delta_table.alias("t").merge(
            df.alias("s"),
            merge_condition
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()


print("✅ Merge completed: new events added")