<a href="https://colab.research.google.com/github/jaroslav87/OpenAIOneAxisVehicle/blob/master/gym_stableBaseline3_moving_mass_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Stable Baselines 3 for tnesorflow 2.x

%tensorflow_version 2.x
#!pip install stable-baselines[mpi]==2.10.0

!pip install stable-baselines3



In [26]:
import numpy as np

print((np.random.rand(1)-0.5).item()*100.0)

-38.6080788678007


In [3]:
import numpy as np
import gym
from gym import spaces


class MovingMassEnv(gym.Env):
  """
  Custom Environment that follows gym interface.
  This is a simple env where the force is acting on moving mass. 
  The agent must learn how to use the force to reach the set position on X axis. 
  It is assumed that only by using forces +1 -1 and 0 N we can 
  """
  # Because of google colab, we cannot implement the GUI ('human' render mode)
  metadata = {'render.modes': ['console']}
 

  def __init__(self, grid_size=100):
    super(MovingMassEnv, self).__init__()

    self.stepp = 0
    # Size of the 1D-grid
    self.grid_size = grid_size
    self.grid_size_max = grid_size*10
    # Initialize the agent at the right of the grid with zero vel and acc
    self.agent_pos = grid_size - 20
    self.agent_v = 0.0 #speed equals to zero
    self.agent_a = 0.0  # accelertion equal to zero

    #model/sim params:
    self.m = 1.0 # in kg
    self.delta = 0.05 # friction coefficient
    self.dt = 0.1 # in s
    self.max_v = 10 # m/s
    self.f_max = 5 # N

    # Define action and observation space
    # They must be gym.spaces objects
    # Example when using discrete actions, we have three: zero, left and right
    n_actions = 3
    action_high = np.array([self.f_max]) #f
    action_low = np.array([-self.f_max])
    self.action_space = spaces.Box(low=action_low, 
                                        high=action_high)
    # The observation will be the coordinate of the agent/model and its speed
    # this can be described both by Discrete and Box space
    observation_high = np.array([self.grid_size_max, self.max_v]) #pos
    observation_low = np.array([-self.grid_size_max, -self.max_v])

    self.observation_space = spaces.Box(low=observation_low, 
                                        high=observation_high)

  def reset(self):
    """
    Important: the observation must be a numpy array
    :return: (np.array) 
    """
    # Initialize the agent at the right of the grid with zero vel and acc
    self.agent_pos = self.grid_size-120
    self.agent_v = 0.0
    self.agent_a = 0.0

    self.stepp = 0

    # here we convert to float32 to make it more general (in case we want to use continuous actions)
    return np.array([self.agent_pos, self.agent_v]).astype(np.float32)

  def step(self, action, set_pos = 0):
    
    f = action[0]

    # model execution:
    ff = self.delta * self.agent_v # friction force
    self.agent_a = (f - ff)/self.m # acceleartion
    self.agent_v = self.agent_v + self.agent_a * self.dt
    self.agent_pos = self.agent_pos + self.agent_v * self.dt
    # Account for the boundaries of the grid and vel
    self.agent_pos = np.clip(self.agent_pos,-self.grid_size, self.grid_size)
    self.agent_v = np.clip(self.agent_v, -self.max_v, self.max_v)

    # Are we at the left of the grid?
    target = set_pos#(np.random.rand(1)-0.5).item()*self.grid_size
    d = np.abs(self.agent_pos-target)
    v = np.abs(self.agent_v)

    done = bool(d < 0.01 and v < 0.01)

    # reward
    
    reward =-d #- self.stepp/10000#- v# - self.stepp/10000
    
    self.stepp += 1
    # Optionally we can pass additional info, we are not using that for now
    info = {}

    return np.array([self.agent_pos, self.agent_v]).astype(np.float32), reward, done, info

  def render(self, mode='console'):
    if mode != 'console':
      raise NotImplementedError()
    # agent is represented as a cross, rest as a dot
    #print("." * self.agent_pos, end="")
    #print("x", end="")
    #print("." * (self.grid_size - self.agent_pos))

  def close(self):
    pass
    

### Validate the environment

Stable Baselines provides a [helper](https://stable-baselines.readthedocs.io/en/master/common/env_checker.html) to check that your environment follows the Gym interface. It also optionally checks that the environment is compatible with Stable-Baselines (and emits warning if necessary).

In [5]:
from stable_baselines3.common.env_checker import check_env

In [6]:
env = MovingMassEnv()
# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)

  "We recommend you to use a symmetric and normalized Box action space (range=[-1, 1]) "


### Testing the environment

In [7]:
env = MovingMassEnv(grid_size=100)

obs = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

force = [-0.5]
# Hardcoded best agent: always go left!
n_steps = 50
for step in range(n_steps):
  print("Step {}".format(step + 1))
  obs, reward, done, info = env.step(force, 20)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render()
  if done:
    print("Goal reached!", "reward=", reward)
    break

Box(2,)
Box(1,)
[1.2466291]
Step 1
obs= [-20.005  -0.05 ] reward= -40.004999999999995 done= False
Step 2
obs= [-20.014975  -0.09975 ] reward= -40.014975 done= False
Step 3
obs= [-20.0299      -0.14925125] reward= -40.029900125 done= False
Step 4
obs= [-20.049751  -0.198505] reward= -40.049750624375 done= False
Step 5
obs= [-20.074501    -0.24751247] reward= -40.074501871253126 done= False
Step 6
obs= [-20.10413    -0.2962749] reward= -40.10412936189686 done= False
Step 7
obs= [-20.138609    -0.34479353] reward= -40.13860871508737 done= False
Step 8
obs= [-20.177916    -0.39306957] reward= -40.17791567151194 done= False
Step 9
obs= [-20.222027   -0.4411042] reward= -40.22202609315438 done= False
Step 10
obs= [-20.270916   -0.4888987] reward= -40.270915962688605 done= False
Step 11
obs= [-20.324562   -0.5364542] reward= -40.32456138287516 done= False
Step 12
obs= [-20.382938    -0.58377194] reward= -40.38293857596079 done= False
Step 13
obs= [-20.446024    -0.63085306] reward= -40.446023



### Try it with Stable-Baselines

Once your environment follow the gym interface, it is quite easy to plug in any algorithm from stable-baselines

In [15]:
from stable_baselines3 import DQN, A2C, DDPG, PPO
from stable_baselines3.common.cmd_util import make_vec_env

# Instantiate the env
env = MovingMassEnv(grid_size=100)
# wrap it
#env = make_vec_env(lambda: env, n_envs=1)
env.configure(m = 2.0)



AttributeError: ignored

In [9]:
# Train the agent
model = PPO('MlpPolicy', env, n_steps=256, verbose=0).learn(102400) #ACKTR('MlpPolicy', env, verbose=1).learn(100000)
#DDPG('MlpPolicy', env, verbose=1).learn(100000)

In [13]:
# Test the trained agent

import matplotlib.image  as mpimg
import matplotlib.pyplot as plt


obs = env.reset()
n_steps = 9000
pos_agent = []
v_agent = []
pos_time = []
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  #print("Step {}".format(step + 1))
  #print("Action: ", action)
  obs, reward, done, info = env.step(action,0)
  pos_agent.append(obs[0])
  v_agent.append(obs[1])
  pos_time.append(step + 1)
  if step>8900:
    print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render(mode='console')
  if done:
    # Note that the VecEnv resets automatically
    # when a done signal is encountered
    print("Goal reached!", "reward=", reward)
    break

plt.plot(pos_time, pos_agent, 'r')
plt.title('Agent position')
plt.xlabel("steps")
plt.ylabel("Position")
plt.legend(["Pos"])

plt.figure()

plt.plot(pos_time, v_agent, 'r')
plt.title('Agent vel')
plt.xlabel("steps")
plt.ylabel("Velocity")
plt.legend(["vel"])

plt.figure()

TypeError: ignored