### Import Libraries

In [1]:
import numpy as np
import os
from random import shuffle
import random
from tqdm import tqdm
import pickle
import glob

# OpenCV
# import cv2

# Pandas
import pandas as pd

# Tensorflow
import tensorflow as tf
from tensorflow.keras.applications.densenet import DenseNet169
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.resnet import ResNet101
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Flatten

from sklearn.model_selection import train_test_split

### Read image file paths and store to a list both for covid and non-covid

In [2]:
dataset_path = "../../dataset/xray/resized/"

# all_files = []

# covid_files = glob.glob(os.path.join(dataset_path, "COVID", "*.png"))

# non_covid_files = glob.glob(os.path.join(dataset_path, "Normal", "*.png"))
# non_covid_files = non_covid_files[:3616]

# # non_covid_files.extend(glob.glob(os.path.join(dataset_path, "Viral Pneumonia", "*.png")))


# # Get all the files from the directory in a two element list. First element is list of file location to covid images and second element is list of file location to non-covid images.
# all_files = [covid_files, non_covid_files]
# print("Image Files Count\nCovid: {}\nNon-Covid: {}\nTotal: {}".format(len(covid_files), len(non_covid_files), len(all_files[0] + all_files[1])))

### Function to read image from file list and store the corresponding label

In [3]:
# def get_dataset(files, label, count, img_size):
#   dataset = []  # List to hold all the dataset. Each element is a dictionary

#   for j in tqdm(files):  # Loop over each file location
#     data_dict = {}
#     data_dict["id"] = count
#     data_dict["filepath"] = j
#     try:
#       img = cv2.imread(j)
#       img = cv2.resize(img, (img_size, img_size))
#       data_dict["image"] = img
#       data_dict["label"] = label
#       count += 1
#       dataset.append(data_dict)
#     except Exception as e:
#       print("faulty image: {} {}".format(j, e))
#   return dataset, count

### Read image from the file and store the corresponding label in a list

In [4]:
# c_dataset, nc_dataset, t_dataset = [], [], []
# labels = [1, 0] # 1 = Covid, 0 = Noncovid
# count = 0       # Count to record the ids of files. Each file has a unique ID.
# img_size = 224
# # all_files => [NC, C]
# for i, data in enumerate(all_files): # only two loops for Covid and Non-Covid
#   dataset, count = get_dataset(data, labels[i], count, img_size)
#   if labels[i] == 1:
#     c_dataset = dataset
#   else:
#     nc_dataset = dataset
# tot_dataset = c_dataset + nc_dataset
# print("Dataset Count\nCovid: {}\nNon-Covid: {}\nTotal: {}".format(len(c_dataset), len(nc_dataset), len(tot_dataset)))

### New Data set

In [5]:
# c_dataset, nc_dataset, t_dataset = [], [], []
# labels = [1, 0] # 1 = Covid, 0 = Noncovid
# count = 0       # Count to record the ids of files. Each file has a unique ID.
# img_size = 224
# # all_files => [NC, C]
# for i, data in enumerate(all_files): # only two loops for Covid and Non-Covid
#   dataset, count = get_dataset(data, labels[i], count, img_size)
#   if labels[i] == 1:
#     c_dataset = dataset
#   else:
#     nc_dataset = dataset
# tot_dataset = c_dataset + nc_dataset
# print("Dataset Count\nCovid: {}\nNon-Covid: {}\nTotal: {}".format(len(c_dataset), len(nc_dataset), len(tot_dataset)))

# # save dataset in pickle file
# with open('./pickle_files/x_ray/x_ray_dataset_new.pickle', 'wb') as handle:
#   pickle.dump(tot_dataset, handle, protocol=pickle.HIGHEST_PROTOCOL)

### Clip the extra data to make both class somehow balanced

In [6]:
# print("Initial Total Dataset: {}".format(len(tot_dataset)))
# c_2125 = tot_dataset[:2125] # first 2125 data is Covid data
# nc_2275 = tot_dataset[2125:4400] # next 2275 data is Non-Covid data
# t_dataset = c_2125 + nc_2275

# # confirm the correct dataset labels
# print(len(c_2125), c_2125[0]["label"], c_2125[2124]["label"])
# print(len(nc_2275), nc_2275[0]["label"], nc_2275[2274]["label"])
# print("After cliping, Total Dataset: {}".format(len(t_dataset)))

# # save dataset in pickle file
# with open('../pickle_files/x_ray/x_ray_dataset.pickle', 'wb') as handle:
#   pickle.dump(t_dataset, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [7]:
# # dataset_file = '../AL_Refined/pickle_files/x_ray/x_ray_dataset.pickle'
# dataset_file = './pickle_files/x_ray/x_ray_dataset_new.pickle'
# with open(dataset_file, 'rb') as handle:
#   x_ray_dataset = pickle.load(handle)

# # suffle the data
# # random.seed(42)
# # shuffle(x_ray_dataset)

# print(len(x_ray_dataset), x_ray_dataset[0]['label'])

### Extract image only from the dataset to send to DNN

In [8]:
# x = []
# y = []
# for data in x_ray_dataset:
#   x.append(data["image"])
#   y.append(data["label"])
# x = np.array(x)
# print(len(x), len(y))

In [9]:
# x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=42)
# print(len(x_train), len(x_test))
# # print(y_test)

### Define batch size

In [10]:
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 16
EPOCHS = 20
BASE_MODELS = ["vgg16net", "resnet101", "densenet169"]
seed_no = random.randint(1,100)

### Generate batches of images to feed into DNN

In [11]:
train_datagen = ImageDataGenerator(
  rescale=1./255,
  # rotation_range=20,
  # width_shift_range=0.2,
  # height_shift_range=0.2,
  shear_range=0.2,
  zoom_range=0.2,
  # horizontal_flip=True,
  validation_split=0.2,
  fill_mode='nearest'
)

In [12]:
test_datagen = ImageDataGenerator(rescale=1./255)

In [13]:
# train_generator = train_datagen.flow(
#   x_train,
#   y_train,
#   batch_size=BATCH_SIZE)

train_generator = train_datagen.flow_from_directory(
  dataset_path,
  # color_mode="grayscale",
  # subset="training",
  # seed=seed_no,
  class_mode='binary',
  target_size=IMAGE_SIZE,
  batch_size=BATCH_SIZE)

Found 5182 images belonging to 3 classes.


In [14]:
# test_generator = test_datagen.flow(
#   x_test,
#   y_test,
#   batch_size=BATCH_SIZE)

test_generator = train_datagen.flow_from_directory(
  dataset_path,
  # color_mode="grayscale",
  subset="validation",
  # seed=seed_no,
  class_mode='categorical',
  target_size=IMAGE_SIZE,
  batch_size=BATCH_SIZE)

Found 1035 images belonging to 3 classes.


In [15]:
def get_model(img_size, model_sel):
  if model_sel == 1:
    vgg_pre_t = VGG16(input_shape=(img_size, img_size, 3), include_top=False, weights ='imagenet')
    return vgg_pre_t, 25088

  elif model_sel == 2:
    resnet_pre_t = ResNet101(input_shape=(img_size, img_size, 3), include_top=False, weights='imagenet')
    return resnet_pre_t, 100352

  elif model_sel == 3:
    densenet169_pre_t = DenseNet169(input_shape=(img_size, img_size, 3), include_top=False, weights ='imagenet' )
    return densenet169_pre_t, 81536

In [16]:
select_model = 1 # int(input("Enter the number for: \n 1) VGGNET16 \n 2) Resnet101  \n 3) Densenet169 "))

In [17]:
# Load pre-trained selected model without classification layers
base_model, feature_size = get_model(IMAGE_SIZE[0], select_model)

In [18]:
# Add custom classification layers on top of ResNet50
x = base_model.output

x = GlobalAveragePooling2D()(x)

x = Dense(1024, activation='relu')(x)

predictions = Dense(1, activation='sigmoid')(x)
print(x.shape, predictions.shape)

(None, 1024) (None, 1)


In [19]:
# Combine base model and custom layers into a new model
model = Model(inputs=base_model.input, outputs=predictions)

In [20]:
# Freeze the layers of the base model
for layer in base_model.layers:
  layer.trainable = False

In [23]:
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [24]:
# Train the model
history = model.fit(train_generator,
          steps_per_epoch=len(train_generator),
          epochs=EPOCHS,
          validation_data=test_generator,
          validation_steps=len(test_generator))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20

KeyboardInterrupt: 

In [None]:
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
# plt.ylim(0.5, 1)
plt.show()

In [46]:
# Evaluate the model on test set
test_loss, test_accuracy = model.evaluate(test_generator, steps=len(test_generator))
print("Test Accuracy:", test_accuracy)

Test Accuracy: 0.9079999923706055


In [47]:
model_name = "models/xray" + "_" + BASE_MODELS[select_model-1] + "_" + str(BATCH_SIZE) + "_" + str(EPOCHS) + "_expert_acc_" + str(round(test_accuracy * 100, 0)) + ".h5"
model.save(model_name)

In [132]:
from sklearn.metrics import confusion_matrix, precision_score, f1_score, roc_auc_score
from tensorflow.keras.models import load_model

In [130]:
# Load the saved model
loaded_model = load_model(model_name)

In [144]:
learning_rate = loaded_model.optimizer.learning_rate.numpy()
print(learning_rate)

0.001


In [135]:
# test_generator2 = test_datagen.flow(
#   x_test[:5],
#   y_test[:5],
#   batch_size=BATCH_SIZE)

# # Evaluate the loaded model on the test set
# test_loss, test_accuracy = loaded_model.evaluate(test_generator2, steps=len(test_generator2))
# print("Test Accuracy:", test_accuracy)

# test_loss, test_accuracy = loaded_model.evaluate(test_generator, steps=len(test_generator))

test_data, test_labels = [], []
for _ in range(len(test_generator)):
    batch_data, batch_labels = test_generator.next()
    test_data.append(batch_data)
    test_labels.append(batch_labels)

test_data = np.concatenate(test_data)
test_labels = np.concatenate(test_labels)

# Make predictions on the test set
predictions = loaded_model.predict(test_data)




In [136]:
# Convert predicted probabilities to predicted class labels
predicted_labels = (predictions > 0.5).astype(int)  # Thresholding at 0.5

# Calculate precision
precision = precision_score(test_labels, predicted_labels)

# Calculate F1 score
f1 = f1_score(test_labels, predicted_labels)

# Calculate AUC-ROC
auc_roc = roc_auc_score(test_labels, predictions)

# Calculate confusion matrix
conf_matrix = confusion_matrix(test_labels, predicted_labels)

print("Accuracy:", test_accuracy)
print("Loss:", test_loss)
print("Precision:", precision)
print("F1 Score:", f1)
print("AUC-ROC Score:", auc_roc)
print("Confusion Matrix:")
print(conf_matrix)

Accuracy: 0.9733840227127075
Loss: 0.06917782127857208
Precision: 0.9968203497615262
F1 Score: 0.9728471683475562
AUC-ROC Score: 0.9987695133149679
Confusion Matrix:
[[658   2]
 [ 33 627]]


In [137]:
loaded_model.summary()

Model: "model_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0   

In [129]:
## Inferencing with the new data
from tensorflow.keras.preprocessing import image

dataset_path = "../dataset/xrays"

non_covid_files = glob.glob(os.path.join(dataset_path, "Noncovid", "pneumonia", "*.jpg"))
covid_files = glob.glob(os.path.join(dataset_path, "Covid", "*.png"))
print(covid_files[0], len(non_covid_files))

new_image = image.load_img(non_covid_files[600], target_size=IMAGE_SIZE)
new_image_array = image.img_to_array(new_image)
new_image_array = np.expand_dims(new_image_array, axis=0)
print(new_image_array.shape)

new_image_array = new_image_array / 255.0  # Normalize pixel values

new_test_img = x_test[0]
new_test_img = np.expand_dims(x_test[0], axis=0)
new_test_img = new_test_img / 255.0

print(new_test_img.shape)

y_pred = loaded_model.predict(new_image_array)

y_real = y_test[0]

print(y_real, y_pred[0])
y_pred = loaded_model.predict(new_test_img)
print(y_real, y_pred[:, 0], y_pred )

../dataset/xrays/Covid/MIDRC-RICORD-1C-SITE2-000120-98054-0.png 774
(1, 224, 224, 3)
(1, 224, 224, 3)
1 [2.5972411e-05]
1 [0.98138994] [[0.98138994]]


In [192]:
# further inference in new dataset

dataset_path = "../COVID-19_Radiography_Dataset/"
# covid_files = glob.glob(os.path.join(dataset_path, "COVID/images", "*.png"))
# covid_files = glob.glob(os.path.join(dataset_path, "Lung_Opacity/images", "*.png"))
# covid_files = glob.glob(os.path.join(dataset_path, "Viral Pneumonia/images", "*.png"))
covid_files = glob.glob(os.path.join(dataset_path, "Normal/images", "*.png"))
print(len(covid_files))
new_data = []
for i in range(50):
  new_image = image.load_img(covid_files[i], target_size=IMAGE_SIZE)
  new_image_array = image.img_to_array(new_image)
  new_image_array = np.expand_dims(new_image_array, axis=0)
  new_image_array = new_image_array / 255.0  # Normalize pixel values
  new_data.append(new_image_array)

new_data = np.concatenate(new_data)
y_pred = loaded_model.predict(new_data)


10192


In [197]:
# print(y_pred)
print(covid_files[48])
print(np.where(y_pred < 0.5))
# np.fromiter((x for x in y_pred if x < 0.5), dtype=y_pred.dtype)

../COVID-19_Radiography_Dataset/Normal/images/Normal-1148.png
(array([ 7,  9, 10, 12, 22, 26, 37, 47, 48]), array([0, 0, 0, 0, 0, 0, 0, 0, 0]))
