# 前期准备

## 创建环境

In [1]:
import gymnasium as gym

# 创建Lunar Lander环境，这里使用离散动作空间的版本
env = gym.make("LunarLander-v2")


## 探索环境
了解 动作空间, 观察空间

后续 代理设计 需要

In [3]:
print("Action Space:", env.action_space)
print("Observation Space:", env.observation_space)

Action Space: Discrete(4)
Observation Space: Box([-1.5       -1.5       -5.        -5.        -3.1415927 -5.
 -0.        -0.       ], [1.5       1.5       5.        5.        3.1415927 5.        1.
 1.       ], (8,), float32)


## 示例代码

随机动作

In [1]:
import gymnasium as gym
env = gym.make("LunarLander-v2", render_mode="human")
observation, info = env.reset()

for _ in range(1000):
    action = env.action_space.sample()  # agent policy that uses the observation and info
    
    observation, reward, terminated, truncated, info = env.step(action)
    
    # print("{}, {}, {}, {}".format(observation, reward, terminated, truncated))

    if terminated or truncated:
        observation, info = env.reset()

env.close()

## 显卡测试

pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

pip install torch==2.2.2+cu118 torchvision==0.14.2+cu118 torchaudio==0.12.2+cu118 -f https://download.pytorch.org/whl/torch_stable.html


In [4]:
import torch

print("PyTorch version: ", torch.__version__)
print("CUDA version: ", torch.version.cuda)

# 输出是否可以使用 CUDA
print("CUDA available: ", torch.cuda.is_available())


PyTorch version:  2.2.2+cu118
CUDA version:  11.8
CUDA available:  True


# 正式工作 (使用DQN)

训练一个Agent, 根据Observation和info, 决定最佳Action

## 01 导入并初始化环境

In [100]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt

env = gym.make("LunarLander-v2")


## 02 定义DQN网络模型

通过PyTorch的nn模块, 来构建神经网络

In [101]:
import torch
import torch.nn as nn
import torch.optim as optim

class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        "state_size: 状态空间的维度"
        "action_size: 动作空间的维度"
        super(DQN, self).__init__()
        # 定义神经网络的结构
        # 输入层的维度是状态空间的维度, 输出层的维度是动作空间的维度
        # 输入层 -(fc1)> 128 -(fc2)> 64 -(fc3)> 输出层 (常见的三层全连接神经网络)
        # fc1, fc2, fc3 分别是三个全连接层, 用于处理输入数据
        self.fc1 = nn.Linear(state_size, 128) 
        self.relu = nn.ReLU() # relu 是激活函数, 用于增加网络的非线性 (可以在每个全连接层后面添加, 是一个超参数)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, action_size)

    # 前向传播: 计算出网络的输出和损失, 用来更新网络
    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        return self.fc3(x) # 返回一个向量, 其维度 = 动作空间的维度. i.e., 每个动作的Q值, 即给定状态下动作的价值(分数 / 预期回报)


## 03 实现DQN Agent

创建一个Agent类, 用来实现DQN的训练 (包括经验回放)

In [102]:
import random
import numpy as np
from collections import deque

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # 确定设备
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.model = DQN(state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters())

    # 定义 Agent如何根据State选择Action
    def act(self, state):
        # state = torch.from_numpy(state).float().unsqueeze(0).to(self.device) # 将状态转换为张量 (还要添加一个维度), 以便输入到网络中
        
        # 使用 ε-greedy 策略选择动作
        if np.random.rand() <= self.epsilon: # 如果随机数小于 ε, 则随机选择一个动作, 用于探索
            return random.randrange(self.action_size)
        
        # 得到Q值, 用于选择动作
        self.model.eval() # 将模型设置为评估模式, 这样可以避免在评估模型时进行梯度更新
        with torch.no_grad(): # 不需要计算梯度, 因为我们只是在评估模型
            action_values = self.model(state) # 用当前状态获取每个动作的Q值
            
        self.model.train() # 修改回训练模式, 以便在训练模型时进行梯度更新 (模型的参数可以继续更新)
        return np.argmax(action_values.cpu().data.numpy()) # 根据Q值选择最佳动作
    
    # 用于 经验回放 (Experience Replay)
    # 当Agent在Environment中执行Action 并观察到新的状态和奖励时, 将这些信息存储, 之后用于训练网络模型
    def store(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    # 经验回放 (Experience Replay)
    # 打破数据之间的相关性, 提高训练的稳定性
    def replay(self, batch_size):
        if len(self.memory) < batch_size: # 如果记忆库中的样本数量小于批量大小, 则不执行
            return
        
        minibatch = random.sample(self.memory, batch_size) # 从记忆库中随机选择一个批量的经验
        states, actions, rewards, next_states, dones = zip(*minibatch) # 将批量经验拆分为状态, 动作, 奖励, 下一个状态, 完成标志
        # 将拆分的经验转换为张量, 以便输入到网络中
        states = torch.tensor(states, dtype =torch.float32).to(self.device).squeeze(1)
        actions = torch.tensor(actions, dtype=torch.long).to(self.device).unsqueeze(1)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device).unsqueeze(1)
        next_states = torch.tensor(next_states, dtype=torch.float32).to(self.device).squeeze(1)
        dones = torch.tensor(dones, dtype=torch.bool).to(self.device).unsqueeze(1)

        # 打印所有张量的形状
        # print("states: ", states.shape)
        # print("actions: ", actions.shape)
        # print("rewards: ", rewards.shape)
        # print("next_states: ", next_states.shape)
        # print("dones: ", dones.shape)

        Q_targets_next = self.model(next_states).detach().max(1)[0].unsqueeze(1) # 使用目标网络计算下一个状态的Q值, 用于计算目标Q值
        Q_targets = rewards + (self.gamma * Q_targets_next * (~dones)) # 计算目标Q值, 用于更新当前状态的Q值
        Q_expected = self.model(states).gather(1, actions) # 计算预期Q值, 用于计算损失

        loss = nn.MSELoss()(Q_expected, Q_targets) # 计算均方误差损失
        self.optimizer.zero_grad() # 梯度清零, 以便在每次迭代中重新计算梯度
        loss.backward() # 反向传播, 计算梯度
        self.optimizer.step() # 更新网络参数

        if self.epsilon > self.epsilon_min: # 更新 ε, 以便在训练过程中逐渐减小探索的概率, 以便在初期更多地探索, 在后期更多地利用经验
            self.epsilon *= self.epsilon_decay


## 04 训练Agent

In [103]:
import os

batch_size = 64

current_dir = os.getcwd()
model_dir = os.path.join(current_dir, "models")
os.makedirs(model_dir, exist_ok=True) # 创建模型保存目录


agent = DQNAgent(env.observation_space.shape[0], env.action_space.n)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
agent.model.to(device)  # 移动模型到GPU

for e in range(1000): # 训练1000个episode
    full_state = env.reset()
    state = full_state[0] # 提取向量
    # print("初始状态", state)
    state = torch.from_numpy(np.reshape(state, [1, -1])).float().to(device) # 把state转换为 网络模型 接受的形状
    
    for time in range(500): # 一个episode最多执行500个时间步
        action = agent.act(state)
        next_state, reward, done, truncated, info = env.step(action)  # 执行动作, 获取下一个状态, 奖励, 完成标志, 和
        # print("下一个状态", next_state)
        next_state = torch.from_numpy(np.reshape(next_state, [1, -1])).float().to(device) # 把next_state转换为 网络模型 接受的形状
        reward = torch.tensor([reward], device=device) # 把reward转换为张量
        done = torch.tensor([done], device=device)
        
        # agent.store(state, action, reward, next_state, done) # 存储经验
        agent.store(state.cpu().numpy(), action, reward.cpu().numpy(), next_state.cpu().numpy(), done.cpu().numpy()) # 存储经验
        state = next_state # 更新状态
        
        if done:
            print("Episode: {}/{}, score: {}".format(e, 1000, time)) # score 是每个episode的时间步数, 越大越好, 表示Agent在环境中存活的时间
            break
        if len(agent.memory) > batch_size: # 当记忆库中的样本数量大于32时, 开始经验回放
            agent.replay(batch_size)
    
    # 每个episode结束后, 保存模型
    model_path = os.path.join(model_dir, "model_0.pth".format(e))
    torch.save(agent.model.state_dict(), model_path)
    print("模型已保存至", model_path)


Using device: cuda


  dones = torch.tensor(dones, dtype=torch.bool).to(self.device).unsqueeze(1)
  return F.mse_loss(input, target, reduction=self.reduction)


Episode: 0/1000, score: 96
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 1/1000, score: 89
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 2/1000, score: 142
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 3/1000, score: 127
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 4/1000, score: 169
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 5/1000, score: 100
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 6/1000, score: 192
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 7/1000, score: 56
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 8/1000, score: 97
模型已保存至 c:\Users\isrya\#MyFiles\#MyCode\GithubUoL\COMP532\AS02\models\model_0.pth
Episode: 9/1000, score:

# Test

In [105]:
import gymnasium as gym
import numpy as np

import torch# 确保从包含DQN类定义的文件中导入
import os

# 假设状态空间和动作空间的维度已知
state_size = 8  # 根据你的环境设置
action_size = 4  # 根据你的环境设置

current_dir = os.getcwd()
model_dir = os.path.join(current_dir, "models")
model_path = os.path.join(model_dir, "model_0.pth")

model = DQN(state_size, action_size)
model.load_state_dict(torch.load(model_path))
model.eval()  # 将模型设置为评估模式

env = gym.make("LunarLander-v2", render_mode="human")
observation, info = env.reset()

for _ in range(1000):
    # 将观测转换为适合模型的格式
    state = torch.tensor([observation], dtype=torch.float32)
    with torch.no_grad():  # 禁止torch追踪此处的梯度计算，因为我们在推理而不是训练
        action = model(state).max(1)[1].item()  # 获取最大Q值对应的动作

    observation, reward, terminated, truncated, info = env.step(action)
    
    # print("{}, {}, {}, {}".format(observation, reward, terminated, truncated))

    if terminated or truncated:
        observation, info = env.reset()

env.close()
