## Ingested Multi-Line JSON Files

##### Step-01 -- Read JSON File and specify the schema

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
pit_stops_schema = StructType(fields =[StructField("raceId",IntegerType(),False),
                                       StructField("driverId",IntegerType(),True),
                                       StructField("stop",StringType(),True),
                                       StructField("lap",IntegerType(),True),
                                       StructField("time",StringType(),True),
                                       StructField("duration",StringType(),True),
                                       StructField("milliseconds",IntegerType(),True),
                                       
    
] )

In [0]:
pit_stops_df = spark.read \
.schema(pit_stops_schema) \
.option("multiline",True) \
.json('/mnt/formula1dlbyumar136/raw/pit_stops.json')

In [0]:
display(pit_stops_df)

raceId,driverId,stop,lap,time,duration,milliseconds
841,153,1,1,17:05:23,26.898,26898
841,30,1,1,17:05:52,25.021,25021
841,17,1,11,17:20:48,23.426,23426
841,4,1,12,17:22:34,23.251,23251
841,13,1,13,17:24:10,23.842,23842
841,22,1,13,17:24:29,23.643,23643
841,20,1,14,17:25:17,22.603,22603
841,814,1,14,17:26:03,24.863,24863
841,816,1,14,17:26:50,25.259,25259
841,67,1,15,17:27:34,25.342,25342


##### Step-02 Rename and add column

In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
add_columns_df = pit_stops_df.withColumnRenamed("resultId","result_id") \
                           .withColumnRenamed("raceId","race_id") \
                            .withColumn("ingestion_date",current_timestamp())

In [0]:
display (add_columns_df)

race_id,driverId,stop,lap,time,duration,milliseconds,ingestion_date
841,153,1,1,17:05:23,26.898,26898,2022-09-16T13:58:02.804+0000
841,30,1,1,17:05:52,25.021,25021,2022-09-16T13:58:02.804+0000
841,17,1,11,17:20:48,23.426,23426,2022-09-16T13:58:02.804+0000
841,4,1,12,17:22:34,23.251,23251,2022-09-16T13:58:02.804+0000
841,13,1,13,17:24:10,23.842,23842,2022-09-16T13:58:02.804+0000
841,22,1,13,17:24:29,23.643,23643,2022-09-16T13:58:02.804+0000
841,20,1,14,17:25:17,22.603,22603,2022-09-16T13:58:02.804+0000
841,814,1,14,17:26:03,24.863,24863,2022-09-16T13:58:02.804+0000
841,816,1,14,17:26:50,25.259,25259,2022-09-16T13:58:02.804+0000
841,67,1,15,17:27:34,25.342,25342,2022-09-16T13:58:02.804+0000


##### Step 03 - Save into Processed Container in Parquet format

In [0]:
add_columns_df.write.mode("overwrite").parquet("/mnt/formula1dlbyumar136/processed/pit_stops")

In [0]:
display(spark.read.parquet("/mnt/formula1dlbyumar136/processed/pit_stops"))

race_id,driverId,stop,lap,time,duration,milliseconds,ingestion_date
841,153,1,1,17:05:23,26.898,26898,2022-09-16T14:00:02.410+0000
841,30,1,1,17:05:52,25.021,25021,2022-09-16T14:00:02.410+0000
841,17,1,11,17:20:48,23.426,23426,2022-09-16T14:00:02.410+0000
841,4,1,12,17:22:34,23.251,23251,2022-09-16T14:00:02.410+0000
841,13,1,13,17:24:10,23.842,23842,2022-09-16T14:00:02.410+0000
841,22,1,13,17:24:29,23.643,23643,2022-09-16T14:00:02.410+0000
841,20,1,14,17:25:17,22.603,22603,2022-09-16T14:00:02.410+0000
841,814,1,14,17:26:03,24.863,24863,2022-09-16T14:00:02.410+0000
841,816,1,14,17:26:50,25.259,25259,2022-09-16T14:00:02.410+0000
841,67,1,15,17:27:34,25.342,25342,2022-09-16T14:00:02.410+0000
