#CartPole-v0 mithilfe einer Actor-Critic Methode

Simon Schmid (9917195)

## Einleitung



Dieses Notebook zeigt, wie mithilfe der "Actor-Critic" Methode und TensorFlow ein Agent trainiert werden kann, welcher im OpenAI Gym CartPole-v0 Environment agiert.


**Actor-Critic Methode**

Die Actor-Critic Methode ist eine "temporal difference (TD)" Lernmethode.
Bei einer TD-Lernmethode werden die Erwartungswerte nicht erst angepasst, wenn die Ergebnisse feststehen, sondern die Werte werden laufend angepasst. 

Richard Sutton beschrieb dies in "Learning to predict by the methods of temporal differences"(1988, doi:10.1007/BF00115009) mit dem Satz: 

"Suppose you wish to predict the weather for Saturday, and you have some model that predicts Saturday's weather, given the weather of each day in the week. In the standard case, you would wait until Saturday and then adjust all your models. However, when it is, for example, Friday, you should have a pretty good idea of what the weather would be on Saturday – and thus be able to change, say, Saturday's model before Saturday arrives."


Wie bei anderen Reinforcement Learning Methoden gibt es auch hier eine "policy function", welche eine Wahrscheinlichkeitsverteilung für die verschiedenen Actionen bietet, mit den jeweiligen Wahrscheinlichkeiten zum aktuellen Zeitpunkt (state).

Ebenfalls gibt es eine "value function", welche vergleichbar mit der Q-function des Q-Learnings, den zu erwartenden Reward für eine bestimmte Aktion berechnet. Sie dient als "Kritiker" (critic) der Aktionen.

**CartPole-v0**

Das CartPole ist ein Environment der OpenAI Gym Umgebung.
Dabei befindet sich eine Stange (pole) auf einem, sich bewegenden, "Wagen" (cart). Ziel ist des die Stange so lange wie möglich aufrecht zu halten. Erreicht werden kann das, indem der Wagen sich nach links oder rechts bewegt.

Ein Reward von +1 wird für jede Zeiteinheit, welche die Stange aufrecht ist gegeben.
Eine Episode endet, wenn entweder die Stange mehr als 15 Grad geneigt ist, oder der Wagen mehr als 2.4 Längeneinheiten nach links oder rechts gefahren ist, sprich, außerhalb des Bildschirms.

Das Problem wird als gelöst betrachtet, wenn der durchschnittliche Reward einer Episode einen Wert von 195 100x am Stück erreicht hat. (Ich wollte den benötigten minimalen Reward eigentlich hochstellen auf ca. 300, um längere Ergebnisse zu sehen, habe dabei aber gemerkt, dass der maximale Reward bei 200 stehen bleibt, was wohl eine Einschränkung des Environments ist)

## Setup


In [2]:
!pip install gym[classic_control]
!pip install pyglet
!pip install pyvirtualdisplay

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
%%bash
# Install additional packages for visualization
sudo apt-get install -y xvfb python-opengl 

Reading package lists...
Building dependency tree...
Reading state information...
python-opengl is already the newest version (3.1.0+dfsg-1).
xvfb is already the newest version (2:1.19.6-1ubuntu4.11).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'sudo apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 19 not upgraded.


In [4]:
import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple


# Create the environment
env = gym.make("CartPole-v0")

# Set seed for experiment reproducibility
seed = 42
env.seed(seed)
tf.random.set_seed(seed)
np.random.seed(seed)

# Small epsilon value for stabilizing division operations
eps = np.finfo(np.float32).eps.item()

## Modell
Sowohl *Actor* als auch *Critic* werden mit demselben neuronalen Netz abgebildet werden.

Als Input dient dabei der aktuelle State und die Outpus sind einerseits die *Action*-Wahrscheinlichkeiten, sowie die *Critic*-Werte.

Im CartPole-Environment gibt es 4 Werte, womit der State beschrieben wird: Wagen-Position, Wagen-Geschwindigkeit, Stangen-Winkel und Stangen-Geschwindigkeit.
Aktionen gibt es 2 Stück: Den Wagen nach links bewegen oder nach rechts bewegen.




In [5]:
class ActorCritic(tf.keras.Model):
  """Combined actor-critic network."""

  def __init__(
      self, 
      num_actions: int, 
      num_hidden_units: int):
    """Initialize."""
    super().__init__()

    self.common = layers.Dense(num_hidden_units, activation="relu")
    self.actor = layers.Dense(num_actions)
    self.critic = layers.Dense(1)

  def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    x = self.common(inputs)
    return self.actor(x), self.critic(x)

In [6]:
num_actions = env.action_space.n  # 2, move left or right
num_hidden_units = 128

model = ActorCritic(num_actions, num_hidden_units)

## Training

Das Training besteht aus 4 (bzw. 5) Schritten:

1. Den Agent im Environment laufen lassen und Trainingsdaten sammeln.
2. Zu jeder Zeiteinheit den zu erwarteten Return berechnen.
3. Loss für das Actor-Critic Modell berechnen.
4. Anhand des Losses Gradienten berechnen und die Parameter des neuronalen Netzes anpassen.
5. Schritte 1-4 wiederholen bis entweder das Problem gelöst ist, oder die maximale Anzahl an Episoden erreicht wurde.


### 1. Trainingsdaten sammeln
Trainingsdaten werden in jeder Episode gesammelt. Dabei werden zu jeder Zeiteinheit anhand des States mit der aktuellsten Version des neuronalen Netzes die Aktionswahrscheinlichkeiten berechnet. Anhand dieser Verteilung wird dann die nächste Aktion ausgeführt.

In [7]:
# Wrap OpenAI Gym's `env.step` call as an operation in a TensorFlow function.
# This would allow it to be included in a callable TensorFlow graph.

def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
  """Returns state, reward and done flag given an action."""

  state, reward, done, _ = env.step(action)
  return (state.astype(np.float32), 
          np.array(reward, np.int32), 
          np.array(done, np.int32))


def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
  return tf.numpy_function(env_step, [action], 
                           [tf.float32, tf.int32, tf.int32])

In [8]:
def run_episode(
    initial_state: tf.Tensor,  
    model: tf.keras.Model, 
    max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
  """Runs a single episode to collect training data."""

  action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

  initial_state_shape = initial_state.shape
  state = initial_state

  for t in tf.range(max_steps):
    # Convert state into a batched tensor (batch size = 1)
    state = tf.expand_dims(state, 0)
  
    # Run the model and to get action probabilities and critic value
    action_logits_t, value = model(state)
  
    # Sample next action from the action probability distribution
    action = tf.random.categorical(action_logits_t, 1)[0, 0]
    action_probs_t = tf.nn.softmax(action_logits_t)

    # Store critic values
    values = values.write(t, tf.squeeze(value))

    # Store log probability of the action chosen
    action_probs = action_probs.write(t, action_probs_t[0, action])
  
    # Apply action to the environment to get next state and reward
    state, reward, done = tf_env_step(action)
    state.set_shape(initial_state_shape)
  
    # Store reward
    rewards = rewards.write(t, reward)

    if tf.cast(done, tf.bool):
      break

  action_probs = action_probs.stack()
  values = values.stack()
  rewards = rewards.stack()
  
  return action_probs, values, rewards

### 2. Zu erwartenden Return berechnen

Die Rewards der einzelnen Zeiteinheiten, welche über eine Episode hinweg gesammelt werden, werden in eine Sequenz der zu erwartenden Rewards verarbeitet. 
Dabei werden die Werte mit einem Faktor Gamma versehen, um Rewards, welche weiter in der Vergangenheit liegen, weniger Gewichtung zu schenken.

Um das Training zu stabilisieren, wird die Sequenz der Returns noch standardisiert.


In [9]:
def get_expected_return(
    rewards: tf.Tensor, 
    gamma: float, 
    standardize: bool = True) -> tf.Tensor:
  """Compute expected returns per timestep."""

  n = tf.shape(rewards)[0]
  returns = tf.TensorArray(dtype=tf.float32, size=n)

  # Start from the end of `rewards` and accumulate reward sums
  # into the `returns` array
  rewards = tf.cast(rewards[::-1], dtype=tf.float32)
  discounted_sum = tf.constant(0.0)
  discounted_sum_shape = discounted_sum.shape
  for i in tf.range(n):
    reward = rewards[i]
    discounted_sum = reward + gamma * discounted_sum
    discounted_sum.set_shape(discounted_sum_shape)
    returns = returns.write(i, discounted_sum)
  returns = returns.stack()[::-1]

  if standardize:
    returns = ((returns - tf.math.reduce_mean(returns)) / 
               (tf.math.reduce_std(returns) + eps))

  return returns

### 3. Berechnung des Actor-Critic Loss

Da das neuronale Netz aus der Kombination des Actors und Critic besteht, ist die Loss-Kunktion eine Summe der jeweiligen einzelnen Losses.

In [10]:
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def compute_loss(
    action_probs: tf.Tensor,  
    values: tf.Tensor,  
    returns: tf.Tensor) -> tf.Tensor:
  """Computes the combined actor-critic loss."""

  advantage = returns - values

  action_log_probs = tf.math.log(action_probs)
  actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)

  critic_loss = huber_loss(values, returns)

  return actor_loss + critic_loss

### 4. Parameter updaten

Durchführung der vorherigen Schritte und Anpassung des neuronalen Netzes.


In [11]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)


@tf.function
def train_step(
    initial_state: tf.Tensor, 
    model: tf.keras.Model, 
    optimizer: tf.keras.optimizers.Optimizer, 
    gamma: float, 
    max_steps_per_episode: int) -> tf.Tensor:
  """Runs a model training step."""

  with tf.GradientTape() as tape:

    # Run the model for one episode to collect training data
    action_probs, values, rewards = run_episode(
        initial_state, model, max_steps_per_episode) 

    # Calculate expected returns
    returns = get_expected_return(rewards, gamma)

    # Convert training data to appropriate TF tensor shapes
    action_probs, values, returns = [
        tf.expand_dims(x, 1) for x in [action_probs, values, returns]] 

    # Calculating loss values to update our network
    loss = compute_loss(action_probs, values, returns)

  # Compute the gradients from the loss
  grads = tape.gradient(loss, model.trainable_variables)

  # Apply the gradients to the model's parameters
  optimizer.apply_gradients(zip(grads, model.trainable_variables))

  episode_reward = tf.math.reduce_sum(rewards)

  return episode_reward

### 5. Training-Loop

Training läuf bis entweder das Problem gelöst wurde oder die maximale Anzahl an Episoden erreicht wurde.


In [12]:
%%time

min_episodes_criterion = 100
max_episodes = 10000
max_steps_per_episode = 1000

# Cartpole-v0 is considered solved if average reward is >= 195 over 100 
# consecutive trials
reward_threshold = 195
running_reward = 0

# Discount factor for future rewards
gamma = 0.99

# Keep last episodes reward
episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)

with tqdm.trange(max_episodes) as t:
  for i in t:
    initial_state = tf.constant(env.reset(), dtype=tf.float32)
    episode_reward = int(train_step(
        initial_state, model, optimizer, gamma, max_steps_per_episode))
    
    episodes_reward.append(episode_reward)
    running_reward = statistics.mean(episodes_reward)
  
    t.set_description(f'Episode {i}')
    t.set_postfix(
        episode_reward=episode_reward, running_reward=running_reward)
  
    # Show average episode reward every 10 episodes
    if i % 10 == 0:
      pass # print(f'Episode {i}: average reward: {avg_reward}')
  
    if running_reward > reward_threshold and i >= min_episodes_criterion:  
        break

print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')

Episode 425:   4%|▍         | 425/10000 [00:46<17:24,  9.17it/s, episode_reward=200, running_reward=196]


Solved at episode 425: average reward: 195.53!
CPU times: user 43.7 s, sys: 1.88 s, total: 45.5 s
Wall time: 46.4 s





## Visualisierung

Erstellung eines GIFs welches das fertige Modell zeigt.
Dieses kann in den Files gefunden werden.

In [13]:
# Render an episode and save as a GIF file

from IPython import display as ipythondisplay
from PIL import Image
from pyvirtualdisplay import Display


display = Display(visible=0, size=(400, 300))
display.start()


def render_episode(env: gym.Env, model: tf.keras.Model, max_steps: int): 
  screen = env.render(mode='rgb_array')
  im = Image.fromarray(screen)

  images = [im]
  
  state = tf.constant(env.reset(), dtype=tf.float32)
  for i in range(1, max_steps + 1):
    state = tf.expand_dims(state, 0)
    action_probs, _ = model(state)
    action = np.argmax(np.squeeze(action_probs))

    state, _, done, _ = env.step(action)
    state = tf.constant(state, dtype=tf.float32)

    # Render screen every 10 steps
    if i % 10 == 0:
      screen = env.render(mode='rgb_array')
      images.append(Image.fromarray(screen))
  
    if done:
      break
  
  return images


# Save GIF image
images = render_episode(env, model, max_steps_per_episode)
image_file = 'cartpole-v0.gif'
# loop=0: loop forever, duration=1: play each frame for 1ms
images[0].save(
    image_file, save_all=True, append_images=images[1:], loop=0, duration=1)