In [1]:
import numpy as np

In [2]:
def randPair(s,e):
    return np.random.randint(s,e), np.random.randint(s,e)

In [3]:
#finds an array in the "depth" dimension of the grid
def findLoc(state, obj):
    for i in range(0,4):
        for j in range(0,4):
            if (state[i,j] == obj).all():
                return i,j

In [4]:
state = np.arange(0,16,1).reshape(4,4)
findLoc(state, 2)

(0, 2)

### Layers
* Player
* Wall
* Pit
* Goal

In [5]:
#Initialize stationary grid, all items are placed deterministically
def initGrid():
    state = np.zeros((4,4,4))
    #place player
    state[0,1,3] = 1
    #place wall
    state[2,2,2] = 1
    #place pit
    state[1,1,1] = 1
    #place goal
    state[3,3,0] = 1
    
    return state

In [6]:
state = initGrid()
state

array([[[ 0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  1.],
        [ 0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.]],

       [[ 0.,  0.,  0.,  0.],
        [ 0.,  1.,  0.,  0.],
        [ 0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.]],

       [[ 0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.],
        [ 0.,  0.,  1.,  0.],
        [ 0.,  0.,  0.,  0.]],

       [[ 0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.],
        [ 1.,  0.,  0.,  0.]]])

In [7]:
#Initialize player in random location, but keep wall, goal and pit stationary
def initGridPlayer():
    state = np.zeros((4,4,4))
    #place player
    state[randPair(0,4)] = np.array([0,0,0,1])
    #place wall
    state[2,2] = np.array([0,0,1,0])
    #place pit
    state[1,1] = np.array([0,1,0,0])
    #place goal
    state[1,2] = np.array([1,0,0,0])
    
    a = findLoc(state, np.array([0,0,0,1])) #find grid position of player (agent)
    w = findLoc(state, np.array([0,0,1,0])) #find wall
    g = findLoc(state, np.array([1,0,0,0])) #find goal
    p = findLoc(state, np.array([0,1,0,0])) #find pit
    if (not a or not w or not g or not p):
        #print('Invalid grid. Rebuilding..')
        return initGridPlayer()
    
    return state

In [8]:
#Initialize grid so that goal, pit, wall, player are all randomly placed
def initGridRand():
    state = np.zeros((4,4,4))
    #place player
    state[randPair(0,4)] = np.array([0,0,0,1])
    #place wall
    state[randPair(0,4)] = np.array([0,0,1,0])
    #place pit
    state[randPair(0,4)] = np.array([0,1,0,0])
    #place goal
    state[randPair(0,4)] = np.array([1,0,0,0])
    
    a = findLoc(state, np.array([0,0,0,1]))
    w = findLoc(state, np.array([0,0,1,0]))
    g = findLoc(state, np.array([1,0,0,0]))
    p = findLoc(state, np.array([0,1,0,0]))
    #If any of the "objects" are superimposed, just call the function again to re-place
    if (not a or not w or not g or not p):
        #print('Invalid grid. Rebuilding..')
        return initGridRand()
    
    return state

In [9]:
def makeMove(state, action):
    #need to locate player in grid
    #need to determine what object (if any) is in the new grid spot the player is moving to
    player_loc = findLoc(state, np.array([0,0,0,1]))
    wall = findLoc(state, np.array([0,0,1,0]))
    goal = findLoc(state, np.array([1,0,0,0]))
    pit = findLoc(state, np.array([0,1,0,0]))
    state = np.zeros((4,4,4))

    actions = [[-1,0],[1,0],[0,-1],[0,1]]
    #e.g. up => (player row - 1, player column + 0)
    new_loc = (player_loc[0] + actions[action][0], player_loc[1] + actions[action][1])
    if (new_loc != wall):
        if ((np.array(new_loc) <= (3,3)).all() and (np.array(new_loc) >= (0,0)).all()):
            state[new_loc][3] = 1

    new_player_loc = findLoc(state, np.array([0,0,0,1]))
    if (not new_player_loc):
        state[player_loc] = np.array([0,0,0,1])
    #re-place pit
    state[pit][1] = 1
    #re-place wall
    state[wall][2] = 1
    #re-place goal
    state[goal][0] = 1

    return state

In [10]:
def getLoc(state, level):
    for i in range(0,4):
        for j in range(0,4):
            if (state[i,j][level] == 1):
                return i,j

def getReward(state):
    player_loc = getLoc(state, 3)
    pit = getLoc(state, 1)
    goal = getLoc(state, 0)
    if (player_loc == pit):
        return -10
    elif (player_loc == goal):
        return 10
    else:
        return -1
    
def dispGrid(state):
    grid = np.zeros((4,4), dtype=np.str)
    player_loc = findLoc(state, np.array([0,0,0,1]))
    wall = findLoc(state, np.array([0,0,1,0]))
    goal = findLoc(state, np.array([1,0,0,0]))
    pit = findLoc(state, np.array([0,1,0,0]))
    for i in range(0,4):
        for j in range(0,4):
            grid[i,j] = ' '
            
    if player_loc:
        grid[player_loc] = 'P' #player
    if wall:
        grid[wall] = 'W' #wall
    if goal:
        grid[goal] = '+' #goal
    if pit:
        grid[pit] = '-' #pit
    
    return grid

### Let's play the game

In [39]:
state = initGridRand()
dispGrid(state)

array([[' ', ' ', ' ', ' '],
       ['+', ' ', 'W', ' '],
       [' ', ' ', ' ', '-'],
       [' ', 'P', ' ', ' ']],
      dtype='<U1')

In [40]:
state = makeMove(state, 1)
print('Reward: %s' % (getReward(state),))
state = makeMove(state, 3)
print('Reward: %s' % (getReward(state),))
dispGrid(state)

Reward: -1
Reward: -1


array([[' ', ' ', ' ', ' '],
       ['+', ' ', 'W', ' '],
       [' ', ' ', ' ', '-'],
       [' ', ' ', 'P', ' ']],
      dtype='<U1')

### Design the Q function
NN with 2 hidden layers

In [41]:
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import Adam

In [42]:
model = Sequential()
model.add(Dense(164, input_shape=(64,)))
model.add(Activation('relu'))
#model.add(Dropout(0.2)) I'm not using dropout, but maybe you wanna give it a try?

model.add(Dense(150))
model.add(Activation('relu'))
#model.add(Dropout(0.2))

model.add(Dense(4))
model.add(Activation('linear')) #linear output so we can have range of real-valued outputs

adam = Adam()
model.compile(loss='mse', optimizer=adam)

In [67]:
model.predict(state.reshape(1,64), batch_size=1)

array([[ 0.10892771, -0.1549127 ,  0.04893943, -0.10359161]], dtype=float32)

### Online Training

* Setup a for-loop to number of epochs
* In the loop, setup while loop (while game is in progress)
* Run Q network **forward**, to get the **target output vector (TOV)**.
* We're using an **epsilon greedy** implementation, so at time t with probability ϵ we will choose a **random action**. With probability 1−ϵ we will choose the action associated with the **highest Q value** from our neural network.
* Take action a' as determined previously, observe new state s' and **reward $r_{t+1}$**
* Run the network **forward** using s'. Store the **highest Q value (maxQ)**.
* Our target value to train the network is **reward + (gamma * maxQ)** where gamma is a parameter (0<=γ<=1).
* Given that we have **4 outputs** and we only want to update/train the output associated with the action we just took, **our TOV** is the same, except **we change the one output associated with our action to: reward + (gamma * maxQ)**
* Train the model on this 1 sample. Repeat process 2-9

In [70]:
from IPython.display import clear_output
import random

epochs = 1000
gamma = 0.9 #since it may take several moves to goal, making gamma high
epsilon = 1
for i in range(epochs):
    
    state = initGrid()
    status = 1
    #while game still in progress
    while(status == 1):
        #We are in state S
        #Let's run our Q function on S to get Q values for all possible actions
        qval = model.predict(state.reshape(1,64), batch_size=1)
        if (random.random() < epsilon): #choose random action
            action = np.random.randint(0,4)
        else: #choose best action from Q(s,a) values
            action = (np.argmax(qval))
        #Take action, observe new state S'
        new_state = makeMove(state, action)
        #Observe reward
        reward = getReward(new_state)
        #Get max_Q(S',a)
        newQ = model.predict(new_state.reshape(1,64), batch_size=1)
        maxQ = np.max(newQ)
        y = np.zeros((1,4))
        y[:] = qval[:]
        if reward == -1: #non-terminal state (-1)
            update = (reward + (gamma * maxQ))
        else: #terminal state (+10 or -10)
            update = reward
        y[0][action] = update #target output
        print("Game #: %s" % (i,))
        model.fit(state.reshape(1,64), y, batch_size=1, epochs=1, verbose=1)
        state = new_state
        if reward != -1:
            status = 0
        clear_output(wait=True)
    if epsilon > 0.1:
        epsilon -= (1/epochs)

Game #: 999
Epoch 1/1


In [71]:
def testAlgo(init=0):
    i = 0
    if init==0:
        state = initGrid()
    elif init==1:
        state = initGridPlayer()
    elif init==2:
        state = initGridRand()

    print("Initial State:")
    print(dispGrid(state))
    status = 1
    #while game still in progress
    while(status == 1):
        qval = model.predict(state.reshape(1,64), batch_size=1)
        action = (np.argmax(qval)) #take action with highest Q-value
        print('Move #: %s; Taking action: %s' % (i, action))
        state = makeMove(state, action)
        print(dispGrid(state))
        reward = getReward(state)
        if reward != -1:
            status = 0
            print("Reward: %s" % (reward,))
        i += 1 #If we're taking more than 10 actions, just stop, we probably can't win this game
        if (i > 10):
            print("Game lost; too many moves.")
            break

In [114]:
testAlgo(init=0)

Initial State:
[[' ' 'P' ' ' ' ']
 [' ' '-' ' ' ' ']
 [' ' ' ' 'W' ' ']
 [' ' ' ' ' ' '+']]
Move #: 0; Taking action: 3
[[' ' ' ' 'P' ' ']
 [' ' '-' ' ' ' ']
 [' ' ' ' 'W' ' ']
 [' ' ' ' ' ' '+']]
Move #: 1; Taking action: 1
[[' ' ' ' ' ' ' ']
 [' ' '-' 'P' ' ']
 [' ' ' ' 'W' ' ']
 [' ' ' ' ' ' '+']]
Move #: 2; Taking action: 3
[[' ' ' ' ' ' ' ']
 [' ' '-' ' ' 'P']
 [' ' ' ' 'W' ' ']
 [' ' ' ' ' ' '+']]
Move #: 3; Taking action: 1
[[' ' ' ' ' ' ' ']
 [' ' '-' ' ' ' ']
 [' ' ' ' 'W' 'P']
 [' ' ' ' ' ' '+']]
Move #: 4; Taking action: 1
[[' ' ' ' ' ' ' ']
 [' ' '-' ' ' ' ']
 [' ' ' ' 'W' ' ']
 [' ' ' ' ' ' ' ']]
Reward: 10


### Catastrophic forgetting
* Associated with gradient descent based training methods in online training
* With a different initial state and after a very different reward: it may do backpropagation again to update its state-action value but because this state-action is very similar to the last learned state-action it may mess up its previously learned weights
* There's a push-pull between very similar state-actions (but with divergent targets) that results in this inability to properly learn anything
* Randomized batch learning in supervised learning
* **Experience replay** in DQN: minibatch updating in an online learning scheme

### Experience replay
* In state s, take action a, observe new state $s_{t+1}$ and reward $r_{t+1}$
* Store this as a tuple (s,a,$s_{t+1}$,$r_{t+1}$) in a list.
* Continue to store each experience in this list until we have filled the list to a specific length (up to you to define)
* Once the experience replay memory is filled, randomly select a subset (e.g. 40)
* Iterate through this subset and calculate value updates for each; store these in a target array (e.g. y_train) and store the state s of each memory in X_train
* Use X_train and y_train as a minibatch for batch training. For subsequent epochs where the array is full, just overwrite old values in our experience replay memory array.

In [None]:
model.compile(loss='mse', optimizer=adam)#reset weights of neural network
epochs = 3000
gamma = 0.975
epsilon = 1
batchSize = 40
buffer = 80
replay = []
#stores tuples of (S, A, R, S')
h = 0
for i in range(epochs):
    
    state = initGridPlayer() #using the harder state initialization function
    status = 1
    #while game still in progress
    while(status == 1):
        #We are in state S
        #Let's run our Q function on S to get Q values for all possible actions
        qval = model.predict(state.reshape(1,64), batch_size=1)
        if (random.random() < epsilon): #choose random action
            action = np.random.randint(0,4)
        else: #choose best action from Q(s,a) values
            action = (np.argmax(qval))
        #Take action, observe new state S'
        new_state = makeMove(state, action)
        #Observe reward
        reward = getReward(new_state)
        
        #Experience replay storage
        if (len(replay) < buffer): #if buffer not filled, add to it
            replay.append((state, action, reward, new_state))
        else: #if buffer full, overwrite old values
            if (h < (buffer-1)):
                h += 1
            else:
                h = 0
            replay[h] = (state, action, reward, new_state)
            #randomly sample our experience replay memory
            minibatch = random.sample(replay, batchSize)
            X_train = []
            y_train = []
            for memory in minibatch:
                #Get max_Q(S',a)
                old_state, action, reward, new_state = memory
                old_qval = model.predict(old_state.reshape(1,64), batch_size=1)
                newQ = model.predict(new_state.reshape(1,64), batch_size=1)
                maxQ = np.max(newQ)
                y = np.zeros((1,4))
                y[:] = old_qval[:]
                if reward == -1: #non-terminal state
                    update = (reward + (gamma * maxQ))
                else: #terminal state
                    update = reward
                y[0][action] = update
                X_train.append(old_state.reshape(64,))
                y_train.append(y.reshape(4,))
            
            X_train = np.array(X_train)
            y_train = np.array(y_train)
            print("Game #: %s" % (i,))
            model.fit(X_train, y_train, batch_size=batchSize, nb_epoch=1, verbose=1)
            state = new_state
        if reward != -1: #if reached terminal state, update game status
            status = 0
        clear_output(wait=True)
    if epsilon > 0.1: #decrement epsilon over time
        epsilon -= (1/epochs)

Game #: 1677
Epoch 1/1


In [None]:
testAlgo(1)

In [None]:
testAlgo(2)