In [1]:
import numpy as np
import cv2
import tensorflow as tf
import time
import sys
import os
import shutil
import matplotlib.pyplot as plt
from tensorflow import keras
import keras.backend as K
from keras.layers import Lambda,Dense,Flatten,Conv2D,MaxPool2D,Input,Embedding
from keras.preprocessing.image import ImageDataGenerator,array_to_img,img_to_array,load_img
from keras.utils import np_utils
from keras.models import load_model
from keras.optimizers import RMSprop,SGD
from keras.losses import binary_crossentropy
from keras.models import Sequential,Model
from imutils import paths
from PIL import Image
from numpy import asarray,savez_compressed,load,expand_dims
from mtcnn.mtcnn import MTCNN
from vidgear.gears import VideoGear
from os import listdir
from os.path import isdir
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder,Normalizer
from sklearn.svm import SVC
from pathlib import Path

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
Using TensorFlow backend.


In [2]:
def show_image(image):
    cv2.imshow('image', image)
    while(1):
        if cv2.waitKey(10)&0xFF==27:
            break
    cv2.destroyAllWindows()

In [3]:
def extract_face(path,image,required_size=(160, 160)):
    if path!='':
        image=Image.open(path)
    #image=image.convert('RGB')
    pixels=asarray(image)
    detector=MTCNN()
    results=detector.detect_faces(pixels)
    face_array=[]
    for i in range(len(results)): # multiple faces\n",
        x1,y1,width,height=results[i]['box']
        x1,y1=abs(x1),abs(y1)
        x2,y2=x1+width,y1+height
        face=pixels[y1:y2,x1:x2]
        image=Image.fromarray(face)
        image=image.resize(required_size)
        face_array.append(asarray(image))
    return face_array

In [4]:
def load_faces(directory):
    faces=list()
    for filename in listdir(directory):
        path=directory+filename
        face=extract_face(path,image='')
        faces.extend(face)
    return faces

In [5]:
def load_dataset(directory):
    X,y=list(),list()
    if directory[-1]!='/':
        directory+='/'
    for subdir in listdir(directory):
        path=directory+subdir+'/'
        if not isdir(path):
            continue
        faces=load_faces(path)
        labels=[subdir for _ in range(len(faces))]
        X.extend(faces)
        y.extend(labels)
    return np.asarray(X),np.asarray(y)

In [6]:
def get_embedding(model,face_pixels):
    face_pixels=face_pixels.astype('float32')
    mean,std=face_pixels.mean(),face_pixels.std()
    face_pixels=(face_pixels-mean)/std
    samples=expand_dims(face_pixels,axis=0)
    yhat=model.predict(samples)
    return yhat[0]

In [7]:
def newDataSet(dataset,model):
    newData=list()
    for face_pixels in dataset:
        embedding=get_embedding(model, face_pixels)
        newData.append(embedding)
    newData=asarray(newData)
    return newData

In [8]:
def generate_train_images(folder_from,folder_to):
    for filename in listdir(folder_from):
        path=os.path.join(folder_from,filename)
        datagen=ImageDataGenerator(rotation_range=30,width_shift_range=0.2,height_shift_range=0.2,shear_range=0.2,
                                   zoom_range=0.2,horizontal_flip=True)
        image=cv2.imread(path)
        x=img_to_array(image)/255.
        x=x.reshape((1,)+x.shape)
        count=1
        file=filename.split('.')
        folder_in=os.path.join(folder_to,file[0])
        try:
            if os.path.exists(folder_in): # creating seperate folders for training images
                os.remove(folder_in)
            os.makedirs(folder_in)
        except OSError:
            pass
        for batch in datagen.flow(x,batch_size=1):
            image_name=file[0]+str(count)+"."+file[1]
            cv2.imwrite(folder_in+'/'+image_name,image)
            if count==20:
                break
            count+=1

In [9]:
def contrastive_loss(y_true,y_pred):
    return K.mean((y_true*K.square(y_pred))+((1-y_true)*K.square(K.maximum(1-y_pred,0))))

In [10]:
facenet_model=load_model('C:/Users/abc/Desktop/Face recognition/FaceNet/model/facenet_keras.h5')








Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.




In [11]:
def siameseModel(required_size):
    left=Input(required_size)
    right=Input(required_size)
    l1_distance_layer=Lambda(lambda tensors:K.abs(tensors[0]-tensors[1]))
    l1_distance=l1_distance_layer([left,right])
    prediction=Dense(units=1,activation='sigmoid')(l1_distance)
    model=Model(inputs=[left,right],outputs=prediction)
    return model

In [12]:
curr_dir='/'.join(os.getcwd().split('\\'))
trainImagePath=curr_dir+"/train_images"
testImagePath=curr_dir+"/test_images"
mainVideoPath='C:/Users/abc/Pictures/Camera Roll/'
videoPath=mainVideoPath+'WIN_20190926_17_34_42_Pro.mp4'
inputImagePath=curr_dir+'/image'
facenet_path=curr_dir+'/facenet'
facenet_model_path=facenet_path+'/model'
facenet_weights_path=facenet_path+'/weights'
framerate=1

In [13]:
generate_train_images(inputImagePath,trainImagePath)

In [14]:
x_single,y_single=list(),list()
directory=inputImagePath+'/'
for filename in listdir(directory):
    path=directory+filename
    face=extract_face(path,image='')
    y_single.append(filename)
    x_single.append(face[0])

Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.



In [15]:
x,y=load_dataset(trainImagePath)
x_single_embedding=newDataSet(x_single,facenet_model)
x_embedding=newDataSet(x,facenet_model)

In [16]:
x_train1,x_train2,y_train=[],[],[]
h=0
for i in range(0,len(x)-1):
    for j in range(i+1,len(x)):
        x_train1.append(x_embedding[i])
        x_train2.append(x_embedding[j])
        if y[i]==y[j]:
            y_train.append(1)
        else:
            y_train.append(0)

In [17]:
x_train=[x_train1,x_train2]

In [18]:
savez_compressed(curr_dir+'/face_dataset.npz',x_train,y_train)

In [19]:
data=load(curr_dir+'/face_dataset.npz')
trainX,trainy=data['arr_0'],data['arr_1']

In [20]:
trainX=list(trainX)

In [21]:
in_encoder=Normalizer(norm='l2')
# trainX=in_encoder.transform(trainX)
# testX=in_encoder.transform(testX)

In [22]:
out_encoder=LabelEncoder()
out_encoder.fit(trainy)
trainy=out_encoder.transform(trainy)
# testy=out_encoder.transform(testy)

In [23]:
model=siameseModel((128,))
model.compile(loss='binary_crossentropy',optimizer='RMSProp',metrics=['acc'])


Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


In [24]:
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 128)          0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 128)          0                                            
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 128)          0           input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
dense_1 (Dense)                 (None, 1)            129         lambda_1[0][0]             

In [25]:
model.fit(trainX,trainy,epochs=10,verbose=1)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x1f833691e48>

In [26]:
yhat_train=model.predict(trainX)
yhat_train=list(yhat_train)

In [27]:
def getFrame(videoObject,sec,count,imagePath):
    videoObject.set(cv2.CAP_PROP_POS_MSEC,sec*1000)
    hasFrames,image=videoObject.read()
    return hasFrames,image

In [64]:
def captureFrame(videoPath,framerate,imagePath,model,x_single_embedding):
    videoObject=cv2.VideoCapture(videoPath)
    sec=0.5
    count=1
    success,image=getFrame(videoObject=videoObject,sec=sec,count=count,imagePath=imagePath)
    while success==1:
        face_array=extract_face('',image)
        for faces in face_array:
            right=get_embedding(facenet_model,faces)
            for left in x_single_embedding:
                x_tr=[[left],[right]]
                out=model.predict(x=x_tr)
                print(out)
                if out>=0.5:
                    print("Found at",sec)
        sec+=framerate
        sec=round(sec,2)
        count+=1
        success,image=getFrame(videoObject=videoObject,sec=sec,count=count,imagePath=imagePath)
    videoObject.release()

In [65]:
captureFrame(videoPath=videoPath,framerate=1,imagePath='',model=model,x_single_embedding=x_single_embedding)

[[0.]]
[[0.]]
[[0.]]
[[0.00040037]]
[[0.]]
[[0.]]
[[0.]]
[[0.00163693]]
[[0.]]
[[0.]]
[[0.]]
[[7.8460784e-05]]
[[0.]]
[[0.]]
[[0.]]
[[2.24927e-05]]
[[0.]]
[[0.]]
[[0.]]
[[9.2287795e-05]]
[[0.]]
[[0.]]
[[0.]]
[[4.498604e-05]]
[[0.]]
[[0.]]
[[0.]]
[[8.082408e-05]]
[[0.]]
[[0.]]
[[0.]]
[[0.00032784]]
[[0.]]
[[2.9824264e-07]]
[[0.]]
[[0.]]
[[2.861003e-07]]
[[3.5420777e-05]]
[[1.6620646e-07]]
[[0.]]
[[4.2577702e-07]]
[[1.4935746e-05]]
[[1.5633022e-07]]
[[0.]]
[[0.]]
[[2.920805e-07]]
[[0.]]
[[0.]]
[[6.9441404e-07]]
[[0.]]
[[0.]]
[[1.5421207e-08]]
[[0.]]
[[0.]]
[[0.]]
[[4.3699434e-05]]
[[0.0001038]]
[[3.3522593e-08]]
[[0.]]
[[0.]]
[[0.0002471]]
[[6.300622e-08]]
[[0.]]
[[0.]]


In [None]:
# results=model.predict_proba(testX)
# yhat_test=model.predict(testX)
score_train=accuracy_score(trainy,yhat_train)
print('Accuracy: train=%.3f'%score_train*100)
# score_test=accuracy_score(testy,yhat_test)
# print('Accuracy: train=%.3f, test=%.3f'%(score_train*100,score_test*100))
print('Accuracy: train=%.3f'%score_train*100)
# yhat_test=out_encoder.inverse_transform(yhat_test)
# yhat_test
# results