TODO: fetch historical data and store in hopsworks

In [1]:
import argparse
import json
import math
import os
from datetime import date

import hopsworks
import pandas as pd
import requests


# Notebook configuration, exposed as CLI flags so the same code can also be
# run as a script. Every flag has a default, so the notebook runs unmodified.

# URL of the SMHI hydrological observations archive for one station.
# NOTE(review): "parameter/3" is presumably SMHI's water level parameter and
# "corrected-archive" the quality-controlled history - confirm against the API docs.
SMHI_WATER_LEVEL_URL_TEMPLATE = (
    "https://opendata-download-hydroobs.smhi.se/api/version/1.0/"
    "parameter/3/station/{station_id}/period/corrected-archive/data.json"
)

parser = argparse.ArgumentParser()
parser.add_argument("--latitude", type=float, default=59.3284)
parser.add_argument("--longitude", type=float, default=18.0664)
parser.add_argument("--sensor-id", type=int, default=20389)
parser.add_argument("--sensor-name", type=str, default="malaren_w")
# No literal default: the URL is derived from --sensor-id below, so overriding
# the station id alone can no longer fetch data for the wrong station.
parser.add_argument("--sensor-url", type=str, default=None)

# parse_known_args() ignores unrecognised argv entries instead of exiting,
# which matters when argv was not written for this parser.
args, _unknown_args = parser.parse_known_args()

latitude = args.latitude
longitude = args.longitude
sensor_id = args.sensor_id
sensor_name = args.sensor_name
# An explicit --sensor-url wins; otherwise build it from the station id.
sensor_url = args.sensor_url or SMHI_WATER_LEVEL_URL_TEMPLATE.format(
    station_id=sensor_id
)

### Step 1: Log in to Hopsworks

Environment variable `HOPSWORKS_API_KEY` should be set

In [2]:
# Read the API key from the environment so the secret never lives in the
# notebook or in version control.
# NOTE(security): a key that was previously committed in this cell must be
# treated as leaked and revoked in the Hopsworks UI.
hopsworks_api_key = os.environ.get("HOPSWORKS_API_KEY")
if not hopsworks_api_key:
    # Fail fast with a clear message instead of attempting a login without a key.
    raise RuntimeError(
        "Environment variable HOPSWORKS_API_KEY is not set; "
        "export it before running this notebook."
    )

project = hopsworks.login(
    host="eu-west.cloud.hopsworks.ai",
    port=443,
    project="ml_project",
    api_key_value=hopsworks_api_key,
)

2026-01-10 19:34:54,431 INFO: Initializing external client
2026-01-10 19:34:54,432 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2026-01-10 19:34:56,365 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/2184


### Step 2: Fetch historical water level data for a water level sensor

In [3]:
import requests
import pandas as pd


def fetch_smhi_water_level(
    api_url
) -> pd.DataFrame:
    """
    Fetch historical water level data from SMHI.

    Parameters
    ----------
    api_url : str
        Full URL of an SMHI ``data.json`` endpoint. The JSON payload must
        contain a ``"value"`` list whose records carry ``"date"`` (unix
        epoch in milliseconds) and ``"value"`` fields.

    Returns
    -------
    pd.DataFrame
        Two columns, sorted ascending by date with a fresh 0..n-1 index:
          - date (timezone-aware datetime, UTC)
          - water_level_cm (the record's ``"value"`` field)
        If the endpoint reports no observations, an empty frame with these
        two columns is returned.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status.
    KeyError
        If the payload has no ``"value"`` entry.
    """
    response = requests.get(api_url, timeout=60)
    response.raise_for_status()

    observations = response.json()["value"]

    if not observations:
        # pd.DataFrame([]) has no columns, so the column lookups below would
        # fail with KeyError; return a typed empty frame instead.
        return pd.DataFrame(
            {
                "date": pd.Series([], dtype="datetime64[ns, UTC]"),
                "water_level_cm": pd.Series([], dtype="float64"),
            }
        )

    df = pd.DataFrame(observations)

    # Convert unix milliseconds to timezone-aware datetimes.
    df["date"] = pd.to_datetime(df["date"], unit="ms", utc=True)

    # Rename the measurement and drop every other field of the record.
    df = df.rename(columns={"value": "water_level_cm"})[["date", "water_level_cm"]]

    # The API order is not relied upon: sort chronologically.
    return df.sort_values("date").reset_index(drop=True)

In [4]:
# Download the complete history for the configured sensor URL.
df_water_level = fetch_smhi_water_level(api_url=sensor_url)
print(df_water_level)

                          date  water_level_cm
0    2014-01-12 22:00:00+00:00            97.0
1    2014-01-13 22:00:00+00:00            96.0
2    2014-01-14 22:00:00+00:00            94.0
3    2014-01-15 22:00:00+00:00            93.0
4    2014-01-16 22:00:00+00:00            92.0
...                        ...             ...
4375 2026-01-04 22:00:00+00:00            85.0
4376 2026-01-05 22:00:00+00:00            85.0
4377 2026-01-06 22:00:00+00:00            85.0
4378 2026-01-07 22:00:00+00:00            86.0
4379 2026-01-08 22:00:00+00:00            85.0

[4380 rows x 2 columns]


Only keep the data from 2020 onwards

In [5]:
# First instant (UTC) that is still kept; everything earlier is discarded.
cutoff = pd.Timestamp("2020-01-01", tz="UTC")

# Boolean mask of the rows at or after the cutoff.
on_or_after_cutoff = df_water_level["date"].ge(cutoff)
df_water_level = df_water_level.loc[on_or_after_cutoff].reset_index(drop=True)

print(df_water_level)

                          date  water_level_cm
0    2020-01-01 22:00:00+00:00           110.0
1    2020-01-02 22:00:00+00:00           109.0
2    2020-01-03 22:00:00+00:00           108.0
3    2020-01-04 22:00:00+00:00           108.0
4    2020-01-05 22:00:00+00:00           107.0
...                        ...             ...
2195 2026-01-04 22:00:00+00:00            85.0
2196 2026-01-05 22:00:00+00:00            85.0
2197 2026-01-06 22:00:00+00:00            85.0
2198 2026-01-07 22:00:00+00:00            86.0
2199 2026-01-08 22:00:00+00:00            85.0

[2200 rows x 2 columns]


### Add lagged features

In [6]:
from typing import Iterable


def add_water_level_lags(
    df: pd.DataFrame,
    value_col: str = "water_level_cm",
    date_col: str = "date",
    lags: Iterable[int] = (1, 3, 7, 14),
) -> pd.DataFrame:
    """
    Return a date-sorted copy of ``df`` extended with lagged value columns.

    For every ``lag`` in ``lags`` a column named ``f"{value_col}_t_{lag}"``
    is added, holding ``value_col`` shifted down by ``lag`` rows. The shift
    is row-based, so a lag of N rows equals N days only when the frame has
    exactly one row per consecutive day.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding at least ``date_col`` and ``value_col``. It is not
        modified.
    value_col : str
        Column whose values are lagged.
    date_col : str
        Datetime column used for ordering.
    lags : Iterable[int]
        Row offsets to generate, one new column each.

    Returns
    -------
    pd.DataFrame
        Sorted copy of ``df`` (original index labels kept) with the lag
        columns appended; the first ``lag`` rows of each are NaN.

    Raises
    ------
    ValueError
        If a required column is missing or ``date_col`` is not datetime.
    """
    # Validate the inputs: date column first, then the value column.
    for required in (date_col, value_col):
        if required not in df.columns:
            raise ValueError(f"Missing required column: {required}")

    if not pd.api.types.is_datetime64_any_dtype(df[date_col]):
        raise ValueError(f"{date_col} must be datetime dtype")

    # Work on a chronologically ordered copy so the caller's frame is untouched.
    ordered = df.sort_values(date_col).copy()
    observed = ordered[value_col]

    # One shifted column per requested lag.
    for n_rows in lags:
        ordered[f"{value_col}_t_{n_rows}"] = observed.shift(n_rows)

    return ordered

In [7]:
# Derive the lag columns using the function's default lags.
df_water_level_lagged = add_water_level_lags(df=df_water_level)

print(df_water_level_lagged)

                          date  water_level_cm  water_level_cm_t_1  \
0    2020-01-01 22:00:00+00:00           110.0                 NaN   
1    2020-01-02 22:00:00+00:00           109.0               110.0   
2    2020-01-03 22:00:00+00:00           108.0               109.0   
3    2020-01-04 22:00:00+00:00           108.0               108.0   
4    2020-01-05 22:00:00+00:00           107.0               108.0   
...                        ...             ...                 ...   
2195 2026-01-04 22:00:00+00:00            85.0                86.0   
2196 2026-01-05 22:00:00+00:00            85.0                85.0   
2197 2026-01-06 22:00:00+00:00            85.0                85.0   
2198 2026-01-07 22:00:00+00:00            86.0                85.0   
2199 2026-01-08 22:00:00+00:00            85.0                86.0   

      water_level_cm_t_3  water_level_cm_t_7  water_level_cm_t_14  
0                    NaN                 NaN                  NaN  
1                    Na

In [8]:
# Discard every row that has a missing value in any column.
df_water_level_lagged = df_water_level_lagged.dropna(axis="index", how="any")

print(df_water_level_lagged)

                          date  water_level_cm  water_level_cm_t_1  \
14   2020-01-15 22:00:00+00:00           102.0               104.0   
15   2020-01-16 22:00:00+00:00           103.0               102.0   
16   2020-01-17 22:00:00+00:00           103.0               103.0   
17   2020-01-18 22:00:00+00:00           102.0               103.0   
18   2020-01-19 22:00:00+00:00           101.0               102.0   
...                        ...             ...                 ...   
2195 2026-01-04 22:00:00+00:00            85.0                86.0   
2196 2026-01-05 22:00:00+00:00            85.0                85.0   
2197 2026-01-06 22:00:00+00:00            85.0                85.0   
2198 2026-01-07 22:00:00+00:00            86.0                85.0   
2199 2026-01-08 22:00:00+00:00            85.0                86.0   

      water_level_cm_t_3  water_level_cm_t_7  water_level_cm_t_14  
14                 103.0               105.0                110.0  
15                 104.

### Store the data in the Hopsworks feature store

In [9]:
# Get the feature store handle from the logged-in project; it is used below
# to create the feature group.
fs = project.get_feature_store()

In [10]:
# Work on a copy so the lagged frame from the previous step stays untouched.
df_fg_level = df_water_level_lagged.copy()

# Tag every row with the sensor id; together with "date" it forms the
# feature group's primary key.
df_fg_level["sensor_id"] = sensor_id

# Put the key columns first; all remaining columns keep their relative order.
key_columns = ["sensor_id", "date"]
feature_columns = [c for c in df_fg_level.columns if c not in key_columns]
df_fg_level = df_fg_level[key_columns + feature_columns]

# Truncate each timestamp to midnight UTC of its day (daily resolution).
date_utc = df_fg_level["date"].dt.tz_convert("UTC")
df_fg_level["date"] = date_utc.dt.floor("D")



In [11]:
# Sanity check: show the first ten floored dates.
print(df_fg_level["date"].iloc[:10])

14   2020-01-15 00:00:00+00:00
15   2020-01-16 00:00:00+00:00
16   2020-01-17 00:00:00+00:00
17   2020-01-18 00:00:00+00:00
18   2020-01-19 00:00:00+00:00
19   2020-01-20 00:00:00+00:00
20   2020-01-21 00:00:00+00:00
21   2020-01-22 00:00:00+00:00
22   2020-01-23 00:00:00+00:00
23   2020-01-24 00:00:00+00:00
Name: date, dtype: datetime64[ns, UTC]


In [12]:
# One feature group per sensor: the name embeds both the sensor name and id.
feature_group_name = f"water_level_lagged_{sensor_name}_{sensor_id}"
feature_group_description = (
    "Daily water level observations with lagged features "
    "for water level prediction"
)

# (sensor_id, date) is the primary key and "date" doubles as the event time.
water_level_fg = fs.get_or_create_feature_group(
    name=feature_group_name,
    version=4,
    description=feature_group_description,
    primary_key=["sensor_id", "date"],
    event_time="date",
)

In [13]:
# Write the prepared frame into the feature group created above.
water_level_fg.insert(df_fg_level)


2026-01-10 19:35:10,848 INFO: Computing insert statistics


(None, None)

In [14]:
# Core identifiers
water_level_fg.update_feature_description(
    "date",
    "Date of the water level observation (daily resolution, UTC)"
)

water_level_fg.update_feature_description(
    "sensor_id",
    "Unique identifier of the water level sensor / station"
)

# Raw water level
water_level_fg.update_feature_description(
    "water_level_cm",
    "Observed daily water level measured at the station (cm)"
)

# Lagged features
water_level_fg.update_feature_description(
    "water_level_cm_t_1",
    "Water level measured 1 day before the current date (cm)"
)

water_level_fg.update_feature_description(
    "water_level_cm_t_3",
    "Water level measured 3 days before the current date (cm)"
)

water_level_fg.update_feature_description(
    "water_level_cm_t_7",
    "Water level measured 7 days before the current date (cm)"
)

water_level_fg.update_feature_description(
    "water_level_cm_t_14",
    "Water level measured 14 days before the current date (cm)"
)

<hsfs.feature_group.FeatureGroup at 0x78ca34fa2a50>