In [0]:
%run "../includes/configuration"

In [0]:
# Exploration: register the 2021-03-21 (cutover) results file as the temp view `results_cutover`.
(
    spark.read.format("json")
    .load(f"{raw_folder_path}/2021-03-21/results.json")
    .createOrReplaceTempView("results_cutover")
)

In [0]:
%sql
-- Exploration: number of result rows per race in the results_cutover view, highest raceId first.
SELECT raceId, count(1)
FROM results_cutover
GROUP BY raceId
ORDER BY raceId DESC;

raceId,count(1)
1047,20
1046,20
1045,20
1044,20
1043,20
1042,20
1041,20
1040,20
1039,20
1038,20


In [0]:
# Exploration: register the 2021-03-28 results file as the temp view `results_w1`.
(
    spark.read.format("json")
    .load(f"{raw_folder_path}/2021-03-28/results.json")
    .createOrReplaceTempView("results_w1")
)

In [0]:
%sql
-- Exploration: number of result rows per race in the results_w1 view, highest raceId first.
SELECT raceId, count(1)
FROM results_w1
GROUP BY raceId
ORDER BY raceId DESC;

raceId,count(1)
1052,20


In [0]:
# Exploration: register the 2021-04-18 results file as the temp view `results_w2`.
(
    spark.read.format("json")
    .load(f"{raw_folder_path}/2021-04-18/results.json")
    .createOrReplaceTempView("results_w2")
)

In [0]:
%sql
-- Exploration: number of result rows per race in the results_w2 view, highest raceId first.
SELECT raceId, count(1)
FROM results_w2
GROUP BY raceId
ORDER BY raceId DESC;

raceId,count(1)
1053,20


## Ingest results.json file
#### Step 1 - Read the JSON file using the Spark DataFrame reader API

In [0]:
%run "../includes/configuration"

In [0]:
# Widget p_data_source (default: empty string). Its value is stamped on every row
# as the data_source column in Step 2.
dbutils.widgets.text("p_data_source","")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Widget p_file_date (default: 2021-03-28). Selects the <raw_folder_path>/<date>/results.json
# file to ingest and is stamped on every row as the file_date column in Step 2.
dbutils.widgets.text("p_file_date","2021-03-28")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, StructField, TimestampType, DateType, FloatType
from pyspark.sql.functions import col, struct,current_timestamp , concat, lit

In [0]:
# Explicit schema for results.json: (column name, Spark type, nullable flag).
# The JSON reader still reports resultId as nullable (see the printSchema output),
# so its False flag is informational only.
# fastestLapSpeed is deliberately read as a string and cast to float in a later cell.
results_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("resultId", IntegerType(), False),
        ("raceId", IntegerType(), True),
        ("driverId", IntegerType(), True),
        ("constructorId", IntegerType(), True),
        ("number", IntegerType(), True),
        ("grid", IntegerType(), True),
        ("position", IntegerType(), True),
        ("positionText", StringType(), True),
        ("positionOrder", IntegerType(), True),
        ("points", FloatType(), True),
        ("laps", IntegerType(), True),
        ("time", StringType(), True),
        ("milliseconds", IntegerType(), True),
        ("fastestLap", IntegerType(), True),
        ("rank", IntegerType(), True),
        ("fastestLapTime", StringType(), True),
        ("fastestLapSpeed", StringType(), True),
        ("statusId", StringType(), True),
    )
])

In [0]:
# Read results.json for the selected file date, enforcing the explicit schema
# rather than letting Spark infer column types.
results_df = (
    spark.read
    .schema(results_schema)
    .json(f"{raw_folder_path}/{v_file_date}/results.json")
)

In [0]:
# The schema reads fastestLapSpeed as a string; convert it to a float column here.
results_df = results_df.withColumn("fastestLapSpeed", results_df["fastestLapSpeed"].cast("float"))

In [0]:
# Preview the raw results after the fastestLapSpeed cast.
display(results_df)

resultId,raceId,driverId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId
24986,1053,830,9,33,3,1.0,1,1,25.0,63,2:02:34.598,7354598.0,60.0,2,1:17.524,227.96,1
24987,1053,1,131,44,1,2.0,2,2,19.0,63,+22.000,7376598.0,60.0,1,1:16.702,230.403,1
24988,1053,846,1,4,7,3.0,3,3,15.0,63,+23.702,7378300.0,63.0,3,1:18.259,225.819,1
24989,1053,844,6,16,4,4.0,4,4,12.0,63,+25.579,7380177.0,60.0,6,1:18.379,225.473,1
24990,1053,832,6,55,11,5.0,5,5,10.0,63,+27.036,7381634.0,60.0,7,1:18.490,225.154,1
24991,1053,817,1,3,6,6.0,6,6,8.0,63,+51.220,7405818.0,54.0,12,1:19.341,222.739,1
24992,1053,842,213,10,5,7.0,7,7,6.0,63,+52.818,7407416.0,52.0,9,1:18.994,223.718,1
24993,1053,840,117,18,10,8.0,8,8,4.0,63,+56.909,7411507.0,59.0,8,1:18.782,224.32,1
24994,1053,839,214,31,9,9.0,9,9,2.0,63,+65.704,7420302.0,62.0,15,1:19.422,222.512,1
24995,1053,4,214,14,15,10.0,10,10,1.0,63,+66.561,7421159.0,62.0,14,1:19.417,222.526,1


In [0]:
# Confirm the column types; note that every column, including resultId, is reported as nullable.
results_df.printSchema()

root
 |-- resultId: integer (nullable = true)
 |-- raceId: integer (nullable = true)
 |-- driverId: integer (nullable = true)
 |-- constructorId: integer (nullable = true)
 |-- number: integer (nullable = true)
 |-- grid: integer (nullable = true)
 |-- position: integer (nullable = true)
 |-- positionText: string (nullable = true)
 |-- positionOrder: integer (nullable = true)
 |-- points: float (nullable = true)
 |-- laps: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- milliseconds: integer (nullable = true)
 |-- fastestLap: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- fastestLapTime: string (nullable = true)
 |-- fastestLapSpeed: float (nullable = true)
 |-- statusId: string (nullable = true)



#### Step 2 - Rename columns and add the data source, file date and ingestion date columns

In [0]:
# Convert the camelCase source columns to snake_case, then add audit columns
# recording where (data_source) and which file date (file_date) the rows came from.
results_with_columns_df = (
    results_df
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)
                                    
                                           

In [0]:
# add_ingestion_date is not defined in this notebook (presumably it comes from ../includes/common_functions).
# The output below suggests it appends an ingestion_date timestamp column; confirm in the helper.
results_with_columns_df = add_ingestion_date(results_with_columns_df)

In [0]:
# Preview the renamed frame together with the data_source, file_date and ingestion_date columns.
display(results_with_columns_df)

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,statusId,data_source,file_date,ingestion_date
24986,1053,830,9,33,3,1.0,1,1,25.0,63,2:02:34.598,7354598.0,60.0,2,1:17.524,227.96,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24987,1053,1,131,44,1,2.0,2,2,19.0,63,+22.000,7376598.0,60.0,1,1:16.702,230.403,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24988,1053,846,1,4,7,3.0,3,3,15.0,63,+23.702,7378300.0,63.0,3,1:18.259,225.819,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24989,1053,844,6,16,4,4.0,4,4,12.0,63,+25.579,7380177.0,60.0,6,1:18.379,225.473,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24990,1053,832,6,55,11,5.0,5,5,10.0,63,+27.036,7381634.0,60.0,7,1:18.490,225.154,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24991,1053,817,1,3,6,6.0,6,6,8.0,63,+51.220,7405818.0,54.0,12,1:19.341,222.739,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24992,1053,842,213,10,5,7.0,7,7,6.0,63,+52.818,7407416.0,52.0,9,1:18.994,223.718,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24993,1053,840,117,18,10,8.0,8,8,4.0,63,+56.909,7411507.0,59.0,8,1:18.782,224.32,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24994,1053,839,214,31,9,9.0,9,9,2.0,63,+65.704,7420302.0,62.0,15,1:19.422,222.512,1,test,2021-04-18,2025-04-10T23:02:57.171+0000
24995,1053,4,214,14,15,10.0,10,10,1.0,63,+66.561,7421159.0,62.0,14,1:19.417,222.526,1,test,2021-04-18,2025-04-10T23:02:57.171+0000


#### Step 3 - Drop unwanted columns from the DataFrame

In [0]:
# statusId is not carried into the processed table.
results_final_df =results_with_columns_df.drop(col("statusId"))

De-duplicate the DataFrame so that each driver has at most one row per race.

In [0]:
# Keep one row per (driver_id, race_id). dropDuplicates does not specify which of
# several duplicate rows survives, so if duplicates differ in other columns, the
# kept row is arbitrary.
results_final_df = results_final_df.dropDuplicates(["driver_id", "race_id"])


#### Step 4 - Write the output to the processed container (Delta table `f1_processed.results`)

Method 1 (legacy, commented out): drop this batch's existing `race_id` partitions, then append as Parquet

In [0]:
# Method 1, step 1 (disabled, kept for reference): for each distinct race_id in this
# batch, drop that partition from f1_processed.results if the table exists. This is
# presumably meant to prevent duplicate rows when a file date is re-processed.
# Note: the loop variable is a Row rather than a list despite its name.
# for race_id_list in results_final_df.select("race_id").distinct().collect():
#     if (spark._jsparkSession.catalog().tableExists("f1_processed.results")):
#      spark.sql(f"ALTER TABLE f1_processed.results DROP IF EXISTS PARTITION (race_id={race_id_list.race_id})")

In [0]:
# Method 1, step 2 (disabled, kept for reference). The first line overwrites plain
# Parquet files at the processed path. The second appends to the
# f1_processed.results table as Parquet. Both are partitioned by race_id.
##results_final_df.write.mode("overwrite").partitionBy("race_id").parquet(f"{processed_folder_path}/results")
# results_final_df.write.mode("append").partitionBy("race_id").format("parquet").saveAsTable("f1_processed.results")

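Method 2: the `overwrite_partition` helper (commented out) is superseded by the Delta merge (upsert) via `merge_delta_data`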

In [0]:
# Disabled: overwrite_partition is a helper not defined in this notebook. It presumably
# overwrites the race_id partitions present in this batch. It has been superseded by
# the merge_delta_data call below.
#overwrite_partition(results_final_df, "f1_processed", "results", "race_id")

In [0]:
# Upsert this batch into f1_processed.results using the merge_delta_data helper
# (not defined in this notebook; presumably from ../includes/common_functions).
# Target (tgt) and source (src) rows match when both result_id and race_id are equal.
# The trailing "race_id" is presumably the partition column: the table directory is
# laid out as race_id=<n> folders. Confirm against the helper's signature.
merge_condition ="tgt.result_id =src.result_id AND tgt.race_id = src.race_id"
merge_delta_data (results_final_df,"f1_processed", "results",processed_folder_path,merge_condition,"race_id")

In [0]:
%sql
-- Inspect the contents of f1_processed.results after the merge.
SELECT * FROM f1_processed.results

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,data_source,file_date,ingestion_date
21664,878,1,1,4.0,2,1.0,1,1,25.0,56,1:35:55.269,5755269.0,55.0,5.0,1:39.709,199.047,test,2021-03-21,2025-04-10T22:51:00.321+0000
21666,878,4,6,5.0,7,3.0,3,3,15.0,56,+39.229,5794498.0,56.0,4.0,1:39.672,199.121,test,2021-03-21,2025-04-10T22:51:00.321+0000
21681,878,5,207,20.0,22,18.0,18,18,0.0,55,\N,,50.0,19.0,1:43.072,192.552,test,2021-03-21,2025-04-10T22:51:00.321+0000
21669,878,8,208,9.0,4,6.0,6,6,8.0,56,+64.425,5819694.0,56.0,3.0,1:39.474,199.517,test,2021-03-21,2025-04-10T22:51:00.321+0000
21682,878,10,206,24.0,19,19.0,19,19,0.0,55,\N,,40.0,20.0,1:43.324,192.083,test,2021-03-21,2025-04-10T22:51:00.321+0000
21667,878,13,6,6.0,11,4.0,4,4,12.0,56,+46.013,5801282.0,56.0,2.0,1:39.402,199.661,test,2021-03-21,2025-04-10T22:51:00.321+0000
21686,878,17,9,2.0,3,,R,23,0.0,16,\N,,13.0,21.0,1:43.599,191.573,test,2021-03-21,2025-04-10T22:51:00.321+0000
21668,878,18,1,3.0,12,5.0,5,5,10.0,56,+56.432,5811701.0,54.0,6.0,1:40.150,198.17,test,2021-03-21,2025-04-10T22:51:00.321+0000
21665,878,20,9,1.0,1,2.0,2,2,18.0,56,+0.675,5755944.0,56.0,1.0,1:39.347,199.772,test,2021-03-21,2025-04-10T22:51:00.321+0000
21679,878,30,131,7.0,5,16.0,16,16,0.0,55,\N,,54.0,15.0,1:40.923,196.652,test,2021-03-21,2025-04-10T22:51:00.321+0000


In [0]:
%sql
-- Destructive, so it is left commented out. Uncomment only to drop the table for a full reload.
--DROP TABLE f1_processed.results;

In [0]:
# List the files under the results table directory (the _delta_log folder plus race_id=<n> partition folders).
# The path is built from processed_folder_path, the same variable passed to the merge,
# instead of hard-coding the storage mount in a %fs magic, which cannot read Python variables.
# NOTE(review): assumes processed_folder_path resolves to the processed mount
# (/mnt/formula1dl2025practice/processed in this workspace). Confirm in ../includes/configuration.
display(dbutils.fs.ls(f"{processed_folder_path}/results"))

path,name,size,modificationTime
dbfs:/mnt/formula1dl2025practice/processed/results/_delta_log/,_delta_log/,0,1744325460000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=1/,race_id=1/,0,1744325462000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=10/,race_id=10/,0,1744325463000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=100/,race_id=100/,0,1744325472000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=1000/,race_id=1000/,0,1744325561000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=1001/,race_id=1001/,0,1744325561000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=1002/,race_id=1002/,0,1744325561000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=1003/,race_id=1003/,0,1744325561000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=1004/,race_id=1004/,0,1744325561000
dbfs:/mnt/formula1dl2025practice/processed/results/race_id=1005/,race_id=1005/,0,1744325561000


In [0]:
# Legacy check from the Parquet-only version (disabled). The results directory is now
# a Delta table (it contains a _delta_log folder). Reading it as plain Parquet may also
# pick up data files that Delta no longer references, so query f1_processed.results instead.
#display(spark.read.parquet("/mnt/formula1dl2025practice/processed/results"))

In [0]:
# Stop the run here and return "Success" to whatever invoked this notebook.
# Cells after this one are ad-hoc checks and do not execute on a full run.
dbutils.notebook.exit("Success")

In [0]:
%sql
-- Ad-hoc check (run interactively only; it sits after the notebook exit):
-- number of rows per race_id in f1_processed.results, highest race_id first.
SELECT race_id, 
COUNT(1)
 FROM f1_processed.results
 GROUP BY race_id
 ORDER BY race_id DESC;

In [0]:
%sql
-- Ad-hoc check (run interactively only; it sits after the notebook exit):
-- the latest race loaded into f1_processed.results.
-- The results table has no race_year column (see the column list in the SELECT *
-- output above), so max(race_year) fails to resolve. Use race_id instead.
-- NOTE(review): if the latest season is what's wanted, join to the races table on
-- race_id and take the max of its year column. Confirm that table's schema first.
SELECT max(race_id)
FROM f1_processed.results;