# 07 — Realtime Feature Update Job (Modal, every 30 minutes)

This notebook is designed to run **headlessly** (e.g., on Modal) every 30 minutes.

It:
1. Logs into Hopsworks
2. Loads monitoring points from the metadata Feature Group
3. Pulls **current** TomTom traffic data for each point
4. Inserts raw rows into `traffic_flow_fg` (10-minute bucket `ts_10m`)
5. Computes the **same engineered features** as your historical backfill notebook and inserts them into `traffic_temporal_fg`
6. Updates hourly **weather** features from Open-Meteo and hourly **TfL disruption** features (both optional)

### Notes on consistency
Your historical engineering uses rolling windows defined in **number of rows** (one row per 10-minute bucket), so a `window=3` feature covers about 30 minutes there.
If you only fetch TomTom every 30 minutes, you will have fewer observations, and the *effective time span*
of the same `window=3` rolling feature grows to roughly 90 minutes.

If you need strict *time-based* window semantics (e.g., exactly 30/60/120 minutes), you should either:
- collect every 10 minutes, OR
- redefine rolling features in minutes and backfill them consistently.

For now, this notebook reproduces the **same feature definitions** as in the historical code.
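
To make the difference concrete, here is a minimal sketch comparing a row-based window with a time-based one in pandas. The speeds and timestamps are made up for illustration, and the 60-minute window is an arbitrary choice, not a value taken from the backfill notebook.

```python
import pandas as pd

# Hypothetical speeds for a single point, sampled every 30 minutes.
idx = pd.date_range("2026-01-10 12:00", periods=6, freq="30min", tz="UTC")
speeds = pd.Series([30.0, 28.0, 25.0, 20.0, 22.0, 27.0], index=idx)

# Row-based window: always the last 3 observations, whatever their spacing.
row_based = speeds.rolling(window=3, min_periods=1).mean()

# Time-based window: only observations within the trailing 60 minutes.
time_based = speeds.rolling("60min", min_periods=1).mean()

comparison = pd.DataFrame(
    {"speed": speeds, "row_w3": row_based, "time_60min": time_based}
)
print(comparison)
```

With 30-minute sampling the row-based mean at the last timestamp averages three observations spread over an hour and a half of buckets, while the time-based mean only sees the two observations inside the trailing hour.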


In [1]:
# If you run this notebook outside an environment that already has dependencies,
# uncomment the following line:
# !pip install -U hopsworks pandas numpy requests

import os
import json
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
import requests
import hopsworks

pd.set_option("display.max_columns", None)
pd.set_option("display.width", 200)




## 1) Configuration

In [6]:
# -------------------------
# Run time
# -------------------------
# For Modal/papermill runs you can pass RUN_TS_UTC="2026-01-09T12:30:00Z"
RUN_TS_UTC = os.getenv("RUN_TS_UTC", "")
if RUN_TS_UTC:
    run_ts = pd.to_datetime(RUN_TS_UTC, utc=True, errors="raise")
else:
    run_ts = pd.Timestamp.now(tz="UTC")

# Use the same 10-minute bucketing as your raw Feature Group
ts_10m = run_ts.floor("10min")

# Hourly buckets for weather and TfL ("h" replaces the "H" alias deprecated in pandas 2.2)
weather_time_utc = ts_10m.floor("h")
tfl_time_utc = ts_10m.floor("h")

print("run_ts:", run_ts)
print("ts_10m:", ts_10m)
print("weather_time_utc:", weather_time_utc)
print("tfl_time_utc:", tfl_time_utc)

# -------------------------
# Hopsworks connection
# -------------------------
HOPSWORKS_HOST = os.getenv("HOPSWORKS_HOST", "")
HOPSWORKS_PROJECT = os.getenv("HOPSWORKS_PROJECT", "")
HOPSWORKS_API_KEY = os.getenv("HOPSWORKS_API_KEY", "")

# -------------------------
# API Keys
# -------------------------
TOMTOM_API_KEY = os.getenv("TOMTOM_API_KEY", "")
TFL_APP_ID = os.getenv("TFL_APP_ID", "")
TFL_APP_KEY = os.getenv("TFL_APP_KEY", "")

# -------------------------
# Feature Groups (defaults aligned with your existing notebooks)
# -------------------------
RAW_FG_NAME = os.getenv("RAW_FG_NAME", "traffic_flow_fg")
RAW_FG_VERSION = int(os.getenv("RAW_FG_VERSION", "1"))

ENGINEERED_FG_NAME = os.getenv("ENGINEERED_FG_NAME", "traffic_temporal_fg")
ENGINEERED_FG_VERSION = int(os.getenv("ENGINEERED_FG_VERSION", "1"))

METADATA_FG_NAME = os.getenv("METADATA_FG_NAME", "traffic_points_metadata")
METADATA_FG_VERSION = int(os.getenv("METADATA_FG_VERSION", "1"))

WEATHER_FG_NAME = os.getenv("WEATHER_FG_NAME", "weather_hourly_fg")
WEATHER_FG_VERSION = int(os.getenv("WEATHER_FG_VERSION", "1"))

TFL_FG_NAME = os.getenv("TFL_FG_NAME", "tfl_disruptions_hourly_fg")
TFL_FG_VERSION = int(os.getenv("TFL_FG_VERSION", "1"))

# -------------------------
# Feature engineering params (match your backfill defaults)
# -------------------------
LOW_CONF_THRESHOLD = float(os.getenv("LOW_CONF_THRESHOLD", "0.7"))
ROLL_WINDOWS = [3, 6, 12]  # designed for 10-min buckets

# -------------------------
# Controls
# -------------------------
INSERT_RAW = os.getenv("INSERT_RAW", "1") == "1"
INSERT_ENGINEERED = os.getenv("INSERT_ENGINEERED", "1") == "1"
UPDATE_WEATHER = os.getenv("UPDATE_WEATHER", "1") == "1"
UPDATE_TFL = os.getenv("UPDATE_TFL", "1") == "1"

LOOKBACK_HOURS = int(os.getenv("LOOKBACK_HOURS", "24"))  # used to compute rolling features reliably

assert TOMTOM_API_KEY, "Missing TOMTOM_API_KEY env var"



run_ts: 2026-01-10 14:42:22.386980+00:00
ts_10m: 2026-01-10 14:40:00+00:00
weather_time_utc: 2026-01-10 14:00:00+00:00
tfl_time_utc: 2026-01-10 14:00:00+00:00
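
As a quick check of the bucketing logic, the sketch below floors a replay timestamp to its 10-minute and hourly buckets. The timestamp is a hypothetical value in the same format as `RUN_TS_UTC`; the variable names are illustrative only.

```python
import pandas as pd

# Hypothetical replay timestamp, in the same format as RUN_TS_UTC.
replay_ts = pd.to_datetime("2026-01-09T12:37:41Z", utc=True)

bucket_10m = replay_ts.floor("10min")  # 10-minute bucket, used as ts_10m
bucket_hour = bucket_10m.floor("h")    # hourly bucket, used for weather and TfL

print("10-minute bucket:", bucket_10m)
print("hourly bucket:", bucket_hour)
```

Any run that starts between 12:30:00 and 12:39:59 UTC maps to the same `ts_10m`, which is what makes reruns within a bucket detectable as duplicates.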




## 2) Login to Hopsworks

In [7]:
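# Prefer the values from the configuration cell; fall back to the development defaults.
# When api_key_value is None, hopsworks falls back to its own key lookup.
project = hopsworks.login(
    host=HOPSWORKS_HOST or "eu-west.cloud.hopsworks.ai",
    project=HOPSWORKS_PROJECT or "London_traffic",
    api_key_value=HOPSWORKS_API_KEY or None,
)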
fs = project.get_feature_store()
raw_fg = fs.get_feature_group(name=RAW_FG_NAME, version=RAW_FG_VERSION)
engineered_fg = fs.get_feature_group(name=ENGINEERED_FG_NAME, version=ENGINEERED_FG_VERSION)
meta_fg = fs.get_feature_group(name=METADATA_FG_NAME, version=METADATA_FG_VERSION)

print("Connected. Feature store:", fs.name)


2026-01-10 15:42:26,044 INFO: Closing external client and cleaning up certificates.
2026-01-10 15:42:26,064 INFO: Connection closed.
2026-01-10 15:42:26,066 INFO: Initializing external client
2026-01-10 15:42:26,066 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2026-01-10 15:42:27,295 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/3209
Connected. Feature store: london_traffic_featurestore


## 3) Load monitoring points (metadata)

In [8]:
meta_df = meta_fg.read()
required = {"point_id", "latitude", "longitude"}
missing = required - set(meta_df.columns)
if missing:
    raise ValueError(f"Metadata FG missing required columns: {missing}")

points = meta_df[["point_id", "latitude", "longitude"]].copy()
points["point_id"] = points["point_id"].astype(str)
points["latitude"] = pd.to_numeric(points["latitude"], errors="coerce")
points["longitude"] = pd.to_numeric(points["longitude"], errors="coerce")
points = points.dropna(subset=["point_id", "latitude", "longitude"]).reset_index(drop=True)

print("Loaded points:", len(points))
points.head()


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.10s) 
Loaded points: 200


| | point_id | latitude | longitude |
|---|---|---|---|
| 0 | 8618 | 51.509307 | -0.084878 |
| 1 | 16756 | 51.383489 | -0.105944 |
| 2 | 38815 | 51.393451 | 0.029589 |
| 3 | 6458 | 51.573103 | -0.212077 |
| 4 | 38022 | 51.589213 | 0.270734 |


## 4) Fetch TomTom Flow data for all points

In [15]:
# Cap the number of TomTom calls per run; raise this (e.g., to len(points)) to cover every point.
MAX_POINTS = 50

points_subset = (
    points
    .dropna(subset=["latitude", "longitude"])
    .head(MAX_POINTS)
    .reset_index(drop=True)
)

def fetch_tomtom_flow(lat: float, lon: float, *, zoom: int = 10, unit: str = "kmph") -> Dict:
    # TomTom Flow Segment Data API (v4)
    url = f"https://api.tomtom.com/traffic/services/4/flowSegmentData/absolute/{zoom}/json"
    params = {
        "key": TOMTOM_API_KEY,
        "point": f"{lat},{lon}",
        "unit": unit,
    }
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()
    return r.json()

def parse_tomtom_payload(payload: Dict) -> Dict:
    fsd = payload.get("flowSegmentData", {}) if isinstance(payload, dict) else {}
    frc = fsd.get("frc")
    current_speed = fsd.get("currentSpeed")
    free_flow_speed = fsd.get("freeFlowSpeed")
    current_tt = fsd.get("currentTravelTime")
    free_flow_tt = fsd.get("freeFlowTravelTime")
    confidence = fsd.get("confidence")
    road_closure = fsd.get("roadClosure")

    # Derived
    speed_ratio = None
    delay_seconds = None
    try:
        if current_speed is not None and free_flow_speed:
            speed_ratio = float(current_speed) / float(free_flow_speed)
    except Exception:
        speed_ratio = None

    try:
        if current_tt is not None and free_flow_tt is not None:
            delay_seconds = float(current_tt) - float(free_flow_tt)
    except Exception:
        delay_seconds = None

    return {
        "current_speed": float(current_speed) if current_speed is not None else np.nan,
        "free_flow_speed": float(free_flow_speed) if free_flow_speed is not None else np.nan,
        "frc": str(frc) if frc is not None else np.nan,
        "current_travel_time": float(current_tt) if current_tt is not None else np.nan,
        "free_flow_travel_time": float(free_flow_tt) if free_flow_tt is not None else np.nan,
        "confidence": float(confidence) if confidence is not None else np.nan,
        "road_closure": bool(road_closure) if road_closure is not None else False,
        "speed_ratio": float(speed_ratio) if speed_ratio is not None else np.nan,
        "delay_seconds": float(delay_seconds) if delay_seconds is not None else np.nan,
    }

rows = []
errors = 0

for _, p in points_subset.iterrows():
    pid = str(p["point_id"])
    lat = float(p["latitude"])
    lon = float(p["longitude"])
    try:
        payload = fetch_tomtom_flow(lat, lon)
        parsed = parse_tomtom_payload(payload)
        rows.append({
            "point_id": pid,
            "timestamp_utc": run_ts,
            "ts_10m": ts_10m,
            **parsed,
        })
    except Exception as e:
        errors += 1
        rows.append({
            "point_id": pid,
            "timestamp_utc": run_ts,
            "ts_10m": ts_10m,
            "error": str(e),
        })

raw_tick_df = pd.DataFrame(rows)

print("Raw tick df shape:", raw_tick_df.shape, "errors:", errors)

raw_tick_df.head()


Raw tick df shape: (50, 12) errors: 0


| | point_id | timestamp_utc | ts_10m | current_speed | free_flow_speed | frc | current_travel_time | free_flow_travel_time | confidence | road_closure | speed_ratio | delay_seconds |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 8618 | 2026-01-10 14:42:22.386980+00:00 | 2026-01-10 14:40:00+00:00 | 23.0 | 29.0 | FRC2 | 572.0 | 454.0 | 1.0 | False | 0.793103 | 118.0 |
| 1 | 16756 | 2026-01-10 14:42:22.386980+00:00 | 2026-01-10 14:40:00+00:00 | 4.0 | 18.0 | FRC3 | 223.0 | 49.0 | 1.0 | False | 0.222222 | 174.0 |
| 2 | 38815 | 2026-01-10 14:42:22.386980+00:00 | 2026-01-10 14:40:00+00:00 | 33.0 | 44.0 | FRC3 | 49.0 | 37.0 | 1.0 | False | 0.75 | 12.0 |
| 3 | 6458 | 2026-01-10 14:42:22.386980+00:00 | 2026-01-10 14:40:00+00:00 | 48.0 | 58.0 | FRC2 | 176.0 | 146.0 | 1.0 | False | 0.827586 | 30.0 |
| 4 | 38022 | 2026-01-10 14:42:22.386980+00:00 | 2026-01-10 14:40:00+00:00 | 108.0 | 108.0 | FRC0 | 132.0 | 132.0 | 1.0 | False | 1.0 | 0.0 |
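
The two derived columns can be checked by hand. The sketch below recomputes `speed_ratio` and `delay_seconds` for point 8618 using the values from the first row of the raw tick output; `derive_flow_metrics` is a hypothetical helper written for this example, not part of the notebook.

```python
from typing import Dict, Optional


def derive_flow_metrics(fsd: Dict) -> Dict[str, Optional[float]]:
    """Compute speed_ratio and delay_seconds from a flowSegmentData-style dict."""
    cur, free = fsd.get("currentSpeed"), fsd.get("freeFlowSpeed")
    cur_tt, free_tt = fsd.get("currentTravelTime"), fsd.get("freeFlowTravelTime")
    # Guard against missing values and a zero free-flow speed.
    ratio = float(cur) / float(free) if cur is not None and free else None
    delay = (
        float(cur_tt) - float(free_tt)
        if cur_tt is not None and free_tt is not None
        else None
    )
    return {"speed_ratio": ratio, "delay_seconds": delay}


# Values copied from the raw tick output for point 8618.
sample = {
    "currentSpeed": 23,
    "freeFlowSpeed": 29,
    "currentTravelTime": 572,
    "freeFlowTravelTime": 454,
}
metrics = derive_flow_metrics(sample)
print(round(metrics["speed_ratio"], 6), metrics["delay_seconds"])
```

A ratio below 1 means traffic is slower than free flow; here the segment runs at about 79% of free-flow speed and costs 118 extra seconds.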


## 5) Insert RAW tick into `traffic_flow_fg` (optional, recommended)

In [16]:
if INSERT_RAW:
    # Keep only columns expected by your raw FG schema. If schema differs, adjust here.
    # We also ensure road_closure is numpy bool (not pandas nullable boolean).
    keep_cols = [
        "point_id", "timestamp_utc", "ts_10m", "frc",
        "current_speed", "free_flow_speed", "current_travel_time", "free_flow_travel_time",
        "confidence", "road_closure", "speed_ratio", "delay_seconds"
    ]
    missing = [c for c in keep_cols if c not in raw_tick_df.columns]
    if missing:
        raise ValueError(f"raw_tick_df missing columns: {missing}")

    df_ins = raw_tick_df[keep_cols].copy()
    df_ins["point_id"] = df_ins["point_id"].astype(str)
    df_ins["timestamp_utc"] = pd.to_datetime(df_ins["timestamp_utc"], utc=True)
    df_ins["ts_10m"] = pd.to_datetime(df_ins["ts_10m"], utc=True)

    # Fix Avro bool edge case
    df_ins["road_closure"] = df_ins["road_closure"].fillna(False).astype(bool)

    # Deduplicate on PK
    df_ins = df_ins.drop_duplicates(subset=["point_id", "ts_10m"], keep="last")

    # Avoid inserting duplicates if rerun
    existing_raw = raw_fg.read()
    existing_raw["ts_10m"] = pd.to_datetime(existing_raw["ts_10m"], utc=True, errors="coerce")
    existing_keys = set(zip(existing_raw["point_id"].astype(str), existing_raw["ts_10m"]))
    df_ins["__key"] = list(zip(df_ins["point_id"].astype(str), df_ins["ts_10m"]))
    df_ins = df_ins[~df_ins["__key"].isin(existing_keys)].drop(columns="__key")

    print("Raw rows to insert:", len(df_ins))
    if len(df_ins) > 0:
        raw_fg.insert(df_ins)
        print("Inserted raw tick.")
    else:
        print("No new raw rows (already present).")
else:
    print("Skipping RAW insert (INSERT_RAW=0).")


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (2.09s) 
Raw rows to insert: 50


Uploading Dataframe: 100.00% |██████████| Rows 50/50 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: traffic_flow_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3209/jobs/named/traffic_flow_fg_1_offline_fg_materialization/executions
Inserted raw tick.
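
The rerun guard above builds a Python set of key tuples. An alternative sketch, assuming the same `(point_id, ts_10m)` primary key, is an anti-join with `DataFrame.merge(..., indicator=True)`. Both frames below are hypothetical stand-ins for the Feature Group contents and the new tick.

```python
import pandas as pd

# Hypothetical rows already stored in the Feature Group.
existing = pd.DataFrame({
    "point_id": ["8618", "16756"],
    "ts_10m": pd.to_datetime(["2026-01-10 14:40", "2026-01-10 14:40"], utc=True),
})

# Hypothetical new tick: one key already present, one new.
new_rows = pd.DataFrame({
    "point_id": ["8618", "38815"],
    "ts_10m": pd.to_datetime(["2026-01-10 14:40", "2026-01-10 14:40"], utc=True),
    "current_speed": [23.0, 33.0],
})

keys = ["point_id", "ts_10m"]
merged = new_rows.merge(
    existing[keys].drop_duplicates(), on=keys, how="left", indicator=True
)
# Rows marked "left_only" have no matching key in the existing data.
to_insert = merged[merged["_merge"] == "left_only"].drop(columns="_merge")
print(to_insert)
```

Either approach still reads the existing keys first, so the cost of that read grows with the size of the Feature Group.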


## 6) Compute engineered features for this tick and insert into `traffic_temporal_fg`

In [20]:
def time_interval(hour: int) -> str:
    if 7 <= hour < 10:
        return "morning_peak"
    elif 10 <= hour < 16:
        return "midday"
    elif 16 <= hour < 19:
        return "evening_peak"
    else:
        return "night"

def build_engineered_features(df_raw: pd.DataFrame) -> pd.DataFrame:
    df = df_raw.copy()

    # Ensure types
    df["point_id"] = df["point_id"].astype(str)
    df["ts_10m"] = pd.to_datetime(df["ts_10m"], utc=True, errors="coerce")
    df["timestamp_utc"] = pd.to_datetime(df.get("timestamp_utc", df["ts_10m"]), utc=True, errors="coerce")

    # Sort + dedupe
    df = df.dropna(subset=["point_id", "ts_10m"])
    df = df.sort_values(["point_id", "ts_10m"]).drop_duplicates(subset=["point_id", "ts_10m"], keep="last")

    # Temporal features (same as your backfill)
    df["day_of_week"] = df["ts_10m"].dt.weekday
    df["is_weekend"] = df["day_of_week"].isin([5, 6]).astype(int)

    df["hour"] = df["ts_10m"].dt.hour
    df["minute"] = df["ts_10m"].dt.minute

    df["is_rush_hour"] = ((df["hour"].between(7, 9)) | (df["hour"].between(16, 18))).astype(int)

    df["time_interval"] = df["hour"].apply(time_interval)
    dummies = pd.get_dummies(df["time_interval"], prefix="ti")

    # Ensure stable dummy columns
    for col in ["ti_morning_peak", "ti_midday", "ti_evening_peak", "ti_night"]:
        if col not in dummies.columns:
            dummies[col] = 0

    df = pd.concat([df.drop(columns=["time_interval"]), dummies[["ti_morning_peak","ti_midday","ti_evening_peak","ti_night"]]], axis=1)
    # Cast the dummy columns to bool
    for col in ["ti_morning_peak", "ti_midday", "ti_evening_peak", "ti_night"]:
        df[col] = df[col].astype(bool)

    # Traffic features (same as your backfill)
    eps = 1e-6
    df["speed_diff"] = df["free_flow_speed"] - df["current_speed"]
    df["travel_time_ratio"] = df["current_travel_time"] / (df["free_flow_travel_time"] + eps)
    df["low_confidence_flag"] = (df["confidence"] < LOW_CONF_THRESHOLD).astype(int)
    df["travel_time_ratio"] = df["travel_time_ratio"].clip(lower=0, upper=10)

    # Rolling features over rows (same as your backfill)
    for w in ROLL_WINDOWS:
        df[f"speed_roll_mean_{w}"] = (
            df.groupby("point_id")["current_speed"]
              .rolling(window=w, min_periods=1)
              .mean()
              .reset_index(level=0, drop=True)
        )
        df[f"speed_roll_std_{w}"] = (
            df.groupby("point_id")["current_speed"]
              .rolling(window=w, min_periods=1)
              .std()
              .reset_index(level=0, drop=True)
              .fillna(0.0)
        )
        df[f"delay_roll_mean_{w}"] = (
            df.groupby("point_id")["delay_seconds"]
              .rolling(window=w, min_periods=1)
              .mean()
              .reset_index(level=0, drop=True)
        )

    return df

if INSERT_ENGINEERED:
    # Read history for lookback
    hist = raw_fg.read()
    hist["ts_10m"] = pd.to_datetime(hist["ts_10m"], utc=True, errors="coerce")
    hist["timestamp_utc"] = pd.to_datetime(hist.get("timestamp_utc", hist["ts_10m"]), utc=True, errors="coerce")
    hist["point_id"] = hist["point_id"].astype(str)

    lookback_start = ts_10m - pd.Timedelta(hours=LOOKBACK_HOURS)
    hist = hist[hist["ts_10m"] >= lookback_start].copy()

    # Append current tick 
    current_raw = raw_tick_df.copy()
    # Keep only required raw columns for feature engineering
    needed = [
        "point_id", "timestamp_utc", "ts_10m", "frc",
        "current_speed", "free_flow_speed", "current_travel_time", "free_flow_travel_time",
        "confidence", "road_closure", "speed_ratio", "delay_seconds"
    ]
    current_raw = current_raw[needed].copy()
    current_raw["timestamp_utc"] = pd.to_datetime(current_raw["timestamp_utc"], utc=True)
    current_raw["ts_10m"] = pd.to_datetime(current_raw["ts_10m"], utc=True)

    combined = pd.concat([hist, current_raw], ignore_index=True)
    combined = combined.drop_duplicates(subset=["point_id", "ts_10m"], keep="last")
    combined = combined.sort_values(["point_id", "ts_10m"]).reset_index(drop=True)

    eng_all = build_engineered_features(combined)

    # Keep only current tick rows
    eng_tick = eng_all[eng_all["ts_10m"] == ts_10m].copy()
    eng_tick = eng_tick.drop_duplicates(subset=["point_id", "ts_10m"], keep="last")

    # Align columns to FG schema (important!)
    schema_cols = [f.name for f in engineered_fg.features]
    for c in schema_cols:
        if c not in eng_tick.columns:
            eng_tick[c] = np.nan

    eng_tick = eng_tick[schema_cols].copy()

    # Avoid duplicates if rerun
    existing_eng = engineered_fg.read()
    existing_eng["ts_10m"] = pd.to_datetime(existing_eng["ts_10m"], utc=True, errors="coerce")
    existing_keys = set(zip(existing_eng["point_id"].astype(str), existing_eng["ts_10m"]))
    eng_tick["__key"] = list(zip(eng_tick["point_id"].astype(str), eng_tick["ts_10m"]))
    eng_tick = eng_tick[~eng_tick["__key"].isin(existing_keys)].drop(columns="__key")

    print("Engineered rows to insert:", len(eng_tick))
    if len(eng_tick) > 0:
        engineered_fg.insert(eng_tick)
        print("Inserted engineered tick.")
    else:
        print("No new engineered rows (already present).")
else:
    print("Skipping engineered insert (INSERT_ENGINEERED=0).")


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.37s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.72s) 
Engineered rows to insert: 50


Uploading Dataframe: 100.00% |██████████| Rows 50/50 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: traffic_temporal_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3209/jobs/named/traffic_temporal_fg_1_offline_fg_materialization/executions
Inserted engineered tick.
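
The lookback exists so that the rolling features for the current tick see earlier rows. A minimal sketch of that pattern on made-up data: compute per-point rolling statistics over the combined history, then keep only the rows of the latest bucket. The point ids and speeds are hypothetical.

```python
import pandas as pd

# Hypothetical history for two points: three 10-minute buckets each.
ts = pd.to_datetime(
    ["2026-01-10 14:20", "2026-01-10 14:30", "2026-01-10 14:40"], utc=True
)
hist = pd.DataFrame({
    "point_id": ["a"] * 3 + ["b"] * 3,
    "ts_10m": list(ts) * 2,
    "current_speed": [20.0, 30.0, 40.0, 50.0, 50.0, 50.0],
}).sort_values(["point_id", "ts_10m"]).reset_index(drop=True)

grouped = hist.groupby("point_id")["current_speed"]
hist["speed_roll_mean_3"] = (
    grouped.rolling(window=3, min_periods=1).mean().reset_index(level=0, drop=True)
)
# The std of a single observation is NaN, hence the fillna(0.0).
hist["speed_roll_std_3"] = (
    grouped.rolling(window=3, min_periods=1)
    .std()
    .reset_index(level=0, drop=True)
    .fillna(0.0)
)

# Keep only the rows of the current bucket, as the job does before inserting.
current_tick = hist[hist["ts_10m"] == ts[-1]]
print(current_tick)
```

Without the earlier rows, every rolling mean would equal the current speed and every rolling std would be 0.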




## 7) Update hourly Weather features from Open-Meteo (optional)

In [28]:
def fetch_openmeteo_hourly(lat: float, lon: float, hour_utc: pd.Timestamp) -> Dict[str, float]:
    # Forecast API is better for "now" 
    url = "https://api.open-meteo.com/v1/forecast"
    hourly_vars = [
        "temperature_2m", "precipitation", "rain", "snowfall",
    ]
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": ",".join(hourly_vars),
        "timezone": "UTC",
        "past_days": 1,
    }
    r = requests.get(url, params=params, timeout=60)
    r.raise_for_status()
    js = r.json()

    times = js.get("hourly", {}).get("time", [])
    if not times:
        raise RuntimeError("Open-Meteo returned no hourly times")

    t = pd.to_datetime(times, utc=True, errors="coerce")
    idx = int(np.argmin(np.abs((t - hour_utc).total_seconds())))

    out = {}
    for v in hourly_vars:
        arr = js.get("hourly", {}).get(v, [])
        out[v] = float(arr[idx]) if idx < len(arr) and arr[idx] is not None else np.nan
    return out

if UPDATE_WEATHER:
    weather_fg = fs.get_feature_group(name=WEATHER_FG_NAME, version=WEATHER_FG_VERSION)

    # Load existing keys for this hour to avoid duplicates
    existing_weather = weather_fg.read()
    if "weather_time_utc" in existing_weather.columns:
        existing_weather["weather_time_utc"] = pd.to_datetime(existing_weather["weather_time_utc"], utc=True, errors="coerce")
        existing_weather["point_id"] = existing_weather["point_id"].astype(str)
        existing_keys = set(zip(existing_weather["point_id"], existing_weather["weather_time_utc"]))
    else:
        existing_keys = set()

    weather_rows = []
    for _, p in points.iterrows():
        pid = str(p["point_id"])
        lat = float(p["latitude"])
        lon = float(p["longitude"])
        key = (pid, weather_time_utc)
        if key in existing_keys:
            continue
        try:
            vals = fetch_openmeteo_hourly(lat, lon, weather_time_utc)
            weather_rows.append({"point_id": pid, "weather_time_utc": weather_time_utc, **vals})
        except Exception as e:
            weather_rows.append({"point_id": pid, "weather_time_utc": weather_time_utc, "error": str(e)})

    weather_df_new = pd.DataFrame(weather_rows)

    if len(weather_df_new) > 0:
        # Drop error column if schema does not include it
        schema_cols = [f.name for f in weather_fg.features]
        if "error" in weather_df_new.columns and "error" not in schema_cols:
            weather_df_new = weather_df_new.drop(columns=["error"])

        for c in schema_cols:
            if c not in weather_df_new.columns:
                weather_df_new[c] = np.nan
        weather_df_new = weather_df_new[schema_cols]

        print("Weather rows to insert:", len(weather_df_new))
        weather_fg.insert(weather_df_new)
        print("Inserted weather hour.")
    else:
        print("No new weather rows for this hour (already present).")
else:
    print("Skipping weather update (UPDATE_WEATHER=0).")




Weather rows to insert: 200



Uploading Dataframe: 100.00% |██████████| Rows 200/200 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: weather_hourly_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3209/jobs/named/weather_hourly_fg_1_offline_fg_materialization/executions
Upserted weather hour.
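
The hour lookup in `fetch_openmeteo_hourly` picks the returned timestamp closest to the requested hour. The sketch below isolates that step on a hypothetical three-hour response fragment; the values are invented and the dictionary only mimics the `hourly` part of the payload as the code above reads it.

```python
import numpy as np
import pandas as pd

# Hypothetical slice of an hourly response; None stands for a missing value.
hourly = {
    "time": ["2026-01-10T13:00", "2026-01-10T14:00", "2026-01-10T15:00"],
    "temperature_2m": [4.1, 4.6, None],
}
target_hour = pd.Timestamp("2026-01-10 14:00", tz="UTC")

times = pd.to_datetime(hourly["time"], utc=True)
# Index of the timestamp closest to the requested hour.
idx = int(np.argmin(np.abs((times - target_hour).total_seconds())))

value = hourly["temperature_2m"][idx]
temperature = float(value) if value is not None else np.nan
print(times[idx], temperature)
```

Because the match is by nearest timestamp rather than exact equality, a response that lacks the requested hour still yields a value from a neighbouring hour, which may or may not be what you want.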




## 8) Update hourly TfL disruption features (optional)

In [None]:
import os
import numpy as np
import pandas as pd
import requests
from typing import Optional

def haversine_km(lat1, lon1, lat2, lon2):
    R = 6371.0
    lat1 = np.radians(lat1); lon1 = np.radians(lon1)
    lat2 = np.radians(lat2); lon2 = np.radians(lon2)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat/2)**2 + np.cos(lat1)*np.cos(lat2)*np.sin(dlon/2)**2
    c = 2*np.arctan2(np.sqrt(a), np.sqrt(1-a))
    return R*c

def extract_point_latlon(point_obj) -> Optional[tuple]:
    if not isinstance(point_obj, dict):
        return None
    lat = point_obj.get("lat", None)
    lon = point_obj.get("lon", None)
    if lat is None or lon is None:
        lat = point_obj.get("latitude", None)
        lon = point_obj.get("longitude", None)
    try:
        if lat is None or lon is None:
            return None
        return (float(lat), float(lon))
    except Exception:
        return None

def fetch_tfl_active_road_disruptions() -> pd.DataFrame:
    # Stable: returns current/active road disruptions
    url = "https://api.tfl.gov.uk/Road/all/Disruption"
    params = {
        "stripContent": True,  # lighter payload
        "closures": True,      # include closures if available
    }
    if TFL_APP_ID:
        params["app_id"] = TFL_APP_ID
    if TFL_APP_KEY:
        params["app_key"] = TFL_APP_KEY

    r = requests.get(url, params=params, timeout=60)
    if r.status_code >= 400:
        # Include part of the response body; it usually explains the failure
        raise RuntimeError(f"TfL {r.status_code}: {r.text[:500]}")
    raw = r.json()
    return pd.DataFrame(raw if isinstance(raw, list) else [])

def to_int64_nullable(s: pd.Series) -> pd.Series:
    return pd.to_numeric(s, errors="coerce").round().astype("Int64")

RADIUS_KM = float(os.getenv("TFL_RADIUS_KM", "0.5"))

if UPDATE_TFL:
    if not TFL_APP_KEY and not TFL_APP_ID:
        print("Skipping TfL update because TFL_APP_KEY/TFL_APP_ID are not set.")
    else:
        tfl_fg = fs.get_feature_group(name=TFL_FG_NAME, version=TFL_FG_VERSION)

        # IMPORTANT: create a row for every point for this hour (no need to read FG)
        pts = points.copy()
        pts["point_id"] = pts["point_id"].astype(str)
        pts = pts.dropna(subset=["latitude", "longitude"]).reset_index(drop=True)

        # (optional) limit to your working 50 points
        # pts = pts.head(50).reset_index(drop=True)

        # Fetch current disruptions
        raw_df = fetch_tfl_active_road_disruptions()

        # Define hour window
        hour_start = pd.to_datetime(tfl_time_utc, utc=True)
        hour_end = hour_start + pd.Timedelta(hours=1)

        # If nothing returned, just write zeros for all points
        if raw_df.empty:
            out = pd.DataFrame({
                "point_id": pts["point_id"],
                "tfl_time_utc": hour_start,
                "disruption_count": 0,
                "is_works": 0,
                "is_incident": 0,
                "is_active": 0,
                "max_ordinal": 0,
            })
        else:
            # Parse disruption times (if present)
            raw_df["start"] = pd.to_datetime(raw_df.get("startDateTime"), utc=True, errors="coerce")
            raw_df["end"] = pd.to_datetime(raw_df.get("endDateTime"), utc=True, errors="coerce")
            raw_df["end_filled"] = raw_df["end"]
            raw_df.loc[raw_df["end_filled"].isna(), "end_filled"] = raw_df["start"] + pd.Timedelta(hours=1)

            # Keep only disruptions overlapping the hour (if start is missing, keep it as "maybe")
            raw_df = raw_df[
                (raw_df["start"].isna()) | ((raw_df["start"] < hour_end) & (raw_df["end_filled"] >= hour_start))
            ].copy()

            # Extract disruption point coords
            coords = raw_df.get("point", pd.Series([None]*len(raw_df))).apply(extract_point_latlon)
            raw_df["d_lat"] = coords.apply(lambda x: x[0] if x else np.nan)
            raw_df["d_lon"] = coords.apply(lambda x: x[1] if x else np.nan)
            raw_df = raw_df.dropna(subset=["d_lat", "d_lon"]).reset_index(drop=True)

            if raw_df.empty:
                out = pd.DataFrame({
                    "point_id": pts["point_id"],
                    "tfl_time_utc": hour_start,
                    "disruption_count": 0,
                    "is_works": 0,
                    "is_incident": 0,
                    "is_active": 0,
                    "max_ordinal": 0,
                })
            else:
                # Cross join points x disruptions (one row per pair; fine at this scale)
                cross = pts.rename(columns={"latitude": "p_lat", "longitude": "p_lon"}).copy()
                cross["key"] = 1
                raw_df["key"] = 1
                cross = cross.merge(raw_df, on="key").drop(columns=["key"])

                # Distance filter
                cross["dist_km"] = haversine_km(cross["p_lat"], cross["p_lon"], cross["d_lat"], cross["d_lon"])
                cross = cross[cross["dist_km"] <= RADIUS_KM].copy()

                if cross.empty:
                    out = pd.DataFrame({
                        "point_id": pts["point_id"],
                        "tfl_time_utc": hour_start,
                        "disruption_count": 0,
                        "is_works": 0,
                        "is_incident": 0,
                        "is_active": 0,
                        "max_ordinal": 0,
                    })
                else:
                    cross["disruption_count"] = 1
                    cross["is_works"] = (cross.get("category").astype(str) == "Works").astype(int)
                    cross["is_incident"] = (cross.get("category").astype(str) == "Incident").astype(int)
                    cross["is_active"] = (cross.get("status").astype(str).str.lower() == "active").astype(int)
                    cross["max_ordinal"] = pd.to_numeric(cross.get("ordinal"), errors="coerce").fillna(0)

                    out = (
                        cross.groupby("point_id", as_index=False)
                        .agg(
                            disruption_count=("disruption_count", "sum"),
                            is_works=("is_works", "max"),
                            is_incident=("is_incident", "max"),
                            is_active=("is_active", "max"),
                            max_ordinal=("max_ordinal", "max"),
                        )
                    )
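                    # Ensure all points exist; fill zeros only in the feature columns
                    out = out.merge(pd.DataFrame({"point_id": pts["point_id"]}), on="point_id", how="right")
                    feat_cols = ["disruption_count", "is_works", "is_incident", "is_active", "max_ordinal"]
                    out[feat_cols] = out[feat_cols].fillna(0)

                    # Set the hour key after the merge so unmatched points do not end up with a missing or zero timestamp
                    out["tfl_time_utc"] = hour_start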

        # Align to schema
        schema_cols = [f.name for f in tfl_fg.features]
        for c in schema_cols:
            if c not in out.columns:
                out[c] = np.nan
        out = out[schema_cols]

        print("TfL rows to upsert:", len(out))
        # Upsert is requested through the `operation` argument rather than write_options
        tfl_fg.insert(out, operation="upsert")
        print("Upserted TfL hour.")
else:
    print("Skipping TfL update (UPDATE_TFL=0).")


TfL rows to upsert: 200


Uploading Dataframe: 100.00% |██████████| Rows 200/200 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: tfl_disruptions_hourly_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/3209/jobs/named/tfl_disruptions_hourly_fg_1_offline_fg_materialization/executions
Upserted TfL hour.
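
The spatial join in this step can be tried in isolation. The sketch below pairs every monitoring point with every disruption, keeps pairs within a 0.5 km radius, and counts disruptions per point. All coordinates are hypothetical, and `how="cross"` assumes pandas 1.2 or later (the notebook itself uses a dummy-key merge for the same effect).

```python
import numpy as np
import pandas as pd


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km; accepts scalars or aligned arrays/Series."""
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    a = (
        np.sin((lat2 - lat1) / 2) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2
    )
    return 6371.0 * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))


# Hypothetical monitoring points and disruption locations.
pts = pd.DataFrame({
    "point_id": ["p1", "p2"],
    "p_lat": [51.5093, 51.3835],
    "p_lon": [-0.0849, -0.1059],
})
dis = pd.DataFrame({"d_lat": [51.5100, 51.6000], "d_lon": [-0.0850, 0.1000]})

pairs = pts.merge(dis, how="cross")  # every point paired with every disruption
pairs["dist_km"] = haversine_km(
    pairs["p_lat"], pairs["p_lon"], pairs["d_lat"], pairs["d_lon"]
)
nearby = pairs[pairs["dist_km"] <= 0.5]

# Count nearby disruptions per point; points with none get 0.
counts = (
    nearby.groupby("point_id")
    .size()
    .reindex(pts["point_id"], fill_value=0)
    .rename("disruption_count")
)
print(counts)
```

Reindexing against the full point list plays the same role as the right merge above: every point gets a row, including those with no disruption in range.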



## 9) Done