# A3C

In this section you will learn how to implement A3C, the asynchronous reinforcement learning algorithm you saw earlier in the chapter. A3C is a significantly more complex training algorithm than those you have seen previously. The algorithm requires running gradient descent in multiple threads, interspersed with game rollout code, and updating learned weights asynchronously. As a result of this extra complexity, we will define the A3C algorithm in an object-oriented fashion. Let’s start by defining an A3C object.

The A3C class implements the A3C algorithm (Example 8-16). A few extra bells and whistles are added onto the basic algorithm to encourage learning, notably an entropy term and support for generalized advantage estimation. We won’t cover all of these details, but encourage you to follow references into the research literature (listed in the documentation) to understand more.

Example 8-16. Define the A3C class encapsulating the asynchronous A3C training algorithm

In [None]:
class A3C(object):
  """
  Implements the Asynchronous Advantage Actor-Critic (A3C) algorithm.
  The algorithm is described in Mnih et al, "Asynchronous Methods for Deep
  Reinforcement Learning" (https://arxiv.org/abs/1602.01783). This class
  requires the policy to output two quantities: a vector giving the probability
  of taking each action, and an estimate of the value function for the current
  state. It optimizes both outputs at once using a loss that is the sum of three
  terms:
  1. The policy loss, which seeks to maximize the discounted reward for each action.
  2. The value loss, which tries to make the value estimate match the actual
     discounted reward that was attained at each step.
  3. An entropy term to encourage exploration.
  This class only supports environments with discrete action spaces, not
  continuous ones. The "action" argument passed to the environment is an
  integer, giving the index of the action to perform.
  This class supports Generalized Advantage Estimation as described in Schulman
  et al., "High-Dimensional Continuous Control Using Generalized Advantage
  Estimation" (https://arxiv.org/abs/1506.02438). This is a method of trading
  off bias and variance in the advantage estimate, which can sometimes improve
  the rate of convergence. Use the advantage_lambda parameter to adjust the
  tradeoff.
  """
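  def __init__(self, env, max_rollout_length, discount_factor,
               advantage_lambda, value_weight, entropy_weight, model_dir):
    # Store the environment and the hyperparameters used during training.
    self._env = env
    self.max_rollout_length = max_rollout_length
    self.discount_factor = discount_factor
    self.advantage_lambda = advantage_lambda
    self.value_weight = value_weight
    self.entropy_weight = entropy_weight
    self._optimizer = None
    # Build the global copy of the policy graph shared by all workers.
    (self._graph, self._features, self._rewards, self._actions,
     self._action_prob, self._value, self._advantages) = self.build_graph(
         None, "global", model_dir)
    with self._graph._get_tf("Graph").as_default():
      self._session = tf.Session()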

The heart of the A3C class lies in the build_graph() method (Example 8-17), which constructs a TensorGraph instance (underneath which lies a TensorFlow computation graph) encoding the policy learned by the model. Notice how succinct this definition is compared with others you have seen previously! There are many advantages to using object orientation.

Example 8-17. This method builds the computation graph for the A3C algorithm. Note that the policy network is defined here using the Layer abstractions you saw previously.

In [None]:
def build_graph(self, tf_graph, scope, model_dir):
  """Construct a TensorGraph containing the policy and loss calculations."""
  state_shape = self._env.state_shape
  features = []
  for s in state_shape:
    features.append(Input(shape=[None] + list(s), dtype=tf.float32))
  d1 = Flatten(in_layers=features)
  d2 = Dense(
      in_layers=[d1],
      activation_fn=tf.nn.relu,
      normalizer_fn=tf.nn.l2_normalize,
      normalizer_params={"dim": 1},
      out_channels=64)
  d3 = Dense(
      in_layers=[d2],
      activation_fn=tf.nn.relu,
      normalizer_fn=tf.nn.l2_normalize,
      normalizer_params={"dim": 1},
      out_channels=32)
  d4 = Dense(
      in_layers=[d3],
      activation_fn=tf.nn.relu,
      normalizer_fn=tf.nn.l2_normalize,
      normalizer_params={"dim": 1},
      out_channels=16)
  d4 = BatchNorm(in_layers=[d4])
  d5 = Dense(in_layers=[d4], activation_fn=None, out_channels=9)
  value = Dense(in_layers=[d4], activation_fn=None, out_channels=1)
  value = Squeeze(squeeze_dims=1, in_layers=[value])
  action_prob = SoftMax(in_layers=[d5])
  rewards = Input(shape=(None,))
  advantages = Input(shape=(None,))
  actions = Input(shape=(None, self._env.n_actions))
  loss = A3CLoss(
      self.value_weight,
      self.entropy_weight,
      in_layers=[rewards, actions, action_prob, value, advantages])
  graph = TensorGraph(
      batch_size=self.max_rollout_length,
      graph=tf_graph,
      model_dir=model_dir)
  for f in features:
    graph._add_layer(f)
  graph.add_output(action_prob)
  graph.add_output(value)
  graph.set_loss(loss)
  graph.set_optimizer(self._optimizer)
  with graph._get_tf("Graph").as_default():
    with tf.variable_scope(scope):
      graph.build()
  return graph, features, rewards, actions, action_prob, value, advantages

There’s a lot of code in this example. Let’s break it down into multiple examples and discuss more carefully. Example 8-18 takes the array encoding of the TicTacToeEnvironment and feeds it into the Input instances for the graph directly.

Example 8-18. This snippet from the build_graph() method feeds in the array encoding of TicTacToeEnvironment

In [None]:
state_shape = self._env.state_shape
features = []
for s in state_shape:
  features.append(Input(shape=[None] + list(s), dtype=tf.float32))

Example 8-19 shows the code used to construct inputs for rewards from the environment, advantages observed, and actions taken.

Example 8-19. This snippet from the build_graph() method defines Input objects for rewards, advantages, and actions

In [None]:
rewards = Input(shape=(None,))
advantages = Input(shape=(None,))
actions = Input(shape=(None, self._env.n_actions))
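
The actions input has one row per rollout step and one column per possible action, which means each action is fed in as a one-hot vector. As a rough illustration in plain Python (the `one_hot` helper and the sample moves are assumptions made for this sketch, not part of the chapter's code):

```python
def one_hot(action, n_actions):
    """Return a list of zeros with a single 1.0 at the chosen action index."""
    row = [0.0] * n_actions
    row[action] = 1.0
    return row

# Hypothetical moves from a three-step rollout on a 9-square board.
actions_taken = [4, 0, 8]
actions_matrix = [one_hot(a, 9) for a in actions_taken]
# actions_matrix now has shape (3, 9), matching Input(shape=(None, n_actions)).
```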

The policy network is responsible for learning the policy. In Example 8-20, the input board state is first flattened into an input feature vector. A series of fully connected (or Dense) transformations are applied to the flattened board. At the very end, a SoftMax layer is used to predict action probabilities from d5 (note that out_channels is set to 9, one for each possible move on the tic-tac-toe board). A second Dense head on d4, with a single output channel, produces the value estimate.

Example 8-20. This snippet from the build_graph() method defines the policy network

In [None]:
d1 = Flatten(in_layers=features)
d2 = Dense(
    in_layers=[d1],
    activation_fn=tf.nn.relu,
    normalizer_fn=tf.nn.l2_normalize,
    normalizer_params={"dim": 1},
    out_channels=64)
d3 = Dense(
    in_layers=[d2],
    activation_fn=tf.nn.relu,
    normalizer_fn=tf.nn.l2_normalize,
    normalizer_params={"dim": 1},
    out_channels=32)
d4 = Dense(
    in_layers=[d3],
    activation_fn=tf.nn.relu,
    normalizer_fn=tf.nn.l2_normalize,
    normalizer_params={"dim": 1},
    out_channels=16)
d4 = BatchNorm(in_layers=[d4])
d5 = Dense(in_layers=[d4], activation_fn=None, out_channels=9)
value = Dense(in_layers=[d4], activation_fn=None, out_channels=1)
value = Squeeze(squeeze_dims=1, in_layers=[value])
action_prob = SoftMax(in_layers=[d5])
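
To make the last step concrete, here is a minimal sketch of what a softmax does to nine raw scores, written in plain Python rather than TensorFlow. The `softmax` helper and the example scores are illustrative assumptions, not the Layer implementation used above.

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    # Subtracting the max keeps exp() from overflowing; the result is unchanged.
    largest = max(logits)
    exps = [math.exp(x - largest) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical raw scores for the 9 board squares (square 2 is favored).
logits = [0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
action_prob = softmax(logits)
best_square = action_prob.index(max(action_prob))
```

Every square keeps a nonzero probability, so the sampled policy can still explore moves that the network currently ranks low.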

> **_Is Feature Engineering Dead?_**<br>In this section, we feed the raw tic-tac-toe game board into TensorFlow for training the policy. However, it’s important to note that for more complex games than tic-tac-toe, this may not yield satisfactory results. One of the lesser known facts about AlphaGo is that DeepMind performs sophisticated feature engineering to extract “interesting” patterns of Go pieces upon the board to make AlphaGo’s learning easier. (This fact is tucked away into the supplemental information of DeepMind’s paper.)<br>The fact remains that reinforcement learning (and deep learning methods broadly) often still need human-guided feature engineering to extract meaningful information before learning algorithms can learn effective policies and models. It’s likely that as more computational power becomes available through hardware advances, this need for feature engineering will be reduced, but for the near term, plan on manually extracting information about your systems as needed for performance.

# The A3C Loss Function

We now have the object-oriented machinery set in place to define a loss for the A3C policy network. This loss function will itself be implemented as a Layer object (it’s a convenient abstraction that all parts of the deep architecture are simply layers). The A3CLoss object implements a mathematical loss consisting of the sum of three terms: a policy_loss, a value_loss, and an entropy term for exploration. See Example 8-21.

Example 8-21. This Layer implements the loss function for A3C

In [None]:
class A3CLoss(Layer):
  """This layer computes the loss function for A3C."""
  def __init__(self, value_weight, entropy_weight, **kwargs):
    super(A3CLoss, self).__init__(**kwargs)
    self.value_weight = value_weight
    self.entropy_weight = entropy_weight
  def create_tensor(self, **kwargs):
    reward, action, prob, value, advantage = [
        layer.out_tensor for layer in self.in_layers
    ]
    prob = prob + np.finfo(np.float32).eps
    log_prob = tf.log(prob)
    policy_loss = -tf.reduce_mean(
        advantage * tf.reduce_sum(action * log_prob, axis=1))
    value_loss = tf.reduce_mean(tf.square(reward - value))
    entropy = -tf.reduce_mean(tf.reduce_sum(prob * log_prob, axis=1))
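    # Parentheses keep the entropy term inside the assignment; without them,
    # the second line would be a separate statement and silently dropped.
    self.out_tensor = (policy_loss + self.value_weight * value_loss
                       - self.entropy_weight * entropy)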
    return self.out_tensor

There are a lot of pieces to this definition, so let’s pull out bits of code and inspect them. The A3CLoss layer takes the reward, action, prob, value, and advantage layers as inputs. For numerical stability, we add a small epsilon to the probabilities before taking the logarithm, so that a zero probability never produces an infinite log probability. See Example 8-22.

Example 8-22. This snippet from A3CLoss takes reward, action, prob, value, advantage as input layers and computes a log probability

In [None]:
reward, action, prob, value, advantage = [
    layer.out_tensor for layer in self.in_layers
]
prob = prob + np.finfo(np.float32).eps
log_prob = tf.log(prob)
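
The effect of the epsilon shift can be seen in plain Python. This is only a sketch: `EPS` is written out as the float32 machine epsilon (the value that np.finfo(np.float32).eps returns), and `safe_log` is a hypothetical helper name.

```python
import math

# Assumption: float32 machine epsilon, i.e. the value of np.finfo(np.float32).eps.
EPS = 2.0 ** -23

# Without the shift, a zero probability has log -inf, and 0 * -inf is NaN,
# which would poison any sum over prob * log_prob.
unshifted_term = 0.0 * float("-inf")

def safe_log(p):
    """Log of a probability after shifting it by EPS, so p == 0 stays finite."""
    return math.log(p + EPS)

# With the shift, the same term is tiny but finite.
shifted_term = (0.0 + EPS) * safe_log(0.0)
```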

The policy loss computes the negative mean, over the rollout, of each observed advantage weighted by the log-probability of the action taken. (Recall that the advantage is the difference between the reward resulting from taking the given action and the expected reward from the raw policy for that state.) The intuition here is that the policy_loss provides a signal on which actions were fruitful and which were not (Example 8-23).

Example 8-23. This snippet from A3CLoss defines the policy loss

In [None]:
policy_loss = -tf.reduce_mean(
    advantage * tf.reduce_sum(action * log_prob, axis=1))
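
The same computation can be followed by hand in plain Python. The sketch below assumes a made-up two-step rollout over three actions; the `policy_loss` function here is a stand-in that mirrors the TensorFlow expression, not the library code itself.

```python
import math

def policy_loss(advantages, actions_one_hot, probs):
    """Negative mean of advantage * log-probability of the chosen action."""
    terms = []
    for adv, action, prob in zip(advantages, actions_one_hot, probs):
        # The one-hot mask keeps only the log-probability of the action taken.
        chosen_log_prob = sum(a * math.log(p) for a, p in zip(action, prob))
        terms.append(adv * chosen_log_prob)
    return -sum(terms) / len(terms)

# Hypothetical rollout: step 0 took action 0, step 1 took action 2.
probs = [[0.5, 0.25, 0.25], [0.25, 0.5, 0.25]]
actions = [[1.0, 0.0, 0.0], [0.0, 0.0, 1.0]]
advantages = [1.0, -1.0]
loss = policy_loss(advantages, actions, probs)
```

Minimizing this quantity raises the probability of actions with positive advantage and lowers the probability of actions with negative advantage.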

The value loss computes the difference between our estimate of V (value) and the actual discounted reward observed (reward). Note the use of the $L^2$ loss here (Example 8-24).

Example 8-24. This snippet from A3CLoss defines the value loss

In [None]:
value_loss = tf.reduce_mean(tf.square(reward - value))
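
As a small worked example in plain Python, with made-up numbers for a three-step rollout (the function name simply mirrors the TensorFlow variable):

```python
def value_loss(discounted_rewards, value_estimates):
    """Mean squared error between observed returns and predicted values."""
    squared_errors = [(r - v) ** 2
                      for r, v in zip(discounted_rewards, value_estimates)]
    return sum(squared_errors) / len(squared_errors)

# Hypothetical observed discounted rewards and the network's value estimates.
discounted_rewards = [1.0, 0.5, 0.0]
value_estimates = [0.5, 0.5, 1.0]
loss = value_loss(discounted_rewards, value_estimates)  # (0.25 + 0 + 1) / 3
```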

The entropy term is an addition that encourages the policy to explore further: it is largest when probability is spread evenly across actions, so rewarding it discourages the policy from committing to a single move too early. This term is effectively a form of regularization for A3C networks. The final loss computed by A3CLoss is a linear combination of these component losses. See Example 8-25.

Example 8-25. This snippet from A3CLoss defines an entropy term added to the loss

In [None]:
entropy = -tf.reduce_mean(tf.reduce_sum(prob * log_prob, axis=1))
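
To get a feel for the entropy term, compare a uniform policy over nine squares with one that is nearly certain of its move. The sketch below is plain Python; the two distributions and the weights passed to `total_loss` are made-up values, and `total_loss` only restates the three-term combination described above.

```python
import math

def entropy(prob):
    """Shannon entropy (in nats) of a single action distribution."""
    return -sum(p * math.log(p) for p in prob if p > 0.0)

uniform = [1.0 / 9] * 9          # maximally exploratory policy
peaked = [0.92] + [0.01] * 8     # nearly deterministic policy

h_uniform = entropy(uniform)     # equals ln(9)
h_peaked = entropy(peaked)

def total_loss(policy_loss, value_loss, entropy_value,
               value_weight, entropy_weight):
    """Combine the three terms; higher entropy lowers the loss."""
    return (policy_loss + value_weight * value_loss
            - entropy_weight * entropy_value)
```

Because the entropy enters with a negative sign, a more exploratory policy is charged a smaller loss, all else being equal.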

# Defining Workers

Thus far, you’ve seen how the policy network is constructed, but you haven’t yet seen how the asynchronous training procedure is implemented. Conceptually, asynchronous training consists of individual workers running gradient descent on locally simulated game rollouts and contributing learned knowledge back to a global set of weights periodically. Continuing our object-oriented design, let’s introduce the Worker class.

Each Worker instance holds a copy of the model that’s trained asynchronously on a separate thread (Example 8-26). Note that a3c.build_graph() is used to construct a local copy of the TensorFlow computation graph for the thread in question. Take special note of local_vars and global_vars here. Each worker computes gradients using only its own copy of the policy: tf.gradients takes gradients of the loss with respect to local_vars. Those gradients are then paired with global_vars and applied to the global copy of the variables (which is how information is shared across worker threads), and update_local_variables copies the global values back into the worker’s local copy.

Example 8-26. The Worker class implements the computation performed by each thread

In [None]:
class Worker(object):
  """A Worker object is created for each training thread."""
  def __init__(self, a3c, index):
    self.a3c = a3c
    self.index = index
    self.scope = "worker%d" % index
    self.env = copy.deepcopy(a3c._env)
    self.env.reset()
    (self.graph, self.features, self.rewards, self.actions, self.action_prob,
     self.value, self.advantages) = a3c.build_graph(
        a3c._graph._get_tf("Graph"), self.scope, None)
    with a3c._graph._get_tf("Graph").as_default():
      local_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                     self.scope)
      global_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                      "global")
      gradients = tf.gradients(self.graph.loss.out_tensor, local_vars)
      grads_and_vars = list(zip(gradients, global_vars))
      self.train_op = a3c._graph._get_tf("Optimizer").apply_gradients(
          grads_and_vars)
      self.update_local_variables = tf.group(
          *[tf.assign(v1, v2) for v1, v2 in zip(local_vars, global_vars)])
      self.global_step = self.graph.get_global_step()
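
The interplay between local and global variables is easier to see in a stripped-down form. In the sketch below, everything is a toy stand-in: a single scalar weight replaces the network, the loss is (w - 3)^2, and `ToyWorker` is a hypothetical class. It follows the same pattern as Example 8-26, where gradients are evaluated at the local copy and applied to the global one.

```python
# Toy stand-in for the shared "global" scope: one scalar weight.
global_vars = {"w": 0.0}

class ToyWorker:
    def __init__(self, learning_rate=0.1):
        self.local_vars = {"w": 0.0}
        self.learning_rate = learning_rate

    def update_local_variables(self):
        # Mirrors tf.assign(local, global): pull the latest shared weight.
        self.local_vars["w"] = global_vars["w"]

    def train_step(self):
        # The gradient of (w - 3)^2 is evaluated at the worker's local copy...
        gradient = 2.0 * (self.local_vars["w"] - 3.0)
        # ...but the update is applied to the shared global weight.
        global_vars["w"] -= self.learning_rate * gradient

workers = [ToyWorker(), ToyWorker()]
for _ in range(50):
    for worker in workers:
        worker.update_local_variables()
        worker.train_step()
```

Here the workers take turns in a single thread to keep the sketch deterministic; in the real algorithm each one runs on its own thread and the updates interleave.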

# Worker Rollouts

Each Worker is responsible for simulating game rollouts locally. The create_rollout() method uses session.run to fetch action probabilities from the TensorFlow graph (Example 8-27). It then samples an action from this policy using np.random.choice, weighted by the per-action probabilities. The reward for the action taken is computed from TicTacToeEnvironment via a call to self.env.step(action).

Example 8-27. The create_rollout method simulates a game rollout locally

In [None]:
def create_rollout(self):
  """Generate a rollout."""
  n_actions = self.env.n_actions
  session = self.a3c._session
  states = []
  actions = []
  rewards = []
  values = []
  # Generate the rollout.
  for i in range(self.a3c.max_rollout_length):
    if self.env.terminated:
      break
    state = self.env.state
    states.append(state)
    feed_dict = self.create_feed_dict(state)
    results = session.run(
        [self.action_prob.out_tensor, self.value.out_tensor],
        feed_dict=feed_dict)
    probabilities, value = results[:2]
    # Sample the next action from the policy's probability distribution.
    action = np.random.choice(np.arange(n_actions), p=probabilities[0])
    actions.append(action)
    values.append(float(value))
    rewards.append(self.env.step(action))
  # Compute an estimate of the reward for the rest of the episode.
  if not self.env.terminated:
    feed_dict = self.create_feed_dict(self.env.state)
    final_value = self.a3c.discount_factor * float(
        session.run(self.value.out_tensor, feed_dict))
  else:
    final_value = 0.0
  values.append(final_value)
  if self.env.terminated:
    self.env.reset()
  return states, actions, np.array(rewards), np.array(values)
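
The sampling step is what makes the rollout stochastic: the action is drawn from the policy rather than chosen greedily. A minimal sketch using only the standard library is shown below; `sample_action`, the seed, and the example probabilities are assumptions for illustration, and random.choices stands in for np.random.choice.

```python
import random

def sample_action(probabilities, rng):
    """Pick an action index with probability given by the policy output."""
    indices = range(len(probabilities))
    return rng.choices(indices, weights=probabilities, k=1)[0]

rng = random.Random(0)  # fixed seed so the sketch is repeatable
probabilities = [0.0, 0.9, 0.1]  # hypothetical policy over 3 actions
samples = [sample_action(probabilities, rng) for _ in range(1000)]
counts = [samples.count(a) for a in range(len(probabilities))]
```

Action 1 dominates the counts, action 2 still appears occasionally, and action 0 (probability zero) is never drawn.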

The process_rollout() method does the preprocessing needed to compute discounted rewards, values, actions, and advantages (Example 8-28).

Example 8-28. The process_rollout method computes rewards, values, actions, and advantages and then takes a gradient descent step against the loss

In [None]:
def process_rollout(self, states, actions, rewards, values, step_count):
  """Train the network based on a rollout."""
  # Compute the discounted rewards and advantages.
  if len(states) == 0:
    # Rollout creation sometimes fails in multithreaded environment.
    # Don't process if malformed
    print("Rollout creation failed. Skipping")
    return
  discounted_rewards = rewards.copy()
  discounted_rewards[-1] += values[-1]
  advantages = rewards - values[:-1] + self.a3c.discount_factor * np.array(
      values[1:])
  for j in range(len(rewards) - 1, 0, -1):
    discounted_rewards[j-1] += self.a3c.discount_factor * discounted_rewards[j]
    advantages[j-1] += (
        self.a3c.discount_factor * self.a3c.advantage_lambda * advantages[j])
  # Convert the actions to one-hot.
  n_actions = self.env.n_actions
  actions_matrix = []
  for action in actions:
    a = np.zeros(n_actions)
    a[action] = 1.0
    actions_matrix.append(a)
  # Rearrange the states into the proper set of arrays.
  state_arrays = [[] for i in range(len(self.features))]
  for state in states:
    for j in range(len(state)):
      state_arrays[j].append(state[j])
  # Build the feed dict and apply gradients.
  feed_dict = {}
  for f, s in zip(self.features, state_arrays):
    feed_dict[f.out_tensor] = s
  feed_dict[self.rewards.out_tensor] = discounted_rewards
  feed_dict[self.actions.out_tensor] = actions_matrix
  feed_dict[self.advantages.out_tensor] = advantages
  feed_dict[self.global_step] = step_count
  self.a3c._session.run(self.train_op, feed_dict=feed_dict)
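
The backward recursion at the top of process_rollout is the part most worth tracing by hand. The sketch below reproduces it with plain lists; the function name, the three-step episode, and the chosen discount_factor and advantage_lambda are all assumptions for illustration.

```python
def discount_and_advantages(rewards, values, discount_factor, advantage_lambda):
    """Mirror the backward recursion in process_rollout using plain lists.

    `values` has one more entry than `rewards`: the final entry is the
    bootstrap estimate for the state reached after the last step.
    """
    n = len(rewards)
    discounted = list(rewards)
    discounted[-1] += values[-1]
    # One-step errors: r_t - V(s_t) + gamma * V(s_{t+1}).
    advantages = [rewards[t] - values[t] + discount_factor * values[t + 1]
                  for t in range(n)]
    # Walk backward so each step accumulates everything that follows it.
    for j in range(n - 1, 0, -1):
        discounted[j - 1] += discount_factor * discounted[j]
        advantages[j - 1] += discount_factor * advantage_lambda * advantages[j]
    return discounted, advantages

# Hypothetical three-step episode that ends with a win (reward 1).
rewards = [0.0, 0.0, 1.0]
values = [0.0, 0.0, 0.0, 0.0]  # terminal episode, so the bootstrap value is 0
discounted, advantages = discount_and_advantages(rewards, values, 0.5, 1.0)
```

With all value estimates at zero and advantage_lambda set to 1, the advantages coincide with the discounted rewards; lowering advantage_lambda makes later steps count for less in the advantage of earlier ones.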

The Worker.run() method performs the training step for the Worker, relying on process_rollout() to issue the actual call to self.a3c._session.run() under the hood (Example 8-29).

Example 8-29. The run() method is the top level invocation for Worker

In [None]:
def run(self, step_count, total_steps):
  with self.graph._get_tf("Graph").as_default():
    while step_count[0] < total_steps:
      self.a3c._session.run(self.update_local_variables)
      states, actions, rewards, values = self.create_rollout()
      self.process_rollout(states, actions, rewards, values, step_count[0])
      step_count[0] += len(actions)

# Training the Policy

The A3C.fit() method brings together all the disparate pieces introduced to train the model. The fit() method takes responsibility for spawning Worker threads, one per CPU core, using the Python threading library. Since each Worker takes responsibility for training itself, fit() is otherwise responsible only for periodically checkpointing the trained model to disk. See Example 8-30.

Example 8-30. The fit() method brings everything together and runs the A3C training algorithm

In [None]:
def fit(self,
        total_steps,
        max_checkpoints_to_keep=5,
        checkpoint_interval=600,
        restore=False):
  """Train the policy.
  Parameters
  ----------
  total_steps: int
    the total number of time steps to perform on the environment, across all
    rollouts on all threads
  max_checkpoints_to_keep: int
    the maximum number of checkpoint files to keep. When this number is
    reached, older files are deleted.
  checkpoint_interval: float
    the time interval at which to save checkpoints, measured in seconds
  restore: bool
    if True, restore the model from the most recent checkpoint and continue
    training from there. If False, retrain the model from scratch.
  """
  with self._graph._get_tf("Graph").as_default():
    step_count = [0]
    workers = []
    threads = []
    for i in range(multiprocessing.cpu_count()):
      workers.append(Worker(self, i))
    self._session.run(tf.global_variables_initializer())
    if restore:
      self.restore()
    for worker in workers:
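      # Pass the bound method and its arguments directly. A lambda would look
      # up `worker` only when the thread runs, possibly after the loop moved on.
      thread = threading.Thread(
          name=worker.scope,
          target=worker.run,
          args=(step_count, total_steps))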
      threads.append(thread)
      thread.start()
    variables = tf.get_collection(
        tf.GraphKeys.GLOBAL_VARIABLES, scope="global")
    saver = tf.train.Saver(variables, max_to_keep=max_checkpoints_to_keep)
    checkpoint_index = 0
    while True:
      threads = [t for t in threads if t.is_alive()]
      if len(threads) > 0:
        threads[0].join(checkpoint_interval)
      checkpoint_index += 1
      saver.save(
          self._session, self._graph.save_file, global_step=checkpoint_index)
      if len(threads) == 0:
        break
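
The control flow of fit() can be tried out without TensorFlow. The sketch below keeps the same shape (spawn threads, share a one-element step counter, wake up periodically to checkpoint) but replaces the training work with a counter increment. `worker_run`, the lock, and the tiny timeout are assumptions made for this sketch; the chapter's code does not guard step_count with a lock.

```python
import threading

def worker_run(step_count, total_steps, lock, steps_per_rollout=5):
    """Stand-in for Worker.run(): advance the shared counter until done."""
    while True:
        with lock:
            if step_count[0] >= total_steps:
                return
            step_count[0] += steps_per_rollout

step_count = [0]         # a one-element list, so every thread sees updates
lock = threading.Lock()  # assumption: added here to keep the count exact
threads = [
    threading.Thread(name="worker%d" % i, target=worker_run,
                     args=(step_count, 100, lock))
    for i in range(4)
]
for thread in threads:
    thread.start()

checkpoints = 0
while True:
    threads = [t for t in threads if t.is_alive()]
    if threads:
        threads[0].join(0.01)  # wait at most one "checkpoint interval"
    checkpoints += 1           # the real loop calls saver.save() here
    if not threads:
        break
```

The join call with a timeout is what lets the main thread alternate between waiting on workers and saving checkpoints, and the final pass through the loop guarantees one last checkpoint after every worker has finished.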

# Challenge for the Reader

We strongly encourage you to try training tic-tac-toe models for yourself! Note that this example is more involved than other examples in the book, and will require greater computational power. We recommend a machine with at least a few CPU cores. This requirement isn’t too onerous; a good laptop should suffice. Try using a tool like htop to check that the code is indeed multithreaded. See how good a model you can train! You should be able to beat the random baseline most of the time, but this basic implementation won’t give you a model that always wins. We recommend exploring the RL literature and expanding upon the base implementation to see how well you can do.