# IND320 – Project part 4

## 1. Links
## 2. AI log
## 3. Work log
## 4. Elhub production data 2022–2024
## 5. Elhub consumption data 2021–2024
## 6. Storage in Cassandra and MongoDB

## 1. Links
GitHub: https://github.com/marensofiesteen/IND320-streamlit/tree/main
Streamlit: not deployed yet; a link may be added later.

## 2. AI log
Throughout the project, I used ChatGPT as a support tool for troubleshooting, code review, and improving structure and readability. It helped me debug Streamlit errors and write cleaner, more consistent Python code. I also used it to refine explanations, write short documentation, and check that the application met the assignment requirements. However, ChatGPT had clear limitations: it could not fully understand my specific file structure, debug live code execution, or make design decisions for me, so I had to test, adjust, and reason through several issues on my own.

## 3. Work log
In this assignment I first retrieved hourly production and consumption data from the Elhub API for the years 2021–2024. I generated monthly date intervals in the Norwegian time zone and queried the PRODUCTION_PER_GROUP_MBA_HOUR and CONSUMPTION_PER_GROUP_MBA_HOUR endpoints. The responses were cleaned and transformed into a consistent structure using the same processing steps as in earlier parts of the project. I then created new tables in Cassandra and loaded all monthly datasets via Spark, before exporting the complete results to MongoDB Atlas. Old example collections were removed to ensure that only the updated 2021–2024 data remained available for the Streamlit application.

For the Streamlit app, I refactored data loading, added caching (@st.cache_data and @st.cache_resource), and implemented clear error handling for missing API connections, empty datasets and invalid selections. I added spinners and progress indicators so that long operations (API calls, model fitting) provide visible feedback. I developed new pages for sliding-window correlation between weather and energy, with selectable lag, window size, price area, group, year and meteorological variable. I also implemented a full SARIMAX forecasting interface with selectable model parameters, training period, forecast horizon, and optional weather-based exogenous variables. All results are visualised with interactive Plotly charts.
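
The sliding-window correlation with a selectable lag can be illustrated with a minimal sketch. It assumes two aligned hourly pandas Series; the function name `sliding_window_corr` and the synthetic data are hypothetical and are not taken from the app itself.

```python
import pandas as pd


def sliding_window_corr(energy: pd.Series, weather: pd.Series,
                        window: int, lag: int = 0) -> pd.Series:
    """Rolling Pearson correlation between energy and lagged weather.

    A positive lag pairs energy at hour t with weather at hour t - lag.
    """
    lagged_weather = weather.shift(lag)
    return energy.rolling(window).corr(lagged_weather)


# Synthetic data (hypothetical): energy reacts to the weather three hours later.
weather = pd.Series([float((i * 7) % 13) for i in range(48)])
energy = 100.0 - 2.0 * weather.shift(3)

corr = sliding_window_corr(energy, weather, window=12, lag=3)
print(corr.dropna().round(3).head())
```

With the matching lag, the correlation in every complete window is close to -1, because the synthetic energy series is an exact linear function of the lagged weather. The first values are NaN until a full window of non-missing pairs is available.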

As part of the optional tasks, I completed several bonus components:
(1) Waiting time – spinners and progress bars across the app,
(2) Error handling – consistent checks and user-friendly warnings,
(3) Map page – a combined map showing NVE Elspot price areas coloured by mean GWh and, when zoomed in, municipality borders from Geonorge, with tooltips and stored click-coordinates,
(4) Snow drift – both yearly and monthly snow-drift calculations and plots using the coordinates selected on the map page.

In [1]:
import os
import requests
import pandas as pd
import pytz
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

EXECUTE_API = True
EXECUTE_localDB = True

# --- Make sure Spark uses your venv Python ---
os.environ["PYSPARK_PYTHON"] = "/Users/marenssteen/Documents/IND320/Streamlit/IND320-streamlit/.venv/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/Users/marenssteen/Documents/IND320/Streamlit/IND320-streamlit/.venv/bin/python"
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

# --- Start Spark with the Cassandra connector (resolved from Maven via spark.jars.packages) ---
spark = (
    SparkSession.builder
    .appName("SparkCassandraApp")
    .config(
        "spark.jars.packages",
        "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1"
    )
    .config("spark.cassandra.connection.host", "127.0.0.1")
    .config("spark.cassandra.connection.port", "9042")
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
    # ensures driver binds to localhost correctly
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

print("Spark session started.")
print("Spark version:", spark.version)
print("Packages:", spark.sparkContext.getConf().get("spark.jars.packages"))




:: loading settings :: url = jar:file:/Users/marenssteen/Documents/IND320/Streamlit/IND320-streamlit/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/marenssteen/.ivy2/cache
The jars for the packages stored in: /Users/marenssteen/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bab69ac6-82bf-4c52-94be-cc207c3d00ec;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.1 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found org.apache.cassandra#java-driver-core-shaded;4.18.1 in central
	found com.datastax.oss#native-protocol;1.5.1 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive

Spark session started.
Spark version: 3.5.1
Packages: com.datastax.spark:spark-cassandra-connector_2.12:3.5.1


In [2]:
def get_period_start_end(year: int, month: int):
    """
    Build a full-month time interval in Norwegian local time (Europe/Oslo).
    Start: first day of month at 00:00
    End:   last day of month at 23:00
    Returned datetimes are timezone-aware.
    """
    norway_tz = pytz.timezone("Europe/Oslo")

    # First day of this month
    start_naive = datetime(year, month, 1, 0, 0, 0)

    # First day of next month
    if month == 12:
        next_month_naive = datetime(year + 1, 1, 1, 0, 0, 0)
    else:
        next_month_naive = datetime(year, month + 1, 1, 0, 0, 0)

    # Localize both to Norwegian time zone
    start_local = norway_tz.localize(start_naive)

    # One hour before next month starts → last hour of current month
    end_local = norway_tz.localize(next_month_naive) - timedelta(hours=1)

    return start_local, end_local


In [3]:
def load_parse_production_data(start_dt, end_dt):
    """
    Fetch PRODUCTION_PER_GROUP_MBA_HOUR data from Elhub API for a given time interval,
    then parse into a flat python list of dicts.
    """
    url = "https://api.elhub.no/energy-data/v0/price-areas"

    params = {
        "dataset": "PRODUCTION_PER_GROUP_MBA_HOUR",  # Production dataset
        "startDate": start_dt.isoformat(),           # ISO 8601 with timezone
        "endDate": end_dt.isoformat()
    }

    print(f"Requesting production data: {start_dt} -> {end_dt}")
    # A timeout prevents the import loop from hanging on a stalled connection
    response = requests.get(url, params=params, timeout=60)

    if response.status_code != 200:
        print(f"⚠️ Failed to get data for {start_dt} - {end_dt}: status {response.status_code}")
        return []

    data_json = response.json()

    parsed_data = []
    # The JSON has a list under 'data'; each element has attributes including
    # 'productionPerGroupMbaHour' which is itself a list
    for data in data_json.get("data", []):
        for item in data["attributes"]["productionPerGroupMbaHour"]:
            # Each 'item' is a dict with keys like:
            # endTime, lastUpdatedTime, priceArea, productionGroup, quantityKwh, startTime
            parsed_data.append(item)

    print(f"  -> Parsed {len(parsed_data)} rows")
    return parsed_data
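

The flattening step in the function above can be tried without calling the API. The sketch below assumes the response shape described in the comments (`data` → `attributes` → list of hourly records); the helper name `flatten_price_area_response` and all values in `sample_payload` are hypothetical.

```python
def flatten_price_area_response(payload: dict, attribute: str) -> list[dict]:
    """Collect the hourly records of every price-area entry into one flat list."""
    rows = []
    for entry in payload.get("data", []):
        # .get() keeps the loop alive if an entry lacks the expected attribute
        rows.extend(entry.get("attributes", {}).get(attribute, []))
    return rows


# Hypothetical payload with made-up values, shaped like the structure above.
sample_payload = {
    "data": [
        {"attributes": {"productionPerGroupMbaHour": [
            {"priceArea": "NO1", "productionGroup": "hydro",
             "startTime": "2022-01-01T00:00:00+01:00",
             "endTime": "2022-01-01T01:00:00+01:00",
             "lastUpdatedTime": "2024-12-20T10:35:40+01:00",
             "quantityKwh": 1000.0},
            {"priceArea": "NO1", "productionGroup": "wind",
             "startTime": "2022-01-01T00:00:00+01:00",
             "endTime": "2022-01-01T01:00:00+01:00",
             "lastUpdatedTime": "2024-12-20T10:35:40+01:00",
             "quantityKwh": 250.0},
        ]}},
        {"attributes": {"productionPerGroupMbaHour": [
            {"priceArea": "NO2", "productionGroup": "hydro",
             "startTime": "2022-01-01T00:00:00+01:00",
             "endTime": "2022-01-01T01:00:00+01:00",
             "lastUpdatedTime": "2024-12-20T10:35:40+01:00",
             "quantityKwh": 3000.0},
        ]}},
    ]
}

rows = flatten_price_area_response(sample_payload, "productionPerGroupMbaHour")
print(len(rows), "rows;", sorted({r["priceArea"] for r in rows}))
```

Unlike the notebook function, this variant uses `.get()` for the nested keys, so an entry without the attribute yields no rows instead of raising a `KeyError`.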


In [4]:
def write_to_cassandra_via_spark(data_list,
                                 table: str = "production_data",
                                 keyspace: str = "elnub"):
    """
    Write a list of Elhub production records to a Cassandra table using Spark.
    Uses lower-case, snake-like column names as in Assignment 2.
    """
    if not data_list:
        print("No data to write to Cassandra for this chunk.")
        return

    # Convert to pandas DataFrame first
    df = pd.DataFrame(data_list)

    # Force consistent, lower-case column names (same order as in Assignment 2)
    df.columns = [
        "endtime",
        "lastupdatedtime",
        "pricearea",
        "productiongroup",
        "quantitykwh",
        "starttime",
    ]

    print(f"Writing {len(df)} rows to Cassandra table '{table}' in keyspace '{keyspace}' ...")

    # Use Spark to write into Cassandra (append mode)
    (spark.createDataFrame(df)
          .write
          .format("org.apache.spark.sql.cassandra")
          .options(table=table, keyspace=keyspace)
          .mode("append")
          .save())

    print("✅ Chunk written to Cassandra.")
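

The writer above assigns the lower-case column names by position, which relies on the Elhub keys always arriving in that order. A minimal sketch of a name-based alternative is shown here; `EXPECTED_KEYS` and `normalise_record` are hypothetical names, and the record values are made up.

```python
EXPECTED_KEYS = [
    "endTime",
    "lastUpdatedTime",
    "priceArea",
    "productionGroup",
    "quantityKwh",
    "startTime",
]


def normalise_record(record: dict) -> dict:
    """Return a copy with lower-case keys, selected by name rather than position."""
    missing = [key for key in EXPECTED_KEYS if key not in record]
    if missing:
        raise KeyError(f"Missing expected keys: {missing}")
    return {key.lower(): record[key] for key in EXPECTED_KEYS}


# Keys deliberately in a different order, plus one extra field that is dropped.
record = {
    "startTime": "2022-01-01T00:00:00+01:00",
    "quantityKwh": 1000.0,
    "priceArea": "NO1",
    "productionGroup": "hydro",
    "endTime": "2022-01-01T01:00:00+01:00",
    "lastUpdatedTime": "2024-12-20T10:35:40+01:00",
    "extraField": "ignored",
}

clean = normalise_record(record)
print(clean)
```

Because the mapping is by name, a change in key order or an additional field in the response would not shift values into the wrong Cassandra column.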


In [5]:
if EXECUTE_API and EXECUTE_localDB:
    start_year = 2022
    end_year = 2024

    for year in range(start_year, end_year + 1):
        print(f"\n============================")
        print(f"Year {year}: starting Elhub import")
        print(f"============================\n")

        for month in range(1, 13):
            print(f"\n--- Month {month} ---")

            # Build default full-month interval
            start_dt, end_dt = get_period_start_end(year, month)

            # Special handling for October (DST transition month) – split in two parts.
            # This follows the same idea as in Assignment 2 for 2021.
            # It is a bit defensive, but keeps the time series clean.
            if month == 10:
                norway_tz = pytz.timezone("Europe/Oslo")

                # End of first part: 30 October at 23:00. Note: the clock change is on
                # the last Sunday of October, so it only falls on 31 October in some
                # years (e.g. 2021); in other years it lies inside the first part.
                end_first_part = norway_tz.localize(datetime(year, 10, 30, 23, 0, 0))

                parts = [
                    (start_dt, end_first_part),
                    (
                        norway_tz.localize(datetime(year, 10, 31, 0, 0, 0)),
                        norway_tz.localize(datetime(year, 10, 31, 23, 0, 0)),
                    ),
                ]
            else:
                parts = [(start_dt, end_dt)]

            # For each (possibly split) time interval in this month:
            for start_part, end_part in parts:
                # 1) Fetch and parse data from Elhub
                data_list = load_parse_production_data(start_part, end_part)

                # 2) Write chunk to Cassandra
                write_to_cassandra_via_spark(
                    data_list,
                    table="production_data",   # Same table as in Assignment 2
                    keyspace="elnub",
                )

        print(f"\n✅ Finished year {year}.\n")

else:
    print("EXECUTE_API or EXECUTE_localDB is False → skipping Elhub import.")



Year 2022: starting Elhub import


--- Month 1 ---
Requesting production data: 2022-01-01 00:00:00+01:00 -> 2022-01-31 23:00:00+01:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 2 ---
Requesting production data: 2022-02-01 00:00:00+01:00 -> 2022-02-28 23:00:00+01:00
  -> Parsed 16775 rows
Writing 16775 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 3 ---
Requesting production data: 2022-03-01 00:00:00+01:00 -> 2022-03-31 23:00:00+02:00
  -> Parsed 18550 rows
Writing 18550 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 4 ---
Requesting production data: 2022-04-01 00:00:00+02:00 -> 2022-04-30 23:00:00+02:00
  -> Parsed 17975 rows
Writing 17975 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 5 ---
Requesting production data: 2022-05-01 00:00:00+02:00 -> 2022-05-31 23:00:00+02:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 6 ---
Requesting production

                                                                                

✅ Chunk written to Cassandra.

--- Month 5 ---
Requesting production data: 2023-05-01 00:00:00+02:00 -> 2023-05-31 23:00:00+02:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 6 ---
Requesting production data: 2023-06-01 00:00:00+02:00 -> 2023-06-30 23:00:00+02:00
  -> Parsed 17975 rows
Writing 17975 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 7 ---
Requesting production data: 2023-07-01 00:00:00+02:00 -> 2023-07-31 23:00:00+02:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 8 ---
Requesting production data: 2023-08-01 00:00:00+02:00 -> 2023-08-31 23:00:00+02:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'production_data' in keyspace 'elnub' ...
✅ Chunk written to Cassandra.

--- Month 9 ---
Requesting production
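

The October split in the import loop uses a fixed boundary between 30 and 31 October. In the Europe/Oslo zone the clocks go back on the last Sunday of October, so the boundary lines up with the transition date only in some years. A minimal sketch for locating that date with the standard library follows; `last_sunday_of_october` is a hypothetical helper, not part of the notebook.

```python
from datetime import date, timedelta


def last_sunday_of_october(year: int) -> date:
    """Date of the autumn clock change under the last-Sunday-of-October rule."""
    last_day = date(year, 10, 31)
    # weekday(): Monday == 0 ... Sunday == 6, so step back to the previous Sunday
    return last_day - timedelta(days=(last_day.weekday() + 1) % 7)


for year in range(2021, 2025):
    print(year, last_sunday_of_october(year).isoformat())
```

A split that should isolate the transition day could use this date as the boundary instead of a fixed 30/31 October.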

In [6]:
# This cell reads the full 'production_data' table from Cassandra
# and prints the total number of rows + shows the first 5 records.

# Read back from Cassandra to verify that data has been written correctly
df_check = (spark.read
            .format("org.apache.spark.sql.cassandra")
            .options(table="production_data", keyspace="elnub")
            .load())

print("Number of rows in production_data:", df_check.count())
df_check.show(5)


Number of rows in production_data: 817978
+---------+---------------+-------------------+-------------------+-------------------+-----------+
|pricearea|productiongroup|          starttime|            endtime|    lastupdatedtime|quantitykwh|
+---------+---------------+-------------------+-------------------+-------------------+-----------+
|      NO4|          hydro|2021-01-01 00:00:00|2021-01-01 01:00:00|2024-12-20 10:35:40|  3740830.0|
|      NO4|          hydro|2021-01-01 01:00:00|2021-01-01 02:00:00|2024-12-20 10:35:40|  3746663.5|
|      NO4|          hydro|2021-01-01 02:00:00|2021-01-01 03:00:00|2024-12-20 10:35:40|  3712439.8|
|      NO4|          hydro|2021-01-01 03:00:00|2021-01-01 04:00:00|2024-12-20 10:35:40|  3699229.0|
|      NO4|          hydro|2021-01-01 04:00:00|2021-01-01 05:00:00|2024-12-20 10:35:40|  3685393.8|
+---------+---------------+-------------------+-------------------+-------------------+-----------+
only showing top 5 rows
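

Besides the total row count, the per-month counts in the import log can be checked against the calendar. The sketch below uses the logged 2022 counts for months without a clock change and assumes 25 series per hour (for example 5 price areas × 5 groups); that number is an assumption, not something the notebook confirms, and `missing_hours` is a hypothetical helper.

```python
import calendar

# ASSUMPTION: 25 rows per hour (e.g. 5 price areas x 5 groups); not confirmed by the source.
SERIES_PER_HOUR = 25

# Row counts copied from the 2022 production import log (months without a clock change).
logged_rows = {(2022, 1): 18575, (2022, 2): 16775, (2022, 4): 17975, (2022, 5): 18575}


def calendar_hours(year: int, month: int) -> int:
    """Number of hours in a month, ignoring DST."""
    return calendar.monthrange(year, month)[1] * 24


def missing_hours(year: int, month: int, rows: int) -> int:
    """Calendar hours minus the hours implied by the row count."""
    return calendar_hours(year, month) - rows // SERIES_PER_HOUR


for (year, month), rows in logged_rows.items():
    print(year, month, "hours short:", missing_hours(year, month, rows))
```

If the assumption holds, each of these chunks is one hour short of the calendar month. That would be consistent with the final hour (23:00–00:00) not being returned when the interval ends at 23:00, which may be worth checking against the Elhub API documentation.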



In [16]:
### Exporting data from Cassandra to MongoDB Atlas (updated for Part 4)

"""In Assignment 2, data was exported from Cassandra to a **local MongoDB instance**
(`mongodb://localhost:27017`).  
In Assignment 4, the Streamlit application uses **MongoDB Atlas**, so the export
pipeline is updated accordingly:

- No local MongoDB is used anymore.
- Atlas credentials are stored securely in `.secrets_notebook.toml` (ignored by Git).
- Both *production_data* and *consumption_data* are now exported directly to Atlas.

The code below replaces the original local-Mongo versions."""


# --- Export production_data (2021-2024) from Cassandra to MongoDB Atlas ---

import tomllib
from pymongo import MongoClient
import math

# Load URIs securely
with open(".secrets_notebook.toml", "rb") as f:
    secrets = tomllib.load(f)

mongo_uri_atlas = secrets["mongo"]["uri"]
db_name_atlas = secrets["mongo"]["db"]

print("✅ Loaded MongoDB Atlas URI from secret file.")

# 0) Read production data from Cassandra
df_prod_atlas = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="production_data", keyspace="elnub")
    .load()
)

total_prod = df_prod_atlas.count()
print("Cassandra rows in production_data:", total_prod)

if total_prod == 0:
    print("⚠️ No rows found in production_data. Nothing to export.")
else:
    # 1) Connect to Atlas
    client_atlas = MongoClient(mongo_uri_atlas)
    db_atlas = client_atlas[db_name_atlas]
    coll_prod_atlas = db_atlas["production_data"]

    # Clear existing data
    coll_prod_atlas.delete_many({})
    print("MongoDB Atlas 'production_data' collection cleared.")

    # 2) Export in batches
    BATCH = 10_000
    num_batches = math.ceil(total_prod / BATCH)
    print(f"Exporting production in ~{num_batches} batches of {BATCH} rows")

    buffer = []
    count_written = 0

    for row in df_prod_atlas.toLocalIterator():
        buffer.append(row.asDict())
        if len(buffer) >= BATCH:
            coll_prod_atlas.insert_many(buffer)
            count_written += len(buffer)
            print(f"Wrote {count_written}/{total_prod}")
            buffer = []

    if buffer:
        coll_prod_atlas.insert_many(buffer)
        count_written += len(buffer)

    print("✅ Export to Atlas completed (production).")
    print("Atlas count:", coll_prod_atlas.count_documents({}))



✅ Loaded MongoDB Atlas URI from secret file.
Cassandra rows in production_data: 817978
MongoDB Atlas 'production_data' collection cleared.
Exporting production in ~82 batches of 10000 rows
Wrote 10000/817978
Wrote 20000/817978
Wrote 30000/817978
Wrote 40000/817978
Wrote 50000/817978
Wrote 60000/817978
Wrote 70000/817978
Wrote 80000/817978
Wrote 90000/817978
Wrote 100000/817978
Wrote 110000/817978
Wrote 120000/817978
Wrote 130000/817978
Wrote 140000/817978
Wrote 150000/817978
Wrote 160000/817978
Wrote 170000/817978
Wrote 180000/817978
Wrote 190000/817978
Wrote 200000/817978
Wrote 210000/817978
Wrote 220000/817978
Wrote 230000/817978
Wrote 240000/817978
Wrote 250000/817978
Wrote 260000/817978
Wrote 270000/817978
Wrote 280000/817978
Wrote 290000/817978
Wrote 300000/817978
Wrote 310000/817978
Wrote 320000/817978
Wrote 330000/817978
Wrote 340000/817978
Wrote 350000/817978
Wrote 360000/817978
Wrote 370000/817978
Wrote 380000/817978
Wrote 390000/817978
Wrote 400000/817978
Wrote 410000/817978
Wrote 420000/817978
Wrote 430000/817978
Wrote 440000/817978
Wrote 450000/817978
Wrote 460000/817978
Wrote 470000/817978
Wrote 480000/817978
Wrote 490000/817978
Wrote 500000/817978
Wrote 510000/817978
Wrote 520000/817978
Wrote 530000/817978
Wrote 540000/817978
Wrote 550000/817978
Wrote 560000/817978
Wrote 570000/817978
Wrote 580000/817978
Wrote 590000/817978
Wrote 600000/817978
Wrote 610000/817978
Wrote 620000/817978
Wrote 630000/817978
Wrote 640000/817978
Wrote 650000/817978
Wrote 660000/817978
Wrote 670000/817978
Wrote 680000/817978
Wrote 690000/817978
Wrote 700000/817978
Wrote 710000/817978
Wrote 720000/817978
Wrote 730000/817978
Wrote 740000/817978
Wrote 750000/817978
Wrote 760000/817978
Wrote 770000/817978
Wrote 780000/817978
Wrote 790000/817978
Wrote 800000/817978
Wrote 810000/817978
✅ Export to Atlas completed (production).
Atlas count: 817978
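

The buffer-and-flush pattern used for the export can also be written as a small generator. This is a minimal sketch using only the standard library; `iter_batches` is a hypothetical helper name, and the rows here are placeholder dicts rather than Cassandra data.

```python
from itertools import islice
from typing import Iterable, Iterator


def iter_batches(rows: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Yield consecutive lists of at most `size` rows without loading everything."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Placeholder rows standing in for row.asDict() results.
fake_rows = ({"n": i} for i in range(25))
batch_sizes = [len(batch) for batch in iter_batches(fake_rows, 10)]
print(batch_sizes)

# In the export cell, the loop body could then become (not executed here):
#   for batch in iter_batches((r.asDict() for r in df.toLocalIterator()), 10_000):
#       collection.insert_many(batch)
```

The generator handles the final partial batch itself, so no separate flush after the loop is needed.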


In [9]:
# --- Prepare Cassandra table for consumption data (2021-2024) ---

from cassandra.cluster import Cluster

# Connect to Cassandra cluster
cluster = Cluster(["127.0.0.1"], port=9042)
session = cluster.connect()

if EXECUTE_localDB:
    # Make sure keyspace exists
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS elnub WITH REPLICATION = {
            'class': 'SimpleStrategy',
            'replication_factor': 1}
    """)

    # Use keyspace
    session.set_keyspace("elnub")

    # Create consumption_data table if it does not exist
    session.execute("""
        CREATE TABLE IF NOT EXISTS elnub.consumption_data (
            priceArea TEXT,
            consumptionGroup TEXT,
            startTime TIMESTAMP,
            quantityKwh DOUBLE,
            endTime TIMESTAMP,
            lastUpdatedTime TIMESTAMP,
            PRIMARY KEY ((priceArea, consumptionGroup), startTime)
        );
    """)

    print("✅ Cassandra table 'elnub.consumption_data' is ready.")
else:
    print("EXECUTE_localDB is False → skipping Cassandra table creation for consumption.")


✅ Cassandra table 'elnub.consumption_data' is ready.
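

In Cassandra, an insert for an existing primary key overwrites that row instead of creating a second one. With the key defined above, re-running the import for a month should therefore not produce duplicates. The sketch below only imitates that behaviour with a Python dict; the `upsert` helper and all values are hypothetical.

```python
def upsert(table: dict, row: dict) -> None:
    """Store a row under its primary key, replacing any earlier version."""
    # Same components as the key above: (priceArea, consumptionGroup) + startTime
    key = (row["pricearea"], row["consumptiongroup"], row["starttime"])
    table[key] = row


table = {}
first_load = {"pricearea": "NO1", "consumptiongroup": "household",
              "starttime": "2021-01-01T00:00:00+01:00", "quantitykwh": 100.0}
# Hypothetical corrected value for the same hour, e.g. from a later re-import
second_load = {**first_load, "quantitykwh": 120.0}

upsert(table, first_load)
upsert(table, second_load)
print(len(table), "row(s); quantitykwh =", next(iter(table.values()))["quantitykwh"])
```

The second write replaces the first, so the row count stays at one and the latest value is kept.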


In [10]:
def load_parse_consumption_data(start_dt, end_dt):
    """
    Fetch CONSUMPTION_PER_GROUP_MBA_HOUR data from Elhub API for a given time interval,
    then parse into a flat python list of dicts.
    """
    url = "https://api.elhub.no/energy-data/v0/price-areas"

    params = {
        "dataset": "CONSUMPTION_PER_GROUP_MBA_HOUR",  # Consumption dataset
        "startDate": start_dt.isoformat(),            # ISO 8601 with timezone
        "endDate": end_dt.isoformat()
    }

    print(f"Requesting consumption data: {start_dt} -> {end_dt}")
    # A timeout prevents the import loop from hanging on a stalled connection
    response = requests.get(url, params=params, timeout=60)

    if response.status_code != 200:
        print(f"⚠️ Failed to get data for {start_dt} - {end_dt}: status {response.status_code}")
        return []

    data_json = response.json()

    parsed_data = []
    # The JSON has a list under 'data'; each element has attributes including
    # 'consumptionPerGroupMbaHour' which is itself a list
    for data in data_json.get("data", []):
        for item in data["attributes"]["consumptionPerGroupMbaHour"]:
            # Each 'item' is a dict with keys like:
            # endTime, lastUpdatedTime, priceArea, consumptionGroup, quantityKwh, startTime
            parsed_data.append(item)

    print(f"  -> Parsed {len(parsed_data)} rows")
    return parsed_data


In [13]:
def write_consumption_to_cassandra_via_spark(data_list,
                                             table: str = "consumption_data",
                                             keyspace: str = "elnub"):
    """
    Write a list of Elhub consumption records to a Cassandra table using Spark.
    We explicitly select only the columns we need from the JSON.
    """
    if not data_list:
        print("No consumption data to write to Cassandra for this chunk.")
        return

    # Convert to pandas DataFrame first
    df = pd.DataFrame(data_list)

    # Keep only the columns we actually need from the Elhub JSON
    # Typical keys are: endTime, lastUpdatedTime, priceArea, consumptionGroup, quantityKwh, startTime
    needed_cols = [
        "endTime",
        "lastUpdatedTime",
        "priceArea",
        "consumptionGroup",
        "quantityKwh",
        "startTime",
    ]

    missing = [c for c in needed_cols if c not in df.columns]
    if missing:
        print("⚠️ Missing expected columns in consumption data:", missing)
        print("Available columns:", list(df.columns))
        return

    df = df[needed_cols]

    # Rename to the lower-case names used in Cassandra (unquoted identifiers are folded to lower case there, so we stay consistent)
    df.columns = [
        "endtime",
        "lastupdatedtime",
        "pricearea",
        "consumptiongroup",
        "quantitykwh",
        "starttime",
    ]

    print(f"Writing {len(df)} rows to Cassandra table '{table}' in keyspace '{keyspace}' ...")

    (spark.createDataFrame(df)
          .write
          .format("org.apache.spark.sql.cassandra")
          .options(table=table, keyspace=keyspace)
          .mode("append")
          .save())

    print("✅ Consumption chunk written to Cassandra.")


In [14]:
if EXECUTE_API and EXECUTE_localDB:
    start_year_cons = 2021
    end_year_cons = 2024

    for year in range(start_year_cons, end_year_cons + 1):
        print(f"\n============================")
        print(f"Year {year}: starting Elhub CONSUMPTION import")
        print(f"============================\n")

        for month in range(1, 13):
            print(f"\n--- Month {month} ---")

            # Build default full-month interval
            start_dt, end_dt = get_period_start_end(year, month)

            # Same defensive DST handling for October as for production
            if month == 10:
                norway_tz = pytz.timezone("Europe/Oslo")

                end_first_part = norway_tz.localize(datetime(year, 10, 30, 23, 0, 0))

                parts = [
                    (start_dt, end_first_part),
                    (
                        norway_tz.localize(datetime(year, 10, 31, 0, 0, 0)),
                        norway_tz.localize(datetime(year, 10, 31, 23, 0, 0)),
                    ),
                ]
            else:
                parts = [(start_dt, end_dt)]

            for start_part, end_part in parts:
                # 1) Fetch and parse consumption data from Elhub
                cons_list = load_parse_consumption_data(start_part, end_part)

                # 2) Write chunk to Cassandra
                write_consumption_to_cassandra_via_spark(
                    cons_list,
                    table="consumption_data",
                    keyspace="elnub",
                )

        print(f"\n✅ Finished CONSUMPTION year {year}.\n")

else:
    print("EXECUTE_API or EXECUTE_localDB is False → skipping Elhub CONSUMPTION import.")



Year 2021: starting Elhub CONSUMPTION import


--- Month 1 ---
Requesting consumption data: 2021-01-01 00:00:00+01:00 -> 2021-01-31 23:00:00+01:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'consumption_data' in keyspace 'elnub' ...
✅ Consumption chunk written to Cassandra.

--- Month 2 ---
Requesting consumption data: 2021-02-01 00:00:00+01:00 -> 2021-02-28 23:00:00+01:00
  -> Parsed 16775 rows
Writing 16775 rows to Cassandra table 'consumption_data' in keyspace 'elnub' ...
✅ Consumption chunk written to Cassandra.

--- Month 3 ---
Requesting consumption data: 2021-03-01 00:00:00+01:00 -> 2021-03-31 23:00:00+02:00
  -> Parsed 18550 rows
Writing 18550 rows to Cassandra table 'consumption_data' in keyspace 'elnub' ...
✅ Consumption chunk written to Cassandra.

--- Month 4 ---
Requesting consumption data: 2021-04-01 00:00:00+02:00 -> 2021-04-30 23:00:00+02:00
  -> Parsed 17975 rows
Writing 17975 rows to Cassandra table 'consumption_data' in keyspace 'elnub' ...
✅ Consumption chunk written to Cassandra.

--- Month 5 ---
Requesting consumption data: 2021-05-01 00:00:00+02:00 -> 2021-05-31 23:00:00+02:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'consumption_data' in keyspace 'elnub' ...
✅ Consumptio

                                                                                

✅ Consumption chunk written to Cassandra.

--- Month 12 ---
Requesting consumption data: 2022-12-01 00:00:00+01:00 -> 2022-12-31 23:00:00+01:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'consumption_data' in keyspace 'elnub' ...
✅ Consumption chunk written to Cassandra.

✅ Finished CONSUMPTION year 2022.


Year 2023: starting Elhub CONSUMPTION import


--- Month 1 ---
Requesting consumption data: 2023-01-01 00:00:00+01:00 -> 2023-01-31 23:00:00+01:00
  -> Parsed 18575 rows
Writing 18575 rows to Cassandra table 'consumption_data' in keyspace 'elnub' ...
✅ Consumption chunk written to Cassandra.

--- Month 2 ---
Requesting consumption data: 2023-02-01 00:00:00+01:00 -> 2023-02-28 23:00:00+01:00
  -> Parsed 16775 rows
Writing 16775 rows to Cassandra table 'consumption_data' in keyspace 'elnub' ...
✅ Consumption chunk written to Cassandra.

--- Month 3 ---
Requesting consumption data: 2023-03-01 00:00:00+01:00 -> 2023-03-31 23:00:00+02:00
  -> Parsed 18550 rows
Writing 1

In [18]:
### Exporting data from Cassandra to MongoDB Atlas (updated for Part 4)

"""In Assignment 2, data was exported from Cassandra to a **local MongoDB instance**
(`mongodb://localhost:27017`).  
In Assignment 4, the Streamlit application uses **MongoDB Atlas**, so the export
pipeline is updated accordingly:

- No local MongoDB is used anymore.
- Atlas credentials are stored securely in `.secrets_notebook.toml` (ignored by Git).
- Both *production_data* and *consumption_data* are now exported directly to Atlas.

The code below replaces the original local-Mongo versions."""

# --- Export consumption_data (2021-2024) from Cassandra to MongoDB Atlas ---

import tomllib
from pymongo import MongoClient
import math

# Load URIs securely
with open(".secrets_notebook.toml", "rb") as f:
    secrets = tomllib.load(f)

mongo_uri_atlas = secrets["mongo"]["uri"]
db_name_atlas = secrets["mongo"]["db"]

print("✅ Loaded MongoDB Atlas URI from secret file.")

# 0) Read consumption data from Cassandra
df_cons_atlas = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="consumption_data", keyspace="elnub")
    .load()
)

total_cons = df_cons_atlas.count()
print("Cassandra rows in consumption_data:", total_cons)

if total_cons == 0:
    print("⚠️ No rows found in consumption_data. Nothing to export.")
else:
    # 1) Connect to Atlas
    client_atlas = MongoClient(mongo_uri_atlas)
    db_atlas = client_atlas[db_name_atlas]
    coll_cons_atlas = db_atlas["consumption_data"]

    # Clear existing data
    coll_cons_atlas.delete_many({})
    print("MongoDB Atlas 'consumption_data' collection cleared.")

    # 2) Export in batches
    BATCH = 10_000
    num_batches = math.ceil(total_cons / BATCH)
    print(f"Exporting consumption in ~{num_batches} batches of {BATCH} rows")

    buffer = []
    count_written = 0

    for row in df_cons_atlas.toLocalIterator():
        buffer.append(row.asDict())
        if len(buffer) >= BATCH:
            coll_cons_atlas.insert_many(buffer)
            count_written += len(buffer)
            print(f"Wrote {count_written}/{total_cons}")
            buffer = []

    if buffer:
        coll_cons_atlas.insert_many(buffer)
        count_written += len(buffer)

    print("✅ Export to Atlas completed (consumption).")
    print("Atlas count:", coll_cons_atlas.count_documents({}))


✅ Loaded MongoDB Atlas URI from secret file.
Cassandra rows in consumption_data: 821300
MongoDB Atlas 'consumption_data' collection cleared.
Exporting consumption in ~83 batches of 10000 rows
Wrote 10000/821300
Wrote 20000/821300
Wrote 30000/821300
Wrote 40000/821300
Wrote 50000/821300
Wrote 60000/821300
Wrote 70000/821300
Wrote 80000/821300
Wrote 90000/821300
Wrote 100000/821300
Wrote 110000/821300
Wrote 120000/821300
Wrote 130000/821300
Wrote 140000/821300
Wrote 150000/821300
Wrote 160000/821300
Wrote 170000/821300
Wrote 180000/821300
Wrote 190000/821300
Wrote 200000/821300
Wrote 210000/821300
Wrote 220000/821300
Wrote 230000/821300
Wrote 240000/821300
Wrote 250000/821300
Wrote 260000/821300
Wrote 270000/821300
Wrote 280000/821300
Wrote 290000/821300
Wrote 300000/821300
Wrote 310000/821300
Wrote 320000/821300
Wrote 330000/821300
Wrote 340000/821300
Wrote 350000/821300
Wrote 360000/821300
Wrote 370000/821300
Wrote 380000/821300
Wrote 390000/821300
Wrote 400000/821300
Wrote 410000/821300
Wrote 420000/821300
Wrote 430000/821300
Wrote 440000/821300
Wrote 450000/821300
Wrote 460000/821300
Wrote 470000/821300
Wrote 480000/821300
Wrote 490000/821300
Wrote 500000/821300
Wrote 510000/821300
Wrote 520000/821300
Wrote 530000/821300
Wrote 540000/821300
Wrote 550000/821300
Wrote 560000/821300
Wrote 570000/821300
Wrote 580000/821300
Wrote 590000/821300
Wrote 600000/821300
Wrote 610000/821300
Wrote 620000/821300
Wrote 630000/821300
Wrote 640000/821300
Wrote 650000/821300
Wrote 660000/821300
Wrote 670000/821300
Wrote 680000/821300
Wrote 690000/821300
Wrote 700000/821300
Wrote 710000/821300
Wrote 720000/821300
Wrote 730000/821300
Wrote 740000/821300
Wrote 750000/821300
Wrote 760000/821300
Wrote 770000/821300
Wrote 780000/821300
Wrote 790000/821300
Wrote 800000/821300
Wrote 810000/821300
Wrote 820000/821300
✅ Export to Atlas completed (consumption).
Atlas count: 821300
