# FDC 이상감지 알고리즘 추천 Agent

FDC(Fault Detection and Classification)에서 1초 단위로 수집되는 PV(Process Variable) 데이터를 분석하여 최적의 이상감지 알고리즘을 자동으로 추천합니다.

## 주요 기능
1. **데이터 로딩 및 전처리**: CSV 형태의 PV 데이터와 이상 발생 시점 정보 로딩
2. **탐색적 데이터 분석 (EDA)**: 데이터 특성 자동 분석
3. **특성 공학**: 이상감지에 유용한 특성 자동 추출
4. **알고리즘 학습 및 평가**: 8가지 이상감지 알고리즘 비교
5. **최적 알고리즘 추천**: 데이터 특성 기반 자동 추천
6. **결과 리포트 생성**: 상세 분석 리포트

## 1. 환경 설정 및 라이브러리 임포트

In [None]:
# 기본 라이브러리
import sys
import os
import warnings
warnings.filterwarnings('ignore')

# 경로 설정
sys.path.insert(0, os.path.abspath('../src'))

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# 시각화 설정
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 12

# 커스텀 모듈
from anomaly_detection import DataLoader, FeatureEngineer, ModelEvaluator, AlgorithmRecommender
from anomaly_detection.algorithms import (
    ZScoreDetector, CUSUMDetector, SPCDetector,
    IsolationForestDetector, LOFDetector, OneClassSVMDetector,
    AutoEncoderDetector, LSTMAutoEncoderDetector
)

print("환경 설정 완료!")

## 2. 데이터 로딩

PV 데이터와 이상 발생 시점 정보를 로딩합니다.

In [None]:
# 데이터 경로 설정 (샘플 데이터 또는 실제 데이터 경로로 변경)
PV_DATA_PATH = '../sample_data/sample_pv_data.csv'
FAULT_LABELS_PATH = '../sample_data/sample_fault_labels.csv'

# 샘플 데이터가 없으면 생성
if not os.path.exists(PV_DATA_PATH):
    print("샘플 데이터가 없습니다. 생성 중...")
    exec(open('../scripts/generate_sample_data.py').read())

In [None]:
# 데이터 로더 초기화
loader = DataLoader()

# PV 데이터 로딩
pv_data = loader.load_pv_data(PV_DATA_PATH)
print(f"\nPV 데이터 컬럼: {list(pv_data.columns)}")
pv_data.head()

In [None]:
# 이상 발생 정보 로딩
fault_labels = loader.load_fault_labels(FAULT_LABELS_PATH)
print(f"\n이상 발생 정보:")
fault_labels

In [None]:
# 데이터 병합
merged_data = loader.merge_data()

# 전처리
processed_data = loader.preprocess(fill_missing='interpolate')
print(f"\n전처리 완료 데이터 shape: {processed_data.shape}")

## 3. 탐색적 데이터 분석 (EDA)

In [None]:
# 데이터 특성 분석
characteristics = loader.analyze_characteristics()

# 요약 정보 출력
summary = loader.summary()
print("=" * 50)
print("데이터 특성 요약")
print("=" * 50)
for key, value in summary.items():
    print(f"{key}: {value}")

In [None]:
# 시계열 시각화
feature_cols = loader.get_feature_columns()

fig, axes = plt.subplots(len(feature_cols), 1, figsize=(14, 3*len(feature_cols)), sharex=True)

for i, col in enumerate(feature_cols):
    ax = axes[i] if len(feature_cols) > 1 else axes
    
    # 정상 데이터
    normal_mask = processed_data['is_anomaly'] == 0
    ax.plot(processed_data.loc[normal_mask, 'timestamp'], 
            processed_data.loc[normal_mask, col], 
            'b-', alpha=0.5, label='정상', linewidth=0.5)
    
    # 이상 데이터
    anomaly_mask = processed_data['is_anomaly'] == 1
    ax.scatter(processed_data.loc[anomaly_mask, 'timestamp'], 
               processed_data.loc[anomaly_mask, col], 
               c='red', s=1, alpha=0.5, label='이상')
    
    ax.set_ylabel(col)
    ax.legend(loc='upper right')

axes[-1].set_xlabel('Timestamp') if len(feature_cols) > 1 else axes.set_xlabel('Timestamp')
plt.suptitle('PV 데이터 시계열 (빨간점: 이상)', fontsize=14)
plt.tight_layout()
plt.show()

In [None]:
# 상관관계 히트맵
plt.figure(figsize=(10, 8))
corr_matrix = processed_data[feature_cols].corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, 
            square=True, linewidths=0.5)
plt.title('센서 간 상관관계')
plt.tight_layout()
plt.show()

In [None]:
# 이상 유형별 분포 (fault_labels에서)
if 'fault_type' in fault_labels.columns:
    plt.figure(figsize=(10, 5))
    fault_labels['fault_type'].value_counts().plot(kind='bar', color='coral')
    plt.title('이상 유형별 발생 빈도')
    plt.xlabel('이상 유형')
    plt.ylabel('발생 횟수')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

## 4. 특성 공학 (Feature Engineering)

In [None]:
# 특성 추출기 초기화
feature_engineer = FeatureEngineer()

# 특성 추출 (시간이 걸릴 수 있음)
print("특성 추출 중... (데이터 크기에 따라 몇 분 소요될 수 있습니다)")
enriched_data = feature_engineer.extract_all_features(
    data=processed_data,
    feature_cols=feature_cols,
    window_sizes=[5, 10, 30],
    include_fft=True,
    include_peaks=True
)

print(f"\n추출된 특성 수: {len(feature_engineer.feature_names)}")
print(f"최종 데이터 shape: {enriched_data.shape}")

In [None]:
# 특성 중요도 분석
importance_df = feature_engineer.get_feature_importance_ranking(enriched_data)

# 상위 20개 특성 시각화
plt.figure(figsize=(12, 8))
top_n = 20
plt.barh(range(top_n), importance_df.head(top_n)['importance'].values, color='steelblue')
plt.yticks(range(top_n), importance_df.head(top_n)['feature'].values)
plt.xlabel('중요도 (상관계수)')
plt.title(f'이상 감지를 위한 상위 {top_n}개 특성')
plt.gca().invert_yaxis()
plt.tight_layout()
plt.show()

In [None]:
# 상위 50개 특성 선택
selected_data = feature_engineer.select_top_features(
    enriched_data, 
    top_n=50, 
    keep_original=True
)

print(f"선택된 특성 수: {selected_data.shape[1] - 2}개")  # timestamp, is_anomaly 제외

## 5. 데이터 분할 및 준비

In [None]:
# 학습/테스트 분할 (시계열 순서 유지)
train_data, test_data = loader.get_train_test_split(
    selected_data, 
    test_ratio=0.2, 
    temporal_split=True
)

# 특성과 라벨 분리
exclude_cols = ['timestamp', 'is_anomaly']
feature_columns = [c for c in selected_data.columns if c not in exclude_cols]

X_train = train_data[feature_columns].values
y_train = train_data['is_anomaly'].values
X_test = test_data[feature_columns].values
y_test = test_data['is_anomaly'].values

print(f"학습 데이터: {X_train.shape}, 이상 비율: {y_train.mean()*100:.2f}%")
print(f"테스트 데이터: {X_test.shape}, 이상 비율: {y_test.mean()*100:.2f}%")

## 6. 알고리즘 추천 및 분석

In [None]:
# 추천 엔진 초기화
recommender = AlgorithmRecommender()

# 데이터 특성 기반 추천
recommendations = recommender.analyze_and_recommend(
    characteristics=characteristics,
    priorities={
        'speed': 0.2,
        'accuracy': 0.4,
        'interpretability': 0.2,
        'early_detection': 0.2
    }
)

# 추천 리포트 출력
print(recommender.generate_recommendation_report())

## 7. 알고리즘 학습 및 평가

In [None]:
# 평가기 초기화
evaluator = ModelEvaluator()

# 모든 알고리즘 정의
algorithms = {
    'Z-Score': ZScoreDetector(threshold=3.0),
    'CUSUM': CUSUMDetector(threshold=5.0),
    'SPC': SPCDetector(n_sigma=3.0),
    'Isolation Forest': IsolationForestDetector(contamination=0.05),
    'LOF': LOFDetector(n_neighbors=20),
    'One-Class SVM': OneClassSVMDetector(nu=0.05),
}

# 딥러닝 모델 추가 (데이터가 충분할 경우)
if len(X_train) >= 5000:
    algorithms['AutoEncoder'] = AutoEncoderDetector(epochs=30, batch_size=64)
    
if len(X_train) >= 10000:
    algorithms['LSTM-AE'] = LSTMAutoEncoderDetector(sequence_length=30, epochs=30)

In [None]:
import time

# 알고리즘 평가
scores_dict = {}

for name, detector in tqdm(algorithms.items(), desc="알고리즘 평가"):
    print(f"\n{'='*50}")
    print(f"평가 중: {name}")
    print(f"{'='*50}")
    
    try:
        # 학습
        start_time = time.time()
        detector.fit(X_train, y_train)
        training_time = time.time() - start_time
        
        # 예측
        start_time = time.time()
        y_pred = detector.predict(X_test)
        prediction_time = time.time() - start_time
        
        # 점수
        y_score = detector.predict_score(X_test)
        scores_dict[name] = y_score
        
        # 평가
        result = evaluator.evaluate(
            y_true=y_test,
            y_pred=y_pred,
            y_score=y_score,
            algorithm_name=name,
            training_time=training_time,
            prediction_time=prediction_time
        )
        
        print(f"  Precision: {result.precision:.4f}")
        print(f"  Recall: {result.recall:.4f}")
        print(f"  F1 Score: {result.f1_score:.4f}")
        print(f"  AUC-ROC: {result.auc_roc:.4f}")
        print(f"  학습 시간: {training_time:.2f}초")
        
    except Exception as e:
        print(f"  오류 발생: {str(e)}")

In [None]:
# 결과 비교 테이블
comparison_df = evaluator.compare_algorithms()
print("\n" + "="*80)
print("알고리즘 성능 비교")
print("="*80)
display(comparison_df)

In [None]:
# 성능 시각화
try:
    evaluator.plot_comparison(figsize=(14, 6))
except:
    # 기본 바 차트로 대체
    fig, ax = plt.subplots(figsize=(12, 6))
    
    metrics = ['precision', 'recall', 'f1_score', 'auc_roc']
    x = np.arange(len(evaluator.results))
    width = 0.2
    
    for i, metric in enumerate(metrics):
        values = [getattr(r, metric) for r in evaluator.results]
        ax.bar(x + i*width, values, width, label=metric)
    
    ax.set_xticks(x + width*1.5)
    ax.set_xticklabels([r.algorithm_name for r in evaluator.results], rotation=45, ha='right')
    ax.legend()
    ax.set_ylim(0, 1.1)
    ax.set_title('알고리즘 성능 비교')
    plt.tight_layout()
    plt.show()

In [None]:
# ROC 곡선 비교
if len(scores_dict) > 0 and len(np.unique(y_test)) > 1:
    evaluator.plot_roc_curves(y_test, scores_dict, figsize=(14, 6))

## 8. 최종 추천 및 리포트

In [None]:
# 최고 성능 알고리즘 선정
best_name, best_result = evaluator.get_best_algorithm(metric='weighted')

print("="*70)
print("최종 추천 알고리즘")
print("="*70)
print(f"\n추천: {best_name}")
print(f"\n성능 지표:")
print(f"  - Accuracy: {best_result.accuracy:.4f}")
print(f"  - Precision: {best_result.precision:.4f}")
print(f"  - Recall: {best_result.recall:.4f}")
print(f"  - F1 Score: {best_result.f1_score:.4f}")
print(f"  - AUC-ROC: {best_result.auc_roc:.4f}")
print(f"  - AUC-PR: {best_result.auc_pr:.4f}")
print(f"  - 조기 감지 점수: {best_result.early_detection_score:.4f}")
print(f"\n혼동 행렬:")
print(f"  TN: {best_result.confusion_matrix[0,0]}, FP: {best_result.confusion_matrix[0,1]}")
print(f"  FN: {best_result.confusion_matrix[1,0]}, TP: {best_result.confusion_matrix[1,1]}")

In [None]:
# 전체 평가 리포트
print(evaluator.generate_report())

In [None]:
# 순위별 알고리즘 목록
print("\n알고리즘 순위 (F1 Score 기준):")
for i, (name, score) in enumerate(evaluator.get_ranking('f1_score'), 1):
    print(f"  {i}. {name}: {score:.4f}")

## 9. 구현 가이드

In [None]:
# 최고 성능 알고리즘의 구현 가이드
print(recommender.get_implementation_guide(best_name))

## 10. 결과 저장

In [None]:
# 결과 저장
output_dir = '../results'
os.makedirs(output_dir, exist_ok=True)

# 비교 테이블 저장
comparison_df.to_csv(f'{output_dir}/algorithm_comparison.csv', index=False)
print(f"비교 결과 저장: {output_dir}/algorithm_comparison.csv")

# 리포트 저장
with open(f'{output_dir}/evaluation_report.txt', 'w', encoding='utf-8') as f:
    f.write(evaluator.generate_report())
    f.write("\n\n")
    f.write(recommender.generate_recommendation_report())
print(f"리포트 저장: {output_dir}/evaluation_report.txt")

---

## 요약

이 Agent는 FDC PV 데이터를 분석하여 최적의 이상감지 알고리즘을 자동으로 추천합니다.

### 사용 방법
1. `PV_DATA_PATH`와 `FAULT_LABELS_PATH`를 실제 데이터 경로로 변경
2. 노트북 전체 실행
3. 추천 결과 확인 및 운영 환경 적용

### 지원 알고리즘
- **통계 기반**: Z-Score, CUSUM, SPC
- **ML 기반**: Isolation Forest, LOF, One-Class SVM
- **딥러닝 기반**: AutoEncoder, LSTM-AutoEncoder

### 문의
추가 기능이나 개선 사항은 개발팀에 문의하세요.