# Collect Darkchess Image or Video Datasets

In [None]:
import os
import time

import cv2
import IPython.display as display
from datasets import Dataset
from datasets.features import Features, Image, ClassLabel

from config import CHESS
from eye.imgproc import (
    rotate,
    get_board_frame,
    get_form_frame,
    get_chess_frame
)

# ========================================
# Custom Settings
# ========================================
CAP_SRC = None  # set your IP camera URL, or video file path
SAVE_VIDEO_PATH = None  # set your video file path to save, or None to disable
SAVE_IMAGE_DIR = None # set your image directory to save by huggingface datasets save_to_disk, or None to disable
DISPLAY_CAP = False # display captured frame

SPECIFIC_CHESS_LABEL = None
EACH_CHESS_IMAGE_NUM = 36000 # ROTATE 360 x CROP SHIFT 4 x FRAME COUNT 25 = 36000

saved_image_data = {
    "image": [],
    "label": []
}

# Example: process board frame to chess frame and save
def process_image_to_save(frame, frame_count) -> None:
    if frame_count % 5 != 0 or len(saved_image_data["image"]) >= EACH_CHESS_IMAGE_NUM:
        return

    for angle in range(360): # ROTATE 360
        for shift in range(0, 8, 2): # CROP SHIFT 4
            img = get_board_frame(frame)
            img = get_form_frame(img, 20)
            img = get_chess_frame(img, shift)
            img = rotate(img, angle)
            saved_image_data["image"].append(img)
            saved_image_data["label"].append(SPECIFIC_CHESS_LABEL)

    print(f"Now image data number: {len(saved_image_data['image'])}", end="\r")

# ========================================
# End of Custom Settings
# ========================================

cap = cv2.VideoCapture(CAP_SRC)
display_handle = display.display(None, display_id=True)
out = cv2.VideoWriter(
    SAVE_VIDEO_PATH,
    cv2.VideoWriter_fourcc(*"mp4v"), 
    10,
    (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
)

while True:
    try:
        frame = cap.read()[1]
        if frame is None:
            print("\nEnd of video")
            break

        if SAVE_IMAGE_DIR:
            os.makedirs(SAVE_IMAGE_DIR, exist_ok=True)
            process_image_to_save(frame, int(cap.get(cv2.CAP_PROP_POS_FRAMES)))

        if SAVE_VIDEO_PATH:
            os.makedirs(os.path.dirname(SAVE_VIDEO_PATH), exist_ok=True)
            out.write(frame)
        
        if DISPLAY_CAP:
            time.sleep(0.5)  # 2 fps, buffer time
            encoded_frame = cv2.imencode(".jpg", frame)[1]
            display_handle.update(display.Image(data=encoded_frame.tobytes()))

    except KeyboardInterrupt:
        break

cap.release()
out.release()
display_handle.update(None)
if SAVE_IMAGE_DIR:
    print(f"Saving {len(saved_image_data['image'])} images to {SAVE_IMAGE_DIR}")
    image_datasets = Dataset.from_dict(
        mapping=saved_image_data,
        features=Features({"image": Image("L"), "label": ClassLabel(num_classes=len(CHESS[:-2]), names=[item["code"] for item in CHESS[:-2]])})
    ).train_test_split(train_size=0.8, test_size=0.2, shuffle=False)
    image_datasets.save_to_disk(SAVE_IMAGE_DIR)

# Login to HuggingFace (just login once)

In [None]:
from huggingface_hub import interpreter_login
interpreter_login()

# Push Board Dataset to Hugging Face
> manually write `metadata.jsonl` is required.

In [None]:
from datasets import load_dataset

dataset = load_dataset("datasets/darkchess-board")
dataset.push_to_hub("ryanlinjui/darkchess-board")

# Combine All Chess Datasets and Push it

In [None]:
from datasets import DatasetDict
from datasets import load_from_disk, concatenate_datasets

from config import CHESS

train_datasets = []
test_datasets = []

for i in range(len(CHESS[:-2])):
    print(f"Loading dataset {i}......")
    dataset = load_from_disk(f"datasets/darkchess-single/{i}")
    train_datasets.append(dataset["train"])
    test_datasets.append(dataset["test"])

datasets = DatasetDict({
    "train": concatenate_datasets(train_datasets),
    "test": concatenate_datasets(test_datasets)
})
datasets.push_to_hub("ryanlinjui/darkchess-single")

# Load Dataset from HuggingFace

In [None]:
import numpy as np
from datasets import load_dataset
from tensorflow.keras.utils import to_categorical

IMG_SHAPE = (56, 56) # set your image shape to resize

datasets = load_dataset("ryanlinjui/darkchess-single")
label_names = datasets["train"].features["label"].names
num_classes = len(label_names)

# Train dataset
train_images = np.expand_dims(
    (np.array([np.array(img["image"].resize(IMG_SHAPE)) for img in datasets["train"]]).astype("float32")) / 255.0, axis=-1
)
train_labels = to_categorical(np.array([img["label"] for img in datasets["train"]]), num_classes=num_classes)

# Test dataset
test_images  = np.expand_dims(
    (np.array([np.array(img["image"].resize(IMG_SHAPE)) for img in datasets["test"]]).astype("float32")) / 255.0, axis=-1
)
test_labels  = to_categorical(np.array([img["label"] for img in datasets["test"]]), num_classes=num_classes)

# Check dataset information
print("Number of classes:", num_classes)
print("Label names:", label_names)
print(f"Train images shape: {train_images.shape}")
print(f"Train labels shape: {train_labels.shape}")
print(f"Test images shape: {test_images.shape}")
print(f"Test labels shape: {test_labels.shape}")

# List devices, check for CPU, GPU, MPU availability

In [None]:
from tensorflow.python.client import device_lib 
print(device_lib.list_local_devices())

# Construct the model

In [None]:
from keras import Sequential
from keras.optimizers import Adam
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout

model = Sequential()
model.add(Conv2D(filters=64, kernel_size=(3, 3), strides=(1, 1), input_shape=(IMG_SHAPE[0], IMG_SHAPE[1], 1), padding="same", activation="relu"))
model.add(Conv2D(filters=64, kernel_size=(3, 3), strides=(1, 1), padding="same", activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(filters=128, kernel_size=(3, 2), strides=(1, 1), padding="same", activation="relu"))
model.add(Conv2D(filters=128, kernel_size=(3, 3), strides=(1, 1), padding="same", activation="relu"))
model.add(MaxPooling2D(pool_size=(2,2)))
model.add(Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1), padding="same", activation="relu"))
model.add(Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1), padding="same", activation="relu"))
model.add(Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1), padding="same", activation="relu"))
model.add(MaxPooling2D(pool_size=(2,2)))
model.add(Conv2D(filters=512, kernel_size=(3, 3), strides=(1, 1), padding="same", activation="relu"))
model.add(Conv2D(filters=512, kernel_size=(3, 3), strides=(1, 1), padding="same", activation="relu"))
model.add(Conv2D(filters=512, kernel_size=(3, 3), strides=(1, 1), padding="same", activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(units=4096, activation="relu"))
model.add(Dropout(rate=0.5))
model.add(Dense(units=4096, activation="relu"))
model.add(Dropout(rate=0.5))
model.add(Dense(units=1000, activation="relu"))
model.add(Dropout(rate=0.5))
model.add(Dense(units=14, activation="softmax"))

model.compile(
    optimizer=Adam(lr=0.00001, beta_1=0.9, beta_2=0.999, epsilon=1e-08),
    loss = "categorical_crossentropy",
    metrics=["accuracy"]
)

model.summary()

# Train the model

In [None]:
history = model.fit(x=train_images, y=train_labels, epochs=15, batch_size=360, validation_data=(test_images, test_labels), verbose=1)

# Evaluate the model

In [None]:
history = model.evaluate(x=test_images, y=test_labels, batch_size=360)

# Check predictions on test data

In [None]:
plt.figure(figsize=(10,10))
predict_index = 0
for i in range(100):
    plt.subplot(10,10,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    a = gray_frame[predict_index+i].reshape([-1,56,56,1])
    m = model.predict(a)
    plt.imshow(gray_frame[predict_index+i])
    plt.xlabel(m.argmax())
plt.show()

In [None]:
model.save("cnn_darkchess_model.h5")
model.save_weights("cnn_darkchess_weights.h5")

# Push Model to Hugging Face

In [None]:
import os

from tensorflow.keras.utils import plot_model
from tensorflow.keras.models import load_model
from huggingface_hub import push_to_hub_keras

from config import DEFAULT_MODEL_NAME, DEFAULT_WEIGHTS_NAME

model_dir = ""

model = load_model(os.path.join(model_dir, DEFAULT_MODEL_NAME))
model.load_weights(os.path.join(model_dir, DEFAULT_WEIGHTS_NAME))
plot_model(model, show_shapes=True, show_dtype=True, show_layer_activations=True, expand_nested=True)
model.summary()

push_to_hub_keras(model, "")

In [None]:
import pandas as pd 
predictions = model.predict_classes(gray_frame1) 
pd.crosstab(gray_label, predictions, rownames=["實際值"], colnames=["預測值"])

In [None]:
plt.plot(history.history["accuracy"], label="accuracy")
plt.plot(history.history["loss"], label = "loss")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.ylim([-0.05, 1.05])
plt.xlim([0, 5])
plt.legend(loc="lower right")