In [19]:
import torch
import torch.nn.functional as F
from torch.distributions import Categorical

```
class PPO:
    """Proximal Policy Optimization (PPO) Algorithm."""
    
    def __init__(self, agent, lr=3e-4, gamma=0.99, clip_epsilon=0.2):
        """
        Initialize the PPO algorithm.

        :param agent: The reinforcement learning agent model (e.g., neural network)
        :param lr: Learning rate for the optimizer
        :param gamma: Discount factor for future rewards
        :param clip_epsilon: Clipping parameter to limit policy updates in PPO
        """
        self.agent = agent
        self.optimizer = torch.optim.Adam(agent.parameters(), lr=lr)
        self.gamma = gamma
        self.clip_epsilon = clip_epsilon

    def update(self, states, actions, rewards, dones, old_log_probs, values):
        """Update PPO agent based on collected experiences."""
        returns, advantages = self.compute_gae(rewards, dones, values)
        new_policy, new_values = self.agent(states)
        new_log_probs = self.compute_log_probs(new_policy, actions)

        ratio = torch.exp(new_log_probs - old_log_probs)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)  # Normalize advantages
        
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1.0 - self.clip_epsilon, 1.0 + self.clip_epsilon) * advantages
        policy_loss = -torch.min(surr1, surr2).mean()

        # Value function loss clipped by a prediction error (optional, not in original message but common practice)
        value_clipped = values + torch.clamp(new_values - values, -self.clip_epsilon, self.clip_epsilon)
        value_losses_clipped = (new_values - returns).pow(2)
        value_losses_original = (value_clipped - returns).pow(2)
        value_loss = torch.max(value_losses_original, value_losses_clipped).mean()

        entropy = Categorical(new_policy).entropy().mean()  # Encourage exploration

        # Total loss with entropy regularization
        total_loss = policy_loss + 0.5 * value_loss - 0.01 * entropy  # Assuming a coefficient for entropy of 0.01
        
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
        # Optionally return training statistics
        return {
            'policy_loss': policy_loss.item(),
            'value_loss': value_loss.item(),
            'entropy': entropy.item(),
            'loss': total_loss.item(),
            'avg_ratio': ratio.mean().item()
            }

    def compute_gae(self, rewards, dones, values):
        """Compute Generalized Advantage Estimation (GAE) for each step."""
        advantages = torch.zeros_like(rewards)
        gae_lambda = 0.95  # GAE's lambda parameter
        next_advantage = 0

        for t in reversed(range(len(rewards))):
            delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
            next_advantage = delta + self.gamma * gae_lambda * next_advantage * (1 - dones[t])
            advantages[t] = next_advantage
        
        returns = advantages + values
        return returns, advantages

    def compute_log_probs(self, policy_dist, actions):
        """
        计算所选动作的对数概率。
        确保在传递给Categorical分布前进行规范化。
        """
        # 指数化以获得未规范化的概率
        unnormalized_probs = torch.exp(policy_dist)
    
        # 规范化以满足simplex约束
        probs = unnormalized_probs / unnormalized_probs.sum(dim=-1, keepdim=True)
    s
        # 确保没有NaN或无穷大等数值问题
        probs[torch.isnan(probs)] = 0.0  # 将NaN置为零处理
        probs[probs == float('inf')] = 0.0  # 类似地处理无穷大
    
        # 现在创建Categorical分布并计算对数概率
        dist = Categorical(probs=probs)
        return dist.log_prob(actions)

```
import torch
import numpy as np

# 假设的Agent模型简化为一个简单的神经网络结构
class TestAgent(torch.nn.Module):
    def __init__(self):
        super(TestAgent, self).__init__()
        self.fc = torch.nn.Linear(10, 2)  # 简单的全连接层，输出动作概率分布和值函数

    def forward(self, x):
        x = torch.relu(self.fc(x))
        policy_logits = x[:, :1]  # 动作概率分布的对数
        values = x[:, 1:]  # 状态值函数
        return policy_logits, values.squeeze()

# 实例化PPO类和测试Agent
ppo_agent = PPO(TestAgent(), lr=1e-3, gamma=0.99, clip_epsilon=0.2)

# 生成模拟数据
num_samples = 32  # 假设的样本数量
state_dim = 10  # 状态空间维度
action_dim = 1  # 动作空间维度，简化为二元动作问题

states = torch.randn(num_samples, state_dim)  # 随机状态数据
actions = torch.randint(0, 2, (num_samples, action_dim)).float()  # 随机动作数据
rewards = torch.randn(num_samples, 1)  # 随机奖励数据
dones = torch.zeros(num_samples, 1, dtype=torch.bool)  # 完成标志，假设所有序列未完成
# 对于最后一条序列设置done为True，模拟序列结束
dones[-1] = True
values = ppo_agent.agent(states)[1]  # 初始值函数预测
old_log_probs = ppo_agent.compute_log_probs(ppo_agent.agent(states)[0], actions)

# 调用update方法并打印训练统计信息
training_stats = ppo_agent.update(states, actions, rewards, dones, old_log_probs, values)
print("Training Statistics:")
for key, value in training_stats.items():
    print(f"{key}: {value.item()}")

# 注意：此处的compute_gae和compute_log_probs方法需要在PPO类中正确实现，以匹配测试数据结构。

In [20]:
# 定义PPO类，实现近端策略优化算法

class PPO:
    """
    近端策略优化（Proximal Policy Optimization，PPO）算法。

    一种强化学习算法，通过信任区域方法进行策略更新，平衡探索与利用。
    """

    def __init__(self, agent, lr=3e-4, gamma=0.99, clip_epsilon=0.2):
        """
        初始化PPO算法。

        参数:
        - agent: 强化学习智能体模型，通常为神经网络
        - lr: 优化器的学习率
        - gamma: 未来奖励的折现因子
        - clip_epsilon: PPO中用于限制策略更新的裁剪参数
        """
        self.agent = agent  # 设置智能体模型
        self.optimizer = torch.optim.Adam(agent.parameters(), lr=lr)  # 使用Adam优化器调整模型参数
        self.gamma = gamma  # 折现因子，用于计算未来奖励的当前价值
        self.clip_epsilon = clip_epsilon  # 策略更新时的裁剪范围界限

    def update(self, states, actions, rewards, dones, old_log_probs, values):
        """
        根据收集的经验数据更新PPO智能体。

        步骤包括计算广义优势估计（GAE）、新旧策略的比率、策略损失、价值损失以及加入熵正则化项的总损失，
        并执行反向传播和参数更新。
        """
        # 计算广义优势估计（GAE）和回报
        returns, advantages = self.compute_gae(rewards, dones, values)

        # 从当前策略中获取新的动作分布和状态价值
        new_policy, new_values = self.agent(states)

        # 计算新策略下动作的对数概率
        new_log_probs = self.compute_log_probs(new_policy, actions)

        # 计算重要性采样比值（ratio）
        ratio = torch.exp(new_log_probs - old_log_probs)

        # 对优势进行标准化处理，以稳定训练
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # 计算损失函数的两部分：策略损失（通过裁剪）和价值函数损失（可选的裁剪形式）
        # 假设surr1和surr2的计算方式
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1.0 - self.clip_epsilon, 1.0 + self.clip_epsilon) * advantages
        policy_loss = -torch.min(surr1, surr2).mean()

        # 价值函数损失计算，考虑了裁剪以减少更新波动
        # 假设returns的原始形状是[batch_size, sequence_length]
        returns = returns.sum(dim=1, keepdim=True)  # 累加sequence_length维度，添加keepdim保持二维结构
        # 假设new_values的原始形状是[batch_size, sequence_length]
        new_values_aggregated = new_values.sum(dim=1, keepdim=True)  # 累加sequence_length维度，保持二维结构
        value_loss = F.mse_loss(new_values_aggregated, returns)

        # 计算策略熵，鼓励探索
        new_policy_normalized = F.softmax(new_policy, dim=-1)
        # 现在使用归一化后的概率分布来计算熵
        entropy = Categorical(probs=new_policy_normalized).entropy().mean()

        # 总损失，结合策略损失、价值损失和熵项
        total_loss = policy_loss + 0.5 * value_loss - 0.01 * entropy

        # 执行优化步骤：梯度清零、反向传播、参数更新
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        # 返回训练统计信息，供监控和调试
        return {
            '策略损失': policy_loss.item(),
            '价值损失': value_loss.item(),
            '熵': entropy.item(),
            '总损失': total_loss.item(),
            '平均比率': ratio.mean().item()
        }

    def compute_gae(self, rewards, dones, values):
        """
        计算每个时间步的广义优势估计（Generalized Advantage Estimation, GAE）。
        """
        advantages = torch.zeros_like(rewards)
        gae_lambda = 0.95  # GAE中的衰减因子λ
        last_gae_lam = 0

        # 修改循环，使其不包括最后一个元素，避免索引越界
        for t in reversed(range(len(rewards) - 1)):  # 减1以避免索引越界
            next_non_terminal = 1.0 - dones[t+1]
            delta = rewards[t] + self.gamma * values[t+1] * next_non_terminal - values[t]
            advantages[t] = delta + self.gamma * gae_lambda * next_non_terminal * last_gae_lam
            last_gae_lam = advantages[t]

        # 对于序列的第一个元素（t=0），我们需要特殊处理，这里直接设置为累计的优势
        # 注意，因为我们已经处理到了t=1（原序列的倒数第二个元素），所以不需要额外的循环迭代
        #advantages[0] = advantages[0]  # 这一行实际上不需要操作，因为第一个元素的advantage已经在循环中计算好

        returns = advantages + values
        return returns, advantages

    def compute_log_probs(self, policy_dist, actions):
        """
        计算在给定动作分布下选择的动作的对数概率。

        确保在传递给Categorical分布之前进行规范化。
        """
        # 计算未规范化概率
        unnormalized_probs = torch.exp(policy_dist)

        # 规范化概率以满足概率分布的约束
        probs = unnormalized_probs / unnormalized_probs.sum(dim=-1, keepdim=True)

        # 处理可能出现的NaN或无穷大值
        # 将它们设为0以避免数值问题
        probs[torch.isnan(probs)] = 0.0
        probs[probs == float('inf')] = 0.0

        # 创建Categorical分布对象并计算对数概率
        dist = Categorical(probs=probs)
        log_probs = dist.log_prob(actions)  # 计算对数概率

        return log_probs
       

In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

# 构建一个简单的智能体模型作为示例
class SimpleActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(SimpleActorCritic, self).__init__()
        self.shared_layers = nn.Linear(state_dim, 128)
        self.policy_head = nn.Linear(128, action_dim)
        self.value_head = nn.Linear(128, 1)

    def forward(self, x):
        shared_out = torch.relu(self.shared_layers(x))
        policy_logits = self.policy_head(shared_out)
        state_values = self.value_head(shared_out)
        return policy_logits, state_values.unsqueeze(-1)

# 生成模拟数据
def generate_fake_data(agent, num_steps, state_dim, action_dim):
    states = torch.randn(num_steps, state_dim)
    actions = Categorical(logits=agent(states)[0]).sample()
    rewards = torch.randn(num_steps, 1)
    dones = torch.zeros(num_steps, 1, dtype=torch.float)
    dones[-1] = 1  # 最后一步设置为终止状态
    values = agent(states)[1]
    old_log_probs = Categorical(logits=agent(states)[0]).log_prob(actions)
    return states, actions, rewards, dones, old_log_probs, values

# 实例化PPO类和智能体模型
state_dim = 4
action_dim = 2
agent = SimpleActorCritic(state_dim, action_dim)
ppo_agent = PPO(agent)

# 生成模拟数据并进行一次更新
states, actions, rewards, dones, old_log_probs, values = generate_fake_data(agent, 100, state_dim, action_dim)
training_stats = ppo_agent.update(states, actions, rewards, dones, old_log_probs, values)

print("Training Statistics:", training_stats)

Training Statistics: {'策略损失': 4.0435789827597546e-08, '价值损失': 10999.3134765625, '熵': 0.6894959211349487, '总损失': 5499.64990234375, '平均比率': 1.0}
