In [None]:
import numpy as np 
import pandas as pd 
import os
import matplotlib.pyplot as plt
import seaborn as sns
import keras
from keras.models import Sequential
from keras.layers import Dense, Conv2D , MaxPool2D , Flatten , Dropout , BatchNormalization
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report,confusion_matrix
from keras.callbacks import ReduceLROnPlateau
import cv2

In [None]:
# Class names; a sample's numeric label is the index of its class in this list.
labels = ['PNEUMONIA', 'NORMAL']
# Side length (in pixels) every image is resized to.
img_size = 150

def get_training_data(data_dir):
    """Load every readable image under ``data_dir/<label>`` as a grayscale array.

    Args:
        data_dir: Directory containing one sub-directory per entry of ``labels``.

    Returns:
        A list of ``[resized_arr, class_num]`` pairs, where ``resized_arr`` is the
        image resized to ``img_size`` x ``img_size`` and ``class_num`` is the index
        of the image's class in ``labels``.

    Entries that are not regular files, or that OpenCV cannot decode, are skipped
    instead of crashing the whole load.
    """
    data = []
    for class_num, label in enumerate(labels):
        path = os.path.join(data_dir, label)
        # os.listdir order is arbitrary; sort so the sample order (and therefore
        # any seeded split made from it) is reproducible across machines.
        for img in sorted(os.listdir(path)):
            img_path = os.path.join(path, img)
            if not os.path.isfile(img_path):
                # Nested directories cannot be decoded as images.
                continue

            img_arr = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
            if img_arr is None:
                # cv2.imread signals an unreadable/non-image file by returning None;
                # passing that to cv2.resize would raise.
                print(f"Skipping unreadable image: {img_path}")
                continue

            resized_arr = cv2.resize(img_arr, (img_size, img_size))  # Resize to the preferred size
            data.append([resized_arr, class_num])
    return data

In [None]:
# Load the three dataset splits with the same loader, in train / test / val order.
train, test, val = (
    get_training_data(split_dir)
    for split_dir in ('./chest_xray/train', './chest_xray/test', './chest_xray/val')
)

In [None]:
# Report the number of samples in each split (train, test, val).
for split in (train, test, val):
    print(len(split))
# Validation is only 16 images.

In [None]:
# Tally the training samples per class; class index 0 is the pneumonia class,
# everything else is counted as normal.
pneumnoia_count = sum(1 for sample in train if sample[1] == 0)
normal_count = len(train) - pneumnoia_count

print("Pneumonia Cases: ", pneumnoia_count)
print("Normal Cases: ", normal_count)

In [None]:
# Show the first training sample with its class name as the title
first_image, first_class = train[0]
plt.figure(figsize=(5, 5))
plt.imshow(first_image, cmap='gray')
plt.title(labels[first_class])

# Show the last training sample with its class name as the title
last_image, last_class = train[-1]
plt.figure(figsize=(5, 5))
plt.imshow(last_image, cmap='gray')
plt.title(labels[last_class])

In [None]:
# Separate the images (features) from the class indices (labels).
x_train = [feature for feature, _ in train]
y_train = [label for _, label in train]

# Test set "B" is the dataset's own test directory.
x_test_B = [feature for feature, _ in test]
y_test_B = [label for _, label in test]

# Seed shared by both splits below.
random_sample = 100

# Hold out 10% of the training data ...
x_train, x_test_val, y_train, y_test_val = train_test_split(
    x_train, y_train, test_size=0.1, random_state=random_sample
)
# ... and divide that hold-out evenly into a validation set and test set "A".
x_val, x_test_A, y_val, y_test_A = train_test_split(
    x_test_val, y_test_val, test_size=0.5, random_state=random_sample
)



In [None]:
# Sample counts for train, validation, test A and test B, in that order
for subset in (x_train, x_val, x_test_A, x_test_B):
    print(len(subset))

# Shape of a single training image
print(x_train[0].shape)

In [None]:
# Normalize the data: convert each image list to an array and scale pixel values by 1/255.
# NOTE: this cell rebinds its inputs, so re-running it without re-running the
# split cell above would divide by 255 a second time.
x_train = np.array(x_train) / 255
x_val = np.array(x_val) / 255
x_test_A = np.array(x_test_A) / 255
# x_test_B must be built from its own images (not x_test_A) so that it keeps the
# dataset's test images and stays aligned with y_test_B.
x_test_B = np.array(x_test_B) / 255

In [None]:
# Reshape the image arrays to (samples, height, width, channels) with a single
# channel, and turn the label lists into arrays.
model_input_shape = (-1, img_size, img_size, 1)

x_train = np.reshape(x_train, model_input_shape)
x_val = np.reshape(x_val, model_input_shape)
x_test_A = np.reshape(x_test_A, model_input_shape)
x_test_B = np.reshape(x_test_B, model_input_shape)

y_train = np.array(y_train)
y_val = np.array(y_val)
y_test_A = np.array(y_test_A)
y_test_B = np.array(y_test_B)

In [None]:
# Print the class balance of every label array.
label_sets = {
    "y_train": y_train,
    "y_val": y_val,
    "y_test_A": y_test_A,
    "y_test_B": y_test_B,
}

for dataset_name, dataset in label_sets.items():
    # A 0 is the pneumonia class; every other value counts as normal
    pneumnoia_count = sum(1 for label in dataset if label == 0)
    normal_count = len(dataset) - pneumnoia_count

    print(f"\n{dataset_name}")
    print("Pneumonia Cases: ", pneumnoia_count)
    print("Normal Cases: ", normal_count)

In [None]:
# Binary classifier: five Conv2D -> BatchNormalization -> MaxPool2D stages
# (three of them with Dropout after the convolution), followed by a dense head
# with a single sigmoid output.
model = Sequential()

# Stage 1 (32 filters). The input shape follows img_size so the network stays in
# sync with the size the images were resized to, instead of a hard-coded 150.
model.add(Conv2D(32 , (3,3) , strides = 1 , padding = 'same' , activation = 'relu' , input_shape = (img_size, img_size, 1)))
model.add(BatchNormalization())
model.add(MaxPool2D((2,2) , strides = 2 , padding = 'same'))

# Stage 2 (64 filters, dropout 0.1)
model.add(Conv2D(64 , (3,3) , strides = 1 , padding = 'same' , activation = 'relu'))
model.add(Dropout(0.1))
model.add(BatchNormalization())
model.add(MaxPool2D((2,2) , strides = 2 , padding = 'same'))

# Stage 3 (64 filters)
model.add(Conv2D(64 , (3,3) , strides = 1 , padding = 'same' , activation = 'relu'))
model.add(BatchNormalization())
model.add(MaxPool2D((2,2) , strides = 2 , padding = 'same'))

# Stage 4 (128 filters, dropout 0.2)
model.add(Conv2D(128 , (3,3) , strides = 1 , padding = 'same' , activation = 'relu'))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(MaxPool2D((2,2) , strides = 2 , padding = 'same'))

# Stage 5 (256 filters, dropout 0.2)
model.add(Conv2D(256 , (3,3) , strides = 1 , padding = 'same' , activation = 'relu'))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(MaxPool2D((2,2) , strides = 2 , padding = 'same'))

# Classification head: one sigmoid unit, trained with binary cross-entropy
model.add(Flatten())
model.add(Dense(units = 128 , activation = 'relu'))
model.add(Dropout(0.2))
model.add(Dense(units = 1 , activation = 'sigmoid'))

model.compile(optimizer = "rmsprop" , loss = 'binary_crossentropy' , metrics = ['accuracy'])
model.summary()

In [None]:
# Both checkpoint callbacks below write into ./models; create it up front so the
# first save does not fail on a fresh checkout.
os.makedirs('models', exist_ok=True)

# Multiply the learning rate by 0.3 (down to min_lr) once val_accuracy has not
# improved for 2 epochs.
learning_rate_reduction = ReduceLROnPlateau(monitor='val_accuracy', patience = 2, verbose=1,factor=0.3, min_lr=0.000001)
# Keep only the best model seen so far, judged by ModelCheckpoint's default monitored metric.
best_model = keras.callbacks.ModelCheckpoint(filepath='models/best_model.h5', save_best_only=True)


class CustomSaver(keras.callbacks.Callback):
    """Save a snapshot of the model whenever the epoch index is a multiple of 5.

    Epoch indices passed to ``on_epoch_end`` start at 0, so snapshots are written
    after epochs 0, 5, 10, ... as ``models/model_epoch_<epoch>.h5``.

    A hand-written callback is used because an integer ``save_freq`` on
    ``ModelCheckpoint`` is counted in batches rather than epochs.
    """

    def on_epoch_end(self, epoch, logs=None):
        # `logs` is unused; it defaults to None rather than a shared mutable dict.
        if epoch % 5 == 0:
            self.model.save(f"models/model_epoch_{epoch}.h5")


epoch_saver = CustomSaver()


In [None]:
# Train for 60 epochs in batches of 32, validating on the held-out validation
# split after every epoch; the callbacks run in the order listed.
training_callbacks = [learning_rate_reduction, best_model, epoch_saver]
history = model.fit(
    x_train,
    y_train,
    batch_size=32,
    epochs=60,
    validation_data=(x_val, y_val),
    callbacks=training_callbacks,
)