# PSO-LSTMによる株価指数の騰落率予測（5分足）

このノートブックは、論文「Enhancing stock index prediction: A hybrid LSTM-PSO model for improved forecasting accuracy」に基づき、
PSOでLSTMのハイパーパラメータを最適化し、次時点の騰落率（pct_change）を予測します。

- データ: yfinance（5分足）
- ルックバック: 5分 / 30分 / 60分
- マクロ指標: USD/JPY、金利（^IRX）
- 評価指標: RMSE / MAE / MAPE / R2

> 注意: 5分足データは取得可能期間が短いです。必要に応じて期間を調整してください。


In [None]:
import warnings
from dataclasses import dataclass

import numpy as np
import pandas as pd
import yfinance as yf
import pywt

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

import ta
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

import pyswarms as ps

warnings.filterwarnings("ignore")
np.random.seed(42)
tf.random.set_seed(42)

# GPUがある場合はメモリ成長を有効化
physical_gpus = tf.config.list_physical_devices("GPU")
if physical_gpus:
    for gpu in physical_gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    print(f"GPU利用: {len(physical_gpus)}台")
else:
    print("GPUなし: CPUで実行します")


In [None]:
# ===== 設定 =====
INDEX_TICKERS = ["^GSPC"]  # 例: S&P500
FX_TICKER = "USDJPY=X"
RATE_TICKER = "^IRX"  # 米国短期金利（代替指標）

BASE_INTERVAL = "5m"
BASE_PERIOD = "60d"  # 5分足の取得可能期間に合わせる

RESAMPLE_MINUTES = [5, 30, 60]
LOOKBACK_STEPS = {
    5: 5,    # 5分足 -> 5ステップ
    30: 30,  # 30分足 -> 30ステップ
    60: 60,  # 60分足 -> 60ステップ
}

TRAIN_RATIO = 0.8
VAL_RATIO = 0.2
BATCH_SIZE = 32
REPEATS = 1  # 1回 or 3回

# PSO設定（論文値はK=50）
PSO_PARTICLES = 20
PSO_ITERS = 10
PSO_W = 0.8
PSO_C1 = 1.5
PSO_C2 = 1.5

# PSO探索範囲
NEURON_BOUNDS = (50, 300)
EPOCH_BOUNDS = (50, 300)
LAYER_BOUNDS = (1, 3)

print("設定完了")


In [None]:
def _normalize_index(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.index = pd.to_datetime(df.index)
    if df.index.tz is not None:
        df.index = df.index.tz_convert(None)
    return df


def download_price_data(ticker: str, interval: str, period: str) -> pd.DataFrame:
    df = yf.download(ticker, interval=interval, period=period, auto_adjust=False, progress=False)
    # 概要
    if df.empty:
        raise ValueError(f"データ取得に失敗: {ticker}")
    df = _normalize_index(df)
    df = df.rename(columns=str.lower)
    return df


def download_macro_daily(ticker: str, period_years: int = 5) -> pd.Series:
    df = yf.download(ticker, interval="1d", period=f"{period_years}y", auto_adjust=False, progress=False)
    if df.empty:
        raise ValueError(f"マクロデータ取得に失敗: {ticker}")
    df = _normalize_index(df)
    series = df["Close"].copy()
    series.name = ticker
    return series


def resample_ohlcv(df: pd.DataFrame, minutes: int) -> pd.DataFrame:
    if minutes == 5:
        return df.copy()
    rule = f"{minutes}min"
    ohlc = df["open"].resample(rule).first()
    high = df["high"].resample(rule).max()
    low = df["low"].resample(rule).min()
    close = df["close"].resample(rule).last()
    volume = df["volume"].resample(rule).sum()
    out = pd.concat([ohlc, high, low, close, volume], axis=1)
    out.columns = ["open", "high", "low", "close", "volume"]
    return out.dropna()


def merge_macro_features(price_df: pd.DataFrame, fx: pd.Series, rate: pd.Series) -> pd.DataFrame:
    df = price_df.copy()
    fx = fx.reindex(df.index, method="ffill")
    rate = rate.reindex(df.index, method="ffill")
    df["usd_jpy"] = fx
    df["interest_rate"] = rate
    return df


In [None]:
def wavelet_denoise(series: pd.Series, wavelet: str = "haar", level: int = 3) -> pd.Series:
    if isinstance(series, pd.DataFrame):
        series = series.squeeze()
        if isinstance(series, pd.DataFrame):
            series = series.iloc[:, 0]
    arr = np.asarray(series, dtype=np.float64).ravel().copy()
    coeffs = pywt.wavedec(arr, wavelet, level=level)
    detail_coeffs = coeffs[1:]
    sigma = np.median(np.abs(detail_coeffs[-1])) / 0.6745 if len(detail_coeffs) > 0 else 0
    uthresh = sigma * np.sqrt(2 * np.log(len(arr))) if sigma > 0 else 0
    coeffs[1:] = [pywt.threshold(c, value=uthresh, mode="soft") for c in coeffs[1:]]
    reconstructed = pywt.waverec(coeffs, wavelet)
    reconstructed = reconstructed[: len(arr)]
    name = getattr(series, "name", None)
    index = series.index if hasattr(series, "index") else pd.RangeIndex(len(reconstructed))
    return pd.Series(reconstructed, index=index, name=name)


def add_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    idx = out.index
    for col in ["open", "high", "low", "close", "volume"]:
        if col in out.columns:
            out[col] = pd.Series(np.asarray(out[col], dtype=float).ravel(), index=idx)

    macd = ta.trend.MACD(close=out["close"].squeeze())
    out["macd"] = macd.macd_diff().values.reshape(-1, 1)

    out["cci"] = ta.trend.cci(out["high"].squeeze(), out["low"].squeeze(), out["close"].squeeze()).values.reshape(-1, 1)
    out["atr"] = ta.volatility.average_true_range(out["high"].squeeze(), out["low"].squeeze(), out["close"].squeeze()).values.reshape(-1, 1)

    boll = ta.volatility.BollingerBands(close=out["close"].squeeze())
    out["boll"] = boll.bollinger_mavg().values.reshape(-1, 1)

    out["ema20"] = ta.trend.EMAIndicator(out["close"].squeeze(), window=20).ema_indicator().values.reshape(-1, 1)
    out["ma5"] = out["close"].rolling(5).mean()
    out["ma10"] = out["close"].rolling(10).mean()

    out["mtm6"] = out["close"] - out["close"].shift(6)
    out["mtm12"] = out["close"] - out["close"].shift(12)

    out["roc"] = ta.momentum.ROCIndicator(out["close"].squeeze(), window=10).roc().values.reshape(-1, 1)

    # SMI: 簡易実装（EMA二重平滑）
    hl = (out["high"] + out["low"]) / 2
    diff = out["close"] - hl
    hl_range = out["high"] - out["low"]
    ema1 = diff.ewm(span=14, adjust=False).mean()
    ema2 = ema1.ewm(span=14, adjust=False).mean()
    range_ema1 = hl_range.ewm(span=14, adjust=False).mean()
    range_ema2 = range_ema1.ewm(span=14, adjust=False).mean()
    out["smi"] = 100 * (ema2 / (0.5 * range_ema2.replace(0, np.nan)))

    # WVAD: Williams Variable Accumulation/Distribution
    denom = (out["high"] - out["low"]).replace(0, np.nan)
    out["wvad"] = ((out["close"] - out["open"]) / denom) * out["volume"]

    return out


def remove_high_corr_features(df: pd.DataFrame, target_col: str, threshold: float = 0.95) -> tuple[pd.DataFrame, list]:
    corr_series = df.corr(numeric_only=True)[target_col].abs()
    if isinstance(corr_series, pd.DataFrame):
        corr_series = corr_series.squeeze()
    vals = np.asarray(corr_series).ravel()
    names = list(corr_series.index)
    is_high = (vals > threshold).astype(bool)
    not_self = np.array([n != target_col for n in names], dtype=bool)
    mask = is_high & not_self
    drop_cols = [names[i] for i in range(len(names)) if mask[i]]
    return df.drop(columns=drop_cols), drop_cols


In [None]:
def build_target_returns(df: pd.DataFrame) -> pd.Series:
    close = pd.Series(np.asarray(df["close"]).ravel(), index=df.index)
    returns = close.pct_change().shift(-1)
    returns.name = "target_return"
    return returns


def create_sequences(features: np.ndarray, target: np.ndarray, lookback: int):
    xs, ys = [], []
    for i in range(lookback, len(features)):
        xs.append(features[i - lookback : i])
        ys.append(target[i])
    return np.array(xs), np.array(ys)


def train_val_test_split(X, y, train_ratio=0.8, val_ratio=0.2):
    n = len(X)
    train_end = int(n * train_ratio)
    val_end = int(train_end * (1 - val_ratio))

    X_train = X[:val_end]
    y_train = y[:val_end]
    X_val = X[val_end:train_end]
    y_val = y[val_end:train_end]
    X_test = X[train_end:]
    y_test = y[train_end:]
    return X_train, y_train, X_val, y_val, X_test, y_test


def scale_train_val_test(X_train, X_val, X_test, y_train, y_val, y_test):
    n_features = X_train.shape[2]

    x_scaler = MinMaxScaler(feature_range=(-1, 1))
    X_train_s = x_scaler.fit_transform(X_train.reshape(-1, n_features)).reshape(X_train.shape)
    X_val_s = x_scaler.transform(X_val.reshape(-1, n_features)).reshape(X_val.shape)
    X_test_s = x_scaler.transform(X_test.reshape(-1, n_features)).reshape(X_test.shape)

    y_scaler = MinMaxScaler(feature_range=(-1, 1))
    y_train_s = y_scaler.fit_transform(y_train.reshape(-1, 1))
    y_val_s = y_scaler.transform(y_val.reshape(-1, 1))
    y_test_s = y_scaler.transform(y_test.reshape(-1, 1))

    return X_train_s, X_val_s, X_test_s, y_train_s, y_val_s, y_test_s, x_scaler, y_scaler


In [None]:
def build_lstm_model(input_shape, num_layers: int, num_units: int):
    model = keras.Sequential()
    for i in range(num_layers):
        return_sequences = i < num_layers - 1
        if i == 0:
            model.add(layers.LSTM(num_units, return_sequences=return_sequences, input_shape=input_shape))
        else:
            model.add(layers.LSTM(num_units, return_sequences=return_sequences))
        model.add(layers.Dropout(0.2))

    model.add(layers.Dense(1))
    model.compile(optimizer="adam", loss="mse")
    return model


In [None]:
def pso_optimize(X_train, y_train, X_val, y_val, input_shape):
    def _objective(particles):
        costs = []
        for particle in particles:
            units = int(np.clip(round(particle[0]), *NEURON_BOUNDS))
            epochs = int(np.clip(round(particle[1]), *EPOCH_BOUNDS))
            layers = int(np.clip(round(particle[2]), *LAYER_BOUNDS))

            tf.keras.backend.clear_session()
            model = build_lstm_model(input_shape, layers, units)
            es = keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True)
            model.fit(
                X_train,
                y_train,
                validation_data=(X_val, y_val),
                epochs=epochs,
                batch_size=BATCH_SIZE,
                verbose=0,
                callbacks=[es],
            )
            preds = model.predict(X_val, verbose=0)
            rmse = np.sqrt(mean_squared_error(y_val, preds))
            costs.append(rmse)
        return np.array(costs)

    options = {"c1": PSO_C1, "c2": PSO_C2, "w": PSO_W}
    lower_bounds = np.array([NEURON_BOUNDS[0], EPOCH_BOUNDS[0], LAYER_BOUNDS[0]])
    upper_bounds = np.array([NEURON_BOUNDS[1], EPOCH_BOUNDS[1], LAYER_BOUNDS[1]])
    bounds = (lower_bounds, upper_bounds)

    optimizer = ps.single.GlobalBestPSO(
        n_particles=PSO_PARTICLES,
        dimensions=3,
        options=options,
        bounds=bounds,
    )
    best_cost, best_pos = optimizer.optimize(_objective, iters=PSO_ITERS, verbose=False)
    return best_pos, best_cost


In [None]:
def resample_series(series: pd.Series, minutes: int) -> pd.Series:
    if minutes == 5:
        return series.copy()
    rule = f"{minutes}min"
    return series.resample(rule).last().dropna()


def compute_metrics(y_true, y_pred):
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    eps = 1e-8
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + eps))) * 100
    r2 = r2_score(y_true, y_pred)
    return rmse, mae, mape, r2


results = []

for ticker in INDEX_TICKERS:
    print(f"\n=== {ticker} ===")
    base_price = download_price_data(ticker, BASE_INTERVAL, BASE_PERIOD)

    try:
        fx_5m = download_price_data(FX_TICKER, BASE_INTERVAL, BASE_PERIOD)["close"]
    except Exception:
        fx_5m = download_macro_daily(FX_TICKER)

    rate_daily = download_macro_daily(RATE_TICKER)

    for minutes in RESAMPLE_MINUTES:
        print(f"--- {minutes}分足 ---")
        df = resample_ohlcv(base_price, minutes)
        df["close"] = wavelet_denoise(df["close"], level=3)

        df = add_technical_indicators(df)

        fx_resampled = resample_series(fx_5m, minutes)
        df = merge_macro_features(df, fx_resampled, rate_daily)

        df["target_return"] = build_target_returns(df)
        df = df.dropna()

        feature_df, dropped = remove_high_corr_features(df.drop(columns=["target_return"]), target_col="close")
        df_final = feature_df.copy()
        df_final["target_return"] = df["target_return"]

        feature_cols = [c for c in df_final.columns if c != "target_return"]
        features = df_final[feature_cols].values
        target = df_final["target_return"].values

        lookback = LOOKBACK_STEPS[minutes]
        X, y = create_sequences(features, target, lookback)

        X_train, y_train, X_val, y_val, X_test, y_test = train_val_test_split(
            X, y, train_ratio=TRAIN_RATIO, val_ratio=VAL_RATIO
        )

        X_train_s, X_val_s, X_test_s, y_train_s, y_val_s, y_test_s, x_scaler, y_scaler = scale_train_val_test(
            X_train, X_val, X_test, y_train, y_val, y_test
        )

        input_shape = (X_train_s.shape[1], X_train_s.shape[2])

        best_pos, best_cost = pso_optimize(X_train_s, y_train_s, X_val_s, y_val_s, input_shape)
        best_units = int(np.clip(round(best_pos[0]), *NEURON_BOUNDS))
        best_epochs = int(np.clip(round(best_pos[1]), *EPOCH_BOUNDS))
        best_layers = int(np.clip(round(best_pos[2]), *LAYER_BOUNDS))

        tf.keras.backend.clear_session()
        final_model = build_lstm_model(input_shape, best_layers, best_units)
        final_model.fit(
            np.concatenate([X_train_s, X_val_s]),
            np.concatenate([y_train_s, y_val_s]),
            epochs=best_epochs,
            batch_size=BATCH_SIZE,
            verbose=0,
        )

        pred_scaled = final_model.predict(X_test_s, verbose=0)
        y_pred = y_scaler.inverse_transform(pred_scaled).reshape(-1)
        y_true = y_scaler.inverse_transform(y_test_s).reshape(-1)

        rmse, mae, mape, r2 = compute_metrics(y_true, y_pred)

        results.append(
            {
                "ticker": ticker,
                "minutes": minutes,
                "lookback": lookback,
                "units": best_units,
                "epochs": best_epochs,
                "layers": best_layers,
                "rmse": rmse,
                "mae": mae,
                "mape": mape,
                "r2": r2,
                "dropped_features": dropped,
            }
        )

        print(
            f"units={best_units}, epochs={best_epochs}, layers={best_layers} | "
            f"RMSE={rmse:.6f}, MAE={mae:.6f}, MAPE={mape:.4f}, R2={r2:.4f}"
        )

results_df = pd.DataFrame(results)
results_df


In [None]:
results_df = results_df.sort_values(["ticker", "minutes"]).reset_index(drop=True)
results_df
