<a href="https://colab.research.google.com/github/juststudentinIT/Methods-of-Compression-and-Performance-Improvement-of-Deep-Neural-Networks/blob/main/Vision_Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# TensorFlow and tf.keras
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

# Record the TensorFlow version the notebook was executed with.
print(tf.__version__)

import tensorflow as tf
from tensorflow.keras import datasets, layers, models, losses, Model

2.8.0


In [None]:
# Load Fashion-MNIST through Keras; load_data() returns a (train, test) pair,
# each of which is an (images, labels) pair.
fashion_mnist = tf.keras.datasets.fashion_mnist
train_split, test_split = fashion_mnist.load_data()
x_train, y_train = train_split
x_test, y_test = test_split

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz


In [None]:
# Sanity-check the raw array shapes of both splits before any preprocessing.
print(f"x_train: {x_train.shape} - y_train: {y_train.shape}")
print(f"x_test: {x_test.shape} - y_test: {y_test.shape}")

x_train: (60000, 28, 28) - y_train: (60000,)
x_test: (10000, 28, 28) - y_test: (10000,)


In [None]:
def _pad_scale_to_rgb(images):
    """Pad each image by 2 pixels per side, divide by 255 and replicate to 3 channels."""
    # Zero-pad only the two spatial axes (28x28 -> 32x32), then scale by 1/255.
    padded = tf.pad(images, [[0, 0], [2, 2], [2, 2]]) / 255
    # Append a channel axis and copy it three times to mimic an RGB image.
    with_channel = tf.expand_dims(padded, axis=3)
    return tf.repeat(with_channel, 3, axis=3)


# NOTE: this cell rebinds x_train / x_test from their own previous values,
# so it must be run exactly once after the data-loading cell.
x_train = _pad_scale_to_rgb(x_train)
x_test = _pad_scale_to_rgb(x_test)

In [None]:
# --- Data ---------------------------------------------------------------
num_classes = 10
input_shape = (32, 32, 3)  # shape of one preprocessed image fed to the model
image_size = 72  # side length the images are resized to before patching
patch_size = 6  # side length of one square patch
num_patches = (image_size // patch_size) ** 2

# --- Optimisation -------------------------------------------------------
learning_rate = 0.001
weight_decay = 0.0001
batch_size = 256

# --- Transformer encoder ------------------------------------------------
projection_dim = 64
num_heads = 4
transformer_layers = 8
# Widths of the two dense layers inside every transformer block.
transformer_units = [
    projection_dim * 2,
    projection_dim,
]

# --- Classification head ------------------------------------------------
mlp_head_units = [2048, 1024]

In [None]:
!pip install tensorflow_addons

Collecting tensorflow_addons
  Downloading tensorflow_addons-0.16.1-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.1 MB)
[K     |████████████████████████████████| 1.1 MB 6.6 MB/s 
Installing collected packages: tensorflow-addons
Successfully installed tensorflow-addons-0.16.1


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
# Preprocessing / augmentation pipeline applied inside the model.
# The resize target comes from `image_size` (hyperparameter cell) because
# `num_patches` is derived from the same constant; hard-coding it here would
# let the two silently drift apart.
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(factor=0.02),
        layers.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
# Fit the Normalization layer's statistics on the training images.
data_augmentation.layers[0].adapt(x_train)

In [None]:
def mlp(x, hidden_units, dropout_rate):
    """Stack Dense(GELU) + Dropout pairs on top of ``x``.

    Args:
        x: Input Keras tensor.
        hidden_units: Iterable with the width of each Dense layer, in order.
        dropout_rate: Dropout rate applied after every Dense layer.

    Returns:
        The output tensor of the last Dropout layer (``x`` itself when
        ``hidden_units`` is empty).
    """
    out = x
    for width in hidden_units:
        hidden = layers.Dense(width, activation=tf.nn.gelu)(out)
        out = layers.Dropout(dropout_rate)(hidden)
    return out

In [None]:
class Patches(layers.Layer):
    """Split a batch of images into flattened, non-overlapping square patches.

    Args:
        patch_size: Side length, in pixels, of each square patch.
        **kwargs: Standard Keras ``Layer`` keyword arguments (``name``,
            ``dtype``, ...). Forwarding them lets the layer be rebuilt from
            the dictionary returned by ``get_config``.
    """

    def __init__(self, patch_size, **kwargs):
        super(Patches, self).__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        """Return patches reshaped to ``[batch, num_patches, patch_dims]``."""
        batch_size = tf.shape(images)[0]
        # Stride equals the patch size, so the patches do not overlap.
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # The last axis holds the flattened pixels of one patch.
        patch_dims = patches.shape[-1]
        patches = tf.reshape(patches, [batch_size, -1, patch_dims])
        return patches

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config().copy()
        config.update({"patch_size": self.patch_size})
        return config


In [None]:
class PatchEncoder(layers.Layer):
    """Linearly project patches and add a learned position embedding.

    Args:
        num_patches: Number of patches per image; also the size of the
            position-embedding table.
        projection_dim: Output width of both the projection and the
            position embedding.
        **kwargs: Standard Keras ``Layer`` keyword arguments (``name``,
            ``dtype``, ...). Forwarding them lets the layer be rebuilt from
            the dictionary returned by ``get_config``.
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        super(PatchEncoder, self).__init__(**kwargs)
        self.num_patches = num_patches
        # Kept so that get_config() can report it.
        self.projection_dim = projection_dim
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        """Return ``projection(patch) + position_embedding(0..num_patches-1)``."""
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = self.projection(patch) + self.position_embedding(positions)
        return encoded

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized.

        The previous implementation read attributes that this class never
        defines (``vocab_size``, ``d_model``, ...) and therefore raised
        ``AttributeError`` whenever it was called.
        """
        config = super().get_config().copy()
        config.update({
            'num_patches': self.num_patches,
            'projection_dim': self.projection_dim,
        })
        return config

In [None]:
def create_vit_classifier():
    """Build the Vision Transformer classifier from the notebook's hyperparameters.

    Reads the module-level settings (``input_shape``, ``patch_size``,
    ``num_patches``, ``projection_dim``, ``num_heads``, ``transformer_units``,
    ``transformer_layers``, ``mlp_head_units``, ``num_classes``) and the
    ``data_augmentation`` pipeline.

    Returns:
        An uncompiled ``keras.Model`` whose final Dense layer has no
        activation, i.e. it outputs one raw score per class.
    """

    def encoder_block(tokens):
        # Pre-norm self-attention with a residual connection.
        normed = layers.LayerNormalization(epsilon=1e-6)(tokens)
        attended = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(normed, normed)
        residual = layers.Add()([attended, tokens])
        # Pre-norm feed-forward sub-block with a second residual connection.
        hidden = layers.LayerNormalization(epsilon=1e-6)(residual)
        hidden = mlp(hidden, hidden_units=transformer_units, dropout_rate=0.1)
        return layers.Add()([hidden, residual])

    inputs = layers.Input(shape=input_shape)
    augmented = data_augmentation(inputs)
    patch_tokens = Patches(patch_size)(augmented)
    tokens = PatchEncoder(num_patches, projection_dim)(patch_tokens)

    for _ in range(transformer_layers):
        tokens = encoder_block(tokens)

    # Classification head: normalise, flatten all tokens, regularise, MLP.
    head = layers.LayerNormalization(epsilon=1e-6)(tokens)
    head = layers.Flatten()(head)
    head = layers.Dropout(0.5)(head)
    head = mlp(head, hidden_units=mlp_head_units, dropout_rate=0.5)
    logits = layers.Dense(num_classes)(head)
    return keras.Model(inputs=inputs, outputs=logits)

In [None]:
# Build the network, then attach AdamW and a from-logits cross-entropy loss
# (the model's last layer has no softmax).
model = create_vit_classifier()
optimizer = tfa.optimizers.AdamW(
    learning_rate=learning_rate,
    weight_decay=weight_decay,
)
model.compile(
    optimizer=optimizer,
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy(name="accuracy")],
)

In [None]:
# Train for 5 epochs; validation_split=0.1 asks Keras to hold out a tenth of
# the training arrays for validation.
history = model.fit(
    x_train,
    y_train,
    batch_size=batch_size,
    epochs=5,
    validation_split=0.1,
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [None]:
# Loss and accuracy of the trained Keras model on the held-out test split.
model.evaluate(x=x_test, y=y_test)



[0.3451457619667053, 0.8712999820709229]

## Post-training quantization

In [None]:
import tensorflow as tf

# Baseline conversion: no `optimizations` are set, so this model serves as the
# unquantized reference for the comparisons below.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
# Allow both TensorFlow Lite builtin ops and selected TensorFlow ops.
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS,
]
tflite_model = converter.convert()



INFO:tensorflow:Assets written to: /tmp/tmpemo8vxwd/assets


INFO:tensorflow:Assets written to: /tmp/tmpemo8vxwd/assets


In [None]:
# Model with weights quantized to float16.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
# Allow both TensorFlow Lite builtin ops and selected TensorFlow ops.
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS,
]
# Optimize.DEFAULT enables weight quantization; float16 is the requested type.
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
tflite_model_quant_16 = converter.convert()



INFO:tensorflow:Assets written to: /tmp/tmp4_nz0w0i/assets


INFO:tensorflow:Assets written to: /tmp/tmp4_nz0w0i/assets


In [None]:
def representative_data_gen():
  """Yield 100 single-image batches taken from ``x_train`` for the converter."""
  calibration_set = tf.data.Dataset.from_tensor_slices(x_train).batch(1).take(100)
  for sample in calibration_set:
    # The model has a single input, so each yielded list has one element.
    yield [sample]

# Model quantized to int8, using the representative dataset above.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.representative_dataset = representative_data_gen
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# Allow int8 TensorFlow Lite builtin ops and selected TensorFlow ops.
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS_INT8,
    tf.lite.OpsSet.SELECT_TF_OPS,
]
converter.target_spec.supported_types = [tf.int8]
tflite_model_quant_8 = converter.convert()



INFO:tensorflow:Assets written to: /tmp/tmpknu8uetx/assets


INFO:tensorflow:Assets written to: /tmp/tmpknu8uetx/assets


In [None]:
import pathlib

# Directory that receives the three converted models.
tflite_models_dir = pathlib.Path("/tmp/mnist_tflite_models/")
tflite_models_dir.mkdir(parents=True, exist_ok=True)

# Target paths: float baseline, float16-quantized, int8-quantized.
tflite_model_file = tflite_models_dir / "mnist_model.tflite"
tflite_model_quant_16_file = tflite_models_dir / "model_quant16f.tflite"
tflite_model_quant_8_file = tflite_models_dir / "model_quant8u.tflite"

# Write the serialized models; the last call's return value (number of bytes
# written) is what the cell displays.
tflite_model_file.write_bytes(tflite_model)
tflite_model_quant_16_file.write_bytes(bytes(tflite_model_quant_16))
tflite_model_quant_8_file.write_bytes(bytes(tflite_model_quant_8))

42823456

In [None]:
# Helper function to run inference on a TFLite model
def run_tflite_model(tflite_file, test_image_indices):
  """Classify selected test images with a TFLite model.

  Args:
    tflite_file: Path to the ``.tflite`` model file.
    test_image_indices: Sized iterable of integer positions. As before, the
      positions index into the LAST 500 images of the global ``x_test``.

  Returns:
    A 1-D integer NumPy array with the predicted class (argmax of the model
    output) for each requested index, in the same order.
  """
  # Work on a local view. The previous version rebound the global `x_test`
  # here, permanently truncating the test set as a hidden side effect.
  eval_images = x_test[-500:]

  # Initialize the interpreter
  interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
  interpreter.allocate_tensors()

  input_details = interpreter.get_input_details()[0]
  output_details = interpreter.get_output_details()[0]

  predictions = np.zeros((len(test_image_indices),), dtype=int)
  for i, test_image_index in enumerate(test_image_indices):
    test_image = eval_images[test_image_index]

    # If the model expects integer-quantized input (uint8 or int8), map the
    # float image into the quantized domain before casting.
    if input_details["dtype"] in (np.uint8, np.int8):
      input_scale, input_zero_point = input_details["quantization"]
      test_image = test_image / input_scale + input_zero_point

    # Add the batch axis and cast to the dtype the interpreter expects.
    test_image = np.expand_dims(test_image, axis=0).astype(input_details["dtype"])
    # set_tensor copies the data into the interpreter's input buffer.
    interpreter.set_tensor(input_details["index"], test_image)
    interpreter.invoke()
    output = interpreter.get_tensor(output_details["index"])[0]

    predictions[i] = output.argmax()

  return predictions

In [None]:
# Helper function to evaluate a TFLite model on all images
import time
def evaluate_model(tflite_file, model_type):
  """Print accuracy and mean per-image latency of a TFLite model.

  The evaluation uses the last 500 samples of the global ``x_test`` /
  ``y_test``. ``run_tflite_model`` resolves indices relative to that same
  500-image tail, so positions ``0..len-1`` line up with the labels here.

  Args:
    tflite_file: Path to the ``.tflite`` model file.
    model_type: Label used in the printed report.
  """
  # Local slices only. The previous version rebound the globals `x_test` and
  # `y_test`, permanently shrinking the test set for every later cell.
  eval_images = x_test[-500:]
  eval_labels = y_test[-500:]
  num_samples = len(eval_images)

  start_time = time.time()
  predictions = run_tflite_model(tflite_file, range(num_samples))
  accuracy = (np.sum(eval_labels == predictions) * 100) / num_samples
  spent_time = time.time() - start_time
  print('%s model accuracy is %.4f%% (Number of test samples=%d) , time = %f' % (
      model_type, accuracy, num_samples, spent_time/num_samples ))

In [None]:
import os
def _size_in_mb(path):
    """Return the on-disk size of ``path`` in units of 2**20 bytes."""
    return os.path.getsize(path) / float(2**20)


# Compare the on-disk footprint of the three converted models.
print("Original model in Mb:", _size_in_mb(tflite_model_file))
print("Quantized 16f model in Mb:", _size_in_mb(tflite_model_quant_16_file))
print("Quantized 8u model in Mb:", _size_in_mb(tflite_model_quant_8_file))

Original model in Mb: 162.79111099243164
Quantized 16f model in Mb: 81.48712158203125
Quantized 8u model in Mb: 40.839630126953125


In [None]:
# Evaluate the float baseline and both quantized variants in turn.
for _model_file, _model_label in (
    (tflite_model_file, "Original"),
    (tflite_model_quant_16_file, "Quantized 16float"),
    (tflite_model_quant_8_file, "Quantized 8uint"),
):
    evaluate_model(_model_file, model_type=_model_label)

Original model accuracy is 89.0000% (Number of test samples=500) , time = 0.032268
Quantized 16float model accuracy is 89.0000% (Number of test samples=500) , time = 0.034737
Quantized 8uint model accuracy is 88.4000% (Number of test samples=500) , time = 0.652956
