In [56]:
import requests
import datetime
import pandas as pd
import requests
import hopsworks
from pathlib import Path
import json
import re
import os
import warnings
from dotenv import load_dotenv
import openmeteo_requests
import requests_cache
from retry_requests import retry

In [57]:
# Load environment variables from .env file
import os, json
from pathlib import Path
from dotenv import load_dotenv

# Pick up a local .env when one exists; on CI (e.g. GitHub Actions) the same
# variables are normally provided through the job environment instead.
load_dotenv()

HW_API_KEY = os.getenv("HOPSWORKS_API_KEY")
HW_PROJECT = os.getenv("HOPSWORKS_PROJECT")

if not HW_API_KEY or not HW_PROJECT:
    raise ValueError("Missing HOPSWORKS_API_KEY or HOPSWORKS_PROJECT environment variables")

# Repo root in GitHub Actions; fall back to the current working directory locally.
root_dir = Path(os.getenv("GITHUB_WORKSPACE", Path.cwd()))

# locations.json lists the places the weather pipeline fetches forecasts for.
locations_path = root_dir / "locations.json"
if not locations_path.exists():
    raise FileNotFoundError(f"Could not find locations.json at: {locations_path}")

with open(locations_path, "r", encoding="utf-8") as f:
    locations = json.load(f)

# Fail fast on a malformed file: the weather pipeline iterates over this list
# and reads "city", "latitude" and "longitude" from every entry, so a bad file
# would otherwise only surface later as an IndexError/KeyError mid-fetch.
if not isinstance(locations, list) or not locations:
    raise ValueError(f"{locations_path} must contain a non-empty JSON list of locations")

invalid_location_indexes = [
    idx
    for idx, loc in enumerate(locations)
    if not isinstance(loc, dict) or not {"city", "latitude", "longitude"}.issubset(loc)
]
if invalid_location_indexes:
    raise ValueError(
        f"Entries {invalid_location_indexes} in {locations_path} must be objects "
        "with 'city', 'latitude' and 'longitude' keys"
    )

# Connect to the Hopsworks project and get a handle on its feature store.
project = hopsworks.login(
    project=HW_PROJECT,
    api_key_value=HW_API_KEY,
    host="eu-west.cloud.hopsworks.ai",
)
fs = project.get_feature_store()

2026-01-12 10:32:38,217 INFO: Closing external client and cleaning up certificates.
2026-01-12 10:32:38,219 INFO: Connection closed.
2026-01-12 10:32:38,221 INFO: Initializing external client
2026-01-12 10:32:38,222 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2026-01-12 10:32:39,656 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/2186


In [58]:
# Handles to the two feature groups this notebook writes to:
# hourly electricity prices (schema version 2) and weather (schema version 1).
electricity_fg = fs.get_feature_group(name="electricity_hourly", version=2)
weather_fg = fs.get_feature_group(name="weather", version=1)

# Daily weather pipeline

In [59]:
import pandas as pd
import requests_cache
import openmeteo_requests
from retry_requests import retry

def get_weather_forecast(
    locations,
    weather_key="se3_set_v1",
    hourly_variables=("temperature_2m", "precipitation", "cloud_cover", "wind_speed_10m"),
):
    """
    Fetch the hourly Open-Meteo forecast for several locations as one wide frame.

    locations: non-empty iterable of dicts with keys:
      - city (sanitized, e.g. "vasteras"); used as column suffix, must be unique
      - latitude
      - longitude
      - price_area (unused here, but ok to include)
    weather_key: constant written to the "weather_key" column of every row
    hourly_variables: Open-Meteo hourly variable names to request; each one
      becomes a "<variable>_<city>" column, in the order given
    returns: wide df with one row per UTC timestamp and one column per (variable, city)
    raises: ValueError if locations is empty or contains a city name twice
    """
    locations = list(locations)
    if not locations:
        raise ValueError("locations must contain at least one location")

    # Column names are suffixed with the city, so a repeated city would make the
    # merge below silently produce "_x"/"_y" columns instead of failing.
    cities = [loc["city"] for loc in locations]
    duplicate_cities = sorted({c for c in cities if cities.count(c) > 1})
    if duplicate_cities:
        raise ValueError(f"Duplicate city names in locations: {duplicate_cities}")

    hourly_variables = list(hourly_variables)

    # Cache responses for 1 hour and retry failed requests up to 5 times
    cache_session = requests_cache.CachedSession(".cache", expire_after=3600)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    url = "https://api.open-meteo.com/v1/forecast"

    wide_parts = []

    for loc in locations:
        city = loc["city"]          # already sanitized

        params = {
            "latitude": loc["latitude"],
            "longitude": loc["longitude"],
            "hourly": hourly_variables,
            "timezone": "UTC",  # important: force UTC timestamps
        }

        # One coordinate pair per request, so only the first response is used.
        response = openmeteo.weather_api(url, params=params)[0]
        hourly = response.Hourly()

        # Time()/TimeEnd() are epoch seconds; the end bound is exclusive.
        dates = pd.date_range(
            start=pd.to_datetime(hourly.Time(), unit="s", utc=True),
            end=pd.to_datetime(hourly.TimeEnd(), unit="s", utc=True),
            freq=pd.Timedelta(seconds=hourly.Interval()),
            inclusive="left",
        )

        # Variables(i) is read in the same order the variables were requested,
        # so name and index are taken from one list and cannot drift apart.
        columns = {"date": dates}
        for idx, variable in enumerate(hourly_variables):
            columns[f"{variable}_{city}"] = hourly.Variables(idx).ValuesAsNumpy()

        # Drop hours where any variable is missing for this city.
        wide_parts.append(pd.DataFrame(columns).dropna())

    # Outer merge on date to align all cities
    wide = wide_parts[0]
    for part in wide_parts[1:]:
        wide = wide.merge(part, on="date", how="outer")

    wide = wide.sort_values("date").reset_index(drop=True)
    wide["weather_key"] = weather_key

    return wide


In [60]:
weather_forecast_df = get_weather_forecast(locations)

# Fail fast on an unusable forecast instead of writing it to the feature store.
assert not weather_forecast_df.empty, "weather forecast contains no rows"
assert weather_forecast_df["date"].is_unique, "duplicate timestamps in weather forecast"

In [61]:
# Preview the wide forecast frame: one row per UTC hour, one column per
# (variable, city) pair, plus the constant weather_key column.
weather_forecast_df

Unnamed: 0,date,temperature_2m_stockholm,precipitation_stockholm,cloud_cover_stockholm,wind_speed_10m_stockholm,temperature_2m_uppsala,precipitation_uppsala,cloud_cover_uppsala,wind_speed_10m_uppsala,temperature_2m_vasteras,...,wind_speed_10m_karlstad,temperature_2m_sundsvall,precipitation_sundsvall,cloud_cover_sundsvall,wind_speed_10m_sundsvall,temperature_2m_malmo,precipitation_malmo,cloud_cover_malmo,wind_speed_10m_malmo,weather_key
0,2026-01-12 00:00:00+00:00,-3.2825,0.1,100.0,8.280000,-4.937,0.0,100.0,14.400000,-7.272,...,5.400000,-13.033501,0.0,92.0,8.640000,-5.6955,0.0,11.0,4.320000,se3_set_v1
1,2026-01-12 01:00:00+00:00,-3.2825,0.1,100.0,8.280000,-4.887,0.0,100.0,14.040000,-6.722,...,5.400000,-12.133499,0.0,98.0,8.640000,-6.2455,0.0,51.0,4.680000,se3_set_v1
2,2026-01-12 02:00:00+00:00,-3.4325,0.1,100.0,8.640000,-4.437,0.0,100.0,13.320000,-6.272,...,4.320000,-11.433500,0.0,98.0,9.000000,-6.7455,0.0,98.0,7.559999,se3_set_v1
3,2026-01-12 03:00:00+00:00,-3.4825,0.0,97.0,8.640000,-4.187,0.0,100.0,12.599999,-6.072,...,5.040000,-10.733500,0.0,93.0,7.559999,-6.6955,0.0,93.0,8.280000,se3_set_v1
4,2026-01-12 04:00:00+00:00,-3.4325,0.0,91.0,9.000000,-3.987,0.1,100.0,9.360000,-5.722,...,5.760000,-10.483500,0.0,89.0,6.840000,-6.3955,0.0,52.0,8.640000,se3_set_v1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
163,2026-01-18 19:00:00+00:00,-1.4500,0.0,88.0,11.398420,-1.500,0.0,100.0,13.040997,-0.800,...,16.032129,-3.300000,0.0,100.0,10.914888,-0.9085,0.0,100.0,26.865265,se3_set_v1
164,2026-01-18 20:00:00+00:00,-1.3000,0.0,88.0,11.672275,-1.450,0.0,100.0,13.379461,-0.800,...,16.391863,-3.350000,0.0,100.0,10.739833,-1.2085,0.0,100.0,27.158533,se3_set_v1
165,2026-01-18 21:00:00+00:00,-1.1000,0.0,88.0,11.953409,-1.350,0.0,100.0,13.661038,-0.800,...,16.575644,-3.350000,0.0,100.0,10.390226,-1.5085,0.0,100.0,27.209188,se3_set_v1
166,2026-01-18 22:00:00+00:00,-0.8500,0.0,89.0,12.241323,-1.250,0.0,100.0,14.120539,-0.850,...,16.956343,-3.400000,0.0,100.0,10.041354,-1.8085,0.0,100.0,27.413237,se3_set_v1


In [62]:
# Write the forecast rows to the weather feature group; wait=True asks
# Hopsworks to block until the write has completed.
weather_fg.insert(
    features=weather_forecast_df,
    wait=True,
)



2026-01-12 10:33:10,884 INFO: Computing insert statistics


(None, None)

# Daily electricity pipeline

In [63]:
def get_prices(date, price_class, timeout=30):
    """Fetch one day of electricity prices from elprisetjustnu.se.

    Args:
        date: Object exposing integer ``year``, ``month`` and ``day``
            attributes (e.g. ``datetime.date``, ``datetime.datetime`` or
            ``pandas.Timestamp``); selects which daily file is requested.
        price_class: Price area code inserted into the file name, e.g. "SE3".
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the pipeline forever.

    Returns:
        The decoded JSON payload of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = (
        "https://www.elprisetjustnu.se/api/v1/prices/"
        f"{date.year}/{date.month:02d}-{date.day:02d}_{price_class}.json"
    )
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()


In [64]:
price_area = "SE3"

# The daily price file appears to follow the Swedish calendar day (its first
# interval starts at 23:00 UTC of the previous date in winter), so "today" is
# evaluated in Swedish local time. A naive now() on a UTC runner would request
# the previous day's file during the last hour(s) before midnight UTC.
today_in_sweden = pd.Timestamp.now(tz="Europe/Stockholm")
electricity_data = get_prices(today_in_sweden, price_area)
electricity_daily_df = pd.DataFrame(electricity_data)

In [65]:
# --- Parse timestamps to UTC (DST-safe) ---
time_start_utc = pd.to_datetime(electricity_daily_df["time_start"], utc=True)
time_end_utc = pd.to_datetime(electricity_daily_df["time_end"], utc=True)

# --- Derived columns ---
# resolution_minutes: length of each price interval in whole minutes
# date:               interval start, used as the event time (UTC)
# date_hour:          hour bucket the interval belongs to
electricity_daily_df = electricity_daily_df.assign(
    resolution_minutes=(time_end_utc - time_start_utc).dt.total_seconds().astype(int) // 60,
    date=time_start_utc,
    date_hour=time_start_utc.dt.floor("h"),
)

# --- Aggregate to hourly (works for both 60-min and 15-min rows) ---
# If an hour already has a 60-min row, mean() just returns that value.
# If an hour has four 15-min rows, mean() aggregates them to hourly.
electricity_hourly = (
    electricity_daily_df.groupby(["date_hour"], as_index=False)
      .agg(
          SEK_per_kWh=("SEK_per_kWh", "mean"),
          EUR_per_kWh=("EUR_per_kWh", "mean"),
          EXR=("EXR", "last"),  # or "mean"
          n_intervals=("resolution_minutes", "size"),  # 1 (hourly) or 4 (15-min)
          resolution_minutes=("resolution_minutes", "sum"),  # should be 60 for a full hour
      )
      .rename(columns={"date_hour": "date"})
)

# Tag rows with the area the prices were actually fetched for, rather than a
# second hard-coded literal that could drift from `price_area`.
electricity_hourly["price_area"] = price_area

# Optional: sanity checks
print("n_intervals distribution:\n", electricity_hourly["n_intervals"].value_counts().sort_index())
print("resolution_minutes distribution:\n", electricity_hourly["resolution_minutes"].value_counts().sort_index())

# Reorder columns and add the weather key (same value the weather pipeline
# writes by default).
electricity_hourly = electricity_hourly[
    ["date", "price_area", "SEK_per_kWh", "EUR_per_kWh", "EXR", "resolution_minutes", "n_intervals"]
].assign(weather_key="se3_set_v1")

print(electricity_hourly.dtypes)
print(electricity_hourly.head())


n_intervals distribution:
 n_intervals
4    24
Name: count, dtype: int64
resolution_minutes distribution:
 resolution_minutes
60    24
Name: count, dtype: int64
date                  datetime64[ns, UTC]
price_area                         object
SEK_per_kWh                       float64
EUR_per_kWh                       float64
EXR                               float64
resolution_minutes                  int64
n_intervals                         int64
weather_key                        object
dtype: object
                       date price_area  SEK_per_kWh  EUR_per_kWh        EXR  \
0 2026-01-11 23:00:00+00:00        SE3     0.851387     0.079482  10.711646   
1 2026-01-12 00:00:00+00:00        SE3     0.839233     0.078348  10.711646   
2 2026-01-12 01:00:00+00:00        SE3     0.833152     0.077780  10.711646   
3 2026-01-12 02:00:00+00:00        SE3     0.820885     0.076635  10.711646   
4 2026-01-12 03:00:00+00:00        SE3     0.823085     0.076840  10.711646   

   resolution_

In [66]:
# Write the hourly price rows to the electricity feature group; wait=True asks
# Hopsworks to block until the write has completed.
electricity_fg.insert(
    features=electricity_hourly,
    wait=True,
)



2026-01-12 10:33:24,630 INFO: Computing insert statistics


(None, None)