# Project work, part 4
Sara Hørte

### Links

GitHub repo link: 
https://github.com/sarahorte/ind320project.git

Streamlit app link: 
https://ind320project.streamlit.app/

### Bonus exercise: 
Snow drift

### Log

The first thing I did was fetch the data: production data for 2022–2024 and consumption data for 2021–2024. I had some trouble with the database connections and with setting up the MongoDB collections correctly, but eventually got it working. Unfortunately, I accidentally deleted the existing 2021 data in MongoDB, so I had to go back to CA2 and reload it. After that, I adjusted the code so it would no longer clear the collection before inserting new data. Later, I discovered that some column names were not in lowercase, which caused many null values in the database. I fixed the naming issues and downloaded all the data again using consistent lowercase column names.
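Null values like these are a typical symptom of a column-name mismatch: the Elhub API returns camelCase fields (`priceArea`, `quantityKwh`), and looking them up by lowercase names can silently yield missing values instead of an error. A minimal sketch of a guard, assuming a pandas DataFrame and the lowercase names used later in this notebook; `normalize_columns` and `REQUIRED` are hypothetical names:

```python
import pandas as pd

# Hypothetical list of columns every downstream step expects
REQUIRED = ["pricearea", "groupname", "starttime", "quantitykwh"]

def normalize_columns(df: pd.DataFrame, required=REQUIRED) -> pd.DataFrame:
    """Strip and lowercase column names, then fail loudly if a required column is missing."""
    out = df.copy()
    out.columns = [str(c).strip().lower() for c in out.columns]
    missing = [c for c in required if c not in out.columns]
    if missing:
        raise KeyError(f"Missing required columns: {missing}")
    return out

# A camelCase frame selected by lowercase names yields only NaNs...
raw = pd.DataFrame({"priceArea": ["NO1"], "groupName": ["hydro"],
                    "startTime": ["2024-01-01T00:00:00Z"], "quantityKwh": [1.0]})
print(raw.reindex(columns=REQUIRED).isna().all().all())  # True
# ...while normalizing first keeps the values.
print(normalize_columns(raw)["quantitykwh"].tolist())    # [1.0]
```

Raising a `KeyError` early makes a naming problem visible before any data reaches the database.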

Next, I updated the spectrogram and weather plots to use Plotly instead of Matplotlib.

Then I started implementing the map page. I downloaded the GeoJSON data for the price areas and used the lecture code as a starting point. After that, I added UI elements to let the user choose an energy production/consumption group and a time interval (in days). The map then colours the price areas based on the mean values for the selected settings.
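The aggregation behind the map can be sketched as follows, assuming a long-format DataFrame with the columns used in this notebook (`pricearea`, `groupname`, `starttime`, `quantitykwh`) and an interval defined as the last `days` days up to a chosen end time. `mean_by_area` is a hypothetical helper, not the app's actual code. The result has one value per price area, which can then be matched to the GeoJSON features by price-area name to set the fill colour.

```python
import pandas as pd

def mean_by_area(df: pd.DataFrame, groupname: str, end: pd.Timestamp, days: int) -> pd.Series:
    """Mean quantitykwh per price area for one group over the `days` days ending at `end`."""
    start = end - pd.Timedelta(days=days)
    mask = (
        (df["groupname"] == groupname)
        & (df["starttime"] > start)
        & (df["starttime"] <= end)
    )
    return df.loc[mask].groupby("pricearea")["quantitykwh"].mean()

# Tiny synthetic example: two price areas, three days of hourly hydro values
times = pd.date_range("2024-01-01", periods=72, freq="h", tz="UTC")
df = pd.DataFrame({
    "pricearea": ["NO1"] * 72 + ["NO5"] * 72,
    "groupname": "hydro",
    "starttime": list(times) * 2,
    "quantitykwh": [float(i) for i in range(72)] + [3.0] * 72,
})
print(mean_by_area(df, "hydro", times[-1], days=1))
```

Only the last 24 hours are included, so NO1 averages hours 48 to 71 and NO5 stays at 3.0.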

After the map, I moved on to the snow drift task. I used the Snow_drift module with some modifications. For the bonus exercise, I plotted the monthly snow drift together with the seasonal snow drift. I struggled with aligning the time frames and making the plot work correctly. My solution was to repeat each seasonal average 12 times so that it matches the monthly data, and then adjust the visuals. The plot includes one extra year before the selected period for the seasonal drift, and one extra year after it for both the monthly and the seasonal drift. This was harder than expected. The plot should now be correct, apart from these first and last years, which lie outside the period actually being investigated.
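Instead of repeating the seasonal value 12 times by hand, each month can be labelled with its season and the seasonal statistic broadcast back with `groupby(...).transform`. A minimal sketch, assuming a July-to-June snow season and a monthly pandas Series with a DatetimeIndex; the helper names are hypothetical, and the snow drift values themselves would come from the Snow_drift module:

```python
import pandas as pd

def season_of(ts: pd.Timestamp) -> int:
    """Starting year of the snow season, assuming seasons run July-June."""
    return ts.year if ts.month >= 7 else ts.year - 1

def align_seasonal(monthly: pd.Series, how: str = "mean") -> pd.DataFrame:
    """Return monthly values next to their season's aggregate, repeated for every month."""
    season = pd.Series([season_of(ts) for ts in monthly.index], index=monthly.index)
    seasonal = monthly.groupby(season).transform(how)  # same index as `monthly`
    return pd.DataFrame({"monthly": monthly, "season": season, "seasonal": seasonal})

# Two full seasons of synthetic monthly values
monthly = pd.Series(range(24), index=pd.date_range("2021-07-01", periods=24, freq="MS"), dtype=float)
print(align_seasonal(monthly).head(3))
```

A partial first or last season simply gets the aggregate of the months available, which makes such edge years easy to spot or drop before plotting.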

Next, I implemented the sliding window correlation with selectable lag and window size. Using the lecture material and some help from ChatGPT, this part went fairly smoothly. I experimented with the controls to see whether I could spot changes in the correlations under normal conditions and during or after extreme weather events. The findings are reported later in this notebook.
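The core of a sliding window correlation with lag can be expressed with pandas `shift` and `rolling(...).corr`. A minimal sketch, assuming two hourly Series on a shared index; with this sign convention a positive `lag` pairs the weather value at time t - lag with energy at time t, i.e. weather leading energy. The function name is hypothetical, and the app's actual implementation may differ.

```python
import numpy as np
import pandas as pd

def sliding_window_corr(weather: pd.Series, energy: pd.Series, window: int, lag: int) -> pd.Series:
    """Rolling Pearson correlation between lagged weather and energy over `window` steps."""
    return weather.shift(lag).rolling(window=window).corr(energy)

# Synthetic check: energy follows weather with a 48-step delay
idx = pd.date_range("2023-08-01", periods=500, freq="h")
weather = pd.Series(np.sin(np.arange(500) / 10.0), index=idx)
energy = weather.shift(48)
corr = sliding_window_corr(weather, energy, window=48, lag=48)
print(corr.dropna().min(), corr.dropna().max())
```

With the matching lag, every complete window correlates perfectly (up to floating-point error); flipping the sign of `energy` gives -1 instead.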

Finally, I worked on the SARIMAX model. Fitting the model on the hourly data took a very long time and often crashed, so I aggregated the data to one value per day, which made the model run much faster. The predictions were not very accurate, especially for production; consumption gave slightly better results.
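The daily aggregation step can be sketched as below, assuming the long-format hourly DataFrame built in this notebook (columns `pricearea`, `groupname`, `starttime`, `quantitykwh`) and daily totals via `sum` (a daily `mean` would work equally well). One price area and group is selected first, since SARIMAX is fitted to a single series. Going from hourly to daily values cuts the number of observations by a factor of 24, and a weekly cycle then corresponds to a seasonal period of 7 rather than 168 hourly steps. `daily_series` is a hypothetical helper; its output could be passed as `endog` to statsmodels' `SARIMAX`.

```python
import pandas as pd

def daily_series(df: pd.DataFrame, pricearea: str, groupname: str) -> pd.Series:
    """Daily totals of quantitykwh for one price area and group (hourly UTC input)."""
    subset = df[(df["pricearea"] == pricearea) & (df["groupname"] == groupname)]
    return subset.set_index("starttime")["quantitykwh"].sort_index().resample("D").sum()

# Three days of synthetic hourly consumption, 1 kWh per hour
hours = pd.date_range("2024-01-01", periods=72, freq="h", tz="UTC")
hourly = pd.DataFrame({
    "pricearea": "NO1", "groupname": "household",
    "starttime": hours, "quantitykwh": 1.0,
})
daily = daily_series(hourly, "NO1", "household")
print(len(hourly), "->", len(daily))  # 72 -> 3
```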

Towards the end, I reorganised the app slightly. I moved the LineChartColumn showing the first month of data to the weather page. I also rearranged the pages so the energy-related pages come first, then weather, and finally the weather–energy correlation page.

My app is quite slow, especially the map page, probably because of my limited experience with caching. I tried to implement caching, but probably not well enough.

Overall, I found this assignment very challenging and time-consuming. I managed to solve all the tasks in some way, but I know the app is far from perfect. Still, this is my first app ever, and I'm satisfied with what I achieved given my knowledge and time. I learned a lot during this course and also realised how much more there is to learn.

### AI usage
AI was used extensively throughout this project, primarily through ChatGPT and GitHub Copilot in VS Code. I leveraged AI to generate code suggestions, adapt examples from lectures, and refine solutions for specific tasks. For instance, when implementing the Sliding Window Correlation, I combined code snippets from lectures with AI-generated suggestions, then iteratively adjusted and tested the code until it functioned correctly.

I also used AI to interpret error messages and propose potential fixes. By testing different solutions suggested by ChatGPT, I was able to identify the most effective approach. For parameter selection in the various analysis functions, such as the SARIMAX forecasting and the sliding window correlation, ChatGPT provided guidance. The interactive nature of the Streamlit app made it easy to experiment with different parameters, and AI guidance accelerated this process. 

Beyond coding, I also used ChatGPT to refine and improve the wording in this notebook, ensuring clarity and readability.

Overall, AI proved to be a valuable tool for both code development and documentation, complementing my own understanding and enabling more efficient problem-solving.

### Meteorology and energy production: Sliding Window Correlation findings

Throughout the years, we observe a clear negative correlation between temperature and energy consumption. In summer, higher temperatures are associated with lower energy consumption, likely due to reduced heating demand. Conversely, in winter, colder temperatures coincide with increased energy consumption.

During the extreme weather event Hans in August 2023, there was a strong negative correlation between wind speed and energy production in price area NO5 (western Norway). This effect, observed with zero lag and a 48-hour rolling window, is likely due to operational constraints or grid-balancing measures that limited wind power output during the storm.

At the same time, there was a positive correlation between precipitation and energy production, using a 48-hour window with a 48-hour lag. This suggests that the heavy rainfall contributed to increased hydro reservoir levels, which subsequently boosted hydroelectric generation.

These findings illustrate how extreme weather events can temporarily alter the typical relationships between meteorological variables and energy production. At the same time, some correlations may be coincidental, so interpretations should be made with caution.

### Elhub data

In [2]:
# =============================
# FETCH DATA FROM ELHUB API (2021-2024) – FASTER
# =============================
import requests
import pandas as pd
from datetime import datetime, timedelta

# --- API SETTINGS ---
BASE_URL = "https://api.elhub.no/energy-data/v0/price-areas"

# datasets
DATASET_PROD = "PRODUCTION_PER_GROUP_MBA_HOUR"
DATASET_CONS = "CONSUMPTION_PER_GROUP_MBA_HOUR"

# --- DATE RANGES ---
PROD_START = datetime(2022, 1, 1)
PROD_END   = datetime(2024, 12, 31)

CONS_START = datetime(2021, 1, 1)
CONS_END   = datetime(2024, 12, 31)

# --- FUNCTION TO FORMAT DATES ---
def format_date(dt_obj):
    """Formats datetime for Elhub API (+02:00 offset)."""
    return dt_obj.strftime("%Y-%m-%dT%H:%M:%S%%2B02:00")  # +02:00 encoded

HEADERS = {"Accept": "application/json"}

# -----------------------
# Fetch data for a given month
# -----------------------
def fetch_month(dataset, year, month):
    start = datetime(year, month, 1)
    next_month = (start + timedelta(days=32)).replace(day=1)
    end = next_month - timedelta(seconds=1)

    url = f"{BASE_URL}?dataset={dataset}&startDate={format_date(start)}&endDate={format_date(end)}"
    resp = requests.get(url, headers=HEADERS, timeout=60)
    if resp.status_code != 200:
        print(f"❌ Error {resp.status_code} for {start.date()} → {end.date()}")
        return pd.DataFrame()

    data = resp.json().get("data", [])
    records = []
    for entry in data:
        attrs = entry.get("attributes", {})
        if dataset == DATASET_PROD:
            key = "productionPerGroupMbaHour"
            group_field = "productionGroup"
        else:
            key = "consumptionPerGroupMbaHour"
            group_field = "consumptionGroup"
        recs = attrs.get(key, [])
        recs = [r for r in recs if r.get(group_field) != "*" and r.get(group_field) is not None]
        for r in recs:
            records.append({
                "pricearea": r.get("priceArea"),
                "groupname": r.get(group_field),
                "starttime": r.get("startTime"),
                "endtime": r.get("endTime"),
                "quantitykwh": r.get("quantityKwh")
            })
    if records:
        df = pd.DataFrame(records)
        df['starttime'] = pd.to_datetime(df['starttime'], utc=True, errors='coerce')
        df['endtime'] = pd.to_datetime(df['endtime'], utc=True, errors='coerce')
        df['quantitykwh'] = pd.to_numeric(df['quantitykwh'], errors='coerce')
        df = df[['pricearea', 'groupname', 'starttime', 'endtime', 'quantitykwh']]
        df.sort_values('starttime', inplace=True)
        df.set_index('starttime', inplace=True)
        return df
    return pd.DataFrame()

# -----------------------
# Fetch all months in a range
# -----------------------
def fetch_range_monthly(dataset, start_dt, end_dt):
    all_dfs = []
    cur = start_dt
    while cur <= end_dt:
        df_month = fetch_month(dataset, cur.year, cur.month)
        if not df_month.empty:
            all_dfs.append(df_month)
        # move to next month
        next_month = (cur + timedelta(days=32)).replace(day=1)
        cur = next_month
    return pd.concat(all_dfs) if all_dfs else pd.DataFrame()

# -----------------------
# Run ingestion
# -----------------------
# Production 2022–2024
df_prod = fetch_range_monthly(DATASET_PROD, PROD_START, PROD_END)
print("Production data shape:", df_prod.shape)
print(df_prod.head())

# Consumption 2021–2024
df_cons = fetch_range_monthly(DATASET_CONS, CONS_START, CONS_END)
print("Consumption data shape:", df_cons.shape)
print(df_cons.head())


Production data shape: (657600, 4)
                          pricearea groupname                   endtime  \
starttime                                                                 
2021-12-31 23:00:00+00:00       NO1     hydro 2022-01-01 00:00:00+00:00   
2021-12-31 23:00:00+00:00       NO4      wind 2022-01-01 00:00:00+00:00   
2021-12-31 23:00:00+00:00       NO2     other 2022-01-01 00:00:00+00:00   
2021-12-31 23:00:00+00:00       NO4   thermal 2022-01-01 00:00:00+00:00   
2021-12-31 23:00:00+00:00       NO1   thermal 2022-01-01 00:00:00+00:00   

                           quantitykwh  
starttime                               
2021-12-31 23:00:00+00:00   1291422.40  
2021-12-31 23:00:00+00:00    320912.40  
2021-12-31 23:00:00+00:00         0.20  
2021-12-31 23:00:00+00:00     38372.56  
2021-12-31 23:00:00+00:00     30049.92  
Consumption data shape: (876600, 4)
                          pricearea  groupname                   endtime  \
starttime                               

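The month stepping and date encoding in `fetch_month` can be checked in isolation. The sketch below reproduces the `timedelta(days=32)` trick and shows that the `%%2B` in the format string is simply a percent-encoded `+`, equivalent to encoding it with `urllib.parse.quote`. Note that the cell above hard-codes `+02:00`, whereas Norway uses `+01:00` outside daylight saving time; making the offset a parameter, as here, is an assumption about what the API accepts, not something verified against the Elhub documentation. The helper names are hypothetical.

```python
from datetime import datetime, timedelta
from urllib.parse import quote

def month_windows(start: datetime, end: datetime):
    """Yield (first second, last second) of each calendar month from start to end."""
    cur = start.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    while cur <= end:
        nxt = (cur + timedelta(days=32)).replace(day=1)  # day 1 + 32 days always lands in the next month
        yield cur, nxt - timedelta(seconds=1)
        cur = nxt

def encode_date(dt: datetime, offset: str = "+02:00") -> str:
    """Timestamp with the offset's '+' percent-encoded as %2B (colons left as-is)."""
    return quote(dt.strftime("%Y-%m-%dT%H:%M:%S") + offset, safe=":")

windows = list(month_windows(datetime(2024, 1, 1), datetime(2024, 12, 31)))
print(len(windows), windows[1][1])        # 12 2024-02-29 23:59:59
print(encode_date(datetime(2022, 1, 1)))  # 2022-01-01T00:00:00%2B02:00
```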
In [3]:
# ============================
# FIX DATAFRAMES FOR SPARK / CASSANDRA
# ============================

# 1️⃣ Reset index so 'starttime' becomes a column
df_prod = df_prod.reset_index()   # 'starttime' moves from index → column
df_cons = df_cons.reset_index()

# 2️⃣ Strip any extra whitespace from column names
df_prod.columns = df_prod.columns.str.strip()
df_cons.columns = df_cons.columns.str.strip()

# 3️⃣ Ensure uniform group column name
# (Already 'groupname', but this covers any old variations)
if 'productiongroup' in df_prod.columns:
    df_prod = df_prod.rename(columns={"productiongroup": "groupname"})
if 'consumptiongroup' in df_cons.columns:
    df_cons = df_cons.rename(columns={"consumptiongroup": "groupname"})

# 4️⃣ Ensure proper types
df_prod['starttime'] = pd.to_datetime(df_prod['starttime'], utc=True, errors='coerce')
df_prod['quantitykwh'] = pd.to_numeric(df_prod['quantitykwh'], errors='coerce')

df_cons['starttime'] = pd.to_datetime(df_cons['starttime'], utc=True, errors='coerce')
df_cons['quantitykwh'] = pd.to_numeric(df_cons['quantitykwh'], errors='coerce')

# 5️⃣ Drop rows with missing critical values
df_prod = df_prod.dropna(subset=['starttime','pricearea','groupname','quantitykwh'])
df_cons = df_cons.dropna(subset=['starttime','pricearea','groupname','quantitykwh'])

# 6️⃣ Optional: drop 'endtime' if not needed in Cassandra
df_prod = df_prod.drop(columns=['endtime'], errors='ignore')
df_cons = df_cons.drop(columns=['endtime'], errors='ignore')

# ✅ Check that DataFrames are ready
print("Production DataFrame ready:", df_prod.shape)
print("Consumption DataFrame ready:", df_cons.shape)
print("Production group names:", df_prod['groupname'].unique())
print("Consumption group names:", df_cons['groupname'].unique())


Production DataFrame ready: (657600, 4)
Consumption DataFrame ready: (876600, 4)
Production group names: ['hydro' 'wind' 'other' 'thermal' 'solar']
Consumption group names: ['cabin' 'tertiary' 'household' 'secondary' 'primary']


In [4]:
from cassandra.cluster import Cluster

# Connect to local Cassandra
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

# --- Create keyspace ---
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS energy
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
""")

# --- Create table for production ---
# Group column is named as in write_to_cassandra_fast() and in the read-back below
session.execute("""
    CREATE TABLE IF NOT EXISTS energy.production_per_group (
        pricearea text,
        starttime timestamp,
        productiongroup text,
        quantitykwh double,
        PRIMARY KEY ((pricearea), starttime, productiongroup)
    );
""")

# --- Create table for consumption ---
session.execute("""
    CREATE TABLE IF NOT EXISTS energy.consumption_per_group (
        pricearea text,
        starttime timestamp,
        consumptiongroup text,
        quantitykwh double,
        PRIMARY KEY ((pricearea), starttime, consumptiongroup)
    );
""")

print("✅ Keyspace and tables for production and consumption are ready")


✅ Keyspace and tables for production and consumption are ready


In [5]:
# -------------------------------
# Ensure starttime is timezone-aware UTC
# -------------------------------

# Production
if df_prod['starttime'].dt.tz is None:
    df_prod['starttime'] = df_prod['starttime'].dt.tz_localize('UTC')
else:
    df_prod['starttime'] = df_prod['starttime'].dt.tz_convert('UTC')

# Consumption
if df_cons['starttime'].dt.tz is None:
    df_cons['starttime'] = df_cons['starttime'].dt.tz_localize('UTC')
else:
    df_cons['starttime'] = df_cons['starttime'].dt.tz_convert('UTC')

print("✅ starttime timezone fixed to UTC for both DataFrames")


✅ starttime timezone fixed to UTC for both DataFrames

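The same UTC normalisation is written out twice in the cell above; a small helper avoids the repetition. Since `starttime` was already parsed with `utc=True` earlier, this step is mainly a safeguard. A minimal sketch, with `ensure_utc` as a hypothetical name:

```python
import pandas as pd

def ensure_utc(ts: pd.Series) -> pd.Series:
    """Localize naive timestamps as UTC, or convert tz-aware ones to UTC."""
    if ts.dt.tz is None:
        return ts.dt.tz_localize("UTC")
    return ts.dt.tz_convert("UTC")

naive = pd.Series(pd.to_datetime(["2024-01-01 00:00"]))
aware = pd.Series(pd.to_datetime(["2024-01-01 01:00+01:00"]))
print(ensure_utc(naive).iloc[0], ensure_utc(aware).iloc[0])
```

Usage in this notebook would be `df_prod["starttime"] = ensure_utc(df_prod["starttime"])`, and likewise for `df_cons`.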

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
import math

# --- CREATE SPARK SESSION ---
spark = SparkSession.builder \
    .appName("ElhubDataIngest") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0") \
    .getOrCreate()

print("Spark version:", spark.version)

# --- DEFINE COMMON SCHEMA ---
schema = StructType([
    StructField("pricearea", StringType(), True),
    StructField("groupname", StringType(), True),
    StructField("starttime", TimestampType(), True),
    StructField("quantitykwh", DoubleType(), True)
])

# --- FUNCTION TO WRITE IN CHUNKS ---
def write_to_cassandra_fast(df_pandas, dataset_type="production", chunk_size=100_000):
    if df_pandas is None or df_pandas.empty:
        print(f"No data to write for {dataset_type}")
        return

    # Drop leftover index columns and lowercase all columns
    df_pandas = df_pandas.drop(columns=['index', 'level_0'], errors='ignore')
    df_pandas.columns = [c.lower() for c in df_pandas.columns]

    # Drop rows with missing critical values
    df_pandas = df_pandas.dropna(subset=['pricearea', 'groupname', 'starttime', 'quantitykwh'])

    total_rows = len(df_pandas)
    n_chunks = math.ceil(total_rows / chunk_size)
    print(f"Writing {total_rows} rows in {n_chunks} chunks for {dataset_type}...")

    for i in range(n_chunks):
        start_idx = i * chunk_size
        end_idx = min((i+1) * chunk_size, total_rows)
        df_chunk = df_pandas.iloc[start_idx:end_idx]

        # Reorder columns for schema
        df_chunk_spark = df_chunk[['pricearea', 'groupname', 'starttime', 'quantitykwh']]

        # Convert to Spark DataFrame
        spark_df = spark.createDataFrame(df_chunk_spark, schema=schema)

        # Rename columns for Cassandra
        if dataset_type == "production":
            spark_df_cassandra = spark_df.selectExpr(
                "pricearea",
                "starttime",
                "groupname as productiongroup",
                "quantitykwh"
            )
            table = "production_per_group"
        else:
            spark_df_cassandra = spark_df.selectExpr(
                "pricearea",
                "starttime",
                "groupname as consumptiongroup",
                "quantitykwh"
            )
            table = "consumption_per_group"

        # Reduce partitions for faster writes
        spark_df_cassandra.coalesce(1).write \
            .format("org.apache.spark.sql.cassandra") \
            .options(keyspace="energy", table=table) \
            .mode("append") \
            .save()

        print(f"✅ Chunk {i+1}/{n_chunks} written ({len(df_chunk)} rows)")

# --- WRITE DATASETS ---
write_to_cassandra_fast(df_prod, "production")
write_to_cassandra_fast(df_cons, "consumption")


:: loading settings :: url = jar:file:/opt/anaconda3/envs/D2D_env/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/sarahorte/.ivy2/cache
The jars for the packages stored in: /Users/sarahorte/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9e1e1ede-c2d5-45c7-aed7-d58cb73df9f2;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.0 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams

Spark version: 3.5.1
Writing 657600 rows in 7 chunks for production...

25/11/19 12:11:55 WARN TaskSetManager: Stage 0 contains a task of very large size (3451 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 1/7 written (100000 rows)

25/11/19 12:11:57 WARN TaskSetManager: Stage 1 contains a task of very large size (3451 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 2/7 written (100000 rows)

25/11/19 12:11:59 WARN TaskSetManager: Stage 2 contains a task of very large size (3451 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 3/7 written (100000 rows)

25/11/19 12:12:01 WARN TaskSetManager: Stage 3 contains a task of very large size (3451 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 4/7 written (100000 rows)

25/11/19 12:12:03 WARN TaskSetManager: Stage 4 contains a task of very large size (3451 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 5/7 written (100000 rows)

25/11/19 12:12:04 WARN TaskSetManager: Stage 5 contains a task of very large size (3451 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 6/7 written (100000 rows)

25/11/19 12:12:06 WARN TaskSetManager: Stage 6 contains a task of very large size (1993 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 7/7 written (57600 rows)
Writing 876600 rows in 9 chunks for consumption...

25/11/19 12:12:07 WARN TaskSetManager: Stage 7 contains a task of very large size (3686 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 1/9 written (100000 rows)

25/11/19 12:12:09 WARN TaskSetManager: Stage 8 contains a task of very large size (3686 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 2/9 written (100000 rows)

25/11/19 12:12:10 WARN TaskSetManager: Stage 9 contains a task of very large size (3686 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 3/9 written (100000 rows)

25/11/19 12:12:12 WARN TaskSetManager: Stage 10 contains a task of very large size (3686 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 4/9 written (100000 rows)

25/11/19 12:12:13 WARN TaskSetManager: Stage 11 contains a task of very large size (3686 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 5/9 written (100000 rows)

25/11/19 12:12:15 WARN TaskSetManager: Stage 12 contains a task of very large size (3686 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 6/9 written (100000 rows)

25/11/19 12:12:16 WARN TaskSetManager: Stage 13 contains a task of very large size (3686 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 7/9 written (100000 rows)

25/11/19 12:12:18 WARN TaskSetManager: Stage 14 contains a task of very large size (3686 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 8/9 written (100000 rows)

25/11/19 12:12:20 WARN TaskSetManager: Stage 15 contains a task of very large size (2826 KiB). The maximum recommended task size is 1000 KiB.
✅ Chunk 9/9 written (76600 rows)

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54996)
Traceback (most recent call last):
  File "/opt/anaconda3/envs/D2D_env/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/envs/D2D_env/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/anaconda3/envs/D2D_env/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/anaconda3/envs/D2D_env/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/opt/anaconda3/envs/D2D_env/lib/python3.11/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/opt/anaconda3/envs/D2D_env/lib/python3.11/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
  

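The chunking in `write_to_cassandra_fast` is plain integer arithmetic: `ceil(total_rows / chunk_size)` chunks, the last one holding the remainder. This matches the log above: 7 production chunks (six of 100000 rows and one of 57600) and 9 consumption chunks. The `task of very large size` warnings are most likely a side effect of `createDataFrame` on a local pandas chunk, which ships those rows from the driver inside the task; smaller chunks would shrink each task at the cost of more Spark jobs. A sketch of the boundary computation (`chunk_bounds` is a hypothetical helper):

```python
import math

def chunk_bounds(total_rows: int, chunk_size: int):
    """Return (start, end) row slices covering total_rows in chunks of chunk_size."""
    n_chunks = math.ceil(total_rows / chunk_size)
    return [(i * chunk_size, min((i + 1) * chunk_size, total_rows)) for i in range(n_chunks)]

bounds = chunk_bounds(657_600, 100_000)
print(len(bounds), bounds[-1])  # 7 (600000, 657600)
```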
In [7]:
# ==========================================
# 3️⃣ READ DATA BACK FROM CASSANDRA
# ==========================================
def read_from_cassandra(table_name):
    """Reads a table from the 'energy' keyspace into a Spark DataFrame."""
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(keyspace="energy", table=table_name) \
        .load()
    return df

# --- Read production table ---
cass_prod_df = read_from_cassandra("production_per_group")
print("✅ Production table preview:")
cass_prod_df.show(5, truncate=False)

prod_count = cass_prod_df.count()
print(f"Total rows in production_per_group: {prod_count}")

# --- Read consumption table ---
cass_cons_df = read_from_cassandra("consumption_per_group")
print("\n✅ Consumption table preview:")
cass_cons_df.show(5, truncate=False)

cons_count = cass_cons_df.count()
print(f"Total rows in consumption_per_group: {cons_count}")

# --- Optional: basic sanity check on groups (distinct values as plain lists) ---
print("\nDistinct production groups:", [row['productiongroup'] for row in cass_prod_df.select('productiongroup').distinct().collect()])
print("Distinct consumption groups:", [row['consumptiongroup'] for row in cass_cons_df.select('consumptiongroup').distinct().collect()])


✅ Production table preview:
+---------+-------------------+---------------+-----------+
|pricearea|starttime          |productiongroup|quantitykwh|
+---------+-------------------+---------------+-----------+
|NO1      |2021-01-01 00:00:00|hydro          |2507716.8  |
|NO1      |2021-01-01 00:00:00|other          |0.0        |
|NO1      |2021-01-01 00:00:00|solar          |6.106      |
|NO1      |2021-01-01 00:00:00|thermal        |51369.035  |
|NO1      |2021-01-01 00:00:00|wind           |937.072    |
+---------+-------------------+---------------+-----------+
only showing top 5 rows

Total rows in production_per_group: 872853

✅ Consumption table preview:
+---------+-------------------+----------------+-----------+
|pricearea|starttime          |consumptiongroup|quantitykwh|
+---------+-------------------+----------------+-----------+
|NO1      |2021-01-01 00:00:00|cabin           |177071.56  |
|NO1      |2021-01-01 00:00:00|household       |2366888.8  |
|NO1      |2021-01-01 00:

In [8]:
from pymongo import MongoClient, UpdateOne
import math
from concurrent.futures import ThreadPoolExecutor, as_completed

# === MongoDB Atlas connection ===
uri = "mongodb+srv://{}:{}@cluster0.qwrlccf.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
with open('/Users/sarahorte/Documents/IND320/Personlig/No_sync/MongoDB') as f:  # closes the credentials file
    USR, PWD = f.read().splitlines()
client = MongoClient(uri.format(USR, PWD))
database = client['elhub']

def upsert_chunk(df_chunk, collection_name, group_field):
    collection = database[collection_name]
    records = df_chunk.to_dict(orient='records')
    operations = [
        UpdateOne(
            {
                "pricearea": rec["pricearea"],
                "starttime": rec["starttime"],
                group_field: rec[group_field]
            },
            {"$set": rec},
            upsert=True
        )
        for rec in records
    ]
    if operations:
        collection.bulk_write(operations)
    return len(df_chunk)  # for progress tracking

def upsert_into_mongo_parallel(df, collection_name, group_field, chunk_size=50_000, max_workers=4):
    total_rows = len(df)
    n_chunks = math.ceil(total_rows / chunk_size)
    print(f"Upserting {total_rows} documents to '{collection_name}' in {n_chunks} chunks using {max_workers} workers...")

    chunks = [df.iloc[i*chunk_size : min((i+1)*chunk_size, total_rows)] for i in range(n_chunks)]
    inserted_so_far = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(upsert_chunk, chunk, collection_name, group_field): i for i, chunk in enumerate(chunks)}
        for future in as_completed(futures):
            inserted = future.result()
            inserted_so_far += inserted
            percent = inserted_so_far / total_rows * 100
            print(f"✅ {inserted_so_far}/{total_rows} documents upserted ({percent:.1f}% done)")

    total_docs = database[collection_name].count_documents({})
    print(f"✅ Finished. Total documents in '{collection_name}': {total_docs}")


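In the cells shown here `upsert_into_mongo_parallel` is defined but not called; the next cell uses `insert_many`, which adds new documents on every run, so re-running the production append would duplicate rows. Upserts keyed on price area, start time and group are idempotent instead. A dependency-free sketch of building the filter/update pairs, deduplicating keys within a batch; with pymongo each pair would be wrapped as `UpdateOne(filter, update, upsert=True)` and passed to `collection.bulk_write`. `build_upserts` is a hypothetical helper:

```python
def build_upserts(records, group_field):
    """Build (filter, update) pairs keyed on pricearea, starttime and the group field.

    Later records with the same key replace earlier ones, so each key appears once.
    """
    ops = {}
    for rec in records:
        key = (rec["pricearea"], rec["starttime"], rec[group_field])
        flt = {"pricearea": key[0], "starttime": key[1], group_field: key[2]}
        ops[key] = (flt, {"$set": rec})
    return list(ops.values())

records = [
    {"pricearea": "NO1", "starttime": "2024-01-01T00:00Z", "groupname": "hydro", "quantitykwh": 1.0},
    {"pricearea": "NO1", "starttime": "2024-01-01T00:00Z", "groupname": "hydro", "quantitykwh": 2.0},
]
ops = build_upserts(records, "groupname")
print(len(ops), ops[0][1]["$set"]["quantitykwh"])  # 1 2.0
```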
In [None]:
from pymongo import MongoClient
import math

# === MongoDB Atlas connection ===
uri = "mongodb+srv://{}:{}@cluster0.qwrlccf.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
with open('/Users/sarahorte/Documents/IND320/Personlig/No_sync/MongoDB') as f:  # closes the credentials file
    USR, PWD = f.read().splitlines()
client = MongoClient(uri.format(USR, PWD))
database = client['elhub']

# --- Fast bulk insert function ---
def bulk_insert(df, collection_name, chunk_size=100_000, clear_existing=False):
    collection = database[collection_name]
    if clear_existing:
        collection.delete_many({})
        print(f"🗑 Cleared existing data in '{collection_name}'")

    total_rows = len(df)
    n_chunks = math.ceil(total_rows / chunk_size)
    print(f"Writing {total_rows} documents to '{collection_name}' in {n_chunks} chunks...")

    for i in range(n_chunks):
        start_idx = i * chunk_size
        end_idx = min((i+1) * chunk_size, total_rows)
        df_chunk = df.iloc[start_idx:end_idx]
        records = df_chunk.to_dict(orient='records')
        collection.insert_many(records)
        print(f"✅ Chunk {i+1}/{n_chunks} inserted ({len(df_chunk)} docs)")

    total_docs = collection.count_documents({})
    print(f"✅ Total documents in '{collection_name}': {total_docs}")

# --- Prepare DataFrames for MongoDB ---
df_prod_mongo = df_prod.copy()  # production data
df_cons_mongo = df_cons.copy()  # consumption data

# --- Run fast insertion ---
bulk_insert(df_prod_mongo, 'production_data', clear_existing=False)  # append new production
bulk_insert(df_cons_mongo, 'consumption_data', clear_existing=True)  # reload consumption


Writing 657600 documents to 'production_data' in 7 chunks...
✅ Chunk 1/7 inserted (100000 docs)
✅ Chunk 2/7 inserted (100000 docs)
✅ Chunk 3/7 inserted (100000 docs)
✅ Chunk 4/7 inserted (100000 docs)
✅ Chunk 5/7 inserted (100000 docs)
✅ Chunk 6/7 inserted (100000 docs)
✅ Chunk 7/7 inserted (57600 docs)
✅ Total documents in 'production_data': 872953
🗑 Cleared existing data in 'consumption_data'
Writing 876600 documents to 'consumption_data' in 9 chunks...
✅ Chunk 1/9 inserted (100000 docs)
✅ Chunk 2/9 inserted (100000 docs)
✅ Chunk 3/9 inserted (100000 docs)
✅ Chunk 4/9 inserted (100000 docs)
✅ Chunk 5/9 inserted (100000 docs)
✅ Chunk 6/9 inserted (100000 docs)
✅ Chunk 7/9 inserted (100000 docs)
✅ Chunk 8/9 inserted (100000 docs)
✅ Chunk 9/9 inserted (76600 docs)
✅ Total documents in 'consumption_data': 876600


25/11/19 17:05:01 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1070137 ms exceeds timeout 120000 ms
25/11/19 17:05:01 WARN SparkContext: Killing executors is not supported by current scheduler.
25/11/19 17:05:02 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$