<a href="https://colab.research.google.com/github/frapasti/Human-Hands/blob/main/BatchGenerator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install natsort
import os
from natsort import natsorted
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
import cv2
import imutils
from google.colab.patches import cv2_imshow
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
from math import trunc

from keras.models import Sequential 

from keras.layers.core import Dense, Dropout, Activation 
from keras.utils import np_utils
from keras.layers import LeakyReLU   
from keras.regularizers import l2  
from tensorflow import keras
from keras.models import Model                   

def listdir_fullpath(d):
    """Return every entry of directory ``d`` joined onto ``d``.

    The order is whatever ``os.listdir`` yields (i.e. arbitrary); callers
    that need a stable order must sort the result themselves.
    """
    entry_names = os.listdir(d)
    return list(map(lambda entry: os.path.join(d, entry), entry_names))

from google.colab import drive
drive.mount('/content/drive')

!unzip drive/My\ Drive/DataSet/egoHands.zip

In [12]:
# Collect the paths of all annotation (.txt) files and all frames from the
# unzipped dataset. os.listdir returns entries in arbitrary order, so both
# lists are natural-sorted to keep annotation i paired with image i.
bboxes_paths = natsorted(listdir_fullpath('/content/egoHands/boundingboxes'))
img_paths = natsorted(listdir_fullpath('/content/egoHands/frames'))

assert len(bboxes_paths) == len(img_paths), "Number of bboxes_paths doesn't correspond with number of images!"

# Parse every annotation file into a list of bounding boxes (one inner list of
# ints per comma-separated line), producing one list of boxes per image.
bboxes = []
for path in bboxes_paths:
  # `with` closes the handle even if a malformed line makes int() raise.
  with open(path, 'r') as bbox_file:
    bboxes.append([
        [int(value) for value in line.split(',')]
        for line in bbox_file
        if line.strip()  # skip blank/trailing lines instead of crashing on int('')
    ])

In [35]:
import numpy as np

class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` yielding (image batch, grid-target batch) pairs.

    Every image is resized to ``dim`` and scaled to [0, 1]. Its bounding boxes
    are encoded on a ``divisions x divisions`` grid: the cell containing a box
    centre stores ``[1, x_offset, y_offset, width, height]``, where the offsets
    are relative to the cell's top-left corner and width/height are expressed
    in units of one cell. Cells without a box centre stay all-zero. If several
    box centres fall into the same cell, the last one wins.

    Args:
        path_list: image file paths.
        bboxes_list: per-image lists of boxes, aligned with ``path_list``.
            Each box is read as ``[x_left, y_top, width, height]`` in pixels of
            the original image.
        batch_size: number of samples per batch.
        dim: network input shape, interpreted as (height, width, channels).
        divisions: number of grid cells along each axis (S).
        shuffle: reshuffle the sample order at construction and after each epoch.
    """

    def __init__(self, path_list, bboxes_list, batch_size=25, dim=(448,448,3),
                 divisions=7, shuffle=True):
        """Store the configuration and build the first index order."""
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.path_list = path_list
        self.S = divisions
        self.bboxes_list = bboxes_list
        self.shuffle = shuffle
        # Kept for backward compatibility; per-axis cell sizes are derived
        # from `dim` when a batch is generated.
        self.cell_size = dim[0]/divisions
        self.on_epoch_end()  # builds self.indexes for the first epoch

    def __len__(self):
        """Number of full batches per epoch (a trailing partial batch is dropped)."""
        return len(self.path_list) // self.batch_size

    def __getitem__(self, index):
        """Generate the batch with the given batch index."""
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        return self.__data_generation(indexes)

    def on_epoch_end(self):
        """Rebuild the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.path_list))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, indexes):
        """Load the samples at ``indexes`` and encode their boxes.

        Returns:
            X: float32 array of shape (len(indexes), *dim) with values in [0, 1].
            Y: float32 array of shape (len(indexes), S, S, 5).
        """
        target_h, target_w = self.dim[0], self.dim[1]
        cell_w = target_w / self.S
        cell_h = target_h / self.S
        last_cell = self.S - 1

        # float32 matches the image data and halves the memory of the default float64.
        X = np.empty((len(indexes), *self.dim), dtype=np.float32)
        Y = np.zeros((len(indexes), self.S, self.S, 5), dtype=np.float32)

        for batch_num, i in enumerate(indexes):
            # The original size is needed to rescale the boxes to the resized image.
            width, height = load_img(self.path_list[i]).size
            image = load_img(self.path_list[i], target_size=(target_h, target_w))
            # Scale pixel values to [0, 1].
            X[batch_num] = img_to_array(image).astype('float32') / 255.0

            scale_w = target_w / width
            scale_h = target_h / height

            for box in self.bboxes_list[i]:
                # Keep the scaled coordinates as floats: truncating them to
                # ints would only add quantisation error to the targets.
                b_width = box[2] * scale_w
                b_height = box[3] * scale_h
                if b_width <= 0 or b_height <= 0:
                    # A zero-area box is not a valid object target.
                    # NOTE(review): confirm the dataset has no meaningful such rows.
                    continue

                ox = box[0] * scale_w + b_width / 2
                oy = box[1] * scale_h + b_height / 2

                # Cell containing the centre. Clamp so that a centre lying on
                # (or beyond) the right/bottom border maps to the last cell
                # instead of indexing outside the grid.
                grid_col = min(max(int(ox / cell_w), 0), last_cell)
                grid_row = min(max(int(oy / cell_h), 0), last_cell)

                # Centre relative to its cell: (0,0) is the cell's top-left
                # corner and (1,1) its bottom-right corner.
                ox_cell = (ox - grid_col * cell_w) / cell_w
                oy_cell = (oy - grid_row * cell_h) / cell_h

                # Box size in units of one cell (a box one cell wide has width 1).
                grid_width = b_width / cell_w
                grid_height = b_height / cell_h

                # The leading 1 marks that an object is present in this cell.
                Y[batch_num, grid_row, grid_col] = [1, ox_cell, oy_cell, grid_width, grid_height]

        return X, Y

In [56]:
def custom_loss(y_true, y_pred):
  """YOLO-style sum-squared-error loss for a single box per grid cell.

  Both tensors are laid out as (batch, S, S, 5) with the last axis being
  [objectness, x_offset, y_offset, width, height].

  The loss is the weighted sum of three sum-squared-error terms:
    * box loss       - centre offsets and signed square-rooted width/height,
                       only for cells whose target contains an object (x5);
    * object loss    - objectness for cells that contain an object (x1);
    * no-object loss - objectness for cells without an object (x0.5).

  Every term is a plain sum of squared differences over the whole batch, so
  the weights above are directly comparable with each other.

  Args:
    y_true: target tensor, (batch, S, S, 5).
    y_pred: network output; reshaped to the shape of ``y_true``.

  Returns:
    Scalar loss tensor.
  """
  lambda_coord = 5.0   # weight of the box regression term
  lambda_noobj = 0.5   # weight of the objectness term on empty cells
  eps = 1e-6           # keeps sqrt away from 0, where its derivative diverges

  def sum_squared_error(target, output):
    # Explicit sum of squares: MeanSquaredError(reduction="sum") would first
    # average over the last axis, silently rescaling each term differently.
    return tf.reduce_sum(tf.square(target - output))

  y_true = tf.cast(y_true, y_pred.dtype)
  predictions = tf.reshape(y_pred, tf.shape(y_true))

  # 1 where the target cell contains an object, 0 elsewhere; the trailing axis
  # of size 1 lets it broadcast over the box channels.
  exists_box = y_true[..., 0:1]
  non_exists_box = 1 - exists_box

  #------------#
  #| BOX LOSS |#
  #------------#

  # Only cells that contain a target box contribute to the box loss.
  xy_pred = exists_box * predictions[..., 1:3]
  xy_true = exists_box * y_true[..., 1:3]
  wh_pred_raw = exists_box * predictions[..., 3:5]
  wh_true_raw = exists_box * y_true[..., 3:5]

  # Width and height are compared through their square roots. Predictions may
  # be negative, so take the root of the magnitude and restore the sign.
  wh_pred = tf.math.sign(wh_pred_raw) * tf.math.sqrt(tf.math.abs(wh_pred_raw) + eps)
  # Re-mask after the sqrt so empty cells contribute exactly zero.
  wh_true = exists_box * tf.math.sqrt(wh_true_raw + eps)

  box_loss = sum_squared_error(xy_true, xy_pred) + sum_squared_error(wh_true, wh_pred)

  #---------------#
  #| OBJECT LOSS |#
  #---------------#

  # First channel of each cell: probability that the cell contains an object.
  pred_obj = predictions[..., 0:1]
  true_obj = y_true[..., 0:1]

  object_loss = sum_squared_error(exists_box * true_obj, exists_box * pred_obj)

  #------------------#
  #| NO OBJECT LOSS |#
  #------------------#

  no_object_loss = sum_squared_error(non_exists_box * true_obj, non_exists_box * pred_obj)

  #--------------#
  #| FINAL LOSS |#
  #--------------#

  # Penalize the box loss more and the no-object loss less.
  return lambda_coord * box_loss + object_loss + lambda_noobj * no_object_loss

In [57]:
# Drop any graph/layer state left over from a previous run of this cell so
# layer names and memory usage start fresh.
tf.keras.backend.clear_session()

GRID_SIZE = 7        # S: the output is a GRID_SIZE x GRID_SIZE grid of cells
CELL_OUTPUTS = 5     # per cell: objectness, x, y, width, height
WEIGHT_DECAY = 5e-4  # L2 penalty applied to every convolution kernel

# Build everything from tf.keras: combining objects from the standalone
# `keras` package with tf.keras layers in one model is not reliable across
# TensorFlow/Keras versions.
lrelu = tf.keras.layers.LeakyReLU(alpha=0.1)


def conv_regularizer():
  """Return a fresh L2 regularizer for one convolution kernel."""
  return tf.keras.regularizers.l2(WEIGHT_DECAY)


yolo = tf.keras.Sequential()

# Three conv + max-pool stages.
yolo.add(tf.keras.layers.Conv2D(32, (7, 7), padding="same", activation=lrelu, strides=(1, 1),
                                input_shape=(448, 448, 3), kernel_regularizer=conv_regularizer()))
yolo.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same'))

yolo.add(tf.keras.layers.Conv2D(64, (3, 3), padding="same", strides=(2, 2), activation=lrelu,
                                kernel_regularizer=conv_regularizer()))
yolo.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same'))

yolo.add(tf.keras.layers.Conv2D(64, (1, 1), padding="same", strides=(2, 2), activation=lrelu,
                                kernel_regularizer=conv_regularizer()))
yolo.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same'))

# Flatten infers the feature size, so it stays correct if the layers above change.
yolo.add(tf.keras.layers.Flatten())

# Linear head with one value per grid-cell output, reshaped back onto the grid.
yolo.add(tf.keras.layers.Dense(GRID_SIZE * GRID_SIZE * CELL_OUTPUTS))
yolo.add(tf.keras.layers.Reshape((GRID_SIZE, GRID_SIZE, CELL_OUTPUTS)))


yolo.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 448, 448, 32)      4736      
                                                                 
 max_pooling2d (MaxPooling2D  (None, 224, 224, 32)     0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 112, 112, 64)      18496     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 56, 56, 64)       0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 28, 28, 64)        4160      
                                                                 
 max_pooling2d_2 (MaxPooling  (None, 14, 14, 64)       0

In [58]:
# Batch generator over the whole dataset, using DataGenerator's default settings.
training_generator = DataGenerator(path_list=img_paths, bboxes_list=bboxes)

In [59]:
# Can also try the standard sum squared loss that penalizes everything equally
# just use : tf.keras.losses.MeanSquaredError(reduction = "sum")

# Weights-only checkpoints are written to this Drive path during training.
checkpoint_filepath = '/content/drive/MyDrive/CheckPoints'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
)

# No 'accuracy' metric: the target is a (7, 7, 5) regression grid, so a
# classification accuracy over its last axis would be a meaningless number
# in the training log. The loss itself is the quantity to monitor.
yolo.compile(loss=custom_loss, optimizer='adam')

In [None]:
# Train for 10 epochs on the generator, with the checkpoint callback attached.
yolo.fit(
    training_generator,
    epochs=10,
    verbose=1,
    callbacks=[model_checkpoint_callback],
)

Epoch 1/10