In [None]:
# normal working notebook imports
import pandas as pd
import numpy as np
from numpy import expand_dims
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

# importing keras for neural net building
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import RMSprop
from keras.optimizers import Adam, SGD
from keras.preprocessing.image import ImageDataGenerator,array_to_img, img_to_array, load_img

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import datasets, layers, models

# scores for understanding results
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import plot_confusion_matrix

# insight into how machine makes decisions
import lime
from lime import lime_image
from skimage.segmentation import mark_boundaries
import random


This process will retrieve all the pictures in the data set and assign a label based on the folder it was in.

In [None]:
# Directory to path of the images
# These images are split into train/test/validation sets
# Then further split by if they have p
train_data_dir = 'chest_xray/chest_xray/train/'
val_data_dir = 'chest_xray/chest_xray/val/'
test_data_dir = 'chest_xray/chest_xray/test/'


# getting the testing data
test_generator = ImageDataGenerator().flow_from_directory(
        test_data_dir, 
        target_size=(64, 64), batch_size=600)

# getting the validation set
val_generator = ImageDataGenerator().flow_from_directory(
        val_data_dir, 
        target_size=(64, 64), batch_size=16)

# Get all the data in the directory data/train (790 images), and reshape them
train_generator = ImageDataGenerator(horizontal_flip=True).flow_from_directory(
        train_data_dir, 
        target_size=(64, 64), batch_size=5200)


In [None]:
# Create the datasets
train_images, train_labels = next(train_generator)
test_images, test_labels = next(test_generator)
val_images, val_labels = next(val_generator)

In [None]:
train_images[0].shape

# EDA

### Going to look at average image

In [None]:
X_train = train_images.reshape(5200,12288)
X_test = test_images.reshape(600,12288)
X_val = val_images.reshape(16,12288)

In [None]:
y_train = train_labels.T[[1]][0]
y_test = test_labels.T[[1]][0]
y_val = val_labels.T[[1]][0]

In [None]:
pd.DataFrame(y_train)[0].value_counts()

In [None]:
print(X_train.shape)
print(y_train.shape)

### base model testing

In [None]:
batch_size = 10
num_classes = 2
epochs = 6

In [None]:
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
y_val = keras.utils.to_categorical(y_val, num_classes)

In [None]:
print(X_train.shape)
print(y_train.shape)

### standardizing the image values 
This is done so that instead of a value between 0 and 255 we have a value between 0-1 reprenting how much color is there

In [None]:
X_train /= 255
X_test /= 255
X_val /= 255

### Image augmentation for increas in train data

Image augmentation is going to help with overfitting because of how it changes the images. In this case we are going to be flipping images.

In [None]:
data_augmentation = tf.keras.Sequential([
    layers.experimental.preprocessing.RandomFlip('horizontal_and_vertical'),
    layers.experimental.preprocessing.RandomRotation(0.5),
])

In [None]:
train

In [None]:
train_labels[0]

In [None]:
train_generator.class_indices['NORMAL']

In [None]:
image = train_images[0]
label = train_labels[0]
print(train_generator.labels[0])
plt.imshow(image);
#plt.title()

In [None]:
image = tf.expand_dims(image,0)
for i in range(9):
    augmented_image = data_augmentation(image)
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(augmented_image[0])
    plt.axis("off")

In [None]:
train_generator.class_indices

In [None]:
# model structure
model = Sequential()

# first hidden layer
# 12288 = 64x64x3 = size of image
model.add(Dense(50,activation='relu',input_shape=(12288,)))

# second hidden layer
model.add(Dense(50,activation='relu'))


# output layer
model.add(Dense(num_classes, activation='sigmoid'))

model.summary()

model.compile(loss='binary_crossentropy',
             optimizer=Adam(),
             metrics=['accuracy'])

history = model.fit(X_train, y_train,
                   batch_size=batch_size,
                   epochs=epochs,
                   verbose=1,
                   validation_data=(X_val,y_val))

score = model.evaluate(X_test,y_test,verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

# CNN

In [None]:
train_images.shape

In [None]:
y_train.shape

In [None]:
model = models.Sequential()

model.add(layers.Conv2D(64, (3,3), activation='relu',input_shape=(64,64,3)))
#model.add(layers.Conv2D(64, (3,3), activation='relu',input_shape=(64,64,3)))
#model.add(Dropout(0.2))
model.add(layers.MaxPool2D((2,2)))

#model.add(layers.SeparableConv2D(64,(3,3),activation='relu'))

model.add(layers.Conv2D(64,(3,3),activation='relu'))
#model.add(layers.Conv2D(64,(3,3),activation='relu'))
#model.add(Dropout(0.1))
model.add(layers.MaxPool2D((2,2)))

model.add(layers.Conv2D(32,(3,3),activation='relu'))
#model.add(Dropout(0.1))
model.add(layers.MaxPool2D((2,2)))

model.add(layers.Conv2D(32,(3,3),activation='relu'))
#model.add(Dropout(0.1))
model.add(layers.MaxPool2D((2,2)))

model.add(layers.Flatten())
model.add(layers.Dense(32, activation='relu'))
model.add(Dropout(0.25))
model.add(layers.Dense(32, activation='relu'))
model.add(Dropout(0.25))
model.add(layers.Dense(2, activation='sigmoid'))

In [None]:
model.compile(optimizer='adam',
             loss='binary_crossentropy',
             metrics=['accuracy'])

history = model.fit(train_images, y_train, epochs=5,validation_data=(val_images,y_val))

In [None]:
score = model.evaluate(test_images,y_test,verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [None]:
model_augmented = tf.keras.Sequential([
    data_augmentation,

    layers.Conv2D(64, (3,3), activation='relu',input_shape=(64,64,3)),
    layers.MaxPool2D((2,2)),

    layers.Conv2D(64,(3,3),activation='relu'),

    layers.MaxPool2D((2,2)),

    layers.Conv2D(32,(3,3),activation='relu'),
    
    layers.MaxPool2D((2,2)),

    layers.Conv2D(32,(3,3),activation='relu'),
    layers.MaxPool2D((2,2)),

    layers.Flatten(),
    layers.Dense(32, activation='relu'),
    Dropout(0.2),
    layers.Dense(32, activation='relu'),
    Dropout(0.2),
    layers.Dense(2, activation='sigmoid')
    ])

In [None]:
model_augmented.compile(optimizer='adam',
             loss='binary_crossentropy',
             metrics=['accuracy'])

history = model_augmented.fit(train_images, y_train, epochs=4,validation_data=(val_images,y_val))

In [None]:
score = model_augmented.evaluate(test_images,test_labels,verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [None]:

y_pred_keras = (model_augmented.predict(test_images) > 0.5).astype("int32")

In [None]:
y_pred_keras

In [None]:
y_pred_keras = y_pred_keras.T[[1]][0]

In [None]:
y_true = test_labels.T[[1]][0]

In [None]:
fpr_keras, tpr_keras, thresholds_keras = roc_curve(y_true, y_pred_keras)


In [None]:
plt.figure(1)
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_keras, tpr_keras)
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.show()

### Confusion Matrix

In [None]:
conf_matrix = confusion_matrix(y_true,y_pred_keras)
df_cm = pd.DataFrame(conf_matrix,columns=['Norm','Pneu'],index=['Norm','Pneu'])
df_cm.index.name = 'Actual'
df_cm.columns.name = 'Predicted'

In [None]:
df_cm

In [None]:
fig, ax = plt.subplots(figsize=(5,5))
ax = sns.set(font_scale=1.4)
ax = sns.heatmap(df_cm,cmap='Blues',annot=True,annot_kws={'size':16});

### trying alex net

In [None]:
model = models.Sequential()

model.add(layers.Conv2D(96, (4,4), activation='relu',input_shape=(64,64,3),padding='same'))
model.add(layers.MaxPool2D(pool_size=(2,2),strides=(2,2)))

model.add(layers.Conv2D(256, (11,11), activation='relu',input_shape=(64,64,3),padding='same'))
model.add(layers.MaxPool2D(pool_size=(2,2),strides=(2,2)))

model.add(layers.Conv2D(384, (3,3), activation='relu',input_shape=(64,64,3),padding='same'))
model.add(layers.MaxPool2D(pool_size=(2,2),strides=(2,2)))

model.add(layers.Conv2D(384, (3,3), activation='relu',input_shape=(64,64,3),padding='same'))

model.add(layers.Conv2D(256, (3,3), activation='relu',input_shape=(64,64,3),padding='same'))
model.add(layers.MaxPool2D(pool_size=(2,2),strides=(2,2)))

model.add(layers.Flatten())
model.add(layers.Dense(4096, activation='relu'))
model.add(Dropout(0.4))
model.add(layers.Dense(4096, activation='relu'))
model.add(Dropout(0.4))
model.add(layers.Dense(1000, activation='relu'))
model.add(Dropout(0.4))
model.add(layers.Dense(2, activation='sigmoid'))

In [None]:
model.compile(optimizer='adam',
             loss='binary_crossentropy',
             metrics=['accuracy'])

history = model.fit(train_images, y_train, epochs=5,validation_data=(val_images,y_val))

In [None]:
score = model.evaluate(test_images,y_test,verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

# Testing for higher accuracym

Through research I found that a common method for convolutional layers is having 32-32-64-64 filters in that progression. This worked very well and increase predciton abilites from on averag 0.80 to on average 0.86. After this process due to lack of overfitting I continually added layers until overfitting.

In [None]:
model_testing = tf.keras.Sequential([
    data_augmentation,

    layers.Conv2D(32, kernel_size=(3,3), activation='relu',input_shape=(64,64,3)),
    layers.MaxPool2D((2,2),strides=2),

    layers.Conv2D(32,(3,3),activation='relu'),
    layers.MaxPool2D((2,2),strides=2),
    
    layers.Conv2D(64,(3,3),activation='relu'),
    layers.MaxPool2D((2,2),strides=2),
    
    layers.Conv2D(64,(3,3),activation='relu'),
    layers.MaxPool2D((2,2),strides=2),
    

    layers.Flatten(),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(64, activation='relu'),
    Dropout(0.2),
    layers.Dense(2, activation='sigmoid')
    ])

In [None]:
#callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=3)

model_testing.compile(optimizer='adam',
             loss='binary_crossentropy',
             metrics=['accuracy'])

In [None]:
history = model_testing.fit(train_images, y_train, epochs=50,validation_data=(val_images,y_val))

In [None]:
#model_testing.summary()

In [None]:
score = model_testing.evaluate(test_images,y_test,verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In order to determine what threshold is going to be the best for out predcitions we are going to show ROC curves for each threshold value. The model predicts how likely it thinks each picture belongs to a class. An example prediction would be \[0.387,0.613\] where it thinks it has a 38% chance of not having pneumonia and 61% chance of having pneumonia. The higher we raise the threshold the more confident we have to be that the image has pneumonia for it to be classfied as such.

In [None]:
threshs = np.arange(0.1,1,0.1)

In [None]:
threshs

In [None]:
y_test_roc = y_test.T[[1]][0]
def roc_per_thresh(threshs):
    rocs = []
    for thresh in threshs:
        y_pred_roc = (model_testing.predict(test_images) > thresh).astype('int32')
        y_pred_roc = y_pred_roc.T[[1]][0]
        fpr, tpr, thresholds = roc_curve(y_test_roc, y_pred_roc)
        rocs.append([thresh,fpr,tpr])
    return rocs

In [None]:
roc_values = roc_per_thresh(threshs)

In [None]:
roc_values[0]

In [None]:
plt.figure(1,figsize=(10,10))
plt.plot([0, 1], [0, 1], 'k--')
for roc in roc_values:
    plt.plot(roc[1],roc[2],label=(round(roc[0],1),round(roc[1][1],2),round(roc[2][1],2)))
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(title=['(FPR,TPR)'])
plt.show()

In [None]:
y_pred = (model_testing.predict(test_images) > 0.4).astype('int32')

In [None]:
y_pred = y_pred.T[[1]][0]

In [None]:
!pip install lime


In [None]:
!pip install ipywidgets

In [None]:
!jupyter nbextension enable --py widgetsnbextension

In [None]:
explainer = lime_image.LimeImageExplainer(random_state=1)

In [None]:
y_test = y_test.T[[1]][0]

In [None]:
print("Pred: {} \nTrue: {}".format(y_pred[1],y_test[1]))

In [None]:
array_to_img(test_images[1])

In [None]:
bad_predictions = (y_pred == y_test)

In [None]:
test_images_lime = test_images.astype('double')

In [None]:
labels = ['Norm','Pneu']

In [None]:
%%time
fig, ax = plt.subplots(5, 3, sharex='col', sharey='row')
fig.set_figwidth(20)
fig.set_figheight(16)
indecies = random.sample(range(sum(bad_predictions)), 5)
for j in range(5):
    explanation = explainer.explain_instance(test_images_lime[bad_predictions][indecies[j]], 
                                             model_testing.predict, 
                                             top_labels=5, hide_color=0, num_samples=1000, 
                                             random_seed=42)
    ax[j,0].imshow(test_images_lime[bad_predictions][indecies[j]])
    for i in range(2):
        temp, mask = explanation.get_image_and_mask(i, positive_only=True, 
                                                    num_features=5, hide_rest=True)
        ax[j,i+1].imshow(mark_boundaries(temp / 2 + 0.5, mask))

In [None]:
test_labels[9]

In [None]:
def image_outline(i):
    explanation = explainer.explain_instance(test_images[i].astype('double'), model_testing.predict, top_labels=5, hide_color=0, num_samples=1000)
    temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], positive_only=True, num_features=5, hide_rest=False)
    plt.imshow(mark_boundaries(temp / 2 + 0.5, mask))


In [None]:
array_to_img(test_images[5])

In [None]:
image_outline(5)

In [None]:
pic_number = 7
print("Pred: {} \nTrue: {}".format(y_pred[pic_number],y_test[pic_number]))

In [None]:
explanation = explainer.explain_instance(test_images[pic_number].astype('double'), model_testing.predict, top_labels=5, hide_color=0, num_samples=1000)


In [None]:
#Select the same class explained on the figures above.
ind =  explanation.top_labels[0]

#Map each explanation weight to the corresponding superpixel
dict_heatmap = dict(explanation.local_exp[ind])
heatmap = np.vectorize(dict_heatmap.get)(explanation.segments) 

#Plot. The visualization makes more sense if a symmetrical colorbar is used.
plt.imshow(heatmap, cmap = 'RdBu', vmin  = -heatmap.max(), vmax = heatmap.max())
plt.colorbar()

In [None]:
temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], positive_only=True, num_features=5, hide_rest=True)
plt.imshow(mark_boundaries(temp / 2 + 0.5, mask))

In [None]:
temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], positive_only=True, num_features=5, hide_rest=False)
plt.imshow(mark_boundaries(temp / 2 + 0.5, mask))

In [None]:
temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], positive_only=False, num_features=10, hide_rest=False)
plt.imshow(mark_boundaries(temp / 2 + 0.5, mask))