In [1]:
import numpy as np

In [124]:
# The seven states of the random walk, ordered left to right: a terminal
# state at each end ('LT', 'RT') with five non-terminal states in between.
states = ['LT', 'A', 'B', 'C', 'D', 'E', 'RT']
terminal_states = ['LT', 'RT']
# Lookup tables converting between a state's position in `states` and its name.
idx_to_state = dict(enumerate(states))
state_to_idx = {name: position for position, name in enumerate(states)}

In [41]:
states

['LT', 'A', 'B', 'C', 'D', 'E', 'RT']

In [165]:
def generate_episode(add_terminal=False):
    """
    Simulate one episode of the random walk.

    The walk starts at state index 3 and, at every step, moves one state to
    the right or to the left (decided by a uniform draw from {0, 1}) until it
    lands on 'RT' or 'LT'.

    :add_terminal: if True, append a final ``(terminal_state, "n/a", 0)``
        entry so the state in which the episode ended is recorded as well
    :returns: a list of ``(state, action, reward)`` tuples, one per step,
        where ``state`` names the state the step was taken from. The reward
        is 1 for the step that reaches 'RT' and 0 for every other step.
    """
    position = 3
    steps = []
    while True:
        # a single draw picks the direction: 0 = right, 1 = left
        heading_right = np.random.randint(0, 2) == 0
        origin = idx_to_state[position]
        if heading_right:
            position += 1
            action = "right"
            finished = position == state_to_idx["RT"]
            # only the step that lands on the right terminal is rewarded
            reward = 1 if finished else 0
        else:
            position -= 1
            action = "left"
            finished = position == state_to_idx["LT"]
            reward = 0
        steps.append((origin, action, reward))
        if finished:
            if add_terminal:
                steps.append((idx_to_state[position], "n/a", 0))
            return steps

In [167]:
# sample one episode, including the extra row for the terminal state, and display it
history = generate_episode(add_terminal=True)
history

[('C', 'right', 0),
 ('D', 'right', 0),
 ('E', 'left', 0),
 ('D', 'right', 0),
 ('E', 'right', 1),
 ('RT', 'n/a', 0)]

In [49]:
# the reward recorded at each step of the episode
[ step[2] for step in history ]

[0, 0, 0, 0, 0]

In [80]:
def history_from_state(history, state):
    """
    Return the tail of an episode beginning at the first visit to `state`.

    :history: sequence of steps, each of which has the state name as its
        first element
    :state: the string describing the state
    :returns: the slice of `history` from the first step whose state equals
        `state` through to the end, or an empty list if `state` never appears
    """
    for position, step in enumerate(history):
        if step[0] == state:
            # first visit found: everything from here onwards is the answer
            return history[position:]
    return []

Let's do first-visit Monte Carlo (MC) policy evaluation.

In [95]:
# First-visit Monte Carlo prediction: sample num_iters episodes and, for every
# state, record the return that followed its first visit in each episode.
returns = {state:[] for state in states}
num_iters = 10000
for num_iter in range(num_iters):
    # generate an episode using policy
    history = generate_episode()
    # for each state appearing in the episode:
    for state in states:
        tail = history_from_state(history, state)
        if not tail:
            # this state was not visited in this episode
            continue
        # G = return following first occurrence of s (rewards are summed
        # without discounting)
        G = sum(step[2] for step in tail)
        # append G to returns
        returns[state].append(G)
# average returns
for state in states:
    if returns[state]:
        # a single pre-formatted string prints the same "state : value" line
        # under both the Python 2 print statement and the Python 3 function
        print("%s : %s" % (state, np.mean(returns[state])))

A : 0.167109634551
B : 0.33377624236
C : 0.4986
D : 0.660922587487
E : 0.82563338301


This is consistent with the true state values given in the book: $V(A) = 1/6$, $V(B) = 2/6$, ..., $V(E) = 5/6$.

----

Let's implement TD(0)

In [111]:
# start every state-value estimate at zero
V = dict.fromkeys(states, 0.0)
V

{'A': 0.0, 'B': 0.0, 'C': 0.0, 'D': 0.0, 'E': 0.0, 'LT': 0.0, 'RT': 0.0}

In [310]:
# Tabular TD(0) prediction: V is updated after every single step, using a
# constant step size.
V = {state:0.0 for state in states}
num_iters = 50000
gamma = 1
# per-state visit counts; only needed by the commented-out 1/N step size below
N = {state:0.0 for state in states}
# repeat (for each episode)
for i in range(num_iters):
    history = generate_episode(add_terminal=True)
    # repeat (for each step of episode): pair every entry with its successor,
    # so S, A, R come from the current entry and S' from the next one
    for (S, A, R), successor in zip(history, history[1:]):
        N[S] += 1
        S_prime = successor[0]
        #alpha = 1.0 / N[S]
        alpha = 0.001
        # move V(S) a fraction alpha towards the target R + gamma*V(S')
        td_error = R + (gamma*V[S_prime]) - V[S]
        V[S] += alpha * td_error
        # stop once the successor is a terminal state
        if S_prime in terminal_states:
            break

In [311]:
V

{'A': 0.16329355441220494,
 'B': 0.3273412189514655,
 'C': 0.4897669743175231,
 'D': 0.6577518024201993,
 'E': 0.823333377291298,
 'LT': 0.0,
 'RT': 0.0}

-----

TD(0) with batch updating. The advantage: when only a finite number of episodes is available, we can keep sweeping over that same fixed set of episodes rather than having to generate new ones.

In [302]:
# Batch TD(0): the TD increments are summed over a fixed set of episodes and
# applied to V only once per sweep over that set.
V = {state:0.0 for state in states}
num_iters = 100
gamma = 1
# the batch: num_iters episodes, generated once and reused in every epoch
histories = [ generate_episode(add_terminal=True) for i in range(num_iters) ]
for num_epochs in range(1000):
    # increments accumulated during this sweep; V stays fixed until the end
    deltas = {state:0.0 for state in states}
    # repeat (for each episode)
    for history in histories:
        # repeat (for each step of episode): S, A, R come from the current
        # entry and S' from the one that follows it
        for (S, A, R), successor in zip(history, history[1:]):
            S_prime = successor[0]
            alpha = 0.001
            td_error = R + (gamma*V[S_prime]) - V[S]
            deltas[S] += alpha*td_error
            # stop once the successor is a terminal state
            if S_prime in terminal_states:
                break
    # apply the summed increments in one go
    for state in states:
        V[state] += deltas[state]

In [312]:
# NOTE(review): this cell's execution count (312) comes right after the online
# TD(0) cells (310, 311), whereas the batch cell above was run earlier (302),
# and the values shown match the online output digit for digit -- so this
# presumably displays the online estimates, not the batch result. Re-run the
# notebook top to bottom to confirm.
V

{'A': 0.16329355441220494,
 'B': 0.3273412189514655,
 'C': 0.4897669743175231,
 'D': 0.6577518024201993,
 'E': 0.823333377291298,
 'LT': 0.0,
 'RT': 0.0}

In [313]:
# the exact value of state E (5/6), for comparison with the estimate V['E']
5.0/6

0.8333333333333334