In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Notebook parameter: label for the upstream system that produced the file.
# Declared as a text widget with an empty default; the value is stamped onto
# every ingested row as the `data_source` column further down.
dbutils.widgets.text('p_data_source', '')
v_data_source = dbutils.widgets.get('p_data_source')

In [0]:
# Notebook parameter: the dated sub-folder of the raw layer to ingest.
# Defaults to '2021-03-21'. The value is used both to build the input path
# and as the `file_date` column on every ingested row.
dbutils.widgets.text('p_file_date', '2021-03-21')
v_file_date = dbutils.widgets.get('p_file_date')

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Explicit schema for results.json, so Spark does not have to infer types.
# Each entry is (source column name, Spark type, nullable); the order here is
# the column order of the DataFrame that is read with this schema.
# NOTE(review): Spark file readers may relax nullable=False on read - confirm
# before relying on these flags as data-quality constraints.
results_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ('resultId', IntegerType(), False),
        ('raceId', IntegerType(), False),
        ('driverId', IntegerType(), False),
        ('constructorId', IntegerType(), False),
        ('number', IntegerType(), True),
        ('grid', IntegerType(), False),
        ('position', IntegerType(), True),
        ('positionText', StringType(), False),
        ('positionOrder', IntegerType(), False),
        ('points', FloatType(), False),
        ('laps', IntegerType(), False),
        ('time', StringType(), True),
        ('milliseconds', IntegerType(), True),
        ('fastestLap', IntegerType(), True),
        ('rank', IntegerType(), True),
        ('fastestLapTime', StringType(), True),
        ('fastestLapSpeed', StringType(), True),
        ('statusId', IntegerType(), False),
    )
])

In [0]:
result_df = spark.read \
    .schema(results_schema) \
    .json(f'{raw_folder_path}/{v_file_date}/results.json') \
    .withColumnRenamed('resultId', 'result_id') \
    .withColumnRenamed('raceId', 'race_id') \
    .withColumnRenamed('driverId', 'driver_id') \
    .withColumnRenamed('constructorId', 'constructor_id') \
    .withColumnRenamed('positionText', 'position_text') \
    .withColumnRenamed('positionOrder', 'position_order') \
    .withColumnRenamed('fastestLap', 'fastest_lap') \
    .withColumnRenamed('fastestLapTime', 'fastest_lap_time') \
    .withColumnRenamed('fastestLapSpeed', 'fastest_lap_speed') \
    .drop('statusId') \
    .withColumn('data_source', lit(v_data_source)) \
    .withColumn('file_date', lit(v_file_date)) \
    .withColumn('ingestion_date', current_timestamp())\
    .dropDuplicates(['race_id', 'driver_id'])

In [0]:
# Join predicate handed to the merge helper.
# NOTE(review): assumes merge_delta_data (from ../includes/common_functions)
# aliases the target table as `tgt` and the incoming DataFrame as `tmp` - confirm.
v_merge_condition = (
    'tgt.result_id = tmp.result_id'
    ' AND tgt.race_id = tmp.race_id'
)

# Positional arguments are presumably: input DataFrame, database name, table
# name, base folder, merge condition, partition column - verify against the
# helper's signature.
merge_delta_data(
    result_df,
    'f1_processed',
    'results',
    processed_folder_path,
    v_merge_condition,
    'race_id',
)

In [0]:
# Finish the notebook and hand the string 'Success' back to whatever invoked it
# (e.g. a parent notebook using dbutils.notebook.run).
dbutils.notebook.exit('Success')

In [0]:
%sql
-- Manual sanity check: list every (race_id, driver_id) pair that appears more
-- than once in the processed results table. An empty result means no duplicates.
-- This cell sits after the dbutils.notebook.exit call, so it is meant to be run
-- by hand rather than as part of a full run.
SELECT
  race_id,
  driver_id,
  COUNT(*)
FROM f1_processed.results
GROUP BY race_id, driver_id
HAVING COUNT(*) > 1
ORDER BY race_id, driver_id DESC