In [37]:
import numpy as np
from collections import defaultdict
import torch
from torch import nn
import torch.nn.functional as F
import gym
from dataclasses import dataclass, field

- references
    - https://github.com/DeepRLChinese/DeepRL-Chinese/blob/master/04_dqn.py

## basics

$$
\begin{split}
U_t&=R_t+\gamma R_{t+1}+\gamma^2 R_{t+2} + \cdots + \gamma^{n-t} R_n\\
&=\sum_{k=t}^n\gamma^{k-t}R_k{}\\
Q_\pi(s_t,a_t)&=\mathbb E\left[U_t|S_t=s_t,A_t=a_t\right]\\
&=\mathbb E_{S_{t+1},A_{t+1},\cdots, S_n,A_n}\left[U_t|S_t=s_t,A_t=a_t\right]\\
Q_\star(s_t,a_t)&=\max_\pi Q_\pi(s_t,a_t)
\end{split}
$$

- $U_t$: discounted return,
    - return: cumulative future reward
- $Q_\pi(s_t,a_t)$：action value function，动作价值函数
    - 其计算式中的期望消除了 $t$ 时刻之后的所有状态 $S_{t+1},\cdots, S_n$ 与所有动作 $A_{t+1}, \cdots, A_n$ 的随机性（也是 $U_t$ 随机性的来源）
        - 未来时刻的 $S_{t+1}, A_{t+1}$ 会带来 $R_{t+1}$
    - 对谁求期望就是消除对谁的随机性；
- $Q_\star(s_t,a_t)$：最优动作价值函数；

## DQN

$$
Q(s,a;w)
$$

- $w$ 表示 NN 的模型参数；
- DQN 的输入是 $s$，输出是离散动作空间 $\mathcal A$ 中的每个动作的 Q 值，是一个 scalar value；
- train DQN 的目标即是，对所有的 $s,a$ pair， DQN 对 $Q(s,a;w)$ 的预测尽量接近 $Q_\star(s,a)$
- 训练 DQN 最常用的算法是 TD（temporal difference，时间差分）

In [2]:
class QNet(nn.Module):
    def __init__(self, dim_state, num_action):
        super().__init__()
        self.fc1 = nn.Linear(dim_state, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, num_action)
    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

## TD

- TD target 与 TD error

$$
\begin{split}
U_t&=R_t+\gamma R_{t+1}+\gamma^2 R_{t+2} + \cdots + \gamma^{n-t} R_n\\
&=\sum_{k=t}^n\gamma^{k-t}R_k\\
&=R_{t}+\sum_{k=t+1}^n\gamma^{k-t}R_{k}\\
&=R_{t}+\gamma\sum_{k=t+1}^n\gamma^{k-(t+1)}R_{k}\\
&=R_{t}+\gamma U_{t+1}\\
Q_\star(s_t,a_t)&=\max_\pi\mathbb E\left[U_t|S_t=s_t,A_t=a_t\right]\\
\end{split}
$$

- $U_t$ 的递归定义与计算
- 基于上述的两个等式可以推导出著名的最优贝尔曼方程（optimal Bellman equation）

### optimal Bellman equation

$$
Q_\star(s_t,a_t)=\mathbb E_{S_{t+1}\sim p(\cdot|s_t,a_t)}\left[R_t+\gamma\cdot\max_{\pi}Q_\star(S_{t+1}, A)\bigg|S_t=s_t,A_t=a_t\right]
$$

- 也是一种 recursive 的计算方式
- 右侧是一个期望，期望可以通过蒙特卡洛方法近似，当 Agent 在状态 $s_t$ 执行动作 $a_t$ 之后，环境通过状态转移函数 $p(s_{t+1}|s_t,a_t)$（mdp）计算出下一时刻的状态 $s_{t+1}$，因此当我们观测到 $s_t,a_t,s_{t+1}$ 时，奖励 $R_t$ 也会被观测到，记作 $r_t$，于是有了如下的四元组

$$
(s_t,a_t,r_t,s_{t+1})
$$

- 基于蒙特卡洛估计，可以算出等式右边期望的近似

$$
Q_\star(s_t,a_t) \approx r_t+\gamma\cdot \max_{a\in\mathcal A}Q_\star(s_{t+1},a)
$$

### 基于 TD train DQN

- 将最优动作价值函数 $Q_\star(s_t,a_t)$ 替换为神经网络 $Q(s,a;w)$

$$
\underbrace{Q(s_t,a_t;w)}_{预测 \hat{q}_t}\approx \underbrace{r_t + \gamma\cdot \max_{a\in\mathcal A}Q(s_{t+1},a;w)}_{\text{TD target}, \hat y_t}
$$

- 左边的 $Q(s_t,a_t;w)$ 是神经网络在 $t$ 时刻做出的预测：$\hat q_t$
- 右边的 TD target 则是神经网络在 $t+1$ 时刻做出的预测：$\hat y_t$，且基于了真实观测到的奖励 $r_t$（多了一部分事实）
- 对于右式 $\hat y_t$ 比着左式 $\hat q_t$ 多了部分事实，因此更可信，**在监督学习的范式下**，可以作为训练的 groud truth（将其视为常数），定义如下的损失


$$
\begin{split}
L(w)&=\frac12\left[Q(s_t,a_t;w)-\hat y_t\right]^2=\frac12\left[\hat q_t-\hat y_t\right]^2\\
\nabla_wL(w)&=\underbrace{(\hat q_t-\hat y_t)}_{\text{TD error}}\cdot \nabla_wQ(s_t,a_t;w)=\delta_t\cdot \nabla_wQ(s_t,a_t;w)
\end{split}
$$

- 梯度下降优化让 $\hat q_t$ 更接近 $\hat y_t$

$$
w\leftarrow w-\alpha\cdot\delta_t\nabla_wQ(s_t,a_t;w)
$$

### training process

- 给定一个四元组 $(s_t,a_t,r_t,s_{t+1})$ 一个 DQN 网络，我们可以得到

$$
\begin{split}
\hat q_t&=Q(s_t,a_t;w)\\
\hat y_t&=r_t+\max_{a\in\mathcal A}Q(s_{t+1},a;w)\\
\delta_t&=\hat q_t-\hat y_t
\end{split}
$$

- 梯度下降优化让 $\hat q_t$ 更接近 $\hat y_t$

$$
w\leftarrow w-\alpha\cdot\delta_t\nabla_wQ(s_t,a_t;w)
$$


- 训练 DQN 所需的四元组数据 $(s_t,a_t,r_t,s_{t+1})$ 与控制 agent 运动的策略无关，意味着可以用任何策略控制智能体与环境交互，同时记录下算法运动轨迹（trajectory），作为 DQN 的训练数据；
- 因此 DQN 的训练可以分为两个独立的部分

#### 收集训练数据    

- 可以用任意策略 $\pi$ 来控制智能体与环境交互，此时的 $\pi$ 可以称为行为策略，一种经典的行为策略是 $\epsilon$-greedy 策略
        
$$
a_t=\begin{cases}
\arg\max_a Q(s_t,a;w), & r < 1-\epsilon (以概率 (1-\epsilon), )\\
均匀采样 a\in \mathcal A, & r < \epsilon  (以概率 \epsilon)
\end{cases}
$$

- 将 Agent 在一个 episode （回合） 中的轨迹记作

$$
(s_1,a_1,r_1), (s_2,a_2,r_2), (s_3,a_3,r_3), \cdots, (s_n,a_n,r_n)
$$

- 进一步将其划分为 $(s_t,a_t,r_t,s_{t+1})$ 这样的四元组，存入缓存，这个缓存就叫经验回放缓存（Experience Replay Buffer）

In [5]:
# 循环队列
# 两个核心函数：
# 1. push 队列中循环添加
# 2. sample：队列中采样

@dataclass
class ReplayBuffer:
    maxszie: int
    size: int = 0
        
    # s_t
    states: list = field(default_factory=list)
    # a_t
    actions: list = field(default_factory=list)
    # r_t
    rewards: list = field(default_factory=list)
    # s_{t+1}
    next_states: list = field(default_factory=list)
    
    dones: list = field(default_factory=list)
    
    def push(self, state, action, reward, done, next_state):
        if self.size < self.maxsize:
            # state.shape: 
            self.states.append(state)
            # action.shape: 
            self.actions.append(action)
            # reward.shape: 
            self.rewards.append(reward)
            # done.shape: 
            self.dones.append(done)
            # next_state.shape: 
            self.next_states.append(next_state)
        else:
            # overlap
            index = self.size % self.maxsize
            self.states[index] = state
            self.actions[index] = action
            self.rewards[index] = reward
            self.dones[index] = done
            self.next_states[index] = next_state
        self.size += 1
    
    def sample(self, n):
        total_number = min(self.size, self.maxsize)
        indices = np.random.randint(total_number, size=n)
        sample_states = [self.states[ind] for ind in indices]
        sample_actions = [self.actions[ind] for ind in indices]
        sample_rewards = [self.rewards[ind] for ind in indices]
        sample_dones = [self.dones[ind] for ind in indices]
        sample_next_states = [self.next_states[ind] for ind in indices]
        return sample_states, sample_actions, sample_rewards, sample_dones, sample_next_states

####  `forward/backward` 监督训练过程

> 从 Experience Replay Buffer 中随机选出一个四元组，记作 $(s_j,a_j,r_j,s_{j+1})$，假定 DQN 当前的参数为 $w_{now}$，经过如下的 forward/backward 步骤对参数更新，得到 $w_{new}$


1. forward：分别输入 $s_j$ 和 $s_{j+1}$（两个都是标量值）

$$
\begin{split}
\hat q_j&=Q(s_j,a_j;w_{now})\\
\hat q_{j+1}&=\max_{a\in\mathcal A}Q(s_{j+1},a;w_{now})
\end{split}
$$

2. 计算 TD target 与 TD error，以及定义 loss

$$
\begin{split}
\hat y_j&=r_j+\gamma \hat q_{j+1}, \\
\delta_j&=\hat q_j-\hat y_j\\
L(w)&=\frac12 \delta_j^2=\frac12\left(\hat q_j-\hat y_j\right)^2
\end{split}
$$

3. loss 反向传播，计算梯度（`loss.backward()` 自动计算）

$$
g_j=\nabla_wQ(s_j,a_j;w_{now})
$$

4. `optimizer.step()` 基于梯度下降更新参数

$$
w_{new}\Leftarrow w_{old}-\alpha\cdot\delta_j\cdot g_j
$$


In [3]:
x = torch.randn(5)
print(x)
x.argmax()

tensor([ 0.7624,  0.6647,  0.7374, -0.3199,  1.8371])

In [8]:
QNet(4, 2)(torch.randn(5, 4))

tensor([[0.1709, 0.0343],
        [0.1366, 0.0705],
        [0.2027, 0.0694],
        [0.1014, 0.2124],
        [0.2879, 0.0034]], grad_fn=<AddmmBackward0>)

In [10]:
# 实例化之后，可以视为 agent
class DQN:
    def __init__(self, dim_state, num_action, gamma=0.9):
        self.gamma = gamma
        # 1. 稳定学习过程，主要靠 self.target_Q 较慢的更新
        # 2. 减少自相关性；
        # self.Q：当前值，也是要更新的主网络；
        self.Q = QNet(dim_state, num_action)
        # self.target_Q: 未来值
        self.target_Q = QNet(dim_state, num_action)
        self.target_Q.load_state_dict(self.Q.state_dict())
        
    def get_action(self, state):
        q_vals = self.Q(state)
        return q_vals.argmax()
    
    def comp_loss(self, batch_s, batch_a, batch_r, batch_d, batch_next_s):
        # https://www.bilibili.com/video/BV1YY4y1Q72M/
        q_vals = self.Q(batch_s).gather(1, batch_a.unsqueeze(1)).squeeze()
        # Returns a new Tensor, detached from the current graph.
        # The result will never require gradient.
        next_q_vals = self.target_Q(batch_next_s).detach().max(dim=1)
        loss = F.mse_loss(batch_r + self.gamma * next_q_vals * (1-batch_d), q_vals)
        return loss

## train & eval

In [35]:
def train(env, agent: DQN, lr=1e-3, max_steps=100_1000):
    replay_buffer = ReplayBuffer(10_1000)
    
    optimizer = torch.optim.Adam(agent.Q.parameters(), lr=lr)
    optimizer.zero_grad()
    
    epsilon = 1
    
    agent.Q.train()
    state = env.reset()
    for step in range(max_steps):
        if 

SyntaxError: incomplete input (384133942.py, line 10)

In [13]:
env_name = 'CartPole-v1'
env = gym.make(env_name)

In [23]:
env.reset()
print(env.observation_space)
print(env.action_space)

Box(4,)
Discrete(2)


In [24]:
dim_state = env.observation_space.shape[0]
num_action = env.action_space.n

In [25]:
gamma = 0.99
lr = 1e-3

In [27]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
agent = DQN(dim_state, num_action, gamma)
agent.Q.to(device)
agent.target_Q.to(device)

QNet(
  (fc1): Linear(in_features=4, out_features=64, bias=True)
  (fc2): Linear(in_features=64, out_features=32, bias=True)
  (fc3): Linear(in_features=32, out_features=2, bias=True)
)