In [1]:
import numpy as np
import random


class House(object):
    """Small deterministic environment of six rooms (states 0-5).

    An action is identified by the index of the room the agent moves to,
    so ``action[s]`` lists the rooms reachable from room ``s``. Room 5 is
    the only goal state, and every listed transition into it pays 100.
    """

    def __init__(self):
        self.state = [0, 1, 2, 3, 4, 5]
        self.goal_state = [5]
        # action[s] -> rooms that can be entered directly from room s.
        self.action = [
            [4],
            [3, 5],
            [3],
            [1, 2, 4],
            [0, 3, 5],
            [1, 4, 5]
        ]

        # reward[s][a] -> immediate reward for moving from room s to room a.
        self.reward = {0: {4: 0},
                       1: {3: 0,
                           5: 100},
                       2: {3: 0},
                       3: {1: 0,
                           2: 0,
                           4: 0},
                       4: {0: 0,
                           3: 0,
                           5: 100},
                       5: {1: 0,
                           4: 0,
                           5: 100}
                       }

    def num_state(self):
        """Return the number of states in the environment."""
        return len(self.state)

    def get_actions(self, state):
        """Return the list of actions (next rooms) available in ``state``."""
        return self.action[state]

    def get_reward(self, state, action):
        """Return the reward for taking ``action`` in ``state``.

        Returns -1 when the state is unknown or the transition is not
        listed in the reward table.
        """
        # dict.has_key() no longer exists in Python 3; nested .get() keeps
        # the same "-1 for anything unknown" contract on both versions.
        return self.reward.get(state, {}).get(action, -1)

    def get_init_state(self):
        """Return a uniformly random start state (the goal may be drawn)."""
        # random.randint is inclusive on both ends, hence the "- 1".
        return random.randint(0, self.num_state() - 1)

    def is_goal(self, state):
        """Return True if ``state`` is one of the goal states."""
        return state in self.goal_state


class Agent(object):
    """Agent that walks through an environment and records its trajectory.

    Parameters
    ----------
    env : object
        Environment providing ``get_init_state()``, ``get_actions(state)``,
        ``get_reward(state, action)`` and ``is_goal(state)``.
    q_function : object
        Object providing ``get_max_action(state)``; used for greedy moves.
    epsilon : float, optional
        Probability of choosing a random action instead of the greedy one.
        The default of 1.0 means the agent always explores at random.
    """

    def __init__(self, env, q_function, epsilon=1.0):
        self.state = 0
        self.trajectory = []
        self.env = env
        self.q_function = q_function
        self.epsilon = epsilon

    def begin(self):
        """Start a new episode: draw an initial state and clear the trajectory."""
        self.state = self.env.get_init_state()
        self.trajectory = []

    def do_epsilon_greedy(self):
        """Take one epsilon-greedy step and append it to the trajectory."""
        pivot = np.random.uniform()
        if pivot < self.epsilon:
            # Explore: pick uniformly among the actions allowed in this state.
            action = random.choice(self.env.get_actions(self.state))
        else:
            # Exploit: follow the current Q-table.
            action = self.q_function.get_max_action(self.state)
        reward = self.env.get_reward(self.state, action)
        self.trajectory.append((self.state, action, reward))
        # An action is the index of the state it leads to.
        self.state = action

    def finalize_trajectory(self):
        """Append a terminal sentinel ``(state, -1, -1)`` to the trajectory."""
        self.trajectory.append((self.state, -1, -1))

    def is_goal(self):
        """Return True if the agent's current state is a goal state."""
        return self.env.is_goal(self.state)

    def reward(self):
        """Return the sum of the reward field over all trajectory entries.

        NOTE(review): once ``finalize_trajectory`` has run, this sum also
        includes the sentinel's -1 -- confirm whether that is intended.
        """
        return sum(step[2] for step in self.trajectory)

    def make_experience(self):
        """Run one full episode, from a fresh start until a goal is reached."""
        self.begin()
        while not self.is_goal():
            self.do_epsilon_greedy()
        self.finalize_trajectory()


class QFunction(object):
    """Tabular Q-function stored as a (num_state x num_state) array.

    Rows are states and columns are actions; the table is square because
    the caller's trajectories use the next state's index as the action.
    """

    def __init__(self, env):
        n = env.num_state()
        self.q_tbl = np.zeros([n, n])
        self.env = env

    def get_value(self, state, action):
        """Return Q(state, action)."""
        return self.q_tbl[state][action]

    def get_max_value(self, state):
        """Return the largest Q-value stored in the row of ``state``."""
        return self.q_tbl[state].max()

    def get_max_action(self, state):
        """Return the allowed action with the highest Q-value in ``state``.

        Only actions reported by ``env.get_actions(state)`` are considered;
        ties resolve to the first such action (``np.argmax`` semantics).
        """
        act_list = self.env.get_actions(state)
        q_list = [self.q_tbl[state][act] for act in act_list]
        return act_list[np.argmax(q_list)]

    def update_value(self, state, action, value):
        """Overwrite Q(state, action) with ``value``."""
        self.q_tbl[state][action] = value

    def update(self, trajectory, gamma=0.8):
        """Apply one sweep of Q-updates along ``trajectory``.

        Parameters
        ----------
        trajectory : list of (state, action, reward) tuples
            The last entry only supplies the final state; its action and
            reward fields are never read.
        gamma : float
            Discount factor.

        Returns
        -------
        bool
            True if any computed value differs from the value the table
            held before this call.
        """
        # All targets are computed from a snapshot, so updates made earlier
        # in this sweep do not influence later ones.
        q_tbl_before = self.q_tbl.copy()

        updated = False
        for (state, action, reward), following in zip(trajectory, trajectory[1:]):
            next_state = following[0]
            new_q = reward + gamma * q_tbl_before[next_state].max()

            if new_q != q_tbl_before[state][action]:
                updated = True

            self.update_value(state, action, new_q)

        return updated

    def print_learned(self):
        """Print the Q-table rescaled so that its maximum entry is 100."""
        max_val = self.q_tbl.max()
        # An untrained table has max 0; dividing by it would yield NaN and an
        # undefined unsigned cast, so fall back to a neutral divisor.
        divisor = max_val if max_val != 0 else 1
        val = (self.q_tbl / divisor * 100).astype(dtype=np.uint)
        print(val)

In [2]:
def q_learning(max_episode=50, max_no_update=10):
    """Train a tabular Q-function on the House environment.

    Parameters
    ----------
    max_episode : int, optional
        Upper bound on the number of episodes to run.
    max_no_update : int, optional
        Stop early once this many episodes have left the Q-table unchanged.
        The count is cumulative over the run; it is not reset when a later
        episode does change the table.

    Each episode that changes the Q-table is printed together with the
    rescaled table.
    """
    env = House()
    q_function = QFunction(env)
    agent = Agent(env, q_function)

    no_update_count = 0
    for episode in range(max_episode):
        agent.make_experience()

        if q_function.update(agent.trajectory):
            print('episode %d: %s' % (episode, agent.trajectory))
            q_function.print_learned()
        else:
            no_update_count += 1

        if no_update_count >= max_no_update:
            break


# Kick off training when this code is executed as the main module.
if __name__ == '__main__':
    q_learning()

episode 0: [(0, 4, 0), (4, 3, 0), (3, 4, 0), (4, 0, 0), (0, 4, 0), (4, 3, 0), (3, 1, 0), (1, 5, 100), (5, -1, -1)]
[[  0   0   0   0   0   0]
 [  0   0   0   0   0 100]
 [  0   0   0   0   0   0]
 [  0   0   0   0   0   0]
 [  0   0   0   0   0   0]
 [  0   0   0   0   0   0]]
episode 1: [(2, 3, 0), (3, 4, 0), (4, 0, 0), (0, 4, 0), (4, 0, 0), (0, 4, 0), (4, 3, 0), (3, 1, 0), (1, 5, 100), (5, -1, -1)]
[[  0   0   0   0   0   0]
 [  0   0   0   0   0 100]
 [  0   0   0   0   0   0]
 [  0  80   0   0   0   0]
 [  0   0   0   0   0   0]
 [  0   0   0   0   0   0]]
episode 2: [(0, 4, 0), (4, 3, 0), (3, 1, 0), (1, 3, 0), (3, 1, 0), (1, 5, 100), (5, -1, -1)]
[[  0   0   0   0   0   0]
 [  0   0   0  64   0 100]
 [  0   0   0   0   0   0]
 [  0  80   0   0   0   0]
 [  0   0   0  64   0   0]
 [  0   0   0   0   0   0]]
episode 3: [(3, 4, 0), (4, 5, 100), (5, -1, -1)]
[[  0   0   0   0   0   0]
 [  0   0   0  64   0 100]
 [  0   0   0   0   0   0]
 [  0  80   0   0  51   0]
 [  0   0   0  64   