<a href="https://colab.research.google.com/github/skywalker0803r/deep-learning-ian-goodfellow/blob/master/custom_gym_env.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Stable Baselines only supports tensorflow 1.x for now
%tensorflow_version 1.x
!pip install stable-baselines[mpi]==2.10.0

TensorFlow 1.x selected.


In [2]:
import numpy as np
import gym
from gym import spaces
from stable_baselines.common.env_checker import check_env
from stable_baselines import DQN, PPO2, A2C, ACKTR
from stable_baselines.common.cmd_util import make_vec_env
import pandas as pd

The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.



## Define the custom environment

In [3]:
class GoLeftEnv(gym.Env):
  """
  Minimal 1D grid world implementing the gym interface.
  The agent is rewarded only when it reaches position 0, so the best
  policy is to always move left.

  Positions are clipped to the closed range [0, grid_size], so the agent
  can occupy grid_size + 1 distinct cells.
  """
  # Only a text renderer is provided: a GUI ('human' mode) is not usable in google colab
  metadata = {'render.modes': ['console']}
  # Identifiers of the two discrete actions
  LEFT = 0
  RIGHT = 1

  def __init__(self, grid_size=10):
    """
    :param grid_size: (int) upper bound of the position range; the agent starts at grid_size - 1
    """
    super(GoLeftEnv, self).__init__()

    self.grid_size = grid_size
    # Starting cell, one step left of the upper clipping bound
    self.agent_pos = grid_size - 1

    # Spaces must be gym.spaces objects: two discrete actions (left / right) ...
    self.action_space = spaces.Discrete(2)
    # ... and a single float coordinate as the observation
    self.observation_space = spaces.Box(low=0, high=self.grid_size,
                                        shape=(1,), dtype=np.float32)

  def _observation(self):
    """
    Build the observation for the current position.
    :return: (np.array) float32 array of shape (1,)
    """
    # float32 keeps the observation compatible with the Box space declared above
    return np.array([self.agent_pos], dtype=np.float32)

  def reset(self):
    """
    Put the agent back on its starting cell.
    :return: (np.array) the initial observation
    """
    self.agent_pos = self.grid_size - 1
    return self._observation()

  def step(self, action):
    """
    Move the agent one cell left or right.
    :param action: (int) LEFT (0) or RIGHT (1)
    :return: (np.array, int, bool, dict) observation, reward, done, info
    :raises ValueError: if the action is neither LEFT nor RIGHT
    """
    if action == self.LEFT:
      delta = -1
    elif action == self.RIGHT:
      delta = 1
    else:
      raise ValueError("Received invalid action={} which is not part of the action space".format(action))

    # Keep the agent inside [0, grid_size]
    self.agent_pos = np.clip(self.agent_pos + delta, 0, self.grid_size)

    # The episode ends, with the only non-zero reward, on reaching position 0
    at_goal = bool(self.agent_pos == 0)
    reward = 1 if at_goal else 0

    # No extra diagnostic information is reported
    return self._observation(), reward, at_goal, {}

  def render(self, mode='console'):
    """
    Print the grid as one line: the agent is an 'x', every other cell a dot.
    :param mode: (str) only 'console' is supported
    :raises NotImplementedError: for any other mode
    """
    if mode != 'console':
      raise NotImplementedError()
    cells_left = "." * self.agent_pos
    cells_right = "." * (self.grid_size - self.agent_pos)
    print(cells_left + "x" + cells_right)

  def close(self):
    """Nothing to release."""
    pass
    

# Validate the environment

In [4]:
# Instantiate with the default grid size and run the Stable Baselines
# interface checker on it
env = GoLeftEnv()
check_env(env=env, warn=True)

# train

In [5]:
# Keep the raw environment under its own name: the factory closure must not
# capture `env`, because that name is rebound to the vectorized wrapper below.
base_env = GoLeftEnv(grid_size=10)
env = make_vec_env(lambda: base_env, n_envs=1)

In [6]:
# Build the DQN agent, then train it for 5000 timesteps
# (learn() returns the model itself)
model = DQN(policy='MlpPolicy', env=env, verbose=1)
model = model.learn(total_timesteps=5000)







Instructions for updating:
Use keras.layers.flatten instead.
Instructions for updating:
Please use `layer.__call__` method instead.

Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where








--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 100      |
| mean 100 episode reward | 1        |
| steps                   | 1168     |
--------------------------------------
--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 200      |
| mean 100 episode reward | 1        |
| steps                   | 2090     |
--------------------------------------
--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 300      |
| mean 100 episode reward | 1        |
| steps                   | 3014     |
--------------------------------------
--------------------------------------
| % time spent exp

In [7]:
# Roll out the trained agent deterministically for at most n_steps transitions
obs = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render(mode='console')
  if not done:
    continue
  # The VecEnv resets automatically when a done signal is encountered,
  # so the observation printed above already belongs to the next episode
  print("Goal reached!", "reward=", reward)
  break

Step 1
Action:  [0]
obs= [[8.]] reward= [0.] done= [False]
........x..
Step 2
Action:  [0]
obs= [[7.]] reward= [0.] done= [False]
.......x...
Step 3
Action:  [0]
obs= [[6.]] reward= [0.] done= [False]
......x....
Step 4
Action:  [0]
obs= [[5.]] reward= [0.] done= [False]
.....x.....
Step 5
Action:  [0]
obs= [[4.]] reward= [0.] done= [False]
....x......
Step 6
Action:  [0]
obs= [[3.]] reward= [0.] done= [False]
...x.......
Step 7
Action:  [0]
obs= [[2.]] reward= [0.] done= [False]
..x........
Step 8
Action:  [0]
obs= [[1.]] reward= [0.] done= [False]
.x.........
Step 9
Action:  [0]
obs= [[9.]] reward= [1.] done= [ True]
.........x.
Goal reached! reward= [1.]


# Collect data from the replay buffer

In [8]:
# Draw 3000 transitions from the trained agent's replay buffer
state, action, reward, next_state, done = model.replay_buffer.sample(3000)
# Show the shape of every component of the sampled batch
for batch in (state, action, reward, next_state, done):
  print(batch.shape)

(3000, 1)
(3000,)
(3000,)
(3000, 1)
(3000,)


# Neural network approach

In [9]:
from sklearn import neural_network
from sklearn.metrics import accuracy_score

'''
next_state = T(state,action)
reward = R(state,action)
done = D(state)
'''

def get_model(x,y):
  """
  Fit an MLP classifier on (x, y) and print its accuracy on that same data.

  Note that the printed score is a training accuracy, not a held-out one.

  :param x: feature matrix, one row per sample
  :param y: targets; a single-column (n, 1) array is flattened to (n,)
  :return: the fitted MLPClassifier
  """
  y = np.asarray(y)
  # A (n, 1) column is a single-output target; hand sklearn the 1-D form it
  # expects instead of letting it convert the column with a warning
  if y.ndim == 2 and y.shape[1] == 1:
    y = y.ravel()
  model = neural_network.MLPClassifier(hidden_layer_sizes=(128,128,128),activation='relu')
  model.fit(x,y)
  y_hat = model.predict(x)
  print(accuracy_score(y,y_hat))
  return model

# The transition model T and the reward model R share the same
# (state, action) feature matrix
x = np.hstack((state, action.reshape(-1, 1)))

# T model: (state, action) -> next_state
T = get_model(x, next_state)

# R model: (state, action) -> reward
R = get_model(x, reward)

# D model: state -> done
x, y = state, done
D = get_model(x, y)

  y = column_or_1d(y, warn=True)


1.0
1.0
0.9983333333333333


In [10]:
class GoLeftEnv(gym.Env):
  """
  Go-left grid world whose dynamics come from fitted models instead of
  hand-written rules.

  NOTE: this class reuses the name of the rule-based GoLeftEnv defined
  earlier in the notebook and shadows it once this cell has run.

  :param grid_size: (int) upper bound used when clipping the predicted position
  :param T: estimator whose predict() maps [[state, action]] to the next position
  :param R: estimator whose predict() maps [[state, action]] to the reward
  :param D: estimator whose predict() maps [[state]] to the done flag
  """
  metadata = {'render.modes': ['console']}
  LEFT = 0
  RIGHT = 1
  def __init__(self,grid_size=10,T=None,R=None,D=None):
    super(GoLeftEnv, self).__init__()
    self.grid_size = grid_size
    self.agent_pos = grid_size - 1
    n_actions = 2
    self.action_space = spaces.Discrete(n_actions)
    self.observation_space = spaces.Box(low=0, high=self.grid_size,
                                        shape=(1,), dtype=np.float32)
    
    # models
    self.T = T
    self.R = R
    self.D = D

  def reset(self):
    """
    Put the agent back on its starting cell.
    :return: (np.array) float32 observation of shape (1,)
    """
    self.agent_pos = self.grid_size - 1
    return np.array([self.agent_pos]).astype(np.float32)

  def step(self, action):
    """
    Advance one step using the T, R and D models.
    :param action: the action to apply
    :return: (np.array, float, bool, dict) observation, reward, done, info
    :raises ValueError: if any of the T, R, D models has not been provided
    """
    if self.T is None or self.R is None or self.D is None:
      raise ValueError("GoLeftEnv.step() requires the T, R and D models to be set")

    # One-row inputs for the estimators, built from the state *before* the move
    state = np.array([self.agent_pos]).astype(np.float32).reshape(-1,1)
    action = np.array([action]).reshape(-1,1)
    state_action = np.hstack((state,action))

    self.agent_pos = self.T.predict(state_action)[0]
    self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)
    # D is queried with the pre-move state, matching the inputs built above
    done = bool(self.D.predict(state)[0])
    # Return a plain Python float rather than a numpy scalar
    reward = float(self.R.predict(state_action)[0])
    info = {}
    return np.array([self.agent_pos]).astype(np.float32), reward, done, info

  def render(self, mode='console'):
    """
    Print the grid as one line: the agent is an 'x', every other cell a dot.
    :param mode: (str) only 'console' is supported
    :raises NotImplementedError: for any other mode
    """
    if mode != 'console':
      raise NotImplementedError()
    # The predicted position may be a float, so truncate it for drawing
    pos = int(self.agent_pos)
    print("." * pos, end="")
    print("x", end="")
    print("." * (self.grid_size - pos))

  def close(self):
    """Nothing to release."""
    pass
    

In [11]:
# Build the model-based environment from the fitted T, R and D models and
# run the Stable Baselines interface checker on it
env = GoLeftEnv(grid_size=10, T=T, R=R, D=D)
check_env(env=env, warn=True)

In [12]:
# Build the raw model-based environment here, under its own name, so that
# re-running this cell does not wrap an already vectorized `env` again.
learned_env = GoLeftEnv(grid_size=10, T=T, R=R, D=D)
env = make_vec_env(lambda: learned_env, n_envs=1)
model = DQN('MlpPolicy', env, verbose=1).learn(5000)

--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 100      |
| mean 100 episode reward | 1        |
| steps                   | 1929     |
--------------------------------------
--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 200      |
| mean 100 episode reward | 1        |
| steps                   | 2841     |
--------------------------------------
--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 300      |
| mean 100 episode reward | 1        |
| steps                   | 3753     |
--------------------------------------
--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 400      |
| mean 100 episode reward | 1        |
| steps                   | 4663     |
--------------------------------------


In [13]:
# Roll out the trained agent deterministically for at most n_steps transitions
obs = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render(mode='console')
  if not done:
    continue
  # The VecEnv resets automatically when a done signal is encountered,
  # so the observation printed above already belongs to the next episode
  print("Goal reached!", "reward=", reward)
  break

Step 1
Action:  [0]
obs= [[8.]] reward= [0.] done= [False]
........x..
Step 2
Action:  [0]
obs= [[7.]] reward= [0.] done= [False]
.......x...
Step 3
Action:  [0]
obs= [[6.]] reward= [0.] done= [False]
......x....
Step 4
Action:  [0]
obs= [[5.]] reward= [0.] done= [False]
.....x.....
Step 5
Action:  [0]
obs= [[4.]] reward= [0.] done= [False]
....x......
Step 6
Action:  [0]
obs= [[3.]] reward= [0.] done= [False]
...x.......
Step 7
Action:  [0]
obs= [[2.]] reward= [0.] done= [False]
..x........
Step 8
Action:  [0]
obs= [[1.]] reward= [0.] done= [False]
.x.........
Step 9
Action:  [0]
obs= [[9.]] reward= [1.] done= [ True]
.........x.
Goal reached! reward= [1.]


'\nStep 1\nAction:  [0]\nobs= [[8.]] reward= [0.] done= [False]\n'