In [1]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.env_util import make_vec_env

In [2]:
class GoLeftEnv(gym.Env):
    metadata = {"render_modes": ["console"]}

    LEFT = 0
    RIGHT = 1

    def __init__(self, grid_size=10, render_mode="console"):
        super(GoLeftEnv, self).__init__()
        self.render_mode = render_mode

        self.grid_size = grid_size
        self.agent_pos = grid_size - 1

        n_actions = 2
        self.action_space = spaces.Discrete(n_actions)

        self.observation_space = spaces.Box(
            low=0,
            high=self.grid_size,
            shape=(1,),
            dtype=np.float32
        )

    def reset(self, seed=None, options=None):
        super().reset(seed=seed, options=options)

        self.agent_pos = self.grid_size - 1

        return np.array([self.agent_pos]).astype(np.float32), {}

    def step(self, action):
        if action == self.LEFT:
            self.agent_pos -= 1
        elif action == self.RIGHT:
            self.agent_pos += 1
        else:
            raise ValueError(f"Received inavlid action={action}")

        self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)

        terminated = bool(self.agent_pos == 0)
        truncated = False

        reward = 1 if self.agent_pos == 0 else 0

        info = {}

        return (
            np.array([self.agent_pos]).astype(np.float32),
            reward,
            terminated,
            truncated,
            info,
        )

    def render(self):
        if self.render_mode == "console":
            print("." * self.agent_pos, end="")
            print("X", end="")
            print("." * (self.grid_size - self.agent_pos - 1))

    def close(self):
        pass

In [3]:
env = GoLeftEnv()
check_env(env, warn=True)

In [4]:
env = GoLeftEnv(grid_size=10)

obs, _ = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

GO_LEFT = 0
n_steps = 20
for step in range(n_steps):
    print(f"Step {step + 1}")
    obs, reward, terminated, truncated, info = env.step(GO_LEFT)
    done = terminated or truncated
    print("obs=", obs, "reward=", reward, "done=", done)
    env.render()
    if done:
        print("Goal reached!", "reward=", reward)
        break

.........X
Box(0.0, 10.0, (1,), float32)
Discrete(2)
1
Step 1
obs= [8.] reward= 0 done= False
........X.
Step 2
obs= [7.] reward= 0 done= False
.......X..
Step 3
obs= [6.] reward= 0 done= False
......X...
Step 4
obs= [5.] reward= 0 done= False
.....X....
Step 5
obs= [4.] reward= 0 done= False
....X.....
Step 6
obs= [3.] reward= 0 done= False
...X......
Step 7
obs= [2.] reward= 0 done= False
..X.......
Step 8
obs= [1.] reward= 0 done= False
.X........
Step 9
obs= [0.] reward= 1 done= True
X.........
Goal reached! reward= 1


In [5]:
seed = int(np.random.randint(0, np.iinfo(np.uint32).max, dtype=np.uint32))
vec_env = make_vec_env(GoLeftEnv, n_envs=1, env_kwargs=dict(grid_size=10), seed=seed)

In [6]:
model = A2C("MlpPolicy", env,  verbose=1).learn(5000)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 19.2     |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 363      |
|    iterations         | 100      |
|    time_elapsed       | 1        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -0.103   |
|    explained_variance | -0.59    |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | -0.00364 |
|    value_loss         | 0.0271   |
------------------------------------
-------------------------------------
| rollout/              |           |
|    ep_len_mean        | 12.6      |
|    ep_rew_mean        | 1         |
| time/                 |           |
|    fps                | 452       |
|    iterations         | 200       |
|    time_

In [7]:
obs = vec_env.reset()
n_steps = 20
for step in range(n_steps):
    action, _ = model.predict(obs, deterministic=True)
    print(f"Step {step + 1}")
    print("Action: ", action)
    obs, reward, done, info = vec_env.step(action)
    print("obs=", obs, "reward=", reward, "done=", done)
    vec_env.render()
    if done:
        print("Goal reached!", "reward=", reward)
        break

Step 1
Action:  [0]
obs= [[8.]] reward= [0.] done= [False]
........X.
Step 2
Action:  [0]
obs= [[7.]] reward= [0.] done= [False]
.......X..
Step 3
Action:  [0]
obs= [[6.]] reward= [0.] done= [False]
......X...
Step 4
Action:  [0]
obs= [[5.]] reward= [0.] done= [False]
.....X....
Step 5
Action:  [0]
obs= [[4.]] reward= [0.] done= [False]
....X.....
Step 6
Action:  [0]
obs= [[3.]] reward= [0.] done= [False]
...X......
Step 7
Action:  [0]
obs= [[2.]] reward= [0.] done= [False]
..X.......
Step 8
Action:  [0]
obs= [[1.]] reward= [0.] done= [False]
.X........
Step 9
Action:  [0]
obs= [[9.]] reward= [1.] done= [ True]
.........X
Goal reached! reward= [1.]
