<a href="https://colab.research.google.com/github/olcaykursun/ML/blob/main/neuralnets/Transfer_Learning_with_CNNs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<b>Transfer Learning Implementation and Usage with CNNs</b>  
AUM Machine Learning, Dr. Olcay Kursun, okursun@aum.edu  
Date: 04/02/2024 (Spring 2024)  

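Description: This notebook builds, compiles, and trains a small convolutional neural network with TensorFlow/Keras on a subset of the Fashion-MNIST classes. It then reuses what the network learned to classify the remaining classes from only a few examples, through feature extraction and transfer learning (fine-tuning), and ends with an ImageNet-pretrained ResNet50 as the base.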


In [None]:
import numpy as np
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.utils import to_categorical

# Fetch the complete Fashion-MNIST train and test splits
train_split, test_split = fashion_mnist.load_data()
full_x_train, full_y_train = train_split
full_x_test, full_y_test = test_split

# Rescale the pixel values by 1/255 (one split per statement)
full_x_train = full_x_train / 255.0
full_x_test = full_x_test / 255.0

full_x_train.shape

(60000, 28, 28)

In [None]:
# Add a trailing channel axis so every image is (height, width, 1).
# The new shape is rebuilt from the first three axes, which keeps this cell idempotent:
# re-running it leaves an already expanded (N, 28, 28, 1) array unchanged, whereas
# np.expand_dims on a self-assigned variable would append one more axis on every run.
full_x_train = full_x_train.reshape(full_x_train.shape[:3] + (1,))
full_x_test = full_x_test.reshape(full_x_test.shape[:3] + (1,))

full_x_train.shape

(60000, 28, 28, 1)

In [None]:
# Split the 10 classes into two disjoint tasks:
#   * base task   - classes 0 .. n1-1, used with ALL of their examples to learn good features;
#   * target task - the remaining n2 classes (n1 .. 9), a different problem for which only
#                   a few labelled examples per class will be available.
# The feature-extraction knowledge learned on the base task is transferred to the target task.
# Example: with n1 = 8, classes 0-7 are used for feature learning and classes 8 and 9 are held out.
n1 = 6
n2 = 10 - n1

base_classes = range(n1)

# Boolean masks marking the examples whose label is one of the base classes
idx_train = np.isin(full_y_train, base_classes)
idx_test = np.isin(full_y_test, base_classes)

base_x_train = full_x_train[idx_train]
base_x_test = full_x_test[idx_test]

# One-hot encode the base-task labels
base_y_train = to_categorical(full_y_train[idx_train], num_classes=n1)
base_y_test = to_categorical(full_y_test[idx_test], num_classes=n1)

base_y_train.shape

(36000, 6)

In [None]:
# Target task: the n2 held-out classes, with only a few labelled training examples per class.
# How well a classifier does here measures the quality of the features learned on the base task.
num_samples_per_class = 5

# Seeded generator so the same few-shot subset is drawn on every run. With so few training
# examples the choice of images matters a lot; drawing them from the unseeded global RNG
# made the accuracies reported below change from one Restart & Run All to the next.
sampling_rng = np.random.default_rng(42)

target_classes = range(n1, 10)
idx_train = np.isin(full_y_train, target_classes)
idx_test = np.isin(full_y_test, target_classes)

# Pool of every training example that belongs to one of the target classes
target_pool_x, target_pool_y = full_x_train[idx_train], full_y_train[idx_train]

all_selected_indices = []
for class_label in target_classes:
    # Positions (within the pool) of all instances of the current class
    class_indices = np.flatnonzero(target_pool_y == class_label)

    # Draw num_samples_per_class distinct instances of this class
    chosen_indices = sampling_rng.choice(class_indices, num_samples_per_class, replace=False)

    all_selected_indices.extend(chosen_indices)

# The small training set of the target task
target_x_train = target_pool_x[all_selected_indices]
target_y_train = target_pool_y[all_selected_indices]

# The target test set keeps every test example of the target classes
target_x_test, target_y_test = full_x_test[idx_test], full_y_test[idx_test]

# One-hot encode after shifting the labels n1 .. 9 down to 0 .. n2-1
target_y_train = to_categorical(target_y_train - n1, num_classes=n2)
target_y_test = to_categorical(target_y_test - n1, num_classes=n2)
# For binary (class 8 vs 9), we could also use:
#target_y_train = 0+(target_y_train == 8)   #also works: (target_y_train == 8).astype(int)

target_y_train.shape

(20, 4)

In [None]:
# Boolean mask over the full training set; when run right after the target-task cell above,
# it marks the examples whose label is one of the target classes.
idx_train

array([ True, False, False, ..., False, False, False])

In [None]:
# Shape of the few-shot training set of the target task (last axis is the added channel axis)
target_x_train.shape

(20, 28, 28, 1)

In [None]:
from tensorflow.keras import layers, models

input_shape = (28, 28, 1)

# Convolutional part: extracts the image features that are transferred later on
conv_feature_layers = [
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
]

# Classifier part: the two Dense layers form an MLP that uses the features
# extracted by the convolutional layers to predict one of the n1 base classes
mlp_head_layers = [
    layers.Dense(64, activation='relu'),
    layers.Dense(n1, activation='softmax'),
]

model = models.Sequential(conv_feature_layers + mlp_head_layers)

model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

2024-04-09 00:53:37.448962: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M1 Pro
2024-04-09 00:53:37.448984: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2024-04-09 00:53:37.448990: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2024-04-09 00:53:37.449022: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-04-09 00:53:37.449037: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [None]:
# Print the layer-by-layer summary (output shapes and parameter counts) of the base model
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 26, 26, 32)        320       
                                                                 
 max_pooling2d (MaxPooling2  (None, 13, 13, 32)        0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 11, 11, 64)        18496     
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 5, 5, 64)          0         
 g2D)                                                            
                                                                 
 flatten (Flatten)           (None, 1600)              0         
                                                                 
 dense (Dense)               (None, 64)                1

In [None]:
# Fit the base model on the base task, with 20% of the training data held out for validation
model.fit(base_x_train, base_y_train, epochs=5, validation_split=0.2)

# Score the trained base model on the base-task test set
print('\nNot the focus of transfer learning, but a good accuracy here shows learned base features can be good:')
base_test_metrics = model.evaluate(base_x_test, base_y_test)
test_loss, test_acc = base_test_metrics
print(f"Base-task test loss: {test_loss}")
print(f"Base-task test accuracy: {test_acc}")

# The same accuracy computed by hand instead of through "evaluate":
# predict per-class probabilities, take the most likely class for each example,
# and compare it with the true class recovered from the one-hot labels.
predictions = model.predict(base_x_test)
predicted_classes = predictions.argmax(axis=1)
true_classes = base_y_test.argmax(axis=1)
accuracy = (predicted_classes == true_classes).mean()
print(f"Alternative way of computing accuracy: {accuracy}")

Epoch 1/5
  1/900 [..............................] - ETA: 4:34 - loss: 1.7887 - accuracy: 0.1250

2024-04-09 00:53:37.825197: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5

Not the focus of transfer learning, but a good accuracy here shows learned base features can be good:
Base-task test loss: 0.1828967034816742
Base-task test accuracy: 0.9383333325386047
Alternative way of computing accuracy: 0.9383333333333334


In [None]:
# Feature extraction with the trained base model.
# The extracted features can be fed to other learners, for example:
#   * a Decision Tree, in place of the dense layers that form the MLP of the original setup;
#   * a clustering algorithm, to cluster never-before-seen examples and classes.

# Cut the MLP (the two dense layers) off the top: the output of the
# third-from-last layer becomes the output of the feature extractor
last_feature_output = model.layers[-3].output
feature_extractor = models.Model(inputs=model.input, outputs=last_feature_output)

# Features of the target task's training set and test set
features_target_train = feature_extractor.predict(target_x_train)
features_target_test = feature_extractor.predict(target_x_test)



In [None]:
from sklearn.tree import DecisionTreeClassifier

# Integer class indices recovered from the one-hot target labels (index of the max entry)
target_train_class_ids = np.argmax(target_y_train, axis=1)
target_test_class_ids = np.argmax(target_y_test, axis=1)

# Fit a decision tree on the features learned from the base task
tree = DecisionTreeClassifier()
tree.fit(features_target_train, target_train_class_ids)

# Accuracy of the tree on the target-task test set
accuracy = tree.score(features_target_test, target_test_class_ids)
print(f"Target-task Decision Tree Accuracy (using features learned from the base): {accuracy}")

Target-task Decision Tree Accuracy (using features learned from the base): 0.756


In [None]:
# Baseline without knowledge transfer: train/test a decision tree on the raw pixel values

# Number of values per image once it is flattened into one row (product of input_shape)
num_raw_features = np.prod(input_shape)

raw_target_train = target_x_train.reshape(-1, num_raw_features)
raw_target_test = target_x_test.reshape(-1, num_raw_features)

# A fresh decision tree, trained from scratch on the small target dataset
tree = DecisionTreeClassifier()
tree.fit(raw_target_train, np.argmax(target_y_train, axis=1))

# Accuracy of the tree on the target-task test set
accuracy = tree.score(raw_target_test, np.argmax(target_y_test, axis=1))
print(f"Target-task Decision Tree Accuracy using raw features: {accuracy}")

Target-task Decision Tree Accuracy using raw features: 0.78475


In [None]:
# The learned features can also be the input of a standalone Keras classifier.
# As written, the model is a single softmax layer on top of the extracted features for the
# n2-class target problem; enabling the commented-out hidden layer turns it into an MLP.
# For a two-class target task, the softmax layer can also be replaced by Dense(1, 'sigmoid')
# trained with binary_crossentropy.
classifier_layers = [
   # layers.Dense(16, activation='relu', input_shape=(features_target_train.shape[1],)),
    layers.Dense(n2, activation='softmax'),
]
new_model = models.Sequential(classifier_layers)

# Train on the features of the few target-task training examples
new_model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)
new_model.fit(features_target_train, target_y_train, epochs=100, verbose=0)

# Score on the features of the target-task test set
test_loss, test_acc = new_model.evaluate(features_target_test, target_y_test)

print(f"Standalone MLP Test loss: {test_loss}")
print(f"Standalone MLP Test accuracy: {test_acc}")

Standalone MLP Test loss: 0.3319225013256073
Standalone MLP Test accuracy: 0.8945000171661377


In [None]:
# Print the layer summary (output shapes and parameter counts) of the feature-based classifier
new_model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_2 (Dense)             (None, 4)                 6404      
                                                                 
Total params: 6404 (25.02 KB)
Trainable params: 6404 (25.02 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
# Fine-tuning, step 1: put a new classifier on top of the trained base, with the base frozen.
#
# The base is reused as a sub-model that ends at the third-from-last layer of `model`.
# This is a shallow copy: the sub-model is built from the layers of `model` themselves.
# Wrapping the base like this preserves its architecture, so it is the approach to use
# when the base has complex connections, as opposed to the layer-unpacking method
# shown in a later cell.

from tensorflow.keras import models, layers

# In Keras, layers and models (including sub-models created with the Model constructor)
# are all callable objects, so a sub-model can be used wherever a layer is expected.
base_inputs = model.inputs
base_outputs = model.layers[-3].output
truncated_pretrained_model = models.Model(inputs=base_inputs, outputs=base_outputs)

# Trainable flag of the base model's first layer before anything is frozen
print(model.layers[0].trainable)

# Freeze every layer of the truncated base
for base_layer in truncated_pretrained_model.layers:
    base_layer.trainable = False

# New head: randomly initialized, so it needs some warming-up while the base stays fixed
new_head_layers = [
    layers.Dense(64, activation='relu'),
    layers.Dense(n2, activation='softmax'),
]
target_model = models.Sequential([truncated_pretrained_model, *new_head_layers])

target_model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Train the head on the few target examples, then score on the target-task test set
target_model.fit(target_x_train, target_y_train, epochs=5)
test_loss, test_acc = target_model.evaluate(target_x_test, target_y_test)

print(f"target_model test loss: {test_loss}")
print(f"target_model test accuracy: {test_acc}")

True
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
target_model test loss: 0.5966358780860901
target_model test accuracy: 0.890250027179718


In [None]:
# Number of top-level layers of target_model. Printed explicitly: a bare expression is
# displayed only when it is the last line of a cell, so the value was silently discarded.
print(len(target_model.layers))
# Trainable flag of the base model's first layer, as seen through `model`
print(model.layers[0].trainable)

False


In [None]:
# Fine-tuning, step 2: unfreeze every layer of target_model and keep training.
# Not recommended for small datasets.
# Due to the shallow copy above, the base model ("model") will change as well.

# Re-enable training for the layers inside truncated_pretrained_model first, then for the
# top-level layers of target_model (in case they were set to non-trainable for some reason)
for layer in (*truncated_pretrained_model.layers, *target_model.layers):
    layer.trainable = True

# Compile again after changing the trainable flags; this also sets up a fresh optimizer
# (learning rates, momentums etc.)
target_model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

target_model.fit(target_x_train, target_y_train, epochs=50)  # "model" will also be updated

# Score the retrained target_model on the target-task test set
test_loss, test_acc = target_model.evaluate(target_x_test, target_y_test)

print(f"Retrained target_model test loss: {test_loss}")
print(f"Retrained target_model test accuracy: {test_acc}")

# Note: target_model.layers[0] is the wrapping sub-model, while model.layers[0] is the
# first layer of the base model, so this compares two different objects.
print(target_model.layers[0] == model.layers[0])

# Compare the first weight array found on each side
weights_model = model.layers[0].get_weights()
weights_target_model = target_model.layers[0].get_weights()
first_weights_match = np.array_equal(weights_model[0], weights_target_model[0])
print(
    "Weights of the first layers are the same."
    if first_weights_match
    else "Weights of the first layers are different."
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
Retrained target_model test loss: 0.560497522354126
Retrained target_model test accuracy: 0.8794999718666077
False
Weights of the first layers are the same.


In [None]:
# Shape of the second weight variable of target_model's second top-level layer
# (presumably the bias of the first Dense head layer, given Keras' kernel-then-bias order)
target_model.layers[1].weights[1].shape

TensorShape([64])

In [None]:
# Alternative: unpack the base model's layers straight into a new Sequential model.
# Not the preferred approach (see the sub-model wrapping used earlier).

reused_base_layers = model.layers[:-2]   # everything except the two Dense layers of the base
new_head_layers = [
    layers.Dense(64, activation='relu'),
    layers.Dense(n2, activation='softmax'),
]
target_model = models.Sequential([*reused_base_layers, *new_head_layers])

# Freeze all the layers that come from the base model (all but the two new ones)
for reused_layer in target_model.layers[:-2]:
    reused_layer.trainable = False

target_model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Train the new head, then score on the target-task test set
target_model.fit(target_x_train, target_y_train, epochs=5)
test_loss, test_acc = target_model.evaluate(target_x_test, target_y_test)

print(f"target_model test loss: {test_loss}")
print(f"target_model test accuracy: {test_acc}")

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
target_model test loss: 0.4182673990726471
target_model test accuracy: 0.8697500228881836


In [None]:
# Ways to get a copy of a model: saving/loading it, cloning, and serialization/deserialization.
# This cell uses cloning: clone the base model, then copy the trained weights into the clone.

from tensorflow.keras.models import clone_model, Model
from tensorflow.keras.layers import Input, Dense

cloned_model = clone_model(model)
cloned_model.set_weights(model.get_weights())

# Chain the cloned layers (all but the last two) onto a new input, printing the
# tensor shape after every step
model_input = Input(shape=input_shape)
print(model_input.shape)
features = model_input
for cloned_layer in cloned_model.layers[:-2]:
    features = cloned_layer(features)
    print(features.shape)

# New classifier head for the n2 target classes
hidden = Dense(64, activation='relu')(features)
print(hidden.shape)
new_output = Dense(n2, activation='softmax')(hidden)
print(new_output.shape)
target_model = Model(inputs=model_input, outputs=new_output)

# Make everything below the two new Dense layers trainable.
# NOTE(review): the clones may inherit a frozen `trainable` flag from `model` if an
# earlier cell froze its layers - confirm whether full fine-tuning is intended here.
for reused_layer in target_model.layers[:-2]:
    reused_layer.trainable = True

target_model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

target_model.fit(target_x_train, target_y_train, epochs=5, verbose=0)
test_loss, test_acc = target_model.evaluate(target_x_test, target_y_test)
print(f"target_model test loss: {test_loss}")
print(f"target_model test accuracy: {test_acc}")

(None, 28, 28, 1)
(None, 26, 26, 32)
(None, 13, 13, 32)
(None, 11, 11, 64)
(None, 5, 5, 64)
(None, 1600)
(None, 64)
(None, 4)
target_model test loss: 0.35482150316238403
target_model test accuracy: 0.9024999737739563


In [None]:
# Predicted class index of target_model for 20 images filled with uniform random values
np.argmax(target_model(np.random.rand(20,28,28,1)), axis=1)

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [None]:
# In a way, this is what the cloning approaches do under the hood:
# replicate the architecture of the original model by hand, then copy the weights
# of the corresponding layers.

replica_layers = [
    layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(64, activation='relu'),
    layers.Dense(n2, activation='softmax'),
]
target_model = models.Sequential(replica_layers)

# Pair every base layer (all but its last two) with the replica layer at the same
# position, copy the weights across, and freeze the replica layer
for source_layer, replica_layer in zip(model.layers[:-2], target_model.layers):
    replica_layer.set_weights(source_layer.get_weights())
    replica_layer.trainable = False

target_model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

target_model.fit(target_x_train, target_y_train, epochs=5, verbose=0)
test_loss, test_acc = target_model.evaluate(target_x_test, target_y_test)
print(f"target_model test loss: {test_loss}")
print(f"target_model test accuracy: {test_acc}")

target_model test loss: 0.4635031223297119
target_model test accuracy: 0.890749990940094


In [None]:
# The same simple approach works with popular pre-trained CNN models for computer vision:
# - AlexNet: the 2012 milestone that reignited deep learning, building on CNN foundations
#   laid by Fukushima's Neocognitron (1979-80).
# - VGG (VGG16, VGG19): known for its simplicity, using very small convolution filters.
# - ResNet (ResNet50, ResNet101, ResNet152): introduced residual connections to enable
#   training of very deep networks.
# - Inception (InceptionV3, Inception-ResNet): uses multiple kernel sizes at the same
#   convolutional layer.
# - MobileNet (MobileNetV1, MobileNetV2, MobileNetV3): designed for mobile and embedded
#   vision applications, focusing on efficiency.

import tensorflow as tf

# Pre-trained ResNet50 used as a base, without its top (classification) layers
resnet_options = dict(
    include_top=False,          # leave out the top (classification) layers
    weights='imagenet',         # load the weights pre-trained on ImageNet
    input_shape=(224, 224, 3),  # shape of the input images
)
resnet = tf.keras.applications.ResNet50(**resnet_options)

# Layer-by-layer summary of the loaded ResNet50 model
resnet.summary()

Model: "resnet50"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 conv1_pad (ZeroPadding2D)   (None, 230, 230, 3)          0         ['input_2[0][0]']             
                                                                                                  
 conv1_conv (Conv2D)         (None, 112, 112, 64)         9472      ['conv1_pad[0][0]']           
                                                                                                  
 conv1_bn (BatchNormalizati  (None, 112, 112, 64)         256       ['conv1_conv[0][0]']          
 on)                                                                                       

In [None]:
# Resize your datasets to 224x224 for ResNet
from skimage.transform import resize
def resize_dataset(images, target_size=(224, 224), num_channels=3, dtype=np.float64):
    """Resize a batch of single-channel images and replicate the channel.

    Parameters
    ----------
    images : array-like
        Batch of grayscale images with shape (n, height, width, 1).
    target_size : tuple of int, optional
        (height, width) of every output image. Defaults to (224, 224), the input
        size the ResNet50 base in this notebook is created with.
    num_channels : int, optional
        How many identical copies of the gray channel each output image gets
        (3 by default, i.e. an RGB-shaped image).
    dtype : data-type, optional
        dtype of the returned array. The default keeps the previous float64
        behaviour; passing np.float32 halves the memory used by the result.

    Returns
    -------
    numpy.ndarray
        Array of shape (n, target_size[0], target_size[1], num_channels).

    Raises
    ------
    ValueError
        If `images` is not a 4-D batch whose last axis has length 1.
    """
    images = np.asarray(images)
    # Fail early with a clear message instead of a broadcasting error from the
    # assignment inside the loop below.
    if images.ndim != 4 or images.shape[-1] != 1:
        raise ValueError(
            "resize_dataset expects images of shape (n, height, width, 1), "
            f"got {images.shape}"
        )

    height, width = target_size
    # Allocate the output once and fill it image by image
    resized_images = np.zeros((images.shape[0], height, width, num_channels), dtype=dtype)
    for i, image in enumerate(images):
        # Resize one image; the trailing single-channel axis is kept by `resize`
        resized_image = resize(image, (height, width), mode='constant', anti_aliasing=True)
        # Convert grayscale to multi-channel by repeating the single channel
        resized_images[i] = np.repeat(resized_image, num_channels, axis=-1)
    return resized_images

# Resize the target-task training set first, then the test set
target_x_train_resized, target_x_test_resized = map(resize_dataset, (target_x_train, target_x_test))

In [None]:
# Shape of the target-task training images before resizing
target_x_train.shape

(20, 28, 28, 1)

In [None]:
# Shape of the target-task training images after resize_dataset
target_x_train_resized.shape

(20, 224, 224, 3)

In [None]:
# Apply to the resized training and testing sets the same input preprocessing that was
# used for ResNet50 during its training on ImageNet. For this model, Keras'
# preprocess_input converts the images from RGB to BGR and zero-centers each colour
# channel with respect to the ImageNet dataset, without scaling.
#
# NOTE: this cell overwrites its own inputs. Run it once after the resize cell;
# running it a second time would preprocess data that is already preprocessed.

from tensorflow.keras.applications.resnet50 import preprocess_input

# Both arrays are assumed to be numpy arrays. They are multiplied by 255 first,
# as if the images were coming from an image file with values 0-255.
target_x_train_resized = preprocess_input(255 * target_x_train_resized)
target_x_test_resized = preprocess_input(255 * target_x_test_resized)

In [None]:
from tensorflow.keras import models, layers

# Transfer model: the pre-trained ResNet50 base followed by a new softmax classifier
# for the n2 target classes.
target_model = models.Sequential([
    resnet,
    layers.GlobalAveragePooling2D(), #or alternatively adjust upper layers: layers.Flatten() or other Dense() layers
    #layers.Flatten(),
    #layers.Dense(16, activation='sigmoid'),
    layers.Dense(n2, activation='softmax')
])

# Freeze every layer of the ResNet base; only the new classifier layer is trained
for layer in resnet.layers:
    layer.trainable = False

# We could adjust the learning rate.
# The legacy Adam stays the first choice, as in the original run. On installations where
# the legacy optimizers are missing or refuse to be instantiated (e.g. Keras 3), fall
# back to the standard Adam so that the cell still runs.
try:
    optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=1e-3)
except (AttributeError, ImportError):
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
target_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

target_model.fit(target_x_train_resized, target_y_train, epochs=10)
# Evaluate the target_model on the test set
test_loss, test_acc = target_model.evaluate(target_x_test_resized, target_y_test)

print(f"target_model test loss: {test_loss}")
print(f"target_model test accuracy: {test_acc}")

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
target_model test loss: 0.3583078682422638
target_model test accuracy: 0.903249979019165
