In [32]:
from PIL import Image
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.preprocessing import OneHotEncoder 
from tensorflow.keras.models import load_model,Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense,Input,Conv2D,Flatten,MaxPool2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import glob
from sklearn.metrics import confusion_matrix
from mlxtend.plotting import plot_confusion_matrix
import numpy as np
import matplotlib.pyplot as plt

In [6]:
# # With % train / test / val 70 / 20 / 10 taken from same directory
base_dir = '../all_data/'
all_norm_files = np.array(glob.glob(base_dir+'NORMAL/*'))
all_pnm_files = np.array(glob.glob(base_dir+'PNEUMONIA/*'))
np.random.shuffle(all_norm_files)
np.random.shuffle(all_pnm_files)

def train_test_val(data, ratios, label):
    i1 = int(len(data) * ratios['train'])
    i2 = int(len(data) * ratios['traintest'])
    train = data[:i1]
    test = data[i1:i2]
    val = data[i2:]
    return { 
        'files': {
            'train': train, 
            'test': test, 
            'val': val
        },
        'labels': {
            'train': np.array([label]*len(train)),
            'test': np.array([label]*len(test)),
            'val': np.array([label]*len(val))
        }
    }

data = { 'normal': {}, 'pneumonia':{} }
data['normal'] = train_test_val(all_norm_files, {'train': 0.7, 'traintest': 0.9}, 'normal')
data['pneumonia'] = train_test_val(all_pnm_files, {'train': 0.7, 'traintest': 0.9}, 'pneumonia')

In [7]:
im_size = 160
def get_imgs(files):
    print(files[:3])
    images = np.zeros((len(files), im_size, im_size,3), dtype='float32')
    for i, img_file in enumerate(files):
        img = np.array(Image.open(img_file).resize((160, 160)).convert("RGB")) 
        images[i] = img
    return images
trn_norm_x = get_imgs(data['normal']['files']['train']) # trn_norm_img
trn_pnm_x = get_imgs(data['pneumonia']['files']['train'])
tst_norm_x = get_imgs(data['normal']['files']['test'])
tst_pnm_x = get_imgs(data['pneumonia']['files']['test'])
val_norm_x = get_imgs(data['normal']['files']['val'])
val_pnm_x = get_imgs(data['pneumonia']['files']['val'])

['../all_data/NORMAL\\NORMAL2-IM-0346-0001.jpeg'
 '../all_data/NORMAL\\NORMAL2-IM-0059-0001.jpeg'
 '../all_data/NORMAL\\NORMAL2-IM-1138-0001.jpeg']
['../all_data/PNEUMONIA\\person1918_bacteria_4825.jpeg'
 '../all_data/PNEUMONIA\\person18_bacteria_57.jpeg'
 '../all_data/PNEUMONIA\\person24_bacteria_109.jpeg']
['../all_data/NORMAL\\NORMAL2-IM-0918-0001.jpeg'
 '../all_data/NORMAL\\NORMAL2-IM-0207-0001.jpeg'
 '../all_data/NORMAL\\NORMAL2-IM-1203-0001.jpeg']
['../all_data/PNEUMONIA\\person118_virus_224.jpeg'
 '../all_data/PNEUMONIA\\person257_bacteria_1193.jpeg'
 '../all_data/PNEUMONIA\\person847_bacteria_2767.jpeg']
['../all_data/NORMAL\\NORMAL2-IM-0775-0001.jpeg'
 '../all_data/NORMAL\\NORMAL2-IM-0385-0001.jpeg'
 '../all_data/NORMAL\\IM-0746-0001.jpeg']
['../all_data/PNEUMONIA\\person370_bacteria_1687.jpeg'
 '../all_data/PNEUMONIA\\person133_bacteria_634.jpeg'
 '../all_data/PNEUMONIA\\person807_virus_1441.jpeg']


array([[[[  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         ...,
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.]],

        [[  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         ...,
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.]],

        [[  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         ...,
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.]],

        ...,

        [[  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         ...,
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.]],

        [[  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         ...,
         [  0.,   0.,   0.],
         [  0.,   0.,   0.],
         [  0.,   0.,   0.]],

        [[  0.,   0.,   0.],
       

In [None]:
print("train normal array shape :",trn_norm_x.shape)
print("train pneumonia array shape :",trn_pnm_x.shape)
print("\ntest normal array shape :",tst_norm_x.shape)
print("test pneumonia array shape :",tst_pnm_x.shape)
print("\nval normal array shape :",val_norm_x.shape)
print("val pneumonia array shape :",val_pnm_x.shape)

train normal array shape : (1108, 160, 160, 3)
train pneumonia array shape : (2991, 160, 160, 3)

test normal array shape : (316, 160, 160, 3)
test pneumonia array shape : (854, 160, 160, 3)

val normal array shape : (159, 160, 160, 3)
val pneumonia array shape : (428, 160, 160, 3)


In [10]:
x_train = np.append(trn_norm_x, trn_pnm_x, axis=0)
y_train = np.append(data['normal']['labels']['train'], data['pneumonia']['labels']['train'])
x_test = np.append(tst_norm_x,tst_pnm_x,axis=0)
y_test = np.append(data['normal']['labels']['test'], data['pneumonia']['labels']['test'])
x_val = np.append(val_norm_x,val_pnm_x,axis=0)
y_val = np.append(data['normal']['labels']['val'],data['pneumonia']['labels']['val'])

encoder = OneHotEncoder(sparse=False) # NORMAL is [1,0] and PNEUMONIA is [0,1]
y_train_enc= encoder.fit_transform(y_train.reshape(-1,1))
y_test_enc= encoder.fit_transform(y_test.reshape(-1,1))
y_val_enc= encoder.fit_transform(y_val.reshape(-1,1))

In [108]:
batch_size = 16

# train_datagen = ImageDataGenerator(rotation_range=45,
#                    width_shift_range=0.2,
#                    height_shift_range=0.2,
#                    shear_range=0.2,
#                    zoom_range=0.25,
#                    horizontal_flip=True,
#                    fill_mode='nearest')

train_datagen = ImageDataGenerator(rescale = 1.0/255,
                                   samplewise_center=True,
                                   samplewise_std_normalization=True,
                                   zoom_range = 0.2, 
                                   width_shift_range=0.1,  
                                   height_shift_range=0.1)

train_generator = train_datagen.flow(x_train,
                       y_train_enc,
                       batch_size=batch_size)


# val_datagen = ImageDataGenerator(rotation_range=45,
#                    width_shift_range=0.2,
#                    height_shift_range=0.2,
#                    shear_range=0.2,
#                    zoom_range=0.25,
#                    horizontal_flip=True,
#                    fill_mode='nearest')

val_datagen  = ImageDataGenerator(rescale = 1.0/255,
                                         samplewise_center=True,
                                         samplewise_std_normalization=True,
                                         zoom_range = 0.2, 
                                         width_shift_range=0.1,  
                                         height_shift_range=0.1)

val_generator = val_datagen.flow(x_val,
                       y_val_enc,
                       batch_size=batch_size)

test_datagen  = ImageDataGenerator(rescale = 1.0/255,
                                    samplewise_center=True,
                                    samplewise_std_normalization=True)

test_generator = test_datagen.flow(x_test,
                     y_test_enc,
                     batch_size=batch_size)

In [107]:
# gen = train_datagen.flow(x_train,y_train_enc,batch_size=1)
# plt.imshow(gen[0][0][0])
# print(gen[0][1])

In [112]:
n_train = len(trn_norm_x)+len(trn_pnm_x)
epochs = 20
learning_rate = 1e-3

model = tf.keras.Sequential(name='X-ray_CNN')

model.add(tf.keras.layers.InputLayer(input_shape=(160,160,3)))

model.add(tf.keras.layers.Conv2D(filters=32, kernel_size=(3, 3)))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Activation('relu')) # relu activation makes each value 0 if it is negative
model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

model.add(tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3)))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Activation('relu'))
model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

model.add(tf.keras.layers.Conv2D(filters=128, kernel_size=(3, 3)))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Activation('relu'))
model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

model.add(tf.keras.layers.Conv2D(filters=128, kernel_size=(3, 3)))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Activation('relu'))
model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(units=512, activation='relu'))
model.add(tf.keras.layers.Dense(units=2, activation='sigmoid'))

METRICS = ['accuracy',
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall')]
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-2,
    decay_steps=10000,
    decay_rate=0.9)

model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=lr_schedule),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=METRICS)
hist = model.fit(train_generator,
           steps_per_epoch= x_train.shape[0] // batch_size,
           epochs= epochs,
           validation_data= test_generator,
           validation_steps= x_test.shape[0] // batch_size)
model.summary()

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20

In [None]:
print('Model finished training!!')

In [None]:
import seaborn as sns
# labels = ['bacteria','normal','virus']
#confusion matrix
y_pred = model.predict(x_test)
#transforming label back to original
y_pred = encoder.inverse_transform(y_pred).squeeze()
#matrix of Actual vs Prediction data
# c_matrix = confusion_matrix(y_test, y_pred)
# plt.figure(figsize=(8,8))
# plt.title('Confusion matrix',fontsize=14)
# sns.heatmap(
#   c_matrix, xticklabels=labels,yticklabels=labels,
#   fmt='d', annot=True,annot_kws={"size": 14}, cmap='Reds')
# plt.xlabel("Predicted",fontsize=12)
# plt.ylabel("Actual",fontsize=12)
# plt.show()



In [None]:
(y_pred.squeeze() == y_test.squeeze()).sum() / len(y_test) # Test Accuracy

0.8711340206185567

In [None]:
# len(y_pred), len(y_test), np.array(y_pred.squeeze() == y_test.squeeze(), dtype=np.int64).sum()
# y_test.squeeze()
# y_pred_np = y_pred.squeeze()
y_pred = y_pred.squeeze()
y_test = y_test.squeeze()

# Make sure both input arrays have the same length
if len(y_pred) != len(y_test):
    raise ValueError("Input arrays must have the same length")

# True Positives (TP), True Negatives (TN), False Positives (FP), and False Negatives (FN)
TP = np.sum(np.logical_and(y_pred == 'normal', y_test == 'normal'))
TN = np.sum(np.logical_and(y_pred == 'pneumonia', y_test == 'pneumonia'))
FP = np.sum(np.logical_and(y_pred == 'normal', y_test == 'pneumonia'))
FN = np.sum(np.logical_and(y_pred == 'pneumonia', y_test == 'normal'))
print(TP, TN, FP, FN)

# Accuracy
accuracy = (TP + TN) / (TP + TN + FP + FN)

# Precision
precision = TP / (TP + FP) if TP + FP > 0 else 0

# Recall (Sensitivity or True Positive Rate)
recall = TP / (TP + FN) if TP + FN > 0 else 0

# F1 Score
f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0

print({
    "Accuracy": accuracy,
    "Precision": precision,
    "Recall": recall,
    "F1 Score": f1_score
})

144 363 64 11
{'Accuracy': 0.8711340206185567, 'Precision': 0.6923076923076923, 'Recall': 0.9290322580645162, 'F1 Score': 0.7933884297520661}


In [None]:
import pandas as pd
pd.Series(y_test).value_counts()

pneumonia    427
normal       155
dtype: int64

In [None]:
import datetime
model.save('xraymodel_tertiary_'+str(datetime.datetime.now().strftime("%Y%m%d_%H%M%S")))



INFO:tensorflow:Assets written to: xraymodel_tertiary_20231016_123547\assets


INFO:tensorflow:Assets written to: xraymodel_tertiary_20231016_123547\assets


In [None]:
def plot_confusion(n, generator, title, batch_size):
    labels = []
    for i in range(0, n//batch_size):
        # print(i)
        labels.extend(generator[i][1])
    labels = np.array(labels)
    print(len(labels))
    # preds = model.predict_classes(generator)
    preds = (model.predict(generator) > 0.5).astype("int32")[:,0]
    # print(preds, n)
    preds = np.reshape(preds, n)

    cm  = confusion_matrix(labels, preds[:len(labels)])
    plt.figure()
    plot_confusion_matrix(cm,figsize=(12,8), hide_ticks=False,cmap=plt.cm.Blues)
    plt.xticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
    plt.yticks(range(2), ['Normal', 'Pneumonia'], fontsize=16)
    plt.title(title, fontsize=20)
    return plt.show()

plot_confusion(len(x_test), test_generator, 'Test set', batch_size)
# plot_confusion(n_train, train_generator, 'Training set', 16)
# plot_confusion(n_validation, validation_generator, 'Validation set', 16)

# 0.973958 accuracy for test

576


ValueError: Classification metrics can't handle a mix of multilabel-indicator and binary targets

In [None]:
len(x_train), len(y_train)

(4696, 4696)