# Ingest pit_stops.json file

In [0]:
# Environment selector; the chosen value is stamped onto every row as the `env` column.
environment_choices = ["Prod", "Dev", "Test"]
dbutils.widgets.dropdown("Environment", "Dev", environment_choices, "Environment")
env = dbutils.widgets.get("Environment")

In [0]:
# Local import: this cell runs before the notebook's import cell.
from datetime import datetime

# Date of the raw-data drop to ingest. It selects the raw sub-folder that is read
# and is written to every row as the `file_date` column.
dbutils.widgets.text("p_file_date", '2021-03-21')
v_file_date = dbutils.widgets.get("p_file_date")

# Fail fast on a malformed value. Otherwise a typo only surfaces later as a
# confusing missing-path error from the JSON reader.
try:
    datetime.strptime(v_file_date, "%Y-%m-%d")
except ValueError as exc:
    raise ValueError(f"p_file_date must be in YYYY-MM-DD format, got {v_file_date!r}") from exc

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

### Step 1 - Read the JSON file using the Spark DataFrame reader API

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from pyspark.sql import functions as f

In [0]:
# Explicit schema for pit_stops.json so Spark does not infer types from the data.
#
# Every field is declared nullable. Spark's file-based readers treat a
# user-supplied schema as nullable regardless, so the former nullable=False on
# `duration` / `milliseconds` was never enforced and only misrepresented the
# contract. Any required not-null checks must be done explicitly after the read.
pit_stops_schema = StructType(fields=[
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("stop", IntegerType(), True),
    StructField("lap", IntegerType(), True),
    StructField("time", StringType(), True),
    # Kept as text (sample values look like "30.866"); `milliseconds` carries the
    # integer form of the same duration.
    StructField("duration", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
])

In [0]:
# The raw file for the selected date is a single multi-line JSON document, so
# read it with the explicit schema and multiline enabled.
pit_stops_path = f"{raw_folder_path}/{v_file_date}/pit_stops.json"
df = (
    spark.read
    .schema(pit_stops_schema)
    .option("multiline", True)
    .json(pit_stops_path)
)

In [0]:
# Quick look at the first rows of the raw read.
df.show(n=5, truncate=True)

+------+--------+----+---+--------+--------+------------+
|raceId|driverId|stop|lap|    time|duration|milliseconds|
+------+--------+----+---+--------+--------+------------+
|  1053|     839|   1|  1|15:05:16|  30.866|       30866|
|  1053|      20|   1|  3|15:10:09|  32.024|       32024|
|  1053|     854|   1|  5|15:15:11|  51.007|       51007|
|  1053|     853|   1| 12|15:27:20|  31.168|       31168|
|  1053|     842|   1| 14|15:30:10|  31.068|       31068|
+------+--------+----+---+--------+--------+------------+
only showing top 5 rows



### Step 2 - Rename columns to snake_case and add audit columns (env, ingestion_date, file_date)

In [0]:
# Rename the camelCase source keys to snake_case, then add audit columns:
# the selected environment, the load timestamp and the file-date parameter.
transformed_df = (
    df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumn("env", f.lit(env))
    .withColumn("ingestion_date", f.current_timestamp())
    .withColumn("file_date", f.lit(v_file_date))
)

In [0]:
# Preview the renamed columns and the new audit columns.
transformed_df.show(n=5, truncate=True)

+-------+---------+----+---+--------+--------+------------+----+--------------------+----------+
|race_id|driver_id|stop|lap|    time|duration|milliseconds| env|      ingestion_date| file_date|
+-------+---------+----+---+--------+--------+------------+----+--------------------+----------+
|   1053|      839|   1|  1|15:05:16|  30.866|       30866|Test|2023-06-17 16:52:...|2021-04-18|
|   1053|       20|   1|  3|15:10:09|  32.024|       32024|Test|2023-06-17 16:52:...|2021-04-18|
|   1053|      854|   1|  5|15:15:11|  51.007|       51007|Test|2023-06-17 16:52:...|2021-04-18|
|   1053|      853|   1| 12|15:27:20|  31.168|       31168|Test|2023-06-17 16:52:...|2021-04-18|
|   1053|      842|   1| 14|15:30:10|  31.068|       31068|Test|2023-06-17 16:52:...|2021-04-18|
+-------+---------+----+---+--------+--------+------------+----+--------------------+----------+
only showing top 5 rows



### Step 3 - Merge (upsert) the output into the processed Delta table f1_processed.pit_stops

In [0]:
# NOTE(review): legacy write paths kept for reference only; this cell executes nothing.
# The active write is the merge_delta_data call in the next cell. Consider deleting
# this cell. In particular, un-commenting the dbutils.fs.rm line below (recurse=True)
# would recursively delete the processed pit_stops directory.
# dbutils.fs.rm("dbfs:/FileStore/tables/processed/pit_stops", True)

# transformed_df \
# .write \
# .mode("overwrite") \
# .parquet(f"{processed_folder_path}/pit_stops.parquet")

# transformed_df.write.mode("overwrite").format("parquet").saveAsTable("f1_processed.pit_stops")

In [0]:
# Upsert into f1_processed.pit_stops, partitioned by race_id.
# A pit stop is identified by (driver, stop number, race). race_id, the partition
# column, is part of the match condition.
merge_key_columns = ["driver_id", "stop", "race_id"]
merge_condition = " AND ".join(f"tgt.{col} = src.{col}" for col in merge_key_columns)
merge_delta_data(transformed_df, 'f1_processed', 'pit_stops', processed_folder_path, merge_condition, 'race_id')

In [0]:
%sql
-- Sanity check: pit-stop rows per race in the processed table, highest race_id first.
SELECT race_id, count(1)
FROM f1_processed.pit_stops
GROUP BY race_id
ORDER BY race_id DESC;

race_id,count(1)
1053,56
1052,40
1047,23
1046,39
1045,57
1044,38
1043,30
1042,25
1041,33
1040,24
