In [0]:
# Notebook parameter: the value is later stamped on every row as the
# `data_source` column. Defaults to an empty string when not supplied.
data_source_widget = "p_data_source"
dbutils.widgets.text(data_source_widget, "")
v_data_source = dbutils.widgets.get(data_source_widget)

In [0]:
# Notebook parameter: selects the dated sub-folder of the raw container to read
# and is also stamped on every row as the `file_date` column.
# The default targets the 28th: during the incremental-load setup the 21st was
# skipped and the 28th was used for the incremental run instead.
file_date_widget = "p_file_date"
dbutils.widgets.text(file_date_widget, "2021-03-28")
v_file_date = dbutils.widgets.get(file_date_widget)

In [0]:
# Echo the resolved data-source parameter so its value is visible in the cell output.
v_data_source

'testing'

##### Step 1 - Read the JSON file using spark dataframe reader API

In [0]:
%run "../../Includes/Configuration"

In [0]:
%run "../../Includes/Common Functions"

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, FloatType

In [0]:
# Explicit schema for pit_stops.json so Spark does not have to infer column types.
#
# raceId is declared as an integer, like driverId: it is a numeric identifier
# (e.g. 1053) that becomes the merge key / last argument of merge_delta_table
# and is ordered by in the verification query. Typed as a string it would sort
# and compare lexicographically ("999" > "1053") instead of numerically.
# NOTE(review): if f1_processed.pit_stops already exists with a string race_id,
# confirm the target column type (or rebuild the table) before relying on this.
#
# stop, time and duration are intentionally kept as strings.
pit_stops_schema = StructType(fields=[
    StructField("raceId", IntegerType(), False),
    StructField("driverId", IntegerType(), True),
    StructField("stop", StringType(), True),
    StructField("lap", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
])

In [0]:
# Read the pit-stops JSON for the requested file date from the raw container.
# The explicit schema is enforced and multiLine is enabled on the JSON reader.
pit_stops_path = f"{raw_folder_path}/{v_file_date}/pit_stops.json"
pit_stops_df = (
    spark.read
    .schema(pit_stops_schema)
    .option("multiLine", True)
    .json(pit_stops_path)
)
         

In [0]:
# Render the freshly read DataFrame to check that the schema was applied as expected.
display(pit_stops_df)

raceId,driverId,stop,lap,time,duration,milliseconds
1053,839,1,1,15:05:16,30.866,30866
1053,20,1,3,15:10:09,32.024,32024
1053,854,1,5,15:15:11,51.007,51007
1053,853,1,12,15:27:20,31.168,31168
1053,842,1,14,15:30:10,31.068,31068
1053,20,2,20,15:39:11,31.184,31184
1053,854,2,21,15:41:24,32.479,32479
1053,20,3,22,15:42:52,39.502,39502
1053,853,2,23,15:45:20,31.5,31500
1053,852,1,25,15:46:39,30.696,30696


##### Step 2 - Rename columns and add new columns 


In [0]:
from pyspark.sql.functions import current_timestamp,lit

In [0]:
# Stage 1: rename the camelCase key columns to snake_case.
renamed_df = (
    pit_stops_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("raceId", "race_id")
)

# Stage 2: add audit columns - load timestamp plus the two notebook parameters.
final_df = (
    renamed_df
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

##### Step 3 - Write the output to the processed container in Delta format

In [0]:
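# Earlier write strategies, kept for reference only; the Delta merge in the next cell is what actually runs.
#final_df.write.mode("overwrite").format("parquet").saveAsTable("f1_processed.pit_stops")
#overwrite_partition(final_df,'f1_processed','pit_stops','race_id')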


In [0]:
# Source rows (src) are matched to target rows (tgt) on race, driver and stop.
merge_condition = " AND ".join([
    "tgt.race_id = src.race_id",
    "tgt.driver_id = src.driver_id",
    "tgt.stop = src.stop",
])

# merge_delta_table comes from the included Common Functions notebook.
# NOTE(review): the trailing 'race_id' argument is presumably the partition
# column - confirm against the helper's signature.
merge_delta_table(
    final_df,
    'f1_processed',
    'pit_stops',
    processed_folder_path,
    merge_condition,
    'race_id',
)

In [0]:
# Report "Success" as this notebook's exit value.
# NOTE(review): cells placed after this call presumably do not run during a
# job / Run All execution - confirm the SQL check below is meant to be manual-only.
dbutils.notebook.exit("Success")

In [0]:
%sql
-- Sanity check: number of pit-stop rows per race, ordered by race_id descending.
SELECT
    race_id,
    COUNT(1)
FROM f1_processed.pit_stops
GROUP BY race_id
ORDER BY race_id DESC;