ELS 엔진 클래스 (GBM + 할인 + 가격 + 델타)

In [None]:
import numpy as np
import pandas as pd
import os

# ============================================================
# 1) ELS2Star4838 엔진
#    - 2자산 GBM
#    - 일별 sigma, rho, (r-q), 제로커브를 받아서
#      오늘 기준 가격/델타 계산
# ============================================================

class ELS2Star4838:
    def __init__(self,
                 T=3.0,
                 gap=0.5,
                 step_down=(0.90, 0.90, 0.85, 0.85, 0.80),
                 redemp_pay=(1.0275, 1.0550, 1.0825, 1.1100, 1.1375),
                 maturity_pay=1.1650,
                 ki=0.60,
                 sigma=None,           # [sigma_kospi, sigma_samsung]
                 rho=0.7477,
                 drift=None,           # [r_q_kospi, r_q_samsung]
                 S0=(1.0, 1.0),
                 zero_T=None,          # 제로커브 만기 (년 단위, ex. [0.25,0.5,1,2,3])
                 zero_r=None,          # 각 만기 제로금리 (연율, 소수; ex. 0.01 = 1%)
                 n_paths=200000,
                 seed=42):

        self.T = T
        self.gap = gap
        self.obs = np.arange(gap, T + 1e-12, gap)[:len(step_down)]

        self.step_down   = np.array(step_down)
        self.redemp_pay  = np.array(redemp_pay) * 10000
        self.maturity_pay = maturity_pay * 10000
        self.ki = ki

        self.sigma = np.array(sigma, dtype=float)
        self.rho   = float(rho)
        self.drift = np.array(drift, dtype=float)
        self.S0    = np.array(S0, dtype=float)

        # 공분산 → Cholesky
        corr = np.array([[1.0, self.rho],
                         [self.rho, 1.0]])
        D   = np.diag(self.sigma)
        cov = D @ corr @ D
        self.L = np.linalg.cholesky(cov)

        # 제로커브
        self.zero_T = np.array(zero_T, dtype=float)   # [0.25,0.5,1,2,3] 이런 식
        self.zero_r = np.array(zero_r, dtype=float)   # 각 만기 제로금리 (소수)

        self.n_paths = n_paths
        self.rng = np.random.default_rng(seed)

    # ------------------------------------------
    # 제로커브 기반 할인함수 DF(0→tau)
    # log DF를 선형보간 (log-linear DF)
    # ------------------------------------------
    def discount(self, tau):
        tau = np.asarray(tau, dtype=float)
        # 노드에서의 log DF
        log_df_nodes = -self.zero_r * self.zero_T     # r * T
        # tau에 대해 보간
        log_df_tau = np.interp(tau, self.zero_T, log_df_nodes)
        return np.exp(log_df_tau)

    # ------------------------------------------
    # GBM 시뮬레이션 (drift = r-q)
    # ------------------------------------------
    def _simulate(self, S0, T_remain, steps_per_year=252):
        dt = 1.0 / steps_per_year
        n_steps = int(T_remain * steps_per_year)

        paths = np.zeros((self.n_paths, n_steps + 1, 2))
        paths[:, 0, :] = S0

        drift_term = (self.drift - 0.5 * self.sigma**2) * dt
        sdt = np.sqrt(dt)

        Z  = self.rng.standard_normal((self.n_paths, n_steps, 2))
        dW = Z @ self.L.T    # 상관 구조 입힌 Brownian 증분 (단위분산)

        for t in range(n_steps):
            incr = np.exp(drift_term + sdt * dW[:, t, :])
            paths[:, t+1, :] = paths[:, t, :] * incr

        return paths

    # ------------------------------------------
    # 가격 계산 (오늘 t0 기준)
    # ------------------------------------------
    def price(self, t0, S0_current, past_min, steps_per_year=252):
        """
        t0         : 발행 후 경과연수 (예: 1.0 = 1년)
        S0_current : (S1/S1_start, S2/S2_start)
        past_min   : (과거 min1/S1_start, min2/S2_start)
        """
        T_remain = self.T - t0
        if T_remain <= 0:
            raise ValueError("t0 must be earlier than maturity.")

        S0_current = np.array(S0_current, dtype=float)
        past_min   = np.array(past_min, dtype=float)

        # 남은 조기상환 관측일
        remain_obs = self.obs[self.obs > t0]
        obs_tau = remain_obs - t0   # "오늘" 기준 남은 기간

        # 경로 시뮬레이션
        paths = self._simulate(S0_current, T_remain, steps_per_year)
        # shape: (n_paths, n_steps+1, 2)
        n = len(paths)

        # 과거/미래 최소값 합쳐서 녹인 여부 계산
        future_min = paths.min(axis=1).min(axis=1)     # 각 경로별 두 자산 전체기간 최솟값
        total_min  = np.minimum(future_min, np.min(past_min))
        ki_touch   = total_min < self.ki

        payoff   = np.zeros(n)
        redeemed = np.zeros(n, dtype=bool)

        # ----- 남은 조기상환일 체크 -----
        # obs_idx: 시뮬레이션 시간 index
        obs_idx = ((remain_obs - t0) * steps_per_year).astype(int)

        for i, idx in enumerate(obs_idx):
            # 아직 상환 안된 경로 중 배리어 위에 있는 것
            cond = (~redeemed) & np.all(paths[:, idx, :] >= self.step_down[i + len(self.obs) - len(remain_obs)], axis=1)
            if not np.any(cond):
                continue

            df = self.discount(obs_tau[i])
            payoff[cond]   = self.redemp_pay[i + len(self.obs) - len(remain_obs)] * df
            redeemed[cond] = True

        # ----- 만기 처리 (조기상환 안된 경로) -----
        not_red = ~redeemed
        if np.any(not_red):
            worst_T = paths[not_red, -1, :].min(axis=1)    # 만기 worst-of
            ki_hit  = ki_touch[not_red]

            p = np.zeros_like(worst_T)

            # 1) worst_T >= 0.80 : 쿠폰 지급
            p[worst_T >= 0.80] = self.maturity_pay

            # 2) worst_T < 0.80 이지만 KI 미터치 : 쿠폰 그대로 (설계별로 조정 가능)
            p[(worst_T < 0.80) & (~ki_hit)] = self.maturity_pay

            # 3) worst_T < 0.80 이고 KI 터짐 : 원금 × worst_T
            p[(worst_T < 0.80) & (ki_hit)] = 10000 * worst_T[(worst_T < 0.80) & (ki_hit)]

            df_T = self.discount(T_remain)
            payoff[not_red] = p * df_T

        # Monte Carlo 평균/표준오차
        price_mean = payoff.mean()
        price_se   = payoff.std(ddof=1) / np.sqrt(n)

        return price_mean, price_se

    # ------------------------------------------
    # 델타 (기초자산 2개 각각에 대해 중앙차분)
    # ------------------------------------------
    def delta(self, t0, S0_current, past_min, h=0.01, steps_per_year=252):
        S1, S2 = S0_current

        # 공통 난수 쓰기 위해 RNG 보존
        original_rng = self.rng
        self.rng = np.random.default_rng(12345)

        # Δ1 (KOSPI)
        V_up,  _ = self.price(t0, (S1*(1+h), S2), past_min, steps_per_year)
        V_dn,  _ = self.price(t0, (S1*(1-h), S2), past_min, steps_per_year)
        delta1 = (V_up - V_dn) / (2 * S1 * h)

        # Δ2 (Samsung)
        V_up2, _ = self.price(t0, (S1, S2*(1+h)), past_min, steps_per_year)
        V_dn2, _ = self.price(t0, (S1, S2*(1-h)), past_min, steps_per_year)
        delta2 = (V_up2 - V_dn2) / (2 * S2 * h)

        # RNG 원상복구
        self.rng = original_rng

        return delta1, delta2


ELS_Sheet1에서 데이터 읽고, 일별 가격+델타 계산

In [None]:
# ============================================================
# 2) 데이터 로딩 및 전처리
# ============================================================

df = pd.read_csv("ELS_Sheet1(EWMA).csv")   # 로컬에서는 여기 경로만 수정
df['date'] = pd.to_datetime(df['date'])

num_cols = [
    'samsung', 'kospi200',
    'vol_samsung', 'vol_kospi',
    'corr',
    'zero_rate_3M', 'zero_rate_6M', 'zero_rate_1Y',
    'zero_rate_2Y', 'zero_rate_3Y'
]

for col in num_cols:
    df[col] = (
        df[col].astype(str)
        .str.replace(',', '')
        .str.strip()
        .replace('', np.nan)
        .astype(float)
    )

# 발행일 이후 필터링
issue_date = pd.to_datetime("2021-10-22")
filtered_df = df[df['date'] >= issue_date].reset_index(drop=True)

zero_rate_cols = ['zero_rate_3M', 'zero_rate_6M', 'zero_rate_1Y',
                  'zero_rate_2Y', 'zero_rate_3Y']
horizons = np.array([0.25, 0.5, 1.0, 2.0, 3.0])

base_row = filtered_df.iloc[0]
samsung_start = base_row['samsung']
kospi_start   = base_row['kospi200']

results = []


# ------------------------------------------------------------
# 조기상환일 (실제 계약서 기준, 고정값)
# ------------------------------------------------------------
autocall_dates = [
    pd.Timestamp("2022-04-22"),
    pd.Timestamp("2022-10-24"),
    pd.Timestamp("2023-04-24"),
    pd.Timestamp("2023-10-23"),
    pd.Timestamp("2024-04-22"),
]
obs_years = np.array([0.5, 1.0, 1.5, 2.0, 2.5])


# ============================================================
# 이어서 돌리기 기능
# ============================================================

out_path = "ELS_result_EWNA.csv"   # 로컬 저장경로

if os.path.exists(out_path):
    old = pd.read_csv(out_path)
    old['date'] = pd.to_datetime(old['date'])
    last_saved_date = old['date'].max()
    print("기존 결과 불러옴:", last_saved_date.date())

    # last_saved_date보다 큰 날짜만 계산
    filtered_df = filtered_df[filtered_df['date'] > last_saved_date].reset_index(drop=True)

    results = old.to_dict("records")

else:
    print("기존 결과 없음 → 처음부터 시작")
    results = []


# ============================================================
# 3) 메인 루프
# ============================================================

for i in range(len(filtered_df)):

    row = filtered_df.iloc[i]
    date_i = row['date']
    print(f"{date_i.date()} 계산 시작")

    # ====== 오늘까지 녹인 최소값 (발행일 포함 강제) ======
    samsung_min = min(samsung_start, filtered_df.loc[:i, 'samsung'].min())
    kospi_min   = min(kospi_start,   filtered_df.loc[:i, 'kospi200'].min())

    # ====== 상대가격 ======
    S0_current = (
        row['kospi200'] / kospi_start,
        row['samsung']  / samsung_start
    )

    past_min = (
        kospi_min   / kospi_start,
        samsung_min / samsung_start
    )

    # ====== 경과연수 ======
    t0 = (date_i - issue_date).days / 365.0

    # ====== 제로커브 ======
    zero_i = row[zero_rate_cols].to_numpy(dtype=float) / 100.0

    # ====== 모델 생성 ======
    model = ELS2Star4838(
        sigma = np.array([row['vol_kospi'], row['vol_samsung']]),
        rho   = row['corr'],
        drift = np.array([0.0, 0.0]),
        zero_T = horizons,
        zero_r = zero_i,
        n_paths = 200000,
        seed = 42
    )


    # ------------------------------------------------------
    # A) 조기상환 체크
    # ------------------------------------------------------
    if date_i in autocall_dates:

        level_idx = autocall_dates.index(date_i)
        barrier = model.step_down[level_idx]

        cond_real = (S0_current[0] >= barrier) and (S0_current[1] >= barrier)

        if cond_real:

            tau = obs_years[level_idx] - t0
            payoff_today = model.redemp_pay[level_idx] * model.discount(tau)

            results.append({
                "date": date_i,
                "price": payoff_today,
                "price_se": 0.0,
                "delta_kospi_norm": 0.0,
                "delta_samsung_norm": 0.0,
                "delta_kospi_mkt": 0.0,
                "delta_samsung_mkt": 0.0,
            })

            pd.DataFrame(results).to_csv(out_path, index=False, encoding="utf-8-sig")
            print(f"조기상환 발생! {date_i.date()} 지급액={payoff_today:.4f}")
            break


    # ------------------------------------------------------
    # B) 일반 가격 / 델타 계산
    # ------------------------------------------------------

    price_mean, price_se = model.price(t0, S0_current, past_min)
    dK, dS = model.delta(t0, S0_current, past_min)

    dK_mkt = dK / kospi_start
    dS_mkt = dS / samsung_start

    results.append({
        "date": date_i,
        "price": price_mean,
        "price_se": price_se,
        "delta_kospi_norm": dK,
        "delta_samsung_norm": dS,
        "delta_kospi_mkt": dK_mkt,
        "delta_samsung_mkt": dS_mkt,
    })

    # ====== 매일 저장 ======
    pd.DataFrame(results).to_csv(out_path, index=False, encoding="utf-8-sig")
    print(f"{date_i.date()} 저장 완료")


print("전체 계산 완료!")


기존 결과 불러옴: 2021-11-30
2021-12-01 계산 시작
2021-12-01 저장 완료
2021-12-02 계산 시작
2021-12-02 저장 완료
2021-12-03 계산 시작
2021-12-03 저장 완료
2021-12-06 계산 시작
2021-12-06 저장 완료
2021-12-07 계산 시작
2021-12-07 저장 완료
2021-12-08 계산 시작
2021-12-08 저장 완료
2021-12-09 계산 시작
2021-12-09 저장 완료
2021-12-10 계산 시작
2021-12-10 저장 완료
2021-12-13 계산 시작
2021-12-13 저장 완료
2021-12-14 계산 시작
2021-12-14 저장 완료
2021-12-15 계산 시작
2021-12-15 저장 완료
2021-12-16 계산 시작
2021-12-16 저장 완료
2021-12-17 계산 시작
2021-12-17 저장 완료
2021-12-20 계산 시작
2021-12-20 저장 완료
2021-12-21 계산 시작
2021-12-21 저장 완료
2021-12-22 계산 시작
2021-12-22 저장 완료
2021-12-23 계산 시작
2021-12-23 저장 완료
2021-12-24 계산 시작
2021-12-24 저장 완료
2021-12-27 계산 시작
2021-12-27 저장 완료
2021-12-28 계산 시작
2021-12-28 저장 완료
2021-12-29 계산 시작
2021-12-29 저장 완료
2021-12-30 계산 시작
2021-12-30 저장 완료
2022-01-03 계산 시작
2022-01-03 저장 완료
2022-01-04 계산 시작
2022-01-04 저장 완료
2022-01-05 계산 시작
2022-01-05 저장 완료
2022-01-06 계산 시작
2022-01-06 저장 완료
2022-01-07 계산 시작
2022-01-07 저장 완료
2022-01-10 계산 시작
2022-01-10 저장 완료
2022-01-11 계산 시작
2022-01-1

KeyboardInterrupt: 

In [None]:
import os
os.getcwd()


'/Users/bag-yuli/Documents/UFEA/ELS 프로젝트'