# Algorithm Analysis with the Test Harness

This notebook demonstrates how to use the algorithm test harness to:
1. Generate and visualize synthetic test scenarios
2. Run algorithms against scenarios
3. Analyze and compare algorithm performance
4. Generate reports and visualizations

In [None]:
# Standard library imports
from pathlib import Path

# Third-party imports
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Algorithm harness imports
from openpilot.selfdrive.controls.lib.tests.algorithm_harness import (
    ScenarioRunner,
    generate_synthetic_scenario,
)
from openpilot.selfdrive.controls.lib.tests.algorithm_harness.adapters import (
    LatControlPIDAdapter,
    LatControlTorqueAdapter,
)
from openpilot.selfdrive.controls.lib.tests.algorithm_harness.metrics import (
    format_metrics_table,
    compare_metrics,
)

# Configure matplotlib for notebook display
%matplotlib inline
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)

## 1. Create Test Scenarios

Generate synthetic scenarios for testing. The harness supports various scenario types.

In [None]:
# Describe each scenario as a set of keyword arguments, then build them in one pass.
scenario_specs = [
    dict(name="constant_target", duration_s=5.0, scenario_type="constant", target=0.0),
    dict(name="sine_wave", duration_s=10.0, scenario_type="sine", amplitude=0.01, frequency=0.2),
    dict(name="step_response", duration_s=5.0, scenario_type="step", step_time=2.0, step_value=0.005),
]
scenarios = [generate_synthetic_scenario(**spec) for spec in scenario_specs]

# Summarize what was created: name, number of steps, and the duration recorded in metadata
# (reported as 0 when the metadata has no 'duration_s' entry).
print(f"Created {len(scenarios)} scenarios:")
for scenario in scenarios:
    duration_s = scenario.metadata.get('duration_s', 0)
    print(f"  - {scenario.name}: {len(scenario)} steps, {duration_s:.1f}s")

## 2. Visualize Scenario Ground Truth

Plot the ground truth (target) values to understand what each scenario tests.

In [None]:
# Time step used to build the x-axis. The scenarios are assumed to run at 100 Hz;
# this value is not read from the scenario objects themselves.
SAMPLE_PERIOD_S = 0.01

# One panel per scenario, so adding or removing scenarios never silently drops a plot.
# squeeze=False makes `axes` a 2-D array even when there is a single scenario.
fig, axes = plt.subplots(1, len(scenarios), figsize=(5 * len(scenarios), 4), squeeze=False)

for ax, scenario in zip(axes.ravel(), scenarios):
    ground_truth = scenario.ground_truth
    # Explicit emptiness check: plain truthiness raises for multi-element numpy arrays.
    if ground_truth is None or len(ground_truth) == 0:
        continue  # nothing to plot; the panel is left blank

    t = np.arange(len(ground_truth)) * SAMPLE_PERIOD_S
    ax.plot(t, ground_truth, 'b-', linewidth=2)
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Target Value')
    ax.set_title(scenario.name)
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 3. Run Algorithms Against Scenarios

Execute algorithms and collect performance metrics.

In [None]:
def run_on_scenarios(runner, algorithm, scenarios, algorithm_name, label):
    """Run one algorithm against every scenario and report its tracking RMSE.

    Args:
        runner: Object whose ``run(algorithm, scenario, algorithm_name)`` executes a scenario.
        algorithm: The algorithm (adapter) instance handed to ``runner.run``.
        scenarios: Iterable of scenarios to execute, in order.
        algorithm_name: Name passed through to ``runner.run``.
        label: Short label used as the prefix of each printed progress line.

    Returns:
        List of the results returned by ``runner.run``, one per scenario, in input order.
    """
    results = []
    for scenario in scenarios:
        result = runner.run(algorithm, scenario, algorithm_name)
        results.append(result)
        print(f"{label} on {scenario.name}: RMSE={result.metrics.tracking_error_rmse:.6f}")
    return results


# Initialize runner and algorithms
runner = ScenarioRunner(deterministic=True)
pid_adapter = LatControlPIDAdapter()
torque_adapter = LatControlTorqueAdapter()

# Run PID algorithm
pid_results = run_on_scenarios(runner, pid_adapter, scenarios, "LatControlPID", "PID")

print()

# Run Torque algorithm
torque_results = run_on_scenarios(runner, torque_adapter, scenarios, "LatControlTorque", "Torque")

## 4. Compare Algorithm Outputs

Visualize how each algorithm tracks the ground truth.

In [None]:
# Time step used to build the x-axis. The scenarios are assumed to run at 100 Hz;
# this value is not read from the scenario objects themselves.
SAMPLE_PERIOD_S = 0.01


def time_axis(n_samples):
    """Return the time stamps (in seconds) for a series of ``n_samples`` points."""
    return np.arange(n_samples) * SAMPLE_PERIOD_S


# squeeze=False keeps `axes` a 2-D array, so the loop below also works when there is
# only one scenario (plt.subplots would otherwise return a bare, non-iterable Axes).
fig, axes = plt.subplots(len(scenarios), 1, figsize=(12, 4 * len(scenarios)), squeeze=False)

for ax, scenario, pid_result, torque_result in zip(axes.ravel(), scenarios, pid_results, torque_results):
    # Plot ground truth. Explicit emptiness check: plain truthiness raises for
    # multi-element numpy arrays.
    ground_truth = scenario.ground_truth
    if ground_truth is not None and len(ground_truth) > 0:
        ax.plot(time_axis(len(ground_truth)), ground_truth, 'k--', label='Ground Truth', linewidth=2, alpha=0.7)

    # Plot algorithm outputs. Each series gets a time axis sized to its own length,
    # so x and y always match even if a run produced a different number of samples
    # than the scenario has steps.
    ax.plot(time_axis(len(pid_result.outputs)), pid_result.outputs, 'b-', label='PID', linewidth=1.5)
    ax.plot(time_axis(len(torque_result.outputs)), torque_result.outputs, 'r-', label='Torque', linewidth=1.5)

    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Output')
    ax.set_title(f'{scenario.name} - Algorithm Comparison')
    ax.legend(loc='upper right')
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 5. Analyze Metrics

Compare performance metrics between algorithms.

In [None]:
# (column label suffix, attribute name on `result.metrics`) for each compared metric.
# The order here fixes the column order of the table.
METRIC_COLUMNS = [
    ('RMSE', 'tracking_error_rmse'),
    ('Smoothness', 'output_smoothness'),
    ('Latency (ms)', 'latency_mean_ms'),
]

# Collect metrics for comparison: one list per column, one entry per scenario.
# `metrics_data` is reused by the bar-chart cell below, so its keys are kept as
# 'PID <label>' / 'Torque <label>'.
metrics_data = {'Scenario': [scenario.name for scenario in scenarios]}
for label, attr in METRIC_COLUMNS:
    metrics_data[f'PID {label}'] = [getattr(r.metrics, attr) for r in pid_results]
    metrics_data[f'Torque {label}'] = [getattr(r.metrics, attr) for r in torque_results]

# Display as table. Building the frame raises if the result lists and the scenario
# list have different lengths, instead of silently truncating to the shortest.
metrics_df = pd.DataFrame(metrics_data).set_index('Scenario')
display(metrics_df.style.format('{:.6f}').background_gradient(cmap='RdYlGn_r', subset=['PID RMSE', 'Torque RMSE']))

## 6. Metrics Visualization

Create bar charts comparing key metrics.

In [None]:
x = np.arange(len(scenarios))
width = 0.35
scenario_names = [s.name for s in scenarios]

# One entry per panel: (suffix of the `metrics_data` keys, y-axis label, panel title).
panels = [
    ('RMSE', 'RMSE', 'Tracking Error (RMSE)'),
    ('Smoothness', 'Smoothness (jerk RMS)', 'Output Smoothness'),
    ('Latency (ms)', 'Latency (ms)', 'Mean Update Latency'),
]

fig, axes = plt.subplots(1, len(panels), figsize=(15, 5))

for ax, (metric_key, ylabel, title) in zip(axes, panels):
    # Paired bars per scenario: PID to the left of the tick, Torque to the right.
    ax.bar(x - width/2, metrics_data[f'PID {metric_key}'], width, label='PID', color='#007acc')
    ax.bar(x + width/2, metrics_data[f'Torque {metric_key}'], width, label='Torque', color='#dc3545')
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.set_xticks(x)
    ax.set_xticklabels(scenario_names, rotation=45, ha='right')
    ax.legend()

plt.tight_layout()
plt.show()

## 7. Detailed Metrics Report

Print a formatted metrics report for each algorithm on the sine-wave scenario.

In [None]:
# Print detailed metrics for the sine wave scenario.
# Look the scenario up by name rather than by a hard-coded position, so the report
# stays correct if the scenario list is reordered or extended. Raises ValueError
# if no scenario is named "sine_wave".
sine_idx = [scenario.name for scenario in scenarios].index("sine_wave")

print(format_metrics_table(pid_results[sine_idx].metrics, "LatControlPID"))
print()
print(format_metrics_table(torque_results[sine_idx].metrics, "LatControlTorque"))

## 8. Using the Comparison API

Use the built-in comparison functionality for structured analysis.

In [None]:
# Compare the two adapters across all scenarios, with PID as the baseline
comparison = runner.compare(
    baseline=pid_adapter,
    candidate=torque_adapter,
    scenarios=scenarios,
    baseline_name="PID",
    candidate_name="Torque",
)

# Header for the aggregate section
banner = "=" * 60
print(banner)
print("AGGREGATE COMPARISON: PID vs Torque")
print(banner)

# Only these metrics are reported; any others in the aggregate are skipped
reported_metrics = ('tracking_error_rmse', 'output_smoothness', 'latency_mean_ms')

agg = comparison['aggregate']['comparison']
for metric, data in agg.items():
    if metric not in reported_metrics:
        continue
    verdict = "better" if data['improved'] else "worse"
    print(f"{metric}:")
    print(f"  PID:    {data['baseline']:.6f}")
    print(f"  Torque: {data['candidate']:.6f} ({data['pct_change']:+.1f}% - {verdict})")
    print()
        print()

## 9. Implementing Custom Algorithms

Example of creating and testing a custom algorithm.

In [None]:
from openpilot.selfdrive.controls.lib.tests.algorithm_harness.interface import (
    AlgorithmState,
    AlgorithmOutput,
)

class SimpleProportionalController:
    """Minimal demonstration controller for the test harness.

    The command is ``gain * 0.1`` clamped to [-1.0, 1.0]. The incoming state is
    not read, so the output is constant for a given gain; a real controller
    would derive its command from the state values.
    """

    def __init__(self, gain: float = 1.0):
        # Scale factor applied to the fixed 0.1 input in update().
        self.gain = gain

    def reset(self) -> None:
        """Do nothing: the controller carries no state between updates."""
        return None

    def update(self, state: AlgorithmState) -> AlgorithmOutput:
        """Return the clamped command and whether the raw command reached the limit.

        ``state`` is accepted for interface compatibility but is not used.
        """
        raw_command = self.gain * 0.1  # Simplified - in practice, use state values

        # Saturation is judged on the raw command, before it is clamped.
        hit_limit = abs(raw_command) >= 1.0

        # Clamp to [-1.0, 1.0]: cap the upper bound first, then the lower bound.
        capped = min(1.0, raw_command)
        command = max(-1.0, capped)

        return AlgorithmOutput(output=command, saturated=hit_limit)

# Run the custom controller on the first scenario and summarize its metrics
custom_algo = SimpleProportionalController(gain=0.5)
result = runner.run(custom_algo, scenarios[0], "CustomP")

custom_metrics = result.metrics
print("Custom algorithm results:")
print(f"  RMSE: {custom_metrics.tracking_error_rmse:.6f}")
print(f"  Smoothness: {custom_metrics.output_smoothness:.6f}")
print(f"  Latency: {custom_metrics.latency_mean_ms:.3f} ms")

## Summary

This notebook demonstrated:

1. **Scenario Generation**: Creating synthetic test scenarios with various profiles
2. **Algorithm Execution**: Running existing and custom algorithms
3. **Metrics Collection**: Automatic collection of tracking, smoothness, and latency metrics
4. **Comparison Analysis**: Comparing algorithms across multiple scenarios
5. **Visualization**: Creating plots to understand algorithm behavior

For more details, see the [Algorithm Harness README](../README.md).