In [0]:
%python
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit, current_timestamp


# Explicit schemas for the raw CSV files. Every column is declared as a
# (name, Spark type, nullable) triple and assembled into a StructType by _schema.
# NOTE(review): Spark file sources generally treat user-supplied schemas as
# nullable, so the `False` flags below are likely informational rather than a
# read-time constraint (key columns are null-checked explicitly later in the
# notebook) -- confirm for the target runtime.
def _schema(*columns):
    """Return a StructType with one StructField per (name, type, nullable) triple."""
    return StructType([StructField(name, dtype, nullable) for name, dtype, nullable in columns])


circuits_schema = _schema(
    ("circuitId", IntegerType(), False),
    ("circuitRef", StringType(), True),
    ("name", StringType(), True),
    ("location", StringType(), True),
    ("country", StringType(), True),
    ("lat", DoubleType(), True),
    ("lng", DoubleType(), True),
    ("alt", DoubleType(), True),
    ("url", StringType(), True),
)

constructors_schema = _schema(
    ("constructorId", IntegerType(), False),
    ("constructorRef", StringType(), True),
    ("name", StringType(), True),
    ("nationality", StringType(), True),
    ("url", StringType(), True),
)

constructor_results_schema = _schema(
    ("constructorResultsId", IntegerType(), False),
    ("raceId", IntegerType(), False),
    ("constructorId", IntegerType(), False),
    ("points", DoubleType(), True),
    ("status", StringType(), True),
)

constructor_standings_schema = _schema(
    ("constructorStandingsId", IntegerType(), False),
    ("raceId", IntegerType(), False),
    ("constructorId", IntegerType(), False),
    ("points", DoubleType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("wins", IntegerType(), True),
)

drivers_schema = _schema(
    ("driverId", IntegerType(), False),
    ("driverRef", StringType(), True),
    ("number", IntegerType(), True),
    ("code", StringType(), True),
    ("forename", StringType(), True),
    ("surname", StringType(), True),
    ("dob", DateType(), True),
    ("nationality", StringType(), True),
    ("url", StringType(), True),
)

driver_standings_schema = _schema(
    ("driverStandingsId", IntegerType(), False),
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), False),
    ("points", DoubleType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("wins", IntegerType(), True),
)

lap_times_schema = _schema(
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), False),
    ("lap", IntegerType(), False),
    ("position", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
)

pit_stops_schema = _schema(
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), False),
    ("stop", IntegerType(), False),
    ("lap", IntegerType(), False),
    ("time", StringType(), True),
    ("duration", StringType(), True),
    ("milliseconds", IntegerType(), True),
)

qualifying_schema = _schema(
    ("qualifyId", IntegerType(), False),
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), False),
    ("constructorId", IntegerType(), False),
    ("number", IntegerType(), True),
    ("position", IntegerType(), True),
    ("q1", StringType(), True),
    ("q2", StringType(), True),
    ("q3", StringType(), True),
)

races_schema = _schema(
    ("raceId", IntegerType(), False),
    ("year", IntegerType(), False),
    ("round", IntegerType(), True),
    ("circuitId", IntegerType(), False),
    ("name", StringType(), True),
    ("date", DateType(), True),
    ("time", StringType(), True),
    ("url", StringType(), True),
    ("fp1_date", DateType(), True),
    ("fp1_time", StringType(), True),
    ("fp2_date", DateType(), True),
    ("fp2_time", StringType(), True),
    ("fp3_date", DateType(), True),
    ("fp3_time", StringType(), True),
    ("quali_date", DateType(), True),
    ("quali_time", StringType(), True),
    ("sprint_date", DateType(), True),
    ("sprint_time", StringType(), True),
)

results_schema = _schema(
    ("resultId", IntegerType(), False),
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), False),
    ("constructorId", IntegerType(), False),
    ("number", IntegerType(), True),
    ("grid", IntegerType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("positionOrder", IntegerType(), True),
    ("points", DoubleType(), True),
    ("laps", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
    ("fastestLap", IntegerType(), True),
    ("rank", IntegerType(), True),
    ("fastestLapTime", StringType(), True),
    ("fastestLapSpeed", StringType(), True),
    ("statusId", IntegerType(), True),
)

sprint_results_schema = _schema(
    ("resultId", IntegerType(), False),
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), False),
    ("constructorId", IntegerType(), False),
    ("number", IntegerType(), True),
    ("grid", IntegerType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("positionOrder", IntegerType(), True),
    ("points", DoubleType(), True),
    ("laps", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
    ("fastestLap", IntegerType(), True),
    ("fastestLapTime", StringType(), True),
    ("statusId", IntegerType(), True),
)

seasons_schema = _schema(
    ("year", IntegerType(), False),
    ("url", StringType(), True),
)

status_schema = _schema(
    ("statusId", IntegerType(), False),
    ("status", StringType(), True),
)


In [0]:
%python
RAW_BASE_PATH = "/Volumes/workspace/f1/raw/"


def read_csv_with_schema(filename: str, schema: StructType, extra_options=None):
    """Read one raw CSV file located under RAW_BASE_PATH using an explicit schema.

    Args:
        filename: File name relative to RAW_BASE_PATH, e.g. "circuits.csv".
        schema: Spark schema applied to the CSV columns; the file's first line
            is treated as a header.
        extra_options: Optional mapping of additional DataFrameReader options,
            applied after the header/schema settings. For example,
            {"nullValue": "\\N"} if the raw files encode missing values as
            ``\\N`` -- NOTE(review): check the raw data before relying on this.

    Returns:
        A Spark DataFrame over the CSV file.
    """
    # RAW_BASE_PATH ends with "/", so joining naively would produce
    # ".../raw//circuits.csv"; normalize both sides of the separator.
    path = f"{RAW_BASE_PATH.rstrip('/')}/{filename.lstrip('/')}"
    reader = spark.read.option("header", "true").schema(schema)
    if extra_options:
        reader = reader.options(**extra_options)
    return reader.csv(path)

In [0]:
%python
def _load_raw(table: str, schema):
    """Load ``<table>.csv`` through read_csv_with_schema with the given schema."""
    return read_csv_with_schema(f"{table}.csv", schema)


# One raw DataFrame per source file, read in a fixed order.
circuits_raw_df = _load_raw("circuits", circuits_schema)
constructors_raw_df = _load_raw("constructors", constructors_schema)
constructor_results_raw_df = _load_raw("constructor_results", constructor_results_schema)
constructor_standings_raw_df = _load_raw("constructor_standings", constructor_standings_schema)
drivers_raw_df = _load_raw("drivers", drivers_schema)
driver_standings_raw_df = _load_raw("driver_standings", driver_standings_schema)
lap_times_raw_df = _load_raw("lap_times", lap_times_schema)
pit_stops_raw_df = _load_raw("pit_stops", pit_stops_schema)
qualifying_raw_df = _load_raw("qualifying", qualifying_schema)
races_raw_df = _load_raw("races", races_schema)
results_raw_df = _load_raw("results", results_schema)
sprint_results_raw_df = _load_raw("sprint_results", sprint_results_schema)
seasons_raw_df = _load_raw("seasons", seasons_schema)
status_raw_df = _load_raw("status", status_schema)


In [0]:
%python
from pyspark.sql.functions import when

def validate_results_generic(results_df, races_df, table_label: str,
                             min_year: int = 1950, max_year: int = 2030):
    """
    Split a results-like DataFrame into valid and invalid rows.

    A row is valid when raceId, driverId and constructorId are all non-null
    and the race's year (looked up in ``races_df`` by raceId) lies in the
    inclusive range ``[min_year, max_year]``. Rows whose raceId has no match
    in ``races_df`` get a null year and are therefore classified as invalid.

    Args:
        results_df: Results-like DataFrame with raceId, driverId and
            constructorId columns.
        races_df: Races DataFrame providing raceId and year.
        table_label: Name embedded in the ``error_reason`` text.
        min_year: Inclusive lower bound on the race year (default 1950).
        max_year: Inclusive upper bound on the race year (default 2030).

    Returns:
        ``(valid_df, invalid_df)``. ``valid_df`` has the joined ``year``
        column dropped; ``invalid_df`` keeps ``year`` and adds an
        ``error_reason`` string column. Because the join uses
        ``on="raceId"``, raceId becomes the first column of both outputs.
    """
    res_with_year = results_df.join(
        races_df.select("raceId", "year"), on="raceId", how="left"
    )

    is_valid = (
        col("raceId").isNotNull()
        & col("driverId").isNotNull()
        & col("constructorId").isNotNull()
        & col("year").between(min_year, max_year)
    )
    # Comparisons against a null year evaluate to null rather than False;
    # collapse to a strict boolean so every row lands in exactly one output.
    valid_flag = when(is_valid, lit(True)).otherwise(lit(False))

    valid = res_with_year.filter(valid_flag)

    # Complementary filter instead of exceptAll: it yields the same rows
    # without shuffling and comparing every full row against the valid set.
    invalid = res_with_year.filter(~valid_flag).withColumn(
        "error_reason",
        lit(f"Invalid {table_label} record (keys or year out of [{min_year},{max_year}])")
    )

    return valid.drop("year"), invalid

# Validate race results
valid_results_df, invalid_results_df = validate_results_generic(
    results_raw_df, races_raw_df, "race_results"
)

# Validate sprint results
valid_sprint_results_df, invalid_sprint_results_df = validate_results_generic(
    sprint_results_raw_df, races_raw_df, "sprint_results"
)


def _append_quarantine(df, table_name):
    """Append rejected rows to the Delta table ``table_name``, stamped with an ingestion_ts column."""
    (df
        .withColumn("ingestion_ts", current_timestamp())
        .write
        .mode("append")
        .format("delta")
        .saveAsTable(table_name))


# Save invalid rows into quarantine Delta tables for auditing. These tables
# are appended to, so each run adds its rejected rows again under a new
# ingestion_ts.
_append_quarantine(invalid_results_df, "f1.f1_invalid_results")
_append_quarantine(invalid_sprint_results_df, "f1.f1_invalid_sprint_results")

# Validation for lap times: raceId, driverId and lap must be present, and
# milliseconds must be either null or non-negative.
_lap_is_valid = (
    col("raceId").isNotNull()
    & col("driverId").isNotNull()
    & col("lap").isNotNull()
    & (col("milliseconds").isNull() | (col("milliseconds") >= 0))
)
# Split with complementary filters on a strict boolean flag instead of
# exceptAll, which would shuffle and compare full rows of both DataFrames.
_lap_valid_flag = when(_lap_is_valid, lit(True)).otherwise(lit(False))

valid_lap_times_df = lap_times_raw_df.filter(_lap_valid_flag)

invalid_lap_times_df = lap_times_raw_df.filter(~_lap_valid_flag).withColumn(
    "error_reason",
    lit("Invalid lap record (missing IDs or negative ms)")
)

_append_quarantine(invalid_lap_times_df, "f1.f1_invalid_lap_times")

In [0]:
%python
def write_bronze(df, table_name, partition_cols=None):
    """Persist ``df`` as the Delta table ``table_name``, replacing its contents.

    Args:
        df: Spark DataFrame to write.
        table_name: Target table name, e.g. "f1.f1_bronze_races".
        partition_cols: Optional column name (or list of names) to partition
            by; any falsy value writes the table unpartitioned.
    """
    delta_writer = df.write.format("delta").mode("overwrite")
    if not partition_cols:
        delta_writer.saveAsTable(table_name)
        return
    delta_writer.partitionBy(partition_cols).saveAsTable(table_name)

# Bronze tables in write order: (source DataFrame, target Delta table, partition column).
# Races are partitioned by year because it's natural and used often; seasons is a
# small dimension left unpartitioned, as are the other dimension / fact tables.
# Lap times, results and sprint results are written from their validated DataFrames.
_BRONZE_TARGETS = [
    (races_raw_df, "f1.f1_bronze_races", "year"),
    (seasons_raw_df, "f1.f1_bronze_seasons", None),
    (status_raw_df, "f1.f1_bronze_status", None),
    (circuits_raw_df, "f1.f1_bronze_circuits", None),
    (constructors_raw_df, "f1.f1_bronze_constructors", None),
    (constructor_results_raw_df, "f1.f1_bronze_constructor_results", None),
    (constructor_standings_raw_df, "f1.f1_bronze_constructor_standings", None),
    (drivers_raw_df, "f1.f1_bronze_drivers", None),
    (driver_standings_raw_df, "f1.f1_bronze_driver_standings", None),
    (valid_lap_times_df, "f1.f1_bronze_lap_times", None),
    (pit_stops_raw_df, "f1.f1_bronze_pit_stops", None),
    (qualifying_raw_df, "f1.f1_bronze_qualifying", None),
    (valid_results_df, "f1.f1_bronze_results", None),
    (valid_sprint_results_df, "f1.f1_bronze_sprint_results", None),
]

for source_df, target_table, partition_col in _BRONZE_TARGETS:
    write_bronze(source_df, target_table, partition_col)

print("Bronze ingestion completed.")