# Database from
https://www.kaggle.com/datasets/andrewmvd/ocular-disease-recognition-odir5k/data?select=full_df.csv

In [96]:
import tensorflow as tf

from tensorflow.image import resize
from tensorflow.keras.backend import clear_session
from tensorflow import keras
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.model_selection import train_test_split
from keras.metrics import  Recall, CategoricalAccuracy
from IPython.display import clear_output
from tensorflow.keras.models import load_model
from tensorflow.keras import layers, models

from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from numpy import concatenate as concat
from scipy.stats import entropy
import os

from imblearn.under_sampling import RandomUnderSampler

from helpers.help import *
from helpers.helptf import *
from sklearn.utils import resample

from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input

# Getting rethinophaty data

In [97]:
# load dataset
df = pd.read_csv('hr-dataset/full_df.csv')

# get the diagnostic of hypertensive retinopathy
ds_hr = df[df['Left-Diagnostic Keywords'].str.contains('hypertensive retinopathy', na=False)]

# get the diagnostic of diabetic retinopathy
ds_dr = df[df['Left-Diagnostic Keywords'].str.contains('diabetic retinopathy', na=False)]

# get the diagnostic of normal fundus
ds_normal = df[df['Left-Diagnostic Keywords'] == 'normal fundus']

# Specific dataframe
df_hr = ds_hr[['Left-Diagnostic Keywords', 'Left-Fundus']]
df_dr = ds_dr[['Left-Diagnostic Keywords', 'Left-Fundus']]
df_normal = ds_normal[['Left-Diagnostic Keywords', 'Left-Fundus']]




# Droping class
df_hr = df_hr.drop('Left-Diagnostic Keywords', axis=1)
df_dr = df_dr.drop('Left-Diagnostic Keywords', axis=1)
df_normal = df_normal.drop('Left-Diagnostic Keywords', axis=1)



print(df_hr.shape[0])
print(df_dr.shape[0])
print(df_normal.shape[0])

191
85
2796


# Solving the undersampling of HR 

In [98]:
df_hr_downsampled = resample(df_hr, replace=False, n_samples=85, random_state=10)
df_dr_downsampled = resample(df_dr, replace=False, n_samples=85, random_state=10)
df_normal_downsampled = resample(df_normal, replace=False, n_samples=85, random_state=10)

print(df_hr_downsampled.shape[0])
print(df_dr_downsampled.shape[0])
print(df_normal_downsampled.shape[0])


85
85
85


# Class transformation

In [99]:
# Open Diabetic Retinopathy dataset
path = os.path.join(os.getcwd(),'hr-dataset/preprocessed_images')

# 0 - Normal
# 1 - Diabetic Rethinopaty
# 2 - Hipertensive Rethinopaty

# roam Hipertensive rethinopaty
array = []
for index, row in df_hr_downsampled.iterrows():
    detailPath = os.path.join(path,row['Left-Fundus'])
    if(os.path.exists(detailPath)):
        array.append([detailPath,2])


# roam Diabeic rethinopaty
for index, row in df_dr_downsampled.iterrows():
    detailPath = os.path.join(path,row['Left-Fundus'])
    if(os.path.exists(detailPath)):
        array.append([detailPath,1])

# roam no rethinopaty
for index, row in df_dr_downsampled.iterrows():
    detailPath = os.path.join(path,row['Left-Fundus'])
    if(os.path.exists(detailPath)):
        array.append([detailPath,0])


    
# transforms the array into nparray
dataset=np.array(array)

np.size(dataset,0)

247

# Get the data ready

In [100]:
X,y=dataset[::,0],dataset[::,1]
y = y.astype(int)

#One hot encode the labels
y = to_categorical(y)

#Shuffle the dataset (to make a unbiased model)
p = np.random.permutation(len(X))
X,y = X[p], y[p]

#Strip off 20% samples for hold out test set
test_idxs = np.random.choice(len(X), size=int(0.2*len(X)), replace=False, p=None)
x_test, y_test = X[test_idxs],y[test_idxs]

#Delete the test set samples from X,y 
X = np.delete(X, test_idxs)
y = np.delete(y, test_idxs, axis = 0)

#usual train-val split. We use 20% here just match the test set size to validation set.
x_train, x_val, y_train, y_val = train_test_split(X, y, test_size=0.20, random_state=42)

In [101]:
print(f"Samples in Training set: {x_train.shape[0]}")
print(f"Samples in Validation set: {x_val.shape[0]}")
print(f"Samples in Test set: {x_test.shape[0]}")

Samples in Training set: 158
Samples in Validation set: 40
Samples in Test set: 49


In [102]:
# Check if imbalance
for i in [y_train, y_test, y_val]:
    print(np.unique(i, return_counts = True, axis = 0))

(array([[0., 0., 1.],
       [0., 1., 0.],
       [1., 0., 0.]]), array([60, 50, 48]))
(array([[0., 0., 1.],
       [0., 1., 0.],
       [1., 0., 0.]]), array([13, 19, 17]))
(array([[0., 0., 1.],
       [0., 1., 0.],
       [1., 0., 0.]]), array([12, 12, 16]))


# Prepares Data for the model

In [103]:
val_dataset=build_dataset(x_val,y_val,repeat=False,batch=256)
test_dataset=build_dataset(x_test,y_test,repeat=False,batch=256)

BATCH_SIZE=8
STEPS_PER_EPOCH=len(x_train)/BATCH_SIZE

train_dataset=build_dataset(x_train,y_train,batch=BATCH_SIZE)

# input shape for the model
input_shape=train_dataset.element_spec[0].shape[1:]


print(train_dataset)
print(val_dataset)
print(test_dataset)

input_shape=train_dataset.element_spec[0].shape[1:]
print(input_shape)

<_BatchDataset element_spec=(TensorSpec(shape=(None, 64, 64, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 3), dtype=tf.float64, name=None))>
<_BatchDataset element_spec=(TensorSpec(shape=(None, 64, 64, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 3), dtype=tf.float64, name=None))>
<_BatchDataset element_spec=(TensorSpec(shape=(None, 64, 64, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 3), dtype=tf.float64, name=None))>
(64, 64, 3)


# Load model

In [104]:
dr_model = load_model('model/model_baseline.keras')

"""
print(dr_model.layers)

# Define the input tensor explicitly
input_tensor = Input(shape=input_shape)  # Match the original input shape

# Create the feature extractor
feature_extractor = Model(inputs=input_tensor, outputs=dr_model.layers[-7].output)


for layer in feature_extractor.layers:
    layer.trainable = False  # Freeze feature extractor

print(input_shape)
print(dr_model.layers)
"""
dr_model.summary()

# Add the new layer

In [105]:
# Add new dense layers for your specific classification task

"""
transfer_model = Sequential([
    feature_extractor,               # Use the feature extractor
    Dense(256, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),
    Dense(3, activation='softmax')  # classes: {"no_rethinopathy", "dr","hr"}
])"""


# classes: {"no_rethinopathy", "dr","hr"}
transfer_model = prep_translearn(model=dr_model, top_layers_to_cut=7, out_dim=3, learning_rate=0.001) 

After layer 0 (conv2d_3), shape: (None, 64, 64, 32)
After layer 1 (batch_normalization_4), shape: (None, 64, 64, 32)
After layer 2 (max_pooling2d_3), shape: (None, 32, 32, 32)
After layer 3 (dropout_4), shape: (None, 32, 32, 32)
After layer 4 (conv2d_4), shape: (None, 30, 30, 64)
After layer 5 (batch_normalization_5), shape: (None, 30, 30, 64)
After layer 6 (max_pooling2d_4), shape: (None, 15, 15, 64)
After layer 7 (dropout_5), shape: (None, 15, 15, 64)
After layer 8 (conv2d_5), shape: (None, 13, 13, 128)
After layer 9 (batch_normalization_6), shape: (None, 13, 13, 128)


# Prepare transfer model

In [106]:
transfer_model.compile(
        loss = "categorical_crossentropy",
        optimizer = Adam(),
        metrics=[CategoricalAccuracy()]
    )

transfer_model.summary()

# Train model

In [107]:
# saves the model with the lowest validation Loss
checkpoint=ModelCheckpoint(filepath='model/model_transferlearning.keras',
                           monitor='val_loss',save_best_only=True,verbose=1)

# logs the training progress to a CSV
csv_logger=keras.callbacks.CSVLogger('logger/trainlog_transferlearning.csv',
                                     separator=',',append=False)

# defines a early stop if in 10 epoches the validation loss dont improve
early_stopper=keras.callbacks.EarlyStopping(monitor='val_loss',
                                            min_delta=0.001,
                                            restore_best_weights=True,
                                            patience=10)

callbacks_list=[checkpoint,early_stopper,csv_logger]

In [108]:
EPOCHS = 20
STEPS_PER_EPOCH=len(x_train)/BATCH_SIZE

STEPS_PER_EPOCH = 20
transfer_model.fit(train_dataset,steps_per_epoch=int(STEPS_PER_EPOCH),epochs=EPOCHS,
          validation_data=val_dataset,validation_steps=None,
          callbacks=callbacks_list)

Epoch 1/20
[1m19/20[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 32ms/step - categorical_accuracy: 0.3179 - loss: 7.8550
Epoch 1: val_loss improved from inf to 115.03207, saving model to model/model_transferlearning.keras
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 47ms/step - categorical_accuracy: 0.3233 - loss: 7.7407 - val_categorical_accuracy: 0.4000 - val_loss: 115.0321
Epoch 2/20
[1m19/20[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 35ms/step - categorical_accuracy: 0.4851 - loss: 2.4189
Epoch 2: val_loss improved from 115.03207 to 34.77339, saving model to model/model_transferlearning.keras
[1m20/20[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 39ms/step - categorical_accuracy: 0.4853 - loss: 2.3694 - val_categorical_accuracy: 0.4500 - val_loss: 34.7734
Epoch 3/20
[1m19/20[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 32ms/step - categorical_accuracy: 0.5654 - loss: 1.3408
Epoch 3: val_loss improved from 34.77339 to 6.987

<keras.src.callbacks.history.History at 0x160f4e6c0>

# Evaluation

In [None]:
# load the best model, trained before
model = keras.models.load_model("model/model_transferlearning.keras")
print("-" * 100)

# evaluates with the test_dataset
print(model.evaluate(test_dataset, verbose=0,return_dict=True))

----------------------------------------------------------------------------------------------------
{'categorical_accuracy': 0.16326530277729034, 'loss': 1.8221027851104736}
<_BatchDataset element_spec=(TensorSpec(shape=(None, 64, 64, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 3), dtype=tf.float64, name=None))>
