<a href="https://colab.research.google.com/github/robagby/Computer-Vision/blob/main/ResNetv2/Image_Classification_with_ResNetv2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%tensorflow_version 2.x
import tensorflow as tf
from tensorflow import keras
import tensorflow_datasets as tfds

from keras.layers import Input, Activation, Conv2D, Dense, Dropout, BatchNormalization, ReLU, DepthwiseConv2D, GlobalAveragePooling2D, GlobalMaxPooling2D, Add
from keras.models import Model
from keras import regularizers

import math
import numpy as np
from functools import reduce
import matplotlib.pyplot as plt

import os
import warnings

In [None]:
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

Found GPU at: /device:GPU:0


In [None]:
num_classes       = 10
num_channels      = 3
num_rows          = 32
num_cols          = 32
num_crop_rows     = 28
num_crop_cols     = 28

cifar_mean        = np.array([[[125.30691805, 122.95039414, 113.86538318]]]) 
cifar_std         = np.array([[[ 62.99321928,  62.08870764,  66.70489964]]]) 

batch_size        = 32
shuffle_buffer    = 5000
max_lr            = 0.001

initial_lr_scale  = 0.01
initial_lr_epochs = 5
end_lr_scale      = 0.01
end_lr_epochs     = 55

num_epochs        = initial_lr_epochs + end_lr_epochs
initial_lr        = max_lr*initial_lr_scale
end_lr            = max_lr*end_lr_scale

save_path         = './save/model/'
!mkdir -p "$save_path"

In [None]:
def preprocess_train(train):
    image = train["image"]
    label = train["label"]
  
    image = tf.math.divide(tf.math.subtract(tf.dtypes.cast(image, tf.float32), cifar_mean), cifar_std)
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_crop(image, size=[num_crop_rows, num_crop_cols, 3])

    label = tf.dtypes.cast(label, tf.int32)
    
    return image, label


def preprocess_test(test):
    image = test["image"]
    label = test["label"]

    crows = (num_rows - num_crop_rows) // 2
    ccols = (num_cols - num_crop_cols) // 2

    image = tf.math.divide(tf.math.subtract(tf.dtypes.cast(image, tf.float32), cifar_mean), cifar_std)
    image = tf.image.crop_to_bounding_box(image, crows, ccols, num_crop_rows, num_crop_cols)

    label = tf.dtypes.cast(label, tf.int32)
    
    return image, label

In [None]:
def lr_schedule(epoch):
    # Linear warmup followed by cosine decay.
    if epoch < initial_lr_epochs:
        lr = (max_lr - initial_lr)*(float(epoch)/initial_lr_epochs) + initial_lr
    else:
        lr = (max_lr - end_lr)*max(0.0, math.cos(((float(epoch) - initial_lr_epochs)/(end_lr_epochs - 1.0))*(math.pi/2.0))) + end_lr

    return lr

In [None]:
def plot_training_curves(history):
    # Training and validation data accuracy
    acc     = history.history['accuracy']
    val_acc = history.history['val_accuracy']

    # Training and validation loss.
    loss     = history.history['loss']
    val_loss = history.history['val_loss']

    # Plot accuracy.
    plt.figure(figsize=(8, 8))
    plt.subplot(2, 1, 1)
    plt.plot(acc, label='Training Accuracy')
    plt.plot(val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.ylabel('Accuracy')
    plt.ylim([min(plt.ylim()), 1])
    plt.title('Training and Validation Accuracy')

    # Plot loss.
    plt.subplot(2, 1, 2)
    plt.plot(loss, label='Training Loss')
    plt.plot(val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.ylabel('Cross Entropy')
    plt.ylim([0, 2.0])
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.show()

In [None]:
train = tfds.load("cifar10", split=tfds.Split.TRAIN)
test  = tfds.load("cifar10", split=tfds.Split.TEST)

# Preprocess training dataset.
train = train.map(preprocess_train, num_parallel_calls=4)
train = train.shuffle(buffer_size=shuffle_buffer)
train = train.batch(batch_size)
train = train.prefetch(buffer_size=1)

# Preprocess testing dataset.
test  = test.map(preprocess_test, num_parallel_calls=4)
test  = test.batch(batch_size)
test  = test.prefetch(buffer_size=1)

[1mDownloading and preparing dataset cifar10/3.0.2 (download: 162.17 MiB, generated: 132.40 MiB, total: 294.58 MiB) to /root/tensorflow_datasets/cifar10/3.0.2...[0m


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]






0 examples [00:00, ? examples/s]

Shuffling and writing examples to /root/tensorflow_datasets/cifar10/3.0.2.incompleteMZZOP0/cifar10-train.tfrecord


  0%|          | 0/50000 [00:00<?, ? examples/s]

0 examples [00:00, ? examples/s]

Shuffling and writing examples to /root/tensorflow_datasets/cifar10/3.0.2.incompleteMZZOP0/cifar10-test.tfrecord


  0%|          | 0/10000 [00:00<?, ? examples/s]

[1mDataset cifar10 downloaded and prepared to /root/tensorflow_datasets/cifar10/3.0.2. Subsequent calls will reuse this data.[0m


In [None]:
class ResNetv2(Model):


    def __init__(self, num_classes, momentum=.99, epsilon=.001, channels=3, blocks_shape=(4,6,3,)):
        super(ResNetv2, self).__init__()
        self.encoder_tail = Conv2D(16, 3, strides=1, padding='same', activation=None, use_bias=False)
        filters           = 16
        self.blocks       = []
        # Iterate over levels of ResNet.
        for i in range(3):
            self.blocks.append(ResNetv2.ConvBlock(filters, strides=2))
            self.blocks.extend([ResNetv2.ResidualBlock(filters, strides=1) for _ in range(blocks_shape[i])])

            filters *= 2

        self.batch_norm   = BatchNormalization(axis=-1, momentum=momentum, epsilon=epsilon, center=True, scale=True)
        self.relu         = ReLU()

        self.pool         = GlobalAveragePooling2D()
        self.decoder      = Dense(num_classes, activation='softmax')


    def call(self, inputs):
        # Encoder - Tail.
        x = self.encoder_tail(inputs)
        for layer in self.blocks:
            x = layer(x)

        x = self.batch_norm(x)
        x = self.relu(x)
        # Encoder - Output.
        encoder_output = x
        # Decoder.
        y              = self.pool(encoder_output)
        decoder_output = self.decoder(y) 
        return decoder_output  


    class ConvBlock(keras.layers.Layer):


        def __init__(self, filters, strides=2, momentum=.99, epsilon=.001):
            super(ResNetv2.ConvBlock, self).__init__()
            self.bn1   = BatchNormalization(axis=-1, momentum=momentum, epsilon=epsilon, center=True, scale=True)
            self.relu1 = ReLU()
            self.conv1 = Conv2D(filters, 1, strides=strides, padding='same', activation=None, use_bias=False)

            self.bn2   = BatchNormalization(axis=-1, momentum=momentum, epsilon=epsilon, center=True, scale=True)
            self.relu2 = ReLU()
            self.conv2 = Conv2D(filters, 3, strides=1, padding='same', activation=None, use_bias=False)

            self.bn3   = BatchNormalization(axis=-1, momentum=momentum, epsilon=epsilon, center=True, scale=True)
            self.relu3 = ReLU()
            self.conv3 = Conv2D(4*filters, 1, strides=1, padding='same', activation=None, use_bias=False)

            self.conv4 = Conv2D(4*filters, 1, strides=strides, padding='same', activation=None, use_bias=False)
            self.add   = Add()


        def call(self, inputs):
            residual = self.bn1(inputs)
            residual = self.relu1(residual)
            residual = self.conv1(residual)
            residual = self.bn2(residual)
            residual = self.relu2(residual)
            residual = self.bn3(residual)
            residual = self.relu3(residual)
            residual = self.conv3(residual)

            x        = self.conv4(inputs)
            x        = self.add([x, residual])
            return x


    class ResidualBlock(keras.layers.Layer):


        def __init__(self, filters, strides=1, momentum=.99, epsilon=.001):
            super(ResNetv2.ResidualBlock, self).__init__()
            self.bn1   = BatchNormalization(axis=-1, momentum=momentum, epsilon=epsilon, center=True, scale=True)
            self.relu1 = ReLU()
            self.conv1 = Conv2D(filters, 1, strides=strides, padding='same', activation=None, use_bias=False)

            self.bn2   = BatchNormalization(axis=-1, momentum=momentum, epsilon=epsilon, center=True, scale=True)
            self.relu2 = ReLU()
            self.conv2 = Conv2D(filters, 3, strides=strides, padding='same', activation=None, use_bias=False)

            self.bn3   = BatchNormalization(axis=-1, momentum=momentum, epsilon=epsilon, center=True, scale=True)
            self.relu3 = ReLU()
            self.conv3 = Conv2D(4*filters, 1, strides=strides, padding='same', activation=None, use_bias=False)

            self.add   = Add()


        def call(self, inputs):
            residual = self.bn1(inputs)
            residual = self.relu1(residual)
            residual = self.conv1(residual)
            residual = self.bn2(residual)
            residual = self.relu2(residual)
            residual = self.bn3(residual)
            residual = self.relu3(residual)
            residual = self.conv3(residual)

            x        = self.add([inputs, residual])
            return x

In [None]:
model     = ResNetv2(num_classes)

optimizer = tf.keras.optimizers.Adam(initial_lr)
loss      = 'sparse_categorical_crossentropy'
metrics   = ['accuracy']

callbacks = [
             keras.callbacks.LearningRateScheduler(lr_schedule),
             ]

model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
with tf.device('/device:GPU:0'):
    history = model.fit(train, batch_size=batch_size, epochs=num_epochs, callbacks=callbacks, validation_data=test)

Epoch 1/60
Epoch 2/60
Epoch 3/60
Epoch 4/60
Epoch 5/60
Epoch 6/60
Epoch 7/60
Epoch 8/60
Epoch 9/60
Epoch 10/60


In [None]:
plot_training_curves(history)

In [None]:
loss, acc = model.evaluate(test)

print('Test Loss:     ', loss)
print('Test Accuracy: ', acc)