# IND320 — Part 2: Elhub → Spark/Cassandra → MongoDB → Streamlit
**Student:** Isma Sohail  
**Course:** IND320 Data to Decision  
**Part:** 2

This Notebook implements the required pipeline:

1. Fetch hourly production data for 2021 from the Elhub API (`PRODUCTION_PER_GROUP_MBA_HOUR`).
2. Normalize JSON → `pandas.DataFrame` (only the `productionPerGroupMbaHour` list).
3. Insert data into Cassandra using Spark.
4. Use Spark to read and select: `priceArea, productionGroup, startTime, quantityKwh`.
5. Create plots: (i) **Pie** = total year by production group for a chosen price area; (ii) **Line** = first month by production group for a chosen price area.
6. Insert the Spark-extracted data into **MongoDB**.
7. Fill in the **AI usage** note and write a **300–500 word log**.

> **Tip:** If Docker is unavailable on your laptop, use **managed services**:
> - **Cassandra** → DataStax **Astra DB** (free tier; the Spark Cassandra Connector reaches it through a secure connect bundle).
> - **MongoDB** → **MongoDB Atlas** (free tier).


## 0. Setup (env, packages, secrets)
The notebook reads credentials from environment variables; the Streamlit app reads the same values from `.streamlit/secrets.toml`, so use identical key names in both places.

In [None]:
import os
from datetime import datetime, timezone

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import requests

ELHUB_API_BASE = os.environ.get("ELHUB_API_BASE", "https://api.elhub.no/energy-data-api")
ELHUB_API_TOKEN = os.environ.get("ELHUB_API_TOKEN")  # Bearer token from Elhub portal
assert ELHUB_API_TOKEN is not None, "Set ELHUB_API_TOKEN (bearer token) via env vars or secrets."

MONGODB_URI = os.environ.get("MONGODB_URI", "")
MONGODB_DB = os.environ.get("MONGODB_DB", "ind320")
MONGODB_COLLECTION = os.environ.get("MONGODB_COLLECTION", "elhub_production")

print("Base:", ELHUB_API_BASE)


## 1. Elhub API fetch — `PRODUCTION_PER_GROUP_MBA_HOUR` (year 2021)
**Observe:**
- Timestamps are ISO-8601 strings; normalize them to UTC on load, and handle the CET/CEST transitions if you convert to Europe/Oslo.
- The API limits the period per request, so fetch **month by month**.
- Keep only the `productionPerGroupMbaHour` list.

**Fields:** `priceArea`, `productionGroup`, `startTime`, `quantityKwh`.
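To see what the CET/CEST note means in practice, the sketch below converts consecutive UTC hours around the 2021 transitions (28 March and 31 October) to Europe/Oslo wall-clock time. It uses the standard-library `zoneinfo`, which needs the IANA time zone database (system tzdata or the `tzdata` package); `utc_hours_as_oslo` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

OSLO = ZoneInfo("Europe/Oslo")


def utc_hours_as_oslo(start_utc, n_hours):
    """Convert n consecutive UTC hours, starting at start_utc, to Europe/Oslo local time."""
    return [(start_utc + timedelta(hours=h)).astimezone(OSLO) for h in range(n_hours)]


spring = utc_hours_as_oslo(datetime(2021, 3, 28, 0, tzinfo=timezone.utc), 4)
autumn = utc_hours_as_oslo(datetime(2021, 10, 31, 0, tzinfo=timezone.utc), 3)
print([t.strftime("%H:%M%z") for t in spring])  # ['01:00+0100', '03:00+0200', '04:00+0200', '05:00+0200']
print([t.strftime("%H:%M%z") for t in autumn])  # ['02:00+0200', '02:00+0100', '03:00+0100']
```

Local 02:00 is missing in spring and occurs twice in autumn, while the UTC hours stay unique. Keying and storing rows by UTC, and converting only for display, therefore avoids both gaps and double counting.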


In [None]:
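from dateutil.relativedelta import relativedelta

def elhub_headers():
    return {"Authorization": f"Bearer {ELHUB_API_TOKEN}", "Accept": "application/json"}

def fetch_month(year: int, month: int):
    """Fetch one month of hourly rows (the API limits the period per request)."""
    endpoint = f"{ELHUB_API_BASE}/v1/production/PRODUCTION_PER_GROUP_MBA_HOUR"
    start = datetime(year, month, 1, tzinfo=timezone.utc)
    end = start + relativedelta(months=1)  # exclusive upper bound
    params = {"startTime": start.isoformat(), "endTime": end.isoformat(), "aggregation": "Hour"}
    r = requests.get(endpoint, headers=elhub_headers(), params=params, timeout=60)
    r.raise_for_status()
    data = r.json()
    # Keep only the productionPerGroupMbaHour list: accept it at the top level,
    # or nested per price area under data[].attributes (JSON:API style)
    if "productionPerGroupMbaHour" in data:
        return data["productionPerGroupMbaHour"]
    rows = []
    for item in data.get("data", []):
        rows.extend(item.get("attributes", {}).get("productionPerGroupMbaHour", []))
    return rows

all_rows = []
for m in range(1, 13):
    print(f"Fetching 2021-{m:02d} ...")
    all_rows.extend(fetch_month(2021, m))
len(all_rows)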
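The loop above builds its month windows from UTC midnights, so every boundary sits one hour (two in summer) away from Norwegian local midnight. If you prefer local-time months, a minimal sketch follows; `month_windows` and `hours_in` are hypothetical helpers, and whether the Elhub endpoint honours the offsets in `start.isoformat()` is an assumption to verify.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

OSLO = ZoneInfo("Europe/Oslo")


def month_windows(year, tz=OSLO):
    """Return 12 (start, end) pairs at local midnight; each end is exclusive."""
    windows = []
    for month in range(1, 13):
        next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)
        start = datetime(year, month, 1, tzinfo=tz)
        end = datetime(next_year, next_month, 1, tzinfo=tz)
        windows.append((start, end))
    return windows


def hours_in(start, end):
    """Length of a window in real elapsed hours."""
    # Python ignores the UTC offset when both operands share the same tzinfo object,
    # so convert to UTC first or the DST months come out one hour wrong.
    delta = end.astimezone(timezone.utc) - start.astimezone(timezone.utc)
    return int(delta.total_seconds() // 3600)


for start, end in month_windows(2021)[2:4]:
    print(start.isoformat(), "->", end.isoformat(), hours_in(start, end), "h")
```

The hour counts double as the expected number of rows per price area and production group: 743 in March, 745 in October and 8760 for the whole of 2021.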


In [None]:
# Keep only the four required fields; build an empty frame if nothing was fetched
cols = ["priceArea", "productionGroup", "startTime", "quantityKwh"]
raw_df = pd.json_normalize(all_rows)
df = raw_df[cols].copy() if not raw_df.empty else pd.DataFrame(columns=cols)
df["startTime"] = pd.to_datetime(df["startTime"], utc=True)  # parse offsets, store as UTC
df["quantityKwh"] = pd.to_numeric(df["quantityKwh"])
df.head()
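Before trusting `df`, a quick completeness check on `all_rows` can catch duplicated or missing hours. This matters downstream: with a Cassandra primary key on (priceArea, productionGroup, startTime), duplicates collapse silently into one row, whereas MongoDB would store both. `check_series` is a hypothetical helper; the default of 8760 assumes every series reports every hour of 2021, and duplicates are only detected when the `startTime` strings are identical.

```python
from collections import Counter


def check_series(rows, expected_hours=8760):
    """Return (duplicate keys, series whose distinct-hour count differs from expected_hours)."""
    keys = [(r["priceArea"], r["productionGroup"], r["startTime"]) for r in rows]
    duplicates = [k for k, c in Counter(keys).items() if c > 1]
    # Count distinct hours per (priceArea, productionGroup) series
    per_series = Counter((area, group) for area, group, _ in set(keys))
    incomplete = {s: n for s, n in per_series.items() if n != expected_hours}
    return duplicates, incomplete
```

For example, `dups, incomplete = check_series(all_rows)` right after the fetch; both results should be empty for a clean year.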


### (Fallback) Create a tiny sample if API access is blocked
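Run **only** if the API fetch failed or returned no rows. The cell generates random January values for two price areas so that the later steps can be developed; they are not real Elhub data.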


In [None]:
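import numpy as np
import pandas as pd

# Build synthetic January data only when real data is missing (df undefined or empty)
if "df" not in globals() or df.empty:
    rng = np.random.default_rng(320)  # seeded so reruns give the same demo values
    hours = pd.date_range("2021-01-01", "2021-01-31 23:00", freq="60min", tz="UTC")
    demo = [
        {"priceArea": area, "productionGroup": grp, "startTime": t,
         "quantityKwh": int(rng.integers(1000, 5000))}
        for area in ["NO1", "NO2"]
        for grp in ["WIND", "HYDRO", "THERMAL"]
        for t in hours
    ]
    df = pd.DataFrame(demo)
df.sample(5)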


## 2. Insert into Cassandra using Spark (Astra)
Download the secure connect bundle from Astra and set the environment variables `ASTRA_SECURE_BUNDLE_PATH`, `ASTRA_CLIENT_ID` and `ASTRA_CLIENT_SECRET`.
Create the keyspace `ind320` and the table `elhub_production_2021` in Astra before running the cell.
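The Spark write targets `ind320.elhub_production_2021`, and the connector's `save()` will not create that table. The hypothetical helper below builds one possible `CREATE TABLE` statement to paste into the Astra CQL console. It is a sketch under two assumptions: the columns keep the DataFrame's camelCase names (which requires double-quoted identifiers, because CQL folds unquoted names to lowercase), and each (priceArea, productionGroup) series forms one partition clustered by `startTime`.

```python
def create_table_cql(keyspace: str, table: str) -> str:
    """Build a CREATE TABLE statement for the Elhub rows (hypothetical schema)."""
    # Quoted identifiers preserve camelCase so they match the Spark column names.
    return (
        f"CREATE TABLE IF NOT EXISTS {keyspace}.{table} (\n"
        '    "priceArea" text,\n'
        '    "productionGroup" text,\n'
        '    "startTime" timestamp,\n'
        '    "quantityKwh" double,\n'
        '    PRIMARY KEY (("priceArea", "productionGroup"), "startTime")\n'
        ");"
    )


print(create_table_cql("ind320", "elhub_production_2021"))
```

With this key, one partition holds about 8760 hourly rows, and time-range reads within a series are cheap. Rewriting an existing key overwrites the row, so rerunning the `append` write does not duplicate data.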

In [None]:
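import os
from pyspark.sql import SparkSession

astra_bundle = os.environ.get("ASTRA_SECURE_BUNDLE_PATH")
astra_client_id = os.environ.get("ASTRA_CLIENT_ID")
astra_client_secret = os.environ.get("ASTRA_CLIENT_SECRET")
assert astra_bundle and astra_client_id and astra_client_secret, "Set Astra bundle + client id/secret env vars."

# The Cassandra data source needs the Spark Cassandra Connector on the classpath.
# spark.jars.packages only takes effect when the first SparkSession is created;
# pick the connector release that matches your Spark and Scala versions.
spark = (
    SparkSession.builder.appName("IND320-Elhub-Cassandra")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0")
    .config("spark.cassandra.connection.config.cloud.path", astra_bundle)
    .config("spark.cassandra.auth.username", astra_client_id)
    .config("spark.cassandra.auth.password", astra_client_secret)
    .config("spark.sql.session.timeZone", "UTC")  # interpret naive timestamps as UTC
    .getOrCreate()
)

# Pass naive UTC timestamps; with the UTC session time zone they keep their meaning
spark_input = df.assign(startTime=df["startTime"].dt.tz_convert("UTC").dt.tz_localize(None))
sdf = spark.createDataFrame(spark_input)
keyspace = "ind320"
table = "elhub_production_2021"
# The table must already exist; "append" upserts by primary key, so reruns overwrite rows
sdf.write.format("org.apache.spark.sql.cassandra").mode("append").options(keyspace=keyspace, table=table).save()
print("Written to Cassandra:", f"{keyspace}.{table}")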


## 3. Read back & select required columns

In [None]:
sdf2 = spark.read.format("org.apache.spark.sql.cassandra").options(keyspace=keyspace, table=table).load()
sel = sdf2.select("priceArea","productionGroup","startTime","quantityKwh")
sel.limit(5).toPandas()


## 4. Plots

In [None]:
import matplotlib.pyplot as plt

pdf = sel.toPandas()
# toPandas() returns naive timestamps in the Spark session time zone (UTC if configured so)
pdf["startTime"] = pd.to_datetime(pdf["startTime"], utc=True)
areas = sorted(pdf["priceArea"].unique().tolist())
chosen = areas[0] if areas else "NO1"

# Pie: total 2021 production per production group for the chosen price area
pie_s = pdf[pdf["priceArea"] == chosen].groupby("productionGroup")["quantityKwh"].sum()
fig, ax = plt.subplots(figsize=(6, 6))
ax.pie(pie_s.values, labels=pie_s.index, autopct="%1.1f%%")
ax.set_title(f"Total production 2021: {chosen}")
plt.show()

# Line: hourly production per production group in the first month (January, UTC)
jan = pdf[(pdf["priceArea"] == chosen) & (pdf["startTime"].dt.month == 1)]
fig, ax = plt.subplots(figsize=(12, 4))
for grp, sub in jan.groupby("productionGroup"):
    sub = sub.sort_values("startTime")
    ax.plot(sub["startTime"], sub["quantityKwh"], label=grp)
ax.set(title=f"Hourly production, January 2021: {chosen}", xlabel="Time (UTC)", ylabel="kWh")
ax.legend()
plt.show()
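The plot cell selects January in UTC, which differs from Norwegian January by one hour at each boundary (23:00 UTC on 31 January is already 1 February in Oslo). A minimal sketch of an alternative that picks the month in local time and reshapes it to one column per production group, which is the layout `DataFrame.plot` draws as one line per column; `plot_frames` is a hypothetical helper, and it assumes `startTime` is a tz-aware UTC column as in `pdf`.

```python
import pandas as pd


def plot_frames(pdf: pd.DataFrame, area: str, month: int = 1, tz: str = "Europe/Oslo"):
    """Return (yearly totals per production group, wide hourly table for one local-time month)."""
    sub = pdf[pdf["priceArea"] == area].copy()
    # Convert UTC instants to local wall-clock time before picking the month
    sub["localTime"] = sub["startTime"].dt.tz_convert(tz)
    totals = sub.groupby("productionGroup")["quantityKwh"].sum().sort_values(ascending=False)
    in_month = sub[sub["localTime"].dt.month == month]
    # One column per production group; the tz-aware index stays unique across DST changes
    wide = in_month.pivot_table(
        index="localTime", columns="productionGroup", values="quantityKwh", aggfunc="sum"
    )
    return totals, wide
```

Usage: `totals, wide = plot_frames(pdf, chosen)`, then `totals.plot.pie(autopct="%1.1f%%", ylabel="")` and `wide.plot(ylabel="kWh")`.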


## 5. Insert selected data into MongoDB

In [None]:
from pymongo import MongoClient

if not MONGODB_URI:
    raise RuntimeError("Set MONGODB_URI to insert into MongoDB Atlas.")
client = MongoClient(MONGODB_URI)
coll = client[MONGODB_DB][MONGODB_COLLECTION]

# Clear the collection first so re-running the notebook does not duplicate documents
coll.delete_many({})
records = pdf[["priceArea", "productionGroup", "startTime", "quantityKwh"]].copy()
# Store startTime as an ISO-8601 UTC string, e.g. 2021-01-01T00:00:00Z
records["startTime"] = records["startTime"].dt.tz_convert("UTC").dt.strftime("%Y-%m-%dT%H:%M:%SZ")
coll.insert_many(records.to_dict(orient="records"))
coll.count_documents({})
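Storing `startTime` as a string works, but MongoDB also has a native date type that PyMongo writes from timezone-aware `datetime` objects, which enables date range queries. A minimal sketch of that alternative, with inserts split into batches; `to_mongo_docs` and `batches` are hypothetical helpers, and they assume every timestamp carries a UTC offset.

```python
from datetime import datetime, timezone
from itertools import islice


def to_mongo_docs(records):
    """Convert row dicts to BSON-friendly documents with native types and UTC datetimes."""
    docs = []
    for r in records:
        ts = r["startTime"]
        if isinstance(ts, str):
            # fromisoformat accepts a trailing "Z" only from Python 3.11, so normalize it
            ts = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        elif hasattr(ts, "to_pydatetime"):
            ts = ts.to_pydatetime()  # pandas.Timestamp -> datetime.datetime
        if ts.tzinfo is None:
            raise ValueError("startTime must be timezone-aware")
        docs.append({
            "priceArea": str(r["priceArea"]),
            "productionGroup": str(r["productionGroup"]),
            "startTime": ts.astimezone(timezone.utc),  # written as a BSON date
            "quantityKwh": float(r["quantityKwh"]),   # numpy scalars -> Python float
        })
    return docs


def batches(items, size=10_000):
    """Yield lists of at most `size` items so each insert_many call stays small."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch
```

Usage: `for batch in batches(to_mongo_docs(pdf.to_dict(orient="records"))): coll.insert_many(batch)`. When reading back in the Streamlit app, `MongoClient(MONGODB_URI, tz_aware=True)` returns aware UTC datetimes instead of naive ones.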


## 6. AI usage (brief)
Write 2–4 sentences about AI assistance.

_(Fill here in your own words.)_

## 7. Work log (300–500 words)
Describe your experience with the notebook and the Streamlit app, including authentication, DST handling and API period limits.

_(Write 300–500 words here.)_