### Read the CSV file using the Spark reader API

In [0]:
from pyspark.sql.types import StructType, StructField,IntegerType,StringType,DoubleType

In [0]:
# Explicit schema for circuits.csv, handed to the reader via .schema() below.
# Each entry is StructField(column name, data type, nullable flag).
# NOTE(review): circuitId is declared non-nullable here, yet the printSchema()
# output captured in this notebook reports "nullable = true" for it --
# presumably the file reader does not enforce the flag; add an explicit null
# check if non-null ids are a hard requirement.
circuits_schema = StructType(
    fields=[
        StructField("circuitId", IntegerType(), False),
        StructField("circuitRef", StringType(), True),
        StructField("name", StringType(), True),
        StructField("location", StringType(), True),
        StructField("country", StringType(), True),
        StructField("lat", DoubleType(), True),
        StructField("lng", DoubleType(), True),
        StructField("alt", DoubleType(), True),
        StructField("url", StringType(), True),
    ]
)

In [0]:
# Load the raw circuits CSV from the "raw" container: the first line is
# treated as a header and the columns are typed by circuits_schema.
circuits_df = (
    spark.read
    .option("header", True)
    .schema(circuits_schema)
    .csv("abfss://raw@formulaonedl01.dfs.core.windows.net/circuits.csv")
)

In [0]:
# Show the freshly loaded DataFrame in the notebook output as a sanity check.
display(circuits_df)

circuitId,circuitRef,name,location,country,lat,lng,alt,url
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,http://en.wikipedia.org/wiki/Melbourne_Grand_Prix_Circuit
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,http://en.wikipedia.org/wiki/Sepang_International_Circuit
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,http://en.wikipedia.org/wiki/Bahrain_International_Circuit
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,http://en.wikipedia.org/wiki/Circuit_de_Barcelona-Catalunya
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,http://en.wikipedia.org/wiki/Istanbul_Park
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,http://en.wikipedia.org/wiki/Circuit_de_Monaco
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,http://en.wikipedia.org/wiki/Circuit_Gilles_Villeneuve
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,http://en.wikipedia.org/wiki/Circuit_de_Nevers_Magny-Cours
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,http://en.wikipedia.org/wiki/Silverstone_Circuit
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,http://en.wikipedia.org/wiki/Hockenheimring


In [0]:
# Print column names, types and nullability to confirm the explicit schema was applied.
circuits_df.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: double (nullable = true)
 |-- url: string (nullable = true)



In [0]:
# Summary statistics (count, mean, stddev, min, ...) per column; a quick way to
# spot missing values or out-of-range numbers before transforming.
circuits_df.describe().show()

+-------+------------------+----------+-------+---------+---------+------------------+-----------------+-----------------+--------------------+
|summary|         circuitId|circuitRef|   name| location|  country|               lat|              lng|              alt|                 url|
+-------+------------------+----------+-------+---------+---------+------------------+-----------------+-----------------+--------------------+
|  count|                77|        77|     77|       77|       77|                77|               77|               77|                  77|
|   mean|              39.0|      NULL|   NULL|     NULL|     NULL| 33.72035103896102|3.551302597402597|247.4935064935065|                NULL|
| stddev|22.371857321197094|      NULL|   NULL|     NULL|     NULL|22.885969000074535| 64.8766790440326|363.2672505910991|                NULL|
|    min|                 1|       BAK|A1-Ring|Abu Dhabi|Argentina|          -37.8497|         -118.189|             -7.0|http://en.wiki

#### Select only the required columns

In [0]:
# Option 1: select columns by their string names. Every column except "url"
# is kept.
circuits_selected_df = circuits_df.select(
    "circuitId",
    "circuitRef",
    "name",
    "location",
    "country",
    "lat",
    "lng",
    "alt",
)

In [0]:
# Preview the projection; the url column should no longer be present.
display(circuits_selected_df)

circuitId,circuitRef,name,location,country,lat,lng,alt
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0


In [0]:
# Option 2: select columns through DataFrame attribute access. Unlike plain
# string names, these are Column objects, so column methods such as .alias()
# can be applied -- here "country" is exposed as "race_country".
circuits_selected_df = circuits_df.select(
    circuits_df.circuitId,
    circuits_df.circuitRef,
    circuits_df.name,
    circuits_df.location,
    circuits_df.country.alias("race_country"),
    circuits_df.lat,
    circuits_df.lng,
    circuits_df.alt,
)

In [0]:
# Preview the projection; country should now appear under the alias race_country.
display(circuits_selected_df)

circuitId,circuitRef,name,location,race_country,lat,lng,alt
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0


In [0]:
from pyspark.sql.functions import col

# Option 3: select columns with col(). No alias is applied this time, so
# "country" keeps its original name; this is the version of
# circuits_selected_df that the rename step below works from.
circuits_selected_df = circuits_df.select(
    col("circuitId"),
    col("circuitRef"),
    col("name"),
    col("location"),
    col("country"),
    col("lat"),
    col("lng"),
    col("alt"),
)

In [0]:
# Preview the col()-based projection; the column is named country again (no alias here).
display(circuits_selected_df)

circuitId,circuitRef,name,location,country,lat,lng,alt
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0


### Rename the existing columns

In [0]:
# Convert the camelCase ids to snake_case and expand the abbreviated
# coordinate columns to their full names.
circuits_renamed_df = (
    circuits_selected_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
)

In [0]:
# Preview the DataFrame to verify the five renamed columns.
display(circuits_renamed_df)

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0


##### Add ingestion date to the DataFrame

In [0]:
from pyspark.sql.functions import current_timestamp

# Stamp every row with an ingestion_date audit column.
# NOTE(review): current_timestamp() appears to be evaluated when a query
# actually runs, not when this cell executes -- the display() output and the
# Parquet read-back in this notebook carry different timestamps. Confirm this
# is acceptable for auditing.
circuits_final_df = circuits_renamed_df.withColumn(
    "ingestion_date", current_timestamp()
)

In [0]:
# Preview the final DataFrame, now including the ingestion_date column.
display(circuits_final_df)

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,ingestion_date
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,2024-10-24T21:06:52.953Z
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,2024-10-24T21:06:52.953Z
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,2024-10-24T21:06:52.953Z
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,2024-10-24T21:06:52.953Z
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,2024-10-24T21:06:52.953Z
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,2024-10-24T21:06:52.953Z
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,2024-10-24T21:06:52.953Z
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,2024-10-24T21:06:52.953Z
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,2024-10-24T21:06:52.953Z
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,2024-10-24T21:06:52.953Z


### Write data to the data lake as a Parquet file

In [0]:
# Persist the processed circuits to the "processed" container as Parquet.
# "overwrite" mode replaces any existing data at the target path, so the cell
# can be re-run without failing on an existing directory.
(
    circuits_final_df.write
    .mode("overwrite")
    .parquet("abfss://processed@formulaonedl01.dfs.core.windows.net/circuits")
)

In [0]:
%fs
ls abfss://processed@formulaonedl01.dfs.core.windows.net/circuits

path,name,size,modificationTime
abfss://processed@formulaonedl01.dfs.core.windows.net/circuits/_SUCCESS,_SUCCESS,0,1729841630000
abfss://processed@formulaonedl01.dfs.core.windows.net/circuits/_committed_4195298824077143240,_committed_4195298824077143240,232,1729841630000
abfss://processed@formulaonedl01.dfs.core.windows.net/circuits/_committed_8774873030726592761,_committed_8774873030726592761,123,1729841411000
abfss://processed@formulaonedl01.dfs.core.windows.net/circuits/_started_4195298824077143240,_started_4195298824077143240,0,1729841629000
abfss://processed@formulaonedl01.dfs.core.windows.net/circuits/_started_8774873030726592761,_started_8774873030726592761,0,1729841408000
abfss://processed@formulaonedl01.dfs.core.windows.net/circuits/part-00000-tid-4195298824077143240-6ebd97a6-1f4b-4d59-8ac4-b8e54b5ea7ea-28-1-c000.snappy.parquet,part-00000-tid-4195298824077143240-6ebd97a6-1f4b-4d59-8ac4-b8e54b5ea7ea-28-1-c000.snappy.parquet,7904,1729841630000


In [0]:
# Read the Parquet output back from the processed container to verify the write.
display(spark.read.parquet("abfss://processed@formulaonedl01.dfs.core.windows.net/circuits"))

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,ingestion_date
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,2024-10-25T07:33:49.584Z
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,2024-10-25T07:33:49.584Z
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,2024-10-25T07:33:49.584Z
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,2024-10-25T07:33:49.584Z
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,2024-10-25T07:33:49.584Z
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,2024-10-25T07:33:49.584Z
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,2024-10-25T07:33:49.584Z
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,2024-10-25T07:33:49.584Z
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,2024-10-25T07:33:49.584Z
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,2024-10-25T07:33:49.584Z
