In [0]:
'''
This notebook covers how to use tf.keras to build a classification model like what we talked about in the previous series.
'''

In [0]:
import os
import pickle
import numpy as np

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

tf.keras.backend.clear_session()  # For easy reset of notebook state.

# CHANGE THESE TO FIT YOUR FOLDER NAMES
base_folder = '##PUT YOUR BASE FOLDER HERE##'
data_folder = os.path.join( base_folder, '##YOUR CATS AND DOGS FOLDER##' )
save_folder = os.path.join( base_folder, '##WHERE YOU WANT TO SAVE YOUR MODELS##' )
input_shape = (224,224,3)

In [0]:
tf.__version__

In [0]:
def basic_classification_model(input_shape, model_name='basic_model'):
  inputs = keras.Input(shape=input_shape)
  x = layers.Conv2D(32, 3, padding='same')(inputs)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.Conv2D(64, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.Conv2D(64, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  block_1_output = layers.MaxPooling2D(2)(x) # 112

  x = layers.Conv2D(64, 3, padding='same')(block_1_output)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.Conv2D(64, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.add([x, block_1_output])
  x = layers.Conv2D(128, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  block_2_output = layers.MaxPooling2D(2)(x) #56

  x = layers.Conv2D(128, 3, padding='same')(block_2_output)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.Conv2D(128, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.add([x, block_2_output])
  x = layers.Conv2D(256, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  block_3_output = layers.MaxPooling2D(2)(x) #28

  x = layers.Conv2D(256, 3, padding='same')(block_3_output)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.Conv2D(256, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.add([x, block_3_output])
  x = layers.Conv2D(512, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  block_4_output = layers.MaxPooling2D(2)(x) #14

  x = layers.Conv2D(512, 3, padding='same')(block_4_output)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.Conv2D(512, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.add([x, block_4_output])
  x = layers.Conv2D(1024, 3, padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  x = layers.MaxPooling2D(2)(x) #7

  x = layers.GlobalAveragePooling2D()(x) #1024

  x = layers.Dense(1024)(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)

  x = layers.Dropout(0.5)(x) # Alternatively, we can use:
  x = layers.Dense(2)(x) # x = layers.Dense(1)(x)
  predictions = layers.Softmax()(x) # predictions = layers.Sigmoid()(x)

  model = keras.Model(inputs, predictions, name=model_name)
  model.compile( optimizer=tf.keras.optimizers.Adam(0.001),
                 loss=keras.losses.CategoricalCrossentropy(from_logits=False), # BinaryCrossentropy instead
                 metrics=['accuracy'] )
  return model
  
  # If you want to know more about optimizers: https://ruder.io/optimizing-gradient-descent/index.html

In [0]:
model_context = 'basic_model'
model = basic_classification_model(input_shape, model_name=model_context)

In [0]:
# To see what you've built, you can use model.summary()
model.summary()

In [0]:
# You can also view a graphical plot of the model
model_plot = tf.keras.utils.plot_model(model, show_shapes=True)
display(model_plot)

In [0]:
# To train the model, we first need data. 
# Inside data_folder there are two folders: "dog" and "cat", which have images of dogs and cats respectively.
# We will use keras's built-in utilities to read the images from these folders and feed them into our model for training.

# But first, let's look at some of the data

In [0]:
cat_folder = os.path.join( data_folder, 'cat' )
cat_files = list(os.listdir( cat_folder ))
dog_folder = os.path.join( data_folder, 'dog' )
dog_files = list(os.listdir( dog_folder ))

In [0]:
from IPython.display import Image
# Look at the first cat image
Image(filename= os.path.join( cat_folder, cat_files[0] ) )

In [0]:
# Look at the last dog image
Image(filename= os.path.join( dog_folder, dog_files[-1] ) )

In [0]:
# Set up the data generators to read from our data_folder
bs = 32 # The batch size is 32

# An object that applies transformations to the images before they are consumed by the model
# These transformations include (1) preprocessing, like rescaling or normalization (2) data augmentation
datagen = tf.keras.preprocessing.image.ImageDataGenerator(
        rescale=1./255, # divide each pixel value by 255. Each pixel is in the range 0-255, so after division it is in 0-1
        rotation_range=20, # rotate the image between -20 to +20 degrees
        width_shift_range=0.2, # translate the image left-right for 20% of the image's width
        height_shift_range=0.2, # same, for up-down and height
        zoom_range=0.2,
        horizontal_flip=True,
        validation_split=0.2)
print('Making training data generator...')
train_gen = datagen.flow_from_directory(
        data_folder,
        target_size=input_shape[:2],
        batch_size=bs,
        subset='training')
print('Making validation data generator...')
val_gen = datagen.flow_from_directory(
        data_folder,
        target_size=input_shape[:2],
        batch_size=bs,
        subset='validation')

In [0]:
# Callbacks are useful to help you monitor the progress of the training as it is going on
# and to intervene in between if certain conditions are met.

# This monitors the validation loss of the model as it is training, saving a full copy of the model and it's specs everytime the
# validation loss reaches an unprecedented low.
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join( save_folder, '{}-best_val_loss.h5'.format(model_context) ),
    save_weights_only=False,
    monitor='val_loss',
    mode='auto',
    save_best_only=True)

# If the validation loss doesn't improve for 20 epochs, stop training
earlystopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20)

# If the validation loss doesn't improve for 5 epochs, reduce the learning rate to 0.2 times it's previous value
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5)

In [0]:
# Start training the model
n_epochs=50
model.fit(train_gen,
          epochs=n_epochs,
          steps_per_epoch=train_gen.n // bs,
          validation_data=val_gen,
          validation_steps=val_gen.n // bs,
          callbacks=[model_checkpoint, earlystopping, reduce_lr])

In [0]:
# Let's load an independent image to try our model on
# CHANGE TO YOUR FOLDER AND IMAGE NAMES
test_img_path = os.path.join(base_folder, 'src_images', 'boss.jpg')

In [0]:
from tensorflow.keras.preprocessing.image import load_img, img_to_array

def run_image_on_model(img_path, model, label_map):
  pil_img = load_img(test_img_path)
  pil_img = pil_img.resize( input_shape[:2] )
  img_arr = img_to_array(pil_img)
  # Remember to normalize the image values the same way you did when you trained the model
  img_arr = img_arr / 255.
  # We need to wrap this in an np.array with dimensions (b,H,W,C). Currently, the shape is only (H,W,C)
  img_arr = np.array( [img_arr] )
  pred = model.predict(img_arr, batch_size=1)[0]
  pred_idx = np.argmax(pred)
  return label_map[pred_idx]

In [0]:
# The generator's internal labeling of cat/dog
print(train_gen.class_indices)

In [0]:
# Construct a reverse mapping
label_map = {v:k for k,v in train_gen.class_indices.items()}
label_map

In [0]:
Image(filename=test_img_path)

In [0]:
# Your results may vary here. It's possible that your model will predict correctly
print( 'model prediction: {}'.format(run_image_on_model(test_img_path, model, label_map)) )

In [0]:
# Let's try and improve the accuracy of the model by applying transfer learning.
# The key idea behind transfer learning is to leverage on a more powerful model trained on a different but related task.
# In this case, we are going to use a pre-trained ResNet50 from the keras applications module, that was pre-trained on a 1000-category classification dataset
# Both are classification tasks, so they are related. However, the pre-trained model predicts a probability vector between 1000 classes
# So we'd need to modify it to only predict cat and dog classes.

# The features extracted by the powerful model are better than what we have now. The idea is to use these features to 
# "warm-up" our own model.
# Once the model is sufficiently warmed up, we can train both parts together.

In [0]:
# When include_top=False, we are discarding the 1000 category predictions
transfer_model = tf.keras.applications.ResNet50(input_shape=input_shape, include_top=False)

In [0]:
transfer_model.summary()

In [0]:
# Notice that we are left with a 7x7 square of depth 2048.
# We will apply GAP to reduce this tensor to a vector of length 2048, and train a classifier at the end to distinguish between two classes
# But first, we should disable training for the ResNet50 temporarily:
for layer in transfer_model.layers:
  layer.trainable = False

In [0]:
# Inspect the model to see that the number of trainable params is zero
transfer_model.summary()

In [0]:
# Now let's rig together a new model
def transfer_learning_model(input_shape, base_model, model_name='transfer_learning_model'):
  # Freeze the base model
  for layer in base_model.layers:
    layer.trainable = False
  inputs = keras.Input(shape=input_shape)
  # First, run the input through the power model. x contains good extracted features.
  x = base_model(inputs)
  # Notice that the rest below are more or less the same
  x = layers.GlobalAveragePooling2D()(x) #2048

  x = layers.Dense(1024)(x)
  x = layers.BatchNormalization()(x)
  x = layers.ReLU()(x)
  
  x = layers.Dropout(0.5)(x)
  x = layers.Dense(2)(x)
  predictions = layers.Softmax()(x) # predictions = layers.Sigmoid()(x)

  model = keras.Model(inputs, predictions, name=model_name)
  # Fine tuning requires a lower learning rate. The pre-trained model will be upset by the new rookie layers otherwise.
  model.compile( optimizer=tf.keras.optimizers.Adam(0.00001),
                 loss=keras.losses.CategoricalCrossentropy(from_logits=False),
                 metrics=['accuracy'] )
  return model

In [0]:
transfer_learning_model = transfer_learning_model(input_shape, transfer_model)

In [0]:
# Check that the output is as expected
transfer_learning_model.summary()

In [0]:
# Use the same callbacks, but with a different model_context
model_context = 'transfer_learning'

model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join( save_folder, '{}-best_val_loss.h5'.format(model_context) ),
    save_weights_only=False,
    monitor='val_loss',
    mode='auto',
    save_best_only=True)

# If the validation loss doesn't improve for 20 epochs, stop training
earlystopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20)

# If the validation loss doesn't improve for 5 epochs, reduce the learning rate to 0.2 times it's previous value
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5)

In [0]:
# Warm up for 10 epochs
n_epochs_warmup=10
# Followed by 40 epochs with all params trainable
n_epochs_fullblast=40

print('Warming up for {} epochs...'.format(n_epochs_warmup))
transfer_learning_model.fit(train_gen,
          epochs=n_epochs_warmup,
          steps_per_epoch=train_gen.n // bs,
          validation_data=val_gen,
          validation_steps=val_gen.n // bs,
          callbacks=[model_checkpoint, earlystopping, reduce_lr])


print('Done. Unfreezing all layers and training for {} more epochs...'.format(n_epochs_fullblast))
# After the warm-up, unfreeze all the layers of the base ResNet50
for layer in transfer_model.layers:
  layer.trainable = True

transfer_learning_model.fit(train_gen,
          epochs=n_epochs_fullblast,
          steps_per_epoch=train_gen.n // bs,
          validation_data=val_gen,
          validation_steps=val_gen.n // bs,
          callbacks=[model_checkpoint, earlystopping, reduce_lr])

In [0]:
# Let's try again with the new transfer-learned model
Image(filename=test_img_path)

In [0]:
print( 'model prediction: {}'.format(run_image_on_model(test_img_path, transfer_learning_model, label_map)) )

In [0]:
# Summary
# Transfer learning and fine-tuning are good options to explore if you want to improve your model's performance. You need to
# be mentally prepared to carefully tune learning rates and warm-up procedures though.