In [7]:
import os
import pandas as pd
import numpy as np
import FinanceDataReader as fdr
import yfinance as yf
from datetime import datetime, timedelta
from scipy.stats import norm

def get_T(df):
    df['std_dt'] = pd.to_datetime(df['std_dt'], format='%Y%m%d')
    df['exr_dt'] = pd.to_datetime(df['exr_dt'], format='%Y%m%d')

    # 만기까지 남은 일수
    days_to_expiry = (df['exr_dt'] - df['std_dt']).dt.days

    # 연 단위 환산 (ACT/365)
    df['T'] = days_to_expiry / 365.0

    return df

def get_sigma(start: str, end: str) -> float:
    """S&P500 지수의 연율화 변동성 계산."""
    df = yf.download("^GSPC", start, end, progress=False)
    df["log_ret"] = np.log(df["Close"] / df["Close"].shift(1))
    returns = df["log_ret"].dropna()
    return returns.std() * np.sqrt(252)

def assign_r_by_maturity(df: pd.DataFrame, lookback_days: int) -> pd.DataFrame:
    """
    만기에 따라 무위험 금리(r)를 매핑해 'r' 컬럼을 추가.
      - T ≤ 0.25년 → 3개월 T-Bill (DTB3)
      - 0.25 < T ≤ 1년 → 5년 국채 (DGS5)
      - T > 1년 → 10년 국채 (DGS10)
    """
    end = datetime.now()
    start = end - timedelta(days=lookback_days)
    r_short = fdr.DataReader('FRED:DTB3', start, end)['DTB3'].dropna().iloc[-1] / 100
    r_mid   = fdr.DataReader('FRED:DGS5', start, end)['DGS5'].dropna().iloc[-1] / 100
    r_long  = fdr.DataReader('FRED:DGS10', start, end)['DGS10'].dropna().iloc[-1] / 100

    def choose_r(T):
        if T <= 0.25:
            return r_short
        elif T <= 1.0:
            return r_mid
        else:
            return r_long

    df = df.copy()
    df['r'] = df['T'].apply(choose_r)
    return df

def bs_gamma(S, K, T, r, sigma):
    """Black-Scholes 감마 계산."""
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    return norm.pdf(d1) / (S * sigma * np.sqrt(T))

def add_gamma_column(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    # 1) strike 파싱  (컬럼 이름을 정확히!)
    df['strike'] = (
        df['atk_nm']                   # ← 원본에서 실제 옵션코드 컬럼명
        .str.extract(r'[CP](\d{8})')[0]
        .astype(float) / 1000.0
    )

    df = df.dropna(subset=['strike'])

    # 2) γ 계산
    df['gamma'] = bs_gamma(
        S=df['base_clprc'],
        K=df['strike'],
        T=df['T'],
        r=df['r'],
        sigma=df['sigma']
    )
    return df

def calc_gex(
    df: pd.DataFrame,
    *,                         # 키워드 인자 강제
    front_expiry: str | None = None,
    contract_size: int = 100,      # SPX 대형=100, Mini=10
    dte_limit: int | None = None,           # n일 안쪽의 옵션만 보겠다?
    pct_strike: float |  None = None,      # ATM ± 5 %
    scale: int = 1_000,            # 1=달러, 1_000='K$', 1_000_000='M$'
) -> pd.DataFrame:
    """
    UnusualWhales 방식 γ-Dollar 노출 계산
    
    γ-Dollar(row) = γ × OI × 계약멀티플라이어(100) × S × 0.01
    ───────────────────────────────────────────────────────────
    Parameters
    ----------
    df : DataFrame
        필수 컬럼
          std_dt, exr_dt : 'YYYYMMDD' 또는 datetime
          strike         : add_gamma_column()에서 생성
          base_clprc     : 현물 지수 S
          gamma          : per-contract γ
          opn_int        : 미결제약정
    front_expiry : str | None
        'YYYY-MM-DD' 만기만 포함. None이면 모든 만기.
    contract_size : int
        옵션 1계약당 멀티플라이어 (SPX=100, XSP=10).
    dte_limit : int
        만기까지 남은 최대 일수(DTE) 필터.
    pct_strike : float
        ATM 대비 허용 퍼센트 폭 (±).
    scale : int
        결과를 나눠 출력할 스케일 (1=달러, 1_000='K').

    Returns
    -------
    DataFrame[std_dt, gex]  (gex는 scale 적용 단위)
    """

    df_use = df.copy()

    # 0) front 만기 필터
    if front_expiry:
        exp_dt = pd.to_datetime(front_expiry)
        df_use = df_use[df_use['exr_dt'] == exp_dt]
        if df_use.empty:
            raise ValueError(f"exr_dt = {front_expiry} 해당 행이 없습니다.")

    # 1) 날짜 형식 통일
    if df_use['std_dt'].dtype == 'object':
        df_use['std_dt'] = pd.to_datetime(df_use['std_dt'], format='%Y%m%d')

    # 2) DTE 필터 (0–dte_limit 일)
    if dte_limit:
        df_use = df_use[df_use['T'] <= dte_limit / 365]

    # 3) ATM ± pct_strike 필터
    if pct_strike:
        df_use = df_use[
            np.abs(df_use['strike'] - df_use['base_clprc']) <= df_use['base_clprc'] * pct_strike
        ]

    # 4) γ-Dollar (UW 공식)
    df_use['row_gex'] = (
        df_use['gamma']
        * df_use['opn_int']
        * contract_size                    # ← 멀티플라이어 100
        * (df_use['base_clprc'])
        * 0.01                             # 1 % 가격변화 당
    )

    # 5) 날짜별 합계 & 스케일링
    result = (
        df_use.groupby('std_dt')['row_gex']
        .sum()
        .div(scale)
        .reset_index(name='gex')
    )
    return result

def calc_gex_main(target_date, OPN_INT_FILTER, SPAN):
    try:
        df = pd.read_csv(f"./data/sp500_raw_data_{target_date}.csv")

        if len(df) != 0:
            df_call, df_put = df[df['stk_tp_cd']=="C"].dropna(), df[df['stk_tp_cd']=="P"].dropna()


            df_call, df_put = df_call[df_call['opn_int']>OPN_INT_FILTER], df_put[df_put['opn_int']>OPN_INT_FILTER]

            # T 추가
            df_call, df_put = get_T(df_call), get_T(df_put)

            # sigma 구하기 위한 시간범위 설정
            # 일단 60일 기준
            end_date_raw = datetime.strptime(str(list(df['std_dt'])[0]), '%Y%m%d')
            start_date_raw = end_date_raw - timedelta(days=SPAN)
            START_DATE, END_DATE = start_date_raw.strftime('%Y-%m-%d'), end_date_raw.strftime('%Y-%m-%d')

            # sigma 추가
            sigma = get_sigma(START_DATE, END_DATE)

            # 무위험이자율 r도 추가
            df_call['sigma'], df_put['sigma'] = sigma, sigma
            df_call, df_put = assign_r_by_maturity(df_call, SPAN), assign_r_by_maturity(df_put, SPAN)
            df_call, df_put = add_gamma_column(df_call), add_gamma_column(df_put)

            call_gex = calc_gex(df_call)
            put_gex  = calc_gex(df_put)

            net_gex = call_gex.set_index('std_dt')['gex'] - put_gex.set_index('std_dt')['gex']

            return {
                'call_gex': int(call_gex['gex']),
                'put_gex': int(put_gex['gex']),
                'net_gex': int(net_gex)
            }

        else:
            print(f"{e} | csv는 있는데 데이터가 없음")
            return None
    except Exception as e:
        print(f"{e} | 데이터 없음 ({target_date})")
        return None
    


In [8]:
OPN_INT_FILTER = 0
SPAN = 365

date_range_raw = [t for t in os.listdir("./data") if t != '.DS_Store']
date_range = []
for d in date_range_raw:
    d = d.split("_")[-1][:8]
    date_range.append(d)
date_range = sorted(date_range)

store_GEX_data = {}
# 원본 데이터 불러오고 C, P별로 df 분리
for i, d in enumerate(date_range):
    print(f' === ({i+1}/{len(date_range)}) | {d} ===')
    result = calc_gex_main(d, OPN_INT_FILTER, SPAN)
    if result:
        store_GEX_data[d] = result

 === (1/362) | 20240102 ===
No columns to parse from file | 데이터 없음 (20240102)
 === (2/362) | 20240103 ===
 === (3/362) | 20240104 ===
 === (4/362) | 20240105 ===
 === (5/362) | 20240108 ===
 === (6/362) | 20240109 ===
 === (7/362) | 20240110 ===
 === (8/362) | 20240111 ===
 === (9/362) | 20240112 ===
 === (10/362) | 20240115 ===
 === (11/362) | 20240116 ===
No columns to parse from file | 데이터 없음 (20240116)
 === (12/362) | 20240117 ===
 === (13/362) | 20240118 ===
 === (14/362) | 20240119 ===
 === (15/362) | 20240122 ===
 === (16/362) | 20240123 ===
 === (17/362) | 20240124 ===
 === (18/362) | 20240125 ===
 === (19/362) | 20240126 ===
 === (20/362) | 20240129 ===
 === (21/362) | 20240130 ===
 === (22/362) | 20240131 ===
 === (23/362) | 20240201 ===
 === (24/362) | 20240202 ===
 === (25/362) | 20240205 ===
 === (26/362) | 20240206 ===
 === (27/362) | 20240207 ===
 === (28/362) | 20240208 ===
 === (29/362) | 20240213 ===
No columns to parse from file | 데이터 없음 (20240213)
 === (30/362) | 20

In [10]:
my_GEX = pd.DataFrame(store_GEX_data).T

In [11]:
crawled_GEX = pd.read_csv('./SPX_GEX_df.csv')

In [12]:
crawled_GEX

Unnamed: 0,Date,Call GEX,Put GEX,Net GEX,P/C GEX
0,2025-06-25,489.43K,-309.01K,180.42K,0.63
1,2025-06-24,444.91K,-308.35K,136.56K,0.69
2,2025-06-23,358.45K,-368.97K,-10.52K,1.03
3,2025-06-20,365.2K,-345.19K,20.02K,0.95
4,2025-06-18,659.07K,-646.62K,12.45K,0.98
...,...,...,...,...,...
246,2024-07-01,460.86K,-431.49K,29.37K,0.94
247,2024-06-28,505.97K,-387.31K,118.66K,0.77
248,2024-06-27,496.2K,-413.05K,83.15K,0.83
249,2024-06-26,466.85K,-443.32K,23.53K,0.95


In [13]:
my_GEX

Unnamed: 0,call_gex,put_gex,net_gex
20240103,15217,11873,3343
20240104,15310,13064,2245
20240105,15474,13869,1605
20240108,16034,14152,1882
20240109,16598,12876,3721
...,...,...,...
20250623,11078,9707,1371
20250624,11515,9543,1972
20250625,11659,9037,2622
20250626,11936,9340,2595


In [18]:
import pandas as pd
import numpy as np

# ── 1) Series ① : store_GEX_data ------------------------------------------------
#     (이미 메모리에 store_GEX_data 딕셔너리가 있다고 가정)
s1 = pd.DataFrame(store_GEX_data).T['net_gex']

# 문자열이면 정수로 변환
s1_num = pd.to_numeric(
    s1.astype(str).str.replace('[^0-9.-]', '', regex=True)
)

# ── 2) Series ② : CSV에서 "Net GEX" ------------------------------------------------
df2 = pd.read_csv('./SPX_GEX_df.csv')

col2 = 'Net GEX' if 'Net GEX' in df2.columns else df2.columns[0]
raw2 = df2[col2]

def parse_km(x: str) -> float:
    x = str(x).strip()
    if x.endswith('K'):
        return float(x[:-1].replace(',', '')) * 1_000      # K → ×1,000
    if x.endswith('M'):
        return float(x[:-1].replace(',', '')) * 1_000_000  # M → ×1,000,000
    return float(x.replace(',', ''))

s2_num = raw2.apply(parse_km)

# ── 3) Z-score 표준화 ------------------------------------------------------------
s1_z = (s1_num - s1_num.mean()) / s1_num.std(ddof=0)
s2_z = (s2_num - s2_num.mean()) / s2_num.std(ddof=0)

# 길이가 다르면 짧은 쪽에 맞춰 절단
n = min(len(s1_z), len(s2_z))
s1_z, s2_z = s1_z.iloc[:n].reset_index(drop=True), s2_z.iloc[:n].reset_index(drop=True)

# ── 4) 유사도 지표 ---------------------------------------------------------------
pearson = s1_z.corr(s2_z)
mae      = np.mean(np.abs(s1_z - s2_z))
rmse     = np.sqrt(np.mean((s1_z - s2_z) ** 2))

print(f"Pearson r  : {pearson:.3f}")
print(f"MAE (z-score): {mae:.3f}")
print(f"RMSE (z-score): {rmse:.3f}")

Pearson r  : 0.077
MAE (z-score): 0.990
RMSE (z-score): 1.259
