In [None]:
import os #hai
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from sklearn.metrics import classification_report
from PIL import Image

# Constants
IMG_SIZE = 1000
BATCH_SIZE = 16
NUM_CLASSES = 4
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
CHECKPOINT_PATH = 'checkpoint_effinet_V12.pth'
BEST_MODEL_PATH = 'effinet_model_V12.pth'
LOG_FILE = 'training_log_V12.txt'

# Custom Dataset
class ImageDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        try:
            img = Image.open(img_path).convert('RGB')
            img = img.resize((IMG_SIZE, IMG_SIZE))
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            img = Image.new('RGB', (IMG_SIZE, IMG_SIZE))  # Create a blank image
        if self.transform:
            img = self.transform(img)
        else:
            img = transforms.ToTensor()(img)
        return img, label

# Model definition
class EfficientNetB3Model(nn.Module):
    def __init__(self, num_classes):
        super(EfficientNetB3Model, self).__init__()
        self.efficientnet = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1)
        # Freeze the base model
        for param in self.efficientnet.parameters():
            param.requires_grad = False
        # Unfreeze the last 100 layers
        for param in list(self.efficientnet.parameters())[-100:]:
            param.requires_grad = True
        # Modify the classifier
        num_ftrs = self.efficientnet.classifier[1].in_features
        self.efficientnet.classifier = nn.Sequential(
            nn.Linear(num_ftrs, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.Dropout(0.2),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Dropout(0.2),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        return self.efficientnet(x)

def log_epoch_details(epoch, train_loss, train_acc, test_loss, test_acc, test_report, is_best):
    with open(LOG_FILE, 'a') as f:
        f.write(f"Epoch {epoch+1}\n")
        f.write(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}\n")
        f.write(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}\n")
        f.write("Test Classification Report:\n")
        f.write(test_report)
        if is_best:
            f.write("New best model saved!\n")
        f.write("\n" + "="*50 + "\n")

def get_classification_report(model, dataloader):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(DEVICE)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.numpy())
    return classification_report(all_labels, all_preds)

def save_best_model(model, epoch, accuracy):
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'test_accuracy': accuracy
    }, BEST_MODEL_PATH)
    print(f"Best model saved as {BEST_MODEL_PATH}")


# def train_model(model, train_loader, test_loader, criterion, optimizer, scheduler, num_epochs, start_epoch=0):
#     best_model_wts = None
#     best_acc = 0.0
#     for epoch in range(start_epoch, num_epochs):
#         print(f'Epoch {epoch+1}/{num_epochs}')
#         print('-' * 10)
#         # Training phase
#         model.train()
#         running_loss = 0.0
#         running_corrects = 0
#         for inputs, labels in train_loader:
#             inputs = inputs.to(DEVICE)
#             labels = labels.to(DEVICE)
#             optimizer.zero_grad()
#             outputs = model(inputs)
#             _, preds = torch.max(outputs, 1)
#             loss = criterion(outputs, labels)
#             loss.backward()
#             optimizer.step()
#             running_loss += loss.item() * inputs.size(0)
#             running_corrects += torch.sum(preds == labels.data)
#         epoch_loss = running_loss / len(train_loader.dataset)
#         epoch_acc = running_corrects.double() / len(train_loader.dataset)
#         print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
#         # Testing phase
#         model.eval()
#         test_loss = 0.0
#         test_corrects = 0
#         with torch.no_grad():
#             for inputs, labels in test_loader:
#                 inputs = inputs.to(DEVICE)
#                 labels = labels.to(DEVICE)
#                 outputs = model(inputs)
#                 _, preds = torch.max(outputs, 1)
#                 loss = criterion(outputs, labels)
#                 test_loss += loss.item() * inputs.size(0)
#                 test_corrects += torch.sum(preds == labels.data)
#         test_loss = test_loss / len(test_loader.dataset)
#         test_acc = test_corrects.double() / len(test_loader.dataset)
#         print(f'Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}')
#         test_report = get_classification_report(model, test_loader)
#         is_best = test_acc > best_acc
#         if is_best:
#             best_acc = test_acc
#             best_model_wts = model.state_dict().copy()
#             save_best_model(model, epoch, best_acc.item())
#         scheduler.step(test_loss)
#         log_epoch_details(epoch, epoch_loss, epoch_acc.item(), test_loss, test_acc.item(), test_report, is_best)
#         # Save checkpoint
#         save_checkpoint(epoch, model, optimizer, scheduler, best_acc.item())
#     print(f'Best Test Acc: {best_acc:.4f}')
#     model.load_state_dict(best_model_wts)
#     return model, best_acc.item()

def train_model(model, train_loader, test_loader, criterion, optimizer, scheduler, num_epochs, start_epoch=0):
    best_model_wts = None
    best_loss = float('inf')  # Initialize best_loss to a high value
    for epoch in range(start_epoch, num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        
        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0
        for inputs, labels in train_loader:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)
        print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
        
        # Testing phase
        model.eval()
        test_loss = 0.0
        test_corrects = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(DEVICE)
                labels = labels.to(DEVICE)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
                test_loss += loss.item() * inputs.size(0)
                test_corrects += torch.sum(preds == labels.data)
        test_loss = test_loss / len(test_loader.dataset)
        test_acc = test_corrects.double() / len(test_loader.dataset)
        print(f'Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}')
        
        test_report = get_classification_report(model, test_loader)
        
        # Check if the current model is the best based on validation loss
        is_best = test_loss < best_loss
        if is_best:
            best_loss = test_loss  # Update best_loss
            best_model_wts = model.state_dict().copy()
            save_best_model(model, epoch, test_acc.item())  # You can also log test_loss if needed
            
        scheduler.step(test_loss)  # Optionally update the learning rate scheduler
        log_epoch_details(epoch, epoch_loss, epoch_acc.item(), test_loss, test_acc.item(), test_report, is_best)
        
        # Save checkpoint
        save_checkpoint(epoch, model, optimizer, scheduler, test_acc.item())
        
    print(f'Best Test Loss: {best_loss:.4f}')
    model.load_state_dict(best_model_wts)
    return model, best_loss

def save_checkpoint(epoch, model, optimizer, scheduler, best_acc):
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'best_acc': best_acc
    }, CHECKPOINT_PATH)

def load_checkpoint(model, optimizer, scheduler):
    if os.path.exists(CHECKPOINT_PATH):
        checkpoint = torch.load(CHECKPOINT_PATH)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        start_epoch = checkpoint['epoch'] + 1
        best_acc = checkpoint['best_acc']
        print(f"Resuming from epoch {start_epoch} with best test accuracy: {best_acc:.4f}")
        return start_epoch, best_acc
    return 0, 0.0



# Main execution
if __name__ == "__main__":
    # Load the train CSV file
    train_csv_file = 'processed_images_4_class_train.csv'
    train_df = pd.read_csv(train_csv_file)

    # Load the test CSV file
    test_csv_file = 'processed_images_4_class_test.csv'
    test_df = pd.read_csv(test_csv_file)

    # Use 'level' column as the labels
    train_labels = train_df['label'].values
    test_labels = test_df['label'].values

    # Use all train data for training
    train_paths = train_df['path'].values

    # Use all test data for testing
    test_paths = test_df['path'].values

    # Define transforms for training (no normalization)
    train_transform = transforms.Compose([
        transforms.RandomRotation(20),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.ToTensor(),
    ])

    # Create datasets
    train_dataset = ImageDataset(train_paths, train_labels, transform=train_transform)
    test_dataset = ImageDataset(test_paths, test_labels, transform=None)

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

    # Initialize the model
    model = EfficientNetB3Model(NUM_CLASSES).to(DEVICE)

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)

    # Learning rate scheduler
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=5, min_lr=1e-7)

    # Load checkpoint if exists
    start_epoch, best_acc = load_checkpoint(model, optimizer, scheduler)

    # Train the model
    num_epochs = 50
    model, best_acc = train_model(model, train_loader, test_loader, criterion, optimizer, scheduler, num_epochs, start_epoch)

    # Load the best model for final evaluation
    best_model_checkpoint = torch.load(BEST_MODEL_PATH)
    model.load_state_dict(best_model_checkpoint['model_state_dict'])
    # Evaluate the best model on test data
    test_report = get_classification_report(model, test_loader)

    # Append final test results to the log file
    with open(LOG_FILE, 'a') as f:
        f.write("\nTraining Complete\n")
        f.write(f"Best Test Accuracy: {best_acc:.4f}\n")
        f.write("Final Test Classification Report:\n")
        f.write(test_report)

    print(f"Training complete. All details logged in {LOG_FILE}")
    print(f"Best model saved as {BEST_MODEL_PATH} with test accuracy: {best_acc:.4f}")   


Epoch 1/50
----------
Train Loss: 1.5460 Acc: 0.4259
Test Loss: 2.0485 Acc: 0.4242
Best model saved as effinet_model_V12.pth
Epoch 2/50
----------
Train Loss: 1.2760 Acc: 0.4911
Test Loss: 7.2056 Acc: 0.4848
Epoch 3/50
----------
Train Loss: 1.0905 Acc: 0.5418
Test Loss: 1.6793 Acc: 0.5657
Best model saved as effinet_model_V12.pth
Epoch 4/50
----------
Train Loss: 0.8783 Acc: 0.5862
Test Loss: 2.2310 Acc: 0.6263
Epoch 5/50
----------
Train Loss: 0.8589 Acc: 0.5915
Test Loss: 0.7931 Acc: 0.6061


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Best model saved as effinet_model_V12.pth
Epoch 6/50
----------
Train Loss: 0.7947 Acc: 0.6252
Test Loss: 5.0876 Acc: 0.6465
Epoch 7/50
----------
Train Loss: 0.7957 Acc: 0.6131
Test Loss: 7.8667 Acc: 0.6364
Epoch 8/50
----------


In [7]:
import pandas as pd 
df = pd.read_csv('processed_images_4_class_train.csv')
df['label'].sum()

3525

In [6]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns

# Constants
IMG_SIZE = 1000
BATCH_SIZE = 32
NUM_CLASSES = 4  # Corrected back to 4 classes
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BEST_MODEL_PATH = 'effinet_model.pth'

# Custom Dataset
class ImageDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        
        try:
            img = Image.open(img_path).convert('RGB')
            img = img.resize((IMG_SIZE, IMG_SIZE))
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            img = Image.new('RGB', (IMG_SIZE, IMG_SIZE))  # Create a blank image
        
        if self.transform:
            img = self.transform(img)
        else:
            img = transforms.ToTensor()(img)
        
        return img, label

# Model definition (unchanged)
class EfficientNetB3Model(nn.Module):
    def __init__(self, num_classes):
        super(EfficientNetB3Model, self).__init__()
        self.efficientnet = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1)
        
        # Modify the classifier
        num_ftrs = self.efficientnet.classifier[1].in_features
        self.efficientnet.classifier = nn.Sequential(
            nn.Linear(num_ftrs, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.Dropout(0.2),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Dropout(0.2),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        return self.efficientnet(x)


def load_model(model_path):
    model = EfficientNetB3Model(NUM_CLASSES).to(DEVICE)
    checkpoint = torch.load(model_path, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()  # Set the model to evaluation mode
    return model

def predict(model, dataloader):
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(DEVICE)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.numpy())
    
    return np.array(all_preds), np.array(all_labels)

def plot_confusion_matrix(y_true, y_pred, classes):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(10, 8))  # Adjusted back for 4 classes
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.savefig('confusion_matrix_VO.png')
    plt.close()

if __name__ == "__main__":
    # Load the test CSV file
    test_csv_file = 'processed_images_4_class_test.csv'
    test_df = pd.read_csv(test_csv_file)

    # Use 'level' column as the labels
    test_labels = test_df['label'].values

    # Use all test data for testing
    test_paths = test_df['path'].values

    # Create test dataset
    test_dataset = ImageDataset(test_paths, test_labels, transform=None)

    # Create test data loader
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

    # Load the best model
    model = load_model(BEST_MODEL_PATH)

    # Make predictions
    predictions, true_labels = predict(model, test_loader)

    # Check unique labels in the data
    unique_labels = np.unique(true_labels)
    print("Unique labels in the data:", unique_labels)

    # Generate classification report
    class_names = ['No DR', 'Mild', 'Severe', 'Proliferative DR']  # Corrected back to 4 classes
    report = classification_report(true_labels, predictions, target_names=class_names)
    print("Classification Report:")
    print(report)

    # Plot and save confusion matrix
    plot_confusion_matrix(true_labels, predictions, class_names)
    print("Confusion matrix saved as 'confusion_matrix.png'")


Unique labels in the data: [0 1 2 3]
Classification Report:
                  precision    recall  f1-score   support

           No DR       0.61      0.56      0.58        25
            Mild       0.61      0.76      0.68        25
          Severe       0.91      0.83      0.87        24
Proliferative DR       1.00      0.92      0.96        25

        accuracy                           0.77        99
       macro avg       0.78      0.77      0.77        99
    weighted avg       0.78      0.77      0.77        99

Confusion matrix saved as 'confusion_matrix.png'


In [2]:
! pip install seaborn

Collecting seaborn
  Using cached seaborn-0.13.2-py3-none-any.whl.metadata (5.4 kB)
Using cached seaborn-0.13.2-py3-none-any.whl (294 kB)
Installing collected packages: seaborn
Successfully installed seaborn-0.13.2


In [7]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
# Calculate and print accuracy
accuracy = accuracy_score(true_labels, predictions)
print(f"Accuracy: {accuracy:.4f}")

Accuracy: 0.7677


In [6]:

CATEGORY_MAPPING = {
    0: 'No Diabetic Retinopathy',
    1: 'Mild Diabetic Retinopathy',
    2: 'Moderate to Severe Diabetic Retinopathy',
    3: 'Proliferative Diabetic Retinopathy'
}

explanation_labels = {
    0: (
        'No DR: No diabetic retinopathy detected. '
        'There is a low chance of developing diabetic retinopathy if blood sugar levels are well-managed.'
    ),
    1: (
        'Mild NPDR (Nonproliferative Diabetic Retinopathy): '
        'Microaneurysms: Small, localized dilations of blood vessels in the retina. '
        'This stage may indicate a moderate chance of progression if not monitored and managed properly.'
    ),
    2: (
        'Moderate to Severe NPDR: '
        'Hemorrhages: Includes both dot-and-blot and flame-shaped hemorrhages. '
        'Exudates: Prominent soft and hard exudates may be present, along with retinal edema. '
        'This stage represents a high chance of further progression if left untreated, requiring close monitoring and intervention.'
    ),
    3: (
        'Proliferative Diabetic Retinopathy (PDR): '
        'Neovascularization: Growth of new, abnormal blood vessels on the retina. '
        'Cotton Wool Spots: Soft lesions caused by localized retinal ischemia. '
        'This stage carries a significant risk of vision loss and complications, necessitating urgent medical intervention.'
    )
}


# Model definition (same as before)
class EfficientNetB3Model(nn.Module):
    def __init__(self, num_classes):
        super(EfficientNetB3Model, self).__init__()
        self.efficientnet = models.efficientnet_b3(weights=None)
        num_ftrs = self.efficientnet.classifier[1].in_features
        self.efficientnet.classifier = nn.Sequential(
            nn.Linear(num_ftrs, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.Dropout(0.2),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Dropout(0.2),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        return self.efficientnet(x)

# Image processing functions (same as before)
def trim(im):
    percentage = 0.02
    img = np.array(im)
    img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    im_bin = img_gray > 0.1 * np.mean(img_gray[img_gray != 0])
    row_sums = np.sum(im_bin, axis=1)
    col_sums = np.sum(im_bin, axis=0)
    rows = np.where(row_sums > img.shape[1] * percentage)[0]
    cols = np.where(col_sums > img.shape[0] * percentage)[0]
    min_row, min_col = np.min(rows), np.min(cols)
    max_row, max_col = np.max(rows), np.max(cols)
    im_crop = img[min_row : max_row + 1, min_col : max_col + 1]
    return Image.fromarray(im_crop)

def resize_maintain_aspect(image, desired_size):
    old_size = image.size
    ratio = float(desired_size) / max(old_size)
    new_size = tuple([int(x * ratio) for x in old_size])
    im = image.resize(new_size, Image.LANCZOS)
    new_im = Image.new("RGB", (desired_size, desired_size))
    new_im.paste(im, ((desired_size - new_size[0]) // 2, (desired_size - new_size[1]) // 2))
    return new_im

def apply_clahe_color(image):
    lab = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    cl = clahe.apply(l)
    limg = cv2.merge((cl, a, b))
    final_image = cv2.cvtColor(limg, cv2.COLOR_LAB2RGB)
    return Image.fromarray(final_image)

def process_image(image_path):
    image = Image.open(image_path).convert('RGB')
    trimmed_image = trim(image)
    resized_image = resize_maintain_aspect(trimmed_image, IMG_SIZE)
    final_image = apply_clahe_color(resized_image)
    return final_image


# New function to generate detailed results
def generate_result(predicted_class, confidence, stage):
    explanation = explanation_labels.get(predicted_class, 'Unknown')
    confidence_percentage = round(confidence * 100, 2)
    
    if predicted_class == 3:  # Proliferative Diabetic Retinopathy
        Risk_Factor = round(confidence * 100, 2)
    else:
        Risk_Factor = round((1 - confidence) * 100, 2)
    
    result = {
        "predicted_class": int(predicted_class),
        "confidence": confidence_percentage,
        "explanation": explanation,
        "warning": None,
        "Risk_Factor": Risk_Factor,
        "Stage": stage
    }
    
    if confidence < 0.55 and predicted_class == 0:
        result["warning"] = f"You have a higher chance of progressing to the next stage with a risk factor of {Risk_Factor}%. Please consult your doctor for further advice."
    elif confidence >= 0.55 and confidence <= 0.74 and predicted_class == 0:
        result["warning"] = f"You have a minimum chance of progressing to the next stage with a risk factor of {Risk_Factor}%."
    elif confidence >= 0.75 and predicted_class == 0:
        result["warning"] = "Your eye is in the safe zone."
    elif predicted_class == 1:  # Mild Diabetic Retinopathy
        result["warning"] = f"Mild diabetic retinopathy detected. Risk factor is {Risk_Factor}%. Please consult your doctor for further advice."
    elif predicted_class == 2:  # Moderate to Severe Diabetic Retinopathy
        result["warning"] = f"Moderate to severe diabetic retinopathy detected. Risk factor is {Risk_Factor}%. Please consult your doctor for further advice."
    elif predicted_class == 3:  # Proliferative Diabetic Retinopathy
        result["warning"] = "Proliferative diabetic retinopathy detected. Urgent medical intervention is necessary to prevent severe vision loss. Please consult your healthcare provider immediately."
    
    return {
        "predicted_class": result["predicted_class"],
        "stage": result["Stage"],
        "confidence": f"{result['confidence']}%",
        "explanation": result["explanation"],
        "Note": result["warning"],
        "Risk": f"{result['Risk_Factor']}%",
    }


import os
import json
import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms, models
from PIL import Image
import cv2

# Constants
IMG_SIZE = 600
NUM_CLASSES = 4
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BEST_MODEL_PATH = 'effinet_model_1.pth'


def infer(model, image_path, transform):
    img = process_image(image_path)
    img_tensor = transform(img).unsqueeze(0).to(DEVICE)
    
    model.eval()
    with torch.no_grad():
        outputs = model(img_tensor)
        probabilities = torch.nn.functional.softmax(outputs, dim=1)
        confidence, predicted = torch.max(probabilities, 1)
    
    return predicted.item(), confidence.item()

# Main inference function (updated)
def run_inference(left_image_path, right_image_path):
    # Load the model
    model = EfficientNetB3Model(NUM_CLASSES).to(DEVICE)
    checkpoint = torch.load(BEST_MODEL_PATH, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    
    # Define transform
    transform = transforms.Compose([
        transforms.ToTensor(),
    ])
    
    # Run inference on both images
    left_class, left_confidence = infer(model, left_image_path, transform)
    right_class, right_confidence = infer(model, right_image_path, transform)
    
    # Generate detailed results
    left_result = generate_result(left_class, left_confidence, "Left Eye")
    right_result = generate_result(right_class, right_confidence, "Right Eye")
    
    # Prepare final results
    results = {
        "left_eye": left_result,
        "right_eye": right_result
    }
    
    return json.dumps(results, indent=2)


# Example usage
if __name__ == "__main__":
    left_image_path = "/home/studio-lab-user/sage/segregated_images/4/1138_right.jpeg"
    right_image_path = "/home/studio-lab-user/sage/segregated_images/4/1138_right.jpeg"
    
    results = run_inference(left_image_path, right_image_path)
    print(results)

{
  "left_eye": {
    "predicted_class": 3,
    "stage": "Left Eye",
    "confidence": "92.05%",
    "explanation": "Proliferative Diabetic Retinopathy (PDR): Neovascularization: Growth of new, abnormal blood vessels on the retina. Cotton Wool Spots: Soft lesions caused by localized retinal ischemia. This stage carries a significant risk of vision loss and complications, necessitating urgent medical intervention.",
    "Note": "Proliferative diabetic retinopathy detected. Urgent medical intervention is necessary to prevent severe vision loss. Please consult your healthcare provider immediately.",
    "Risk": "92.05%"
  },
  "right_eye": {
    "predicted_class": 3,
    "stage": "Right Eye",
    "confidence": "92.05%",
    "explanation": "Proliferative Diabetic Retinopathy (PDR): Neovascularization: Growth of new, abnormal blood vessels on the retina. Cotton Wool Spots: Soft lesions caused by localized retinal ischemia. This stage carries a significant risk of vision loss and complicatio

In [2]:
! pip install opencv-python

Collecting opencv-python
  Using cached opencv_python-4.10.0.84-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Using cached opencv_python-4.10.0.84-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (62.5 MB)
Installing collected packages: opencv-python
Successfully installed opencv-python-4.10.0.84
