
# Vectorized Adjustment & Stop-Loss Backtest (Full)

**구성**  
1) 벡터화된 상·하방 조정률 & 최적 인덱스 탐색  
2) (1)의 베스트 인덱스와 조정률로 **엄격부등호(`<`, `>`)** 밴드(필터) 탐색  
3) 필터 + **스탑로스 최적화** (매수/매도 각각 → 동시 적용)  
4) Backtest 성과지표(MDD 등), Equity Curve 시각화, Excel 리포트 저장(`backtest_report.xlsx`)


In [None]:

# =========================
# Setup & Imports
# =========================
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Candidate input files: load_price_csv() uses PRIMARY_FILE when it exists and
# otherwise falls back to FALLBACK_FILE.
PRIMARY_FILE = "mt-10yr0901-new.csv"   # preferred input file
FALLBACK_FILE = "mt-10yr-new.csv"      # fallback input file (skipped when it does not exist)

# Column names that load_price_csv() assigns positionally to the CSV,
# which is read with header=None.
EXPECTED_COLS = [
    "Date", "Open", "High", "Low", "Close",
    "Index1", "Index2", "Index3", "Index4", "Index5",
    "Index6", "Index7", "Index8", "Index9", "Index10", "Index11"
]


In [None]:

# =========================
# Utils
# =========================
def load_price_csv() -> pd.DataFrame:
    """Read the price/index CSV and label its columns with EXPECTED_COLS.

    PRIMARY_FILE is used when it exists, otherwise FALLBACK_FILE. The file is
    read without a header row; surplus trailing columns are dropped and the
    "Date" column is converted with ``pd.to_datetime``.

    Raises:
        FileNotFoundError: neither candidate file exists.
        ValueError: the CSV has fewer columns than EXPECTED_COLS.
    """
    for candidate in (PRIMARY_FILE, FALLBACK_FILE):
        if os.path.exists(candidate):
            chosen_path = candidate
            break
    else:
        raise FileNotFoundError(f"CSV 파일을 찾을 수 없습니다. '{PRIMARY_FILE}' 또는 '{FALLBACK_FILE}'")

    raw = pd.read_csv(chosen_path, header=None)

    wanted = len(EXPECTED_COLS)
    if raw.shape[1] < wanted:
        raise ValueError(f"CSV 열 개수({raw.shape[1]})가 기대치({len(EXPECTED_COLS)})보다 적습니다.")
    if raw.shape[1] > wanted:
        # Keep only the leading columns that have a name.
        raw = raw.iloc[:, :wanted]

    raw.columns = EXPECTED_COLS
    raw["Date"] = pd.to_datetime(raw["Date"])
    return raw

def calculate_pct_change(df_: pd.DataFrame, col_name_current: str, col_name_previous: str) -> pd.Series:
    """Return ((current row value / previous row value) - 1) * 100.

    The numerator is ``df_[col_name_current]``; the denominator is
    ``df_[col_name_previous]`` shifted down by one row, so the first row of
    the result is NaN.
    """
    previous_row = df_[col_name_previous].shift(1)
    ratio = df_[col_name_current] / previous_row
    return (ratio - 1) * 100

def ceil_to_nearest(value: float, step: float) -> float:
    """Round ``value`` up to the next multiple of ``step``.

    Example: ``ceil_to_nearest(0.071, 0.01)`` gives 0.08, while a value that
    is already a multiple of ``step`` (``0.07``) is returned unchanged.

    Args:
        value: Number to round up.
        step: Size of the rounding increment (e.g. 0.01).

    Returns:
        The smallest multiple of ``step`` that is not below ``value``,
        ignoring differences smaller than 1e-9 of a step.
    """
    steps = value / step
    # Binary floating point can leave an exact multiple a hair above the
    # integer (0.07 / 0.01 == 7.000000000000001); a plain ceil would then
    # jump a whole extra step. Snap away that noise before taking the ceiling.
    steps = np.round(steps, 9)
    return np.ceil(steps) * step


In [None]:

# =========================
# Load Data & Preprocessing
# =========================
# Load the CSV into the notebook-wide DataFrame `df`.
df = load_price_csv()
# Percent change of today's Open versus the previous row's Close.
df["Open_Pct_Change"] = calculate_pct_change(df, "Open", "Close")
# Row-over-row percent change of each of the eleven index columns.
for i in range(1, 12):
    df[f"Index{i}_Pct_Change"] = calculate_pct_change(df, f"Index{i}", f"Index{i}")
# Previous row's Close; the stop-loss code multiplies it by the stop ratio.
df["Prev_Close"] = df["Close"].shift(1)

# `display` is not defined in this file; presumably the IPython/Jupyter builtin.
display(df.head())
print("Rows:", len(df))


## Step 1) 벡터화된 상·하방 조정률 & 최적 인덱스 탐색

In [None]:

def find_optimal_adjustment_rate_vectorized(index_col: str, open_pct_col: str, df: pd.DataFrame,
                                            rate_min: float = 0.000, rate_max: float = 5.000, rate_step: float = 0.005):
    """Grid-search the adjustment rate separately for up days and down days.

    For every candidate rate in ``np.arange(rate_min, rate_max, rate_step)``
    (``rate_max`` itself is excluded) the threshold ``rate * index change`` is
    compared with the open percent change of the same row:

    * open change > threshold: long, profit ``Close - Open``
    * open change < threshold: short, profit ``Open - Close``
    * equal: no trade

    Profits are summed per rate, once over rows whose index change is
    positive and once over rows whose index change is negative.

    Args:
        index_col: Column holding the index percent change.
        open_pct_col: Column holding the open percent change.
        df: Frame that also contains "Open" and "Close"; rows with NaN in any
            of the four columns are ignored.
        rate_min, rate_max, rate_step: Definition of the rate grid.

    Returns:
        ``(best_rising_rate, best_rising_profit, best_falling_rate,
        best_falling_profit)`` as floats. On ties the lowest rate wins.

    Raises:
        ValueError: the rate grid is empty.
    """
    frame = df[[index_col, open_pct_col, "Open", "Close"]].dropna().copy()

    index_chg = frame[index_col].to_numpy(dtype=np.float64)
    open_chg = frame[open_pct_col].to_numpy(dtype=np.float64)
    open_px = frame["Open"].to_numpy(dtype=np.float64)
    close_px = frame["Close"].to_numpy(dtype=np.float64)

    candidate_rates = np.arange(rate_min, rate_max, rate_step, dtype=np.float64)
    if candidate_rates.size == 0:
        raise ValueError("rate range is empty. check min/max/step.")

    # Rows are rates, columns are days: shape (R, N).
    threshold = candidate_rates[:, None] * index_chg[None, :]
    open_row = open_chg[None, :]
    above = open_row > threshold
    below = open_row < threshold
    long_pnl = close_px[None, :] - open_px[None, :]
    short_pnl = open_px[None, :] - close_px[None, :]

    def _profit_per_rate(day_mask):
        """Sum the long/short profit over the selected days for every rate."""
        selected = day_mask[None, :]
        per_day = np.where(selected & above, long_pnl, 0.0) + np.where(selected & below, short_pnl, 0.0)
        return per_day.sum(axis=1)  # (R,)

    rising_totals = _profit_per_rate(index_chg > 0)
    falling_totals = _profit_per_rate(index_chg < 0)

    rise_pos = int(np.argmax(rising_totals))
    fall_pos = int(np.argmax(falling_totals))

    return (
        float(candidate_rates[rise_pos]),
        float(rising_totals[rise_pos]),
        float(candidate_rates[fall_pos]),
        float(falling_totals[fall_pos]),
    )

def step1_optimal_rates(df: pd.DataFrame, rate_min=0.000, rate_max=5.000, rate_step=0.005) -> pd.DataFrame:
    """Run the adjustment-rate grid search for Index1 .. Index11.

    Each index is evaluated with find_optimal_adjustment_rate_vectorized on
    its "<Index>_Pct_Change" column against "Open_Pct_Change".

    Returns:
        One row per index with the best rising/falling rate, the matching
        profits, and their sum in "Total Profit".
    """
    def _evaluate(name: str) -> dict:
        """Build the result row for a single index column."""
        up_rate, up_profit, down_rate, down_profit = find_optimal_adjustment_rate_vectorized(
            index_col=f"{name}_Pct_Change", open_pct_col="Open_Pct_Change", df=df,
            rate_min=rate_min, rate_max=rate_max, rate_step=rate_step
        )
        return {
            "Index": name,
            "Rising Optimal Rate": up_rate,
            "Rising Max Profit": up_profit,
            "Falling Optimal Rate": down_rate,
            "Falling Max Profit": down_profit,
            "Total Profit": up_profit + down_profit,
        }

    return pd.DataFrame([_evaluate(f"Index{number}") for number in range(1, 12)])

# Evaluate all eleven indices on the default rate grid.
step1 = step1_optimal_rates(df, rate_min=0.000, rate_max=5.000, rate_step=0.005)
display(step1.sort_values("Total Profit", ascending=False).reset_index(drop=True))
# Keep the index with the highest combined profit; these globals feed steps 2 and 3.
best_row = step1.sort_values("Total Profit", ascending=False).iloc[0]
best_index = best_row["Index"]
rising_rate = float(best_row["Rising Optimal Rate"])
falling_rate = float(best_row["Falling Optimal Rate"])

print(f"[Step1] Best Index: {best_index}, rising_rate={rising_rate:.3f}, falling_rate={falling_rate:.3f}")


## Step 2) 베스트 인덱스 & 조정률로 엄격부등호 밴드(필터) 탐색

In [None]:

def simulate_profit_with_band(df: pd.DataFrame, index_col: str, rate: float,
                              band_min: float, band_max: float) -> float:
    """Total profit when trading only on days with band_min < index change < band_max.

    Only strict comparisons are used. On a tradable day (index change inside
    the band and non-zero) the threshold is ``index change * rate``:
    open change above it means long (``Close - Open``), below it means short
    (``Open - Close``), and an exact tie means no trade. The same rate is
    applied to up days and down days.

    Rows with NaN in ``index_col``, "Open_Pct_Change", "Open" or "Close" are
    ignored.
    """
    frame = df[[index_col, "Open_Pct_Change", "Open", "Close"]].dropna().copy()
    index_chg = frame[index_col].to_numpy(dtype=np.float64)
    open_chg = frame["Open_Pct_Change"].to_numpy(dtype=np.float64)
    open_px = frame["Open"].to_numpy(dtype=np.float64)
    close_px = frame["Close"].to_numpy(dtype=np.float64)

    inside_band = (index_chg > band_min) & (index_chg < band_max)
    # Up days and down days follow the same long/short rule, so they share one mask.
    tradable = inside_band & ((index_chg > 0) | (index_chg < 0))

    threshold = index_chg * rate
    go_long = tradable & (open_chg > threshold)
    go_short = tradable & (open_chg < threshold)

    per_day = (close_px - open_px) * go_long + (open_px - close_px) * go_short
    return float(per_day.sum())

def step2_find_bands(df: pd.DataFrame, best_index: str, rising_rate: float, falling_rate: float,
                     grid: np.ndarray | None = None):
    """Exhaustively search (lo, hi) bands with lo < hi that maximise simulated profit.

    Every ordered pair taken from ``grid`` is scored twice with
    simulate_profit_with_band on "<best_index>_Pct_Change": once with
    ``rising_rate`` and once with ``falling_rate``. The first pair reaching
    the highest profit is kept for each rate.

    Args:
        grid: Candidate band edges; defaults to -3.00 .. 3.00 in 0.01 steps.

    Returns:
        Dict with the best band edges and profit for the rising and the
        falling rate. Edges stay None and profit stays -inf when no pair was
        evaluated.
    """
    candidates = np.round(np.arange(-3.0, 3.0001, 0.01), 2) if grid is None else grid  # search space

    column = f"{best_index}_Pct_Change"
    rise_lo = rise_hi = None
    fall_lo = fall_hi = None
    rise_best = -np.inf
    fall_best = -np.inf

    for pos in range(len(candidates) - 1):
        lo = candidates[pos]
        for hi in candidates[pos + 1:]:
            if not (lo < hi):  # strict inequality only
                continue
            rise_profit = simulate_profit_with_band(df, column, rising_rate, lo, hi)
            if rise_profit > rise_best:
                rise_lo, rise_hi, rise_best = lo, hi, rise_profit
            fall_profit = simulate_profit_with_band(df, column, falling_rate, lo, hi)
            if fall_profit > fall_best:
                fall_lo, fall_hi, fall_best = lo, hi, fall_profit

    return {
        "Rising Band Min": rise_lo,
        "Rising Band Max": rise_hi,
        "Rising Band Profit": rise_best,
        "Falling Band Min": fall_lo,
        "Falling Band Max": fall_hi,
        "Falling Band Profit": fall_best,
    }

# Search the default band grid for the best index; `bands` is reused by step 3,
# the backtest cell and the Excel export.
bands = step2_find_bands(df, best_index, rising_rate, falling_rate)
bands


## Step 3) 필터 + 스탑로스 최적화

In [None]:

def calculate_filtered_trades_with_stop_loss(
    df_: pd.DataFrame,
    index_col: str,
    open_pct_col: str,
    open_col: str,
    rising_rate: float,
    falling_rate: float,
    rising_filter_min: float,
    rising_filter_max: float,
    falling_filter_min: float,
    falling_filter_max: float,
    stop_loss_buy_ratio: float,
    stop_loss_sell_ratio: float,
    tick_step: float = 0.01
) -> float:
    """Total profit of the band-filtered strategy with optional stop-losses.

    Per row (rows with NaN in any needed column are skipped):

    * index change > 0 uses the rising band and ``rising_rate``; index change
      < 0 uses the falling band and ``falling_rate``. The day trades only if
      the index change lies strictly inside its band.
    * open change above ``index change * rate`` means long, below means
      short, equal means no trade.
    * A long with a positive ``stop_loss_buy_ratio`` has its stop at
      ``Open - ceil_to_nearest(Prev_Close * ratio, tick_step)``; if Low
      reaches it (compared after rounding to 3 decimals) the trade books
      ``stop - Open``, otherwise ``Close - Open``.
    * A short with a positive ``stop_loss_sell_ratio`` mirrors this with
      ``Open + ...`` and High.

    ``open_col`` is accepted but not read; the "Open" column is always used.

    Returns:
        Sum of the per-day profits as a float.
    """
    columns = ["Open", "High", "Low", "Close", "Prev_Close", index_col, open_pct_col]
    frame = df_[columns].dropna().copy()

    def _long_pnl(open_px, low_px, close_px, prev_close):
        """Profit of a long entered at the open, honouring the buy stop."""
        if stop_loss_buy_ratio and stop_loss_buy_ratio > 0:
            stop_price = open_px - ceil_to_nearest(prev_close * stop_loss_buy_ratio, tick_step)
            if round(low_px, 3) <= round(stop_price, 3):
                return stop_price - open_px
        return close_px - open_px

    def _short_pnl(open_px, high_px, close_px, prev_close):
        """Profit of a short entered at the open, honouring the sell stop."""
        if stop_loss_sell_ratio and stop_loss_sell_ratio > 0:
            stop_price = open_px + ceil_to_nearest(prev_close * stop_loss_sell_ratio, tick_step)
            if round(high_px, 3) >= round(stop_price, 3):
                return open_px - stop_price
        return open_px - close_px

    total = 0.0
    for record in frame.itertuples(index=False, name=None):
        open_px, high_px, low_px, close_px, prev_close, idx_chg, opct = map(float, record[:7])

        # Pick the band and rate that belong to the day's direction.
        if idx_chg > 0:
            if not (rising_filter_min < idx_chg < rising_filter_max):
                continue
            threshold = idx_chg * float(rising_rate)
        elif idx_chg < 0:
            if not (falling_filter_min < idx_chg < falling_filter_max):
                continue
            threshold = idx_chg * float(falling_rate)
        else:
            continue

        if opct > threshold:
            total += _long_pnl(open_px, low_px, close_px, prev_close)
        elif opct < threshold:
            total += _short_pnl(open_px, high_px, close_px, prev_close)

    return float(total)

def find_optimal_stop_loss(
    df_: pd.DataFrame,
    index_col: str,
    open_pct_col: str,
    open_col: str,
    rising_rate: float,
    falling_rate: float,
    rising_filter_min: float,
    rising_filter_max: float,
    falling_filter_min: float,
    falling_filter_max: float,
    stop_loss_type: str = "buy",
    stop_min: float = 0.00050,
    stop_max: float = 0.05000,
    stop_step: float = 0.00050,
    tick_step: float = 0.01
):
    """Grid-search one stop-loss ratio while the other side stays disabled.

    Candidates are ``np.arange(stop_min, stop_max + 1e-12, stop_step)``. With
    ``stop_loss_type == "buy"`` the candidate is used as the buy (long) stop
    and the sell stop is 0.0; any other value optimises the sell (short) stop
    with the buy stop at 0.0. Each candidate is scored with
    calculate_filtered_trades_with_stop_loss.

    Returns:
        ``(best_stop, best_profit)``; the first candidate reaching the highest
        profit wins. ``best_stop`` is None when no candidate beat -inf.
    """
    optimise_buy = stop_loss_type == "buy"
    top_stop = None
    top_profit = -np.inf

    for candidate in np.arange(stop_min, stop_max + 1e-12, stop_step):
        outcome = calculate_filtered_trades_with_stop_loss(
            df_, index_col, open_pct_col, open_col,
            rising_rate, falling_rate,
            rising_filter_min, rising_filter_max,
            falling_filter_min, falling_filter_max,
            stop_loss_buy_ratio=candidate if optimise_buy else 0.0,
            stop_loss_sell_ratio=0.0 if optimise_buy else candidate,
            tick_step=tick_step
        )
        if outcome > top_profit:
            top_profit = outcome
            top_stop = candidate

    return top_stop, float(top_profit)

# Column names shared by the step 3 and backtest cells.
index_col = f"{best_index}_Pct_Change"
open_pct_col = "Open_Pct_Change"

# Optimise the long-side stop with the short-side stop disabled.
stop_buy, profit_buy = find_optimal_stop_loss(
    df, index_col, open_pct_col, "Open",
    rising_rate, falling_rate,
    bands["Rising Band Min"], bands["Rising Band Max"],
    bands["Falling Band Min"], bands["Falling Band Max"],
    stop_loss_type="buy",
    stop_min=0.00050, stop_max=0.05000, stop_step=0.00050
)
# Optimise the short-side stop with the long-side stop disabled.
stop_sell, profit_sell = find_optimal_stop_loss(
    df, index_col, open_pct_col, "Open",
    rising_rate, falling_rate,
    bands["Rising Band Min"], bands["Rising Band Max"],
    bands["Falling Band Min"], bands["Falling Band Max"],
    stop_loss_type="sell",
    stop_min=0.00050, stop_max=0.05000, stop_step=0.00050
)

# Apply both independently optimised stops together.
final_profit = calculate_filtered_trades_with_stop_loss(
    df, index_col, open_pct_col, "Open",
    rising_rate, falling_rate,
    bands["Rising Band Min"], bands["Rising Band Max"],
    bands["Falling Band Min"], bands["Falling Band Max"],
    stop_loss_buy_ratio=stop_buy,
    stop_loss_sell_ratio=stop_sell,
    tick_step=0.01
)

# NOTE(review): the ":.3%" format raises if find_optimal_stop_loss returned None as the stop.
print(f"[Step3] 최적 스탑로스 (매수): {stop_buy:.3%}, 해당 수익: {profit_buy:.2f}")
print(f"[Step3] 최적 스탑로스 (매도): {stop_sell:.3%}, 해당 수익: {profit_sell:.2f}")
print(f"[Step3] 최종 수익 (필터+스탑): {final_profit:.2f}")


## Backtest: Equity Curve & Performance

In [None]:

def equity_curve_from_trades(df_: pd.DataFrame, index_col: str, open_pct_col: str,
                             open_col: str, rising_rate: float, falling_rate: float,
                             rising_filter_min: float, rising_filter_max: float,
                             falling_filter_min: float, falling_filter_max: float,
                             stop_loss_buy_ratio: float, stop_loss_sell_ratio: float,
                             tick_step: float = 0.01) -> pd.DataFrame:
    """Build the per-day PnL and cumulative equity of the filtered stop-loss strategy.

    The trading rules are the same as in
    calculate_filtered_trades_with_stop_loss, including the 3-decimal rounding
    of the stop-hit comparison, so the summed "PnL" agrees with that
    function's total for identical arguments.

    The bands come from the ``*_filter_min`` / ``*_filter_max`` arguments;
    no module-level state is read.

    Args:
        df_: Frame with "Date", "Open", "High", "Low", "Close", "Prev_Close",
            ``index_col`` and ``open_pct_col``. Rows with NaN in any of them
            are skipped.
        index_col: Column with the index percent change.
        open_pct_col: Column with the open percent change.
        open_col: Accepted for signature compatibility; not read ("Open" is
            always used).
        rising_rate, falling_rate: Rate multiplied with the index change to
            get the long/short threshold on up / down days.
        rising_filter_min, rising_filter_max: Strict band for up days.
        falling_filter_min, falling_filter_max: Strict band for down days.
        stop_loss_buy_ratio: Long stop distance as a fraction of Prev_Close;
            falsy or non-positive disables it.
        stop_loss_sell_ratio: Same for shorts.
        tick_step: Increment the stop distance is rounded up to.

    Returns:
        DataFrame with columns "Date", "PnL" (0.0 on days without a trade)
        and "Equity" (cumulative sum of "PnL").
    """
    need_cols = ["Date", "Open", "High", "Low", "Close", "Prev_Close", index_col, open_pct_col]
    local = df_[need_cols].dropna().copy()

    def _long_pnl(open_px, low_px, close_px, prev_close):
        """Profit of a long entered at the open, honouring the buy stop."""
        if stop_loss_buy_ratio and stop_loss_buy_ratio > 0:
            stop_price = open_px - ceil_to_nearest(prev_close * stop_loss_buy_ratio, tick_step)
            if round(low_px, 3) <= round(stop_price, 3):
                return stop_price - open_px
        return close_px - open_px

    def _short_pnl(open_px, high_px, close_px, prev_close):
        """Profit of a short entered at the open, honouring the sell stop."""
        if stop_loss_sell_ratio and stop_loss_sell_ratio > 0:
            stop_price = open_px + ceil_to_nearest(prev_close * stop_loss_sell_ratio, tick_step)
            if round(high_px, 3) >= round(stop_price, 3):
                return open_px - stop_price
        return open_px - close_px

    daily_pnl = []
    for row in local.itertuples(index=False, name=None):
        date = row[0]
        open_px, high_px, low_px, close_px, prev_close, idx_chg, opct = (float(v) for v in row[1:8])

        # Select the threshold for the day's direction; None means "no trade".
        threshold = None
        if idx_chg > 0:
            if rising_filter_min < idx_chg < rising_filter_max:
                threshold = idx_chg * rising_rate
        elif idx_chg < 0:
            if falling_filter_min < idx_chg < falling_filter_max:
                threshold = idx_chg * falling_rate

        pnl = 0.0
        if threshold is not None:
            if opct > threshold:  # long
                pnl = _long_pnl(open_px, low_px, close_px, prev_close)
            elif opct < threshold:  # short
                pnl = _short_pnl(open_px, high_px, close_px, prev_close)

        daily_pnl.append((date, pnl))

    ec = pd.DataFrame(daily_pnl, columns=["Date", "PnL"])
    ec["Equity"] = ec["PnL"].cumsum()
    return ec

def performance_summary(equity_df: pd.DataFrame) -> dict:
    """Summarise an equity curve that has "PnL" and "Equity" columns.

    Returns a dict with:

    * "Total PnL": sum of "PnL".
    * "Trades": number of rows with non-zero PnL.
    * "Win Rate": share of those rows with positive PnL (0.0 without trades).
    * "MDD": most negative value of ``Equity - running maximum of Equity``.
    * "MDD(%)": "MDD" divided by the highest running maximum, times 100;
      0.0 when that maximum is 0.

    An empty frame yields all zeros.
    """
    if equity_df.empty:
        return {"Total PnL": 0.0, "Trades": 0, "Win Rate": 0.0, "MDD": 0.0, "MDD(%)": 0.0}

    pnl = equity_df["PnL"]
    equity = equity_df["Equity"]

    trade_count = int((pnl != 0).sum())
    win_count = int((pnl > 0).sum())
    if trade_count > 0:
        win_rate = float(win_count / trade_count)
    else:
        win_rate = 0.0

    running_peak = equity.cummax()
    worst_drawdown = float((equity - running_peak).min())

    # When the highest peak is non-zero it is also the largest non-zero peak,
    # so no separate masking of zero peaks is needed.
    highest_peak = running_peak.max()
    if highest_peak != 0:
        drawdown_pct = float((worst_drawdown / highest_peak) * 100.0)
    else:
        drawdown_pct = 0.0

    return {
        "Total PnL": float(pnl.sum()),
        "Trades": trade_count,
        "Win Rate": win_rate,
        "MDD": worst_drawdown,
        "MDD(%)": drawdown_pct,
    }

# Build the daily equity curve with the step 1-3 results, then summarise it.
eq = equity_curve_from_trades(
    df, index_col, open_pct_col, "Open",
    rising_rate, falling_rate,
    bands["Rising Band Min"], bands["Rising Band Max"],
    bands["Falling Band Min"], bands["Falling Band Max"],
    stop_loss_buy_ratio=stop_buy, stop_loss_sell_ratio=stop_sell
)
perf = performance_summary(eq)
perf


In [None]:

# =========================
# Plot Equity Curve
# =========================
# Line chart of cumulative PnL ("Equity") over "Date".
plt.figure()
plt.plot(eq["Date"], eq["Equity"])
plt.title("Equity Curve")
plt.xlabel("Date")
plt.ylabel("Equity")
plt.tight_layout()
plt.show()


In [None]:

# =========================
# Export: Excel Report
# =========================
def export_excel_report(path: str, step1_df: pd.DataFrame, bands_info: dict,
                        stops_info: dict, equity_df: pd.DataFrame, perf: dict):
    """Write the backtest results to an Excel workbook at ``path``.

    Sheets, in order: "OptimalRates" (``step1_df``), "Bands", "StopLoss",
    "EquityCurve" (``equity_df``) and "Summary". The three dict arguments are
    each written as a single-row table. The workbook is written with the
    "xlsxwriter" engine and without the DataFrame index.
    """
    # (sheet name, payload, payload is a dict that becomes one row)
    layout = (
        ("OptimalRates", step1_df, False),
        ("Bands", bands_info, True),
        ("StopLoss", stops_info, True),
        ("EquityCurve", equity_df, False),
        ("Summary", perf, True),
    )
    with pd.ExcelWriter(path, engine="xlsxwriter") as xw:
        for sheet_name, payload, is_record in layout:
            table = pd.DataFrame([payload]) if is_record else payload
            table.to_excel(xw, index=False, sheet_name=sheet_name)

# Stop-loss results for the "StopLoss" sheet. "Final Profit" is taken from the
# equity curve's PnL column rather than from the step 3 `final_profit` value.
stops_info = {
    "StopLoss Buy": stop_buy,
    "StopLoss Sell": stop_sell,
    "Profit Buy Only": profit_buy,
    "Profit Sell Only": profit_sell,
    "Final Profit": float(eq["PnL"].sum())
}
export_excel_report("backtest_report.xlsx", step1, bands, stops_info, eq, perf)
print("Saved: backtest_report.xlsx")
