# 1.Import Dependencies

In [15]:
import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.applications.densenet import DenseNet201
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Concatenate, Conv2D, ReLU, Dropout, MaxPool2D, BatchNormalization
import tensorflow.keras.layers as tfl
from tensorflow.keras.utils import image_dataset_from_directory
import os
import numpy as np
import tensorflow_addons as tfa
from tensorflow import keras
import sklearn
from sklearn.metrics import classification_report
import tensorflow.keras.backend as kb
import seaborn as sns

# 2.Load Dataset

In [16]:
def process(image,label):
    """Rescale pixel values by 1/255 and cast the result to float32.

    Args:
        image: Image tensor (or batch of images) yielded by the dataset.
        label: Label tensor belonging to ``image``; returned unchanged.

    Returns:
        Tuple ``(scaled_image, label)``.
    """
    scaled = image/255.
    return tf.cast(scaled, tf.float32), label

In [17]:
# Data-loading configuration shared by all three splits.
BATCH_SIZE = 8
IMG_SIZE = (224, 224)
train_directory = "../input/retinal-fundus-images/Retinal Fundus Images/train"
test_directory = "../input/retinal-fundus-images/Retinal Fundus Images/test"
val_directory = "../input/retinal-fundus-images/Retinal Fundus Images/val"


def _load_split(directory):
    """Read one image folder as a shuffled, batched, one-hot-labelled dataset.

    The images are resized to IMG_SIZE by the loader and then rescaled by
    `process`.
    """
    raw_split = image_dataset_from_directory(directory,
                                             shuffle=True,
                                             batch_size=BATCH_SIZE,
                                             image_size=IMG_SIZE,
                                             label_mode='categorical',
                                             seed=42)
    return raw_split.map(process)


train_dataset = _load_split(train_directory)

# NOTE(review): `validation_dataset` is read from the "test" folder and
# `test_dataset` from the "val" folder -- confirm this pairing is intentional.
validation_dataset = _load_split(test_directory)

test_dataset = _load_split(val_directory)

# 3.Training!

In [30]:
def block_a(inputs):
  """Multi-branch block that keeps the spatial size of `inputs`.

  Three convolutional branches (1x1 conv -> kxk conv -> batch norm, with
  k = 3, 5, 7) and one stride-1 max-pool branch are all fed from `inputs`
  and joined with `Concatenate()`.

  Args:
    inputs: Keras tensor to apply the block to.

  Returns:
    The concatenated output tensor.
  """
  conv_opts = dict(padding='same', kernel_initializer='he_normal')
  # (filters of the 1x1 conv, filters of the wide conv, wide kernel size)
  branch_specs = (
      (128, 128, (3, 3)),
      (64, 64, (5, 5)),
      (128, 32, (7, 7)),
  )

  branches = []
  for reduce_filters, out_filters, kernel in branch_specs:
    branch = Conv2D(reduce_filters, (1, 1), **conv_opts)(inputs)
    branch = Conv2D(out_filters, kernel, **conv_opts)(branch)
    branches.append(BatchNormalization(axis=-1)(branch))

  # Pooling branch: stride 1 with 'same' padding, so it can be concatenated.
  branches.append(MaxPool2D(strides=1, padding='same')(inputs))

  return Concatenate()(branches)

In [31]:
def block_b(inputs):
  """Multi-branch block followed by a max-pool.

  Three branches (1x1 conv -> kxk conv -> batch norm, with k = 3, 5, 7 and
  64, 16, 8 filters respectively) are concatenated and then passed through
  `MaxPool2D(padding='valid')`.

  Args:
    inputs: Keras tensor to apply the block to.

  Returns:
    The pooled output tensor.
  """
  def conv_branch(filters, kernel):
    # Both convolutions of a branch use the same number of filters.
    reduced = Conv2D(filters, (1, 1), padding='same', kernel_initializer='he_normal')(inputs)
    widened = Conv2D(filters, kernel, padding='same', kernel_initializer='he_normal')(reduced)
    return BatchNormalization(axis=-1)(widened)

  branches = [
      conv_branch(64, (3, 3)),
      conv_branch(16, (5, 5)),
      conv_branch(8, (7, 7)),
  ]
  merged = Concatenate()(branches)

  return MaxPool2D(padding='valid')(merged)

In [32]:
def block_c(inputs):
  """Multi-branch block that keeps the spatial size of `inputs`.

  Branches, all fed from `inputs` and joined with `Concatenate()`:
    * 1x1 conv (128 filters) -> batch norm
    * 1x1 conv (128) -> 3x3 conv (64) -> batch norm
    * 1x1 conv (128) -> 5x5 conv (32) -> batch norm
    * stride-1 max-pool with 'same' padding

  Args:
    inputs: Keras tensor to apply the block to.

  Returns:
    The concatenated output tensor.
  """
  conv_opts = {'padding': 'same', 'kernel_initializer': 'he_normal'}

  # Point-wise branch: a single 1x1 convolution.
  pointwise = Conv2D(128, (1, 1), **conv_opts)(inputs)
  towers = [BatchNormalization(axis=-1)(pointwise)]

  # Wide-kernel branches share the same 128-filter 1x1 reduction.
  for out_filters, kernel in ((64, (3, 3)), (32, (5, 5))):
    tower = Conv2D(128, (1, 1), **conv_opts)(inputs)
    tower = Conv2D(out_filters, kernel, **conv_opts)(tower)
    towers.append(BatchNormalization(axis=-1)(tower))

  towers.append(MaxPool2D(strides=1, padding='same')(inputs))

  return Concatenate()(towers)

In [33]:
def block_d(inputs):
  """Multi-branch block followed by a max-pool.

  Branches, all fed from `inputs`:
    * 1x1 conv (64 filters) -> batch norm
    * 1x1 conv (32) -> 3x3 conv (32) -> batch norm
    * 1x1 conv (16) -> 5x5 conv (16) -> batch norm
  The branches are concatenated and passed through
  `MaxPool2D(padding='valid')`.

  Each branch is normalised exactly once. Previously the second and third
  BatchNormalization calls were applied to `x1` again, leaving `x2` and
  `x3` un-normalised.

  Args:
    inputs: Keras tensor to apply the block to.

  Returns:
    The pooled output tensor.
  """
  x1 = Conv2D(64, (1, 1), padding='same', kernel_initializer='he_normal')(inputs)
  x1 = BatchNormalization(axis=-1)(x1)

  x2 = Conv2D(32, (1, 1), padding='same', kernel_initializer='he_normal')(inputs)
  x2 = Conv2D(32, (3, 3), padding='same', kernel_initializer='he_normal')(x2)
  x2 = BatchNormalization(axis=-1)(x2)

  x3 = Conv2D(16, (1, 1), padding='same', kernel_initializer='he_normal')(inputs)
  x3 = Conv2D(16, (5, 5), padding='same', kernel_initializer='he_normal')(x3)
  x3 = BatchNormalization(axis=-1)(x3)

  x = Concatenate()([x1, x2, x3])

  x = MaxPool2D(padding='valid')(x)

  return x

In [38]:
def classification(pre_trained_model_name : str = None, input_shape: tuple =(224, 224, 3), scratch = False, num_classes: int = 11):
  """Build the classifier, optionally on top of an ImageNet backbone.

  Args:
    pre_trained_model_name: One of 'inception', 'resnet', 'vgg', 'densenet',
      or None. With a name, the matching Keras application (ImageNet
      weights, no top) is followed by block_c and block_d. With None, the
      network is the stack a-b-c-d-a-b-c-d-c-d of the custom blocks.
    input_shape: Shape of one input image.
    scratch: When falsy, every layer of the backbone is frozen. When truthy,
      the backbone stays trainable (it is still created with ImageNet
      weights). Ignored when no backbone is used.
    num_classes: Number of units of the softmax output layer.

  Returns:
    An uncompiled `tf.keras.Model`.

  Raises:
    ValueError: If `pre_trained_model_name` is not None and not one of the
      supported names (previously this surfaced as an UnboundLocalError).
  """
  backbones = {
      'inception': InceptionV3,
      'resnet': ResNet50,
      'vgg': VGG19,
      'densenet': DenseNet201,
  }

  pre_trained_model = None
  if pre_trained_model_name is not None:
    if pre_trained_model_name not in backbones:
      raise ValueError(
          f"Unknown pre_trained_model_name {pre_trained_model_name!r}; "
          f"expected one of {sorted(backbones)} or None")
    pre_trained_model = backbones[pre_trained_model_name](
        input_shape=input_shape, include_top=False, weights='imagenet')

    if not scratch:
      for layer in pre_trained_model.layers:
        layer.trainable = False

  inputs = tf.keras.Input(shape=input_shape, name='Input')

  if pre_trained_model is not None:
    x = pre_trained_model(inputs)
    x = block_c(x)
    x = block_d(x)
  else:
    # Same order as before: a, b, c, d, a, b, c, d, c, d.
    x = inputs
    for block in (block_a, block_b, block_c, block_d,
                  block_a, block_b, block_c, block_d,
                  block_c, block_d):
      x = block(x)

  x = tfl.Flatten(name='Flatten')(x)
  x = tfl.Dense(num_classes, activation='softmax', name='Output')(x)
  model = tf.keras.Model(inputs=inputs, outputs=x, name=f'Transfer_Learning_by_{pre_trained_model_name}')

  return model

In [42]:
# No backbone name is given, so `classification` builds the network from the
# custom blocks only (its `else` branch).
model = classification(pre_trained_model_name=None)
model.summary()

In [43]:
# Optimiser / checkpoint configuration for the first training phase.
base_learning_rate = 0.01
checkpoint_filepath = '/kaggle/working/tmp/models/a_b_a_b_b_c_d_c_d_d_11D'

# `monitor` only has an effect together with `save_best_only=True`; without it
# the checkpoint is simply overwritten after every epoch. F1 is a
# higher-is-better metric, so the direction is given explicitly via `mode`.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=False,
    monitor='val_F1 Score',
    mode='max',
    save_best_only=True)

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=['accuracy',
                       tf.keras.metrics.Precision(name='Precision'),
                       tf.keras.metrics.Recall(name="Recall"),
                       tfa.metrics.F1Score(11, average='macro', threshold=0.5, name='F1 Score')])

In [None]:
# `callbacks` is documented as a list of callbacks, so wrap the single callback.
history = model.fit(train_dataset, validation_data=validation_dataset, epochs=15, callbacks=[model_checkpoint_callback])

# 4.Fine Tuning

In [None]:
# Mark every top-level layer of the model as trainable before fine-tuning.
# The model has to be compiled again afterwards (done in the next cell).
for layer in model.layers:
    layer.trainable = True
model.summary()

In [None]:
# Optimiser / checkpoint configuration for the fine-tuning phase.
base_learning_rate = 0.00001
# NOTE(review): this folder name ('vgg_c_d_11D') differs from the one used by
# the first training phase and by the zip cell -- confirm it is the intended
# target for this model.
checkpoint_filepath = '/kaggle/working/tmp/models/vgg_c_d_11D'

# `monitor` only has an effect together with `save_best_only=True`; F1 is a
# higher-is-better metric, so the direction is given explicitly via `mode`.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=False,
    monitor='val_F1 Score',
    mode='max',
    save_best_only=True)

# `learning_rate` replaces the deprecated `lr` keyword.
# The precision/recall metrics are named explicitly: unnamed metrics get
# auto-generated names that depend on how many metric objects were created
# earlier in the kernel session, while the evaluation cell reads the history
# keys 'precision_1' / 'recall_1'.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
              loss=tf.keras.losses.CategoricalCrossentropy(),
              metrics=['accuracy',
                       tf.keras.metrics.Precision(name='precision_1'),
                       tf.keras.metrics.Recall(name='recall_1'),
                       tfa.metrics.F1Score(11, average='macro', threshold=0.5, name='F1 Score')])

In [None]:
# `callbacks` is documented as a list of callbacks, so wrap the single callback.
history1 = model.fit(train_dataset, validation_data=validation_dataset, epochs=10, callbacks=[model_checkpoint_callback])

# 5.Evaluation

In [None]:
def _history_key(history_dict, stem):
    """Return the key of `history_dict` that holds the training series of `stem`.

    Matching ignores case and an optional numeric suffix ('precision',
    'Precision' and 'precision_1' all match the stem 'precision'), because
    auto-generated Keras metric names depend on how many metric objects were
    created earlier in the kernel session.

    Raises:
        KeyError: If no key matches.
    """
    wanted = stem.lower()
    for key in history_dict:
        if key.lower().rstrip('0123456789').rstrip('_') == wanted:
            return key
    raise KeyError(f"no '{stem}' metric in history; available keys: {sorted(history_dict)}")


fine_tune_history = history1.history
precision_key = _history_key(fine_tune_history, 'precision')
recall_key = _history_key(fine_tune_history, 'recall')

# The accuracy curves get a leading 0 so that they start at the origin; this
# makes them one point longer than the other series.
acc = [0.] + fine_tune_history['accuracy']
val_acc = [0.] + fine_tune_history['val_accuracy']

loss = fine_tune_history['loss']
val_loss = fine_tune_history['val_loss']

f1 = fine_tune_history['F1 Score']
f1_val = fine_tune_history['val_F1 Score']

precision = fine_tune_history[precision_key]
precision_val = fine_tune_history['val_' + precision_key]

recall = fine_tune_history[recall_key]
recall_val = fine_tune_history['val_' + recall_key]

# (metric name, y-axis label, training series, validation series, legend position)
panels = [
    ('Accuracy', 'Accuracy', acc, val_acc, 'lower right'),
    ('Loss', 'Cross Entropy Loss', loss, val_loss, 'upper right'),
    ('Precision', 'Precision', precision, precision_val, 'lower right'),
    ('Recall', 'Recall', recall, recall_val, 'lower right'),
    ('F1 Score', 'F1 Score', f1, f1_val, 'lower right'),
]

fig, axes = plt.subplots(len(panels), 1, figsize=(10, 20))
for ax, (metric_name, y_label, train_series, val_series, legend_loc) in zip(axes, panels):
    ax.plot(train_series, label=f'Training {metric_name}', linewidth=3)
    ax.plot(val_series, label=f'Validation {metric_name}', linewidth=3)
    ax.legend(loc=legend_loc)
    ax.set_ylabel(y_label)
    ax.set_title(f'Training and Validation {metric_name}')
axes[-1].set_xlabel('epoch')

# Save before showing so the file is written regardless of how the active
# backend treats the figure after `show()`.
fig.savefig("plt_a_b_a_b_b_c_d_c_d_d_11D.jpg", format='jpg', dpi=600)
plt.show()

In [None]:
# NOTE(review): `columns` must be in the same order as the class indices
# produced by the dataset loader -- confirm against the dataset's class names.
columns = ['Dry AMD', 'Glaucoma', "Normal Fundus", "Wet AMD", "Mild DR", "Moderate DR", "Severe DR", "Proliferate DR", "Cataract", "Hypertensive Retinopathy", "Pathological Myopia"]

# Labels and predictions are collected in the same pass over the dataset, so
# they stay aligned even though the dataset is shuffled. The model is called
# once per batch instead of once per image.
batch_labels = []
batch_preds = []
for images, labels in test_dataset:
    batch_labels.append(np.argmax(np.asarray(labels), axis=1))  # one-hot -> class index
    batch_preds.append(np.argmax(model.predict(images, verbose=0), axis=1))
test_labels = np.concatenate(batch_labels)
test_preds = np.concatenate(batch_preds)

# Confusion matrix: rows = true class, columns = predicted class. Its size and
# the number of counted samples follow the data instead of hard-coded values.
cm = np.zeros((len(columns), len(columns)), dtype=int)
np.add.at(cm, (test_labels, test_preds), 1)

plt.figure(figsize=(10, 10))
cf_matrix = pd.DataFrame(cm, columns=columns, index=columns)
# Integer counts with fmt='d' avoid scientific notation in the annotations.
sns.heatmap(cf_matrix, annot=True, fmt='d', cmap="Blues")
plt.savefig("cm_a_b_a_b_b_c_d_c_d_d_11D.jpg", format='jpg', dpi=600)

In [None]:
# Archive a saved-model folder (path relative to the working directory) for download.
# NOTE(review): this folder name matches the checkpoint path of the first
# training phase; the fine-tuning phase writes to 'vgg_c_d_11D' -- confirm
# which model is meant to be archived.
!zip -r a_b_a_b_b_c_d_c_d_d_11D.zip "./tmp/models/a_b_a_b_b_c_d_c_d_d_11D"

In [None]:
from pathlib import Path

# Per-class report for the single model, written to CSV with one row per
# report entry.
cr = sklearn.metrics.classification_report(
    test_labels,
    test_preds,
    target_names=columns,
    output_dict=True,
)
filepath = Path('cr_a_b_a_b_b_c_d_c_d_d_11D.csv')
report_table = pd.DataFrame(cr).T
report_table.to_csv(filepath)

# 6.Majority Voting

In [None]:
# NOTE(review): `columns` must be in the same order as the class indices
# produced by the dataset loader -- confirm against the dataset's class names.
columns = ['Dry AMD', 'Glaucoma', "Normal Fundus", "Wet AMD", "Mild DR", "Moderate DR", "Severe DR", "Proliferate DR", "Cataract", "Hypertensive Retinopathy", "Pathological Myopia"]
num_classes = len(columns)

# Saved models that take part in the vote (order defines the member index).
pathes = ["/content/drive/MyDrive/Colab Notebooks/densenet_c_d_11D/tmp/models/dense_c_d_11D", 
         "/content/drive/MyDrive/Colab Notebooks/resnet_c_d_11D/tmp/models/resnet_c_d_11D",
         "/content/drive/MyDrive/Colab Notebooks/inception_c_d_11D/tmp/models/inception_c_d_11D",
         "/content/drive/MyDrive/Colab Notebooks/vgg_c_d_11D/tmp/models/vgg_c_d_11D",
          "/content/drive/MyDrive/Colab Notebooks/xception/xception_c_d_11D"]

models = [tf.keras.models.load_model(path) for path in pathes]
n = len(pathes)

# Labels and all member predictions are collected in the same pass over the
# dataset so they stay aligned even though the dataset is shuffled.
# The loop variable is deliberately not called `model`, so the trained
# `model` from the earlier cells is not overwritten.
batch_labels = []
batch_member_preds = []
for images, labels in test_dataset:
    batch_labels.append(np.argmax(np.asarray(labels), axis=1))  # one-hot -> class index
    per_member = [np.argmax(member.predict(images, verbose=0), axis=1) for member in models]
    batch_member_preds.append(np.stack(per_member, axis=1))  # (batch, n)

test_labels = np.concatenate(batch_labels)
preds_per_sample = np.concatenate(batch_member_preds, axis=0)  # (num_samples, n)
# Flat, sample-major layout (all member predictions of sample 0, then sample
# 1, ...), which is what the following cells reshape.
test_preds = preds_per_sample.reshape(-1)

# Majority vote per sample over exactly its own n predictions. The previous
# slice `[j*n:(j + 1)*n + 1]` also pulled in the first prediction of the next
# sample. Ties resolve to the lowest class index (np.argmax).
vote = np.array(
    [np.argmax(np.bincount(row, minlength=num_classes)) for row in preds_per_sample],
    dtype='int32')

# Confusion matrix: rows = true class, columns = voted class.
cm = np.zeros((num_classes, num_classes), dtype=int)
np.add.at(cm, (test_labels, vote), 1)

plt.figure(figsize=(10, 10))
cf_matrix = pd.DataFrame(cm, columns=columns, index=columns)
# Integer counts with fmt='d' avoid scientific notation in the annotations.
sns.heatmap(cf_matrix, annot=True, fmt='d', cmap="Blues")
plt.savefig("cm_Ensemble.jpg", format='jpg', dpi=600)

In [None]:
from pathlib import Path

# Per-class report for the majority vote, written to CSV with one row per
# report entry.
cr = sklearn.metrics.classification_report(
    test_labels,
    vote,
    target_names=columns,
    output_dict=True,
)
filepath = Path('cr_ensemble.csv')
report_table = pd.DataFrame(cr).T
report_table.to_csv(filepath)

In [None]:
# One-vs-rest indicator arrays (1.0 where the value equals the class, else 0.0).
# Sizes are derived from the data instead of the hard-coded 11 / 433 / 5.
num_classes = len(columns)
num_samples = len(test_labels)
class_ids = np.arange(num_classes)[:, None]

assert np.size(test_preds) == num_samples * n, "expected n predictions per test sample"

# (num_classes, num_samples): indicator of the voted class.
b_votes = (np.asarray(vote)[None, :] == class_ids).astype(float)

# (num_classes, num_samples, n): indicator of each member's prediction.
# `test_preds` is flat with the n member predictions of a sample stored next
# to each other, hence the reshape to (..., num_samples, n).
b_test_preds = (np.asarray(test_preds)[None, :] == class_ids).astype(float)
b_test_preds = b_test_preds.reshape(num_classes, num_samples, n)

# (num_classes, num_samples): indicator of the true class.
b_test_labels = (np.asarray(test_labels)[None, :] == class_ids).astype(float)


In [None]:
# Legend labels, in the same order as the entries of `pathes`
# (densenet, resnet, inception, vgg, ...); 'VGG' and 'Inception' were swapped.
# NOTE(review): the fifth path points at an 'xception' folder while the label
# says "Conv" -- confirm which name is correct.
models_name = ['DenseNet', 'ResNet', 'Inception', 'VGG', "Conv"]

num_classes = b_test_labels.shape[0]
# One axes per class, created up front so every curve of a class is drawn on
# that class's own panel.
fig, axes = plt.subplots(num_classes, 1, figsize=(10, 55), squeeze=False)

for j, ax in enumerate(axes[:, 0]):
  # One curve per ensemble member. The inputs are hard 0/1 predictions, not
  # scores, so each curve has only a few points.
  for i in range(n):
    fpr, tpr, _ = sklearn.metrics.roc_curve(b_test_labels[j, :], b_test_preds[j, :, i])
    ax.plot(fpr, tpr, label=models_name[i])
    ax.scatter(fpr, tpr)

  # Curve of the majority vote.
  fpr, tpr, _ = sklearn.metrics.roc_curve(b_test_labels[j, :], b_votes[j, :])
  ax.plot(fpr, tpr, label="Vote")
  ax.scatter(fpr, tpr)

  # Chance diagonal, drawn once per panel.
  ax.plot([0,1], [0, 1], "--")
  ax.set_xlabel("False Positive Rate")
  ax.set_ylabel("True Positive Rate")
  ax.set_title(columns[j])
  ax.legend(loc='lower right')

fig.tight_layout()
# Save before showing so the file is written regardless of how the active
# backend treats the figure after `show()`.
fig.savefig("roc_a_b_a_b_b_c_d_c_d_d_11D.jpg", format='jpg', dpi=600)
plt.show()