# Transfer learning - ConvNeXtBase
This notebook includes the experiments carried out for the first homework of the Artificial Neural Networks & Deep Learning course at Politecnico di Milano. Here the focus is on applying _Transfer Learning_ using _ConvNeXtBase_ as the pre-trained network.



# Homework 1
The first homework asked us to design and train a CNN to perform a binary image classification task, starting from a labelled dataset of healthy and unhealthy plants. Once trained, the network should be able to predict the health status of a plant, given a picture of it as input.




## Connect to Google Drive




In [None]:
# Mount Google Drive (requires the Google Colab runtime) and move into the homework
# folder, so that relative paths used later (e.g. 'public_data.npz') resolve against it
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive/My Drive/[2023-2024] AN2DL/Homework1

## Import libraries

In [43]:
# Fix randomness and hide warnings
seed = 42

import os
# Must be set before TensorFlow is imported (done in the next cell)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so setting it
# here presumably only affects sub-processes — confirm if hash determinism matters
os.environ['PYTHONHASHSEED'] = str(seed)
os.environ['MPLCONFIGDIR'] = os.getcwd()+'/configs/'

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

import numpy as np
np.random.seed(seed)

import logging

import random
# Seed Python's built-in generator too: it was imported but left unseeded, so any
# library relying on `random` would not have been reproducible
random.seed(seed)

In [44]:
# Import tensorflow
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl
# Reduce TensorFlow / AutoGraph logging to errors only
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
# Seed TensorFlow's random generators (both the current and the compat.v1 API)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)
# Report the TensorFlow version in use
print(tf.__version__)

2.14.0


In [45]:
# Import other libraries
import cv2
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from collections import Counter
from sklearn.utils import class_weight
import pandas as pd
import seaborn as sns
from collections import defaultdict

## Load data

In [55]:
# Load the public dataset archive.
# NOTE(review): allow_pickle=True lets NumPy unpickle object arrays, and unpickling an
# untrusted file can execute arbitrary code — only load archives from a trusted source.
public_data = np.load('public_data.npz', allow_pickle=True)

# Image samples: one array per image stacked along the first axis
# (the notebook later reports a per-sample shape of (96, 96, 3))
X_train_val = public_data['data']

# Labels, with a trailing axis of size 1 appended so that each sample has
# its own row of targets
y_train_val = np.expand_dims(public_data['labels'], axis=-1)

## Inspect data

In [None]:
# Report the shapes of the samples and of the labels of the training-validation set
# (the second message used to describe the label array as "Data")
print("Training-Validation Data Shape:", X_train_val.shape)
print("Training-Validation Label Shape:", y_train_val.shape)

In [57]:
# Rescale pixel intensities to the range [0, 1], stored as float32.
# NOTE: this overwrites X_train_val, so re-running the cell rescales the data again.
X_train_val = np.divide(X_train_val.astype(np.float32), 255.)

In [None]:
# Show a row of randomly drawn images from the training-validation dataset,
# each one titled with its label
num_img = 10
fig, axes = plt.subplots(nrows=1, ncols=num_img, figsize=(20, 20))

# One random sample per subplot
for ax in axes:
    idx = np.random.randint(len(X_train_val))
    ax.imshow(X_train_val[idx])
    ax.set_title(f'{y_train_val[idx][0]}')

# Adjust layout and display the images
plt.tight_layout()
plt.show()

## Remove outliers
To remove outliers from the dataset, a clustering algorithm is applied.


In more detail, _DBSCAN_ (Density-Based Spatial Clustering of Applications with Noise) is the algorithm used to identify clusters in the space of data points.


_DBSCAN_ makes use of two parameters:

*   `eps`: the maximum distance between two data points for them to be considered part of the same neighbourhood
*   `min_samples`: the minimum number of data points required to form a dense region


In [59]:
# Flatten each image of the training-validation set into a single feature vector
X_flattened = np.array(X_train_val).reshape(len(X_train_val), -1)

# Standardize the flattened data using StandardScaler
scaler = StandardScaler()
X_normalized = scaler.fit_transform(X_flattened)

# Apply DBSCAN clustering algorithm
# NOTE(review): DBSCAN presumably labels samples that belong to no dense region as -1,
# so the "dominant cluster" below may actually be that noise group — confirm
scan = DBSCAN(eps=0.5, min_samples=5)
labels = scan.fit_predict(X_normalized)

# Count the occurrences of each cluster label
cluster_count = Counter(labels)

# Group the (image, label) pairs by the cluster they were assigned to
clustered_images = defaultdict(list)
for cluster_id, image, target in zip(labels, X_train_val, y_train_val):
    clustered_images[cluster_id].append((image, target))

# The most populated cluster label is taken to be the one containing the plants
plant_cluster_label = max(cluster_count, key=cluster_count.get)

# Keep aside every other cluster (the outliers) for later inspection
other_clusters = {cluster_id: list(pairs)
                  for cluster_id, pairs in clustered_images.items()
                  if cluster_id != plant_cluster_label}

# Keep only the samples of the dominant cluster, selecting images and labels
# with the same boolean mask so that they stay aligned.
# NOTE: this overwrites X_train_val / y_train_val; re-running the cell clusters
# the already-filtered data.
plant_mask = labels == plant_cluster_label
X_train_val = X_train_val[plant_mask]
y_train_val = y_train_val[plant_mask]

In [None]:
# Print the number of samples per cluster: the dominant cluster is reported as the
# plants, every other cluster as outliers (numbered as cluster label + 1)
for label, count in cluster_count.items():
  if label == plant_cluster_label:
    print(f"Cluster of plants: {count} samples")
  else:
    print(f"Cluster {1+label} of outliers: {count} samples")

# Print the shapes of the cleaned training-validation samples and labels
# (the second message used to describe the label array as "Data")
print("Training-Validation Data Shape:", X_train_val.shape)
print("Training-Validation Label Shape:", y_train_val.shape)

In [None]:
# Show a row of randomly drawn images from the cleaned training-validation dataset,
# each one titled with its label
num_img = 10
fig, axes = plt.subplots(nrows=1, ncols=num_img, figsize=(20, 20))

# One random sample per subplot
for ax in axes:
    idx = np.random.randint(len(X_train_val))
    ax.imshow(X_train_val[idx])
    ax.set_title(f'{y_train_val[idx][0]}')

# Adjust layout and display the images
plt.tight_layout()
plt.show()

In [None]:
# Display up to 5 samples from each of the two outlier clusters (cluster labels 0 and 1),
# one cluster per row
fig, axes = plt.subplots(2, 5, figsize=(20, 8))

for row, cluster_id in enumerate((0, 1)):
    # A missing cluster simply leaves its row of subplots empty
    cluster_samples = other_clusters.get(cluster_id, [])[:5]
    for col, (image, _) in enumerate(cluster_samples):
        panel = axes[row, col]
        panel.imshow(image)
        panel.set_title(f'Outliers Cluster {cluster_id + 1}')

# Adjust layout and display the images
plt.tight_layout()
plt.show()

## Generate test set
A test set is conditionally generated, depending on the value of a boolean variable.

Indeed, the test set is meant to be used only locally, in order to gain an understanding of the model's performance without having to test it on CodaLab.

However, the local test set is meant to be removed before training the model to be uploaded to CodaLab, so that the model can learn from as many samples as possible, since the input dataset is limited in size.

In [None]:
# Set to True to also carve out a local test set
is_generate_test_set_on = False

if not is_generate_test_set_on:
  # Stratified split of the cleaned dataset: 17.5% of the samples go to validation
  X_train, X_val, y_train, y_val = train_test_split(
      X_train_val, y_train_val,
      test_size=0.175, random_state=seed, stratify=y_train_val,
  )
else:
  # First hold out a stratified test set (17.5% of the cleaned dataset).
  # NOTE: X_train_val / y_train_val are overwritten with the remaining samples.
  X_train_val, X_test, y_train_val, y_test = train_test_split(
      X_train_val, y_train_val,
      test_size=0.175, random_state=seed, stratify=y_train_val,
  )
  # Then split what is left into training and validation sets, with a validation
  # set of the same size as the test set
  X_train, X_val, y_train, y_val = train_test_split(
      X_train_val, y_train_val,
      test_size=len(X_test), random_state=seed, stratify=y_train_val,
  )

  print("Test Data Shape:", X_test.shape)
  print("Test Label Shape:", y_test.shape)

  # Encode labels as integers: 'unhealthy' -> 1, any other label -> 0
  y_test = (y_test == 'unhealthy').astype(int)

print("Training Data Shape:", X_train.shape)
print("Training Label Shape:", y_train.shape)
print("Validation Data Shape:", X_val.shape)
print("Validation Label Shape:", y_val.shape)

# Encode labels as integers: 'unhealthy' -> 1, any other label -> 0
y_train = (y_train == 'unhealthy').astype(int)
y_val = (y_val == 'unhealthy').astype(int)

## Introduce class weights

In [None]:
# Number of classes
classes = 2

# Total number of images in the train set
total_img = y_train.size

# Count the occurrences of each target class in the training set.
# y_train holds integer class ids with a trailing axis of size 1, hence the ravel();
# minlength keeps one entry per class even if a class is absent.
elements_per_class = np.bincount(y_train.ravel(), minlength=classes)

proportions = {c: elements_per_class[c]/total_img for c in range(classes)}
print('Proportion of classes: ', proportions)

# Weight each class inversely to its frequency: total / (n_classes * class_count),
# so that a perfectly balanced dataset would give every class a weight of 1
class_weights = {c: total_img/(classes*elements_per_class[c]) for c in range(classes)}
print('Class weigths: ', class_weights)

## Build model

In [66]:
# Training hyperparameters
batch_size, epochs, learning_rate = 64, 200, 1e-3

# Per-sample input shape and number of output units, both read from the training data
input_shape, output_shape = X_train.shape[1:], y_train.shape[-1]

# Recap of the configuration
print(f"Batch Size: {batch_size}, Epochs: {epochs}, Learning Rate: {learning_rate}")
print(f"Input Shape: {input_shape}, Output Shape: {output_shape}")

Batch Size: 64, Epochs: 200, Learning Rate: 0.001
Input Shape: (96, 96, 3), Output Shape: 1


In [67]:
# Pre-trained ConvNeXtBase feature extractor: ImageNet weights, no classification head,
# and global average pooling so that it outputs one feature vector per image.
# The input shape is taken from the data (see `input_shape` above) instead of being
# hard-coded, so that it always agrees with the Input layer created in build_model.
net = tfk.applications.ConvNeXtBase(
    input_shape=input_shape,
    include_top=False,
    weights="imagenet",
    pooling='avg',
)

In [68]:
def build_model(input_shape, output_shape, seed=seed, backbone=None, lr=None):
  """Build and compile the transfer-learning classifier.

  The model chains: data augmentation (random flip, translation, rotation and zoom),
  the pre-trained backbone with all its weights frozen, a dropout layer and a
  sigmoid-activated dense output layer with L2 kernel regularization.

  Args:
    input_shape: shape of a single input image.
    output_shape: number of units of the output layer.
    seed: random seed used for TensorFlow and for the random layers.
    backbone: pre-trained feature extractor; defaults to the notebook-level `net`.
      Its `trainable` attribute is set to False as a side effect.
    lr: learning rate of the Adam optimizer; defaults to the notebook-level
      `learning_rate`.

  Returns:
    The compiled Keras model (binary cross-entropy loss, accuracy metric).
  """
  # Initialize random seed
  tf.random.set_seed(seed)

  # Fall back to the notebook-level objects when not given explicitly
  if backbone is None:
    backbone = net
  if lr is None:
    lr = learning_rate

  # Define the transformations used for data augmentation
  augmentation = tf.keras.Sequential([
      tfkl.RandomFlip("horizontal_and_vertical", seed = seed),
      tfkl.RandomTranslation(0.1,0.1, seed = seed, fill_mode = "reflect"),
      tfkl.RandomRotation(0.3,seed = seed, fill_mode = "reflect"),
      tfkl.RandomZoom(0.2,seed = seed, fill_mode = "reflect")
    ], name='preprocessing')

  # Keep the weights of every backbone layer frozen
  backbone.trainable = False

  input_layer = tfkl.Input(shape=input_shape, name='Input')

  # Apply data augmentation to the input, then extract features with the backbone
  augmented = augmentation(input_layer)
  features = backbone(augmented)
  features = tfkl.Dropout(0.3, seed=seed)(features)
  output_layer = tfkl.Dense(units=output_shape, activation='sigmoid',name='Output', kernel_regularizer=tf.keras.regularizers.l2(2e-4))(features)

  # Create a Model connecting input and output
  model = tfk.Model(inputs=input_layer, outputs=output_layer, name='CNN')

  # Compile the model
  model.compile(loss=tfk.losses.BinaryCrossentropy(), optimizer=tfk.optimizers.Adam(lr), metrics=['accuracy'])

  # Return model
  return model

In [None]:
# Instantiate the classifier and print its architecture
model = build_model(input_shape=input_shape, output_shape=output_shape, seed=seed)
model.summary()

## Train model

In [None]:
# Stop when the validation accuracy has not improved for 10 epochs, restoring the best weights
early_stopping = tfk.callbacks.EarlyStopping(
    monitor='val_accuracy', mode='max', patience=10, restore_best_weights=True)
# Multiply the learning rate by 0.9 after 2 epochs without validation-accuracy
# improvement, never going below 1e-5
LR_reduction = tfk.callbacks.ReduceLROnPlateau(
    monitor="val_accuracy", mode='max', factor=0.9, patience=2, min_lr=1e-5)

# Train the model. The images were scaled to [0, 1] earlier in the notebook, so they
# are multiplied back by 255 before being fed to the network.
training_run = model.fit(
    x=(X_train*255),
    y=y_train,
    validation_data=((X_val*255), y_val),
    class_weight = class_weights,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[early_stopping,LR_reduction]
)

# Keep only the per-epoch metrics dictionary
history = training_run.history

## Plot metrics

In [None]:
# Find the epoch with the highest validation accuracy, i.e. the quantity monitored by
# early stopping during this training phase
best_epoch = np.argmax(history['val_accuracy'])
# Epoch with the lowest validation loss, highlighted on the loss plot
min_loss_epoch = np.argmin(history['val_loss'])

# Plot training and validation loss, highlighting the epoch with the lowest validation loss
plt.figure(figsize=(20, 5))
plt.plot(history['loss'], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_loss'], label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
plt.plot(min_loss_epoch, history['val_loss'][min_loss_epoch], marker='*', alpha=0.8, markersize=10, color='#4D61E2')
plt.legend(loc='upper left')
plt.title('Binary Crossentropy')
plt.grid(alpha=0.3)

# Plot training and validation accuracy, highlighting the best epoch
plt.figure(figsize=(20, 5))
plt.plot(history['accuracy'], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
plt.plot(history['val_accuracy'], label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
plt.plot(best_epoch, history['val_accuracy'][best_epoch], marker='*', alpha=0.8, markersize=10, color='#4D61E2')
plt.legend(loc='upper left')
plt.title('Accuracy')
plt.grid(alpha=0.3)

plt.show()

## Make inference

In [72]:
if is_generate_test_set_on:
  # Run the model on the whole local test set; as for training, the images are
  # scaled back to [0, 255] first
  predictions = model.predict(x=X_test*255, verbose=0)

  # Report how many predictions were produced and their dimensionality
  print("Predictions Shape:", np.shape(predictions))

In [73]:
if is_generate_test_set_on:
  # Binarize the sigmoid outputs (above the threshold -> class 1, i.e. 'unhealthy')
  # and compute the confusion matrix
  threshold = 0.5
  binary_prediction = tf.where(predictions > threshold, 1, 0)
  cm = confusion_matrix(y_test, binary_prediction)

  # Compute classification metrics (macro-averaged over the two classes)
  accuracy = accuracy_score(y_test, binary_prediction)
  precision = precision_score(y_test, binary_prediction, average='macro')
  recall = recall_score(y_test, binary_prediction, average='macro')
  f1 = f1_score(y_test, binary_prediction, average='macro')

  # Display the computed metrics. The built-in round() is used because it works both
  # on plain Python floats and on NumPy scalars, whereas `.round()` exists only on the latter.
  print('Accuracy:', round(accuracy, 4))
  print('Precision:', round(precision, 4))
  print('Recall:', round(recall, 4))
  print('F1:', round(f1, 4))

  # Plot the confusion matrix, transposed so that rows are predicted labels and
  # columns are true labels
  plt.figure(figsize=(10, 8))
  sns.heatmap(cm.T, xticklabels=list(('healthy','unhealthy')), yticklabels=list(('healthy','unhealthy')), cmap='Blues', annot=True)
  plt.xlabel('True labels')
  plt.ylabel('Predicted labels')
  plt.show()

## Save model

In [28]:
# Set to True to write the model trained so far to disk
save = False
model_path = "13_11_23_test_convnestbase_FT_2Dense"

if save:
  model.save(model_path)

## Load model

In [32]:
# Set to True to replace `model` with the one previously saved to disk
load = False
model_path = '13_11_23_test_convnestbase_FT_2Dense'

if load:
  model = tf.keras.models.load_model(model_path)

## Fine tuning

In [None]:
# Make the whole ConvNeXt backbone trainable, then list the trainable flag of each of its layers
backbone = model.get_layer('convnext_base')
backbone.trainable = True
for index, backbone_layer in enumerate(backbone.layers):
   print(index, backbone_layer.name, backbone_layer.trainable)

In [None]:
# Freeze the first N layers of the backbone, leaving only the later ones trainable
N = 109
backbone = model.get_layer('convnext_base')
for frozen_layer in backbone.layers[:N]:
  frozen_layer.trainable=False

# Show the resulting trainable flag of every backbone layer and the updated summary
for index, backbone_layer in enumerate(backbone.layers):
   print(index, backbone_layer.name, backbone_layer.trainable)
model.summary()

In [76]:
# Re-compile after changing the trainable flags, using a lower learning rate for fine tuning.
# `metrics` is passed as a list, consistently with the compile call in build_model.
model.compile(loss=tfk.losses.BinaryCrossentropy(), optimizer=tfk.optimizers.Adam(7e-5), metrics=['accuracy'])

In [None]:
# Stop when the validation loss has not improved for 10 epochs, restoring the best weights
early_stopping = tfk.callbacks.EarlyStopping(
    monitor='val_loss', mode='min', patience=10, restore_best_weights=True)
# Multiply the learning rate by 0.1 after 10 epochs without validation-accuracy
# improvement, never going below 1e-6
LR_reduction = tfk.callbacks.ReduceLROnPlateau(
    monitor="val_accuracy", mode='max', factor=0.1, patience=10, min_lr=1e-6)

# Fine-tune the model. The images were scaled to [0, 1] earlier in the notebook, so they
# are multiplied back by 255 before being fed to the network.
fine_tuning_run = model.fit(
    x=(X_train*255),
    y=y_train,
    validation_data=((X_val*255), y_val),
    class_weight = class_weights,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[early_stopping,LR_reduction]
)

# Keep only the per-epoch metrics dictionary
history = fine_tuning_run.history

### Plot metric fine tuning

In [None]:
# Find the epoch with the lowest validation loss
best_epoch = np.argmin(history['val_loss'])

# One figure per metric: (history key, figure title, function locating the best epoch
# on the validation curve)
panels = (
    ('loss', 'Binary Crossentropy', np.argmin),
    ('accuracy', 'Accuracy', np.argmax),
)

for metric, title, locate_best in panels:
    validation_curve = history['val_' + metric]
    star_epoch = locate_best(validation_curve)

    plt.figure(figsize=(20, 5))

    # Training and validation curves, with a star on the best validation epoch
    plt.plot(history[metric], label='Training', alpha=0.8, color='#ff7f0e', linewidth=3)
    plt.plot(validation_curve, label='Validation', alpha=0.8, color='#4D61E2', linewidth=3)
    plt.plot(star_epoch, validation_curve[star_epoch], marker='*', alpha=0.8, markersize=10, color='#4D61E2')
    plt.legend(loc='upper left')
    plt.title(title)
    plt.grid(alpha=0.3)

plt.show()

### Make inference after fine tuning

In [79]:
if is_generate_test_set_on:
  # Run the fine-tuned model on the whole local test set; as for training, the images
  # are scaled back to [0, 255] first
  predictions = model.predict(x=X_test*255, verbose=0)

  # Report how many predictions were produced and their dimensionality
  print("Predictions Shape:", np.shape(predictions))

In [81]:
if is_generate_test_set_on:
  # Binarize the sigmoid outputs (above the threshold -> class 1, i.e. 'unhealthy')
  # and compute the confusion matrix
  threshold = 0.5
  binary_prediction = tf.where(predictions > threshold, 1, 0)
  cm = confusion_matrix(y_test, binary_prediction)

  # Compute classification metrics (macro-averaged over the two classes)
  accuracy = accuracy_score(y_test, binary_prediction)
  precision = precision_score(y_test, binary_prediction, average='macro')
  recall = recall_score(y_test, binary_prediction, average='macro')
  f1 = f1_score(y_test, binary_prediction, average='macro')

  # Display the computed metrics. The built-in round() is used because it works both
  # on plain Python floats and on NumPy scalars, whereas `.round()` exists only on the latter.
  print('Accuracy:', round(accuracy, 4))
  print('Precision:', round(precision, 4))
  print('Recall:', round(recall, 4))
  print('F1:', round(f1, 4))

  # Plot the confusion matrix, transposed so that rows are predicted labels and
  # columns are true labels
  plt.figure(figsize=(10, 8))
  sns.heatmap(cm.T, xticklabels=list(('healthy','unhealthy')), yticklabels=list(('healthy','unhealthy')), cmap='Blues', annot=True)
  plt.xlabel('True labels')
  plt.ylabel('Predicted labels')
  plt.show()

### Save model

In [35]:
# Set to True to write the fine-tuned model to disk
save = False
model_path = 'Models/ConvNeXt'

if save:
  model.save(model_path)