In [202]:
from gym.envs.registration import register, spec
import gym
import numpy as np

In [203]:
MY_ENV_NAME='FrozenLakeNonskid4x4-v5'

# 0 = left; 1 = down; 2 = right;  3 = up
# reward of 1 if success, no reward otherwise
# done = True even if not success due to timestep limit

register(
    id=MY_ENV_NAME,
    entry_point='gym.envs.toy_text:FrozenLakeEnv',
    kwargs={'map_name': '4x4', 'is_slippery': False},
    reward_threshold=0.78
)

In [204]:
env = gym.make(MY_ENV_NAME)

In [271]:
class Node(object):
    def __init__(self):
        self.q = 0  # win count
        self.n = 0  # play count

def select_action(s, memory, env):
    """
    return action corresponding to the child with highest ucb.
    
    s is the current state
    """
    # find all actions that would lead to a valid state
    action_state = []
    for a in range(3):
        s_new, _, _, _ = env.step(a)
        
        # scenario 1: action does not lead to a valid state
        if s_new == s:
            pass
        # scenario 2: action could lead to a new state
        else:
            action_state.append((a, s_new))
        env.s = s  # reset env to original state
    
    # if we havent seen that child before, then go with it
    for ap in action_state:
        a = ap[0]
        s2 = ap[1]
        if s2 in memory.keys():
            pass
        else:
            return a  # this action leads to a state that we havent seen before
        
    # if we've seen all possible children, return action leading to child with the highest UCB
    action = None
    max_ucb = 0
    for ap in action_state:
        child = memory[ap[1]]
        parent = memory[s]
        score = calculate_ucb(parent, child)
        if score > max_ucb:
            action = ap[0]
            max_ucb = score  # new score to beat
    if action is not None:
        return action
    else:
        actions = [ap[0] for ap in action_state]
        return np.random.choice(actions)
    
def calculate_ucb(parent, child):
    c_param = .14
    term_1 = (child.q / child.n)  # can be interpreted as the value estimate
    term_2 = c_param * np.sqrt((2 * np.log(parent.n) / child.n))
    return  term_1 + term_2

In [283]:
n_rollouts = 6
starting_point = 14
memory = {starting_point: Node()}

for rollout in range(n_rollouts):
    print(">> rollout: ", rollout)
    trace = [starting_point]
    env.s = starting_point
    s = starting_point
    done = False
    steps = 0

    while not done and steps < 50:
        a = select_action(s, memory, env)
        s, r, done, _ = env.step(a)
        steps += 1

        # if we've seen s before
        if s in memory:
            pass
        else:
            memory[s] = Node()

        # add to trace - dont worry about double counting since we will take the set
        trace.append(s)

    print("completed rollout with reward", r)
    print("trace", trace)

    # once we are done, we need to backpropagate 
    for t in set(trace):
        node = memory[t]  # retrieve node for corresponding state
        node.q += r  # reward
        node.n += 1  # games played

>> rollout:  0
completed rollout with reward 0.0
trace [14, 13, 12]
>> rollout:  1
completed rollout with reward 1.0
trace [14, 15]
>> rollout:  2
completed rollout with reward 1.0
trace [14, 15]
>> rollout:  3
completed rollout with reward 1.0
trace [14, 15]
>> rollout:  4
completed rollout with reward 1.0
trace [14, 15]
>> rollout:  5
completed rollout with reward 1.0
trace [14, 15]


In [285]:
idx = 14
print('starting point', starting_point)
a = select_action(idx, memory, env)
print("action", a)

starting point 14
action 2
