# **初赛的算法基础**

## **Q-learing**

#### **基本原理：**

Q-learing算法通过不断更新$Q$值学习最优策略，$Q$值表示在某一状态下，采取某一动作能够获得的预期累计的奖励。$Q$值用于衡量当前状态下采取某个动作，最终带来的回报。Q-learing更新公式：

$$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t) \right]$$

其中：
- $Q(s_t, a_t)$是当前状态$s_t$采取动作$a_t$的价值
- $\alpha$是学习率，控制$Q$值更新的步长
- $r_{t+1}$是执行动作后的即时奖励
- $s_{t+1}$是执行动作后的下一个状态
- $\gamma$是折扣因子，衡量当前奖励和未来奖励的重要性
- $max_{a'}Q(s_{t+1}, a')$表示在状态$s_{t+1}$下选择的最佳动作的$Q$值

传统的Q-learing中，$Q$表被用来存储每个状态-动作对的$Q$值，$Q$表是一个二维的表格，行表示状态，列表示动作。每个单元格存储了对应状态和动作的$Q$值。

#### **维护$Q$表：**

1. 在开始时，$Q$表中的所有$Q$值通常都被初始化为0。初始化时，agent并不知道任何状态-动作对的值。
2. 学习过程：agent与环境交互，通过采取动作并根据反馈更新$Q$表。每次代理选择一个动作并执行后，都会根据上述公式更新对应的$Q$值。
3. 贪婪策略：agent根据其当前的$Q$表选择动作，通常使用ε-greedy策略，即大部分时间选择当前$Q$值最大的动作，但有一定概率选择随机动作，以保证探索新状态。
4. Q值更新：每次代理执行一个动作并获得奖励后，$Q$值会根据贝尔曼方程进行更新，以反映新的估计。随着学习的进行，$Q$表逐渐收敛，$Q$值变得更精确

#### **具体示例：**

假设环境为一个$5\times 5$的格子，智能体起始位置在左上角$(0,0)$，目标位置是右下角$(4, 4)$，智能体可以选择四种动作之一：
- Up
- Down
- Left
- Right

每个动作都有一个奖励：
- 到达目标时，奖励为+10
- 没移动一步奖励-1
- 碰到墙壁或者超出边界，智能体停留在原地，奖励-1

#### **环境与$Q$表实现**

将环境状态设置为$(0,0)$到$(4,4)$的每个格子，用$(x, y)$来表示状态，并将其转换为对应的状态编号

In [1]:
import numpy as np
import random

class GridWorld:
    def __init__(self, grid_size=5):
        self.grid_size = grid_size
        self.state_space = grid_size * grid_size
        self.action_space = 4
        self.start_state = (0, 0)
        self.goal_state = (4, 4)
    
    def state_to_id(self, state):
        return state[0] * self.grid_size + state[1]
    
    # 0:Up, 1:Down, 2:Left, 3:Right
    def take_action(self, state, action):
        x, y = state
        if action == 0:
            x = max(x - 1, 0)
        elif action == 1:
            x = min(x + 1, self.grid_size - 1)
        elif action == 2:
            y = max(y - 1, 0)
        elif action == 3:
            y = min(y + 1, self.grid_size - 1)
        return (x, y)
    
    def get_reward(self, state):
        if state == self.goal_state:
            return 10
        else:
            return -1
    
    def reset(self):
        return self.start_state

#### **Q-learing算法实现**

1. **参数：**
- `epsilon`：探索率决定智能体是利用已有的知识还是进行随机探索，当`epsilon`较高时，智能体更多探索未知状态，`epsilon`较低时，智能体更多利用已有知识
- `q_table`：$Q$表是二维矩阵，用来存储每个状态-动作对应的$Q$值，`q_table[state_id, action]`存储的是从`state_id`状态下采取的`action`动作的预期回报

2. **动作选择：**

如果一个随机值小于当前的`epsilon`，智能体随机选择一个动作，否则智能体会选择当前状态下`Q`值最大的动作

3. **$Q$值更新**

- 当前$Q$值：`self.q_table[current_state_id, action]`
- 智能体从当前状态到一下状态的预期回报：`target = reward + self.gamma * self.q_table[next_state_id, best_next_action]`，其中`self.q_table[next_state_id, best_next_action]`是下一状态的最大$Q$值
- 当前$Q$值与目标$Q$值之间的差距：`error = target - self.q_table[current_state_id, action]`
- $Q$值更新：self.q_table[current_state_id, action] += self.alpha * error

4. **epsilon衰减**

为了逐渐减少探索行为并增加利用行为，`epsilon`会随着训练的进行逐渐减少，`epsilon_decay`控制衰减速率，`min_epsilon`确保探索率不会降的过低

In [2]:
class Agent:
    def __init__(self, env, alpha=0.1, gamma=0.9, epsilon=1.0, epsilon_dacay=0.995, min_epsilon=0.01):
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_dacay
        self.min_epsilon = min_epsilon
        self.q_table = np.zeros((env.state_space, env.action_space))
        self.state = env.reset()

    def choose_action(self):
        if random.uniform(0,1) < self.epsilon:
            return random.randint(0, self.env.action_space - 1)
        else:
            return np.argmax(self.q_table[self.env.state_to_id(self.state)])
    
    def update_q_value(self, next_state, action, reward):
        current_state_id = self.env.state_to_id(self.state)
        next_state_id = self.env.state_to_id(next_state)

        best_next_action = np.argmax(self.q_table[next_state_id])
        target = reward + self.gamma * self.q_table[next_state_id, best_next_action]
        error = target - self.q_table[current_state_id, action] 
        self.q_table[current_state_id, action] += self.alpha * error
    
    def update_epsilon(self):
        if self.epsilon > self.min_epsilon:
            self.epsilon *= self.epsilon_decay
    
    def train(self, episodes=1000):
        for episode in range(episodes):
            self.state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                action = self.choose_action()
                next_state = self.env.take_action(self.state, action)
                reward = self.env.get_reward(next_state)

                self.update_q_value(next_state, action, reward)
                self.state = next_state
                total_reward += reward

                if next_state == self.env.goal_state:
                    done = True
            
            self.update_epsilon()
            print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward}, Epsilon: {self.epsilon:.3f}")
        
        print(self.q_table)


env = GridWorld()
agent = Agent(env)

agent.train(episodes=1000)



Episode 1/1000, Total Reward: -293, Epsilon: 0.995
Episode 2/1000, Total Reward: -57, Epsilon: 0.990
Episode 3/1000, Total Reward: -27, Epsilon: 0.985
Episode 4/1000, Total Reward: -34, Epsilon: 0.980
Episode 5/1000, Total Reward: -173, Epsilon: 0.975
Episode 6/1000, Total Reward: -121, Epsilon: 0.970
Episode 7/1000, Total Reward: -8, Epsilon: 0.966
Episode 8/1000, Total Reward: -65, Epsilon: 0.961
Episode 9/1000, Total Reward: -32, Epsilon: 0.956
Episode 10/1000, Total Reward: -65, Epsilon: 0.951
Episode 11/1000, Total Reward: -98, Epsilon: 0.946
Episode 12/1000, Total Reward: -88, Epsilon: 0.942
Episode 13/1000, Total Reward: -12, Epsilon: 0.937
Episode 14/1000, Total Reward: -97, Epsilon: 0.932
Episode 15/1000, Total Reward: -12, Epsilon: 0.928
Episode 16/1000, Total Reward: -29, Epsilon: 0.923
Episode 17/1000, Total Reward: -147, Epsilon: 0.918
Episode 18/1000, Total Reward: -9, Epsilon: 0.914
Episode 19/1000, Total Reward: -28, Epsilon: 0.909
Episode 20/1000, Total Reward: -47, Ep

## **DQN算法**

#### **基本内容**

DQN是结合Q-learing和深度学习神经网络的强化学习算法，通过神经网络来近似$Q$函数。神经网络接受状态$s_t$作为输入，输出每个可能动作$a_t$的$Q$值。同时使用了下面几种改进技术：

1. 经验回放
- 在Q-learing中，智能体根据当前状态和动作更新$Q$值，每次更新依赖于当前的状态，导致了样本之间的相关性
- 经验回放通过存储智能体的经历(state, action, reward, next_state)，智能体可以随即地从中采样

2. 目标网络
- Q-learing中，$Q$值更新依赖于最大化下一个状态的$Q$值，$max_{a'}Q(s_{t+1}, a')$
- DQN引入了目标网络，是当前$Q$网络的一个副本，目标网络的参数更新慢

#### **算法流程**

1. 初始化
    - 初始化$Q$网络和目标网络，$Q$网络用于估计$Q$值，目标网络用于计算目标$Q$值
    - 初始化经验回放缓冲区

2. 探索和训练
    - 在每个时间步，选择一个动作，使用贪心算法

    - 执行动作，获取下一个状态和奖励

    - 存储经历到经验回放缓冲区

    - 从经验回放缓冲区中随机采样一小批经历

    - 使用神经网络和目标网络来计算目标$Q$值，更新$Q$网络的权重

3. 更新目标网络
    - 每隔一定步数，将$Q$网络的权重复制到目标网络中
    - 进行重复训练迭代

#### **核心原理**

假设用$Q(s,a;\theta)$表示$Q$网络，其中$\theta$是$Q$网络的参数，希望最小化损失函数：

$$L(\theta) = \mathbb{E}_{(s_t, a_t, r_t, s_{t+1}) \sim \mathcal{D}} \left[ \left( r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a'; \theta^-) - Q(s_t, a_t; \theta) \right)^2 \right]
$$

其中：
- $Q(s_t, a_t; \theta)$是当前$Q$网络的$Q$值
- $Q(s_{t+1}, a'; \theta^-)$是目标网络的$Q$值
- $D$是经验回放池

训练过程中，神经网络通过优化损失函数$L(\theta)$来更新网络的参数$\theta$，从而使得$Q$网络的输出尽可能接近期望的目标$Q$值。损失函数的最小化使用反向传播算法完成，计算损失函数的梯度并利用梯度下降更新参数，同时损失函数中利用了$\mathbb{E}$期望，表示计算目标时要从经验回放池中随机采样一批经历

#### **DQN示例**


针对环境**CartPole-v1**实现一个DQN算法，利用深度$Q$网络学习最优策略。**CartPole-v1**任务为agent控制小车的左右运动，使杆子竖直不倒下。每一个回合`max_step=500`，杆子倒下或者超过最大步数，回合结束。agent在每一个回合内的目标是保持杆子尽可能长的时间

1. 状态空间：
    - x：推车的位置
    - x_dot：推车的速度
    - theta：杆子的角度
    - theta_dot：杆子的角速度
2. 动作空间：
    - 0：推车向左移动
    - 1：推车向右移动
3. 奖励：
    - agent每一步成功使杆子不倒下，reward+1
    - 当杆子倒下或推车移出界限时，回合结束，奖励为$0$，环境进入结束状态



In [None]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque


class QNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# env = gym.make('CartPole-v1')

class DQNAgent:
    def __init__(self, env, gamma=0.99, epsilon=1.0, epsilon_decay=0.995, min_epsilon=0.01, alpha=0.001, batch_size=64, memory_size=10000):
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon
        self.alpha = alpha
        self.batch_size = batch_size
        self.memory = deque(maxlen=memory_size)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.device = device

        # 当前Q网络与目标Q网络
        self.q_network = QNetwork(env.observation_space.shape[0], env.action_space.n)
        self.target_network = QNetwork(env.observation_space.shape[0], env.action_space.n)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=alpha)
        self.update_target_network()
    
    def choose_action(self, state):
        if random.random() < self.epsilon:
            return self.env.action_space.sample()
        
        state = np.array(state)
        state = torch.tensor(state, dtype=torch.float32).to(self.device)
        
        q_values = self.q_network(state)
        return torch.argmax(q_values).item()
    
    def store_experience(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())
    
    def sample_batch(self):
        return random.sample(self.memory, self.batch_size)
    
    def train(self):
        if len(self.memory) < self.batch_size:
            return
        
        batch = self.sample_batch()
        states, actions, rewards, next_states, dones = zip(*batch)

        states = np.array(states)
        next_states = np.array(next_states)

        states = torch.tensor(states, dtype=torch.float32).to(self.device)
        actions = torch.tensor(actions, dtype=torch.long).to(self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)
        next_states = torch.tensor(next_states, dtype=torch.float32).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float32).to(self.device)

        # 计算当前Q网络输出的Q值
        q_values = self.q_network(states).gather(1, actions.view(-1, 1)).squeeze(1)

        # 计算目标Q网络的输出Q值
        next_q_values = self.target_network(next_states).max(1)[0]
        target_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        # Loss
        loss = nn.MSELoss()(q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.epsilon > self.min_epsilon:
            self.epsilon *= self.epsilon_decay
        
def train_dqn(episodes=1000):
    env = gym.make('CartPole-v1')
    agent = DQNAgent(env)

    for episode in range(episodes):
        state, info = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            agent.store_experience(state, action, reward, next_state, done)
            agent.train()
            state = next_state
            total_reward += reward
        
        if episode % 10 == 0:
            agent.update_target_network()
            print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward}, Epsilon: {agent.epsilon:.3f}")
        
train_dqn(1000)



  if not isinstance(terminated, (bool, np.bool8)):
  states = torch.tensor(states, dtype=torch.float32)


Episode 1/1000, Total Reward: 36.0, Epsilon: 1.000
Episode 11/1000, Total Reward: 11.0, Epsilon: 0.545
Episode 21/1000, Total Reward: 9.0, Epsilon: 0.294
Episode 31/1000, Total Reward: 8.0, Epsilon: 0.171
Episode 41/1000, Total Reward: 34.0, Epsilon: 0.075
Episode 51/1000, Total Reward: 46.0, Epsilon: 0.015
Episode 61/1000, Total Reward: 167.0, Epsilon: 0.010
Episode 71/1000, Total Reward: 389.0, Epsilon: 0.010
Episode 81/1000, Total Reward: 179.0, Epsilon: 0.010
Episode 91/1000, Total Reward: 297.0, Epsilon: 0.010
Episode 101/1000, Total Reward: 442.0, Epsilon: 0.010
Episode 111/1000, Total Reward: 151.0, Epsilon: 0.010
Episode 121/1000, Total Reward: 154.0, Epsilon: 0.010
Episode 131/1000, Total Reward: 232.0, Epsilon: 0.010
Episode 141/1000, Total Reward: 250.0, Epsilon: 0.010
Episode 151/1000, Total Reward: 159.0, Epsilon: 0.010
Episode 161/1000, Total Reward: 165.0, Epsilon: 0.010
Episode 171/1000, Total Reward: 144.0, Epsilon: 0.010
Episode 181/1000, Total Reward: 128.0, Epsilon: