# NIH Chest X-ray Dataset Explanation

## Introduction
The **NIH Chest X-ray Dataset** is a large-scale dataset provided by the National Institutes of Health (NIH), containing **112,120 frontal-view chest X-ray images** from **30,805 unique patients**. The dataset was introduced to support the development of deep learning models for automated diagnosis of thoracic diseases.

## Key Features
- **Large-scale dataset**: Contains over 112,000 X-ray images.
- **Multi-label classification**: Includes annotations for **14 different thoracic diseases**, such as pneumonia, edema, emphysema, and fibrosis.
- **Metadata Availability**: Includes patient age, gender, view position, and image index.
- **Open-Source**: Publicly available for research and development in medical AI applications.

## Labels and Diseases
The dataset provides labels for the following 14 thoracic conditions:
- Atelectasis
- Cardiomegaly
- Consolidation
- Edema
- Effusion
- Emphysema
- Fibrosis
- Hernia
- Infiltration
- Mass
- Nodule
- Pleural Thickening
- Pneumonia
- Pneumothorax
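
Because one image can carry several of these findings at once, its label is naturally a multi-hot vector rather than a single class. A minimal sketch of that encoding, assuming the findings arrive as one pipe-separated string per image; the helper name `encode_findings` is hypothetical and the underscore spelling of `Pleural_Thickening` follows the class list used in the code cells of this notebook:

```python
CONDITIONS = [
    "Atelectasis", "Cardiomegaly", "Consolidation", "Edema", "Effusion",
    "Emphysema", "Fibrosis", "Hernia", "Infiltration", "Mass", "Nodule",
    "Pleural_Thickening", "Pneumonia", "Pneumothorax",
]

def encode_findings(finding_labels, conditions=CONDITIONS):
    """Return a list of 0/1 flags, one per condition (hypothetical helper)."""
    # Assumption: findings are separated by "|" within a single string
    present = set(finding_labels.split("|"))
    return [1 if condition in present else 0 for condition in conditions]

example = encode_findings("Cardiomegaly|Effusion")
print(example)
```

An image with no listed finding maps to an all-zero vector under this scheme.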


## Applications
- **Automated disease diagnosis** using deep learning models.
- **Medical AI research** for improving diagnostic accuracy.
- **Explainability and interpretability studies** to analyze model decision-making.

The NIH Chest X-ray Dataset has been widely used in medical imaging research, contributing to advancements in AI-powered diagnostics.


# EfficientNetV2S Model Explanation

## Introduction
EfficientNetV2S is the small variant of the **EfficientNetV2** family, which Google Research introduced in 2021 as a successor to EfficientNet. EfficientNetV2 improves on its predecessor by combining **fused convolutional blocks** in the early stages with a **progressive learning** training strategy, achieving better performance at lower computational cost.

## Key Features
- **Fused Convolutions**: Early stages use Fused-MBConv blocks, which replace MBConv's 1x1 expansion convolution and 3x3 depthwise convolution with a single regular 3x3 convolution.
- **Smaller and Faster**: Reduces training and inference times while maintaining high accuracy.
- **Progressive Learning**: Gradually increases the image size, along with regularization strength, during training to speed up training while preserving accuracy.
- **Optimized Architecture**: Designed using Neural Architecture Search (NAS) to balance efficiency and accuracy.
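
The progressive learning idea can be pictured as a schedule that raises image size and regularization stage by stage. A minimal sketch of a linear schedule; the stage count, size range, and dropout range below are illustrative assumptions, not the values used to train EfficientNetV2S, and `progressive_value` is a hypothetical helper:

```python
def progressive_value(stage, n_stages, start, end):
    """Linearly interpolate from `start` (first stage) to `end` (last stage)."""
    if n_stages < 2:
        return end
    return start + (end - start) * stage / (n_stages - 1)

n_stages = 4  # assumed number of training stages
image_sizes = [int(progressive_value(s, n_stages, 128, 300)) for s in range(n_stages)]
dropout_rates = [round(progressive_value(s, n_stages, 0.1, 0.3), 3) for s in range(n_stages)]

for stage, (size, rate) in enumerate(zip(image_sizes, dropout_rates)):
    print(f"stage {stage}: image size {size}, dropout {rate}")
```

Early stages see small images with weak regularization, and later stages see larger images with stronger regularization.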

## Architecture
EfficientNetV2S follows a structured architecture with:
- **Convolutional and Fused Blocks**: Enhances feature extraction with reduced computational costs.
- **SE (Squeeze-and-Excitation) Blocks**: Improves channel-wise feature recalibration.
- **MBConv Blocks**: Efficient depthwise convolutions for lower parameter usage.
- **Global Average Pooling**: Reduces dimensionality before the fully connected layer.
- **Fully Connected Layer**: Final classification output.
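
The channel-wise recalibration performed by an SE block can be sketched in a few lines of NumPy. This is an illustration with random weights that follows the original SE formulation (ReLU bottleneck, sigmoid gate); it is not the Keras implementation, and `squeeze_excite` is a hypothetical helper:

```python
import numpy as np

def squeeze_excite(feature_map, w_reduce, w_expand):
    """Apply SE recalibration to a single (H, W, C) feature map."""
    squeezed = feature_map.mean(axis=(0, 1))             # squeeze: one value per channel
    hidden = np.maximum(squeezed @ w_reduce, 0.0)        # bottleneck with ReLU
    gate = 1.0 / (1.0 + np.exp(-(hidden @ w_expand)))    # sigmoid gate in (0, 1)
    return feature_map * gate, gate                      # rescale every channel

rng = np.random.default_rng(0)
channels, reduction = 8, 4                               # illustrative sizes
fmap = rng.random((5, 5, channels))
w_reduce = rng.normal(size=(channels, channels // reduction))
w_expand = rng.normal(size=(channels // reduction, channels))

recalibrated, gate = squeeze_excite(fmap, w_reduce, w_expand)
print(recalibrated.shape, gate.shape)
```

Each channel is multiplied by a single learned weight between 0 and 1, so informative channels can be emphasized and others suppressed.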

## Model Usage in TensorFlow/Keras
```python
from tensorflow.keras.applications import EfficientNetV2S

# Load pre-trained EfficientNetV2S model
model = EfficientNetV2S(weights='imagenet', include_top=True)
model.summary()
```

## Applications
- **Image Classification**: Used as a classifier or feature-extraction backbone in medical imaging, object detection, and large-scale recognition tasks.
- **Edge AI and Mobile Deployment**: Its efficiency makes it a reasonable candidate for resource-constrained devices.
- **Fine-Tuning for Custom Tasks**: Can be adapted for various specialized vision tasks through transfer learning.

EfficientNetV2S is widely adopted for modern deep learning applications, balancing speed and accuracy efficiently.



## Import Libraries

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings("ignore")

# ML tools
import os
import cv2  # used by the Grad-CAM functions at the end of the notebook
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.applications import EfficientNetV2S
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
from skimage.measure import find_contours
from sklearn.model_selection import train_test_split
from IPython.display import display  # makes display() explicit outside notebooks

## Data Load and Preprocessing

In [None]:
all_classes = ['No Finding', 'Cardiomegaly', 'Hernia', 'Infiltration', 'Nodule', 'Emphysema',
               'Effusion', 'Atelectasis', 'Pleural_Thickening', 'Pneumothorax',
               'Mass', 'Fibrosis', 'Consolidation', 'Edema', 'Pneumonia']
target_cols = all_classes
df = pd.read_csv('./Usable_data/Data_Entry_2017.csv')
df = df[['Image Index','Finding Labels']]
df = df.rename(columns = {'Image Index':'img_ind'})

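# Multi-hot encode the findings: one 0/1 column per class
for disease in all_classes:
    print("Encoding:", disease)
    # regex=False treats the class name as a literal substring
    df[disease] = np.where(df['Finding Labels'].str.contains(disease, regex=False), 1, 0)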

# Create paths
df['img_ind'] = "./Usable_data/" + df['img_ind']
df = df.drop(columns = ['Finding Labels'])

display(df.head(4))
print(df.shape)

## Select dataset for training

In [None]:
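# We took a mixed sample with single and multiple diseases that appears to be performing well.
# 'Disease Load' is fixed at 1 for every row, so the "only this disease" filter below keeps all rows.
# To restrict the sample to single-finding images instead, use df[all_classes].sum(axis=1).
df['Disease Load'] = 1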

#--------------------------------------------------------------------------------------------------------------------------------------------------------------
# Train on a subset of data

min_samples_required = 1000

labels_df = pd.DataFrame()

for disease in all_classes:
    
    print("Finding samples for:", disease)
    
    cond_disease_exists = df[disease] == 1
    cond_only_this_disease = df['Disease Load'] == 1
    all_filters_necessary = cond_disease_exists & cond_only_this_disease
    
    df_disease = df.loc[all_filters_necessary].reset_index(drop = True)
    samples_available = len(df_disease)
    print("Samples available:", samples_available)
    samples_taken = min(samples_available,min_samples_required)
    print("Samples taken:", samples_taken)
    selected_sample_df = df_disease.iloc[0:samples_taken,:]
    labels_df = pd.concat([labels_df,selected_sample_df], axis=0).reset_index(drop = True)
    print(len(labels_df))

labels_df = labels_df.drop(columns = ['Disease Load'])
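
One property of the per-class sampling loop above is worth keeping in mind: an image that carries several findings matches several class filters, so it can be appended more than once. A toy sketch with made-up rows (the file names, labels, and cap are invented for illustration):

```python
toy_rows = [
    {"img": "a.png", "labels": {"Effusion"}},
    {"img": "b.png", "labels": {"Effusion", "Mass"}},
    {"img": "c.png", "labels": {"Mass"}},
]
cap = 2  # plays the role of min_samples_required

selected = []
for disease in ["Effusion", "Mass"]:
    matches = [row["img"] for row in toy_rows if disease in row["labels"]]
    selected.extend(matches[:cap])  # take at most `cap` images per class

duplicates = sorted({img for img in selected if selected.count(img) > 1})
unique_selected = list(dict.fromkeys(selected))  # order-preserving de-duplication

print(selected)         # ['a.png', 'b.png', 'b.png', 'c.png']
print(duplicates)       # ['b.png']
print(unique_selected)  # ['a.png', 'b.png', 'c.png']
```

If such repeats are not wanted, dropping duplicates on the path column before the train/validation split keeps the same image from landing on both sides of the split.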

## Train-test split for data

In [None]:
paths = labels_df['img_ind']

#Get the multi-labels
label_cols = all_classes
labels = labels_df[label_cols].values

#-----------------------------------------------------------------------------------------------------------------------------------------------------------

# Train test split
(train_paths, valid_paths, 
  train_labels, valid_labels) = train_test_split(paths, labels, test_size=0.2, random_state=11)

print(train_paths.shape, valid_paths.shape)
train_labels.sum(axis=0), valid_labels.sum(axis=0)

## Define parameters

In [None]:
n_classes = len(target_cols)
img_size = 600
n_epochs = 35
lr = 0.0001
val_split = 0.2
seed = 33
batch_size = 12
n_classes

## Data preparation for faster ingestion during training

In [None]:
def auto_select_accelerator():
    try:
        tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
        tf.config.experimental_connect_to_cluster(tpu)
        tf.tpu.experimental.initialize_tpu_system(tpu)
        strategy = tf.distribute.TPUStrategy(tpu)  # non-experimental name of the same strategy
        print("Running on TPU:", tpu.master())
    except ValueError:
        strategy = tf.distribute.get_strategy()
    print(f"Running on {strategy.num_replicas_in_sync} replicas")
    
    return strategy

# Reference:
# https://www.kaggle.com/xhlulu/ranzcr-efficientnet-tpu-training

def build_decoder(with_labels=True, target_size=(img_size, img_size), ext='jpg'):
    def decode(path):
        file_bytes = tf.io.read_file(path) # Reads and outputs the entire contents of the input filename.

        if ext == 'png':
            img = tf.image.decode_png(file_bytes, channels=3) # Decode a PNG-encoded image to a uint8 or uint16 tensor
        elif ext in ['jpg', 'jpeg']:
            img = tf.image.decode_jpeg(file_bytes, channels=3) # Decode a JPEG-encoded image to a uint8 tensor
        else:
            raise ValueError("Image extension not supported")

        img = tf.cast(img, tf.float32) / 255.0 # Casts a tensor to the type float32 and divides by 255.
        img = tf.image.resize(img, target_size) # Resizing to target size
        return img
    
    def decode_with_labels(path, label):
        return decode(path), label
    
    return decode_with_labels if with_labels else decode


def build_augmenter(with_labels=True):
    def augment(img):
        img = tf.image.random_flip_left_right(img)
        img = tf.image.random_flip_up_down(img)
        img = tf.image.random_saturation(img, 0.8, 1.2)
        img = tf.image.random_brightness(img, 0.1)
        img = tf.image.random_contrast(img, 0.8, 1.2)
        return img
    
    def augment_with_labels(img, label):
        return augment(img), label
    
    return augment_with_labels if with_labels else augment

def build_dataset(paths, labels=None, bsize=32, cache=True,
                  decode_fn=None, augment_fn=None,
                  augment=True, repeat=True, shuffle=1024, 
                  cache_dir=""):
    if cache_dir != "" and cache is True:
        os.makedirs(cache_dir, exist_ok=True)
    
    if decode_fn is None:
        decode_fn = build_decoder(labels is not None)
    
    if augment_fn is None:
        augment_fn = build_augmenter(labels is not None)
    
    AUTO = tf.data.AUTOTUNE  # let tf.data choose the parallelism level
    slices = paths if labels is None else (paths, labels)
    
    dset = tf.data.Dataset.from_tensor_slices(slices)
    dset = dset.map(decode_fn, num_parallel_calls=AUTO)
    dset = dset.cache(cache_dir) if cache else dset
    dset = dset.map(augment_fn, num_parallel_calls=AUTO) if augment else dset
    dset = dset.repeat() if repeat else dset
    dset = dset.shuffle(shuffle) if shuffle else dset
    dset = dset.batch(bsize).prefetch(AUTO) # overlaps data preprocessing and model execution while training
    return dset
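
The brightness and contrast steps in `build_augmenter` amount to simple arithmetic on pixel values. A NumPy sketch of that arithmetic, assuming brightness adds a constant and contrast scales each channel's deviation from its mean; the helper names are illustrative and this is not TensorFlow's implementation:

```python
import numpy as np

def adjust_brightness(img, delta):
    """Shift every pixel by the same constant."""
    return img + delta

def adjust_contrast(img, factor):
    """Scale each channel's deviation from its own mean."""
    mean = img.mean(axis=(0, 1), keepdims=True)  # per-channel mean
    return (img - mean) * factor + mean

rng = np.random.default_rng(0)
img = rng.random((4, 4, 3))                      # toy image with values in [0, 1)
delta = rng.uniform(-0.1, 0.1)                   # same range as random_brightness(img, 0.1)
factor = rng.uniform(0.8, 1.2)                   # same range as random_contrast(img, 0.8, 1.2)

out = adjust_contrast(adjust_brightness(img, delta), factor)
print(out.min(), out.max())
```

This sketch does not clip its output, so values can leave the [0, 1] range; clipping before plotting avoids display warnings.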


In [None]:
strategy = auto_select_accelerator()
batch_size = strategy.num_replicas_in_sync * batch_size
print('batch size', batch_size)

In [None]:
# Build the decoder; the NIH images are stored as PNG files
decoder = build_decoder(with_labels=True, target_size=(img_size, img_size), ext='png')

# Build the tensorflow datasets
dtrain = build_dataset(
    train_paths, train_labels, bsize=batch_size, decode_fn=decoder
)

dvalid = build_dataset(
    valid_paths, valid_labels, bsize=batch_size, 
    repeat=False, shuffle=False, augment=False, decode_fn=decoder
)

In [None]:
# Take one batch from the training set for a visual check
images, _ = next(iter(dtrain))
images = images.numpy()

In [None]:
fig, axes = plt.subplots(3, 4, figsize=(20,10))
axes = axes.flatten()
for img, ax in zip(images, axes):
    ax.imshow(img)
    ax.axis('off')
plt.tight_layout()
plt.show()

## Model Building (EfficientNetV2S)

In [None]:
from tensorflow.keras import layers, Model
from tensorflow.keras.applications import EfficientNetV2S

def build_model():
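    # Note: Keras' EfficientNetV2S rescales its input internally by default
    # (include_preprocessing=True) and then expects pixel values in [0, 255],
    # whereas the decoder above feeds values in [0, 1].
    base = EfficientNetV2S(input_shape=(img_size, img_size, 3), include_top=False, weights='imagenet')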
    
    inp = layers.Input(shape=(img_size, img_size, 3))
    x = base(inp)
    
    # Adding additional convolutional layers
    x = layers.Conv2D(256, (3,3), activation='relu', padding='same')(x)
    x = layers.Conv2D(128, (3,3), activation='relu', padding='same')(x)
    x = layers.Conv2D(64, (3,3), activation='relu', padding='same')(x)
    
    x = layers.Dropout(0.16)(x)             # Dropout on the feature maps
    x = layers.GlobalAveragePooling2D()(x)  # Pool the feature maps into one vector per image
    x = layers.Dropout(0.3)(x)
    x = layers.Dense(n_classes, activation='sigmoid')(x)  # Multi-label classification
    
    return Model(inputs=inp, outputs=x)

In [None]:
with strategy.scope():
    model = build_model()
    loss = tf.keras.losses.BinaryCrossentropy(label_smoothing=0.0)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss=loss,
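        metrics=[
            # Explicit names keep the history keys stable ("auc", "precision", ...)
            # even if this cell is re-run in the same session
            tf.keras.metrics.AUC(multi_label=True, name='auc'),
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall'),
            tf.keras.metrics.BinaryAccuracy(name='binary_accuracy')
        ]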
    )

In [None]:
model.summary()

In [None]:
name = "NIH_EfficientNetV2S_model.h5"  # Updated model name

# Learning rate reduction on plateau
rlr = ReduceLROnPlateau(
    monitor='val_loss', factor=0.1, patience=2, verbose=1, 
    min_delta=1e-4, min_lr=1e-6, mode='min', cooldown=1
)

# Model checkpoint to save the best model
ckp = ModelCheckpoint(
    name, monitor='val_loss', verbose=1, save_best_only=True, mode='min'
)

# Early stopping to prevent overfitting
es = EarlyStopping(
    monitor='val_loss', min_delta=1e-4, patience=5, mode='min', 
    restore_best_weights=True, verbose=1
)
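
How the learning-rate callback reacts to a stalled validation loss can be traced with a small pure-Python simulation. This is a simplified sketch of the plateau logic using the same settings as `rlr` above, not the Keras source; the loss values are made up and `simulate_reduce_lr` is a hypothetical helper:

```python
def simulate_reduce_lr(val_losses, lr=1e-4, factor=0.1, patience=2,
                       min_delta=1e-4, min_lr=1e-6, cooldown=1):
    """Return the learning rate in effect after each epoch."""
    best = float("inf")
    wait = 0
    cooldown_left = 0
    lrs = []
    for loss in val_losses:
        if cooldown_left > 0:              # epochs right after a reduction do not count
            cooldown_left -= 1
            wait = 0
        if loss < best - min_delta:        # improvement must exceed min_delta
            best = loss
            wait = 0
        elif cooldown_left == 0:
            wait += 1
            if wait >= patience:           # plateau detected: shrink the learning rate
                lr = max(lr * factor, min_lr)
                cooldown_left = cooldown
                wait = 0
        lrs.append(lr)
    return lrs

val_losses = [0.50, 0.45, 0.45, 0.45, 0.44, 0.44, 0.44, 0.44]  # made-up values
lrs = simulate_reduce_lr(val_losses)
for epoch, (loss, rate) in enumerate(zip(val_losses, lrs)):
    print(f"epoch {epoch}: val_loss={loss:.2f}, lr={rate:.0e}")
```

With `patience=2`, two consecutive epochs without sufficient improvement trigger a tenfold reduction, and `min_lr` stops the rate from falling below 1e-6.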


In [None]:
steps_per_epoch = (train_paths.shape[0] // batch_size)
steps_per_epoch
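
Because the training dataset repeats indefinitely, `steps_per_epoch` tells Keras where one epoch ends. A worked example of the arithmetic with made-up counts (the sample count below is hypothetical, not this notebook's actual training-set size):

```python
n_train_samples = 10_000   # hypothetical number of training images
per_replica_batch = 12     # batch size per replica, as set in the parameters
n_replicas = 1             # assumed single device

global_batch = per_replica_batch * n_replicas
steps = n_train_samples // global_batch            # full batches per epoch
leftover = n_train_samples - steps * global_batch  # images not covered by full batches

print(global_batch, steps, leftover)  # 12 833 4
```

Integer division drops the partial batch, so a handful of images falls outside each nominal epoch; with shuffling they still appear over the course of training.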

In [None]:
history = model.fit(dtrain,                      
                    validation_data=dvalid,                                       
                    epochs=n_epochs,
                    callbacks=[rlr,es,ckp],
                    steps_per_epoch=steps_per_epoch,
                    verbose=1)

## Model Evaluation

In [None]:
# Save the model
name = './AJB_NIH_EfficientNetV2S_model.keras'
model.save(name)

### Loss over epochs

In [None]:
plt.figure(figsize = (12, 6))
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.plot( history.history["loss"], label = "Training Loss", marker='o')
plt.plot( history.history["val_loss"], label = "Validation Loss", marker='+')
plt.grid(True)
plt.legend()
plt.show()

### ROC-AUC over epochs

In [None]:
plt.figure(figsize = (12, 6))
plt.xlabel("Epochs")
plt.ylabel("AUC")
plt.plot( history.history["auc"], label = "Training AUC" , marker='o')
plt.plot( history.history["val_auc"], label = "Validation AUC", marker='+')
plt.grid(True)
plt.legend()
plt.show()

### Precision over epochs

In [None]:
plt.figure(figsize = (12, 6))
plt.xlabel("Epochs")
plt.ylabel("Precision")
plt.plot( history.history["precision"], label = "Training Precision" , marker='o')
plt.plot( history.history["val_precision"], label = "Validation Precision", marker='+')
plt.grid(True)
plt.legend()
plt.show()

### Accuracy over epochs

In [None]:
plt.figure(figsize = (12, 6))
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.plot( history.history["binary_accuracy"], label = "Training Accuracy" , marker='o')
plt.plot( history.history["val_binary_accuracy"], label = "Validation Accuracy", marker='+')
plt.grid(True)
plt.legend()
plt.show()

### Recall over epochs

In [None]:
plt.figure(figsize = (12, 6))
plt.xlabel("Epochs")
plt.ylabel("Recall")
plt.plot( history.history["recall"], label = "Training Recall" , marker='o')
plt.plot( history.history["val_recall"], label = "Validation Recall", marker='+')
plt.grid(True)
plt.legend()
plt.show()

### Training statistics over epochs (in numbers)

In [None]:
df_training_stats = pd.DataFrame(history.history)
df_training_stats.to_csv("./Training_stats.csv")
df_training_stats

### AUC by class (i.e. by disease)

In [None]:
tf.keras.backend.clear_session()

from sklearn.metrics import roc_auc_score
name = './AJB_NIH_EfficientNetV2S_model.keras'
model= tf.keras.models.load_model(name,  safe_mode=False)
pred= model.predict(dvalid, verbose=1)

print('AUC CHECK-UP per CLASS')

classes = all_classes
for i, n in enumerate(classes):
    print(n)
    print(i, roc_auc_score(valid_labels[:, i], pred[:, i]))
    print('---------')

In [None]:
# Initialize an empty list to store results
results_auc_classwise = []

# Loop through the classes and calculate the ROC AUC scores
for i, n in enumerate(all_classes):
    auc_score = roc_auc_score(valid_labels[:, i], pred[:, i])
    results_auc_classwise.append({"Class": n, "ROC AUC Score": auc_score})

# Convert the list into a pandas DataFrame
df_auc_classwise = pd.DataFrame(results_auc_classwise)
df_auc_classwise.to_csv("./AUC_by_class.csv", index = False)
df_auc_classwise
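
The per-class score has a direct interpretation: it is the probability that a randomly chosen positive image receives a higher prediction than a randomly chosen negative one, with ties counted as half. A pure-Python sketch of that definition on made-up labels and scores; the helper name `pairwise_auc` is illustrative, and this quadratic-time loop is for explanation only:

```python
def pairwise_auc(y_true, y_score):
    """AUC as the fraction of positive/negative pairs ranked correctly."""
    pos = [s for t, s in zip(y_true, y_score) if t == 1]
    neg = [s for t, s in zip(y_true, y_score) if t == 0]
    if not pos or not neg:
        raise ValueError("AUC is undefined when only one class is present")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5   # ties count as half a win
    return wins / (len(pos) * len(neg))

y_true = [0, 0, 1, 1]              # made-up labels
y_score = [0.1, 0.4, 0.35, 0.8]    # made-up predictions
toy_auc = pairwise_auc(y_true, y_score)
print(toy_auc)  # 0.75
```

Three of the four positive/negative pairs are ordered correctly here, giving 0.75. A class with no positive (or no negative) examples in the validation split has no defined AUC.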

In [None]:
# Reuse `model` and `pred` from the per-class AUC check above; reloading the
# model and predicting again would produce the same values

# Compute AUC scores for each class
auc_scores = []
for i, n in enumerate(classes):
    auc = roc_auc_score(valid_labels[:, i], pred[:, i])
    auc_scores.append(auc)

# Convert to NumPy array for easy sorting (optional)
auc_scores = np.array(auc_scores)

# Plot the AUC scores
plt.figure(figsize=(12, 6))
plt.barh(classes, auc_scores, color='royalblue')
plt.xlabel("AUC Score")
plt.ylabel("Disease Classes")
plt.title("AUC Score per Disease Class")
plt.xlim(0, 1)  # AUC scores range from 0 to 1
plt.grid(axis="x", linestyle="--", alpha=0.5)

# Annotate bars with values
for index, value in enumerate(auc_scores):
    plt.text(value + 0.02, index, f"{value:.3f}", va="center", fontsize=10)

# Show the plot
plt.show()


In [None]:
from sklearn.metrics import roc_curve, auc

# Reuse `model` and `pred` from the per-class AUC check above; reloading the
# model and predicting again would produce the same values

# Initialize figure for multiple ROC curves
plt.figure(figsize=(10, 8))

# Loop through each disease class and plot its ROC curve
for i, disease in enumerate(classes):
    fpr, tpr, _ = roc_curve(valid_labels[:, i], pred[:, i])  # Compute FPR, TPR
    roc_auc = auc(fpr, tpr)  # Compute AUC
    plt.plot(fpr, tpr, label=f"{disease} (AUC = {roc_auc:.3f})")  # Plot each ROC curve

# Plot diagonal line (random classifier)
plt.plot([0, 1], [0, 1], 'k--', label="Random (AUC = 0.5)")

# Set axis labels and title
plt.xlabel("False Positive Rate (FPR)")
plt.ylabel("True Positive Rate (TPR)")
plt.title("ROC Curves for All Disease Classes")
plt.legend(loc="lower right")  # Add legend
plt.grid(alpha=0.3)  # Light grid for better readability

# Show the plot
plt.show()


### Gradcam Visualization

In [None]:
def gradcam_with_contour(image_path, class_name, model):
    all_classes = ['No Finding', 'Cardiomegaly', 'Hernia', 'Infiltration', 'Nodule', 'Emphysema',
                   'Effusion', 'Atelectasis', 'Pleural_Thickening', 'Pneumothorax',
                   'Mass', 'Fibrosis', 'Consolidation', 'Edema', 'Pneumonia']
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_resized = cv2.resize(img, (img_size, img_size)) / 255.0
    img_array = np.expand_dims(img_resized, axis=0)
    
    preds = model.predict(img_array)[0]
    class_idx = all_classes.index(class_name)
    class_prob = preds[class_idx]
    
    # Auto-detect last convolutional layer
    for layer in reversed(model.layers):
        if isinstance(layer, tf.keras.layers.Conv2D):
            last_conv_layer = layer
            break
    
    grad_model = Model(inputs=model.input, outputs=[last_conv_layer.output, model.output])
    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        loss = predictions[:, class_idx]
    grads = tape.gradient(loss, conv_outputs)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    conv_outputs = conv_outputs[0] * pooled_grads
    heatmap = tf.reduce_mean(conv_outputs, axis=-1).numpy()
    heatmap = np.maximum(heatmap, 0)
    heatmap = heatmap / (np.max(heatmap) + 1e-8)  # epsilon avoids division by zero for an all-zero map
    
    # Generate contours
    heatmap_resized = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
    contours = find_contours(heatmap_resized, 0.5)
    
    # Plot image with contour overlay
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.imshow(img)
    for contour in contours:
        ax.plot(contour[:, 1], contour[:, 0], linestyle='dotted', color='red', linewidth=2)
    
    # Add a grey bar on top with class name and probability
    ax.add_patch(plt.Rectangle((0, 0), img.shape[1], 40, color='grey', alpha=0.8))
    ax.text(10, 25, f"{class_name}: {class_prob:.2f}", fontsize=14, color='white', weight='bold')
    ax.axis('off')

    if class_name not in ['No Finding']:
        # Save the output image with a new filename
        filename = os.path.basename(image_path)
        filename_without_ext, ext = os.path.splitext(filename)
        output_filename = f"{filename_without_ext}_{class_name}_CONTOUR{ext}"
        plt.savefig(output_filename, bbox_inches='tight', pad_inches=0)
        plt.show()
        
        print(f"Saved visualization as {output_filename}")


In [None]:
def gradcam_with_heatmap(image_path, class_name, model):
    all_classes = ['No Finding', 'Cardiomegaly', 'Hernia', 'Infiltration', 'Nodule', 'Emphysema',
                   'Effusion', 'Atelectasis', 'Pleural_Thickening', 'Pneumothorax',
                   'Mass', 'Fibrosis', 'Consolidation', 'Edema', 'Pneumonia']
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_resized = cv2.resize(img, (img_size, img_size)) / 255.0
    img_array = np.expand_dims(img_resized, axis=0)
    
    preds = model.predict(img_array)[0]
    class_idx = all_classes.index(class_name)
    class_prob = preds[class_idx]
    
    # Auto-detect last convolutional layer
    for layer in reversed(model.layers):
        if isinstance(layer, tf.keras.layers.Conv2D):
            last_conv_layer = layer
            break
    
    grad_model = Model(inputs=model.input, outputs=[last_conv_layer.output, model.output])
    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        loss = predictions[:, class_idx]
    grads = tape.gradient(loss, conv_outputs)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    conv_outputs = conv_outputs[0] * pooled_grads
    heatmap = tf.reduce_mean(conv_outputs, axis=-1).numpy()
    heatmap = np.maximum(heatmap, 0)
    heatmap = heatmap / (np.max(heatmap) + 1e-8)  # epsilon avoids division by zero for an all-zero map
    
    # Resize heatmap to match the original image
    heatmap_resized = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
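    heatmap_colored = cv2.applyColorMap(np.uint8(255 * heatmap_resized), cv2.COLORMAP_JET)
    # applyColorMap returns BGR; convert so the colors match the RGB image
    heatmap_colored = cv2.cvtColor(heatmap_colored, cv2.COLOR_BGR2RGB)
    superimposed_img = cv2.addWeighted(img, 0.6, heatmap_colored, 0.4, 0)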
    
    # Plot image with heatmap overlay
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.imshow(superimposed_img)
    
    # Add a navy blue bar on top with class name and probability
    ax.add_patch(plt.Rectangle((0, 0), img.shape[1], 60, color='navy', alpha=0.8))
    ax.text(10, 25, f"{class_name}: {class_prob:.2f}", fontsize=14, color='white', weight='bold')
    ax.axis('off')
    
    if class_name not in ['No Finding']:
        # Save the output image with a new filename
        filename = os.path.basename(image_path)
        filename_without_ext, ext = os.path.splitext(filename)
        output_filename = f"{filename_without_ext}_{class_name}_HEATMAP{ext}"
        plt.savefig(output_filename, bbox_inches='tight', pad_inches=0)
        plt.show()
        
        print(f"Saved visualization as {output_filename}")
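
Both functions above share the same Grad-CAM core: average the gradients per channel, weight the feature maps with those averages, combine across channels, keep the positive part, and normalize. A NumPy sketch of just that core on random stand-in arrays; the shapes are illustrative and `gradcam_heatmap` is a hypothetical helper, not part of the notebook's functions:

```python
import numpy as np

def gradcam_heatmap(conv_outputs, grads, eps=1e-8):
    """Compute a normalized heatmap from (H, W, C) activations and gradients."""
    weights = grads.mean(axis=(0, 1))              # one importance weight per channel
    cam = (conv_outputs * weights).mean(axis=-1)   # weighted average over channels
    cam = np.maximum(cam, 0.0)                     # keep only positive evidence
    return cam / (cam.max() + eps)                 # scale to [0, 1]; eps guards all-zero maps

rng = np.random.default_rng(0)
conv_outputs = rng.random((7, 7, 16))              # stand-in for last conv layer activations
grads = rng.normal(size=(7, 7, 16))                # stand-in for d(class score)/d(activations)

heatmap = gradcam_heatmap(conv_outputs, grads)
print(heatmap.shape, float(heatmap.min()), float(heatmap.max()))
```

The small `eps` keeps the normalization finite when the class receives no positive evidence and the map is all zeros.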

In [None]:
image_path = './Usable_data/00000001_001.png'
class_name = "Emphysema"
gradcam_with_contour(image_path, class_name, model)

In [None]:
gradcam_with_heatmap(image_path, class_name, model)