In [69]:
# This script explores CNN model performance based on training image resolution.
# We test the following training image sizes:
# 64 x 64, 128 x 128, 224 x 224, 384 x 384

import numpy as np
from timeit import default_timer as timer
import matplotlib.pyplot as plt
import pydot
import random

import tensorflow as tf
from tensorflow.keras import datasets, layers, models
from tensorflow.keras.optimizers import SGD
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import Callback

        
from sklearn.metrics import recall_score, classification_report
from sklearn.datasets import make_classification

import json
import pickle
import os
import sys
sys.path.append("../python/")
from helpers import *

In [2]:
# Preprocessed image sets: one .npy file per (resolution, scenario) combination.
# NOTE(review): allow_pickle=True makes np.load unpickle arbitrary Python objects,
# so these files must come from a trusted source.
def _load_preprocessed(size, scenario):
    """Load the preprocessed image array stored for one resolution/scenario pair."""
    return np.load(
        f'../../data/tidy/preprocessed_images/size{size}_exp5_{scenario}.npy',
        allow_pickle=True,
    )

# 64 x 64
images_size64_exp5_Pr_Po_Im = _load_preprocessed(64, 'Pr_Po_Im')
images_size64_exp5_Pr_Im = _load_preprocessed(64, 'Pr_Im')
images_size64_exp5_PrPo_Im = _load_preprocessed(64, 'PrPo_Im')
images_size64_exp5_Pr_PoIm = _load_preprocessed(64, 'Pr_PoIm')

# 128 x 128
images_size128_exp5_Pr_Po_Im = _load_preprocessed(128, 'Pr_Po_Im')
images_size128_exp5_Pr_Im = _load_preprocessed(128, 'Pr_Im')
images_size128_exp5_PrPo_Im = _load_preprocessed(128, 'PrPo_Im')
images_size128_exp5_Pr_PoIm = _load_preprocessed(128, 'Pr_PoIm')

# 224 x 224
images_size224_exp5_Pr_Po_Im = _load_preprocessed(224, 'Pr_Po_Im')
images_size224_exp5_Pr_Im = _load_preprocessed(224, 'Pr_Im')
images_size224_exp5_PrPo_Im = _load_preprocessed(224, 'PrPo_Im')
images_size224_exp5_Pr_PoIm = _load_preprocessed(224, 'Pr_PoIm')

# 384 x 384
images_size384_exp5_Pr_Po_Im = _load_preprocessed(384, 'Pr_Po_Im')
images_size384_exp5_Pr_Im = _load_preprocessed(384, 'Pr_Im')
images_size384_exp5_PrPo_Im = _load_preprocessed(384, 'PrPo_Im')
images_size384_exp5_Pr_PoIm = _load_preprocessed(384, 'Pr_PoIm')

In [50]:
# Experiment-wide configuration.
# The number of output classes is not a constant: constructBaseCNN picks it per scenario.
NUM_CHANNELS = 1  # channel count handed to getImageShape and used in the model input shape
SCENARIO_LIST = [
    "Pr_Po_Im",
    "Pr_Im",
    "PrPo_Im",
    "Pr_PoIm",
]
RESOLUTION_PERFORMANCE_METRICS_DIR = '../../results/resolution-tests'  # main() writes per-trial result files here

In [6]:
# Query the image shape of every resolution, using the "Pr_Im" image set of each size.
# getImageShape comes from helpers.py; presumably it also prints the shape it returns
# (see the cell output) - confirm against the helper.
(
    input_image_shape_64,
    input_image_shape_128,
    input_image_shape_224,
    input_image_shape_384,
) = [
    getImageShape(image_set, num_channels = NUM_CHANNELS)
    for image_set in (
        images_size64_exp5_Pr_Im,
        images_size128_exp5_Pr_Im,
        images_size224_exp5_Pr_Im,
        images_size384_exp5_Pr_Im,
    )
]

(64, 64, 1)
(128, 128, 1)
(224, 224, 1)
(384, 384, 1)


In [44]:
def getOptCNNParams(image_size):
    """Read the stored hyperparameters of every scenario for one image size.

    For each scenario name in SCENARIO_LIST, the JSON document at
    ``../../results/models/<image_size>/<scenario>/attributes/hyperparameters.txt``
    is parsed.

    Args:
        image_size: Converted with ``str`` and used as the resolution directory name.

    Returns:
        dict mapping every scenario name to the parsed JSON content of its file.
    """
    def _read_hyperparameters(scenario_name):
        # One hyperparameter file per (image size, scenario) pair.
        params_path = f'../../results/models/{image_size!s}/{scenario_name}/attributes/hyperparameters.txt'
        with open(params_path) as params_file:
            return json.load(params_file)

    return {scenario_name: _read_hyperparameters(scenario_name) for scenario_name in SCENARIO_LIST}

In [45]:
# Stored hyperparameters are loaded for the 64 px and 128 px models only.
opt_params_64, opt_params_128 = getOptCNNParams(64), getOptCNNParams(128)

In [46]:
# Can either load a previously saved model or define here.
# model = models.load_model('../../results/models/MODEL_NAME')
def constructBaseCNN(image_shape, scenario, opt_params_dict):
    """Assemble the (uncompiled) baseline CNN for one scenario.

    Architecture: a strided 64-filter convolution, then two pairs of 3x3
    convolutions (128 and 256 filters), each stage followed by max pooling
    with pool size 2; then two fully connected layers with dropout (batch
    normalization after the first one only) and a softmax output layer.

    Args:
        image_shape: Passed as ``input_shape`` to the first convolution.
        scenario: Key into ``opt_params_dict``. "Pr_Po_Im" yields 3 output
            classes; every other value yields 2.
        opt_params_dict: Mapping scenario -> hyperparameter dict providing the
            keys 'kernel_size', 'units', 'activation', 'dropout' and
            'learning_rate'.

    Returns:
        Tuple ``(model, learning_rate)``: the Sequential model and the
        learning rate stored for the scenario.
    """
    hyperparams = opt_params_dict[scenario]
    num_classes = 3 if scenario == "Pr_Po_Im" else 2

    # Convolutional feature extractor.
    conv_stack = [
        layers.Conv2D(
            filters = 64,
            kernel_size = hyperparams['kernel_size'],
            strides = 2,
            activation = "relu",
            padding = "same",
            input_shape = image_shape,
        ),
        layers.MaxPooling2D(2),
        layers.Conv2D(128, 3, activation = "relu", padding = "same"),
        layers.Conv2D(128, 3, activation = "relu", padding = "same"),
        layers.MaxPooling2D(2),
        layers.Conv2D(256, 3, activation = "relu", padding = "same"),
        layers.Conv2D(256, 3, activation = "relu", padding = "same"),
        layers.MaxPooling2D(2),
        layers.Flatten(),
    ]

    # Fully connected classifier; width, activation and dropout come from the stored hyperparameters.
    dense_head = [
        layers.Dense(hyperparams['units'], activation = hyperparams['activation']),
        layers.BatchNormalization(),
        layers.Dropout(hyperparams['dropout']),
        layers.Dense(hyperparams['units'], activation = hyperparams['activation']),
        layers.Dropout(hyperparams['dropout']),
        layers.Dense(num_classes, activation = "softmax"),
    ]

    base_model = models.Sequential(conv_stack + dense_head)
    return base_model, hyperparams['learning_rate']

In [66]:
# The resolution test will be conducted scenario by scenario.
def testResolutionPerformance(scenario, opt_params = opt_params_64, num_epochs = 10, trial_seed = 1):
    """Train the base CNN on one scenario at every image resolution.

    For each resolution the image set is split 80/20 with ``splitData``, a
    fresh model is built with ``constructBaseCNN``, compiled with an Adam
    optimizer that uses the scenario's stored learning rate, and trained for
    ``num_epochs`` epochs with the held-out 20% as validation data.

    Args:
        scenario: One of "Pr_Po_Im", "Pr_Im", "PrPo_Im", "Pr_PoIm".
        opt_params: Mapping scenario -> hyperparameter dict, as returned by
            ``getOptCNNParams``.
        num_epochs: Number of training epochs per resolution.
        trial_seed: Forwarded to ``splitData`` as ``seed_num``.

    Returns:
        dict keyed by resolution (64, 128, 224, 384). Each value is a dict
        with the keys 'scenario', 'image_size', 'metrics' (the History object
        returned by ``model.fit``) and 'best_val_acc' (maximum of the
        per-epoch validation accuracy).

    Raises:
        ValueError: If ``scenario`` is not one of the known scenario names.
    """
    resolution_keys = [64, 128, 224, 384]
    # Image sets per scenario, ordered to line up with resolution_keys.
    # "Pr_Im" previously listed only the 64 px set (the rest was commented out),
    # which silently left the other resolutions as None in the result.
    imagesets_by_scenario = {
        "Pr_Po_Im": [images_size64_exp5_Pr_Po_Im, images_size128_exp5_Pr_Po_Im, images_size224_exp5_Pr_Po_Im, images_size384_exp5_Pr_Po_Im],
        "Pr_Im": [images_size64_exp5_Pr_Im, images_size128_exp5_Pr_Im, images_size224_exp5_Pr_Im, images_size384_exp5_Pr_Im],
        "PrPo_Im": [images_size64_exp5_PrPo_Im, images_size128_exp5_PrPo_Im, images_size224_exp5_PrPo_Im, images_size384_exp5_PrPo_Im],
        "Pr_PoIm": [images_size64_exp5_Pr_PoIm, images_size128_exp5_Pr_PoIm, images_size224_exp5_Pr_PoIm, images_size384_exp5_Pr_PoIm],
    }
    if scenario not in imagesets_by_scenario:
        raise ValueError(
            "Unknown scenario " + repr(scenario) + "; expected one of " + str(list(imagesets_by_scenario))
        )
    imageset = imagesets_by_scenario[scenario]

    performance_dict = dict.fromkeys(resolution_keys)
    for res_key, resolution_set in zip(resolution_keys, imageset):
        training_images_and_labels, test_images_and_labels = splitData(resolution_set, prop = 0.80, seed_num = trial_seed)
        training_images, training_labels = getImageAndLabelArrays(training_images_and_labels)
        validation_images, validation_labels = getImageAndLabelArrays(test_images_and_labels)
        # TODO: per-class metrics (recall / precision / f1) via a custom callback.

        input_shape = (res_key, res_key, NUM_CHANNELS)
        model, opt_learning_rate = constructBaseCNN(input_shape, scenario, opt_params)
        reset_weights(model) # re-initialize model weights
        # Pass the optimizer *instance*: the string "adam" would build a default
        # Adam and discard the tuned learning rate.
        opt = tf.keras.optimizers.Adam(learning_rate = opt_learning_rate)
        model.compile(loss = 'categorical_crossentropy', optimizer = opt, metrics = ['accuracy'])
        hist = model.fit(
            training_images,
            training_labels,
            batch_size = 32,
            epochs = num_epochs,
            verbose = 0,
            validation_data = (validation_images, validation_labels),
        )
        performance_dict[res_key] = {
            'scenario': scenario,
            'image_size': res_key,
            'metrics': hist,
            'best_val_acc': np.max(hist.history['val_accuracy']),
        }
    return performance_dict

In [70]:
def _to_json_serializable(obj):
    """Fallback for ``json.dump``: convert objects the default encoder rejects."""
    if hasattr(obj, 'history'):
        # Training History object -> its plain dict of per-epoch metric lists.
        return obj.history
    if hasattr(obj, 'tolist'):
        # numpy scalars / arrays -> Python numbers / lists.
        return obj.tolist()
    raise TypeError("Object of type " + type(obj).__name__ + " is not JSON serializable")


def main(num_trials = 5):
    """Run the resolution test for every scenario and trial and save the results.

    For each trial ``i`` (0-based) and each scenario in SCENARIO_LIST,
    ``testResolutionPerformance`` is run with ``trial_seed = 1 + i`` and its
    result is written as JSON to
    ``RESOLUTION_PERFORMANCE_METRICS_DIR/resolution_performance_<scenario>_trial_<i+1>.txt``.

    Args:
        num_trials: Number of repetitions of the whole scenario sweep.

    Returns:
        None. Results are only written to disk.
    """
    # Create the results directory if needed (no error when it already exists).
    os.makedirs(RESOLUTION_PERFORMANCE_METRICS_DIR, exist_ok = True)
    scenario_performance_dict = dict.fromkeys(SCENARIO_LIST)
    for i in range(num_trials):
        for s in SCENARIO_LIST:
            print("Conducting resolution performance test; Scenario: " + s + "; Trial: " + str(i+1))
            scenario_performance_dict[s] = testResolutionPerformance(s, opt_params = opt_params_64, num_epochs = 10, trial_seed = 1 + i) #ultimately should be averaged across trials
            scenario_filename = "resolution_performance_" + s + "_trial_" + str(i+1) + ".txt"
            with open(os.path.join(RESOLUTION_PERFORMANCE_METRICS_DIR, scenario_filename), 'w') as file:
                # Each file holds only the scenario named in its filename.
                # Read back with `json.load`; note that the integer resolution
                # keys are stored as strings by JSON.
                json.dump(scenario_performance_dict[s], file, default = _to_json_serializable)
    return

In [None]:
# Entry point: runs the full experiment (main() with its default number of trials).
# The guard keeps the experiment from starting if this code is imported as a module.
if __name__ == "__main__":
    main()

Conducting resolution performance test; Scenario: Pr_Po_Im; Trial: 1


In [None]:
# Plotting 
# p1[64]['metrics'].history['loss']
# plt.close('all')
# for m in ['recall', 'precision', 'f1-score']:
#     for c in [0,1,2]:
#         plt.plot(metrics_multiclass.get(m,c), label='Class {0} {1}'.format(c,m))
        
# plt.legend(loc='lower right')
# plt.show()

In [65]:
# Scratch check left over from development: np.max on a plain list returns its largest element.
# Not used by the experiment.
np.max([1,2])

2