# Q-Learning 高级技巧与实战

## 学习目标

通过本教程，你将掌握：
- Q-Learning 过估计问题与 Double Q-Learning
- 学习率调度与自适应更新
- Gymnasium 环境实战训练
- 模型保存、加载与评估

---

## 第一部分：Q-Learning 过估计问题

### 1.1 过估计现象

Q-Learning 更新使用 $\max$：

$$Q(S,A) \leftarrow Q(S,A) + \alpha[R + \gamma \max_a Q(S',a) - Q(S,A)]$$

**问题**: $\max$ 操作会系统性地高估 Q 值

**原因**: 假设所有 Q 值估计都带有零均值噪声 $\hat{Q} = Q^* + \epsilon$（$\mathbb{E}[\epsilon] = 0$）。由于 $\max$ 是凸函数，根据 Jensen 不等式：

$$\mathbb{E}[\max_a \hat{Q}(s,a)] \geq \max_a \mathbb{E}[\hat{Q}(s,a)] = \max_a Q^*(s,a)$$

### 1.2 Double Q-Learning 解决方案

**核心思想**: 解耦动作选择和价值评估

维护两个 Q 表 $Q_1$ 和 $Q_2$：

$$Q_1(S,A) \leftarrow Q_1(S,A) + \alpha[R + \gamma Q_2(S', \arg\max_a Q_1(S',a)) - Q_1(S,A)]$$

$$Q_2(S,A) \leftarrow Q_2(S,A) + \alpha[R + \gamma Q_1(S', \arg\max_a Q_2(S',a)) - Q_2(S,A)]$$

---

## 第二部分：代码实现

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
import json
from pathlib import Path

np.random.seed(42)
plt.rcParams['figure.figsize'] = (12, 5)

print("库导入完成")

In [None]:
# Gymnasium is optional: record its availability in HAS_GYM so the cells
# guarded by `if HAS_GYM:` are skipped when the package is not installed.
try:
    import gymnasium as gym
    HAS_GYM = True
    print(f"Gymnasium 版本: {gym.__version__}")
except ImportError:
    HAS_GYM = False
    print("请安装 gymnasium: pip install gymnasium")

In [None]:
@dataclass
class TrainingMetrics:
    """Per-episode statistics collected during a training run."""

    # Total reward of each episode, in episode order.
    episode_rewards: List[float] = field(default_factory=list)
    # Number of steps taken in each episode.
    episode_lengths: List[int] = field(default_factory=list)
    # Exploration rate recorded for each episode.
    epsilon_history: List[float] = field(default_factory=list)

    def get_moving_average(self, window: int = 100) -> np.ndarray:
        """Return the moving average of the episode rewards.

        Args:
            window: Number of consecutive episodes averaged together.

        Returns:
            One mean per full window (``len(episode_rewards) - window + 1``
            values). When fewer than ``window`` rewards have been recorded,
            the raw rewards are returned unsmoothed instead.
        """
        rewards = self.episode_rewards
        if len(rewards) >= window:
            kernel = np.ones(window) / window
            return np.convolve(rewards, kernel, mode='valid')
        return np.array(rewards)

### 2.1 Double Q-Learning 实现

In [None]:
class DoubleQLearningAgent:
    """Tabular Double Q-Learning agent.

    Two independent Q-tables are maintained so that the table choosing the
    greedy next action is not the one that evaluates it, which counters the
    overestimation bias of standard Q-Learning.
    """

    def __init__(self, n_actions, learning_rate=0.1, discount_factor=0.99,
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        """Store the hyper-parameters and create two empty Q-tables.

        Args:
            n_actions: Number of discrete actions; the length of every Q row.
            learning_rate: Step size, stored as ``self.lr``.
            discount_factor: Discount, stored as ``self.gamma``.
            epsilon: Initial exploration probability.
            epsilon_decay: Multiplicative decay factor for epsilon.
            epsilon_min: Lower bound for epsilon.
        """
        def _zero_row():
            # Unseen states start with an all-zero row of action values.
            return np.zeros(n_actions)

        # Exploration schedule.
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Learning hyper-parameters.
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = discount_factor

        # Two independent Q-tables, each mapping state -> array of action values.
        self.q_table1 = defaultdict(_zero_row)
        self.q_table2 = defaultdict(_zero_row)

In [None]:
    def get_action(self, state, training=True):
        """Choose an action epsilon-greedily from the sum of both Q-tables.

        Args:
            state: Hashable state key.
            training: When true, a random action is taken with probability
                ``self.epsilon``; when false the choice is always greedy.

        Returns:
            The index of the chosen action. Ties between (numerically close)
            best actions are broken uniformly at random.
        """
        explore = training and np.random.random() < self.epsilon
        if explore:
            return np.random.randint(self.n_actions)

        totals = self.q_table1[state] + self.q_table2[state]
        is_best = np.isclose(totals, np.max(totals))
        candidates = np.where(is_best)[0]
        return np.random.choice(candidates)

In [None]:
    def update(self, state, action, reward, next_state, done):
        """Apply one Double Q-Learning update and return the TD error.

        A fair coin flip decides which table is updated. The updated table
        picks the greedy next action and the other table supplies that
        action's value, so selection and evaluation use different estimates.

        Args:
            state: State in which ``action`` was taken.
            action: Index of the action taken.
            reward: Reward received for the transition.
            next_state: State reached after the transition.
            done: When true, the target is the reward alone (no bootstrap).

        Returns:
            ``target - Q(state, action)``, measured before the table is changed.
        """
        # Heads: update Q1 and evaluate with Q2; tails: the reverse.
        if np.random.random() < 0.5:
            learner, evaluator = self.q_table1, self.q_table2
        else:
            learner, evaluator = self.q_table2, self.q_table1

        old_value = learner[state][action]
        target = reward
        if not done:
            greedy_next = np.argmax(learner[next_state])
            target = reward + self.gamma * evaluator[next_state][greedy_next]

        td_error = target - old_value
        learner[state][action] += self.lr * td_error
        return td_error

In [None]:
    def decay_epsilon(self):
        """Multiply epsilon by ``epsilon_decay`` without going below ``epsilon_min``."""
        decayed = self.epsilon * self.epsilon_decay
        self.epsilon = decayed if decayed > self.epsilon_min else self.epsilon_min

# Assemble the class: attach the functions defined in the preceding cells
# (get_action, update, decay_epsilon) to DoubleQLearningAgent as methods.
DoubleQLearningAgent.get_action = get_action
DoubleQLearningAgent.update = update
DoubleQLearningAgent.decay_epsilon = decay_epsilon

### 2.2 标准 Q-Learning (对比)

In [None]:
class QLearningAgent:
    """Standard tabular Q-Learning agent, used as the comparison baseline.

    The Q-table maps a hashable state key to a NumPy array holding one value
    per action; unseen states start as all zeros.
    """

    def __init__(self, n_actions, learning_rate=0.1, discount_factor=0.99,
                 epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
        """Store the hyper-parameters and create an empty Q-table.

        Args:
            n_actions: Number of discrete actions; the length of every Q row.
            learning_rate: Step size, stored as ``self.lr``.
            discount_factor: Discount, stored as ``self.gamma``.
            epsilon: Initial exploration probability.
            epsilon_decay: Multiplicative decay factor for epsilon.
            epsilon_min: Lower bound for epsilon.
        """
        self.n_actions = n_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.q_table = defaultdict(lambda: np.zeros(n_actions))

    def get_action(self, state, training=True):
        """Choose an action epsilon-greedily.

        With ``training`` true, a uniformly random action is returned with
        probability ``self.epsilon``. Otherwise the greedy action is returned,
        with ties between numerically close maxima broken at random.
        """
        if training and np.random.random() < self.epsilon:
            return np.random.randint(self.n_actions)
        q_values = self.q_table[state]
        return np.random.choice(np.where(np.isclose(q_values, np.max(q_values)))[0])

    def update(self, state, action, reward, next_state, done):
        """Apply one Q-Learning update and return the TD error.

        The target is ``reward`` when ``done`` is true, otherwise
        ``reward + gamma * max_a Q(next_state, a)``. The returned TD error is
        measured before the table is modified.
        """
        current_q = self.q_table[state][action]
        target = reward if done else reward + self.gamma * np.max(self.q_table[next_state])
        self.q_table[state][action] += self.lr * (target - current_q)
        return target - current_q

    def decay_epsilon(self):
        """Multiply epsilon by ``epsilon_decay`` without going below ``epsilon_min``."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    @staticmethod
    def _plain_key(key):
        """Return ``key`` with NumPy scalars replaced by native Python values.

        Under NumPy 2 the text form of a tuple of NumPy scalars looks like
        ``(np.int64(0), np.int64(1))``, which is not a Python literal and so
        cannot be parsed back safely by ``load``.
        """
        if isinstance(key, np.generic):
            return key.item()
        if isinstance(key, tuple):
            return tuple(QLearningAgent._plain_key(part) for part in key)
        return key

    def save(self, filepath):
        """Write the Q-table and the current epsilon to ``filepath`` as JSON.

        State keys are stored as their text form (JSON object keys must be
        strings); Q rows are stored as plain lists.
        """
        data = {'q_table': {str(self._plain_key(k)): v.tolist()
                            for k, v in self.q_table.items()},
                'epsilon': self.epsilon}
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

    def load(self, filepath):
        """Replace the Q-table and epsilon with the contents of ``filepath``.

        Keys containing ``(`` are parsed as Python literals (tuples); all
        other keys are parsed as integers. Epsilon falls back to 0.01 when
        the file does not contain it.

        Raises:
            ValueError, SyntaxError: If a key is not a valid literal/integer.
                In that case the agent's current Q-table is left untouched.
        """
        # Imported locally so this class does not depend on an extra
        # top-of-file import.
        import ast

        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Build the table separately and assign it only after every key has
        # parsed, so a malformed file cannot leave the agent half-loaded.
        table = defaultdict(lambda: np.zeros(self.n_actions))
        for k, v in data['q_table'].items():
            # SECURITY: literal_eval only accepts literals; the previous
            # eval() would execute arbitrary code stored in the file.
            key = ast.literal_eval(k) if '(' in k else int(k)
            table[key] = np.array(v)

        self.q_table = table
        self.epsilon = data.get('epsilon', 0.01)

---

## 第三部分：Gymnasium 环境实战

### 3.1 Taxi-v3 环境

```
+---------+
|R: | : :G|    R, G, Y, B: 乘客位置/目的地
| : | : : |    |: 墙壁
| : : : : |
| | : | : |
|Y| : |B: |
+---------+

状态: 500 (位置×乘客位置×目的地)
动作: 6 (南/北/东/西/接/放)
奖励: 每步-1, 成功+20, 非法操作-10
```

In [None]:
# Inspect the Taxi-v3 environment (only when Gymnasium is available).
if HAS_GYM:
    # render_mode='ansi': the output of env.render() is printed below as text.
    env = gym.make('Taxi-v3', render_mode='ansi')
    
    print("Taxi-v3 环境:")
    print(f"  状态空间: {env.observation_space.n}")
    print(f"  动作空间: {env.action_space.n}")
    
    # Fixed seed so the initial state shown is reproducible.
    state, _ = env.reset(seed=42)
    print(f"\n初始状态: {state}")
    print(env.render())
    
    env.close()

### 3.2 训练函数

In [None]:
def train_agent(env, agent, episodes=2000, max_steps=200, verbose=True, log_interval=200):
    """Train ``agent`` on ``env`` and return the collected metrics.

    Supports the 5-tuple step API ``(obs, reward, terminated, truncated,
    info)`` as well as legacy results whose third element is a single
    ``done`` flag (3- or 4-tuples).

    Args:
        env: Environment exposing ``reset()`` and ``step(action)``.
        agent: Object exposing ``get_action``, ``update``, ``decay_epsilon``
            and an ``epsilon`` attribute.
        episodes: Number of episodes to run.
        max_steps: Maximum number of steps per episode.
        verbose: Print the average reward every ``log_interval`` episodes.
        log_interval: Logging period, also the averaging window of the log.

    Returns:
        A ``TrainingMetrics`` holding per-episode reward, length and epsilon.
    """
    metrics = TrainingMetrics()

    for episode in range(episodes):
        result = env.reset()
        state = result[0] if isinstance(result, tuple) else result
        total_reward, steps = 0.0, 0

        for _ in range(max_steps):
            action = agent.get_action(state, training=True)
            result = env.step(action)

            if len(result) == 5:
                next_state, reward, terminated, truncated, _ = result
            else:
                # Legacy API: one flag that cannot tell the two cases apart.
                next_state, reward, terminated = result[:3]
                truncated = False

            # Only a genuine terminal state has zero future value. A
            # time-limit truncation merely stops the rollout, so the agent
            # must still bootstrap from next_state in that case.
            agent.update(state, action, reward, next_state, terminated)

            state = next_state
            total_reward += reward
            steps += 1
            if terminated or truncated:
                break

        agent.decay_epsilon()
        metrics.episode_rewards.append(total_reward)
        metrics.episode_lengths.append(steps)
        metrics.epsilon_history.append(agent.epsilon)

        if verbose and (episode + 1) % log_interval == 0:
            avg = np.mean(metrics.episode_rewards[-log_interval:])
            print(f"Episode {episode+1:4d} | Avg Reward: {avg:8.2f} | ε: {agent.epsilon:.4f}")

    return metrics

In [None]:
# Train a standard Q-Learning agent on Taxi-v3 (only when Gymnasium is available).
# `agent` and `metrics` stay in the global namespace for the following cells.
if HAS_GYM:
    print("="*60)
    print("Taxi-v3 Q-Learning 训练")
    print("="*60)
    
    env = gym.make('Taxi-v3')
    # Size the agent from the environment's own action space.
    agent = QLearningAgent(n_actions=env.action_space.n, learning_rate=0.1,
                           epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01)
    
    metrics = train_agent(env, agent, episodes=2000, verbose=True)
    env.close()
    
    print(f"\n训练完成！最后100回合平均: {np.mean(metrics.episode_rewards[-100:]):.2f}")

### 3.3 可视化训练过程

In [None]:
def _plot_with_moving_average(ax, values, window, color):
    """Draw the raw per-episode series and, when possible, its moving average."""
    ax.plot(values, alpha=0.3, color=color)
    # mode='valid' needs at least one full window. With fewer points
    # np.convolve swaps its inputs and returns a series that no longer lines
    # up with the episode axis (and it raises on empty input), so the
    # smoothed curve is skipped in that case.
    if window >= 1 and len(values) >= window:
        smoothed = np.convolve(values, np.ones(window) / window, mode='valid')
        # Each average is plotted at the last episode of its window.
        ax.plot(range(window - 1, len(values)), smoothed, color=color, linewidth=2)


def plot_metrics(metrics, title="训练曲线", window=50):
    """Plot reward, episode length and epsilon curves side by side.

    Args:
        metrics: Object with ``episode_rewards``, ``episode_lengths`` and
            ``epsilon_history`` sequences.
        title: Figure title.
        window: Moving-average window for the reward and step curves. The
            smoothed curve is omitted when fewer than ``window`` episodes
            have been recorded.
    """
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))

    # Rewards
    _plot_with_moving_average(axes[0], metrics.episode_rewards, window, 'blue')
    axes[0].set_xlabel('Episode')
    axes[0].set_ylabel('Reward')
    axes[0].set_title('回合奖励')
    axes[0].grid(True, alpha=0.3)

    # Steps per episode
    _plot_with_moving_average(axes[1], metrics.episode_lengths, window, 'green')
    axes[1].set_xlabel('Episode')
    axes[1].set_ylabel('Steps')
    axes[1].set_title('回合步数')
    axes[1].grid(True, alpha=0.3)

    # Exploration rate
    axes[2].plot(metrics.epsilon_history, color='red')
    axes[2].set_xlabel('Episode')
    axes[2].set_ylabel('Epsilon')
    axes[2].set_title('探索率衰减')
    axes[2].grid(True, alpha=0.3)

    plt.suptitle(title, fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

In [None]:
# Plot the metrics of the Taxi-v3 training run (requires the training cell to have run).
if HAS_GYM:
    plot_metrics(metrics, "Taxi-v3 Q-Learning")

### 3.4 评估智能体

In [None]:
def evaluate_agent(env, agent, episodes=100, max_steps=200, success_reward=20):
    """Run the greedy policy and summarise its performance.

    Supports the 5-tuple step API ``(obs, reward, terminated, truncated,
    info)`` as well as legacy results whose third element is a single
    ``done`` flag (3- or 4-tuples).

    Args:
        env: Environment exposing ``reset()`` and ``step(action)``.
        agent: Object exposing ``get_action(state, training=False)``.
        episodes: Number of evaluation episodes.
        max_steps: Maximum number of steps per episode.
        success_reward: An episode counts as a success when it ends on a step
            whose reward equals this value (20 is Taxi-v3's delivery reward).

    Returns:
        Dict with ``mean`` and ``std`` of the episode returns and
        ``success_rate`` as a percentage. With ``episodes == 0`` the mean and
        std are NaN and the success rate is 0.0.
    """
    rewards, successes = [], 0

    for _ in range(episodes):
        result = env.reset()
        state = result[0] if isinstance(result, tuple) else result
        total_reward = 0

        for _step in range(max_steps):
            action = agent.get_action(state, training=False)
            result = env.step(action)
            if len(result) == 5:
                next_state, reward, terminated, truncated, _ = result
                done = terminated or truncated
            else:
                next_state, reward, done = result[:3]

            total_reward += reward
            state = next_state

            if done:
                if reward == success_reward:
                    successes += 1
                break

        rewards.append(total_reward)

    if not rewards:
        # Nothing was evaluated: avoid dividing by zero / averaging an empty list.
        return {'mean': float('nan'), 'std': float('nan'), 'success_rate': 0.0}

    return {'mean': np.mean(rewards), 'std': np.std(rewards),
            'success_rate': successes / episodes * 100}

In [None]:
# Evaluate the trained global `agent` on a fresh Taxi-v3 environment.
if HAS_GYM:
    print("\n评估训练好的智能体...")
    env = gym.make('Taxi-v3')
    results = evaluate_agent(env, agent, episodes=100)
    env.close()
    
    # 'mean'/'std' summarise the episode returns; 'success_rate' is a percentage.
    print(f"\n评估结果 (100回合):")
    print(f"  平均奖励: {results['mean']:.2f} ± {results['std']:.2f}")
    print(f"  成功率: {results['success_rate']:.1f}%")

### 3.5 演示智能体

In [None]:
# Demonstrate one greedy episode of the trained global `agent`, printing each step.
if HAS_GYM:
    # render_mode='ansi': the output of env.render() is printed below as text.
    env = gym.make('Taxi-v3', render_mode='ansi')
    # Display names indexed by action id (south, north, east, west, pickup, dropoff).
    action_names = ['南', '北', '东', '西', '接', '放']
    
    print("\n" + "="*40)
    print("演示回合")
    print("="*40)
    
    state, _ = env.reset()
    total_reward, steps = 0, 0
    
    print("\n初始状态:")
    print(env.render())
    
    # Cap the demo at 15 steps to keep the printed output short.
    while steps < 15:
        # training=False: no exploration, always the greedy action.
        action = agent.get_action(state, training=False)
        result = env.step(action)
        next_state, reward, terminated, truncated, _ = result
        done = terminated or truncated
        
        total_reward += reward
        steps += 1
        
        print(f"\n步骤 {steps}: {action_names[action]}, 奖励={reward}")
        print(env.render())
        
        if done:
            print(f"\n回合结束！总奖励: {total_reward}")
            break
        
        state = next_state
    
    env.close()

---

## 第四部分：Double Q-Learning 对比

In [None]:
# Train a standard and a Double Q-Learning agent with identical
# hyper-parameters so their learning curves can be compared.
if HAS_GYM:
    print("="*60)
    print("Q-Learning vs Double Q-Learning")
    print("="*60)

    env = gym.make('Taxi-v3')
    try:
        # Q-Learning. The action count is read from the environment (as in
        # the single-agent training cell) instead of being hard-coded to 6.
        print("\n训练 Q-Learning...")
        q_agent = QLearningAgent(n_actions=env.action_space.n, learning_rate=0.1, epsilon=1.0,
                                 epsilon_decay=0.995, epsilon_min=0.01)
        q_metrics = train_agent(env, q_agent, episodes=1000, verbose=False)

        # Double Q-Learning
        print("训练 Double Q-Learning...")
        double_agent = DoubleQLearningAgent(n_actions=env.action_space.n, learning_rate=0.1, epsilon=1.0,
                                            epsilon_decay=0.995, epsilon_min=0.01)
        double_metrics = train_agent(env, double_agent, episodes=1000, verbose=False)
    finally:
        # Release the environment even if training raises.
        env.close()

In [None]:
# Plot the smoothed reward curves of both agents on one axis.
if HAS_GYM:
    fig, ax = plt.subplots(figsize=(10, 5))
    window = 50

    q_smooth = np.convolve(q_metrics.episode_rewards, np.ones(window)/window, mode='valid')
    double_smooth = np.convolve(double_metrics.episode_rewards, np.ones(window)/window, mode='valid')

    # Each 'valid' average covers the window ending at a given episode, so the
    # curves start at episode window-1 (the same convention plot_metrics uses)
    # rather than at 0, keeping the "Episode" axis accurate.
    ax.plot(range(window - 1, len(q_metrics.episode_rewards)), q_smooth,
            label='Q-Learning', alpha=0.8)
    ax.plot(range(window - 1, len(double_metrics.episode_rewards)), double_smooth,
            label='Double Q-Learning', alpha=0.8)
    ax.set_xlabel('Episode')
    ax.set_ylabel('Reward')
    ax.set_title('Q-Learning vs Double Q-Learning')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.show()

    print(f"\n最后100回合:")
    print(f"  Q-Learning: {np.mean(q_metrics.episode_rewards[-100:]):.2f}")
    print(f"  Double Q-Learning: {np.mean(double_metrics.episode_rewards[-100:]):.2f}")

---

## 第五部分：学习率调度

In [None]:
class AdaptiveLRQLearning(QLearningAgent):
    """Q-Learning with a per-(state, action) adaptive learning rate.

    The step size is ``alpha(s, a) = 1 / (1 + N(s, a))`` where ``N(s, a)`` is
    the number of updates already applied to that pair, so the first update
    uses a step size of 1 and later ones shrink as 1/2, 1/3, ...
    The fixed ``learning_rate`` of the base class is not used by ``update``.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``QLearningAgent`` and add visit counters."""
        super().__init__(*args, **kwargs)
        # state -> array with the number of updates applied to each action.
        self.visit_count = defaultdict(lambda: np.zeros(self.n_actions))

    def update(self, state, action, reward, next_state, done):
        """Apply one Q-Learning update with the adaptive step size.

        Returns:
            The TD error ``target - Q(state, action)`` measured before the update.
        """
        # Derive the step size from the visits made *before* this update, then
        # count the visit. Incrementing first would shift the schedule by one
        # (first step 1/2 instead of 1) and disagree with the documented formula.
        lr = 1.0 / (1.0 + self.visit_count[state][action])
        self.visit_count[state][action] += 1

        current_q = self.q_table[state][action]
        target = reward if done else reward + self.gamma * np.max(self.q_table[next_state])
        self.q_table[state][action] += lr * (target - current_q)
        return target - current_q

In [None]:
# Demonstrate the adaptive learning rate.
# NOTE: this rebinds the global name `agent` to a new AdaptiveLRQLearning instance.
agent = AdaptiveLRQLearning(n_actions=4)
state = (0, 0)

print("自适应学习率演示:")
for i in range(5):
    # Computed from the visit count as it stands before this iteration's update.
    lr = 1.0 / (1.0 + agent.visit_count[state][0])
    agent.update(state, 0, -1.0, (0, 1), False)
    print(f"  访问 {i+1}: lr = {lr:.4f}")

---

## 总结

### 核心要点

1. **过估计问题**: Q-Learning 的 max 导致系统性高估
2. **Double Q-Learning**: 解耦选择与评估，消除过估计偏差（代价是可能出现轻微低估）
3. **学习率调度**: 基于访问次数自适应调整

### 超参数建议

| 参数 | 范围 | 说明 |
|------|------|------|
| 学习率 | 0.05-0.5 | 表格型可用较大值 |
| 折扣因子 | 0.95-0.99 | 任务越长期越接近1 |
| 初始探索率 | 1.0 | 从完全探索开始 |
| 最终探索率 | 0.01-0.1 | 保持少量探索 |

---

## 单元测试

In [None]:
def run_tests():
    """Run smoke tests for the three agent classes and print a summary.

    Raises:
        AssertionError: If any check fails.
    """
    # Local imports keep this cell self-contained.
    import os
    import tempfile

    print("开始单元测试...\n")
    passed = 0

    # Test 1: a Double Q-Learning update changes at least one of the tables
    agent = DoubleQLearningAgent(n_actions=4, learning_rate=0.5)
    np.random.seed(42)
    for _ in range(10):
        agent.update((0,0), 0, -1.0, (0,1), False)
    assert agent.q_table1[(0,0)][0] != 0 or agent.q_table2[(0,0)][0] != 0
    print("✓ 测试1: Double Q-Learning 更新")
    passed += 1

    # Test 2: the adaptive learning rate shrinks after a visit
    agent = AdaptiveLRQLearning(n_actions=4)
    lr1 = 1.0 / (1.0 + agent.visit_count[(0,0)][0])
    agent.update((0,0), 0, -1.0, (0,1), False)
    lr2 = 1.0 / (1.0 + agent.visit_count[(0,0)][0])
    assert lr2 < lr1, "学习率应该衰减"
    print("✓ 测试2: 自适应学习率")
    passed += 1

    # Test 3: save/load round trip
    agent = QLearningAgent(n_actions=4)
    agent.q_table[(0,0)] = np.array([1.0, 2.0, 3.0, 4.0])

    # A temporary directory keeps the working directory clean and is removed
    # even when saving, loading or the assertion below fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        filepath = os.path.join(tmp_dir, '_test.json')
        agent.save(filepath)

        new_agent = QLearningAgent(n_actions=4)
        new_agent.load(filepath)
        assert np.allclose(new_agent.q_table[(0,0)], agent.q_table[(0,0)])

    print("✓ 测试3: 保存/加载")
    passed += 1

    print(f"\n全部 {passed} 项测试通过!")

run_tests()

---

## 参考资料

1. Van Hasselt (2010). Double Q-learning. NeurIPS.
2. Sutton & Barto (2018). Reinforcement Learning: An Introduction.
3. [Gymnasium Documentation](https://gymnasium.farama.org/)