In [4]:
# for autoformatting
import jupyter_black

jupyter_black.load()

In [6]:
import gymnasium as gym
from stable_baselines3 import A2C, SAC, PPO, TD3

In [11]:
import os

# Create save dir
save_dir = "/tmp/gym/"
os.makedirs(save_dir, exist_ok=True)

model = PPO("MlpPolicy", "Pendulum-v1", verbose=0).learn(8_000)
# The model will be saved under PPO_tutorial.zip
model.save(f"{save_dir}/PPO_tutorial")

# sample an observation from the environment
obs = model.env.observation_space.sample()

# Check prediction before saving
print("pre saved", model.predict(obs, deterministic=True))

# del model  # delete trained model to demonstrate loading

loaded_model = PPO.load(f"{save_dir}/PPO_tutorial")
# Check that the prediction is the same after loading (for the same observation)
print("loaded", loaded_model.predict(obs, deterministic=True))

pre saved (array([0.3014052], dtype=float32), None)
loaded (array([0.3014052], dtype=float32), None)


In [10]:
# The loading function can also update the model's class variables when loading.

import os
from stable_baselines3.common.vec_env import DummyVecEnv

# Create save dir
save_dir = "/tmp/gym/"
os.makedirs(save_dir, exist_ok=True)

model = A2C("MlpPolicy", "Pendulum-v1", verbose=0, gamma=0.9, n_steps=20).learn(8000)
# The model will be saved under A2C_tutorial.zip
model.save(f"{save_dir}/A2C_tutorial")

del model  # delete trained model to demonstrate loading

# load the model, and when loading set verbose to 1
loaded_model = A2C.load(f"{save_dir}/A2C_tutorial", verbose=1)

# show the save hyperparameters
print(f"loaded: gamma={loaded_model.gamma}, n_steps={loaded_model.n_steps}")

# as the environment is not serializable, we need to set a new instance of the environment
loaded_model.set_env(DummyVecEnv([lambda: gym.make("Pendulum-v1")]))
# and continue training
loaded_model.learn(8_000)

loaded: gamma=0.9, n_steps=20
-------------------------------------
| time/                 |           |
|    fps                | 1582      |
|    iterations         | 100       |
|    time_elapsed       | 1         |
|    total_timesteps    | 2000      |
| train/                |           |
|    entropy_loss       | -1.46     |
|    explained_variance | -0.000236 |
|    learning_rate      | 0.0007    |
|    n_updates          | 499       |
|    policy_loss        | -62.3     |
|    std                | 1.04      |
|    value_loss         | 1.29e+03  |
-------------------------------------
-------------------------------------
| time/                 |           |
|    fps                | 1539      |
|    iterations         | 200       |
|    time_elapsed       | 2         |
|    total_timesteps    | 4000      |
| train/                |           |
|    entropy_loss       | -1.46     |
|    explained_variance | -0.000144 |
|    learning_rate      | 0.0007    |
|    n_updates      

<stable_baselines3.a2c.a2c.A2C at 0x11cf3f620>

### Gym and VecEnv wrappers

https://stable-baselines.readthedocs.io/en/master/guide/custom_env.html

https://gymnasium.farama.org/api/wrappers/

**Anatomy of a gym wrapper**: A gym wrapper follows the gym interface.  It has a `reset()` and `step()` method. We can access the environment with `self.env`, without modifying the original env.

In [17]:
class CustomWrapper(gym.Wrapper):
    """:param env : (gym.Env) Gym environment that will be wrapped"""

    def __init__(self, env):
        super().__init__(env)

    def reset(self, **kwargs):
        """reset the environment"""
        obs, info = self.env.reset(**kwargs)
        return obs, info

    def step(step, action):
        """
        :param action: ([float or int) action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward, is this a final state (episode finished),
        is the max number ofsteps reached (episode finished artificially), additional informations
        """
        obs, reward, terminated, truncated, info = self.env.step(action)
        return obs, reward, terminated, truncated, info

#### 1. Example - `TimeLimitWrapper` limit the episode length
* In practice, gym already have a wrapper for that named `TimeLimit (gym.wrappers.TimeLimit)` that is used by most environments.""

In [18]:
# wrapper to limit the number of steps by episode,
# need to overwrite the done signal when the limit is reached.
# and pass that information in the info dictionary.
class TimeLimitWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    :param max_steps: (int) Max number of steps per episode
    """

    def __init__(self, env, max_steps=100):
        # call the parent constructor, so we can access self.env later
        super().__init__(env)
        self.max_steps = max_steps
        self.current_steps = 0

    def reset(self, **kwargs):
        """reset the environment"""
        self.current_step = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """
        :param action: ([float or int) action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward, is this a final state (episode finished),
        is the max number ofsteps reached (episode finished artificially), additional informations
        """
        self.current_step += 1
        obs, reward, terminated, truncated, info = self.env.step(action)
        # Overwrite the truncation signal when when the number of steps reaches the maximum
        if self.current_step >= self.max_steps:
            truncated = True
        return obs, reward, terminated, truncated, info

In [21]:
# testing
from gymnasium.envs.classic_control.pendulum import PendulumEnv

# Here we create the environment directly because gym.make() already wrap the environment in a TimeLimit wrapper otherwise
env = PendulumEnv()

obs, _ = env.reset()
done = False
n_steps = 0
while not done and n_steps < 5000:
    # Take random actions
    random_action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(random_action)
    done = terminated or truncated
    n_steps += 1
print("Without the TimeWrapper")
print(n_steps, info)


# Wrap the environment
env = TimeLimitWrapper(env, max_steps=100)

obs, _ = env.reset()
done = False
n_steps = 0
while not done:
    # Take random actions
    random_action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(random_action)
    done = terminated or truncated
    n_steps += 1
print("\nWith the TimeWrapper")
print(n_steps, info)

Without the TimeWrapper
5000 {}

With the TimeWrapper
100 {}


#### 2. Example - `NormalizeActionWrapper` normalize observations and actions before giving it to the agent.
* In practice, gym already have a wrapper for that named `TimeLimit (gym.wrappers.TimeLimit)` that is used by most environments.""

In [27]:
import numpy as np


class NormalizeActionWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    """

    def __init__(self, env):
        # retrieve the action space
        action_space = env.action_space
        assert isinstance(action_space, gym.spaces.Box)  # this is for continuous action
        # retrieve max/min values
        self.low, self.high = action_space.low, action_space.high
        # modify the action space to [-1, 1]
        env.action_space = gym.spaces.Box(
            low=-1, high=1, shape=action_space.shape, dtype=np.float32
        )
        # call the parent constructor
        super().__init__(env)

    def rescale_action(self, scaled_action):
        """
        Rescale the action from [-1, 1] to [low, high]
        (no need for symmetric action space)
        :param scaled_action: (np.ndarray)
        :return: (np.ndarray)
        """
        return self.low + (0.5 * (scaled_action + 1.0) * (self.high - self.low))

    def reset(self, **kwargs):
        """
        Reset the environment
        """
        return self.env.reset(**kwargs)

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float,bool, bool, dict) observation, reward, final state? truncated?, additional informations
        """
        # Rescale action from [-1, 1] to original [low, high] interval
        rescaled_action = self.rescale_action(action)
        obs, reward, terminated, truncated, info = self.env.step(rescaled_action)
        return obs, reward, terminated, truncated, info

In [29]:
## testing
# before rescaling

original_env = gym.make("Pendulum-v1")
print("before rescaling")
print(original_env.action_space.low)
for _ in range(10):
    print(original_env.action_space.sample())

# after rescaling
print("\n\nafter rescaling")
env = NormalizeActionWrapper(gym.make("Pendulum-v1"))

print(env.action_space.low)

for _ in range(10):
    print(env.action_space.sample())

before rescaling
[-2.]
[-0.80088085]
[-0.8478717]
[0.37158832]
[0.6830635]
[0.26569128]
[0.33604035]
[-0.9614514]
[-1.5475844]
[-0.4128491]
[0.17539002]


after rescaling
[-1.]
[0.70716596]
[-0.7957447]
[0.33592948]
[-0.5850199]
[-0.16451477]
[-0.40088794]
[-0.6694457]
[-0.39624718]
[-0.09851905]
[-0.38071966]


In [32]:
## testing with a RL algorithm

from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv

env = Monitor(gym.make("Pendulum-v1"))
env = DummyVecEnv([lambda: env])

model = A2C("MlpPolicy", env, verbose=0).learn(int(10000))

mean_reward, std_reward = evaluate_policy(
    model, env, n_eval_episodes=100, warn=False, deterministic=True
)
print("before rescaling")
print(f"mean_reward: {mean_reward:.2f} +/- {std_reward:.2f}")


# with the action wrapper
normalized_env = Monitor(gym.make("Pendulum-v1"))
# Note that we can use multiple wrappers
normalized_env = NormalizeActionWrapper(normalized_env)
normalized_env = DummyVecEnv([lambda: normalized_env])
model_2 = A2C("MlpPolicy", normalized_env, verbose=0).learn(int(10000))


mean_reward, std_reward = evaluate_policy(
    model_2, env, n_eval_episodes=100, warn=False, deterministic=True
)
print("\n\nafter rescaling")
print(f"mean_reward: {mean_reward:.2f} +/- {std_reward:.2f}")

before rescaling
mean_reward: -1506.35 +/- 240.18


after rescaling
mean_reward: -1652.65 +/- 182.49


### Monitor Wrapper

In [72]:
# wrapper to monitor the training progress,
# storing both the episode reward (sum of reward for one episode) and episode length (number of steps in for the last episode).
# return those values using the info dict after each end of episode.


class MyMonitorWrapper(gym.Wrapper):
    """
    :param env: (gym.Env) Gym environment that will be wrapped
    :param max_steps: (int) Max number of steps per episode
    """

    def __init__(self, env, **kwargs):
        # call the parent constructor, so we can access self.env later
        self.episode_rewards = []
        self.episode_steps = 0
        super().__init__(env, **kwargs)

    def reset(self, **kwargs):
        """reset the environment"""
        self.current_step = 0
        self.episode_rewards = []
        self.episode_steps = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """
        :param action: ([float or int) action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward, is this a final state (episode finished),
        is the max number ofsteps reached (episode finished artificially), additional informations
        """
        self.current_step += 1
        obs, reward, done, info = self.env.step(action)
        print(type(info))
        self.episode_rewards += reward
        self.episode_steps += 1
        print(self.episode_rewards, self.episode_steps)
        # if done
        if done:
            sum_of_episode_rewards = sum(self.episode_rewards)
            number_of_episode_steps = self.episode_steps
            info["sum_of_episode_rewards"], info["number_of_episode_steps"] = (
                sum_of_episode_rewards,
                number_of_episode_steps,
            )
            print(self.episode_rewards, self.episode_steps)
        return obs, reward, terminated, truncated, info

In [73]:
env = gym.make("CartPole-v1")
model = A2C("MlpPolicy", env, verbose=0).learn(int(1000))

# === YOUR CODE HERE ===#
# Wrap the environment
env = gym.make("Pendulum-v1")
env = MyMonitorWrapper(env)
env = DummyVecEnv([lambda: env])
# Reset the environment
env.reset()

array([[ 0.6963424 ,  0.7177097 , -0.64042455]], dtype=float32)

In [74]:
from stable_baselines3.common.base_class import BaseAlgorithm


def evaluate(
    model: BaseAlgorithm, num_episodes: int = 100, deterministic: bool = True
) -> float:
    """
    Evaluate an RL agent for `num_episodes`.
    :param model: the RL Agent
    :param_env: the gym Environment
    :param num_episodes: the number of episodes to evaluate it
    :param deterministic: whether to use deterministic or stochastic actions
    :return: Mean reward for the last `num_episodes`
    """
    # this works for a single environment
    vec_env = model.get_env()
    obs = vec_env.reset()
    all_episode_rewards = []
    for episode in range(num_episodes):
        episode_rewards = []
        done = False
        while not done:
            # _states are only useful if using LSTM policites
            # `deterministic` is to use deterministic actions
            action, _states = model.predict(obs, deterministic=deterministic)
            # actions, rewards, dones are arrays
            # because we are using a vectorized env
            obs, reward, done, _info = vec_env.step(action)
            episode_rewards.append(reward)
        print(f"The {_info=}\n at the end of {episode=}\n")
        all_episode_rewards.append(sum(episode_rewards))
    mean_episode_reward = np.mean(all_episode_rewards)


evaluate(model, num_episodes=2, deterministic=True)

The _info=[{'episode': {'r': 21.0, 'l': 21, 't': 1.53533}, 'TimeLimit.truncated': False, 'terminal_observation': array([ 0.09843981, -0.1932801 , -0.22030337, -0.45737815], dtype=float32)}]
 at the end of episode=0

The _info=[{'episode': {'r': 20.0, 'l': 20, 't': 1.544396}, 'TimeLimit.truncated': False, 'terminal_observation': array([ 0.05870731,  0.01871654, -0.21634758, -0.6021475 ], dtype=float32)}]
 at the end of episode=1



In [78]:
obs = env.reset()
action, *_ = model.predict(obs, deterministic=True)

ValueError: Error: Unexpected observation shape (1, 3) for Box environment, please use (4,) or (n_env, 4) for the observation shape.

In [76]:
env.step(action)[2]

ValueError: too many values to unpack (expected 4)

In [77]:
x = env.step(action)
x[4]

ValueError: too many values to unpack (expected 4)

#### Going further - Saving format 

The format for saving and loading models is a zip-archived JSON dump and NumPy zip archive of the arrays:
```
saved_model.zip/
├── data              JSON file of class-parameters (dictionary)
├── parameter_list    JSON file of model parameters and their ordering (list)
├── parameters        Bytes from numpy.savez (a zip file of the numpy arrays). ...
    ├── ...           Being a zip-archive itself, this object can also be opened ...
        ├── ...       as a zip-archive and browsed.
```

#### Save and find 