In [4]:
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

!git clone https://github.com/openai/gym.git
%cd gym
!pip install -e .
%cd ..

fatal: destination path 'gym' already exists and is not an empty directory.
/content/gym
Obtaining file:///content/gym
Installing collected packages: gym
  Attempting uninstall: gym
    Found existing installation: gym 0.18.0
    Can't uninstall 'gym'. No files were found to uninstall.
  Running setup.py develop for gym
Successfully installed gym-0.18.0
/content


In [4]:
# reinitialize the exec env
import gym
from gym.wrappers import Monitor
import glob
import io
import base64
from IPython.display import HTML
from pyvirtualdisplay import Display
from IPython import display as ipythondisplay

# Start a non-visible (visible=0) 1400x900 virtual display via pyvirtualdisplay;
# presumably this is what lets env.render() work without a physical screen.
# NOTE(review): in IPython the name `display` normally refers to the rich-display
# helper; binding it here hides that helper for the rest of the session. Consider
# a more specific name if nothing else relies on this one.
display = Display(visible=0, size=(1400, 900))
display.start()

"""
Utility functions to enable video recording of gym environment 
and displaying it.
To enable video, just do "env = wrap_env(env)"
"""

def show_video():
  """Embed the first recording found in ``video/`` into the notebook output.

  Looks for ``video/*.mp4`` relative to the current working directory. When at
  least one file matches, the first match is base64-encoded and rendered as an
  inline, auto-playing, looping HTML5 ``<video>`` element. When nothing
  matches, ``"Could not find video"`` is printed instead.

  Returns:
    None.
  """
  mp4list = glob.glob('video/*.mp4')
  if not mp4list:
    print("Could not find video")
    return

  # Read-only access is all that is needed, and the context manager closes the
  # handle immediately instead of leaving it to the garbage collector.
  with open(mp4list[0], 'rb') as video_file:
    encoded = base64.b64encode(video_file.read())

  ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    

def wrap_env(env):
  """Return ``env`` wrapped in a ``Monitor`` that writes into ``./video``.

  ``force=True`` is forwarded to ``Monitor`` as-is (presumably so an existing
  ``./video`` directory can be reused; confirm against the gym documentation).

  Args:
    env: the environment to wrap.

  Returns:
    The ``Monitor`` instance wrapping ``env``.
  """
  return Monitor(env, './video', force=True)

**Agente** (Rede neural com uma única camada escondida de 128 neurônios)

In [5]:
from torch import nn

class Net(nn.Module):
  """Feed-forward network: Linear -> ReLU -> Linear.

  Args:
    obs_size: number of input features per observation.
    hidden_size: width of the single hidden layer.
    n_actions: number of outputs, one raw score per action.
  """

  def __init__(self, obs_size, hidden_size, n_actions):
    super().__init__()
    layers = [
      nn.Linear(obs_size, hidden_size),
      nn.ReLU(),
      nn.Linear(hidden_size, n_actions),
    ]
    # Kept under the attribute name `net` so parameter names stay `net.<index>.*`.
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    """Return the un-normalised action scores for ``x``; no softmax is applied here."""
    scores = self.net(x)
    return scores

Função que irá gerar batches com episódios

In [6]:
def iterate_batches(env, net, batch_size):
  """Endlessly generate batches of complete episodes played with ``net``.

  At every step the current observation is fed through ``net``, the outputs
  are turned into probabilities with a softmax, and an action is sampled from
  that distribution. Each finished episode is stored as an ``Episode`` whose
  ``reward`` is the plain sum of the step rewards and whose ``steps`` is a list
  of ``EpisodeStep(observation, action)``, where ``observation`` is the one the
  action was chosen from. The environment is reset after every episode.

  Args:
    env: environment exposing ``reset()`` and a ``step(action)`` that returns
      ``(next_obs, reward, done, info)``.
    net: callable mapping a ``(1, obs_size)`` float tensor to action scores.
    batch_size: number of episodes per yielded batch; must be at least 1.

  Yields:
    A list of ``batch_size`` ``Episode`` tuples. The generator never finishes
    on its own; the caller decides when to stop iterating.

  Raises:
    ValueError: if ``batch_size`` is smaller than 1 (raised on first iteration).
  """
  if batch_size < 1:
    # Zero would yield empty batches forever; a negative value would never yield.
    raise ValueError("batch_size must be at least 1")

  batch = []
  episode_reward = 0.0
  episode_steps = []
  obs = env.reset()
  sm = nn.Softmax(dim=1)
  while True:
    # Rollouts only sample actions, so no autograd graph is needed. Converting
    # through a single ndarray also avoids the slow list-of-arrays tensor path.
    with torch.no_grad():
      obs_v = torch.as_tensor(np.asarray(obs), dtype=torch.float32).unsqueeze(0)
      act_probs = sm(net(obs_v)).numpy()[0]
    action = np.random.choice(len(act_probs), p=act_probs)
    next_obs, reward, done, _ = env.step(action)
    episode_reward += reward
    # Store the observation the action was taken FROM, not the resulting one.
    episode_steps.append(EpisodeStep(observation=obs, action=action))

    if done:
      batch.append(Episode(reward=episode_reward, steps=episode_steps))
      episode_reward = 0.0
      episode_steps = []
      next_obs = env.reset()

    if len(batch) == batch_size:
      yield batch
      batch = []

    obs = next_obs

Função que irá filtrar episódios "elite", com recompensas acima de um limiar obtido por um determinado percentil das recompensas acumuladas de todos os episódios

In [7]:
from collections import namedtuple

# One finished episode: its summed reward and the list of steps that produced it.
Episode = namedtuple('Episode', ['reward', 'steps'])
# One step inside an episode: the observation seen and the action chosen for it.
EpisodeStep = namedtuple('EpisodeStep', ['observation', 'action'])

def filter_batch(batch, percentile):
  """Keep the "elite" episodes of ``batch`` and turn them into training tensors.

  The reward threshold is the given percentile of the episode rewards in the
  batch. Episodes whose reward is strictly below that threshold are dropped;
  the observations and actions of the remaining ones are concatenated in batch
  order.

  Args:
    batch: sequence of ``(reward, steps)`` episodes, where every step exposes
      ``observation`` and ``action`` attributes.
    percentile: percentile (0-100) of the rewards used as the elite threshold.

  Returns:
    A tuple ``(train_obs_v, train_act_v, reward_bound, reward_mean)``: a
    float32 tensor of observations, an int64 tensor of the matching actions,
    the reward threshold that was applied, and the mean reward of the whole
    (unfiltered) batch as a Python float.
  """
  rewards = [episode.reward for episode in batch]
  reward_bound = np.percentile(rewards, percentile)
  reward_mean = float(np.mean(rewards))

  train_obs = []
  train_act = []
  for reward, steps in batch:
    if reward < reward_bound:
      continue
    train_obs.extend(step.observation for step in steps)
    train_act.extend(step.action for step in steps)

  # Stack the observations into one ndarray first: building a tensor directly
  # from a Python list of ndarrays is very slow and warns on recent PyTorch.
  train_obs_v = torch.as_tensor(np.asarray(train_obs), dtype=torch.float32)
  train_act_v = torch.LongTensor(train_act)
  return train_obs_v, train_act_v, reward_bound, reward_mean

In [20]:
import torch
import numpy as np
from torch import optim

# --- Hyper-parameters ---------------------------------------------------------
HIDDEN_SIZE = 128      # units in the network's hidden layer
BATCH_SIZE = 16        # episodes collected per training step
PERCENTILE = 70        # episodes below this reward percentile are discarded
LEARNING_RATE = 0.01   # Adam step size
# Training stops once the batch's mean reward exceeds this value.
# NOTE(review): presumably chosen because CartPole-v0 episodes are capped at
# 200 steps; confirm against the installed gym version.
REWARD_GOAL = 199
SEED = 0               # fixed seed so a re-run follows the same trajectory

# Seed every source of randomness used below: NumPy (action sampling),
# PyTorch (weight initialisation) and the environment (initial states).
np.random.seed(SEED)
torch.manual_seed(SEED)

env = gym.make("CartPole-v0")
env.seed(SEED)
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n
net = Net(obs_size, HIDDEN_SIZE, n_actions)
# CrossEntropyLoss takes the raw network scores, so no softmax is applied here.
objective = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=LEARNING_RATE)

try:
  for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):
    # Train the network to imitate the actions taken in the elite episodes.
    obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)
    optimizer.zero_grad()
    action_scores_v = net(obs_v)
    loss_v = objective(action_scores_v, acts_v)
    loss_v.backward()
    optimizer.step()
    print("%d: loss=%.3f, reward_mean=%.1f, rw_bound=%.1f" % (iter_no, loss_v.item(), reward_m, reward_b))
    if reward_m > REWARD_GOAL:
      print("Solved!")
      break
finally:
  # Release the training environment even if the loop is interrupted.
  env.close()


0: loss=0.690, reward_mean=16.5, rw_bound=16.5
1: loss=0.687, reward_mean=20.7, rw_bound=21.5
2: loss=0.674, reward_mean=24.8, rw_bound=28.0
3: loss=0.658, reward_mean=33.2, rw_bound=44.0
4: loss=0.642, reward_mean=31.9, rw_bound=34.0
5: loss=0.635, reward_mean=33.9, rw_bound=36.0
6: loss=0.631, reward_mean=55.5, rw_bound=65.0
7: loss=0.626, reward_mean=44.9, rw_bound=53.0
8: loss=0.610, reward_mean=55.0, rw_bound=64.5
9: loss=0.621, reward_mean=56.9, rw_bound=66.5
10: loss=0.624, reward_mean=55.8, rw_bound=63.0
11: loss=0.602, reward_mean=67.3, rw_bound=78.5
12: loss=0.602, reward_mean=75.1, rw_bound=90.0
13: loss=0.597, reward_mean=65.8, rw_bound=77.0
14: loss=0.575, reward_mean=71.9, rw_bound=75.0
15: loss=0.585, reward_mean=73.2, rw_bound=86.0
16: loss=0.581, reward_mean=78.4, rw_bound=88.0
17: loss=0.575, reward_mean=97.5, rw_bound=105.0
18: loss=0.565, reward_mean=94.4, rw_bound=102.5
19: loss=0.576, reward_mean=83.7, rw_bound=106.5
20: loss=0.574, reward_mean=83.1, rw_bound=89.0

Agora podemos ver o agente treinado em ação

In [23]:
# Play one recorded episode, sampling actions from the trained network's
# softmax distribution, then show the resulting video.
env = wrap_env(gym.make("CartPole-v0"))
sm = nn.Softmax(dim=1)
try:
  observation = env.reset()
  done = False
  while not done:
    env.render()
    # Inference only: skip gradient tracking.
    with torch.no_grad():
      obs_v = torch.as_tensor(np.asarray(observation), dtype=torch.float32).unsqueeze(0)
      act_probs = sm(net(obs_v)).numpy()[0]
    action = np.random.choice(len(act_probs), p=act_probs)
    observation, reward, done, info = env.step(action)
finally:
  # Always close the wrapped environment, even if rendering or stepping fails,
  # so the recorder is not left open.
  env.close()

show_video()

Agente com política aleatória

In [24]:
# Baseline for comparison: play one recorded episode with uniformly sampled
# actions (no network involved), then show the resulting video.
env = wrap_env(gym.make("CartPole-v0"))
try:
  observation = env.reset()
  done = False
  while not done:
    env.render()
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
finally:
  # Always close the wrapped environment, even if rendering or stepping fails,
  # so the recorder is not left open.
  env.close()

show_video()