circuits, races, constructors, drivers -> data from all races ->Autoloader + merge into

results, pitstops, laptimes, qualifying -> data from single/that race -> use autoloader 



In [0]:
batch_files = ['circuits', 'races', 'constructors', 'drivers']
for file in batch_files:
    spark.sql(f'CREATE TABLE IF NOT EXISTS keshcatalog.bronze.{file}')        

In [0]:
incremental_files=['results', 'qualifying', 'pit_stops', 'lap_times']
for file in incremental_files:
    spark.sql(f'CREATE TABLE IF NOT EXISTS keshcatalog.bronze.{file}')     

In [0]:
base_dir = 'abfss://raw@keshstorage09.dfs.core.windows.net/incremental'    

CIRCUITS

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from pyspark.sql.functions import *
circuits_schema = StructType([
    StructField('circuitId', IntegerType(), True),
    StructField('circuitRef', StringType(), True),
    StructField('name', StringType(), True),
    StructField('location', StringType(), True),
    StructField('country', StringType(), True),
    StructField('lat', FloatType(), True),
    StructField('lng', FloatType(), True),
    StructField('alt', IntegerType(), True),
    StructField('url', StringType(), True)
])

circuits_df = (
    spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'csv')
    .option('header', 'true')
    .schema(circuits_schema)
    .load(f'{base_dir}/*/circuits/')
)
circuits_df = circuits_df.withColumn('ingestion_timestamp',current_timestamp() )

In [0]:
from delta.tables import DeltaTable

def mergeinto(microBatchDF, batchId):
    microBatchDF = microBatchDF.dropDuplicates(["circuitId"])
    target = DeltaTable.forName(spark, "keshcatalog.bronze.circuits")
    (target.alias("t")
        .merge(
            microBatchDF.alias("s"),
            "t.circuitId = s.circuitId"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

query=(circuits_df.writeStream
   .foreachBatch(mergeinto)
   .option("checkpointLocation", "abfss://raw@keshstorage09.dfs.core.windows.net/checkpoints/circuits")
   .option("mergeSchema", "true")
   .trigger(once=True)
   .start()
)   
query.awaitTermination()   

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7fed8d990390>

RACES

In [0]:
%sql
CREATE TABLE IF NOT EXISTS keshcatalog.bronze.races_ (
  raceId INT,
  year INT,
  round INT,
  circuitId INT,
  name STRING,
  date DATE,
  time STRING,
  url STRING
) USING delta;


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
races_schema = StructType([
    StructField('raceId', IntegerType(), True),
    StructField('year', IntegerType(), True),
    StructField('round', IntegerType(), True),
    StructField('circuitId', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('date', DateType(), True),
    StructField('time', StringType(), True),
    StructField('url', StringType(), True)
])

races_df = (
    spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'csv')
    .option('header', 'true')
    .schema(races_schema)
    .load(f'{base_dir}/*/races/')
)
races_df = races_df.withColumn('ingestion_timestamp',current_timestamp() )

In [0]:
from delta.tables import DeltaTable

def mergeinto(microBatchDF, batchId):
    microBatchDF = microBatchDF.dropDuplicates(["raceId"])
    target = DeltaTable.forName(spark, "keshcatalog.bronze.races_")

    for col in microBatchDF.columns:
        if col.lower() in [c.lower() for c in target.toDF().columns]:
            microBatchDF = microBatchDF.withColumnRenamed(col, col.lower())
    
    (target.alias("t")
        .merge(
            microBatchDF.alias("s"),
            "t.raceid = s.raceid"   
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


query=(
    races_df.writeStream
        .foreachBatch(mergeinto)
        .option("checkpointLocation", "abfss://raw@keshstorage09.dfs.core.windows.net/checkpoints/races")
        .trigger(once=True)
        .start()
)
query.awaitTermination()   

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7fedb53b22d0>

CONSTRUCTORS

In [0]:
constructors_schema = StructType([
    StructField('constructorId', IntegerType(), True),
    StructField('constructorRef', StringType(), True),
    StructField('name', StringType(), True),
    StructField('nationality', StringType(), True),
    StructField('url', StringType(), True)
])

constructors_df = (
    spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format','json')
    .option('header', 'true')
    .schema(constructors_schema)
    .load(f'{base_dir}/*/constructors')
)
constructors_df = constructors_df.withColumn('ingestion_timestamp',current_timestamp() )

In [0]:
def mergeinto(microBatchDF, batchId):
    microBatchDF=microBatchDF.dropDuplicates(["constructorId"])
    target = DeltaTable.forName(spark, "keshcatalog.bronze.constructors")
    (target.alias("t")
        .merge(
            microBatchDF.alias("s"),
            "t.constructorId = s.constructorId"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


query=(constructors_df.writeStream
.foreachBatch(mergeinto)
.option('checkpointLocation','abfss://raw@keshstorage09.dfs.core.windows.net/checkpoints/constructors')
.option("mergeSchema", "true")
.trigger(once=True)
.start()
)
query.awaitTermination()   

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f6687cd7a90>

DRIVERS

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType


name_schema = StructType([
    StructField("forename", StringType(), True),
    StructField("surname", StringType(), True)
])

driver_schema = StructType([
    StructField("driverId", IntegerType(), True),
    StructField("driverRef", StringType(), True),
    StructField("number", StringType(), True),
    StructField("code", StringType(), True),
    StructField("name", name_schema, True),   
    StructField("dob", DateType(), True),
    StructField("nationality", StringType(), True),
    StructField("url", StringType(), True)
])


drivers_df_nested = (spark.readStream
    .format("cloudFiles") 
    .option('cloudFiles.format','json')
    .option('header','true')
    .schema(driver_schema) 
    .load(f"{base_dir}/*/drivers"))


drivers_df = drivers_df_nested.select(
    "driverId",
    "driverRef",
    "number",
    "code",
    "name.forename",
    "name.surname",
    "dob",
    "nationality",
    "url"
)
drivers_df = drivers_df.withColumn('ingestion_timestamp',current_timestamp() )


In [0]:
def mergeinto(microBatchDF, batchId):
    microBatchDF=microBatchDF.dropDuplicates(["driverId"])
    target = DeltaTable.forName(spark, "keshcatalog.bronze.drivers")  
    (target.alias("t")
        .merge(
            microBatchDF.alias("s") ,
            "t.driverId = s.driverId"
        )        
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


query=(drivers_df.writeStream
.foreachBatch(mergeinto)
.option('checkpointLocation','abfss://raw@keshstorage09.dfs.core.windows.net/checkpoints/drivers')
.option("mergeSchema", "true")
.trigger(once=True)
.start()
)
query.awaitTermination()  

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f66b4fe9310>

RESULTS DATASET

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType


date = '2021-03-21'
base_dir = 'abfss://raw@keshstorage09.dfs.core.windows.net/incremental'


results_schema = StructType([
    StructField("resultId", IntegerType(), True),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("grid", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("positionText", StringType(), True),
    StructField("positionOrder", IntegerType(), True),
    StructField("points", FloatType(), True),
    StructField("laps", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
    StructField("fastestLap", IntegerType(), True),
    StructField("rank", IntegerType(), True),
    StructField("fastestLapTime", StringType(), True),
    StructField("fastestLapSpeed", FloatType(), True),
    StructField("statusId", IntegerType(), True)
])
results_df = spark.readStream.format('cloudFiles').option('cloudFiles.format', 'json').option('header','true').schema(results_schema).load(f'{base_dir}/*/results/')
results_df = results_df.withColumn('ingestion_timestamp',current_timestamp() )


In [0]:
query=(results_df.writeStream \
    .format("delta") \
    .option(
        "checkpointLocation",
        "abfss://raw@keshstorage09.dfs.core.windows.net/checkpoints/results/"
    ) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .trigger(once=True) \
    .table("keshcatalog.bronze.results") )
query.awaitTermination()  

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f66b3a37c10>

LAP TIMES DATASET

In [0]:
laps_times_schema = StructType([
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("lap", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True)
]) 
laps_times_df = spark.readStream.format('cloudFiles').option('cloudFiles.format','csv').schema(laps_times_schema).load(f'{base_dir}/*/lap_times/')
laps_times_df = laps_times_df.withColumn('ingestion_timestamp',current_timestamp() )

In [0]:
query=laps_times_df.writeStream \
    .format("delta") \
    .option(
        "checkpointLocation",
        "abfss://raw@keshstorage09.dfs.core.windows.net/checkpoints/lap_times/"
    ) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .trigger(once=True) \
    .table("keshcatalog.bronze.lap_times")
query.awaitTermination() 

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f66865eac90>

PIT STOPS DATASET

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

pit_stops_schema = StructType([
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("stop", IntegerType(), True),
    StructField("lap", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("duration", FloatType(), True),
    StructField("milliseconds", IntegerType(), True)
])

pit_stops_df = spark.readStream.format('cloudFiles').option('cloudFiles.format', 'json').option('header','true').option('multiline','true').schema(pit_stops_schema).load(f'{base_dir}/*/pit_stops/')
pit_stops_df = pit_stops_df.withColumn('ingestion_timestamp',current_timestamp() )

In [0]:
query=pit_stops_df.writeStream \
    .format("delta") \
    .option(
        "checkpointLocation",
        "abfss://raw@keshstorage09.dfs.core.windows.net/checkpoints/pit_stops/"
    ) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .trigger(once=True) \
    .table("keshcatalog.bronze.pit_stops")
query.awaitTermination() 

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f6686403ad0>

QUALIFYING DATASET

In [0]:

qualifying_schema = StructType([
    StructField("qualifyId", IntegerType(), True),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("q1", StringType(), True),
    StructField("q2", StringType(), True),
    StructField("q3", StringType(), True)
])
qualifying_df = spark.readStream.format('cloudFiles').option('cloudFiles.format', 'json').option('header','true').option('multiline','true').schema(qualifying_schema).load(f'{base_dir}/*/qualifying')
qualifying_df = qualifying_df.withColumn('ingestion_timestamp',current_timestamp() )

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-5341728883069491>, line 1[0m
[0;32m----> 1[0m qualifying_schema [38;5;241m=[39m StructType([
[1;32m      2[0m     StructField([38;5;124m"[39m[38;5;124mqualifyId[39m[38;5;124m"[39m, IntegerType(), [38;5;28;01mTrue[39;00m),
[1;32m      3[0m     StructField([38;5;124m"[39m[38;5;124mraceId[39m[38;5;124m"[39m, IntegerType(), [38;5;28;01mTrue[39;00m),
[1;32m      4[0m     StructField([38;5;124m"[39m[38;5;124mdriverId[39m[38;5;124m"[39m, IntegerType(), [38;5;28;01mTrue[39;00m),
[1;32m      5[0m     StructField([38;5;124m"[39m[38;5;124mconstructorId[39m[38;5;124m"[39m, IntegerType(), [38;5;28;01mTrue[39;00m),
[1;32m      6[0m     StructField([38;5;124m"[39m[38;5;124mnumber[39m[38;5;124m"[39m, IntegerType(), [38;5;28;01mTrue[39;00m),
[1;32m      7

In [0]:
query=qualifying_df.writeStream \
    .format("delta") \
    .option(
        "checkpointLocation",
        "abfss://raw@keshstorage09.dfs.core.windows.net/checkpoints/qualifying/"
    ) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .trigger(once=True) \
    .table("keshcatalog.bronze.qualifying")
query.awaitTermination() 

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f6687cba3d0>