In [None]:
import os
import pandas as pd
import numpy as np
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
import logging
import warnings
warnings.filterwarnings("ignore")

# Configure root logging once; every later stage reports through `logging`.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Directory that holds the input CSVs, and the file expected for each series.
data_path = "data"
file_names = {
    "bitcoin": "bitcoin.csv",
    "gold": "gold.csv",
    "google_trends": "google_trends.csv",
    "sp500": "sp500.csv",
    "treasury_3m": "treasury_3m.csv",
    "treasury_10y": "treasury_10y.csv",
    "copper": "copper.csv",
    "oil": "oil.csv",
    "unemployment": "unemployment.csv",
    "moex": "MOEX.csv",
    "sse": "SSE.csv",
    "stoxx": "STOXX_600.csv",
    "vix": "vix.csv",
    "spgsci": "spgsci.csv"
}

# Best-effort load: a missing file, or a file without a 'timestamp' column, is
# logged and skipped, so `data` only ever contains usable frames.
data = {}
for key, file in file_names.items():
    file_path = os.path.join(data_path, file)
    if not os.path.exists(file_path):
        logging.warning(f"{file} not found")
        continue

    logging.info(f"Loading {file}")
    df = pd.read_csv(file_path)
    if "timestamp" not in df.columns:
        logging.warning(f"{file} skipped: no 'timestamp' column")
        continue

    df["timestamp"] = pd.to_datetime(df["timestamp"])
    # A lone "." cell is treated as a missing value.
    df = df.replace({'.': np.nan})
    # Every non-timestamp column is forced to numeric; unparseable cells become NaN.
    value_cols = [name for name in df.columns if name != "timestamp"]
    for col in value_cols:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    data[key] = df

# Column renames per dataset, so that every series carries a unique,
# source-specific name once the frames are merged on 'timestamp'.
# bitcoin / gold / oil share the same price-and-volume layout, so their
# mappings are generated: "Close" -> "<asset>_close", "Open" -> "<asset>_open", ...
_price_volume_fields = ("Close", "Open", "High", "Low", "Volume")
rename_map = {
    asset: {field: f"{asset}_{field.lower()}" for field in _price_volume_fields}
    for asset in ("bitcoin", "gold", "oil")
}
rename_map.update({
    "copper": {"price": "copper_price"},
    "google_trends": {
        "SPX": "google_spx",
        "ETF": "google_etf",
        "index fund": "google_index_fund",
        "sp500": "google_sp500",
    },
    "unemployment": {"Unemployment": "unemployment_rate"},
    "treasury_3m": {"Close": "treasury_3m"},
    "treasury_10y": {"Close": "treasury_10y"},
    "sp500": {"Close": "sp500_close"},
    "moex": {"Close": "moex_close"},
    "sse": {"Close": "sse_close"},
    "stoxx": {"Close": "stoxx_close"},
    "vix": {"Close": "vix_close"},
    "spgsci": {"Close": "spgsci_close"},
})

# Apply the renames to whichever datasets were actually loaded; frames without
# an entry in rename_map are kept unchanged. Dict order of `data` is preserved.
data = {
    name: frame.rename(columns=rename_map[name]) if name in rename_map else frame
    for name, frame in data.items()
}

logging.info("Merging all dataframes")

# The S&P 500 series is the left-hand base of every join below. The loader only
# warns when a file is missing, so fail here with an actionable message instead
# of a bare KeyError('sp500').
if "sp500" not in data:
    raise KeyError(
        "'sp500' dataset was not loaded (file missing or no 'timestamp' column); "
        "it is required as the base of the merge"
    )

sp500 = data["sp500"]
# .copy() so that nothing downstream operates on a slice of the loaded frame.
all_data = sp500[["timestamp", "sp500_close"]].copy()

# Left-join every other dataset onto the S&P 500 timestamps, in load order.
# NOTE(review): columns that are not covered by rename_map and share a name
# across datasets would collide here (pandas adds _x/_y suffixes) - confirm the
# CSV schemas.
for key, df in data.items():
    if key != "sp500":
        all_data = all_data.merge(df, on="timestamp", how="left")

# Sort chronologically first so the forward-fill only carries *earlier*
# observations forward; rows that still contain NaN afterwards (no earlier
# observation available) are dropped.
# DataFrame.ffill() replaces fillna(method="ffill"), which pandas has deprecated.
all_data = all_data.sort_values("timestamp").ffill().dropna()

def add_lag_rolling_features(df, base_cols, lags=(1, 3, 7), windows=(3, 7)):
    """Return a copy of ``df`` extended with lagged and rolling-window features.

    For every column ``c`` in ``base_cols`` the result gains, in this order:

    * ``{c}_lag_{k}`` for each ``k`` in ``lags`` - the column shifted down by ``k`` rows;
    * ``{c}_roll_mean_{w}`` and ``{c}_roll_std_{w}`` for each ``w`` in ``windows`` -
      rolling mean / standard deviation over the last ``w`` rows, *including*
      the current row.

    Rows are shifted/rolled by position, so ``df`` should already be in the
    intended (e.g. chronological) order. Leading rows without enough history
    hold NaN in the new columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame. It is NOT modified; the features are added to a copy.
    base_cols : iterable of str
        Columns of ``df`` to derive features from. A name that is not a column
        of ``df`` raises ``KeyError``.
    lags : iterable of int, default (1, 3, 7)
        Row offsets for the lag features.
    windows : iterable of int, default (3, 7)
        Window lengths for the rolling statistics.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` with the extra feature columns appended.
    """
    # Work on a copy: the caller's frame must not silently grow new columns.
    out = df.copy()
    for col in base_cols:
        series = out[col]
        for lag in lags:
            out[f"{col}_lag_{lag}"] = series.shift(lag)
        for window in windows:
            rolling = series.rolling(window)
            out[f"{col}_roll_mean_{window}"] = rolling.mean()
            out[f"{col}_roll_std_{window}"] = rolling.std()
    return out

target = "sp500_close"
key_features = ["sp500_close", "vix_close", "bitcoin_close", "oil_close"]

# The loader is best-effort (missing CSVs are only warned about), so a key
# feature may be absent from the merged frame. Skip it with a warning instead
# of failing with a KeyError inside the feature builder.
missing_features = [col for col in key_features if col not in all_data.columns]
if missing_features:
    logging.warning("Key features missing from merged data, skipping: %s", missing_features)
    key_features = [col for col in key_features if col in all_data.columns]

all_data = add_lag_rolling_features(all_data, key_features)

# Look-ahead guard: the rolling mean/std of the target at row t are computed
# over a window that ends AT row t, i.e. they contain the very value the model
# is asked to predict. They end up in `exog_vars` below, so shift them one row
# so that they only summarise values strictly before t. (all_data is sorted by
# timestamp, so one row back is the previous observation.)
target_roll_cols = [col for col in all_data.columns if col.startswith(f"{target}_roll_")]
if target_roll_cols:
    all_data[target_roll_cols] = all_data[target_roll_cols].shift(1)

# Drop the warm-up rows that lack enough history for the lag/rolling features,
# then drop columns that carry a single value only.
all_data = all_data.dropna()
all_data = all_data.loc[:, all_data.nunique() > 1]
logging.info("Final dataset shape with lags/rolls: %s", all_data.shape)

# Everything except the timestamp and the target itself is an exogenous regressor.
exclude_cols = ["timestamp", target]
exog_vars = [col for col in all_data.columns if col not in exclude_cols]


from statsmodels.tsa.stattools import adfuller

def adf_test(series, signif=0.05, name=''):
    """Run an Augmented Dickey-Fuller test on ``series`` and report the outcome.

    Missing values are dropped before testing and the lag length is chosen by
    AIC. The p-value is printed, labelled with ``name``.

    Parameters
    ----------
    series : pandas.Series
        Observations to test.
    signif : float, default 0.05
        Significance level the p-value is compared against.
    name : str, default ''
        Label used in the printed message only.

    Returns
    -------
    bool
        True when the p-value is below ``signif`` (the series is treated as
        stationary), False otherwise.
    """
    # Element 1 of adfuller's result tuple is the p-value.
    p_value = adfuller(series.dropna(), autolag='AIC')[1]
    print(f'ADF Test for {name}: p-value = {p_value}')
    is_stationary = p_value < signif
    return is_stationary

def make_stationary(df, columns, signif=0.05):
    """Return a copy of ``df`` with every non-stationary column first-differenced.

    Each column in ``columns`` is checked with ``adf_test``; a column that the
    test does not accept as stationary is replaced by its first difference.
    Columns that pass are left untouched. One line per column is printed to
    report what was done.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is not modified.
    columns : iterable of str
        Names of the columns to test (and difference where needed).
    signif : float, default 0.05
        Significance level forwarded to ``adf_test``.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df``. Every differenced column holds NaN in its first row
        (there is no previous value to subtract), so callers normally follow
        up with ``.dropna()``. A column is differenced at most once and is not
        re-tested afterwards.
    """
    stationary_df = df.copy()
    for col in columns:
        if adf_test(df[col], signif=signif, name=col):
            print(f'--> {col} is already stationary')
            continue
        # Assign the full-length difference. Dropping the leading NaN before
        # assignment gains nothing - pandas re-aligns on the index and puts the
        # NaN back - and on a non-unique index that re-alignment fills the first
        # row with another row's difference (or raises).
        stationary_df[col] = df[col].diff()
        print(f'--> Differenced {col}')
    return stationary_df

# Difference whichever of the target / exogenous columns fail the ADF test,
# then discard the rows left with NaN by the differencing.
columns_to_check = [target, *exog_vars]
all_data = make_stationary(all_data, columns_to_check).dropna()

# --- Monthly walk-forward backtest --------------------------------------------
# Minimum number of rows required to fit / evaluate a month.
MIN_TRAIN_ROWS = 100
MIN_TEST_ROWS = 10
# The target has already been passed through make_stationary() (differenced
# when the ADF test flagged it), so the ARIMA part must not difference it a
# second time: d stays 0.
DIFF_ORDER = 0
# NOTE(review): a seasonal period of 12 suggests monthly data, while each month
# here is expected to hold at least MIN_TEST_ROWS rows - confirm the period.
SEASONAL_PERIOD = 12


def _fit_best_sarimax(y_train, exog_train, diff_order=DIFF_ORDER, seasonal_period=SEASONAL_PERIOD):
    """Fit a small grid of SARIMAX models and return the one with the lowest AIC.

    The grid is p, q, P, Q in {0, 1} with order (p, diff_order, q) and seasonal
    order (P, 0, Q, seasonal_period). A specification that fails to build or
    fit is logged and skipped.

    Returns
    -------
    tuple
        ``(fit, order, seasonal_order, aic)`` for the best model, or
        ``(None, None, None, np.inf)`` when no specification could be fitted.
    """
    candidates = [
        ((p, diff_order, q), (P, 0, Q, seasonal_period))
        for p in (0, 1)
        for q in (0, 1)
        for P in (0, 1)
        for Q in (0, 1)
    ]

    best_fit, best_order, best_seasonal_order, best_aic = None, None, None, np.inf
    for order, seasonal_order in candidates:
        try:
            fit = SARIMAX(
                y_train,
                exog=exog_train,
                order=order,
                seasonal_order=seasonal_order,
                enforce_stationarity=False,
                enforce_invertibility=False
            ).fit(disp=False)
        except Exception as exc:
            # Catch Exception (not a bare except) so KeyboardInterrupt still
            # stops the notebook, and record why the specification was dropped.
            logging.warning("SARIMAX%s x %s failed to fit: %s", order, seasonal_order, exc)
            continue
        if fit.aic < best_aic:
            best_fit, best_order, best_seasonal_order, best_aic = fit, order, seasonal_order, fit.aic
    return best_fit, best_order, best_seasonal_order, best_aic


results = []
start_date = pd.Timestamp("2023-01-01")
end_date = pd.Timestamp("2025-03-01")

# One backtest per calendar month: train on everything strictly before the
# month, evaluate on the rows that fall inside it. "MS" = month start.
for current_date in pd.date_range(start_date, end_date, freq="MS"):
    logging.info(f"Backtest for month starting {current_date.strftime('%Y-%m-%d')}")
    next_month = current_date + pd.DateOffset(months=1)

    timestamps = all_data["timestamp"]
    train = all_data[timestamps < current_date]
    test = all_data[(timestamps >= current_date) & (timestamps < next_month)]

    if len(train) < MIN_TRAIN_ROWS or len(test) < MIN_TEST_ROWS:
        logging.warning("Skipping month due to insufficient data")
        continue

    y_train = train[target]
    y_test = test[target]
    X_train = train[exog_vars]
    X_test = test[exog_vars]

    # Scaler and PCA are fitted on the training window only and merely applied
    # to the test window, so no test information reaches the model.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Keep as many principal components as needed to explain 95% of the variance.
    pca = PCA(n_components=0.95)
    X_train_pca = pca.fit_transform(X_train_scaled)
    X_test_pca = pca.transform(X_test_scaled)

    best_model, best_order, best_seasonal_order, best_aic = _fit_best_sarimax(y_train, X_train_pca)

    if best_model is None:
        logging.warning(
            "No SARIMAX specification could be fitted for %s; month skipped",
            current_date.strftime("%Y-%m-%d"),
        )
        continue

    # Metrics are computed on the target as it stands after make_stationary().
    forecast = best_model.forecast(steps=len(X_test_pca), exog=X_test_pca)
    r2 = r2_score(y_test, forecast)
    mae = mean_absolute_error(y_test, forecast)
    rmse = np.sqrt(mean_squared_error(y_test, forecast))

    results.append({
        "Backtest": len(results) + 1,
        "Train Size": len(train),
        "Test Start": current_date.strftime("%Y-%m-%d"),
        "R2": r2,
        "MAE": mae,
        "RMSE": rmse,
        "Order": best_order,
        "Seasonal Order": best_seasonal_order,
        "AIC": best_aic
    })

results_df = pd.DataFrame(results)
os.makedirs("results", exist_ok=True)
results_df.to_csv("results/sarimax_backtest_lags_rolls.csv", index=False)
print(results_df)

In [None]:

from statsmodels.tsa.stattools import adfuller

# NOTE(review): adf_test is defined with the same body in more than one cell of
# this notebook; whichever cell ran last provides the active definition.
def adf_test(series, signif=0.05, name=''):
    """Run an Augmented Dickey-Fuller test on ``series`` and report the outcome.

    Missing values are dropped before testing and the lag length is chosen by
    AIC. The p-value is printed, labelled with ``name``.

    Parameters
    ----------
    series : pandas.Series
        Observations to test.
    signif : float, default 0.05
        Significance level the p-value is compared against.
    name : str, default ''
        Label used in the printed message only.

    Returns
    -------
    bool
        True when the p-value is below ``signif`` (the series is treated as
        stationary), False otherwise.
    """
    # Element 1 of adfuller's result tuple is the p-value.
    p_value = adfuller(series.dropna(), autolag='AIC')[1]
    print(f'ADF Test for {name}: p-value = {p_value}')
    is_stationary = p_value < signif
    return is_stationary

# NOTE(review): make_stationary is defined in more than one cell of this
# notebook; whichever cell ran last provides the active definition.
def make_stationary(df, columns, signif=0.05):
    """Return a copy of ``df`` with every non-stationary column first-differenced.

    Each column in ``columns`` is checked with ``adf_test``; a column that the
    test does not accept as stationary is replaced by its first difference.
    Columns that pass are left untouched. One line per column is printed to
    report what was done.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is not modified.
    columns : iterable of str
        Names of the columns to test (and difference where needed).
    signif : float, default 0.05
        Significance level forwarded to ``adf_test``.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df``. Every differenced column holds NaN in its first row
        (there is no previous value to subtract), so callers normally follow
        up with ``.dropna()``. A column is differenced at most once and is not
        re-tested afterwards.
    """
    stationary_df = df.copy()
    for col in columns:
        if adf_test(df[col], signif=signif, name=col):
            print(f'--> {col} is already stationary')
            continue
        # Assign the full-length difference. Dropping the leading NaN before
        # assignment gains nothing - pandas re-aligns on the index and puts the
        # NaN back - and on a non-unique index that re-alignment fills the first
        # row with another row's difference (or raises).
        stationary_df[col] = df[col].diff()
        print(f'--> Differenced {col}')
    return stationary_df

# Example usage (the column names below are placeholders; replace them with
# columns that actually exist in your DataFrame):
# target_col = 'sp500'
# exog_cols = ['gold', 'oil', 'bitcoin', 'treasury_3m', 'treasury_10y', 'copper', 'unemployment']
# df = make_stationary(df, [target_col] + exog_cols)


In [None]:

from statsmodels.tsa.stattools import adfuller

# NOTE(review): adf_test is defined with the same body in more than one cell of
# this notebook; whichever cell ran last provides the active definition.
def adf_test(series, signif=0.05, name=''):
    """Run an Augmented Dickey-Fuller test on ``series`` and report the outcome.

    Missing values are dropped before testing and the lag length is chosen by
    AIC. The p-value is printed, labelled with ``name``.

    Parameters
    ----------
    series : pandas.Series
        Observations to test.
    signif : float, default 0.05
        Significance level the p-value is compared against.
    name : str, default ''
        Label used in the printed message only.

    Returns
    -------
    bool
        True when the p-value is below ``signif`` (the series is treated as
        stationary), False otherwise.
    """
    # Element 1 of adfuller's result tuple is the p-value.
    p_value = adfuller(series.dropna(), autolag='AIC')[1]
    print(f'ADF Test for {name}: p-value = {p_value}')
    is_stationary = p_value < signif
    return is_stationary

# NOTE(review): make_stationary is defined in more than one cell of this
# notebook; whichever cell ran last provides the active definition.
def make_stationary(df, columns, signif=0.05):
    """Return a copy of ``df`` with every non-stationary column first-differenced.

    Each column in ``columns`` is checked with ``adf_test``; a column that the
    test does not accept as stationary is replaced by its first difference.
    Columns that pass are left untouched. One line per column is printed to
    report what was done.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is not modified.
    columns : iterable of str
        Names of the columns to test (and difference where needed).
    signif : float, default 0.05
        Significance level forwarded to ``adf_test``.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df``. Every differenced column holds NaN in its first row
        (there is no previous value to subtract), so callers normally follow
        up with ``.dropna()``. A column is differenced at most once and is not
        re-tested afterwards.
    """
    stationary_df = df.copy()
    for col in columns:
        if adf_test(df[col], signif=signif, name=col):
            print(f'--> {col} is already stationary')
            continue
        # Assign the full-length difference. Dropping the leading NaN before
        # assignment gains nothing - pandas re-aligns on the index and puts the
        # NaN back - and on a non-unique index that re-alignment fills the first
        # row with another row's difference (or raises).
        stationary_df[col] = df[col].diff()
        print(f'--> Differenced {col}')
    return stationary_df

# Example usage (the column names below are placeholders; replace them with
# columns that actually exist in your DataFrame):
# target_col = 'sp500'
# exog_cols = ['gold', 'oil', 'bitcoin', 'treasury_3m', 'treasury_10y', 'copper', 'unemployment']
# df = make_stationary(df, [target_col] + exog_cols)
