In [None]:
# Mount Google Drive at /content/drive; the dataset paths used later in this
# notebook all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

In [None]:
# Sanity check: confirm that torch and torchvision.models imported as expected.
public_model_attrs = [attr for attr in dir(models) if not attr.startswith('_')]

print("PyTorch version:", torch.__version__)
print("Models type:", type(models))
# Only the first ten public names are shown to keep the output short.
print("Available models:", public_model_attrs[:10])
print("ResNet18 available:", hasattr(models, 'resnet18'))

# **Baseline code**

In [None]:
# Data preparation
import os

data_dir = "/content/drive/MyDrive/NIH_ChestXray_subset_split"

# Every split goes through the same deterministic pipeline: fixed-size resize,
# tensor conversion, then per-channel normalization.
# NOTE(review): the mean/std values look like the usual ImageNet statistics - confirm.
resize_to_224 = transforms.Resize((224, 224))
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
transform = transforms.Compose([resize_to_224, transforms.ToTensor(), normalize])

# One ImageFolder per split directory.
train_dataset = datasets.ImageFolder(transform=transform, root=f"{data_dir}/train")
val_dataset = datasets.ImageFolder(transform=transform, root=f"{data_dir}/val")
test_dataset = datasets.ImageFolder(transform=transform, root=f"{data_dir}/test")

# Batch size and worker count are shared; only the training loader shuffles.
loader_options = {"batch_size": 32, "num_workers": 2}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

print(f"Train: {len(train_dataset)}, Val: {len(val_dataset)}, Test: {len(test_dataset)}")
print(f"Classes: {train_dataset.classes}")

In [None]:
# ======================
# Dataset Analysis: Check class distribution and labels
# ======================
import os
from collections import Counter

def analyze_dataset(data_dir):
    """Print per-split class counts and an imbalance summary for an image dataset.

    For each of the ``train``, ``val`` and ``test`` sub-directories of
    ``data_dir``, every immediate sub-directory is treated as one class and the
    files inside it whose names end in .png, .jpg or .jpeg (case-insensitive)
    are counted.

    Args:
        data_dir: Root directory that contains the split sub-directories.

    Returns:
        None. All results are written to stdout.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg')

    print("=" * 60)
    print("DATASET ANALYSIS")
    print("=" * 60)

    for split in ['train', 'val', 'test']:
        split_path = os.path.join(data_dir, split)
        # isdir rather than exists: a plain file named like a split must be
        # skipped too, otherwise os.listdir below would raise.
        if not os.path.isdir(split_path):
            print(f"Warning: {split} directory not found!")
            continue

        print(f"\n{split.upper()} SET:")
        print("-" * 30)

        # Sorted so the report order is stable between runs.
        class_dirs = sorted(
            d for d in os.listdir(split_path)
            if os.path.isdir(os.path.join(split_path, d))
        )

        class_counts = {}
        for class_name in class_dirs:
            class_path = os.path.join(split_path, class_name)
            class_counts[class_name] = sum(
                1 for f in os.listdir(class_path)
                if f.lower().endswith(image_suffixes)
            )
        total_samples = sum(class_counts.values())

        # Print class distribution
        for class_name, count in class_counts.items():
            percentage = (count / total_samples) * 100 if total_samples > 0 else 0
            print(f"  {class_name:<15}: {count:>5} samples ({percentage:>5.1f}%)")

        print(f"  {'TOTAL':<15}: {total_samples:>5} samples")

        # Check for class imbalance
        if class_counts:
            max_class = max(class_counts, key=class_counts.get)
            min_class = min(class_counts, key=class_counts.get)
            min_count = class_counts[min_class]
            if min_count > 0:
                imbalance_ratio = class_counts[max_class] / min_count
                print(f"  Imbalance ratio (max/min): {imbalance_ratio:.1f}:1")
            else:
                # A class directory without images would otherwise divide by zero.
                print("  Imbalance ratio (max/min): undefined (a class has 0 samples)")
            print(f"  Most frequent: {max_class} ({class_counts[max_class]} samples)")
            print(f"  Least frequent: {min_class} ({class_counts[min_class]} samples)")

# Run the directory-level analysis first
print("Analyzing NIH Chest X-ray dataset...")
analyze_dataset(data_dir)

# Then show which integer index ImageFolder assigned to each class
section_rule = "=" * 60
print("\n" + section_rule)
print("PYTORCH IMAGEFOLDER CLASS MAPPING")
print(section_rule)
print("Class indices mapping:")
if 'train_dataset' not in locals():
    print("  Train dataset not loaded yet. Run the data preparation cell first.")
else:
    for idx, class_name in enumerate(train_dataset.classes):
        print(f"  Index {idx}: {class_name}")

In [None]:
# Create train/val/test split if needed
import os
import shutil
import random
from sklearn.model_selection import train_test_split

def create_split(source_dir, dest_dir, seed=42, train_frac=0.7, val_frac=0.15):
    """Copy a class-per-folder image dataset into train/val/test sub-folders.

    Every immediate sub-directory of ``source_dir`` is treated as one class.
    Its image files (.png/.jpg/.jpeg, case-insensitive) are shuffled with a
    seeded generator and copied to ``dest_dir/<split>/<class>/``. The split is
    done per class, so each class is divided with the same proportions.

    Args:
        source_dir: Directory containing one sub-directory per class.
        dest_dir: Directory to create. If it already exists the function
            returns immediately without touching anything.
        seed: Seed for the shuffle.
        train_frac: Fraction of each class assigned to ``train``.
        val_frac: Fraction of each class assigned to ``val``; the remainder
            goes to ``test``.

    Raises:
        ValueError: If the fractions are negative or sum to more than 1.

    Note:
        Because of the early return, a copy that was interrupted half-way
        leaves a partial ``dest_dir`` that later calls will not repair;
        delete the directory to regenerate the split.
    """
    if train_frac < 0 or val_frac < 0 or train_frac + val_frac > 1:
        raise ValueError("train_frac and val_frac must be >= 0 and sum to at most 1")

    if os.path.exists(dest_dir):
        return

    # Private generator: reproducible without reseeding the global `random`
    # module as a side effect.
    rng = random.Random(seed)
    os.makedirs(dest_dir, exist_ok=True)

    for split in ['train', 'val', 'test']:
        os.makedirs(f"{dest_dir}/{split}", exist_ok=True)

    class_dirs = sorted(
        d for d in os.listdir(source_dir)
        if os.path.isdir(os.path.join(source_dir, d))
    )

    for class_name in class_dirs:
        class_path = os.path.join(source_dir, class_name)
        # os.listdir returns entries in arbitrary order, so sort before the
        # seeded shuffle; otherwise the same seed can yield different splits.
        image_files = sorted(
            f for f in os.listdir(class_path)
            if f.lower().endswith(('.png', '.jpg', '.jpeg'))
        )

        rng.shuffle(image_files)
        n_total = len(image_files)
        n_train = int(n_total * train_frac)
        n_val = int(n_total * val_frac)

        splits = {
            'train': image_files[:n_train],
            'val': image_files[n_train:n_train + n_val],
            'test': image_files[n_train + n_val:]
        }

        for split, files in splits.items():
            split_class_dir = f"{dest_dir}/{split}/{class_name}"
            os.makedirs(split_class_dir, exist_ok=True)
            for file in files:
                shutil.copy2(os.path.join(class_path, file), os.path.join(split_class_dir, file))

# The split copy is written alongside the unsplit source folder on Drive.
split_data_dir = "/content/drive/MyDrive/NIH_ChestXray_subset_split"
source_data_dir = "/content/drive/MyDrive/NIH_ChestXray_subset"

# create_split returns immediately when the destination directory already exists.
create_split(source_dir=source_data_dir, dest_dir=split_data_dir)
print("Data split completed")

In [None]:
# ======================
# 2. Model definition
# ======================
num_classes = 4  # No Finding, Pneumonia, Effusion, Cardiomegaly (following project deliverable)

# Randomly initialised ResNet-18 (no pretrained weights). `weights=None` is the
# current spelling of the deprecated `pretrained=False` and avoids the
# deprecation warning; it requires torchvision >= 0.13.
model = models.resnet18(weights=None)
# Replace the final fully connected layer with one that outputs `num_classes` logits.
model.fc = nn.Linear(model.fc.in_features, num_classes)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

In [None]:
# ======================
# 3. Loss and optimizer
# ======================
# The baseline deliberately uses plain (unweighted) cross-entropy so that the
# effect of class imbalance is visible in the results.
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

In [None]:

# ======================
# 4. Training and evaluation functions
# ======================
def train_one_epoch(model, loader, optimizer, criterion, device=None):
    """Run one optimisation pass over ``loader`` and report loss and accuracy.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU if it has no parameters),
            so the function does not rely on a notebook-level global.

    Returns:
        ``(mean_loss, accuracy)`` where ``mean_loss`` is the average of the
        per-batch loss values and ``accuracy`` is correct predictions divided
        by the number of samples seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    running_loss, correct, total = 0.0, 0, 0
    num_batches = 0
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Predicted class = index of the largest logit along the class dimension.
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
        num_batches += 1

    if num_batches == 0 or total == 0:
        # Fail with a clear message instead of an opaque ZeroDivisionError.
        raise ValueError("train_one_epoch received an empty loader")
    return running_loss / num_batches, correct / total

def evaluate(model, loader, criterion, device=None):
    """Compute loss and accuracy of ``model`` over ``loader`` without training.

    Args:
        model: Network to evaluate; switched to eval mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU if it has no parameters),
            so the function does not rely on a notebook-level global.

    Returns:
        ``(mean_loss, accuracy)`` where ``mean_loss`` is the average of the
        per-batch loss values and ``accuracy`` is correct predictions divided
        by the number of samples seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    num_batches = 0
    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            # Predicted class = index of the largest logit along the class dimension.
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            num_batches += 1

    if num_batches == 0 or total == 0:
        # Fail with a clear message instead of an opaque ZeroDivisionError.
        raise ValueError("evaluate received an empty loader")
    return running_loss / num_batches, correct / total



In [None]:
# ======================
# 5. Training loop
# ======================
num_epochs = 5
for epoch in range(num_epochs):
    # One optimisation pass over the training set, then a validation pass.
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion)
    val_loss, val_acc = evaluate(model, val_loader, criterion)

    # Epochs are reported 1-based.
    epoch_report = (
        f"Epoch {epoch+1}: "
        f"Train Loss={train_loss:.4f}, Train Acc={train_acc:.4f}, "
        f"Val Loss={val_loss:.4f}, Val Acc={val_acc:.4f}"
    )
    print(epoch_report)

In [None]:
# ======================
# 6. Final test evaluation with Detailed Metrics
# ======================

# Get detailed predictions for baseline model
def evaluate_with_predictions(model, loader, criterion, device=None):
    """Evaluate ``model`` over ``loader`` and also return every prediction.

    Args:
        model: Network to evaluate; switched to eval mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU if it has no parameters),
            so the function does not rely on a notebook-level global.

    Returns:
        ``(mean_loss, accuracy, all_preds, all_labels)``. ``mean_loss`` is the
        average of the per-batch loss values, ``accuracy`` is correct
        predictions divided by samples seen, and the two lists hold the
        predicted and true class index of every sample in iteration order.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    num_batches = 0
    all_preds, all_labels = [], []

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            # Predicted class = index of the largest logit along the class dimension.
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            num_batches += 1

            # Move to CPU before converting so this also works for GPU tensors.
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    if num_batches == 0 or total == 0:
        # Fail with a clear message instead of an opaque ZeroDivisionError.
        raise ValueError("evaluate_with_predictions received an empty loader")
    return running_loss / num_batches, correct / total, all_preds, all_labels

# Score the baseline on the test loader, keeping the raw predictions and labels
# for the detailed metrics that follow.
baseline_eval = evaluate_with_predictions(model, test_loader, criterion)
test_loss, test_acc, test_preds_baseline, test_labels_baseline = baseline_eval

results_rule = "=" * 60
print(results_rule)
print("BASELINE MODEL RESULTS")
print(results_rule)
print(f"Final Test: Loss={test_loss:.4f}, Acc={test_acc:.4f}")
print()

# Import necessary libraries for detailed metrics
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Class names are read from the dataset instead of being hard-coded, so the
# displayed names always match the integer labels ImageFolder assigned.
class_names = list(test_dataset.classes)
# Passing the full label set explicitly keeps every report and matrix aligned
# with class_names even if a class never occurs in the labels or predictions.
class_indices = list(range(len(class_names)))

# Detailed classification report
print("Classification Report:")
print(classification_report(
    test_labels_baseline,
    test_preds_baseline,
    labels=class_indices,
    target_names=class_names,
    zero_division=0,  # a class that is never predicted is reported as 0 without a warning
))

# Confusion Matrix (rows: true class, columns: predicted class)
cm_baseline = confusion_matrix(test_labels_baseline, test_preds_baseline, labels=class_indices)
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm_baseline, annot=True, fmt='d', cmap='Reds',
            xticklabels=class_names, yticklabels=class_names, ax=ax)
ax.set_title('Baseline Model - Confusion Matrix')
ax.set_ylabel('True Label')
ax.set_xlabel('Predicted Label')
plt.show()

# Calculate per-class metrics; each returned array has one entry per class index
precision_baseline, recall_baseline, f1_baseline, support_baseline = precision_recall_fscore_support(
    test_labels_baseline,
    test_preds_baseline,
    labels=class_indices,
    zero_division=0,
)

print("\nPer-class Metrics:")
for i, class_name in enumerate(class_names):
    print(f"{class_name}:")
    print(f"  Precision: {precision_baseline[i]:.4f}")
    print(f"  Recall:    {recall_baseline[i]:.4f}")
    print(f"  F1-score:  {f1_baseline[i]:.4f}")
    print(f"  Support:   {support_baseline[i]}")
    print()