In [1]:
import mujoco
import numpy as np
import matplotlib.pyplot as plt
import os
import csv
from scipy.optimize import minimize
import time
import sys
from tqdm import tqdm


In [2]:
# ==========================================
# 1. Class Definitions
# ==========================================
class CompliantTendonParams:
    def __init__(self, F_max, l_opt, l_slack, v_max, W, C, N, K, E_REF):
        self.F_max = F_max
        self.l_opt = l_opt
        self.l_slack = l_slack
        self.v_max = v_max
        self.W = W
        self.C = C
        self.N = N
        self.K = K
        self.E_REF = E_REF
    
    def get_prm_str(self):
        return f"{self.F_max} {self.l_opt} {self.l_slack} {self.v_max} {self.W} {self.C} {self.N} {self.K} {self.E_REF}"


In [3]:
# ==========================================
# 2. MuJoCo Model Generation
# ==========================================
def create_model(cp_params: CompliantTendonParams):
    xml_string = f"""
    <mujoco model="fitting_scene">
    <option timestep="0.002" integrator="Euler"/>

    <default>
        <default class="compliant_muscle">
        <general biasprm="0" biastype="none" ctrllimited="true" ctrlrange="0 1" 
                dynprm="0.01 0.04" dyntype="muscle" 
                gainprm="{cp_params.get_prm_str()}"
                gaintype="compliant_mtu"/>
        </default>
    </default>

    <worldbody>
        <body name="ground"/>
        <site name="anchor" pos="0 0 0" size="0.01" rgba="1 0 0 1"/>

        <body name="load" pos="0 0 0">
            <joint name="slide" type="slide" axis="0 0 1" limited="false" damping="100"/> 
            <site name="insertion" pos="0 0 0" size="0.01" rgba="0 1 0 1"/>
            <geom type="sphere" size="0.05" mass="1.0"/>
        </body>
    </worldbody>

    <tendon>
        <spatial name="tendon">
            <site site="anchor"/>
            <site site="insertion"/>
        </spatial>
    </tendon>

    <actuator>
        <general class="compliant_muscle" name="muscle" tendon="tendon"/>
    </actuator>

    <sensor>
        <actuatorfrc name="force_sensor" actuator="muscle"/>
        <tendonpos name="length_sensor" tendon="tendon"/>
    </sensor>
    </mujoco>
    """
    return mujoco.MjModel.from_xml_string(xml_string)


In [4]:
# ==========================================
# 3. Simulation Function
# ==========================================
def compute_forces_at_velocity(model, data, velocity, lengths, activation=1.0):
    """
    Compute forces for a specific velocity across multiple lengths.
    Uses Kinematic Drive (mj_forward) to efficiently calculate steady-state forces.
    
    Args:
        model: MuJoCo model
        data: MuJoCo data (reused)
        velocity: Physical velocity (m/s)
        lengths: List/Array of physical lengths (m)
        activation: Muscle activation (0.0-1.0)
        
    Returns:
        np.array of forces corresponding to 'lengths'
    """
    forces = []
    
    # Set state that is constant for this velocity loop
    data.qvel[0] = velocity
    data.act[0] = activation
    data.ctrl[0] = activation
    
    for length in lengths:
        # Update length (qpos)
        # Ensure non-negative length if physical constraints require
        if length < 0.001: length = 0.001
        data.qpos[0] = length
        
        # Compute dynamics
        # mj_forward computes position-dependent forces (active+passive) given qpos, qvel, act
        mujoco.mj_forward(model, data)
        
        # Record force
        forces.append(data.sensor("force_sensor").data[0])
        
    return np.array(forces)


In [5]:
# ==========================================
# 4. Data Loading & Preprocessing
# ==========================================
def load_opensim_data(muscle_name, params_csv, data_dir):
    p = None
    with open(params_csv, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            if row['muscle'] == muscle_name:
                p = row
                break
    
    if p is None:
        raise ValueError(f"Muscle {muscle_name} not found in parameters.")
    
    f_max = float(p['max_isometric_force'])
    l_opt = float(p['optimal_fiber_length'])
    l_slack = float(p['tendon_slack_length'])
    v_max = float(p['max_contraction_velocity'])
    
    csv_path = os.path.join(data_dir, f"{muscle_name}_force.csv")
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Data file {csv_path} not found.")
    
    with open(csv_path, 'r') as f:
        reader = csv.reader(f)
        rows = list(reader)
    
    norm_velocities = np.array([float(v) for v in rows[0][1:]])
    data_matrix = []
    norm_lengths = []
    
    for r in rows[1:]:
        norm_lengths.append(float(r[0]))
        data_matrix.append([float(x) for x in r[1:]])
        
    norm_lengths = np.array(norm_lengths)
    data_matrix = np.array(data_matrix)
    
    return {
        "f_max": f_max,
        "l_opt": l_opt,
        "l_slack": l_slack,
        "v_max": v_max,
        "norm_lengths": norm_lengths,
        "norm_velocities": norm_velocities,
        "force_matrix": data_matrix
    }


In [None]:
# ==========================================
# 5. Fitting Logic (All Parameters)
# ==========================================
# Global variables for progress tracking
_obj_iter_count = 0
_obj_start_time = None
_obj_verbose = 1

def objective_function(x, target_data, verbose=1):
    global _obj_iter_count, _obj_start_time, _obj_verbose
    
    iter_start_time = time.time()
    
    # Unpack 9 parameters: F_max, l_opt, l_slack, v_max, W, C, N, K, E_REF
    F_max, l_opt, l_slack, v_max, W, C, N, K, E_REF = x

    ref_f_max = target_data['f_max']
    norm_lengths = target_data['norm_lengths']
    norm_velocities = target_data['norm_velocities']
    force_matrix = target_data['force_matrix']
    
    _obj_iter_count += 1
    if _obj_start_time is None:
        _obj_start_time = time.time()
    
    total_elapsed = time.time() - _obj_start_time
    
    if verbose >= 1:
        print(f"\n[Objective #{_obj_iter_count}] Starting evaluation...")
        print(f"  Params: F_max={F_max:.2f}, l_opt={l_opt:.4f}, l_slack={l_slack:.4f}, v_max={v_max:.2f}")
        print(f"          W={W:.3f}, C={C:.3f}, N={N:.3f}, K={K:.3f}, E_REF={E_REF:.4f}")
        print(f"  Total elapsed: {total_elapsed:.1f}s")
    
    if verbose >= 2:
        print(f"  Creating MuJoCo model...")
    
    cp_params = CompliantTendonParams(
        F_max=F_max,
        l_opt=l_opt,
        l_slack=l_slack,
        v_max=v_max,
        W=W, C=C, N=N, K=K, E_REF=E_REF
    )
    
    model = create_model(cp_params)
    data = mujoco.MjData(model) # Create data once per objective call
    
    model_time = time.time() - iter_start_time
    
    if verbose >= 2:
        print(f"  Model created in {model_time:.3f}s")
    
    total_error = 0
    count = 0
    
    # Sampling stride to speed up optimization
    l_stride = max(1, len(norm_lengths) // 5)
    v_stride = max(1, len(norm_velocities) // 5)
    
    l_indices = list(range(0, len(norm_lengths), l_stride))
    v_indices = list(range(0, len(norm_velocities), v_stride))
    total_points = len(l_indices) * len(v_indices)
    
    if verbose >= 1:
        print(f"  Evaluating {total_points} points (stride: L={l_stride}, V={v_stride})...")
    
    sim_start_time = time.time()
    
    # Convert target lengths to physical units for this iteration
    # We do this once to pass array to simulation function
    target_l_indices = l_indices
    target_l_norms = norm_lengths[target_l_indices]
    target_l_phys = target_l_norms * l_opt + l_slack
    
    processed_v = 0
    
    # Iterate over VELOCITIES (Outer Loop)
    for j in v_indices:
        v_norm = norm_velocities[j]
        v_phy = v_norm * v_max
        
        # Target forces for this velocity profile across selected lengths
        f_target_profile = force_matrix[target_l_indices, j]
        
        # Compute simulated forces for this velocity across all target lengths
        # using the efficient kinematic batch function
        f_sim_profile = compute_forces_at_velocity(model, data, v_phy, target_l_phys, activation=1.0)
        
        # Accumulate Error
        errs = (f_sim_profile - f_target_profile) / ref_f_max
        total_error += np.sum(errs**2)
        count += len(f_sim_profile)
        
        processed_v += 1
        if verbose >= 2 and processed_v % max(1, len(v_indices) // 5) == 0:
             print(f"    Velocity Progress: {processed_v}/{len(v_indices)} (v_norm={v_norm:.2f})")

    sim_time = time.time() - sim_start_time
    mse = total_error / count
    iter_time = time.time() - iter_start_time
    
    if verbose >= 1:
        print(f"  Simulation completed: {sim_time:.2f}s")
        print(f"  MSE: {mse:.6f}")
        print(f"  Total iteration time: {iter_time:.2f}s")
        if _obj_iter_count > 1:
            avg_iter_time = total_elapsed / _obj_iter_count
            print(f"  Average iteration time: {avg_iter_time:.2f}s")
    
    return mse


In [None]:
# ==========================================
# 6. Plotting Function
# ==========================================
def plot_results(best_params, target_data, muscle_name):
    print("\n[Plotting] Generating plots...")
    F_max, l_opt, l_slack, v_max, W, C, N, K, E_REF = best_params
    
    cp_params = CompliantTendonParams(
        F_max=F_max, l_opt=l_opt, l_slack=l_slack, v_max=v_max,
        W=W, C=C, N=N, K=K, E_REF=E_REF
    )
    model = create_model(cp_params)
    data = mujoco.MjData(model) # Create data once for plotting

    activations = [0.0, 0.5, 1.0]
    norm_lengths = target_data['norm_lengths']
    norm_velocities = target_data['norm_velocities']
    
    # Prepare dense length array for smooth curves
    dense_L_norm = np.linspace(norm_lengths.min(), norm_lengths.max(), 30)
    dense_L_phy = dense_L_norm * l_opt + l_slack
    
    # --- Figure 1: Force-Length (lines = velocity) ---
    print("[Plotting] Creating Force-Length plots...")
    fig1, axes1 = plt.subplots(1, 3, figsize=(18, 5), sharey=True)
    fig1.suptitle(f"Force-Length Curve (Fitted: {muscle_name})", fontsize=14)
    
    sel_v_idxs = np.linspace(0, len(norm_velocities)-1, 5, dtype=int)
    sel_vels = norm_velocities[sel_v_idxs]
    colors = plt.cm.viridis(np.linspace(0, 1, len(sel_vels)))
    
    for i, act in enumerate(activations):
        ax = axes1[i]
        ax.set_title(f"Activation = {act}")
        ax.set_xlabel("Length (normalized)")
        if i==0: ax.set_ylabel("Force / F_max")
        
        for v_idx, v_val in enumerate(sel_vels):
            c = colors[v_idx]
            v_phy = v_val * v_max
            
            # Batch compute for all dense lengths
            f_sim = compute_forces_at_velocity(model, data, v_phy, dense_L_phy, activation=act)
            
            ax.plot(dense_L_norm, f_sim/target_data['f_max'], color=c, 
                   label=f"Fit v={v_val:.2f}", linewidth=2, linestyle='-')
            
            # Original Data Points (only for act=1.0)
            if act == 1.0:
                 real_v_idx = sel_v_idxs[v_idx]
                 f_dat = target_data['force_matrix'][:, real_v_idx]
                 ax.scatter(norm_lengths, f_dat/target_data['f_max'], color=c, 
                          s=20, marker='x', alpha=0.7, label=f"Data v={v_val:.2f}")

        if i == 2: ax.legend(fontsize='x-small', loc='best', ncol=2)
        ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

    # --- Figure 2: Force-Velocity (lines = length) ---
    print("[Plotting] Creating Force-Velocity plots...")
    fig2, axes2 = plt.subplots(1, 3, figsize=(18, 5), sharey=True)
    fig2.suptitle(f"Force-Velocity Curve (Fitted: {muscle_name})", fontsize=14)
    
    sel_l_idxs = np.linspace(0, len(norm_lengths)-1, 5, dtype=int)
    sel_lens = norm_lengths[sel_l_idxs]
    colors_l = plt.cm.plasma(np.linspace(0, 1, len(sel_lens)))
    
    dense_V_norm = np.linspace(norm_velocities.min(), norm_velocities.max(), 30)
    dense_V_phy = dense_V_norm * v_max

    for i, act in enumerate(activations):
        ax = axes2[i]
        ax.set_title(f"Activation = {act}")
        ax.set_xlabel("Velocity (normalized)")
        if i==0: ax.set_ylabel("Force / F_max")
        
        # This loop is tricky for optimization because we want lines of Length.
        # For plotting, we can afford to loop. 
        # To batch, we could iterate Lengths (Outer) and compute for all Vels (Inner).
        # But compute_forces_at_velocity assumes const V, var L.
        # So we iterate Lengths, and for each length, we compute across Vels.
        # But we don't have a "compute_forces_at_length" function yet.
        # We can just use single point calls or make a helper.
        # Given plotting is once, single point calls are okay, OR we adapt helper.
        
        # Let's just use the helper in a loop for clarity/consistency.
        for l_idx, l_val in enumerate(sel_lens):
            c = colors_l[l_idx]
            l_phy = l_val * l_opt + l_slack
            
            f_sim = []
            # Here we iterate velocities
            for v in tqdm(dense_V_phy, desc=f"Act={act}, L={l_val:.2f}", leave=False):
                # Reuse helper for single point (list of 1)
                f = compute_forces_at_velocity(model, data, v, [l_phy], activation=act)[0]
                f_sim.append(f)
                
            ax.plot(dense_V_norm, np.array(f_sim)/target_data['f_max'], color=c, 
                   label=f"Fit L={l_val:.2f}", linewidth=2, linestyle='-')
            
            # Original Data Points (only for act=1.0)
            if act == 1.0:
                 real_l_idx = sel_l_idxs[l_idx]
                 f_dat = target_data['force_matrix'][real_l_idx, :]
                 ax.scatter(norm_velocities, f_dat/target_data['f_max'], color=c, 
                          s=20, marker='x', alpha=0.7, label=f"Data L={l_val:.2f}")

        if i == 2: ax.legend(fontsize='x-small', loc='best', ncol=2)
        ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    print("[Plotting] Done!")


In [None]:
# ==========================================
# 7. Main Execution
# ==========================================
def callback_function(xk, state=None):
    """Callback function to show optimization progress"""
    global _obj_iter_count
    if _obj_iter_count > 0:
        print(f"\n[Callback] Optimization step completed")
        print(f"  Current params:")
        print(f"    F_max={xk[0]:.2f}, l_opt={xk[1]:.4f}, l_slack={xk[2]:.4f}, v_max={xk[3]:.2f}")
        print(f"    W={xk[4]:.4f}, C={xk[5]:.4f}, N={xk[6]:.4f}, K={xk[7]:.4f}, E_REF={xk[8]:.4f}")
    return False

def fit_muscle(muscle_name, verbose=1):
    """
    Fit muscle parameters
    
    Args:
        muscle_name: Name of the muscle to fit
        verbose: Verbosity level (0=quiet, 1=normal, 2=detailed, 3=debug)
    """
    global _obj_iter_count, _obj_start_time, _obj_verbose
    
    _obj_verbose = verbose
    _obj_iter_count = 0
    _obj_start_time = None
    
    print(f"=" * 60)
    print(f"Starting fit for {muscle_name}")
    print(f"=" * 60)
    
    param_csv = "osim_muscle_data_extracted/all_muscle_parameters.csv"
    data_dir = "osim_muscle_data_extracted" 
    
    try:
        print(f"[Loading] Reading data from {param_csv}...")
        target_data = load_opensim_data(muscle_name, param_csv, data_dir)
        print(f"[Loading] Data loaded successfully!")
        print(f"  - F_max: {target_data['f_max']:.2f}")
        print(f"  - l_opt: {target_data['l_opt']:.4f}")
        print(f"  - l_slack: {target_data['l_slack']:.4f}")
        print(f"  - v_max: {target_data['v_max']:.2f}")
        print(f"  - Data shape: {target_data['force_matrix'].shape}")
    except FileNotFoundError as e:
        print(f"[ERROR] File not found: {e}")
        return None
    except ValueError as e:
        print(f"[ERROR] Value error: {e}")
        return None
    except Exception as e:
        print(f"[ERROR] Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        return None

    # Initial Guess: [F_max, l_opt, l_slack, v_max, W, C, N, K, E_REF]
    # All 9 parameters including F_max, l_opt, l_slack, v_max will be fitted
    base_F = target_data['f_max']
    base_L_opt = target_data['l_opt']
    base_L_slack = target_data['l_slack']
    base_V_max = target_data['v_max']
    
    x0 = [
        base_F, 
        base_L_opt, 
        base_L_slack, 
        base_V_max,
        0.56, -2.995732274, 1.5, 5.0, 0.04
    ]
    
    # Bounds: Allow physical params to vary
    bounds = [
        (base_F * 0.5, base_F * 1.5),       # F_max - fitting
        (base_L_opt * 0.8, base_L_opt * 1.2), # l_opt - fitting
        (base_L_slack * 0.8, base_L_slack * 1.2), # l_slack - fitting
        (10, 10),     # v_max - fitting
        (0.56, 0.56),         # W
        (-2.995732274, -2.995732274),      # C
        (1.5, 1.5),         # N
        (5.0, 5.0),       # K
        (0.01, 0.15)        # E_REF
    ]
    
    print(f"\n[Optimization] Starting optimization...")
    print(f"  - Optimizing 9 parameters: F_max, l_opt, l_slack, v_max, W, C, N, K, E_REF")
    print(f"  - Initial guess: {x0}")
    print(f"  - Max iterations: 50")
    print(f"  - Verbose level: {verbose}")
    
    _obj_iter_count = 0
    _obj_start_time = time.time()
    opt_start_time = time.time()
    
    try:
        # Wrapper function for objective with progress
        def obj_wrapper(x):
            return objective_function(x, target_data, verbose=verbose)
        
        res = minimize(
            obj_wrapper,
            x0,
            method='L-BFGS-B',
            bounds=bounds,
            options={'maxiter': 50},
            callback=callback_function
        )
        
        opt_time = time.time() - opt_start_time
        print(f"\n[Optimization] Finished in {opt_time:.2f}s")
        print(f"  - Status: {res.message}")
        print(f"  - Success: {res.success}")
        print(f"  - Final MSE: {res.fun:.6f}")
        print(f"  - Iterations: {res.nit}")
        print(f"\n[Optimization] Fitted Parameters:")
        print(f"  - F_max: {res.x[0]:.2f} (initial: {x0[0]:.2f})")
        print(f"  - l_opt: {res.x[1]:.4f} (initial: {x0[1]:.4f})")
        print(f"  - l_slack: {res.x[2]:.4f} (initial: {x0[2]:.4f})")
        print(f"  - v_max: {res.x[3]:.2f} (initial: {x0[3]:.2f})")
        print(f"  - W: {res.x[4]:.4f}")
        print(f"  - C: {res.x[5]:.4f}")
        print(f"  - N: {res.x[6]:.4f}")
        print(f"  - K: {res.x[7]:.4f}")
        print(f"  - E_REF: {res.x[8]:.4f}")
        
        if not res.success:
            print(f"\n[WARNING] Optimization did not converge successfully!")
            print(f"  - Message: {res.message}")
        
        print(f"\n[Plotting] Starting to generate plots...")
        sys.stdout.flush()
        
    except Exception as e:
        print(f"\n[ERROR] Optimization failed: {e}")
        import traceback
        traceback.print_exc()
        return None
    
    try:
        print(f"[Plotting] Calling plot_results with fitted parameters...")
        sys.stdout.flush()
        plot_results(res.x, target_data, muscle_name)
        print(f"[Plotting] Plots generated successfully!")
        sys.stdout.flush()
    except Exception as e:
        print(f"\n[ERROR] Plotting failed: {e}")
        import traceback
        traceback.print_exc()
        return res.x
    
    print(f"\n[Complete] Fitting finished successfully!")
    return res.x


: 

In [None]:
# Run fitting
# verbose: 0=quiet, 1=normal, 2=detailed, 3=debug
fit_muscle("gaslat_r", verbose=2)


Starting fit for gaslat_r
[Loading] Reading data from osim_muscle_data_extracted/all_muscle_parameters.csv...
[Loading] Data loaded successfully!
  - F_max: 1575.06
  - l_opt: 0.0690
  - l_slack: 0.3740
  - v_max: 10.00
  - Data shape: (50, 50)

[Optimization] Starting optimization...
  - Optimizing 9 parameters: F_max, l_opt, l_slack, v_max, W, C, N, K, E_REF
  - Initial guess: [1575.05901639344, 0.069, 0.374, 10.0, 0.56, -2.995732274, 1.5, 5.0, 0.04]
  - Max iterations: 50
  - Verbose level: 2

[Objective #1] Starting evaluation...
  Params: F_max=1575.06, l_opt=0.0690, l_slack=0.3740, v_max=10.00
          W=0.560, C=-2.996, N=1.500, K=5.000, E_REF=0.0400
  Total elapsed: 0.0s
  Creating MuJoCo model...
  Model created in 0.001s
  Evaluating 25 simulation points (stride: L=10, V=10)...
    Progress: 2/25 (8.0%), ETA: 0.0s, avg: 1.0ms/point
    Progress: 4/25 (16.0%), ETA: 0.0s, avg: 1.2ms/point
    Progress: 6/25 (24.0%), ETA: 0.0s, avg: 1.0ms/point
    Progress: 8/25 (32.0%), ETA: 