### Ingest results json files

##### Step 1 - Read the JSON files using the pandas `read_json` API

In [38]:
from pandas import read_csv,read_json,concat
from glob import glob
from datetime import datetime
from lib import configuration
from lib import common_functions

In [39]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [40]:
# Session object returned by the project helper `common_functions.get_spark_session()`.
# NOTE(review): none of the cells visible in this notebook reference `spark`
# afterwards (the data is handled with pandas) — confirm it is still needed.
spark = common_functions.get_spark_session()

In [41]:
# Spark schema for the raw results records, declared as
# (column name, Spark type class, nullable). Only resultId is non-nullable.
# NOTE(review): the pandas-based cells visible in this notebook do not reference
# this schema — confirm whether it is still needed.
_results_field_specs = [
    ("resultId", IntegerType, False),
    ("raceId", IntegerType, True),
    ("driverId", IntegerType, True),
    ("constructorId", IntegerType, True),
    ("number", IntegerType, True),
    ("grid", IntegerType, True),
    ("position", IntegerType, True),
    ("positionText", StringType, True),
    ("positionOrder", IntegerType, True),
    ("points", FloatType, True),
    ("laps", IntegerType, True),
    ("time", StringType, True),
    ("milliseconds", IntegerType, True),
    ("fastestLap", IntegerType, True),
    ("rank", IntegerType, True),
    ("fastestLapTime", StringType, True),
    ("fastestLapSpeed", FloatType, True),
    ("statusId", StringType, True),
]

results_schema = StructType(
    fields=[
        StructField(col_name, col_type(), is_nullable)
        for col_name, col_type, is_nullable in _results_field_specs
    ]
)

In [42]:
# Collect every JSON-lines file directly inside the bronze "results" folder.
folder_path = f"{configuration.bronze_folder_path}/results"
# glob returns matches in arbitrary (filesystem) order; sort so that the row
# order of the concatenated frame is reproducible between runs.
all_files = sorted(glob(folder_path + "/*.json", recursive=False))

# Fail with an actionable message rather than pandas' generic
# "No objects to concatenate" when the folder is empty or the path is wrong.
if not all_files:
    raise FileNotFoundError(f"No .json files found in {folder_path}")

# Each file is newline-delimited JSON (lines=True); stack them into one frame
# with a fresh 0..n-1 index.
dfs = [read_json(file, lines=True) for file in all_files]
results_df = concat(dfs, ignore_index=True)
results_df

Unnamed: 0,resultId,raceId_join,driverId_join,constructorId_join,number,grid,position,positionText,positionOrder,points,laps,statusId
0,195001,British Grand Prix1950,farina,alfa,2,1,1,1,positionOrder,9.0,70,Finished
1,195001,British Grand Prix1950,fagioli,alfa,3,2,2,2,positionOrder,6.0,70,Finished
2,195001,British Grand Prix1950,reg_parnell,alfa,4,4,3,3,positionOrder,4.0,70,Finished
3,195001,British Grand Prix1950,cabantous,lago,14,6,4,4,positionOrder,3.0,68,+2 Laps
4,195001,British Grand Prix1950,rosier,lago,15,9,5,5,positionOrder,2.0,68,+2 Laps
...,...,...,...,...,...,...,...,...,...,...,...,...
26275,198413,Dutch Grand Prix1984,piquet,brabham,1,2,25,R,positionOrder,0.0,10,Oil pressure
26276,198413,Dutch Grand Prix1984,ghinzani,osella,24,21,26,R,positionOrder,0.0,8,Fuel pump
26277,198413,Dutch Grand Prix1984,alboreto,ferrari,27,9,27,R,positionOrder,0.0,7,Engine
26278,198414,Italian Grand Prix1984,lauda,mclaren,8,4,1,1,positionOrder,9.0,51,Finished


##### Step 2 - Rename columns and add new columns
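1. Rename resultId, positionText, positionOrder and statusId, and replace the driver, constructor and race join keys with driver_id, constructor_id and race_id
1. Add data_source, and ingestion_date (copied from race_date)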

In [43]:
# Tag every row with the configured data source.
# ingestion_date is not set here: the call to common_functions.get_ingestion_date()
# is disabled, and the column is filled from race_date after the race join instead.
results_df = results_df.assign(data_source=configuration.v_data_source)
results_df

Unnamed: 0,resultId,raceId_join,driverId_join,constructorId_join,number,grid,position,positionText,positionOrder,points,laps,statusId,data_source
0,195001,British Grand Prix1950,farina,alfa,2,1,1,1,positionOrder,9.0,70,Finished,api
1,195001,British Grand Prix1950,fagioli,alfa,3,2,2,2,positionOrder,6.0,70,Finished,api
2,195001,British Grand Prix1950,reg_parnell,alfa,4,4,3,3,positionOrder,4.0,70,Finished,api
3,195001,British Grand Prix1950,cabantous,lago,14,6,4,4,positionOrder,3.0,68,+2 Laps,api
4,195001,British Grand Prix1950,rosier,lago,15,9,5,5,positionOrder,2.0,68,+2 Laps,api
...,...,...,...,...,...,...,...,...,...,...,...,...,...
26275,198413,Dutch Grand Prix1984,piquet,brabham,1,2,25,R,positionOrder,0.0,10,Oil pressure,api
26276,198413,Dutch Grand Prix1984,ghinzani,osella,24,21,26,R,positionOrder,0.0,8,Fuel pump,api
26277,198413,Dutch Grand Prix1984,alboreto,ferrari,27,9,27,R,positionOrder,0.0,7,Engine,api
26278,198414,Italian Grand Prix1984,lauda,mclaren,8,4,1,1,positionOrder,9.0,51,Finished,api


In [44]:
# camelCase source columns -> snake_case output columns.
# Note that statusId becomes "status".
column_renames = {
    "resultId": "result_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "statusId": "status",
}
results_final_df = results_df.rename(columns=column_renames)
results_final_df

Unnamed: 0,result_id,raceId_join,driverId_join,constructorId_join,number,grid,position,position_text,position_order,points,laps,status,data_source
0,195001,British Grand Prix1950,farina,alfa,2,1,1,1,positionOrder,9.0,70,Finished,api
1,195001,British Grand Prix1950,fagioli,alfa,3,2,2,2,positionOrder,6.0,70,Finished,api
2,195001,British Grand Prix1950,reg_parnell,alfa,4,4,3,3,positionOrder,4.0,70,Finished,api
3,195001,British Grand Prix1950,cabantous,lago,14,6,4,4,positionOrder,3.0,68,+2 Laps,api
4,195001,British Grand Prix1950,rosier,lago,15,9,5,5,positionOrder,2.0,68,+2 Laps,api
...,...,...,...,...,...,...,...,...,...,...,...,...,...
26275,198413,Dutch Grand Prix1984,piquet,brabham,1,2,25,R,positionOrder,0.0,10,Oil pressure,api
26276,198413,Dutch Grand Prix1984,ghinzani,osella,24,21,26,R,positionOrder,0.0,8,Fuel pump,api
26277,198413,Dutch Grand Prix1984,alboreto,ferrari,27,9,27,R,positionOrder,0.0,7,Engine,api
26278,198414,Italian Grand Prix1984,lauda,mclaren,8,4,1,1,positionOrder,9.0,51,Finished,api


In [45]:
# Driver lookup from the silver layer: keep only the surrogate id and the
# reference string used as the join key.
drivers_df = read_csv(f'{configuration.silver_folder_path}/drivers.csv')[
    ["driver_id", "driver_ref"]
]
drivers_df

Unnamed: 0,driver_id,driver_ref
0,1,abate
1,2,abecassis
2,3,acheson
3,4,adams
4,5,ader
...,...,...
854,855,zapico
855,856,zhou
856,857,zonta
857,858,zorzi


In [46]:
# Constructor lookup from the silver layer: keep only the surrogate id and the
# reference string used as the join key.
constructors_df = read_csv(f'{configuration.silver_folder_path}/constructors.csv')[
    ["constructor_id", "constructor_ref"]
]
constructors_df

Unnamed: 0,constructor_id,constructor_ref
0,1,adams
1,2,afm
2,3,ags
3,4,alfa
4,5,alphatauri
...,...,...
207,208,watson
208,209,wetteroth
209,210,williams
210,211,wolf


In [47]:
# Race lookup from the silver layer: surrogate id, the join key string and the
# race date (later copied into ingestion_date).
race_circuits_df = read_csv(f'{configuration.silver_folder_path}/race_circuits.csv')[
    ["race_id", "race_id_join", "race_date"]
]
race_circuits_df

Unnamed: 0,race_id,race_id_join,race_date
0,81,German Grand Prix1959,1959-08-02
1,420,Australian Grand Prix1985,1985-11-03
2,436,Australian Grand Prix1986,1986-10-26
3,452,Australian Grand Prix1987,1987-11-15
4,468,Australian Grand Prix1988,1988-11-13
...,...,...,...
1120,319,Belgian Grand Prix1979,1979-05-13
1121,333,Belgian Grand Prix1980,1980-05-04
1122,347,Belgian Grand Prix1981,1981-05-17
1123,362,Belgian Grand Prix1982,1982-05-09


In [48]:
# Resolve the three textual join keys to surrogate ids, one lookup at a time.
# Each entry: (key column in results, lookup frame, key column in lookup, lsuffix).
# how='inner' means result rows without a matching lookup key are dropped, and
# reset_index(drop=True) discards the key column once it has been used.
join_plan = [
    ("driverId_join", drivers_df, "driver_ref", 'driver_'),
    ("constructorId_join", constructors_df, "constructor_ref", 'constructor_'),
    ("raceId_join", race_circuits_df, "race_id_join", 'race_circuits_'),
]
for left_key, lookup_df, lookup_key, left_suffix in join_plan:
    results_final_df = (
        results_final_df
        .set_index(left_key)
        .join(lookup_df.set_index(lookup_key), lsuffix=left_suffix, how='inner')
        .reset_index(drop=True)
    )

# ingestion_date is taken from the race date rather than the wall clock.
results_final_df["ingestion_date"] = results_final_df["race_date"]

# Final column order of the silver "results" table.
output_columns = [
    "result_id",
    "race_id",
    "driver_id",
    "constructor_id",
    "number",
    "grid",
    "position",
    "position_text",
    "position_order",
    "points",
    "laps",
    "status",
    "data_source",
    "ingestion_date",
]
results_final_df = results_final_df[output_columns]
results_final_df

Unnamed: 0,result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,status,data_source,ingestion_date
0,202005,1023,317,4,99,19,17,17,positionOrder,0.0,51,+1 Lap,api,2020-08-09
1,202005,1023,634,4,7,20,15,15,positionOrder,0.0,51,+1 Lap,api,2020-08-09
2,202005,1023,305,5,10,7,11,11,positionOrder,0.0,52,Finished,api,2020-08-09
3,202005,1023,436,5,26,16,10,10,positionOrder,1.0,52,Finished,api,2020-08-09
4,202005,1023,456,75,16,8,4,4,positionOrder,12.0,52,Finished,api,2020-08-09
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
26275,202318,1097,671,140,63,5,5,5,positionOrder,10.0,56,Finished,api,2023-10-22
26276,202318,1097,803,168,1,6,1,1,positionOrder,25.0,56,Finished,api,2023-10-22
26277,202318,1097,595,168,11,9,4,4,positionOrder,12.0,56,Finished,api,2023-10-22
26278,202318,1097,11,210,23,15,9,9,positionOrder,2.0,56,Finished,api,2023-10-22


##### Step 3 - Write the output to the silver folder in CSV format

In [49]:
# Persist the final results table to the silver layer as CSV; the positional
# index is not written.
results_output_path = f"{configuration.silver_folder_path}/results.csv"
results_final_df.to_csv(results_output_path, index=False)

In [50]:
# Read the file that was just written back in, as a sanity check of the output.
# The variable is called df_parquet although the file is a CSV.
results_csv_path = f'{configuration.silver_folder_path}/results.csv'
df_parquet = read_csv(results_csv_path)
df_parquet

Unnamed: 0,result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,status,data_source,ingestion_date
0,202005,1023,317,4,99.0,19,17,17,positionOrder,0.0,51,+1 Lap,api,2020-08-09
1,202005,1023,634,4,7.0,20,15,15,positionOrder,0.0,51,+1 Lap,api,2020-08-09
2,202005,1023,305,5,10.0,7,11,11,positionOrder,0.0,52,Finished,api,2020-08-09
3,202005,1023,436,5,26.0,16,10,10,positionOrder,1.0,52,Finished,api,2020-08-09
4,202005,1023,456,75,16.0,8,4,4,positionOrder,12.0,52,Finished,api,2020-08-09
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
26275,202318,1097,671,140,63.0,5,5,5,positionOrder,10.0,56,Finished,api,2023-10-22
26276,202318,1097,803,168,1.0,6,1,1,positionOrder,25.0,56,Finished,api,2023-10-22
26277,202318,1097,595,168,11.0,9,4,4,positionOrder,12.0,56,Finished,api,2023-10-22
26278,202318,1097,11,210,23.0,15,9,9,positionOrder,2.0,56,Finished,api,2023-10-22
