<a href="https://colab.research.google.com/github/kaka-lin/ML-Notes/blob/master/TensorFlow/keras/01_classification_mnist_model_subclassing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Post Training Quantization - Dynamic range quantization

In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Flatten, Conv2D

%matplotlib inline

print(tf.__version__)

2.9.1


## Load data and preprocess data

這邊使用 `MNIST dataset`

In [2]:
def load_data():
    """Load MNIST and prepare the images for a channels-last Conv2D model.

    Pixel values are rescaled from the 0:255 range to 0:1 and stored as
    ``float32``, and a trailing channel axis is appended to each image array.

    Returns:
        ``(x_train, y_train), (x_test, y_test)`` -- the rescaled image arrays
        with an extra trailing axis, paired with the label arrays exactly as
        returned by ``tf.keras.datasets.mnist.load_data()``.
    """
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    ### Preprocess the data
    # Normalization: scale pixel values from 0:255 to 0:1.
    # Cast to float32 *before* dividing: a plain `x / 255.0` promotes the
    # images to float64, which doubles their memory footprint and then has to
    # be cast back down when fed to the float32 model.
    x_train = x_train.astype(np.float32) / 255.0
    x_test = x_test.astype(np.float32) / 255.0

    # Add a channels dimension
    x_train = x_train[..., tf.newaxis]
    x_test = x_test[..., tf.newaxis]

    return (x_train, y_train), (x_test, y_test)

# Load the preprocessed train/test splits produced by load_data().
(x_train, y_train), (x_test, y_test) = load_data()

# Sanity check: print the shape of every array and the container type of the
# training images.
print("Training images: ", x_train.shape)
print("Training lables: ", y_train.shape)
print("Testing images: ", x_test.shape)
print("Testing labels: ", y_test.shape)
print("data type: ", type(x_train))

Training images:  (60000, 28, 28, 1)
Training lables:  (60000,)
Testing images:  (10000, 28, 28, 1)
Testing labels:  (10000,)
data type:  <class 'numpy.ndarray'>


## Preprocess the dataset

[tf.data.Dataset](https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/data/Dataset?hl=zh_tw)

- `from_tensor_slices`: Creates a `Dataset` whose elements are slices of the given tensors. 

    Methods:
        shuffle:  uses a fixed-size buffer to shuffle the items as they pass through. 
        repeat:   restarts the Dataset when it reaches the end.
                  To limit the number of epochs, set the count argument.
        batch:    collects a number of examples and stacks them, to create batches.

In [3]:
# Training pipeline: one element per (image, label) pair, shuffled through a
# 10000-element buffer and then grouped into batches of 32.
train_dataset = (
    tf.data.Dataset
    .from_tensor_slices((x_train, y_train))
    .shuffle(10000)
    .batch(32)
)

# Test pipeline: same batching, original order kept (no shuffle).
test_dataset = (
    tf.data.Dataset
    .from_tensor_slices((x_test, y_test))
    .batch(32)
)

## Build the model 

### Model subclassing

使用 `Model subclassing` 建立基本模型

In [4]:
class MyModel(tf.keras.Model):
    """Small CNN classifier: Conv2D -> Flatten -> Dense(128) -> Dense(10).

    The last layer uses a softmax activation, so the model's output is a
    vector of 10 per-class probabilities for each input.
    """

    def __init__(self):
        super().__init__()

        # Layers are created once here and wired together in `call`.
        self.conv1 = Conv2D(filters=32, kernel_size=3, activation='relu')
        self.flatten = Flatten()
        self.dense1 = Dense(units=128, activation='relu')
        self.dense2 = Dense(units=10, activation='softmax')

    def call(self, x):
        """Forward pass: apply the layers to `x` in definition order."""
        features = self.flatten(self.conv1(x))
        hidden = self.dense1(features)
        return self.dense2(hidden)

###### Model subclassing using model.summary()

- [model.summary() can't print output shape while using subclass model](https://stackoverflow.com/questions/55235212/model-summary-cant-print-output-shape-while-using-subclass-model)
- [keras 中 model.summary() 輸出的 output shape 為 multiple 解決辦法](https://blog.csdn.net/qq_42074335/article/details/120443553)

In [5]:
model = MyModel()

# To make model.summary() usable on a subclassed model, first build the model
# with the batched input shape, then run `call` once on a symbolic Keras
# `Input` (see the two links above this cell for background).
model.build(input_shape=(None, 28, 28, 1))
model.call(Input(shape=(28, 28, 1)))
model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 26, 26, 32)        320       
                                                                 
 flatten (Flatten)           (None, 21632)             0         
                                                                 
 dense (Dense)               (None, 128)               2769024   
                                                                 
 dense_1 (Dense)             (None, 10)                1290      
                                                                 
Total params: 2,770,634
Trainable params: 2,770,634
Non-trainable params: 0
_________________________________________________________________


### Compile the model

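Before the model is ready for training, it needs a few more settings.

These are normally added during the model's `compile` step; because this notebook trains with a custom loop, they are created as standalone objects instead: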

- Loss function
- Optimizer
- Metrics


In [6]:
# Loss and optimizer, both created with their default arguments. They are
# passed explicitly to train_step / test_step rather than through
# model.compile().
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam()

# Running metrics: a mean of the per-batch loss and a sparse categorical
# accuracy, kept separately for training and testing.
# NOTE(review): test_loss / test_accuracy are only consumed by test_step, which
# is not called in the cells visible here -- confirm they are used elsewhere.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

## Training the model

Use [tf.GradientTape](https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/GradientTape?hl=zh_tw)

In [7]:
@tf.function
def train_step(model, x_batch, y_batch, optimizer, loss_fn,
               train_loss, train_accuracy):
    """Run one optimization step on a single batch and update the metrics.

    Args:
        model: model called as ``model(x_batch, training=True)``.
        x_batch: batch of inputs.
        y_batch: batch of targets, passed to ``loss_fn`` and the accuracy metric.
        optimizer: optimizer whose ``apply_gradients`` updates the model.
        loss_fn: callable ``loss_fn(y_true, y_pred)`` returning the batch loss.
        train_loss: metric updated with the batch loss.
        train_accuracy: metric updated with ``(y_batch, predictions)``.
    """
    # Forward pass, recorded on the tape so it can be differentiated.
    with tf.GradientTape() as grad_tape:
        batch_predictions = model(x_batch, training=True)
        batch_loss = loss_fn(y_batch, batch_predictions)

    # Backward pass: differentiate the loss w.r.t. the trainable variables
    # and let the optimizer apply the update.
    weights = model.trainable_variables
    grads = grad_tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Accumulate this batch into the running training metrics.
    train_loss.update_state(batch_loss)
    train_accuracy.update_state(y_batch, batch_predictions)

In [8]:
@tf.function
def test_step(model, x_batch, y_batch, loss_fn,
              test_loss, test_accuracy):
    """Evaluate a single batch (no weight update) and update the metrics.

    Args:
        model: model called as ``model(x_batch, training=False)``.
        x_batch: batch of inputs.
        y_batch: batch of targets, passed to ``loss_fn`` and the accuracy metric.
        loss_fn: callable ``loss_fn(y_true, y_pred)`` returning the batch loss.
        test_loss: metric updated with the batch loss.
        test_accuracy: metric updated with ``(y_batch, predictions)``.
    """
    # Forward pass in inference mode.
    batch_predictions = model(x_batch, training=False)
    batch_loss = loss_fn(y_batch, batch_predictions)

    # Accumulate this batch into the running test metrics.
    test_loss.update_state(batch_loss)
    test_accuracy.update_state(y_batch, batch_predictions)

In [9]:
EPOCHS = 5
n_batches = len(train_dataset)

for epoch in range(EPOCHS):
    print(f'Epoch {epoch+1}/{EPOCHS}')
    with tqdm(train_dataset, total=n_batches,
              bar_format='{desc:<5.5}{percentage:3.0f}%|{bar:30}{r_bar}') as pbar:
        # Use batch-local names here. Looping with `x_train` / `y_train` would
        # overwrite the full NumPy training arrays with the last batch, so
        # re-running the dataset cell afterwards would silently build a
        # one-batch dataset.
        for x_batch, y_batch in pbar:
            train_step(model, x_batch, y_batch, optimizer, loss_fn,
                       train_loss, train_accuracy)
            # Show the running (epoch-to-date) loss and accuracy on the bar.
            pbar.set_postfix({
                'loss': train_loss.result().numpy(),
                'accuracy': train_accuracy.result().numpy()})

    # Reset the metrics so the next epoch starts accumulating from zero.
    # `reset_state()` is the current name of the legacy `reset_states()` alias.
    train_loss.reset_state()
    train_accuracy.reset_state()

# Save the trained model in SavedModel format; the TFLite conversion cell
# loads it back from this path.
model.save('model_data/baseline_model')

Epoch 1/5


     100%|██████████████████████████████| 1875/1875 [00:05<00:00, 354.60it/s, loss=0.147, accuracy=0.956]


Epoch 2/5


     100%|██████████████████████████████| 1875/1875 [00:04<00:00, 460.89it/s, loss=0.0458, accuracy=0.985]


Epoch 3/5


     100%|██████████████████████████████| 1875/1875 [00:04<00:00, 465.40it/s, loss=0.0256, accuracy=0.992]


Epoch 4/5


     100%|██████████████████████████████| 1875/1875 [00:04<00:00, 453.04it/s, loss=0.0148, accuracy=0.995]


Epoch 5/5


     100%|██████████████████████████████| 1875/1875 [00:04<00:00, 455.06it/s, loss=0.00981, accuracy=0.997]


INFO:tensorflow:Assets written to: model_data/baseline_model/assets


INFO:tensorflow:Assets written to: model_data/baseline_model/assets


## Convert to TFLite model

In [10]:
# Convert the SavedModel written by the training cell into a TFLite model.
# NOTE(review): no `converter.optimizations` is set in this cell, so nothing
# here requests quantization -- confirm that the quantized conversion announced
# in the notebook title happens in a later cell.
converter = tf.lite.TFLiteConverter.from_saved_model('model_data/baseline_model')
tflite_model = converter.convert()

# Save the model.
# `tflite_model` is written in binary mode next to the SavedModel directory.
with open('model_data/model.tflite', 'wb') as f:
    f.write(tflite_model)

In [11]:
def evaluate_model(interpreter, images=None, labels=None):
    """Measure the classification accuracy of a TFLite interpreter.

    Each image is fed to the interpreter one at a time (batch size 1) and the
    arg-max of the first output row is taken as the predicted class.

    Args:
        interpreter: a TFLite interpreter-like object. The caller is expected
            to have allocated its tensors already (the notebook calls
            ``allocate_tensors()`` before evaluating).
        images: sequence/array of images without a batch axis. Defaults to the
            notebook-level ``x_test``.
        labels: ground-truth class ids, one per image. Defaults to the
            notebook-level ``y_test``.

    Returns:
        The fraction of images whose predicted class equals its label, as a
        Python ``float``.

    Raises:
        ValueError: if ``images`` is empty or its length differs from
            ``labels``.
    """
    # Fall back to the notebook's test split so existing
    # `evaluate_model(interpreter)` calls keep working unchanged.
    if images is None:
        images = x_test
    if labels is None:
        labels = y_test

    if len(images) != len(labels):
        raise ValueError(
            f"images and labels differ in length: {len(images)} != {len(labels)}")
    if len(images) == 0:
        raise ValueError("cannot compute accuracy on an empty dataset")

    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]
    # `tensor()` returns a callable; calling it after each invoke() yields the
    # current output values, so the accessor can be created once up front.
    read_output = interpreter.tensor(output_index)

    # Run predictions on every image.
    prediction_digits = []
    for test_image in images:
        # Pre-processing: add batch dimension and convert to float32 to match
        # the model's input data format.
        test_image = np.expand_dims(test_image, axis=0).astype(np.float32)
        interpreter.set_tensor(input_index, test_image)

        # Run inference.
        interpreter.invoke()

        # Post-processing: remove batch dimension and find the digit with the
        # highest probability.
        prediction_digits.append(np.argmax(read_output()[0]))

    # Compare predictions with ground-truth labels to calculate accuracy.
    matches = np.asarray(prediction_digits) == np.asarray(labels)
    return float(np.mean(matches))

In [12]:
# Create an interpreter for the converted model file and allocate its tensors
# so it is ready for inference.
interpreter = tf.lite.Interpreter(model_path='model_data/model.tflite')
interpreter.allocate_tensors()

# Print the accuracy of the TF Lite model as measured by evaluate_model().
print(evaluate_model(interpreter))

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


0.984
