In [None]:
# Notebook-scoped libraries installed on the cluster through the kernel's `sc` helpers.
# NOTE(review): numpy is pinned to 1.26.4, presumably to stay below numpy 2.x for
# compatibility with the installed pandas/pyspark — confirm. The other packages are
# unpinned, and `altair` is not imported anywhere in this notebook.
for package_spec in ("numpy==1.26.4", "pandas", "matplotlib", "altair"):
    sc.install_pypi_package(package_spec)
sc.list_packages()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, avg, stddev, window, year, month
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import random
from datetime import datetime, timedelta


In [None]:
# Single SparkSession for the notebook; getOrCreate() returns the existing session if one is active.
spark = (
    SparkSession.builder
    .appName("EnergyConsumptionAnalysis")
    .getOrCreate()
)

In [None]:
# S3 bucket that holds the input data (the CSV path below is built from it).
# NOTE(review): the name appears to embed an AWS account ID and region — consider
# moving it to configuration before sharing the notebook.
s3_bucket = "aws-emr-studio-624855134767-us-east-1"

In [None]:
# Hourly ("60min") OPSD time-series export in single-index layout, read with a header
# row and Spark-inferred column types.
csv_path = "s3://{}/data/opsd-time_series-2020-10-06/time_series_60min_singleindex.csv".format(s3_bucket)

df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_path)
)

In [None]:
# Inspect the inferred schema of the raw CSV (column names and the types inferSchema chose).
df.printSchema()

In [None]:
# Exploration: keep both timestamp columns plus every per-country "actual load"
# column, i.e. names shaped like "<two letters>_load_actual<suffix>".
# This df_filtered is only inspected via printSchema; the next cells replace it.
all_columns = df.columns
fixed_columns = ['utc_timestamp', 'cet_cest_timestamp']

country_load_columns = [
    name for name in all_columns
    if len(name) > 14
    and name[:2].isalpha()
    and name.startswith('_load_actual', 2)
]

columns_to_select = [*fixed_columns, *country_load_columns]

df_filtered = df.select(columns_to_select)

In [None]:
# Show which timestamp and per-country load columns the name filter above kept.
df_filtered.printSchema()

In [None]:
# Narrow to one series: the PL actual-load column (ENTSO-E transparency source),
# renamed to `energy_consumption` for the rest of the notebook.
source_column = "PL_load_actual_entsoe_transparency"
columns_to_select = ["utc_timestamp", source_column]

df_filtered = (
    df.select(columns_to_select)
      .withColumnRenamed(source_column, "energy_consumption")
)
df_filtered.show(5)

In [None]:
# Drop hours where the timestamp or the load value is missing.
df_filtered = df_filtered.na.drop()

In [None]:
# Calendar features derived from the UTC timestamp.
df_with_time = df_filtered.select(
    "*",
    F.year("utc_timestamp").alias("year"),
    F.month("utc_timestamp").alias("month"),
    F.dayofweek("utc_timestamp").alias("day_of_week"),
    F.hour("utc_timestamp").alias("hour"),
)

# Anomaly band: mean +/- 2 standard deviations of load within each
# (month, day_of_week) group, pooled across all years and hours of the day.
window_spec = Window.partitionBy("month", "day_of_week")
consumption = F.col("energy_consumption")

df_stats = df_with_time.select(
    "*",
    F.avg(consumption).over(window_spec).alias("mean"),
    F.stddev(consumption).over(window_spec).alias("stddev"),
)

band_half_width = 2 * F.col("stddev")
df_stats = df_stats.select(
    "*",
    (F.col("mean") - band_half_width).alias("thresh_low"),
    (F.col("mean") + band_half_width).alias("thresh_high"),
)

outside_band = (consumption < F.col("thresh_low")) | (consumption > F.col("thresh_high"))
df_anomalies = df_stats.where(outside_band).select(
    "utc_timestamp", "year", "month", "day_of_week", "hour", "energy_consumption",
    "thresh_low", "thresh_high")

df_anomalies.show(20)

In [None]:
df_anomalies_pd = df_anomalies.toPandas()
df_filtered_pd = df_filtered.toPandas()

plt.figure(figsize=(12, 6))

plt.plot(df_filtered_pd["utc_timestamp"],
         df_filtered_pd["energy_consumption"],
         label="Energy Consumption",
         color='blue')

plt.scatter(df_anomalies_pd["utc_timestamp"],
            df_anomalies_pd["energy_consumption"],
            color='red',
            label="Anomalies",
            zorder=5)

plt.xlabel("Timestamp")
plt.ylabel("Energy Consumption")
plt.title("Energy Consumption with Anomalies")
plt.legend()
plt.xticks(rotation=45)
plt.show()
%matplot plt

In [None]:
# Anomaly-free series: drop every flagged timestamp with a left-anti join, keeping
# the calendar columns from df_with_time.
anomalies_timestamps = df_anomalies.select("utc_timestamp").dropDuplicates()

df_filtered = df_with_time.join(
    anomalies_timestamps, "utc_timestamp", "left_anti")

df_filtered.show(20)

In [None]:
# Gap-aware lag features for the forecaster.
# Removing anomaly rows (and the earlier dropna) leaves holes in the hourly series,
# so "k rows back" is not always "k hours back". lag_k takes the load value from k
# rows earlier only when that row's timestamp is exactly k hours before the current
# one; otherwise lag_k is null and the row is removed by the dropna in the next cell.
# `F.when` is used because a bare `when` is not imported in this notebook.
# NOTE: Window.orderBy without partitionBy moves every row into one partition —
# fine for a single country's hourly series, but it does not scale.
window_spec = Window.orderBy("utc_timestamp")

current_epoch_s = col("utc_timestamp").cast("long")

df_lagged = df_filtered
for steps_back in (1, 2, 3):
    seconds_since_lagged_row = (
        current_epoch_s - lag("utc_timestamp", steps_back).over(window_spec).cast("long"))
    # No .otherwise(): rows failing the check (or with no earlier row) get null.
    df_lagged = df_lagged.withColumn(
        f"lag_{steps_back}",
        F.when(seconds_since_lagged_row == steps_back * 3600,
               lag("energy_consumption", steps_back).over(window_spec)))

df_lagged.show()


In [None]:
# Keep only rows where all three lags are available (no gap in the preceding hours).
df_lagged = df_lagged.dropna(how="any")
df_lagged.show()

In [None]:
# Model inputs: the three lagged load values plus calendar position, packed into one vector.
feature_columns = ["lag_1", "lag_2", "lag_3", "day_of_week", "month", "hour"]
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

In [None]:
# Decision-tree regressor on the assembled feature vector, predicting the load value.
# Named `regressor` (not `model`) so the next cell's fitted PipelineModel is the only
# thing called `model`; re-running this cell no longer clobbers the fitted model.
regressor = DecisionTreeRegressor(featuresCol="features", labelCol="energy_consumption")

pipeline = Pipeline(stages=[assembler, regressor])

# Fixed seed so the 80/20 split, and therefore the RMSE reported below, is reproducible.
# NOTE(review): a random split of a lagged time series leaks information: lag_1 of
# hour t+1 equals the label of hour t, so training rows carry test labels as features.
# A chronological split would give a more honest error estimate.
SPLIT_SEED = 42
train_data, test_data = df_lagged.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
# Fit assembler + regressor on the training split; `model` is the fitted PipelineModel.
model = pipeline.fit(dataset=train_data)

In [None]:
# Score the held-out split and peek at actual vs. predicted load.
predictions = model.transform(dataset=test_data)
(predictions
    .select(["utc_timestamp", "energy_consumption", "prediction"])
    .show())

In [None]:
# Root-mean-squared error of the test-split predictions, in the units of energy_consumption.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="energy_consumption",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(dataset=predictions)
print(f"RMSE: {rmse}")

In [None]:
predictions_pd = predictions.select("utc_timestamp", "energy_consumption",
                                    "prediction").toPandas()
predictions_pd['utc_timestamp'] = pd.to_datetime(
    predictions_pd['utc_timestamp'])
predictions_pd['year'] = predictions_pd['utc_timestamp'].dt.year
predictions_pd['month'] = predictions_pd['utc_timestamp'].dt.month

years = predictions_pd['year'].unique()
for year in years:
    yearly_data = predictions_pd[predictions_pd['year'] == year]

    months = yearly_data['month'].unique()

    for month in months:
        monthly_data = yearly_data[yearly_data['month'] == month]

        plt.figure(figsize=(12, 6))
        plt.plot(monthly_data['utc_timestamp'],
                 monthly_data['energy_consumption'],
                 label="Actual")
        plt.plot(monthly_data['utc_timestamp'],
                 monthly_data['prediction'],
                 label="Predicted",
                 linestyle="--")
        plt.xlabel("Timestamp")
        plt.ylabel("Energy Consumption")
        plt.title(
            f"Energy Consumption in {year}-{month:02d}: Actual vs Predicted")
        plt.legend()
        plt.xticks(rotation=45)

        plt.savefig(f'energy_consumption_plot_{year}_{month:02d}.png')
        plt.show()

%matplot plt

In [None]:
def generate_random_date(start_date, end_date, rng=None):
    """Return ``start_date`` shifted by a uniformly random number of whole days.

    The offset is drawn from ``0 .. (end_date - start_date).days`` inclusive, so the
    result never exceeds ``end_date`` and keeps ``start_date``'s time of day.

    Args:
        start_date: earliest allowed value (``datetime`` or ``date``).
        end_date: latest allowed value; must not be earlier than ``start_date``.
        rng: optional ``random.Random`` instance for reproducible draws; defaults
            to the module-level ``random`` generator.

    Returns:
        ``start_date + timedelta(days=k)`` for a random integer ``k``.

    Raises:
        ValueError: if ``end_date`` is earlier than ``start_date``.
    """
    delta_days = (end_date - start_date).days
    if delta_days < 0:
        raise ValueError(
            f"end_date ({end_date}) is earlier than start_date ({start_date})")
    source = random if rng is None else rng
    return start_date + timedelta(days=source.randint(0, delta_days))

In [None]:
# Pick a random day to forecast, restricted to days that actually have rows in
# df_lagged. A fixed calendar range can reach past the end of the data (the CSV
# path indicates a 2020-10-06 release) or hit days emptied by anomaly removal and
# dropna, which would leave df_random_day empty and break the forecast cell.
available_days = {
    row["day"]
    for row in df_lagged.select(F.to_date("utc_timestamp").alias("day")).distinct().collect()
}
if not available_days:
    raise ValueError("df_lagged is empty; there is no day to forecast")

start_date = datetime.combine(min(available_days), datetime.min.time())
end_date = datetime.combine(max(available_days), datetime.min.time())

random_date = generate_random_date(start_date, end_date)
# Redraw if the day falls into a hole inside the range; this terminates with
# probability 1 because both bounds are themselves available days.
while random_date.date() not in available_days:
    random_date = generate_random_date(start_date, end_date)
print(f"Random date: {random_date}")

random_date_str = random_date.strftime("%Y-%m-%d")

df_random_day = df_lagged.filter(F.to_date(df_lagged["utc_timestamp"]) == random_date_str)

df_random_day.show()


In [None]:
rows = df_random_day.orderBy("utc_timestamp").collect()

lag_1 = rows[0]['lag_1']
lag_2 = rows[0]['lag_2']
lag_3 = rows[0]['lag_3']

predictions = []
actual_values = []

for i in range(len(rows)):
    current_row = rows[i].asDict()  
    current_row['lag_1'] = lag_1
    current_row['lag_2'] = lag_2
    current_row['lag_3'] = lag_3

    new_df = spark.createDataFrame([current_row])

    pred = model.transform(new_df)

    prediction_value = round(pred.select("prediction").collect()[0]['prediction'], 2)

    predictions.append(prediction_value)
    actual_values.append(current_row['energy_consumption'])

    lag_3 = lag_2
    lag_2 = lag_1
    lag_1 = prediction_value 

pred_df = pd.DataFrame({
    'utc_timestamp': [row['utc_timestamp'] for row in rows],
    'actual': actual_values,
    'prediction': predictions
})

plt.figure(figsize=(12, 6))
plt.plot(pred_df['utc_timestamp'], pred_df['actual'], label='Energy Consumption', color='blue')
plt.plot(pred_df['utc_timestamp'], pred_df['prediction'], label='Predicted', color='orange', linestyle='--')
plt.xlabel('Timestamp')
plt.ylabel('Energy Consumption')
plt.title('Energy Consumption: Actual vs Predicted')
plt.legend()
plt.xticks(rotation=45)
plt.show()
%matplot plt