In [23]:
import pyspark
import findspark
import os

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

from pyspark.sql.functions import col, dayofweek, when, month, hour, lag, avg, sum as spark_sum
from pyspark.sql.window import Window
from pyspark.ml.feature import MinMaxScaler


##### Read parquet files

In [None]:
findspark.init()

# Create Spark session
spark = SparkSession.builder.appName("ParquetViewer").config("spark.driver.memory", "8g").config("spark.executor.memory", "4g").getOrCreate()

# Load Parquet file
repo_root = os.getcwd()  
parquet_path = os.path.join(repo_root, "merged_df1_df3_df7_df8")
df = spark.read.parquet(parquet_path)

df.show(5)
df.printSchema()

                                                                                

+----------+---------+--------------------+--------------------+--------------------+------------+--------------------+------------------+--------------------+--------------+-------------------+-----------+-----------------+--------+-------------------+----------+---------+--------+--------------------------+-----------------------+----------+----------+--------+---------------------------+----------------------+----------------------+-------+-------------------+-------------------+--------------+--------------+---------------+-------------------+-------------------+-------------------+--------------------+-------------------+----------------------+--------------------------+--------------------------+---------+----+
|       day|    LCLid|       energy_median|         energy_mean|          energy_max|energy_count|          energy_std|        energy_sum|          energy_min|temperatureMax| temperatureMaxTime|windBearing|             icon|dewPoint| temperatureMinTime|cloudCover|windSpeed

In [None]:
# from pyspark.sql.functions import col, sum

# # Count missing values in each column
# missing_values = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
# missing_values.show()



+---+-----+-------------+-----------+----------+------------+----------+----------+----------+--------------+------------------+-----------+-----+--------+------------------+----------+---------+--------+--------------------------+-----------------------+----------+----------+--------+---------------------------+----------------------+----------------------+-------+-----+----------+--------------+--------------+---------------+-----------+-------------------+-----------+-------+------------------+----------------------+--------------------------+--------------------------+---------+-------+
|day|LCLid|energy_median|energy_mean|energy_max|energy_count|energy_std|energy_sum|energy_min|temperatureMax|temperatureMaxTime|windBearing| icon|dewPoint|temperatureMinTime|cloudCover|windSpeed|pressure|apparentTemperatureMinTime|apparentTemperatureHigh|precipType|visibility|humidity|apparentTemperatureHighTime|apparentTemperatureLow|apparentTemperatureMax|uvIndex| time|sunsetTime|temperatureLow|te

                                                                                

### Feature engineering

In [25]:
# Time-based features
df = df.withColumn("day_of_week", dayofweek(col("day")))
df = df.withColumn("is_weekend", when((col("day_of_week") == 1) | (col("day_of_week") == 7), 1).otherwise(0))
df = df.withColumn("month", month(col("day")))
df = df.withColumn("hour", hour(col("time")))

# Lag feature (previous day's energy consumption)
window_spec = Window.partitionBy("LCLid").orderBy("day")
df = df.withColumn("lag_1_day", lag("energy_sum", 1).over(window_spec))

# Weather-based features
df = df.withColumn("temperature_variability", col("temperatureMax") - col("temperatureMin"))
df = df.withColumn("humidity_temp_interaction", col("humidity") * col("temperatureMax"))
df = df.withColumn("cloud_temp_interaction", col("cloudCover") * col("temperatureMax"))

# Rolling sum of precipitation over 7 days
df = df.withColumn("rolling_precipitation_7d", spark_sum("precipType").over(window_spec.rowsBetween(-6, 0)))

# Drop only rows where `energy_sum` is NULL
df = df.dropna(subset=["energy_sum"])

# Fill missing values instead of dropping
df = df.fillna({
    "lag_1_day": 0,  
    "temperature_variability": df.select(avg("temperatureMax") - avg("temperatureMin")).collect()[0][0],
    "humidity_temp_interaction": df.select(avg("humidity") * avg("temperatureMax")).collect()[0][0],
    "cloud_temp_interaction": df.select(avg("cloudCover") * avg("temperatureMax")).collect()[0][0],
    "rolling_precipitation_7d": 0  
})

# Drop columns with too many missing values
columns_to_drop = ["precipType", "summary", "Type"]
df = df.drop(*columns_to_drop)

# Verify that we still have rows
print(f"Dataset size after cleaning: {df.count()}")

# Assemble features
feature_cols = [
    "day_of_week", "is_weekend", "lag_1_day",
    "temperature_variability", "humidity_temp_interaction",
    "cloud_temp_interaction", "rolling_precipitation_7d"
]

if "features" in df.columns:
    df = df.drop("features")

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
df = scaler.fit(df).transform(df)

                                                                                

Dataset size after cleaning: 3517002


                                                                                

##### Train test split

In [6]:
train_df, test_df = df.randomSplit([0.9, 0.1], seed=42)

print(f"Training set size: {train_df.count()}")
print(f"Test set size: {test_df.count()}")

if train_df.count() == 0:
    raise ValueError("Training dataset is empty. Check preprocessing!")

                                                                                

Training set size: 3165126


                                                                                

Test set size: 351876


                                                                                

##### Random forest model

In [19]:
import shutil
import glob

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(featuresCol="features", labelCol="energy_sum", numTrees=100)
rf_model = rf.fit(train_df)

predictions = rf_model.transform(test_df)
predictions = predictions.orderBy("day", "LCLid")

# save predictions in csv
repo_root = os.getcwd()  
output_folder = os.path.join(repo_root, "random_forest_output")  
os.makedirs(output_folder, exist_ok=True)
temp_output_folder = os.path.join(repo_root, "temp_output")
predictions.select("day", "LCLid", "energy_sum", "prediction").repartition(1).write.mode("overwrite").csv(temp_output_folder, header=True)
part_file = glob.glob(os.path.join(temp_output_folder, "part-*.csv"))[0]
final_output_path = os.path.join(output_folder, "random_forest_predictions.csv")
shutil.move(part_file, final_output_path)
shutil.rmtree(temp_output_folder)

evaluator = RegressionEvaluator(labelCol="energy_sum", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE: {rmse}")
mean_energy_sum = df.select(avg("energy_sum")).collect()[0][0]
rmse_percentage = (4.62 / mean_energy_sum) * 100
print(f"RMSE as percentage of mean: {rmse_percentage:.2f}%")

                                                                                

Test RMSE: 4.7671459655217
RMSE as percentage of mean: 45.62%


##### XGBoost model

In [26]:
from xgboost.spark import SparkXGBRegressor

os.environ["DMLC_TRACKER_URI"] = "127.0.0.1"
os.environ["DMLC_TRACKER_PORT"] = "9091"
os.environ["DMLC_NUM_WORKER"] = "1"
os.environ["DMLC_NUM_SERVER"] = "1"

train_df_xgb, test_df_xgb = df.randomSplit([0.9, 0.1], seed=42)

In [29]:
xgb = SparkXGBRegressor(
    features_col="scaled_features",
    label_col="energy_sum",
    max_depth=10,
    eta=0.05,
    subsample=0.8,
    # num_round=150,
    # n_workers=1,  # Run in single-worker mode
    # use_external_storage=False
)

xgb_model = xgb.fit(train_df_xgb)

predictions = xgb_model.transform(test_df_xgb)
predictions = predictions.orderBy("day", "LCLid")

# save predictions in csv
repo_root = os.getcwd()  
output_folder = os.path.join(repo_root, "xgboost_output")  
os.makedirs(output_folder, exist_ok=True)
temp_output_folder = os.path.join(repo_root, "temp_output")
predictions.select("day", "LCLid", "energy_sum", "prediction").repartition(1).write.mode("overwrite").csv(temp_output_folder, header=True)
part_file = glob.glob(os.path.join(temp_output_folder, "part-*.csv"))[0]
final_output_path = os.path.join(output_folder, "xgboost_predictions.csv")
shutil.move(part_file, final_output_path)
shutil.rmtree(temp_output_folder)

evaluator = RegressionEvaluator(labelCol="energy_sum", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE (XGBoost): {rmse}")
mean_energy_sum = df.select(avg("energy_sum")).collect()[0][0]
rmse_percentage = (4.62 / mean_energy_sum) * 100
print(f"RMSE as percentage of mean: {rmse_percentage:.2f}%")

INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with       (9 + 3) / 12]
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'max_depth': 10, 'subsample': 0.8, 'eta': 0.05, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2025-03-16 00:41:50,304 INFO XGBoost-PySpark: _train_booster Training on CPUs 1]
[00:41:51] Task 0 got rank 0
INFO:XGBoost-PySpark:Finished xgboost training!                                 
2025-03-16 00:42:43,113 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2025-03-16 00:43:14,997 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2025-03-16 00:44:03,437 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
2025-03-16 00:44:27,318 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
                                                                                

Test RMSE (XGBoost): 3.949051547923543
RMSE as percentage of mean: 45.62%


### Latest XGBoost Model (after hyperparamter tuning)
#### RMSE = 0.97 (high accuracy)

In [2]:
import pyspark
import findspark
import os
import shutil
import glob

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StandardScaler
from pyspark.sql.functions import col, dayofweek, when, month, hour, lag, avg, stddev, sum as spark_sum, expr
from pyspark.sql.window import Window
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator

findspark.init()

# Create Spark session
spark = SparkSession.builder.appName("EnergyPrediction").config("spark.driver.memory", "12g").config("spark.executor.memory", "6g").getOrCreate()

# Load Parquet file
repo_root = os.getcwd()  
parquet_path = os.path.join(repo_root, "merged_df1_df3_df7_df8")
df = spark.read.parquet(parquet_path)
df.show(5)
df.printSchema()

# Time-Based Features
df = df.withColumn("day_of_week", dayofweek(col("day")))
df = df.withColumn("is_weekend", when((col("day_of_week") == 1) | (col("day_of_week") == 7), 1).otherwise(0))
df = df.withColumn("month", month(col("day")))
df = df.withColumn("hour", hour(col("time")))

# Lag Features
window_spec = Window.partitionBy("LCLid").orderBy("day")
df = df.withColumn("lag_1_day", lag("energy_sum", 1).over(window_spec))
df = df.withColumn("lag_2_day", lag("energy_sum", 2).over(window_spec))
df = df.withColumn("lag_7_day", lag("energy_sum", 7).over(window_spec))

# Rolling Aggregates
rolling_window_3d = window_spec.rowsBetween(-2, 0)
rolling_window_7d = window_spec.rowsBetween(-6, 0)
rolling_window_14d = window_spec.rowsBetween(-13, 0)
rolling_window_30d = window_spec.rowsBetween(-29, 0)

df = df.withColumn("rolling_energy_3d", spark_sum("energy_sum").over(rolling_window_3d))
df = df.withColumn("rolling_energy_7d", spark_sum("energy_sum").over(rolling_window_7d))
df = df.withColumn("rolling_energy_14d", spark_sum("energy_sum").over(rolling_window_14d))
df = df.withColumn("rolling_energy_30d", spark_sum("energy_sum").over(rolling_window_30d))

df = df.withColumn("rolling_energy_mean_14d", avg("energy_sum").over(rolling_window_14d))
df = df.withColumn("rolling_energy_std_14d", stddev("energy_sum").over(rolling_window_14d))

# Energy Consumption Trends
df = df.withColumn("energy_trend_3d", col("rolling_energy_3d") - col("lag_1_day"))
df = df.withColumn("energy_trend_7d", col("rolling_energy_7d") - col("lag_1_day"))

# Daily Energy Change Percentage
df = df.withColumn("daily_energy_change", (col("lag_1_day") - col("energy_sum")) / (col("lag_1_day") + 1))

# Weather Interactions
df = df.withColumn("temperature_variability", col("temperatureMax") - col("temperatureMin"))
df = df.withColumn("humidity_temp_interaction", col("humidity") * col("temperatureMax"))
df = df.withColumn("cloud_temp_interaction", col("cloudCover") * col("temperatureMax"))

# Fill Missing Values
df = df.dropna(subset=["energy_sum"])  

df = df.fillna({
    "lag_1_day": 0, "lag_2_day": 0, "lag_7_day": 0,
    "rolling_energy_3d": df.select(avg("energy_sum")).collect()[0][0],
    "rolling_energy_7d": df.select(avg("energy_sum")).collect()[0][0],
    "rolling_energy_14d": df.select(avg("energy_sum")).collect()[0][0],
    "rolling_energy_30d": df.select(avg("energy_sum")).collect()[0][0],
    "rolling_energy_mean_14d": df.select(avg("energy_sum")).collect()[0][0],
    "rolling_energy_std_14d": df.select(stddev("energy_sum")).collect()[0][0],
    "energy_trend_3d": 0, "energy_trend_7d": 0,
    "daily_energy_change": 0,
    "temperature_variability": df.select(avg("temperatureMax") - avg("temperatureMin")).collect()[0][0],
    "humidity_temp_interaction": df.select(avg("humidity") * avg("temperatureMax")).collect()[0][0],
    "cloud_temp_interaction": df.select(avg("cloudCover") * avg("temperatureMax")).collect()[0][0],
})

# Feature Engineering
feature_cols = [
    "day_of_week", "is_weekend", "month", "hour",
    "lag_1_day", "lag_2_day", "lag_7_day",
    "rolling_energy_3d", "rolling_energy_7d", "rolling_energy_14d", "rolling_energy_30d",
    "rolling_energy_mean_14d", "rolling_energy_std_14d",
    "energy_trend_3d", "energy_trend_7d",
    "daily_energy_change",
    "temperature_variability", "humidity_temp_interaction",
    "cloud_temp_interaction", 
]

# Vector Assembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")
df = assembler.transform(df)

# Feature Scaling
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
df = scaler.fit(df).transform(df)

# Train-Test Split
train_df, test_df = df.randomSplit([0.9, 0.1], seed=42)

# Optimized XGBoost Model
xgb = SparkXGBRegressor(
    features_col="scaled_features",
    label_col="energy_sum",
    max_depth=5,  
    eta=0.1,  
    subsample=0.85,  
    colsample_bytree=0.85,  
    min_child_weight=5,  
    alpha=0.5,  # L1 regularization
    n_estimators=600
)

xgb_model = xgb.fit(train_df)
predictions = xgb_model.transform(test_df)

# Evaluate Model
evaluator = RegressionEvaluator(labelCol="energy_sum", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE (XGBoost): {rmse}")

mean_energy_sum = df.select(avg("energy_sum")).collect()[0][0]
rmse_percentage = (rmse / mean_energy_sum) * 100
print(f"RMSE as percentage of mean: {rmse_percentage:.2f}%")

+----------+---------+--------------------+--------------------+--------------------+------------+--------------------+------------------+--------------------+--------------+-------------------+-----------+-----------------+--------+-------------------+----------+---------+--------+--------------------------+-----------------------+----------+----------+--------+---------------------------+----------------------+----------------------+-------+-------------------+-------------------+--------------+--------------+---------------+-------------------+-------------------+-------------------+--------------------+-------------------+----------------------+--------------------------+--------------------------+---------+----+
|       day|    LCLid|       energy_median|         energy_mean|          energy_max|energy_count|          energy_std|        energy_sum|          energy_min|temperatureMax| temperatureMaxTime|windBearing|             icon|dewPoint| temperatureMinTime|cloudCover|windSpeed

2025-03-16 02:25:07,023 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 1 workers with
	booster params: {'objective': 'reg:squarederror', 'colsample_bytree': 0.85, 'device': 'cpu', 'max_depth': 5, 'min_child_weight': 5, 'subsample': 0.85, 'eta': 0.1, 'alpha': 0.5, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 600}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2025-03-16 02:26:13,263 INFO XGBoost-PySpark: _train_booster Training on CPUs 1]
[02:26:14] Task 0 got rank 0
2025-03-16 02:28:20,490 INFO XGBoost-PySpark: _fit Finished xgboost training!   
2025-03-16 02:29:27,541 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
                                                                                

Test RMSE (XGBoost): 0.9680186443981336
RMSE as percentage of mean: 9.56%


#### **Reason for RMSE improvement:**

**A. Better Feature Engineering**
- Model views energy usage as a time series trend rather than isolated values
1. Rolling Aggregates & Trends:
    - Rolling energy sums over 3-day, 7-day, 14-day, and 30-day windows.
2. Daily Energy Change Percentage
    - Measured the daily change rate instead of just raw values
    - Allowed the model to learn how energy consumption fluctuates over time
3. Energy Trends Over Time
    - energy_trend_3d, energy_trend_7d showed short-term and long-term variations.
    - Clearer understanding of consumption trends

**B. Hyperparameter Tuning in XGBoost**
1. max_depth → 5 (previously 7-10)
    - Prevented overfitting 
    - Shallower trees make better general predictions.
2. eta (learning rate) → 0.1 (previously 0.2)
    - Slower, more careful learning instead of rushing to fit data
3. subsample & colsample_bytree → 0.85
    - Forced better generalization by randomly selecting data points & features

**C. Feature Scaling with StandardScaler**
1. Switched from MinMaxScaler → StandardScaler
    - MinMaxScaler squashes values between 0 and 1
    - StandardScaler makes all features follow a normal distribution (mean=0, std=1)
    - Prevented some features from dominating the model due to scale differences

**D. Larger Boosting Rounds (600 Trees)**
1. More iterations → Better learning
    - Previous models ran with ~200 trees.
    - Running 600 boosting rounds means the model learned more fine-grained details