#  创建自定义Gym环境

按照 OpenAI Gym 界面使用自己的环境。完成后，可以在该环境中轻松使用稳定基线中的任何兼容（取决于动作空间）RL 算法。

## 使用 Pip 安装依赖项和稳定基线3

In [1]:
!pip install stable-baselines3



## Gym接口的第一步

In [2]:
import gymnasium as gym

env = gym.make("CartPole-v1")

# Box(4,) means that it is a Vector with 4 components
print("Observation space:", env.observation_space)
print("Shape:", env.observation_space.shape)
# Discrete(2) means that there is two discrete actions
print("Action space:", env.action_space)

# The reset method is called at the beginning of an episode
obs, info = env.reset()
# Sample a random action
action = env.action_space.sample()
print("Sampled action:", action)
obs, reward, terminated, truncated, info = env.step(action)
# Note the obs is a numpy array
# info is an empty dict for now but can contain any debugging info
# reward is a scalar
print(obs.shape, reward, terminated, truncated, info)

Observation space: Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
Shape: (4,)
Action space: Discrete(2)
Sampled action: 1
(4,) 1.0 False False {}


## Gym env 框架

In [3]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces


class GoLeftEnv(gym.Env):
    """
    Custom Environment that follows gym interface.
    This is a simple env where the agent must learn to go always left.
    """

    # Because of google colab, we cannot implement the GUI ('human' render mode)
    metadata = {"render_modes": ["console"]}

    # Define constants for clearer code
    LEFT = 0
    RIGHT = 1

    def __init__(self, grid_size=10, render_mode="console"):
        super(GoLeftEnv, self).__init__()
        self.render_mode = render_mode

        # Size of the 1D-grid
        self.grid_size = grid_size
        # Initialize the agent at the right of the grid
        self.agent_pos = grid_size - 1

        # Define action and observation space
        # They must be gym.spaces objects
        # Example when using discrete actions, we have two: left and right
        n_actions = 2
        self.action_space = spaces.Discrete(n_actions)
        # The observation will be the coordinate of the agent
        # this can be described both by Discrete and Box space
        self.observation_space = spaces.Box(
            low=0, high=self.grid_size, shape=(1,), dtype=np.float32
        )

    def reset(self, seed=None, options=None):
        """
        Important: the observation must be a numpy array
        :return: (np.array)
        """
        super().reset(seed=seed, options=options)
        # Initialize the agent at the right of the grid
        self.agent_pos = self.grid_size - 1
        # here we convert to float32 to make it more general (in case we want to use continuous actions)
        return np.array([self.agent_pos]).astype(np.float32), {}  # empty info dict

    def step(self, action):
        if action == self.LEFT:
            self.agent_pos -= 1
        elif action == self.RIGHT:
            self.agent_pos += 1
        else:
            raise ValueError(
                f"Received invalid action={action} which is not part of the action space"
            )

        # Account for the boundaries of the grid
        self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)

        # Are we at the left of the grid?
        terminated = bool(self.agent_pos == 0)
        truncated = False  # we do not limit the number of steps here

        # Null reward everywhere except when reaching the goal (left of the grid)
        reward = 1 if self.agent_pos == 0 else 0

        # Optionally we can pass additional info, we are not using that for now
        info = {}

        return (
            np.array([self.agent_pos]).astype(np.float32),
            reward,
            terminated,
            truncated,
            info,
        )

    def render(self):
        # agent is represented as a cross, rest as a dot
        if self.render_mode == "console":
            print("." * self.agent_pos, end="")
            print("x", end="")
            print("." * (self.grid_size - self.agent_pos))

    def close(self):
        pass

## 验证自定义环境

Stable Baselines3 提供了一个帮助程序来检查您的环境是否遵循 Gym 界面。它还可以选择检查环境是否与稳定基线兼容（并在必要时发出警告）。

In [4]:
from stable_baselines3.common.env_checker import check_env

In [6]:
env = GoLeftEnv()
check_env(env, warn=True)

## 测试自定义环境

In [9]:
env = GoLeftEnv(grid_size=10)

obs, _ = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

GO_LEFT = 0
n_steps = 20
for step in range(n_steps):
    print(f"Step {step + 1}")
    obs, reward, terminated, truncated, info = env.step(GO_LEFT)
    done = terminated or terminated
    print("obs=", obs, "reward=", reward, "done=", done)
    env.render()
    if done:
        print("Goal reached!", "reward=", reward)
        break

.........x.
Box(0.0, 10.0, (1,), float32)
Discrete(2)
1
Step 1
obs= [8.] reward= 0 done= False
........x..
Step 2
obs= [7.] reward= 0 done= False
.......x...
Step 3
obs= [6.] reward= 0 done= False
......x....
Step 4
obs= [5.] reward= 0 done= False
.....x.....
Step 5
obs= [4.] reward= 0 done= False
....x......
Step 6
obs= [3.] reward= 0 done= False
...x.......
Step 7
obs= [2.] reward= 0 done= False
..x........
Step 8
obs= [1.] reward= 0 done= False
.x.........
Step 9
obs= [0.] reward= 1 done= True
x..........
Goal reached! reward= 1


## 尝试使用stable baselines3

In [15]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

vec_env = make_vec_env(GoLeftEnv, n_envs=1, env_kwargs=dict(grid_size=10))

In [16]:
model = A2C("MlpPolicy", vec_env, verbose=1).learn(5000)

Using cpu device
-------------------------------------
| time/                 |           |
|    fps                | 581       |
|    iterations         | 100       |
|    time_elapsed       | 0         |
|    total_timesteps    | 500       |
| train/                |           |
|    entropy_loss       | -0.692    |
|    explained_variance | -119      |
|    learning_rate      | 0.0007    |
|    n_updates          | 99        |
|    policy_loss        | -0.000661 |
|    value_loss         | 1.16e-06  |
-------------------------------------
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 25.5     |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 729      |
|    iterations         | 200      |
|    time_elapsed       | 1        |
|    total_timesteps    | 1000     |
| train/                |          |
|    entropy_loss       | -0.361   |
|    explained_variance | 0.0631   |
|    le

In [20]:
obs = vec_env.reset()
n_steps = 20
for step in range(n_steps):
    action, _ = model.predict(obs, deterministic=True)
    print(f"Step {step + 1}")
    print("Action:", action)
    obs, reward, done, info = vec_env.step(action)
    print("obs=", obs, "reward=", reward, "done=", done)
    vec_env.render()
    if done:
        print("Goal reched!", "reward=", reward)
        break

Step 1
Action: [0]
obs= [[8.]] reward= [0.] done= [False]
........x..
Step 2
Action: [0]
obs= [[7.]] reward= [0.] done= [False]
.......x...
Step 3
Action: [0]
obs= [[6.]] reward= [0.] done= [False]
......x....
Step 4
Action: [0]
obs= [[5.]] reward= [0.] done= [False]
.....x.....
Step 5
Action: [0]
obs= [[4.]] reward= [0.] done= [False]
....x......
Step 6
Action: [0]
obs= [[3.]] reward= [0.] done= [False]
...x.......
Step 7
Action: [0]
obs= [[2.]] reward= [0.] done= [False]
..x........
Step 8
Action: [0]
obs= [[1.]] reward= [0.] done= [False]
.x.........
Step 9
Action: [0]
obs= [[9.]] reward= [1.] done= [ True]
.........x.
Goal reched! reward= [1.]
