In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import copy
import gym

In [2]:
import warnings
# Filter deprecation warnings that are not critical (specifically for np.bool8)
warnings.simplefilter("ignore", DeprecationWarning)

## 一、带基线的`REINFORCE`

In [3]:
class REINFORCEModel(nn.Module):
    def __init__(self):
        super(REINFORCEModel, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.actor = nn.Linear(128, 2)  # 2个动作

    def forward(self, x):
        x = F.relu(self.fc1(x))
        action_probs = F.softmax(self.actor(x), dim=-1)
        return action_probs

In [4]:
class REINFORCETrainer:
    def __init__(self, env, model, gamma=0.99, lr=3e-3, max_steps=1000, update_target_interval=10):
        self.env = env  # gym环境
        self.model = model  # 策略网络
        self.target_model = copy.deepcopy(model)  # 创建目标网络的深拷贝
        self.target_model.load_state_dict(model.state_dict())  # 将参数复制到目标网络
        self.target_model.eval()
        self.gamma = gamma
        self.max_steps = max_steps
        self.optimizer = optim.Adam(model.parameters(), lr=lr)  # 优化器
        self.update_target_interval = update_target_interval  # 更新目标网络的间隔

    def train(self, num_episodes):
        for episode in range(num_episodes):
            state, _ = self.env.reset()
            saved_log_probs = []  # 保存log概率
            rewards = []  # 保存奖励
            for _ in range(self.max_steps):  # model和环境交互max_steps步
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                action_probs = self.model(state_tensor)
                m = torch.distributions.Categorical(action_probs)
                action = m.sample()
                saved_log_probs.append(m.log_prob(action))
                
                state, reward, done, _, _ = self.env.step(action.item())
                rewards.append(reward)

                if done:
                    break

            R = 0
            policy_loss = []
            returns = []
            for r in reversed(rewards):  # 计算每一步的回报
                R = r + self.gamma * R
                returns.insert(0, R)
            
            returns = torch.tensor(returns)  # 计算回报的均值和标准差
            returns = (returns - returns.mean()) / (returns.std() + 1e-5)  # 防止除0
            for log_prob, R in zip(saved_log_probs, returns):  # 计算损失
                policy_loss.append(-log_prob * R)

            self.optimizer.zero_grad()
            policy_loss = torch.cat(policy_loss).sum()  # 求和
            policy_loss.backward()  # 反向传播
            self.optimizer.step()

            if (episode + 1) % self.update_target_interval == 0:
                self.target_model.load_state_dict(self.model.state_dict())

            if (episode + 1) % 100 == 0:
                print(f'Episode {episode + 1}: Last total rewards: {sum(rewards)}')

    def evaluate(self, num_episodes=10):
        total_rewards = []
        for i in range(num_episodes):
            state, _ = self.env.reset()
            episode_reward = 0
            for _ in range(self.max_steps):
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                with torch.no_grad():
                    action_probs = self.model(state_tensor)
                    
                action = torch.argmax(action_probs).item()
                state, reward, done, _, _ = self.env.step(action)
                episode_reward += reward
                if done:
                    break
                
            total_rewards.append(episode_reward)
            print(f'Episode {i + 1}: Total Reward = {episode_reward}')
        return sum(total_rewards)/len(total_rewards)

In [5]:
env = gym.make('CartPole-v1')
model = REINFORCEModel()
trainer = REINFORCETrainer(env, model, update_target_interval=25)
trainer.train(num_episodes=800)

Episode 100: Last total rewards: 195.0
Episode 200: Last total rewards: 1000.0
Episode 300: Last total rewards: 1000.0
Episode 400: Last total rewards: 147.0
Episode 500: Last total rewards: 1000.0
Episode 600: Last total rewards: 1000.0
Episode 700: Last total rewards: 1000.0
Episode 800: Last total rewards: 1000.0


In [6]:
trainer.evaluate(num_episodes=10)

Episode 1: Total Reward = 1000.0
Episode 2: Total Reward = 1000.0
Episode 3: Total Reward = 1000.0
Episode 4: Total Reward = 1000.0
Episode 5: Total Reward = 1000.0
Episode 6: Total Reward = 1000.0
Episode 7: Total Reward = 1000.0
Episode 8: Total Reward = 1000.0
Episode 9: Total Reward = 1000.0
Episode 10: Total Reward = 1000.0


1000.0

## 二、`A2C`

要使用A2C（Advantage Actor-Critic）算法来训练控制CartPole游戏的智能体，我们首先需要建立一个Actor-Critic网络架构。A2C算法结合了策略梯度方法和值函数方法的优势，使用一个策略网络（Actor）来选择动作，同时使用一个价值网络（Critic）来评估采取某个动作后的状态价值。



以下是使用PyTorch实现A2C来控制CartPole游戏的完整步骤：


### 1. 构建Actor-Critic网络

我们将创建一个网络，它有两个输出头：一个用于Actor，输出动作概率；另一个用于Critic，输出状态值评估。

In [2]:
class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.actor = nn.Linear(128, 2)  # 2个动作
        self.critic = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        action_probs = F.softmax(self.actor(x), dim=-1)
        state_values = self.critic(x)
        return action_probs, state_values

### 2. 定义训练过程

在A2C的训练过程中，我们需要计算优势函数来更新策略，并同时更新价值函数。

- 不使用目标网络


In [16]:
class ActorCriticTrainer:
    def __init__(self, env, model, gamma=0.99, lr=3e-3, max_steps=1000):
        self.env = env
        self.model = model  # 策略网络
        self.gamma = gamma  # 折扣因子
        self.max_steps = max_steps  # 最大步数
        self.optimizer = optim.Adam(model.parameters(), lr=lr)  # 优化器

    def train(self, num_episodes):
        for episode in range(num_episodes):
            state, _ = self.env.reset()
            log_probs = []
            values = []
            rewards = []
            masks = []
            entropy = 0
            step = 0
            while True:  # 利用策略网络控制游戏，完成一个回合交互
                if state is None or len(state) == 0:
                    print("State is empty or None, breaking the loop.")
                    break
                
                state_tensor = torch.FloatTensor(state).unsqueeze(0)  # 确保state非空
                probs, value = self.model(state_tensor)  # 获取策略网络概率分布和状态值
                m = torch.distributions.Categorical(probs)  # 根据策略生成动作概率分布
                action = m.sample()  # 采样动作
                next_state, reward, done, _, _ = self.env.step(action.item())  # 执行动作

                log_prob = m.log_prob(action)  # 计算log概率
                entropy += m.entropy().mean()  # 计算累计熵

                log_probs.append(log_prob)  # 记录log概率
                values.append(value)  # 记录状态值
                rewards.append(torch.tensor([reward], dtype=torch.float32))  # 记录奖励
                masks.append(torch.tensor([1 - done], dtype=torch.float32))  # 记录mask

                state = next_state  # 更新状态
                step += 1
                if done or step >= self.max_steps:  # 如果回合结束或者超过最大步数，结束
                    break

            # 玩完一个回合，再更新策略网络
            Qvals = []  # 计算Q值
            Qval = 0  # 初始化Q值
            for r, m in zip(reversed(rewards), reversed(masks)):  # 从后往前计算Q值
                Qval = r + self.gamma * Qval * m  # hay_y计算公式
                Qvals.insert(0, Qval)  # 插入到Q值列表的第一个位置
            
            Qvals = torch.cat(Qvals).detach()  # 转换为张量
            log_probs = torch.cat(log_probs)  # log_prob转换为张量
            values = torch.cat(values)  # values转换为张量
            
            advantage = Qvals - values  # 计算优势

            # Calculate losses
            actor_loss = -(log_probs * advantage.detach()).mean()  # 计算actor损失
            critic_loss = advantage.pow(2).mean()  # 计算critic损失

            ac_loss = actor_loss + critic_loss - 0.001 * entropy  # 计算总损失

            # Perform backprop
            self.optimizer.zero_grad()  # 梯度清零
            ac_loss.backward()  # 反向传播
            self.optimizer.step()  # 更新参数

            if (episode + 1) % 100 == 0:  # 每100个episode打印一次
                print(f'Episode {episode + 1}: Last total rewards: {torch.tensor(rewards).sum().item()}')

    def evaluate(self, num_episodes=10):
        '''
        计算num_episodes个回合的平均奖励
        '''
        total_rewards = []  # 记录每个episode的总奖励
        for i in range(num_episodes):
            state, _ = self.env.reset()  # 重置环境
            episode_reward = 0
            done = False
            step = 0
            while not done:
                state = torch.FloatTensor(state).unsqueeze(0)
                with torch.no_grad():
                    probs, _ = self.model(state)
                    
                action = torch.argmax(probs).item()
                state, reward, done, _, _ = self.env.step(action)
                episode_reward += reward
                step += 1
                if step >= self.max_steps:
                    break
                
            total_rewards.append(episode_reward)
            print(f'Episode {i + 1}: Total Reward = {episode_reward}')
            
        return sum(total_rewards)/len(total_rewards)

- 使用目标网络训练

In [33]:
class ActorCriticTrainerWithTargetNet:
    def __init__(self, env, model, gamma=0.99, lr=3e-3, max_steps=1000, update_target_interval=10):
        self.env = env
        self.model = model
        self.target_model = copy.deepcopy(model)  # 创建目标网络的深拷贝
        self.target_model.load_state_dict(model.state_dict())
        self.target_model.eval()
        self.gamma = gamma
        self.max_steps = max_steps
        self.optimizer = optim.Adam(model.parameters(), lr=lr)
        self.update_target_interval = update_target_interval  # 目标网络更新间隔

    def train(self, num_episodes):
        for episode in range(num_episodes):
            state, _ = self.env.reset()
            log_probs = []
            values = []
            rewards = []
            masks = []
            entropy = 0
            step = 0
            while True:
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                probs, value = self.model(state_tensor)
                m = torch.distributions.Categorical(probs)
                action = m.sample()
                next_state, reward, done, _, _ = self.env.step(action.item())

                log_prob = m.log_prob(action)
                entropy += m.entropy().mean()

                log_probs.append(log_prob)
                values.append(value)
                rewards.append(torch.tensor([reward], dtype=torch.float32))
                masks.append(torch.tensor([1 - done], dtype=torch.float32))

                state = next_state
                step += 1
                if done or step >= self.max_steps:
                    break

            Qvals = []
            Qval = 0
            for r, m in zip(reversed(rewards), reversed(masks)):
                Qval = r + self.gamma * Qval * m
                Qvals.insert(0, Qval)

            Qvals = torch.cat(Qvals).detach()
            values = torch.cat(values)
            advantage = Qvals - values

            actor_loss = -(torch.cat(log_probs) * advantage.detach()).mean()
            critic_loss = advantage.pow(2).mean()
            ac_loss = actor_loss + critic_loss - 0.001 * entropy

            self.optimizer.zero_grad()
            ac_loss.backward()
            self.optimizer.step()

            if (episode + 1) % self.update_target_interval == 0:
                self.target_model.load_state_dict(self.model.state_dict())

            if (episode + 1) % 100 == 0:
                print(f'Episode {episode + 1}: Last total rewards: {sum(rewards).item()}')
    
    def evaluate(self, num_episodes=10):
        '''
        计算num_episodes个回合的平均奖励
        '''
        total_rewards = []  # 记录每个episode的总奖励
        for i in range(num_episodes):
            state, _ = self.env.reset()  # 重置环境
            episode_reward = 0
            done = False
            step = 0
            while not done:
                state = torch.FloatTensor(state).unsqueeze(0)
                with torch.no_grad():
                    probs, _ = self.model(state)
                    
                action = torch.argmax(probs).item()
                state, reward, done, _, _ = self.env.step(action)
                episode_reward += reward
                step += 1
                if step >= self.max_steps:
                    break
                
            total_rewards.append(episode_reward)
            print(f'Episode {i + 1}: Total Reward = {episode_reward}')
            
        return sum(total_rewards)/len(total_rewards)

### 3. 运行训练过程

创建环境和模型，然后启动训练。


- 不使用目标网络

In [20]:
env = gym.make('CartPole-v1')
model = ActorCritic()
trainer = ActorCriticTrainer(env, model)
trainer.train(num_episodes=800)

  if not isinstance(terminated, (bool, np.bool8)):


Episode 100: Last total rewards: 55.0
Episode 200: Last total rewards: 688.0
Episode 300: Last total rewards: 166.0
Episode 400: Last total rewards: 1000.0
Episode 500: Last total rewards: 917.0
Episode 600: Last total rewards: 303.0
Episode 700: Last total rewards: 233.0
Episode 800: Last total rewards: 1000.0


In [21]:
trainer.evaluate(num_episodes=10)

Episode 1: Total Reward = 1000.0
Episode 2: Total Reward = 1000.0
Episode 3: Total Reward = 1000.0
Episode 4: Total Reward = 1000.0
Episode 5: Total Reward = 1000.0
Episode 6: Total Reward = 1000.0
Episode 7: Total Reward = 1000.0
Episode 8: Total Reward = 1000.0
Episode 9: Total Reward = 1000.0
Episode 10: Total Reward = 1000.0


1000.0

- 使用目标网络训练

In [43]:
env = gym.make('CartPole-v1')
model = ActorCritic()
trainer = ActorCriticTrainerWithTargetNet(env, model, lr=5e-3, update_target_interval=25)
trainer.train(num_episodes=800)

  if not isinstance(terminated, (bool, np.bool8)):


Episode 100: Last total rewards: 80.0
Episode 200: Last total rewards: 77.0
Episode 300: Last total rewards: 293.0
Episode 400: Last total rewards: 246.0
Episode 500: Last total rewards: 241.0
Episode 600: Last total rewards: 301.0
Episode 700: Last total rewards: 293.0
Episode 800: Last total rewards: 809.0


In [44]:
trainer.evaluate(num_episodes=10)

Episode 1: Total Reward = 1000.0
Episode 2: Total Reward = 1000.0
Episode 3: Total Reward = 1000.0
Episode 4: Total Reward = 1000.0
Episode 5: Total Reward = 1000.0
Episode 6: Total Reward = 1000.0
Episode 7: Total Reward = 1000.0
Episode 8: Total Reward = 1000.0
Episode 9: Total Reward = 1000.0
Episode 10: Total Reward = 1000.0


1000.0

在Actor-Critic方法中，Actor的目标函数 $L$ 经常被设计为期望的负对数概率乘以优势函数的形式。这个函数是：

 $$L(\theta) = -\mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^T \log \pi_\theta(a_t|s_t) \cdot A^\pi(s_t, a_t)\right]$$

其中，$\pi_\theta(a_t|s_t)$ 是由参数 $\theta$ 确定的在给定状态 $s_t$ 下选择动作 $a_t$ 的策略，$A^\pi(s_t, a_t)$ 是在状态 $s_t$ 下选择动作 $a_t$ 的优势函数。

### 计算 $L$ 对 $\theta$ 的导数

要找到 $L$ 对策略参数 $\theta$ 的导数（也称为策略梯度），我们可以使用链式法则。根据策略梯度定理，我们有：

$$ \nabla_\theta L(\theta) = -\mathbb{E}_{\tau \sim \pi_\theta}\left[\sum_{t=0}^T \nabla_\theta \log \pi_\theta(a_t|s_t) \cdot A^\pi(s_t, a_t)\right] $$

进一步展开这个梯度的计算：

1. **对数概率的梯度** $\nabla_\theta \log \pi_\theta(a_t|s_t)$：
   - 这是策略网络对其输出（动作概率）的敏感度的度量。使用链式法则，我们可以找到策略网络输出相对于其参数的梯度。
   - 由于 $\log \pi_\theta(a_t|s_t)$ 是对数似然，其对策略参数 $\theta$ 的梯度可以直接通过自动微分（如PyTorch中的autograd）获得。

2. **乘以优势函数** $A^\pi(s_t, a_t)$：
   - 优势函数衡量选择特定动作相对于平均情况的额外价值。它不依赖于参数 $\theta$，因此在求导时仅作为乘法因子。

因此，策略梯度实际上指示了如何调整策略参数 $\theta$ 以增加获得高优势的动作的概率，同时减少获得低或负优势的动作的概率。这种更新方式是在学习过程中推动策略向着增加预期回报的方向演进。

### 为什么使用 $-\mathbb{E}[\cdot]$

由于我们通常使用梯度下降方法最小化损失函数，而策略梯度方法的目标是最大化总回报，因此在实际实现中，我们取目标函数的负值（即最大化 $\mathbb{E}[\cdot]$ 转换为最小化 $-\mathbb{E}[\cdot]$）。这样，就可以直接应用标准的优化算法（如SGD或Adam）来进行参数更新。