In [148]:
import gymnasium as gym
import numpy as np
import upkie.envs


class PIDController:
    def __init__(self, kp, ki, kd, dt):
        self.kp = kp  # Proportional gain
        self.ki = ki  # Integral gain
        self.kd = kd  # Derivative gain
        self.last_error = 0
        self.integral = 0
        self.dt = dt

    def calculate_control(self, current_value):
        error = current_value
        self.integral += error * self.dt  # dt is the time step
        derivative = (error - self.last_error) / self.dt
        control_signal = (
            np.dot(self.kp, error) + np.dot(self.ki, self.integral) + np.dot(self.kd, derivative)
        )
        self.last_error = error
        return control_signal

    def reset(self):
        self.last_error = 0
        self.integral = 0


class PController:
    def __init__(self, kp, ki, kd, dt):
        self.kp = np.array([10., 1., 0., 1.])

    def calculate_control(self, current_value):
        control_signal = (
            np.dot(self.kp, current_value)
        )
        return control_signal

    def reset(self):
        pass


def P(x):
    kp = np.array([10., 1., 0., 1.])
    return np.dot(kp, x)


def push_balance_PID(env: upkie.envs.UpkieGroundVelocity, policy, force, check_msfos = False):
    torso_force_in_world = np.zeros(3)
    torso_force_in_world[0] = force
    bullet_action = {
        "external_forces": {
            "torso": {
                "force": torso_force_in_world,
                "local": False,
            }
        }
    }
    observation, _ = env.reset(seed = 42)
    for step in range(4_400):
        action = 0.0 * env.action_space.sample()
        action[0] = policy.calculate_control(observation)
        if step < 400 and step >= 200:
            env.unwrapped.bullet_extra(bullet_action)  # call before env.step
        
        observation, _, terminated, truncated, _ = env.step(action)
        
        if terminated or truncated:
            observation, _ = env.reset(seed = 42)
            policy.reset()
            if check_msfos:
                return False

    return True


In [149]:
def MSFOS(env, policy):
    force_0 = .1
    force_1 = 1.
    while push_balance(env, policy, force_1, check_msfos = True):
        force_0 = force_1
        force_1 *= 2
    
    max_ = force_1
    min_ = force_0
    force_1 = (max_ + min_) / 2
    
    while max_ - min_ > 1e-1:
        if push_balance(env, policy, force_1, check_msfos = True):
            min_ = force_1
            force_1 = (force_1 + max_) / 2
        else:
            max_ = force_1
            force_1 = (force_1 + min_) / 2
    
    return force_1

In [150]:
upkie.envs.register()
env = gym.make("UpkieGroundVelocity-v3", frequency=200.0)
policy = PIDController(np.array([20., 1., 0.1, 0.1]), np.array([.0, .0, 0, .0]), np.array([.1, 0., .1, .00]), 1 / 200.)
print(MSFOS(env, policy))

TimeoutError: Spine did not process request within 100.0 ms, is it stopped?

In [151]:
def push_balance(env: upkie.envs.UpkieServos, policy, force, check_msfos = False):
    torso_force_in_world = np.zeros(3)
    torso_force_in_world[0] = force
    bullet_action = {
        "external_forces": {
            "torso": {
                "force": torso_force_in_world,
                "local": False,
            }
        }
    }
    observation, _ = env.reset(seed = 42)
    for step in range(4_000):
        action, _ = policy.predict(observation, deterministic=True)
        if step < 400 and step >= 200:
            env.unwrapped.bullet_extra(bullet_action)  # call before env.step
        
        observation, _, terminated, truncated, _ = env.step(action)
        
        if terminated or truncated:
            observation, _ = env.reset(seed = 42)
            if check_msfos:
                return False

    return True

In [168]:
import sys
sys.path.append("/Users/andrei/Desktop/ppo_balancer")
sys.path.append("/Users/andrei/Desktop/ppo_balancer/ppo_balancer")

import gin
config_path = "/Users/andrei/Desktop/ppo_balancer/ppo_balancer/policy/operative_config.gin"
gin.parse_config_file(config_path)

from settings import EnvSettings, PPOSettings, TrainingSettings
from envs import make_ppo_balancer_env, make_ppo_balancer_env_servos
#from train import CustomUpkieServos, UpkieServosWrapper
from stable_baselines3 import PPO

upkie.envs.register()

ppo_settings = PPOSettings()
env_settings = EnvSettings()
init_state = None

# parent process: trainer
agent_frequency = env_settings.agent_frequency

velocity_env = upkie.envs.UpkieServos(
            frequency=agent_frequency,
            regulate_frequency=False,
            spine_config=env_settings.spine_config,
        )

# Wrap the environment with the UpkieServosWrapper
velocity_env_flat = UpkieServosWrapper(velocity_env)

env = make_ppo_balancer_env_servos(velocity_env_flat, env_settings, training=True)

'''
with gym.make(
        env_settings.env_id,
        frequency=env_settings.agent_frequency,
        init_state=init_state,
        max_ground_velocity=env_settings.max_ground_velocity,
        regulate_frequency=True,
        spine_config=env_settings.spine_config,
    ) as velocity_env:
        env = make_ppo_balancer_env(
            velocity_env,
            env_settings,
            training=False,
        )

'''



policy = PPO(
    "MlpPolicy",
    env,
    policy_kwargs={
        "net_arch": {
            "pi": ppo_settings.net_arch_pi,
            "vf": ppo_settings.net_arch_vf,
        },
    },
    verbose=0,
)

policy.set_parameters("/Users/andrei/Desktop/ppo_balancer/training/2024-12-06/prostemmate_1/final") # Itza, toyish

RuntimeError: Error(s) in loading state_dict for ActorCriticPolicy:
	size mismatch for log_std: copying a param with shape torch.Size([12]) from checkpoint, the shape in current model is torch.Size([2]).
	size mismatch for mlp_extractor.policy_net.0.weight: copying a param with shape torch.Size([16, 460]) from checkpoint, the shape in current model is torch.Size([16, 70]).
	size mismatch for mlp_extractor.value_net.0.weight: copying a param with shape torch.Size([32, 460]) from checkpoint, the shape in current model is torch.Size([32, 70]).
	size mismatch for action_net.weight: copying a param with shape torch.Size([12, 16]) from checkpoint, the shape in current model is torch.Size([2, 16]).
	size mismatch for action_net.bias: copying a param with shape torch.Size([12]) from checkpoint, the shape in current model is torch.Size([2]).

In [None]:
print(MSFOS(env, policy))
#push_balance(env, policy, 0.)

In [152]:
import gymnasium

class UpkieServosWrapper(gymnasium.Wrapper):
    """!
    Wrapper for the UpkieServos environment that converts actions 
    and observations to NumPy ndarrays.

    This wrapper simplifies the interaction with the UpkieServos environment
    by converting the dictionary-based actions and observations into 
    flattened NumPy ndarrays. This can be useful for compatibility with 
    algorithms that expect ndarray inputs.
    """

    def __init__(self, env: gymnasium.Env):
        """!
        Initialize the wrapper.

        Args:
            env: The UpkieServos environment to wrap.
        """
        super().__init__(env)

        # Determine the size of the flattened action and observation arrays
        self.action_dim = 2
        self.observation_dim = 3 + 2

        # Determine the lower and upper bounds for the flattened action and observation spaces
        action_low = np.array([0, -1,])#-1, 0, -1])
        action_high = np.array([1., 1.])# 1, 1.01, 1])
        observation_low = np.concatenate([
            np.array([-1.26, -16., -28.8])
            for i in range(1)
        ] + [np.array([-2., -28.8])])
        observation_high = np.concatenate([
            np.array([1.26, 28.8])
            for i in range(1)
        ] + [np.array([2., 500., 28.8])])


        # Define the new action and observation spaces
        self.action_space = gymnasium.spaces.Box(
            low=action_low, high=action_high, shape=(self.action_dim,), dtype=np.float64
        )
        self.observation_space = gymnasium.spaces.Box(
            low=observation_low,
            high=observation_high,
            shape=(self.observation_dim,),
            dtype=np.float64,
        )

    def reset(self, seed=None, options=None):
        """!
        Reset the environment and return the observation as an ndarray.

        Returns:
            The initial observation as an ndarray.
        """
        obs_dict, info = self.env.reset(seed=seed, options=options)
        obs_dict["pitch"] = info["spine_observation"]["base_orientation"]["pitch"]
        obs_dict["velocity"] = info["spine_observation"]["wheel_odometry"]["velocity"]
        obs_dict["pitch_vel"] = np.linalg.norm(info["spine_observation"]["imu"]["angular_velocity"])
        
        return self._flatten_observation(obs_dict), info

    def step(self, action: np.ndarray):
        """!
        Take a step in the environment using an ndarray action.

        Args:
            action: The action to take as an ndarray.

        Returns:
            A tuple containing the next observation as an ndarray,
            the reward, a boolean indicating if the episode is done,
            a boolean indicating if the episode is truncated,
            and a dictionary containing extra information.
        """
        action_dict = self._unflatten_action(action)
        obs_dict, reward, terminated, truncated, info = self.env.step(
            action_dict
        )
        
        obs_dict["pitch"] = info["spine_observation"]["base_orientation"]["pitch"]
        obs_dict["velocity"] = info["spine_observation"]["wheel_odometry"]["velocity"]
        obs_dict["pitch_vel"] = np.linalg.norm(info["spine_observation"]["imu"]["angular_velocity"])
        reward = self.get_reward(obs_dict, action_dict, pos = info["spine_observation"]["wheel_odometry"]["position"], height = info["spine_observation"]["sim"]["base"]["position"][2])

        if info["spine_observation"]["sim"]["base"]["position"][2] < 0.2 or abs(obs_dict["left_knee"]["position"]) > 2:
            terminated = True
        
        return (
            self._flatten_observation(obs_dict),
            reward,
            terminated,
            truncated,
            info,
        )

    def flatten_action(self, action_dict: dict) -> np.ndarray:
        """!
        Flatten the dictionary-based action into an ndarray.
    
        Args:
            action_dict: The action dictionary.
    
        Returns:
            The flattened action as an ndarray.
        """
        flat_action = np.array([action_dict["left_knee"]["position"], action_dict["left_wheel"]["velocity"]])

        return flat_action

    def _flatten_observation(self, obs_dict: dict) -> np.ndarray:
        """!
        Flatten the dictionary-based observation into an ndarray.

        Args:
            obs_dict: The observation dictionary.

        Returns:
            The flattened observation as an ndarray.
        """
        flat_obs = []
        for joint_obs_key in obs_dict.keys():
            if joint_obs_key != "pitch" and joint_obs_key != "velocity" and joint_obs_key != "pitch_vel":
                if joint_obs_key == "left_knee":    
                    for obs_key in obs_dict[joint_obs_key].keys():
                        obs_value = obs_dict[joint_obs_key][obs_key]
                        if obs_key != "temperature" and obs_key != "voltage" and obs_key != "torque":
                            flat_obs.append(np.array(obs_value))
            else:
                flat_obs.append(np.ones(1) * obs_dict[joint_obs_key])

        
        return np.concatenate(flat_obs)

    def _unflatten_action(self, action_ndarray: np.ndarray) -> dict:
        """!
        Unflatten the ndarray action into a dictionary-based action.

        Args:
            action_ndarray: The action as an ndarray.

        Returns:
            The unflattened action as a dictionary.
        """
        action_dict = {}
        i = 0
        for joint_name, joint_space in self.env.action_space.spaces.items():
            action_dict[joint_name] = {}
            if joint_name == "left_hip":
                action_dict[joint_name]["velocity"] = 0
                action_dict[joint_name]["feedforward_torque"] = 0
                action_dict[joint_name]["kd_scale"] = 1.
                action_dict[joint_name]["kp_scale"] = 1.
                action_dict[joint_name]["position"] = - action_ndarray[0] / 2
            elif joint_name == "left_knee":
                action_dict[joint_name]["velocity"] = 0
                action_dict[joint_name]["feedforward_torque"] = 0
                action_dict[joint_name]["kd_scale"] = 1.
                action_dict[joint_name]["kp_scale"] = 1.
                action_dict[joint_name]["position"] = action_ndarray[0]
            elif joint_name == "left_wheel":
                action_dict[joint_name]["position"] = np.nan
                action_dict[joint_name]["feedforward_torque"] = 0
                action_dict[joint_name]["kd_scale"] = 1.
                action_dict[joint_name]["kp_scale"] = 1.
                action_dict[joint_name]["velocity"] = action_ndarray[1] * 28.8
            elif joint_name == "right_hip":
                action_dict[joint_name]["position"] = action_ndarray[0] / 2
                action_dict[joint_name]["feedforward_torque"] = 0
                action_dict[joint_name]["kd_scale"] = 1.
                action_dict[joint_name]["kp_scale"] = 1.
                action_dict[joint_name]["velocity"] = 0
            elif joint_name == "right_knee":
                action_dict[joint_name]["position"] = -action_ndarray[0]
                action_dict[joint_name]["feedforward_torque"] = 0
                action_dict[joint_name]["kd_scale"] = 1.
                action_dict[joint_name]["kp_scale"] = 1.
                action_dict[joint_name]["velocity"] = 0
            elif joint_name == "right_wheel":
                action_dict[joint_name]["position"] = np.nan
                action_dict[joint_name]["feedforward_torque"] = 0
                action_dict[joint_name]["kd_scale"] = 1.
                action_dict[joint_name]["kp_scale"] = 1.
                action_dict[joint_name]["velocity"] = -action_ndarray[1] * 28.8
        return action_dict
    
    # Override the method that calculates the reward.
    def get_reward(self, observation: dict, action: dict, pos: float, height: float) -> float:
        """!
        Get reward from observation and action.

        \param observation Environment observation.
        \param action Environment action.
        \return Reward.
        """
        estimated_pitch = observation["pitch"]
        estimated_ground_position = pos
        estimated_ground_velocity = observation["velocity"]
        estimated_height = height
        estimated_angular_velocity = observation["pitch_vel"]


        tip_height = 0.58  # [m]  # This might need adjustment for the Upkie robot
        tip_position = estimated_ground_position + tip_height * np.sin(estimated_pitch)
        tip_velocity = estimated_ground_velocity + tip_height * estimated_angular_velocity * np.cos(estimated_pitch)

        std_position = 0.05  # [m]
        position_reward = np.exp(-((tip_position / std_position) ** 2))
        velocity_penalty = -abs(tip_velocity)

        position_weight = 1.0  # You can adjust these weights
        velocity_weight = 0.1

        ## Shaping
        wheel_velocity_penalty = -abs(observation["left_wheel"]["velocity"])
        wheel_velocity_weight = 0.1
        height_reward = estimated_height > 0.3
        height_weight = 0.0

        return 0.5 + position_weight * position_reward + velocity_weight * velocity_penalty + height_reward * height_weight + wheel_velocity_penalty * wheel_velocity_weight
