In [1]:
import gym
import tensorflow as tf
from tensorflow import keras
import numpy as np
from collections import deque

env = gym.make("CartPole-v1")

  DESCRIPTOR = _descriptor.FileDescriptor(
  _descriptor.FieldDescriptor(
  _TENSORSHAPEPROTO_DIM = _descriptor.Descriptor(
  DESCRIPTOR = _descriptor.FileDescriptor(
  _descriptor.EnumValueDescriptor(
  _DATATYPE = _descriptor.EnumDescriptor(
  DESCRIPTOR = _descriptor.FileDescriptor(
  _descriptor.FieldDescriptor(
  _RESOURCEHANDLEPROTO_DTYPEANDSHAPE = _descriptor.Descriptor(
  DESCRIPTOR = _descriptor.FileDescriptor(
  _descriptor.FieldDescriptor(
  _TENSORPROTO = _descriptor.Descriptor(
  DESCRIPTOR = _descriptor.FileDescriptor(
  _descriptor.FieldDescriptor(
  _ATTRVALUE_LISTVALUE = _descriptor.Descriptor(
  'nearest': pil_image.NEAREST,
  'bilinear': pil_image.BILINEAR,
  'bicubic': pil_image.BICUBIC,
  if hasattr(pil_image, 'HAMMING'):
  if hasattr(pil_image, 'BOX'):
  if hasattr(pil_image, 'LANCZOS'):


In [2]:
from typing import Dict, Tuple
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers
from agent import Agent
from state import State
from action import Action
from variables import GAMMA, SIGMA

class BaselineActorCriticAgent(Agent):
    def __init__(self, num_actions, optimizer, discount:float=GAMMA) -> None:
        super().__init__(num_actions, optimizer)
        self.num_actions = num_actions
        self.optimizer = optimizer
        self.discount = discount
        self.dense1 = layers.Dense(64)
        self.dense2 = layers.Dense(64)
        self.actor  = layers.Dense(self.num_actions)        # Produce logits
        self.critic = layers.Dense(1)                      # Produce the state-value directly

    def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
        x = self.dense2(self.dense1(inputs))
        # Then we produce the policy values
        action_logits = self.actor(x)                    # 1xnum_actions
        action_probs  = tf.nn.softmax(action_logits)     # 1xnum_actions probabilities
        # Avoid producing a tensor containing probability 0 for some actions.
        action_probs = tf.clip_by_value(action_probs, 1e-10, 1.0)
        # ... and the state value.
        state_value = self.critic(x)                    # 1x1
        return action_logits, action_probs, state_value

    def choose_action(self, state:np.ndarray) -> Tuple:
        action_logits, action_probs, state_value = self(
            tf.expand_dims(tf.cast(state, tf.float32), axis=0))
        # Sample from the actions probability distribution
        action = tf.random.categorical(action_logits, 1)
        return {
            'action': action.numpy()[0,0],
            'policy': action_probs[0],
            'value' : state_value[0]
        }

In [3]:
model = BaselineActorCriticAgent(2, Adam(learning_rate=1e-2, clipnorm=40.0))

In [4]:
def play_one_step(env, state):
    a_info = model.choose_action(state)
    next_state, reward, done, _ = env.step(a_info['action'])
    return (state, a_info, reward, next_state, done)

In [5]:
def play_episode(env, init_state):
    ep_steps = 0
    episode_buffer = deque([], maxlen=200)
    state = init_state
    for step in tf.range(200):
        ep_steps += 1
        state, a_info, reward, next_state, done = play_one_step(env, state)
        episode_buffer.append((state, a_info, reward, next_state, done))
        if done:
            break
        state = next_state
    states, actions, rewards, next_states, dones = (
        np.array([experience[i] for experience in list(episode_buffer)]) 
        for i in range(5))
    # Compute returns
    returns = []
    discounted_sum = tf.constant(0.0)
    for i in tf.range(ep_steps-1, -1, -1):
        discounted_sum = rewards[i] + 0.99 * discounted_sum
        returns.append(discounted_sum)
    returns = np.stack(returns[::-1])
    returns = (returns - tf.math.reduce_mean(returns)) / (tf.math.reduce_std(returns))
    return states, actions, rewards, returns, next_states, dones

In [6]:
def training_step(env, initial_state):
    # We open a GradientTape because we want to reverse these operations to obtain the gradient
    # of the loss with respect to the model's parameter
    with tf.GradientTape() as tape:
        # 1) Get the experience playing the episode
        states, actions, rewards, returns, next_states, dones = play_episode(env, initial_state)
        v_st_pred = tf.squeeze(tf.stack([a['value'] for a in actions]))
        a_probs = tf.stack([a['policy'] for a in actions])
        a_indices = tf.stack([a['action'] for a in actions])
        a_probs = tf.gather(a_probs, a_indices, batch_dims=1)
        a_log_probs = tf.math.log(a_probs)
        # Compute delta
        delta = returns - v_st_pred
        # Actor loss
        actor_loss = -tf.reduce_sum(tf.expand_dims(delta, axis=-1)*a_log_probs)
        # Critic loss
        critic_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)(v_st_pred, returns)
        # Entropy loss
        # entropy_loss = -tf.reduce_sum(a_log_probs*a_probs)
        # Total loss
        loss = tf.reduce_sum(actor_loss + critic_loss) #+ 0.01*entropy_loss)
    # 9) Obtain the gradient of the loss with respect to the model's parameters
    grads = tape.gradient(loss, model.trainable_variables)
    # 10) Apply the update
    model.optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return rewards

In [7]:
from tqdm import trange

rewards_mean = 0
with trange(800) as t:
    for ep_num in t:
        initial_state = env.reset()
        rewards = training_step(env, initial_state)
        episode_reward = sum(rewards)
        rewards_mean = rewards_mean + (episode_reward-rewards_mean)/(ep_num+1)
        t.set_description(f'Episode reward: {episode_reward:.2f}, mean reward: {rewards_mean:.2f}')
        
env.close()

Episode reward: 8.00, mean reward: 16.63: 100%|██████████| 800/800 [01:16<00:00, 10.42it/s]  
