In [0]:
%run "../commons/common_functions"


In [0]:
%run "../commons/configuration"

In [0]:
# Notebook parameter "env_name" (default "DEV"); its value is later written
# into the data_source column of the results DataFrame.
dbutils.widgets.text("env_name", "DEV")
p_env_name = dbutils.widgets.get("env_name")

In [0]:
# Notebook parameter "file_date" (default "2021-03-28"); it selects the raw
# sub-folder to read from and is also stamped on every row as file_date.
dbutils.widgets.text("file_date", "2021-03-28")
p_file_date = dbutils.widgets.get("file_date")

In [0]:
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import to_timestamp, concat, lit, col
from pyspark.sql.types import *


# Explicit schema for results.json. Every field is nullable; field names are the
# JSON keys as they appear in the raw file (camelCase), renamed after loading.
# NOTE(review): the key "lap" is singular here — confirm it matches the key in the
# input file; a name that is absent from the JSON would load as an all-NULL column.
result_schema = (
    StructType()
    .add("resultId", IntegerType(), True)
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("constructorId", IntegerType(), True)
    .add("number", IntegerType(), True)
    .add("grid", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("positionOrder", IntegerType(), True)
    .add("points", FloatType(), True)
    .add("lap", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
    .add("fastestLap", IntegerType(), True)
    .add("rank", IntegerType(), True)
    .add("fastestLapTime", StringType(), True)
    .add("fastestLapSpeed", FloatType(), True)
    .add("statusId", StringType(), True)  # read but dropped right after loading
)

# Load the results file for the requested file_date with the explicit schema,
# rename the camelCase JSON keys to snake_case, drop statusId, and tag each row
# with the environment name and the file date it was loaded from.
# (raw_mount_path is not defined in this notebook; presumably it is provided by
# the ../commons/configuration notebook run above — verify.)
results_df = (
    spark.read.json(
        f"{raw_mount_path}/{p_file_date}/results.json",
        schema=result_schema,
    )
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .drop("statusId")
    .withColumn("data_source", lit(p_env_name))
    .withColumn("file_date", lit(p_file_date))
)


In [0]:
# Make the load re-runnable: before appending, drop the partition of every race_id
# present in the incoming data so a repeated run replaces those races instead of
# duplicating them.
# The table-existence check is loop-invariant, so it is evaluated once up front;
# on the very first load (table not created yet) this also avoids running the
# distinct()/collect() Spark job only to throw its result away.
if spark.catalog.tableExists("processed.results"):
    for race_row in results_df.select("race_id").distinct().collect():
        spark.sql(
            f"ALTER TABLE processed.results DROP IF EXISTS PARTITION (race_id = {race_row.race_id})"
        )

In [0]:
# add_ingestion_date is not defined in this notebook; presumably it comes from
# ../commons/common_functions (run above) and, judging by its name, adds an
# ingestion-date column to the DataFrame — verify against that notebook.
results_df_v2 = add_ingestion_date(results_df)

In [0]:
# Append the prepared rows to the processed.results table, stored as parquet and
# partitioned by race_id. Append (rather than overwrite) is used because the
# partitions for the incoming race_ids are dropped earlier in this notebook.
(
    results_df_v2.write
    .mode("append")
    .format("parquet")
    .partitionBy("race_id")
    .saveAsTable("processed.results")
)

In [0]:
%sql
-- Sanity check after the load: number of rows per race_id, highest race_id first.
SELECT race_id, count(1)
FROM processed.results
GROUP BY race_id
ORDER BY race_id DESC

In [0]:
# End the notebook run and hand the string "Success" back as its exit value
# (what a calling notebook/job receives from this run).
dbutils.notebook.exit("Success")