In [0]:
# Notebook parameter: declare a text widget named "p_data_source" (empty-string
# default) and read its current value. The value is later written into the
# "data_source" column of the results DataFrame.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Notebook parameter: declare a text widget named "p_file_date" (default
# "2021-03-28") and read its current value. The value selects the dated
# sub-folder that results.json is read from and is also stored in the
# "file_date" column.
dbutils.widgets.text("p_file_date", "2021-03-28")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "/databricks-course/Formula 1/ingestion-incremental/includes/configurations"

In [0]:
%run "/databricks-course/Formula 1/ingestion-incremental/includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [0]:
# Explicit schema applied when reading results.json.
# Each entry is (source field name, Spark type, nullable); only resultId is
# declared non-nullable.
_results_field_specs = [
    ("resultId", IntegerType(), False),
    ("raceId", IntegerType(), True),
    ("driverId", IntegerType(), True),
    ("constructorId", IntegerType(), True),
    ("number", IntegerType(), True),
    ("grid", IntegerType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("positionOrder", IntegerType(), True),
    ("points", FloatType(), True),
    ("laps", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
    ("fastestLap", IntegerType(), True),
    ("rank", IntegerType(), True),
    ("fastestLapTime", StringType(), True),
    ("fastestLapSpeed", FloatType(), True),
    ("statusId", StringType(), True),
]

results_schema = StructType(
    fields=[
        StructField(field_name, field_type, is_nullable)
        for field_name, field_type, is_nullable in _results_field_specs
    ]
)

In [0]:
# Read the results file for the requested file date from the raw folder,
# applying the explicit schema instead of letting Spark infer one.
results_df = (
    spark.read
    .schema(results_schema)
    .json(f"{raw_folder_path}/{v_file_date}/results.json")
)

In [0]:
from pyspark.sql.functions import lit

In [0]:
# Rename the camelCase source columns to snake_case, then tag every row with
# the data source and file date supplied through the notebook widgets.
results_with_columns_df = (
    results_df
    # camelCase -> snake_case
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    # constant audit columns taken from the widget values
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

In [0]:
# Pass the renamed DataFrame through add_ingestion_date. The helper is not
# defined in this notebook — presumably it comes from the common_functions
# notebook pulled in via %run and appends an ingestion timestamp column
# (TODO confirm against that notebook).
results_with_ingestion_date_df = add_ingestion_date(results_with_columns_df)

In [0]:
from pyspark.sql.functions import col

In [0]:
# Drop the statusId column; it was never renamed above and is not kept in the
# processed output. (The call needs its closing parenthesis to be valid Python.)
results_final_df = results_with_ingestion_date_df.drop(col("statusId"))

### Incremental Loading - 2 Methods

In [0]:
# Incremental load, method 1: make the upcoming append idempotent.
# If f1_processed.results already exists, drop every partition whose race_id
# is present in the incoming data, so that the append in the next cell
# re-ingests those partitions instead of duplicating them.
#
# The table-existence check does not depend on the race id, so it is evaluated
# once up front; when the table does not exist yet there is nothing to drop and
# the distinct/collect job is skipped entirely.
if spark._jsparkSession.catalog().tableExists("f1_processed.results"):
    # collect() brings the distinct race ids to the driver as Row objects.
    for race_row in results_final_df.select("race_id").distinct().collect():
        spark.sql(f"ALTER TABLE f1_processed.results DROP IF EXISTS PARTITION (race_id = {race_row.race_id})")

In [0]:
# Append the incoming rows to the managed table f1_processed.results, stored
# as parquet and partitioned by race_id.
(
    results_final_df.write
    .mode("append")
    .partitionBy('race_id')
    .format("parquet")
    .saveAsTable("f1_processed.results")
)

### Incremental Load Method 2

In [0]:
# Run the prepared results DataFrame through rearrange_partition_column with
# race_id as the partition column. The helper is defined outside this notebook
# — presumably in common_functions, moving the partition column to the end of
# the column list (TODO confirm).
# NOTE(review): output_df is not consumed by the overwrite_partition call in
# the following cell, which receives results_final_df directly.
output_df = rearrange_partition_column(results_final_df,'race_id')

In [0]:
# Incremental load, method 2: hand the DataFrame to overwrite_partition
# (defined outside this notebook) for table results in database f1_processed,
# partitioned by race_id — the same f1_processed.results table that method 1
# writes to.
overwrite_partition(results_final_df,'f1_processed','results','race_id')

This method is better than the first one because Spark identifies which partitions already exist while inserting the data itself, instead of looping over a list every time and checking which partitions are repeated.