## ***Installations***

In [None]:
!pip install pyspark b2sdk xgboost



## ***Imports***

In [None]:
import pandas as pd

from sklearn.preprocessing import LabelEncoder,StandardScaler

from b2sdk.v2 import InMemoryAccountInfo, B2Api

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_timestamp, hour, dayofweek, when, col,lag, avg, percent_rank, stddev, log1p, concat, lit )
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import max as spark_max

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

from functools import reduce



## ***Precprocessing***

### Spark Session

In [None]:
spark = SparkSession.builder \
    .appName("SmartEnergyOptimization") \
    .getOrCreate()

spark


### Connection with B2 Bucket

In [None]:
info = InMemoryAccountInfo()
b2_api = B2Api(info)

b2_api.authorize_account(
    "production",
    application_key_id="b0d2c1c0c4fa",
    application_key="006bd1352274cc6e314e349e52f8c69136b2a6e2d2"
)

bucket = b2_api.get_bucket_by_name("smartenergydataenergyanalyticslakeenergyproject")

downloaded_file = bucket.download_file_by_name(
    "raw/generated_energy_data.csv"
)

downloaded_file.save_to("energy_data.csv")


### Loading data

In [None]:
df = spark.read.csv(
    "energy_data.csv",
    header=True,
    inferSchema=True
)

df.show(5)


+--------------------+-------------------+-------+-----------+---------+-----------+------------+--------------+------+-----------+-------+-----------+----------+
|            event_id|          timestamp|room_id|  room_name|occupancy|temperature|appliance_id|appliance_name|status|power_watts|voltage|current_amp|is_anomaly|
+--------------------+-------------------+-------+-----------+---------+-----------+------------+--------------+------+-----------+-------+-----------+----------+
|4165458f-4f64-4f7...|2025-12-07 21:21:22|   R001|Living Room|        2|      28.92|        A001|         Light|    on|      10.47|    210|       0.05|     false|
|1ac30cb4-b918-407...|2025-12-07 21:21:22|   R001|Living Room|        2|      28.92|        A002|           Fan|    on|      38.68|    212|      0.182|     false|
|58278ecd-9420-454...|2025-12-07 21:21:22|   R001|Living Room|        2|      28.92|        A003|            TV|   off|        0.0|    180|        0.0|      true|
|ac73db8b-1d77-4b4...|

### Data Analysis

In [None]:
df.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- room_id: string (nullable = true)
 |-- room_name: string (nullable = true)
 |-- occupancy: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- appliance_id: string (nullable = true)
 |-- appliance_name: string (nullable = true)
 |-- status: string (nullable = true)
 |-- power_watts: double (nullable = true)
 |-- voltage: integer (nullable = true)
 |-- current_amp: double (nullable = true)
 |-- is_anomaly: boolean (nullable = true)



### Feature engineering

Time + Categorical Features

In [None]:
df = df.withColumn("timestamp", to_timestamp("timestamp")) \
       .withColumn("hour", hour("timestamp")) \
       .withColumn("day_of_week", dayofweek("timestamp")) \
       .withColumn(
           "is_weekend",
           when(col("day_of_week").isin([1, 7]), 1).otherwise(0)
       )\
       .withColumn(
    "status_encoded",
    when(col("status") == "on", 1).otherwise(0)
)


Appliance-Level Window

In [None]:
appliance_window = (
    Window
    .partitionBy("room_id", "appliance_id")
    .orderBy("timestamp")
)

# Lag features
df = (
    df.withColumn("lag_1", lag("power_watts", 1).over(appliance_window))
      .withColumn("lag_3", lag("power_watts", 3).over(appliance_window))
      .withColumn("lag_24", lag("power_watts", 24).over(appliance_window))
)

# Rolling averages (past only)
rolling_3  = appliance_window.rowsBetween(-3, -1)
rolling_24 = appliance_window.rowsBetween(-24, -1)

df = (
    df.withColumn("rolling_mean_3", avg("power_watts").over(rolling_3))
      .withColumn("rolling_mean_24", avg("power_watts").over(rolling_24))
)

NULL Handling

In [None]:
df = df.fillna({
    "lag_1": 0,
    "lag_3": 0,
    "lag_24": 0,
    "rolling_mean_3": 0,
    "rolling_mean_24": 0
})



In [None]:
df.schema

StructType([StructField('event_id', StringType(), True), StructField('timestamp', TimestampType(), True), StructField('room_id', StringType(), True), StructField('room_name', StringType(), True), StructField('occupancy', IntegerType(), True), StructField('temperature', DoubleType(), True), StructField('appliance_id', StringType(), True), StructField('appliance_name', StringType(), True), StructField('status', StringType(), True), StructField('power_watts', DoubleType(), True), StructField('voltage', IntegerType(), True), StructField('current_amp', DoubleType(), True), StructField('is_anomaly', BooleanType(), True), StructField('hour', IntegerType(), True), StructField('day_of_week', IntegerType(), True), StructField('is_weekend', IntegerType(), False), StructField('status_encoded', IntegerType(), False), StructField('lag_1', DoubleType(), False), StructField('lag_3', DoubleType(), False), StructField('lag_24', DoubleType(), False), StructField('rolling_mean_3', DoubleType(), False), St

In [None]:
df.show()

+--------------------+-------------------+-------+-----------+---------+-----------+------------+--------------+------+-----------+-------+-----------+----------+----+-----------+----------+--------------+-----+-----+------+------------------+------------------+
|            event_id|          timestamp|room_id|  room_name|occupancy|temperature|appliance_id|appliance_name|status|power_watts|voltage|current_amp|is_anomaly|hour|day_of_week|is_weekend|status_encoded|lag_1|lag_3|lag_24|    rolling_mean_3|   rolling_mean_24|
+--------------------+-------------------+-------+-----------+---------+-----------+------------+--------------+------+-----------+-------+-----------+----------+----+-----------+----------+--------------+-----+-----+------+------------------+------------------+
|4165458f-4f64-4f7...|2025-12-07 21:21:22|   R001|Living Room|        2|      28.92|        A001|         Light|    on|      10.47|    210|       0.05|     false|  21|          1|         1|             1|  0.0|

Vector Assembler (APPLIANCE LEVEL)

In [None]:
feature_cols = [
    "hour", "day_of_week", "is_weekend",
    "occupancy", "temperature", "status_encoded",
    "voltage", "current_amp",
    "lag_1", "lag_3", "lag_24",
    "rolling_mean_3", "rolling_mean_24"
]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

df = assembler.transform(df)


## ***ML Modeling***

Time-Aware Train/Test Split

In [None]:
rank_window = (
    Window
    .partitionBy("room_id", "appliance_id")
    .orderBy("timestamp")
)

df = df.withColumn("time_rank", percent_rank().over(rank_window))

train_df = df.filter(col("time_rank") <= 0.8)
test_df  = df.filter(col("time_rank") > 0.8)

train_df = train_df.drop("time_rank")
test_df  = test_df.drop("time_rank")


NameError: name 'Window' is not defined

Train Appliance-Level Model

In [None]:
gbt = GBTRegressor(
    labelCol="power_watts",
    featuresCol="features",
    maxDepth=5,
    maxIter=50
)

model = gbt.fit(train_df)


Predict (Appliance Level)

In [None]:
predictions = model.transform(test_df)

predictions.select(
    "timestamp", "room_id", "room_name",
    "appliance_id", "appliance_name",
    "power_watts", "prediction"
).show()


In [None]:
predictions.orderBy(col("timestamp").desc()).show(10)


In [None]:
predictions.filter(col("appliance_id") == "A004") \
           .orderBy(col("timestamp").desc()) \
           .show(10)


## ***Evaluate the model***

In [None]:
evaluator = RegressionEvaluator(
    labelCol="power_watts",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(f"R Squared (R2) on test data = {r2}")

evaluator.setMetricName("mae")
mae = evaluator.evaluate(predictions)
print(f"Mean Absolute Error (MAE) on test data = {mae}")


Root Mean Squared Error (RMSE) on test data = 40.918696843123676
R Squared (R2) on test data = 0.9742056329244109
Mean Absolute Error (MAE) on test data = 8.657503216979373


## ***ROOM-LEVEL PIPELINE***

In [None]:
base_df = df.select(
    "event_id", "timestamp", "room_id", "room_name",
    "occupancy", "temperature",
    "appliance_id", "appliance_name",
    "status", "power_watts",
    "voltage", "current_amp"
)


Room Aggregation

In [None]:
room_df = (
    base_df
    .groupBy("room_id", "room_name", "timestamp")
    .agg(
        spark_sum("power_watts").alias("room_power_watts"),
        avg("temperature").alias("temperature"),
        avg("occupancy").alias("occupancy")
    )
)


Room Time Features

In [None]:
room_df = (
    room_df.withColumn("hour", hour("timestamp"))
           .withColumn("day_of_week", dayofweek("timestamp"))
           .withColumn(
               "is_weekend",
               when(col("day_of_week").isin([1, 7]), 1).otherwise(0)
           )
)

Room Lag Features

In [None]:
room_window = Window.partitionBy("room_id").orderBy("timestamp")

room_df = (
    room_df
    .withColumn("lag_1", lag("room_power_watts", 1).over(room_window))
    .withColumn("lag_3", lag("room_power_watts", 3).over(room_window))
)
rolling_3 = room_window.rowsBetween(-3, -1)

room_df = room_df.withColumn(
    "rolling_mean_3",
    avg("room_power_watts").over(rolling_3)
)



In [None]:
room_list = room_df.select("room_id", "room_name").distinct().toLocalIterator()


In [None]:
room_df = room_df.fillna({
    "lag_1": 0,
    "lag_3": 0,
    "rolling_mean_3": 0,
    "hour": 0,
    "day_of_week": 0,
    "is_weekend": 0,
    "temperature": 0,
    "occupancy": 0
})


Cache reused DataFrames

In [None]:
room_df = room_df.cache()
room_df.count()


1800

In [None]:
room_df = room_df.withColumn(
    "room_power_watts_log",
    log1p("room_power_watts")
)

Room VectorAssembler

In [None]:
room_features = [
    "hour", "day_of_week", "is_weekend",
    "temperature", "occupancy",
    "lag_1", "lag_3", "rolling_mean_3"
]

assembler = VectorAssembler(
    inputCols=room_features,
    outputCol="features"
)

room_df = assembler.transform(room_df)


In [None]:
train_df.schema

StructType([StructField('event_id', StringType(), True), StructField('timestamp', TimestampType(), True), StructField('room_id', StringType(), True), StructField('room_name', StringType(), True), StructField('occupancy', IntegerType(), True), StructField('temperature', DoubleType(), True), StructField('appliance_id', StringType(), True), StructField('appliance_name', StringType(), True), StructField('status', StringType(), True), StructField('power_watts', DoubleType(), True), StructField('voltage', IntegerType(), True), StructField('current_amp', DoubleType(), True), StructField('is_anomaly', BooleanType(), True), StructField('hour', IntegerType(), True), StructField('day_of_week', IntegerType(), True), StructField('is_weekend', IntegerType(), False), StructField('status_encoded', IntegerType(), False), StructField('lag_1', DoubleType(), False), StructField('lag_3', DoubleType(), False), StructField('lag_24', DoubleType(), False), StructField('rolling_mean_3', DoubleType(), False), St

Room Model Training

In [None]:
results = []

for row in room_list:
    room_id = row["room_id"]
    room_name = row["room_name"]

    print(f"\nTraining model for room: {room_name}")

    # Filter room data
    rdf = room_df.filter(col("room_id") == room_id)

    # Time-aware split
    time_window = Window.orderBy("timestamp")
    rdf = rdf.withColumn("time_rank", percent_rank().over(time_window))


    train_df = rdf.filter(col("time_rank") <= 0.8)
    test_df  = rdf.filter(col("time_rank") > 0.8)

    train_df = train_df.drop("time_rank")
    test_df  = test_df.drop("time_rank")

    # Model
    gbt = GBTRegressor(
        labelCol="room_power_watts_log",
        featuresCol="features",
        maxDepth=4,
        maxIter=80
    )


    model = gbt.fit(train_df)

    # Prediction
    preds = model.transform(test_df)

    # Evaluation
    rmse = RegressionEvaluator(
        labelCol="room_power_watts",
        predictionCol="prediction",
        metricName="rmse"
    ).evaluate(preds)

    r2 = RegressionEvaluator(
        labelCol="room_power_watts",
        predictionCol="prediction",
        metricName="r2"
    ).evaluate(preds)

    mae = RegressionEvaluator(
        labelCol="room_power_watts",
        predictionCol="prediction",
        metricName="mae"
    ).evaluate(preds)

    results.append({
        "room_id": room_id,
        "room_name": room_name,
        "RMSE": rmse,
        "R2": r2,
        "MAE": mae
    })

    print(f"RMSE={rmse:.2f}, R2={r2:.3f}, MAE={mae:.2f}")


Training model for room: Bedroom 2
RMSE=596.25, R2=-0.334, MAE=300.34

Training model for room: Bedroom 1
RMSE=556.91, R2=-0.314, MAE=275.21

Training model for room: Living Room
RMSE=780.32, R2=-0.729, MAE=508.36

Training model for room: Study Room
RMSE=89.93, R2=-0.296, MAE=46.15

Training model for room: Kitchen
RMSE=72.63, R2=-0.947, MAE=52.23

Training model for room: Bathroom
RMSE=65.71, R2=-0.930, MAE=47.82


Store results as Spark DataFrame

In [None]:
results_df = spark.createDataFrame(results)
results_df.coalesce(1).write.mode("overwrite").csv("room_model_results")



## Alerts And KPIs

In [None]:
room_df = (
    room_df
    .withColumn("rolling_mean_24", avg("room_power_watts").over(room_window))
    .withColumn("rolling_std_24", stddev("room_power_watts").over(room_window))
)

room_df = room_df.withColumn(
    "is_spike",
    when(
        room_df.room_power_watts >
        room_df.rolling_mean_24 + 3 * room_df.rolling_std_24,
        1
    ).otherwise(0)
)

In [None]:
room_df = room_df.withColumn(
    "inefficient_usage",
    when(
        (room_df.occupancy == 0) &
        (room_df.room_power_watts > room_df.rolling_mean_24),
        1
    ).otherwise(0)
)

In [None]:
room_df = room_df.withColumn(
    "alert_level",
    when((room_df.is_spike == 1) & (room_df.inefficient_usage == 1), "CRITICAL")
    .when(room_df.is_spike == 1, "WARNING")
    .otherwise("NORMAL")
)

In [None]:
room_df = room_df.withColumn(
    "alert_message",
    when(
        (col("alert_level") == "CRITICAL"),
        concat(
            lit("CRITICAL: High power spike detected in "),
            col("room_name"),
            lit(" at "),
            col("timestamp"),
            lit(". Power usage: "),
            col("room_power_watts").cast("string"),
            lit(" W with no occupancy.")
        )
    ).when(
        (col("alert_level") == "WARNING"),
        concat(
            lit("WARNING: Elevated power usage in "),
            col("room_name"),
            lit(" at "),
            col("timestamp"),
            lit(". Power usage: "),
            col("room_power_watts").cast("string"),
            lit(" W.")
        )
    ).otherwise(
        lit("NORMAL: Energy usage within expected range.")
    )
)

In [None]:
room_df.show()

+-------+----------+-------------------+----------------+-----------+---------+----+-----------+----------+-----+-----+--------------+--------------------+--------------------+------------------+------------------+--------+-----------------+-----------+--------------------+
|room_id| room_name|          timestamp|room_power_watts|temperature|occupancy|hour|day_of_week|is_weekend|lag_1|lag_3|rolling_mean_3|room_power_watts_log|            features|   rolling_mean_24|    rolling_std_24|is_spike|inefficient_usage|alert_level|       alert_message|
+-------+----------+-------------------+----------------+-----------+---------+----+-----------+----------+-----+-----+--------------+--------------------+--------------------+------------------+------------------+--------+-----------------+-----------+--------------------+
|   R005|Study Room|2025-12-07 21:21:22|             0.0|      26.66|      0.0|  21|          1|         1|  0.0|  0.0|           0.0|                 0.0|(8,[0,1,2,3],[21....

### *KPIs*

In [None]:
energy_kpi = (
    room_df
    .withColumn("energy_kwh", col("room_power_watts") / 1000)
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("energy_kwh").alias("total_energy_kwh")
    )
)

In [None]:
power_kpi = (
    room_df
    .groupBy("room_id", "room_name")
    .agg(
        avg("room_power_watts").alias("avg_power_watts"),
        spark_max("room_power_watts").alias("peak_power_watts")
    )
)

In [None]:
spike_kpi = (
    room_df
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("is_spike").alias("spike_count")
    )
)


In [None]:
from pyspark.sql.functions import count

alert_kpi = (
    room_df
    .groupBy("room_id", "room_name", "alert_level")
    .agg(
        count("*").alias("alert_count")
    )
)


In [None]:
inefficiency_kpi = (
    room_df
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("inefficient_usage").alias("inefficient_events")
    )
)


In [None]:
waste_kpi = (
    room_df
    .withColumn(
        "wasted_energy_kwh",
        when(col("inefficient_usage") == 1, col("room_power_watts") / 1000)
        .otherwise(0)
    )
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("wasted_energy_kwh").alias("wasted_energy_kwh")
    )
)


In [None]:
COST_PER_KWH = 6.0
cost_kpi = (
    room_df
    .withColumn("energy_kwh", col("room_power_watts") / 1000)
    .withColumn("cost", col("energy_kwh") * COST_PER_KWH)
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("cost").alias("total_cost_rs")
    )
)


In [None]:
waste_cost_kpi = (
    room_df
    .withColumn(
        "waste_cost",
        when(col("inefficient_usage") == 1,
             (col("room_power_watts") / 1000) * COST_PER_KWH)
        .otherwise(0)
    )
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("waste_cost").alias("wasted_cost_rs")
    )
)


In [None]:
kpi_dfs = [
    energy_kpi,
    power_kpi,
    spike_kpi,
    inefficiency_kpi,
    waste_kpi,
    cost_kpi,
    waste_cost_kpi
]

final_kpi_df = reduce(
    lambda left, right: left.join(right, ["room_id", "room_name"], "left"),
    kpi_dfs
)

final_kpi_df.show(truncate=False)

+-------+-----------+------------------+------------------+------------------+-----------+------------------+-----------------+------------------+------------------+
|room_id|room_name  |total_energy_kwh  |avg_power_watts   |peak_power_watts  |spike_count|inefficient_events|wasted_energy_kwh|total_cost_rs     |wasted_cost_rs    |
+-------+-----------+------------------+------------------+------------------+-----------+------------------+-----------------+------------------+------------------+
|R005   |Study Room |15.298780000000004|50.99593333333332 |248.77            |5          |2                 |0.03127          |91.79268000000002 |0.18762           |
|R001   |Living Room|175.05166999999992|583.5055666666666 |2244.47           |0          |45                |51.78943000000002|1050.3100200000003|310.73657999999995|
|R004   |Kitchen    |16.396879999999992|54.65626666666667 |192.29000000000002|0          |0                 |0.0              |98.38127999999993 |0.0               |
|R00

In [None]:
latest_time = room_df.select(spark_max("timestamp")).collect()[0][0]

future_input_df = room_df.filter(col("timestamp") == latest_time)

future_preds = model.transform(future_input_df)

future_preds.select(
    "room_id",
    "room_name",
    "timestamp",
    col("prediction").alias("predicted_power_watts")
)

DataFrame[room_id: string, room_name: string, timestamp: timestamp, predicted_power_watts: double]

In [None]:
expected_df = room_df.select(
    "room_id",
    "room_name",
    "timestamp",
    col("room_power_watts").alias("expected_power_watts")
)


In [None]:
energy_df = expected_df.withColumn(
    "energy_kwh",
    col("expected_power_watts") / 1000
)

In [None]:
weekly_expected = (
    energy_df
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("energy_kwh").alias("weekly_expected_kwh")
    )
)


In [None]:
monthly_expected = (
    energy_df
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("energy_kwh").alias("monthly_expected_kwh")
    )
)


In [None]:
actual_energy_df = room_df.withColumn(
    "actual_energy_kwh",
    col("room_power_watts") / 1000
)


In [None]:
weekly_actual = (
    actual_energy_df
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("actual_energy_kwh").alias("weekly_actual_kwh")
    )
)


In [None]:
monthly_actual = (
    actual_energy_df
    .groupBy("room_id", "room_name")
    .agg(
        spark_sum("actual_energy_kwh").alias("monthly_actual_kwh")
    )
)


In [218]:
weekly_compare = (
    weekly_expected
    .join(weekly_actual, ["room_id", "room_name"])
    .withColumn(
        "weekly_deviation_kwh",
        col("weekly_actual_kwh") - col("weekly_expected_kwh")
    )
)


In [217]:
alerts_high_power = predictions.withColumn(
    "high_power_alert",
    when(
        col("prediction") > col("rolling_mean_24") * 1.5,
        1
    ).otherwise(0)
)


In [219]:
alerts_high_power.filter(col("high_power_alert") == 1) \
    .select("timestamp", "room_name", "appliance_name",
            "prediction", "rolling_mean_24") \
    .show()


+-------------------+-----------+--------------+------------------+------------------+
|          timestamp|  room_name|appliance_name|        prediction|   rolling_mean_24|
+-------------------+-----------+--------------+------------------+------------------+
|2025-12-12 21:21:22|Living Room|         Light| 32.70536656076007|11.910000000000002|
|2025-12-12 21:51:22|Living Room|         Light|20.759550239942968|13.367500000000001|
|2025-12-12 22:21:22|Living Room|         Light|  39.9296505516821|14.250416666666668|
|2025-12-12 22:51:22|Living Room|         Light| 32.38083600953069|15.870833333333335|
|2025-12-13 01:21:22|Living Room|         Light|  32.7296052312386|15.896666666666668|
|2025-12-13 03:21:22|Living Room|         Light|25.483339293249713|14.576250000000002|
|2025-12-13 07:21:22|Living Room|         Light|20.583493343661797|13.578333333333333|
|2025-12-13 10:51:22|Living Room|         Light| 21.04445573822339| 8.553749999999999|
|2025-12-13 11:21:22|Living Room|         L

In [220]:
waste_alerts = df.withColumn(
    "energy_waste_alert",
    when(
        (col("occupancy") == 0) & (col("power_watts") > 5),
        1
    ).otherwise(0)
)


In [221]:
waste_alerts.filter(col("energy_waste_alert") == 1) \
    .select("timestamp", "room_name", "appliance_name", "power_watts") \
    .show()


+-------------------+-----------+--------------+-----------+
|          timestamp|  room_name|appliance_name|power_watts|
+-------------------+-----------+--------------+-----------+
|2025-12-07 21:21:22|  Bedroom 1|           Fan|      64.99|
|2025-12-07 21:51:22|  Bedroom 1|           Fan|       64.7|
|2025-12-08 00:51:22|Living Room|           Fan|      36.29|
|2025-12-08 01:51:22|Living Room|           Fan|       37.7|
|2025-12-08 03:51:22| Study Room|         Light|      12.21|
|2025-12-08 04:21:22|Living Room|         Light|      14.92|
|2025-12-08 04:21:22|Living Room|           Fan|      44.26|
|2025-12-08 06:21:22|Living Room|           Fan|      51.88|
|2025-12-08 06:21:22|    Kitchen|         Light|       13.4|
|2025-12-08 06:51:22|Living Room|           Fan|      57.81|
|2025-12-08 06:51:22|Living Room|            AC|    1000.32|
|2025-12-08 07:21:22|Living Room|         Light|       5.52|
|2025-12-08 07:21:22|Living Room|           Fan|      47.36|
|2025-12-08 07:21:22|Liv

In [222]:
inefficiency_alerts = df.withColumn(
    "inefficiency_alert",
    when(
        col("rolling_mean_24") > col("rolling_mean_3") * 1.2,
        1
    ).otherwise(0)
)


In [224]:
final_alerts = predictions.select(
    "timestamp",
    "room_id",
    "room_name",
    "appliance_id",
    "appliance_name",
    "power_watts",
    "prediction",
    "rolling_mean_24",
    "is_anomaly"
).withColumn(
    "alert_type",
    when(col("prediction") > col("rolling_mean_24") * 1.5, "High Consumption")
    .when(col("is_anomaly") == True, "Anomaly Detected")
    .otherwise("Normal")
)


In [225]:
appliance_live_df = predictions.select(
    "timestamp",
    "room_id",
    "room_name",
    "appliance_id",
    "appliance_name",
    col("power_watts").alias("actual_power_watts"),
    col("prediction").alias("predicted_power_watts"),
).withColumn(
    "deviation_watts",
    col("predicted_power_watts") - col("actual_power_watts")
)

appliance_live_df.cache()
appliance_live_df.show(5)


+-------------------+-------+---------+------------+--------------+------------------+---------------------+-------------------+
|          timestamp|room_id|room_name|appliance_id|appliance_name|actual_power_watts|predicted_power_watts|    deviation_watts|
+-------------------+-------+---------+------------+--------------+------------------+---------------------+-------------------+
|2025-12-12 21:21:22|   R002|Bedroom 1|        A004|            AC|               0.0|  0.49877097066158954|0.49877097066158954|
|2025-12-12 21:51:22|   R002|Bedroom 1|        A004|            AC|               0.0|   1.4133260678900128| 1.4133260678900128|
|2025-12-12 22:21:22|   R002|Bedroom 1|        A004|            AC|               0.0|  -0.9736681023791852|-0.9736681023791852|
|2025-12-12 22:51:22|   R002|Bedroom 1|        A004|            AC|            971.48|    967.4466339550208| -4.033366044979175|
|2025-12-12 23:21:22|   R002|Bedroom 1|        A004|            AC|               0.0|   1.822233

In [226]:
room_live_df = room_df.select(
    "timestamp",
    "room_id",
    "room_name",
    "room_power_watts",
    "rolling_mean_24",
    "is_spike",
    "inefficient_usage",
    "alert_level",
    "alert_message"
)

room_live_df.cache()
room_live_df.show(5)


+-------------------+-------+----------+----------------+---------------+--------+-----------------+-----------+--------------------+
|          timestamp|room_id| room_name|room_power_watts|rolling_mean_24|is_spike|inefficient_usage|alert_level|       alert_message|
+-------------------+-------+----------+----------------+---------------+--------+-----------------+-----------+--------------------+
|2025-12-07 21:21:22|   R005|Study Room|             0.0|            0.0|       0|                0|     NORMAL|NORMAL: Energy us...|
|2025-12-07 21:51:22|   R005|Study Room|             0.0|            0.0|       0|                0|     NORMAL|NORMAL: Energy us...|
|2025-12-07 22:21:22|   R005|Study Room|             0.0|            0.0|       0|                0|     NORMAL|NORMAL: Energy us...|
|2025-12-07 22:51:22|   R005|Study Room|             0.0|            0.0|       0|                0|     NORMAL|NORMAL: Energy us...|
|2025-12-07 23:21:22|   R005|Study Room|             0.0|     

In [227]:
dashboard_kpi_df = final_kpi_df.select(
    "room_id",
    "room_name",
    "total_energy_kwh",
    "avg_power_watts",
    "peak_power_watts",
    "spike_count",
    "inefficient_events",
    "wasted_energy_kwh",
    col("total_cost_rs").alias("total_cost"),
    col("wasted_cost_rs").alias("wasted_cost")
)

dashboard_kpi_df.cache()
dashboard_kpi_df.show(truncate=False)


+-------+-----------+------------------+------------------+------------------+-----------+------------------+-----------------+------------------+------------------+
|room_id|room_name  |total_energy_kwh  |avg_power_watts   |peak_power_watts  |spike_count|inefficient_events|wasted_energy_kwh|total_cost        |wasted_cost       |
+-------+-----------+------------------+------------------+------------------+-----------+------------------+-----------------+------------------+------------------+
|R005   |Study Room |15.298780000000004|50.99593333333332 |248.77            |5          |2                 |0.03127          |91.79268000000002 |0.18762           |
|R001   |Living Room|175.05166999999992|583.5055666666666 |2244.47           |0          |45                |51.78943000000002|1050.3100200000003|310.73657999999995|
|R004   |Kitchen    |16.396879999999992|54.65626666666667 |192.29000000000002|0          |0                 |0.0              |98.38127999999993 |0.0               |
|R00

In [228]:
# Appliance alerts
appliance_alerts_df = final_alerts.select(
    "timestamp",
    lit("APPLIANCE").alias("entity_type"),
    concat(col("room_name"), lit(" - "), col("appliance_name")).alias("entity_name"),
    col("alert_type").alias("alert_level"),
    concat(
        lit("Predicted: "),
        col("prediction").cast("string"),
        lit(" W")
    ).alias("alert_message")
)

# Room alerts
room_alerts_df = room_live_df.filter(col("alert_level") != "NORMAL").select(
    "timestamp",
    lit("ROOM").alias("entity_type"),
    col("room_name").alias("entity_name"),
    "alert_level",
    "alert_message"
)

alert_feed_df = appliance_alerts_df.unionByName(room_alerts_df)

alert_feed_df.cache()
alert_feed_df.orderBy(col("timestamp").desc()).show(10, truncate=False)


+-------------------+-----------+---------------------+----------------+----------------------------------+
|timestamp          |entity_type|entity_name          |alert_level     |alert_message                     |
+-------------------+-----------+---------------------+----------------+----------------------------------+
|2025-12-14 02:51:22|APPLIANCE  |Bedroom 1 - AC       |Normal          |Predicted: 0.19352132700116814 W  |
|2025-12-14 02:51:22|APPLIANCE  |Kitchen - Light      |Normal          |Predicted: 0.13279285328437956 W  |
|2025-12-14 02:51:22|APPLIANCE  |Bedroom 2 - Light    |Normal          |Predicted: 19.999506856657657 W   |
|2025-12-14 02:51:22|APPLIANCE  |Living Room - Fan    |Normal          |Predicted: -0.11962509668853281 W |
|2025-12-14 02:51:22|APPLIANCE  |Study Room - Computer|Normal          |Predicted: -0.031038822860365144 W|
|2025-12-14 02:51:22|APPLIANCE  |Study Room - Light   |Normal          |Predicted: -0.3056537714265493 W  |
|2025-12-14 02:51:22|APPLIAN

Latest Snapshot DF

For dashboard refresh without history.

In [229]:
latest_ts = room_live_df.select(spark_max("timestamp")).collect()[0][0]

dashboard_snapshot_df = (
    room_live_df
    .filter(col("timestamp") == latest_ts)
    .select(
        "room_id",
        "room_name",
        "room_power_watts",
        "alert_level"
    )
)

dashboard_snapshot_df.show()


+-------+-----------+----------------+-----------+
|room_id|  room_name|room_power_watts|alert_level|
+-------+-----------+----------------+-----------+
|   R005| Study Room|             0.0|     NORMAL|
|   R001|Living Room|           28.34|     NORMAL|
|   R004|    Kitchen|             0.0|     NORMAL|
|   R003|  Bedroom 2|           20.07|     NORMAL|
|   R002|  Bedroom 1|           15.35|     NORMAL|
|   R006|   Bathroom|           80.07|     NORMAL|
+-------+-----------+----------------+-----------+



In [None]:
appliance_live_df.write.mode("overwrite").parquet("output/appliance_live")
room_live_df.write.mode("overwrite").parquet("output/room_live")
dashboard_kpi_df.write.mode("overwrite").parquet("output/kpi_summary")
alert_feed_df.write.mode("overwrite").parquet("output/alert_feed")


In [231]:
model.write().overwrite().save("models/appliance_gbt_model")


In [230]:
model.write().overwrite().save(f"models/room_gbt_model_{room_id}")
