In [None]:
# ============================================================
# Hyperparam sweep: Layers × LR × Dropout on sl + fwd (TorchDNN)
# ============================================================
import os, sys, re, json, warnings, pandas as pd
warnings.filterwarnings("ignore")

from rolling_framework import Machine  # 프로젝트의 핵심 API

# --------------------- Paths / basic settings ------------------------
# Input CSVs all live under DATA_DIR.
DATA_DIR = "data/"
Y_FILE = os.path.join(DATA_DIR, "exrets.csv")
SLOPE_FILE = os.path.join(DATA_DIR, "slope.csv")
# Original author's note: fwd is interpreted as a yield (rate-of-return) level.
FWD_FILE = os.path.join(DATA_DIR, "fwds.csv")

# Sweep results are written here; the directory is created up front if absent.
OUT_DIR = "./output_dnn_sweep"
os.makedirs(OUT_DIR, exist_ok=True)

# Burn-in window and overall period handed to Machine further down.
# NOTE(review): the values look like YYYYMM strings — confirm against Machine.
BURN_START = "197108"
BURN_END = "199001"
PERIOD_START = "197108"
PERIOD_END = "202312"

# Passed to Machine as forecast_horizon.
HORIZON = 12

# Target columns selected from the exrets file (those present are kept).
MATURITIES = [
    "xr_2",
    "xr_3",
    "xr_5",
    "xr_7",
    "xr_10",
]

def _load_csv(path, name):
    """Read a CSV into a DataFrame indexed by its ``Time`` column.

    Args:
        path: Location of the CSV (anything ``pd.read_csv`` accepts).
        name: Short label for the dataset, used only in the error message.

    Returns:
        The parsed DataFrame with ``Time`` as its index.

    Raises:
        SystemExit: If the file does not exist; the exit message names the
            dataset and the missing filename.
    """
    try:
        frame = pd.read_csv(path, index_col="Time")
    except FileNotFoundError as err:
        # A missing input is fatal for the whole script, so stop right here.
        sys.exit(f"[ERROR] missing {name} → {err.filename}")
    else:
        return frame

def _align_time(*dfs):
    """Restrict every frame to the index labels they all share.

    Args:
        *dfs: Any number of pandas objects exposing ``.index`` and ``.loc``.

    Returns:
        A list, in the same order as the inputs, where each element has been
        reduced to the common index labels and then sorted by index. An empty
        call returns an empty list.
    """
    remaining = iter(dfs)
    head = next(remaining, None)
    if head is None:
        return []

    # Fold the intersection across the remaining frames, left to right.
    shared = head.index
    for frame in remaining:
        shared = shared.intersection(frame.index)

    aligned = []
    for frame in dfs:
        aligned.append(frame.loc[shared].sort_index())
    return aligned

# --------------------- Data loading --------------------------
# Load targets (exrets) and the two predictor sets in a fixed order.
y, sl, fwd = (
    _load_csv(csv_path_, label_)
    for csv_path_, label_ in (
        (Y_FILE, "exrets"),
        (SLOPE_FILE, "slope"),
        (FWD_FILE, "fwds"),
    )
)

# Keep only the requested maturities that are actually present in exrets;
# abort if none of them are.
y_cols = [col for col in MATURITIES if col in y.columns]
if not y_cols:
    sys.exit("[ERROR] MATURITIES not in exrets")
y = y[y_cols]

# Restrict all three frames to their shared, sorted Time index.
y, sl, fwd = _align_time(y, sl, fwd)

# Experiment input: sl + fwd, concatenated column-wise.
X = pd.concat([sl, fwd], axis=1)

print(
    "✓ shapes:",
    {"y": y.shape, "sl": sl.shape, "fwd": fwd.shape, "X": X.shape},
)

# --------------------- Experiment grid --------------------------
# Values swept, one list per axis:
#   layers        : (16,), (16, 8)
#   learning rate : 1e-3, 5e-4
#   dropout       : 0.0, 0.2
GRID = dict(
    hidden=[(16,), (16, 8)],
    lr=[1e-3, 5e-4],
    dropout=[0.0, 0.2],
)

# weight_decay (L2) is held fixed (e.g. 1e-4); turn it into a list here if it
# ever needs to be swept as well.
L2 = 1e-4

# --------------------- 러너 -------------------------------
def run_one_combo(hid, lr, do, seed=42, tag=None):
    """Train a single TorchDNN configuration via ``Machine`` and score it.

    Builds an ``option`` dict and a one-value-per-key ``params_grid`` from the
    arguments, constructs a ``Machine`` on the module-level ``X`` / ``y`` with
    the module-level burn-in, period and horizon settings, runs
    ``training()``, and collects ``R2OOS()`` and ``MSEOOS()``.

    Args:
        hid: Iterable of hidden-layer sizes; converted with ``tuple``.
        lr: Learning rate; converted with ``float``.
        do: Dropout value; converted with ``float``.
        seed: Seed forwarded in the option dict as an ``int``.
        tag: Label printed before training and returned in the result.
            Defaults to ``"hid{hid}_lr{lr}_do{do}"``.

    Returns:
        dict with keys ``tag``, ``hidden``, ``lr``, ``dropout``, ``l2``,
        ``R2OOS`` and ``MSEOOS`` (the two scores coerced to ``float``).
    """
    # Convert once so the option dict and the grid are built from the same values.
    hidden = tuple(hid)
    dropout = float(do)
    learning_rate = float(lr)
    weight_decay = float(L2)
    batch_size, max_epochs, patience = 32, 200, 20

    opt = dict(
        scaler="standard",
        hidden=hidden,
        dropout=dropout,
        lr=learning_rate,
        wd=weight_decay,
        bs=batch_size,
        epochs=max_epochs,
        patience=patience,
        seed=int(seed),
    )

    # GridSearchCV is still used, but every key carries exactly one candidate,
    # which effectively pins the configuration.
    grid = {
        "dnn__module__hidden": [hidden],
        "dnn__module__dropout": [dropout],
        "dnn__optimizer__lr": [learning_rate],
        "dnn__optimizer__weight_decay": [weight_decay],
        "dnn__batch_size": [batch_size],
        "dnn__max_epochs": [max_epochs],
        # Original author's note: None is fine as-is when the Machine/Strategy
        # default ValidSplit is used.
        # NOTE(review): these keys follow skorch-style naming; in skorch,
        # train_split=None turns the internal validation split off — confirm
        # that is what Machine expects alongside `patience`.
        "dnn__train_split": [None],
        "dnn__patience": [patience],
    }

    label = f"hid{hid}_lr{lr}_do{do}" if tag is None else tag

    m = Machine(
        X,
        y,
        "TorchDNN",
        option=opt,
        params_grid=grid,
        burn_in_start=BURN_START,
        burn_in_end=BURN_END,
        period=[PERIOD_START, PERIOD_END],
        forecast_horizon=HORIZON,
    )
    print(f"\n▶ run {label}")
    m.training()

    return {
        "tag": label,
        "hidden": hid,
        "lr": lr,
        "dropout": do,
        "l2": L2,
        "R2OOS": float(m.R2OOS()),
        "MSEOOS": float(m.MSEOOS()),
    }

# --------------------- Run & aggregate -------------------------
# Walk the full hidden × lr × dropout grid in the same nesting order
# (hidden outermost, dropout innermost), always with seed 42.
combos = (
    (h_, lr_, do_)
    for h_ in GRID["hidden"]
    for lr_ in GRID["lr"]
    for do_ in GRID["dropout"]
)
rows = []
for hid, lr, do in combos:
    res = run_one_combo(hid, lr, do, seed=42)
    rows.append(res)

# Best out-of-sample R2 first.
df = pd.DataFrame(rows).sort_values(by="R2OOS", ascending=False)
csv_path = os.path.join(OUT_DIR, "sweep_results.csv")
df.to_csv(csv_path, index=False)

print("\n=== Top by R2OOS ===")
print(df.head(n=5))
print(f"\nSaved: {csv_path}")

# Also persist the top-ranked configuration as JSON.
best = df.iloc[0].to_dict()
best_path = os.path.join(OUT_DIR, "best_config.json")
with open(best_path, "w") as f:
    f.write(json.dumps(best, indent=2))
print("Best config:", best)

▶ OLS-SL_nonDNN


OLS rolling:  93%|█████████▎| 485/520 [00:08<00:00, 59.08it/s]