In [None]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.preprocessing import StandardScaler

# 1. 데이터 로드 (사용자 데이터 구조 예시)
# df = pd.read_csv('your_data.csv')
# 아래는 설명을 위한 가상 데이터 생성 예시입니다.
df = pd.read_csv("./final/data/refined/LGES_final.csv")
df = df.sort_values('Date')
df = df.fillna(method='ffill')
df = df.rename(columns={'y(stock)': 'y'})

# --- [1] 데이터 전처리: 날짜별 Aggregation ---
# E1~E7은 합산 후 x1으로 나누어 비율로 변환
e_columns = ['E1', 'E2', 'E3', 'E4', 'E5', 'E6', 'E7']
macro_columns = ['usd_krw', 'vix'] # 예시 지표

# 날짜별 그룹화
agg_dict = {col: 'sum' for col in e_columns}
agg_dict.update({col: 'first' for col in ['x1', 'y'] + macro_columns})
df_daily = df.groupby('Date').agg(agg_dict).reset_index()

# 주제별 노출도 계산 (E_i / x1)
for col in e_columns:
    df_daily[col] = df_daily[col] / df_daily['x1']

# 타겟 변수 절대값 변환 및 x1 로그 변환
df_daily['log_x1'] = np.log1p(df_daily['x1']) # log(1+x)로 안정성 확보

# 분석에 사용할 최종 피쳐 설정
features = e_columns + ['log_x1'] + macro_columns
X = df_daily[features]
y = df_daily['y']

# --- [2] 다중공선성(VIF) 확인 ---
# 스케일링 전 VIF 계산을 위해 상수항 추가
# VIF 계산용 데이터프레임 준비
X_vif = X.copy()
X_vif = sm.add_constant(X_vif) 

vif_data = pd.DataFrame()
vif_data["feature"] = X_vif.columns
vif_data["VIF"] = [variance_inflation_factor(X_vif.values, i) for i in range(len(X_vif.columns))]
print("--- [VIF 결과 (10 이상 주의)] ---")
print(vif_data)

# --- [3] 통계적 유의성 확인 (OLS Regression) ---
# 계수 비교를 위해 독립변수 표준화(Standardization) 수행
scaler = StandardScaler()
X_scaled = pd.DataFrame(scaler.fit_transform(X), columns=X.columns)
X_scaled = sm.add_constant(X_scaled) # 상수항 추가

model = sm.OLS(y.values, X_scaled).fit()
print("\n--- [회귀 분석 요약 결과 (P>|t| 확인)] ---")
print(model.summary())

# --- [4] 피쳐 중요도 산출 및 기여도 해석 ---
# 표준화된 계수(Standardized Coefficients)를 중요도로 사용
importance = model.params.drop('const').abs().sort_values(ascending=False)

print("\n--- [피쳐 중요도 (절대적 영향력 순위)] ---")
print(importance)

# 예시 출력: 특정 날짜의 기여도 분석
sample_date_idx = 0
contribution = (X_scaled.iloc[sample_date_idx] * model.params).drop('const')
print(f"\n[{df_daily.iloc[sample_date_idx]['Date']}] 실제 변동폭: {y.iloc[sample_date_idx]:.2f}%")
print("요인별 기여도:")
print(contribution.sort_values(ascending=False))

### Logistic Reg.

In [5]:
df_daily[['Date', 'x1', 'E1', 'usd_krw', 'y']].head()

Unnamed: 0,Date,x1,E1,usd_krw,y
0,2024-12-10,42,0.047619,1432.6,-0.645161
1,2024-12-11,25,0.2,1428.5,-1.818182
2,2024-12-12,43,0.069767,1429.47,7.010582
3,2024-12-13,29,0.0,1434.82,-1.112485
4,2024-12-16,28,0.071429,1437.86,-0.5


In [14]:
df_daily = df_daily[df_daily['y'] != 0].copy()
df_daily = df_daily.reset_index(drop=True) # 인덱스를 X와 맞추기 위해 초기화
df_daily['y_bin'] = (df_daily['y'] > 0).astype(int)

# --- [2] 데이터 준비 ---
e_columns = ['E1', 'E2', 'E3', 'E4', 'E5', 'E6', 'E7']
macro_columns = ['usd_krw', 'vix']
features = e_columns + ['log_x1'] + macro_columns

X = df_daily[features]
y = df_daily['y_bin']

# --- [3] 스케일링 및 상수항 추가 ---
scaler = StandardScaler()
X_scaled = pd.DataFrame(scaler.fit_transform(X), columns=X.columns)
X_scaled = sm.add_constant(X_scaled)

# --- [4] 로지스틱 회귀 수행 (Logit) ---
logit_model = sm.Logit(y, X_scaled).fit()

print(logit_model.summary())

# --- [5] 오즈비(Odds Ratio) 계산 ---
# 로지스틱 회귀 계수를 지수함수(exp)로 변환하면 영향력을 직관적으로 이해하기 좋습니다.
# 1보다 크면 상승 확률 증가, 1보다 작으면 상승 확률 감소를 의미합니다.
odds_ratios = pd.DataFrame({
    'Odds Ratio': np.exp(logit_model.params),
    'P-value': logit_model.pvalues
}).sort_values('Odds Ratio', ascending=False)

print("\n--- [오즈비 분석 (1보다 클수록 상승 기여)] ---")
print(odds_ratios)

Optimization terminated successfully.
         Current function value: 0.679023
         Iterations 5
                           Logit Regression Results                           
Dep. Variable:                  y_bin   No. Observations:                  233
Model:                          Logit   Df Residuals:                      222
Method:                           MLE   Df Model:                           10
Date:                Tue, 23 Dec 2025   Pseudo R-squ.:                 0.02036
Time:                        22:42:19   Log-Likelihood:                -158.21
converged:                       True   LL-Null:                       -161.50
Covariance Type:            nonrobust   LLR p-value:                    0.7646
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
const          0.0054      0.133      0.040      0.968      -0.255       0.266
E1            -0.1959      0.

### SHAP

In [None]:
import pandas as pd
import numpy as np
import os
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import shap

# --- [설정] 파일 목록 및 경로 ---
base_path = "./final/data/refined/"
output_path = "./shap_results/"

# 결과 저장 폴더 생성
if not os.path.exists(output_path):
    os.makedirs(output_path)

file_names = [
    "DoosanEnerbility_final",
    "HanwhaAerospace_final",
    "HDHyundaiHeavy_final",
    "HyundaiMotor_final",
    "KBFinancial_final",
    "Kia_final",
    "LGES_final",
    "SamsungBio_final",
    "SamsungElectronics_final",
    "SKHynix_final"
]

# --- [전처리 및 모델링 함수 정의] ---
def process_stock_shap(file_name):
    print(f"Processing: {file_name}...")
    
    # 1. 데이터 로드
    file_path = os.path.join(base_path, f"{file_name}.csv")
    if not os.path.exists(file_path):
        print(f"Error: File not found - {file_path}")
        return

    df = pd.read_csv(file_path)
    df = df.sort_values('Date')
    df = df.ffill().bfill() # 최신 pandas 문법 권장
    df = df.rename(columns={'y(stock)': 'y'})

    # 2. 데이터 전처리
    e_columns = ['E1', 'E2', 'E3', 'E4', 'E5', 'E6', 'E7']
    # 매크로 지표가 파일에 있을 경우를 대비해 정의 (없으면 무시하도록 처리 가능하나, 기존 코드 로직 유지)
    macro_columns = ['usd_krw', 'vix'] 
    
    # agg_dict 생성 (컬럼이 실제 데이터에 있는지 확인 후 적용)
    agg_dict = {col: 'sum' for col in e_columns}
    existing_macros = [col for col in macro_columns if col in df.columns]
    agg_dict.update({col: 'first' for col in existing_macros + ['x1', 'y']})

    df_daily = df.groupby('Date').agg(agg_dict).reset_index()
    
    # y=0 제거
    df_daily = df_daily[df_daily['y'] != 0].copy()
    df_daily = df_daily.reset_index(drop=True)

    # 주제별 노출도 계산 (E_i / x1)
    for col in e_columns:
        df_daily[col] = df_daily[col] / df_daily['x1']

    # 타겟 변수 및 피쳐 생성
    df_daily['log_x1'] = np.log1p(df_daily['x1'])
    df_daily['y_bin'] = (df_daily['y'] > 0).astype(int)

    features = e_columns + ['log_x1']
    X = df_daily[features]
    y = df_daily['y_bin']

    # 3. 모델링 (Random Forest)
    scaler = StandardScaler()
    X_scaled = pd.DataFrame(scaler.fit_transform(X), columns=X.columns, index=X.index)

    # 전체 데이터셋에 대해 SHAP 값을 뽑기 위해 split은 학습용으로만 사용하고
    # SHAP 계산은 X_scaled 전체를 대상으로 합니다.
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

    rf_model = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')
    rf_model.fit(X_train, y_train)

    # 4. SHAP Value 계산
    explainer = shap.TreeExplainer(rf_model)
    shap_values = explainer.shap_values(X_scaled)

    # 차원 확인 및 추출 (Class 1: 상승에 대한 기여도 추출)
    if isinstance(shap_values, list):
        actual_shap_values = shap_values[1]
    elif len(shap_values.shape) == 3:
        actual_shap_values = shap_values[:, :, 1]
    else:
        actual_shap_values = shap_values

    # 5. 결과 데이터프레임 생성 (Daily SHAP)
    # 날짜 컬럼 추가
    result_df = pd.DataFrame(actual_shap_values, columns=features)
    result_df.insert(0, 'Date', df_daily['Date'])

    # 6. Summary Row 생성 (Global Feature Importance)
    # SHAP Summary Plot에서 보여주는 '변수의 중요도'는 절대값의 평균입니다.
    summary_values = result_df[features].abs().mean()
    
    # Summary 행 생성 (Date에는 식별자 입력)
    summary_row = pd.DataFrame(summary_values).T
    summary_row.insert(0, 'Date', 'Global_Importance (Mean(|SHAP|))')
    
    # 최종 병합 (일별 데이터 + 마지막 줄 Summary)
    final_output = pd.concat([result_df, summary_row], ignore_index=True)

    #return final_output
    # 7. CSV 저장
    save_name = f"SHAP_{file_name}.csv"
    save_path = os.path.join(output_path, save_name)
    final_output.to_csv(save_path, index=False)
    print(f"Saved: {save_path}")

# --- [메인 실행 루프] ---
if __name__ == "__main__":
    for file_name in file_names:
        try:
            process_stock_shap(file_name)
        except Exception as e:
            print(f"Failed to process {file_name}: {e}")

    print("\nAll processes completed.")

# file = "SamsungElectronics_final"
# final_df = process_stock_shap(file)
# final_df.tail()