In [None]:
from opendbc.car import structs
from opendbc.car.hyundai.values import CAR, HyundaiFlags
from opendbc.car.hyundai.fingerprints import FW_VERSIONS

TEST_PLATFORMS = set(CAR.with_flags(HyundaiFlags.CANFD)) & set(CAR.with_flags(HyundaiFlags.CANFD_ANGLE_STEERING))  # CAN-FD electric vehicles only
#TEST_PLATFORMS = set(CAR.with_flags(HyundaiFlags.CANFD)) - set(CAR.with_flags(HyundaiFlags.EV))  # CAN-FD hybrid and ICE vehicles only

print(f"Found {len(TEST_PLATFORMS)} qualifying vehicles:")
for platform in TEST_PLATFORMS:
  print(f"  {platform}")

In [None]:
TEST_SEGMENTS = ['e1107f9d04dfb1e2/00000096--02ecca61a6']


In [None]:
import copy
import matplotlib.pyplot as plt
import numpy as np
from opendbc.can.parser import CANParser
from opendbc.car.hyundai.values import DBC
from opendbc.car.hyundai.hyundaicanfd import CanBus
from openpilot.selfdrive.pandad import can_capnp_to_list
from openpilot.tools.lib.logreader import LogReader

# Keep both messages we're interested in
message_names = ["LKAS_ALT", "LFA_ALT"]

# You need to define this variable if it's not already defined
# TEST_SEGMENTS = ["path/to/segment1", "path/to/segment2"]
# And platform variable if not defined
# platform = "CANFD"

# Select one segment for testing/debugging
segment = TEST_SEGMENTS[0]  # Change index as needed

In [None]:
# Load the segment
lr = LogReader(segment)
CP = lr.first("carParams")
if CP is None:
    print(f"No carParams found in segment {segment}")
else:
    print(f"Analyzing segment {segment} for {CP.carFingerprint}")

    # Get CAN messages
    can_msgs = [msg for msg in lr if msg.which() == "can"]
    print(f"Found {len(can_msgs)} CAN messages")
    
    # Setup parser
    parser_messages = []
    for name in message_names:
        parser_messages.append((name, 0))
    
    try:
        cp = CANParser(DBC[platform]["pt"], parser_messages, CanBus(CP).ECAN)
        print("Parser initialized successfully")
    except Exception as e:
        print(f"Error initializing parser: {e}")

In [None]:
parser_messages = [("LKAS_ALT",0),("LFA_ALT",0)]
# Reset parser
cp = CANParser(DBC[platform]["pt"], parser_messages, CanBus(CP).ECAN)

# Check a single message
example_idx = 0
a = set()
while example_idx < len(can_msgs):
    try:
        cp.update_strings(can_capnp_to_list([can_msgs[example_idx].as_builder().to_bytes()]))
        
        #Print all available signals in both messages
        #print("Available signals:")
        #if "LKAS_ALT" in cpacan.vl:
        #    print("  LKAS_ALT:", cpacan.vl["LKAS_ALT"])
        #if "LFA_ALT" in cp.vl:
        #    print("  LFA_ALT:", cp.vl["LFA_ALT"])
        
        # Found at least one message with data
        #if "LKAS_ALT" in cp.vl or "LFA_ALT" in cp.vl:
        #    break
        #if cp.vl["LFA_ALT"]["LKAS_ANGLE_MAX_TORQUE"]>0:
        #    a.add(cp.vl["LFA_ALT"]["LKAS_ANGLE_MAX_TORQUE"])
            #print("AAAA")
            #break
        if cp.vl["LFA_ALT"]["LKAS_ANGLE_ACTIVE"]>0:
            a.add(cp.vl["LFA_ALT"]["LKAS_ANGLE_ACTIVE"])
            #print("AAAA")
            #break
            
    except Exception as e:
        print(f"Error examining message {example_idx}: {e}")
    
    example_idx += 1
    

print(example_idx) # 72009
print(a)

In [None]:
# Reset parser
cp = CANParser(DBC[platform]["pt"], parser_messages, CanBus(CP).ECAN)

# Track timestamps and angle commands when active
timestamps = []
angle_cmds = []
active_flags = []  # For debugging
msg_indices = []   # Store original message indices for reference

# Process all CAN messages
for i, msg in enumerate(can_msgs):
    try:
        cp.update_strings(can_capnp_to_list([msg.as_builder().to_bytes()]))
        
        # Check if both required messages and signals are present
        if "LFA_ALT" in cp.vl and "LKAS_ALT" in cp.vl:
            if "LKAS_ANGLE_ACTIVE" in cp.vl["LFA_ALT"] and "LKAS_ANGLE_CMD" in cp.vl["LKAS_ALT"]:
                active_val = cp.vl["LFA_ALT"]["LKAS_ANGLE_ACTIVE"]
                active_flags.append(active_val)
                
                # Only collect data when active is 2
                if active_val == 2:
                    timestamps.append(msg.logMonoTime / 1e9)  # Convert to seconds
                    angle_cmds.append(cp.vl["LFA_ALT"]["LKAS_ANGLE_CMD"])
                    msg_indices.append(i)
    except Exception as e:
        if i % 1000 == 0:  # Only print errors occasionally to avoid flooding
            print(f"Error processing message {i}: {e}")

print(f"Total messages processed: {len(can_msgs)}")
print(f"LKAS_ANGLE_ACTIVE values encountered: {set(active_flags)}")
print(f"Active steering points collected: {len(timestamps)}")

In [None]:
# Find continuous active periods
active_periods = []
if msg_indices:
    # Start with the first active message
    current_period = [0, 0]
    
    for i in range(1, len(msg_indices)):
        # Check if consecutive in the original message sequence
        if msg_indices[i] == msg_indices[i-1] + 1:
            # Part of the same period
            current_period[1] = i
        else:
            # Gap in sequence, start new period
            active_periods.append(current_period)
            current_period = [i, i]
    
    # Add the last period
    active_periods.append(current_period)

print(f"Found {len(active_periods)} active steering periods")
for i, (start, end) in enumerate(active_periods):
    duration = timestamps[end] - timestamps[start]
    num_points = end - start + 1
    print(f"  Period {i+1}: {num_points} points over {duration:.2f}s")

In [None]:
## Choose a period to analyze (if any exist)
if active_periods:
    # Find the period with the most points
    #longest_period_idx = max(range(len(active_periods)), 
    #                         key=lambda i: active_periods[i][1] - active_periods[i][0])
    longest_period_idx = 1
    
    start_idx, end_idx = active_periods[longest_period_idx]
    
    # Get data for this period
    period_timestamps = timestamps[start_idx:end_idx+1]
    period_angle_cmds = angle_cmds[start_idx:end_idx+1]
    
    # Normalize timestamps to start at 0
    norm_timestamps = [t - period_timestamps[0] for t in period_timestamps]
    
    # Calculate rate of change
    angle_cmd_diffs = np.diff(period_angle_cmds)
    
    # Print statistics
    print(f"Analysis of Period {longest_period_idx+1}:")
    print(f"  Duration: {period_timestamps[-1] - period_timestamps[0]:.2f}s")
    print(f"  Points: {len(period_angle_cmds)}")
    print(f"  Angle range: {min(period_angle_cmds):.2f} to {max(period_angle_cmds):.2f}")
    print(f"  Max absolute rate of change: {max(abs(angle_cmd_diffs)):.2f} units/frame")
    
    # Plot the angle command evolution
    plt.figure(figsize=(12, 8))
    
    # Plot angle commands
    plt.subplot(2, 1, 1)
    plt.plot(norm_timestamps, period_angle_cmds)
    plt.title(f"LKAS_ANGLE_CMD Evolution - Period {longest_period_idx+1}")
    plt.xlabel("Time (s)")
    plt.ylabel("LKAS_ANGLE_CMD")
    plt.grid(True)
    
    # Plot rate of change
    plt.subplot(2, 1, 2)
    plt.plot(norm_timestamps[1:], angle_cmd_diffs)
    plt.title("Rate of Change")
    plt.xlabel("Time (s)")
    plt.ylabel("Delta LKAS_ANGLE_CMD per frame")
    plt.grid(True)
    
    plt.tight_layout()
    plt.show()
else:
    print("No active periods found to analyze")

In [None]:
# Analyze all active periods
if active_periods:
    # Initialize variables to track overall statistics
    all_max_rates = []
    
    # Create a figure with subplots for all periods
    num_periods = len(active_periods)
    fig, axes = plt.subplots(num_periods, 2, figsize=(14, 5*num_periods))
    
    # If there's only one period, make axes indexable as a 2D array
    if num_periods == 1:
        axes = np.array([axes])
    
    # Process each active period
    for i, (start_idx, end_idx) in enumerate(active_periods):
        # Get data for this period
        period_timestamps = timestamps[start_idx:end_idx+1]
        period_angle_cmds = angle_cmds[start_idx:end_idx+1]
        
        # Normalize timestamps to start at 0
        norm_timestamps = [t - period_timestamps[0] for t in period_timestamps]
        
        # Calculate rate of change
        angle_cmd_diffs = np.diff(period_angle_cmds)
        max_rate = max(abs(angle_cmd_diffs))
        all_max_rates.append(max_rate)
        
        # Print statistics for this period
        print(f"Analysis of Period {i+1}:")
        print(f"  Duration: {period_timestamps[-1] - period_timestamps[0]:.2f}s")
        print(f"  Points: {len(period_angle_cmds)}")
        print(f"  Angle range: {min(period_angle_cmds):.2f} to {max(period_angle_cmds):.2f}")
        print(f"  Max absolute rate of change: {max_rate:.2f} units/frame")
        print()
        
        # Plot angle commands
        axes[i, 0].plot(norm_timestamps, period_angle_cmds)
        axes[i, 0].set_title(f"LKAS_ANGLE_CMD Evolution - Period {i+1}")
        axes[i, 0].set_xlabel("Time (s)")
        axes[i, 0].set_ylabel("LKAS_ANGLE_CMD")
        axes[i, 0].grid(True)
        
        # Plot rate of change
        axes[i, 1].plot(norm_timestamps[1:], angle_cmd_diffs)
        axes[i, 1].set_title(f"Rate of Change - Period {i+1}")
        axes[i, 1].set_xlabel("Time (s)")
        axes[i, 1].set_ylabel("Delta LKAS_ANGLE_CMD per frame")
        axes[i, 1].grid(True)
    
    # Print average maximum rate of change
    avg_max_rate = sum(all_max_rates) / len(all_max_rates)
    print(f"Average maximum rate of change across all periods: {avg_max_rate:.2f} units/frame")
    
    # Add a summary section with all periods on one plot
    plt.figure(figsize=(14, 8))
    
    # Plot all periods with different colors
    for i, (start_idx, end_idx) in enumerate(active_periods):
        period_timestamps = timestamps[start_idx:end_idx+1]
        period_angle_cmds = angle_cmds[start_idx:end_idx+1]
        norm_timestamps = [t - period_timestamps[0] for t in period_timestamps]
        
        plt.plot(norm_timestamps, period_angle_cmds, label=f"Period {i+1}")
    
    plt.title("LKAS_ANGLE_CMD Evolution - All Periods (Time Normalized)")
    plt.xlabel("Time (s)")
    plt.ylabel("LKAS_ANGLE_CMD")
    plt.grid(True)
    plt.legend()
    
    plt.tight_layout()
    plt.show()
    
    # Also display the original figure with individual period plots
    fig.tight_layout()
    plt.show()
else:
    print("No active periods found to analyze")

In [None]:
# Analyze all active periods with enhanced metrics
if active_periods:
    # Initialize variables to track overall statistics
    all_max_rates_per_frame = []
    all_max_rates_per_second = []
    all_min_to_max_times = []
    all_min_to_max_rates = []
    
    # Create a figure with subplots for all periods
    num_periods = len(active_periods)
    fig, axes = plt.subplots(num_periods, 3, figsize=(18, 6*num_periods))
    
    # If there's only one period, make axes indexable as a 2D array
    if num_periods == 1:
        axes = np.array([axes]).reshape(1, 3)
    
    # Process each active period
    for i, (start_idx, end_idx) in enumerate(active_periods):
        # Get data for this period
        period_timestamps = timestamps[start_idx:end_idx+1]
        period_angle_cmds = angle_cmds[start_idx:end_idx+1]
        
        # Normalize timestamps to start at 0
        norm_timestamps = [t - period_timestamps[0] for t in period_timestamps]
        
        # Calculate rate of change per frame
        angle_cmd_diffs = np.diff(period_angle_cmds)
        max_rate_per_frame = max(abs(angle_cmd_diffs))
        all_max_rates_per_frame.append(max_rate_per_frame)
        
        # Calculate rate of change per second
        time_diffs = np.diff(period_timestamps)
        rates_per_second = [diff/t_diff if t_diff > 0 else 0 for diff, t_diff in zip(angle_cmd_diffs, time_diffs)]
        max_rate_per_second = max(abs(rate) for rate in rates_per_second)
        all_max_rates_per_second.append(max_rate_per_second)
        
        # Find local minima and maxima
        from scipy.signal import find_peaks
        
        # Find local maxima
        max_peaks, _ = find_peaks(period_angle_cmds)
        # Find local minima (by finding maxima of negative values)
        min_peaks, _ = find_peaks([-x for x in period_angle_cmds])
        
        min_to_max_times = []
        min_to_max_rates = []
        
        # For each minimum, find the next maximum and calculate metrics
        for min_idx in min_peaks:
            # Find the next maximum after this minimum
            next_max_indices = [idx for idx in max_peaks if idx > min_idx]
            if next_max_indices:  # If there's a maximum after this minimum
                next_max_idx = next_max_indices[0]
                
                # Time from min to max
                time_diff = period_timestamps[next_max_idx] - period_timestamps[min_idx]
                
                # Change in angle
                angle_diff = period_angle_cmds[next_max_idx] - period_angle_cmds[min_idx]
                
                # Rate of change
                rate = angle_diff / time_diff if time_diff > 0 else 0
                
                min_to_max_times.append(time_diff)
                min_to_max_rates.append(rate)
        
        if min_to_max_times:  # If we found min-to-max transitions
            avg_min_to_max_time = sum(min_to_max_times) / len(min_to_max_times)
            avg_min_to_max_rate = sum(min_to_max_rates) / len(min_to_max_rates)
            max_min_to_max_rate = max(abs(rate) for rate in min_to_max_rates)
            
            all_min_to_max_times.extend(min_to_max_times)
            all_min_to_max_rates.extend(min_to_max_rates)
        else:
            avg_min_to_max_time = 0
            avg_min_to_max_rate = 0
            max_min_to_max_rate = 0
        
        # Print statistics for this period
        print(f"Analysis of Period {i+1}:")
        print(f"  Duration: {period_timestamps[-1] - period_timestamps[0]:.2f}s")
        print(f"  Points: {len(period_angle_cmds)}")
        print(f"  Angle range: {min(period_angle_cmds):.2f} to {max(period_angle_cmds):.2f}")
        print(f"  Max absolute rate of change: {max_rate_per_frame:.2f} degrees/frame")
        print(f"  Max absolute rate of change: {max_rate_per_second:.2f} degrees/second")
        if min_to_max_times:
            print(f"  Avg time from min to max: {avg_min_to_max_time:.2f}s")
            print(f"  Avg rate from min to max: {avg_min_to_max_rate:.2f} degrees/second")
            print(f"  Max rate from min to max: {max_min_to_max_rate:.2f} degrees/second")
        print()
        
        # Plot angle commands
        axes[i, 0].plot(norm_timestamps, period_angle_cmds)
        # Add markers for mins and maxs
        for min_idx in min_peaks:
            axes[i, 0].plot(norm_timestamps[min_idx], period_angle_cmds[min_idx], 'rv', markersize=8)
        for max_idx in max_peaks:
            axes[i, 0].plot(norm_timestamps[max_idx], period_angle_cmds[max_idx], 'g^', markersize=8)
            
        axes[i, 0].set_title(f"LKAS_ANGLE_CMD Evolution - Period {i+1}")
        axes[i, 0].set_xlabel("Time (s)")
        axes[i, 0].set_ylabel("LKAS_ANGLE_CMD (degrees)")
        axes[i, 0].grid(True)
        
        # Plot rate of change per frame
        axes[i, 1].plot(norm_timestamps[1:], angle_cmd_diffs)
        axes[i, 1].set_title(f"Rate of Change - Period {i+1}")
        axes[i, 1].set_xlabel("Time (s)")
        axes[i, 1].set_ylabel("Delta (degrees/frame)")
        axes[i, 1].grid(True)
        
        # Plot rate of change per second
        axes[i, 2].plot([norm_timestamps[j] for j in range(1, len(norm_timestamps))], rates_per_second)
        axes[i, 2].set_title(f"Rate of Change - Period {i+1}")
        axes[i, 2].set_xlabel("Time (s)")
        axes[i, 2].set_ylabel("Delta (degrees/second)")
        axes[i, 2].grid(True)
    
    # Calculate overall statistics
    avg_max_rate_per_frame = sum(all_max_rates_per_frame) / len(all_max_rates_per_frame)
    avg_max_rate_per_second = sum(all_max_rates_per_second) / len(all_max_rates_per_second)
    
    print("\nOverall Statistics:")
    print(f"Average maximum rate of change: {avg_max_rate_per_frame:.2f} degrees/frame")
    print(f"Average maximum rate of change: {avg_max_rate_per_second:.2f} degrees/second")
    
    if all_min_to_max_times:
        avg_min_to_max_time = sum(all_min_to_max_times) / len(all_min_to_max_times)
        avg_min_to_max_rate = sum(all_min_to_max_rates) / len(all_min_to_max_rates)
        max_min_to_max_rate = max(abs(rate) for rate in all_min_to_max_rates)
        
        print(f"Average time from min to max: {avg_min_to_max_time:.2f}s")
        print(f"Average rate from min to max: {avg_min_to_max_rate:.2f} degrees/second")
        print(f"Maximum rate from min to max: {max_min_to_max_rate:.2f} degrees/second")
    
    # Add a summary section with all periods on one plot
    plt.figure(figsize=(14, 8))
    
    # Plot all periods with different colors
    for i, (start_idx, end_idx) in enumerate(active_periods):
        period_timestamps = timestamps[start_idx:end_idx+1]
        period_angle_cmds = angle_cmds[start_idx:end_idx+1]
        norm_timestamps = [t - period_timestamps[0] for t in period_timestamps]
        
        plt.plot(norm_timestamps, period_angle_cmds, label=f"Period {i+1}")
    
    plt.title("LKAS_ANGLE_CMD Evolution - All Periods (Time Normalized)")
    plt.xlabel("Time (s)")
    plt.ylabel("LKAS_ANGLE_CMD (degrees)")
    plt.grid(True)
    plt.legend()
    
    # Create a histogram of min-to-max times
    if all_min_to_max_times:
        plt.figure(figsize=(10, 6))
        plt.hist(all_min_to_max_times, bins=15)
        plt.title("Histogram of Min-to-Max Transition Times")
        plt.xlabel("Time (s)")
        plt.ylabel("Frequency")
        plt.grid(True)
        
        # Create a histogram of min-to-max rates
        plt.figure(figsize=(10, 6))
        plt.hist(all_min_to_max_rates, bins=15)
        plt.title("Histogram of Min-to-Max Transition Rates")
        plt.xlabel("Rate (degrees/second)")
        plt.ylabel("Frequency")
        plt.grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # Also display the original figure with individual period plots
    fig.tight_layout()
    plt.show()
else:
    print("No active periods found to analyze")

In [None]:
# torque_ramping.py

import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker # For formatting the secondary x-axis

# --- Ramping Functions (ema_torque_ramp, sigmoid_smootherstep_torque_ramp, linear_torque_ramp) ---
# These functions remain unchanged from the previous version provided in the document.
# For brevity, I'll represent them as comments here, but they are the same as in the document.
# def ema_torque_ramp(...): ...
# def sigmoid_smootherstep_torque_ramp(...): ...
# def linear_torque_ramp(...): ...

def ema_torque_ramp(initial_torque: float, target_torque: float, alpha: float, completion_threshold: float = 0.999) -> list[float]:
    """
    Calculates a torque ramp using Exponential Moving Average (EMA).
    (Implementation as previously provided)
    """
    if not (0 < alpha <= 1):
        raise ValueError("Alpha must be between 0 (exclusive) and 1 (inclusive).")
    if not (0 < completion_threshold < 1):
        raise ValueError("Completion threshold must be between 0 and 1 (exclusive).")

    torque_values = [initial_torque]
    current_ema_torque = initial_torque
    total_change = target_torque - initial_torque
    max_steps = 10000 

    if abs(total_change) < 1e-6:
        return torque_values

    for _ in range(max_steps):
        current_ema_torque = (target_torque * alpha) + (current_ema_torque * (1 - alpha))
        torque_values.append(current_ema_torque)
        current_progress = abs(current_ema_torque - initial_torque)
        required_progress = abs(total_change) * completion_threshold
        if current_progress >= required_progress:
            break
        if abs(current_ema_torque - target_torque) < 1e-6:
            if len(torque_values) > 1 or initial_torque == target_torque:
                break
    
    if abs(torque_values[-1] - target_torque) > 1e-6 :
        threshold_met = (total_change > 0 and torque_values[-1] >= initial_torque + total_change * completion_threshold) or \
                        (total_change < 0 and torque_values[-1] <= initial_torque + total_change * completion_threshold) or \
                        (abs(current_ema_torque - target_torque) < 1e-5) # Check against the true EMA value
        if threshold_met:
             torque_values[-1] = target_torque
    return torque_values

def sigmoid_smootherstep_torque_ramp(initial_torque: float, target_torque: float, duration_steps: int) -> list[float]:
    """
    Calculates a torque ramp using an S-curve (Smootherstep) profile.
    s = 6x⁵ - 15x⁴ + 10x³
    (Implementation as previously provided)
    """
    if duration_steps < 0: 
        raise ValueError("Duration steps must be non-negative.")
    if duration_steps == 0:
        return [initial_torque, target_torque] if initial_torque != target_torque else [initial_torque]

    torque_values = []
    total_torque_change = target_torque - initial_torque
    for i in range(duration_steps + 1):
        x = i / duration_steps
        if x == 0: s = 0.0
        elif x == 1: s = 1.0
        else:
            x_3 = x * x * x
            x_4 = x_3 * x
            x_5 = x_4 * x
            s = 6 * x_5 - 15 * x_4 + 10 * x_3
        current_torque = initial_torque + total_torque_change * s
        torque_values.append(current_torque)
    return torque_values

def linear_torque_ramp(initial_torque: float, target_torque: float, step_per_cycle: float = 1.0) -> list[float]:
    """
    Calculates a torque ramp using a fixed linear step per cycle.
    (Implementation as previously provided)
    """
    if step_per_cycle <= 0:
        raise ValueError("step_per_cycle must be positive.")
    torque_values = [initial_torque]
    current_torque = initial_torque
    if abs(initial_torque - target_torque) < 1e-6:
        return torque_values

    actual_step = step_per_cycle if target_torque > initial_torque else -step_per_cycle
    # Adjusted max_iterations to handle very small steps relative to total_change
    if abs(actual_step) < 1e-9 : # Avoid division by zero or extremely small step
        max_iterations = 20000 # A large fallback
    else:
        max_iterations = int(abs(target_torque - initial_torque) / abs(actual_step)) + 10 


    for _ in range(max_iterations):
        if (actual_step > 0 and current_torque >= target_torque) or \
           (actual_step < 0 and current_torque <= target_torque):
            # Ensure the target is precisely hit if we just reached or passed it
            if torque_values[-1] != target_torque:
                 # This logic can be complex due to floating point.
                 # The break condition itself should handle it, then the post-loop ensures target.
                 pass
            break 
        
        next_torque = current_torque + actual_step
        if (actual_step > 0 and next_torque >= target_torque) or \
           (actual_step < 0 and next_torque <= target_torque):
            current_torque = target_torque
            torque_values.append(current_torque)
            break
        current_torque = next_torque
        torque_values.append(current_torque)

    # Ensure the final value is exactly the target if the ramp intended to complete.
    if abs(current_torque - target_torque) < 1e-6 : # If current_torque is essentially target
        if not torque_values or torque_values[-1] != target_torque:
            if torque_values and abs(torque_values[-1] - target_torque) < abs(actual_step) * 1.1 : # If last value is close
                 torque_values[-1] = target_torque
            else: # If list is empty or last value is far, append target
                 torque_values.append(target_torque)
    elif (actual_step > 0 and current_torque > target_torque) or \
         (actual_step < 0 and current_torque < target_torque) : # If overshot
        if not torque_values or torque_values[-1] != target_torque:
             torque_values[-1] = target_torque # Correct the overshoot to target

    # Remove duplicates at the end if any were formed by the logic
    if len(torque_values) > 1 and torque_values[-1] == torque_values[-2]:
        torque_values.pop()
        
    # If only initial torque is present and it's not the target, add target.
    if len(torque_values) == 1 and initial_torque != target_torque:
        torque_values.append(target_torque)

    return torque_values


def plot_ramps(ramp_data: list[dict], title: str, hz: float = 100.0):
    """
    Plots multiple torque ramps on the same graph, with a secondary x-axis for time in seconds.

    Args:
        ramp_data: A list of dictionaries, where each dictionary contains:
                   'label': The label for the ramp (e.g., "EMA Alpha 0.1").
                   'values': The list of torque values for the ramp.
                   'style': (Optional) matplotlib line style (e.g., '--')
                   'marker': (Optional) matplotlib marker style (e.g., '.')
        title: The title for the plot.
        hz: The frequency of the control loop in Hertz (for time calculation).
    """
    fig, ax1 = plt.subplots(figsize=(14, 8)) 
    
    max_steps = 0
    for ramp_info in ramp_data:
        steps_values = range(len(ramp_info['values']))
        if len(steps_values) > max_steps:
            max_steps = len(steps_values)
        ax1.plot(steps_values, ramp_info['values'], 
                 label=ramp_info['label'], 
                 linestyle=ramp_info.get('style', '-'), 
                 marker=ramp_info.get('marker', None),
                 markersize=ramp_info.get('markersize', 4)) 

    ax1.set_title(title, fontsize=16)
    ax1.set_xlabel(f"Steps (Control Cycles @ {hz}Hz)", fontsize=12)
    ax1.set_ylabel("Torque", fontsize=12)
    ax1.legend(loc='best') 
    ax1.grid(True, which='both', linestyle='--', linewidth=0.5) 

    ax2 = ax1.twiny() 

    ax1_xlim = ax1.get_xlim()
    # Ensure xlim[0] and xlim[1] are not identical before division, can happen if max_steps is 0 or 1
    if ax1_xlim[1] > ax1_xlim[0] and hz > 0:
        ax2.set_xlim(ax1_xlim[0] / hz, ax1_xlim[1] / hz)
    else: # Fallback if limits are problematic or hz is zero
        ax2.set_xlim(ax1_xlim[0], ax1_xlim[1])


    ax2.set_xlabel("Time (seconds)", fontsize=12)
    
    # Format the ticks on the secondary x-axis to show 4 decimal places
    ax2.xaxis.set_major_formatter(ticker.FormatStrFormatter('%.4f')) # Changed from %.2f

    if max_steps > 0 :
        # Ensure xlim is at least 1 step wide for proper display if max_steps is 1 (len is 2)
        current_xlim_max = max_steps -1 if max_steps > 1 else 1 # steps are 0-indexed
        padding = int(current_xlim_max*0.05) if current_xlim_max > 20 else 1
        ax1.set_xlim(0, current_xlim_max + padding ) 
    else: # Handle case with no steps or single point data
        ax1.set_xlim(0,1)

    # Update secondary axis limits again after primary axis might have changed
    if ax1.get_xlim()[1] > ax1.get_xlim()[0] and hz > 0:
         ax2.set_xlim(ax1.get_xlim()[0] / hz, ax1.get_xlim()[1] / hz)
    else:
         ax2.set_xlim(ax1.get_xlim()[0], ax1.get_xlim()[1])


    fig.tight_layout() 
    plt.show()

# --- Example Usage ---
if __name__ == "__main__":
    controller_hz = 100.0 

    initial_tq_up = 50.0
    target_tq_up = 200.0
    initial_tq_down = 200.0
    target_tq_down = 25.0
    linear_step_size = 1.0 

    alphas = {
        "Slow (α=0.05)": 0.05, 
        "Medium (α=0.2)": 0.2, 
        "Fast (α=0.5)": 0.5
    }
    
    durations = {
        "Short (10 steps)": 10,
        "Medium (20 steps)": 20,
        "Long (40 steps)": 40,
        "Longer (60 steps)": 60,
        "Very Long (100 steps)": 100 
    }

    # Ramp Up Data Collection
    ramp_up_plot_data = []
    print(f"\n--- Generating Ramp Up Data ({initial_tq_up} to {target_tq_up}) ---")

    linear_ramp_up = linear_torque_ramp(initial_tq_up, target_tq_up, linear_step_size)
    ramp_up_plot_data.append({"label": f"Linear (+{linear_step_size}/step)", "values": linear_ramp_up, "style": ":", "marker":"."})
    # Number of steps is len(ramp_values) - 1 (since ramp_values includes the initial point)
    print(f"Linear Ramp Up: {max(0, len(linear_ramp_up)-1)} steps. First 5: {[round(t,1) for t in linear_ramp_up[:5]]}..., Last 5: {[round(t,1) for t in linear_ramp_up[-5:]]}")

    for label_suffix, alpha_val in alphas.items():
        ramp = ema_torque_ramp(initial_tq_up, target_tq_up, alpha_val)
        ramp_up_plot_data.append({"label": f"EMA {label_suffix}", "values": ramp})
        print(f"EMA {label_suffix}: {max(0, len(ramp)-1)} steps. First 5: {[round(t,1) for t in ramp[:5]]}..., Last 5: {[round(t,1) for t in ramp[-5:]]}")

    for label_suffix, dur_val in durations.items():
        ramp = sigmoid_smootherstep_torque_ramp(initial_tq_up, target_tq_up, dur_val)
        ramp_up_plot_data.append({"label": f"Sigmoid {label_suffix}", "values": ramp, "style": "--"})
        print(f"Sigmoid {label_suffix}: {max(0, len(ramp)-1)} steps. First 5: {[round(t,1) for t in ramp[:5]]}..., Last 5: {[round(t,1) for t in ramp[-5:]]}")
    
    # Ramp Down Data Collection
    ramp_down_plot_data = []
    print(f"\n--- Generating Ramp Down Data ({initial_tq_down} to {target_tq_down}) ---")

    linear_ramp_down = linear_torque_ramp(initial_tq_down, target_tq_down, linear_step_size)
    ramp_down_plot_data.append({"label": f"Linear (-{linear_step_size}/step)", "values": linear_ramp_down, "style": ":", "marker":"."})
    print(f"Linear Ramp Down: {max(0, len(linear_ramp_down)-1)} steps. First 5: {[round(t,1) for t in linear_ramp_down[:5]]}..., Last 5: {[round(t,1) for t in linear_ramp_down[-5:]]}")

    for label_suffix, alpha_val in alphas.items():
        ramp = ema_torque_ramp(initial_tq_down, target_tq_down, alpha_val)
        ramp_down_plot_data.append({"label": f"EMA {label_suffix}", "values": ramp})
        print(f"EMA {label_suffix}: {max(0, len(ramp)-1)} steps. First 5: {[round(t,1) for t in ramp[:5]]}..., Last 5: {[round(t,1) for t in ramp[-5:]]}")

    for label_suffix, dur_val in durations.items():
        ramp = sigmoid_smootherstep_torque_ramp(initial_tq_down, target_tq_down, dur_val)
        ramp_down_plot_data.append({"label": f"Sigmoid {label_suffix}", "values": ramp, "style": "--"})
        print(f"Sigmoid {label_suffix}: {max(0, len(ramp)-1)} steps. First 5: {[round(t,1) for t in ramp[:5]]}..., Last 5: {[round(t,1) for t in ramp[-5:]]}")

    # Plotting
    plot_ramps(ramp_up_plot_data, f"Torque Ramp Up: {initial_tq_up} to {target_tq_up}", hz=controller_hz)
    plot_ramps(ramp_down_plot_data, f"Torque Ramp Down: {initial_tq_down} to {target_tq_down}", hz=controller_hz)


In [None]:
# torque_ramping.py

import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker # For formatting the secondary x-axis

class EmaRamp:
    """
    Manages EMA torque ramping, calculating one step at a time.
    """
    def __init__(self, initial_torque: float, target_torque: float, alpha: float, 
                 completion_threshold: float = 0.999): # Completion threshold is for internal 'done' state
        if not (0 < alpha <= 1):
            raise ValueError("Alpha must be between 0 (exclusive) and 1 (inclusive).")
        
        self.current_torque = initial_torque
        self.target_torque = target_torque
        self.alpha = alpha
        
        # Internal state to know if the ramp considers itself "done"
        # This doesn't stop step() from being called, but stops further changes.
        self._is_effectively_done = False 
        if abs(self.current_torque - self.target_torque) < 1e-7:
            self._is_effectively_done = True

        # For monitoring progress if needed, not strictly used by step() to halt
        self._initial_torque_for_threshold = initial_torque
        self._total_change_for_threshold = target_torque - initial_torque
        self._completion_threshold = completion_threshold


    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self._is_effectively_done:
            return self.current_torque

        # Calculate next EMA value
        next_torque = (self.target_torque * self.alpha) + (self.current_torque * (1 - self.alpha))
        self.current_torque = next_torque
        
        # Check if the ramp is now effectively complete
        if abs(self.current_torque - self.target_torque) < 1e-7:
            self.current_torque = self.target_torque # Snap to target
            self._is_effectively_done = True
        elif abs(self._total_change_for_threshold) > 1e-7: # Check threshold if there's change
            current_progress_fraction = abs(self.current_torque - self._initial_torque_for_threshold) / abs(self._total_change_for_threshold)
            if current_progress_fraction >= self._completion_threshold:
                # If threshold met, snap to target for subsequent calls if not already there
                self.current_torque = self.target_torque 
                self._is_effectively_done = True
        
        return self.current_torque

class SigmoidRamp:
    """
    Manages Sigmoid (Smootherstep) torque ramping, calculating one step at a time.
    Formula: s = 6x⁵ - 15x⁴ + 10x³
    """
    def __init__(self, initial_torque: float, target_torque: float, duration_steps: int):
        if duration_steps < 0: 
            raise ValueError("Duration steps must be non-negative.")
        
        self.initial_torque = initial_torque
        self.target_torque = target_torque
        self.duration_steps = duration_steps # This is the number of steps to reach the target
        self.total_torque_change = target_torque - initial_torque
        
        self.current_simulation_step = 0 # Tracks how many times step() has been called effectively
        self.current_torque = initial_torque

        if self.duration_steps == 0: # If duration is 0, it's immediately at target or initial
            self.current_torque = self.target_torque


    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self.duration_steps == 0: # Already at target if duration is 0
            return self.current_torque

        # current_ramp_step is the step *within the defined sigmoid duration*
        current_ramp_step = self.current_simulation_step 
        self.current_simulation_step +=1 # Increment for next call

        if current_ramp_step >= self.duration_steps:
            # If we've completed the defined duration, output target torque
            self.current_torque = self.target_torque
            return self.current_torque

        x = current_ramp_step / self.duration_steps # Normalized time for the ramp segment
        s = 0.0
        if x == 0: s = 0.0
        elif x >= 1.0: s = 1.0 # Should be caught by current_ramp_step >= duration_steps
        else:
            x_3 = x * x * x 
            x_4 = x_3 * x   
            x_5 = x_4 * x   
            s = 6 * x_5 - 15 * x_4 + 10 * x_3
        
        self.current_torque = self.initial_torque + self.total_torque_change * s
        
        # If this step is the last one in the defined duration, ensure it's exactly target
        if current_ramp_step == self.duration_steps -1 : # Last calculation before it's considered "done"
             # The next call will have current_ramp_step == self.duration_steps
             # So, this calculation is for x = (D-1)/D. The one for x=1 is when current_ramp_step == D
             pass # The logic for current_ramp_step >= self.duration_steps will handle snapping to target.
        
        # Ensure if we are at the exact duration_steps point, torque is target
        if current_ramp_step == self.duration_steps :
            self.current_torque = self.target_torque


        return self.current_torque

class LinearRamp:
    """
    Manages Linear torque ramping, calculating one step at a time.
    """
    def __init__(self, initial_torque: float, target_torque: float, step_per_cycle: float = 1.0):
        if step_per_cycle <= 0:
            raise ValueError("step_per_cycle must be positive.")

        self.current_torque = initial_torque
        self.target_torque = target_torque
        
        self.actual_step_value = 0 # Default to 0 if no change needed
        if abs(initial_torque - target_torque) >= 1e-7: # Only set step if change is needed
            self.actual_step_value = step_per_cycle if target_torque > initial_torque else -step_per_cycle
        
        self._is_done = abs(initial_torque - target_torque) < 1e-7


    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self._is_done or self.actual_step_value == 0:
            return self.current_torque

        next_torque_candidate = self.current_torque + self.actual_step_value

        if (self.actual_step_value > 0 and next_torque_candidate >= self.target_torque) or \
           (self.actual_step_value < 0 and next_torque_candidate <= self.target_torque):
            self.current_torque = self.target_torque # Snap to target
            self._is_done = True
        else:
            self.current_torque = next_torque_candidate
        
        return self.current_torque


def plot_ramps(ramp_data: list[dict], title: str, hz: float = 100.0):
    """
    Plots multiple torque ramps on the same graph, with a secondary x-axis for time in seconds.
    (Implementation remains largely the same)
    """
    fig, ax1 = plt.subplots(figsize=(14, 8)) 
    max_len = 0 
    for ramp_info in ramp_data:
        num_points = len(ramp_info['values'])
        if num_points > max_len: max_len = num_points
        steps_values = range(num_points) 
        ax1.plot(steps_values, ramp_info['values'], 
                 label=ramp_info['label'], 
                 linestyle=ramp_info.get('style', '-'), 
                 marker=ramp_info.get('marker', None),
                 markersize=ramp_info.get('markersize', 4)) 
    ax1.set_title(title, fontsize=16)
    ax1.set_xlabel(f"Simulation Cycles (Steps @ {hz}Hz)", fontsize=12) # Changed label
    ax1.set_ylabel("Torque", fontsize=12)
    ax1.legend(loc='best') 
    ax1.grid(True, which='both', linestyle='--', linewidth=0.5) 
    ax2 = ax1.twiny() 
    ax1_xlim = ax1.get_xlim()
    if ax1_xlim[1] > ax1_xlim[0] and hz > 0:
        ax2.set_xlim(ax1_xlim[0] / hz, ax1_xlim[1] / hz)
    else: 
        ax2.set_xlim(ax1_xlim[0], ax1_xlim[1])
    ax2.set_xlabel("Time (seconds)", fontsize=12)
    ax2.xaxis.set_major_formatter(ticker.FormatStrFormatter('%.4f'))
    if max_len > 0:
        xlim_max_steps = max_len -1 
        padding = int(xlim_max_steps * 0.05) if xlim_max_steps > 20 else 1
        ax1.set_xlim(-0.5, xlim_max_steps + padding + 0.5) 
    else: 
        ax1.set_xlim(-0.5, 1.5)
    new_ax1_xlim = ax1.get_xlim()
    if new_ax1_xlim[1] > new_ax1_xlim[0] and hz > 0:
         ax2.set_xlim(new_ax1_xlim[0] / hz, new_ax1_xlim[1] / hz)
    else:
         ax2.set_xlim(new_ax1_xlim[0], new_ax1_xlim[1])
    fig.tight_layout() 
    plt.show()

# --- Example Usage ---
if __name__ == "__main__":
    controller_hz = 100.0 
    simulation_cycles = 160 # Arbitrary number of cycles to run the simulation

    initial_tq_up = 50.0
    target_tq_up = 200.0
    initial_tq_down = 200.0
    target_tq_down = 25.0
    linear_step_size = 1.0 

    alphas_config = {
        "Slow (α=0.05)": 0.05, 
        "Medium (α=0.2)": 0.2, 
        "Fast (α=0.5)": 0.5
    }
    
    # For Sigmoid, duration_steps is the number of steps *to complete* the ramp.
    # The ramp will output target_torque after these many steps.
    durations_config = { 
        "Short (10 steps completion)": 10, 
        "Medium (20 steps completion)": 20,
        "Long (40 steps completion)": 40 
    }

    # --- Ramp Up Simulation ---
    print(f"\n--- Simulating Ramp Up ({initial_tq_up} to {target_tq_up}) for {simulation_cycles} cycles ---")
    ramp_up_plot_data = []

    # Linear Ramp Up
    linear_ramper_up = LinearRamp(initial_tq_up, target_tq_up, linear_step_size)
    linear_ramp_up_values = [initial_tq_up] # Start with initial value
    for _ in range(simulation_cycles):
        linear_ramp_up_values.append(linear_ramper_up.step())
    ramp_up_plot_data.append({"label": f"Linear (+{linear_step_size}/step)", "values": linear_ramp_up_values, "style": ":", "marker":"."})
    print(f"Linear Ramp Up after {simulation_cycles} cycles: final torque {linear_ramp_up_values[-1]:.1f}")

    # EMA Ramps Up
    for label_suffix, alpha_val in alphas_config.items():
        ema_ramper_up = EmaRamp(initial_tq_up, target_tq_up, alpha_val)
        ema_ramp_up_values = [initial_tq_up]
        for _ in range(simulation_cycles):
            ema_ramp_up_values.append(ema_ramper_up.step())
        ramp_up_plot_data.append({"label": f"EMA {label_suffix}", "values": ema_ramp_up_values})
        print(f"EMA {label_suffix} after {simulation_cycles} cycles: final torque {ema_ramp_up_values[-1]:.1f}")

    # Sigmoid Ramps Up
    for label_suffix, dur_val in durations_config.items():
        sigmoid_ramper_up = SigmoidRamp(initial_tq_up, target_tq_up, dur_val)
        sigmoid_ramp_up_values = [initial_tq_up]
        for _ in range(simulation_cycles):
            sigmoid_ramp_up_values.append(sigmoid_ramper_up.step())
        ramp_up_plot_data.append({"label": f"Sigmoid {label_suffix}", "values": sigmoid_ramp_up_values, "style": "--"})
        print(f"Sigmoid {label_suffix} after {simulation_cycles} cycles: final torque {sigmoid_ramp_up_values[-1]:.1f}")
    
    plot_ramps(ramp_up_plot_data, f"Torque Ramp Up: {initial_tq_up} to {target_tq_up} (Simulated for {simulation_cycles} cycles)", hz=controller_hz)

    # --- Ramp Down Simulation ---
    print(f"\n--- Simulating Ramp Down ({initial_tq_down} to {target_tq_down}) for {simulation_cycles} cycles ---")
    ramp_down_plot_data = []

    # Linear Ramp Down
    linear_ramper_down = LinearRamp(initial_tq_down, target_tq_down, linear_step_size)
    linear_ramp_down_values = [initial_tq_down]
    for _ in range(simulation_cycles):
        linear_ramp_down_values.append(linear_ramper_down.step())
    ramp_down_plot_data.append({"label": f"Linear (-{linear_step_size}/step)", "values": linear_ramp_down_values, "style": ":", "marker":"."})
    print(f"Linear Ramp Down after {simulation_cycles} cycles: final torque {linear_ramp_down_values[-1]:.1f}")
    
    # EMA Ramps Down
    for label_suffix, alpha_val in alphas_config.items():
        ema_ramper_down = EmaRamp(initial_tq_down, target_tq_down, alpha_val)
        ema_ramp_down_values = [initial_tq_down]
        for _ in range(simulation_cycles):
            ema_ramp_down_values.append(ema_ramper_down.step())
        ramp_down_plot_data.append({"label": f"EMA {label_suffix}", "values": ema_ramp_down_values})
        print(f"EMA {label_suffix} after {simulation_cycles} cycles: final torque {ema_ramp_down_values[-1]:.1f}")

    # Sigmoid Ramps Down
    for label_suffix, dur_val in durations_config.items():
        sigmoid_ramper_down = SigmoidRamp(initial_tq_down, target_tq_down, dur_val)
        sigmoid_ramp_down_values = [initial_tq_down]
        for _ in range(simulation_cycles):
            sigmoid_ramp_down_values.append(sigmoid_ramper_down.step())
        ramp_down_plot_data.append({"label": f"Sigmoid {label_suffix}", "values": sigmoid_ramp_down_values, "style": "--"})
        print(f"Sigmoid {label_suffix} after {simulation_cycles} cycles: final torque {sigmoid_ramp_down_values[-1]:.1f}")

    plot_ramps(ramp_down_plot_data, f"Torque Ramp Down: {initial_tq_down} to {target_tq_down} (Simulated for {simulation_cycles} cycles)", hz=controller_hz)



In [None]:
# torque_ramping.py

import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker # For formatting the secondary x-axis

class EmaRamp:
    """
    Manages EMA torque ramping, calculating one step at a time.
    Can handle dynamic target torque changes.
    """
    def __init__(self, initial_torque: float, target_torque: float, alpha: float, 
                 completion_threshold: float = 0.999):
        if not (0 < alpha <= 1):
            raise ValueError("Alpha must be between 0 (exclusive) and 1 (inclusive).")
        
        self.current_torque = initial_torque
        self.target_torque = target_torque
        self.alpha = alpha
        
        self._is_effectively_done = False 
        self._initial_torque_for_threshold = initial_torque # Used for current segment
        self._total_change_for_threshold = target_torque - initial_torque # For current segment
        self._completion_threshold = completion_threshold
        self._update_done_status()

    def _update_done_status(self):
        """Checks if the current torque is at the target torque."""
        self._is_effectively_done = abs(self.current_torque - self.target_torque) < 1e-7
        if not self._is_effectively_done:
            # Recalculate threshold parameters for the new segment if target changed
            self._initial_torque_for_threshold = self.current_torque 
            self._total_change_for_threshold = self.target_torque - self.current_torque


    def set_target_torque(self, new_target_torque: float):
        """Updates the target torque and resets ramp state if necessary."""
        if abs(self.target_torque - new_target_torque) > 1e-7: # If target actually changed
            self.target_torque = new_target_torque
            self._update_done_status() # Reset done status and threshold params

    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self._is_effectively_done:
            return self.current_torque

        next_torque = (self.target_torque * self.alpha) + (self.current_torque * (1 - self.alpha))
        self.current_torque = next_torque
        
        if abs(self.current_torque - self.target_torque) < 1e-7:
            self.current_torque = self.target_torque 
            self._is_effectively_done = True
        elif abs(self._total_change_for_threshold) > 1e-7:
            # Check progress only if there's a change to be made for current segment
            current_progress_fraction = abs(self.current_torque - self._initial_torque_for_threshold) / abs(self._total_change_for_threshold)
            if current_progress_fraction >= self._completion_threshold:
                self.current_torque = self.target_torque 
                self._is_effectively_done = True
        elif abs(self._total_change_for_threshold) <= 1e-7: # No change needed for this segment
             self._is_effectively_done = True


        return self.current_torque

class SigmoidRamp:
    """
    Manages Sigmoid (Smootherstep) torque ramping, calculating one step at a time.
    Can handle dynamic target torque changes by starting a new S-curve segment.
    Formula: s = 6x⁵ - 15x⁴ + 10x³
    """
    def __init__(self, initial_torque: float, target_torque: float, duration_steps: int):
        if duration_steps < 0: 
            raise ValueError("Duration steps must be non-negative (0 means immediate).")
        
        self.initial_segment_torque = initial_torque # Torque at the start of the current S-curve segment
        self.target_torque = target_torque
        self.original_duration_steps = duration_steps # Fixed duration for any segment
        
        self.current_torque = initial_torque
        self.current_segment_step = 0 # Steps taken *within the current S-curve segment*
        self._recalculate_segment_parameters()

        if self.original_duration_steps == 0:
            self.current_torque = self.target_torque

    def _recalculate_segment_parameters(self):
        """Recalculates parameters for the current S-curve segment."""
        self.current_segment_total_change = self.target_torque - self.initial_segment_torque

    def set_target_torque(self, new_target_torque: float):
        """Updates the target torque. Starts a new S-curve segment from current torque."""
        if abs(self.target_torque - new_target_torque) > 1e-7:
            self.target_torque = new_target_torque
            self.initial_segment_torque = self.current_torque # New segment starts from current torque
            self.current_segment_step = 0 # Reset step count for the new segment
            self._recalculate_segment_parameters()
            if self.original_duration_steps == 0: # If duration is 0, snap to new target
                self.current_torque = self.target_torque


    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self.original_duration_steps == 0:
            # self.current_torque is already set to target_torque in init or set_target_torque
            return self.current_torque

        if self.current_segment_step >= self.original_duration_steps:
            # If current segment duration is complete, output target torque of this segment
            self.current_torque = self.target_torque 
            # We don't increment current_segment_step beyond duration_steps here,
            # it will just keep returning target_torque.
            return self.current_torque

        # Calculate S-curve factor for the current step in the current segment
        # Note: x should go from 0 to 1 over original_duration_steps.
        # If original_duration_steps is 1, x will be 0/1 then 1/1.
        x = 0.0
        if self.original_duration_steps > 0 : # Avoid division by zero if duration is 0 (though handled above)
             x = self.current_segment_step / self.original_duration_steps
        
        s = 0.0
        if x <= 0: s = 0.0 # Handles x=0
        elif x >= 1.0: s = 1.0 # Handles x=1
        else:
            x_3 = x * x * x 
            x_4 = x_3 * x   
            x_5 = x_4 * x   
            s = 6 * x_5 - 15 * x_4 + 10 * x_3
        
        self.current_torque = self.initial_segment_torque + self.current_segment_total_change * s
        self.current_segment_step += 1 # Increment for the next call
        
        # If this was the last step to calculate for the segment, ensure it's precisely the target
        if self.current_segment_step == self.original_duration_steps:
             self.current_torque = self.target_torque


        return self.current_torque

class LinearRamp:
    """
    Manages Linear torque ramping, calculating one step at a time.
    Can handle dynamic target torque changes.
    """
    def __init__(self, initial_torque: float, target_torque: float, step_per_cycle: float = 1.0):
        if step_per_cycle <= 0:
            raise ValueError("step_per_cycle must be positive.")

        self.current_torque = initial_torque
        self.target_torque = target_torque
        self.step_per_cycle_magnitude = step_per_cycle # Store the magnitude
        
        self._actual_step_value = 0
        self._is_done = False
        self._update_step_direction_and_done_status()

    def _update_step_direction_and_done_status(self):
        """Updates step direction based on current and target torque, and done status."""
        if abs(self.current_torque - self.target_torque) < 1e-7:
            self._actual_step_value = 0
            self._is_done = True
        else:
            self._actual_step_value = self.step_per_cycle_magnitude if self.target_torque > self.current_torque else -self.step_per_cycle_magnitude
            self._is_done = False

    def set_target_torque(self, new_target_torque: float):
        """Updates the target torque and re-evaluates step direction and done status."""
        if abs(self.target_torque - new_target_torque) > 1e-7:
            self.target_torque = new_target_torque
            self._update_step_direction_and_done_status()

    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self._is_done or self._actual_step_value == 0:
            return self.current_torque

        next_torque_candidate = self.current_torque + self._actual_step_value

        if (self._actual_step_value > 0 and next_torque_candidate >= self.target_torque) or \
           (self._actual_step_value < 0 and next_torque_candidate <= self.target_torque):
            self.current_torque = self.target_torque 
            self._is_done = True # Mark as done for this target
            self._actual_step_value = 0 # Stop stepping until target changes again
        else:
            self.current_torque = next_torque_candidate
        
        return self.current_torque


def plot_ramps(ramp_data: list[dict], title: str, hz: float = 100.0, events: list[dict] = None):
    """
    Plots multiple torque ramps on the same graph, with a secondary x-axis for time in seconds.
    Can also plot vertical lines for events like target changes.
    """
    fig, ax1 = plt.subplots(figsize=(14, 8)) 
    max_len = 0 
    for ramp_info in ramp_data:
        num_points = len(ramp_info['values'])
        if num_points > max_len: max_len = num_points
        steps_values = range(num_points) 
        ax1.plot(steps_values, ramp_info['values'], 
                 label=ramp_info['label'], 
                 linestyle=ramp_info.get('style', '-'), 
                 marker=ramp_info.get('marker', None),
                 markersize=ramp_info.get('markersize', 4)) 

    if events:
        for event in events:
            ax1.axvline(x=event['cycle'], color=event.get('color', 'r'), linestyle=event.get('linestyle', '--'), label=event['label'])


    ax1.set_title(title, fontsize=16)
    ax1.set_xlabel(f"Simulation Cycles (Steps @ {hz}Hz)", fontsize=12)
    ax1.set_ylabel("Torque", fontsize=12)
    ax1.legend(loc='best') 
    ax1.grid(True, which='both', linestyle='--', linewidth=0.5) 
    ax2 = ax1.twiny() 
    ax1_xlim = ax1.get_xlim()
    if ax1_xlim[1] > ax1_xlim[0] and hz > 0:
        ax2.set_xlim(ax1_xlim[0] / hz, ax1_xlim[1] / hz)
    else: 
        ax2.set_xlim(ax1_xlim[0], ax1_xlim[1])
    ax2.set_xlabel("Time (seconds)", fontsize=12)
    ax2.xaxis.set_major_formatter(ticker.FormatStrFormatter('%.4f'))
    if max_len > 0:
        xlim_max_steps = max_len -1 
        padding = int(xlim_max_steps * 0.05) if xlim_max_steps > 20 else 1
        ax1.set_xlim(-0.5, xlim_max_steps + padding + 0.5) 
    else: 
        ax1.set_xlim(-0.5, 1.5)
    new_ax1_xlim = ax1.get_xlim()
    if new_ax1_xlim[1] > new_ax1_xlim[0] and hz > 0:
         ax2.set_xlim(new_ax1_xlim[0] / hz, new_ax1_xlim[1] / hz)
    else:
         ax2.set_xlim(new_ax1_xlim[0], new_ax1_xlim[1])
    fig.tight_layout() 
    plt.show()

# --- Example Usage ---
if __name__ == "__main__":
    controller_hz = 100.0 
    simulation_cycles = 200 # Increased cycles to see effect of change
    change_target_cycle = 50 # Cycle at which target torque changes

    initial_tq_up = 50.0
    target_tq_up_initial = 200.0
    target_tq_up_new = 120.0 # New target for ramp up scenario

    initial_tq_down = 200.0
    target_tq_down_initial = 50.0
    target_tq_down_new = 65.0 # New target for ramp down scenario
    
    linear_step_size = 1.0 

    alphas_config = {
        #"Slow (α=0.05)": 0.05, 
        "Medium (α=0.2)": 0.2, 
        #"Fast (α=0.5)": 0.5
    }
    
    durations_config = { 
        "Short (10 steps completion)": 10, 
        "Medium (25 steps completion)": 25,
        "Long (40 steps completion)": 40 
    }

    # Store all rampers for dynamic target change
    all_rampers_up = []
    all_rampers_down = []

    # --- Ramp Up Simulation ---
    print(f"\n--- Simulating Ramp Up ({initial_tq_up} to {target_tq_up_initial}, then to {target_tq_up_new}) for {simulation_cycles} cycles ---")
    ramp_up_plot_data = []

    # Linear Ramp Up
    linear_ramper_up = LinearRamp(initial_tq_up, target_tq_up_initial, linear_step_size)
    all_rampers_up.append(linear_ramper_up)
    linear_ramp_up_values = [initial_tq_up] 
    
    # EMA Ramps Up
    ema_rampers_up_dict = {}
    for label_suffix, alpha_val in alphas_config.items():
        ramper = EmaRamp(initial_tq_up, target_tq_up_initial, alpha_val)
        ema_rampers_up_dict[label_suffix] = {"ramper": ramper, "values": [initial_tq_up]}
        all_rampers_up.append(ramper)

    # Sigmoid Ramps Up
    sigmoid_rampers_up_dict = {}
    for label_suffix, dur_val in durations_config.items():
        ramper = SigmoidRamp(initial_tq_up, target_tq_up_initial, dur_val)
        sigmoid_rampers_up_dict[label_suffix] = {"ramper": ramper, "values": [initial_tq_up]}
        all_rampers_up.append(ramper)

    # Main simulation loop for RAMP UP
    for cycle in range(simulation_cycles):
        if cycle == change_target_cycle:
            print(f"Cycle {cycle}: Changing ramp UP target to {target_tq_up_new}")
            for ramper in all_rampers_up:
                ramper.set_target_torque(target_tq_up_new)
        
        linear_ramp_up_values.append(linear_ramper_up.step())
        for config in ema_rampers_up_dict.values():
            config["values"].append(config["ramper"].step())
        for config in sigmoid_rampers_up_dict.values():
            config["values"].append(config["ramper"].step())

    ramp_up_plot_data.append({"label": f"Linear (+{linear_step_size}/step)", "values": linear_ramp_up_values, "style": ":", "marker":"."})
    print(f"Linear Ramp Up after {simulation_cycles} cycles: final torque {linear_ramp_up_values[-1]:.1f}")
    for label_suffix, config in ema_rampers_up_dict.items():
        ramp_up_plot_data.append({"label": f"EMA {label_suffix}", "values": config["values"]})
        print(f"EMA {label_suffix} after {simulation_cycles} cycles: final torque {config['values'][-1]:.1f}")
    for label_suffix, config in sigmoid_rampers_up_dict.items():
        ramp_up_plot_data.append({"label": f"Sigmoid {label_suffix}", "values": config["values"], "style": "--"})
        print(f"Sigmoid {label_suffix} after {simulation_cycles} cycles: final torque {config['values'][-1]:.1f}")
    
    plot_events_up = [{'cycle': change_target_cycle, 'label': f'Target -> {target_tq_up_new}'}]
    plot_ramps(ramp_up_plot_data, f"Torque Ramp Up with Dynamic Target (Simulated for {simulation_cycles} cycles)", hz=controller_hz, events=plot_events_up)


    # --- Ramp Down Simulation ---
    print(f"\n--- Simulating Ramp Down ({initial_tq_down} to {target_tq_down_initial}, then to {target_tq_down_new}) for {simulation_cycles} cycles ---")
    ramp_down_plot_data = []

    # Linear Ramp Down
    linear_ramper_down = LinearRamp(initial_tq_down, target_tq_down_initial, linear_step_size)
    all_rampers_down.append(linear_ramper_down)
    linear_ramp_down_values = [initial_tq_down]

    # EMA Ramps Down
    ema_rampers_down_dict = {}
    for label_suffix, alpha_val in alphas_config.items():
        ramper = EmaRamp(initial_tq_down, target_tq_down_initial, alpha_val)
        ema_rampers_down_dict[label_suffix] = {"ramper": ramper, "values": [initial_tq_down]}
        all_rampers_down.append(ramper)

    # Sigmoid Ramps Down
    sigmoid_rampers_down_dict = {}
    for label_suffix, dur_val in durations_config.items():
        ramper = SigmoidRamp(initial_tq_down, target_tq_down_initial, dur_val)
        sigmoid_rampers_down_dict[label_suffix] = {"ramper": ramper, "values": [initial_tq_down]}
        all_rampers_down.append(ramper)

    # Main simulation loop for RAMP DOWN
    for cycle in range(simulation_cycles):
        if cycle == change_target_cycle:
            print(f"Cycle {cycle}: Changing ramp DOWN target to {target_tq_down_new}")
            for ramper in all_rampers_down:
                ramper.set_target_torque(target_tq_down_new)

        linear_ramp_down_values.append(linear_ramper_down.step())
        for config in ema_rampers_down_dict.values():
            config["values"].append(config["ramper"].step())
        for config in sigmoid_rampers_down_dict.values():
            config["values"].append(config["ramper"].step())

    ramp_down_plot_data.append({"label": f"Linear (-{linear_step_size}/step)", "values": linear_ramp_down_values, "style": ":", "marker":"."})
    print(f"Linear Ramp Down after {simulation_cycles} cycles: final torque {linear_ramp_down_values[-1]:.1f}")
    for label_suffix, config in ema_rampers_down_dict.items():
        ramp_down_plot_data.append({"label": f"EMA {label_suffix}", "values": config["values"]})
        print(f"EMA {label_suffix} after {simulation_cycles} cycles: final torque {config['values'][-1]:.1f}")
    for label_suffix, config in sigmoid_rampers_down_dict.items():
        ramp_down_plot_data.append({"label": f"Sigmoid {label_suffix}", "values": config["values"], "style": "--"})
        print(f"Sigmoid {label_suffix} after {simulation_cycles} cycles: final torque {config['values'][-1]:.1f}")

    plot_events_down = [{'cycle': change_target_cycle, 'label': f'Target -> {target_tq_down_new}'}]
    plot_ramps(ramp_down_plot_data, f"Torque Ramp Down with Dynamic Target (Simulated for {simulation_cycles} cycles)", hz=controller_hz, events=plot_events_down)

    print("\nPlotting complete. Close plot windows to exit.")


In [None]:
# torque_ramping.py

import math
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker # For formatting the secondary x-axis

class EmaRamp:
    """
    Manages EMA torque ramping, calculating one step at a time.
    Can handle dynamic target torque changes.
    """
    def __init__(self, initial_torque: float, target_torque: float, alpha: float, 
                 completion_threshold: float = 0.999):
        if not (0 < alpha <= 1):
            raise ValueError("Alpha must be between 0 (exclusive) and 1 (inclusive).")
        
        self.current_torque = initial_torque
        self.target_torque = target_torque
        self.alpha = alpha
        
        self._is_effectively_done = False 
        self._initial_torque_for_threshold = initial_torque 
        self._total_change_for_threshold = target_torque - initial_torque 
        self._completion_threshold = completion_threshold
        self._update_done_status()

    def _update_done_status(self):
        """Checks if the current torque is at the target torque."""
        self._is_effectively_done = abs(self.current_torque - self.target_torque) < 1e-7
        if not self._is_effectively_done:
            self._initial_torque_for_threshold = self.current_torque 
            self._total_change_for_threshold = self.target_torque - self.current_torque
        else: # If done, ensure threshold params reflect no change needed
            self._initial_torque_for_threshold = self.current_torque
            self._total_change_for_threshold = 0


    def set_target_torque(self, new_target_torque: float):
        """Updates the target torque and resets ramp state if necessary."""
        if abs(self.target_torque - new_target_torque) > 1e-7: 
            self.target_torque = new_target_torque
            self._update_done_status() 

    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self._is_effectively_done:
            return self.current_torque

        next_torque = (self.target_torque * self.alpha) + (self.current_torque * (1 - self.alpha))
        self.current_torque = next_torque
        
        if abs(self.current_torque - self.target_torque) < 1e-7:
            self.current_torque = self.target_torque 
            self._is_effectively_done = True
        elif abs(self._total_change_for_threshold) > 1e-7:
            current_progress_fraction = abs(self.current_torque - self._initial_torque_for_threshold) / abs(self._total_change_for_threshold)
            if current_progress_fraction >= self._completion_threshold:
                self.current_torque = self.target_torque 
                self._is_effectively_done = True
        elif abs(self._total_change_for_threshold) <= 1e-7: 
             self._is_effectively_done = True
        return self.current_torque

class SigmoidRamp:
    """
    Manages Sigmoid (Smootherstep) torque ramping with dynamic duration.
    Duration is based on torque delta, clamped by min/max steps.
    Formula: s = 6x⁵ - 15x⁴ + 10x³
    """
    def __init__(self, initial_torque: float, target_torque: float, config: dict):
        self.config = { # Default config values
            "steps_per_unit_torque": 0.2,
            "min_duration_steps": 3, # S-curve needs at least a few steps
            "max_duration_steps": 50,
            **config # User provided config overrides defaults
        }
        if self.config["min_duration_steps"] < 1:
             raise ValueError("min_duration_steps must be at least 1 for Sigmoid.")
        if self.config["max_duration_steps"] < self.config["min_duration_steps"]:
             raise ValueError("max_duration_steps must be >= min_duration_steps.")


        self.initial_segment_torque = initial_torque 
        self.target_torque = target_torque
        self.current_torque = initial_torque
        
        self.current_segment_step = 0 
        self.current_segment_duration_steps = 0 # Will be calculated
        self.current_segment_total_change = 0 # Will be calculated
        
        self._recalculate_segment_parameters()


    def _recalculate_segment_parameters(self):
        """Recalculates parameters for the current S-curve segment, including dynamic duration."""
        self.current_segment_total_change = self.target_torque - self.initial_segment_torque
        abs_delta = abs(self.current_segment_total_change)

        if abs_delta < 1e-7: # Effectively no change needed
            self.current_segment_duration_steps = 0
            self.current_torque = self.target_torque # Snap to target
        else:
            calculated_duration = round(abs_delta * self.config["steps_per_unit_torque"])
            self.current_segment_duration_steps = max(self.config["min_duration_steps"], 
                                                      min(self.config["max_duration_steps"], calculated_duration))
        
        # If duration is 0 (either by calculation for zero delta, or if min_duration was 0 - though disallowed),
        # ensure current_torque is target.
        if self.current_segment_duration_steps == 0:
            self.current_torque = self.target_torque


    def set_target_torque(self, new_target_torque: float):
        """Updates the target torque. Starts a new S-curve segment from current torque with re-calculated dynamic duration."""
        if abs(self.target_torque - new_target_torque) > 1e-7:
            self.target_torque = new_target_torque
            self.initial_segment_torque = self.current_torque 
            self.current_segment_step = 0 
            self._recalculate_segment_parameters()


    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self.current_segment_duration_steps == 0: 
            # This means target is already reached or delta was zero.
            # self.current_torque should already be self.target_torque from _recalculate_segment_parameters
            return self.current_torque

        if self.current_segment_step >= self.current_segment_duration_steps:
            # Segment complete, hold target torque of this segment
            self.current_torque = self.target_torque 
            return self.current_torque

        x = 0.0
        # Ensure current_segment_duration_steps is positive before division
        if self.current_segment_duration_steps > 0:
             x = self.current_segment_step / self.current_segment_duration_steps
        
        s = 0.0
        if x <= 0: s = 0.0
        elif x >= 1.0: s = 1.0 
        else:
            x_3 = x * x * x 
            x_4 = x_3 * x   
            x_5 = x_4 * x   
            s = 6 * x_5 - 15 * x_4 + 10 * x_3
        
        self.current_torque = self.initial_segment_torque + self.current_segment_total_change * s
        
        # If this calculation makes it the last step of the segment, ensure it's precisely the target
        if self.current_segment_step == self.current_segment_duration_steps -1 : # This is the step that calculates for x=(D-1)/D
            # The next increment of current_segment_step will make it equal to duration
            pass

        self.current_segment_step += 1 

        # After incrementing, if current_segment_step IS NOW duration_steps, it means the s-curve for x=1 has been effectively applied.
        # Or, if x was already >= 1.0, snap to target.
        if self.current_segment_step >= self.current_segment_duration_steps or x >= 1.0:
             self.current_torque = self.target_torque

        return self.current_torque

class LinearRamp:
    """
    Manages Linear torque ramping, calculating one step at a time.
    Can handle dynamic target torque changes.
    """
    def __init__(self, initial_torque: float, target_torque: float, step_per_cycle: float = 1.0):
        if step_per_cycle <= 0:
            raise ValueError("step_per_cycle must be positive.")

        self.current_torque = initial_torque
        self.target_torque = target_torque
        self.step_per_cycle_magnitude = step_per_cycle 
        
        self._actual_step_value = 0
        self._is_done = False
        self._update_step_direction_and_done_status()

    def _update_step_direction_and_done_status(self):
        """Updates step direction based on current and target torque, and done status."""
        if abs(self.current_torque - self.target_torque) < 1e-7:
            self._actual_step_value = 0
            self._is_done = True
        else:
            self._actual_step_value = self.step_per_cycle_magnitude if self.target_torque > self.current_torque else -self.step_per_cycle_magnitude
            self._is_done = False

    def set_target_torque(self, new_target_torque: float):
        """Updates the target torque and re-evaluates step direction and done status."""
        if abs(self.target_torque - new_target_torque) > 1e-7:
            self.target_torque = new_target_torque
            self._update_step_direction_and_done_status()

    def step(self) -> float:
        """Calculates and returns the next torque value for this cycle."""
        if self._is_done or self._actual_step_value == 0:
            return self.current_torque

        next_torque_candidate = self.current_torque + self._actual_step_value

        if (self._actual_step_value > 0 and next_torque_candidate >= self.target_torque) or \
           (self._actual_step_value < 0 and next_torque_candidate <= self.target_torque):
            self.current_torque = self.target_torque 
            self._is_done = True 
            self._actual_step_value = 0 
        else:
            self.current_torque = next_torque_candidate
        
        return self.current_torque


def plot_ramps(ramp_data: list[dict], title: str, hz: float = 100.0, events: list[dict] = None):
    """
    Plots multiple torque ramps on the same graph, with a secondary x-axis for time in seconds.
    Can also plot vertical lines for events like target changes.
    """
    fig, ax1 = plt.subplots(figsize=(14, 8)) 
    max_len = 0 
    for ramp_info in ramp_data:
        num_points = len(ramp_info['values'])
        if num_points > max_len: max_len = num_points
        steps_values = range(num_points) 
        ax1.plot(steps_values, ramp_info['values'], 
                 label=ramp_info['label'], 
                 linestyle=ramp_info.get('style', '-'), 
                 marker=ramp_info.get('marker', None),
                 markersize=ramp_info.get('markersize', 4)) 

    if events:
        for event in events:
            ax1.axvline(x=event['cycle'], color=event.get('color', 'r'), linestyle=event.get('linestyle', '--'), label=event['label'])


    ax1.set_title(title, fontsize=16)
    ax1.set_xlabel(f"Simulation Cycles (Steps @ {hz}Hz)", fontsize=12)
    ax1.set_ylabel("Torque", fontsize=12)
    # Filter out event labels from main legend if they were added by axvline
    handles, labels = ax1.get_legend_handles_labels()
    filtered_handles_labels = [(h, l) for h, l in zip(handles, labels) if not any(event.get('label') == l for event in (events or []))]
    if filtered_handles_labels: # Only show legend if there are actual lines
        ax1.legend([h for h,l in filtered_handles_labels], [l for h,l in filtered_handles_labels], loc='best')
    elif events : # if only events have labels, create a legend for them
        event_handles = [plt.Line2D([0], [0], color=event.get('color', 'r'), linestyle=event.get('linestyle', '--')) for event in events if event.get('label')]
        event_labels = [event['label'] for event in events if event.get('label')]
        if event_handles:
            ax1.legend(event_handles, event_labels, loc='best')


    ax1.grid(True, which='both', linestyle='--', linewidth=0.5) 
    ax2 = ax1.twiny() 
    ax1_xlim = ax1.get_xlim()
    if ax1_xlim[1] > ax1_xlim[0] and hz > 0:
        ax2.set_xlim(ax1_xlim[0] / hz, ax1_xlim[1] / hz)
    else: 
        ax2.set_xlim(ax1_xlim[0], ax1_xlim[1])
    ax2.set_xlabel("Time (seconds)", fontsize=12)
    ax2.xaxis.set_major_formatter(ticker.FormatStrFormatter('%.4f'))
    if max_len > 0:
        xlim_max_steps = max_len -1 
        padding = int(xlim_max_steps * 0.05) if xlim_max_steps > 20 else 1
        ax1.set_xlim(-0.5, xlim_max_steps + padding + 0.5) 
    else: 
        ax1.set_xlim(-0.5, 1.5)
    new_ax1_xlim = ax1.get_xlim()
    if new_ax1_xlim[1] > new_ax1_xlim[0] and hz > 0:
         ax2.set_xlim(new_ax1_xlim[0] / hz, new_ax1_xlim[1] / hz)
    else:
         ax2.set_xlim(new_ax1_xlim[0], new_ax1_xlim[1])
    fig.tight_layout() 
    plt.show()

# --- Example Usage ---
if __name__ == "__main__":
    controller_hz = 100.0 
    simulation_cycles = 200 
    change_target_cycle = 100 

    initial_tq_up = 50.0
    target_tq_up_initial = 200.0
    target_tq_up_new = 120.0 

    initial_tq_down = 200.0
    target_tq_down_initial = 25.0
    target_tq_down_new = 180.0 
    
    linear_step_size = 1.0 

    alphas_config = {
        #"Slowest (α=0.05)": 0.05, 
        "Slower (α=0.07)": 0.07,
        "Slow (α=0.1)": 0.1, 
        #"Medium (α=0.2)": 0.2, 
        #"Fast (α=0.5)": 0.5
    }
    
    # Configuration for dynamic sigmoid duration
    sigmoid_dynamic_configs = { 
        #"Sigmoid Responsive (0.1 steps/unit, min 3, max 30)": {
        #    "steps_per_unit_torque": 0.1, "min_duration_steps": 3, "max_duration_steps": 30
        #},
        #"Sigmoid Smooth (0.25 steps/unit, min 5, max 50)": {
        #    "steps_per_unit_torque": 0.25, "min_duration_steps": 5, "max_duration_steps": 50
        #},
        #"Sigmoid Smooth (0.25 steps/unit, min 5, max 100)": {
        #    "steps_per_unit_torque": 0.25, "min_duration_steps": 5, "max_duration_steps": 100
        #},
        #"Sigmoid Smooth (0.5 steps/unit, min 25, max 100)": {
        #    "steps_per_unit_torque": 0.5, "min_duration_steps": 25, "max_duration_steps": 100
        #},
        #"Sigmoid Smooth (0.5 steps/unit, min 40, max 75)": {
        #    "steps_per_unit_torque": 0.5, "min_duration_steps": 40, "max_duration_steps": 75
        #},
        #"Sigmoid Smooth (0.5 steps/unit, min 5, max 75)": {
        #    "steps_per_unit_torque": 0.5, "min_duration_steps": 5, "max_duration_steps": 75
        #},
        #"Sigmoid Smooth (0.1 steps/unit, min 40, max 75)": {
        #    "steps_per_unit_torque": 0.1, "min_duration_steps": 40, "max_duration_steps": 75
        #},
        #"Sigmoid Smooth (0.7 steps/unit, min 5, max 200)": {
        #    "steps_per_unit_torque": 0.7, "min_duration_steps": 5, "max_duration_steps": 200
        #},
        "Sigmoid Smooth (0.7 steps/unit, min 5, max 75)": {
            "steps_per_unit_torque": 0.7, "min_duration_steps": 5, "max_duration_steps": 75
        }
    }

    all_rampers_up = []
    all_rampers_down = []

    # --- Ramp Up Simulation ---
    print(f"\n--- Simulating Ramp Up ({initial_tq_up} to {target_tq_up_initial}, then to {target_tq_up_new}) for {simulation_cycles} cycles ---")
    ramp_up_plot_data = []

    linear_ramper_up = LinearRamp(initial_tq_up, target_tq_up_initial, linear_step_size)
    all_rampers_up.append(linear_ramper_up)
    linear_ramp_up_values = [initial_tq_up] 
    
    ema_rampers_up_dict = {}
    for label_suffix, alpha_val in alphas_config.items():
        ramper = EmaRamp(initial_tq_up, target_tq_up_initial, alpha_val)
        ema_rampers_up_dict[label_suffix] = {"ramper": ramper, "values": [initial_tq_up]}
        all_rampers_up.append(ramper)

    sigmoid_rampers_up_dict = {}
    for label_suffix, sig_config in sigmoid_dynamic_configs.items(): # Use new sigmoid_dynamic_configs
        ramper = SigmoidRamp(initial_tq_up, target_tq_up_initial, config=sig_config)
        sigmoid_rampers_up_dict[label_suffix] = {"ramper": ramper, "values": [initial_tq_up]}
        all_rampers_up.append(ramper)

    for cycle in range(simulation_cycles):
        if cycle == change_target_cycle:
            print(f"Cycle {cycle}: Changing ramp UP target to {target_tq_up_new}")
            for ramper in all_rampers_up:
                ramper.set_target_torque(target_tq_up_new)
        
        linear_ramp_up_values.append(linear_ramper_up.step())
        for config in ema_rampers_up_dict.values():
            config["values"].append(config["ramper"].step())
        for config_label, config_data in sigmoid_rampers_up_dict.items(): # Iterate through new dict structure
            config_data["values"].append(config_data["ramper"].step())

    ramp_up_plot_data.append({"label": f"Linear (+{linear_step_size}/step)", "values": linear_ramp_up_values, "style": ":", "marker":"."})
    print(f"Linear Ramp Up after {simulation_cycles} cycles: final torque {linear_ramp_up_values[-1]:.1f}")
    for label_suffix, config in ema_rampers_up_dict.items():
        ramp_up_plot_data.append({"label": f"EMA {label_suffix}", "values": config["values"]})
        print(f"EMA {label_suffix} after {simulation_cycles} cycles: final torque {config['values'][-1]:.1f}")
    for label_suffix, config_data in sigmoid_rampers_up_dict.items(): # Use new dict structure
        ramp_up_plot_data.append({"label": label_suffix, "values": config_data["values"], "style": "--"})
        print(f"{label_suffix} after {simulation_cycles} cycles: final torque {config_data['values'][-1]:.1f}")
    
    plot_events_up = [{'cycle': change_target_cycle, 'label': f'Target -> {target_tq_up_new:.0f}'}]
    plot_ramps(ramp_up_plot_data, f"Torque Ramp Up with Dynamic Target (Simulated for {simulation_cycles} cycles)", hz=controller_hz, events=plot_events_up)


    # --- Ramp Down Simulation ---
    print(f"\n--- Simulating Ramp Down ({initial_tq_down} to {target_tq_down_initial}, then to {target_tq_down_new}) for {simulation_cycles} cycles ---")
    ramp_down_plot_data = []

    linear_ramper_down = LinearRamp(initial_tq_down, target_tq_down_initial, linear_step_size)
    all_rampers_down.append(linear_ramper_down)
    linear_ramp_down_values = [initial_tq_down]

    ema_rampers_down_dict = {}
    for label_suffix, alpha_val in alphas_config.items():
        ramper = EmaRamp(initial_tq_down, target_tq_down_initial, alpha_val)
        ema_rampers_down_dict[label_suffix] = {"ramper": ramper, "values": [initial_tq_down]}
        all_rampers_down.append(ramper)

    sigmoid_rampers_down_dict = {}
    for label_suffix, sig_config in sigmoid_dynamic_configs.items(): # Use new sigmoid_dynamic_configs
        ramper = SigmoidRamp(initial_tq_down, target_tq_down_initial, config=sig_config)
        sigmoid_rampers_down_dict[label_suffix] = {"ramper": ramper, "values": [initial_tq_down]}
        all_rampers_down.append(ramper)

    for cycle in range(simulation_cycles):
        if cycle == change_target_cycle:
            print(f"Cycle {cycle}: Changing ramp DOWN target to {target_tq_down_new}")
            for ramper in all_rampers_down:
                ramper.set_target_torque(target_tq_down_new)

        linear_ramp_down_values.append(linear_ramper_down.step())
        for config in ema_rampers_down_dict.values():
            config["values"].append(config["ramper"].step())
        for config_label, config_data in sigmoid_rampers_down_dict.items(): # Iterate through new dict structure
            config_data["values"].append(config_data["ramper"].step())

    ramp_down_plot_data.append({"label": f"Linear (-{linear_step_size}/step)", "values": linear_ramp_down_values, "style": ":", "marker":"."})
    print(f"Linear Ramp Down after {simulation_cycles} cycles: final torque {linear_ramp_down_values[-1]:.1f}")
    for label_suffix, config in ema_rampers_down_dict.items():
        ramp_down_plot_data.append({"label": f"EMA {label_suffix}", "values": config["values"]})
        print(f"EMA {label_suffix} after {simulation_cycles} cycles: final torque {config['values'][-1]:.1f}")
    for label_suffix, config_data in sigmoid_rampers_down_dict.items(): # Use new dict structure
        ramp_down_plot_data.append({"label": label_suffix, "values": config_data["values"], "style": "--"})
        print(f"{label_suffix} after {simulation_cycles} cycles: final torque {config_data['values'][-1]:.1f}")

    plot_events_down = [{'cycle': change_target_cycle, 'label': f'Target -> {target_tq_down_new:.0f}'}]
    plot_ramps(ramp_down_plot_data, f"Torque Ramp Down with Dynamic Target (Simulated for {simulation_cycles} cycles)", hz=controller_hz, events=plot_events_down)

    print("\nPlotting complete. Close plot windows to exit.")


In [None]:
# live_sigmoid_torque_ramp.py
import math

class LiveSigmoidTorqueRamp:
    """
    Manages Sigmoid (Smootherstep) torque ramping for a live environment,
    calculating one step at a time with dynamic duration based on configuration.

    The S-curve profile is based on the Smootherstep function: s = 6x⁵ - 15x⁴ + 10x³
    The duration of each ramp segment is dynamically calculated based on the
    magnitude of the torque change, clamped by min_duration_steps and max_duration_steps.
    """

    def __init__(self, initial_torque: float, config: dict = None):
        """
        Initializes the LiveSigmoidTorqueRamp.

        Args:
            initial_torque (float): The starting torque value.
            config (dict, optional): Configuration for the sigmoid ramp.
                Defaults to:
                {
                    "steps_per_unit_torque": 0.7,
                    "min_duration_steps": 5,
                    "max_duration_steps": 75
                }
        """
        default_config = {
            "steps_per_unit_torque": 0.7,
            "min_duration_steps": 5,
            "max_duration_steps": 75
        }
        self.config = {**default_config, **(config or {})} # User config overrides defaults

        if self.config["min_duration_steps"] < 1:
            raise ValueError("min_duration_steps must be at least 1 for Sigmoid.")
        if self.config["max_duration_steps"] < self.config["min_duration_steps"]:
            raise ValueError("max_duration_steps must be >= min_duration_steps.")

        self.current_torque = initial_torque
        self.target_torque = initial_torque # Initially, target is the current torque

        # Parameters for the current S-curve segment
        self.initial_segment_torque = initial_torque
        self.current_segment_step = 0
        self.current_segment_duration_steps = 0
        self.current_segment_total_change = 0
        
        # Initialize segment parameters based on initial state (no change)
        self._recalculate_segment_parameters()

    def _recalculate_segment_parameters(self):
        """
        Recalculates parameters for the current S-curve segment,
        including its dynamic duration based on the torque delta.
        This is called when the target torque changes or at initialization.
        """
        self.current_segment_total_change = self.target_torque - self.initial_segment_torque
        abs_delta = abs(self.current_segment_total_change)

        if abs_delta < 1e-7:  # Effectively no change needed for this segment
            self.current_segment_duration_steps = 0
            # self.current_torque is already self.initial_segment_torque.
            # If initial_segment_torque is already target_torque, then current_torque is target.
            # This ensures if a new target is set that is the same as current_torque, duration is 0.
            if abs(self.current_torque - self.target_torque) > 1e-7 : # If current is not yet target
                 self.current_torque = self.target_torque # Snap to target if delta is zero
        else:
            calculated_duration = round(abs_delta * self.config["steps_per_unit_torque"])
            self.current_segment_duration_steps = max(
                self.config["min_duration_steps"],
                min(self.config["max_duration_steps"], calculated_duration)
            )
        
        # If, after calculation, duration is 0, it means we should be at the target.
        # This can happen if abs_delta was very small leading to calculated_duration < min_duration_steps,
        # and min_duration_steps was then clamped to 0 (if min_duration_steps was allowed to be 0, which it isn't by the check).
        # More relevantly, if abs_delta was ~0.
        if self.current_segment_duration_steps == 0:
            self.current_torque = self.target_torque


    def set_target_torque(self, new_target_torque: float):
        """
        Updates the target torque. This will start a new S-curve ramp segment
        from the current actual torque to the new target torque.
        The duration of this new segment is dynamically calculated.

        Args:
            new_target_torque (float): The new desired target torque.
        """
        # Check if the new target is meaningfully different from the current target
        if abs(self.target_torque - new_target_torque) > 1e-7:
            self.target_torque = new_target_torque
            # The new S-curve segment starts from the current actual torque
            self.initial_segment_torque = self.current_torque 
            self.current_segment_step = 0  # Reset step count for the new segment
            self._recalculate_segment_parameters()
        # If new target is same as old target, do nothing, continue current ramp.


    def step(self) -> float:
        """
        Calculates and returns the torque value for the current control cycle.
        This method should be called once per cycle in your live environment.

        Returns:
            float: The calculated torque for the current cycle.
        """
        # If the duration for the current segment is 0, it means:
        #   a) The initial torque and target torque for this segment were the same.
        #   b) Or, the calculated duration (based on delta and config) was clamped to 0.
        # In such cases, current_torque should already be at the target_torque of this segment.
        if self.current_segment_duration_steps == 0:
            # self.current_torque should have been set to self.target_torque
            # by _recalculate_segment_parameters when duration became 0.
            return self.current_torque

        # If the number of steps taken in the current segment has reached or exceeded its planned duration
        if self.current_segment_step >= self.current_segment_duration_steps:
            # The S-curve segment is complete. Hold the target torque of this segment.
            # Ensure current_torque is precisely the target_torque for this completed segment.
            self.current_torque = self.target_torque 
            return self.current_torque

        # Calculate the S-curve factor 's' (normalized progress from 0 to 1)
        # x is the normalized time within the current segment's duration
        x = 0.0
        if self.current_segment_duration_steps > 0: # Avoid division by zero
            x = self.current_segment_step / self.current_segment_duration_steps
        
        s = 0.0
        if x <= 0:  # At or before the start of the segment
            s = 0.0
        elif x >= 1.0:  # At or after the end of the segment
            s = 1.0
        else:  # Within the segment, apply Smootherstep
            x_3 = x * x * x
            x_4 = x_3 * x
            x_5 = x_4 * x
            s = 6 * x_5 - 15 * x_4 + 10 * x_3
        
        # Calculate the torque based on the S-curve factor
        self.current_torque = self.initial_segment_torque + self.current_segment_total_change * s
        
        # Increment the step count for the current segment
        self.current_segment_step += 1

        # If this increment just completed the duration, ensure torque is exactly target
        if self.current_segment_step >= self.current_segment_duration_steps:
            self.current_torque = self.target_torque

        return self.current_torque

# --- Example Usage (for testing the live class) ---
if __name__ == "__main__":
    # User-specified configuration
    live_config = {
        "steps_per_unit_torque": 0.7,
        "min_duration_steps": 5,
        "max_duration_steps": 75
    }

    # Initialize the ramp controller
    initial_car_torque = 0.0
    torque_controller = LiveSigmoidTorqueRamp(initial_torque=initial_car_torque, config=live_config)

    print(f"Initial Torque: {torque_controller.current_torque:.2f}")

    # Simulate some controller cycles
    simulation_total_cycles = 150
    target_change_cycle_1 = 10
    new_target_1 = 100.0
    target_change_cycle_2 = 70
    new_target_2 = 30.0
    target_change_cycle_3 = 120
    new_target_3 = 50.0


    print(f"\nSimulating {simulation_total_cycles} cycles at 100Hz (example):")
    print("Cycle | Target | Current Segment Duration | Segment Step | Output Torque")
    print("------|--------|--------------------------|--------------|--------------")

    # Store values for plotting if desired (basic plot example)
    output_torques = [torque_controller.current_torque]
    cycle_numbers = [0]

    for cycle in range(1, simulation_total_cycles + 1):
        # Potentially change target at specific cycles
        if cycle == target_change_cycle_1:
            print(f"----- Cycle {cycle}: Setting new target to {new_target_1:.2f} -----")
            torque_controller.set_target_torque(new_target_1)
        elif cycle == target_change_cycle_2:
            print(f"----- Cycle {cycle}: Setting new target to {new_target_2:.2f} -----")
            torque_controller.set_target_torque(new_target_2)
        elif cycle == target_change_cycle_3:
            print(f"----- Cycle {cycle}: Setting new target to {new_target_3:.2f} -----")
            torque_controller.set_target_torque(new_target_3)

        # Get the torque for the current cycle
        current_cycle_torque = torque_controller.step()
        output_torques.append(current_cycle_torque)
        cycle_numbers.append(cycle)

        if cycle <= 15 or cycle >= simulation_total_cycles - 5 or \
           cycle == target_change_cycle_1 or cycle == target_change_cycle_1 + 1 or \
           cycle == target_change_cycle_2 or cycle == target_change_cycle_2 + 1 or \
           cycle == target_change_cycle_3 or cycle == target_change_cycle_3 + 1:
            print(f"{cycle:<5} | {torque_controller.target_torque:<6.1f} | "
                  f"{torque_controller.current_segment_duration_steps:<24} | "
                  f"{torque_controller.current_segment_step:<12} | "
                  f"{current_cycle_torque:<13.2f}")

    print(f"\nFinal Torque after {simulation_total_cycles} cycles: {torque_controller.current_torque:.2f}")

    # Basic plotting example (requires matplotlib)
    try:
        import matplotlib.pyplot as plt
        plt.figure(figsize=(12, 6))
        plt.plot(cycle_numbers, output_torques, marker='.', linestyle='-')
        plt.title("Live Sigmoid Torque Ramp Simulation")
        plt.xlabel("Simulation Cycle")
        plt.ylabel("Torque")
        
        # Add vertical lines for target changes
        if target_change_cycle_1 < simulation_total_cycles:
            plt.axvline(x=target_change_cycle_1, color='r', linestyle='--', label=f'Target -> {new_target_1:.0f}')
        if target_change_cycle_2 < simulation_total_cycles:
            plt.axvline(x=target_change_cycle_2, color='g', linestyle='--', label=f'Target -> {new_target_2:.0f}')
        if target_change_cycle_3 < simulation_total_cycles:
            plt.axvline(x=target_change_cycle_3, color='purple', linestyle='--', label=f'Target -> {new_target_3:.0f}')
        
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()
    except ImportError:
        print("\nMatplotlib not installed. Skipping plot.")
