使用REINFORCE，模型训练了大概3000轮，基本可以稳赢，但是离理想中的21：0碾压局还差很多

In [1]:
import sys
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical
import matplotlib.pyplot as plt
from collections import deque, Counter
import os
from matplotlib import animation
from PIL import Image
import ale_py
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# `device` is read as a module-level global by the code further down.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Create a Pong environment and print its observation and action spaces.
env = gym.make('ALE/Pong-v5')
print(env.observation_space)
print(env.action_space)

def preprocess(image):
    """Turn a raw Pong frame into a flat binary float vector.

    The frame is cropped to rows 35..194, every second row and column of
    channel 0 is kept, the two background values (144 and 109) are cleared
    and every remaining non-zero pixel is set to 1.

    Unlike a slice-and-assign implementation, this never writes into
    ``image``: the caller's frame is left untouched.

    Args:
        image: array indexed as (row, column, channel); for the
            210x160x3 uint8 Pong frame the result has 80 * 80 = 6400 entries.

    Returns:
        1-D ``np.float32`` array containing only 0.0 and 1.0.
    """
    # Crop + 2x downsample in one slicing step; this is only a view.
    channel = image[35:195:2, ::2, 0]
    # A pixel is foreground when it is neither black nor a background value.
    # The comparison allocates a fresh boolean array, so no in-place edit of
    # the input happens.
    foreground = (channel != 0) & (channel != 144) & (channel != 109)
    return foreground.astype(np.float32).ravel()

class Model(nn.Module):
    """Three-layer fully connected policy network.

    Args:
        obs_dim (int): length of the flattened observation vector.
        act_dim (int): number of discrete actions.
    """

    def __init__(self, obs_dim, act_dim):
        super().__init__()
        # Layer widths: obs_dim -> 256 -> 64 -> act_dim.
        widths = (obs_dim, 256, 64, act_dim)
        self.fc1 = nn.Linear(widths[0], widths[1])
        self.fc2 = nn.Linear(widths[1], widths[2])
        self.fc3 = nn.Linear(widths[2], widths[3])

    def forward(self, obs):
        """Return action probabilities (softmax over the last dimension)."""
        hidden = torch.relu(self.fc1(obs))
        hidden = torch.relu(self.fc2(hidden))
        logits = self.fc3(hidden)
        return torch.softmax(logits, dim=-1)

# Policy-gradient algorithm (gradient-based update of the policy network)
class PolicyGradient():
    """Wrap a policy network together with an Adam optimizer.

    Args:
        model: network whose forward pass returns action probabilities.
        lr: learning rate passed to Adam.
    """

    def __init__(self, model, lr):
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=lr)

    def predict(self, obs):
        """Return the model's output for ``obs``."""
        return self.model(obs)

    def learn(self, obs, action, reward):
        """Run one optimizer step on the policy-gradient loss.

        The loss is the mean over the batch of
        ``-log_prob(action) * reward``; the trailing dimension of
        ``action`` and ``reward`` is squeezed away first.

        Returns:
            The loss tensor computed before the parameter update.
        """
        taken_actions = action.squeeze(-1)
        weights = reward.squeeze(-1)
        policy = Categorical(self.model(obs))
        log_likelihood = policy.log_prob(taken_actions)
        loss = (-1 * log_likelihood * weights).mean()

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss

class Agent():
    """Glue between numpy data from the environment and the algorithm.

    Converts observations/actions/rewards to tensors on the module-level
    ``device``, delegates to ``algorithm`` and handles checkpoint files
    under ``./savemodel``.

    Args:
        algorithm: object exposing ``predict``, ``learn``, ``model`` and
            ``optimizer`` (see ``save``/``load`` for the last two).
    """

    def __init__(self, algorithm):
        self.alg = algorithm

        if os.path.exists("./savemodel"):
            print("开始从文件加载参数....")
            try:
                self.load()
                print("从文件加载参数结束....")
            except Exception as exc:
                # Only ordinary errors fall back to training from scratch;
                # KeyboardInterrupt / SystemExit are no longer swallowed.
                print("从文件加载参数失败，从0开始训练....")
                # Show why loading failed instead of hiding the cause.
                print(repr(exc))

    def sample(self, obs):
        """Sample an action (with exploration) from the policy for ``obs``."""
        obs = torch.FloatTensor(obs).unsqueeze(0).to(device)
        # Action selection needs no gradients; skip building the graph.
        with torch.no_grad():
            prob = self.alg.predict(obs)
        prob = prob.cpu().numpy()[0]
        # Draw one action index according to the action probabilities.
        act = np.random.choice(len(prob), 1, p=prob)[0]
        return act

    def predict(self, obs):
        """Return the index of the highest-probability action for ``obs``."""
        obs = torch.FloatTensor(obs).unsqueeze(0).to(device)
        with torch.no_grad():
            prob = self.alg.predict(obs)
        # item() yields a plain Python scalar.
        act = prob.argmax().cpu().item()
        return act

    def learn(self, obs, act, reward):
        """Update the model once from a batch of training data.

        ``act`` and ``reward`` get a trailing axis added before being
        converted to tensors.

        Returns:
            The loss as a numpy value.
        """
        act = np.expand_dims(act, axis=-1)
        reward = np.expand_dims(reward, axis=-1)

        obs = torch.FloatTensor(obs).to(device)
        act = torch.LongTensor(act).to(device)
        reward = torch.FloatTensor(reward).to(device)

        loss = self.alg.learn(obs, act, reward)
        return loss.detach().cpu().numpy()

    def save(self):
        """Write model and optimizer state dicts under ``./savemodel``."""
        os.makedirs("./savemodel", exist_ok=True)
        torch.save(self.alg.model.state_dict(), './savemodel/PG-Pong_net.pdparams')
        torch.save(self.alg.optimizer.state_dict(), "./savemodel/opt.pdopt")

    def load(self):
        """Load the model parameters saved by ``save``.

        The optimizer state is written by ``save`` but is not restored here
        (the restore code below is disabled).
        """
        # Load the network parameters.
        model_state_dict = torch.load('./savemodel/PG-Pong_net.pdparams', map_location=device)
        self.alg.model.load_state_dict(model_state_dict)

        # Load the optimizer parameters (disabled).
        # optimizer_state_dict = torch.load("./savemodel/opt.pdopt", map_location=device)
        # self.alg.optimizer.load_state_dict(optimizer_state_dict)

# Train for one episode
def run_train_episode(agent, env):
    """Play one episode with the sampling policy and record the trajectory.

    Each raw frame is passed through ``preprocess`` before it is stored and
    handed to ``agent.sample``.

    Returns:
        Tuple ``(obs_list, action_list, reward_list)`` with one entry per
        environment step.
    """
    states, actions, rewards = [], [], []
    frame, _ = env.reset()
    finished = False
    while not finished:
        # Raw frame of shape (210, 160, 3) -> flat vector of shape (6400,).
        state = preprocess(frame)
        states.append(state)

        chosen = agent.sample(state)
        actions.append(chosen)

        frame, step_reward, terminated, truncated, _ = env.step(chosen)
        rewards.append(step_reward)

        # The episode ends on either termination or truncation.
        finished = terminated or truncated
    return states, actions, rewards

# Evaluate the agent: run several episodes and average the total reward
def run_evaluate_episodes(agent, env, render=False, num_episodes=5):
    """Evaluate ``agent`` with its greedy policy and return the mean score.

    Args:
        agent: object whose ``predict(obs)`` returns the action to take.
        env: environment exposing ``reset``, ``step`` and ``render``.
        render: when true, ``env.render()`` is called after every step.
        num_episodes: number of evaluation episodes (5 by default, matching
            the previously hard-coded value).

    Returns:
        Mean of the per-episode reward sums.

    Raises:
        ValueError: if ``num_episodes`` is smaller than 1 (the mean of zero
            episodes would be NaN).
    """
    if num_episodes < 1:
        raise ValueError("num_episodes must be at least 1")

    eval_reward = []
    for _ in range(num_episodes):
        obs, info = env.reset()
        episode_reward = 0
        while True:
            # Raw frame of shape (210, 160, 3) -> flat vector of shape (6400,).
            obs = preprocess(obs)
            action = agent.predict(obs)
            obs, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward
            if render:
                env.render()
            if terminated or truncated:
                break
        eval_reward.append(episode_reward)
    return np.mean(eval_reward)

def calc_reward_to_go(reward_list, gamma=0.99):
    """Compute normalized discounted reward-to-go for one episode.

    For every step ``t`` the return ``G_t = r_t + gamma * G_{t+1}`` is
    computed, then the whole sequence is shifted to zero mean and, when the
    standard deviation is non-zero, scaled to unit standard deviation.

    Args:
        reward_list: per-step rewards (ints or floats).
        gamma: discount factor.

    Returns:
        ``np.float64`` array with the same length as ``reward_list``. An
        empty input gives an empty array; a constant sequence of returns
        gives all zeros instead of NaN.
    """
    # Force a float dtype: with integer rewards the in-place float updates
    # below would otherwise be truncated or rejected by numpy.
    returns = np.array(reward_list, dtype=np.float64)
    if returns.size == 0:
        return returns

    # Walk backwards: G_t = r_t + gamma * G_{t+1}.
    for i in range(len(returns) - 2, -1, -1):
        returns[i] += gamma * returns[i + 1]

    # Normalize the episode returns.
    returns -= np.mean(returns)
    spread = np.std(returns)
    # Skip the division when all returns are equal to avoid NaN/inf.
    if spread > 0:
        returns /= spread
    return returns

def main(num_episodes=100):
    """Train the policy-gradient agent on Pong.

    Every 100 episodes the agent is evaluated; the parameters are saved when
    the evaluation reward beats the best value seen so far in this run.

    Args:
        num_episodes: number of training episodes. Defaults to 100, the
            value the loop used before; the original comment mentions 3000
            as the default for a full run.
    """
    env = gym.make('ALE/Pong-v5')
    try:
        obs_dim = 80 * 80
        act_dim = env.action_space.n
        print('obs_dim {}, act_dim {}'.format(obs_dim, act_dim))

        # Build the model, the algorithm and the agent.
        LEARNING_RATE = 5e-4
        model = Model(obs_dim=obs_dim, act_dim=act_dim).to(device)
        alg = PolicyGradient(model, lr=LEARNING_RATE)
        agent = Agent(alg)

        # twriter=LogWriter('./logs/PG_Pong')  # VisualDL logging, disabled

        # Best evaluation reward so far. It must live outside the loop:
        # resetting it on every iteration would discard the running best.
        # The starting value 0 means only a positive mean reward is saved.
        best_eval_reward = 0

        for i in range(num_episodes):
            obs_list, action_list, reward_list = run_train_episode(agent, env)

            if i % 50 == 0:
                print("Episode {}, Reward Sum {}.".format(i, sum(reward_list)))

            batch_obs = np.array(obs_list)
            batch_action = np.array(action_list)
            batch_reward = calc_reward_to_go(reward_list)

            agent.learn(batch_obs, batch_action, batch_reward)

            if (i + 1) % 100 == 0:
                # Pass render=True to watch the evaluation games.
                total_reward = run_evaluate_episodes(agent, env, render=False)
                print('Test reward: {}'.format(total_reward))

                # Save the parameters only when the evaluation improved.
                if best_eval_reward < total_reward:
                    best_eval_reward = total_reward
                    agent.save()
    finally:
        # Release the emulator even if training is interrupted.
        env.close()

# Run the whole training program
main()

# # 9.使用训练好的网络进行测试并生成动图
# def save_frames_as_gif(frames, filename):
#     # Mess with this to change frame size
#     plt.figure(figsize=(frames[0].shape[1]/100, frames[0].shape[0]/100), dpi=300)
    
#     patch = plt.imshow(frames[0])
#     plt.axis('off')
    
#     def animate(i):
#         patch.set_data(frames[i])
    
#     anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
#     anim.save(filename, writer='pillow', fps=60)

# Load the model parameters from file and play one game with the greedy policy
print("\n测试训练好的模型...")
test_model = Model(6400, 6).to(device)
if os.path.exists("./savemodel/PG-Pong_net.pdparams"):
    model_state_dict = torch.load("./savemodel/PG-Pong_net.pdparams", map_location=device)
    test_model.load_state_dict(model_state_dict)

    # 9.4 Test the trained model and record the frames for an animation.
    # Create the environment with a render mode so render() returns frames.
    env_test = gym.make('ALE/Pong-v5', render_mode='rgb_array')
    try:
        state, info = env_test.reset()
        # Rendered frames; only needed by the (disabled) GIF export below.
        frames = []
        done = False
        i = 0
        reward_list = []

        while not done:
            # In Gymnasium, render() returns the current frame.
            frame = env_test.render()
            if frame is not None:
                frames.append(frame)

            obs = preprocess(state)
            obs = torch.FloatTensor(obs).unsqueeze(0).to(device)
            # Inference only: no gradients are needed.
            with torch.no_grad():
                prob = test_model(obs)
            # item() yields a plain Python scalar.
            action = prob.argmax().cpu().item()
            next_state, reward, terminated, truncated, _ = env_test.step(action)
            done = terminated or truncated
            # A non-zero reward marks a scored point; log only those steps.
            if reward != 0:
                reward_list.append(reward)
                print(i, "   ", reward, done)
            state = next_state
            i += 1
    finally:
        # Always release the environment, even if the game loop fails.
        env_test.close()

    # Count the points: +1.0 rewards vs. -1.0 rewards.
    reward_counter = Counter(reward_list)
    print(reward_counter)
    print("你的得分为：", reward_counter.get(1.0, 0), '对手得分为：', reward_counter.get(-1.0, 0))
    if reward_counter.get(1.0, 0) > reward_counter.get(-1.0, 0):
        print("恭喜您赢了！！！")
    else:
        print("惜败，惜败，训练一下智能体网络再来挑战吧QWQ")

    # # Save the GIF only when frames were collected (disabled)
    # if frames:
    #     print(f"保存 {len(frames)} 帧为动图...")
    #     save_frames_as_gif(frames, filename="Pong-v5_trained.gif")
    #     print("动图已保存")
    # else:
    #     print("未收集到渲染帧，跳过动图保存")
else:
    print("未找到训练好的模型文件，请先训练模型")

Using device: cuda
Box(0, 255, (210, 160, 3), uint8)
Discrete(6)
obs_dim 6400, act_dim 6
开始从文件加载参数....
从文件加载参数结束....
Episode 0, Reward Sum 16.0.
Episode 50, Reward Sum 13.0.
Test reward: -3.6

测试训练好的模型...
63     -1.0 False
126     -1.0 False
174     1.0 False
313     1.0 False
378     -1.0 False
506     1.0 False
661     -1.0 False
770     1.0 False
909     1.0 False
988     1.0 False
1066     1.0 False
1144     1.0 False
1332     -1.0 False
1395     -1.0 False
1458     -1.0 False
1521     -1.0 False
1584     -1.0 False
1647     -1.0 False
1710     -1.0 False
1773     -1.0 False
1821     1.0 False
1960     1.0 False
2054     -1.0 False
2102     1.0 False
2235     1.0 False
2360     -1.0 False
2408     1.0 False
2606     1.0 False
2700     -1.0 False
2748     1.0 False
2826     1.0 False
2965     1.0 False
3044     1.0 False
3122     1.0 False
3262     1.0 False
3340     1.0 True
Counter({1.0: 21, -1.0: 15})
你的得分为： 21 对手得分为： 15
恭喜您赢了！！！
