In [0]:
# spark.read.json("/mnt/formula1dlajay/raw/2021-03-21/results.json").createOrReplaceTempView("results_cutover")

In [0]:
# %sql
# select raceId, count(1) from results_cutover group by raceId order by raceId desc;

In [0]:
# spark.read.json("/mnt/formula1dlajay/raw/2021-03-28/results.json").createOrReplaceTempView("results_w1")

In [0]:
# %sql
# select raceId, count(1) from results_w1 group by raceId order by raceId desc;

In [0]:
# spark.read.json("/mnt/formula1dlajay/raw/2021-04-18/results.json").createOrReplaceTempView("results_w2")

In [0]:
# %sql
# select raceId, count(1) from results_w2 group by raceId order by raceId desc;

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Notebook parameter: a label for where this load came from. It defaults to ""
# and is stamped onto every row as the `data_source` column further down.
_data_source_widget = "p_data_source"
dbutils.widgets.text(_data_source_widget, "")
v_data_source = dbutils.widgets.get(_data_source_widget)

In [0]:
# Notebook parameter: the dated raw sub-folder to ingest. It is used to build the
# input path and is stamped onto every row as `file_date`. Defaults to 2021-03-28.
_file_date_widget = "p_file_date"
dbutils.widgets.text(_file_date_widget, "2021-03-28")
v_file_date = dbutils.widgets.get(_file_date_widget)

##### Step 1 - Read the JSON file using the Spark DataFrameReader API

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [0]:
# Explicit schema for results.json, laid out as (source field, Spark type, nullable)
# rows. Only resultId is declared non-nullable; statusId is read as a string.
_results_field_specs = [
    ("resultId", IntegerType(), False),
    ("raceId", IntegerType(), True),
    ("driverId", IntegerType(), True),
    ("constructorId", IntegerType(), True),
    ("number", IntegerType(), True),
    ("grid", IntegerType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("positionOrder", IntegerType(), True),
    ("points", FloatType(), True),
    ("laps", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
    ("fastestLap", IntegerType(), True),
    ("rank", IntegerType(), True),
    ("fastestLapTime", StringType(), True),
    ("fastestLapSpeed", FloatType(), True),
    ("statusId", StringType(), True),
]

results_schema = StructType(
    fields=[
        StructField(field_name, field_type, is_nullable)
        for field_name, field_type, is_nullable in _results_field_specs
    ]
)

In [0]:
# Read this run's results file with the explicit schema rather than inferring one.
# raw_folder_path is not defined in this notebook; presumably it comes from the
# ../includes/configuration include.
results_source_path = f"{raw_folder_path}/{v_file_date}/results.json"
results_df = spark.read.schema(results_schema).json(results_source_path)

##### Step 2 - Rename columns and add new columns

In [0]:
from pyspark.sql.functions import lit

In [0]:
# camelCase source fields -> snake_case column names.
# Columns that are not listed here keep their original name.
_results_renames = {
    "resultId": "result_id",
    "raceId": "race_id",
    "driverId": "driver_id",
    "constructorId": "constructor_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "fastestLap": "fastest_lap",
    "fastestLapTime": "fastest_lap_time",
    "fastestLapSpeed": "fastest_lap_speed",
}

results_with_columns_df = results_df
for source_name, target_name in _results_renames.items():
    results_with_columns_df = results_with_columns_df.withColumnRenamed(source_name, target_name)

# Stamp every row with the two notebook parameters so its origin can be traced.
results_with_columns_df = (
    results_with_columns_df
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

In [0]:
# add_ingestion_date is not defined in this notebook; presumably it comes from the
# ../includes/common_functions include. The displayed output further down shows an
# `ingestion_date` timestamp column as the last column after this step.
results_with_columns_df = add_ingestion_date(results_with_columns_df)

##### Step 3 - Drop the unwanted column

In [0]:
from pyspark.sql.functions import col

In [0]:
# statusId is not carried into the processed results, so it is removed here.
results_final_df = results_with_columns_df.drop("statusId")

In [0]:
# Visual sanity check of the transformed rows before they are written out.
display(results_final_df)

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,data_source,file_date,ingestion_date
24986,1053,830,9,33,3,1.0,1,1,25.0,63,2:02:34.598,7354598.0,60.0,2,1:17.524,227.96,testing,2021-04-18,2024-02-04T08:21:10.858Z
24987,1053,1,131,44,1,2.0,2,2,19.0,63,+22.000,7376598.0,60.0,1,1:16.702,230.403,testing,2021-04-18,2024-02-04T08:21:10.858Z
24988,1053,846,1,4,7,3.0,3,3,15.0,63,+23.702,7378300.0,63.0,3,1:18.259,225.819,testing,2021-04-18,2024-02-04T08:21:10.858Z
24989,1053,844,6,16,4,4.0,4,4,12.0,63,+25.579,7380177.0,60.0,6,1:18.379,225.473,testing,2021-04-18,2024-02-04T08:21:10.858Z
24990,1053,832,6,55,11,5.0,5,5,10.0,63,+27.036,7381634.0,60.0,7,1:18.490,225.154,testing,2021-04-18,2024-02-04T08:21:10.858Z
24991,1053,817,1,3,6,6.0,6,6,8.0,63,+51.220,7405818.0,54.0,12,1:19.341,222.739,testing,2021-04-18,2024-02-04T08:21:10.858Z
24992,1053,842,213,10,5,7.0,7,7,6.0,63,+52.818,7407416.0,52.0,9,1:18.994,223.718,testing,2021-04-18,2024-02-04T08:21:10.858Z
24993,1053,840,117,18,10,8.0,8,8,4.0,63,+56.909,7411507.0,59.0,8,1:18.782,224.32,testing,2021-04-18,2024-02-04T08:21:10.858Z
24994,1053,839,214,31,9,9.0,9,9,2.0,63,+65.704,7420302.0,62.0,15,1:19.422,222.512,testing,2021-04-18,2024-02-04T08:21:10.858Z
24995,1053,4,214,14,15,10.0,10,10,1.0,63,+66.561,7421159.0,62.0,14,1:19.417,222.526,testing,2021-04-18,2024-02-04T08:21:10.858Z


##### Step 4 - Write the output to the processed container in parquet format


#### Method 1

In [0]:
# for race_id_list in results_final_df.select("race_id").distinct().collect():    # only use this for small data: collect() brings every returned row onto the driver node
#     if (spark._jsparkSession.catalog().tableExists("f1_processed.results")):
#       spark.sql(f"alter table f1_processed.results drop if exists partition (race_id = {race_id_list.race_id})")

In [0]:
# results_final_df.write.mode("overwrite").partitionBy('race_id').parquet(f"{processed_folder_path}/results")

# results_final_df.write.mode("overwrite").partitionBy('race_id').format("parquet").saveAsTable("f1_processed.results")

# results_final_df.write.mode("append").partitionBy('race_id').format("parquet").saveAsTable("f1_processed.results") #for append

#### Method 2

In [0]:
%sql
-- drop table f1_processed.results;

In [0]:
# results_final_df.schema.names

In [0]:
# def re_arrange_columns(input_df,partition_column): #used from common functions files
#     column_list = []
#     for column_name in input_df.schema.names:
#         if column_name != partition_column:
#             column_list.append(column_name)
#         column_list.append(partition_column)
#     output_df = input_df.select(column_list)
#     return output_df

In [0]:
# output_df = re_arrange_columns(results_final_df,"race_id")

In [0]:
# def overwrite_partition(input_df, db_name, table_name, partition_column): #used from common functions files
#   output_df = re_arrange_columns(input_df, partition_column)
#   spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
#   if (spark._jsparkSession.catalog().tableExists(f"{db_name}.{table_name}")):
#     output_df.write.mode("overwrite").insertInto(f"{db_name}.{table_name}")
#   else:
#     output_df.write.mode("overwrite").partitionBy(partition_column).format("parquet").saveAsTable(f"{db_name}.{table_name}")


In [0]:
# overwrite_partition(results_final_df,"f1_processed","results","race_id")
# overwrite_function(results_final_df, 'f1_processed', 'results', 'race_id')

# Including the partition column in the merge condition (tgt.race_id = src.race_id) makes the merge quicker.

# spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
# from delta.tables import DeltaTable

# if (spark._jsparkSession.catalog().tableExists("f1_processed.results")):
#    deltaTable = DeltaTable.forPath(spark,"/mnt/formula1dlajay/processed/results")
#    deltaTable.alias('tgt').merge(
#        results_final_df.alias("src"),
#        "tgt.result_id = src.result_id and tgt.race_id = src.race_id") \
#        .whenMatchedUpdateAll() \
#        .whenNotMatchedInsertAll() \
#        .execute() 
# else:
#       results_final_df.write.mode("overwrite").partitionBy("race_id").format("delta").saveAsTable("f1_processed.results")

In [0]:
# Target (tgt) and source (src) rows are matched on result_id, and additionally on
# race_id, the column that is also handed to the helper below.
merge_condition = (
    "tgt.result_id = src.result_id "
    "and tgt.race_id = src.race_id"
)

# merge_delta_data is defined outside this notebook (presumably in
# ../includes/common_functions). The arguments are passed positionally in the
# same order as before. NOTE(review): confirm the order against its signature.
merge_delta_data(
    results_final_df,
    "f1_processed",
    "results",
    "race_id",
    processed_folder_path,
    merge_condition,
)

In [0]:
%sql
-- Post-load check: number of rows per race in the processed results table,
-- listed from the highest race_id downwards.
SELECT race_id, count(1)
FROM f1_processed.results
GROUP BY race_id
ORDER BY race_id DESC;

race_id,count(1)
1053,20
1052,20
1047,20
1046,20
1045,20
1044,20
1043,20
1042,20
1041,20
1040,20


In [0]:
# End the notebook run here and hand "Success" back as its exit value
# (presumably read by whatever workflow or notebook invoked this one).
dbutils.notebook.exit("Success")

In [0]:
# %sql
# drop table f1_processed.results;