# Computer Vision Analysis
### Adding RetinaNet to our model
#### By Ronny Toribio, Kadir O. Altunel, Michael Cook-Stahl
#### Based on [Hands on Machine Learning 2nd edition](https://github.com/ageron/handson-ml2/), [FER2013 candidate 1](https://www.kaggle.com/code/ritikjain00/model-training-fer-13) and [FER2013 candidate 2](https://www.kaggle.com/code/gauravsharma99/facial-emotion-recognition/notebook)

### Import modules

In [None]:
%matplotlib inline
import os
import os.path
import shutil
from glob import glob
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import ParameterGrid
import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model, Sequential, model_from_json
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dropout, Dense, BatchNormalization, Layer, UpSampling2D
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, CSVLogger, ModelCheckpoint 
from google.colab import drive

### Constants

In [None]:
BATCH_SIZE = 32
IMAGE_ZOOM = 0.3
IMAGE_SHAPE = (48, 48)
INPUT_SHAPE = (48, 48, 1)
TRAIN_DIR = os.path.join("fer2013", "train")
TEST_DIR = os.path.join("fer2013", "test")
GOOGLE_DRIVE_PATH = "/content/drive/MyDrive/Machine Learning/GridSearch"

### Utility functions (Bounding Boxes)

In [None]:
def swap_xy(boxes):
    return tf.stack([boxes[:, 1], boxes[:, 0], boxes[:, 3], boxes[:, 2]], axis=-1)


def convert_to_xywh(boxes):
    return tf.concat(
        [(boxes[..., :2] + boxes[..., 2:]) / 2.0, boxes[..., 2:] - boxes[..., :2]],
        axis=-1,
    )

def convert_to_corners(boxes):
    return tf.concat(
        [boxes[..., :2] - boxes[..., 2:] / 2.0, boxes[..., :2] + boxes[..., 2:] / 2.0],
        axis=-1,
    )

### Computing pairwise Intersection Over Union (Bounding Boxes)

In [None]:
def compute_iou(boxes1, boxes2):
    boxes1_corners = convert_to_corners(boxes1)
    boxes2_corners = convert_to_corners(boxes2)
    lu = tf.maximum(boxes1_corners[:, None, :2], boxes2_corners[:, :2])
    rd = tf.minimum(boxes1_corners[:, None, 2:], boxes2_corners[:, 2:])
    intersection = tf.maximum(0.0, rd - lu)
    intersection_area = intersection[:, :, 0] * intersection[:, :, 1]
    boxes1_area = boxes1[:, 2] * boxes1[:, 3]
    boxes2_area = boxes2[:, 2] * boxes2[:, 3]
    union_area = tf.maximum(
        boxes1_area[:, None] + boxes2_area - intersection_area, 1e-8
    )
    return tf.clip_by_value(intersection_area / union_area, 0.0, 1.0)

### AnchorBox (Bounding Boxes)

In [None]:
class AnchorBox:
    def __init__(self):
        self.aspect_ratios = [0.5, 1.0, 2.0]
        self.scales = [2 ** x for x in [0, 1 / 3, 2 / 3]]
        self._num_anchors = len(self.aspect_ratios) * len(self.scales)
        self._strides = [2 ** i for i in range(3, 8)]
        self._areas = [x ** 2 for x in [32.0, 64.0, 128.0, 256.0, 512.0]]
        self._anchor_dims = self._compute_dims()

    def _compute_dims(self):
        anchor_dims_all = []
        for area in self._areas:
            anchor_dims = []
            for ratio in self.aspect_ratios:
                anchor_height = tf.math.sqrt(area / ratio)
                anchor_width = area / anchor_height
                dims = tf.reshape(
                    tf.stack([anchor_width, anchor_height], axis=-1), [1, 1, 2]
                )
                for scale in self.scales:
                    anchor_dims.append(scale * dims)
            anchor_dims_all.append(tf.stack(anchor_dims, axis=-2))
        return anchor_dims_all

    def _get_anchors(self, feature_height, feature_width, level):
        rx = tf.range(feature_width, dtype=tf.float32) + 0.5
        ry = tf.range(feature_height, dtype=tf.float32) + 0.5
        centers = tf.stack(tf.meshgrid(rx, ry), axis=-1) * self._strides[level - 3]
        centers = tf.expand_dims(centers, axis=-2)
        centers = tf.tile(centers, [1, 1, self._num_anchors, 1])
        dims = tf.tile(
            self._anchor_dims[level - 3], [feature_height, feature_width, 1, 1]
        )
        anchors = tf.concat([centers, dims], axis=-1)
        return tf.reshape(
            anchors, [feature_height * feature_width * self._num_anchors, 4]
        )

    def get_anchors(self, image_height, image_width):
        anchors = [
            self._get_anchors(
                tf.math.ceil(image_height / 2 ** i),
                tf.math.ceil(image_width / 2 ** i),
                i,
            )
            for i in range(3, 8)
        ]
        return tf.concat(anchors, axis=0)

### Build Head Function (Bounding Boxes)

In [None]:
def build_head(output_filters, bias_init):
    head = Sequential([tensorflow.keras.Input(shape=[None, None, 256])])
    kernel_init = tf.initializers.RandomNormal(0.0, 0.01)
    for _ in range(4):
        head.add(Conv2D(256, kernel_size=(3, 3), padding="same", activation="relu", kernel_initializer=kernel_init))
    head.add(Conv2D(output_filters, 3, 1, padding="same", kernel_initializer=kernel_init, bias_initializer=bias_init,))
    return head

### Feature Pyramid Network as a Keras Layer (Bounding Boxes)

In [None]:
class FeaturePyramid(Layer):
    def __init__(self, backbone=None, **kwargs):
        super(FeaturePyramid, self).__init__(name="FeaturePyramid", **kwargs)
        self.backbone = backbone if backbone else get_backbone()
        self.conv_c3_1x1 = Conv2D(256, 1, 1, "same")
        self.conv_c4_1x1 = Conv2D(256, 1, 1, "same")
        self.conv_c5_1x1 = Conv2D(256, 1, 1, "same")
        self.conv_c3_3x3 = Conv2D(256, 3, 1, "same")
        self.conv_c4_3x3 = Conv2D(256, 3, 1, "same")
        self.conv_c5_3x3 = Conv2D(256, 3, 1, "same")
        self.conv_c6_3x3 = Conv2D(256, 3, 2, "same")
        self.conv_c7_3x3 = Conv2D(256, 3, 2, "same")
        self.upsample_2x = UpSampling2D(2)

    def call(self, images, training=False):
        c3_output, c4_output, c5_output = self.backbone(images, training=training)
        p3_output = self.conv_c3_1x1(c3_output)
        p4_output = self.conv_c4_1x1(c4_output)
        p5_output = self.conv_c5_1x1(c5_output)
        p4_output = p4_output + self.upsample_2x(p5_output)
        p3_output = p3_output + self.upsample_2x(p4_output)
        p3_output = self.conv_c3_3x3(p3_output)
        p4_output = self.conv_c4_3x3(p4_output)
        p5_output = self.conv_c5_3x3(p5_output)
        p6_output = self.conv_c6_3x3(c5_output)
        p7_output = self.conv_c7_3x3(tf.nn.relu(p6_output))
        return p3_output, p4_output, p5_output, p6_output, p7_output

### LabelEncoder (Bounding Boxes)

In [None]:
class LabelEncoder:
    def __init__(self):
        self._anchor_box = AnchorBox()
        self._box_variance = tf.convert_to_tensor(
            [0.1, 0.1, 0.2, 0.2], dtype=tf.float32
        )

    def _match_anchor_boxes(
        self, anchor_boxes, gt_boxes, match_iou=0.5, ignore_iou=0.4
    ):
        iou_matrix = compute_iou(anchor_boxes, gt_boxes)
        max_iou = tf.reduce_max(iou_matrix, axis=1)
        matched_gt_idx = tf.argmax(iou_matrix, axis=1)
        positive_mask = tf.greater_equal(max_iou, match_iou)
        negative_mask = tf.less(max_iou, ignore_iou)
        ignore_mask = tf.logical_not(tf.logical_or(positive_mask, negative_mask))
        return (
            matched_gt_idx,
            tf.cast(positive_mask, dtype=tf.float32),
            tf.cast(ignore_mask, dtype=tf.float32),
        )

    def _compute_box_target(self, anchor_boxes, matched_gt_boxes):
        box_target = tf.concat(
            [
                (matched_gt_boxes[:, :2] - anchor_boxes[:, :2]) / anchor_boxes[:, 2:],
                tf.math.log(matched_gt_boxes[:, 2:] / anchor_boxes[:, 2:]),
            ],
            axis=-1,
        )
        box_target = box_target / self._box_variance
        return box_target

    def _encode_sample(self, image_shape, gt_boxes, cls_ids):
        anchor_boxes = self._anchor_box.get_anchors(image_shape[1], image_shape[2])
        cls_ids = tf.cast(cls_ids, dtype=tf.float32)
        matched_gt_idx, positive_mask, ignore_mask = self._match_anchor_boxes(anchor_boxes, gt_boxes)
        matched_gt_boxes = tf.gather(gt_boxes, matched_gt_idx)
        box_target = self._compute_box_target(anchor_boxes, matched_gt_boxes)
        matched_gt_cls_ids = tf.gather(cls_ids, matched_gt_idx)
        cls_target = tf.where(tf.not_equal(positive_mask, 1.0), -1.0, matched_gt_cls_ids)
        cls_target = tf.where(tf.equal(ignore_mask, 1.0), -2.0, cls_target)
        cls_target = tf.expand_dims(cls_target, axis=-1)
        label = tf.concat([box_target, cls_target], axis=-1)
        return label

    def encode_batch(self, batch_images, gt_boxes, cls_ids):
        images_shape = tf.shape(batch_images)
        batch_size = images_shape[0]
        labels = tf.TensorArray(dtype=tf.float32, size=batch_size, dynamic_size=True)
        for i in range(batch_size):
            label = self._encode_sample(images_shape, gt_boxes[i], cls_ids[i])
            labels = labels.write(i, label)
        batch_images = tf.keras.applications.resnet.preprocess_input(batch_images)
        return batch_images, labels.stack()

### Retinanet (Bounding Boxes)

In [None]:
class RetinaNet(Model):
    def __init__(self, num_classes, backbone=None, **kwargs):
        super(RetinaNet, self).__init__(name="RetinaNet", **kwargs)
        self.fpn = FeaturePyramid(backbone)
        self.num_classes = num_classes
        prior_probability = tf.constant_initializer(-np.log((1 - 0.01) / 0.01))
        self.cls_head = build_head(9 * num_classes, prior_probability)
        self.box_head = build_head(9 * 4, "zeros")

    def call(self, image, training=False):
        features = self.fpn(image, training=training)
        N = tf.shape(image)[0]
        cls_outputs = []
        box_outputs = []
        for feature in features:
            box_outputs.append(tf.reshape(self.box_head(feature), [N, -1, 4]))
            cls_outputs.append(tf.reshape(self.cls_head(feature), [N, -1, self.num_classes]))
        cls_outputs = tf.concat(cls_outputs, axis=1)
        box_outputs = tf.concat(box_outputs, axis=1)
        return tf.concat([box_outputs, cls_outputs], axis=-1)

### RetinaNetBoxLoss (Bounding Boxes)

In [None]:
class RetinaNetBoxLoss(tf.losses.Loss):
    def __init__(self, delta):
        super(RetinaNetBoxLoss, self).__init__(reduction="none", name="RetinaNetBoxLoss")
        self._delta = delta

    def call(self, y_true, y_pred):
        difference = y_true - y_pred
        absolute_difference = tf.abs(difference)
        squared_difference = difference ** 2
        loss = tf.where(
            tf.less(absolute_difference, self._delta),
            0.5 * squared_difference,
            absolute_difference - 0.5,
        )
        return tf.reduce_sum(loss, axis=-1)

### RetinaNetClassificationLoss

In [None]:
class RetinaNetClassificationLoss(tf.losses.Loss):
    def __init__(self, alpha, gamma):
        super(RetinaNetClassificationLoss, self).__init__(reduction="none", name="RetinaNetClassificationLoss")
        self._alpha = alpha
        self._gamma = gamma

    def call(self, y_true, y_pred):
        cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=y_true, logits=y_pred)
        probs = tf.nn.sigmoid(y_pred)
        alpha = tf.where(tf.equal(y_true, 1.0), self._alpha, (1.0 - self._alpha))
        pt = tf.where(tf.equal(y_true, 1.0), probs, 1 - probs)
        loss = alpha * tf.pow(1.0 - pt, self._gamma) * cross_entropy
        return tf.reduce_sum(loss, axis=-1)

### RetinaNetLoss

In [None]:
class RetinaNetLoss(tf.losses.Loss):
    def __init__(self, num_classes=80, alpha=0.25, gamma=2.0, delta=1.0):
        super(RetinaNetLoss, self).__init__(reduction="auto", name="RetinaNetLoss")
        self._clf_loss = RetinaNetClassificationLoss(alpha, gamma)
        self._box_loss = RetinaNetBoxLoss(delta)
        self._num_classes = num_classes

    def call(self, y_true, y_pred):
        y_pred = tf.cast(y_pred, dtype=tf.float32)
        box_labels = y_true[:, :, :4]
        box_predictions = y_pred[:, :, :4]
        cls_labels = tf.one_hot(
            tf.cast(y_true[:, :, 4], dtype=tf.int32),
            depth=self._num_classes,
            dtype=tf.float32,
        )
        cls_predictions = y_pred[:, :, 4:]
        positive_mask = tf.cast(tf.greater(y_true[:, :, 4], -1.0), dtype=tf.float32)
        ignore_mask = tf.cast(tf.equal(y_true[:, :, 4], -2.0), dtype=tf.float32)
        clf_loss = self._clf_loss(cls_labels, cls_predictions)
        box_loss = self._box_loss(box_labels, box_predictions)
        clf_loss = tf.where(tf.equal(ignore_mask, 1.0), 0.0, clf_loss)
        box_loss = tf.where(tf.equal(positive_mask, 1.0), box_loss, 0.0)
        normalizer = tf.reduce_sum(positive_mask, axis=-1)
        clf_loss = tf.math.divide_no_nan(tf.reduce_sum(clf_loss, axis=-1), normalizer)
        box_loss = tf.math.divide_no_nan(tf.reduce_sum(box_loss, axis=-1), normalizer)
        loss = clf_loss + box_loss
        return loss

### Mount Google Drive and unzip dataset

In [None]:
drive.mount('/content/drive')
os.makedirs(GOOGLE_DRIVE_PATH, exist_ok=True)
!unzip /content/drive/MyDrive/FER.zip

### Load Facial Emotion Recognition dataset
#### training, validation, and testing

In [None]:
train_datagen = ImageDataGenerator(rescale=1./255, zoom_range=IMAGE_ZOOM, horizontal_flip=True,
                                   validation_split=0.10)
Xy_train = train_datagen.flow_from_directory(TRAIN_DIR, batch_size=BATCH_SIZE, 
                                   target_size=IMAGE_SHAPE, shuffle=True, subset="training",
                                   color_mode="grayscale", class_mode="categorical")

Xy_valid = train_datagen.flow_from_directory(TRAIN_DIR, batch_size=BATCH_SIZE, 
                                   target_size=IMAGE_SHAPE, shuffle=True, subset="validation",
                                   color_mode="grayscale", class_mode="categorical")

test_datagen = ImageDataGenerator(rescale=1./255)
Xy_test = test_datagen.flow_from_directory(TEST_DIR, batch_size=BATCH_SIZE,
                                   target_size=IMAGE_SHAPE, shuffle=True,
                                   color_mode="grayscale", class_mode="categorical")

### Load our CNN model from JSON and HDF5

In [None]:
with open("cnn_model.json", "r") as json_model_file:
    json_model_str = json_model_file.read()
model = model_from_json(json_model_str)
model.load_weights("cnn_model_weights.h5")

### Remove classification block layers and freeze the rest

In [None]:
model = Sequential(model.layers[:-5])
for layer in model.layers:
    layer.trainable = False

### Attach RetinaNet to our model

In [None]:
model = RetinaNet(7, model)

### Compile our new CNN model

In [None]:
loss_fn = RetinaNetLoss(7)
learning_rates = [2.5e-06, 0.000625, 0.00125, 0.0025, 0.00025, 2.5e-05]
learning_rate_boundaries = [125, 250, 500, 240000, 360000]
learning_rate_fn = tf.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=learning_rate_boundaries, values=learning_rates
)

model.compile(loss=loss_fn, optimizer=Nadam(learning_rate=learning_rate_fn, beta_1=0.9, epsilon=1e-07),
    metrics=["accuracy"])

### Save our new CNN model architecture as JSON

In [None]:
model_json2 = model.to_json()
with open("cnn_model2.json") as json_model_file2:
    f.write(model_json2)

### Model Checkpoint

In [None]:
checkpoint_cb = ModelCheckpoint("cnn_model2_weights.h5", monitor = 'accuracy', verbose =1, 
                                save_best_only = True, save_weights_only = True)

### Early stopping callback

In [None]:
early_stopping_cb = EarlyStopping(min_delta=0.00005, patience=11, verbose=1, restore_best_weights=True)

### Reduce learning rate on plateau callback

In [None]:
reduce_lr_cb = ReduceLROnPlateau(factor=0.5, patience=7, min_l=1e-7, verbose=1)

### CSV Logger Callback

In [None]:
csv_cb = CSVLogger("cnn_model2_training.csv")

### Train our new CNN model

In [None]:
history = cur_model.fit(Xy_train, epochs=80,
                        validation_data=(Xy_valid),
                        steps_per_epoch=Xy_train.n // BATCH_SIZE,
                        validation_steps=Xy_valid.n // BATCH_SIZE,
                        callbacks=[early_stopping_cb, reduce_lr_cb, csv_cb, checkpoint_cb])

### Evaluate new CNN model

In [None]:
print("Evaluate train set")
model.evaluate(Xy_train)
print("Evaluate validation set")
model.evaluate(Xy_valid)
print("Evaluate test set")
model.evaluate(Xy_test)

### Convert histories into graph images

In [None]:
training = pd.read_csv("cnn_model2_training.csv", index_col="epoch")
fig, ax1 = plt.subplot(1, 1)
ax2 = ax1.twinx()
ax1.set_xlabel("Epochs")
ax1.set_ylabel("Accuracy", color="b")
ax2.set_ylabel("Loss", color="y")
ax1.plot(training["epoch"], training["accuracy"], "b-", label="accuracy")
ax1.plot(training["epoch"], training["val_accuracy"], "r-", label="val_accuracy")
ax1.plot(training["epoch"], training["lr"], "g-", label="lr")
ax2.plot(training["epoch"], training["loss"], "y-", label="loss")
ax2.plot(training["epoch"], training["val_loss"], "m-", label="val_loss")
h1, l1 = ax1.get_legend_handles_labels()
h2, l2 = ax2.get_legend_handles_labels()
plt.legend(h1+h2, l1+l2)
plt.title(name)
plt.savefig(name + ".png")

### Copy models to Google Drive

In [None]:
for f in os.listdir("."):
    shutil.copy(f, GOOGLE_DRIVE_PATH)