IND 320 - NMBU

Project work, part 2 - Data Sources

\newpage

## AI usage


## Log describing



## Github and Streamlit app links

- Streamlit app: [https://liserochat-ind320-dashboard.streamlit.app](https://liserochat-ind320-dashboard.streamlit.app)  
- GitHub repository: [https://github.com/lise-dev/liserochat-ind320-dashboard.git](https://github.com/lise-dev/liserochat-ind320-dashboard.git)

\newpage

## 1. Setup and imports

```python
import os
from pathlib import Path

import pandas as pd
import requests
from pyspark.sql import SparkSession
from pymongo import MongoClient
```

## 2. Define constants

I define the constants that will be used throughout the notebook.

They include:
- API parameters for fetching data from Elhub.
- Lists of price areas, production groups, and months for 2021.
- MongoDB connection details for storing the processed dataset online.


```python
# API settings
API_BASE = "https://api.elhub.no/energy-data/v0/price-areas"
DATASET = "PRODUCTION_PER_GROUP_MBA_HOUR"

# Production and area constants
PRICE_AREAS = ["NO1", "NO2", "NO3", "NO4", "NO5"]
PROD_GROUPS = ["solar", "hydro", "wind", "thermal", "other"]
MONTHS = [f"2021-{m:02d}" for m in range(1, 13)]  # "2021-01" ... "2021-12"

# MongoDB connection details
# The connection string contains the password, so it is read from the
# MONGO_URI environment variable instead of being hard-coded in the notebook.
# It must be set before running section 11.
MONGO_URI = os.environ.get("MONGO_URI")
MONGO_DB = "ind320"
MONGO_COLLECTION = "elhub_production_2021"
```
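The three lists define the full grid of API calls made in section 5. A minimal sketch (it restates the lists so it runs on its own) shows the size and order of that grid with `itertools.product`, which iterates in the same order as the nested loops: area, then month, then production group.

```python
from itertools import product

# Same lists as in the constants cell
PRICE_AREAS = ["NO1", "NO2", "NO3", "NO4", "NO5"]
PROD_GROUPS = ["solar", "hydro", "wind", "thermal", "other"]
MONTHS = [f"2021-{m:02d}" for m in range(1, 13)]

# One API request per (area, month, group) combination
request_plan = list(product(PRICE_AREAS, MONTHS, PROD_GROUPS))

print(len(request_plan))  # 300
print(request_plan[0])    # ('NO1', '2021-01', 'solar')
```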

## 3. Define helper for monthly time ranges

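The Elhub API takes the start and end of each request as timestamps.  
I created a helper function that takes a year-month string (for example `"2021-01"`) and returns the start and end of that month as ISO 8601 timestamps in UTC: the start is midnight on the first day of the month and the end is midnight on the first day of the following month (for `"2021-01"`: `2021-01-01T00:00:00+00:00` and `2021-02-01T00:00:00+00:00`).  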

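```python
def month_range_utc(ym: str) -> tuple[str, str]:
    """Return (start, end) for a "YYYY-MM" string as ISO 8601 UTC timestamps.

    The end is the first instant of the following month.
    """
    start = pd.Timestamp(f"{ym}-01 00:00:00", tz="UTC")
    # Last day of the month, then one more day -> first day of the next month
    end = (start + pd.offsets.MonthEnd(1)) + pd.Timedelta(days=1)

    def fmt(ts):
        # strftime("%z") gives "+0000"; insert a colon to get "+00:00"
        s = ts.strftime("%Y-%m-%dT%H:%M:%S%z")
        return s[:-2] + ":" + s[-2:]

    return fmt(start), fmt(end)
```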
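As a cross-check of the helper, here is a standard-library version (the name `month_range_utc_stdlib` is hypothetical and not part of the notebook). It returns the same strings as `month_range_utc`, including across the December to January boundary, and shows how the `+` of the UTC offset is percent-encoded in a query string. `requests` applies the same encoding to `params`, so the `+` is not misread as a space.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode


def month_range_utc_stdlib(ym: str) -> tuple[str, str]:
    """Return (start, end) ISO 8601 strings in UTC; end is the first instant of the next month."""
    year, month = map(int, ym.split("-"))
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    # December rolls over to January of the next year
    if month == 12:
        end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
    else:
        end = datetime(year, month + 1, 1, tzinfo=timezone.utc)
    return start.isoformat(), end.isoformat()


start, end = month_range_utc_stdlib("2021-12")
print(start, end)  # 2021-12-01T00:00:00+00:00 2022-01-01T00:00:00+00:00

# ":" becomes %3A and "+" becomes %2B in the query string
print(urlencode({"startDate": start}))  # startDate=2021-12-01T00%3A00%3A00%2B00%3A00
```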

## 4. Fetch hourly production data for one area and month

This function sends a request to the Elhub API for one production group, one price area, and one month.  
It puts the price area in the URL path, builds the query parameters (`dataset`, `startDate`, `endDate`, `productionGroup`), retrieves the JSON response, and converts the relevant data into a clean pandas DataFrame.

The returned dataframe contains the following columns:
- `price_area`
- `production_group`
- `start_time`
- `quantity_kwh`


```python
def fetch_month_one_group(area: str, ym: str, group: str) -> pd.DataFrame:
    """Fetch hourly production data for a specific area, month and production group."""
    columns = ["price_area", "production_group", "start_time", "quantity_kwh"]

    # UTC start/end timestamps from the helper in section 3
    start, end = month_range_utc(ym)

    url = f"{API_BASE}/{area}"  # the price area is part of the URL path
    params = {
        "dataset": DATASET,
        "startDate": start,
        "endDate": end,
        "productionGroup": group,
    }

    response = requests.get(url, params=params, timeout=60)
    response.raise_for_status()
    data_json = response.json()

    # Structure this code assumes:
    # {"data": [{"attributes": {"productionPerGroupMbaHour": [...]}}]}
    data = data_json.get("data", [])
    if not data:
        return pd.DataFrame(columns=columns)

    items = data[0].get("attributes", {}).get("productionPerGroupMbaHour", [])
    if not items:
        return pd.DataFrame(columns=columns)

    df = (
        pd.json_normalize(items)[["priceArea", "productionGroup", "startTime", "quantityKwh"]]
        .rename(columns={
            "priceArea": "price_area",
            "productionGroup": "production_group",
            "startTime": "start_time",
            "quantityKwh": "quantity_kwh",
        })
    )

    # Parse the timestamps (which carry their own offset) and convert them to UTC
    df["start_time"] = pd.to_datetime(df["start_time"], utc=True, errors="coerce")
    return df
```
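To see what the parsing part of `fetch_month_one_group` does without calling the API, here is a standard-library sketch that applies the same selection and renaming to a made-up payload. The nesting `data[0].attributes.productionPerGroupMbaHour` is the one the function above assumes; the values and the extra `otherField` key are invented for illustration, and `parse_production_items` is a hypothetical name.

```python
def parse_production_items(payload: dict) -> list[dict]:
    """Extract hourly rows from an Elhub-style payload, keeping and renaming four fields."""
    data = payload.get("data", [])
    if not data:
        return []
    items = data[0].get("attributes", {}).get("productionPerGroupMbaHour", [])
    rename = {
        "priceArea": "price_area",
        "productionGroup": "production_group",
        "startTime": "start_time",
        "quantityKwh": "quantity_kwh",
    }
    # Keep only the four renamed fields; anything else is dropped
    return [{new: item[old] for old, new in rename.items()} for item in items]


# Made-up payload with the assumed nesting (values are illustrative only)
sample = {
    "data": [{
        "attributes": {
            "productionPerGroupMbaHour": [
                {"priceArea": "NO1", "productionGroup": "hydro",
                 "startTime": "2021-01-01T00:00:00+01:00", "quantityKwh": 123.4,
                 "otherField": "ignored"},
            ]
        }
    }]
}
rows = parse_production_items(sample)
print(rows[0]["price_area"], rows[0]["quantity_kwh"])  # NO1 123.4
```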


## 5. Loop over all areas and months of 2021

In this step, I loop through all five Norwegian price areas and the twelve months of 2021.  
For each area, month, and production group, the `fetch_month_one_group()` function is called.  
All the resulting DataFrames are combined into a single dataset called `raw_df`.


```python
all_chunks = []

# One request per (area, month, production group): 5 x 12 x 5 = 300 requests
for area in PRICE_AREAS:
    for ym in MONTHS:
        for g in PROD_GROUPS:
            try:
                dfm = fetch_month_one_group(area, ym, g)
                if not dfm.empty:
                    all_chunks.append(dfm)
                else:
                    print(f"Empty result for {area} {ym} {g}")
            except Exception as e:
                print(f"Failed for {area} {ym} {g}: {e}")

if all_chunks:
    raw_df = pd.concat(all_chunks, ignore_index=True)
else:
    raw_df = pd.DataFrame(columns=["price_area", "production_group", "start_time", "quantity_kwh"])

print("Total number of rows and columns:", raw_df.shape)
raw_df.head()
```
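The loop above prints failed requests and moves on, so a transient network error leaves a gap in `raw_df`. One possible refinement, sketched here with a hypothetical `with_retries` helper, retries each call with exponential backoff; the `sleep` argument is injectable so the behavior can be checked without actually waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); after a failure wait base_delay * 2**i seconds and retry.

    The last exception is re-raised once all attempts are used up.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            sleep(base_delay * 2 ** i)
    raise AssertionError("unreachable")


# Demo with a function that fails twice, then succeeds
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


delays = []
result = with_retries(flaky, sleep=delays.append)
print(result, delays)  # ok [1.0, 2.0]
```

In the loop, the call would become `dfm = with_retries(lambda: fetch_month_one_group(area, ym, g))`. Catching every `Exception` also repeats requests that fail permanently, such as an HTTP 400 response; retrying only on `requests.ConnectionError` and `requests.Timeout` would avoid that.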


## 6. Save the raw data to CSV

Once all data are collected, I export the full dataset to a CSV file.  
This file will be used later in Spark (for Cassandra) and in the Streamlit app.

The CSV is saved in the project data folder:
`/home/lse/Documents/IND320/liserochat-ind320-dashboard/app/data/elhub_production_2021_raw.csv`

```python
out_path = Path("/home/lse/Documents/IND320/liserochat-ind320-dashboard/app/data/elhub_production_2021_raw.csv")
out_path.parent.mkdir(parents=True, exist_ok=True)

if raw_df.empty:
    print("No CSV file was created.")
else:
    raw_df.to_csv(out_path, index=False)
    print(f"CSV file successfully saved to: {out_path.resolve()}")
```
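A CSV file stores only text, so `start_time` comes back as a string when the file is read again; with pandas, `pd.read_csv(out_path, parse_dates=["start_time"])` restores the datetime type. The standard-library sketch below shows the same round trip on one illustrative row (not real data) written in the format pandas uses for timezone-aware timestamps.

```python
import csv
import io
from datetime import datetime, timezone

# Illustrative CSV content in the layout produced by raw_df.to_csv
csv_text = (
    "price_area,production_group,start_time,quantity_kwh\n"
    "NO1,hydro,2021-01-01 00:00:00+00:00,123.4\n"
)

rows = []
for rec in csv.DictReader(io.StringIO(csv_text)):
    # Every field is a string after reading; convert the typed ones back
    rec["start_time"] = datetime.fromisoformat(rec["start_time"])
    rec["quantity_kwh"] = float(rec["quantity_kwh"])
    rows.append(rec)

print(rows[0]["start_time"].tzinfo)  # UTC
```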

## 7. Initialize Spark with the Cassandra connector

Here I create a `SparkSession` configured to connect to the local Cassandra instance running in Docker  
(on `127.0.0.1:9042`). The Spark Cassandra connector is automatically downloaded via  
`spark.jars.packages` when the session starts.


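```python
# Stop any previous Spark session if running
# (on the first run `spark` is undefined; the NameError is caught and ignored)
try:
    spark.stop()
except Exception:
    pass

# Create a new Spark session configured for Cassandra.
# The "_2.13" suffix of the connector must match the Scala version of the
# Spark installation. Packages are resolved when the JVM starts, so changing
# them requires restarting the kernel.
spark = (
    SparkSession.builder
    .appName("IND320-Elhub-2021")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.13:3.5.0")
    .config("spark.cassandra.connection.host", "127.0.0.1")
    .config("spark.cassandra.connection.port", "9042")
    .config("spark.cassandra.output.consistency.level", "LOCAL_ONE")
    .getOrCreate()
)

spark
```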

## 8. Check the dataframe before persisting it in Cassandra

Before inserting data into Cassandra, I check that the dataframe `raw_df` exists,  
is not empty, and includes the expected columns with correct names:
- `price_area`
- `production_group`
- `start_time`
- `quantity_kwh`

```python
# Check that the dataframe is ready for Cassandra
if "raw_df" not in globals():
    raise ValueError("raw_df is not defined. Please run the previous steps first.")

if raw_df.empty:
    raise ValueError("raw_df is empty. Fetch data from the API before continuing.")

required_cols = ["price_area", "production_group", "start_time", "quantity_kwh"]
for col in required_cols:
    if col not in raw_df.columns:
        raise ValueError(f"Missing column: {col}")

raw_df.head()
```
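The loop above stops at the first missing column. A small variant (the helper name `missing_columns` is hypothetical) collects all missing names at once, which gives a more useful error message when several columns were renamed.

```python
REQUIRED_COLS = ["price_area", "production_group", "start_time", "quantity_kwh"]


def missing_columns(columns, required=REQUIRED_COLS) -> list[str]:
    """Return the required column names absent from `columns`, in the order of `required`."""
    present = set(columns)
    return [c for c in required if c not in present]


print(missing_columns(["price_area", "start_time"]))  # ['production_group', 'quantity_kwh']
```

Applied to the notebook, the check would read `if missing := missing_columns(raw_df.columns): raise ValueError(f"Missing columns: {missing}")`.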


## 9. Write the data into Cassandra using Spark

Here I convert the pandas dataframe (`raw_df`) to a Spark DataFrame  
and insert it into the Cassandra keyspace **elhub**, table **production_2021**.

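I use `mode("append")`, which inserts every row of the dataframe into the existing table.  
Cassandra writes are upserts, so a row whose primary key already exists overwrites the stored row instead of creating a duplicate.  
The Cassandra table was created beforehand with the primary key `(price_area, start_time, production_group)`.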

```python
# Set Cassandra connection parameters for Spark
# (same values as in the builder in section 7, repeated here as a safeguard)
spark.conf.set("spark.cassandra.connection.host", "127.0.0.1")
spark.conf.set("spark.cassandra.connection.port", "9042")
spark.conf.set("spark.cassandra.output.consistency.level", "LOCAL_ONE")

# Basic checks before writing
print("Columns:", raw_df.columns.tolist())
print("Types:\n", raw_df.dtypes.head())
print("Rows:", len(raw_df))

# Convert pandas dataframe to Spark dataframe
sdf = spark.createDataFrame(raw_df)

# Write to Cassandra (rows with an existing primary key are overwritten)
try:
    (
        sdf.write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(keyspace="elhub", table="production_2021")
        .save()
    )
    print("Data successfully written to elhub.production_2021")
except Exception as e:
    print("Write failed:", e)
    raise
```
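Because Cassandra writes are upserts on the primary key, running the append cell twice does not double the table. The plain-Python sketch below models the table as a dict keyed by `(price_area, start_time, production_group)`; it only illustrates the overwrite semantics, not how the connector works internally, and the rows are made up.

```python
def upsert(table: dict, rows: list[dict]) -> None:
    """Hypothetical model of a Cassandra write: the primary key decides which row is replaced."""
    for r in rows:
        key = (r["price_area"], r["start_time"], r["production_group"])
        table[key] = r["quantity_kwh"]


rows = [
    {"price_area": "NO1", "start_time": "2021-01-01T00:00Z", "production_group": "hydro", "quantity_kwh": 100.0},
    {"price_area": "NO1", "start_time": "2021-01-01T01:00Z", "production_group": "hydro", "quantity_kwh": 110.0},
]
table = {}
upsert(table, rows)
upsert(table, rows)  # "appending" the same batch a second time
print(len(table))    # 2, not 4

# A new value for an existing key replaces the stored one
upsert(table, [{**rows[0], "quantity_kwh": 105.0}])
print(table[("NO1", "2021-01-01T00:00Z", "hydro")])  # 105.0
```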


## 10. Read back from Cassandra and inspect

I now read the table `elhub.production_2021` back from Cassandra using Spark.  
This allows me to confirm that the data was correctly written.  
Then, I convert it to a pandas dataframe for later use (for plotting and MongoDB upload).


```python
# Read data from Cassandra
sdf_check = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace="elhub", table="production_2021")
    .load()
    .select("price_area", "production_group", "start_time", "quantity_kwh")
)

# Convert to pandas dataframe
pdf = sdf_check.toPandas()

print("Data shape:", pdf.shape)
pdf.head()
```
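A row count alone can hide problems: duplicated keys in `raw_df` collapse into one Cassandra row, and rows from earlier runs may already be in the table. A hypothetical `compare_keys` helper compares the primary-key tuples on both sides instead. The tuples could be taken from `raw_df[cols].itertuples(index=False, name=None)` and the same for `pdf`. Note that `toPandas()` may return `start_time` as timezone-naive values in the Spark session time zone, so both sides should be normalized to UTC before comparing. The keys below are illustrative.

```python
def compare_keys(written: list[tuple], read_back: list[tuple]) -> dict:
    """Compare primary-key tuples of the source data and of the table read back."""
    w, r = set(written), set(read_back)
    return {
        "duplicates_in_source": len(written) - len(w),
        "missing_in_table": sorted(w - r),
        "extra_in_table": sorted(r - w),
    }


written = [
    ("NO1", "2021-01-01T00:00Z", "hydro"),
    ("NO1", "2021-01-01T00:00Z", "hydro"),  # duplicate key in the source
    ("NO2", "2021-01-01T00:00Z", "wind"),
]
read_back = [("NO1", "2021-01-01T00:00Z", "hydro"), ("NO2", "2021-01-01T00:00Z", "wind")]
print(compare_keys(written, read_back))
# {'duplicates_in_source': 1, 'missing_in_table': [], 'extra_in_table': []}
```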


## 11. Insert the curated data into MongoDB

In this step I push the cleaned dataset to MongoDB Atlas.

The goal is:
- Store the data in a cloud database (`ind320.elhub_production_2021`)
- Let the Streamlit app read directly from MongoDB (so the app does not have to connect to Cassandra)

I use `pymongo` to connect to the cluster.  
The connection string is stored in my local secrets and is not committed to GitHub.

```python
# Connection details:
# I keep the URI in a local variable here, but in production Streamlit reads it from st.secrets.
client = MongoClient(MONGO_URI)
mongo_db = client[MONGO_DB]
mongo_collection = mongo_db[MONGO_COLLECTION]

# Quick check that the connection works
print("MongoDB collections:", mongo_db.list_collection_names())
```
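The dataset has one row per area, production group and hour (up to 5 × 5 × 8,760 = 219,000 rows for 2021), so sending it in batches keeps each request to the cluster small. A minimal sketch with a hypothetical `chunks` helper (Python 3.12 also provides `itertools.batched`, which yields tuples):

```python
from itertools import islice
from typing import Iterable, Iterator


def chunks(records: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Yield consecutive lists of at most `size` records."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(records)
    while batch := list(islice(it, size)):
        yield batch


# Usage sketch against the collection opened above (needs a live connection):
# for batch in chunks(pdf.to_dict("records"), 10_000):
#     mongo_collection.insert_many(batch)

print([len(b) for b in chunks(({"i": i} for i in range(25_001)), 10_000)])  # [10000, 10000, 5001]
```

Unlike the Cassandra write, `insert_many` does not overwrite anything for records without an `_id`, so re-running the upload adds a second copy of every document; clearing the collection first with `mongo_collection.delete_many({})` keeps the step repeatable.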