In [1]:
from math import floor
from os.path import isfile, join
from scipy.ndimage.interpolation import zoom, rotate


import face_recognition
import imageio

import keras.layers import Input, Dense, Flatten, Conv2D, MaxPooling2D, BatchNormalization, Dropout, Reshape, Concatenate, LeakyReLU
import keras.models import Model
import keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np

SyntaxError: invalid syntax (<ipython-input-1-1bfacfb553ec>, line 9)

# Defining MesoNet CNN model

In [None]:
class DFModel:
    def __init__(self, lr):
        self.model = self._configure_model()
        optimizer = Adam(lr = lr)
        self.model.compile(optimizer = optimizer, loss = 'mean_squared_error', metrics = ['accuracy'])

    def _configure_model(self):
        # Input layer
        # Height, Width, # of channels
        X = Input(shape = (256, 256, 3))

        # Each batch below - is convolutional block
        
        # It is a convolutional layer
        # First parameter is a dimensionality of output space
        # Second parameter is a kernel size - size of convolutional window
        # Parameter 'padding' sets whether the dimensionalty of input is the same as of output
        # Parameter 'activation' sets the type of activation function
        # Double parentheses syntax - is specific of Keras model initialization
        X_1 = Conv2D(8, (3,3), padding='same', activation='relu')(x)
        # This layer does a data normalization:
        # Transform data in such a way that mean is near 0 and sd is near 1
        X_1 = BatchNormalization()(X_1)
        # This layer reduces dimensionality of output by taking the max value out of each "pool"
        # Parameter pool_size define the size of pool
        X_1 = MaxPooling2D(pool_size=(2,2), padding='same')(X_1)

        # The same explanation as below is applicable to each code batch below
        X_2 = Conv2D(8, (5,5), padding='same', activation='relu')(X_1)
        X_2 = BatchNormalization()(X_2)
        X_2 = MaxPooling2D(pool_size=(2,2), padding='same')(X_2)


        X_3 = Conv2D(16, (5,5), padding='same', activation='relu')(X_2)
        X_3 = BatchNormalization()(X_3)
        X_3 = MaxPooling2D(pool_size=(2,2), padding='same')(X_3)

        X_4 = Conv2D(16, (5,5), padding='same', activation='relu')(X_3)
        X_4 = BatchNormalization()(X_4)
        X_4 = MaxPooling2D(pool_size=(2,2), padding='same')(X_4)

        y = Flatten()(x4)
        y = Dropout(0.5)(y)
        y = Dense(16)(y)
        y = LeakyReLu(alpha=0.1)(y)
        y = Dropout(0.5)(y)
        y = Dense(1, activation='sigmoid')(y)

        return Model(inputs=x, outputs=y)

# To avoid processing all video frame, we can reduce it only to the face area.

## Here we introduce the wrappers for entites

### Video

In [None]:
class VideoWrapper:
    def __init__(self, path):
        self.container = imageio.get_reader(path, 'ffmpeg')
        self.length = self.container.count_frames()
        self.fps = self.container.get_meta_data()['fps']
        self.path = path
        
    def __call__(self, key):
        return self.get(key)
    
    def __len__(self):
        return self.length
    
    def init_head(self):
        self.container.set_image_index(0)
    
    def next_frame(self):
        self.container.get_next_data()
    
    def get(self, key):
        return self.container.get_data(key)

### Face

In [None]:
class FaceWrapper(VideoWrapper):
    def __init__(self, path, first_face = True):
        super().__init__(path)
        self.face_locations = {}
        # Each face has such coords: (center, length, rotation)
        self.coordinates = {}
        
        self.last_frame = self.get(0)
        self.frame_shape = self.last_frame.shape[:2]
        self.last_location = (0, 200, 200, 0)
        
        if first_face:
            face_positions = face_recognition.face_locations(self.last_frame, sample_freq = 2)
            if len(face_positions) > 0:
                self.last_location = face_positions[0]
    
    @staticmethod
    def pop_largest_location(coords_list):
        max_location = coords_list[0]
        max_size = 0
        if len(coords_list) > 1:
            for coord in coords_list:
                size = coord[2] - coord[0]
                if size > max_size:
                    max_size = size
                    max_location = coord
        return max_location
    
    @staticmethod
    def upsample_location(reduced_location, upsampled_origin, factor):
        y0, x1, y1, x0 = reduced_location
        Y0 = np.round(upsampled_origin[0] + y0 * factor)
        X1 = np.round(upsampled_origin[1] + x1 * factor)
        Y1 = np.round(upsampled_origin[0] + y1 * factor)
        X0 = np.round(upsampled_origin[1] + x0 * factor)
        return (Y0, X1, Y1, X0)
    
    def expand_location_zone(self, coords, margin = 0.2):
        # Expanding face area by the margin value
        offset = np.round(margin * (loc[2] - loc[0]))
        y0 = np.max(loc[0] - offset, 0)
        x1 = np.min(loc[1] + offset, self.frame_shape[1])
        y1 = np.min(loc[2] + offset, self.frame_shape[0])
        x0 = np.max(loc[3] - offset, 0)
        return (y0, x1, y1, x0)
    
    @staticmethod
    def L2(A, B):
        return np.sqrt(np.sum(np.square(A - B)))
    
    def get_face(self, f):
        frame = self.get(f)
        if i in self.faces:
            loc = self.faces[i]
            patch = frame[loc[0]:loc[2], loc[3]:loc[1]]
            
            return patch
        return frame
    
    @staticmethod
    def get_image_slice(img, y0, y1, x0, x1):
        m, n = img.shape[:2]
        padding = max(-y0, y1-m, -x0, x1-n, 0)
        padded_img = np.pad(img, ((padding, padding), (padding, padding), (0, 0)), 'reflect')
        return padded_img[(padding + y0):(padding + y1),
                        (padding + x0):(padding + x1)]
    
    def find_coordinates(self, landmark, K=2.2):
        # Process face features
        eye_1 = np.mean(landmark['left_eye'], axis=0)
        eye_2 = np.mean(landmark['right_eye'], axis=0)
        
        eyes = (eye_1 + eye_2) / 2
        
        nose = np.mean(landmark['nose_tip'], axis=0) / 2 + np.mean(landmark['nose_bridge'], axis=0) / 2
        
        top_lip = np.mean(landmark['top_lip'], axis=0)
        bot_lip = np.mean(landmark['bottom_lip'], axis=0)
        lips = (top_lip + bot_lip) / 2
        
        c = nose
        eyes_l2 = self.L2(eye_1, eye_2)
        face_l2 = self.L2(lips, eyes)
        
        center = np.max(eyes_l2, face_l2) * K
        
        if lips[1] == eyes[1]:
            if B[0] > E[0]:
                rot = 90
            else:
                rot = -90
        else:
            rot = np.arctan((lips[0] - eyes[0]) / (lips[1] - eyes[1])) / np.pi * 180
                 # Center                              length           rotation
        return ( (floor(center[1]), floor(center[0])), floor(distance), rot)
    
    # Main function in face processing    
    def localize_face(self, resize = 0.5, stop = 0, skipstep = 0, no_face_acceleration_threshold = 3, cut_left = 0, cut_right = -1, use_frameset = False, frameset = []):
        not_found = 0
        no_face = 0
        # Face acceleration
        no_face_acc = 0
        
        # Using predefined frameset
        if (use_frameset):
            finder_frameset = frameset
        else:
            if (stop != 0):
                finder_frameset = range(0, min(self.length, stop), skipstep + 1)
            else:
                finder_frameset = range(0, self.length, skipstep + 1)
        
        for i in finder_frameset:
            frame = self.get(i)
            # Cutting face out of frame
            if (cut_left != 0 or cut_right != -1):
                frame[:, :cut_left] = 0
                frame[:, cut_right:] = 0
                
            # Start looking in the area from previous step
            potential_location = self.expand_location_zone(self.last_location)
            potential_face_patch = frame[potential_location[0]:potential_location[2], potential_location[3]:potential_location[1]]
            # left upper corner
            potential_face_patch_origin = (potential_location[0], potential_location[3])
            
            reduced_potential_face_patch = zoom(potential_face_patch, (resize, resize, 1))
            reduced_face_locations = face_recognition.face_locations(reduced_potential_face_patch, model = 'cnn')
            
            # If face is found
            if len(reduced_face_locations) > 0:
                no_face_acc = 0

                reduced_face_location = self.pop_largest_location(reduced_face_locations)
                face_location = self.upsample_location(reduced_face_location, potential_face_patch_origin, 1 / resize)
                
                # Add new defined face
                self.faces[i] = face_location
                # Update last face location
                self.last_location = face_location
                
                # Extract face coords (rotation, length and center) from landmarks
                landmarks = face_recognition.face_landmarks(frame, [face_location])
                
                if len(landmarks) > 0:
                    # we assume that there is one and only one landmark group
                    self.coordinates[i] = self.find_coordinates(landmarks[0])
            # If face is not found
            else:
                not_found += 1
                
                if no_face_acc < no_face_acceleration_threshold:
                    # Look over frame for face 
                    face_locations = face_recognition.face_locations(frame, number_of_times_to_upsample = 2)
                else:
                    # Avoid spending to much time on a long scene without faces
                    reduced_frame = zoom(frame, (resize, resize, 1))
                    face_locations = face_recognition.face_locations(reduced_frame)
                
                if len(face_locations) > 0:
                    no_face_acc = 0
                    
                    face_location = self.pop_largest_location(face_locations)
                    
                    # Upsample location
                    if no_face_acc > no_face_acceleration_threshold:
                        face_location = self.upsample_location(face_location, (0, 0), 1 / resize)
                    
                    # Add new defined face
                    self.faces[i] = face_location
                    # Update last face location
                    self.last_location = face_location
                    
                    # Extract face coords (rotation, length and center) from landmarks
                    landmarks = face_recognition.face_landmarks(frame, [face_location])
                    if len(landmarks) > 0:
                        self.coordinates[i] = self.find_coordinates(landmarks[0])
                        
                else:
                    print('Face extraction warning : ',i, '- no face')
                    no_face_acc += 1
                    no_face += 1
                
        return 0
    
    def get_aligned_face(self, i, l_factor = 1.3):
        
        frame = self.get(i)
        
        if i in self.coordinates:
            # center, length, rotation
            c, l, r = self.coordinates[i]
            l = int(l) * l_factor
            dl_ = floor(np.sqrt(2) * l / 2)
            patch = self.get_image_slice(frame,
                                    floor(c[0] - dl_),
                                    floor(c[0] + dl_),
                                    floor(c[1] - dl_),
                                    floor(c[1] + dl_))
            rotated_patch = rotate(patch, -r, reshape=False)
            # note : dl_ is the center of the patch of length 2dl_
            return self.get_image_slice(rotated_patch,
                                    floor(dl_-l//2),
                                    floor(dl_+l//2),
                                    floor(dl_-l//2),
                                    floor(dl_+l//2))
        return frame

### FaceBatchGenerator

In [None]:
# Model of subset of frame for video
class FaceBatchGenerator:
    
    def __init__(self, face_finder, target_size = 256):
        self.finder = face_finder
        self.target_size = target_size
        self.head = 0
        self.length = int(face_finder.length)
        
    def resize_patch(self, patch):
        m, n = patch.shape[:2]
        return zoom(patch, (self.target_size / m, self.target_size / n, 1))
    
    def next_batch(self, batch_size=50):
        batch = np.zeros((1, self.target_size, self.target_size, 3))
        stop = np.min(self.head + batch_size, self.length)
        i = 0
        while (i < batch_size) and (self.head < self.length):
            if self.head in self.finder.coordinates:
                patch = self.finder.get_aligned_face(self.head)
                batch = np.concatenate((batch, np.expand_dims(self.resize_patch(patch), axis = 0)),
                                        axis = 0)
                i += 1
            self.head += 1
        return batch[1:]

### Face predictions

In [None]:
def predict_faces(generator, classifier, batch_size = 50, output_size = 1):
    n = len(generator.finder.coordinates.items())
    profile = np.zeros((1, output_size))
    for epoch in range(n // batch_size + 1):
        face_batch = generator.next_batch(batch_size = batch_size)
        prediction = classifier.predict(face_batch)
        if (len(prediction) > 0):
            profile = np.concatenate((profile, prediction))
    return profile[1:]

### Model running, face extraction and prediction

In [None]:
def run_model(classifier, dirname, frame_subsample_count = 30):
    file_names = ''
    predictions = {}
    
    for video_file in filenames:
        path = join(dirname, vid)
        face_finder = FaceWrapper(path, load_first_face = False)
        step = max(floor(face_finder.length / frame_subsample_count), 0)
        face_finder.find_faces(resize=0.5, skipstep = skipstep)
        
        gen = FaceBatchGenerator(face_finder)
        pred = predict_faces(gen, classifier)
        
        predictions[vid[:-4]] = (np.mean(pred > 0.5), p)
    
    return predictions

In [None]:
import face_recognition
import cv2
import time

input_video = cv2.VideoCapture('video_1_raw.mp4')

fps = int(input_video.get(cv2.CAP_PROP_FPS))
frame_count = int(input_video.get(cv2.CAP_PROP_FRAME_COUNT))
frame_width = int(input_video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(input_video.get(cv2.CAP_PROP_FRAME_HEIGHT))

print(fps)
print(frame_count)
print(frame_width)
print(frame_height)

codec = cv2.VideoWriter.fourcc(*'XVID')
video_writer = cv2.VideoWriter('video_1_processed.mp4', codec,fps, (frame_width, frame_height))

face_locations = []

count = 0
percentage_of_frames = 4
start = time.time()
while (True):
    ret, frame = input_video.read()
    if count % percentage_of_frames == 0:
        if not ret:
            print("Video ended!")
            break

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        face_locations = face_recognition.face_locations(rgb_frame, model='cnn')

        for top, right, bottom, left in face_locations:
            cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 225), 2)

        video_writer.write(frame)

        print('Processed ', count%percentage_of_frames, ' frames')

    count += 1

print('Result:', count)
print('Taken time: ', (time.time() - start) % 60, ' minutes')

input_video.release()
video_writer.release()
cv2.destroyAllWindows()
