In [0]:
from pyspark.sql.functions import (
    col, lit, coalesce, 
    round as spark_round,
    avg as spark_avg,
    when, hour, date_format
)
from datetime import datetime

spark.sql("USE energy_analytics")

DataFrame[]

#Combine Historical + Live Prices

In [0]:


try:
    last_prices_ts = spark.sql("SELECT MAX(datetime_aest) as last_ts FROM silver_prices").collect()[0]['last_ts']
    if last_prices_ts is None:
        last_prices_ts = '2025-01-01 00:00:00'
    print(f"📅 Last price timestamp: {last_prices_ts}")
except:
    last_prices_ts = '2025-01-01 00:00:00'
    print(f"⚠️  silver_prices doesn't exist yet, processing all data")

# Live prices - only NEW records
df_live_prices = spark.table("bronze_aemo_live_prices") \
    .filter(col("settlement_datetime_aest") > last_prices_ts) \
    .select(
        col("region"),
        col("settlement_datetime_aest").alias("datetime_aest"),
        col("settlement_date"),
        col("price"),
        col("raise_reg"),
        col("lower_reg"),
        lit(None).cast("double").alias("total_demand"),
        col("source")
    )

# Historical prices - only NEW records
df_hist_prices = spark.table("bronze_aemo_historical_prices") \
    .filter(col("settlement_datetime_aest") > last_prices_ts) \
    .select(
        col("region"),
        col("settlement_datetime_aest").alias("datetime_aest"),
        col("settlement_date"),
        col("price"),
        lit(None).cast("double").alias("raise_reg"),
        lit(None).cast("double").alias("lower_reg"),
        col("total_demand"),
        col("source")
    )


df_prices_all = df_live_prices.union(df_hist_prices)


df_prices_all = df_prices_all.withColumn("price_tier", 
    when(col("price") < 0, "negative")
    .when(col("price") < 50, "cheap")
    .when(col("price") < 100, "normal")
    .when(col("price") < 300, "expensive")
    .otherwise("spike")
)

print(f"✅ Live new:       {df_live_prices.count()}")
print(f"✅ Historical new: {df_hist_prices.count()}")
print(f"✅ Total new:      {df_prices_all.count()}")

if df_prices_all.count() > 0:
    
    df_prices_all.dropDuplicates(["region", "datetime_aest"]) \
        .write.format("delta") \
        .mode("append") \
        .partitionBy("settlement_date", "region") \
        .saveAsTable("silver_prices")
    print("✅ Appended to silver_prices")
else:
    print("⏭️  No new records to append")

df_prices_all.orderBy("datetime_aest", "region").show(5)

📅 Last price timestamp: 2026-02-20 16:35:52.630667
✅ Live new:       0
✅ Historical new: 0
✅ Total new:      0
⏭️  No new records to append
+------+-------------+---------------+-----+---------+---------+------------+------+----------+
|region|datetime_aest|settlement_date|price|raise_reg|lower_reg|total_demand|source|price_tier|
+------+-------------+---------------+-----+---------+---------+------------+------+----------+
+------+-------------+---------------+-----+---------+---------+------------+------+----------+



In [0]:

# Get last processed timestamp
try:
    last_gen_ts = spark.sql("SELECT MAX(datetime_aest) as last_ts FROM silver_generation").collect()[0]['last_ts']
    if last_gen_ts is None:
        last_gen_ts = '2025-01-01 00:00:00'
    print(f"📅 Last generation timestamp: {last_gen_ts}")
except:
    last_gen_ts = '2025-01-01 00:00:00'
    print(f"⚠️  silver_generation doesn't exist yet, processing all data")


df_live_gen = spark.table("bronze_aemo_generation") \
    .filter(col("settlement_datetime_aest") > last_gen_ts) \
    .select(
        col("region"),
        col("settlement_datetime_aest").alias("datetime_aest"),
        col("settlement_date"),
        col("scheduled_generation"),
        col("semischeduled_generation"),
        col("total_demand"),
        col("net_interchange"),
        lit(None).cast("double").alias("battery_mwh"),
        col("source")
    )


df_hist_gen = spark.table("bronze_aemo_historical_generation") \
    .filter(col("interval_datetime") > last_gen_ts) \
    .select(
        col("region"),
        col("interval_datetime").alias("datetime_aest"),
        col("settlement_date"),
        col("scheduled_generation"),
        col("semischeduled_generation"),
        col("total_demand"),
        lit(None).cast("double").alias("net_interchange"),
        col("battery_mwh"),
        col("source")
    )

# Union
df_gen_all = df_live_gen.union(df_hist_gen)

# Add calculated columns
df_gen_all = df_gen_all \
    .withColumn("total_generation", 
        col("scheduled_generation") + col("semischeduled_generation")
    ) \
    .withColumn("renewable_pct",
        spark_round((col("semischeduled_generation") / col("total_generation")) * 100, 1)
    )

print(f"✅ Live new:       {df_live_gen.count()}")
print(f"✅ Historical new: {df_hist_gen.count()}")
print(f"✅ Total new:      {df_gen_all.count()}")

if df_gen_all.count() > 0:
    df_gen_all.dropDuplicates(["region", "datetime_aest"]) \
        .write.format("delta") \
        .mode("append") \
        .partitionBy("settlement_date", "region") \
        .saveAsTable("silver_generation")
    print("✅ Appended to silver_generation")
else:
    print("⏭️  No new records to append")

df_gen_all.orderBy("datetime_aest", "region").show(5)

📅 Last generation timestamp: 2026-02-20 16:35:52.630667
✅ Live new:       0
✅ Historical new: 0
✅ Total new:      0
⏭️  No new records to append
+------+-------------+---------------+--------------------+------------------------+------------+---------------+-----------+------+----------------+-------------+
|region|datetime_aest|settlement_date|scheduled_generation|semischeduled_generation|total_demand|net_interchange|battery_mwh|source|total_generation|renewable_pct|
+------+-------------+---------------+--------------------+------------------------+------------+---------------+-----------+------+----------------+-------------+
+------+-------------+---------------+--------------------+------------------------+------------+---------------+-----------+------+----------------+-------------+



In [0]:


# Get last processed timestamp
try:
    last_weather_ts = spark.sql("SELECT MAX(datetime_aest) as last_ts FROM silver_weather").collect()[0]['last_ts']
    if last_weather_ts is None:
        last_weather_ts = '2025-01-01 00:00:00'
    print(f"📅 Last weather timestamp: {last_weather_ts}")
except:
    last_weather_ts = '2025-01-01 00:00:00'
    print(f"⚠️  silver_weather doesn't exist yet, processing all data")

# Live weather - only NEW records
df_live_weather = spark.table("bronze_bom_weather") \
    .filter(col("observation_datetime_aest") > last_weather_ts) \
    .select(
        col("observation_datetime_aest").alias("datetime_aest"),
        col("settlement_date"),
        col("station_id"),
        col("station_name"),
        col("air_temp"),
        col("apparent_temp"),
        col("relative_humidity"),
        col("wind_speed_kmh").cast("double"),
        lit(None).cast("double").alias("precipitation"),
        col("source")
    )

# Historical weather - only NEW records
df_hist_weather = spark.table("bronze_historical_weather") \
    .filter(col("observation_datetime_aest") > last_weather_ts) \
    .select(
        col("observation_datetime_aest").alias("datetime_aest"),
        col("settlement_date"),
        col("station_id"),
        col("station_name"),
        col("air_temp"),
        col("apparent_temp"),
        col("relative_humidity"),
        col("wind_speed_kmh"),
        col("precipitation"),
        col("source")
    )


df_weather_all = df_live_weather.union(df_hist_weather)

# Average across stations per timestamp
df_weather_clean = df_weather_all \
    .groupBy("datetime_aest", "settlement_date") \
    .agg(
        spark_avg("air_temp").alias("avg_temp"),
        spark_avg("apparent_temp").alias("avg_apparent_temp"),
        spark_avg("relative_humidity").alias("avg_humidity"),
        spark_avg("wind_speed_kmh").alias("avg_wind_speed"),
        spark_avg("precipitation").alias("total_precipitation")
    ) \
    .withColumn("avg_temp", spark_round(col("avg_temp"), 1)) \
    .withColumn("avg_apparent_temp", spark_round(col("avg_apparent_temp"), 1))

print(f"✅ Live new:       {df_live_weather.count()}")
print(f"✅ Historical new: {df_hist_weather.count()}")
print(f"✅ Total new:      {df_weather_clean.count()}")

if df_weather_clean.count() > 0:
    df_weather_clean.dropDuplicates(["datetime_aest"]) \
        .write.format("delta") \
        .mode("append") \
        .partitionBy("settlement_date") \
        .saveAsTable("silver_weather")
    print("✅ Appended to silver_weather")
else:
    print("⏭️  No new records to append")

df_weather_clean.orderBy("datetime_aest").show(5)

📅 Last weather timestamp: 2026-02-20 23:00:00
✅ Live new:       0
✅ Historical new: 0
✅ Total new:      0
⏭️  No new records to append
+-------------+---------------+--------+-----------------+------------+--------------+-------------------+
|datetime_aest|settlement_date|avg_temp|avg_apparent_temp|avg_humidity|avg_wind_speed|total_precipitation|
+-------------+---------------+--------+-----------------+------------+--------------+-------------------+
+-------------+---------------+--------+-----------------+------------+--------------+-------------------+



#CELL 5: Save Unified Tables & Add Calculated Columns

In [0]:


from pyspark.sql.functions import round as spark_round, when, hour

df_prices_clean = df_prices_all \
    .withColumn("price_tier", 
        when(col("price") < 0, "negative")
        .when(col("price") < 50, "cheap")
        .when(col("price") < 100, "normal")
        .when(col("price") < 300, "expensive")
        .otherwise("spike")
    ) \
    .dropDuplicates(["region", "datetime_aest"]) 

df_prices_clean.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("settlement_date", "region") \
    .saveAsTable("energy_analytics.silver_prices")

print(f"✅ Saved silver_prices: {df_prices_clean.count()} records")


df_gen_clean = df_gen_all \
    .withColumn("total_generation", 
        col("scheduled_generation") + col("semischeduled_generation")
    ) \
    .withColumn("renewable_pct",
        spark_round((col("semischeduled_generation") / col("total_generation")) * 100, 1)
    ) \
    .dropDuplicates(["region", "datetime_aest"])

df_gen_clean.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("settlement_date", "region") \
    .saveAsTable("energy_analytics.silver_generation")

print(f"✅ Saved silver_generation: {df_gen_clean.count()} records")


df_weather_clean = df_weather_all \
    .groupBy("datetime_aest", "settlement_date") \
    .agg(
        spark_avg("air_temp").alias("avg_temp"),
        spark_avg("apparent_temp").alias("avg_apparent_temp"),
        spark_avg("relative_humidity").alias("avg_humidity"),
        spark_avg("wind_speed_kmh").alias("avg_wind_speed"),
        spark_avg("precipitation").alias("total_precipitation")
    ) \
    .withColumn("avg_temp", spark_round(col("avg_temp"), 1)) \
    .withColumn("avg_apparent_temp", spark_round(col("avg_apparent_temp"), 1))

df_weather_clean.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("settlement_date") \
    .saveAsTable("energy_analytics.silver_weather")

print(f"✅ Saved silver_weather: {df_weather_clean.count()} records")


print("\n📊 Sample data:")
df_prices_clean.filter(col("region") == "VIC1").orderBy("datetime_aest").show(3)
df_gen_clean.filter(col("region") == "VIC1").orderBy("datetime_aest").show(3)
df_weather_clean.orderBy("datetime_aest").show(3)

✅ Saved silver_prices: 0 records
✅ Saved silver_generation: 0 records
✅ Saved silver_weather: 0 records

📊 Sample data:
+------+-------------+---------------+-----+---------+---------+------------+------+----------+
|region|datetime_aest|settlement_date|price|raise_reg|lower_reg|total_demand|source|price_tier|
+------+-------------+---------------+-----+---------+---------+------------+------+----------+
+------+-------------+---------------+-----+---------+---------+------------+------+----------+

+------+-------------+---------------+--------------------+------------------------+------------+---------------+-----------+------+----------------+-------------+
|region|datetime_aest|settlement_date|scheduled_generation|semischeduled_generation|total_demand|net_interchange|battery_mwh|source|total_generation|renewable_pct|
+------+-------------+---------------+--------------------+------------------------+------------+---------------+-----------+------+----------------+-------------+
+--

# CELL 6: Create Final Unified Silver Table VIC1 - Melbourne

In [0]:



try:
    last_melb_ts = spark.sql("SELECT MAX(datetime_aest) as last_ts FROM silver_energy_melbourne").collect()[0]['last_ts']
    if last_melb_ts is None:
        last_melb_ts = '2025-01-01 00:00:00'
    print(f"📅 Last Melbourne timestamp: {last_melb_ts}")
except:
    last_melb_ts = '2025-01-01 00:00:00'
    print(f"⚠️  silver_energy_melbourne doesn't exist yet, processing all data")


df_prices_vic = spark.table("silver_prices") \
    .filter(col("region") == "VIC1") \
    .filter(col("datetime_aest") > last_melb_ts) \
    .alias("p")

df_gen_vic = spark.table("silver_generation") \
    .filter(col("region") == "VIC1") \
    .filter(col("datetime_aest") > last_melb_ts) \
    .alias("g")

df_weather = spark.table("silver_weather") \
    .filter(col("datetime_aest") > last_melb_ts) \
    .alias("w")


df_silver_energy = df_prices_vic \
    .join(df_gen_vic, 
          (col("p.datetime_aest") == col("g.datetime_aest")) & 
          (col("p.settlement_date") == col("g.settlement_date")), 
          "left") \
    .join(df_weather, 
          (col("p.datetime_aest") == col("w.datetime_aest")) & 
          (col("p.settlement_date") == col("w.settlement_date")), 
          "left") \
    .select(
        lit("VIC1").alias("region_code"),
        lit("Victoria (Melbourne)").alias("region_name"),
        col("p.datetime_aest"),
        col("p.settlement_date"),
        hour("p.datetime_aest").alias("hour_of_day"),
        col("p.price"),
        col("p.price_tier"),
        coalesce(col("g.total_demand"), col("p.total_demand")).alias("demand_mw"),
        col("g.scheduled_generation"),
        col("g.semischeduled_generation"),
        col("g.total_generation"),
        col("g.renewable_pct"),
        col("g.battery_mwh"),
        col("w.avg_temp"),
        col("w.avg_apparent_temp"),
        col("w.avg_humidity"),
        col("w.avg_wind_speed"),
        col("w.total_precipitation")
    ) \
    .withColumn("is_peak", 
        when((col("hour_of_day").between(7, 9)) | (col("hour_of_day").between(17, 20)), True)
        .otherwise(False)
    )

print(f"✅ New records to add: {df_silver_energy.count()}")

if df_silver_energy.count() > 0:
    df_silver_energy.dropDuplicates(["datetime_aest"]) \
        .write.format("delta") \
        .mode("append") \
        .partitionBy("settlement_date") \
        .saveAsTable("silver_energy_melbourne")
    print("✅ Appended to silver_energy_melbourne")
    df_silver_energy.orderBy("datetime_aest").show(5, truncate=False)
else:
    print("⏭️  No new records to append")

📅 Last Melbourne timestamp: 2026-02-20 16:35:52.630667
✅ New records to add: 0
⏭️  No new records to append
