In [0]:
import pyspark.sql.functions as F
from src.transformations import ingest_raw_to_bronze, raw_to_silver, calculate_extreme_weather, save_to_delta

In [0]:
# Root location of the raw KNMI input files; passed to ingest_raw_to_bronze
# in the pipeline cell below.
raw_path: str = "dbfs:/tmp/knmi_data/data/"

In [0]:
%sql
-- Full reset before a rebuild: drop every table the pipeline cell writes
-- (bronze -> silver -> both gold tables). WARNING: destructive.
-- IF EXISTS makes each statement a no-op when the table is absent, so the
-- drops are independent and safe on a fresh catalog.

-- Bronze / silver layers
DROP TABLE IF EXISTS kevinheimbach.default.knmi_bronze;
DROP TABLE IF EXISTS kevinheimbach.default.knmi_silver;

-- Gold layer
DROP TABLE IF EXISTS kevinheimbach.default.knmi_heatwaves_gold;
DROP TABLE IF EXISTS kevinheimbach.default.knmi_coldwaves_gold;

In [0]:
# Medallion pipeline: raw files -> bronze -> silver -> gold (heat/cold waves).
# Each layer is written to Delta and then read back from its table, so the
# next layer starts from the persisted data rather than the previous step's
# in-memory plan.
BRONZE_TABLE = "kevinheimbach.default.knmi_bronze"
SILVER_TABLE = "kevinheimbach.default.knmi_silver"
HEATWAVES_TABLE = "kevinheimbach.default.knmi_heatwaves_gold"
COLDWAVES_TABLE = "kevinheimbach.default.knmi_coldwaves_gold"

# --- BRONZE ---
bronze_df = ingest_raw_to_bronze(spark, raw_path)
save_to_delta(bronze_df, BRONZE_TABLE)

# --- SILVER ---
bronze_df = spark.read.table(BRONZE_TABLE)
silver_df = raw_to_silver(bronze_df)
save_to_delta(silver_df, SILVER_TABLE)

# --- GOLD ---
silver_df = spark.read.table(SILVER_TABLE)

# silver_df feeds two gold computations, so it is cached for reuse.
# try/finally guarantees the cache is released even if a computation or a
# write fails; previously a failure skipped unpersist() and left the data
# cached for the rest of the session.
silver_df.cache()
try:
    # NOTE(review): the thresholds appear to follow the KNMI heatwave
    # definition (base days max_temp >= 25.0, intense days max_temp >= 30.0).
    # Duration rules live in calculate_extreme_weather; confirm there.
    heatwaves_df = calculate_extreme_weather(
        silver_df,
        base_threshold=25.0,
        base_col="max_temp",
        intensity_threshold=30.0,
        intensity_col="max_temp",
        operator="ge",
    )

    # NOTE(review): the thresholds appear to follow the KNMI coldwave
    # definition (base days max_temp < 0.0, intense days min_temp < -10.0).
    # Confirm the duration rules in calculate_extreme_weather.
    coldwaves_df = calculate_extreme_weather(
        silver_df,
        base_threshold=0.0,
        base_col="max_temp",
        intensity_threshold=-10.0,
        intensity_col="min_temp",
        operator="lt",
    )

    save_to_delta(heatwaves_df, HEATWAVES_TABLE)
    save_to_delta(coldwaves_df, COLDWAVES_TABLE)
finally:
    silver_df.unpersist()

In [0]:
# Ad-hoc inspection: daily max/min temperature for a single location, computed
# directly from the bronze table. `F` comes from the imports cell at the top
# of the notebook.
INSPECT_LOCATION = "260_T_a"  # NOTE(review): presumably KNMI station 260 — confirm

# Optional plausibility filter on the 10-minute extremes. It replaces the
# former commented-out filters. Set to a (low, high) tuple, e.g. (-20, 40), to
# keep only rows where both tx_dryb_10 and tn_dryb_10 fall inside the range.
# NOTE(review): bounds assume the columns are in degrees Celsius — confirm.
TEMP_RANGE = None

bronze_location_df = (
    spark.read.table("kevinheimbach.default.knmi_bronze")
    .filter(F.col("location") == INSPECT_LOCATION)
)
if TEMP_RANGE is not None:
    bronze_location_df = bronze_location_df.filter(
        F.col("tx_dryb_10").between(*TEMP_RANGE)
        & F.col("tn_dryb_10").between(*TEMP_RANGE)
    )

# NOTE(review): if dtg is a timestamp, to_date uses the Spark session time
# zone to decide the day; confirm this matches raw_to_silver's definition.
daily_extremes_df = (
    bronze_location_df
    .withColumn("date", F.to_date(F.col("dtg")))
    # groupBy + agg already yields (location, date, max_temp, min_temp).
    .groupBy("location", "date")
    .agg(
        F.max("tx_dryb_10").alias("max_temp"),
        F.min("tn_dryb_10").alias("min_temp"),
    )
    # groupBy output order is not guaranteed; sort so the table/chart reads
    # chronologically.
    .orderBy("date")
)

display(daily_extremes_df)

Databricks visualization. Run in Databricks to view.