In [0]:
from pyspark.sql.functions import coalesce, lit, max as spark_max
import pandas as pd
from pyspark.sql.functions import col

In [0]:
%sql
-- Gold-layer fact table of hourly weather records.
-- Created only when it does not already exist, so re-running this cell never alters an existing table.
-- The Python cells of this notebook append rows built from the per-city bronze hourly tables,
-- joined to the weather-code and location dimension tables.
CREATE TABLE IF NOT EXISTS hcmut.gold.fact_vn_weather_hourly (
    -- Surrogate primary key. GENERATED ALWAYS AS IDENTITY: the table assigns the value, writers do not supply it.
    cd_fact_vn_weather_hourly BIGINT PRIMARY KEY 
    GENERATED ALWAYS AS IDENTITY,
    nr_weather_code FLOAT,
    -- Filled from the `ds_description` column, which the load cell renames to this name.
    ds_weather_description STRING,
    -- presumably supplied by hcmut.gold.dim_location via the left join in the load cell -- TODO confirm
    ds_location STRING,
    -- Key of the originating bronze row; the load cell renames the bronze table's key column to this name.
    cd_bronze_table BIGINT,
    -- Timestamp of the record; the append cell uses it to select only rows newer than those already stored.
    dt_date_record TIMESTAMP,
    nr_temperature_2m FLOAT,
    nr_dew_point_2m FLOAT,
    nr_relative_humidity_2m FLOAT,
    nr_snow_depth FLOAT,
    nr_snowfall FLOAT,
    nr_rain FLOAT,
    nr_precipitation FLOAT,
    nr_apparent_temperature FLOAT,
    nr_pressure_msl FLOAT,
    nr_surface_pressure FLOAT,
    nr_cloud_cover FLOAT,
    nr_cloud_cover_mid FLOAT,
    nr_cloud_cover_low FLOAT,
    nr_cloud_cover_high FLOAT,
    nr_et0_fao_evapotranspiration FLOAT,
    nr_vapour_pressure_deficit FLOAT,
    nr_wind_speed_10m FLOAT,
    nr_wind_direction_10m FLOAT,
    nr_wind_speed_100m FLOAT,
    nr_wind_direction_100m FLOAT,
    nr_wind_gusts_10m FLOAT,
    nr_soil_temperature_0_to_7cm FLOAT,
    nr_soil_temperature_28_to_100cm FLOAT,
    nr_soil_temperature_7_to_28cm FLOAT,
    nr_soil_temperature_100_to_255cm FLOAT,
    nr_soil_moisture_0_to_7cm FLOAT,
    nr_soil_moisture_7_to_28cm FLOAT,
    nr_soil_moisture_28_to_100cm FLOAT,
    nr_soil_moisture_100_to_255cm FLOAT,
    nr_is_day FLOAT,
    nr_sunshine_duration FLOAT,
    -- Location identifier set by the load cell: 1, 2 and 3 for the HCM, Da Nang and Hanoi bronze tables.
    cd_location_key INT,
    nr_latitude DOUBLE,
    nr_longitude DOUBLE,
    ds_timezone STRING,
    dt_time_to_bronze TIMESTAMP,
    -- Load timestamp; defaults to CURRENT_TIMESTAMP when no value is supplied.
    -- NOTE(review): confirm the DataFrame append used by the load cell actually triggers this default.
    dt_time_to_gold TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
-- Enables the Delta column-defaults table feature that the DEFAULT clause on dt_time_to_gold relies on.
TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported')

In [0]:
def _load_bronze_hourly(table_name: str, location_key: int, key_candidates):
    """Read one bronze hourly table and normalise it for the cross-city union.

    Args:
        table_name: Fully qualified name of the bronze table to read.
        location_key: Literal value stored in the added ``cd_location_key`` column.
        key_candidates: Possible names of the bronze table's own key column,
            in priority order. The first one present is renamed to
            ``cd_bronze_table``.

    Returns:
        The bronze DataFrame with ``cd_location_key`` added and, when one of
        ``key_candidates`` exists, that column renamed to ``cd_bronze_table``.
    """
    bronze_df = spark.table(table_name).withColumn("cd_location_key", lit(location_key))

    # withColumnRenamed silently does nothing when the column is missing, so
    # look the key up explicitly instead of trusting a single hard-coded name.
    for key_column in key_candidates:
        if key_column in bronze_df.columns:
            return bronze_df.withColumnRenamed(key_column, "cd_bronze_table")

    # Keep the frame usable (same outcome as a no-op rename) but make the
    # problem visible: cd_bronze_table will be NULL for this city after the union.
    print(
        f"WARNING: none of {list(key_candidates)} found in {table_name}; "
        "cd_bronze_table will be NULL for these rows"
    )
    return bronze_df


bronze_hcm_hourly_df = _load_bronze_hourly(
    "hcmut.bronze.vn_hcm_weather_data_hourly",
    1,
    ["cd_vn_hcm_weather_data_hourly"],
)

# The Da Nang and Hanoi keys were previously looked up only under a `*_daily`
# name, which does not match the hourly naming used for HCM. Both spellings are
# accepted (previous name first) so the key is captured whichever one the
# bronze table really uses.
bronze_danang_hourly_df = _load_bronze_hourly(
    "hcmut.bronze.vn_danang_weather_data_hourly",
    2,
    ["cd_vn_danang_weather_data_daily", "cd_vn_danang_weather_data_hourly"],
)

bronze_hanoi_hourly_df = _load_bronze_hourly(
    "hcmut.bronze.vn_hanoi_weather_data_hourly",
    3,
    ["cd_vn_hanoi_weather_data_daily", "cd_vn_hanoi_weather_data_hourly"],
)

# Stack the three cities; columns missing from one side are filled with NULL.
bronze_union_df = bronze_hcm_hourly_df.unionByName(
    bronze_danang_hourly_df, allowMissingColumns=True
).unionByName(
    bronze_hanoi_hourly_df, allowMissingColumns=True
)

dim_weather_desc_df = spark.table("hcmut.gold.dim_weather_code_description")
dim_location_df = spark.table("hcmut.gold.dim_location")

# Left joins keep every bronze row even when a dimension has no matching entry.
joined_df = bronze_union_df.join(
    dim_weather_desc_df,
    ["nr_weather_code"],
    "left"
).join(
    dim_location_df,
    bronze_union_df["cd_location_key"] == dim_location_df["cd_dim_location_description"],
    "left"
)

# One row per (timestamp, location, weather code); then drop the dimension's
# join key and give the description column its fact-table name.
deduped_df = (
    joined_df.dropDuplicates(["dt_date_record", "cd_location_key", "nr_weather_code"])
    .drop("cd_dim_location_description")
    .withColumnRenamed("ds_description", "ds_weather_description")
)


display(deduped_df)

In [0]:
# Incremental load: append only the rows that are newer than what the fact
# table already holds for the same location.
FACT_TABLE = "hcmut.gold.fact_vn_weather_hourly"

# Fallback watermark for locations with no rows in the fact table yet.
# Cast explicitly so the comparison type does not depend on how the session
# coerces a timestamp against an untyped string literal.
default_watermark = lit("2020-01-01").cast("timestamp")

# One watermark per location rather than a single global maximum: with a
# global maximum, a city whose bronze table lags behind another would have its
# missing hours filtered out permanently.
fact_watermarks_df = (
    spark.table(FACT_TABLE)
    .groupBy("cd_location_key")
    .agg(spark_max(col("dt_date_record")).alias("dt_location_watermark"))
)

# A new name keeps `deduped_df` intact, so this cell can be re-run safely.
# The final select restores the original column order and drops the helper
# watermark column.
new_records_df = (
    deduped_df.join(fact_watermarks_df, ["cd_location_key"], "left")
    .filter(
        col("dt_date_record")
        > coalesce(col("dt_location_watermark"), default_watermark)
    )
    .select(*deduped_df.columns)
)

print("Number of records to be inserted: ", new_records_df.count())


new_records_df.write.format("delta").mode("append").saveAsTable(FACT_TABLE)