In [None]:
from tensorflow import lite
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import pandas as pd
import random, os
import shutil
import matplotlib.pyplot as plt
from matplotlib.image import imread
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.metrics import categorical_accuracy
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, Flatten,Dropout
import PIL
from tensorflow.keras.optimizers import Adam

In [None]:
from google.colab import files
uploaded = files.upload()

In [None]:
# Add an additional column, mapping to the type
df = pd.read_csv('a. IDRiD_Disease Grading_Training Labels.csv')
df.head()

In [None]:
df.drop(df.iloc[:,2:12],axis=1,inplace=True)

In [None]:
df.head()

In [None]:

diagnosis_dict_binary = {
    0: 'No_DR',
    1: 'DR',
    2: 'DR',
    3: 'DR',
    4: 'DR'
}

diagnosis_dict = {
    0: 'No_DR',
    1: 'Mild',
    2: 'Moderate',
    3: 'Severe',
    4: 'Proliferate_DR',
}


df['binary_type'] =  df['Retinopathy grade'].map(diagnosis_dict_binary.get)
df['type'] = df['Retinopathy grade'].map(diagnosis_dict.get)
df.head()

In [None]:
df.isna().sum()

In [None]:
df.describe()

In [None]:
mld=len(df[df['type']=='Mild'])
sevr=len(df[df['type']=='Severe'])
pdr=len(df[df['type']=='Proliferate_DR'])
mdr=len(df[df['type']=='Moderate'])
ndr=len(df[df['type']=='No_DR'])
print("Retinopathy Grade:-   \t\tCount")
print("  -- Mild:- \t\t\t",mld)
print("  -- Severe:- \t\t\t",sevr)
print("  -- Proliferate_DR:- \t\t",pdr)
print("  -- Moderate:- \t\t",mdr)
print("  -- Non Glaucomatic(No_DR):- \t",ndr)

Exploratory Data Analysis

In [None]:
colors = ( "orange", "cyan",
          "grey", "indigo", "beige")

df['type'].value_counts().plot(kind='pie',
                               autopct='%1.1f%%',
                               explode = (0.1, 0.0, 0.2, 0.3, 0.2),
                               shadow = True,
                               colors = colors,
                               startangle = 90)

In [None]:
dr=mld+sevr+pdr+mdr
nodr=ndr
print("DR vs NO_DR:-   \t\tCount")
print("  -- DR:- \t\t\t",dr)
print("  -- NO_DR:- \t\t\t",nodr)

In [None]:
df['binary_type'].value_counts().plot(kind='barh',color=('orange','grey'))
val=[]
val.append(dr)
val.append(nodr)
for index,value in enumerate(val):
    plt.text(value, index,str(value))
plt.show()

In [None]:
train_intermediate, val = train_test_split(df, test_size = 0.25, stratify = df['type'])
train, test = train_test_split(train_intermediate, test_size = 0.25 / (1 - 0.25), stratify = train_intermediate['type'])

print(train['type'].value_counts(), '\n')
print(test['type'].value_counts(), '\n')
print(val['type'].value_counts(), '\n')
# print(train.head())

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd /content/drive/My Drive/gaussian_filtered_images/

In [None]:
# !mkdir No_DR
# !mkdir Mild
# !mkdir Moderate
# !mkdir Severe
# !mkdir Proliferate_DR

In [None]:
!pwd

In [None]:
import os
print(os.getcwd())

In [None]:
base_dir = os.getcwd()
print(base_dir)

In [None]:
# Create working directories for train/val/test

train_dir = os.path.join(base_dir, 'train')
val_dir = os.path.join(base_dir, 'val')
test_dir = os.path.join(base_dir, 'test')


if os.path.exists(train_dir):
    shutil.rmtree(train_dir)
os.makedirs(train_dir)

if os.path.exists(val_dir):
    shutil.rmtree(val_dir)
os.makedirs(val_dir)

if os.path.exists(test_dir):
    shutil.rmtree(test_dir)
os.makedirs(test_dir)

In [None]:

from google.colab import drive
drive.mount('/content/drive')

In [None]:
print(os.getcwd())

In [None]:
df

In [None]:
train.head()

In [None]:
src_dir = r'./'
for index, row in train.iterrows():
    diagnosis = row['type']
    binary_diagnosis = row['binary_type']
    id_code = row['Image name'] + ".jpg"
    srcfile = os.path.join(src_dir, diagnosis, id_code)
    dstfile = os.path.join(train_dir, binary_diagnosis)
    os.makedirs(dstfile, exist_ok = True)
    shutil.copy(srcfile, dstfile)

for index, row in val.iterrows():
    diagnosis = row['type']
    binary_diagnosis = row['binary_type']
    id_code = row['Image name'] + ".jpg"
    srcfile = os.path.join(src_dir, diagnosis, id_code)
    dstfile = os.path.join(val_dir, binary_diagnosis)
    os.makedirs(dstfile, exist_ok = True)
    shutil.copy(srcfile, dstfile)

for index, row in test.iterrows():
    diagnosis = row['type']
    binary_diagnosis = row['binary_type']
    id_code = row['Image name'] + ".jpg"
    srcfile = os.path.join(src_dir, diagnosis, id_code)
    dstfile = os.path.join(test_dir, binary_diagnosis)
    os.makedirs(dstfile, exist_ok = True)
    shutil.copy(srcfile, dstfile)

In [None]:
train_path = 'train'
val_path = 'val'
test_path = 'test'

train_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(train_path, target_size=(224,224),batch_size=3, shuffle = True)
val_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(val_path, target_size=(224,224),batch_size=2, shuffle = True)
test_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(test_path, target_size=(224,224), shuffle = False)

In [None]:
# This function will plot images in the form of a grid with 1 row and 5 columns where images are placed in each column.
def plotImages(images_arr):
    fig, axes = plt.subplots(1, 3, figsize=(15,3))
    axes = axes.flatten()
    for img, ax in zip( images_arr, axes):
        ax.imshow(img)
        ax.axis('off')
    plt.tight_layout()
    plt.show()
plotImages(sample_training_images[:3])

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, Flatten,Dropout
import PIL
from tensorflow.keras.optimizers import Adam

In [None]:
#CNN

model = tf.keras.Sequential([
    layers.Conv2D(64, (4,4), padding="valid", input_shape=(224,224,3), activation = 'relu'),
    layers.MaxPooling2D(pool_size=(2,2)),
    layers.BatchNormalization(),

    layers.Conv2D(32, (3,3), padding="valid", activation = 'relu'),
    layers.MaxPooling2D(pool_size=(2,2)),
    layers.BatchNormalization(),

    layers.Conv2D(16, (3,3), padding="valid", activation = 'relu'),
    layers.MaxPooling2D(pool_size=(2,2)),
    layers.BatchNormalization(),

    layers.Flatten(),
    layers.Dense(32, activation = 'relu'),
    layers.Dropout(0.3),
    layers.Dense(2, activation = 'softmax')
])

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate = 1e-5),
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=['acc'])

history = model.fit(train_batches,
                    epochs=50,
                    validation_data=val_batches)

In [None]:
model.summary()

In [None]:
print("Plotting accuracy versus epoch")
plt.plot(history.history['acc'], label='accuracy')
plt.plot(history.history['val_acc'], label = 'val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim([0.5, 1])
plt.legend(loc='lower right')
print("The model is being evaluated")
test_loss, test_acc = model.evaluate(test_batches, verbose=2)
print("The accuracy of the model is:")
print(test_acc)

# RESNET-**50**

> Indented block

---





In [None]:
#ResNet50

# resnet_model = Sequential()

# pretrained_model = tf.keras.applications.ResNet50( include_top=False,
#     weights='imagenet',
#     # input_tensor=None,
#     input_shape=(224,224,3),
#     pooling='max',
#     classes=2
# )

# for layer in pretrained_model.layers[-15:]:
#   layer.trainable=False


# resnet_model.add(pretrained_model)
# resnet_model.add(Dropout(0.3))
# resnet_model.add(Flatten())
# resnet_model.add(Dense(64,activation='relu'))
# resnet_model.add(Dense(2,activation='softmax'))


In [None]:
# resnet_model.summary()

In [None]:
# resnet_model.compile(optimizer=Adam(learning_rate=0.0001), loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# history = resnet_model.fit(train_batches,
#                     epochs=50,
#                     validation_data=val_batches
#                     )

In [None]:
# print("Plotting accuracy versus epoch")
# plt.plot(history.history['accuracy'], label='accuracy')
# plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.ylim([0.5, 1])
# plt.legend(loc='lower right')
# print("The model is being evaluated")
# test_loss, test_acc = resnet_model.evaluate(val_batches, verbose=2)
# print("The accuracy of the model is:")
# print(test_acc)

# **VGG16**

In [None]:
# train_path = 'train'
# val_path = 'val'
# test_path = 'test'

# train_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(train_path, target_size=(224,224),batch_size=2, shuffle = True)
# val_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(val_path, target_size=(224,224),batch_size=2, shuffle = True)
# test_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(test_path, target_size=(224,224), shuffle = False)

In [None]:
#  VGG16

# from tensorflow.python.ops.gen_math_ops import Max
# from tensorflow.keras.applications.vgg16 import VGG16
# from keras.layers import Conv2D, MaxPooling2D


# model = Sequential()

# VGG_model=tf.keras.applications.VGG16(
#     include_top=False,
#     weights="imagenet",
#     input_shape=(224,224,3),
#     pooling='max',
# )

# for layer in VGG_model.layers[:-15]:
#    layer.trainable=False

# model.add(VGG_model)
# model.add(Dropout(0.4))
# model.add(Dense(64,activation='relu'))
# model.add(Dense(32,activation='relu'))
# model.add(Dense(2,activation='softmax'))
# model.summary()

In [None]:
# model.compile(optimizer=Adam(learning_rate=0.00001), loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# history = model.fit(train_batches,
#                     epochs=30,
#                     validation_data=val_batches
#                     )

In [None]:
# print("Plotting accuracy versus epoch")
# plt.plot(history.history['accuracy'], label='accuracy')
# plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.ylim([0, 2])
# plt.legend(loc='lower right')
# print("The model is being evaluated")
# test_loss, test_acc = model.evaluate(test_batches, verbose=2)
# print("The accuracy of the model is:")
# print(test_acc)

# CNN+VGG16 (Hybrid Model)

In [None]:
# train_path = 'train'
# val_path = 'val'
# test_path = 'test'

# train_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(train_path, target_size=(580, 580),batch_size=3, shuffle = True)
# val_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(val_path, target_size=(580, 580),batch_size=2, shuffle = True)
# test_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(test_path, target_size=(580, 580), shuffle = False)

In [None]:
# #CNN
# #VGG16
# from tensorflow.python.ops.gen_math_ops import Max
# from tensorflow.keras.applications.vgg16 import VGG16
# from keras.layers import Conv2D, MaxPooling2D


# model = Sequential()
# model.add(Conv2D(32, (4, 4), input_shape = (580, 580,3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# model.add(Conv2D(16, (3, 3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# model.add(Conv2D(3, (3, 3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# VGG_model=tf.keras.applications.VGG16(
#     include_top=False,
#     weights="imagenet",
#     input_shape=( 70, 70, 3),
#     pooling='max',
# )

# for layer in VGG_model.layers[:-15]:
#    layer.trainable=False

# model.add(VGG_model)
# model.add(Dropout(0.4))
# model.add(Flatten())
# model.add(Dense(64,activation='relu'))
# model.add(Dense(32,activation='relu'))
# model.add(Dense(2,activation='softmax'))
# model.summary()



In [None]:
# model.compile(optimizer=Adam(learning_rate=0.00001), loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# history = model.fit(train_batches,
#                     # epochs=30,
#                     validation_data=val_batches
#                     )

In [None]:
# model.evaluate()

In [None]:
# print("Plotting accuracy versus epoch")
# plt.plot(history.history['accuracy'], label='accuracy')
# plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.ylim([0, 2])
# plt.legend(loc='lower right')
# print("The model is being evaluated")
# test_loss, test_acc = model.evaluate(test_batches, verbose=2)
# print("The accuracy of the model is:")
# print(test_acc)

In [None]:
# from tensorflow.python.ops.gen_math_ops import Max
# from keras.layers import Conv2D, MaxPooling2D


# CNN+ResNet50 (Hybrid **Model**)

In [None]:
#CNN

# model.add(Conv2D(64, (4, 4), input_shape = (559,559,3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# model.add(Conv2D(32, (4, 4), input_shape = (274, 274,3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# model.add(Conv2D(16, (3, 3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# model.add(Conv2D(3, (3, 3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))

#VGG16


# model = Sequential()


# resnet_model = Sequential()
# pretrained_model= tf.keras.applications.ResNet50(include_top=False,
#                    input_shape=(224, 224, 3),
#                    pooling='max',classes=2,
#                    weights="imagenet")
# for layer in pretrained_model.layers[-15:]:
#         layer.trainable=False

# resnet_model.add(pretrained_model)

# # resnet_model.summary()


# # Merge
# model.add(resnet_model)
# model.add(Dropout(0.4))
# model.add(Flatten())
# model.add(Dense(64,activation='relu'))
# model.add(Dense(32,activation='relu'))
# model.add(Dense(2,activation='softmax'))
# model.summary()


In [None]:
# train_path = 'train'
# val_path = 'val'
# test_path = 'test'

# train_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(train_path, target_size=(224,224),batch_size=3, shuffle = True)
# val_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(val_path, target_size=(224,224),batch_size=2, shuffle = True)
# test_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(test_path, target_size=(224,224), shuffle = False)

In [None]:
# model = Sequential()
# model.add(Conv2D(16, (4, 4), input_shape = (224,224,3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# model.add(Conv2D(3, (4, 4), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# vg_model=Sequential()
# VGG_model=tf.keras.applications.VGG16(
#     include_top=False,
#     weights="imagenet",
#     input_shape=( 53, 53, 3),
#     pooling='max',
# )
# for layer in VGG_model.layers[:-15]:
#    layer.trainable=False
# from tensorflow.keras import Model
# vg_model.add(VGG_model)
# vg_model.add(Dropout(0.4))
# vg_model.add(Dense(units = 512, activation = 'relu'))
# vg_model.add(Dense(units = 128, activation = 'relu'))
# vg_model.add(Dense(units = 2,activation='softmax'))
# out2 = vg_model(model.outputs)
# m3 = Model(model.inputs,out2)
# m3.summary()

# #cnn+vgg

In [None]:
# m3.compile(optimizer=Adam(learning_rate=0.00001), loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# history = m3.fit(train_batches,
#                     epochs=30,
#                     validation_data=val_batches
#                     )

In [None]:
# print("Plotting accuracy versus epoch")
# plt.plot(history.history['accuracy'], label='accuracy')
# plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.ylim([0, 2])
# plt.legend(loc='lower right')
# print("The model is being evaluated")
# test_loss, test_acc = m3.evaluate(test_batches, verbose=2)
# print("The accuracy of the model is:")
# print(test_acc)

In [None]:
# train_path = 'train'
# val_path = 'val'
# test_path = 'test'

# train_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(train_path, target_size=(224,224),batch_size=3, shuffle = True)
# val_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(val_path, target_size=(224,224),batch_size=2, shuffle = True)
# test_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(test_path, target_size=(224,224), shuffle = False)

In [None]:
# from keras.layers import TimeDistributed

In [None]:
# model = Sequential()
# model.add(Conv2D(16, (4, 4), input_shape = (224,224,3), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# model.add(Conv2D(3, (4, 4), activation = 'relu'))
# model.add(MaxPooling2D(pool_size = (2, 2)))
# model.add(Flatten())
# # model.outputs[0].shape
# lstm=Sequential()
# lstm.add(TimeDistributed(model,))
# lstm.add(LSTM(1024))
# lstm.add(Dense(128))
# lstm.add(Dense(2, activation='softmax'))

# lstm.build(224,224,1)
# lstm.summary()

In [None]:
# X=model.outputs[0].shape
# X.reshape()

## RESNET-**VGG**

In [None]:
# train_path = 'train'
# val_path = 'val'
# test_path = 'test'

# train_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(train_path, target_size=(224,224),batch_size=3, shuffle = True)
# val_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(val_path, target_size=(224,224),batch_size=2, shuffle = True)
# test_batches = ImageDataGenerator(rescale = 1./255).flow_from_directory(test_path, target_size=(224,224), shuffle = False)

In [None]:

# vg_model=Sequential()
# VGG_model=tf.keras.applications.VGG16(
#     include_top=False,
#     weights="imagenet",
#     input_shape=( 900,900, 3),
#     pooling='max',
# )
# for layer in VGG_model.layers[:-15]:
#    layer.trainable=False
# from tensorflow.keras import Model
# vg_model.add(VGG_model)
# # m3 = Model(model.inputs,out2)
# # m3.summary()
# vg_model.summary()

In [None]:

# # resnet_model = Sequential()
# pretrained_model= tf.keras.applications.ResNet50(include_top=False,
#                    input_shape=(900, 900, 3),
#                    pooling='max',classes=2,
#                    weights="imagenet")
# for layer in pretrained_model.layers[-10:]:
#         layer.trainable=False

# # resnet_model.add(pretrained_model)
# pretrained_model.summary()

In [None]:
# inputs_2 = keras.Input(shape=(224, 224, 3), name="img")
# vgg = tf.keras.applications.VGG16(input_tensor=inputs_2, weights='imagenet', include_top=False) #VGG16(input_shape=IMAGE_SIZE + [3], weights='imagenet', include_top=False)
# for layer in vgg.layers:
#     layer.trainable = False
# #x = Flatten()(vgg.output)

# resnet = tf.keras.applications.ResNet50(input_tensor=inputs_2, weights='imagenet', include_top=False) #ResNet50(input_shape=IMAGE_SIZE + [3], weights='imagenet', include_top=False)
# for layer in resnet.layers:
#     layer.trainable = False
# #y = Flatten()(resnet.output)

# #concatenated_output= layers.add([x, y])
# mergedOutput =  tf.keras.layers.Concatenate()([vgg.output, resnet.output])
# x = layers.Dense(256, activation="relu")(mergedOutput)
# prediction = Dense(3, activation='softmax')(x)
# model = Model(inputs=inputs_2, outputs=prediction)


# model.compile(loss="categorical_crossentropy",optimizer='adam',metrics=['accuracy'])
# # model.summary()

In [None]:
# history = model.fit(train_batches,
#                     epochs=30,
#                     validation_data=val_batches
#                     )