In [1]:
import xml.etree.ElementTree as ET
import pandas as pd
import lxml
import os
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import tensorflow as tf 
import numpy as np 
from sklearn.model_selection import train_test_split
import pickle
import cv2
import matplotlib.patches as patches

In [2]:
""" setting up google drive """

# google.colab is only importable inside a Colab runtime.
from google.colab import drive
# Mount the user's Google Drive below /content/drive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
""" defining data directories used later """

# Directory under which Google Drive is mounted.
MOUNTPOINT = '/content/drive'
# Project root on the mounted drive; datasets are loaded from this directory.
DATADIR = os.path.join(MOUNTPOINT, 'MyDrive', 'Project_Face_Mask')
# Sub-directory 'Annotations' below the project root.
DATADIR_annotations = os.path.join(DATADIR, 'Annotations')

In [4]:
""" loading test / train datasets that have been generated using "data_generation.ipynb" """

# Both datasets live next to each other below DATADIR; load train first, then test.
ds_train, ds_test = (
    tf.data.experimental.load(os.path.join(DATADIR, folder))
    for folder in ('train_ds_FRCNN', 'test_ds_FRCNN')
)

In [5]:
def prepare_mnist_data(ds):
    """
    Preprocess a dataset of (image, label, boxes) triples for training / testing.

    Per element the image is cast to float32 and converted from RGB to
    grayscale, the label is one-hot encoded with depth 3 and the boxes are
    passed through unchanged. Afterwards the dataset is shuffled with a buffer
    of 1000 elements and batched with batch size 1 (memory reasons).

    Parameters:
        ds: tf.data.Dataset yielding (img, label, boxs)

    Returns:
        the preprocessed, shuffled and batched dataset
    """

    def _preprocess(img, label, boxs):
        # uint8 -> float32 first, then collapse the colour channels
        grey_img = tf.image.rgb_to_grayscale(tf.cast(img, tf.float32))
        return grey_img, tf.one_hot(label, depth=3), boxs

    return ds.map(_preprocess).shuffle(1000).batch(1)

# Apply the same preprocessing to the train and the test split (train first).
ds_train, ds_test = map(prepare_mnist_data, (ds_train, ds_test))

In [6]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Dropout, Flatten
from tensorflow.keras import backend as K

class Partial_VGG(tf.keras.Model):
    """
    Backbone CNN to extract feature maps for further classification

    Five convolutional stages with 32, 64, 128, 256 and 256 filters (3x3
    kernels, ReLU, 'same' padding). Stages one to four are each followed by a
    2x2 max-pooling with stride 2, the fifth stage is not pooled.
    """

    def __init__(self):
        super().__init__()

        def conv(filters):
            # every convolution of the backbone shares this configuration
            return Conv2D(filters, (3, 3), activation='relu', padding='same')

        def pool():
            # halves the spatial resolution
            return MaxPooling2D((2, 2), strides=(2, 2))

        # stage 1
        self.conv_1_1 = conv(32)
        self.conv_1_2 = conv(32)
        self.pool_1 = pool()

        # stage 2
        self.conv_2_1 = conv(64)
        self.conv_2_2 = conv(64)
        self.pool_2 = pool()

        # stage 3
        self.conv_3_1 = conv(128)
        self.conv_3_2 = conv(128)
        self.conv_3_3 = conv(128)
        self.pool_3 = pool()

        # stage 4
        self.conv_4_1 = conv(256)
        self.conv_4_2 = conv(256)
        self.conv_4_3 = conv(256)
        self.pool_4 = pool()

        # stage 5 (no pooling afterwards)
        self.conv_5_1 = conv(256)
        self.conv_5_2 = conv(256)
        self.conv_5_3 = conv(256)

    def call(self, img):
        """ Run the image batch through all stages and return the last feature map """
        pipeline = (
            self.conv_1_1, self.conv_1_2, self.pool_1,
            self.conv_2_1, self.conv_2_2, self.pool_2,
            self.conv_3_1, self.conv_3_2, self.conv_3_3, self.pool_3,
            self.conv_4_1, self.conv_4_2, self.conv_4_3, self.pool_4,
            self.conv_5_1, self.conv_5_2, self.conv_5_3,
        )

        feat_extr = img
        for layer in pipeline:
            feat_extr = layer(feat_extr)

        return feat_extr

In [7]:
class RPN_Layer(tf.keras.layers.Layer):
    """
    Region Proposal Network head operating on the backbone feature map.

    A shared 3x3 convolution feeds two 1x1 convolutions: an objectness
    classifier (sigmoid, one channel per anchor) and a box regressor (linear,
    four channels per anchor).

    Parameters:
        num_anchors: number of anchors predicted per feature map position
    """

    def __init__(self, num_anchors):
        super().__init__()

        # shared convolution applied before classification / regression
        self.conv_1 = Conv2D(filters=512, kernel_size=(3, 3), padding='same', activation='relu')

        # objectness score: is there an object for the given anchor or not
        self.classif = Conv2D(filters=num_anchors, kernel_size=(1, 1), activation='sigmoid')

        # four regression values per anchor for the proposed bounding boxes
        self.regress = Conv2D(filters=num_anchors * 4, kernel_size=(1, 1), activation='linear')

    def call(self, feat_map):
        """
        Returns:
            [objectness scores, box regression values, unchanged input feature map]
        """
        shared = self.conv_1(feat_map)
        objectness = self.classif(shared)
        box_deltas = self.regress(shared)

        return [objectness, box_deltas, feat_map]
    
class RoiPooling(tf.keras.layers.Layer):
    """
    Pooling of the region proposals into a fixed size (pool_size x pool_size)
    in order to feed them into the classification layer.

    Parameters:
        pool_size: edge length of the square output per region
        num_rois: number of regions read from the roi input on every call
    """
    def __init__(self, pool_size, num_rois):
        super(RoiPooling, self).__init__()

        self.pool_size = pool_size
        self.num_rois = num_rois

    def build(self, input_shape):
        # channel count of the feature map (first of the two inputs)
        self.nb_channels = input_shape[0][3]

    def call(self, img_rois):
        """
        Parameters:
            img_rois: pair [feature map, rois]; every roi row is read as
                      (x_min, y_min, x_max, y_max) in feature map coordinates

        Returns:
            tensor holding the resized crops of all regions, stacked along
            axis 0 and wrapped in one additional leading axis
        """
        img, rois = img_rois
        target_size = (self.pool_size, self.pool_size)

        outputs = []

        # NOTE(review): exactly self.num_rois rows are read; fewer rows in
        # `rois` make the indexing below fail - confirm the caller guarantees this.
        for nb_roi in range(self.num_rois):
            # extract the box coordinates and resize the region to the pool size
            x_min = int(rois[nb_roi, 0])
            y_min = int(rois[nb_roi, 1])
            x_max = int(rois[nb_roi, 2])
            y_max = int(rois[nb_roi, 3])

            pooled_pict = tf.image.resize(img[:, y_min:y_max, x_min:x_max, :], target_size)
            outputs.append(pooled_pict)

        # Stay inside TensorFlow here: the former np.concatenate / np.expand_dims
        # converted the crops to NumPy arrays, which detached them from the
        # gradient tape so no gradient could reach the layers producing `img`.
        roi_results = tf.concat(outputs, axis=0)
        roi_results = tf.expand_dims(roi_results, 0)

        return roi_results

class Class_Layer(tf.keras.layers.Layer):
    """
    Detection head: pools the proposed regions and predicts class + box.

    The regions are pooled to 7x7, flattened and passed through two dense
    layers (2048 units, ReLU), each followed by dropout (rate 0.5). Two output
    layers follow: a softmax classifier over nb_classes (incorrectly worn
    mask, no mask or correctly worn mask) and a linear regressor with four
    outputs for the bounding box.

    Parameters:
        num_rois: number of regions handed to the roi pooling
        nb_classes: number of classes of the softmax output
    """

    def __init__(self, num_rois, nb_classes = 3):
        super().__init__()

        # edge length of the pooled regions
        self.nb_pooling_regions = 7

        # last part of the Faster RCNN architecture
        self.roi_pool = RoiPooling(self.nb_pooling_regions, num_rois)
        self.flatten = Flatten()
        self.dense_1 = Dense(2048, activation='relu')
        self.dropout_1 = Dropout(0.5)
        self.dense_2 = Dense(2048, activation='relu')
        self.dropout_2 = Dropout(0.5)

        # class probabilities; kernel starts at zero
        self.classif = Dense(nb_classes, activation='softmax', kernel_initializer='zero')

        # bounding box regression; kernel starts at zero
        self.regr = Dense(4, activation='linear', kernel_initializer='zero')

    def call(self, feat_map, input_rois):
        """
        Returns:
            [class probabilities, box regression values]
        """
        hidden = self.roi_pool([feat_map, input_rois])

        trunk = (self.flatten, self.dense_1, self.dropout_1, self.dense_2, self.dropout_2)
        for layer in trunk:
            hidden = layer(hidden)

        return [self.classif(hidden), self.regr(hidden)]

class FasterRCNN(tf.keras.Model):
    """
    Model that brings together all parts of the Faster RCNN: backbone,
    region proposal network, proposal selection and detection head.

    Parameters:
        num_rois: number of region proposals selected per image. The same
                  value is handed to the detection head and to rpn_to_roi,
                  so both always agree. Defaults to the former hard-coded 16.
    """
    def __init__(self, num_rois=16):
        super(FasterRCNN, self).__init__()
        self.num_rois = num_rois
        self.vgg = Partial_VGG()
        # rpn_to_roi iterates over 3 anchor sizes x 3 anchor ratios = 9 anchors
        self.rpn = RPN_Layer(9)
        self.classif = Class_Layer(self.num_rois)

    def call(self, img):
        """
        Parameters:
            img: image batch fed to the backbone

        Returns:
            out_class: class prediction of the detection head
            out_regr: box regression of the detection head
        """
        feat_extr = self.vgg(img)

        x_class, x_regr, feat_map = self.rpn(feat_extr)

        # rpn_to_roi works on NumPy arrays, so no gradient flows through
        # the proposal selection itself
        regions_of_interest = rpn_to_roi(x_regr, x_class, max_boxes=self.num_rois)

        out_class, out_regr = self.classif(feat_map, regions_of_interest)

        return out_class, out_regr

In [8]:
def apply_regr(anchor, regr):
    """
    Shift and rescale anchor boxes by the given regression values.

    Parameters:
        anchor: array whose first axis holds (x_min, y_min, width, height)
                of the anchors, one 2-D plane each
        regr: array whose first axis holds the regression values
              (x shift, y shift, log width scale, log height scale);
              its planes have to be NumPy arrays

    Returns:
        stacked array [x_min, y_min, width, height] of the adjusted boxes,
        all values rounded to whole numbers
    """

    # split both inputs into their four planes
    box_x, box_y, box_w, box_h = (anchor[k, :, :] for k in range(4))
    shift_x, shift_y, scale_w, scale_h = (regr[k, :, :] for k in range(4))

    # centre of the anchor, moved by the shift in units of the anchor size
    centre_x = shift_x * box_w + (box_x + box_w / 2.)
    centre_y = shift_y * box_h + (box_y + box_h / 2.)

    # width / height are scaled exponentially and rounded
    new_w = np.round(np.exp(scale_w.astype(np.float64)) * box_w)
    new_h = np.round(np.exp(scale_h.astype(np.float64)) * box_h)

    # back from the centre to the top left corner
    new_x = np.round(centre_x - new_w / 2.)
    new_y = np.round(centre_y - new_h / 2.)

    return np.stack([new_x, new_y, new_w, new_h])
 
def rpn_to_roi(x_regr, x_class, max_boxes=16, rpn_stride=16):
    """
    Turn the raw RPN outputs into box proposals for the roi pooling layer.

    For each of the 9 anchors (3 sizes x 3 ratios) an anchor box is placed on
    every feature map position, adjusted with the predicted regression values
    and clipped to the feature map. Degenerate boxes are removed and the
    remaining ones are filtered with non-max suppression.

    Parameters:
        x_regr: box regression output of the RPN, indexed as
                [0, row, col, 4 * anchor : 4 * anchor + 4]
                (only the first batch element is read)
        x_class: objectness output of the RPN with axes
                 (batch, rows, cols, anchors)
        max_boxes: maximum number of boxes to extract per image
        rpn_stride: down-sampling factor of the backbone CNN, used to convert
                    the anchor sizes into feature map units

    Returns:
        array with at most max_boxes rows (x_min, y_min, x_max, y_max) in
        feature map coordinates; fewer rows (possibly none) when less boxes
        pass the suppression
    """

    anchor_no = 0

    # default anchor sizes and (width, height) ratios
    anchor_sizes = [128, 256, 512]
    anchor_ratios = [(1,1), (1,2*np.sqrt(2)), (2*np.sqrt(2),1)]

    # size of the rpn table: 4 coordinates x rows x cols x anchors
    (rows, cols) = x_class.shape[1:3]
    calc_rpns = np.zeros((4, x_class.shape[1], x_class.shape[2], x_class.shape[3]))

    # coordinate grid of all feature map positions; it does not depend on the
    # anchor, so it is built once instead of once per loop iteration
    X, Y = np.meshgrid(np.arange(cols), np.arange(rows))

    for anchor_size in anchor_sizes:
        for anchor_ratio in anchor_ratios:
            # anchor width / height in feature map units,
            # e.g. size 128 with ratio 1 gives 128 / 16 = 8
            anchor_x = (anchor_size * anchor_ratio[0]) / rpn_stride
            anchor_y = (anchor_size * anchor_ratio[1]) / rpn_stride

            # regression values of the current anchor, coordinate axis first
            regr = x_regr[0, :, :, 4 * anchor_no:4 * anchor_no + 4]
            regr = np.transpose(regr, (2, 0, 1))

            # x_min, y_min, width and height of the anchor centred on each position
            calc_rpns[0, :, :, anchor_no] = X - anchor_x/2
            calc_rpns[1, :, :, anchor_no] = Y - anchor_y/2
            calc_rpns[2, :, :, anchor_no] = anchor_x
            calc_rpns[3, :, :, anchor_no] = anchor_y

            # offsetting the anchor with the given regression values
            calc_rpns[:, :, :, anchor_no] = apply_regr(calc_rpns[:, :, :, anchor_no], regr)

            # width and height have to be at least 1
            calc_rpns[2, :, :, anchor_no] = np.maximum(1, calc_rpns[2, :, :, anchor_no])
            calc_rpns[3, :, :, anchor_no] = np.maximum(1, calc_rpns[3, :, :, anchor_no])

            # get x_max and y_max by adding the width / height to x_min / y_min
            calc_rpns[2, :, :, anchor_no] += calc_rpns[0, :, :, anchor_no]
            calc_rpns[3, :, :, anchor_no] += calc_rpns[1, :, :, anchor_no]

            # make sure that the bounding boxes are inside the feature map
            calc_rpns[0, :, :, anchor_no] = np.maximum(0, calc_rpns[0, :, :, anchor_no])
            calc_rpns[1, :, :, anchor_no] = np.maximum(0, calc_rpns[1, :, :, anchor_no])
            calc_rpns[2, :, :, anchor_no] = np.minimum(cols-1, calc_rpns[2, :, :, anchor_no])
            calc_rpns[3, :, :, anchor_no] = np.minimum(rows-1, calc_rpns[3, :, :, anchor_no])

            anchor_no += 1

    # flatten to one row per box (N, 4) and one score per box (N,),
    # both ordered anchor-major so that rows and scores line up
    boxes = np.reshape(calc_rpns.transpose((0, 3, 1, 2)), (4, -1)).transpose((1, 0))
    probs = np.array(x_class).transpose((0, 3, 1, 2)).reshape((-1))
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    # delete degenerate boxes (x_max / y_max not larger than x_min / y_min)
    to_del = np.where((x1 - x2 >= 0) | (y1 - y2 >= 0))
    boxes = np.delete(boxes, to_del, 0)
    probs = np.delete(probs, to_del, 0)

    # non-max suppression: keep at most max_boxes boxes with an objectness
    # score above 0.4 that do not overlap an already kept box by more than
    # an IoU of 0.5
    indices = tf.image.non_max_suppression(boxes, probs, max_boxes, iou_threshold=0.5, score_threshold=0.4)
    regions = [boxes[i] for i in indices]

    return np.array(regions)



In [9]:
def train_step(model, inp, target, bbox, loss_function_class, loss_function_regr, optimizer):
    """
    method to be executed each train step, for each image / bbox combination

    Parameters:
      model: model to be trained
      inp: input image
      target: label for the image with respective bounding box
      bbox: bounding box coordinates for that specific image
      loss_function_class: loss function for the classification task
      loss_function_regr: loss function for the regression task
      optimizer: optimizer to be used for training

    Returns:
      loss: combined loss (classification loss + 0.1 * regression loss)
      pred_class: prediction for the class of the image + bounding box
      pred_regr: prediction for the bounding box itself
    """
    # only the forward pass and the loss have to be recorded by the tape
    with tf.GradientTape() as tape:
        # NOTE(review): the model is called without a `training` argument, so
        # presumably the Dropout layers stay inactive here - confirm.
        pred_class, pred_regr = model(inp)

        loss_class = loss_function_class(target, pred_class)
        loss_regr = loss_function_regr(bbox, pred_regr)
        loss = loss_class  + loss_regr * 0.1

    # variables without a path to the loss receive a zero gradient instead of
    # None, so apply_gradients can handle every trainable variable
    gradients = tape.gradient(loss, model.trainable_variables, unconnected_gradients=tf.UnconnectedGradients.ZERO)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    return loss, pred_class, pred_regr
  
def test(model, test_data, loss_func_regr, loss_func_class):
    """
    Evaluate the model on a dataset without updating any weights.

    Parameters:
      model: trained model
      test_data: dataset yielding (image, label, bounding box) triples
      loss_func_regr: loss function for the bounding box regression
      loss_func_class: loss function for the classification

    Returns:
      test_loss: mean over all samples of
                 (0.1 * regression loss + classification loss)
    """
    sample_losses = []

    for image, target, box in test_data:
        pred_class, pred_regr = model(image)

        regr_loss = loss_func_regr(box, pred_regr)
        class_loss = loss_func_class(target, pred_class)

        # same weighting of the two loss parts as during training
        sample_losses.append((regr_loss * 0.1 + class_loss).numpy())

    return tf.reduce_mean(sample_losses)

def plot_bbox(img, box, label):
    """
    method to plot a specific bounding box on the respective image

    The rectangle is drawn directly into `img` (the array is modified) and the
    result is shown in an OpenCV window until a key is pressed.

    Parameters:
        img: image array the box is drawn on, pixel values in the range 0..255
        box: bounding box as (x_min, y_min, x_max, y_max)
        label: class index 0, 1 or 2; selects the colour of the rectangle

    Raises:
        ValueError: if label is not one of 0, 1, 2
    """
    # colour per class index, passed to OpenCV as given
    colours = {
        0: (0, 0, 255),
        1: (0, 255, 0),
        2: (255, 0, 0),
    }

    x1, y1, x2, y2 = box

    # previously an unknown label left the colour undefined and crashed later
    # with an unrelated looking error; fail early with a clear message instead
    if label not in colours:
        raise ValueError(f"label must be one of {sorted(colours)}, got {label!r}")
    col = colours[label]

    cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), col, 1)

    # scale to 0..1 for display
    cv2.imshow("bbox_test", img/255.0)
    cv2.waitKey()
    cv2.destroyAllWindows()


In [None]:
tf.keras.backend.clear_session()

# defining parameters for training
num_epochs = 50
optimizer = tf.keras.optimizers.Adam()
loss_func_class = tf.keras.losses.CategoricalCrossentropy()
loss_func_regr = tf.keras.losses.Huber()
model = FasterRCNN()

# mean training loss per epoch
train_losses = []
# results[epoch] holds one (box prediction, class prediction) pair per sample,
# in the order the samples were drawn during that epoch
results = []

for epoch in range(num_epochs):
    results.append([])
    epoch_loss = []
    for img_data in ds_train:
        img, label, box = img_data
        loss_curr, pred_class, pred_regr = train_step(model, img, label, box, loss_func_class, loss_func_regr, optimizer)
        epoch_loss.append(loss_curr)
        results[epoch].append((tf.squeeze(pred_regr), tf.squeeze(pred_class)))

    # reduce once and reuse the value for both bookkeeping and logging
    mean_epoch_loss = tf.reduce_mean(epoch_loss)
    train_losses.append(mean_epoch_loss)

    print(f'train_loss epoch {epoch}: {mean_epoch_loss}\n')

In [None]:
# Evaluate the trained model on the held-out test split.
test_loss = test(
    model,
    ds_test,
    loss_func_regr=loss_func_regr,
    loss_func_class=loss_func_class,
)

print(f'test_loss: {test_loss}')

In [None]:
""" plotting the before saved training loss """

DATADIR_RESULTS = os.path.join(DATADIR, "results")

# Build the path from DATADIR_RESULTS instead of repeating the absolute
# Drive path, so it follows DATADIR when the project is moved.
# NOTE: allow_pickle=True can execute arbitrary code while unpickling -
# only load files from a trusted source.
train_losses_bbox = np.load(os.path.join(DATADIR_RESULTS, 'train_losses_classif.npy'), allow_pickle=True)

plt.xlabel("epoch")
plt.ylabel("combined loss")
plt.plot(train_losses_bbox)

In [None]:
""" 
Plotting the bounding boxes
At first the predicted and after the original

doesn't work in colab, had to execute on local machine in jupyter
"""
# The predictions stored in `results` during training cannot be matched to
# the images drawn here: ds_train is reshuffled on every iteration, so sample
# i of this pass is not sample i of a training epoch. The prediction is
# therefore recomputed with the trained model for each displayed image.
for img, label, box in ds_train.take(10):
    pred_class, pred_regr = model(img)

    # every element carries the leading batch axis added by .batch(1);
    # drop it before handing the arrays to plot_bbox
    image = img.numpy()[0]

    # predicted box with predicted class (copy: plot_bbox draws into the array)
    plot_bbox(image.copy(), np.squeeze(pred_regr.numpy()), int(np.argmax(np.squeeze(pred_class.numpy()))))

    # ground truth box; the label is one-hot encoded, so decode it to its index
    plot_bbox(image.copy(), np.squeeze(box.numpy()), int(np.argmax(np.squeeze(label.numpy()))))