# Project setup

Import necessary libraries

In [None]:
# System
import os
import shutil
import glob
import time

# Image handling
from PIL import Image, ImageOps

# Numerical
import numpy as np
import pandas as pd
import random

# Plotting
import matplotlib.pyplot as plt
import seaborn as sns

# CNN
from keras.utils import to_categorical, set_random_seed
from keras.models import Sequential, load_model, Model
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Dropout
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint

# Tuning
import keras_tuner as kt

# Machine Learning
from sklearn.calibration import LabelEncoder
from sklearn.model_selection import train_test_split, KFold, GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report, accuracy_score, f1_score, log_loss

For reproducibility, we define the seed and ensure all libraries are using this seed.

In [None]:
SEED = 42
random.seed(SEED)
set_random_seed(SEED)
np.random.seed(SEED)

First we establish the directory to where our data is stored, and where we ultimately want to store our cleaned, processed data as well as any plots we generate.

In [None]:
INIT_DIR = 'chinese-handwriting-recognition-hsk-1/chinese-handwriting/'

INIT_TRAIN_DIR = os.path.join(INIT_DIR, 'CASIA-HWDB_Train/Train/')
INIT_TEST_DIR = os.path.join(INIT_DIR, 'CASIA-HWDB_Test/Test/')

DIR = 'data/'
PLOT_DIR = 'report/img/'

# Data investigation

First lets have a look at how many data classes we have in the dataset. The dataset has been split into test and train already so lets check how many classes we have in each.

In [None]:
image_classes = os.listdir(INIT_TRAIN_DIR)

if image_classes == os.listdir(INIT_TEST_DIR):
    print('The same classes are in each folder')
else:
    print('The two folders contain different classes')

Next, lets combine the the train and test data into one directory for simplicity.

We start by creating a new directory for our preprocessed data.

In [None]:
# Remove old data directory
if os.path.exists(DIR):
    shutil.rmtree(DIR)
    
# Create the new directory
os.mkdir(DIR)
for image_class in image_classes:
    path = os.path.join(DIR, image_class)
    os.makedirs(path)

Now let's combine the train and test data into one directory. This is specified by the `DIR` variable.

In [None]:
def combineDirectories():
    for image_class in image_classes:
        images = []

        # Fetch images from train dir
        train_path = os.path.join(INIT_TRAIN_DIR, image_class)
        images += [os.path.join(train_path, file) for file in os.listdir(train_path)]

        # Fetch images from test dir
        test_path = os.path.join(INIT_TEST_DIR, image_class)
        images += [os.path.join(test_path, file) for file in os.listdir(test_path)]

        # Iterate over the splits and images and copy them to the data directory
        for i, image in enumerate(images):
            new_filename = f"{i+1}.png"
            destination_path = os.path.join(DIR, image_class, new_filename)
            shutil.copy(image, destination_path)
            
combineDirectories()

# Data Pre-processing

First lets iterate through all the images and and confirm they are all `.png` and black & white 

In [None]:
def checkGrayscale():
    # Loop through each subdirectory and file in the directory
    for subdir, dirs, images in os.walk(DIR):
        for image in images:
            # Only check .png's
            if image.lower().endswith('.png'):
                file_path = os.path.join(subdir, image)

                # Check if the channel is set to 'L' = grayscale
                with Image.open(file_path) as img:
                    if img.mode != 'L':
                        print(f"{file_path} is not grayscale.")
            else:
                print("Not a png file: ")
checkGrayscale()

Next let's iterate through all the images and check if any have an aspect ratio that is not 1:1.

In [None]:
def checkAspectRatios():
    image_sizes = []

    # Iterate through all the images and check if any do not have a 1:1 aspect ratio
    for subdir, dirs, images in os.walk(DIR):
        for image in images:
            file_path = os.path.join(subdir, image)
            with Image.open(file_path) as img:
                image_sizes.append(img.size[0])

                # Both images dimensions must be the same for 1:1 ratio
                if img.size[0] != img.size[1]:
                    print(file_path, img.size)
                    
    return image_sizes
                    
image_sizes = checkAspectRatios()

Since all the images are square, this makes it easier to investigate their image sizes.

Let's plot a distribution of all the image sizes.

In [None]:
print(f"Smallest dimension: {min(image_sizes)}")

# Plotting
sns.displot(image_sizes, height=4, aspect=4/3)
plt.xlabel('Dimension')
plt.ylabel('Frequency')
plt.tight_layout()
plt.savefig(f"{PLOT_DIR}image_dimension_distribution.png", dpi=1000)
plt.show()

As you can see from the graph, the images are not all the same size. This will cause issues when we try to train the model, so we need to resize all the images to the same size. Additionally one of the images is only 3x3 which is way too small to be useful, so we will enforce a minimum image size.

In [None]:
# Constants
MIN_IMAGE_SIZE = 20
IMAGE_SIZE = 48

def resizeImages():
    # Iterate through all the images and resize to a fixed size
    for subdir, dirs, images in os.walk(DIR):
        for image in images:
            file_path = os.path.join(subdir, image)
            with Image.open(file_path) as img:
                current_size = img.size[0]
                
                # Images too small
                if current_size < MIN_IMAGE_SIZE:
                    os.remove(file_path)
                    continue
                
                # Only resize if not the correct size already
                if current_size != IMAGE_SIZE:
                    img = img.resize((IMAGE_SIZE, IMAGE_SIZE), Image.Resampling.LANCZOS)
                
                # Invert the image so it looks cooler
                img = ImageOps.invert(img)
                img.save(file_path)
                
resizeImages()

Now lets check how many images we have in each class and see how balanced the classes are.

In [None]:
# Returns a dataframe of how many samples in each class
def get_class_counts():
    class_counts = {}
    
    for subdir in glob.glob(os.path.join(DIR, '*')):
        file_count = len(glob.glob(os.path.join(subdir, '*')))
        class_counts[subdir.split('/')[-1]] = file_count

    return pd.DataFrame.from_dict(class_counts, orient='index', columns=['Count'])

df_class_counts = get_class_counts() 

# Plot the distribution of the class counts
sns.displot(list(df_class_counts['Count']), height=4, aspect=4/3)
plt.xlabel('Samples per class')
plt.ylabel('Class count')

# Show and save
plt.tight_layout()
plt.savefig(f"{PLOT_DIR}class_imbalance.svg")
plt.show()

# Print some more detailed statistics
print(df_class_counts.describe([0.05, 0.25, 0.75, 0.95]))

From this we can see that the classes are fairly balanced, let's quickly have a look at the slight imbalance in the classes.
We can arbitrarily choose a balance metric such as outside the range of 2 standard deviations from the mean.

In [None]:
mean = df_class_counts['Count'].mean()
std = df_class_counts['Count'].std()
threshold = 2 * std

outlier_counts = df_class_counts[np.abs(df_class_counts['Count'] - mean) > threshold]
print(outlier_counts)

As we can see, the imbalance is minimal. Because of this, we will not be using any class balancing techniques.

Let's have a look some examples of the images in the dataset.

In [None]:
# Get list of all .png images in the directory and its subdirectories
images = glob.glob(os.path.join(DIR, '**', '*.png'), recursive=True)

# Randomly select 4 images
random_images = random.sample(images, 4)

# Create the figure
fig, axs = plt.subplots(2, 2, figsize=(8, 8))

# Populate the figure
for i in range(2):
    for j in range(2):
        img_path = random_images[2 * i + j]
        img = Image.open(img_path)
        axs[i][j].imshow(img)
        # axs[i][j].set_title(f"Character: {img_path.split('/')[1]}", fontproperties=prop, fontsize = 12)

# Show and save the figure
plt.tight_layout()
fig.savefig(f"{PLOT_DIR}example_images.png", dpi=1000)
plt.show()

# NumPy Array Conversion

Now we can convert all the images into one single numpy array with the corresponding labels for each image.

In [None]:
def get_numpy_data(classes):
    images = []
    labels = []
    
    # Iterate over each image
    for image_class in classes:
        path = os.path.join(DIR, image_class)
        for image in os.listdir(path):
            image_path = os.path.join(path, image)

            # Open the image and convert it to a numpy array
            img = Image.open(image_path)
            img_array = np.array(img)
            
            images.append(img_array)
            labels.append(image_class)
            
    return np.array(images), np.array(labels)

images, labels = get_numpy_data(image_classes)

And finally we can save the numpy arrays to the root directory.

In [None]:
np.save('images.npy', images)
np.save('labels.npy', labels)

# Test-Train Split

In [None]:
# Load the images scale the values between 0 and 1
images = np.load('images.npy').astype("float32") / 255
labels = np.load('labels.npy')

# Split into train and test
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=SEED, stratify=labels)

# Define the label encoder
encoder = LabelEncoder().fit(np.concatenate((y_train, y_test)))

def encode_labels(encoder: LabelEncoder, labels):
    integer_labels = encoder.transform(labels)
    ohe_labels = to_categorical(integer_labels)
    return integer_labels, ohe_labels

# Encode the labels into integeres and one-hot-encoded vectors
y_train_int, y_train_ohe = encode_labels(encoder, y_train)
y_test_int, y_test_ohe = encode_labels(encoder, y_test)

NUM_CLASSES = y_train_ohe.shape[1]

Lets view some general information about the generated input tensors

In [None]:
print(X_train.shape)
print(X_test.shape)

print(y_train_int.shape)
print(y_test_int.shape)

print(y_train_ohe.shape)
print(y_test_ohe.shape)

print(y_train.shape)
print(y_test.shape)

# CNN

Let's first define the architecture of the CNN

In [None]:
def getCNNModel(hp):
    model = Sequential([
        Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 1)),
        
        # Main convolution layer 1
        Conv2D(hp.Int('conv_1_filters', min_value=8, max_value=32, step=8), kernel_size=3, activation='relu'),
        BatchNormalization(),
        MaxPooling2D(2, 2),
        Dropout(hp.Float('dropout_1', min_value=0.0, max_value=0.5, step=0.1)),
        
        # Main convolution layer 2
        Conv2D(hp.Int('conv_2_filters', min_value=16, max_value=64, step=16), kernel_size=3, activation='relu'),
        BatchNormalization(),
        MaxPooling2D(2, 2),
        Dropout(hp.Float('dropout_2', min_value=0.0, max_value=0.5, step=0.1)),
        
        # Main convolution layer 3
        Conv2D(hp.Int('conv_3_filters', min_value=32, max_value=128, step=32), kernel_size=2, activation='relu'),
        MaxPooling2D(2, 2),
        Dropout(hp.Float('dropout_3', min_value=0.0, max_value=0.5, step=0.1)),
        
        # Classification layer
        Flatten(),
        Dense(units=hp.Int('dense_units', min_value=128, max_value=1024, step=128), activation='relu'),
        Dropout(hp.Float('dropout_4', min_value=0.0, max_value=0.7, step=0.1)),
        Dense(units=NUM_CLASSES, activation='softmax')
    ])

    # Compile the model using the Adam optimizer
    model.compile(
        optimizer=Adam(
            hp.Choice('learning_rate', values=[1e-2, 5e-3, 1e-3, 5e-4, 1e-4])),
        loss='categorical_crossentropy',
        metrics=['accuracy'])

    return model

Now we can define the hyperparameter tuner and start the search for the best hyperparameters.

In [None]:
# Define the tuner which optimizes the validation accuracy
tuner = kt.Hyperband(
    getCNNModel,
    objective='val_accuracy',
    max_epochs=15,
    directory='hyperband',
    project_name='cnn_tuning20')

# Early stopping callback which detects bad configurations early
stop_early = EarlyStopping(
    monitor='loss',
    patience=5)

# Checkpoint callback to save the best model to a file for later use
checkpoint = ModelCheckpoint(
    'best_model.h5',
    monitor='val_accuracy',
    verbose=0,
    save_best_only=True)

# Begin the hyperparameter search
tuner.search(
    X_train, y_train_ohe,
    validation_split=0.2,
    callbacks=[stop_early, checkpoint],
    epochs=30, batch_size=128)

Lets have a look at what the best hyperparameters are.

In [None]:
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"Best hyperparameters: {best_hps.values}")

To ensure the model is not overfitting, we will perform cross validation on the model.

In [None]:
k = 5
kfold = KFold(n_splits=k, shuffle=True, random_state=SEED)
fold_history = []

for i, (train_idxs, val_idxs) in enumerate(kfold.split(X_train, y_train_ohe)):
    print(f"Fold {i + 1}/{k}")
    
    model = getCNNModel(best_hps)
    
    history = model.fit(
        X_train[train_idxs],
        y_train_ohe[train_idxs],
        validation_data=(X_train[val_idxs], y_train_ohe[val_idxs]),
        epochs=30,
        batch_size=64
    )
    val_acc_per_epoch = history.history['val_accuracy']
    best_epoch = val_acc_per_epoch.index(max(val_acc_per_epoch)) + 1
    print(f'Best epoch: {best_epoch}, Val_accuracy: {max(val_acc_per_epoch)}')
    fold_history.append(history)

Using the data collected from the cross validation, we can plot the accuracy and loss of the model.

In [None]:
df = pd.DataFrame(fold_history[0].history)

for i, history in enumerate(fold_history[1:]):
    df = pd.concat([df, pd.DataFrame(history.history)])

df_long = df.reset_index().melt(id_vars='index', value_vars=['accuracy', 'val_accuracy', 'loss', 'val_loss'])

df_long.rename(columns={'index': 'Epoch', 'variable': 'Metric', 'value': 'Value'}, inplace=True)
df_long['Epoch'] = df_long['Epoch'] + 1

df_accuracy = df_long[df_long['Metric'].isin(['accuracy', 'val_accuracy'])]
df_loss = df_long[df_long['Metric'].isin(['loss', 'val_loss'])]

df_accuracy = df_accuracy.replace({'val_accuracy': 'Validation', 'accuracy': 'Training'})
df_loss = df_loss.replace({'val_loss': 'Validation', 'loss': 'Training'})

fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 5))

sns.lineplot(data=df_accuracy, x='Epoch', y='Value', hue='Metric', errorbar='sd', ax=axs[0])
sns.lineplot(data=df_loss, x='Epoch', y='Value', hue='Metric', errorbar='sd', ax=axs[1])

axs[0].set_ylabel('Accuracy')
axs[1].set_ylabel('Loss')
axs[0].legend(title=None)
axs[1].legend(title=None)

plt.xlabel('Epoch')

plt.savefig(f"{PLOT_DIR}train_val_cnn.png", dpi=2000)
plt.show()

# Feature Extraction

To perform feature extraction, we will use the model we trained in the previous section but we will remove the all the fully connected classification layers to ensure we are only getting the feature vectors.

In [None]:
full_model = load_model('best_model.h5')
feature_model = Model(inputs=full_model.input, outputs=full_model.layers[-4].output)

train_features = feature_model.predict(X_train)
test_features = feature_model.predict(X_test)

Lets see the shapes of the input tensors

In [None]:
print(train_features.shape)
print(test_features.shape)

# Grid Search kNN Hyperparameters

First we need to define the parameter grid we want to search through.


In [None]:
param_grid = {
    'n_neighbors': [*range(5, 20, 1)],
    'weights': ['uniform', 'distance'],
    'metric': ['euclidean', 'manhattan', 'minkowski']
}

Next, we can define the grid search and start the search for the best hyperparameters.

In [None]:
knn = KNeighborsClassifier()

knn_grid_search = GridSearchCV(knn, param_grid, cv=5, scoring='accuracy')
knn_grid_search.fit(train_features, y_train_int)

Let's have a look at what the best hyperparameters found were.

In [None]:
print("Best parameters:", knn_grid_search.best_params_)
print("Best score:", knn_grid_search.best_score_)

# kNN Analysis

For some basic analysis we can calculate the accuracy and f1 score of the kNN model w.r.t the value of k

In [None]:
f1s = []
accuracys = []

for i in range(2, 50, 1):
    knn = KNeighborsClassifier(metric='euclidean', weights='distance', n_neighbors=i, n_jobs=-1)

    knn.fit(train_features, y_train)
    knn_predictions = knn.predict(test_features)
    
    accuracys.append(accuracy_score(y_test, knn_predictions))
    f1s.append(f1_score(y_test, knn_predictions, average='weighted'))
    
    print(f"k={i}, {accuracys[-1]}, {f1s[-1]}")

Let's plot those values

In [None]:
STEP = 1
k_values = range(2, 50, STEP)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))

ax1.plot(k_values, f1s[::STEP])
ax1.set_xlabel('k')
ax1.set_ylabel('F1 Score')

ax2.plot(k_values, accuracys[::STEP])
ax2.set_xlabel('k')
ax2.set_ylabel('Accuracy')

plt.tight_layout()
plt.show()

# Grid Search Logistic Regression Hyperparameters

Similarly to how we grid-searched the kNN model, we will grid-search the logistic regression model.

However, in the case of logistic regression, certain solvers dont allow certain other hyperparameters to be adjusted. For example, the `liblinear` solver does not allow the `l1_ratio` hyperparameter to be adjusted. Because of this, we need to define a different parameter grid for each solver. 

In [None]:
param_grid = [
    {
        'solver': ['saga'],
        'penalty': ['elasticnet'],
        'C': [10, 5, 1, 0.5, 0.1],
        'l1_ratio': [0.25, 0.5, 0.75],
        'multi_class': ['multinomial']},
    {
        'solver': ['saga'],
        'penalty': ['l1', 'l2', 'none'],
        'C': [10, 5, 1, 0.5, 0.1],
        'multi_class': ['multinomial']},
    {
        'solver': ['sag', 'lbfgs', 'newton-cg'],
        'penalty': ['l2', 'none'],
        'C': [10, 5, 1, 0.5, 0.1],
        'multi_class': ['multinomial']}
]

Now we can define the grid search and start the search for the best hyperparameters.

In [None]:
logistic_regressor = LogisticRegression(
    class_weight='balanced',
    max_iter=400,
    n_jobs=-1,
    random_state=SEED)

log_grid_search = GridSearchCV(logistic_regressor, param_grid, cv=5, scoring='accuracy')
log_grid_search.fit(train_features, y_train_int)


Let's have a look at what the best hyperparameters found were.

In [None]:
print("Best parameters:", log_grid_search.best_params_)
print("Best score:", log_grid_search.best_score_)

Lets print out a classification report of the predicted values from the logistic regressor.

In [None]:
logistic_regressor = log_grid_search.best_estimator_
log_predictions = logistic_regressor.predict(test_features)
report = classification_report(y_test, encoder.inverse_transform(log_predictions), digits=4)
print(report)

# Logistic Regression Analysis

This is quite intense analysis but essentially, for the logistic regression model, we want measure the accuracy, loss, training time and inference time depending on the size of the training data.

In [None]:
# Ensure test data is not used
train_images, val_images, train_labels, val_labels = train_test_split(train_features, y_train_int, test_size=0.2, random_state=SEED, stratify=y_train_int)

# Define the kfold split
# The train data will be split into 40 equal portions
k = 40
kfold = KFold(n_splits=k, shuffle=True, random_state=SEED)

# Define the optimal classifiers
knn = KNeighborsClassifier(metric='euclidean', n_neighbors=10, weights='distance')
lgr = LogisticRegression(C=5, multi_class='multinomial', penalty='l2', solver='lbfgs')

# Arrays to store values of the metrics
num_samples = []
knn_accuracies = []
lgr_accuracies = []
knn_losses = []
lgr_losses = []
knn_training_times = []
lgr_training_times = []
knn_inference_times = []
lgr_inference_times = []
fitted = None

# Really hacky and stupid way to train the models on increasingly more portions of the dataset. There must be a better way :(
for i, (useless_idxs, idxs) in enumerate(kfold.split(train_images, train_labels)):
    print(f"Portion {i + 1}/{k}")
    
    # Add portion to array which store all portions that the models have already been trained on
    if not fitted:
        fitted = train_images[idxs], train_labels[idxs]
    else:
        fitted = np.concatenate((fitted[0], train_images[idxs])), np.concatenate((fitted[1], train_labels[idxs]))
    
    num_samples.append(len(fitted[0]))
    
    # Get the training times
    start_time = time.time()
    knn.fit(fitted[0], fitted[1])
    end_time = time.time()
    knn_training_times.append(end_time - start_time)
    
    start_time = time.time()
    lgr.fit(fitted[0], fitted[1])
    end_time = time.time()
    lgr_training_times.append(end_time - start_time)
    
    # Get the inference times
    start_time = time.time()
    knn.predict(val_images)
    end_time = time.time()
    knn_inference_times.append(end_time - start_time)
    
    start_time = time.time()
    lgr.predict(val_images)
    end_time = time.time()
    lgr_inference_times.append(end_time - start_time)
    
    # Get the cross-entropy loss for each model
    knn_probs = knn.predict_proba(val_images)
    log_reg_probs = lgr.predict_proba(val_images)
    knn_loss = log_loss(val_labels, knn_probs)
    lgr_loss = log_loss(val_labels, log_reg_probs)
    knn_losses.append(knn_loss)
    lgr_losses.append(lgr_loss)
    
    # Get the accuracies
    knn_accuracy = knn.score(val_images, val_labels)
    lgr_accuracy = lgr.score(val_images, val_labels)
    
    knn_accuracies.append(knn_accuracy)
    lgr_accuracies.append(lgr_accuracy)


Now we can plot the results of the analysis on individual subplots.

In [None]:
fig, axs = plt.subplots(2, 2, figsize=(9, 9))

# Plot the accuracy graph
axs[0, 0].plot(np.array(num_samples) / 1000, knn_accuracies)
axs[0, 0].plot(np.array(num_samples) / 1000, lgr_accuracies)
axs[0, 0].set_title('Validation Accuracy')
axs[0, 0].set_xlabel('Number of Samples (1000s)')
axs[0, 0].set_ylabel('Accuracy')
axs[0, 0].set_ylim((0.55, 1))
axs[0, 0].legend(['kNN', 'Logistic Regression'], loc='upper left')

# Plot the loss graph
axs[0, 1].plot(np.array(num_samples) / 1000, knn_losses)
axs[0, 1].plot(np.array(num_samples) / 1000, lgr_losses)
axs[0, 1].set_title('Cross-entropy (Logistic) Loss')
axs[0, 1].set_xlabel('Number of Samples (1000s)')
axs[0, 1].set_ylabel('Loss')
axs[0, 1].legend(['kNN', 'Logistic Regression'])

# Plot the training time graph
axs[1, 0].plot(np.array(num_samples) / 1000, np.log10(np.array(knn_training_times)))
axs[1, 0].plot(np.array(num_samples) / 1000, np.log10(np.array(lgr_training_times)))
axs[1, 0].set_title('Training Time')
axs[1, 0].set_xlabel('Number of Samples (1000s)')
axs[1, 0].set_ylabel('Training Time (s, log scale)')
axs[1, 0].set_ylim((-3.4, 2.5))
axs[1, 0].legend(['kNN', 'Logistic Regression'])

# Plot the inference time graph
axs[1, 1].plot(np.array(num_samples) / 1000, np.log10(np.array(knn_inference_times)))
axs[1, 1].plot(np.array(num_samples) / 1000, np.log10(np.array(lgr_inference_times)))
axs[1, 1].set_title('Inference Time')
axs[1, 1].set_xlabel('Number of Samples (1000s)')
axs[1, 1].set_ylabel('Inference Time (s, log scale)')
axs[1, 1].set_ylim((-1.5, 1))
axs[1, 1].legend(['kNN', 'Logistic Regression'])

# Show and save the plot
plt.tight_layout()
plt.savefig(f"{PLOT_DIR}model_eval.svg")
plt.show()