In [8]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

# Create the CartPole-v1 environment used for training; render_mode=None is passed so no rendering is requested.
env = gym.make("CartPole-v1", render_mode=None)

In [9]:
# Q-network: a small multilayer perceptron that maps a state to one Q-value per action
class DQN(nn.Module):
    """Fully connected Q-network with two ReLU hidden layers.

    Args:
        input_dim: Number of input features (size of the last dimension of the input).
        output_dim: Number of outputs (one Q-value per action).
        hidden_dim: Width of both hidden layers. Defaults to 24, the value that
            was previously hard-coded, so existing ``DQN(input_dim, output_dim)``
            calls build exactly the same architecture.
    """

    def __init__(self, input_dim, output_dim, hidden_dim=24):
        super().__init__()
        # Attribute names fc1/fc2/fc3 are kept so state_dict keys stay unchanged.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return the raw (un-activated) outputs of the last layer for input ``x``.

        ``x`` must have ``input_dim`` features in its last dimension; the result
        has ``output_dim`` features in its last dimension.
        """
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

In [10]:
# # Define the neural network. Alternative (disabled): a single linear layer, i.e. a linear Q-function with no hidden layers
# class DQN(nn.Module):
#     def __init__(self, input_dim, output_dim):
#         super(DQN, self).__init__()
#         self.fc1 = nn.Linear(input_dim, output_dim)

#     def forward(self, x):
#         return self.fc1(x)

In [11]:
# Hyperparameters
gamma = 0.99              # discount factor applied to future rewards
epsilon = 0.1             # exploration probability for the epsilon-greedy policy
learning_rate = 1e-3      # step size for gradient descent
batch_size = 64           # number of transitions sampled from the replay buffer per update
memory_size = 10_000      # maximum number of transitions kept in the replay buffer

num_episodes = 1_000      # total number of training episodes
target_update = 10        # copy the policy weights into the target network every this many episodes

In [12]:
# Initialize the DQN networks, optimizer and replay buffer
input_dim = env.observation_space.shape[0]    # size of the observation vector, used as the network input size
output_dim = env.action_space.n               # number of discrete actions, used as the network output size
policy_net = DQN(input_dim, output_dim)       # policy network: produces the Q-values that are trained and acted on
target_net = DQN(input_dim, output_dim)       # target network: produces the Q-values used in the learning target
target_net.load_state_dict(policy_net.state_dict())  # start the target network as an exact copy of the policy network
target_net.eval()                             # eval mode only changes layer behaviour (e.g. dropout); it does NOT disable autograd
target_net.requires_grad_(False)              # the target is only refreshed by copying weights, so exclude its parameters from autograd

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)   # Adam over the policy network only (chosen by the author as more stable than plain SGD)
memory = deque(maxlen=memory_size)            # replay buffer; the oldest transitions are dropped once memory_size is reached

In [13]:
def epsilon_greedy(state, epsilon):
    """Pick an action with the epsilon-greedy rule.

    With probability ``epsilon`` a random action is sampled from the global
    ``env``'s action space; otherwise the action whose Q-value under the global
    ``policy_net`` is largest is returned.

    Args:
        state: Current observation, convertible to a float32 tensor.
        epsilon: Probability of taking a random (exploratory) action.

    Returns:
        The chosen action.
    """
    explore = random.random() < epsilon
    if explore:
        return env.action_space.sample()

    # Greedy branch: action selection never needs gradients.
    with torch.no_grad():
        q_values = policy_net(torch.tensor(state, dtype=torch.float32))
        return q_values.argmax().item()


def train_step():
    """Perform one DQN gradient step on a minibatch sampled from the replay buffer.

    Reads the globals ``memory``, ``batch_size``, ``gamma``, ``policy_net``,
    ``target_net`` and ``optimizer``. Does nothing (returns ``None``) until the
    buffer holds at least ``batch_size`` transitions.

    The regression target is ``r + gamma * max_a' Q_target(s', a') * (1 - done)``,
    so no future value is bootstrapped from transitions flagged as done.
    """
    if len(memory) < batch_size:            # not enough experience collected yet
        return

    batch = random.sample(memory, batch_size)                           # uniform random minibatch
    states, actions, rewards, next_states, dones = zip(*batch)          # transpose list of transitions into per-field tuples

    # Convert each field to a tensor. Observations are stacked into a single
    # ndarray first: calling torch.tensor on a tuple of ndarrays is very slow
    # and makes PyTorch emit a warning.
    states = torch.tensor(np.array(states), dtype=torch.float32)
    actions = torch.tensor(actions, dtype=torch.int64).unsqueeze(1)     # (batch_size, 1), the index shape gather expects
    rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
    next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
    dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1)

    # Q(s, a) for the actions that were actually taken.
    q_values = policy_net(states).gather(1, actions)

    # The target is treated as a constant: computing it under no_grad avoids
    # building an autograd graph through target_net and accumulating unused
    # gradients on its parameters.
    with torch.no_grad():
        max_next_q_values = target_net(next_states).max(1, keepdim=True)[0]
        target_q_values = rewards + (gamma * max_next_q_values * (1 - dones))

    loss = nn.MSELoss()(q_values, target_q_values)
    optimizer.zero_grad()                   # clear gradients left from the previous step
    loss.backward()                         # backpropagate through policy_net
    optimizer.step()                        # apply the parameter update

In [14]:
# Training loop: one iteration of the outer loop is one full episode
for episode in range(num_episodes):
    state, _ = env.reset()     # start a fresh episode
    total_reward = 0           # running sum of rewards, kept only for logging

    episode_over = False
    while not episode_over:
        action = epsilon_greedy(state, epsilon)                     # choose an action
        next_state, reward, done, truncated, _ = env.step(action)   # advance the environment by one step
        memory.append((state, action, reward, next_state, done))    # store the transition; only `done` (not `truncated`) is recorded

        state = next_state
        total_reward += reward
        train_step()           # one gradient update per environment step

        episode_over = done or truncated

    print(f"Episode {episode}, Total Reward: {total_reward}")

    # Sync the target network with the policy network every target_update episodes
    if episode % target_update == 0:
        target_net.load_state_dict(policy_net.state_dict())

Episode 0, Total Reward: 9.0
Episode 1, Total Reward: 8.0
Episode 2, Total Reward: 8.0
Episode 3, Total Reward: 10.0
Episode 4, Total Reward: 10.0
Episode 5, Total Reward: 10.0
Episode 6, Total Reward: 10.0
Episode 7, Total Reward: 10.0
Episode 8, Total Reward: 9.0
Episode 9, Total Reward: 11.0
Episode 10, Total Reward: 9.0
Episode 11, Total Reward: 9.0
Episode 12, Total Reward: 10.0
Episode 13, Total Reward: 9.0
Episode 14, Total Reward: 9.0
Episode 15, Total Reward: 10.0
Episode 16, Total Reward: 8.0
Episode 17, Total Reward: 10.0
Episode 18, Total Reward: 11.0
Episode 19, Total Reward: 11.0
Episode 20, Total Reward: 11.0
Episode 21, Total Reward: 9.0
Episode 22, Total Reward: 9.0
Episode 23, Total Reward: 9.0
Episode 24, Total Reward: 8.0
Episode 25, Total Reward: 9.0
Episode 26, Total Reward: 9.0
Episode 27, Total Reward: 8.0
Episode 28, Total Reward: 10.0
Episode 29, Total Reward: 10.0
Episode 30, Total Reward: 9.0
Episode 31, Total Reward: 9.0
Episode 32, Total Reward: 8.0
Episod

In [None]:
# Create the CartPole environment with render_mode="human"
# NOTE: this rebinds the global `env`, which epsilon_greedy also reads when it samples a random action.
env = gym.make("CartPole-v1", render_mode="human")

def play_episode(env, epsilon=0):
    """Run a single episode in ``env`` and print the total reward collected.

    Actions come from ``epsilon_greedy``; with the default ``epsilon=0`` the
    random branch is never taken, so every action is the greedy one. Note that
    ``epsilon_greedy`` samples random actions from the global ``env``, not from
    the ``env`` argument passed here.

    Args:
        env: Environment exposing ``reset()`` and ``step(action)``.
        epsilon: Exploration probability forwarded to ``epsilon_greedy``.
    """
    state, _ = env.reset()
    total_reward = 0
    finished = False

    while not finished:
        action = epsilon_greedy(state, epsilon)
        state, reward, done, truncated, _ = env.step(action)   # the new observation becomes the current state
        total_reward += reward
        finished = done or truncated

    print(f"Total Reward in Displayed Episode: {total_reward}")

# Run one episode, and always close the environment afterwards, even if the rollout raises
try:
    play_episode(env)
finally:
    env.close()


Total Reward in Displayed Episode: 500.0


: 