# 05. Anomaly Detection

This notebook implements anomaly detection using a Variational Autoencoder (VAE).

## Experiment Overview
- **Goal**: Detect anomalies in multivariate data using autoencoders
- **Model**: Variational Autoencoder (VAE)
- **Features**: Anomaly scoring, threshold optimization, ROC curves
- **Learning**: Understanding unsupervised anomaly detection

## What You'll Learn
- Variational Autoencoders
- Anomaly detection techniques
- ROC curve analysis
- Threshold optimization


In [None]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import sys
import os
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, roc_curve

# Add scripts directory to path
sys.path.append('../scripts')
from utils import get_device, set_seed

# Set random seed for reproducibility
set_seed(42)

# Get device
device = get_device()
print(f"Using device: {device}")

# Generate synthetic data with anomalies
print("Generating synthetic data with anomalies...")
# Normal data
normal_data, _ = make_blobs(n_samples=1000, centers=2, n_features=2, random_state=42, cluster_std=1.0)
# Anomalous data
anomaly_data, _ = make_blobs(n_samples=50, centers=1, n_features=2, random_state=42, cluster_std=0.3)
anomaly_data += np.array([6, 6])  # Shift anomalies away from normal data

# Combine data
X = np.vstack([normal_data, anomaly_data])
y = np.hstack([np.zeros(len(normal_data)), np.ones(len(anomaly_data))])  # 0=normal, 1=anomaly

# Shuffle data
indices = np.random.permutation(len(X))
X, y = X[indices], y[indices]

# Normalize data
scaler = StandardScaler()
X = scaler.fit_transform(X)

print(f"Data shape: {X.shape}")
print(f"Normal samples: {np.sum(y == 0)}")
print(f"Anomaly samples: {np.sum(y == 1)}")

# Visualize data
plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.scatter(X[y == 0, 0], X[y == 0, 1], c='blue', alpha=0.6, label='Normal')
plt.scatter(X[y == 1, 0], X[y == 1, 1], c='red', alpha=0.6, label='Anomaly')
plt.title('Synthetic Data with Anomalies')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.hist(y, bins=2, alpha=0.7, edgecolor='black')
plt.title('Class Distribution')
plt.xlabel('Class (0=Normal, 1=Anomaly)')
plt.ylabel('Count')
plt.xticks([0, 1])
plt.grid(True)

plt.tight_layout()
plt.show()


In [None]:
# Define Variational Autoencoder for anomaly detection
class VAE(nn.Module):
    def __init__(self, input_dim=2, hidden_dim=32, latent_dim=2):
        super(VAE, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        
        # Latent space parameters
        self.mu_layer = nn.Linear(hidden_dim, latent_dim)
        self.logvar_layer = nn.Linear(hidden_dim, latent_dim)
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim)
        )
        
    def encode(self, x):
        h = self.encoder(x)
        mu = self.mu_layer(h)
        logvar = self.logvar_layer(h)
        return mu, logvar
    
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decode(self, z):
        return self.decoder(z)
    
    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        recon = self.decode(z)
        return recon, mu, logvar

# Create model instance
model = VAE().to(device)

# Print model summary
print("VAE Architecture:")
print(model)
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"Model size: {sum(p.numel() for p in model.parameters()) * 4 / 1024 / 1024:.2f} MB")


In [None]:
# Prepare data for training (only use normal data for training)
normal_indices = y == 0
X_normal = X[normal_indices]
y_normal = y[normal_indices]

# Split normal data for training and validation
train_size = int(0.8 * len(X_normal))
X_train = X_normal[:train_size]
X_val = X_normal[train_size:]

# Convert to PyTorch tensors
X_train = torch.FloatTensor(X_train).to(device)
X_val = torch.FloatTensor(X_val).to(device)
X_all = torch.FloatTensor(X).to(device)

print(f"Training samples (normal only): {len(X_train)}")
print(f"Validation samples (normal only): {len(X_val)}")
print(f"Total samples for evaluation: {len(X)}")

# VAE Loss function
def vae_loss(recon_x, x, mu, logvar):
    """VAE loss = reconstruction loss + KL divergence."""
    # Reconstruction loss (MSE)
    recon_loss = F.mse_loss(recon_x, x, reduction='sum')
    
    # KL divergence loss
    kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    
    return recon_loss + kl_loss

# Training function
def train_vae(model, X_train, X_val, epochs=100, lr=0.001):
    """Train VAE model."""
    optimizer = optim.Adam(model.parameters(), lr=lr)
    train_losses = []
    val_losses = []
    
    for epoch in range(epochs):
        # Training
        model.train()
        train_loss = 0
        for i in range(0, len(X_train), 32):  # Batch size 32
            batch = X_train[i:i+32]
            optimizer.zero_grad()
            
            recon_batch, mu, logvar = model(batch)
            loss = vae_loss(recon_batch, batch, mu, logvar)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for i in range(0, len(X_val), 32):
                batch = X_val[i:i+32]
                recon_batch, mu, logvar = model(batch)
                loss = vae_loss(recon_batch, batch, mu, logvar)
                val_loss += loss.item()
        
        train_loss /= (len(X_train) // 32)
        val_loss /= (len(X_val) // 32)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        if (epoch + 1) % 20 == 0:
            print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.2f}, Val Loss: {val_loss:.2f}')
    
    return train_losses, val_losses

# Train the VAE
print("Starting VAE training...")
train_losses, val_losses = train_vae(model, X_train, X_val, epochs=100)

# Plot training history
plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.title('VAE Training History')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.title('Training History (Log Scale)')
plt.xlabel('Epoch')
plt.ylabel('Loss (log)')
plt.yscale('log')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.savefig('../results/plots/vae_training.png', dpi=300, bbox_inches='tight')
plt.show()


In [None]:
# Anomaly detection using reconstruction error
def detect_anomalies(model, X, threshold_percentile=95):
    """Detect anomalies using reconstruction error."""
    model.eval()
    reconstruction_errors = []
    
    with torch.no_grad():
        for i in range(0, len(X), 32):
            batch = X[i:i+32]
            recon_batch, _, _ = model(batch)
            
            # Calculate reconstruction error for each sample
            mse = F.mse_loss(recon_batch, batch, reduction='none').sum(dim=1)
            reconstruction_errors.extend(mse.cpu().numpy())
    
    reconstruction_errors = np.array(reconstruction_errors)
    
    # Set threshold based on percentile of reconstruction errors
    threshold = np.percentile(reconstruction_errors, threshold_percentile)
    
    # Predict anomalies
    predictions = (reconstruction_errors > threshold).astype(int)
    
    return reconstruction_errors, predictions, threshold

# Detect anomalies
print("Detecting anomalies...")
recon_errors, predictions, threshold = detect_anomalies(model, X_all, threshold_percentile=95)

# Calculate metrics
auc = roc_auc_score(y, recon_errors)
print(f"Anomaly Detection Results:")
print(f"Threshold: {threshold:.4f}")
print(f"AUC Score: {auc:.4f}")
print(f"Detected anomalies: {np.sum(predictions)}")
print(f"True anomalies: {np.sum(y)}")

# Visualize results
plt.figure(figsize=(15, 10))

# Plot 1: Original data
plt.subplot(2, 3, 1)
plt.scatter(X[y == 0, 0], X[y == 0, 1], c='blue', alpha=0.6, label='Normal')
plt.scatter(X[y == 1, 0], X[y == 1, 1], c='red', alpha=0.6, label='True Anomaly')
plt.title('Original Data')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True)

# Plot 2: Detected anomalies
plt.subplot(2, 3, 2)
plt.scatter(X[predictions == 0, 0], X[predictions == 0, 1], c='blue', alpha=0.6, label='Normal')
plt.scatter(X[predictions == 1, 0], X[predictions == 1, 1], c='red', alpha=0.6, label='Detected Anomaly')
plt.title('Detected Anomalies')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True)

# Plot 3: Reconstruction errors
plt.subplot(2, 3, 3)
plt.scatter(X[y == 0, 0], X[y == 0, 1], c=recon_errors[y == 0], cmap='Blues', alpha=0.6, label='Normal')
plt.scatter(X[y == 1, 0], X[y == 1, 1], c=recon_errors[y == 1], cmap='Reds', alpha=0.6, label='Anomaly')
plt.colorbar(label='Reconstruction Error')
plt.title('Reconstruction Errors')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.legend()
plt.grid(True)

# Plot 4: ROC Curve
plt.subplot(2, 3, 4)
fpr, tpr, _ = roc_curve(y, recon_errors)
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc:.3f})')
plt.plot([0, 1], [0, 1], 'k--', label='Random')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
plt.grid(True)

# Plot 5: Reconstruction error distribution
plt.subplot(2, 3, 5)
plt.hist(recon_errors[y == 0], bins=30, alpha=0.7, label='Normal', color='blue')
plt.hist(recon_errors[y == 1], bins=30, alpha=0.7, label='Anomaly', color='red')
plt.axvline(threshold, color='black', linestyle='--', label=f'Threshold: {threshold:.3f}')
plt.xlabel('Reconstruction Error')
plt.ylabel('Count')
plt.title('Reconstruction Error Distribution')
plt.legend()
plt.grid(True)

# Plot 6: Latent space visualization
plt.subplot(2, 3, 6)
model.eval()
with torch.no_grad():
    mu, _ = model.encode(X_all)
    mu = mu.cpu().numpy()
    
plt.scatter(mu[y == 0, 0], mu[y == 0, 1], c='blue', alpha=0.6, label='Normal')
plt.scatter(mu[y == 1, 0], mu[y == 1, 1], c='red', alpha=0.6, label='Anomaly')
plt.xlabel('Latent Dimension 1')
plt.ylabel('Latent Dimension 2')
plt.title('Latent Space')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.savefig('../results/plots/anomaly_detection_results.png', dpi=300, bbox_inches='tight')
plt.show()

# Confusion matrix
from sklearn.metrics import confusion_matrix, classification_report
cm = confusion_matrix(y, predictions)
print(f"\nConfusion Matrix:")
print(cm)
print(f"\nClassification Report:")
print(classification_report(y, predictions, target_names=['Normal', 'Anomaly']))
