In [None]:
import numpy as np
import turtle
import time
import json
import gym
from Src.PolicyHelper.policy_helper_client import PolicyHelperClient

## Agent Using Double Sarsa Learning

In [None]:
# Number of episodes over which SarsaAgent.sample linearly decays its
# exploration rate to zero; past this episode index it acts greedily.
EPISODE = 10000

class SarsaAgent(object):
    """Tabular Double Sarsa agent with decaying epsilon-greedy exploration.

    Two Q-tables, ``Q1`` and ``Q2``, each of shape ``(obs_n, act_n)``, are
    maintained. Actions are ranked by ``Q1 + Q2``; every learning step
    updates one table (chosen at random) using the other table's estimate
    of the next state-action value.
    """

    def __init__(self,
                 obs_n,
                 act_n,
                 learning_rate=0.01,
                 gamma=0.9,
                 e_greed=0.1):
        """Create an agent with zero-initialised Q-tables.

        Args:
            obs_n: number of discrete observations (rows of each Q-table).
            act_n: number of discrete actions (columns of each Q-table).
            learning_rate: step size applied to the TD error.
            gamma: discount factor applied to the bootstrapped value.
            e_greed: initial probability of taking a random action.
        """
        self.act_n = act_n  # number of selectable actions
        self.lr = learning_rate  # step size of each Q update
        self.gamma = gamma  # discount factor for future reward
        self.epsilon = e_greed  # initial exploration probability
        self.Q1 = np.zeros((obs_n, act_n))
        self.Q2 = np.zeros((obs_n, act_n))

    def sample(self, obs, episode):
        """Pick an action for ``obs`` with exploration.

        The exploration rate starts at ``self.epsilon`` and shrinks linearly
        with ``episode`` until it reaches zero at the module-level
        ``EPISODE``; for later episodes it is zero.

        Args:
            obs: index of the current observation.
            episode: index of the current training episode.

        Returns:
            The greedy action with probability ``1 - eps``, otherwise an
            action drawn uniformly from ``range(self.act_n)``.
        """
        eps = self.epsilon * (EPISODE - episode) / EPISODE if episode <= EPISODE else 0
        if np.random.uniform(0, 1) < (1.0 - eps):
            # Exploit: follow the current Q estimates.
            action = self.predict(obs)
        else:
            # Explore: pick any action uniformly at random.
            action = np.random.choice(self.act_n)
        return action

    def predict(self, obs):
        """Return a greedy action for ``obs`` with respect to ``Q1 + Q2``.

        When several actions share the maximum value, one of them is chosen
        uniformly at random.
        """
        combined = self.Q1[obs, :] + self.Q2[obs, :]
        best_actions = np.where(combined == np.max(combined))[0]
        return np.random.choice(best_actions)

    def learn(self, obs, action, reward, next_obs, next_action, done):
        """Apply one on-policy Double Sarsa update.

        Args:
            obs: observation before the interaction, s_t.
            action: action taken in ``obs``, a_t.
            reward: reward received for that action.
            next_obs: observation after the interaction, s_{t+1}.
            next_action: action the behaviour policy selected for
                ``next_obs``, a_{t+1}.
            done: whether the episode ended with this step.
        """
        # A fair coin decides which table is updated; the other table
        # supplies the bootstrapped value of (next_obs, next_action).
        if np.random.uniform(0, 1) < 0.5:
            updated, other = self.Q1, self.Q2
        else:
            updated, other = self.Q2, self.Q1

        predict_Q = updated[obs, action]
        if done:
            # Terminal transition: there is no next state to bootstrap from.
            target_Q = reward
        else:
            target_Q = reward + self.gamma * other[next_obs, next_action]
        updated[obs, action] += self.lr * (target_Q - predict_Q)

    def save(self, npy_file='./q_table_sarsa.npy'):
        """Write both Q-tables to ``npy_file``.

        ``Q1`` is written first and ``Q2`` second into the same file, which
        is the order ``restore`` reads them back in.

        Args:
            npy_file: destination path; the default matches the default of
                ``restore``.
        """
        with open(npy_file, 'wb') as fo:
            np.save(fo, self.Q1)
            np.save(fo, self.Q2)
        print(npy_file + ' saved.')

    def restore(self, npy_file='./q_table_sarsa.npy'):
        """Load both Q-tables from a file previously written by ``save``.

        Args:
            npy_file: path of the file holding ``Q1`` followed by ``Q2``.
        """
        with open(npy_file, 'rb') as fo:
            self.Q1 = np.load(fo)
            self.Q2 = np.load(fo)
        print(npy_file + ' loaded.')

In [None]:
def run_episode(env, agent, episode=0, render=False):
    """Play one training episode, updating the agent after every step.

    Args:
        env: environment exposing ``reset()``, ``step(action)`` returning a
            4-tuple ``(obs, reward, done, info)``, and ``render()``.
        agent: object exposing ``sample(obs, episode)`` and
            ``learn(obs, action, reward, next_obs, next_action, done)``.
        episode: index of the current episode, forwarded to ``agent.sample``.
        render: when true, ``env.render()`` is called after every step.

    Returns:
        ``(total_reward, total_steps)`` for the episode. The total uses the
        shaped reward, i.e. a terminal step with reward 0 counts as -1.
    """
    state = env.reset()  # start a fresh episode
    act = agent.sample(state, episode)  # first action, with exploration
    episode_return, step_count = 0, 0

    finished = False
    while not finished:
        new_state, reward, finished, _ = env.step(act)
        # An episode that ends without any reward is penalised.
        if finished and reward == 0:
            reward = -1
        # Sarsa needs the action that will actually be taken next.
        new_act = agent.sample(new_state, episode)
        agent.learn(state, act, reward, new_state, new_act, finished)

        state, act = new_state, new_act
        episode_return += reward
        step_count += 1
        if render:
            env.render()  # draw the new frame
    return episode_return, step_count


def test_episode(env, agent):
    total_reward = 0
    obs = env.reset()
    while True:
        action = agent.predict(obs)  # greedy
        next_obs, reward, done, _ = env.step(action)
        total_reward += reward
        obs = next_obs
        # time.sleep(0.5)
        # env.render()
        if done:
            # print('test reward = %.1f' % (total_reward))
            break
    return total_reward

# Number of greedy evaluation episodes averaged once training has finished.
TEST_EPISODE = 10000
# Interval, in episodes, at which the training loops were meant to render a
# frame; rendering is currently switched off in those loops.
RENDER_EPISODE = 50000
def main():
    """Train Double Sarsa on slippery 8x8 FrozenLake, then evaluate it.

    Training runs for at most 100000 episodes and stops early after 300
    consecutive episodes whose return equals 1. The greedy policy is then
    averaged over ``TEST_EPISODE`` evaluation episodes and handed to
    ``PolicyHelperClient.show``. The environment is closed even if training
    or evaluation is interrupted.
    """
    env = gym.make("FrozenLake-v1", desc=None, map_name="8x8", is_slippery=True, max_episode_steps=2000)  # 0 left, 1 down, 2 right, 3 up
    # env = FrozenLakeWapper(env)

    try:
        agent = SarsaAgent(
            obs_n=env.observation_space.n,
            act_n=env.action_space.n,
            learning_rate=0.3,
            gamma=0.99,
            e_greed=0.1)

        # Rendering stays off for the whole training run.
        is_render = False
        win_count = 0  # length of the current streak of winning episodes
        for episode in range(100000):
            ep_reward, ep_steps = run_episode(env, agent, episode, is_render)
            print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps, ep_reward))
            # Any non-winning episode resets the streak.
            win_count = win_count + 1 if ep_reward == 1 else 0
            if win_count >= 300:
                break

        # Training is over: measure the greedy policy.
        total_reward = sum(test_episode(env, agent) for _ in range(TEST_EPISODE))
        print(f"average reward: {total_reward / TEST_EPISODE}")

        # Greedy action per state with respect to the combined tables.
        policy = np.argmax(agent.Q1 + agent.Q2, axis=1)
        PolicyHelperClient.show(env, policy, file_suffix="_Sarsa8x8", snapshot_folder="./Doc/Snapshot/")
    finally:
        env.close()

# Run the 8x8 experiment only when this code executes as the top-level module.
if __name__ == "__main__":
    main()


In [None]:
def main4():
    """Train Double Sarsa on slippery 4x4 FrozenLake, then evaluate it.

    Training runs for at most 100000 episodes and stops early after 30
    consecutive episodes whose return equals 1. The greedy policy is then
    averaged over ``TEST_EPISODE`` evaluation episodes and handed to
    ``PolicyHelperClient.show``. The environment is closed even if training
    or evaluation is interrupted.
    """
    env = gym.make("FrozenLake-v1", desc=None, map_name="4x4", is_slippery=True, max_episode_steps=2000)  # 0 left, 1 down, 2 right, 3 up
    # env = FrozenLakeWapper(env)

    try:
        agent = SarsaAgent(
            obs_n=env.observation_space.n,
            act_n=env.action_space.n,
            learning_rate=0.1,
            gamma=0.99,
            e_greed=0.15)

        # Rendering stays off for the whole training run.
        is_render = False
        win_count = 0  # length of the current streak of winning episodes
        for episode in range(100000):
            ep_reward, ep_steps = run_episode(env, agent, episode, is_render)
            print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps, ep_reward))
            # Any non-winning episode resets the streak.
            win_count = win_count + 1 if ep_reward == 1 else 0
            if win_count >= 30:
                break

        # Training is over: measure the greedy policy.
        total_reward = sum(test_episode(env, agent) for _ in range(TEST_EPISODE))
        print(f"average reward: {total_reward / TEST_EPISODE}")

        # Greedy action per state with respect to the combined tables.
        policy = np.argmax(agent.Q1 + agent.Q2, axis=1)
        PolicyHelperClient.show(env, policy, file_suffix="_Sarsa4x4", snapshot_folder="./Doc/Snapshot/")
    finally:
        env.close()

# Start the 4x4 experiment as soon as this cell is executed.
main4()