In [None]:
# from numpy import loadtxt
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Rescaling, Dense, Conv2D, BatchNormalization, Dropout, MaxPooling2D, ReLU, AveragePooling2D, Flatten, ZeroPadding2D, DepthwiseConv2D, SeparableConv2D
# from tensorflow.keras import losses, optimizers
# from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.models import Sequential, save_model, load_model

from tensorflow import keras
from tensorflow.keras import layers

from sklearn.metrics import classification_report,confusion_matrix
import tensorflow as tf
import tensorflow_addons as tfa

import matplotlib.pyplot as plt
import os
import numpy as np

from functools import partial
from albumentations import (
    Compose, RandomBrightness, JpegCompression, HueSaturationValue, RandomContrast, HorizontalFlip,
    Rotate, Cutout, VerticalFlip, ShiftScaleRotate, RandomCrop, Normalize
)

In [None]:
# !pip install -U tensorflow-addons
# !pip install -q -U albumentations
# !echo "$(pip freeze | grep albumentations) is successfully installed"

# Open CV Installation
# !pip list | grep opencv
# ! pip uninstall opencv-python-headless
# ! pip install opencv-python-headless==4.1.2.30

In [None]:
# Display the installed TensorFlow version as the cell output.
tf.__version__

**Hyperparameters**

In [None]:
num_classes = 10

image_size = (224, 224)  # (height, width) images are loaded at and resized to
# The model input is derived from image_size so it always matches the images the
# data pipeline produces; keep the shape passed to set_shapes in sync with it.
input_shape = (*image_size, 3)
batch_size = 256
epochs = 200

learning_rate = 0.001
weight_decay = 0.0001
patch_size = 4  # Size of the patches to be extracted from the input images
# Patches per image; uses image_size[0] only, i.e. square inputs.
num_patches = (image_size[0] // patch_size) ** 2
projection_dim = 64
num_heads = 4
transformer_units = [
    projection_dim * 2,
    projection_dim,
]  # Size of the transformer layers
transformer_layers = 8
mlp_head_units = [2048, 1024]  # Size of the dense layers of the final classifier

In [None]:
# Mount Google Drive (via the google.colab package) at /content/drive/ so the
# dataset and model directories configured below can be read and written.
from google.colab import drive 
drive.mount('/content/drive/')

# !ls "/content/drive/MyDrive/Colab Notebooks/Masters"

In [None]:
# Project root on the mounted drive; the commented line is an alternative local root.
# base_dir = '/media/ext_mount/Project/BuildingCNN'
base_dir = '/content/drive/MyDrive/Colab Notebooks/Masters'

# Raw GC10-DET images and the directory that holds the train/test folders.
raw_dataset_directory = f"{base_dir}/Dataset/GC10-DET/images"
dataset_directory = f'{base_dir}/Dataset/GC10-DET/dataset'

train_set_directory = f'{dataset_directory}/train'
test_set_directory = f'{dataset_directory}/test'

# Checkpoints and the final model are written below this directory.
model_directory = f'{base_dir}/models/vision_transformer'

**Preparing Dataset**

In [None]:
# !unzip "/content/drive/My Drive/Colab Notebooks/Masters/GC10-DET.zip" -d "/content/drive/My Drive/Colab Notebooks/Masters/GC10-DET"
# !pip install split-folders 

In [None]:
# import splitfolders
# splitfolders.ratio(raw_dataset_directory, output=dataset_directory, seed=555, ratio=(.9, .1), group_prefix=None)

In [None]:
# Training subset: the 90% side of a seeded 90/10 split of train_set_directory.
train_ds_batch = tf.keras.preprocessing.image_dataset_from_directory(
    directory=train_set_directory,
    image_size=image_size,
    batch_size=None,  # keep elements unbatched; batching is applied after augmentation
    validation_split=0.1,
    subset="training",
    seed=555,
)
# train_ds = train_ds_batch.prefetch(buffer_size=batch_size)

In [None]:
# Validation subset: the 10% side of the same seeded split of train_set_directory.
val_ds_batch = tf.keras.preprocessing.image_dataset_from_directory(
    directory=train_set_directory,
    image_size=image_size,
    batch_size=None,  # keep elements unbatched; batching is applied later
    validation_split=0.1,
    subset="validation",
    seed=555,
)
# val_ds = val_ds_batch.prefetch(buffer_size=batch_size)

In [None]:
# Held-out test images: the whole of test_set_directory, no validation split.
test_ds_batch = tf.keras.preprocessing.image_dataset_from_directory(
    directory=test_set_directory,
    image_size=image_size,
    batch_size=None,  # keep elements unbatched; batching is applied later
    seed=555,
)
# test_ds = test_ds_batch.prefetch(buffer_size=batch_size)

In [None]:
# Class names as reported by the training dataset; later used to turn a
# predicted class index back into a readable label.
class_names = train_ds_batch.class_names
class_names

### Augmentations

In [None]:
def view_image(ds):
    """Plot up to the first nine images of one batch of ``ds`` in a 3x3 grid.

    Args:
        ds: iterable yielding ``(images, labels)`` pairs whose members expose
            ``.numpy()``; the first pair is treated as one batch.
    """
    images, labels = next(iter(ds))  # extract 1 batch from the dataset
    images = images.numpy()
    labels = labels.numpy()

    fig = plt.figure(figsize=(10, 10))
    # A batch may hold fewer than nine images (e.g. a small or partial batch),
    # so never index past what is actually there.
    for i in range(min(9, len(images))):
        ax = fig.add_subplot(3, 3, i + 1, xticks=[], yticks=[])
        ax.imshow(images[i])  # .astype('uint8')
        ax.set_title(f"Label: {labels[i]}")

In [None]:
# Instantiate augments
# we can apply as many augments we want and adjust the values accordingly
# Active pipeline: Rotate(limit=50) and HorizontalFlip, each with probability 0.5;
# the commented entries are alternatives that are currently switched off.
transforms = Compose([
            # Cutout(num_holes=2, max_h_size=4, max_w_size=4),
            # Normalize(),
            Rotate(limit=50, p=0.5),
            HorizontalFlip(p=0.5),
            # VerticalFlip(p=0.5),
            # ShiftScaleRotate(p=0.5),
            # RandomCrop(height=8, width=8, p=0.75),
        ])

In [None]:
def aug_fn(image, img_size, training=True):
    """Optionally augment one image, then rescale and resize it.

    Args:
        image: a single image array (pixel values are divided by 255 here).
        img_size: side length of the square output image.
        training: when truthy the image first goes through the module-level
            ``transforms`` pipeline; otherwise it is used as-is.

    Returns:
        float32 tensor resized to ``[img_size, img_size]``.
    """
    # Only the training path is sent through the albumentations pipeline.
    pixels = transforms(image=image)["image"] if training else image
    scaled = tf.cast(pixels / 255.0, tf.float32)
    return tf.image.resize(scaled, size=[img_size, img_size])

In [None]:
def process_data(image, label, img_size, training):
    """Run ``aug_fn`` through ``tf.numpy_function`` and pass the label through.

    Returns:
        ``(augmented_image, label)`` where the image is declared as float32.
    """
    augmented = tf.numpy_function(
        func=aug_fn,
        inp=[image, img_size, training],
        Tout=tf.float32,
    )
    return augmented, label

In [None]:
def set_shapes(img, label, img_shape=(32,32,3)):
    """Declare static shapes on an (image, label) pair and return the pair.

    The image is given ``img_shape`` and the label is declared scalar (``[]``).
    """
    for tensor, static_shape in ((img, img_shape), (label, [])):
        tensor.set_shape(static_shape)
    return img, label

In [None]:
# Sentinel that lets tf.data choose parallelism and prefetch buffer sizes itself.
# tf.data.AUTOTUNE is the stable name of the former tf.data.experimental.AUTOTUNE.
AUTOTUNE = tf.data.AUTOTUNE

In [None]:
# Training pipeline: augment each image, declare static shapes, then batch.
# The static image shape is taken from the configured input_shape instead of
# set_shapes' built-in default, so the dataset spec follows the model input.
train_ds_alb = train_ds_batch.map(
    partial(process_data, img_size=image_size[0], training=True),
    num_parallel_calls=AUTOTUNE,
).prefetch(AUTOTUNE)
train_ds = train_ds_alb.map(
    partial(set_shapes, img_shape=input_shape),
    num_parallel_calls=AUTOTUNE,
).batch(batch_size).prefetch(AUTOTUNE)
train_ds

In [None]:
# Validation pipeline: training=False so validation images are only rescaled and
# resized, never randomly augmented — validation metrics stay comparable across epochs.
val_ds_alb = val_ds_batch.map(
    partial(process_data, img_size=image_size[0], training=False),
    num_parallel_calls=AUTOTUNE,
).prefetch(AUTOTUNE)
# Static image shape comes from the configured input_shape, not set_shapes' default.
val_ds = val_ds_alb.map(
    partial(set_shapes, img_shape=input_shape),
    num_parallel_calls=AUTOTUNE,
).batch(batch_size).prefetch(AUTOTUNE)
val_ds

In [None]:
# Test pipeline: no augmentation (training=False), only rescale + resize.
test_ds_alb = test_ds_batch.map(
    partial(process_data, img_size=image_size[0], training=False),
    num_parallel_calls=AUTOTUNE,
).prefetch(AUTOTUNE)
# Static image shape comes from the configured input_shape, not set_shapes' default.
test_ds = test_ds_alb.map(
    partial(set_shapes, img_shape=input_shape),
    num_parallel_calls=AUTOTUNE,
).batch(batch_size).prefetch(AUTOTUNE)
test_ds

# test_ds = test_ds_batch.batch(batch_size).prefetch(AUTOTUNE)
# test_ds

In [None]:
def show_image(img, fig_size=(4, 4)):
    """Display a single image array in its own figure.

    Args:
        img: array exposing ``.astype``; it is cast to uint8 before drawing.
        fig_size: matplotlib figure size.
    """
    plt.figure(figsize=fig_size)
    as_uint8 = img.astype('uint8')
    plt.imshow(as_uint8)
    plt.show()

In [None]:
def lr_schedule(epoch):
    """Step-wise learning rate schedule.

    Starts at 1e-3 and is scaled down once ``epoch`` exceeds 80, 120, 160 and
    180. Intended to be passed to a LearningRateScheduler callback, which
    calls it with the epoch index.

    # Arguments
        epoch (int): current epoch index

    # Returns
        lr (float): learning rate for that epoch
    """
    base_lr = 1e-3
    # (epoch threshold, multiplier) pairs, checked from the latest stage down.
    stages = ((180, 0.5e-3), (160, 1e-3), (120, 1e-2), (80, 1e-1))
    lr = base_lr
    for threshold, factor in stages:
        if epoch > threshold:
            lr = base_lr * factor
            break
    print('Learning rate: ', lr)
    return lr

## Modelling

In [None]:
# Short model identifier; it is embedded in the checkpoint and final-model file names.
model_type = 'vision_transformer'

#### Implement multilayer perceptron (MLP)

In [None]:
def mlp(x, hidden_units, dropout_rate):
    """Apply a stack of Dense(GELU) + Dropout pairs to ``x``.

    Args:
        x: input tensor.
        hidden_units: iterable of layer widths, one Dense layer per entry.
        dropout_rate: rate used by the Dropout that follows every Dense layer.

    Returns:
        Output tensor of the last Dropout (``x`` itself if ``hidden_units`` is empty).
    """
    out = x
    for width in hidden_units:
        dense = layers.Dense(width, activation=tf.nn.gelu)
        out = dense(out)
        out = layers.Dropout(dropout_rate)(out)
    return out

#### Implement patch creation as a layer

In [None]:
class Patches(layers.Layer):
    """Split a batch of images into non-overlapping square patches.

    Args:
        patch_size: side length (pixels) of each square patch; also the stride.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...).
            Accepting them is required for ``from_config`` to rebuild the layer,
            because the base ``get_config()`` emits those keys.
    """

    def __init__(self, patch_size, **kwargs):
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        """Return patches flattened to ``[batch, n_patches, patch_dims]``."""
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # Last axis holds the flattened pixel values of one patch.
        patch_dims = patches.shape[-1]
        # Collapse the patch grid into a single sequence axis.
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

    def get_config(self):
        """Return the constructor arguments needed to re-create this layer."""
        config = super().get_config().copy()
        config.update({
            'patch_size': self.patch_size,
        })
        return config

#### Implement the patch encoding layer

The `PatchEncoder` layer will linearly transform a patch by projecting it into a
vector of size `projection_dim`. In addition, it adds a learnable position
embedding to the projected vector.

In [None]:
class PatchEncoder(layers.Layer):
    """Project each patch to ``projection_dim`` and add a learned position embedding.

    Args:
        num_patches: number of patches per image (size of the position table).
        projection_dim: width of the projected patch vectors.
        **kwargs: standard Keras layer arguments (``name``, ``dtype``, ...),
            needed so ``from_config`` can rebuild the layer from ``get_config()``.
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_patches = num_patches
        # Kept so get_config() can report every constructor argument.
        self.projection_dim = projection_dim
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        """Return the projected patches plus their position embeddings."""
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

    def get_config(self):
        """Return the constructor arguments needed to re-create this layer.

        Only plain values are stored. The sub-layers are rebuilt by ``__init__``,
        so the config matches the constructor signature and can be passed to
        ``from_config``.
        """
        config = super().get_config().copy()
        config.update({
            'num_patches': self.num_patches,
            'projection_dim': self.projection_dim,
        })
        return config

#### Build the ViT model

The ViT model consists of multiple Transformer blocks,
which use the `layers.MultiHeadAttention` layer as a self-attention mechanism
applied to the sequence of patches. The Transformer blocks produce a
`[batch_size, num_patches, projection_dim]` tensor, which is processed via a
classifier head with softmax to produce the final class probabilities output.

Unlike the technique described in the [paper](https://arxiv.org/abs/2010.11929),
which prepends a learnable embedding to the sequence of encoded patches to serve
as the image representation, all the outputs of the final Transformer block are
reshaped with `layers.Flatten()` and used as the image
representation input to the classifier head.
Note that the `layers.GlobalAveragePooling1D` layer
could also be used instead to aggregate the outputs of the Transformer block,
especially when the number of patches and the projection dimensions are large.

In [None]:

def create_vit_classifier():
    """Assemble the ViT classifier from the notebook-level hyperparameters.

    Reads ``input_shape``, ``patch_size``, ``num_patches``, ``projection_dim``,
    ``transformer_layers``, ``num_heads``, ``transformer_units``,
    ``mlp_head_units`` and ``num_classes`` from module scope.

    Returns:
        keras.Model mapping an image batch to unnormalised class logits
        (the final Dense layer has no activation).
    """
    inputs = layers.Input(shape=input_shape)
    # No in-model augmentation: the inputs go straight to patch extraction.
    patch_tokens = Patches(patch_size)(inputs)
    tokens = PatchEncoder(num_patches, projection_dim)(patch_tokens)

    for _ in range(transformer_layers):
        # Self-attention sub-block: normalise, attend, add the residual.
        normed = layers.LayerNormalization(epsilon=1e-6)(tokens)
        attended = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(normed, normed)
        attn_residual = layers.Add()([attended, tokens])
        # Feed-forward sub-block: normalise, MLP, add the residual.
        mlp_in = layers.LayerNormalization(epsilon=1e-6)(attn_residual)
        mlp_out = mlp(mlp_in, hidden_units=transformer_units, dropout_rate=0.1)
        tokens = layers.Add()([mlp_out, attn_residual])

    # Normalise, then flatten every patch token into one vector per image.
    pooled = layers.LayerNormalization(epsilon=1e-6)(tokens)
    pooled = layers.Flatten()(pooled)
    pooled = layers.Dropout(0.5)(pooled)
    # Classifier head followed by the logits layer.
    head = mlp(pooled, hidden_units=mlp_head_units, dropout_rate=0.5)
    class_logits = layers.Dense(num_classes)(head)
    return keras.Model(inputs=inputs, outputs=class_logits)


#### Compile, train, and evaluate the model

In [None]:
# Prepare the model saving directory and the per-epoch checkpoint file pattern.
save_dir = os.path.join(model_directory, 'saved_models')
# '{epoch:03d}' is left unformatted here; it is part of the resulting file path pattern.
model_name = '%s_model.{epoch:03d}.h5' % model_type
# exist_ok=True makes this a no-op when the directory is already there and
# avoids the check-then-create race of a separate isdir() test.
os.makedirs(save_dir, exist_ok=True)
filepath = os.path.join(save_dir, model_name)
filepath

In [None]:
# Prepare callbacks for model saving and for learning rate adjustment.
# Checkpoint: write a file to `filepath` only when val_accuracy improves.
checkpoint = ModelCheckpoint(filepath=filepath,
                             monitor='val_accuracy',
                             verbose=1,
                             save_best_only=True)

# Step schedule defined by lr_schedule above.
lr_scheduler = LearningRateScheduler(lr_schedule)

# NOTE(review): lr_scheduler assigns the learning rate from lr_schedule, which
# presumably overrides any reduction made by lr_reducer — confirm both are intended.
lr_reducer = ReduceLROnPlateau(factor=np.sqrt(0.1),
                               cooldown=0,
                               patience=5,
                               min_lr=0.5e-6)

# Stop when val_loss has not improved by min_delta for `patience` epochs and
# restore the best weights seen.
early_stopper = EarlyStopping(
    monitor="val_loss",
    min_delta=0.0001,
    patience=100,
    verbose=1,
    restore_best_weights=True
    )

callbacks = [checkpoint, lr_reducer, lr_scheduler, early_stopper]

In [None]:
# AdamW from tensorflow-addons, configured from the hyperparameter cell.
# NOTE(review): the active model.compile(...) call below passes optimizer='adam',
# so this object (and learning_rate / weight_decay) is currently not used for training.
optimizer = tfa.optimizers.AdamW(
    learning_rate=learning_rate, weight_decay=weight_decay
)

In [None]:
# Build the ViT from the notebook-level hyperparameters and print its layer summary.
model = create_vit_classifier()
model.summary()

In [None]:
# model.compile(
#     optimizer=optimizer,
#     loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
#     metrics=[
#         keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
#         keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"),
#     ],
# )

# Active configuration: the 'adam' string alias as optimizer and sparse categorical
# cross-entropy with from_logits=True, matching the activation-free final Dense layer.
model.compile(
    optimizer='adam',
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

# model.compile(
#     # loss=losses.SparseCategoricalCrossentropy(),
#     # optimizer=optimizers.Adam(learning_rate=0.001),
#     loss=losses.SparseCategoricalCrossentropy(from_logits=True),
#     optimizer='adam',
#     metrics=['accuracy']
# )

In [None]:
# Train on the batched training pipeline and validate on val_ds each epoch.
# Batching is done by the datasets themselves, hence no batch_size argument here.
history = model.fit(
    train_ds,
    # batch_size=batch_size,
    epochs=epochs,
    validation_data=val_ds,
    # validation_split=0.1,
    verbose=1,
    callbacks=callbacks,
)

In [None]:
# Evaluate on the held-out test pipeline; scores[0] is printed as the loss and
# scores[1] as the accuracy (the single metric passed to compile).
scores = model.evaluate(test_ds)
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])

### Save Final Model

In [None]:
# Save the model
# The final model is written into save_dir, next to the per-epoch checkpoints.
final_model_filepath = os.path.join(save_dir, f'{model_type}_final_model.h5')
save_model(model, final_model_filepath)

# model = load_model(final_model_filepath)

In [None]:
# Display where the final model was written.
final_model_filepath

### Test Predictions

In [None]:
# Path of the single test image used for the prediction below; the commented
# lines are alternative samples from the test set.
# test_image = f'{test_set_directory}/silk_spot/img_01_425005700_00191.jpg'
test_image = f'{test_set_directory}/silk_spot/img_03_4406645900_00364.jpg'
# test_image = f'{test_set_directory}/silk_spot/img_03_3436786500_00071.jpg'
# test_image = f'{test_set_directory}/oil_spot/img_03_3402617700_00118.jpg'

In [None]:
# Load the image resized to image_size and convert it to an array.
img = tf.keras.utils.load_img(test_image, target_size=image_size)
img_array = tf.keras.utils.img_to_array(img)
# show_image(img_array)

In [None]:
# Build the one-image batch under a new name instead of overwriting img_array,
# so re-running this cell does not rescale / re-batch an already processed array.
img_batch = tf.expand_dims(img_array / 255, 0)  # scale as in aug_fn, then add the batch axis
predictions = model.predict(img_batch)
# The model outputs logits; softmax turns them into class probabilities.
score = tf.nn.softmax(predictions[0])
print("{} : {:.2f} %".format(class_names[np.argmax(score)], 100 * np.max(score)))

### Inference Timings

In [None]:
import time

In [None]:
# single image inference timings
inference_timings = []

one_batch = next(iter(test_ds))  # first (images, labels) batch of the test pipeline

for img in one_batch[0]:
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # short intervals; time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()
    img_array = tf.expand_dims(img, 0) # Create a batch
    predictions = model.predict(img_array)
    score = tf.nn.softmax(predictions[0])
    inference_timings.append(time.perf_counter()-start_time)

print('Average inference time: {:.2f} ms'.format(np.array(inference_timings).mean()*1000))

In [None]:
# batch inference timings

one_batch = next(iter(test_ds))  # first (images, labels) batch of the test pipeline
# The batch can hold fewer images than the configured batch_size (e.g. a small
# test set), so the per-image time is divided by the real image count.
images_in_batch = int(one_batch[0].shape[0])

start_time = time.perf_counter()
model.predict(one_batch[0])
end_time = time.perf_counter()-start_time  # elapsed seconds for the whole batch

print('Total Prediction time: {:.2f} ms. Per image time: {:.2f} ms'.format(
    end_time*1000, 1000*end_time/images_in_batch
    ))

### **References**

In [None]:
# https://colab.research.google.com/github/keras-team/keras-io/blob/master/examples/vision/ipynb/image_classification_with_vision_transformer.ipynb#scrollTo=QBFIZ8AeCkK6