<!--
Copyright (c) 2025 Milin Patel
Hochschule Kempten - University of Applied Sciences
-->

*Copyright (c) 2025 Milin Patel. All Rights Reserved.*

# Out-of-Distribution Detection for SOTIF

**Module 04: Safety of the Intended Functionality (ISO 21448)**

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/milinpatel07/Autonomous-Driving_AI-Safety-and-Security/blob/master/04_SOTIF/notebooks/03_ood_detection_sotif.ipynb)

**Author:** Milin Patel  
**Institution:** Hochschule Kempten - University of Applied Sciences

---

## Learning Objectives

By the end of this notebook, you will:
- Understand the role of OOD detection in SOTIF
- Learn practical OOD detection methods for perception systems
- Implement basic OOD detection techniques
- Connect OOD detection to the SOTIF safety lifecycle
- Design runtime monitors for unknown scenario detection

---

## Background

This notebook is based on research published in:

> Patel, M., Jung, R., Khatun, M. (2023). "Out-of-Distribution Detection as Support for Autonomous Driving Safety Lifecycle." REFSQ 2023.

OOD detection matters for SOTIF because it flags inputs that fall outside the training distribution, which may indicate unknown unsafe scenarios (Area 3 in the ISO 21448 scenario-area model; Area 4 covers unknown scenarios that are not hazardous).

In [None]:
# Setup
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.spatial.distance import mahalanobis
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.datasets import make_blobs
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)
plt.style.use('seaborn-v0_8-whitegrid')
print("Setup complete.")

## 1. OOD Detection and SOTIF

### The Problem

Machine learning models for perception are trained on finite datasets. When deployed, they may encounter:
- Objects not in training data (rare vehicles, animals)
- Environmental conditions not represented (unusual weather)
- Scenes with unusual combinations of factors

### Connection to SOTIF

| SOTIF Concept | OOD Detection Role |
|---------------|--------------------|
| Unknown Unsafe (Area 3) | Detect inputs that may lead to failures |
| Triggering Conditions | Identify conditions outside training distribution |
| Functional Insufficiency | Signal when model cannot reliably process input |
| Runtime Monitoring | Continuous OOD scoring during operation |
| Safe State Transition | Trigger fallback when OOD detected |

In [None]:
def visualize_ood_sotif_connection():
    """Visualize how OOD detection supports SOTIF."""
    fig, ax = plt.subplots(figsize=(14, 6))
    
    # Draw SOTIF lifecycle with OOD integration
    stages = [
        ('Training Data', 0.1, '#3498db'),
        ('Model Training', 0.25, '#2ecc71'),
        ('OOD Detector', 0.4, '#e74c3c'),
        ('Runtime\nMonitoring', 0.55, '#9b59b6'),
        ('SOTIF\nDecision', 0.7, '#f39c12'),
        ('Safe\nBehavior', 0.85, '#1abc9c')
    ]
    
    for label, x, color in stages:
        rect = plt.Rectangle((x-0.06, 0.35), 0.12, 0.3, 
                             facecolor=color, alpha=0.8, edgecolor='black')
        ax.add_patch(rect)
        ax.text(x, 0.5, label, ha='center', va='center',
               fontsize=10, fontweight='bold', color='white')
        
        if x < 0.85:
            # Arrow spans the gap between this box (right edge x+0.06)
            # and the next box (left edge x+0.09)
            ax.annotate('', xy=(x+0.09, 0.5), xytext=(x+0.06, 0.5),
                       arrowprops=dict(arrowstyle='->', lw=2))
    
    # Add annotations
    ax.text(0.4, 0.15, 'OOD Score > Threshold?', ha='center', fontsize=10,
           bbox=dict(boxstyle='round', facecolor='lightyellow'))
    ax.annotate('', xy=(0.55, 0.35), xytext=(0.4, 0.2),
               arrowprops=dict(arrowstyle='->', lw=1.5, ls='--'))
    
    ax.text(0.7, 0.8, 'If OOD: Trigger fallback\nIf ID: Normal operation',
           ha='center', fontsize=9, style='italic')
    
    ax.set_xlim(0, 1)
    ax.set_ylim(0, 1)
    ax.axis('off')
    ax.set_title('OOD Detection in SOTIF Safety Lifecycle', fontsize=14, fontweight='bold')
    
    plt.tight_layout()
    plt.show()

visualize_ood_sotif_connection()

## 2. OOD Detection Methods

### Categories of OOD Detection

1. **Distance-based**: Measure distance to training data distribution
2. **Density-based**: Estimate probability density of input
3. **Model-based**: Use model outputs (softmax, features) to detect OOD
4. **Ensemble-based**: Use disagreement between models

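### Methods Covered

The complexity and effectiveness ratings below are rough qualitative indications, not benchmark results.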

| Method | Type | Complexity | Effectiveness |
|--------|------|------------|---------------|
| Mahalanobis Distance | Distance | Low | Good |
| k-NN Distance | Distance | Medium | Good |
| Maximum Softmax Probability | Model | Low | Moderate |
| Energy-based | Model | Low | Good |
| Ensemble Disagreement | Ensemble | High | Very Good |
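
Ensemble disagreement appears in the table but has no code cell of its own. The sketch below illustrates the idea on simulated logits only. The ensemble size, the noise scales, and the `disagreement` helper are assumptions made for illustration, not the method from the referenced paper. It scores each input by how much the members' predicted class probabilities vary.

```python
import numpy as np

rng = np.random.default_rng(0)
n_members, n_samples, n_classes = 5, 200, 4  # hypothetical ensemble setup

def softmax(logits):
    """Softmax over the last axis, shifted for numerical stability."""
    z = logits - logits.max(axis=-1, keepdims=True)
    e = np.exp(z)
    return e / e.sum(axis=-1, keepdims=True)

# Simulated ID inputs: all members share a strong logit for the same class,
# plus a small amount of member-specific noise.
true_class = rng.integers(0, n_classes, size=n_samples)
shared = np.zeros((n_samples, n_classes))
shared[np.arange(n_samples), true_class] = 5.0
logits_id = shared[None] + rng.normal(scale=0.3, size=(n_members, n_samples, n_classes))

# Simulated OOD inputs: each member produces unrelated logits.
logits_ood = rng.normal(scale=3.0, size=(n_members, n_samples, n_classes))

def disagreement(logits):
    """Variance of class probabilities across members, averaged over classes."""
    probs = softmax(logits)                   # shape: (members, samples, classes)
    return probs.var(axis=0).mean(axis=-1)    # shape: (samples,)

dis_id = disagreement(logits_id)
dis_ood = disagreement(logits_ood)
print(f"Mean disagreement, ID:  {dis_id.mean():.4f}")
print(f"Mean disagreement, OOD: {dis_ood.mean():.4f}")
```

With real models, each member would be a separately trained network, and the cost of running all of them is what earns this method its "High" complexity rating.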

## 3. Distance-Based Methods

### Mahalanobis Distance

Mahalanobis distance measures the distance of a point from a distribution, accounting for correlations:

$$D_M(x) = \sqrt{(x - \mu)^T \Sigma^{-1} (x - \mu)}$$

Where:
- $\mu$ = mean of training distribution
- $\Sigma$ = covariance matrix of training distribution
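
To see what "accounting for correlations" means, the small worked example below evaluates the formula for two points that are equally far from the mean in Euclidean terms. The mean, the covariance matrix, and the helper name `mahalanobis_distance` are made up for illustration.

```python
import numpy as np

# Hypothetical training statistics: two strongly correlated features.
mu = np.array([0.0, 0.0])
sigma = np.array([[1.0, 0.9],
                  [0.9, 1.0]])
sigma_inv = np.linalg.inv(sigma)

def mahalanobis_distance(x, mu, sigma_inv):
    """D_M(x) = sqrt((x - mu)^T Sigma^{-1} (x - mu))."""
    diff = x - mu
    return float(np.sqrt(diff @ sigma_inv @ diff))

along = np.array([1.0, 1.0])     # follows the correlation between the features
against = np.array([1.0, -1.0])  # contradicts the correlation

d_along = mahalanobis_distance(along, mu, sigma_inv)
d_against = mahalanobis_distance(against, mu, sigma_inv)

print(f"Euclidean:   {np.linalg.norm(along):.3f} vs {np.linalg.norm(against):.3f}")
print(f"Mahalanobis: {d_along:.3f} vs {d_against:.3f}")
```

Both points have the same Euclidean norm, but the point that contradicts the correlation gets a much larger Mahalanobis distance. This is the behavior wanted from an OOD score.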

In [None]:
class MahalanobisOODDetector:
    """OOD detector using Mahalanobis distance."""
    
    def __init__(self):
        self.mean = None
        self.cov_inv = None
        self.threshold = None
    
    def fit(self, X_train, percentile=95):
        """Fit detector on training data."""
        self.mean = np.mean(X_train, axis=0)
        cov = np.cov(X_train.T)
        # Add small regularization for numerical stability
        cov += np.eye(cov.shape[0]) * 1e-6
        self.cov_inv = np.linalg.inv(cov)
        
        # Calculate threshold from training data
        train_distances = self.score(X_train)
        self.threshold = np.percentile(train_distances, percentile)
        
        return self
    
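    def score(self, X):
        """Calculate Mahalanobis distance for each sample."""
        diff = np.atleast_2d(X) - self.mean
        # Row-wise quadratic form (x - mu)^T Sigma^{-1} (x - mu), without a Python loop
        squared = np.einsum('ij,jk,ik->i', diff, self.cov_inv, diff)
        return np.sqrt(np.maximum(squared, 0.0))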
    
    def predict(self, X):
        """Predict OOD (1) or ID (0)."""
        scores = self.score(X)
        return (scores > self.threshold).astype(int)

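# Create synthetic data
# In-distribution: Normal driving features, centered at the origin.
# The center is fixed explicitly: with centers=1, make_blobs draws a random
# center from center_box, so train and test ID data would not share a distribution.
id_center = [[0.0, 0.0]]
X_train, _ = make_blobs(n_samples=500, centers=id_center, n_features=2,
                        cluster_std=1.0, random_state=42)

# Test data: Mix of ID and OOD
X_test_id, _ = make_blobs(n_samples=100, centers=id_center, n_features=2,
                          cluster_std=1.0, random_state=43)
# OOD: uniform random points; a few will land inside the ID region by chance
X_test_ood = np.random.uniform(-8, 8, size=(100, 2))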

# Fit detector
detector = MahalanobisOODDetector()
detector.fit(X_train, percentile=95)

# Score test data
scores_id = detector.score(X_test_id)
scores_ood = detector.score(X_test_ood)

print(f"Threshold: {detector.threshold:.2f}")
print(f"ID scores - Mean: {scores_id.mean():.2f}, Std: {scores_id.std():.2f}")
print(f"OOD scores - Mean: {scores_ood.mean():.2f}, Std: {scores_ood.std():.2f}")
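
The detector above sets its threshold empirically, at the 95th percentile of the training distances. If the features are roughly Gaussian, the squared Mahalanobis distance approximately follows a chi-square distribution with as many degrees of freedom as there are features, which gives a closed-form cross-check. The sketch below compares the two on freshly simulated Gaussian data. The sample size and covariance are arbitrary, and the Gaussian assumption may not hold for real perception features.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
n_features = 2

# Hypothetical Gaussian "training" features
X = rng.multivariate_normal(mean=[0.0, 0.0],
                            cov=[[1.0, 0.3], [0.3, 1.0]],
                            size=5000)

mu = X.mean(axis=0)
cov_inv = np.linalg.inv(np.cov(X, rowvar=False))
diff = X - mu
# Row-wise Mahalanobis distance
d = np.sqrt(np.einsum('ij,jk,ik->i', diff, cov_inv, diff))

empirical = np.percentile(d, 95)
# Square root of the chi-square quantile, because d is the unsquared distance
theoretical = np.sqrt(stats.chi2.ppf(0.95, df=n_features))

print(f"Empirical 95th percentile: {empirical:.3f}")
print(f"Chi-square based value:    {theoretical:.3f}")
```

A large gap between the two values on real data suggests that the features are not well described by a single Gaussian, in which case the empirical percentile is the safer choice.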

In [None]:
def visualize_mahalanobis_ood(X_train, X_test_id, X_test_ood, detector):
    """Visualize Mahalanobis-based OOD detection."""
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # Plot 1: Feature space
    axes[0].scatter(X_train[:, 0], X_train[:, 1], c='blue', alpha=0.3, 
                   label='Training (ID)', s=20)
    axes[0].scatter(X_test_id[:, 0], X_test_id[:, 1], c='green', alpha=0.6,
                   label='Test ID', s=30, marker='s')
    axes[0].scatter(X_test_ood[:, 0], X_test_ood[:, 1], c='red', alpha=0.6,
                   label='Test OOD', s=30, marker='^')
    
    # Draw ellipse at threshold
    from matplotlib.patches import Ellipse
    # eigh is the appropriate solver for a symmetric covariance matrix (real eigenvalues)
    eigenvalues, eigenvectors = np.linalg.eigh(np.linalg.inv(detector.cov_inv))
    angle = np.degrees(np.arctan2(eigenvectors[1, 0], eigenvectors[0, 0]))
    width, height = 2 * detector.threshold * np.sqrt(eigenvalues)
    ellipse = Ellipse(xy=detector.mean, width=width, height=height, angle=angle,
                     fill=False, color='purple', linewidth=2, linestyle='--',
                     label='Threshold boundary')
    axes[0].add_patch(ellipse)
    
    axes[0].set_xlabel('Feature 1')
    axes[0].set_ylabel('Feature 2')
    axes[0].set_title('Feature Space', fontweight='bold')
    axes[0].legend(loc='upper right')
    axes[0].set_xlim(-10, 10)
    axes[0].set_ylim(-10, 10)
    
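    # Plot 2: Score distributions
    # Compute scores from the arguments instead of relying on notebook globals
    scores_id = detector.score(X_test_id)
    scores_ood = detector.score(X_test_ood)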
    axes[1].hist(scores_id, bins=20, alpha=0.7, color='green', 
                label='In-Distribution', density=True)
    axes[1].hist(scores_ood, bins=20, alpha=0.7, color='red',
                label='Out-of-Distribution', density=True)
    axes[1].axvline(detector.threshold, color='purple', linestyle='--', 
                   linewidth=2, label=f'Threshold ({detector.threshold:.1f})')
    axes[1].set_xlabel('Mahalanobis Distance')
    axes[1].set_ylabel('Density')
    axes[1].set_title('Score Distribution', fontweight='bold')
    axes[1].legend()
    
    # Plot 3: Detection performance
    all_scores = np.concatenate([scores_id, scores_ood])
    all_labels = np.array([0]*len(scores_id) + [1]*len(scores_ood))
    
    thresholds = np.linspace(all_scores.min(), all_scores.max(), 100)
    tprs, fprs = [], []
    for t in thresholds:
        predictions = (all_scores > t).astype(int)
        tp = np.sum((predictions == 1) & (all_labels == 1))
        fp = np.sum((predictions == 1) & (all_labels == 0))
        fn = np.sum((predictions == 0) & (all_labels == 1))
        tn = np.sum((predictions == 0) & (all_labels == 0))
        tprs.append(tp / (tp + fn) if (tp + fn) > 0 else 0)
        fprs.append(fp / (fp + tn) if (fp + tn) > 0 else 0)
    
    axes[2].plot(fprs, tprs, 'b-', linewidth=2, label='ROC Curve')
    axes[2].plot([0, 1], [0, 1], 'k--', alpha=0.5, label='Random')
    axes[2].set_xlabel('False Positive Rate')
    axes[2].set_ylabel('True Positive Rate')
    axes[2].set_title('OOD Detection ROC', fontweight='bold')
    axes[2].legend()
    
    # Calculate AUC
    from sklearn.metrics import roc_auc_score
    auc = roc_auc_score(all_labels, all_scores)
    axes[2].text(0.6, 0.2, f'AUC: {auc:.3f}', fontsize=12, fontweight='bold')
    
    plt.tight_layout()
    plt.show()
    
    return auc

auc = visualize_mahalanobis_ood(X_train, X_test_id, X_test_ood, detector)

## 4. k-NN Based OOD Detection

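The k-NN method scores an input by its average distance to the k nearest neighbors in the training set. A large average distance means the input lies in a sparsely populated region of feature space and is more likely to be OOD.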
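
As a minimal illustration of this score, the sketch below builds a small reference set and compares one query inside the cluster with one far away. The reference data, `k = 5`, and the helper `knn_score` are assumptions chosen for the example.

```python
import numpy as np
from sklearn.neighbors import NearestNeighbors

rng = np.random.default_rng(0)
X_ref = rng.normal(size=(200, 2))  # hypothetical in-distribution reference set

k = 5
nn = NearestNeighbors(n_neighbors=k).fit(X_ref)

def knn_score(X):
    """Average distance from each row of X to its k nearest reference points."""
    distances, _ = nn.kneighbors(X)
    return distances.mean(axis=1)

queries = np.array([[0.0, 0.0],    # inside the cluster
                    [6.0, 6.0]])   # far from the cluster
score_near, score_far = knn_score(queries)
print(f"Score near the cluster: {score_near:.3f}")
print(f"Score far away:         {score_far:.3f}")

# Calling kneighbors() without an argument queries the reference points
# themselves and does not count a point as its own neighbor.
train_distances, _ = nn.kneighbors()
print(f"Smallest training-point neighbor distance: {train_distances.min():.4f}")
```

The no-argument form of `kneighbors` is convenient for calibrating a threshold on the training set, because training points are not scored against themselves.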

In [None]:
class KNNOODDetector:
    """OOD detector using k-Nearest Neighbors distance."""
    
    def __init__(self, k=5):
        self.k = k
        self.nn = None
        self.threshold = None
    
    def fit(self, X_train, percentile=95):
        """Fit detector on training data."""
        self.nn = NearestNeighbors(n_neighbors=self.k)
        self.nn.fit(X_train)
        
        # Calculate threshold from training data.
        # kneighbors() without an argument queries the training points and
        # does not count a point as its own neighbor.
        train_distances, _ = self.nn.kneighbors()
        train_scores = train_distances.mean(axis=1)
        self.threshold = np.percentile(train_scores, percentile)
        
        return self
    
    def score(self, X):
        """Calculate average k-NN distance for each (unseen) sample."""
        distances, _ = self.nn.kneighbors(X)
        # New samples are not in the training set, so all k neighbors count
        return distances.mean(axis=1)
    
    def predict(self, X):
        """Predict OOD (1) or ID (0)."""
        scores = self.score(X)
        return (scores > self.threshold).astype(int)

# Fit k-NN detector
knn_detector = KNNOODDetector(k=10)
knn_detector.fit(X_train, percentile=95)

# Score test data
knn_scores_id = knn_detector.score(X_test_id)
knn_scores_ood = knn_detector.score(X_test_ood)

print(f"k-NN Detector (k={knn_detector.k})")
print(f"Threshold: {knn_detector.threshold:.2f}")
print(f"ID scores - Mean: {knn_scores_id.mean():.2f}, Std: {knn_scores_id.std():.2f}")
print(f"OOD scores - Mean: {knn_scores_ood.mean():.2f}, Std: {knn_scores_ood.std():.2f}")

## 5. Energy-Based OOD Detection

For neural network classifiers, energy-based detection uses the logits (pre-softmax outputs):

$$E(x) = -\log \sum_{i} e^{f_i(x)}$$

Here $f_i(x)$ is the logit for class $i$; this is the temperature $T = 1$ case of the energy score in Liu et al. (2020). Lower energy indicates in-distribution.
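
A small worked example may help make the formula concrete. The logits below are hypothetical; the sketch evaluates the energy for a confident and a flat logit vector, and checks the identity $\log \max_i p_i(x) = \max_i f_i(x) + E(x)$ that links energy to the maximum softmax probability.

```python
import numpy as np
from scipy.special import logsumexp

def energy(logits):
    """E(x) = -log sum_i exp(f_i(x)), computed stably along the last axis."""
    return -logsumexp(logits, axis=-1)

# Hypothetical logits for a 3-class classifier
logits = np.array([
    [10.0, 0.0, 0.0],   # one dominant class
    [0.0, 0.0, 0.0],    # no class preferred
])

e = energy(logits)

# Maximum softmax probability, for comparison
shifted = logits - logits.max(axis=-1, keepdims=True)
probs = np.exp(shifted) / np.exp(shifted).sum(axis=-1, keepdims=True)
msp = probs.max(axis=-1)

# Identity linking the two scores: log(MSP) = max_i f_i(x) + E(x)
identity_holds = np.allclose(np.log(msp), logits.max(axis=-1) + e)

print(f"Energy (dominant class): {e[0]:.4f}")
print(f"Energy (flat logits):    {e[1]:.4f}")
print(f"Identity holds: {identity_holds}")
```

The flat logit vector has energy $-\log 3$, while the confident one has much lower energy, in line with the "lower energy indicates in-distribution" rule.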

In [None]:
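from scipy.special import logsumexp

def energy_score(logits):
    """Calculate energy score from logits (numerically stable log-sum-exp)."""
    # logsumexp avoids the overflow that np.exp can hit on large logits
    return -logsumexp(logits, axis=1)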

def softmax(logits):
    """Calculate softmax probabilities."""
    exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
    return exp_logits / np.sum(exp_logits, axis=1, keepdims=True)

def max_softmax_score(logits):
    """Maximum softmax probability (baseline OOD method)."""
    probs = softmax(logits)
    return np.max(probs, axis=1)

# Simulate model outputs
# ID: Confident predictions (high logits for one class)
logits_id = np.random.randn(100, 5) * 0.5
logits_id[np.arange(100), np.random.randint(0, 5, 100)] += 5  # Boost one class

# OOD: Uncertain predictions (uniform-ish logits)
logits_ood = np.random.randn(100, 5) * 1.5

# Calculate scores
energy_id = energy_score(logits_id)
energy_ood = energy_score(logits_ood)
msp_id = max_softmax_score(logits_id)
msp_ood = max_softmax_score(logits_ood)

print("Energy-based OOD Detection:")
print(f"  ID energy: {energy_id.mean():.2f} +/- {energy_id.std():.2f}")
print(f"  OOD energy: {energy_ood.mean():.2f} +/- {energy_ood.std():.2f}")
print("\nMax Softmax Probability:")
print(f"  ID MSP: {msp_id.mean():.3f} +/- {msp_id.std():.3f}")
print(f"  OOD MSP: {msp_ood.mean():.3f} +/- {msp_ood.std():.3f}")

In [None]:
def compare_ood_methods():
    """Compare different OOD detection methods."""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Energy-based
    axes[0].hist(energy_id, bins=20, alpha=0.7, color='green', 
                label='ID', density=True)
    axes[0].hist(energy_ood, bins=20, alpha=0.7, color='red',
                label='OOD', density=True)
    axes[0].set_xlabel('Energy Score')
    axes[0].set_ylabel('Density')
    axes[0].set_title('Energy-Based Detection', fontweight='bold')
    axes[0].legend()
    axes[0].text(0.05, 0.95, 'Lower = more confident (ID)',
                transform=axes[0].transAxes, fontsize=9, style='italic')
    
    # Max Softmax
    axes[1].hist(msp_id, bins=20, alpha=0.7, color='green',
                label='ID', density=True)
    axes[1].hist(msp_ood, bins=20, alpha=0.7, color='red',
                label='OOD', density=True)
    axes[1].set_xlabel('Max Softmax Probability')
    axes[1].set_ylabel('Density')
    axes[1].set_title('Max Softmax Probability', fontweight='bold')
    axes[1].legend()
    axes[1].text(0.05, 0.95, 'Higher = more confident (ID)',
                transform=axes[1].transAxes, fontsize=9, style='italic')
    
    plt.tight_layout()
    plt.show()

compare_ood_methods()

## 6. Runtime OOD Monitoring for SOTIF

In practice, OOD detection must run continuously during vehicle operation. Key considerations:

1. **Latency**: Must not slow down perception pipeline
2. **Threshold selection**: Balance false alarms vs missed detections
3. **Temporal filtering**: Avoid reacting to momentary anomalies
4. **Graceful degradation**: What action to take when OOD detected
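
The threshold trade-off in item 2 can be made concrete with a short sketch. It picks the threshold from in-distribution validation scores to hit a target false-alarm rate, then reports how many OOD samples that threshold would miss. The score distributions and the helper `threshold_for_false_alarm_rate` are hypothetical and only illustrate the mechanics.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical validation scores (higher = more anomalous)
id_scores = rng.rayleigh(scale=1.0, size=2000)
ood_scores = rng.rayleigh(scale=1.0, size=500) + 2.0

def threshold_for_false_alarm_rate(id_scores, target_far):
    """Threshold that flags roughly `target_far` of ID samples as OOD."""
    return np.quantile(id_scores, 1.0 - target_far)

results = []
for target_far in (0.10, 0.05, 0.01):
    t = threshold_for_false_alarm_rate(id_scores, target_far)
    false_alarm_rate = np.mean(id_scores > t)   # ID samples flagged as OOD
    miss_rate = np.mean(ood_scores <= t)        # OOD samples not flagged
    results.append((target_far, t, false_alarm_rate, miss_rate))
    print(f"target FAR={target_far:.2f}  threshold={t:.2f}  "
          f"false alarms={false_alarm_rate:.3f}  missed OOD={miss_rate:.3f}")
```

Tightening the false-alarm target raises the threshold and increases the share of missed OOD samples. The acceptable balance depends on the fallback action that an alert triggers.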

In [None]:
class RuntimeOODMonitor:
    """Runtime OOD monitor with temporal filtering."""
    
    def __init__(self, detector, window_size=5, alert_threshold=0.6):
        self.detector = detector
        self.window_size = window_size
        self.alert_threshold = alert_threshold
        self.score_history = []
        self.alert_history = []
    
    def process_frame(self, features):
        """Process a single frame and return monitoring state."""
        # Get OOD score
        score = self.detector.score(features.reshape(1, -1))[0]
        is_ood = score > self.detector.threshold
        
        # Update history
        self.score_history.append(score)
        if len(self.score_history) > self.window_size:
            self.score_history.pop(0)
        
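        # Temporal filtering: alert when the fraction of OOD frames in the
        # window reaches alert_threshold. Dividing by window_size (not the
        # current history length) keeps a single OOD frame at start-up from
        # producing a ratio of 1.0.
        ood_count = sum(s > self.detector.threshold for s in self.score_history)
        ood_ratio = ood_count / self.window_size
        trigger_alert = ood_ratio >= self.alert_threshold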
        
        self.alert_history.append(trigger_alert)
        
        return {
            'score': score,
            'is_ood': is_ood,
            'ood_ratio': ood_ratio,
            'alert': trigger_alert,
            'action': self._determine_action(ood_ratio)
        }
    
    def _determine_action(self, ood_ratio):
        """Determine appropriate action based on OOD ratio."""
        if ood_ratio >= 0.8:
            return 'CRITICAL: Request driver takeover'
        elif ood_ratio >= 0.6:
            return 'WARNING: Increase monitoring, prepare fallback'
        elif ood_ratio >= 0.4:
            return 'CAUTION: Elevated OOD activity detected'
        else:
            return 'NORMAL: Continue operation'

# Simulate runtime monitoring
monitor = RuntimeOODMonitor(detector, window_size=5, alert_threshold=0.6)

# Simulate a driving sequence
np.random.seed(123)
n_frames = 100

# Create sequence: Normal -> OOD event -> Normal
sequence = []
for i in range(n_frames):
    if 30 <= i <= 50:  # OOD event
        features = np.random.uniform(-6, 6, size=2)
    else:  # Normal
        features = np.random.randn(2) * 1.0
    sequence.append(features)

# Process sequence
results = []
for i, features in enumerate(sequence):
    result = monitor.process_frame(features)
    result['frame'] = i
    results.append(result)

results_df = pd.DataFrame(results)
print("Sample monitoring results:")
display(results_df[['frame', 'score', 'is_ood', 'ood_ratio', 'action']].iloc[25:55:3])
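
Temporal filtering suppresses isolated glitches but also delays the alert when a real OOD event begins. The sketch below measures that delay for a few window sizes using a hypothetical sequence of per-frame OOD flags. The function `first_alert_frame` is an illustrative helper, not part of `RuntimeOODMonitor`, and it waits for a full window before it can alert.

```python
from collections import deque

def first_alert_frame(flags, window_size, alert_threshold):
    """Return the first frame index at which the windowed OOD ratio
    reaches alert_threshold, or None if it never does."""
    window = deque(maxlen=window_size)  # oldest flag drops out automatically
    for frame, flag in enumerate(flags):
        window.append(bool(flag))
        if len(window) == window_size and sum(window) / window_size >= alert_threshold:
            return frame
    return None

event_start = 30
# Hypothetical flags: clean until frame 30, then persistently OOD
flags = [False] * event_start + [True] * 20

delays = {}
for window_size in (3, 5, 10):
    frame = first_alert_frame(flags, window_size, alert_threshold=0.6)
    delays[window_size] = frame - event_start
    print(f"window={window_size:2d}  alert delay={delays[window_size]} frames")

# A single-frame glitch does not raise an alert with a window of 5
glitch = [False] * 10 + [True] + [False] * 10
print("Glitch alert frame:", first_alert_frame(glitch, 5, 0.6))
```

Larger windows reject more noise but react later. At a given frame rate, the delay in frames translates directly into distance travelled before the fallback is triggered.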

In [None]:
def visualize_runtime_monitoring(results_df):
    """Visualize runtime OOD monitoring results."""
    fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)
    
    frames = results_df['frame']
    
    # Plot 1: OOD scores
    axes[0].plot(frames, results_df['score'], 'b-', linewidth=1, alpha=0.7)
    axes[0].axhline(detector.threshold, color='red', linestyle='--', 
                   linewidth=2, label=f'Threshold ({detector.threshold:.1f})')
    axes[0].fill_between(frames, results_df['score'], detector.threshold,
                        where=results_df['score'] > detector.threshold,
                        color='red', alpha=0.3, label='OOD region')
    axes[0].set_ylabel('OOD Score')
    axes[0].set_title('Real-time OOD Score', fontweight='bold')
    axes[0].axvspan(30, 50, alpha=0.2, color='red', label='OOD Event')
    # Build the legend after all labeled artists exist so 'OOD Event' is listed
    axes[0].legend(loc='upper right')
    
    # Plot 2: OOD ratio (temporal filter)
    axes[1].plot(frames, results_df['ood_ratio'], 'purple', linewidth=2)
    axes[1].axhline(0.6, color='orange', linestyle='--', linewidth=2, 
                   label='Alert threshold (0.6)')
    axes[1].fill_between(frames, results_df['ood_ratio'], 0.6,
                        where=results_df['ood_ratio'] >= 0.6,
                        color='orange', alpha=0.3)
    axes[1].set_ylabel('OOD Ratio')
    axes[1].set_title('Temporal Filtered OOD Ratio (window=5)', fontweight='bold')
    axes[1].legend(loc='upper right')
    axes[1].set_ylim(0, 1.1)
    axes[1].axvspan(30, 50, alpha=0.2, color='red')
    
    # Plot 3: Alerts and actions
    alert_frames = results_df.loc[results_df['alert'], 'frame']
    axes[2].scatter(alert_frames, [1]*len(alert_frames), c='red', s=50, marker='|')
    axes[2].set_ylabel('Alert')
    axes[2].set_xlabel('Frame')
    axes[2].set_title('Alert Signals', fontweight='bold')
    axes[2].set_yticks([0, 1])
    axes[2].set_yticklabels(['Normal', 'Alert'])
    axes[2].axvspan(30, 50, alpha=0.2, color='red', label='OOD Event Period')
    axes[2].legend(loc='upper right')
    
    plt.tight_layout()
    plt.show()

visualize_runtime_monitoring(results_df)

## 7. Exercise: Implement Custom OOD Detector

**Task:** Implement an OOD detector using Local Outlier Factor (LOF) or another method of your choice.

In [None]:
# Exercise: Implement your own OOD detector

# TODO: Create a class similar to MahalanobisOODDetector
# using sklearn.neighbors.LocalOutlierFactor or another method

# class LOFOODDetector:
#     def __init__(self, ...):
#         pass
#     
#     def fit(self, X_train):
#         pass
#     
#     def score(self, X):
#         pass
#     
#     def predict(self, X):
#         pass

print("Implement your OOD detector in this cell.")

## Summary

In this notebook, you learned:

- **OOD Detection Role**: Supports identifying SOTIF Area 3 (Unknown Unsafe) scenarios
- **Detection Methods**: Distance-based (Mahalanobis, k-NN), model-based (Energy, MSP)
- **Runtime Monitoring**: Temporal filtering and action policies for deployment
- **Practical Implementation**: Python implementations of key methods

### Key Insight

> OOD detection is not about achieving perfect detection, but about providing an additional safety layer that can trigger appropriate fallback behaviors when the system encounters unfamiliar situations.

### References

- Patel, M., Jung, R., Khatun, M. (2023). "Out-of-Distribution Detection as Support for Autonomous Driving Safety Lifecycle." REFSQ.
- Lee, K., et al. (2018). "A Simple Unified Framework for Detecting Out-of-Distribution Samples." NeurIPS.
- Liu, W., et al. (2020). "Energy-based Out-of-distribution Detection." NeurIPS.

---

*Notebook created by Milin Patel | Hochschule Kempten*  
*Last updated: 2025-01-22*