In [1]:
# 1. Install PySpark
!pip install pyspark findspark -q

# 2. Initialize Spark Session with MEMORY LIMITS
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, array
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType, ArrayType

# Create a local Spark Session with explicit memory caps to prevent OOM
spark = SparkSession.builder \
    .appName("IoT_SmartHouse_Streaming") \
    .master("local[2]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

print("Spark Session Created (Memory Optimized)")


Spark Session Created (Memory Optimized)


In [2]:
# This code loads the trained appliance prediction models and the scaler used during training.
# The predict_appliances() function takes in temperature, time features, season, power, and encoded weather data.
# It arranges these inputs in the exact order expected by the scaler, applies scaling, and then builds the final
# feature vector for the models. Each of the five Random Forest models (one per appliance) then predicts whether
# its appliance is ON or OFF. The function is finally registered as a Spark UDF so it can be used in DataFrame
# operations to generate appliance state predictions.

import pickle
import numpy as np
import pandas as pd

# 1. Load Models AND Scaler
try:
    with open('appliance_models.pkl', 'rb') as f:
        rf_models = pickle.load(f)
    print("Models loaded")

    with open('scaler.pkl', 'rb') as f:
        scaler = pickle.load(f)
    print("Scaler loaded")

    # DEBUG: Print scaler feature names to confirm order
    if hasattr(scaler, 'feature_names_in_'):
        print(f"Scaler expects: {scaler.feature_names_in_}")
        SCALER_FEATURES = list(scaler.feature_names_in_)
    else:
        # Fallback if feature names not saved (older sklearn)
        print("Scaler has no feature names. Assuming default order.")
        SCALER_FEATURES = ['Outside_Temperature_C', 'day_of_week', 'day_of_month', 'Apparent Power', 'hour_sin', 'hour_cos', 'month_sin', 'month_cos']

except:
    print("Models or Scaler not found. Prediction will be wrong!")
    SCALER_FEATURES = []

# 2. Define UDF with Robust Scaling
def predict_appliances(temp, hour_sin, hour_cos, month_sin, month_cos,
                       day_of_week, season, time_of_day, day_of_month,
                       apparent_power, weather_encoded_list):
    try:
        # Check inputs
        if weather_encoded_list is None: return [-1] * 5

        # --- A. PREPARE INPUT DICTIONARY ---
        # Map all inputs to their names so we can pick them in the right order
        input_map = {
            'Outside_Temperature_C': float(temp),
            'day_of_week': float(day_of_week),
            'day_of_month': float(day_of_month),
            'Apparent Power': float(apparent_power),
            'hour_sin': float(hour_sin),
            'hour_cos': float(hour_cos),
            'month_sin': float(month_sin),
            'month_cos': float(month_cos)
        }

        # --- B. CONSTRUCT RAW ROW FOR SCALER ---
        # Pick values in the EXACT order the scaler learned
        raw_row = []
        for feature_name in SCALER_FEATURES:
            # If feature matches one of our inputs, use it. Else 0.
            val = input_map.get(feature_name, 0.0)
            raw_row.append(val)

        raw_nums = np.array([raw_row])

        # --- C. SCALE ---
        # This transforms raw values to scaled values
        scaled_nums = scaler.transform(raw_nums)[0]

        # --- D. MAP SCALED VALUES BACK ---
        # Create a map of scaled values
        scaled_map = dict(zip(SCALER_FEATURES, scaled_nums))

        # Extract specific scaled values we need for the model feature vector
        s_h_sin = scaled_map.get('hour_sin', hour_sin)
        s_h_cos = scaled_map.get('hour_cos', hour_cos)
        s_m_sin = scaled_map.get('month_sin', month_sin)
        s_m_cos = scaled_map.get('month_cos', month_cos)
        s_dow = scaled_map.get('day_of_week', day_of_week)
        s_dom = scaled_map.get('day_of_month', day_of_month)
        s_temp = scaled_map.get('Outside_Temperature_C', temp)
        s_power = scaled_map.get('Apparent Power', apparent_power)

        # --- E. CONSTRUCT MODEL FEATURE VECTOR ---
        # Order: [hour_sin, hour_cos, month_sin, month_cos, day_of_week, season,
        #         time_of_day, day_of_month, Outside_Temperature_C, ...weather..., Apparent Power]

        features = [
            s_h_sin, s_h_cos, s_m_sin, s_m_cos,
            s_dow, season, time_of_day, s_dom,
            s_temp,
            *weather_encoded_list,
            s_power
        ]

        # --- F. PREDICT ---
        features_array = np.array(features).reshape(1, -1)
        results = []
        for device in ['Television', 'Dryer', 'Oven', 'Refrigerator', 'Microwave']:
            model = rf_models[device]
            pred = model.predict(features_array)[0]
            results.append(int(pred))

        return results

    except Exception as e:
        # print(str(e)) # Debug if needed
        return [-1] * 5

# Re-register UDF
predict_udf = udf(predict_appliances, ArrayType(IntegerType()))
print("UDF updated with ROBUST SCALING")

Models loaded
Scaler loaded
Scaler expects: ['Outside_Temperature_C' 'hour_sin' 'hour_cos' 'month_sin' 'month_cos'
 'day_of_week' 'day_of_month' 'Apparent Power']
UDF updated with ROBUST SCALING


In [3]:
# This script simulates a real-time data stream by repeatedly sampling small batches of rows
# from a large smart-home dataset. For each batch, it recreates all feature engineering steps
# used during model training, including cyclical time features, season/time-of-day encoding,
# and one-hot weather encoding across 10 fixed weather columns. It then outputs each processed
# batch as a CSV file into a streaming directory every few seconds, allowing Spark Structured
# Streaming to read the data as if it were arriving in real time.

import time
import os
import shutil
import pandas as pd
from threading import Thread

# SETUP
SOURCE_CSV_PATH = 'smart_home_dataset_with_weather.csv' # UPDATE THIS
input_dir = "/content/streaming_input"

if os.path.exists(input_dir):
    shutil.rmtree(input_dir)
os.makedirs(input_dir)

def stream_data_generator():
    print(f"Simulator started using: {SOURCE_CSV_PATH}")

    # Load Data
    try:
        full_df = pd.read_csv(SOURCE_CSV_PATH)
    except:
        print("Error: CSV not found. Check path.")
        return

    # THESE ARE THE 10 WEATHER COLUMNS YOUR MODEL EXPECTS
    required_weather_cols = [
        'weather_clear', 'weather_cloudy', 'weather_foggy', 'weather_overcast',
        'weather_partly_cloudy', 'weather_rainy', 'weather_snowy', 'weather_sunny',
        'weather_thunderstorm', 'weather_windy'
    ]

    batch_id = 0
    while True:
        try:
            # Sample Data
            raw_batch = full_df.sample(np.random.randint(5, 15)).copy()

            # --- FEATURE ENGINEERING (Must match training exactly) ---
            raw_batch['timestamp'] = pd.to_datetime(raw_batch['Unix Timestamp'], unit='s')
            raw_batch['hour'] = raw_batch['timestamp'].dt.hour
            raw_batch['month'] = raw_batch['timestamp'].dt.month

            # 1. Cyclical
            raw_batch['hour_sin'] = np.sin(2 * np.pi * raw_batch['hour'] / 24)
            raw_batch['hour_cos'] = np.cos(2 * np.pi * raw_batch['hour'] / 24)
            raw_batch['month_sin'] = np.sin(2 * np.pi * raw_batch['month'] / 12)
            raw_batch['month_cos'] = np.cos(2 * np.pi * raw_batch['month'] / 12)

            # 2. Categorical
            raw_batch['day_of_week'] = raw_batch['timestamp'].dt.dayofweek
            raw_batch['day_of_month'] = raw_batch['timestamp'].dt.day

            # Season mapping
            raw_batch['season'] = raw_batch['month'].apply(lambda m: 0 if m in [12,1,2] else 1 if m in [3,4,5] else 2 if m in [6,7,8] else 3)

            # Time of Day mapping
            raw_batch['time_of_day'] = raw_batch['hour'].apply(lambda h: 0 if 5<=h<12 else 1 if 12<=h<17 else 2 if 17<=h<21 else 3)

            # 3. Weather Encoding (The Critical Fix)
            # Create all 10 columns, initialized to 0
            for col in required_weather_cols:
                weather_type = col.replace('weather_', '')
                # If the raw weather type matches, set to 1, else 0
                raw_batch[col] = (raw_batch['Weather_Type'] == weather_type).astype(int)

            # 4. Select Columns in EXACT ORDER
            final_cols = [
                'hour_sin', 'hour_cos', 'month_sin', 'month_cos',
                'day_of_week', 'season', 'time_of_day', 'day_of_month',
                'Outside_Temperature_C'
            ] + required_weather_cols + ['Apparent Power']

            output_df = raw_batch[final_cols]

            # Write batch
            output_df.to_csv(f"{input_dir}/batch_{batch_id}.csv", index=False)
            batch_id += 1
            time.sleep(3) # Slow down to prevent OOM

        except Exception as e:
            print(f"Simulator Error: {e}")
            time.sleep(1)

# Start Simulator
t = Thread(target=stream_data_generator)
t.daemon = True
t.start()
print("Simulator running...")


Simulator started using: smart_home_dataset_with_weather.csv
Simulator running...


In [4]:
# This code defines the schema for incoming streaming batches so it matches the exact
# column structure produced by the simulator. It then reads CSV files as a Spark
# Structured Stream, builds a weather feature array, and applies the custom prediction
# UDF to generate ON/OFF predictions for five appliances. Finally, it selects the key
# fields and starts an in-memory streaming query so predictions can be viewed
# continuously as new batches arrive.

# 1. Define Columns matching the simulator output
weather_cols_names = [
    'weather_clear', 'weather_cloudy', 'weather_foggy', 'weather_overcast',
    'weather_partly_cloudy', 'weather_rainy', 'weather_snowy', 'weather_sunny',
    'weather_thunderstorm', 'weather_windy'
]

stream_cols = [
    'hour_sin', 'hour_cos', 'month_sin', 'month_cos',
    'day_of_week', 'season', 'time_of_day', 'day_of_month',
    'Outside_Temperature_C'
] + weather_cols_names + ['Apparent Power']

# 2. Create Schema
schema = StructType([StructField(c, FloatType(), True) for c in stream_cols])

# 3. Read Stream
iot_stream = spark.readStream \
    .option("header", "true") \
    .schema(schema) \
    .csv(input_dir)

# 4. Transform & Predict
weather_struct = [col(c) for c in weather_cols_names]

predictions = iot_stream.withColumn(
    "weather_list",
    array(weather_struct)
).withColumn(
    "pred",
    predict_udf(
        col("Outside_Temperature_C"),
        col("hour_sin"), col("hour_cos"),
        col("month_sin"), col("month_cos"),
        col("day_of_week"), col("season"),
        col("time_of_day"), col("day_of_month"),
        col("Apparent Power"),
        col("weather_list")
    )
)

# 5. Select Output
final_stream = predictions.select(
    col("Outside_Temperature_C").alias("Temp"),
    col("Apparent Power").alias("Power"),
    col("pred")[0].alias("TV"),
    col("pred")[1].alias("Dryer"),
    col("pred")[2].alias("Oven"),
    col("pred")[3].alias("Fridge"),
    col("pred")[4].alias("Micro")
)

# 6. Start Query
# Stop existing queries to free memory
for q in spark.streams.active:
    q.stop()

query = final_stream.writeStream \
    .queryName("iot_predictions") \
    .format("memory") \
    .outputMode("append") \
    .start()

print("Streaming started.")


Streaming started.


In [5]:
import time
from IPython.display import clear_output, display
import pandas as pd

start = time.time()
last_count = 0

print("Monitoring started... (Updates every 3s)")

while time.time() - start < 120:  # Run for 2 minutes
    if spark.catalog.tableExists("iot_predictions"):
        # Get count to see if data is growing
        count_df = spark.sql("SELECT count(*) as cnt FROM iot_predictions").toPandas()
        current_count = count_df['cnt'][0]

        # Get latest 10 rows
        df = spark.sql("SELECT * FROM iot_predictions")
        pdf = df.toPandas()

        if current_count > last_count:
            clear_output(wait=True)
            print(f"NEW BATCH RECEIVED! Total Rows: {current_count} (+{current_count - last_count})")
            print(f"Time Elapsed: {int(time.time() - start)}s")

            # Show the last 10 rows (The newest batch)
            display(pdf.tail(10))

            last_count = current_count
        else:
            # Optional: Print dots to show it's alive
            print(".", end="", flush=True)

    time.sleep(3)

print("\n Monitoring stopped.")


NEW BATCH RECEIVED! Total Rows: 358 (+122)
Time Elapsed: 132s


Unnamed: 0,Temp,Power,TV,Dryer,Oven,Fridge,Micro
348,10.3,2284.239014,1,1,1,1,1
349,14.2,1925.584717,0,0,0,1,0
350,15.8,1873.350586,0,0,0,1,0
351,14.5,1799.465942,0,0,0,0,0
352,27.9,1898.415283,0,0,0,1,0
353,27.0,1530.0,0,0,0,0,0
354,3.1,2193.30957,1,1,1,1,1
355,18.799999,1631.234009,0,0,1,0,0
356,17.200001,1696.806763,0,0,0,0,0
357,16.6,1548.4104,0,1,0,0,1



 Monitoring stopped.


In [6]:
query.stop()