## GICS 체계로 클러스터링

In [3]:
import pandas as pd
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, SpectralClustering
from sklearn.mixture import GaussianMixture
from sklearn.metrics import silhouette_score, calinski_harabasz_score
from sklearn.metrics import davies_bouldin_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.cluster import DBSCAN
from sklearn.neighbors import NearestNeighbors
from sklearn.inspection import permutation_importance
from statsmodels.stats.outliers_influence import variance_inflation_factor
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.decomposition import PCA
import matplotlib.font_manager as fm
import platform
import shap
import lime
import lime.lime_tabular
from sklearn.manifold import TSNE
import random  


# # CSV 파일 경로
# gics_file_path = '../csv/종목_GICS분류_클러스터.csv'
# kospi_file_path = '../csv/financial_data_processing_cospi200.csv'


In [4]:
# df_processing_cospi200 = pd.read_csv(kospi_file_path)
# df_gics = pd.read_csv(gics_file_path)


### Davies-Bouldin Index 계산

In [6]:
# from sklearn.metrics import davies_bouldin_score

# # 컬럼명 맞춰 병합 준비
# df_gics_renamed = df_gics.rename(columns={'종목명': '기업명_그룹'})

# # '기업명_그룹'을 기준으로 병합
# df_merged = pd.merge(df_processing_cospi200, df_gics_renamed[['기업명_그룹', 'Cluster']], on='기업명_그룹')

# # 클러스터 레이블과 특징 데이터 분리
# labels = df_merged['Cluster']
# features = df_merged.drop(columns=['기업명_그룹', 'Cluster'])

# # Davies-Bouldin Index 계산
# db_index = davies_bouldin_score(features, labels)

# # 결과 출력
# print("Davies-Bouldin Index:", db_index)

### Davies-Bouldin Index가 19.69로 나와, 기존 분류 체계(GICS)로 분류하면 클러스터링 결과가 좋지 않은 것을 확인할 수 있다

In [7]:
# # GICS 기반 클러스터 수 확인
# num_gics_clusters = df_gics['Cluster'].nunique()
# print(f"GICS 클러스터 수: {num_gics_clusters}")


In [8]:
# df_gics_renamed = df_gics.rename(columns={'종목명': '기업명_그룹'})
# df_merged_gics = pd.merge(df_processing_cospi200, df_gics_renamed[['기업명_그룹', 'Cluster']], on='기업명_그룹')


In [9]:
# from sklearn.cluster import KMeans

# # 클러스터링 대상 feature만 추출
# X_features = df_processing_cospi200.drop(columns=['기업명_그룹'])

# # GICS 클러스터 개수에 맞춰 KMeans 적용
# kmeans_model = KMeans(n_clusters=num_gics_clusters, random_state=42, n_init='auto')
# kmeans_labels = kmeans_model.fit_predict(X_features)

# # 클러스터 결과 병합
# df_merged_kmeans = df_processing_cospi200.copy()
# df_merged_kmeans['Cluster'] = kmeans_labels


In [10]:
# import random

def make_inter_cluster_portfolios(df, num_trials=1000):
    """Build random portfolios that hold one stock from every cluster.

    Args:
        df: DataFrame with a 'Cluster' column (cluster label) and a
            '기업명_그룹' column (stock identifier).
        num_trials: Number of portfolios to generate.

    Returns:
        A list of ``num_trials`` portfolios. Each portfolio is a list with
        one randomly chosen stock per cluster, in the key order of the
        grouped cluster mapping.

    Note:
        Sampling uses the module-level ``random`` generator, so call
        ``random.seed(...)`` beforehand for reproducible output.
    """
    members_by_cluster = df.groupby('Cluster')['기업명_그룹'].apply(list).to_dict()
    candidate_pools = list(members_by_cluster.values())

    # One draw per non-empty cluster, repeated for every trial.
    return [
        [random.choice(pool) for pool in candidate_pools if pool]
        for _ in range(num_trials)
    ]

# # 생성
# random.seed(42)  # 재현성
# gics_portfolios = make_inter_cluster_portfolios(df_merged_gics)
# kmeans_portfolios = make_inter_cluster_portfolios(df_merged_kmeans)


In [11]:
def markowitz_min_variance(returns: pd.DataFrame, target_return, min_weight: float = 0.001):
    """Compute minimum-variance portfolio weights under a return floor.

    Minimizes ``w' Σ w`` with SLSQP, where Σ is the sample covariance of
    the columns of ``returns``, subject to:

    * the weights summing to 1,
    * the expected portfolio return (``w`` · column means) being at least
      ``target_return``,
    * every weight lying in ``[min_weight, 1]``.

    Args:
        returns: Per-period returns, one column per asset.
        target_return: Lower bound on the expected per-period portfolio
            return, in the same units as ``returns``.
        min_weight: Lower bound applied to every asset weight. The default
            of 0.001 keeps each asset in the portfolio with a small
            positive weight.

    Returns:
        A 1-D numpy array of weights, ordered like ``returns.columns``.

    Raises:
        ValueError: If the optimizer reports failure (for example when the
            return floor cannot be met within the bounds).
    """
    # Imported locally so the function works even if the notebook cell that
    # imports `minimize` at module level has not been executed yet.
    from scipy.optimize import minimize

    mean_returns = returns.mean().values
    cov_matrix = returns.cov().values
    num_assets = len(mean_returns)

    # Portfolio variance (risk) for a given weight vector.
    def portfolio_variance(weights):
        return weights @ cov_matrix @ weights

    # Constraints: weights sum to 1, expected return >= target_return.
    constraints = [
        {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},
        {'type': 'ineq', 'fun': lambda w: w @ mean_returns - target_return},
    ]

    # Weight range per asset: [min_weight, 1]; start from equal weights.
    bounds = tuple((min_weight, 1) for _ in range(num_assets))
    init_guess = np.repeat(1 / num_assets, num_assets)

    result = minimize(
        portfolio_variance,
        init_guess,
        method='SLSQP',
        bounds=bounds,
        constraints=constraints,
    )

    if not result.success:
        # Single formatted string; the original passed two positional args,
        # which made the exception text render as a tuple.
        raise ValueError(f"최적화 실패: {result.message}")
    return result.x

In [12]:
from scipy.stats import ttest_ind
from scipy.optimize import minimize

def sortino_ratio(returns, risk_free_rate=0.0):
    """Return the mean excess return divided by the downside dispersion.

    The denominator is the population standard deviation (``np.std``) of
    only those observations that fall below ``risk_free_rate``.

    Args:
        returns: Array-like of periodic returns supporting boolean-mask
            indexing (numpy array or pandas Series).
        risk_free_rate: Per-period threshold subtracted from the returns
            and used to select the downside observations.

    Returns:
        The ratio, or ``np.nan`` when the downside dispersion is zero or
        undefined (e.g. no observation lies below the threshold).
    """
    below_threshold = returns[returns < risk_free_rate]
    downside_dev = np.std(below_threshold)

    # No usable downside dispersion -> the ratio is undefined.
    if np.isnan(downside_dev) or downside_dev == 0:
        return np.nan

    excess = returns - risk_free_rate
    return np.mean(excess) / downside_dev

def cvar(returns, alpha=0.01):
    """Return the mean of the returns at or below the ``alpha`` quantile.

    Args:
        returns: Array-like of periodic returns supporting boolean-mask
            indexing (numpy array or pandas Series).
        alpha: Tail probability; the ``alpha`` quantile of ``returns`` is
            used as the cut-off.

    Returns:
        The average of the tail observations, or ``np.nan`` when
        ``returns`` is empty or no observation falls in the tail.
    """
    if not len(returns):
        return np.nan

    cutoff = np.quantile(returns, alpha)
    tail = returns[returns <= cutoff]

    if len(tail) > 0:
        return tail.mean()
    return np.nan


def max_drawdown(returns: pd.Series) -> float:
    """Return the largest peak-to-trough decline of the compounded series.

    The returns are compounded as ``(1 + r).cumprod()``; the drawdown at
    each step is the relative distance from the running maximum of that
    compounded series.

    Args:
        returns: Periodic returns; NaN entries are dropped first.

    Returns:
        The minimum drawdown (zero or negative), or ``np.nan`` when no
        non-NaN observation remains.
    """
    clean = returns.dropna()
    if not len(clean):
        return np.nan

    wealth = clean.add(1).cumprod()
    running_peak = wealth.cummax()
    drawdowns = (wealth - running_peak) / running_peak
    return drawdowns.min()


def omega_ratio(returns: pd.Series, target_return: float = 0.0) -> float:
    """Return the ratio of summed gains to summed losses around a target.

    Args:
        returns: Periodic returns; NaN entries are dropped first.
        target_return: Threshold subtracted from every return before
            splitting into gains (> 0) and losses (< 0).

    Returns:
        Sum of positive excess returns divided by the absolute sum of
        negative excess returns, or ``np.nan`` when there are no
        observations or the loss sum is zero.
    """
    clean = returns.dropna()
    if not len(clean):
        return np.nan

    excess = clean - target_return
    upside = excess[excess > 0].sum()
    downside = -excess[excess < 0].sum()

    # Without any loss mass the ratio is undefined.
    if downside == 0:
        return np.nan
    return upside / downside


In [17]:

def optimize_and_evaluate_with_weights(portfolio_list, returns_in, returns_out, optimizer_func):
    """Optimize each portfolio on one sample and score it on another.

    For every portfolio (a list of column names), weights are obtained from
    ``optimizer_func`` using the matching columns of ``returns_in`` (rows
    with NaN dropped). The weighted return series built from ``returns_out``
    is then scored with ``sortino_ratio``, ``cvar``, ``max_drawdown`` and
    ``omega_ratio``.

    Args:
        portfolio_list: Iterable of portfolios, each a list of column names
            present in both return tables.
        returns_in: DataFrame of returns used to fit the weights.
        returns_out: DataFrame of returns used for evaluation.
        optimizer_func: Callable taking the in-sample sub-DataFrame and
            returning one weight per column.

    Returns:
        Tuple ``(sortino_scores, cvar_scores, mdd_scores, omega_scores,
        weight_records)`` of equally long lists, one entry per portfolio
        that was processed successfully. ``weight_records`` holds pandas
        Series indexed by stock name.

    Warns:
        RuntimeWarning: Once, after the loop, if any portfolio was skipped
            because optimization or evaluation raised. The warning reports
            the number skipped and the first error encountered.
    """
    import warnings

    sortino_scores, cvar_scores, mdd_scores, omega_scores, weight_records = [], [], [], [], []
    total = 0
    skipped = 0
    first_error = None

    for portfolio in portfolio_list:
        total += 1
        try:
            sub_in = returns_in[portfolio].dropna()
            sub_out = returns_out[portfolio].dropna()

            weights = optimizer_func(sub_in)
            pf_out = sub_out @ weights  # portfolio return series

            # Compute everything before appending so that a failure in any
            # metric cannot leave the result lists with different lengths.
            sortino_value = sortino_ratio(pf_out)
            cvar_value = cvar(pf_out)
            mdd_value = max_drawdown(pf_out)
            omega_value = omega_ratio(pf_out)
            weight_row = pd.Series(weights, index=sub_in.columns)  # keep stock names with weights
        except Exception as exc:
            # Best-effort: skip this portfolio, but remember that it happened.
            skipped += 1
            if first_error is None:
                first_error = exc
            continue

        sortino_scores.append(sortino_value)
        cvar_scores.append(cvar_value)
        mdd_scores.append(mdd_value)
        omega_scores.append(omega_value)
        weight_records.append(weight_row)

    if skipped:
        # One aggregated warning instead of silence (or one per portfolio).
        warnings.warn(
            f"optimize_and_evaluate_with_weights skipped {skipped} of {total} "
            f"portfolios; first error: {first_error!r}",
            RuntimeWarning,
            stacklevel=2,
        )

    return sortino_scores, cvar_scores, mdd_scores, omega_scores, weight_records


In [13]:
import pandas as pd

# Load the log-return table; the first CSV column becomes the index.
file_path = '../csv/log_returns_total.csv'
log_returns_total = pd.read_csv(file_path, index_col=0)

# Parse the index labels into timestamps so date-based slicing works.
log_returns_total.index = pd.to_datetime(log_returns_total.index)

# First and last timestamps available in the data.
start_date, end_date = log_returns_total.index.min(), log_returns_total.index.max()

# In-sample: the three years starting at the first timestamp.
# Out-of-sample: everything from the cutoff through the last timestamp.
cutoff_date = start_date + pd.DateOffset(years=3)
returns_in_sample = log_returns_total.loc[start_date:cutoff_date - pd.DateOffset(days=1)]
returns_out_sample = log_returns_total.loc[cutoff_date:end_date]

# Notebook display: shapes of both samples and the split date.
returns_in_sample.shape, returns_out_sample.shape, cutoff_date


((742, 195), (489, 195), Timestamp('2023-01-02 00:00:00'))

In [15]:
# results_in_all = {}
# results_out_all = {}

# target_returns = [round(x, 3) for x in np.arange(0.005, 0.051, 0.005)]

# for target_return in target_returns:
#     label = f"MinVar({int(target_return * 1000)/10:.1f}%)"
#     print(f"\n▶▶▶ {label} 최적화 방식으로 평가 중...")

#     # 최적화 함수 정의
#     optimizer = lambda x, tr=target_return: markowitz_min_variance(x, tr)

#     # ✅ In-sample 평가
#     gics_sortino_in, gics_cvar_in, gics_mdd_in, gics_omega_in, _ = optimize_and_evaluate_with_weights(
#         gics_portfolios, returns_in_sample, returns_in_sample, optimizer)
#     kmeans_sortino_in, kmeans_cvar_in, kmeans_mdd_in, kmeans_omega_in, _ = optimize_and_evaluate_with_weights(
#         kmeans_portfolios, returns_in_sample, returns_in_sample, optimizer)

#     results_in_all[label] = {
#         'gics_df': pd.DataFrame({
#             'Sortino': gics_sortino_in,
#             'CVaR': gics_cvar_in,
#             'MDD': gics_mdd_in,
#             'Omega': gics_omega_in
#         }),
#         'kmeans_df': pd.DataFrame({
#             'Sortino': kmeans_sortino_in,
#             'CVaR': kmeans_cvar_in,
#             'MDD': kmeans_mdd_in,
#             'Omega': kmeans_omega_in
#         })
#     }

#     # ✅ Out-of-sample 평가
#     gics_sortino_out, gics_cvar_out, gics_mdd_out, gics_omega_out, _ = optimize_and_evaluate_with_weights(
#         gics_portfolios, returns_in_sample, returns_out_sample, optimizer)
#     kmeans_sortino_out, kmeans_cvar_out, kmeans_mdd_out, kmeans_omega_out, _ = optimize_and_evaluate_with_weights(
#         kmeans_portfolios, returns_in_sample, returns_out_sample, optimizer)

#     results_out_all[label] = {
#         'gics_df': pd.DataFrame({
#             'Sortino': gics_sortino_out,
#             'CVaR': gics_cvar_out,
#             'MDD': gics_mdd_out,
#             'Omega': gics_omega_out
#         }),
#         'kmeans_df': pd.DataFrame({
#             'Sortino': kmeans_sortino_out,
#             'CVaR': kmeans_cvar_out,
#             'MDD': kmeans_mdd_out,
#             'Omega': kmeans_omega_out
#         })
#     }


In [16]:
# import os
# from scipy.stats import ttest_ind
# import pandas as pd

# # 저장 폴더 생성
# os.makedirs('results/gics_vs_kmeans', exist_ok=True)

# # 테스트할 메트릭
# metrics_to_test = ['Sortino', 'CVaR', 'MDD', 'Omega']

# # 유의수준 마커 함수
# def significance_marker(p):
#     if p < 0.01:
#         return "***"
#     elif p < 0.05:
#         return "**"
#     elif p < 0.10:
#         return "*"
#     else:
#         return ""

# # 각 최적화 방식별로 반복
# for method in results_in_all.keys():
#     for sample_type, result_data in [("in_sample", results_in_all[method]), ("out_sample", results_out_all[method])]:
#         rows = []
#         for metric in metrics_to_test:
#             # GICS와 KMeans 포트폴리오 수익률 추출
#             gics_vals = result_data['gics_df'][metric].dropna()
#             kmeans_vals = result_data['kmeans_df'][metric].dropna()

#             # 평균 ± 표준편차
#             gics_mean_std = f"{gics_vals.mean():.4f} ± {gics_vals.std():.4f}"
#             kmeans_mean_std = f"{kmeans_vals.mean():.4f} ± {kmeans_vals.std():.4f}"

#             # 단측 t-검정 (KMeans > GICS)
#             t_stat, p_val = ttest_ind(kmeans_vals, gics_vals, equal_var=False, alternative='greater')

#             # 행 추가
#             rows.append([
#                 kmeans_mean_std,
#                 gics_mean_std,
#                 f"{t_stat:.4f}",
#                 f"{p_val:.4f}",
#                 significance_marker(p_val)
#             ])

#         # DataFrame 생성 및 저장
#         df = pd.DataFrame(
#             rows,
#             columns=['KMeans Mean±Std', 'GICS Mean±Std', 't-stat', 'p-value', 'Significance'],
#             index=metrics_to_test
#         )

#         file_path = f"results/gics_vs_kmeans/ttest_gics_vs_kmeans_{method}_{sample_type}.csv"
#         df.to_csv(file_path, encoding='utf-8-sig')
#         print(f"✅ 저장 완료: {file_path}")

#         # 출력
#         display(df)
