## General imports and config

In [None]:
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


import sys
import time
import numpy as np
import pandas as pd
import tkinter as tk

# CSV file used for the Q-table: the training cell saves to it and the
# test cell loads from it.
rl_weight_filename = '5x4_Ep200_Q_Learning.csv'

## Maze 2D Environment

In [None]:
"""
2D Maze Environment
"""

class Maze2DEnv(tk.Tk):
    """Tk window that doubles as a 2D grid-maze environment.

    The maze is a ``cols x rows`` grid. Cells listed in ``maze_objects`` are
    either ``'mine'`` (reward -1) or ``'treasure'`` (reward +1); stepping onto
    either one ends the episode. The agent position is stored in ``self.obs``
    as an ``(x, y)`` tuple, or as the object label once a terminal cell has
    been reached.

    Args:
        config: dict with the optional keys
            ``name`` (window title, default ``'Maze2D'``),
            ``refresh_interval`` (seconds slept per render, default 0.5),
            ``unit_pixel`` (cell size in pixels, default 35),
            ``shape`` (``(cols, rows)``, default ``(5, 4)``),
            ``maze_objects`` (``{(x, y): 'mine' | 'treasure'}``, default empty),
            ``origin`` (start cell, default ``(0, 0)``).
        *args, **kwargs: forwarded to ``tk.Tk``.
    """

    # action code -> (dx, dy); codes follow the order of ``_action_space``
    _MOVES = {
        0: (-1, 0),  # 'left'
        1: (1, 0),   # 'right'
        2: (0, -1),  # 'up'
        3: (0, 1),   # 'down'
    }

    def __init__(self, config, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._name = config.get('name', 'Maze2D')
        # title() is a Tk method: it has to be called. Assigning to it would
        # only shadow the method and leave the window title unchanged.
        self.title(self._name)

        # render frequency, default 0.5s for rendering interval
        self._refresh_interval = config.get('refresh_interval', 0.5)

        # unit width and height
        self._unit_width = config.get('unit_pixel', 35)
        self._unit_height = config.get('unit_pixel', 35)

        # 2D shape, eg: cols x rows: 5 x 4
        self._shape = config.get('shape', (5, 4))

        # space for actions
        self._action_space = ['left', 'right', 'up', 'down']
        self._n_actions = len(self._action_space)

        # size the window to the grid and push it to the bottom-right corner
        screen_width, screen_height = self.winfo_screenwidth(), self.winfo_screenheight()
        window_width = self._shape[0] * self._unit_width
        window_height = self._shape[1] * self._unit_height
        self.geometry('%dx%d+%d+%d' % (
            window_width, window_height,
            screen_width - window_width, screen_height - window_height))

        # objects in the maze; an absent key means an empty maze
        self._maze_objects = config.get('maze_objects') or {}

        # observation, default (0, 0) (from the very left/top position)
        self._origin = config.get('origin', (0, 0))
        self.obs = self._origin

        # draw maze
        self._draw_maze()

    def _draw_maze(self):
        """Create the canvas covering the whole grid and paint it once."""
        self.canvas = tk.Canvas(self, bg='white',
                                width=self._shape[0] * self._unit_width,
                                height=self._shape[1] * self._unit_height)
        self.canvas.pack()
        self.redraw_all(self.canvas)

    def _cell_bbox(self, cell):
        """Return the pixel box (x0, y0, x1, y1) of a grid cell, inset by 2px."""
        col, row = cell[0], cell[1]
        return (col * self._unit_width + 2, row * self._unit_height + 2,
                (col + 1) * self._unit_width - 2, (row + 1) * self._unit_height - 2)

    def redraw_all(self, canvas):
        """Clear ``canvas`` and repaint grid lines, maze objects and the agent."""
        # clear all objects on the canvas
        canvas.delete("all")

        grid_width = self._shape[0] * self._unit_width
        grid_height = self._shape[1] * self._unit_height

        # draw grids
        for x in range(0, grid_width, self._unit_width):
            canvas.create_line(x, 0, x, grid_height)
        for y in range(0, grid_height, self._unit_height):
            canvas.create_line(0, y, grid_width, y)

        # treasure is a yellow square, everything else (mines) a black one
        for cell, kind in self._maze_objects.items():
            color = 'yellow' if kind == 'treasure' else 'black'
            canvas.create_rectangle(*self._cell_bbox(cell), fill=color)

        # draw obs, grey square. After a terminal step ``self.obs`` holds the
        # object label instead of coordinates; the terminal cell is already
        # painted above, so there is nothing more to draw.
        if not isinstance(self.obs, str):
            canvas.create_rectangle(*self._cell_bbox(self.obs), fill='grey')

    def _conflict_check(self, obs):
        """Return ``(hit, obj)``: whether ``obs`` is an object cell, and its label."""
        conflict = obs in self._maze_objects
        obj = self._maze_objects.get(obs, None)
        return conflict, obj

    def reset(self):
        """Move the agent back to the origin, render, and return the observation."""
        self.obs = self._origin
        self.render()
        return self.obs

    def step(self, action):
        """Apply one move and return ``(obs_, reward, done)``.

        Args:
            action: 0 = left, 1 = right, 2 = up, 3 = down. Moves that would
                leave the grid keep the agent on the border cell.

        Returns:
            ``obs_`` is the new ``(x, y)`` tuple, or the object label
            (``'treasure'`` / ``'mine'``) when a terminal cell was entered;
            ``reward`` is 1 for treasure, -1 for a mine, otherwise 0;
            ``done`` is True exactly when a terminal cell was entered.

        Raises:
            ValueError: for an unknown action code or an unknown object label.
        """
        try:
            dx, dy = self._MOVES[action]
        except (KeyError, TypeError):
            raise ValueError('invalid action code', action)

        x, y = self.obs
        # only the axis that moved is clamped to the grid
        if dx:
            x = min(max(x + dx, 0), self._shape[0] - 1)
        if dy:
            y = min(max(y + dy, 0), self._shape[1] - 1)

        obs_ = (x, y)
        reward = 0
        done = False

        # conflict check
        conflict, obj = self._conflict_check(obs_)
        if conflict:
            if obj == 'treasure':
                reward = 1
            elif obj == 'mine':
                reward = -1
            else:
                raise ValueError('invalid object', obj)
            # terminal observations are reported by label, not by position
            obs_ = obj
            done = True

        self.obs = obs_

        return obs_, reward, done

    def render(self):
        """Sleep for the refresh interval, repaint the canvas and flush Tk events."""
        time.sleep(self._refresh_interval)
        self.redraw_all(self.canvas)
        self.update()

    @property
    def n_actions(self):
        """Number of available actions."""
        return self._n_actions

    @property
    def name(self):
        """Environment name taken from the config."""
        return self._name

In [None]:
"""
Agent with Q Learning algorithm

Q(s, a) <- Q(s, a) + alpha * (R + gamma * maxQ(s_, a_) - Q(s, a))
s <- s_

alpha: learning rate
gamma: reward decay

q_target: R + gamma * maxQ(s_, a_)
q_predict: Q(s, a)
"""

class QLearning(object):
    """Tabular Q-learning agent with an epsilon-greedy policy.

    The Q-table is a DataFrame with one row per observation label and one
    column per action. Rows are created on first use, filled with zeros.

    Args:
        actions: list of action identifiers, used as the Q-table columns.
        alpha: learning rate.
        gamma: reward decay (discount factor).
        eplison_greedy: probability of taking the greedy action; otherwise a
            uniformly random action is taken.
        *args, **kwargs: accepted and ignored.
    """

    # observation labels that mark the end of an episode (no bootstrapping)
    _TERMINAL_STATES = ('treasure', 'mine')

    def __init__(self, actions, alpha=0.1, gamma=0.9, eplison_greedy=0.9, *args, **kwargs):
        self._name = 'Q Learning'

        # init available actions
        self._actions = actions

        # init learning paras
        self._alpha = alpha
        self._gamma = gamma
        self._eplison_greedy = eplison_greedy

        # init Q table; np.float64 because the np.float alias was removed
        # from NumPy
        self._q_table = pd.DataFrame(columns=actions, dtype=np.float64)

    def _check_state_available(self, obs):
        """Add a zero-initialised row for ``obs`` if the Q-table lacks one."""
        if obs not in self._q_table.index:
            # .loc enlargement replaces DataFrame.append, which no longer
            # exists in pandas 2.x
            self._q_table.loc[obs] = [0.0] * len(self._actions)

    def choose_action(self, obs):
        """Pick an action for ``obs`` (a string label) with epsilon-greedy.

        With probability ``eplison_greedy`` the action with the highest Q
        value is returned (ties broken at random); otherwise a random action.
        """
        self._check_state_available(obs)

        if np.random.uniform() < self._eplison_greedy:
            # exploit: choose among the actions sharing the maximal Q value
            obs_actions = self._q_table.loc[obs, :]
            best_actions = obs_actions[obs_actions == obs_actions.max()].index
            action = np.random.choice(best_actions)
        else:
            # explore: choose random move
            action = np.random.choice(self._actions)

        return action

    def learn(self, obs, a, r, obs_):
        """Apply one Q-learning update for the transition (obs, a, r, obs_).

        Q(s, a) <- Q(s, a) + alpha * (q_target - Q(s, a)), where q_target is
        ``r`` for terminal ``obs_`` and ``r + gamma * max Q(obs_, .)`` otherwise.
        """
        # both ends of the transition need a row; obs may be new when learn()
        # is called without a preceding choose_action()
        self._check_state_available(obs)
        self._check_state_available(obs_)

        if obs_ in self._TERMINAL_STATES:
            q_target = r
        else:
            q_target = r + self._gamma * self._q_table.loc[obs_, :].max()

        q_predict = self._q_table.loc[obs, a]

        self._q_table.loc[obs, a] = q_predict + self._alpha * (q_target - q_predict)

    def __str__(self):
        return str(self._q_table)

    @property
    def name(self):
        """Display name of the algorithm."""
        return self._name

    def save(self, filename):
        """Write the Q-table to ``filename`` as CSV."""
        self._q_table.to_csv(filename)
        logging.debug('save weight to file %s:\n%s', filename, self._q_table)

    def load(self, filename):
        """Replace the Q-table with the CSV stored in ``filename``.

        Column names are forced to ``self._actions`` so they keep their
        original type instead of becoming the strings read from the header.
        """
        self._q_table = pd.read_csv(filename, index_col=0, names=self._actions, header=0)
        logging.debug('load weight from file %s:\n%s', filename, self._q_table)

## Training

In [None]:
# train
def train(config, env, agent):
    """Run the training loop, then close the window and save the Q-table.

    Args:
        config: dict with the optional keys ``N_EPISODE`` (default 100),
            ``N_MAX_STEP_PER_EPISODE`` (default 100) and ``WEIGHT_FILENAME``
            (where to save; falls back to the module-level
            ``rl_weight_filename``).
        env: environment providing ``reset()``, ``render()``,
            ``step(action) -> (obs_, reward, done)`` and ``destroy()``.
        agent: learner providing ``choose_action(obs)``,
            ``learn(obs, action, reward, obs_)``, ``save(filename)`` and
            ``name``.
    """
    n_episode = config.get('N_EPISODE', 100)
    n_max_step_episode = config.get('N_MAX_STEP_PER_EPISODE', 100)

    # N_EPISODE episode loop
    for i_episode in range(n_episode):
        logging.info("[Episode %d]", i_episode + 1)

        obs = env.reset()
        # N_MAX_STEP_PER_EPISODE step loop
        for i_step in range(n_max_step_episode):
            env.render()

            # choose next action; observations are keyed by their str() form
            action = agent.choose_action(str(obs))

            # move and feedback from env
            obs_, reward, done = env.step(action)

            # update q_table of the agent that was passed in, not of
            # whatever global happens to be named `rl`
            agent.learn(str(obs), action, reward, str(obs_))

            obs = obs_

            # show q_table at the end of the episode
            if done or i_step == n_max_step_episode - 1:
                if not done:
                    disp_state = 'LOST'  # ran out of steps
                elif reward > 0:
                    disp_state = 'WIN'
                else:
                    disp_state = 'BUSTED'

                logging.info('Episode %d terminated with %d steps, finally %s',
                             i_episode + 1, i_step + 1, disp_state)
                logging.info('%s:\n%s\n', agent.name, agent)

            # earlier out loop
            if done:
                break

    env.destroy()

    weight_filename = config.get('WEIGHT_FILENAME')
    if weight_filename is None:
        weight_filename = ''.join(rl_weight_filename)
    agent.save(weight_filename)

# configs
train_config = dict(N_EPISODE=200, N_MAX_STEP_PER_EPISODE=100)

# 5 x 4 maze: five mines, one treasure in the bottom-right corner
maze_config = dict(
    name='Maze2D',
    shape=(5, 4),
    maze_objects={
        **dict.fromkeys([(1, 1), (1, 2), (2, 1), (3, 3), (4, 1)], 'mine'),
        (4, 3): 'treasure',  # treasure
    },
    origin=(0, 0),
    refresh_interval=0.05,
    unit_pixel=35,
)

# train process: build the window and the agent, then schedule training to
# start 100 ms after the Tk main loop is running
env = Maze2DEnv(config=maze_config)
rl = QLearning([*range(env.n_actions)])

env.after(100, train, train_config, env, rl)
env.mainloop()

## Test

In [None]:
# test run
def test(config, env, agent):
    logging.info('Simulation')
    obs = env.reset()
    i_step = 0
    while True:
        env.render()
        i_step += 1
        action = agent.choose_action(str(obs))
        obs_, reward, done = env.step(action)
        
        # skip learning phrase
        obs = obs_
        
        if done:
            if reward > 0:
                disp_state = 'WIN'
            else:
                disp_state = 'BUSTED'
            
            logging.info('Terminated with %d steps, finally %s' % (i_step, disp_state))
            break
    
    env.destroy()

# configs: same maze as in training, rendered at a slower 0.5 s per step
maze_config = dict(
    name='Maze2D',
    shape=(5, 4),
    maze_objects={
        **dict.fromkeys([(1, 1), (1, 2), (2, 1), (3, 3), (4, 1)], 'mine'),
        (4, 3): 'treasure',  # treasure
    },
    origin=(0, 0),
    refresh_interval=0.5,
    unit_pixel=35,
)

# simulation process: fresh window and agent, Q-table restored from disk
env = Maze2DEnv(config=maze_config)
rl = QLearning([*range(env.n_actions)])
rl.load(''.join(rl_weight_filename))

# start the simulation 100 ms after the Tk main loop is running
env.after(100, test, None, env, rl)
env.mainloop()