In [1]:
import torch
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import os
from torchvision import transforms
from PIL import Image
from torchvision import models
import optuna

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')

In [3]:
paths = []
label_to_idx = {
    'normal':0,
    'diabetic_retinopathy':1,
    'glaucoma':2,
    'cataract':3,
}
idx_to_label = {
    0:'normal',
    1:"diabetic_retinopathy",
    2:'glaucoma',
    3:'cataract'
}
for class_name in os.listdir("dataset"):
    img_paths = os.listdir(f'dataset/{class_name}')
    for img_path in img_paths:
        paths.append((label_to_idx[class_name], os.path.join("dataset",class_name,img_path)))

class CustomDataset(Dataset):
    def __init__(self, paths_list, transform = None):
        super().__init__()
        self.path_list = paths_list
        self.transform = transform

    def __len__(self):
        return len(self.path_list)

    def __getitem__(self, idx):
        class_idx, img_path = self.path_list[idx]
        try:
            with Image.open(img_path) as img:
                img = img.convert("RGB")
                if self.transform:
                    img = self.transform(img)
                return class_idx, img
        except Exception as e:
            print(f"Error loading {img_path}:{str(e)}")
            return None

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    transforms.Resize((224,224))
])

In [4]:
labels = [item[0] for item in paths]

In [5]:
train_paths, test_paths = train_test_split(paths, stratify = labels, test_size = 0.25)

In [6]:
train_dataset = CustomDataset(train_paths, transform = transform)
test_dataset = CustomDataset(test_paths, transform = transform)

In [7]:
vgg16 = models.vgg16(weights='VGG16_Weights.IMAGENET1K_V1')

In [8]:
for param in vgg16.parameters():
    param.requires_grad = False
vgg16.to(device)

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [9]:
class CNN(nn.Module):
    def __init__(self, num_hidden_layers=3, neurons_per_layer=4096, dropout_rate=0.4):
        super().__init__()
        
        self.features = vgg16.features

        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        layers = []
        inp_dim = 512*7*7

        if num_hidden_layers >= 1:
            layers.extend([
                nn.Flatten(),
                nn.Linear(inp_dim,4096),
                nn.ReLU(),
                nn.Dropout(dropout_rate)
            ])
            inp_dim = 4096

        for _ in range(1, num_hidden_layers):
            layers.extend([
                nn.Linear(inp_dim,neurons_per_layer),
                nn.ReLU(),
                nn.Dropout(dropout_rate)
            ])
            inp_dim = neurons_per_layer
            
        layers.append(nn.Linear(inp_dim,4))

        self.classifier = nn.Sequential(*layers)

    def forward(self, X):
        x = self.features(X)
        x = self.avgpool(x)
        x = self.classifier(x)
        return x

In [10]:
# def objective(trial):
# epochs = trial.suggest_int("epochs", 3,10)
# num_hidden_layers = trial.suggest_int('num_hidden_layers', 1,3, step = 1)
# neurons_per_layer = trial.suggest_categorical('neurons_per_layer', [512, 1024, 2048, 4096])
# lr = trial.suggest_float("lr", 1e-5, 1e-1, log = True)
# dropout_rate = trial.suggest_float('dropout_rate', 0.1,0.7, step = 0.1)
# batch_size = trial.suggest_categorical('batch_size', [8,16,32,64,128])
# weight_decay = trial.suggest_float("weight_decay", 1e-5, 1e-3, log = True)
# opt = trial.suggest_categorical('opt',['adam','sgd'])

# train_loader = DataLoader(train_dataset, batch_size = 128, shuffle = True, pin_memory = True)
# test_loader = DataLoader(test_dataset, batch_size = 128, shuffle = False, pin_memory = True)

model = CNN()
model.to(device)

# loss_fn = nn.CrossEntropyLoss()

# if opt == "adam":
# optimizer = optim.Adam(model.classifier.parameters(), lr = 0.0006196910851444527, weight_decay = 0.00015375266336036664)
# # else:
#     # optimizer = optim.SGD(model.classifier.parameters(), lr = lr, weight_decay = weight_decay, momentum = 0.9)
# epochs = 4
# for epoch in range(epochs):
#     total_epoch_loss = 0
#     for batch_labels,batch_features in train_loader:
#         batch_labels = batch_labels.to(device)
#         batch_features =  batch_features.to(device)

#         outputs = model(batch_features)
#         loss = loss_fn(outputs,batch_labels)
#         total_epoch_loss += loss.item()
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
#     avg_epoch_loss = total_epoch_loss/len(train_loader)
#     print(f"Epoch:{epoch+1}, Loss:{avg_epoch_loss}")

# model.eval()

# total = 0
# correct = 0

# with torch.no_grad():
#     for batch_labels,batch_features in test_loader:
#         batch_labels = batch_labels.to(device)
#         batch_features =  batch_features.to(device)

#         outputs = model(batch_features)
#         _, y_pred = torch.max(outputs, 1)
#         total += batch_labels.shape[0]
#         correct += (batch_labels == y_pred).sum().item()
        
#     accuracy = correct/total

# return accuracy

CNN(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [None]:
# study = optuna.create_study(
#     direction = "maximize",
#     sampler = optuna.samplers.TPESampler(),
#     pruner = optuna.pruners.MedianPruner()
# )
# study.optimize(objective, n_trials = 50) # CHange n_trials to 50 for final execution

In [None]:
# Best trial:
# Value: 0.9014218009478673
# Params:
# epochs:5
# num_hidden_layers:3
# neurons_per_layer:4096
# lr:0.0006196910851444527
# dropout_rate:0.4
# batch_size:128
# weight_decay:0.00015375266336036664
# opt:adam

In [19]:
#execute after study is done

print("Best trial:")
trial = study.best_trial
print(f"Value: {trial.value}")
print("Params:")
for k,v in trial.params.items():
    print(f"{k}:{v}")

# Visualize
optuna.visualization.plot_optimization_history(study)

Best trial:
Value: 0.9014218009478673
Params:
epochs:5
num_hidden_layers:3
neurons_per_layer:4096
lr:0.0006196910851444527
dropout_rate:0.4
batch_size:128
weight_decay:0.00015375266336036664
opt:adam


ImportError: Tried to import 'plotly' but failed. Please make sure that the package is installed correctly to use this feature. Actual error: No module named 'plotly'.

In [51]:
total = 0
correct = 0

#87,91
with torch.no_grad():
    for batch_labels,batch_features in train_loader:
        batch_features,batch_labels = batch_features.to(device),batch_labels.to(device)
        outputs = model(batch_features)
        _, y_pred = torch.max(outputs, 1)
        total += batch_labels.shape[0]
        correct += (batch_labels == y_pred).sum().item()
print("train: ",correct/total)

train:  0.9247311827956989


In [50]:
total = 0
correct = 0


with torch.no_grad():
    for batch_labels,batch_features in test_loader:
        batch_features,batch_labels = batch_features.to(device),batch_labels.to(device)
        outputs = model(batch_features)
        _, y_pred = torch.max(outputs, 1)
        total += batch_labels.shape[0]
        correct += (batch_labels == y_pred).sum().item()
print("test: ",correct/total)

test:  0.8530805687203792


In [48]:
torch.save({
    "model_state_dict": model.state_dict(),
    "label_to_idx": label_to_idx,
    "idx_to_label":idx_to_label,
    "input_size": (3,224,224) #Change this as input size defined in CNN definition most probably (3,224,224) if using vgg as pre trained base
}, "fundus_classifier(test-89.38, train-96.74).pth")

In [17]:
# model = torch.load('fundus_classifier(test-89.38, train-96.74).pth', map_location = device)

checkpoint = torch.load('fundus_classifier(test-89.38, train-96.74).pth')

model = CNN()  # Make sure CNN matches the model architecture you trained
model.load_state_dict(checkpoint['model_state_dict'])  # ✅ Correct key
model.to(device)
model.eval()

# Also extract label mappings if needed:
idx_to_label = checkpoint['idx_to_label']
label_to_idx = checkpoint['label_to_idx']

In [18]:
model

CNN(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [19]:
def do_inference(image_path, idx_to_label, model):
    img = Image.open(image_path)
    img = img.convert("RGB")
    img = transform(img)
    img = img.unsqueeze(0)
    img = img.to(device)
    
    logits = model(img)
    probabilities = F.softmax(logits, dim =1)
    prob, prediction = torch.max(probabilities, dim=1)
    return float(prob), idx_to_label[int(prediction)]

In [22]:
import torch.nn.functional as F
do_inference('dataset/glaucoma/_10_1472170.jpg', idx_to_label, model)

(0.9891865849494934, 'glaucoma')