### ============================================================
## BRONZE → SILVER TRANSFORMATION PIPELINE
### ============================================================
### Purpose:
###   - Read Bronze Delta table
###   - Clean, standardize, type‑cast, impute, validate
###   - Add engineered features
###   - Write Silver Delta table (UC-managed)
### ============================================================


In [0]:
# Authorization
# Registers the storage account access key in the Spark session configuration.
# The key is fetched from a Databricks secret scope at run time rather than
# being pasted into the notebook, so it never lands in source control, exports,
# or revision history. Replace the scope/key placeholders with real names.
# NOTE(review): the `<container>` slot in the config name is normally the
# storage *account* name, not a container name - confirm before running.
spark.conf.set(
    "fs.azure.account.key.<container>.blob.core.windows.net",
    dbutils.secrets.get(scope="<secret scope>", key="<secret key name>"),
)

In [0]:
# Load the Bronze table into a DataFrame; every later cell reassigns `df`.
df = spark.read.table("<custom>.default.project2_bronze")

In [0]:
# STANDARDIZE COLUMN NAMES
# Normalize every column name: strip surrounding whitespace, lowercase it, and
# replace spaces with underscores.
# All columns are renamed positionally in a single projection via toDF(),
# instead of chaining one withColumnRenamed() per column, which keeps the
# query plan small on wide tables and does not leave a loop variable behind
# in the notebook scope.
df = df.toDF(*[name.strip().lower().replace(" ", "_") for name in df.columns])

In [0]:
# Drop rows that are exact duplicates across all columns.
df = df.distinct()

In [0]:
# TYPE CASTING
# Force each known column to its Silver-layer type. Values that cannot be
# converted become NULL and are dealt with by the later cells.
from pyspark.sql.functions import col, to_timestamp

# Measurements are stored as single-precision floats.
for measure in ("temperature", "humidity"):
    df = df.withColumn(measure, col(measure).cast("float"))

# Parse the event time using Spark's default timestamp parsing.
df = df.withColumn("timestamp", to_timestamp("timestamp"))

# Identifiers and labels are kept as plain strings.
for label in ("device_id", "status"):
    df = df.withColumn(label, col(label).cast("string"))


In [0]:
# HANDLE MISSING VALUES (NUMERIC + CATEGORICAL)
from pyspark.sql.functions import avg

# Numeric imputation: replace NULLs with the column mean.
# All means are computed in ONE aggregation pass (a single Spark job) rather
# than one job per column.
num_cols = ["temperature", "humidity"]
means_row = df.agg(*[avg(c).alias(c) for c in num_cols]).first()

# avg() yields NULL when a column has no non-null values (or the table is
# empty). fillna() does not accept None as a replacement, so such columns are
# skipped and keep their NULLs instead of failing the pipeline.
impute_vals = {c: means_row[c] for c in num_cols if means_row[c] is not None}
if impute_vals:
    df = df.fillna(impute_vals)

# Categorical imputation: use the sentinel "unknown" for missing labels.
cat_cols = ["device_id", "status"]
df = df.fillna({c: "unknown" for c in cat_cols})

# Drop rows missing timestamp (key column); this also removes rows whose
# timestamp could not be parsed by the earlier cast.
# `col` comes from the import in the type-casting cell.
df = df.filter(col("timestamp").isNotNull())


In [0]:
# NORMALIZE STATUS
# Canonical form of the status label: surrounding spaces removed, lowercase.
from pyspark.sql.functions import lower, trim

status_clean = lower(trim(col("status")))
df = df.withColumn("status", status_clean)

In [0]:
# FILTER INVALID VALUES
# Keep only rows with non-negative humidity; rows where humidity is NULL are
# dropped too, because the comparison does not evaluate to true for them.
df = df.where(col("humidity") >= 0)

In [0]:
# FEATURE ENGINEERING
# Derive calendar features from the event timestamp.
from pyspark.sql.functions import year, month, dayofweek

# (output column, extractor) pairs, applied in this order.
# dayofweek() numbers days 1 (Sunday) through 7 (Saturday).
date_parts = (
    ("year", year),
    ("month", month),
    ("weekday", dayofweek),
)
for part_name, extract in date_parts:
    df = df.withColumn(part_name, extract("timestamp"))


In [0]:
# WRITE SILVER TABLE (UC-MANAGED DELTA)
# Persist the cleaned DataFrame as a Delta table, replacing any existing
# contents of the target table on each run.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("<custom>.default.project2_silver")
)

In [0]:
%sql
-- ============================================================
-- VALIDATION / INSPECTION
-- Validation only. Not part of pipeline logic.
-- ============================================================
-- Eyeball a sample of the Silver table after a pipeline run.
-- NOTE(review): this reads from catalog `uzi`, while the write cell targets
--   `<custom>.default.project2_silver` - confirm both refer to the same
--   catalog, otherwise this inspects a different (possibly stale) table.
-- NOTE: LIMIT without ORDER BY returns an arbitrary set of 20 rows, so the
--   sample may differ from run to run.

SELECT * FROM uzi.default.project2_silver LIMIT 20;

device_id,timestamp,temperature,humidity,status,hour_of_day,temperature_f,year,month,weekday
dev-01,2026-01-01T23:53:00Z,16.86,45.42,on,,,2026,1,5
dev-02,2026-01-01T15:26:00Z,22.43,48.08,off,,,2026,1,5
dev-02,2026-01-01T20:43:00Z,15.58,75.82,on,,,2026,1,5
dev-03,2026-01-01T17:37:00Z,34.06,53.58,off,,,2026,1,5
dev-03,2026-01-01T08:32:00Z,20.31,76.24,on,,,2026,1,5
dev-01,2026-01-01T09:20:00Z,23.29,50.01,on,,,2026,1,5
dev-01,2026-01-01T20:06:00Z,17.83,63.54,off,,,2026,1,5
dev-03,2026-01-01T17:43:00Z,23.81,29.86,on,,,2026,1,5
dev-03,2026-01-01T23:18:00Z,26.47,77.77,on,,,2026,1,5
dev-02,2026-01-01T09:17:00Z,19.19,30.0,off,,,2026,1,5
