## Produce driver ranking summary

In [None]:
# Notebook parameters.
# TARGET_TYPE / TARGET_FORMAT are looked up through oidlUtils so that a
# workflow job can set them; the second argument is presumably the fallback
# used when the job does not supply a value -- TODO confirm.
target_type = oidlUtils.parameters.getParameter("TARGET_TYPE", "table")
target_format = oidlUtils.parameters.getParameter("TARGET_FORMAT", "delta")

# Catalog / schema pairs for each layer referenced by this notebook.
silver_catalog, silver_schema = "f1_silver", "silver"
gold_catalog, gold_schema = "f1_gold", "gold"
adw_catalog, adw_schema = "f1_gold_adw", "f1_gold"

# Gold table names for the driver ranking output (delta and parquet variants).
gold_table_dlt, gold_table_par = "f1_drivers_ranking_dlt", "f1_drivers_ranking_par"



In [None]:
# Step 1: Load the silver-layer source tables used by the ranking.
def _read_silver_table(table_name):
    """Return the DataFrame for `table_name` in the silver catalog/schema."""
    return spark.read.table(f"{silver_catalog}.{silver_schema}.{table_name}")


driver_standings_df = _read_silver_table("f1_driver_standings_dlt")
results_df = _read_silver_table("f1_results_dlt")
races_df = _read_silver_table("f1_races_dlt")
drivers_df = _read_silver_table("f1_drivers_dlt")

In [None]:
# Step 2: Attach race_year to the driver standings and aggregate at the
# year level (not the race level).
from pyspark.sql.functions import max, col

# Lookup frames restricted to the columns the joins need.
_race_year_lookup = races_df.select("race_id", "race_year")
_driver_ref_lookup = drivers_df.select("driver_id", "driver_ref")

# One row per (race_year, driver_id, driver_ref). The yearly totals are the
# maximum points / wins seen across that year's standings rows -- presumably
# because standings are cumulative within a season; TODO confirm.
driver_summary_df = (
    driver_standings_df
    .join(_race_year_lookup, on="race_id", how="inner")
    # Left join: standings rows without a matching driver keep a null driver_ref.
    .join(_driver_ref_lookup, on="driver_id", how="left")
    .groupBy("race_year", "driver_id", "driver_ref")
    .agg(
        max("points").alias("total_points"),
        max("wins").alias("total_wins"),
    )
)

In [None]:
# Step 3: Get one constructor_id (team_id) per driver/year from results
# (joined to races to obtain race_year).
#
# A driver can appear with more than one constructor in the same year.
# dropDuplicates() on (driver_id, race_year) would keep an arbitrary one of
# those rows, so the chosen team could differ between runs. Instead, pick
# deterministically: the constructor with the most result rows for that
# driver/year, ties broken by the lowest constructor_id.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Number of result rows per (driver, year, constructor); rows with a null in
# any of the three key columns are discarded first.
_team_entries_df = (
    results_df
    .join(races_df.select("race_id", "race_year"), on="race_id", how="inner")
    .select("driver_id", "constructor_id", "race_year")
    .dropna(subset=["driver_id", "constructor_id", "race_year"])
    .groupBy("driver_id", "race_year", "constructor_id")
    .agg(F.count(F.lit(1)).alias("_entries"))
)

# Total ordering inside each driver/year so row_number() == 1 is unambiguous.
_team_pick_window = (
    Window.partitionBy("driver_id", "race_year")
    .orderBy(F.col("_entries").desc(), F.col("constructor_id").asc())
)

# Same output columns as before: driver_id, constructor_id, race_year,
# with exactly one row per (driver_id, race_year).
driver_team_df = (
    _team_entries_df
    .withColumn("_pick", F.row_number().over(_team_pick_window))
    .filter(F.col("_pick") == 1)
    .select("driver_id", "constructor_id", "race_year")
)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

In [None]:
# Step 4: Join the yearly summary with the driver's team and rank drivers
# within each year.

# Ranking order inside a year: most points first, then most wins.
_season_ranking = Window.partitionBy("race_year").orderBy(
    col("total_points").desc(),
    col("total_wins").desc(),
)

# Columns (and their order) of the final output.
_ranking_columns = [
    "race_year", "driver_id", "driver_ref_name", "team_id", "team_name",
    "total_points", "total_wins", "rank", "update_ts",
]

final_driver_ranked_df = (
    driver_summary_df
    # Left join: drivers with no row in driver_team_df keep a null team.
    .join(driver_team_df, on=["driver_id", "race_year"], how="left")
    .withColumnRenamed("driver_ref", "driver_ref_name")
    .withColumnRenamed("constructor_id", "team_id")
    # team_name is the team_id cast to string; no constructor name is looked up.
    .withColumn("team_name", col("team_id").cast("string"))
    .withColumn("rank", rank().over(_season_ranking))
    .withColumn("update_ts", F.current_timestamp())
    .select(*_ranking_columns)
)

In [None]:
# Step 5: Persist the ranked result to the target selected by
# target_type / target_format.
#
# Supported combinations:
#   file  + parquet -> parquet files under {gold_folder_path}/drivers_ranking
#   table + parquet -> {gold_catalog}.{gold_schema}.{gold_table_par}
#   table + delta   -> {gold_catalog}.{gold_schema}.{gold_table_dlt}
#   table + adw     -> insert into {adw_catalog}.{adw_schema}.f1_drivers_ranking
# Any other combination raises ValueError instead of silently writing nothing.
if target_type == 'file' and target_format == 'parquet':
    # NOTE(review): gold_folder_path is not defined in the visible cells of
    # this notebook -- confirm it is set elsewhere before using this target.
    final_driver_ranked_df.write.mode("overwrite").parquet(f"{gold_folder_path}/drivers_ranking")
elif target_type == 'table' and target_format == 'parquet':
    # Use the parquet table name; previously this branch wrote to the
    # "_dlt" table name, the same table the delta branch writes to.
    final_driver_ranked_df.write.mode("overwrite").format("parquet").saveAsTable(
        f"{gold_catalog}.{gold_schema}.{gold_table_par}"
    )
elif target_type == 'table' and target_format == 'delta':
    final_driver_ranked_df.write.mode("overwrite").partitionBy("race_year").format("delta").saveAsTable(
        f"{gold_catalog}.{gold_schema}.{gold_table_dlt}"
    )
elif target_type == 'table' and target_format == 'adw':
    # insertInto requires the target table to exist already and matches
    # columns by position, not by name. No write mode is set here.
    final_driver_ranked_df.write.insertInto(f"{adw_catalog}.{adw_schema}.f1_drivers_ranking")
else:
    raise ValueError(
        f"Unsupported target: TARGET_TYPE={target_type!r}, TARGET_FORMAT={target_format!r}"
    )

In [None]:
%sql
-- Show every row of f1_demo_adw.demo_f1_history.f1_drivers_ranking.
-- NOTE(review): this catalog/schema differs from the adw_catalog / adw_schema
-- values ("f1_gold_adw" / "f1_gold") that the write cell inserts into --
-- confirm this is the table that is meant to be inspected.
select * from f1_demo_adw.demo_f1_history.f1_drivers_ranking