In [None]:
!apt-get install python-opengl -y
!apt install xvfb -y
!pip install pyvirtualdisplay

In [None]:
import gym
from IPython import display
import matplotlib.pyplot as plt
from pyvirtualdisplay import Display
from collections import namedtuple
import math
import random

import numpy as np
import tensorflow as tf
from tensorflow.keras import Input,layers,Model

# Start a non-visible virtual display of 1400x900 pixels.
# Presumably needed so env.render(mode='rgb_array') works on a machine
# without a physical screen -- TODO confirm.
# NOTE(review): this rebinds `display`, shadowing `from IPython import display`
# imported above; after this line `display` is the pyvirtualdisplay object.
display = Display(visible=0, size=(1400, 900)).start()

In [None]:
# Environment the agent is trained on. Swap the two env_name lines to try
# MountainCar instead of CartPole.
# env_name = 'MountainCar-v0'
env_name = 'CartPole-v0'
env = gym.make(env_name)
# Being the last expression of the cell, the value returned by reset() is also
# shown as the cell output.
env.reset()

In [None]:
learning_rate = 5e-4


class Env_info:
  """Network input/output sizes, read from the environment created earlier.

  Used purely as a namespace (same pattern as the Epsilon class): the
  attributes are evaluated once, when this cell runs.
  """
  # Shape of a single observation, e.g. (4,) for CartPole.
  input_dims = env.observation_space.shape
  # Number of discrete actions; assumes a gym Discrete action space.
  n_actions = env.action_space.n


# Q-network: observation -> one Q-value per action (linear output layer).
inputs = Input([*Env_info.input_dims])
x = layers.Dense(256, activation = tf.nn.relu)(inputs)
x = layers.Dense(256, activation = tf.nn.relu)(x)
outputs = layers.Dense(Env_info.n_actions)(x)

train_net = Model(inputs = inputs, outputs= outputs)

# clone_model only copies the architecture; the clone gets freshly initialised
# weights. Copy the online weights so both networks start out identical.
target_net = tf.keras.models.clone_model(train_net)
target_net.set_weights(train_net.get_weights())

loss = tf.keras.losses.mean_squared_error
opt = tf.keras.optimizers.Adam(learning_rate = learning_rate)

train_net.compile(optimizer = opt, loss = loss)

In [None]:
Transition = namedtuple(
    'Transition',
    ['observation', 'action', 'reward', 'next_observation', 'done'],
)


class ReplayMemory:
    """Fixed-capacity store of Transition records that overwrites the oldest entry when full."""

    def __init__(self, capacity):
        # Maximum number of transitions kept at any time.
        self.capacity = capacity
        # Backing list; grows until it holds `capacity` items.
        self.memory = []
        # Index of the slot the next push() writes to.
        self.position = 0

    def push(self, *args):
        """Store one transition built from ``args`` (the Transition fields, in order)."""
        slot = self.position
        # While the buffer is still filling up, grow it by one placeholder slot.
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[slot] = Transition(*args)
        # Wrap around so the oldest entry is overwritten next.
        self.position = (slot + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random."""
        picked = random.sample(self.memory, batch_size)
        return picked

    def __len__(self):
        """Number of transitions currently stored."""
        stored = len(self.memory)
        return stored

In [None]:
EPS_START = 1.0
EPS_END = 0.0
EPS_DECAY = 200


class Epsilon:
  """Global exponentially decaying exploration rate.

  All state lives on the class itself, so every caller shares one schedule:
  eps = end + (start - end) * exp(-steps_done / decay_factor).
  """
  # Most recently computed epsilon.
  eps = EPS_START
  # Value at step 0 and the asymptotic floor of the schedule.
  start = EPS_START
  end = EPS_END
  # Larger values make epsilon fall more slowly.
  decay_factor = EPS_DECAY
  # Number of times decay() has been called so far.
  steps_done = 0

  @staticmethod
  def decay():
    """Compute epsilon for the current step, advance the step counter, return epsilon."""
    span = Epsilon.start - Epsilon.end
    progress = Epsilon.steps_done / Epsilon.decay_factor
    Epsilon.eps = Epsilon.end + span * math.exp(-1.0 * progress)
    Epsilon.steps_done += 1
    return Epsilon.eps

# for i in range(2000):
#   Epsilon.decay()
# print(Epsilon.eps)  #4.562749785743468e-05

In [None]:
BATCH_SIZE = 64
UPDATE_EVERY = 10
EXPERIENCE_MEMORY_SIZE = 100000
GAMMA = 0.99


class Agent:
  """Q-learning agent wrapping the notebook-level train_net / target_net.

  The agent stores transitions in a ReplayMemory and, every UPDATE_EVERY calls
  to step(), fits train_net on a random mini-batch using targets bootstrapped
  from target_net.
  """

  def __init__(self):
    # Both networks are the module-level models, so every Agent instance
    # shares (and trains) the same weights.
    self.train_net = train_net
    self.target_net = target_net
    self.batch_size = BATCH_SIZE
    # Counts calls to step() modulo UPDATE_EVERY.
    self.t_step = 0
    self.gamma = GAMMA
    self.memory = ReplayMemory(EXPERIENCE_MEMORY_SIZE)

    # Reserved for per-update loss values; nothing appends to it yet.
    self.losses = []

  def predict(self, observation):
    """Choose an action epsilon-greedily.

    Every call advances the global Epsilon schedule by one step.

    Args:
      observation: a single (un-batched) environment observation.

    Returns:
      The index of the highest Q-value under train_net, or a random action
      drawn from env.action_space when exploring.
    """
    if random.random() > Epsilon.decay():
      # Add a leading batch axis of size 1 before calling the network.
      observation = tf.expand_dims(observation, axis = 0)
      return tf.argmax(self.train_net(observation), axis=1).numpy()[0]
    return env.action_space.sample()

  def act(self, observation):
    """Alias of predict(); this is the name the training and evaluation code call."""
    return self.predict(observation)

  def step(self, observation, action, reward, next_observation, done):
    """Record one transition and train on a mini-batch every UPDATE_EVERY calls.

    Args:
      observation: observation the action was taken from.
      action: index of the action taken.
      reward: reward received for the action.
      next_observation: observation after the action.
      done: 1.0 if the episode ended on this transition, else 0.0.
    """
    self.memory.push(observation, action, reward, next_observation, done)

    # Learn every UPDATE_EVERY time steps.
    self.t_step = (self.t_step + 1) % UPDATE_EVERY
    if self.t_step == 0:
      # If enough samples are available in memory, get random subset and learn.
      # self.batch_size is used here so sampling and train_step always agree.
      if len(self.memory) > self.batch_size:
        experiences = self.memory.sample(self.batch_size)
        self.train_step(experiences)

  def train_step(self, transitions):
    """Run one gradient update of train_net on a batch of transitions.

    For the action actually taken in each transition the target is
        reward + gamma * max_a' Q_target(next_observation, a') * (1 - done)
    so the bootstrap term vanishes on terminal transitions. For all other
    actions the target equals the current prediction, which makes their error
    (and gradient) zero.

    Args:
      transitions: sequence of Transition records.
    """
    batch = Transition(*zip(*transitions))

    current_observations = np.asarray(batch.observation)
    next_observations = np.asarray(batch.next_observation)
    rewards = np.asarray(batch.reward)
    dones = np.asarray(batch.done)
    actions = np.asarray(batch.action)
    # Row indices derived from the batch itself, so any batch length works.
    indices = np.arange(len(transitions))

    # Start from the current predictions and overwrite only the taken actions.
    q_vals = self.train_net(current_observations).numpy()
    q_target = np.copy(q_vals)
    # Converted to NumPy so the target arithmetic below does not mix tensors
    # and arrays.
    q_next = tf.reduce_max(self.target_net(next_observations), axis = -1).numpy()
    q_target[indices, actions] = rewards + self.gamma * q_next * (1.0 - dones)

    # Low-level alternative to train_on_batch.
    with tf.GradientTape() as tape:
      q_vals = self.train_net(current_observations)
      # Keras losses take (y_true, y_pred).
      loss_val = loss(q_target, q_vals)
    gradients = tape.gradient(loss_val, self.train_net.trainable_variables)
    opt.apply_gradients(zip(gradients, self.train_net.trainable_variables))

    self.update_target_net()

  # TODO: write soft_update
  def update_target_net(self):
    """Hard update: copy every weight of train_net into target_net.

    train_step() calls this after each gradient update, so target_net only
    ever lags train_net by a single update.
    """
    weights = self.train_net.get_weights()
    self.target_net.set_weights(weights)

In [None]:
from collections import deque

# Total number of training episodes and how often (in episodes) to log.
n_episodes = 2000
log_every_n_episodes = 5

# Total reward of every episode, in order.
scores = []
# Only the most recent log_every_n_episodes scores; older ones fall off.
scores_window = deque(maxlen = log_every_n_episodes)

agent = Agent()

for episode in range(n_episodes):

  obs = env.reset()
  score = 0
  done = False

  # One iteration per environment step until the episode ends.
  while not done:
    action = agent.act(obs)
    new_obs, reward, done, _ = env.step(action)
    # Stored as a float so the agent can use it as a (1 - done) mask;
    # 0.0 is falsy, so the `while not done` test still works.
    done = 1.0 if done else 0.0  # if done is True -> done = 1 else done = 0
    agent.step(obs,action,reward,new_obs,done) 
    # loss_tmp = agent.step(obs,action,reward,new_obs,done) 
    # if loss_tmp == None: loss_tmp = 0
    # loss += loss_tmp
    score += reward
    obs = new_obs
  
  scores.append(score)
  scores_window.append(score)
  
  # Report the mean of the windowed scores every log_every_n_episodes
  # episodes (starting with episode 0, when the window holds one score).
  if episode% log_every_n_episodes == 0:
    print('Episode {}  AverageScore: {:.2f}'.format(episode, np.mean(scores_window)))


# Serializer(agent.target_net, )

In [None]:
import imageio
import base64
import IPython

def embed_mp4(filename):
  """Embeds an mp4 file in the notebook.

  Args:
    filename: path of the mp4 file to read.

  Returns:
    An IPython.display.HTML object holding a <video> tag with the file
    contents inlined as a base64 data URI.
  """
  # The context manager closes the file handle even if read() raises.
  with open(filename, 'rb') as video_file:
    video = video_file.read()
  b64 = base64.b64encode(video)
  tag = '''
  <video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4">
  Your browser does not support the video tag.
  </video>'''.format(b64.decode())

  return IPython.display.HTML(tag)


def create_policy_eval_video(agent, filename, num_episodes=5, fps=30,
                             env_name='CartPole-v0'):
  """Record the agent acting in a fresh environment and embed the video.

  Args:
    agent: object with an act(observation) method returning an action.
    filename: output path without extension; ".mp4" is appended.
    num_episodes: number of episodes to record.
    fps: frame rate of the written video.
    env_name: gym environment id to create for the evaluation.

  Returns:
    The value of embed_mp4() for the written file. The number of steps of
    each episode is printed as a list before returning.
  """
  episode_lengths = []
  env = gym.make(env_name)
  filename = filename + ".mp4"
  try:
    with imageio.get_writer(filename, fps=fps) as video:
      for _ in range(num_episodes):
        steps = 0
        obs = env.reset()
        # Capture the initial frame before any action is taken.
        video.append_data(env.render(mode='rgb_array'))
        done = False
        while not done:
          action = agent.act(obs)
          obs, _, done, _ = env.step(action)
          video.append_data(env.render(mode='rgb_array'))
          steps += 1
        episode_lengths.append(steps)
  finally:
    # Release the environment's resources even if recording fails.
    env.close()
  print(episode_lengths)
  return embed_mp4(filename)
# Agent() binds the notebook-level train_net / target_net, so this new
# instance acts with the same network weights as any earlier Agent; only its
# replay memory and step counter start fresh.
agent = Agent()
# Writes "trained-agent.mp4"; as the last expression of the cell, the returned
# HTML video is displayed.
create_policy_eval_video(agent, "trained-agent")