In [1]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque
import random
import pygame
from pygame.locals import VIDEORESIZE

  "Gym minimally supports python 3.6 as the python foundation not longer supports the version, please update your version to 3.7+"


In [2]:
# Deep Q-network definition
class DQN(nn.Module):
    """Fully connected Q-network with two 128-unit ReLU hidden layers.

    Args:
        state_dim: Number of input features of the first linear layer.
        action_dim: Number of outputs of the last linear layer.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_units = 128
        # The attribute names fc1/fc2/fc3 become the keys of the state dict,
        # so they must stay stable for saved weights to load.
        self.fc1 = nn.Linear(state_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_dim)

    def forward(self, x):
        """Return the raw (non-activated) output of the last layer for ``x``."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [3]:
# Experience replay buffer
class ReplayBuffer:
    """Bounded FIFO store of (state, action, reward, next_state, done) tuples.

    Backed by a ``deque`` with ``maxlen=capacity``, so once the buffer is full
    each new transition pushes out the oldest one.
    """

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            ``(states, actions, rewards, next_states, dones)`` where states and
            next_states are numpy arrays and the other three are tuples.
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one sequence per field.
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), actions, rewards, np.array(next_states), dones

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

In [4]:
# Hyperparameters
gamma = 0.99          # discount factor applied to the bootstrapped next-state value
batch_size = 64       # transitions drawn from the replay buffer per training step
lr = 0.001            # learning rate handed to the Adam optimizer
num_episodes = 1500   # number of episodes the training loop iterates over
capacity = 10_000     # maximum number of transitions kept in the replay buffer

In [5]:
# Environment and network initialisation
env = gym.make('MountainCar-v0')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# Online network plus a target network that starts out as an exact copy of it.
dqn = DQN(state_dim, action_dim)
target_dqn = DQN(state_dim, action_dim)
target_dqn.load_state_dict(dqn.state_dict())

# Only the online network's parameters are handed to the optimizer.
replay_buffer = ReplayBuffer(capacity)
optimizer = optim.Adam(dqn.parameters(), lr=lr)

  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."


In [6]:
# Training step
def train():
    """Run one Double-DQN gradient step on a batch from ``replay_buffer``.

    Reads the module-level ``replay_buffer``, ``batch_size``, ``gamma``,
    ``dqn``, ``target_dqn`` and ``optimizer``. Returns ``None`` without doing
    anything while the buffer holds fewer than ``batch_size`` transitions;
    otherwise updates the parameters of ``dqn`` through ``optimizer``.
    """
    if len(replay_buffer) < batch_size:
        return

    state, action, reward, next_state, done = replay_buffer.sample(batch_size)

    state = torch.FloatTensor(state)
    next_state = torch.FloatTensor(next_state)
    action = torch.LongTensor(action)
    reward = torch.FloatTensor(reward)
    done = torch.FloatTensor(done)

    # Q(s, a) of the online network for the actions that were actually taken.
    q_value = dqn(state).gather(1, action.unsqueeze(1)).squeeze(1)

    # The bootstrap target is a constant with respect to the loss, so it is
    # computed without recording an autograd graph (previously both forward
    # passes were tracked and only detached afterwards).
    with torch.no_grad():
        # Double DQN: the online network picks the next action, the target
        # network evaluates it.
        best_next_action = dqn(next_state).max(1)[1].unsqueeze(1)
        next_q_value = target_dqn(next_state).gather(1, best_next_action).squeeze(1)
        # (1 - done) removes the bootstrap term for transitions flagged done.
        expected_q_value = reward + gamma * next_q_value * (1 - done)

    # Mean squared TD error.
    loss = (q_value - expected_q_value).pow(2).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [7]:
# Epsilon-greedy exploration schedule: initial value, floor, and decay scale
# (the schedule is evaluated per frame index).
epsilon_start, epsilon_end, epsilon_decay = 1.0, 0.01, 500

In [8]:
def epsilon_by_frame(frame_idx):
    """Return the exploration rate for ``frame_idx``.

    Decays exponentially from ``epsilon_start`` towards ``epsilon_end`` with
    scale ``epsilon_decay`` (all three are module-level constants).
    """
    decay_factor = np.exp(-1. * frame_idx / epsilon_decay)
    return epsilon_end + (epsilon_start - epsilon_end) * decay_factor

In [9]:
# Pygame initialisation
pygame.init()
# 600x400 window surface; it is passed to render() for every drawn frame.
screen = pygame.display.set_mode((600, 400))
pygame.display.set_caption("Mountain Car")

In [10]:
def render(env, screen):
    """Draw the environment's current RGB frame onto ``screen`` and flip the display.

    Args:
        env: Environment whose ``render(mode='rgb_array')`` returns an image array.
        screen: Surface providing ``fill`` and ``blit``.
    """
    white = (255, 255, 255)
    screen.fill(white)
    frame = env.render(mode='rgb_array')
    # Swap the first two axes before building the surface -- presumably the
    # frame is (height, width, 3) while pygame wants (width, height, 3);
    # TODO confirm.
    surface = pygame.surfarray.make_surface(np.transpose(frame, (1, 0, 2)))
    screen.blit(surface, (0, 0))
    pygame.display.flip()

In [11]:
# Main training loop
all_rewards = []
frame_idx = 0
stop_requested = False  # becomes True when the pygame window is closed

for episode in range(num_episodes):
    state = env.reset()
    episode_reward = 0

    while True:
        frame_idx += 1
        epsilon = epsilon_by_frame(frame_idx)

        # Epsilon-greedy action selection.
        if random.random() > epsilon:
            # Inference only: no autograd graph is needed to pick an action.
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                q_value = dqn(state_tensor)
            action = q_value.max(1)[1].item()
        else:
            action = env.action_space.sample()

        # NOTE(review): under the old step API `done` is a single flag, so an
        # episode cut off by the time limit is presumably stored as terminal
        # as well -- confirm whether info['TimeLimit.truncated'] should be
        # used to keep bootstrapping in that case.
        next_state, reward, done, _ = env.step(action)
        replay_buffer.push(state, action, reward, next_state, done)
        state = next_state
        episode_reward += reward

        train()

        # Copy the online weights into the target network every 100 frames.
        if frame_idx % 100 == 0:
            target_dqn.load_state_dict(dqn.state_dict())

        render(env, screen)
        # pygame.event.get() drains the whole queue before any() inspects it.
        if any(event.type == pygame.QUIT for event in pygame.event.get()):
            stop_requested = True
            break

        if done:
            break

    if stop_requested:
        # Closing the window ends training early instead of calling quit():
        # in a notebook quit() asks the kernel to exit, which would discard
        # the trained network before it can be saved. The unfinished episode
        # is not recorded.
        pygame.quit()
        break

    all_rewards.append(episode_reward)
    if episode % 10 == 0:
        print('Episode: {}, Reward: {}'.format(episode, episode_reward))

See here for more information: https://www.gymlibrary.ml/content/api/
  "The argument mode in render method is deprecated; "


Episode: 0, Reward: -200.0
Episode: 10, Reward: -200.0
Episode: 20, Reward: -200.0
Episode: 30, Reward: -200.0
Episode: 40, Reward: -200.0
Episode: 50, Reward: -200.0
Episode: 60, Reward: -200.0
Episode: 70, Reward: -200.0
Episode: 80, Reward: -200.0
Episode: 90, Reward: -200.0
Episode: 100, Reward: -200.0
Episode: 110, Reward: -200.0
Episode: 120, Reward: -200.0
Episode: 130, Reward: -200.0
Episode: 140, Reward: -200.0
Episode: 150, Reward: -163.0
Episode: 160, Reward: -96.0
Episode: 170, Reward: -113.0
Episode: 180, Reward: -153.0
Episode: 190, Reward: -200.0
Episode: 200, Reward: -114.0
Episode: 210, Reward: -152.0
Episode: 220, Reward: -152.0
Episode: 230, Reward: -86.0
Episode: 240, Reward: -116.0
Episode: 250, Reward: -112.0
Episode: 260, Reward: -128.0
Episode: 270, Reward: -116.0
Episode: 280, Reward: -107.0
Episode: 290, Reward: -111.0
Episode: 300, Reward: -86.0
Episode: 310, Reward: -112.0
Episode: 320, Reward: -114.0
Episode: 330, Reward: -104.0
Episode: 340, Reward: -107.0

In [12]:
# Write the state dict (weights) of `dqn` to 'dqn_model.pth'.
torch.save(dqn.state_dict(), 'dqn_model.pth')

In [13]:
# Load the model weights
model_path = 'dqn_model.pth'
# A fresh network is built with hard-coded dimensions, then the saved state
# dict is loaded into it.
dqn = DQN(state_dim=2, action_dim=3)  # MountainCar-v0 has a 2-dimensional state space and a 3-dimensional action space
dqn.load_state_dict(torch.load(model_path))

<All keys matched successfully>

In [14]:
# 测试模型
def test_model(env, model, episodes=10):
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        
        while True:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_value = model(state_tensor)
            action = q_value.max(1)[1].item()
            
            next_state, reward, done, _ = env.step(action)
            state = next_state
            episode_reward += reward
            
            render(env, screen)
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    quit()
            
            if done:
                print('Episode: {}, Reward: {}'.format(episode, episode_reward))
                break


In [15]:
# Pygame initialisation
pygame.init()
screen = pygame.display.set_mode((600, 400))
pygame.display.set_caption("Mountain Car")

# Environment initialisation
env = gym.make('MountainCar-v0')

# Evaluate the loaded model for 10 episodes
test_model(env, dqn, episodes=10)

# Release the environment and shut pygame down once evaluation is finished.
env.close()
pygame.quit()

  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
See here for more information: https://www.gymlibrary.ml/content/api/
  "The argument mode in render method is deprecated; "


Episode: 0, Reward: -103.0
Episode: 1, Reward: -104.0
Episode: 2, Reward: -105.0
Episode: 3, Reward: -103.0
Episode: 4, Reward: -103.0
Episode: 5, Reward: -85.0
Episode: 6, Reward: -97.0
Episode: 7, Reward: -103.0
Episode: 8, Reward: -104.0
Episode: 9, Reward: -95.0
