In [3]:
%matplotlib inline

import numpy as np
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data

from sklearn.model_selection import train_test_split


torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.manual_seed(999)
np.random.seed(0)

In [4]:
channels = 2
filters = 64

n_epocs = 10000
batch_size = 4096
test_size = 0.2
learning_rate = 0.001 * (batch_size / 1024.0)
momentum = 0.0

In [5]:
class Connect4Dataset(data.Dataset):
    def __init__(self, boards, values):
        assert len(boards) == len(values)
        self.boards = boards
        self.values = values
        
    def __len__(self):
        return len(self.boards)
    
    def __getitem__(self, idx):
        value = self.values[idx].item()
        if value == 1.0:
            a = 0
        elif value == 0.5:
            a = 1
        elif value == 0.0:
            a = 2
        return self.boards[idx], a

boards = torch.load('/home/richard/Downloads/connect4_boards.pth').numpy()
values = torch.load('/home/richard/Downloads/connect4_values.pth').numpy()

# Here we don't want to have the player to move channel
boards = boards[:, 3 - channels:]

board_train, board_test, value_train, value_test = train_test_split(boards, values, test_size=test_size, random_state=42)

In [6]:
# augment data
np.random.shuffle(value_train)
np.random.shuffle(board_train)

print(len(value_train), len(board_train))

train = Connect4Dataset(torch.from_numpy(board_train), torch.from_numpy(value_train))
test = Connect4Dataset(torch.from_numpy(board_test), torch.from_numpy(value_test))

train_gen = data.DataLoader(train, batch_size, shuffle=False)
test_gen = data.DataLoader(test, batch_size, shuffle=False)

54045 54045


In [9]:
from src.connect4.utils import NetworkStats as info

convolutional_layer = \
    nn.Sequential(nn.Conv2d(in_channels=channels,
                            out_channels=filters,
                            kernel_size=3,
                            stride=1,
                            padding=1,
                            dilation=1,
                            groups=1,
                            bias=False),
                  nn.BatchNorm2d(filters),
                  nn.LeakyReLU())


# Input with N * filters * (6,7)
# Output with N * filters * (6,7)
class ResidualLayer(nn.Module):
    def __init__(self):
        super(ResidualLayer, self).__init__()
        self.conv1 = nn.Conv2d(filters, filters, 3, padding=1, bias=False)
        self.conv2 = nn.Conv2d(filters, filters, 3, padding=1, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(filters)
        self.batch_norm2 = nn.BatchNorm2d(filters)
        self.relu = nn.LeakyReLU()

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.batch_norm1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.batch_norm2(out)

        out += residual
        out = self.relu(out)
        return out


# Input with N * filters * (6,7)
# Output with N * 1 * 1
class ValueHead_1(nn.Module):
    def __init__(self):
        super(ValueHead, self).__init__()
        self.conv1 = nn.Conv2d(filters, 1, 1)
        self.batch_norm = nn.BatchNorm2d(1)
        self.relu = nn.LeakyReLU()
        self.fcN = nn.Sequential(*[nn.Linear(info.area, info.area) for _ in range(4)])
        self.fc2 = nn.Linear(info.area, 1)
        self.tanh = torch.nn.Tanh()
        self.w1 = nn.Parameter(torch.tensor(1.0), requires_grad=False)
        self.w2 = nn.Parameter(torch.tensor(0.5), requires_grad=False)

    def forward(self, x):
        x = self.conv1(x)
        x = self.batch_norm(x)
        x = self.relu(x)
        x = x.view(x.shape[0], 1, -1)
        x = self.fcN(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.tanh(x)
#         map from [-1, 1] to [0, 1]
        x = (x + self.w1) * self.w2
        return x

# Input with N * filters * (6,7)
# Output with N * 3
class ValueHead_Classifier(nn.Module):
    def __init__(self):
        super(ValueHead_Classifier, self).__init__()
        self.conv1 = nn.Conv2d(filters, 1, 1)
        self.batch_norm = nn.BatchNorm2d(1)
        self.relu = nn.LeakyReLU()
        self.fcN = nn.Sequential(*[nn.Linear(info.area, info.area) for _ in range(4)])
        self.fc2 = nn.Linear(info.area, 3)

    def forward(self, x):
        x = self.conv1(x)
        x = self.batch_norm(x)
        x = self.relu(x)
        x = x.view(x.shape[0], 1, -1)
        x = self.fcN(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = x.view(-1, 3)
        return x

def init_uniform(m):
    if type(m) == nn.Linear:
        nn.init.uniform_(m.weight)
    elif type(m) == nn.Conv2d:
        nn.init.uniform_(m.weight)
    elif type(m) == nn.BatchNorm2d:
        nn.init.uniform_(m.weight)
#         nn.init.constant_(m.bias, 0)
        
def init_normal(m):
    if type(m) == nn.Linear:
        nn.init.normal_(m.weight, std=0.0002)
    elif type(m) == nn.Conv2d:
        nn.init.normal_(m.weight, std=0.0002)
    elif type(m) == nn.BatchNorm2d:
        nn.init.normal_(m.weight, std=0.0002)
#         nn.init.constant_(m.bias, 0)

net = nn.Sequential(convolutional_layer,
                    nn.Sequential(*[ResidualLayer() for _ in range(4)]),
                    ValueHead_Classifier())

net.apply(init_normal)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net.to(device)

Sequential(
  (0): Sequential(
    (0): Conv2d(2, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01)
  )
  (1): Sequential(
    (0): ResidualLayer(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (batch_norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (batch_norm2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): LeakyReLU(negative_slope=0.01)
    )
    (1): ResidualLayer(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (batch_norm1): BatchNorm2d(64, eps=1e-05, momentum=0

In [10]:
# criterion = nn.MSELoss()
# criterion = nn.L1Loss()
weight = torch.tensor([float(len(value_train) - len(value_train[value_train == i])) / len(value_train) for i in [1.0, 0.5, 0.0]])
weight = weight.to(device)
criterion = nn.CrossEntropyLoss(weight=weight)
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=momentum)

In [11]:
%%time

for epoch in range(n_epocs):
    
#     net = net.train()
    running_loss = 0.0
    for i, d in enumerate(train_gen, 0):
        # get the inputs
        board, value = d
#         print(board, value)

        board, value = board.to(device), value.to(device)
        
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(board)
        loss = criterion(outputs, value)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        step = int(1.0 * len(train) / batch_size)
        if i % step == step - 1:
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / step))
            running_loss = 0.0

    # validate
    with torch.set_grad_enabled(False):
        net = net.eval()
        correct = {i : 0 for i in range(3)}
        total = {i : 0 for i in range(3)}
        for board, value in test_gen:
            board, value = board.to(device), value.to(device)

            outputs = net(board)
            y = nn.functional.softmax(outputs, dim=1)
            prediction_class = torch.max(outputs, dim=1)[1]

            if batch_size == 1:
                total[value.item()] += 1
                if categories == value:
                    correct[value.item()] += 1 
            else:
                for k in correct:
                    idx = (value == k).nonzero()
                    total[k] += len(idx)
                    correct[k] += len(torch.eq(prediction_class[idx], value[idx]).nonzero())
            
        for k in correct:
            print('Category, # Predictions, Accuracy of the network on the test:  %s, %d,  %d' % (
                k,
                total[k],
                correct[k]))
# #             print('Average value predicted by the network: ')
        print("Example:\nvalue:  ", value[0], "\ny:  ", y[0], "\nprediction:  ", prediction_class[0])
        

print('Finished Training')

[1,    13] loss: 1.123
Category, # Predictions, Accuracy of the network on the test:  0, 8876,  0
Category, # Predictions, Accuracy of the network on the test:  1, 1295,  1295
Category, # Predictions, Accuracy of the network on the test:  2, 3341,  0
Example:
value:   tensor(0, device='cuda:0') 
y:   tensor([0.3368, 0.3663, 0.2969], device='cuda:0') 
prediction:   tensor(1, device='cuda:0')
[2,    13] loss: 1.119
Category, # Predictions, Accuracy of the network on the test:  0, 8876,  0
Category, # Predictions, Accuracy of the network on the test:  1, 1295,  1295
Category, # Predictions, Accuracy of the network on the test:  2, 3341,  0
Example:
value:   tensor(0, device='cuda:0') 
y:   tensor([0.3396, 0.3620, 0.2985], device='cuda:0') 
prediction:   tensor(1, device='cuda:0')
[3,    13] loss: 1.116
Category, # Predictions, Accuracy of the network on the test:  0, 8876,  0
Category, # Predictions, Accuracy of the network on the test:  1, 1295,  1295
Category, # Predictions, Accuracy of

KeyboardInterrupt: 

In [None]:
# validate
with torch.set_grad_enabled(False):
    net = net.eval()
    correct = {i : 0 for i in [0, 0.5, 1]}
    total = {i : 0 for i in [0, 0.5, 1]}
    for board, value in test_gen:
        board, value = board.to(device), value.to(device)

        outputs = net(board)
        categories = categorise_predictions(outputs)

        for k in correct:
            idx = (categories == k).nonzero()
            total[k] += len(idx)
            correct[k] += (categories[idx] == value[idx]).nonzero().sum().item()
    for k in correct:
        print('Category, # Predictions, Accuracy of the network on the test%s: %d %d %%' % (
            k,
            total[k],
            (100 * float(correct[k]) / float(total[k])) if total[k] != 0 else 0.))

In [None]:
assert(False)
# save that crap
torch.save({
    'net_state_dict': net.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss
},
    open('/home/richard/Downloads/nn.pth', 'wb'))

In [None]:
# alternatively load it
assert(False)
checkpoint = torch.load('/home/richard/Downloads/nn.pth')
net.load_state_dict(checkpoint['net_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
loss = checkpoint['loss']

In [None]:
dataiter = iter(testloader)
images, labels = dataiter.next()

# print images
imshow(torchvision.utils.make_grid(images))
print('GroundTruth: ', ' '.join('%5s' % classes[labels[j]] for j in range(4)))