pip install numpy

pip install mxnet-cu100

conda install cudatoolkit=10

pip install opencv-python

pip install Pillow==6.1

In [None]:
import math
import os
import time
from os.path import isfile, isdir, join

import cv2
import numpy as np
from PIL import Image, ImageDraw
from retinaface import RetinaFace

In [None]:
def mkfolder(path):
    """Create directory *path* if needed and report whether it already existed.

    Missing parent directories are created as well, so nested output
    folders can be requested in a single call.

    Args:
        path: Directory to create.

    Returns:
        True if the directory already existed, False if this call created it.

    Raises:
        OSError: If *path* exists but is not a directory, or cannot be created.
    """
    if os.path.isdir(path):
        return True
    os.makedirs(path)
    return False

In [None]:
def detect_face(img, detector):
    """Run *detector* on *img* at a single scale derived from the image size.

    The scale is chosen so the shorter of the first two image dimensions
    maps to 1024; if that would push the longer one past 1980, the scale is
    instead chosen so the longer dimension maps to 1980.

    Args:
        img: Array-like image whose ``shape[0:2]`` gives its two side lengths.
        detector: Object exposing
            ``detect(img, thresh, scales=..., do_flip=...)``.

    Returns:
        The ``(faces, landmarks)`` pair returned by ``detector.detect``.
    """
    score_threshold = 0.8
    short_side_target, long_side_limit = 1024, 1980

    side_lengths = img.shape[0:2]
    shortest = np.min(side_lengths)
    longest = np.max(side_lengths)

    # Scale the short side to the target first ...
    resize_factor = float(short_side_target) / float(shortest)
    # ... but never let the long side exceed the limit.
    if np.round(resize_factor * longest) > long_side_limit:
        resize_factor = float(long_side_limit) / float(longest)

    faces, landmarks = detector.detect(
        img, score_threshold, scales=[resize_factor], do_flip=False
    )
    return faces, landmarks

In [None]:
def visualize_landmark5(img, landmark5):
    """Draw every landmark on *img* as a small dot and return the image.

    Points at index 0 and 3 are drawn with colour ``(0, 255, 0)``; all other
    points use ``(0, 0, 255)``.

    Args:
        img: Image passed to ``cv2.circle``.
        landmark5: Sequence of points; only the first two entries of each
            point are used, as x and y.

    Returns:
        The image returned by the last ``cv2.circle`` call, or *img*
        unchanged when there are no landmarks.
    """
    for index, point in enumerate(landmark5):
        color = (0, 255, 0) if index in (0, 3) else (0, 0, 255)
        # cv2.circle needs integer pixel coordinates; cast so float or
        # NumPy-scalar landmarks are accepted as well.
        center = (int(point[0]), int(point[1]))
        img = cv2.circle(img, center, 1, color, 2)
    return img

In [None]:
def align_face(img, landmark5):
    """Rotate *img* about the midpoint of the eyes so the eye line is level.

    Args:
        img: Image array; ``img.shape[1]`` and ``img.shape[0]`` are used as
            the output width and height.
        landmark5: Landmark points; index 0 is treated as the left eye and
            index 1 as the right eye, each as ``(x, y)``.

    Returns:
        Tuple ``(rotated_img, eye_center, angle)`` where ``eye_center`` is
        the integer ``(x, y)`` rotation centre and ``angle`` is the rotation
        in degrees passed to ``cv2.getRotationMatrix2D``.
    """
    left_eye = landmark5[0]
    right_eye = landmark5[1]

    dy = right_eye[1] - left_eye[1]
    dx = right_eye[0] - left_eye[0]
    angle = math.degrees(math.atan2(dy, dx))

    # Cast to built-in ints: newer OpenCV builds refuse NumPy scalars for the
    # `center` argument of getRotationMatrix2D.
    eye_center = (
        int((left_eye[0] + right_eye[0]) // 2),
        int((left_eye[1] + right_eye[1]) // 2),
    )

    rotate_matrix = cv2.getRotationMatrix2D(eye_center, angle, scale=1)
    rotated_img = cv2.warpAffine(img, rotate_matrix, (img.shape[1], img.shape[0]))

    return rotated_img, eye_center, angle

In [None]:
def rotate(origin, point, angle, row):
    """Rotate *point* about *origin* by *angle* degrees in a y-flipped frame.

    Both y coordinates are first mirrored as ``row - y``, the rotation is
    applied in that mirrored frame, and the resulting y is mirrored back.

    Args:
        origin: ``(x, y)`` centre of rotation.
        point: ``(x, y)`` point to rotate.
        angle: Rotation angle in degrees.
        row: Value used to mirror the y axis (``y -> row - y``).

    Returns:
        Tuple ``(x, y)`` of the rotated point, each truncated with ``int``.
    """
    px, py = point
    ox, oy = origin

    # Mirror the y axis before rotating.
    py_flipped = row - py
    oy_flipped = row - oy

    theta = math.radians(angle)
    cos_t = math.cos(theta)
    sin_t = math.sin(theta)

    dx = px - ox
    dy = py_flipped - oy_flipped

    new_x = ox + cos_t * dx - sin_t * dy
    new_y_flipped = oy_flipped + sin_t * dx + cos_t * dy

    # Mirror y back into the original frame.
    new_y = row - new_y_flipped
    return int(new_x), int(new_y)

In [None]:
def rotate_landmark5(landmark5, eye_center, angle, row):
    """Apply ``rotate`` to every landmark and collect the results.

    Args:
        landmark5: Array of landmark points; iterated along its first axis.
        eye_center: Centre of rotation forwarded to ``rotate``.
        angle: Rotation angle in degrees forwarded to ``rotate``.
        row: Y-mirroring value forwarded to ``rotate``.

    Returns:
        ``np.ndarray`` built from the rotated points, in input order.
    """
    rotated_points = [
        rotate(eye_center, landmark5[idx], angle, row)
        for idx in range(landmark5.shape[0])
    ]
    return np.array(rotated_points)

In [None]:
def corp_face(img, size, landmark5):
    """Crop a ``size`` x ``size`` window centred on the face landmarks.

    The horizontal centre is midway between the smallest and largest
    landmark x. The vertical centre is midway between the mean y of
    landmarks 0 and 1 and the mean y of landmarks 3 and 4 (the values the
    code names eye and lip centres).

    Args:
        img: Image array accepted by ``PIL.Image.fromarray``.
        size: Side length of the square crop window, in pixels.
        landmark5: 2-D array of ``(x, y)`` landmarks with at least five rows.

    Returns:
        Tuple ``(cropped_img, left, top)``: the crop as a NumPy array and
        the integer x/y of the window's top-left corner in *img*.
    """
    # Previously hard-coded to 150, which ignored `size`; size=300 is unchanged.
    half = size / 2

    x_min = np.min(landmark5, axis=0)[0]
    x_max = np.max(landmark5, axis=0)[0]
    x_center = (x_max - x_min) / 2 + x_min

    eye_center = np.mean(np.array([landmark5[0], landmark5[1]]), axis=0)
    lip_center = np.mean(np.array([landmark5[3], landmark5[4]]), axis=0)
    y_center = eye_center[1] + ((lip_center[1] - eye_center[1]) / 2)

    left, top, right, bottom = [
        int(v)
        for v in (x_center - half, y_center - half, x_center + half, y_center + half)
    ]

    # NOTE: the window may extend past the image border; that case is left
    # to PIL's Image.crop.
    cropped_img = np.array(Image.fromarray(img).crop((left, top, right, bottom)))
    return cropped_img, left, top

In [None]:
def transfer_landmark5(landmark5, left, top):
    """Shift every landmark by ``(-left, -top)``.

    Args:
        landmark5: Array of landmark points; the first two entries of each
            row are used as x and y.
        left: Amount subtracted from each x.
        top: Amount subtracted from each y.

    Returns:
        ``np.ndarray`` of the shifted ``(x, y)`` pairs, in input order.
    """
    shifted_points = [
        (landmark5[idx][0] - left, landmark5[idx][1] - top)
        for idx in range(landmark5.shape[0])
    ]
    return np.array(shifted_points)

In [None]:
def processing_video(write_path, frame_name, frame_count, img, detector):
    """Detect, align, crop and save every face found in one video frame.

    Each face is written to
    ``write_path + frame_name[:-4] + '_00' + <frame_count, zero-padded to 3>
    + 'f' + <face number> + '.jpg'``. Because the name is built by plain
    concatenation, *write_path* must already end with a path separator.

    Args:
        write_path: Output directory prefix (with trailing separator).
        frame_name: Source file name; its last four characters are dropped.
        frame_count: Frame number used in the output file name.
        img: Frame image passed to ``detect_face``.
        detector: Face detector passed to ``detect_face``.

    Returns:
        True if at least one set of landmarks was found and processed,
        False otherwise.
    """
    faces, landmarks = detect_face(img, detector)

    # Treat an empty result like a missing one so callers do not count a
    # frame with zero faces as a successful detection.
    if landmarks is None or len(landmarks) == 0:
        print (frame_name, 'The landmarks have not found.')
        return False

    for l in range(landmarks.shape[0]):
        # `np.int` was removed in NumPy 1.24; the builtin `int` is the same dtype.
        landmark5 = landmarks[l].astype(int)
        aligned_img, eye_center, angle = align_face(img, landmark5)
        rotated_landmark5 = rotate_landmark5(landmark5, eye_center, angle, img.shape[0])
        cropped_img, left, top = corp_face(aligned_img, 300, rotated_landmark5)
        filename = (
            write_path + frame_name[:-4]
            + '_00' + str(frame_count).zfill(3)
            + 'f' + str(l + 1) + '.jpg'
        )
        cv2.imwrite(filename, cropped_img)
    return True

In [None]:
# Run configuration: input/output locations and the face detector.
# training
#read_path = '/media/islab/disk2/dfdc_train_test/dfdc_train_video/'
#write_path = '/media/islab/disk2/dfdc_train_test/dfdc_train_face/'
# testing
# NOTE(review): unlike the commented-out training paths above, these two
# paths do not end with a path separator.
read_path = 'E:\\Train_dataset\\dfdc_train_part_00\\dfdc_train_part_0'
write_path = 'E:\\Train_dataset\\dfdc_train_part_00_faces'
# Make sure the output root exists; `executed` is True if it was already there.
executed = mkfolder(write_path)

gpuid = 0
# NOTE(review): arguments are presumably (model prefix, epoch, GPU id,
# network name) — confirm against the RetinaFace constructor. The model path
# is a Linux path while the data paths above are Windows paths.
detector = RetinaFace('/home/islab/insightface/RetinaFace/retinaface-R50/R50', 0, gpuid, 'net3')

In [None]:
# Walk every .mp4 in `read_path`, sample frames from it and save the faces
# found by `processing_video` into a per-video folder under `write_path`.
tStart = time.time()
filesname = os.listdir(read_path)

# Number of sampled frames to process per video before moving on to the next.
frames_per_video = 1

video_count = 0
for files in filesname:
    if not files.endswith('.mp4'):
        continue

    # Build paths with join(): the configured directories carry no trailing
    # separator, so plain string concatenation yields wrong paths. The empty
    # last component keeps a trailing separator on w_path, which
    # processing_video relies on when it concatenates the file name.
    r_path = join(read_path, files)
    w_path = join(write_path, files[:-4], '')
    executed = mkfolder(w_path)

    video_count = video_count + 1

    # An existing output folder marks the video as already processed.
    if executed:
        print ('\n' + r_path + ' had been done before ...')
        continue

    print ('\n(',video_count,'/',len(filesname),')')
    print ('Procesiing the '+ r_path + ' ...' )

    frame_count = 0
    detect_count = 0
    sampled_count = 0
    cap = cv2.VideoCapture(r_path)
    try:
        if not cap.isOpened():
            print ('Could not open ' + r_path)

        while cap.isOpened():
            ret, frame = cap.read()
            # No more frames: report the totals and stop.
            if not ret:
                print ('Done! the '+ files+ ' total frames:'+ str(frame_count))
                print ('(detect_count/frame_count):', detect_count,'/',frame_count)
                break

            frame_count = frame_count + 1
            # Only frames 1, 11, 21, ... are sampled.
            if frame_count % 10 != 1:
                continue

            check = processing_video(w_path, files, frame_count, frame, detector)
            if check:
                detect_count = detect_count + 1

            sampled_count = sampled_count + 1
            if sampled_count >= frames_per_video:
                break
    finally:
        # Always free the capture handle, even if processing raised.
        cap.release()

tEnd = time.time()
print ("\nIt cost %f sec" % (tEnd - tStart))
print ('\nDone!!!')

# Evaluation

Processing the testing dataset (three frames) took 497.974612 sec.

Processing the training dataset (one frame) took 51664.295490 sec.