# Les imports

In [1]:
# Data
import numpy as np
import pandas as pd


# Pipeline
from sklearn.impute import SimpleImputer
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.preprocessing import RobustScaler, OrdinalEncoder
from sklearn.pipeline import Pipeline

# Models
from sklearn.linear_model import LinearRegression, Lasso
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor, AdaBoostRegressor, GradientBoostingRegressor
from sklearn.tree import DecisionTreeRegressor
from xgboost import XGBRegressor

# Validation
from sklearn.model_selection import TimeSeriesSplit, RandomizedSearchCV
from sklearn.metrics import mean_absolute_error, root_mean_squared_error, r2_score
from sklearn.inspection import permutation_importance
from sklearn.feature_selection import SelectPercentile, mutual_info_regression, SelectFromModel
from scipy import stats
import statsmodels.api as sm

# Others
import joblib
from datetime import datetime
import os
import yfinance as yf
from bs4 import BeautifulSoup
import requests
import re

# Import data

In [None]:
def download_and_concat_tickers(tickers, start_date=None, end_date=None, interval='1d'):
    """Download price history for `tickers` and flatten the column labels.

    Parameters
    ----------
    tickers : str or list of str
        Symbols forwarded unchanged to ``yf.download``.
    start_date, end_date : optional
        Forwarded as ``start`` / ``end`` when ``start_date`` is truthy.
        When ``start_date`` is falsy the whole history is requested with
        ``period="max"`` and ``end_date`` is NOT used.
    interval : str, default '1d'
        Forwarded to ``yf.download``.

    Returns
    -------
    pandas.DataFrame
        The downloaded frame with columns renamed to ``"<ticker>_<field>"``.
    """
    if start_date:
        df = yf.download(tickers, start=start_date, end=end_date, interval=interval)
    else:
        df = yf.download(tickers, period="max", interval=interval)

    if isinstance(df.columns, pd.MultiIndex):
        # Two-level (field, ticker) labels -> flat "<ticker>_<field>" strings.
        df.columns = [f"{ticker}_{field}" for field, ticker in df.columns]
    else:
        # Flat labels cannot be unpacked as (field, ticker) pairs. When exactly
        # one ticker was requested its name is still known, so the same naming
        # scheme is applied; otherwise the labels are left untouched.
        requested = [tickers] if isinstance(tickers, str) else list(tickers)
        if len(requested) == 1:
            df.columns = [f"{requested[0]}_{field}" for field in df.columns]

    return df

# Symbols handed to yfinance, one per line so additions/removals diff cleanly.
tickers = [
    "^GSPC",
    "^DJI",
    "^VIX",
    "^GVZ",
    "^OVX",
    "^MOVE",
    "BOND",
    "^STOXX",
    "EURUSD=X",
    "DX-Y.NYB",
    "CL=F",
    "BZ=F",
    "SI=F",
    "PL=F",
    "BTC-USD",
    "JPM",
    "PA=F",
    "^TNX",
    "GC=F",
    "GDX",
    "EGO",
    "USO",
    "GD=F",
]

# Daily history (default interval) over a fixed date window.
result_df = download_and_concat_tickers(
    tickers,
    start_date="2000-08-30",
    end_date="2025-12-11",
)
result_df.tail()

  df = yf.download(tickers, start=start_date, end=end_date, interval=interval)
[*********************100%***********************]  23 of 23 completed


Unnamed: 0_level_0,BOND_Close,BTC-USD_Close,BZ=F_Close,CL=F_Close,DX-Y.NYB_Close,EGO_Close,EURUSD=X_Close,GC=F_Close,GD=F_Close,GDX_Close,...,SI=F_Volume,USO_Volume,^DJI_Volume,^GSPC_Volume,^GVZ_Volume,^MOVE_Volume,^OVX_Volume,^STOXX_Volume,^TNX_Volume,^VIX_Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2025-12-06,,89272.375,,,,,,,,,...,,,,,,,,,,
2025-12-07,,90405.640625,,,,,,,,,...,,,,,,,,,,
2025-12-08,92.959999,90640.203125,62.490002,58.880001,99.089996,30.870001,1.164022,4187.200195,556.349976,79.160004,...,518.0,3073400.0,505490000.0,4757130000.0,0.0,0.0,0.0,159308700.0,0.0,0.0
2025-12-09,92.879997,92691.710938,61.939999,58.25,99.220001,32.360001,1.164144,4206.700195,552.299988,81.940002,...,324.0,3111300.0,425470000.0,4508050000.0,0.0,0.0,0.0,170040500.0,0.0,0.0
2025-12-10,93.199997,92020.945312,62.209999,58.459999,98.790001,33.57,1.162831,4196.399902,554.200012,83.32,...,462.0,8096000.0,545610000.0,5526570000.0,0.0,0.0,0.0,209327200.0,0.0,0.0


# Prétraitement

In [3]:
def clean_name(name: str):
    """Turn an arbitrary label into an underscore-separated identifier.

    The label is converted with ``str()``, then:
      * leading characters outside ``[A-Za-z0-9]`` are removed,
      * every run of non-word characters (regex ``\\W``) becomes one ``_``,
      * consecutive underscores are collapsed,
      * leading/trailing underscores are stripped.

    Returns the cleaned string (possibly empty).
    """
    text = re.sub(r'^[^A-Za-z0-9]+', '', str(name))

    # First map non-word runs to '_', then squeeze repeated underscores; both
    # substitutions share the same replacement so they are applied in order.
    for pattern in (r'\W+', r'_+'):
        text = re.sub(pattern, '_', text)

    return text.strip('_')

In [4]:
# --- Preprocessing ---
# Resolve the gold close column under either its raw or its cleaned label, so
# re-running this cell after the columns were renamed no longer raises KeyError.
gold_close_raw = "GC=F_Close"
gold_close_col = (
    gold_close_raw if gold_close_raw in result_df.columns else clean_name(gold_close_raw)
)

# Keep only the rows where the gold close price is known
result_df = result_df.dropna(subset=[gold_close_col])

# Replace special characters in column names (returns a new frame; clean_name
# leaves an already-cleaned label unchanged)
result_df = result_df.rename(columns=clean_name)

In [None]:
def chronological_split(X, y, train_ratio=0.7):
    """Split X and y into a leading train part and a trailing validation part.

    X is sorted by its index and y is re-ordered with the same labels; no
    shuffling takes place. The first ``floor(len(X) * train_ratio)`` rows form
    the train part, the remaining rows the validation part.

    Returns
    -------
    tuple
        ``(X_train, X_val, y_train, y_val)``
    """
    ordered_X = X.sort_index()
    ordered_y = y.loc[ordered_X.index]

    cut = int(np.floor(len(ordered_X) * train_ratio))

    X_train, X_val = ordered_X.iloc[:cut], ordered_X.iloc[cut:]
    y_train, y_val = ordered_y.iloc[:cut], ordered_y.iloc[cut:]
    return X_train, X_val, y_train, y_val

def evaluate_metrics(y_true, y_pred, prefix=""):
    """Score predictions, print a one-line summary and return the scores.

    Computes ``mean_absolute_error``, ``root_mean_squared_error`` and
    ``r2_score`` for ``(y_true, y_pred)``. The printed line starts with
    ``prefix`` followed by a space.

    Returns
    -------
    dict
        ``{"MAE": ..., "RMSE": ..., "R2": ...}``
    """
    mae, rmse, r2 = (
        metric(y_true, y_pred)
        for metric in (mean_absolute_error, root_mean_squared_error, r2_score)
    )
    print(f"{prefix} MAE={mae:.6f} | RMSE={rmse:.6f} | R2={r2:.6f}")
    return dict(MAE=mae, RMSE=rmse, R2=r2)


In [6]:
class PrePreprocessTransformer(BaseEstimator, TransformerMixin):
    """
    Drop numeric columns that are mostly filled with zeros.

    ``fit`` measures, for every numeric column not listed in ``exclude``, the
    share of zeros among its non-NA values; columns whose share is
    ``>= threshold`` are remembered and removed by ``transform``.

    Parameters
    ----------
    threshold : float, default 0.9
        Minimum zero ratio for a column to be dropped.
    exclude : list of str or None, default None
        Column names that must never be dropped. ``None`` means no exclusion.
    """
    def __init__(self, threshold=0.9, exclude=None):
        self.threshold = threshold
        # Stored exactly as received: sklearn's clone() rebuilds the estimator
        # from get_params() and raises if __init__ replaced a parameter
        # (which `exclude or []` did for None / empty lists).
        self.exclude = exclude
        self._drop_cols_ = None  # learned list of columns to drop
        self._feature_names_in_ = None

    def fit(self, X, y=None):
        """Learn which columns to drop from the DataFrame ``X``; returns self."""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("PrePreprocessTransformer expects a pandas DataFrame in fit.")

        self._feature_names_in_ = X.columns.tolist()

        # Resolve the "no exclusion" default here rather than in __init__
        protected = self.exclude or []

        # Restrict to numeric columns (excluding any explicitly protected ones)
        num_cols = [
            c for c in X.select_dtypes(include=[np.number]).columns
            if c not in protected
        ]
        numeric = X[num_cols]  # read-only view of the data; X is not modified

        # Zero ratio per column, relative to the non-NA count. A column with no
        # non-NA value yields NaN, which never satisfies the comparison below.
        zero_ratio = (numeric == 0).sum() / numeric.count()

        # Columns to drop, learned from training data
        self._drop_cols_ = zero_ratio[zero_ratio >= self.threshold].index.tolist()
        return self

    def transform(self, X):
        """Return ``X`` without the columns selected during ``fit``."""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("PrePreprocessTransformer expects a pandas DataFrame in transform.")

        if self._drop_cols_ is None:
            raise RuntimeError("Transformer not fitted. Call fit before transform.")

        # Drop the learned columns (ignore missing ones gracefully)
        return X.drop(columns=[c for c in self._drop_cols_ if c in X.columns])

    def get_feature_names_out(self, input_features=None):
        """Names seen in ``fit`` minus the dropped ones (``input_features`` is ignored)."""
        if self._feature_names_in_ is None:
            raise RuntimeError("Transformer not fitted. Call fit before get_feature_names_out.")
        output = [c for c in self._feature_names_in_ if c not in (self._drop_cols_ or [])]
        return np.array(output, dtype=object)

    def set_output(self, *, transform=None):
        # No-op kept for compatibility with sklearn's set_output API:
        # transform() already returns a DataFrame.
        return self


class CustomPreprocessTransformer(BaseEstimator, TransformerMixin):
    """
    Fill NaN in numeric columns by interpolating along the rows.

    With the default ``method='linear'`` and ``limit_direction="both"``:
    - a gap at the beginning is filled with the first known value
    - a gap in the middle is linearly interpolated
    - a gap at the end is filled with the last known value

    Parameters
    ----------
    method : str, default 'linear'
        Passed to ``DataFrame.interpolate``.
    drop_all_nan : bool, default False
        When True, numeric columns that are still entirely NaN after
        interpolation are dropped. The decision is taken inside ``transform``
        from the frame being transformed, not learned in ``fit``.

    NOTE(review): filling a gap in the middle uses the value that follows the
    gap, i.e. later rows — confirm this is acceptable for forecasting.
    """
    def __init__(self, method='linear', drop_all_nan=False):
        self.method = method
        self.drop_all_nan = drop_all_nan
        self._feature_names_in_ = None
        self._feature_names_out_ = None

    def fit(self, X, y=None):
        """Record the input column names; nothing else is learned. Returns self."""
        if not isinstance(X, pd.DataFrame):
            # Message names this class (it previously cited a non-existent one)
            raise TypeError("CustomPreprocessTransformer expects a pandas DataFrame in fit.")

        self._feature_names_in_ = X.columns.to_list()
        return self

    def transform(self, X):
        """Return a copy of ``X`` with NaN in numeric columns interpolated."""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("CustomPreprocessTransformer expects a pandas DataFrame.")

        X = X.copy()  # the caller's frame is left untouched

        # Select the columns
        numeric_cols = X.select_dtypes(include=[np.number]).columns

        # Fill NaN strategy
        X[numeric_cols] = (
            X[numeric_cols].interpolate(method=self.method, axis=0, limit_direction="both")
        )

        # Drop columns that are still fully NaN
        if self.drop_all_nan:
            all_nan_cols = [c for c in numeric_cols if X[c].isna().all()]
            if all_nan_cols:
                X = X.drop(columns=all_nan_cols)

        # Track output feature names post-transform
        self._feature_names_out_ = X.columns.to_list()

        return X

    def get_feature_names_out(self, input_features=None):
        """Column names of the last ``transform`` output, else those seen in ``fit``."""
        if self._feature_names_out_ is not None:
            return np.array(self._feature_names_out_, dtype=object)

        if self._feature_names_in_ is not None:
            return np.array(self._feature_names_in_, dtype=object)
        raise RuntimeError("Transformer not fitted. Call fit/transform before get_feature_names_out.")

    def set_output(self, *, transform=None):
        # No-op to be compatible with pipelines calling set_output
        return self


class CustomFeatureSelector(BaseEstimator, TransformerMixin):
    """
    Drop highly correlated numerical features.

    ``fit`` computes the absolute correlation matrix of the numeric columns and
    keeps only its strict upper triangle; a column is marked for removal when
    it correlates above ``num_corr_threshold`` with any column placed before
    it. The first numeric column is therefore never dropped.

    Parameters
    ----------
    num_corr_threshold : float, default 0.95
        Absolute correlation above which the later column of a pair is dropped.
    method : str, default 'pearson'
        Passed to ``DataFrame.corr``.
    """
    def __init__(self, num_corr_threshold=0.95, method='pearson'):
        self.num_corr_threshold = num_corr_threshold
        self.method = method
        self.num_cols_ = None
        self.num_col_to_drop_ = None
        self._feature_names_in_ = None

    def fit(self, X, y=None):
        """Learn the list of columns to drop from the DataFrame ``X``; returns self."""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("CustomFeatureSelector expects a pandas DataFrame.")
        self._feature_names_in_ = list(X.columns)
        self.num_cols_ = list(X.select_dtypes(include=[np.number]).columns)
        if len(self.num_cols_) == 0:
            self.num_col_to_drop_ = []
            return self
        corr_num = X[self.num_cols_].corr(method=self.method)
        # k=1 masks the diagonal and the lower triangle, so each pair is
        # examined once and only against columns that come earlier.
        upper = corr_num.where(np.triu(np.ones(corr_num.shape), k=1).astype(bool)).abs()
        # Masked / undefined entries are NaN and compare as False.
        too_correlated = (upper > self.num_corr_threshold).any()
        self.num_col_to_drop_ = too_correlated[too_correlated].index.tolist()
        return self

    def transform(self, X, y=None):
        """Return ``X`` without the columns selected during ``fit``."""
        if not isinstance(X, pd.DataFrame):
            raise TypeError("CustomFeatureSelector expects a pandas DataFrame.")
        to_drop = getattr(self, 'num_col_to_drop_', None)
        if to_drop is None:
            # __init__ sets the attribute to None, so without this guard an
            # unfitted call reached DataFrame.drop(columns=None) and failed
            # with an unrelated pandas error.
            raise RuntimeError("Transformer not fitted. Call fit before transform.")
        return X.drop(columns=to_drop, errors='ignore')

    def get_feature_names_out(self, input_features=None):
        """Names seen in ``fit`` minus the dropped ones (``input_features`` is ignored)."""
        if self._feature_names_in_ is None:
            raise RuntimeError("Transformer not fitted. Call fit before get_feature_names_out.")
        drop = set(self.num_col_to_drop_ or [])
        output = [c for c in self._feature_names_in_ if c not in drop]
        return np.array(output, dtype=object)

    def set_output(self, *, transform=None):
        # No-op to be compatible with pipelines calling set_output
        return self

In [7]:
def build_preprocessing_pipeline(
    num_corr_threshold=0.95,
    method='linear',
    remove_features=False,
    strategy="custom",
    exclude_for_zero_drop=None
):
    """Assemble the preprocessing pipeline for the numeric columns.

    Parameters
    ----------
    num_corr_threshold : float
        Given to ``CustomFeatureSelector`` (only used when ``remove_features``).
    method : str
        Interpolation method given to ``CustomPreprocessTransformer``
        (only used when ``strategy == "custom"``).
    remove_features : bool
        When True a ``CustomFeatureSelector`` step is appended.
    strategy : {"custom", "mean", "median"}
        ``"custom"``: zero-column drop + interpolation, then robust scaling.
        ``"mean"`` / ``"median"``: ``SimpleImputer`` with that strategy, then
        robust scaling.
    exclude_for_zero_drop : list or None
        Columns protected from the zero-ratio drop (``"custom"`` only).

    Returns
    -------
    Pipeline
        Configured with ``set_output(transform="pandas")``.

    Raises
    ------
    TypeError
        If ``strategy`` is not one of the supported values.
    """
    protected_cols = exclude_for_zero_drop or []
    numeric_selector = make_column_selector(dtype_include=np.number)

    def _numeric_only(block):
        # Apply `block` to the numeric columns; every other column is dropped
        # (no categorical branch is wired in).
        return ColumnTransformer(
            transformers=[("num", block, numeric_selector)],
            remainder="drop",
        ).set_output(transform="pandas")

    if strategy == "custom":
        fill_stage = Pipeline([
            ("preprocess", PrePreprocessTransformer(exclude=protected_cols)),
            ("imputer", CustomPreprocessTransformer(method=method)),
        ]).set_output(transform="pandas")

        scaling_block = Pipeline([("scaler", RobustScaler())])

        preprocessor = Pipeline([
            ("pre_custom", fill_stage),
            ("ct", _numeric_only(scaling_block)),
        ]).set_output(transform="pandas")

    elif strategy in ("mean", "median"):
        # Both strategies share one layout; only the imputer statistic differs.
        impute_and_scale = Pipeline([
            ("imputer", SimpleImputer(strategy=strategy)),
            ("scaler", RobustScaler()),
        ])
        preprocessor = _numeric_only(impute_and_scale)

    else:
        raise TypeError("Strategy not correct")

    steps = [("preprocessing", preprocessor)]
    if remove_features:
        steps.append((
            "corr_feature_dropper",
            CustomFeatureSelector(num_corr_threshold=num_corr_threshold, method='pearson'),
        ))

    return Pipeline(steps=steps).set_output(transform="pandas")

In [8]:
# --- Settings ---
horizon_day = 1               # number of rows the target is shifted ahead
train_ratio = 0.7
num_corr_threshold = 0.95
plot_learning_curves = True
strategy = "custom"           # alternatives: "median", "mean"

# Cleaned name of the gold close column ("GC=F_Close" after clean_name)
target_source_col = 'GC_F_Close'

In [9]:
# --- Features and target ---
# Work on an index-sorted copy so result_df itself stays unchanged.
df = result_df.sort_index().copy()

# Target = source column read `horizon_day` rows ahead.
target_col = f"{target_source_col}_t+{horizon_day}"
df[target_col] = df[target_source_col].shift(-horizon_day)

# The last `horizon_day` rows have no future value: remove them.
df = df.dropna(subset=[target_col])

# Move the source close column to the front, keeping the other columns in order.
cols = [target_source_col] + [name for name in df.columns if name != target_source_col]
df = df[cols]

# Every column except the target is a feature.
feature_cols = df.columns.drop(target_col)
X = df[feature_cols]
y = df[target_col]

# Model

In [10]:
# --- Feature selection ---
model = LinearRegression()

# `strategy` ("custom", "mean" or "median") and `target_source_col` both come
# from the settings cell. They are read from there instead of being repeated
# as literals, so changing the settings cannot silently diverge from this cell.
pipe_auto = build_preprocessing_pipeline(
    remove_features=True,
    strategy=strategy,
    exclude_for_zero_drop=[target_source_col],  # never drop the source close column
)

pipe = Pipeline(steps=[
    ('preprocessing', pipe_auto),
    ('sfm', SelectFromModel(model)),
    ('model', model)
])

pipe.fit(X, y)