In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from PIL import Image
import os
import json
import matplotlib.pyplot as plt
import numpy as np
from tqdm.notebook import tqdm
import random
import zipfile
import shutil
import os, shutil
from google.colab import files
from sklearn.metrics import precision_score, recall_score



In [None]:
def setup_kaggle():
    """Authenticate against the Kaggle API using an uploaded ``kaggle.json``.

    Prompts for a file upload through the Colab ``files.upload()`` widget,
    moves the uploaded file to ``/root/.kaggle/kaggle.json``, restricts it to
    owner read/write, and returns an authenticated ``KaggleApi`` client.

    Returns:
        KaggleApi: the authenticated client.

    Raises:
        FileNotFoundError: if nothing was uploaded and no credentials file is
            already present at the target location.
    """
    kaggle_dir = '/root/.kaggle'
    credentials_path = os.path.join(kaggle_dir, 'kaggle.json')

    print("Please upload your kaggle.json file")
    uploaded = files.upload()
    os.makedirs(kaggle_dir, exist_ok=True)
    # Every uploaded file is moved onto the same target, so the last one wins.
    for fn in uploaded.keys():
        shutil.move(fn, credentials_path)

    if not os.path.exists(credentials_path):
        raise FileNotFoundError(
            f"No kaggle.json was uploaded and none exists at {credentials_path}"
        )

    # Mode must be octal: the decimal literal 600 equals 0o1130, which is not
    # owner read/write. 0o600 is rw for the owner and nothing for anyone else.
    os.chmod(credentials_path, 0o600)

    # Imported only after the credentials file is in place, preserving the
    # original ordering of "install credentials, then import the client".
    from kaggle.api.kaggle_api_extended import KaggleApi
    api = KaggleApi()
    api.authenticate()
    print("Kaggle API authenticated successfully")
    return api

In [None]:
def download_kaggle_dataset(dataset_name, download_path='/content', unzip=True, force_download=False, api=None):
    """Download a Kaggle dataset unless its target folder is already present.

    Args:
        dataset_name: Kaggle dataset slug in ``owner/name`` form.
        download_path: Directory handed to the Kaggle client as the download
            destination.
        unzip: Forwarded to ``dataset_download_files``.
        force_download: When true, download even if the target folder exists.
        api: An object exposing ``dataset_download_files``. When ``None``, one
            is obtained from ``setup_kaggle()``.

    Returns:
        ``os.path.join(download_path, <last slug component>)`` on success or
        when that folder already exists; ``None`` if API setup or the download
        failed. The returned path is derived from the slug only; it is not
        checked for existence after the download.
    """
    slug_tail = dataset_name.split("/")[-1]
    target_dir = os.path.join(download_path, slug_tail)

    # Short-circuit when a previous run already produced the folder.
    if not force_download and os.path.exists(target_dir):
        print(f"Dataset already exists at {target_dir}")
        return target_dir

    client = api if api is not None else setup_kaggle()
    if client is None:
        print("Kaggle API setup failed.")
        return None

    try:
        print(f"Downloading {dataset_name}...")
        client.dataset_download_files(dataset_name, path=download_path, unzip=unzip)
        print("Download completed.")
    except Exception as err:
        # Best-effort: report the failure and signal it with None.
        print(f"Download failed: {err}")
        return None
    else:
        return target_dir

In [None]:
class ImageFolderDataset(Dataset):
    """Image dataset read from a ``root_dir/<class_name>/<image>`` layout.

    Each immediate sub-directory of ``root_dir`` is one class. Class names are
    sorted and their position in that sorted list is the integer label. Only
    regular files ending in ``.png``, ``.jpg`` or ``.jpeg`` (case-insensitive)
    are collected. Files inside each class folder are visited in sorted order
    so that sample indices are the same on every run and every machine.

    Attributes set by ``__init__``: ``root_dir``, ``transform``, ``classes``,
    ``image_paths`` and ``labels`` (the last two are parallel lists).
    """

    # Accepted file-name suffixes, compared against the lower-cased name.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, transform=None, verbose=False):
        """Scan ``root_dir`` and index every image with its class label.

        Args:
            root_dir: Directory whose sub-directories are the classes.
            transform: Optional callable applied to each loaded PIL image.
            verbose: When true, print per-class and total image counts.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        # Sorted so the label assigned to a class does not depend on listing order.
        self.classes = sorted(
            d for d in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, d))
        )

        for label, class_name in enumerate(self.classes):
            class_dir = os.path.join(root_dir, class_name)
            class_count = 0
            # os.listdir order is arbitrary; sort for a reproducible index.
            for img_name in sorted(os.listdir(class_dir)):
                img_path = os.path.join(class_dir, img_name)
                if os.path.isfile(img_path) and img_name.lower().endswith(self.IMAGE_EXTENSIONS):
                    self.image_paths.append(img_path)
                    self.labels.append(label)
                    class_count += 1

            if verbose:
                # Report exactly what was indexed for this class.
                print(f"  Class '{class_name}': {class_count} images")

        if verbose:
            # Summary: how many images and classes were found.
            print(f"Loaded {len(self.image_paths)} images from {len(self.classes)} classes")

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            A pair ``(image, label)``: the image after ``transform`` (or the
            RGB PIL image when no transform is set) and the label as a
            ``torch.long`` tensor.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        try:
            # The context manager closes the underlying file once the pixel
            # data has been converted into an independent RGB image.
            with Image.open(img_path) as img:
                image = img.convert("RGB")
        except Exception as e:
            # Deliberate best-effort: an unreadable file is reported and
            # replaced with a blank 224x224 RGB placeholder so loading continues.
            print(f"Error loading image {img_path}: {e}")
            image = Image.new('RGB', (224, 224))  # Placeholder

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.long)


In [None]:

class Classifier(nn.Module):
    """DenseNet-121 with its classifier replaced by dropout + a linear layer.

    The backbone is created with the ``IMAGENET1K_V1`` weights and stored as
    ``self.model``; its ``classifier`` attribute becomes
    ``Sequential(Dropout(dropout_rate), Linear(in_features, num_classes))``.
    """

    def __init__(self, num_classes, dropout_rate=0.5):
        """Build the backbone and attach a ``num_classes``-way head."""
        super().__init__()
        backbone = models.densenet121(weights='IMAGENET1K_V1')
        # Width of the features feeding the stock classifier layer.
        head_inputs = backbone.classifier.in_features
        backbone.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(head_inputs, num_classes),
        )
        self.model = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its output."""
        logits = self.model(x)
        return logits


In [None]:
def compute_precision_recall_f1(cm):
    """Print and return precision, recall and F1 for a 2x2 confusion matrix.

    The matrix is read as ``[[TN, FP], [FN, TP]]``. Any ratio whose
    denominator is not positive is reported as ``0.0``.

    Args:
        cm: Object with a ``.shape`` of ``(2, 2)`` that supports ``cm[i, j]``
            indexing.

    Returns:
        Tuple ``(precision, recall, f1_score)``.

    Raises:
        ValueError: if ``cm.shape`` is not ``(2, 2)``.
    """
    if cm.shape != (2, 2):
        raise ValueError("This function supports only binary classification (2 classes).")

    true_pos = cm[1, 1]
    false_pos = cm[0, 1]
    false_neg = cm[1, 0]

    predicted_pos = true_pos + false_pos
    actual_pos = true_pos + false_neg

    precision = 0.0
    if predicted_pos > 0:
        precision = true_pos / predicted_pos

    recall = 0.0
    if actual_pos > 0:
        recall = true_pos / actual_pos

    f1_score = 0.0
    if (precision + recall) > 0:
        f1_score = 2 * (precision * recall) / (precision + recall)

    for line in (
        f"Precision: {precision:.4f}",
        f"Recall:    {recall:.4f}",
        f"F1 Score:  {f1_score:.4f}",
    ):
        print(line)

    return precision, recall, f1_score

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(model, dataloader, device, class_names=None):
    """Run ``model`` over ``dataloader``, then print and plot the confusion matrix.

    The matrix always has one row/column per model output (taken from the
    second dimension of the model's output), even when some class never shows
    up in the data. Without that, a binary model evaluated on data containing
    a single class would yield a 1x1 matrix and the ``cm[0, 1]`` lookups below
    would fail, and ``class_names`` could disagree with the number of ticks.

    Args:
        model: Module called as ``model(images)``; switched to eval mode here.
        dataloader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to before the forward pass.
        class_names: Optional tick labels, one per class. Defaults to the
            integer class indices.

    Returns:
        Tuple ``(TN, FP, FN, TP)`` read from ``cm[0,0]``, ``cm[0,1]``,
        ``cm[1,0]``, ``cm[1,1]``. These names are only meaningful for a
        two-class problem; with more classes the same four cells are returned
        unchanged. A model with a single output has no such cells and raises
        ``IndexError``.
    """
    model.eval()
    y_true = []
    y_pred = []
    num_classes = None

    with torch.no_grad():
        for images, batch_labels in dataloader:
            images, batch_labels = images.to(device), batch_labels.to(device)
            outputs = model(images)
            num_classes = outputs.shape[1]
            _, predicted = torch.max(outputs, 1)
            y_true.extend(batch_labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    # Pin the label set so the matrix shape does not depend on which classes
    # happen to be present in this particular dataloader.
    label_ids = list(range(num_classes)) if num_classes is not None else None
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    print("Confusion Matrix:")
    print(cm)
    TN = cm[0, 0]
    FP = cm[0, 1]
    FN = cm[1, 0]
    TP = cm[1, 1]

    fig, ax = plt.subplots(figsize=(6, 6))
    im = ax.imshow(cm, interpolation='nearest', cmap='Blues')
    ax.figure.colorbar(im, ax=ax)

    classes = class_names if class_names is not None else np.arange(cm.shape[0])

    # Set ticks and labels
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=classes,
           yticklabels=classes,
           ylabel='True label',
           xlabel='Predicted label')
    ax.set_title('Confusion Matrix')

    # Annotate each cell with its count; binary matrices also get TN/FP/FN/TP tags.
    # Text turns white on cells darker than half the maximum count.
    thresh = cm.max() / 2.
    is_binary = cm.shape == (2, 2)
    cell_tags = np.array([["TN", "FP"],
                          ["FN", "TP"]])
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            caption = f"{cell_tags[i, j]}\n{cm[i, j]}" if is_binary else str(cm[i, j])
            ax.text(j, i, caption,
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.show()
    return TN, FP, FN, TP

In [None]:
def train_model_with_eval(model, train_loader, val_loader, criterion, optimizer, scheduler=None, epochs=10, device=None):
    """Train ``model`` and evaluate it on ``val_loader`` after every epoch.

    Args:
        model: Module to train; moved to ``device``.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Iterable of ``(inputs, targets)`` validation batches.
        criterion: Loss callable invoked as ``criterion(outputs, targets)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Optional LR scheduler stepped once per epoch. A
            ``ReduceLROnPlateau`` receives the epoch's validation loss; any
            other scheduler is stepped without arguments.
        epochs: Number of epochs to run.
        device: Target device; defaults to CUDA when available, else CPU.

    Returns:
        Tuple ``(model, history)`` where ``history`` maps ``'train_loss'``,
        ``'train_acc'``, ``'val_loss'`` and ``'val_acc'`` to per-epoch lists.
        Losses are the mean of the per-batch loss values (sum of
        ``loss.item()`` divided by the number of batches).
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(f"Training on {device}")
    model.to(device)

    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }

    for epoch in range(epochs):
        # ---------- Training pass ----------
        model.train()
        running_loss = 0.0
        correct_train = 0
        total_train = 0

        train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} [Train]")
        for batch_X, batch_y in train_bar:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # detach() instead of the deprecated .data: predictions are only
            # used for bookkeeping and must not take part in autograd.
            _, predicted = torch.max(outputs.detach(), 1)
            total_train += batch_y.size(0)
            correct_train += (predicted == batch_y).sum().item()

            train_bar.set_postfix({'loss': loss.item(), 'acc': correct_train / total_train})

        train_loss = running_loss / len(train_loader)
        train_accuracy = correct_train / total_train
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_accuracy)

        # ---------- Evaluation after each epoch ----------
        model.eval()
        correct_val = 0
        total_val = 0
        val_loss = 0.0

        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()

        val_loss = val_loss / len(val_loader)
        val_accuracy = correct_val / total_val
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_accuracy)

        if scheduler is not None:
            # Only ReduceLROnPlateau takes a metric. Passing the loss to any
            # other scheduler would be read as its deprecated `epoch` argument.
            if isinstance(scheduler, optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_loss)
            else:
                scheduler.step()

        print(f"Epoch {epoch+1}/{epochs}")
        print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f}")
        print(f"  Val Loss:   {val_loss:.4f}, Val Acc:   {val_accuracy:.4f}")

    _plot_training_history(history, epochs)
    return model, history


def _plot_training_history(history, epochs):
    """Show side-by-side loss and accuracy curves for ``epochs`` recorded epochs."""
    epochs_range = range(1, epochs + 1)

    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, history['train_loss'], label='Train Loss')
    plt.plot(epochs_range, history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Loss over Epochs')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, history['train_acc'], label='Train Accuracy')
    plt.plot(epochs_range, history['val_acc'], label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Accuracy over Epochs')
    plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
def test_model(model, test_loader, device=None):
    """Evaluate ``model`` on ``test_loader`` and print accuracy/precision/recall.

    Three sets of figures are printed: (1) accuracy from the collected
    predictions, (2) accuracy/precision/recall derived from the four cells
    returned by ``plot_confusion_matrix`` (meaningful for two-class problems
    only), and (3) accuracy plus macro-averaged precision/recall from
    scikit-learn. ``plot_confusion_matrix`` performs its own pass over
    ``test_loader``, so the model is run on the test data twice.

    Args:
        model: Module to evaluate; moved to ``device`` and set to eval mode.
        test_loader: Iterable of ``(images, labels)`` batches.
        device: Target device; defaults to CUDA when available, else CPU.

    Returns:
        Tuple ``(all_preds, all_labels)`` of per-sample lists.
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.to(device)
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in tqdm(test_loader, desc="Testing"):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Compare as arrays: `list == list` yields one bool, not a per-sample mask.
    preds_arr = np.asarray(all_preds)
    labels_arr = np.asarray(all_labels)
    matches = preds_arr == labels_arr

    # Calculate accuracy
    accuracy1 = matches.sum() / len(all_labels)
    print(f"1.Test Accuracy: {accuracy1:.4f}")

    TN, FP, FN, TP = plot_confusion_matrix(model, test_loader, device)
    accuracy2 = (TP + TN) / (TP + TN + FP + FN)
    precision2 = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall2 = TP / (TP + FN) if (TP + FN) > 0 else 0

    print(f"2 Test Accuracy CM: {accuracy2:.4f}")
    print(f"Precision CM: {precision2:.4f}")
    print(f"Recall CM: {recall2:.4f}")

    # Element-wise mean of the match mask. Comparing the two Python lists
    # directly could only ever produce 0.0 or 1.0 here.
    accuracy3 = np.mean(matches)
    precision3 = precision_score(all_labels, all_preds, average='macro', zero_division=0)
    recall3 = recall_score(all_labels, all_preds, average='macro', zero_division=0)

    print(f"3 Test Accuracy: {accuracy3:.4f}")
    print(f"3 Precision (macro): {precision3:.4f}")
    print(f"3 Recall (macro): {recall3:.4f}")

    return all_preds, all_labels

In [None]:
def check_paths(train_path, val_path,test_path):
    """
    Checks that the training, validation and test dataset paths all exist and are directories.

    The paths are checked in that order; the first problem found is printed
    and ends the check.

    Args:
        train_path (str): Path to the training dataset.
        val_path (str): Path to the validation dataset.
        test_path (str): Path to the test dataset.

    Returns:
        bool: True if all three paths exist and are directories, False otherwise.
    """
    # The label names the offending split in the message.
    labelled_paths = (
        ("Training", train_path),
        ("Validation", val_path),
        ("Test", test_path),
    )
    for label, path in labelled_paths:
        if not os.path.exists(path):
            print(f"{label} path '{path}' does not exist.")
            return False
        if not os.path.isdir(path):
            print(f"{label} path '{path}' is not a valid directory.")
            return False

    return True

In [None]:
def main():
    print("Installing required packages...")
    !pip install -q kaggle

    # Set up Kaggle and download dataset
    api = setup_kaggle()
    dataset_path = download_kaggle_dataset("pkdarabi/diagnosis-of-diabetic-retinopathy", api=api)
    if dataset_path is None:
        print("Dataset not found.")
        return
    dataset_path = '/content/Diagnosis of Diabetic Retinopathy'
    train_path = os.path.join(dataset_path, 'train')
    val_path = os.path.join(dataset_path, 'valid')
    test_path = os.path.join(dataset_path, 'test')

    if not check_paths(train_path, val_path, test_path):
      print("Dataset not found.")
      return

    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # Load datasets
    train_dataset = ImageFolderDataset(train_path, transform=transform)
    val_dataset = ImageFolderDataset(val_path, transform=transform)
    test_dataset= ImageFolderDataset(test_path, transform=transform)

    if (train_dataset.classes != val_dataset.classes) or (val_dataset.classes != test_dataset.classes):
        print("Warning: Training validation and test class labels don't match.")

    num_classes = len(train_dataset.classes)
    print(f"Number of classes: {num_classes}")
    class_mapping = {i: cls for i, cls in enumerate(train_dataset.classes)}
    with open('class_mapping.json', 'w') as f:
        json.dump(class_mapping, f)

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=2)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=2)

    model = Classifier(num_classes, dropout_rate=0.2)

    criterion = nn.CrossEntropyLoss()

    optimizer = optim.Adam(model.parameters(), lr=1e-6)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2, factor=0.1)

    model, history = train_model_with_eval(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs=100)
    test_preds, test_labels=test_model(model, test_loader)

    # Save model
    model_path = 'final_model.pth'
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'num_classes': num_classes,
        'class_mapping': class_mapping
    }, model_path)

    try:
        from google.colab import drive
        drive.mount('/content/drive')
        shutil.copy(model_path, '/content/drive/MyDrive/final_model.pth')
        print("Model saved to Google Drive.")
    except Exception as e:
        print(f"Failed to save model to Google Drive: {e}")

# Entry point: run the pipeline only when this code executes as the main module.
if __name__ == "__main__":
    main()