## Q-learning with a neural network as function approximator, solving CartPole

Basic Q-learning with a replay memory, used to train a Q network that estimates action values.

library:  
pytorch 0.4

In [79]:
from IPython import display
from IPython.display import HTML
import matplotlib.pyplot as plt
import matplotlib.animation as animation

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np
import random
import os
import gym
#from google.colab import files

In [106]:
# Select the tensor constructors once, so the rest of the notebook can build
# tensors on the GPU when one is available and on the CPU otherwise.
use_cuda = torch.cuda.is_available()
if use_cuda:
  FloatTensor = torch.cuda.FloatTensor
  LongTensor = torch.cuda.LongTensor
  ByteTensor = torch.cuda.ByteTensor
else:
  FloatTensor = torch.FloatTensor
  LongTensor = torch.LongTensor
  ByteTensor = torch.ByteTensor

In [132]:
def save_torch_model(model, filename):
  """Write the model's state dict to `filename`, creating its folder if needed.

  model: object exposing state_dict() (e.g. an nn.Module).
  filename: destination path; may be a bare file name with no directory part.
  """
  directory = os.path.dirname(filename)
  # dirname() is '' for a bare file name and os.makedirs('') raises, so only
  # create a directory when the path actually contains one. exist_ok avoids
  # failing if the directory appears between a check and the creation.
  if directory:
    os.makedirs(directory, exist_ok=True)
  torch.save(model.state_dict(), filename)

def load_torch_model(model, filename, map_location=None):
  """Load a state dict saved with torch.save into `model` in place.

  model: object exposing load_state_dict() (e.g. an nn.Module).
  filename: path of the saved state dict.
  map_location: forwarded to torch.load; pass e.g. 'cpu' to load a checkpoint
    that was saved from GPU tensors on a machine without CUDA. The default
    (None) keeps torch.load's own behaviour.
  """
  state = torch.load(filename, map_location=map_location)
  model.load_state_dict(state)

## Replay memory 
Stores every transition the agent experiences as it acts in the environment (here, CartPole-v0).

In [7]:
class ReplayMemory():
  """Bounded store of transitions that overwrites its oldest entries.

  `transitions` grows by appending until `memory_size` entries exist; after
  that `loc_pointer` wraps around and new entries replace old ones in place.
  """

  def __init__(self, memory_size = 1000):
    self.memory_size = memory_size
    self.clear()

  def clear(self):
    """Drop every stored transition and rewind the write position."""
    self.transitions = []
    self.loc_pointer = 0

  def add(self, step_tuple):
    """Store one transition tuple: (state, action, reward, next_state, ended)."""
    slot = self.loc_pointer
    if slot < len(self.transitions):
      # buffer already reached this slot once: overwrite the old entry
      self.transitions[slot] = step_tuple
    else:
      self.transitions.append(step_tuple)
    # wrap back to the first slot once the end of the buffer is reached
    self.loc_pointer = (slot + 1) % self.memory_size

  def get_sample(self, batch_size):
    """Return `batch_size` distinct stored transitions chosen at random."""
    return random.sample(self.transitions, batch_size)

In [10]:
# Q value approximator: one hidden ReLU layer, no bias terms
class NN(nn.Module):
  """Two linear layers (input -> 128 -> output) with a ReLU in between.

  Both layers are bias-free and their weights are re-initialised with
  Kaiming-normal initialisation right after each layer is created.
  """

  def __init__(self, input_size, output_size):
    super().__init__()
    hidden_units = 128

    self.l1_linear = nn.Linear(input_size, hidden_units, bias=False)
    nn.init.kaiming_normal_(self.l1_linear.weight)

    self.l2_linear = nn.Linear(hidden_units, output_size, bias=False)
    nn.init.kaiming_normal_(self.l2_linear.weight)

  def forward(self,x):
    """Map input rows of length input_size to rows of output_size values."""
    return self.l2_linear(F.relu(self.l1_linear(x)))

In [134]:
# "Shallow" Q network agent
class SQN():
  """Epsilon-greedy Q-learning agent backed by a small Q network.

  The network input is two consecutive observations concatenated together, so
  it is built with ``state_size * 2`` inputs and ``output_size`` outputs (one
  Q value per action).
  """
  _epsilon = 0.3  # exploration rate before decay
  _gamma = 0.95   # discount applied to the bootstrapped next-state value

  def __init__(self, env, state_size = None, output_size = None):
    """Create the Q network and an empty replay memory.

    env: environment handle, stored on the agent.
    state_size: length of a single observation.
    output_size: number of discrete actions.
    """
    self.is_training = True
    self.iteration = 0
    self.env = env
    self.output_size = output_size
    self.state_size = state_size
    # Per-instance buffer: a class-level ReplayMemory would be shared by every
    # agent, so a freshly created agent would train on another agent's data.
    self.replay_memory = ReplayMemory()
    self.Q = NN(state_size * 2, output_size)
    if use_cuda:
      self.Q.cuda()

  def epsilon(self):
    """Return the current exploration rate, _epsilon / (1 + iteration)."""
    decay = 1.0 / (1 + self.iteration)
    return self._epsilon * decay

  def predict(self, state):
    """Return the Q value of every action for one state as a list of floats."""
    s = Variable(FloatTensor([state]))
    # inference only, so no autograd graph is needed
    with torch.no_grad():
      action_value = self.Q(s)
    return action_value.data.tolist()[0]

  def pick_action(self, state):
    """Pick an action index: random with probability epsilon() while training,
    otherwise the action with the highest predicted Q value."""
    if self.is_training and random.random() < self.epsilon():
      return random.randrange(self.output_size)
    action_value = self.predict(state)
    return action_value.index(max(action_value))

  def update_Q(self, batch):
    """Run one optimizer step on a batch of transitions.

    batch: iterable of (state, action, reward, next_state, ended) tuples.
    Requires self.optimizer, which train() creates.
    """
    # Q learning, Q(s,a) = Q(s,a) + alpha * [reward + gamma * max(Q(s')) - Q(s,a)]
    # Target of the Q function is the one step bellman equation "reward + gamma * max(Q(s'))"
    # so error is [target - current estimation] = [reward + gamma * max(Q(s')) - Q(s,a)]

    (state, action, reward, next_state, ended) = tuple(zip(*batch))

    var_state = Variable(FloatTensor(state))
    var_action = Variable(LongTensor(action))
    var_ended = Variable(FloatTensor(ended))
    var_reward = Variable(FloatTensor(reward))
    var_next_state = Variable(FloatTensor(next_state))

    # current estimation, take the Q value of the action performed
    state_action_values = self.Q(var_state).gather(1, var_action.view(-1,1))

    # target. If an episode ended at this step, only reward is used as there is no next state.
    # The target is a constant for the loss, so it is computed without tracking gradients.
    with torch.no_grad():
      next_best = self.Q(var_next_state).max(1)[0]
      target_values = var_reward + (1 - var_ended) * self._gamma * next_best

    loss = F.mse_loss(state_action_values, target_values.view(-1,1))
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

  def train(self, env, episode, iter_per_episode = 100, batch_size = 32, lr=1e-3, checkpoint = 50):
    """Play `episode` episodes and update the Q network after each one.

    env: environment whose reset() returns an observation and whose
      step(action) returns (observation, reward, done, info).
    episode: number of episodes to play.
    iter_per_episode: minibatch updates performed after each episode, once the
      replay memory holds more than `batch_size` transitions.
    batch_size: transitions sampled per update.
    lr: learning rate of the Adam optimizer (recreated on every call).
    checkpoint: every `checkpoint` episodes the network is saved under
      'model/' and the running score is printed.
    """
    self.optimizer = torch.optim.Adam(self.Q.parameters(), lr=lr, weight_decay=1e-3)
    running_score = None
    best_score = -99999
    for i in range(episode):
      s0 = env.reset()
      # state contain 2 time step, as single state does not contain enough information where the pole is moving or moving how fast
      state = np.append(s0,s0)
      episode_ended = False
      score = 0  # number of steps survived in this episode
      while not episode_ended:
        action =  self.pick_action(state)
        (s1, reward, episode_ended, info) = env.step(action)
        next_state = np.append(s0,s1)
        score += 1
        ended = 1 if episode_ended else 0
        self.replay_memory.add((state,action,reward,next_state,ended))
        s0 = s1
        state = next_state

      # advance the counter epsilon() decays on; without this the exploration
      # rate would stay at _epsilon forever
      self.iteration += 1

      if running_score is None:
        running_score = score
      else:
        # exponential moving average of the episode length
        running_score = running_score * 0.9 + score * 0.1

      if (i + 1) % checkpoint == 0:
        if running_score > best_score and running_score > 100:
          best_score = running_score
          save_torch_model(self.Q,'model/cartpole_sqn_best.pth')
        save_torch_model(self.Q,'model/cartpole_sqn_iter_%d.pth' %(i+1))
        # longer the better, that mean the agent can keep the pole up for longer period
        print(i+1,': running_score:', running_score)

      if len(self.replay_memory.transitions) > batch_size:
        for j in range(iter_per_episode):
          batch = self.replay_memory.get_sample(batch_size)
          self.update_Q(batch)


In [75]:
# build the environment and an agent sized for a 4-value observation and 2 actions
env = gym.make('CartPole-v0')
agent = SQN(env, state_size=4, output_size=2)

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [76]:
# run 1000 episodes; after each episode the Q network is updated 10 times on
# random batches of 64 transitions, and a checkpoint is written every 100 episodes
agent.train(env, episode=1000, iter_per_episode=10, batch_size=64, lr=1e-3, checkpoint=100)

100 : running_score: 23.011921307368173
200 : running_score: 17.840581295144926
300 : running_score: 61.219751902765246
400 : running_score: 16.199829456527162
500 : running_score: 107.18701508126131
600 : running_score: 73.62057823902238
700 : running_score: 109.22056504160464
800 : running_score: 183.54841456120877
900 : running_score: 142.95538271472319
1000 : running_score: 198.05190059263006


In [128]:
# sample run with the trained model and record the frames
frames = []
agent.is_training = False  # disables the random-action branch in pick_action
s0 = env.reset()
frames.append(env.render(mode='rgb_array'))
state = np.append(s0,s0)
episode_ended = False
score = 0  # number of steps the episode lasted
while not episode_ended:
  action =  agent.pick_action(state)
  (s1, reward, episode_ended, info) = env.step(action)
  frames.append(env.render(mode='rgb_array'))
  score += 1
  # the network input is (previous observation, current observation), exactly
  # as in train(); s0 must therefore advance every step instead of staying
  # pinned to the initial observation
  state = np.append(s0,s1)
  s0 = s1

In [129]:
%%capture
def animate(frames):
  """Build a matplotlib animation that shows `frames` one after another.

  frames: sequence of images accepted by imshow (e.g. RGB arrays).
  Returns the ArtistAnimation; the figure it draws on is created here.
  """
  fig, ax = plt.subplots()
  # grid() is documented to take a boolean; the string 'off' is truthy and is
  # not a reliable way to switch the grid off
  ax.grid(False)
  ax.axis('off')
  # one single-artist frame per image, drawn on the axes created above
  ims = [[ax.imshow(frame, animated=True)] for frame in frames]
  ani = animation.ArtistAnimation(fig, ims, interval=20, blit=True, repeat_delay=1000)
  return ani

# turn the recorded frames into an animation and write it to
# 'cartpole_sqn.mp4' (relative to the current working directory)
ani = animate(frames)
ani.save('cartpole_sqn.mp4')

In [131]:
%%HTML
<video width="400" controls>
  <source src="cartpole_sqn.mp4" type="video/mp4">
</video>