In [45]:
# maze_env.py

import numpy as np
import time
import sys
if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

# %matplotlib inline
# %config InlineBackend.figure_format = 'retina'

UNIT = 40   # side length of one grid cell, in pixels
MAZE_H = 4  # maze height, in grid cells (rows)
MAZE_W = 4  # maze width, in grid cells (columns)


class Maze(tk.Tk, object):
    """Tkinter grid-world used as a reinforcement-learning environment.

    The window shows a MAZE_W x MAZE_H grid of UNIT-pixel cells containing:

    * a red rectangle (the agent), which starts in the top-left cell,
    * two black rectangles ("hell" cells) that end the episode with reward -1,
    * a yellow oval (the goal) that ends the episode with reward +1.

    States are reported as the canvas coordinates ``[x0, y0, x1, y1]`` of the
    agent rectangle, or the string ``'terminal'`` once an episode has ended.
    The literal pixel offsets (20 for a cell centre, 15 for half the size of
    a drawn item) match a UNIT of 40.
    """

    def __init__(self):
        super(Maze, self).__init__()
        # Labels are listed in the index order step() actually interprets:
        # 0 = up, 1 = down, 2 = right, 3 = left.
        self.action_space = ['u', 'd', 'r', 'l']
        self.n_actions = len(self.action_space)
        self.title('maze')
        # Tk geometry strings are "<width>x<height>".
        self.geometry('{0}x{1}'.format(MAZE_W * UNIT, MAZE_H * UNIT))
        self._build_maze()

    def _create_agent(self):
        """Draw the red agent rectangle in the top-left cell; return its canvas id."""
        origin = np.array([20, 20])  # centre of the top-left cell
        return self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')

    def _build_maze(self):
        """Create the canvas and draw the grid, both hell cells, the goal and the agent."""
        self.canvas = tk.Canvas(self, bg='white',
                                height=MAZE_H * UNIT,
                                width=MAZE_W * UNIT)

        # Vertical grid lines: one per column, spanning the full canvas height.
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        # Horizontal grid lines: one per row, spanning the full canvas width.
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)

        # Centre of the top-left cell; other items are placed relative to it.
        origin = np.array([20, 20])

        # First hell cell: two columns right, one row down.
        hell1_center = origin + np.array([UNIT * 2, UNIT])
        self.hell1 = self.canvas.create_rectangle(
            hell1_center[0] - 15, hell1_center[1] - 15,
            hell1_center[0] + 15, hell1_center[1] + 15,
            fill='black')
        # Second hell cell: one column right, two rows down.
        hell2_center = origin + np.array([UNIT, UNIT * 2])
        self.hell2 = self.canvas.create_rectangle(
            hell2_center[0] - 15, hell2_center[1] - 15,
            hell2_center[0] + 15, hell2_center[1] + 15,
            fill='black')

        # Goal: two columns right and two rows down.
        oval_center = origin + UNIT * 2
        self.oval = self.canvas.create_oval(
            oval_center[0] - 15, oval_center[1] - 15,
            oval_center[0] + 15, oval_center[1] + 15,
            fill='yellow')

        # Agent is created last so it is drawn on top of the other items.
        self.rect = self._create_agent()

        self.canvas.pack()

    def reset(self):
        """Move the agent back to the top-left cell and return its coordinates.

        Returns:
            list: ``[x0, y0, x1, y1]`` canvas coordinates of the agent.
        """
        self.update()
        time.sleep(0.5)  # brief pause so the end of the previous episode stays visible
        self.canvas.delete(self.rect)
        self.rect = self._create_agent()
        return self.canvas.coords(self.rect)

    def step(self, action):
        """Apply one action and report the outcome.

        Args:
            action: 0 = up, 1 = down, 2 = right, 3 = left. A move that would
                leave the grid (or any other value) leaves the agent in place.

        Returns:
            tuple: ``(s_, reward, done)`` where ``s_`` is the agent's new
            coordinates, or ``'terminal'`` when the goal (reward 1) or a hell
            cell (reward -1) was reached; otherwise reward is 0 and done False.
        """
        s = self.canvas.coords(self.rect)
        base_action = np.array([0, 0])
        if action == 0:   # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:   # down
            if s[1] < (MAZE_H - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 2:   # right
            if s[0] < (MAZE_W - 1) * UNIT:
                base_action[0] += UNIT
        elif action == 3:   # left
            if s[0] > UNIT:
                base_action[0] -= UNIT

        self.canvas.move(self.rect, base_action[0], base_action[1])  # move agent

        s_ = self.canvas.coords(self.rect)  # next state

        # Reward function: compare the agent's box with the goal / hell boxes.
        if s_ == self.canvas.coords(self.oval):
            reward = 1
            done = True
            s_ = 'terminal'
        elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]:
            reward = -1
            done = True
            s_ = 'terminal'
        else:
            reward = 0
            done = False

        return s_, reward, done

    def render(self):
        """Pause briefly, then process pending Tk events to redraw the window."""
        time.sleep(0.1)
        self.update()


# def update():
#     for t in range(10):
#         s = env.reset()
#         while True:
#             env.render()
#             a = 1
#             s, r, done = env.step(a)
#             if done:
#                 break

# if __name__ == '__main__':
#     env = Maze()
#     env.after(100, update)
#     env.mainloop()

# RL_brain.py

import numpy as np
import pandas as pd


class QLearningTable:
    """Tabular Q-learning agent backed by a pandas DataFrame.

    Rows of ``q_table`` are states (added lazily the first time a state is
    seen), columns are the entries of ``actions``, and cells hold Q-values.

    Args:
        actions: list of action identifiers, used as the table's columns.
        learning_rate: step size applied to each Q-value update.
        reward_decay: discount factor (gamma) applied to the next state's value.
        e_greedy: probability of picking the greedy action in
            ``choose_action``; otherwise a random action is taken.
    """

    def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
        self.actions = actions  # a list
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon = e_greedy
        # Starts with no rows; states are appended by check_state_exist().
        self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64)

    def choose_action(self, observation):
        """Pick an action for ``observation`` using an epsilon-greedy policy.

        Args:
            observation: state label (a row label of ``q_table``).

        Returns:
            One of ``self.actions``.
        """
        self.check_state_exist(observation)

        if np.random.uniform() < self.epsilon:
            # Greedy branch: take the action with the highest Q-value.
            state_action = self.q_table.loc[observation, :]
            # Several actions may share the maximum value; shuffling first
            # makes idxmax() break such ties randomly instead of always
            # returning the first column.
            state_action = state_action.reindex(np.random.permutation(state_action.index))
            action = state_action.idxmax()
        else:
            # Exploration branch: pick uniformly among all actions.
            action = np.random.choice(self.actions)
        return action

    def learn(self, s, a, r, s_):
        """Update Q(s, a) from one observed transition.

        Args:
            s: state label before the action.
            a: action taken (a column label of ``q_table``).
            r: reward received.
            s_: next state label, or ``'terminal'`` when the episode ended.
        """
        # print() with a single argument behaves the same on Python 2 and 3.
        print("s: {}    a: {}    r: {}    s_: {}".format(s, a, r, s_))
        # Register both states so the lookups below cannot raise KeyError,
        # even if learn() is called for a state choose_action() never saw.
        self.check_state_exist(s)
        self.check_state_exist(s_)
        q_predict = self.q_table.loc[s, a]
        if s_ != 'terminal':
            # Non-terminal next state: bootstrap from its best Q-value.
            q_target = r + self.gamma * self.q_table.loc[s_, :].max()
        else:
            # Terminal next state: no future value to add.
            q_target = r
        self.q_table.loc[s, a] += self.lr * (q_target - q_predict)

    def check_state_exist(self, state):
        """Add an all-zero row for ``state`` if the table does not have one yet."""
        if state not in self.q_table.index:
            # Setting a missing label through .loc appends the row in place.
            # Float zeros keep the columns float64 so later updates need no
            # dtype change.
            self.q_table.loc[state] = [0.0] * len(self.actions)
            
# run_this.py

# from maze_env import Maze
# from RL_brain import QLearningTable


def update(n_episodes=6):
    """Run the training loop, then print the learned table and close the window.

    Relies on the module-level ``env`` (the maze environment) and ``RL``
    (the Q-learning agent) created in the ``__main__`` section.

    Args:
        n_episodes: number of episodes to run; each episode lasts until
            ``env.step`` reports ``done``.
    """
    for episode in range(n_episodes):
        # Start the episode from the environment's initial observation.
        observation = env.reset()

        while True:
            # Refresh the visualisation.
            env.render()

            # States are stringified so they can be used as q_table row labels.
            action = RL.choose_action(str(observation))

            # Apply the action; the environment returns the next observation,
            # the reward, and whether the episode has ended.
            observation_, reward, done = env.step(action)

            # Learn from the transition (state, action, reward, next state).
            RL.learn(str(observation), action, reward, str(observation_))

            # Carry the next observation into the following iteration.
            observation = observation_

            # The episode is over once the environment reports done.
            if done:
                break

    # End of training: report the result and close the window.
    print('game over')
    # print() with a single argument behaves the same on Python 2 and 3.
    print("q_table: {}".format(RL.q_table))
    env.destroy()

if __name__ == "__main__":
    # Build the environment first; the agent needs its action count.
    env = Maze()
    RL = QLearningTable(
        actions=list(range(env.n_actions)),
    )

    # Schedule the training loop 100 ms after the Tk event loop starts,
    # then hand control to Tk.
    env.after(100, update)
    env.mainloop()

s: [5.0, 5.0, 35.0, 35.0]    a: 0    r: 0    s_: [5.0, 5.0, 35.0, 35.0]
s: [5.0, 5.0, 35.0, 35.0]    a: 2    r: 0    s_: [45.0, 5.0, 75.0, 35.0]
s: [45.0, 5.0, 75.0, 35.0]    a: 2    r: 0    s_: [85.0, 5.0, 115.0, 35.0]
s: [85.0, 5.0, 115.0, 35.0]    a: 0    r: 0    s_: [85.0, 5.0, 115.0, 35.0]
s: [85.0, 5.0, 115.0, 35.0]    a: 0    r: 0    s_: [85.0, 5.0, 115.0, 35.0]
s: [85.0, 5.0, 115.0, 35.0]    a: 3    r: 0    s_: [45.0, 5.0, 75.0, 35.0]
s: [45.0, 5.0, 75.0, 35.0]    a: 0    r: 0    s_: [45.0, 5.0, 75.0, 35.0]
s: [45.0, 5.0, 75.0, 35.0]    a: 0    r: 0    s_: [45.0, 5.0, 75.0, 35.0]
s: [45.0, 5.0, 75.0, 35.0]    a: 3    r: 0    s_: [5.0, 5.0, 35.0, 35.0]
s: [5.0, 5.0, 35.0, 35.0]    a: 0    r: 0    s_: [5.0, 5.0, 35.0, 35.0]
s: [5.0, 5.0, 35.0, 35.0]    a: 1    r: 0    s_: [5.0, 45.0, 35.0, 75.0]
s: [5.0, 45.0, 35.0, 75.0]    a: 0    r: 0    s_: [5.0, 5.0, 35.0, 35.0]
s: [5.0, 5.0, 35.0, 35.0]    a: 0    r: 0    s_: [5.0, 5.0, 35.0, 35.0]
s: [5.0, 5.0, 35.0, 35.0]    a: 2    r: 0 

In [15]:
# Scratch cell: build an empty Q-table with one float column per action id.
actions = list(range(4))

q_table = pd.DataFrame(dtype=np.float64, columns=actions)


Empty DataFrame
Columns: [0, 1, 2, 3]
Index: []


In [32]:
# Scratch check: draw one sample with uniform()'s default bounds, the same
# call QLearningTable.choose_action compares against epsilon.
np.random.uniform()

0.7262670856091131

In [38]:
import logging

# Minimal configuration: send records to logger.log at DEBUG level and above.
# Note: basicConfig() does nothing if the root logger already has handlers,
# so re-running this cell in the same kernel will not change the setup.
logging.basicConfig(filename='logger.log', level=logging.DEBUG)

logging.debug('debug message')
logging.info('info message')
# logging.warn() is a deprecated alias; logging.warning() is the supported name.
logging.warning('warn message')
logging.error('error message')
logging.critical('critical message')

In [36]:
# List the files in the notebook's current working directory.
%ls

[0m[01;32mabel-regular.ttf[0m*  [01;34mimages[0m/     my_robot_maze.ipynb  Runner.py
[01;35mdefault2.png[0m       logger.log  README.md            [01;34mtest_world[0m/
[01;35mdefault.png[0m        Maze.py     robot_maze.ipynb
environment.yml    Maze.pyc    Robot.py


In [39]:
# Show the contents of the log file written by the logging cell.
%cat logger.log

INFO:root:info message
ERROR:root:error message
CRITICAL:root:critical message
INFO:root:info message
ERROR:root:error message
CRITICAL:root:critical message


In [54]:
# Scratch cell: a Q-table sketched as nested dicts,
# state id -> {action label -> Q-value}.
qtable = {
    0: {'u': 12, 'd': 11, 'r': 13, 'l': 14},
    1: {'u': 2, 'd': 1, 'r': 3, 'l': 4},
}

# A new state is added after construction by plain key assignment.
qtable[2] = {'u': 2, 'd': 1, 'r': 3, 'l': 4}

# print() with a single argument behaves the same on Python 2 and 3.
print(qtable)

{0: {'r': 13, 'u': 12, 'd': 11, 'l': 14}, 1: {'r': 3, 'u': 2, 'd': 1, 'l': 4}, 2: {'r': 3, 'u': 2, 'd': 1, 'l': 4}}
