In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
import random
import pandas as pd
import PIL
from tqdm import tqdm

In [None]:
def set_seed(no):
    torch.manual_seed(no)
    random.seed(no)
    np.random.seed(no)
    os.environ['PYTHONHASHSEED'] = str()
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

set_seed(100)

In [None]:
# Data augmentation and normalization for training
# Just normalization for validation
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [None]:
train = pd.read_csv("OCT_final_train_csv.csv")
val = pd.read_csv("OCT_final_val_csv.csv")

In [None]:
train.head()

In [None]:
#train[train.source=="oct_mendely"]

In [None]:
val.level.value_counts()

In [None]:
val.level.value_counts()

In [None]:
class CustomDatasetLabeled(torch.utils.data.Dataset):
    def __init__(self, df, split, images_folder, transform = None):
        self.df = df
        self.images_folder = images_folder
        self.transform = transform
        self.split=split
        #self.class2index = {"cat":0, "dog":1}

    def __len__(self):
        return len(self.df)
    def __getitem__(self, index):
        filename = self.df.loc[index]["path"]
        label = self.df.loc[index]["level"]
        image = PIL.Image.open(os.path.join(self.images_folder, filename))
        image = image.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        if self.split=="unlabeled":
            return image, -1
        return image, label

In [None]:
train_data = CustomDatasetLabeled(df=train, split="train",  images_folder="./", transform=data_transforms['train'])
val_data = CustomDatasetLabeled(df=val, split="val", images_folder="./", transform=data_transforms['val'])

In [None]:
dataset = torch.utils.data.ConcatDataset([train_data, val_data])


In [None]:
#train_data[0][0].shape
dataset[0][0].shape

In [None]:
# train_loader = torch.utils.data.DataLoader(train_data, batch_size=64, shuffle=True, 
#                                           num_workers=8)
# val_loader = torch.utils.data.DataLoader(val_data, batch_size=64, shuffle=False, num_workers=8)
train_loader = torch.utils.data.DataLoader(dataset, batch_size=80, shuffle=True, 
                                           num_workers=8)

In [None]:
class_names = [0, 1, 2, 3, 4, 5, 6, 7]

In [None]:
dinov2_vits14 = torch.hub.load('facebookresearch/dinov2', 'dinov2_vits14')


In [None]:
class DinoVisionTransformerClassifier(nn.Module):
    def __init__(self):
        super(DinoVisionTransformerClassifier, self).__init__()
        self.transformer = dinov2_vits14
        self.classifier = nn.Sequential(
            nn.Linear(384, 256),
            nn.ReLU(),
            nn.Linear(256, 5)
        )
    
    def forward(self, x):
        x = self.transformer(x)
        x = self.transformer.norm(x)
        x = self.classifier(x)
        return x

     

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
model = DinoVisionTransformerClassifier()
model.load_state_dict(torch.load("./models/x_ray_pretrain_dino_10_epochs.pth"))

In [None]:
for param in model.parameters():
   param.requires_grad = False

In [None]:
model.classifier

In [None]:
model.classifier = nn.Sequential(
    nn.Linear(in_features=384, out_features=256, bias=True),
    nn.ReLU(),
    nn.Linear(in_features=256, 
              out_features=8,
              bias=True)).to(device)

In [None]:
model.classifier

In [None]:
#for param in model.parameters():
#    print(param.requires_grad)

In [None]:
import torch.optim as optim



criterion = nn.CrossEntropyLoss()
# optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9)
optimizer = optim.Adam(model.parameters(), lr=0.0001)
     

In [None]:
model = model.to(device)

In [None]:
for epoch in range(5):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs.to(device))
        loss = criterion(outputs, labels.to(device))
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 50 == 49:    # print every 2000 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 50:.3f}')
            running_loss = 0.0

print('Finished Training')
     

In [None]:
from pathlib import Path

# Create models directory (if it doesn't already exist), see: https://docs.python.org/3/library/pathlib.html#pathlib.Path.mkdir
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents=True, # create parent directories if needed
                 exist_ok=True # if models directory already exists, don't error
)

# Create model save path
MODEL_NAME = "torch_dino_finetuned_with_valid_data_merged_5_epoch.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

# Save the model state dict
print(f"Saving model to: {MODEL_SAVE_PATH}")
torch.save(obj=model.state_dict(), # only saving the state_dict() only saves the learned parameters
           f=MODEL_SAVE_PATH)

# Dino Validation

In [None]:
test = pd.read_csv("OCT_final_testwcsv.csv")
test['level'] = test['label']
test.drop("label", axis=1, inplace=True)
test_data = CustomDatasetLabeled(df=test, split="test", images_folder="./", transform=data_transforms['test'])
test_loader = torch.utils.data.DataLoader(test_data, batch_size=64, shuffle=False, num_workers=8)

In [None]:
test.head()

In [None]:
#test = test[test.path!="data/oct/test/0/*"].reset_index(drop=True)

In [None]:
#test[test.level==1]
test.level.value_counts()

In [None]:
train.level.value_counts()

In [None]:
correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for data in tqdm(test_loader):
        images, labels = data
        # calculate outputs by running images through the network
        outputs = model(images.to(device))
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted.to("cpu") == labels).sum().item()

print(f'Accuracy of the network on the {len(test_loader)} test images: {100 * correct // total} %')

In [None]:
# Import tqdm for progress bar
from tqdm.auto import tqdm

# 1. Make predictions with trained model
y_preds = []
labels = []
model.eval()
with torch.inference_mode():
  for X, y in tqdm(test_loader, desc="Making predictions"):
    # Send data and targets to target device
    X, y = X.to(device), y.to(device)
    # Do the forward pass
    y_logit = model(X)
    # Turn predictions from logits -> prediction probabilities -> predictions labels
    y_pred = torch.softmax(y_logit, dim=1).argmax(dim=1)
    # Put predictions on CPU for evaluation
    y_preds.append(y_pred.cpu())
    labels.append(y)
# Concatenate list of predictions into a tensor
y_pred_tensor = torch.cat(y_preds)
label_tensor = torch.cat(labels)

In [None]:
#y_pred_tensor

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import seaborn as sn

In [None]:
cf_matrix = confusion_matrix(label_tensor.cpu(), y_pred_tensor.cpu())
disp = ConfusionMatrixDisplay(confusion_matrix=cf_matrix,
                             display_labels=class_names)
disp.plot()

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

print(precision_score(y_pred_tensor.cpu(), label_tensor.cpu(), average="weighted"))
print(recall_score(y_pred_tensor.cpu(), label_tensor.cpu(), average="weighted"))
print(f1_score(y_pred_tensor.cpu(), label_tensor.cpu(), average="weighted"))
print(accuracy_score(y_pred_tensor.cpu(), label_tensor.cpu()))

In [None]:
FP = cf_matrix.sum(axis=0) - np.diag(cf_matrix) 
FN = cf_matrix.sum(axis=1) - np.diag(cf_matrix)
TP = np.diag(cf_matrix)
TN = cf_matrix.sum() - (FP + FN + TP)
FP = FP.astype(float)
FN = FN.astype(float)
TP = TP.astype(float)
TN = TN.astype(float)
# Sensitivity, hit rate, recall, or true positive rate
TPR = TP/(TP+FN)
# Specificity or true negative rate
TNR = TN/(TN+FP) 
# Precision or positive predictive value
PPV = TP/(TP+FP)
# Negative predictive value
NPV = TN/(TN+FN)
# Fall out or false positive rate
FPR = FP/(FP+TN)
# False negative rate
FNR = FN/(TP+FN)
# False discovery rate
FDR = FP/(TP+FP)
# Overall accuracy for each class
ACC = (TP+TN)/(TP+FP+FN+TN)

print("Sensitivity: {}".format(TPR))
print("Specificity: {}".format(TNR))

# Testing Pretrained Resnet

In [None]:
#model1 = models.resnet18(weights='IMAGENET1K_V1')
model1 = models.resnet18(pretrained=True)
num_ftrs = model1.fc.in_features
model1.fc = nn.Linear(num_ftrs, 8)
model1 = model1.to(device)

In [None]:
correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for data in tqdm(test_loader):
        images, labels = data
        # calculate outputs by running images through the network
        outputs = model1(images.to(device))
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted.to("cpu") == labels).sum().item()

print(f'Accuracy of the network on the {len(val_loader)} test images: {100 * correct // total} %')