<a href="https://colab.research.google.com/github/robert-pineau/CIND-860-Capstone/blob/main/CIND860_TL_RESNET50_full_augment_cc_only_evaluate_only_fullset_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

CIND860: W24: Advanced Data Analytics Project
Robert M. Pineau
941-049-371

In [7]:
import sys
import os
import glob
import re
from google.colab import drive
import numpy as np
import math
import random
os.environ['CUDA_VISIBLE_DEVICES'] = "0"
import tensorflow as tf
import keras
from keras import models

device_names = tf.config.list_physical_devices('GPU')
print(device_names)

if tf.test.gpu_device_name() != "/device:GPU:0":
  print("raise SystemError('GPU device not found')")
print('Found GPU at: {}'.format(device_names))


[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Found GPU at: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [8]:
#Data Generator, this is used to provide to the training(fit) method
#one batch of data at a time.  This is needed because all of the data cannot fit into memory at the same
#time, so it needs to be loaded by the batch.
#
#This uses a custom class for this, with expected standard methods __init__, __len__, and __getitem__
#The __init__ method simply sets some internal values us batch size, overall lengths, etc.
#
#The __len__ method returns the number of batches.
#The __getitem__  method loads from file the appropriatte batch: X(the image), and Y(the class variable) and
#returns it to the fit method.
class DataGenerator(keras.utils.Sequence):
  def __init__(self, cnn_use, numpy_dir, total_size, batch_size=12):
    self.cnn_use = cnn_use
    self.total_size = total_size
    self.batch_size = batch_size
    self.numpy_dir = numpy_dir

  def __len__(self):
    return int(self.total_size / self.batch_size)

  def __getitem__(self, subset_index):
    X = np.load(f"{self.numpy_dir}/{self.cnn_use}_data_X_{subset_index}.npy")
    Y = np.load(f"{self.numpy_dir}/{self.cnn_use}_data_Y_{subset_index}.npy")
    return X, Y


#This method work very similarly to __getitem__ in the dataGenerator of the training script.
#but is standalone, not part of the DataGenerator Class.  It is used to be
#able to extract the test and validate data itself in order to perform
#manual checks on the algorithms performance(using predict, instead of evaluate)

def get_validate_X_Y_Z(numpy_dir, subset_index):
    X = np.load(f"{numpy_dir}/validate_data_X_{subset_index}.npy")
    Y = np.load(f"{numpy_dir}/validate_data_Y_{subset_index}.npy")
    Z = np.load(f"{numpy_dir}/validate_data_Z_{subset_index}.npy")
    return X, Y, Z

def get_test_X_Y_Z(numpy_dir, subset_index):
    X = np.load(f"{numpy_dir}/test_data_X_{subset_index}.npy")
    Y = np.load(f"{numpy_dir}/test_data_Y_{subset_index}.npy")
    Z = np.load(f"{numpy_dir}/test_data_Z_{subset_index}.npy")
    return X, Y, Z


In [9]:
#Previously all training, validate, and test data was loaded from image files and saved in numpy format in batches of 12.
#This grabs those numpy files from my google drive and copies them to the runtime drive for this session.
#Do it this way the "dataGenerator" is able to load the data from the local machine to save time during training, validation, and testing.
!date
drive.mount('/content/drive',force_remount=True)
my_dir = "attempt9"
remote_image_dir = f"/content/drive/MyDrive/Colab Notebooks/{my_dir}"
remote_numpy_dir = f"{remote_image_dir}"
glob_string = f"{remote_numpy_dir}/*.npy"
local_numpy_dir = "/tmp/rpineau_numpy"
os.system(f"mkdir {local_numpy_dir}")

os.system(f"cd \"{remote_numpy_dir}\" && cp full_test.tgz {local_numpy_dir}/test.tgz")
#os.system(f"cd \"{remote_numpy_dir}\" && cp validate.tgz {local_numpy_dir}/validate.tgz")
os.system(f"cd \"{remote_numpy_dir}\" && cp keras_cnn_model_RESNET50.keras {local_numpy_dir}/keras_cnn_model.keras")
os.system(f"cd \"{local_numpy_dir}\" && /usr/bin/tar -zxpvf test.tgz")
#os.system(f"cd \"{local_numpy_dir}\" && /usr/bin/tar -zxpvf validate.tgz")

!date

Fri Apr 12 04:22:46 PM UTC 2024
Mounted at /content/drive
Fri Apr 12 04:34:16 PM UTC 2024


In [10]:
#Need to count the entries contained in all the numpy files.
#Z contains the original image_id.
#Since Z is easy to load, only count Z.

these_counts = {"train":0,"validate":0,"test":0}

#for i in ["train","validate","test"]:
for i in ["test"]:
  glob_string = f"{local_numpy_dir}/{i}_data_Z_*.npy"
  np_list = glob.glob(os.path.join("", glob_string))
  for n in np_list:
    thisZ = np.load(n)
    these_counts[i] += len(thisZ)

print(f"{these_counts}")

{'train': 0, 'validate': 0, 'test': 54036}


In [11]:
#Initialize the test  DataGenerator(as explained above in the class definition)
test_generator = DataGenerator("test",local_numpy_dir,these_counts["test"],batch_size=12)
#validate_generator = DataGenerator("validate",local_numpy_dir,these_counts["validate"],batch_size=12)

In [12]:
model_file = f"{local_numpy_dir}/keras_cnn_model.keras"
model = keras.models.load_model(model_file)

In [13]:
#val_loss,val_acc = model.evaluate(validate_generator)
#print(f"MODEL1: Val Loss: {np.round(val_loss,3)} Val Accuracy: {np.round(val_acc*100,3)}%")

test_loss,test_acc = model.evaluate(test_generator)
print(f"MODEL1: Test Loss: {np.round(test_loss,3)} Test Accuracy: {np.round(test_acc*100,3)}%")

MODEL1: Test Loss: 0.711 Test Accuracy: 65.256%


In [None]:
this_dir = f"{local_numpy_dir}"

Y_test_all = []
Y_all = []
Z_all = []

for i in range(0,int(these_counts["test"]/12)):
  X,Y,Z = get_test_X_Y_Z(this_dir,i)
  Y_test = model.predict(X, verbose=0)
  Y_test = np.round(Y_test).flatten()
  Y_test_all = np.append(Y_test_all,Y_test,axis=0)
  Y_all = np.append(Y_all,Y,axis=0)
  Z_all = np.append(Z_all,Z,axis=0)

n=0
for y in Y_test_all:
  print(f"Predicted Y is {Y_test_all[n]} Actual Y is {Y_all[n]} from file {Z_all[n]}")
  n = n+1

In [15]:
print("CORRECT PREDS")
n=0
for y in Y_all:
  if Y_test_all[n] == Y_all[n]:
    #pass
    print(f"Predicted Y is {Y_test_all[n]} Actual Y is {Y_all[n]} File {Z_all[n]}")
  n = n+1

print("INCORRECT PREDS")
n=0
for y in Y_all:
  if Y_test_all[n] != Y_all[n]:
    #pass
    print(f"Predicted Y is {Y_test_all[n]} Actual Y is {Y_all[n]} File {Z_all[n]}")
  n = n+1


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/1986957375.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/1813226588.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/17065562.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/1782837244.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/1986667926.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/1110971857.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/527468376.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/910866531.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/1721908966.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/1862605458.png
Predicted Y is 0.0 Actual Y is 0.0 File /mnt/wd/CIND860/database/test6/81919121

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [None]:
def calc_confusion_values(Y,Y_test):
  TP = 0
  TN = 0
  FP = 0
  FN = 0
  n = 0
  for y in Y:
    if Y[n] == 1 and Y_test[n] == 1:
      TP = TP+1
    elif Y[n] == 0 and Y_test[n] == 1:
      FP = FP+1
    elif Y[n] == 0 and Y_test[n] == 0:
      TN = TN+1
    elif Y[n] == 1 and Y_test[n] == 0:
      FN = FN+1
    n = n+1

  cf = np.empty([2,2])
  cf[0,0] = TN
  cf[0,1] = FP
  cf[1,0] = FN
  cf[1,1] = TP
  return(cf)


#        TPR (recall) = TP / P
#        ACC (accuracy) = (TP + TN) / (P + N)
#        PPV (precison) = TP / (TP + FP)
def calc_recall(cf):
  TN, FP, FN, TP=cf.ravel()
  return(TP/(TP+FN))

def calc_precision(cf):
  TN, FP, FN, TP=cf.ravel()
  return(TP/(TP+FP))

def calc_accuracy(cf):
  TN, FP, FN, TP=cf.ravel()
  return((TP+TN)/(TP+FP+TN+FN))

In [17]:
cf=calc_confusion_values(Y_all, Y_test_all)
TN, FP, FN, TP=cf.ravel()

print("Confusion Matrix:")
print(cf)
print("")
print("TP: ", TP,", FP: ", FP,", TN: ", TN,", FN:", FN)

recall = round(calc_recall(cf),3)
precision = round(calc_precision(cf),3)
accuracy = round(calc_accuracy(cf)*100,3)

print(f"Recall: {recall} Precison: {precision} Accuracy: {accuracy}%")


Confusion Matrix:
[[34888. 18323.]
 [  451.   374.]]

TP:  374.0 , FP:  18323.0 , TN:  34888.0 , FN: 451.0
Recall: 0.453 Precison: 0.02 Accuracy: 65.256%
