# 모델 에러 분석 (Error Analysis)

학습된 최적 모델(Tuned Model)의 예측 결과를 분석합니다.

Test Set에서의 오분류(False Positive, False Negative) 사례를 상세히 뜯어보고,

모델이 왜 틀렸는지(SHAP), 어떤 패턴의 판매자를 놓치고 있는지 파악하여 개선 아이디어를 도출합니다.

In [None]:
import glob
import os

import joblib
import pandas as pd
import plotly.express as px
import shap
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split

from src.features.feature_generation import FeatureGenerator, FEATURE_NAMES_KO, get_feature_name_ko
from src.visualize import plot_shap_waterfall

# 시각화 설정
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

## 1. 데이터 로드 및 전처리
학습 때와 동일한 방식으로 피처 엔지니어링을 수행합니다.

In [None]:
# 1. 데이터 로드 및 피처 생성
generator = FeatureGenerator().load_data(from_db=True)
features_df = generator.generate_all_features()

# 타겟 및 피처 정의 (동적으로 피처 컬럼 추출)
exclude_cols = ['company_name', 'is_abusing_seller']
feature_columns = [col for col in features_df.columns if col not in exclude_cols]

X = features_df[feature_columns]
y = features_df['is_abusing_seller'].astype(int)

print(f"데이터 준비 완료: {features_df.shape}")
print(f"총 피처 수: {len(feature_columns)}개")

## 2. 모델 및 스케일러 로드

In [None]:
# 저장된 모델 찾기
model_files = glob.glob('../models/abusing_detector_tuned_*.pkl')
if not model_files:
    raise FileNotFoundError("모델 파일을 찾을 수 없습니다. 05_model_improvement.ipynb를 먼저 실행해주세요.")

# 가장 최근 파일 사용
latest_model_path = max(model_files, key=os.path.getctime)
print(f"로드할 모델: {latest_model_path}")

model = joblib.load(latest_model_path)
scaler = joblib.load('../models/scaler_tuned.pkl')
print("모델 및 스케일러 로드 완료")

## 3. Test Set 예측 및 평가
학습 때와 동일한 random_state(42)로 분리하여 Test Set을 확보합니다.

In [None]:
# Train/Test 분할
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# 모델 타입 확인 후 스케일링 여부 결정
model_type = str(type(model).__name__)
print(f"모델 타입: {model_type}")

# 트리 기반 모델은 스케일링 불필요
tree_based_models = ['RandomForest', 'GradientBoosting', 'XGB', 'LGBM', 'ExtraTrees', 'DecisionTree', 'AdaBoost']
needs_scaling = not any(tb in model_type for tb in tree_based_models)

if needs_scaling:
    print("→ 스케일링 필요 모델: 스케일링 적용")
    X_test_eval = scaler.transform(X_test)
else:
    print("→ 트리 기반 모델: 스케일링 없이 원본 데이터 사용")
    X_test_eval = X_test

# 예측
y_pred = model.predict(X_test_eval)
y_proba = model.predict_proba(X_test_eval)[:, 1]

# 평가 결과
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

## 4. 오분류(Error) 분석

Confusion Matrix를 통해 오분류 유형을 파악합니다.
- **FP (False Positive)**: 정상 판매자를 어뷰징으로 오인 (억울한 판매자 발생)
- **FN (False Negative)**: 어뷰징 판매자를 정상으로 오인 (탐지 실패)

In [None]:
# Confusion Matrix 시각화
cm = confusion_matrix(y_test, y_pred)
labels = ['정상 (0)', '어뷰징 (1)']

fig = px.imshow(cm, text_auto=True, 
                x=labels, y=labels,
                labels=dict(x="예측 결과", y="실제 값", color="빈도 수"),
                color_continuous_scale="Blues")
fig.update_layout(title="혼동 행렬 (Confusion Matrix)", width=600, height=500)
fig.show()

In [None]:
# 분석용 데이터프레임 생성
analysis_df = X_test.copy()
analysis_df['company_name'] = features_df.loc[X_test.index, 'company_name']
analysis_df['actual'] = y_test
analysis_df['predicted'] = y_pred
analysis_df['prob_abusing'] = y_proba

# 에러 유형 정의
analysis_df['error_type'] = '정상 탐지 (Correct)'
analysis_df.loc[(analysis_df['actual'] == 0) & (analysis_df['predicted'] == 1), 'error_type'] = '오탐 (False Positive)' # 정상인데 어뷰징이라 예측
analysis_df.loc[(analysis_df['actual'] == 1) & (analysis_df['predicted'] == 0), 'error_type'] = '미탐 (False Negative)' # 어뷰징인데 정상이라 예측

print("에러 유형 분포:")
print(analysis_df['error_type'].value_counts())

### 4.1 False Positive 분석 (정상 -> 어뷰징 오탐)
정상적인 판매자인데 왜 어뷰징으로 의심받았을까요? 특징을 살펴봅니다.

In [None]:
fp_df = analysis_df[analysis_df['error_type'] == '오탐 (False Positive)'].sort_values('prob_abusing', ascending=False)

# 표시할 주요 피처 (존재하는 것만)
display_cols = ['company_name', 'actual', 'predicted', 'prob_abusing']
optional_cols = ['satisfaction_score', 'review_count', 'avg_review_length', 'avg_rating', 'answer_rate']
display_cols.extend([c for c in optional_cols if c in analysis_df.columns])

print(f"총 {len(fp_df)}건의 False Positive 발견")
if len(fp_df) > 0:
    display(fp_df[display_cols])

In [None]:
# FP 사례에 대한 SHAP 분석 (가장 확신을 가지고 틀린 케이스)
if len(fp_df) > 0:
    target_idx = fp_df.index[0]
    target_row_loc = list(X_test.index).index(target_idx)
    
    print(f"CASE: {fp_df.iloc[0]['company_name']} (Prob: {fp_df.iloc[0]['prob_abusing']:.4f})")
    
    # TreeExplainer 지원 모델 (AdaBoost는 미지원)
    tree_explainer_models = ['RandomForest', 'GradientBoosting', 'XGB', 'LGBM', 'ExtraTrees', 'DecisionTree']
    use_tree_explainer = any(tm in str(type(model)) for tm in tree_explainer_models)
    
    if use_tree_explainer:
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test_eval)
        
        # SHAP 값 추출 (Class 1에 대한 값)
        if isinstance(shap_values, list):
            sv = shap_values[1][target_row_loc]
        elif hasattr(shap_values, 'shape') and len(shap_values.shape) == 3:
            sv = shap_values[target_row_loc, :, 1]
        else:
            sv = shap_values[target_row_loc]

        # Base Value 추출
        if isinstance(explainer.expected_value, list):
            bv = explainer.expected_value[1]
        elif hasattr(explainer.expected_value, '__iter__') and len(explainer.expected_value) > 1:
            bv = explainer.expected_value[1]
        else:
            bv = explainer.expected_value
        
        # Plotly Waterfall 시각화
        fig = plot_shap_waterfall(
            shap_values=sv,
            sample_idx=target_row_loc,
            feature_columns=feature_columns,
            feature_values=X_test.iloc[target_row_loc],
            base_value=bv,
            top_n=20,
            title=f"<b>오탐 (False Positive): {fp_df.iloc[0]['company_name']}</b><br>Base: {float(bv[0]) if hasattr(bv, "__len__") else float(bv):.3f} → Prediction: {float((bv + sv.sum())[0]) if hasattr(bv + sv.sum(), "__len__") else float(bv + sv.sum()):.3f}"
        )
        fig.show()
    else:
        print("TreeExplainer 미지원 모델 - KernelExplainer로 대체")
        background = shap.sample(X_test, 50)
        explainer = shap.KernelExplainer(model.predict_proba, background)
        shap_values = explainer.shap_values(X_test_eval[target_row_loc:target_row_loc+1])
        
        if isinstance(shap_values, list):
            sv = shap_values[1][0]
        else:
            sv = shap_values[0]
        
        bv = explainer.expected_value[1] if isinstance(explainer.expected_value, list) else explainer.expected_value
        
        fig = plot_shap_waterfall(
            shap_values=sv,
            sample_idx=target_row_loc,
            feature_columns=feature_columns,
            feature_values=X_test.iloc[target_row_loc],
            base_value=bv,
            top_n=20,
            title=f"<b>오탐 (False Positive): {fp_df.iloc[0]['company_name']}</b><br>Base: {float(bv[0]) if hasattr(bv, "__len__") else float(bv):.3f} → Prediction: {float((bv + sv.sum())[0]) if hasattr(bv + sv.sum(), "__len__") else float(bv + sv.sum()):.3f}"
        )
        fig.show()

### 4.2 False Negative 분석 (어뷰징 -> 정상 미탐)
어뷰징 판매자임에도 불구하고 왜 정상으로 판단했을까요? (가장 위험한 케이스)

In [None]:
fn_df = analysis_df[analysis_df['error_type'] == '미탐 (False Negative)'].sort_values('prob_abusing')

# 표시할 주요 피처 (존재하는 것만)
display_cols = ['company_name', 'actual', 'predicted', 'prob_abusing']
optional_cols = ['satisfaction_score', 'review_count', 'avg_review_length', 'avg_rating', 'answer_rate']
display_cols.extend([c for c in optional_cols if c in analysis_df.columns])

print(f"총 {len(fn_df)}건의 False Negative 발견")
if len(fn_df) > 0:
    display(fn_df[display_cols])

In [None]:
# FN 사례에 대한 SHAP 분석
if len(fn_df) > 0:
    target_idx = fn_df.index[0]
    target_row_loc = list(X_test.index).index(target_idx)
    
    print(f"CASE: {fn_df.iloc[0]['company_name']} (Prob: {fn_df.iloc[0]['prob_abusing']:.4f})")
    
    # TreeExplainer 지원 모델 (AdaBoost는 미지원)
    tree_explainer_models = ['RandomForest', 'GradientBoosting', 'XGB', 'LGBM', 'ExtraTrees', 'DecisionTree']
    use_tree_explainer = any(tm in str(type(model)) for tm in tree_explainer_models)
    
    if use_tree_explainer:
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test_eval)
        
        # SHAP 값 추출 (Class 1에 대한 값)
        if isinstance(shap_values, list):
            sv = shap_values[1][target_row_loc]
        elif hasattr(shap_values, 'shape') and len(shap_values.shape) == 3:
            sv = shap_values[target_row_loc, :, 1]
        else:
            sv = shap_values[target_row_loc]

        # Base Value 추출
        if isinstance(explainer.expected_value, list):
            bv = explainer.expected_value[1]
        elif hasattr(explainer.expected_value, '__iter__') and len(explainer.expected_value) > 1:
            bv = explainer.expected_value[1]
        else:
            bv = explainer.expected_value
        
        # Plotly Waterfall 시각화
        fig = plot_shap_waterfall(
            shap_values=sv,
            sample_idx=target_row_loc,
            feature_columns=feature_columns,
            feature_values=X_test.iloc[target_row_loc],
            base_value=bv,
            top_n=20,
            title=f"<b>미탐 (False Negative): {fn_df.iloc[0]['company_name']}</b><br>Base: {float(bv[0]) if hasattr(bv, "__len__") else float(bv):.3f} → Prediction: {float((bv + sv.sum())[0]) if hasattr(bv + sv.sum(), "__len__") else float(bv + sv.sum()):.3f}"
        )
        fig.show()
    else:
        print("TreeExplainer 미지원 모델 - KernelExplainer로 대체")
        background = shap.sample(X_test, 50)
        explainer = shap.KernelExplainer(model.predict_proba, background)
        shap_values = explainer.shap_values(X_test_eval[target_row_loc:target_row_loc+1])
        
        if isinstance(shap_values, list):
            sv = shap_values[1][0]
        else:
            sv = shap_values[0]
        
        bv = explainer.expected_value[1] if isinstance(explainer.expected_value, list) else explainer.expected_value
        
        fig = plot_shap_waterfall(
            shap_values=sv,
            sample_idx=target_row_loc,
            feature_columns=feature_columns,
            feature_values=X_test.iloc[target_row_loc],
            base_value=bv,
            top_n=20,
            title=f"<b>미탐 (False Negative): {fn_df.iloc[0]['company_name']}</b><br>Base: {float(bv[0]) if hasattr(bv, "__len__") else float(bv):.3f} → Prediction: {float((bv + sv.sum())[0]) if hasattr(bv + sv.sum(), "__len__") else float(bv + sv.sum()):.3f}"
        )
        fig.show()

## 5. 에러 유형별 피처 분포 비교
정상(TN), 어뷰징(TP) 그리고 에러(FP, FN) 그룹 간에 피처 분포 차이를 확인합니다.
어떤 피처가 모델을 헷갈리게 만들었는지 직관적으로 볼 수 있습니다.

In [None]:
# 주요 피처 선정 (새 피처 세트에 맞게 업데이트)
key_features = ['satisfaction_score', 'review_count', 'price', 'avg_review_length', 'answer_rate', 
                'negative_keyword_ratio', 'duplicate_review_ratio']

# 존재하는 피처만 필터링
key_features = [f for f in key_features if f in feature_columns]

for col in key_features:
    col_ko = get_feature_name_ko(col)
    fig = px.box(analysis_df, x='error_type', y=col, color='error_type', 
                 title=f'{col_ko} 분포 (에러 유형별)',
                 points="all")
    fig.update_layout(yaxis_title=col_ko)
    fig.show()