## LightGBMによる極値予測

ステップ1：ライブラリと定数の準備

In [None]:
import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from tqdm import tqdm
import os

# Constants
TRAIN_PATH = "/home/nagumo/TSAT/CryptoData/5min_Full/5min_Full_Train/BTC_full_5min_Full_Train.csv"
VALID_PATH = "/home/nagumo/TSAT/CryptoData/5min_Full/5min_Full_Valid/BTC_full_5min_Full_Valid.csv"
WINDOW = 288  # 24 hours of history (288 bars at 5-minute spacing)
SEED = 42

# Single hyper-parameter configuration handed to LightGBM
PARAMS = dict(
    num_leaves=31,
    learning_rate=0.05,
    n_estimators=200,
    objective='multiclass',
    num_class=3,
    boosting_type='gbdt',
    verbose=-1,
    seed=SEED,
    # Stronger class weighting is intentionally left disabled here:
    # class_weight={0: 10.0, 1: 10.0, 2: 1.0}
)

# Output directory for the saved reports
os.makedirs("results", exist_ok=True)




ステップ2：データ読み込み＆クリーニング

In [2]:
def load_and_clean(path):
    """
    Read a CSV of bars and keep only complete rows on the 5-minute grid.

    Parameters
    ----------
    path : str
        CSV file with a ``date`` column, which becomes the DatetimeIndex.

    Returns
    -------
    pandas.DataFrame
        Rows whose timestamp lies on the 5-minute grid spanned by the first
        and last timestamp, with every row containing a NaN removed.

    Notes
    -----
    ``asfreq`` inserts an all-NaN row for each missing grid timestamp and
    ``dropna`` removes those rows again, so gaps in the source data remain
    gaps in the result; the returned index is not guaranteed to be regular.
    """
    df = pd.read_csv(path, parse_dates=['date'], index_col='date')
    # '5min' is the non-deprecated spelling of the 5-minute offset alias;
    # the older 'T' alias emits a FutureWarning on recent pandas releases.
    return df.asfreq('5min').dropna()

# Run the training and validation CSVs through the same cleaning routine
df_train = load_and_clean(path=TRAIN_PATH)
df_valid = load_and_clean(path=VALID_PATH)


In [3]:
# Keep only the training rows timestamped on or after 2022-01-01
df_train_filtered = df_train.loc[df_train.index >= '2022-01-01']
df_train_filtered

Unnamed: 0_level_0,open,high,low,close,volume
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2022-01-01 00:00:00,46211.24,46386.33,46150.00,46306.89,184.9810
2022-01-01 00:05:00,46315.10,46504.00,46268.25,46364.90,164.7490
2022-01-01 00:10:00,46363.03,46388.00,46257.90,46316.17,89.0320
2022-01-01 00:15:00,46315.45,46339.90,46211.35,46265.41,67.0770
2022-01-01 00:20:00,46265.42,46400.00,46262.12,46397.00,87.3910
...,...,...,...,...,...
2022-12-31 23:35:00,16529.41,16573.00,16519.90,16535.96,146.5723
2022-12-31 23:40:00,16534.42,16576.00,16528.00,16536.93,87.8290
2022-12-31 23:45:00,16536.93,16576.00,16527.70,16528.45,154.5496
2022-12-31 23:50:00,16529.92,16568.00,16521.70,16529.01,70.3992


ステップ3：ラベル生成関数

In [4]:
def make_labels(close_series, k):
    """
    Label every point as a local minimum (0), local maximum (1) or other (2).

    A point is a local maximum (minimum) when its value equals the maximum
    (minimum) of the centered window of ``2*k + 1`` rows around it.

    Parameters
    ----------
    close_series : pandas.Series
        Series of prices to label.
    k : int
        Number of rows inspected on each side of a point.

    Returns
    -------
    pandas.Series
        Integer labels for all rows except the first and last ``k``, which
        have no complete window. The index is taken from ``close_series``.

    Notes
    -----
    The minimum label is assigned last, so a point that is both the window
    maximum and the window minimum (a flat window) is labelled 0.
    """
    w = 2 * k + 1
    roll_max = close_series.rolling(w, center=True).max()
    roll_min = close_series.rolling(w, center=True).min()
    labels = pd.Series(2, index=close_series.index)
    labels[close_series == roll_max] = 1  # local maximum
    labels[close_series == roll_min] = 0  # local minimum (wins on ties)
    # An explicit stop index is required: ``iloc[k:-k]`` would yield an empty
    # Series for k == 0 because ``-0`` is simply 0.
    return labels.iloc[k:len(labels) - k]


ステップ4：特徴量生成関数（ログリターン＋hl_diff、lag0含む）

ログリターンは直前のcloseとの比の対数として計算し、高低差は各バー自身のcloseで割って正規化する（いずれもlagごとにシフトして特徴量化する）

In [5]:
# def make_ts_features_norm(df, window):
#     # （1）ログリターンと高低差比を計算
#     log_ret = np.log(df['close'] / df['close'].shift(1))
#     hl_diff = (df['high'] - df['low']) / df['close']

#     # （2）lag 特徴量をリスト内包で生成
#     log_feats = [log_ret.shift(lag).rename(f'log_ret_lag_{lag}') for lag in range(window)]
#     hl_feats  = [hl_diff.shift(lag).rename(f'hl_diff_lag_{lag}')   for lag in range(window)]

#     # （3）一括結合＆NaN除去
#     feat = pd.concat(log_feats + hl_feats, axis=1)
#     feat.dropna(inplace=True)
#     return feat

# # 特徴量作成
# # feat_train = make_ts_features_norm(df_train, WINDOW)
# feat_train = make_ts_features_norm(df_train_filtered, WINDOW)
# feat_valid = make_ts_features_norm(df_valid, WINDOW)


In [6]:
from tqdm import tqdm

# ── ステップ4：特徴量生成関数 ──────────────────────────
def make_ts_features_norm(df, window):
    """
    Build lagged features for every row of ``df``.

    Two per-row series are derived first:
    - log_ret: log of ``close`` divided by the previous row's ``close``
    - hl_diff: ``high - low`` divided by the same row's ``close``

    Each series is then shifted by lag = 0 .. window-1 rows. The shifted
    copies are concatenated column-wise (all log_ret lags followed by all
    hl_diff lags) and every row containing a NaN is dropped.
    """
    close = df['close']
    log_ret = np.log(close / close.shift(1))
    hl_diff = (df['high'] - df['low']) / close

    # One (log_ret, hl_diff) pair of shifted columns per lag, with a progress bar
    lags = tqdm(range(window), desc="Generating lag features")
    shifted_pairs = [
        (
            log_ret.shift(lag).rename(f'log_ret_lag_{lag}'),
            hl_diff.shift(lag).rename(f'hl_diff_lag_{lag}'),
        )
        for lag in lags
    ]
    log_cols = [pair[0] for pair in shifted_pairs]
    hl_cols = [pair[1] for pair in shifted_pairs]

    # Join everything in one go and discard incomplete rows
    return pd.concat(log_cols + hl_cols, axis=1).dropna()

# Build the lag-feature matrices
# (alternative kept for reference: use the unfiltered training frame)
# feat_train = make_ts_features_norm(df_train, WINDOW)
feat_train = make_ts_features_norm(df=df_train_filtered, window=WINDOW)
feat_valid = make_ts_features_norm(df=df_valid, window=WINDOW)


Generating lag features:   0%|          | 0/288 [00:00<?, ?it/s]

Generating lag features: 100%|██████████| 288/288 [00:00<00:00, 499.94it/s]
Generating lag features: 100%|██████████| 288/288 [00:00<00:00, 696.33it/s] 


ステップ5：特徴量・ラベルの作成

In [7]:
def align_features_and_labels(feat_df, close_series, k):
    """
    Generate labels from ``close_series`` via ``make_labels`` and restrict
    both the features and the labels to the index entries they share.

    Returns a ``(features, labels)`` tuple selected on that common index.
    """
    target = make_labels(close_series, k)
    shared_idx = feat_df.index.intersection(target.index)
    aligned_features = feat_df.loc[shared_idx]
    aligned_labels = target.loc[shared_idx]
    return aligned_features, aligned_labels

ステップ6：データ準備・分割関数

In [8]:
k = 5
# Training data
X_tr, y_tr = align_features_and_labels(
    feat_df=feat_train, close_series=df_train['close'], k=k
)
# Pool that is divided into the validation and test sets
X_vf, y_vf = align_features_and_labels(
    feat_df=feat_valid, close_series=df_valid['close'], k=k
)
# Split the pool 50/50, stratified on the label and seeded with SEED.
# NOTE(review): this is a shuffled split of time-ordered rows; if the rows are
# overlapping lag windows, near-identical rows land in both halves - confirm
# that this is acceptable for the reported test metrics.
X_val, X_test, y_val, y_test = train_test_split(
    X_vf,
    y_vf,
    test_size=0.5,
    stratify=y_vf,
    random_state=SEED,
)


ステップ7：モデル学習・評価ループ

In [9]:
import numpy as np
from tqdm import tqdm
import lightgbm as lgb
from sklearn.metrics import precision_score, confusion_matrix, classification_report

# Custom evaluation metric: macro-averaged precision
def precision_eval(preds, dataset):
    """
    LightGBM ``feval`` callback returning macro precision.

    Parameters
    ----------
    preds : numpy.ndarray
        Multiclass scores handed over by LightGBM. Either a 2-D array with one
        row per sample, or a flat 1-D array grouped by class
        (``preds[class_id * num_data + row_id]``), which is the layout older
        LightGBM releases document for multiclass ``feval``.
    dataset : lightgbm.Dataset
        Dataset being evaluated; only ``get_label()`` is used.

    Returns
    -------
    tuple
        ``('macro_precision', value, True)``; the trailing ``True`` marks the
        metric as higher-is-better.
    """
    labels = dataset.get_label().astype(int)
    scores = np.asarray(preds)
    if scores.ndim == 1:
        # Flat input is class-major, so a plain ``reshape(-1, num_class)``
        # would mix rows and classes. Rebuild (num_class, num_data) and
        # transpose; this also avoids hard-coding the number of classes.
        scores = scores.reshape(-1, len(labels)).T
    pred_labels = scores.argmax(axis=1)
    p = precision_score(labels, pred_labels, average='macro', zero_division=0)
    return 'macro_precision', p, True

# Factory for a training-progress callback
def make_progress_callback(total, desc):
    """
    Create a tqdm bar together with a callback that advances it.

    Returns ``(callback, bar)``. The callback accepts one argument, which it
    ignores, and moves the bar forward by one step per call. The caller is
    responsible for closing the bar.
    """
    bar = tqdm(total=total, desc=desc, leave=False)

    def _advance(env):
        bar.update(1)

    return _advance, bar

# (1) Train the model
# NOTE(review): the callback list enables early stopping while a custom feval
# is supplied; if the objective's default metric is also evaluated, early
# stopping may react to either metric - confirm which one is intended.
train_cb, train_pbar = make_progress_callback(PARAMS['n_estimators'], desc=f"Train k={k}")
try:
    model = lgb.train(
        PARAMS,
        lgb.Dataset(X_tr, label=y_tr),
        num_boost_round=PARAMS['n_estimators'],
        valid_sets=[lgb.Dataset(X_val, label=y_val)],
        feval=precision_eval,
        callbacks=[
            lgb.early_stopping(stopping_rounds=20, verbose=False),
            lgb.log_evaluation(period=0),
            train_cb,
        ],
    )
finally:
    # Close the bar even when training raises, so no stale bar is left behind
    train_pbar.close()

# (2) Predict on the test split.
# predict() returns all rows at once, so a per-row progress bar adds nothing;
# the class index is taken with a single vectorized argmax.
y_pred_test = np.argmax(model.predict(X_test), axis=1).tolist()

# (3) Show results: confusion matrix
cm_test = confusion_matrix(y_test, y_pred_test, labels=[0, 1, 2])
print(f"\n=== k = {k} | Params: {PARAMS} ===")
print("Test Confusion Matrix:\n", cm_test)

# (4) Show results: precision, recall, f1-score
# The report is built once and reused for both the console and the file.
report = classification_report(
    y_test, y_pred_test,
    labels=[0, 1, 2],
    target_names=['Min(0)', 'Max(1)', 'Other(2)'],
    digits=4,
    zero_division=0
)
print("\nClassification Report:\n")
print(report)

# (5) Save to file
with open(f"results/report_k{k}.txt", "w", encoding="utf-8") as f:
    f.write("Test Confusion Matrix:\n")
    np.savetxt(f, cm_test, fmt='%d')
    f.write("\n\nClassification Report:\n")
    f.write(report)
print(f"Saved → results/report_k{k}.txt")


Train k=5:   0%|          | 0/200 [00:00<?, ?it/s]

TypeError: Unknown type of parameter:class_weight, got:dict

In [None]:
from tqdm import tqdm
import lightgbm as lgb
from sklearn.metrics import confusion_matrix, classification_report, precision_score

# Hyper-parameters (single configuration; class_weight is deliberately absent)
PARAMS = dict(
    num_leaves=31,
    learning_rate=0.05,
    n_estimators=200,
    objective='multiclass',
    num_class=3,
    boosting_type='gbdt',
    verbose=-1,
    seed=SEED,
)

# Per-class weights (0: minima, 1: maxima, 2: other), expanded to one
# weight per training row by looking up each row's label
class_weights = {0: 10.0, 1: 10.0, 2: 1.0}
sample_weights = y_tr.map(class_weights)

# LightGBM datasets; only the training set carries the row weights
train_set = lgb.Dataset(X_tr, label=y_tr, weight=sample_weights)
valid_set = lgb.Dataset(X_val, label=y_val)

# Custom evaluation metric: macro-averaged precision
def precision_eval(preds, dataset):
    """
    LightGBM ``feval`` callback returning macro precision.

    Parameters
    ----------
    preds : numpy.ndarray
        Multiclass scores handed over by LightGBM. Either a 2-D array with one
        row per sample, or a flat 1-D array grouped by class
        (``preds[class_id * num_data + row_id]``), which is the layout older
        LightGBM releases document for multiclass ``feval``.
    dataset : lightgbm.Dataset
        Dataset being evaluated; only ``get_label()`` is used.

    Returns
    -------
    tuple
        ``('macro_precision', value, True)``; the trailing ``True`` marks the
        metric as higher-is-better.
    """
    labels = dataset.get_label().astype(int)
    scores = np.asarray(preds)
    if scores.ndim == 1:
        # Flat input is class-major, so a plain ``reshape(-1, num_class)``
        # would mix rows and classes. Rebuild (num_class, num_data) and
        # transpose; this also avoids hard-coding the number of classes.
        scores = scores.reshape(-1, len(labels)).T
    pred_labels = scores.argmax(axis=1)
    p = precision_score(labels, pred_labels, average='macro', zero_division=0)
    return 'macro_precision', p, True

# Factory for a training-progress callback
def make_progress_callback(total, desc):
    """
    Create a tqdm bar together with a callback that advances it.

    Returns ``(callback, bar)``. The callback accepts one argument, which it
    ignores, and moves the bar forward by one step per call. The caller is
    responsible for closing the bar.
    """
    bar = tqdm(total=total, desc=desc, leave=False)

    def _advance(env):
        bar.update(1)

    return _advance, bar

# (1) Train the model on the weighted training set
# NOTE(review): the callback list enables early stopping while a custom feval
# is supplied; if the objective's default metric is also evaluated, early
# stopping may react to either metric - confirm which one is intended.
train_cb, train_pbar = make_progress_callback(PARAMS['n_estimators'], desc=f"Train k={k}")
try:
    model = lgb.train(
        PARAMS,
        train_set,
        num_boost_round=PARAMS['n_estimators'],
        valid_sets=[valid_set],
        feval=precision_eval,
        callbacks=[
            lgb.early_stopping(stopping_rounds=20, verbose=False),
            lgb.log_evaluation(period=0),
            train_cb,
        ],
    )
finally:
    # Close the bar even when training raises, so no stale bar is left behind
    train_pbar.close()

# (2) Predict on the test split.
# predict() returns all rows at once, so a per-row progress bar adds nothing;
# the class index is taken with a single vectorized argmax.
y_pred_test = np.argmax(model.predict(X_test), axis=1).tolist()

# (3) Confusion matrix and classification report
cm_test = confusion_matrix(y_test, y_pred_test, labels=[0, 1, 2])
print(f"\n=== k = {k} | Params: {PARAMS} ===")
print("Test Confusion Matrix:\n", cm_test)

report = classification_report(
    y_test, y_pred_test,
    labels=[0, 1, 2],
    target_names=['Min(0)', 'Max(1)', 'Other(2)'],
    digits=4,
    zero_division=0
)
print("\nClassification Report:\n", report)

# (4) Save to file
with open(f"results/report_k{k}.txt", "w", encoding="utf-8") as f:
    f.write("Test Confusion Matrix:\n")
    np.savetxt(f, cm_test, fmt='%d')
    f.write("\n\nClassification Report:\n")
    f.write(report)

print(f"Saved → results/report_k{k}.txt")


