In [1]:
import pandas as pd
import os
import numpy as np
import requests
import datetime
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import datetime
import time
import json
from geopy.geocoders import Nominatim
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.ticker import MultipleLocator
import openmeteo_requests
import requests_cache
from retry_requests import retry
import hsfs
from pathlib import Path
from dotenv import load_dotenv
import hopsworks
import sys

root_dir = Path().resolve().parent
sys.path.append(str(root_dir))

from format_data import format_weather_data, format_price_data, process_weather_data
from get_electricity_prices import get_data
from get_weather_data import get_historical_weather, get_weather_forecast
from entsoe_data import fetch_historical_data, ensure_valid_series



In [2]:
import hsfs

In [3]:
import hopsworks

In [2]:
load_dotenv()
entose_api = os.getenv("ENTSOE_API")
hopsworks_api = os.getenv("HOPSWORKS_API_KEY")

os.environ["HOPSWORKS_API_KEY"] = hopsworks_api

project = hopsworks.login()
print(f"Connected to project: {project.name}")

2025-01-05 15:33:53,813 INFO: Initializing external client
2025-01-05 15:33:53,813 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-01-05 15:33:55,284 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1164446
Connected to project: oskaralf


In [4]:
start_date = "2022-11-01"
end_date = "2025-01-05"
end_date = datetime.date.today().strftime('%Y-%m-%d')
hist_weather_df = get_historical_weather("Stockhom", "2022-11-01", "2025-01-05", 59.3294, 18.0687)
formatted_hist_weather_df = process_weather_data(hist_weather_df)

Coordinates 59.29701232910156°N 18.163265228271484°E
Elevation 24.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [5]:
print(formatted_hist_weather_df.head())

                 date  temperature_2m  precipitation  snow_depth  \
0 2022-11-01 00:00:00        9.165000            0.0         0.0   
1 2022-11-01 01:00:00        8.764999            0.0         0.0   
2 2022-11-01 02:00:00        8.615000            0.0         0.0   
3 2022-11-01 03:00:00        8.565000            0.0         0.0   
4 2022-11-01 04:00:00        8.365000            0.0         0.0   

   pressure_msl  cloud_cover  wind_speed_10m  wind_speed_100m  \
0   1017.099976        100.0        9.114471        17.698677   
1   1017.200012        100.0        8.427383        16.622490   
2   1017.200012        100.0        8.427383        16.454008   
3   1017.099976        100.0        8.759178        17.283749   
4   1016.900024        100.0        8.640000        17.106628   

   wind_direction_10m  wind_direction_100m      city  
0           260.90979           274.666779  Stockhom  
1           250.01680           265.030334  Stockhom  
2           250.01680           259

In [6]:
entsoe_df = fetch_historical_data(entose_api, start_date, end_date)

[DEBUG] Successfully fetched load data for finland.
[DEBUG] Successfully fetched load data for norway.
[DEBUG] Successfully fetched load data for denmark.
[DEBUG] Successfully fetched cross-border flows SE3 to finland.
[DEBUG] Successfully fetched cross-border flows finland to SE3.
[DEBUG] Successfully fetched cross-border flows SE3 to norway.
[DEBUG] Successfully fetched cross-border flows norway to SE3.
[DEBUG] Successfully fetched cross-border flows SE3 to denmark.
[DEBUG] Successfully fetched cross-border flows denmark to SE3.


In [10]:
#print(formatted_hist_weather_df.head())
#print(entsoe_df.head())
entsoe_df.columns = entsoe_df.columns.str.lower()
entsoe_df.columns = entsoe_df.columns.str.lower().str.replace(' ', '_').str.replace('-', '_')
print(entsoe_df.columns)

Index(['load_se3', 'load_finland', 'load_norway', 'load_denmark', 'prices',
       'total_generation_biomass', 'total_generation_fossil_gas',
       'total_generation_fossil_hard_coal', 'total_generation_fossil_oil',
       'total_generation_hydro_run_of_river_and_poundage',
       'total_generation_other_renewable', 'total_generation_solar',
       'total_generation_waste', 'total_generation_wind_offshore',
       'total_generation_wind_onshore', 'flows_se3_to_finland',
       'flows_finland_to_se3', 'flows_se3_to_norway', 'flows_norway_to_se3',
       'flows_se3_to_denmark', 'flows_denmark_to_se3', 'date'],
      dtype='object')


In [11]:
# price_df = get_data("SE3")
# print(price_df.dtypes)
# print(price_df)

In [15]:
# price_df['time_start'] = price_df['time_start'].str.replace(r'\+\d{2}:\d{2}$', '', regex=True)
# print(price_df)
# print(price_df.dtypes)

In [13]:
# formatted_hist_weather_df['time_start'] = formatted_hist_weather_df['time_start'].str.replace(r'\+\d{2}:\d{2}$', '', regex=True)
# print(formatted_hist_weather_df)
# print(formatted_hist_weather_df.dtypes)


In [14]:
# print(price_df)
# print(price_df.dtypes)

In [17]:

# print(price_df.head())
# print(price_df.dtypes)

In [11]:
import great_expectations as ge

weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"temperature_2m",
            "min_value":-100.0,
            "max_value":500.0,
        }
    )
)

weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"precipitation",
            "min_value":-5.0,
            "max_value":500.0,

        }
    )
)

weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pressure_msl",
            "min_value":0.0,
            "max_value":2000,
        }
    )
)



weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"cloud_cover",
            "min_value":-0.1,
            "max_value":100.0,
            "strict_min":True
        }
    )
)

weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"wind_speed_10m",
            "min_value":-0.1,
            "max_value":1000,
            "strict_min":True
        }
    )
)



{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "wind_speed_10m", "min_value": -0.1, "max_value": 1000, "strict_min": true}, "meta": {}}

In [10]:
import great_expectations as ge

prices_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="price_expectation_suite"
)

prices_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"price",
            "min_value":-0.1,
            "max_value":5000,
            "strict_min":True
        }
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "price", "min_value": -0.1, "max_value": 5000, "strict_min": true}, "meta": {}}

In [12]:
import great_expectations as ge

data_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="data_expectation_suite"
)
columns = [
    'load_se3', 'load_finland', 'load_norway', 'load_denmark', 'prices',
    'total_generation_biomass', 'total_generation_fossil_gas',
    'total_generation_fossil_hard_coal', 'total_generation_fossil_oil',
    'total_generation_hydro_run_of_river_and_poundage',
    'total_generation_other_renewable', 'total_generation_solar',
    'total_generation_waste', 'total_generation_wind_offshore',
    'total_generation_wind_onshore', 'flows_se3_to_finland',
    'flows_finland_to_se3', 'flows_se3_to_norway', 'flows_norway_to_se3',
    'flows_se3_to_denmark', 'flows_denmark_to_se3', 'date'
]

# Define min and max values for the expectations
min_max_values = {
    'load_se3': (0, 10000),
    'load_finland': (0, 10000),
    'load_norway': (0, 10000),
    'load_denmark': (0, 10000),
    'prices': (-100, 1000),
    'total_generation_biomass': (0, 10000),
    'total_generation_fossil_gas': (0, 10000),
    'total_generation_fossil_hard_coal': (0, 10000),
    'total_generation_fossil_oil': (0, 10000),
    'total_generation_hydro_run_of_river_and_poundage': (0, 10000),
    'total_generation_other_renewable': (0, 10000),
    'total_generation_solar': (0, 10000),
    'total_generation_waste': (0, 10000),
    'total_generation_wind_offshore': (0, 10000),
    'total_generation_wind_onshore': (0, 10000),
    'flows_se3_to_finland': (-10000, 10000),
    'flows_finland_to_se3': (-10000, 10000),
    'flows_se3_to_norway': (-10000, 10000),
    'flows_norway_to_se3': (-10000, 10000),
    'flows_se3_to_denmark': (-10000, 10000),
    'flows_denmark_to_se3': (-10000, 10000)
}

# Add expectations for each column
for column, (min_value, max_value) in min_max_values.items():
    data_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column": column,
                "min_value": min_value,
                "max_value": max_value,
                "strict_min": True
            }
        )
    )

    print(data_expectation_suite)

{
  "expectation_suite_name": "data_expectation_suite",
  "ge_cloud_id": null,
  "expectations": [
    {
      "expectation_type": "expect_column_min_to_be_between",
      "kwargs": {
        "column": "load_se3",
        "min_value": 0,
        "max_value": 10000,
        "strict_min": true
      },
      "meta": {}
    }
  ],
  "data_asset_type": null,
  "meta": {
    "great_expectations_version": "0.18.12"
  }
}
{
  "expectation_suite_name": "data_expectation_suite",
  "ge_cloud_id": null,
  "expectations": [
    {
      "expectation_type": "expect_column_min_to_be_between",
      "kwargs": {
        "column": "load_se3",
        "min_value": 0,
        "max_value": 10000,
        "strict_min": true
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_min_to_be_between",
      "kwargs": {
        "column": "load_finland",
        "min_value": 0,
        "max_value": 10000,
        "strict_min": true
      },
      "meta": {}
    }
  ],
  "data_asset_type":

In [13]:
try:
    fs = project.get_feature_store()
    print(f"Connected to feature store: {fs.name}")
except Exception as e:
    print(f"Failed to get feature store: {e}")

Connected to feature store: oskaralf_featurestore


In [14]:
weather_fg = fs.get_or_create_feature_group(
    name='weather_data_3',
    description='Weather data for SE3',
    version=1,
    primary_key=['date'],
    expectation_suite=weather_expectation_suite
)

In [15]:
entsoe_fg = fs.get_or_create_feature_group(
    name='entsoe_data_3',
    description='Entsoe data for SE3',
    version=1,
    primary_key=['date'],
    expectation_suite=data_expectation_suite
)

In [None]:
# price_fg = fs.get_or_create_feature_group(
#     name='electricity_price_data_3',
#     description='Electricity price data for SE3',
#     version=1,
#     primary_key=['time_start'],
#     expectation_suite=prices_expectation_suite
# )

In [16]:
weather_fg.insert(formatted_hist_weather_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1164446/fs/1155149/fg/1394575
2025-01-05 15:46:56,186 INFO: 	5 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164446/fs/1155149/fg/1394575


Uploading Dataframe: 100.00% |██████████| Rows 18480/18480 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_data_3_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1164446/jobs/named/weather_data_3_1_offline_fg_materialization/executions


(Job('weather_data_3_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "cloud_cover",
           "min_value": -0.1,
           "max_value": 100.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 694354
         }
       },
       "result": {
         "observed_value": 0.0,
         "element_count": 18480,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-01-05T02:46:56.000185Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_col

In [23]:
entsoe_df.columns = entsoe_df.columns.str.lower()
entsoe_df.columns = entsoe_df.columns.str.lower().str.replace(' ', '_').str.replace('-', '_')
entsoe_fg.insert(entsoe_df)

RestAPIError: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/1164446/featurestores/1155149/featuregroups). Server response: 
HTTP code: 400, HTTP reason: Bad Request, body: b'{"errorCode":270210,"usrMsg":"Expectation Kwargs contains name historical_load_SE3 which has not been found in this group\'s feature:\\n[historical_load_se3, historical_load_finland, historical_load_norway, historical_load_denmark, historical_day_ahead_prices, historical_total_generation_biomass, historical_total_generation_fossil_gas, historical_total_generation_fossil_hard_coal, historical_total_generation_fossil_oil, historical_total_generation_hydro_run_of_river_and_poundage, historical_total_generation_other_renewable, historical_total_generation_solar, historical_total_generation_waste, historical_total_generation_wind_offshore, historical_total_generation_wind_onshore, historical_physical_flows_se3_to_finland, historical_physical_flows_finland_to_se3, historical_physical_flows_se3_to_norway, historical_physical_flows_norway_to_se3, historical_physical_flows_se3_to_denmark, historical_physical_flows_denmark_to_se3, date].","errorMsg":"The Feature Name was not found in this version of the Feature Group."}', error code: 270210, error msg: The Feature Name was not found in this version of the Feature Group., user msg: Expectation Kwargs contains name historical_load_SE3 which has not been found in this group's feature:
[historical_load_se3, historical_load_finland, historical_load_norway, historical_load_denmark, historical_day_ahead_prices, historical_total_generation_biomass, historical_total_generation_fossil_gas, historical_total_generation_fossil_hard_coal, historical_total_generation_fossil_oil, historical_total_generation_hydro_run_of_river_and_poundage, historical_total_generation_other_renewable, historical_total_generation_solar, historical_total_generation_waste, historical_total_generation_wind_offshore, historical_total_generation_wind_onshore, historical_physical_flows_se3_to_finland, historical_physical_flows_finland_to_se3, historical_physical_flows_se3_to_norway, historical_physical_flows_norway_to_se3, historical_physical_flows_se3_to_denmark, historical_physical_flows_denmark_to_se3, date].

In [22]:
price_fg.insert(price_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1164446/fs/1155149/fg/1393146
2024-12-23 12:41:23,583 INFO: 	1 expectation(s) included in expectation_suite.
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1164446/fs/1155149/fg/1393146


Uploading Dataframe: 100.00% |██████████| Rows 18815/18815 | Elapsed Time: 00:06 | Remaining Time: 00:00


Launching job: electricity_price_data_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1164446/jobs/named/electricity_price_data_1_offline_fg_materialization/executions


(Job('electricity_price_data_1_offline_fg_materialization', 'SPARK'),
 {
   "success": false,
   "results": [
     {
       "success": false,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "price",
           "min_value": -0.1,
           "max_value": 5000,
           "strict_min": true
         },
         "meta": {
           "expectationId": 695305
         }
       },
       "result": {
         "observed_value": -0.69112,
         "element_count": 18815,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-12-23T11:41:23.000582Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
    