In [1]:
"""
co2_pilot_pipeline.py

Pilot pipeline for CO2 markets:
  - Load 5 carbon markets from Excel (Refinitiv exports).
  - Slice to 2022-01-01 ... 2024-12-31.
  - Compute price-only technical indicators.
  - Fetch macro panel from Yahoo Finance (indices, commodities, FX).
  - Correlation-based feature selection (top-k tech + macro).
  - 10-day rolling windows, 90% train / 10% test.
  - Models: naive baseline, XGBoost, Keras MLP, Keras GRU.

Designed as a proof of concept; you can later:
  - Swap macro tickers per region,
  - Beef up feature selection,
  - Plug into your existing logging / tuning infra.
"""

'\nco2_pilot_pipeline.py\n\nPilot pipeline for CO2 markets:\n  - Load 5 carbon markets from Excel (Refinitiv exports).\n  - Slice to 2022-01-01 ... 2024-12-31.\n  - Compute price-only technical indicators.\n  - Fetch macro panel from Yahoo Finance (indices, commodities, FX).\n  - Correlation-based feature selection (top-k tech + macro).\n  - 10-day rolling windows, 90% train / 10% test.\n  - Models: naive baseline, XGBoost, Keras MLP, Keras GRU.\n\nDesigned as a proof of concept; you can later:\n  - Swap macro tickers per region,\n  - Beef up feature selection,\n  - Plug into your existing logging / tuning infra.\n'

In [2]:
from pathlib import Path
import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, r2_score
from xgboost import XGBRegressor

from tensorflow import keras
from tensorflow.keras import layers
import yfinance as yf

In [3]:
# -------------------------------------------------------------------
# CONFIG
# -------------------------------------------------------------------

CO2_FILES = {
    "Shanghai": "Shanghai Spot - 2020.8.1 - 2025.11.28.xlsx",
    "Beijing": "Beijing - 2020.8.1 - 2025.11.28.xlsx",
    "Japan": "Japan - 01-Oct-2023 - 28-Nov-2025.xlsx",
    "EUA_ICE": "EUA - 01-Sep-2020 - 24-Nov-2025.xlsx",
    "EUA_EEX": "EEX-EUSP4-SPOT - 01Jan2021 - 28Nov 2025.xlsx",
}

DATA_DIR = Path("data")

# project time slice
START_DATE = "2022-01-01"
END_DATE   = "2024-12-31"

# macro proxies (can refine per-region later)
MACRO_TICKERS = [
    # Global / US risk
    "^GSPC", "^IXIC",
    # China
    "000001.SS", "399001.SZ",
    # Japan
    "^N225",
    # Euro area (Euro Stoxx 50)
    "^STOXX50E",
    # Energy
    "CL=F", "BZ=F", "NG=F",
    # Metals
    "GC=F", "SI=F", "HG=F",
    # Dollar index
    "DX-Y.NYB",
]

In [None]:
# default Keras hyperparams (single config each, as requested)
DEFAULT_MLP_CONFIG = {
    "epochs": 30,
    "batch_size": 32,
    "learning_rate": 1e-3,
    "hidden_units": (64, 32),
    "dropout": 0.1,
}

DEFAULT_GRU_CONFIG = {
    "epochs": 30,
    "batch_size": 32,
    "learning_rate": 1e-3,
    "hidden_units": 32,
    "dropout": 0.1,
}

In [5]:
# -------------------------------------------------------------------
# TECHNICAL INDICATORS (price-only)
#   Reusing the style of your stock pipeline, but only using close/price.
# -------------------------------------------------------------------

def SMA(series: pd.Series, window: int = 20) -> pd.Series:
    return series.rolling(window=window, min_periods=window).mean()

def MACD(series: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9):
    fast_ema = series.ewm(span=fast, adjust=False, min_periods=fast).mean()
    slow_ema = series.ewm(span=slow, adjust=False, min_periods=slow).mean()
    macd_line = fast_ema - slow_ema
    signal_line = macd_line.ewm(span=signal, adjust=False, min_periods=signal).mean()
    hist = macd_line - signal_line
    return macd_line, signal_line, hist

def RSI(series: pd.Series, window: int = 14) -> pd.Series:
    delta = series.diff()
    gains = delta.where(delta > 0, 0.0)
    losses = -delta.where(delta < 0, 0.0)
    avg_gain = gains.ewm(alpha=1/window, adjust=False, min_periods=window).mean()
    avg_loss = losses.ewm(alpha=1/window, adjust=False, min_periods=window).mean()
    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))

def BBANDS(series: pd.Series, window: int = 20, num_std: float = 2.0):
    mid = series.rolling(window=window, min_periods=window).mean()
    rolling_std = series.rolling(window=window, min_periods=window).std()
    upper = mid + num_std * rolling_std
    lower = mid - num_std * rolling_std
    return mid, upper, lower

In [6]:
def compute_technical_features(price: pd.Series) -> pd.DataFrame:
    """
    Price-only indicators that make sense for CO2 markets:
      - 1d / 5d returns, 20d volatility
      - Short/medium/long SMAs + slope/spread
      - 20d z-score (distance from local mean)
      - Momentum and ROC
      - RSI(14) for overbought/oversold
      - MACD (trend) + Bollinger bands
    """
    df = pd.DataFrame(index=price.index)

    ret_1d = price.pct_change()
    log_ret_1d = np.log(price).diff()

    df["ret_1d"] = ret_1d
    df["log_ret_1d"] = log_ret_1d
    df["ret_5d"] = price.pct_change(5)
    df["vol_20d"] = ret_1d.rolling(20).std()

    df["sma_5"] = SMA(price, 5)
    df["sma_20"] = SMA(price, 20)
    df["sma_60"] = SMA(price, 60)
    df["sma_5_20_diff"] = df["sma_5"] - df["sma_20"]

    roll_mean_20 = price.rolling(20).mean()
    roll_std_20 = price.rolling(20).std()
    df["zscore_20"] = (price - roll_mean_20) / roll_std_20

    df["momentum_10"] = price.diff(10)
    df["roc_10"] = price.pct_change(10)

    df["rsi_14"] = RSI(price, 14)

    macd_line, macd_signal, macd_hist = MACD(price, 12, 26, 9)
    df["macd_line"] = macd_line
    df["macd_signal"] = macd_signal
    df["macd_hist"] = macd_hist

    _, bb_up, bb_low = BBANDS(price, 20, 2.0)
    df["bb_upper_20_2"] = bb_up
    df["bb_lower_20_2"] = bb_low

    return df

In [7]:
# -------------------------------------------------------------------
# MACRO PANEL (Yahoo Finance)
#   Daily Adj Close, ffilled. Same logic as your stock pipeline.
# -------------------------------------------------------------------

def fetch_macro_panel(start_date: str, end_date: str) -> pd.DataFrame:
    print(f"[INFO] Fetching macro tickers from Yahoo: {MACRO_TICKERS}")
    data = yf.download(
        MACRO_TICKERS,
        start=start_date,
        end=end_date,
        auto_adjust=False,
        progress=False,
        group_by="ticker",
    )

    macro = pd.DataFrame()

    if isinstance(data.columns, pd.MultiIndex):
        for tkr in MACRO_TICKERS:
            if tkr not in data.columns.levels[0]:
                print(f"[WARN] Macro ticker {tkr} missing from download; skipping.")
                continue
            s = data[tkr]["Adj Close"]
            if s.isna().all():
                print(f"[WARN] Macro ticker {tkr} is all-NaN; dropping.")
                continue
            macro[tkr] = s
    else:
        # single-ticker edge case
        s = data.get("Adj Close")
        if s is not None and not s.isna().all():
            macro[MACRO_TICKERS[0]] = s

    macro.index.name = "date"
    macro = macro.sort_index().ffill()
    # just in case: drop any columns that are entirely NaN
    macro = macro.dropna(axis=1, how="all")

    print(f"[INFO] Macro panel shape: {macro.shape}")
    return macro

In [8]:
# -------------------------------------------------------------------
# CO2 LOADERS (Refinitiv Excel)
#   Handle slightly different layouts for each market.
# -------------------------------------------------------------------

def load_co2_series(market: str) -> pd.DataFrame:
    """
    Returns a DataFrame with:
      index = datetime (sorted ascending),
      column 'price' = closing/spot price.
    Sliced to [START_DATE, END_DATE] and drops non-positive/NaN prices.
    """
    path = DATA_DIR / CO2_FILES[market]
    if not path.exists():
        raise FileNotFoundError(f"{path} not found")

    print(f"[INFO] Loading {market} from {path}")

    if market == "Shanghai":
        # Row 0: 日期 | SAXSHEA (TRDPRC_1)
        # Row 1: NaN | 收盘价
        # Row 2+: 2025年11月26日 | 54.01, etc.
        raw = pd.read_excel(path, header=None)
        df = raw.iloc[2:, :2].copy()
        df.columns = ["date", "price"]

        # Chinese date format: 2025年11月26日
        df["date"] = pd.to_datetime(df["date"], format="%Y年%m月%d日", errors="coerce")
        df["price"] = pd.to_numeric(df["price"], errors="coerce")
        df = df.dropna(subset=["date", "price"])
        df = df.set_index("date").sort_index()

    elif market in {"Beijing", "Japan", "EUA_EEX"}:
        # Structure:
        # ...
        # row_k: '交易日期' | 'Trade Price'
        # rows k+1+: date | price
        raw = pd.read_excel(path, header=None)
        header_idx = raw.index[raw.iloc[:, 0] == "交易日期"][0]
        df = raw.iloc[header_idx + 1 :, :2].copy()
        df.columns = ["date", "price"]

    elif market == "EUA_ICE":
        # Structure:
        # row31: '交易日期' | '收盘' | '净值' | ...
        # rows 32+: full OHLC + extras
        raw = pd.read_excel(path, header=None)
        header_idx = raw.index[raw.iloc[:, 0] == "交易日期"][0]
        df = raw.iloc[header_idx + 1 :, :].copy()
        cols = raw.iloc[header_idx].tolist()
        df.columns = cols
        df = df.rename(columns={"交易日期": "date", "收盘": "price"})
        df = df[["date", "price"]]

    else:
        raise ValueError(f"Unknown market: {market}")

    # Parse & clean
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["price"] = pd.to_numeric(df["price"], errors="coerce")
    df = df.dropna(subset=["date", "price"])
    df = df.set_index("date").sort_index()

    # Project slice
    df = df.loc[START_DATE:END_DATE]

    # Skip missing / illiquid days: price <= 0 or NaNs already dropped
    df = df[df["price"] > 0]

    print(f"[INFO] {market} price series shape after slice: {df.shape}")
    return df

In [9]:
def select_top_by_corr(
    df: pd.DataFrame,
    target_col: str,
    candidate_cols,
    k: int = 5,
    min_abs_corr: float = 0.05,
):
    cols = [c for c in candidate_cols if c in df.columns]
    if not cols:
        return [], pd.Series(dtype=float)

    corr_full = df[cols + [target_col]].corr()[target_col].drop(target_col)
    corr_full = corr_full.dropna()
    if corr_full.empty:
        return [], corr_full

    # Apply threshold first
    corr_sorted = corr_full.abs().sort_values(ascending=False)
    corr_filtered = corr_sorted[corr_sorted >= min_abs_corr]

    # Fallback: if nothing passes the threshold, just take top-k by abs corr
    if corr_filtered.empty:
        corr_filtered = corr_sorted

    top = list(corr_filtered.head(k).index)
    return top, corr_full

In [10]:
# -------------------------------------------------------------------
# ROLLING WINDOWS: 10-day, 90% train / 10% test
#   Mirrors your sliding-window logic but in Keras/XGB.
# -------------------------------------------------------------------

def rolling_windows(
    df: pd.DataFrame,
    feature_cols,
    target_col: str,
    window_size: int = 10,
    test_ratio: float = 0.1,
):
    """
    Yield rolling windows:
      - window_size rows each
      - within each window: first 90% for train, last 10% for test
    """
    n = len(df)
    for start in range(0, n - window_size + 1):
        end = start + window_size
        win = df.iloc[start:end]
        X_all = win[feature_cols].values
        y_all = win[target_col].values
        split = int(window_size * (1 - test_ratio))
        if split < 1 or split >= window_size:
            continue
        X_tr, X_te = X_all[:split], X_all[split:]
        y_tr, y_te = y_all[:split], y_all[split:]
        yield win.index[0], win.index[-1], X_tr, y_tr, X_te, y_te


In [11]:
# -------------------------------------------------------------------
# MODELS
#   - Naive: last train y
#   - XGBoost (benchmark ML model)
#   - Keras MLP
#   - Keras GRU (seq_len=1 for now)
# -------------------------------------------------------------------

def build_mlp(
    input_dim: int,
    learning_rate: float = 1e-3,
    hidden_units = (64, 32),
    dropout: float = 0.1,
):
    model = keras.Sequential()
    model.add(layers.Input(shape=(input_dim,)))
    for h in hidden_units:
        model.add(layers.Dense(h, activation="relu"))
        if dropout > 0:
            model.add(layers.Dropout(dropout))
    model.add(layers.Dense(1))
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss="mse",
    )
    return model

def build_gru(
    input_dim: int,
    learning_rate: float = 1e-3,
    hidden_units: int = 32,
    dropout: float = 0.1,
):
    model = keras.Sequential()
    # seq_len=1 for now; later you can upgrade to true sequence inputs
    model.add(layers.Input(shape=(1, input_dim)))
    model.add(
        layers.GRU(
            hidden_units,
            activation="tanh",
            dropout=dropout,
            recurrent_dropout=0.0,
        )
    )
    model.add(layers.Dense(1))
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss="mse",
    )
    return model

In [12]:
# -------------------------------------------------------------------
# MAIN EVALUATION PER MARKET
# -------------------------------------------------------------------

def eval_models_for_market(
    market: str,
    macro_panel: pd.DataFrame,
    window_size: int = 10,
    test_ratio: float = 0.1,
    n_top_tech: int = 5,
    n_top_macro: int = 5,
    mlp_config=None,
    gru_config=None,
):
    """
    Full pipeline for a single market:
      1) Load CO2 prices (2022-2024).
      2) Compute technical indicators.
      3) Join macro panel (ffill on that market's dates).
      4) Define target y = next-day price.
      5) Select top-k tech + macro features by |corr(y)|.
      6) Run rolling windows with naive, XGB, MLP, GRU.
      7) Return metrics + feature list + correlations.
    """
    if mlp_config is None:
        mlp_config = DEFAULT_MLP_CONFIG
    if gru_config is None:
        gru_config = DEFAULT_GRU_CONFIG

    # 1) Load price
    price_df = load_co2_series(market)
    if price_df.empty:
        raise ValueError(f"No data for {market} in {START_DATE}-{END_DATE}")

    # 2) Technicals
    tech = compute_technical_features(price_df["price"])

    # 3) Macro join: align on this market's dates; we don't create new days
    macro = macro_panel.reindex(price_df.index).ffill()

    # Combine everything
    df = pd.concat([price_df, tech, macro], axis=1)

    # 4) Target: next-day price
    df["y"] = df["price"].shift(-1)
    df = df[df["y"].notna()]

    # Candidate feature sets
    tech_cols = [c for c in tech.columns if c in df.columns]
    macro_cols = [c for c in macro.columns if c in df.columns]

    # 5) Feature selection
    tech_keep, tech_corr = select_top_by_corr(
        df, "y", tech_cols, k=n_top_tech
    )
    macro_keep, macro_corr = select_top_by_corr(
        df, "y", macro_cols, k=n_top_macro
    )

    feature_cols = tech_keep + macro_keep

    if not feature_cols:
        print(f"[WARN] {market}: no features passed correlation filter; "
            f"using all tech + macro with non-NaN instead.")
        feature_cols = tech_cols + macro_cols

    df_model = df[feature_cols + ["y"]].dropna()

    print(f"[INFO] {market} using features:")
    print("       TECH:", tech_keep)
    print("       MACRO:", macro_keep)
    print(f"[INFO] {market} model dataset shape: {df_model.shape}")

    # Rolling evaluation
    y_true_all = {name: [] for name in ["naive", "xgb", "mlp", "gru"]}
    y_pred_all = {name: [] for name in ["naive", "xgb", "mlp", "gru"]}

    for start_date, end_date, X_tr, y_tr, X_te, y_te in rolling_windows(
        df_model, feature_cols, "y", window_size, test_ratio
    ):
        if len(y_tr) < 5 or len(y_te) == 0:
            continue

        # Scale per-window to avoid leakage
        sx = StandardScaler().fit(X_tr)
        sy = StandardScaler().fit(y_tr.reshape(-1, 1))

        X_tr_s = sx.transform(X_tr)
        X_te_s = sx.transform(X_te)
        y_tr_s = sy.transform(y_tr.reshape(-1, 1)).ravel()
        y_te_s = sy.transform(y_te.reshape(-1, 1)).ravel()

        # 1) Naive: last train y
        naive_pred_s = np.full_like(y_te_s, fill_value=y_tr_s[-1])

        # 2) XGBoost benchmark
        xgb = XGBRegressor(
            max_depth=3,
            n_estimators=100,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            objective="reg:squarederror",
            random_state=42,
        )
        xgb.fit(X_tr_s, y_tr_s)
        xgb_pred_s = xgb.predict(X_te_s)

        # 3) Keras MLP
        mlp = build_mlp(
            input_dim=X_tr_s.shape[1],
            learning_rate=mlp_config["learning_rate"],
            hidden_units=mlp_config["hidden_units"],
            dropout=mlp_config["dropout"],
        )
        mlp.fit(
            X_tr_s,
            y_tr_s,
            epochs=mlp_config["epochs"],
            batch_size=mlp_config["batch_size"],
            verbose=0,
        )
        mlp_pred_s = mlp.predict(X_te_s, verbose=0).ravel()

        # 4) Keras GRU (seq_len=1)
        X_tr_seq = X_tr_s[:, None, :]
        X_te_seq = X_te_s[:, None, :]
        gru = build_gru(
            input_dim=X_tr_s.shape[1],
            learning_rate=gru_config["learning_rate"],
            hidden_units=gru_config["hidden_units"],
            dropout=gru_config["dropout"],
        )
        gru.fit(
            X_tr_seq,
            y_tr_s,
            epochs=gru_config["epochs"],
            batch_size=gru_config["batch_size"],
            verbose=0,
        )
        gru_pred_s = gru.predict(X_te_seq, verbose=0).ravel()

        # Inverse-transform all predictions back to price scale
        for name, pred_s in [
            ("naive", naive_pred_s),
            ("xgb", xgb_pred_s),
            ("mlp", mlp_pred_s),
            ("gru", gru_pred_s),
        ]:
            y_pred = sy.inverse_transform(pred_s.reshape(-1, 1)).ravel()
            y_true = sy.inverse_transform(y_te_s.reshape(-1, 1)).ravel()
            y_true_all[name].extend(y_true.tolist())
            y_pred_all[name].extend(y_pred.tolist())

    # Aggregate metrics across all windows
    metrics = {}
    for name in y_true_all:
        if len(y_true_all[name]) == 0:
            continue
        yt = np.array(y_true_all[name])
        yp = np.array(y_pred_all[name])
        metrics[name] = {
            "RMSE": float(np.sqrt(mean_squared_error(yt, yp))),
            "MAPE": float(mean_absolute_percentage_error(yt, yp)),
            "R2": float(r2_score(yt, yp)),
            "n_points": int(len(yt)),
        }

    return {
        "market": market,
        "feature_cols": feature_cols,
        "tech_corr": tech_corr,
        "macro_corr": macro_corr,
        "metrics": metrics,
    }

In [13]:
# -------------------------------------------------------------------
# MAIN (for quick proof-of-concept run)
# -------------------------------------------------------------------

if __name__ == "__main__":
    # 1) Macro panel once
    macro_panel = fetch_macro_panel(START_DATE, END_DATE)

    # 2) Run over all markets
    all_results = {}
    for mkt in CO2_FILES:
        print("=" * 80)
        print(f"[RUN] {mkt}")
        try:
            res = eval_models_for_market(mkt, macro_panel)
            all_results[mkt] = res
            print(f"[RESULT] {mkt} metrics:")
            for model_name, m in res["metrics"].items():
                print(
                    f"  {model_name:6s} | "
                    f"RMSE={m['RMSE']:.4f}  "
                    f"MAPE={m['MAPE']:.4f}  "
                    f"R2={m['R2']:.4f}  "
                    f"(n={m['n_points']})"
                )
        except Exception as e:
            print(f"[ERROR] {mkt}: {e}")


[INFO] Fetching macro tickers from Yahoo: ['^GSPC', '^IXIC', '000001.SS', '399001.SZ', '^N225', '^STOXX50E', 'CL=F', 'BZ=F', 'NG=F', 'GC=F', 'SI=F', 'HG=F', 'DX-Y.NYB']
[INFO] Macro panel shape: (779, 13)
[RUN] Shanghai
[INFO] Loading Shanghai from data/Shanghai Spot - 2020.8.1 - 2025.11.28.xlsx
[ERROR] Shanghai: 'date'
[RUN] Beijing
[INFO] Loading Beijing from data/Beijing - 2020.8.1 - 2025.11.28.xlsx
[INFO] Beijing price series shape after slice: (193, 1)
[INFO] Beijing using features:
       TECH: ['sma_5', 'sma_20', 'sma_60', 'bb_lower_20_2', 'vol_20d']
       MACRO: ['SI=F', '^GSPC', 'GC=F', '^IXIC', '^STOXX50E']
[INFO] Beijing model dataset shape: (133, 11)
[RESULT] Beijing metrics:
  naive  | RMSE=10.5396  MAPE=0.0642  R2=0.0410  (n=124)
  xgb    | RMSE=9.1414  MAPE=0.0607  R2=0.2786  (n=124)
  mlp    | RMSE=10.9079  MAPE=0.0684  R2=-0.0272  (n=124)
  gru    | RMSE=8.3605  MAPE=0.0578  R2=0.3966  (n=124)
[RUN] Japan
[INFO] Loading Japan from data/Japan - 01-Oct-2023 - 28-Nov-202