In [None]:
import os
from io import StringIO

import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import chess
from chess import pgn
from chess import Board, Piece

import keras
from keras import Model
from keras.models import load_model
from keras.callbacks import ModelCheckpoint
from keras.layers import Input, Flatten, Dense, Activation
from keras.layers import Conv2D, MaxPooling2D, ZeroPadding2D
from keras.layers import GlobalAveragePooling2D
from keras.layers import Add, Multiply, Concatenate
from keras.layers import Dropout
from keras.layers import BatchNormalization as BatchNorm

In [None]:
# Colab-only setup: mount Google Drive so the project files become reachable
# from the notebook's filesystem.
from google.colab import drive
drive.mount("/content/drive")

# Work from the project folder so the relative paths used later in the
# notebook ("best_games.txt", "<name>.npy", "model.hdf5") resolve inside it.
os.chdir("/content/drive/My Drive/fishnet")

In [None]:
# Board geometry: an 8x8 board, available both as a 2-D shape and flattened.
board_h, board_w = 8, 8
board_shape = (board_h, board_w)
board_size = board_h * board_w

# Piece types that get a feature plane. Positions are encoded with the numeric
# python-chess piece type itself, not with the position in this list.
pieces = [chess.ROOK, chess.KNIGHT, chess.BISHOP, chess.QUEEN, chess.KING, chess.PAWN]

# The batch encoder indexes planes with ``piece_type - 1``, so the piece types
# must be exactly 1..6.
assert set(pieces) == set(range(1, 7))

In [None]:
def np_board_to_chess(b):
    """Rebuild a ``chess.Board`` from a flat array of signed piece codes.

    Each entry of ``b`` is ``0`` for an empty square. Otherwise its absolute
    value is passed to ``chess.Piece`` as the piece type and ``code > 0`` as
    the colour flag (positive codes are the ones written for white pieces when
    the dataset is built). The entry's index is used as the square index.

    Returns the board, which starts from the empty position
    ``"8/8/8/8/8/8/8/8 w - -"`` before the pieces are placed.
    """
    board = chess.Board("8/8/8/8/8/8/8/8 w - -")

    for square, code in enumerate(b):
        if code != 0:
            board.set_piece_at(square, chess.Piece(abs(code), code > 0))

    return board


def decompose_index(idx, d, k):
    """Split ``idx`` into ``k`` base-``d`` components, most significant first.

    Components ``1 .. k-1`` are successive remainders of dividing by ``d``;
    component ``0`` receives whatever quotient is left over, so it is not
    reduced modulo ``d``.

    Returns a length-``k`` ``np.int8`` array.
    """
    digits = np.zeros(k, dtype=np.int8)

    pos = k - 1
    while pos > 0:
        idx, remainder = divmod(idx, d)
        digits[pos] = remainder
        pos -= 1

    # Whatever is left is the leading component.
    digits[0] = idx

    return digits


class Vector:
    """Growable NumPy-backed array of fixed-shape elements.

    ``data`` is the backing buffer (its length is the capacity) and ``size``
    counts the elements handed out so far. Capacity doubles whenever it is
    exhausted, starting from one element.
    """

    def __init__(self, elem_shape, dtype):
        self.dtype = dtype
        self.size = 0
        self.data = np.zeros((0,) + elem_shape, dtype=dtype)

    def _resize_data(self, cap):
        """Replace the buffer with a zeroed one of ``cap`` rows, keeping old rows."""
        previous = self.data
        self.data = np.zeros((cap,) + previous.shape[1:], dtype=self.dtype)
        self.data[:previous.shape[0]] = previous

    def new_element(self):
        """Append one element and return it as a length-1 view into the buffer.

        The caller fills the element through the returned view, e.g.
        ``vec.new_element()[0] = value``. The view refers to the current
        buffer, so it should be written before the next growth.
        """
        capacity = len(self.data)

        if self.size == capacity:
            self._resize_data(2 * capacity if capacity != 0 else 1)

        slot = self.data[self.size:self.size + 1]
        self.size += 1

        return slot

    def shrink(self):
        """Trim the buffer to exactly ``size`` rows (copying them)."""
        used = self.data[:self.size]
        self.data = used.copy()

    def to_array(self):
        """Shrink the buffer and return it as a plain array of ``size`` rows."""
        self.shrink()
        return self.data

In [None]:
# Build the supervised dataset: one (position, move) sample per ply.
#   dataset_x[n] -- flat board of signed piece types seen from the side to
#                   move (positions with black to move are mirrored, which
#                   also swaps the piece colours)
#   dataset_y[n] -- the played move encoded as ``src * board_size + dst``
dataset_x = Vector((board_size,), np.int8)
dataset_y = Vector((), np.int16)

# "best_games.txt" is read as one PGN game per line.
with open("best_games.txt") as f:
    for line_idx, line in enumerate(f):
        if line_idx % 50 == 0:
            print(line_idx, end="\r")

        game = pgn.read_game(StringIO(line))

        # read_game() returns None when the line holds no game (e.g. a blank
        # line); skip it instead of crashing on ``None.board()``.
        if game is None:
            continue

        board = game.board()

        for move in game.mainline_moves():
            # Decide the perspective from the actual side to move rather than
            # from ply parity, so games that start from a position with black
            # to move are encoded correctly as well.
            if board.turn == chess.BLACK:
                board_view = board.mirror()
                src = chess.square_mirror(move.from_square)
                dst = chess.square_mirror(move.to_square)
            else:
                board_view = board
                src = move.from_square
                dst = move.to_square

            # new_element() returns a length-1 view; [0] is the fresh, zeroed
            # board row to fill in.
            b = dataset_x.new_element()[0]

            for i, piece in board_view.piece_map().items():
                if piece.color == chess.WHITE:
                    b[i] = piece.piece_type
                else:
                    b[i] = piece.piece_type * -1

            dataset_y.new_element()[0] = src * board_size + dst

            board.push(move)

# Move past the "\r"-overwritten progress line.
print()

dataset_x = dataset_x.to_array()
dataset_y = dataset_y.to_array()

In [None]:
def save_array(name):
    """Write the module-level array called ``name`` to ``<name>.npy``."""
    target = f"{name}.npy"
    np.save(target, globals()[name])


def save_arrays(*args):
    """Save each named module-level array, in order, via ``save_array``."""
    for array_name in args:
        save_array(array_name)

        
def load_array(name):
    """Load ``<name>.npy`` and bind it to the module-level name ``name``."""
    loaded = np.load(f"{name}.npy")
    globals()[name] = loaded

    
def load_arrays(*args):
    """Load each named array, in order, via ``load_array``."""
    for array_name in args:
        load_array(array_name)

In [None]:
# Hold out 10% of the positions for validation. The fixed seed makes the split
# reproducible, so the held-out set stays the same across notebook re-runs.
train_x, test_x, train_y, test_y = train_test_split(
    dataset_x, dataset_y, test_size=0.1, random_state=0
)

In [None]:
# Cache the split on disk, one "<name>.npy" file per array.
save_arrays("train_x", "test_x", "train_y", "test_y")

In [None]:
# Reload the cached split into notebook globals of the same names.
load_arrays("train_x", "test_x", "train_y", "test_y")

In [None]:
def generate_batches(x, y, batch_size):
    """Yield reshuffled ``(X, Y)`` batches forever.

    Parameters
    ----------
    x : array of flat boards, one row of ``board_size`` signed piece codes per
        sample (``0`` = empty square, ``+p`` / ``-p`` = piece type ``p`` of
        either colour).
    y : array of move indices, aligned with ``x``; each index selects one of
        the ``board_size**2`` output classes.
    batch_size : maximum number of samples per batch.

    Yields
    ------
    X : float32 array of shape ``(n,) + board_shape + (len(pieces),)``. For a
        square holding code ``+p`` plane ``p - 1`` is ``1``; for ``-p`` it is
        ``-1``; every other entry is ``0``.
    Y : float32 one-hot array of shape ``(n, board_size**2)``.

    ``n`` equals ``batch_size`` except for the last batch of each pass, which
    holds only the remaining samples (it is no longer padded with all-zero
    boards and all-zero targets).

    Raises
    ------
    ValueError
        When the generator is first advanced with empty ``x``; otherwise it
        would spin forever without yielding.
    """
    if len(x) == 0:
        raise ValueError("generate_batches requires at least one sample")

    n_planes = len(pieces)
    n_moves = board_size ** 2

    while True:
        # New sample order on every pass over the data.
        x, y = shuffle(x, y)

        for start in range(0, len(x), batch_size):
            boards = np.asarray(x[start:start + batch_size])
            moves = np.asarray(y[start:start + batch_size])
            n = len(boards)

            # Scatter +/-1 into the plane of each occupied square in one shot.
            X = np.zeros((n, board_size, n_planes), dtype=np.float32)
            rows, squares = np.nonzero(boards)
            codes = boards[rows, squares].astype(np.intp)
            X[rows, squares, np.abs(codes) - 1] = np.sign(codes)

            # One-hot encode the target move of every sample.
            Y = np.zeros((n, n_moves), dtype=np.float32)
            Y[np.arange(n), moves] = 1

            yield X.reshape((n,) + board_shape + (n_planes,)), Y

In [None]:
# Training hyper-parameters.
batch_size = 256  # samples per batch requested from generate_batches
n_epochs = 10     # passed as ``epochs`` to the training call

In [None]:
# Activation shared by conv_block, transition_block and the first Dense layer
# of se_block.
act = "elu"

# Network input: one board as a (board_h, board_w, len(pieces)) tensor.
model_input = Input(shape=board_shape + (len(pieces),))
# ``model`` holds the output tensor of the graph built so far; it starts at
# the input and is rebound as layers are stacked.
model = model_input

def conv_block(x, n_channels):
    """Apply a 3x3 "same"-padded convolution with the shared activation, then BatchNorm.

    ``n_channels`` is the number of convolution filters. Returns the
    batch-normalised output tensor.
    """
    convolved = Conv2D(n_channels, 3, padding="same", activation=act)(x)
    return BatchNorm()(convolved)

def dense_block(x, n_layers, n_channels, growth):
    """Stack ``n_layers`` densely connected conv layers on top of ``x``.

    Every layer is a ``conv_block`` with ``growth`` filters applied to the
    concatenation of the block input and all earlier layer outputs.

    Returns ``(tensor, n_channels)``: the final concatenation (or ``x`` itself
    when ``n_layers`` is 0) and the incoming ``n_channels`` increased by
    ``growth`` once per layer.
    """
    features = [x]

    for _ in range(n_layers):
        features.append(conv_block(x, growth))
        x = Concatenate()(features)
        n_channels += growth

    return x, n_channels

def transition_block(x, n_channels):
    """Project ``x`` to ``n_channels`` with a 1x1 convolution, then BatchNorm.

    The convolution uses the shared activation. Returns the batch-normalised
    output tensor.
    """
    projected = Conv2D(n_channels, 1, activation=act)(x)
    return BatchNorm()(projected)

def se_block(x, n_channels):
    """Gate the channels of ``x`` with squeeze-and-excitation style weights.

    ``x`` is globally average-pooled, passed through a bottleneck Dense layer
    of ``n_channels // 16`` units (at least 1) with the shared activation, and
    then through a sigmoid Dense layer of ``n_channels`` units. The resulting
    per-channel weights are multiplied with ``x``.

    ``n_channels`` is expected to match the channel count of ``x``.

    Returns the re-weighted tensor.
    """
    # Clamp the bottleneck width: ``n_channels // 16`` is 0 for fewer than 16
    # channels, which would create a Dense layer with no units.
    squeeze_units = max(1, n_channels // 16)

    w = GlobalAveragePooling2D()(x)
    w = Dense(squeeze_units, activation=act)(w)
    w = Dense(n_channels, activation="sigmoid")(w)

    # NOTE(review): relies on Multiply broadcasting the pooled per-channel
    # weights over the spatial axes of x -- confirm for the Keras version used.
    return Multiply()([x, w])

# Architecture hyper-parameters.
# n_channels is the width of the stem convolution and is then updated to the
# running channel count as blocks are stacked.
n_channels = 32
# A transition block is only inserted while n_channels exceeds this value.
max_channels = 32
n_dense_blocks = 3
n_layers_per_dense_block = 12

# Stem convolution on the raw input planes.
model = conv_block(model, n_channels)

for i in range(n_dense_blocks):
    # Each layer of a dense block contributes 12 channels (the growth argument).
    model, n_channels = dense_block(model, n_layers_per_dense_block, n_channels, 12)
    
    model = se_block(model, n_channels)
    
    # Halve the channel count between dense blocks, but not after the last one.
    if n_channels > max_channels and i < n_dense_blocks - 1:
        n_channels //= 2
        model = transition_block(model, n_channels)
    
model = GlobalAveragePooling2D()(model)


# One softmax output per move index ``src * board_size + dst``.
model = Dense(board_size**2, activation="softmax")(model)
model = Model(inputs=model_input, outputs=model)

opt = "adam"

model.compile(
    optimizer=opt,
    loss="categorical_crossentropy",
    metrics=["categorical_accuracy", "top_k_categorical_accuracy"]
)

model.summary()

In [None]:
# Checkpoint to "model.hdf5"; with save_best_only=True the file is only
# rewritten when the callback's monitored quantity (left at its default here)
# improves.
mcp_save = ModelCheckpoint(
    "model.hdf5", 
    verbose=1,
    save_best_only=True
)

# Train from the endless batch generators. Both step counts are
# ceil(len(data) / batch_size), i.e. one full pass over each array per epoch.
model.fit_generator(
    generate_batches(train_x, train_y, batch_size),
    validation_data=generate_batches(test_x, test_y, batch_size),
    steps_per_epoch = (len(train_x) + batch_size - 1) // batch_size,
    validation_steps = (len(test_x) + batch_size - 1) // batch_size,
    epochs=n_epochs,
    callbacks=[mcp_save]
)

In [None]:
# Replace the in-memory model with the checkpoint stored in "model.hdf5".
model = load_model("model.hdf5")

In [None]:
# Print the summary of the model currently bound to ``model``.
model.summary()