**AUTHORIZATIONS**

In [None]:
from google.colab import drive
drive.mount('./mount')

**IMPORT LIBRARIES**

In [3]:
import collections
import numpy as np

import torch
import torch.nn as nn

**Parameters**

In [5]:
# Board
row = 10

# Neural Network and Q-learning
GAMMA = 0.99  # discount
BATCH_SIZE = 64
REPLAY_SIZE = 10000
LEARNING_RATE = 1e-4
SYNC_TARGET_LOOPS = 1000
REPLAY_START_SIZE = 10000

# epsilon -> chance for random action
EPSILON_DECAY_LAST_FRAME = 10**5
EPSILON_START = 1.0
EPSILON_FINAL = 0.02

reward_dict = {"hit self": -100, "hit boundary": -100, "eat apple": 30, "step": -1, "see apple": 1, "a lot of steps": -10, "win game": 1000}

**Environment (Snake game)**

In [40]:
class SnakeSensors:
    def __init__(self, x, board_info, moves):
        """
        Return array where are saved all information about snake and his sensors,
        which will get pass to agent through neural network for choosing action
        Sensors:
            1. distance to walls (4 dim)
            2. distance to apple (8 dim)
            3. distance to body snake (8 dim)
            4. snake head direction
            5. snake tail direction
            Note: distance is converted between 0.0 to 1.0
        """
        self.dis = x    # row/x/distance
        self.target = board_info
        self.moves = moves
    
    def update_sensor_board(self, board, apple, snake):
        self.board = board
        self.apple_pos = apple
        self.head_y, self.head_x = snake[-1]

        # distance to boundaries in 4 dimensions
        self.dis_y_down = self.dis - self.head_y - 1    # -1 because grid is from 0 to boundary
        self.dis_y_up = self.dis - self.dis_y_down - 1
        self.dis_x_right = self.dis - self.head_x - 1
        self.dis_x_left = self.dis - self.dis_x_right - 1
    
    def check_up(self, target):
        if self.head_y == 0: return 0
        for i in range(self.head_y-1, -1, -1):
            if self.board[i, self.head_x] == target:
                return 1
        return 0
    
    def check_down(self, target):
        if self.head_y == self.dis-1: return 0
        for i in range(self.head_y+1, self.dis):
            if self.board[i, self.head_x] == target:
                return 1
        return 0
    
    def check_right(self, target):
        if self.head_x == self.dis-1: return 0
        for i in range(self.head_x+1, self.dis):
            if self.board[self.head_y, i] == target:
                return 1
        return 0
    
    def check_left(self, target):
        if self.head_x == 0: return 0
        for i in range(self.head_x-1, -1, -1):
            if self.board[self.head_y, i] == target:
                return 1
        return 0
    
    def check_right_up(self, target):
        # choose shorter distance
        distance = self.dis_y_up if self.dis_y_up < self.dis_x_right else self.dis_x_right
        if distance == 0: return 0
        for n in range(1, distance+1):
            if self.board[self.head_y-n, self.head_x+n] == target:
                return 1
        return 0
    
    def check_right_down(self, target):
        distance = self.dis_y_down if self.dis_y_down < self.dis_x_right else self.dis_x_right
        if distance == self.dis: return 0
        for n in range(1, distance+1):
            if self.board[self.head_y+n, self.head_x+n] == target: 
                return 1
        return 0
    
    def check_left_up(self, target):
        distance = self.dis_y_up if self.dis_y_up < self.dis_x_left else self.dis_x_left
        if distance == 0: return 0
        for n in range(1, distance+1):
            if self.board[self.head_y-n, self.head_x-n] == target: 
                return 1
        return 0

    def check_left_down(self, target):
        distance = self.dis_y_down if self.dis_y_down < self.dis_x_left else self.dis_x_left
        if distance == self.dis: return 0
        for n in range(1, distance+1):
            if self.board[self.head_y+n, self.head_x-n] == target: 
                return 1
        return 0
    
    def all_eight_directions(self, target):
        to_up = self.check_up(target)                       # up
        to_right_up = self.check_right_up(target)           # right up
        to_right = self.check_right(target)                 # right
        to_right_down = self.check_right_down(target)       # right down
        to_down = self.check_down(target)                   # down
        to_left_down = self.check_left_down(target)         # left down
        to_left = self.check_left(target)                   # left
        to_left_up = self.check_left_up(target)             # left up
        return np.array([to_up, to_right_up, to_right, to_right_down, to_down, to_left_down, to_left, to_left_up])
    
    def distance_to_wall(self):
        return np.round(np.array([self.dis_y_up, self.dis_x_right, self.dis_y_down, self.dis_x_left]) / self.dis, 1)

    def see_apple(self):
        return self.all_eight_directions(self.target["apple"])

    def see_it_self(self):
        return self.all_eight_directions(self.target["snake"]) # skip snake head
    
    def get_head_direction(self, head_dir):
        if np.array_equal(head_dir, self.moves["up"]): return np.array([1, 0, 0, 0])   # up
        elif np.array_equal(head_dir, self.moves["right"]): return np.array([0, 1, 0, 0])  # right
        elif np.array_equal(head_dir, self.moves["down"]): return np.array([0, 0, 1, 0])  # down
        elif np.array_equal(head_dir, self.moves["left"]): return np.array([0, 0, 0, 1]) # left
    
    def get_tail_direction(self, tail_dir):
        if np.array_equal(tail_dir, self.moves["up"]): return np.array([1, 0, 0, 0])  # up
        elif np.array_equal(tail_dir, self.moves["right"]): return np.array([0, 1, 0, 0]) # right
        elif np.array_equal(tail_dir, self.moves["down"]): return np.array([0, 0, 1, 0]) # down
        elif np.array_equal(tail_dir, self.moves["left"]): return np.array([0, 0, 0, 1])# left


###############################################################################################
class Environment():
    def __init__(self, n_row):
        # parameters
        self.board_info = {"empty": 0, "snake": 1, "apple": 2}
        # (y, x)
        self.moves = {"up": np.array([-1, 0]), "right": np.array([0, 1]), "down": np.array([1, 0]), "left": np.array([0, -1])}
        self.reward_dict = reward_dict
        self.count_deaths = -2
        # Snake sensors for returning state (all logic)
        self.Sensors = SnakeSensors(n_row, self.board_info, self.moves)

        # Generate components
        self.x = n_row
        self.reward = 0   # reward based on every action and its consequences
        self.restart_env()
    
    def generate_grid(self):
        # generate board (grid) of zeros (always square)
        self.board = np.zeros((self.x, self.x))
    
    def generate_snake(self):
        # Randomly choose spot to generate snake
        indices = np.random.randint(2, high=self.x-2, size=2)
        y, x = indices[0], indices[1]

        self.board[y, x] = 1
        self.snake_body = np.array([[y, x-2], [y, x-1], [y, x]])


    def generate_apple(self):
        # Randomly generate apple (if there isn't already body of snake)
        while True:
            indices = np.random.randint(0, high=self.x, size=2)
            y, x = indices[0], indices[1]

            if self.board[y, x] == 0:
                self.board[y, x] = 2
                self.apple_pos = np.array([y, x])
                break
        
    def collision_with_self(self):
        # if count of eaten apples don't equel size of snake body -> collision with it self; game over
        body = []
        snake_body = self.snake_body.tolist()
        for i in snake_body:
            body.append((i[0], i[1]))
        
        if self.eaten_apples+1 != len(set(body)):
            self.done = True
            self.reward = self.reward_dict["hit self"]
    
    def collision_with_boundaries(self):
        # if snake go beyond board; game over
        if (self.snake_body[-1, 0] < 0 or self.snake_body[-1, 0] >= self.x) \
            or (self.snake_body[-1, 1] < 0 or self.snake_body[-1, 1] >= self.x):
            self.done = True
            self.reward = self.reward_dict["hit boundary"]

    def collision_with_apple(self):
        # Add another body to snake and generate another apple
        if np.array_equal(self.snake_body[-1], self.apple_pos):
            self.eaten_apples += 1
            self.reward = self.reward_dict["eat apple"]
            self.steps = 0

            self.check_for_end()
            if not self.done: 
                self.generate_apple()
        else:
            self.pop_snake_tail()
        self.get_tail_dir()

    def check_steps(self):
        # If taken (row*row) or more steps until apple is eaten -> game over
        if self.steps > (self.x*self.x//2):
            self.done = True
            self.reward = self.reward_dict["a lot of steps"]
    
    def check_for_end(self):
        # return True if snake filled whole board else False
        if np.all(self.board.all(1)):
            self.done = True
            self.game_info = {"won game": True}
    
    def restart_env(self):
        # set up parameters
        self.apple_pos = None           # position of current apple
        self.none = None
        self.head_dir = self.none       # direction of head of snake
        self.prev_direction = None
        self.steps = 0                  # every action untill apple is eaten
        self.done = False               # if env/game is finished
        self.game_info = {"won game": False} # info about state of game
        self.count_deaths += 1          # Count generations

        # Generate game grid
        self.generate_grid()
        self.generate_snake()
        self.generate_apple()

        self.eaten_apples = len(self.snake_body)           # track of eaten apple
        self.get_tail_dir()

        self.check_state()

        return self.state

    def update_board(self):
        # refresh board; write on board snake and apple
        self.generate_grid()
        for body in self.snake_body:
            self.board[body[0], body[1]] = 1
        
        self.board[self.apple_pos[0], self.apple_pos[1]] = 2

    def check_state(self):
        self.Sensors.update_sensor_board(self.board, self.apple_pos, self.snake_body)
        distance_sensor = self.Sensors.distance_to_wall()
        apple_sensor = self.Sensors.see_apple()
        snake_body_sensor = self.Sensors.see_it_self()
        if self.head_dir is not self.none: head_dir_sensor = self.Sensors.get_head_direction(self.head_dir)
        else: head_dir_sensor = np.array([0, 0, 0, 0])

        if len(self.snake_body) > 1: tail_dir_sensor = self.Sensors.get_tail_direction(self.tail_dir)
        else: tail_dir_sensor = np.array([0, 0, 0, 0])

        self.state = np.concatenate((distance_sensor, apple_sensor, snake_body_sensor, head_dir_sensor, tail_dir_sensor), axis=0)
    
    def select_random_action(self):
        list_of_action = np.array([0, 1, 2, 3])
        return np.random.choice(list_of_action, size=1)[0]
    
    def get_head_dir(self, direction):
        # snake cannot kill himself by going opposite direction
        if direction == 0 and self.prev_direction != 2:         self.head_dir = self.moves["up"]
        elif direction == 1 and self.prev_direction != 3:       self.head_dir = self.moves["right"]
        elif direction == 2 and self.prev_direction != 0:       self.head_dir = self.moves["down"]
        elif direction == 3 and self.prev_direction != 1:       self.head_dir = self.moves["left"]

        self.prev_direction = direction
    
    def get_tail_dir(self):
        if len(self.snake_body) > 1:
            self.tail_dir = tuple((np.array([self.snake_body[1, 0], self.snake_body[1, 1]] \
                 - np.array([self.snake_body[0, 0], self.snake_body[0, 1]]))).reshape(1, -1)[0])
    
    def reward_for_steps(self):
        # if snake see apple, than get reward otherwise is punished
        if 1 in self.state[4:12]:
            self.reward = self.reward_dict["see apple"]
        else:
            self.reward = self.reward_dict["step"]
    
    def pop_snake_tail(self):
        if self.eaten_apples == len(self.snake_body)-1 and not self.done:
            self.snake_body = np.delete(self.snake_body, 0, 0)
            self.reward_for_steps()

    def snake_move(self, direction):
        """
        Algorithm: add new part before head in corresponding direction and delete tail
        """
        self.get_head_dir(direction)
        head = self.snake_body[-1]
        new_head = np.array([head[0]+self.head_dir[0], head[1]+self.head_dir[1]])
        self.snake_body = np.vstack((self.snake_body, new_head))
        # deletng tail is handled in collision_with_apple() with getting tail direction and reward for nothing happening

        self.steps += 1
    
    def action(self, act):
        # Set up
        self.reward = 0
        if self.done: 
            self.restart_env()

        # Check game logic
        self.snake_move(act)
        self.collision_with_self()
        self.collision_with_apple()
        self.collision_with_boundaries()
        self.check_steps()
        self.check_for_end()

        # Update
        if not self.done: 
            self.update_board()
            self.check_state()

        return self.state, self.reward, self.done, self.game_info

**DQN AI**

In [51]:
class Neural_Network(nn.Module):
    def __init__(self, input_size=28, lr=LEARNING_RATE):
        super().__init__()
        """
        Input to NN:
            [distance to wall, see apple, see it self, head direction, tail direction] -> 28 elements
        output of NN:
            [0: up    1: right    2: down    3: left] -> 4 elements
        """
        # Neural Network
        
        self.model = nn.Sequential(
            nn.Linear(input_size, 20),
            nn.ReLU(),
            nn.Linear(20, 12),
            nn.ReLU(),
            nn.Linear(12, 4),
            nn.Sigmoid()
        )

        self.loss_f = nn.BCELoss()
        self.optimizer = torch.optim.Adam(self.parameters(), lr=lr)
    
    def forward(self, input_tensor):
        return self.model(input_tensor)

class ExperienceBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def append(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, dones, next_states = zip(*[self.buffer[idx] for idx in indices])
        return np.array(states), np.array(actions), np.array(rewards, dtype=np.float32), \
               np.array(dones, dtype=np.uint8), np.array(next_states)

class Agent:
    def __init__(self, env, buffer):
        self.env = env
        self.buffer = buffer
        self.Experience = collections.namedtuple('Experience', field_names=['state', 'action', 'reward', 'done', 'new_state'])
        self._reset(env)

    def _reset(self, env):
        self.state = env.restart_env()
        self.total_reward_ = 0.0

    def play_step(self, net, env, epsilon=0.0, device="cpu"):
        done_reward = None

        if np.random.random() < epsilon:
            act = env.select_random_action()
        else:
            state_a = np.array([self.state], copy=False)
            state_v = torch.from_numpy(state_a).to(device, dtype=torch.float32)
            q_vals_v = net.forward(state_v)
            _, act_v = torch.max(q_vals_v, dim=1)
            act = int(act_v.item())

        # do step in the environment
        new_state, reward, is_done, _ = env.action(act)
        self.total_reward_ += reward

        exp = self.Experience(self.state, act, reward, is_done, new_state)
        self.buffer.append(exp)
        self.state = new_state

        if is_done:
            done_reward = self.total_reward_
            self._reset(env)
        return done_reward

class DQN(Agent):
    def __init__(self, env, buffer, net, load=False):
        super().__init__(env, buffer)
        self.device = self.select_device()
        # Networks
        self.net = Neural_Network().to(self.device)
        self.target_net = Neural_Network().to(self.device)

        # parameters
        self.total_rewards = []
        self.best_mean_reward = None
        self.mean_reward = None
        self.index = 0
        self.epsilon = EPSILON_START
        self.episode = 0

        self.agent_info = {}

        # loading sequence
        if load: self.load()

    def select_device(self):
        if torch.cuda.is_available():
            torch.set_default_tensor_type(torch.cuda.FloatTensor)
            print("using cuda:", torch.cuda.get_device_name(0))
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def load(self, load_index=True, load_episode=True):
        self.net.load_state_dict(torch.load("mount/My Drive/Colab Notebooks/snake/net.dat", map_location=torch.device(self.device)))
        self.target_net.load_state_dict(self.net.state_dict())
        if not load_index:
          with open("mount/My Drive/Colab Notebooks/snake/index", 'rb') as f:
            self.index = np.load(f)[0]
        with open("mount/My Drive/Colab Notebooks/snake/total_rewards", 'rb') as f:
          self.total_rewards = np.load(f).tolist()
        with open("mount/My Drive/Colab Notebooks/snake/count_deaths", 'rb') as f:
          self.env.count_deaths = np.load(f)[0]
        if not load_episode:
          with open("mount/My Drive/Colab Notebooks/snake/episode", 'rb') as f:
              self.episode = np.load(f)[0]
    
    def save(self):
        torch.save(self.net.state_dict(), "mount/My Drive/Colab Notebooks/snake/net.dat")
        with open("mount/My Drive/Colab Notebooks/snake/index", 'wb') as f:
          np.save(f, np.array([self.index]))
        with open("mount/My Drive/Colab Notebooks/snake/total_rewards", 'wb') as f:
          np.save(f, self.total_rewards)
        with open("mount/My Drive/Colab Notebooks/snake/count_deaths", 'wb') as f:
          np.save(f, np.array([self.env.count_deaths]))
        with open("mount/My Drive/Colab Notebooks/snake/episode", 'wb') as f:
            np.save(f, np.array([self.episode]))
    
    def api(self):
        return (self.env.board, self.env.state, self.epsilon, self.mean_reward, \
                self.env.steps, self.env.count_deaths, \
                self.env.eaten_apples, self.episode, self.env.game_info)
    
    def calc_loss(self, batch, device="cpu"):
        # unpack batch
        states, actions, rewards, dones, next_states = batch

        # convert everything from batch to torch tensors and move it to device
        states_v = torch.tensor(states).to(device, dtype=torch.float32)
        next_states_v = torch.tensor(next_states).to(device, dtype=torch.float32)
        actions_v = torch.tensor(actions).to(device, dtype=torch.int64)
        rewards_v = torch.tensor(rewards).to(device, dtype=torch.float32)
        done_mask = torch.ByteTensor(dones).to(device)
        done_mask = done_mask.to(torch.bool)

        # get output from NNs which is used for calculating state action value with discount
        state_action_values = self.net.forward(states_v)
        state_action_values = state_action_values.gather(1, actions_v.unsqueeze(-1)).squeeze(-1)
        next_state_values = self.target_net.forward(next_states_v).max(1)[0]
        next_state_values[done_mask] = 0.0
        next_state_values = next_state_values.detach()

        expected_state_action_values = next_state_values * GAMMA + rewards_v
        # Calculate NN loss
        return self.net.loss_f(state_action_values, expected_state_action_values)
    
    def simulate(self, print_info=True):
        # Training AI
        self.index += 1
        self.epsilon = max(EPSILON_FINAL, EPSILON_START - self.index / EPSILON_DECAY_LAST_FRAME)

        self.epsilon = 0.02

        reward = self.play_step(self.net, self.env, self.epsilon, device=self.device)

        if reward is not None:

            self.total_rewards.append(reward)
            self.mean_reward = np.mean(self.total_rewards[-100:])
            
            if self.best_mean_reward is None or self.best_mean_reward < self.mean_reward:
                self.save()

                if self.best_mean_reward is not None:
                    self.agent_info = {"Generation": self.env.count_deaths, \
                                       "Mean reward": self.mean_reward, \
                                       "Epsilon": self.epsilon, "Episode": self.episode}
                    if print_info: print(self.agent_info)

                self.best_mean_reward = self.mean_reward

            if self.env.game_info["won game"]:
                print("Solved in %d frames!" % self.index)
                self.save()
                return

            """
            if self.best_mean_reward > self.mean_reward and self.epsilon < 0.1:
                self.episode += 1
                self.load(load_index=False, load_episode=False)
            """

        if len(self.buffer) < REPLAY_START_SIZE:
            return
        
        # After certain amount time target net become first net
        if self.index % SYNC_TARGET_LOOPS == 0:
            self.target_net.load_state_dict(self.net.state_dict())
          
        # Calculate loss of NN and train it
        self.net.optimizer.zero_grad()
        batch = self.buffer.sample(BATCH_SIZE)
        loss_t = self.calc_loss(batch, device=self.device)
        loss_t.backward()
        self.net.optimizer.step()



**Train**

In [None]:
    env = Environment(row)
    buffer = ExperienceBuffer(REPLAY_SIZE)
    net = Neural_Network()
    dqn_agent = DQN(env, buffer, net, load=True)

    flag = True
    count = 0

    while flag:      
      all_boards = []
      dqn_agent.simulate()
      board, state, epsilon, mean_reward, steps, generation, score, episode, game_info = dqn_agent.api()

      if game_info["won game"]:
          flag = False
          print("finished")
      
      if count % 50000 == 0:
        print("Mean reward", mean_reward, "Generation", generation, "Epsilon", epsilon, "Mean Reward", mean_reward)
      count += 1