Finding Min and Max electricity consumption values, to later de-scale the data

Initializing the Spark session and reading the dataset from the .csv file

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import os
import json
from pyspark import SparkContext

# Shut down any SparkContext that is still active in this kernel so the
# session created below is built fresh instead of reusing the old context.
active_context = SparkContext._active_spark_context
if active_context:
    active_context.stop()

# Create (or fetch) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("EnergyConsumptionPrediction")
    .getOrCreate()
)

# Load the CSV: the first line is the header, column types are inferred by Spark.
data_path = "data/smart_meter_data.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/15 11:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Splitting the data, training and evaluating linear regression and random forest models

In [3]:
# 1) Add hour and dayofweek features derived from the Timestamp column
df = df.withColumn("hour", hour(col("Timestamp"))).withColumn("dayofweek", dayofweek(col("Timestamp")))

# 2) Cast numerical columns to DoubleType
feature_cols = ["Temperature", "Humidity", "Wind_Speed", "Avg_Past_Consumption", "hour", "dayofweek"]
for c in feature_cols + ["Electricity_Consumed"]:
    df = df.withColumn(c, col(c).cast(DoubleType()))

# Drop rows with nulls in any feature, the target or the Timestamp
df = df.dropna(subset=feature_cols + ["Electricity_Consumed", "Timestamp"])

# 3) Build feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(df).select("features", col("Electricity_Consumed").alias("label"), "Timestamp")
# The assembled data is scanned several times below (count, cutoff, two fits,
# two transforms), so keep it cached instead of re-reading the CSV each time.
data = data.cache()

# 4) Chronological train-test split (70-30)
# The earliest ~70% of rows (by Timestamp) form the training set and every
# later row forms the test set. The split is expressed as a Timestamp cutoff
# instead of limit() + subtract() because:
#   * subtract() is EXCEPT DISTINCT: it removes duplicate rows from the test
#     set and does not keep the rows in Timestamp order;
#   * subtract() re-evaluates the ordered limit(), whose content is not stable
#     when several rows share the same Timestamp.
# Rows whose Timestamp equals the cutoff all go to the training set, so the
# training share can be slightly above 70% when Timestamps tie.
data_ordered = data.orderBy("Timestamp")
count = data_ordered.count()
split_idx = int(count * 0.7)
if split_idx == 0:
    raise ValueError(f"Not enough rows ({count}) to build a 70% training set")

# Largest Timestamp among the first split_idx rows = last training Timestamp
cutoff = data_ordered.limit(split_idx).agg({"Timestamp": "max"}).first()[0]
train = data.filter(col("Timestamp") <= cutoff)
# Keep the test set in chronological order so samples and plots read naturally
test = data.filter(col("Timestamp") > cutoff).orderBy("Timestamp")
if test.limit(1).count() == 0:
    raise ValueError("Test set is empty: no rows are later than the training cutoff")

# 5) Train Linear Regression (regParam=0.0 means no regularisation)
lr = LinearRegression(featuresCol="features", labelCol="label", regParam=0.0)
lr_model = lr.fit(train)
lr_pred = lr_model.transform(test)

# 6) Train Random Forest (fixed seed so repeated runs build the same forest)
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    maxDepth=20,
    minInstancesPerNode=1,
    seed=42
)
rf_model = rf.fit(train)
rf_pred = rf_model.transform(test)

# 7) Evaluation: one evaluator per metric, shared by both models
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator_mae  = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
evaluator_r2   = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

# Linear Regression metrics
lr_rmse = evaluator_rmse.evaluate(lr_pred)
lr_mae  = evaluator_mae.evaluate(lr_pred)
lr_r2   = evaluator_r2.evaluate(lr_pred)

# Random Forest metrics
rf_rmse = evaluator_rmse.evaluate(rf_pred)
rf_mae  = evaluator_mae.evaluate(rf_pred)
rf_r2   = evaluator_r2.evaluate(rf_pred)

print("=== EVALUATION (test set) ===")
print(f"Linear Regression -> RMSE: {lr_rmse:.6f}, MAE: {lr_mae:.6f}, R2: {lr_r2:.6f}")
print(f"Random Forest     -> RMSE: {rf_rmse:.6f}, MAE: {rf_mae:.6f}, R2: {rf_r2:.6f}")


# 8) Coefficients / Feature Importances
print("\nLinear Regression intercept:", lr_model.intercept)
# A fitted LinearRegressionModel always exposes `coefficients`; convert the
# Spark vector to a NumPy array so it can be zipped with the feature names.
lr_coeffs = lr_model.coefficients.toArray()
print("Linear Regression coefficients:")
for f, coef in zip(feature_cols, lr_coeffs):
    print(f"  {f}: {coef}")

print("\nRandom Forest feature importances:")
# Kept as the Spark vector returned by the model; iterate over its dense
# array form so every feature (including zero-importance ones) is listed.
rf_importances = rf_model.featureImportances
for f, imp in zip(feature_cols, rf_importances.toArray()):
    print(f"  {f}: {imp:.6f}")

# 9) Sample predictions
print("\nSample predictions (Random Forest):")
rf_pred.select("Timestamp", "label", "prediction").show(10, truncate=False)

print("\nSample predictions (Linear Regression):")
lr_pred.select("Timestamp", "label", "prediction").show(10, truncate=False)

# 10) Save models (an existing saved model at the same path is overwritten)
lr_model.write().overwrite().save("models/lr_model")
rf_model.write().overwrite().save("models/rf_model")

25/10/15 11:55:51 WARN Instrumentation: [05069226] regParam is zero, which might cause numerical instability and overfitting.
25/10/15 11:55:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/10/15 11:55:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/10/15 11:55:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
25/10/15 11:55:54 WARN DAGScheduler: Broadcasting large task binary with size 1560.3 KiB
25/10/15 11:55:54 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/10/15 11:55:55 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
25/10/15 11:55:55 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
25/10/15 11:55:56 WARN DAGScheduler: Broadcasting large task binary with size 8.0 MiB
25/10/15 11:55:56 WARN DAGScheduler: Broadcasting large task binary with size 1024.5 KiB
25/10/15 11:55:57 WARN D

=== EVALUATION (test set) ===
Linear Regression -> RMSE: 0.156323, MAE: 0.125980, R2: 0.095517
Random Forest     -> RMSE: 0.160367, MAE: 0.129234, R2: 0.048116

Linear Regression intercept: 0.23270551826669847
Linear Regression coefficients:
  Temperature: -0.0081487894783487
  Humidity: -0.027358389912020133
  Wind_Speed: -0.005548839197460918
  Avg_Past_Consumption: 0.35379170057845083
  hour: -0.00021103810694142737
  dayofweek: 0.0011126025948691954

Random Forest feature importances:
  Temperature: 0.181098
  Humidity: 0.156231
  Wind_Speed: 0.167586
  Avg_Past_Consumption: 0.233029
  hour: 0.158428
  dayofweek: 0.103629

Sample predictions (Random Forest):
+-------------------+-------------------+-------------------+
|Timestamp          |label              |prediction         |
+-------------------+-------------------+-------------------+
|2024-03-13 22:00:00|0.5717734854086908 |0.46547477425019823|
|2024-03-13 22:30:00|0.27044775900811624|0.3939755378933961 |
|2024-03-13 23:00:0

25/10/15 11:56:08 WARN TaskSetManager: Stage 99 contains a task of very large size (1624 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Saving results to be shown on the front-end

In [4]:
# Creating a results dictionary
# Every value is converted to a built-in float so json.dump can serialise it
# regardless of whether it arrives as a Python or NumPy number.
results = {
    "LinearRegression": {
        "RMSE": float(lr_rmse),
        "MAE": float(lr_mae),
        "R2": float(lr_r2),
        "Coefficients": {f: float(c) for f, c in zip(feature_cols, lr_coeffs)},
        "Intercept": float(lr_model.intercept)
    },
    "RandomForest": {
        "RMSE": float(rf_rmse),
        "MAE": float(rf_mae),
        "R2": float(rf_r2),
        "FeatureImportances": {f: float(imp) for f, imp in zip(feature_cols, rf_importances)}
    }
}

# JSON path
output_path = os.path.join("fastapi_app", "spark_results.json")

# Create the target folder if it is missing so the cell also works on a
# fresh checkout (open() does not create parent directories).
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Writing data into JSON file
with open(output_path, "w", encoding="utf-8") as json_file:
    json.dump(results, json_file, indent=4)

print(f"Results successfully saved at {output_path}")

Results successfully saved at fastapi_app/spark_results.json


Preparing model data to be shown on the front-end

In [5]:
import matplotlib.dates as mdates
from pandas import to_datetime


def predictions_to_pandas(pred_df):
    """Convert a Spark prediction DataFrame to a pandas frame ready for plotting.

    Args:
        pred_df: Spark DataFrame with at least the columns "Timestamp",
            "label" and "prediction".

    Returns:
        A pandas DataFrame with the same columns, "Timestamp" parsed as
        datetime and rows sorted chronologically.
    """
    # Timestamp is shipped as a string and parsed again on the pandas side
    # (presumably to sidestep Spark -> pandas timestamp conversion issues - TODO confirm).
    pred_pd = pred_df.withColumn("Timestamp", col("Timestamp").cast("string")).toPandas()
    pred_pd["Timestamp"] = to_datetime(pred_pd["Timestamp"])
    # A line plot connects points in row order, and the collected rows are not
    # guaranteed to be chronological, so sort explicitly.
    return pred_pd.sort_values("Timestamp").reset_index(drop=True)


def plot_prediction_vs_actual(pred_pd, title, prediction_label, prediction_color, output_file):
    """Plot actual vs predicted consumption over time and save the figure as an image.

    Args:
        pred_pd: pandas DataFrame with "Timestamp", "label" and "prediction" columns.
        title: Title drawn above the plot.
        prediction_label: Legend entry for the prediction line.
        prediction_color: Matplotlib colour of the prediction line.
        output_file: Path of the image file to write; missing parent
            folders are created.
    """
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(pred_pd["Timestamp"], pred_pd["label"], label="Actual Consumption", color="blue")
    ax.plot(pred_pd["Timestamp"], pred_pd["prediction"], label=prediction_label, color=prediction_color)
    ax.set_xlabel("Timestamp")
    ax.set_ylabel("Scaled Consumption (`%` of max = 500 kWh)")
    ax.set_title(title)
    ax.legend()

    # format X-axis: max 10 ticks for readability
    ax.xaxis.set_major_locator(mdates.AutoDateLocator(maxticks=10))
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d %H:%M"))
    ax.tick_params(axis="x", labelrotation=45)

    fig.tight_layout()
    # savefig does not create parent directories
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    fig.savefig(output_file)
    # Release the figure so repeated runs do not accumulate open figures
    plt.close(fig)


# Converting spark dataframes to pandas for plotting
lr_pred_pd = predictions_to_pandas(lr_pred)
rf_pred_pd = predictions_to_pandas(rf_pred)

# Linear Regression prediction vs actual
plot_prediction_vs_actual(
    lr_pred_pd,
    title="Linear Regression: Prediction vs Actual Consumption",
    prediction_label="LR Prediction",
    prediction_color="red",
    output_file="fastapi_app/images/lr_prediction.png",
)

# Random Forest prediction vs actual
plot_prediction_vs_actual(
    rf_pred_pd,
    title="Random Forest: Prediction vs Actual Consumption",
    prediction_label="RF Prediction",
    prediction_color="green",
    output_file="fastapi_app/images/rf_prediction.png",
)