In [2]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from copy import deepcopy as dc
from game import Game
from heuristics import *

In [3]:
class DeepQNetworkConnect4(nn.Module):
    """Small convolutional Q-network producing seven action values per board.

    Architecture: one 4x4 convolution (1 -> 10 channels, stride 1, no padding)
    followed by three fully connected layers (120 -> 42 -> 20 -> 7). The 120
    is the hard-wired flatten size 10 * 3 * 4, which is what the convolution
    yields for a 6x7 input plane.
    """

    def __init__(self):
        super().__init__()
        flat_features = 10 * 3 * 4
        # Layers are created in a fixed order so seeded initialisation is stable.
        self.conv = nn.Conv2d(in_channels=1, out_channels=10, kernel_size=4, stride=1, padding=0)
        self.fc1 = nn.Linear(flat_features, 42)
        self.fc2 = nn.Linear(42, 20)
        self.fc3 = nn.Linear(20, 7)

    def forward(self, x):
        """Return the action values for a single board.

        Args:
            x: float tensor holding one board; QLoss in this notebook passes
                shape (1, 6, 7).

        Returns:
            Tensor of shape (7,) for a (1, 6, 7) input (the leading dimension
            added below is squeezed away again before returning).
        """
        # Prepend one dimension so the convolution sees a channel axis.
        with_channel = x.unsqueeze(0)
        features = F.relu(self.conv(with_channel))
        # Flatten the convolution output to rows of fc1's input width.
        hidden = features.view(-1, self.fc1.in_features)
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return self.fc3(hidden).squeeze(0)

In [4]:
class ReplayBuffer:
    """Fixed-capacity FIFO store of experience frames with uniform sampling.

    ``buffer`` is a plain list (oldest frame first) and is read directly by
    callers, e.g. ``len(buffer.buffer)``.
    """

    def __init__(self, max_frames):
        """Create an empty buffer that keeps at most ``max_frames`` frames."""
        self.max_frames = max_frames
        self.buffer = []

    def add(self, frame):
        """Append ``frame`` and drop the oldest frames beyond the capacity."""
        self.buffer.append(frame)
        overflow = len(self.buffer) - self.max_frames
        if overflow > 0:
            del self.buffer[:overflow]

    def sample(self, num_samples):
        """Return ``num_samples`` frames drawn without replacement.

        Args:
            num_samples: how many frames to draw. Values <= 0 yield ``[]``.

        Returns:
            A new list of frames taken from distinct buffer positions, in
            random order.

        Raises:
            ValueError: if more frames are requested than are stored. The
                previous rejection-sampling loop never terminated in that case.
        """
        if num_samples <= 0:
            return []
        available = len(self.buffer)
        if num_samples > available:
            raise ValueError(
                f"cannot sample {num_samples} frames from a buffer holding {available}"
            )
        # random.sample picks distinct positions directly; this stays fast
        # even when the request covers (almost) the whole buffer.
        return random.sample(self.buffer, num_samples)

In [5]:
def calculate_epsilon(step, epsilon_start, epsilon_finish, total_timesteps, exploration_fraction):
    """Linearly anneal epsilon from ``epsilon_start`` to ``epsilon_finish``.

    The decay runs over the first ``total_timesteps * exploration_fraction``
    steps; from that step onwards ``epsilon_finish`` is returned unchanged.

    Args:
        step: current step index (0-based).
        epsilon_start: epsilon at step 0.
        epsilon_finish: epsilon once the decay window has elapsed.
        total_timesteps: total number of steps in the run.
        exploration_fraction: fraction of the run spent decaying.

    Returns:
        The epsilon value to use at ``step``.
    """
    finish_step = total_timesteps * exploration_fraction
    # `>=` instead of `>`: at step == finish_step the interpolation already
    # evaluates to epsilon_finish, and the early return also covers a
    # zero-length decay window, which used to divide by zero at step 0.
    if step >= finish_step:
        return epsilon_finish
    remaining_fraction = (finish_step - step) / finish_step
    return epsilon_finish + remaining_fraction * (epsilon_start - epsilon_finish)

In [6]:
# HYPERPARAMETERS
# Reproducibility: train() feeds this value to random, numpy and torch.
seed = 0

# Replay memory and optimisation
buffer_size = 10_000       # capacity handed to ReplayBuffer
ideal_batch_size = 1_000   # upper bound on experiences sampled per update
learning_rate = 0.002      # should be lower
gamma = 0.99               # discount factor read by QLoss

# Training schedule
total_timesteps = 200      # outer iterations of train(); later cells rebind this
train_games = 20           # games played per outer iteration

# Exploration schedule (see calculate_epsilon)
# train() does not read epsilon_start; it starts from its
# epsilon_initial_value argument instead.
epsilon_start = 1
epsilon_finish = 0
exploration_fraction = 0.95

In [7]:
# Module-level training state shared by the cells below.
# Statement order matters: each network construction draws its initial
# weights from the torch RNG in sequence.
# q_network is the network that the training cells pass to train().
q_network = DeepQNetworkConnect4()
# Placeholder instance; the "VS Itself" cell rebinds old_q_network to a deep
# copy of q_network before it is used as an opponent.
old_q_network = DeepQNetworkConnect4()
# Replay memory. train() reads and fills this global directly rather than
# receiving it as an argument.
buffer = ReplayBuffer(buffer_size)

In [8]:
def QLoss(experiences, q_network = None, target_network = None, discount = None):
    """Sum of squared one-step TD errors over a batch of experiences.

    For every experience the error is ``target - Q(state)[action]`` where
      * ``reward is None`` (game continues): target = discount * max_a Q_target(next_state)
      * ``next_state is None`` (game ended): target = reward
    Experiences that match neither case contribute nothing, as before.
    NOTE(review): that skip looks intentional for how Game encodes
    experiences, but confirm there.

    The result is a SUM, not a mean, so it scales with the batch size; divide
    by ``len(experiences)`` to compare batches of different sizes.

    Args:
        experiences: iterable of objects exposing ``state``, ``action``,
            ``reward`` and ``next_state``; states must hold 42 values because
            they are reshaped to (1, 6, 7).
        q_network: callable mapping a (1, 6, 7) float tensor to per-action
            values; gradients flow through this network only.
        target_network: callable used for the bootstrap target. It is
            evaluated without gradient tracking.
        discount: discount factor; ``None`` (default) uses the module-level
            ``gamma``.

    Returns:
        0-dim float tensor. An empty batch yields ``tensor(0.)`` rather than
        the int ``0``, so ``.item()`` always works.
    """
    if discount is None:
        discount = gamma

    # TODO (original author): consider Huber loss instead of squared error.
    loss = torch.zeros(())
    for experience in experiences:
        reward, next_state = experience.reward, experience.next_state
        if reward is None:
            next_state_tensor = torch.from_numpy(np.array(next_state).reshape(1, 6, 7)).float()
            # The bootstrap target is a constant for this update; do not build
            # an autograd graph through the target network.
            with torch.no_grad():
                target = discount * torch.max(target_network(next_state_tensor))
        elif next_state is None:
            target = reward
        else:
            continue
        state_tensor = torch.from_numpy(np.array(experience.state).reshape(1, 6, 7)).float()
        q_value = q_network(state_tensor)[experience.action]
        loss = loss + (target - q_value) ** 2
    return loss

def train(q_network, opponent, epsilon_initial_value = 0, target_sync_interval = 10):
    """Train ``q_network`` in place by playing games against ``opponent``.

    Each outer iteration plays ``train_games`` games (storing their
    experiences in the global replay ``buffer``), samples up to
    ``ideal_batch_size`` experiences and takes one Adam step on ``QLoss``.
    The module-level hyperparameters ``seed``, ``learning_rate``,
    ``total_timesteps``, ``train_games``, ``epsilon_finish``,
    ``exploration_fraction`` and ``ideal_batch_size`` are read at call time.

    Side effects: reseeds ``random``, ``numpy`` and ``torch``; appends to the
    global ``buffer``; every 10th iteration prints a progress line and
    overwrites ``q_conv_network.pth``; shows a plot of the loss history.

    Args:
        q_network: network to optimise; also passed to ``Game.playGame`` as
            ``agent1``.
        opponent: forwarded unchanged to ``Game.playGame`` as ``agent2``.
        epsilon_initial_value: epsilon at iteration 0 of the linear decay.
        target_sync_interval: copy the online weights into the target network
            after every this many updates. ``0``/``None`` never syncs, which
            was the previous behaviour: the target stayed frozen at the
            initial weights, so bootstrap targets never improved.

    Returns:
        The same ``q_network`` object, after training.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.backends.cudnn.deterministic = seed > 0

    # Initialize action-value function Q and target network
    # TODO: Add extra agents to play against
    target_network = dc(q_network)
    # The target only supplies constants for the loss; it is never optimised.
    target_network.requires_grad_(False)
    optimiser = torch.optim.Adam(q_network.parameters(), learning_rate)

    rewards = []
    losses = []
    for iteration in range(int(total_timesteps)):
        # Epsilon depends only on the outer iteration, so compute it once here
        # (it is also needed by the log line when train_games is 0).
        epsilon = calculate_epsilon(iteration, epsilon_initial_value, epsilon_finish, total_timesteps, exploration_fraction)
        for _ in range(int(train_games)):
            # Generate games and add experiences
            g = Game()
            # TODO: Why is the first player dominating? first_player is random, should be 50/50
            g.playGame(agent1 = q_network, agent2 = opponent, epsilon = epsilon, first_player = random.randint(1, 2))
            # g.state is logged as the game's outcome — presumably numeric,
            # since it is averaged below; confirm in Game.
            rewards.append(g.state)
            for experience in g.experiences:
                buffer.add(experience)

        batch = buffer.sample(min(len(buffer.buffer), ideal_batch_size))
        if not batch:
            # Nothing to learn from yet (no games played / empty buffer).
            continue

        loss = QLoss(batch, q_network, target_network)
        losses.append(loss.item())
        if iteration % 10 == 0:
            mean_reward = sum(rewards) / len(rewards) if rewards else float('nan')
            print(iteration, losses[-1], epsilon, mean_reward)
            # comment out if no need
            torch.save(q_network, 'q_conv_network.pth')

        # backprop
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        if target_sync_interval and (iteration + 1) % target_sync_interval == 0:
            target_network.load_state_dict(q_network.state_dict())

    # graph losses
    plt.plot(losses)
    plt.xlabel('update step')
    plt.ylabel('QLoss')
    plt.show()
    return q_network

In [None]:
# VS Random
# Stage 1: train against opponent id 1 (the random opponent according to this
# cell's title; the id-to-opponent mapping lives in Game — confirm there).
# q_networks collects the network returned by each training stage.
q_networks = []
# train() reads the global total_timesteps, so rebinding it here sets the
# number of outer iterations for this stage.
total_timesteps = 2000
# Third argument is epsilon_initial_value.
q_network = train(q_network, 1, 0.9)
q_networks.append(q_network)
# Saves the whole module object (not a state_dict); the next cells reload it
# with torch.load.
torch.save(q_network, 'q_conv_network.pth')

In [None]:
# VS Minimax
# Stage 2: resume from the checkpoint on disk and train against opponent id 3
# (minimax according to this cell's title — confirm the mapping in Game).
# Requires q_networks from the "VS Random" cell to exist in the kernel.
# NOTE(review): this loads a fully pickled module; recent PyTorch releases
# default torch.load to weights_only=True, which rejects such files — confirm
# against the installed version. Only load checkpoints you trust.
q_network = torch.load('q_conv_network.pth')

# Author's progress note: 1000 timesteps done, 1000 left to go.
# Adjust total_timesteps based on how far training got.
# NOTE(review): the value below (100) does not match the note above — confirm.
total_timesteps = 100

q_network = train(q_network, 3, 0.9)
q_networks.append(q_network)
torch.save(q_network, 'q_conv_network.pth')

In [None]:
# VS Itself
# Stage 3: self-play. Resume from the checkpoint on disk and train against a
# frozen deep copy of the loaded network.
# Requires q_networks from the "VS Random" cell to exist in the kernel.
# NOTE(review): same torch.load caveat as the previous stage — a fully pickled
# module is loaded here; confirm the installed PyTorch accepts it.
q_network = torch.load('q_conv_network.pth')

# Author's progress note: 1000 timesteps done, 0 left to go.
# Adjust total_timesteps based on how far training got.
# NOTE(review): the value below (500) does not match the note above — confirm.
total_timesteps = 500
# The opponent is a snapshot taken before training starts; train() optimises
# q_network only, so this copy is not touched by the optimiser.
old_q_network = dc(q_network)
q_network = train(q_network, old_q_network, 0.9)
q_networks.append(q_network)
torch.save(q_network, 'q_conv_network.pth')

In [None]:
# Demo: play a single game with the trained network as agent1 against
# opponent id 3 (the id used in the "VS Minimax" cell), with agent1 moving
# first. epsilon = 0 presumably disables random exploration moves and
# pick_display = 1 presumably enables board output — TODO confirm both in
# Game.playGame.
g = Game()
g.playGame(agent1 = q_network, agent2 = 3, epsilon = 0, first_player = 1, pick_display = 1)