In [None]:
import pandas as pd
import numpy as np
import os
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
import gdown
import zipfile
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import cv2
from tensorflow.keras.utils import Sequence
import albumentations as A
from albumentations.core.transforms_interface import DualTransform
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras import layers, models, callbacks
from sklearn.utils.class_weight import compute_class_weight



print(tf.__version__)

2.17.0


In [2]:
file_id = "1oQ6Vy_HqZlVHnkFspgxMn0IcE__D8Kmh"
zip_filename = "ocular-disease-recognition.zip"
extract_path = "./ocular-disease-recognition"

# Check if the file already exists
if not os.path.exists(zip_filename):
    print(f"Downloading {zip_filename}...")
    gdown.download(f"https://drive.google.com/uc?id={file_id}", zip_filename, quiet=False)
else:
    print(f"{zip_filename} already exists. Skipping download.")

ocular-disease-recognition.zip already exists. Skipping download.


In [3]:
# Check if already extracted
if not os.path.exists(extract_path):
    os.makedirs(extract_path, exist_ok=True)
    print(f"Extracting {zip_filename}...")

    with zipfile.ZipFile(zip_filename, "r") as zip_ref:
        zip_ref.extractall(extract_path)

    print(f"Extraction complete! Files extracted to: {extract_path}")
else:
    print(f"Extraction skipped: {extract_path} already exists.")

Extraction skipped: ./ocular-disease-recognition already exists.


In [None]:
# Load the dataset (Update the path if necessary)
dataset_path = "processed_ocular_disease.csv"
df = pd.read_csv(dataset_path)

# **Split into Train, Validation, and Test Sets**
train_df, temp_df = train_test_split(df, test_size=0.3, random_state=42)  # 70% Train, 30% Temp
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)  # 15% Val, 15% Test

In [None]:
# Compute class weights
class_labels = np.unique(df['labels'])
class_weights = compute_class_weight(class_weight="balanced", classes=class_labels, y=df['labels'])
class_weight_dict = {i: class_weights[i] for i in range(len(class_labels))}

print("Computed Class Weights:", class_weight_dict)


In [None]:
from sklearn.utils import resample

def balance_classes(df):
    """Resample dataset to balance classes."""
    max_size = df['labels'].value_counts().max()  # Find the max number of samples in any class
    balanced_df = pd.concat([
        resample(df[df['labels'] == cls], replace=True, n_samples=max_size, random_state=42)
        for cls in df['labels'].unique()
    ])
    return balanced_df.sample(frac=1).reset_index(drop=True)  # Shuffle after resampling

# Apply to training data only
train_df_balanced = balance_classes(train_df)

# Check if balancing worked
print(train_df_balanced['labels'].value_counts())  # Should now be balanced


labels
4    2030
5    2030
2    2030
3    2030
6    2030
1    2030
7    2030
0    2030
Name: count, dtype: int64


In [10]:
class DualImageAugmentation(DualTransform):
    def __init__(self, transforms, always_apply=False, p=0.5):
        super(DualImageAugmentation, self).__init__(always_apply, p)
        self.transforms = A.Compose(transforms)

    def apply(self, img, **params):
        return self.transforms(image=img)["image"]

    def apply_to_image1(self, img, **params):
        return self.transforms(image=img)["image"]

class OcularDatasetGenerator(Sequence):
    def __init__(self, df, batch_size=32, img_size=(128, 128), shuffle=True, augment=True):
        self.df = df
        self.batch_size = batch_size
        self.img_size = img_size
        self.shuffle = shuffle
        self.augment = augment  
        self.indices = np.arange(len(df))
        
        # Define augmentation pipeline if augmentation is enabled
        if augment:
            self.augmentation_pipeline = self.get_augmentation_pipeline()
        else:
            self.augmentation_pipeline = None

        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.df) / self.batch_size))  

    def __getitem__(self, index):
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        batch = self.df.iloc[batch_indices]
        X, y = self.__data_generation(batch)
        return np.array(X), np.array(y)  

    def __data_generation(self, batch):
        X_batch = []
        y_batch = []
        
        for _, row in batch.iterrows():
            left_image_path = os.path.join('ocular-disease-recognition/preprocessed_images', row['Left-Fundus'])
            right_image_path = os.path.join('ocular-disease-recognition/preprocessed_images', row['Right-Fundus'])
            
            left_image = self.load_image(left_image_path)
            right_image = self.load_image(right_image_path)

            if left_image is None or right_image is None:
                continue

            # Apply identical augmentations to both images
            if self.augment and self.augmentation_pipeline:
                augmented = self.augmentation_pipeline(image=left_image, image1=right_image)
                left_image = augmented["image"]
                right_image = augmented["image1"]

            # Ensure channel dimension is included (for grayscale images)
            left_image = np.expand_dims(left_image, axis=-1)  # Shape: (128, 128, 1)
            right_image = np.expand_dims(right_image, axis=-1)  # Shape: (128, 128, 1)

            combined_image = np.concatenate((left_image, right_image), axis=-1)  # Shape: (128, 128, 2)
            X_batch.append(combined_image)

            y_batch.append(int(row['labels']))  

        return np.array(X_batch, dtype=np.float32), np.array(y_batch, dtype=np.int32)

    def load_image(self, image_path):
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if image is None:
            return None
        image = cv2.resize(image, self.img_size)
        image = image / 255.0  
        return image

    def get_augmentation_pipeline(self):
        return DualImageAugmentation([
            A.RandomBrightnessContrast(p=0.4),
            A.GaussianBlur(blur_limit=(3, 7), p=0.3), 
            A.HorizontalFlip(p=0.5),
            A.Affine(rotate=(-20, 20), scale=(0.9, 1.1), translate_percent=(0.05, 0.05), p=0.6), 
            A.ElasticTransform(p=0.3),
            A.GridDistortion(p=0.3),
            A.RandomRotate90(p=0.3),
            A.CLAHE(p=0.2)  # Contrast enhancement
        ])

    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

In [None]:
# **Create Data Generators**
# batch_size = 32
batch_size = 16
train_generator = OcularDatasetGenerator(train_df_balanced, batch_size=batch_size, augment=True)
val_generator = OcularDatasetGenerator(val_df, batch_size=batch_size)  # No augmentation for validation
test_generator = OcularDatasetGenerator(test_df, batch_size=batch_size, shuffle=False)  # No augmentation & no shuffle for testing

# **Get Number of Classes**
num_classes = len(np.unique(df['labels']))
print(f"Number of Classes: {num_classes}")

# **Define CNN Model**
model = models.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(128, 128, 2)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(256, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    
    layers.Flatten(),
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.5),  # Dropout for regularization
    layers.Dense(128, activation='relu'),
    layers.Dense(num_classes, activation='softmax')
])


# Exponential Decay Learning Rate
lr_schedule = ExponentialDecay(initial_learning_rate=0.001, decay_steps=1000, decay_rate=0.96, staircase=True)

# ReduceLROnPlateau (Decrease learning rate when validation loss plateaus)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1)

# Compile with new learning rate
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


# **Define Early Stopping Callback**
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',    # Stop if validation loss stops improving
    patience=5,            # Wait for 5 epochs before stopping
    restore_best_weights=True  # Restore best model weights
)

# **Train the Model with Early Stopping**
history = model.fit(
    train_generator,
    epochs=100,
    validation_data=val_generator,
    callbacks=[early_stopping]
)

Number of Classes: 8
Computed Class Weights: {0: 3.0037593984962405, 1: 2.726962457337884, 2: 0.4968905472636816, 3: 2.813380281690141, 4: 6.2421875, 5: 3.4439655172413794, 6: 0.2781065088757396, 7: 1.1285310734463276}
Epoch 1/100


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
  if is_sparse(pd_dtype):
  if is_sparse(pd_dtype) or not is_extension_array_dtype(pd_dtype):
  self._warn_if_super_not_called()


[1m1015/1015[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m72s[0m 69ms/step - accuracy: 0.2165 - loss: 1.9612 - val_accuracy: 0.2221 - val_loss: 1.8940
Epoch 2/100
[1m1015/1015[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m71s[0m 70ms/step - accuracy: 0.5041 - loss: 1.3147 - val_accuracy: 0.2642 - val_loss: 1.7960
Epoch 3/100
[1m1015/1015[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m71s[0m 70ms/step - accuracy: 0.6314 - loss: 0.9871 - val_accuracy: 0.3827 - val_loss: 1.6654
Epoch 4/100
[1m1015/1015[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m72s[0m 70ms/step - accuracy: 0.7150 - loss: 0.7539 - val_accuracy: 0.4107 - val_loss: 1.6881
Epoch 5/100
[1m1015/1015[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m72s[0m 70ms/step - accuracy: 0.7634 - loss: 0.6130 - val_accuracy: 0.4494 - val_loss: 1.6600
Epoch 6/100
[1m1015/1015[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m70s[0m 68ms/step - accuracy: 0.7976 - loss: 0.5162 - val_accuracy: 0.4801 - val_loss: 1.6214
Epoch 7/10

In [11]:
# **Evaluate the Model on Test Set**
test_loss, test_acc = model.evaluate(test_generator)
print(f"Test Accuracy: {test_acc:.4f}")

[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 152ms/step - accuracy: 0.4790 - loss: 1.4926
Test Accuracy: 0.4764
