# LightGBM – Single Multi-Horizon Model

## Objective
Forecast hourly CO₂ emission factor (kg CO₂/kWh) up to 168 hours ahead using a single LightGBM regression model.

## Data Sources
- NED API: CO₂ emission factor and energy generation signals
- Weather data (Open-Meteo): used as exogenous features

## Feature Engineering
- Lag features of target variable
- Calendar features (hour, weekday, etc.)
- Selected weather covariates
- Only features available at prediction time are used

## Train / Validation / Test Split
- Strict time-based split
- No random shuffling
- No data leakage from future timestamps

## Evaluation Metrics
- MAE
- RMSE
- MAPE
- MAE segmented by forecast horizon (1–24h, 25–72h, 73–168h)

## Reproducibility Notes
This notebook can be executed end-to-end after installing dependencies listed in `requirements.txt`.

In [None]:
%pip install lightgbm
#%pip install pandas numpy matplotlib lightgbm scikit-learn requests
import requests
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, timezone
import time
import matplotlib.pyplot as plt
from pandas.plotting import autocorrelation_plot
from email.utils import parsedate_to_datetime
from dotenv import load_dotenv

load_dotenv()

URL = "https://api.ned.nl/v1/utilizations"
API_KEY = os.environ.get("NED_API_KEY")
if not API_KEY:
    raise RuntimeError("NED_API_KEY is not set. Put it into .env or set env var.")


headers = {
    "X-AUTH-TOKEN": API_KEY,
    "accept": "application/ld+json",
}

TYPE_MAP = {
    "co2-factor": 27,     # ElectricityMix
    "solar-energy": 2,    # Solar
    "wind-energy": 1,     # Wind (onshore)
    "biomass-power": 25,
    "waste-power": 21,
    "other-power": 26,
    "fossil-gas-power": 18,
    "fossil-coal": 19,
    "nuclear": 20,
    "wind-offshore": 51,
}

START_DATE = "2023-01-01T00:00:00Z"
END_DATE_STRICTLY_BEFORE = "2026-01-01T00:00:00Z"

ALERTS = {
    # If the fraction of rows with missing values in any column per day > threshold → trigger alert
    "max_missing_rows_frac_per_day": 0.01,     # 1%
    # # If the fraction of imputed (filled) values in the key column per day > threshold → trigger alert
    "max_imputed_frac_per_day_target": 0.01,   # 1%
    # If the maximum consecutive gap in the target > N hours → trigger alert
    "max_consecutive_missing_hours_target": 6,
}

FILL_LIMITS = {
    "co2_emissionfactor_interp_hours": 6,  # fill only short gaps
    "wind_interp_hours": 3,               # short gaps in wind
}

def fetch_all(url, headers, params, sleep_s=0.2, max_retries=8, max_wait_s=300):
    def retry_after_seconds(retry_after: str | None, attempt: int) -> int:
        if not retry_after:
            return min(60, 2 ** attempt)

        ra = retry_after.strip()

        # 1) delta-seconds
        if ra.isdigit():
            return min(max_wait_s, max(0, int(ra)))

        # 2) ISO-8601 datetime
        try:
            dt = datetime.fromisoformat(ra)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            now = datetime.now(timezone.utc)
            wait = int((dt - now).total_seconds())
            return min(max_wait_s, max(0, wait))
        except Exception:
            pass

        # 3) HTTP-date
        try:
            dt = parsedate_to_datetime(ra)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            now = datetime.now(timezone.utc)
            wait = int((dt - now).total_seconds())
            return min(max_wait_s, max(0, wait))
        except Exception:
            pass

        # 4) fallback exponential backoff
        return min(60, 2 ** attempt)
    
    all_records = []
    next_url = url
    next_params = params

    while True:
        r = None
        for attempt in range(max_retries):
            r = requests.get(next_url, headers=headers, params=next_params, timeout=60)

            if r.status_code == 429:
                retry_after = r.headers.get("Retry-After")
                wait_s = retry_after_seconds(retry_after, attempt)
                time.sleep(wait_s)
                continue

            if 500 <= r.status_code < 600:
                time.sleep(min(60, 2 ** attempt))
                continue

            break
        
        if r.status_code != 200:
            raise RuntimeError(f"HTTP {r.status_code}: {r.text[:500]}")
            
        if r is None:
            raise RuntimeError("Request failed: no response object")


        data = r.json()
        all_records.extend(data.get("hydra:member", []))

        view = data.get("hydra:view") or {}
        hydra_next = view.get("hydra:next")
        if not hydra_next:
            break

        next_url = "https://api.ned.nl" + hydra_next
        next_params = None
        time.sleep(sleep_s)

    return all_records

def build_df(records):
    df = pd.DataFrame(records)
    if df.empty:
        return df
    df["validfrom"] = pd.to_datetime(df["validfrom"], utc=True, errors="coerce")
    df["validto"]   = pd.to_datetime(df["validto"], utc=True, errors="coerce")
    cols = [c for c in ["validfrom","validto","capacity","volume","percentage","emission","emissionfactor"] if c in df.columns]
    return df[cols].sort_values("validfrom").reset_index(drop=True)


def download_dataset(name, start_date, end_date, classification=2, point=0, activity=1):
    type_id = TYPE_MAP[name]
    params = {
        "point": point,
        "type": type_id,
        "granularity": 5,           # Hour
        "granularitytimezone": 1,   # CET
        "activity": activity,              # Providing
        "classification": classification,
        "validfrom[after]": start_date,
        "validfrom[strictly_before]": end_date,
    }
    records = fetch_all(URL, headers, params, sleep_s=0.2)
    df = build_df(records)
    return df


def to_hourly(df, value_cols, prefix):
    ts = df.set_index("validfrom").sort_index().asfreq("1H")
    keep = [c for c in value_cols if c in ts.columns]
    return ts[keep].add_prefix(prefix)

def make_master_index(start_date, end_date_strictly_before):
    start_cet = pd.Timestamp(start_date, tz="Europe/Amsterdam")
    end_cet = pd.Timestamp(end_date_strictly_before, tz="Europe/Amsterdam")

    idx_cet = pd.date_range(start=start_cet, end=end_cet - pd.Timedelta(hours=1), freq="1H")
    return idx_cet.tz_convert("UTC")


def align_to_master(ts, master_index, name):
    ts = ts.sort_index()
    aligned = ts.reindex(master_index)
    missing_any = aligned.isna().any(axis=1).sum()
    print(f"{name}: rows={len(aligned)} missing_rows_anycol={missing_any}")
    return aligned


def longest_nan_streak(s: pd.Series) -> int:
    is_nan = s.isna().to_numpy()
    if is_nan.size == 0:
        return 0
    max_streak = 0
    cur = 0
    for v in is_nan:
        if v:
            cur += 1
            if cur > max_streak:
                max_streak = cur
        else:
            cur = 0
    return max_streak


def daily_missing_row_fraction(df: pd.DataFrame) -> pd.Series:
    # fraction of rows per day where ANY col is NaN
    miss_any = df.isna().any(axis=1).astype("int8")
    return miss_any.resample("D").mean()

def daily_imputed_fraction(flag_series: pd.Series) -> pd.Series:
    # flag_series: 1 if was missing, else 0 (on hourly index)
    return flag_series.resample("D").mean()


def alert_if_bad(df_before_fill: pd.DataFrame, df_after_fill: pd.DataFrame, alerts=ALERTS):
    # 1) daily missing rows before fill (signals upstream gaps)
    daily_miss_frac = daily_missing_row_fraction(df_before_fill)
    worst_day = daily_miss_frac.max()
    if worst_day > alerts["max_missing_rows_frac_per_day"]:
        bad_days = daily_miss_frac[daily_miss_frac > alerts["max_missing_rows_frac_per_day"]]
        print("\n[ALERT] Too many missing rows (any column) BEFORE fill.")
        print("Worst daily missing-row fraction:", float(worst_day))
        print("Days above threshold:\n", bad_days.head(10))
        
    # 2) target imputation ratio per day (how much we “patched”)
    flag_col = "co2_emissionfactor_was_missing"
    if flag_col in df_after_fill.columns:
        daily_imp = daily_imputed_fraction(df_after_fill[flag_col])
        worst_imp = daily_imp.max()
        if worst_imp > alerts["max_imputed_frac_per_day_target"]:
            bad_days = daily_imp[daily_imp > alerts["max_imputed_frac_per_day_target"]]
            print("\n[ALERT] Too much target imputation (co2_emissionfactor).")
            print("Worst daily imputed fraction:", float(worst_imp))
            print("Days above threshold:\n", bad_days.head(10))
        # 3) longest missing streak in target BEFORE fill
    if "co2_emissionfactor" in df_before_fill.columns:
        streak = longest_nan_streak(df_before_fill["co2_emissionfactor"])
        if streak > alerts["max_consecutive_missing_hours_target"]:
            print("\n[ALERT] Long missing streak in target BEFORE fill.")
            print("Longest NaN streak (hours):", streak)
                    
def qa_summary(ts: pd.DataFrame, name: str):
    print(f"\n=== QA: {name} ===")
    print("Rows:", len(ts))
    print("Start:", ts.index.min(), "End:", ts.index.max())
    print("Duplicate timestamps:", int(ts.index.duplicated().sum()))
    print("Missing rows after asfreq:", int(ts.asfreq("1H").isna().any(axis=1).sum()))
    
    
def qa_numeric(ts: pd.DataFrame, cols: list[str], name: str):
    for c in cols:
        if c not in ts.columns:
            continue
        s = ts[c]
        print(f"\n{name}.{c}:")
        print(s.describe())
        print("Zero fraction:", round((s.fillna(0) == 0).mean(), 4), "NaN fraction:", round(s.isna().mean(), 4))
        

#def controlled_fill(df: pd.DataFrame, fill_limits=FILL_LIMITS) -> pd.DataFrame:
def controlled_fill_save(df):
    df = df.copy()

    # keep missingness as explicit features (useful + auditable)
    for c in df.columns:
        df[c + "_was_missing"] = df[c].isna().astype("int8")

    if "co2_emissionfactor" in df.columns:
        df["co2_emissionfactor"] = df["co2_emissionfactor"].interpolate(
            method="time",
            limit=6,
            limit_direction="forward",   # <-- important
        )


    # Solar: fill NaN with 0 (domain assumption: missing == no generation)
    for c in ["solar_capacity", "solar_volume"]:
        if c in df.columns:
            df[c] = df[c].fillna(0)

    # Wind: only short gaps
    for c in ["wind_capacity", "wind_volume"]:
        if c in df.columns:
            df[c] = df[c].interpolate(
                method="time",
                limit=3,
                limit_direction="forward",
            )

    return df

        
def main():
    master_idx = make_master_index(START_DATE, END_DATE_STRICTLY_BEFORE)

    # Download raw
    df_co2   = download_dataset("co2-factor", START_DATE, END_DATE_STRICTLY_BEFORE, classification=2)
    df_solar = download_dataset("solar-energy", START_DATE, END_DATE_STRICTLY_BEFORE, classification=2)
    df_wind  = download_dataset("wind-energy", START_DATE, END_DATE_STRICTLY_BEFORE, classification=2)
    df_biomass = download_dataset("biomass-power", START_DATE, END_DATE_STRICTLY_BEFORE)
    df_wastep  = download_dataset("waste-power", START_DATE, END_DATE_STRICTLY_BEFORE)
    df_gasp    = download_dataset("fossil-gas-power", START_DATE, END_DATE_STRICTLY_BEFORE)
    df_coal    = download_dataset("fossil-coal", START_DATE, END_DATE_STRICTLY_BEFORE)
    df_nucl    = download_dataset("nuclear", START_DATE, END_DATE_STRICTLY_BEFORE)
    df_offwind = download_dataset("wind-offshore", START_DATE, END_DATE_STRICTLY_BEFORE)

# imports total (point=0) : type=27, activity=3
#    df_import_total = download_dataset("cross-border-electricity", START_DATE, END_DATE_STRICTLY_BEFORE, point=0, activity=3)

    # Hourly + prefixes
    ts_co2   = to_hourly(df_co2,   ["emissionfactor", "emission", "volume"], "co2_")
    ts_solar = to_hourly(df_solar, ["capacity", "volume"], "solar_")
    ts_wind  = to_hourly(df_wind,  ["capacity", "volume"], "wind_")
    ts_biomass = to_hourly(df_biomass, ["capacity", "volume"], "biomassp_")
    ts_wastep  = to_hourly(df_wastep,  ["capacity", "volume"], "wastep_")
    ts_gasp    = to_hourly(df_gasp,    ["capacity", "volume"], "gasp_")
    ts_coal    = to_hourly(df_coal,    ["capacity", "volume"], "coal_")
    ts_nucl    = to_hourly(df_nucl,    ["capacity", "volume"], "nucl_")
    ts_offwind = to_hourly(df_offwind, ["capacity", "volume"], "offwind_")

# Cross-border data may not have capacity/volume structured the same way as generation; check the columns
# ts_import  = to_hourly(df_import_total, ["capacity", "volume"], "import_")

    # QA before alignment (optional)
    qa_summary(ts_co2, "ts_co2")
    qa_summary(ts_solar, "ts_solar")
    qa_summary(ts_wind, "ts_wind")

    # Align to master (prevents silent coverage drift)
    co2_aligned   = align_to_master(ts_co2, master_idx, "co2")
    solar_aligned = align_to_master(ts_solar, master_idx, "solar")
    wind_aligned  = align_to_master(ts_wind, master_idx, "wind")
    biomass_aligned = align_to_master(ts_biomass, master_idx, "biomassp")
    wastep_aligned  = align_to_master(ts_wastep,  master_idx, "wastep")
    gasp_aligned    = align_to_master(ts_gasp,    master_idx, "gasp")
    coal_aligned    = align_to_master(ts_coal,    master_idx, "coal")
    nucl_aligned    = align_to_master(ts_nucl,    master_idx, "nucl")
    offwind_aligned = align_to_master(ts_offwind, master_idx, "offwind")
#    import_aligned  = align_to_master(ts_import,  master_idx, "import")

    # Outer join via concat
    df_outer = pd.concat(
    [co2_aligned, solar_aligned, wind_aligned,
     biomass_aligned, wastep_aligned,
     gasp_aligned, coal_aligned, nucl_aligned, offwind_aligned], axis=1
)
 
    print("\nBefore fill:")
    print("Rows:", len(df_outer))
    print("Missing rows (any col):", int(df_outer.isna().any(axis=1).sum()))
    
    train_end = "2025-01-01T00:00:00Z"
    test_end  = "2026-01-01T00:00:00Z"

    raw_train = df_outer.loc[df_outer.index < train_end].copy()
    raw_test  = df_outer.loc[(df_outer.index >= train_end) & (df_outer.index < test_end)].copy()

    train_filled = controlled_fill_save(raw_train)
    test_filled  = controlled_fill_save(raw_test)

    df_filled = pd.concat([train_filled, test_filled]).sort_index()
    
    # Alerts (too many missing/imputed)
    alert_if_bad(df_outer, df_filled, alerts=ALERTS)
    
    df_filled["hour"] = df_filled.index.hour
    df_filled["dow"] = df_filled.index.dayofweek

    df_filled["sin_hour"] = np.sin(2 * np.pi * df_filled["hour"] / 24)
    df_filled["cos_hour"] = np.cos(2 * np.pi * df_filled["hour"] / 24)

    df_filled["sin_dow"] = np.sin(2 * np.pi * df_filled["dow"] / 7)
    df_filled["cos_dow"] = np.cos(2 * np.pi * df_filled["dow"] / 7)

    # Numeric QA (key columns)
    qa_numeric(df_filled, ["co2_emissionfactor", "solar_volume", "wind_volume"], "df_filled")

    # Consistency check: emissionfactor ~ emission/volume (where available)
    if all(c in df_filled.columns for c in ["co2_emissionfactor", "co2_emission", "co2_volume"]):
        ratio = df_filled["co2_emission"] / df_filled["co2_volume"]
        diff = (df_filled["co2_emissionfactor"] - ratio).abs()
        print("\nConsistency |co2_emissionfactor - co2_emission/co2_volume|:")
        print(diff.describe())

    # Save
    out_csv = "ned_hourly_outerjoin_filled_2023_2025.csv"
    df_filled.to_csv(out_csv, index=True)
    print("\nSaved:", out_csv)

    return df_outer, df_filled

if __name__ == "__main__":
    df_outer, df_filled = main()        
    


=== QA: ts_co2 ===
Rows: 26304
Start: 2022-12-31 23:00:00+00:00 End: 2025-12-31 22:00:00+00:00
Duplicate timestamps: 0
Missing rows after asfreq: 0

=== QA: ts_solar ===
Rows: 26304
Start: 2022-12-31 23:00:00+00:00 End: 2025-12-31 22:00:00+00:00
Duplicate timestamps: 0
Missing rows after asfreq: 0

=== QA: ts_wind ===
Rows: 26304
Start: 2022-12-31 23:00:00+00:00 End: 2025-12-31 22:00:00+00:00
Duplicate timestamps: 0
Missing rows after asfreq: 0
co2: rows=26304 missing_rows_anycol=0
solar: rows=26304 missing_rows_anycol=0
wind: rows=26304 missing_rows_anycol=0
biomassp: rows=26304 missing_rows_anycol=0
wastep: rows=26304 missing_rows_anycol=0
gasp: rows=26304 missing_rows_anycol=0
coal: rows=26304 missing_rows_anycol=0
nucl: rows=26304 missing_rows_anycol=0
offwind: rows=26304 missing_rows_anycol=0

Before fill:
Rows: 26304
Missing rows (any col): 0

df_filled.co2_emissionfactor:
count    26304.000000
mean         0.209709
std          0.108145
min          0.019279
25%          0.1136

In [None]:
import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.metrics import mean_absolute_error

# =========================
# CONFIG (CET/CEST ONLY)
# =========================
TARGET = "co2_emissionfactor"
EXOG = ["solar_volume", "wind_volume", "gasp_volume"]
LAGS_Y = [1, 24, 168, 336]
LAGS_X = [1, 24, 168, 336]
HORIZON = 168

TZ = "UTC"
train_end = pd.Timestamp("2025-01-01T00:00:00Z", tz=TZ)
test_end  = pd.Timestamp("2026-01-01T00:00:00Z", tz=TZ)

# =========================
# 0) Ensure UTC index
# =========================
df_filled = df_filled.copy()
if df_filled.index.tz is None:
    df_filled.index = df_filled.index.tz_localize(TZ)
else:
    df_filled.index = df_filled.index.tz_convert(TZ)

# =========================
# 1) Features (lagged exog only)
# =========================
def add_lags(df, cols, lags):
    out = df.copy()
    for c in cols:
        for l in lags:
            out[f"{c}_lag{l}"] = out[c].shift(l)
    return out

df_feat = df_filled.copy()
df_feat = add_lags(df_feat, [TARGET], LAGS_Y)
df_feat = add_lags(df_feat, EXOG, LAGS_X)

for w in [24, 168, 336]:
    df_feat[f"{TARGET}_roll_mean_{w}"] = df_feat[TARGET].rolling(w).mean().shift(1)
    df_feat[f"{TARGET}_roll_std_{w}"]  = df_feat[TARGET].rolling(w).std().shift(1)

base_time_cols = ["sin_hour","cos_hour","sin_dow","cos_dow"]
lag_cols = [c for c in df_feat.columns if ("_lag" in c) or ("roll_" in c)]
X_cols = base_time_cols + lag_cols

df_feat = df_feat[X_cols + [TARGET]].dropna()
print("df_feat:", df_feat.shape, "tz:", df_feat.index.tz)

# Base matrices (float32)
X_base = df_feat[X_cols].to_numpy(dtype=np.float32)
y_base = df_feat[TARGET].to_numpy(dtype=np.float32)
idx = df_feat.index  # tz-aware

# Helper: add horizon features to numpy matrix
def with_h_features(X, h):
    h_val = np.float32(h)
    h_sin = np.float32(np.sin(2*np.pi*h/168))
    h_cos = np.float32(np.cos(2*np.pi*h/168))
    # append 3 columns
    h_block = np.full((X.shape[0], 3), [h_val, h_sin, h_cos], dtype=np.float32)
    return np.hstack([X, h_block])

feature_names = X_cols + ["h", "h_sin", "h_cos"]

# =========================
# 2) Build TRAIN only (no leakage via target_time)
# =========================
X_train_parts = []
y_train_parts = []

for h in range(1, HORIZON+1):
    # y(t+h)
    y = np.roll(y_base, -h)
    # mark invalid tail where rolled values wrap
    y[-h:] = np.nan

    target_time = idx + pd.to_timedelta(h, unit="h")
    ok = ~np.isnan(y)

    train_mask = ok & (target_time < train_end)
    if not np.any(train_mask):
        continue

    Xh = with_h_features(X_base[train_mask], h)
    yh = y[train_mask].astype(np.float32)

    X_train_parts.append(Xh)
    y_train_parts.append(yh)

X_train = np.vstack(X_train_parts)
y_train = np.concatenate(y_train_parts)

print("X_train:", X_train.shape, "y_train:", y_train.shape)

# =========================
# 3) Train LightGBM
# =========================
model = lgb.LGBMRegressor(
    n_estimators=4000,
    learning_rate=0.03,
    num_leaves=256,
    random_state=42,
    min_data_in_leaf=30,
    feature_fraction=0.9,
    bagging_fraction=0.9,
    lambda_l2=3.0,
    objective="huber",
    force_col_wise=True,
)

model.fit(X_train, y_train, feature_name=feature_names)

# =========================
# 4) Evaluate TEST per horizon (NO concat anywhere)
# =========================
mae_by_h = {}
total_abs = 0.0
total_n = 0

for h in range(1, HORIZON+1):
    y = np.roll(y_base, -h)
    y[-h:] = np.nan

    target_time = idx + pd.to_timedelta(h, unit="h")
    ok = ~np.isnan(y)

    test_mask = ok & (idx >= train_end) & (target_time < test_end)
    if not np.any(test_mask):
        continue

    Xh_test = with_h_features(X_base[test_mask], h)
    yh_test = y[test_mask].astype(np.float32)

    pred = model.predict(Xh_test)
    mae = mean_absolute_error(yh_test, pred)

    mae_by_h[h] = mae
    total_abs += np.abs(yh_test - pred).sum()
    total_n += yh_test.shape[0]

global_mae = total_abs / total_n
print("Global MAE:", global_mae)
print("MAE mean:", float(np.mean(list(mae_by_h.values()))))
print("MAE h=1:", float(mae_by_h.get(1, np.nan)))
print("MAE h=24:", float(mae_by_h.get(24, np.nan)))
print("MAE h=168:", float(mae_by_h.get(168, np.nan)))

df_feat: (25968, 27) tz: Europe/Amsterdam
X_train: (2876748, 29) y_train: (2876748,)
[LightGBM] [Info] Total Bins 5994
[LightGBM] [Info] Number of data points in the train set: 2876748, number of used features: 29
[LightGBM] [Info] Start training from score 0.209559


















Global MAE: 0.0803485430711556
MAE mean: 0.08037049121800488
MAE h=1: 0.0348420275114212
MAE h=24: 0.07560674226296986
MAE h=168: 0.0812868234024418
