In [1]:
pip install openpyxl

In [2]:
pip install tensorflow==2.4.2

In [5]:
pip install keras==2.4.2

In [6]:
import numpy as np
import pandas as pd
import os
import glob
import matplotlib
import seaborn as sns
import matplotlib.pyplot as plt
IMG_SIZE = 224  # side length (pixels) every image is resized to
BATCH_SIZE = 32  # batch_size passed to model.fit
NO_EPOCHS = 50  # epochs passed to model.fit
NUM_CLASSES = 2  # units in the softmax output layer (one per one-hot label entry)
DATA_FOLDER = "../input/ocular-disease-recognition-odir5k/ODIR-5K/ODIR-5K/Training Images"  # directory the image files are read from
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from tqdm import tqdm
import cv2 as cv
from random import shuffle 
from IPython.display import SVG
from keras.utils.vis_utils import model_to_dot
from tensorflow.keras.utils import plot_model
from tensorflow.keras.applications import ResNet50
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, Flatten, GlobalAveragePooling2D
import tensorflow_addons as tfa
import tensorflow as tf
%matplotlib inline 


## Load tabular data (Excel)

In [8]:
# Hand the path to pandas so it opens and closes the workbook itself;
# wrapping it in a bare open(...) left the file handle unclosed.
data_df = pd.read_excel("../input/ocular-disease-recognition-odir5k/ODIR-5K/ODIR-5K/data.xlsx", sheet_name='Sheet1')

In [9]:
# Positional replacement of the 15 spreadsheet headers with short names.
renamed_columns = [
    "id", 'age', "sex",
    "left_fundus", "right_fundus",
    "left_diagnosys", "right_diagnosys",
    "normal", "diabetes", "glaucoma", "cataract", "amd", "hypertension", "myopia", "other",
]
data_df.columns = renamed_columns

# Train images

Let's check which images have the `hypertension` flag set, and which of them have an eye diagnosis that explicitly states either `hypertension` or `normal`.


In [10]:
# (rows, columns) of the records with the hypertension flag set, then unset.
hypertension_flag = data_df.hypertension
print(data_df.loc[hypertension_flag == 1].shape)
print(data_df.loc[hypertension_flag == 0].shape)

In [11]:
# Left-eye diagnosis texts among records flagged with hypertension.
data_df.loc[data_df.hypertension == 1, 'left_diagnosys'].value_counts()

In [12]:
# Right-eye diagnosis texts among records flagged with hypertension.
data_df.loc[data_df.hypertension == 1, 'right_diagnosys'].value_counts()

In [13]:
def has_hypertension_mentioned(text):
    """Return 1 if `text` contains the substring 'hypertension', otherwise 0.

    Values that do not support the `in` test (e.g. NaN or None coming from an
    empty spreadsheet cell) are treated as "not mentioned" and yield 0 instead
    of raising TypeError.
    """
    # NOTE(review): this is an exact, case-sensitive substring match. Confirm the
    # diagnosis texts really spell "hypertension" (a wording such as
    # "hypertensive ..." would not match) - check the value_counts() output.
    try:
        return int('hypertension' in text)
    except TypeError:
        return 0

In [14]:
# Add one 0/1 column per eye: does that eye's diagnosis text mention hypertension?
for eye_prefix, diagnosis_column in (('le', 'left_diagnosys'), ('re', 'right_diagnosys')):
    data_df[eye_prefix + '_hypertension'] = data_df[diagnosis_column].apply(has_hypertension_mentioned)

In [15]:
flag_set = data_df.hypertension == 1
flag_unset = data_df.hypertension == 0

# Positive class: the record is flagged AND that eye's diagnosis mentions hypertension.
hypertension_le_list = data_df.loc[flag_set & (data_df.le_hypertension == 1), 'left_fundus'].values
hypertension_re_list = data_df.loc[flag_set & (data_df.re_hypertension == 1), 'right_fundus'].values
print(len(hypertension_le_list), len(hypertension_re_list))

# Negative class: unflagged records whose eye diagnosis is exactly "normal fundus";
# a fixed-seed sample of 150 images is drawn per eye.
normal_left_images = data_df.loc[flag_unset & (data_df.left_diagnosys == "normal fundus"), 'left_fundus']
non_hypertension_le_list = normal_left_images.sample(150, random_state=314).values
normal_right_images = data_df.loc[flag_unset & (data_df.right_diagnosys == "normal fundus"), 'right_fundus']
non_hypertension_re_list = normal_right_images.sample(150, random_state=314).values
print(len(non_hypertension_le_list), len(non_hypertension_re_list))

In [16]:
# Pool left- and right-eye file names into one array per class.
hypertension_list = np.concatenate([hypertension_le_list, hypertension_re_list])
non_hypertension_list = np.concatenate([non_hypertension_le_list, non_hypertension_re_list])
print(len(non_hypertension_list), len(hypertension_list))

In [17]:
# Number of entries in the image folder.
folder_entries = os.listdir(DATA_FOLDER)
print(len(folder_entries))

In [18]:
def label_image(label):
    """Convert a binary class flag into its one-hot label.

    Args:
        label: 1 for the positive class, 0 for the negative class.

    Returns:
        [1, 0] when `label` is 1, [0, 1] when `label` is 0.

    Raises:
        ValueError: for any other value (previously this fell through and
            returned None, which only failed later in a confusing place).
    """
    if label == 1:
        return [1, 0]
    if label == 0:
        return [0, 1]
    raise ValueError("label must be 0 or 1, got {!r}".format(label))

def process_data(data_image_list, DATA_FOLDER, is_hypertension):
    """Load, resize and label every image named in `data_image_list`.

    Args:
        data_image_list: iterable of image file names, joined onto DATA_FOLDER.
        DATA_FOLDER: directory containing the image files.
        is_hypertension: class flag handed to `label_image`; every image in
            the list receives the same label.

    Returns:
        A shuffled list of `[image, label]` pairs. `image` is the array
        produced by `cv.resize(..., (IMG_SIZE, IMG_SIZE))` and `label` is the
        one-hot label as a NumPy array.

    Raises:
        FileNotFoundError: if OpenCV cannot read one of the files.
    """
    samples = []
    for file_name in tqdm(data_image_list):
        path = os.path.join(DATA_FOLDER, file_name)
        label = label_image(is_hypertension)
        # Pixels are kept exactly as OpenCV returns them (it loads colour
        # images in BGR channel order).
        image = cv.imread(path, cv.IMREAD_COLOR)
        if image is None:
            # cv.imread reports a missing/unreadable file by returning None;
            # fail here with the path instead of an opaque cv.resize error.
            raise FileNotFoundError("Could not read image: {}".format(path))
        image = cv.resize(image, (IMG_SIZE, IMG_SIZE))
        samples.append([np.array(image), np.array(label)])
    shuffle(samples)
    return samples

In [19]:
cat_df = process_data(hypertension_list, DATA_FOLDER, 1)

In [20]:
cat_no_df = process_data(non_hypertension_list, DATA_FOLDER, 0)

In [21]:
def show_images(data, isTest=False):
    """Display up to the first 25 samples of `data` in a 5x5 grid.

    Args:
        data: sequence of `[image, one_hot_label]` pairs as built by
            `process_data`.
        isTest: when True every title shows "None" instead of the class name.
    """
    class_names = {0: 'hypertension', 1: 'No hypertension'}
    f, ax = plt.subplots(5, 5, figsize=(15, 15))
    preview = data[:25]
    for i, sample in enumerate(preview):
        img_data = np.asarray(sample[0])
        img_num = sample[1]
        label = np.argmax(img_num)
        str_label = class_names.get(int(label), 'Unknown')
        if isTest:
            str_label = "None"
        # The arrays come from cv.imread, i.e. BGR channel order, while
        # matplotlib expects RGB: reverse the channel axis for display only.
        if img_data.ndim == 3 and img_data.shape[2] == 3:
            img_data = img_data[:, :, ::-1]
        ax[i//5, i%5].imshow(img_data)
        ax[i//5, i%5].axis('off')
        ax[i//5, i%5].set_title("Label: {}".format(str_label))
    # Blank out grid cells that received no image (fewer than 25 samples).
    for unused_axis in ax.ravel()[len(preview):]:
        unused_axis.axis('off')
    plt.show()

show_images(cat_df)

In [22]:
# Same preview grid for the samples loaded from non_hypertension_list.
show_images(cat_no_df)

In [23]:
# Combine both classes into one list, mix them in place, and preview the result.
train = [*cat_df, *cat_no_df]
shuffle(train)
show_images(train)

In [24]:
# Split the [image, label] pairs into an image tensor and a label matrix.
train_images = [sample[0] for sample in train]
train_labels = [sample[1] for sample in train]
X = np.array(train_images).reshape(-1, IMG_SIZE, IMG_SIZE, 3)
y = np.array(train_labels)

# Train model

In [25]:
# ResNet50 with ImageNet weights and no classification top (max-pooled output),
# followed by a NUM_CLASSES-unit softmax layer.
model = Sequential()
model.add(ResNet50(include_top=False, pooling='max', weights='imagenet'))
model.add(Dense(NUM_CLASSES, activation='softmax'))
# The pretrained ResNet50 layer is left trainable, so fitting updates its
# weights together with the new Dense layer.
# NOTE(review): if the intent was to freeze the pretrained backbone, this should be False - confirm.
model.layers[0].trainable = True

In [26]:
# NOTE(review): `opt` (LazyAdam) is created here but compile() below is given
# the string 'sgd', so `opt` is not used for training - confirm which optimizer
# is intended.
opt = tfa.optimizers.LazyAdam()
# Categorical cross-entropy with label_smoothing=0.025.
loss = tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.025)
model.compile(optimizer='sgd', loss=loss, metrics=['accuracy'])

In [27]:
# Build with the configured image size (instead of a hard-coded 224) so the
# input shape always matches what the images are resized to.
# Shape is (batch, height, width, channels); None leaves the batch size open.
model.build((None, IMG_SIZE, IMG_SIZE, 3))
model.summary()

In [28]:
# Write the architecture diagram to model.png, then render it inline as SVG.
plot_model(model, to_file='model.png')
model_graph = model_to_dot(model)
SVG(model_graph.create(prog='dot', format='svg'))

In [29]:
# Hold out 20% of the samples for validation, stratified on the labels, with a fixed seed.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)

## Train model

In [30]:
# Fit on the training split and evaluate on the validation split after each epoch;
# `train_model` holds the object returned by fit().
fit_options = dict(
    batch_size=BATCH_SIZE,
    epochs=NO_EPOCHS,
    verbose=1,
    validation_data=(X_val, y_val),
)
train_model = model.fit(X_train, y_train, **fit_options)

## Validation results

In [31]:
def plot_accuracy_and_loss(train_model):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        train_model: object whose `.history` dict provides the 'accuracy',
            'val_accuracy', 'loss' and 'val_loss' series.
    """
    history = train_model.history
    # Read all four series up front so a missing key fails before any drawing.
    series = {key: history[key] for key in ('accuracy', 'val_accuracy', 'loss', 'val_loss')}
    epoch_axis = range(len(series['accuracy']))

    fig, axes = plt.subplots(1, 2, figsize=(14, 6))
    for axis, metric in zip(axes, ('accuracy', 'loss')):
        axis.plot(epoch_axis, series[metric], 'g', label='Training ' + metric)
        axis.plot(epoch_axis, series['val_' + metric], 'r', label='Validation ' + metric)
        axis.set_title('Training and validation ' + metric)
        axis.legend()
    plt.show()
plot_accuracy_and_loss(train_model)

In [32]:
# evaluate() returns the loss first, then the metrics passed to compile().
score = model.evaluate(X_val, y_val, verbose=0)
val_loss_value, val_accuracy_value = score[0], score[1]
print('Validation loss:', val_loss_value)
print('Validation accuracy:', val_accuracy_value)

In [33]:
# Predicted class index for each validation image: argmax over the softmax output.
# (Sequential.predict_classes is deprecated and removed in later TensorFlow
# releases; argmax over predict() is the equivalent for a softmax output layer.)
predicted_classes = np.argmax(model.predict(X_val), axis=-1)
# True class index recovered from the one-hot validation labels.
y_true = np.argmax(y_val, axis=1)

In [34]:
# Positions of the validation samples that were classified correctly / incorrectly.
is_match = predicted_classes == y_true
correct = np.flatnonzero(is_match)
incorrect = np.flatnonzero(~is_match)

In [35]:
# Class index 0 is hypertension and 1 is normal (one-hot [1,0] / [0,1]).
# Passing labels explicitly ties each name to its index and keeps the report
# working even if one class is absent from y_true and the predictions.
target_names = ["hypertension", "Normal"]
print(classification_report(y_true, predicted_classes, labels=[0, 1], target_names=target_names))

In [37]:
# Save the trained model to an HDF5 file in the working directory.
model.save("resnet hypertension.h5")