In [4]:
# Cell 1: Import libraries and set seeds for reproducibility.
import os
import math
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import pydicom
import cv2

from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight

# Set seeds for reproducibility
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

print("✅ Libraries imported and seeds set!")


✅ Libraries imported and seeds set!


In [5]:
# Cell 2: Load dataset metadata and split data into training and validation sets.
dataset_path = "/kaggle/input/rsna-pneumonia-detection-challenge"
train_labels_csv = os.path.join(dataset_path, "stage_2_train_labels.csv")
class_info_csv = os.path.join(dataset_path, "stage_2_detailed_class_info.csv")

# Read CSV files
labels_df = pd.read_csv(train_labels_csv)
class_info_df = pd.read_csv(class_info_csv)

# Merge on 'patientId' and simplify labels (0: Normal, 1: Pneumonia)
merged_df = pd.merge(labels_df, class_info_df, on="patientId")
labels_simple = merged_df[['patientId', 'Target']].drop_duplicates().reset_index(drop=True)
labels_simple['Target'] = labels_simple['Target'].map({0: 'Normal', 1: 'Pneumonia'})
labels_simple['patientId'] = labels_simple['patientId'].astype(str) + ".dcm"

# Split data (80% train, 20% validation), stratified by target.
train_df, val_df = train_test_split(labels_simple, test_size=0.2, random_state=SEED, stratify=labels_simple['Target'])
print("Train samples:", len(train_df))
print("Validation samples:", len(val_df))


Train samples: 21347
Validation samples: 5337


In [6]:
# Cell 3: Define a function to load and preprocess DICOM images.
def load_preprocess_dicom(dicom_path, img_size=(240,240)):
    dicom_data = pydicom.dcmread(dicom_path)
    img_array = dicom_data.pixel_array.astype(np.float32)
    # Normalize pixel values to [0,1]
    img_norm = (img_array - np.min(img_array)) / (np.max(img_array) - np.min(img_array) + 1e-10)
    # Resize image
    img_resized = cv2.resize(img_norm, img_size)
    # Convert grayscale to 3-channel RGB
    img_rgb = np.stack([img_resized]*3, axis=-1)
    return img_rgb

# Test the function on a sample image
sample_image_path = os.path.join(dataset_path, "stage_2_train_images", train_df.iloc[0]['patientId'])
sample_img = load_preprocess_dicom(sample_image_path)
print("✅ Sample image shape (should be 240x240x3):", sample_img.shape)


✅ Sample image shape (should be 240x240x3): (240, 240, 3)


In [7]:
# Cell 4: Create data generators for training and validation.
def data_generator(df, batch_size=64, img_size=(240,240), infinite=True):
    def gen():
        if infinite:
            while True:
                shuffled_df = df.sample(frac=1).reset_index(drop=True)
                for _, row in shuffled_df.iterrows():
                    patient_id = row['patientId']
                    label = 1 if row['Target'] == 'Pneumonia' else 0
                    dicom_path = os.path.join(dataset_path, "stage_2_train_images", patient_id)
                    img = load_preprocess_dicom(dicom_path, img_size)
                    yield img, label
        else:
            for _, row in df.iterrows():
                patient_id = row['patientId']
                label = 1 if row['Target'] == 'Pneumonia' else 0
                dicom_path = os.path.join(dataset_path, "stage_2_train_images", patient_id)
                img = load_preprocess_dicom(dicom_path, img_size)
                yield img, label

    ds = tf.data.Dataset.from_generator(
        gen,
        output_types=(tf.float32, tf.int32),
        output_shapes=((img_size[0], img_size[1], 3), ())
    )
    ds = ds.shuffle(buffer_size=1000).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

BATCH_SIZE = 64
train_ds = data_generator(train_df, batch_size=BATCH_SIZE, img_size=(240,240), infinite=True)
val_ds = data_generator(val_df, batch_size=BATCH_SIZE, img_size=(240,240), infinite=False)
print("✅ Data generators created with batch size:", BATCH_SIZE)


✅ Data generators created with batch size: 64


In [8]:
# Cell 5: Compute class weights to mitigate class imbalance.
y_train = train_df['Target'].apply(lambda x: 1 if x == 'Pneumonia' else 0)
weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
class_weights = {i: w for i, w in enumerate(weights)}
print("✅ Class weights computed:", class_weights)


✅ Class weights computed: {0: 0.6454314567333858, 1: 2.219022869022869}


In [9]:
# Cell 6: Install Keras Tuner (if not already installed)
!pip install -q keras-tuner
import keras_tuner as kt


In [10]:
# Cell 7: Define the model building function for hyperparameter tuning.
def build_model(hp):
    from tensorflow.keras.applications import DenseNet201
    from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout, Input
    from tensorflow.keras.models import Model

    # Load DenseNet201 with ImageNet weights (without the top classifier)
    base_model = DenseNet201(weights='imagenet', include_top=False, input_shape=(240,240,3))
    
    # Hyperparameter to decide whether to unfreeze the base or not
    unfreeze = hp.Boolean('unfreeze', default=True)
    if unfreeze:
        base_model.trainable = True
    else:
        base_model.trainable = False

    inputs = Input(shape=(240,240,3))
    x = base_model(inputs, training=False)
    x = GlobalAveragePooling2D()(x)
    # Hyperparameter: number of units in the Dense layer
    dense_units = hp.Int('dense_units', min_value=64, max_value=256, step=32, default=128)
    x = Dense(dense_units, activation='relu')(x)
    # Hyperparameter: dropout rate
    dropout_rate = hp.Float('dropout', 0.2, 0.5, step=0.1, default=0.5)
    x = Dropout(dropout_rate)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    
    model = Model(inputs, outputs)
    # Hyperparameter: learning rate (use a smaller rate if unfreezing)
    lr = hp.Float('lr', min_value=1e-5, max_value=1e-3, sampling='LOG', default=1e-4)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

# Instantiate the tuner
tuner = kt.RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=1,
    directory='my_dir',
    project_name='DenseNet201_tuning'
)

tuner.search_space_summary()


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/densenet/densenet201_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m74836368/74836368[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Search space summary
Default search space size: 4
unfreeze (Boolean)
{'default': True, 'conditions': []}
dense_units (Int)
{'default': 128, 'conditions': [], 'min_value': 64, 'max_value': 256, 'step': 32, 'sampling': 'linear'}
dropout (Float)
{'default': 0.5, 'conditions': [], 'min_value': 0.2, 'max_value': 0.5, 'step': 0.1, 'sampling': 'linear'}
lr (Float)
{'default': 0.0001, 'conditions': [], 'min_value': 1e-05, 'max_value': 0.001, 'step': None, 'sampling': 'log'}


In [11]:
# Cell 8: Run the hyperparameter search on the training dataset.
steps_per_epoch = math.ceil(len(train_df) / BATCH_SIZE)
EPOCHS = 10

tuner.search(
    train_ds,
    epochs=EPOCHS,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_ds,
    class_weight=class_weights
)

# Get the best hyperparameters.
best_hp = tuner.get_best_hyperparameters(num_trials=1)[0]
print("✅ Best hyperparameters found:")
print(best_hp.values)


Trial 10 Complete [00h 59m 22s]
val_accuracy: 0.7820873260498047

Best val_accuracy So Far: 0.8369870781898499
Total elapsed time: 09h 20m 15s
✅ Best hyperparameters found:
{'unfreeze': True, 'dense_units': 64, 'dropout': 0.2, 'lr': 0.00030039157763138153}


In [12]:
# Cell 9: Build the model with the best hyperparameters from the tuner.
best_model = tuner.hypermodel.build(best_hp)
best_model.summary()

# Train this model with the frozen base.
history = best_model.fit(
    train_ds,
    epochs=EPOCHS,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_ds,
    callbacks=[EarlyStopping(monitor='val_loss', patience=7, restore_best_weights=True, verbose=1),
               ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1)],
    class_weight=class_weights
)


NameError: name 'EarlyStopping' is not defined

In [None]:
# Cell 10: Clone the frozen-base model for later evaluation.
model_frozen = tf.keras.models.clone_model(best_model)
model_frozen.build((None, 240,240,3))
model_frozen.set_weights(best_model.get_weights())
print("✅ Frozen-base model cloned.")


In [None]:
# Cell 11: Unfreeze the base layers for fine-tuning.
# Here we unfreeze all layers; alternatively, you can partially unfreeze.
from tensorflow.keras.optimizers import Adam

base_model = best_model.layers[0]  # Assuming the base model is the first layer in the Sequential model.
base_model.trainable = True

# Recompile with a lower learning rate for fine-tuning.
best_model.compile(optimizer=Adam(learning_rate=1e-5),
                   loss='binary_crossentropy',
                   metrics=['accuracy'])
print("✅ Base model layers unfrozen for fine-tuning.")


In [None]:
# Cell 12: Fine-tune the model with unfrozen base layers.
FINE_TUNE_EPOCHS = 10

fine_tune_history = best_model.fit(
    train_ds,
    epochs=FINE_TUNE_EPOCHS,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_ds,
    callbacks=[EarlyStopping(monitor='val_loss', patience=7, restore_best_weights=True, verbose=1),
               ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1)],
    class_weight=class_weights
)


In [None]:
# Cell 13: Evaluate the frozen-base and fine-tuned models using evaluation metrics and plot ROC curves.
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc

def evaluate_model(model, dataset, stage_name=""):
    y_true = []
    y_pred = []
    y_scores = []  # Raw probabilities
    
    for images, labels in dataset:
        preds = model.predict(images)
        y_true.extend(labels.numpy())
        y_pred.extend((preds > 0.5).astype("int32").flatten())
        y_scores.extend(preds.flatten())
    
    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred)
    rec = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred)
    fpr, tpr, _ = roc_curve(y_true, y_scores)
    roc_auc = auc(fpr, tpr)
    
    print(f"=== {stage_name} Model ===")
    print("Accuracy: {:.2f}%".format(acc * 100))
    print("Precision: {:.2f}".format(prec))
    print("Recall: {:.2f}".format(rec))
    print("F1-Score: {:.2f}".format(f1))
    print("Confusion Matrix:\n", cm)
    print("AUC: {:.2f}".format(roc_auc))
    print()
    
    plt.figure(figsize=(8,6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC (AUC = {:.2f})'.format(roc_auc))
    plt.plot([0,1], [0,1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0,1.0])
    plt.ylim([0.0,1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{stage_name} ROC Curve')
    plt.legend(loc="lower right")
    plt.show()

# Evaluate the frozen-base model
evaluate_model(model_frozen, val_ds, stage_name="Frozen Base")

# Evaluate the fine-tuned model (current best_model)
evaluate_model(best_model, val_ds, stage_name="Fine-Tuned")


In [None]:
# Cell 14: Plot combined learning curves for accuracy and loss.
plt.figure(figsize=(14,5))

# Accuracy Plot
plt.subplot(1,2,1)
plt.plot(history.history['accuracy'], label='Frozen Base - Train Acc')
plt.plot(history.history['val_accuracy'], label='Frozen Base - Val Acc')
plt.plot(fine_tune_history.history['accuracy'], label='Fine-Tuned - Train Acc')
plt.plot(fine_tune_history.history['val_accuracy'], label='Fine-Tuned - Val Acc')
plt.title('Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')

# Loss Plot
plt.subplot(1,2,2)
plt.plot(history.history['loss'], label='Frozen Base - Train Loss')
plt.plot(history.history['val_loss'], label='Frozen Base - Val Loss')
plt.plot(fine_tune_history.history['loss'], label='Fine-Tuned - Train Loss')
plt.plot(fine_tune_history.history['val_loss'], label='Fine-Tuned - Val Loss')
plt.title('Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='upper right')

plt.tight_layout()
plt.show()
