| Step | What we do                                                | From                         |
| ---- | --------------------------------------------------------- | ---------------------------- |
| 1Ô∏è‚É£  | Input a **fundus image**                                  | IDRiD A/B                    |
| 2Ô∏è‚É£  | Predict **DR stage** using a **CNN classifier**           | Part B                       |
| 3Ô∏è‚É£  | Predict **lesions** using **UNet segmentation**           | Part A                       |
| 4Ô∏è‚É£  | Match the CNN stage prediction to the lesion evidence     | You! (via logic or Grad-CAM) |
| 5Ô∏è‚É£  | üí¨ Output: "Model predicts Stage 2 due to Exudates + MAs" | YOUR GOAL!                   |


In [1]:
from torch.utils.data import Dataset
import pandas as pd
import os
from PIL import Image
from torchvision import transforms
from glob import glob
from torch.utils.data import DataLoader
import torch


class IDRiDGradingDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.labels_df = pd.read_csv(csv_file)  # ‚úÖ use read_csv
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        base_name = self.labels_df.iloc[idx, 0]  # e.g. IDRiD_361
        possible_files = glob(os.path.join(self.img_dir, base_name + ".*"))
        if not possible_files:
            raise FileNotFoundError(f"‚ùå Image not found for: {base_name}")
        img_path = possible_files[0]

        image = Image.open(img_path).convert("RGB")
        # label = int(self.labels_df.iloc[idx, 1])
        label = int(self.labels_df['Retinopathy grade'].iloc[idx])


        if self.transform:
            image = self.transform(image)

        return image, label


In [2]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])


In [3]:
dataset = IDRiDGradingDataset(
    csv_file=r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\B. Disease Grading\2. Groundtruths\a. IDRiD_Disease Grading_Training Labels.csv",
    img_dir=r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\B. Disease Grading\1. Original Images\a. Training Set",
    transform=transform
)

dataloader = DataLoader(dataset, batch_size=16, shuffle=True)


In [4]:
import torch.nn as nn
import torch.nn.functional as F

class DRStageClassifier(nn.Module):
    def __init__(self):
        super(DRStageClassifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.fc1 = nn.Linear(64 * 54 * 54, 128)
        self.fc2 = nn.Linear(128, 5)  # 5 classes: 0 to 4

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))  # (224 ‚Üí 111)
        x = self.pool(F.relu(self.conv2(x)))  # (111 ‚Üí 54)
        x = x.view(-1, 64 * 54 * 54)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [6]:
model = DRStageClassifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10

for epoch in range(epochs):
    running_loss = 0.0
    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"‚úÖ Epoch {epoch+1}, Loss: {running_loss/len(dataloader):.4f}")


‚úÖ Epoch 1, Loss: 1.9511
‚úÖ Epoch 2, Loss: 1.3740
‚úÖ Epoch 3, Loss: 1.2864
‚úÖ Epoch 4, Loss: 1.2255
‚úÖ Epoch 5, Loss: 1.1723
‚úÖ Epoch 6, Loss: 1.0898
‚úÖ Epoch 7, Loss: 1.0294
‚úÖ Epoch 8, Loss: 0.9235
‚úÖ Epoch 9, Loss: 0.8983
‚úÖ Epoch 10, Loss: 0.8624


In [7]:
model = DRStageClassifier().to(device)
model.load_state_dict(torch.load("cnn_dr_stage.pth", map_location=device))
model.eval()

  model.load_state_dict(torch.load("cnn_dr_stage.pth", map_location=device))


DRStageClassifier(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=186624, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=5, bias=True)
)

In [8]:
torch.save(model.state_dict(), "cnn_dr_stage.pth")

In [None]:
# print(dataset.labels_df.columns)

In [9]:
correct = 0
total = 0

for images, labels in dataloader:
    images, labels = images.to(device), labels.to(device)

    with torch.no_grad():
        outputs = model(images)
        predicted = torch.argmax(outputs, 1)

    correct += (predicted == labels).sum().item()
    total += labels.size(0)

accuracy = (correct / total) * 100
print(f"‚úÖ Model Accuracy on Dataset: {accuracy:.2f}%")


‚úÖ Model Accuracy on Dataset: 79.66%


In [10]:
for i, (images, labels) in enumerate(dataloader):
    images, labels = images.to(device), labels.to(device)

    with torch.no_grad():
        outputs = model(images)
        predicted = torch.argmax(outputs, 1)

    for j in range(len(labels)):
        actual_label = labels[j].item()
        pred_label = predicted[j].item()
        result = "‚úÖ" if actual_label == pred_label else "‚ùå"
        print(f"{result} Image {i*len(labels)+j+1}: Predicted {pred_label}, Actual {actual_label}")


‚úÖ Image 1: Predicted 0, Actual 0
‚úÖ Image 2: Predicted 2, Actual 2
‚úÖ Image 3: Predicted 3, Actual 3
‚úÖ Image 4: Predicted 0, Actual 0
‚ùå Image 5: Predicted 3, Actual 4
‚úÖ Image 6: Predicted 2, Actual 2
‚úÖ Image 7: Predicted 0, Actual 0
‚úÖ Image 8: Predicted 3, Actual 3
‚ùå Image 9: Predicted 2, Actual 3
‚ùå Image 10: Predicted 0, Actual 2
‚úÖ Image 11: Predicted 1, Actual 1
‚úÖ Image 12: Predicted 2, Actual 2
‚úÖ Image 13: Predicted 3, Actual 3
‚úÖ Image 14: Predicted 4, Actual 4
‚ùå Image 15: Predicted 2, Actual 4
‚úÖ Image 16: Predicted 0, Actual 0
‚úÖ Image 17: Predicted 0, Actual 0
‚úÖ Image 18: Predicted 0, Actual 0
‚úÖ Image 19: Predicted 2, Actual 2
‚úÖ Image 20: Predicted 2, Actual 2
‚ùå Image 21: Predicted 2, Actual 0
‚úÖ Image 22: Predicted 0, Actual 0
‚úÖ Image 23: Predicted 2, Actual 2
‚úÖ Image 24: Predicted 4, Actual 4
‚úÖ Image 25: Predicted 2, Actual 2
‚úÖ Image 26: Predicted 2, Actual 2
‚úÖ Image 27: Predicted 3, Actual 3
‚úÖ Image 28: Predicted 4, Actual 4
‚

üß† Why Is This Happening?

DR stages can look similar, especially in early/mid stages

Your CNN has never ‚Äúseen‚Äù features like hemorrhages, exudates ‚Äî it‚Äôs just guessing patterns

Fundus images are complex; CNN alone has no semantic knowledge of retinal lesions

UNET MODEL TRAINING


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1):
        super(UNet, self).__init__()

        def conv_block(in_c, out_c):
            return nn.Sequential(
                nn.Conv2d(in_c, out_c, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_c, out_c, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
            )

        self.encoder1 = conv_block(in_channels, 64)
        self.pool1 = nn.MaxPool2d(2)
        self.encoder2 = conv_block(64, 128)
        self.pool2 = nn.MaxPool2d(2)
        self.encoder3 = conv_block(128, 256)
        self.pool3 = nn.MaxPool2d(2)

        self.bottleneck = conv_block(256, 512)

        self.up3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.decoder3 = conv_block(512, 256)
        self.up2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.decoder2 = conv_block(256, 128)
        self.up1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.decoder1 = conv_block(128, 64)

        self.final = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        e1 = self.encoder1(x)
        e2 = self.encoder2(self.pool1(e1))
        e3 = self.encoder3(self.pool2(e2))

        b = self.bottleneck(self.pool3(e3))

        d3 = self.up3(b)
        d3 = torch.cat([d3, e3], dim=1)
        d3 = self.decoder3(d3)

        d2 = self.up2(d3)
        d2 = torch.cat([d2, e2], dim=1)
        d2 = self.decoder2(d2)

        d1 = self.up1(d2)
        d1 = torch.cat([d1, e1], dim=1)
        d1 = self.decoder1(d1)

        return torch.sigmoid(self.final(d1))


In [12]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [13]:
class EXSegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.image_names = sorted(os.listdir(image_dir))
        self.mask_names = sorted(os.listdir(mask_dir))
        self.transform = transform

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        img_path = os.path.join(self.image_dir, self.image_names[idx])
        mask_path = os.path.join(self.mask_dir, self.mask_names[idx])

        image = Image.open(img_path).convert("RGB").resize((256, 256))
        mask = Image.open(mask_path).convert("L").resize((256, 256))

        image = transforms.ToTensor()(image)
        mask = transforms.ToTensor()(mask)

        return image, mask


1. Microaneurysms

In [14]:
image_dir = r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\1. Original Images\a. Training Set"
mask_dir = r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\2. All Segmentation Groundtruths\a. Training Set\1. Microaneurysms"

dataset = EXSegmentationDataset(image_dir, mask_dir)
loader = DataLoader(dataset, batch_size=4, shuffle=True)


In [15]:
model = UNet().to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10

for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0

    for images, masks in loader:
        images, masks = images.to(device), masks.to(device)

        outputs = model(images)
        loss = criterion(outputs, masks)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f"‚úÖ Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/len(loader):.4f}")


‚úÖ Epoch 1/10, Loss: 0.2031
‚úÖ Epoch 2/10, Loss: 0.0349
‚úÖ Epoch 3/10, Loss: 0.0352
‚úÖ Epoch 4/10, Loss: 0.0337
‚úÖ Epoch 5/10, Loss: 0.0346
‚úÖ Epoch 6/10, Loss: 0.0360
‚úÖ Epoch 7/10, Loss: 0.0340
‚úÖ Epoch 8/10, Loss: 0.0339
‚úÖ Epoch 9/10, Loss: 0.0338
‚úÖ Epoch 10/10, Loss: 0.0336


In [None]:
torch.save(model.state_dict(), "unet_ma.pth")
print("‚úÖ MA model saved as unet_ma.pth")


3. Hard Exudates

In [None]:
image_dir = r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\1. Original Images\a. Training Set"
mask_dir = r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\2. All Segmentation Groundtruths\a. Training Set\3. Hard Exudates"

dataset = EXSegmentationDataset(image_dir, mask_dir)
loader = DataLoader(dataset, batch_size=4, shuffle=True)


In [None]:
model = UNet().to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10

for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0

    for images, masks in loader:
        images, masks = images.to(device), masks.to(device)

        outputs = model(images)
        loss = criterion(outputs, masks)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f"‚úÖ Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/len(loader):.4f}")


In [None]:
torch.save(model.state_dict(), "unet_ex.pth")
print("‚úÖ EX model saved as unet_ex.pth")


2. Haemorrhages

In [None]:
class LesionSegmentationDataset(Dataset):
    def __init__(self, image_dir, mask_dir, lesion_suffix="EX", transform=None):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.lesion_suffix = lesion_suffix

        all_imgs = sorted([f for f in os.listdir(image_dir) if f.endswith(".jpg")])
        all_masks = sorted([f for f in os.listdir(mask_dir) if f.endswith(".tif")])

        # Extract basenames without suffix
        img_basenames = {os.path.splitext(f)[0] for f in all_imgs}
        mask_basenames = {os.path.splitext(f)[0].rsplit("_", 1)[0] for f in all_masks}

        # Match images and masks with same base
        common_basenames = sorted(list(img_basenames & mask_basenames))

        self.image_names = [f + ".jpg" for f in common_basenames]
        self.mask_names  = [f + f"_{self.lesion_suffix}.tif" for f in common_basenames]

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        try:
            img_path = os.path.join(self.image_dir, self.image_names[idx])
            mask_path = os.path.join(self.mask_dir, self.mask_names[idx])

            image = Image.open(img_path).convert("RGB").resize((256, 256))
            mask = Image.open(mask_path).convert("L").resize((256, 256))

            image = transforms.ToTensor()(image)
            mask = transforms.ToTensor()(mask)

            return image, mask

        except Exception as e:
            print(f"‚ùå Error at index {idx}: {e}")
            return self.__getitem__((idx + 1) % len(self.image_names))


In [None]:
# image_dir = r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\1. Original Images\a. Training Set"
# mask_dir = r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\2. All Segmentation Groundtruths\a. Training Set\2. Haemorrhages"

# dataset = LesionSegmentationDataset(image_dir, mask_dir, lesion_suffix="HE")

# print(f"üßæ Total matched image-mask pairs: {len(dataset)}")

# for i in range(min(len(dataset), 5)):
#     print(f"\nüîπ Image Name: {dataset.image_names[i]}")
#     print(f"üîπ Mask Name : {dataset.mask_names[i]}")


In [None]:
from torch.utils.data import DataLoader

image_dir = r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\1. Original Images\a. Training Set"
mask_dir  = r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\2. All Segmentation Groundtruths\a. Training Set\2. Haemorrhages"

dataset = LesionSegmentationDataset(image_dir, mask_dir, lesion_suffix="HE")
loader = DataLoader(dataset, batch_size=4, shuffle=True)

print(f"üî• Ready! Dataset length: {len(dataset)}")


In [None]:
model = UNet().to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10

for epoch in range(epochs):
    model.train()
    epoch_loss = 0.0

    for images, masks in loader:
        images, masks = images.to(device), masks.to(device)

        outputs = model(images)
        loss = criterion(outputs, masks)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f"‚úÖ Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/len(loader):.4f}")


In [None]:
torch.save(model.state_dict(), "unet_he.pth")
print("‚úÖ The model saved as unet_he.pth")


UNET Models integrating with CNN Model

In [None]:
unet_ex = UNet().to(device)   # Exudates
unet_ma = UNet().to(device)   # Microaneurysms
unet_he = UNet().to(device)   # Hemorrhages

# Load trained weights
# unet_ex.load_state_dict(torch.load("unet_ex.pth"))
# unet_ma.load_state_dict(torch.load("unet_ma.pth"))
# unet_he.load_state_dict(torch.load("unet_he.pth"))
unet_ex.load_state_dict(torch.load(r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\unet_ex.pth"))
unet_ma.load_state_dict(torch.load(r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\unet_ma.pth"))
unet_he.load_state_dict(torch.load(r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\unet_he.pth"))


In [None]:
from torchvision.transforms import ToTensor
from PIL import Image

def predict_mask(unet_model, image_path):
    img = Image.open(image_path).convert("RGB").resize((256, 256))
    tensor = ToTensor()(img).unsqueeze(0).to(device)

    with torch.no_grad():
        pred = unet_model(tensor)
    return pred.squeeze().cpu().numpy()


In [None]:
def get_detected_lesions(image_path):
    lesion_detected = []

    ex_mask = predict_mask(unet_ex, image_path)
    ma_mask = predict_mask(unet_ma, image_path)
    he_mask = predict_mask(unet_he, image_path)

    if ex_mask.sum() > 1000:
        lesion_detected.append("Exudates")
    if ma_mask.sum() > 1000:
        lesion_detected.append("Microaneurysms")
    if he_mask.sum() > 1000:
        lesion_detected.append("Hemorrhages")

    return lesion_detected


In [None]:
stage_labels = {
    0: "No DR",
    1: "Mild DR",
    2: "Moderate DR",
    3: "Severe DR",
    4: "Proliferative DR"
}

def explain_prediction(image_path):
    pred_stage = cnn_predict_stage(image_path)
    lesions = get_detected_lesions(image_path)

    print("üì∏ Image:", image_path.split("/")[-1])
    print(f"üß† CNN Predicted Stage: {pred_stage} ({stage_labels[pred_stage]})")

    if lesions:
        print(f"üî¨ Lesions Detected: {', '.join(lesions)}")
        print(f"‚úÖ Explanation: DR stage {pred_stage} likely due to presence of {', '.join(lesions)}")
    else:
        print("‚ö†Ô∏è No significant lesions detected in this image.")


In [None]:
def cnn_predict_stage(img_path):
    cnn_model.eval()
    img = Image.open(img_path).convert("RGB").resize((224, 224))
    tensor = transform(img).unsqueeze(0).to(device)

    with torch.no_grad():
        output = cnn_model(tensor)
        print("üß™ CNN Output Shape:", output.shape)

        pred = torch.argmax(output, dim=1).item()

    return pred


In [None]:
explain_prediction(r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\1. Original Images\b. Testing Set\IDRiD_66.jpg")


In [None]:
explain_prediction(r"C:\Users\HP\Desktop\gsfc\Sem7\MPII\implementation_trial\second\A. Segmentation\1. Original Images\a. Training Set\IDRiD_01.jpg")


So when it says:

‚ö†Ô∏è No significant lesions detected

It means:
‚ùóYour UNet model didn't detect any lesion regions for that image ‚Äî even though there‚Äôs a mask for it in groundtruth.

This could be due to:

UNet not learning properly

Threshold too high

Prediction too soft (low confidence)

Or simply: image size mismatch or wrong preprocessing