In [None]:
# Import Libraries
import numpy as np
import os

from sklearn.metrics import mean_squared_error
from sklearn import metrics, model_selection

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
import torchvision.models as models

from tqdm.notebook import tqdm, trange
import PIL

In [None]:
# Sends to CUDA device if available, cpu if not
device_str = "cuda" if torch.cuda.is_available() else "cpu"
print(f"using device '{device_str}'")
device = torch.device(device_str)

In [None]:
# Data augmentation (from deep_learning_2.ipynb)

# training data
data_transforms = transforms.Compose([ #data augmentation step 
        #transforms.RandomResizedCrop(224), #random croppings of the image
        transforms.CenterCrop(224),
        transforms.RandomHorizontalFlip(), #mirror image
        #transforms.RandomVerticalFlip(), #mirror image
        transforms.RandomRotation(10), #rotates image
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# test data
data_transforms_test = transforms.Compose([
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) #our new images must match statistics of original data
])

In [None]:
# Data loader (from deep_learning_2.ipynb)

# OCT images:
# train_data = datasets.ImageFolder('/work/meb135/cleandata/train', transform=data_transforms)
# test_data = datasets.ImageFolder('/work/meb135/cleandata/test', transform=data_transforms_test)
# test_data = datasets.ImageFolder('/work/meb135/cleandata/val', transform=data_transforms_test)

# OCTA images:
train_data = datasets.ImageFolder('/work/meb135/OCTAcleandata/train', transform=data_transforms)
test_data = datasets.ImageFolder('/work/meb135/OCTAcleandata/test', transform=data_transforms_test)
# test_data = datasets.ImageFolder('/work/meb135/OCTAcleandata/val', transform=data_transforms_test)

train_loader = torch.utils.data.DataLoader(train_data, batch_size=1, shuffle=True, num_workers=0)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=1, shuffle=False, num_workers=0)

In [None]:
from sklearn.metrics import roc_auc_score, roc_curve, precision_recall_curve
import matplotlib.pyplot as plt

Training Data

In [None]:
#notes: good thru resnet, densenet = classifier w/o [-1], 
modelTypes = ['AlexNet']#, 'VGG', 'ResNet', 'DenseNet', 'GoogleNet']
modelArr = ['models.alexnet(pretrained=True)']#, 'models.vgg16(pretrained=True)', \
            #'models.resnet18(pretrained=True)', 'models.densenet161(pretrained=True)', \
            #'models.googlenet(pretrained=True)']
modelClass = ['classifierSeq']#, 'classifierSeq', 'fc', 'classifierNSeq', 'fc']
modelAcc = []
modelAUC = []
fprs = []
tprs = []
recalls = []
precisions = []


def test_model(m):
    preds = []
    trues = []
    
    correct = 0
    
    total = len(test_data)
    model_ft.eval() # set the model into evaluation mode, which changes the 
    # behavior of the batch norm layer so that it is not sensitive to batch size
    with torch.no_grad():
        # Iterate through test set minibatchs 
        for images, labels in tqdm(test_loader, total=len(test_loader)):
            # Forward pass
            inputs = images.to(device)
            labels = labels.unsqueeze(1).to(device)
            y = model_ft(inputs)

            predictions = torch.round(torch.sigmoid(y)).long()
            correct += torch.sum((predictions == labels).float())
            
            predictions_proba = torch.sigmoid(y)
            pred_proba = predictions_proba.cpu().data.numpy()[0][0]

            lab=labels.cpu().data.numpy()[0][0]
            
            trues.append(lab)
            preds.append(pred_proba)

    fpr, tpr, _ = roc_curve(trues, preds)
    auc_score = roc_auc_score(trues, preds)
    precision, recall, _ = precision_recall_curve(trues, preds)

    fprs.append(fpr)
    tprs.append(tpr)
    precisions.append(precision)
    recalls.append(recall)

    accuracy = correct/total
    print(f'Test accuracy for {modelTypes[m]}: {accuracy}')
    print(f'AUC Score for {modelTypes[m]}: {auc_score}')
    
    return accuracy, auc_score


def train_model(model, criterion, optimizer, m, num_epochs=25):
  
    for epoch in trange(num_epochs):
        print(f'yay epoch {epoch}')
        for images, labels in tqdm(train_loader, total=len(train_loader)):
            inputs = images.to(device)
            labels = labels.float().unsqueeze(1).to(device)
            # zero the parameter gradients
            optimizer.zero_grad()

            # Do the forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Calculate gradients and step (update parameters)
            loss.backward()
            optimizer.step()

        model_output = test_model(m)
        modelAcc.append(float(model_output[0].cpu().data.numpy()))
        modelAUC.append(model_output[1])

        model_ft.train() # set model back to training mode (batch norm layers back to normal) after testing

# iterate here!!
for ind, m in enumerate(modelArr):
    model_ft = eval(m)
    model_ft.to(device)
    
    if modelClass[ind] == 'classifierSeq':
        num_ftrs = model_ft.classifier[-1].in_features # Need to know # of features coming out of the penultimate layer
        model_ft.classifier[-1] = nn.Linear(num_ftrs, 1) # Redefining last feature to predict binary
        model_ft.to(device)
    
    elif modelClass[ind] == 'classifierNSeq':
        num_ftrs = model_ft.classifier.in_features # Need to know # of features coming out of the penultimate layer
        model_ft.classifier = nn.Linear(num_ftrs, 1) # Redefining last feature to predict binary
        model_ft.to(device)
    
    else:
        num_ftrs = model_ft.fc.in_features # Need to know # of features coming out of the penultimate layer
        model_ft.fc = nn.Linear(num_ftrs, 1) # Redefining last feature to predict binary
        model_ft.to(device)

    criterion = nn.BCEWithLogitsLoss() # binary cross entropy loss
    optimizer = torch.optim.SGD(model_ft.parameters(), lr=1e-5, momentum=0.9) # momentum smooths the dataset (impacts stochastic gradient descent)

    train_model(model_ft, criterion, optimizer, ind, num_epochs=100)

Test Data

In [None]:
final_test_data = datasets.ImageFolder('/work/meb135/cleandata/test', transform=data_transforms_test)
final_test_loader = torch.utils.data.DataLoader(final_test_data, batch_size=1, shuffle=False, num_workers=0)

modelAcc_TEST = []
modelAUC_TEST = []
fprs_TEST = []
tprs_TEST = []
recalls_TEST = []
precisions_TEST = []

def final_test_model(m):
    preds = []
    trues = []
    
    correct = 0
    
    total = len(final_test_data)
    model_ft.eval() # set the model into evaluation mode, which changes the 
    # behavior of the batch norm layer so that it is not sensitive to batch size
    with torch.no_grad():
        # Iterate through test set minibatchs 
        for images, labels in tqdm(final_test_loader, total=len(final_test_loader)):
            # Forward pass
            inputs = images.to(device)
            labels = labels.unsqueeze(1).to(device)
            y = model_ft(inputs)

            predictions = torch.round(torch.sigmoid(y)).long()
            correct += torch.sum((predictions == labels).float())
            
            predictions_proba = torch.sigmoid(y)
            pred_proba = predictions_proba.cpu().data.numpy()[0][0]

            lab=labels.cpu().data.numpy()[0][0]
            
            trues.append(lab)
            preds.append(pred_proba)

    fpr, tpr, _ = roc_curve(trues, preds)
    auc_score = roc_auc_score(trues, preds)
    precision, recall, _ = precision_recall_curve(trues, preds)

    fprs_TEST.append(fpr)
    tprs_TEST.append(tpr)
    precisions_TEST.append(precision)
    recalls_TEST.append(recall)

    accuracy = correct/total
    print(f'Test accuracy: {accuracy}')
    print(f'AUC Score: {auc_score}')
    
    return accuracy, auc_score

model_output_TEST = final_test_model(m)
modelAcc_TEST.append(float(model_output_TEST[0].cpu().data.numpy()))
modelAUC_TEST.append(model_output_TEST[1])