# 简介

本次作业包含2个代码填空和5个Exercise。

回顾课堂内容，我们知道TRPO在很多场景上都很成功，但是我们也发现了它的计算过程非常的复杂，每步更新的运算量非常大。于是，在2017年TRPO的改进版PPO算法被提出，它基于TRPO的思想，但是实现算法更加简单，避免了复杂的求KL散度的Hessian矩阵。并且大量的实验结果表明，PPO能够比TRPO学习的更快，这使得PPO一下子成为了非常流行的强化学习算法。如果我们想要尝试在一个新的环境用强化学习，那么PPO就属于那种可以首先尝试的算法。

## PPO算法

我们回忆一下TRPO的优化目标：
$$
 \max\limits_{\theta}\mathbb{E}_{s\sim \nu^{\pi_{\theta_k}}}\mathbb E_{a\sim \pi_{\theta_k}(\cdot|s)}\left[ \frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)\right]\quad\text{s.t.}\quad \mathbb{E}_{s\sim \nu^{\pi_{\theta_k}}}\left[D_{KL}(\pi_{\theta_k}(\cdot|s),\pi_\theta(\cdot|s))\right]\le\delta
$$

TRPO使用泰勒展开近似、共轭梯度、线性搜索等方法直接求解。PPO的优化目标同样是它，但PPO用了一些相对简单的方法来求解。具体来说，PPO有两种形式，一是PPO-Penalty，二是PPO-Clip。

### PPO-Penalty

PPO-Penalty用Lagrange乘子法直接将KL散度的限制放进了目标函数中，这就变成了一个无约束的优化问题，然后在迭代的过程中不断更新KL散度前的系数。即
$$
\theta=\mathop{\arg\max}_\theta \mathbb{E}_{s\sim \nu^{\pi_{\theta_k}}}\mathbb E_{a\sim \pi_{\theta_k}(\cdot|s)}\left[ \frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)-\beta D_{KL}[\pi_{\theta_k}(\cdot|s),\pi_\theta(\cdot|s)]\right]
$$

令 $d_k=D_{KL}^{\nu^{\pi_{\theta_k}}}(\pi_{\theta_k},\pi_\theta)$，$\beta$ 的更新规则如下：
1. 如果$d_k<\delta/1.5$，那么$\beta_{k+1}\leftarrow \beta_k/2$
2. 如果$d_k>\delta\times1.5$，那么$\beta_{k+1}\leftarrow \beta_k\times 2$
3. 否则$\beta_{k+1}=\beta_k$。




### PPO-Clip

PPO-Clip更加直接，其直接在目标函数里进行限制，以保证新的参数和旧的参数的差距不会太大，即

$$
\theta=\mathop{\arg\max}_\theta \mathbb{E}_{s\sim \nu^{\pi_{\theta_k}}}\mathbb E_{a\sim \pi_{\theta_k}(\cdot|s)}\left[\min \left(\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)} A^{\pi_{\theta_k}}(s,a),\text{clip}\left(\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)},1-\epsilon,1+\epsilon\right)A^{\pi_{\theta_k}}(s,a)\right)\right]
$$

其中 $\text{clip}(x,l,r):=\max(\min(x,r),l)$ ，即把 $x$ 限制在 $[l,r]$ 内。上式中$\epsilon$是一个超参数，表示clip的范围。如果$A(s,a)>0$，说明这个动作的Q值高于平均，最大化这个式子会增大$\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}$，但不会让其超过$1+\epsilon$。反之，如果$A(s,a)<0$，最大化这个式子会减小$\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}$，但不会让其超过$1-\epsilon$。

### PPO 算法改进
在标准的 PPO-Clip 算法里，我们使用 Clip 来约束新策略和旧策略之间的差异。新推出的 GRPO 算法对 PPO-Clip 做了一些改进，我们在这里尝试其中的两项改进：
1、结合 PPO-Penalty，将 KL 散度的惩罚项添加到目标函数中；
2、使用KL 散度的无偏估计替代对KL散度的直接计算
新的更新目标如下：
$$
\theta=\mathop{\arg\max}_\theta \mathbb{E}_{s\sim \nu^{\pi_{\theta_k}}}\mathbb E_{a\sim \pi_{\theta_k}(\cdot|s)}\left[\min \left(\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)} A^{\pi_{\theta_k}}(s,a),\text{clip}\left(\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)},1-\epsilon,1+\epsilon\right)A^{\pi_{\theta_k}}(s,a)\right)-\beta D_{KL}[\pi_{\theta_k}(\cdot|s),\pi_\theta(\cdot|s)]\right].
$$

其中 KL散度采用如下链接中给出的无偏估计 http://joschu.net/blog/kl-approx.html :
$$
D_{KL}[\pi_{\theta_k}(\cdot|s),\pi_\theta(\cdot|s)]=\frac{\pi_{\theta}(\cdot|s)}{\pi_{\theta_k}(\cdot|s)}-\log\frac{\pi_{\theta}(\cdot|s)}{\pi_{\theta_k}(\cdot|s)}-1.
$$

**Exercise 1**. 结合阅读材料 http://joschu.net/blog/kl-approx.html，说明为何要取如下的 estimator:
对于样本 $x_1,x_2\cdots ~ q$，令 $r=\frac{p(x)}{q(x)}$，我们有
$$
D_{KL}[p,q]=r\log r-(r-1)\\
D_{KL}[q,p]=(r-1)-\log r
$$

**解答**：

在 PPO 算法中，我们希望限制新策略与旧策略的差异，从而保持稳定性,因此这样取使用KL散度来度量策略变化的程度，可以作为一种正则项或指标来控制更新步长。

# PPO代码实践

我们在两个环境CartPle和Pendulum上测试PPO算法。我们采用改进后的PPO算法，并且通过参数搜索寻找最合适的$\beta$值

### 环境1

首先导入一些必要的库。

In [None]:
# % pip install gym
import torch
import torch.utils.data
import numpy as np
import gym
import copy
import matplotlib.pyplot as plt


In [None]:
from IPython.display import clear_output

我们的PPOAgent主要有两个网络：policy网络和value网络。

In [None]:
class PPOAgent:
    """创建两种agent，并设定部分超参数。"""

    def __init__(self, feature_n, action_n):
        self.policy_model = torch.nn.Sequential(torch.nn.Linear(feature_n, 30),
                                                torch.nn.Tanh(),
                                                torch.nn.Linear(30, action_n),
                                                torch.nn.Softmax(dim=1))
        self.value_model = torch.nn.Sequential(torch.nn.Linear(feature_n, 30),
                                               torch.nn.ReLU(),
                                               torch.nn.Linear(30, 10),
                                               torch.nn.ReLU(),
                                               torch.nn.Linear(10, 1))

        self.policy_optim = torch.optim.Adam(self.policy_model.parameters(), lr=1e-4)
        self.value_optim = torch.optim.Adam(self.value_model.parameters(), lr=3e-4)

        self.gamma = 0.98  # 折扣因子
        self.batch_size = 128
        self.eps = 0.2

    """根据给定的状态，采样动作。"""

    def sample_action(self, state):
        state_tensor = torch.tensor(state, dtype=torch.float)
        prob = self.policy_model(state_tensor)
        dist = torch.distributions.Categorical(probs=prob)
        return dist.sample(), prob

    """根据给定的状态，计算V函数。"""

    def get_value(self, state):
        return self.value_model(state)

    """策略学习。"""

    def policy_learn(self, s, a, old_pro, adv, beta=0.1):
        ########################################
        ## Programming 1:策略更新
        ########################################
        new_probs = self.policy_model(s)
        new_probs_a = new_probs.gather(1, a) 
        old_probs_a = old_pro.gather(1, a) 

        prob_ratio = (new_probs_a / (old_probs_a.detach() + 1e-8)) 

        surr1 = prob_ratio * adv 
        surr2 = torch.clamp(prob_ratio, 1 - self.eps, 1 + self.eps) * adv  
        policy_loss = -torch.min(surr1, surr2).mean() 

        entropy = -torch.sum(new_probs * torch.log(new_probs + 1e-8), dim=1).mean() 
        policy_loss -= beta * entropy 

        self.policy_optim.zero_grad()
        policy_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 0.5) 
        self.policy_optim.step()
        #############################
        ########################################
        ## End of Programming 1
        ########################################
        return prob_ratio  # 返回probability ratio以观察训练过程

    """价值学习。"""

    def value_learn(self, s, r, d, s_):
        v_ = self.get_value(s_)
        v = self.get_value(s)
        td_error = v - (r + (1 - d) * self.gamma * v_).detach()
        loss = td_error.pow(2).mean()

        self.value_optim.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.value_model.parameters(), .5)
        self.value_optim.step()

计算advantage函数的时候，对于Q函数的计算做一步展开（TD）。

In [None]:
def compute_advantage(s, s_, r, d, agent):
    with torch.no_grad():
        q_fn = r + (1 - d) * agent.gamma * agent.value_model(s_)
    return q_fn - agent.value_model(s)

创建环境，并设定随机数种子以便重复实现。

In [None]:
env_name = "CartPole-v1"
env = gym.make(env_name)

seed = 999
env.reset(seed=seed)
np.random.seed(seed)
torch.manual_seed(seed)

实例化agent并开始训练。在训练过程中，我们会动态描绘训练曲线（横坐标是episode，纵坐标是对应的reward）。

当训练出的agent已经足够好时，训练停止并输出“Solved!”。

期望运行时间：$1$分钟。

In [None]:
# line = 200
line = 500
agent = PPOAgent(env.observation_space.shape[0], env.action_space.n)
epi_n = 1500
mini_epoch = 15
i_list = []
r_list = []
p_list = []
p_i_list = []

s, a, p, r, s_, d = [], [], [], [], [], []

for i in range(epi_n + 1):
    state = env.reset()[0]
    done = False

    tot_reward = 0
    while not done:
        action, action_distribution = agent.sample_action([state])
        next_state, reward, terminated, truncated, info = env.step(action.item())
        done = terminated or truncated
        tot_reward += reward

        if done:
            reward = -20

        s.append(state)
        a.append(action)
        p.append(action_distribution)
        r.append(reward)
        s_.append(next_state)
        d.append(done)

        state = next_state

    if len(s) >= 200:
        # print(f"collect {len(s)} samples")
        s = torch.tensor(s, dtype=torch.float)
        a = torch.tensor(a, dtype=torch.long).view(-1, 1)
        p = torch.cat(p).view(-1, 2)
        r = torch.tensor(r, dtype=torch.float).view(-1, 1)
        s_ = torch.tensor(s_, dtype=torch.float)
        d = torch.tensor(d, dtype=torch.float).view(-1, 1)

        adv = compute_advantage(s, s_, r, d, agent).detach().view(-1, 1)

        prob_ratio = 0
        for _ in range(mini_epoch):
            prob_list = []
            for idx in torch.utils.data.sampler.BatchSampler(
                    torch.utils.data.sampler.SubsetRandomSampler(range(len(s))), agent.batch_size, False):
                prob = agent.policy_learn(s[idx], a[idx], p[idx], adv[idx])
                agent.value_learn(s[idx], r[idx], d[idx], s_[idx])
                prob_list.append(prob.mean().item())
            prob_ratio += np.mean(prob_list)
        p_list.append(prob_ratio / mini_epoch)
        p_i_list.append(i)
        # on-policy 训练
        s, a, p, r, s_, d = [], [], [], [], [], []

    # 画图

    if i % 100 == 0:
        plt.figure()
        plt.plot([0, i], [line, line])
        i_list.append(i)
        tot_reward_list = []
        for _ in range(5):
            tot_reward = 0
            state = env.reset()[0]
            done = False
            while not done:
                _, prob = agent.sample_action([state])
                action = prob.argmax(-1)
                state, reward, terminated, truncated, _ = env.step(action.item())
                done = terminated or truncated
                tot_reward += reward
            tot_reward_list.append(tot_reward)
        r_list.append(np.mean(tot_reward_list))
        plt.plot(i_list, r_list)
        clear_output(True)
        plt.xlabel('episodes')
        plt.ylabel('accumulated reward')
        plt.figure()
        plt.plot(p_i_list, p_list)
        plt.xlabel('episodes')
        plt.ylabel('prob ratio')
        plt.show()

参考训练过程如下：

![Image Name](https://cdn.kesci.com/upload/rt/w39tuKE-Rwv_/qsxj2k8ece.png)



![Image Name](https://cdn.kesci.com/upload/rt/w39tuKE-Rwv_/qsxj2kvekg.png)


在上面的图中，probability ratio的值可以用来作为mini-epoch，learning rate这些参数调节的依据。离散动作空间中，在训练中期，该值可以说明策略更新的幅度，且应该保持在较大的值。而在下面的连续动作空间中，由于假设习得的动作分布是高斯分布，该值一般较为稳定。

改进的ppo算法存在超参数 $\beta$, 我们可以通过调整这个参数来观察训练过程中的变化。请结合先前代码，在下面的代码块中设计一个简单的参数搜索算法，来寻找最优的 $\beta$ 值。并在Exercise 2中简要说明你的参数搜索算法是如何评估 $\beta$ 值的好坏的并给出你认为的最优的 $\beta$ 值。

In [None]:
def train_agent_with_beta(beta_value, env, num_episodes=500, mini_epoch=5):
    feature_n = env.observation_space.shape[0]
    action_n = env.action_space.n

    agent = PPOAgent(feature_n, action_n)
    cumulative_rewards = [] 

    s, a, p, r, s_, d = [], [], [], [], [], []

    for episode in range(num_episodes):
        state = env.reset()[0]
        done = False
        episode_reward = 0

        while not done:
            # 解包返回两个值
            action, action_log_prob = agent.sample_action([state])
            action = int(action)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            episode_reward += reward

            s.append(state)
            a.append(action)
            p.append(action_log_prob)
            r.append(reward)
            s_.append(next_state)
            d.append(done)

            state = next_state

        cumulative_rewards.append(episode_reward)

        if len(s) >= 1000:
            s_tensor   = torch.tensor(s, dtype=torch.float)
            a_tensor   = torch.tensor(a, dtype=torch.long).view(-1, 1)
            p_tensor   = torch.cat(p, dim=0)
            r_tensor   = torch.tensor(r, dtype=torch.float).view(-1, 1)
            s_tensor_  = torch.tensor(s_, dtype=torch.float)
            d_tensor   = torch.tensor(d, dtype=torch.float).view(-1, 1)

            # 奖励归一化（可选）
            r_tensor = (r_tensor - r_tensor.mean()) / (r_tensor.std() + 1e-8)
            adv = compute_advantage(s_tensor, s_tensor_, r_tensor, d_tensor, agent).detach().view(-1, 1)

            for _ in range(mini_epoch):
                for idx in torch.utils.data.BatchSampler(
                        torch.utils.data.SubsetRandomSampler(range(len(s_tensor))),
                        agent.batch_size,
                        drop_last=False):
                    agent.policy_learn(s_tensor[idx], a_tensor[idx], p_tensor[idx], adv[idx], beta=beta_value)
                    agent.value_learn(s_tensor[idx], r_tensor[idx], d_tensor[idx], s_tensor_[idx])
            s, a, p, r, s_, d = [], [], [], [], [], []

    avg_final_reward = np.mean(cumulative_rewards[-10:])
    return avg_final_reward


# 调用部分
env = gym.make("CartPole-v1")
beta_candidates = np.arange(0.001, 0.1001, 0.005)
beta_results = {}

print("开始进行 β 参数的网格搜索（Exercise 2）……")
for beta in beta_candidates:
    avg_reward = train_agent_with_beta(beta_value=beta, env=env, num_episodes=500, mini_epoch=5)
    beta_results[beta] = avg_reward
    print(f"Beta: {beta:.4f}  --> 最后 10 个 episode 平均累计奖励: {avg_reward:.2f}")

best_beta = max(beta_results, key=beta_results.get)
print("\n参数搜索完成。")
print(f"最优的 β 值为：{best_beta:.4f}，对应的平均累计奖励为：{beta_results[best_beta]:.2f}")

**Exercise 2**. 请在此简要说明你的参数搜索算法是如何评估 $\beta$ 值的好坏的并给出你认为的最优的 $\beta$ 值。

- 采用网格搜索方法优化超参数 beta，其搜索范围设定为 [0.001, 0.1]，步长为 0.005，共进行了约 20 次实验。每次实验，重新初始化 PPOAgent，并训练固定数量（500 个）的 episode，记录每个 episode 的累计奖励，最终以最后 10 个episode 的平均累计奖励作为性能评估指标。

- 对于每个 beta 值，累计奖励越高，说明该 beta 值使策略收敛更快、表现更好，因此被认为更优。

- 最优的beta值为：0.036

### 连续动作环境

对于连续动作环境，我们需要对上面的代码做一定的修改。具体来说，需要修改以下代码块（直接运行即可）。

下面是新的policy网络，因为环境是连续动作的，因此我们的网络分别输出表示动作分布的高斯分布的 $\mu$ 和 $\sigma$ 。

In [None]:
class PolicyModel(torch.nn.Module):
    def __init__(self, feature_n):
        super(PolicyModel, self).__init__()
        self.l1 = torch.nn.Linear(feature_n, 100)
        self.l2 = torch.nn.Linear(100, 1)
        self.l3 = torch.nn.Linear(100, 1)

    def forward(self, x):
        x = torch.nn.functional.relu(self.l1(x))
        mu = 2 * torch.tanh(self.l2(x))
        sigma = torch.nn.functional.softplus(self.l3(x))
        return mu, sigma

以及修改一些主函数。

这和上面的PPOAgent是类似的，只有部分修改。

In [None]:
class PPOAgent:

    def __init__(self, feature_n, action_n):
        self.policy_model = PolicyModel(feature_n)
        self.value_model = torch.nn.Sequential(torch.nn.Linear(feature_n, 100),
                                               torch.nn.ReLU(),
                                               torch.nn.Linear(100, 1))

        self.policy_optim = torch.optim.Adam(self.policy_model.parameters(), lr=1e-4)
        self.value_optim = torch.optim.Adam(self.value_model.parameters(), lr=3e-4)

        self.gamma = 0.9  # 折扣因子
        self.batch_size = 128
        self.eps = 0.2

    def sample_action(self, state):
        state_tensor = torch.tensor(state, dtype=torch.float)
        with torch.no_grad():
            mu, sigma = self.policy_model(state_tensor)
        dist = torch.distributions.normal.Normal(mu, sigma)
        action = dist.sample()
        action_log_prob = dist.log_prob(action)
        return action.item(), action_log_prob.item(), dist

    def get_value(self, state):
        return self.value_model(state)

    def policy_learn(self, s, a, old_p, adv, beta = 0.05):
        ########################################
        ## Programming 2:策略更新
        ########################################
        mu, sigma = self.policy_model(s)
        dist = torch.distributions.Normal(mu, sigma)

        new_log_prob = dist.log_prob(a) 

        ratio = torch.exp(new_log_prob - old_p) 
   
        surr1 = ratio * adv
        surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * adv  
        policy_loss = -torch.min(surr1, surr2).mean() 
        
        entropy = dist.entropy().mean() 
        policy_loss -= beta * entropy 

        self.policy_optim.zero_grad()
        policy_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 0.5)  
        self.policy_optim.step()
        
        prob_ratio = ratio.mean() 
        ########################################
        ## End of Programming 2
        ########################################
        return prob_ratio

    def value_learn(self, s, r, d, s_):
        v_ = self.get_value(s_)
        v = self.get_value(s)
        td_error = v - (r + (1 - d) * self.gamma * v_).detach()
        loss = td_error.pow(2).mean()

        self.value_optim.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.value_model.parameters(), .5)
        self.value_optim.step()


创建环境Pendulum-v1，并设定随机数种子以便重复实现。

In [None]:
env_name = "Pendulum-v1"
env = gym.make(env_name)

seed = 0
env.reset(seed=seed)
np.random.seed(seed)
torch.manual_seed(seed)

实例化agent并开始训练。在训练过程中，我们会动态描绘训练曲线（横坐标是episode，纵坐标是对应的reward）。

当训练出的agent已经足够好时，训练停止并输出“Solved!”。

期望运行时间：$3$分钟。

In [None]:
line = -200

agent = PPOAgent(env.observation_space.shape[0], env.action_space.shape[0])
epi_n = 1500
mini_epoch = 15
i_list = []
r_list = []
p_list = []
p_i_list = []
s, a, p, r, s_, d = [], [], [], [], [], []

for i in range(epi_n + 1):
    state = env.reset()[0]
    done = False

    tot_reward = 0
    while not done:
        action, action_distribution, _ = agent.sample_action([state])
        next_state, reward, terminated, truncated, info = env.step([action])
        done = terminated or truncated
        tot_reward += reward

        s.append(state)
        a.append(action)
        p.append(action_distribution)
        r.append(reward)
        s_.append(next_state)
        d.append(done)

        state = next_state

    if len(s) >= 1000:
        s = torch.tensor(s, dtype=torch.float)
        a = torch.tensor(a, dtype=torch.float).view(-1, 1)
        p = torch.tensor(p, dtype=torch.float).view(-1, 1)
        r = torch.tensor(r, dtype=torch.float).view(-1, 1)
        s_ = torch.tensor(s_, dtype=torch.float)
        d = torch.tensor(d, dtype=torch.float).view(-1, 1)

        r = (r - r.mean()) / (r.std() + 1e-8)

        adv = compute_advantage(s, s_, r, d, agent).detach().view(-1, 1)

        prob_ratio = 0
        for _ in range(mini_epoch):
            prob_list = []
            for idx in torch.utils.data.sampler.BatchSampler(
                    torch.utils.data.sampler.SubsetRandomSampler(range(len(s))), agent.batch_size, False):
                prob = agent.policy_learn(s[idx], a[idx], p[idx], adv[idx])
                agent.value_learn(s[idx], r[idx], d[idx], s_[idx])
                prob_list.append(prob.mean().item())
            prob_ratio += np.mean(prob_list)
        p_list.append(prob_ratio / mini_epoch)
        p_i_list.append(i)
        # on-policy 训练
        s, a, p, r, s_, d = [], [], [], [], [], []

    # 画图
    if i % 100 == 0:
        plt.figure()
        plt.plot([0, i], [line, line])
        i_list.append(i)
        tot_reward_list = []
        for _ in range(5):
            tot_reward = 0
            state = env.reset()[0]
            done = False
            while not done:
                _, _, prob = agent.sample_action([state])
                action = prob.mean
                state, reward, terminated, truncated, _ = env.step([action.item()])
                done = terminated or truncated
                tot_reward += reward
            tot_reward_list.append(tot_reward)
        r_list.append(np.mean(tot_reward_list))
        plt.plot(i_list, r_list)
        clear_output(True)
        plt.xlabel('episodes')
        plt.ylabel('accumulated reward')
        plt.figure()
        plt.plot(p_i_list, p_list)
        plt.xlabel('episodes')
        plt.ylabel('prob ratio')
        plt.show()

参考训练过程如下：

![Image Name](images/1.png)

![Image Name](images/2.png)



同样的，请在此实现对 $beta$ 的参数搜索算法，并在Exercise 3中简要说明你的参数搜索算法是如何评估 $\beta$ 值的好坏的并给出你认为的最优的 $\beta$ 值。

In [None]:

def train_agent_with_beta(beta_value, env, num_episodes=500, mini_epoch=5):
    feature_n = env.observation_space.shape[0]
    action_n = env.action_space.shape[0]
    agent = PPOAgent(feature_n, action_n)
    
    cumulative_rewards = []
    s, a, p, r, s_, d = [], [], [], [], [], []
    
    for episode in range(num_episodes):
        state = env.reset()[0]
        done = False
        episode_reward = 0
        
        while not done:
            action, action_log_prob, _ = agent.sample_action([state])
            next_state, reward, terminated, truncated, info = env.step([action])
            done = terminated or truncated
            episode_reward += reward
            
            s.append(state)
            a.append(action)
            p.append(action_log_prob)
            r.append(reward)
            s_.append(next_state)
            d.append(done)
            
            state = next_state
        
        cumulative_rewards.append(episode_reward)

        if len(s) >= 1000:
            s_tensor = torch.tensor(s, dtype=torch.float)
            a_tensor = torch.tensor(a, dtype=torch.float).view(-1, 1)
            p_tensor = torch.tensor(p, dtype=torch.float).view(-1, 1)
            r_tensor = torch.tensor(r, dtype=torch.float).view(-1, 1)
            s_tensor_ = torch.tensor(s_, dtype=torch.float)
            d_tensor = torch.tensor(d, dtype=torch.float).view(-1, 1)

            r_tensor = (r_tensor - r_tensor.mean()) / (r_tensor.std() + 1e-8)
            
            adv = compute_advantage(s_tensor, s_tensor_, r_tensor, d_tensor, agent).detach().view(-1, 1)
            
            for _ in range(mini_epoch):
                for idx in torch.utils.data.BatchSampler(
                        torch.utils.data.SubsetRandomSampler(range(len(s_tensor))),
                        agent.batch_size,
                        drop_last=False):
                    agent.policy_learn(s_tensor[idx], a_tensor[idx], p_tensor[idx], adv[idx], beta=beta_value)
                    agent.value_learn(s_tensor[idx], r_tensor[idx], d_tensor[idx], s_tensor_[idx])

            s, a, p, r, s_, d = [], [], [], [], [], []
  
    avg_final_reward = np.mean(cumulative_rewards[-10:])
    return avg_final_reward

env = gym.make("Pendulum-v1") 

beta_candidates = np.arange(0.001, 0.1001, 0.005) 
beta_results = {}

print("开始对 beta 参数进行搜索……")
for beta in beta_candidates:
    avg_reward = train_agent_with_beta(beta_value=beta, env=env, num_episodes=500, mini_epoch=5)
    beta_results[beta] = avg_reward
    print(f"Beta: {beta:.4f}, 最后10个 episode 平均累计奖励: {avg_reward:.2f}")

best_beta = max(beta_results, key=beta_results.get)
print("\n参数搜索完成。")
print(f"最优的 beta 值为：{best_beta:.4f}，对应的平均累计奖励为：{beta_results[best_beta]:.2f}")


**Exercise 3**. 请在此简要说明你的参数搜索算法是如何评估 $\beta$ 值的好坏的并给出你认为的最优的 $\beta$ 值。

回答：

- 采用网格搜索方法优化超参数 beta，其搜索范围设定为 [0.001, 0.1]，步长为 0.005，共进行了约 20 次实验。每次实验，重新初始化 PPOAgent，并训练固定数量（500 个）的 episode，记录每个 episode 的累计奖励，最终以最后 10 个episode 的平均累计奖励作为性能评估指标。

- 对于每个 beta 值，累计奖励越高，说明该 beta 值使策略收敛更快、表现更好，因此被认为更优。

- 最优的beta值为：0.0110




# 更新目标理论
学习完这两章，现在我们给出一些关于策略梯度算法的更新目标的理论：
首先，由策略梯度定理（证明见策略梯度章节的作业），我们知道：
$$
\nabla_{\theta} \mathbb{E}_{\tau \sim \pi_{\theta}}[R(\tau)] = 
\mathbb{E}_{\tau \sim \pi_{\theta}}\left[\sum_{t=0}^{T-1} \nabla_{\theta} \log \pi_{\theta}\left(a_{t} \mid s_{t}\right)\left(\sum_{t^{\prime}=t}^{T-1} r_{t^{\prime}}\right)\right] 
\tag{1}
$$
对比PPO的更新目标（这里使用GAE($\lambda,0)$）$\mathbb{E}_{s_t,s_{t+1} \sim \pi_{\theta}}[r_t+\gamma V(s_{t+1})-V(s_{t})]$和公式(1)，发现其中的差异主要在于减去了一项$V(s_{t})$，这一项通常被称为Baseline，此项一般只与状态有关。

将Baseline表示为$b(s_t)$，引入Baseline，有
$$
\nabla_{\theta} \mathbb{E}_{\tau \sim \pi_{\theta}}[R(\tau)] = 
\mathbb{E}_{\tau \sim \pi_{\theta}}\left[\sum_{t=0}^{T-1} \nabla_{\theta} \log \pi_{\theta}\left(a_{t} \mid s_{t}\right)\left(\sum_{t^{\prime}=t}^{T-1} r_{t^{\prime}}-b(s_t)\right)-\right] 
\tag{2}
$$


**Exercise 4**. 证明
$$
\mathbb{E}_{\tau \sim \pi_{\theta}}\left[\nabla_{\theta} \log \pi_{\theta}\left(a_{t} \mid s_{t}\right) b\left(s_{t}\right)\right] = 0 \tag{3}.
$$

提示1：$\mathbb{E}_{\tau \sim \pi_{\theta}}\left[\nabla_{\theta} \log \pi_{\theta}\left(a_{t} \mid s_{t}\right) b\left(s_{t}\right)\right]=\mathbb{E}_{s_{0: t}, a_{0: t-1}}\left[\mathbb{E}_{s_{t+1: T}, a_{t: T-1}}\left[\nabla_{\theta} \log \pi_{\theta}\left(a_{t} \mid s_{t}\right) b\left(s_{t}\right)\right]\right]$.

提示2：要**证明** $\mathbb{E}_{a_{t}}\left[\nabla_{\theta} \log \pi_{\theta}\left(a_{t} \mid s_{t}\right)\right]=0$.

**证明**：

对于给定状态$s_t$，考虑动作的分布$\pi_{\theta}\left(a_{t} \mid s_{t}\right)$。根据概率密度函数的性质，有

$$\int \pi_{\theta}\left(a \mid s_{t}\right) d a=1$$

对参数 $\theta$ 求梯度，两边得到

$$\nabla_{\theta} \int \pi_\theta(a \mid s_t) \, da = \nabla_{\theta} 1 = 0$$

化简得：

$$\int \nabla_{\theta} \pi_\theta(a \mid s_t) \, da = 0$$

又有：

$$\nabla_{\theta} \pi_\theta(a \mid s_t) = \pi_\theta(a \mid s_t) \, \nabla_{\theta} \log \pi_\theta(a \mid s_t)$$

带入得：

$$\int \pi_\theta(a \mid s_t) \, \nabla_{\theta} \log \pi_\theta(a \mid s_t) \, da = 0$$

即：

$$\mathbb{E}_{a_{t}\sim \pi_\theta(\cdot \mid s_t)}\left[\nabla_{\theta} \log \pi_\theta(a_t \mid s_t)\right] = 0.$$

现在，考虑包含 $b(s_t)$ 的期望，由于 $b(s_t) 与 a_t$ 无关，有

$$
\mathbb{E}_{a_{t}\sim \pi_\theta(\cdot \mid s_t)}\left[\nabla_{\theta} \log \pi_\theta(a_t \mid s_t)b(s_t)\right] = b(s_t) \, \mathbb{E}_{a_{t}\sim \pi_\theta(\cdot \mid s_t)}\left[\nabla_{\theta} \log \pi_\theta(a_t \mid s_t)\right]= b(s_t) \cdot 0 = 0
$$

利用全概率公式，对整个轨迹 $\tau \sim \pi_\theta$ 做期望，可以写成

$$
\mathbb{E}_{\tau \sim \pi_{\theta}}\Bigl[\nabla_{\theta}\log \pi_{\theta}(a_t\mid s_t)b(s_t)\Bigr] = \mathbb{E}_{s_{0:t},\, a_{0:t-1}}\left[\mathbb{E}_{s_{t+1:T},\, a_{t:T-1}}\Bigl[\nabla_{\theta}\log \pi_{\theta}(a_t\mid s_t)\, b(s_t)\Bigr]\right] = \mathbb{E}_{s_t}\left[b(s_t)\, \mathbb{E}_{a_t\sim \pi_{\theta}(\cdot \mid s_t)}\left[\nabla_{\theta}\log \pi_{\theta}(a_t\mid s_t)\right]\right] = \mathbb{E}_{s_t}\left[b(s_t) \cdot 0\right] = 0
$$

得证

**Exercise 5**.
证明
1. 求出最小化$Var(\widehat{\nabla_{\theta} J})$的$b^{*}(s^{k}_t)$, 并指出其在何种条件下等于$\mathbb{E}_{\tau}\left[\sum_{t^{\prime}=t}^{T-1} r^{k}_{t^{\prime}}(s_{t^{\prime}}^k, a_{t^{\prime}}^k)\right]=V(s^k_t)$.
2. 记不用Baseline的梯度估计为
$$
\widehat{\nabla_{\theta} J}^{\prime} = \frac{1}{N}\sum_{k=0}^{N} \left[\sum_{t=0}^{T-1} \nabla_{\theta} \log \pi_{\theta}\left(a_{t}^{k} \mid s_{t}^{k}\right)\left(\sum_{t^{\prime}=t}^{T-1} r^{k}_{t^{\prime}}\right)\right]. \tag{5}
$$
证明使用$b^{*}$的梯度估计方差小于该不用baseline的梯度估计方差。

提示1：$Var(X)=\mathbb{E}(X^2)-(\mathbb{E}(X))^2$, $Var(X+Y) = Var(X)+Var(Y)+2Cov(X,Y)$

提示2：使用**Ex 1**的结论。

提示3： 最小二乘法。

**证明**：

1.

我们考虑带 baseline 的策略梯度估计，其形式为:

$$\widehat{\nabla_{\theta} J}(b) = \sum_{t=0}^{T-1} \nabla_{\theta} \log \pi_{\theta}\bigl(a_{t} \mid s_{t}\bigr) \left( \sum_{t'=t}^{T-1} r_{t'} - b(s_t) \right)$$

又，我们知道对于任意与动作无关的 baseline $b(s_t)$ 都不改变梯度估计的无偏性，而可以用来降低方差。

令$G_t = \sum_{t'=t}^{T-1} r_{t'} \quad $和$ \quad X = \nabla_{\theta}\log\pi_{\theta}(a_{t}\mid s_t)$

考虑在给定状态 \(s_t\) 下的局部目标函数:

$\mathcal{V}(b) = \mathbb{E}_{a_t\sim\pi_\theta(\cdot\mid s_t)}\Bigl[\bigl\|X\bigr\|^2 \Bigl(G_t - b(s_t)\Bigr)^2\Bigr]$

这相当于一个加权的最小二乘问题,可得到最优解为：

$$
b^{*}(s_t) = \frac{\mathbb{E}_{a_t}\Bigl[c(s_t,a_t)\, G_t\Bigr]}{\mathbb{E}_{a_t}\Bigl[c(s_t,a_t)\Bigr]} = \frac{\mathbb{E}_{a_t}\Bigl[\bigl\|\nabla_{\theta}\log \pi_{\theta}(a_t\mid s_t)\bigr\|^2\, G_t\Bigr]}{\mathbb{E}_{a_t}\Bigl[\bigl\|\nabla_{\theta}\log \pi_{\theta}(a_t\mid s_t)\bigr\|^2\Bigr]}
$$

若对于给定状态 $s_t$ 下，$\bigl\|\nabla_{\theta}\log \pi_{\theta}(a_t\mid s_t)\bigr\|^2$ 关于动作 $a_t$ 没有变化（即为常数），则
$$b^{*}(s_t)=\frac{\textstyle c(s_t)\, \mathbb{E}_{a_t}[G_t]}{c(s_t)} =\mathbb{E}_{a_t}[G_t]=\mathbb{E}_{\tau}\left[ \sum_{t'=t}^{T-1} r_{t'} \,\Big|\, s_t \right]=V(s_t)$$

即最优 baseline 就等于状态价值函数 $V(s_t)$。

2.

记不用 baseline（即 $b(s_t)=0$）时的梯度估计为

$$\widehat{\nabla_{\theta} J}^{\prime} = \frac{1}{N}\sum_{k=1}^{N} \left[\sum_{t=0}^{T-1} \nabla_{\theta} \log \pi_{\theta}\bigl(a_{t}^{k} \mid s_{t}^{k}\bigr)\, G_t^k \right]$$

使用 baseline $b(s_t)$ 后的梯度估计为

$$\widehat{\nabla_{\theta} J}(b) = \frac{1}{N}\sum_{k=1}^{N} \left[\sum_{t=0}^{T-1} \nabla_{\theta} \log \pi_{\theta}\bigl(a_{t}^{k} \mid s_{t}^{k}\bigr)\Bigl( G_t^k - b(s_t^k) \Bigr) \right]$$


对于固定 $s_t$ 和对应 $a_t$ 的取样，记则第 $k$ 个样本对梯度估计的贡献为

$$Z^k(b) = \Delta^k \Bigl( G_t^k - b(s_t) \Bigr)$$


所以其方差为

$$\operatorname{Var}\Bigl(Z^k(b)\Bigr) = \mathbb{E}\Bigl[\bigl\|\Delta^k\bigr\|^2 \Bigl(G_t^k - b(s_t)\Bigr)^2\Bigr] \quad$$

将 $b(s_t)$ 作为可调参数，用最小二乘法求解问题

$$\min_{b(s_t)} \; \mathbb{E}\Bigl[\bigl\|\Delta^k\bigr\|^2 \Bigl(G_t^k - b(s_t)\Bigr)^2\Bigr]$$

因此有

$$\operatorname{Var}\Bigl(Z^k(b^*(s_t))\Bigr) \le \operatorname{Var}\Bigl(Z^k(0)\Bigr)$$

由此，对所有样本取平均后，可以证明使用最优 baseline 的梯度估计方差小于不使用 baseline 的梯度估计方差。
