# Enable Agent Tutorial Part 2: FDS Enable Agent 구현

## 개요

Part 1에서 학습한 이상치 탐지 모델(Isolation Forest + Autoencoder)을 Enable Agent로 변환한다.

## 학습 내용

- YAML 기반 스킬 정의 파일 작성
- FDSEnableAgent 클래스 구현
- 위험도 등급 시스템 (LOW/MEDIUM/HIGH/CRITICAL)
- OpenAI Function Calling 통합

---
## 1. 라이브러리 임포트

In [1]:
import os
import json
import yaml
import joblib
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, Tuple, List

import torch
import torch.nn as nn

from dotenv import load_dotenv
from openai import OpenAI

load_dotenv()
print("라이브러리 임포트 완료")

라이브러리 임포트 완료


---
## 2. YAML 스킬 파일 생성

In [2]:
skill_definition = {
    'agent_name': 'FraudDetectionAgent',
    'version': '1.0.0',
    'description': '거래 데이터를 분석하여 사기 가능성을 탐지하는 이상치 탐지 에이전트',
    
    'capabilities': {
        'primary': '거래의 사기 가능성을 실시간으로 탐지하고 위험도를 평가한다',
        'secondary': [
            '앙상블 이상치 점수 산출',
            '위험도 등급 분류 (LOW/MEDIUM/HIGH/CRITICAL)',
            '의심 요인 분석',
            'AI 기반 탐지 설명 생성'
        ]
    },
    
    'input_schema': {
        'type': 'object',
        'properties': {
            'purchase_value': {'type': 'number', 'description': '구매 금액', 'minimum': 0},
            'age': {'type': 'integer', 'description': '사용자 나이', 'minimum': 0, 'maximum': 120},
            'source': {'type': 'string', 'description': '유입 경로', 'enum': ['SEO', 'Ads', 'Direct']},
            'browser': {'type': 'string', 'description': '브라우저', 'enum': ['Chrome', 'Safari', 'FireFox', 'IE', 'Opera']},
            'sex': {'type': 'string', 'description': '성별', 'enum': ['M', 'F']},
            'signup_time': {'type': 'string', 'description': '가입 시간 (ISO format)'},
            'purchase_time': {'type': 'string', 'description': '구매 시간 (ISO format)'}
        },
        'required': ['purchase_value', 'age', 'source', 'browser', 'sex', 'signup_time', 'purchase_time']
    },
    
    'output_schema': {
        'type': 'object',
        'properties': {
            'is_fraud': {'type': 'boolean', 'description': '사기 여부'},
            'risk_level': {'type': 'string', 'description': '위험도 등급'},
            'anomaly_score': {'type': 'number', 'description': '이상치 점수 (0-1)'},
            'risk_factors': {'type': 'array', 'description': '위험 요인 목록'}
        }
    },
    
    'model_info': {
        'isolation_forest_path': 'models/isolation_forest.pkl',
        'autoencoder_path': 'models/autoencoder.pth',
        'scaler_path': 'models/fds_scaler.pkl',
        'label_encoders_path': 'models/label_encoders.pkl',
        'metadata_path': 'models/fds_metadata.json'
    },
    
    'risk_thresholds': {
        'low': 0.3,
        'medium': 0.5,
        'high': 0.7,
        'critical': 0.85
    }
}

Path('skills').mkdir(exist_ok=True)
skill_path = 'skills/fds_agent_skill.yaml'

with open(skill_path, 'w', encoding='utf-8') as f:
    yaml.dump(skill_definition, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

print(f"스킬 파일 생성 완료: {skill_path}")

스킬 파일 생성 완료: skills/fds_agent_skill.yaml


---
## 3. FDSEnableAgent 클래스 구현

In [3]:
class FraudAutoencoder(nn.Module):
    """이상치 탐지용 Autoencoder"""
    
    def __init__(self, input_dim=10, latent_dim=4):
        super(FraudAutoencoder, self).__init__()
        
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 8),
            nn.ReLU(),
            nn.Linear(8, latent_dim)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 8),
            nn.ReLU(),
            nn.Linear(8, 16),
            nn.ReLU(),
            nn.Linear(16, input_dim)
        )
    
    def forward(self, x):
        latent = self.encoder(x)
        reconstructed = self.decoder(latent)
        return reconstructed
    
    def get_reconstruction_error(self, x):
        with torch.no_grad():
            reconstructed = self.forward(x)
            error = torch.mean((x - reconstructed) ** 2, dim=1)
        return error

print("FraudAutoencoder 클래스 정의 완료")

FraudAutoencoder 클래스 정의 완료


In [4]:
class FDSEnableAgent:
    """이상치 탐지 기반 사기 탐지 Enable Agent"""
    
    def __init__(self, skill_path: str):
        with open(skill_path, 'r', encoding='utf-8') as f:
            self.skill = yaml.safe_load(f)
        
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        self.iso_forest = joblib.load(self.skill['model_info']['isolation_forest_path'])
        
        checkpoint = torch.load(
            self.skill['model_info']['autoencoder_path'],
            map_location=self.device,
            weights_only=True
        )
        self.autoencoder = FraudAutoencoder(
            input_dim=checkpoint['input_dim'],
            latent_dim=checkpoint['latent_dim']
        ).to(self.device)
        self.autoencoder.load_state_dict(checkpoint['model_state_dict'])
        self.autoencoder.eval()
        self.ae_threshold = checkpoint['threshold']
        
        self.scaler = joblib.load(self.skill['model_info']['scaler_path'])
        self.label_encoders = joblib.load(self.skill['model_info']['label_encoders_path'])
        
        with open(self.skill['model_info']['metadata_path'], 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)
        
        self.risk_thresholds = self.skill['risk_thresholds']
        self.client = OpenAI()
        
        print(f"FDS Enable Agent 초기화 완료: {self.skill['agent_name']}")
    
    def get_capability_description(self) -> str:
        return f"""
Agent: {self.skill['agent_name']} v{self.skill['version']}
설명: {self.skill['description']}
주요 기능: {self.skill['capabilities']['primary']}
모델 AUC-ROC: {self.metadata['test_auc_roc']:.4f}
""".strip()
    
    def _engineer_features(self, input_data: Dict[str, Any]) -> Tuple[np.ndarray, Dict[str, Any]]:
        signup_time = pd.to_datetime(input_data['signup_time'])
        purchase_time = pd.to_datetime(input_data['purchase_time'])
        
        time_diff_hours = (purchase_time - signup_time).total_seconds() / 3600
        signup_hour = signup_time.hour
        purchase_hour = purchase_time.hour
        is_weekend = 1 if purchase_time.dayofweek in [5, 6] else 0
        is_night = 1 if 0 <= purchase_hour < 6 else 0
        
        source_encoded = self.label_encoders['source'].transform([input_data['source']])[0]
        browser_encoded = self.label_encoders['browser'].transform([input_data['browser']])[0]
        sex_encoded = self.label_encoders['sex'].transform([input_data['sex']])[0]
        
        features = np.array([[
            input_data['purchase_value'], input_data['age'], time_diff_hours,
            signup_hour, purchase_hour, is_weekend, is_night,
            source_encoded, browser_encoded, sex_encoded
        ]])
        
        derived = {
            'time_diff_hours': time_diff_hours,
            'signup_hour': signup_hour,
            'purchase_hour': purchase_hour,
            'is_weekend': is_weekend,
            'is_night': is_night
        }
        
        return features, derived
    
    def _calculate_risk_factors(self, input_data: Dict, derived: Dict, score: float) -> List[str]:
        factors = []
        if derived['time_diff_hours'] < 1:
            factors.append("매우 빠른 구매 (가입 후 1시간 이내)")
        elif derived['time_diff_hours'] < 24:
            factors.append("빠른 구매 (가입 후 24시간 이내)")
        if derived['is_night']:
            factors.append("심야 시간대 거래 (00:00-06:00)")
        if input_data['purchase_value'] > 100:
            factors.append(f"고액 거래 (${input_data['purchase_value']})")
        if input_data['age'] < 20 or input_data['age'] > 60:
            factors.append(f"비일반적 연령대 ({input_data['age']}세)")
        if input_data['source'] == 'Direct':
            factors.append("직접 유입 (Direct)")
        if not factors and score > 0.3:
            factors.append("모델이 탐지한 이상 패턴")
        return factors
    
    def _get_risk_level(self, score: float) -> str:
        if score >= self.risk_thresholds['critical']:
            return 'CRITICAL'
        elif score >= self.risk_thresholds['high']:
            return 'HIGH'
        elif score >= self.risk_thresholds['medium']:
            return 'MEDIUM'
        return 'LOW'
    
    def analyze(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        features, derived = self._engineer_features(input_data)
        features_scaled = self.scaler.transform(features)
        
        iso_score = -self.iso_forest.score_samples(features_scaled)[0]
        
        features_tensor = torch.FloatTensor(features_scaled).to(self.device)
        ae_score = self.autoencoder.get_reconstruction_error(features_tensor).cpu().numpy()[0]
        
        iso_norm = 1 / (1 + np.exp(-2 * (iso_score - 0.5)))
        ae_norm = 1 / (1 + np.exp(-2 * (ae_score / self.ae_threshold - 1)))
        anomaly_score = (iso_norm + ae_norm) / 2
        
        is_fraud = anomaly_score >= self.metadata['ensemble_threshold']
        risk_level = self._get_risk_level(anomaly_score)
        risk_factors = self._calculate_risk_factors(input_data, derived, anomaly_score)
        
        return {
            "is_fraud": bool(is_fraud),
            "risk_level": risk_level,
            "anomaly_score": float(anomaly_score),
            "isolation_forest_score": float(iso_norm),
            "autoencoder_score": float(ae_norm),
            "risk_factors": risk_factors,
            "input_data": input_data,
            "derived_features": derived,
            "timestamp": datetime.now().isoformat()
        }
    
    def generate_explanation(self, result: Dict[str, Any]) -> str:
        prompt = f"""
FDS 분석 결과를 설명해달라.

거래: ${result['input_data']['purchase_value']}, {result['input_data']['age']}세, {result['input_data']['source']}
가입-구매 시간차: {result['derived_features']['time_diff_hours']:.1f}시간
결과: {'사기 의심' if result['is_fraud'] else '정상'}, 위험도: {result['risk_level']}
점수: {result['anomaly_score']:.2%}
위험 요인: {', '.join(result['risk_factors']) if result['risk_factors'] else '없음'}

2-3문장으로 설명해달라. 문장은 ~다로 끝낸다.
"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=200
        )
        return response.choices[0].message.content
    
    def generate_tool_definition(self) -> Dict[str, Any]:
        return {
            "type": "function",
            "function": {
                "name": "analyze_transaction",
                "description": self.skill['description'],
                "parameters": {
                    "type": "object",
                    "properties": {
                        "purchase_value": {"type": "number", "description": "구매 금액"},
                        "age": {"type": "integer", "description": "나이"},
                        "source": {"type": "string", "enum": ["SEO", "Ads", "Direct"]},
                        "browser": {"type": "string", "enum": ["Chrome", "Safari", "FireFox", "IE", "Opera"]},
                        "sex": {"type": "string", "enum": ["M", "F"]},
                        "signup_time": {"type": "string"},
                        "purchase_time": {"type": "string"}
                    },
                    "required": ["purchase_value", "age", "source", "browser", "sex", "signup_time", "purchase_time"]
                }
            }
        }

print("FDSEnableAgent 클래스 정의 완료")

FDSEnableAgent 클래스 정의 완료


---
## 4. Enable Agent 테스트

In [5]:
agent = FDSEnableAgent('skills/fds_agent_skill.yaml')
print(agent.get_capability_description())

FDS Enable Agent 초기화 완료: FraudDetectionAgent
Agent: FraudDetectionAgent v1.0.0
설명: 거래 데이터를 분석하여 사기 가능성을 탐지하는 이상치 탐지 에이전트
주요 기능: 거래의 사기 가능성을 실시간으로 탐지하고 위험도를 평가한다
모델 AUC-ROC: 0.5725


In [6]:
# 정상 거래
normal_tx = {
    'purchase_value': 45, 'age': 35, 'source': 'SEO', 'browser': 'Chrome', 'sex': 'M',
    'signup_time': '2024-01-15T10:30:00', 'purchase_time': '2024-02-20T14:25:00'
}
result_normal = agent.analyze(normal_tx)
print("=== 정상 거래 ===")
print(f"위험도: {result_normal['risk_level']}, 점수: {result_normal['anomaly_score']:.2%}")
print(f"사기: {result_normal['is_fraud']}")

=== 정상 거래 ===
위험도: LOW, 점수: 34.07%
사기: False


In [7]:
# 의심 거래
suspicious_tx = {
    'purchase_value': 150, 'age': 19, 'source': 'Direct', 'browser': 'Opera', 'sex': 'M',
    'signup_time': '2024-03-10T02:30:00', 'purchase_time': '2024-03-10T02:45:00'
}
result_suspicious = agent.analyze(suspicious_tx)
print("=== 의심 거래 ===")
print(f"위험도: {result_suspicious['risk_level']}, 점수: {result_suspicious['anomaly_score']:.2%}")
print(f"위험 요인: {result_suspicious['risk_factors']}")

=== 의심 거래 ===
위험도: HIGH, 점수: 78.80%
위험 요인: ['매우 빠른 구매 (가입 후 1시간 이내)', '심야 시간대 거래 (00:00-06:00)', '고액 거래 ($150)', '비일반적 연령대 (19세)', '직접 유입 (Direct)']


In [8]:
print("=== AI 설명 ===")
explanation = agent.generate_explanation(result_suspicious)
print(explanation)

=== AI 설명 ===
FDS 분석 결과, 해당 거래는 사기 의심으로 분류되었으며, 위험도가 높다고 평가되었다. 위험 요인으로는 가입 후 1시간 이내의 매우 빠른 구매, 심야 시간대 거래, 고액 거래, 비일반적 연령대, 그리고 직접 유입이 포함되었다. 이 모든 요소들이 결합되어 점수가 78.80%로 나타났다.
