In [0]:
#Step 1 - Read the JSON file using the spark dataframe reader API

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [0]:
results_schema = StructType(fields=[StructField("resultId", IntegerType(), False),
                                    StructField("raceId", IntegerType(), True),
                                    StructField("driverId", IntegerType(), True),
                                    StructField("constructorId", IntegerType(), True),
                                    StructField("number", IntegerType(), True),
                                    StructField("grid", IntegerType(), True),
                                    StructField("position", IntegerType(), True),
                                    StructField("positionText", StringType(), True),
                                    StructField("positionOrder", IntegerType(), True),
                                    StructField("points", FloatType(), True),
                                    StructField("laps", IntegerType(), True),
                                    StructField("time", StringType(), True),
                                    StructField("milliseconds", IntegerType(), True),
                                    StructField("fastestLap", IntegerType(), True),
                                    StructField("rank", IntegerType(), True),
                                    StructField("fastestLapTime", StringType(), True),
                                    StructField("fastestLapSpeed", FloatType(), True),
                                    StructField("statusId", StringType(), True)])

In [0]:
results_df = spark.read.json("dbfs:/FileStore/results.json" , schema = results_schema )

In [0]:
#Step 2 - Rename columns and add new columns

In [0]:
from pyspark.sql.functions import lit , current_timestamp

In [0]:
results_with_columns_df = results_df.withColumnRenamed("resultId", "result_id") \
                                    .withColumnRenamed("raceId", "race_id") \
                                    .withColumnRenamed("driverId", "driver_id") \
                                    .withColumnRenamed("constructorId", "constructor_id") \
                                    .withColumnRenamed("positionText", "position_text") \
                                    .withColumnRenamed("positionOrder", "position_order") \
                                    .withColumnRenamed("fastestLap", "fastest_lap") \
                                    .withColumnRenamed("fastestLapTime", "fastest_lap_time") \
                                    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed") 
                                    

In [0]:
results_with_ingestion_date_df = results_with_columns_df.withColumn("ingestion_date", current_timestamp())

In [0]:
#Step 3 - Drop the unwanted column

In [0]:
from pyspark.sql.functions import col

In [0]:
results_final_df = results_with_ingestion_date_df.drop(col("statusId"))

In [0]:
results_deduped_df = results_final_df.dropDuplicates(['race_id', 'driver_id'])

In [0]:
results_deduped_df.show(truncate=False)

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+---------+------------+-----------+----+----------------+-----------------+----------------------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|time     |milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_speed|ingestion_date        |
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+---------+------------+-----------+----+----------------+-----------------+----------------------+
|7573     |1      |1        |1             |1     |18  |null    |D            |20            |0.0   |58  |\N       |null        |39         |13  |1:29.020        |214.455          |2024-04-23 04:09:58.81|
|7563     |1      |2        |2             |6     |9   |10      |10           |10            |0.0   |58  |+7.085   |5662869     |48         |5   |1:28.283        |216.245          

In [0]:
#Step 4 - Write to output to processed container in parquet format

In [0]:
results_deduped_df.write.parquet("dbfs:/FileStore/Formula1/processed/f1_processed.result") 

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2747915330769759>:1[0m
[0;32m----> 1[0m [43mresults_deduped_df[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mparquet[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdbfs:/FileStore/Formula1/processed/f1_processed.result[39;49m[38;5;124;43m"[39;49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49m[43mkwargs[49m[43m)[49m
[1;32m     49[0m     logger[38;5