In [0]:
%run "/Workspace/Formula1Project/includes/configuration"

In [0]:
%run "/Workspace/Formula1Project/includes/common_functions"

In [0]:
# Fetch the storage-account access key from the Databricks secret scope rather
# than hard-coding it, then register it in the Spark session configuration
# under the account-key property for the formala1datalake endpoint.
account_key = dbutils.secrets.get(
    scope="formala1-scope",
    key="account-key",
)
spark.conf.set(
    "fs.azure.account.key.formala1datalake.dfs.core.windows.net",
    account_key,
)

In [0]:
# Notebook parameter "p_data_source" (defaults to an empty string). The value
# is read back into v_data_source and printed so it shows in the run output.
# NOTE(review): v_data_source is not referenced by any later cell visible
# here — confirm whether it was meant to be written out as a column.
DATA_SOURCE_WIDGET = "p_data_source"
dbutils.widgets.text(DATA_SOURCE_WIDGET, "")
v_data_source = dbutils.widgets.get(DATA_SOURCE_WIDGET)
print(v_data_source)

In [0]:
# Notebook parameter "p_file_date" (defaults to "2020-01-01"). Later cells use
# v_file_date both as the raw folder name to read from and as the value of the
# file_date audit column.
FILE_DATE_WIDGET = "p_file_date"
dbutils.widgets.text(FILE_DATE_WIDGET, "2020-01-01")
v_file_date = dbutils.widgets.get(FILE_DATE_WIDGET)
print(v_file_date)

In [0]:
# DDL schema applied when reading results.json, written one field per line and
# joined back into a single "name TYPE, name TYPE, ..." string.
# NOTE(review): points is declared INT — if the raw feed can carry fractional
# points, those values will not parse as INT; confirm against the source data.
results_schema = ", ".join(
    [
        "resultId INT",
        "raceId INT",
        "driverId INT",
        "constructorId INT",
        "number INT",
        "grid INT",
        "position INT",
        "positionText STRING",
        "positionOrder INT",
        "points INT",
        "laps INT",
        "time STRING",
        "milliseconds INT",
        "fastestLap INT",
        "rank INT",
        "fastestLapTime STRING",
        "fastestLapSpeed STRING",
        "statusId STRING",
    ]
)

In [0]:
# Load results.json from the raw container folder named after the file date,
# applying the explicit schema instead of letting Spark infer one.
results_json_path = f"abfss://raw@formala1datalake.dfs.core.windows.net/{v_file_date}/results.json"
results_df = spark.read.schema(results_schema).json(results_json_path)

In [0]:
from pyspark.sql.functions import current_timestamp, lit

# Rename the camelCase source columns to snake_case, then add two audit
# columns: ingestion_date (timestamp of this run) and file_date (the
# p_file_date parameter the notebook was invoked with).
results_df = (
    results_df
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("file_date", lit(v_file_date))
)

In [0]:
# statusId is read by the schema but not carried into the processed output,
# so it is removed here.
results_df = results_df.drop("statusId")

In [0]:
# Keep a single row for each (race_id, driver_id) pair. When several rows
# share a pair, Spark does not guarantee which one survives.
dedupe_keys = ["race_id", "driver_id"]
results_df = results_df.dropDuplicates(dedupe_keys)

Incremental Load
1. By manually dropping the partition and replacing it with the new one.
2. By using the insertInto method to avoid explicit handling of partitions.
3. Using a Delta table.

1. Replace the partition with the new partition data.
So first drop that partition, and then append the new data.
Note that each run appends only the newly available data, so the table is built up incrementally rather than holding all the data at once.

In [0]:
# Render the renamed, de-duplicated DataFrame for a visual check before it is
# written out.
display(results_df)

In [0]:
# Dropping the partition of the old data to replace it with new data.
#for race_ids in results_df.select("race_id").distinct().collect():
 #   spark.sql(f"ALTER TABLE f1_processed.results DROP IF EXISTS PARTITION (race_id={race_ids.race_id})")

In [0]:
# Adding the new partition data.
#results_df.write.mode("append").format("parquet").partitionBy("race_id").saveAsTable("f1_processed.results")

2. When working with partitioned tables, insertInto is efficient because it automatically inserts data into the correct partition without requiring explicit partition handling.
When executing .insertInto(), Spark:

-     Identifies the partition column (race_id) in the target table.
-     Writes each record to the correct partition based on the race_id column.
-     Optimizes partition management, reducing unnecessary writes.

Use dynamic partition overwrite to retain existing partitions.


In [0]:
#spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
#overwrite_partition(results_df,race_id, f1_processed.results)

3. Using Delta Table to merge the incoming data with the existing one.

In [0]:
# Optional: explicitly enable dynamic partition pruning. The call is left
# commented out, so whatever the session/cluster default is applies.
#spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", True)
#
# With dynamic partition pruning, Spark works out at runtime which partition
# values are actually needed and prunes the irrelevant partitions, scanning
# only the required ones.
# Performance boost: less data read = faster execution and lower cost.
#
# (These notes were previously a bare triple-quoted string, which Python
# evaluates as an expression and the notebook echoes as cell output; they are
# now plain comments so the cell produces no output.)

In [0]:
# Match incoming rows (src) to existing rows (tgt) on race_id and result_id.
merge_condition = (
    "tgt.race_id = src.race_id "
    "and tgt.result_id = src.result_id"
)

# merge_data is not defined in the visible cells (presumably it comes from the
# common_functions notebook run at the top — confirm). From the arguments it
# appears to merge results_df into f1_processed.results under processed_path,
# using race_id as the partition column; verify against the helper.
merge_data(
    results_df,
    processed_path,
    "f1_processed",
    "results",
    "race_id",
    merge_condition,
)