In [0]:
%run "../../config/config"

In [0]:
# Notebook parameters (blank by default; set them in the widget bar or as job parameters).
dbutils.widgets.text("p_file_date", "")
v_file_date = dbutils.widgets.get("p_file_date")
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

print(v_file_date)
print(v_data_source)

# The raw results path is built as f"{raw_folder_path}/{v_file_date}/results.json"; a blank
# value would quietly point the read at the raw folder root, so fail fast with a clear message.
if not v_file_date.strip():
    raise ValueError("Widget 'p_file_date' is empty; set it to a raw load folder such as 2021-03-28")

In [0]:
# Expose this run's raw results file as the temp view `results_race` for ad-hoc %sql queries.
(
    spark.read
    .json(f"{raw_folder_path}/{v_file_date}/results.json")
    .createOrReplaceTempView("results_race")
)

In [0]:
%sql
-- Row counts for the five highest raceids in the current results file.
SELECT raceid, count(1)
FROM results_race
GROUP BY raceid
ORDER BY raceid DESC
LIMIT 5

In [0]:
# Preview the raceIds contained in later incremental load folders. Each file is registered
# as its own temp view (so it can also be queried with %sql) and summarised per raceid.
INCREMENTAL_PREVIEW_FILES = {
    "results_race_w2": "2021-03-28",
    "results_race_w3": "2021-04-18",
}


def preview_race_counts(view_name: str, file_date: str, limit: int = 5) -> None:
    """Register `<raw_folder_path>/<file_date>/results.json` as temp view `view_name` and
    display the row count for each of the `limit` highest raceids it contains."""
    spark.read.json(f"{raw_folder_path}/{file_date}/results.json").createOrReplaceTempView(view_name)
    display(spark.sql(
        f"select raceid, count(1) from {view_name} group by raceid order by raceid desc limit {limit}"
    ))


for preview_view, preview_date in INCREMENTAL_PREVIEW_FILES.items():
    preview_race_counts(preview_view, preview_date)

In [0]:
# Make the silver catalog/schema the default so unqualified table names resolve there.
for use_stmt in (
    "USE CATALOG dev_silver_formula1_catalog",
    "USE SCHEMA processed_formula1_schema",
):
    spark.sql(use_stmt)

# Alternatively, as a single statement:
# spark.sql("USE dev_silver_formula1_catalog.processed_formula1_schema")



In [0]:
# Sanity check: show which catalog/schema unqualified table names currently resolve against.
display(
    spark.sql("select current_catalog() , current_schema() , current_database()")
)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Explicit schema for results.json: the reader skips inference and these field names must
# match the JSON keys. Every field is nullable.
results_schema = StructType([
    StructField("resultId", IntegerType(), True),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("grid", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("positionText", StringType(), True),
    StructField("positionOrder", IntegerType(), True),
    # Points can be fractional (e.g. half points), which an IntegerType column cannot hold;
    # in the default PERMISSIVE mode such values end up null.
    # NOTE(review): a Delta table already created with integer points must be dropped or
    # have its schema overwritten before loading with this schema.
    StructField("points", DoubleType(), True),
    StructField("laps", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
    StructField("fastestLap", IntegerType(), True),
    StructField("rank", IntegerType(), True),
    StructField("fastestLapTime", StringType(), True),
    StructField("fastestLapSpeed", DoubleType(), True),
    StructField("statusId", IntegerType(), True)
])

In [0]:
from pyspark.sql.functions import col
from delta.tables import DeltaTable

# Unqualified name: resolves against the current catalog/schema selected with USE.
TARGET_TABLE = "f1_processed_race"

# Read this run's results file with the explicit schema.
df = spark.read.json(f"{raw_folder_path}/{v_file_date}/results.json", schema=results_schema)

# raceId is not a row key: a race has many result rows. Deduplicating on raceId would keep a
# single row per race, and a merge on raceId would then overwrite every row of that race
# with it. resultId appears to be the per-row key.
source_df = df.dropDuplicates(["resultId"])

if not spark.catalog.tableExists(TARGET_TABLE):
    # Create the partitioned Delta table only when it is missing; overwriting it on every run
    # would discard previously loaded races and leave nothing for the merge to reconcile.
    source_df.write.mode("overwrite").partitionBy("raceId").format("delta").saveAsTable(TARGET_TABLE)
    display(f"Created {TARGET_TABLE} with {source_df.count()} records")
else:
    delta_target = DeltaTable.forName(spark, TARGET_TABLE)

    # For debugging: how many source rows will update vs. insert (costs two extra scans).
    # left_semi counts each source row once even if the target holds duplicate keys.
    target_keys = spark.table(TARGET_TABLE).select("resultId", "raceId")
    matched_df = source_df.join(target_keys, on=["resultId", "raceId"], how="left_semi")
    not_matched_df = source_df.join(target_keys, on=["resultId", "raceId"], how="left_anti")

    num_matched = matched_df.count()
    num_not_matched = not_matched_df.count()

    display(f"Matched records (updates) : {num_matched}")
    display(f"Unmatched records (inserts) : {num_not_matched}")

    # Upsert on the row key; raceId (the partition column) is included so Delta can prune
    # the target scan to the races present in this file.
    (
        delta_target.alias("tgt")
        .merge(
            source_df.alias("src"),
            "tgt.resultId = src.resultId AND tgt.raceId = src.raceId",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Reset helper: uncomment and run to drop the processed table (e.g. to rebuild it from scratch).
-- DROP TABLE dev_silver_formula1_catalog.processed_formula1_schema.f1_processed_race;
 

In [0]:
# Per-race row counts for the 110 highest raceids in the processed Delta table.
display(
    spark.sql(
        "select raceid, count(1) from f1_processed_race group by raceid order by raceid desc limit 110"
    )
)

# **Incremental load for Parquet files**

In [0]:
# With the default ("static") partition overwrite mode, an overwrite replaces the whole output,
# not just the partitions present in the DataFrame. Dynamic mode replaces only the raceId
# partitions contained in this run's data. It is set as a writer option rather than via
# spark.conf so it does not leak into other writes made in this session (e.g. Delta table
# overwrites in this notebook when it is re-run).
#
# Write to a subdirectory so the path does not overlap Unity Catalog managed storage;
# otherwise an error is raised.
output_path = f"{processed_folder_path}/raw_output"
(
    df.write.format("parquet")
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("raceId")
    .save(output_path)
)

In [0]:
# Rows per raceId in the Parquet output, highest raceId first.
display(
    spark.read.parquet(output_path)
    .groupBy("raceId")
    .count()
    .orderBy("raceId", ascending=False)
)

In [0]:
 %sql
 describe extended f1_processed_race