# Assignment 3
Sigvard Bratlie

In [4]:
import pandas as pd
import requests
import matplotlib.pyplot as plt
import numpy as np
from pymongo import MongoClient
import os
from typing import Literal
import plotly.graph_objects as go
from plotly.subplots import make_subplots


In [5]:
def get_period(month : int,year : int,dataset : str,entity : str="price-areas") -> dict:
    '''
    A function to fetch a single month's data from the Elhub API.

    Args:
        month (int): The month for which to fetch data (1-12).
    Returns:
        dict: The JSON response from the API.
    '''
    entity = "price-areas" #Fetched per price areas
    dataset = dataset #Dataset to fetch
    base_url = f"https://api.elhub.no/energy-data/v0/{entity}" #Base URL for the API
    year = year
    end_year = year
    end_month = month+1
    if month == 12:
        end_year += 1
        end_month = 1
    params = { #Parameters for the API request
        "dataset": dataset,
        "startDate": f"{year}-{str(month).zfill(2)}-01", #using zfill to ensure two digits
        "endDate": f"{end_year}-{str(end_month).zfill(2)}-01" 
    }
    try:
        response = requests.get(base_url, params=params)
        response.raise_for_status()  # Raise an error for HTTP errors
        return response.json()
    except requests.RequestException as e:
        print(f"Error fetching data: {e}")

def ext_single(data : dict,dataset : str) -> list:
    '''
    A function to extract the productionPerGroupMbaHour list from the API response.

    Args:
        data (dict): The JSON response from the API.
    Returns:
        list: A list of productionPerGroupMbaHour entries.    
    '''
    ext_data = [] #tmp list to hold the data for a single month
    dataset = "".join(
                    x.lower() if idx == 0 else x.capitalize()
                    for idx, x in enumerate(dataset.lower().split("_"))
                    )
    for price_area in data.get("data"):
        attrs = price_area.get("attributes")
        if attrs:
            ext_data.extend(attrs.get(dataset))
    return ext_data

def fetch_and_extract_all(start_year: int, end_year: int,dataset : str) -> pd.DataFrame:
    '''
    A function to fetch and extract data for all months of the year.

    Returns:
        pd.DataFrame: A DataFrame containing all the extracted data.
    '''
    #Loops trough all months, fetches and extracts the data, and combines it into a single DataFrame
    if end_year < start_year:
        raise ValueError("end_year must be greater than or equal to start_year")
    all_data = []
    for year in range(start_year, end_year + 1):
        for month in range(1, 12 + 1):
            data = get_period(month = month, year = year, dataset=dataset) #defined above
            if data:
                ext_data = ext_single(data = data, dataset = dataset) #defined above
                all_data.extend(ext_data)

    df = pd.DataFrame(all_data)
    return df

In [80]:
df_prod = fetch_and_extract_all(start_year = 2022, end_year = 2024,dataset="PRODUCTION_PER_GROUP_MBA_HOUR")

In [51]:
df_cons = fetch_and_extract_all(start_year = 2021, end_year = 2024,dataset="CONSUMPTION_PER_GROUP_MBA_HOUR")

In [81]:
def convert_to_utc(df):
    '''
    A function to convert the time columns to datetime format with UTC timezone.

    Args:
        df (pd.DataFrame): The DataFrame containing the time columns.
    Returns:
        pd.DataFrame: The DataFrame with converted time columns.
    '''
    #Convert the time columns to datetime format
    df["endTime"] = pd.to_datetime(df["endTime"], errors='coerce',utc= True) #utc = True to avoid timezone issues
    df["startTime"] = pd.to_datetime(df["startTime"],errors='coerce',utc = True)
    df["lastUpdatedTime"] = pd.to_datetime(df["lastUpdatedTime"],errors='coerce',utc = True)
    return df

df_prod = convert_to_utc(df_prod)
df_cons = convert_to_utc(df_cons)

In [82]:
df_prod.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 657600 entries, 0 to 657599
Data columns (total 6 columns):
 #   Column           Non-Null Count   Dtype              
---  ------           --------------   -----              
 0   endTime          657600 non-null  datetime64[ns, UTC]
 1   lastUpdatedTime  657600 non-null  datetime64[ns, UTC]
 2   priceArea        657600 non-null  object             
 3   productionGroup  657600 non-null  object             
 4   quantityKwh      657600 non-null  float64            
 5   startTime        657600 non-null  datetime64[ns, UTC]
dtypes: datetime64[ns, UTC](3), float64(1), object(2)
memory usage: 30.1+ MB


In [83]:
df_cons.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 876600 entries, 0 to 876599
Data columns (total 7 columns):
 #   Column              Non-Null Count   Dtype              
---  ------              --------------   -----              
 0   consumptionGroup    876600 non-null  object             
 1   endTime             876600 non-null  datetime64[ns, UTC]
 2   lastUpdatedTime     876600 non-null  datetime64[ns, UTC]
 3   meteringPointCount  876600 non-null  int64              
 4   priceArea           876600 non-null  object             
 5   quantityKwh         876600 non-null  float64            
 6   startTime           876600 non-null  datetime64[ns, UTC]
dtypes: datetime64[ns, UTC](3), float64(1), int64(1), object(2)
memory usage: 46.8+ MB


In [84]:
print(f'Production data from {df_prod["startTime"].min()} to {df_prod["endTime"].max()}') #Note: UTC, therefore deviant times
print(f'Consumption data from {df_cons["startTime"].min()} to {df_cons["endTime"].max()}')

Production data from 2021-12-31 23:00:00+00:00 to 2024-12-31 23:00:00+00:00
Consumption data from 2020-12-31 23:00:00+00:00 to 2024-12-31 23:00:00+00:00


In [85]:
#drop endtime and lastupdatedtime columns to match with existing data
df_prod_spark = df_prod.drop(columns=["endTime","lastUpdatedTime"],errors='ignore')
df_cons_spark = df_cons.drop(columns=["endTime","lastUpdatedTime"],errors='ignore')

### Cassandra & Spark

In [57]:
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession

#!docker start my_cassandra
#wait for cassandra container to start

cluster = Cluster(["localhost"], port=9042) #defining the cassandra cluster
session = cluster.connect() #connection to local cassandra container

In [59]:
# !which java #to ensure java is installed
# !java -version #to check java version
spark = SparkSession.builder.appName('SparkCassandraApp').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.cassandra.connection.port', '9042').\
    config("spark.driver.extraJavaOptions", "-Dhadoop.security.subject.provider.backend=hadoop").\
    getOrCreate() #connecting spark and cassandra
print(spark.version) #checking spark version

3.5.1


In [60]:
def mk_sdf(df: pd.DataFrame):
    sdf = spark.createDataFrame(df) #creating a spark dataframe from the pandas dataframe
    sdf.printSchema() #printing the schema
    print("\n\n")
    return sdf
sdf_prod = mk_sdf(df_prod_spark)
sdf_cons = mk_sdf(df_cons_spark)

root
 |-- priceArea: string (nullable = true)
 |-- productionGroup: string (nullable = true)
 |-- quantityKwh: double (nullable = true)
 |-- startTime: timestamp (nullable = true)




root
 |-- consumptionGroup: string (nullable = true)
 |-- meteringPointCount: long (nullable = true)
 |-- priceArea: string (nullable = true)
 |-- quantityKwh: double (nullable = true)
 |-- startTime: timestamp (nullable = true)






In [66]:
#Building the CQL query to create an empty table
def build_query(df,dataset:str="prod_data",primary_keys : list = ["startTime","priceArea"]):
    '''
    A function to build a CQL query to create a table based on the DataFrame schema.

    Args:
        df (pd.DataFrame): The DataFrame for which to build the CQL query.
    Returns:
        str: The CQL query string.
    '''
    sql_query = f'CREATE TABLE IF NOT EXISTS elhub.{dataset} ('
    for name,dtype in df.dtypes.items():
        if dtype == 'object':
            dtype = 'text'
        elif dtype == 'int64':
            dtype = 'int'
        elif dtype == 'float64':
            dtype = 'float'
        elif dtype == 'datetime64[ns, UTC]':
            dtype = 'timestamp'
        else:
            dtype = 'text'  #default to text if unknown type
        sql_query += f"{name} {dtype}, "

    sql_query += f"PRIMARY KEY ({', '.join(primary_keys)}));"
    return sql_query

In [86]:
query_cons = build_query(df_cons_spark,dataset="cons_data")
print(query_cons)

CREATE TABLE IF NOT EXISTS elhub.cons_data (consumptionGroup text, meteringPointCount int, priceArea text, quantityKwh float, startTime timestamp, PRIMARY KEY (startTime, priceArea));


In [None]:
session.execute(query_cons)

<cassandra.cluster.ResultSet at 0x35b5b6010>

In [70]:
#ADD PRODUCTION DATA
sdf = sdf_prod.toDF(*[c.lower() for c in sdf_prod.columns]) #lower column names

sdf.write.format("org.apache.spark.sql.cassandra")\
   .options(table="prod_data", keyspace="elhub")\
   .mode("append").save() #writing data to cassandra table

25/11/12 15:38:47 WARN TaskSetManager: Stage 15 contains a task of very large size (3009 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [71]:
#ADD CONSUMPTION DATA
sdf = sdf_cons.toDF(*[c.lower() for c in sdf_cons.columns]) #lower column names

sdf.write.format("org.apache.spark.sql.cassandra")\
   .options(table="cons_data", keyspace="elhub")\
   .mode("append").save() #writing data to cassandra table

25/11/12 15:39:00 WARN TaskSetManager: Stage 16 contains a task of very large size (3525 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [72]:
spark.read.format("org.apache.spark.sql.cassandra")\
    .options(table="prod_data",keyspace="elhub").load().show() #verifying data is written

print("=" * 100 + "\n\n")
spark.read.format("org.apache.spark.sql.cassandra")\
    .options(table="cons_data",keyspace="elhub").load().show() #verifying data is written

+-------------------+---------+---------------+-----------+
|          starttime|pricearea|productiongroup|quantitykwh|
+-------------------+---------+---------------+-----------+
|2023-05-24 00:00:00|      NO1|           wind|  2693058.8|
|2023-05-24 00:00:00|      NO2|           wind|  4704706.0|
|2023-05-24 00:00:00|      NO3|           wind|  1994079.1|
|2023-05-24 00:00:00|      NO4|           wind|  1441569.6|
|2023-05-24 00:00:00|      NO5|          solar|  3130841.8|
|2021-05-06 07:00:00|      NO1|           wind|  1875766.8|
|2021-05-06 07:00:00|      NO2|           wind|  7956765.5|
|2021-05-06 07:00:00|      NO3|           wind|  2363898.8|
|2021-05-06 07:00:00|      NO4|           wind|  3057914.2|
|2021-05-06 07:00:00|      NO5|        thermal|  3703828.8|
|2023-05-23 17:00:00|      NO1|           wind|  2610378.2|
|2023-05-23 17:00:00|      NO2|           wind|  3653528.5|
|2023-05-23 17:00:00|      NO3|           wind|  2495928.2|
|2023-05-23 17:00:00|      NO4|         

In [74]:
def mk_view(dataset = "prod_data"):
    spark.read.format("org.apache.spark.sql.cassandra")\
        .options(table=dataset,keyspace="elhub").load()\
        .createOrReplaceTempView(f"{dataset}_view") #create view
    
mk_view(dataset="prod_data")
mk_view(dataset="cons_data")

query_prod = "SELECT pricearea,productiongroup,starttime,quantitykwh FROM prod_data_view;"
query_cons = "SELECT consumptiongroup ,meteringpointcount ,pricearea ,quantitykwh ,starttime FROM cons_data_view;"
prod_view = spark.sql(query_prod).toPandas() #read view into pandas
cons_view = spark.sql(query_cons).toPandas() #read view into pandas

In [75]:
print(prod_view["starttime"].min(), prod_view["starttime"].max())
print(cons_view["starttime"].min(), cons_view["starttime"].max())

2021-01-01 00:00:00 2024-12-31 23:00:00
2021-01-01 00:00:00 2024-12-31 23:00:00


### MongoDB

In [76]:
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import os
from dotenv import load_dotenv
load_dotenv() #loading the mongo db password as an environment variable

True

In [77]:
def get_database():
 
   # Connection string from MongoDB
   CONNECTION_STRING = f"mongodb+srv://sigvardbratlie:{os.getenv('MONGODB_PASSWORD')}@cluster0.y7mplij.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
 
   # Create a connection using MongoClient. 
   client = MongoClient(CONNECTION_STRING)
   try:
      client.admin.command("ping")
      print(f'Everything Okay') #print if connection is successful
      return client
   except Exception as e:
     print(e)

client = get_database()
db = client["elhub"] #create database
prod_data = db["prod_data"] #create collection
cons_data = db["cons_data"] #create collection

Everything Okay


In [None]:
prod_data.insert_many(prod_view.to_dict('records')) # Append data 22-24 into collection, already contains 21
cons_data.insert_many(cons_view.to_dict('records')) # Innsert data 21-24 into collection

InsertManyResult([ObjectId('69149d80df88ad38f4041cb6'), ObjectId('69149d80df88ad38f4041cb7'), ObjectId('69149d80df88ad38f4041cb8'), ObjectId('69149d80df88ad38f4041cb9'), ObjectId('69149d80df88ad38f4041cba'), ObjectId('69149d80df88ad38f4041cbb'), ObjectId('69149d80df88ad38f4041cbc'), ObjectId('69149d80df88ad38f4041cbd'), ObjectId('69149d80df88ad38f4041cbe'), ObjectId('69149d80df88ad38f4041cbf'), ObjectId('69149d80df88ad38f4041cc0'), ObjectId('69149d80df88ad38f4041cc1'), ObjectId('69149d80df88ad38f4041cc2'), ObjectId('69149d80df88ad38f4041cc3'), ObjectId('69149d80df88ad38f4041cc4'), ObjectId('69149d80df88ad38f4041cc5'), ObjectId('69149d80df88ad38f4041cc6'), ObjectId('69149d80df88ad38f4041cc7'), ObjectId('69149d80df88ad38f4041cc8'), ObjectId('69149d80df88ad38f4041cc9'), ObjectId('69149d80df88ad38f4041cca'), ObjectId('69149d80df88ad38f4041ccb'), ObjectId('69149d80df88ad38f4041ccc'), ObjectId('69149d80df88ad38f4041ccd'), ObjectId('69149d80df88ad38f4041cce'), ObjectId('69149d80df88ad38f4041c

In [97]:
res_prod = prod_data.aggregate([
    {"$group" : {
        "_id" : None,
        "min" : {"$min" : "$starttime"},
        "max" : {"$max" : "$starttime"},    
    }}])
for doc in res_prod:
    print(f'Production data from {doc["min"]} to {doc["max"]}')

Production data from 2021-01-01 00:00:00 to 2024-12-31 23:00:00


In [98]:
res_prod = cons_data.aggregate([
    {"$group" : {
        "_id" : None,
        "min" : {"$min" : "$starttime"},
        "max" : {"$max" : "$starttime"},    
    }}])
for doc in res_prod:
    print(f'Production data from {doc["min"]} to {doc["max"]}')

Production data from 2021-01-01 00:00:00 to 2024-12-31 23:00:00


In [110]:
import datetime
d = datetime.date(2024, 12, 31)

datetime.datetime(2024, 12, 31, 0, 0)

## AI Usage

Used for content refinement (spelling, syntax debugging) and structuring/drafting log entries. \
Especially helpful for issues related to cassandra-spark connection errors.

Specifically for this CA4, AI was used to for syntax look-ups regarding plotly, geosjon and FolioMap. 

## Project Log (300-500 words)

This assignment built upon previous work by introducing advanced statistical methods for analyzing Norwegian electricity production and meteorological data. The work was divided between Jupyter Notebook development and Streamlit application deployment.

**Jupyter Notebook Development:**


**Streamlit Application:**


## Links

**GitHub Repository:**
https://github.com/sigvardbratlie/ind320 \

**Streamlit App:**
https://ind320-h63n5qj5uc26acyzlq3x39.streamlit.app/ 



---