In [91]:
import torch
from stable_baselines3 import PPO
import gymnasium as gym
import numpy as np
import imageio

In [92]:
class CustomRewardWrapper(gym.Wrapper):
    def __init__(self, env):
        super(CustomRewardWrapper, self).__init__(env)

    def step(self, action):
        obs, reward, done, truncated, info = self.env.step(action)
        # Access relevant state and action variables
        
        #state
        z = obs[0]                  # Height of the robot
        a = obs[1]                  # Angle of the robot
        a_hip = obs[2]              # Angle of the hip
        a_knee = obs[3]             # Angle of the knee
        a_ankle = obs[4]            # Angle of the ankle
        v_x = obs[5]                # Velocity in x direction
        v_z = obs[6]                # Velocity in z direction
        a_d = obs[7]                # Angular velocity 
        a_hip_d = obs[8]            # Angular velocity of the hip
        a_knee_d = obs[9]           # Angular velocity of the knee
        a_ankle_d = obs[10]         # Angular velocity of the ankle
        
        #action
        torque_hip = action[0]      # Torque applied to the hip
        torque_knee = action[1]     # Torque applied to the knee
        torque_ankle = action[2]    # Torque applied to the ankle

        #different criteria for reward
        energy_used = np.sum(np.square(action))  # Simplistic energy calculation

        # Custom reward logic
        custom_reward = np.tanh(a) * 0.5                           # Reward for positive angles
        custom_reward += 5*np.exp(-(a-(2*np.pi))**2/(2*np.pi))      # Reward for 2pi angle
        custom_reward -= energy_used * 0.1                          # Penalize energy consumption

        # used by openai
        # backroll = -obs[7]
        # height = obs[0]
        # vel_act = action[0] * obs[8] + action[1] * obs[9] + a[2] * obs[10]
        # backslide = -obs[5]
        # reward = backroll * (1.0 + .3 * height + .1 * vel_act + .05 * backslide)

        if done:
            custom_reward -= 8  # Heavy penalty for falling

        return obs, custom_reward, done, truncated, info

In [93]:
healthy_reward = 0.8
healthy_z_range = (0.2, float("inf"))
healthy_angle_range = (-np.pi//4, 2*np.pi+np.pi//4)
reset_noise_scale = 5e-3
exclude_current_positions_from_observation = True

env = gym.make('Hopper-v4', render_mode='rgb_array', healthy_reward=healthy_reward, healthy_z_range=healthy_z_range, healthy_angle_range=healthy_angle_range, reset_noise_scale=reset_noise_scale, exclude_current_positions_from_observation=exclude_current_positions_from_observation)
env = CustomRewardWrapper(env)

model = PPO("MlpPolicy", env, verbose=1)

# Train the model
n_learning_steps = 300000
model.learn(total_timesteps=n_learning_steps)

# Save the model
model.save("hopper")

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 18.2     |
|    ep_rew_mean     | -43.5    |
| time/              |          |
|    fps             | 1058     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 25.1        |
|    ep_rew_mean          | -43.2       |
| time/                   |             |
|    fps                  | 768         |
|    iterations           | 2           |
|    time_elapsed         | 5           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.016985249 |
|    clip_fraction        | 0.209       |
|    clip_range           | 0.2         |
|    entropy_loss   

KeyboardInterrupt: 

In [94]:
vec_env = model.get_env()
obs = vec_env.reset()

writer = imageio.get_writer('hopper-flip.mp4', fps=100)


N_step = 100000
for i in range(N_step):
    action, _state = model.predict(obs, deterministic=True)
    obs, reward, done, info = vec_env.step(action)
    writer.append_data(vec_env.render("rgb_array"))
    #VecEnv resets automatically
    if done:
      print("Episode finished after {} timesteps".format(i+1))
      break
      obs = vec_env.reset()

writer.close()

Episode finished after 64 timesteps
