# 訓練強化學習模型來平衡倒立擺

這份筆記本是 [AI for Beginners Curriculum](http://aka.ms/ai-beginners) 的一部分，靈感來自 [官方 PyTorch 教程](https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html) 和 [這個 Cartpole 的 PyTorch 實現](https://github.com/yc930401/Actor-Critic-pytorch)。

在這個範例中，我們將使用強化學習（RL）來訓練一個模型，使其能夠在一個可以左右移動的小車上平衡一根桿子。我們將使用 [OpenAI Gym](https://www.gymlibrary.ml/) 環境來模擬這個倒立擺。

> **注意**: 你可以在本地環境（例如 Visual Studio Code）運行這節課的程式碼，此時模擬畫面會在新視窗中打開。如果在線上運行程式碼，可能需要對程式碼進行一些調整，具體請參考[這裡](https://towardsdatascience.com/rendering-openai-gym-envs-on-binder-and-google-colab-536f99391cc7)。

我們將從確保 Gym 已安裝開始：


In [None]:
import sys
!{sys.executable} -m pip install gym

現在讓我們建立 CartPole 環境，並了解如何操作它。一個環境具有以下特性：

* **動作空間** 是我們在模擬的每一步中可以執行的所有可能動作的集合  
* **觀察空間** 是我們可以進行觀察的所有可能狀態的集合  


In [None]:
import gym

env = gym.make("CartPole-v1")

print(f"Action space: {env.action_space}")
print(f"Observation space: {env.observation_space}")

讓我們看看模擬是如何運作的。以下的迴圈會執行模擬，直到 `env.step` 不再返回終止標誌 `done` 為止。我們將使用 `env.action_space.sample()` 隨機選擇動作，這意味著實驗可能會非常快地失敗（當小車的速度、位置或角度超出某些限制時，CartPole 環境就會終止）。

> 模擬將在新視窗中開啟。你可以多次執行程式碼，觀察其行為表現。


In [None]:
env.reset()

done = False
total_reward = 0
while not done:
   env.render()
   obs, rew, done, info = env.step(env.action_space.sample())
   total_reward += rew
   print(f"{obs} -> {rew}")
print(f"Total reward: {total_reward}")

您可以注意到觀測值包含四個數字。它們分別是：
- 小車的位置
- 小車的速度
- 杆子的角度
- 杆子的旋轉速率

`rew` 是我們在每一步中獲得的獎勵。在 CartPole 環境中，每次模擬步驟都會獲得 1 分，目標是最大化總獎勵，也就是讓 CartPole 能夠平衡而不倒下的時間越長越好。

在強化學習中，我們的目標是訓練一個 **策略** $\pi$，它能根據每個狀態 $s$ 告訴我們應該採取的行動 $a$，基本上就是 $a = \pi(s)$。

如果您希望使用概率解法，可以將策略理解為為每個行動返回一組概率，也就是 $\pi(a|s)$ 表示在狀態 $s$ 下採取行動 $a$ 的概率。

## 策略梯度方法

在最簡單的強化學習算法中，稱為 **策略梯度**，我們將訓練一個神經網絡來預測下一步的行動。


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch

num_inputs = 4
num_actions = 2

model = torch.nn.Sequential(
    torch.nn.Linear(num_inputs, 128, bias=False, dtype=torch.float32),
    torch.nn.ReLU(),
    torch.nn.Linear(128, num_actions, bias = False, dtype=torch.float32),
    torch.nn.Softmax(dim=1)
)

我們將通過運行多次實驗來訓練網絡，並在每次運行後更新我們的網絡。讓我們定義一個函數來運行實驗並返回結果（所謂的**追蹤**）——所有狀態、行動（及其推薦的概率）和獎勵：


In [None]:
def run_episode(max_steps_per_episode = 10000,render=False):    
    states, actions, probs, rewards = [],[],[],[]
    state = env.reset()
    for _ in range(max_steps_per_episode):
        if render:
            env.render()
        action_probs = model(torch.from_numpy(np.expand_dims(state,0)))[0]
        action = np.random.choice(num_actions, p=np.squeeze(action_probs.detach().numpy()))
        nstate, reward, done, info = env.step(action)
        if done:
            break
        states.append(state)
        actions.append(action)
        probs.append(action_probs.detach().numpy())
        rewards.append(reward)
        state = nstate
    return np.vstack(states), np.vstack(actions), np.vstack(probs), np.vstack(rewards)

您可以使用未訓練的網絡運行一個情節，並觀察到總回報（即情節的長度）非常低：


In [None]:
s, a, p, r = run_episode()
print(f"Total reward: {np.sum(r)}")

政策梯度算法的一個棘手方面是使用**折扣回報**。其想法是我們在遊戲的每一步計算總回報的向量，並在此過程中使用某個係數 $gamma$ 折扣早期的回報。我們還對結果向量進行標準化，因為我們將使用它作為權重來影響我們的訓練：


In [None]:
eps = 0.0001

def discounted_rewards(rewards,gamma=0.99,normalize=True):
    ret = []
    s = 0
    for r in rewards[::-1]:
        s = r + gamma * s
        ret.insert(0, s)
    if normalize:
        ret = (ret-np.mean(ret))/(np.std(ret)+eps)
    return ret

現在開始實際訓練吧！我們將運行 300 次實驗，每次實驗中我們將執行以下步驟：

1. 執行實驗並收集追蹤數據
2. 計算所採取行動與預測機率之間的差異（`gradients`）。差異越小，表示我們越確定採取了正確的行動。
3. 計算折扣後的獎勵，並將梯度乘以折扣後的獎勵——這樣可以確保高獎勵的步驟對最終結果的影響比低獎勵的步驟更大。
4. 我們的神經網路的期望目標行動部分來自運行期間的預測機率，部分來自計算出的梯度。我們將使用 `alpha` 參數來決定梯度和獎勵在多大程度上被考慮——這被稱為強化學習算法的*學習率*。
5. 最後，我們基於狀態和期望行動訓練我們的網路，並重複這個過程。


In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

def train_on_batch(x, y):
    x = torch.from_numpy(x)
    y = torch.from_numpy(y)
    optimizer.zero_grad()
    predictions = model(x)
    loss = -torch.mean(torch.log(predictions) * y)
    loss.backward()
    optimizer.step()
    return loss

In [None]:
alpha = 1e-4

history = []
for epoch in range(300):
    states, actions, probs, rewards = run_episode()
    one_hot_actions = np.eye(2)[actions.T][0]
    gradients = one_hot_actions-probs
    dr = discounted_rewards(rewards)
    gradients *= dr
    target = alpha*np.vstack([gradients])+probs
    train_on_batch(states,target)
    history.append(np.sum(rewards))
    if epoch%100==0:
        print(f"{epoch} -> {np.sum(rewards)}")

plt.plot(history)

現在讓我們運行帶有渲染的劇集來查看結果：


In [None]:
_ = run_episode(render=True)

希望你能看到，現在杆子可以平衡得相當不錯了！

## Actor-Critic 模型

Actor-Critic 模型是策略梯度的進一步發展，在這個模型中，我們構建了一個神經網絡來同時學習策略和估算的回報。這個網絡將有兩個輸出（或者你可以將其視為兩個獨立的網絡）：
* **Actor** 將通過提供狀態概率分佈來推薦採取的行動，就像在策略梯度模型中一樣。
* **Critic** 則會估算這些行動可能帶來的回報。它會在給定的狀態下返回未來的總估算回報。

讓我們定義這樣一個模型：


In [None]:
from itertools import count
import torch.nn.functional as F

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = gym.make("CartPole-v1")

state_size = env.observation_space.shape[0]
action_size = env.action_space.n
lr = 0.0001

class Actor(torch.nn.Module):
    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = torch.nn.Linear(self.state_size, 128)
        self.linear2 = torch.nn.Linear(128, 256)
        self.linear3 = torch.nn.Linear(256, self.action_size)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        output = self.linear3(output)
        distribution = torch.distributions.Categorical(F.softmax(output, dim=-1))
        return distribution


class Critic(torch.nn.Module):
    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = torch.nn.Linear(self.state_size, 128)
        self.linear2 = torch.nn.Linear(128, 256)
        self.linear3 = torch.nn.Linear(256, 1)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        value = self.linear3(output)
        return value

我們需要稍微修改我們的 `discounted_rewards` 和 `run_episode` 函數：


In [None]:
def discounted_rewards(next_value, rewards, masks, gamma=0.99):
    R = next_value
    returns = []
    for step in reversed(range(len(rewards))):
        R = rewards[step] + gamma * R * masks[step]
        returns.insert(0, R)
    return returns

def run_episode(actor, critic, n_iters):
    optimizerA = torch.optim.Adam(actor.parameters())
    optimizerC = torch.optim.Adam(critic.parameters())
    for iter in range(n_iters):
        state = env.reset()
        log_probs = []
        values = []
        rewards = []
        masks = []
        entropy = 0
        env.reset()

        for i in count():
            env.render()
            state = torch.FloatTensor(state).to(device)
            dist, value = actor(state), critic(state)

            action = dist.sample()
            next_state, reward, done, _ = env.step(action.cpu().numpy())

            log_prob = dist.log_prob(action).unsqueeze(0)
            entropy += dist.entropy().mean()

            log_probs.append(log_prob)
            values.append(value)
            rewards.append(torch.tensor([reward], dtype=torch.float, device=device))
            masks.append(torch.tensor([1-done], dtype=torch.float, device=device))

            state = next_state

            if done:
                print('Iteration: {}, Score: {}'.format(iter, i))
                break


        next_state = torch.FloatTensor(next_state).to(device)
        next_value = critic(next_state)
        returns = discounted_rewards(next_value, rewards, masks)

        log_probs = torch.cat(log_probs)
        returns = torch.cat(returns).detach()
        values = torch.cat(values)

        advantage = returns - values

        actor_loss = -(log_probs * advantage.detach()).mean()
        critic_loss = advantage.pow(2).mean()

        optimizerA.zero_grad()
        optimizerC.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        optimizerA.step()
        optimizerC.step()


現在我們將運行主要訓練循環。我們將通過計算適當的損失函數並更新網絡參數來使用手動網絡訓練過程：


In [None]:

actor = Actor(state_size, action_size).to(device)
critic = Critic(state_size, action_size).to(device)
run_episode(actor, critic, n_iters=100)

In [None]:
env.close()

## 重點

在這次示範中，我們看到了兩種強化學習演算法：簡單的策略梯度和更複雜的演員-評論者方法。可以看到這些演算法是基於狀態、行動和獎勵的抽象概念運作，因此它們可以應用於非常不同的環境。

強化學習讓我們僅透過觀察最終的獎勵來學習解決問題的最佳策略。不需要標註的數據集使我們能夠多次重複模擬以優化模型。然而，強化學習仍然面臨許多挑戰，如果你決定深入研究這個有趣的人工智慧領域，可能會學到更多相關知識。



---

**免責聲明**：  
本文件已使用 AI 翻譯服務 [Co-op Translator](https://github.com/Azure/co-op-translator) 進行翻譯。儘管我們努力確保翻譯的準確性，但請注意，自動翻譯可能包含錯誤或不準確之處。原始文件的母語版本應被視為權威來源。對於關鍵信息，建議使用專業人工翻譯。我們對因使用此翻譯而引起的任何誤解或錯誤解釋不承擔責任。
