
# Concept Drift Detection Benchmark Framework

**Objective**: A minimal framework for systematic evaluation of concept drift detection methods on real-world datasets.

**Framework Components**:
- Real dataset loading interface for standard concept drift benchmarks
- Unified drift detection algorithm interface
- Prequential evaluation methodology
- Statistical validation and result analysis



## Evaluation Methodology

### Classification Performance Metrics
- **Prequential Accuracy**: Classification accuracy using test-then-train evaluation protocol
- **Macro F1-Score**: Harmonic mean of precision and recall, macro-averaged across classes
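
The test-then-train protocol can be sketched in a few lines. This is only an illustration under stated assumptions: `MajorityClassLearner` and `prequential_accuracy` are hypothetical names that do not appear elsewhere in this notebook, and the learner is a deliberately trivial stand-in for a real online classifier.

```python
from collections import Counter


class MajorityClassLearner:
    """Hypothetical stand-in learner: predicts the most frequent label seen so far."""

    def __init__(self, default=0):
        self.counts = Counter()
        self.default = default  # returned before any label has been seen

    def predict_one(self, x):
        if not self.counts:
            return self.default
        return self.counts.most_common(1)[0][0]

    def learn_one(self, x, y):
        self.counts[y] += 1


def prequential_accuracy(stream, learner):
    """Test-then-train: every sample is predicted first and only then used for training."""
    correct = 0
    total = 0
    for x, y in stream:
        correct += int(learner.predict_one(x) == y)  # test
        learner.learn_one(x, y)                      # then train
        total += 1
    return correct / total if total else float("nan")


# Features are irrelevant for the majority-class learner, so None is used
stream = [(None, 1), (None, 1), (None, 0), (None, 1)]
acc = prequential_accuracy(stream, MajorityClassLearner())
print(f"Prequential accuracy: {acc:.2f}")
```

Because each sample is scored before the model sees its label, no separate hold-out set is needed and the accuracy reflects performance on unseen data at every step.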

### Drift Detection Performance Metrics
- **True Positive Rate**: Proportion of true concept drifts that are followed by a detection alarm
- **False Alarm Rate**: Number of alarms not attributable to a true drift, per unit time (per processed sample)
- **Detection Delay**: Mean number of samples between a drift and the first alarm that detects it
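
One possible way to make these three metrics precise is given below. The tolerance window $w$ is an assumption introduced here for illustration (the notebook does not fix one): an alarm counts as a detection of drift $d_k$ only if it falls within $w$ samples after $d_k$ and before the next drift.

```latex
% D = {d_1 < ... < d_K}: true drift points, A: set of alarm times, T: stream length
% w: tolerance window (assumed, not specified in this notebook); d_{K+1} := T
\begin{align}
a_k &= \min \{\, a \in A : d_k \le a < \min(d_k + w,\ d_{k+1}) \,\}
      && \text{(first alarm matching drift } k \text{, if any)} \\
\mathrm{TPR} &= \frac{\lvert \{ k : a_k \text{ exists} \} \rvert}{K} \\
\mathrm{FAR} &= \frac{\lvert A \setminus \{ a_k : a_k \text{ exists} \} \rvert}{T} \\
\mathrm{Delay} &= \frac{1}{\lvert \{ k : a_k \text{ exists} \} \rvert}
      \sum_{k \,:\, a_k \text{ exists}} (a_k - d_k)
\end{align}
```

Under this convention every alarm is either the first match for exactly one drift or a false alarm, so the counts in the numerators of TPR and FAR always add up to $\lvert A \rvert$.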


In [3]:

# Dependencies and Configuration

import math, random, time, warnings
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Any
from pathlib import Path

# Core scientific computing libraries
import numpy as np
import pandas as pd
from collections import deque, defaultdict

# Machine learning and evaluation metrics
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Optional: River library for additional drift detection methods
try:
    from river import drift as river_drift
    from river import tree, metrics as river_metrics
    RIVER_AVAILABLE = True
    print("River library available for extended drift detection methods")
except ImportError:
    RIVER_AVAILABLE = False
    print("River library not found. Core functionality available without River.")

# Experimental reproducibility
np.random.seed(42)
random.seed(42)
warnings.filterwarnings('ignore')

print("Concept Drift Benchmark Framework - Initialization Complete")


River library available for extended drift detection methods
Concept Drift Benchmark Framework - Initialization Complete


## 1) Dataset Loading Interface

In [4]:
# Real-World Dataset Loading Interface

import urllib.request
import os
import zipfile
from sklearn.datasets import fetch_openml

@dataclass
class DatasetMetadata:
    """Metadata container for dataset characteristics and drift information"""
    name: str
    length: int
    n_features: int
    n_classes: int
    drift_points: List[int]
    source: str
    description: str

class BenchmarkDatasetLoader:
    """Interface for loading standard concept drift benchmark datasets"""
    
    def __init__(self, data_dir: str = "./datasets"):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)  # also create missing parent directories
        
    def load_electricity(self) -> Tuple[np.ndarray, np.ndarray, List[int], DatasetMetadata]:
        """Load Electricity Market dataset (Elec2) from OpenML repository"""
        # Implementation required: Load and preprocess Electricity dataset
        raise NotImplementedError("Electricity dataset loading not yet implemented")
        
    def load_airlines(self) -> Tuple[np.ndarray, np.ndarray, List[int], DatasetMetadata]:
        """Load Airlines dataset for delay prediction with concept drift"""
        # Implementation required: Load and preprocess Airlines dataset
        raise NotImplementedError("Airlines dataset loading not yet implemented")
        
    def load_covertype(self) -> Tuple[np.ndarray, np.ndarray, List[int], DatasetMetadata]:
        """Load Forest Cover Type dataset with temporal concept drift"""
        # Implementation required: Load and preprocess CoverType dataset
        raise NotImplementedError("CoverType dataset loading not yet implemented")
        
    def get_available_datasets(self) -> List[str]:
        """Return list of supported benchmark datasets"""
        return ["electricity", "airlines", "covertype"]

print("Dataset loading interface initialized")


Dataset loading interface initialized


In [5]:

# Concrete Dataset Loader Implementations

class ElectricityDatasetLoader:
    """Concrete implementation for Electricity Market dataset (NSW Electricity Market)"""
    
    def __init__(self, base_loader: BenchmarkDatasetLoader):
        self.base_loader = base_loader
    
    def load(self) -> Tuple[np.ndarray, np.ndarray, List[int], DatasetMetadata]:
        """Load and preprocess Electricity Market dataset"""
        print("Loading Electricity Market dataset...")
        # Implementation placeholder - dataset loading logic required
        return None, None, [], DatasetMetadata(
            name="Electricity",
            length=0,
            n_features=8,
            n_classes=2,
            drift_points=[],
            source="Harries (1999), University of New South Wales technical report; available via OpenML",
            description="NSW Electricity Market demand and price data with concept drift"
        )
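
A possible starting point for the missing Electricity loading logic is sketched below. It rests on several assumptions that should be checked against the actual OpenML entry: the dataset name `"electricity"` with `version=1`, a target column called `class`, and the label values `UP`/`DOWN`. The helper names `preprocess_electricity` and `fetch_electricity_frame` are hypothetical. The download function needs network access and is not called here; the preprocessing is demonstrated on a tiny hand-made frame instead.

```python
import numpy as np
import pandas as pd


def preprocess_electricity(df: pd.DataFrame, target_col: str = "class"):
    """Turn an Elec2-style frame into (X, y).

    Assumes the target column holds the labels 'UP' and 'DOWN' (assumption).
    """
    y = (df[target_col].astype(str).str.upper() == "UP").astype(int).to_numpy()
    features = df.drop(columns=[target_col])
    # Categorical columns arrive as strings; coerce everything to floats
    X = features.astype(str).apply(pd.to_numeric, errors="coerce").to_numpy(dtype=float)
    return X, y


def fetch_electricity_frame() -> pd.DataFrame:
    """Download the dataset from OpenML (requires network access; not called below)."""
    from sklearn.datasets import fetch_openml

    bunch = fetch_openml(name="electricity", version=1, as_frame=True)
    return bunch.frame


# Offline demonstration on a toy frame with the assumed layout
toy = pd.DataFrame(
    {
        "nswprice": [0.05, 0.07, 0.04],
        "nswdemand": [0.40, 0.50, 0.30],
        "class": ["UP", "DOWN", "UP"],
    }
)
X, y = preprocess_electricity(toy)
print(X.shape, y)
```

Real-world streams such as Elec2 come without ground-truth drift positions, so a loader built this way would still return an empty `drift_points` list.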


In [6]:

class AirlinesDatasetLoader:
    """Concrete implementation for Airlines dataset for flight delay prediction"""
    
    def __init__(self, base_loader: BenchmarkDatasetLoader):
        self.base_loader = base_loader
        
    def load(self) -> Tuple[np.ndarray, np.ndarray, List[int], DatasetMetadata]:
        """Load and preprocess Airlines delay prediction dataset"""
        print("Loading Airlines dataset...")
        # Implementation placeholder - dataset loading logic required
        return None, None, [], DatasetMetadata(
            name="Airlines",
            length=0,
            n_features=7,
            n_classes=2,
            drift_points=[],
            source="Elena Ikonomovska, OpenML",
            description="Flight delay prediction with seasonal and operational concept drift"
        )


In [7]:

class CoverTypeDatasetLoader:
    """Concrete implementation for Forest Cover Type dataset"""
    
    def __init__(self, base_loader: BenchmarkDatasetLoader):
        self.base_loader = base_loader
        
    def load(self) -> Tuple[np.ndarray, np.ndarray, List[int], DatasetMetadata]:
        """Load and preprocess Forest Cover Type dataset with temporal ordering"""
        print("Loading Forest Cover Type dataset...")
        # Implementation placeholder - dataset loading logic required
        return None, None, [], DatasetMetadata(
            name="CoverType",
            length=0,
            n_features=54,
            n_classes=7,
            drift_points=[],
            source="Blackard & Dean (1999), UCI ML Repository",
            description="Forest cover type prediction with geographical and temporal concept drift"
        )


In [8]:

# Dataset Factory Pattern Implementation
def create_dataset_loader(dataset_name: str, data_dir: str = "./datasets"):
    """Factory function for instantiating appropriate dataset loaders"""
    base_loader = BenchmarkDatasetLoader(data_dir)
    
    if dataset_name.lower() == "electricity":
        return ElectricityDatasetLoader(base_loader)
    elif dataset_name.lower() == "airlines":
        return AirlinesDatasetLoader(base_loader)
    elif dataset_name.lower() == "covertype":
        return CoverTypeDatasetLoader(base_loader)
    else:
        available = ["electricity", "airlines", "covertype"]
        raise ValueError(f"Unsupported dataset: {dataset_name}. Available: {available}")

print("Dataset factory implementation ready")


Dataset factory implementation ready


## 2) Basic Online Learner

In [9]:

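class OnlineGaussianNB:
    """Incremental Gaussian Naive Bayes with per-class running mean and variance."""

    def __init__(self, n_features, n_classes=2, var_smoothing=1e-9):
        self.n_features = n_features
        self.n_classes = n_classes
        self.var_smoothing = var_smoothing  # added to every variance for numerical stability
        self.counts = np.zeros(n_classes, dtype=float)  # samples seen per class
        self.means = np.zeros((n_classes, n_features), dtype=float)
        self.M2 = np.zeros((n_classes, n_features), dtype=float)  # sum of squared deviations
        self._eps = 1e-12

    def partial_fit(self, X, y):
        """Update the per-class statistics one sample at a time (Welford's algorithm)."""
        X = np.atleast_2d(X)
        y = np.atleast_1d(y)
        for xi, yi in zip(X, y):
            # Labels >= n_classes are wrapped modulo n_classes
            c = int(yi) if yi < self.n_classes else int(yi % self.n_classes)
            self.counts[c] += 1.0
            delta = xi - self.means[c]  # deviation from the old mean
            self.means[c] += delta / self.counts[c]
            delta2 = xi - self.means[c]  # deviation from the updated mean
            self.M2[c] += delta * delta2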

    def _vars(self):
        var = np.zeros_like(self.M2)
        for c in range(self.n_classes):
            denom = max(self.counts[c]-1.0, 1.0)
            var[c] = self.M2[c] / denom + self.var_smoothing
        return var

    def predict_proba(self, X):
        X = np.atleast_2d(X)
        var = self._vars()
        priors = (self.counts + self._eps)/(self.counts.sum() + self.n_classes*self._eps)
        logp = []
        for c in range(self.n_classes):
            # For each class c, compute log probability for all samples in X
            # var[c] and self.means[c] are 1D arrays with shape (n_features,)
            # X has shape (n_samples, n_features)
            log_var_term = -0.5 * np.sum(np.log(2*np.pi*var[c]))  # scalar
            diff_sq = ((X - self.means[c])**2) / var[c]  # shape (n_samples, n_features)
            quad_term = -0.5 * np.sum(diff_sq, axis=1)  # shape (n_samples,)
            lp = log_var_term + quad_term + np.log(priors[c]+self._eps)
            logp.append(lp)
        logp = np.vstack(logp).T
        m = np.max(logp, axis=1, keepdims=True)
        p = np.exp(logp - m)  # subtract the row maximum before exponentiating (log-sum-exp trick)
        p = p / np.sum(p, axis=1, keepdims=True)
        return p

    def predict(self, X):
        return np.argmax(self.predict_proba(X), axis=1)
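
The running mean and `M2` updates in `partial_fit` follow Welford's one-pass recurrence. The standalone sketch below (the function name `welford` is chosen here for illustration) applies the same recurrence to a single feature and compares it with NumPy's two-pass results.

```python
import numpy as np


def welford(values):
    """Return (mean, sample variance) computed in a single pass over `values`."""
    n, mean, m2 = 0, 0.0, 0.0
    for x in values:
        n += 1
        delta = x - mean          # deviation from the old mean
        mean += delta / n
        m2 += delta * (x - mean)  # uses the updated mean
    var = m2 / (n - 1) if n > 1 else 0.0
    return mean, var


rng = np.random.default_rng(0)
data = rng.normal(loc=3.0, scale=2.0, size=500)
mean, var = welford(data)

print(f"one-pass: mean={mean:.4f}, var={var:.4f}")
print(f"numpy:    mean={data.mean():.4f}, var={data.var(ddof=1):.4f}")
```

The one-pass form matters for streams because it needs constant memory and never revisits earlier samples.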


## 3) Drift Detection Framework

In [10]:
# Basic Drift Detector Framework
from abc import ABC, abstractmethod

class BaseDriftDetector(ABC):
    """Base interface for drift detectors"""
    
    def __init__(self, name: str):
        self.name = name
        self.alarms = []
        self.t = 0
        
    @abstractmethod
    def update(self, value) -> bool:
        """Update detector with new data. Returns True if drift detected."""
        pass
    
    def reset(self):
        """Reset detector state"""
        self.alarms = []
        self.t = 0
    
    def get_alarms(self) -> List[int]:
        """Get list of alarm timestamps"""
        return self.alarms.copy()

print("✅ Basic drift detector framework initialized")


✅ Basic drift detector framework initialized
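
To make the `update` contract concrete, here is a minimal sketch of a Page-Hinkley style test for an upward shift in the mean of the monitored value. It is a standalone class (hypothetical name `PageHinkleySketch`) so the block runs on its own; the parameter names and default values are illustrative assumptions, not tuned settings.

```python
class PageHinkleySketch:
    """Page-Hinkley test for an increase in the mean of a stream (illustrative sketch)."""

    def __init__(self, delta=0.005, threshold=50.0, min_samples=30):
        self.delta = delta              # tolerated magnitude of change
        self.threshold = threshold      # alarm threshold (often written lambda)
        self.min_samples = min_samples  # no alarms before this many samples
        self.reset()

    def reset(self):
        self.n = 0
        self.mean = 0.0     # running mean of the observed values
        self.cum = 0.0      # cumulative deviation m_t
        self.cum_min = 0.0  # smallest cumulative deviation seen so far

    def update(self, value) -> bool:
        """Feed one value; return True if a drift is signalled."""
        self.n += 1
        self.mean += (value - self.mean) / self.n
        self.cum += value - self.mean - self.delta
        self.cum_min = min(self.cum_min, self.cum)
        if self.n >= self.min_samples and self.cum - self.cum_min > self.threshold:
            self.reset()  # start over after signalling
            return True
        return False


# 200 correct predictions (error 0) followed by 200 errors (error 1)
stream = [0] * 200 + [1] * 200
detector = PageHinkleySketch(threshold=10.0)
alarms = [t for t, value in enumerate(stream) if detector.update(value)]
print("Alarm positions:", alarms)
```

A larger `threshold` reduces false alarms at the cost of a longer detection delay, which is the trade-off the benchmark metrics are meant to expose.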


In [11]:

# Example detector implementations (placeholders)
class DDMDetector(BaseDriftDetector):
    """Drift Detection Method (DDM) - placeholder implementation"""
    
    def __init__(self):
        super().__init__("DDM")
        # TODO: Implement DDM parameters and state
        
    def update(self, error: float) -> bool:
        """Update with the 0/1 prediction error of the latest sample"""
        self.t += 1
        # TODO: Implement DDM logic
        # For now, return False (no drift detected)
        return False

class ADWINDetector(BaseDriftDetector):
    """ADWIN - placeholder implementation"""
    
    def __init__(self):
        super().__init__("ADWIN")
        # TODO: Implement ADWIN parameters and state
        
    def update(self, value: float) -> bool:
        """Update with new value"""
        self.t += 1
        # TODO: Implement ADWIN logic
        return False

class PageHinkleyDetector(BaseDriftDetector):
    """Page Hinkley Test - placeholder implementation"""
    
    def __init__(self):
        super().__init__("PageHinkley")
        # TODO: Implement Page-Hinkley parameters and state
        
    def update(self, value: float) -> bool:
        """Update with new value"""
        self.t += 1
        # TODO: Implement Page-Hinkley logic
        return False

# Factory function for creating detectors
def create_detector(detector_name: str) -> BaseDriftDetector:
    """Factory function to create drift detectors"""
    if detector_name.lower() == "ddm":
        return DDMDetector()
    elif detector_name.lower() == "adwin":
        return ADWINDetector()
    elif detector_name.lower() == "pagehinkley":
        return PageHinkleyDetector()
    else:
        raise ValueError(f"Unknown detector: {detector_name}")

print("✅ Detector implementations ready (placeholder)")


✅ Detector implementations ready (placeholder)
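
As a reference point for filling in the DDM placeholder, the sketch below follows the usual formulation of the Drift Detection Method: track the running error rate `p` and its standard deviation `s`, remember the point where `p + s` was lowest, and signal drift once `p + s` exceeds `p_min + 3 * s_min`. The class name `DDMSketch` and the default levels are assumptions made for this example, and the class is standalone so the block runs on its own.

```python
import math


class DDMSketch:
    """Drift Detection Method over a stream of 0/1 prediction errors (illustrative sketch)."""

    def __init__(self, min_samples=30, warning_level=2.0, drift_level=3.0):
        self.min_samples = min_samples
        self.warning_level = warning_level
        self.drift_level = drift_level
        self.reset()

    def reset(self):
        self.n = 0
        self.p = 0.0           # running error rate
        self.s = 0.0           # standard deviation of the error rate
        self.p_min = math.inf  # statistics at the lowest p + s seen so far
        self.s_min = math.inf
        self.in_warning = False

    def update(self, error) -> bool:
        """Feed one 0/1 error; return True if a drift is signalled."""
        self.n += 1
        self.p += (error - self.p) / self.n
        self.s = math.sqrt(max(self.p * (1.0 - self.p), 0.0) / self.n)
        if self.n < self.min_samples:
            return False
        level = self.p + self.s
        if level <= self.p_min + self.s_min:
            self.p_min, self.s_min = self.p, self.s
        self.in_warning = level > self.p_min + self.warning_level * self.s_min
        if level > self.p_min + self.drift_level * self.s_min:
            self.reset()  # forget the old concept after signalling
            return True
        return False


# Deterministic error stream: 10% errors for 1000 samples, then 50% errors
errors = [1 if i % 10 == 9 else 0 for i in range(1000)] + [i % 2 for i in range(1000)]
detector = DDMSketch()
alarms = [t for t, e in enumerate(errors) if detector.update(e)]
print("Alarm positions:", alarms)
```

The detector expects the per-sample 0/1 error, which is what `evaluate_detector` passes to `update`.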


## 4) Basic Evaluation Framework

In [12]:

@dataclass
class EvaluationResult:
    """Basic evaluation result"""
    dataset_name: str
    detector_name: str
    accuracy: float
    f1_score: float
    n_detections: int
    false_alarms: int
    mean_delay: float
    runtime_s: float

def evaluate_detector(X, y, drift_points, detector, classifier):
    """Basic evaluation function for a single detector"""
    start_time = time.time()
    
    predictions = []
    true_labels = []
    
    # Prequential evaluation loop
    for i in range(len(X)):
        xi, yi = X[i], y[i]
        
        # Predict with current model
        if hasattr(classifier, 'predict') and hasattr(classifier, 'counts') and classifier.counts.sum() > 0:
            y_pred = classifier.predict(xi.reshape(1, -1))[0]
        else:
            y_pred = 0  # Default prediction
            
        predictions.append(y_pred)
        true_labels.append(yi)
        
        # Update detector
        error = 1 if y_pred != yi else 0
        is_drift = detector.update(error)
        
        if is_drift:
            detector.alarms.append(detector.t)
        
        # Update classifier
        classifier.partial_fit(xi.reshape(1, -1), [yi])
    
    # Compute metrics
    accuracy = accuracy_score(true_labels, predictions)
    f1 = f1_score(true_labels, predictions, average='macro', zero_division=0)
    
    # Compute detection metrics (simplified)
    alarms = detector.get_alarms()
    false_alarms = len(alarms)  # Simplified - all alarms considered false for now
    mean_delay = np.nan  # TODO: Implement proper delay calculation
    
    runtime = time.time() - start_time
    
    return EvaluationResult(
        dataset_name="Unknown",
        detector_name=detector.name,
        accuracy=accuracy,
        f1_score=f1,
        n_detections=len(alarms),
        false_alarms=false_alarms,
        mean_delay=mean_delay,
        runtime_s=runtime
    )

print("✅ Basic evaluation framework ready")


✅ Basic evaluation framework ready
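
The simplified detection metrics above treat every alarm as false and leave the delay undefined. One way to match alarms to true drift points is sketched here. The function `match_alarms` and its `tolerance` parameter are hypothetical: an alarm counts as a detection only if it is the first alarm within `tolerance` samples after a drift and before the next drift, and every other alarm counts as false.

```python
import numpy as np


def match_alarms(drift_points, alarms, tolerance, stream_length):
    """Match alarms to true drift points and derive detection metrics."""
    drift_points = sorted(drift_points)
    alarms = sorted(alarms)
    delays = []
    for k, d in enumerate(drift_points):
        next_d = drift_points[k + 1] if k + 1 < len(drift_points) else stream_length
        upper = min(d + tolerance, next_d)  # detection windows never overlap
        first_hit = next((a for a in alarms if d <= a < upper), None)
        if first_hit is not None:
            delays.append(first_hit - d)
    n_true = len(delays)
    n_false = len(alarms) - n_true
    return {
        "tpr": n_true / len(drift_points) if drift_points else float("nan"),
        "false_alarms": n_false,
        "false_alarm_rate": n_false / stream_length if stream_length else float("nan"),
        "mean_delay": float(np.mean(delays)) if delays else float("nan"),
    }


stats = match_alarms(
    drift_points=[300, 600],
    alarms=[120, 310, 350, 900],
    tolerance=100,
    stream_length=1000,
)
print(stats)
```

In this example only the alarm at 310 counts as a detection (delay 10); the alarms at 120, 350 and 900 are false, and the drift at 600 is missed.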


## 5) Experiment Execution Pipeline

In [13]:
# Basic Experiment Pipeline

def run_benchmark_experiment():
    """Main function to run concept drift benchmark"""
    print("Setting up benchmark experiment...")
    
    # 1. Setup data loaders
    available_datasets = ["elec2", "airlines", "covtype"]
    print(f"Available datasets: {available_datasets}")
    
    # 2. Setup detectors  
    available_detectors = ["ddm", "adwin", "pagehinkley"]
    print(f"Available detectors: {available_detectors}")
    
    # 3. TODO: Load datasets
    print("TODO: Implement dataset loading")
    
    # 4. TODO: Initialize detectors and classifiers
    print("TODO: Initialize detectors and classifiers")
    
    # 5. TODO: Run evaluation
    print("TODO: Run prequential evaluation")
    
    # 6. TODO: Collect and save results
    print("TODO: Collect and save results")
    
    print("Benchmark experiment completed")
    
    return None

# Demo execution
if __name__ == "__main__":
    results = run_benchmark_experiment()
    
print("✅ Experiment pipeline ready")


Setting up benchmark experiment...
Available datasets: ['elec2', 'airlines', 'covtype']
Available detectors: ['ddm', 'adwin', 'pagehinkley']
TODO: Implement dataset loading
TODO: Initialize detectors and classifiers
TODO: Run prequential evaluation
TODO: Collect and save results
Benchmark experiment completed
✅ Experiment pipeline ready


In [14]:

# Example Benchmark Execution (Placeholder)

def load_sample_data():
    """Load sample data for testing"""
    # Generate small sample dataset for testing
    print("Loading sample data...")
    np.random.seed(42)
    X = np.random.randn(1000, 5)  # 1000 samples, 5 features
    y = (X[:, 0] + X[:, 1] > 0).astype(int)  # Simple binary classification
    drift_points = [300, 600]  # Nominal drift points only: y follows one fixed rule, so this stream contains no actual drift
    
    return X, y, drift_points

def run_sample_experiment():
    """Run a basic experiment with sample data"""
    print("Running sample experiment...")
    
    # Load sample data
    X, y, drift_points = load_sample_data()
    
    # Create detector and classifier
    detector = create_detector("ddm")
    classifier = OnlineGaussianNB(n_features=X.shape[1], n_classes=2)
    
    # Run evaluation
    result = evaluate_detector(X, y, drift_points, detector, classifier)
    
    print(f"Results: Accuracy={result.accuracy:.3f}, F1={result.f1_score:.3f}")
    print(f"Detections: {result.n_detections}, Runtime: {result.runtime_s:.2f}s")
    
    return result

# Demo execution
sample_result = run_sample_experiment()
print("✅ Sample experiment completed")


Running sample experiment...
Loading sample data...
Results: Accuracy=0.946, F1=0.946
Detections: 0, Runtime: 0.14s
✅ Sample experiment completed
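
The sample stream uses a single labelling rule throughout, so a working detector would have nothing to find in it. A variant whose concept really changes at the listed positions could look like the sketch below. The function `make_drifting_stream` is a hypothetical helper, and flipping the decision rule in alternating segments is just one simple way to simulate abrupt drift.

```python
import numpy as np


def make_drifting_stream(n_samples=1000, n_features=5, drift_points=(300, 600), seed=42):
    """Generate a binary stream whose labelling rule flips at every drift point."""
    rng = np.random.default_rng(seed)
    X = rng.standard_normal((n_samples, n_features))
    y = np.empty(n_samples, dtype=int)
    boundaries = [0, *drift_points, n_samples]
    for segment, (start, stop) in enumerate(zip(boundaries[:-1], boundaries[1:])):
        score = X[start:stop, 0] + X[start:stop, 1]
        # Even segments use the original rule, odd segments the inverted rule
        y[start:stop] = (score > 0) if segment % 2 == 0 else (score <= 0)
    return X, y, list(drift_points)


X_drift, y_drift, drift_points = make_drifting_stream()
print(X_drift.shape, y_drift[:10], drift_points)
```

The returned tuple has the same layout as that of `load_sample_data`, so it can be passed to `evaluate_detector` in the same way.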


In [15]:
# Validation Framework (Placeholder)

def validate_results(results: List[EvaluationResult]):
    """Basic validation of experiment results"""
    print("Validating experiment results...")
    
    if not results:
        print("Warning: No results to validate")
        return False
    
    # Basic range checks; NaN values fail them as well
    is_valid = True
    for result in results:
        if not (0 <= result.accuracy <= 1):
            print(f"Warning: Invalid accuracy for {result.detector_name}")
            is_valid = False
        if not (0 <= result.f1_score <= 1):
            print(f"Warning: Invalid F1 score for {result.detector_name}")
            is_valid = False
        if not (result.runtime_s >= 0):
            print(f"Warning: Invalid runtime for {result.detector_name}")
            is_valid = False
    
    print("Validation completed")
    return is_valid

def save_results(results: List[EvaluationResult], filename: str = "results.csv"):
    """Save results to CSV file"""
    print(f"Saving results to {filename}...")
    
    # Convert to DataFrame and save
    data = []
    for result in results:
        data.append({
            'dataset': result.dataset_name,
            'detector': result.detector_name,
            'accuracy': result.accuracy,
            'f1_score': result.f1_score,
            'n_detections': result.n_detections,
            'false_alarms': result.false_alarms,
            'mean_delay': result.mean_delay,
            'runtime_s': result.runtime_s
        })
    
    df = pd.DataFrame(data)
    df.to_csv(filename, index=False)
    print(f"Results saved to {filename}")
    
    return df

print("✅ Validation framework ready")


✅ Validation framework ready


In [16]:
# Complete Pipeline Integration

def run_complete_pipeline(dataset_names: List[str], detector_names: List[str]):
    """Run complete benchmark pipeline"""
    print("Starting complete benchmark pipeline...")
    
    all_results = []
    
    for dataset_name in dataset_names:
        print(f"\nProcessing dataset: {dataset_name}")
        
        # Load dataset
        try:
            dataset_loader = create_dataset_loader(dataset_name)
            X, y, drift_points, metadata = dataset_loader.load()
            
            if X is None:
                print(f"Skipping {dataset_name} - not implemented yet")
                continue
                
        except Exception as e:
            print(f"Failed to load {dataset_name}: {e}")
            continue
        
        # Test each detector
        for detector_name in detector_names:
            print(f"  Testing detector: {detector_name}")
            
            try:
                # Create detector and classifier
                detector = create_detector(detector_name)
                classifier = OnlineGaussianNB(n_features=X.shape[1], n_classes=len(np.unique(y)))
                
                # Run evaluation
                result = evaluate_detector(X, y, drift_points, detector, classifier)
                result.dataset_name = dataset_name
                all_results.append(result)
                
            except Exception as e:
                print(f"    Failed to test {detector_name}: {e}")
                continue
    
    # Validate and save results
    if all_results:
        is_valid = validate_results(all_results)
        if is_valid:
            results_df = save_results(all_results, "benchmark_results.csv")
            print(f"\nPipeline completed! {len(all_results)} experiments run.")
            return results_df
    
    print("Pipeline completed with no valid results")
    return None

print("✅ Complete pipeline ready")


✅ Complete pipeline ready


In [17]:
# Example Usage

def demo_benchmark():
    """Demonstrate the benchmark pipeline"""
    print("=" * 50)
    print("CONCEPT DRIFT BENCHMARK DEMO")
    print("=" * 50)
    
    # Define what to test
    datasets_to_test = ["electricity", "airlines"]  # Names must match create_dataset_loader
    detectors_to_test = ["ddm", "adwin"]            # Available detectors
    
    print(f"Datasets: {datasets_to_test}")
    print(f"Detectors: {detectors_to_test}")
    
    # Run the complete pipeline
    results_df = run_complete_pipeline(datasets_to_test, detectors_to_test)
    
    if results_df is not None:
        print("\nResults summary:")
        print(results_df.head())
        print(f"\nTotal experiments: {len(results_df)}")
    else:
        print("No results generated (datasets not implemented yet)")
    
    return results_df

# Run demo
demo_results = demo_benchmark()
print("\n✅ Demo completed")


CONCEPT DRIFT BENCHMARK DEMO
Datasets: ['electricity', 'airlines']
Detectors: ['ddm', 'adwin']
Starting complete benchmark pipeline...

Processing dataset: electricity
Loading Electricity Market dataset...
Skipping electricity - not implemented yet

Processing dataset: airlines
Loading Airlines dataset...
Skipping airlines - not implemented yet
Pipeline completed with no valid results
No results generated (datasets not implemented yet)

✅ Demo completed


In [18]:
# Basic Visualization (Placeholder)

def create_simple_plots(results_df):
    """Create basic plots for results analysis"""
    if results_df is None or len(results_df) == 0:
        print("No data to plot")
        return
    
    fig, axes = plt.subplots(2, 2, figsize=(12, 8))
    
    # 1. Accuracy comparison
    axes[0, 0].bar(results_df['detector'], results_df['accuracy'])
    axes[0, 0].set_title('Accuracy by Detector')
    axes[0, 0].set_ylabel('Accuracy')
    axes[0, 0].tick_params(axis='x', rotation=45)
    
    # 2. F1 Score comparison
    axes[0, 1].bar(results_df['detector'], results_df['f1_score'])
    axes[0, 1].set_title('F1 Score by Detector')
    axes[0, 1].set_ylabel('F1 Score')
    axes[0, 1].tick_params(axis='x', rotation=45)
    
    # 3. Detection count
    axes[1, 0].bar(results_df['detector'], results_df['n_detections'])
    axes[1, 0].set_title('Number of Detections')
    axes[1, 0].set_ylabel('Count')
    axes[1, 0].tick_params(axis='x', rotation=45)
    
    # 4. Runtime comparison
    axes[1, 1].bar(results_df['detector'], results_df['runtime_s'])
    axes[1, 1].set_title('Runtime by Detector')
    axes[1, 1].set_ylabel('Runtime (seconds)')
    axes[1, 1].tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.show()

def plot_sample_results():
    """Plot results from sample experiment"""
    print("Creating sample plots...")
    # TODO: Create plots when real results are available
    print("Plotting functionality ready for real data")

plot_sample_results()
print("✅ Basic visualization framework ready")


Creating sample plots...
Plotting functionality ready for real data
✅ Basic visualization framework ready


In [19]:
# Summary

print("""
# Concept Drift Benchmark - Barebone Pipeline Summary

## Components Implemented:
✅ Real dataset loader framework (placeholders for Elec2, Airlines, CovType)
✅ Basic drift detector interface (DDM, ADWIN, PageHinkley placeholders)  
✅ Simple online learner (Gaussian Naive Bayes)
✅ Basic evaluation framework (prequential accuracy, F1-score)
✅ Validation and results export functionality
✅ Complete pipeline integration

## Next Steps (TODOs):
1. Implement actual dataset loading from real sources
2. Complete drift detector implementations 
3. Add more real datasets (Weather, Sensor data, etc.)
4. Implement proper delay calculation and drift point matching
5. Add statistical significance testing
6. Extend visualization capabilities

## Usage:
```python
# Run benchmark with real datasets (when implemented)
results = run_complete_pipeline(
    dataset_names=["electricity", "airlines"],
    detector_names=["ddm", "adwin"]
)
```

Framework is ready for implementation of actual dataset loaders and detector algorithms.
""")



# Concept Drift Benchmark - Barebone Pipeline Summary

## Components Implemented:
✅ Real dataset loader framework (placeholders for Elec2, Airlines, CovType)
✅ Basic drift detector interface (DDM, ADWIN, PageHinkley placeholders)  
✅ Simple online learner (Gaussian Naive Bayes)
✅ Basic evaluation framework (prequential accuracy, F1-score)
✅ Validation and results export functionality
✅ Complete pipeline integration

## Next Steps (TODOs):
1. Implement actual dataset loading from real sources
2. Complete drift detector implementations 
3. Add more real datasets (Weather, Sensor data, etc.)
4. Implement proper delay calculation and drift point matching
5. Add statistical significance testing
6. Extend visualization capabilities

## Usage:
```python
# Run benchmark with real datasets (when implemented)
results = run_complete_pipeline(
    dataset_names=["electricity", "airlines"],
    detector_names=["ddm", "adwin"]
)
```

Framework is ready for implementation of actual dataset loaders and det

In [20]:

# Sample data export (when results are available)
# results_df.to_csv("concept_drift_results.csv", index=False)
print("Results export functionality ready")


Results export functionality ready


### Optional: Basic Visualization Examples

In [21]:

def plot_timeline_example():
    """Example function for plotting drift detection timeline"""
    # TODO: Implement timeline plotting when real results are available
    print("Timeline plotting functionality ready")
    
    # Example plot structure:
    # plt.figure(figsize=(10, 3))
    # plt.axvline(x=drift_point, linestyle="--", color="blue", label="True Drift")  
    # plt.axvline(x=alarm_time, color="red", alpha=0.7, label="Detection Alarm")
    # plt.xlabel("Time")
    # plt.title("Drift Detection Timeline")
    # plt.legend()
    # plt.show()

plot_timeline_example()
print("✅ Timeline visualization ready")


Timeline plotting functionality ready
✅ Timeline visualization ready
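
The commented plot structure above can be turned into a small function along the following lines. This is a sketch only: the name `plot_drift_timeline` and its arguments are hypothetical, and the drift points and alarm times in the usage line are made-up illustration values, not benchmark results.

```python
import matplotlib.pyplot as plt


def plot_drift_timeline(drift_points, alarms, stream_length, ax=None):
    """Draw true drift points (dashed blue) and detector alarms (red) on one time axis."""
    if ax is None:
        _, ax = plt.subplots(figsize=(10, 3))
    for i, d in enumerate(drift_points):
        # Only the first line of each kind gets a legend entry
        ax.axvline(x=d, linestyle="--", color="blue",
                   label="True Drift" if i == 0 else "_nolegend_")
    for i, a in enumerate(alarms):
        ax.axvline(x=a, color="red", alpha=0.7,
                   label="Detection Alarm" if i == 0 else "_nolegend_")
    ax.set_xlim(0, stream_length)
    ax.set_yticks([])  # the vertical axis carries no information
    ax.set_xlabel("Time")
    ax.set_title("Drift Detection Timeline")
    if drift_points or alarms:
        ax.legend(loc="upper right")
    return ax


# Illustration values only
ax = plot_drift_timeline([300, 600], [315, 640, 820], stream_length=1000)
```

Returning the axes instead of calling `plt.show()` inside the function lets the caller combine several detectors in one figure, for example by passing axes created with `plt.subplots(nrows=3)`.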
