In [1]:
import numpy as np
import random
import time
from IPython.display import clear_output
from copy import copy, deepcopy

In [2]:
# expose same api with gym 
# so I dont need to change the training code

class Space:
    def __init__(self, n):
        self.n = n
    def sample(self):
        return random.randint(0, self.n-1)

class WatchYourBack:
    def __init__(self, original_map, states):
        self.player_container_cells = [2, 5]
        self.states = states
        self.original_map = original_map
        self.map = None
        self.action_space = Space(4)
        self.observation_space = Space(len(states))
        self.reset()
    
    def reset(self):
        self.map = deepcopy(self.original_map)
        return self.states[self.encode_map(self.map)]
        
    def get_player_pos(self):
        for j in range(len(self.map)):
            for i in range(len(self.map[0])):
                if self.map[j][i] in self.player_container_cells:
                    return [j, i]

    def render(self):
        for row in self.map:
            print(row)
            
    def encode_map(self, m):
        ret = ''
        for r in m:
            ret = ret + ''.join([str(c) for c in r])
        return ret
    
    def close(self):
        return 0
    
    def step(self, action):
        # new_state, reward, done, info = env.step(action)
        #
        # 0: up, 1: right, 2: down, 3: left
        
        done = False
        new_state = None
        reward = 0
        info = None
        
        player_movable_cells = [0, 3]
        xy = {
            0: [0, -1],
            1: [1,  0],
            2: [0,  1],
            3: [-1, 0]
        }
        
        [delta_x, delta_y] = xy[action]
        [player_pos_y, player_pos_x] = self.get_player_pos()
        [target_pos_y, target_pos_x] = [player_pos_y + delta_y, player_pos_x + delta_x]
        
        if target_pos_x >= 0 and target_pos_x < len(self.map[0]) and \
            target_pos_y >= 0 and target_pos_y < len(self.map) and \
            self.map[target_pos_y][target_pos_x] in player_movable_cells:

            targetCellId = self.map[target_pos_y][target_pos_x]
            self.map[player_pos_y][player_pos_x] = 0

            if targetCellId == 3:
                # hit the goal
                reward = 1
                done = True
                self.map[target_pos_y][target_pos_x] = 5
                info = "hit the goal!"
            else:
                self.map[target_pos_y][target_pos_x] = 2
                info = f"player moved to {target_pos_y} {target_pos_x}"
        
        else:
            info = "player could not move"
            
        # each state has a number value
        new_state = self.states[self.encode_map(self.map)]
        
        return new_state, reward, done, info

In [3]:
def create_states_for_map(m):
    ret = {}
    

In [4]:
# states = {
#     '203000000': 0,
#     '023000000': 1,
#     '005000000': 2, # goal state
#     '003200000': 3,
#     '003020000': 4,
#     '003002000': 5,
#     '003000200': 6,
#     '003000020': 7,
#     '003000002': 8
# }
# original_map = [
#     [0, 0, 3],
#     [0, 0, 0],
#     [2, 0, 0]
# ]

states = {
    '123010000': 0,
    '105010000': 1, # goal
    '103210000': 2,
    '103012000': 3,
    '103010200': 4,
    '103010020': 5,
    '103010002': 6,
}

original_map = [
    [1, 0, 3],
    [0, 1, 0],
    [2, 0, 0]
]

env = WatchYourBack(original_map, states)

In [5]:
# Initialize q-table
# 
# rows: state space in the environment
# columns: action space.

In [6]:
action_space_size = env.action_space.n
state_space_size = env.observation_space.n

q_table = np.zeros((state_space_size, action_space_size))
q_table

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]])

In [7]:
# Setup hyperparameters
# 
# 

num_episodes = 10000
max_steps_per_episode = 100

learning_rate = 0.1
discount_rate = 0.99

exploration_rate = 1
max_exploration_rate = 1
min_exploration_rate = 0.01
exploration_decay_rate = 0.001

In [8]:
rewards_all_episodes = []

# Q-learning algorithm
for episode in range(num_episodes):
    # initialize new episode params
    state = env.reset()
    done = False
    rewards_current_episode = 0
    
    for step in range(max_steps_per_episode): 
        # Exploration-exploitation trade-off
        exploration_rate_threshold = random.uniform(0, 1)
        
        # explore or exploit?
        if exploration_rate_threshold > exploration_rate:
            action = np.argmax(q_table[state,:])  # exploit: get highest q-value move
        else:
            action = env.action_space.sample()    # explore: select a random move
        
        # Take new action
        new_state, reward, done, info = env.step(action)

        # Update Q-table
        q_table[state, action] = q_table[state, action] * (1 - learning_rate) + \
            learning_rate * (reward + discount_rate * np.max(q_table[new_state, :]))
        
        # Set new state
        state = new_state

        # Add new reward        
        rewards_current_episode += reward 

        if done == True: 
            break

    # Exploration rate decay   
    exploration_rate = min_exploration_rate + \
        (max_exploration_rate - min_exploration_rate) * np.exp(-exploration_decay_rate*episode)
    
    # Add current episode reward to total rewards list
    rewards_all_episodes.append(rewards_current_episode)


In [9]:
# Calculate and print the average reward per thousand episodes
rewards_per_thosand_episodes = np.split(np.array(rewards_all_episodes),num_episodes/1000)
count = 1000

print("********Average reward per thousand episodes********\n")
for r in rewards_per_thosand_episodes:
    print(count, ": ", str(sum(r/1000)))
    count += 1000

********Average reward per thousand episodes********

1000 :  0.9970000000000008
2000 :  1.0000000000000007
3000 :  1.0000000000000007
4000 :  1.0000000000000007
5000 :  1.0000000000000007
6000 :  1.0000000000000007
7000 :  1.0000000000000007
8000 :  1.0000000000000007
9000 :  1.0000000000000007
10000 :  1.0000000000000007


In [10]:
q_table

array([[0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.95099005, 0.95099005, 0.96059601, 0.95099005],
       [1.        , 0.99      , 0.9801    , 0.99      ],
       [0.95099005, 0.970299  , 0.96059601, 0.96059601],
       [0.970299  , 0.9801    , 0.970299  , 0.96059601],
       [0.99      , 0.9801    , 0.9801    , 0.970299  ]])

In [11]:
# Watch it play!

In [12]:
# Watch our agent play Frozen Lake by playing the best action 
# from each state according to the Q-table

for episode in range(3):
    # initialize new episode params
    state = env.reset()
    done = False
    print("*****EPISODE ", episode+1, "*****\n\n\n\n")
    time.sleep(1)

    for step in range(max_steps_per_episode):        
        # Show current state of environment on screen
        clear_output(wait=True)
        env.render()
        time.sleep(0.3)
    
        # Choose action with highest Q-value for current state       
        action = np.argmax(q_table[state,:])        

        # Take new action
        new_state, reward, done, info = env.step(action)
        
        if done:
            if reward == 1:
                # Agent reached the goal and won episode
                print("****You reached the goal!****")
                time.sleep(3)
            else:
                # Agent stepped in a hole and lost episode   
                time.sleep(3)
                clear_output(wait=True)
            
        # Set new state
        state = new_state
        
env.close()

[1, 0, 5]
[0, 1, 0]
[0, 0, 0]


KeyboardInterrupt: 