In [1]:
import os
import requests
import pandas as pd
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, collect_list, udf,expr
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lit
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql.types import DoubleType



In [2]:
spark = SparkSession.builder \
    .appName("Power Demand Tracker") \
    .getOrCreate()

25/04/04 10:30:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
temp_data = pd.read_csv("open-meteo-nwe-york.csv")
os.system("hdfs dfs -put -f ./open-meteo-nwe-york.csv /staging/temp")


0

In [4]:
df_tempurature_spark = spark.read.csv('hdfs:///staging/temp/open-meteo-nwe-york.csv', header=True, inferSchema=True)


                                                                                

In [5]:
df_tempurature_spark.show()

+----------+--------------------+--------------------+------------------+----------------+---------------------+
|  latitude|           longitude|           elevation|utc_offset_seconds|        timezone|timezone_abbreviation|
+----------+--------------------+--------------------+------------------+----------------+---------------------+
| 40.738136|           -74.04254|                51.0|            -14400|America/New_York|                GMT-4|
|      time|temperature_2m_me...|temperature_2m_ma...|              null|            null|                 null|
|2020-01-01|                 2.1|                 4.2|              null|            null|                 null|
|2020-01-02|                 2.3|                 8.8|              null|            null|                 null|
|2020-01-03|                 6.3|                 9.2|              null|            null|                 null|
|2020-01-04|                 7.5|                 9.9|              null|            null|      

In [6]:
## remove the first 2 rows 

df_tempurature_spark = df_tempurature_spark.rdd.zipWithIndex().filter(lambda row: row[1] >= 2).map(lambda row: row[0]).toDF(df_tempurature_spark.schema)

In [7]:
df_tempurature_spark.show()

[Stage 3:>                                                          (0 + 1) / 1]

+----------+---------+---------+------------------+--------+---------------------+
|  latitude|longitude|elevation|utc_offset_seconds|timezone|timezone_abbreviation|
+----------+---------+---------+------------------+--------+---------------------+
|2020-01-01|      2.1|      4.2|              null|    null|                 null|
|2020-01-02|      2.3|      8.8|              null|    null|                 null|
|2020-01-03|      6.3|      9.2|              null|    null|                 null|
|2020-01-04|      7.5|      9.9|              null|    null|                 null|
|2020-01-05|      2.5|      5.0|              null|    null|                 null|
|2020-01-06|      2.6|      7.5|              null|    null|                 null|
|2020-01-07|      2.4|      5.9|              null|    null|                 null|
|2020-01-08|      0.8|      4.9|              null|    null|                 null|
|2020-01-09|     -2.3|      0.4|              null|    null|                 null|
|202

                                                                                

In [8]:
filtered_tempurature_df = df_tempurature_spark.select(
    col("latitude").alias("Date"),
    col("longitude").alias("Min_temp"),
    col("elevation").alias("Max_temp")
)

In [9]:
filtered_tempurature_df.show()

+----------+--------+--------+
|      Date|Min_temp|Max_temp|
+----------+--------+--------+
|2020-01-01|     2.1|     4.2|
|2020-01-02|     2.3|     8.8|
|2020-01-03|     6.3|     9.2|
|2020-01-04|     7.5|     9.9|
|2020-01-05|     2.5|     5.0|
|2020-01-06|     2.6|     7.5|
|2020-01-07|     2.4|     5.9|
|2020-01-08|     0.8|     4.9|
|2020-01-09|    -2.3|     0.4|
|2020-01-10|     4.8|    11.0|
|2020-01-11|    13.6|    18.2|
|2020-01-12|    14.6|    18.6|
|2020-01-13|     4.7|     8.7|
|2020-01-14|     4.0|     6.6|
|2020-01-15|     5.5|     9.2|
|2020-01-16|     5.5|     8.5|
|2020-01-17|    -3.3|     0.6|
|2020-01-18|    -3.3|     2.3|
|2020-01-19|     2.6|     6.9|
|2020-01-20|    -4.1|    -1.0|
+----------+--------+--------+
only showing top 20 rows



In [10]:
filtered_tempurature_df = filtered_tempurature_df.withColumn("Avg_temp", (col("Min_temp") + col("Max_temp")) / 2)

In [11]:
filtered_tempurature_df.show()

+----------+--------+--------+-------------------+
|      Date|Min_temp|Max_temp|           Avg_temp|
+----------+--------+--------+-------------------+
|2020-01-01|     2.1|     4.2| 3.1500000000000004|
|2020-01-02|     2.3|     8.8|  5.550000000000001|
|2020-01-03|     6.3|     9.2|               7.75|
|2020-01-04|     7.5|     9.9|                8.7|
|2020-01-05|     2.5|     5.0|               3.75|
|2020-01-06|     2.6|     7.5|               5.05|
|2020-01-07|     2.4|     5.9|               4.15|
|2020-01-08|     0.8|     4.9|               2.85|
|2020-01-09|    -2.3|     0.4|              -0.95|
|2020-01-10|     4.8|    11.0|                7.9|
|2020-01-11|    13.6|    18.2| 15.899999999999999|
|2020-01-12|    14.6|    18.6|               16.6|
|2020-01-13|     4.7|     8.7|  6.699999999999999|
|2020-01-14|     4.0|     6.6|                5.3|
|2020-01-15|     5.5|     9.2|               7.35|
|2020-01-16|     5.5|     8.5|                7.0|
|2020-01-17|    -3.3|     0.6|-

In [12]:
import json

with open('secrets.json', 'r') as file:
    secret_data = json.load(file)



In [13]:
api_data_start = "2020-01-01"

api_key_arg = "&api_key=" + secret_data.get("eia_api_key")
api_start_arg = "&start=" + api_data_start

In [14]:
data_endpoint = "https://api.eia.gov/v2/electricity/rto/daily-region-data/data/?frequency=daily&data[0]=value&facets[respondent][]=NY&facets[timezone][]=Eastern&facets[type][]=D&sort[0][column]=period&sort[0][direction]=desc&offset=0&length=5000"

In [15]:

response = requests.get(data_endpoint+api_start_arg+api_key_arg)

if response.status_code == 200:
    data = response.json()
    print("request succeeded")

else:
    print("request failed")

request succeeded


In [16]:
raw_df = pd.DataFrame(data.get("response").get("data"))


In [17]:
raw_df.to_csv('raw-data.csv')

In [18]:


os.system("hdfs dfs -put -f ./raw-data.csv /staging/eia")
df_spark = spark.read.csv('hdfs:///staging/eia/raw-data.csv', header=True, inferSchema=True)

df_spark.show()

+---+----------+----------+---------------+----+---------+--------+--------------------+------+-------------+
|_c0|    period|respondent|respondent-name|type|type-name|timezone|timezone-description| value|  value-units|
+---+----------+----------+---------------+----+---------+--------+--------------------+------+-------------+
|  0|2025-04-03|        NY|       New York|   D|   Demand| Eastern|             Eastern|377519|megawatthours|
|  1|2025-04-02|        NY|       New York|   D|   Demand| Eastern|             Eastern|387527|megawatthours|
|  2|2025-04-01|        NY|       New York|   D|   Demand| Eastern|             Eastern|358784|megawatthours|
|  3|2025-03-31|        NY|       New York|   D|   Demand| Eastern|             Eastern|370439|megawatthours|
|  4|2025-03-30|        NY|       New York|   D|   Demand| Eastern|             Eastern|360251|megawatthours|
|  5|2025-03-29|        NY|       New York|   D|   Demand| Eastern|             Eastern|356584|megawatthours|
|  6|2025-

25/04/04 10:30:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , period, respondent, respondent-name, type, type-name, timezone, timezone-description, value, value-units
 Schema: _c0, period, respondent, respondent-name, type, type-name, timezone, timezone-description, value, value-units
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/staging/eia/raw-data.csv


In [19]:


filtered_spark_df = df_spark.select(
    col("period").alias("Date"),
    col("value").alias("MWh")
)

In [20]:
filtered_spark_df.show()

+----------+------+
|      Date|   MWh|
+----------+------+
|2025-04-03|377519|
|2025-04-02|387527|
|2025-04-01|358784|
|2025-03-31|370439|
|2025-03-30|360251|
|2025-03-29|356584|
|2025-03-28|373815|
|2025-03-27|371568|
|2025-03-26|383131|
|2025-03-25|378674|
|2025-03-24|398827|
|2025-03-23|351612|
|2025-03-22|354942|
|2025-03-21|360106|
|2025-03-20|365904|
|2025-03-19|354325|
|2025-03-18|367109|
|2025-03-17|383316|
|2025-03-16|350585|
|2025-03-15|353508|
+----------+------+
only showing top 20 rows



In [21]:
#df = raw_df[['period', 'value']]

In [22]:
features_df = filtered_spark_df.withColumn("Day", F.dayofweek("Date"))
features_df = features_df.withColumn("Month", F.month("Date"))
features_df.show()

+----------+------+---+-----+
|      Date|   MWh|Day|Month|
+----------+------+---+-----+
|2025-04-03|377519|  5|    4|
|2025-04-02|387527|  4|    4|
|2025-04-01|358784|  3|    4|
|2025-03-31|370439|  2|    3|
|2025-03-30|360251|  1|    3|
|2025-03-29|356584|  7|    3|
|2025-03-28|373815|  6|    3|
|2025-03-27|371568|  5|    3|
|2025-03-26|383131|  4|    3|
|2025-03-25|378674|  3|    3|
|2025-03-24|398827|  2|    3|
|2025-03-23|351612|  1|    3|
|2025-03-22|354942|  7|    3|
|2025-03-21|360106|  6|    3|
|2025-03-20|365904|  5|    3|
|2025-03-19|354325|  4|    3|
|2025-03-18|367109|  3|    3|
|2025-03-17|383316|  2|    3|
|2025-03-16|350585|  1|    3|
|2025-03-15|353508|  7|    3|
+----------+------+---+-----+
only showing top 20 rows



In [23]:

#features_df.write.mode("overwrite").option("header", True).csv("hdfs:///unscaled/")

In [24]:
joined_df = filtered_spark_df.join(filtered_tempurature_df, on="date", how="inner")  

In [25]:
joined_df.show()

+----------+------+--------+--------+-------------------+
|      Date|   MWh|Min_temp|Max_temp|           Avg_temp|
+----------+------+--------+--------+-------------------+
|2020-01-01|395316|     2.1|     4.2| 3.1500000000000004|
|2020-01-02|416985|     2.3|     8.8|  5.550000000000001|
|2020-01-03|411481|     6.3|     9.2|               7.75|
|2020-01-04|389879|     7.5|     9.9|                8.7|
|2020-01-05|396540|     2.5|     5.0|               3.75|
|2020-01-06|432354|     2.6|     7.5|               5.05|
|2020-01-07|432730|     2.4|     5.9|               4.15|
|2020-01-08|441775|     0.8|     4.9|               2.85|
|2020-01-09|457121|    -2.3|     0.4|              -0.95|
|2020-01-10|430162|     4.8|    11.0|                7.9|
|2020-01-11|374525|    13.6|    18.2| 15.899999999999999|
|2020-01-12|366821|    14.6|    18.6|               16.6|
|2020-01-13|419733|     4.7|     8.7|  6.699999999999999|
|2020-01-14|418257|     4.0|     6.6|                5.3|
|2020-01-15|40

In [26]:
#df_spark_test = spark.read.option("header", True).option("inferSchema", True).csv("hdfs:///unscaled/")
#df_spark_test.show()


##hdfs dfs -getmerge /unscaled/ ./unscaled-data.csv

In [27]:
## RNN

In [28]:
#RNN_df = features_df.select("Date", "MWh")

In [29]:

def scale_features(df):
    this_df = df
    
    for index, column in enumerate(df.columns):
        if column != "Date":
            assembler = VectorAssembler(inputCols=[column], outputCol="feature_"+column)
            df_vec = assembler.transform(this_df)
            #print(df_vec)
            scaler = MinMaxScaler(inputCol="feature_"+column, outputCol="scaled_"+column)
            scaler_model = scaler.fit(df_vec)
            scaled_df = scaler_model.transform(df_vec)
            extract_element = udf(lambda v: float(v[0]), DoubleType())
            #print("scaled_"+column)
            this_df = scaled_df.withColumn("scaled_"+column, extract_element(col("scaled_"+column)))

    return this_df


In [30]:
scaled_features_df = scale_features(features_df)

                                                                                

In [31]:
filtered_scaled_feature = scaled_features_df.select("Date", "scaled_MWh", "scaled_Day", "scaled_Month")

In [32]:
filtered_scaled_feature.show()

[Stage 22:>                                                         (0 + 1) / 1]

+----------+-------------------+-------------------+-------------------+
|      Date|         scaled_MWh|         scaled_Day|       scaled_Month|
+----------+-------------------+-------------------+-------------------+
|2025-04-03| 0.2215267487609218| 0.6666666666666666| 0.2727272727272727|
|2025-04-02| 0.2534873026416637|                0.5| 0.2727272727272727|
|2025-04-01| 0.1616965152521588| 0.3333333333333333| 0.2727272727272727|
|2025-03-31| 0.1989167646006847|0.16666666666666666|0.18181818181818182|
|2025-03-30|0.16638138061417404|                0.0|0.18181818181818182|
|2025-03-29|0.15467081395942978|                1.0|0.18181818181818182|
|2025-03-28|0.20969802258443618| 0.8333333333333333|0.18181818181818182|
|2025-03-27|0.20252222676408974| 0.6666666666666666|0.18181818181818182|
|2025-03-26|0.23944867405855605|                0.5|0.18181818181818182|
|2025-03-25|0.22521524193960452| 0.3333333333333333|0.18181818181818182|
|2025-03-24|0.28957385928159013|0.16666666666666666

                                                                                

In [33]:
filtered_scaled_feature.write.mode("overwrite").option("header", True).csv("hdfs:///transformed/")

                                                                                

In [34]:
########## hdfs dfs -getmerge /transformed/ ./transformed.csv

In [35]:
df_spark_test = spark.read.option("header", True).option("inferSchema", True).csv("hdfs:///transformed/")
df_spark_test.show()

+----------+-------------------+-------------------+-------------------+
|      Date|         scaled_MWh|         scaled_Day|       scaled_Month|
+----------+-------------------+-------------------+-------------------+
|2025-04-03| 0.2215267487609218| 0.6666666666666666| 0.2727272727272727|
|2025-04-02| 0.2534873026416637|                0.5| 0.2727272727272727|
|2025-04-01| 0.1616965152521588| 0.3333333333333333| 0.2727272727272727|
|2025-03-31| 0.1989167646006847|0.16666666666666666|0.18181818181818182|
|2025-03-30|0.16638138061417404|                0.0|0.18181818181818182|
|2025-03-29|0.15467081395942978|                1.0|0.18181818181818182|
|2025-03-28|0.20969802258443618| 0.8333333333333333|0.18181818181818182|
|2025-03-27|0.20252222676408974| 0.6666666666666666|0.18181818181818182|
|2025-03-26|0.23944867405855605|                0.5|0.18181818181818182|
|2025-03-25|0.22521524193960452| 0.3333333333333333|0.18181818181818182|
|2025-03-24|0.28957385928159013|0.16666666666666666

In [36]:
#RNN_df_scaled.write.mode("overwrite").option("header", True).csv("hdfs:///transformed/")

In [37]:
########## hdfs dfs -getmerge /transformed/ ./transformed.csv

Code to prepare for RNN

In [38]:
RNN_spark_df = filtered_scaled_feature.select("Date", "scaled_MWh") 

In [39]:
## Ensure correct direction

#RNN_spark_df = RNN_spark_df.orderBy(col("Date").desc())

RNN_spark_df.show()

+----------+-------------------+
|      Date|         scaled_MWh|
+----------+-------------------+
|2025-04-03| 0.2215267487609218|
|2025-04-02| 0.2534873026416637|
|2025-04-01| 0.1616965152521588|
|2025-03-31| 0.1989167646006847|
|2025-03-30|0.16638138061417404|
|2025-03-29|0.15467081395942978|
|2025-03-28|0.20969802258443618|
|2025-03-27|0.20252222676408974|
|2025-03-26|0.23944867405855605|
|2025-03-25|0.22521524193960452|
|2025-03-24|0.28957385928159013|
|2025-03-23|0.13879272903786216|
|2025-03-22|0.14942708599458382|
|2025-03-21|0.16591832302897144|
|2025-03-20|0.18443423943590007|
|2025-03-19| 0.1474566961320321|
|2025-03-18|  0.188282407643963|
|2025-03-17| 0.2400394716672628|
|2025-03-16|0.13551300393439272|
|2025-03-15|0.14484760615195955|
+----------+-------------------+
only showing top 20 rows



In [40]:
#RNN_spark_df = filtered_scaled_feature.select( "scaled_MWh") 

In [41]:



temp_df = RNN_spark_df.withColumn("part_id", lit(1)).orderBy("Date")


w = Window.partitionBy("part_id").orderBy("Date")

for i in range(1, 7):
    temp_df = temp_df.withColumn(f"lag_{i}", lag("scaled_MWh", i).over(w))

temp_df = temp_df.dropna()
temp_df = temp_df.withColumnRenamed("scaled_MWh", "scaled_MWh")

final_cols = ["Date","scaled_MWh"] + [f"lag_{i}" for i in range(1, 7)]
RNN_spark_df = temp_df.select(final_cols)
RNN_spark_df.show()


+----------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|      Date|         scaled_MWh|              lag_1|              lag_2|              lag_3|              lag_4|              lag_5|              lag_6|
+----------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|2020-01-07| 0.3978431097031322|0.39664235348219307| 0.2822703234377395|0.26099841602370855| 0.3299844157171325|0.34756144295130553| 0.2783614787185121|
|2020-01-08|0.42672832251801135| 0.3978431097031322|0.39664235348219307| 0.2822703234377395|0.26099841602370855| 0.3299844157171325|0.34756144295130553|
|2020-01-09|  0.475735782535384|0.42672832251801135| 0.3978431097031322|0.39664235348219307| 0.2822703234377395|0.26099841602370855| 0.3299844157171325|
|2020-01-10|0.38964220019416484|  0.475735782535384|0.42672832251801135| 0.3978431

In [42]:
RNN_spark_df.select("Date","scaled_MWh", "lag_1", "lag_2").orderBy(col("Date").desc()).show()

+----------+-------------------+-------------------+-------------------+
|      Date|         scaled_MWh|              lag_1|              lag_2|
+----------+-------------------+-------------------+-------------------+
|2025-04-03| 0.2215267487609218| 0.2534873026416637| 0.1616965152521588|
|2025-04-02| 0.2534873026416637| 0.1616965152521588| 0.1989167646006847|
|2025-04-01| 0.1616965152521588| 0.1989167646006847|0.16638138061417404|
|2025-03-31| 0.1989167646006847|0.16638138061417404|0.15467081395942978|
|2025-03-30|0.16638138061417404|0.15467081395942978|0.20969802258443618|
|2025-03-29|0.15467081395942978|0.20969802258443618|0.20252222676408974|
|2025-03-28|0.20969802258443618|0.20252222676408974|0.23944867405855605|
|2025-03-27|0.20252222676408974|0.23944867405855605|0.22521524193960452|
|2025-03-26|0.23944867405855605|0.22521524193960452|0.28957385928159013|
|2025-03-25|0.22521524193960452|0.28957385928159013|0.13879272903786216|
|2025-03-24|0.28957385928159013|0.13879272903786216

In [43]:
RNN_spark_df = RNN_spark_df.orderBy(col("Date").desc())
RNN_spark_df.show()

[Stage 42:>                                                         (0 + 1) / 1]                                                                                

+----------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|      Date|         scaled_MWh|              lag_1|              lag_2|              lag_3|              lag_4|              lag_5|              lag_6|
+----------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|2025-04-03| 0.2215267487609218| 0.2534873026416637| 0.1616965152521588| 0.1989167646006847|0.16638138061417404|0.15467081395942978|0.20969802258443618|
|2025-04-02| 0.2534873026416637| 0.1616965152521588| 0.1989167646006847|0.16638138061417404|0.15467081395942978|0.20969802258443618|0.20252222676408974|
|2025-04-01| 0.1616965152521588| 0.1989167646006847|0.16638138061417404|0.15467081395942978|0.20969802258443618|0.20252222676408974|0.23944867405855605|
|2025-03-31| 0.1989167646006847|0.16638138061417404|0.15467081395942978|0.20969802

In [44]:
final_RNN_spark_df = RNN_spark_df.drop("Date")

In [45]:
#final_RNN_spark_df.show()

In [46]:
final_RNN_spark_df.write.mode("overwrite").option("header", True).csv("hdfs:///transformed/rnn")

                                                                                

To install keras and tensorflow


sudo apt update
sudo apt install python3-venv python3-pip -y

python3 -m venv keras-env
source keras-env/bin/activate

pip install --upgrade pip
pip install keras tensorflow


source keras-env/bin/activate
source /home/hduser/Documents/GitHub/CCT-SEM2-CA1/SEM2-CA--HADOOP-SPARK/keras-env/bin/activate



pip install jupyter ipykernel

python -m ipykernel install --user --name=keras-env --display-name="Python (keras-env)"

In [47]:
#test_df = df.copy

In [48]:
#training_data = test_df.iloc[:,0].values

In [49]:
#training_data = scaler.fit_transform(training_data.reshape(-1, 1))

In [50]:
#scaler.fit(df)
#scaled_data = scaler.transform(df)

In [51]:
#sns.pairplot(scaled_data)

In [52]:
#training_data.shape

In [53]:
#new_test_df = pd.DataFrame(training_data)

In [54]:
#sns.pairplot(new_test_df)

Save Data to HDFS

In [55]:
## Save df as csv

##df.to_csv('test-data.csv')

In [56]:
## upload to Hadoop

#import os

#os.system("hdfs dfs -put -f ./test-data.csv /testdata/")


In [57]:
#

In [58]:
#from pyspark.sql import SparkSession
#import os

#os.system("hdfs dfs -put -f ./test-data.csv /testdata/")


#park = SparkSession.builder \
   # .appName("Power Demand Tracker") \
 #  .getOrCreate()

#df_spark = spark.read.csv('hdfs:///testdata/test-data.csv', header=True, inferSchema=True)

#df_spark.show()

In [59]:
#df_spark