In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import os
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

In [2]:
# Data transformations
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),  # VGG-16 expects 224x224 images
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # Normalization for pre-trained models
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [3]:
data_dir = '../data/2/chest_xray'
image_datasets = {
    'train': datasets.ImageFolder(os.path.join(data_dir, 'train'), data_transforms['train']),
    'val': datasets.ImageFolder(os.path.join(data_dir, 'val'), data_transforms['val']),
    'test': datasets.ImageFolder(os.path.join(data_dir, 'test'), data_transforms['test'])
}

dataloaders = {
    'train': DataLoader(image_datasets['train'], batch_size=32, shuffle=True),
    'val': DataLoader(image_datasets['val'], batch_size=32, shuffle=False),
    'test': DataLoader(image_datasets['test'], batch_size=32, shuffle=False)
}

In [4]:
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val', 'test']}
class_names = image_datasets['train'].classes

# Count class distribution
class_counts = {}
for phase in ['train', 'val', 'test']:
    class_counts[phase] = {class_name: 0 for class_name in class_names}
    dataset = image_datasets[phase]
    for _, label in dataset.samples:
        class_name = class_names[label]
        class_counts[phase][class_name] += 1

for phase in ['train', 'val', 'test']:
    print(f"\nClass counts in {phase}:")
    for class_name, count in class_counts[phase].items():
        print(f"{class_name}: {count}")


Class counts in train:
NORMAL: 959
PNEUMONIA: 3875

Class counts in val:
NORMAL: 8
PNEUMONIA: 8

Class counts in test:
NORMAL: 234
PNEUMONIA: 390


In [5]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [6]:
# Load the VGG-16 with batch normalization
model = models.vgg16_bn(pretrained=True) 

# Freeze training for all layers in the feature extractor
for param in model.features.parameters():
    param.requires_grad = False

# Modify the classifier for binary classification
num_features = model.classifier[6].in_features
# Replace the last layer with a new Linear layer with 2 output features for binary classification
model.classifier = nn.Sequential(
    *list(model.classifier.children())[:-1],  # Keep all layers except the last one
    nn.Linear(num_features, 2)  # Add new layer for 2 classes
)

# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)



In [7]:
def train_model(model, criterion, optimizer, num_epochs=20):
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

    print('Training complete')

def test_model(model):
    model.eval()
    running_corrects = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloaders['test']:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            running_corrects += torch.sum(preds == labels.data)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    test_acc = running_corrects.double() / dataset_sizes['test']
    print(f'Test Accuracy: {test_acc:.4f}')

    # Confusion matrix
    cm = confusion_matrix(all_labels, all_preds)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap=plt.cm.Blues)
    plt.title("Pneumonia X-Ray Confusion Matrix")
    plt.show()

def save_model(model, path='vgg16_pneumonia.pth'):
    torch.save(model.state_dict(), path)
    print(f"Model saved as {path}")

In [None]:
train_model(model, criterion, optimizer)

Epoch 1/20
----------
train Loss: 0.1945 Acc: 0.9185
val Loss: 0.6206 Acc: 0.6875
Epoch 2/20
----------
train Loss: 0.1131 Acc: 0.9580
val Loss: 0.4787 Acc: 0.7500
Epoch 3/20
----------
train Loss: 0.0824 Acc: 0.9681
val Loss: 0.3812 Acc: 0.8125
Epoch 4/20
----------
train Loss: 0.0744 Acc: 0.9694
val Loss: 0.3155 Acc: 0.8750
Epoch 5/20
----------
train Loss: 0.0717 Acc: 0.9737
val Loss: 0.1624 Acc: 0.9375
Epoch 6/20
----------
train Loss: 0.0567 Acc: 0.9785
val Loss: 0.1588 Acc: 0.9375
Epoch 7/20
----------
train Loss: 0.0445 Acc: 0.9830
val Loss: 0.0963 Acc: 1.0000
Epoch 8/20
----------
train Loss: 0.0403 Acc: 0.9855
val Loss: 0.0859 Acc: 1.0000
Epoch 9/20
----------
train Loss: 0.0436 Acc: 0.9837
val Loss: 0.0990 Acc: 0.9375
Epoch 10/20
----------
train Loss: 0.0270 Acc: 0.9913
val Loss: 0.1408 Acc: 0.9375
Epoch 11/20
----------
train Loss: 0.0294 Acc: 0.9882
val Loss: 0.1446 Acc: 0.9375
Epoch 12/20
----------
train Loss: 0.0214 Acc: 0.9938
val Loss: 0.1073 Acc: 0.9375
Epoch 13/20
-

In [None]:
test_model(model)

In [None]:
save_model(model)