In [0]:
# Authenticate to the ADLS Gen2 account with its access key.
# The key is fetched from a Databricks secret scope at run time instead of being
# pasted into the notebook, so it never lands in source control or exports.
# NOTE(review): the scope/key names below are placeholders - create the secret
# (or adjust the names) before running. The key that was previously committed
# here in plaintext must be treated as compromised and rotated.
spark.conf.set(
    "fs.azure.account.key.weatherdatastorage09.dfs.core.windows.net",
    dbutils.secrets.get(scope="weather-pipeline", key="weatherdatastorage09-account-key"),
)

# Landing folder holding the raw CSV files.
raw_path = "abfss://bronzelayer@weatherdatastorage09.dfs.core.windows.net/weather_raw/"

In [0]:
# Storage coordinates for the weather pipeline.
account   = "weatherdatastorage09"
container = "bronzelayer"
raw_dir   = "weather_raw"

# The account key is read from a Databricks secret scope rather than being
# hard-coded, so it is not exposed in the notebook source or its exports.
# NOTE(review): the scope/key names are placeholders - create the secret (or
# adjust the names) before running. The key that was previously committed here
# in plaintext must be treated as compromised and rotated.
key = dbutils.secrets.get(scope="weather-pipeline", key=f"{account}-account-key")

# Configure the ABFS driver to use shared-key authentication for this account.
spark.conf.set(f"fs.azure.account.auth.type.{account}.dfs.core.windows.net", "SharedKey")
spark.conf.set(f"fs.azure.account.key.{account}.dfs.core.windows.net",  key)

# raw_path keeps its trailing slash because file names are appended to it directly;
# bronze_path has none because sub-paths are joined with an explicit "/".
raw_path    = f"abfss://{container}@{account}.dfs.core.windows.net/{raw_dir}/"
bronze_path = f"abfss://{container}@{account}.dfs.core.windows.net/bronze"


In [0]:
from pyspark.sql.functions import col, current_timestamp

def copy_to_bronze(csv):
    """Ingest one raw CSV file into the bronze location as Delta data.

    The file at ``raw_path + csv`` is read with a header row and schema
    inference, two lineage columns are appended, and the result overwrites
    whatever is stored at ``{bronze_path}/{csv}``.

    Args:
        csv: File name relative to ``raw_path`` (for example ``"humidity.csv"``).
            The same name is used for the target folder under ``bronze_path``.

    Returns:
        None. The function is called for its write side effect.
    """
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    raw_df = reader.csv(raw_path + csv)

    # Lineage columns: the originating file path and the load timestamp.
    tagged_df = (
        raw_df
        .withColumn("source_file", col("_metadata.file_path"))
        .withColumn("ingestion_time", current_timestamp())
    )

    # NOTE(review): column mapping by name is presumably requested because some
    # CSV headers are not valid plain Delta column names - confirm.
    writer = (
        tagged_df.write
        .format("delta")
        .option("delta.columnMapping.mode", "name")
        .mode("overwrite")
    )
    writer.save(f"{bronze_path}/{csv}")

# Raw files to land in bronze; each one becomes its own Delta folder.
csv_files = [
    "city_attributes.csv",
    "humidity.csv",
    "pressure.csv",
    "temperature.csv",
    "weather_description.csv",
    "wind_direction.csv",
    "wind_speed.csv",
]

# Ingest sequentially; every call overwrites the previous copy of that file.
for f in csv_files:
    copy_to_bronze(f)

In [0]:
from pyspark.sql.functions import col, expr
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

# Silver layer: turn the wide bronze tables (one column per city) into long
# (datetime, location, value) rows and combine humidity with temperature.
humidity_df = spark.read.format("delta").load(f"{bronze_path}/humidity.csv")
temperature_df = spark.read.format("delta").load(f"{bronze_path}/temperature.csv")

# Every column other than the timestamp and the two ingestion-lineage columns
# is treated as a city.
metadata_cols = {"datetime", "ingestion_time", "source_file"}
city_cols = [c for c in humidity_df.columns if c not in metadata_cols]
if not city_cols:
    raise ValueError("humidity table has no city columns to unpivot")

# stack(n, 'name1', `name1`, ...) emits one (label, value) row per city column.
# The call is built once and reused so both measurements are unpivoted with
# exactly the same city list (taken from the humidity table).
# NOTE(review): city names are interpolated unescaped; a name containing a
# quote or a backtick would break the generated expression.
stack_args = ", ".join(f"'{c}', `{c}`" for c in city_cols)
stack_call = f"stack({len(city_cols)}, {stack_args})"

humidity_long = humidity_df.selectExpr(
    "datetime", f"{stack_call} as (location, humidity)"
)
temperature_long = temperature_df.selectExpr(
    "datetime", f"{stack_call} as (location, temperature)"
)

# Inner join: only (datetime, location) pairs present in both inputs are kept.
silver_df = humidity_long.join(
    temperature_long, ["datetime", "location"]
)

silver_path = f"abfss://{container}@{account}.dfs.core.windows.net/silverlayer"
silver_df.write.format("delta").mode("overwrite").save(f"{silver_path}/weather_combined")

# NOTE(review): the positional 5 does not appear to cap the preview (the
# recorded output below shows 10 rows) - use .limit(5) if a cap is intended.
silver_df.display(5)

datetime,location,humidity,temperature
2014-07-18T08:00:00Z,Vancouver,82.0,289.41
2014-07-18T09:00:00Z,Vancouver,82.0,288.71
2014-07-18T10:00:00Z,Vancouver,87.0,287.78
2014-07-18T11:00:00Z,Vancouver,87.0,287.52
2014-07-18T12:00:00Z,Vancouver,93.0,286.84
2014-07-18T13:00:00Z,Vancouver,87.0,287.09
2014-07-18T14:00:00Z,Vancouver,87.0,287.75
2014-07-18T15:00:00Z,Vancouver,82.0,289.58
2014-07-18T16:00:00Z,Vancouver,63.0,290.0
2014-07-18T17:00:00Z,Vancouver,63.0,292.67


In [0]:
from pyspark.sql.functions import col

# Gold layer: extend the silver humidity/temperature rows with pressure and
# wind speed so each (datetime, location) carries all four measurements.

# Load Silver data
silver_df = spark.read.format("delta").load(f"{silver_path}/weather_combined")

# Load pressure and wind_speed data (from bronze)
pressure_df = spark.read.format("delta").load(f"{bronze_path}/pressure.csv")
wind_speed_df = spark.read.format("delta").load(f"{bronze_path}/wind_speed.csv")

# City columns are everything except the timestamp and the ingestion-lineage
# columns; the list is taken from the pressure table and reused for wind speed.
metadata_cols = {"datetime", "ingestion_time", "source_file"}
city_cols = [c for c in pressure_df.columns if c not in metadata_cols]
if not city_cols:
    raise ValueError("pressure table has no city columns to unpivot")

# stack(n, 'name1', `name1`, ...) emits one (label, value) row per city column.
# Built once so pressure and wind speed are unpivoted identically.
# NOTE(review): city names are interpolated unescaped; a name containing a
# quote or a backtick would break the generated expression.
stack_args = ", ".join(f"'{c}', `{c}`" for c in city_cols)
stack_call = f"stack({len(city_cols)}, {stack_args})"

# Unpivot pressure
pressure_long = pressure_df.selectExpr(
    "datetime", f"{stack_call} as (location, pressure)"
)

# Unpivot wind_speed
wind_speed_long = wind_speed_df.selectExpr(
    "datetime", f"{stack_call} as (location, wind_speed)"
)

# Join all into Gold (inner joins: a row survives only if every source has it)
gold_df = (
    silver_df.alias("s")
    .join(pressure_long.alias("p"), ["datetime", "location"])
    .join(wind_speed_long.alias("w"), ["datetime", "location"])
    .select(
        "s.datetime", "s.location", "temperature", "humidity",
        "p.pressure", "w.wind_speed"
    )
)

# Save Gold Layer
gold_path = f"abfss://{container}@{account}.dfs.core.windows.net/goldlayer"
gold_df.write.format("delta").mode("overwrite").save(f"{gold_path}/weather_features")

# NOTE(review): the positional 5 does not appear to cap the preview (the
# recorded output below shows 10 rows) - use .limit(5) if a cap is intended.
gold_df.display(5)


datetime,location,temperature,humidity,pressure,wind_speed
2014-07-18T08:00:00Z,Vancouver,289.41,82.0,1016.0,3.0
2014-07-18T09:00:00Z,Vancouver,288.71,82.0,1016.0,1.0
2014-07-18T10:00:00Z,Vancouver,287.78,87.0,1016.0,3.0
2014-07-18T11:00:00Z,Vancouver,287.52,87.0,1016.0,2.0
2014-07-18T12:00:00Z,Vancouver,286.84,93.0,1017.0,2.0
2014-07-18T13:00:00Z,Vancouver,287.09,87.0,1017.0,3.0
2014-07-18T14:00:00Z,Vancouver,287.75,87.0,1017.0,2.0
2014-07-18T15:00:00Z,Vancouver,289.58,82.0,1018.0,2.0
2014-07-18T16:00:00Z,Vancouver,290.0,63.0,1018.0,3.0
2014-07-18T17:00:00Z,Vancouver,292.67,63.0,1018.0,3.0
