http://outlace.com/Reinforcement-Learning-Part-3/

### Gridworld Setting

In [1]:
import numpy as np

def randPair(s,e):
    return np.random.randint(s,e), np.random.randint(s,e)

#finds an array in the "depth" dimension of the grid
def findLoc(state, obj):
    for i in range(0,4):
        for j in range(0,4):
            if (state[i,j] == obj).all():
                return i,j

#Initialize stationary grid, all items are placed deterministically
def initGrid():
    state = np.zeros((4,4,4))
    #place player
    state[0,1] = np.array([0,0,0,1])
    #place wall
    state[2,2] = np.array([0,0,1,0])
    #place pit
    state[1,1] = np.array([0,1,0,0])
    #place goal
    state[3,3] = np.array([1,0,0,0])

    return state

#Initialize player in random location, but keep wall, goal and pit stationary
def initGridPlayer():
    state = np.zeros((4,4,4))
    #place player
    state[randPair(0,4)] = np.array([0,0,0,1])
    #place wall
    state[2,2] = np.array([0,0,1,0])
    #place pit
    state[1,1] = np.array([0,1,0,0])
    #place goal
    state[1,2] = np.array([1,0,0,0])

    a = findLoc(state, np.array([0,0,0,1])) #find grid position of player (agent)
    w = findLoc(state, np.array([0,0,1,0])) #find wall
    g = findLoc(state, np.array([1,0,0,0])) #find goal
    p = findLoc(state, np.array([0,1,0,0])) #find pit
    if (not a or not w or not g or not p):
        #print('Invalid grid. Rebuilding..')
        return initGridPlayer()

    return state

#Initialize grid so that goal, pit, wall, player are all randomly placed
def initGridRand():
    state = np.zeros((4,4,4))
    #place player
    state[randPair(0,4)] = np.array([0,0,0,1])
    #place wall
    state[randPair(0,4)] = np.array([0,0,1,0])
    #place pit
    state[randPair(0,4)] = np.array([0,1,0,0])
    #place goal
    state[randPair(0,4)] = np.array([1,0,0,0])

    a = findLoc(state, np.array([0,0,0,1]))
    w = findLoc(state, np.array([0,0,1,0]))
    g = findLoc(state, np.array([1,0,0,0]))
    p = findLoc(state, np.array([0,1,0,0]))
    #If any of the "objects" are superimposed, just call the function again to re-place
    if (not a or not w or not g or not p):
        #print('Invalid grid. Rebuilding..')
        return initGridRand()

    return state

In [2]:
def makeMove(state, action):
    #need to locate player in grid
    #need to determine what object (if any) is in the new grid spot the player is moving to
    player_loc = findLoc(state, np.array([0,0,0,1]))
    wall = findLoc(state, np.array([0,0,1,0]))
    goal = findLoc(state, np.array([1,0,0,0]))
    pit = findLoc(state, np.array([0,1,0,0]))
    state = np.zeros((4,4,4))

    #up (row - 1)
    if action==0:
        new_loc = (player_loc[0] - 1, player_loc[1])
        if (new_loc != wall):
            if ((np.array(new_loc) <= (3,3)).all() and (np.array(new_loc) >= (0,0)).all()):
                state[new_loc][3] = 1
    #down (row + 1)
    elif action==1:
        new_loc = (player_loc[0] + 1, player_loc[1])
        if (new_loc != wall):
            if ((np.array(new_loc) <= (3,3)).all() and (np.array(new_loc) >= (0,0)).all()):
                state[new_loc][3] = 1
    #left (column - 1)
    elif action==2:
        new_loc = (player_loc[0], player_loc[1] - 1)
        if (new_loc != wall):
            if ((np.array(new_loc) <= (3,3)).all() and (np.array(new_loc) >= (0,0)).all()):
                state[new_loc][3] = 1
    #right (column + 1)
    elif action==3:
        new_loc = (player_loc[0], player_loc[1] + 1)
        if (new_loc != wall):
            if ((np.array(new_loc) <= (3,3)).all() and (np.array(new_loc) >= (0,0)).all()):
                state[new_loc][3] = 1

    new_player_loc = findLoc(state, np.array([0,0,0,1]))

    #if superimposed, return to original position
    if (not new_player_loc):
        state[player_loc] = np.array([0,0,0,1])
        
    #re-place pit
    state[pit][1] = 1
    #re-place wall
    state[wall][2] = 1
    #re-place goal
    state[goal][0] = 1

    return state

In [3]:
def getLoc(state, level):
    for i in range(0,4):
        for j in range(0,4):
            if (state[i,j][level] == 1):
                return i,j

def getReward(state):
    player_loc = getLoc(state, 3)
    pit = getLoc(state, 1)
    goal = getLoc(state, 0)
    if (player_loc == pit):
        return -10
    elif (player_loc == goal):
        return 10
    else:
        return -1

def dispGrid(state):
    grid = np.zeros((4,4), dtype='<U2')
    player_loc = findLoc(state, np.array([0,0,0,1]))
    wall = findLoc(state, np.array([0,0,1,0]))
    goal = findLoc(state, np.array([1,0,0,0]))
    pit = findLoc(state, np.array([0,1,0,0]))
    for i in range(0,4):
        for j in range(0,4):
            grid[i,j] = ' '

    if player_loc:
        grid[player_loc] = 'P' #player
    if wall:
        grid[wall] = 'W' #wall
    if goal:
        grid[goal] = '+' #goal
    if pit:
        grid[pit] = '-' #pit

    return grid

### Playing with Gridworld

In [50]:
state = initGridRand()
dispGrid(state)

array([[u'-', u' ', u' ', u' '],
       [u' ', u'W', u' ', u' '],
       [u' ', u' ', u'+', u' '],
       [u' ', u' ', u' ', u'P']], 
      dtype='<U2')

In [60]:
state = makeMove(state, 2)
dispGrid(state)

TypeError: 'NoneType' object has no attribute '__getitem__'

In [61]:
getReward(state)

-10

### Setting NN

In [62]:
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import RMSprop

Using TensorFlow backend.


In [63]:
model = Sequential()
model.add(Dense(164, init='lecun_uniform', input_shape=(64,)))
model.add(Activation('relu'))
#model.add(Dropout(0.2)) I'm not using dropout, but maybe you wanna give it a try?

model.add(Dense(150, init='lecun_uniform'))
model.add(Activation('relu'))
#model.add(Dropout(0.2))

model.add(Dense(4, init='lecun_uniform'))
model.add(Activation('linear')) #linear output so we can have range of real-valued outputs

rms = RMSprop()
model.compile(loss='mse', optimizer=rms)

In [64]:
model.predict(state.reshape(1,64), batch_size=1)
#just to show an example output; read outputs left to right: up/down/left/right

array([[-0.16014339, -0.13676874, -0.11245299,  0.04404612]], dtype=float32)

### Training

1. Setup a for-loop to number of epochs
2. In the loop, setup while loop (while game is in progress)
3. Run Q network forward.
4. We're using an epsilon greedy implementation, so at time t with probability  ϵϵ  we will choose a random action. With probability  1−ϵ1−ϵ  we will choose the action associated with the highest Q value from our neural network.
5. Take action  aa  as determined in (4), observe new state  s′s′  and reward  rt+1rt+1 
6. Run the network forward using  s′s′ . Store the highest Q value (maxQ).
7. Our target value to train the network is reward + (gamma * maxQ) where gamma is a parameter ( 0<=γ<=10<=γ<=1 ).
8. Given that we have 4 outputs and we only want to update/train the output associated with the action we just took, our target output vector is the same as the output vector from the first run, except we change the one output associated with our action to: reward + (gamma * maxQ)
9. Train the model on this 1 sample. Repeat process 2-9

In [None]:
from IPython.display import clear_output
import random

In [79]:
epochs = 1000
gamma = 0.9 #since it may take several moves to goal, making gamma high
epsilon = 1.

In [94]:
for i in range(epochs):

    state = initGrid()
    status = 1
    #while game still in progress
    while(status == 1):
        #We are in state S
        #Let's run our Q function on S to get Q values for all possible actions
        qval = model.predict(state.reshape(1,64), batch_size=1)
        if (random.random() < epsilon): #choose random action
            action = np.random.randint(0,4)
        else: #choose best action from Q(s,a) values
            action = (np.argmax(qval))
        #Take action, observe new state S'
        new_state = makeMove(state, action)
        #Observe reward
        reward = getReward(new_state)
        #Get max_Q(S',a)
        newQ = model.predict(new_state.reshape(1,64), batch_size=1)
        maxQ = np.max(newQ)
        y = np.zeros((1,4))
        y[:] = qval[:]
        if reward == -1: #non-terminal state
            update = (reward + (gamma * maxQ))
        else: #terminal state
            update = reward
        y[0][action] = update #target output
        print("Game #: %s" % (i,))
        model.fit(state.reshape(1,64), y, batch_size=1, nb_epoch=1, verbose=1)
        state = new_state
        if reward != -1:
            status = 0
        clear_output(wait=True)
    if epsilon > 0.1:
        epsilon -= (1./epochs)

Game #: 999
Epoch 1/1


In [95]:
def testAlgo(init=0):
    i = 0
    if init==0:
        state = initGrid()
    elif init==1:
        state = initGridPlayer()
    elif init==2:
        state = initGridRand()

    print("Initial State:")
    print(dispGrid(state))
    status = 1
    #while game still in progress
    while(status == 1):
        qval = model.predict(state.reshape(1,64), batch_size=1)
        action = (np.argmax(qval)) #take action with highest Q-value
        print('Move #: %s; Taking action: %s' % (i, action))
        state = makeMove(state, action)
        print(dispGrid(state))
        reward = getReward(state)
        if reward != -1:
            status = 0
            print("Reward: %s" % (reward,))
        i += 1 #If we're taking more than 10 actions, just stop, we probably can't win this game
        if (i > 10):
            print("Game lost; too many moves.")
            break

In [96]:
testAlgo(init=0)

Initial State:
[[u' ' u'P' u' ' u' ']
 [u' ' u'-' u' ' u' ']
 [u' ' u' ' u'W' u' ']
 [u' ' u' ' u' ' u'+']]
Move #: 0; Taking action: 3
[[u' ' u' ' u'P' u' ']
 [u' ' u'-' u' ' u' ']
 [u' ' u' ' u'W' u' ']
 [u' ' u' ' u' ' u'+']]
Move #: 1; Taking action: 3
[[u' ' u' ' u' ' u'P']
 [u' ' u'-' u' ' u' ']
 [u' ' u' ' u'W' u' ']
 [u' ' u' ' u' ' u'+']]
Move #: 2; Taking action: 1
[[u' ' u' ' u' ' u' ']
 [u' ' u'-' u' ' u'P']
 [u' ' u' ' u'W' u' ']
 [u' ' u' ' u' ' u'+']]
Move #: 3; Taking action: 1
[[u' ' u' ' u' ' u' ']
 [u' ' u'-' u' ' u' ']
 [u' ' u' ' u'W' u'P']
 [u' ' u' ' u' ' u'+']]
Move #: 4; Taking action: 1
[[u' ' u' ' u' ' u' ']
 [u' ' u'-' u' ' u' ']
 [u' ' u' ' u'W' u' ']
 [u' ' u' ' u' ' u' ']]
Reward: 10


### Playing the harder variant, catastrophic forgetting, and experience replay

In [107]:
model.compile(loss='mse', optimizer=rms)#reset weights of neural network
epochs = 3000
gamma = 0.975
epsilon = 1.
batchSize = 40
buffer = 80
replay = [] #stores tuples of (S, A, R, S')

In [108]:
h = 0
for i in range(epochs):

    state = initGridPlayer() #using the harder state initialization function
    status = 1
    #while game still in progress
    while(status == 1):
        #We are in state S
        #Let's run our Q function on S to get Q values for all possible actions
        qval = model.predict(state.reshape(1,64), batch_size=1)
        if (random.random() < epsilon): #choose random action
            action = np.random.randint(0,4)
        else: #choose best action from Q(s,a) values
            action = (np.argmax(qval))
        #Take action, observe new state S'
        new_state = makeMove(state, action)
        #Observe reward
        reward = getReward(new_state)

        #Experience replay storage
        if (len(replay) < buffer): #if buffer not filled, add to it
            replay.append((state, action, reward, new_state))
        else: #if buffer full, overwrite old values
            if (h < (buffer-1)):
                h += 1
            else:
                h = 0
            replay[h] = (state, action, reward, new_state)
            #randomly sample our experience replay memory
            minibatch = random.sample(replay, batchSize)
            X_train = []
            y_train = []
            for memory in minibatch:
                #Get max_Q(S',a)
                old_state, action, reward, new_state = memory
                old_qval = model.predict(old_state.reshape(1,64), batch_size=1)
                newQ = model.predict(new_state.reshape(1,64), batch_size=1)
                maxQ = np.max(newQ)
                y = np.zeros((1,4))
                y[:] = old_qval[:]
                if reward == -1: #non-terminal state
                    update = (reward + (gamma * maxQ))
                else: #terminal state
                    update = reward
                y[0][action] = update
                X_train.append(old_state.reshape(64,))
                y_train.append(y.reshape(4,))

            X_train = np.array(X_train)
            y_train = np.array(y_train)
            print("Game #: %s" % (i,))
            model.fit(X_train, y_train, batch_size=batchSize, nb_epoch=1, verbose=1)
            state = new_state
        if reward != -1: #if reached terminal state, update game status
            status = 0
        clear_output(wait=True)
    if epsilon > 0.1: #decrement epsilon over time
        epsilon -= (1./epochs)

Game #: 2999
Epoch 1/1


In [111]:
testAlgo(1) #run testAlgo using random player placement => initGridPlayer()

Initial State:
[[u' ' u' ' u' ' u'P']
 [u' ' u'-' u'+' u' ']
 [u' ' u' ' u'W' u' ']
 [u' ' u' ' u' ' u' ']]
Move #: 0; Taking action: 2
[[u' ' u' ' u'P' u' ']
 [u' ' u'-' u'+' u' ']
 [u' ' u' ' u'W' u' ']
 [u' ' u' ' u' ' u' ']]
Move #: 1; Taking action: 1
[[u' ' u' ' u' ' u' ']
 [u' ' u'-' u' ' u' ']
 [u' ' u' ' u'W' u' ']
 [u' ' u' ' u' ' u' ']]
Reward: 10
