In [None]:
#Importing dependencies
import csv
import cv2
import h5py
import itertools
import keras
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import numpy as np
import os
import pandas as pd
import random
import scipy
import seaborn as sns
import sklearn
import skimage
import warnings
import zlib

from glob import glob
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
from keras import backend as K
from keras import models, layers, optimizers
from keras.applications.vgg16 import VGG16
from keras.applications.inception_v3 import InceptionV3
from keras.callbacks import Callback, EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from keras.layers import Activation, Dense, Dropout, Flatten, Conv2D, MaxPool2D, Lambda, MaxPooling2D, AveragePooling2D, BatchNormalization
from keras.models import Model, Sequential, model_from_json
from keras.optimizers import SGD, RMSprop, Adam, Adagrad, Adadelta, RMSprop
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import np_utils
from keras.utils.np_utils import to_categorical
from skimage.transform import resize
from sklearn import model_selection
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split, learning_curve, KFold, cross_val_score, StratifiedKFold
from sklearn.utils import class_weight
from tqdm import tqdm
from PIL import Image

%matplotlib inline
warnings.filterwarnings("ignore")

In [None]:
#Variables
#Directories (relative paths; each one keeps its trailing "/")
dir_dataset = "../DataSet/"
dir_dataset_train = dir_dataset + "train/"
dir_dataset_test  = dir_dataset + "test/"
dir_transferlearning = "../TransferLearning/"

#Labels (integer class ids assigned to the images)
normal_label    = 0
pneumonia_label = 1
unknown_label   = 2

#Dataset
name_dataset = "dataset.h5"
# Detect the cached HDF5 file instead of hard-coding True: when the file is
# missing, the cells guarded by this flag build the dataset from the image
# folders and write it, rather than failing while trying to open it.
exists_dataset = os.path.isfile(name_dataset)

#Directories Transfer learning
# Identifiers used to select a backbone by name.
pretrained_label_VGG16 = 'VGG16'
pretrained_label_InceptionV3 = 'InceptionV3'

# Weight files for each backbone.
dir_transferlearning_vgg16 = dir_transferlearning +  "Vgg16.h5"
dir_transferlearning_inception = dir_transferlearning +  "InceptionV3.h5"

In [None]:
#Defining hyperparameters
# Input image geometry: height, width and number of channels.
height_img, width_img, channel_img = 150, 150, 3
# Upper bound on the number of training epochs for each backbone.
max_epoch_vgg16 = max_epoch_inception = 100
# Size of the softmax output layer / one-hot label vectors.
number_classes = 2
# Optimizer instance later handed to model.compile().
optimizer = RMSprop(lr=1e-4)

In [None]:
#Defining map characters and transferLearning
# Display name for each numeric class label.
map_characters = dict([
    (normal_label, 'No Pneumonia'),
    (pneumonia_label, 'Yes Pneumonia'),
])

# Both backbones are created without their classification top and with the
# same input shape; weights come from the dir_transferlearning_* paths.
pretrained_model_VGG16 = VGG16(
    include_top=False,
    weights=dir_transferlearning_vgg16,
    input_shape=(height_img, width_img, channel_img),
)

pretrained_model_InceptionV3 = InceptionV3(
    include_top=False,
    weights=dir_transferlearning_inception,
    input_shape=(height_img, width_img, channel_img),
)

In [None]:
#Load images from disk and resize them to the network input size
def get_dataset_resizing(folder):
    """Read every image below ``folder`` and return resized images with labels.

    ``folder`` is expected to contain one sub-directory per class. Images in
    ``NORMAL`` get ``normal_label``, images in ``PNEUMONIA`` get
    ``pneumonia_label`` and images in any other sub-directory get
    ``unknown_label``.

    Parameters
    ----------
    folder : str
        Directory holding the class sub-directories. A trailing separator is
        optional.

    Returns
    -------
    tuple of (numpy.ndarray, numpy.ndarray)
        ``x``: the images, each resized to
        ``(height_img, width_img, channel_img)`` by ``skimage.transform.resize``.
        ``y``: the integer label of each image, in the same order.
    """
    x = []
    y = []

    # Sorted listings make the sample order reproducible; os.listdir returns
    # entries in arbitrary order.
    for folder_name in sorted(os.listdir(folder)):
        class_dir = os.path.join(folder, folder_name)

        # Skip hidden entries and stray files: only directories can be listed.
        if folder_name.startswith('.') or not os.path.isdir(class_dir):
            continue

        if folder_name == 'NORMAL':
            label = normal_label
        elif folder_name == 'PNEUMONIA':
            label = pneumonia_label
        else:
            label = unknown_label

        for image_filename in tqdm(sorted(os.listdir(class_dir))):
            img_file = cv2.imread(os.path.join(class_dir, image_filename))

            # Entries that cv2.imread could not decode are skipped.
            if img_file is None:
                continue

            img_file = skimage.transform.resize(img_file, (height_img, width_img, channel_img))
            x.append(np.asarray(img_file))
            y.append(label)

    return np.asarray(x), np.asarray(y)

In [None]:
#create memory dataset
# Only read the image folders when no cached dataset file is available.
if not exists_dataset:
    x_train, y_train = get_dataset_resizing(folder=dir_dataset_train)
    x_test, y_test = get_dataset_resizing(folder=dir_dataset_test)

In [None]:
#save dataset in computer or load dataset from computer
if not exists_dataset:
    # Freshly built arrays are cached to HDF5. The context manager closes the
    # file even if one of the writes raises.
    with h5py.File(name_dataset, 'w') as hf:
        hf.create_dataset('x_train', data=x_train)
        hf.create_dataset('y_train', data=y_train)
        hf.create_dataset('x_test', data=x_test)
        hf.create_dataset('y_test', data=y_test)
else:
    # Read the cached arrays fully into memory ([:] copies them out of the file).
    with h5py.File(name_dataset, 'r') as hf:
        x_train = hf['x_train'][:]
        y_train = hf['y_train'][:]
        x_test  = hf['x_test'][:]
        y_test  = hf['y_test'][:]

In [None]:
#converts the dataset to binary array
# One-hot encode the integer labels of the train and test splits.
y_train_hot, y_test_hot = (
    to_categorical(labels, num_classes=number_classes)
    for labels in (y_train, y_test)
)

In [None]:
#defining metric and presentation of the obtained results
class MetricsCheckpoint(Callback):
    """Keras callback that records every logged metric and saves it each epoch.

    Parameters
    ----------
    savepath : str
        Target handed to ``np.save``. NumPy appends ``.npy`` when the name
        does not already end with it.
    """

    def __init__(self, savepath):
        super(MetricsCheckpoint, self).__init__()
        self.savepath = savepath
        # Maps metric name -> list with one value per finished epoch.
        self.history = {}

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's metrics to the history and rewrite the file."""
        # `logs` defaults to None; treat that as "nothing logged" instead of
        # failing on None.items().
        for k, v in (logs or {}).items():
            self.history.setdefault(k, []).append(v)
        np.save(self.savepath, self.history)

def plotKerasLearningCurve(logs_path='logs.npy'):
    """Plot the accuracy curves stored in a metrics file.

    Every metric whose name contains ``'acc'`` is drawn; names containing
    ``'val'`` are shown in blue and labelled ``val``, the others in red and
    labelled ``train``. The best point of each curve is marked and annotated
    with its epoch index and value.

    Parameters
    ----------
    logs_path : str, optional
        File to read. Defaults to ``'logs.npy'``, the file produced by
        ``MetricsCheckpoint('logs')``.
    """
    plt.figure(figsize=(10,5))
    # The file holds a pickled dict (as written by MetricsCheckpoint), which
    # np.load refuses to read unless allow_pickle=True is given.
    metrics = np.load(logs_path, allow_pickle=True).item()
    filt = ['acc']

    for k in metrics:
        if not any(kk in k for kk in filt):
            continue

        is_val = 'val' in k
        colour = 'b' if is_val else 'r'
        l = np.array(metrics[k])
        plt.plot(l, c=colour, label='val' if is_val else 'train')
        # Best epoch: lowest value for a loss, highest value otherwise.
        x = np.argmin(l) if 'loss' in k else np.argmax(l)
        y = l[x]
        plt.scatter(x, y, lw=0, alpha=0.25, s=100, c=colour)
        plt.text(x, y, '{} = {:.10f}'.format(x, y), size='15', color=colour)

    plt.legend(loc=4)
    plt.axis([0, None, None, None]);
    plt.grid()
    plt.xlabel('Number of epochs')
    plt.ylabel('Accuracy')

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):

    """
    Plot a confusion matrix as an annotated heat-map.

    Parameters
    ----------
    cm : numpy.ndarray
        Square confusion matrix; rows are true labels, columns are predicted
        labels (matching the axis labels set below).
    classes : sequence of str
        Tick labels, one per class, in matrix order.
    normalize : bool, optional
        When True, every row is divided by its sum before plotting, so both
        the colours and the cell texts show per-true-class ratios.
    title : str, optional
        Figure title.
    cmap : matplotlib colormap, optional
        Colormap used for the heat-map.
    """
    # Normalise BEFORE drawing so the image and the cell texts show the same
    # values.
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    plt.figure(figsize = (5,5))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=90)
    plt.yticks(tick_marks, classes)

    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        # Ratios are shortened to two decimals; raw counts are shown as-is.
        cell_text = '{:.2f}'.format(cm[i, j]) if normalize else cm[i, j]
        plt.text(j, i, cell_text,
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

def plot_learning_curve(history):
    """Draw accuracy and loss curves side by side and save the figure twice.

    ``history.history`` must provide the keys ``acc``, ``val_acc``, ``loss``
    and ``val_loss``. The figure is written to ``./accuracy_curve.png`` once
    the left (accuracy) panel is drawn and to ``./loss_curve.png`` after the
    right (loss) panel has been added.
    """
    plt.figure(figsize=(8,8))

    # (train key, validation key, title, y-axis label, output file) per panel
    panels = (
        ('acc', 'val_acc', 'model accuracy', 'accuracy', './accuracy_curve.png'),
        ('loss', 'val_loss', 'model loss', 'loss', './loss_curve.png'),
    )

    for position, (train_key, val_key, title, y_label, out_file) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for key in (train_key, val_key):
            plt.plot(history.history[key])
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('epoch')
        plt.legend(['train', 'test'], loc='upper left')
        plt.savefig(out_file)

In [None]:
#Defining network architecture
def pretrainedNetwork(xtrain, ytrain, xtest, ytest, i_pretrained_model, pretrained_label,
                      i_pretrained_weights, i_class_weight, i_num_classes,
                      i_max_epochs, i_optimizer, i_labels):
    """Train a softmax head on top of a frozen pretrained backbone and report results.

    Parameters
    ----------
    xtrain, ytrain : numpy.ndarray
        Training images and their labels. Labels are presumably one-hot
        encoded, as required by the categorical cross-entropy loss.
    xtest, ytest : numpy.ndarray
        Validation images and one-hot labels (``argmax`` over axis 1 is used
        to recover the class ids).
    i_pretrained_model
        Not used; the backbone is chosen through ``pretrained_label``. Kept
        so existing calls keep working.
    pretrained_label : str
        ``pretrained_label_InceptionV3`` selects InceptionV3; any other value
        selects VGG16.
    i_pretrained_weights
        Not used. Kept so existing calls keep working.
    i_class_weight : dict, sequence or None
        Per-class loss weights. A sequence is converted to a
        ``{class_index: weight}`` dict, assuming its order matches class ids
        ``0..n-1``.
    i_num_classes : int
        Number of units in the softmax output layer.
    i_max_epochs : int
        Maximum number of training epochs.
    i_optimizer
        Keras optimizer passed to ``model.compile``.
    i_labels : dict
        Maps class id to display name; its values are used as class names in
        the report and the confusion matrix.

    Returns
    -------
    keras.models.Model
        The trained model.

    Side effects: training metrics are saved through ``MetricsCheckpoint('logs')``,
    the weights are written to ``DeepLearningPneumonia.h5``, and the learning
    curves and the confusion matrix are plotted.
    """
    # Compare labels exactly: `in` between two strings is a substring test,
    # so e.g. 'V' or '' would also have matched.
    if pretrained_label == pretrained_label_VGG16:
        base_model = pretrained_model_VGG16
    elif pretrained_label == pretrained_label_InceptionV3:
        base_model = pretrained_model_InceptionV3
    else:
        base_model = pretrained_model_VGG16

    x           = base_model.output
    x           = Flatten()(x)
    predictions = Dense(i_num_classes, activation='softmax')(x)
    model       = Model(inputs=base_model.input, outputs=predictions)

    # Freeze the backbone so only the new Dense head is trained.
    for layer in base_model.layers:
        layer.trainable = False

    model.compile(loss='categorical_crossentropy',
                  optimizer=i_optimizer,
                  metrics=['accuracy'])

    # Keras expects class_weight as {class_index: weight}; convert arrays such
    # as the output of compute_class_weight.
    if i_class_weight is not None and not isinstance(i_class_weight, dict):
        i_class_weight = {index: float(weight) for index, weight in enumerate(i_class_weight)}

    # The early-stopping callback used to be built but never handed to fit().
    callbacks_list = [MetricsCheckpoint('logs'),
                      keras.callbacks.EarlyStopping(monitor='val_acc', patience=3, verbose=1)]
    model.summary()

    history = model.fit(xtrain, ytrain, epochs=i_max_epochs, class_weight=i_class_weight,
                        validation_data=(xtest, ytest), verbose=1,
                        callbacks=callbacks_list)

    model.save_weights('DeepLearningPneumonia.h5')

    score = model.evaluate(xtest, ytest, verbose=0)
    print('\nKeras CNN - accuracy:', score[1], '\n')

    y_pred = model.predict(xtest)
    # Class ids recovered from one-hot labels / softmax outputs.
    Y_pred_classes = np.argmax(y_pred, axis = 1)
    Y_true = np.argmax(ytest, axis = 1)
    class_names = list(i_labels.values())

    print('\n', sklearn.metrics.classification_report(Y_true,
                                                      Y_pred_classes,
                                                      target_names=class_names), sep='')

    confusion_mtx = confusion_matrix(Y_true, Y_pred_classes)
    plotKerasLearningCurve()
    plt.show()
    plot_learning_curve(history)
    plt.show()
    plot_confusion_matrix(confusion_mtx, classes = class_names)
    plt.show()
    return model

In [None]:
#Balance the classes by random under-sampling
# The sampler works on 2-D data, so every image is flattened to one row first.
x_train_shape = x_train.shape[1]*x_train.shape[2]*x_train.shape[3]
x_test_shape  = x_test.shape[1]*x_test.shape[2]*x_test.shape[3]
x_train_flat  = x_train.reshape(x_train.shape[0], x_train_shape)
x_test_flat   = x_test.reshape(x_test.shape[0], x_test_shape)

yy_train = y_train
yy_test  = y_test
ros      = RandomUnderSampler(ratio='auto')

x_train_ros, y_train_ros = ros.fit_sample(x_train_flat, yy_train)
x_test_ros, y_test_ros   = ros.fit_sample(x_test_flat, yy_test)

y_train_ros_hot = to_categorical(y_train_ros, num_classes = number_classes)
y_test_ros_hot = to_categorical(y_test_ros, num_classes = number_classes)

# Restore the image shape. One reshape per array is enough: the previous
# per-sample loops repeated the same whole-array reshape on every iteration
# and left the result undefined when the resampled set was empty.
height, width, channels = height_img, width_img, channel_img
x_train_ros_reshaped = x_train_ros.reshape(len(x_train_ros), height, width, channels)
x_test_ros_reshaped = x_test_ros.reshape(len(x_test_ros), height, width, channels)

In [None]:
#Weight balancing
# Arguments are passed by keyword: newer scikit-learn releases reject
# positional `classes` / `y`, while the keyword form works on old and new ones.
class_weight_y = class_weight.compute_class_weight(class_weight='balanced',
                                                   classes=np.unique(y_train),
                                                   y=y_train)
print("Class Weights: ", class_weight_y)

# yy_train is the label array as it was before under-sampling.
class_weight_yy = class_weight.compute_class_weight(class_weight='balanced',
                                                    classes=np.unique(yy_train),
                                                    y=yy_train)
print("Old Class Weights: ", class_weight_yy)

# Weights for the under-sampled training labels.
class_weight_ros = class_weight.compute_class_weight(class_weight='balanced',
                                                     classes=np.unique(y_train_ros),
                                                     y=y_train_ros)
print("New Class Weights: ", class_weight_ros)

In [None]:
#training network VGG16
# Train the VGG16-based classifier on the under-sampled train/test arrays.
pretrainedNetwork(
    xtrain=x_train_ros_reshaped,
    ytrain=y_train_ros_hot,
    xtest=x_test_ros_reshaped,
    ytest=y_test_ros_hot,
    i_pretrained_model=pretrained_model_VGG16,
    pretrained_label=pretrained_label_VGG16,
    i_pretrained_weights=dir_transferlearning_vgg16,
    i_class_weight=class_weight_ros,
    i_num_classes=number_classes,
    i_max_epochs=max_epoch_vgg16,
    i_optimizer=optimizer,
    i_labels=map_characters,
)

In [None]:
#Training network InceptionV3

#We did not get good results with InceptionV3

#pretrainedNetwork(x_train_ros_reshaped, y_train_ros_hot, x_test_ros_reshaped, y_test_ros_hot,
                  #pretrained_model_InceptionV3, pretrained_label_InceptionV3, dir_transferlearning_inception,
                  #class_weight_ros, number_classes, max_epoch_inception, optimizer, map_characters)