In [None]:
from keras.losses import MSE
from keras.layers import Dense, Dropout, GlobalAveragePooling2D
from keras.layers import Conv2D, BatchNormalization, MaxPooling2D, SeparableConv2D
from keras.layers import Input, Add, Flatten, ReLU, Concatenate,ELU
from keras.models import Model
from keras.optimizers import RMSprop
from tqdm import tqdm
import numpy as np
import os
import cv2
from skimage.restoration import denoise_nl_means, estimate_sigma

In [None]:
import tensorflow as tf
import logging

# Set TensorFlow logging level to suppress warnings
#tf.get_logger().setLevel(logging.ERROR)
import warnings
warnings.simplefilter("ignore")

In [None]:
from scipy.ndimage import zoom
def resize_with_ratio(image, dims, interpolationFlag):
    height, width = image.shape
    scale_factor = min(dims[0] / width, dims[1] / height)

    target_original_height = int(height * scale_factor)
    target_original_width = int(width * scale_factor)

    if interpolationFlag == "Spline":
        downscaled_image = zoom(image, scale_factor, order = 3)
    else:
        downscaled_image = cv2.resize(image, (target_original_width, target_original_height),
                                      interpolation=interpolationFlag)

    pad_top = (dims[1] - target_original_height) // 2
    pad_bottom = dims[1] - target_original_height - pad_top
    pad_left = (dims[0] - target_original_width) // 2
    pad_right = dims[0] - target_original_width - pad_left

    padded_image = cv2.copyMakeBorder(downscaled_image, pad_top, pad_bottom, pad_left, pad_right, cv2.BORDER_CONSTANT)

    return padded_image

In [None]:
#clahe = cv2.createCLAHE(clipLimit = 2.5, tileGridSize = (4,4))

# Step 1: Load and preprocess the data
def load_data(directory, image_size=(250, 250)):
    images = []
    labels = []
    for label, category in enumerate(["NORMAL", "PNEUMONIA"]):
        category_path = os.path.join(directory, category)
        for file_name in tqdm(os.listdir(category_path)):
            image_path = os.path.join(category_path, file_name)
            image = cv2.imread(image_path)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            image = resize_with_ratio(image, (600, 600), cv2.INTER_AREA)
            #image = image / 255.0
            #sigma = estimate_sigma(image, multichannel=False, average_sigmas=True)
            #image = denoise_nl_means(image, patch_size=4,
            #                         patch_distance=2,
            #                         h=0.2 * sigma, fast_mode=True)
            #image = (image*255).astype("uint8")
            #image = clahe.apply(image)
            #image = cv2.resize(image, image_size, interpolation = cv2.INTER_AREA)
            #image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
            image = image / 255.0
            image = np.expand_dims(image, axis=-1)
            images.append(image)
            if category == "PNEUMONIA":
                labels.append(1)  # Assign label 1 for PNEUMONIA images
            else:
                labels.append(0)  # Assign label 0 for NORMAL images
    return np.array(images), np.array(labels)

In [None]:
# Step 2: Define the CNN architecture
def residual_block(layer, k_, in_, out_, strides = (1,1), useShortcut = False):

    shortcut = layer

    layer = SeparableConv2D(in_, kernel_size=k_, strides=(1,1), padding="same")(layer)
    layer = BatchNormalization()(layer)
    layer = ELU()(layer)

    layer = SeparableConv2D(in_, kernel_size=k_, strides=strides, padding="same")(layer)
    layer = BatchNormalization()(layer)
    layer = ELU()(layer)

    layer = SeparableConv2D(out_, kernel_size = k_, strides=(1,1), padding = "same")(layer)
    layer = BatchNormalization()(layer)

    if strides != (1,1) or useShortcut:
        shortcut = SeparableConv2D(out_, kernel_size = k_, strides=strides, padding = "same")(shortcut)
        shortcut = BatchNormalization()(shortcut)

    layer = Add()([shortcut, layer])
    layer = ReLU()(layer)

    return layer

def cnn_branch(input_, k_ = (4,4)):

    layer = SeparableConv2D(16, kernel_size = k_, strides = (1,1), padding = "same")(input_)
    layer = BatchNormalization()(layer)
    layer = ReLU()(layer)

    layer = residual_block(layer, k_, 16, 32, useShortcut = True)
    layer = MaxPooling2D()(layer)

    layer = residual_block(layer, k_, 32, 64, useShortcut = True)
    layer = MaxPooling2D()(layer)

    layer = residual_block(layer, k_, 64, 128, useShortcut = True)
    layer = MaxPooling2D()(layer)

    layer = residual_block(layer, k_, 128, 256, useShortcut = True)
    layer = MaxPooling2D()(layer)

    layer = residual_block(layer, k_, 256, 384, useShortcut = True)
    layer = GlobalAveragePooling2D()(layer)

    return layer


def create_cnn_model(size = (75, 75), k_=(5,5)):
    input_ = Input((75,75,1))

    branch_1 = cnn_branch(input_, k_)
    branch_2 = cnn_branch(input_, k_)

    layer = Concatenate()([branch_1, branch_2])

    layer = Dense(512, activation = "relu")(layer)
    layer = Dropout(0.5)(layer)
    layer = Dense(256, activation = "relu")(layer)
    layer = Dropout(0.5)(layer)
    layer = Dense(128, activation = "relu")(layer)
    layer = Dropout(0.5)(layer)
    layer = Dense(64, activation = "relu")(layer)
    layer = Dropout(0.5)(layer)
    layer = Dense(2, activation = "softmax")(layer)

    model = Model(inputs = input_, outputs = layer)
    return model

In [None]:
from keras.callbacks import Callback, ModelCheckpoint, ReduceLROnPlateau
from sklearn.model_selection import StratifiedShuffleSplit
from keras.preprocessing.image import ImageDataGenerator


class MaxAccuracy(Callback):
    def __init__(self):
        super(MaxAccuracy, self).__init__()
        self.max_train_accuracy = 0.0
        self.max_val_accuracy = 0.0

    def on_epoch_end(self, epoch, logs=None):
        train_accuracy = logs['accuracy']
        val_accuracy = logs['val_accuracy']
        if train_accuracy > self.max_train_accuracy:
            self.max_train_accuracy = train_accuracy
        if val_accuracy > self.max_val_accuracy:
            self.max_val_accuracy = val_accuracy
        print(
            f" Max Train Accuracy: {self.max_train_accuracy:.4f}, Max Validation Accuracy: {self.max_val_accuracy:.4f}")

def train_model(model, model_loc, x, y, n_splits=1, test_size=0.25, random_state=47, n_epochs=30,
                min_learning_rate=0.0000001, lr_decay_factor=0.8):
    splitter = StratifiedShuffleSplit(n_splits=n_splits, test_size=test_size, random_state=random_state)

    aug_set_id = 0
    histories = []
    max_accuracy = MaxAccuracy()
    for train_ids, test_ids in splitter.split(x, y):
        checkpoint = ModelCheckpoint("model_" + str(model_loc) + "_" + str(aug_set_id) + ".h5", monitor="val_accuracy",
                                     save_best_only=True, model="max")
        aug_set_id = aug_set_id + 1

        x_train, x_test, y_train, y_test = x[train_ids], x[test_ids], y[train_ids], y[test_ids]

        histories.append(model.fit(x_train, y_train, batch_size=32,
                                   epochs=n_epochs, validation_data=(x_test, y_test),
                                   callbacks=[ReduceLROnPlateau(monitor="val_accuracy", factor=lr_decay_factor,
                                                                patience=2, min_lr=min_learning_rate), checkpoint, max_accuracy]))

    return histories

In [None]:
images, labels = load_data(".", (600, 600))

In [None]:
from PIL import Image
#images[2099].reshape(600,600)
im_to_save = (images[2099]*255).reshape(600,600).astype(np.uint8)
img = Image.fromarray(im_to_save)
img.save("file.png")

In [None]:
from keras.utils import to_categorical
labels = to_categorical(labels)

In [None]:
from keras.backend import clear_session
from keras.utils import plot_model
clear_session()
n_splits = 8
res = 600
factor = int(res/n_splits)
n_model = 0
histories = {}
for i in range(n_splits):
    for j in range(n_splits):
        model = create_cnn_model()
        plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)
        model.compile(optimizer=RMSprop(), metrics=["accuracy"], loss="categorical_crossentropy")
        histories[n_model] = train_model(model, n_model, images[:, i*factor:min((i+1)*factor, res), j*factor:min((j+1)*factor, res), :], labels)
        n_model = n_model+1
        break

In [None]:
import pickle
with open('base_histories.pkl', 'wb') as f:
    pickle.dump(histories, f)
#with open('!!saved_scores\histories.pkl', 'rb') as f:
#    histories_saved = pickle.load(f)

In [None]:
with open('base_histories.pkl', 'rb') as f:
    base_histories_saved = pickle.load(f)

In [None]:
with open('!!saved_scores\histories.pkl', 'rb') as f:
    histories_saved = pickle.load(f)

In [None]:
import matplotlib.pyplot as plt
plt.plot(histories[0][0].history['loss'], color="orange", label="PatchCNN w/o residual learning")
plt.plot(histories_saved[0][0].history['loss'], color="blue", label="PatchCNN w residual learning")
plt.legend()
plt.grid(True)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.gcf().set_dpi(300)
plt.savefig('plot.png', bbox_inches='tight', pad_inches=0, dpi=300, facecolor='white')
plt.show()

In [None]:
histories_saved[0][0].history['loss']

In [None]:
model_accs = {}
model = create_cnn_model()
model.compile(optimizer=RMSprop(), metrics=["accuracy"], loss="binary_crossentropy")
for n_model in range(64):
    n_splits = 8
    res = 600
    factor = int(res/n_splits)
    best_split = 0
    for n_split in range(1):
        model.load_weights("model_"+str(n_model)+"_"+str(n_split)+".h5")
        eval = model.evaluate(images[:, int(n_model/n_splits)*factor:min((int(n_model/n_splits)+1)*factor, res), int(n_model%n_splits)*factor:min((int(n_model%n_splits)+1)*factor, res), :], labels)
        best_split = eval[1] if best_split < eval[1] else best_split
        model_accs[n_model] = best_split
    print("Region " + str(n_model) + " : " + str(best_split))

In [None]:
models={}
from keras.models import load_model
for n_model in model_accs:
    if model_accs[n_model] > 0.97:
        model = load_model("model_"+str(n_model)+"_0.h5")
        models[n_model]=model

In [None]:
import json

with open("best_model.json", "w") as file:
    json.dump(list(models.keys()), file)

In [None]:
import json

with open("best_model.json", "r") as file:
    model_list = json.load(file)

In [None]:
from keras.layers import Lambda
from keras.models import load_model
tf.config.run_functions_eagerly(True)

sub_models = []
sub_model_outputs = []
n_splits = 8
res = 600
factor = int(res/n_splits)
main_input = Input((600, 600, 1))
def extract_patch(input_image, n_model, size=factor):
    return input_image[:, int(n_model/n_splits)*factor:min((int(n_model/n_splits)+1)*factor, res), int(n_model%n_splits)*factor:min((int(n_model%n_splits)+1)*factor, res), :]

for n_model in model_list:
    model = load_model("!!saved_scores\model_"+str(n_model)+"_0.h5")
    for layer in model.layers[:-1]:
        layer.trainable = False
    new_output_layer = Dense(1, activation="sigmoid")(model.layers[-2].output)
    sub_model = Model(inputs = model.input, outputs = new_output_layer)
    start_row, start_col = (n_model // n_splits) * 60, (n_model % n_splits) * 60  # Adjust based on your actual layout
    patch = Lambda(extract_patch, arguments={'n_model': n_model})(main_input)

    sub_models.append(sub_model)
    sub_model_outputs.append(sub_model(patch))
concatenated_outputs = Concatenate()(sub_model_outputs)
dense_layer = Dense(64, activation='relu')(concatenated_outputs)
final_output = Dense(2, activation='softmax')(dense_layer)
ensemble_model = Model(inputs=main_input, outputs=final_output)
ensemble_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
ensemble_model.summary()

In [None]:
histories = train_model(ensemble_model, "universal", images, labels)
#ensemble_model.load_weights("model_universal_0.h5")

In [None]:
from sklearn.metrics import roc_auc_score, confusion_matrix
from sklearn.metrics import roc_curve, auc
predictions = ensemble_model.predict(images)
preds = np.argmax(predictions, axis=1)
pred_probs = predictions[:,1]

In [None]:
import matplotlib.pyplot as plt
fpr, tpr, _ = roc_curve(labels, preds)
roc_auc = auc(fpr, tpr)
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.grid(True)
plt.savefig('roc.png', bbox_inches='tight', pad_inches=0, dpi=300, facecolor='white')
plt.show()

In [None]:
from sklearn.metrics import precision_recall_curve, auc
precision, recall, _ = precision_recall_curve(labels, pred_probs)
plt.figure()
plt.plot(recall, precision, color='darkorange', lw=2)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.grid(True)
plt.savefig('prerec.png', bbox_inches='tight', pad_inches=0, dpi=300, facecolor='white')
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
conf_matrix = confusion_matrix(labels, preds)
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.savefig('confCXR.png', bbox_inches='tight', pad_inches=0, dpi=300, facecolor='white')
plt.show()

In [None]:
import pickle
with open('histories_ensemble.pkl', 'wb') as f:
    pickle.dump(histories, f)

In [None]:
#Grad-CAM for patch CNNs
#Needs modification after multi branch approach
import keras
import matplotlib.pyplot as plt
n_model = 8
n_splits = 8
res = 600
factor = int(res/n_splits)
bestModel = keras.models.load_model("!!saved_scores\model_8_0.h5")
image = images[400]
image = image[int(n_model/n_splits)*factor:min((int(n_model/n_splits)+1)*factor, res), int(n_model%n_splits)*factor:min((int(n_model%n_splits)+1)*factor, res)]
target_layers = []
sub_models = []
target_layers.append(bestModel.get_layer(index=-23))
target_layers.append(bestModel.get_layer(index=-24))
sub_models.append(tf.keras.models.Model(bestModel.inputs, [target_layers[0].output, bestModel.output]))
sub_models.append(tf.keras.models.Model(bestModel.inputs, [target_layers[1].output, bestModel.output]))

target_class = []
preds = []

with tf.GradientTape() as tape:
    last_conv_out_1, preds_1 = sub_models[0](image)
    last_conv_out_2, preds_2 = sub_models[1](image)
    tape.watch(last_conv_out_1)
    tape.watch(last_conv_out_2)
    target_class.append(preds_1[0][0])
    target_class.append(preds_2[0][0])

grads=[]
pooled_grads = []
grads.append(tape.gradient(target_class[0], last_conv_out_1))
grads.append(tape.gradient(target_class[1], last_conv_out_2))
pooled_grads.append(tf.reduce_mean(grads[0], axis=(0, 1, 2)))
pooled_grads.append(tf.reduce_mean(grads[1], axis=(0, 1, 2)))

last_conv_out_1 = last_conv_out_1[0]
last_conv_out_2 = last_conv_out_2[0]

heatmap = last_conv_out_1 @ pooled_grads[0][..., tf.newaxis]
heatmap = tf.squeeze(heatmap)
heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)

plt.imshow(heatmap)