# DQN 算法


### 主要步骤
* Replay Buffer 经验回放
* Q-Network模型，使用神经网络来逼近Q函数
* 目标Q-Network模型
* 训练模型，利用经验回放和目标网络更新Q-Network的参数

In [None]:
# Install the libraries this notebook depends on.
# NOTE(review): this line is a notebook/shell command, not a Python statement —
# presumably it is meant to run as its own notebook cell; confirm before
# running this file as a plain script.
pip install gym numpy torch matplotlib

### 1 导入相关库并定义经验回放

In [None]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random 
from collections import deque # 双端队列
import matplotlib.pyplot as plt

# Replay buffer
class ReplayBuffer:
    """Bounded FIFO store of transitions used for experience replay.

    Offers four operations: construction, storing a transition, drawing a
    random mini-batch, and reporting how many transitions are stored.
    """

    def __init__(self, capacity):
        """Create an empty buffer that keeps at most ``capacity`` transitions."""
        # A deque with ``maxlen`` discards its oldest entry once it is full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions at random, without replacement.

        Returns:
            Five tuples ``(states, actions, rewards, next_states, dones)``,
            each of length ``batch_size`` and aligned by position.
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one tuple per field.
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)



### 2 定义QNetwork网络

In [None]:
class QNetwork(nn.Module):
    """Three-layer fully connected network that approximates the Q-function.

    Maps a batch of state vectors to one Q-value per action.

    Args:
        state_dim: Size of the input state vector.
        action_dim: Number of discrete actions (size of the output).
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        # The layers must be created inside __init__ (after nn.Module's own
        # initialisation) so they are registered as sub-modules/parameters.
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)

    def forward(self, x):
        """Return Q-values of shape ``(batch, action_dim)`` for states ``x``."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        # No activation on the output layer: Q-values are unbounded.
        return self.fc3(x)

### 3 定义DQN智能体

In [None]:
class DQNAgent:
    """DQN agent bundling the online network, target network and replay buffer.

    The agent picks actions epsilon-greedily from the online Q-network and
    learns from mini-batches sampled out of its replay buffer, bootstrapping
    from a periodically synchronised target network.

    Args:
        state_dim: Size of the environment's state vector.
        action_dim: Number of discrete actions.
        device: ``torch.device`` on which the networks and batches live.
    """

    def __init__(self, state_dim, action_dim, device):
        self.device = device
        # Kept so random exploration covers every action, not just two.
        self.action_dim = action_dim
        self.q_network = QNetwork(state_dim, action_dim).to(device)
        self.target_network = QNetwork(state_dim, action_dim).to(device)
        # Start the target network as an exact copy of the online network.
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters())
        self.replay_buffer = ReplayBuffer(10000)
        self.batch_size = 64
        self.gamma = 0.99
        # NOTE: the "eposilon" spelling is kept on purpose so existing code
        # that reads these attributes keeps working.
        self.eposilon = 1.0
        self.eposilon_decay = 0.995
        self.eposilon_min = 0.01
        self.update_target_every = 1000
        # Number of optimisation steps taken; drives the target-network sync.
        self.step_counter = 0

    def _to_tensor(self, values, dtype):
        """Convert a batch (tuple/list/array) to a tensor on ``self.device``."""
        # Going through one contiguous NumPy array avoids the slow
        # element-by-element conversion of a tuple of arrays.
        return torch.as_tensor(np.asarray(values, dtype=dtype), device=self.device)

    def select_action(self, state):
        """Return an action index for ``state`` using an epsilon-greedy policy."""
        if np.random.rand() < self.eposilon:
            return random.randrange(self.action_dim)

        state = self._to_tensor(state, np.float32).unsqueeze(0)
        with torch.no_grad():
            return self.q_network(state).argmax(1).item()

    def train(self):
        """Run one gradient step on a sampled mini-batch.

        Does nothing until the replay buffer holds at least ``batch_size``
        transitions. After the step, epsilon is decayed and the target
        network is refreshed every ``update_target_every`` steps.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)
        state = self._to_tensor(state, np.float32)
        action = self._to_tensor(action, np.int64).unsqueeze(1)
        reward = self._to_tensor(reward, np.float32).unsqueeze(1)
        next_state = self._to_tensor(next_state, np.float32)
        done = self._to_tensor(done, np.float32).unsqueeze(1)

        # Q(s, a) for the actions that were actually taken.
        q_values = self.q_network(state).gather(1, action)
        # The TD target is treated as a constant: no gradient should flow
        # into (or accumulate on) the target network.
        with torch.no_grad():
            next_q_values = self.target_network(next_state).max(1)[0].unsqueeze(1)
            target_q_values = reward + self.gamma * next_q_values * (1 - done)

        loss = nn.MSELoss()(q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.eposilon > self.eposilon_min:
            self.eposilon *= self.eposilon_decay

        self.step_counter += 1
        if self.step_counter % self.update_target_every == 0:
            self.target_network.load_state_dict(self.q_network.state_dict())

    

### 4 训练DQN智能体

In [None]:
def _reset_env(env):
    """Reset ``env`` and return only the initial observation."""
    result = env.reset()
    # Newer gym versions return ``(observation, info)``; older ones return
    # the observation alone.
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        return result[0]
    return result


def _step_env(env, action):
    """Step ``env`` and normalise the result across gym API versions.

    Returns:
        ``(next_state, reward, terminal, episode_over)`` where ``terminal``
        is what should be stored in the replay buffer and ``episode_over``
        tells the caller to stop the episode.
    """
    result = env.step(action)
    if len(result) == 5:
        # gym >= 0.26: (obs, reward, terminated, truncated, info). A
        # time-limit truncation ends the episode but is not a terminal
        # state, so it must not cut off bootstrapping in the TD target.
        next_state, reward, terminated, truncated, _ = result
        return next_state, reward, bool(terminated), bool(terminated or truncated)
    if len(result) == 4:
        # Older gym: (obs, reward, done, info).
        next_state, reward, done, _ = result
    else:
        # Minimal custom environments: (obs, reward, done).
        next_state, reward, done = result
    return next_state, reward, bool(done), bool(done)


def train_dqn(agent, env, num_episodes):
    """Train ``agent`` on ``env`` for ``num_episodes`` episodes.

    Each environment step stores the transition in the agent's replay buffer
    and triggers one call to ``agent.train()``.

    Args:
        agent: Object exposing ``select_action``, ``train`` and a
            ``replay_buffer`` with a ``push`` method.
        env: Environment exposing ``reset`` and ``step``.
        num_episodes: Number of episodes to run.

    Returns:
        A list with the total reward collected in each episode.
    """
    episode_rewards = []

    for episode in range(num_episodes):
        state = _reset_env(env)
        total_reward = 0

        while True:
            action = agent.select_action(state)
            next_state, reward, terminal, episode_over = _step_env(env, action)
            agent.replay_buffer.push(state, action, reward, next_state, terminal)
            agent.train()

            state = next_state
            total_reward += reward

            if episode_over:
                break

        episode_rewards.append(total_reward)
        print(f"Episode {episode + 1}/{num_episodes}, Reward: {total_reward}")

    return episode_rewards

# Build the CartPole environment and read the state/action sizes from it.
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

agent = DQNAgent(state_dim, action_dim, device)
num_episodes = 200
episode_rewards = train_dqn(agent, env, num_episodes)
# Release the environment's resources once training is finished.
env.close()

# Plot the per-episode reward curve.
plt.plot(episode_rewards)
plt.xlabel('Episodes')
plt.ylabel('Rewards')
plt.title('DQN on CartPole-v1')
plt.show()
