# Formula 1 DLT Bronze Layer
This notebook creates all bronze layer DLT tables for the Formula 1 ETL pipeline.

In [None]:
import dlt
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, FloatType
from pyspark.sql.functions import current_timestamp

In [None]:
# Landing zone mount path - configure file_date as needed.
# file_date is looked up under the Spark conf key "file_date", with "2021-03-21"
# passed as the default value; it selects the dated sub-folder of the landing
# zone that every bronze table function below reads from.
file_date = spark.conf.get("file_date", "2021-03-21")
landing_path = f"/mnt/gualterformula1dl/landing/{file_date}"

## Circuits (CSV)

In [None]:
# Explicit schema for circuits.csv; fields are nullable unless stated otherwise.
# NOTE(review): circuitId is declared non-nullable, but Spark file readers may
# relax that to nullable on read - confirm before relying on it as a constraint.
circuits_schema = (
    StructType()
    .add("circuitId", IntegerType(), nullable=False)
    .add("circuitRef", StringType())
    .add("name", StringType())
    .add("location", StringType())
    .add("country", StringType())
    .add("lat", DoubleType())
    .add("lng", DoubleType())
    .add("alt", IntegerType())
    .add("url", StringType())
)

@dlt.table(comment="Bronze layer: raw circuits data from Formula 1")
def bronze_circuits():
    """Load circuits.csv from the landing path as the bronze circuits table.

    The file is read with a header row and the explicit ``circuits_schema``.

    Returns:
        The parsed rows with an extra ``ingestion_date`` column holding the
        current timestamp.
    """
    source_file = f"{landing_path}/circuits.csv"
    raw_circuits = spark.read.csv(source_file, schema=circuits_schema, header=True)
    return raw_circuits.withColumn("ingestion_date", current_timestamp())

## Races (CSV)

In [None]:
# Explicit schema for races.csv; fields are nullable unless stated otherwise.
# "date" is parsed as a DateType while "time" is kept as a plain string.
# NOTE(review): raceId is declared non-nullable, but Spark file readers may
# relax that to nullable on read - confirm before relying on it as a constraint.
races_schema = (
    StructType()
    .add("raceId", IntegerType(), nullable=False)
    .add("year", IntegerType())
    .add("round", IntegerType())
    .add("circuitId", IntegerType())
    .add("name", StringType())
    .add("date", DateType())
    .add("time", StringType())
    .add("url", StringType())
)

@dlt.table(comment="Bronze layer: raw races data from Formula 1")
def bronze_races():
    """Load races.csv from the landing path as the bronze races table.

    The file is read with a header row and the explicit ``races_schema``.

    Returns:
        The parsed rows with an extra ``ingestion_date`` column holding the
        current timestamp.
    """
    source_file = f"{landing_path}/races.csv"
    raw_races = spark.read.csv(source_file, schema=races_schema, header=True)
    return raw_races.withColumn("ingestion_date", current_timestamp())

## Constructors (JSON)

In [None]:
# DDL-style schema string for constructors.json, one column per line.
# The adjacent literals concatenate into a single comma-separated definition.
constructors_schema = (
    "constructorId INT, "
    "constructorRef STRING, "
    "name STRING, "
    "nationality STRING, "
    "url STRING"
)

@dlt.table(comment="Bronze layer: raw constructors data from Formula 1")
def bronze_constructors():
    """Load constructors.json from the landing path as the bronze constructors table.

    The file is read with the DDL schema string ``constructors_schema``.

    Returns:
        The parsed rows with an extra ``ingestion_date`` column holding the
        current timestamp.
    """
    source_file = f"{landing_path}/constructors.json"
    raw_constructors = spark.read.json(source_file, schema=constructors_schema)
    return raw_constructors.withColumn("ingestion_date", current_timestamp())

## Drivers (JSON with nested schema)

In [None]:
# Nested struct used for the "name" field of each driver record.
name_schema = (
    StructType()
    .add("forename", StringType())
    .add("surname", StringType())
)

# Explicit schema for drivers.json; "name" is the nested struct defined above.
# Fields are nullable unless stated otherwise.
# NOTE(review): driverId is declared non-nullable, but Spark file readers may
# relax that to nullable on read - confirm before relying on it as a constraint.
drivers_schema = (
    StructType()
    .add("driverId", IntegerType(), nullable=False)
    .add("driverRef", StringType())
    .add("number", IntegerType())
    .add("code", StringType())
    .add("name", name_schema)
    .add("dob", DateType())
    .add("nationality", StringType())
    .add("url", StringType())
)

@dlt.table(comment="Bronze layer: raw drivers data from Formula 1")
def bronze_drivers():
    """Load drivers.json from the landing path as the bronze drivers table.

    The file is read with the explicit ``drivers_schema``, which includes the
    nested ``name`` struct.

    Returns:
        The parsed rows with an extra ``ingestion_date`` column holding the
        current timestamp.
    """
    source_file = f"{landing_path}/drivers.json"
    raw_drivers = spark.read.json(source_file, schema=drivers_schema)
    return raw_drivers.withColumn("ingestion_date", current_timestamp())

## Results (JSON)

In [None]:
# Explicit schema for results.json; fields are nullable unless stated otherwise.
# NOTE(review): resultId is declared non-nullable, but Spark file readers may
# relax that to nullable on read - confirm before relying on it as a constraint.
# NOTE(review): statusId is typed as a string while the other *Id fields are
# integers - confirm this is intentional before downstream joins depend on it.
results_schema = (
    StructType()
    .add("resultId", IntegerType(), nullable=False)
    .add("raceId", IntegerType())
    .add("driverId", IntegerType())
    .add("constructorId", IntegerType())
    .add("number", IntegerType())
    .add("grid", IntegerType())
    .add("position", IntegerType())
    .add("positionText", StringType())
    .add("positionOrder", IntegerType())
    .add("points", FloatType())
    .add("laps", IntegerType())
    .add("time", StringType())
    .add("milliseconds", IntegerType())
    .add("fastestLap", IntegerType())
    .add("rank", IntegerType())
    .add("fastestLapTime", StringType())
    .add("fastestLapSpeed", FloatType())
    .add("statusId", StringType())
)

@dlt.table(comment="Bronze layer: raw results data from Formula 1")
def bronze_results():
    """Load results.json from the landing path as the bronze results table.

    The file is read with the explicit ``results_schema``.

    Returns:
        The parsed rows with an extra ``ingestion_date`` column holding the
        current timestamp.
    """
    source_file = f"{landing_path}/results.json"
    raw_results = spark.read.json(source_file, schema=results_schema)
    return raw_results.withColumn("ingestion_date", current_timestamp())

## Pit Stops (JSON multiLine)

In [None]:
# Explicit schema for pit_stops.json; fields are nullable unless stated otherwise.
# "stop", "time" and "duration" are kept as strings.
# NOTE(review): raceId is declared non-nullable, but Spark file readers may
# relax that to nullable on read - confirm before relying on it as a constraint.
pit_stops_schema = (
    StructType()
    .add("raceId", IntegerType(), nullable=False)
    .add("driverId", IntegerType())
    .add("stop", StringType())
    .add("lap", IntegerType())
    .add("time", StringType())
    .add("duration", StringType())
    .add("milliseconds", IntegerType())
)

@dlt.table(comment="Bronze layer: raw pit stops data from Formula 1")
def bronze_pit_stops():
    """Load pit_stops.json from the landing path as the bronze pit stops table.

    The file is read with the explicit ``pit_stops_schema`` and the
    ``multiLine`` option enabled.

    Returns:
        The parsed rows with an extra ``ingestion_date`` column holding the
        current timestamp.
    """
    source_file = f"{landing_path}/pit_stops.json"
    raw_pit_stops = spark.read.json(
        source_file,
        schema=pit_stops_schema,
        multiLine=True,
    )
    return raw_pit_stops.withColumn("ingestion_date", current_timestamp())

## Lap Times (CSV folder)

In [None]:
# Explicit schema for the CSV files under the lap_times folder; fields are
# nullable unless stated otherwise.
# NOTE(review): raceId is declared non-nullable, but Spark file readers may
# relax that to nullable on read - confirm before relying on it as a constraint.
lap_times_schema = (
    StructType()
    .add("raceId", IntegerType(), nullable=False)
    .add("driverId", IntegerType())
    .add("lap", IntegerType())
    .add("position", IntegerType())
    .add("time", StringType())
    .add("milliseconds", IntegerType())
)

@dlt.table(comment="Bronze layer: raw lap times data from Formula 1")
def bronze_lap_times():
    """Load the lap_times CSV folder from the landing path as the bronze lap times table.

    The path points at a folder rather than a single file. No header option is
    set, so the reader's default header handling applies, and columns are
    typed by the explicit ``lap_times_schema``.

    Returns:
        The parsed rows with an extra ``ingestion_date`` column holding the
        current timestamp.
    """
    source_dir = f"{landing_path}/lap_times"
    raw_lap_times = spark.read.csv(source_dir, schema=lap_times_schema)
    return raw_lap_times.withColumn("ingestion_date", current_timestamp())

## Qualifying (JSON multiLine folder)

In [None]:
# Explicit schema for the JSON files under the qualifying folder; fields are
# nullable unless stated otherwise. q1/q2/q3 are kept as strings.
# NOTE(review): qualifyId is declared non-nullable, but Spark file readers may
# relax that to nullable on read - confirm before relying on it as a constraint.
qualifying_schema = (
    StructType()
    .add("qualifyId", IntegerType(), nullable=False)
    .add("raceId", IntegerType())
    .add("driverId", IntegerType())
    .add("constructorId", IntegerType())
    .add("number", IntegerType())
    .add("position", IntegerType())
    .add("q1", StringType())
    .add("q2", StringType())
    .add("q3", StringType())
)

@dlt.table(comment="Bronze layer: raw qualifying data from Formula 1")
def bronze_qualifying():
    """Load the qualifying JSON folder from the landing path as the bronze qualifying table.

    The path points at a folder rather than a single file. It is read with the
    explicit ``qualifying_schema`` and the ``multiLine`` option enabled.

    Returns:
        The parsed rows with an extra ``ingestion_date`` column holding the
        current timestamp.
    """
    source_dir = f"{landing_path}/qualifying"
    raw_qualifying = spark.read.json(
        source_dir,
        schema=qualifying_schema,
        multiLine=True,
    )
    return raw_qualifying.withColumn("ingestion_date", current_timestamp())