In [None]:
# Import necessary libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from sklearn.metrics import classification_report, confusion_matrix

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2

# Set random seeds for reproducibility
# One constant feeds both the NumPy and the TensorFlow generators.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Set matplotlib style
plt.style.use('ggplot')

In [None]:
# Define dataset paths
# The Kaggle location stays the default; set the CHEST_XRAY_DIR environment
# variable to run the notebook against a copy of the dataset stored elsewhere.
base_dir = os.environ.get(
    'CHEST_XRAY_DIR',
    '/kaggle/input/chest-xray-pneumonia/chest_xray'
)
train_dir = os.path.join(base_dir, 'train')
test_dir = os.path.join(base_dir, 'test')
val_dir = os.path.join(base_dir, 'val')

# Check if directories exist (reported up front so a wrong path is obvious
# before any directory is listed)
for split_label, split_path in (("Train", train_dir), ("Test", test_dir), ("Val", val_dir)):
    print(f"{split_label} directory exists:", os.path.exists(split_path))

# Count images in each directory
# File suffixes treated as images when counting (compared case-insensitively).
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.ppm', '.tif', '.tiff')


def count_images(directory, extensions=IMAGE_EXTENSIONS):
    """Count the image files in the NORMAL and PNEUMONIA sub-folders.

    Only regular files whose name ends with one of `extensions` are counted,
    so hidden files (e.g. ``.DS_Store``) and nested folders no longer inflate
    the totals that the class weights are later derived from. The listing is
    not recursive.

    Args:
        directory: Path of a dataset split containing ``NORMAL`` and
            ``PNEUMONIA`` sub-directories.
        extensions: Iterable of file suffixes (with the leading dot) that
            count as images.

    Returns:
        Tuple ``(normal_count, pneumonia_count)``.

    Raises:
        FileNotFoundError: If either class sub-directory does not exist.
    """
    suffixes = tuple(ext.lower() for ext in extensions)

    def _count(class_name):
        class_dir = os.path.join(directory, class_name)
        # The context manager releases the directory handle promptly.
        with os.scandir(class_dir) as entries:
            return sum(
                1
                for entry in entries
                if entry.is_file() and entry.name.lower().endswith(suffixes)
            )

    return _count('NORMAL'), _count('PNEUMONIA')

# Per-split class counts; each call yields (normal, pneumonia)
train_normal, train_pneumonia = count_images(train_dir)
test_normal, test_pneumonia = count_images(test_dir)
val_normal, val_pneumonia = count_images(val_dir)

print(f"Training set - Normal: {train_normal}, Pneumonia: {train_pneumonia}")
print(f"Test set - Normal: {test_normal}, Pneumonia: {test_pneumonia}")
print(f"Validation set - Normal: {val_normal}, Pneumonia: {val_pneumonia}")

# Plot class distribution: one bar chart per split, side by side
sets = ['Train', 'Test', 'Validation']
normal_counts = [train_normal, test_normal, val_normal]
pneumonia_counts = [train_pneumonia, test_pneumonia, val_pneumonia]

fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for ax, split_name, n_normal, n_pneumonia in zip(axes, sets, normal_counts, pneumonia_counts):
    ax.bar(['Normal', 'Pneumonia'], [n_normal, n_pneumonia], color=['skyblue', 'lightcoral'])
    ax.set_title(f'{split_name} Set Distribution')
    ax.set_ylabel('Number of Images')

plt.tight_layout()
plt.show()

In [None]:
# Display sample images from each class
def display_sample_images(directory, title):
    """Show one NORMAL and one PNEUMONIA image from `directory` side by side.

    The sample for each class is the first image file in sorted filename
    order, so the same picture is shown on every run regardless of the order
    in which the file system lists the folder. Non-image entries are skipped.

    Args:
        directory: Path of a dataset split containing ``NORMAL`` and
            ``PNEUMONIA`` sub-directories.
        title: Figure-level title.

    Raises:
        FileNotFoundError: If a class folder is missing or holds no image file.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.ppm', '.tif', '.tiff')

    def _first_image_path(class_name):
        class_dir = os.path.join(directory, class_name)
        candidates = sorted(
            name for name in os.listdir(class_dir)
            if name.lower().endswith(image_suffixes)
        )
        if not candidates:
            raise FileNotFoundError(f"No image files found in {class_dir}")
        return os.path.join(class_dir, candidates[0])

    # Resolve both paths before creating the figure so a missing sample does
    # not leave an empty figure behind.
    samples = [
        ('Normal', _first_image_path('NORMAL')),
        ('Pneumonia', _first_image_path('PNEUMONIA')),
    ]

    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    fig.suptitle(title, fontsize=16)

    for ax, (label, img_path) in zip(axes, samples):
        # Close the file handle once the image has been handed to matplotlib.
        with Image.open(img_path) as img:
            ax.imshow(img, cmap='gray')
        ax.set_title(label)
        ax.axis('off')

    plt.show()

# Preview one image per class from the training split
display_sample_images(directory=train_dir, title='Sample Training Images')

In [None]:
# Define image dimensions
IMG_HEIGHT = 150
IMG_WIDTH = 150
BATCH_SIZE = 32
VALIDATION_SPLIT = 0.2  # share of the training folder held out for the augmented run


def _flow(datagen, directory, shuffle, subset=None):
    """Stream `directory` through `datagen` using the notebook's shared settings.

    Args:
        datagen: An ImageDataGenerator instance.
        directory: Folder with one sub-directory per class.
        shuffle: Whether batches are drawn in random order.
        subset: ``'training'``, ``'validation'`` or ``None`` (all files).

    Returns:
        The iterator returned by ``datagen.flow_from_directory``.
    """
    return datagen.flow_from_directory(
        directory,
        target_size=(IMG_HEIGHT, IMG_WIDTH),
        batch_size=BATCH_SIZE,
        class_mode='binary',
        shuffle=shuffle,
        subset=subset
    )


# Create data generators
# Baseline generator (only rescaling)
baseline_datagen = ImageDataGenerator(rescale=1./255)

# Strong augmentation generator (training subset only)
augmented_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest',
    brightness_range=[0.8, 1.2],
    validation_split=VALIDATION_SPLIT  # Using 20% of training data for validation
)

# Rescale-only generator for the held-out 20%. Drawing the validation subset
# from `augmented_datagen` would apply the random transforms to validation
# images as well, making val_loss noisy and not comparable between runs.
# NOTE(review): this relies on Keras assigning files to subsets by their
# position in the sorted file list (not randomly), so that two generators with
# the same validation_split see complementary subsets — confirm against the
# installed Keras version.
augmented_val_datagen = ImageDataGenerator(
    rescale=1./255,
    validation_split=VALIDATION_SPLIT
)

# Baseline model: full training folder for training, dataset's own val folder
baseline_train_generator = _flow(baseline_datagen, train_dir, shuffle=True)
baseline_val_generator = _flow(baseline_datagen, val_dir, shuffle=False)

# For augmented model, we'll use a subset of training data for validation
augmented_train_generator = _flow(
    augmented_datagen, train_dir, shuffle=True, subset='training'
)
augmented_val_generator = _flow(
    augmented_val_datagen, train_dir, shuffle=False, subset='validation'
)

# Test generator (same for both models); unshuffled so predictions line up
# with `test_generator.classes`
test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = _flow(test_datagen, test_dir, shuffle=False)

In [None]:
def create_cnn_model():
    """Build the uncompiled CNN shared by both experiments.

    Four Conv2D -> BatchNormalization -> MaxPooling2D -> Dropout(0.25) stages
    with 32, 64, 128 and 256 filters are followed by a flattened 512-unit
    L2-regularised dense layer (with batch normalisation and 50% dropout) and
    a single sigmoid output unit.

    Returns:
        A `Sequential` model expecting (IMG_HEIGHT, IMG_WIDTH, 3) inputs.
    """
    stack = []

    # Convolutional stages; only the very first layer declares the input shape
    for stage, n_filters in enumerate((32, 64, 128, 256)):
        first_layer_kwargs = (
            {'input_shape': (IMG_HEIGHT, IMG_WIDTH, 3)} if stage == 0 else {}
        )
        stack.append(Conv2D(n_filters, (3, 3), activation='relu', **first_layer_kwargs))
        stack.append(BatchNormalization())
        stack.append(MaxPooling2D((2, 2)))
        stack.append(Dropout(0.25))

    # Classifier head
    stack.append(Flatten())
    stack.append(Dense(512, activation='relu', kernel_regularizer=l2(0.001)))
    stack.append(BatchNormalization())
    stack.append(Dropout(0.5))
    stack.append(Dense(1, activation='sigmoid'))

    return Sequential(stack)

def _binary_classification_metrics():
    """Return a fresh metric list for one model.

    Precision and recall are passed as explicitly named metric objects rather
    than the strings 'precision'/'recall': string lookup of these two is not
    supported by every Keras release, and fixed names keep the history keys
    ('precision', 'val_precision', ...) identical for both models instead of
    receiving auto-generated suffixes. A new list is built per model because
    metric objects hold state.
    """
    return [
        'accuracy',
        keras.metrics.Precision(name='precision'),
        keras.metrics.Recall(name='recall'),
    ]


# Create baseline model
baseline_model = create_cnn_model()
baseline_model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=_binary_classification_metrics()
)

# Create augmented model (same architecture and optimiser settings, so the
# two runs differ only in the data they are fed)
augmented_model = create_cnn_model()
augmented_model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=_binary_classification_metrics()
)

# Display model architecture
baseline_model.summary()

In [None]:
# Define callbacks
# Stop once val_loss has not improved for 10 epochs and roll back to the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1
)

# Multiply the learning rate by 0.2 after 5 epochs without val_loss improvement
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=5,
    min_lr=1e-7,
    verbose=1
)

# Calculate class weights to handle imbalance
# Each class is weighted by total / (2 * class_count), so both classes
# contribute equally to the loss overall.
total_train = train_normal + train_pneumonia
weight_for_0 = total_train / (2 * train_normal)  # weight for normal class
weight_for_1 = total_train / (2 * train_pneumonia)  # weight for pneumonia class

class_weights = {0: weight_for_0, 1: weight_for_1}
print(f"Class weights: {class_weights}")

# Train both models. Later cells evaluate the models and read
# `baseline_history` / `augmented_history`, so training has to happen here;
# without it the notebook fails on a fresh kernel and scores untrained models.
EPOCHS = 30  # upper bound; EarlyStopping may end a run sooner

print("Training baseline model...")
baseline_history = baseline_model.fit(
    baseline_train_generator,
    epochs=EPOCHS,
    validation_data=baseline_val_generator,
    class_weight=class_weights,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)

print("Training augmented model...")
augmented_history = augmented_model.fit(
    augmented_train_generator,
    epochs=EPOCHS,
    validation_data=augmented_val_generator,
    class_weight=class_weights,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)

In [None]:
# Evaluate models on test set
# `evaluate` returns the loss first, followed by the compiled metrics in order.
print("Evaluating baseline model...")
baseline_results = baseline_model.evaluate(
    test_generator,
    verbose=0,
)
print(f"Baseline Model - Test Loss: {baseline_results[0]:.4f}, Test Accuracy: {baseline_results[1]:.4f}")

print("Evaluating augmented model...")
augmented_results = augmented_model.evaluate(
    test_generator,
    verbose=0,
)
print(f"Augmented Model - Test Loss: {augmented_results[0]:.4f}, Test Accuracy: {augmented_results[1]:.4f}")

# Plot training history comparison
def plot_training_history(history, title):
    """Draw accuracy and loss curves (training vs. validation) for one run.

    Args:
        history: Object whose ``history`` mapping holds the per-epoch series
            'accuracy', 'val_accuracy', 'loss' and 'val_loss'.
        title: Figure-level title.
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    fig.suptitle(title, fontsize=16)

    # (train key, val key, train label, val label, panel title, y-axis label)
    panels = (
        ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy',
         'Model Accuracy', 'Accuracy'),
        ('loss', 'val_loss', 'Training Loss', 'Validation Loss',
         'Model Loss', 'Loss'),
    )

    series = history.history
    for ax, (train_key, val_key, train_label, val_label, panel_title, y_label) in zip(axes, panels):
        ax.plot(series[train_key], label=train_label)
        ax.plot(series[val_key], label=val_label)
        ax.set_title(panel_title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.legend()

    plt.show()

# Learning curves for each run, baseline first
plot_training_history(history=baseline_history, title='Baseline Model Training History')
plot_training_history(history=augmented_history, title='Augmented Model Training History')

In [None]:
# Generate predictions
# Rewind the (unshuffled) test generator before each pass so that the rows of
# the prediction arrays line up with `test_generator.classes`.
test_generator.reset()
baseline_predictions = baseline_model.predict(test_generator, verbose=0)
test_generator.reset()
augmented_predictions = augmented_model.predict(test_generator, verbose=0)

# Convert probabilities to binary predictions
# The sigmoid output is a single column per sample; flatten it so the
# predicted labels are 1-D like `true_labels` and can be compared
# element-wise without accidental broadcasting.
baseline_pred_binary = (baseline_predictions > 0.5).astype(int).ravel()
augmented_pred_binary = (augmented_predictions > 0.5).astype(int).ravel()

# Get true labels
true_labels = test_generator.classes

# Generate classification reports
print("Baseline Model Classification Report:")
print(classification_report(true_labels, baseline_pred_binary, target_names=['Normal', 'Pneumonia']))

print("\nAugmented Model Classification Report:")
print(classification_report(true_labels, augmented_pred_binary, target_names=['Normal', 'Pneumonia']))

# Plot confusion matrices
def plot_confusion_matrix(y_true, y_pred, title):
    """Render a 2x2 confusion-matrix heatmap for the Normal/Pneumonia task.

    Args:
        y_true: Ground-truth class indices (0 = Normal, 1 = Pneumonia).
        y_pred: Predicted class indices; a single-column 2-D array is
            flattened before use.
        title: Title of the plot.
    """
    class_names = ['Normal', 'Pneumonia']

    # Pin the label set so the matrix is always 2x2 and matches the tick
    # labels, even when one class is absent from both inputs.
    cm = confusion_matrix(
        np.asarray(y_true).ravel(),
        np.asarray(y_pred).ravel(),
        labels=[0, 1]
    )

    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names,
                ax=ax)
    ax.set_title(title)
    ax.set_ylabel('True Label')
    ax.set_xlabel('Predicted Label')
    plt.show()

# One heatmap per model, both scored against the same test labels
plot_confusion_matrix(y_true=true_labels, y_pred=baseline_pred_binary, title='Baseline Model Confusion Matrix')
plot_confusion_matrix(y_true=true_labels, y_pred=augmented_pred_binary, title='Augmented Model Confusion Matrix')

In [None]:
# Visualize some correct and incorrect predictions
def _show_prediction_row(heading, indices, images, true_classes, pred_classes):
    """Print `heading` and plot the images at `indices` in a single row."""
    print(heading)
    if len(indices) == 0:
        # plt.subplots rejects a zero-column grid, so report and skip the figure.
        print("  (none in this batch)")
        return

    # squeeze=False keeps `axes` two-dimensional even for a single image.
    fig, axes = plt.subplots(1, len(indices), figsize=(15, 5), squeeze=False)
    for ax, idx in zip(axes[0], indices):
        ax.imshow(images[idx])
        ax.set_title(f'True: {true_classes[idx]}, Pred: {pred_classes[idx]}')
        ax.axis('off')
    plt.show()


def visualize_predictions(model, generator, num_images=5):
    """Show correctly and incorrectly classified images from one batch.

    Draws the next batch from `generator`, thresholds the model's outputs at
    0.5 and displays up to `num_images` hits and up to `num_images` misses.
    A group with no members is reported in text instead of raising.

    Args:
        model: Object with a ``predict(x, verbose=0)`` method returning one
            score per sample.
        generator: Iterator yielding ``(images, labels)`` batches.
        num_images: Maximum number of images shown per group.
    """
    # Get a batch of data
    x_batch, y_batch = next(generator)

    # Make predictions
    preds = model.predict(x_batch, verbose=0)
    pred_classes = (preds > 0.5).astype(int).flatten()

    # Get true classes
    true_classes = y_batch.astype(int).ravel()

    # Split sample positions into hits and misses
    is_correct = pred_classes == true_classes
    limit = max(num_images, 0)
    correct_indices = np.where(is_correct)[0][:limit]
    incorrect_indices = np.where(~is_correct)[0][:limit]

    _show_prediction_row("Correct Predictions:", correct_indices,
                         x_batch, true_classes, pred_classes)
    _show_prediction_row("Incorrect Predictions:", incorrect_indices,
                         x_batch, true_classes, pred_classes)

# Rewind the test generator so the inspected batch is always the first one
test_generator.reset()
visualize_predictions(model=baseline_model, generator=test_generator)

In [None]:
# Create a comparison table
# Column name -> position of that value in the list returned by `evaluate`
result_positions = {
    'Test Accuracy': 1,
    'Test Loss': 0,
    'Precision': 2,
    'Recall': 3,
}

comparison_data = {'Model': ['Baseline', 'Strong Augmentation']}
for column_name, position in result_positions.items():
    comparison_data[column_name] = [baseline_results[position], augmented_results[position]]

comparison_df = pd.DataFrame(comparison_data)
print("Model Comparison:")
print(comparison_df)

# Plot comparison: one panel per metric on a 2x2 grid, filled row by row
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
metrics = ['Test Accuracy', 'Test Loss', 'Precision', 'Recall']
colors = ['skyblue', 'lightcoral']

for ax, metric in zip(axes.flat, metrics):
    values = comparison_df[metric]
    ax.bar(comparison_df['Model'], values, color=colors)
    ax.set_title(metric)
    ax.set_ylabel(metric)

    # Add value labels on bars
    for bar_pos, value in enumerate(values):
        ax.text(bar_pos, value + 0.01, f'{value:.3f}', ha='center', va='bottom')

plt.tight_layout()
plt.show()

# Final summary
baseline_accuracy = baseline_results[1]
augmented_accuracy = augmented_results[1]

print("\nFINAL SUMMARY:")
print("="*50)
print(f"Baseline Model achieved {baseline_accuracy*100:.2f}% accuracy on the test set.")
print(f"Augmented Model achieved {augmented_accuracy*100:.2f}% accuracy on the test set.")

if not (augmented_accuracy > baseline_accuracy):
    print("Strong augmentation did not improve performance in this case.")
else:
    # Relative gain, expressed as a percentage of the baseline accuracy
    improvement = ((augmented_accuracy - baseline_accuracy) / baseline_accuracy) * 100
    print(f"Strong augmentation improved performance by {improvement:.2f}%.")

In [None]:
# Save models
for trained_model, model_path in (
    (baseline_model, 'pneumonia_baseline_cnn.h5'),
    (augmented_model, 'pneumonia_augmented_cnn.h5'),
):
    trained_model.save(model_path)
print("Models saved successfully!")

# Save training history
import pickle

# Only the plain per-epoch metric dict is pickled, not the History object
for history_path, run_history in (
    ('baseline_history.pkl', baseline_history),
    ('augmented_history.pkl', augmented_history),
):
    with open(history_path, 'wb') as f:
        pickle.dump(run_history.history, f)

print("Training history saved successfully!")

# Save results to CSV
comparison_df.to_csv('model_comparison.csv', index=False)
print("Results saved to CSV!")