# 1.定义模型（训练与测试）


In [None]:
import numpy as np
import math
import torch
from collections import defaultdict  #创建默认字典

#定义一个智能体的类
class Qlearning(object):
    def __init__(self,cfg):
        '''智能体类
        Args:
            cfg (class): 超参数类
        '''
        #动作空间大小（可执行的动作数量）
        self.n_actions = cfg.n_actions    
        #探索策略如 ε-greedy ，boltzmann ，softmax， ucb 等
        self.exploration_type = 'e-greedy' 
        #学习率（控制Q值更新的幅度）
        self.lr = cfg.lr  
        #折扣因子（未来奖励的衰减系数）                  
        self.gamma = cfg.gamma  
        #当前ε值（初始值为超参数中的起始ε）           
        self.epsilon = cfg.epsilon_start   
        #记录采样次数（用于ε的衰减）
        self.sample_count = 0        
        # ε的初始值      
        self.epsilon_start = cfg.epsilon_start  
        # ε的最终值（衰减下限）
        self.epsilon_end = cfg.epsilon_end      
        # ε的衰减率（控制衰减速度）
        self.epsilon_decay = cfg.epsilon_decay  
        # 初始化Q表：用默认字典存储，当遇见未知状态，会自动创建一个条目，不需要初始化所有可能的状态
        # 动作空间有4个值，每一个状态都对应4个值，初始都是0
        self.Q_table  = defaultdict(lambda: np.zeros(self.n_actions))  


    #训练阶段选择动作（包含探索）
    def sample_action(self, state):        
        ''' 以 e-greedy 策略训练时选择动作 
        Args:
            state (array): 状态
        Returns:
            action (int): 动作
        ''' 
        #如果使用贪心策略
        if self.exploration_type == 'e-greedy':  
            #调用对应的动作选择方法                   
            action = self._epsilon_greedy_sample_action(state)      
        else:
            #否则抛出异常
            raise NotImplementedError                               
        return action
    

    #测试阶段选择动作（无探索，纯利用）
    def predict_action(self,state):        
        ''' 预测动作
        Args:
            state (array): 状态
        Returns:
            action (int): 动作
        '''
        #如果使用贪心策略
        if self.exploration_type == 'e-greedy':
            #调用对应的动作选择方法
            action = self._epsilon_greedy_predict_action(state)
        else:
            #否则抛出异常
            raise NotImplementedError
        return action
    
    
    #实现贪婪策略的动作选择（训练用）
    def _epsilon_greedy_sample_action(self, state):
        ''' 
        采用 epsilon-greedy 策略进行动作选择 
        Args: 
            state (array): 状态
        Returns: 
            action (int): 动作
        ''' 
        #采样次数+1（用于更新ε）
        self.sample_count += 1
        #计算当前ε值：指数衰减（随采样次数增加，ε从start衰减到end）
        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
            math.exp(-1. * self.sample_count / self.epsilon_decay) 
        #以(1-ε)的概率选择当前Q值最大的动作（利用），以ε的概率随机选择动作（探索）
        #随机数落在[a, 1]区间的概率正好是1-a,随机数落在 [0, a] 区间的概率正好是a
        if np.random.uniform(0, 1) > self.epsilon:
            #从Q表中获取当前状态（转为字符串作为键）对应的所有动作的Q值
            #返回数组中最大值的索引（即Q值最大的动作编号）
            action = np.argmax(self.Q_table[str(state)]) 
        #随机数小于ε：探索
        else:
            #从动作空间中随机选一个动作
            action = np.random.choice(self.n_actions) 
        return action
    
    #测试阶段
    def _epsilon_greedy_predict_action(self,state):
        ''' 
        使用 epsilon-greedy 算法进行动作预测 
        Args: 
            state (array): 状态
        Returns: 
            action (int): 动作 
        ''' 
        #找到当前状态中，4个动作最大的Q值，然后返回其索引值
        action = np.argmax(self.Q_table[str(state)])
        return action
    
    #Q值表的更新
    def update(self, state, action, reward, next_state, done):
        ''' 更新模型
        Args:
            state (array): 当前状态 
            action (int): 当前动作 
            reward (float): 当前奖励信号 
            next_state (array): 下一个状态 
            done (bool): 表示是否达到终止状态 
        '''
        #取当前状态当前动作的Q值。[action]是一个索引
        Q_predict = self.Q_table[str(state)][action] 
        #如果状态终止，目标Q值就等于当前获得的奖励。状态终止奖励为0
        if done: 
            #所以目标Q值是0
            Q_target = reward  
        else:
            #如果非终止态，用这个公式求目标Q值
            Q_target = reward + self.gamma * np.max(self.Q_table[str(next_state)]) 
        #得到目标Q值之后，更新当前状态当前动作的Q值
        self.Q_table[str(state)][action] += self.lr * (Q_target - Q_predict)

    def save_model(self,path):
        '''
        保存模型
        Args:
            path (str): 模型存储路径 
        '''
        #序列化储存
        import dill
        #路径处理库
        from pathlib import Path
        # 确保存储路径存在 
        # 使用path对象创建目录（如果不存在）
        # parents=True: 创建所有必要的父目录
        # exist_ok=True: 如果目录已存在不会抛出异常
        Path(path).mkdir(parents=True, exist_ok=True)
        #保存模型
        torch.save(
            #保存的对象，这里是Q表
            obj=self.Q_table,
            #保存的路径和文件名
            f=path+"Qleaning_model.pkl",
            #指定使用dill而不是默认的pickle进行序列化
            pickle_module=dill
        )
        print("Model saved!")
        
        
    def load_model(self, path):
        '''
        根据模型路径导入模型
        Args:
            fpath (str): 模型路径
        '''
        import dill
        self.Q_table =torch.load(f=path+'Qleaning_model.pkl',pickle_module=dill)
        print("Mode loaded!")

# 2.可视化

In [None]:
import pygame
import sys
import time 

class CliffWalkingVisualizer:
    def __init__(self):
        # 初始化Pygame
        pygame.init()
        
        # 网格参数
        self.GRID_ROWS = 4
        self.GRID_COLS = 12
        self.CELL_SIZE = 80  # 每个网格单元格的大小（像素）
        self.MARGIN = 2      # 网格线宽度
        
        # 计算窗口大小
        self.WINDOW_WIDTH = self.GRID_COLS * (self.CELL_SIZE + self.MARGIN) + self.MARGIN
        self.WINDOW_HEIGHT = self.GRID_ROWS * (self.CELL_SIZE + self.MARGIN) + self.MARGIN
        
        # 颜色定义
        self.WHITE = (255, 255, 255)
        self.BLACK = (0, 0, 0)
        self.GREEN = (0, 200, 0)    # 起点
        self.RED = (200, 0, 0)      # 终点
        self.ORANGE = (255, 165, 0) # 悬崖
        self.BLUE = (100, 100, 255) # 普通路径
        self.GRAY = (200, 200, 200) # 网格线
        self.AGENT_COLOR = (255, 255, 0)  # 智能体颜色
        
        # 创建窗口
        self.screen = pygame.display.set_mode((self.WINDOW_WIDTH, self.WINDOW_HEIGHT))
        pygame.display.set_caption("悬崖寻路可视化")
        
        # 设置字体
        self.font = pygame.font.SysFont('SimHei', 20)  # 使用支持中文的字体
        
        # 当前智能体位置
        self.agent_pos = None
    
    def update_agent_position(self, state):
        """更新智能体位置"""
        # 将状态转换为网格坐标
        row = state // self.GRID_COLS
        col = state % self.GRID_COLS
        self.agent_pos = (row, col)
    
    def render(self, episode, step, reward, total_reward):
        """渲染当前状态"""
        # 处理事件
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_ESCAPE:
                    pygame.quit()
                    sys.exit()
        
        # 填充背景
        self.screen.fill(self.GRAY)
        
        # 绘制网格
        for row in range(self.GRID_ROWS):
            for col in range(self.GRID_COLS):
                # 计算单元格位置
                x = col * (self.CELL_SIZE + self.MARGIN) + self.MARGIN
                y = row * (self.CELL_SIZE + self.MARGIN) + self.MARGIN
                
                # 确定单元格颜色
                if row == self.GRID_ROWS - 1:  # 最后一行
                    if col == 0:  # 起点
                        color = self.GREEN
                    elif col == self.GRID_COLS - 1:  # 终点
                        color = self.RED
                    else:  # 悬崖
                        color = self.ORANGE
                else:  # 其他行
                    color = self.BLUE
                
                # 绘制单元格
                pygame.draw.rect(self.screen, color, (x, y, self.CELL_SIZE, self.CELL_SIZE))
        
        # 绘制智能体
        if self.agent_pos:
            row, col = self.agent_pos
            x = col * (self.CELL_SIZE + self.MARGIN) + self.MARGIN
            y = row * (self.CELL_SIZE + self.MARGIN) + self.MARGIN
            center_x = x + self.CELL_SIZE // 2
            center_y = y + self.CELL_SIZE // 2
            pygame.draw.circle(self.screen, self.AGENT_COLOR, (center_x, center_y), self.CELL_SIZE // 3)
        
        # 添加起点和终点文字标签（居中）
        start_text = self.font.render("起点", True, self.BLACK)
        start_x = self.MARGIN + (self.CELL_SIZE - start_text.get_width()) / 2
        start_y = (self.GRID_ROWS - 1) * (self.CELL_SIZE + self.MARGIN) + self.MARGIN + (self.CELL_SIZE - start_text.get_height()) / 2
        self.screen.blit(start_text, (start_x, start_y))
        
        end_text = self.font.render("终点", True, self.BLACK)
        end_x = (self.GRID_COLS - 1) * (self.CELL_SIZE + self.MARGIN) + self.MARGIN + (self.CELL_SIZE - end_text.get_width()) / 2
        end_y = (self.GRID_ROWS - 1) * (self.CELL_SIZE + self.MARGIN) + self.MARGIN + (self.CELL_SIZE - end_text.get_height()) / 2
        self.screen.blit(end_text, (end_x, end_y))
        
        # 添加信息显示
        info_text = self.font.render(f"回合: {episode} 步数: {step} 当前奖励: {reward} 总奖励: {total_reward}", True, self.BLACK)
        self.screen.blit(info_text, (10, 10))
        
        # 更新显示
        pygame.display.flip()
        
        # 控制渲染速度
        time.sleep(0.1)
    
    def close(self):
        """关闭可视化"""
        pygame.quit()

# 3.定义模型的训练与测试

In [None]:
def train(cfg, env, agent):
    ''' 训练
    '''
    print("开始训练！")
    #记录所有回合的奖励
    rewards = []  
    #记录所有回合步数
    steps = []
    #开始训练，回合数=cfg.train_eps
    for i_ep in range(cfg.train_eps):
        #初始化当前回合的累积奖励
        ep_reward = 0  
        #初始化当前回合的步数 
        ep_step = 0
        #重置环境并获取初始状态，设置随机种子
        state = env.reset(seed = cfg.seed)  
        #当前回合内，步数=cfg.max_steps
        for _ in range(cfg.max_steps):
            #当前回合步数+1
            ep_step += 1
            #根据当前状态采样一个动作（训练阶段包含探索）
            action = agent.sample_action(state)  
            #根据配置决定使用新API还是旧API
            if cfg.new_step_api:
                #使用 OpenAI Gym 的 new_step_api
                #更新环境并返回新状态、奖励、终止状态、截断标志和其他信息
                next_state, reward, terminated, truncated , info = env.step(action)  
            else:
                #使用 OpenAI Gym 的 old_step_api
                #更新环境并返回新状态、奖励、终止状态和其他信息
                next_state, reward, terminated, info = env.step(action)  
            #更新智能体 
            agent.update(state, action, reward, next_state, terminated)  
            #更新状态
            state = next_state  
            #增加奖励
            ep_reward += reward  
            #如果到达终止状态，提前结束当前回合
            if terminated:
                break
        #记录当前回合的步数和总奖励    
        steps.append(ep_step)
        rewards.append(ep_reward)
        #每10个回合打印一次训练进度
        if (i_ep + 1) % 10 == 0:
            print(f"回合：{i_ep+1}/{cfg.train_eps}，奖励：{ep_reward:.2f}")
    print("完成训练！")
    #返回一个包含所有回合奖励的字典
    return {'rewards':rewards}


def test(cfg, env, agent):
    print("开始测试！")
    # 记录所有回合的奖励
    # 初始化可视化
    visualizer = CliffWalkingVisualizer()
    rewards = []  
    steps = []
    for i_ep in range(cfg.test_eps):
        # 一轮的累计奖励 
        ep_reward = 0  
        ep_step = 0
        # 重置环境并获取初始状态
        state = env.reset(seed = cfg.seed)   
        for _ in range(cfg.max_steps):
            if cfg.render:
                env.render()
            ep_step += 1
            action = agent.predict_action(state)  
            next_state, reward, terminated, truncated , info = env.step(action)
            # 更新状态
            state = next_state   
            # 增加奖励
            ep_reward += reward  
            if terminated:
                break
        steps.append(ep_step)
        rewards.append(ep_reward)
        print(f"回合：{i_ep+1}/{cfg.test_eps}，奖励：{ep_reward:.2f}")
    print("完成测试")
    env.close()
    return {'rewards':rewards}

# 4.定义环境

In [None]:
import gymnasium as gym
import os
import random
def all_seed(env,seed = 1):
    ''' 万能的seed函数
    '''
    #控制环境本身的随机性
    env.reset(seed=seed) 
    #控制所有使用numpy的随机操作
    np.random.seed(seed)
    #控制python内置模块random的随机性
    random.seed(seed)
    #控制PyTorch在CPU上的所有随机操作
    torch.manual_seed(seed) 
    #控制PyTorch在GPU上的所有随机操作
    torch.cuda.manual_seed(seed) 
    #控制控制Python的哈希函数行为
    #哈希函数在每次解释器启动时会被随机化。这会导致诸如set或dict这类数据结构的迭代顺序每次运行都不同
    os.environ['PYTHONHASHSEED'] = str(seed) 
    # 设置CuDNN使用确定性算法，防止底层CUDA库的随机性和不确定性
    torch.backends.cudnn.deterministic = True
    # 闭CuDNN的基准模式，确保确定性
    torch.backends.cudnn.benchmark = False
    #禁用CuDNN，只使用PyTorch自己的CUDA实现
    torch.backends.cudnn.enabled = False

#定义环境与智能体配置参数    
def env_agent_config(cfg):
    #创建指定名称的环境
    env = gym.make(cfg.env_name) 
    #调用上面定义的all_seed函数，设置所有随机种子
    all_seed(env,seed=cfg.seed)
    #打印观察空间的形状（对于离散空间可能不适用）
    print(env.observation_space.shape)
    #获取离散状态空间的大小（仅适用于离散观察空间）
    n_states = env.observation_space.n
    #获取离散动作空间的大小
    n_actions = env.action_space.n
    print(f"状态空间维度：{n_states}，动作空间维度：{n_actions}")
    #使用setattr将状态空间大小添加到配置对象中，状态空间值=48
    setattr(cfg, 'n_states', n_states)
    #将动作空间大小添加到配置对象中，动作空间值=4
    setattr(cfg, 'n_actions', n_actions) 
    #将环境的动作空间对象添加到配置中，动作空间对象=Discrete(4)
    #动作空间对象提供了更完整的动作空间定义，包含动作数量、动作空间类型、可能的动作值范围、采样方法等
    setattr(cfg, 'action_space', env.action_space) 
    agent = Qlearning(cfg)
    return env,agent

# 5.设置参数

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
class Config:
    def __init__(self) -> None:
        ## 通用参数
        #环境名称
        self.env_name = "CliffWalking-v1" 
        #使用新的API
        self.new_step_api = True 
        #不使用任何Wrapper
        self.wrapper = None 
        #不渲染环境
        self.render = False 
        #渲染模式
        self.render_mode = "human" 
        #算法名称
        self.algo_name = "Qlearning" 
        #运行模式
        self.mode = "train"
        #多线程框架
        self.mp_backend = "mp" 
        #随机种子的设置，保证实验结果可复现
        self.seed = 1 
        #计算设备
        self.device = "cuda" 
        #训练回合数
        self.train_eps = 500 
        #测试回合数
        self.test_eps = 10 
        #训练期间每隔10个回合评估一次
        self.eval_eps = 10 
        #每个回合中，每走5步评估一次
        self.eval_per_episode = 5 
        #单个回合最大步数
        self.max_steps = 1000 
        #是否从加载模型继续训练或者直接测试，false表示直接训练
        self.load_checkpoint = False
        #加载模型的文件路径
        self.load_path = "tasks" 
        #是否在程序运行时，显示结果图表
        self.show_fig = False 
        #是否将结果保存为图片
        self.save_fig = True 

        ## Qlearing参数
        self.epsilon_start = 0.95 # epsilon 初始值
        self.epsilon_end = 0.01 # epsilon 终止值
        self.epsilon_decay = 300 # epsilon 衰减率
        self.gamma = 0.90 # 奖励折扣因子
        self.lr = 0.1 # 学习率

def smooth(data, weight=0.9):  
    '''用于平滑曲线,类似于Tensorboard中的smooth曲线
    '''
    last = data[0] 
    smoothed = []
    for point in data:
        smoothed_val = last * weight + (1 - weight) * point  # 计算平滑值
        smoothed.append(smoothed_val)                    
        last = smoothed_val                                
    return smoothed

def plot_rewards(rewards,title="learning curve"):
    sns.set()
    plt.figure()  # 创建一个图形实例，方便同时多画几个图
    plt.title(f"{title}")
    plt.xlim(0, len(rewards))  # 设置x轴的范围
    plt.xticks(np.arange(0, len(rewards)+1, 50))
    plt.xlabel('episodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    plt.legend()
    plt.show()

# 6.开始训练

In [None]:
# 获取参数
cfg = Config() 
# 训练
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)
plot_rewards(res_dic['rewards'], title=f"training curve on {cfg.device} of {cfg.algo_name} for {cfg.env_name}")

# 测试
res_dic = test(cfg, env, agent)
plot_rewards(res_dic['rewards'], title=f"testing curve on {cfg.device} of {cfg.algo_name} for {cfg.env_name}")



