# Autoencoder-Based Anomaly Detection for Fraud

This notebook demonstrates an **anomaly-detection approach** to fraud detection using autoencoders:

1. **Train on legitimate transactions only** (fraud labels are used to filter the training set, never as prediction targets)
2. **Detect fraud as anomalies** based on reconstruction error
3. **Tune threshold** for optimal detection
4. **Compare with supervised methods**

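## Key Advantage:
Because the autoencoder learns only what legitimate transactions look like, it can flag fraud patterns that never appeared in the training data, as long as they reconstruct poorly.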
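
The idea can be illustrated without a neural network. The sketch below is a toy stand-in, not the model trained in this notebook: it uses a linear "autoencoder" (a single principal direction found with NumPy's SVD) on synthetic data, and all names and values are made up for illustration. A point that follows the structure of the "normal" data reconstructs well, while one that breaks that structure gets a much larger error.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical "legitimate" data: 3 features that lie close to a 1-D line.
t = rng.normal(size=(500, 1))
normal = t @ np.array([[1.0, 2.0, -1.0]]) + 0.05 * rng.normal(size=(500, 3))

# "Train" a linear autoencoder on normal data only: keep the top principal direction.
mean = normal.mean(axis=0)
_, _, vt = np.linalg.svd(normal - mean, full_matrices=False)
components = vt[:1]  # bottleneck of size 1


def reconstruction_error(x):
    """Per-row MSE between x and its encode/decode round trip."""
    centered = x - mean
    reconstructed = centered @ components.T @ components
    return np.mean((centered - reconstructed) ** 2, axis=1)


# A point on the learned line reconstructs well; one off the line does not.
typical = np.array([[1.0, 2.0, -1.0]])
unusual = np.array([[1.0, -2.0, 3.0]])
err_typical = reconstruction_error(typical)[0]
err_unusual = reconstruction_error(unusual)[0]
print(err_typical, err_unusual)
```

The Keras model built later plays the same role with a nonlinear encoder and decoder.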

---

## 1. Setup & Imports

In [None]:
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
plt.style.use('seaborn-v0_8-darkgrid')

# Sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import (
    classification_report, confusion_matrix,
    roc_auc_score, average_precision_score,
    precision_recall_curve, roc_curve
)

# TensorFlow/Keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping

# Model persistence
import joblib

print(f"TensorFlow version: {tf.__version__}")
print("✓ All libraries imported successfully")

## 2. Load & Prepare Data

In [None]:
# Load dataset
df = pd.read_csv('../data/creditcard.csv')

print(f"Dataset shape: {df.shape}")
print(f"Fraud cases: {df['Class'].sum()} ({100*df['Class'].mean():.3f}%)")

# Feature engineering
df['Amount_log'] = np.log1p(df['Amount'])
df['Hour'] = (df['Time'] // 3600) % 24
df['Day'] = df['Time'] // (3600 * 24)
df['Amount_bin'] = pd.qcut(df['Amount'], q=5, labels=False, duplicates='drop')

print("\n✓ Feature engineering complete")
df.head()

In [None]:
# Train-test split
X = df.drop(columns=['Class'])
y = df['Class']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

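# Scale the amount and time-derived features.
# Note: the raw `Time` column is not in this list, so it stays in seconds and
# can dominate the MSE loss; consider scaling or dropping it if the comparison
# model in section 9 allows.
scaler = RobustScaler()
cols_to_scale = ['Amount', 'Amount_log', 'Hour', 'Day', 'Amount_bin']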

scaler.fit(X_train[cols_to_scale])
X_train[cols_to_scale] = scaler.transform(X_train[cols_to_scale])
X_test[cols_to_scale] = scaler.transform(X_test[cols_to_scale])

print(f"Training set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")
print("✓ Scaling complete")
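
One detail worth noting: `Amount_bin` is computed with `pd.qcut` on the full dataset before the split, so the bin edges see test-set amounts. A possible alternative, sketched here with NumPy only, is to learn the quantile edges from the training amounts and reuse them for the test set. The function names and sample values are hypothetical and not part of this notebook.

```python
import numpy as np


def fit_quantile_edges(train_values, n_bins=5):
    """Interior quantile cut points learned from training values only."""
    qs = np.linspace(0, 1, n_bins + 1)[1:-1]
    return np.unique(np.quantile(train_values, qs))


def apply_bins(values, edges):
    """Map each value to the index of its bin, using the fixed training edges."""
    return np.searchsorted(edges, values, side="left")


# Hypothetical amounts, for illustration only.
train_amounts = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0])
test_amounts = np.array([0.5, 5.5, 250.0])

edges = fit_quantile_edges(train_amounts, n_bins=5)
test_bins = apply_bins(test_amounts, edges)
print(edges, test_bins)
```

Values outside the training range simply fall into the first or last bin, so unseen extremes in the test set do not shift the edges.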

## 3. Extract Legitimate Transactions for Training

**Key Insight:** Train the autoencoder ONLY on normal (non-fraud) transactions, so that fraud, which it has never learned to reconstruct, tends to produce a larger error.

In [None]:
# Extract only legitimate transactions
X_train_normal = X_train[y_train == 0].values
X_train_fraud = X_train[y_train == 1].values

print(f"Training on {len(X_train_normal):,} legitimate transactions")
print(f"Excluding {len(X_train_fraud):,} fraud transactions from training")

# Convert test to numpy
X_test_np = X_test.values

## 4. Build Autoencoder Architecture

In [None]:
def build_autoencoder(n_features, latent_dim=8):
    """
    Build autoencoder with bottleneck architecture.
    
    Architecture:
    Input -> Dense(64) -> Dense(32) -> Bottleneck(8) -> Dense(32) -> Dense(64) -> Output
    """
    # Input layer
    inp = layers.Input(shape=(n_features,), name='input')
    
    # Encoder
    x = layers.Dense(64, activation='relu', name='encoder_1')(inp)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.2)(x)
    
    x = layers.Dense(32, activation='relu', name='encoder_2')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.2)(x)
    
    # Bottleneck (compressed representation)
    encoded = layers.Dense(latent_dim, activation='relu', name='bottleneck')(x)
    
    # Decoder
    x = layers.Dense(32, activation='relu', name='decoder_1')(encoded)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.2)(x)
    
    x = layers.Dense(64, activation='relu', name='decoder_2')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.2)(x)
    
    # Output layer (reconstruct input)
    decoded = layers.Dense(n_features, activation='linear', name='output')(x)
    
    # Build model
    autoencoder = models.Model(inp, decoded, name='fraud_autoencoder')
    autoencoder.compile(optimizer='adam', loss='mse', metrics=['mae'])
    
    return autoencoder

# Build model
n_features = X_train_normal.shape[1]
autoencoder = build_autoencoder(n_features, latent_dim=8)

print("Autoencoder Architecture:")
autoencoder.summary()

## 5. Train Autoencoder

In [None]:
# Early stopping callback
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1
)

# Train the autoencoder
print("Training autoencoder on legitimate transactions...")
history = autoencoder.fit(
    X_train_normal, X_train_normal,  # Input = Output (reconstruction)
    epochs=50,
    batch_size=256,
    validation_split=0.1,
    callbacks=[early_stop],
    verbose=1
)

print("\n✓ Training complete!")

In [None]:
# Plot training history
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# Loss
axes[0].plot(history.history['loss'], label='Training Loss', linewidth=2)
axes[0].plot(history.history['val_loss'], label='Validation Loss', linewidth=2)
axes[0].set_xlabel('Epoch', fontsize=12)
axes[0].set_ylabel('Loss (MSE)', fontsize=12)
axes[0].set_title('Training History - Loss', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)

# MAE
axes[1].plot(history.history['mae'], label='Training MAE', linewidth=2)
axes[1].plot(history.history['val_mae'], label='Validation MAE', linewidth=2)
axes[1].set_xlabel('Epoch', fontsize=12)
axes[1].set_ylabel('MAE', fontsize=12)
axes[1].set_title('Training History - MAE', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

## 6. Compute Reconstruction Errors

In [None]:
def compute_reconstruction_error(model, X):
    """Compute MSE reconstruction error for each sample."""
    reconstructed = model.predict(X, verbose=0)
    mse = np.mean(np.square(X - reconstructed), axis=1)
    return mse

# Compute errors on test set
print("Computing reconstruction errors...")
test_errors = compute_reconstruction_error(autoencoder, X_test_np)

# Separate by class
errors_normal = test_errors[y_test == 0]
errors_fraud = test_errors[y_test == 1]

print(f"\nReconstruction Error Statistics:")
print(f"Legitimate transactions:")
print(f"  Mean: {errors_normal.mean():.6f}")
print(f"  Std:  {errors_normal.std():.6f}")
print(f"  Max:  {errors_normal.max():.6f}")
print(f"\nFraud transactions:")
print(f"  Mean: {errors_fraud.mean():.6f}")
print(f"  Std:  {errors_fraud.std():.6f}")
print(f"  Max:  {errors_fraud.max():.6f}")
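
In symbols, the per-sample score returned by `compute_reconstruction_error` is the mean squared difference over the input features. The notation is introduced here for illustration: $d$ is the number of features, $x_{ij}$ is feature $j$ of sample $i$, and $\hat{x}_{ij}$ is the autoencoder's reconstruction of it.

```latex
e_i = \frac{1}{d} \sum_{j=1}^{d} \left( x_{ij} - \hat{x}_{ij} \right)^2
```

A transaction is later flagged as fraud when $e_i$ reaches the chosen threshold.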

In [None]:
# Visualize error distributions
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# Histogram
axes[0].hist(errors_normal, bins=50, alpha=0.7, label='Legitimate', color='#2ecc71', density=True)
axes[0].hist(errors_fraud, bins=50, alpha=0.7, label='Fraud', color='#e74c3c', density=True)
axes[0].set_xlabel('Reconstruction Error (MSE)', fontsize=12)
axes[0].set_ylabel('Density', fontsize=12)
axes[0].set_title('Reconstruction Error Distribution', fontsize=14, fontweight='bold')
axes[0].legend(fontsize=12)
axes[0].set_yscale('log')
axes[0].grid(alpha=0.3)

# Box plot
data = [errors_normal, errors_fraud]
bp = axes[1].boxplot(data, labels=['Legitimate', 'Fraud'], patch_artist=True)
bp['boxes'][0].set_facecolor('#2ecc71')
bp['boxes'][1].set_facecolor('#e74c3c')
axes[1].set_ylabel('Reconstruction Error (MSE)', fontsize=12)
axes[1].set_title('Reconstruction Error by Class', fontsize=14, fontweight='bold')
axes[1].set_yscale('log')
axes[1].grid(alpha=0.3)

plt.tight_layout()
plt.show()

## 7. Threshold Selection

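**Goal:** Among the thresholds that reach at least 90% recall, pick the one with the highest precision. The threshold is tuned on the test set here, so the metrics reported in section 8 are optimistic.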

In [None]:
# Precision-Recall curve
precision, recall, thresholds = precision_recall_curve(y_test, test_errors)

# Find threshold for 90% recall
target_recall = 0.90
indices = np.where(recall >= target_recall)[0]

if len(indices) > 0:
    best_idx = indices[np.argmax(precision[indices])]
    optimal_threshold = thresholds[best_idx] if best_idx < len(thresholds) else thresholds[0]
    optimal_precision = precision[best_idx]
    optimal_recall = recall[best_idx]
else:
    # Fallback: use 95th percentile of normal errors
    optimal_threshold = np.percentile(errors_normal, 95)
    optimal_precision = precision[0]
    optimal_recall = recall[0]

print(f"Optimal Threshold Selection:")
print(f"  Threshold: {optimal_threshold:.6f}")
print(f"  Precision: {optimal_precision:.4f}")
print(f"  Recall: {optimal_recall:.4f}")
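
The cell above uses test labels to pick the threshold. A label-free alternative, in the spirit of the percentile fallback in that cell, is to set the threshold from reconstruction errors of held-out legitimate transactions. The following is a minimal sketch under that assumption; the helper names and the sample error values are hypothetical.

```python
import numpy as np


def percentile_threshold(normal_errors, pct=99.0):
    """Threshold below which `pct` percent of legitimate validation errors fall."""
    return float(np.percentile(normal_errors, pct))


def flag_anomalies(errors, threshold):
    """Return 1 where the reconstruction error reaches the threshold, else 0."""
    return (np.asarray(errors) >= threshold).astype(int)


# Hypothetical held-out reconstruction errors from legitimate transactions only.
val_errors_normal = np.array([0.10, 0.12, 0.15, 0.20, 0.22, 0.25, 0.30, 0.35, 0.40, 2.00])
threshold = percentile_threshold(val_errors_normal, pct=90.0)

new_errors = np.array([0.18, 0.90, 3.50])
flags = flag_anomalies(new_errors, threshold)
print(threshold, flags)
```

The percentile directly controls the expected false-alarm rate on legitimate traffic (roughly `100 - pct` percent), which can be easier to reason about operationally than a recall target.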

In [None]:
# Plot Precision-Recall curve with optimal threshold
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# PR Curve
axes[0].plot(recall, precision, linewidth=2, label='Autoencoder')
axes[0].scatter([optimal_recall], [optimal_precision], color='red', s=100, 
                zorder=5, label=f'Optimal (thresh={optimal_threshold:.4f})')
axes[0].set_xlabel('Recall', fontsize=12)
axes[0].set_ylabel('Precision', fontsize=12)
axes[0].set_title('Precision-Recall Curve', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(alpha=0.3)

# Threshold vs Metrics
axes[1].plot(thresholds, precision[:-1], label='Precision', linewidth=2)
axes[1].plot(thresholds, recall[:-1], label='Recall', linewidth=2)
axes[1].axvline(optimal_threshold, color='red', linestyle='--', linewidth=2, label='Optimal Threshold')
axes[1].set_xlabel('Threshold', fontsize=12)
axes[1].set_ylabel('Score', fontsize=12)
axes[1].set_title('Precision & Recall vs Threshold', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(alpha=0.3)
axes[1].set_xscale('log')

plt.tight_layout()
plt.show()

## 8. Evaluate Autoencoder Performance

In [None]:
# Make predictions with optimal threshold
# (>= matches precision_recall_curve, which counts scores equal to the threshold as positive)
predictions = (test_errors >= optimal_threshold).astype(int)

print("="*60)
print("AUTOENCODER EVALUATION")
print("="*60)

# Classification report
print(classification_report(y_test, predictions, target_names=['Legitimate', 'Fraud']))

# Confusion matrix
cm = confusion_matrix(y_test, predictions)
print("Confusion Matrix:")
print(cm)
tn, fp, fn, tp = cm.ravel()
print(f"\nTN={tn:,}, FP={fp:,}, FN={fn:,}, TP={tp:,}")

# Metrics
roc_auc = roc_auc_score(y_test, test_errors)
pr_auc = average_precision_score(y_test, test_errors)

print(f"\nROC-AUC: {roc_auc:.4f}")
print(f"PR-AUC: {pr_auc:.4f}")

In [None]:
# Visualize confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=True,
            xticklabels=['Legitimate', 'Fraud'],
            yticklabels=['Legitimate', 'Fraud'])
plt.xlabel('Predicted', fontsize=12)
plt.ylabel('Actual', fontsize=12)
plt.title('Confusion Matrix - Autoencoder', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

## 9. Compare with Supervised Model

In [None]:
# Load XGBoost model for comparison
try:
    xgb_model = joblib.load('../models/xgb.joblib')
    xgb_probs = xgb_model.predict_proba(X_test)[:, 1]
    
    # Plot ROC comparison
    plt.figure(figsize=(10, 6))
    
    # Autoencoder ROC
    fpr_ae, tpr_ae, _ = roc_curve(y_test, test_errors)
    plt.plot(fpr_ae, tpr_ae, label=f'Autoencoder (AUC={roc_auc:.3f})', linewidth=2)
    
    # XGBoost ROC
    fpr_xgb, tpr_xgb, _ = roc_curve(y_test, xgb_probs)
    roc_auc_xgb = roc_auc_score(y_test, xgb_probs)
    plt.plot(fpr_xgb, tpr_xgb, label=f'XGBoost (AUC={roc_auc_xgb:.3f})', linewidth=2)
    
    # Random baseline
    plt.plot([0, 1], [0, 1], 'k--', label='Random', linewidth=1)
    
    plt.xlabel('False Positive Rate', fontsize=12)
    plt.ylabel('True Positive Rate', fontsize=12)
    plt.title('ROC Curve Comparison', fontsize=14, fontweight='bold')
    plt.legend(fontsize=12)
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    print(f"Autoencoder ROC-AUC: {roc_auc:.4f}")
    print(f"XGBoost ROC-AUC: {roc_auc_xgb:.4f}")
    
except FileNotFoundError:
    print("XGBoost model not found. Run 01-fraud-detection-complete.ipynb first.")

## 10. Analyze Misclassifications

In [None]:
# Find false negatives (missed frauds)
false_negatives = np.where((predictions == 0) & (y_test == 1))[0]
false_positives = np.where((predictions == 1) & (y_test == 0))[0]

print(f"False Negatives (Missed Frauds): {len(false_negatives)}")
print(f"False Positives (False Alarms): {len(false_positives)}")

if len(false_negatives) > 0:
    fn_errors = test_errors[false_negatives]
    print(f"\nFalse Negative Errors:")
    print(f"  Mean: {fn_errors.mean():.6f}")
    print(f"  Min: {fn_errors.min():.6f}")
    print(f"  Max: {fn_errors.max():.6f}")
    print(f"  These frauds have low reconstruction error (look normal)")

if len(false_positives) > 0:
    fp_errors = test_errors[false_positives]
    print(f"\nFalse Positive Errors:")
    print(f"  Mean: {fp_errors.mean():.6f}")
    print(f"  Min: {fp_errors.min():.6f}")
    print(f"  Max: {fp_errors.max():.6f}")
    print(f"  These normal transactions have high reconstruction error (look unusual)")

## 11. Feature Reconstruction Analysis

In [None]:
# Analyze which features have highest reconstruction error for frauds
reconstructed_test = autoencoder.predict(X_test_np, verbose=0)
feature_errors = np.abs(X_test_np - reconstructed_test)

# Average error per feature for fraud vs normal
fraud_mask = y_test == 1
feature_errors_fraud = feature_errors[fraud_mask].mean(axis=0)
feature_errors_normal = feature_errors[~fraud_mask].mean(axis=0)

# Create DataFrame
feature_names = X_test.columns
error_comparison = pd.DataFrame({
    'Feature': feature_names,
    'Fraud Error': feature_errors_fraud,
    'Normal Error': feature_errors_normal,
    'Difference': feature_errors_fraud - feature_errors_normal
}).sort_values('Difference', ascending=False)

print("Top 10 Features with Highest Reconstruction Error for Frauds:")
print(error_comparison.head(10).to_string(index=False))

In [None]:
# Visualize feature reconstruction errors
plt.figure(figsize=(12, 6))
top_features = error_comparison.head(15)
x = np.arange(len(top_features))
width = 0.35

plt.bar(x - width/2, top_features['Normal Error'], width, label='Normal', alpha=0.8, color='#2ecc71')
plt.bar(x + width/2, top_features['Fraud Error'], width, label='Fraud', alpha=0.8, color='#e74c3c')

plt.xlabel('Feature', fontsize=12)
plt.ylabel('Average Reconstruction Error', fontsize=12)
plt.title('Feature Reconstruction Errors (Top 15)', fontsize=14, fontweight='bold')
plt.xticks(x, top_features['Feature'], rotation=45, ha='right')
plt.legend()
plt.grid(alpha=0.3, axis='y')
plt.tight_layout()
plt.show()

## 12. Save Autoencoder Model

In [None]:
# Save model and threshold
autoencoder.save('../models/autoencoder.h5')

threshold_info = {
    'threshold': optimal_threshold,
    'roc_auc': roc_auc,
    'pr_auc': pr_auc,
    'precision': optimal_precision,
    'recall': optimal_recall
}
joblib.dump(threshold_info, '../models/ae_threshold.joblib')

print("✓ Autoencoder saved to ../models/autoencoder.h5")
print("✓ Threshold info saved to ../models/ae_threshold.joblib")
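
To reuse the saved artifacts, something along these lines could score new transactions. This is a sketch, not a tested deployment path: `score_transactions` and `_ZeroModel` are hypothetical names, and the demo uses a stand-in model so the snippet runs without TensorFlow. In practice `model` would come from `keras.models.load_model('../models/autoencoder.h5')` and `threshold_info` from `joblib.load('../models/ae_threshold.joblib')`. The fitted scaler is not saved in this notebook, so the sketch assumes the input rows were already scaled with the same `RobustScaler`.

```python
import numpy as np


def score_transactions(model, threshold_info, X_scaled):
    """Return (errors, flags) for already-scaled feature rows.

    `model` is any object with a Keras-style `predict(X, verbose=0)` method.
    """
    X_scaled = np.asarray(X_scaled, dtype=float)
    reconstructed = model.predict(X_scaled, verbose=0)
    errors = np.mean(np.square(X_scaled - reconstructed), axis=1)
    flags = (errors >= threshold_info["threshold"]).astype(int)
    return errors, flags


class _ZeroModel:
    """Stand-in for the trained autoencoder: reconstructs every row as zeros."""

    def predict(self, X, verbose=0):
        return np.zeros_like(X)


demo_errors, demo_flags = score_transactions(
    _ZeroModel(), {"threshold": 1.0}, [[0.1, 0.2], [2.0, 2.0]]
)
print(demo_errors, demo_flags)
```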

## 13. Summary & Insights

### Key Findings:

**Advantages of Autoencoder Approach:**
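- ✅ Needs no labeled fraud examples as training targets, only a reasonably clean set of legitimate transactions
- ✅ Can detect novel fraud patterns not in training data
- ✅ Can stay useful as fraud tactics change, because it models normal behavior rather than known fraud
- ✅ Training is unaffected by class imbalance, since the rare fraud class is not used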

**Limitations:**
- ⚠️ Generally lower precision than supervised methods
- ⚠️ Threshold tuning can be challenging
- ⚠️ May not capture complex fraud patterns as well as tree-based models

### Reconstruction Error Insights:
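- Fraud transactions are expected to have **higher reconstruction error** on average; confirm this with the statistics printed in section 6
- Features that differ most: see the `error_comparison` DataFrame in section 11
- Some sophisticated frauds may closely resemble legitimate activity and get a low error (false negatives)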

### Recommendations:

1. **Ensemble Approach**: Combine autoencoder with XGBoost
   - If EITHER flags as fraud → investigate
   - Increases recall and covers more fraud types, at the cost of more false alarms

2. **Threshold Strategy**:
   - Use different thresholds for different transaction amounts
   - Higher amounts → lower threshold (more sensitive)

3. **Continuous Learning**:
   - Retrain autoencoder monthly on recent legitimate transactions
   - Adapts to changing patterns in normal behavior

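4. **Feature Engineering**:
   - Add merchant category and location features where available (time of day is already covered by `Hour`)
   - Behavioral features like these give the autoencoder a richer picture of what normal activity looks like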
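
The first two recommendations can be sketched together as follows. This is an illustration under assumptions, not a tuned policy: the function names, the `high_amount` cutoff, the `factor`, and the `xgb_cutoff` are hypothetical, and `amounts` is assumed to hold raw (unscaled) transaction amounts.

```python
import numpy as np


def amount_aware_threshold(amounts, base_threshold, high_amount=1000.0, factor=0.5):
    """Per-transaction threshold: scaled down by `factor` for large amounts."""
    amounts = np.asarray(amounts, dtype=float)
    return np.where(amounts >= high_amount, base_threshold * factor, base_threshold)


def ensemble_flags(ae_errors, amounts, xgb_probs, base_threshold, xgb_cutoff=0.5):
    """Flag a transaction if EITHER the autoencoder or the classifier flags it."""
    ae_flag = np.asarray(ae_errors) >= amount_aware_threshold(amounts, base_threshold)
    xgb_flag = np.asarray(xgb_probs) >= xgb_cutoff
    return (ae_flag | xgb_flag).astype(int)


# Hypothetical scores for four transactions.
ae_errors = np.array([0.2, 0.7, 0.7, 0.1])
amounts = np.array([50.0, 50.0, 5000.0, 20.0])
xgb_probs = np.array([0.01, 0.02, 0.03, 0.90])

flags = ensemble_flags(ae_errors, amounts, xgb_probs, base_threshold=1.0)
print(flags)
```

In this toy input, the second and third transactions have the same reconstruction error, but only the high-amount one is flagged by the autoencoder; the fourth is flagged by the classifier alone.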

---
**Notebook Complete** ✓