In [1]:
import datasets as ds
import chess
import numpy as np
import tensorflow as tf
from IPython.display import clear_output
import os

%load_ext tensorboard

# Open the train split in streaming mode (streaming=True), so rows are read
# lazily while iterating rather than materialised up front.
dataset = ds.load_dataset("laion/strategic_game_chess", split="train", streaming=True)

Resolving data files:   0%|          | 0/1599 [00:00<?, ?it/s]

In [2]:
# Signed per-square encoding keyed by the characters found in str(chess.Board):
# six channels in the order P, N, B, R, Q, K. Upper-case symbols get +1 in
# their channel, lower-case symbols get -1, and "." (empty square) is all zeros.
one_hot_mapping = {
    ".": np.zeros(6, dtype=int),
    **{symbol: np.eye(6, dtype=int)[channel] for channel, symbol in enumerate("PNBRQK")},
    **{symbol: -np.eye(6, dtype=int)[channel] for channel, symbol in enumerate("pnbrqk")},
}

def process_batch(games_array, num_plies=20):
    """Replay the opening of each game and collect (position, move) training pairs.

    Every game with at least `num_plies` moves is replayed from the standard
    starting position. Before each of the first `num_plies` moves is pushed,
    the current board is snapshotted and the move's (from_square, to_square)
    pair is recorded. Only every second pair is kept (plies 0, 2, 4, ...),
    i.e. the moves of the side that starts the game.

    Args:
        games_array: Iterable of games, each a sequence of UCI move strings.
        num_plies: Number of leading plies replayed per game. Games shorter
            than this are skipped. Defaults to 20.

    Returns:
        Tuple `(boards, moves)`:
            boards: 1-D object array of `chess.Board` snapshots.
            moves: integer array of shape (len(boards), 2) holding the
                (from_square, to_square) indices of the move played in the
                matching snapshot.
        Both arrays are empty (shapes (0,) and (0, 2)) when no game qualifies.
    """
    boards = []
    moves = []
    for game in games_array:
        # Skip short games before doing any board work.
        if len(game) < num_plies:
            continue
        board = chess.Board()
        board_states = []
        board_states_labels = []
        for move_str in game[:num_plies]:
            move = chess.Move.from_uci(move_str)
            board_states_labels.append((move.from_square, move.to_square))
            # Snapshot the position *before* the move is applied.
            board_states.append(board.copy())
            board.push(move)
        # Keep every second ply only, flattened across games.
        moves.extend(board_states_labels[::2])
        boards.extend(board_states[::2])

    boards = np.array(boards, dtype=object).reshape(-1)
    # A fixed (-1, 2) shape keeps this valid for an empty batch, where
    # inferring the last dimension from the data is not possible.
    moves = np.array(moves, dtype=int).reshape(-1, 2)
    return (boards, moves)

def create_tensors(boards_data, moves_labels, train_fraction=0.70):
    """Encode boards and move labels as tensors and split them into train/test.

    Each board is converted to its text form, split into an 8x8 grid of
    single-character squares and encoded channel-wise via `one_hot_mapping`.
    Each (from_square, to_square) label pair is one-hot encoded over 64
    classes. The first `train_fraction` of the rows (in input order, no
    shuffling) becomes the training split; the remainder is the test split.

    Args:
        boards_data: Sequence of boards whose `str()` is eight
            newline-separated rows of space-separated square characters.
        moves_labels: Array-like of shape (n, 2) with square indices in
            [0, 64). Rows beyond the number of boards are ignored.
        train_fraction: Fraction of rows assigned to the training split,
            between 0 and 1. Defaults to 0.70.

    Returns:
        Tuple `(train_data, train_labels, test_data, test_labels)` of
        `tf.Tensor`s. Data tensors have shape (rows, 8, 8, 6); label tensors
        are the `to_categorical` encoding of the matching label rows.

    Raises:
        ValueError: If `train_fraction` is outside [0, 1].
        IndexError: If there are fewer label rows than boards.
    """
    if not 0 <= train_fraction <= 1:
        raise ValueError(f"train_fraction must be within [0, 1], got {train_fraction}")

    board_texts = np.array(boards_data).astype(str)
    num_positions = len(board_texts)

    label_rows = np.asarray(moves_labels)
    if len(label_rows) < num_positions:
        raise IndexError(
            f"got {len(label_rows)} label rows for {num_positions} boards"
        )

    # Preallocate so the board tensor keeps its (n, 8, 8, 6) shape even when n == 0.
    encoded_boards = np.zeros([num_positions, 8, 8, 6])
    for position, board_text in enumerate(board_texts):
        squares = np.array([row.split() for row in board_text.split("\n")])
        encoded_state = encoded_boards[position]  # view: writes land in encoded_boards
        for char, encoding in one_hot_mapping.items():
            encoded_state[squares == char] = encoding

    # One call encodes every label row at once.
    encoded_labels = tf.keras.utils.to_categorical(label_rows[:num_positions], num_classes=64)

    split_at = int(num_positions * train_fraction)
    train_data = tf.constant(encoded_boards[:split_at])
    train_labels = tf.constant(encoded_labels[:split_at])
    test_data = tf.constant(encoded_boards[split_at:])
    test_labels = tf.constant(encoded_labels[split_at:])
    return train_data, train_labels, test_data, test_labels

In [3]:
model_name = 'model_vgg_test'
input_layer = tf.keras.layers.Input(shape=(8,8,6))


def _vgg_block(tensor, filters, num_convs):
    """Apply `num_convs` 3x3 same-padded ReLU convolutions, then a 2x2 stride-2 max-pool."""
    for _ in range(num_convs):
        tensor = tf.keras.layers.Conv2D(filters, (3, 3), activation='relu', padding='same', strides=1)(tensor)
    return tf.keras.layers.MaxPool2D(pool_size=2, strides=2, padding='same')(tensor)


# Five convolutional blocks given as (filters, convolutions per block).
# With 'same' padding each pool halves the spatial size rounding up, so the
# 8x8 input shrinks 8 -> 4 -> 2 -> 1 -> 1 -> 1; the last two pools see 1x1 maps.
x = input_layer
for filters, num_convs in [(64, 2), (128, 2), (256, 3), (512, 3), (512, 3)]:
    x = _vgg_block(x, filters, num_convs)

# Shared dense trunk followed by two independent 64-way softmax heads.
# The training code pairs output1 with label row 0 and output2 with label row 1.
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(4096, activation ='relu')(x)
x = tf.keras.layers.Dense(4096, activation ='relu')(x)
output1 = tf.keras.layers.Dense(64, activation="softmax", name="output1")(x)
output2 = tf.keras.layers.Dense(64, activation="softmax", name="output2")(x)

# Name the model through the constructor rather than assigning `model.name`
# afterwards, which only works where that attribute is writable.
model = tf.keras.Model(inputs=input_layer, outputs=[output1, output2], name=model_name)

In [4]:
# Loss and optimiser shared by both output heads. CategoricalCrossentropy is
# used with its defaults, which matches the softmax activations on the heads.
loss_fn = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

# Running metrics for the training split; they accumulate across steps until
# reset_state() is called.
train_loss = tf.keras.metrics.Mean(name='train_loss', dtype=tf.float32)
train_accuracy_1 = tf.keras.metrics.Accuracy(name='train_accuracy_1')
train_accuracy_2 = tf.keras.metrics.Accuracy(name='train_accuracy_2')

# Same set of running metrics for the held-out split.
test_loss = tf.keras.metrics.Mean(name='test_loss', dtype=tf.float32)
test_accuracy_1 = tf.keras.metrics.Accuracy(name='test_accuracy_1')
test_accuracy_2 = tf.keras.metrics.Accuracy(name='test_accuracy_2')

# TensorBoard writers: one sub-directory per split under Logs/<model name>/.
train_summary_writer = tf.summary.create_file_writer(f'Logs/{model.name}/train')
test_summary_writer = tf.summary.create_file_writer(f'Logs/{model.name}/test')

In [5]:
def train_step(model, optimizer, train_data, train_labels):
    """Run one optimisation step and update the running training metrics.

    The model's first output is scored against `train_labels[:, 0, :]` and the
    second against `train_labels[:, 1, :]`; the two cross-entropy losses are
    summed and back-propagated through all trainable weights.

    Args:
        model: Two-output Keras model whose outputs are class probabilities.
        optimizer: Keras optimizer used to apply the gradients.
        train_data: Batch of encoded boards.
        train_labels: Batch of one-hot labels indexed as [sample, head, class].

    Side effects:
        Updates the module-level `train_loss`, `train_accuracy_1` and
        `train_accuracy_2` metrics.
    """
    first_targets = train_labels[:, 0, :]
    second_targets = train_labels[:, 1, :]

    with tf.GradientTape() as tape:
        first_probs, second_probs = model(train_data, training=True)
        first_loss = tf.reduce_mean(loss_fn(first_targets, first_probs))
        second_loss = tf.reduce_mean(loss_fn(second_targets, second_probs))
        total_loss = first_loss + second_loss

    weights = model.trainable_weights
    gradients = tape.gradient(total_loss, weights)
    optimizer.apply_gradients(zip(gradients, weights))

    train_loss(total_loss)
    # Accuracy compares class indices, so both sides are reduced with argmax.
    train_accuracy_1(tf.argmax(first_targets, 1), tf.argmax(first_probs, 1))
    train_accuracy_2(tf.argmax(second_targets, 1), tf.argmax(second_probs, 1))

def test_step(model, test_data, test_labels):
    """Evaluate one held-out batch and update the running test metrics.

    Mirrors `train_step` without gradient computation: the model runs in
    inference mode, the two heads' cross-entropy losses are summed, and the
    module-level `test_loss`, `test_accuracy_1` and `test_accuracy_2` metrics
    are updated.

    Args:
        model: Two-output Keras model whose outputs are class probabilities.
        test_data: Batch of encoded boards.
        test_labels: Batch of one-hot labels indexed as [sample, head, class].
    """
    first_targets = test_labels[:, 0, :]
    second_targets = test_labels[:, 1, :]

    first_probs, second_probs = model(test_data, training=False)
    first_loss = tf.reduce_mean(loss_fn(first_targets, first_probs))
    second_loss = tf.reduce_mean(loss_fn(second_targets, second_probs))

    test_loss(first_loss + second_loss)
    # Accuracy compares class indices, so both sides are reduced with argmax.
    test_accuracy_1(tf.argmax(first_targets, 1), tf.argmax(first_probs, 1))
    test_accuracy_2(tf.argmax(second_targets, 1), tf.argmax(second_probs, 1))

In [6]:
def train_model(model, num_of_rows, batch_size, epochs, checkpoints=False, save_model=False):
    """Train `model` on the first `num_of_rows` streamed games for `epochs` epochs.

    Each epoch re-reads the first `num_of_rows` rows of the module-level
    `dataset` in batches of `batch_size`. Every batch is turned into
    train/test tensors with `process_batch` and `create_tensors`, one
    `train_step` and one `test_step` are run, and the running metrics are
    written to the TensorBoard summary writers. Metrics are printed and reset
    at the end of each epoch.

    Args:
        model: Keras model to train; `model.name` is used in output paths.
        num_of_rows: Number of dataset rows consumed per epoch.
        batch_size: Rows per batch. If larger than `num_of_rows`, a message is
            printed and the function returns without training.
        epochs: Number of passes over the selected rows.
        checkpoints: If True, save weights after every epoch under
            `./Checkpoints/<model.name>/`.
        save_model: If True, save the full model to
            `./Models/<model.name>.keras` after the last epoch.

    Returns:
        None.
    """
    if batch_size>num_of_rows:
        print(f"Nothing to train: batch_size ({batch_size}) exceeds num_of_rows ({num_of_rows})")
        return

    # The final batch of an epoch may be partial, so round up.
    steps_per_epoch = -(-num_of_rows // batch_size)

    for epoch in range(epochs):
        print(f"Epoch: {epoch+1}/{epochs}")
        limited_dataset = dataset.take(num_of_rows)
        batched_dataset = limited_dataset.batch(batch_size)
        for step, batch in enumerate(batched_dataset):
            print(f"Step: {step+1}/{steps_per_epoch}")
            # Step index that keeps increasing across epochs, so later epochs
            # extend the TensorBoard curves instead of reusing steps 0..N.
            global_step = epoch * steps_per_epoch + step

            boards, moves = process_batch(batch['Moves'])
            train_data, train_labels, test_data, test_labels = create_tensors(boards, moves)

            train_step(model, optimizer, train_data, train_labels)
            with train_summary_writer.as_default():
                tf.summary.scalar('train_loss', train_loss.result(), step=global_step)
                tf.summary.scalar('train_accuracy_1', train_accuracy_1.result(), step=global_step)
                tf.summary.scalar('train_accuracy_2', train_accuracy_2.result(), step=global_step)

            test_step(model, test_data, test_labels)
            with test_summary_writer.as_default():
                tf.summary.scalar('test_loss', test_loss.result(), step=global_step)
                tf.summary.scalar('test_accuracy_1', test_accuracy_1.result(), step=global_step)
                tf.summary.scalar('test_accuracy_2', test_accuracy_2.result(), step=global_step)

        # Replace the per-step progress lines with a one-screen epoch summary.
        clear_output()
        print(f"Epoch: {epoch+1}/{epochs}")
        print(f'Train_loss: {train_loss.result()}, Train_accuracy_1: {train_accuracy_1.result()}. Train_accuracy_2: {train_accuracy_2.result()}')
        print(f'Test_loss: {test_loss.result()}, Test_accuracy_1: {test_accuracy_1.result()}. Test_accuracy_2: {test_accuracy_2.result()}')

        if checkpoints:
            checkpoint_dir = f'./Checkpoints/{model.name}'
            os.makedirs(checkpoint_dir, exist_ok=True)
            model.save_weights(f'{checkpoint_dir}/epoch_{epoch+1}_checkpoint.weights.h5')
            print(f'Checkpoint for epoch {epoch+1} created')

        # Start the next epoch's running averages from scratch.
        for metric in (train_loss, test_loss,
                       train_accuracy_1, train_accuracy_2,
                       test_accuracy_1, test_accuracy_2):
            metric.reset_state()

    if save_model:
        # Same treatment as the checkpoint branch: make sure the target
        # directory exists before writing into it.
        os.makedirs('./Models', exist_ok=True)
        model.save(f'./Models/{model.name}.keras')
        print(f'Model saved as {model.name}')

In [8]:
# Optional: uncomment to resume from a previously saved epoch checkpoint before training.
#model.load_weights(f'./Checkpoints/{model.name}/epoch_1_checkpoint.weights.h5')

In [7]:
# Short run: 5 epochs over the first 1000 streamed rows in batches of 32,
# writing a weights checkpoint after every epoch and saving the final model.
train_model(
    model=model,
    num_of_rows=1000,
    batch_size=32,
    epochs=5,
    checkpoints=True,
    save_model=True,
)

Epoch: 5/5
Train_loss: 6.222309112548828, Train_accuracy_1: 0.10571428388357162. Train_accuracy_2: 0.12057142704725266
Test_loss: 6.260839462280273, Test_accuracy_1: 0.10100000351667404. Test_accuracy_2: 0.11833333224058151
Checkpoint for epoch 5 created
Model saved as model_vgg_test


In [9]:
%tensorboard --logdir logs/model_vgg_test --host localhost