## Q-learning with different exploration strategies

Authors: [johnjim0816](https://github.com/johnjim0816)


## 1、定义算法

In [10]:
import numpy as np
import math
from collections import defaultdict

class QLearning(object):
    """Tabular Q-learning agent with a selectable exploration strategy.

    The Q table maps ``str(state)`` to a NumPy vector holding one value per
    action. Supported ``explore_type`` values are ``'epsilon_greedy'``,
    ``'boltzmann'``, ``'ucb'``, ``'softmax'`` and ``'thompson'``; any other
    value makes ``sample_action`` / ``predict_action`` raise
    ``NotImplementedError``.
    """

    def __init__(self, cfg):
        """Copy the hyper-parameters from ``cfg`` and create an empty Q table.

        ``cfg`` must provide ``explore_type``, ``n_actions``, ``lr``,
        ``gamma``, ``epsilon_start``, ``epsilon_end`` and ``epsilon_decay``.
        """
        self.explore_type = cfg.explore_type  # exploration strategy name
        self.n_actions = cfg.n_actions
        self.lr = cfg.lr  # learning rate
        self.gamma = cfg.gamma  # discount factor
        self.epsilon = cfg.epsilon_start
        self.sample_count = 0  # number of sample_action() calls so far
        self.epsilon_start = cfg.epsilon_start
        self.epsilon_end = cfg.epsilon_end
        self.epsilon_decay = cfg.epsilon_decay
        # TODO: the Q table could be a dense array instead,
        # e.g. np.zeros((n_states, n_actions)).
        # Unseen states get an all-zero action-value vector on first access.
        self.Q_table = defaultdict(lambda: np.zeros(self.n_actions))

    def _softmax_probs(self, state):
        """Return softmax(Q(state, .) / epsilon) as a probability vector.

        The maximum logit is subtracted before exponentiating. This leaves
        the probabilities mathematically unchanged but keeps ``np.exp`` from
        overflowing to ``inf`` (which would yield NaN probabilities and make
        ``np.random.choice`` raise) when Q-values are large relative to
        ``epsilon``.
        """
        logits = np.asarray(self.Q_table[str(state)], dtype=float) / self.epsilon
        weights = np.exp(logits - np.max(logits))
        return weights / np.sum(weights)

    def sample_action(self, state):
        """Sample an action for ``state``; used during training.

        Increments ``sample_count`` on every call. ``epsilon`` is only decayed
        in the ``'epsilon_greedy'`` branch, so the other strategies keep using
        the value set in ``__init__``.
        """
        self.sample_count += 1
        if self.explore_type == 'epsilon_greedy':
            # Exponential decay from epsilon_start towards epsilon_end.
            self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                math.exp(-1. * self.sample_count / self.epsilon_decay)
            if np.random.uniform(0, 1) > self.epsilon:
                action = np.argmax(self.Q_table[str(state)])  # exploit: best known action
            else:
                action = np.random.choice(self.n_actions)  # explore: uniform random action
            return action
        elif self.explore_type == 'boltzmann':
            # epsilon plays the role of the temperature here.
            return np.random.choice(self.n_actions, p=self._softmax_probs(state))
        elif self.explore_type == 'ucb':
            if self.sample_count < self.n_actions:
                action = self.sample_count
            else:
                # NOTE(review): the bonus is a scalar added to every action's
                # value, so it cannot change the argmax; a per-action visit
                # count would be needed for a real UCB bonus.
                action = np.argmax(self.Q_table[str(state)] + self.epsilon * np.sqrt(np.log(self.sample_count) / self.sample_count))
            return action
        elif self.explore_type == 'softmax':
            return np.random.choice(self.n_actions, p=self._softmax_probs(state))
        elif self.explore_type == 'thompson':
            # NOTE(review): np.random.beta needs strictly positive shape
            # parameters, so this raises once any Q-value is <= -1 -- confirm
            # the environments only produce Q-values above that.
            action = np.argmax(np.random.beta(self.Q_table[str(state)] + 1, 1))
            return action
        else:
            raise NotImplementedError

    def predict_action(self, state):
        """Choose an action for ``state``; used during evaluation.

        ``'epsilon_greedy'`` and ``'ucb'`` act greedily on the Q table; the
        remaining strategies still sample, exactly as in ``sample_action``.
        Neither ``sample_count`` nor ``epsilon`` is modified.
        """
        if self.explore_type in ('epsilon_greedy', 'ucb'):
            return np.argmax(self.Q_table[str(state)])
        elif self.explore_type in ('boltzmann', 'softmax'):
            return np.random.choice(self.n_actions, p=self._softmax_probs(state))
        elif self.explore_type == 'thompson':
            return np.argmax(np.random.beta(self.Q_table[str(state)] + 1, 1))
        else:
            raise NotImplementedError

    def update(self, state, action, reward, next_state, terminated):
        """Apply one Q-learning update for the given transition.

        The target is ``reward`` alone when ``terminated`` is true, otherwise
        ``reward + gamma * max_a Q(next_state, a)``.
        """
        Q_predict = self.Q_table[str(state)][action]
        if terminated:  # no bootstrapping from a terminal state
            Q_target = reward
        else:
            Q_target = reward + self.gamma * np.max(self.Q_table[str(next_state)])
        self.Q_table[str(state)][action] += self.lr * (Q_target - Q_predict)

## 2、定义训练

In [11]:
def train(cfg,env,agent):
    """Train ``agent`` on ``env`` for ``cfg.train_eps`` episodes.

    Each episode resets the environment with ``cfg.seed`` and then alternates
    ``agent.sample_action`` / ``env.step`` / ``agent.update`` until the
    environment reports termination or, when ``cfg.max_steps`` is set, until
    that many steps have been taken. Hitting the step limit is not reported
    to the agent as a terminal transition.

    Expects ``env.step`` to return a 4-tuple
    ``(next_state, reward, terminated, info)``.

    Returns:
        dict with ``"rewards"`` (total reward per episode) and ``"steps"``
        (number of steps per episode).
    """
    print('开始训练！')
    print(f'环境:{cfg.env_name}, 算法:{cfg.algo_name}, 设备:{cfg.device}')
    # A missing or None max_steps keeps the original unbounded behaviour.
    max_steps = getattr(cfg, 'max_steps', None)
    rewards = []  # total reward of each episode
    steps = []  # length of each episode
    for i_ep in range(cfg.train_eps):
        ep_reward = 0
        ep_step = 0
        state = env.reset(seed=cfg.seed)  # start a new episode
        while max_steps is None or ep_step < max_steps:
            action = agent.sample_action(state)
            next_state, reward, terminated, info = env.step(action)
            agent.update(state, action, reward, next_state, terminated)
            state = next_state
            ep_reward += reward
            ep_step += 1
            if terminated:
                break
        rewards.append(ep_reward)
        steps.append(ep_step)
        if (i_ep+1)%20==0:
            print(f"回合：{i_ep+1}/{cfg.train_eps}，奖励：{ep_reward:.1f}，Epsilon：{agent.epsilon:.3f}")
    print('完成训练！')
    return {"rewards":rewards, "steps":steps}  # TODO: could also report the episode at which training converged
def test(cfg,env,agent):
    print('开始测试！')
    print(f'环境：{cfg.env_name}, 算法：{cfg.algo_name}, 设备：{cfg.device}')
    rewards = []  # 记录所有回合的奖励
    for i_ep in range(cfg.test_eps):
        ep_reward = 0  # 记录每个episode的reward
        state = env.reset(seed=cfg.seed)  # 重置环境, 重新开一局（即开始新的一个回合）
        while True:
            action = agent.predict_action(state)  # 根据算法选择一个动作
            next_state, reward, terminated, info = env.step(action)  # 与环境进行一个交互
            state = next_state  # 更新状态
            ep_reward += reward
            if terminated:
                break
        rewards.append(ep_reward)
        print(f"回合数：{i_ep+1}/{cfg.test_eps}, 奖励：{ep_reward:.1f}")
    print('完成测试！')
    return {"rewards":rewards}

## 3、定义环境

In [12]:

# Build the environments compared in this notebook and index them by name.
import sys,os
# os.path.abspath('') resolves to the current working directory; its parent is
# appended to sys.path so that the local `envs` package can be imported below.
curr_path = os.path.abspath('')
parent_path = os.path.dirname(curr_path)
sys.path.append(parent_path)
from gym.envs.toy_text import FrozenLakeEnv
# NOTE(review): the recorded output of this cell is an ImportError for
# 'discrete' from gym.envs.toy_text -- presumably raised while importing
# envs.simple_grid; confirm which gym version that module expects.
from envs.simple_grid import DrunkenWalkEnv
lake_env = FrozenLakeEnv(is_slippery=False)
alley_env = DrunkenWalkEnv(map_name='theAlley')
walk_in_the_park_env = DrunkenWalkEnv(map_name='walkInThePark')

# Environment name -> environment instance; iterated by the experiment cells.
env_dict = {
    'theAlley': alley_env ,
    'walkInThePark': walk_in_the_park_env,
    'FrozenLakeEasy-v0': lake_env,
}



ImportError: cannot import name 'discrete' from 'gym.envs.toy_text' (/Users/jj/opt/anaconda3/envs/easyrl/lib/python3.7/site-packages/gym/envs/toy_text/__init__.py)

## 4、设置参数

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import torch
class Config:
    """Hyper-parameters and run settings shared by training and evaluation."""

    def __init__(self):
        cpu_device = torch.device('cpu')
        # Populate the instance attributes in one go; insertion order is kept.
        vars(self).update(
            env_name='theAlley',            # environment name
            algo_name='Q-Learning',         # algorithm name
            explore_type='epsilon_greedy',  # exploration strategy
            train_eps=400,                  # number of training episodes
            test_eps=20,                    # number of evaluation episodes
            max_steps=200,                  # maximum steps per episode
            epsilon_start=0.95,             # initial epsilon
            epsilon_end=0.01,               # final epsilon
            epsilon_decay=300,              # epsilon decay constant
            gamma=0.9,                      # discount factor
            lr=0.1,                         # learning rate
            seed=1,                         # random seed
            device=cpu_device,
        )

def smooth(data, weight=0.9):
    """Return an exponentially smoothed copy of ``data`` for plotting.

    Each output value is ``previous * weight + (1 - weight) * point``, where
    ``previous`` starts at ``data[0]``.

    Args:
        data: sequence of numbers (list or array); may be empty.
        weight: smoothing factor; higher values give a smoother curve.

    Returns:
        list with one smoothed value per input point (empty for empty input).
    """
    # len() rather than truthiness so NumPy arrays are accepted as well.
    if len(data) == 0:
        return []
    smoothed = []
    last = data[0]  # seed the running average with the first point
    for point in data:
        last = last * weight + (1 - weight) * point
        smoothed.append(last)
    return smoothed

def plot_rewards(rewards,title="learning curve"):
    """Plot the per-episode rewards and their smoothed curve on a new figure.

    Args:
        rewards: sequence of per-episode rewards.
        title: figure title.
    """
    sns.set()
    plt.figure()  # new figure so that several plots can coexist
    plt.title(f"{title}")
    # xlim takes only (left, right); the stray third positional argument of
    # the previous call is rejected by current matplotlib.
    plt.xlim(0, len(rewards))
    plt.xlabel('episodes')
    plt.plot(rewards, label='rewards')
    plt.plot(smooth(rewards), label='smoothed')
    plt.legend()
def plot_table(table,title="Q table"):
    """Draw ``table`` as a grayscale image with a colorbar on a new figure."""
    # TODO: turn the Q-table plot into a plain function rather than a class;
    # its inputs and outputs still need to be decided.
    sns.set()
    plt.figure()  # fresh figure so several plots can be shown side by side
    # Axis labels first, then the title.
    plt.xlabel('state')
    plt.ylabel('action')
    plt.title(f"{title}")
    # The image must exist before a colorbar can be attached to it.
    plt.imshow(table, cmap='gray')
    plt.colorbar()

final_res = [] # collects the final result row of every environment
cfg = Config() 
# NOTE(review): Console is not defined or imported in the visible code --
# presumably rich.console.Console; confirm it is imported before this line.
cs = Console() #TODO unclear why this is needed; a plain print would do

## 5、探索策略研究

### 5.1、softmax
之后的探索策略都一样

In [None]:

# Run Q-learning with softmax exploration on every environment and append one
# summary row [env name, policy, mean step count, mean reward] to final_res.
cfg.explore_type = 'softmax'

for env_name, env in env_dict.items():
    print('--'*45)
    print(f'EnvName = {env_name}')
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    print(f'状态数：{n_states}, 动作数：{n_actions}')
    cfg.env_name = env_name
    cfg.n_states = n_states
    cfg.n_actions = n_actions
    agent = QLearning(cfg)
    # train() takes (cfg, env, agent) and returns a dict, not a list.
    final_play_res = train(cfg, env, agent)
    ep_rewards = final_play_res['rewards']
    # Step counts are only available when train() reports them.
    ep_steps = final_play_res.get('steps', [])
    mean_reward = sum(ep_rewards) / len(ep_rewards) if len(ep_rewards) else float('nan')
    mean_steps = sum(ep_steps) / len(ep_steps) if len(ep_steps) else float('nan')
    final_res_print_str = f'Method: {cfg.explore_type}, MeanStepCnt: {mean_steps:.3f}, MeanReward: {mean_reward:.3f}'
    final_res.append([env_name, cfg.explore_type, mean_steps, mean_reward])
    print(final_res_print_str)

## 6、总结
这里写一下文字说明

In [None]:
import pandas as pd

# Tabulate the collected results; every row of final_res must hold four values
# matching the column names below. Rows are sorted by environment name.
pd.DataFrame(final_res, columns=['envName', 'policy', 'stepCount', 'Rewards']).sort_values(by='envName')