In [1]:
import numpy as np
import gym
import random

#environment creation

In [18]:
import numpy as np

def randPair(s,e):
    return np.random.randint(s,e), np.random.randint(s,e)

#finds an array in the "depth" dimension of the grid
def findLoc(state, obj):
    for i in range(0,4):
        for j in range(0,4):
            if (state[i,j] == obj).all():
                return i,j

#Initialize stationary grid, all items are placed deterministically
def initGrid():
    state = np.zeros((4,4,4))
    #place player
    state[0,1] = np.array([0,0,0,1])
    #place wall
    state[2,2] = np.array([0,0,1,0])
    #place pit
    state[1,1] = np.array([0,1,0,0])
    #place goal
    state[3,3] = np.array([1,0,0,0])
    return state

#Initialize player in random location, but keep wall, goal and pit stationary
def initGridPlayer():
    state = np.zeros((4,4,4))
    #place player
    state[randPair(0,4)] = np.array([0,0,0,1])
    #place wall
    state[2,2] = np.array([0,0,1,0])
    #place pit
    state[1,1] = np.array([0,1,0,0])
    #place goal
    state[1,2] = np.array([1,0,0,0])
    
    a = findLoc(state, np.array([0,0,0,1])) #find grid position of player (agent)
    w = findLoc(state, np.array([0,0,1,0])) #find wall
    g = findLoc(state, np.array([1,0,0,0])) #find goal
    p = findLoc(state, np.array([0,1,0,0])) #find pit
    if (not a or not w or not g or not p):
        #print('Invalid grid. Rebuilding..')
        return initGridPlayer()
    
    return state

#Initialize grid so that goal, pit, wall, player are all randomly placed
def initGridRand():
    state = np.zeros((4,4,4))
    #place player
    state[randPair(0,4)] = np.array([0,0,0,1])
    #place wall
    state[randPair(0,4)] = np.array([0,0,1,0])
    #place pit
    state[randPair(0,4)] = np.array([0,1,0,0])
    #place goal
    state[randPair(0,4)] = np.array([1,0,0,0])
    
    a = findLoc(state, np.array([0,0,0,1]))
    w = findLoc(state, np.array([0,0,1,0]))
    g = findLoc(state, np.array([1,0,0,0]))
    p = findLoc(state, np.array([0,1,0,0]))
    #If any of the "objects" are superimposed, just call the function again to re-place
    if (not a or not w or not g or not p):
        #print('Invalid grid. Rebuilding..')
        return initGridRand()
    
    return state


def makeMove(state, action):
    #need to locate player in grid
    #need to determine what object (if any) is in the new grid spot the player is moving to
    player_loc = findLoc(state, np.array([0,0,0,1]))
    wall = findLoc(state, np.array([0,0,1,0]))
    goal = findLoc(state, np.array([1,0,0,0]))
    pit = findLoc(state, np.array([0,1,0,0]))
    state = np.zeros((4,4,4))

    actions = [[-1,0],[1,0],[0,-1],[0,1]]
    #e.g. up => (player row - 1, player column + 0)
    new_loc = (player_loc[0] + actions[action][0], player_loc[1] + actions[action][1])
    if (new_loc != wall):
        if ((np.array(new_loc) <= (3,3)).all() and (np.array(new_loc) >= (0,0)).all()):
            state[new_loc][3] = 1

    new_player_loc = findLoc(state, np.array([0,0,0,1]))
    if (not new_player_loc):
        state[player_loc] = np.array([0,0,0,1])
    #re-place pit
    state[pit][1] = 1
    #re-place wall
    state[wall][2] = 1
    #re-place goal
    state[goal][0] = 1

    return state

def getLoc(state, level):
    for i in range(0,4):
        for j in range(0,4):
            if (state[i,j][level] == 1):
                return i,j

def getReward(state):
    player_loc = getLoc(state, 3)
    pit = getLoc(state, 1)
    goal = getLoc(state, 0)
    if (player_loc == pit):
        return -10
    elif (player_loc == goal):
        return 10
    else:
        return -1
    
def dispGrid(state):
    grid = np.zeros((4,4), dtype= str)
    player_loc = findLoc(state, np.array([0,0,0,1]))
    wall = findLoc(state, np.array([0,0,1,0]))
    goal = findLoc(state, np.array([1,0,0,0]))
    pit = findLoc(state, np.array([0,1,0,0]))
    for i in range(0,4):
        for j in range(0,4):
            grid[i,j] = ' '
            
    if player_loc:
        grid[player_loc] = 'P' #player
    if wall:
        grid[wall] = 'W' #wall
    if goal:
        grid[goal] = '+' #goal
    if pit:
        grid[pit] = '-' #pit
    
    return grid

#Definition of the DQN Model and Replay Buffer

In [20]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
import numpy as np
from collections import namedtuple
import random

class hidden_unit(nn.Module):
    def __init__(self, in_channels, out_channels, activation):
        super(hidden_unit, self).__init__()
        self.activation = activation
        self.nn = nn.Linear(in_channels, out_channels)
    
    def forward(self, x):
        out = self.nn(x)
        out = self.activation(out)   
        return out
        
class Q_learning(nn.Module):
    def __init__(self, in_channels, hidden_layers, out_channels, unit = hidden_unit, activation = F.relu):
        super(Q_learning, self).__init__()
        assert type(hidden_layers) is list
        self.hidden_units = nn.ModuleList()
        self.in_channels = in_channels
        prev_layer = in_channels
        for hidden in hidden_layers:
            self.hidden_units.append(unit(prev_layer, hidden, activation))
            prev_layer = hidden
        self.final_unit = nn.Linear(prev_layer, out_channels)
    
    def forward(self, x):
        out = x.view(-1,self.in_channels).float()
        for unit in self.hidden_units:
            out = unit(out)
        out = self.final_unit(out)
        return out


Transition = namedtuple('Transition',
                        ('state', 'action', 'new_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)        

    

#hyper--parameters and training code

In [21]:
from torch.autograd import Variable
import torch.optim as optim
import torch

from tqdm import tqdm

## Include the replay experience

epochs = 2000
gamma = 0.9 #since it may take several moves to goal, making gamma high
epsilon = 1
model = Q_learning(64, [150,150], 4, hidden_unit)
optimizer = optim.RMSprop(model.parameters(), lr = 1e-2)
# optimizer = optim.SGD(model.parameters(), lr = 0.1, momentum = 0)
criterion = torch.nn.MSELoss()
buffer = 80
BATCH_SIZE = 40
memory = ReplayMemory(buffer)   

for i in tqdm(range(epochs)):
    state = initGridRand()
    status = 1
    step = 0
    #while game still in progress
    while(status == 1):   
        v_state = Variable(torch.from_numpy(state)).view(1,-1)
        qval = model(v_state)
        if (np.random.random() < epsilon): #choose random action
            action = np.random.randint(0,4)
        else: #choose best action from Q(s,a) values
            action = np.argmax(qval.data)
        #Take action, observe new state S'
        new_state = makeMove(state, action)
        step +=1
        v_new_state = Variable(torch.from_numpy(new_state)).view(1,-1)
        #Observe reward
        reward = getReward(new_state)
        memory.push(v_state.data, action, v_new_state.data, reward)
        if (len(memory) < buffer): #if buffer not filled, add to it
            state = new_state
            if reward != -1: #if reached terminal state, update game status
                break
            else:
                continue
        transitions = memory.sample(BATCH_SIZE)
        batch = Transition(*zip(*transitions))
        state_batch = Variable(torch.cat(batch.state))
        action_batch = Variable(torch.LongTensor(batch.action)).view(-1,1)
        new_state_batch = Variable(torch.cat(batch.new_state))
        reward_batch = Variable(torch.FloatTensor(batch.reward))
        non_final_mask = (reward_batch == -1)
        #Let's run our Q function on S to get Q values for all possible actions
        qval_batch = model(state_batch)
        # we only grad descent on the qval[action], leaving qval[not action] unchanged
        state_action_values = qval_batch.gather(1, action_batch)
        #Get max_Q(S',a)
        with torch.no_grad():
            newQ = model(new_state_batch)
        maxQ = newQ.max(1)[0]
#         if reward == -1: #non-terminal state
#             update = (reward + (gamma * maxQ))
#         else: #terminal state
#             update = reward + 0*maxQ
#         y = reward_batch + (reward_batch == -1).float() * gamma *maxQ
        y = reward_batch
        y[non_final_mask] += gamma * maxQ[non_final_mask]
        y = y.view(-1,1)
        # print("Game #: %s" % (i,), end='\r')
        loss = criterion(state_action_values, y)
        # Optimize the model
        optimizer.zero_grad()
        loss.backward()
        for p in model.parameters():
            p.grad.data.clamp_(-1, 1)
        optimizer.step()
        state = new_state
        if reward != -1:
            status = 0
        if step >20:
            break
    if epsilon > 0.1:
        epsilon -= (1/epochs)

print("Training complete!") 

100%|██████████| 2000/2000 [01:46<00:00, 18.74it/s]

Training complete!





#inferencing model

In [22]:
 ## Here is the test of AI
def testAlgo(init=0):
    i = 0
    if init==0:
        state = initGrid()
    elif init==1:
        state = initGridPlayer()
    elif init==2:
        state = initGridRand()

    print("Initial State:")
    print(dispGrid(state))
    status = 1
    #while game still in progress
    while(status == 1):
        v_state = Variable(torch.from_numpy(state))
        qval = model(v_state.view(64))
        print(qval)
        action = np.argmax(qval.data) #take action with highest Q-value
        print('Move #: %s; Taking action: %s' % (i, action))
        state = makeMove(state, action)
        print(dispGrid(state))
        reward = getReward(state)
        if reward != -1:
            status = 0
            print("Reward: %s" % (reward,))
        i += 1 #If we're taking more than 10 actions, just stop, we probably can't win this game
        if (i > 10):
            print("Game lost; too many moves.")
            break


testAlgo(init=2)

Initial State:
[[' ' ' ' 'P' ' ']
 [' ' ' ' ' ' 'W']
 [' ' ' ' '-' ' ']
 [' ' '+' ' ' ' ']]
tensor([[2.1909, 3.4337, 3.6535, 5.2972]], grad_fn=<AddmmBackward0>)
Move #: 0; Taking action: tensor(3)
[[' ' ' ' ' ' 'P']
 [' ' ' ' ' ' 'W']
 [' ' ' ' '-' ' ']
 [' ' '+' ' ' ' ']]
tensor([[-0.3375,  1.0575,  1.4274,  2.7001]], grad_fn=<AddmmBackward0>)
Move #: 1; Taking action: tensor(3)
[[' ' ' ' ' ' 'P']
 [' ' ' ' ' ' 'W']
 [' ' ' ' '-' ' ']
 [' ' '+' ' ' ' ']]
tensor([[-0.3375,  1.0575,  1.4274,  2.7001]], grad_fn=<AddmmBackward0>)
Move #: 2; Taking action: tensor(3)
[[' ' ' ' ' ' 'P']
 [' ' ' ' ' ' 'W']
 [' ' ' ' '-' ' ']
 [' ' '+' ' ' ' ']]
tensor([[-0.3375,  1.0575,  1.4274,  2.7001]], grad_fn=<AddmmBackward0>)
Move #: 3; Taking action: tensor(3)
[[' ' ' ' ' ' 'P']
 [' ' ' ' ' ' 'W']
 [' ' ' ' '-' ' ']
 [' ' '+' ' ' ' ']]
tensor([[-0.3375,  1.0575,  1.4274,  2.7001]], grad_fn=<AddmmBackward0>)
Move #: 4; Taking action: tensor(3)
[[' ' ' ' ' ' 'P']
 [' ' ' ' ' ' 'W']
 [' ' ' ' '-' ' ']
 [

In [13]:
from tqdm import tqdm

# List of rewards
rewards = []

# 2 For life or until learning is stopped
for episode in tqdm(range(total_episodes)):
    # Reset the environment
    state = env.reset()
    step = 0
    done = False
    total_rewards = 0
    
    for step in range(max_steps):
        # 3. Choose an action a in the current world state (s)
        ## First we randomize a number
        exp_exp_tradeoff = random.uniform(0, 1)
        
        ## If this number > greater than epsilon --> exploitation (taking the biggest Q value for this state)
        if exp_exp_tradeoff > epsilon:
            action = np.argmax(qtable[state,:])
            #print(exp_exp_tradeoff, "action", action)

        # Else doing a random choice --> exploration
        else:
            action = env.action_space.sample()
            #print("action random", action)
            
        
        # Take the action (a) and observe the outcome state(s') and reward (r)
        new_state, reward, done, info = env.step(action)

        # Update Q(s,a):= Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]
        # qtable[new_state,:] : all the actions we can take from new state
        qtable[state, action] = qtable[state, action] + learning_rate * (reward + gamma * np.max(qtable[new_state, :]) - qtable[state, action])
        
        total_rewards += reward
        
        # Our new state is state
        state = new_state
        
        # If done (if we're dead) : finish episode
        if done == True: 
            break
        
    # Reduce epsilon (because we need less and less exploration)
    epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*episode) 
    rewards.append(total_rewards)
    

print ("Score over time: " +  str(sum(rewards)/total_episodes))
print(qtable)

100%|██████████| 30000/30000 [00:46<00:00, 644.15it/s]

Score over time: 0.5020333333333333
[[3.35995692e-01 6.85312929e-02 4.84371168e-02 8.76328308e-02]
 [1.93292488e-02 6.94751993e-03 6.56083040e-03 2.86630774e-01]
 [1.33686145e-02 3.44560181e-02 2.32055054e-02 1.57297142e-01]
 [1.51907729e-02 2.19639425e-03 5.50681239e-03 4.17108707e-02]
 [4.40091677e-01 3.17389799e-01 3.56037614e-03 4.15271997e-02]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [2.10786427e-04 2.59460865e-05 1.22351438e-01 5.98695964e-05]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [1.41552272e-01 6.83545799e-02 2.73747781e-02 4.53743148e-01]
 [7.10780506e-02 6.72202856e-01 3.29372379e-02 9.41063084e-03]
 [2.72303233e-01 1.61687104e-03 1.29566574e-02 6.79906388e-02]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [1.44080036e-01 9.79322685e-02 9.29958961e-01 1.12178573e-01]
 [3.56200789e-01 9.99844266e-01 1.49496373e-01 2.02158297e-01]
 [0.00000000e+00 0.




In [14]:
ACTION_LOOKUP = {0: 'LEFT', 1: 'DOWN', 2: 'RIGHT', 3: 'UP'}

In [15]:
q_optimal = [ACTION_LOOKUP[np.argmax(q)] for q in qtable]
q_optimal = np.reshape(q_optimal, (4,4))
print(q_optimal)

[['LEFT' 'UP' 'UP' 'UP']
 ['LEFT' 'LEFT' 'RIGHT' 'LEFT']
 ['UP' 'DOWN' 'LEFT' 'LEFT']
 ['LEFT' 'RIGHT' 'DOWN' 'LEFT']]


#use q table to play frozen lake

In [16]:
env.reset()

for episode in range(5):
    state = env.reset()
    step = 0
    done = False
    print("****************************************************")
    print("EPISODE ", episode)

    for step in range(max_steps):
        
        # Take the action (index) that have the maximum expected future reward given that state
        action = np.argmax(qtable[state,:])
        
        new_state, reward, done, info = env.step(action)
        
        if done:
            # Here, we decide to only print the last state (to see if our agent is on the goal or fall into an hole)
            env.render()
            if new_state == 15:
                print("We reached our Goal 🏆")
            else:
                print("We fell into a hole ☠️")
            
            # We print the number of step it took.
            print("Number of steps", step)
            
            break
        state = new_state
env.close()

****************************************************
EPISODE  0
****************************************************
EPISODE  1
  (Down)
SFFF
FHFH
FFFH
HFF[41mG[0m
We reached our Goal 🏆
Number of steps 34
****************************************************
EPISODE  2
  (Down)
SFFF
FHFH
FFFH
HFF[41mG[0m
We reached our Goal 🏆
Number of steps 6
****************************************************
EPISODE  3
****************************************************
EPISODE  4
  (Down)
SFFF
FHFH
FFFH
HFF[41mG[0m
We reached our Goal 🏆
Number of steps 76


In [17]:
q_optimal = [ACTION_LOOKUP[np.argmax(q)] for q in qtable]
q_optimal = np.reshape(q_optimal, (4,4))
print(q_optimal)

[['LEFT' 'UP' 'UP' 'UP']
 ['LEFT' 'LEFT' 'RIGHT' 'LEFT']
 ['UP' 'DOWN' 'LEFT' 'LEFT']
 ['LEFT' 'RIGHT' 'DOWN' 'LEFT']]
