In [1]:
import torch
from matplotlib import pyplot as plt
from tqdm import tqdm
import numpy as np
import gym
import collections
import random
from IPython import display
import pygame

# 环境

## 显示所有Gym环境

In [2]:
def showAllGymEnvs():
    for env in gym.envs.registry:
        print(env)

## 离散动作环境（动作取值离散）

### 悬崖漫步环境

In [5]:
class CliffWalkingEnv(gym.Wrapper):
    '''
    ‌悬崖漫步环境‌是一个经典的强化学习环境，
    用于训练智能体在避免掉入悬崖的情况下从起点移动到终点。
    该环境由一个4x12的网格组成，智能体的起点位于左下角，目标点位于右下角。
    智能体可以执行上、下、左、右四个动作（0 up, 1 right, 2 down, 3 left），
    每走一步会获得-1的奖励，如果掉入悬崖或到达终点，则游戏结束。
    悬崖漫步环境的特性
    ‌网格世界‌：环境由4行12列的网格组成，每个网格代表一个状态。
    ‌起始和终止条件‌：智能体从左下角开始，目标是到达右下角。
    掉入悬崖或到达终点时，游戏结束。
    ‌动作空间‌：智能体可以执行上、下、左、右四个动作。
    ‌奖励机制‌：每走一步获得-1的奖励，掉入悬崖获得-100的奖励。
    悬崖漫步环境的应用场景
    悬崖漫步环境常用于测试和训练强化学习算法，
    特别是用于策略迭代和动态规划算法的示例。
    通过这个环境，研究者可以测试不同算法在面对障碍和终点时的表现，
    从而优化智能体的决策过程。
    '''
    name = 'CliffWalking-v0'
    def __init__(self, max_step=None, failure_score=None):
        env = gym.make(self.name, render_mode='rgb_array')
        super().__init__(env)
        self.env = env
        self.step_n = 0
        # self.max_step = 200
        self.max_step = max_step
        self.failure_score = failure_score

    def reset(self, seed=None):
        self.step_n = 0
        if seed:
            state, _ = self.env.reset(seed = seed)
        else:
            state, _ = self.env.reset()
        return state

    def step(self, action):
        # 参数是一个离散值（0或1），表示向左或向右移动。
        state, reward, terminated, truncated, info = self.env.step(action)
        done = terminated or truncated
        self.step_n += 1
        if self.max_step:
            # 限制最大步数
            if self.step_n >= self.max_step:
                done = True
        if self.failure_score:
            # 没坚持到最后，扣分
            if done and self.step_n < self.max_step:
                # reward = -1000
                reward = self.failure_score
        return state, reward, done

    # 打印游戏图像
    def show(self):
        plt.figure(figsize=(3, 3))
        plt.imshow(self.env.render())
        plt.show()

### 悬崖漫步环境（自定义）

In [6]:
class MyCliffWalkingEnv(gym.Env):
    '''
    ‌悬崖漫步环境‌是一个经典的强化学习环境，
    用于训练智能体在避免掉入悬崖的情况下从起点移动到终点。
    该环境由一个4x12的网格组成，智能体的起点位于左下角，目标点位于右下角。
    智能体可以执行上、下、左、右四个动作（0 up, 1 right, 2 down, 3 left），
    每走一步会获得-1的奖励，如果掉入悬崖或到达终点，则游戏结束。
    悬崖漫步环境的特性
    ‌网格世界‌：环境由4行12列的网格组成，每个网格代表一个状态。
    ‌起始和终止条件‌：智能体从左下角开始，目标是到达右下角。
    掉入悬崖或到达终点时，游戏结束。
    ‌动作空间‌：智能体可以执行上、下、左、右四个动作。
    ‌奖励机制‌：每走一步获得-1的奖励，掉入悬崖获得-100的奖励。
    悬崖漫步环境的应用场景
    悬崖漫步环境常用于测试和训练强化学习算法，
    特别是用于策略迭代和动态规划算法的示例。
    通过这个环境，研究者可以测试不同算法在面对障碍和终点时的表现，
    从而优化智能体的决策过程。
    '''
    name = 'MyCliffWalkingEnv'
    action_meaning = ['^', 'v', '<', '>']
    def __init__(self, ncol, nrow, disaster=list(range(37, 47)), end=[47]):
        self.nrow = nrow
        self.ncol = ncol
        self.x = 0  # 记录当前智能体位置的横坐标
        self.y = self.nrow - 1  # 记录当前智能体位置的纵坐标
        self.disaster = disaster
        self.end = end
        self.action_list = []
        self.state_list = []

    def reset(self):  # 回归初始状态（左下角）
        self.x = 0
        self.y = self.nrow - 1
        self.action_list = []
        self.state_list = []
        return self.y * self.ncol + self.x
     
    def step(self, action):  # 外部调用这个函数来改变当前位置
        self.action_list.append(action)
        # 4种动作，change[0]:上，change[1]:下，change[2]:左，change[3]:右。
        # 坐标系原点(0, 0)定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        self.x = min(self.ncol - 1, max(0, self.x + change[action][0]))
        self.y = min(self.nrow - 1, max(0, self.y + change[action][1]))
        next_state = self.y * self.ncol + self.x
        self.state_list.append((self.y, self.x))
        reward = -1
        done = False
        # 下一个位置在悬崖或者目标
        if self.y == self.nrow - 1 and self.x > 0:  
            done = True
            if self.x != self.ncol - 1:
                reward = -100
        return next_state, reward, done

    def show(self):
        print("动作总数：{}".format(len(self.action_list)))
        if len(self.action_list) > 20:
            print("前20个动作和对应的状态：")
        else:
            print("动作和对应的状态：")
        for action, state in list(zip(self.action_list, self.state_list))[:20]:
            print("{}->{}".format(action, state))

### 车杆平衡环境

In [7]:
class CartPoleEnv(gym.Wrapper):
    '''
    ‌Gym中的CartPole环境是离散动作空间‌。在CartPole环境中，
    小车可以通过两个离散动作来控制：向左移动或向右移动。具体来说，动作空间包括：
    ‌0‌：小车向左移动
    ‌1‌：小车向右移动
    这些动作是固定的，施加的力大小也是固定的，
    但速度和位移会根据杆子与竖直方向的角度变化而有所不同。
    此外，CartPole环境的观测值包括：
    ‌x‌：小车位置（范围：[-4.8, 4.8]）
    ‌x˙‌：小车速度（范围：−∞ 到 ∞∞）
    ‌θ‌：杆子角度（范围：[−24度，24度]）
    ‌θ˙‌：杆子顶端速度（范围：−∞ 到 ∞）。
    '''
    name = 'CartPole-v1'
    def __init__(self, max_step=None, failure_score=None):
        env = gym.make(self.name, render_mode='rgb_array')
        super().__init__(env)
        self.env = env
        self.step_n = 0
        # self.max_step = 200
        self.max_step = max_step
        self.failure_score = failure_score

    def reset(self, seed=None):
        self.step_n = 0
        if seed:
            state, _ = self.env.reset(seed = seed)
        else:
            state, _ = self.env.reset()
        return state

    def step(self, action):
        # 参数是一个离散值（0或1），表示向左或向右移动。
        state, reward, terminated, truncated, info = self.env.step(action)
        done = terminated or truncated
        self.step_n += 1
        if self.max_step:
            # 限制最大步数
            if self.step_n >= self.max_step:
                done = True
        if self.failure_score:
            # 没坚持到最后，扣分
            if done and self.step_n < self.max_step:
                # reward = -1000
                reward = self.failure_score
        return state, reward, done

    # 打印游戏图像
    def show(self):
        plt.figure(figsize=(3, 3))
        plt.imshow(self.env.render())
        plt.show()

## 连续动作环境（动作取值连续）

### 倒立摆环境

In [98]:
class PendulumEnv(gym.Wrapper):
    '''
    ‌Gym中的Pendulum环境是连续动作空间。
    智能体需要通过施加一个连续的力矩来控制倒立摆的摆动，使其保持竖直状态。
    Pendulum环境的动作空间和状态空间如下：
    动作空间 (action_space)：一个一维向量，表示施加在摆杆上的扭矩，范围为[-2, 2]（牛顿·米）。
    状态空间 (observation_space)：一个三维向量，包含以下信息：
        theta_dot：摆杆的角速度（弧度/秒），范围为 [-8, 8]。
        cos(theta) 和 sin(theta)：这两个值通常用于表示角度，避免角度值的不连续性。
    其中，theta为摆杆的角度（弧度），范围为 [-π, π]。
    通过训练，智能体需要学习如何通过施加合适的力矩来最小化负奖励，
    从而使倒立摆保持竖直状态。
    注：
    Pendulum-v0环境的动作空间是从-2到2的一个连续值，表示施加在摆杆上的一个扭矩。
    Pendulum-v1环境的动作空间是从-2到2的一个连续值一维向量，表示施加在摆杆上的若干个扭矩。
    '''
    name = 'Pendulum-v1'
    def __init__(self, max_step=200):
        env = gym.make(self.name, render_mode='rgb_array')
        super().__init__(env)
        self.env = env
        self.step_n = 0
        # self.max_step = 200
        self.max_step = max_step

    def reset(self, seed=None):
        if seed:
            state, _ = self.env.reset(seed = seed)
        else:
            state, _ = self.env.reset()
        self.step_n = 0
        return state

    def step(self, action):
        # 参数是从-2到2的一个连续值一维向量，表示施加在摆杆上的若干个扭矩。
        state, reward, terminated, truncated, info = self.env.step(action)        
        done = terminated or truncated
        # 限制最大步数
        self.step_n += 1
        if self.max_step:
            if self.step_n >= self.max_step:
                done = True
        # 对倒立摆环境的奖励进行修改以便训练
        reward = (reward + 8.0) / 8.0  
        return state, reward, done

    # 打印游戏图像
    def show(self):
        plt.figure(figsize=(3, 3))
        plt.imshow(self.env.render())
        plt.show()

# 训练

## 训练结果

### 显示回报

In [7]:
def show_return(return_list, agent, env):
    episodes_list = list(range(len(return_list)))
    plt.plot(episodes_list, return_list)
    plt.xlabel('Episodes')
    plt.ylabel('Returns')
    plt.title('{} on {}'.format(agent.name, env.name))
    plt.show()

### 移动平均

In [8]:
def moving_average(li, window_size):
    cumulative_sum = np.cumsum(np.insert(li, 0, 0)) 
    middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
    r = np.arange(1, window_size-1, 2)
    begin = np.cumsum(li[:window_size-1])[::2] / r
    end = (np.cumsum(li[:-window_size:-1])[::2] / r)[::-1]
    return np.concatenate((begin, middle, end))

# 经验回放池

In [10]:
class ReplayBuffer:
    ''' 经验回放池 '''
    def __init__(self, buffer_size):
        # deque代表“double-ended queue”，即双端队列
        self.buffer = collections.deque(maxlen=buffer_size)  # 队列，先进先出
        
    def add(self, state, action, reward, next_state, done):  # 将数据加入buffer
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):  # 从buffer中采样数据，数量为batch_size
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return state, action, reward, next_state, done

    def size(self):  # 目前buffer中数据的数量
        return len(self.buffer)

# 计算广义优势估计(GAE)

In [None]:
# Generalized Advantage Estimation
def compute_gae(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)