In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
#######################################
###!@0 START INIT ENVIRONMENT
!ls /content/drive/MyDrive/spark-3.5.2-bin-hadoop3.tgz
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar xf /content/drive/MyDrive/spark-3.5.2-bin-hadoop3.tgz
!pip install -q findspark
!pip install -q pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.2-bin-hadoop3"
###!@0 END INIT ENVIRONMENT

/content/drive/MyDrive/spark-3.5.2-bin-hadoop3.tgz


In [3]:
!mkdir -p /content/data
!rm -rf /content/data/*.csv
!ln -s /content/drive/MyDrive/DES_Project/DataSet/weather_data/*.csv /content/data/

In [4]:
#Load Weather and consumption Data

csv_path1 = '/content/drive/MyDrive/DES_Project/DataSet/weather_data.csv'
csv_path2 = '/content/drive/MyDrive/DES_Project/DataSet/electricity_consumption_delhi.csv'

In [5]:
#######################################
###!@1 START OF PYSPARK INIT
import findspark
findspark.init()
findspark.find()
from pyspark.sql import SparkSession
input_type = 'sample'
spark = SparkSession.builder\
         .master("local")\
         .appName("Colab")\
         .config('spark.ui.port', '4050')\
         .getOrCreate()
# Spark is ready to go within Colab!
###!@1 END OF PYSPARK INIT

In [6]:
#Load data from csv files

weather_data = spark.read.csv(csv_path1, header=True, inferSchema=True)
electricity_data = spark.read.csv(csv_path2, header=True, inferSchema=True)

In [7]:
#Import required Libraries

from pyspark.sql.functions import F, col, count, when, isnan
from pyspark.sql.functions import to_date
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import avg, date_trunc
from pyspark.sql.window import Window
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import DateType
from datetime import timedelta, datetime

In [8]:
# Display the first few rows

electricity_data.show(5)
weather_data.show(5)

+-----+-------------------+----------------+
| City|               Date|Consumption (MW)|
+-----+-------------------+----------------+
|Delhi|2023-01-01 00:00:00|            8326|
|Delhi|2023-01-01 00:01:00|            8218|
|Delhi|2023-01-01 00:02:00|            8317|
|Delhi|2023-01-01 00:03:00|            8338|
|Delhi|2023-01-01 00:04:00|            8211|
+-----+-------------------+----------------+
only showing top 5 rows

+-----+-------------------+---------------+--------------+------------+--------------+-------------------+----------------+--------------+------------+-------------------+-------------------+
| City|               Date|Temperature (C)|Feels Like (C)|Humidity (%)|Pressure (hPa)|Weather Description|Wind Speed (m/s)|Cloudiness (%)|Rain (1h mm)|            Sunrise|             Sunset|
+-----+-------------------+---------------+--------------+------------+--------------+-------------------+----------------+--------------+------------+-------------------+---------------

In [None]:
# Print schema

electricity_data.printSchema()
weather_data.printSchema()

root
 |-- City: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Consumption (MW): integer (nullable = true)

root
 |-- City: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- Feels Like (C): double (nullable = true)
 |-- Humidity (%): integer (nullable = true)
 |-- Pressure (hPa): integer (nullable = true)
 |-- Weather Description: string (nullable = true)
 |-- Wind Speed (m/s): double (nullable = true)
 |-- Cloudiness (%): integer (nullable = true)
 |-- Rain (1h mm): double (nullable = true)
 |-- Sunrise: timestamp (nullable = true)
 |-- Sunset: timestamp (nullable = true)



In [None]:

# Check data statistics

electricity_data.describe().show()
weather_data.describe().show()

+-------+--------+------------------+
|summary|    City|  Consumption (MW)|
+-------+--------+------------------+
|  count|24144480|          24144480|
|   mean|    NULL|3242.0363132277025|
| stddev|    NULL|2824.2361845591695|
|    min|   Delhi|                50|
|    max|   Delhi|              8600|
+-------+--------+------------------+

+-------+--------+------------------+------------------+------------------+----------------+-------------------+------------------+------------------+------------------+
|summary|    City|   Temperature (C)|    Feels Like (C)|      Humidity (%)|  Pressure (hPa)|Weather Description|  Wind Speed (m/s)|    Cloudiness (%)|      Rain (1h mm)|
+-------+--------+------------------+------------------+------------------+----------------+-------------------+------------------+------------------+------------------+
|  count|24111360|          24111360|          24111360|          24111360|        24111360|           24111360|          24111360|          241113

In [9]:
# Check for missing values

def missing_values(df):
    total_rows = df.count()
    return df.select([
       (count(when(isnan(c) | col(c).isNull(), c)) / total_rows).alias(f"{c}_missing") if dict(df.dtypes)[c] in ["double", "float"]
       else (count(when(col(c).isNull(), c)) / total_rows).alias(f"{c}_missing") for c in df.columns])

missing_values(electricity_data).show()
missing_values(weather_data).show()

+------------+------------+------------------------+
|City_missing|Date_missing|Consumption (MW)_missing|
+------------+------------+------------------------+
|         0.0|         0.0|                     0.0|
+------------+------------+------------------------+

+------------+------------+-----------------------+----------------------+--------------------+----------------------+---------------------------+------------------------+----------------------+--------------------+---------------+--------------+
|City_missing|Date_missing|Temperature (C)_missing|Feels Like (C)_missing|Humidity (%)_missing|Pressure (hPa)_missing|Weather Description_missing|Wind Speed (m/s)_missing|Cloudiness (%)_missing|Rain (1h mm)_missing|Sunrise_missing|Sunset_missing|
+------------+------------+-----------------------+----------------------+--------------------+----------------------+---------------------------+------------------------+----------------------+--------------------+---------------+---------

In [12]:
# Assuming the DataFrame name is `weather_df` and the column is `Date`
weather_data = weather_data.withColumn("Date", F.to_date(F.col("Date")))

# Show the resulting DataFrame to verify
weather_data.show()

# Assuming the DataFrame name is `electricity_df` and the column is `Date`
electricity_data = electricity_data.withColumn("Date", F.to_date(F.col("Date")))

# Show the resulting DataFrame to verify
electricity_data.show()

+-----+----------+---------------+--------------+------------+--------------+-------------------+----------------+--------------+------------+-------------------+-------------------+
| City|      Date|Temperature (C)|Feels Like (C)|Humidity (%)|Pressure (hPa)|Weather Description|Wind Speed (m/s)|Cloudiness (%)|Rain (1h mm)|            Sunrise|             Sunset|
+-----+----------+---------------+--------------+------------+--------------+-------------------+----------------+--------------+------------+-------------------+-------------------+
|Delhi|2023-01-01|           8.24|          6.36|          68|          1083|              sunny|            4.87|            83|        5.47|2023-01-01 06:31:00|2023-01-01 18:30:00|
|Delhi|2023-01-01|          12.01|         10.88|          60|          1100|              sunny|            2.66|            72|        1.53|2023-01-01 06:29:00|2023-01-01 18:28:00|
|Delhi|2023-01-01|            5.4|          5.79|          37|          1064|        

In [16]:
# Removing 'Sunrise' and 'Sunset' columns
weather_data = weather_data.drop("Sunrise", "Sunset")

# Aggregating numerical columns and rounding to 2 decimal places
numerical_agg = weather_data.groupBy("Date", "City").agg(
    *[F.round(F.avg(F.col(col)), 2).alias(col) for col in weather_data.columns if col not in ["Date", "Weather Description", "City"]]
)

# Finding the most frequent category for Weather Description
weather_description_count = weather_data.groupBy("Date", "Weather Description").count()

# Using a window function to rank weather descriptions by count for each date
window_spec = Window.partitionBy("Date").orderBy(F.desc("count"))
most_frequent_description = weather_description_count.withColumn(
    "rank", F.row_number().over(window_spec)
).filter(F.col("rank") == 1).select("Date", "Weather Description")

# Joining numerical aggregates with the most frequent Weather Description
final_weather_data = numerical_agg.join(most_frequent_description, on="Date", how="inner")

# Show the resulting DataFrame
final_weather_data.show()

+----------+-----+---------------+--------------+------------+--------------+----------------+--------------+------------+-------------------+
|      Date| City|Temperature (C)|Feels Like (C)|Humidity (%)|Pressure (hPa)|Wind Speed (m/s)|Cloudiness (%)|Rain (1h mm)|Weather Description|
+----------+-----+---------------+--------------+------------+--------------+----------------+--------------+------------+-------------------+
|2023-01-01|Delhi|          13.65|         13.68|       49.93|       1000.54|            2.57|          39.0|        2.52|              clear|
|2023-01-02|Delhi|          13.36|         13.32|       50.06|       1001.62|            2.54|         39.78|        2.52|              sunny|
|2023-01-03|Delhi|          13.53|         13.54|       50.45|         999.8|            2.55|         39.09|        2.54|              clear|
|2023-01-04|Delhi|          13.59|         13.56|       49.74|       1001.24|             2.6|         39.16|        2.47|               cold|

In [18]:
# Aggregating numerical columns for each date and rounding to 2 decimal places
final_electricity_data = electricity_data.groupBy("Date", "City").agg(
    *[
        F.round(F.avg(F.col(col)), 2).alias(col)
        for col in electricity_data.columns
        if col not in ["Date", "City"]
    ]
)

# Show the resulting DataFrame
final_electricity_data.show()

+----------+-----+--------------------+
|      Date| City|Avg Consumption (MW)|
+----------+-----+--------------------+
|2023-05-15|Delhi|             8250.36|
|2023-07-07|Delhi|             8248.67|
|2023-09-30|Delhi|              8251.2|
|2023-10-18|Delhi|             8249.98|
|2024-02-08|Delhi|             8500.01|
|2024-04-27|Delhi|             8499.97|
|2024-06-05|Delhi|             8502.22|
|2023-02-08|Delhi|             8249.83|
|2023-07-05|Delhi|             8251.63|
|2024-02-22|Delhi|             8498.76|
|2023-12-25|Delhi|              8248.1|
|2024-08-01|Delhi|             8500.39|
|2024-09-01|Delhi|             8498.74|
|2024-03-25|Delhi|              8498.0|
|2024-09-23|Delhi|             8502.53|
|2023-02-09|Delhi|             8249.58|
|2024-01-21|Delhi|             8500.54|
|2024-04-11|Delhi|             8502.59|
|2024-07-04|Delhi|             8500.59|
|2024-08-18|Delhi|             8500.83|
+----------+-----+--------------------+
only showing top 20 rows



In [19]:
# Merge the two DataFrames on the 'Date' column
merged_data = final_weather_data.join(final_electricity_data, on="Date", how="inner")

# Dropping one of the City columns, keeping only one City column
merged_data = merged_data.drop(final_electricity_data.City)

# Show the resulting merged DataFrame
merged_data.show()

+----------+-----+---------------+--------------+------------+--------------+----------------+--------------+------------+-------------------+--------------------+
|      Date| City|Temperature (C)|Feels Like (C)|Humidity (%)|Pressure (hPa)|Wind Speed (m/s)|Cloudiness (%)|Rain (1h mm)|Weather Description|Avg Consumption (MW)|
+----------+-----+---------------+--------------+------------+--------------+----------------+--------------+------------+-------------------+--------------------+
|2023-01-01|Delhi|          13.65|         13.68|       49.93|       1000.54|            2.57|          39.0|        2.52|              clear|              8249.3|
|2023-01-02|Delhi|          13.36|         13.32|       50.06|       1001.62|            2.54|         39.78|        2.52|              sunny|             8251.03|
|2023-01-03|Delhi|          13.53|         13.54|       50.45|         999.8|            2.55|         39.09|        2.54|              clear|             8248.97|
|2023-01-04|Delh

In [25]:
# Extracting date-related features from the Date column
merged_data = merged_data.withColumn("Year", F.year(F.col("Date")))
merged_data = merged_data.withColumn("Month", F.month(F.col("Date")))
merged_data = merged_data.withColumn("DayOfWeek", F.dayofweek(F.col("Date")))

# List of predictor columns, including date features
feature_columns = ["Temperature (C)","Humidity (%)","Wind Speed (m/s)","Cloudiness (%)","Rain (1h mm)"]

# Target column
target_column = "Avg Consumption (MW)"

# VectorAssembler to combine all features into a single feature vector
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform the merged DataFrame to create the features vector
assembled_data = vector_assembler.transform(merged_data)

# Select the features and target column for the final dataset
final_data = assembled_data.select("features", F.col(target_column).alias("label"))

# Split into train and test sets
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

# Display the training and testing data counts
print(f"Training Data Count: {train_data.count()}")
print(f"Testing Data Count: {test_data.count()}")


Training Data Count: 564
Testing Data Count: 109


In [26]:
# Train the model using GBTRegressor
gbt = GBTRegressor(featuresCol="features", labelCol="label", maxIter=50, maxDepth=5, seed=42)
model = gbt.fit(train_data)

In [27]:
# Evaluate the Model on Test Data
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

Root Mean Squared Error (RMSE) on test data: 143.36469905124954


In [29]:
# Generate the Next 30 Days
last_date = merged_data.agg(F.max("Date").alias("last_date")).collect()[0]["last_date"]
start_date = last_date
future_dates_list = [(start_date + timedelta(days=i)) for i in range(1, 31)]
future_dates_df = spark.createDataFrame(future_dates_list, DateType()).toDF("Date")

In [31]:
# Add date-derived features
future_dates_df = future_dates_df.withColumn("Year", F.year(F.col("Date")))
future_dates_df = future_dates_df.withColumn("Month", F.month(F.col("Date")))
future_dates_df = future_dates_df.withColumn("DayOfWeek", F.dayofweek(F.col("Date")))

In [32]:
# Add placeholder values for weather features (adjust values as needed)
future_dates_df = future_dates_df.withColumn("Temperature (C)", F.lit(25.0))  # Example value
future_dates_df = future_dates_df.withColumn("Humidity (%)", F.lit(60.0))
future_dates_df = future_dates_df.withColumn("Wind Speed (m/s)", F.lit(3.0))
future_dates_df = future_dates_df.withColumn("Cloudiness (%)", F.lit(50.0))
future_dates_df = future_dates_df.withColumn("Rain (1h mm)", F.lit(0.0))

In [33]:
# Prepare Future Data for Prediction
future_features = vector_assembler.transform(future_dates_df).select("Date", "features")

In [34]:
# Forecast on Future Dates
future_predictions = model.transform(future_features)
forecast = future_predictions.select("Date", "prediction").withColumnRenamed("prediction", "Forecasted Avg Consumption (MW)")

In [35]:
# Show the forecast for the next 30 days
forecast.show()

+----------+-------------------------------+
|      Date|Forecasted Avg Consumption (MW)|
+----------+-------------------------------+
|2024-11-04|               8301.12245934646|
|2024-11-05|               8301.12245934646|
|2024-11-06|               8301.12245934646|
|2024-11-07|               8301.12245934646|
|2024-11-08|               8301.12245934646|
|2024-11-09|               8301.12245934646|
|2024-11-10|               8301.12245934646|
|2024-11-11|               8301.12245934646|
|2024-11-12|               8301.12245934646|
|2024-11-13|               8301.12245934646|
|2024-11-14|               8301.12245934646|
|2024-11-15|               8301.12245934646|
|2024-11-16|               8301.12245934646|
|2024-11-17|               8301.12245934646|
|2024-11-18|               8301.12245934646|
|2024-11-19|               8301.12245934646|
|2024-11-20|               8301.12245934646|
|2024-11-21|               8301.12245934646|
|2024-11-22|               8301.12245934646|
|2024-11-2