In [1]:
import os
import bisect
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from openpilot.tools.lib.logreader import LogReader

import matplotlib.pyplot as plt
import matplotlib.widgets as widgets
import numpy as np
from ipywidgets import FloatSlider, Checkbox, VBox, HBox, Output
import ipywidgets as widgets
from IPython.display import display, clear_output

def sig_centered(z):
    pos = 1.0 / (1.0 + np.exp(-z)) - 0.5
    neg = np.exp(z) / (1.0 + np.exp(z)) - 0.5
    return np.where(z >= 0.0, pos, neg)

def model(x, a, b, c, d, e):
    xs = x - d
    return sig_centered(a * xs) * b + c * xs + e

def extract_torque_data(folder_path, min_vego=1.0, delay=0.2):
    """Extract torque, lateral acceleration, and vego data from all rlogs in subfolders"""
    points = []

    # Find all rlog files in subfolders
    rlog_files = []
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.rlog') or file.endswith('.rlog.bz2') or file.endswith('rlog.zst'):
                rlog_files.append(os.path.join(root, file))

    print(f"Found {len(rlog_files)} rlog files")

    for rlog_file in rlog_files:
        print(f"Processing {rlog_file}")
        try:
            lr = LogReader(rlog_file)
            points.extend(process_single_log(lr, min_vego, delay))
        except Exception as e:
            print(f"Error processing {rlog_file}: {e}")
            continue

    return np.array(points)

def process_single_log(lr, min_vego, delay):
    """Process a single log file using fast binary search for closest values"""
    points = []

    # Collect all messages by type with timestamps
    messages_by_type = {
        'carControl': [],
        'carOutput': [],
        'carState': [],
        'controlsState': [],
        'liveParameters': []
    }

    # First pass: collect all messages
    for msg in lr:
        t = msg.logMonoTime * 1e-9
        msg_type = msg.which()

        if msg_type in messages_by_type:
            messages_by_type[msg_type].append((t, msg))

    # Sort by timestamp for binary search (already sorted in most cases)
    for msg_type in messages_by_type:
        messages_by_type[msg_type].sort(key=lambda x: x[0])

    # Pre-extract timestamps for faster binary search
    timestamps = {
        msg_type: [t for t, _ in msg_list]
        for msg_type, msg_list in messages_by_type.items()
    }

    # Process controlsState messages as the primary driver
    for t, controls_msg in messages_by_type['controlsState']:
        try:
            # Use binary search for O(log n) lookups
            lat_active = find_closest_value_fast(messages_by_type['carControl'],
                                               timestamps['carControl'], t,
                                               lambda msg: msg.carControl.latActive)
            steer_torque = find_closest_value_fast(messages_by_type['carOutput'],
                                                 timestamps['carOutput'], t - delay,
                                                 lambda msg: -msg.carOutput.actuatorsOutput.torque)
            vego = find_closest_value_fast(messages_by_type['carState'],
                                         timestamps['carState'], t,
                                         lambda msg: msg.carState.vEgo)
            steer_override = find_closest_value_fast(messages_by_type['carState'],
                                                   timestamps['carState'], t,
                                                   lambda msg: msg.carState.steeringPressed)
            curvature = controls_msg.controlsState.curvature
            roll = find_closest_value_fast(messages_by_type['liveParameters'],
                                         timestamps['liveParameters'], t,
                                         lambda msg: msg.liveParameters.roll)

            # Calculate lateral acceleration using the requested formula
            lateral_acc = (curvature * vego ** 2) - (roll * 9.81)

            LOW_SPEED_X = [0, 10, 20, 30]
            LOW_SPEED_Y = [15, 13, 10, 5]
            low_speed_factor = np.interp(vego, LOW_SPEED_X, LOW_SPEED_Y)**2

            lateral_acc += low_speed_factor * curvature

            # Filter conditions
            if (lat_active and not steer_override and
                vego > min_vego and abs(steer_torque) > 0.001 and
                abs(lateral_acc) <= 5.0):

                points.append([steer_torque, lateral_acc, vego])

        except (ValueError, IndexError, AttributeError):
            continue

    return points

def find_closest_value_fast(msg_list, timestamps, target_time, value_extractor):
    """Fast O(log n) closest value lookup using binary search"""
    if not msg_list:
        raise ValueError("Empty message list")

    target_time = target_time # delay

    # Use binary search to find insertion point
    idx = bisect.bisect_left(timestamps, target_time)

    # Check the closest candidates
    candidates = []
    if idx > 0:
        candidates.append((idx - 1, abs(timestamps[idx - 1] - target_time)))
    if idx < len(timestamps):
        candidates.append((idx, abs(timestamps[idx] - target_time)))

    # Find the closest one
    if not candidates:
        raise ValueError("No candidates found")

    closest_idx = min(candidates, key=lambda x: x[1])[0]
    return value_extractor(msg_list[closest_idx][1])

def bucket_by_speed(points, bucket_size=5.0):
    """Bucket points by speed in 5 m/s intervals"""
    if len(points) == 0:
        return {}

    bucketed_data = defaultdict(list)

    for steer, lat_acc, vego in points:
        bucket = int(vego // bucket_size) * bucket_size
        bucketed_data[bucket].append([steer, lat_acc])

    return dict(bucketed_data)

def create_interactive_plot_jupyter(bucketed_data):
    """Create interactive plot with model overlay for Jupyter notebooks"""

    # Create output widget for the plot
    plot_output = Output()

    # Create checkboxes for speed buckets
    speed_checkboxes = {}
    checkbox_widgets = []

    colors = plt.cm.viridis(np.linspace(0, 1, len(bucketed_data)))
    color_map = {}

    for i, (speed_bucket, points) in enumerate(sorted(bucketed_data.items())):
        color_map[speed_bucket] = colors[i]
        checkbox = Checkbox(
            value=True,
            description=f'{speed_bucket:.0f}-{speed_bucket+5:.0f} m/s ({len(points)} pts)',
            style={'description_width': 'initial'}
        )
        speed_checkboxes[speed_bucket] = checkbox
        checkbox_widgets.append(checkbox)

    # Model parameter sliders
    a_slider = FloatSlider(value=5, min=0.1, max=10.0, step=0.1, description='a (sharpness):')
    b_slider = FloatSlider(value=1.0, min=0.1, max=3.0, step=0.05, description='b (amplitude):')
    c_slider = FloatSlider(value=0.2, min=-1.0, max=1.0, step=0.01, description='c (slope):')
    d_slider = FloatSlider(value=-0.0, min=-1.0, max=1.0, step=0.01, description='d (h shift):')
    e_slider = FloatSlider(value=-0.00, min=-1.0, max=1.0, step=0.01, description='e (v shift):')
    f_slider = FloatSlider(value=20, min=0, max=40.0, step=2.5, description='vego:')

    show_model_checkbox = Checkbox(value=True, description='Show Model')

    def update_plot(*args):
        with plot_output:
            clear_output(wait=True)

            fig, ax = plt.subplots(figsize=(12, 8))

            # Plot data points for enabled speed buckets
            for speed_bucket, points in sorted(bucketed_data.items()):
                if speed_checkboxes[speed_bucket].value and len(points) > 0:
                    points_array = np.array(points)
                    steer = points_array[:, 0]  # steering torque (y-axis)
                    lat_acc = points_array[:, 1]  # lateral acceleration (x-axis)

                    # Flip axes: lat_acc on x, steer on y
                    ax.scatter(lat_acc, steer, c=[color_map[speed_bucket]],
                             label=f'{speed_bucket:.0f}-{speed_bucket+5:.0f} m/s ({len(points)} points)',
                             alpha=0.3, s=1)

            # Plot model if enabled
            if show_model_checkbox.value:
                # Get current parameter values
                a_val = a_slider.value
                b_val = b_slider.value
                c_val = c_slider.value
                d_val = d_slider.value
                e_val = e_slider.value
                v_ego = f_slider.value

                # Create x range for model
                all_lat_acc = []
                for points in bucketed_data.values():
                    if len(points) > 0:
                        points_array = np.array(points)
                        all_lat_acc.extend(points_array[:, 1])

                if all_lat_acc:
                    x_min, x_max = -3.5, 3.5#min(all_lat_acc), max(all_lat_acc)
                    x_range = np.linspace(x_min, x_max, 200)
                    y_model = model(x_range, a_val, b_val, c_val, d_val, e_val)
                    ax.plot(x_range, y_model, 'r-', linewidth=2)

            ax.set_xlabel('Lateral Acceleration (m/s²)')
            ax.set_ylabel('Steering Torque')
            ax.set_title('Lateral Acceleration vs Steering Torque by Speed')
            ax.grid(True, alpha=0.3)
            ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
            ax.set_xlim(-3.5, 3.5)
            ax.set_ylim(-1, 1)

            plt.tight_layout()
            plt.show()

    # Connect all widgets to update function
    for checkbox in speed_checkboxes.values():
        checkbox.observe(update_plot, names='value')

    a_slider.observe(update_plot, names='value')
    b_slider.observe(update_plot, names='value')
    c_slider.observe(update_plot, names='value')
    d_slider.observe(update_plot, names='value')
    e_slider.observe(update_plot, names='value')
    f_slider.observe(update_plot, names='value')
    show_model_checkbox.observe(update_plot, names='value')

    # Create layout
    model_controls = VBox([
        show_model_checkbox,
        a_slider,
        b_slider,
        c_slider,
        d_slider,
        e_slider,
        f_slider
    ])

    speed_controls = VBox([
        widgets.HTML("<b>Speed Buckets:</b>")
    ] + checkbox_widgets)

    controls = HBox([model_controls, speed_controls])

    # Initial plot
    update_plot()

    # Display everything
    display(controls)
    display(plot_output)

    return {
        'speed_checkboxes': speed_checkboxes,
        'model_params': {
            'a': a_slider,
            'b': b_slider,
            'c': c_slider,
            'd': d_slider,
            'e': e_slider,
            'f': f_slider
        },
        'show_model': show_model_checkbox
    }

In [2]:
folder_path = "./logs" # set to a folder containing rlogs
min_speed = 1.0
delay = 0.35 # set to your steer actuator delay
print(f"Extracting data from {folder_path}")
points = extract_torque_data(folder_path, min_speed, delay)

if len(points) == 0:
    print("No valid points found!")
print(f"Extracted {len(points)} total points")

# Bucket by speed
bucketed_data = bucket_by_speed(points, 5)
print(f"Data bucketed into {len(bucketed_data)} speed ranges")

Extracting data from ./logs
Found 3 rlog files
Processing ./logs/625ccd09daec723c_00000051--4aa8bf4631--2--rlog.zst
Processing ./logs/625ccd09daec723c_00000051--4aa8bf4631--1--rlog.zst
Processing ./logs/625ccd09daec723c_00000051--4aa8bf4631--0--rlog.zst
Extracted 1075 total points
Data bucketed into 1 speed ranges


In [3]:
# Plot results
create_interactive_plot_jupyter(bucketed_data)

HBox(children=(VBox(children=(Checkbox(value=True, description='Show Model'), FloatSlider(value=5.0, descripti…

Output()

{'speed_checkboxes': {10: Checkbox(value=True, description='10-15 m/s (1075 pts)', style=CheckboxStyle(description_width='initial'))},
 'model_params': {'a': FloatSlider(value=5.0, description='a (sharpness):', max=10.0, min=0.1),
  'b': FloatSlider(value=1.0, description='b (amplitude):', max=3.0, min=0.1, step=0.05),
  'c': FloatSlider(value=0.2, description='c (slope):', max=1.0, min=-1.0, step=0.01),
  'd': FloatSlider(value=-0.0, description='d (h shift):', max=1.0, min=-1.0, step=0.01),
  'e': FloatSlider(value=-0.0, description='e (v shift):', max=1.0, min=-1.0, step=0.01),
  'f': FloatSlider(value=20.0, description='vego:', max=40.0, step=2.5)},
 'show_model': Checkbox(value=True, description='Show Model')}