In [1]:
import snake, queue, random, threading, math
import tensorflow as tf
import numpy as np
import tkinter as tk

In [2]:
# Number of most recent frames kept in the agent's frame stack (phi);
# agent.step() compares len(phi) against this value before storing experiences.
stack_size = 4

In [3]:
# Q-Network
# Maps a single 15x15x1 frame to one Q-value per action (4 actions).
q = tf.keras.Sequential()

input_1 = (stack_size, 15, 15, 1)
# NOTE(review): input_2 is kept only so the name stays defined for other cells;
# it is not used by the model. Keras infers the input shape of every layer
# after the first one, so an `input_shape` on the second Conv2D has no effect.
input_2 = (stack_size, 5, 5, 1)

q.add(tf.keras.layers.Conv2D(15, 4,
                             activation="relu", input_shape=input_1[1:]))
q.add(tf.keras.layers.Conv2D(5, 2, activation="relu"))
# Collapse the (height, width, channels) feature map into a vector. Without
# this, Dense is applied per spatial position and the model outputs a 4-D
# tensor instead of one row of 4 Q-values per input.
q.add(tf.keras.layers.Flatten())
q.add(tf.keras.layers.Dense(32, activation="relu"))
# Linear output: Q-values are unbounded regression targets and may be
# negative, which a ReLU output could never represent.
q.add(tf.keras.layers.Dense(4, activation="linear"))
q.compile(optimizer="Adam", loss="mse")

# Replay Memory
# Shared list of `experience` objects appended to by agent.step().
replay_memory = []

In [4]:
class experience:
    """One replay-memory record: (states, action, reward, transitions).

    The four fields are stored exactly as given to the constructor; the class
    performs no validation or copying.
    """

    # Class-level fallbacks; every instance overrides them in __init__.
    states = action = reward = transitions = None

    def __init__(self, states, action, reward, transitions):
        (self.states,
         self.action,
         self.reward,
         self.transitions) = (states, action, reward, transitions)

In [5]:
class agent:
    """Epsilon-greedy agent that plays the game and records replay experiences.

    Constructor arguments are stored as attributes of the same name:
        game          -- object exposing ``step(action)`` and ``running``.
        return_queue  -- queue from which ``(state, reward)`` pairs are read
                         after each game step.
        replay_memory -- list that ``experience`` records are appended to.
        q             -- model exposing ``predict``; its last output axis is
                         assumed to hold one value per entry of ``directions``.
        epsilon       -- probability of taking a random action.
        discount      -- discount factor used in the TD target.
        rate          -- stored but not used by any method of this class.
        batch_size    -- number of experiences drawn by ``get_batch``.
    """

    directions = ["UP", "DOWN", "LEFT", "RIGHT"]

    def __init__(self, game, return_queue, replay_memory, q, epsilon, discount, rate, batch_size):
        self.game = game
        self.return_queue = return_queue
        self.replay_memory = replay_memory
        self.q = q

        self.epsilon = epsilon
        self.discount = discount
        self.rate = rate
        self.batch_size = batch_size

        # Frame stack, newest frame first. Created per instance (a class-level
        # deque would be shared by every agent) and bounded so that
        # appendleft() on a full stack drops the oldest frame automatically.
        self.phi = queue.deque(maxlen=stack_size)

    def _action_values(self, phi):
        """Return one Q-value per direction for the frame stack ``phi``.

        The model output is reshaped to ``(-1, len(directions))`` and averaged
        over the leading axis, so the result always has exactly one entry per
        direction regardless of how many frames were passed.
        NOTE(review): this treats the stacked frames as a batch of independent
        inputs -- confirm that matches the intended network input.
        """
        q_values = np.asarray(self.q.predict(np.asarray(phi)))
        return q_values.reshape(-1, len(self.directions)).mean(axis=0)

    def epsilon_action(self):
        """Pick a direction: random with probability ``epsilon``, else greedy.

        A random direction is also chosen while the frame stack is empty,
        because there is nothing to feed to the model yet.
        """
        # random.random() is uniform on [0, 1); random.randrange(0, 1) would
        # always yield 0 and make every action random.
        if not self.phi or random.random() < self.epsilon:
            return random.choice(self.directions)

        frames = np.array(list(self.phi))
        return self.directions[int(np.argmax(self._action_values(frames)))]

    def step(self):
        """Play one action and, once the frame stack is full, record the transition.

        Returns without recording anything if the game stops before it
        delivers a ``(state, reward)`` pair.
        """
        action = self.epsilon_action()
        self.game.step(action)

        # Block on the queue with a short timeout instead of spinning, and
        # give up if the game has ended without producing a result (a bare
        # get() would wait forever in that case).
        while True:
            try:
                state_reward = self.return_queue.get(timeout=0.05)
                break
            except queue.Empty:
                if not self.game.running:
                    return

        phi_last = np.array(list(self.phi))
        self.phi.appendleft(np.expand_dims(state_reward[0], axis=2))
        phi_current = np.array(list(self.phi))

        # Only store transitions whose previous stack was already full, so
        # `states` and `transitions` always hold the same number of frames.
        if len(phi_last) == self.phi.maxlen:
            self.replay_memory.append(experience(
                phi_last, action, state_reward[1], phi_current))

    def get_batch(self):
        """Return ``batch_size`` experiences sampled with replacement.

        The result is a 1-D object array; its entries are ``None`` when the
        replay memory is empty.
        """
        batch = np.empty([self.batch_size], dtype=object)
        if self.replay_memory:
            picks = random.choices(self.replay_memory, k=self.batch_size)
            for slot, picked in enumerate(picks):
                batch[slot] = picked

        return batch

    def loss(self, phi, reward, action, next_phi=None):
        """Squared TD error for a single transition.

        target = reward + discount * max_a Q(next_phi, a)
        loss   = (target - Q(phi, action)) ** 2

        ``next_phi`` is optional for backward compatibility; when omitted the
        target is bootstrapped from ``phi`` itself, as before.
        """
        current_values = self._action_values(phi)
        if next_phi is None:
            next_values = current_values
        else:
            next_values = self._action_values(next_phi)

        target = reward + self.discount * np.amax(next_values)
        predicted = current_values[self.directions.index(action)]
        return float((target - predicted) ** 2)

    def learn(self):
        """Print the TD loss of the first sampled experience.

        TODO: no weight update is performed here yet; the model is only
        evaluated, not trained.
        """
        if not self.replay_memory:
            return

        sample = self.get_batch()[0]
        print(self.loss(sample.states, sample.reward, sample.action,
                        sample.transitions))


In [6]:
def train_agent(agent, epoch):
    """Run one training episode: step and learn until the agent's game stops.

    agent -- object exposing ``game`` (with ``running`` and ``score``),
             ``step()`` and ``learn()``.
    epoch -- episode index, used only in the progress message.
    """
    print("Training epoch " + str(epoch) + ".")

    while agent.game.running:
        agent.step()
        agent.learn()

    # Report the score of the game this agent actually played, not whatever
    # happens to be bound to a global named `game`.
    print("Training ended, agent scored " + str(agent.game.score) + " points.")

In [7]:
# A single game instance is created once and reused for all epochs.
game = snake.game(queue.Queue(1))
for x in range(100):
    # Fresh single-slot queue for this epoch; the same object is handed to
    # both the agent and game.start() below.
    return_queue = queue.Queue(1)

    # Positional hyperparameters: epsilon=0.1, discount=0.95, rate=0.1, batch_size=10.
    dqn = agent(game, return_queue, replay_memory, q, 0.1, 0.95, 0.1, 10)

    # The training loop runs on a worker thread while game.start() is called
    # on this thread.
    # NOTE(review): presumably game.start() blocks until the game ends -- confirm
    # against the snake module.
    training_thread = threading.Thread(target=train_agent, args=(dqn, x))
    training_thread.start()

    game.start(return_queue)
    # Wait for the training thread to finish before starting the next epoch.
    training_thread.join()

# NOTE(review): presumably `w` is the game's window and this tears it down
# after the last epoch -- verify in the snake module.
game.w.destroy()

Training epoch 0.
