# Simulation with Human and Cable Forces

In [5]:
import gym
from gym import spaces
import numpy as np
import matplotlib
matplotlib.use('Qt5Agg')
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

### Pendulum Environment

This custom PendulumEnv simulates a planar pendulum influenced by both human-applied torque and external cable forces. It is built using the OpenAI Gym framework.

In [6]:
import gym
import numpy as np
import matplotlib.pyplot as plt
from gym import spaces

class LinearEnv(gym.Env):
    """
    A 1D linear environment for a point mass that moves horizontally from x=0 to x=L.
    The goal is to follow a minimum-jerk trajectory (MJT) over a specified duration.
    
    State:
        [x, x_dot]
    
    Action:
        [F]  (the user/manipulated force)
    """
    metadata = {"render.modes": ["human"]}

    def __init__(self, length=0.3, duration=5.0):
        """
        Args:
            length (float): The total linear distance (meters) we want to move.
            duration (float): Duration (seconds) over which the MJT completes.
        """
        super(LinearEnv, self).__init__()

        # ---------- Physical & simulation parameters ----------
        self.m = 1.0       # mass (kg)
        self.b = 0.2       # damping coefficient (N·s/m)
        self.dt = 0.02     # time step (s)

        # ---------- Environment-specific parameters ----------
        self.L = length    # target final position
        self.T = duration  # time to complete MJT

        # ---------- Define Action & Observation Spaces ----------
        # Action: single value = external force (bounded here as an example)
        self.action_space = spaces.Box(low=-10.0, high=10.0, shape=(1,), dtype=np.float32)
        # Observation: [x, x_dot]
        # We'll allow x to span [-1, 2] just as a wide example (you can adjust)
        obs_high = np.array([2.0, np.finfo(np.float32).max], dtype=np.float32)
        self.observation_space = spaces.Box(low=-obs_high, high=obs_high, dtype=np.float32)

        # ---------- Internal State Variables ----------
        self.state = None         # [x, x_dot]
        self.time = 0.0           # track total simulation time
        self.cable_force = 0.0    # external force (if desired)
        self.last_action = 0.0    # last applied force

        # ---------- Data Recording ----------
        self.time_history = []
        self.human_force_history = []
        self.cable_force_history = []
        self.damping_force_history = []
        self.total_force_history = []

        # Which forces to plot (user can set this)
        # Options: ['human', 'cable', 'damping', 'total']
        self.forces_to_plot = ['human', 'cable']

        # ---------- Viewer/Rendering ----------
        self.viewer = None
        self.fig = None
        self.ax_main = None
        self.ax_force = None
        self.dot_line = None
        self.target_point = None
        self.force_arrow = None
        self.x_text = None
        self.force_text = None

    def set_forces_to_plot(self, force_list):
        """
        Set which force components to plot. 
        force_list should be chosen from ['human', 'cable', 'damping', 'total'].
        Example: env.set_forces_to_plot(['human', 'damping', 'total'])
        """
        valid_forces = {'human', 'cable', 'damping', 'total'}
        self.forces_to_plot = [f for f in force_list if f in valid_forces]

    def step(self, action):
        """
        Run one timestep of the simulation.
        
        action: [F] (the user-supplied force)
        """
        x, x_dot = self.state
        # Clip the force to the action space
        F_human = np.clip(action[0], self.action_space.low[0], self.action_space.high[0])

        # Equation of motion (no gravity):
        # m * x_ddot = F_human - b*x_dot + cable_force
        x_ddot = (F_human - self.b * x_dot + self.cable_force) / self.m

        # Update state using Euler integration
        x_dot = x_dot + x_ddot * self.dt
        x = x + x_dot * self.dt

        # (Optional) position constraints, e.g., x >= 0 or x <= L
        # Here we just allow free movement; you could clamp or bounce if desired
        # e.g.:
        # if x < 0:
        #    x = 0
        #    x_dot = 0

        # Update internal states
        self.state = np.array([x, x_dot], dtype=np.float32)
        self.time += self.dt
        self.last_action = F_human

        # Record data
        force_dict = self.calculate_forces()
        self.time_history.append(self.time)
        self.human_force_history.append(force_dict['human'])
        self.cable_force_history.append(force_dict['cable'])
        self.damping_force_history.append(force_dict['damping'])
        self.total_force_history.append(force_dict['total'])

        # Example: you could compute a reward measuring how close we are to the MJT reference
        x_ref = self.target_trajectory(self.time)
        reward = -abs(x - x_ref)  # negative distance from target

        # Decide if done
        done = (self.time >= self.T + 2.0)  # e.g., end after T+2 seconds

        return self.state, reward, done, {}

    def reset(self, initial_state=None):
        """
        Reset the environment to an initial state.
        If initial_state is provided, it should be [x, x_dot].
        Otherwise, start from zero.
        """
        if initial_state is not None:
            self.state = np.array(initial_state, dtype=np.float32)
        else:
            self.state = np.array([0.0, 0.0], dtype=np.float32)

        self.time = 0.0
        self.cable_force = 0.0
        self.last_action = 0.0

        # Clear stored data
        self.time_history = []
        self.human_force_history = []
        self.cable_force_history = []
        self.damping_force_history = []
        self.total_force_history = []

        return self.state

    def render(self, mode='human'):
        """
        Render the 1D motion in a Matplotlib figure.
        Plots the dot along the x-axis, plus a force subplot.
        """
        if self.viewer is None:
            plt.ion()
            # Create a figure with two subplots
            self.fig, (self.ax_main, self.ax_force) = plt.subplots(2, 1, figsize=(6, 8))
            self.fig.subplots_adjust(hspace=0.3)

            # Main subplot: the line and a dot
            self.dot_line, = self.ax_main.plot([], [], 'ro', markersize=8, label='Mass')
            self.target_point, = self.ax_main.plot([], [], 'bo', markersize=6, label='MJT Target')
            self.force_arrow = self.ax_main.arrow(0, 0, 0, 0, head_width=0.05, head_length=0.1,
                                                  fc='g', ec='g')

            self.ax_main.set_xlim(-0.1, self.L + 0.4)
            self.ax_main.set_ylim(-0.5, 0.5)
            self.ax_main.set_xlabel('X (m)')
            self.ax_main.set_ylabel('Position')
            self.ax_main.legend()
            self.ax_main.set_title('Linear Motion Simulation')
            self.viewer = True

        # Update main subplot
        x, x_dot = self.state
        x_ref = self.target_trajectory(self.time)

        self.dot_line.set_data(x, 0)
        self.target_point.set_data(x_ref, 0)

        # (Optional) show text about current x
        if self.x_text is None:
            self.x_text = self.ax_main.text(0.02, 0.9, '', transform=self.ax_main.transAxes)
        self.x_text.set_text(f"x = {x:.3f} m")

        # Update the force arrow length to represent the last action
        self.force_arrow.remove()
        arrow_scale = 0.02  # scale the arrow length
        self.force_arrow = self.ax_main.arrow(
            x, 0, self.last_action * arrow_scale, 0,
            head_width=0.05, head_length=0.1,
            fc='g', ec='g'
        )

        # Update force subplot
        self.ax_force.clear()
        self.ax_force.set_xlabel('Time (s)')
        self.ax_force.set_ylabel('Force (N)')
        self.ax_force.set_title('Forces over Time')

        if len(self.time_history) > 0:
            # Build a dict for plotting
            force_data_dict = {
                'human':   (self.human_force_history, 'g-', 'Human'),
                'cable':   (self.cable_force_history, 'r-', 'Cable'),
                'damping': (self.damping_force_history, 'm-', 'Damping'),
                'total':   (self.total_force_history, 'b-', 'Total'),
            }
            for name in self.forces_to_plot:
                data, style, label = force_data_dict[name]
                self.ax_force.plot(self.time_history, data, style, label=label)
            self.ax_force.legend()

            # Show text with current forces
            current_forces = self.calculate_forces()
            text_str = ""
            for name in self.forces_to_plot:
                text_str += f"{name.capitalize()} = {current_forces[name]:.2f} N\n"
            if self.force_text is None:
                self.force_text = self.ax_force.text(0.05, 0.8, '', transform=self.ax_force.transAxes)
            self.force_text.set_text(text_str.strip())

        self.fig.canvas.draw()
        self.fig.canvas.flush_events()

    def close(self):
        if self.viewer:
            plt.ioff()
            plt.close(self.fig)
            self.viewer = None

    def target_trajectory(self, t):
        """
        Defines a minimum-jerk trajectory from x=0 to x=self.L over self.T seconds.
        If t >= self.T, returns the final position self.L.
        """
        if t >= self.T:
            return self.L
        else:
            tau = t / self.T
            # Minimum-jerk polynomial: x(t) = L * (10*tau^3 - 15*tau^4 + 6*tau^5)
            return self.L * (10*tau**3 - 15*tau**4 + 6*tau**5)

    def calculate_forces(self):
        """
        Calculate the force components at the current state.
        Returns a dict with keys: ['human', 'cable', 'damping', 'total'].
        """
        x, x_dot = self.state
        F_human = self.last_action
        F_cable = self.cable_force
        F_damping = -self.b * x_dot
        F_total = F_human + F_cable + F_damping

        return {
            'human': F_human,
            'cable': F_cable,
            'damping': F_damping,
            'total': F_total
        }

    def plot_data(self):
        """
        After the simulation ends, plot the recorded time-history of chosen forces for offline analysis.
        """
        plt.figure(figsize=(6, 4))
        force_data_dict = {
            'human':   (self.human_force_history, 'g-', 'Human'),
            'cable':   (self.cable_force_history, 'r-', 'Cable'),
            'damping': (self.damping_force_history, 'm-', 'Damping'),
            'total':   (self.total_force_history, 'b-', 'Total')
        }

        for name in self.forces_to_plot:
            data, style, label = force_data_dict[name]
            plt.plot(self.time_history, data, style, label=label)

        plt.xlabel('Time (s)')
        plt.ylabel('Force (N)')
        plt.title('Forces over Time (Post-Simulation)')
        plt.legend()
        plt.grid(True)
        plt.show()


#### Example Usage

In [16]:
# import matplotlib
# from linear_env import LinearEnv  # or wherever your LinearEnv class is saved

if __name__ == "__main__":
    # Create the linear environment with desired length and duration
    env = LinearEnv(length=0.3, duration=5.0)

    # Choose which forces to visualize: e.g. human, cable, total, damping
    env.set_forces_to_plot(['human', 'cable'])

    # Reset to initial state [x=0.0, x_dot=0.0]
    initial_state = [0.0, 0.0]
    state = env.reset(initial_state=initial_state)

    # Simple proportional (P) controller gains
    Kp = 5 
    # You could also add derivative or integral components as needed

    for _ in range(300):
        # Get the current time's target position (minimum jerk from 0 to env.L)
        x_ref = env.target_trajectory(env.time)
        x_current, x_dot_current = state

        # Simple P-control on the position error
        error = x_ref - x_current
        human_force = Kp * error

        # Optionally set a constant external cable force
        env.cable_force = -0.5

        # Step the environment
        action = [human_force]
        state, reward, done, _ = env.step(action)

        # Render environment with chosen force lines
        env.render()

        if done:
            break

    env.close()

    # Plot saved data after simulation
    print("Simulation ended. Backend:", matplotlib.get_backend())
    # env.plot_data()


Simulation ended. Backend: Qt5Agg


### Human
The `Human` class simulates human interaction with an environment using an impedance control model. The class allows for customizable parameters like proportional gain (`Kp`) and a force schedule to represent external forces applied via cables. The class also logs trajectory data during the simulation.

In [175]:
class Human:
    def __init__(self, force_schedule=None, Kp=10.0):
        self.Kp = Kp
        self.force_schedule = force_schedule
        self.trajectory_data = {
            'time': [],
            'theta': [],
            'theta_dot': [],  
            'human_torque': [],
            'cable_force': []
        }

    def impedance_torque(self, target_theta, current_theta):
        error = target_theta - current_theta
        human_torque = self.Kp * error  # Only Proportional control, Damping is handled by the environment
        return human_torque

    def simulate(self, env, cable_schedule, duration=10, render=False):
        state = env.reset(initial_state=[0.0, 0.0])

        self.trajectory_data = {
            'time': [],
            'theta': [],
            'theta_dot': [],
            'human_torque': [],
            'cable_force': []
        }   
        steps = int(duration / env.dt)
        for step in range(steps):
            env.time = step * env.dt  
            target_theta = env.target_trajectory(env.time)
            current_theta, current_theta_dot = state

            human_torque = self.impedance_torque(target_theta, current_theta)

            env.cable_force = cable_schedule.get_force(step, steps)

            # Apply torque as action
            action = [human_torque]
            state, done, _ = env.step(action)

            # Render environment if desired
            if render:
                env.render()

            # Store data for analysis
            self.trajectory_data['time'].append(env.time)
            self.trajectory_data['theta'].append(current_theta)
            self.trajectory_data['theta_dot'].append(current_theta_dot)
            self.trajectory_data['human_torque'].append(human_torque)
            self.trajectory_data['cable_force'].append(env.cable_force)

            if done:
                break

        return self.trajectory_data


### Cable Force Schedule

The CableForceSchedule class manages the application of cable forces during a simulation episode. It provides customizable force schedules, including linear, step function, and constant modes. The class calculates the cable force based on the current simulation step, the total number of steps, and the selected schedule type.

In [None]:
class CableForceSchedule:
    """A class to manage cable force adjustments during the episode."""
    def __init__(self, initial_force=0.0, max_force=40.0, schedule_type='linear'):
        self.initial_force = initial_force
        self.max_force = max_force
        self.schedule_type = schedule_type

    def get_force(self, current_step, steps):
        """Return cable force based on current step and chosen schedule."""
        if self.schedule_type == 'constant':
            # Default to constant force
            return self.initial_force
        else:
            raise ValueError("Invalid schedule type. Choose from 'linear', 'step_function', or 'constant'.")

# Simulation and Visualization
### Human participant

In [375]:
env = PendulumEnv()
env.set_torques_to_plot(['human', 'cable'])

Superman = Human(Kp=500.0)
John = Human(Kp=50.0)
Charile = Human(Kp=5.0)

constant_0 = CableForceSchedule(initial_force=0.0, max_force=10.0, schedule_type='constant')
constant_10 = CableForceSchedule(initial_force=10.0, max_force=10.0, schedule_type='constant')
constant_40 = CableForceSchedule(initial_force=40.0, max_force=10.0, schedule_type='constant')

# Superman.simulate(env, constant_0, duration=7, render=True
# Charile.simulate(env, constant_40, duration=7, render=True)
John.simulate(env, constant_10, duration=7, render=True)

# # Now we can plot Superman's results
# data = Superman.trajectory_data
# fig, ax = plt.subplots(figsize=(10, 6))

# ax.plot(data['time'], data['theta'], label='Theta')
# ax.plot(data['time'], data['human_torque'], label='Human Torque', color='r')
# ax.plot(data['time'], data['cable_force'], label='Cable Force', color='g')

# ax.set_title("Superman's Simulation Results")
# ax.set_xlabel('Time [s]')
# ax.legend()
# plt.tight_layout()
# plt.show()


{'time': [0.02,
  0.04,
  0.06,
  0.08,
  0.1,
  0.12000000000000001,
  0.13999999999999999,
  0.16,
  0.18,
  0.19999999999999998,
  0.22,
  0.24,
  0.26,
  0.28,
  0.30000000000000004,
  0.32,
  0.34,
  0.36000000000000004,
  0.38,
  0.4,
  0.42000000000000004,
  0.44,
  0.46,
  0.48000000000000004,
  0.5,
  0.52,
  0.54,
  0.56,
  0.5800000000000001,
  0.6,
  0.62,
  0.64,
  0.66,
  0.68,
  0.7000000000000001,
  0.7200000000000001,
  0.74,
  0.76,
  0.78,
  0.8,
  0.8200000000000001,
  0.8400000000000001,
  0.86,
  0.88,
  0.9,
  0.92,
  0.9400000000000001,
  0.9600000000000001,
  0.98,
  1.0,
  1.02,
  1.04,
  1.06,
  1.08,
  1.1,
  1.12,
  1.1400000000000001,
  1.1600000000000001,
  1.18,
  1.2,
  1.22,
  1.24,
  1.26,
  1.28,
  1.3,
  1.32,
  1.34,
  1.36,
  1.3800000000000001,
  1.4000000000000001,
  1.4200000000000002,
  1.44,
  1.46,
  1.48,
  1.5,
  1.52,
  1.54,
  1.56,
  1.58,
  1.6,
  1.62,
  1.6400000000000001,
  1.6600000000000001,
  1.6800000000000002,
  1.7,
  1.72,
  

## Beysian Optimization

In [430]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern, ConstantKernel
from scipy.stats import norm

# ====================================
# Your environment and human are set up
# ====================================
env = PendulumEnv()
human = Superman  # You can change this to John, Charile, or any other human object as needed

def run_simulation_and_get_error(cable_force_value, duration=7.0):
    cable_schedule = CableForceSchedule(
        initial_force=cable_force_value, 
        max_force=cable_force_value, 
        schedule_type='constant'
    )
    data = human.simulate(env, cable_schedule, duration=duration, render=False)

    thetas = np.array(data['theta'])
    times = np.array(data['time'])
    target_thetas = np.array([env.target_trajectory(t) for t in times])

    x_human = env.L * np.sin(thetas)
    y_human = -env.L * np.cos(thetas)

    x_target = env.L * np.sin(target_thetas)
    y_target = -env.L * np.cos(target_thetas)

    position_errors = (x_human - x_target)**2 + (y_human - y_target)**2
    error = np.mean(position_errors)
    return error

# Objective function now includes a penalty on the cable force effort.
def objective(**kwargs):
    cable_force = kwargs['cable_force_value']
    error = run_simulation_and_get_error(cable_force)

    # Normalize error and penalty weights
    normalized_error = error / 0.1  # Scale factor for error
    normalized_force = cable_force / 40.0  # Normalize to max force
    weight = 0.8  # Weight for penalty vs error

    cost = weight * normalized_error + (1 - weight) * normalized_force
    print("Normalized Error:", normalized_error)
    print("Normalized Force:", normalized_force)
    print("Cost:", cost)
    return cost


# ====================
# Bayesian Optimization Setup
# ====================
bounds = [(-40.0, 40.0)]
n_initial = 1
n_iterations = 7

# Initial random samples
X_init = np.random.uniform(bounds[0][0], bounds[0][1], size=(n_initial, 1))
y_init = np.array([objective(cable_force_value=x[0]) for x in X_init])

X = X_init.copy()
y = y_init.copy()

# ====================
# Define Acquisition Function (Expected Improvement)
# ====================
def expected_improvement(X_new, X, y, model, xi=0.01):
    mu, sigma = model.predict(X_new, return_std=True)
    y_min = np.min(y)

    with np.errstate(divide='warn'):
        Z = (y_min - mu - xi) / sigma
        ei = (y_min - mu - xi) * norm.cdf(Z) + sigma * norm.pdf(Z)
        ei[sigma == 0.0] = 0.0
    return ei

def propose_next_point(acquisition, X, y, model, bounds):
    X_candidates = np.linspace(bounds[0][0], bounds[0][1], 200).reshape(-1, 1)
    ei_values = acquisition(X_candidates, X, y, model)
    best_index = np.argmax(ei_values)
    return X_candidates[best_index].reshape(-1, 1), np.max(ei_values)

# ====================
# Callback and Plotting
# ====================
fig, (ax_gp, ax_acq) = plt.subplots(2, 1, figsize=(6, 8))
plt.ion()
plt.tight_layout(pad=3.0)

def optimization_callback(iteration, X, y, model):
    print(f"Iteration {iteration}: Best so far: cable_force={X[np.argmin(y)][0]:.4f}, cost={np.min(y):.6f}")

    ax_gp.clear()
    ax_acq.clear()

    X_plot = np.linspace(bounds[0][0], bounds[0][1], 200).reshape(-1, 1)
    mu, std = model.predict(X_plot, return_std=True)

    # Plot GP mean and ±1 std
    ax_gp.plot(X_plot, mu, 'b-', label='GP Mean')
    ax_gp.fill_between(X_plot.ravel(), mu - std, mu + std, alpha=0.2, color='blue', label='GP ±1σ')

    # Plot observed data
    ax_gp.scatter(X, y, c='r', label="Observed Data")

    # Title and labels
    ax_gp.set_title("GP estimate, GP uncertainty, and Data")
    ax_gp.set_xlabel("Cable Force")
    ax_gp.set_ylabel("Cost (Normalized)")
    ax_gp.legend()

    # Plot acquisition function
    ei_values = expected_improvement(X_plot, X, y, model)
    ax_acq.plot(X_plot, ei_values, 'g-', label='Expected Improvement')
    ax_acq.set_title("Acquisition Function (EI)")
    ax_acq.set_xlabel("Cable Force")
    ax_acq.set_ylabel("Expected Improvement")
    ax_acq.legend()

    plt.pause(0.5)

# ====================
# Main Optimization Loop
# ====================
kernel = ConstantKernel(1.0) * Matern(length_scale=10.0, nu=2.5)
model = GaussianProcessRegressor(kernel=kernel, alpha=1e-3, normalize_y=False, 
                                 n_restarts_optimizer=5, random_state=42)

for i in range(n_initial, n_iterations):
    model.fit(X, y)
    optimization_callback(i, X, y, model)

    # Compute next point
    X_next, ei_value = propose_next_point(expected_improvement, X, y, model, bounds)
    y_next = objective(cable_force_value=X_next[0][0])

    # Append new data
    X = np.vstack((X, X_next))
    y = np.append(y, y_next)

model.fit(X, y)
optimization_callback(n_iterations, X, y, model)

best_idx = np.argmin(y)
print("Optimization completed.")
print("Optimal cable force:", X[best_idx][0])
print("Minimum cost achieved:", y[best_idx])

# Re-run the simulation with the optimal cable force and optionally render it
optimal_cable_force = X[best_idx][0]
final_cable_schedule = CableForceSchedule(initial_force=optimal_cable_force, 
                                          max_force=optimal_cable_force, 
                                          schedule_type='constant')
final_data = human.simulate(env, final_cable_schedule, duration=7.0, render=True)
# env.close()


Normalized Error: 0.00028465065456253233
Normalized Force: 0.9086301582906969
Cost: 0.18195375218178939
Iteration 1: Best so far: cable_force=36.3452, cost=0.181954
Normalized Error: 0.01567905375777583
Normalized Force: -1.0
Cost: -0.18745675699377928
Iteration 2: Best so far: cable_force=-40.0000, cost=-0.187457




Normalized Error: 0.015501361818025958
Normalized Force: -0.9899497487437185
Cost: -0.18558886029432292
Iteration 3: Best so far: cable_force=-40.0000, cost=-0.187457
Normalized Error: 0.01036824729699958
Normalized Force: -0.6683417085427136
Cost: -0.12537374387094302
Iteration 4: Best so far: cable_force=-40.0000, cost=-0.187457
Normalized Error: 0.01567905375777583
Normalized Force: -1.0
Cost: -0.18745675699377928
Iteration 5: Best so far: cable_force=-40.0000, cost=-0.187457
Normalized Error: 0.01567905375777583
Normalized Force: -1.0
Cost: -0.18745675699377928
Iteration 6: Best so far: cable_force=-40.0000, cost=-0.187457
Normalized Error: 0.01567905375777583
Normalized Force: -1.0
Cost: -0.18745675699377928
Iteration 7: Best so far: cable_force=-40.0000, cost=-0.187457
Optimization completed.
Optimal cable force: -40.0
Minimum cost achieved: -0.18745675699377928
