<a href="https://colab.research.google.com/github/nambaksa/tetris/blob/main/LSMC(251118).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1. 라이브러리 설치 및 임포트
!pip install openpyxl

import pandas as pd
import numpy as np
from google.colab import files
import io
import time
import warnings

# NumPy 경고 무시
warnings.filterwarnings('ignore', category=RuntimeWarning)

def run_lsmc_model(input_filename):
    try:
        print(f"'{input_filename}' 파일 로딩 중...")

        # 1. 기본정보 읽기
        df_input_sheet = pd.read_excel(input_filename, sheet_name='기본정보',
                                      header=None, usecols="A:B")
        inputs = df_input_sheet.set_index(0)[1].to_dict()

        # 2. 변수 할당
        try:
            S = inputs['기초자산(S)']
            k = inputs['행사가격(K)']
            v = inputs['연간변동성(V)']
            T = inputs['만기(T,연)']
            callputFlag = inputs['옵션종류']
            div = inputs['배당수익률(q)']
            Num_Simulation = int(inputs['총 시뮬레이션 횟수 (입력)'])
            exerciseStartStep = int(inputs['옵션행사시작시점'])
            exerciseEndStep = int(inputs['옵션행사종료시점'])
            knockInPrice = inputs['Knock-in']
        except Exception as e:
            print(f"--- [입력 오류] 변수 읽기 실패: {e}")
            return None

        MIN_REGRESSION_SAMPLES = 10
        nStep = int(T * 52) + 1
        dt = T / (nStep - 1)
        Z = 1 if str(callputFlag).lower().startswith('c') else -1
        isKnockIn = True if knockInPrice > 0 else False

        # 3. 할인율 읽기
        df_rates_sheet = pd.read_excel(input_filename, sheet_name='할인율', header=None)
        raw_rates = df_rates_sheet.iloc[1, 1:nStep]
        r_term_weekly = pd.to_numeric(raw_rates, errors='coerce').fillna(0).values

        if len(r_term_weekly) < (nStep - 1):
            missing = (nStep - 1) - len(r_term_weekly)
            r_term_weekly = np.concatenate([r_term_weekly, np.zeros(missing)])

        df_term = np.exp(-r_term_weekly)

        print(f"--- 계산 시작 (Simulations: {Num_Simulation:,}, Steps: {nStep}) ---")

        # 4. 주가 경로 생성
        dz = np.random.standard_normal(size=(Num_Simulation, nStep - 1))
        drift = r_term_weekly - (div + 0.5 * v**2) * dt
        diffusion = v * np.sqrt(dt) * dz

        sMat = np.zeros((Num_Simulation, nStep))
        sMat[:, 0] = S
        sMat[:, 1:] = S * np.exp(np.cumsum(drift + diffusion, axis=1))

        kiStatus = np.zeros_like(sMat, dtype=bool)
        if isKnockIn:
            ki_triggered = sMat >= knockInPrice
            kiStatus = np.cumsum(ki_triggered, axis=1) > 0
        else:
            kiStatus.fill(True)

        # 5. Backward Induction
        CC = np.zeros_like(sMat)
        CE = np.zeros_like(sMat)
        EF = np.zeros_like(sMat, dtype=int)

        j = nStep - 1
        intrinsicValue = np.maximum(Z * (sMat[:, j] - k), 0)
        exercise_paths = np.where((j >= exerciseStartStep) & (j <= exerciseEndStep) & kiStatus[:, j])
        CE[exercise_paths, j] = intrinsicValue[exercise_paths]
        CC[:, j] = CE[:, j]
        EF[np.where(CE[:, j] > 0), j] = 1

        for j in range(nStep - 2, 0, -1):
            df = df_term[j]
            intrinsicValue = np.maximum(Z * (sMat[:, j] - k), 0)
            exercise_paths = np.where((j >= exerciseStartStep) & (j <= exerciseEndStep) & kiStatus[:, j])
            CE[exercise_paths, j] = intrinsicValue[exercise_paths]

            in_the_money = np.where(CE[:, j] > 0)[0]
            discounted_future_CC = CC[:, j + 1] * df

            if len(in_the_money) >= MIN_REGRESSION_SAMPLES:
                S_itm = sMat[in_the_money, j]
                Y_itm = discounted_future_CC[in_the_money]
                try:
                    coeffs = np.polyfit(S_itm / S, Y_itm, 2)
                    continuation = np.polyval(coeffs, S_itm / S)
                    continuation[continuation < 0] = -1

                    ex_decision = CE[in_the_money, j] > continuation

                    ex_idx = in_the_money[ex_decision]
                    EF[ex_idx, j] = 1
                    CC[ex_idx, j] = CE[ex_idx, j]
                    CC[ex_idx, j + 1:] = 0
                    EF[ex_idx, j + 1:] = 0

                    hold_idx = in_the_money[~ex_decision]
                    EF[hold_idx, j] = 0
                    CC[hold_idx, j] = discounted_future_CC[hold_idx]

                    otm_idx = np.where(CE[:, j] <= 0)[0]
                    EF[otm_idx, j] = 0
                    CC[otm_idx, j] = discounted_future_CC[otm_idx]
                except:
                    CC[:, j] = discounted_future_CC
                    EF[:, j] = 0
            else:
                CC[:, j] = discounted_future_CC
                EF[:, j] = 0

        # 0시점 처리
        j = 0
        intrinsic_0 = np.maximum(Z * (sMat[:, 0] - k), 0)
        ce_0 = intrinsic_0 if (0 >= exerciseStartStep) and (0 <= exerciseEndStep) and kiStatus[0, 0] else 0
        cont_0 = CC[:, 1] * df_term[0]

        ex_0 = (ce_0 > cont_0) & (ce_0 > 0)
        final_payoff = np.where(ex_0, ce_0, cont_0)

        EF[:, 0] = np.where(ex_0, 1, 0)
        EF[ex_0, 1:] = 0
        CC[:, 0] = final_payoff

        Price = np.mean(final_payoff)
        positiveValueCount = np.sum(final_payoff > 0)

        print("--- 계산 완료 ---")
        return (Price, positiveValueCount, sMat, CC, CE, EF, kiStatus, inputs, df_rates_sheet, df_input_sheet)

    except Exception as e:
        print(f"오류 발생: {e}")
        import traceback
        traceback.print_exc()
        return None

# 메인 실행
start_time = time.time()

try:
    print("LSM_Input.xlsx 파일을 업로드하세요.")
    uploaded = files.upload()
    if not uploaded:
        print("파일이 업로드되지 않았습니다.")
    else:
        input_filename = list(uploaded.keys())[0]
        results = run_lsmc_model(input_filename)

        if results:
            (Price, positiveValueCount, sMat, CC, CE, EF, kiStatus,
             inputs, df_rates_sheet, df_input_sheet) = results

            output_filename = 'LSM_Output.xlsx'
            nStep = sMat.shape[1]
            Num_Simulation = sMat.shape[0]

            # [수정] 엑셀 출력 제한 설정 (5,000개)
            LIMIT_ROWS = 5000
            # 실제 시뮬레이션 횟수가 5000개보다 적을 경우를 대비한 안전장치
            output_rows = min(Num_Simulation, LIMIT_ROWS)

            print(f"\n[알림] 엑셀 파일 생성 중... (전체 {Num_Simulation:,}개 중 상위 {output_rows:,}개 경로만 저장합니다)")

            with pd.ExcelWriter(output_filename, engine='openpyxl') as writer:

                # --- 1. 기본정보 시트 ---
                df_output_info = df_input_sheet.copy()
                def update_info(df, key, value):
                    if key in df[0].values:
                        df.loc[df[df[0] == key].index[0], 1] = value
                    else:
                        df = pd.concat([df, pd.DataFrame([[key, value]], columns=[0, 1])], ignore_index=True)
                    return df

                df_output_info = update_info(df_output_info, '옵션가치', Price)
                df_output_info = update_info(df_output_info, '유효 옵션가치 갯수', positiveValueCount)

                df_output_info.to_excel(writer, sheet_name='기본정보', index=False, header=False)

                # --- 2. 할인율 시트 ---
                df_rates_sheet.to_excel(writer, sheet_name='할인율', index=False, header=False)

                # --- 3. 데이터 시트 (VBA 스타일 + 5000개 제한) ---
                node_header_df = pd.DataFrame(list(range(nStep))).T
                # 시뮬레이션 번호는 1부터 output_rows(5000)까지만 생성
                sim_num_df = pd.DataFrame(range(1, output_rows + 1))

                def write_sheet_vba_style(data_matrix, sheet_name):
                    # [핵심] 데이터 자르기 (Slicing)
                    sliced_data = data_matrix[:output_rows]

                    # 1. Main data -> B2
                    pd.DataFrame(sliced_data).to_excel(writer, sheet_name=sheet_name,
                                                       startrow=1, startcol=1,
                                                       index=False, header=False)
                    # 2. Header -> B1
                    node_header_df.to_excel(writer, sheet_name=sheet_name,
                                            startrow=0, startcol=1,
                                            index=False, header=False)
                    # 3. Sim Index -> A2
                    sim_num_df.to_excel(writer, sheet_name=sheet_name,
                                        startrow=1, startcol=0,
                                        index=False, header=False)

                    # 4. Title -> A1
                    ws = writer.sheets[sheet_name]
                    ws['A1'] = sheet_name

                # 5개 시트 작성
                write_sheet_vba_style(sMat, '주가경로')
                write_sheet_vba_style(CC, '옵션가치')
                write_sheet_vba_style(EF, '최적행사전략')
                write_sheet_vba_style(CE, '행사가치')
                write_sheet_vba_style(kiStatus.astype(int), 'KnockIn상태')

            print(f"'{output_filename}' 생성 완료! (다운로드 시작)")
            files.download(output_filename)

except Exception as e:
    print(f"실행 중 오류: {e}")

LSM_Input.xlsx 파일을 업로드하세요.


Saving 251002_LSMC_Base(13614)_Input.xlsx to 251002_LSMC_Base(13614)_Input (1).xlsx
'251002_LSMC_Base(13614)_Input (1).xlsx' 파일 로딩 중...
--- 계산 시작 (Simulations: 500,000, Steps: 365) ---
--- 계산 완료 ---

[알림] 엑셀 파일 생성 중... (전체 500,000개 중 상위 5,000개 경로만 저장합니다)
