# <center>Feature Classifier (using VGG16 or ResNet50)<center>

- This file aims to replicate the process done in [*Parkinson Disease Identification using Residual Networks and OPF* by Passos et al.](../Literature/dataset-papers/Passos_etal_22.pdf)
- Use pretrained models to extract features from dataset 
- This is done by importing pretrained model (either VGG16 or ResNet50) and excluding the top layers (fully connected layers).
- After the extracted features are caculated, can run these through as inputs through a global average pooling layer and FC layer to get classification output.


In [1]:
# import libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import warnings
warnings.filterwarnings('ignore')

import cv2
import PIL

# import ML/DL libraries
from sklearn.model_selection import train_test_split
from sklearn import utils # used to shuffle data

from keras.preprocessing.image import ImageDataGenerator # used for image augmentation


import tensorflow as tf
# used for building and training a new model
from keras import Sequential
from keras.layers import Conv2D, MaxPool2D, Flatten, Dense, Dropout, GlobalAveragePooling2D
from keras.applications import VGG16, ResNet50

# import functions from other python files
from code_files.imagePreprocessing import * 

In [2]:
import os
print(os.environ['CONDA_DEFAULT_ENV'])
!pip install pandas

ENV_ML


In [3]:
# import images (and labels) and store in dataframe
#data_path = 'datasets/folador_spiral/'  #'datasets/folador_spiral/'  
data_path = 'datasets/handPD_HT/'

trainImgs = pd.DataFrame()
testImgs  = pd.DataFrame()
trainArray = []
testArray  = []


for dataType in os.listdir(data_path):
    img_path = []
    lbl = []
    lblName = []
    for group in os.listdir(os.path.join(data_path, dataType)):
        for img in os.listdir(os.path.join(data_path, dataType, group)):
            path = os.path.join(data_path, dataType, group, img)
            img_path.append(path) 

            # convert the image and store as a matrix
            drawing = cv2.imread(path)
            drawing = cv2.resize(drawing, (256,256))

            if dataType == 'test':
                testArray.append(drawing)
            else:
                trainArray.append(drawing)

            # store the labels
            if group == 'healthy':
                lbl.append(0)
                lblName.append('healthy')
            else:
                lbl.append(1)
                lblName.append('parkinsons')

    if dataType == 'train':
        trainLbls = lbl
        trainImgs['image'] = img_path
        trainImgs['label'] = lblName
    else:
        testLbls = lbl
        testImgs['image'] = img_path
        testImgs['label'] = lblName

# shuffle the data
trainImgs, trainArray, trainLbls = utils.shuffle(trainImgs, trainArray, trainLbls)
# testImgs, testArray, testLbls = utils.shuffle(testImgs, testArray, testLbls)

# convert labels to categorical for training model
trainLbls_categorical = tf.keras.utils.to_categorical(trainLbls)
print("Lables of first 5 images: \n", trainLbls_categorical[0:5])

ValueError: Length of values (161) does not match length of index (19)

In [None]:
# display first five images
print("Lables of first train 5 images: ", trainLbls[0:5])
trainImgs.head()

In [None]:
print("Test labels: ", testLbls)
display(testImgs)

## <center> Extract features using pretrained model <center>

In [None]:
# create the convolutional base network for VGG16
# output of this will be feature vectors for each image
VGG16_conv_base = VGG16(weights='imagenet', include_top=False,input_shape=(256,256,3)) # setting include_top=False removes the fully connected layers of the model
VGG16_conv_base.summary()

In [None]:
# define a function that will extract the features from conv network
def extract_features(imgs, num_imgs):
    datagen = ImageDataGenerator(rescale=1./255) # define to rescale pixels in image
    batch_size = 10
    
    features = np.zeros(shape=(num_imgs, 8,8,512)) # shape equal to output of convolutional base
    lbls = np.zeros(shape=(num_imgs,2))

    # preprocess data
    generator = datagen.flow_from_dataframe(imgs, x_col = 'image', y_col='label', target_size=(256,256), class_mode='categorical', batch_size=batch_size)

    # Pass data through convolutional base
    i = 0
    for inputs_batch, labels_batch in generator:
        features_batch = VGG16_conv_base.predict(inputs_batch)
        features[i * batch_size: (i + 1) * batch_size] = features_batch
        lbls[i * batch_size: (i + 1) * batch_size] = labels_batch
        i += 1
        if i * batch_size >= num_imgs:
            break
    return features, lbls

In [16]:
# extract features for both the trainImgs and testImgs
#train_feat, train_lbls = extract_features(trainImgs, 240)
test_feat, test_lbls = extract_features(testImgs, 24)

Found 12 validated image filenames belonging to 2 classes.


ValueError: could not broadcast input array from shape (2,8,8,512) into shape (10,8,8,512)

In [14]:
var1 = train_feat; var2 = train_lbls
print(type(var1), type(var2))
print(np.shape(var1), np.shape(var2))

<class 'numpy.ndarray'> <class 'numpy.ndarray'>
(240, 8, 8, 512) (240, 2)


In [None]:
# evaluate on VGG16 classifier (using cross validation)
# define a function that will fit the model
def defineModel(size): # size is the dimension of the last layer in the pretrained model
    model = Sequential()
    model.add(GlobalAveragePooling2D(input_shape=(size,size,512)))
    # global average pooling is used instead of fully connected layers on top of the feature maps
    # it takes the average of each feature map and the resulting layer is fed directly into the softmax layer
    model.add(Dense(2, activation='softmax'))
    # model.summary()

    opt = tf.keras.optimizers.Adam(learning_rate=1e-3)  # use the Adam optimizer and set an effective learning rate 
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
model = defineModel(8)
model.summary()

In [None]:
# train the model using cross validation
# will start with k-fold cross validation, taking 80% as training each fold

# define model checkpoint callback
model_chkpt = tf.keras.callbacks.ModelCheckpoint('20221007_VGG16_kfold_folDS_spiral_2.h5', verbose=0, save_best_only=True)

def fit_and_evaluate(train_feat, train_lbls, val_feat, val_lbls, test_feat, test_lbls, epochs):
    model = None
    model = defineModel(8)
    trained_model = model.fit(train_feat, train_lbls, batch_size=10, epochs=epochs, validation_data=(val_feat, val_lbls), callbacks=model_chkpt, verbose=0)

    # testScore = model.evaluate(test_feat, test_lbls)
    return trained_model

In [None]:
# train with k-fold validation
model_history = []
epochs = 250

num_val_samples = int(np.ceil(len(trainArray) * 0.20))
k = int(np.floor(len(trainArray) / num_val_samples))

for i in range(k):
    print("Training on fold K = ", i+1)
    startPt = i * num_val_samples
    endPt   = (i+1) * num_val_samples

    val_x = train_feat[startPt:endPt]
    val_y = train_lbls[startPt:endPt]
    train_x = np.delete(train_feat, np.linspace(startPt, endPt-1, num_val_samples).astype(np.int), axis=0)
    train_y = np.delete(train_lbls, np.linspace(startPt, endPt-1, num_val_samples).astype(np.int), axis=0)

    model_history.append(fit_and_evaluate(train_x, train_y, val_x, val_y, test_feat, test_lbls, epochs=epochs))
    # print(model_history)
    
    print("======="*12, end="\n")
    

In [None]:
# plot the accuracy and loss functions for each fold
color = ['blue', 'black', 'red', 'green','orange', 'cyan', 'grey', 'yellow', 'fuchsia']
f, ax = plt.subplots(2, k, figsize=(35,6))
for i in range(k):
    ax[0][i].plot(model_history[i].history['accuracy'], label='train acc', color=color[i])
    ax[0][i].plot(model_history[i].history['val_accuracy'], label='val acc', linestyle= ':', color=color[i])
    ax[0][i].axis([-10,epochs, .2, 1.1])
    ax[0][i].legend()

    subplot_title = 'k = ' + str(i+1)
    ax[0][i].title.set_text(subplot_title)

for i in range(k):
    ax[1][i].plot(model_history[i].history['loss'], label='train loss', color=color[i])
    ax[1][i].plot(model_history[i].history['val_loss'], label='val loss', linestyle= ':', color=color[i])
    ax[1][i].axis([-10,epochs, .0, 1.1])
    ax[1][i].legend()


In [None]:
# ---------------------------------------------------------------------------------------
#                             LOAD PRE-EXISTING MODEL MODEL
# ---------------------------------------------------------------------------------------
def importModel(filename, testAug, testAugLabel):
    modelPath = 'savedModels/saved_h5_models/' + filename
    testModel = tf.keras.models.load_model(modelPath)

    loss, acc = testModel.evaluate(np.array(testAug), testAugLabel, verbose=2)
    print("Loss: ", loss, "| Accuracy: ", acc)

    return testModel

# load existing model and evaluate the test data
testmodel = importModel('20221007_VGG16_kfold_handPD.h5', test_feat, test_lbls)

In [None]:
def plotMisclassImgs(testModel, test_feat, test_label, test_array):
    test_label = np.array(test_label)
    incorrectImgs = []
    incorrectImgIdx = []

    count = 0
    fig, axes = plt.subplots(3, 8, figsize=(20,8))
    axes = axes.flatten()
    for img, ax in zip(test_array, axes):
        ax.imshow(np.squeeze(img), cmap="gray") # plot image

        # use the model to predict the label
        predImg = testModel.predict(np.expand_dims(test_feat[count], axis=0), verbose=0) # use for grayscale
        # predImg = testModel.predict(test_feat[count])                       # use for RGB
        predLabel = np.argmax(predImg[0])       
        
        if test_label[count] != predLabel:
            ax.set_title('Label: ' + str(test_label[count]) + ' | Pred: ' + str(predLabel), color='red')
            # save off image to array
            incorrectImgs.append(test_array[count])
            incorrectImgIdx.append(count)
        else:
            ax.set_title('Label: ' + str(test_label[count]) + ' | Pred: ' + str(predLabel), color = 'blue')  

        count = count + 1
        
    plt.tight_layout()
    plt.show()

    return np.array(incorrectImgs), np.array(incorrectImgIdx), testModel.predict(test_feat)

In [None]:
# plot the results
misClass_test, misClass_idx, predictions = plotMisclassImgs(testmodel, test_feat, np.argmax(test_lbls, axis=1), testArray)
# print(predictions)

In [None]:
# -----------------------------------------
# -----------------------------------------
# -----------------------------------------
# -----------------------------------------
# other option for displaying accuracy and loss - plot them all overlaid
plt.figure(figsize=(10,10))
plt.title('Accuracies vs Epochs')
for i in range(k):
    plt.plot(model_history[i].history['accuracy'], label=i, color=color[i])
    plt.plot(model_history[i].history['val_accuracy'], label=i, linestyle= 'dashdot', color=color[i])
plt.legend()
plt.show()

plt.figure(figsize=(15,15))
plt.title('Loss vs Epochs')
for i in range(k):
    plt.plot(model_history[i].history['loss'], label=i, color=color[i])
    plt.plot(model_history[i].history['val_loss'], label=i, linestyle= 'dashdot', color=color[i])
plt.legend()
plt.show()
