# DBSCAN for Anomaly Detection - Educational Implementation

A bare-minimum implementation of DBSCAN (Density-Based Spatial Clustering of Applications with Noise) focused on **anomaly/outlier detection**.

## Key Concept

DBSCAN naturally identifies anomalies as **noise points** - data points that don't belong to any cluster. These are points that:
- Have fewer than MinPts points within ε, so they are not core points
- Are not within ε of any core point, so they are not border points either
- Are therefore treated as outliers or anomalies in the dataset
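
The three roles above can be illustrated on a handful of points. This is a minimal sketch, not part of the class below: `classify_points` is a hypothetical helper, the toy coordinates are made up, and neighborhoods are assumed to include the point itself.

```python
from math import dist

def classify_points(points, eps, min_pts):
    """Label each point 'core', 'border', or 'noise' (illustrative helper)."""
    # Each neighborhood includes the point itself
    neighborhoods = [
        [j for j, q in enumerate(points) if dist(p, q) <= eps]
        for p in points
    ]
    is_core = [len(nbrs) >= min_pts for nbrs in neighborhoods]

    roles = []
    for i, nbrs in enumerate(neighborhoods):
        if is_core[i]:
            roles.append("core")
        elif any(is_core[j] for j in nbrs):
            roles.append("border")   # not dense itself, but next to a core point
        else:
            roles.append("noise")    # this is what gets reported as an anomaly
    return roles

points = [(0, 0), (0, 1), (1, 0), (1, 1), (2.2, 1.0), (10, 10)]
roles = classify_points(points, eps=1.5, min_pts=4)
for point, role in zip(points, roles):
    print(point, role)
```

The four corner points come out as core, `(2.2, 1.0)` as border (it is within ε of the core point `(1, 1)` only), and `(10, 10)` as noise.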

## Algorithm Overview

1. **Parameter Selection**: Choose ε (epsilon) and MinPts
2. **Select Starting Point**: Pick an arbitrary unvisited point
3. **Examine Neighborhood**: Find all points within ε distance
4. **Expand Cluster**: If the point is a core point, add its neighbors to the cluster and keep expanding through any neighbors that are also core points
5. **Repeat**: Process all unvisited points
6. **Finalize**: Points with label -1 are **anomalies**

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import deque

## Core DBSCAN Implementation with Anomaly Detection

In [None]:
class DBSCAN:
    def __init__(self, epsilon, min_pts):
        """
        Initialize DBSCAN with parameters.
        
        Parameters:
        -----------
        epsilon : float
            Maximum distance between two points to be neighbors
        min_pts : int
            Minimum number of points to form a dense region
        """
        self.epsilon = epsilon
        self.min_pts = min_pts
        self.labels = None
        
    def _euclidean_distance(self, point1, point2):
        """Calculate Euclidean distance between two points."""
        return np.sqrt(np.sum((point1 - point2) ** 2))
    
    def _get_neighbors(self, data, point_idx):
        """
        Step 3: Examine the Neighborhood
        Find all points within epsilon distance of the given point.
        
        Returns:
        --------
        List of indices of neighboring points
        """
        neighbors = []
        for idx in range(len(data)):
            if self._euclidean_distance(data[point_idx], data[idx]) <= self.epsilon:
                neighbors.append(idx)
        return neighbors
    
    def _expand_cluster(self, data, point_idx, neighbors, cluster_id, visited):
        """
        Step 4: Expand the Cluster
        Add all reachable points to the current cluster.
        
        Parameters:
        -----------
        data : array
            Dataset
        point_idx : int
            Index of the core point
        neighbors : list
            Indices of neighboring points
        cluster_id : int
            Current cluster ID
        visited : set
            Set of visited point indices
        """
        # Assign the core point to the cluster
        self.labels[point_idx] = cluster_id
        
        # Use a queue for breadth-first expansion
        queue = deque(neighbors)
        
        while queue:
            current_point = queue.popleft()
            
            # A label of -1 means "unassigned": either not yet visited or
            # previously marked as noise. Either way it joins this cluster
            # (a former noise point becomes a border point).
            if self.labels[current_point] == -1:
                self.labels[current_point] = cluster_id
            
            # Already-visited points are not expanded again
            if current_point in visited:
                continue
                
            visited.add(current_point)
            
            # Find neighbors of current point
            current_neighbors = self._get_neighbors(data, current_point)
            
            # If it's a core point, add its neighbors to the queue
            if len(current_neighbors) >= self.min_pts:
                for neighbor in current_neighbors:
                    if neighbor not in visited:
                        queue.append(neighbor)
    
    def fit(self, data):
        """
        Apply DBSCAN algorithm to the dataset.
        
        Parameters:
        -----------
        data : array-like, shape (n_samples, n_features)
            Dataset to cluster
            
        Returns:
        --------
        labels : array, shape (n_samples,)
            Cluster labels (-1 for noise/anomalies, 0+ for cluster IDs)
        """
        data = np.array(data)
        n_points = len(data)
        
        # Initialize all points as noise (-1)
        self.labels = np.full(n_points, -1)
        
        # Track visited points
        visited = set()
        
        # Current cluster ID
        cluster_id = 0
        
        # Step 2 & 5: Select starting point and repeat for all points
        for point_idx in range(n_points):
            # Skip if already visited
            if point_idx in visited:
                continue
            
            visited.add(point_idx)
            
            # Step 3: Examine the neighborhood
            neighbors = self._get_neighbors(data, point_idx)
            
            # If not enough neighbors, mark as noise (for now)
            if len(neighbors) < self.min_pts:
                self.labels[point_idx] = -1
            else:
                # It's a core point - expand the cluster
                self._expand_cluster(data, point_idx, neighbors, cluster_id, visited)
                cluster_id += 1
        
        return self.labels
    
    def get_anomalies(self, data):
        """
        Identify anomalies in the dataset.
        
        Returns:
        --------
        anomaly_indices : array
            Indices of anomalous points
        anomaly_points : array
            Coordinates of anomalous points
        """
        if self.labels is None:
            raise ValueError("Must call fit() before get_anomalies()")
        
        anomaly_indices = np.where(self.labels == -1)[0]
        # np.asarray lets this work when data is passed as a plain list
        anomaly_points = np.asarray(data)[anomaly_indices]
        
        return anomaly_indices, anomaly_points
    
    def predict_anomaly(self, data, new_point):
        """
        Predict if a new point is an anomaly.
        
        Parameters:
        -----------
        data : array
            Original training data
        new_point : array
            New point to classify
            
        Returns:
        --------
        is_anomaly : bool
            True if point is anomaly, False otherwise
        """
        if self.labels is None:
            raise ValueError("Must call fit() before predict_anomaly()")
        
        # Find neighbors of the new point in the original dataset
        neighbors = []
        for idx in range(len(data)):
            if self._euclidean_distance(new_point, data[idx]) <= self.epsilon:
                neighbors.append(idx)
        
        # fit() counts a point as its own neighbor, so count the new point too.
        # With enough neighbors it would be a core point, hence not an anomaly.
        if len(neighbors) + 1 >= self.min_pts:
            return False
        
        # Approximate border check: any neighbor that belongs to a cluster.
        # (Strict DBSCAN requires that neighbor to be a core point.)
        for neighbor_idx in neighbors:
            if self.labels[neighbor_idx] >= 0:  # Neighbor belongs to a cluster
                return False
        
        return True  # It's an anomaly
    
    def get_anomaly_scores(self, data):
        """
        Calculate anomaly scores as the distance from each noise point to the
        nearest clustered point. Higher score = more anomalous (the score is
        inf if no clusters were found).
        
        Returns:
        --------
        scores : array
            Anomaly score for each point (0 for clustered points)
        """
        if self.labels is None:
            raise ValueError("Must call fit() before get_anomaly_scores()")
        
        scores = np.zeros(len(data))
        
        for idx in range(len(data)):
            if self.labels[idx] == -1:  # Anomaly
                # Find minimum distance to any clustered point
                min_dist = float('inf')
                for other_idx in range(len(data)):
                    if self.labels[other_idx] >= 0:  # Clustered point
                        dist = self._euclidean_distance(data[idx], data[other_idx])
                        min_dist = min(min_dist, dist)
                scores[idx] = min_dist
        
        return scores
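
The `_get_neighbors` loop above computes one distance per Python iteration. As a rough sketch of an alternative, the same neighborhood query can be written with NumPy broadcasting. `neighbors_within` is a hypothetical stand-alone function, not a method of the class, and it returns an index array rather than a list.

```python
import numpy as np

def neighbors_within(data, point_idx, eps):
    """Return indices of all rows of `data` within `eps` of row `point_idx`."""
    # Subtract the query point from every row at once (broadcasting),
    # then take the Euclidean norm of each difference row.
    distances = np.linalg.norm(data - data[point_idx], axis=1)
    return np.flatnonzero(distances <= eps)

data = np.array([[0.0, 0.0], [0.3, 0.4], [1.0, 1.0], [5.0, 5.0]])
print(neighbors_within(data, 0, eps=0.6))  # indices 0 and 1
```

As in the class, the query point is its own neighbor. Calling `.tolist()` on the result gives a plain list if one is needed.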

## Generate Sample Data with Anomalies

In [None]:
# Set random seed for reproducibility
np.random.seed(42)

# Create normal clusters (normal behavior)
cluster1 = np.random.randn(100, 2) * 0.5 + np.array([0, 0])
cluster2 = np.random.randn(100, 2) * 0.5 + np.array([5, 5])
cluster3 = np.random.randn(80, 2) * 0.5 + np.array([0, 5])

# Add anomalies (unusual/rare events); uniform sampling means a few may land inside a cluster
anomalies = np.random.uniform(-2, 7, (25, 2))

# Combine all data
data = np.vstack([cluster1, cluster2, cluster3, anomalies])

# Track which points are actual anomalies
true_anomalies = np.zeros(len(data), dtype=bool)
true_anomalies[-25:] = True  # Last 25 points are anomalies

print(f"Generated dataset with {len(data)} points")
print(f"Normal points: {np.sum(~true_anomalies)}")
print(f"Anomalies: {np.sum(true_anomalies)}")

## Step 1: Parameter Selection for Anomaly Detection

In [None]:
# Set DBSCAN parameters
# For anomaly detection:
# - epsilon: neighborhood radius; should match the typical spacing inside normal clusters
# - min_pts: neighbors (including the point itself) required for a core point

epsilon = 0.5      # Maximum distance for neighborhood
min_pts = 5        # Minimum points to form dense region

print(f"Anomaly Detection Parameters:")
print(f"  ε (epsilon): {epsilon}")
print(f"  MinPts: {min_pts}")
print(f"\nInterpretation:")
print(f"  - Points with < {min_pts} neighbors (counting themselves) within {epsilon},")
print(f"    and not within {epsilon} of any core point, will be marked as anomalies")

## Run DBSCAN for Anomaly Detection

In [None]:
# Create and fit DBSCAN
dbscan = DBSCAN(epsilon=epsilon, min_pts=min_pts)
labels = dbscan.fit(data)

# Get anomalies
anomaly_indices, anomaly_points = dbscan.get_anomalies(data)

# Get anomaly scores
anomaly_scores = dbscan.get_anomaly_scores(data)

# Count clusters and anomalies
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
n_detected_anomalies = len(anomaly_indices)

print(f"\nResults:")
print(f"  Number of normal clusters: {n_clusters}")
print(f"  Number of anomalies detected: {n_detected_anomalies}")
print(f"  Anomaly rate: {n_detected_anomalies / len(data) * 100:.1f}%")

# Show top anomalies by score
top_anomalies = np.argsort(anomaly_scores)[-5:][::-1]
print(f"\nTop 5 Anomalies (by isolation score):")
for i, idx in enumerate(top_anomalies, 1):
    print(f"  {i}. Index {idx}: score = {anomaly_scores[idx]:.3f}")
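
Beyond the cluster count, it can help to see how many points fall in each cluster. The sketch below uses `np.unique` on a made-up label array in the same format `fit()` returns; `example_labels` is hypothetical, not output from the run above.

```python
import numpy as np

# Hypothetical label array: -1 is noise, 0 and 1 are cluster IDs
example_labels = np.array([0, 0, 0, 1, 1, -1, 0, -1, 1, 1])

cluster_ids, counts = np.unique(example_labels, return_counts=True)
for cluster_id, count in zip(cluster_ids, counts):
    name = "noise" if cluster_id == -1 else f"cluster {cluster_id}"
    print(f"{name}: {count} points")
```

For this array the summary is 2 noise points and 4 points in each of the two clusters.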

## Visualize Anomaly Detection Results

In [None]:
# Create comprehensive visualization
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# Plot 1: Original Data with True Anomalies
axes[0].scatter(data[~true_anomalies, 0], data[~true_anomalies, 1], 
               c='blue', alpha=0.6, s=50, label='Normal')
axes[0].scatter(data[true_anomalies, 0], data[true_anomalies, 1], 
               c='red', alpha=0.8, s=100, marker='x', label='True Anomalies')
axes[0].set_title('Original Data\n(Ground Truth)')
axes[0].set_xlabel('X')
axes[0].set_ylabel('Y')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Plot 2: DBSCAN Results
unique_labels = sorted(set(labels))  # sorted so colors and legend order are stable
colors = plt.cm.rainbow(np.linspace(0, 1, len(unique_labels)))

for label, color in zip(unique_labels, colors):
    if label == -1:
        # Detected anomalies in red
        color = 'red'
        marker = 'x'
        label_name = 'Detected Anomalies'
        size = 100
    else:
        marker = 'o'
        label_name = f'Cluster {label}'
        size = 50
    
    mask = labels == label
    axes[1].scatter(data[mask, 0], data[mask, 1], 
                   c=[color], label=label_name, 
                   alpha=0.6, s=size, marker=marker)

axes[1].set_title(f'DBSCAN Anomaly Detection\n(ε={epsilon}, MinPts={min_pts})')
axes[1].set_xlabel('X')
axes[1].set_ylabel('Y')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

# Plot 3: Anomaly Scores Heatmap
scatter = axes[2].scatter(data[:, 0], data[:, 1], 
                         c=anomaly_scores, cmap='Reds', 
                         s=100, alpha=0.7)
axes[2].set_title('Anomaly Scores\n(Distance to Nearest Cluster)')
axes[2].set_xlabel('X')
axes[2].set_ylabel('Y')
plt.colorbar(scatter, ax=axes[2], label='Anomaly Score')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## Evaluate Anomaly Detection Performance

In [None]:
# Compare detected anomalies with true anomalies
detected_anomalies = labels == -1

# Calculate metrics
true_positives = np.sum(detected_anomalies & true_anomalies)
false_positives = np.sum(detected_anomalies & ~true_anomalies)
false_negatives = np.sum(~detected_anomalies & true_anomalies)
true_negatives = np.sum(~detected_anomalies & ~true_anomalies)

precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print("Anomaly Detection Performance:")
print(f"\nConfusion Matrix:")
print(f"  True Positives:  {true_positives:3d} (correctly detected anomalies)")
print(f"  False Positives: {false_positives:3d} (normal points labeled as anomalies)")
print(f"  False Negatives: {false_negatives:3d} (missed anomalies)")
print(f"  True Negatives:  {true_negatives:3d} (correctly identified normal points)")

print(f"\nMetrics:")
print(f"  Precision: {precision:.3f} (of detected anomalies, how many are real?)")
print(f"  Recall:    {recall:.3f} (of true anomalies, how many did we detect?)")
print(f"  F1-Score:  {f1_score:.3f} (harmonic mean of precision and recall)")
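
To make the metric definitions concrete, here is a small worked case with counts that are easy to check by hand. `precision_recall_f1` is a hypothetical helper that repeats the arithmetic of the cell above; the two arrays are invented for illustration.

```python
import numpy as np

def precision_recall_f1(detected, truth):
    """Compute precision, recall, and F1 from two boolean arrays."""
    detected = np.asarray(detected, dtype=bool)
    truth = np.asarray(truth, dtype=bool)
    tp = np.sum(detected & truth)
    fp = np.sum(detected & ~truth)
    fn = np.sum(~detected & truth)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom > 0 else 0.0
    return precision, recall, f1

# Toy case: 4 true anomalies, 5 flagged points, 3 of them correct
truth    = [1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
detected = [1, 1, 1, 0, 1, 1, 0, 0, 0, 0]
precision, recall, f1 = precision_recall_f1(detected, truth)
print(f"precision={precision:.2f} recall={recall:.2f} f1={f1:.3f}")
```

Here TP = 3, FP = 2, and FN = 1, so precision is 3/5 = 0.60, recall is 3/4 = 0.75, and F1 is about 0.667.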

## Test Anomaly Prediction on New Points

In [None]:
# Test prediction on new points
test_points = [
    np.array([0.1, 0.1]),    # Should be normal (near cluster 1)
    np.array([5.1, 5.1]),    # Should be normal (near cluster 2)
    np.array([10.0, 10.0]),  # Should be anomaly (far from all clusters)
    np.array([-3.0, -3.0]),  # Should be anomaly (far from all clusters)
    np.array([2.5, 2.5])     # Could be anomaly (between clusters)
]

print("Testing New Points:")
print("=" * 60)

for i, point in enumerate(test_points, 1):
    is_anomaly = dbscan.predict_anomaly(data, point)
    status = "ANOMALY" if is_anomaly else "NORMAL"
    print(f"{i}. Point {point}: {status}")

# Visualize test points
plt.figure(figsize=(10, 8))

# Plot original clusters
for label in set(labels):
    if label == -1:
        continue
    mask = labels == label
    plt.scatter(data[mask, 0], data[mask, 1], alpha=0.4, s=30)

# Plot detected anomalies from training
mask = labels == -1
plt.scatter(data[mask, 0], data[mask, 1], c='red', 
           marker='x', s=100, alpha=0.6, label='Training Anomalies')

# Plot test points
for i, point in enumerate(test_points, 1):
    is_anomaly = dbscan.predict_anomaly(data, point)
    color = 'darkred' if is_anomaly else 'darkgreen'
    marker = 'X' if is_anomaly else 'P'
    label = f'Test {i}: {"Anomaly" if is_anomaly else "Normal"}'
    plt.scatter(point[0], point[1], c=color, marker=marker, 
               s=300, edgecolors='black', linewidths=2, 
               label=label, zorder=10)

plt.title('Testing Anomaly Prediction on New Points')
plt.xlabel('X')
plt.ylabel('Y')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
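
`predict_anomaly` treats a new point as normal when any of its neighbors belongs to a cluster. In standard DBSCAN a border point must lie within ε of a core point, so a stricter check needs to know which training points are core. The sketch below is one possible way to do that. `core_point_mask` and `is_anomaly_strict` are hypothetical helpers, not part of the class, and the sketch deliberately ignores whether the new point would itself become a core point.

```python
import numpy as np

def core_point_mask(train, eps, min_pts):
    """Boolean mask of core points (neighborhoods include the point itself)."""
    diffs = train[:, None, :] - train[None, :, :]
    dists = np.sqrt((diffs ** 2).sum(axis=2))
    return (dists <= eps).sum(axis=1) >= min_pts

def is_anomaly_strict(train, core_mask, new_point, eps):
    """True if `new_point` is not within `eps` of any training core point."""
    dists = np.linalg.norm(train - new_point, axis=1)
    return not np.any((dists <= eps) & core_mask)

# Four points in a tight square plus one border point hanging off the side
train = np.array([[0.0, 0.0], [0.0, 0.4], [0.4, 0.0], [0.4, 0.4], [1.0, 0.4]])
core_mask = core_point_mask(train, eps=0.7, min_pts=4)
print(core_mask)
print(is_anomaly_strict(train, core_mask, np.array([0.2, 0.2]), eps=0.7))
print(is_anomaly_strict(train, core_mask, np.array([1.6, 0.4]), eps=0.7))
```

The second test point is within ε of the border point `(1.0, 0.4)` but of no core point, so the strict check flags it, whereas a check based on any clustered neighbor would not.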

## Anomaly Detection Example: Simulated Credit Card Transactions

In [None]:
# Simulate credit card transaction data
np.random.seed(123)

# Normal transactions (low amounts, close to home)
normal_amount = np.random.gamma(2, 30, 400)  # Typical purchase amounts
normal_distance = np.random.gamma(1.5, 5, 400)  # Distance from home

# Fraudulent transactions (unusual amounts, unusual locations)
fraud_amount = np.random.uniform(500, 2000, 20)  # High amounts
fraud_distance = np.random.uniform(50, 200, 20)  # Far from home

# Combine data
amounts = np.concatenate([normal_amount, fraud_amount])
distances = np.concatenate([normal_distance, fraud_distance])
transactions = np.column_stack([amounts, distances])

# True labels
true_fraud = np.zeros(len(transactions), dtype=bool)
true_fraud[-20:] = True

# Fit DBSCAN (epsilon is in raw, unscaled units: dollars and km share one radius)
fraud_detector = DBSCAN(epsilon=15, min_pts=5)
fraud_labels = fraud_detector.fit(transactions)

# Results
detected_fraud = fraud_labels == -1
tp = np.sum(detected_fraud & true_fraud)
fp = np.sum(detected_fraud & ~true_fraud)
fn = np.sum(~detected_fraud & true_fraud)

print("Credit Card Fraud Detection:")
print(f"  Total transactions: {len(transactions)}")
print(f"  Actual fraud: {np.sum(true_fraud)}")
print(f"  Detected fraud: {np.sum(detected_fraud)}")
print(f"  Correctly detected: {tp}")
print(f"  False alarms: {fp}")
print(f"  Missed fraud: {fn}")

# Visualize
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.scatter(transactions[~true_fraud, 0], transactions[~true_fraud, 1],
           c='green', alpha=0.5, label='Normal')
plt.scatter(transactions[true_fraud, 0], transactions[true_fraud, 1],
           c='red', marker='x', s=100, label='Actual Fraud')
plt.xlabel('Transaction Amount ($)')
plt.ylabel('Distance from Home (km)')
plt.title('Actual Transactions')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.scatter(transactions[~detected_fraud, 0], transactions[~detected_fraud, 1],
           c='green', alpha=0.5, label='Normal')
plt.scatter(transactions[detected_fraud, 0], transactions[detected_fraud, 1],
           c='red', marker='x', s=100, label='Detected Fraud')
plt.xlabel('Transaction Amount ($)')
plt.ylabel('Distance from Home (km)')
plt.title('DBSCAN Detection Results')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()
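
The example above feeds dollars and kilometers into the same Euclidean distance, so the feature with the larger numeric range dominates the neighborhood test. One common option, shown here only as a sketch, is to standardize each column first. `standardize` is a hypothetical helper and the rows are invented. Whether scaling helps depends on the data, and ε would have to be chosen again on the scaled values.

```python
import numpy as np

def standardize(features):
    """Rescale each column to zero mean and unit standard deviation."""
    features = np.asarray(features, dtype=float)
    mean = features.mean(axis=0)
    std = features.std(axis=0)
    std[std == 0] = 1.0  # avoid dividing by zero for constant columns
    return (features - mean) / std

# Hypothetical (amount in $, distance in km) rows
raw = np.array([[20.0, 2.0], [60.0, 5.0], [100.0, 8.0], [1500.0, 120.0]])
scaled = standardize(raw)
print(scaled.round(2))
```

After scaling, a distance of 1.0 means roughly one standard deviation in either feature, which makes a single ε easier to interpret.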

## Parameter Sensitivity Analysis

In [None]:
# Test different parameter combinations
param_combinations = [
    (0.3, 5),
    (0.5, 5),
    (0.8, 5),
    (0.5, 3),
    (0.5, 8)
]

fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()

for idx, (eps, min_p) in enumerate(param_combinations):
    dbscan_temp = DBSCAN(epsilon=eps, min_pts=min_p)
    labels_temp = dbscan_temp.fit(data)
    
    detected = labels_temp == -1
    tp = np.sum(detected & true_anomalies)
    fp = np.sum(detected & ~true_anomalies)
    fn = np.sum(~detected & true_anomalies)
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    
    # Plot
    axes[idx].scatter(data[~detected, 0], data[~detected, 1],
                     c='blue', alpha=0.4, s=30)
    axes[idx].scatter(data[detected, 0], data[detected, 1],
                     c='red', marker='x', s=100, alpha=0.7)
    
    axes[idx].set_title(f'ε={eps}, MinPts={min_p}\n'
                       f'Prec: {precision:.2f}, Rec: {recall:.2f}\n'
                       f'Detected: {np.sum(detected)}')
    axes[idx].grid(True, alpha=0.3)

axes[-1].axis('off')

plt.suptitle('Parameter Sensitivity for Anomaly Detection', fontsize=14, y=1.00)
plt.tight_layout()
plt.show()
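
The same sensitivity can be checked numerically without plotting. The sketch below relies on the fact that a point is noise exactly when it is neither a core point nor within ε of one, so the noise set can be computed directly from the pairwise distances. `noise_mask` and the `demo` data are hypothetical, and the full distance matrix makes this suitable for small datasets only.

```python
import numpy as np

def noise_mask(data, eps, min_pts):
    """Noise = points that are neither core nor within eps of a core point."""
    diffs = data[:, None, :] - data[None, :, :]
    within = np.sqrt((diffs ** 2).sum(axis=2)) <= eps
    core = within.sum(axis=1) >= min_pts       # neighborhoods include the point itself
    near_core = (within & core[None, :]).any(axis=1)
    return ~near_core

rng = np.random.default_rng(0)
demo = np.vstack([
    rng.normal(0.0, 0.5, size=(60, 2)),    # one dense blob
    rng.uniform(-4.0, 4.0, size=(15, 2)),  # scattered points
])

for eps in (0.2, 0.4, 0.8, 1.6):
    n_noise = noise_mask(demo, eps, min_pts=5).sum()
    print(f"eps={eps}: {n_noise} noise points")
```

With MinPts fixed, enlarging ε can only turn noise points into core or border points, so the noise count never increases from one row to the next.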

## Summary: DBSCAN for Anomaly Detection

### Key Concepts

1. **Anomalies as Noise**: DBSCAN identifies anomalies as points with label -1 (noise)
2. **Density-Based**: Anomalies are points in low-density regions
3. **Unsupervised**: No labeled data required for training

### Main Methods

- `fit(data)`: Train DBSCAN and identify clusters/anomalies
- `get_anomalies(data)`: Extract detected anomalies
- `predict_anomaly(data, new_point)`: Classify new points
- `get_anomaly_scores(data)`: Score anomalies by isolation

### Parameter Guidelines

**ε (epsilon):**
- Too small: Many normal points become anomalies (high false positives)
- Too large: Anomalies join clusters (high false negatives)
- Start with: the "elbow" of the sorted k-distance curve (each point's distance to its k-th nearest neighbor, with k tied to MinPts)

**MinPts:**
- Too small: Anomalies form small clusters
- Too large: Small valid clusters become anomalies
- Rule of thumb: MinPts ≥ dimensions + 1
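
For the ε guideline above, the k-th nearest neighbor distances can be computed directly. This is a minimal sketch: `k_distances` and the `demo` data are hypothetical, and using k = MinPts - 1 is an assumption that follows from this notebook counting each point as its own neighbor.

```python
import numpy as np

def k_distances(data, k):
    """Sorted distance from each point to its k-th nearest other point."""
    diffs = data[:, None, :] - data[None, :, :]
    dists = np.sqrt((diffs ** 2).sum(axis=2))
    dists.sort(axis=1)            # column 0 is each point's distance to itself (0.0)
    return np.sort(dists[:, k])   # column k is the k-th nearest other point

rng = np.random.default_rng(1)
demo = np.vstack([
    rng.normal(0.0, 0.5, size=(100, 2)),   # dense blob
    rng.uniform(-4.0, 4.0, size=(10, 2)),  # scattered points
])
kd = k_distances(demo, k=4)                # MinPts = 5 in this notebook
print("median k-distance:", round(float(np.median(kd)), 3))
print("95th percentile:  ", round(float(np.percentile(kd, 95)), 3))
```

Plotting `kd` against its index usually shows a flat region followed by a sharp rise. A value of ε near that bend is a reasonable starting point, to be refined by inspecting the results.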

### Advantages

✓ Finds arbitrarily shaped clusters (no assumption about cluster shape)
✓ Labels noise explicitly instead of forcing every point into a cluster
✓ Automatically determines number of clusters

### Limitations

✗ Sensitive to parameter selection
✗ Struggles with varying densities
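✗ Computational complexity: O(n²) with the brute-force neighbor search used here (a spatial index such as a k-d tree can lower the average cost)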
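
The quadratic cost comes from comparing every point with every other point. One simple way to cut down the comparisons, sketched here for 2-D data only, is to bucket points into square cells of side ε and search just the 3x3 block of cells around the query. `build_grid` and `grid_neighbors` are hypothetical helpers, not part of the class, and the saving assumes the points are not all packed into a few cells.

```python
from collections import defaultdict
from math import dist, floor

def build_grid(points, eps):
    """Bucket 2-D points into square cells of side `eps`."""
    grid = defaultdict(list)
    for idx, (x, y) in enumerate(points):
        grid[(floor(x / eps), floor(y / eps))].append(idx)
    return grid

def grid_neighbors(points, grid, idx, eps):
    """Indices within `eps` of points[idx], checking only the 3x3 surrounding cells."""
    x, y = points[idx]
    cx, cy = floor(x / eps), floor(y / eps)
    found = []
    for dx in (-1, 0, 1):
        for dy in (-1, 0, 1):
            # Any point within eps differs by at most one cell per axis
            for j in grid.get((cx + dx, cy + dy), ()):
                if dist(points[idx], points[j]) <= eps:
                    found.append(j)
    return sorted(found)

points = [(0.0, 0.0), (0.3, 0.1), (0.9, 0.9), (3.0, 3.0), (-0.2, -0.3)]
grid = build_grid(points, eps=0.5)
print(grid_neighbors(points, grid, 0, eps=0.5))  # [0, 1, 4]
```

The result matches a brute-force scan, but each query only touches the points stored in nine cells.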

### Use Cases

- Fraud detection (credit cards, insurance)
- Network intrusion detection
- Manufacturing defect detection
- Medical diagnosis (unusual patient profiles)
- Sensor data anomalies