<a href="https://colab.research.google.com/github/zehanzz/Pneumonia_Detection/blob/main/Pneumonia_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q kaggle
from google.colab import files
files.upload()
!mkdir ~/.kaggle/
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia

In [None]:
!unzip chest-xray-pneumonia.zip

In [None]:
import os

import cv2
import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from keras.callbacks import ReduceLROnPlateau
from keras.layers import Dense, Conv2D , MaxPool2D , Flatten , Dropout , BatchNormalization
from keras.models import Sequential
from keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import classification_report,confusion_matrix
from sklearn.model_selection import train_test_split

In [None]:
# Class names; a class's integer label is its index in this list.
labels = ['PNEUMONIA', 'NORMAL']
# Images are resized to img_size x img_size pixels.
img_size = 150


def get_training_data(data_dir):
    """Load every image under ``data_dir/<label>`` as a resized grayscale array.

    Args:
        data_dir: Directory containing one sub-directory per entry of ``labels``.

    Returns:
        An object-dtype NumPy array of shape ``(n_images, 2)``. Each row is
        ``[image, class_num]``: ``image`` is the ``img_size x img_size``
        grayscale array and ``class_num`` is the label's index in ``labels``.
        Files that cannot be read or resized are reported and skipped.
    """
    data = []
    for class_num, label in enumerate(labels):
        path = os.path.join(data_dir, label)
        for img in os.listdir(path):
            img_path = os.path.join(path, img)
            try:
                img_arr = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                if img_arr is None:
                    # cv2.imread reports an unreadable / non-image file by
                    # returning None instead of raising.
                    print(f"Skipping unreadable image: {img_path}")
                    continue
                resized_arr = cv2.resize(img_arr, (img_size, img_size))
                data.append((resized_arr, class_num))
            except Exception as e:
                # Best effort: one bad file must not abort the whole load.
                print(f"{img_path}: {e}")

    # Fill an object array explicitly. np.array() on ragged [image, int] pairs
    # raises ValueError on recent NumPy releases.
    samples = np.empty((len(data), 2), dtype=object)
    for row, (image, class_num) in enumerate(data):
        samples[row, 0] = image
        samples[row, 1] = class_num
    return samples

In [None]:
# Print the current working directory of the notebook's shell.
!pwd

In [None]:
# Load the three dataset splits (in the order train, test, val) from the
# extracted archive; each result holds [image, class index] pairs.
train, test, val = (
    get_training_data(split_dir)
    for split_dir in (
        '/content/chest_xray/chest_xray/train',
        '/content/chest_xray/chest_xray/test',
        '/content/chest_xray/chest_xray/val',
    )
)

In [None]:
# Separate every split into two parallel lists: the images (x_*) and their
# integer class indices (y_*).
x_train = [feature for feature, _ in train]
y_train = [label for _, label in train]

x_test = [feature for feature, _ in test]
y_test = [label for _, label in test]

x_val = [feature for feature, _ in val]
y_val = [label for _, label in val]

In [None]:
# Normalize the data
# Stack each split into one array and divide by 255 to normalize the data.
x_train, x_val, x_test = (
    np.array(split) / 255 for split in (x_train, x_val, x_test)
)

In [None]:
# resize data for deep learning
# Reshape the images to (n, img_size, img_size, 1), i.e. add an explicit
# single-channel axis, and turn the label lists into arrays.
x_train = np.reshape(x_train, (-1, img_size, img_size, 1))
x_val = np.reshape(x_val, (-1, img_size, img_size, 1))
x_test = np.reshape(x_test, (-1, img_size, img_size, 1))

y_train = np.asarray(y_train)
y_val = np.asarray(y_val)
y_test = np.asarray(y_test)

In [None]:
# With data augmentation to prevent overfitting and handling the imbalance in dataset

datagen = ImageDataGenerator(
    # --- active random augmentations ---
    rotation_range=30,        # rotate by up to +/-30 degrees
    zoom_range=0.2,           # zoom in or out by up to 20%
    width_shift_range=0.1,    # shift horizontally by up to 10% of the width
    height_shift_range=0.1,   # shift vertically by up to 10% of the height
    horizontal_flip=True,     # mirror left/right at random
    vertical_flip=False,      # never mirror top/bottom
    # --- normalisation options, all switched off ---
    featurewise_center=False,             # no dataset-wide mean subtraction
    samplewise_center=False,              # no per-image mean subtraction
    featurewise_std_normalization=False,  # no division by the dataset std
    samplewise_std_normalization=False,   # no division by the per-image std
    zca_whitening=False,                  # no ZCA whitening
)

datagen.fit(x_train)

In [None]:
# Binary CNN classifier: five Conv -> (Dropout) -> BatchNorm -> MaxPool stages,
# followed by a dense head with a single sigmoid output.
# The input shape is derived from img_size so it stays in sync with the size
# the images were resized to when they were loaded.
model = Sequential([
    Conv2D(32, (3, 3), strides=1, padding='same', activation='relu',
           input_shape=(img_size, img_size, 1), name='conv2d_1'),
    BatchNormalization(name='batch_norm_1'),
    MaxPool2D((2, 2), strides=2, padding='same', name='maxpool_1'),

    Conv2D(64, (3, 3), strides=1, padding='same', activation='relu', name='conv2d_2'),
    Dropout(0.1, name='dropout_1'),
    BatchNormalization(name='batch_norm_2'),
    MaxPool2D((2, 2), strides=2, padding='same', name='maxpool_2'),

    Conv2D(64, (3, 3), strides=1, padding='same', activation='relu', name='conv2d_3'),
    BatchNormalization(name='batch_norm_3'),
    MaxPool2D((2, 2), strides=2, padding='same', name='maxpool_3'),

    Conv2D(128, (3, 3), strides=1, padding='same', activation='relu', name='conv2d_4'),
    Dropout(0.2, name='dropout_2'),
    BatchNormalization(name='batch_norm_4'),
    MaxPool2D((2, 2), strides=2, padding='same', name='maxpool_4'),

    Conv2D(256, (3, 3), strides=1, padding='same', activation='relu', name='conv2d_5'),
    Dropout(0.2, name='dropout_3'),
    BatchNormalization(name='batch_norm_5'),
    MaxPool2D((2, 2), strides=2, padding='same', name='maxpool_5'),

    Flatten(),
    Dense(units=128, activation='relu', name='dense_1'),
    Dropout(0.2, name='dropout_4'),
    Dense(units=1, activation='sigmoid', name='dense_2'),
])
model.compile(optimizer="rmsprop", loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

In [None]:
!pip install tqdm

In [None]:
def apply_selective_feature_dropping(model, images, labels, drop_ratio=0.2, keep_ratio=0.2):
    """Zero out a band of gradient-ranked pixels in every image of a batch.

    Pixels are ranked per image by the magnitude of the loss gradient with
    respect to the input (largest first). The top ``keep_ratio`` fraction is
    left untouched and the next ``drop_ratio`` fraction is set to zero;
    everything ranked below that band is also left untouched.

    Args:
        model: Callable Keras model producing one sigmoid output per image.
        images: Float tensor of shape ``(batch, height, width, channels)``.
        labels: Binary targets, reshaped internally to ``(batch, 1)``.
        drop_ratio: Fraction of spatial positions to zero out.
        keep_ratio: Fraction of top-ranked spatial positions to protect.
            If ``keep_ratio + drop_ratio`` exceeds 1 the dropped band is
            truncated at the end of the ranking.

    Returns:
        A tensor shaped like ``images`` with the selected positions zeroed
        across all channels.
    """
    with tf.GradientTape() as tape:
        # `images` is a plain tensor rather than a variable, so the tape has
        # to be told to track it.
        tape.watch(images)

        # NOTE(review): training=True runs the model in training mode for this
        # saliency pass (e.g. dropout stays active) - confirm this is intended.
        outputs = model(images, training=True)

        labels = tf.reshape(labels, (-1, 1))
        loss = tf.keras.losses.binary_crossentropy(labels, outputs)

    grads = tape.gradient(loss, images)

    # Collapse the channel axis: one gradient magnitude per spatial position.
    norm_grads = tf.norm(grads, axis=-1)
    norm_grads_flat = tf.reshape(norm_grads, [tf.shape(norm_grads)[0], -1])

    # Count spatial positions only (channels excluded) so the ranking size
    # matches the flattened saliency map for any number of channels.
    total_features = int(np.prod(images.shape[1:-1]))
    keep_features = int(keep_ratio * total_features)
    drop_features = int(drop_ratio * total_features)
    ranked = min(keep_features + drop_features, total_features)

    # Only the first `ranked` positions are needed; top_k returns them in
    # descending order of magnitude.
    _, sorted_indices = tf.math.top_k(norm_grads_flat, k=ranked)
    drop_indices = sorted_indices[:, keep_features:ranked]

    mask = tf.ones_like(norm_grads_flat)

    # Pair every dropped column index with the index of the row it belongs to.
    batch_indices = tf.reshape(
        tf.repeat(tf.range(tf.shape(norm_grads_flat)[0]), tf.shape(drop_indices)[1]), (-1, 1)
    )
    scatter_indices = tf.concat([batch_indices, tf.reshape(drop_indices, (-1, 1))], axis=1)

    mask = tf.tensor_scatter_nd_update(
        mask, scatter_indices, tf.zeros(tf.shape(scatter_indices)[0], dtype=mask.dtype)
    )

    # Restore the spatial layout and add a size-1 channel axis so the mask
    # broadcasts over every channel of `images`.
    mask = tf.expand_dims(tf.reshape(mask, tf.shape(norm_grads)), axis=-1)

    masked_images = tf.multiply(images, mask)

    return masked_images

In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tqdm.notebook import tqdm


# Multiply the learning rate by 0.3 whenever validation accuracy has not
# improved for 2 epochs, never going below 1e-6.
learning_rate_reduction = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.3,
    patience=2,
    min_lr=1e-6,
    verbose=1,
)

# Compile with Adam for the manual training loop.
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# The callback is driven by hand, so it is attached to the model explicitly.
learning_rate_reduction.set_model(model)

# Fraction of each image's pixels zeroed by apply_feature_dropping.
feature_ratio = 0.065

def apply_feature_dropping(model, images, labels):
    """Zero out the pixels with the smallest loss-gradient magnitude.

    For every image in the batch, the gradient of the binary cross-entropy
    loss with respect to the input is computed, and the ``feature_ratio``
    fraction of spatial positions with the smallest gradient magnitude is set
    to zero. ``feature_ratio`` is read from module scope.

    Args:
        model: Callable Keras model producing one sigmoid output per image.
        images: Float tensor of shape ``(batch, height, width, channels)``.
        labels: Binary targets, reshaped internally to ``(batch, 1)``.

    Returns:
        A tensor shaped like ``images`` with the selected positions zeroed
        across all channels.
    """
    with tf.GradientTape() as tape:
        # `images` is a plain tensor rather than a variable, so the tape has
        # to be told to track it.
        tape.watch(images)

        # NOTE(review): training=True runs the model in training mode for this
        # saliency pass (e.g. dropout stays active) - confirm this is intended.
        outputs = model(images, training=True)

        labels = tf.reshape(labels, (-1, 1))
        loss = tf.keras.losses.binary_crossentropy(labels, outputs)

    grads = tape.gradient(loss, images)

    # Collapse the channel axis: one gradient magnitude per spatial position.
    norm_grads = tf.norm(grads, axis=-1)
    norm_grads_flat = tf.reshape(norm_grads, [tf.shape(norm_grads)[0], -1])

    # Count spatial positions only (channels excluded) so k matches the
    # flattened saliency map for any number of channels.
    num_positions = int(np.prod(images.shape[1:-1]))
    num_dropped = int(feature_ratio * num_positions)

    # top_k on the negated magnitudes selects the smallest ones.
    _, idx_mask = tf.math.top_k(-norm_grads_flat, k=num_dropped, sorted=False)

    mask = tf.ones_like(norm_grads_flat)

    # Pair every selected column index with the index of the row it belongs to.
    batch_indices = tf.reshape(
        tf.repeat(tf.range(tf.shape(norm_grads_flat)[0]), tf.shape(idx_mask)[1]), (-1, 1)
    )
    scatter_indices = tf.concat([batch_indices, tf.reshape(idx_mask, (-1, 1))], axis=1)

    mask = tf.tensor_scatter_nd_update(
        mask, scatter_indices, tf.zeros(tf.shape(scatter_indices)[0], dtype=mask.dtype)
    )

    # Restore the spatial layout and add a size-1 channel axis so the mask
    # broadcasts over every channel of `images`.
    mask = tf.expand_dims(tf.reshape(mask, tf.shape(norm_grads)), axis=-1)

    masked_images = tf.multiply(images, mask)

    return masked_images


# Manual training loop: every augmented batch is passed through
# apply_feature_dropping before the gradient step.
epochs = 12
batch_size = 32
num_batches = len(x_train) // batch_size  # a trailing partial batch is skipped

for epoch in range(epochs):
    print(f"Starting Epoch {epoch + 1}/{epochs}")

    train_losses = []
    train_accuracies = []

    pbar = tqdm(total=num_batches, ncols=70)

    # The generator from datagen.flow is not bounded to one epoch here, so it
    # is zipped with range() to stop after num_batches. This also terminates
    # when num_batches is 0, where an `i + 1 == num_batches` check never fires.
    train_flow = datagen.flow(x_train, y_train, batch_size=batch_size)
    for _, (batch_x, batch_y) in zip(range(num_batches), train_flow):
        batch_x = tf.convert_to_tensor(batch_x, dtype=tf.float32)
        batch_y = tf.convert_to_tensor(batch_y, dtype=tf.float32)

        masked_images = apply_feature_dropping(model, batch_x, batch_y)

        loss, acc = model.train_on_batch(masked_images, batch_y)

        train_losses.append(loss)
        train_accuracies.append(acc)

        pbar.set_description(f'Epoch {epoch + 1}/{epochs}, Train Accuracy: {np.mean(train_accuracies):.4f}')
        pbar.update()

    pbar.close()

    avg_train_loss = np.mean(train_losses)
    avg_train_acc = np.mean(train_accuracies)

    # Validate on the original, un-augmented images: random augmentation would
    # add noise to the metric that drives the learning-rate schedule.
    val_loss, val_acc = model.evaluate(x_val, y_val, verbose=0)

    # Drive the callback by hand since model.fit is not used here.
    learning_rate_reduction.on_epoch_end(epoch, {"val_accuracy": val_acc})

    print(f"Epoch {epoch + 1}/{epochs}, Train Accuracy: {avg_train_acc:.4f}, Validation Accuracy: {val_acc:.4f}")

print("Training completed!")


In [None]:
from tensorflow.keras.callbacks import ReduceLROnPlateau
import tensorflow as tf

# Fresh learning-rate schedule for the model.fit runs below: scale the rate by
# 0.3 after 2 epochs without a val_accuracy improvement, floor at 1e-6.
learning_rate_reduction = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.3,
    patience=2,
    min_lr=1e-6,
    verbose=1,
)


In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop once val_loss has not improved for 10 epochs and roll back to the best
# weights seen.
early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, restore_best_weights=True)

# `train_masks` was used below without being defined anywhere in the notebook,
# so this cell failed with NameError on a fresh kernel.
# NOTE(review): it is assumed to be the training images after
# apply_feature_dropping - confirm against the original experiment.
mask_batch_size = 32
train_masks = np.concatenate([
    apply_feature_dropping(
        model,
        tf.convert_to_tensor(x_train[start:start + mask_batch_size], dtype=tf.float32),
        tf.convert_to_tensor(y_train[start:start + mask_batch_size], dtype=tf.float32),
    ).numpy()
    for start in range(0, len(x_train), mask_batch_size)
])

history = model.fit(x=train_masks,
                     y=y_train,
                     batch_size=32,
                     epochs=20,
                     validation_data=(x_val, y_val),
                     callbacks=[learning_rate_reduction, early_stopping])

print("Training completed!")

In [None]:
# Train on randomly augmented batches. Validation uses the original images so
# the metric monitored by the learning-rate callback is not perturbed by
# random augmentation.
history = model.fit(
    datagen.flow(x_train, y_train, batch_size=32),
    epochs=12,
    validation_data=(x_val, y_val),
    callbacks=[learning_rate_reduction],
)

In [None]:
# Run the test set through the model once and reuse both returned values
# (loss first, then the accuracy metric from compile()).
test_loss, test_accuracy = model.evaluate(x_test, y_test)
print("Loss of the model is - " , test_loss)
print("Accuracy of the model is - " , test_accuracy*100 , "%")

In [None]:
# Write the trained model to 'my_model.h5' (relative to the working directory).
model.save('my_model.h5')