In [None]:
from gym import envs
# Print every registered environment id next to its spec.
registry = envs.registry
for env_id, spec in registry.items():
    print(f"{env_id} -> {spec}")

In [None]:
import gym
import matplotlib.pyplot as plt
from IPython import display

# Roll out a random policy on CartPole and animate the rendered frames inline.
env = gym.make('CartPole-v1', render_mode='rgb_array')
try:
    obs, _ = env.reset()

    # Create the image artist once; later frames only replace its pixel data.
    frame = env.render()
    plt.ion()  # interactive mode
    fig, ax = plt.subplots()
    img = ax.imshow(frame)
    display.display(fig)

    for _ in range(500):
        frame = env.render()
        img.set_data(frame)  # update the existing image instead of rebuilding it
        display.clear_output(wait=True)
        display.display(fig)

        action = env.action_space.sample()
        print(f"Action taken: {action} {env.observation_space}")
        obs, _, terminated, truncated, _ = env.step(action)

        # Stepping an environment after its episode has ended is undefined,
        # so start a new episode as soon as the current one finishes.
        if terminated or truncated:
            obs, _ = env.reset()
finally:
    # Always restore matplotlib's mode and release the environment, even if
    # the cell is interrupted or rendering raises.
    plt.ioff()
    env.close()


In [None]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import numpy as np

# 定义策略网络
class PolicyNetwork(nn.Module):
    """Two-layer MLP that turns an input vector into a probability vector.

    The network is ``Linear -> ReLU -> Linear -> Softmax``; the softmax is
    taken over the last dimension.

    Args:
        input_dim: Number of input features.
        hidden_dim: Width of the hidden layer.
        output_dim: Number of outputs (entries of the probability vector).
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Return softmax probabilities over the last dimension for ``x``."""
        hidden = self.relu(self.fc1(x))
        logits = self.fc2(hidden)
        return self.softmax(logits)

# 定义价值网络
class ValueNetwork(nn.Module):
    """Two-layer MLP (``Linear -> ReLU -> Linear``) with an unbounded output.

    Args:
        input_dim: Number of input features.
        hidden_dim: Width of the hidden layer.
        output_dim: Number of outputs; defaults to 1.
    """

    def __init__(self, input_dim, hidden_dim, output_dim=1):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return the raw output of the second linear layer for ``x``."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

# PPO 代理
class PPOAgent:
    """PPO agent with a separate policy network and value network.

    Args:
        input_dim: Number of state features fed to both networks.
        hidden_dim: Hidden-layer width of both networks.
        output_dim: Number of discrete actions.
        lr_policy: Adam learning rate for the policy network.
        lr_value: Adam learning rate for the value network.
        gamma: Discount factor.
        lambd: GAE lambda used when accumulating advantages.
        epsilon: Half-width of the clipping interval for the probability ratio.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, lr_policy=3e-4, lr_value=1e-3, gamma=0.99, lambd=0.95, epsilon=0.2):
        self.policy = PolicyNetwork(input_dim, hidden_dim, output_dim)
        self.value = ValueNetwork(input_dim, hidden_dim)

        self.optimizer_policy = optim.Adam(self.policy.parameters(), lr=lr_policy)
        self.optimizer_value = optim.Adam(self.value.parameters(), lr=lr_value)

        self.gamma = gamma
        self.lambd = lambd
        self.epsilon = epsilon

    def get_action(self, state):
        """Sample an action for a single state.

        Args:
            state: One observation (NumPy array or any array-like).

        Returns:
            ``(action, log_prob)`` where ``action`` is a Python int and
            ``log_prob`` is a tensor of shape ``(1,)`` holding the log
            probability of the sampled action.
        """
        state = torch.as_tensor(state, dtype=torch.float).unsqueeze(0)
        probs = self.policy(state)
        dist = Categorical(probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item(), log_prob

    def compute_advantages(self, rewards, values, next_values, dones):
        """Compute GAE advantages and the matching value targets.

        Args:
            rewards: 1-D tensor of per-step rewards.
            values: 1-D tensor with the value estimate of each state.
            next_values: 1-D tensor with the value estimate of each successor state.
            dones: 1-D tensor of 0/1 flags; a 1 disables bootstrapping from the
                successor state and stops advantage accumulation across that step.

        Returns:
            ``(returns, advantages)``. ``returns`` is the un-normalised advantage
            plus ``values``. ``advantages`` is standardised to zero mean and unit
            std when there are at least two steps; with fewer steps it is
            returned un-normalised.
        """
        advantages = torch.zeros_like(rewards)
        n_steps = len(rewards)
        if n_steps == 0:
            # Nothing to accumulate; return empty tensors instead of indexing [-1].
            return advantages.clone(), advantages

        # Backward recursion: A_t = delta_t + gamma * lambda * (1 - done_t) * A_{t+1},
        # with A beyond the last step taken to be 0.
        running = 0.0
        for t in reversed(range(n_steps)):
            not_done = 1 - dones[t]
            delta = rewards[t] + self.gamma * not_done * next_values[t] - values[t]
            running = delta + self.gamma * self.lambd * not_done * running
            advantages[t] = running

        returns = advantages + values
        if n_steps > 1:
            # The (unbiased) std of a single sample is NaN, which would poison
            # the policy update, so only standardise when it is well defined.
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        return returns, advantages

    def update(self, states, actions, log_probs_old, returns, advantages, n_epochs=10):
        """Run clipped-surrogate policy updates and MSE value updates on one batch.

        Args:
            states: Sequence of observations (e.g. a list of NumPy arrays).
            actions: Sequence of integer actions, one per state.
            log_probs_old: Sequence of tensors with the log probability of each
                action under the policy that collected the data.
            returns: Value targets, one per state.
            advantages: Advantage estimates, one per state.
            n_epochs: Number of optimisation passes over the batch.
        """
        # Stack into one ndarray first; building a tensor from a list of arrays is slow.
        states = torch.as_tensor(np.asarray(states), dtype=torch.float)
        actions = torch.as_tensor(actions, dtype=torch.long)
        # Force column shape so the element-wise ops below never broadcast to (N, N).
        log_probs_old = torch.stack(list(log_probs_old)).detach().reshape(-1, 1)
        returns = torch.as_tensor(returns, dtype=torch.float).reshape(-1, 1)
        advantages = torch.as_tensor(advantages, dtype=torch.float).reshape(-1, 1)

        for _ in range(n_epochs):
            # Log probability of the taken actions under the current policy,
            # computed the same way as in get_action.
            dist = Categorical(probs=self.policy(states))
            log_probs = dist.log_prob(actions).unsqueeze(-1)
            # Probability ratio pi_new / pi_old = exp(log pi_new - log pi_old).
            ratio = (log_probs - log_probs_old).exp()

            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * advantages

            loss_policy = -torch.min(surr1, surr2).mean()

            self.optimizer_policy.zero_grad()
            loss_policy.backward()
            self.optimizer_policy.step()

            values = self.value(states)
            loss_value = nn.MSELoss()(values, returns)

            self.optimizer_value.zero_grad()
            loss_value.backward()
            self.optimizer_value.step()

def train_ppo(env, agent, max_timesteps=500, batch_size=5000, n_updates=10):
    """Collect rollouts from ``env`` and update ``agent`` after each one.

    Each rollout stops at the first episode end, after ``batch_size``
    transitions, or once ``max_timesteps`` total transitions have been
    collected, whichever comes first. The agent is updated once per rollout.

    Args:
        env: Environment using the Gym v0.26+ API (``reset`` returns
            ``(obs, info)``; ``step`` returns a 5-tuple).
        agent: Object providing ``get_action``, ``value``,
            ``compute_advantages`` and ``update``.
        max_timesteps: Total number of environment steps to collect.
        batch_size: Maximum number of transitions per rollout; must be >= 1.
        n_updates: Accepted for interface compatibility; currently unused.

    Returns:
        ``(all_rewards, episode_rewards)``: the flat list of every per-step
        reward, and the list of summed rewards per rollout.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    state, _ = env.reset()  # Gym v0.26+ returns (observation, info)
    episode_rewards = []
    all_rewards = []
    t = 0

    while t < max_timesteps:
        states, actions, rewards, next_states, terminals, log_probs = [], [], [], [], [], []
        batch_rewards = 0

        # Collect one rollout.
        for _ in range(batch_size):
            action, log_prob = agent.get_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            states.append(state)
            actions.append(action)
            rewards.append(reward)
            next_states.append(next_state)
            # Only a true termination suppresses bootstrapping; after a
            # time-limit truncation the successor state still has value.
            # The rollout stops at the first episode end, so this flag can
            # only be set on the final transition.
            terminals.append(float(terminated))
            log_probs.append(log_prob)

            batch_rewards += reward
            t += 1

            if done:
                state, _ = env.reset()  # start the next episode
                break
            state = next_state
            if t >= max_timesteps:
                break

        episode_rewards.append(batch_rewards)
        all_rewards.extend(rewards)

        # Build tensors for the value estimates and advantage computation.
        states_tensor = torch.tensor(np.array(states), dtype=torch.float)
        next_states_tensor = torch.tensor(np.array(next_states), dtype=torch.float)
        rewards_tensor = torch.tensor(rewards, dtype=torch.float)
        terminals_tensor = torch.tensor(terminals, dtype=torch.float)

        with torch.no_grad():
            # squeeze(-1), not squeeze(): a one-transition rollout must stay
            # 1-D so that it can still be indexed per step.
            values = agent.value(states_tensor).squeeze(-1)
            next_values = agent.value(next_states_tensor).squeeze(-1)

        returns, advantages = agent.compute_advantages(
            rewards_tensor, values, next_values, terminals_tensor
        )

        # Update the policy and value networks on this rollout.
        agent.update(states, actions, log_probs, returns.numpy(), advantages.numpy())

    return all_rewards, episode_rewards

# 初始化环境
# Seed the RNGs so that Restart & Run All reproduces the same training run.
SEED = 0
torch.manual_seed(SEED)

# Set up the training environment.
env = gym.make('CartPole-v1')
env.reset(seed=SEED)  # seeds the environment's RNG; later reset() calls keep using it
input_dim = env.observation_space.shape[0]
output_dim = env.action_space.n
hidden_dim = 64

# Create the PPO agent.
agent = PPOAgent(input_dim, hidden_dim, output_dim)

# Train the agent, releasing the environment even if training fails.
max_timesteps = 1000
batch_size = 2048
try:
    all_rewards, episode_rewards = train_ppo(env, agent, max_timesteps, batch_size)
finally:
    env.close()

# Report the mean summed reward per rollout.
print("每集的平均奖励:", np.mean(episode_rewards))

In [None]:
import matplotlib.pyplot as plt
from IPython import display
# Roll out the trained agent on CartPole and animate the rendered frames inline.
env = gym.make('CartPole-v1', render_mode='rgb_array')
try:
    state, _ = env.reset()  # Gym v0.26+ returns (observation, info)
    frame = env.render()
    plt.ion()  # interactive mode
    fig, ax = plt.subplots()
    img = ax.imshow(frame)
    display.display(fig)

    for _ in range(batch_size):
        try:
            frame = env.render()
        except Exception as e:
            print(f"渲染错误: {e}")
            break
        img.set_data(frame)  # update the existing image instead of rebuilding it
        display.clear_output(wait=True)
        display.display(fig)

        action, _ = agent.get_action(state)
        state, _, terminated, truncated, _ = env.step(action)  # Gym v0.26+ 5-tuple
        # Reset on either kind of episode end; stepping an environment after
        # termination is undefined.
        if terminated or truncated:
            state, _ = env.reset()
finally:
    # Always restore matplotlib's mode and release the environment.
    plt.ioff()
    env.close()