In [None]:
# Import modules
import cv2
import pickle
import os
from tqdm import tqdm
import numpy as np
import csv
import random
from sklearn.model_selection import train_test_split
from google.colab import drive

# Import Keras modules and its important APIs
import keras
from keras.layers import Dense, Conv2D, BatchNormalization, Activation
from keras.layers import AveragePooling2D, Input, Flatten
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras.callbacks import ReduceLROnPlateau
from keras.preprocessing.image import ImageDataGenerator
from keras.regularizers import l2
from keras import backend as K
from keras.models import Model
from keras.datasets import cifar10

In [None]:
# mount google drive
drive.mount("/content/gdrive", force_remount=True)

LOAD NON AUGMENTED DATA

In [None]:
# Read img_name and label from csv file then upload image with img_name
line_count = 0
total_images = 0
path = '/content/path_to_images'
X = []
y = []

with open('/content/filenames_and_labels.csv', newline='') as csvfile:
  csvfile = csv.reader(csvfile, delimiter=',', quotechar='|')
  for row in csvfile:
    if line_count == 0:
      line_count += 1
    else:
      img_name = row[1]
      if os.path.isfile(os.path.join(path, img_name)):
        img = cv2.imread(os.path.join(path, img_name))
        X.append(img)
        y.append(row[2])
        total_images += 1
      else:
        pass
      line_count += 1
    if line_count % 100 == 0:
      print(line_count)
  print(f'Processed {line_count} lines from CSV file.')
  print(f'Imported {total_images} images into input X.')

=> Implement data augmentation techniques of your choosing based on the nature of the images and the number of extra images required.

IMPORT AUGMENTED DATA

In [None]:
X = pickle.load( open( "/content/augmented_X.p", "rb" ) )
y = pickle.load( open( "/content/augmented_y.p", "rb" ) )

In [None]:
# Train - test split
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=25)

# Convert lists to np arrays
x_train = np.array(x_train)
x_test = np.array(x_test)

# Convert y_train and y_test data from string to int ('0.0' to 0)
y_train = np.array(y_train, dtype="float")
y_train = np.array(y_train, dtype="int")
y_test = np.array(y_test, dtype="float")
y_test = np.array(y_test, dtype="int")

HYPER-PARAMETERS

In [None]:
batch_size = 32
epochs = 85
num_classes = 20

# Data Preprocessing
subtract_pixel_mean = True
n = 3

# Select ResNet Version
version = 2

# Computed depth of ResNet model
if version == 1:
    depth = n * 6 + 2
elif version == 2:
    depth = n * 9 + 2

# Model name, depth and version
model_type = 'ResNet % dv % d' % (depth, version)

# Input image dimensions.
input_shape = x_train.shape[1:]

# Normalize data.
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255

# If subtract pixel mean is enabled
if subtract_pixel_mean:
    x_train_mean = np.mean(x_train, axis = 0)
    x_train -= x_train_mean
    x_test -= x_train_mean

# Print Training and Test Samples
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')
print('y_train shape:', y_train.shape)

# Convert class vectors to binary class matrices.
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

In [None]:
# Setting learning rate for different number of Epochs
def lr_schedule(epoch):
    lr = 1e-3
    if epoch > 180:
        lr *= 0.5e-3
    elif epoch > 160:
        lr *= 1e-3
    elif epoch > 120:
        lr *= 1e-2
    elif epoch > 80:
        lr *= 1e-1
    print('Learning rate: ', lr)
    return lr

In [None]:
# Basic ResNet Building Block
def resnet_layer(inputs,
                 num_filters=16,
                 kernel_size=3,
                 strides=1,
                 activation='relu',
                 batch_normalization=True,
                 conv_first=False):
    conv=Conv2D(num_filters,
                  kernel_size=kernel_size,
                  strides=strides,
                  padding='same',
                  kernel_initializer='he_normal',
                  kernel_regularizer=l2(1e-4))

    x=inputs
    if conv_first:
        x = conv(x)
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation is not None:
            x = Activation(activation)(x)
    else:
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation is not None:
            x = Activation(activation)(x)
        x = conv(x)
    return x

In [None]:
# ResNet V2 architecture
def resnet_v2(input_shape, depth, num_classes=20):
    if (depth - 2) % 9 != 0:
        raise ValueError('depth should be 9n + 2 (eg 56 or 110 in [b])')
    # Start model definition.
    num_filters_in = 16 #8 # 16
    num_res_blocks = int((depth - 2) / 9)

    inputs = Input(shape=input_shape)
    # V2 performs Conv2D with BN-ReLU on input before splitting into 2 paths
    x = resnet_layer(inputs=inputs,
                     num_filters=num_filters_in,
                     conv_first=True)

    # Instantiate the stack of residual units
    for stage in range(3):
        for res_block in range(num_res_blocks):
            activation = 'relu'
            batch_normalization = True
            strides = 1
            if stage == 0:
                num_filters_out = num_filters_in * 4
                if res_block == 0:  # first layer and first stage
                    activation = None
                    batch_normalization = False
            else:
                num_filters_out = num_filters_in * 2
                if res_block == 0:  # first layer but not first stage
                    strides = 2    # downsample

            # bottleneck residual unit
            y = resnet_layer(inputs=x,
                             num_filters=num_filters_in,
                             kernel_size=1,
                             strides=strides,
                             activation=activation,
                             batch_normalization=batch_normalization,
                             conv_first=False)
            y = resnet_layer(inputs=y,
                             num_filters=num_filters_in,
                             conv_first=False)
            y = resnet_layer(inputs=y,
                             num_filters=num_filters_out,
                             kernel_size=1,
                             conv_first=False)
            if res_block == 0:
                # linear projection residual shortcut connection to match changed dims
                x = resnet_layer(inputs=x,
                                 num_filters=num_filters_out,
                                 kernel_size=1,
                                 strides=strides,
                                 activation=None,
                                 batch_normalization=False)
            x = keras.layers.add([x, y])

        num_filters_in = num_filters_out

    # Add classifier on top.
    # v2 has BN-ReLU before Pooling
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = AveragePooling2D(pool_size=8)(x)
    y = Flatten()(x)
    outputs = Dense(num_classes,
                    activation='softmax',
                    kernel_initializer='he_normal')(y)
    # outputs = idx_max(outputs)
    print("outputs are ", outputs)

    # Instantiate model.
    model = Model(inputs=inputs, outputs=outputs)
    return model

In [None]:
# Main function
if version == 2:
    model = resnet_v2(input_shape = input_shape, depth = depth)

model.compile(loss ='categorical_crossentropy',
              optimizer = Adam(learning_rate = lr_schedule(0)),
              metrics =['accuracy', tf.keras.metrics.Recall(), tf.keras.metrics.Precision(),
                        tf.keras.metrics.FalseNegatives(),
                        tf.keras.metrics.TrueNegatives(),
                        tf.keras.metrics.FalsePositives(),
                        tf.keras.metrics.TruePositives()])

model.summary()
print(model_type)

# Prepare model saving directory.
save_dir = os.path.join(os.getcwd(), 'saved_models')
model_name = 'eye_classifier_% s_model.{epoch:03d}.h5' % model_type
if not os.path.isdir(save_dir):
    os.makedirs(save_dir)
filepath = os.path.join(save_dir, model_name)

# Prepare callbacks for model saving and for learning rate adjustment.
checkpoint = ModelCheckpoint(filepath = filepath,
                             monitor ='val_acc',
                             verbose = 1,
                             save_best_only = True)

lr_scheduler = LearningRateScheduler(lr_schedule)

lr_reducer = ReduceLROnPlateau(factor = np.sqrt(0.1),
                               cooldown = 0,
                               patience = 5,
                               min_lr = 0.5e-6)

callbacks = [checkpoint, lr_reducer, lr_scheduler]

# Run training
model.fit(x_train, y_train,
          batch_size = batch_size,
          epochs = epochs,
          validation_data =(x_test, y_test),
          shuffle = True,
          callbacks = callbacks)

# Score trained model.
scores = model.evaluate(x_test, y_test, verbose = 1)
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])

In [None]:
model.save('/content/resnet_model')

In [None]:
resnet_model = keras.models.load_model('/content/resnet_model')

In [None]:
# Preprocess x_test the way it is preprocessed for the ResNet
# Normalize data.
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255

# Subtract pixel mean
x_train_mean = np.mean(x_train, axis = 0)
x_train -= x_train_mean
x_test -= x_train_mean

In [None]:
# Obtain y_pred
y_pred = resnet_model.predict(x_test)

In [None]:
# Change y_pred from one-hot to int
y_pred_toInt = []
for i in range(np.shape(y_pred)[0]):
  y_pred_toInt.append(np.argmax(y_pred[i]))

In [None]:
# Quick check for accuracy
wrong = 0
correct = 0
for i in range(np.shape(y_pred_toInt)[0]):
  if y_pred_toInt[i] != y_test[i]:
    wrong += 1
  else:
    correct += 1
print("wrong: ", wrong)
print("Correct: ", correct)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import numpy as np

y_true = y_test
y_pred = y_pred_toInt

categories = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

cf_matrix = confusion_matrix(y_true, y_pred, labels = categories

row_sums = cf_matrix.sum(axis=1)
df_norm_col = cf_matrix / row_sums[:, np.newaxis]

ax = sns.heatmap(df_norm_col, cmap='viridis', annot=False)
ax.xaxis.tick_top()

In [None]:
# Change y_test and y_pred to one-hot
y_onehot_test = keras.utils.to_categorical(y_test, 20)
y_onehot_pred = keras.utils.to_categorical(y_pred, 20)

In [None]:
import matplotlib.pyplot as plt
from sklearn.multiclass import OneVsRestClassifier
from sklearn.metrics import roc_curve, auc, RocCurveDisplay
from itertools import cycle

# Calculate the AUC of the ROC
n_classes = 20
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(n_classes):
    yot = y_onehot_test[:][i]
    yop = y_onehot_pred[:][i]
    fpr[i], tpr[i], _ = roc_curve(yot, yop)
    roc_auc[i] = auc(fpr[i], tpr[i])
roc_auc

In [None]:
# Quick check for accuracy
sum(roc_auc.values()) / n_classes

In [None]:
# Plot ROC curve
fig, ax = plt.subplots(figsize=(10, 10))
colors = cycle(["aqua", "darkorange", "cornflowerblue"])
for class_id, color in zip(range(n_classes), colors):
    RocCurveDisplay.from_predictions(
        y_onehot_test[:, class_id],
        y_onehot_pred[:, class_id],
        name=f"ROC curve for {class_id}",
        color=color,
        ax=ax,
    )
plt.plot([0, 1], [0, 1], "k--", label="chance level (AUC = 0.5)")