In [None]:
import os

import time

import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

In [None]:
# --- reproducibility / data split -------------------------------------------
my_seed = 42              # value passed to np.random.seed before each shuffle
split_proportion = 0.1    # fraction of all samples used for EACH of the val and test splits

# --- training hyperparameters -----------------------------------------------
batch_size = 4
number_epochs = 8
lr = 3e-5                 # base learning rate handed to the LR scheduler factory

# --- figure bookkeeping ------------------------------------------------------
figure_dir = os.path.join("..", "assets")
figure_count = 0          # running index used in saved figure file names

# True: fit the model and save it. False: load the previously saved model.
train_new = False

print(f"Number of GPUs Available: {len(tf.config.list_physical_devices('GPU'))}")

In [None]:
## download the dataset if you are working on colab
#! kaggle datasets download -d ardamavi/27-class-sign-language-dataset

In [None]:

# Locations of the image array (X.npy) and the label array (Y.npy).
input_dir = os.path.join("..", "input", "27-class-sign-language-dataset")
x_filename, y_filename = (os.path.join(input_dir, name) for name in ("X.npy", "Y.npy"))

x = np.load(x_filename)
y = np.load(y_filename)

# Number of samples assigned to each of the validation and test splits.
split_number = int(split_proportion * x.shape[0])

# Shuffle both arrays in place along axis 0. The RNG is re-seeded with the same
# value before each shuffle, presumably so that x and y receive the same
# permutation (this requires both arrays to have the same length along axis 0).
for array in (x, y):
    np.random.seed(my_seed)
    np.random.shuffle(array)

# Row ranges of the three splits: [val | test | train].
val_rows = slice(0, split_number)
test_rows = slice(split_number, 2 * split_number)
train_rows = slice(2 * split_number, None)

val_x = tf.convert_to_tensor(x[val_rows])
test_x = tf.convert_to_tensor(x[test_rows])
train_x = tf.convert_to_tensor(x[train_rows])

val_y_labels = tf.convert_to_tensor(y[val_rows])
test_y_labels = tf.convert_to_tensor(y[test_rows])
train_y_labels = tf.convert_to_tensor(y[train_rows])

In [None]:
# Write the first 4096 rows of the (already shuffled) images and labels next to
# the full dataset as a small subset.
for mini_name, full_array in (("mini_X.npy", x), ("mini_y.npy", y)):
    np.save(os.path.join(input_dir, mini_name), full_array[:4096])

In [None]:
# visualize images with labels

# Make sure the output folder exists so that savefig does not fail.
os.makedirs(figure_dir, exist_ok=True)

# 3x3 grid of randomly chosen training images, each titled with its raw label.
fig, ax = plt.subplots(3,3, figsize=(8,8))
for count, x_index in enumerate(np.random.randint(0, train_x.shape[0], size=(9,))):

    # Row / column of the panel for the count-th sample.
    cx = count // 3
    cy = count % 3
    ax[cx,cy].imshow(train_x[x_index])
    ax[cx,cy].set_title(f"label: {train_y_labels[x_index]}")
    ax[cx,cy].set_yticklabels("")
    ax[cx,cy].set_xticklabels("")

# Apply the layout BEFORE saving; otherwise the file on disk is written with the
# un-adjusted spacing and differs from what plt.show() displays.
plt.tight_layout()

plt.savefig(os.path.join(figure_dir, f"figure_{figure_count}.png"))
figure_count += 1

plt.show()

In [None]:
# Class index -> raw label value. np.unique returns the distinct labels in
# sorted order, and enumerate numbers them 0, 1, 2, ...
label_dict = dict(enumerate(np.unique(train_y_labels)))

print(label_dict, x.shape)

In [None]:
# Inverse of label_dict: raw label value -> class index.
reverse_label_dict = {label: index for index, label in label_dict.items()}

print(reverse_label_dict)

In [None]:
def _labels_to_indices(labels):
    """Return an array shaped like `labels` holding each row's class index.

    Each row's first element is looked up in reverse_label_dict, and the
    resulting index is written into the matching row of the output.
    """
    indices = np.zeros_like(labels)
    for row in range(indices.shape[0]):
        indices[row] = reverse_label_dict[labels[row].numpy()[0]]
    return indices


np_train_y = _labels_to_indices(train_y_labels)
np_val_y = _labels_to_indices(val_y_labels)
np_test_y = _labels_to_indices(test_y_labels)

# Flatten to 1-D and store as int32 tensors, the form used as training targets.
train_y = tf.convert_to_tensor(np_train_y.reshape(-1), dtype=tf.int32)
val_y = tf.convert_to_tensor(np_val_y.reshape(-1), dtype=tf.int32)
test_y = tf.convert_to_tensor(np_test_y.reshape(-1), dtype=tf.int32)

In [None]:
# visualize images with labels

# 3x3 grid of randomly chosen validation images; each title shows the class
# index together with the raw label it maps back to.
fig, ax = plt.subplots(3,3, figsize=(8,8))
sample_rows = np.random.randint(0, val_x.shape[0], size=(9,))

# ax.flat walks the grid row by row, i.e. panel k sits at (k // 3, k % 3).
for panel, x_index in zip(ax.flat, sample_rows):
    idx = val_y[x_index]
    panel.imshow(val_x[x_index])
    panel.set_title(f"label index: \n {idx} = {label_dict[idx.numpy()]}")
    panel.set_yticklabels("")
    panel.set_xticklabels("")

plt.tight_layout()

figure_path = os.path.join(figure_dir, f"figure_{figure_count}.png")
plt.savefig(figure_path)
figure_count += 1

plt.show()

In [None]:
number_classes = len(label_dict)

# MobileNet without its classification top, initialised with ImageNet weights
# and sized to the training images.
extractor = tf.keras.applications.MobileNet(
    input_shape=train_x.shape[1:],
    include_top=False,
    weights="imagenet",
)
extractor.trainable = True

# Classification head stacked on the extractor output. The layers are created
# in the same order as before so their auto-generated names do not change.
classifier_head = [
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dropout(0.25),
    Dense(32, activation="relu"),
    Dense(32, activation="relu"),
    Dense(number_classes, activation="softmax"),
]

model = Sequential([extractor, *classifier_head])

# Run a single image through the model once so it is built before summary().
_ = model(train_x[0:1])
model.summary()

In [None]:
# Sparse categorical cross-entropy is used because the targets (train_y, val_y)
# are integer class indices rather than one-hot vectors.
model.compile(
    optimizer="adam",
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=["accuracy"],
)

model.summary()

In [None]:
def make_scheduler(my_lr):
    """Build a learning-rate schedule function around the base rate `my_lr`.

    The returned callable takes (epoch, lr) and returns the rate to use:
      * epoch <= 1 : my_lr / 10
      * epoch == 2 : my_lr * 10
      * otherwise  : the incoming `lr` multiplied by 0.9
    """
    low_rate = my_lr / 10.
    high_rate = my_lr * 10.

    def scheduler(epoch, lr):
        # Guard clauses for the two fixed-rate phases; everything else decays.
        if epoch <= 1:
            return low_rate
        if epoch == 2:
            return high_rate
        return lr * 0.9

    return scheduler

# TensorBoard logging into ./logs, including the graph, updated once per epoch.
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir="logs",
    write_graph=True,
    update_freq='epoch',
)

# Learning-rate schedule built around the base rate `lr`.
scheduler = make_scheduler(lr)
lr_scheduler_callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

In [None]:
# Where the Keras model and the converted TF Lite flatbuffer are stored.
models_dir = os.path.join("..", "models")
save_model_dir = os.path.join(models_dir, "mobilenet_sign")
tf_lite_model_filename = os.path.join(models_dir, "tflite_mobilenet.tflite")

if not train_new:
    # Reuse the previously saved model instead of fitting again.
    model = tf.keras.models.load_model(save_model_dir)
else:
    fit_callbacks = [tensorboard_callback, lr_scheduler_callback]
    history = model.fit(
        x=train_x,
        y=train_y,
        validation_data=(val_x, val_y),
        batch_size=batch_size,
        epochs=number_epochs,
        callbacks=fit_callbacks,
    )

    model.save(save_model_dir)

In [None]:
# Choose the conversion source: True converts the directory at save_model_dir,
# False converts the in-memory Keras model directly.
convert_from_saved_model = True

if convert_from_saved_model:
    converter = tf.lite.TFLiteConverter.from_saved_model(save_model_dir)
else:
    # Fixed: this branch previously referenced `tf.lit.TFLiteConvert` and bound
    # the result to `convert`, so it could never have run successfully.
    converter = tf.lite.TFLiteConverter.from_keras_model(model)

tf_lite_mobilenet = converter.convert()

# Save the model.
with open(tf_lite_model_filename, "wb") as f:
    f.write(tf_lite_mobilenet)

In [None]:
# Load the converted model and obtain a callable for its signature.
interpreter = tf.lite.Interpreter(model_path=tf_lite_model_filename)
my_signature = interpreter.get_signature_runner()

# Compare TF Lite and Keras predictions on four random validation images.
# NOTE(review): the signature input/output keys below are hard-coded;
# presumably they match the saved model's input and final Dense layer names.
for _ in range(4):

    my_index = np.random.randint(0, val_x.shape[0])
    my_batch = val_x[my_index:my_index+1]   # slice keeps the leading batch axis

    # Time one TF Lite call (t0 -> t1) and one Keras call (t1 -> t2).
    t0 = time.time()
    output_data = my_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]
    t1 = time.time()
    full_output_data = model(my_batch)
    t2 = time.time()

    fig, ax = plt.subplots(1,2, figsize=(8,5))
    lite_ax, keras_ax = ax
    sample_image = my_batch[0].numpy()

    lite_ax.imshow(sample_image)
    lite_ax.set_title(f"TF Lite pred. ({(t1-t0):.1e} s) \n label: {label_dict[output_data.argmax()]}")
    keras_ax.imshow(sample_image)
    keras_ax.set_title(f"TF keras pred. ({(t2-t1):.1e} s)\n label: {label_dict[full_output_data.numpy().argmax()]}")

    lite_ax.set_yticklabels("")
    lite_ax.set_xticklabels("")
    fig.suptitle(f"True label: {label_dict[val_y[my_index].numpy()]}", fontsize=18)
    plt.tight_layout()

    figure_path = os.path.join(figure_dir, f"figure_{figure_count}.png")
    plt.savefig(figure_path)
    figure_count += 1

    plt.show()
    

In [None]:
def representative_dataset():
    """Yield up to 100 single-image batches from val_x, each cast to float32.

    Every yielded item is a one-element list, the form handed to the converter
    through `converter.representative_dataset`.
    """
    calibration_batches = tf.data.Dataset.from_tensor_slices((val_x)).batch(1).take(100)
    for data in calibration_batches:
        yield [tf.dtypes.cast(data, tf.float32)]


# Convert the saved model again, this time with the default optimizations
# enabled and the representative dataset attached.
converter = tf.lite.TFLiteConverter.from_saved_model(save_model_dir)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset

tf_lite_mobilenet_quantized = converter.convert()

# Save the model.
tf_lite_quant_filename = os.path.join("..", "models", "tflite_mobilenet_quant.tflite")
with open(tf_lite_quant_filename, "wb") as f:
    f.write(tf_lite_mobilenet_quantized)

In [None]:
# Load the quantized TF Lite model and obtain a callable for its signature.
quant_interpreter = tf.lite.Interpreter(model_path=tf_lite_quant_filename)
quant_signature = quant_interpreter.get_signature_runner()

# Compare quantized TF Lite and Keras predictions on four random validation images.
for jj in range(4):

    my_index = np.random.randint(0, val_x.shape[0])

    # Slice (rather than index) so the batch axis is kept.
    my_batch = val_x[my_index:my_index+1]

    # Time one quantized call (t0 -> t1) and one Keras call (t1 -> t2).
    t0 = time.time()
    quant_output_data = quant_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]
    t1 = time.time()
    full_output_data = model(my_batch)
    t2 = time.time()

    fig, ax = plt.subplots(1,2, figsize=(8,5))

    ax[0].imshow(my_batch[0].numpy())
    ax[0].set_title(f"Quant. Lite pred. ({(t1-t0):.1e} s) \n label: {label_dict[quant_output_data.argmax()]}")
    ax[1].imshow(my_batch[0].numpy())
    ax[1].set_title(f"TF keras pred. ({(t2-t1):.1e} s)\n label: {label_dict[full_output_data.numpy().argmax()]}")

    ax[0].set_yticklabels("")
    ax[0].set_xticklabels("")
    fig.suptitle(f"True label: {label_dict[val_y[my_index].numpy()]}", fontsize=18)
    plt.tight_layout()

    # Fixed: the file name was a plain string missing its f-prefix, so every
    # iteration wrote to the literal name "figure_{figure_count}.png" and
    # overwrote the previous figure.
    plt.savefig(os.path.join(figure_dir, f"figure_{figure_count}.png"))
    figure_count += 1
    plt.show()
    

In [None]:
# Time a full pass over the validation set, one sample at a time, for each of
# the three models. Everything inside the block is placed on the CPU device.
with tf.device('/CPU:0'):

    number_val_samples = val_x.shape[0]

    # --- full Keras model: t3 -> t4 ---
    t3 = time.time()
    for my_index in range(number_val_samples):
        my_batch = val_x[my_index:my_index+1]
        full_output_data = model(my_batch)
    t4 = time.time()
    print("finished with full keras model")

    # --- TF Lite model: t4 -> t5 ---
    for my_index in range(number_val_samples):
        my_batch = val_x[my_index:my_index+1]
        output_data = my_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]
    t5 = time.time()
    print("finished with TF Lite model")

    # --- quantized TF Lite model: t5 -> t6 ---
    for my_index in range(number_val_samples):
        my_batch = val_x[my_index:my_index+1]
        quant_output_data = quant_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]
    t6 = time.time()
    print("finished with quantized TF Lite model")

msg = (
    f"time elapsed {val_x.shape[0]} samples \n\t keras: {t4-t3:.2f} s \n\t TF Lite: {t5-t4:.2f} s"
    f" \n\t Quantized TF Lite: {t6-t5:.3f} s"
)
print(msg)
"""
finished with full keras model
finished with TF Lite model
finished with quantized TF Lite model
time elapsed 2280 samples 
	 keras: 77.63 s 
	 TF Lite: 13.34 s 
	 Quantized TF Lite: 21.718 s
"""

In [None]:
# Count, for each of the three models, how many validation samples have their
# arg-max prediction equal to the true class index.
total_samples = val_x.shape[0]
correct_keras = correct_lite = correct_quant = 0

for my_index in range(total_samples):

    my_batch = val_x[my_index:my_index+1]
    true_label = val_y[my_index].numpy()

    full_output_data = model(my_batch)
    output_data = my_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]
    quant_output_data = quant_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]

    # Each comparison contributes 1.0 on a hit and 0.0 on a miss.
    correct_keras += 1.0 * (full_output_data.numpy().argmax() == true_label)
    correct_lite += 1.0 * (output_data.argmax() == true_label)
    correct_quant += 1.0 * (quant_output_data.argmax() == true_label)

accuracy_keras = correct_keras / total_samples
accuracy_lite = correct_lite / total_samples
accuracy_quant = correct_quant / total_samples

msg = "".join([
    f"Validation accuracies ",
    f"\n\t keras {accuracy_keras:.4f}",
    f"\n\t TF Lite {accuracy_lite:.4f}",
    f"\n\t Quantized TF Lite {accuracy_quant:.4f}",
])

print(msg)
"""
accuracies 
	 keras 0.9851
	 TF Lite 0.9851
	 Quantized TF Lite 0.9855
"""

In [None]:
# Same accuracy tally as for the validation split, run on the test split.
total_samples = test_x.shape[0]
correct_keras = correct_lite = correct_quant = 0

for my_index in range(total_samples):

    my_batch = test_x[my_index:my_index+1]
    true_label = test_y[my_index].numpy()

    full_output_data = model(my_batch)
    output_data = my_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]
    quant_output_data = quant_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]

    # Each comparison contributes 1.0 on a hit and 0.0 on a miss.
    correct_keras += 1.0 * (full_output_data.numpy().argmax() == true_label)
    correct_lite += 1.0 * (output_data.argmax() == true_label)
    correct_quant += 1.0 * (quant_output_data.argmax() == true_label)

accuracy_keras = correct_keras / total_samples
accuracy_lite = correct_lite / total_samples
accuracy_quant = correct_quant / total_samples

msg = "".join([
    f"Test accuracies ",
    f"\n\t keras {accuracy_keras:.4f}",
    f"\n\t TF Lite {accuracy_lite:.4f}",
    f"\n\t Quantized TF Lite {accuracy_quant:.4f}",
])

print(msg)
"""
Test accuracies 
	 keras 0.9882
	 TF Lite 0.9882
	 Quantized TF Lite 0.9877
"""

In [None]:
# Benchmark that reloads the data and all three models from disk and measures
# per-sample inference time and accuracy for each of them.
pi_test = True

if pi_test:

    # load dataset (a validation split of)
    # CC-BY-NC-SA Mavi and Dikle and Turkey Ankara Ayrancı Anadolu High School
    # https://www.kaggle.com/datasets/ardamavi/27-class-sign-language-dataset
    # associated arXiv manuscript 2203.03859

    dim_x, dim_y = 128, 128
    data_dir = os.path.join("..", "input", "27-class-sign-language-dataset")

    # NOTE(review): val_x.npy / val_y.npy are not written by any cell visible in
    # this notebook; presumably they are exported separately. val_y.npy is
    # assumed to hold integer class indices, since it is compared with argmax
    # below -- TODO confirm.
    x_filename = os.path.join(data_dir, "val_x.npy")
    y_filename = os.path.join(data_dir, "val_y.npy")

    pi_x = np.load(x_filename)
    pi_y = np.load(y_filename)

    # load the Keras, TF Lite, and TF Lite quantized model

    save_model_dir = os.path.join("..", "models", "mobilenet_sign")
    tf_lite_model_filename = os.path.join("..", "models", "tflite_mobilenet.tflite")
    tf_lite_quant_filename = os.path.join("..", "models", "tflite_mobilenet_quant.tflite")

    model = tf.keras.models.load_model(save_model_dir)

    interpreter = tf.lite.Interpreter(model_path=tf_lite_model_filename)
    quant_interpreter = tf.lite.Interpreter(model_path=tf_lite_quant_filename)

    tf_lite_signature = interpreter.get_signature_runner()
    quant_signature = quant_interpreter.get_signature_runner()

    # Hit counters per model.
    correct_keras = 0
    correct_lite = 0
    correct_quant = 0

    # Accumulated inference time per model, in seconds.
    t_quant = 0.
    t_lite = 0.
    t_keras = 0.

    for my_index in range(pi_x.shape[0]):

        # Slice (rather than index) so the batch axis is kept.
        my_batch = pi_x[my_index:my_index+1]

        # Fixed: use the signature runner created in this cell. The code used
        # `my_signature` from an earlier cell, which is undefined on a fresh
        # kernel and otherwise refers to a previously created interpreter.
        t0_tf_lite = time.time()
        output_data = tf_lite_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]
        t1_tf_lite = time.time()

        t0_keras = time.time()
        full_output_data = model(my_batch)
        t1_keras = time.time()

        t0_quant = time.time()
        quant_output_data = quant_signature(**{"mobilenet_1.00_128_input": my_batch})["dense_2"]
        t1_quant = time.time()

        # pi_y is a NumPy array here, so no .numpy() call is needed.
        true_label = pi_y[my_index]

        correct_keras += 1.0 * (full_output_data.numpy().argmax() == true_label)
        correct_lite += 1.0 * (output_data.argmax() == true_label)
        correct_quant += 1.0 * (quant_output_data.argmax() == true_label)

        t_quant += t1_quant - t0_quant
        t_keras += t1_keras - t0_keras
        t_lite += t1_tf_lite - t0_tf_lite

    samples_seen = pi_x.shape[0]

    quant_accuracy = correct_quant / samples_seen
    keras_accuracy = correct_keras / samples_seen
    lite_accuracy = correct_lite / samples_seen

    avg_t_quant = t_quant / samples_seen
    avg_t_keras = t_keras / samples_seen
    avg_t_lite = t_lite / samples_seen

    msg = f"\n quant \n\t avg. inference time = {avg_t_quant:.3e} \n\t accuracy {quant_accuracy}"
    msg += f"\n tf lite \n\t avg. inference time = {avg_t_lite:.3e} \n\t accuracy {lite_accuracy}"
    msg += f"\n keras \n\t avg. inference time = {avg_t_keras:.3e} \n\t accuracy {keras_accuracy}"

    print(msg)
    """
    # on desktop:
    quant 
        avg. inference time = 9.547e-03 
        accuracy 0.9855263157894737
    tf lite 
        avg. inference time = 5.612e-03 
        accuracy 0.9850877192982456
    keras 
        avg. inference time = 5.612e-03 
        accuracy 0.9850877192982456
    """
