# Project IND-320
**Name** : _Izza Qamar_

## Links
 - **GitHub Repository** : https://github.com/izzaqamar/Izza_Ind_320.git  

 - **Streamlit App** : https://izza-ind320.streamlit.app/

# Deliverable 4

## Project Overview and AI Usage 

### AI tools
- For the Jupyter task, I used ChatGPT to understand how to extend my previous api_call function to multiple years for production and consumption.
- For Streamlit, I used both ChatGPT and Copilot. Initially, I used them to understand folium, which I had never used before, and to add outlines and a choropleth to the map. Next, I used them to understand how to plot the wind rose and the snow drift. For the forecasting page, I used them to read about SARIMAX and its parameters, to help build the page logic, and to understand how to add the confidence interval to the forecast plot. Finally, I used them to build the homepage and to set up the navigation sidebar so that the pages are organized into groups and subgroups.

### Project Overview

#### Jupyter Notebook
- I created a new Jupyter notebook for this deliverable. I began by fetching production and consumption data from the Elhub API, following the same pattern for both. I first defined the required parameters. The API returns only one month of data per call, so to retrieve multiple years I loop over a list of years and use a month_range function to generate the start and end of every month. The October request had to be split into two parts at the daylight-saving-time change on the last Sunday of October, because requests spanning it returned an error. A further loop reads productionPerGroupMbaHour/consumptionPerGroupMbaHour for every price area, and extend() appends the records from each request to production_data (or consumption_data), producing a list of dictionaries.
- I converted this list of dictionaries into a pandas DataFrame and converted the startTime, endTime and lastUpdatedTime columns to timezone-aware UTC datetimes.
- I set up Cassandra, connected to it, and created the keyspace and table using double-quoted (case-sensitive) column names.
- I set up a Spark-Cassandra connection, converted the pandas DataFrame to a Spark DataFrame, and wrote it to Cassandra (the new production years were appended to the existing production table, and all of the consumption data was written to a newly created table). I then extracted the required columns from Cassandra using Spark. I repeated these steps for both the production and the consumption datasets.
- Finally, I connected to MongoDB, converted the Spark DataFrames to pandas DataFrames and then to dictionaries, and inserted them into the collections ind320_production_table_d4 and ind320_consumption_table.
- **Bonus Point**: I calculated the monthly snow drift and plotted it both on its own and together with the yearly snow drift.
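
Both the monthly and the yearly snow-drift totals depend on assigning each hourly record to a snow season; the Snow Drift page uses seasons running from 1 July to 30 June. A minimal pandas sketch of that assignment, assuming an hourly Series of per-hour drift contributions (the names hourly_drift, season_start_year and aggregate_drift are hypothetical, and the drift formula from the provided file is not reproduced here):

```python
import pandas as pd


def season_start_year(index: pd.DatetimeIndex) -> pd.Index:
    """Label each timestamp with the year in which its July-June snow season starts."""
    # July-December belong to the season starting this year,
    # January-June to the season that started the previous year.
    return pd.Index(index.year - (index.month < 7).astype(int), name="season")


def aggregate_drift(hourly_drift: pd.Series) -> tuple[pd.Series, pd.Series]:
    """Return (monthly totals, seasonal totals) for an hourly drift series."""
    monthly = hourly_drift.resample("MS").sum()  # calendar months, labelled by month start
    seasons = season_start_year(hourly_drift.index)
    yearly = hourly_drift.groupby(seasons).sum()  # one total per July-June season
    return monthly, yearly
```

Grouping by the season label rather than by calendar year keeps a whole winter in one bar of the yearly plot.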

#### Streamlit app
- Structural changes: data is now acquired either from MongoDB or from an API; no CSV files are used.

1. Sections: I edited the homepage to use a custom sidebar navigation with three sections: 'Energy', based on energy data fetched from MongoDB; 'Weather', based on weather data fetched from the Open-Meteo API; and 'Cross Domain', which combines both datasets.
2. Subgroups (Energy): the Visualization subgroup contains the insights pages (pie plot and line plot) and the Maps page, the Analysis subgroup contains the STL/Spectrogram page, and the Forecasting subgroup contains SARIMAX dynamic forecasting. The year range for the Energy section is 2021-2024.
3. Subgroups (Weather): there are two subgroups, Visualization (containing Data Insights) and Analysis (containing the Outliers/Anomalies page). The year range for the Weather section is 2000-2024.
4. Subgroups (Cross Domain): there is only one subgroup, with a page containing the sliding-window correlation. The year range for the Cross Domain section is 2021-2024.
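
The navigation described above is two levels deep (section, then subgroup), whereas Streamlit's st.navigation takes a dict that maps section labels to lists of st.Page objects, i.e. a single level of section headers. One way to reconcile the two, sketched here under the assumption that pages are referenced by file path, is to flatten each section/subgroup pair into one label. The NAV dict, its page file names, the "Correlation" subgroup name and flatten_sections are all hypothetical.

```python
# Hypothetical page files; the real app's file names and subgroup labels may differ.
NAV = {
    "Energy": {
        "Visualization": ["pages/energy_insights.py", "pages/maps.py"],
        "Analysis": ["pages/stl_spectrogram.py"],
        "Forecasting": ["pages/sarimax_forecast.py"],
    },
    "Weather": {
        "Visualization": ["pages/weather_insights.py"],
        "Analysis": ["pages/outliers_anomalies.py"],
    },
    "Cross Domain": {
        "Correlation": ["pages/sliding_window_correlation.py"],
    },
}


def flatten_sections(nav: dict[str, dict[str, list[str]]]) -> dict[str, list[str]]:
    """Collapse section -> subgroup -> pages into one level of labelled sections."""
    return {
        f"{section} / {subgroup}": pages
        for section, subgroups in nav.items()
        for subgroup, pages in subgroups.items()
    }


# In the Streamlit app (not executed here):
# pages = {label: [st.Page(p) for p in files] for label, files in flatten_sections(NAV).items()}
# st.navigation(pages).run()
```

Keeping the structure as plain data means the sidebar order can be changed without touching the pages themselves.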

- Deliverable 4:
For this deliverable I created a utils.py file containing my connection functions for MongoDB and the API, which keeps the page code cleaner and easier to use. I added four new pages.
1. Maps and Energy Choropleth: the page has two columns. The first column shows a folium map built from the provided GeoJSON; the borders of the 5 price areas are outlined, and clicking an area highlights its border in a different color. The click also saves the latitude and longitude in the session state so that other pages can use them. The second column lets the user select a dataset and the respective group and view a choropleth of quantityKwh for a user-selected time period (2021-2024). Only the data for the selected period is fetched.
2. Snow Drift: the snow-drift calculation code is copied from the provided file. The page fetches weather data from the API for the location selected on the Maps page and redirects to the Maps page if no location has been selected. Data is fetched from 1 July of each year to 30 June of the following year, and the user selects the year range with a slider (2000-2024). The user can choose whether to see monthly or yearly snow-drift bar plots. A wind rose plot is also included.
3. Sliding Window Correlation: the page uses the location selected on the map and redirects to the Maps page if no location has been selected. It then fetches weather data from the API and energy data from MongoDB for the user-selected year (2021-2024). Interactive UI elements let the user explore the effect of weather properties on energy production or consumption (quantityKwh).
4. Forecasting: the user selects the dataset (production or consumption) and then the target type: an individual forecast for a single price area and group, or an aggregated forecast over multiple selected price areas and groups. The user then selects which of the remaining groups to use as exogenous variables, chooses the training and forecast periods, and sets all relevant SARIMAX parameters. After forecasting, a plot shows the training data and the forecast together with its confidence interval.

**Sliding Window Correlation**

**(Consumption vs Temperature)** (window unit: hours, window size: ~70 hours, lag: ±4 hours):
- Lag +4 h: the correlation is stable and narrow in summer (June–Sept) but spread out in winter, showing stronger variability during cold events.
- Lag -4 h: the pattern reverses, with spread-out correlations in summer while winter values cluster near zero, indicating weaker, steadier relationships.
- The lag direction therefore changes the seasonal dynamics: a positive lag highlights a stable summer and a volatile winter, while a negative lag shows a variable summer and flatter winter correlations.
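
A lagged sliding-window correlation like the one above can be sketched with pandas' rolling correlation against a shifted weather series. The helper name is hypothetical, and so is the sign convention: here a positive lag pairs the current energy value with the weather value from lag hours earlier, which may be the opposite of the app's convention.

```python
import pandas as pd


def lagged_rolling_corr(energy: pd.Series, weather: pd.Series,
                        window: int, lag: int) -> pd.Series:
    """Rolling Pearson correlation between energy(t) and weather(t - lag).

    Assumes both series share the same hourly index; windows with fewer
    than `window` paired observations are left as NaN.
    """
    shifted = weather.shift(lag)  # lag > 0 moves past weather values forward in time
    return energy.rolling(window=window, min_periods=window).corr(shifted)
```

For example, lagged_rolling_corr(consumption, temperature, window=70, lag=4) would correspond to the "+4 h, ~70 hours" setting under this convention.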

**(Consumption vs Precipitation)**:
When analyzing consumption against precipitation, the sliding-window correlation graph shows gaps.  
This occurs because precipitation data often has long stretches of zeros (no rain).  
During these periods the variance of precipitation within the window is zero, so the correlation is undefined.  
The gaps highlight that a meaningful correlation only emerges during rain events.  
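
The zero-variance effect can be made explicit rather than left to floating-point arithmetic: Pearson correlation divides by the product of the two standard deviations, so a window with no rain gives 0 / 0. A minimal sketch, assuming aligned hourly Series (the helper name is hypothetical), that masks dry windows and also clears any infinite values that rounding residue can produce next to them:

```python
import numpy as np
import pandas as pd


def rolling_corr_with_dry_mask(energy: pd.Series, precip: pd.Series,
                               window: int) -> pd.Series:
    """Rolling correlation that is explicitly NaN in windows with no rain."""
    corr = energy.rolling(window, min_periods=window).corr(precip)
    # A window where precipitation never exceeds zero has zero variance,
    # so the Pearson correlation is undefined there.
    dry = precip.rolling(window, min_periods=window).max() <= 0
    return corr.mask(dry).replace([np.inf, -np.inf], np.nan)
```

The resulting NaNs plot as the same gaps described above, but they are now guaranteed rather than incidental.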

**(Consumption vs Wind Speed/Wind Gusts/Wind Direction)**:
- Winter months (Oct–Mar):  
  The correlation curve is less stable, with wider fluctuations and variability. This likely reflects the stronger influence of irregular wind patterns and extreme weather events on consumption.  

- Summer months (Jun–Sept):  
  The correlation curve is relatively stable, indicating a steadier relationship between wind conditions and consumption during calmer seasonal weather.  
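
Wind direction is circular (0° and 360° are the same heading), so a linear correlation against raw degrees can distort the relationship: a shift from 359° to 1° looks like a huge jump. A common workaround, offered here as a suggestion rather than what the app currently does, is to correlate consumption with the sine and cosine components of the direction instead. The helper name and column names are hypothetical.

```python
import numpy as np
import pandas as pd


def direction_components(direction_deg: pd.Series) -> pd.DataFrame:
    """Split wind direction (degrees clockwise from north) into unit-vector components."""
    radians = np.deg2rad(direction_deg)
    # dir_sin is the east-west component, dir_cos the north-south component
    # of the direction the wind is reported as coming from.
    return pd.DataFrame({"dir_sin": np.sin(radians), "dir_cos": np.cos(radians)},
                        index=direction_deg.index)
```

Each component is continuous across north, so it can be used with the same sliding-window correlation as the other weather variables.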

Similar trends can be explored for production (quantityKwh) versus the weather properties.

## Jupyter Work
- The following sections contain the Jupyter tasks.

#### Data from API for Production

In [None]:
from datetime import datetime
from zoneinfo import ZoneInfo
import requests

# Endpoint and dataset used for requests.get against the Elhub API
url = "https://api.elhub.no/energy-data/v0/price-areas"
dataset = "PRODUCTION_PER_GROUP_MBA_HOUR"
# List of years to collect
years = [2022, 2023, 2024]
tz_norway = ZoneInfo("Europe/Oslo")

#Creating a function to get the dates for each month call for api
def month_range(year):
    for month in range(1, 13):
        start_time = datetime(year, month, 1, tzinfo=tz_norway)
        if month == 12:
            end_time = datetime(year + 1, 1, 1, tzinfo=tz_norway)
        else:
            end_time = datetime(year, month + 1, 1, tzinfo=tz_norway)
        yield start_time, end_time

# last Sunday of October 
def last_sunday_of_october(year):
    for day in range(31, 24, -1):  # 31 → 25
        d = datetime(year, 10, day, tzinfo=tz_norway)
        if d.weekday() == 6:  # Sunday
            return d

# Empty list that will hold the records returned by the API
production_data = []

#Loop for list of years
for year in years:
    print(f"\n=== Retrieving data for Production {year} ===")
    year_count = 0
    #Defining a loop to pass dates for each month call to api
    for start_time, end_time in month_range(year):
        # Split October due to DST change
        if start_time.month == 10:
            dst_shift = last_sunday_of_october(year).replace(hour=1)
            parts = [(start_time, dst_shift), (dst_shift, end_time)]
        else:
            parts = [(start_time, end_time)]

        # Requesting data for each part
        for s_time, e_time in parts:
            params = {
                "dataset": dataset,
                "startDate": s_time.isoformat(timespec="seconds"),
                "endDate": e_time.isoformat(timespec="seconds")
            }

            # A timeout prevents the loop from hanging indefinitely on a stalled request
            response = requests.get(url, params=params, timeout=60)
            if response.status_code == 200:
                data_per_request = response.json()
                # Looping over all areas in 'data' and extend production_data with their records 
                for area_data in data_per_request.get("data", []):
                    production_list = area_data.get("attributes", {}).get("productionPerGroupMbaHour", [])
                    production_data.extend(production_list)
                    year_count += len(production_list)
                print(f" ✅ Added data for {s_time} → {e_time}")
            else:
                print(f" Error {response.status_code} for {s_time} → {e_time}")

    print(f" Total records collected for {year}: {year_count}")

print(f"\n Total records collected across all years: {len(production_data)}")



=== Retrieving data for Production 2022 ===
 ✅ Added data for 2022-01-01 00:00:00+01:00 → 2022-02-01 00:00:00+01:00
 ✅ Added data for 2022-02-01 00:00:00+01:00 → 2022-03-01 00:00:00+01:00
 ✅ Added data for 2022-03-01 00:00:00+01:00 → 2022-04-01 00:00:00+02:00
 ✅ Added data for 2022-04-01 00:00:00+02:00 → 2022-05-01 00:00:00+02:00
 ✅ Added data for 2022-05-01 00:00:00+02:00 → 2022-06-01 00:00:00+02:00
 ✅ Added data for 2022-06-01 00:00:00+02:00 → 2022-07-01 00:00:00+02:00
 ✅ Added data for 2022-07-01 00:00:00+02:00 → 2022-08-01 00:00:00+02:00
 ✅ Added data for 2022-08-01 00:00:00+02:00 → 2022-09-01 00:00:00+02:00
 ✅ Added data for 2022-09-01 00:00:00+02:00 → 2022-10-01 00:00:00+02:00
 ✅ Added data for 2022-10-01 00:00:00+02:00 → 2022-10-30 01:00:00+02:00
 ✅ Added data for 2022-10-30 01:00:00+02:00 → 2022-11-01 00:00:00+01:00
 ✅ Added data for 2022-11-01 00:00:00+01:00 → 2022-12-01 00:00:00+01:00
 ✅ Added data for 2022-12-01 00:00:00+01:00 → 2023-01-01 00:00:00+01:00
 Total records coll
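
The production and consumption cells repeat the same month loop and October split. A sketch that factors the window logic into one generator, so both datasets can share it, might look like this; the names request_windows, last_sunday and params_for are hypothetical, and the split point (01:00 local time on the last Sunday of October) mirrors the notebook's DST workaround.

```python
import calendar
from datetime import datetime
from zoneinfo import ZoneInfo

TZ_NORWAY = ZoneInfo("Europe/Oslo")


def last_sunday(year: int, month: int) -> int:
    """Day of the month of the last Sunday in that month."""
    last_day = calendar.monthrange(year, month)[1]
    weekday = calendar.weekday(year, month, last_day)  # Monday=0 ... Sunday=6
    return last_day - (weekday - 6) % 7


def request_windows(year: int):
    """Yield (start, end) pairs for one request per month, splitting October at DST."""
    for month in range(1, 13):
        start = datetime(year, month, 1, tzinfo=TZ_NORWAY)
        end = datetime(year + month // 12, month % 12 + 1, 1, tzinfo=TZ_NORWAY)
        if month == 10:
            split = datetime(year, 10, last_sunday(year, 10), 1, tzinfo=TZ_NORWAY)
            yield start, split
            yield split, end
        else:
            yield start, end


def params_for(dataset: str, start: datetime, end: datetime) -> dict:
    """Query parameters in the same format as the notebook's params dict."""
    return {"dataset": dataset,
            "startDate": start.isoformat(timespec="seconds"),
            "endDate": end.isoformat(timespec="seconds")}
```

Each window would then be requested with requests.get(url, params=params_for(dataset, start, end), timeout=60), with only the dataset name and the attributes key differing between production and consumption.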

In [None]:
# Creating a pandas dataframe
import pandas as pd
production_df=pd.DataFrame(production_data)

# Converting startTime, endTime and lastUpdatedTime to datetime and setting the timezone to UTC 
production_df['startTime'] = pd.to_datetime(production_df['startTime'], utc=True)
production_df['endTime'] = pd.to_datetime(production_df['endTime'], utc=True)
production_df['lastUpdatedTime'] = pd.to_datetime(production_df['lastUpdatedTime'], utc=True)
#Displaying few rows to verify 
print(production_df.head())
print(production_df.tail())
num_rows = len(production_df)
print('num rows',num_rows)  

                    endTime           lastUpdatedTime priceArea  \
0 2022-01-01 00:00:00+00:00 2025-02-01 17:02:57+00:00       NO1   
1 2022-01-01 01:00:00+00:00 2025-02-01 17:02:57+00:00       NO1   
2 2022-01-01 02:00:00+00:00 2025-02-01 17:02:57+00:00       NO1   
3 2022-01-01 03:00:00+00:00 2025-02-01 17:02:57+00:00       NO1   
4 2022-01-01 04:00:00+00:00 2025-02-01 17:02:57+00:00       NO1   

  productionGroup  quantityKwh                 startTime  
0           hydro    1291422.4 2021-12-31 23:00:00+00:00  
1           hydro    1246209.4 2022-01-01 00:00:00+00:00  
2           hydro    1271757.0 2022-01-01 01:00:00+00:00  
3           hydro    1204251.8 2022-01-01 02:00:00+00:00  
4           hydro    1202086.9 2022-01-01 03:00:00+00:00  
                         endTime           lastUpdatedTime priceArea  \
657595 2024-12-31 19:00:00+00:00 2025-03-30 16:39:27+00:00       NO5   
657596 2024-12-31 20:00:00+00:00 2025-03-30 16:39:27+00:00       NO5   
657597 2024-12-31 21:00:00+

In [3]:
#Cassandra setup 
from cassandra.cluster import Cluster
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()

In [4]:
#Creating a Spark Cassandra Session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Ind_320App').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.cassandra.connection.port', '9042').getOrCreate()
print("Spark session created successfully:", spark.version)

Spark session created successfully: 3.5.1


In [None]:
#Converting to spark dataframe from pandas dataframe
spark_production_df = spark.createDataFrame(production_df)

#Displaying info about columns from Spark DataFrame to verify
spark_production_df.printSchema()
spark_production_df.show(5)
num_rows = spark_production_df.count()
print(f"Number of rows: {num_rows}")

root
 |-- endTime: timestamp (nullable = true)
 |-- lastUpdatedTime: timestamp (nullable = true)
 |-- priceArea: string (nullable = true)
 |-- productionGroup: string (nullable = true)
 |-- quantityKwh: double (nullable = true)
 |-- startTime: timestamp (nullable = true)

+-------------------+-------------------+---------+---------------+-----------+-------------------+
|            endTime|    lastUpdatedTime|priceArea|productionGroup|quantityKwh|          startTime|
+-------------------+-------------------+---------+---------------+-----------+-------------------+
|2022-01-01 01:00:00|2025-02-01 18:02:57|      NO1|          hydro|  1291422.4|2022-01-01 00:00:00|
|2022-01-01 02:00:00|2025-02-01 18:02:57|      NO1|          hydro|  1246209.4|2022-01-01 01:00:00|
|2022-01-01 03:00:00|2025-02-01 18:02:57|      NO1|          hydro|  1271757.0|2022-01-01 02:00:00|
|2022-01-01 04:00:00|2025-02-01 18:02:57|      NO1|          hydro|  1204251.8|2022-01-01 03:00:00|
|2022-01-01 05:00:00|2025-0
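
The pandas output shows lastUpdatedTime as 17:02:57+00:00, while Spark shows 18:02:57 for the same row. Spark renders timestamps in its session time zone (spark.sql.session.timeZone, which defaults to the JVM's local zone), here apparently Europe/Oslo, so the stored instant is unchanged and only the display differs. A quick standard-library check of that one-hour winter offset:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

utc_value = datetime(2025, 2, 1, 17, 2, 57, tzinfo=timezone.utc)
# The same instant rendered in Norwegian winter time (CET, UTC+01:00).
oslo_value = utc_value.astimezone(ZoneInfo("Europe/Oslo"))
print(oslo_value.isoformat())  # 2025-02-01T18:02:57+01:00
```

Setting spark.sql.session.timeZone to "UTC" on the session builder should make show() print UTC wall-clock times. The same setting may matter for the later toPandas() step, which returns naive datetimes in the session time zone; PyMongo then stores naive datetimes as if they were UTC.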

In [None]:
#Appending spark dataframe to cassandra, using .write to give data to cassandra
spark_production_df.write.format("org.apache.spark.sql.cassandra")\
.options(table="production_table", keyspace="ind_320_d2").mode("append").save()

In [7]:
#Data extraction from Cassandra into a Spark Dataframe, using .load() to load data from Cassandra as a Spark DataFrame.
extracted_df_prod=spark.read.format("org.apache.spark.sql.cassandra").options(table="production_table", keyspace="ind_320_d2").load().select("priceArea", "productionGroup", "startTime","quantityKwh")
extracted_df_prod.show(5) 

+---------+---------------+-------------------+-----------+
|priceArea|productionGroup|          startTime|quantityKwh|
+---------+---------------+-------------------+-----------+
|      NO4|          other|2021-01-01 00:00:00|      0.161|
|      NO4|          other|2021-01-01 01:00:00|      0.161|
|      NO4|          other|2021-01-01 02:00:00|      0.161|
|      NO4|          other|2021-01-01 03:00:00|      0.161|
|      NO4|          other|2021-01-01 04:00:00|      0.161|
+---------+---------------+-------------------+-----------+
only showing top 5 rows



#### Data from API for Consumption

In [None]:
from datetime import datetime
from zoneinfo import ZoneInfo
import requests

# Endpoint and dataset used for requests.get against the Elhub API
url = "https://api.elhub.no/energy-data/v0/price-areas"
dataset = "CONSUMPTION_PER_GROUP_MBA_HOUR"
# List of years to collect
years = [2021, 2022, 2023, 2024]
tz_norway = ZoneInfo("Europe/Oslo")

#Creating a function to get the dates for each month call for api
def month_range(year):
    for month in range(1, 13):
        start_time = datetime(year, month, 1, tzinfo=tz_norway)
        if month == 12:
            end_time = datetime(year + 1, 1, 1, tzinfo=tz_norway)
        else:
            end_time = datetime(year, month + 1, 1, tzinfo=tz_norway)
        yield start_time, end_time

# last Sunday of October 
def last_sunday_of_october(year):
    for day in range(31, 24, -1):  # 31 → 25
        d = datetime(year, 10, day, tzinfo=tz_norway)
        if d.weekday() == 6:  # Sunday
            return d

# Empty list that will hold the records returned by the API
consumption_data = []

#Loop for list of years
for year in years:
    print(f"\n=== Retrieving data for Consumption {year} ===")
    year_count = 0
    #Defining a loop to pass dates for each month call to api
    for start_time, end_time in month_range(year):
        # Split October due to DST change
        if start_time.month == 10:
            dst_shift = last_sunday_of_october(year).replace(hour=1)
            parts = [(start_time, dst_shift), (dst_shift, end_time)]
        else:
            parts = [(start_time, end_time)]

        # Requesting data for each part
        for s_time, e_time in parts:
            params = {
                "dataset": dataset,
                "startDate": s_time.isoformat(timespec="seconds"),
                "endDate": e_time.isoformat(timespec="seconds")
            }

            # A timeout prevents the loop from hanging indefinitely on a stalled request
            response = requests.get(url, params=params, timeout=60)
            if response.status_code == 200:
                data_per_request = response.json()
                # Looping over all areas in 'data' and extend consumption_data with their records 
                for area_data in data_per_request.get("data", []):
                    consumption_list = area_data.get("attributes", {}).get("consumptionPerGroupMbaHour", [])
                    consumption_data.extend(consumption_list)
                    year_count += len(consumption_list)
                print(f" ✅ Added data for {s_time} → {e_time}")
            else:
                print(f" Error {response.status_code} for {s_time} → {e_time}")

    print(f" Total records collected for {year}: {year_count}")

print(f"\n Total records collected across all years: {len(consumption_data)}")



=== Retrieving data for Consumption 2021 ===
 ✅ Added data for 2021-01-01 00:00:00+01:00 → 2021-02-01 00:00:00+01:00
 ✅ Added data for 2021-02-01 00:00:00+01:00 → 2021-03-01 00:00:00+01:00
 ✅ Added data for 2021-03-01 00:00:00+01:00 → 2021-04-01 00:00:00+02:00
 ✅ Added data for 2021-04-01 00:00:00+02:00 → 2021-05-01 00:00:00+02:00
 ✅ Added data for 2021-05-01 00:00:00+02:00 → 2021-06-01 00:00:00+02:00
 ✅ Added data for 2021-06-01 00:00:00+02:00 → 2021-07-01 00:00:00+02:00
 ✅ Added data for 2021-07-01 00:00:00+02:00 → 2021-08-01 00:00:00+02:00
 ✅ Added data for 2021-08-01 00:00:00+02:00 → 2021-09-01 00:00:00+02:00
 ✅ Added data for 2021-09-01 00:00:00+02:00 → 2021-10-01 00:00:00+02:00
 ✅ Added data for 2021-10-01 00:00:00+02:00 → 2021-10-31 01:00:00+02:00
 ✅ Added data for 2021-10-31 01:00:00+02:00 → 2021-11-01 00:00:00+01:00
 ✅ Added data for 2021-11-01 00:00:00+01:00 → 2021-12-01 00:00:00+01:00
 ✅ Added data for 2021-12-01 00:00:00+01:00 → 2022-01-01 00:00:00+01:00
 Total records col

In [9]:
# Creating a pandas dataframe
import pandas as pd
consumption_df=pd.DataFrame(consumption_data)

# Converting startTime, endTime and lastUpdatedTime to datetime and setting the timezone to UTC 
consumption_df['startTime'] = pd.to_datetime(consumption_df['startTime'], utc=True)
consumption_df['endTime'] = pd.to_datetime(consumption_df['endTime'], utc=True)
consumption_df['lastUpdatedTime'] = pd.to_datetime(consumption_df['lastUpdatedTime'], utc=True)
#Displaying few rows to verify 
print(consumption_df.head())
print(consumption_df.tail())

  consumptionGroup                   endTime           lastUpdatedTime  \
0            cabin 2021-01-01 00:00:00+00:00 2024-12-20 09:35:40+00:00   
1            cabin 2021-01-01 01:00:00+00:00 2024-12-20 09:35:40+00:00   
2            cabin 2021-01-01 02:00:00+00:00 2024-12-20 09:35:40+00:00   
3            cabin 2021-01-01 03:00:00+00:00 2024-12-20 09:35:40+00:00   
4            cabin 2021-01-01 04:00:00+00:00 2024-12-20 09:35:40+00:00   

   meteringPointCount priceArea  quantityKwh                 startTime  
0              100607       NO1    177071.56 2020-12-31 23:00:00+00:00  
1              100607       NO1    171335.12 2021-01-01 00:00:00+00:00  
2              100607       NO1    164912.02 2021-01-01 01:00:00+00:00  
3              100607       NO1    160265.77 2021-01-01 02:00:00+00:00  
4              100607       NO1    159828.69 2021-01-01 03:00:00+00:00  
       consumptionGroup                   endTime           lastUpdatedTime  \
876595         tertiary 2024-12-31 19:

In [10]:
#Cassandra setup 
import time
from cassandra.cluster import Cluster
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()


In [11]:
#Setting up a cassandra keyspace  

# Use the existing keyspace
session.set_keyspace("ind_320_d2")

# Drop any existing table so that it is recreated from scratch
session.execute("DROP TABLE IF EXISTS ind_320_d2.consumption_table;")

# Double-quote the column names so CQL keeps their case (unquoted identifiers are lower-cased)
session.execute("CREATE TABLE IF NOT EXISTS consumption_table ("
                "\"startTime\" timestamp, "
                "\"endTime\" timestamp, "
                "\"lastUpdatedTime\" timestamp, "
                "\"priceArea\" text, "
                "\"consumptionGroup\" text, "
                "\"meteringPointCount\" int, "
                "\"quantityKwh\" double, "  
                "PRIMARY KEY ((\"priceArea\", \"consumptionGroup\"), \"startTime\")) "
                "WITH CLUSTERING ORDER BY (\"startTime\" ASC);")

<cassandra.cluster.ResultSet at 0x1d6c348d310>
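
CQL folds unquoted identifiers to lower case, which is why the statement above double-quotes every column name; without the quotes, startTime would become starttime and might no longer match the Spark column names. A small sketch that builds the same kind of statement from a column mapping, so the quoting is applied consistently; quote and build_create_table are hypothetical helpers, and embedded double quotes are escaped by doubling them, as CQL requires.

```python
def quote(name: str) -> str:
    """Double-quote a CQL identifier, escaping embedded quotes by doubling them."""
    return '"' + name.replace('"', '""') + '"'


def build_create_table(table: str, columns: dict[str, str],
                       partition_keys: list[str], clustering_key: str) -> str:
    """Return a CREATE TABLE statement with case-preserving column names."""
    cols = ", ".join(f"{quote(name)} {cql_type}" for name, cql_type in columns.items())
    pk = ", ".join(quote(key) for key in partition_keys)
    return (f"CREATE TABLE IF NOT EXISTS {table} ({cols}, "
            f"PRIMARY KEY (({pk}), {quote(clustering_key)})) "
            f"WITH CLUSTERING ORDER BY ({quote(clustering_key)} ASC);")
```

The resulting string would be passed to session.execute in the same way as the hand-written statement.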

In [12]:
#Creating a Spark Cassandra Session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Ind_320App').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.cassandra.connection.port', '9042').getOrCreate()
print("Spark session created successfully:", spark.version)

Spark session created successfully: 3.5.1


In [13]:
#Converting to spark dataframe from pandas dataframe
spark_consumption_df = spark.createDataFrame(consumption_df)

#Displaying info about columns from Spark DataFrame to verify
spark_consumption_df.printSchema()
spark_consumption_df.show(5)

root
 |-- consumptionGroup: string (nullable = true)
 |-- endTime: timestamp (nullable = true)
 |-- lastUpdatedTime: timestamp (nullable = true)
 |-- meteringPointCount: long (nullable = true)
 |-- priceArea: string (nullable = true)
 |-- quantityKwh: double (nullable = true)
 |-- startTime: timestamp (nullable = true)

+----------------+-------------------+-------------------+------------------+---------+-----------+-------------------+
|consumptionGroup|            endTime|    lastUpdatedTime|meteringPointCount|priceArea|quantityKwh|          startTime|
+----------------+-------------------+-------------------+------------------+---------+-----------+-------------------+
|           cabin|2021-01-01 01:00:00|2024-12-20 10:35:40|            100607|      NO1|  177071.56|2021-01-01 00:00:00|
|           cabin|2021-01-01 02:00:00|2024-12-20 10:35:40|            100607|      NO1|  171335.12|2021-01-01 01:00:00|
|           cabin|2021-01-01 03:00:00|2024-12-20 10:35:40|            100607| 

In [14]:
#Inserting spark dataframe to cassandra, using .write to give data to cassandra
spark_consumption_df.write.format("org.apache.spark.sql.cassandra")\
.options(table="consumption_table", keyspace="ind_320_d2").mode("append").save()

In [15]:
#Data extraction from Cassandra into a Spark Dataframe, using .load() to load data from Cassandra as a Spark DataFrame.
extracted_df_cons=spark.read.format("org.apache.spark.sql.cassandra").options(table="consumption_table", keyspace="ind_320_d2").load().select("priceArea", "consumptionGroup", "startTime","endTime","quantityKwh","meteringPointCount")
extracted_df_cons.show(5) 

+---------+----------------+-------------------+-------------------+-----------+------------------+
|priceArea|consumptionGroup|          startTime|            endTime|quantityKwh|meteringPointCount|
+---------+----------------+-------------------+-------------------+-----------+------------------+
|      NO5|       secondary|2021-01-01 00:00:00|2021-01-01 01:00:00|  1094799.6|              5984|
|      NO5|       secondary|2021-01-01 01:00:00|2021-01-01 02:00:00|  1099480.8|              5984|
|      NO5|       secondary|2021-01-01 02:00:00|2021-01-01 03:00:00|  1054455.9|              5984|
|      NO5|       secondary|2021-01-01 03:00:00|2021-01-01 04:00:00|  1049728.6|              5984|
|      NO5|       secondary|2021-01-01 04:00:00|2021-01-01 05:00:00|  1099942.9|              5984|
+---------+----------------+-------------------+-------------------+-----------+------------------+
only showing top 5 rows



### MongoDB Connection

In [16]:
#Connecting to mongodb

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

with open(r"D:\NMBU\semester_1\IND-320\mongodb_password.txt") as f:
    username, password = f.read().strip().split(',')

uri = f"mongodb+srv://{username}:{password}@app-cluster.ihj1zbx.mongodb.net/?retryWrites=true&w=majority&appName=app-cluster"

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!
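
The connection string above interpolates the username and password directly. MongoDB URIs require characters such as @, : and / in credentials to be percent-encoded, and the PyMongo documentation suggests urllib.parse.quote_plus for this. A minimal sketch with hypothetical helper names; in the notebook the host would be the app-cluster host shown above.

```python
from urllib.parse import quote_plus


def read_credentials(path: str) -> tuple[str, str]:
    """Read 'username,password' from a local text file (the password may contain commas)."""
    with open(path, encoding="utf-8") as fh:
        username, password = fh.read().strip().split(",", 1)
    return username, password


def build_atlas_uri(username: str, password: str, host: str, app_name: str) -> str:
    """Build a mongodb+srv URI with percent-encoded credentials."""
    return (f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{host}/"
            f"?retryWrites=true&w=majority&appName={app_name}")
```

Splitting on the first comma only means a password containing a comma no longer breaks the unpacking.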


In [17]:
#Defining a database and collection inside the cluster 
database=client['ind320_production_db']
collection=database['ind320_production_table_d4']
collection2=database['ind320_consumption_table']

In [18]:
#Converting Spark Dataframe to Pandas Dataframe and dictionaries (PRODUCTION DATA)
pandas_df = extracted_df_prod.toPandas()
data_dict = pandas_df.to_dict("records")

#Inserting into MongoDB
collection.insert_many(data_dict)

print("Spark data inserted into MongoDB successfully!")

Spark data inserted into MongoDB successfully!


In [19]:
#Converting Spark Dataframe to Pandas Dataframe and dictionaries(CONSUMPTION DATA)
pandas_df_cons = extracted_df_cons.toPandas()
data_dict_cons = pandas_df_cons.to_dict("records")

#Inserting into MongoDB
collection2.insert_many(data_dict_cons)

print("Spark data inserted into MongoDB successfully!")

Spark data inserted into MongoDB successfully!
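
Both insert cells convert every row to a dictionary before a single insert_many call. With roughly 877k consumption rows (judging by the last index in the consumption output), that list is large. PyMongo already splits a big insert_many into server-sized batches internally, so chunking is about memory and progress reporting rather than correctness. A sketch under that assumption; insert_in_chunks and the chunk size are hypothetical, and collection is expected to behave like a PyMongo Collection.

```python
import pandas as pd


def insert_in_chunks(collection, df: pd.DataFrame, chunk_size: int = 50_000) -> int:
    """Insert DataFrame rows into a MongoDB collection slice by slice; return the count."""
    inserted = 0
    for start in range(0, len(df), chunk_size):
        records = df.iloc[start:start + chunk_size].to_dict("records")
        # ordered=False keeps inserting the rest of a batch if one document fails
        # (an error is still raised once the batch has been attempted).
        result = collection.insert_many(records, ordered=False)
        inserted += len(result.inserted_ids)
    return inserted
```

For example, insert_in_chunks(collection2, pandas_df_cons) would replace the to_dict/insert_many pair for the consumption data.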
