# 深度强化学习 DQN 实验

实验采用 Gymnasium(由 Farama Foundation 维护,前身为 OpenAI Gym)中的 Frozen Lake 环境(`FrozenLake-v1`,4×4 地图)。



In [None]:
import collections
import random
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
import matplotlib.pyplot as plt

In [None]:
# Integer codes for the contents of a grid cell. They are fed to the DQN's
# embedding layer as indices, so they form the contiguous range 0..3.
BLANK, LAKE, GIFT, PLAYER = range(4)

# Static 4x4 map without the player. The layout appears to follow the default
# FrozenLake-v1 4x4 map (holes -> LAKE, goal -> GIFT); verify against the
# environment if a different map is used.
grid = torch.tensor(
    [
        [BLANK, BLANK, BLANK, BLANK],
        [BLANK, LAKE, BLANK, LAKE],
        [BLANK, BLANK, BLANK, LAKE],
        [LAKE, BLANK, BLANK, GIFT],
    ],
    dtype=torch.long,
)

def player_on_grid(state):
    """Overlay the player onto a per-sample copy of the lake grid.

    Args:
        state: 1-D integer tensor of flattened cell indices, one per batch
            element, in row-major order (``row * n_cols + col``).

    Returns:
        A ``(batch, rows, cols)`` long tensor: the module-level ``grid``
        repeated once per sample, with each sample's player cell overwritten
        by ``PLAYER``.
    """
    # Index tensors must be integral and live on the same device as the grid.
    cells = state.to(device=grid.device, dtype=torch.long)
    # Derive the row width from the grid instead of hard-coding 4.
    n_cols = grid.size(1)
    batch_size = cells.size(0)
    ret = grid.unsqueeze(0).repeat(batch_size, 1, 1)
    # A single advanced-indexing write replaces the Python loop over the batch.
    batch_idx = torch.arange(batch_size, device=grid.device)
    ret[batch_idx, cells // n_cols, cells % n_cols] = PLAYER
    return ret

In [None]:
class DQN(nn.Module):
    """Convolutional Q-network over the lake grid, bundled with its trainer.

    A state index is rendered onto the grid by ``player_on_grid``, every cell
    code is embedded into ``d_model`` channels, three 3x3 convolutions extract
    features, and a two-layer MLP maps them to one Q-value per action. The
    module also owns its loss (MSE) and optimizer (Adam), used by ``update``.

    Args:
        state_size: ``(rows, cols)`` of the grid. Only used to size the MLP
            input, so it has to agree with what ``player_on_grid`` produces.
        action_size: Number of discrete actions (one Q-value each).
        grid_states: Number of distinct cell codes (embedding vocabulary).
        d_model: Embedding width; all channel counts are multiples of it.
        lr: Learning rate of the Adam optimizer.
    """

    def __init__(self, state_size=(4, 4), action_size=4, grid_states=4, d_model=16, lr=1e-3):
        super().__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.grid_states = grid_states
        self.d_model = d_model

        self.emb = nn.Embedding(self.grid_states, self.d_model)

        # Shapes below are (channels, H, W) for the default 4x4 grid, d_model=16.
        self.featnet = nn.Sequential(
            nn.Conv2d(d_model, 2 * d_model, kernel_size=3, stride=1, padding=1),      # (16,4,4) -> (32,4,4)
            nn.ReLU(),
            nn.Conv2d(2 * d_model, 4 * d_model, kernel_size=3, stride=1, padding=1),  # (32,4,4) -> (64,4,4)
            nn.ReLU(),
            # No padding here: each spatial dimension shrinks by 2.
            nn.Conv2d(4 * d_model, 8 * d_model, kernel_size=3, stride=1),             # (64,4,4) -> (128,2,2)
            nn.ReLU(),
        )

        feat_size = self._feat_size()
        self.vnet = nn.Sequential(
            nn.Linear(feat_size, feat_size // 8),            # (512) -> (64)
            nn.ReLU(),
            nn.Linear(feat_size // 8, self.action_size),     # (64) -> (4)
        )

        self.criterion = nn.MSELoss()
        # Created last so that it captures every parameter registered above.
        self.optimizer = optim.Adam(self.parameters(), lr=lr)

    def _feat_size(self):
        """Return the flattened size of the ``featnet`` output for one sample."""
        rows, cols = self.state_size[0], self.state_size[1]
        # 8 * d_model channels on a grid shrunk by 2 in each dimension.
        return 8 * (rows - 2) * (cols - 2) * self.d_model

    def forward(self, x: torch.Tensor):
        """Compute Q-values for a batch of state indices.

        Args:
            x: 1-D tensor of flattened state indices, one per sample.

        Returns:
            Tensor of shape ``(batch, action_size)`` with one Q-value per action.
        """
        batch_size = x.size(0)
        x = player_on_grid(x)        # (B, H, W) cell codes
        x = self.emb(x)              # (B, H, W, d_model)
        x = x.permute(0, 3, 1, 2)    # (B, d_model, H, W), as Conv2d expects
        x = self.featnet(x)
        x = x.reshape(batch_size, -1)
        return self.vnet(x)

    def act(self, x: int, eps=0.1):
        """Pick an action for state ``x`` with an epsilon-greedy policy.

        Args:
            x: Current state index.
            eps: Exploration rate; a uniformly random action is returned
                whenever ``random.random() <= eps``.

        Returns:
            The chosen action as a Python int in ``[0, action_size)``.
        """
        if random.random() > eps:
            with torch.no_grad():
                out = self(torch.tensor([x]))
            return torch.argmax(out, dim=-1).squeeze().item()
        return random.randint(0, self.action_size - 1)

    def update(self, sample, gamma=1.0):
        """Run one Q-learning gradient step on a batch of transitions.

        Args:
            sample: Five equally long sequences
                ``(state, action, reward, next_state, done)``.
            gamma: Discount factor applied to the bootstrapped value.

        Returns:
            The scalar MSE loss of this step as a Python float.
        """
        state, action, reward, next_state, done = sample
        # Pin dtypes explicitly: indices must be int64 for embedding/gather,
        # and the target must be floating point whatever type the rewards have.
        state = torch.as_tensor(state, dtype=torch.long)
        action = torch.as_tensor(action, dtype=torch.long)
        reward = torch.as_tensor(reward, dtype=torch.float32)
        next_state = torch.as_tensor(next_state, dtype=torch.long)
        done = torch.as_tensor(done, dtype=torch.bool)

        # The bootstrap target is a fixed regression target: compute it without
        # autograd so the loss gradient only flows through Q(state, action).
        with torch.no_grad():
            max_next_q = self(next_state).max(dim=1).values
            # Finished transitions contribute no future value.
            target = reward + gamma * max_next_q * (~done)

        self.optimizer.zero_grad()
        q = self(state).gather(1, action.unsqueeze(1)).squeeze(1)
        loss = self.criterion(q, target)
        loss.backward()
        self.optimizer.step()
        return loss.item()

In [None]:
class ExpReplayBuffer:
    """Fixed-capacity FIFO store of transitions for experience replay.

    Backed by a ``collections.deque`` with ``maxlen``, so appending to a full
    buffer silently drops the oldest entry.
    """

    def __init__(self, max_len=1024):
        """Create an empty buffer holding at most ``max_len`` transitions."""
        self.buffer = collections.deque(maxlen=max_len)
        self.max_len = max_len

    def clear(self) -> None:
        """Drop every stored transition."""
        self.buffer.clear()

    def is_full(self) -> bool:
        """Return True once the buffer holds exactly ``max_len`` transitions."""
        return self.max_len == len(self.buffer)

    def append(self, sample) -> None:
        """Store one transition (evicting the oldest one when full)."""
        self.buffer.append(sample)

    def sample(self, n=256) -> list:
        """Draw ``n`` distinct transitions and return them column-wise.

        Returns:
            A list with one tuple per transition field, each of length ``n``
            (i.e. the drawn transitions transposed).

        Raises:
            ValueError: Propagated from ``random.sample`` when ``n`` exceeds
                the number of stored transitions.
        """
        pool = list(self.buffer)
        picked = random.sample(pool, n)
        # Transpose [(s, a, r, ...), ...] into [(s, s, ...), (a, a, ...), ...].
        columns = zip(*picked)
        return list(columns)

In [None]:
def sample_from_env(env: gym.Env, net: nn.Module, buffer: ExpReplayBuffer):
    """Refill ``buffer`` with fresh transitions collected by ``net``'s policy.

    The buffer is emptied first and then filled to capacity by stepping
    ``env`` with actions from ``net.act``; a new episode is started whenever
    the previous one terminated or was truncated. Each stored transition is
    ``(obs, action, reward, next_obs, terminated)``.

    Side effects: ``net`` is switched to eval mode and ``env`` is closed
    before returning.
    """
    net.eval()
    buffer.clear()
    # Start "between episodes" so the first loop iteration resets the env.
    episode_over = True
    while not buffer.is_full():
        if episode_over:
            state, _ = env.reset()
        chosen = net.act(state)
        following, reward, terminated, truncated, _ = env.step(chosen)
        # Only `terminated` is recorded as the done flag; truncation still
        # ends the episode but is not stored in the transition.
        buffer.append((state, chosen, reward, following, terminated))
        episode_over = terminated or truncated
        state = following
    env.close()

In [None]:
def train_model(net: nn.Module, buffer: ExpReplayBuffer, batch_size=256, gamma=1.0):
    """Run one training step of ``net`` on a random batch from ``buffer``.

    Args:
        net: Model exposing ``update(samples, gamma=...)``.
        buffer: Replay buffer to draw the batch from.
        batch_size: Number of transitions to sample.
        gamma: Discount factor forwarded to ``net.update``. It defaults to
            1.0, the value that was previously hard-coded here.

    Returns:
        Whatever ``net.update`` returns, passed through unchanged.
    """
    samples = buffer.sample(batch_size)
    net.train()
    return net.update(samples, gamma=gamma)

In [None]:
# End-to-end smoke run: build the environment, the Q-network and the replay
# buffer, fill the buffer with one round of epsilon-greedy experience, then
# perform a single gradient step on a random batch.
env = gym.make('FrozenLake-v1')  # no extra options passed: environment defaults apply
net = DQN()
buffer = ExpReplayBuffer(1024)
sample_from_env(env=env, net=net, buffer=buffer)
train_model(net=net, buffer=buffer, batch_size=256)