# Gridworld + CNN + Target network + experience replay

In [12]:
%reset -f
import numpy as np
import torch
from Gridworld import Gridworld
from IPython.display import clear_output
import random
import matplotlib.pylab as plt
import copy

In [13]:
gridSize = 6
game = Gridworld(size=gridSize, mode='random')

In [14]:
state_ = game.board.render_np()
state_ = torch.from_numpy(state_).float()#.reshape(4,4,4)

import torch.nn as nn
import torch.nn.functional as F
model =nn.Sequential(
    nn.Conv2d(4,12,kernel_size=1,stride= 1,padding=1),
    nn.ReLU(),
    nn.Conv2d(12,24,kernel_size=3,stride=1,padding=1),
    nn.ReLU(),
    nn.Flatten(start_dim=-3,end_dim=-1),
    nn.Linear(1536,300),
    nn.ReLU(),
    nn.Linear(300,64),
    nn.ReLU(),
    nn.Linear(64,32),
    nn.ReLU(),
    nn.Linear(32,8),
    nn.ReLU(),
    nn.Linear(8,4),
)
print(state_.shape)
model(state_).shape


torch.Size([4, 6, 6])


torch.Size([4])

In [15]:
model2 = copy.deepcopy(model)
model2.load_state_dict(model.state_dict())

loss_fn = torch.nn.MSELoss()
learning_rate = 1e-3
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

gamma = 0.9
epsilon = 0.3

action_set = { 0: 'u', 1: 'd', 2: 'l', 3: 'r'}

## Train if not trained already

In [None]:
from collections import deque
import sys

epochs = 10000
losses = []
mem_size = 1000
batch_size = 200
replay = deque(maxlen=mem_size)
max_moves = 50
h = 0
sync_freq = 500
j = 0

for i in range(epochs):
    game = Gridworld(size=gridSize,mode='random')
    state1_ = game.board.render_np() + np.random.rand(4,gridSize,gridSize)/100.0
    state1 = torch.from_numpy(state1_).float()
    status = 1
    mov = 0
    while (status == 1):
        j += 1
        mov += 1
        qval = model(state1)
        qval_ = qval.data.numpy()
        action_ = np.argmax(qval_) if random.random() < epsilon else np.random.randint(0,4)
        action = action_set[action_]
        game.makeMove(action)
        state2_ = game.board.render_np() + np.random.rand(4,gridSize,gridSize)/100.0
        state2 = torch.from_numpy(state2_).float()
        reward = game.reward()
        done = True if reward > 0 else False
        exp = (state1, action_, reward, state2, done)
        replay.append(exp)
        state1 = state2
        
        if len(replay) > batch_size:
            minibatch = random.sample(replay, batch_size)
            state1_batch = torch.stack([s1 for (s1,a,r,s2,d) in minibatch])
            action_batch = torch.Tensor([a for (s1,a,r,s2,d) in minibatch])
            reward_batch = torch.Tensor([r for (s1,a,r,s2,d) in minibatch])
            state2_batch = torch.stack([s2 for (s1,a,r,s2,d) in minibatch])
            done_batch = torch.Tensor([d for (s1,a,r,s2,d) in minibatch])  
            Q1 = model(state1_batch)
            with torch.no_grad():
                Q2 = model2(state2_batch)
            Y = reward_batch + gamma*((1-done_batch)*torch.max(Q2,dim=1)[0])
            X = Q1.gather(dim=1, index=action_batch.long().unsqueeze(dim=1)).squeeze()
            loss = loss_fn(X,Y.detach())
            print(i,loss.item())
            clear_output(wait=True)
            optimizer.zero_grad()
            loss.backward()
            losses.append(loss.item())
            optimizer.step()      
        
        
            if j % sync_freq == 0:
                model2.load_state_dict(model.state_dict())
        if reward != -1 or mov > max_moves:
            status = 0
            mov = 0
losses = np.array(losses)
print("done")


In [None]:
plt.figure(figsize=(10,7))
#plt.plot(losses)
plt.plot([np.mean(losses[i:i+500]) for i in range(len(losses))])
plt.xlabel("Epochs",fontsize=22)
plt.ylabel("Loss",fontsize=22)

In [19]:
# save trained model
#torch.save(model, 'model.pth')

## Load pretrained model

In [16]:
model = torch.load('savedModels/model_gridworld_6x6.pth')

In [17]:
def test_model(model, mode='static', display=True):
    i = 0
    test_game = Gridworld(size=gridSize , mode=mode)
    state_ = game.board.render_np() + np.random.rand(4,gridSize,gridSize)/100.0
    state = torch.from_numpy(state_).float()
    if display:
        print("Initial State:")
        print(test_game.display())
    status = 1
    while(status == 1): #A
        qval = model(state)
        qval_ = qval.data.numpy()
        action_ = np.argmax(qval_) #B
        action = action_set[action_]
        if display:
            print('Move #: %s; Taking action: %s' % (i, action))
        test_game.makeMove(action)
        state_ = test_game.board.render_np() + np.random.rand(4,gridSize,gridSize)/100.0
        state = torch.from_numpy(state_).float()
        if display:
            print(test_game.display())
        reward = test_game.reward()
        if reward != -1:
            if reward > 0:
                status = 2
                if display:
                    print("Game won! Reward: %s" % (reward,))
            else:
                status = 0
                if display:
                    print("Game LOST. Reward: %s" % (reward,))
        i += 1
        if (i > 15):
            if display:
                print("Game lost; too many moves.")
            break
    
    win = True if status == 2 else False
    return win

In [18]:
max_games = 1000
wins = 0
for i in range(max_games):
    win = test_model(model, mode='random', display=False)
    if win:
        wins += 1
win_perc = float(wins) / float(max_games)
print("Games played: {0}, # of wins: {1}".format(max_games,wins))
print("Win percentage: {}%".format(100.0*win_perc))

Games played: 1000, # of wins: 856
Win percentage: 85.6%


In [56]:
# lets play
import time
won = 0
lost = 0
for iii in range(30):
    game = Gridworld(size=gridSize, mode='random') #C
    state_ = game.board.render_np() + np.random.rand(4,gridSize,gridSize)/100.0
    state1 = torch.from_numpy(state_).float()
    status = 1 #F
    i = 0
    while(status == 1): #G
    #for j in range(1):
        i += 1
        print("game:", iii, ", move: ", i, ", won: ", won, ",lost:", lost)
        qval = model(state1) #H
        qval_ = qval.data.numpy()
        action_ = np.argmax(qval_)
        action = action_set[action_] #J

        game.makeMove(action) #K
        reward = game.reward()
        print(game.display())
        time.sleep(0.1)
        if i > 50:
            reward = -10
        if reward != -1: #Q
            status = 0
            if reward == 10:
                won += 1
            else:
                lost +=1 
        clear_output(wait=True)
        state_ = game.board.render_np() + np.random.rand(4,gridSize,gridSize)/100.0
        state1 = torch.from_numpy(state_).float()

game: 29 , move:  51 , won:  27 ,lost: 2
[[' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' 'P' ' ' ' ' ' ']
 [' ' ' ' 'W' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ' ' ' '+']
 ['-' ' ' ' ' ' ' ' ' ' ']]


In [32]:
# save this model and put it on githubreward

In [59]:
#torch.save(model, 'model.pth')