In [1]:
import numpy as np
from collections import OrderedDict

# Rules

1. start in room `0`
2. try to get to room `max`

In [18]:
# get size of building from dict
def find_building_size(a_building_dict):
    items = a_building_dict.items()
    exit_doors = list(filter(lambda x: x[1] == 100, items))
    return exit_doors[0][0][1] + 1

# build R table
def build_R_table(building_dict):
    """
    build the R-table with information from building_dict
    :param building_dict: Dict where each key is an ordered pair (from room to room) and value is reward
        Note: -1 is for an ordered pair that is *not* valid (can't go from room to room)
    :param num_rooms: Size of building
    :return: `numpy` matrix of size `num_rooms` x `num_rooms` where the cell value is the reward
    """
    # find buildilng size
    num_rooms = find_building_size(building_dict)
    # build empty R table
    r_table = np.empty((num_rooms, num_rooms))
    # fill all with -1 (no room connections)
    r_table.fill(-1)
    # update R table with info from building dict
    for cell in building_dict.keys():
        r_table[cell] = building_dict[cell]
    return r_table

In [19]:
# build Q table
# where agent knows how many rooms
def initialize_Q_table_known(num_rooms):
    """
    initialize the Q-table when the number of rooms is known
    :param num_rooms: Size of building
    :return: `numpy` matrix of all zeros with dimensions `num_rooms` x `num_rooms`
    """
    return np.zeros((num_rooms, num_rooms))

# where agent does *not* know how many rooms
def initialize_Q_table_unknown():
    """
    initialize the Q-table when the number of rooms is *not* known
    :return: `numpy` matrix of size `1` x `1`
    """
    return np.zeros((1,1))

In [20]:
# each key is an ordered pair of room to room
# value is reward
building_dict = {
    (0,4):0,
    (1,3):0,
    (1,5):100,
    (2,3):0,
    (3,1):0,
    (3,2):0,
    (3,4):0,
    (4,0):0,
    (4,3):0,
    (4,5):100,
    (5,1):0,
    (5,4):0,
    (5,5):100
}

In [21]:
r_table = build_R_table(building_dict)
r_table

array([[  -1.,   -1.,   -1.,   -1.,    0.,   -1.],
       [  -1.,   -1.,   -1.,    0.,   -1.,  100.],
       [  -1.,   -1.,   -1.,    0.,   -1.,   -1.],
       [  -1.,    0.,    0.,   -1.,    0.,   -1.],
       [   0.,   -1.,   -1.,    0.,   -1.,  100.],
       [  -1.,    0.,   -1.,   -1.,    0.,  100.]])

In [22]:
initialized_q_table = initialize_Q_table_known(6)
initialized_q_table

array([[ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.]])

In [23]:
def _find_valid_moves(current_state, r_table):
    """
    Finds all valid moves from one current state
    :param current_state:
    :param r_table:
    :return: `numpy` array (1-d) consisting of valid states
    """
    # find all valid moves from current_state
    valid_moves = np.where(r_table[current_state] != -1)[0]
    return valid_moves

# method for making a "explore" move
def _one_explore_move(current_state, r_table):
    """
    Defines one random move of our agent
    :param current_state: the current state of the agent
    :param r_table: R-table which shows allowed actions (i.e. any cell from start_state that is *not* -1)
    :return: an action (i.e. a room to move to)
    """
    # find all valid moves from current_state
    valid_moves = _find_valid_moves(current_state, r_table)
    # make a random choice from available moves
    choice = np.random.choice(valid_moves)
    return choice

# method for making an "exploit" move
def _one_exploit_move(current_state, r_table, q_table):
    """
    Defines one random move of our agent
    :param current_state: the current state of the agent
    :param r_table: R-table which shows allowed actions (i.e. any cell from start_state that is *not* -1)
    :param q_table: Q-table of (current) learned Q-values
    :return: an action (i.e. a room to move to)
    """
    # possible next moves
    next_moves = _find_valid_moves(current_state, r_table)
    # rebuild coordinates
    next_moves_coordinates = []
    for elem in next_moves:
        next_moves_coordinates.append((current_state, elem))
    # get current Q value for each next_move
    next_Q_values = []
    for coord in next_moves_coordinates:
        next_Q_values.append(q_table[coord])
    # find maximum Q-value from next moves
    max_next_move = np.argmax(next_Q_values)
    return next_moves_coordinates[max_next_move][1]

# method for making a move
def _one_move(explore_rate, current_state, r_table, q_table):
    """
    Make one move
    :param explore_rate: percentage of time to "explore" (i.e. make random move)
    :param current_state:
    :param r_table:
    :param q_table:
    :return: an action (i.e. a room to move to), whether it was explore or exploit
    """
    # determine if explore or exploit
    rand_number = np.random.randint(0, 100)
    if rand_number > (explore_rate * 100):
        # exploit
        action = _one_exploit_move(current_state, r_table, q_table)
        action_type = "exploit"
    else:
        # explore
        action = _one_explore_move(current_state, r_table)
        action_type = "explore"
    return action, action_type

In [24]:
def update_Q(q_table, r_table, gamma, current_state, action):
    """
    Updates Q-table after *one* move
    :param q_table:
    :param r_table:
    :param gamma:
    :param current_state:
    :param action:
    :return: an updated q_table
    """
    # calculate instant reward: R[current_state, action]
    instant_reward = r_table[current_state, action]
    # if action is invalid, exit
    if instant_reward == -1:
        return None
    else:
        max_next_move_index = _one_exploit_move(action, r_table, q_table)
        max_next_move_value = q_table[action, max_next_move_index]
        # update q_table (with a copy)
        q_table_copy = np.copy(q_table)
        q_table_copy[current_state, action] = instant_reward + gamma * max_next_move_value
        return q_table_copy

In [25]:
# method for entire move, including q_update
def make_move(explore_rate, current_state, r_table, gamma, q_table):
    """
    Make one move *and* update q-table
    :param explore_rate: percentage of time to "explore" (i.e. make random move)
    :param current_state:
    :param r_table:
    :param gamma:
    :param q_table:    
    :return: (action, updated_q_table)
    """
    # make move
    action, action_type = _one_move(explore_rate, current_state, r_table, q_table)
    # update q-table
    updated_q = update_Q(q_table, r_table, gamma, current_state, action)
    return (action, action_type, updated_q)

In [26]:
def play_round(q_table, r_table, gamma, explore_rate):
    """
    Play one round of game, starting at a random room until agent gets to room 5
    :param q_table:
    :param r_table:
    :param gamma:
    :param explore_rate:
    :return: ???
    """
    # find exit room
    exit_room = r_table.shape[0] - 1
    # randomize starting room
    starting_room = 0
    # build variables for housing history
    moves_history = [starting_room]
    action_type_history = [None]
    q_table_history = [q_table]
    # make first move
    action, action_type, q_update = make_move(explore_rate, starting_room, r_table, gamma, q_table)
    # add to history
    moves_history.append(action)
    action_type_history.append(action_type)
    q_table_history.append(q_update)
    # continue moving through rooms until reach exit
    while action != exit_room:
        action, action_type, q_update = make_move(explore_rate, action, r_table, gamma, q_update)
        moves_history.append(action)
        action_type_history.append(action_type)
        q_table_history.append(q_update)
    # if at exit
#     print("agent required {} moves before reaching exit".format(len(moves_history) - 1))
    # determine actual explore rate
    actual_explore_count = action_type_history.count("explore") 
    actual_explore_rate = float(actual_explore_count) / float(len(action_type_history))
#     print("agent's actual explore rate was {} ".format(actual_explore_rate))
    return moves_history, action_type_history, q_table_history

In [11]:
# TODO fix bug
def learn(num_rounds, explore_rate, gamma, building_dict, explore_rate_decay=1, gamma_decay=1):
    """
    Let agent learn the maze
    :param num_rounds: Maximum number of rounds to play
    :param explore_rate:
    :param gamma:
    :param building_dict:
    :param explore_rate_decay:
    :param gamma_decay:
    :return: [list of rounds], where round = (moves_history, action_type_history, q_table_history)
    """
    # get exit door
    building_size = find_building_size(building_dict)
    # build r_table
    r_table = build_R_table(building_dict)
    round_history = []
    # play first round
    q_table = initialize_Q_table_known(building_size)
    moves_history, action_type_history, q_table_history = play_round(q_table, r_table, gamma, explore_rate)
    round_history.append((moves_history, action_type_history, q_table_history))
    # loop through remaining rounds
    converge_counter = 0
    rounds = 0
    for i in range(1,num_rounds):
        while np.sum(q_table) != np.sum(round_history[-1][2]) and converge_counter < 10:
            rounds += 1
            # take last q-table from previous round
            q_table = q_table_history[-1]
            moves_history, action_type_history, q_table_history = play_round(q_table, r_table, gamma, explore_rate)
            round_history.append((moves_history, action_type_history, q_table_history))
            # check for converging Q-table
            if np.sum(round_history[-2][2]) != np.sum(round_history[-1][2]):
                converge_counter += 1
    print("Q-table converged in less than {} rounds".format(rounds))
    return round_history

    

In [27]:
# let the agent go through the maze til it learns the fastest route
hist = learn(1000, .5, .8, building_dict)
converged_q = hist[-1][-1][-1]
converged_q

Q-table converged in less than 16 rounds


array([[   0.,    0.,    0.,    0.,   80.,    0.],
       [   0.,    0.,    0.,    0.,    0.,  100.],
       [   0.,    0.,    0.,    0.,    0.,    0.],
       [   0.,   80.,    0.,    0.,    0.,    0.],
       [   0.,    0.,    0.,   64.,    0.,  100.],
       [   0.,    0.,    0.,    0.,    0.,    0.]])

In [28]:
# play as the agent with "0 explore"
m, _, _ = play_round(converged_q, r_table, 1, 0)
m

[0, 4, 5]

# play as agent with *no knowledge of how many rooms

In [15]:
#TODO update - make fewer connections to exit door
def build_random_building(num_rooms, num_doors):
    """
    Builds a building_dict for a random building, with exit at last room
    :param num_rooms:
    :param num_doors: Total number of doors in the building
    :return: building_dict
    """
    # initialize building_dict
    building_dict = OrderedDict()
    # set exit room
    exit_room = num_rooms - 1
    # determine minimum number of doors needed to get through building
    min_doors = num_rooms
    # keep track of total doors
    total_doors = 0
    # keep track of doors to exit room
    exit_room_doors = 0
    
    if min_doors > num_doors:
        print("must have at least as many doors as rooms")
        return None
    
    # helper functions
    def make_door(building_dict, current_room, allow_exit):
        """
        randomly pick room to build door to
        """
        random_room = current_room
        while random_room == current_room and (current_room, random_room) not in building_dict:
            if allow_exit:
                random_room = np.random.randint(0, num_rooms)
            else:
                random_room = np.random.randint(0, num_rooms - 1)
        return random_room
    
    def go_through_door():
        """
        flip a coin to decide whether to go through door when building,
        or build another door from same room
        """
        coin_flip = np.random.randint(0,2)
        if coin_flip:
            return True
        else:
            return False
    
    def update_building_dict(building_dict, current_room, new_room):
        """
        Updates building_dict in place
        """
        if new_room == exit_room:
            building_dict[(current_room, new_room)] = 100
        else:
            building_dict[(current_room, new_room)] = 0
            building_dict[(new_room, current_room)] = 0
            
    # start in room 0
    current_room = 0
    # repeat until finished
    while total_doors < num_doors or exit_room_doors < 1:
        building_dict_keys_size = len(building_dict.keys())
        # only allow exit door if 75% of doors have already been built
        if float(building_dict_keys_size) / float(num_doors) > .99:
            new_room = make_door(building_dict, current_room, True)
        else:
            new_room = make_door(building_dict, current_room, False)
        # add to building_dict
        update_building_dict(building_dict, current_room, new_room)
        # update counters
        if len(building_dict.keys()) > building_dict_keys_size:
            total_doors += 1
        if new_room == exit_room:
            exit_room_doors += 1
        # decide whether to advance
        advance = go_through_door()
        if advance:
            current_room = new_room
    return building_dict

In [16]:
# make a random building
rand_building_dict = build_random_building(500, 600)
rand_r_table = build_R_table(rand_building_dict)

In [17]:
# let the agent go through the random maze til it learns the fastest route
hist = learn(100, .5, .9, rand_building_dict)
rand_converged_q = hist[-1][-1][-1]
print(rand_converged_q)

ValueError: attempt to get argmax of an empty sequence

In [None]:
# play as the agent with "0 explore"
m, _, _ = play_round(rand_converged_q, rand_r_table, 1, 0)
m

In [None]:
rand_building_dict