In [1]:
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

Looking in indexes: https://download.pytorch.org/whl/cu121


In [2]:
# NOTE(review): repeats the install from the previous cell, but without the
# cu121 index URL and without version pins; presumably redundant - confirm
# before removing.
!pip3 install torch torchvision torchaudio



In [3]:
# Print the installed torch and torchvision package metadata (version,
# install location, dependencies) to record the environment used for this run.
!pip show torch
!pip show torchvision

Name: torch
Version: 2.4.1+cu121
Summary: Tensors and Dynamic neural networks in Python with strong GPU acceleration
Home-page: https://pytorch.org/
Author: PyTorch Team
Author-email: packages@pytorch.org
License: BSD-3
Location: /usr/local/lib/python3.10/dist-packages
Requires: filelock, fsspec, jinja2, networkx, sympy, typing-extensions
Required-by: accelerate, easyocr, fastai, kornia, pytorch-ignite, pytorch-lightning, stable-baselines3, timm, torchaudio, torchmetrics, torchvision
Name: torchvision
Version: 0.19.1+cu121
Summary: image and video datasets and models for torch deep learning
Home-page: https://github.com/pytorch/vision
Author: PyTorch Core Team
Author-email: soumith@pytorch.org
License: BSD
Location: /usr/local/lib/python3.10/dist-packages
Requires: numpy, pillow, torch
Required-by: easyocr, fastai, timm


In [4]:
import os
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np
from PIL import Image
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

In [5]:
# --- 1. Data Exploration ---
def explore_dataset(dataset_dir):
    """Print per-split, per-class entry counts and basic stats for one sample image.

    For every ``<dataset_dir>/<split>/<class>`` directory (splits ``test``,
    ``train``, ``val``; classes ``NORMAL``, ``PNEUMONIA``) this prints the
    number of directory entries and, when there is at least one, the array
    shape and min/max pixel value of the first entry in sorted order.

    A missing split/class directory is reported and skipped instead of
    aborting the whole survey.

    Args:
        dataset_dir: Root directory containing the split sub-directories.

    Returns:
        None. All results are printed to stdout.
    """
    for split in ["test", "train", "val"]:
        for cls in ["NORMAL", "PNEUMONIA"]:
            class_path = os.path.join(dataset_dir, split, cls)
            if not os.path.isdir(class_path):
                print(f"  {split}/{cls}: directory not found")
                continue

            # os.listdir returns entries in arbitrary order; sorting makes the
            # sampled image (and therefore the printed stats) reproducible.
            images = sorted(os.listdir(class_path))
            print(f"  {split}/{cls}: {len(images)} images")

            if images:
                img_path = os.path.join(class_path, images[0])
                # The context manager releases the file handle that
                # Image.open keeps open; the array is materialized inside it.
                with Image.open(img_path) as img:
                    img_np = np.asarray(img)
                print(f"    Sample shape: {img_np.shape}")
                print(f"    Sample min pixel value: {np.min(img_np)}")
                print(f"    Sample max pixel value: {np.max(img_np)}")

In [6]:
# --- 2. Data Preprocessing ---
def preprocess_dataset(dataset_dir, batch_size=32):
    """Build train/val/test DataLoaders over ``<dataset_dir>/{train,val,test}``.

    Every split goes through the same pipeline: resize to 256, center-crop to
    224x224, convert to a tensor in [0, 1], then normalize each of the three
    channels with a fixed mean and standard deviation.

    Args:
        dataset_dir: Directory holding the ``train``, ``val`` and ``test`` folders.
        batch_size: Number of samples per batch for all three loaders.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``. Only the training
        loader shuffles its samples.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    pipeline = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),  # PIL image -> float tensor scaled to [0, 1]
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ])

    loaders = []
    for split_name in ("train", "val", "test"):
        split_root = os.path.join(dataset_dir, split_name)
        folder = datasets.ImageFolder(split_root, transform=pipeline)
        # Shuffle only while training; evaluation order stays fixed.
        shuffle_split = split_name == "train"
        loaders.append(DataLoader(folder, batch_size=batch_size, shuffle=shuffle_split))

    return tuple(loaders)

In [7]:
# --- 3. Model Building (CNN) ---
class SimpleCNN(nn.Module):
    """Small two-convolution CNN classifier for square 3-channel images.

    Layout: Conv(3->32, 3x3) -> ReLU -> MaxPool(2) -> Conv(32->64, 3x3) ->
    ReLU -> MaxPool(2) -> flatten -> Linear(128) -> ReLU -> Linear(num_classes).
    The output is raw logits (no softmax).

    Args:
        num_classes: Number of output logits.
        input_size: Height and width, in pixels, of the square input images.
            The default of 224 yields the ``64 * 56 * 56`` flattened feature
            size, so default-constructed models are unchanged.

    Raises:
        ValueError: If ``input_size`` is smaller than 4, which would leave no
            spatial extent after the two 2x2 poolings.
    """

    def __init__(self, num_classes=2, input_size=224):
        super().__init__()
        if input_size < 4:
            raise ValueError(f"input_size must be at least 4, got {input_size}")
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        # The padded 3x3 convs keep the spatial size; the two stride-2
        # poolings floor-divide it by 4 overall.
        pooled_size = input_size // 4
        self.flat_features = 64 * pooled_size * pooled_size
        self.fc1 = nn.Linear(self.flat_features, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for input ``x``."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten everything except the batch dimension. Unlike
        # view(-1, N), this can never fold surplus spatial elements into the
        # batch dimension: a wrongly sized input fails in fc1 with a shape
        # error instead of silently producing extra "samples".
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

def create_model(learning_rate=0.001):
    """Instantiate the CNN together with its optimizer and loss function.

    Args:
        learning_rate: Step size handed to the Adam optimizer.

    Returns:
        Tuple ``(model, optimizer, criterion)``: a default ``SimpleCNN``, an
        Adam optimizer over its parameters, and a cross-entropy loss.
    """
    net = SimpleCNN()
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(net.parameters(), lr=learning_rate)
    return net, adam, loss_fn

In [8]:
# --- 4. Model Training ---
def train_model(model, train_loader, val_loader, optimizer, criterion, num_epochs, device):
    """Train ``model`` and record the mean train/validation loss of every epoch.

    Each epoch runs one optimization pass over ``train_loader`` followed by a
    gradient-free pass over ``val_loader``. Epoch losses are averaged per
    sample (each batch loss is weighted by its batch size), so a smaller
    final batch does not skew the reported value. This assumes ``criterion``
    returns the mean loss over the batch.

    Args:
        model: Network to train; moved to ``device`` in place.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        optimizer: Optimizer already bound to ``model``'s parameters.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar loss.
        num_epochs: Number of passes over ``train_loader``.
        device: Device on which batches and the model are placed.

    Returns:
        Tuple ``(model, train_losses, val_losses)`` where the two lists hold
        one float per epoch. An epoch whose loader yielded no samples is
        recorded as ``nan`` instead of raising ``ZeroDivisionError``.
    """
    model.to(device)
    start_time = time.time()

    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        model.train()
        train_loss_sum = 0.0
        train_samples = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Undo the per-batch mean so the epoch average is per sample.
            n_batch = labels.size(0)
            train_loss_sum += loss.item() * n_batch
            train_samples += n_batch

        avg_train_loss = train_loss_sum / train_samples if train_samples else float("nan")
        train_losses.append(avg_train_loss)
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}", end = ", ")

        model.eval()  # Eval mode: disables train-only behavior such as dropout.
        val_loss_sum = 0.0
        val_samples = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                n_batch = labels.size(0)
                val_loss_sum += loss.item() * n_batch
                val_samples += n_batch

        avg_val_loss = val_loss_sum / val_samples if val_samples else float("nan")
        val_losses.append(avg_val_loss)
        print(f"Val Loss: {avg_val_loss:.4f}")

    elapsed = time.time() - start_time
    print(f"Training complete in {elapsed//60:.0f}m {elapsed%60:.0f}s")
    return model, train_losses, val_losses

In [9]:
# --- 5. Model Evaluation ---
def evaluate_model(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and compute classification metrics.

    Predictions are the arg-max over dimension 1 of the model output.
    Precision, recall and F1 use support-weighted averaging across classes,
    with undefined values reported as 0.

    Args:
        model: Trained network; moved to ``device`` and put in eval mode.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device on which inference runs.

    Returns:
        Tuple ``(accuracy, precision, recall, f1, cm)``. ``cm`` is the
        confusion matrix, indexed by class ``0..C-1`` where ``C`` is the
        width of the model output, so its shape does not depend on which
        classes happen to occur in the labels or predictions.
    """
    model.to(device)
    model.eval()  # Eval mode
    all_labels = []
    all_preds = []
    num_classes = None

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)

            outputs = model(inputs)
            num_classes = outputs.shape[1]
            preds = outputs.argmax(dim=1)
            # Labels are only consumed on the host, so they are not moved to
            # the device first; .cpu() is a no-op when they are already there.
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

    # Pin the matrix to every class the model can emit; otherwise a class
    # missing from both labels and predictions would shrink the matrix and
    # misalign it with the class names used when plotting.
    class_indices = list(range(num_classes)) if num_classes is not None else None
    cm = confusion_matrix(all_labels, all_preds, labels=class_indices)
    return accuracy, precision, recall, f1, cm

In [10]:
# --- 6. Results Visualization ---
def visualize_results(cm, class_names, train_losses, val_losses, save_prefix):
    """Save the confusion-matrix heat map and the loss curves as PNG files.

    Args:
        cm: Confusion matrix of integer counts.
        class_names: Tick labels for both axes of the heat map.
        train_losses: Per-epoch training losses.
        val_losses: Per-epoch validation losses.
        save_prefix: Path prefix; writes ``<save_prefix>_cm.png`` and
            ``<save_prefix>_loss.png``.
    """
    # Heat map of the confusion matrix.
    cm_fig, cm_ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
        ax=cm_ax,
    )
    cm_ax.set_xlabel("Predicted Labels")
    cm_ax.set_ylabel("True Labels")
    cm_ax.set_title("Confusion Matrix")
    cm_fig.savefig(f"{save_prefix}_cm.png")
    plt.close(cm_fig)

    # Training vs. validation loss per epoch.
    loss_fig, loss_ax = plt.subplots(figsize=(8, 6))
    for series, series_label in ((train_losses, "Training Loss"), (val_losses, "Validation Loss")):
        loss_ax.plot(series, label=series_label)
    loss_ax.set_xlabel("Epochs")
    loss_ax.set_ylabel("Loss")
    loss_ax.legend()
    loss_ax.set_title("Training and Validation Loss")
    loss_fig.savefig(f"{save_prefix}_loss.png")
    plt.close(loss_fig)
    

In [12]:
# --- Main Execution ---
# End-to-end run: explore the data, build loaders and model, train, evaluate,
# and save the result plots.
if __name__ == '__main__':
    base_dir = "/kaggle/input/chest-xray-pneumonia"  # Dataset root (Kaggle input path)
    dataset_dir = os.path.join(base_dir, "chest_xray")
    num_epochs = 10    # Set Number of training epochs
    learning_rate = 0.001
    save_prefix = "cnn_results"
    seed = 42

    # Seed the PyTorch RNG so weight initialization and the training shuffle
    # order are repeatable across Restart & Run All.
    torch.manual_seed(seed)

    # Check if CUDA is available, if not use the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Device: {device}")

    # 1. Data Exploration
    explore_dataset(dataset_dir)

    # 2. Data Preprocessing
    train_loader, val_loader, test_loader = preprocess_dataset(dataset_dir)
    print("DataLoaders created")

    # 3. Model Building
    model, optimizer, criterion = create_model(learning_rate)
    print("Model created")
    print(model)

    # 4. Model Training
    trained_model, train_losses, val_losses = train_model(model, train_loader, val_loader, optimizer, criterion, num_epochs, device)
    print("Model training complete")

    # 5. Model Evaluation
    accuracy, precision, recall, f1, cm = evaluate_model(trained_model, test_loader, device)
    print("\nEvaluation Metrics:")
    print(f"  Accuracy: {accuracy:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall: {recall:.4f}")
    print(f"  F1 Score: {f1:.4f}")

    # 6. Results Visualization
    # Take the class names from the dataset so the tick labels follow the
    # same index order that was used to encode the labels.
    class_names = list(test_loader.dataset.classes)
    visualize_results(cm, class_names, train_losses, val_losses, save_prefix)
    print (f"Results saved as {save_prefix}_cm.png and {save_prefix}_loss.png")

Device: cuda
  test/NORMAL: 234 images
    Sample shape: (941, 1612)
    Sample min pixel value: 0
    Sample max pixel value: 255
  test/PNEUMONIA: 390 images
    Sample shape: (1104, 1624)
    Sample min pixel value: 0
    Sample max pixel value: 255
  train/NORMAL: 1341 images
    Sample shape: (1128, 1336)
    Sample min pixel value: 0
    Sample max pixel value: 255
  train/PNEUMONIA: 3875 images
    Sample shape: (712, 1024)
    Sample min pixel value: 0
    Sample max pixel value: 255
  val/NORMAL: 8 images
    Sample shape: (1416, 1736)
    Sample min pixel value: 0
    Sample max pixel value: 255
  val/PNEUMONIA: 8 images
    Sample shape: (664, 1152)
    Sample min pixel value: 0
    Sample max pixel value: 255
DataLoaders created
Model created
SimpleCNN(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, cei