In [1]:
import torch.nn as nn 
import torchvision
import torch 
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import transforms
import pathlib 
from tqdm import tqdm 
import torch.optim as optim
from torchvision.models import ResNet50_Weights
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from glob import glob 
import os 

In [2]:
class Res_Net(nn.Module):
    def __init__(self,classes):
        super().__init__()
        self.model = torchvision.models.resnet50(weights=ResNet50_Weights.DEFAULT)
        self.model.fc = nn.Linear(self.model.fc.in_features, len(classes))

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        return self.model(x)

In [3]:
class BP_Eye_Dataset(Dataset):
    def __init__(self, image_paths, classes, my_transforms):
        self.image_paths = image_paths
        self.classes = classes
        self.my_transforms = my_transforms

        self.idx_to_class = {i:j for i, j in enumerate(self.classes)}
        self.class_to_idx = {value:key for key,value in self.idx_to_class.items()}
        
    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_filepath = self.image_paths[idx] 
        label = image_filepath.split('/')[-2]

        image = torchvision.io.read_image(image_filepath)
        
        image = self.my_transforms(image)
        label_final = self.class_to_idx[label]

        return image, label_final

In [4]:
num_epochs = 1
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_path = '/mnt/Enterprise2/shirshak/Glaucoma_Dataset_eyepacs_airogs_lightv2/eyepac-light-v2-512-jpg/train/'
val_path = '/mnt/Enterprise2/shirshak/Glaucoma_Dataset_eyepacs_airogs_lightv2/eyepac-light-v2-512-jpg/validation/'
test_path = '/mnt/Enterprise2/shirshak/Glaucoma_Dataset_eyepacs_airogs_lightv2/eyepac-light-v2-512-jpg/test/'

In [5]:
train_data_paths = glob(train_path + "*/*.jpg")
val_data_paths = glob(val_path + "*/*.jpg")
test_data_paths = glob(test_path + "*/*.jpg")

In [6]:
my_transforms = transforms.Compose([
    transforms.ToPILImage(), 
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    transforms.Resize((256,256))
])

In [7]:
train_dataset = BP_Eye_Dataset(train_data_paths, classes = ["NRG", "RG"], my_transforms=my_transforms)

In [10]:
train_dataset[:20]

AttributeError: 'list' object has no attribute 'split'

In [9]:
train_dataset = torchvision.datasets.ImageFolder(train_path, transform=my_transform)
val_dataset = torchvision.datasets.ImageFolder(val_path, transform=my_transform)
test_dataset = torchvision.datasets.ImageFolder(test_path, transform=my_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

root=pathlib.Path(train_path)
classes=sorted([j.name.split('/')[-1] for j in root.iterdir()])

NameError: name 'my_transform' is not defined

In [None]:
model = Res_Net(classes=classes)
model = model.to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
def get_needed_metrics(labels, predicted): 
    assert isinstance(labels, list)
    assert isinstance(predicted, list)
    
    accuracy = accuracy_score(labels, predicted)
    precision = precision_score(labels, predicted, zero_division=0.0)
    recall = recall_score(labels, predicted, zero_division=0.0)
    f1 = f1_score(labels, predicted, zero_division=0.0)
    
    return accuracy, precision, recall, f1

In [None]:
for epoch in range(num_epochs):
        model.train()
        acc_train_epoch, precision_train_epoch, recall_train_epoch, f1_train_epoch  = [], [], [], []
        for inputs, labels in tqdm(train_loader, desc=f'Training Epoch {epoch}/{num_epochs}', unit='batch'):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = nn.Sigmoid()(model(inputs))     
            
            train_loss = loss_func(outputs, labels)
            train_loss.backward()
            optimizer.step()
        
            _, predicted_train = torch.max(outputs, 1)
            

            acc_batch_train, precision_batch_train, recall_batch_train, f1_batch_train = get_needed_metrics(labels.cpu().detach().tolist(), predicted_train.cpu().detach().tolist())
        
            acc_train_epoch.append(acc_batch_train)
            precision_train_epoch.append(precision_batch_train)
            recall_train_epoch.append(recall_batch_train)
            f1_train_epoch.append(f1_batch_train)
        
        # Validating the model
        model.eval()
        acc_val_epoch, precision_val_epoch, recall_val_epoch, f1_val_epoch  = [], [], [], []
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc=f'Testing {epoch}/{num_epochs}', unit='batch'):
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = nn.Sigmoid()(model(inputs))     
                val_loss = loss_func(outputs, labels)
                _, predicted_val = torch.max(outputs, 1)

                acc_batch_val, precision_batch_val, recall_batch_val, f1_batch_val = get_needed_metrics(labels.cpu().detach().tolist(), predicted_val.cpu().detach().tolist())
                
                acc_val_epoch.append(acc_batch_val)
                precision_val_epoch.append(precision_batch_val)
                recall_val_epoch.append(recall_batch_val)
                f1_val_epoch.append(f1_batch_val)

In [None]:
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': train_loss,
}
torch.save(checkpoint, '/home/shirshak/BPEye_Project_2024/zzz_tests/checkpoint.pth')

# FOR TESTING PURPOSES ONLY

In [None]:
import torch.nn as nn 
import torchvision
import torch 
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import transforms
import pathlib 
from tqdm import tqdm 
import torch.optim as optim
from torchvision.models import ResNet50_Weights
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [None]:
test_path = '/mnt/Enterprise2/shirshak/Glaucoma_Dataset_eyepacs_airogs_lightv2/eyepac-light-v2-512-jpg/test/'
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')


transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(), # convert 0-255 to 0-1 and from np to tensors
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])

In [None]:
class Res_Net(nn.Module):
    def __init__(self,classes):
        super().__init__()
        self.model = torchvision.models.resnet50(weights=ResNet50_Weights.DEFAULT)
        self.model.fc = nn.Linear(self.model.fc.in_features, len(classes))
    
    def forward(self, x:torch.Tensor) -> torch.Tensor:
        # print(self.model.fc.in_features)
        return self.model(x)

In [None]:
def get_needed_metrics(labels, predicted): 
    assert isinstance(labels, list)
    assert isinstance(predicted, list)
    
    accuracy = accuracy_score(labels, predicted)
    precision = precision_score(labels, predicted, zero_division=0.0)
    recall = recall_score(labels, predicted, zero_division=0.0)
    f1 = f1_score(labels, predicted, zero_division=0.0)
    
    return accuracy, precision, recall, f1

In [None]:
root=pathlib.Path(test_path)
classes=sorted([j.name.split('/')[-1] for j in root.iterdir()])

In [None]:
test_dataset = torchvision.datasets.ImageFolder(test_path, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=True)

In [None]:
loaded_model = Res_Net(classes=classes)
checkpoint = torch.load('/home/shirshak/BPEye_Project_2024/zzz_tests/checkpoint.pth')
loaded_model = loaded_model.to(device)
loaded_model.load_state_dict(checkpoint['model_state_dict'])
loaded_optimizer = optim.Adam(loaded_model.parameters(), lr=1e-4)
loaded_optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
train_loss = checkpoint['loss']

In [None]:
loss_func = nn.CrossEntropyLoss()

In [None]:
loaded_model.eval()
acc_test_epoch, precision_test_epoch, recall_test_epoch, f1_test_epoch  = [], [], [], []
with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc=f'Testing'):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = nn.Sigmoid()(loaded_model(inputs))     
        val_loss = loss_func(outputs, labels)
        _, predicted_test = torch.max(outputs, 1)

        acc_batch_test, precision_batch_test, recall_batch_test, f1_batch_test = get_needed_metrics(labels.cpu().detach().tolist(), predicted_test.cpu().detach().tolist())
        
        acc_test_epoch.append(acc_batch_test)
        precision_test_epoch.append(precision_batch_test)
        recall_test_epoch.append(recall_batch_test)
        f1_test_epoch.append(f1_batch_test)

In [None]:
torch.tensor(acc_test_epoch).mean(), torch.tensor(precision_test_epoch).mean(), torch.tensor(recall_test_epoch).mean(), torch.tensor(f1_test_epoch).mean()

In [None]:
import os
import torchvision
from torchvision.transforms import transforms

In [None]:
transforms = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(), # convert 0-255 to 0-1 and from np to tensors
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
            transforms.Resize((256,256))
        ])


class ImageFolderWithPaths(torchvision.datasets.ImageFolder):
    def __getitem__(self, index):
        img, label = super(ImageFolderWithPaths, self).__getitem__(index)
        path = self.imgs[index][0]
        return (img, label, path)    

In [None]:
root_dir = "/mnt/Enterprise2/shirshak/GLAUCOMA_DATASET_EYEPACS_AIROGS_ZENODO/preprocessed_separated_train_test_val/"

In [None]:
data_train = ImageFolderWithPaths(root=os.path.join(root_dir, "train"), transform=transforms)
data_val = ImageFolderWithPaths(root=os.path.join(root_dir, "val"), transform=transforms)
data_test = ImageFolderWithPaths(root=os.path.join(root_dir, "test"), transform=transforms)

#TODO CHANGE THIS WHEN DOING FULL PIPELINE
data_train = data_train[:20]
data_val = data_val[:20]
data_test = data_test[:20]

In [None]:
data_train[0]