###### `Configure the notebook: load configuration and common functions`

In [0]:
%run "playground/includes/configuration/"

In [0]:
%run "playground/includes/common_functions"

#### `Data Ingestion: constructors.json, drivers.json, results.json, pit_stops.json`

In [0]:
from pyspark.sql.types import *

# Schema for constructors.json, written as a DDL string.
# The equivalent StructType/StructField form is more verbose but lets you declare
# nullability per field, e.g.:
#   StructType(fields=[StructField("constructorId", IntegerType(), False),
#                      StructField("constructorRef", StringType(), True), ...])
constructors_schema = "constructorId INT, constructorRef STRING, name STRING, nationality STRING, url STRING"

# Read the file from the `{v_file_date}` sub-folder of the raw area.
path = f"{raw_folder_path}/{v_file_date}/constructors.json"
constructors_df = spark.read.schema(constructors_schema).json(path)

constructors_df.display()

constructorId,constructorRef,name,nationality,url
1,mclaren,McLaren,British,http://en.wikipedia.org/wiki/McLaren
2,bmw_sauber,BMW Sauber,German,http://en.wikipedia.org/wiki/BMW_Sauber
3,williams,Williams,British,http://en.wikipedia.org/wiki/Williams_Grand_Prix_Engineering
4,renault,Renault,French,http://en.wikipedia.org/wiki/Renault_in_Formula_One
5,toro_rosso,Toro Rosso,Italian,http://en.wikipedia.org/wiki/Scuderia_Toro_Rosso
6,ferrari,Ferrari,Italian,http://en.wikipedia.org/wiki/Scuderia_Ferrari
7,toyota,Toyota,Japanese,http://en.wikipedia.org/wiki/Toyota_Racing
8,super_aguri,Super Aguri,Japanese,http://en.wikipedia.org/wiki/Super_Aguri_F1
9,red_bull,Red Bull,Austrian,http://en.wikipedia.org/wiki/Red_Bull_Racing
10,force_india,Force India,Indian,http://en.wikipedia.org/wiki/Racing_Point_Force_India


In [0]:
# Confirm the DDL schema was applied (column names and types).
constructors_df.printSchema()

In [0]:
from pyspark.sql.functions import *

# Keep the identifier columns (renamed to snake_case) plus name/nationality.
# `url` is dropped simply by not selecting it. add_ingestion_date (from
# common_functions) then adds the `ingestion_date` column, and `file_date`
# records which raw folder this run loaded.
constructors_final_df = add_ingestion_date(
    constructors_df.select(
        col("constructorId").alias("constructor_id"),
        col("constructorRef").alias("constructor_ref"),
        "name",
        "nationality",
    )
).withColumn("file_date", lit(v_file_date))

constructors_final_df.display()

constructor_id,constructor_ref,name,nationality,ingestion_date
1,mclaren,McLaren,British,2022-05-30T06:04:37.100+0000
2,bmw_sauber,BMW Sauber,German,2022-05-30T06:04:37.100+0000
3,williams,Williams,British,2022-05-30T06:04:37.100+0000
4,renault,Renault,French,2022-05-30T06:04:37.100+0000
5,toro_rosso,Toro Rosso,Italian,2022-05-30T06:04:37.100+0000
6,ferrari,Ferrari,Italian,2022-05-30T06:04:37.100+0000
7,toyota,Toyota,Japanese,2022-05-30T06:04:37.100+0000
8,super_aguri,Super Aguri,Japanese,2022-05-30T06:04:37.100+0000
9,red_bull,Red Bull,Austrian,2022-05-30T06:04:37.100+0000
10,force_india,Force India,Indian,2022-05-30T06:04:37.100+0000


In [0]:
# Write the processed constructors as Parquet, replacing any existing output at this path.
path = f"{processed_folder_path}/constructors/"
constructors_final_df.write.mode("overwrite").parquet(path)

##### `drivers.json (contains a nested name object)`

In [0]:
# In drivers.json the driver's name is a nested object {forename, surname}, so it
# gets its own struct schema, which is embedded as the `name` field below.
name_schema = (
    StructType()
    .add("forename", StringType(), True)
    .add("surname", StringType(), True)
)

drivers_schema = (
    StructType()
    .add("driverId", IntegerType(), False)
    .add("driverRef", StringType(), True)
    .add("number", IntegerType(), True)
    .add("code", StringType(), True)
    .add("name", name_schema, True)  # nested struct
    .add("dob", DateType(), True)
    .add("nationality", StringType(), True)
    .add("url", StringType(), True)
)

path = f"{raw_folder_path}/{v_file_date}/drivers.json"
drivers_df = spark.read.schema(drivers_schema).json(path)
drivers_df.display()

driverId,driverRef,number,code,name,dob,nationality,url
1,hamilton,44.0,HAM,"List(Lewis, Hamilton)",1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
2,heidfeld,,HEI,"List(Nick, Heidfeld)",1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
3,rosberg,6.0,ROS,"List(Nico, Rosberg)",1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
4,alonso,14.0,ALO,"List(Fernando, Alonso)",1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
5,kovalainen,,KOV,"List(Heikki, Kovalainen)",1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen
6,nakajima,,NAK,"List(Kazuki, Nakajima)",1985-01-11,Japanese,http://en.wikipedia.org/wiki/Kazuki_Nakajima
7,bourdais,,BOU,"List(Sébastien, Bourdais)",1979-02-28,French,http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais
8,raikkonen,7.0,RAI,"List(Kimi, Räikkönen)",1979-10-17,Finnish,http://en.wikipedia.org/wiki/Kimi_R%C3%A4ikk%C3%B6nen
9,kubica,88.0,KUB,"List(Robert, Kubica)",1984-12-07,Polish,http://en.wikipedia.org/wiki/Robert_Kubica
10,glock,,GLO,"List(Timo, Glock)",1982-03-18,German,http://en.wikipedia.org/wiki/Timo_Glock


In [0]:
# Confirm the nested `name` struct (forename, surname) and the other types were applied.
drivers_df.printSchema()

In [0]:
# Rename the ID columns to snake_case, flatten the nested name struct into a single
# "forename surname" string, drop `url`, and record the source file date.
# The rename sources use the exact schema casing ("driverId", "driverRef").
# withColumnRenamed is a silent no-op when the source column is not found, so a
# mismatched case would only work while spark.sql.caseSensitive is false.
drivers_final_df = (
    drivers_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("driverRef", "driver_ref")
    # concat returns null if either name part is null.
    .withColumn("name", concat(col("name.forename"), lit(" "), col("name.surname")))
    .drop("url")
    .withColumn("file_date", lit(v_file_date))
)

# Helper from common_functions; adds the `ingestion_date` column.
drivers_final_df = add_ingestion_date(drivers_final_df)

drivers_final_df.display()

dirver_id,driver_ref,number,code,name,dob,nationality,ingestion_date
1,hamilton,44.0,HAM,Lewis Hamilton,1985-01-07,British,2022-05-30T06:04:50.018+0000
2,heidfeld,,HEI,Nick Heidfeld,1977-05-10,German,2022-05-30T06:04:50.018+0000
3,rosberg,6.0,ROS,Nico Rosberg,1985-06-27,German,2022-05-30T06:04:50.018+0000
4,alonso,14.0,ALO,Fernando Alonso,1981-07-29,Spanish,2022-05-30T06:04:50.018+0000
5,kovalainen,,KOV,Heikki Kovalainen,1981-10-19,Finnish,2022-05-30T06:04:50.018+0000
6,nakajima,,NAK,Kazuki Nakajima,1985-01-11,Japanese,2022-05-30T06:04:50.018+0000
7,bourdais,,BOU,Sébastien Bourdais,1979-02-28,French,2022-05-30T06:04:50.018+0000
8,raikkonen,7.0,RAI,Kimi Räikkönen,1979-10-17,Finnish,2022-05-30T06:04:50.018+0000
9,kubica,88.0,KUB,Robert Kubica,1984-12-07,Polish,2022-05-30T06:04:50.018+0000
10,glock,,GLO,Timo Glock,1982-03-18,German,2022-05-30T06:04:50.018+0000


In [0]:
# Unpartitioned Parquet write, replacing any existing output at this path.
path = f"{processed_folder_path}/drivers/"
drivers_final_df.write.mode("overwrite").parquet(path)

##### `results.json`

In [0]:
# Explicit schema for results.json (resultId, raceId, driverId, constructorId and
# statusId are declared non-nullable). `time` and `milliseconds` are read as strings
# because the raw data appears to use the literal "\N" as a null marker in those
# columns; a later transformation can clean and cast them.
results_schema = StructType(fields=[StructField("resultId", IntegerType(), False),
                                    StructField("raceId", IntegerType(), False),
                                    StructField("driverId", IntegerType(), False),
                                    StructField("constructorId", IntegerType(), False),
                                    StructField("number", IntegerType(), True),
                                    StructField("grid", IntegerType(), True),
                                    StructField("position", IntegerType(), True),
                                    StructField("positionText", StringType(), True),
                                    StructField("positionOrder", IntegerType(), True),
                                    # Points can be fractional (half points have been awarded);
                                    # a value such as 4.5 cannot be read into an IntegerType field.
                                    StructField("points", FloatType(), True),
                                    StructField("laps", IntegerType(), True),
                                    StructField("time", StringType(), True),
                                    StructField("milliseconds", StringType(), True),
                                    StructField("fastestLap", IntegerType(), True),
                                    StructField("rank", IntegerType(), True),
                                    StructField("fastestLapTime", StringType(), True),
                                    StructField("fastestLapSpeed", FloatType(), True), # or DoubleType()
                                    StructField("statusId", IntegerType(), False),
                                   ])

path = f"{raw_folder_path}/{v_file_date}/results.json/"
results_df = spark.read.json(path=path, schema=results_schema)
results_df.display()

resultId,raceId,driverId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId
1,18,1,1,22,1,1.0,1,1,10,58,1:34:50.616,5690616,39.0,2.0,1:27.452,218.3,1
2,18,2,2,3,5,2.0,2,2,8,58,+5.478,5696094,41.0,3.0,1:27.739,217.586,1
3,18,3,3,7,7,3.0,3,3,6,58,+8.163,5698779,41.0,5.0,1:28.090,216.719,1
4,18,4,4,5,11,4.0,4,4,5,58,+17.181,5707797,58.0,7.0,1:28.603,215.464,1
5,18,5,1,23,3,5.0,5,5,4,58,+18.014,5708630,43.0,1.0,1:27.418,218.385,1
6,18,6,3,8,13,6.0,6,6,3,57,\N,\N,50.0,14.0,1:29.639,212.974,11
7,18,7,5,14,17,7.0,7,7,2,55,\N,\N,22.0,12.0,1:29.534,213.224,5
8,18,8,6,1,15,8.0,8,8,1,53,\N,\N,20.0,4.0,1:27.903,217.18,5
9,18,9,2,4,2,,R,9,0,47,\N,\N,15.0,9.0,1:28.753,215.1,4
10,18,10,7,12,18,,R,10,0,43,\N,\N,23.0,13.0,1:29.558,213.166,3


In [0]:
# Confirm the explicit results schema was applied.
results_df.printSchema()

In [0]:
# Raw camelCase column names -> snake_case. Columns not listed here (number, grid,
# position, points, laps, time, milliseconds, rank) keep their names.
results_column_renames = {
    "resultId": "result_id",
    "raceId": "race_id",
    "driverId": "driver_id",
    "constructorId": "constructor_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "fastestLap": "fastest_lap",
    "fastestLapTime": "fastest_lap_time",
    "fastestLapSpeed": "fastest_lap_speed",
}

# Rename in one pass, drop `statusId`, and record the source file date.
results_final_df = (
    results_df
    .toDF(*[results_column_renames.get(c, c) for c in results_df.columns])
    .drop("statusId")
    .withColumn("file_date", lit(v_file_date))
)

# Helper from common_functions; adds the `ingestion_date` column.
results_final_df = add_ingestion_date(results_final_df)

results_final_df.display()

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,ingestion_date
1,18,1,1,22,1,1.0,1,1,10,58,1:34:50.616,5690616,39.0,2.0,1:27.452,218.3,2022-05-30T06:04:53.701+0000
2,18,2,2,3,5,2.0,2,2,8,58,+5.478,5696094,41.0,3.0,1:27.739,217.586,2022-05-30T06:04:53.701+0000
3,18,3,3,7,7,3.0,3,3,6,58,+8.163,5698779,41.0,5.0,1:28.090,216.719,2022-05-30T06:04:53.701+0000
4,18,4,4,5,11,4.0,4,4,5,58,+17.181,5707797,58.0,7.0,1:28.603,215.464,2022-05-30T06:04:53.701+0000
5,18,5,1,23,3,5.0,5,5,4,58,+18.014,5708630,43.0,1.0,1:27.418,218.385,2022-05-30T06:04:53.701+0000
6,18,6,3,8,13,6.0,6,6,3,57,\N,\N,50.0,14.0,1:29.639,212.974,2022-05-30T06:04:53.701+0000
7,18,7,5,14,17,7.0,7,7,2,55,\N,\N,22.0,12.0,1:29.534,213.224,2022-05-30T06:04:53.701+0000
8,18,8,6,1,15,8.0,8,8,1,53,\N,\N,20.0,4.0,1:27.903,217.18,2022-05-30T06:04:53.701+0000
9,18,9,2,4,2,,R,9,0,47,\N,\N,15.0,9.0,1:28.753,215.1,2022-05-30T06:04:53.701+0000
10,18,10,7,12,18,,R,10,0,43,\N,\N,23.0,13.0,1:29.558,213.166,2022-05-30T06:04:53.701+0000


In [0]:
%%time
path = f"{processed_folder_path}/results/"
results_final_df.write.parquet(path=path, mode="overwrite", partitionBy="race_id", compression=None)

##### `pit_stops.json` (multi-line JSON)

In [0]:
# pit_stops.json is a multi-line JSON document, so it is read with multiLine enabled.
pit_stops_schema = (
    StructType()
    .add("raceId", IntegerType(), nullable=False)
    .add("driverId", IntegerType(), nullable=False)
    .add("stop", IntegerType(), nullable=True)
    .add("lap", IntegerType(), nullable=True)
    .add("time", StringType(), nullable=True)
    # Kept as a string: declaring duration as IntegerType/DoubleType/FloatType made
    # the data unreadable. It is cast to a number in the next step.
    .add("duration", StringType(), nullable=True)
    .add("milliseconds", IntegerType(), nullable=True)
)

path = f"{raw_folder_path}/{v_file_date}/pit_stops.json/"
pit_stops_df = (
    spark.read
    .schema(pit_stops_schema)
    .option("multiLine", True)
    .json(path)
)
pit_stops_df.display()

raceId,driverId,stop,lap,time,duration,milliseconds
841,153,1,1,17:05:23,26.898,26898
841,30,1,1,17:05:52,25.021,25021
841,17,1,11,17:20:48,23.426,23426
841,4,1,12,17:22:34,23.251,23251
841,13,1,13,17:24:10,23.842,23842
841,22,1,13,17:24:29,23.643,23643
841,20,1,14,17:25:17,22.603,22603
841,814,1,14,17:26:03,24.863,24863
841,816,1,14,17:26:50,25.259,25259
841,67,1,15,17:27:34,25.342,25342


##### Rename columns and convert types with `cast()`

In [0]:
# Rename the ID columns to snake_case, convert `duration` (read as a string) to
# double, widen `milliseconds` to long, and record the source file date. The
# add_ingestion_date helper (from common_functions) then adds `ingestion_date`.
# NOTE(review): cast("double") yields null for any duration that is not a plain
# number (e.g. an "m:ss.sss" value). Check for nulls if such values can occur.
pit_stops_final_df = add_ingestion_date(
    pit_stops_df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumn("duration", col("duration").cast("double"))
    .withColumn("milliseconds", col("milliseconds").cast("long"))
    .withColumn("file_date", lit(v_file_date))
)

pit_stops_final_df.display()

race_id,driver_id,stop,lap,time,duration,milliseconds,ingestion_date
841,153,1,1,17:05:23,26.898,26898,2022-05-30T06:06:37.567+0000
841,30,1,1,17:05:52,25.021,25021,2022-05-30T06:06:37.567+0000
841,17,1,11,17:20:48,23.426,23426,2022-05-30T06:06:37.567+0000
841,4,1,12,17:22:34,23.251,23251,2022-05-30T06:06:37.567+0000
841,13,1,13,17:24:10,23.842,23842,2022-05-30T06:06:37.567+0000
841,22,1,13,17:24:29,23.643,23643,2022-05-30T06:06:37.567+0000
841,20,1,14,17:25:17,22.603,22603,2022-05-30T06:06:37.567+0000
841,814,1,14,17:26:03,24.863,24863,2022-05-30T06:06:37.567+0000
841,816,1,14,17:26:50,25.259,25259,2022-05-30T06:06:37.567+0000
841,67,1,15,17:27:34,25.342,25342,2022-05-30T06:06:37.567+0000


In [0]:
# Confirm the casts: duration should now be double and milliseconds long.
pit_stops_final_df.printSchema()

In [0]:
# Unpartitioned Parquet write, replacing any existing output at this path.
# NOTE(review): overwrite discards previously loaded pit stops. If each raw
# {v_file_date} folder holds only new data, confirm this is intended.
path = f"{processed_folder_path}/pit_stops/"
pit_stops_final_df.write.mode("overwrite").parquet(path)

In [0]:
# End the notebook run and return this string as its exit value (presumably read by
# an orchestrating notebook that launched this one via dbutils.notebook.run).
dbutils.notebook.exit("Data Ingestion - json")

Data Ingestion - json