<a href="https://colab.research.google.com/github/phoenixSP/Gridworld-RL/blob/master/extra_credit_hw4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Stable Baselines only supports tensorflow 1.x for now
%tensorflow_version 1.x
!pip install stable-baselines[mpi]==2.10.0

In [0]:
import numpy as np
import gym
from gym import spaces


In [0]:
class GridWorld4x4(gym.Env):
  metadata = {'render.modes': ['console']}

  # Define constants for clearer code
  LEFT = 2
  RIGHT = 3
  UP = 0
  DOWN = 1

  def __init__(self, start, terminal_win, terminal_lose, obstacles):
    super(GridWorld4x4, self).__init__()
    self.actions = ["UP", "DOWN",  "LEFT", "RIGHT"] #is this required ???
    n_actions = 4


    self.grid_size = 4
    self.start = start
    self.agent_pos = self.start
    self.obstacles = obstacles
    self.determine = False
    self.action_space = spaces.Discrete(n_actions)
    self.terminal_states = [terminal_win, terminal_lose]

    self.observation_space = spaces.Box(low = 0, high = 5, shape = (4,4,1), dtype = np.float32)


  def reset(self):
    """
    Important: the observation must be a numpy array
    :return: (np.array) 
    """

    self.agent_pos = self.start   
    return self.generate_state()

  def _choose_action_probability(self, action):
    if action == 0:
        return np.random.choice(["UP", "LEFT", "RIGHT"], p=[0.8, 0.1, 0.1])
    if action == 1:
        return np.random.choice(["DOWN", "LEFT", "RIGHT"], p=[0.8, 0.1, 0.1])
    if action == 2:
        return np.random.choice(["LEFT", "UP", "DOWN"], p=[0.8, 0.1, 0.1])
    if action == 3:
        return np.random.choice(["RIGHT", "UP", "DOWN"], p=[0.8, 0.1, 0.1])

  def next_position(self, action):

    action = self._choose_action_probability(action)

    if action == "UP":
        nxtState = (self.agent_pos[0] - 1, self.agent_pos[1])
    elif action == "DOWN":
        nxtState = (self.agent_pos[0] + 1, self.agent_pos[1])
    elif action == "LEFT":
        nxtState = (self.agent_pos[0], self.agent_pos[1] - 1)
    elif action == "RIGHT":
        nxtState = (self.agent_pos[0], self.agent_pos[1] + 1)

    # if next state is legal
    if not self.is_obstacle_or_wall(nxtState):
      #print("Moving", nxtState)
      return nxtState

    #print("Not moving", self.agent_pos)
    return self.agent_pos

  def is_obstacle_or_wall(self, state):
    if state[0] < 0 or state[0] >= self.grid_size:
      return True
    
    if state[1] < 0 or state[1] >= self.grid_size:
      return True

    if state in self.obstacles:
      return True

    return False

  def is_terminal(self):
    
    if self.agent_pos in self.terminal_states:
      return True
    else:
      return False

  def generate_state(self):

    grid = np.zeros(( self.grid_size, self.grid_size, 1))
    grid[self.agent_pos[0], self.agent_pos[1], 0] = 1

    for obs in self.obstacles:
      grid[obs[0], obs[1], 0] = 2

    for i, state in enumerate(self.terminal_states):
      if i == 0:
        grid[state[0], state[1], 0] = 5
      else:
        grid[state[0], state[1], 0] = 4
    
    return grid.astype(np.float32)


  def step(self, action):
    self.agent_pos = self.next_position(action)
    next_state = self.generate_state()

    done = self.is_terminal()

    info = {}
    goal_reward = 1
    lose_reward = -1
    step_reward = -0.1
    if self.agent_pos == self.terminal_states[0]:
      reward = goal_reward
    elif self.agent_pos == self.terminal_states[1]:
      reward = lose_reward
    else:
      reward = step_reward

    return next_state, reward, done, info

  def render(self, mode='console'):
    if mode != 'console':
      raise NotImplementedError()

    grid = np.zeros(( self.grid_size, self.grid_size))
    for obs in self.obstacles:
      grid[obs[0], obs[1]] = 2

    for i, state in enumerate(self.terminal_states):
      if i == 0:
        grid[state[0], state[1]] = 5
      else:
        grid[state[0], state[1]] = 4
    grid[self.agent_pos[0], self.agent_pos[1]] = 1
    print("Agent's position:", self.agent_pos[0], self.agent_pos[1])
    print(grid)

  def close(self):
    pass  

In [7]:
from stable_baselines.common.env_checker import check_env

The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.



In [59]:
start = (3,0)
terminal_win = (2,3)
terminal_lose = (1,3)
obstacles = [(1,1)]
env = GridWorld4x4(start, terminal_win, terminal_lose, obstacles)

# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)



In [75]:
start = (3,0)
terminal_win = (2,3)
terminal_lose = (1,3)
obstacles = [(1,1)]
env = GridWorld4x4(start, terminal_win, terminal_lose, obstacles)

moves = [3,3,3,0]

for move in moves:
  print("Step {}".format(move + 1))
  obs, reward, done, info = env.step(move)
  print('reward=', reward, 'done=', done)
  env.render()
  if done:
    print("Goal reached!", "reward=", reward)
    break

Step 4
Moving (2, 0)
reward= 1 done= False
[[0. 0. 0. 0.]
 [0. 2. 0. 4.]
 [1. 0. 0. 5.]
 [0. 0. 0. 0.]]
Step 4
Moving (1, 0)
reward= 1 done= False
[[0. 0. 0. 0.]
 [1. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 0. 0. 0.]]
Step 4
Not moving (1, 0)
reward= 1 done= False
[[0. 0. 0. 0.]
 [1. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 0. 0. 0.]]
Step 1
Moving (0, 0)
reward= 1 done= False
[[1. 0. 0. 0.]
 [0. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 0. 0. 0.]]




In [108]:
from stable_baselines import DQN, PPO2, A2C, ACKTR
from stable_baselines.common.cmd_util import make_vec_env

# Instantiate the env
start = (3,0)
terminal_win = (2,3)
terminal_lose = (1,3)
obstacles = [(1,1)]
env = GridWorld4x4(start, terminal_win, terminal_lose, obstacles)
# wrap it
env = make_vec_env(lambda: env, n_envs=1)



In [109]:
model = ACKTR('MlpPolicy', env, verbose=1).learn(5000)

---------------------------------
| explained_variance | -0.0447  |
| fps                | 16       |
| nupdates           | 1        |
| policy_entropy     | 1.39     |
| policy_loss        | -1.3     |
| total_timesteps    | 20       |
| value_loss         | 1.14     |
---------------------------------
---------------------------------
| ep_len_mean        | 6.21     |
| ep_reward_mean     | 0.279    |
| explained_variance | 0.55     |
| fps                | 482      |
| nupdates           | 100      |
| policy_entropy     | 0.42     |
| policy_loss        | -0.0878  |
| total_timesteps    | 2000     |
| value_loss         | 0.0455   |
---------------------------------
----------------------------------
| ep_len_mean        | 5.23      |
| ep_reward_mean     | 0.477     |
| explained_variance | 0.748     |
| fps                | 597       |
| nupdates           | 200       |
| policy_entropy     | 0.0717    |
| policy_loss        | -0.000416 |
| total_timesteps    | 4000      |
| val

In [110]:
# Test the trained agent
obs = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  print('reward=', reward, 'done=', done)
  env.render(mode='console')
  if done:
    # Note that the VecEnv resets automatically when a done signal is encountered, so that final state is not printed
    print("Goal reached!", "reward=", reward)
    break

Step 1
Action:  [3]
reward= [-0.1] done= [False]
Agent's position: 3 1
[[0. 0. 0. 0.]
 [0. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 1. 0. 0.]]
Step 2
Action:  [3]
reward= [-0.1] done= [False]
Agent's position: 3 2
[[0. 0. 0. 0.]
 [0. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 0. 1. 0.]]
Step 3
Action:  [3]
reward= [-0.1] done= [False]
Agent's position: 3 3
[[0. 0. 0. 0.]
 [0. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 0. 0. 1.]]
Step 4
Action:  [0]
reward= [-0.1] done= [False]
Agent's position: 3 2
[[0. 0. 0. 0.]
 [0. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 0. 1. 0.]]
Step 5
Action:  [3]
reward= [-0.1] done= [False]
Agent's position: 3 2
[[0. 0. 0. 0.]
 [0. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 0. 1. 0.]]
Step 6
Action:  [3]
reward= [-0.1] done= [False]
Agent's position: 3 3
[[0. 0. 0. 0.]
 [0. 2. 0. 4.]
 [0. 0. 0. 5.]
 [0. 0. 0. 1.]]
Step 7
Action:  [0]
reward= [1.] done= [ True]
Agent's position: 3 0
[[0. 0. 0. 0.]
 [0. 2. 0. 4.]
 [0. 0. 0. 5.]
 [1. 0. 0. 0.]]
Goal reached! reward= [1.]
