In [0]:
# Widget: date-named raw sub-folder to ingest (used to build the results.json path when reading).
dbutils.widgets.text(name="p_file_date", defaultValue="2021-03-21")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
# Widget: label for where the data came from; it is stamped onto every row as `data_source`.
# The default is an empty string, so rows get "" unless the caller supplies a value.
dbutils.widgets.text(name="p_data_source", defaultValue="")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
%run "./configuration"

In [0]:
%run "./common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.functions import lit

In [0]:
# Explicit schema for results.json, applied at read time instead of letting Spark infer types.
# Each tuple is (source field name, Spark data type, nullable flag).
results_schema = StructType(
    [
        StructField(field_name, data_type, nullable)
        for field_name, data_type, nullable in (
            ("resultId", IntegerType(), False),
            ("raceId", IntegerType(), False),
            ("driverId", IntegerType(), False),
            ("constructorId", IntegerType(), False),
            ("number", IntegerType(), True),
            ("grid", IntegerType(), False),
            ("position", IntegerType(), True),
            ("positionText", StringType(), False),
            ("positionOrder", IntegerType(), False),
            ("points", FloatType(), False),
            ("laps", IntegerType(), False),
            ("time", StringType(), True),
            ("milliseconds", IntegerType(), True),
            ("fastestLap", IntegerType(), True),
            ("rank", IntegerType(), True),
            ("fastestLapTime", StringType(), True),
            ("fastestLapSpeed", FloatType(), True),
            ("statusId", IntegerType(), False),
        )
    ]
)

In [0]:
# Load this run's results.json from the date-named raw folder, enforcing results_schema.
results_df = spark.read.json(
    f"{raw_folder_path}/{v_file_date}/results.json",
    schema=results_schema,
)

In [0]:
# statusId is not carried into the processed table: keep every other column, in the original order.
results_df = results_df.select([name for name in results_df.columns if name != "statusId"])

In [0]:
# Map camelCase source field names to the snake_case names used in the processed table.
# Columns not listed (number, grid, position, points, laps, time, milliseconds, rank) keep their names.
results_column_renames = {
    "resultId": "result_id",
    "raceId": "race_id",
    "driverId": "driver_id",
    "constructorId": "constructor_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "fastestLap": "fastest_lap",
    "fastestLapTime": "fastest_lap_time",
    "fastestLapSpeed": "fastest_lap_speed",
}
for source_name, target_name in results_column_renames.items():
    results_df = results_df.withColumnRenamed(source_name, target_name)

# Stamp every row with its provenance: the widget-supplied data source and file date.
results_df = (
    results_df
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

In [0]:
# Apply the shared helper from ./common_functions; the displayed output shows it adds an `ingestion_date` column.
results_df = results_df.transform(add_ingestion_date)

In [0]:
# Drop duplicates: keep one row per (race_id, driver_id), i.e. at most one result per driver per race.
# NOTE: dropDuplicates does not specify which of several duplicate rows is kept.
# (Despite its name, results_duped_df holds the de-duplicated rows.)
results_duped_df = results_df.dropDuplicates(["race_id", "driver_id"])

# Carry the de-duplicated rows forward under the name the later cells use. Previously only
# results_duped_df was deduplicated, and the display/write cells still used the original frame.
results_df = results_duped_df

In [0]:
# Preview the de-duplicated rows (one per race_id/driver_id), not the pre-dedup frame.
display(results_duped_df)

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,data_source,file_date,ingestion_date
24986,1053,830,9,33,3,1.0,1,1,25.0,63,2:02:34.598,7354598.0,60.0,2,1:17.524,227.96,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24987,1053,1,131,44,1,2.0,2,2,19.0,63,+22.000,7376598.0,60.0,1,1:16.702,230.403,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24988,1053,846,1,4,7,3.0,3,3,15.0,63,+23.702,7378300.0,63.0,3,1:18.259,225.819,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24989,1053,844,6,16,4,4.0,4,4,12.0,63,+25.579,7380177.0,60.0,6,1:18.379,225.473,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24990,1053,832,6,55,11,5.0,5,5,10.0,63,+27.036,7381634.0,60.0,7,1:18.490,225.154,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24991,1053,817,1,3,6,6.0,6,6,8.0,63,+51.220,7405818.0,54.0,12,1:19.341,222.739,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24992,1053,842,213,10,5,7.0,7,7,6.0,63,+52.818,7407416.0,52.0,9,1:18.994,223.718,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24993,1053,840,117,18,10,8.0,8,8,4.0,63,+56.909,7411507.0,59.0,8,1:18.782,224.32,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24994,1053,839,214,31,9,9.0,9,9,2.0,63,+65.704,7420302.0,62.0,15,1:19.422,222.512,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000
24995,1053,4,214,14,15,10.0,10,10,1.0,63,+66.561,7421159.0,62.0,14,1:19.417,222.526,Ergast,2021-04-18,2023-06-30T15:06:49.116+0000


In [0]:
# Write the de-duplicated results to f1_processed.results as Parquet, partitioned by race_id.
# NOTE(review): mode("append") is not idempotent. Re-running this notebook for the same p_file_date
# appends the same rows again (the duplicate-check query in the next cell would surface that).
# Consider a partition overwrite or merge if reruns must be safe.
results_duped_df.write.mode("append").partitionBy("race_id").format("parquet").saveAsTable("f1_processed.results")

In [0]:
%sql
-- Sanity check: list every (race_id, driver_id) pair stored more than once in f1_processed.results.
-- An empty result means the table holds at most one row per driver per race.
SELECT race_id, driver_id, COUNT(1)
  FROM f1_processed.results
 GROUP BY race_id, driver_id
HAVING COUNT(1) >= 2
 ORDER BY race_id ASC, driver_id DESC

race_id,driver_id,count(1)


In [0]:
# End the notebook, returning "Success" to whatever launched it (e.g. a job or dbutils.notebook.run).
dbutils.notebook.exit("Success")