# Online Streaming Fraud Detection System

## Tutorial 7: Real-Time Fraud Detection with Online Learning

In this tutorial, you'll learn how to build production-ready streaming fraud detection systems:
- **Online Learning**: Incremental learning without retraining
- **Concept Drift**: Detecting and adapting to changing patterns
- **Real-Time Processing**: High-throughput streaming architecture
- **Adaptive Ensembles**: Dynamic model weighting and adaptation

## Learning Objectives

By the end of this tutorial, you'll understand:

1. **Online vs Batch Learning**: Why streaming systems need different approaches
2. **Concept Drift**: How fraud patterns evolve and how to detect changes
3. **Incremental Learning**: Update models with new data without full retraining
4. **Streaming Architecture**: Build scalable real-time processing systems
5. **Adaptive Ensembles**: Combine multiple models with dynamic weighting
6. **Performance Monitoring**: Track system health in real-time
7. **Production Deployment**: Handle threading, queues, and error recovery

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import SGDClassifier, PassiveAggressiveClassifier
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, classification_report
)
from collections import deque, defaultdict
import threading
import queue
import time
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Set style for better visualizations
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

print("Online Streaming Fraud Detection System")
print("Real-time learning and adaptation tutorial")

## Part 1: Understanding Online Learning vs Batch Learning

### The Challenge of Real-Time Fraud Detection

Traditional ML approaches (batch learning) have limitations in fraud detection:
- **Static Models**: Trained once, performance degrades over time
- **Concept Drift**: Fraud patterns constantly evolve
- **Latency**: Retraining can take hours or days, so the deployed model lags behind new fraud patterns
- **Scalability**: Cannot handle continuous high-volume streams

### Online Learning Advantages

Online learning addresses these challenges:
- **Incremental Updates**: Learn from each new transaction
- **Adaptation**: Automatically adjust to changing patterns
- **Low Latency**: Real-time predictions and updates
- **Memory Efficiency**: No need to store entire dataset
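
To make the incremental-update idea concrete, here is a minimal sketch of an online learner that updates on one sample at a time and is scored with a test-then-train loop. The class `OnlineLogisticRegression`, its methods, and the synthetic data are all hypothetical, written only for illustration; this is a hand-rolled model, not scikit-learn's implementation.

```python
import numpy as np

def sigmoid(z):
    # Clip to avoid overflow in exp for extreme scores
    return 1.0 / (1.0 + np.exp(-np.clip(z, -30, 30)))

class OnlineLogisticRegression:
    """Hand-rolled online logistic regression (illustrative only)."""

    def __init__(self, n_features, lr=0.1):
        self.w = np.zeros(n_features)
        self.b = 0.0
        self.lr = lr

    def predict_proba_one(self, x):
        return sigmoid(self.w @ x + self.b)

    def learn_one(self, x, y):
        # One SGD step on the log loss for a single (x, y) pair
        error = self.predict_proba_one(x) - y
        self.w -= self.lr * error * x
        self.b -= self.lr * error

# Synthetic, linearly separable stream (an assumption for this demo)
rng = np.random.default_rng(0)
n = 2000
X_demo = rng.normal(size=(n, 2))
y_demo = (X_demo[:, 0] + X_demo[:, 1] > 0).astype(int)

model = OnlineLogisticRegression(n_features=2)
correct = 0
for x_i, y_i in zip(X_demo, y_demo):
    # Test-then-train: score the sample before the model learns from it
    correct += int((model.predict_proba_one(x_i) > 0.5) == y_i)
    model.learn_one(x_i, y_i)

prequential_accuracy = correct / n
print(f"Prequential accuracy: {prequential_accuracy:.3f}")
```

Only the weight vector and bias are kept in memory, and each sample is discarded after its update, which is the memory-efficiency property listed above.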

In [None]:
# Load and prepare data
df = pd.read_csv('creditcard.csv')
print(f"Dataset shape: {df.shape}")
print(f"Fraud rate: {df['Class'].mean()*100:.3f}%")

# Sort by time for streaming simulation
df_sorted = df.sort_values('Time').reset_index(drop=True)

# Prepare features
feature_columns = [col for col in df.columns if col not in ['Class', 'Time']]
X = df_sorted[feature_columns]
y = df_sorted['Class']
times = df_sorted['Time']

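# Scale features
# Note: fitting the scaler on the full dataset leaks future statistics into
# earlier transactions. That is acceptable for this simulation; a live system
# would fit on historical data only or update the scaler incrementally
# (StandardScaler also supports partial_fit).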
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

print(f"\nFeatures: {len(feature_columns)}")
print(f"Transactions: {len(X):,}")
print(f"Time range: {times.min():.0f} to {times.max():.0f} seconds")

# Visualize the concept of streaming vs batch learning
def visualize_learning_paradigms():
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Batch Learning
    ax1.text(0.5, 0.9, 'Batch Learning', ha='center', fontsize=16, fontweight='bold')
    
    # Training phase
    ax1.add_patch(plt.Rectangle((0.1, 0.7), 0.8, 0.1, facecolor='lightblue', alpha=0.7))
    ax1.text(0.5, 0.75, 'Training Phase\n(Hours/Days)', ha='center', fontsize=12)
    
    # Deployment phase
    ax1.add_patch(plt.Rectangle((0.1, 0.5), 0.8, 0.1, facecolor='lightgreen', alpha=0.7))
    ax1.text(0.5, 0.55, 'Deployment Phase\n(Static Model)', ha='center', fontsize=12)
    
    # Performance degradation
    ax1.add_patch(plt.Rectangle((0.1, 0.3), 0.8, 0.1, facecolor='lightcoral', alpha=0.7))
    ax1.text(0.5, 0.35, 'Performance Degradation\n(Concept Drift)', ha='center', fontsize=12)
    
    # Retraining
    ax1.add_patch(plt.Rectangle((0.1, 0.1), 0.8, 0.1, facecolor='lightyellow', alpha=0.7))
    ax1.text(0.5, 0.15, 'Retraining Required\n(Manual Process)', ha='center', fontsize=12)
    
    ax1.set_xlim(0, 1)
    ax1.set_ylim(0, 1)
    ax1.axis('off')
    
    # Online Learning
    ax2.text(0.5, 0.9, 'Online Learning', ha='center', fontsize=16, fontweight='bold')
    
    # Continuous learning
    ax2.add_patch(plt.Rectangle((0.1, 0.6), 0.8, 0.2, facecolor='lightgreen', alpha=0.7))
    ax2.text(0.5, 0.7, 'Continuous Learning\n(Real-time Updates)', ha='center', fontsize=12)
    
    # Adaptive performance
    ax2.add_patch(plt.Rectangle((0.1, 0.3), 0.8, 0.2, facecolor='lightblue', alpha=0.7))
    ax2.text(0.5, 0.4, 'Adaptive Performance\n(Concept Drift Handling)', ha='center', fontsize=12)
    
    # Real-time processing
    ax2.add_patch(plt.Rectangle((0.1, 0.1), 0.8, 0.1, facecolor='lightyellow', alpha=0.7))
    ax2.text(0.5, 0.15, 'Real-time Processing\n(Low Latency)', ha='center', fontsize=12)
    
    ax2.set_xlim(0, 1)
    ax2.set_ylim(0, 1)
    ax2.axis('off')
    
    plt.tight_layout()
    plt.show()
    
    print("Key Differences:")
    print("1. Batch: Train once, deploy static model")
    print("2. Online: Continuous learning from each transaction")
    print("3. Batch: Performance degrades over time")
    print("4. Online: Adapts to changing patterns automatically")

visualize_learning_paradigms()

## Part 2: Concept Drift in Fraud Detection

### Understanding Concept Drift

Concept drift occurs when the underlying data distribution changes over time:
- **Fraud Evolution**: Attackers develop new techniques
- **Seasonal Patterns**: Shopping behaviors change during holidays
- **Economic Changes**: Financial crises affect transaction patterns
- **Technology Adoption**: New payment methods emerge

### Types of Concept Drift

1. **Sudden Drift**: Abrupt change in patterns
2. **Gradual Drift**: Slow evolution over time
3. **Incremental Drift**: Small continuous changes
4. **Recurring Drift**: Cyclical patterns that repeat
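
The four drift types can be sketched on a single feature whose mean moves from an old value to a new one. The function `drift_means` and its parameters are hypothetical, and the split between "gradual" (a growing probability of sampling the new concept) and "incremental" (the concept itself sliding) follows one common convention rather than a definition given in this tutorial.

```python
import numpy as np

def drift_means(kind, n=1000, old=0.0, new=2.0, seed=0):
    """Return the per-step mean of a 1-D feature under one drift pattern."""
    rng = np.random.default_rng(seed)
    t = np.arange(n)
    if kind == "sudden":
        # Switch from the old to the new concept at the midpoint
        return np.where(t < n // 2, old, new)
    if kind == "gradual":
        # Each step draws from the new concept with rising probability
        p_new = t / (n - 1)
        return np.where(rng.random(n) < p_new, new, old)
    if kind == "incremental":
        # The concept itself slides smoothly from old to new
        return old + (new - old) * t / (n - 1)
    if kind == "recurring":
        # Alternate between the two concepts in blocks of n // 4 steps
        return np.where((t // (n // 4)) % 2 == 0, old, new)
    raise ValueError(f"Unknown drift kind: {kind}")

for kind in ("sudden", "gradual", "incremental", "recurring"):
    means = drift_means(kind)
    half = len(means) // 2
    print(f"{kind:12s} first-half mean={means[:half].mean():.2f}, "
          f"second-half mean={means[half:].mean():.2f}")
```

Plotting each returned array against its index shows the step, the noisy mixture, the ramp, and the square wave that correspond to the four types.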

In [None]:
# Simulate concept drift in fraud patterns
def simulate_concept_drift(X, y, times, drift_points=(0.3, 0.7)):
    """
    Simulate concept drift by modifying fraud patterns at specific time points.
    
    Args:
        X: Features
        y: Labels
        times: Timestamps
        drift_points: Relative time points where drift occurs
    
    Returns:
        Modified features with simulated drift
    """
    X_drift = X.copy()
    time_max = times.max()
    
    for i, drift_point in enumerate(drift_points):
        drift_time = time_max * drift_point
        
        # Find transactions after drift point
        drift_mask = (times > drift_time) & (y == 1)  # Only fraud transactions
        
        if drift_mask.sum() > 0:
            # Simulate drift by shifting fraud patterns. Shifts accumulate:
            # fraud after the second drift point has moved by 0.5 + 1.0 = 1.5
            shift_amount = (i + 1) * 0.5  # Increasing shift for later drifts
            
            # Shift specific features that might change in fraud patterns
            feature_indices = [0, 1, 2, 3, 4]  # V1-V5 features
            for idx in feature_indices:
                X_drift[drift_mask, idx] += shift_amount
    
    return X_drift

# Create dataset with concept drift
X_with_drift = simulate_concept_drift(X_scaled, y, times)

# Visualize concept drift
def visualize_concept_drift(X_original, X_drift, y, times):
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # Create time windows
    # Five edges define the four windows plotted in the 2x2 grid
    time_windows = np.linspace(times.min(), times.max(), 5)
    window_labels = ['Early', 'Mid-Early', 'Mid-Late', 'Late']
    
    for i, (start_time, end_time, label) in enumerate(zip(time_windows[:-1], time_windows[1:], window_labels)):
        ax = axes[i//2, i%2]
        
        # Get transactions in this time window
        window_mask = (times >= start_time) & (times < end_time)
        fraud_mask = window_mask & (y == 1)
        normal_mask = window_mask & (y == 0)
        
        # Plot original vs drifted patterns (using first two features)
        if fraud_mask.sum() > 0:
            ax.scatter(X_original[normal_mask, 0], X_original[normal_mask, 1], 
                      alpha=0.3, s=10, label='Normal', color='blue')
            ax.scatter(X_original[fraud_mask, 0], X_original[fraud_mask, 1], 
                      alpha=0.7, s=30, label='Fraud (Original)', color='red', marker='o')
            ax.scatter(X_drift[fraud_mask, 0], X_drift[fraud_mask, 1], 
                      alpha=0.7, s=30, label='Fraud (Drifted)', color='orange', marker='s')
        
        ax.set_xlabel('Feature V1')
        ax.set_ylabel('Feature V2')
        ax.set_title(f'{label} Period')
        ax.legend()
        ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("Concept Drift Simulation:")
    print("- Orange squares show how fraud patterns change over time")
    print("- Red circles show original fraud patterns")
    print("- Blue dots show normal transactions (unchanged)")

# Visualize the drift
visualize_concept_drift(X_scaled, X_with_drift, y, times)

## Part 3: Adaptive Online Classifier

### Key Components of Online Learning

1. **Incremental Updates**: `partial_fit()` method for streaming updates
2. **Forgetting Factor**: Give higher weight to recent examples
3. **Drift Detection**: Monitor performance to detect changes
4. **Adaptive Learning**: Adjust learning rate based on performance
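
As a worked example of the forgetting factor, the sketch below computes exponential-decay weights for one batch and their sum, which acts as an effective sample size. The helper name `forgetting_weights` is hypothetical. For a factor of 0.95, the geometric series bounds the effective size by 1 / (1 - 0.95) = 20, however large the batch is.

```python
import numpy as np

def forgetting_weights(n_samples, forgetting_factor):
    """Weight gamma**k for a sample that is k positions older than the newest."""
    ages = np.arange(n_samples - 1, -1, -1)  # oldest sample has the largest age
    return forgetting_factor ** ages

weights = forgetting_weights(100, 0.95)
effective_size = weights.sum()

print(f"newest weight: {weights[-1]:.3f}")
print(f"oldest weight: {weights[0]:.5f}")
print(f"effective sample size: {effective_size:.1f}")
```

A smaller factor shrinks the effective window, which speeds up adaptation after drift at the cost of noisier updates.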

In [None]:
class AdaptiveOnlineClassifier:
    """
    Adaptive online classifier with concept drift detection and handling.
    
    Key features:
    - Incremental learning with forgetting factor
    - Automatic drift detection
    - Adaptive learning rate adjustment
    - Performance monitoring
    """
    
    def __init__(self, base_model_type='sgd', learning_rate=0.01, 
                 forgetting_factor=0.95, drift_threshold=0.05):
        """
        Initialize adaptive online classifier.
        
        Args:
            base_model_type: Type of base model ('sgd' or 'passive_aggressive')
            learning_rate: Initial learning rate
            forgetting_factor: Per-sample decay applied within each batch, so
                later samples weigh more (typically 0.9-0.99)
            drift_threshold: Performance drop threshold for drift detection
        """
        self.base_model_type = base_model_type
        self.learning_rate = learning_rate
        self.initial_learning_rate = learning_rate
        self.forgetting_factor = forgetting_factor
        self.drift_threshold = drift_threshold
        
        # Initialize base model
        if base_model_type == 'sgd':
            self.model = SGDClassifier(
                loss='log_loss',
                learning_rate='constant',
                eta0=learning_rate,
                random_state=42
            )
        elif base_model_type == 'passive_aggressive':
            self.model = PassiveAggressiveClassifier(
                C=1.0,
                random_state=42
            )
        else:
            raise ValueError(f"Unsupported model type: {base_model_type}")
        
        # Performance tracking
        self.performance_history = deque(maxlen=100)
        self.recent_performance = deque(maxlen=20)
        self.is_fitted = False
        self.drift_detected = False
        
        # Sample weighting
        self.sample_weights = deque(maxlen=1000)
        
    def partial_fit(self, X, y, sample_weight=None):
        """
        Incremental learning with forgetting factor.
        
        Args:
            X: Features (n_samples, n_features)
            y: Labels (n_samples,)
            sample_weight: Optional sample weights
        """
        n_samples = len(X)
        if n_samples == 0:
            return
        
        # Exponential decay within the batch: the newest sample gets weight 1.0
        if sample_weight is None:
            weights = self.forgetting_factor ** np.arange(n_samples - 1, -1, -1)
        else:
            weights = np.asarray(sample_weight, dtype=float)
        
        fit_kwargs = {}
        if not self.is_fitted:
            # The first call must declare every class the stream can contain
            fit_kwargs['classes'] = np.array([0, 1])
        if self.base_model_type == 'sgd':
            # PassiveAggressiveClassifier.partial_fit accepts no sample_weight
            fit_kwargs['sample_weight'] = weights
        
        self.model.partial_fit(X, y, **fit_kwargs)
        self.is_fitted = True
        
        # Store sample weights for analysis (not applied for passive_aggressive)
        self.sample_weights.extend(weights)
    
    def predict(self, X):
        """Make predictions."""
        if not self.is_fitted:
            return np.zeros(len(X))
        return self.model.predict(X)
    
    def predict_proba(self, X):
        """Predict probabilities."""
        if not self.is_fitted:
            # Return uniform probabilities
            return np.full((len(X), 2), 0.5)
        
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        else:
            # For models without predict_proba, squash the decision function
            # through a sigmoid. The results are uncalibrated scores, not
            # true probabilities. Clipping avoids overflow in exp.
            decisions = np.clip(self.model.decision_function(X), -500, 500)
            prob_pos = 1 / (1 + np.exp(-decisions))
            return np.column_stack([1 - prob_pos, prob_pos])
    
    def update_performance(self, y_true, y_pred):
        """
        Update performance metrics and detect drift.
        
        Args:
            y_true: True labels
            y_pred: Predicted labels
        """
        # Calculate current performance. Because fraud is rare, accuracy stays
        # high even when most fraud is missed, so treat it as a coarse drift signal.
        current_accuracy = accuracy_score(y_true, y_pred)
        
        # Update performance history
        self.performance_history.append(current_accuracy)
        self.recent_performance.append(current_accuracy)
        
        # Detect drift
        self.drift_detected = self._detect_drift()
        
        # Handle drift if detected
        if self.drift_detected:
            self._handle_drift()
        
        # Adapt learning rate
        self._adapt_learning_rate()
    
    def _detect_drift(self):
        """
        Detect concept drift based on performance degradation.
        
        Returns:
            bool: True if drift detected, False otherwise
        """
        if len(self.performance_history) < 50:  # Need sufficient history
            return False
        
        # Compare recent performance with historical average
        historical_avg = np.mean(list(self.performance_history)[:-20])
        recent_avg = np.mean(list(self.recent_performance))
        
        # Drift detected if recent performance significantly drops
        performance_drop = historical_avg - recent_avg
        return performance_drop > self.drift_threshold
    
    def _handle_drift(self):
        """
        Handle detected concept drift.
        
        Strategies:
        1. Increase learning rate for faster adaptation
        2. Reset performance history
        3. Reduce forgetting factor (focus more on recent data)
        """
        # Increase learning rate temporarily
        self.learning_rate = min(self.learning_rate * 2, 0.1)
        
        # Update model learning rate if possible
        if hasattr(self.model, 'eta0'):
            self.model.eta0 = self.learning_rate
        
        # Reset performance history
        self.performance_history.clear()
        self.recent_performance.clear()
        
        # Reduce forgetting factor to focus on recent data
        self.forgetting_factor = max(self.forgetting_factor * 0.9, 0.8)
        
        print(f"Drift detected! Adapted learning rate to {self.learning_rate:.4f}")
    
    def _adapt_learning_rate(self):
        """
        Adapt learning rate based on performance trends.
        
        - Increase (capped at 0.1) if performance is declining
        - Decay back toward the initial rate if performance is improving
        """
        if len(self.recent_performance) < 10:
            return
        
        # Calculate performance trend
        recent_scores = list(self.recent_performance)
        early_avg = np.mean(recent_scores[:5])
        late_avg = np.mean(recent_scores[-5:])
        
        # Adapt learning rate
        if late_avg < early_avg - 0.02:  # Performance declining
            self.learning_rate = min(self.learning_rate * 1.1, 0.1)
        elif late_avg > early_avg + 0.01:  # Performance improving
            self.learning_rate = max(self.learning_rate * 0.95, self.initial_learning_rate)
        
        # Update model learning rate
        if hasattr(self.model, 'eta0'):
            self.model.eta0 = self.learning_rate
    
    def get_status(self):
        """
        Get current status of the classifier.
        
        Returns:
            dict: Status information
        """
        return {
            'is_fitted': self.is_fitted,
            'learning_rate': self.learning_rate,
            'forgetting_factor': self.forgetting_factor,
            'drift_detected': self.drift_detected,
            'performance_history_length': len(self.performance_history),
            'recent_performance_avg': np.mean(self.recent_performance) if self.recent_performance else 0,
            'sample_weights_avg': np.mean(self.sample_weights) if self.sample_weights else 0
        }

# Test the adaptive classifier
print("Testing Adaptive Online Classifier...")

# Create classifier
classifier = AdaptiveOnlineClassifier(base_model_type='sgd', learning_rate=0.01)

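# Simulate streaming learning
# Note: with the default drift_points (30% and 70% of the time range), the
# first 5,000 time-sorted transactions are likely to precede the simulated
# drift. Raise the limit to stream through the drift points.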
batch_size = 100
performance_track = []
drift_points = []

for i in range(0, min(5000, len(X_with_drift)), batch_size):
    end_idx = min(i + batch_size, len(X_with_drift))
    
    X_batch = X_with_drift[i:end_idx]
    y_batch = y.iloc[i:end_idx]
    
    # Evaluate first, then train (test-then-train), so accuracy is measured
    # on data the model has not yet learned from
    y_pred = classifier.predict(X_batch)
    accuracy = accuracy_score(y_batch, y_pred)
    
    # Train on batch
    classifier.partial_fit(X_batch, y_batch)
    
    # Update performance
    classifier.update_performance(y_batch, y_pred)
    
    # Track performance
    performance_track.append(accuracy)
    
    # Track drift detection
    if classifier.drift_detected:
        drift_points.append(i // batch_size)

# Visualize performance
plt.figure(figsize=(12, 6))
plt.plot(performance_track, 'b-', linewidth=2, label='Accuracy')
plt.axhline(y=np.mean(performance_track), color='g', linestyle='--', label='Average')

# Mark drift points
for drift_point in drift_points:
    plt.axvline(x=drift_point, color='r', linestyle='--', alpha=0.7)

plt.xlabel('Batch Number')
plt.ylabel('Accuracy')
plt.title('Adaptive Online Classifier Performance')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Print final status
print("\nFinal Classifier Status:")
status = classifier.get_status()
for key, value in status.items():
    print(f"  {key}: {value}")

print(f"\nDrift detected {len(drift_points)} times at batches: {drift_points}")

## Part 4: Online Ensemble Methods

### Why Ensembles for Online Learning?

Online ensembles provide several advantages:
- **Robustness**: Multiple models reduce individual model failures
- **Diversity**: Different models capture different patterns
- **Adaptation**: Poor-performing models get lower weights
- **Stability**: Ensemble predictions are more stable than individual models
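
One way to turn per-model performance into ensemble weights is a softmax with a temperature. The sketch below uses a hypothetical helper `softmax_weights` and made-up accuracies to show how the temperature controls sharpness: because accuracies lie in [0, 1], a temperature of 2.0 leaves the weights close to uniform, while much smaller values concentrate weight on the best model.

```python
import numpy as np

def softmax_weights(scores, temperature=1.0, min_weight=0.0):
    """Turn per-model scores into normalized ensemble weights."""
    scaled = np.asarray(scores, dtype=float) / temperature
    exp_scores = np.exp(scaled - scaled.max())  # subtract max for numerical stability
    weights = exp_scores / exp_scores.sum()
    # Optional floor so no model is ignored entirely, then renormalize
    weights = np.maximum(weights, min_weight)
    return weights / weights.sum()

accuracies = [0.99, 0.95, 0.80]  # hypothetical recent accuracies
for temperature in (2.0, 0.1, 0.02):
    weights = softmax_weights(accuracies, temperature)
    print(f"temperature={temperature}: {np.round(weights, 3)}")
```

Choosing the temperature is a trade-off: sharper weights react faster to a failing model, while flatter weights keep the ensemble more stable.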

In [None]:
class OnlineEnsemble:
    """
    Online ensemble of adaptive classifiers with dynamic weighting.
    
    Features:
    - Multiple diverse base models
    - Performance-based weighting
    - Automatic model selection
    - Robust error handling
    """
    
    def __init__(self, n_models=3, learning_rate=0.01, forgetting_factor=0.95):
        """
        Initialize online ensemble.
        
        Args:
            n_models: Number of base models
            learning_rate: Learning rate for base models
            forgetting_factor: Forgetting factor for base models
        """
        self.n_models = n_models
        self.learning_rate = learning_rate
        self.forgetting_factor = forgetting_factor
        
        # Create diverse base models
        self.models = []
        model_configs = [
            {'base_model_type': 'sgd', 'learning_rate': learning_rate},
            {'base_model_type': 'sgd', 'learning_rate': learning_rate * 0.5},
            {'base_model_type': 'passive_aggressive', 'learning_rate': learning_rate}
        ]
        
        for i in range(n_models):
            config = model_configs[i % len(model_configs)]
            model = AdaptiveOnlineClassifier(
                base_model_type=config['base_model_type'],
                learning_rate=config['learning_rate'],
                forgetting_factor=forgetting_factor
            )
            self.models.append(model)
        
        # Model weights (performance-based)
        self.model_weights = np.ones(n_models) / n_models
        self.model_performances = [deque(maxlen=50) for _ in range(n_models)]
        
        # Ensemble performance tracking
        self.ensemble_performance = deque(maxlen=100)
        
    def partial_fit(self, X, y):
        """
        Train all models in the ensemble.
        
        Args:
            X: Features
            y: Labels
        """
        # Score each model on the batch before it trains on it (test-then-train),
        # so the weights reflect performance on unseen data
        self._update_weights(X, y)
        
        # Train each model
        for model in self.models:
            try:
                model.partial_fit(X, y)
            except Exception as e:
                print(f"Error training model: {e}")
                continue
    
    def _update_weights(self, X, y):
        """
        Update model weights based on recent performance.
        
        Args:
            X: Features
            y: Labels
        """
        # Evaluate each model
        for i, model in enumerate(self.models):
            try:
                if model.is_fitted:
                    y_pred = model.predict(X)
                    accuracy = accuracy_score(y, y_pred)
                    self.model_performances[i].append(accuracy)
                    
                    # Update model's performance tracking
                    model.update_performance(y, y_pred)
                else:
                    self.model_performances[i].append(0.5)  # Default performance
            except Exception as e:
                print(f"Error evaluating model {i}: {e}")
                self.model_performances[i].append(0.0)  # Penalize failed models
        
        # Calculate new weights based on recent performance
        recent_performances = []
        for i in range(self.n_models):
            if len(self.model_performances[i]) > 0:
                recent_perf = np.mean(list(self.model_performances[i])[-10:])  # Last 10 evaluations
            else:
                recent_perf = 0.5
            recent_performances.append(recent_perf)
        
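        # Convert to weights (softmax with temperature). Higher temperatures
        # flatten the distribution; because accuracies lie in [0, 1], a value
        # of 2.0 keeps the weights close to uniform. Lower it for sharper weighting.
        temperature = 2.0  # Controls weight distribution sharpness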
        exp_perfs = np.exp(np.array(recent_performances) / temperature)
        self.model_weights = exp_perfs / np.sum(exp_perfs)
        
        # Ensure minimum weight to prevent models from being completely ignored
        min_weight = 0.05
        self.model_weights = np.maximum(self.model_weights, min_weight)
        self.model_weights = self.model_weights / np.sum(self.model_weights)
    
    def predict(self, X):
        """
        Make ensemble predictions.
        
        Args:
            X: Features
        
        Returns:
            Ensemble predictions
        """
        if not any(model.is_fitted for model in self.models):
            return np.zeros(len(X))
        
        # Get predictions from all models
        predictions = []
        for model in self.models:
            try:
                if model.is_fitted:
                    pred = model.predict(X)
                else:
                    pred = np.zeros(len(X))
                predictions.append(pred)
            except Exception as e:
                print(f"Error getting predictions: {e}")
                predictions.append(np.zeros(len(X)))
        
        # Weighted ensemble prediction
        predictions = np.array(predictions)
        ensemble_pred = np.average(predictions, axis=0, weights=self.model_weights)
        
        return (ensemble_pred > 0.5).astype(int)
    
    def predict_proba(self, X):
        """
        Make ensemble probability predictions.
        
        Args:
            X: Features
        
        Returns:
            Ensemble prediction probabilities
        """
        if not any(model.is_fitted for model in self.models):
            return np.full((len(X), 2), 0.5)
        
        # Get probabilities from all models
        probabilities = []
        for model in self.models:
            try:
                if model.is_fitted:
                    prob = model.predict_proba(X)
                else:
                    prob = np.full((len(X), 2), 0.5)
                probabilities.append(prob)
            except Exception as e:
                print(f"Error getting probabilities: {e}")
                probabilities.append(np.full((len(X), 2), 0.5))
        
        # Weighted ensemble probabilities
        probabilities = np.array(probabilities)
        ensemble_prob = np.average(probabilities, axis=0, weights=self.model_weights)
        
        return ensemble_prob
    
    def get_ensemble_status(self):
        """
        Get ensemble status and model information.
        
        Returns:
            dict: Ensemble status
        """
        model_statuses = []
        for i, model in enumerate(self.models):
            status = model.get_status()
            status['weight'] = self.model_weights[i]
            status['recent_performance'] = (np.mean(list(self.model_performances[i])[-10:]) 
                                          if len(self.model_performances[i]) > 0 else 0.5)
            model_statuses.append(status)
        
        return {
            'n_models': self.n_models,
            'model_weights': self.model_weights,
            'model_statuses': model_statuses,
            'ensemble_performance_length': len(self.ensemble_performance)
        }

# Test the online ensemble
print("Testing Online Ensemble...")

# Create ensemble
ensemble = OnlineEnsemble(n_models=3, learning_rate=0.01)

# Simulate streaming learning
batch_size = 100
ensemble_performance = []
weight_history = []

for i in range(0, min(5000, len(X_with_drift)), batch_size):
    end_idx = min(i + batch_size, len(X_with_drift))
    
    X_batch = X_with_drift[i:end_idx]
    y_batch = y.iloc[i:end_idx]
    
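    # Evaluate first, then train (test-then-train), so accuracy is measured
    # on data the ensemble has not yet learned from
    y_pred = ensemble.predict(X_batch)
    accuracy = accuracy_score(y_batch, y_pred)
    
    # Train ensemble
    ensemble.partial_fit(X_batch, y_batch)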
    
    # Track performance and weights
    ensemble_performance.append(accuracy)
    weight_history.append(ensemble.model_weights.copy())

# Visualize ensemble performance and weights
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Performance plot
ax1.plot(ensemble_performance, 'b-', linewidth=2, label='Ensemble Accuracy')
ax1.axhline(y=np.mean(ensemble_performance), color='g', linestyle='--', label='Average')
ax1.set_xlabel('Batch Number')
ax1.set_ylabel('Accuracy')
ax1.set_title('Online Ensemble Performance')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Weight evolution plot
weight_history = np.array(weight_history)
for i in range(ensemble.n_models):
    ax2.plot(weight_history[:, i], label=f'Model {i+1}', linewidth=2)

ax2.set_xlabel('Batch Number')
ax2.set_ylabel('Model Weight')
ax2.set_title('Model Weight Evolution')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Print ensemble status
print("\nEnsemble Status:")
status = ensemble.get_ensemble_status()
print(f"Model weights: {status['model_weights']}")
print(f"\nIndividual model performance:")
for i, model_status in enumerate(status['model_statuses']):
    print(f"  Model {i+1}: Weight={model_status['weight']:.3f}, "
          f"Performance={model_status['recent_performance']:.3f}, "
          f"Drift={model_status['drift_detected']}")

## Part 5: Complete Streaming System

### Production-Ready Streaming Architecture

A complete streaming fraud detection system needs:
- **Threaded Processing**: Handle high-throughput streams
- **Queue Management**: Buffer transactions and handle backpressure
- **Real-time Monitoring**: Track performance and system health
- **Error Handling**: Graceful degradation and recovery
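
The queue-and-thread pattern behind these requirements can be sketched with the standard library alone. In this hypothetical `run_pipeline` helper, a bounded `queue.Queue` buffers work, the producer sheds load when the queue is full instead of blocking, and the worker keeps running if a single item fails. Doubling each item is a stand-in for scoring a transaction.

```python
import queue
import threading

def run_pipeline(items, maxsize=5):
    """Feed items through a bounded queue to one worker thread."""
    work_queue = queue.Queue(maxsize=maxsize)
    processed, dropped = [], []
    stop_event = threading.Event()

    def worker():
        # Keep draining until asked to stop and the queue is empty
        while not (stop_event.is_set() and work_queue.empty()):
            try:
                item = work_queue.get(timeout=0.05)
            except queue.Empty:
                continue
            try:
                processed.append(item * 2)  # stand-in for scoring a transaction
            except Exception as exc:
                print(f"Skipping bad item {item!r}: {exc}")
            finally:
                work_queue.task_done()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    for item in items:
        try:
            work_queue.put_nowait(item)  # never block the producer
        except queue.Full:
            dropped.append(item)         # backpressure: shed load instead

    work_queue.join()   # wait until every queued item has been handled
    stop_event.set()
    thread.join(timeout=2)
    return processed, dropped

processed, dropped = run_pipeline(range(100))
print(f"processed={len(processed)}, dropped={len(dropped)}")
```

How many items are dropped depends on thread scheduling, so the counts vary between runs. Dropping is only one backpressure policy; blocking the producer with a timeout is an alternative when losing transactions is unacceptable.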

In [None]:
class StreamingFraudDetector:
    """
    Complete streaming fraud detection system.
    
    Features:
    - Real-time transaction processing
    - Threaded architecture
    - Performance monitoring
    - Adaptive learning
    """
    
    def __init__(self, model_type='ensemble', batch_size=50, update_frequency=10):
        """
        Initialize streaming fraud detector.
        
        Args:
            model_type: Type of model ('ensemble' or 'single')
            batch_size: Batch size for model updates
            update_frequency: How often to update model (in batches)
        """
        self.model_type = model_type
        self.batch_size = batch_size
        self.update_frequency = update_frequency
        
        # Initialize model
        if model_type == 'ensemble':
            self.model = OnlineEnsemble(n_models=3)
        else:
            self.model = AdaptiveOnlineClassifier()
        
        # Streaming infrastructure
        self.transaction_queue = queue.Queue(maxsize=10000)
        self.processing_thread = None
        self.is_running = False
        
        # Buffers for batch processing
        self.feature_buffer = deque(maxlen=self.batch_size * 2)
        self.label_buffer = deque(maxlen=self.batch_size * 2)
        self.prediction_buffer = deque(maxlen=1000)
        
        # Performance monitoring
        self.performance_metrics = {
            'transactions_processed': 0,
            'predictions_made': 0,
            'model_updates': 0,
            'processing_times': deque(maxlen=1000),
            'accuracies': deque(maxlen=100),
            'f1_scores': deque(maxlen=100),
            'precisions': deque(maxlen=100),
            'recalls': deque(maxlen=100)
        }
        
        # Thread synchronization
        self.lock = threading.Lock()
        
    def start_streaming(self):
        """
        Start the streaming processing thread.
        """
        if self.is_running:
            print("Streaming already running")
            return
        
        self.is_running = True
        self.processing_thread = threading.Thread(target=self._process_stream)
        self.processing_thread.daemon = True
        self.processing_thread.start()
        
        print("Streaming fraud detection started")
    
    def stop_streaming(self):
        """
        Stop the streaming processing thread.
        """
        if not self.is_running:
            print("Streaming not running")
            return
        
        self.is_running = False
        
        # Wait for thread to finish
        if self.processing_thread:
            self.processing_thread.join(timeout=5)
        
        print("Streaming fraud detection stopped")
    
    def submit_transaction(self, features, label=None, transaction_id=None):
        """
        Submit a transaction for processing.
        
        Args:
            features: Transaction features
            label: True label (if available)
            transaction_id: Optional transaction ID
        
        Returns:
            bool: True if successfully queued, False if queue full
        """
        try:
            transaction = {
                'features': features,
                'label': label,
                'transaction_id': transaction_id,
                'timestamp': time.time()
            }
            
            self.transaction_queue.put(transaction, block=False)
            return True
            
        except queue.Full:
            print("Warning: Transaction queue full, dropping transaction")
            return False
    
    def _process_stream(self):
        """
        Main processing loop (runs in separate thread).
        """
        print("Processing thread started")
        
        while self.is_running:
            try:
                # Get transaction from queue
                transaction = self.transaction_queue.get(timeout=1)
                
                # Process transaction
                self._process_single_transaction(transaction)
                
                # Update model if enough data accumulated
                if len(self.feature_buffer) >= self.batch_size:
                    self._update_model()
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Error processing transaction: {e}")
                continue
        
        print("Processing thread stopped")
    
    def _process_single_transaction(self, transaction):
        """
        Process a single transaction.
        
        Args:
            transaction: Transaction dictionary
        """
        start_time = time.time()
        
        features = transaction['features']
        label = transaction['label']
        
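        # Make prediction (0.5 decision threshold on the fraud probability)
        if hasattr(self.model, 'predict_proba'):
            prob = self.model.predict_proba([features])[0]
            prediction = 1 if prob[1] > 0.5 else 0
            confidence = prob[1]  # estimated probability of the fraud class
        else:
            prediction = self.model.predict([features])[0]
            confidence = 0.5  # placeholder: this model exposes no probabilities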
        
        # Store prediction
        self.prediction_buffer.append({
            'transaction_id': transaction['transaction_id'],
            'prediction': prediction,
            'confidence': confidence,
            'timestamp': transaction['timestamp']
        })
        
        # Buffer for training (if label available)
        if label is not None:
            self.feature_buffer.append(features)
            self.label_buffer.append(label)
        
        # Update performance metrics
        with self.lock:
            self.performance_metrics['transactions_processed'] += 1
            self.performance_metrics['predictions_made'] += 1
            
            processing_time = time.time() - start_time
            self.performance_metrics['processing_times'].append(processing_time)
    
    def _update_model(self):
        """
        Update the model with buffered data.
        """
        if len(self.feature_buffer) == 0 or len(self.label_buffer) == 0:
            return
        
        # Convert buffers to arrays
        X_batch = np.array(list(self.feature_buffer))
        y_batch = np.array(list(self.label_buffer))
        
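        # Evaluate on the batch before training on it (test-then-train), so the
        # metrics reflect data the model has not seen yet. If the model is not
        # fitted yet, the metrics helper catches and reports the error.
        self._update_performance_metrics(X_batch, y_batch)
        
        # Update model
        self.model.partial_fit(X_batch, y_batch)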
        
        # Clear buffers
        self.feature_buffer.clear()
        self.label_buffer.clear()
        
        with self.lock:
            self.performance_metrics['model_updates'] += 1
    
    def _update_performance_metrics(self, X_batch, y_batch):
        """
        Update performance metrics.
        
        Args:
            X_batch: Features batch
            y_batch: Labels batch
        """
        try:
            # Make predictions
            y_pred = self.model.predict(X_batch)
            
            # Calculate metrics
            accuracy = accuracy_score(y_batch, y_pred)
            precision = precision_score(y_batch, y_pred, zero_division=0)
            recall = recall_score(y_batch, y_pred, zero_division=0)
            f1 = f1_score(y_batch, y_pred, zero_division=0)
            
            # Update metrics
            with self.lock:
                self.performance_metrics['accuracies'].append(accuracy)
                self.performance_metrics['precisions'].append(precision)
                self.performance_metrics['recalls'].append(recall)
                self.performance_metrics['f1_scores'].append(f1)
                
        except Exception as e:
            print(f"Error updating performance metrics: {e}")
    
    def get_performance_report(self):
        """
        Get comprehensive performance report.
        
        Returns:
            dict: Performance metrics
        """
        # Snapshot under the lock. A shallow dict copy would still share the
        # deques with the processing thread, so copy them into lists.
        with self.lock:
            metrics = {
                key: list(value) if isinstance(value, deque) else value
                for key, value in self.performance_metrics.items()
            }
        
        def _mean(values):
            return float(np.mean(values)) if values else 0.0
        
        # 'processing_times' holds per-transaction durations (seconds), not
        # timestamps, so throughput is estimated as count / total duration.
        total_processing_time = sum(metrics['processing_times'])
        
        # Calculate averages
        report = {
            'transactions_processed': metrics['transactions_processed'],
            'predictions_made': metrics['predictions_made'],
            'model_updates': metrics['model_updates'],
            'avg_processing_time': _mean(metrics['processing_times']),
            'avg_accuracy': _mean(metrics['accuracies']),
            'avg_precision': _mean(metrics['precisions']),
            'avg_recall': _mean(metrics['recalls']),
            'avg_f1_score': _mean(metrics['f1_scores']),
            'processing_rate': (len(metrics['processing_times']) / total_processing_time
                                if total_processing_time > 0 else 0.0)
        }
        
        return report
    
    def get_recent_predictions(self, n=10):
        """
        Get recent predictions.
        
        Args:
            n: Number of recent predictions to return
        
        Returns:
            list: Recent predictions
        """
        return list(self.prediction_buffer)[-n:]

# Test the complete streaming system
print("Testing Complete Streaming System...")

# Initialize streaming detector
detector = StreamingFraudDetector(model_type='ensemble', batch_size=50)

# Start streaming
detector.start_streaming()

# Simulate streaming transactions
print("\nSimulating streaming transactions...")
n_transactions = 2000
transactions_per_second = 100

for i in range(n_transactions):
    # Get transaction
    features = X_with_drift[i]
    label = y.iloc[i]
    
    # Submit transaction
    success = detector.submit_transaction(features, label, transaction_id=f"txn_{i}")
    
    if not success:
        print(f"Failed to submit transaction {i}")
    
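    # Throttle submissions: pause after every `transactions_per_second` transactions.
    # A 0.1 s pause caps the rate near 1,000 tx/s; use time.sleep(1.0) instead
    # to pace at roughly `transactions_per_second`.
    if i % transactions_per_second == 0:
        time.sleep(0.1)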
        
        # Print progress
        if i % 500 == 0:
            report = detector.get_performance_report()
            print(f"Processed {report['transactions_processed']} transactions, "
                  f"Accuracy: {report['avg_accuracy']:.3f}, "
                  f"F1: {report['avg_f1_score']:.3f}")

# Wait for processing to complete
time.sleep(2)

# Get final report
final_report = detector.get_performance_report()
print("\nFinal Performance Report:")
print("="*50)
for key, value in final_report.items():
    if isinstance(value, float):
        print(f"{key}: {value:.4f}")
    else:
        print(f"{key}: {value}")

# Stop streaming
detector.stop_streaming()

print("\nStreaming simulation completed!")

## Part 6: Performance Monitoring and Visualization

### Real-time Performance Tracking

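Production streaming systems need continuous monitoring of model quality (accuracy, F1), prediction latency, and throughput. The simulation below records these metrics at regular intervals and plots them over time: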

In [None]:
def simulate_streaming_fraud_detection():
    """
    Comprehensive streaming fraud detection simulation with visualization.
    """
    print("Starting comprehensive streaming simulation...")
    
    # Initialize detector
    detector = StreamingFraudDetector(model_type='ensemble', batch_size=50)
    detector.start_streaming()
    
    # Tracking variables
    performance_timeline = []
    timestamp_start = time.time()
    
    # Simulate realistic streaming
    n_transactions = 3000
    report_interval = 200
    
    for i in range(n_transactions):
        # Get transaction
        features = X_with_drift[i]
        
        # Simulate limited label availability (common in real systems):
        # only about 30% of transactions arrive with a label; the rest stay unlabeled
        label = y.iloc[i] if np.random.random() < 0.3 else None
        
        # Submit transaction
        detector.submit_transaction(features, label, transaction_id=f"txn_{i}")
        
        # Collect performance data
        if i % report_interval == 0 and i > 0:
            report = detector.get_performance_report()
            report['timestamp'] = time.time() - timestamp_start
            report['transaction_index'] = i
            performance_timeline.append(report)
            
            print(f"Batch {i//report_interval}: "
                  f"Processed {report['transactions_processed']}, "
                  f"Accuracy: {report['avg_accuracy']:.3f}, "
                  f"Rate: {report['processing_rate']:.1f} tx/s")
        
        # Small delay to simulate realistic timing
        if i % 50 == 0:
            time.sleep(0.01)
    
    # Wait for processing to complete
    time.sleep(3)
    
    # Final report
    final_report = detector.get_performance_report()
    final_report['timestamp'] = time.time() - timestamp_start
    final_report['transaction_index'] = n_transactions
    performance_timeline.append(final_report)
    
    # Stop streaming
    detector.stop_streaming()
    
    return performance_timeline, detector

# Run simulation
performance_timeline, detector = simulate_streaming_fraud_detection()

# Visualize performance timeline
def plot_streaming_performance(timeline):
    """
    Plot comprehensive streaming performance metrics.
    """
    if not timeline:
        print("No performance data to plot")
        return
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # Extract data
    timestamps = [t['timestamp'] for t in timeline]
    accuracies = [t['avg_accuracy'] for t in timeline]
    f1_scores = [t['avg_f1_score'] for t in timeline]
    processing_times = [t['avg_processing_time'] * 1000 for t in timeline]  # Convert to ms
    processing_rates = [t['processing_rate'] for t in timeline]
    
    # Accuracy over time
    axes[0, 0].plot(timestamps, accuracies, 'b-', linewidth=2, label='Accuracy')
    axes[0, 0].plot(timestamps, f1_scores, 'r-', linewidth=2, label='F1-Score')
    axes[0, 0].set_xlabel('Time (seconds)')
    axes[0, 0].set_ylabel('Score')
    axes[0, 0].set_title('Model Performance Over Time')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # Processing time
    axes[0, 1].plot(timestamps, processing_times, 'g-', linewidth=2)
    axes[0, 1].set_xlabel('Time (seconds)')
    axes[0, 1].set_ylabel('Processing Time (ms)')
    axes[0, 1].set_title('Processing Latency')
    axes[0, 1].grid(True, alpha=0.3)
    
    # Processing rate
    axes[1, 0].plot(timestamps, processing_rates, 'orange', linewidth=2)
    axes[1, 0].set_xlabel('Time (seconds)')
    axes[1, 0].set_ylabel('Transactions/second')
    axes[1, 0].set_title('Processing Rate')
    axes[1, 0].grid(True, alpha=0.3)
    
    # Cumulative transactions
    transactions = [t['transactions_processed'] for t in timeline]
    axes[1, 1].plot(timestamps, transactions, 'purple', linewidth=2)
    axes[1, 1].set_xlabel('Time (seconds)')
    axes[1, 1].set_ylabel('Transactions Processed')
    axes[1, 1].set_title('Cumulative Transactions')
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# Plot performance
plot_streaming_performance(performance_timeline)

# Show recent predictions
recent_predictions = detector.get_recent_predictions(10)
print("\nRecent Predictions:")
print("="*70)
for pred in recent_predictions:
    print(f"ID: {pred['transaction_id']}, "
          f"Prediction: {pred['prediction']}, "
          f"Confidence: {pred['confidence']:.3f}")

# Final summary
if performance_timeline:
    final_perf = performance_timeline[-1]
    print("\nFinal Performance Summary:")
    print("="*70)
    print(f"Total transactions processed: {final_perf['transactions_processed']:,}")
    print(f"Average accuracy: {final_perf['avg_accuracy']:.4f}")
    print(f"Average F1-score: {final_perf['avg_f1_score']:.4f}")
    print(f"Average processing time: {final_perf['avg_processing_time']*1000:.2f} ms")
    print(f"Processing rate: {final_perf['processing_rate']:.1f} transactions/second")
    print(f"Model updates: {final_perf['model_updates']}")
    print(f"Total simulation time: {final_perf['timestamp']:.1f} seconds")

## Practice Exercises

Now it's your turn! Try these exercises to deepen your understanding:

### Exercise 1: Drift Detection Sensitivity
Experiment with different drift detection parameters:
- Modify the `drift_threshold` in `AdaptiveOnlineClassifier`
- Try different window sizes for performance monitoring
- Implement alternative drift detection methods (e.g., statistical tests)

How do these changes affect adaptation speed and stability?
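
One possible starting point for the statistical-test variant is a Page-Hinkley test applied to the stream of prediction errors (1 for a misclassification, 0 otherwise). The sketch below is standalone: the class name `PageHinkley`, its parameters, and their defaults are illustrative assumptions and are not part of `AdaptiveOnlineClassifier`.

```python
class PageHinkley:
    """Page-Hinkley test for an upward shift in the mean of a stream."""

    def __init__(self, delta=0.005, threshold=5.0, min_samples=30):
        self.delta = delta              # tolerated magnitude of change
        self.threshold = threshold      # alarm level
        self.min_samples = min_samples  # warm-up before any alarm
        self.reset()

    def reset(self):
        self.n = 0
        self.mean = 0.0
        self.cum_sum = 0.0      # cumulative deviation from the running mean
        self.min_cum_sum = 0.0  # smallest cumulative deviation seen so far

    def update(self, value):
        """Feed one observation; return True when drift is signalled."""
        self.n += 1
        self.mean += (value - self.mean) / self.n
        self.cum_sum += value - self.mean - self.delta
        self.min_cum_sum = min(self.min_cum_sum, self.cum_sum)
        if self.n < self.min_samples:
            return False
        return (self.cum_sum - self.min_cum_sum) > self.threshold


# Deterministic demo: 10% error rate, then a jump to 60%
detector = PageHinkley()
stable = [1 if i % 10 == 0 else 0 for i in range(200)]
drifted = [1 if i % 5 < 3 else 0 for i in range(200)]
alarms_stable = [detector.update(v) for v in stable]
alarms_drifted = [detector.update(v) for v in drifted]
print("Alarm during stable phase:", any(alarms_stable))
print("Alarm after drift:", any(alarms_drifted))
```

A smaller `threshold` reacts faster but raises more false alarms, which is the same speed-versus-stability trade-off the exercise asks about. After an alarm, calling `reset()` restarts the test on the new regime.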

In [None]:
# Your code here
# Hint: Create classifiers with different drift thresholds
# Compare their performance on the same data stream

### Exercise 2: Ensemble Composition
Experiment with different ensemble compositions:
- Try different numbers of base models (2, 5, 10)
- Mix different model types (SGD, Passive-Aggressive, etc.)
- Implement different weighting strategies

Which ensemble composition works best for your data?
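
For the weighting-strategy part, one classic option is the Weighted Majority scheme: every model that votes wrongly has its weight multiplied by a penalty factor. The sketch below works on plain 0/1 votes and does not touch the tutorial's `OnlineEnsemble` class; the name `WeightedMajorityVoter` and the `beta` default are assumptions made for illustration.

```python
class WeightedMajorityVoter:
    """Weighted Majority voting over binary (0/1) model predictions."""

    def __init__(self, n_models, beta=0.5):
        self.weights = [1.0 / n_models] * n_models
        self.beta = beta  # penalty factor in (0, 1) applied to wrong models

    def predict(self, votes):
        """Return 1 (fraud) if models voting 1 hold more than half the weight."""
        fraud_weight = sum(w for w, v in zip(self.weights, votes) if v == 1)
        return 1 if fraud_weight > 0.5 * sum(self.weights) else 0

    def update(self, votes, true_label):
        """Shrink the weight of every model that voted wrongly, then renormalise."""
        penalised = [w * self.beta if v != true_label else w
                     for w, v in zip(self.weights, votes)]
        total = sum(penalised)
        self.weights = [w / total for w in penalised]


# One accurate model outvoted by two inaccurate ones
voter = WeightedMajorityVoter(n_models=3)
votes, truth = [1, 0, 0], 1
print("Before updates:", voter.predict(votes))
for _ in range(3):
    voter.update(votes, truth)
print("After updates:", voter.predict(votes))
print("Weights:", [round(w, 3) for w in voter.weights])
```

A `beta` close to 1 adapts slowly but resists noisy labels; a small `beta` shifts weight quickly and can overreact to a short run of errors.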

In [None]:
# Your code here
# Hint: Modify the OnlineEnsemble class to accept different model configurations
# Compare performance with different ensemble sizes

### Exercise 3: Streaming Performance Optimization
Optimize the streaming system for higher throughput:
- Implement batch processing for predictions
- Add connection pooling for database updates
- Implement backpressure handling

How much can you improve the processing rate?
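
As a hedged starting point for batched predictions, the helper below drains up to `max_batch` items from a `queue.Queue` in one go: it waits for the first item, then takes whatever else is already queued without blocking. The function name `drain_batch` and its parameters are hypothetical and would need to be wired into `_process_stream` by hand.

```python
import queue


def drain_batch(q, max_batch=64, timeout=0.1):
    """Wait up to `timeout` for one item, then grab any others already queued."""
    batch = []
    try:
        batch.append(q.get(timeout=timeout))
    except queue.Empty:
        return batch  # nothing arrived in time
    while len(batch) < max_batch:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break
    return batch


# Demo with integers standing in for transaction dictionaries
demo_queue = queue.Queue()
for i in range(10):
    demo_queue.put(i)
print(drain_batch(demo_queue, max_batch=4))
print(drain_batch(demo_queue, max_batch=100))
```

With a batch of transaction dictionaries in hand, the features can be stacked into one array and passed to a single `predict` call, which usually amortizes per-call overhead at the cost of slightly higher latency for the first item in each batch.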

In [None]:
# Your code here
# Hint: Modify the StreamingFraudDetector to process transactions in batches
# Measure the impact on processing rate and latency

## Key Takeaways

### 1. Online Learning Fundamentals
- **Incremental Updates**: Learn from each new batch of labeled transactions without full retraining
- **Memory Efficiency**: No need to store the entire training dataset, only a small buffer
- **Real-time Adaptation**: Adjust to changing patterns as soon as new labels arrive
- **Forgetting Factor**: Give higher weight to recent examples
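
The forgetting-factor idea can be sketched with exponentially decaying sample weights. This is a minimal illustration in plain Python; the function names and the `decay` values are assumptions. Weights of this kind could be passed as `sample_weight` to scikit-learn's `SGDClassifier.partial_fit`, which accepts that argument; whether the tutorial's own classes forward it is not shown here.

```python
def recency_weights(n_samples, decay=0.99):
    """Return one weight per sample, oldest first; the newest gets 1.0."""
    return [decay ** (n_samples - 1 - i) for i in range(n_samples)]


def weighted_error_rate(errors, weights):
    """Weighted mean of 0/1 error indicators."""
    total = sum(weights)
    if total == 0:
        return 0.0
    return sum(e * w for e, w in zip(errors, weights)) / total


# The two most recent predictions were wrong
errors = [0, 0, 0, 1, 1]
weights = recency_weights(len(errors), decay=0.5)
plain = sum(errors) / len(errors)
recent = weighted_error_rate(errors, weights)
print("Weights:", weights)
print(f"Plain error rate: {plain:.3f}, recency-weighted: {recent:.3f}")
```

Because recent mistakes carry more weight, the weighted error rate rises faster than the plain average when performance starts to degrade.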

### 2. Concept Drift Handling
- **Detection**: Monitor performance degradation to detect drift
- **Adaptation**: Increase learning rate and focus on recent data
- **Types**: Understand sudden, gradual, and recurring drift patterns
- **Recovery**: Implement strategies to quickly adapt to new patterns

### 3. Ensemble Methods
- **Diversity**: Use different model types and configurations
- **Weighting**: Dynamically adjust model weights based on performance
- **Robustness**: Ensemble predictions are typically more stable than those of any single model
- **Error Handling**: Graceful degradation when individual models fail

### 4. Streaming Architecture
- **Threading**: Separate processing from transaction ingestion
- **Queues**: Buffer transactions to handle variable rates
- **Monitoring**: Track performance and system health in real-time
- **Scalability**: Design for high-throughput processing

### 5. Production Considerations
- **Error Recovery**: Handle failures gracefully without stopping
- **Monitoring**: Comprehensive metrics for system health
- **Latency**: Optimize for low-latency predictions
- **Backpressure**: Handle queue overflow and high load
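
A simple form of backpressure is to let the producer wait briefly for space in a bounded queue before giving up, and to count what gets dropped so overload is visible in monitoring. The sketch below assumes a hypothetical `BoundedSubmitter` wrapper; it is not the tutorial's `submit_transaction`, which drops immediately when the queue is full.

```python
import queue


class BoundedSubmitter:
    """Bounded queue whose producer waits briefly before dropping an item."""

    def __init__(self, maxsize=1000, put_timeout=0.05):
        self.q = queue.Queue(maxsize=maxsize)
        self.put_timeout = put_timeout  # seconds the producer may block
        self.dropped = 0                # items rejected because the queue stayed full

    def submit(self, item):
        """Return True if queued, False if dropped after waiting."""
        try:
            self.q.put(item, timeout=self.put_timeout)
            return True
        except queue.Full:
            self.dropped += 1
            return False


# Demo: no consumer is running, so the third submit is dropped
submitter = BoundedSubmitter(maxsize=2, put_timeout=0.01)
results = [submitter.submit(i) for i in range(3)]
print("Accepted:", results, "Dropped:", submitter.dropped)
```

Blocking the producer slows ingestion to the consumer's pace, while the drop counter gives an alerting signal when even that is not enough.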

### 6. Performance Optimization
- **Batch Processing**: Process multiple transactions together
- **Memory Management**: Use circular buffers and efficient data structures
- **Threading**: Proper synchronization and resource management
- **Caching**: Cache frequently used computations
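
The circular-buffer point can be illustrated with `collections.deque(maxlen=...)`, the same structure the detector uses for its metric histories. The `RollingLatency` class below is a hypothetical helper that keeps only the most recent samples and reports a nearest-rank percentile.

```python
import math
from collections import deque


class RollingLatency:
    """Fixed-size window of latency samples with nearest-rank percentiles."""

    def __init__(self, window=1000):
        self.samples = deque(maxlen=window)  # oldest samples fall off automatically

    def record(self, seconds):
        self.samples.append(seconds)

    def percentile(self, pct):
        """Nearest-rank percentile of the current window, or None if empty."""
        if not self.samples:
            return None
        ordered = sorted(self.samples)
        rank = max(1, math.ceil(pct / 100 * len(ordered)))
        return ordered[rank - 1]


# Demo: a window of 5 keeps only the last five of ten samples
latency = RollingLatency(window=5)
for value in range(1, 11):
    latency.record(value)
print("Window:", list(latency.samples))
print("Median:", latency.percentile(50), "Max:", latency.percentile(100))
```

Memory use stays constant no matter how long the stream runs, and tail percentiles such as p95 or p99 are often more informative for latency than the mean.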

### 7. Real-world Applications
- **Financial Services**: Credit card fraud, loan defaults, market anomalies
- **E-commerce**: Fake reviews, account takeovers, payment fraud
- **Cybersecurity**: Intrusion detection, malware analysis, network monitoring
- **IoT**: Sensor anomalies, device failures, security breaches

## Next Steps

In the next tutorial, we'll explore:
- Hybrid ensemble systems combining multiple approaches
- Advanced model selection and meta-learning
- Distributed streaming architectures
- Production deployment strategies

Remember: Online learning is essential for production fraud detection systems. The key is balancing adaptation speed with stability, and ensuring your system can handle real-world complexities like delayed labels, variable data rates, and concept drift!