#### Step 1 - Ingest JSON file

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
#Let's import some packages
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Explicit schema for results.json so Spark does not have to infer types.
# Field names must match the JSON keys; a name that is absent from the
# file yields a column full of nulls rather than an error.
results_schema = StructType(fields=[
    StructField("resultId", IntegerType(), False),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("grid", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("positionText", StringType(), True),
    StructField("positionOrder", IntegerType(), True),
    StructField("points", DoubleType(), True),
    StructField("laps", IntegerType(), True),
    StructField("time", StringType(), True),
    # Was misspelled "miliseconds".
    # NOTE(review): assumes the JSON key is "milliseconds" - confirm against results.json.
    StructField("milliseconds", IntegerType(), True),
    StructField("fastestLap", IntegerType(), True),
    StructField("rank", IntegerType(), True),
    # Casing aligned with the "fastestLapTime" name used in the rename step.
    StructField("fastestLapTime", StringType(), True),
    StructField("fastestLapSpeed", StringType(), True),
    StructField("statusId", IntegerType(), True),
])

# Load results.json from the raw folder, applying results_schema
# instead of letting Spark infer the column types.
results_df = (
    spark.read
    .schema(results_schema)
    .json(f"{raw_folder_path}/results.json")
)

#### Step 2 - Add ingestion date and rename columns

In [0]:
# add_ingestion_date is not defined in this notebook; presumably it comes
# from the %run includes and appends an ingestion-date column - confirm.
#
# Rename the camelCase source columns to snake_case. "resultId" and
# "fastestLapSpeed" are spelled exactly as declared in results_schema:
# withColumnRenamed silently does nothing when the source name is not
# found, so the renames should not rely on case-insensitive resolution.
results_renamed_df = (
    add_ingestion_date(results_df)
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
)

#### Step 3 - Drop unwanted columns

In [0]:
# Columns that are not carried into the processed output.
columns_to_drop = ("statusId",)
results_final_df = results_renamed_df.drop(*columns_to_drop)

In [0]:
# Write the final DataFrame as Parquet under the processed folder,
# partitioned by race_id and replacing any existing output.
(
    results_final_df.write
    .mode("overwrite")
    .partitionBy("race_id")
    .parquet(f"{processed_folder_path}/results")
)

In [0]:
# Also save the same DataFrame as a Parquet-backed table named
# f1_processed.resultsSQL, overwriting any existing table contents.
(
    results_final_df.write
    .mode("overwrite")
    .format("parquet")
    .saveAsTable("f1_processed.resultsSQL")
)

#### Step 6 - Return an exit value to any caller that ran this notebook via dbutils.notebook.run

In [0]:
# Exit the notebook, handing this status string back to the caller.
exit_status = "Success"
dbutils.notebook.exit(exit_status)

Success