## Imports

In [None]:
# `%pip` installs into the environment of the running kernel, whereas `!pip`
# uses whichever pip is first on the shell PATH.
# TODO: pin versions (e.g. `pkg==x.y.z`) so the notebook stays reproducible.
%pip install -q tensorflow-addons tensorflow-hub

In [None]:
from tensorflow import keras

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_addons as tfa

from sklearn.model_selection import train_test_split


import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import re



from utils import calculate_mean, calculate_std, pad_image

## GPUs

In [None]:
# try:  # detect TPUs
#     tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect()  # TPU detection
#     strategy = tf.distribute.TPUStrategy(tpu)
# except ValueError:  # detect GPUs
#     tpu = False
#     strategy = (
#         tf.distribute.get_strategy()
#     )  # default strategy that works on CPU and single GPU
# print("Number of Accelerators: ", strategy.num_replicas_in_sync)

## Configuration

Find the list of all fine-tunable models [here](https://tfhub.dev/sayakpaul/collections/cait/1).

In [None]:
# --- Model -------------------------------------------------------------------
MODEL_PATH = "https://tfhub.dev/sayakpaul/cait_s24_224_fe/1"
IMAGE_SIZE = [224, 224]  # passed to tf.image.resize as `size`

# --- Batching ----------------------------------------------------------------
# The TPU variant (16 * strategy.num_replicas_in_sync) is disabled together
# with the accelerator-detection cell, so a fixed batch size is used.
BATCH_SIZE = 64  # on Colab/GPU a larger batch size may run out of memory (OOM)

# --- Label vocabularies ------------------------------------------------------
# Each list maps a class index (its position) to the token it stands for.
CLASS_0 = ['A', 'AA', 'AB', 'AD', 'AE', 'AF', 'AG', 'B']  # leading letters
CLASS_1 = [str(digit) for digit in range(10)]  # first digit (no padding token)
CLASS_2 = CLASS_1 + ['[PAD]']  # remaining digits, padded when absent
CLASS_3 = [chr(code) for code in range(ord('A'), ord('Z') + 1)] + ['[PAD]']  # trailing letters

# --- Other constants ---------------------------------------------------------
MEAN = tf.constant(calculate_mean('train'))  # bdc_train mean
STD = tf.constant(calculate_std('train'))  # bdc_train std
AUTO = tf.data.AUTOTUNE

# Data Pipeline

[CaiT authors](https://arxiv.org/abs/2103.17239) use a separate preprocessing pipeline for fine-tuning. To keep this walkthrough short and simple, we only apply basic preprocessing here: an optional random horizontal flip, a bicubic resize, and mean/std normalization.

In [None]:
def make_dataset(dataset: tf.data.Dataset, train: bool, image_size: list = IMAGE_SIZE):
    """Turn a dataset of (image, label) pairs into a batched input pipeline.

    Args:
        dataset: Dataset yielding `(image, label)` pairs.
        train: When True, shuffle the dataset and apply a random horizontal flip.
        image_size: Target `[height, width]` handed to `tf.image.resize`.

    Returns:
        The dataset after (optional) shuffling, per-example preprocessing,
        batching by `BATCH_SIZE`, and prefetching.

    NOTE(review): a horizontal flip mirrors any text in the image while the
    label is left unchanged — confirm this augmentation is wanted for plates.
    """

    def preprocess(image, label):
        if train:
            # Built-in op: flips left/right with a 1-in-2 chance.
            image = tf.image.random_flip_left_right(image)
        image = tf.image.resize(image, size=image_size, method="bicubic")
        # Cast the statistics to the image dtype so a dtype mismatch (e.g.
        # float64 statistics vs. float32 resize output) cannot break the op.
        mean = tf.cast(MEAN, image.dtype)
        std = tf.cast(STD, image.dtype)
        image = (image - mean) / std  # normalization
        return image, label

    if train:
        dataset = dataset.shuffle(BATCH_SIZE * 10)

    return (
        dataset.map(preprocess, num_parallel_calls=AUTO)
        .batch(BATCH_SIZE)
        .prefetch(AUTO)
    )

# BDC-2023 Dataset

In [None]:
# Read the semicolon-separated annotation file, drop the unnamed column, and
# remove the rows (by index label) whose labels are known to be wrong.
df_train = (
    pd.read_csv('train/DataTrain.csv', delimiter=';')
    .drop(columns=['Unnamed: 0'])
    .drop(index=[126, 457, 600])
)
df_train.head()

## Prepare the image data

In [None]:
# Run every file named in the CSV through `pad_image` and stack the results.
# NOTE(review): `pad_image` comes from utils; presumably it loads the file and
# pads it to a common size — confirm there.
folder = 'train'
path_train = df_train['NameofFile'].to_numpy()
img_list = np.asarray([pad_image(f'{folder}/{file_name}') for file_name in path_train])

## Prepare the label data

In [None]:
plate_train = df_train['Vehicleregistrationplate'].to_numpy()

# A plate is parsed as <letters><digits><letters>; every group may be empty.
plate_pattern = re.compile(r'([A-Za-z]*)(\d*)([A-Za-z]*)')

# Vocabulary used by each of the 8 label rows, in row order:
# leading letters, first digit, three more digits, three trailing letters.
head_vocabs = [CLASS_0, CLASS_1, CLASS_2, CLASS_2, CLASS_2, CLASS_3, CLASS_3, CLASS_3]

classes = [[] for _ in head_vocabs]
for plate in plate_train:
    parsed = plate_pattern.match(plate)
    if not parsed:
        continue
    prefix, digits, suffix = parsed.groups()
    # Pad short groups with "[PAD]"; anything beyond 4 digits / 3 letters is ignored.
    digit_tokens = (list(digits) + ["[PAD]"] * 4)[:4]
    suffix_tokens = (list(suffix) + ["[PAD]"] * 3)[:3]
    plate_tokens = [prefix, *digit_tokens, *suffix_tokens]
    # `.index` raises ValueError when a token is not in its vocabulary.
    for head, (vocab, token) in enumerate(zip(head_vocabs, plate_tokens)):
        classes[head].append(vocab.index(token))

label_list = np.asarray(classes)


In [None]:
# Sanity check: one row per label head (8) and one column per parsed plate.
label_list.shape

In [None]:
# Temporary data preparation: standardise the in-memory image array directly.
# The statistics are multiplied by 255 before use.
# NOTE(review): presumably they are on a 0-1 scale while the pixels are 0-255 — confirm in utils.
# NOTE(review): this overwrites `img_list`, so running the cell twice
# standardises the images twice; reload the images before re-running it.
mean, std = calculate_mean('train') * 255, calculate_std('train') * 255
img_list = (img_list - mean) / std



In [None]:
# Inspect the scaled standard deviation used to standardise `img_list`.
std

In [None]:
# Inspect the scaled mean used to standardise `img_list`.
mean

In [None]:
# Largest value in the standardised image array (quick range check).
np.max(img_list)

In [None]:
# image_array = np.asarray(img_list)
# train_image = image_array[:540]
# val_image = image_array[540:]
# # Convert the image array to TensorFlow dataset
# train_image_dataset = tf.data.Dataset.from_tensor_slices(train_image)
# val_image_dataset = tf.data.Dataset.from_tensor_slices(val_image)

# # Convert the list of labels to NumPy array
# label_array = np.asarray(label_list)
# train_label = label_array[:,:540]
# val_label = label_array[:,540:]
# # Convert the label array to TensorFlow dataset
# train_label_dataset = tf.data.Dataset.from_tensor_slices(train_label)
# val_label_dataset = tf.data.Dataset.from_tensor_slices(val_label)

# # Combine the image and label datasets into a single dataset
# train_dataset = tf.data.Dataset.zip((train_image_dataset, train_label_dataset))
# val_dataset = tf.data.Dataset.zip((val_image_dataset, val_label_dataset))

In [None]:
# NOTE(review): `val_label_dataset` is only assigned in the commented-out cell
# above, so this raises NameError on a fresh kernel — restore that cell or
# remove this one.
val_label_dataset.element_spec

In [None]:
# Report how many elements each split contains.
# NOTE(review): `train_dataset` / `val_dataset` are only created in the
# commented-out cell above; this cell fails on a fresh kernel until that
# pipeline is restored.
num_train, num_val = (
    tf.data.experimental.cardinality(split) for split in (train_dataset, val_dataset)
)
print(f"Number of training examples: {num_train}")
print(f"Number of validation examples: {num_val}")

## Prepare dataset

In [None]:
# Wrap both splits in the preprocessing pipeline; only the training split is
# shuffled and augmented.
# NOTE(review): each name is rebound to its own wrapped version, so re-running
# this cell preprocesses and batches a second time.
train_dataset = make_dataset(train_dataset, train=True)
val_dataset = make_dataset(val_dataset, train=False)

## Visualize

In [None]:
# Preview the first images of the in-memory training arrays with their plate text.
# The earlier version read from `train_dataset` and looked titles up in `CLASSES`;
# neither name is defined by any active cell, and every sample carries 8 label
# heads rather than a single class index.

# Vocabulary of each label row, matching the order used when `label_list` was built.
head_vocabs = [CLASS_0, CLASS_1, CLASS_2, CLASS_2, CLASS_2, CLASS_3, CLASS_3, CLASS_3]

num_preview = min(15, len(img_list))
sample_images = img_list[:num_preview]
sample_labels = label_list[:, :num_preview]  # shape (8, num_preview)

plt.figure(figsize=(5 * 3, 3 * 3))
for n in range(num_preview):
    ax = plt.subplot(3, 5, n + 1)
    image = sample_images[n] * std + mean  # undo the standardisation of img_list
    image = (image - image.min()) / (
        image.max() - image.min()
    )  # convert to [0, 1] for avoiding matplotlib warning
    plt.imshow(image)
    tokens = [vocab[idx] for vocab, idx in zip(head_vocabs, sample_labels[:, n])]
    plt.title("".join(token for token in tokens if token != "[PAD]"))
    plt.axis("off")
plt.tight_layout()
plt.show()

# Model Utility

In [None]:
def number_output(x):
    """Attach the four softmax heads that classify the plate's digits.

    The first head has 10 units; the other three have 11 units.
    Returns a 4-tuple of output tensors named `outputs_num_0` .. `outputs_num_3`.
    """
    units_per_head = (10, 11, 11, 11)
    return tuple(
        tf.keras.layers.Dense(units, activation="softmax", name=f'outputs_num_{position}')(x)
        for position, units in enumerate(units_per_head)
    )

def char_output(x):
    """Attach the three softmax heads that classify the plate's trailing letters.

    Every head has 27 units.
    Returns a 3-tuple of output tensors named `outputs_char_0` .. `outputs_char_2`.
    """
    return tuple(
        tf.keras.layers.Dense(27, activation="softmax", name=f'outputs_char_{position}')(x)
        for position in range(3)
    )


def get_model(
    model_url: str, res: int = IMAGE_SIZE[0], num_classes: int = 5
) -> tf.keras.Model:
    """Build the 8-head plate classifier on top of a frozen TF-Hub module.

    Args:
        model_url: TF-Hub handle of the backbone module.
        res: Side length of the square RGB input.
        num_classes: Accepted for compatibility; not used by the body.

    Returns:
        A Keras model with 8 softmax outputs, in this order: `outputs_code_area`,
        `outputs_num_0..3`, `outputs_char_0..2`.
    """
    inputs = tf.keras.Input((res, res, 3))
    backbone = hub.KerasLayer(model_url, trainable=False)

    # The module returns three values; the second and third carry attention
    # scores and are not used here.
    features, _, _ = backbone(inputs)
    hidden = tf.keras.layers.Dense(128, activation='relu')(features)

    area_head = tf.keras.layers.Dense(8, activation="softmax", name='outputs_code_area')(hidden)
    outputs = [area_head, *number_output(hidden), *char_output(hidden)]
    return tf.keras.Model(inputs, outputs)

In [None]:
# Build the multi-head classifier on top of the hub module and print its layers.
model = get_model(MODEL_PATH)
model.summary()

# Training Hyperparameters

In [None]:
EPOCHS = 10

# Learning-rate schedule: linear ramp-up, optional plateau, exponential decay.
start_lr = 0.00001
min_lr = 0.00001
max_lr = 0.0002
rampup_epochs = 5
sustain_epochs = 0
exp_decay = 0.8


def lrfn(epoch):
    """Return the learning rate for a 0-based `epoch`.

    The rate rises linearly from `start_lr` to `max_lr` over `rampup_epochs`,
    stays at `max_lr` for `sustain_epochs`, then decays towards `min_lr` by a
    factor of `exp_decay` per epoch. The settings are read from the
    module-level values at call time.
    """
    if epoch < rampup_epochs:
        return (max_lr - start_lr) / rampup_epochs * epoch + start_lr
    if epoch < rampup_epochs + sustain_epochs:
        return max_lr
    epochs_into_decay = epoch - rampup_epochs - sustain_epochs
    return (max_lr - min_lr) * exp_decay ** epochs_into_decay + min_lr


# `lrfn` already has the signature the scheduler needs, so no lambda wrapper.
lr_callback = tf.keras.callbacks.LearningRateScheduler(lrfn, verbose=True)

# Plot the schedule over the planned epochs as a visual sanity check.
rng = list(range(EPOCHS))
y = [lrfn(epoch) for epoch in rng]
plt.plot(rng, y)
plt.xlabel("epoch")
plt.ylabel("learning rate")
plt.show()

In [None]:
def bdc_loss(y_true, y_pred):
    """Average the sparse categorical cross-entropy over all label heads.

    Args:
        y_true: Indexable collection of integer label tensors, one per head.
        y_pred: Indexable collection of prediction tensors, in the same order.

    Returns:
        The mean of the per-head losses.

    The previous version also instantiated a new `tf.keras.metrics.Mean` for
    every head on every call and discarded it; those objects were never
    attached to a model, so they have been removed.
    """
    losses = [
        tf.keras.losses.sparse_categorical_crossentropy(y_true[i], y_pred[i])
        for i in range(len(y_true))
    ]

    # Average the losses
    return tf.reduce_mean(losses, name="total_loss")

In [None]:
def character_accuracy_tf(y_true, y_pred):
    """
    Fraction of positions at which `y_pred` equals `y_true`.

    Both inputs are flattened to 1-D before they are compared element-wise.

    Parameters:
        y_true (tensorflow.Tensor): Ground truth labels.
        y_pred (tensorflow.Tensor): Predicted labels.

    Returns:
        tensorflow.Tensor: Mean of the element-wise matches, as float32.

    """
    matches = tf.equal(tf.reshape(y_true, [-1]), tf.reshape(y_pred, [-1]))
    return tf.reduce_mean(tf.cast(matches, tf.float32))

In [None]:
# Output names in the order the model returns them.
output_heads = [
    'outputs_code_area',
    'outputs_num_0', 'outputs_num_1', 'outputs_num_2', 'outputs_num_3',
    'outputs_char_0', 'outputs_char_1', 'outputs_char_2',
]

# The labels are integer class indices (the loss is the *sparse* cross-entropy),
# so the accuracy must be the sparse variant as well: `CategoricalAccuracy`
# takes the arg-max of the targets and therefore expects one-hot labels.
metrics = {head: tf.keras.metrics.SparseCategoricalAccuracy() for head in output_heads}
optimizer = tfa.optimizers.AdamW(weight_decay=1e-5)
loss = [tf.keras.losses.SparseCategoricalCrossentropy() for _ in output_heads]
loss_weight = [1 / len(output_heads)] * len(output_heads)  # equal weight per head (0.125)

# Training & Validation

In [None]:
# The model is compiled outside any distribution strategy scope; the
# `strategy.scope()` variant and the tf.data-based fit call are disabled.
model.compile(
    optimizer=optimizer,
    loss=loss,
    loss_weights=loss_weight,
    metrics=metrics,
)

# One label array per output head, in the same order as the model outputs.
label = list(label_list)
history = model.fit(
    x=img_list,
    y=label,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    validation_split=0.2,
    callbacks=[lr_callback],
)

In [None]:
# (pandas is already imported in the imports cell at the top.)
result = pd.DataFrame(history.history)

# A multi-output model logs one "<output>_<metric>" column per head, so there
# is no plain "accuracy" column to select; pick the accuracy columns by suffix.
accuracy_cols = [col for col in result.columns if col.endswith("accuracy")]
loss_cols = [col for col in ("loss", "val_loss") if col in result.columns]

fig, ax = plt.subplots(2, 1, figsize=(10, 10))
if accuracy_cols:
    result[accuracy_cols].plot(
        xlabel="epoch", ylabel="score", title="Accuracy per output head", ax=ax[0]
    )
if loss_cols:
    result[loss_cols].plot(xlabel="epoch", ylabel="loss", title="Total loss", ax=ax[1])

# Predictions

In [None]:
# `val_dataset` is never built by an active cell, so take the samples from the
# same trailing slice of the arrays that `validation_split` holds out in fit().
val_fraction = 0.2  # same value as validation_split in the model.fit call
val_start = int(len(img_list) * (1 - val_fraction))
sample_images = img_list[val_start:val_start + 16]
sample_labels = [head[val_start:val_start + 16] for head in label_list]

# predict() returns one probability array per output head; take the arg-max of
# each and stack them into an (8, n) array of class indices.
predictions = np.stack(
    [probs.argmax(axis=-1) for probs in model.predict(sample_images, batch_size=16)]
)
evaluations = model.evaluate(
    sample_images, sample_labels, batch_size=16, return_dict=True
)

# One total loss plus one loss and one accuracy per head.
for metric_name, value in evaluations.items():
    print(f"{metric_name}: {value:.4f}")

In [None]:
# Vocabulary of each output head, in model-output order.
head_vocabs = [CLASS_0, CLASS_1, CLASS_2, CLASS_2, CLASS_2, CLASS_3, CLASS_3, CLASS_3]


def _plate_text(head_indices):
    """Join one class index per head into plate text, skipping '[PAD]' slots."""
    tokens = (vocab[int(idx)] for vocab, idx in zip(head_vocabs, head_indices))
    return "".join(token for token in tokens if token != "[PAD]")


# Both arrays are expected head-major: shape (8, number_of_samples).
true_heads = np.asarray(sample_labels)
pred_heads = np.asarray(predictions)
num_shown = min(15, len(sample_images))

plt.figure(figsize=(5 * 3, 3 * 3))
for n in range(num_shown):
    ax = plt.subplot(3, 5, n + 1)
    image = np.asarray(sample_images[n]) * std + mean  # undo the standardisation
    image = (image - image.min()) / (
        image.max() - image.min()
    )  # convert to [0, 1] for avoiding matplotlib warning
    plt.imshow(image)
    target = _plate_text(true_heads[:, n])
    pred = _plate_text(pred_heads[:, n])
    plt.title("{} ({})".format(target, pred))
    plt.axis("off")
plt.tight_layout()
plt.show()