In [1]:
from tensorflow import keras
import numpy as np
from torchvision.transforms import transforms
from PIL import Image
import matplotlib.pyplot as plt
from matplotlib import cm
import uuid
import cv2
from imutils import face_utils
import dlib
import time

In [250]:
# Palette of colour tuples handed out to tracked faces, indexed by face id.
colors = [(0,0,255),(255,150,40),(0,255,0),(255,128,255),(0,255,255),
          (255,128,128),(0,128,255),(128,0,255),(255,255,128),(192,128,255),(128,255,128)]


class FaceServiceAssigment:
    """Keeps a store of faces and re-matches newly detected faces against it.

    Each stored face is a dict with the keys ``color``, ``face`` (image crop),
    ``xyhw`` (bounding box, unpacked by this class as ``(x, y, h, w)``),
    ``bit``, ``id`` and ``toDraw``.

    Args:
        model: Object exposing ``predict([batch1, batch2])``; its output is
            treated as a score where lower means "more similar" (scores are
            compared with ``<`` and the row minimum is taken as best match).
        thresh: Scores below this value count as a match.
        dist: Maximum distance between the old and new box centres for a
            matched face to have its crop and position updated.
    """

    def __init__(self, model, thresh=100, dist=100):
        self.thresh = thresh
        self.model = model
        self.dist = dist
        self.faces = []

    def _prepare(self, img):
        """Turn one face crop into the single-item batch fed to the model."""
        # Colour-map the crop, then go back to 8-bit grayscale and resize to 28x28.
        pil_img = Image.fromarray(np.uint8(cm.gist_earth(img) * 255))
        gray = pil_img.convert('L')
        resized = transforms.Resize((28, 28))(gray)
        # Leading axis of size 1 acts as the batch dimension.
        return np.expand_dims(np.array(resized), axis=0)

    def predict(self, img1, img2):
        """Return the model output for a pair of face crops.

        Both crops go through the same preprocessing (see ``_prepare``) and
        are passed to ``self.model.predict`` as a two-element list. The raw
        model output is returned unchanged.
        """
        m1 = self._prepare(img1)
        m2 = self._prepare(img2)
        pred = self.model.predict([m1, m2])
        return pred

    def getRandomColor(self, i):
        """Return the palette colour for index ``i``, wrapping around the palette.

        Despite the name, the choice is deterministic. The wrap uses the real
        palette length, so every colour (including the last one) is reused.
        """
        return colors[i % len(colors)]

    def setFacesFirstFrame(self, faces_roi):
        """Append one store entry per detected face.

        Args:
            faces_roi: Iterable of dicts with the keys ``"face"`` and ``"xyhw"``.
                Ids and colours are taken from the position in this iterable.
        """
        for i, roi in enumerate(faces_roi):
            self.faces.append({"color": self.getRandomColor(i), "face": roi["face"], 'xyhw': roi["xyhw"],
                               'bit': 0, 'id': i, 'toDraw': True})

    def drawFaces(self, frame):
        """Draw an ellipse and an ``id:<n>`` label for every face flagged ``toDraw``.

        Returns the frame object produced by the last OpenCV drawing call
        (or the input frame when nothing is drawn).
        """
        font = cv2.FONT_HERSHEY_SIMPLEX
        fontScale = 1
        thicknessText = 2

        for face in self.faces:
            if not face["toDraw"]:
                continue
            x, y, h, w = face["xyhw"]
            # Pad the box by 5 px on every side.
            x, y, h, w = int(x)-5, int(y)-5, int(h)+10, int(w)+10
            center = (x + w // 2, y + h // 2)
            frame = cv2.ellipse(frame, (center, (w//2, h//2), 0), face["color"], 2)
            frame = cv2.putText(frame, 'id:'+str(face["id"]), (center[0]-30, center[1]-30), font, fontScale,
                                face["color"], thicknessText, cv2.LINE_AA)
        return frame

    @staticmethod
    def _center(xyhw):
        """Return the integer centre of a box unpacked as ``(x, y, h, w)``."""
        x, y, h, w = xyhw
        x, y, h, w = int(x), int(y), int(h), int(w)
        return np.array([x + w // 2, y + h // 2])

    def reAssignFaces(self, faces_roi_to_assign, thresh=10):
        """Match newly detected faces against the store and update the store.

        For every new face the stored faces are scored in order until one
        scores below 2 (treated as a certain match); unscored cells keep the
        value ``self.thresh``. The best-scoring stored face per new face is
        then:

        * marked ``toDraw = True`` if its score is below ``self.thresh``, and
          additionally updated with the new crop and box when the box centres
          are closer than ``self.dist``;
        * marked ``toDraw = False`` otherwise.

        Errors raised while scoring are printed and swallowed so a single bad
        crop does not abort the caller's processing loop.

        Args:
            faces_roi_to_assign: List of dicts with the keys ``"face"`` and ``"xyhw"``.
            thresh: Unused; the instance-level ``self.thresh`` is applied.
                Kept so existing callers that pass it keep working.
        """
        # Nothing to compare: argmin over an empty axis would raise.
        if len(self.faces) == 0 or len(faces_roi_to_assign) == 0:
            return

        try:
            number_faces_in_store = len(self.faces)
            number_faces_assign = len(faces_roi_to_assign)
            # Every cell starts at the "no match" score.
            matrix = np.full((number_faces_assign, number_faces_in_store), float(self.thresh))

            for i, face_roi in enumerate(faces_roi_to_assign):
                for j, face_item in enumerate(self.faces):
                    score = self.predict(face_roi["face"], face_item["face"])
                    # The model output is an array; pull out its first element
                    # explicitly instead of relying on implicit array->scalar conversion.
                    matrix[i, j] = float(np.asarray(score).reshape(-1)[0])
                    if matrix[i, j] < 2:
                        # Certain match: skip scoring the remaining stored faces.
                        break

            # Index of the best (lowest) score in each row.
            best_per_row = matrix.argmin(axis=1)

            for i, face_roi in enumerate(faces_roi_to_assign):
                best = int(best_per_row[i])
                stored = self.faces[best]

                if matrix[i, best] < self.thresh:
                    stored["toDraw"] = True
                    center_old = self._center(stored["xyhw"])
                    center_nw = self._center(face_roi["xyhw"])
                    # Only follow the detection when it has not jumped too far.
                    if np.linalg.norm(center_nw - center_old) < self.dist:
                        stored["face"] = face_roi["face"]
                        stored["xyhw"] = face_roi["xyhw"]
                else:
                    stored["toDraw"] = False

        except Exception as e:
            print(e)

    def getFaces(self):
        """Return the list of stored face dicts (the live list, not a copy)."""
        return self.faces


In [3]:
def euclidean_distance(vects):
    """Row-wise Euclidean distance between the two tensors in ``vects``.

    The squared differences are summed along axis 1 (keeping that axis), and
    the sum is floored at the backend epsilon before the square root is taken.
    """
    left, right = vects
    squared_diff = K.square(left - right)
    total = K.sum(squared_diff, axis=1, keepdims=True)
    floored = K.maximum(total, K.epsilon())
    return K.sqrt(floored)


def eucl_dist_output_shape(shapes):
    """Return ``(first_dim_of_first_shape, 1)`` for a pair of input shapes."""
    first_shape, _second_shape = shapes
    leading_dim = first_shape[0]
    return (leading_dim, 1)


def contrastive_loss(y_true, y_pred):
    '''Contrastive loss (Hadsell et al., 2006) with a fixed margin of 1.
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf

    Entries weighted by ``y_true`` are penalised by the squared prediction;
    entries weighted by ``1 - y_true`` are penalised by the squared shortfall
    of the prediction below the margin. The mean over all entries is returned.
    '''
    margin = 1
    pull_term = y_true * K.square(y_pred)
    shortfall = K.maximum(margin - y_pred, 0)
    push_term = (1 - y_true) * K.square(shortfall)
    return K.mean(pull_term + push_term)

In [4]:
def visualize(**images):
    """Plot the given images side by side in one row.

    Each keyword becomes a subplot title: underscores are turned into spaces
    and the result is title-cased. Axis ticks are hidden.
    """
    total = len(images)
    plt.figure(figsize=(16, 5))
    for position, (label, picture) in enumerate(images.items(), start=1):
        plt.subplot(1, total, position)
        plt.xticks([])
        plt.yticks([])
        plt.title(label.replace('_', ' ').title())
        plt.imshow(picture)
    plt.show()

In [5]:
from keras import backend as K
from keras.models import Model

Using TensorFlow backend.


In [7]:
# Saved siamese network; the custom loss must be supplied by name so Keras can
# resolve it while deserialising the model.
# NOTE(review): `path` is reassigned to the video file in a later cell.
path = "siamese_network/Keras_cnn_model1592994168.1201453"
model = keras.models.load_model(
    path,
    custom_objects={'contrastive_loss': contrastive_loss},
)

In [8]:
# Print the layer-by-layer summary of the loaded model (shapes and parameter counts).
model.summary()

Model: "model_216"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_323 (InputLayer)          [(None, 28, 28)]     0                                            
__________________________________________________________________________________________________
input_324 (InputLayer)          [(None, 28, 28)]     0                                            
__________________________________________________________________________________________________
model_215 (Model)               (None, 128)          133504      input_323[0][0]                  
                                                                 input_324[0][0]                  
__________________________________________________________________________________________________
lambda_108 (Lambda)             (None, 1)            0           model_215[1][0]          

In [252]:
def test(path, skip=0, limit=100, thresh=50, dist=100, write=False, minNeighbors=3, thresh_dnn=0.4):
    """Run face detection and re-identification over a video and display the result.

    The frame at position ``skip`` is read once to validate the input and fix
    the frame size; it is not processed. Every later frame is resized to
    1920x1080, passed through the OpenCV DNN face detector, and the detected
    crops are handed to a ``FaceServiceAssigment`` built around the
    module-level ``model``. Faces found in the first processed frame seed the
    store; later frames are re-matched against it.

    Only even loop iterations read and process a frame; odd iterations do
    nothing, so roughly ``limit / 2`` frames are processed.
    NOTE(review): if the intent was to skip every other video frame, the odd
    iterations would need a ``cap.grab()`` -- confirm.

    Args:
        path: Path of the video file to open.
        skip: Index of the frame to seek to before reading.
        limit: The loop stops once the iteration counter exceeds this value.
        thresh: Match threshold forwarded to ``FaceServiceAssigment``.
        dist: Maximum centre distance forwarded to ``FaceServiceAssigment``.
        write: When true, annotated frames are also written to
            ``output_<timestamp>.mp4`` at 20 fps.
        minNeighbors: Unused; presumably left over from the commented-out
            Haar-cascade detector. Kept for call compatibility.
        thresh_dnn: Minimum detector confidence for a detection to be kept.

    Raises:
        IOError: If no frame can be read from ``path`` at position ``skip``.
    """
    face_service = FaceServiceAssigment(model, thresh, dist)
    # face_cascade = cv2.CascadeClassifier('./haarcascade_frontalface_alt.xml')
    size_w = 1920
    size_h = 1080

    # Detector network.
    modelFile = "siamese_network/content/opencv_face_detector_uint8.pb"
    configFile = "siamese_network/content/opencv_face_detector.pbtxt"
    net = cv2.dnn.readNetFromTensorflow(modelFile, configFile)

    cap = cv2.VideoCapture(path)
    out = None
    frame_index = 0

    try:
        cap.set(cv2.CAP_PROP_POS_FRAMES, skip)
        ret, frame = cap.read()
        if not ret or frame is None:
            # Fail with a readable message instead of an opaque cv2.resize error.
            raise IOError("Could not read a frame from video: " + str(path))
        frame = cv2.resize(frame, (size_w, size_h))

        print(frame.shape)
        frameWidth = frame.shape[1]
        frameHeight = frame.shape[0]

        if write:
            print("in write mode")
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter('output_' + str(time.time()) + '.mp4', fourcc, 20.0, (frameWidth, frameHeight))

        while True:
            if frame_index % 2 == 0:
                # Capture frame-by-frame; `ret` is False at end of stream.
                ret, frame = cap.read()
                if not ret or frame is None:
                    break
                frame = cv2.resize(frame, (size_w, size_h))
                frame = cv2.UMat(frame)

                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                frame_gray = cv2.equalizeHist(gray)

                # Detect faces.
                blob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), [104, 117, 123], False, False)
                net.setInput(blob)
                detections = net.forward()

                faces = []
                for i in range(detections.shape[2]):
                    confidence = detections[0, 0, i, 2]
                    if confidence <= thresh_dnn:
                        continue
                    # Detector coordinates are relative; clamp the box to the frame
                    # so slicing never wraps around or yields an empty crop.
                    x1 = max(0, int(detections[0, 0, i, 3] * frameWidth))
                    y1 = max(0, int(detections[0, 0, i, 4] * frameHeight))
                    x2 = min(frameWidth, int(detections[0, 0, i, 5] * frameWidth))
                    y2 = min(frameHeight, int(detections[0, 0, i, 6] * frameHeight))
                    w = x2 - x1
                    h = y2 - y1
                    if w <= 0 or h <= 0:
                        continue
                    faces.append((x1, y1, w, h))

                faces_frames = []
                if faces:
                    # Download the equalised frame from the UMat once per frame.
                    gray_host = frame_gray.get()
                    for (x, y, w, h) in faces:
                        faceROI = gray_host[y:y + h, x:x + w]
                        # FaceServiceAssigment unpacks 'xyhw' as (x, y, h, w).
                        faces_frames.append({'face': faceROI, 'xyhw': (x, y, h, w)})

                if frame_index == 0:
                    face_service.setFacesFirstFrame(faces_frames)
                    print("Detect in first frame", len(faces_frames))
                elif len(faces_frames) != 0:
                    face_service.reAssignFaces(faces_frames, thresh)

                frame = face_service.drawFaces(frame)
                cv2.imshow('frame', frame)

                if out is not None:
                    image = frame.get()
                    image = cv2.resize(image, (size_w, size_h))
                    out.write(image)

                if frame_index > limit:
                    break

                # Single waitKey: lets the window refresh and catches 'q'.
                # Two consecutive calls would drop keys pressed during the first.
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
            frame_index += 1
    finally:
        # Release the capture, writer and windows even if processing failed.
        cap.release()
        if out is not None:
            print("in write mode close")
            out.release()
        cv2.destroyAllWindows()

    print("frame_index:", frame_index)

In [10]:
# Video clip to process; the commented-out line is an alternative clip.
# path = "siamese_network/content/Walking.mp4"
path = "siamese_network/content/videoblocks.mp4"

In [253]:
# Start offset into the video, given as minutes and seconds.
mins = 0
sec = 0

# Convert the offset to a frame index. The factor 29 is presumably the clip's
# frame rate in frames per second -- TODO confirm against the actual video.
skip = int(60 * 29 * mins) + int(29 * sec)
# Run the tracking pass with output writing enabled; %time reports the wall time.
%time test(path, skip=skip, limit=275, thresh=100, dist=140, write=True, minNeighbors=3, thresh_dnn=0.164)

(1080, 1920, 3)
in write mode
Detect in first frame 9
Score:  7 100.66983795166016
Score:  5 132.9446258544922
in write mode close
frame_index: 276
Wall time: 8min 30s
