In [1]:
import os
import pandas as pd
import numpy as np
import cv2
import tensorflow as tf

from sklearn.utils import shuffle, resample
from tqdm import tqdm
from sklearn.metrics import cohen_kappa_score

from keras.utils import load_img, img_to_array, to_categorical
from keras.models import model_from_json, Model
from keras.optimizers import SGD, RMSprop, Adam

from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix, classification_report, cohen_kappa_score
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
losses = {
    'present_output': 'binary_crossentropy',
    'grading_output': 'categorical_crossentropy'
}

# DDR

In [3]:
# Load the CSV file
df = pd.read_csv('path/to/DDR/DR_grading/labels.csv')
df = df[df['label'] != 5]
dataset_path = 'path/to/DDR/DR_grading/all'

# Define constants
IMAGE_SIZE = (256, 256)  # Example size, adjust to your models
NUM_CLASSES = 5  # Grading levels 0-5

In [None]:
df['label'].value_counts()

In [None]:
# Define target sample sizes for each label
target_sizes = {
    0: 500,
    1: 500,
    2: 500,
    3: 500,
    4: 500
}

# Initialize a list to hold the sliced DataFrames
sliced_dfs = []

# Slice the DataFrame for each label
for label, size in target_sizes.items():
    class_df = df[df['label'] == label]
    if len(class_df) >= size:
        # Undersample if the class size is greater than or equal to the target size
        sliced_df = class_df.sample(size, random_state=42)
    else:
        # Oversample if the class size is smaller than the target size
        sliced_df = resample(class_df, replace=True, n_samples=size, random_state=42)
    sliced_dfs.append(sliced_df)

# Combine all sliced DataFrames
test_df = pd.concat(sliced_dfs)

# Shuffle the final dataset
test_df = shuffle(test_df, random_state=42)

test_df['label'].value_counts()

### ResNet

In [6]:
from keras.applications.resnet import preprocess_input as resenet50_preprocess_input

from keras import backend as K
K.set_image_data_format('channels_last')

In [None]:
with open('models_TL/ResNet50_pretrained_enhanced.json', 'r') as json_file:
    model_json = json_file.read()
model = model_from_json(model_json)
model.load_weights('models_TL/ResNet50_pretrained_enhanced.weights.h5')
model.compile(optimizer='adam', loss=losses, metrics=['accuracy'])

In [None]:
def enhance_img(img):
    # Step 1: Apply median filter with a 3x3 kernel
    img = cv2.medianBlur(img.astype(np.uint8), ksize=3)

    # Step 2: Convert to LAB color space
    lab_img = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab_img)

    # Step 3: Apply CLAHE on the Luminosity (L) channel with 8x8 tile grid
    clahe = cv2.createCLAHE(clipLimit=6.0, tileGridSize=(8, 8))
    l = clahe.apply(l)

    # Step 4: Merge CLAHE enhanced L with original A and B channels
    lab_img = cv2.merge((l, a, b))

    # Step 5: Convert back to RGB color space
    enhanced_img = cv2.cvtColor(lab_img, cv2.COLOR_LAB2RGB)
    
    return enhanced_img

# Prepare dataset
X = []
y = []

for index, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Processing rows"):
    image_path = row['image']
    label = row['label']
    
    image_path = os.path.join(dataset_path, image_path)
    img = load_img(image_path, target_size=(224, 224))
    x = img_to_array(img)
    x = enhance_img(x)
    x = resenet50_preprocess_input(x)
    X.append(x)
    y.append(label)
    
X = np.array(X)
y = np.array(y)

X.shape, y.shape

y_present = (y > 0).astype(int)  # Binary: 0 (no disease), 1 (disease present)
y_grades = np.where(y_present == 1, y, 0)  # Multiclass: 1-4 if disease present, 0 otherwise
y_grades = to_categorical(y_grades, num_classes=5)

In [None]:
predictions = model.predict(X)
present_predictions = (predictions[0] > 0.5).astype(int)  # Binary output
grade_predictions = np.argmax(predictions[1], axis=1)  # Multiclass output
true_grades = np.argmax(y_grades, axis=1)

conf_matrix = confusion_matrix(true_grades, grade_predictions)

# Visualize confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=range(5), yticklabels=range(5))
plt.xlabel("Predicted Grades")
plt.ylabel("True Grades")
plt.title("Confusion Matrix")
plt.show()

In [None]:
# Calculate metrics
accuracy = accuracy_score(true_grades, grade_predictions)
precision = precision_score(true_grades, grade_predictions, average='weighted')
recall = recall_score(true_grades, grade_predictions, average='weighted')
classification_rep = classification_report(true_grades, grade_predictions, target_names=[f'Grade {i}' for i in range(5)])

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print("\nClassification Report:")
print(classification_rep)

In [None]:
# Calculate weighted Cohen's kappa
kappa = cohen_kappa_score(true_grades, grade_predictions, weights='linear')
print(f"Cohen's Kappa (weighted): {kappa}")

In [None]:
# Calculate weighted Cohen's kappa
kappa = cohen_kappa_score(true_grades, grade_predictions, weights='quadratic')
print(f"Cohen's Kappa (weighted): {kappa}")

In [None]:
# Predict presence (binary)
binary_predictions = (predictions[0] > 0.5).astype(int)  # Threshold at 0.5

# Calculate metrics
accuracy = accuracy_score(y_present, binary_predictions)
precision = precision_score(y_present, binary_predictions, average='binary')
recall = recall_score(y_present, binary_predictions, average='binary')
classification_rep = classification_report(y_present, binary_predictions, target_names=['No Disease', 'Disease'])

# Print results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print("\nClassification Report:")
print(classification_rep)

### Feature based

In [None]:
with open('models_features/CNN_all_features.json', 'r') as json_file:
    model_json = json_file.read()
model = model_from_json(model_json)
model.load_weights('models_features/CNN_all_features.weights.h5')
model.compile(optimizer='adam', loss=losses, metrics=['accuracy'])

In [7]:
# EX
with open('models_segmentation/EX.json', 'r') as json_file:
    model_json = json_file.read()
model_hard_exudates = model_from_json(model_json)
model_hard_exudates.load_weights('models_segmentation/EX.weights.h5')
model_hard_exudates.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# SE
with open('models_segmentation/SE.json', 'r') as json_file:
    model_json = json_file.read()
model_soft_exudates = model_from_json(model_json)
model_soft_exudates.load_weights('models_segmentation/SE.weights.h5')
model_soft_exudates.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# HE
with open('models_segmentation/HE.json', 'r') as json_file:
    model_json = json_file.read()
model_haemorrhages = model_from_json(model_json)
model_haemorrhages.load_weights('models_segmentation/HE.weights.h5')
model_haemorrhages.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# MA
with open('models_segmentation/MA.json', 'r') as json_file:
    model_json = json_file.read()
model_microaneurysms = model_from_json(model_json)
model_microaneurysms.load_weights('models_segmentation/MA.weights.h5')
model_microaneurysms.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


In [8]:
def enhance_img(img):
    # Step 1: Apply median filter with a 3x3 kernel
    img = cv2.medianBlur(img.astype(np.uint8), ksize=3)

    # Step 2: Convert to LAB color space
    lab_img = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab_img)

    # Step 3: Apply CLAHE on the Luminosity (L) channel with 8x8 tile grid
    clahe = cv2.createCLAHE(clipLimit=6.0, tileGridSize=(8, 8))
    l = clahe.apply(l)

    # Step 4: Merge CLAHE enhanced L with original A and B channels
    lab_img = cv2.merge((l, a, b))

    # Step 5: Convert back to RGB color space
    enhanced_img = cv2.cvtColor(lab_img, cv2.COLOR_LAB2RGB)
    
    return enhanced_img

def get_mask(path, target_size):     
    image = cv2.imread(path)
    image = cv2.resize(image, target_size)
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Convert image to HSV (Hue, Saturation, Value) color space
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # Define the red color range for masking
    lower_red = np.array([0, 100, 50])
    upper_red = np.array([12, 250, 250])
    mask1 = cv2.inRange(hsv_image, lower_red, upper_red)

    # lower_red2 = np.array([170, 120, 70])
    lower_red2 = np.array([10, 60, 70])
    upper_red2 = np.array([180, 255, 255])
    mask2 = cv2.inRange(hsv_image, lower_red2, upper_red2)
    mask2 = cv2.bitwise_not(mask2)

    # Combine masks for red
    red_mask = mask1 + mask2
    
    # overlay the mask on the original image
    red_mask_3ch = cv2.cvtColor(red_mask, cv2.COLOR_GRAY2BGR)
    mask = cv2.addWeighted(image_rgb, 0.7, red_mask_3ch, 0.3, 0)
    
    return mask

# Define function to preprocess images
def preprocess_image_EX(image_path):
    img = load_img(image_path, target_size=IMAGE_SIZE)
    img = img_to_array(img)
    img = img / 255.0
    return img

def preprocess_image_SE(image_path):
    img = load_img(image_path, target_size=IMAGE_SIZE)
    img = img_to_array(img)
    img = img / 255.0
    return img

def preprocess_image_HE(image_path):
    img = load_img(image_path, target_size=IMAGE_SIZE)
    img = img_to_array(img)
    img = enhance_img(img)
    img = img / 255.0
    return img

def preprocess_image_MA(image_path):
    img = img = get_mask(image_path, target_size=IMAGE_SIZE)
    img = img_to_array(img)
    img = img / 255.0
    return img

def generate_feature_maps(image_path, size=(128, 128)):
    # Apply the specific preprocessing method for each model
    img_hard = preprocess_image_EX(image_path)
    img_soft = preprocess_image_SE(image_path)
    img_haem = preprocess_image_HE(image_path)
    img_micro = preprocess_image_MA(image_path)

    # Add batch dimensions for predictions
    img_hard = np.expand_dims(img_hard, axis=0)
    img_soft = np.expand_dims(img_soft, axis=0)
    img_haem = np.expand_dims(img_haem, axis=0)
    img_micro = np.expand_dims(img_micro, axis=0)

    # Generate masks
    
    mask1 = model_hard_exudates.predict(img_hard, verbose=False)  # Predict mask
    mask1 = (mask1 > 0.1).astype(int)  # Convert to binary
    
    mask2 = model_soft_exudates.predict(img_soft, verbose=False)  # Predict mask
    mask2 = (mask2 > 0.1).astype(int)  # Convert to binary
    
    mask3 = model_haemorrhages.predict(img_haem, verbose=False)  # Predict mask
    mask3 = (mask3 > 0.1).astype(int)  # Convert to binary
    
    mask4 = model_microaneurysms.predict(img_micro, verbose=False)  # Predict mask
    mask4 = (mask4 > 0.1).astype(int)  # Convert to binary
    
    # make hard exudates mask red channel, soft exudates mask green channel, haemorrhages mask blue channel, microaneurysms mask alpha channel
    mask1 = tf.image.resize(mask1, size)
    mask2 = tf.image.resize(mask2, size)
    mask3 = tf.image.resize(mask3, size)
    mask4 = tf.image.resize(mask4, size)

    # Combine masks into a single feature map
    combined = np.concatenate([mask1, mask2, mask3, mask4])  # Shape: (H, W, 4)
    combined = np.transpose(combined, (1, 2, 0, 3))
    combined = np.squeeze(combined)

    return combined


In [None]:
# Prepare dataset
X_features = []
y_labels = []

for index, row in tqdm(test_df.iterrows(), total=len(test_df), desc="Processing rows"):
    img = row['image']
    label = row['label']
    
    image_path = os.path.join(dataset_path, img)
    
    combined_features = generate_feature_maps(image_path, size=(256, 256))
    X_features.append(combined_features)
    y_labels.append(label)

X_features = np.array(X_features)
y = np.array(y_labels)

X_features.shape, y.shape

y_present = (y > 0).astype(int)  # Binary: 0 (no disease), 1 (disease present)
y_grades = np.where(y_present == 1, y, 0)  # Multiclass: 1-4 if disease present, 0 otherwise
y_grades = to_categorical(y_grades, num_classes=5)

In [None]:
predictions = model.predict(X_features)
present_predictions = (predictions[0] > 0.5).astype(int)  # Binary output
grade_predictions = np.argmax(predictions[1], axis=1)  # Multiclass output
true_grades = np.argmax(y_grades, axis=1)

conf_matrix = confusion_matrix(true_grades, grade_predictions)

# Visualize confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=range(5), yticklabels=range(5))
plt.xlabel("Predicted Grades")
plt.ylabel("True Grades")
plt.title("Confusion Matrix")
plt.show()

In [None]:
# Calculate metrics
accuracy = accuracy_score(true_grades, grade_predictions)
precision = precision_score(true_grades, grade_predictions, average='weighted')
recall = recall_score(true_grades, grade_predictions, average='weighted')
classification_rep = classification_report(true_grades, grade_predictions, target_names=[f'Grade {i}' for i in range(5)])

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print("\nClassification Report:")
print(classification_rep)

In [None]:
# Calculate weighted Cohen's kappa
kappa = cohen_kappa_score(true_grades, grade_predictions, weights='linear')
print(f"Cohen's Kappa (weighted): {kappa}")

In [None]:
# Calculate weighted Cohen's kappa
kappa = cohen_kappa_score(true_grades, grade_predictions, weights='quadratic')
print(f"Cohen's Kappa (weighted): {kappa}")

In [None]:
# Predict presence (binary)
binary_predictions = (predictions[0] > 0.5).astype(int)  # Threshold at 0.5

# Calculate metrics
accuracy = accuracy_score(y_present, binary_predictions)
precision = precision_score(y_present, binary_predictions, average='binary')
recall = recall_score(y_present, binary_predictions, average='binary')
classification_rep = classification_report(y_present, binary_predictions, target_names=['No Disease', 'Disease'])

# Print results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print("\nClassification Report:")
print(classification_rep)

# IDRiD

In [15]:
# Load the CSV file
df_train = pd.read_csv('path/to/IDRiD/IDRiD_Disease Grading_Training Labels.csv')
dataset_path_train = 'path/to/IDRiD/Disease Grading/Original Images/Training Set'

df_test = pd.read_csv('path/to/IDRiD/Disease Grading/Groundtruths/IDRiD_Disease Grading_Testing Labels.csv')
dataset_path_test = 'path/to/IDRiD/Disease Grading/Original Images/Testing Set'

# Define constants
IMAGE_SIZE = (256, 256)  # Example size, adjust to your models
NUM_CLASSES = 5  # Grading levels 0-5

### ResNet

In [16]:
from keras.applications.resnet import preprocess_input as resenet50_preprocess_input

from keras import backend as K
K.set_image_data_format('channels_last')

In [17]:
with open('models_TL/ResNet50_pretrained_enhanced.json', 'r') as json_file:
    model_json = json_file.read()
model = model_from_json(model_json)
model.load_weights('models_TL/ResNet50_pretrained_enhanced.weights.h5')
model.compile(optimizer='adam', loss=losses, metrics=['accuracy'])

In [None]:
def enhance_img(img):
    # Step 1: Apply median filter with a 3x3 kernel
    img = cv2.medianBlur(img.astype(np.uint8), ksize=3)

    # Step 2: Convert to LAB color space
    lab_img = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab_img)

    # Step 3: Apply CLAHE on the Luminosity (L) channel with 8x8 tile grid
    clahe = cv2.createCLAHE(clipLimit=6.0, tileGridSize=(8, 8))
    l = clahe.apply(l)

    # Step 4: Merge CLAHE enhanced L with original A and B channels
    lab_img = cv2.merge((l, a, b))

    # Step 5: Convert back to RGB color space
    enhanced_img = cv2.cvtColor(lab_img, cv2.COLOR_LAB2RGB)
    
    return enhanced_img

# Prepare dataset
X = []
y = []

for index, row in tqdm(df_train.iterrows(), total=len(df_train), desc="Processing rows"):
    image_path = row['Image name'] + '.jpg'
    label = row['Retinopathy grade']
    
    image_path = os.path.join(dataset_path_train, image_path)
    img = load_img(image_path, target_size=(224, 224))
    x = img_to_array(img)
    x = enhance_img(x)
    x = resenet50_preprocess_input(x)
    X.append(x)
    y.append(label)
    
for index, row in tqdm(df_test.iterrows(), total=len(df_test), desc="Processing rows"):
    image_path = row['Image name'] + '.jpg'
    label = row['Retinopathy grade']
    
    image_path = os.path.join(dataset_path_test, image_path)
    img = load_img(image_path, target_size=(224, 224))
    x = img_to_array(img)
    x = enhance_img(x)
    x = resenet50_preprocess_input(x)
    X.append(x)
    y.append(label)
    
X = np.array(X)
y = np.array(y)

y_present = (y > 0).astype(int)  # Binary: 0 (no disease), 1 (disease present)
y_grades = np.where(y_present == 1, y, 0)  # Multiclass: 1-4 if disease present, 0 otherwise
y_grades = to_categorical(y_grades, num_classes=5)

In [None]:
X.shape, y.shape

In [None]:
predictions = model.predict(X)
present_predictions = (predictions[0] > 0.5).astype(int)  # Binary output
grade_predictions = np.argmax(predictions[1], axis=1)  # Multiclass output
true_grades = np.argmax(y_grades, axis=1)

conf_matrix = confusion_matrix(true_grades, grade_predictions)

# Visualize confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=range(5), yticklabels=range(5))
plt.xlabel("Predicted Grades")
plt.ylabel("True Grades")
plt.title("Confusion Matrix")
plt.show()

In [None]:
# Calculate metrics
accuracy = accuracy_score(true_grades, grade_predictions)
precision = precision_score(true_grades, grade_predictions, average='weighted')
recall = recall_score(true_grades, grade_predictions, average='weighted')
classification_rep = classification_report(true_grades, grade_predictions, target_names=[f'Grade {i}' for i in range(5)])

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print("\nClassification Report:")
print(classification_rep)

In [None]:
# Calculate weighted Cohen's kappa
kappa = cohen_kappa_score(true_grades, grade_predictions, weights='linear')
print(f"Cohen's Kappa (weighted): {kappa}")

In [None]:
# Calculate weighted Cohen's kappa
kappa = cohen_kappa_score(true_grades, grade_predictions, weights='quadratic')
print(f"Cohen's Kappa (weighted): {kappa}")

In [None]:
# Predict presence (binary)
binary_predictions = (predictions[0] > 0.5).astype(int)  # Threshold at 0.5

# Calculate metrics
accuracy = accuracy_score(y_present, binary_predictions)
precision = precision_score(y_present, binary_predictions, average='binary')
recall = recall_score(y_present, binary_predictions, average='binary')
classification_rep = classification_report(y_present, binary_predictions, target_names=['No Disease', 'Disease'])

# Print results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print("\nClassification Report:")
print(classification_rep)

### Feature based

In [16]:
with open('models_features/CNN_all_features.json', 'r') as json_file:
    model_json = json_file.read()
model = model_from_json(model_json)
model.load_weights('models_features/CNN_all_features.weights.h5')
model.compile(optimizer='adam', loss=losses, metrics=['accuracy'])

In [17]:
# EX
with open('models_segmentation/EX.json', 'r') as json_file:
    model_json = json_file.read()
model_hard_exudates = model_from_json(model_json)
model_hard_exudates.load_weights('models_segmentation/EX.weights.h5')
model_hard_exudates.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# SE
with open('models_segmentation/SE.json', 'r') as json_file:
    model_json = json_file.read()
model_soft_exudates = model_from_json(model_json)
model_soft_exudates.load_weights('models_segmentation/SE.weights.h5')
model_soft_exudates.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# HE
with open('models_segmentation/HE.json', 'r') as json_file:
    model_json = json_file.read()
model_haemorrhages = model_from_json(model_json)
model_haemorrhages.load_weights('models_segmentation/HE.weights.h5')
model_haemorrhages.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# MA
with open('models_segmentation/MA.json', 'r') as json_file:
    model_json = json_file.read()
model_microaneurysms = model_from_json(model_json)
model_microaneurysms.load_weights('models_segmentation/MA.weights.h5')
model_microaneurysms.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [18]:
def enhance_img(img):
    # Step 1: Apply median filter with a 3x3 kernel
    img = cv2.medianBlur(img.astype(np.uint8), ksize=3)

    # Step 2: Convert to LAB color space
    lab_img = cv2.cvtColor(img, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab_img)

    # Step 3: Apply CLAHE on the Luminosity (L) channel with 8x8 tile grid
    clahe = cv2.createCLAHE(clipLimit=6.0, tileGridSize=(8, 8))
    l = clahe.apply(l)

    # Step 4: Merge CLAHE enhanced L with original A and B channels
    lab_img = cv2.merge((l, a, b))

    # Step 5: Convert back to RGB color space
    enhanced_img = cv2.cvtColor(lab_img, cv2.COLOR_LAB2RGB)
    
    return enhanced_img

def get_mask(path, target_size):     
    image = cv2.imread(path)
    image = cv2.resize(image, target_size)
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Convert image to HSV (Hue, Saturation, Value) color space
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # Define the red color range for masking
    lower_red = np.array([0, 100, 50])
    upper_red = np.array([12, 250, 250])
    mask1 = cv2.inRange(hsv_image, lower_red, upper_red)

    # lower_red2 = np.array([170, 120, 70])
    lower_red2 = np.array([10, 60, 70])
    upper_red2 = np.array([180, 255, 255])
    mask2 = cv2.inRange(hsv_image, lower_red2, upper_red2)
    mask2 = cv2.bitwise_not(mask2)

    # Combine masks for red
    red_mask = mask1 + mask2
    
    # overlay the mask on the original image
    red_mask_3ch = cv2.cvtColor(red_mask, cv2.COLOR_GRAY2BGR)
    mask = cv2.addWeighted(image_rgb, 0.7, red_mask_3ch, 0.3, 0)
    
    return mask

# Define function to preprocess images
def preprocess_image_EX(image_path):
    img = load_img(image_path, target_size=IMAGE_SIZE)
    img = img_to_array(img)
    img = img / 255.0
    return img

def preprocess_image_SE(image_path):
    img = load_img(image_path, target_size=IMAGE_SIZE)
    img = img_to_array(img)
    img = img / 255.0
    return img

def preprocess_image_HE(image_path):
    img = load_img(image_path, target_size=IMAGE_SIZE)
    img = img_to_array(img)
    img = enhance_img(img)
    img = img / 255.0
    return img

def preprocess_image_MA(image_path):
    img = img = get_mask(image_path, target_size=IMAGE_SIZE)
    img = img_to_array(img)
    img = img / 255.0
    return img

def generate_feature_maps(image_path, size=(128, 128)):
    # Apply the specific preprocessing method for each model
    img_hard = preprocess_image_EX(image_path)
    img_soft = preprocess_image_SE(image_path)
    img_haem = preprocess_image_HE(image_path)
    img_micro = preprocess_image_MA(image_path)

    # Add batch dimensions for predictions
    img_hard = np.expand_dims(img_hard, axis=0)
    img_soft = np.expand_dims(img_soft, axis=0)
    img_haem = np.expand_dims(img_haem, axis=0)
    img_micro = np.expand_dims(img_micro, axis=0)

    # Generate masks
    
    mask1 = model_hard_exudates.predict(img_hard, verbose=False)  # Predict mask
    mask1 = (mask1 > 0.1).astype(int)  # Convert to binary
    
    mask2 = model_soft_exudates.predict(img_soft, verbose=False)  # Predict mask
    mask2 = (mask2 > 0.1).astype(int)  # Convert to binary
    
    mask3 = model_haemorrhages.predict(img_haem, verbose=False)  # Predict mask
    mask3 = (mask3 > 0.1).astype(int)  # Convert to binary
    
    mask4 = model_microaneurysms.predict(img_micro, verbose=False)  # Predict mask
    mask4 = (mask4 > 0.1).astype(int)  # Convert to binary
    
    # make hard exudates mask red channel, soft exudates mask green channel, haemorrhages mask blue channel, microaneurysms mask alpha channel
    mask1 = tf.image.resize(mask1, size)
    mask2 = tf.image.resize(mask2, size)
    mask3 = tf.image.resize(mask3, size)
    mask4 = tf.image.resize(mask4, size)

    # Combine masks into a single feature map
    combined = np.concatenate([mask1, mask2, mask3, mask4])  # Shape: (H, W, 4)
    combined = np.transpose(combined, (1, 2, 0, 3))
    combined = np.squeeze(combined)

    return combined

In [None]:
# Prepare dataset
X_features = []
y_labels = []

for index, row in tqdm(df_train.iterrows(), total=len(df_train), desc="Processing rows"):
    image_path = row['Image name'] + '.jpg'
    label = row['Retinopathy grade']
    
    image_path = os.path.join(dataset_path_train, image_path)
    combined_features = generate_feature_maps(image_path, size=(256, 256))
    X_features.append(combined_features)
    y_labels.append(label)


for index, row in tqdm(df_test.iterrows(), total=len(df_test), desc="Processing rows"):
    image_path = row['Image name'] + '.jpg'
    label = row['Retinopathy grade']
    
    image_path = os.path.join(dataset_path_train, image_path)
    combined_features = generate_feature_maps(image_path, size=(256, 256))
    X_features.append(combined_features)
    y_labels.append(label)

X_features = np.array(X_features)
y_labels = np.array(y_labels)

y_present = (y_labels > 0).astype(int)  # Binary: 0 (no disease), 1 (disease present)
y_grades = np.where(y_present == 1, y_labels, 0)  # Multiclass: 1-4 if disease present, 0 otherwise
y_grades = to_categorical(y_grades, num_classes=5)

In [None]:
predictions = model.predict(X_features)
present_predictions = (predictions[0] > 0.5).astype(int)  # Binary output
grade_predictions = np.argmax(predictions[1], axis=1)  # Multiclass output
true_grades = np.argmax(y_grades, axis=1)

conf_matrix = confusion_matrix(true_grades, grade_predictions)

# Visualize confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=range(5), yticklabels=range(5))
plt.xlabel("Predicted Grades")
plt.ylabel("True Grades")
plt.title("Confusion Matrix")
plt.show()

In [None]:
# Calculate metrics
accuracy = accuracy_score(true_grades, grade_predictions)
precision = precision_score(true_grades, grade_predictions, average='weighted')
recall = recall_score(true_grades, grade_predictions, average='weighted')
classification_rep = classification_report(true_grades, grade_predictions, target_names=[f'Grade {i}' for i in range(5)])

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print("\nClassification Report:")
print(classification_rep)

In [None]:
# Calculate weighted Cohen's kappa
kappa = cohen_kappa_score(true_grades, grade_predictions, weights='linear')
print(f"Cohen's Kappa (weighted): {kappa}")

In [None]:
# Calculate weighted Cohen's kappa
kappa = cohen_kappa_score(true_grades, grade_predictions, weights='quadratic')
print(f"Cohen's Kappa (weighted): {kappa}")

In [None]:
# Predict presence (binary)
binary_predictions = (predictions[0] > 0.5).astype(int)  # Threshold at 0.5

# Calculate metrics
accuracy = accuracy_score(y_present, binary_predictions)
precision = precision_score(y_present, binary_predictions, average='binary')
recall = recall_score(y_present, binary_predictions, average='binary')
classification_rep = classification_report(y_present, binary_predictions, target_names=['No Disease', 'Disease'])

# Print results
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print("\nClassification Report:")
print(classification_rep)