### Using CNN512 Model (without dropout) - luminosity + normalized images

---

In [1]:
import os
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from scipy.ndimage import gaussian_filter
from sklearn.model_selection import train_test_split
import tensorflow_addons as tfa
from keras.preprocessing.image import ImageDataGenerator

# Load the dataset
data = pd.read_csv("train_split_key_transformation.csv")

# Split the dataset into training and testing sets
train_data = data[data["split"] == "train"]
test_data = data[data["split"] == "test"]

# Define output directories for preprocessed and normalized images
preprocessed_output_dir_train = "preprocessed_images_train/"
preprocessed_output_dir_test = "preprocessed_images_test/"
normalized_output_dir_train = "normalized_images_train/"
normalized_output_dir_test = "normalized_images_test/"

# Create directories if they don't exist
os.makedirs(preprocessed_output_dir_train, exist_ok=True)
os.makedirs(preprocessed_output_dir_test, exist_ok=True)
os.makedirs(normalized_output_dir_train, exist_ok=True)
os.makedirs(normalized_output_dir_test, exist_ok=True)


# Preprocessing function
def preprocess_image(image_path, output_dir):
    # Load and resize the image to 512x512
    resized_image = cv2.imread(image_path)

    # Convert to LAB color space
    lab = cv2.cvtColor(resized_image, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)

    # Apply CLAHE to L channel
    clahe = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8, 8))
    l_eq = clahe.apply(l)

    # Merge back LAB channels
    lab_eq = cv2.merge((l_eq, a, b))
    enhanced_image = cv2.cvtColor(lab_eq, cv2.COLOR_LAB2BGR)

    # Image noise removal using Gaussian filter
    filtered_image = gaussian_filter(enhanced_image, sigma=1)

    # Save the preprocessed image
    image_name = os.path.basename(image_path)
    output_path = os.path.join(output_dir, image_name)
    cv2.imwrite(output_path, filtered_image)

    return output_path


# Define augmentation function
def augment_image(image):
    # Apply augmentation operations
    image = tfa.image.rotate(image, tf.random.uniform([], -35, 35, dtype=tf.float32))
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    image = tfa.image.shear_x(
        image, tf.random.uniform([], -0.15, 0.15, dtype=tf.float32), 0
    )
    image = tfa.image.shear_y(
        image, tf.random.uniform([], -0.15, 0.15, dtype=tf.float32), 0
    )
    image = tfa.image.translate(
        image, [tf.random.uniform([], -0.1, 0.1), tf.random.uniform([], -0.1, 0.1)]
    )
    image = tf.image.random_brightness(image, max_delta=0.5)
    return image


def normalize_images(images, factor=0.5):
    normalized_images = []
    for image in images:
        # Ensure the image is in float32 format
        image_float32 = image.astype(np.float32) / 255.0

        # Compute mean intensity for all channels
        mean_intensity = np.mean(image_float32)

        # Compute scaling factor for intensity adjustment
        scaling_factor = factor / mean_intensity

        # Scale each channel independently
        balanced_image = np.clip(image_float32 * scaling_factor, 0, 1)

        # Convert image back to uint8 format
        normalized_image = (balanced_image * 255).astype(np.uint8)

        normalized_images.append(normalized_image)

    return np.array(normalized_images)


# Apply preprocessing to training images
X_train_paths = []

for img_name in train_data["Image name"]:
    image_path = "train/" + img_name + ".jpg"
    output_path = preprocess_image(image_path, preprocessed_output_dir_train)
    X_train_paths.append(output_path)

# Apply preprocessing to testing images
X_test_paths = []

for img_name in test_data["Image name"]:
    image_path = "test/" + img_name + ".jpg"
    output_path = preprocess_image(image_path, preprocessed_output_dir_test)
    X_test_paths.append(output_path)

# Load preprocessed images
X_train = np.array([cv2.imread(img_path) for img_path in X_train_paths])
X_test = np.array([cv2.imread(img_path) for img_path in X_test_paths])

# Normalize and save training images
for i, image_path in enumerate(X_train_paths):
    image = cv2.imread(image_path)
    normalized_image = normalize_images(image)
    cv2.imwrite(
        os.path.join(normalized_output_dir_train, os.path.basename(image_path)),
        normalized_image,
    )

# Normalize and save testing images
for i, image_path in enumerate(X_test_paths):
    image = cv2.imread(image_path)
    normalized_image = normalize_images(image)
    cv2.imwrite(
        os.path.join(normalized_output_dir_test, os.path.basename(image_path)),
        normalized_image,
    )

# Apply augmentation to training images
X_train_augmented = []

for image in X_train:
    augmented_image = augment_image(image)
    X_train_augmented.append(augmented_image)

# Convert augmented images to numpy array
X_train_augmented = np.array(X_train_augmented)

# Extract labels
y_train = train_data["Retinopathy grade new"].values
y_test = test_data["Retinopathy grade new"].values





TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.15.0 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


In [2]:
from tensorflow.keras.layers import (
    Conv2D,
    MaxPooling2D,
    Flatten,
    Dense,
    BatchNormalization,
    ZeroPadding2D,
    Activation,
    Dropout,
)
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.callbacks import LearningRateScheduler
import math

# Define the CNN512 model architecture
model = Sequential()

# Input Layer (Zero Padding)
model.add(ZeroPadding2D(padding=(2, 2), input_shape=(512, 512, 3)))

# Layer 1, 2, 3
model.add(Conv2D(32, (3, 3)))
model.add(BatchNormalization())
model.add(Activation("relu"))

# Layer 4
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 5, 6, 7
model.add(Conv2D(64, (3, 3)))
model.add(BatchNormalization())
model.add(Activation("relu"))

# Layer 8
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 9, 10, 11
model.add(Conv2D(96, (3, 3)))
model.add(BatchNormalization())
model.add(Activation("relu"))

# Layer 12
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 13, 14, 15
model.add(Conv2D(96, (3, 3)))
model.add(BatchNormalization())
model.add(Activation("relu"))

# Layer 16
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 17, 18, 19
model.add(Conv2D(128, (3, 3)))
model.add(BatchNormalization())
model.add(Activation("relu"))

# Layer 20
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 21, 22, 23
model.add(Conv2D(200, (3, 3)))
model.add(BatchNormalization())
model.add(Activation("relu"))

# Layer 24
model.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 25
model.add(Flatten())

# Layer 26, 27, 28
model.add(Dense(1000))
model.add(BatchNormalization())
model.add(Activation("relu"))

# Layer 29, 30, 31
model.add(Dense(500))
model.add(BatchNormalization())
model.add(Activation("relu"))

# Layer 32
model.add(Dense(4, activation="softmax"))  # Assuming 4 classes for Retinopathy grade





In [3]:
# Base learning rate for custom CNNs
base_learning_rate = 1e-4
# Maximum learning rate for custom CNNs
max_learning_rate = 1e-1

# Create an instance of SGD optimizer with initial learning rate
optimizer = SGD(learning_rate=base_learning_rate, momentum=0.9)

# Create class weights
# Convert y_train to a hashable data type
y_train_list = list(y_train)
classes = np.unique(y_train_list)

# Create class weights
class_weights = compute_class_weight("balanced", classes=classes, y=y_train_list)
class_weight_dict = dict(zip(classes, class_weights))


# Define triangular schedule
def triangular_schedule(epoch):
    """Triangular learning rate scheduler."""
    cycle_length = 10  # Define the length of a cycle
    cycle = math.floor(1 + epoch / (2 * cycle_length))
    x = abs(epoch / cycle_length - 2 * cycle + 1)
    lr = base_learning_rate + (max_learning_rate - base_learning_rate) * max(0, (1 - x))
    return lr


# When fitting the model, include the learning rate scheduler callback
lr_scheduler = LearningRateScheduler(triangular_schedule)

# Compile the model
model.compile(
    optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

# Fit the model
history = model.fit(
    X_train_augmented,
    y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_test, y_test),
    class_weight=class_weight_dict,
    callbacks=[lr_scheduler],
)

# Evaluate the model
model.evaluate(X_test, y_test)

Epoch 1/50


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


[2.5709457397460938, 0.375]

In [4]:
# Evaluate the model
test_loss_norm, test_accuracy_norm = model.evaluate(X_test, y_test)
print("IDRiD Test Accuracy (without dropout):", test_accuracy_norm)

IDRiD Test Accuracy (without dropout): 0.375


---

### Using CNN512 Model (without dropout) - luminosity + normalized images for MESSIDOR

In [5]:
# Preprocessing function
def preprocess_image(image_path, output_dir):
    # Load and resize the image to 512x512
    resized_image = cv2.imread(image_path)

    # Convert to LAB color space
    lab = cv2.cvtColor(resized_image, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)

    # Apply CLAHE to L channel
    clahe = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8, 8))
    l_eq = clahe.apply(l)

    # Merge back LAB channels
    lab_eq = cv2.merge((l_eq, a, b))
    enhanced_image = cv2.cvtColor(lab_eq, cv2.COLOR_LAB2BGR)

    # Image noise removal using Gaussian filter
    filtered_image = gaussian_filter(enhanced_image, sigma=1)

    # Save the preprocessed image
    image_name = os.path.basename(image_path)
    output_path = os.path.join(output_dir, image_name)
    cv2.imwrite(output_path, filtered_image)

    return output_path

In [6]:
# Load the dataset
data_m = pd.read_excel("Messidor_Data/messidor_mapping.xlsx")

# Split the dataset into training and testing sets
test_data_messidor = data_m[data_m["Split"] == "Test"]

# Define output directories for preprocessed and normalized images
preprocessed_output_dir_test_m = "Messidor_Data/messidor_preprocessed_images_test/"
normalized_output_dir_test_m = "Messidor_Data/messidor_normalized_images_test/"

# Create directories if they don't exist
os.makedirs(preprocessed_output_dir_test_m, exist_ok=True)
os.makedirs(normalized_output_dir_test_m, exist_ok=True)

# Apply preprocessing to testing images
X_test_paths_m = []

for img_name in test_data_messidor["Image_ID"]:
    image_name_without_extension = os.path.splitext(img_name)[0]  # Remove extension
    image_path = f"Messidor_Data/messidor_test/{image_name_without_extension}.jpg"  # Append .jpg extension
    output_path = preprocess_image(image_path, preprocessed_output_dir_test_m)
    X_test_paths_m.append(output_path)

# Load preprocessed images
X_test_m = np.array([cv2.imread(img_path) for img_path in X_test_paths_m])

# Normalize and save testing images
for i, image_path in enumerate(X_test_paths_m):
    image = cv2.imread(image_path)
    normalized_image = normalize_images(image)
    cv2.imwrite(
        os.path.join(normalized_output_dir_test_m, os.path.basename(image_path)),
        normalized_image,
    )

# Extract labels
y_test_m = test_data_messidor["Retinopathy_Grade"].values

In [7]:
# Evaluate the model
test_loss_norm_m, test_accuracy_norm_m = model.evaluate(X_test_m, y_test_m)
print("Messidor Test Accuracy (without dropout):", test_accuracy_norm_m)

Messidor Test Accuracy (without dropout): 0.25


---

### Using CNN512 Model (with dropout) - luminosity + normalized images

In [8]:
# Define the CNN512 model architecture
model_dropout = Sequential()

# Input Layer (Zero Padding)
model_dropout.add(ZeroPadding2D(padding=(2, 2), input_shape=(512, 512, 3)))

# Layer 1, 2, 3
model_dropout.add(Conv2D(32, (3, 3)))
model_dropout.add(BatchNormalization())
model_dropout.add(Activation("relu"))

# Layer 4
model_dropout.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 5, 6, 7
model_dropout.add(Conv2D(64, (3, 3)))
model_dropout.add(BatchNormalization())
model_dropout.add(Activation("relu"))

# Layer 8
model_dropout.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 9, 10, 11
model_dropout.add(Conv2D(96, (3, 3)))
model_dropout.add(BatchNormalization())
model_dropout.add(Activation("relu"))

# Layer 12
model_dropout.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 13, 14, 15
model_dropout.add(Conv2D(96, (3, 3)))
model_dropout.add(BatchNormalization())
model_dropout.add(Activation("relu"))

# Layer 16
model_dropout.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 17, 18, 19
model_dropout.add(Conv2D(128, (3, 3)))
model_dropout.add(BatchNormalization())
model_dropout.add(Activation("relu"))

# Layer 20
model_dropout.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 21, 22, 23
model_dropout.add(Conv2D(200, (3, 3)))
model_dropout.add(BatchNormalization())
model_dropout.add(Activation("relu"))

# Layer 24
model_dropout.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 25
model_dropout.add(Flatten())

# Layer 26, 27, 28
model_dropout.add(Dense(1000))
model_dropout.add(BatchNormalization())
model_dropout.add(Activation("relu"))
model_dropout.add(Dropout(0.5))

# Layer 29, 30, 31
model_dropout.add(Dense(500))
model_dropout.add(BatchNormalization())
model_dropout.add(Activation("relu"))
model_dropout.add(Dropout(0.5))

# Layer 32
model_dropout.add(
    Dense(4, activation="softmax")
)  # Assuming 4 classes for Retinopathy grade

In [9]:
# Base learning rate for custom CNNs
base_learning_rate = 1e-4
# Maximum learning rate for custom CNNs
max_learning_rate = 1e-1

# Create an instance of SGD optimizer with initial learning rate
optimizer = SGD(learning_rate=base_learning_rate, momentum=0.9)

# Create class weights
# Convert y_train to a hashable data type
y_train_list = list(y_train)
classes = np.unique(y_train_list)

# Create class weights
class_weights = compute_class_weight("balanced", classes=classes, y=y_train_list)
class_weight_dict = dict(zip(classes, class_weights))


# Define triangular schedule
def triangular_schedule(epoch):
    """Triangular learning rate scheduler."""
    cycle_length = 10  # Define the length of a cycle
    cycle = math.floor(1 + epoch / (2 * cycle_length))
    x = abs(epoch / cycle_length - 2 * cycle + 1)
    lr = base_learning_rate + (max_learning_rate - base_learning_rate) * max(0, (1 - x))
    return lr


# When fitting the model, include the learning rate scheduler callback
lr_scheduler = LearningRateScheduler(triangular_schedule)

# Compile the model
model_dropout.compile(
    optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

# Fit the model
history = model_dropout.fit(
    X_train_augmented,
    y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_test, y_test),
    class_weight=class_weight_dict,
    callbacks=[lr_scheduler],
)

# Evaluate the model
model_dropout.evaluate(X_test, y_test)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


[1.3847036361694336, 0.25]

In [10]:
# Evaluate the model
test_loss, test_accuracy = model_dropout.evaluate(X_test, y_test)
print("IDRiD Test Accuracy (with dropout):", test_accuracy)

IDRiD Test Accuracy (with dropout): 0.25


### Using CNN512 Model (with dropout) - luminosity + normalized images MESSIDOR

In [11]:
# Evaluate the model
test_loss_dropout_m, test_accuracy_dropout_m = model_dropout.evaluate(
    X_test_m, y_test_m
)
print("Messidor Test Accuracy (with dropout):", test_accuracy_dropout_m)

Messidor Test Accuracy (with dropout): 0.25


---

### Train model on base images

In [12]:
# Load the dataset
data = pd.read_csv("train_split_key_transformation.csv")

# Split the dataset into training and testing sets
train_data = data[data["split"] == "train"]
test_data = data[data["split"] == "test"]

# Define output directories for preprocessed and normalized images
preprocessed_output_dir_train = "preprocessed_images_train/"
preprocessed_output_dir_test = "preprocessed_images_test/"
normalized_output_dir_train = "normalized_images_train/"
normalized_output_dir_test = "normalized_images_test/"


# Define augmentation function
def augment_image(image):
    # Apply augmentation operations
    image = tfa.image.rotate(image, tf.random.uniform([], -35, 35, dtype=tf.float32))
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    image = tfa.image.shear_x(
        image, tf.random.uniform([], -0.15, 0.15, dtype=tf.float32), 0
    )
    image = tfa.image.shear_y(
        image, tf.random.uniform([], -0.15, 0.15, dtype=tf.float32), 0
    )
    image = tfa.image.translate(
        image, [tf.random.uniform([], -0.1, 0.1), tf.random.uniform([], -0.1, 0.1)]
    )
    image = tf.image.random_brightness(image, max_delta=0.5)
    return image


# Apply preprocessing to training images
X_train_paths = []

for img_name in train_data["Image name"]:
    image_path = "train/" + img_name + ".jpg"
    output_path = preprocess_image(image_path, preprocessed_output_dir_train)
    X_train_paths.append(output_path)

# Apply preprocessing to testing images
X_test_paths = []

for img_name in test_data["Image name"]:
    image_path = "test/" + img_name + ".jpg"
    output_path = preprocess_image(image_path, preprocessed_output_dir_test)
    X_test_paths.append(output_path)

# Load preprocessed images
X_train = np.array([cv2.imread(img_path) for img_path in X_train_paths])
X_test = np.array([cv2.imread(img_path) for img_path in X_test_paths])

# Normalize and save training images
for i, image_path in enumerate(X_train_paths):
    image = cv2.imread(image_path)
    normalized_image = normalize_images(image)
    cv2.imwrite(
        os.path.join(normalized_output_dir_train, os.path.basename(image_path)),
        normalized_image,
    )

# Normalize and save testing images
for i, image_path in enumerate(X_test_paths):
    image = cv2.imread(image_path)
    normalized_image = normalize_images(image)
    cv2.imwrite(
        os.path.join(normalized_output_dir_test, os.path.basename(image_path)),
        normalized_image,
    )


# Apply preprocessing to training images
X_train_paths = ["train/" + img_name + ".jpg" for img_name in train_data["Image name"]]

# Apply preprocessing to testing images
X_test_paths = ["test/" + img_name + ".jpg" for img_name in test_data["Image name"]]

# Load images
X_train = np.array([cv2.imread(img_path) for img_path in X_train_paths])
X_test = np.array([cv2.imread(img_path) for img_path in X_test_paths])

# Extract labels
y_train = train_data["Retinopathy grade new"].values
y_test = test_data["Retinopathy grade new"].values

# Apply augmentation to training images
X_train_augmented = []

for image in X_train:
    augmented_image = augment_image(image)
    X_train_augmented.append(augmented_image)

# Convert augmented images to numpy array
X_train_augmented = np.array(X_train_augmented)

# Extract labels
y_train = train_data["Retinopathy grade new"].values
y_test = test_data["Retinopathy grade new"].values

In [14]:
# Load the dataset
data = pd.read_csv("train_split_key_transformation.csv")

# Split the dataset into training and testing sets
train_data = data[data["split"] == "train"]
test_data = data[data["split"] == "test"]


# Define augmentation function
def augment_image(image):
    # Apply augmentation operations
    image = tfa.image.rotate(image, tf.random.uniform([], -35, 35, dtype=tf.float32))
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    image = tfa.image.shear_x(
        image, tf.random.uniform([], -0.15, 0.15, dtype=tf.float32), 0
    )
    image = tfa.image.shear_y(
        image, tf.random.uniform([], -0.15, 0.15, dtype=tf.float32), 0
    )
    image = tfa.image.translate(
        image, [tf.random.uniform([], -0.1, 0.1), tf.random.uniform([], -0.1, 0.1)]
    )
    image = tf.image.random_brightness(image, max_delta=0.5)
    return image


# Apply preprocessing to training images NOT
X_train_paths = []

for img_name in train_data["Image name"]:
    image_path = "train/" + img_name + ".jpg"
    X_train_paths.append(image_path)

# Apply preprocessing to testing images NOT
X_test_paths = []

for img_name in test_data["Image name"]:
    image_path = "test/" + img_name + ".jpg"
    X_test_paths.append(image_path)

X_train = np.array([cv2.imread(img_path) for img_path in X_train_paths])
X_test = np.array([cv2.imread(img_path) for img_path in X_test_paths])

# Apply augmentation to training images
X_train_augmented = []

for image in X_train:
    augmented_image = augment_image(image)
    X_train_augmented.append(augmented_image)

# Convert augmented images to numpy array
X_train_augmented = np.array(X_train_augmented)

# Extract labels
y_train = train_data["Retinopathy grade new"].values
y_test = test_data["Retinopathy grade new"].values

In [15]:
# Define the CNN512 model architecture
model_base = Sequential()

# Input Layer (Zero Padding)
model_base.add(ZeroPadding2D(padding=(2, 2), input_shape=(512, 512, 3)))

# Layer 1, 2, 3
model_base.add(Conv2D(32, (3, 3)))
model_base.add(BatchNormalization())
model_base.add(Activation("relu"))

# Layer 4
model_base.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 5, 6, 7
model_base.add(Conv2D(64, (3, 3)))
model_base.add(BatchNormalization())
model_base.add(Activation("relu"))

# Layer 8
model_base.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 9, 10, 11
model_base.add(Conv2D(96, (3, 3)))
model_base.add(BatchNormalization())
model_base.add(Activation("relu"))

# Layer 12
model_base.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 13, 14, 15
model_base.add(Conv2D(96, (3, 3)))
model_base.add(BatchNormalization())
model_base.add(Activation("relu"))

# Layer 16
model_base.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 17, 18, 19
model_base.add(Conv2D(128, (3, 3)))
model_base.add(BatchNormalization())
model_base.add(Activation("relu"))

# Layer 20
model_base.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 21, 22, 23
model_base.add(Conv2D(200, (3, 3)))
model_base.add(BatchNormalization())
model_base.add(Activation("relu"))

# Layer 24
model_base.add(MaxPooling2D(pool_size=(2, 2)))

# Layer 25
model_base.add(Flatten())

# Layer 26, 27, 28
model_base.add(Dense(1000))
model_base.add(BatchNormalization())
model_base.add(Activation("relu"))

# Layer 29, 30, 31
model_base.add(Dense(500))
model_base.add(BatchNormalization())
model_base.add(Activation("relu"))

# Layer 32
model_base.add(
    Dense(4, activation="softmax")
)  # Assuming 4 classes for Retinopathy grade

In [16]:
# Base learning rate for custom CNNs
base_learning_rate = 1e-4
# Maximum learning rate for custom CNNs
max_learning_rate = 1e-1

# Create an instance of SGD optimizer with initial learning rate
optimizer = SGD(learning_rate=base_learning_rate, momentum=0.9)

# Create class weights
# Convert y_train to a hashable data type
y_train_list = list(y_train)
classes = np.unique(y_train_list)

# Create class weights
class_weights = compute_class_weight("balanced", classes=classes, y=y_train_list)
class_weight_dict = dict(zip(classes, class_weights))


# Define triangular schedule
def triangular_schedule(epoch):
    """Triangular learning rate scheduler."""
    cycle_length = 10  # Define the length of a cycle
    cycle = math.floor(1 + epoch / (2 * cycle_length))
    x = abs(epoch / cycle_length - 2 * cycle + 1)
    lr = base_learning_rate + (max_learning_rate - base_learning_rate) * max(0, (1 - x))
    return lr


# When fitting the model, include the learning rate scheduler callback
lr_scheduler = LearningRateScheduler(triangular_schedule)

# Compile the model
model_base.compile(
    optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

# Fit the model
history = model_base.fit(
    X_train_augmented,
    y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_test, y_test),
    class_weight=class_weight_dict,
    callbacks=[lr_scheduler],
)

# Evaluate the model
model_base.evaluate(X_test, y_test)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


[2.172149896621704, 0.2916666567325592]

In [17]:
# Evaluate the model
test_loss_base, test_accuracy_base = model_base.evaluate(X_test, y_test)
print("IDRiD Test Accuracy BASE images (without dropout):", test_accuracy_base)

IDRiD Test Accuracy BASE images (without dropout): 0.2916666567325592


In [18]:
# Load the dataset
data_m = pd.read_excel("Messidor_Data/messidor_mapping.xlsx")

# Split the dataset into training and testing sets
test_data_messidor = data_m[data_m["Split"] == "Test"]

# Apply preprocessing to testing images
X_test_paths_m = []

for img_name in test_data_messidor["Image_ID"]:
    image_name_without_extension = os.path.splitext(img_name)[0]  # Remove extension
    image_path = f"Messidor_Data/messidor_test/{image_name_without_extension}.jpg"  # Append .jpg extension
    X_test_paths_m.append(image_path)

# Load preprocessed images
X_test_m = np.array([cv2.imread(img_path) for img_path in X_test_paths_m])

# Extract labels
y_test_m = test_data_messidor["Retinopathy_Grade"].values

In [19]:
# Evaluate the model
test_loss_base_m, test_accuracy_base_m = model_base.evaluate(X_test_m, y_test_m)
print("Messidor Test Accuracy BASE images (without dropout):", test_accuracy_base_m)

Messidor Test Accuracy BASE images (without dropout): 0.25


---

### Suspected experiment

In [20]:
import os
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from scipy.ndimage import gaussian_filter
from sklearn.model_selection import train_test_split
import tensorflow_addons as tfa
from keras.preprocessing.image import ImageDataGenerator

# Load the dataset
data = pd.read_csv("train_split_key_transformation.csv")

# Split the dataset into training and testing sets
train_data = data[data["split"] == "train"]
test_data = data[data["split"] == "test"]

# Define output directories for preprocessed and normalized images
preprocessed_output_dir_train = "preprocessed_images_train/"
preprocessed_output_dir_test = "preprocessed_images_test/"
normalized_output_dir_train = "normalized_images_train/"
normalized_output_dir_test = "normalized_images_test/"

# Create directories if they don't exist
os.makedirs(preprocessed_output_dir_train, exist_ok=True)
os.makedirs(preprocessed_output_dir_test, exist_ok=True)
os.makedirs(normalized_output_dir_train, exist_ok=True)
os.makedirs(normalized_output_dir_test, exist_ok=True)


# Preprocessing function
def preprocess_image(image_path, output_dir):
    # Load and resize the image to 512x512
    resized_image = cv2.imread(image_path)

    # Convert to LAB color space
    lab = cv2.cvtColor(resized_image, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)

    # Apply CLAHE to L channel
    clahe = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8, 8))
    l_eq = clahe.apply(l)

    # Merge back LAB channels
    lab_eq = cv2.merge((l_eq, a, b))
    enhanced_image = cv2.cvtColor(lab_eq, cv2.COLOR_LAB2BGR)

    # Image noise removal using Gaussian filter
    filtered_image = gaussian_filter(enhanced_image, sigma=1)

    # Save the preprocessed image
    image_name = os.path.basename(image_path)
    output_path = os.path.join(output_dir, image_name)
    cv2.imwrite(output_path, filtered_image)

    return output_path


# Define augmentation function
def augment_image(image):
    # Apply augmentation operations
    image = tfa.image.rotate(image, tf.random.uniform([], -35, 35, dtype=tf.float32))
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    image = tfa.image.shear_x(
        image, tf.random.uniform([], -0.15, 0.15, dtype=tf.float32), 0
    )
    image = tfa.image.shear_y(
        image, tf.random.uniform([], -0.15, 0.15, dtype=tf.float32), 0
    )
    image = tfa.image.translate(
        image, [tf.random.uniform([], -0.1, 0.1), tf.random.uniform([], -0.1, 0.1)]
    )
    image = tf.image.random_brightness(image, max_delta=0.5)
    return image


def normalize_images(images, factor=0.5):
    normalized_images = []
    for image in images:
        # Ensure the image is in float32 format
        image_float32 = image.astype(np.float32) / 255.0

        # Compute mean intensity for all channels
        mean_intensity = np.mean(image_float32)

        # Compute scaling factor for intensity adjustment
        scaling_factor = factor / mean_intensity

        # Scale each channel independently
        balanced_image = np.clip(image_float32 * scaling_factor, 0, 1)

        # Convert image back to uint8 format
        normalized_image = (balanced_image * 255).astype(np.uint8)

        normalized_images.append(normalized_image)

    return np.array(normalized_images)


# Apply preprocessing to training images
X_train_paths = []

for img_name in train_data["Image name"]:
    image_path = "train/" + img_name + ".jpg"
    output_path = preprocess_image(image_path, preprocessed_output_dir_train)
    X_train_paths.append(output_path)

# Apply preprocessing to testing images
X_test_paths = []

for img_name in test_data["Image name"]:
    image_path = "test/" + img_name + ".jpg"
    output_path = preprocess_image(image_path, preprocessed_output_dir_test)
    X_test_paths.append(output_path)

# Load preprocessed images
X_train = np.array([cv2.imread(img_path) for img_path in X_train_paths])
X_test = np.array([cv2.imread(img_path) for img_path in X_test_paths])

X_train_paths_normalized = []
X_test_paths_normalized = []

# Normalize and save training images
for i, image_path in enumerate(X_train_paths):
    image = cv2.imread(image_path)
    normalized_image = normalize_images(image)
    cv2.imwrite(
        os.path.join(normalized_output_dir_train, os.path.basename(image_path)),
        normalized_image,
    )
    X_train_paths_normalized.append(
        os.path.join(normalized_output_dir_train, os.path.basename(image_path)),
        normalized_image,
    )

# Normalize and save testing images
for i, image_path in enumerate(X_test_paths):
    image = cv2.imread(image_path)
    normalized_image = normalize_images(image)
    cv2.imwrite(
        os.path.join(normalized_output_dir_test, os.path.basename(image_path)),
        normalized_image,
    )
    X_test_paths_normalized.append(
        os.path.join(normalized_output_dir_test, os.path.basename(image_path)),
        normalized_image,
    )

X_train_new_norm = np.array([cv2.imread(img_path) for img_path in X_train_paths])
X_test_new_norm = np.array([cv2.imread(img_path) for img_path in X_test_paths])

# Apply augmentation to training images
X_train_augmented = []

for image in X_train_new_norm:
    augmented_image = augment_image(image)
    X_train_augmented.append(augmented_image)

# Convert augmented images to numpy array
X_train_augmented = np.array(X_train_augmented)

# Extract labels
y_train = train_data["Retinopathy grade new"].values
y_test = test_data["Retinopathy grade new"].values

TypeError: list.append() takes exactly one argument (2 given)

In [None]:
# Base learning rate for custom CNNs
base_learning_rate = 1e-4
# Maximum learning rate for custom CNNs
max_learning_rate = 1e-1

# Create an instance of SGD optimizer with initial learning rate
optimizer = SGD(learning_rate=base_learning_rate, momentum=0.9)

# Create class weights
# Convert y_train to a hashable data type
y_train_list = list(y_train)
classes = np.unique(y_train_list)

# Create class weights
class_weights = compute_class_weight("balanced", classes=classes, y=y_train_list)
class_weight_dict = dict(zip(classes, class_weights))


# Define triangular schedule
def triangular_schedule(epoch):
    """Triangular learning rate scheduler."""
    cycle_length = 10  # Define the length of a cycle
    cycle = math.floor(1 + epoch / (2 * cycle_length))
    x = abs(epoch / cycle_length - 2 * cycle + 1)
    lr = base_learning_rate + (max_learning_rate - base_learning_rate) * max(0, (1 - x))
    return lr


# When fitting the model, include the learning rate scheduler callback
lr_scheduler = LearningRateScheduler(triangular_schedule)

# Compile the model
model.compile(
    optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

# Fit the model
history = model.fit(
    X_train_augmented,
    y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_test, y_test),
    class_weight=class_weight_dict,
    callbacks=[lr_scheduler],
)

# Evaluate the model
model.evaluate(X_test_new_norm, y_test)