In [17]:
import gym
from gym import spaces
from gym.utils import seeding
from collections import deque
from collections import defaultdict
import copy
%load_ext autoreload

%autoreload 2
%matplotlib notebook

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [18]:
import tensorflow as tf

from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten
from keras.optimizers import Adam

from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.policy import EpsGreedyQPolicy
from rl.memory import SequentialMemory

In [19]:
import numpy as np

In [72]:
class SnakeEnv(gym.Env):
    #### GYM IMPLEMENTATION ###
    metadata = {'render.modes': ['human', 'not_human']}
    def __init__(self):
    # Definitions for game.
    # Board 15 x 15 to start
        self.score = 0
        self.game_over = False 
        self.width = 10
        self.height = 10
        self.grid = np.array([np.zeros(self.width) for i in range(self.height)], dtype=object)
        self.snake = deque()
        # direction 0 = down, 1 = left, 2 = up, 3 = right
        self.direction = 0
    # Running score
        self.score = 0
    #Gym implementation members:
        self.action_space = spaces.Discrete(4)
        # each square can be empty, food, snake, snake head
        self.observation_space = spaces.Box(low = np.array([0,0,0]), high=np.array([self.width, self.height, 4]), dtype=np.int)

        self.reset()
    def step(self, action):
        ''' Perform one step of the game'''
        self.move(action)
        info = {}
        return self.grid, self.score, self.game_over, info
    def reset(self):
        self.direction = 0
        self.snake = deque()
        self.grid = np.array([np.zeros(self.width) for i in range(self.height)], dtype=object)
        i = np.random.randint(0,self.width- 2 ) + 1
        j = np.random.randint(0,self.height - 2) + 1 
        # * is a nonhead part of the snake
        self.grid[i,j] = 2 #"*"
        self.snake.append((i,j))
        # H is the head of the snake
        self.grid[i+1,j] = 1 #"H"
        self.snake.append((i+1,j))
        # X is food, set initial food
        k,l = i,j
        while ((i==k and j==l) or (i+1 ==k and j==l)):
            k = np.random.randint(0,self.width- 2 ) + 1
            l = np.random.randint(0,self.height - 2) + 1
        self.grid[k,l] = 3 #"X"
        self.food = (k,l)
        return self.grid
    def updategrid(self):
        self.grid = np.array([np.zeros(self.width) for i in range(self.height)], dtype=object)
        temp_snake = self.snake.copy()
        self.grid[temp_snake[-1]] = 1 #"H"
        temp_snake.pop()
        for body in temp_snake:
            self.grid[body] = 2 #"*"
        self.grid[self.food] = 3 #"X"
    def render(self, mode='human', close=False):
        print(self.grid)
                        
    
    ### SNAKE IMPLEMENTATION ###
    def move(self, action):
        ## check direction, if direction is opposite of input, continue straight
        if (self.direction %2 == action %2):
            action = self.direction
        self.direction = action
        new_pos = self.snake[-1]
        if (action == 0): #down
            new_pos_row = self.snake[-1][0]+1
            new_pos_col = self.snake[-1][1]
        if (action == 1): #left
            new_pos_row = self.snake[-1][0]
            new_pos_col = self.snake[-1][1]-1
        if (action == 2): #up
            new_pos_row = self.snake[-1][0]-1
            new_pos_col = self.snake[-1][1]
        if (action == 3): #right
            new_pos_row = self.snake[-1][0]
            new_pos_col = self.snake[-1][1]+1
        # Check if snake CRASHES
        new_pos = (new_pos_row, new_pos_col)
        self.check_game_over(new_pos)
        if (self.game_over):
            self.updategrid()
            return
        self.snake.append(new_pos)
        
        # Check if snake grows
        if (new_pos != self.food):
            self.snake.popleft()
        else:
            self.score +=1
            self.add_food()
        self.updategrid()
    def add_food(self):
        x_coord = np.random.randint(0,self.width-1)
        y_coord = np.random.randint(0,self.height-1)
        self.food = (x_coord, y_coord)
        while(self.food in self.snake):
            x_coord = np.random.randint(0,self.width-1)
            y_coord = np.random.randint(0,self.height-1)
            self.food = (x_coord, y_coord)
            
    def check_game_over(self, new_pos):
        # Check if snake hits itself
        if new_pos in self.snake:
            self.game_over = True 
        # Check if snake hits the wall
        if ((new_pos[0]<0) or (new_pos[0]>=self.width) or (new_pos[1]<0) or (new_pos[1]>=self.height)):
            self.game_over = True
    
    
    
    

In [73]:
def random_game():
    env = SnakeEnv()
    while(not env.game_over):
        action = np.random.randint(0,4)
        env.step(action)
        env.render()
        print(action, env.game_over)
    return

In [74]:
def montecarlo(game, number_of_games, prints=False):
    def random_game_initial_score(game, initial_move):
        grid,_, lost, _ = game.step(initial_move)
        #print(grid)
        while( not lost):
            grid,_, lost, _ = game.step(np.random.randint(4))
            #print(grid)
        return initial_move, game.score
    def get_move(game):
        grids = []
        for i in range(number_of_games):
            grids.append(copy.deepcopy(game))
        move_scores = defaultdict(list)
        i=0
        for games in grids:
            initial_move, score = random_game_initial_score(games, i%4)
            move_scores[initial_move].append(score)
            i+=1
        score = -1
        for key, value in move_scores.items():
            if (prints == True):
                print(key, sum(value)/len(value))
            if (sum(value)/len(value) > score):
                move = key
                score = sum(value)/len(value)
        if (prints == True):
            print(move, score)
        return move
    states = []
    lost = False
    while (not lost):
        move = get_move(game)
        if (prints == True):
            game.render()
        _, _, lost, _ = game.step(move)
    return(game.score)
            

In [75]:
#Monte Carlo Testing
env = SnakeEnv()
montecarlo(env,10,True)


0 0.0
1 0.0
2 0.0
3 1.0
3 1.0
[[0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 2 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 1 3 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]]
0 1.0
1 1.0
2 1.0
3 1.0
0 1.0
[[0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 2 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 2 1 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 3 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]]
0 1.0
1 1.0
2 1.0
3 2.0
3 2.0
[[0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0]
 [0.0 0.0 0.0 0.0 0

1

In [64]:
print(env.width)


10


In [76]:
#DQN Approach
np.random.seed(123)
env = SnakeEnv()

model = Sequential()
model.add(Flatten(input_shape=(1,env.height,env.width)))
model.add(Dense(16))
model.add(Activation('relu'))
#model.add(Dense(32))
#model.add(Activation('relu'))
#model.add(Dense(256))
#model.add(Activation('relu'))
#model.add(Dense(256))
#model.add(Activation('relu'))
model.add(Dense(nb_actions))
model.add(Activation('linear'))
print(model.summary())

memory = SequentialMemory(limit=100000, window_length=1)

policy = EpsGreedyQPolicy()
dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=1000,
               target_model_update=1e-2, policy=policy, batch_size=256, train_interval = 4, gamma=.99)

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten_9 (Flatten)          (None, 100)               0         
_________________________________________________________________
dense_22 (Dense)             (None, 16)                1616      
_________________________________________________________________
activation_22 (Activation)   (None, 16)                0         
_________________________________________________________________
dense_23 (Dense)             (None, 4)                 68        
_________________________________________________________________
activation_23 (Activation)   (None, 4)                 0         
Total params: 1,684
Trainable params: 1,684
Non-trainable params: 0
_________________________________________________________________
None


In [77]:
dqn.compile(Adam(lr = .00025), metrics = ['mse'])

In [85]:
dqn.fit(env, nb_steps = 100000, visualize = False, verbose = 1)

Training for 100000 steps ...
Interval 1 (0 steps performed)
  914/10000 [=>............................] - ETA: 17s - reward: 0.0000e+00done, took 1.829 seconds


<keras.callbacks.History at 0xb2a367c88>