# Assignment: Use CNN to do image classification
Author: Long M. Luu

In [None]:
%%capture
!pip install tensorflow --upgrade
# Upgrade tensorflow from 2.2 to 2.3

In [None]:
# Load dataset CIFAR 10, source: https://www.cs.toronto.edu/~kriz/cifar.html
import tensorflow as tf

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

In [None]:
# Classes of CIFAR 10
classes = ["airplane", "automobile", "bird", "cat", "deer", "dog", "frog", "horse", "ship", "truck"]

In [None]:
# Show the image with the corresponding label
import matplotlib.pyplot as plt

plt.figure(figsize=(10,10))
for i in range(36): # Plot 36 images
    plt.subplot(6,6,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    # START CODE HERE
    plt.imshow(x_train[i], cmap=plt.cm.binary) 
    plt.xlabel(classes[y_train[i][0]])
    # END CODE HERE
plt.show()

In [None]:
# Split dataset into train and valid
from sklearn.model_selection import train_test_split
X_train, X_valid, Y_train, Y_valid = train_test_split(x_train/255.0, y_train, test_size=0.2, shuffle=True, random_state=42)

In [None]:
# Check shape
print(X_train.shape)
print(X_valid.shape)
print(Y_train.shape)
print(Y_valid.shape)
print(x_test.shape)
print(y_test.shape)

In [None]:
# Normalize test data
x_test = x_test / 255.0

In [None]:
"""
TODO 1: Create Feedforward model as follows (suggested):
Flatten()
First layer: 1024, activation tanh, input_shape=(32, 32, 3)
Dropout layer: 0.42
Second layer: 512, activation tanh
Dropout layer: 0.42
Third layer: 256, activation tanh
Fourth layer: 10, activation softmax
"""
from tensorflow.keras.layers import Flatten, Dense, Dropout
def create_fnn_model():
    # START CODE HERE
    model = None
    # END CODE HERE
    model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    return model

fnn_model = create_fnn_model()
fnn_model.summary()

In [None]:
epochs=50
"""
Early Stopping callback
monitor: metrics that the Callback will watch
patience: integer, after n epochs if the monitor value does not improve, stop training
restore_best_weights: boolean, restore weights when monitor value is highest
"""
early_stop = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=epochs//5, restore_best_weights=True)

In [None]:
# Train the model
fnn_model.fit(X_train, Y_train, epochs=20, batch_size=32, validation_data=(X_valid, Y_valid), callbacks=[early_stop])

In [None]:
# Evaluate the model
fnn_model.evaluate(x_test, y_test)

### It is clear that even with a lot of parameters, FNN cannot perform well (accuracy is pretty low), let's try CNN

In [None]:
"""
TODO 2: create a CNN model as follows:
Conv2D, 32 filters, kernel size (5, 5), input shape = (32, 32, 3), same padding
BatchNorm
Activation: relu
MaxPooling2D, same padding
Conv2D, 64 filters, kernel size (5, 5), same padding
BatchNorm
Activation: relu
MaxPooling2D, same padding
Dropout 0.42
Conv2D, 128 filters, kernel size (3, 3), valid padding
BatchNorm
MaxPooling2D, same padding
Flatten
Dropout, rate 0.42
Dense, 256 nodes, activation tanh
Dense, 128 nodes, activation tanh
Dense, 10 nodes, activation softmax
"""
from tensorflow.keras.layers import Conv2D, BatchNormalization, MaxPooling2D, Dense, Dropout, Flatten, Activation

def create_cnn_model():
    model = None
    model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    return model

cnn_model = create_cnn_model()
cnn_model.summary()

In [None]:
# Train the model, recall that epochs = 50
history = cnn_model.fit(X_train, Y_train, batch_size=32, epochs=epochs, 
                        validation_data=(X_valid, Y_valid), callbacks=[early_stop])

In [None]:
# Plot accuracy vs epoch
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['train', 'val'], loc='upper left')

In [None]:
# Plot loss vs epoch
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train', 'val'], loc='upper left')

In [None]:
# Evaluate the model
# Global result: https://paperswithcode.com/sota/image-classification-on-cifar-10
eval = cnn_model.evaluate(x_test, y_test)

CNN with less paramters (but longer training time), performs a lot better than FNN. Let's see the wrong predictions

In [None]:
import numpy as np

prediction = cnn_model.predict(x_test)
label_prediction = np.argmax(prediction, axis=-1)

In [None]:
# Plot some predictions
plt.figure(figsize=(20,20))
for i in range(100):
    plt.subplot(10,10,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(x_test[i], cmap=plt.cm.binary)
    plt.xlabel('True: {}, Pred: {}'.format(y_test[i], label_prediction[i]))
plt.show()

In [None]:
# Get index of wrong answers
wrongAns = []
for i in range(len(y_test)):
  if y_test[i][0] != label_prediction[i]:
    wrongAns.append(i)
len(wrongAns)

## Recall that `classes = ["airplane", "automobile", "bird", "cat", "deer", "dog", "frog", "horse", "ship", "truck"]`

In [None]:
# Plot wrong answers
plt.figure(figsize=(25,25))
for i in range(49):
    plt.subplot(7,7,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(x_test[wrongAns[i]], cmap=plt.cm.binary)
    plt.xlabel('True: {}, Pred: {}'.format(y_test[wrongAns[i]], label_prediction[wrongAns[i]]))
plt.show()

Transfer learning  
The purpose is to take a pretrained model and run it with our data

In [None]:
def lrfn(epoch):
    """
    Learning rate callback, to maintain good lr but do not destroy the pretrained work

    Arguments:
        None
    Returns:
        next learning rate
    """
    start_lr = 0.00001
    min_lr = 0.00001
    max_lr = 0.00005*8
    rampup_epochs = 5
    sustain_epochs = 0
    exp_decay = .8

    if epoch < rampup_epochs :
        return (max_lr - start_lr)/rampup_epochs*epoch + start_lr
    elif epoch < rampup_epochs + sustain_epochs :
        return max_lr
    else:
        return (max_lr - min_lr) * exp_decay ** (epoch - rampup_epochs - sustain_epochs) + min_lr

lr_callback = tf.keras.callbacks.LearningRateScheduler(lambda epoch:lrfn(epoch), verbose=True)


In [None]:
"""
https://keras.io/api/applications/efficientnet/#efficientnetb1-function
Create pretrained model as follows:
EfficientNetB1, include_top=False, input_shape=(32, 32, 3), pooling="avg", trainable=False
Dropout, 0.42
Dense, 256, tanh
Dropout, 0.42
Dense, 128, tanh
Dense, 10, softmax

Code sample: https://codelabs.developers.google.com/codelabs/keras-flowers-transfer-learning/#3
https://ai.googleblog.com/2019/05/efficientnet-improving-accuracy-and.html
"""
def create_pretrained_model():
    model = None
    model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    return model
pretrained = create_pretrained_model()
pretrained.summary()


In [None]:
# Train the model, recall that epochs=50
history = pretrained.fit(X_train, Y_train, batch_size=32, epochs=epochs, 
                        validation_data=(X_valid, Y_valid), callbacks=[early_stop, lr_callback])

In [None]:
eval_pretrained = pretrained.evaluate(x_test, y_test)

## Tune the hyperparameters to match your desired output
### Next task: create the model to predict CIFAR 100 dataset.

In [None]:
import tensorflow as tf
(x_train_cif100, y_train_cif100), (x_test_cif100, y_test_cif100) = tf.keras.datasets.cifar100.load_data()

In [None]:
cif100_classes = ['apple', 'aquarium_fish', 'baby', 'bear', 'beaver', 'bed', 'bee', 'beetle', 'bicycle', 'bottle', 'bowl', 'boy', 'bridge', 'bus', 'butterfly', 
                    'camel', 'can', 'castle', 'caterpillar', 'cattle', 'chair', 'chimpanzee', 'clock', 'cloud', 'cockroach', 'couch', 'crab', 'crocodile', 'cup', 
                    'dinosaur', 'dolphin', 'elephant', 'flatfish', 'forest', 'fox', 'girl', 'hamster', 'house', 'kangaroo', 'computer_keyboard', 
                    'lamp', 'lawn_mower', 'leopard', 'lion', 'lizard', 'lobster', 'man', 'maple_tree', 'motorcycle', 'mountain', 'mouse', 'mushroom', 
                    'oak_tree', 'orange', 'orchid', 'otter', 'palm_tree', 'pear', 'pickup_truck', 'pine_tree', 'plain', 'plate', 'poppy', 'porcupine', 'possum', 
                    'rabbit', 'raccoon', 'ray', 'road', 'rocket', 'rose', 'sea', 'seal', 'shark', 'shrew', 'skunk', 'skyscraper', 'snail', 'snake', 'spider', 'squirrel', 
                  'streetcar', 'sunflower', 'sweet_pepper', 'table', 'tank', 'telephone', 'television', 'tiger', 'tractor', 'train', 'trout', 'tulip', 'turtle', 
                    'wardrobe', 'whale', 'willow_tree', 'wolf', 'woman', 'worm']

In [None]:
# Show the image with the corresponding label
import matplotlib.pyplot as plt

plt.figure(figsize=(10,10))
for i in range(36): # Plot 36 images
    plt.subplot(6,6,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    # START CODE HERE
    plt.imshow(x_train_cif100[i], cmap=plt.cm.binary) 
    plt.xlabel(cif100_classes[y_train_cif100[i][0]])
    # END CODE HERE
plt.show()

In [None]:
"""
ImageDataGenerator: use generator to train the model. After the input is given, it will be deleted to save space.
IDG also supports image augmentation.
IDG prevents RAM overflow if converting from image to numpy is space-consuming
"""
from tensorflow.keras.preprocessing.image import ImageDataGenerator

datagen = ImageDataGenerator(rescale=1./255, zca_whitening=True, 
                             horizontal_flip=True, vertical_flip=True, validation_split=0.2)
datagen.fit(x_train_cif100)

In [None]:
train_flow = datagen.flow(x_train_cif100, y_train_cif100, batch_size = 32, subset="training")
valid_flow = datagen.flow(x_train_cif100, y_train_cif100, batch_size = 32, subset="validation")

In [None]:
def create_cif100_model():
    # Create your model here
    model = None
    model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
    return model

model_cif100 = create_cif100_model()
model_cif100.summary()

In [None]:
def lrfn(epoch):
    """
    Learning rate callback, to maintain good lr but do not destroy the pretrained work

    Arguments:
        None
    Returns:
        next learning rate
    """
    start_lr = 0.00001
    min_lr = 0.00001
    max_lr = 0.00005*8
    rampup_epochs = 5
    sustain_epochs = 0
    exp_decay = .8

    if epoch < rampup_epochs :
        return (max_lr - start_lr)/rampup_epochs*epoch + start_lr
    elif epoch < rampup_epochs + sustain_epochs :
        return max_lr
    else:
        return (max_lr - min_lr) * exp_decay ** (epoch - rampup_epochs - sustain_epochs) + min_lr

In [None]:
"""
EarlyStopping callback: if after n=(epochs//10), the monitor value does not increase, stop training
LearningRateScheduler: change learning rate bases on current epoch
ModelCheckpoint: save model after each epoch, but only keep the best (bases on monitor value)
"""

epochs = 100
early_stop = tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", 
                                              patience=epochs//10, restore_best_weights=True)
lr_callback = tf.keras.callbacks.LearningRateScheduler(lambda epoch:lrfn(epoch),
                                                       verbose=True) # Optional, if use Transfer learning
model_cp = tf.keras.callbacks.ModelCheckpoint("weight_model_cp.h5", monitor="val_loss", save_best_only=True)

In [None]:
history = model_cif100.fit(train_flow, epochs=100, validation_data=valid_flow, callbacks=[early_stop, lr_callback, model_cp])

In [None]:
# Plot accuracy vs epoch
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['train', 'val'], loc='upper left')

In [None]:
# Plot loss vs epoch
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['train', 'val'], loc='upper left')

In [None]:
test_datagen = ImageDataGenerator(rescale=1./255)
test_datagen.fit(x_test_cif100)
test_flow = datagen.flow(x_test, y_test, batch_size=32)

In [None]:
# Global leaderboard: https://paperswithcode.com/sota/image-classification-on-cifar-100
prediction = model_cif100.evaluate(test_flow, steps=len(x_test_cif100)/32)