# 拡張時系列モデル評価
複数のモデルを試し、途中経過を表示しながら精度を比較します。

In [None]:
import warnings
warnings.filterwarnings('ignore')

import yfinance as yf
import pandas as pd
import numpy as np

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.model_selection import TimeSeriesSplit, RandomizedSearchCV
from sklearn.linear_model import LogisticRegression
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier

try:
    from prophet import Prophet
    prophet_available = True
except Exception as e:
    Prophet = None
    prophet_available = False
    prophet_error = e

try:
    from statsmodels.tsa.api import ExponentialSmoothing, ARIMA
    from statsmodels.tsa.statespace.sarimax import SARIMAX
    statsmodels_available = True
except Exception as e:
    ExponentialSmoothing = ARIMA = SARIMAX = None
    statsmodels_available = False
    statsmodels_error = e

try:
    from tbats import TBATS
    tbats_available = True
except Exception as e:
    TBATS = None
    tbats_available = False
    tbats_error = e

try:
    from neuralprophet import NeuralProphet
    neuralprophet_available = True
except Exception as e:
    NeuralProphet = None
    neuralprophet_available = False
    neuralprophet_error = e

# 深層学習系: TensorFlow/Keras を使う
try:
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, GRU, Bidirectional
    from tensorflow.keras.callbacks import EarlyStopping
    tf_available = True
except Exception as e:
    Sequential = Dense = SimpleRNN = LSTM = GRU = Bidirectional = EarlyStopping = None
    tf_available = False
    tf_error = e

try:
    from gluonts.dataset.common import ListDataset
    from gluonts.model.deepar import DeepAREstimator
    from gluonts.model.deep_factor import DeepFactorEstimator
    from gluonts.model.seq2seq import MQCNNEstimator
    from gluonts.mx.trainer import Trainer
    gluonts_available = True
except Exception as e:
    DeepAREstimator = DeepFactorEstimator = MQCNNEstimator = Trainer = None
    gluonts_available = False
    gluonts_error = e


In [None]:
print('1. データ取得と前処理 ...')
TICKERS = ['AAPL', '^GSPC']
START_DATE = '2010-01-01'
END_DATE = '2023-12-31'
TARGET_TICKER = 'AAPL'
TARGET_COL = f'{TARGET_TICKER}_Close'
TEST_SIZE_RATIO = 0.3

# 株価データの取得
df = yf.download(TICKERS, start=START_DATE, end=END_DATE, progress=False)

# 特徴量作成
data_for_tabular = pd.concat([df['Close'].rename(columns=lambda c: f"{c}_Close"),
                             df['Volume'].rename(columns=lambda c: f"{c}_Volume")], axis=1)
for t in TICKERS:
    data_for_tabular[f'{t}_Return1D'] = data_for_tabular[f'{t}_Close'].pct_change()
    data_for_tabular[f'{t}_MA20'] = data_for_tabular[f'{t}_Close'].rolling(20).mean()
    data_for_tabular[f'{t}_Volatility'] = data_for_tabular[f'{t}_Close'].pct_change().rolling(20).std()

data_for_tabular['Target_Dir'] = (data_for_tabular[TARGET_COL].shift(-1) > data_for_tabular[TARGET_COL]).astype(int)
data_for_tabular = data_for_tabular.dropna()
if '^GSPC_Volume' in data_for_tabular.columns:
    data_for_tabular = data_for_tabular.drop(columns='^GSPC_Volume')

split_idx = int(len(data_for_tabular) * (1 - TEST_SIZE_RATIO))
X = data_for_tabular.drop(columns='Target_Dir')
y = data_for_tabular['Target_Dir']
X_tr, X_te = X.iloc[:split_idx], X.iloc[split_idx:]
y_tr, y_te = y.iloc[:split_idx], y.iloc[split_idx:]
scaler = StandardScaler()
X_tr_scaled = scaler.fit_transform(X_tr)
X_te_scaled = scaler.transform(X_te)

ts_df = df['Close'][[TARGET_TICKER]].reset_index()
ts_df.columns = ['ds', 'y']
ts_train, ts_test = ts_df.iloc[:split_idx], ts_df.iloc[split_idx:]

y_true_dir = (ts_test['y'].values > ts_test['y'].shift(1).values).astype(int)[1:]
results_list = []


In [None]:
print('2. テーブルデータ系モデルのチューニングと評価 ...')
pos_weight = (y_tr == 0).sum() / (y_tr == 1).sum()
cv = TimeSeriesSplit(n_splits=3)

lr_params = {'C': [0.1, 1, 10]}
xgb_params = {'n_estimators': [100, 200], 'max_depth': [3, 5]}
lgbm_params = {'n_estimators': [100, 200], 'num_leaves': [31, 63]}
cat_params = {'depth': [4, 6], 'learning_rate': [0.03, 0.1]}

models = [
    ('Logistic Regression', LogisticRegression(max_iter=1000, class_weight='balanced'), lr_params),
    ('XGBoost', XGBClassifier(random_state=42, scale_pos_weight=pos_weight), xgb_params),
    ('LightGBM', LGBMClassifier(random_state=42, is_unbalance=True), lgbm_params),
    ('CatBoost', CatBoostClassifier(random_state=42, scale_pos_weight=pos_weight, verbose=0), cat_params),
]

for name, base_model, params in models:
    print(f'{name} をチューニング中...')
    search = RandomizedSearchCV(base_model, params, n_iter=2, cv=cv, scoring='accuracy', random_state=42, verbose=1)
    search.fit(X_tr_scaled, y_tr)
    best_model = search.best_estimator_
    pred = best_model.predict(X_te_scaled)
    acc = accuracy_score(y_te, pred)
    try:
        proba = best_model.predict_proba(X_te_scaled)[:,1]
        auc = roc_auc_score(y_te, proba)
    except Exception:
        auc = np.nan
    results_list.append({'Model': name, 'Accuracy': acc, 'AUC': auc})


In [None]:
print('3. 一般的な時系列モデルの評価 ...')
if statsmodels_available:
    hw_model = ExponentialSmoothing(ts_train['y'], trend='add', seasonal='add', seasonal_periods=252).fit()
    hw_pred = hw_model.forecast(len(ts_test))
    hw_dir = (hw_pred.values > ts_test['y'].shift(1).values).astype(int)[1:]
    acc_hw = accuracy_score(y_true_dir, hw_dir)
    results_list.append({'Model': 'Holt-Winters', 'Accuracy': acc_hw, 'AUC': np.nan})

    best_aic = float('inf')
    best_order = None
    for p in range(3):
        for q in range(3):
            try:
                m = ARIMA(ts_train['y'], order=(p,1,q)).fit()
                if m.aic < best_aic:
                    best_aic = m.aic
                    best_order = (p,1,q)
            except Exception:
                continue
    if best_order:
        arima_model = ARIMA(ts_train['y'], order=best_order).fit()
        arima_pred = arima_model.forecast(len(ts_test))
        arima_dir = (arima_pred.values > ts_test['y'].shift(1).values).astype(int)[1:]
        acc_arima = accuracy_score(y_true_dir, arima_dir)
        results_list.append({'Model': f'ARIMA{best_order}', 'Accuracy': acc_arima, 'AUC': np.nan})

    best_aic = float('inf')
    best_order = None
    for p in range(2):
        for q in range(2):
            try:
                m = SARIMAX(ts_train['y'], order=(p,1,q), seasonal_order=(p,0,q,252)).fit(disp=False)
                if m.aic < best_aic:
                    best_aic = m.aic
                    best_order = (p,1,q)
            except Exception:
                continue
    if best_order:
        sarimax_model = SARIMAX(ts_train['y'], order=best_order, seasonal_order=(best_order[0],0,best_order[2],252)).fit(disp=False)
        sarimax_pred = sarimax_model.forecast(len(ts_test))
        sarimax_dir = (sarimax_pred.values > ts_test['y'].shift(1).values).astype(int)[1:]
        acc_sarimax = accuracy_score(y_true_dir, sarimax_dir)
        results_list.append({'Model': f'SARIMAX{best_order}', 'Accuracy': acc_sarimax, 'AUC': np.nan})
else:
    print(f'statsmodels の読み込みに失敗: {statsmodels_error}')

if tbats_available:
    tbats_est = TBATS(seasonal_periods=(252,))
    tbats_model = tbats_est.fit(ts_train['y'])
    tbats_pred = tbats_model.forecast(steps=len(ts_test))
    tbats_dir = (tbats_pred > ts_test['y'].shift(1).values).astype(int)[1:]
    acc_tbats = accuracy_score(y_true_dir, tbats_dir)
    results_list.append({'Model': 'TBATS', 'Accuracy': acc_tbats, 'AUC': np.nan})
else:
    print(f'TBATS の読み込みに失敗: {tbats_error}')

if prophet_available:
    prophet_model = Prophet()
    prophet_model.fit(ts_train)
    future = prophet_model.make_future_dataframe(periods=len(ts_test))
    fcst = prophet_model.predict(future)
    prophet_pred = fcst['yhat'].iloc[-len(ts_test):]
    prophet_dir = (prophet_pred.values > ts_test['y'].shift(1).values).astype(int)[1:]
    acc_prophet = accuracy_score(y_true_dir, prophet_dir)
    results_list.append({'Model': 'Prophet', 'Accuracy': acc_prophet, 'AUC': np.nan})
else:
    print(f'Prophet の読み込みに失敗: {prophet_error}')

if neuralprophet_available:
    nprophet = NeuralProphet()
    nprophet.fit(ts_train, freq='D')
    forecast = nprophet.predict(nprophet.make_future_dataframe(ts_train, periods=len(ts_test)))
    nprophet_pred = forecast['yhat1'].iloc[-len(ts_test):]
    nprophet_dir = (nprophet_pred.values > ts_test['y'].shift(1).values).astype(int)[1:]
    acc_nprophet = accuracy_score(y_true_dir, nprophet_dir)
    results_list.append({'Model': 'NeuralProphet', 'Accuracy': acc_nprophet, 'AUC': np.nan})
else:
    print(f'NeuralProphet の読み込みに失敗: {neuralprophet_error}')


In [None]:
print('4. 深層学習系モデルの評価 ...')
if tf_available:
    # シンプルな LSTM モデル
    def build_lstm():
        model = Sequential([
            LSTM(32, input_shape=(1, 1)),
            Dense(1, activation='sigmoid')
        ])
        model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
        return model

    X_dl = ts_train['y'].pct_change().dropna().values.reshape(-1,1,1)
    y_dl = (ts_train['y'].pct_change().shift(-1).dropna() > 0).astype(int).values
    split = int(len(X_dl)*0.8)
    X_tr_dl, X_te_dl = X_dl[:split], X_dl[split:]
    y_tr_dl, y_te_dl = y_dl[:split], y_dl[split:]

    lstm = build_lstm()
    lstm.fit(X_tr_dl, y_tr_dl, epochs=2, verbose=1)
    preds = lstm.predict(X_te_dl).ravel()
    preds_bin = (preds > 0.5).astype(int)
    acc_lstm = accuracy_score(y_te_dl, preds_bin)
    results_list.append({'Model': 'LSTM', 'Accuracy': acc_lstm, 'AUC': roc_auc_score(y_te_dl, preds)})
else:
    print(f'TensorFlow の読み込みに失敗: {tf_error}')

if gluonts_available:
    train_ds = ListDataset([{'start': ts_train['ds'].iloc[0], 'target': ts_train['y'].values}], freq='D')
    test_ds = ListDataset([{'start': ts_test['ds'].iloc[0], 'target': ts_test['y'].values}], freq='D')
    trainer = Trainer(epochs=1)
    deepar_est = DeepAREstimator(freq='D', prediction_length=len(ts_test), trainer=trainer)
    deepar_model = deepar_est.train(train_ds)
    for entry, forecast in zip(test_ds, deepar_model.predict(test_ds)):
        deepar_pred = forecast.mean
    deepar_dir = (deepar_pred > ts_test['y'].shift(1).values).astype(int)[1:]
    acc_deepar = accuracy_score(y_true_dir, deepar_dir)
    results_list.append({'Model': 'DeepAR', 'Accuracy': acc_deepar, 'AUC': np.nan})
else:
    print(f'GluonTS の読み込みに失敗: {gluonts_error}')


In [None]:
print('
5. 評価結果')
res_df = pd.DataFrame(results_list)
print(res_df)
