In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from datetime import datetime
import json

In [0]:
dbutils.widgets.text("pipeline_config_json", "", "Pipeline Config JSON (from 01_config)")
dbutils.widgets.text("run_mode", "full", "Run Mode")  # fallback only

# Configuration precedence: a non-empty JSON payload in the
# `pipeline_config_json` widget wins; the hard-coded values in the `else`
# branch are used only when that widget is left empty (e.g. interactive runs).
pipeline_config_json = dbutils.widgets.get("pipeline_config_json").strip()

if pipeline_config_json:
    pipeline_config = json.loads(pipeline_config_json)

    run_mode = pipeline_config.get("run_mode", "full")
    FULL_TABLE_NAME = pipeline_config["bronze_table"]
    VOLUME_PATH = pipeline_config["volume_path"]
    FILES_CONFIG = pipeline_config["files_to_process"]

    # `or {}` also covers an explicit `"incremental": null` in the payload,
    # for which `.get("incremental", {})` would return None and crash below.
    incr_cfg = pipeline_config.get("incremental") or {}
    merge_key_cols = incr_cfg.get("merge_key_cols", ["row_key"])

else:
    # Fallback
    run_mode = dbutils.widgets.get("run_mode")

    CATALOG = "ironman"
    BRONZE_SCHEMA = "bronze"
    TABLE_NAME = "ironman_results"
    FULL_TABLE_NAME = f"{CATALOG}.{BRONZE_SCHEMA}.{TABLE_NAME}"

    VOLUME_PATH = "/Volumes/ironman/default/landing"

    FILES_CONFIG = [
        {"filename": "2023_men.csv", "year": 2023, "gender": "M"},
        {"filename": "2023_women.csv", "year": 2023, "gender": "F"},
        # {"filename": "2024_men.csv", "year": 2024, "gender": "M"},
        # {"filename": "2024_women.csv", "year": 2024, "gender": "F"},
        # {"filename": "2025_men.csv", "year": 2025, "gender": "M"},
        # {"filename": "2025_women.csv", "year": 2025, "gender": "F"},
    ]

    merge_key_cols = ["row_key"]

# Normalise run_mode: the write step performs a full reload only for the exact
# string "full", so values like "Full" or " full " would otherwise silently
# fall through to an incremental merge. A missing/blank value means "full",
# matching the defaults used above.
run_mode = str(run_mode or "").strip().lower() or "full"

# A single merge key supplied as a bare string would otherwise be iterated
# character by character when the MERGE condition is built.
if isinstance(merge_key_cols, str):
    merge_key_cols = [merge_key_cols]

print(f"Target: {FULL_TABLE_NAME}")
print(f"Run Mode: {run_mode}")
print(f"Files to process: {[f['filename'] for f in FILES_CONFIG]}")
print(f"Merge keys: {merge_key_cols}")

In [0]:
# Diagnostic only: size of the JSON payload that configured this run
# (0 means the hard-coded fallback configuration was used). Re-uses the value
# read in the configuration cell rather than reading the widget again, so the
# number printed always matches what actually configured the run.
print("pipeline_config_json length:", len(pipeline_config_json))


In [0]:
def read_csv_with_metadata(spark, file_path: str, year: int, gender: str, null_token: str = "-"):
    """Read one results CSV as string columns and append load metadata.

    Schema inference is disabled, so every CSV column is read as a string.
    Cells exactly equal to ``null_token`` are converted to NULL. Five metadata
    columns are then appended: ``year`` (int), ``source_gender``,
    ``source_file``, ``load_timestamp`` and ``load_date``.

    Args:
        spark: Active SparkSession used for the read.
        file_path: Path of the CSV file (first line is the header); also
            recorded verbatim in ``source_file``.
        year: Value written to the ``year`` column.
        gender: Value written to the ``source_gender`` column.
        null_token: Exact cell value treated as missing. Defaults to ``"-"``.

    Returns:
        DataFrame with the CSV columns followed by the metadata columns.
    """
    raw_df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "false")
        .csv(file_path)
    )

    def _null_token_to_null(name: str):
        # Backtick-quote so header names containing dots are not parsed as
        # struct-field access; embedded backticks are escaped by doubling.
        column = F.col("`" + name.replace("`", "``") + "`")
        return F.when(column == null_token, None).otherwise(column).alias(name)

    # One select instead of a withColumn per column: each withColumn adds a
    # projection, and doing it in a loop can bloat the plan on wide files.
    cleaned_df = raw_df.select(*[_null_token_to_null(c) for c in raw_df.columns])

    return (
        cleaned_df
        .withColumn("year", F.lit(year).cast(IntegerType()))
        .withColumn("source_gender", F.lit(gender).cast(StringType()))
        .withColumn("source_file", F.lit(file_path).cast(StringType()))
        .withColumn("load_timestamp", F.current_timestamp())
        .withColumn("load_date", F.current_date())
    )

In [0]:
# Fail fast with a clear message instead of an opaque IndexError on
# dataframes[0] when no source files are configured.
if not FILES_CONFIG:
    raise ValueError(
        f"FILES_CONFIG is empty: no source files configured for {FULL_TABLE_NAME}"
    )

dataframes = []

for config in FILES_CONFIG:
    # Files are expected under `year=<year>` sub-folders of the volume.
    file_path = f"{VOLUME_PATH}/year={config['year']}/{config['filename']}"
    df = read_csv_with_metadata(spark, file_path, config["year"], config["gender"])
    # Logging only; note this count is a separate Spark action per file.
    row_count = df.count()
    dataframes.append(df)
    print(f"Read {row_count:,} rows from {config['filename']}")

# Combine by column name; allowMissingColumns null-fills columns that a given
# file does not have.
bronze_df = dataframes[0]
for df in dataframes[1:]:
    bronze_df = bronze_df.unionByName(df, allowMissingColumns=True)

print(f"\nTotal rows: {bronze_df.count():,}")

In [0]:
# Build row_key = "<year>_<source_gender>_<normalised name>_<dup_rank>", which
# is the default MERGE key for incremental loads. dup_rank numbers rows whose
# normalised names collide within the same year/source_gender.
bronze_df = bronze_df.withColumn(
    "athlete_name_clean",
    # Keep only ASCII letters/digits, lower-cased. A NULL athlete_name maps to
    # "" so row_key is never NULL: F.concat returns NULL if any input is NULL,
    # and a NULL key never satisfies the equality-based MERGE condition, so
    # the insert-only MERGE would re-insert such rows on every incremental run.
    # Coalescing before the window keeps NULL-name rows in the same partition
    # as ""-name rows, so dup_rank still keeps their keys distinct.
    F.coalesce(
        F.lower(F.regexp_replace(F.col("athlete_name"), "[^a-zA-Z0-9]", "")),
        F.lit(""),
    ),
)

# Tie-break colliding names by rank, then bib (NULLs last).
# NOTE(review): both columns are read as strings (inferSchema is off), so this
# ordering is lexicographic (e.g. "10" sorts before "9") — confirm acceptable.
window_spec = Window.partitionBy("year", "source_gender", "athlete_name_clean").orderBy(
    F.col("rank").asc_nulls_last(),
    F.col("bib").asc_nulls_last()
)
bronze_df = bronze_df.withColumn("dup_rank", F.row_number().over(window_spec))

bronze_df = bronze_df.withColumn(
    "row_key",
    F.concat(
        F.col("year").cast("string"),
        F.lit("_"),
        F.col("source_gender"),
        F.lit("_"),
        F.col("athlete_name_clean"),
        F.lit("_"),
        F.col("dup_rank").cast("string")
    )
)

In [0]:
# The helper columns were only needed to build row_key.
bronze_df = bronze_df.drop("athlete_name_clean", "dup_rank")

# row_key must be unique and non-NULL: an insert-only MERGE inserts every
# unmatched source row (duplicates included), and a NULL key never matches an
# equality condition, so either defect would leave duplicates in the target.
dup_count = bronze_df.groupBy("row_key").count().filter(F.col("count") > 1).count()
null_key_count = bronze_df.filter(F.col("row_key").isNull()).count()
print(f"Duplicate keys: {dup_count}")
print(f"Null keys: {null_key_count}")

print("Schema:")
bronze_df.printSchema()

if dup_count or null_key_count:
    raise ValueError(
        f"row_key is not a unique, non-null key: {dup_count} duplicated "
        f"key(s), {null_key_count} null key(s)"
    )

In [0]:
# Whether the target table is already registered in the catalog; if not, the
# write step always performs a full (overwrite) load.
table_exists = spark.catalog.tableExists(tableName=FULL_TABLE_NAME)

In [0]:
def _quote_ident(name: str) -> str:
    """Backtick-quote a column name for use inside a Spark SQL expression."""
    return "`" + name.replace("`", "``") + "`"


# Equality join on every configured key; identifiers are quoted so key names
# with spaces, dots or other special characters still parse.
merge_condition = " AND ".join(
    f"target.{_quote_ident(c)} = source.{_quote_ident(c)}" for c in merge_key_cols
)

if (not table_exists) or (run_mode == "full"):
    print(f"Writing full load to {FULL_TABLE_NAME}")
    (
        bronze_df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(FULL_TABLE_NAME)
    )
else:
    # An empty key list would yield an empty MERGE condition and a cryptic
    # parse error; fail with an actionable message instead.
    if not merge_condition:
        raise ValueError(
            f"Incremental load into {FULL_TABLE_NAME} requires at least one merge key column"
        )
    print(f"Incremental merge (insert-only) into {FULL_TABLE_NAME}")
    delta_table = DeltaTable.forName(spark, FULL_TABLE_NAME)
    (
        delta_table.alias("target")
        .merge(bronze_df.alias("source"), merge_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )

print("Write complete")

In [0]:
# Read the freshly written table back and report what landed in it.
result_df = spark.table(FULL_TABLE_NAME)
print(f"Table: {FULL_TABLE_NAME}")

total_rows = result_df.count()
print(f"Rows: {total_rows:,}")

# Row counts per (year, source_gender) slice, sorted for easy scanning.
rows_per_slice = (
    result_df
    .groupBy("year", "source_gender")
    .count()
    .orderBy("year", "source_gender")
)
display(rows_per_slice)

# Run Delta OPTIMIZE (file compaction) on the target table.
spark.sql(f"OPTIMIZE {FULL_TABLE_NAME}")

divider = "=" * 50
print("\n" + divider)
print("BRONZE LAYER COMPLETE")
print(divider)

In [0]:
# End the notebook and hand "SUCCESS" back as its exit value (presumably read
# by an orchestrating notebook/job — confirm against the caller).
dbutils.notebook.exit("SUCCESS")