<a href="https://colab.research.google.com/github/jett220201/AI_Project/blob/main/Models/ResNet50V2_Fine_tunning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Solid waste sorter (PUJ) using ResNet50V2

- Juan Esteban Torres Tamayo
- Janner Arley Rosero Mora


**Import libraries**


In [None]:
%matplotlib notebook
#Only activate if you're using Jupyter Notebook

In [None]:
import os
import cv2
import glob
import random
import sklearn
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow.keras as kr
import matplotlib.pyplot as plt
from sklearn.metrics import auc
from sklearn.metrics import roc_curve
from keras.models import Model, Sequential
from matplotlib.collections import EventCollection
from sklearn.model_selection import train_test_split
from keras.layers import Dense, Flatten, Conv2D, MaxPooling2D


#Load and split dataset

In [None]:
# Read input images and assign labels based on folder names
print(os.listdir("data/"))

Control parameters

In [None]:
Seed_0 = random.randint( 1,2**32 - 1 )
N_epochs = 20
Batch_size = 64
LR = 0.001
ImageSize = 224  #Size in pixels 
Shuffle = True

In [None]:
#Capture training data and labels into respective lists
images = []
labels = [] 

for directory_path in glob.glob("data/*"):
    label = directory_path.split("\\")[-1]
    print(label)
    for img_path in glob.glob(os.path.join(directory_path, "*.jpg")):
        img = cv2.imread(img_path, cv2.IMREAD_COLOR)       
        img = cv2.resize(img, (ImageSize, ImageSize))
        images.append(img)
        labels.append(label)

#Convert lists to arrays        
images = np.array(images)
labels = np.array(labels)
  

In [None]:
#Spliting Dataset
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)

#Encode labels from text to integers.
from sklearn import preprocessing
le = preprocessing.LabelEncoder()
le.fit(y_test)
test_labels_encoded = le.transform(y_test)
le.fit(y_train)
train_labels_encoded = le.transform(y_train)
#print(test_labels_encoded)
print(train_labels_encoded)
len(train_labels_encoded)

Y_train = train_labels_encoded
Y_test = test_labels_encoded

#0 = Aprovechable
#1 = No_Aprovechable

#Import ResNetV2 and train model

In [None]:
# Import ResNet V2 and freeze all the layers
ResNetV2 = hub.KerasLayer("https://tfhub.dev/google/imagenet/resnet_v2_50/feature_vector/5", 
           trainable=False, 
           arguments=dict(batch_norm_momentum=0.997))

ResNetV2.build([None, ImageSize, ImageSize, 3])

In [None]:
# Generating path to save weights per epoch 
checkpoint_path = "training_1/cp-{epoch:d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Create a callback that saves the model's weights
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath = checkpoint_path, 
    verbose = 1, 
    save_weights_only = True,
    save_freq = 'epoch')

In [None]:
# Create a new model and use ResNetV2 with fine-tuning method
model = kr.Sequential([
    
    kr.layers.Rescaling( 1./255, input_shape = ( ImageSize, ImageSize, 3 ) ),
    ResNetV2,
    kr.layers.Dense( units = 256, activation = 'relu' ),
    kr.layers.BatchNormalization(),
    kr.layers.Dense( units = 1, activation = 'sigmoid' )
])

model.summary() #Summary of the architecture

# Compile model with Adam as optimizer an BinaryCrossEntropy as loss function
model.compile(optimizer = tf.keras.optimizers.Adam( learning_rate = LR ),
              loss = 'binary_crossentropy',
              metrics = ['accuracy', 'AUC', tf.keras.metrics.TruePositives(), tf.keras.metrics.TrueNegatives(), tf.keras.metrics.FalsePositives(), tf.keras.metrics.FalseNegatives()])

# Train model with callbacks
history = model.fit(x = X_train,
                    y = Y_train,
                    batch_size = Batch_size,
                    epochs = N_epochs,
                    verbose = 1,
                    shuffle = Shuffle,
                    validation_data = (X_test, Y_test),
                    callbacks=[cp_callback])


#Calculate Confusion Matrix, Specificity, Recall, MCC, F1 Score, AUC and ROC curve



In [None]:
history.history #Show elements of history  

In [None]:
# Get values of TN, FP, FN, TP for the confusion matrix
FN = history.history['val_false_negatives_4'] # For each compile the number plus by 
TN = history.history['val_true_negatives_4']  # 1 -> false_negatives_1,2,3....
FP = history.history['val_false_positives_4']
TP = history.history['val_true_positives_4']
print('-Values of Confusion Matrix-')
print('False Negatives: ', FN)
print('True Negatives: ', TN)
print('False Positives: ', FP)
print('True Positives: ', TP,'\n')

# Show Confusion Matrix
print('-Confusion Matrix-')
for i in range(N_epochs):
    Matrix = np.zeros((2,2))
    Matrix[0][0] = TN[i]
    Matrix[0][1] = FP[i]
    Matrix[1][0] = FN[i]
    Matrix[1][1] = TP[i]
    print(Matrix,'\n')


#Calculate F1 Score
F1 = []
for i in range(N_epochs):
    try:
        temp = ( ( 2*TP[i] ) / ( 2*TP[i] + FP[i] + FN[i] ) )
    except:
        temp = 0
    F1.append(temp)
print('-Result for F1 Score-')
print(F1,'\n')


#Calculate MCC
MCC = []
for i in range(N_epochs):
    try:
        temp = ( (TP[i]*TN[i]) - (FP[i]*FN[i]) ) / ( np.sqrt( (TP[i]+FP[i])*(TP[i]+FN[i])*(TN[i]+FP[i])*(TN[i]+FN[i]) ) )
    except:
        temp = 0
    MCC.append(temp)
print('-Result for MCC-')
print(MCC,'\n')


#Calculate Recall - Validation
Recall = []
for i in range(N_epochs):
    try:
        temp = ( TP[i] / ( TP[i] + FN[i] ) )
    except:
        temp = 0
    Recall.append(temp)
print('-Result for Recall-')
print(Recall,'\n')


#Calculate Specificity - Validation
Specificity = []
for i in range(N_epochs):
    try:
        temp = ( TN[i] / ( TN[i] + FP[i] )  )
    except:
        temp = 0
    Specificity.append(temp)
print('-Result for Specificity-')
print(Specificity,'\n')

In [None]:
#Show Validation metrics
xdata = [ x for x in range(N_epochs) ]
fig1 = plt.figure('Metrics')
plt.plot(history.history['val_accuracy'].index(max(history.history['val_accuracy'])), max(history.history['val_accuracy']), marker = 'o', markerfacecolor = 'red', markersize = 12)
plt.plot(xdata, Recall, color = 'c', linewidth = 2,linestyle='dashed')
plt.plot(xdata, Specificity, color = 'm',linewidth = 2,linestyle='dashed')
plt.plot(xdata, MCC, color = 'b',linewidth = 2.5)
plt.plot(xdata, F1, color = 'r',linewidth = 2.5)
plt.plot(xdata, history.history['val_auc'], color = 'tab:orange',linewidth = 2.5)
plt.plot(xdata, history.history['val_accuracy'], color = 'tab:olive',linewidth = 2.5)
plt.legend(['Best score','Recall','Specificity','MCC','F1','AUC','Accuracy'])
plt.title('Validation metrics')
plt.xlabel('Epochs')
plt.ylim(0, 1)

In [None]:
#Show accuracy metrics
fig2 = plt.figure('Accuracy')
plt.plot(history.history['val_accuracy'].index(max(history.history['val_accuracy'])), max(history.history['val_accuracy']), marker = 'o', markerfacecolor = 'red', markersize = 12)
plt.plot(xdata, history.history['accuracy'], color = 'tab:green',linewidth = 2.5)
plt.plot(xdata, history.history['val_accuracy'], color = 'tab:olive',linewidth = 2.5)
plt.legend(['Best Score','Accuracy','val_accuracy'])
plt.title('Accuracy vs val_accuracy')
plt.xlabel('Epochs')
plt.ylim(0, 1)
plt.show()

In [None]:
print("Best ACC_val = ", max(history.history['val_accuracy']))
print( "Epoch = ",history.history['val_accuracy'].index(max(history.history['val_accuracy'])) + 1 )

In [None]:
#Calculating AUC - ROC curve
Y_tf = model.predict(X_test).ravel()
fpr_keras, tpr_keras, thresholds_keras = roc_curve(Y_test, Y_tf)
auc_keras = auc(fpr_keras, tpr_keras)

In [None]:
#Show ROC-AUC Curve
plt.figure("ROC-AUC Curve")
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_keras, tpr_keras, label='ResNet 50 V2 (area = {:.3f})'.format(auc_keras), color = 'tab:orange')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()

# Zoom in view of the upper left corner.
plt.figure('Zoomed / Recovered model')
plt.xlim(0, 0.4)
plt.ylim(0.6, 1)
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_keras, tpr_keras, label='ResNet 50 V2 (area = {:.3f})'.format(auc_keras),color = 'tab:orange')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve (zoomed in at top left)')
plt.legend(loc='best')

In [None]:
print('=======================================')
print("|            Best Scores               |")
print('=======================================\n')
print('        Epoch :        {}'.format(1 + history.history['val_accuracy'].index(max(history.history['val_accuracy']))))
print('        Val_Accuracy : {:.4f}'.format(history.history['val_accuracy'][history.history['val_accuracy'].index(max(history.history['val_accuracy']))]))
print('        Recall :       {:.4f}'.format(Recall[history.history['val_accuracy'].index(max(history.history['val_accuracy']))]))
print('        Specificity :  {:.4f}'.format(Specificity[history.history['val_accuracy'].index(max(history.history['val_accuracy']))]))
print('        AUC :          {:.4f}'.format(history.history['val_auc'][history.history['val_accuracy'].index(max(history.history['val_accuracy']))]))
print('        F1 Score :     {:.4f}'.format(F1[history.history['val_accuracy'].index(max(history.history['val_accuracy']))]))
print('        MCC :          {:.4f}'.format(MCC[history.history['val_accuracy'].index(max(history.history['val_accuracy']))]))
print('        Seed:         ',Seed_0)

#Printing best result of training 

|            Best Scores               |

        Epoch :        6
        Val_Accuracy : 0.8710
        Recall :       0.8350
        Specificity :  0.9086
        AUC :          0.9187
        F1 Score :     0.8687
        MCC :          0.7446
        Seed:          4987606742


#Export model for storage


In [None]:
#Save model 
model.save('Model_1/ResNet50V2.h5')


#Load model

In [None]:
#Load the saved model again
new_model = tf.keras.models.load_model( 'Model_1/ResNet50V2.h5', custom_objects={'KerasLayer':hub.KerasLayer}, compile=True, options=None )

#Show the model architecture
new_model.summary()

In [None]:
#Setting Best weights saved
BestWeights = 'training_1/cp-{}.ckpt'.format(history.history['val_accuracy'].index(max(history.history['val_accuracy'])) + 1 )
#Loading Best weights saved on new_model
model.load_weights(BestWeights)

In [None]:
#Verify metrics
# Evaluate the restored model
loss, acc, aucscore, tp, tn, fp, fn = model.evaluate(X_test, Y_test, verbose=2)
print('Restored model, accuracy: {:5.2f}%'.format(100 * acc))


In [None]:
#Calculating AUC - ROC curve / Restored model
Y_tf = model.predict(X_test).ravel()
fpr_keras, tpr_keras, thresholds_keras = roc_curve(Y_test, Y_tf)
auc_keras = auc(fpr_keras, tpr_keras)



In [None]:
#Show ROC-AUC Curve - Recovered model
plt.figure("ROC-AUC Curve - Recovered model")
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_keras, tpr_keras, label='ResNet 50 V2 Restored model (area = {:.3f})'.format(auc_keras), color = 'tab:orange')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()

# Zoom in view of the upper left corner.
plt.figure('Zoomed')
plt.xlim(0, 0.4)
plt.ylim(0.6, 1)
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_keras, tpr_keras, label='ResNet 50 V2 Restored model(area = {:.3f})'.format(auc_keras),color = 'tab:orange')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve (zoomed in at top left)')
plt.legend(loc='best')

In [None]:
#Show predicted vs real labels
index = 0
for i in Y_tf:
    if i >= 0.5:
        Y_tf[index] = int(1)
    else:
        Y_tf[index] = int(0)
    index += 1
print("                           *** Real Labels ***\n")
print(Y_test,'\n')
print("                         *** Predicted Labels ***\n")
print( Y_tf)

In [None]:
#Validate real ACC of restored model
index = 0
tp_val = 0
for c in Y_test:
    if Y_test[index] == Y_tf[index]:
        tp_val += 1
    index += 1
print('  Manual ACC_Val:   {:.2f}%'.format(100*tp_val/len(Y_test)))