# Actor-Critic and PPO: A Hands-On Tutorial

---

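## Learning Objectives

By the end of this tutorial you will be able to:
- Explain the core idea behind policy gradient methods
- Describe how the Actor-Critic architecture works
- Implement Generalized Advantage Estimation (GAE)
- Implement the A2C (Advantage Actor-Critic) algorithm
- Implement the PPO (Proximal Policy Optimization) algorithm
- Train and compare both algorithms on the CartPole environment

## Prerequisites

- Reinforcement learning basics (MDPs, value functions)
- Neural networks in PyTorch
- Probability distributions and sampling

## Estimated Time

60-90 minutes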

---

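## Part 1: Theoretical Background

### 1.1 The Policy Gradient Theorem

**Core idea**: parameterize the policy $\pi_\theta(a|s)$ directly and maximize the expected return by gradient ascent.

**Policy gradient theorem**:

$$\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\nabla_\theta \log \pi_\theta(a|s) \cdot Q^{\pi_\theta}(s, a)\right]$$

**Intuition**:
- If action $a$ leads to a high return, increase its probability
- If action $a$ leads to a low return, decrease its probability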
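
The intuition above can be checked numerically. The following is a minimal sketch, not part of the tutorial's own code: it assumes a hypothetical two-action softmax policy whose parameters are the logits themselves, and it substitutes a single sampled return for $Q^{\pi_\theta}(s, a)$, as REINFORCE does. The helper names `softmax` and `reinforce_step` and the learning rate are illustrative. For a softmax policy the score has the closed form $\partial \log \pi(a) / \partial z_k = \mathbb{1}[k = a] - \pi_k$.

```python
import math

def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

def reinforce_step(logits, action, ret, lr=0.1):
    """One gradient-ascent step on log pi(action) * ret.

    Uses the closed-form softmax score:
    d log pi(a) / d logit_k = 1[k == a] - pi_k.
    """
    probs = softmax(logits)
    return [
        z + lr * ret * ((1.0 if k == action else 0.0) - p)
        for k, (z, p) in enumerate(zip(logits, probs))
    ]

logits = [0.0, 0.0]                      # uniform policy over two actions
p_before = softmax(logits)[0]

# The same action, followed once by a high return and once by a low return.
p_after_good = softmax(reinforce_step(logits, action=0, ret=+1.0))[0]
p_after_bad = softmax(reinforce_step(logits, action=0, ret=-1.0))[0]

print(f"before: {p_before:.3f}, after high return: {p_after_good:.3f}, "
      f"after low return: {p_after_bad:.3f}")
```

A positive return moves probability toward the sampled action and a negative return moves it away, which is exactly the behavior the theorem describes.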

### 1.2 The Actor-Critic Architecture

Actor-Critic combines the strengths of policy-based and value-based methods:

```
┌─────────────────────────────────────────────────┐
│            Actor-Critic architecture            │
├─────────────────────────────────────────────────┤
│                                                 │
│    state s                                      │
│       │                                         │
│       ├─────────────┬─────────────────┐         │
│       ↓             ↓                 │         │
│   ┌───────┐     ┌───────┐             │         │
│   │ Actor │     │ Critic│             │         │
│   │ π(a|s)│     │ V(s)  │             │         │
│   └───┬───┘     └───┬───┘             │         │
│       │             │                 │         │
│       ↓             ↓                 │         │
│   action a     advantage A            │         │
│       │             │                 │         │
│       ↓             ↓                 │         │
│   env step ←─ policy update ←─────────┘         │
│                                                 │
└─────────────────────────────────────────────────┘
```

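- **Actor**: the policy network; it outputs a probability distribution over actions
- **Critic**: the value network; it estimates the state value $V(s)$

### 1.3 The Advantage Function and GAE

**Advantage function** $A(s, a)$: how much better action $a$ is than the policy's average behavior in state $s$.

$$A(s, a) = Q(s, a) - V(s) \approx r + \gamma V(s') - V(s)$$

**Generalized Advantage Estimation (GAE)** trades bias off against variance:

$$\hat{A}_t = \sum_{l=0}^{\infty} (\gamma\lambda)^l \delta_{t+l}$$

where $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ is the TD error.

- $\lambda = 0$: one-step TD estimate (low variance, high bias)
- $\lambda = 1$: Monte Carlo estimate (high variance, low bias)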
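
The two limiting cases can be verified on a toy example. The sketch below is illustrative only: it uses plain Python lists, a single three-step episode that ends in a terminal state with value 0, and made-up rewards and value estimates. The function names are hypothetical. It evaluates the sum in the GAE formula directly and also through the backward recursion $\hat{A}_t = \delta_t + \gamma\lambda \hat{A}_{t+1}$, which is the form an implementation would normally use.

```python
def td_errors(rewards, values, gamma):
    """delta_t = r_t + gamma * V(s_{t+1}) - V(s_t).

    `values` holds len(rewards) + 1 entries; the last one is the bootstrap value.
    """
    return [r + gamma * values[t + 1] - values[t] for t, r in enumerate(rewards)]

def gae_direct(rewards, values, gamma, lam):
    """A_t as the (finite-horizon) sum of (gamma * lam)^l * delta_{t+l}."""
    deltas = td_errors(rewards, values, gamma)
    n = len(deltas)
    return [
        sum((gamma * lam) ** l * deltas[t + l] for l in range(n - t))
        for t in range(n)
    ]

def gae_recursive(rewards, values, gamma, lam):
    """The same quantity via A_t = delta_t + gamma * lam * A_{t+1}."""
    deltas = td_errors(rewards, values, gamma)
    adv, running = [0.0] * len(deltas), 0.0
    for t in reversed(range(len(deltas))):
        running = deltas[t] + gamma * lam * running
        adv[t] = running
    return adv

# Toy episode (made-up numbers); values[-1] is V of the terminal state.
rewards = [1.0, 1.0, 1.0]
values = [0.5, 0.4, 0.3, 0.0]
gamma = 0.9

adv_td = gae_direct(rewards, values, gamma, lam=0.0)   # lambda = 0
adv_mc = gae_direct(rewards, values, gamma, lam=1.0)   # lambda = 1

# Discounted return-to-go R_t, for comparison with the lambda = 1 case.
returns = [
    sum(gamma ** l * rewards[t + l] for l in range(len(rewards) - t))
    for t in range(len(rewards))
]

print("lambda=0:", [round(a, 4) for a in adv_td])
print("lambda=1:", [round(a, 4) for a in adv_mc])
print("R_t - V :", [round(r - v, 4) for r, v in zip(returns, values)])
```

With $\lambda = 0$ every advantage equals its own TD error, and with $\lambda = 1$ the sum telescopes to $R_t - V(s_t)$, so the last two printed lines agree.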

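### 1.4 The Core Idea of PPO

**Problem**: the step size of a policy gradient update is hard to control.
- Too large: the policy can collapse
- Too small: learning is slow

**The PPO-Clip solution**: limit how far a single update can move the policy.

$$r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{old}}(a_t|s_t)}$$

$$L^{CLIP}(\theta) = \mathbb{E}_t\left[\min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right)\right]$$

**Intuition**: once the ratio leaves $[1-\epsilon, 1+\epsilon]$ in the direction that would increase the objective ($r_t > 1+\epsilon$ with $\hat{A}_t > 0$, or $r_t < 1-\epsilon$ with $\hat{A}_t < 0$), the clipped term is selected and that sample contributes no gradient.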
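
To make the clipped objective concrete, here is a small per-sample sketch in plain Python. The function names are illustrative and the ratios and advantages are made-up inputs; it only evaluates $\min(r\hat{A}, \text{clip}(r, 1-\epsilon, 1+\epsilon)\hat{A})$ for a scalar ratio and advantage.

```python
def clip(x, lo, hi):
    """Clamp x to the closed interval [lo, hi]."""
    return max(lo, min(x, hi))

def ppo_clip_objective(ratio, advantage, eps=0.2):
    """Per-sample clipped surrogate: min(r * A, clip(r, 1 - eps, 1 + eps) * A)."""
    return min(ratio * advantage, clip(ratio, 1 - eps, 1 + eps) * advantage)

for adv in (+1.0, -1.0):
    for ratio in (0.5, 1.0, 1.5):
        value = ppo_clip_objective(ratio, adv)
        print(f"A={adv:+.0f}  r={ratio:.1f}  objective={value:+.2f}")
```

For $\hat{A} = +1$ the objective is capped at $1.2$ when $r = 1.5$, and for $\hat{A} = -1$ it is held at $-0.8$ when $r = 0.5$. In the other two out-of-range cases ($r = 0.5$ with $\hat{A} > 0$, $r = 1.5$ with $\hat{A} < 0$) the unclipped term is the smaller one, so the objective still depends on $r$ and can pull the policy back.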

---

## Part 2: Environment Setup

In [None]:
# ============================================================
# Import the required libraries
# ============================================================

import numpy as np
import random
from typing import Tuple, List, Dict, Optional, NamedTuple
from dataclasses import dataclass

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical

import matplotlib.pyplot as plt

# Gymnasium
try:
    import gymnasium as gym
    HAS_GYM = True
except ImportError:
    HAS_GYM = False
    print("Please install gymnasium: pip install gymnasium")

# ============================================================
# Configuration
# ============================================================

RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['figure.figsize'] = (10, 6)

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {DEVICE}")
print("Environment setup complete")

---

## Part 3: Core Components

### 3.1 Rollout Buffer

In [None]:
# ============================================================
# Rollout buffer (for on-policy algorithms)
# ============================================================

class RolloutBuffer:
    """
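    Rollout buffer.

    Unlike an off-policy replay buffer, an on-policy method can only learn
    from data collected by the current policy, so the buffer is cleared
    after every update.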
    """
    
    def __init__(self, gamma: float = 0.99, gae_lambda: float = 0.95):
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.reset()
    
    def reset(self):
        """Clear the buffer."""
        self.states = []
        self.actions = []
        self.log_probs = []
        self.rewards = []
        self.values = []
        self.dones = []
    
    def add(self, state, action, log_prob, reward, value, done):
        """Append the data for a single step."""
        self.states.append(state)
        self.actions.append(action)
        self.log_probs.append(log_prob)
        self.rewards.append(reward)
        self.values.append(value)
        self.dones.append(done)
    
    def compute_gae(self, last_value: float) -> Tuple[torch.Tensor, torch.Tensor]:
        """
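        Compute Generalized Advantage Estimation (GAE).

        GAE formulas:
        δ_t = r_t + γV(s_{t+1}) - V(s_t)
        Â_t = Σ_{l=0}^{∞} (γλ)^l δ_{t+l}

        Args:
            last_value: value estimate of the state after the last stored step (used for bootstrapping)

        Returns:
            (returns, advantages): value targets and advantage estimates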
        """
        rewards = np.array(self.rewards)
        values = np.array(self.values)
        dones = np.array(self.dones)
        n_steps = len(rewards)
        
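        # Append the bootstrap value so that values[t + 1] exists for the last step
        values = np.append(values, last_value)

        # Compute GAE with a backward pass over the rollout
        advantages = np.zeros(n_steps, dtype=np.float32)
        gae = 0.0

        for t in reversed(range(n_steps)):
            # TD error; (1 - done) drops the bootstrap term at an episode boundary
            delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
            # GAE recursion; (1 - done) also stops accumulation across episodes
            gae = delta + self.gamma * self.gae_lambda * (1 - dones[t]) * gae
            advantages[t] = gae

        # Returns (value targets) = advantages + value estimates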
        returns = advantages + values[:-1]
        
        return torch.FloatTensor(returns), torch.FloatTensor(advantages)
    
    def __len__(self):
        return len(self.states)


# Smoke-test the GAE computation
buffer = RolloutBuffer(gamma=0.99, gae_lambda=0.95)
for i in range(10):
    buffer.add(
        state=np.random.randn(4),
        action=0,
        log_prob=-0.5,
        reward=1.0,
        value=0.5,
        done=(i == 9)
    )

returns, advantages = buffer.compute_gae(last_value=0.0)
print(f"Buffer size: {len(buffer)}")
print(f"Returns shape: {returns.shape}")
print(f"Advantages shape: {advantages.shape}")
print(f"Sample returns: {returns[:5].numpy()}")
print("Rollout buffer test passed")

### 3.2 Actor-Critic Network

In [None]:
# ============================================================
# Actor-Critic network with shared parameters
# ============================================================

class ActorCriticNetwork(nn.Module):
    """
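    Actor-Critic network.

    Structure:
        State -> shared layers -> Actor head  -> π(a|s)
                               -> Critic head -> V(s)

    The shared feature layers let the policy and the value function reuse one state representation.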
    """
    
    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 256):
        super().__init__()
        
        # Shared feature layers
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh()
        )
        
        # Actor head (policy logits)
        self.actor = nn.Linear(hidden_dim, action_dim)

        # Critic head (state value)
        self.critic = nn.Linear(hidden_dim, 1)

        # Weight initialization
        self._init_weights()
    
    def _init_weights(self):
        """Orthogonal initialization."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.orthogonal_(module.weight, gain=np.sqrt(2))
                nn.init.zeros_(module.bias)
        
        # Use a small gain for the actor output layer so the initial policy is close to uniform
        nn.init.orthogonal_(self.actor.weight, gain=0.01)
    
    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return the action logits and the state value."""
        features = self.shared(x)
        action_logits = self.actor(features)
        value = self.critic(features)
        return action_logits, value
    
    def get_action_and_value(
        self, 
        state: torch.Tensor, 
        action: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """
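        Sample (or evaluate) an action and return its log-probability,
        the policy entropy, and the state value.

        Args:
            state: state tensor
            action: optional action to evaluate instead of sampling (used to
                compute the log-probability of past actions under the current policy)

        Returns:
            (action, log_prob, entropy, value)
        """
        logits, value = self(state)
        # Build the distribution from logits directly; this skips the explicit
        # softmax and is numerically more stable
        dist = Categorical(logits=logits)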
        
        if action is None:
            action = dist.sample()
        
        log_prob = dist.log_prob(action)
        entropy = dist.entropy()
        
        return action, log_prob, entropy, value.squeeze(-1)


# Smoke test
net = ActorCriticNetwork(state_dim=4, action_dim=2).to(DEVICE)
x = torch.randn(32, 4).to(DEVICE)

action, log_prob, entropy, value = net.get_action_and_value(x)
print(f"Action shape: {action.shape}")
print(f"Log-prob shape: {log_prob.shape}")
print(f"Entropy shape: {entropy.shape}")
print(f"Value shape: {value.shape}")
print(f"Number of parameters: {sum(p.numel() for p in net.parameters()):,}")
print("Actor-Critic network test passed")

---

## Part 4: Implementing the A2C Agent

In [None]:
# ============================================================
# A2C (Advantage Actor-Critic) agent
# ============================================================

class A2CAgent:
    """
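    A2C agent.

    A2C is the synchronous version of A3C. Core ideas:
    1. Use the advantage function to reduce variance
    2. Use entropy regularization to encourage exploration
    3. Use N-step returns to balance bias and variance

    Loss function:
    L = L_policy + c1 * L_value - c2 * H[π]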
    """
    
    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        hidden_dim: int = 256,
        lr: float = 7e-4,
        gamma: float = 0.99,
        gae_lambda: float = 0.95,
        value_coef: float = 0.5,
        entropy_coef: float = 0.01,
        max_grad_norm: float = 0.5,
        device: str = 'auto'
    ):
        self.gamma = gamma
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        self.max_grad_norm = max_grad_norm
        
        if device == 'auto':
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = torch.device(device)
        
        # Network and optimizer
        self.network = ActorCriticNetwork(state_dim, action_dim, hidden_dim).to(self.device)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
        
        # Rollout buffer
        self.buffer = RolloutBuffer(gamma=gamma, gae_lambda=gae_lambda)
    
    def get_action(self, state: np.ndarray) -> Tuple[int, float, float]:
        """Sample an action; return (action, log_prob, value)."""
        state_t = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        
        with torch.no_grad():
            action, log_prob, _, value = self.network.get_action_and_value(state_t)
        
        return action.item(), log_prob.item(), value.item()
    
    def store(self, state, action, log_prob, reward, value, done):
        """Store one transition."""
        self.buffer.add(state, action, log_prob, reward, value, done)
    
    def update(self, last_value: float) -> Dict[str, float]:
        """Run one policy update on the buffered rollout."""
        # Compute returns and advantages from the stored rollout
        returns, advantages = self.buffer.compute_gae(last_value)

        states = torch.FloatTensor(np.array(self.buffer.states)).to(self.device)
        actions = torch.LongTensor(self.buffer.actions).to(self.device)
        returns = returns.to(self.device)
        advantages = advantages.to(self.device)

        # Normalize the advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Re-evaluate the stored actions under the current policy (with gradients)
        _, new_log_probs, entropy, values = self.network.get_action_and_value(states, actions)

        # Policy loss
        policy_loss = -(new_log_probs * advantages.detach()).mean()

        # Value loss
        value_loss = F.mse_loss(values, returns)

        # Entropy loss (negated because we want to maximize entropy)
        entropy_loss = -entropy.mean()

        # Total loss
        total_loss = policy_loss + self.value_coef * value_loss + self.entropy_coef * entropy_loss

        # Optimization step with gradient-norm clipping
        self.optimizer.zero_grad()
        total_loss.backward()
        nn.utils.clip_grad_norm_(self.network.parameters(), self.max_grad_norm)
        self.optimizer.step()

        # Clear the buffer: on-policy data is used once
        self.buffer.reset()
        
        return {
            'policy_loss': policy_loss.item(),
            'value_loss': value_loss.item(),
            'entropy': -entropy_loss.item()
        }


print("A2C agent defined")
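
The entropy term $H[\pi]$ in the A2C loss rewards policies that keep probability spread across actions. The sketch below, in plain Python with a hypothetical helper name and made-up distributions, computes the Shannon entropy of a categorical distribution to show the range of values to expect for a two-action task such as CartPole.

```python
import math

def categorical_entropy(probs):
    """Shannon entropy H = -sum p * log p in nats; terms with p = 0 contribute 0."""
    return -sum(p * math.log(p) for p in probs if p > 0.0)

uniform = [0.5, 0.5]      # maximally exploratory two-action policy
peaked = [0.99, 0.01]     # nearly deterministic policy

h_uniform = categorical_entropy(uniform)   # equals log(2), the maximum for 2 actions
h_peaked = categorical_entropy(peaked)

print(f"uniform: {h_uniform:.3f} nats, peaked: {h_peaked:.3f} nats")
```

Because the actor head is initialized with a small gain, the `entropy` value reported by `update` should start close to $\ln 2 \approx 0.693$ and shrink as the policy becomes more decisive; a larger `entropy_coef` slows that shrinkage.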

---

## Part 5: Implementing the PPO Agent

In [None]:
# ============================================================
# PPO (Proximal Policy Optimization) agent
# ============================================================

class PPOAgent:
    """
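    PPO agent.

    Key features:
    1. PPO-Clip: limits how far each update can move the policy
    2. Multiple epochs: each batch of data is trained on several times
    3. Mini-batches: the rollout is split into smaller batches

    PPO-Clip objective:
    L^{CLIP} = E[min(r_t(θ)Â_t, clip(r_t(θ), 1-ε, 1+ε)Â_t)]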
    """
    
    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        hidden_dim: int = 256,
        lr: float = 3e-4,
        gamma: float = 0.99,
        gae_lambda: float = 0.95,
        clip_epsilon: float = 0.2,
        value_coef: float = 0.5,
        entropy_coef: float = 0.01,
        max_grad_norm: float = 0.5,
        n_epochs: int = 10,
        mini_batch_size: int = 64,
        device: str = 'auto'
    ):
        self.gamma = gamma
        self.clip_epsilon = clip_epsilon
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        self.max_grad_norm = max_grad_norm
        self.n_epochs = n_epochs
        self.mini_batch_size = mini_batch_size
        
        if device == 'auto':
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = torch.device(device)
        
        # Network and optimizer
        self.network = ActorCriticNetwork(state_dim, action_dim, hidden_dim).to(self.device)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr, eps=1e-5)
        
        # Rollout buffer
        self.buffer = RolloutBuffer(gamma=gamma, gae_lambda=gae_lambda)
    
    def get_action(self, state: np.ndarray) -> Tuple[int, float, float]:
        """Sample an action; return (action, log_prob, value)."""
        state_t = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        
        with torch.no_grad():
            action, log_prob, _, value = self.network.get_action_and_value(state_t)
        
        return action.item(), log_prob.item(), value.item()
    
    def store(self, state, action, log_prob, reward, value, done):
        """Store one transition."""
        self.buffer.add(state, action, log_prob, reward, value, done)
    
    def update(self, last_value: float) -> Dict[str, float]:
        """Run the PPO update on the buffered rollout."""
        # Compute returns and advantages from the stored rollout
        returns, advantages = self.buffer.compute_gae(last_value)
        
        states = torch.FloatTensor(np.array(self.buffer.states)).to(self.device)
        actions = torch.LongTensor(self.buffer.actions).to(self.device)
        old_log_probs = torch.FloatTensor(self.buffer.log_probs).to(self.device)
        returns = returns.to(self.device)
        advantages = advantages.to(self.device)
        
        # Normalize the advantages over the whole rollout
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        batch_size = len(states)
        total_policy_loss = 0
        total_value_loss = 0
        total_entropy = 0
        update_count = 0
        
        # Several epochs over the same rollout
        for _ in range(self.n_epochs):
            # Shuffle the sample order
            indices = np.random.permutation(batch_size)

            # Mini-batch updates
            for start in range(0, batch_size, self.mini_batch_size):
                end = start + self.mini_batch_size
                mb_idx = indices[start:end]
                
                mb_states = states[mb_idx]
                mb_actions = actions[mb_idx]
                mb_old_log_probs = old_log_probs[mb_idx]
                mb_advantages = advantages[mb_idx]
                mb_returns = returns[mb_idx]
                
                # Re-evaluate the stored actions under the current policy
                _, new_log_probs, entropy, values = self.network.get_action_and_value(
                    mb_states, mb_actions
                )
                
                # Probability ratio r = π_new / π_old, computed in log space
                log_ratio = new_log_probs - mb_old_log_probs
                ratio = torch.exp(log_ratio)
                
                # PPO-Clip objective
                surr1 = ratio * mb_advantages
                surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * mb_advantages
                policy_loss = -torch.min(surr1, surr2).mean()
                
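                # Value loss
                value_loss = F.mse_loss(values, mb_returns)

                # Entropy loss (negated because we want to maximize entropy)
                entropy_loss = -entropy.mean()

                # Total loss
                loss = policy_loss + self.value_coef * value_loss + self.entropy_coef * entropy_loss

                # Optimization step with gradient-norm clipping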
                self.optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.network.parameters(), self.max_grad_norm)
                self.optimizer.step()
                
                total_policy_loss += policy_loss.item()
                total_value_loss += value_loss.item()
                total_entropy += entropy.mean().item()
                update_count += 1
        
        # Clear the buffer
        self.buffer.reset()
        
        return {
            'policy_loss': total_policy_loss / update_count,
            'value_loss': total_value_loss / update_count,
            'entropy': total_entropy / update_count
        }


print("PPO agent defined")

---

## Part 6: Training and Evaluation

In [None]:
# ============================================================
# Training function
# ============================================================

def train_policy_gradient(
    agent,
    env_name: str = 'CartPole-v1',
    num_episodes: int = 200,
    n_steps: int = 128,
    seed: int = 42,
    algo_name: str = 'Agent',
    verbose: bool = True
) -> List[float]:
    """
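    Train a policy gradient agent.

    Args:
        agent: an A2C or PPO agent
        env_name: environment name
        num_episodes: sets the step budget (num_episodes * 500 environment steps);
            the number of episodes actually played can be larger
        n_steps: number of environment steps collected per update
        seed: random seed
        algo_name: algorithm name (used in log output)
        verbose: whether to print progress

    Returns:
        List of episode rewards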
    """
    if not HAS_GYM:
        print("gymnasium is required")
        return []
    
    env = gym.make(env_name)
    
    if verbose:
        print(f"\n{'='*50}")
        print(f"Training {algo_name} on {env_name}")
        print(f"{'='*50}")
    
    rewards_history = []
    episode_rewards = []
    best_avg = float('-inf')
    
    state, _ = env.reset(seed=seed)
    episode_reward = 0
    total_steps = 0
    max_steps = num_episodes * 500  # step budget; a CartPole-v1 episode lasts at most 500 steps
    
    while total_steps < max_steps:
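        # Collect n_steps transitions
        for _ in range(n_steps):
            action, log_prob, value = agent.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            # Note: a time-limit truncation is treated like a true termination here,
            # so GAE does not bootstrap from the value of the truncated state
            done = terminated or truncated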
            
            agent.store(state, action, log_prob, reward, value, done)
            
            state = next_state
            episode_reward += reward
            total_steps += 1
            
            if done:
                episode_rewards.append(episode_reward)
                rewards_history.append(episode_reward)
                episode_reward = 0
                state, _ = env.reset()

                # Log once for every 20 completed episodes. Logging here, rather than
                # after each update, avoids skipped or repeated lines when an update
                # covers several episodes or only part of one.
                if verbose and len(episode_rewards) % 20 == 0:
                    avg = np.mean(episode_rewards[-20:])
                    best_avg = max(best_avg, avg)
                    print(f"Episode {len(episode_rewards):4d} | Avg: {avg:7.2f} | Best: {best_avg:7.2f}")

            if total_steps >= max_steps:
                break

        # Value of the state reached after the rollout (used for bootstrapping)
        _, _, last_value = agent.get_action(state)

        # Update the agent
        agent.update(last_value)
    
    env.close()
    return rewards_history

In [None]:
# ============================================================
# Train A2C
# ============================================================

if HAS_GYM:
    a2c_agent = A2CAgent(
        state_dim=4,
        action_dim=2,
        lr=7e-4,
        gamma=0.99
    )
    
    rewards_a2c = train_policy_gradient(
        a2c_agent,
        num_episodes=150,
        n_steps=5,
        algo_name='A2C'
    )

In [None]:
# ============================================================
# Train PPO
# ============================================================

if HAS_GYM:
    ppo_agent = PPOAgent(
        state_dim=4,
        action_dim=2,
        lr=3e-4,
        gamma=0.99,
        clip_epsilon=0.2,
        n_epochs=10,
        mini_batch_size=64
    )
    
    rewards_ppo = train_policy_gradient(
        ppo_agent,
        num_episodes=150,
        n_steps=128,
        algo_name='PPO'
    )
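
The two training cells differ in how much optimization each rollout receives. The short calculation below uses the settings from the cells above; the helper name `updates_per_rollout` is illustrative and not part of the tutorial code.

```python
import math

def updates_per_rollout(n_steps, mini_batch_size, n_epochs):
    """Number of optimizer steps PPO takes on one rollout of `n_steps` transitions."""
    return n_epochs * math.ceil(n_steps / mini_batch_size)

# PPO settings from the training cell: 128 steps, mini-batches of 64, 10 epochs.
ppo_steps = updates_per_rollout(n_steps=128, mini_batch_size=64, n_epochs=10)

# A2C takes a single optimizer step on each 5-step rollout.
a2c_steps = 1

print(f"PPO: {ppo_steps} optimizer steps per 128 transitions, "
      f"each transition reused in 10 epochs")
print(f"A2C: {a2c_steps} optimizer step per 5 transitions, "
      f"each transition used once")
```

PPO can afford to revisit every transition ten times because the clipped objective keeps the updated policy close to the one that collected the data; A2C has no such safeguard and discards each rollout after one step.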

---

## Part 7: Visualizing the Results

In [None]:
# ============================================================
# Plot the learning curves side by side
# ============================================================

def plot_comparison(results: dict, window: int = 20):
    """Plot smoothed learning curves for each algorithm."""
    plt.figure(figsize=(12, 5))
    
    colors = ['#1f77b4', '#ff7f0e']
    
    for idx, (name, rewards) in enumerate(results.items()):
        if len(rewards) >= window:
            smoothed = np.convolve(rewards, np.ones(window)/window, mode='valid')
            plt.plot(smoothed, label=name, color=colors[idx], linewidth=2)
    
    plt.xlabel('Episode', fontsize=12)
    plt.ylabel('Total Reward', fontsize=12)
    plt.title('A2C vs PPO Learning Curves', fontsize=14)
    plt.legend(fontsize=10)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

if HAS_GYM and rewards_a2c and rewards_ppo:
    plot_comparison({
        'A2C': rewards_a2c,
        'PPO': rewards_ppo
    })
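
The smoothing in `plot_comparison` is a moving average computed with `np.convolve(..., mode='valid')`. The sketch below reproduces the same operation in plain Python on made-up rewards, to show what the plotted values mean and why the smoothed curve is shorter than the raw one. The function name is illustrative.

```python
def moving_average(xs, window):
    """Mean of each length-`window` slice; returns len(xs) - window + 1 values."""
    return [sum(xs[i:i + window]) / window for i in range(len(xs) - window + 1)]

rewards = [10.0, 20.0, 30.0, 40.0, 50.0]   # made-up episode rewards
smoothed = moving_average(rewards, window=3)

print(smoothed)                      # [20.0, 30.0, 40.0]
print(len(rewards), len(smoothed))   # 5 3
```

Point `i` of the smoothed curve averages episodes `i` through `i + window - 1`, so the curve has `window - 1` fewer points than the raw history. The two algorithms are trained on a step budget rather than an episode budget, so their curves can also differ in length.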

---

## Part 8: Interactive Experiments

### Experiment 1: Effect of the PPO Clip Coefficient

In [None]:
# ============================================================
# Visualize the PPO clipping function
# ============================================================

def visualize_ppo_clip(epsilon: float = 0.2):
    """Visualize the PPO clipping mechanism."""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    ratios = np.linspace(0.5, 1.5, 100)
    
    # Positive-advantage case
    ax = axes[0]
    advantage = 1.0
    unclipped = ratios * advantage
    clipped = np.clip(ratios, 1-epsilon, 1+epsilon) * advantage
    objective = np.minimum(unclipped, clipped)
    
    ax.plot(ratios, unclipped, 'b--', label='Unclipped', alpha=0.7)
    ax.plot(ratios, clipped, 'r--', label='Clipped', alpha=0.7)
    ax.plot(ratios, objective, 'g-', linewidth=2, label='PPO objective')
    ax.axvline(1-epsilon, color='gray', linestyle=':', alpha=0.5)
    ax.axvline(1+epsilon, color='gray', linestyle=':', alpha=0.5)
    ax.set_xlabel('Probability ratio r(θ)')
    ax.set_ylabel('Objective value')
    ax.set_title(f'Positive advantage (A > 0), ε = {epsilon}')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    # Negative-advantage case
    ax = axes[1]
    advantage = -1.0
    unclipped = ratios * advantage
    clipped = np.clip(ratios, 1-epsilon, 1+epsilon) * advantage
    objective = np.minimum(unclipped, clipped)
    
    ax.plot(ratios, unclipped, 'b--', label='Unclipped', alpha=0.7)
    ax.plot(ratios, clipped, 'r--', label='Clipped', alpha=0.7)
    ax.plot(ratios, objective, 'g-', linewidth=2, label='PPO objective')
    ax.axvline(1-epsilon, color='gray', linestyle=':', alpha=0.5)
    ax.axvline(1+epsilon, color='gray', linestyle=':', alpha=0.5)
    ax.set_xlabel('Probability ratio r(θ)')
    ax.set_ylabel('Objective value')
    ax.set_title(f'Negative advantage (A < 0), ε = {epsilon}')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

visualize_ppo_clip(epsilon=0.2)

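print("\nHow to read the PPO clipping plots:")
print("- Positive advantage: stops the ratio from growing too large (no over-increasing an action's probability)")
print("- Negative advantage: stops the ratio from shrinking too far (no over-decreasing an action's probability)")
print("- Effect: bounds the size of each policy update, which keeps training stable")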

### Experiment 2: Effect of the GAE λ Parameter

In [None]:
# ============================================================
# Notes on the GAE λ parameter
# ============================================================

print("Effect of the λ parameter in GAE (Generalized Advantage Estimation):")
print("="*50)
print()
print("λ = 0 (one-step TD):")
print("  Â_t = δ_t = r_t + γV(s_{t+1}) - V(s_t)")
print("  Low variance, high bias (relies on an accurate value function)")
print()
print("λ = 1 (Monte Carlo):")
print("  Â_t = Σ γ^l δ_{t+l} = R_t - V(s_t)")
print("  Unbiased, high variance")
print()
print("λ = 0.95 (recommended):")
print("  Balances bias and variance")
print("  Works well on most tasks")
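
One way to read λ is through the weights $(\gamma\lambda)^l$ that GAE places on future TD errors. The sketch below, with illustrative function names, lists the first few weights and their infinite sum $1 / (1 - \gamma\lambda)$, which acts as a rough effective horizon in steps.

```python
def gae_weights(gamma, lam, n):
    """Weights (gamma * lam)^l applied to delta_{t+l} for l = 0..n-1."""
    return [(gamma * lam) ** l for l in range(n)]

def effective_horizon(gamma, lam):
    """Sum of the infinite geometric series of weights: 1 / (1 - gamma * lam)."""
    return 1.0 / (1.0 - gamma * lam)

gamma = 0.99
for lam in (0.0, 0.95, 1.0):
    weights = [round(w, 3) for w in gae_weights(gamma, lam, 4)]
    horizon = effective_horizon(gamma, lam)
    print(f"lambda={lam:<4}  first weights={weights}  horizon={horizon:.1f} steps")
```

With γ = 0.99, λ = 0 looks one step ahead, λ = 0.95 spreads the weight over roughly 17 steps, and λ = 1 stretches it to about 100 steps, which is where the extra variance comes from.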

---

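## Summary

### Key Takeaways

1. **Policy gradients**:
   - Optimize the policy parameters directly
   - Can handle continuous action spaces
   - Can learn stochastic policies

2. **Actor-Critic**:
   - Actor: the policy network, which chooses actions
   - Critic: the value network, which evaluates states
   - The advantage function reduces variance

3. **GAE**:
   - Balances bias and variance
   - λ=0.95 is a common setting

4. **PPO**:
   - Clipping keeps updates stable
   - Allows the same batch of data to be reused several times
   - Simple to implement and performs well in practice

### Algorithm Comparison

| Algorithm | Type | Sample efficiency | Stability | Implementation complexity |
|-----------|------|-------------------|-----------|---------------------------|
| A2C | On-Policy | Low | Medium | Simple |
| PPO | On-Policy | Medium | High | Moderate |
| SAC | Off-Policy | High | High | Complex |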

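### Tuning Suggestions

| Parameter | A2C suggested | PPO suggested |
|-----------|---------------|---------------|
| Learning rate | 7e-4 | 3e-4 |
| γ | 0.99 | 0.99 |
| GAE λ | 0.95 | 0.95 |
| Clip ε | - | 0.2 |
| Entropy coefficient | 0.01 | 0.01 |
| N-steps | 5 | 2048 |
| Epochs | 1 | 10 |

The PPO training cell in Part 6 uses a shorter rollout (`n_steps=128`) than the 2048 listed here.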
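
One way to keep these suggestions in one place is a pair of small configuration objects. The sketch below is only an illustration: `A2CConfig` and `PPOConfig` are hypothetical names that do not appear in the tutorial code, and the defaults are simply the values from the table above.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class A2CConfig:
    """Suggested A2C settings (values copied from the tuning table)."""
    lr: float = 7e-4
    gamma: float = 0.99
    gae_lambda: float = 0.95
    entropy_coef: float = 0.01
    n_steps: int = 5

@dataclass(frozen=True)
class PPOConfig:
    """Suggested PPO settings (values copied from the tuning table)."""
    lr: float = 3e-4
    gamma: float = 0.99
    gae_lambda: float = 0.95
    clip_epsilon: float = 0.2
    entropy_coef: float = 0.01
    n_steps: int = 2048
    n_epochs: int = 10

a2c_cfg = A2CConfig()
ppo_cfg = PPOConfig()
cartpole_cfg = PPOConfig(n_steps=128)   # the shorter rollout used for CartPole

print(a2c_cfg)
print(cartpole_cfg)
```

Under this assumption, fields such as `lr`, `gamma`, `gae_lambda`, `clip_epsilon`, `entropy_coef`, and `n_epochs` would be passed to the agent constructors, while `n_steps` would go to `train_policy_gradient`.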

---

## Unit Tests

In [None]:
# ============================================================
# Unit tests
# ============================================================

def run_tests():
    """Run the unit tests."""
    print("Starting unit tests...\n")
    passed = 0
    failed = 0
    
    # Test 1: RolloutBuffer
    try:
        buf = RolloutBuffer(gamma=0.99, gae_lambda=0.95)
        for i in range(10):
            buf.add(np.random.randn(4), 0, -0.5, 1.0, 0.5, i == 9)
        returns, advantages = buf.compute_gae(0.0)
        assert returns.shape == (10,)
        assert advantages.shape == (10,)
        print("Test 1 passed: RolloutBuffer")
        passed += 1
    except Exception as e:
        print(f"Test 1 failed: {e}")
        failed += 1

    # Test 2: ActorCriticNetwork
    try:
        net = ActorCriticNetwork(4, 2, 64)
        x = torch.randn(32, 4)
        action, log_prob, entropy, value = net.get_action_and_value(x)
        assert action.shape == (32,)
        assert log_prob.shape == (32,)
        assert value.shape == (32,)
        print("Test 2 passed: ActorCriticNetwork")
        passed += 1
    except Exception as e:
        print(f"Test 2 failed: {e}")
        failed += 1

    # Test 3: A2CAgent
    try:
        agent = A2CAgent(4, 2, device='cpu')
        state = np.random.randn(4).astype(np.float32)
        action, log_prob, value = agent.get_action(state)
        assert 0 <= action < 2
        
        for i in range(10):
            agent.store(state, 0, -0.5, 1.0, 0.5, i == 9)
        loss_info = agent.update(0.0)
        assert 'policy_loss' in loss_info
        print("Test 3 passed: A2CAgent")
        passed += 1
    except Exception as e:
        print(f"Test 3 failed: {e}")
        failed += 1

    # Test 4: PPOAgent
    try:
        agent = PPOAgent(4, 2, mini_batch_size=32, device='cpu')
        state = np.random.randn(4).astype(np.float32)
        action, log_prob, value = agent.get_action(state)
        assert 0 <= action < 2
        
        for i in range(64):
            agent.store(state, 0, -0.5, 1.0, 0.5, i == 63)
        loss_info = agent.update(0.0)
        assert 'policy_loss' in loss_info
        print("Test 4 passed: PPOAgent")
        passed += 1
    except Exception as e:
        print(f"Test 4 failed: {e}")
        failed += 1

    # Test 5: correctness of the GAE computation
    try:
        buf = RolloutBuffer(gamma=0.99, gae_lambda=0.95)
        # Simple case: a 3-step trajectory
        for i in range(3):
            buf.add(np.array([1.0]), 0, 0.0, 1.0, 0.5, i == 2)
        returns, advantages = buf.compute_gae(0.0)
        
        # Recompute the expected advantages by hand
        gamma, lam = 0.99, 0.95
        v = [0.5, 0.5, 0.5, 0.0]
        r = [1.0, 1.0, 1.0]
        d = [0, 0, 1]
        
        expected = np.zeros(3)
        gae = 0.0
        for t in reversed(range(3)):
            delta = r[t] + gamma * v[t+1] * (1 - d[t]) - v[t]
            gae = delta + gamma * lam * (1 - d[t]) * gae
            expected[t] = gae
        
        assert np.allclose(advantages.numpy(), expected, atol=1e-5)
        print("Test 5 passed: GAE computation is correct")
        passed += 1
    except Exception as e:
        print(f"Test 5 failed: {e}")
        failed += 1

    print(f"\n{'='*40}")
    print(f"Tests finished: {passed} passed, {failed} failed")
    if failed == 0:
        print("All tests passed!")
    print(f"{'='*40}")

run_tests()

---

## References

1. Mnih et al., "Asynchronous Methods for Deep Reinforcement Learning", 2016 (A3C/A2C)
2. Schulman et al., "Proximal Policy Optimization Algorithms", 2017
3. Schulman et al., "High-Dimensional Continuous Control Using Generalized Advantage Estimation", 2015
4. Sutton & Barto, "Reinforcement Learning: An Introduction", Chapter 13
