In [1]:
import numpy as np
import pandas as pd

In [5]:
class SarsaLambdaTable:
    def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9, trace_decay=0.9):
        self.actions = actions
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon = e_greedy
        self.table = pd.DataFrame(columns=self.actions, dtype=np.float64)
        self.lambda_ = trace_decay
        self.eligibility_trace = self.table.copy()

    def check_state_exist(self, state):
        if state not in self.table.index:
            s = pd.Series(
                [0] * len(self.actions),
                index=self.table.columns,
                name=state
            )
            self.table = self.table.append(s)
            self.eligibility_trace = self.eligibility_trace.append(s)
    
    def choose_action(self, observation):
        # q_table 表中不存在就添加到表中
        self.check_state_exist(observation)
        # action selection
        if np.random.uniform() < self.epsilon:
            # 提取要操作的表中的某一行
            state_action = self.table.loc[observation, :]
            # 获取数据最大的那列的名字
            action = np.random.choice(
                state_action[state_action == np.max(state_action)].index
            )
        else:
            action = np.random.choice(self.actions)
        return action
    
    def learn(self, s, a, r, s_, a_):
        self.check_state_exist(s_)
        predict = self.table.loc[s, a]
        if s_ != 'terminal':
            target = r + self.gamma * self.table.loc[s_, a_]
        else:
            target = r
        error = target - predict

        #Method 1
        # self.eligibility_trace.loc[s, a] += 1

        #Method 2
        self.eligibility_trace.loc[s, :] = 0
        self.eligibility_trace.loc[s, a] = 1 

        # Q update
        self.table += self.lr * error * self.eligibility_trace

        # decay eligibility trace after update
        self.eligibility_trace *= self.gamma * self.lambda_
    
    def get_table(self):
        return self.table

In [6]:
def update(env, RL):
    def __update():
        for episode in range(30):
            # initial observation
            observation = env.reset()
            action = RL.choose_action(str(observation))
            while True:
                # fresh env
                env.render()
                
                # RL take action and get next observation and reward
                observation_, reward, done = env.step(action)

                # RL choose action based on observation
                action_ = RL.choose_action(str(observation_))
                
                # RL learn from this transition
                RL.learn(str(observation), action, reward, str(observation_), action_)
                
                # swap observation
                observation = observation_

                action = action_
                
                if done:
                    break
        
        print(RL.get_table())
        # end of game
        print('game over')
        env.destroy()
    return __update

In [7]:
from maze_env import Maze
env = Maze()
RL = SarsaLambdaTable(actions=list(range(env.n_actions)))
env.after(20, update(env, RL))
env.mainloop()