# Silver to Gold - All Tables
This notebook curates the data in the silver table to the gold layer 

In [0]:
silver_catalog_schema = "catadb360dev.f1silver"
gold_catalog_schema = "catadb360dev.f1gold"

## Circuits Table

In [0]:
df_circuits = spark.table(silver_catalog_schema + ".circuits")
df_circuits = df_circuits.drop("alt", "url", "circuitRef")
display(df_circuits)

In [0]:
df_circuits.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".circuits")

## Constructor Results Table

In [0]:
from pyspark.sql.functions import col

df_constructor_results = spark.table(silver_catalog_schema + ".constructor_results")
df_races = spark.table(silver_catalog_schema + ".races").select("raceId", col("name").alias("race"), "year")
df_constructors = spark.table(silver_catalog_schema + ".constructors").select("constructorId", col("name").alias("constructor"))
df_constructor_results = df_constructor_results.join(df_races, on="raceId", how="left")
df_constructor_results = df_constructor_results.join(df_constructors, on="constructorId", how="left")
df_constructor_results = df_constructor_results.drop("raceId", "constructorId", "status")
display(df_constructor_results)

In [0]:
df_constructor_results.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".constructor_results")

## Constructor Standings

In [0]:
df_constructor_standings = spark.table(silver_catalog_schema + ".constructor_standings")
df_races = spark.table(silver_catalog_schema + ".races").select("raceId", col("name").alias("race"), "year", "date")
df_constructors = spark.table(silver_catalog_schema + ".constructors").select("constructorId", col("name").alias("constructor"))
df_constructor_standings = df_constructor_standings.join(df_races, on="raceId", how="left")
df_constructor_standings = df_constructor_standings.join(df_constructors, on="constructorId", how="left")
df_constructor_standings = df_constructor_standings.drop("raceId", "constructorId", "positionText")
display(df_constructor_standings)

In [0]:
df_constructor_standings.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".constructor_standings")

## Constructors

In [0]:
df_constructors = spark.table(silver_catalog_schema + ".constructors")
df_constructors = df_constructors.drop("constructorRef", "url")
display(df_constructors)

In [0]:
df_constructors.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".constructors")

## Driver Standings

In [0]:
from pyspark.sql.functions import col, concat, lit

df_driver_standings = spark.table(silver_catalog_schema + ".driver_standings")
df_races = spark.table(silver_catalog_schema + ".races").select(
    "raceId",
    col("name").alias("race"),
    "year",
    "date"
)
df_drivers = spark.table(silver_catalog_schema + ".drivers").select(
    "driverId",
    concat(col("forename"), lit(" "), col("surname")).alias("driver")
)
df_driver_standings = df_driver_standings.join(df_races, on="raceId", how="left")
df_driver_standings = df_driver_standings.join(df_drivers, on="driverId", how="left")
df_driver_standings = df_driver_standings.drop("raceId", "driverId", "positionText")
display(df_driver_standings)

In [0]:
df_driver_standings.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".driver_standings")

## Drivers

In [0]:
from pyspark.sql.functions import concat, col, lit

df_drivers = spark.table(silver_catalog_schema + ".drivers")
df_drivers = df_drivers.withColumn("driver", concat(col("forename"), lit(" "), col("surname")))
df_drivers = df_drivers.drop("code", "url", "driverRef", "forename", "surname")
display(df_drivers)

In [0]:
df_drivers.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".drivers")

## Lap Times

In [0]:
df_lap_times = spark.table(silver_catalog_schema + ".lap_times")
df_races = spark.table(silver_catalog_schema + ".races").select("raceId", col("name").alias("race"), "year", "date")
df_drivers = spark.table(silver_catalog_schema + ".drivers").select(
    "driverId",
    concat(col("forename"), lit(" "), col("surname")).alias("driver")
)
df_lap_times = df_lap_times.join(df_races, on="raceId", how="left")
df_lap_times = df_lap_times.join(df_drivers, on="driverId", how="left")
df_lap_times = df_lap_times.withColumn("seconds", col("milliseconds") / 1000)
df_lap_times = df_lap_times.drop("driverId", "raceId", "driverRef", "forename", "surname", "time", "milliseconds")
display(df_lap_times)

In [0]:
df_lap_times.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".lap_times")

## Pit Stops

In [0]:
from pyspark.sql.functions import col, concat, lit, expr

df_pit_stops = spark.table(silver_catalog_schema + ".pit_stops")
df_races = spark.table(silver_catalog_schema + ".races").select(
    "raceId",
    col("name").alias("race"),
    "year"
)
df_drivers = spark.table(silver_catalog_schema + ".drivers").select(
    "driverId",
    concat(col("forename"), lit(" "), col("surname")).alias("driver")
)
df_pit_stops = df_pit_stops.join(df_races, on="raceId", how="left")
df_pit_stops = df_pit_stops.join(df_drivers, on="driverId", how="left")

df_pit_stops = df_pit_stops.withColumn("seconds", col("milliseconds") / 1000)
df_pit_stops = df_pit_stops.drop("driverId", "raceId", "milliseconds", "duration")
display(df_pit_stops)

In [0]:
df_pit_stops.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".pit_stops")

## Qualifying

In [0]:
from pyspark.sql.functions import col, concat, lit

df_qualifying = spark.table(silver_catalog_schema + ".qualifying")
df_races = spark.table(silver_catalog_schema + ".races").select(
    "raceId",
    col("name").alias("race"),
    "year"
)
df_drivers = spark.table(silver_catalog_schema + ".drivers").select(
    "driverId",
    concat(col("forename"), lit(" "), col("surname")).alias("driver")
)
df_constructors = spark.table(silver_catalog_schema + ".constructors").select("constructorId", col("name"))
df_qualifying = df_qualifying.join(df_races, on="raceId", how="left")
df_qualifying = df_qualifying.join(df_drivers, on="driverId", how="left")
df_qualifying = df_qualifying.join(df_constructors, on="constructorId", how="left")

df_qualifying = df_qualifying.withColumnRenamed("name", "constructor")
df_qualifying = df_qualifying.drop("constructorId","driverId", "raceId", "q1", "q2", "q3")
display(df_qualifying)

In [0]:
df_qualifying.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".qualifying")

## Races

In [0]:
from pyspark.sql.functions import col, concat, lit

df_races = spark.table(silver_catalog_schema + ".races")
df_circuits = spark.table(silver_catalog_schema + ".circuits").select("circuitId", col("name").alias("circuit"),"country")
df_races = df_races.join(df_circuits, on="circuitId", how="left")

df_races = df_races.drop("circuitId", "raceId", "url", "fp1_date", "fp1_time", "fp2_date", "fp2_time", "fp3_date", "fp3_time", "quali_date", "quali_time", "sprint_date", "sprint_time")

display(df_races)

In [0]:
df_races.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".races")

## Results

In [0]:
from pyspark.sql.functions import col, concat, lit

df_results = spark.table(silver_catalog_schema + ".results")
df_races = spark.table(silver_catalog_schema + ".races").select(
    "raceId",
    col("name").alias("race"),
    "year"
)
df_drivers = spark.table(silver_catalog_schema + ".drivers").select(
    "driverId",
    concat(col("forename"), lit(" "), col("surname")).alias("driver")
)
df_constructors = spark.table(silver_catalog_schema + ".constructors").select("constructorId", col("name").alias("constructorName"))
df_status = spark.table(silver_catalog_schema + ".status").select("statusId", "status")

df_results = df_results.join(df_races, on="raceId", how="left")
df_results = df_results.join(df_drivers, on="driverId", how="left")
df_results = df_results.join(df_constructors, on="constructorId", how="left")
df_results = df_results.join(df_status, on="statusId", how="left")

df_results = df_results.drop("constructorId","driverId","raceId","positionText","statusId")

display(df_results)

In [0]:
df_results.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".results")

## Seasons

In [0]:
df_seasons = spark.table(silver_catalog_schema + ".seasons")
df_seasons = df_seasons.drop("url")

display(df_seasons)

In [0]:
df_seasons.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".seasons")

## Sprint Results

In [0]:
df_sprint_results = spark.table(silver_catalog_schema + ".sprint_results")
df_races = spark.table(silver_catalog_schema + ".races").select(
    "raceId",
    col("name").alias("race"),
    "year"
)
df_drivers = spark.table(silver_catalog_schema + ".drivers").select(
    "driverId",
    concat(col("forename"), lit(" "), col("surname")).alias("driver")
)
df_constructors = spark.table(silver_catalog_schema + ".constructors").select("constructorId", col("name").alias("constructorName"))
df_status = spark.table(silver_catalog_schema + ".status").select("statusId", "status")

df_sprint_results = df_sprint_results.join(df_races, on="raceId", how="left")
df_sprint_results = df_sprint_results.join(df_drivers, on="driverId", how="left")
df_sprint_results = df_sprint_results.join(df_constructors, on="constructorId", how="left")
df_sprint_results = df_sprint_results.join(df_status, on="statusId", how="left")
df_sprint_results = df_results.drop("constructorId","driverId","raceId","positionText","statusId")

display(df_sprint_results)

In [0]:
df_sprint_results.write.mode("overwrite").saveAsTable(gold_catalog_schema + ".sprint_results")

## Table Maintenance

In [0]:
spark.sql(
    f"ALTER TABLE {gold_catalog_schema}.constructor_standings SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')"
)
spark.sql(
    f"ALTER TABLE {gold_catalog_schema}.constructor_standings DROP COLUMNS (positionText)"
)