# Chess Pieces Images Classification

Abstract: 

Deep learning is regarded as the state of the art in overall predictive accuracy on large datasets, particularly for image classification tasks. Many models and techniques have been proposed and have achieved promising results, and the accuracy of these models tends to increase in step with the amount of training data. The lack of data is therefore one of the limitations of the status quo of image classification. In this study, a small dataset collected from online sources is used to train three different models and compare them; the main concern of this research is what constitutes a good model architecture, and which techniques should be used, when only a small dataset is available.

In [None]:
!pip install wandb
!wandb login

import wandb
from wandb.keras import WandbCallback

# Start a Weights & Biases run in the given project under the given entity.
wandb.init(project="my-test-project", entity="leekiongweijie")

In [None]:
import tensorflow as tf

print(tf.__version__)
import glob
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tqdm

from PIL import Image
import requests
from io import BytesIO

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive (Colab only); the dataset path used
# by this notebook lives under this mount point.
drive.mount("/content/drive")

In [None]:
# Root folder of the dataset on the mounted drive.
DATASET_DIR = "/content/drive/MyDrive/Chess"

# Print every class folder followed by the number of entries it contains.
for pieces in os.listdir(DATASET_DIR):
    piece_dir = os.path.join(DATASET_DIR, pieces)
    # Skip stray files next to the class folders: calling os.listdir() on a
    # regular file raises NotADirectoryError.
    if not os.path.isdir(piece_dir):
        continue
    print(pieces)
    print(len(os.listdir(piece_dir)))

## Helper Functions

In [None]:
def show_samples(ds):
    """Plot up to 16 images from the first batch of ``ds`` in a 4x4 grid.

    Args:
        ds: Dataset yielding ``(images, labels)`` batches. Each label is used
            to index ``CLASS_NAMES`` for the subplot title.
    """
    plt.figure(figsize=(10, 10))
    for images, labels in ds.take(1):
        # A batch may hold fewer than 16 images (smaller batch size or a
        # partial batch), so never index past its end.
        n_shown = min(16, int(images.shape[0]))
        for i in range(n_shown):
            plt.subplot(4, 4, i + 1)
            plt.imshow(images[i].numpy().astype("uint8"))
            plt.title(CLASS_NAMES[labels[i]])
            plt.axis("off")

In [None]:
def show_predict(model, ds, display=False, plot=False, return_labels=False):
    """Run ``model`` over every batch of ``ds`` and collect the labels.

    Args:
        model: Object with a ``predict(images)`` method returning one score
            vector per image.
        ds: Iterable of ``(images, labels)`` batches.
        display: If true, print the actual and predicted class name of every
            sample (looked up in ``CLASS_NAMES``).
        plot: If true, show every image with matplotlib.
        return_labels: If true, return the collected labels.

    Returns:
        ``(actual_label_list, predict_label_list)`` when ``return_labels`` is
        true, otherwise ``None``.
    """
    actual_label_list, predict_label_list = [], []
    for images, labels in ds:
        predicted = np.asarray(model.predict(images))
        batch_labels = np.asarray(labels)
        # Iterate over the real size of this batch. The last batch is usually
        # smaller than BATCH_SIZE, and relying on the resulting IndexError to
        # leave the loop both hid genuine errors and returned nothing when
        # the dataset size was an exact multiple of the batch size.
        for i in range(len(batch_labels)):
            actual = batch_labels[i]
            actual_label_list.append(actual)
            predict = predicted[i].argmax()
            predict_label_list.append(predict)
            if display:
                print(f"Actual: {CLASS_NAMES[actual]}")
                print(f"Predicted: {CLASS_NAMES[predict]}")
            if plot:
                plt.imshow(images[i].numpy().astype("uint8"))
                plt.axis("off")
                plt.show()
    if return_labels:
        return actual_label_list, predict_label_list
    return None

In [None]:
def plot_results(history, epochs, title):
    """Plot training/validation accuracy and loss curves side by side.

    Args:
        history: Object whose ``history`` dict holds the "accuracy",
            "val_accuracy", "loss" and "val_loss" series.
        epochs: Number of epochs. Kept for backward compatibility; the x axis
            is derived from the length of each recorded series, because a run
            cut short by early stopping records fewer points than requested.
        title: Title shown above both panels.
    """
    metrics = history.history

    fig, ax = plt.subplots(1, 2, figsize=(10, 4), dpi=150)
    fig.suptitle(title)

    # (axis, history key, label suffix / title suffix, legend position)
    panels = (
        (ax[0], "accuracy", "Accuracy", "lower right"),
        (ax[1], "loss", "Loss", "upper right"),
    )
    for axis, key, label, legend_loc in panels:
        train_values = metrics[key]
        val_values = metrics[f"val_{key}"]
        axis.plot(range(len(train_values)), train_values, label=f"Training {label}")
        axis.plot(range(len(val_values)), val_values, label=f"Validation {label}")
        axis.legend(loc=legend_loc)
        axis.set_xlabel("Epochs")
        axis.set_title(f"Training and Validation {label}")

    # Keep the figure title from overlapping the panel titles.
    fig.tight_layout()
    plt.show()

In [None]:
def loading_data(dataset_dir, batch_size, image_size):
    """Load an 80/20 train/validation split of an image folder.

    Args:
        dataset_dir: Directory passed to
            ``tf.keras.utils.image_dataset_from_directory``.
        batch_size: Number of images per batch.
        image_size: Side length; images are resized to
            ``(image_size, image_size)`` in both splits.

    Returns:
        ``(train_ds, val_ds, class_names)`` where both datasets are prefetched
        and ``class_names`` is taken from the training dataset.
    """
    # Both subsets must share the split fraction, seed and image size so that
    # they partition the same files and yield identically shaped images.
    shared_kwargs = dict(
        validation_split=0.2,
        shuffle=True,
        seed=7,
        batch_size=batch_size,
        image_size=(image_size, image_size),
    )

    train_ds = tf.keras.utils.image_dataset_from_directory(
        dataset_dir, subset="training", **shared_kwargs
    )
    val_ds = tf.keras.utils.image_dataset_from_directory(
        dataset_dir, subset="validation", **shared_kwargs
    )

    # Read class_names before prefetch(); the prefetched dataset object does
    # not carry that attribute.
    class_names = train_ds.class_names
    train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
    val_ds = val_ds.prefetch(buffer_size=tf.data.AUTOTUNE)

    return train_ds, val_ds, class_names


# Shorthand for tf.data.AUTOTUNE.
AUTOTUNE = tf.data.AUTOTUNE
# Side length (pixels) that images are resized to, and images per batch.
IMAGE_SIZE = 224
BATCH_SIZE = 16
DATASET_DIR = "/content/drive/MyDrive/Chess"
# Build the datasets; CLASS_NAMES is read by the display/prediction helpers.
train_ds, val_ds, CLASS_NAMES = loading_data(
    dataset_dir=DATASET_DIR, batch_size=BATCH_SIZE, image_size=IMAGE_SIZE
)

In [None]:
# Preview images from one training batch, titled with their class names.
show_samples(train_ds)

## MCNN Model

In [None]:
def augmentation_layer(x):
    """Pass ``x`` through random flip, rotation and zoom layers, in that order."""
    keras_layers = tf.keras.layers
    stages = (
        (keras_layers.RandomFlip, "horizontal"),
        (keras_layers.RandomRotation, 0.1),
        (keras_layers.RandomZoom, 0.1),
    )
    # Each layer is created right before it is applied, one stage at a time.
    for layer_cls, setting in stages:
        x = layer_cls(setting)(x)
    return x


def mlp(x, filters, idx):
    """Apply one convolution -> ReLU -> max-pooling stage to ``x``.

    Args:
        x: Input tensor of the stage.
        filters: Number of filters of the 3x3 convolution.
        idx: Stage number, used as the suffix of the layer names.

    Returns:
        The output tensor of the pooling layer.
    """
    keras_layers = tf.keras.layers
    conv = keras_layers.Conv2D(
        filters=filters, kernel_size=3, padding="same", name=f"conv_layer_{idx}"
    )
    relu = keras_layers.Activation("relu", name=f"relu_{idx}")
    pool = keras_layers.MaxPooling2D(
        pool_size=(2, 2), padding="valid", name=f"pooling_layer_{idx}"
    )
    # Batch normalization (after the activation) and dropout (after pooling)
    # are deliberately not part of the stage at the moment.
    return pool(relu(conv(x)))


def create_mcnn_model(input_shape, num_classes, augmentation):
    """Build and compile the multi-convolution (MCNN) classifier.

    Args:
        input_shape: Shape of one input image.
        num_classes: Number of units of the output layer.
        augmentation: If true, inputs go through ``augmentation_layer``
            before rescaling.

    Returns:
        A compiled ``tf.keras.Model`` named "mcnn_model" whose output layer
        has no activation (the loss is configured with ``from_logits=True``).
    """
    keras_layers = tf.keras.layers

    inputs = tf.keras.Input(shape=input_shape, name="input_layer")
    features = augmentation_layer(inputs) if augmentation else inputs
    features = keras_layers.Rescaling(scale=1.0 / 255, name="rescaling_layer")(
        features
    )

    # Seven conv/pool stages; stage numbering starts at 1.
    stage_filters = (16, 32, 64, 64, 64, 64, 64)
    for stage, width in enumerate(stage_filters, start=1):
        features = mlp(features, width, stage)

    features = keras_layers.Flatten(name="flatten_layer")(features)
    features = keras_layers.Dense(units=128, activation="relu", name="dense_layer")(
        features
    )
    logits = keras_layers.Dense(units=num_classes, name="output_layer")(features)

    mcnn = tf.keras.Model(inputs=inputs, outputs=logits, name="mcnn_model")
    mcnn.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )
    return mcnn


def train_mcnn_model(input_shape, num_classes, augmentation, train_ds, val_ds, epochs):
    """Build, train and save the MCNN model.

    Args:
        input_shape: Shape of one input image.
        num_classes: Number of output classes.
        augmentation: Whether the model includes the augmentation layers.
        train_ds: Training dataset passed to ``model.fit``.
        val_ds: Validation dataset passed to ``model.fit``.
        epochs: Maximum number of epochs; early stopping may end sooner.

    Returns:
        ``(model, model_history)``: the trained model and the object returned
        by ``model.fit``.
    """
    import math  # local import: `math` is not among the notebook's imports

    model = create_mcnn_model(
        input_shape=input_shape, num_classes=num_classes, augmentation=augmentation
    )

    def scheduler(epoch, lr):
        # Hold the rate for the first half of training, then decay it by a
        # factor of exp(-0.1) per epoch. Plain Python floats are returned
        # instead of tf tensors.
        if epoch < math.ceil(epochs / 2):
            return lr
        return lr * math.exp(-0.1)

    # Built but intentionally not added to `all_callbacks` below.
    lr_callback_ = tf.keras.callbacks.LearningRateScheduler(scheduler)

    early_stopping_ = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        min_delta=0.001,
        # 30% of the epochs, rounded up. Integer arithmetic yields a plain
        # int; the previous tf.math.ceil(...) produced a float32 tensor.
        patience=math.ceil(epochs * 3 / 10),
        restore_best_weights=True,
    )

    # NOTE(review): without save_best_only=True the `monitor` argument does
    # not select checkpoints, so weights are written every epoch - confirm
    # this is intended.
    checkpoint_filepath = "/MCNN_checkpoint/cp-{epoch:04d}.ckpt"
    model_checkpoint_ = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath, save_weights_only=True, monitor="val_accuracy"
    )

    wandb_callback_ = WandbCallback(
        monitor="val_loss", log_weights=True, log_evaluation=True, validation_steps=5
    )

    all_callbacks = [
        early_stopping_,
        model_checkpoint_,
        wandb_callback_,
    ]  # lr_callback_

    model_history = model.fit(
        train_ds, validation_data=val_ds, epochs=epochs, callbacks=all_callbacks
    )

    model.save("/MCNN_model/MCNN_model.h5", save_format="h5")

    return model, model_history

In [None]:
# Three-channel square images of side IMAGE_SIZE.
INPUT_SHAPE = (IMAGE_SIZE, IMAGE_SIZE, 3)
# NOTE(review): presumably equals len(CLASS_NAMES) - confirm against the dataset.
NUM_CLASSES = 6
EPOCHS = 100

# Build a throwaway model instance only to render its architecture diagram.
model = create_mcnn_model(
    input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES, augmentation=True
)
tf.keras.utils.plot_model(
    model, to_file="mcnn_model_architecture.png", show_shapes=True
)

In [None]:
# Fix TensorFlow's global random seed before the model is built and trained.
tf.random.set_seed(7)
# Start a W&B run in the "train-mcnn" project for this training job.
run = wandb.init(project="train-mcnn", job_type="train")
# Train the MCNN with augmentation enabled; keep the model and fit history.
MCNN, MCNN_history = train_mcnn_model(
    input_shape=INPUT_SHAPE,
    num_classes=NUM_CLASSES,
    augmentation=True,
    train_ds=train_ds,
    val_ds=val_ds,
    epochs=EPOCHS,
)

In [None]:
# History plotting and CSV export are currently disabled:
# pd_MCNN_history = pd.DataFrame(MCNN_history.history)

# plot_results(
#     history=MCNN_history,
#     epochs=pd_MCNN_history.shape[0],
#     title="Multi Convolutional Neural Network",
# )

# pd_MCNN_history.to_csv("pd_MCNN_history.csv", index=False)

# Save the trained model as MCNN.h5 relative to the current working directory.
MCNN.save("MCNN.h5")

In [None]:
# Collect the actual and predicted labels of the MCNN on the validation set.
actual_label_list, predict_label_list = show_predict(
    model=MCNN, ds=val_ds, display=False, plot=False, return_labels=True
)

# Confusion matrix with actual labels as the first argument and predictions
# as the second.
tf.math.confusion_matrix(actual_label_list, predict_label_list, num_classes=NUM_CLASSES)

## TLPM Model

In [None]:
def create_tlpm_model(input_shape, num_classes):
    """Build and compile the transfer-learning (TLPM) classifier.

    An ImageNet-weighted NASNetMobile without its top is used as the feature
    extractor, followed by global average pooling, dropout and a dense output
    layer.

    Args:
        input_shape: Shape of one input image.
        num_classes: Number of units of the output layer.

    Returns:
        A compiled ``tf.keras.Model`` named "tlpm_model" whose output layer
        has no activation (the loss is configured with ``from_logits=True``).
    """
    backbone = tf.keras.applications.NASNetMobile(
        input_shape=input_shape, include_top=False, weights="imagenet"
    )

    # Freeze the first 40% of the backbone's layers; the rest keep the
    # trainable setting they were created with.
    frozen_count = int(len(backbone.layers) * 0.4)
    for frozen_layer in backbone.layers[:frozen_count]:
        frozen_layer.trainable = False

    keras_layers = tf.keras.layers
    inputs = tf.keras.Input(shape=input_shape, name="input_layer")
    features = tf.keras.applications.nasnet.preprocess_input(inputs)
    # The backbone is always called with training=False.
    features = backbone(features, training=False)
    features = keras_layers.GlobalAveragePooling2D(name="global_avg_pool_layer")(
        features
    )
    features = keras_layers.Dropout(rate=0.2, name="dropout_layer")(features)
    logits = keras_layers.Dense(units=num_classes, name="dense_layer")(features)

    tlpm = tf.keras.Model(inputs=inputs, outputs=logits, name="tlpm_model")

    base_learning_rate = 0.0001
    adam = tf.keras.optimizers.Adam(learning_rate=base_learning_rate)
    tlpm.compile(
        optimizer=adam,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )
    return tlpm


def train_tlpm_model(input_shape, num_classes, train_ds, val_ds, epochs):
    """Build, train and save the TLPM model.

    Args:
        input_shape: Shape of one input image.
        num_classes: Number of output classes.
        train_ds: Training dataset passed to ``model.fit``.
        val_ds: Validation dataset passed to ``model.fit``.
        epochs: Maximum number of epochs; early stopping may end sooner.

    Returns:
        ``(model, model_history)``: the trained model and the object returned
        by ``model.fit``.
    """
    import math  # local import: `math` is not among the notebook's imports

    model = create_tlpm_model(input_shape=input_shape, num_classes=num_classes)

    def scheduler(epoch, lr):
        # Hold the rate for the first half of training, then decay it by a
        # factor of exp(-0.1) per epoch. Plain Python floats are returned
        # instead of tf tensors.
        if epoch < math.ceil(epochs / 2):
            return lr
        return lr * math.exp(-0.1)

    # Built but intentionally not added to `all_callbacks` below.
    lr_callback_ = tf.keras.callbacks.LearningRateScheduler(scheduler)

    early_stopping_ = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        min_delta=0.001,
        # Half of the epochs, rounded up, as a plain int; the previous
        # tf.math.ceil(...) produced a float32 tensor.
        patience=math.ceil(epochs / 2),
        restore_best_weights=True,
    )

    # NOTE(review): without save_best_only=True the `monitor` argument does
    # not select checkpoints, so weights are written every epoch - confirm
    # this is intended.
    checkpoint_filepath = "/TLPM_checkpoint/cp-{epoch:04d}.ckpt"
    model_checkpoint_ = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath, save_weights_only=True, monitor="val_accuracy"
    )

    wandb_callback_ = WandbCallback(
        monitor="val_loss", log_weights=True, log_evaluation=True, validation_steps=5
    )

    all_callbacks = [early_stopping_, model_checkpoint_, wandb_callback_]

    model_history = model.fit(
        train_ds, validation_data=val_ds, epochs=epochs, callbacks=all_callbacks
    )

    model.save("/TLPM_model/TLPM_model.h5", save_format="h5")

    return model, model_history

In [None]:
# Three-channel square images of side IMAGE_SIZE.
INPUT_SHAPE = (IMAGE_SIZE, IMAGE_SIZE, 3)
# NOTE(review): presumably equals len(CLASS_NAMES) - confirm against the dataset.
NUM_CLASSES = 6
EPOCHS = 100

# Build a throwaway model instance only to render its architecture diagram.
model = create_tlpm_model(input_shape=INPUT_SHAPE, num_classes=NUM_CLASSES)
tf.keras.utils.plot_model(
    model, to_file="tlpm_model_architecture.png", show_shapes=True
)

In [None]:
# Fix TensorFlow's global random seed before the model is built and trained.
tf.random.set_seed(7)

# Start a W&B run in the "train-tlpm" project for this training job.
run = wandb.init(project="train-tlpm", job_type="train")

# Train the TLPM; keep the model and the fit history.
TLPM, TLPM_history = train_tlpm_model(
    input_shape=INPUT_SHAPE,
    num_classes=NUM_CLASSES,
    train_ds=train_ds,
    val_ds=val_ds,
    epochs=EPOCHS,
)

In [None]:
# History plotting and CSV export are currently disabled:
# pd_TLPM_history = pd.DataFrame(TLPM_history.history)

# plot_results(
#     history=TLPM_history,
#     epochs=pd_TLPM_history.shape[0],
#     title="Transfer Learning with Pretrained Model",
# )

# pd_TLPM_history.to_csv("pd_TLPM_history.csv", index=False)

# Save the trained model as TLPM.h5 relative to the current working directory.
TLPM.save("TLPM.h5")

In [None]:
# Collect the actual and predicted labels of the TLPM on the validation set.
actual_label_list, predict_label_list = show_predict(
    model=TLPM, ds=val_ds, display=False, plot=False, return_labels=True
)

# Confusion matrix with actual labels as the first argument and predictions
# as the second.
tf.math.confusion_matrix(actual_label_list, predict_label_list, num_classes=NUM_CLASSES)

## Test and Compare Model

In [None]:
# Reload both models from their .h5 files under /content.
# NOTE(review): assumes the earlier relative saves ("MCNN.h5", "TLPM.h5")
# ran with /content as the working directory - confirm.
MCNN_model = tf.keras.models.load_model("/content/MCNN.h5")
TLPM_model = tf.keras.models.load_model("/content/TLPM.h5")

In [None]:
# Order matters: use_model reports index 0 as MCNN and index 1 as TLPM.
model_list = [MCNN_model, TLPM_model]

In [None]:
def use_model(image_url):
    """Download an image, show it and print each model's predicted class.

    Args:
        image_url: URL of the image to classify.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(image_url, timeout=30)
    # Fail with a clear HTTP error instead of trying to decode an error page.
    response.raise_for_status()

    # Force three channels: downloaded images may be grayscale, palette-based
    # or RGBA, while the models take 3-channel input.
    img = Image.open(BytesIO(response.content)).convert("RGB")
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    resized_img = tf.image.resize(img_array, size=(IMAGE_SIZE, IMAGE_SIZE))

    plt.imshow(resized_img.numpy().astype("uint8"))
    plt.axis("off")
    plt.show()

    # Feed raw 0-255 pixels: the training datasets are not rescaled and both
    # model definitions normalize inside the model (Rescaling layer /
    # nasnet.preprocess_input), so scaling to [0, 1] here would be applied
    # twice and distort the predictions.
    img_batch = tf.expand_dims(resized_img, 0)
    predictions_list = [
        CLASS_NAMES[model.predict(img_batch).argmax()] for model in model_list
    ]

    print("\n")
    print("Model MCNN Predict:", predictions_list[0])
    print("Model TLPM Predict:", predictions_list[1])
    print("\n")

In [None]:
# Prompt for an image URL and print both models' predictions for it.
use_model(image_url=input("Enter image url: "))