<a href="https://colab.research.google.com/github/laraselinseyahi/Diabetic-Retinopathy-Classification-using-Deep-Learning/blob/main/CS230_MultiClassClassifier_LaraSelinSeyahi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Multiclass Classification with ResNet-50 for Diabetic Retinopathy

In [None]:
# Mount Google Drive inside the Colab VM so files stored there can be read.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

In [None]:
%cd drive/MyDrive/


In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications import InceptionV3
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.optimizers import Adam
import numpy as np
import os
from sklearn.model_selection import train_test_split

In [None]:
# Root folder of the dataset (relative to the current working directory)
data_dir = 'diabetic_retinopathy'

# Class names; the position of a name in this list is its integer label
classes = ["healthy", "mild_dr", "moderate_dr", "proliferate_dr", "severe_dr"]

# Name -> label lookup, i.e.
# {"healthy": 0, "mild_dr": 1, "moderate_dr": 2, "proliferate_dr": 3, "severe_dr": 4}
class_to_idx = dict(zip(classes, range(len(classes))))



In [None]:
# Collect file paths and labels
# Each image path is stored next to its integer label in two parallel lists, so
# tf.data can pair every image with its label and the splits can be stratified.
file_paths = []
labels = []

for cls in classes:
    class_dir = os.path.join(data_dir, cls)
    # os.listdir returns entries in arbitrary order; sort them so the seeded
    # splits below are reproducible across machines and runs.
    for file in sorted(os.listdir(class_dir)):
        path = os.path.join(class_dir, file)
        # Skip sub-directories (e.g. notebook checkpoint folders): they are not images.
        if not os.path.isfile(path):
            continue
        file_paths.append(path)
        labels.append(class_to_idx[cls])

# Split the dataset into train, validation, and test sets.
# First cut: 70% train, 30% held out (stratified so class ratios are preserved).
train_paths, temp_paths, train_labels, temp_labels = train_test_split(
    file_paths, labels, test_size=0.3, stratify=labels, random_state=42
)
# Second cut: the held-out 30% is split 67/33 -> roughly 20% validation, 10% test
val_paths, test_paths, val_labels, test_labels = train_test_split(
    temp_paths, temp_labels, test_size=0.33, stratify=temp_labels, random_state=42
)

# Sanity check: the three splits must partition the collected files
assert len(train_paths) + len(val_paths) + len(test_paths) == len(file_paths)
print(f"Train: {len(train_paths)}, Validation: {len(val_paths)}, Test: {len(test_paths)}")

# Define image size and batch size
IMG_SIZE = (224, 224) # ResNet-50 input size
BATCH_SIZE = 32

# Labels that receive augmentation: mild_dr (1), proliferate_dr (3), severe_dr (4).
# healthy (0) and moderate_dr (2) are never augmented.
AUGMENTED_LABELS = (1, 3, 4)


def _augment_image(image):
    """Randomly flip, jitter brightness/contrast and crop-and-resize one image.

    Args:
        image: float image of shape IMG_SIZE + (3,) with pixels on the 0-255
            scale (as produced by tf.image.resize on a decoded image).

    Returns:
        The augmented float image, resized back to IMG_SIZE.
    """
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    # Pixels are on the 0-255 scale here, so the delta has to be scaled as well:
    # a raw max_delta of 0.2 would shift pixels by at most 0.2 out of 255.
    image = tf.image.random_brightness(image, max_delta=0.2 * 255.0)
    image = tf.image.random_contrast(image, lower=0.8, upper=1.2)
    # Brightness/contrast jitter can push values outside the valid pixel range.
    image = tf.clip_by_value(image, 0.0, 255.0)
    crop_size = (int(IMG_SIZE[0] * 0.9), int(IMG_SIZE[1] * 0.9), 3)
    image = tf.image.random_crop(image, size=crop_size)
    # Resize back to the model input size after cropping
    return tf.image.resize(image, IMG_SIZE)


# Function to preprocess images
def preprocess_image(file_path, label, augment=True):
    """Load an image file and convert it into a ResNet-50 input tensor.

    Args:
        file_path: path of the image file to read.
        label: integer class label of the image.
        augment: when True (default), images whose label is in
            AUGMENTED_LABELS are randomly augmented. Pass False for
            validation/test data so evaluation runs on unmodified images.

    Returns:
        Tuple (image, label); image is a float tensor of shape IMG_SIZE + (3,).
    """
    raw_bytes = tf.io.read_file(file_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, IMG_SIZE)
    image = resized
    if augment:
        # The label is a tensor inside dataset.map, so the per-class decision
        # is expressed with tf.cond; both branches return the same shape/dtype.
        needs_augmentation = tf.reduce_any(tf.equal(label, AUGMENTED_LABELS))
        image = tf.cond(
            needs_augmentation,
            lambda: _augment_image(resized),
            lambda: resized,
        )
    # ResNet-50 preprocessing: RGB -> BGR and per-channel ImageNet mean
    # subtraction (no rescaling to [-1, 1]).
    image = tf.keras.applications.resnet50.preprocess_input(image)
    return image, label


# Create a TensorFlow dataset
def create_dataset(file_paths, labels, shuffle=True, augment=True):
    """Build a batched, prefetched tf.data pipeline of (image, label) pairs.

    Args:
        file_paths: list of image file paths.
        labels: list of integer labels, parallel to file_paths.
        shuffle: shuffle the examples (use True for training only).
        augment: forward to preprocess_image; set False for validation/test.

    Returns:
        A tf.data.Dataset yielding batches of BATCH_SIZE preprocessed images
        and their labels.
    """
    # pair file paths and labels
    dataset = tf.data.Dataset.from_tensor_slices((file_paths, labels))
    if shuffle:
        # Shuffle the lightweight (path, label) pairs before decoding so the
        # shuffle buffer holds strings rather than every decoded image.
        dataset = dataset.shuffle(buffer_size=max(len(file_paths), 1))
    # map applies preprocess_image to each (path, label) pair
    dataset = dataset.map(
        lambda path, lbl: preprocess_image(path, lbl, augment=augment),
        num_parallel_calls=tf.data.AUTOTUNE,
    )
    return dataset.batch(BATCH_SIZE).prefetch(buffer_size=tf.data.AUTOTUNE)


# Create train, validation, and test datasets.
# Only the training set is shuffled and augmented; validation and test images
# must stay unmodified so the reported metrics are deterministic.
train_dataset = create_dataset(train_paths, train_labels)
val_dataset = create_dataset(val_paths, val_labels, shuffle=False, augment=False)
test_dataset = create_dataset(test_paths, test_labels, shuffle=False, augment=False)



In [None]:
# ResNet-50 experiment 1: frozen backbone + single softmax layer

# ImageNet-pretrained convolutional backbone, without its original classifier
base_model = ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3),
)

# Freeze every backbone layer so only the new classification head is trained
for layer in base_model.layers:
    layer.trainable = False

# Stack the new head on top of the backbone
model = tf.keras.Sequential()
model.add(base_model)
model.add(GlobalAveragePooling2D())        # collapse the spatial dimensions
model.add(Dense(5, activation='softmax'))  # one probability per class (5 classes)

# Labels are integer class ids (not one-hot vectors), hence the sparse
# categorical cross-entropy loss for this 5-class problem.
base_learning_rate = 0.001
model.compile(
    optimizer=Adam(learning_rate=base_learning_rate),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Print model summary
model.summary()

# Train the model
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10,  # upper bound; early stopping may end training sooner
    callbacks=[
        # stop once validation performance has not improved for 3 epochs
        # and roll back to the best weights seen
        EarlyStopping(patience=3, restore_best_weights=True),
        # keep only the best-performing model on disk
        ModelCheckpoint('resnet50_best_model.keras', save_best_only=True),
    ],
)

# Evaluate the model on the held-out test set
test_loss, test_acc = model.evaluate(test_dataset)
print(f"Test Accuracy: {test_acc:.2f}")


In [None]:
# ResNet-50 experiment 2: fine-tune only the last layers of the backbone

base_model = ResNet50(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3)
)

# Keras layers are trainable by default, so the earlier layers must be frozen
# explicitly; otherwise "unfreezing" the last layers changes nothing and the
# whole backbone is trained.
NUM_FINE_TUNED_LAYERS = 10
for layer in base_model.layers[:-NUM_FINE_TUNED_LAYERS]:
    layer.trainable = False
for layer in base_model.layers[-NUM_FINE_TUNED_LAYERS:]:
    # Keep BatchNormalization layers frozen so they keep using their ImageNet
    # statistics while the surrounding layers are fine-tuned.
    layer.trainable = not isinstance(layer, tf.keras.layers.BatchNormalization)

# Add a new top layer for our 5-class classification
model = tf.keras.Sequential([
    base_model,
    tf.keras.layers.GlobalAveragePooling2D(), # pooling to reduce output size
    tf.keras.layers.Dense(5, activation='softmax')  # 5 classes
])

base_learning_rate = 0.001
# Labels are integer class ids (not one-hot), so the multi-class loss is
# sparse categorical cross-entropy.
model.compile(optimizer=Adam(learning_rate=base_learning_rate),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Print model summary (the trainable parameter count should now cover only
# the unfrozen backbone layers plus the new head)
model.summary()

# Train the model
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10,  # upper bound; early stopping may end training sooner
    callbacks=[
        # stop when validation performance has not improved for 3 epochs
        tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True),
        # saves the best performing model seen during training
        tf.keras.callbacks.ModelCheckpoint('resnet50_best_model.keras', save_best_only=True)
    ]
)

# Evaluate the model on the test set
test_loss, test_acc = model.evaluate(test_dataset)
print(f"Test Accuracy: {test_acc:.2f}")




In [None]:
# ResNet-50 experiment 3: frozen backbone + dense head with dropout

base_model = ResNet50(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3)
)

# Freeze the backbone that is actually used in the model below
# (the loop previously iterated over an undefined `resnet_model`).
for layer in base_model.layers:
    layer.trainable = False

# Add a new classification head for our 5-class problem
model = tf.keras.Sequential([
    base_model,
    tf.keras.layers.GlobalAveragePooling2D(), # pooling to reduce output size
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.5), # regularization
    tf.keras.layers.Dense(5, activation='softmax')  # 5 classes
])

base_learning_rate = 0.001
# Labels are integer class ids (not one-hot), so the multi-class loss is
# sparse categorical cross-entropy.
model.compile(optimizer=Adam(learning_rate=base_learning_rate),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Print model summary
model.summary()

# Train the model
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10,  # upper bound; early stopping may end training sooner
    callbacks=[
        tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True),
        # Early stopping for stopping training when validation performance stops improving
        tf.keras.callbacks.ModelCheckpoint('resnet50_best_model.keras', save_best_only=True)
        # saves best performing model during training
    ]
)

# Evaluate the model on the test set
test_loss, test_acc = model.evaluate(test_dataset)
print(f"Test Accuracy: {test_acc:.2f}")