# IND320 CA3 Ivar Eftedal

NB! For some reason the map does not render in Chrome, but it does work in Safari. I apologise for the inconvenience.

### Use of AI
- AI was used a lot for the Streamlit app. For the Jupyter notebook I did not use AI at all for this assignment, but I might have used it for assignment 2. The code might therefore be partly AI-generated, but everything that was edited for this assignment was done without external help. It was mostly fairly simple to reuse previous code and make some small alterations to get the code I needed for this assignment. For the Streamlit app I used AI to convert all the matplotlib plots to Plotly so that they are more dynamic. I also used AI a lot to create my pages and for debugging. Most of the work done without AI was getting the right data and making the plan and architecture for the project. When it came to deciding which libraries to use, adding simple features, or making extensive changes, I usually did these myself to lay a foundation for the AI to build on later. I found that doing everything with AI led to a lot of mistakes and weird code snippets. For example, I experimented with Claude, but it wrote way too much code for my taste, which made me lose control of what was being implemented, so I did not use it. Instead I stuck with the free tier of ChatGPT, which gives smaller snippets to implement.

### Log of compulsory work
- Working on this project was a mix of smooth experiences and real challenges. I found the Jupyter notebook the easiest part, as it was very similar to the second assignment, so there was a lot of code I could reuse from there. Retrieving hourly production data from the Elhub API and processing it for storage in MongoDB and Cassandra felt straightforward. Overall, the Jupyter part felt understandable and manageable, and I actually enjoyed diving into the data exploration and processing steps.
- In contrast, the Streamlit app turned out to be much harder. Refactoring static plots into dynamic Plotly visualizations was rewarding, but figuring out page structure, menus, and interactive selections required a lot more trial and error than I expected. The mapping component, in particular, has been tricky. While I successfully integrated GeoJSON overlays for the price areas and implemented interactive features like storing clicked coordinates and highlighting selected areas, I ran into a frustrating issue: the map only renders properly in Safari and doesn’t display at all in Chrome. This has been a major headache, and I’ve spent a lot of time troubleshooting it without finding a complete solution yet. Despite this, seeing the map work in Safari felt exciting, and it’s satisfying to know the underlying logic is correct. Other aspects of Streamlit, like snow drift calculations, wind rose visualizations, and dynamic forecasting with SARIMAX, were challenging but fun once they started working. I especially liked seeing the results respond interactively to user input.
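
To illustrate the "clicked coordinate to selected area" step mentioned above, here is a minimal, library-free sketch. It is not taken from the Streamlit app: the function names, the `name` property key, and the toy GeoJSON are all hypothetical, and it only checks the outer ring of each polygon (holes are ignored).

```python
def point_in_ring(lon, lat, ring):
    """Ray-casting test: is (lon, lat) inside the closed ring of [lon, lat] pairs?"""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Only edges that straddle the horizontal line through the point can be crossed
        if (y1 > lat) != (y2 > lat):
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside


def find_area(lon, lat, geojson, name_key="name"):
    """Return the property `name_key` of the first feature containing the point, else None."""
    for feature in geojson["features"]:
        geometry = feature["geometry"]
        if geometry["type"] == "Polygon":
            polygons = [geometry["coordinates"]]
        elif geometry["type"] == "MultiPolygon":
            polygons = geometry["coordinates"]
        else:
            continue  # points and lines cannot contain a click
        for polygon in polygons:
            if point_in_ring(lon, lat, polygon[0]):  # polygon[0] is the outer ring
                return feature["properties"].get(name_key)
    return None


# Toy data: two unit squares standing in for price areas (hypothetical shapes)
toy_areas = {
    "type": "FeatureCollection",
    "features": [
        {"type": "Feature", "properties": {"name": "A"},
         "geometry": {"type": "Polygon",
                      "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]]}},
        {"type": "Feature", "properties": {"name": "B"},
         "geometry": {"type": "MultiPolygon",
                      "coordinates": [[[[2, 0], [3, 0], [3, 1], [2, 1], [2, 0]]]]}},
    ],
}
clicked_area = find_area(2.5, 0.5, toy_areas)
```

GeoJSON stores coordinates as `[longitude, latitude]`, so a clicked latitude/longitude pair has to be passed in that order.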


### Experience with Jupyter Notebook and Streamlit
- This time it all went pretty smoothly, apart from occasionally forgetting to save files, which was frustrating but not really a Jupyter or Streamlit problem. This is the assignment where I have enjoyed working with Jupyter notebooks the most, and I still really enjoyed Streamlit.

### Choice of additional task
Forecasting
- Add weather properties to the list of exogenous variables and download them when needed.
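
A rough sketch of how selected weather properties could be lined up as exogenous variables is given here. It is not the code from the Streamlit app: `build_exog`, the column names, and the hourly grid are assumptions made for illustration. The idea is that the exogenous table must cover both the history and the forecast horizon, since statsmodels' `SARIMAX` takes an `exog` argument when fitting and again when forecasting.

```python
import pandas as pd


def build_exog(target, weather, columns, horizon):
    """Align chosen weather columns with the target and split off the forecast horizon.

    target  : hourly pd.Series with a DatetimeIndex (production or consumption)
    weather : hourly pd.DataFrame covering the history and the horizon
    columns : weather properties picked by the user (hypothetical names)
    horizon : number of hours to forecast
    """
    step = pd.Timedelta(hours=1)
    future_index = pd.date_range(target.index[-1] + step, periods=horizon, freq=step)
    full_index = target.index.append(future_index)
    # Reindex onto history + horizon and fill small gaps so the model never sees NaN
    exog = weather[columns].reindex(full_index).ffill().bfill()
    return exog.loc[target.index], exog.loc[future_index]


# Toy data: 48 hours of history, 60 hours of weather
history_index = pd.date_range("2024-01-01", periods=48, freq=pd.Timedelta(hours=1))
weather_index = pd.date_range("2024-01-01", periods=60, freq=pd.Timedelta(hours=1))
target = pd.Series(range(48), index=history_index, dtype=float)
weather = pd.DataFrame(
    {"temperature": range(60), "wind_speed": range(60)}, index=weather_index
)
exog_train, exog_future = build_exog(target, weather, ["temperature"], horizon=12)
```

`exog_train` would go to the model together with the target, and `exog_future` to the forecast call, with `steps` equal to `horizon`.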

### Reflection
Using the Streamlit sliding window correlation tool, it looks like the correlations between meteorological variables and energy production or consumption are subtle under normal conditions. During extreme weather events, however, the patterns can become somewhat more pronounced; for example, energy demand increases when it gets very hot or very cold. I also noticed that the effects do not always appear immediately: there can be a delay before the energy response shows up. Overall, the correlations are most pronounced during unusual or intense weather events, if there are any.
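
The delayed response described above can be made concrete with a small pandas sketch. This is an illustration on synthetic data, not the implementation behind the Streamlit page; the function name and parameters are made up for the example.

```python
import numpy as np
import pandas as pd


def sliding_correlation(weather, energy, window, lag=0):
    """Rolling Pearson correlation between weather at time t - lag and energy at time t."""
    shifted = weather.shift(lag)  # move weather forward so it lines up with the later response
    return shifted.rolling(window).corr(energy)


# Synthetic demo: "demand" reacts to "temperature" with a 3 hour delay
rng = np.random.default_rng(0)
index = pd.date_range("2024-01-01", periods=200, freq=pd.Timedelta(hours=1))
temperature = pd.Series(rng.normal(size=200), index=index)
demand = 2.0 * temperature.shift(3) + 1.0

no_lag = sliding_correlation(temperature, demand, window=24)
with_lag = sliding_correlation(temperature, demand, window=24, lag=3)
```

In this toy setup the correlation only becomes strong once the lag matches the built-in delay, which is the kind of pattern the lag control in a sliding window tool is meant to reveal.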

### Links:
- [GitHub](https://github.com/ioeftedal/ind320ioeftedal)
- https://github.com/ioeftedal/ind320ioeftedal
- [Streamlit](https://ind320ioeftedal.streamlit.app)
- https://ind320ioeftedal.streamlit.app

## Getting into the code:

### Import dependencies

In [5]:
import os
import sys
import requests
import pandas as pd
import matplotlib.pyplot as plt
from pymongo import MongoClient
from pyspark.sql import SparkSession
from cassandra.cluster import Cluster
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, IntegerType
from pyspark.sql import functions as F

### Ensure cassandra connector is loaded

In [6]:
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1 pyspark-shell"
)

# Define some essential constants
BASE_URL = "https://api.elhub.no/energy-data/v0/price-areas"
START_YEAR = 2022
END_YEAR = 2024
KEYSPACE = "elhub_data"
TABLE = f"consumption_{START_YEAR}_{END_YEAR}"

### Connect to cassandra

In [7]:
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()

# Create keyspace if it does not exist
session.execute(f"""
CREATE KEYSPACE IF NOT EXISTS {KEYSPACE}
WITH REPLICATION = {{ 'class': 'SimpleStrategy', 'replication_factor': 1 }};
""")

session.set_keyspace(KEYSPACE)

# Drop and recreate the table so that data does not pile up in the database while developing
session.execute(f"DROP TABLE IF EXISTS {TABLE};")

# Creating the structure of how the database should look
session.execute(f"""
CREATE TABLE IF NOT EXISTS {TABLE} (
    priceArea text,
    consumptionGroup text,
    quantityKwh double,
    startTime timestamp,
    endTime timestamp,
    meteringPointCount int,
    lastUpdatedTime timestamp,
    PRIMARY KEY ((priceArea, consumptionGroup), startTime)
);
""")

<cassandra.cluster.ResultSet at 0x322169d60>

### Fetch data from API

In [8]:
# Creating an empty list that will store the data for each entry
records = []

for year in range(START_YEAR, END_YEAR + 1):
    # Loop over all months of the year
    for month in range(1, 13):
        start = f"{year}-{month:02d}-01"
        end = f"{year + 1}-01-01" if month == 12 else f"{year}-{month + 1:02d}-01"
        url = f"{BASE_URL}?startDate={start}&endDate={end}&dataset=CONSUMPTION_PER_GROUP_MBA_HOUR"

        print(f"Fetching data for {start} until {end}")

        # Save the request as json format
        response = requests.get(url).json()

        for item in response.get("data", []):
            attributes = item.get("attributes", {})
            consumption_list = attributes.get("consumptionPerGroupMbaHour", [])

            if isinstance(consumption_list, list):
                for column in consumption_list:
                    records.append({
                        "priceArea": column.get("priceArea"),
                        "consumptionGroup": column.get("consumptionGroup"),
                        "quantityKwh": column.get("quantityKwh"),
                        "startTime": column.get("startTime"),
                        "endTime": column.get("endTime"),
                        "meteringPointCount": column.get("meteringPointCount"),
                        "lastUpdatedTime": column.get("lastUpdatedTime"),
                    })

    print(f"Finished fetching. Total records: {len(records)}")

Fetching data for 2022-01-01 until 2022-02-01
Fetching data for 2022-02-01 until 2022-03-01
Fetching data for 2022-03-01 until 2022-04-01
Fetching data for 2022-04-01 until 2022-05-01
Fetching data for 2022-05-01 until 2022-06-01
Fetching data for 2022-06-01 until 2022-07-01
Fetching data for 2022-07-01 until 2022-08-01
Fetching data for 2022-08-01 until 2022-09-01
Fetching data for 2022-09-01 until 2022-10-01
Fetching data for 2022-10-01 until 2022-11-01
Fetching data for 2022-11-01 until 2022-12-01
Fetching data for 2022-12-01 until 2023-01-01
Finished fetching. Total records: 219000
Fetching data for 2023-01-01 until 2023-02-01
Fetching data for 2023-02-01 until 2023-03-01
Fetching data for 2023-03-01 until 2023-04-01
Fetching data for 2023-04-01 until 2023-05-01
Fetching data for 2023-05-01 until 2023-06-01
Fetching data for 2023-06-01 until 2023-07-01
Fetching data for 2023-07-01 until 2023-08-01
Fetching data for 2023-08-01 until 2023-09-01
Fetching data for 2023-09-01 until 2023
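
The month-by-month request windows built in the loop above can also be generated by a small helper, which would let the consumption and production fetches share the same logic. This is only a sketch: `month_windows` and `build_url` are hypothetical names, and the query parameters are copied from the URL used in the cell above.

```python
from datetime import date


def month_windows(start_year, end_year):
    """Yield (start, end) ISO date strings, one pair per calendar month, end exclusive."""
    for year in range(start_year, end_year + 1):
        for month in range(1, 13):
            start = date(year, month, 1)
            # December rolls over to January of the next year
            end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
            yield start.isoformat(), end.isoformat()


def build_url(base_url, dataset, start, end):
    """Assemble the request URL in the same shape as the notebook cell."""
    return f"{base_url}?startDate={start}&endDate={end}&dataset={dataset}"


windows = list(month_windows(2022, 2024))
first_url = build_url(
    "https://api.elhub.no/energy-data/v0/price-areas",
    "CONSUMPTION_PER_GROUP_MBA_HOUR",
    *windows[0],
)
```

Each `(start, end)` pair can then be passed to `requests.get` exactly as in the loop above.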

### Convert to spark dataframe

In [9]:
# Define schema for the data that will be inserted into the database
schema = StructType([
    StructField("priceArea", StringType(), True),
    StructField("consumptionGroup", StringType(), True),
    StructField("quantityKwh", DoubleType(), True),
    StructField("startTime", TimestampType(), True),
    StructField("endTime", TimestampType(), True),
    StructField("meteringPointCount", IntegerType(), True),
    StructField("lastUpdatedTime", TimestampType(), True),
])

# Convert to pandas and parse the timestamps as UTC (utc=True) so that summer/winter time offsets are handled consistently
pandas_df = pd.DataFrame(records)
pandas_df["startTime"] = pd.to_datetime(pandas_df["startTime"], utc=True)
pandas_df["endTime"] = pd.to_datetime(pandas_df["endTime"], utc=True)
pandas_df["lastUpdatedTime"] = pd.to_datetime(pandas_df["lastUpdatedTime"], utc=True)

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("SparkCassandraApp") \
    .config("spark.cassandra.connection.host", "localhost") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog") \
    .getOrCreate()

# Create a spark dataframe that follows the schema that was created
spark_df = spark.createDataFrame(pandas_df, schema=schema)

# Rename columns to lowercase: Cassandra stores unquoted column names in lowercase, so the DataFrame columns must match them when writing
spark_df = (
    spark_df
    .withColumnRenamed("priceArea", "pricearea")
    .withColumnRenamed("consumptionGroup", "consumptiongroup")
    .withColumnRenamed("startTime", "starttime")
    .withColumnRenamed("endTime", "endtime")
    .withColumnRenamed("meteringPointCount", "meteringpointcount")
    .withColumnRenamed("lastUpdatedTime", "lastupdatedtime")
    .withColumnRenamed("quantityKwh", "quantitykwh")
)

25/11/27 17:08:57 WARN Utils: Your hostname, Ivars-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.42.93.165 instead (on interface en0)
25/11/27 17:08:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/ivareftedal/.ivy2/cache
The jars for the packages stored in: /Users/ivareftedal/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ec4e26d9-a80b-4a5d-911a-64456d0061bf;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/ivareftedal/repositories/github.com/ioeftedal/ind320ioeftedal/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.1 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found org.apache.cassandra#java-driver-core-shaded;4.18.1 in central
	found com.datastax.oss#native-protocol;1.5.1 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found org.apache.cassandra#java-driver-mapper-runtime;4.18.1 in central
	found org.apache.cassandra#java-driver-query-builder;4.18.1 in central
	found org.apache.commons#commons-lang3;3.10 in central
	found com.thoughtworks.paranamer#paranamer;2.8 in central
	found org.scala-lang#scala-reflect

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 65255)
Traceback (most recent call last):
  File "/nix/store/d5bvj78dzx6wjnz13vawcjb3pa5hpdkv-python3-3.12.11/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/nix/store/d5bvj78dzx6wjnz13vawcjb3pa5hpdkv-python3-3.12.11/lib/python3.12/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
  File "/nix/store/d5bvj78dzx6wjnz13vawcjb3pa5hpdkv-python3-3.12.11/lib/python3.12/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/nix/store/d5bvj78dzx6wjnz13vawcjb3pa5hpdkv-python3-3.12.11/lib/python3.12/socketserver.py", line 766, in __init__
    self.handle()
  File "/Users/ivareftedal/repositories/github.com/ioeftedal/ind320ioeftedal/.venv/lib/python3.12/site-packages/pyspark/accumulators.py", line 295, in ha

### Write to cassandra

In [10]:
spark_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .option("keyspace", KEYSPACE) \
    .option("table", TABLE) \
    .save()

25/11/27 17:09:18 WARN TaskSetManager: Stage 0 contains a task of very large size (3155 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

### Verify that it worked by reading from the cassandra database

In [11]:
spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", KEYSPACE) \
    .option("table", TABLE) \
    .load() \
    .show(5)

+---------+----------------+-------------------+-------------------+-------------------+------------------+-----------+
|pricearea|consumptiongroup|          starttime|            endtime|    lastupdatedtime|meteringpointcount|quantitykwh|
+---------+----------------+-------------------+-------------------+-------------------+------------------+-----------+
|      NO5|       household|2022-01-01 00:00:00|2022-01-01 01:00:00|2025-02-01 18:02:02|            251668|  461827.56|
|      NO5|       household|2022-01-01 01:00:00|2022-01-01 02:00:00|2025-02-01 18:02:02|            251668|  463685.66|
|      NO5|       household|2022-01-01 02:00:00|2022-01-01 03:00:00|2025-02-01 18:02:02|            251668|   458524.7|
|      NO5|       household|2022-01-01 03:00:00|2022-01-01 04:00:00|2025-02-01 18:02:02|            251668|  451057.38|
|      NO5|       household|2022-01-01 04:00:00|2022-01-01 05:00:00|2025-02-01 18:02:02|            251668|   444570.5|
+---------+----------------+------------

### Insert data into mongodb

In [12]:
# Read entire cassandra database
cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", KEYSPACE)
    .option("table", TABLE)
    .load()
)

# Convert the cassandra dataframe to pandas
cassandra_pd = cassandra_df.toPandas()

# Import my MongoDB URI from a secret file in the parent directory
sys.path.append(os.path.abspath(".."))
from secret import uri
# Connect to MongoDB
client = MongoClient(uri)

# Choose target database and collection
mongo_db = client["elhub_data"]
mongo_collection = mongo_db[f"consumption_{START_YEAR}_{END_YEAR}"]

# Convert timestamps to native Python datetime (for BSON)
for col in ["starttime", "endtime", "lastupdatedtime"]:
    if col in cassandra_pd.columns:
        cassandra_pd[col] = pd.to_datetime(cassandra_pd[col]).dt.to_pydatetime()

# Convert DataFrame to list of dictionaries
records_to_insert = cassandra_pd.to_dict("records")

# Clear old data to avoid duplicates during development
mongo_collection.delete_many({})

# Insert all records
if records_to_insert:
    mongo_collection.insert_many(records_to_insert)

# Verify insertion
for doc in mongo_collection.find().limit(3):
    print(doc)




{'_id': ObjectId('6928784bd39f946d24c96d73'), 'pricearea': 'NO3', 'consumptiongroup': 'cabin', 'starttime': datetime.datetime(2022, 1, 1, 0, 0), 'endtime': datetime.datetime(2022, 1, 1, 1, 0), 'lastupdatedtime': datetime.datetime(2025, 2, 1, 18, 2, 2), 'meteringpointcount': 58109, 'quantitykwh': 64627.54}
{'_id': ObjectId('6928784bd39f946d24c96d74'), 'pricearea': 'NO3', 'consumptiongroup': 'cabin', 'starttime': datetime.datetime(2022, 1, 1, 1, 0), 'endtime': datetime.datetime(2022, 1, 1, 2, 0), 'lastupdatedtime': datetime.datetime(2025, 2, 1, 18, 2, 2), 'meteringpointcount': 58109, 'quantitykwh': 64400.266}
{'_id': ObjectId('6928784bd39f946d24c96d75'), 'pricearea': 'NO3', 'consumptiongroup': 'cabin', 'starttime': datetime.datetime(2022, 1, 1, 2, 0), 'endtime': datetime.datetime(2022, 1, 1, 3, 0), 'lastupdatedtime': datetime.datetime(2025, 2, 1, 18, 2, 2), 'meteringpointcount': 58109, 'quantitykwh': 63396.773}


In [13]:
# Define some essential constants
START_YEAR = 2021
END_YEAR = 2024
KEYSPACE = "elhub_data"
TABLE = f"production_{START_YEAR}_{END_YEAR}"

### Connect to cassandra

In [14]:
cluster = Cluster(['localhost'], port=9042)
session = cluster.connect()

# Create keyspace if it does not exist
session.execute(f"""
CREATE KEYSPACE IF NOT EXISTS {KEYSPACE}
WITH REPLICATION = {{ 'class': 'SimpleStrategy', 'replication_factor': 1 }};
""")

session.set_keyspace(KEYSPACE)

# Drop and recreate the table so that data does not pile up in the database while developing
session.execute(f"DROP TABLE IF EXISTS {TABLE};")

# Creating the structure of how the database should look
session.execute(f"""
CREATE TABLE IF NOT EXISTS {TABLE} (
    priceArea text,
    productionGroup text,
    quantityKwh double,
    startTime timestamp,
    endTime timestamp,
    meteringPointCount int,
    lastUpdatedTime timestamp,
    PRIMARY KEY ((priceArea, productionGroup), startTime)
);
""")

<cassandra.cluster.ResultSet at 0x32216a7e0>
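
The consumption and production tables share the same layout apart from the table name and the group column, so the statement could be produced by one helper. The sketch below is hypothetical (the function name is made up) and only builds the CQL string. In both tables the partition key is the pair of price area and group, and `startTime` is the clustering column, so the hourly rows of one area and group live in one partition ordered by time, and writing the same key again overwrites the existing row.

```python
def create_table_cql(table, group_column):
    """Build the CREATE TABLE statement used for either dataset."""
    return f"""
CREATE TABLE IF NOT EXISTS {table} (
    priceArea text,
    {group_column} text,
    quantityKwh double,
    startTime timestamp,
    endTime timestamp,
    meteringPointCount int,
    lastUpdatedTime timestamp,
    PRIMARY KEY ((priceArea, {group_column}), startTime)
);
"""


consumption_cql = create_table_cql("consumption_2022_2024", "consumptionGroup")
production_cql = create_table_cql("production_2021_2024", "productionGroup")
```

Either string can be passed to `session.execute` in place of the inline statement.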

### Fetch data from API

In [15]:
# Creating an empty list that will store the data for each entry
records = []

for year in range(START_YEAR, END_YEAR + 1):
    # Loop over all months of the year
    for month in range(1, 13):
        start = f"{year}-{month:02d}-01"
        end = f"{year + 1}-01-01" if month == 12 else f"{year}-{month + 1:02d}-01"
        url = f"{BASE_URL}?startDate={start}&endDate={end}&dataset=PRODUCTION_PER_GROUP_MBA_HOUR"

        print(f"Fetching data for {start} until {end}")

        # Save the request as json format
        response = requests.get(url).json()

        for item in response.get("data", []):
            attributes = item.get("attributes", {})
            production_list = attributes.get("productionPerGroupMbaHour", [])

            if isinstance(production_list, list):
                for column in production_list:
                    records.append({
                        "priceArea": column.get("priceArea"),
                        "productionGroup": column.get("productionGroup"),
                        "quantityKwh": column.get("quantityKwh"),
                        "startTime": column.get("startTime"),
                        "endTime": column.get("endTime"),
                        "meteringPointCount": column.get("meteringPointCount"),
                        "lastUpdatedTime": column.get("lastUpdatedTime"),
                    })

    print(f"Finished fetching. Total records: {len(records)}")

Fetching data for 2021-01-01 until 2021-02-01
Fetching data for 2021-02-01 until 2021-03-01
Fetching data for 2021-03-01 until 2021-04-01
Fetching data for 2021-04-01 until 2021-05-01
Fetching data for 2021-05-01 until 2021-06-01
Fetching data for 2021-06-01 until 2021-07-01
Fetching data for 2021-07-01 until 2021-08-01
Fetching data for 2021-08-01 until 2021-09-01
Fetching data for 2021-09-01 until 2021-10-01
Fetching data for 2021-10-01 until 2021-11-01
Fetching data for 2021-11-01 until 2021-12-01
Fetching data for 2021-12-01 until 2022-01-01
Finished fetching. Total records: 215353
Fetching data for 2022-01-01 until 2022-02-01
Fetching data for 2022-02-01 until 2022-03-01
Fetching data for 2022-03-01 until 2022-04-01
Fetching data for 2022-04-01 until 2022-05-01
Fetching data for 2022-05-01 until 2022-06-01
Fetching data for 2022-06-01 until 2022-07-01
Fetching data for 2022-07-01 until 2022-08-01
Fetching data for 2022-08-01 until 2022-09-01
Fetching data for 2022-09-01 until 2022

### Convert to spark dataframe

In [16]:
# Define schema for the data that will be inserted into the database
schema = StructType([
    StructField("priceArea", StringType(), True),
    StructField("productionGroup", StringType(), True),
    StructField("quantityKwh", DoubleType(), True),
    StructField("startTime", TimestampType(), True),
    StructField("endTime", TimestampType(), True),
    StructField("meteringPointCount", IntegerType(), True),
    StructField("lastUpdatedTime", TimestampType(), True),
])

# Convert to pandas and parse the timestamps as UTC (utc=True) so that summer/winter time offsets are handled consistently
pandas_df = pd.DataFrame(records)
pandas_df["startTime"] = pd.to_datetime(pandas_df["startTime"], utc=True)
pandas_df["endTime"] = pd.to_datetime(pandas_df["endTime"], utc=True)
pandas_df["lastUpdatedTime"] = pd.to_datetime(pandas_df["lastUpdatedTime"], utc=True)

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("SparkCassandraApp") \
    .config("spark.cassandra.connection.host", "localhost") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog") \
    .getOrCreate()

# Create a spark dataframe that follows the schema that was created
spark_df = spark.createDataFrame(pandas_df, schema=schema)

# Rename columns to lowercase: Cassandra stores unquoted column names in lowercase, so the DataFrame columns must match them when writing
spark_df = (
    spark_df
    .withColumnRenamed("priceArea", "pricearea")
    .withColumnRenamed("productionGroup", "productiongroup")
    .withColumnRenamed("startTime", "starttime")
    .withColumnRenamed("endTime", "endtime")
    .withColumnRenamed("meteringPointCount", "meteringpointcount")
    .withColumnRenamed("lastUpdatedTime", "lastupdatedtime")
    .withColumnRenamed("quantityKwh", "quantitykwh")
)
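
The reason for `utc=True` in the cell above can be shown with two timestamps around a daylight saving switch. The example assumes the API returns ISO 8601 strings with a local UTC offset; the two values below are made up for illustration and are not taken from the Elhub response.

```python
import pandas as pd

raw = pd.Series([
    "2022-03-27T01:00:00+01:00",  # last hour before the switch to summer time
    "2022-03-27T03:00:00+02:00",  # first hour after the switch
])

# With utc=True both values land on one timezone-aware UTC axis
as_utc = pd.to_datetime(raw, utc=True)

# The local clock jumps two hours, but the real gap between the readings is one hour
gap = as_utc.iloc[1] - as_utc.iloc[0]
```

Without `utc=True`, strings with mixed offsets cannot be stored in a single datetime column in a consistent way, which is why the conversion is done before handing the data to Spark.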

### Write to cassandra

In [17]:
spark_df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode("append") \
    .option("keyspace", KEYSPACE) \
    .option("table", TABLE) \
    .save()

25/11/27 17:20:51 WARN TaskSetManager: Stage 3 contains a task of very large size (3864 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

### Verify that it worked by reading from the cassandra database

In [18]:
spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", KEYSPACE) \
    .option("table", TABLE) \
    .load() \
    .show(5)

+---------+---------------+-------------------+-------------------+-------------------+------------------+-----------+
|pricearea|productiongroup|          starttime|            endtime|    lastupdatedtime|meteringpointcount|quantitykwh|
+---------+---------------+-------------------+-------------------+-------------------+------------------+-----------+
|      NO4|          hydro|2021-01-01 00:00:00|2021-01-01 01:00:00|2024-12-20 10:35:40|              NULL|  3740830.0|
|      NO4|          hydro|2021-01-01 01:00:00|2021-01-01 02:00:00|2024-12-20 10:35:40|              NULL|  3746663.5|
|      NO4|          hydro|2021-01-01 02:00:00|2021-01-01 03:00:00|2024-12-20 10:35:40|              NULL|  3712439.8|
|      NO4|          hydro|2021-01-01 03:00:00|2021-01-01 04:00:00|2024-12-20 10:35:40|              NULL|  3699229.0|
|      NO4|          hydro|2021-01-01 04:00:00|2021-01-01 05:00:00|2024-12-20 10:35:40|              NULL|  3685393.8|
+---------+---------------+-------------------+-

### Insert data into mongodb

In [19]:
# Read entire cassandra database
cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", KEYSPACE)
    .option("table", TABLE)
    .load()
)

# Convert the cassandra dataframe to pandas
cassandra_pd = cassandra_df.toPandas()

# Import my MongoDB URI from a secret file in the parent directory
sys.path.append(os.path.abspath(".."))
from secret import uri
# Connect to MongoDB
client = MongoClient(uri)

# Choose target database and collection
mongo_db = client["elhub_data"]
mongo_collection = mongo_db[f"production_{START_YEAR}_{END_YEAR}"]

# Convert timestamps to native Python datetime (for BSON)
for col in ["starttime", "endtime", "lastupdatedtime"]:
    if col in cassandra_pd.columns:
        cassandra_pd[col] = pd.to_datetime(cassandra_pd[col]).dt.to_pydatetime()

# Convert DataFrame to list of dictionaries
records_to_insert = cassandra_pd.to_dict("records")

# Clear old data to avoid duplicates during development
mongo_collection.delete_many({})

# Insert all records
if records_to_insert:
    mongo_collection.insert_many(records_to_insert)

# Verify insertion
for doc in mongo_collection.find().limit(3):
    print(doc)



{'_id': ObjectId('69287b30d39f946d24d375e9'), 'pricearea': 'NO2', 'productiongroup': 'other', 'starttime': datetime.datetime(2021, 1, 1, 0, 0), 'endtime': datetime.datetime(2021, 1, 1, 1, 0), 'lastupdatedtime': datetime.datetime(2024, 12, 20, 10, 35, 40), 'meteringpointcount': nan, 'quantitykwh': 4.346}
{'_id': ObjectId('69287b30d39f946d24d375ea'), 'pricearea': 'NO2', 'productiongroup': 'other', 'starttime': datetime.datetime(2021, 1, 1, 1, 0), 'endtime': datetime.datetime(2021, 1, 1, 2, 0), 'lastupdatedtime': datetime.datetime(2024, 12, 20, 10, 35, 40), 'meteringpointcount': nan, 'quantitykwh': 3.642}
{'_id': ObjectId('69287b30d39f946d24d375eb'), 'pricearea': 'NO2', 'productiongroup': 'other', 'starttime': datetime.datetime(2021, 1, 1, 2, 0), 'endtime': datetime.datetime(2021, 1, 1, 3, 0), 'lastupdatedtime': datetime.datetime(2024, 12, 20, 10, 35, 40), 'meteringpointcount': nan, 'quantitykwh': 3.562}
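
The documents printed above contain `'meteringpointcount': nan`, because the NULL values from Cassandra become NaN in pandas. As far as I know, pymongo stores such a value as a floating-point NaN rather than as null. If null is preferred, the records could be cleaned before insertion. A small sketch, with a hypothetical helper name:

```python
import pandas as pd


def to_mongo_records(df):
    """Convert a DataFrame to a list of dicts, replacing missing values with None."""
    records = df.to_dict("records")
    # pd.isna covers NaN, NaT and None for scalar values
    return [
        {key: (None if pd.isna(value) else value) for key, value in row.items()}
        for row in records
    ]


# Toy rows shaped like the production data
sample = pd.DataFrame({
    "pricearea": ["NO2", "NO2"],
    "productiongroup": ["other", "other"],
    "meteringpointcount": [float("nan"), 5.0],
    "quantitykwh": [4.346, 3.642],
})
cleaned_records = to_mongo_records(sample)
```

The result can be passed to `insert_many` in place of `records_to_insert`.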


25/11/28 02:13:37 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 908685 ms exceeds timeout 120000 ms
25/11/28 02:13:37 WARN SparkContext: Killing executors is not supported by current scheduler.
25/11/28 02:13:39 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o