In [2]:
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from torchvision import datasets, transforms, models
from torch.utils.data import ConcatDataset
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from torch import nn, optim
import torch

In [3]:
# Dataset root and the sub-folder that holds each split.
base_folder = "/content/drive/MyDrive/Data"
train_folder, validation_folder, test_folder = 'train', 'valid', 'test'

train_path = f'{base_folder}/{train_folder}'
validation_path = f'{base_folder}/{validation_folder}'
test_path = f'{base_folder}/{test_folder}'

# Deterministic preprocessing applied to every image: fixed 224x224 size,
# tensor conversion, then per-channel normalisation.
# NOTE(review): the mean/std look like the usual ImageNet statistics, which
# would match the pretrained backbone used later -- confirm.
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Random flips / rotation / crop, followed by the deterministic pipeline above.
augment_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0)),
    transform,
])

# The training folder is read twice -- once plain, once augmented -- and the
# two views are concatenated, so the training set is twice the folder's size.
train_data, augmented_data = (
    datasets.ImageFolder(root=train_path, transform=pipeline)
    for pipeline in (transform, augment_transform)
)
combined_train_data = ConcatDataset([train_data, augmented_data])

# Validation and test images only get the deterministic preprocessing.
valid_data, test_data = (
    datasets.ImageFolder(root=split_path, transform=transform)
    for split_path in (validation_path, test_path)
)

In [4]:
train_batch_size, valid_batch_size = 100, 9

# Only the training loader is shuffled.
train_dataloader = DataLoader(
    dataset=combined_train_data,
    batch_size=train_batch_size,
    shuffle=True,
)
valid_dataloader = DataLoader(dataset=valid_data, batch_size=valid_batch_size)
# Batch size 1: the evaluation cells take one argmax per stored batch, which is
# only a per-sample prediction when each batch holds a single image.
test_dataloader = DataLoader(dataset=test_data, batch_size=1)

In [5]:
class CustomResNet(nn.Module):
    """ResNet-50 backbone whose final layer is replaced by a custom head.

    The backbone is created with the ``ResNet50_Weights.IMAGENET1K_V2``
    weights and its ``fc`` layer is swapped for ``nn.Identity`` so that it
    emits the features ``fc`` used to consume.  Those features go through
    ``classifier`` (ReLU followed by a linear layer) to give ``num_classes``
    raw scores per sample.

    Args:
        num_classes: Number of outputs of the final linear layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        backbone = models.resnet50(weights='ResNet50_Weights.IMAGENET1K_V2')
        # Read the feature width before the original head is discarded.
        feature_dim = backbone.fc.in_features
        backbone.fc = nn.Identity()
        self.resnet = backbone
        # Keep the (ReLU, Linear) layout: saved checkpoints address the linear
        # layer by its position inside this Sequential.
        self.classifier = nn.Sequential(
            nn.ReLU(),
            nn.Linear(feature_dim, num_classes),
        )

    def forward(self, x):
        """Return the classifier scores for the input batch ``x``."""
        return self.classifier(self.resnet(x))

class AveragingModel(nn.Module):
    """Ensemble that averages the sigmoid-activated outputs of several models.

    Args:
        models: Iterable of ``nn.Module`` instances that accept the same input
            and produce outputs of the same shape.
    """

    def __init__(self,models):
        super(AveragingModel, self).__init__()
        # A plain Python list is invisible to nn.Module: eval()/train()/to()
        # and parameters() on the ensemble would never reach the members.
        # ModuleList registers them while staying iterable and indexable.
        self.models = nn.ModuleList(models)

    def forward(self,x):
        """Return the element-wise mean of ``sigmoid(member(x))`` over all members."""
        outputs = [torch.sigmoid(model(x)) for model in self.models]
        # New leading axis indexes the members; averaging over it restores the
        # shape of a single member's output.
        stacked_outputs = torch.stack(outputs)
        averaged_output = torch.mean(stacked_outputs, dim=0)
        return averaged_output

In [6]:
def train(dataloader,model,loss_fn,optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches.
        model: Module to optimise; it is switched to training mode.
        loss_fn: Callable ``loss_fn(y_pred, y)`` returning a scalar tensor.
        optimizer: Optimizer holding ``model``'s parameters.

    Returns:
        float: Mean of the per-batch losses over the whole pass, so the figure
        is comparable with the averaged loss reported by ``test``.  ``nan`` is
        returned when ``dataloader`` yields no batches.
    """
    # Nothing inside the loop changes the mode, so set it once up front.
    model.train()

    total_loss = 0.0
    num_batches = 0
    for X, y in dataloader:
        y_pred = model(X)
        loss = loss_fn(y_pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # No batch was seen, so there is no loss to report.
        return float('nan')
    return total_loss / num_batches

def test(dataloader,model,loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    loss, accuracy = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            y_pred = model(X)
            loss += loss_fn(y_pred, y).item()
            accuracy += (y_pred.argmax(1) == y).type(torch.float).sum().item()
    loss /= num_batches
    accuracy /= size
    return loss, accuracy

def recompute_batch_statistics(model, dataloader):
    """Run a gradient-free forward pass over ``dataloader`` in training mode.

    No loss is computed and no optimizer is stepped, so the only things that
    can change are buffers that layers update during a training-mode forward
    pass (for example the running statistics of batch-normalisation layers).
    The model is left in training mode afterwards.

    Args:
        model: Module to run; it is switched to training mode.
        dataloader: Iterable yielding ``(X, _)`` pairs; the second item is ignored.
    """
    model.train()
    with torch.no_grad():
        for inputs, _ in dataloader:
            # The output is discarded; only the forward pass's side effects matter.
            _ = model(inputs)

In [None]:
def _snapshot_state(module):
    """Return a detached copy of ``module.state_dict()``.

    ``state_dict()`` hands back references to the live parameter and buffer
    tensors, which training keeps updating in place.  Cloning freezes the
    values so a later ``load_state_dict`` really restores this checkpoint.
    """
    return {key: value.detach().clone() for key, value in module.state_dict().items()}


model = CustomResNet(4)
loss_fn = nn.CrossEntropyLoss()

# ---- Phase 1: AdamW with a cosine-annealed learning rate -------------------
optimizer1 = optim.AdamW(params=model.parameters(), lr=0.0005)
scheduler1 = CosineAnnealingLR(optimizer1, T_max=10, eta_min=0.00001)
# The best validation metrics and the best checkpoint are shared by all three
# phases: a later phase only replaces the checkpoint if it beats every
# earlier epoch.
best_valid_loss = float('inf')
best_valid_acc = 0
no_improvement_epochs_phase1 = 0
patience_phase1 = 5
best_model_state_phase = None

print(f'Epoch\tTrain loss\tValid loss\tAccuracy')
for epoch in range(10):
    train_loss = train(train_dataloader,model,loss_fn,optimizer1)
    valid_loss, accuracy = test(valid_dataloader,model,loss_fn)
    scheduler1.step()
    print(f'{epoch+1:<5}\t{train_loss:<10.5f}\t{valid_loss:<10.5f}\t{accuracy:<8.5f}')

    # Better accuracy wins; on equal accuracy the lower validation loss wins.
    if accuracy > best_valid_acc or (accuracy == best_valid_acc and valid_loss < best_valid_loss):
        best_valid_acc = accuracy
        best_valid_loss = valid_loss
        no_improvement_epochs_phase1 = 0
        best_model_state_phase = _snapshot_state(model)
    else:
        no_improvement_epochs_phase1 += 1

    if no_improvement_epochs_phase1 >= patience_phase1:
        print("Early stopping Phase 1")
        break

# Roll back to the best checkpoint seen so far before switching optimizer.
if best_model_state_phase is not None:
    model.load_state_dict(best_model_state_phase)
    recompute_batch_statistics(model, train_dataloader)

# ---- Phase 2: SGD with momentum, starting from phase 1's final LR ----------
optimizer2 = optim.SGD(params=model.parameters(), lr=optimizer1.param_groups[0]['lr'], momentum=0.9)
scheduler2 = ReduceLROnPlateau(optimizer2, mode='min', patience=4, factor=0.1)
no_improvement_epochs_phase2 = 0
patience_phase2 = 5

print(f'\nEpoch\tTrain loss\tValid loss\tAccuracy')
for epoch in range(10):
    train_loss = train(train_dataloader,model,loss_fn,optimizer2)
    valid_loss, accuracy = test(valid_dataloader,model,loss_fn)
    scheduler2.step(valid_loss)
    print(f'{epoch+1:<5}\t{train_loss:<10.5f}\t{valid_loss:<10.5f}\t{accuracy:<8.5f}')

    if accuracy > best_valid_acc or (accuracy == best_valid_acc and valid_loss < best_valid_loss):
        best_valid_acc = accuracy
        best_valid_loss = valid_loss
        no_improvement_epochs_phase2 = 0
        best_model_state_phase = _snapshot_state(model)
    else:
        no_improvement_epochs_phase2 += 1

    if no_improvement_epochs_phase2 >= patience_phase2:
        print("Early stopping Phase 2")
        break

if best_model_state_phase is not None:
    model.load_state_dict(best_model_state_phase)
    recompute_batch_statistics(model, train_dataloader)

# ---- Phase 3: AdamW again, starting from phase 2's final LR ----------------
optimizer3 = optim.AdamW(params=model.parameters(), lr=optimizer2.param_groups[0]['lr'])
scheduler3 = ReduceLROnPlateau(optimizer3, mode='min', patience=4, factor=0.1)
no_improvement_epochs_phase3 = 0
patience_phase3 = 5

print(f'\nEpoch\tTrain loss\tValid loss\tAccuracy')
for epoch in range(10):
    train_loss = train(train_dataloader,model,loss_fn,optimizer3)
    valid_loss, accuracy = test(valid_dataloader,model,loss_fn)
    scheduler3.step(valid_loss)
    print(f'{epoch+1:<5}\t{train_loss:<10.5f}\t{valid_loss:<10.5f}\t{accuracy:<8.5f}')

    if accuracy > best_valid_acc or (accuracy == best_valid_acc and valid_loss < best_valid_loss):
        best_valid_acc = accuracy
        best_valid_loss = valid_loss
        no_improvement_epochs_phase3 = 0
        # Must be stored under the same name that is restored below; a
        # separate phase-3 variable would leave the restore loading the
        # checkpoint from an earlier phase.
        best_model_state_phase = _snapshot_state(model)
    else:
        no_improvement_epochs_phase3 += 1

    if no_improvement_epochs_phase3 >= patience_phase3:
        print("Early stopping Phase 3")
        break

if best_model_state_phase is not None:
    model.load_state_dict(best_model_state_phase)
    recompute_batch_statistics(model, train_dataloader)

Epoch	Train loss	Valid loss	Accuracy


In [None]:
def final_test(dataloader,model,loss_fn):
    """Evaluate ``model`` on ``dataloader`` and keep the per-batch scores.

    Args:
        dataloader: Loader exposing ``len()``, a ``dataset`` attribute with
            ``len()``, and yielding ``(X, y)`` batches.
        model: Module to evaluate; it is switched to eval mode.
        loss_fn: Callable ``loss_fn(y_pred, y)`` returning a scalar tensor.

    Returns:
        tuple: ``(loss, accuracy, predictions)``.  ``loss`` is the mean of the
        per-batch losses, ``accuracy`` the fraction of correctly classified
        samples, and ``predictions`` a list with one tensor per batch holding
        the sigmoid of the model output for that batch.
    """
    sample_count = len(dataloader.dataset)
    batch_count = len(dataloader)
    model.eval()

    total_loss = 0
    num_correct = 0
    predictions = []
    with torch.no_grad():
        for inputs, targets in dataloader:
            logits = model(inputs)
            total_loss += loss_fn(logits, targets).item()
            hits = logits.argmax(1) == targets
            num_correct += hits.type(torch.float).sum().item()
            # One entry per batch, not per sample.
            predictions.append(torch.sigmoid(logits))

    mean_loss = total_loss / batch_count
    accuracy = num_correct / sample_count
    return mean_loss, accuracy, predictions


test_loss, accuracy, predictions = final_test(test_dataloader,model,loss_fn)

# One argmax per stored batch; test_dataloader uses batch_size=1, so this is
# one predicted class index per test image.
ans = [a.argmax().item() for a in predictions]

# Display names for class indices 0..3.
# NOTE(review): assumes this order matches the class-index order that
# ImageFolder derives from the test folder names -- confirm.
labels = ['adenocarcinoma', 'large carcinoma', 'normal', 'squamous carcinoma']
mapping = {0:'adenocarcinoma', 1:'large carcinoma', 2:'normal', 3:'squamous carcinoma'}

# Ground truth comes straight from the dataset's label list rather than from a
# second pass over the dataloader, which would reload every image just to read
# its label.  The loader is not shuffled, so the order matches `ans`.
true_labels = list(test_dataloader.dataset.targets)

cm = confusion_matrix(true_labels, ans, labels=range(len(labels)))
# Fraction of samples on the diagonal; used for both the title and the file name.
cm_accuracy = cm.diagonal().sum()/cm.sum()
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels).plot(xticks_rotation='vertical')
plt.title(f'Accuracy:{cm_accuracy*100:.2f}%')
plt.show()

# The checkpoint name embeds the accuracy in hundredths of a percent (e.g. 9333 for 93.33%).
torch.save(model.state_dict(), f'ct_scan_model({round(cm_accuracy*10000)}).pth')

In [None]:
def _restore_checkpoint(checkpoint_path):
    """Build a 4-class ``CustomResNet`` and load the weights stored at ``checkpoint_path``."""
    net = CustomResNet(4)
    # NOTE(review): torch.load unpickles the file; only load checkpoints from a
    # trusted source.
    net.load_state_dict(torch.load(checkpoint_path))
    return net


# Three previously saved checkpoints; the number in each file name follows the
# accuracy-based naming used when a checkpoint is saved.
model1 = _restore_checkpoint('ct_scan_model(9016).pth')

model2 = _restore_checkpoint('ct_scan_model(9270).pth')

model3 = _restore_checkpoint('ct_scan_model(9333).pth')

loss_fn = nn.CrossEntropyLoss()

In [None]:
def final_test(dataloader,model,loss_fn):
    """Score ``model`` on ``dataloader`` and collect its per-batch predictions.

    Args:
        dataloader: Loader exposing ``len()``, a ``dataset`` attribute with
            ``len()``, and yielding ``(X, y)`` batches.
        model: Module to evaluate; it is switched to eval mode.
        loss_fn: Callable ``loss_fn(y_pred, y)`` returning a scalar tensor.

    Returns:
        tuple: ``(loss, accuracy, predictions)`` -- the mean per-batch loss,
        the fraction of samples whose argmax over dimension 1 equals the
        target, and a list holding ``sigmoid(model(X))`` for each batch.
    """
    num_samples = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()

    batch_losses, batch_hits, predictions = [], [], []
    with torch.no_grad():
        for X, y in dataloader:
            scores = model(X)
            batch_losses.append(loss_fn(scores, y).item())
            batch_hits.append((scores.argmax(1) == y).type(torch.float).sum().item())
            predictions.append(torch.sigmoid(scores))

    # Loss is normalised by batch count, accuracy by sample count.
    return sum(batch_losses) / num_batches, sum(batch_hits) / num_samples, predictions

def _show_confusion_matrix(actual, predicted, class_names, title):
    """Plot the confusion matrix of ``predicted`` vs ``actual`` and return it.

    The figure title is ``title`` followed by the accuracy computed from the
    matrix diagonal.
    """
    matrix = confusion_matrix(actual, predicted, labels=range(len(class_names)))
    ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=class_names).plot(xticks_rotation='vertical')
    plt.title(f'{title}\nAccuracy:{matrix.diagonal().sum()/matrix.sum()*100:.2f}%')
    plt.show()
    return matrix


# Every pair of models plus the full trio.
averaged_models = [AveragingModel([model1,model2]), AveragingModel([model1,model3]), AveragingModel([model2,model3]), AveragingModel([model1,model2,model3])]
modelnames = ['Model 1 + 2', 'Model 1 + 3', 'Model 2 + 3', 'Model 1 + 2 + 3']

test_loss, accuracy, predictions1 = final_test(test_dataloader,model1,loss_fn)
test_loss, accuracy, predictions2 = final_test(test_dataloader,model2,loss_fn)
test_loss, accuracy, predictions3 = final_test(test_dataloader,model3,loss_fn)

# One argmax per stored batch; test_dataloader uses batch_size=1, so this is
# one predicted class index per test image.
ans1 = [a.argmax().item() for a in predictions1]
ans2 = [a.argmax().item() for a in predictions2]
ans3 = [a.argmax().item() for a in predictions3]

# Display names for class indices 0..3.
# NOTE(review): assumes this order matches the class-index order that
# ImageFolder derives from the test folder names -- confirm.
labels = ['adenocarcinoma', 'large carcinoma', 'normal', 'squamous carcinoma']
mapping = {0:'adenocarcinoma', 1:'large carcinoma', 2:'normal', 3:'squamous carcinoma'}

# Read the ground truth once from the dataset's label list instead of
# re-iterating the dataloader (and reloading every image) for each plot.
# The loader is not shuffled, so the order matches the predictions.
true_labels = list(test_dataloader.dataset.targets)

# Individual models.
for single_name, single_ans in zip(['Model 1', 'Model 2', 'Model 3'], [ans1, ans2, ans3]):
    cm = _show_confusion_matrix(true_labels, single_ans, labels, single_name)

# Averaged ensembles.
for modelname, averaged_model in zip(modelnames,averaged_models):
    test_loss, accuracy, predictions = final_test(test_dataloader,averaged_model,loss_fn)
    ans = [a.argmax().item() for a in predictions]
    cm = _show_confusion_matrix(true_labels, ans, labels, modelname)

# torch.save(model.state_dict(), 'ct_scan_model.pth')