In [1]:
import gym
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow.keras.layers as kl
import tensorflow.keras.losses as kls
import tensorflow.keras.optimizers as ko

class A2CAgent:
  """Advantage actor-critic (A2C) trainer around a two-output Keras model.

  The wrapped model is compiled with two losses, one per output: a policy
  loss applied to the first output (logits) and a value loss applied to the
  second output (value estimate). The model must also provide an
  ``action_value(obs)`` method returning ``(action, value)``.
  """

  def __init__(self, model, lr=7e-3, gamma=0.99, value_c=0.5, entropy_c=1e-4):
    """Store hyper-parameters and compile ``model`` with the A2C losses.

    Args:
      model: Keras model with two outputs and an ``action_value`` method.
      lr: Learning rate handed to the RMSprop optimizer.
      gamma: Discount factor used when computing returns.
      value_c: Multiplier applied to the value (MSE) loss.
      entropy_c: Multiplier applied to the entropy term of the policy loss.
    """
    self.gamma = gamma
    self.value_c = value_c
    self.entropy_c = entropy_c
    self.model = model
    self.model.compile(
      # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
      optimizer=ko.RMSprop(learning_rate=lr),
      # One loss per model output: policy logits first, value second.
      loss=[self._logits_loss, self._value_loss])

  def train(self, env, batch_sz=64, updates=100):
    """Collect rollouts from ``env`` and fit the model on each batch.

    Args:
      env: Environment exposing ``reset()``, ``step(action)`` (returning a
        4-tuple) and ``observation_space.shape``.
      batch_sz: Number of environment steps gathered before each update.
      updates: Number of gradient updates (batches) to run.

    Returns:
      List of per-episode reward totals. The last entry belongs to the
      episode that was still in progress when training stopped.
    """
    # Buffers that are overwritten on every batch.
    actions = np.empty((batch_sz,), dtype=np.int32)
    rewards, dones, values = np.empty((3, batch_sz))
    observations = np.empty((batch_sz,) + env.observation_space.shape)

    ep_rewards = [0.0]
    next_obs = env.reset()
    for _update in range(updates):
      for step in range(batch_sz):
        observations[step] = next_obs.copy()
        # Ask the model for an action and the value estimate of this state.
        actions[step], values[step] = self.model.action_value(next_obs[None, :])
        next_obs, rewards[step], dones[step], _ = env.step(actions[step])

        ep_rewards[-1] += rewards[step]
        if dones[step]:
          # Episode finished: open a new running total and restart the env.
          ep_rewards.append(0.0)
          next_obs = env.reset()
          print("迭代: %03d, 獎勵: %03d" % (len(ep_rewards) - 1, ep_rewards[-2]))

      # Value of the state after the last step bootstraps the returns.
      _, next_value = self.model.action_value(next_obs[None, :])
      returns, advs = self._returns_advantages(rewards, dones, values, next_value)
      # Pack actions and advantages side by side so the policy loss receives
      # both through the single "target" slot of the first output.
      acts_and_advs = np.concatenate([actions[:, None], advs[:, None]], axis=-1)
      self.model.train_on_batch(observations, [acts_and_advs, returns])

    return ep_rewards

  def test(self, env, render=False):
    """Run one episode with the current model and return its total reward.

    Args:
      env: Environment exposing ``reset()``, ``step(action)`` and ``render()``.
      render: When true, call ``env.render()`` after every step.
    """
    obs, done, ep_reward = env.reset(), False, 0
    while not done:
      action, _ = self.model.action_value(obs[None, :])
      obs, reward, done, _ = env.step(action)
      ep_reward += reward
      if render:
        env.render()
    return ep_reward

  def _returns_advantages(self, rewards, dones, values, next_value):
    """Compute discounted returns and advantages for one batch.

    Args:
      rewards: 1-D array of per-step rewards.
      dones: 1-D array of per-step termination flags (non-zero means the
        episode ended at that step).
      values: 1-D array of value estimates for the visited states.
      next_value: Value estimate of the state following the last step, given
        as a scalar or a one-element array.

    Returns:
      Tuple ``(returns, advantages)`` where ``advantages = returns - values``.
    """
    # Append the bootstrap value after a zero-filled slot per step. No `axis`
    # is given so that a scalar bootstrap value is accepted as well as an array.
    returns = np.append(np.zeros_like(rewards), next_value)
    # Walk backwards; a `done` flag cuts the dependency on the following step.
    for t in reversed(range(rewards.shape[0])):
      returns[t] = rewards[t] + self.gamma * returns[t + 1] * (1 - dones[t])
    # Drop the bootstrap slot again.
    returns = returns[:-1]
    advantages = returns - values
    return returns, advantages

  def _value_loss(self, returns, value):
    """Value-head loss: scaled mean squared error against the returns."""
    return self.value_c * kls.mean_squared_error(returns, value)

  def _logits_loss(self, actions_and_advantages, logits):
    """Policy-head loss: advantage-weighted cross-entropy minus an entropy term.

    Args:
      actions_and_advantages: Tensor whose last axis holds the taken action
        and its advantage, in that order.
      logits: Unnormalised policy outputs from the model.
    """
    # Undo the packing done in `train`.
    actions, advantages = tf.split(actions_and_advantages, 2, axis=-1)
    weighted_sparse_ce = kls.SparseCategoricalCrossentropy(from_logits=True)
    actions = tf.cast(actions, tf.int32)
    policy_loss = weighted_sparse_ce(actions, logits, sample_weight=advantages)
    # Cross-entropy of the distribution with itself is its entropy.
    probs = tf.nn.softmax(logits)
    entropy_loss = kls.categorical_crossentropy(probs, probs)
    # Subtracting the entropy term rewards less deterministic policies.
    return policy_loss - self.entropy_c * entropy_loss


class ProbabilityDistribution(tf.keras.Model):
  """Keras model that samples one categorical index per row of logits."""

  def call(self, logits, **kwargs):
    # Draw a single sample per row, then drop the trailing sample axis.
    sampled = tf.random.categorical(logits, 1)
    return tf.squeeze(sampled, axis=-1)

class Model(tf.keras.Model):
  """Small MLP with separate hidden layers for the policy and value heads."""

  def __init__(self, num_actions):
    """Create the layers.

    Args:
      num_actions: Number of units in the policy-logits output layer.
    """
    # The name has to be passed by keyword; a lone positional argument is
    # not interpreted as the model name by tf.keras.Model.
    super().__init__(name='mlp_policy')
    self.hidden1 = kl.Dense(128, activation='relu')
    self.hidden2 = kl.Dense(128, activation='relu')
    self.value = kl.Dense(1, name='value')
    self.logits = kl.Dense(num_actions, name='policy_logits')
    self.dist = ProbabilityDistribution()

  def call(self, inputs, **kwargs):
    """Return ``(policy_logits, value)`` for a batch of inputs."""
    x = tf.convert_to_tensor(inputs)
    # Each head gets its own hidden layer; nothing is shared past the input.
    hidden_logs = self.hidden1(x)
    hidden_vals = self.hidden2(x)
    return self.logits(hidden_logs), self.value(hidden_vals)

  def action_value(self, obs):
    """Sample an action and compute the value estimate for ``obs``.

    Args:
      obs: Batch of observations accepted by ``predict_on_batch``.

    Returns:
      Tuple ``(action, value)``, each with its last axis squeezed away.
    """
    logits, value = self.predict_on_batch(obs)
    # Sampling goes through the helper model so it runs as a Keras prediction.
    action = self.dist.predict_on_batch(logits)
    return np.squeeze(action, axis=-1), np.squeeze(value, axis=-1)

# Build the CartPole environment and a policy/value network sized to its
# discrete action space.
env = gym.make('CartPole-v0')
n_actions = env.action_space.n
model = Model(num_actions=n_actions)

# Train with the default hyper-parameters, then play one evaluation episode.
agent = A2CAgent(model)
rewards_history = agent.train(env)
final_score = agent.test(env)
print("總迭代獲得的獎勵: %d out of 200" % final_score)

迭代: 001, 獎勵: 029
迭代: 002, 獎勵: 015
迭代: 003, 獎勵: 017
迭代: 004, 獎勵: 024
迭代: 005, 獎勵: 017
迭代: 006, 獎勵: 040
迭代: 007, 獎勵: 034
迭代: 008, 獎勵: 066
迭代: 009, 獎勵: 052
迭代: 010, 獎勵: 052
迭代: 011, 獎勵: 012
迭代: 012, 獎勵: 075
迭代: 013, 獎勵: 029
迭代: 014, 獎勵: 015
迭代: 015, 獎勵: 046
迭代: 016, 獎勵: 020
迭代: 017, 獎勵: 047
迭代: 018, 獎勵: 039
迭代: 019, 獎勵: 015
迭代: 020, 獎勵: 018
迭代: 021, 獎勵: 045
迭代: 022, 獎勵: 039
迭代: 023, 獎勵: 081
迭代: 024, 獎勵: 065
迭代: 025, 獎勵: 097
迭代: 026, 獎勵: 130
迭代: 027, 獎勵: 033
迭代: 028, 獎勵: 031
迭代: 029, 獎勵: 033
迭代: 030, 獎勵: 036
迭代: 031, 獎勵: 082
迭代: 032, 獎勵: 041
迭代: 033, 獎勵: 060
迭代: 034, 獎勵: 033
迭代: 035, 獎勵: 152
迭代: 036, 獎勵: 082
迭代: 037, 獎勵: 060
迭代: 038, 獎勵: 061
迭代: 039, 獎勵: 165
迭代: 040, 獎勵: 166
迭代: 041, 獎勵: 110
迭代: 042, 獎勵: 094
迭代: 043, 獎勵: 200
迭代: 044, 獎勵: 200
迭代: 045, 獎勵: 200
迭代: 046, 獎勵: 194
迭代: 047, 獎勵: 101
迭代: 048, 獎勵: 200
迭代: 049, 獎勵: 139
迭代: 050, 獎勵: 104
迭代: 051, 獎勵: 032
迭代: 052, 獎勵: 200
迭代: 053, 獎勵: 200
迭代: 054, 獎勵: 200
迭代: 055, 獎勵: 181
迭代: 056, 獎勵: 080
迭代: 057, 獎勵: 108
迭代: 058, 獎勵: 035
迭代: 059, 獎勵: 1