In [6]:
import numpy as np
import gym
from stable_baselines3.common.env_checker import check_env
from csv import DictReader

class FSMEnv(gym.Env):
  """
  Custom Environment that follows gym interface.
  This is a simple env where the agent must learn to go always left. 
  """
  # Because of google colab, we cannot implement the GUI ('human' render mode)
  metadata = {'render.modes': ['console']}

  def __init__(self, file_name, start_state_id=None):
    super(FSMEnv, self).__init__()

    # The name of the .csv file containing the FSM
    self.file_name = file_name
    self.start_state_id = start_state_id

    if start_state_id is None:
      preset_start_state = False
    else:
      preset_start_state = True
      self.start_state_id = start_state_id

    if preset_start_state == False:  
      start_state_already_set = False
      multiple_start_states = False

      with open(self.file_name, 'r') as file:
        # pass the file object to DictReader() to get the DictReader object
        csv_dict_reader = DictReader(file)
        # iterate over each line as a ordered dictionary
        for row in csv_dict_reader:
          # row variable is a dictionary that represents a row in csv
          if row['Start state'] == 1:
            if start_state_already_set == False:
              start_state_already_set = True
              self.start_state_id = row['Unique ID']
            else:
              multiple_start_states = True
              start_states = []
              start_states.append(row['Unique ID'])
      
        if multiple_start_states == True:
          print("Multiple start states have been found!")
          correct_start_state_provided = False

          while correct_start_state_provided == False:
            print("Please choose one of the found start states:")
            for id in start_states:
              print(id)

            start_state_id = input("Provide an unique ID: ")
            if start_state_id not in start_states:
              print("ID provided not in the list!")
            
            else:
              self.start_state_id = start_state_id
              correct_start_state_provided = True            

    # Define action and observation space
    # They must be gym.spaces objects
    # Example when using discrete actions, we have two: left and right
    n_actions = 2
    self.action_space = spaces.Discrete(n_actions)
    # The observation will be the coordinate of the agent
    # this can be described both by Discrete and Box space
    self.observation_space = 0

  def reset(self):
    """
    Important: the observation must be a numpy array
    :return: (np.array) 
    """
    # Initialize the agent at the right of the grid
    self.agent_pos = self.grid_size - 1
    # here we convert to float32 to make it more general (in case we want to use continuous actions)
    return np.array([self.agent_pos]).astype(np.float32)

  def step(self, action):
    if action == self.LEFT:
      self.agent_pos -= 1
    elif action == self.RIGHT:
      self.agent_pos += 1
    else:
      raise ValueError("Received invalid action={} which is not part of the action space".format(action))

    # Account for the boundaries of the grid
    self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)

    # Are we at the left of the grid?
    done = bool(self.agent_pos == 0)

    # Null reward everywhere except when reaching the goal (left of the grid)
    reward = 1 if self.agent_pos == 0 else 0

    # Optionally we can pass additional info, we are not using that for now
    info = {}

    return np.array([self.agent_pos]).astype(np.float32), reward, done, info

  def render(self, mode='console'):
    if mode != 'console':
      raise NotImplementedError()
    # agent is represented as a cross, rest as a dot
    print("." * self.agent_pos, end="")
    print("x", end="")
    print("." * (self.grid_size - self.agent_pos))

  def close(self):
    pass

In [2]:
env = gym.make('CartPole-v0')
env.reset()
for _ in range(1000):
    env.render()
    env.step(env.action_space.sample()) # take a random action
env.close()

KeyboardInterrupt: 

In [5]:
import gym

from stable_baselines3 import PPO

env = gym.make('CartPole-v0')

model = PPO('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=10000)

obs = env.reset()
for i in range(1000):
    action, _state = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    env.render()
    if done:
      obs = env.reset()

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 23.8     |
|    ep_rew_mean     | 23.8     |
| time/              |          |
|    fps             | 2697     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------


In [7]:
env = FSMEnv('pelican.csv')
# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)

{'Unique ID': '0', 'State name': 'S0', 'Start state': '1', 'Possible discrete actions (transitions)': '2', 'Transition names': '0 1', 'Transitions to states': '0 1'}
{'Unique ID': '1', 'State name': 'S1', 'Start state': '1', 'Possible discrete actions (transitions)': '2', 'Transition names': '0 1', 'Transitions to states': '4 2'}
{'Unique ID': '2', 'State name': 'S2', 'Start state': '0', 'Possible discrete actions (transitions)': '2', 'Transition names': '0 1', 'Transitions to states': '5 3'}
{'Unique ID': '3', 'State name': 'S3', 'Start state': '1', 'Possible discrete actions (transitions)': '2', 'Transition names': '0 1', 'Transitions to states': '4 2'}
{'Unique ID': '4', 'State name': 'S4', 'Start state': '0', 'Possible discrete actions (transitions)': '2', 'Transition names': '0 1', 'Transitions to states': '5 3'}
{'Unique ID': '5', 'State name': 'S5', 'Start state': '1', 'Possible discrete actions (transitions)': '2', 'Transition names': '0 1', 'Transitions to states': '0 1'}


NameError: name 'spaces' is not defined

In [3]:
env = GoLeftEnv(grid_size=10)

obs = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

GO_LEFT = 0
# Hardcoded best agent: always go left!
n_steps = 20
for step in range(n_steps):
  print("Step {}".format(step + 1))
  obs, reward, done, info = env.step(GO_LEFT)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render()
  if done:
    print("Goal reached!", "reward=", reward)
    break

.........x.
Box(0.0, 10.0, (1,), float32)
Discrete(2)
1
Step 1
obs= [8.] reward= 0 done= False
........x..
Step 2
obs= [7.] reward= 0 done= False
.......x...
Step 3
obs= [6.] reward= 0 done= False
......x....
Step 4
obs= [5.] reward= 0 done= False
.....x.....
Step 5
obs= [4.] reward= 0 done= False
....x......
Step 6
obs= [3.] reward= 0 done= False
...x.......
Step 7
obs= [2.] reward= 0 done= False
..x........
Step 8
obs= [1.] reward= 0 done= False
.x.........
Step 9
obs= [0.] reward= 1 done= True
x..........
Goal reached! reward= 1


In [4]:
from stable_baselines3 import DQN, PPO, A2C
from stable_baselines3.common.cmd_util import make_vec_env

# Instantiate the env
env = GoLeftEnv(grid_size=10)
# wrap it
env = make_vec_env(lambda: env, n_envs=1)



In [5]:
# Train the agent
model = PPO('MlpPolicy', env, verbose=1).learn(5000)

Using cpu device
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 138      |
|    ep_rew_mean      | 1        |
|    exploration rate | 0.05     |
| time/               |          |
|    episodes         | 4        |
|    fps              | 17244    |
|    time_elapsed     | 0        |
|    total timesteps  | 552      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 161      |
|    ep_rew_mean      | 1        |
|    exploration rate | 0.05     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 17428    |
|    time_elapsed     | 0        |
|    total timesteps  | 1290     |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 159      |
|    ep_rew_mean      | 1        |
|    exploration rate | 0.05     |
| time/               |          |
|  

In [6]:
# Test the trained agent
obs = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render(mode='console')
  if done:
    # Note that the VecEnv resets automatically
    # when a done signal is encountered
    print("Goal reached!", "reward=", reward)
    break

Step 1
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 2
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 3
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 4
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 5
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 6
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 7
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 8
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 9
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 10
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 11
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 12
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 13
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]
..........x
Step 14
Action:  [1]
obs= [[10.]] reward= [0.] done= [False]