<a href="https://colab.research.google.com/github/kahram-y/AML_project/blob/main/AML(2_1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

금융결제원 AML 탐지 프로젝트

IBM Transactions for Anti Money Laundering 데이터셋 활용

목표: 자금세탁 계좌 탐지 (노드 분류)

프로젝트 구조:
1. 데이터 탐색 및 전처리
2. 문제 정의
3. 피쳐 생성 (집계 피쳐 + 그래프 피쳐)
4. 모델 학습 및 평가
   - Baseline: XGBoost/CatBoost
   - 고도화 모델
      - 시계열 모델 앙상블
      - 그래프 피쳐 추가 전후 비교
      - GNN 모델
      - GNN 아키텍처 고도화
      - 백테스트/시뮬레이션

In [1]:
pip install pandas numpy scikit-learn xgboost catboost lightgbm networkx matplotlib seaborn imbalanced-learn shap

Collecting catboost
  Downloading catboost-1.2.8-cp312-cp312-manylinux2014_x86_64.whl.metadata (1.2 kB)
Downloading catboost-1.2.8-cp312-cp312-manylinux2014_x86_64.whl (99.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: catboost
Successfully installed catboost-1.2.8


In [2]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import warnings
warnings.filterwarnings('ignore')

from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split, StratifiedKFold, TimeSeriesSplit
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, classification_report, confusion_matrix
from sklearn.ensemble import VotingClassifier
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SMOTE

import xgboost as xgb, XGBClassifier

from catboost import CatBoostClassifier
import lightgbm as lgb

import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
from google.colab import drive
drive.mount('/content/drive')

# 파일 경로
trans_path = '/content/drive/MyDrive/HI-Small_Trans.csv'
accounts_path = '/content/drive/MyDrive/HI-Small_accounts.csv'

Mounted at /content/drive


In [4]:
# ====================================
# 1. 데이터 로드 및 탐색
# ====================================

class AMLDataLoader:
    """AML 데이터 로드 및 초기 탐색"""

    def __init__(self, trans_path, accounts_path):
        self.trans_path = trans_path
        self.accounts_path = accounts_path

    def load_data(self):
        """데이터 로드"""
        print("=" * 80)
        print("데이터 로딩 중...")

        # 거래 데이터 로드
        self.transactions = pd.read_csv(self.trans_path)

        # 계좌 데이터 로드
        self.accounts = pd.read_csv(self.accounts_path)

        print(f"거래 데이터 shape: {self.transactions.shape}")
        print(f"계좌 데이터 shape: {self.accounts.shape}")

        return self.transactions, self.accounts

    def explore_data(self, df_trans, df_accounts):
        """데이터 탐색"""
        print("\n" + "=" * 80)
        print("데이터 탐색")
        print("=" * 80)

        # 거래 데이터 기본 정보
        print("\n[거래 데이터 샘플]")
        print(df_trans.head())
        print(f"\n컬럼: {df_trans.columns.tolist()}")
        print(f"\n결측치:\n{df_trans.isnull().sum()}")

        # Is Laundering 분포
        if 'Is Laundering' in df_trans.columns:
            laundering_dist = df_trans['Is Laundering'].value_counts()
            print(f"\n[자금세탁 분포]")
            print(laundering_dist)
            print(f"자금세탁 비율: {laundering_dist[1] / len(df_trans) * 100:.4f}%")

        # 시간 정보 파싱 및 분포 확인
        df_trans['Timestamp'] = pd.to_datetime(df_trans['Timestamp'])
        df_trans['Year'] = df_trans['Timestamp'].dt.year
        df_trans['Month'] = df_trans['Timestamp'].dt.month
        df_trans['Day'] = df_trans['Timestamp'].dt.day
        df_trans['Hour'] = df_trans['Timestamp'].dt.hour
        df_trans['DayOfWeek'] = df_trans['Timestamp'].dt.dayofweek

        print(f"\n[시간 범위]")
        print(f"시작: {df_trans['Timestamp'].min()}")
        print(f"종료: {df_trans['Timestamp'].max()}")

        # Day별 자금세탁 분포 확인
        if 'Is Laundering' in df_trans.columns:
            day_dist = df_trans.groupby('Day')['Is Laundering'].agg(['sum', 'count', 'mean'])
            print(f"\n[일별 자금세탁 분포]")
            print(day_dist)

            # 치우침 확인
            if day_dist['mean'].std() > 0.1:
                print("\n⚠️ Day 피쳐의 label 분포가 치우쳐져 있습니다. 제거 고려 필요")

        # 계좌 데이터 정보
        print(f"\n[계좌 데이터 샘플]")
        print(df_accounts.head())

        # Bank Name 분포 확인
        if 'Bank Name' in df_accounts.columns:
            print(f"\n은행 분포:\n{df_accounts['Bank Name'].value_counts().head(10)}")

        return df_trans


# ====================================
# 2. 데이터 전처리 및 샘플링
# ====================================

class AMLPreprocessor:
    """데이터 전처리 및 샘플링"""

    def __init__(self, sample_ratio=0.1, random_state=42):
        self.sample_ratio = sample_ratio
        self.random_state = random_state

    def create_hourly_samples(self, df_trans):
        """시간 단위 배치로 모델 단위 생성
        각 계좌번호 + 시간 단위로 샘플 생성
        """
        print("\n" + "=" * 80)
        print("시간 단위 배치 샘플 생성")
        print("=" * 80)

        # From Account 기준 샘플 생성
        from_samples = df_trans.copy()
        from_samples['Account'] = from_samples['From Bank'].astype(str) + '_' + from_samples['Account'].astype(str)
        from_samples['TimeUnit'] = from_samples['Timestamp'].dt.strftime('%Y-%m-%d %H')
        from_samples['Direction'] = 'OUT'

        # To Account 기준 샘플 생성
        to_samples = df_trans.copy()
        to_samples['Account'] = to_samples['To Bank'].astype(str) + '_' + to_samples['Account.1'].astype(str)
        to_samples['TimeUnit'] = to_samples['Timestamp'].dt.strftime('%Y-%m-%d %H')
        to_samples['Direction'] = 'IN'

        # 합치기
        all_samples = pd.concat([from_samples, to_samples], ignore_index=True)

        # 계좌 + 시간 단위로 그룹화하여 label 결정
        # Is Laundering=1인 거래가 하나라도 있으면 해당 시간의 계좌는 suspicious
        account_time_labels = all_samples.groupby(['Account', 'TimeUnit']).agg({
            'Is Laundering': 'max',  # 하나라도 1이면 1
            'Timestamp': 'min'
        }).reset_index()

        account_time_labels.rename(columns={'Timestamp': 'TimeUnit_Start'}, inplace=True)

        print(f"총 샘플 수 (계좌-시간 단위): {len(account_time_labels)}")
        print(f"자금세탁 샘플: {account_time_labels['Is Laundering'].sum()}")

        return all_samples, account_time_labels

    def stratified_sample(self, df, target_col='Is Laundering'):
        """계층화 샘플링"""
        print(f"\n계층화 샘플링 (비율: {self.sample_ratio})")

        # 클래스별로 샘플링
        sampled_dfs = []
        for label in df[target_col].unique():
            label_df = df[df[target_col] == label]
            n_samples = int(len(label_df) * self.sample_ratio)
            sampled = label_df.sample(n=n_samples, random_state=self.random_state)
            sampled_dfs.append(sampled)

        result = pd.concat(sampled_dfs, ignore_index=True)
        print(f"샘플링 후 크기: {len(result)}")
        print(f"자금세탁 비율: {result[target_col].sum() / len(result) * 100:.4f}%")

        return result

    def analyze_laundering_patterns(self, df_trans):
        """자금세탁 건과 정상 건의 차이 분석"""
        print("\n" + "=" * 80)
        print("자금세탁 패턴 분석")
        print("=" * 80)

        laundering = df_trans[df_trans['Is Laundering'] == 1]
        normal = df_trans[df_trans['Is Laundering'] == 0]

        # 거래 금액 비교
        print(f"\n[거래 금액 통계]")
        print(f"자금세탁 - 평균: ${laundering['Amount Received'].mean():.2f}, "
              f"중앙값: ${laundering['Amount Received'].median():.2f}")
        print(f"정상 거래 - 평균: ${normal['Amount Received'].mean():.2f}, "
              f"중앙값: ${normal['Amount Received'].median():.2f}")

        # 결제 수단 분포
        print(f"\n[결제 수단 분포]")
        print("자금세탁:")
        print(laundering['Receiving Currency'].value_counts().head())
        print("\n정상 거래:")
        print(normal['Receiving Currency'].value_counts().head())

        # 시간대 분포
        print(f"\n[시간대 분포]")
        print("자금세탁 - 시간대별:")
        print(laundering['Hour'].value_counts().sort_index())

        return laundering, normal


# ====================================
# 3. 피쳐 생성
# ====================================

class FeatureEngineer:
    """집계 피쳐 및 그래프 피쳐 생성"""

    def __init__(self):
        self.feature_names = []

    def create_aggregation_features(self, df_trans, account_time_df):
        """50개 이상의 집계 피쳐 생성
        과거 정보만 사용 (Data Leakage 방지)
        """
        print("\n" + "=" * 80)
        print("집계 피쳐 생성")
        print("=" * 80)

        features_list = []
        error_count = 0

        # 샘플 수 제한 (시연용) - 증가
        max_samples = min(2000, len(account_time_df))  # 1000 → 2000
        print(f"총 {len(account_time_df)}건 중 {max_samples}건 처리")

        # TimeUnit을 문자열로 통일
        account_time_df['TimeUnit'] = account_time_df['TimeUnit'].astype(str)

        for idx, row in account_time_df.head(max_samples).iterrows():
            if idx % 200 == 0:
                print(f"진행: {idx}/{max_samples} (에러: {error_count}건)")

            try:
                account = row['Account']
                time_unit = pd.to_datetime(row['TimeUnit_Start'])
                time_unit_str = row['TimeUnit']  # 문자열 버전 저장

                # 해당 계좌의 과거 거래만 필터링
                past_trans = df_trans[
                    (df_trans['Account'] == account) &
                    (df_trans['Timestamp'] < time_unit)
                ].copy()

                if len(past_trans) == 0:
                    # 과거 거래가 없으면 기본값
                    features = self._default_features(account, time_unit_str)
                else:
                    features = self._compute_features(past_trans, account, time_unit_str)

                features_list.append(features)

            except Exception as e:
                error_count += 1
                if error_count <= 5:  # 처음 5개 에러만 출력
                    print(f"⚠️ 에러 발생 (idx={idx}): {e}")

                # 에러 발생 시에도 기본값 추가
                try:
                    features = self._default_features(row['Account'], row['TimeUnit'])
                    features_list.append(features)
                except:
                    pass  # 기본값 생성도 실패하면 스킵

        if len(features_list) == 0:
            print("⚠️ 피쳐 생성 실패! 빈 데이터프레임 반환")
            # 최소한의 피쳐 구조 생성
            return pd.DataFrame({
                'Account': [],
                'TimeUnit': [],
                'total_trans_count': []
            })

        feature_df = pd.DataFrame(features_list)
        self.feature_names = [c for c in feature_df.columns
                             if c not in ['Account', 'TimeUnit']]

        print(f"\n✅ 피쳐 생성 완료!")
        print(f"  - 총 샘플: {len(feature_df)}건")
        print(f"  - 생성된 피쳐 수: {len(self.feature_names)}개")
        print(f"  - 에러 발생: {error_count}건")
        print(f"  - 피쳐 목록: {self.feature_names[:100]}")

        return feature_df

    def _compute_features(self, past_trans, account, time_unit_str):
        """실제 피쳐 계산"""

        # time_unit_str를 datetime으로 변환
        try:
            time_unit = pd.to_datetime(time_unit_str)
        except:
            # 변환 실패 시 기본값 반환
            return self._default_features(account, time_unit_str)

        features = {'Account': account, 'TimeUnit': str(time_unit_str)}

        # 시간 윈도우 정의
        windows = {
            '1h': timedelta(hours=1),
            '3h': timedelta(hours=3),
            '1d': timedelta(days=1),
            '7d': timedelta(days=7)
        }

        for window_name, window_delta in windows.items():
            window_start = time_unit - window_delta  # 이제 datetime 연산 가능
            window_trans = past_trans[past_trans['Timestamp'] >= window_start]

            # OUT 거래 (송금)
            out_trans = window_trans[window_trans['Direction'] == 'OUT']
            features[f'out_count_{window_name}'] = len(out_trans)

            if len(out_trans) > 0:
                try:
                    out_amounts = out_trans['Amount Paid'].values
                    features[f'out_amount_sum_{window_name}'] = float(np.sum(out_amounts))
                    features[f'out_amount_mean_{window_name}'] = float(np.mean(out_amounts))
                    features[f'out_amount_std_{window_name}'] = float(np.std(out_amounts)) if len(out_amounts) > 1 else 0.0
                    features[f'out_amount_max_{window_name}'] = float(np.max(out_amounts))
                    features[f'out_amount_min_{window_name}'] = float(np.min(out_amounts))
                except Exception as e:
                    # 에러 발생 시 기본값
                    features[f'out_amount_sum_{window_name}'] = 0.0
                    features[f'out_amount_mean_{window_name}'] = 0.0
                    features[f'out_amount_std_{window_name}'] = 0.0
                    features[f'out_amount_max_{window_name}'] = 0.0
                    features[f'out_amount_min_{window_name}'] = 0.0
            else:
                features[f'out_amount_sum_{window_name}'] = 0.0
                features[f'out_amount_mean_{window_name}'] = 0.0
                features[f'out_amount_std_{window_name}'] = 0.0
                features[f'out_amount_max_{window_name}'] = 0.0
                features[f'out_amount_min_{window_name}'] = 0.0

            # IN 거래 (입금)
            in_trans = window_trans[window_trans['Direction'] == 'IN']
            features[f'in_count_{window_name}'] = len(in_trans)

            if len(in_trans) > 0:
                try:
                    in_amounts = in_trans['Amount Received'].values
                    features[f'in_amount_sum_{window_name}'] = float(np.sum(in_amounts))
                    features[f'in_amount_mean_{window_name}'] = float(np.mean(in_amounts))
                    features[f'in_amount_std_{window_name}'] = float(np.std(in_amounts)) if len(in_amounts) > 1 else 0.0
                    features[f'in_amount_max_{window_name}'] = float(np.max(in_amounts))
                    features[f'in_amount_min_{window_name}'] = float(np.min(in_amounts))
                except Exception as e:
                    features[f'in_amount_sum_{window_name}'] = 0.0
                    features[f'in_amount_mean_{window_name}'] = 0.0
                    features[f'in_amount_std_{window_name}'] = 0.0
                    features[f'in_amount_max_{window_name}'] = 0.0
                    features[f'in_amount_min_{window_name}'] = 0.0
            else:
                features[f'in_amount_sum_{window_name}'] = 0.0
                features[f'in_amount_mean_{window_name}'] = 0.0
                features[f'in_amount_std_{window_name}'] = 0.0
                features[f'in_amount_max_{window_name}'] = 0.0
                features[f'in_amount_min_{window_name}'] = 0.0

            # 순 흐름
            features[f'net_flow_{window_name}'] = (
                features[f'in_amount_sum_{window_name}'] -
                features[f'out_amount_sum_{window_name}']
            )

            # 외화 거래
            if len(window_trans) > 0:
                try:
                    foreign_curr = window_trans[
                        window_trans['Payment Currency'] != window_trans['Receiving Currency']
                    ]
                    features[f'foreign_count_{window_name}'] = len(foreign_curr)
                    features[f'foreign_ratio_{window_name}'] = len(foreign_curr) / len(window_trans)
                except:
                    features[f'foreign_count_{window_name}'] = 0
                    features[f'foreign_ratio_{window_name}'] = 0.0
            else:
                features[f'foreign_count_{window_name}'] = 0
                features[f'foreign_ratio_{window_name}'] = 0.0

        # 전체 거래 통계
        features['total_trans_count'] = len(past_trans)

        try:
            out_all = past_trans[past_trans['Direction'] == 'OUT']
            in_all = past_trans[past_trans['Direction'] == 'IN']

            if len(out_all) > 0:
                features['total_out_amount'] = float(np.sum(out_all['Amount Paid'].values))
            else:
                features['total_out_amount'] = 0.0

            if len(in_all) > 0:
                features['total_in_amount'] = float(np.sum(in_all['Amount Received'].values))
            else:
                features['total_in_amount'] = 0.0
        except:
            features['total_out_amount'] = 0.0
            features['total_in_amount'] = 0.0

        # 거래 상대방 다양성
        try:
            unique_counterparties = set()

            out_all = past_trans[past_trans['Direction'] == 'OUT']
            if len(out_all) > 0:
                for _, row in out_all.iterrows():
                    counterparty = str(row['From Bank']) + '_' + str(row['Account'])
                    unique_counterparties.add(counterparty)

            in_all = past_trans[past_trans['Direction'] == 'IN']
            if len(in_all) > 0:
                for _, row in in_all.iterrows():
                    counterparty = str(row['To Bank']) + '_' + str(row['Account.1'])
                    unique_counterparties.add(counterparty)

            features['unique_counterparties'] = len(unique_counterparties)
        except:
            features['unique_counterparties'] = 0

        # 결제 수단 다양성
        try:
            features['unique_currencies'] = past_trans['Payment Currency'].nunique() if len(past_trans) > 0 else 0
        except:
            features['unique_currencies'] = 0

        # 시간대 분포
        try:
            if len(past_trans) > 0:
                hour_counts = past_trans['Hour'].value_counts()
                night_mask = (hour_counts.index >= 0) & (hour_counts.index < 6)
                night_count = hour_counts[night_mask].sum() if night_mask.any() else 0
                features['night_trans_ratio'] = float(night_count) / len(past_trans)
            else:
                features['night_trans_ratio'] = 0.0
        except:
            features['night_trans_ratio'] = 0.0

        return features

    def _default_features(self, account, time_unit):
        """과거 거래가 없을 때 기본 피쳐"""
        features = {'Account': account, 'TimeUnit': str(time_unit)}

        # 모든 피쳐를 0으로 초기화
        windows = ['1h', '3h', '1d', '7d']
        for window in windows:
            for prefix in ['out', 'in']:
                for metric in ['count', 'amount_sum', 'amount_mean', 'amount_std', 'amount_max', 'amount_min']:
                    features[f'{prefix}_{metric}_{window}'] = 0.0 if 'amount' in metric else 0
            features[f'net_flow_{window}'] = 0.0
            features[f'foreign_count_{window}'] = 0
            features[f'foreign_ratio_{window}'] = 0.0

        features['total_trans_count'] = 0
        features['total_out_amount'] = 0.0
        features['total_in_amount'] = 0.0
        features['unique_counterparties'] = 0
        features['unique_currencies'] = 0
        features['night_trans_ratio'] = 0.0

        return features

    def create_graph_features(self, df_trans, account_time_df):
        """그래프 기반 피쳐 생성
        - Centrality 기반 (Degree, Closeness, Betweenness)
        - Path & Flow 패턴
        - Community 구조
        """
        print("\n" + "=" * 80)
        print("그래프 피쳐 생성")
        print("=" * 80)

        try:
            # 거래 네트워크 구축
            G = nx.DiGraph()

            print("그래프 구축 중...")
            edge_count = 0
            for idx, row in df_trans.iterrows():
                try:
                    from_acc = str(row['From Bank']) + '_' + str(row['Account'])
                    to_acc = str(row['To Bank']) + '_' + str(row['Account.1'])

                    # Amount Paid를 float으로 변환
                    try:
                        amount = float(row['Amount Paid'])
                        if np.isnan(amount) or np.isinf(amount):
                            amount = 0.0
                    except (ValueError, TypeError):
                        amount = 0.0

                    if G.has_edge(from_acc, to_acc):
                        G[from_acc][to_acc]['weight'] += amount
                        G[from_acc][to_acc]['count'] += 1
                    else:
                        G.add_edge(from_acc, to_acc, weight=amount, count=1)

                    edge_count += 1
                except Exception as e:
                    if edge_count < 5:
                        print(f"⚠️ 엣지 추가 실패: {e}")
                    continue

            print(f"그래프 구축 완료 - 노드: {G.number_of_nodes()}, 엣지: {G.number_of_edges()}")

            if G.number_of_nodes() == 0:
                print("⚠️ 그래프가 비어있습니다. 기본 그래프 피쳐 반환")
                return self._default_graph_features(account_time_df)

            # Centrality 계산
            print("Centrality 계산 중...")
            degree_centrality = nx.degree_centrality(G)
            in_degree_centrality = nx.in_degree_centrality(G)
            out_degree_centrality = nx.out_degree_centrality(G)

            # Betweenness는 계산량이 많으므로 샘플링
            sample_size = min(500, G.number_of_nodes())
            sample_nodes = list(G.nodes())[:sample_size]
            betweenness_centrality = nx.betweenness_centrality(
                G.subgraph(sample_nodes),
                weight='weight'
            )

            # PageRank
            try:
                pagerank = nx.pagerank(G, weight='weight', max_iter=50)
            except:
                pagerank = {}

            # 그래프 피쳐를 데이터프레임에 추가
            graph_features = []

            for _, row in account_time_df.iterrows():
                account = row['Account']

                features = {
                    'Account': account,
                    'TimeUnit': row['TimeUnit'],
                    'degree_centrality': degree_centrality.get(account, 0.0),
                    'in_degree_centrality': in_degree_centrality.get(account, 0.0),
                    'out_degree_centrality': out_degree_centrality.get(account, 0.0),
                    'betweenness_centrality': betweenness_centrality.get(account, 0.0),
                    'pagerank': pagerank.get(account, 0.0),
                }

                # 이웃 노드 정보
                try:
                    if account in G:
                        successors = list(G.successors(account))
                        predecessors = list(G.predecessors(account))

                        features['num_successors'] = len(successors)
                        features['num_predecessors'] = len(predecessors)
                        features['total_out_weight'] = float(sum(G[account][s]['weight'] for s in successors))
                        features['total_in_weight'] = float(sum(G[p][account]['weight'] for p in predecessors))
                    else:
                        features['num_successors'] = 0
                        features['num_predecessors'] = 0
                        features['total_out_weight'] = 0.0
                        features['total_in_weight'] = 0.0
                except:
                    features['num_successors'] = 0
                    features['num_predecessors'] = 0
                    features['total_out_weight'] = 0.0
                    features['total_in_weight'] = 0.0

                graph_features.append(features)

            graph_feature_df = pd.DataFrame(graph_features)

            print(f"✅ 그래프 피쳐 생성 완료 - 피쳐 수: {len(graph_feature_df.columns) - 2}")

            return graph_feature_df

        except Exception as e:
            print(f"⚠️ 그래프 피쳐 생성 실패: {e}")
            return self._default_graph_features(account_time_df)

    def _default_graph_features(self, account_time_df):
        """그래프 피쳐 생성 실패 시 기본값 반환"""
        graph_features = []

        for _, row in account_time_df.iterrows():
            features = {
                'Account': row['Account'],
                'TimeUnit': row['TimeUnit'],
                'degree_centrality': 0.0,
                'in_degree_centrality': 0.0,
                'out_degree_centrality': 0.0,
                'betweenness_centrality': 0.0,
                'pagerank': 0.0,
                'num_successors': 0,
                'num_predecessors': 0,
                'total_out_weight': 0.0,
                'total_in_weight': 0.0
            }
            graph_features.append(features)

        return pd.DataFrame(graph_features)


# ====================================
# 4. 모델 학습 및 평가
# ====================================

class AMLModelTrainer:
    """모델 학습 및 평가"""

    def __init__(self, random_state=42):
        self.random_state = random_state
        self.models = {}
        self.results = {}

    def prepare_train_test_split(self, feature_df, label_df, test_size=0.3):
        """시계열 기준 train/test 분할"""
        print("\n" + "=" * 80)
        print("Train/Test 분할")
        print("=" * 80)

        print(f"피쳐 데이터: {len(feature_df)}건")
        print(f"라벨 데이터: {len(label_df)}건")

        # TimeUnit 형식 통일
        feature_df['TimeUnit'] = feature_df['TimeUnit'].astype(str)
        label_df['TimeUnit'] = label_df['TimeUnit'].astype(str)

        print(f"\n피쳐 데이터 샘플:")
        print(feature_df[['Account', 'TimeUnit']].head(3))
        print(f"\n라벨 데이터 샘플:")
        print(label_df[['Account', 'TimeUnit', 'Is Laundering']].head(3))

        # 피쳐와 라벨 병합
        merged = feature_df.merge(
            label_df[['Account', 'TimeUnit', 'Is Laundering']],
            on=['Account', 'TimeUnit'],
            how='inner'
        )

        print(f"\n병합 후 데이터: {len(merged)}건")

        if len(merged) == 0:
            print("⚠️ 경고: 병합 후 데이터가 0건입니다!")
            print("피쳐와 라벨의 키가 매칭되지 않습니다.")
            print("\n피쳐 데이터의 Account 샘플:")
            print(feature_df['Account'].head(10).tolist())
            print("\n라벨 데이터의 Account 샘플:")
            print(label_df['Account'].head(10).tolist())

            # 대안: 인덱스 기반 매칭 시도
            print("\n대안: 순서 기반 매칭 시도...")
            min_len = min(len(feature_df), len(label_df))
            merged = feature_df.head(min_len).copy()
            merged['Is Laundering'] = label_df.head(min_len)['Is Laundering'].values
            print(f"순서 기반 매칭 결과: {len(merged)}건")

        if len(merged) == 0:
            raise ValueError("데이터 병합 실패! 피쳐와 라벨을 확인해주세요.")

        print(f"자금세탁 비율: {merged['Is Laundering'].mean():.4%}")

        # 시간순 정렬
        merged = merged.sort_values('TimeUnit')

        # 시간 기준 분할
        split_idx = int(len(merged) * (1 - test_size))

        # 최소한 1건 이상은 보장
        if split_idx == 0:
            split_idx = max(1, len(merged) // 2)

        train_df = merged.iloc[:split_idx]
        test_df = merged.iloc[split_idx:]

        print(f"\nTrain set: {len(train_df)} (자금세탁: {train_df['Is Laundering'].sum()})")
        print(f"Test set: {len(test_df)} (자금세탁: {test_df['Is Laundering'].sum()})")
        print(f"Train 자금세탁 비율: {train_df['Is Laundering'].mean():.4%}")
        print(f"Test 자금세탁 비율: {test_df['Is Laundering'].mean():.4%}")

        # Feature와 Label 분리
        feature_cols = [c for c in merged.columns
                       if c not in ['Account', 'TimeUnit', 'Is Laundering']]

        print(f"\n사용할 피쳐 수: {len(feature_cols)}개")

        X_train = train_df[feature_cols]
        y_train = train_df['Is Laundering']
        X_test = test_df[feature_cols]
        y_test = test_df['Is Laundering']

        return X_train, X_test, y_train, y_test, test_df

    def train_baseline_model(self, X_train, y_train, X_test, y_test,
                            use_smote=False, scale_pos_weight=None):
        """Baseline: XGBoost/CatBoost"""
        print("\n" + "=" * 80)
        print("Baseline 모델 학습 (XGBoost)")
        print("=" * 80)

        # 데이터 검증
        print(f"학습 데이터: {len(X_train)}건")
        print(f"테스트 데이터: {len(X_test)}건")
        print(f"Positive 비율 (Train): {y_train.sum() / len(y_train) * 100:.4f}%")
        print(f"Positive 비율 (Test): {y_test.sum() / len(y_test) * 100:.4f}%")

        # NaN 값 처리
        X_train = X_train.fillna(0)
        X_test = X_test.fillna(0)

        # Inf 값 처리
        X_train = X_train.replace([np.inf, -np.inf], 0)
        X_test = X_test.replace([np.inf, -np.inf], 0)

        # SMOTE 적용 여부
        if use_smote and y_train.sum() > 0:
            print("SMOTE 오버샘플링 적용 중...")
            try:
                smote = SMOTE(random_state=self.random_state)
                X_train_res, y_train_res = smote.fit_resample(X_train, y_train)
                print(f"SMOTE 후 - Positive: {y_train_res.sum()}, Negative: {len(y_train_res) - y_train_res.sum()}")
            except Exception as e:
                print(f"⚠️ SMOTE 실패: {e}")
                print("원본 데이터로 학습합니다.")
                X_train_res, y_train_res = X_train, y_train
        else:
            X_train_res, y_train_res = X_train, y_train

        # scale_pos_weight 계산
        if scale_pos_weight is None:
            if y_train_res.sum() > 0:
                scale_pos_weight = (len(y_train_res) - y_train_res.sum()) / y_train_res.sum()
            else:
                print("⚠️ 학습 데이터에 Positive 샘플이 없습니다!")
                scale_pos_weight = 1.0

        print(f"scale_pos_weight: {scale_pos_weight:.2f}")

        # base_score 계산 (0과 1 사이로 제한)
        positive_ratio = y_train_res.sum() / len(y_train_res)
        base_score = max(0.01, min(0.99, positive_ratio))  # 0.01 ~ 0.99 범위로 제한

        print(f"base_score: {base_score:.4f}")

        # XGBoost 학습
        xgb_model = xgb.XGBClassifier(
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            scale_pos_weight=scale_pos_weight,
            base_score=base_score,  # 명시적으로 설정
            random_state=self.random_state,
            eval_metric='logloss',  # auc 대신 logloss 사용
            use_label_encoder=False
        )

        try:
            xgb_model.fit(X_train_res, y_train_res, verbose=False)
        except Exception as e:
            print(f"⚠️ XGBoost 학습 실패: {e}")
            print("CatBoost로 전환합니다...")

            # CatBoost로 대체
            from catboost import CatBoostClassifier
            xgb_model = CatBoostClassifier(
                iterations=100,
                depth=6,
                learning_rate=0.1,
                loss_function='Logloss',
                random_state=self.random_state,
                verbose=False
            )
            xgb_model.fit(X_train_res, y_train_res)

        # 예측
        y_pred_proba = xgb_model.predict_proba(X_test)[:, 1]

        # 모델 저장
        self.models['baseline_xgb'] = xgb_model

        print("학습 완료!")

        return xgb_model, y_pred_proba

    def evaluate_topk(self, y_true, y_pred_proba, test_df, k_values=[50, 100, 200, 500]):
        """Top-K 평가"""
        print("\n" + "=" * 80)
        print("Top-K 평가")
        print("=" * 80)

        # 데이터 검증
        if len(y_true) == 0:
            print("⚠️ 테스트 데이터가 비어있습니다!")
            return {}

        total_positive = y_true.sum()
        print(f"테스트 데이터: {len(y_true)}건")
        print(f"자금세탁 건수: {total_positive}건 ({total_positive/len(y_true)*100:.4f}%)")

        if total_positive == 0:
            print("⚠️ 테스트 데이터에 자금세탁 건이 없습니다!")
            print("이는 다음 원인일 수 있습니다:")
            print("  1. 샘플링 비율이 너무 낮음 (현재 10%)")
            print("  2. 시계열 분할 시 자금세탁 건이 모두 Train에 포함됨")
            print("  3. 데이터 필터링 과정에서 자금세탁 건이 제외됨")
            print("\n해결 방법:")
            print("  - sample_ratio를 0.5 이상으로 높이기")
            print("  - test_size를 0.5로 조정하기")
            return {}

        results = {}

        # 점수 스케일링 (1000점 만점)
        if len(np.unique(y_pred_proba)) > 1:
            scores = (y_pred_proba - y_pred_proba.min()) / (y_pred_proba.max() - y_pred_proba.min()) * 1000
        else:
            scores = y_pred_proba * 1000

        for k in k_values:
            # K가 데이터보다 크면 조정
            k_actual = min(k, len(y_true))

            # Top K 선택
            top_k_idx = np.argsort(y_pred_proba)[-k_actual:]
            y_pred_topk = np.zeros(len(y_true))
            y_pred_topk[top_k_idx] = 1

            # 메트릭 계산
            precision = precision_score(y_true, y_pred_topk, zero_division=0)
            recall = recall_score(y_true, y_pred_topk, zero_division=0)
            f1 = f1_score(y_true, y_pred_topk, zero_division=0)

            detected_laundering = y_true[top_k_idx].sum()

            results[f'top_{k}'] = {
                'precision': precision,
                'recall': recall,
                'f1': f1,
                'detected': int(detected_laundering),
                'total': int(total_positive)
            }

            print(f"\nTop-{k_actual} 결과:")
            print(f"  Precision: {precision:.4f}")
            print(f"  Recall: {recall:.4f}")
            print(f"  F1-Score: {f1:.4f}")
            print(f"  탐지된 자금세탁: {int(detected_laundering)}/{int(total_positive)}")

        # 점수 구간별 분포
        print("\n" + "=" * 80)
        print("점수 구간별 분포")
        print("=" * 80)

        try:
            bins = list(range(0, 1001, 100))
            score_bins = pd.cut(scores, bins=bins, right=False)

            # Categorical의 categories 직접 접근
            for bin_range in score_bins.categories:
                mask = score_bins == bin_range
                if mask.sum() > 0:
                    bin_positive = y_true[mask].sum()
                    bin_negative = mask.sum() - bin_positive

                    print(f"{bin_range}: 정상={int(bin_negative)}, 자금세탁={int(bin_positive)}")
        except Exception as e:
            print(f"⚠️ 점수 구간별 분포 계산 실패: {e}")

        return results

    def explain_with_shap(self, model, X_train, X_test, feature_names):
        """SHAP을 이용한 피쳐 중요도 분석"""
        print("\n" + "=" * 80)
        print("XAI: SHAP 피쳐 중요도 분석")
        print("=" * 80)

        try:
            import shap

            # SHAP 값 계산
            explainer = shap.TreeExplainer(model)
            shap_values = explainer.shap_values(X_test.iloc[:100])  # 샘플만

            # 피쳐 중요도
            feature_importance = pd.DataFrame({
                'feature': feature_names,
                'importance': np.abs(shap_values).mean(axis=0)
            }).sort_values('importance', ascending=False)

            print("\nTop 20 중요 피쳐:")
            print(feature_importance.head(20))

            return feature_importance
        except ImportError:
            print("SHAP 라이브러리가 설치되지 않았습니다. pip install shap")
            return None


# ====================================
# 5. 메인 실행 파이프라인
# ====================================

def main():
    """전체 파이프라인 실행"""

    print("=" * 80)
    print("금융결제원 AML 탐지 프로젝트 시작")
    print("=" * 80)

    # ========== 1. 기본 데이터 로드 및 전처리 ==========
    loader = AMLDataLoader(
        trans_path=trans_path,
        accounts_path=accounts_path
    )

    df_trans, df_accounts = loader.load_data()
    df_trans = loader.explore_data(df_trans, df_accounts)

    # 샘플링 비율을 높여서 더 많은 데이터 확보
    preprocessor = AMLPreprocessor(sample_ratio=0.3)    # 10% → 30%로 증가

    # 시간 단위 배치 샘플 생성
    all_trans, account_time_labels = preprocessor.create_hourly_samples(df_trans)
    sampled_labels = preprocessor.stratified_sample(account_time_labels)

    print(f"\n전체 계좌-시간 샘플: {len(account_time_labels)}건")
    print(f"자금세탁 샘플: {account_time_labels['Is Laundering'].sum()}건")

    # 자금세탁 패턴 분석
    laundering, normal = preprocessor.analyze_laundering_patterns(df_trans)

    # 샘플링 (큰 데이터를 작게)
    sampled_labels = preprocessor.stratified_sample(account_time_labels)

    print(f"\n샘플링 후:")
    print(f"  총 샘플: {len(sampled_labels)}건")
    print(f"  자금세탁: {sampled_labels['Is Laundering'].sum()}건")
    print(f"  자금세탁 비율: {sampled_labels['Is Laundering'].mean():.4%}")

    # 자금세탁 건이 너무 적으면 경고
    if sampled_labels['Is Laundering'].sum() < 10:
        print("\n⚠️ 경고: 자금세탁 샘플이 너무 적습니다!")
        print("sample_ratio를 더 높이거나 전체 데이터를 사용하세요.")
        print("일단 전체 데이터로 진행합니다...")
        sampled_labels = account_time_labels

    # ========== 2. 피쳐 생성 ==========
    feature_engineer = FeatureEngineer()

    # 집계 피쳐 생성 (더 많은 샘플 처리)
    print("\n⚠️ 주의: 피쳐 생성에 시간이 걸릴 수 있습니다.")
    print(f"최대 {min(2000, len(sampled_labels))}건 처리")

    # 집계 피쳐 (그래프 피쳐 제외)
    agg_features = feature_engineer.create_aggregation_features(
        all_trans,
        sampled_labels
    )

    print(f"\n집계 피쳐 생성 완료: {len(agg_features)}건")

    # 그래프 피쳐 생성
    graph_features = feature_engineer.create_graph_features(
        all_trans,
        sampled_labels.head(len(agg_features))    # 집계 피쳐와 동일한 수만큼
    )

    print(f"그래프 피쳐 생성 완료: {len(graph_features)}건")

    # 그래프 피쳐명 추출
    graph_feature_names = [c for c in graph_features.columns
                          if c not in ['Account', 'TimeUnit']]

    print(f"\n그래프 피쳐 수: {len(graph_feature_names)}개")
    print(f"그래프 피쳐: {graph_feature_names}")

    # 피쳐 병합 (그래프 포함)
    all_features = agg_features.merge(
        graph_features,
        on=['Account', 'TimeUnit'],
        how='inner'
    )

    print(f"\n피쳐 병합 후: {len(all_features)}건")
    print(f"전체 피쳐 수: {len([c for c in all_features.columns if c not in ['Account', 'TimeUnit']])}")

    # 그래프 피쳐 제외 버전
    features_no_graph = agg_features.copy()

    # ========== 3.모델 학습 ==========
    trainer = AMLModelTrainer()

    # Train/Test 분할 (test_size를 크게 해서 테스트 데이터 확보)
    # 그래프 피쳐 포함 버전
    X_train, X_test, y_train, y_test, test_df = trainer.prepare_train_test_split(
        all_features,
        sampled_labels,
        test_size=0.4   # 30% → 40%로 증가
    )
    # 그래프 피쳐 제외 버전
    X_train_no_graph, X_test_no_graph, _, _, _ = trainer.prepare_train_test_split(
        features_no_graph,
        sampled_labels,
        test_size=0.4
    )

    # 데이터가 없으면 중단
    if len(X_train) == 0 or len(X_test) == 0:
        print("\n⚠️ 학습 또는 테스트 데이터가 없습니다. 프로세스를 중단합니다.")
        print("\n문제 해결 방법:")
        print("1. sample_ratio를 1.0으로 설정 (전체 데이터 사용)")
        print("2. 피쳐 생성의 max_samples 제한 제거")
        print("3. 데이터 파일 경로 확인")
        return None

    # Baseline 모델 학습
    print("\n" + "=" * 80)
    print("Baseline 모델 학습")
    print("=" * 80)

    baseline_model, baseline_pred = trainer.train_baseline_model(
        X_train, y_train, X_test, y_test,
        use_smote=False,  # SMOTE 사용 여부
        scale_pos_weight=None  # Auto 계산
    )

    # ========== 5. 평가 ==========
    # Top-K 평가
    topk_results = trainer.evaluate_topk(
        y_test.values,
        y_pred_proba,
        test_df,
        k_values=[10, 20, 50]  # K 값을 작게 조정
    )

    # Feature Importance (XAI)
    if len(X_train) > 0 and len(X_test) > 0:
        feature_names = [c for c in X_train.columns]
        feature_importance = trainer.explain_with_shap(
            baseline_model,
            X_train,
            X_test,
            feature_names
        )
    else:
        feature_importance = None

    # ========== 5. 고도화 실험 실행 ==========
    # 실험 프레임워크 초기화
    exp_framework = ExperimentFramework()

    # ---------- 실험 1: 그래프 피쳐 없음 ----------
    print("\n" + "=" * 80)
    print("실험 1: Baseline (그래프 피쳐 제외)")
    print("=" * 80)

    def train_no_graph(X_tr, y_tr, X_te, y_te):
        from xgboost import XGBClassifier
        scale_pos_weight = (len(y_tr) - y_tr.sum()) / y_tr.sum() if y_tr.sum() > 0 else 1.0
        model = XGBClassifier(
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            eval_metric='logloss'
        )
        model.fit(X_tr, y_tr)
        pred = model.predict_proba(X_te)[:, 1]
        return model, pred

    model_no_graph, pred_no_graph = exp_framework.run_experiment(
        "Baseline (No Graph)",
        train_no_graph,
        X_train_no_graph, y_train,
        X_test_no_graph, y_test
    )

    # ---------- 실험 2: 그래프 피쳐 포함 ----------
    print("\n" + "=" * 80)
    print("실험 2: 그래프 피쳐 포함")
    print("=" * 80)

    def train_with_graph(X_tr, y_tr, X_te, y_te):
        from xgboost import XGBClassifier
        scale_pos_weight = (len(y_tr) - y_tr.sum()) / y_tr.sum() if y_tr.sum() > 0 else 1.0
        model = XGBClassifier(
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            eval_metric='logloss'
        )
        model.fit(X_tr, y_tr)
        pred = model.predict_proba(X_te)[:, 1]
        return model, pred

    model_with_graph, pred_with_graph = exp_framework.run_experiment(
        "With Graph Features",
        train_with_graph,
        X_train, y_train,
        X_test, y_test
    )


    # ========== 6. 결과 저장 ==========
    print("\n다음 단계:")
    print("1. 그래프 피쳐 추가 전후 성능 비교")
    print("1. 시계열 모델 앙상블")
    print("2. GNN 모델 적용")
    print("3. 백테스트")

    return {
        'baseline_model': baseline_model,
        'topk_results': topk_results,
        'feature_importance': feature_importance
    }


# ====================================
# 추가: GNN 모델 (향후 구현)
# ====================================

class GNNModel:
    """Graph Neural Network for AML Detection
    향후 구현 예정:
    - GraphSAGE
    - GAT (Graph Attention Network)
    - Temporal GNN
    """

    def __init__(self):
        print("GNN 모델은 PyTorch Geometric 라이브러리 필요")
        print("pip install torch-geometric")

    def build_model(self):
        """GNN 모델 구축"""
        pass

    def train(self):
        """GNN 학습"""
        pass

"""
AML 탐지 프로젝트 - 고도화 모델 및 성능 비교
1. 시계열 모델 앙상블
2. 그래프 피쳐 추가 전후 비교
3. GNN 모델 적용
4. GNN 아키텍처 고도화
5. 백테스트/시뮬레이션
"""

# ====================================
# 1. 시계열 모델 앙상블
# ====================================

class TimeSeriesEnsemble:
    """시계열 특성을 다루는 모델과 Boosting 모델 앙상블"""

    def __init__(self, base_model, random_state=42):
        self.base_model = base_model  # XGBoost/CatBoost
        self.lstm_model = None
        self.ensemble_weights = {'base': 0.7, 'lstm': 0.3}
        self.random_state = random_state

    def build_lstm_model(self, input_dim, hidden_dim=64):
        """LSTM 기반 시계열 모델 구축"""

        class LSTMAMLDetector(nn.Module):
            def __init__(self, input_dim, hidden_dim):
                super(LSTMAMLDetector, self).__init__()
                self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True, num_layers=2)
                self.dropout = nn.Dropout(0.3)
                self.fc1 = nn.Linear(hidden_dim, 32)
                self.fc2 = nn.Linear(32, 1)

            def forward(self, x):
                # x: (batch, seq_len, features)
                lstm_out, (h_n, c_n) = self.lstm(x)
                # 마지막 시점의 hidden state 사용
                out = self.dropout(h_n[-1])
                out = F.relu(self.fc1(out))
                out = torch.sigmoid(self.fc2(out))
                return out

        return LSTMAMLDetector(input_dim, hidden_dim)

    def prepare_sequence_data(self, X, window_size=5):
        """시계열 윈도우 데이터 준비"""
        sequences = []

        # 계좌별로 시계열 윈도우 생성
        for i in range(len(X) - window_size + 1):
            seq = X[i:i+window_size]
            sequences.append(seq)

        return np.array(sequences)

    def train_lstm(self, X_train, y_train, epochs=20, batch_size=32):
        """LSTM 모델 학습"""
        print("\n시계열 LSTM 모델 학습 중...")

        # 시퀀스 데이터 준비
        X_seq = self.prepare_sequence_data(X_train.values)
        y_seq = y_train.values[len(y_train) - len(X_seq):]

        # PyTorch 텐서 변환
        X_tensor = torch.FloatTensor(X_seq)
        y_tensor = torch.FloatTensor(y_seq).reshape(-1, 1)

        # 모델 초기화
        self.lstm_model = self.build_lstm_model(X_train.shape[1])
        optimizer = torch.optim.Adam(self.lstm_model.parameters(), lr=0.001)
        criterion = nn.BCELoss()

        # 학습
        self.lstm_model.train()
        for epoch in range(epochs):
            for i in range(0, len(X_tensor), batch_size):
                batch_X = X_tensor[i:i+batch_size]
                batch_y = y_tensor[i:i+batch_size]

                optimizer.zero_grad()
                outputs = self.lstm_model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()

            if (epoch + 1) % 5 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

        print("LSTM 학습 완료!")

    def predict_ensemble(self, X_test):
        """앙상블 예측"""
        # Base model 예측
        base_pred = self.base_model.predict_proba(X_test)[:, 1]

        # LSTM 예측
        if self.lstm_model is not None:
            X_seq = self.prepare_sequence_data(X_test.values)
            X_tensor = torch.FloatTensor(X_seq)

            self.lstm_model.eval()
            with torch.no_grad():
                lstm_pred_full = self.lstm_model(X_tensor).numpy().flatten()

            # 길이 맞추기
            lstm_pred = np.zeros(len(X_test))
            lstm_pred[-len(lstm_pred_full):] = lstm_pred_full
            lstm_pred[:len(X_test)-len(lstm_pred_full)] = lstm_pred_full[0]
        else:
            lstm_pred = base_pred

        # 가중 평균 앙상블
        ensemble_pred = (
            self.ensemble_weights['base'] * base_pred +
            self.ensemble_weights['lstm'] * lstm_pred
        )

        return ensemble_pred


# ====================================
# 2. 그래프 피쳐 추가 전후 비교
# ====================================

class GraphFeatureComparison:
    """그래프 피쳐 추가 전후 성능 비교"""

    def __init__(self, model_class):
        self.model_class = model_class
        self.results = {}

    def train_without_graph_features(self, X_train, y_train, X_test, y_test, graph_feature_names):
        """그래프 피쳐 없이 학습"""
        print("\n[실험 1] 그래프 피쳐 제외 모델")

        # 그래프 피쳐 제거
        non_graph_cols = [c for c in X_train.columns if c not in graph_feature_names]
        X_train_no_graph = X_train[non_graph_cols]
        X_test_no_graph = X_test[non_graph_cols]

        print(f"사용 피쳐 수: {len(non_graph_cols)}개")

        # 모델 학습
        model = self.model_class(random_state=42)
        model.fit(X_train_no_graph, y_train)

        # 예측 및 평가
        y_pred_proba = model.predict_proba(X_test_no_graph)[:, 1]
        y_pred = (y_pred_proba > 0.5).astype(int)

        metrics = self._calculate_metrics(y_test, y_pred, y_pred_proba)
        self.results['without_graph'] = metrics

        return model, y_pred_proba

    def train_with_graph_features(self, X_train, y_train, X_test, y_test):
        """그래프 피쳐 포함 학습"""
        print("\n[실험 2] 그래프 피쳐 포함 모델")
        print(f"사용 피쳐 수: {X_train.shape[1]}개")

        # 모델 학습
        model = self.model_class(random_state=42)
        model.fit(X_train, y_train)

        # 예측 및 평가
        y_pred_proba = model.predict_proba(X_test)[:, 1]
        y_pred = (y_pred_proba > 0.5).astype(int)

        metrics = self._calculate_metrics(y_test, y_pred, y_pred_proba)
        self.results['with_graph'] = metrics

        return model, y_pred_proba

    def _calculate_metrics(self, y_true, y_pred, y_pred_proba):
        """성능 지표 계산"""
        return {
            'precision': precision_score(y_true, y_pred, zero_division=0),
            'recall': recall_score(y_true, y_pred, zero_division=0),
            'f1': f1_score(y_true, y_pred, zero_division=0),
            'auc': roc_auc_score(y_true, y_pred_proba) if len(np.unique(y_true)) > 1 else 0
        }

    def compare_results(self):
        """결과 비교 출력"""
        print("\n" + "=" * 80)
        print("그래프 피쳐 추가 효과 비교")
        print("=" * 80)

        df_results = pd.DataFrame(self.results).T
        print(df_results)

        # 개선율 계산
        if 'without_graph' in self.results and 'with_graph' in self.results:
            improvement = {}
            for metric in ['precision', 'recall', 'f1', 'auc']:
                without = self.results['without_graph'][metric]
                with_g = self.results['with_graph'][metric]
                if without > 0:
                    improvement[metric] = ((with_g - without) / without) * 100
                else:
                    improvement[metric] = 0

            print("\n개선율 (%):")
            for metric, value in improvement.items():
                print(f"  {metric}: {value:+.2f}%")

        return df_results


# ====================================
# 3. 간소화된 GNN 모델
# ====================================

class SimpleGNN:
    """PyTorch Geometric 없이 구현한 간소화 GNN"""

    def __init__(self, input_dim, hidden_dim=64, output_dim=1):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.model = self._build_model()

    def _build_model(self):
        """GNN 모델 구축"""

        class SimpleGCN(nn.Module):
            def __init__(self, input_dim, hidden_dim, output_dim):
                super(SimpleGCN, self).__init__()
                self.fc1 = nn.Linear(input_dim, hidden_dim)
                self.fc2 = nn.Linear(hidden_dim, hidden_dim)
                self.fc3 = nn.Linear(hidden_dim, output_dim)
                self.dropout = nn.Dropout(0.3)

            def forward(self, x, adj):
                # x: 노드 피쳐 (N, input_dim)
                # adj: 인접 행렬 (N, N)

                # 첫 번째 GCN 레이어
                x = torch.mm(adj, x)  # 이웃 정보 집계
                x = self.fc1(x)
                x = F.relu(x)
                x = self.dropout(x)

                # 두 번째 GCN 레이어
                x = torch.mm(adj, x)
                x = self.fc2(x)
                x = F.relu(x)
                x = self.dropout(x)

                # 출력 레이어
                x = self.fc3(x)
                x = torch.sigmoid(x)

                return x

        return SimpleGCN(self.input_dim, self.hidden_dim, self.output_dim)

    def build_adjacency_matrix(self, edge_list, num_nodes):
        """인접 행렬 생성"""
        adj = np.zeros((num_nodes, num_nodes))

        for src, dst in edge_list:
            adj[src, dst] = 1
            # adj[dst, src] = 1  # 무방향 그래프인 경우

        # Self-loop 추가
        adj = adj + np.eye(num_nodes)

        # 정규화 (D^-1/2 * A * D^-1/2)
        rowsum = np.array(adj.sum(1))
        d_inv_sqrt = np.power(rowsum, -0.5).flatten()
        d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
        d_mat_inv_sqrt = np.diag(d_inv_sqrt)
        adj_normalized = d_mat_inv_sqrt.dot(adj).dot(d_mat_inv_sqrt)

        return torch.FloatTensor(adj_normalized)

    def train_model(self, X, y, edge_list, epochs=50, lr=0.01):
        """GNN 학습"""
        print("\nGNN 모델 학습 중...")

        # 데이터 준비
        num_nodes = len(X)
        X_tensor = torch.FloatTensor(X)
        y_tensor = torch.FloatTensor(y.values).reshape(-1, 1)
        adj_matrix = self.build_adjacency_matrix(edge_list, num_nodes)

        # 옵티마이저 및 손실함수
        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.BCELoss()

        # 학습
        self.model.train()
        for epoch in range(epochs):
            optimizer.zero_grad()
            outputs = self.model(X_tensor, adj_matrix)
            loss = criterion(outputs, y_tensor)
            loss.backward()
            optimizer.step()

            if (epoch + 1) % 10 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

        print("GNN 학습 완료!")

    def predict(self, X, edge_list):
        """GNN 예측"""
        self.model.eval()

        num_nodes = len(X)
        X_tensor = torch.FloatTensor(X)
        adj_matrix = self.build_adjacency_matrix(edge_list, num_nodes)

        with torch.no_grad():
            outputs = self.model(X_tensor, adj_matrix)

        return outputs.numpy().flatten()


# ====================================
# 4. GNN 아키텍처 고도화
# ====================================

class AdvancedGNN:
    """고도화된 GNN 아키텍처"""

    def __init__(self, input_dim, hidden_dim=64):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

    def build_gnn_rnn_hybrid(self):
        """GNN + RNN 하이브리드 모델"""

        class GNN_RNN_Hybrid(nn.Module):
            def __init__(self, input_dim, hidden_dim):
                super(GNN_RNN_Hybrid, self).__init__()

                # GNN 부분
                self.gcn1 = nn.Linear(input_dim, hidden_dim)
                self.gcn2 = nn.Linear(hidden_dim, hidden_dim)

                # RNN 부분 (시계열)
                self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)

                # 융합 레이어
                self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)
                self.fc2 = nn.Linear(hidden_dim, 1)
                self.dropout = nn.Dropout(0.3)

            def forward(self, x, adj, x_seq=None):
                # GNN 경로
                h_gcn = torch.mm(adj, x)
                h_gcn = F.relu(self.gcn1(h_gcn))
                h_gcn = torch.mm(adj, h_gcn)
                h_gcn = F.relu(self.gcn2(h_gcn))

                # RNN 경로 (시계열이 있는 경우)
                if x_seq is not None:
                    _, (h_rnn, _) = self.lstm(x_seq)
                    h_rnn = h_rnn[-1]
                else:
                    h_rnn = torch.zeros_like(h_gcn)

                # 융합
                h_combined = torch.cat([h_gcn, h_rnn], dim=1)
                h_combined = self.dropout(h_combined)
                h_combined = F.relu(self.fc1(h_combined))
                output = torch.sigmoid(self.fc2(h_combined))

                return output

        return GNN_RNN_Hybrid(self.input_dim, self.hidden_dim)

    def build_temporal_gnn(self):
        """Temporal GNN (TGN)"""

        class TemporalGNN(nn.Module):
            def __init__(self, input_dim, hidden_dim):
                super(TemporalGNN, self).__init__()

                # 시간별 GNN 레이어
                self.temporal_gcn = nn.ModuleList([
                    nn.Linear(input_dim, hidden_dim) for _ in range(3)
                ])

                # Attention
                self.attention = nn.Linear(hidden_dim, 1)

                # 출력
                self.fc = nn.Linear(hidden_dim, 1)

            def forward(self, x_list, adj_list):
                # x_list: 시간 단계별 피쳐 리스트
                # adj_list: 시간 단계별 인접 행렬 리스트

                temporal_embeddings = []

                for t, (x, adj) in enumerate(zip(x_list, adj_list)):
                    if t < len(self.temporal_gcn):
                        h = torch.mm(adj, x)
                        h = F.relu(self.temporal_gcn[t](h))
                        temporal_embeddings.append(h)

                # Temporal attention
                if len(temporal_embeddings) > 0:
                    embeddings_stacked = torch.stack(temporal_embeddings, dim=1)
                    attention_weights = F.softmax(
                        self.attention(embeddings_stacked).squeeze(-1),
                        dim=1
                    )
                    h_final = torch.sum(
                        embeddings_stacked * attention_weights.unsqueeze(-1),
                        dim=1
                    )
                else:
                    h_final = x_list[0]

                output = torch.sigmoid(self.fc(h_final))
                return output

        return TemporalGNN(self.input_dim, self.hidden_dim)

    def build_dual_view_hybrid(self):
        """Dual-View Hybrid (구조적 + 행위적 뷰)"""

        class DualViewHybrid(nn.Module):
            def __init__(self, input_dim, hidden_dim):
                super(DualViewHybrid, self).__init__()

                # 구조적 뷰 (그래프 중심성 등)
                self.structural_encoder = nn.Sequential(
                    nn.Linear(input_dim // 2, hidden_dim),
                    nn.ReLU(),
                    nn.Dropout(0.3)
                )

                # 행위적 뷰 (거래 패턴 등)
                self.behavioral_encoder = nn.Sequential(
                    nn.Linear(input_dim // 2, hidden_dim),
                    nn.ReLU(),
                    nn.Dropout(0.3)
                )

                # Contrastive learning용 projection
                self.projection = nn.Linear(hidden_dim, hidden_dim)

                # 분류기
                self.classifier = nn.Sequential(
                    nn.Linear(hidden_dim * 2, hidden_dim),
                    nn.ReLU(),
                    nn.Linear(hidden_dim, 1),
                    nn.Sigmoid()
                )

            def forward(self, x_structural, x_behavioral):
                # 각 뷰별 인코딩
                h_struct = self.structural_encoder(x_structural)
                h_behav = self.behavioral_encoder(x_behavioral)

                # Projection (대조 학습)
                z_struct = self.projection(h_struct)
                z_behav = self.projection(h_behav)

                # 융합 및 분류
                h_combined = torch.cat([h_struct, h_behav], dim=1)
                output = self.classifier(h_combined)

                return output, z_struct, z_behav

        return DualViewHybrid(self.input_dim, self.hidden_dim)


# ====================================
# 5. 백테스트 및 시뮬레이션
# ====================================

class AMLBacktester:
    """과거 데이터 기반 백테스트 및 시뮬레이션"""

    def __init__(self, model, threshold=0.5):
        self.model = model
        self.threshold = threshold
        self.simulation_results = []

    def run_backtest(self, X_test, y_test, test_df, transaction_data):
        """백테스트 실행"""
        print("\n" + "=" * 80)
        print("백테스트 시뮬레이션")
        print("=" * 80)

        # 예측
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]
        y_pred = (y_pred_proba >= self.threshold).astype(int)

        # 시뮬레이션 결과 저장
        results_df = test_df.copy()
        results_df['predicted_proba'] = y_pred_proba
        results_df['predicted'] = y_pred
        results_df['actual'] = y_test.values

        # 탐지된 케이스 분석
        detected_positive = results_df[
            (results_df['predicted'] == 1) &
            (results_df['actual'] == 1)
        ]

        false_positive = results_df[
            (results_df['predicted'] == 1) &
            (results_df['actual'] == 0)
        ]

        missed_cases = results_df[
            (results_df['predicted'] == 0) &
            (results_df['actual'] == 1)
        ]

        # 금액 계산
        total_laundering_amount = self._calculate_total_amount(
            detected_positive, transaction_data
        )
        prevented_amount = total_laundering_amount
        missed_amount = self._calculate_total_amount(
            missed_cases, transaction_data
        )

        # 결과 출력
        print(f"\n[시뮬레이션 기간]")
        print(f"  시작: {results_df['TimeUnit'].min()}")
        print(f"  종료: {results_df['TimeUnit'].max()}")

        print(f"\n[탐지 결과]")
        print(f"  정탐 (TP): {len(detected_positive)}건")
        print(f"  오탐 (FP): {len(false_positive)}건")
        print(f"  미탐 (FN): {len(missed_cases)}건")

        print(f"\n[금액 분석]")
        print(f"  탐지한 자금세탁 금액: ${prevented_amount:,.2f}")
        print(f"  놓친 자금세탁 금액: ${missed_amount:,.2f}")
        print(f"  탐지율 (금액 기준): {prevented_amount/(prevented_amount+missed_amount)*100:.2f}%")

        # 시계열 분석
        self._plot_temporal_detection(results_df)

        # 상세 케이스 분석
        self._analyze_detection_cases(detected_positive, false_positive, missed_cases)

        return results_df

    def _calculate_total_amount(self, cases_df, transaction_data):
        """케이스들의 총 거래 금액 계산"""
        if len(cases_df) == 0:
            return 0.0

        total = 0.0
        for _, case in cases_df.iterrows():
            account = case['Account']
            time_unit = case['TimeUnit']

            # 해당 계좌의 해당 시간대 거래 금액 합산
            relevant_trans = transaction_data[
                (transaction_data['Account'] == account) &
                (transaction_data['TimeUnit'] == time_unit)
            ]

            if len(relevant_trans) > 0:
                total += relevant_trans['Amount Received'].sum()

        return total

    def _plot_temporal_detection(self, results_df):
        """시계열 탐지 결과 시각화"""
        print("\n[시계열 탐지 패턴]")

        # 시간대별 탐지 현황
        time_detection = results_df.groupby('TimeUnit').agg({
            'actual': 'sum',
            'predicted': 'sum'
        }).reset_index()

        time_detection.columns = ['TimeUnit', 'Actual_Laundering', 'Detected']

        print(f"\n시간대별 탐지 현황 (샘플):")
        print(time_detection.head(10))

    def _analyze_detection_cases(self, detected, false_pos, missed):
        """탐지 케이스 상세 분석"""
        print("\n" + "=" * 80)
        print("케이스별 상세 분석")
        print("=" * 80)

        if len(detected) > 0:
            print(f"\n✅ 성공 탐지 케이스 (샘플 5건):")
            print(detected[['Account', 'TimeUnit', 'predicted_proba']].head())

        if len(false_pos) > 0:
            print(f"\n⚠️ 오탐 케이스 (샘플 5건):")
            print(false_pos[['Account', 'TimeUnit', 'predicted_proba']].head())

        if len(missed) > 0:
            print(f"\n❌ 미탐 케이스 (샘플 5건):")
            print(missed[['Account', 'TimeUnit', 'predicted_proba']].head())


# ====================================
# 6. XAI - SHAP 및 GNN 설명
# ====================================

class ModelExplainer:
    """모델 설명 가능성 분석"""

    def __init__(self, model, feature_names):
        self.model = model
        self.feature_names = feature_names

    def explain_with_shap(self, X_train, X_test):
        """SHAP 기반 피쳐 중요도"""
        print("\n" + "=" * 80)
        print("SHAP 피쳐 중요도 분석")
        print("=" * 80)

        try:
            import shap

            # Tree explainer
            explainer = shap.TreeExplainer(self.model)
            shap_values = explainer.shap_values(X_test.iloc[:100])

            # 피쳐 중요도
            feature_importance = pd.DataFrame({
                'feature': self.feature_names,
                'importance': np.abs(shap_values).mean(axis=0)
            }).sort_values('importance', ascending=False)

            print("\n상위 20개 중요 피쳐:")
            print(feature_importance.head(20))

            # 그래프 피쳐 vs 비그래프 피쳐 비교
            graph_keywords = ['centrality', 'degree', 'pagerank', 'betweenness',
                            'successors', 'predecessors', 'weight']

            feature_importance['is_graph'] = feature_importance['feature'].apply(
                lambda x: any(kw in x.lower() for kw in graph_keywords)
            )

            graph_importance = feature_importance[feature_importance['is_graph']]['importance'].sum()
            non_graph_importance = feature_importance[~feature_importance['is_graph']]['importance'].sum()

            print(f"\n그래프 피쳐 총 중요도: {graph_importance:.4f}")
            print(f"비그래프 피쳐 총 중요도: {non_graph_importance:.4f}")
            print(f"그래프 피쳐 기여도: {graph_importance/(graph_importance+non_graph_importance)*100:.2f}%")

            return feature_importance

        except ImportError:
            print("⚠️ SHAP 라이브러리가 설치되지 않았습니다.")
            return None

    def explain_gnn_attention(self, gnn_model, node_features, adj_matrix):
        """GNN attention 가중치 분석"""
        print("\n" + "=" * 80)
        print("GNN Attention 분석")
        print("=" * 80)

        # Attention 가중치 추출 (모델에 attention이 있는 경우)
        try:
            # 간단한 예시: 각 노드의 이웃에 대한 attention
            # 실제로는 모델 구조에 따라 다름

            attention_weights = torch.softmax(
                torch.mm(node_features, node_features.t()),
                dim=1
            )

            # 각 노드별 주요 이웃 분석
            top_k = 5
            for node_idx in range(min(10, len(node_features))):
                top_neighbors = torch.topk(attention_weights[node_idx], top_k)
                print(f"\n노드 {node_idx}의 주요 이웃 (Top {top_k}):")
                for i, (weight, neighbor) in enumerate(zip(top_neighbors.values, top_neighbors.indices)):
                    print(f"  {i+1}. 노드 {neighbor.item()}: {weight.item():.4f}")

        except Exception as e:
            print(f"⚠️ GNN Attention 분석 실패: {e}")


# ====================================
# 7. 통합 실험 프레임워크
# ====================================

class ExperimentFramework:
    """전체 실험 관리 및 비교"""

    def __init__(self):
        self.experiments = {}
        self.results_summary = []

    def run_experiment(self, exp_name, train_fn, X_train, y_train, X_test, y_test):
        """실험 실행"""
        print(f"\n{'='*80}")
        print(f"실험: {exp_name}")
        print(f"{'='*80}")

        # 학습 및 예측
        model, y_pred_proba = train_fn(X_train, y_train, X_test, y_test)

        # 평가
        y_pred = (y_pred_proba >= 0.5).astype(int)

        metrics = {
            'experiment': exp_name,
            'precision': precision_score(y_test, y_pred, zero_division=0),
            'recall': recall_score(y_test, y_pred, zero_division=0),
            'f1': f1_score(y_test, y_pred, zero_division=0),
            'auc': roc_auc_score(y_test, y_pred_proba) if len(np.unique(y_test)) > 1 else 0
        }

        self.experiments[exp_name] = {
            'model': model,
            'predictions': y_pred_proba,
            'metrics': metrics
        }

        self.results_summary.append(metrics)

        print(f"\n[결과]")
        for metric, value in metrics.items():
            if metric != 'experiment':
                print(f"  {metric}: {value:.4f}")

        return model, y_pred_proba

    def compare_all_experiments(self):
        """모든 실험 결과 비교"""
        print("\n" + "=" * 80)
        print("전체 실험 결과 비교")
        print("=" * 80)

        df_results = pd.DataFrame(self.results_summary)
        df_results = df_results.set_index('experiment')

        print("\n", df_results)

        # 최고 성능 모델
        best_f1_exp = df_results['f1'].idxmax()
        print(f"\n🏆 최고 F1 Score: {best_f1_exp} ({df_results.loc[best_f1_exp, 'f1']:.4f})")

        # 시각화
        self._plot_comparison(df_results)

        return df_results

    def _plot_comparison(self, df_results):
        """결과 비교 시각화"""
        print("\n[성능 비교 차트]")
        print("(실제 환경에서는 matplotlib으로 시각화)")

        # 각 지표별 비교
        for metric in ['precision', 'recall', 'f1', 'auc']:
            print(f"\n{metric.upper()}:")
            for exp in df_results.index:
                value = df_results.loc[exp, metric]
                bar = '█' * int(value * 50)
                print(f"  {exp:30s} {bar} {value:.4f}")


# ====================================
# 8. 메인 실행 함수
# ====================================

def run_all_experiments(X_train, y_train, X_test, y_test,
                       X_train_no_graph, X_test_no_graph,
                       baseline_model, transaction_data, test_df):
    """전체 실험 실행"""

    framework = ExperimentFramework()

    # ========== 실험 1: Baseline (그래프 피쳐 없음) ==========
    def exp1_baseline(X_tr, y_tr, X_te, y_te):
        from xgboost import XGBClassifier
        model = XGBClassifier(random_state=42, scale_pos_weight=10)
        model.fit(X_tr, y_tr)
        pred = model.predict_proba(X_te)[:, 1]
        return model, pred

    framework.run_experiment(
        "1. Baseline (No Graph Features)",
        exp1_baseline,
        X_train_no_graph, y_train,
        X_test_no_graph, y_test
    )

    # ========== 실험 2: 그래프 피쳐 추가 ==========
    def exp2_with_graph(X_tr, y_tr, X_te, y_te):
        from xgboost import XGBClassifier
        model = XGBClassifier(random_state=42, scale_pos_weight=10)
        model.fit(X_tr, y_tr)
        pred = model.predict_proba(X_te)[:, 1]
        return model, pred

    framework.run_experiment(
        "2. With Graph Features",
        exp2_with_graph,
        X_train, y_train,
        X_test, y_test
    )

    # ========== 실험 3: 시계열 앙상블 ==========
    print("\n⚠️ 시계열 앙상블 실험은 충분한 시계열 데이터가 필요합니다.")
    # 실제 구현 시 활성화

    # ========== 실험 4: 간소화 GNN ==========
    print("\n⚠️ GNN 실험은 그래프 구조 데이터가 필요합니다.")
    # 실제 구현 시 활성화

    # ========== 실험 결과 비교 ==========
    results_df = framework.compare_all_experiments()

    # ========== 백테스트 ==========
    best_model = framework.experiments["2. With Graph Features"]['model']
    backtester = AMLBacktester(best_model)
    backtest_results = backtester.run_backtest(
        X_test, y_test, test_df, transaction_data
    )

    # ========== XAI 분석 ==========
    explainer = ModelExplainer(best_model, X_train.columns.tolist())
    feature_importance = explainer.explain_with_shap(X_train, X_test)

    return framework, backtest_results, feature_importance


print("=" * 80)
print("AML 고도화 모델 및 실험 프레임워크 로드 완료")
print("=" * 80)


# ====================================
# 8. 고도화 실험 실행 파이프라인
# ====================================

def main_with_experiments():
    """전체 파이프라인 + 고도화 실험 실행"""

    print("=" * 80)
    print("금융결제원 AML 탐지 프로젝트 - 고도화 실험")
    print("=" * 80)

    # ========== 1. 기본 데이터 로드 및 전처리 ==========
    loader = AMLDataLoader(
        trans_path=trans_path,
        accounts_path=accounts_path
    )

    df_trans, df_accounts = loader.load_data()
    df_trans = loader.explore_data(df_trans, df_accounts)

    # 샘플링 비율을 높여서 더 많은 데이터 확보
    preprocessor = AMLPreprocessor(sample_ratio=0.3)    # 10% → 30%로 증가

    # 시간 단위 배치 샘플 생성
    all_trans, account_time_labels = preprocessor.create_hourly_samples(df_trans)
    sampled_labels = preprocessor.stratified_sample(account_time_labels)

    print(f"\n전체 계좌-시간 샘플: {len(account_time_labels)}건")
    print(f"자금세탁 샘플: {account_time_labels['Is Laundering'].sum()}건")

    # 자금세탁 패턴 분석
    laundering, normal = preprocessor.analyze_laundering_patterns(df_trans)

    # 샘플링 (큰 데이터를 작게)
    sampled_labels = preprocessor.stratified_sample(account_time_labels)

    print(f"\n샘플링 후:")
    print(f"  총 샘플: {len(sampled_labels)}건")
    print(f"  자금세탁: {sampled_labels['Is Laundering'].sum()}건")
    print(f"  자금세탁 비율: {sampled_labels['Is Laundering'].mean():.4%}")

    # 자금세탁 건이 너무 적으면 경고
    if sampled_labels['Is Laundering'].sum() < 10:
        print("\n⚠️ 경고: 자금세탁 샘플이 너무 적습니다!")
        print("sample_ratio를 더 높이거나 전체 데이터를 사용하세요.")
        print("일단 전체 데이터로 진행합니다...")
        sampled_labels = account_time_labels

    # ========== 2. 피쳐 생성 ==========
    feature_engineer = FeatureEngineer()

    # 집계 피쳐 생성 (더 많은 샘플 처리)
    print("\n⚠️ 주의: 피쳐 생성에 시간이 걸릴 수 있습니다.")
    print(f"최대 {min(2000, len(sampled_labels))}건 처리")

    # 집계 피쳐 (그래프 피쳐 제외)
    agg_features = feature_engineer.create_aggregation_features(
        all_trans,
        sampled_labels
    )

    print(f"\n집계 피쳐 생성 완료: {len(agg_features)}건")

    # 그래프 피쳐 생성
    graph_features = feature_engineer.create_graph_features(
        all_trans,
        sampled_labels.head(len(agg_features))    # 집계 피쳐와 동일한 수만큼
    )

    print(f"그래프 피쳐 생성 완료: {len(graph_features)}건")

    # 그래프 피쳐명 추출
    graph_feature_names = [c for c in graph_features.columns
                          if c not in ['Account', 'TimeUnit']]

    print(f"\n그래프 피쳐 수: {len(graph_feature_names)}개")
    print(f"그래프 피쳐: {graph_feature_names}")

    # 피쳐 병합 (그래프 포함)
    all_features = agg_features.merge(
        graph_features,
        on=['Account', 'TimeUnit'],
        how='inner'
    )

    print(f"\n피쳐 병합 후: {len(all_features)}건")
    print(f"전체 피쳐 수: {len([c for c in all_features.columns if c not in ['Account', 'TimeUnit']])}")

    # 그래프 피쳐 제외 버전
    features_no_graph = agg_features.copy()

    # ========== 3.모델 학습 ==========
    trainer = AMLModelTrainer()

    # Train/Test 분할 (test_size를 크게 해서 테스트 데이터 확보)
    # 그래프 피쳐 포함 버전
    X_train, X_test, y_train, y_test, test_df = trainer.prepare_train_test_split(
        all_features,
        sampled_labels,
        test_size=0.4   # 30% → 40%로 증가
    )
    # 그래프 피쳐 제외 버전
    X_train_no_graph, X_test_no_graph, _, _, _ = trainer.prepare_train_test_split(
        features_no_graph,
        sampled_labels,
        test_size=0.4
    )

    # 데이터가 없으면 중단
    if len(X_train) == 0 or len(X_test) == 0:
        print("\n⚠️ 학습 또는 테스트 데이터가 없습니다. 프로세스를 중단합니다.")
        print("\n문제 해결 방법:")
        print("1. sample_ratio를 1.0으로 설정 (전체 데이터 사용)")
        print("2. 피쳐 생성의 max_samples 제한 제거")
        print("3. 데이터 파일 경로 확인")
        return None

    # Baseline 모델 학습
    print("\n" + "=" * 80)
    print("Baseline 모델 학습")
    print("=" * 80)

    baseline_model, baseline_pred = trainer.train_baseline_model(
        X_train, y_train, X_test, y_test,
        use_smote=False,  # SMOTE 사용 여부
        scale_pos_weight=None  # Auto 계산
    )

    # ========== 5. 평가 ==========
    # Top-K 평가
    topk_results = trainer.evaluate_topk(
        y_test.values,
        y_pred_proba,
        test_df,
        k_values=[10, 20, 50]  # K 값을 작게 조정
    )

    # Feature Importance (XAI)
    if len(X_train) > 0 and len(X_test) > 0:
        feature_names = [c for c in X_train.columns]
        feature_importance = trainer.explain_with_shap(
            baseline_model,
            X_train,
            X_test,
            feature_names
        )
    else:
        feature_importance = None

    # ========== 5. 고도화 실험 실행 ==========

    # 실험 프레임워크 초기화
    exp_framework = ExperimentFramework()

    # ---------- 실험 1: 그래프 피쳐 없음 ----------
    print("\n" + "=" * 80)
    print("실험 1: Baseline (그래프 피쳐 제외)")
    print("=" * 80)

    def train_no_graph(X_tr, y_tr, X_te, y_te):
        scale_pos_weight = (len(y_tr) - y_tr.sum()) / y_tr.sum() if y_tr.sum() > 0 else 1.0
        model = XGBClassifier(
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            eval_metric='logloss'
        )
        model.fit(X_tr, y_tr)
        pred = model.predict_proba(X_te)[:, 1]
        return model, pred

    model_no_graph, pred_no_graph = exp_framework.run_experiment(
        "Baseline (No Graph)",
        train_no_graph,
        X_train_no_graph, y_train,
        X_test_no_graph, y_test
    )

    # ---------- 실험 2: 그래프 피쳐 포함 ----------
    print("\n" + "=" * 80)
    print("실험 2: 그래프 피쳐 포함")
    print("=" * 80)

    def train_with_graph(X_tr, y_tr, X_te, y_te):
        scale_pos_weight = (len(y_tr) - y_tr.sum()) / y_tr.sum() if y_tr.sum() > 0 else 1.0
        model = XGBClassifier(
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            scale_pos_weight=scale_pos_weight,
            random_state=42,
            eval_metric='logloss'
        )
        model.fit(X_tr, y_tr)
        pred = model.predict_proba(X_te)[:, 1]
        return model, pred

    model_with_graph, pred_with_graph = exp_framework.run_experiment(
        "With Graph Features",
        train_with_graph,
        X_train, y_train,
        X_test, y_test
    )

    # ---------- 실험 3: 시계열 앙상블 ----------
    if len(X_train) >= 10:  # 최소 데이터 요구사항
        print("\n" + "=" * 80)
        print("실험 3: 시계열 앙상블")
        print("=" * 80)

        try:
            ensemble = TimeSeriesEnsemble(model_with_graph)
            ensemble.train_lstm(X_train, y_train, epochs=10)
            pred_ensemble = ensemble.predict_ensemble(X_test)

            # 수동으로 메트릭 계산
            y_pred_ensemble = (pred_ensemble >= 0.5).astype(int)
            from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

            metrics_ensemble = {
                'experiment': 'Time Series Ensemble',
                'precision': precision_score(y_test, y_pred_ensemble, zero_division=0),
                'recall': recall_score(y_test, y_pred_ensemble, zero_division=0),
                'f1': f1_score(y_test, y_pred_ensemble, zero_division=0),
                'auc': roc_auc_score(y_test, pred_ensemble) if len(np.unique(y_test)) > 1 else 0
            }

            exp_framework.results_summary.append(metrics_ensemble)

            print(f"\n[시계열 앙상블 결과]")
            for metric, value in metrics_ensemble.items():
                if metric != 'experiment':
                    print(f"  {metric}: {value:.4f}")
        except Exception as e:
            print(f"⚠️ 시계열 앙상블 실패: {e}")
    else:
        print("\n⚠️ 시계열 앙상블: 데이터 부족으로 스킵")

    # ---------- 실험 4: 간소화 GNN ----------
    print("\n" + "=" * 80)
    print("실험 4: 간소화 GNN")
    print("=" * 80)
    print("⚠️ GNN 실험은 에지 리스트 생성이 필요합니다.")
    print("현재는 스킵하고, 필요 시 활성화할 수 있습니다.")

    # GNN 실험 예시 (실제 구현 시)
    # from aml_advanced_models import SimpleGNN
    # gnn = SimpleGNN(input_dim=X_train.shape[1])
    # edge_list = [(0, 1), (1, 2), ...]  # 거래 그래프 엣지
    # gnn.train_model(X_train.values, y_train, edge_list)
    # pred_gnn = gnn.predict(X_test.values, edge_list)

    # ========== 6. 전체 실험 결과 비교 ==========
    results_df = exp_framework.compare_all_experiments()

    # ========== 7. 백테스트 시뮬레이션 ==========
    print("\n" + "=" * 80)
    print("백테스트 시뮬레이션")
    print("=" * 80)

    best_model = model_with_graph
    backtester = AMLBacktester(best_model, threshold=0.5)

    backtest_results = backtester.run_backtest(
        X_test, y_test, test_df, all_trans
    )

    # ========== 8. XAI 분석 ==========
    print("\n" + "=" * 80)
    print("XAI - 모델 설명 가능성 분석")
    print("=" * 80)

    explainer = ModelExplainer(best_model, X_train.columns.tolist())
    feature_importance = explainer.explain_with_shap(X_train, X_test)

    # ========== 9. 최종 요약 ==========
    print("\n" + "=" * 80)
    print("최종 요약")
    print("=" * 80)

    print("\n[실험 결과 요약]")
    print(results_df)

    if feature_importance is not None:
        print("\n[Top 10 중요 피쳐]")
        print(feature_importance.head(10))

    print("\n[권장 사항]")
    best_exp = results_df['f1'].idxmax()
    print(f"  1. 최고 성능 모델: {best_exp}")
    print(f"     - F1 Score: {results_df.loc[best_exp, 'f1']:.4f}")
    print(f"  2. 그래프 피쳐의 효과:")

    if 'With Graph Features' in results_df.index and 'Baseline (No Graph)' in results_df.index:
        f1_with = results_df.loc['With Graph Features', 'f1']
        f1_without = results_df.loc['Baseline (No Graph)', 'f1']
        improvement = ((f1_with - f1_without) / f1_without * 100) if f1_without > 0 else 0
        print(f"     - F1 Score 개선: {improvement:+.2f}%")

    print(f"  3. 백테스트 결과:")
    if backtest_results is not None and len(backtest_results) > 0:
        detected_count = len(backtest_results[
            (backtest_results['predicted'] == 1) &
            (backtest_results['actual'] == 1)
        ])
        total_laundering = backtest_results['actual'].sum()
        print(f"     - 탐지율: {detected_count}/{total_laundering} "
              f"({detected_count/total_laundering*100:.2f}%)" if total_laundering > 0 else "N/A")

    return {
        'experiment_framework': exp_framework,
        'results': results_df,
        'backtest': backtest_results,
        'feature_importance': feature_importance,
        'best_model': best_model
    }

AML 고도화 모델 및 실험 프레임워크 로드 완료


In [5]:
if __name__ == "__main__":
    # 실행
    results = main()

금융결제원 AML 탐지 프로젝트 시작
데이터 로딩 중...
거래 데이터 shape: (5078345, 11)
계좌 데이터 shape: (518581, 5)

데이터 탐색

[거래 데이터 샘플]
          Timestamp  From Bank    Account  To Bank  Account.1  \
0  2022/09/01 00:20         10  8000EBD30       10  8000EBD30   
1  2022/09/01 00:20       3208  8000F4580        1  8000F5340   
2  2022/09/01 00:00       3209  8000F4670     3209  8000F4670   
3  2022/09/01 00:02         12  8000F5030       12  8000F5030   
4  2022/09/01 00:06         10  8000F5200       10  8000F5200   

   Amount Received Receiving Currency  Amount Paid Payment Currency  \
0          3697.34          US Dollar      3697.34        US Dollar   
1             0.01          US Dollar         0.01        US Dollar   
2         14675.57          US Dollar     14675.57        US Dollar   
3          2806.97          US Dollar      2806.97        US Dollar   
4         36682.97          US Dollar     36682.97        US Dollar   

  Payment Format  Is Laundering  
0   Reinvestment              0  
1     

ModuleNotFoundError: No module named 'aml_detection_pipeline'

In [None]:
if __name__ == "__main__":
    print("=" * 80)
    print("AML 탐지 프로젝트 - 고도화 실험 시작")
    print("=" * 80)
    print("\n실행 방법:")
    print("  results = main_with_experiments()")
    print("\n포함된 실험:")
    print("  1. Baseline (그래프 피쳐 제외)")
    print("  2. 그래프 피쳐 추가")
    print("  3. 시계열 앙상블")
    print("  4. 간소화 GNN")
    print("  5. 백테스트 시뮬레이션")
    print("  6. XAI 분석 (SHAP)")
    print("=" * 80)

    # 실행
    results = main_with_experiments()