This notebook uses Talos to optimize the architecture and hyperparameters of the custom model.


In [None]:
# Install Talos if it is not already installed

!pip install -q talos

In [None]:
#load libraries

import talos as ta
import cv2
import tensorflow as tf
import numpy as np
import os
from keras.models import Sequential, model_from_json
from keras.layers import Dense, Conv2D, Dropout, BatchNormalization, GlobalAveragePooling2D, MaxPooling2D, Flatten, Activation
from keras.activations import softmax
from keras.losses import categorical_crossentropy
from keras.utils import np_utils
import matplotlib.pyplot as plt
import itertools
from talos.model.normalizers import lr_normalizer
from keras.optimizers import Adam, SGD
import talos as ta
from keras.optimizers import SGD

In [None]:
# custom function to plot confusion matrix

def plot_confusion_matrix(cm, classes,
                          normalize=False, #if true all values in confusion matrix is between 0 and 1
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Draw a confusion matrix as an annotated heat map on the current figure.

    Args:
        cm: 2-D NumPy array; rows are labelled 'True label' and columns
            'Predicted label' on the plot.
        classes: sequence of class names used as tick labels on both axes.
        normalize: when True, every row is divided by its row sum so the
            plotted values lie between 0 and 1.
        title: text placed above the plot.
        cmap: matplotlib colour map used for the heat map.

    The matrix that is plotted is also printed to stdout.
    """
    # Normalize *before* drawing so the colours and the cell annotations
    # describe the same values.
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Rows with no samples stay at 0 instead of producing NaN.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype='float'),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Integer counts are shown as-is; fractions are rounded to two decimals
    # so the annotations fit inside the cells.
    cell_format = 'd' if np.issubdtype(cm.dtype, np.integer) else '.2f'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], cell_format),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    

In [None]:
# Dataset locations, expected number of images per split, and the
# height/width used to allocate the image arrays.

train_data_dir, test_data_dir = 'malaria100/train', 'malaria100/test'
nb_train_samples, nb_test_samples = 24760, 2730
img_rows_orig = img_cols_orig = 100

def _load_labeled_images(data_dir, nb_samples, split_name):
    """Read every image below ``data_dir/<label>/`` into NumPy arrays.

    Args:
        data_dir: directory containing one sub-directory per class.
        nb_samples: maximum number of images; sizes the preallocated buffers.
        split_name: word used in the progress banner ('training' or 'test').

    Returns:
        (images, targets): uint8 image array and uint8 class-index array,
        both truncated to the number of images actually read.

    Raises:
        IndexError: if more readable images are found than ``nb_samples``.
    """
    # os.listdir order is arbitrary; sorting guarantees that a given class
    # folder gets the same index in the training and the test split.
    labels = sorted(
        entry for entry in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, entry))
    )
    images = np.empty((nb_samples, img_rows_orig, img_cols_orig, 3), dtype=np.uint8)
    targets = np.zeros((nb_samples,), dtype='uint8')
    print('-'*30)
    print('Creating {0} images...'.format(split_name))
    print('-'*30)
    count = 0
    for class_index, label in enumerate(labels):
        label_dir = os.path.join(data_dir, label)
        image_names = sorted(os.listdir(label_dir))
        print(label, len(image_names))
        for image_name in image_names:
            image_path = os.path.join(label_dir, image_name)
            img = cv2.imread(image_path, cv2.IMREAD_COLOR)
            if img is None:
                # cv2.imread returns None for files it cannot decode.
                print('Skipping unreadable file: {0}'.format(image_path))
                continue
            if count >= nb_samples:
                raise IndexError(
                    'More than {0} images found under {1}'.format(nb_samples, data_dir))
            images[count] = img
            targets[count] = class_index
            if count % 100 == 0:
                print('Done: {0}/{1} images'.format(count, nb_samples))
            count += 1
    print(count)
    print('Loading done.')
    # Drop the unused tail of the buffers so no uninitialised rows leak out.
    return images[:count], targets[:count]


def load_training_data():
    """Load the training images and their one-hot labels.

    Images are saved to 'imgs_train.npy' and labels to
    'imgs_train_labels.npy'.

    Returns:
        (X_train, Y_train): uint8 image array and one-hot label array.
    """
    X_train, Y_train = _load_labeled_images(train_data_dir, nb_train_samples, 'training')
    print('Transform targets to keras compatible format.')
    Y_train = np_utils.to_categorical(Y_train, num_classes)
    # np.save stores a single array per file; the labels need their own file.
    np.save('imgs_train.npy', X_train)
    np.save('imgs_train_labels.npy', Y_train)
    return X_train, Y_train


def load_test_data():
    """Load the test images and their one-hot labels.

    Images are saved to 'imgs_test.npy' and labels to
    'imgs_test_labels.npy'.

    Returns:
        (X_test, Y_test): uint8 image array and one-hot label array.
    """
    X_test, Y_test = _load_labeled_images(test_data_dir, nb_test_samples, 'test')
    print('Transform targets to keras compatible format.')
    Y_test = np_utils.to_categorical(Y_test, num_classes)
    # np.save stores a single array per file; the labels need their own file.
    np.save('imgs_test.npy', X_test)
    np.save('imgs_test_labels.npy', Y_test)
    return X_test, Y_test


def load_resized_training_data(img_rows, img_cols):
    """Load the training set and resize every image to img_rows x img_cols.

    Args:
        img_rows: target image height in pixels.
        img_cols: target image width in pixels.

    Returns:
        (X_train, Y_train): resized image array and the labels returned by
        load_training_data().
    """
    X_train, Y_train = load_training_data()
    # cv2.resize expects dsize as (width, height), i.e. (columns, rows).
    X_train = np.array([cv2.resize(img, (img_cols, img_rows)) for img in X_train])

    return X_train, Y_train
    
def load_resized_test_data(img_rows, img_cols):
    """Load the test set and resize every image to img_rows x img_cols.

    Args:
        img_rows: target image height in pixels.
        img_cols: target image width in pixels.

    Returns:
        (X_test, Y_test): resized image array and the labels returned by
        load_test_data().
    """
    X_test, Y_test = load_test_data()
    # cv2.resize expects dsize as (width, height), i.e. (columns, rows).
    X_test = np.array([cv2.resize(img, (img_cols, img_rows)) for img in X_test])

    return X_test, Y_test


In [None]:
# Image size requested from the loaders, channel count, number of classes,
# and the batch size / epoch count used when fitting each model.

img_rows, img_cols, channel = 100, 100, 3
num_classes = 2
batch_size, num_epoch = 32, 30

# Read both splits from disk at the requested image size.

X_train, Y_train = load_resized_training_data(img_rows=img_rows, img_cols=img_cols)
X_test, Y_test = load_resized_test_data(img_rows=img_rows, img_cols=img_cols)

# Sanity check: array shapes followed by the number of samples per split.

print(X_train.shape, Y_train.shape)
print(X_test.shape, Y_test.shape)
print(len(X_train), 'train samples')
print(len(X_test), 'test samples')

In [None]:
# Stack the train and test images along the first (sample) axis.
x = np.concatenate([X_train, X_test])
x.shape

In [None]:
# Stack the train and test labels along the first (sample) axis.
y = np.concatenate([Y_train, Y_test])
y.shape

In [None]:
#declare the base architecture and parameters to optimize in the custom model

def custom_model_fn(X_train, Y_train, X_test, Y_test, params):
    """Build, compile and train one candidate CNN for the Talos scan.

    Args:
        X_train: training images for this round.
        Y_train: one-hot training labels; its second dimension sets the
            width of the output layer.
        X_test: validation images for this round.
        Y_test: one-hot validation labels.
        params: dict with the keys 'conv_dropout', 'dense1_neuron',
            'activation', 'dropout' and 'optimizer' (an optimizer class
            that is instantiated with its default arguments).

    Returns:
        (out, model): the object returned by ``model.fit`` and the trained
        model.
    """
    conv_dropout = float(params['conv_dropout'])
    dense1_neuron = int(params['dense1_neuron'])
    activation = params['activation']

    model = Sequential()
    # Three BatchNorm -> Conv -> MaxPool -> Dropout stages with a growing
    # number of filters. Only the first layer needs the input shape.
    for stage, filters in enumerate((64, 128, 256)):
        if stage == 0:
            model.add(BatchNormalization(input_shape=X_train.shape[1:]))
        else:
            model.add(BatchNormalization())
        model.add(Conv2D(filters, (5, 5), padding='same', activation=activation))
        model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
        model.add(Dropout(conv_dropout))

    model.add(GlobalAveragePooling2D())
    model.add(Dense(dense1_neuron))
    model.add(Activation(activation))
    model.add(Dropout(params['dropout']))
    # One output unit per class, taken from the one-hot label width.
    model.add(Dense(Y_train.shape[1]))
    model.add(Activation('softmax'))
    model.compile(optimizer=params['optimizer'](), loss='categorical_crossentropy',metrics=['accuracy'])

    # Fit on the split handed in by the caller, not on the global x / y:
    # those contain the validation samples as well, which would leak them
    # into training and inflate the validation accuracy.
    out = model.fit(
        X_train, Y_train, epochs=num_epoch, batch_size=batch_size,
        verbose=1,
        validation_data=(X_test, Y_test)
    )
    return out, model

In [None]:
# Search space for the scan: each key maps to the list of candidate values
# that custom_model_fn reads from its `params` argument.

para = dict(
    dense1_neuron=[256, 512],
    activation=['relu', 'elu'],
    conv_dropout=[0.25, 0.5],
    optimizer=[Adam, SGD],
    dropout=[0.25, 0.5],
)

In [None]:
# Run the Talos scan of custom_model_fn over the search space in `para`.

scan_results = ta.Scan(x=x, y=y, params=para, model=custom_model_fn)

In [None]:
# Show the first rows of the scan's result table.

scan_results.data.head()

In [None]:
# Pick the model with the highest validation accuracy.
# model_id is the argmax of 'val_acc' minus one; the second line displays
# the un-shifted argmax.
# NOTE(review): the `- 1` presumably compensates for an index offset between
# `scan_results.data` and `saved_models` in the Talos version used — confirm.
# If the two are aligned, this selects the wrong model (and index -1, i.e.
# the last model, when the best row is the first one).

model_id = scan_results.data['val_acc'].astype('float').argmax() - 1
model_id + 1

In [None]:
# Rebuild the selected model from the JSON architecture and the weights
# stored on the scan object, then print its layer summary.

model = model_from_json(scan_results.saved_models[model_id])
model.set_weights(scan_results.saved_weights[model_id])
model.summary()

In [None]:
# Display the summary details recorded on the scan object.

scan_results.details

In [None]:
# Build a Reporting object from the Scan object.

report = ta.Reporting(scan_results)

# Display the results dataframe. With a pandas DataFrame, head(-3) returns
# every row except the last three.

report.data.head(-3)

In [None]:
# Display the number of rounds in the Scan.

report.rounds()

In [None]:
# Display the highest value reached for a metric, here the validation accuracy.

report.high('val_acc')

In [None]:
# Save the selected model to 'best_model.h5' and print its layer summary.

model.save('best_model.h5')
model.summary()