In [None]:
# Install requirements: pinned torch/torchvision, matplotlib, and the CUDA 12.1 PyTorch wheels

!pip install torch==2.2.1
!pip install torchvision==0.17.1
!pip install matplotlib
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

In [None]:
# Task 1.1: train ResNet-18 without pretrained weights

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from torchvision.models import resnet18, ResNet18_Weights
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report

# Pick the GPU when one is visible to PyTorch, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(
    "CUDA is available. Running on GPU."
    if device.type == "cuda"
    else "CUDA is not available. Running on CPU."
)

# Root folder that holds the 'train', 'val' and 'test' sub-directories
data_dir = "/content/drive/MyDrive/Grad/CAP5516 MIC/chest_xray"

# Training pipeline: resize, random rotation/flip/crop augmentation, then
# normalisation with the mean/std used by torchvision's ImageNet models
train_transform = transforms.Compose([
    transforms.Resize(size=256),
    transforms.RandomRotation(degrees=10),
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop(size=224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation/test pipeline: deterministic resize + centre crop, same normalisation
val_test_transform = transforms.Compose([
    transforms.Resize(size=256),
    transforms.CenterCrop(size=224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One ImageFolder per split, each reading from <data_dir>/<split>
image_datasets = {
    split: datasets.ImageFolder(os.path.join(data_dir, split), transform=pipeline)
    for split, pipeline in (
        ('train', train_transform),
        ('val', val_test_transform),
        ('test', val_test_transform),
    )
}

# One shuffled, 4-image-batch loader per split
dataloaders = {
    split: DataLoader(dataset, batch_size=4, shuffle=True, num_workers=2)
    for split, dataset in image_datasets.items()
}

def initialize_model(model_name, num_classes, use_pretrained=True):
    """Build a classifier whose final layer outputs ``num_classes`` logits.

    Args:
        model_name: Architecture identifier; only ``"resnet18"`` is supported.
        num_classes: Output size of the replacement final linear layer.
        use_pretrained: When true, load ``ResNet18_Weights.DEFAULT``;
            otherwise build the network without pretrained weights.

    Returns:
        A ``resnet18`` model whose ``fc`` layer has been replaced by a new
        ``nn.Linear`` with ``num_classes`` outputs.

    Raises:
        ValueError: If ``model_name`` is not a supported architecture.
    """
    # Fail loudly here instead of returning None and crashing later at
    # ``model.to(device)`` with an unrelated AttributeError.
    if model_name != "resnet18":
        raise ValueError(
            f"Unsupported model_name {model_name!r}; expected 'resnet18'."
        )

    weights = ResNet18_Weights.DEFAULT if use_pretrained else None
    model = resnet18(weights=weights)

    # Swap the stock classification head for one sized to this task.
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model

# Two-class ResNet-18 built without pretrained weights, moved to the chosen device
model = initialize_model("resnet18", num_classes=2, use_pretrained=False).to(device)

# Cross-entropy loss optimised with SGD (momentum + weight decay)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=1e-4,
)

# Multiply the learning rate by 0.1 after every 7 scheduler steps
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# train model
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs=25):
    """Train ``model`` and validate it after every epoch.

    Args:
        model: Network to optimise. Batches are moved to the device its
            parameters live on.
        dataloaders: Mapping with ``'train'`` and ``'val'`` loaders that yield
            ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: LR scheduler stepped once per epoch, after the train phase.
        num_epochs: Number of train/validate cycles to run.

    Returns:
        ``(model, train_loss_history, val_loss_history, train_acc_history,
        val_acc_history)``. Every history holds one Python float per epoch,
        so the lists can be plotted directly even when training on a GPU.
    """
    # Follow the model's own device rather than relying on a module-level
    # global; a parameter-less model falls back to the CPU.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    train_loss_history = []
    val_loss_history = []
    train_acc_history = []
    val_acc_history = []

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            # model.train(False) is equivalent to model.eval()
            model.train(is_train)

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(run_device)
                labels = labels.to(run_device)

                # Gradients only need clearing when a backward pass follows.
                if is_train:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                batch_size = inputs.size(0)
                # criterion returns a batch mean; re-weight by batch size so
                # a smaller final batch does not skew the epoch average.
                running_loss += loss.item() * batch_size
                # .item() keeps the counter a plain int (no device tensors).
                running_corrects += (preds == labels).sum().item()
                samples_seen += batch_size

            # Average over the samples actually processed this phase.
            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects / samples_seen

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if is_train:
                train_loss_history.append(epoch_loss)
                train_acc_history.append(epoch_acc)
                # Step the schedule once per epoch, after the optimizer steps.
                scheduler.step()
            else:
                val_loss_history.append(epoch_loss)
                val_acc_history.append(epoch_acc)

    return model, train_loss_history, val_loss_history, train_acc_history, val_acc_history

# Run 25 epochs of training and keep the per-epoch loss/accuracy histories
(
    model_ft,
    train_loss_history,
    val_loss_history,
    train_acc_history,
    val_acc_history,
) = train_model(
    model,
    dataloaders,
    criterion,
    optimizer,
    scheduler,
    num_epochs=25,
)

# test model
def test_model(model, dataloaders, phase='test', num_failure_cases=5):
    """Evaluate ``model`` on one split, print metrics and show some failures.

    Prints the overall accuracy and a scikit-learn classification report, then
    displays up to ``num_failure_cases`` misclassified images in one row.

    Args:
        model: Trained classifier; it is switched to eval mode here.
        dataloaders: Mapping from split name to a loader yielding
            ``(images, labels)`` batches.
        phase: Key of the split to evaluate.
        num_failure_cases: Maximum number of misclassified images to display;
            values <= 0 skip the figure.
    """
    model.eval()

    # Follow the model's own device rather than relying on a module-level
    # global; a parameter-less model falls back to the CPU.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    correct = 0
    total = 0
    true_labels = []
    predicted_labels = []
    # (image, true_label, predicted_label) triples. Only the ones that will
    # be displayed are kept, so memory does not grow with the error count.
    failures = []

    with torch.no_grad():
        for images, labels in dataloaders[phase]:
            images = images.to(run_device)
            labels = labels.to(run_device)
            _, predicted = torch.max(model(images), 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            true_labels.extend(labels.cpu().tolist())
            predicted_labels.extend(predicted.cpu().tolist())

            remaining = num_failure_cases - len(failures)
            if remaining > 0:
                wrong = (predicted != labels).nonzero(as_tuple=True)[0][:remaining]
                for idx in wrong.tolist():
                    failures.append(
                        (images[idx].cpu(), labels[idx].item(), predicted[idx].item())
                    )

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero below.
        print(f'No samples found for phase {phase!r}; nothing to evaluate.')
        return

    print(f'Accuracy of the network on the {phase} images: {100 * correct / total}%')
    print('Classification Report:')
    # Passing labels explicitly keeps the report working when only one class
    # appears. Assumes index 0 is Normal and 1 is Pneumonia - TODO confirm
    # against the dataset's class_to_idx mapping.
    print(classification_report(
        true_labels,
        predicted_labels,
        labels=[0, 1],
        target_names=['Normal', 'Pneumonia'],
    ))

    if not failures:
        print('No misclassified samples to display.')
        return

    # Display misclassified images. The inputs were normalised with these
    # statistics by the file's val/test transform; undo that so imshow gets
    # values in [0, 1] instead of clipping them.
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

    # squeeze=False keeps `axes` two-dimensional even for a single column.
    _, axes = plt.subplots(
        nrows=1, ncols=num_failure_cases, figsize=(20, 4), squeeze=False
    )
    for ax in axes[0]:
        # Hide every frame up front so unused slots stay blank.
        ax.axis('off')
    for ax, (image, true_label, predicted_label) in zip(axes[0], failures):
        if image.size(0) == 3:
            image = (image * std + mean).clamp(0, 1)
        ax.imshow(image.permute(1, 2, 0).numpy())
        ax.set_title(f"True: {true_label}, Predicted: {predicted_label}")
    plt.tight_layout()
    plt.show()

# Report test-split metrics and show a few misclassified images
test_model(model, dataloaders)


# Loss curves: one point per epoch for the training and validation phases
plt.figure(figsize=(10, 5))
plt.title('Training and Validation Loss Curves')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.plot(train_loss_history, label='Training Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.legend()
plt.show()

In [None]:
# Task 1.2: fine-tune ResNet-18 starting from pretrained weights


# Training pipeline: resize, random rotation/flip/crop augmentation, then
# normalisation with the mean/std used by torchvision's ImageNet models
train_transform = transforms.Compose([
    transforms.Resize(size=256),
    transforms.RandomRotation(degrees=10),
    transforms.RandomHorizontalFlip(),
    transforms.RandomResizedCrop(size=224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation/test pipeline: deterministic resize + centre crop, same normalisation
val_test_transform = transforms.Compose([
    transforms.Resize(size=256),
    transforms.CenterCrop(size=224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One ImageFolder per split, each reading from <data_dir>/<split>
image_datasets = {
    split: datasets.ImageFolder(os.path.join(data_dir, split), transform=pipeline)
    for split, pipeline in (
        ('train', train_transform),
        ('val', val_test_transform),
        ('test', val_test_transform),
    )
}

# One shuffled, 4-image-batch loader per split
dataloaders = {
    split: DataLoader(dataset, batch_size=4, shuffle=True, num_workers=2)
    for split, dataset in image_datasets.items()
}

def initialize_model(model_name, num_classes, use_pretrained=True):
    """Build a classifier whose final layer outputs ``num_classes`` logits.

    Args:
        model_name: Architecture identifier; only ``"resnet18"`` is supported.
        num_classes: Output size of the replacement final linear layer.
        use_pretrained: When true, load ``ResNet18_Weights.DEFAULT``;
            otherwise build the network without pretrained weights.

    Returns:
        A ``resnet18`` model whose ``fc`` layer has been replaced by a new
        ``nn.Linear`` with ``num_classes`` outputs.

    Raises:
        ValueError: If ``model_name`` is not a supported architecture.
    """
    # Fail loudly here instead of returning None and crashing later at
    # ``model.to(device)`` with an unrelated AttributeError.
    if model_name != "resnet18":
        raise ValueError(
            f"Unsupported model_name {model_name!r}; expected 'resnet18'."
        )

    weights = ResNet18_Weights.DEFAULT if use_pretrained else None
    model = resnet18(weights=weights)

    # Swap the stock classification head for one sized to this task.
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model

# Two-class ResNet-18 initialised from pretrained weights, moved to the chosen device
model = initialize_model("resnet18", num_classes=2, use_pretrained=True).to(device)

# Cross-entropy loss optimised with SGD (momentum + weight decay)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=1e-4,
)

# Multiply the learning rate by 0.1 after every 7 scheduler steps
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# train model
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs=25):
    """Train ``model`` and validate it after every epoch.

    Args:
        model: Network to optimise. Batches are moved to the device its
            parameters live on.
        dataloaders: Mapping with ``'train'`` and ``'val'`` loaders that yield
            ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: LR scheduler stepped once per epoch, after the train phase.
        num_epochs: Number of train/validate cycles to run.

    Returns:
        ``(model, train_loss_history, val_loss_history, train_acc_history,
        val_acc_history)``. Every history holds one Python float per epoch,
        so the lists can be plotted directly even when training on a GPU.
    """
    # Follow the model's own device rather than relying on a module-level
    # global; a parameter-less model falls back to the CPU.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    train_loss_history = []
    val_loss_history = []
    train_acc_history = []
    val_acc_history = []

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            # model.train(False) is equivalent to model.eval()
            model.train(is_train)

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(run_device)
                labels = labels.to(run_device)

                # Gradients only need clearing when a backward pass follows.
                if is_train:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                batch_size = inputs.size(0)
                # criterion returns a batch mean; re-weight by batch size so
                # a smaller final batch does not skew the epoch average.
                running_loss += loss.item() * batch_size
                # .item() keeps the counter a plain int (no device tensors).
                running_corrects += (preds == labels).sum().item()
                samples_seen += batch_size

            # Average over the samples actually processed this phase.
            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects / samples_seen

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if is_train:
                train_loss_history.append(epoch_loss)
                train_acc_history.append(epoch_acc)
                # Step the schedule once per epoch, after the optimizer steps.
                scheduler.step()
            else:
                val_loss_history.append(epoch_loss)
                val_acc_history.append(epoch_acc)

    return model, train_loss_history, val_loss_history, train_acc_history, val_acc_history

# Run 25 epochs of training and keep the per-epoch loss/accuracy histories
(
    model_ft,
    train_loss_history,
    val_loss_history,
    train_acc_history,
    val_acc_history,
) = train_model(
    model,
    dataloaders,
    criterion,
    optimizer,
    scheduler,
    num_epochs=25,
)

#test model
def test_model(model, dataloaders, phase='test', num_failure_cases=5):
    """Evaluate ``model`` on one split, print metrics and show some failures.

    Prints the overall accuracy and a scikit-learn classification report, then
    displays up to ``num_failure_cases`` misclassified images in one row.

    Args:
        model: Trained classifier; it is switched to eval mode here.
        dataloaders: Mapping from split name to a loader yielding
            ``(images, labels)`` batches.
        phase: Key of the split to evaluate.
        num_failure_cases: Maximum number of misclassified images to display;
            values <= 0 skip the figure.
    """
    model.eval()

    # Follow the model's own device rather than relying on a module-level
    # global; a parameter-less model falls back to the CPU.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    correct = 0
    total = 0
    true_labels = []
    predicted_labels = []
    # (image, true_label, predicted_label) triples. Only the ones that will
    # be displayed are kept, so memory does not grow with the error count.
    failures = []

    with torch.no_grad():
        for images, labels in dataloaders[phase]:
            images = images.to(run_device)
            labels = labels.to(run_device)
            _, predicted = torch.max(model(images), 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            true_labels.extend(labels.cpu().tolist())
            predicted_labels.extend(predicted.cpu().tolist())

            remaining = num_failure_cases - len(failures)
            if remaining > 0:
                wrong = (predicted != labels).nonzero(as_tuple=True)[0][:remaining]
                for idx in wrong.tolist():
                    failures.append(
                        (images[idx].cpu(), labels[idx].item(), predicted[idx].item())
                    )

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero below.
        print(f'No samples found for phase {phase!r}; nothing to evaluate.')
        return

    print(f'Accuracy of the network on the {phase} images: {100 * correct / total}%')
    print('Classification Report:')
    # Passing labels explicitly keeps the report working when only one class
    # appears. Assumes index 0 is Normal and 1 is Pneumonia - TODO confirm
    # against the dataset's class_to_idx mapping.
    print(classification_report(
        true_labels,
        predicted_labels,
        labels=[0, 1],
        target_names=['Normal', 'Pneumonia'],
    ))

    if not failures:
        print('No misclassified samples to display.')
        return

    # Display misclassified images. The inputs were normalised with these
    # statistics by the file's val/test transform; undo that so imshow gets
    # values in [0, 1] instead of clipping them.
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

    # squeeze=False keeps `axes` two-dimensional even for a single column.
    _, axes = plt.subplots(
        nrows=1, ncols=num_failure_cases, figsize=(20, 4), squeeze=False
    )
    for ax in axes[0]:
        # Hide every frame up front so unused slots stay blank.
        ax.axis('off')
    for ax, (image, true_label, predicted_label) in zip(axes[0], failures):
        if image.size(0) == 3:
            image = (image * std + mean).clamp(0, 1)
        ax.imshow(image.permute(1, 2, 0).numpy())
        ax.set_title(f"True: {true_label}, Predicted: {predicted_label}")
    plt.tight_layout()
    plt.show()

# Report test-split metrics and show a few misclassified images
test_model(model, dataloaders)

# Loss curves: one point per epoch for the training and validation phases
plt.figure(figsize=(10, 5))
plt.title('Training and Validation Loss Curves')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.plot(train_loss_history, label='Training Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.legend()
plt.show()