# <b> <span style="color:white">Electricity Sector Data Streaming & Analysis</span></b>


# <b> <span style="color:white">GROUP 04</span></b>


| Name                   | SID       | Unikey   |
| ---------------------- | --------- | -------- |
| Putu Eka Udiyani Putri | 550067302 | pput0940 |
| Rengga Firmandika      | 550126632 | rfir0117 |
| Vincentius Ansel Suppa | 550206406 | vsup0468 |


## <b> <span style="color:orange">0. Configuration and Import Required Libraries</span></b>


**Quick start:**
1. Project structure:
   
   <pre>
   Assignment2_Tut07_G04/
   ├── Assignment_2.ipynb      # main notebook
   └── requirements.txt        # list of required libraries to run the notebook
   </pre>

   Ensure your working directory is writable.

2. Create venv & install exact dependencies<br/>
   `python -m venv .venv`<br/>
   Windows: `.\.venv\Scripts\activate` | macOS/Linux: `source .venv/bin/activate`<br/>
   `python -m pip install --upgrade pip`<br/>
   `pip install -r requirements.txt`

3. Copy `.env.template` to a file named `.env` and replace `your_api_key` with your actual API key.

4. Run the full pipeline (extract -> clean -> augment -> transform -> load)<br/>


Import all the required libraries first.


In [114]:
from dotenv import load_dotenv
import os

import requests
import pandas as pd
from datetime import datetime, timedelta
import time
import json
import math
import glob
from pathlib import Path
import paho.mqtt.client as mqtt

## <b> <span style="color:orange">1. Data Retrieval</span></b>


In [90]:
# basic configs
# Read variables from the local .env file (created from .env.template) into
# the process environment; without this call os.getenv only sees variables
# that were exported in the shell before the notebook started.
load_dotenv()

# Tolerate keys pasted into .env with surrounding whitespace or quotes.
API_KEY = (os.getenv("OPENELECTRICITY_API_KEY") or "").strip().strip('"').strip("'")
if not API_KEY:
    # Fail early with an actionable message instead of sending an
    # unauthenticated request (or crashing on None.strip()).
    raise RuntimeError(
        "OPENELECTRICITY_API_KEY is not set. Copy .env.template to .env "
        "and replace your_api_key with your actual API key."
    )

BASE_URL = "https://api.openelectricity.org.au/v4/"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Accept": "application/json",
}


# function to fetch data
def fetch_data_from_API(endpoint: str, query_params: dict, timeout: float = 90):
    """GET ``BASE_URL + endpoint`` with the shared auth headers and return the JSON body.

    Args:
        endpoint: path appended to ``BASE_URL`` (e.g. ``"facilities/"``).
        query_params: query-string parameters forwarded to ``requests.get``.
        timeout: seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook forever.

    Returns:
        The decoded JSON payload when the server answers HTTP 200, otherwise
        ``None``. The reason for the failure is printed.
    """
    try:
        response = requests.get(
            f"{BASE_URL}{endpoint}",
            headers=HEADERS,
            params=query_params,
            timeout=timeout,
        )

        print(f"Response status: {response.status_code}")
        print(f"Response url: {response.url}")

        if response.status_code == 200:
            return response.json()

        print(f"API Error {response.status_code}: {response.text}")
        print(f"Response headers: {dict(response.headers)}")

        try:
            error_json = response.json()
            print(f"Error details: {error_json}")
        except ValueError:
            # The error body is not JSON (e.g. an HTML error page); the raw
            # text has already been printed above.
            print("Could not parse error response as JSON")
        return None
    except requests.exceptions.RequestException as e:
        # Connection errors, timeouts and invalid JSON on a 200 response.
        print(f"Request failed: {e}")
        return None

# helper function to save dataset
def save_dataset(df: pd.DataFrame, out_csv_path: str):
    """Write *df* to *out_csv_path* as CSV without the index, creating parent folders as needed."""
    target = Path(out_csv_path)
    target.parent.mkdir(exist_ok=True, parents=True)
    df.to_csv(target, index=False)
    print(f"Saved: {target}")

### <b> <span style="color:pink">1.1 Get All Facilities in NEM Region</span></b>


In [78]:
# set endpoint and params
ENDPOINT = "facilities/"
PARAMS = {
    'network_id': 'NEM',
}

# fetch facilities data
facilities = fetch_data_from_API(endpoint=ENDPOINT, query_params=PARAMS)
# fetch_data_from_API returns None on any failure; stop here with a clear
# message instead of "'NoneType' object is not subscriptable".
if facilities is None:
    raise RuntimeError("Failed to fetch NEM facilities; see the API error printed above.")
facilities_df = pd.json_normalize(facilities['data'])
facilities_df.head()

Response status: 200
Response url: https://api.openelectricity.org.au/v4/facilities/?network_id=NEM


Unnamed: 0,code,name,network_id,network_region,description,units,updated_at,created_at,location.lat,location.lng
0,ADP,Adelaide Desalination,NEM,SA1,"<p>The Adelaide Desalination plant (ADP), form...","[{'code': 'ADPPV1', 'fueltech_id': 'solar_util...",2025-08-05T06:08:12Z,2023-10-18T04:34:30Z,-35.096948,138.484061
1,ALDGASF,Aldoga,NEM,QLD1,<p>The Aldoga Solar Farm will be approximately...,"[{'code': 'ALDGASF1', 'fueltech_id': 'solar_ut...",2025-03-25T00:52:44Z,2025-01-31T04:19:33Z,-23.839544,151.0849
2,AMCORGR,Amcor Glass,NEM,SA1,<p></p>,"[{'code': 'AMCORGR', 'fueltech_id': 'distillat...",2023-10-18T04:34:32Z,2023-10-18T04:34:32Z,-34.882663,138.577975
3,ANGASTON,Angaston,NEM,SA1,<p>Angaston Power Station is a diesel-powered ...,"[{'code': 'ANGAS1', 'fueltech_id': 'distillate...",2025-09-07T01:53:13Z,2023-10-18T04:34:32Z,-34.503948,139.024296
4,APS,Anglesea,NEM,VIC1,<p>The Anglesea Power Station was a brown coal...,"[{'code': 'APS', 'fueltech_id': 'coal_brown', ...",2024-11-04T00:41:34Z,2023-10-18T04:34:32Z,-38.389031,144.180589


### <b> <span style="color:pink">1.2 Get All Power Generated and CO2 Emissions per Facility</span></b>


If no facility code is specified in the API call, the facility data endpoint returns aggregated totals instead of per-facility data. Hence, we need to pass the facility codes obtained from the previous API call to this endpoint.

In [99]:
# collect every facility code returned by the facilities endpoint
FACILITY_LIST = facilities_df.loc[:, "code"].tolist()
print("Total facilities: " + str(len(FACILITY_LIST)))

Total facilities: 514


However, the facility code parameter only accepts up to 30 characters, and we have more than 500 facilities, so passing all facility codes at once results in an error. To get around this, we use a batching strategy and request a small batch of facilities per API call.

In [103]:
# where the raw JSON responses of every batch are cached
OUT_DIR = "./DATA/EXTRACTED"
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)
# per-facility time-series endpoint for the NEM network
ENDPOINT = "data/facilities/NEM"
batch_size = 5

# split the facility code list into batches
def chunk_list(lst, n):
    """Yield ``(chunk, start, stop)`` for consecutive slices of *lst* of length *n*.

    ``start`` is the 0-based index of the chunk's first element. ``stop`` is
    the exclusive end index, clamped to ``len(lst)``, so the final chunk may
    be shorter than *n*.
    """
    for lo in range(0, len(lst), n):
        hi = lo + n
        yield lst[lo:hi], lo, min(hi, len(lst))

# batch retrieval
def batch_retrieval(batch_size: int, facility_codes=None,
                    date_start: str = "2025-10-01", date_end: str = "2025-10-08"):
    """Download 5-minute power and emissions series for facilities, one batch per request.

    Facility codes are split into batches of ``batch_size``. Each batch is
    requested from ``{BASE_URL}{ENDPOINT}``, and the raw JSON response is
    cached as ``OUT_DIR/batch_<first>_<last>.json``, where first and last are
    1-based inclusive positions in the code list.

    Batches whose cache file already exists are skipped, so re-running the
    function resumes an interrupted download. A failed batch is reported and
    no cache file is written for it, so the next run retries it.

    Args:
        batch_size: number of facility codes sent per request.
        facility_codes: codes to fetch; defaults to ``FACILITY_LIST``.
        date_start: value sent as the ``date_start`` query parameter.
        date_end: value sent as the ``date_end`` query parameter.
    """
    if facility_codes is None:
        facility_codes = FACILITY_LIST

    for batch_id, (batch, start, end) in enumerate(chunk_list(facility_codes, batch_size), start=1):
        cache_path = os.path.join(OUT_DIR, f"batch_{start+1:04d}_{end:04d}.json")

        if os.path.exists(cache_path):
            print(f"Batch {batch_id}: {start+1}-{end} already cached.")
            continue

        print(f"Fetching batch {batch_id}: facilities {start+1}–{end} ({batch})")

        # Lists (not sets) keep the query-string order deterministic, so the
        # same batch always yields the same request URL. dict.fromkeys drops
        # duplicate codes while preserving their order.
        params = {
            'network_code': 'NEM',
            'metrics': ['power', 'emissions'],
            'interval': '5m',
            "date_start": date_start,
            "date_end": date_end,
            "facility_code": list(dict.fromkeys(batch)),
        }

        try:
            r = requests.get(f"{BASE_URL}{ENDPOINT}", headers=HEADERS, params=params, timeout=90)
            if r.status_code == 200:
                payload = r.json()
                # Write to a temporary file, then rename it into place. An
                # interrupted write therefore never leaves a truncated file
                # that later runs would treat as a valid cached batch.
                tmp_path = cache_path + ".tmp"
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(payload, f, indent=2)
                os.replace(tmp_path, cache_path)
                print(f"Saved {cache_path}")
            else:
                print(f"HTTP {r.status_code}: {r.text[:150]}")
        except (requests.exceptions.RequestException, ValueError, OSError) as e:
            # Best effort: report the failure and continue with the next batch.
            print(f"Batch {batch_id}: {e}")

        # short pause between consecutive API requests
        time.sleep(0.3)

# function to flatten the payload
def flatten(payload):
    """Expand a cached facility-data payload into one flat dict per data point.

    Every ``[timestamp, value]`` pair under ``payload["data"][*]["results"][*]["data"]``
    becomes a row with the keys ``timestamp``, ``unit_code``, ``metric``,
    ``interval``, ``unit`` and ``value``. ``unit_code`` comes from the
    result's ``columns`` mapping (``None`` when absent). The other metadata
    comes from the enclosing data block.
    """
    flat_rows = []
    for series_block in payload.get("data", []):
        metric_name = series_block.get("metric")
        unit_label = series_block.get("unit")
        step = series_block.get("interval")
        for result in series_block.get("results", []):
            code = (result.get("columns") or {}).get("unit_code")
            flat_rows.extend(
                {
                    "timestamp": stamp,
                    "unit_code": code,
                    "metric": metric_name,
                    "interval": step,
                    "unit": unit_label,
                    "value": reading,
                }
                for stamp, reading in result.get("data", [])
            )
    return flat_rows


In [None]:
# Retrieve all facility data, 5 facilities per API call (batches already cached on disk are skipped)
batch_retrieval(batch_size=5)

Batch 1: 1-5 already cached.
Batch 2: 6-10 already cached.
Batch 3: 11-15 already cached.
Batch 4: 16-20 already cached.
Batch 5: 21-25 already cached.
Batch 6: 26-30 already cached.
Batch 7: 31-35 already cached.
Batch 8: 36-40 already cached.
Batch 9: 41-45 already cached.
Batch 10: 46-50 already cached.
Batch 11: 51-55 already cached.
Batch 12: 56-60 already cached.
Batch 13: 61-65 already cached.
Batch 14: 66-70 already cached.
Batch 15: 71-75 already cached.
Batch 16: 76-80 already cached.
Batch 17: 81-85 already cached.
Batch 18: 86-90 already cached.
Batch 19: 91-95 already cached.
Batch 20: 96-100 already cached.
Batch 21: 101-105 already cached.
Batch 22: 106-110 already cached.
Batch 23: 111-115 already cached.
Batch 24: 116-120 already cached.
Batch 25: 121-125 already cached.
Batch 26: 126-130 already cached.
Batch 27: 131-135 already cached.
Batch 28: 136-140 already cached.
Batch 29: 141-145 already cached.
Batch 30: 146-150 already cached.
Batch 31: 151-155 already cache

## <b> <span style="color:orange">2. Data Integration and Caching</span></b>


Some facilities have more than one unit, so we build separate facility and unit tables for easier analysis.

In [None]:
# function to separate the facilities and units rows
def build_tables(facilities: list[dict]):
    """Split raw facility records into a facility table and a unit lookup table.

    Args:
        facilities: facility dicts as returned under ``"data"`` by the
            ``facilities/`` endpoint. Each may carry a ``location`` dict
            (``lat``/``lng``) and a ``units`` list; either may be missing or
            ``None``. Passing ``None`` for the whole argument is treated as
            an empty list.

    Returns:
        ``(facilities_df, units_lookup_df)``:
        * ``facilities_df`` has one row per distinct ``facility_code``.
        * ``units_lookup_df`` has one row per distinct ``unit_code``, with
          the ``facility_code`` it belongs to.
        Duplicates keep their first occurrence. Both frames always have
        their full column set, even when there are no rows.
    """
    facility_columns = [
        "facility_code", "facility_name", "network_id", "network_region",
        "lat", "lng", "created_at", "updated_at",
    ]
    unit_columns = [
        "unit_code", "facility_code", "fueltech_id", "status_id", "dispatch_type",
        "capacity_registered", "capacity_maximum", "capacity_storage",
        "data_first_seen", "data_last_seen", "unit_created_at", "unit_updated_at",
    ]
    facility_rows: list[dict] = []
    unit_rows: list[dict] = []

    for f in facilities or []:
        f_code = f.get("code")
        location = f.get("location") or {}
        facility_rows.append({
            "facility_code": f_code,
            "facility_name": f.get("name"),
            "network_id": f.get("network_id"),
            "network_region": f.get("network_region"),
            "lat": location.get("lat"),
            "lng": location.get("lng"),
            "created_at": f.get("created_at"),
            "updated_at": f.get("updated_at"),
        })

        # A facility can own several units; each unit becomes its own lookup
        # row pointing back to the facility.
        for u in f.get("units") or []:
            unit_rows.append({
                "unit_code": u.get("code"),
                "facility_code": f_code,
                "fueltech_id": u.get("fueltech_id"),
                "status_id": u.get("status_id"),
                "dispatch_type": u.get("dispatch_type"),
                "capacity_registered": u.get("capacity_registered"),
                "capacity_maximum": u.get("capacity_maximum"),
                "capacity_storage": u.get("capacity_storage"),
                "data_first_seen": u.get("data_first_seen"),
                "data_last_seen": u.get("data_last_seen"),
                "unit_created_at": u.get("created_at"),
                "unit_updated_at": u.get("updated_at"),
            })

    # Explicit columns keep the schema (and the drop_duplicates subset) valid
    # when there are no rows; pd.DataFrame([]) would have no columns at all.
    facilities_df = (
        pd.DataFrame(facility_rows, columns=facility_columns)
        .drop_duplicates(subset=["facility_code"])
        .reset_index(drop=True)
    )
    units_lookup_df = (
        pd.DataFrame(unit_rows, columns=unit_columns)
        .drop_duplicates(subset=["unit_code"])
        .reset_index(drop=True)
    )

    return facilities_df, units_lookup_df


Save the tables into separate csv files.

In [43]:
# split the raw facility payload into a facility table and a unit lookup table
facilities_df, units_facilities_df = build_tables(facilities=facilities["data"])

# persist both tables as CSV files
save_dataset(df=facilities_df, out_csv_path="DATA/EXTRACTED/electricity_facilities.csv")
save_dataset(df=units_facilities_df, out_csv_path="DATA/EXTRACTED/electricity_units_facilities.csv")

Saved: DATA\EXTRACTED\electricity_facilities.csv
Saved: DATA\EXTRACTED\electricity_units_facilities.csv


For the power and emissions data per facility, we need some pre-processing before storing it in a cached CSV file. Specifically, we need to:
1. Combine all cached .json data into one dataframe.
2. Sum the unit-level data to get total power and emissions per facility (some facilities have more than one unit).
3. Append additional information to each facility (e.g. lat, lng, facility_name, etc.).

In [None]:
# combine all json cache into one dataframe
records = []
# sorted() gives a reproducible file (and hence row) order across runs and OSes
for path in sorted(glob.glob("./DATA/EXTRACTED/*.json")):
    # the with-block closes each cache file as soon as it has been parsed
    with open(path, encoding="utf-8") as fh:
        payload = json.load(fh)

    records.extend(flatten(payload))

series_df = pd.DataFrame(records)

# attach the owning facility to every unit-level row
lookup = pd.read_csv("./DATA/EXTRACTED/electricity_units_facilities.csv")[["unit_code", "facility_code"]]
series_df = series_df.merge(lookup, on="unit_code", how="left")

# Sum units into facility totals.
# - Rows whose unit_code is missing from the lookup get a NaN facility_code
#   and are left out by groupby (dropna=True by default).
# - min_count=1 keeps a facility/timestamp value NaN when none of its units
#   reported data, instead of turning missing data into a misleading 0.
facility_df = (
    series_df.groupby(["timestamp", "facility_code", "metric"], as_index=False)["value"]
    .sum(min_count=1)
)

# enrich each facility row with its name, network and coordinates
facilities_df = pd.read_csv("./DATA/EXTRACTED/electricity_facilities.csv")[
    ["facility_code", "facility_name", "network_id", "network_region", "lat", "lng"]
]
facility_df = facility_df.merge(facilities_df, on="facility_code", how="left")

# reorder columns for clarity
facility_df = facility_df[
    [
        "timestamp", "facility_code", "facility_name", "network_id", "network_region",
        "lat", "lng", "metric", "value"
    ]
]


In [113]:
# persist the consolidated facility-level time series
save_dataset(df=facility_df, out_csv_path="DATA/EXTRACTED/consolidated_facilities.csv")

Saved: DATA\EXTRACTED\consolidated_facilities.csv


## <b> <span style="color:orange">3. Data Publishing via MQTT</span></b>


In [None]:
# MQTT Config
BROKER = "172.17.34.107"
PORT = 1883
TOPIC = "COMP5339/facilities/rengga"


# define what happens upon connection to the server
def on_connect(client, userdata, connect_flags, reason_code, properties):
    """Report the connection result and subscribe only when the broker accepted us."""
    print("Connected with result code " + str(reason_code))
    if reason_code.is_failure:
        # Nothing to subscribe to if the broker rejected the connection.
        return
    # NOTE(review): this subscription topic differs from TOPIC above; confirm
    # which topic the group is meant to use.
    client.subscribe("COMP5339/ASSIGNMENT2/TUT07GR04")


# define what happens upon receiving a message from the server
def on_message(client, userdata, msg):
    """Print every message delivered on a subscribed topic."""
    print(f"Received message on topic {msg.topic}: {msg.payload}")


# setup client
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
# Register the handler so received messages are actually printed.
client.on_message = on_message
client.connect(BROKER, PORT, 60)
# network loop runs in a background thread so the notebook stays usable
client.loop_start()

df = pd.read_csv("DATA/EXTRACTED/consolidated_facilities.csv")



## <b> <span style="color:orange">4. Dashboard and Visualisation</span></b>
