In [0]:
dbutils.widgets.text("p_data_source", "")

In [0]:
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
%run "../Includes/configs" 

In [0]:
%run "../SetUp/setup"

### Access Azure Data Lake using Service Principal
**Steps to follow:**
1. Register Azure AD Application/ Service Principal
2. Generate a secret/ password for the application
3. Set spark config with App/ Client Id, Directory/ Tenant Id & Secret
4. Assign role "Storage Blob Data Contributor" to the Data Lake

path,name,size,modificationTime
abfss://raw@forrmulaa1dl.dfs.core.windows.net/circuits.csv,circuits.csv,10044,1767877118000
abfss://raw@forrmulaa1dl.dfs.core.windows.net/constructors.json,constructors.json,30415,1767877118000
abfss://raw@forrmulaa1dl.dfs.core.windows.net/drivers.json,drivers.json,180812,1767877118000
abfss://raw@forrmulaa1dl.dfs.core.windows.net/lap_times/,lap_times/,0,1767877145000
abfss://raw@forrmulaa1dl.dfs.core.windows.net/pit_stops.json,pit_stops.json,1369387,1767877119000
abfss://raw@forrmulaa1dl.dfs.core.windows.net/qualifying/,qualifying/,0,1767877183000
abfss://raw@forrmulaa1dl.dfs.core.windows.net/races.csv,races.csv,116847,1767877118000
abfss://raw@forrmulaa1dl.dfs.core.windows.net/results.json,results.json,7165641,1767877120000


_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8
circuitId,circuitRef,name,location,country,lat,lng,alt,url
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,http://en.wikipedia.org/wiki/Melbourne_Grand_Prix_Circuit
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,http://en.wikipedia.org/wiki/Sepang_International_Circuit
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,http://en.wikipedia.org/wiki/Bahrain_International_Circuit
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,http://en.wikipedia.org/wiki/Circuit_de_Barcelona-Catalunya
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,http://en.wikipedia.org/wiki/Istanbul_Park
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7,http://en.wikipedia.org/wiki/Circuit_de_Monaco
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13,http://en.wikipedia.org/wiki/Circuit_Gilles_Villeneuve
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228,http://en.wikipedia.org/wiki/Circuit_de_Nevers_Magny-Cours
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153,http://en.wikipedia.org/wiki/Silverstone_Circuit


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [0]:
races_schema = StructType(fields=[StructField("raceId", IntegerType(), False),
                                  StructField("year", IntegerType(), True),
                                  StructField("round", IntegerType(), True),
                                  StructField("circuitId", IntegerType(), True),
                                  StructField("name", StringType(), True),
                                  StructField("date", DateType(), True),
                                  StructField("time", StringType(), True),
                                  StructField("url", StringType(), True)])

In [0]:
races_df = spark.read.option("header", True).schema(races_schema).csv(f"{raw_folder_path}/races.csv")

**Add ingestion date and race_timestamp to the dataframe**

In [0]:
from pyspark.sql.functions import current_timestamp, to_timestamp, lit, concat, col

In [0]:
races_with_timestamp_df = races_df.withColumn("ingestion_date", current_timestamp()).withColumn("race_timestamp", to_timestamp(concat(col("date"), lit(" "), col("time")), "yyyy-MM-dd HH:mm:ss")).withColumn("data_source", lit(v_data_source))

In [0]:
display(races_with_timestamp_df)

raceId,year,round,circuitId,name,date,time,url,ingestion_date,race_timestamp,data_source
1,2009,1,1,Australian Grand Prix,2009-03-29,06:00:00,http://en.wikipedia.org/wiki/2009_Australian_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-03-29T06:00:00Z,Ergast API
2,2009,2,2,Malaysian Grand Prix,2009-04-05,09:00:00,http://en.wikipedia.org/wiki/2009_Malaysian_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-04-05T09:00:00Z,Ergast API
3,2009,3,17,Chinese Grand Prix,2009-04-19,07:00:00,http://en.wikipedia.org/wiki/2009_Chinese_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-04-19T07:00:00Z,Ergast API
4,2009,4,3,Bahrain Grand Prix,2009-04-26,12:00:00,http://en.wikipedia.org/wiki/2009_Bahrain_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-04-26T12:00:00Z,Ergast API
5,2009,5,4,Spanish Grand Prix,2009-05-10,12:00:00,http://en.wikipedia.org/wiki/2009_Spanish_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-05-10T12:00:00Z,Ergast API
6,2009,6,6,Monaco Grand Prix,2009-05-24,12:00:00,http://en.wikipedia.org/wiki/2009_Monaco_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-05-24T12:00:00Z,Ergast API
7,2009,7,5,Turkish Grand Prix,2009-06-07,12:00:00,http://en.wikipedia.org/wiki/2009_Turkish_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-06-07T12:00:00Z,Ergast API
8,2009,8,9,British Grand Prix,2009-06-21,12:00:00,http://en.wikipedia.org/wiki/2009_British_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-06-21T12:00:00Z,Ergast API
9,2009,9,20,German Grand Prix,2009-07-12,12:00:00,http://en.wikipedia.org/wiki/2009_German_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-07-12T12:00:00Z,Ergast API
10,2009,10,11,Hungarian Grand Prix,2009-07-26,12:00:00,http://en.wikipedia.org/wiki/2009_Hungarian_Grand_Prix,2026-01-10T11:41:30.822224Z,2009-07-26T12:00:00Z,Ergast API


**Select only the columns we need**

In [0]:
races_selected_df = races_with_timestamp_df.select(col("raceId").alias("race_id"), col("year").alias("race_year"), col("round"), col("circuitId").alias("circuit_id"), col("name"), col("ingestion_date"), col("race_timestamp")).withColumn("data_source", lit(v_data_source))

In [0]:
display(races_selected_df)

race_id,race_year,round,circuit_id,name,ingestion_date,race_timestamp,data_source
1,2009,1,1,Australian Grand Prix,2026-01-10T11:42:07.306167Z,2009-03-29T06:00:00Z,Ergast API
2,2009,2,2,Malaysian Grand Prix,2026-01-10T11:42:07.306167Z,2009-04-05T09:00:00Z,Ergast API
3,2009,3,17,Chinese Grand Prix,2026-01-10T11:42:07.306167Z,2009-04-19T07:00:00Z,Ergast API
4,2009,4,3,Bahrain Grand Prix,2026-01-10T11:42:07.306167Z,2009-04-26T12:00:00Z,Ergast API
5,2009,5,4,Spanish Grand Prix,2026-01-10T11:42:07.306167Z,2009-05-10T12:00:00Z,Ergast API
6,2009,6,6,Monaco Grand Prix,2026-01-10T11:42:07.306167Z,2009-05-24T12:00:00Z,Ergast API
7,2009,7,5,Turkish Grand Prix,2026-01-10T11:42:07.306167Z,2009-06-07T12:00:00Z,Ergast API
8,2009,8,9,British Grand Prix,2026-01-10T11:42:07.306167Z,2009-06-21T12:00:00Z,Ergast API
9,2009,9,20,German Grand Prix,2026-01-10T11:42:07.306167Z,2009-07-12T12:00:00Z,Ergast API
10,2009,10,11,Hungarian Grand Prix,2026-01-10T11:42:07.306167Z,2009-07-26T12:00:00Z,Ergast API


**Write the output to processed container in parquet format**

In [0]:
races_selected_df.write.mode("overwrite").partitionBy("race_year").parquet(f"{processed_folder_path}/races")

In [0]:
display(spark.read.parquet(f"{processed_folder_path}/races"))

race_id,round,circuit_id,name,ingestion_date,race_timestamp,data_source,race_year
1053,2,21,Emilia Romagna Grand Prix,2026-01-10T11:42:15.655963Z,2021-04-18T13:00:00Z,Ergast API,2021
1052,1,3,Bahrain Grand Prix,2026-01-10T11:42:15.655963Z,2021-03-28T15:00:00Z,Ergast API,2021
1051,21,1,Australian Grand Prix,2026-01-10T11:42:15.655963Z,2021-11-21T06:00:00Z,Ergast API,2021
1054,3,20,TBC,2026-01-10T11:42:15.655963Z,,Ergast API,2021
1055,4,4,Spanish Grand Prix,2026-01-10T11:42:15.655963Z,2021-05-09T13:00:00Z,Ergast API,2021
1056,5,6,Monaco Grand Prix,2026-01-10T11:42:15.655963Z,2021-05-23T13:00:00Z,Ergast API,2021
1057,6,73,Azerbaijan Grand Prix,2026-01-10T11:42:15.655963Z,2021-06-06T12:00:00Z,Ergast API,2021
1058,7,7,Canadian Grand Prix,2026-01-10T11:42:15.655963Z,2021-06-13T18:00:00Z,Ergast API,2021
1059,8,34,French Grand Prix,2026-01-10T11:42:15.655963Z,2021-06-27T13:00:00Z,Ergast API,2021
1060,9,70,Austrian Grand Prix,2026-01-10T11:42:15.655963Z,2021-07-04T13:00:00Z,Ergast API,2021
