In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

In [None]:
# 데이터 경로
DATA_PATH = '/aiffel/aiffel/fnguide/data/'

# 데이터 불러오기
modify_data = pd.read_csv(os.path.join(DATA_PATH, 'sub_upbit_eth_min_tick.csv'), index_col=0, parse_dates=True)

# 불러온 데이터 시각화하기
modify_data.loc['2017-11-01':'2017-12-31','close'].plot()

In [None]:
# window 지정
window = 10

# momentum_signal 만들기
momentum_signal = np.sign(modify_data['close'] - modify_data['close'].shift(window))

# s_momentum_signal 만들기
s_momentum_signal = pd.Series(momentum_signal, index=modify_data.index)

In [None]:
# 기존 데이터 만들기
# loc를 활용하여 2017-11-21부터 close(종가)까지 가져오기
sub_data = modify_data.loc['2017-11-21':, 'close']

# 수식 적용된 데이터(Signal) 만들기
# loc를 활용하여 2017-11-21의 시간대별 값을 가져오기 (데이터프레임으로 변환하여 색상 컬럼 추가 준비)
c_sig = s_momentum_signal.loc['2017-11-21':].to_frame(name='signal')

# 두 데이터의 비교를 위한 색상 바꾸기
# np.where 사용: 시그널이 0보다 크면(상승) 'red', 아니면 'blue'
c_sig['color'] = np.where(c_sig['signal'] > 0, 'red', 'blue')

# 시각화하기
plt.figure(figsize=(15, 5))
# 주가 선 그래프 그리기
plt.plot(sub_data.index, sub_data, label='Close Price', alpha=0.5)
# 시그널에 따른 산점도 그리기 (상승: 빨강, 하락: 파랑)
plt.scatter(sub_data.index, sub_data, c=c_sig['color'], s=20, label='Signal')

plt.title(f'Price Change Direction (Window={window})')
plt.legend()
plt.show()

In [None]:
# momentum_signal
# 주가가 이동평균보다 높으면 양수(+), 낮으면 음수(-)가 나오도록 차이를 계산
momentum_signal = np.sign(modify_data['close'] - modify_data['close'].rolling(window).mean())

# s_momentum_signal
s_momentum_signal = pd.Series(momentum_signal, index=modify_data.index)

In [None]:
# 기존 데이터 만들기
# 2017-11-21부터 종가 데이터 가져오기
sub_data = modify_data.loc['2017-11-21':, 'close']

# 수식 적용된 데이터 만들기
# 시그널 데이터를 가져와서 데이터프레임으로 변환 (색상 컬럼 추가를 위해)
c_sig = s_momentum_signal.loc['2017-11-21':].to_frame(name='signal')

# 두 데이터의 비교를 위한 색상 바꾸기
# 시그널이 0보다 크면(주가 > 이동평균) 'red', 아니면 'blue'
c_sig['color'] = np.where(c_sig['signal'] > 0, 'red', 'blue')

# 시각화하기
plt.figure(figsize=(15, 5))

# 1. 주가 선 그래프
plt.plot(sub_data.index, sub_data, label='Close Price', alpha=0.5)

# 2. 이동평균선 (시각적 이해를 돕기 위해 추가)
ma_data = modify_data['close'].rolling(window).mean().loc['2017-11-21':]
plt.plot(ma_data.index, ma_data, label=f'Moving Average ({window})', linestyle='--', color='orange')

# 3. 시그널 산점도
plt.scatter(sub_data.index, sub_data, c=c_sig['color'], s=20, label='Signal')

plt.title(f'Labeling: Using Moving Average (Window={window})')
plt.legend()
plt.show()

In [None]:
# Local min / max 를 추출하기 위한 함수
def get_local_min_max(close, wait=3):
    min_value = close.iloc[0]
    max_value = close.iloc[0]  # ① 초기값 설정
    n_cnt_min, n_cnt_max = 0, 0
    
    mins, maxes = [], []
    min_idxes, max_idxes = [], []
    b_min_update, b_max_update = False, False
    
    for idx, val in zip(close.index[1:], close.values[1:]):
        # 최저값 갱신 로직
        if val < min_value:
            min_value = val
            mins.append(min_value)
            min_idxes.append(idx)
            n_cnt_min = 0
            b_min_update = True
            
        # 최고값 갱신 로직
        if val > max_value:
            max_value = val  # ② 값 갱신
            maxes.append(max_value)  # ③ 리스트에 추가
            max_idxes.append(idx)
            n_cnt_max = 0
            b_max_update = True  # ④ 플래그 업데이트 (최고점 갱신됨)
        
        # 최고점이 갱신되지 않았을 때 (하락세 혹은 횡보 중일 때 최저점 로직 체크)
        if not b_max_update:
            b_min_update = False
            n_cnt_min += 1
            if n_cnt_min >= wait:
                max_value = min_value # 고점을 현재 저점으로 초기화 (추세 반전 감지용)
                n_cnt_min = 0
    
        # 최저점이 갱신되지 않았을 때 (상승세 혹은 횡보 중일 때 최고점 로직 체크)
        if not b_min_update:
            b_max_update = False  # ⑤ 플래그 초기화
            n_cnt_max += 1        # ⑥ 카운트 증가
            if n_cnt_max >= wait:
                min_value = max_value  # ⑦ 저점을 현재 고점으로 초기화
                n_cnt_max = 0
                
    # ⑧ 결과 반환 (시각화를 위해 DataFrame으로 변환하여 반환)
    return pd.DataFrame(mins, index=min_idxes, columns=['min']), pd.DataFrame(maxes, index=max_idxes, columns=['max'])


In [None]:
# Local mins, maxes를 확인
mins, maxes = get_local_min_max(sub_data, wait=3)

# mins, maxes 확인 
print(mins)
print('--'*20)
print(maxes)

In [None]:
# subplots 및 plot 생성
fig, ax = plt.subplots(1, 1, figsize=(10, 5)) # 적절한 사이즈 설정
ax.plot(sub_data, 'c', alpha=0.6) # 원본 데이터 (청록색)

# min_time, local_min을 활용한 scatter plot 생성
# 저점(매수 포인트)은 빨간색 점으로 표시
ax.scatter(mins.index, mins['min'], c='r', s=30, label='Local Min', zorder=2)

# maxes_time, local_max를 활용한 scatter plot 생성
# 고점(매도 포인트)은 파란색 점으로 표시
ax.scatter(maxes.index, maxes['max'], c='b', s=30, label='Local Max', zorder=2)

# y축 설정
ax.set_ylim([sub_data.min() * 0.99, sub_data.max() * 1.01])
ax.legend()
plt.show()

In [None]:
def t_val_lin_r(close):
    import statsmodels.api as sml
    
    # t-value from a linear trend
    x = np.ones((close.shape[0], 2))
    x[:, 1] = np.arange(close.shape[0])
    ols = sml.OLS(close, x).fit() 
    return ols.tvalues[1]

In [None]:
look_forward_window = 60
min_sample_length = 5
step = 1
t1_array = []
t_values_array = []

In [None]:
import statsmodels.api as sml

molecule = modify_data['2017-11-01':'2017-11-30'].index
label = pd.DataFrame(index=molecule, columns=['t1', 't_val', 'bin'])
tmp_out = []

for ind in tqdm(molecule):
    subset = modify_data.loc[ind:, 'close'].iloc[:look_forward_window]  # 전방 탐색을 위한 샘플 추출
    if look_forward_window > subset.shape[0]:
        continue

    tmp_subset = pd.Series(index=subset.index[min_sample_length-1:subset.shape[0]-1])
    tval = []

    # 회귀분석을 통해 t 통계량값을 이용하여 추세 추정
    for forward_window in np.arange(min_sample_length, subset.shape[0]):
        df = subset.iloc[:forward_window]
        # ① 해당 구간의 t-value 계산하여 리스트에 추가
        tval.append(t_val_lin_r(df))

    # 계산된 t-value들을 Series에 할당
    tmp_subset.loc[tmp_subset.index] = np.array(tval)

    # t-value의 절대값이 가장 큰(가장 강력한 추세인) 시점 찾기
    idx_max = tmp_subset.replace([-np.inf, np.inf, np.nan], 0).abs().idxmax()
    tmp_t_val = tmp_subset[idx_max]

    # 결과 저장: [추세 종료 시점, t-value값, 라벨(1 or -1)]
    tmp_out.append([tmp_subset.index[-1], tmp_t_val, np.sign(tmp_t_val)])

# ② 결과 리스트를 DataFrame에 할당
label.loc[molecule] = np.array(tmp_out)  # prevent leakage

label['t1'] = pd.to_datetime(label['t1'])
label['bin'] = pd.to_numeric(label['bin'], downcast='signed')

In [None]:
# 시각화
sub_data = modify_data.loc['2017-11-21', 'close']
c_sig = label['bin'].loc['2017-11-21']
c_sig['color'] = np.where(c_sig == 1, 'red', 'blue')

fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.scatter(sub_data.index, sub_data.values, c=c_sig['color'])

In [None]:
!pip install ta==0.9.0
!pip install shap
!pip install

In [None]:
import datetime
import sys
import os
import re
import io
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ta

import sys
sys.path.append('/aiffel/aiffel/fnguide/data/')
from libs.feature_importance import importance as imp
from sklearn.feature_selection import SequentialFeatureSelector, RFECV

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [None]:
# 데이터 경로 설정
DATA_PATH = '/aiffel/aiffel/fnguide/data/'
anno_file_name = os.path.join(DATA_PATH, 'sub_upbit_eth_min_tick_label.pkl')
target_file_name = os.path.join(DATA_PATH, 'sub_upbit_eth_min_tick.csv')

# 데이터 불러오기
df_modify_data = pd.read_csv(target_file_name, index_col=0, parse_dates=True)
df_label_data = pd.read_pickle(anno_file_name)
df_sub_modify_data = df_modify_data.loc[df_label_data.index]

# 학습 시간 단축을 위해 여기선 편의상 1000개의 데이터만 가져옵니다.
df_sub_modify_data = df_sub_modify_data.iloc[:1000]

In [None]:
# 기술적 지표를 적용합니다.

mt = 1
fillna = False
df_ = df_sub_modify_data.copy()
open, high, low, close, volume = 'open', 'high', 'low', 'close', 'volume'
cols = [open, high, low, close, volume]

## Volume Index
# Chaikin Money Flow
df_["volume_cmf"] = ta.volume.ChaikinMoneyFlowIndicator(
                        high=df_[high], low=df_[low], close=df_[close], volume=df_[volume], window=20*mt, fillna=fillna
                    ).chaikin_money_flow()
# Force Index
df_["volume_fi"] = ta.volume.ForceIndexIndicator(
                        close=df_[close], volume=df_[volume], window=15*mt, fillna=fillna
                    ).force_index()
# Money Flow Indicator
df_["volume_mfi"] = ta.volume.MFIIndicator(
                        high=df_[high],
                        low=df_[low],
                        close=df_[close],
                        volume=df_[volume],
                        window=15*mt,
                        fillna=fillna,
                    ).money_flow_index()
# Ease of Movement
df_["volume_sma_em"] = ta.volume.EaseOfMovementIndicator(
                            high=df_[high], low=df_[low], volume=df_[volume], window=15*mt, fillna=fillna
                        ).sma_ease_of_movement()

# Volume Price Trend
df_["volume_vpt"] = ta.volume.VolumePriceTrendIndicator(
                        close=df_[close], volume=df_[volume], fillna=fillna
                    ).volume_price_trend()

## volatility index
# Average True Range
df_["volatility_atr"] = ta.volatility.AverageTrueRange(
                            close=df_[close], high=df_[high], low=df_[low], window=10*mt, fillna=fillna
                        ).average_true_range()

# Ulcer Index
df_["volatility_ui"] = ta.volatility.UlcerIndex(
                            close=df_[close], window=15*mt, fillna=fillna
                        ).ulcer_index()

## trend index
# MACD
df_["trend_macd_diff"] = ta.trend.MACD(
                            close=df_[close], window_slow=25*mt, window_fast=10*mt, window_sign=9, fillna=fillna
                        ).macd_diff()

# Average Directional Movement Index (ADX)
df_["trend_adx"] = ta.trend.ADXIndicator(
                        high=df_[high], low=df_[low], close=df_[close], window=15*mt, fillna=fillna
                    ).adx()

# TRIX Indicator
df_["trend_trix"] = ta.trend.TRIXIndicator(
                        close=df_[close], window=15*mt, fillna=fillna
                    ).trix()

# Mass Index
df_["trend_mass_index"] = ta.trend.MassIndex(
                            high=df_[high], low=df_[low], window_fast=10*mt, window_slow=25*mt, fillna=fillna
                        ).mass_index()

# DPO Indicator
df_["trend_dpo"] = ta.trend.DPOIndicator(
                        close=df_[close], window=20*mt, fillna=fillna
                    ).dpo()

# Aroon Indicator
df_["trend_aroon_ind"] = ta.trend.AroonIndicator(close=df_[close], window=20, fillna=fillna).aroon_indicator()

## momentum index
# Relative Strength Index (RSI)
df_["momentum_rsi"] = ta.momentum.RSIIndicator(close=df_[close], window=15*mt, fillna=fillna).rsi()

# Williams R Indicator
df_["momentum_wr"] = ta.momentum.WilliamsRIndicator(
                        high=df_[high], low=df_[low], close=df_[close], lbp=15*mt, fillna=fillna
                    ).williams_r()

In [None]:
# 수익률 / 변동성 지표를 적용합니다.
windows_mom = [5, 10, 20]
windows_std = [30]

for i in windows_mom:
    df_[f'vol_change_{i}'] = df_.volume.pct_change(i).round(6)
    df_[f'ret_{i}'] = df_.close.pct_change(i).round(6)

for i in windows_std:
    df_[f'std_{i}'] = df_.close.rolling(i).std()
    df_[f'vol_std_{i}'] = df_.volume.rolling(i).std()

In [None]:
# df_label_data는 프로젝트 1에서 만든 라벨 데이터(예: Trend Scanning 결과)라고 가정합니다.
# df_tmp_data는 기술적 지표가 추가된 데이터(df_)와 라벨 데이터(df_label_data)를 합치고 결측치를 제거한 데이터프레임입니다.
df_tmp_data = df_.join(df_label_data).dropna()

# X, y 데이터셋 만들기
# X는 기술적 지표들이 있는 컬럼들 (5번째 컬럼부터 마지막 전까지)
# y는 라벨이 있는 마지막 컬럼
X = df_tmp_data.iloc[:, 5:-1] 
y = df_tmp_data.iloc[:, -1] # 라벨 데이터

# StandardScaler 적용
# 데이터의 스케일(단위)을 맞추기 위해 표준화(평균 0, 분산 1)를 진행합니다.
sc = StandardScaler()

# fit_transform 사용
# X 데이터를 표준화 변환합니다.
X_sc = sc.fit_transform(X)

# DataFrame 변환
# 변환된 numpy 배열을 다시 DataFrame으로 만들어서 컬럼명을 유지합니다.
X_sc = pd.DataFrame(X_sc, index=X.index, columns=X.columns)

In [None]:
# RandomForest 모델 적용
# 특성 중요도를 뽑아내기 위해 랜덤 포레스트 분류기를 사용
# random_state를 고정하여 결과의 재현성을 확보
rfc = RandomForestClassifier(class_weight='balanced', random_state=42)

# RandomForest fit
rfc.fit(X_sc, y)

In [None]:
# MDI, Mean Decrease Impurity 
feat_imp = imp.mean_decrease_impurity(rfc, X.columns)
feat_imp

In [None]:
# MDA, Mean Decrease Accuracy
svc_rbf = SVC(kernel='rbf', probability=True) # Tree 및 Support Vector Machine 외에 다른 분류기(classifier)를 사용해봅시다.
cv = KFold(n_splits=5) # n_splits을 변경해봅시다.
feat_imp_mda = imp.mean_decrease_accuracy(svc_rbf, X_sc, y, cv_gen=cv)

In [None]:
# plot_feature_importance 함수 만들기
def plot_feature_importance(importance_df, save_fig=False, output_path=None):
    # Plot mean imp bars with std
    plt.figure(figsize=(10, importance_df.shape[0] / 5))
    importance_df.sort_values('mean', ascending=True, inplace=True)
    importance_df['mean'].plot(kind='barh', color='b', alpha=0.25, xerr=importance_df['std'], error_kw={'ecolor': 'r'})
    if save_fig:
        plt.savefig(output_path) 
    else:
        plt.show()

In [None]:
# feat_imp (MDI) 확인
# 앞서 계산한 MDI 결과 변수인 feat_imp를 넣습니다.
plot_feature_importance(feat_imp)

# feat_imp_mda (MDA) 확인
# 앞서 계산한 MDA 결과 변수인 feat_imp_mda를 넣습니다.
plot_feature_importance(feat_imp_mda)

In [None]:
# RFE CV, Recursive Feature Elimination
# kernel='linear'여야 coef_ 속성을 통해 특성 중요도를 파악할 수 있습니다.
svc_rbf = SVC(kernel='linear', probability=True) 

# RFECV 인스턴스 생성
# step=1: 한 번에 하나씩 특성을 제거
# cv=5: 5-Fold 교차 검증 사용
# scoring='accuracy': 정확도를 기준으로 성능 평가
rfe_cv = RFECV(estimator=svc_rbf, step=1, cv=5, scoring='accuracy') 

# fit: 학습 진행 (X_sc: 스케일링된 특성 데이터, y: 라벨)
rfe_fitted = rfe_cv.fit(X_sc, y)

In [None]:
# 선택된 피쳐 확인하기

# support_ 속성을 사용하여 선택된 특성(True)의 컬럼명만 추출
selected_features = X_sc.columns[rfe_fitted.support_]

# 결과 출력
print("Optimal number of features : %d" % rfe_fitted.n_features_)
print("Selected features :", selected_features.tolist())

In [None]:
# SFS, Sequential Feature Selection

n = 2 
sfs_forward = SequentialFeatureSelector(svc_rbf, n_features_to_select=n, direction='forward')
sfs_fitted = sfs_forward.fit(X_sc, y)

In [None]:
# 선택된 피쳐 확인하기

# support_는 선택된 특성을 True로 표시하는 불리언 배열입니다.
sfs_features = X_sc.columns[sfs_fitted.support_]

print("Selected features by SFS:", sfs_features.tolist())

In [None]:
# Q. 코드를 작성해주세요
!pip install
# SHAP, Shapley Additive explanations
import shap
explainer = shap.TreeExplainer(rfc)
shap_value = explainer.shap_values(X_sc)

# shap_value, X_sc 사용 shap.summary_plot 그리기
shap.summary_plot(shap_value, X_sc)

In [None]:
import datetime
import sys
import os
import re
import io
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ta

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import KFold
from sklearn.ensemble import RandomForestClassifier, BaggingClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix, f1_score, roc_auc_score, roc_curve

sys.path.append('/aiffel/aiffel/fnguide/data/')
from libs.mlutil.pkfold import PKFold

In [None]:
# 데이터 경로 설정 및 pickle 파일 불러오기
DATA_PATH = '/aiffel/aiffel/fnguide/data/'
data_file_name = os.path.join(DATA_PATH, 'sub_upbit_eth_min_feature_labels.pkl')


# 여기서부터 모델에 적용하기 위한 데이터 정제화를 시작합니다.
df_data['t_value'].value_counts()

# 데이터셋 비율 나누기
train_ratio, test_ratio = 0.7, 0.2
n_train = int(np.round(len(df_data) * train_ratio))
n_test = int(np.round(len(df_data) * test_ratio))

X, y = df_data.iloc[:, 5:-1], df_data.iloc[:, -1]

# standardzation
sc = StandardScaler()
X_sc = sc.fit_transform(X)

# 데이터셋 분리
train_x, test_x, train_y, test_y = X_sc[:n_train, :], X_sc[-n_test:, :], y.iloc[:n_train], y.iloc[-n_test:]

train_x = pd.DataFrame(train_x, index=train_y.index, columns=X.columns)
train_y = pd.Series(train_y, index=train_y.index)
test_x = pd.DataFrame(test_x, index=test_y.index, columns=X.columns)
test_y = pd.Series(test_y, index=test_y.index)

# 학습 시간 단축을 위해 여기선 편의상 1000개의 데이터만 가져옵니다.
train_x = train_x[:1000] # 데이터셋을 증가 혹은 감소시켜 결과를 비교해봅시다.
train_y = train_y[:1000]

In [None]:
# Q. 코드를 작성해주세요

from sklearn.model_selection import KFold
import numpy as np

class PurgedKFold(KFold):
    """
    K-Fold를 확장하여 시계열 데이터의 정보 유출(Leakage)을 방지하는 클래스입니다.
    - Purging: 테스트 기간과 겹치는(Label이 겹치는) 훈련 데이터를 제거합니다.
    - Embargo: 테스트 기간 직후의 훈련 데이터를 일정 기간 제외합니다.
    """
    def __init__(self, n_splits=3, t1=None, pctEmbargo=0.):
        # 시계열이므로 shuffle=False로 고정합니다.
        super(PurgedKFold, self).__init__(n_splits, shuffle=False, random_state=None)
        self.t1 = t1
        self.pctEmbargo = pctEmbargo

    def split(self, X, y=None, groups=None):
        indices = np.arange(X.shape[0])
        mbrg = int(X.shape[0] * self.pctEmbargo)
        
        for train_ix, test_ix in super(PurgedKFold, self).split(X):
            # 1. Purging (기본적으로 테스트셋 시간대 전후의 중첩 구간 제거)
            # 여기서는 t1(이벤트 종료 시점) 정보를 활용하여 
            # 훈련 데이터 중 라벨이 테스트 데이터 시점과 겹치는 부분을 찾아서 제외합니다.
            
            # 테스트 셋의 시작과 끝 시간 찾기
            t0 = self.t1.index[train_ix] # 훈련 데이터의 시작 시간들
            test_start_time = self.t1.index[test_ix[0]]
            test_end_time = self.t1.index[test_ix[-1]]

            # 훈련 셋에서 제외해야 할 인덱스 찾기 (Overlap)
            # t1(이벤트 종료)이 테스트 시작보다 뒤에 있는 경우 (미래 정보 포함)
            train_t1 = self.t1.iloc[train_ix]
            # 테스트 구간과 겹치는 훈련 데이터 제거
            train_ix = train_ix[train_t1 <= test_start_time] 
            
            # 2. Embargo (테스트셋 뒤의 훈련 데이터 일부 제거)
            # 테스트셋 인덱스 중 가장 큰 값 + 엠바고 크기보다 큰 훈련 인덱스만 남김
            max_test_ix = test_ix.max()
            train_ix = train_ix[~((train_ix > max_test_ix) & (train_ix <= max_test_ix + mbrg))]
            
            yield train_ix, test_ix

n_cv = 5

# t1: 이벤트(라벨) 종료 시점 정보
# (train_y의 인덱스가 datetime인 경우를 가정합니다)
t1 = pd.Series(train_y.index.values, index=train_y.index)

# Purged K-Fold 객체 생성
# 이제 위에서 정의했으므로 오류가 나지 않습니다.
cv = PurgedKFold(n_splits=n_cv, t1=t1, pctEmbargo=0.01)

In [None]:
# GridsearchCV에서 사용할 파라미터 설정합니다. 파라미터값을 바꿔보세요
bc_params = {'n_estimators': [5, 10, 20],
             'max_features': [0.5, 0.7],
             'base_estimator__max_depth': [3,5,10,20],
             'base_estimator__max_features': [None, 'auto'],
             'base_estimator__min_samples_leaf': [3, 5, 10],
             'bootstrap_features': [False, True]
            }

In [None]:
# RandomForest 사용
rfc = RandomForestClassifier(class_weight='balanced')

In [None]:
from sklearn.ensemble import BaggingClassifier

# Q. 코드를 작성해주세요

# Bagging 적용
# rfc(RandomForest)를 기본 모델(base_estimator)로 사용하는 배깅 분류기를 만듭니다.
# n_jobs=-1을 사용하여 가능한 모든 CPU 코어를 사용하여 속도를 높입니다.
bag_rfc = BaggingClassifier(base_estimator=rfc, n_jobs=-1, random_state=42)

In [None]:
from sklearn.model_selection import GridSearchCV

# Q. 코드를 작성해주세요

# GridSearchCV 적용
# estimator: 앞서 정의한 BaggingClassifier 객체 (bag_rfc)
# param_grid: 탐색할 파라미터 조합 (bc_params)
# cv: 정보 유출을 막는 Purged K-Fold 객체 (cv)
# scoring: 성능 평가 기준 (예: 'accuracy', 'f1_macro' 등)
# verbose: 진행 상황 출력 (1 이상이면 로그 출력)
gs_rfc = GridSearchCV(estimator=bag_rfc, param_grid=bc_params, cv=cv, scoring='accuracy', verbose=1)

In [None]:
# fit
gs_rfc.fit(train_x, train_y)

# best estimator 
gs_rfc_best = gs_rfc.best_estimator_
gs_rfc_best.fit(train_x, train_y)

In [None]:
# 예측값 확인
pred_y = gs_rfc_best.predict(test_x)
prob_y = gs_rfc_best.predict_proba(test_x)

In [None]:
# Q. 코드를 작성해주세요

# test_y, pred_y를 활용한 지표 적용
confusion = confusion_matrix(test_y, pred_y)
accuracy  = accuracy_score(test_y, pred_y)
precision = precision_score(test_y, pred_y)
recall    = recall_score(test_y, pred_y)

# 지표를 통한 결과 확인
print('================= confusion matrix ====================')
print(confusion)
print('=======================================================')
print(f'정확도:{accuracy:.4f}, 정밀도:{precision:.4f}, 재현율:{recall:.4f}')

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, roc_curve, auc
import matplotlib.pyplot as plt

# --- 확률 및 예측값 생성 ---
pred_proba_class1 = gs_rfc_best.predict_proba(test_x)[:, 1]
pred_y = gs_rfc_best.predict(test_x)
# --------------------------------------

# Q. 코드를 작성해주세요

# test_y, pred_y를 활용한 지표 적용
confusion = confusion_matrix(test_y, pred_y)
accuracy  = accuracy_score(test_y, pred_y)
precision = precision_score(test_y, pred_y)
recall    = recall_score(test_y, pred_y)

# 지표를 통한 결과 확인
print('================= confusion matrix ====================')
print(confusion)
print('=======================================================')
print(f'정확도:{accuracy:.4f}, 정밀도:{precision:.4f}, 재현율:{recall:.4f}')


# Q. 코드를 작성해주세요

# ROC curve 만들기
fpr, tpr, thresholds = roc_curve(test_y, pred_proba_class1)
auc_score = auc(fpr, tpr) # 변수명 중복 방지를 위해 auc -> auc_score로 변경 권장

# ROC curve 시각화
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, linewidth=2, label=f'ROC Curve (AUC = {auc_score:.4f})')
plt.plot([0, 1], [0, 1], 'k--') # dashed diagonal
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
plt.title('ROC Curve')
print(f'auc:{auc_score:.4f}')
plt.show()