In [1]:
# ========================================================
# To train and test a classifier using Transfer Learning.
# =======================================================

#--- Import necessary modules from Python libraries.
import tensorflow as tf
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.applications import vgg16, mobilenet
from tensorflow.keras.models import Model, load_model
from tensorflow.keras import layers
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.metrics import Accuracy, Precision, Recall, AUC
import matplotlib.pyplot as plt
import numpy as np
import cv2, os, pickle
import csv
from sklearn.model_selection import train_test_split
import keras
# from my_classes import DataGenerator

2024-11-25 16:25:12.542897: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-11-25 16:25:12.551876: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1732530312.560668  169057 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1732530312.563285  169057 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-25 16:25:12.581326: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [2]:
class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, labels, batch_size=32, dim=(32,32,32), n_channels=1,
                 n_classes=10, shuffle=True):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        # Generate data
        X, y = self.__data_generation(list_IDs_temp)

        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples' # X : (n_samples, *dim, n_channels)
        # Initialization
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y = np.empty((self.batch_size), dtype=int)

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            # Store sample
            X[i,] = np.load('data/' + ID + '.npy')

            # Store class
            y[i] = self.labels[ID]

        return X, keras.utils.to_categorical(y, num_classes=self.n_classes)

In [3]:
#--- Fixed terms
WORKING_DIR = '/home/mursalin/m3c/computer-vision/task/'  
IMG_SIZE = 32
NUM_CHANNEL = 3
EARLY_STOP_PATIENCE = 50
LR_REDUCE_PATIENCE = 10
LR_REDUCE_FACTOR = 0.8 #--- new_lr = old_lr * LR_REDUCE_FACTOR
NUM_CLASSES = 10
WARMUP_EPOCHS = 1
EPOCHS = 5
BATCH_SIZE = 32

In [4]:
def resize_images(images):
    # Resize images to (224, 224, 3) as required by the VGG16 model
    resized_images = np.zeros((images.shape[0], IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
    for i in range(images.shape[0]):
        resized_images[i] = cv2.resize(images[i], (IMG_SIZE, IMG_SIZE))
    return resized_images

In [5]:
def process_data():
    # Load CIFAR-10 dataset
    (trainX, trainY), (testX, testY) = cifar10.load_data()

    #--- Turn 3D image dataset into 4D dataset for Conv2D layers
    print('trainX.shape: {}, trainX.dtype: {}'.format(trainX.shape, trainX.dtype))
    print('testX.shape: {}, testX.dtype: {}'.format(testX.shape, testX.dtype))
    
    # resize data
    trainX = resize_images(trainX)
    testX = resize_images(testX)
    print('trainX.shape: {}, trainX.dtype: {}'.format(trainX.shape, trainX.dtype))
    print('testX.shape: {}, testX.dtype: {}'.format(testX.shape, testX.dtype))

    #--- Preprocess imageset according to the preprocess procedure of pre-trained model
    trainX = vgg16.preprocess_input(trainX)
    testX = vgg16.preprocess_input(testX)
    print('trainX.shape: {}, trainX.dtype: {}'.format(trainX.shape, trainX.dtype))
    print('testX.shape: {}, testX.dtype: {}'.format(testX.shape, testX.dtype))
    
    # Convert labels to 1D arrays
    trainY = trainY.flatten()
    testY = testY.flatten()
    
    # Combine train and test sets for a complete split
    X = np.concatenate((trainX, testX), axis=0)
    y = np.concatenate((trainY, testY), axis=0)
    
    # Split data into train (80%), validation (10%), and test (10%)
    trainX, tempX, trainY, tempY = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    valX, testX, valY, testY = train_test_split(tempX, tempY, test_size=0.5, random_state=42, stratify=tempY)
    
    # Directory to save .npy files
    data_dir = "data/"
    os.makedirs(data_dir, exist_ok=True)
    
    # Save train, validation, and test images as .npy files
    for i in range(len(trainX)):
        np.save(os.path.join(data_dir, f"train_{i}.npy"), trainX[i])
    
    for i in range(len(valX)):
        np.save(os.path.join(data_dir, f"val_{i}.npy"), valX[i])
    
    for i in range(len(testX)):
        np.save(os.path.join(data_dir, f"test_{i}.npy"), testX[i])
    
    # Prepare list_IDs and labels for the generator
    train_IDs = [f"train_{i}" for i in range(len(trainX))]
    val_IDs = [f"val_{i}" for i in range(len(valX))]
    test_IDs = [f"test_{i}" for i in range(len(testX))]
    
    labels = {f"train_{i}": trainY[i] for i in range(len(trainY))}
    labels.update({f"val_{i}": valY[i] for i in range(len(valY))})
    labels.update({f"test_{i}": testY[i] for i in range(len(testY))})
    
    # Parameters for DataGenerator
    params = {
        'dim': (IMG_SIZE, IMG_SIZE),
        'batch_size': BATCH_SIZE,
        'n_classes': NUM_CLASSES,
        'n_channels': NUM_CHANNEL,
        'shuffle': True
    }

    # Create DataGenerators
    training_generator = DataGenerator(train_IDs, labels, **params)
    validation_generator = DataGenerator(val_IDs, labels, **params)
    testing_generator = DataGenerator(test_IDs, labels, **params)
    
    # Test the generator
    X_train, y_train = training_generator[0]  # Fetch the first training batch
    X_val, y_val = validation_generator[0]  # Fetch the first validation batch
    X_test, y_test = testing_generator[0]  # Fetch the first testing batch
    
    print(f"Train Batch X shape: {X_train.shape}, y shape: {y_train.shape}")
    print(f"Validation Batch X shape: {X_val.shape}, y shape: {y_val.shape}")
    print(f"Test Batch X shape: {X_test.shape}, y shape: {y_test.shape}")
        
    #--- Cross check
    # plt.imshow(trainX[0])
    # plt.title(trainY[0])
    # plt.show()
    # plt.close()
    
    return training_generator, validation_generator, testing_generator

In [6]:
def build_model():
	#--- Load a pre-trained backbone
	base_model = vgg16.VGG16(include_top = False, weights = 'imagenet', input_shape = (IMG_SIZE, IMG_SIZE, NUM_CHANNEL))
	base_model.summary(show_trainable = True)
		
	#--- Build a new model based on loaded backbone
	inputs = base_model.input
	x = base_model.output
	x = layers.Flatten()(x)
	x = layers.Dense(128, activation = 'relu')(x)
	x = layers.Dense(64, activation = 'relu')(x)	
	outputs = layers.Dense(10, activation = 'softmax')(x)
	model = Model(inputs, outputs)
	
	#--- Compile model
	model.compile(optimizer='adam', loss = 'categorical_crossentropy', metrics = ['accuracy'])
	
	return model

In [7]:
def save_model_performance(performance_path, history):
	#--- Save history into a dictionary
	hist_dict = history.history
	with open(performance_path + 'PerformanceDict.pkl', 'wb') as f:
		pickle.dump(hist_dict, f)

	#--- Plot progress graphs
	# Plot loss
	x_axis = np.arange(len(hist_dict['loss']))
	plt.rcParams.update({'font.size': 22})
	plt.figure(figsize = (20, 20))
	plt.plot(x_axis, hist_dict['loss'], 'k.--', linewidth = 2, markersize = 12)
	plt.plot(x_axis, hist_dict['val_loss'], 'g*--', linewidth = 2, markersize = 12)
	plt.xlabel('Loss')
	plt.ylabel('Epoch')
	plt.title('Training and Validation Loss')
	plt.xticks(rotation = 90)
	plt.legend(['training_loss', 'validation_loss'])
	plt.savefig(performance_path + 'Loss.jpg')
	plt.close()

	# Plot accuracy
	metric = 'accuracy'
	plt.rcParams.update({'font.size': 22})
	plt.figure(figsize = (20, 20))
	plt.plot(x_axis, hist_dict[metric], 'k.--', linewidth = 2, markersize = 12)
	plt.plot(x_axis, hist_dict['val_' + metric], 'g*--', linewidth = 2, markersize = 12)
	plt.xlabel('Accuracy')
	plt.ylabel('Epoch')
	plt.title('Training and Validation Accuracy')
	plt.xticks(rotation = 90)
	plt.legend(['training_' + metric, 'validation_' + metric])
	plt.savefig(performance_path + metric + '.jpg')
	plt.close()


In [8]:
def train_classifier(storage_dir, train_data, val_data):
    #--- Build model
    model = build_model()
    model.summary(show_trainable = True)
    
    #--- Freez backbone
    for layer in model.layers[:-5]:
        layer.trainable = False
    model.summary(show_trainable = True)
    
    # Train the model with data generator
    model.fit(
        train_data,
        validation_data=val_data,
        epochs=WARMUP_EPOCHS
    )
    
    #--- Unfreez some Convolutional layers of backbone for fine-tuning
    for layer in model.layers[-7:-5]:
        layer.trainable = True
    model.summary(show_trainable = True)	
    
    #--- Callbacks
    # model_path = storage_dir + 'VGG16_Classifier.weights.h5'
    model_path = storage_dir + 'VGG16_Classifier.weights.keras'
    callbacks = [
        ModelCheckpoint(model_path, monitor = "val_loss", mode = 'min', save_best_only = True, save_weights_only = False),
        EarlyStopping(monitor = "val_loss", mode = 'min', patience = EARLY_STOP_PATIENCE),
        ReduceLROnPlateau(monitor = "val_loss", mode = 'min', factor = LR_REDUCE_FACTOR, patience = LR_REDUCE_PATIENCE)
    ]


    # Train the model with data generator
    hist = model.fit(
        train_data,
        validation_data=val_data,
        epochs=EPOCHS, 
        callbacks=callbacks
    )
    
    #--- Save history
    performance_path = storage_dir + 'TrainVal_'
    save_model_performance(performance_path, hist)
    
    return hist

In [15]:
def test_classifier(storage_dir, test_data):
    #--- Load trained model
    # model = build_model()	
    # model_path = storage_dir + 'VGG16_Classifier.weights.h5'
    model_path = storage_dir + 'VGG16_Classifier.weights.keras'
    # model_weights = model.load_weights(model_path)
    model = load_model(model_path)
    
    #--- Compile model when we need metrics not mentioned while training
    model.compile(optimizer='adam', loss = 'categorical_crossentropy', metrics = ['accuracy', Precision()])
    
    #--- Predict model's output
    predictedY = np.argmax(model.predict(test_data), axis = -1)

    # print("ok")
    
    # Example: Compare predicted and true labels
    # #--- Predict and collect outputs
    # predictedY = []
    # trueY = []

    # for X_batch, y_batch in test_data:
    #     # Make predictions for the current batch
    #     batch_predictions = np.argmax(model.predict(X_batch, verbose=0), axis=-1)  # Predicted labels
        
    #     # Convert one-hot encoded true labels to integer labels
    #     if len(y_batch.shape) == 2:  # For one-hot encoded labels
    #         batch_true = np.argmax(y_batch, axis=-1)
    #     else:  # For integer labels
    #         batch_true = y_batch.flatten()

    #     predictedY.extend(batch_predictions)
    #     trueY.extend(batch_true)

    # # Convert lists to numpy arrays for easier processing
    # predictedY = np.array(predictedY)
    # trueY = np.array(trueY)

    # #--- Display comparison of actual vs predicted for a subset
    # print("\nActual vs Predicted Output (Sample):")
    # print("===================================")
    # num_samples = min(20, len(trueY))  # Display up to 20 samples
    # for i in range(num_samples):
    #     print(f"Actual: {trueY[i]:<10} Predicted: {predictedY[i]:<10}")

    print(len(test_data))
    
    # int_testY = []
    # i = 0
    # for _, y in test_data:  # Iterate over all batches in the test generator
    #     for j in range(y.shape[0]):
    #         int_testY.append(np.argmax(y[0], axis=-1))
    #     # # y_array = np.array(y)  # Ensure `y` is a numpy array
    #     # int_testY.extend(np.argmax(y, axis=-1))  # Convert to integer labels
    #     # print('ok-1')
    # # int_testY = np.concatenate(int_testY)  # Combine all batches into a single array
    # #     if i == 0:
    # #         print(y)
    # #         i += 1
    # #     else:
    # #         continue
    # #     # int_testY.append(np.argmax(y, axis=-1))  # Get true labels
    # # int_testY = np.array(int_testY)

    print('ok')
    # int_testY = np.argmax(testY, axis = -1)
    # n = predictedY.shape[0]
    
    # n = 10
    # print('Original_Y 	Predicted_Y')
    # print('========== 	===========')	
    # for i in range(n):
    #     print('{}                 {}'.format(int_testY[i], predictedY[i]))
    
    #--- Evaluate model performance
    test_metrics = model.evaluate(test_data)
    
    return test_metrics

In [16]:
#--- Create a directory to store model and figures
storage_dir = WORKING_DIR + 'datagen/' 
if (os.path.exists(storage_dir) == False):
    os.makedirs(storage_dir)
else:
    print(storage_dir + ' exists.')
    
#--- Prepare data
train_data, val_data, test_data = process_data()

#--- Train a classifier using Transfer learning
history = train_classifier(storage_dir, train_data, val_data)

#--- Test trained classifier
test_metrics = test_classifier(storage_dir, test_data)


/home/mursalin/m3c/computer-vision/task/datagen/ exists.
trainX.shape: (50000, 32, 32, 3), trainX.dtype: uint8
testX.shape: (10000, 32, 32, 3), testX.dtype: uint8
trainX.shape: (50000, 32, 32, 3), trainX.dtype: uint8
testX.shape: (10000, 32, 32, 3), testX.dtype: uint8
trainX.shape: (50000, 32, 32, 3), trainX.dtype: float32
testX.shape: (10000, 32, 32, 3), testX.dtype: float32
Train Batch X shape: (32, 32, 32, 3), y shape: (32, 10)
Validation Batch X shape: (32, 32, 32, 3), y shape: (32, 10)
Test Batch X shape: (32, 32, 32, 3), y shape: (32, 10)


  self._warn_if_super_not_called()


[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 7ms/step - accuracy: 0.4601 - loss: 2.2985 - val_accuracy: 0.6051 - val_loss: 1.1397


Epoch 1/5
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 7ms/step - accuracy: 0.6506 - loss: 1.0183 - val_accuracy: 0.6462 - val_loss: 1.0518 - learning_rate: 0.0010
Epoch 2/5
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 7ms/step - accuracy: 0.6840 - loss: 0.9130 - val_accuracy: 0.6649 - val_loss: 1.0147 - learning_rate: 0.0010
Epoch 3/5
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 6ms/step - accuracy: 0.7125 - loss: 0.8295 - val_accuracy: 0.6521 - val_loss: 1.0597 - learning_rate: 0.0010
Epoch 4/5
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 6ms/step - accuracy: 0.7346 - loss: 0.7657 - val_accuracy: 0.6589 - val_loss: 1.0407 - learning_rate: 0.0010
Epoch 5/5
[1m1500/1500[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 7ms/step - accuracy: 0.7552 - loss: 0.6949 - val_accuracy: 0.6516 - val_loss: 1.0862 - learning_rate: 0.0010


  saveable.load_own_variables(weights_store.get(inner_path))
  self._warn_if_super_not_called()


[1m187/187[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 6ms/step
187
ok
[1m187/187[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 7ms/step - accuracy: 0.6681 - loss: 0.9875 - precision_3: 0.7507
