In [2]:
import cv2
import numpy as np
from skimage import io
from batch_face import RetinaFace, LandmarkPredictor, draw_landmarks, Timer
import live_pose_estimator
import time

In [None]:
def get_landmarks(frame, faces):
    """Predict facial landmarks for the given face boxes in ``frame``.

    Delegates to the module-level ``predictor`` object, which must already
    exist when this function is called. The result of the predictor call is
    returned unchanged.
    """
    return predictor(faces, frame, from_fd=True)

In [None]:
def draw_landmarks_cv(frame, faces, landmarks):
    """Draw the landmarks of the first face onto ``frame`` and return the result.

    Only the first entry of ``faces`` / ``landmarks`` is drawn: several faces
    could be handled with a loop, but this notebook is only interested in a
    single face per frame.
    """
    first_box = faces[0][0]
    first_landmarks = landmarks[0]
    return draw_landmarks(frame, first_box, first_landmarks)

In [None]:
def get_head_pose(frame, faces_pose):
    """Estimate the head pose for each face in ``faces_pose``.

    Delegates to the module-level ``head_pose_estimator`` object, passing the
    faces in ``'tuple'`` form, and returns whatever the estimator returns.
    """
    estimator_options = {'input_face_type': 'tuple', 'update_dict': True}
    return head_pose_estimator(faces_pose, frame, **estimator_options)

In [None]:
def draw_head_pose_cube_cv(frame, faces, pose):
    """Draw the pose cube of the first face onto ``frame``.

    ``pose`` is unpacked as keyword arguments for
    ``head_pose_estimator.plot_pose_cube``, so its keys must match that
    method's parameter names. Nothing is returned.
    """
    first_box = faces[0][0]
    head_pose_estimator.plot_pose_cube(frame, first_box, **pose)

In [None]:
def updated_bbox(landmarks, margin=10):
    """Derive a face bounding box from the landmarks of the first face.

    The box is the axis-aligned extent of the landmark points, grown by
    ``margin`` on every side. It lets the live loop track a face between
    detector runs without re-running the detector.

    Parameters
    ----------
    landmarks : sequence
        Per-face landmark collections; only ``landmarks[0]`` is used. It is
        converted with ``np.asarray`` and reduced along axis 0, and the result
        is unpacked as ``(x, y)`` pairs, so it must hold 2-column points.
    margin : number, optional
        Padding added around the landmark extent (default 10, the value that
        used to be hard-coded).

    Returns
    -------
    list
        ``[[box, None, None]]`` where ``box`` is ``np.array([x1, y1, x2, y2])``;
        the two ``None`` entries fill the remaining slots of the face tuple.

    Raises
    ------
    ValueError
        Raised by NumPy if the first face has no landmark points.
    """
    points = np.asarray(landmarks[0])
    (x1, y1), (x2, y2) = points.min(0), points.max(0)
    box_new = np.array([x1 - margin, y1 - margin, x2 + margin, y2 + margin])
    return [[box_new, None, None]]

Opens the camera and passes each frame to the functions defined above. Comment or uncomment the function calls in the loop to select the desired tracking output.

In [None]:
### Open camera ###
cap = cv2.VideoCapture(0)
detector = RetinaFace(0)
predictor = LandmarkPredictor(0)
### NOTE: SixDRep is defined in a later cell of this notebook; that cell must be run before this one ###
head_pose_estimator = SixDRep(0)
detect_time = time.time()
faces = None

### try/finally guarantees the camera and the window are released even if a model call raises ###
try:
    while True:
        # Capture frame-by-frame
        ret, frame = cap.read()
        if not ret:
            # The camera returned no image (unplugged, busy, or end of stream): stop instead of passing None on
            print("Failed to read a frame from the camera.")
            break
        loop_time = time.time()

        ### NOTE: RGB values are normalized within RetinaFace ###
        ### Detect faces if none exist ###

        # Calculate the time difference
        elapsed_time = time.time() - detect_time

        # Check if n seconds has passed: The shorter the elapsed time - the more face detections are done, but also the lower the fps and efficiency
        if faces is None or elapsed_time >= 1:
            faces = detector(frame, cv=True, threshold=0.5)
            detect_time = time.time()
        else:
            ### This is an efficiency method of predicting the face bound-box - especially for live camera. It uses the min and max values from the results of the previous landmark 'predictor' function. Helps increase the fps rate ###
            ### However, it will not detect new faces, or when a face has gone ###
            ### This branch is only reached after a pass that found a face, so 'landmarks' is always defined here ###
            faces = updated_bbox(landmarks)

        if len(faces) == 0:
            print("NO face is detected!")
            ### Reset so the next pass runs the detector again instead of tracking from missing/stale landmarks ###
            faces = None
        else:
            ### Predict landmarks from face ###
            landmarks = get_landmarks(frame, faces)

            ### Estimate head pose from face ###
            pose = get_head_pose(frame, faces)

            ### Draw landmarks (AND/OR) pose cube ###
            frame = draw_landmarks_cv(frame, faces, landmarks)
            draw_head_pose_cube_cv(frame, faces, pose[0])

            # Calculate and display FPS, Pitch, Yaw and Roll (guard against a zero-length timing interval)
            frame_seconds = time.time() - loop_time
            fps = 1 / frame_seconds if frame_seconds > 0 else 0.0
            cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
            cv2.putText(frame, f"Pitch: {pose[0]['pitch']:.2f}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            cv2.putText(frame, f"Yaw: {pose[0]['yaw']:.2f}", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            cv2.putText(frame, f"Roll: {pose[0]['roll']:.2f}", (10, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        ### Display the resulting frame (also when no face was found, so the window stays live) ###
        cv2.imshow('', frame)

        ### Press 'q' to exit the video window ###
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    ### Release the capture when done ###
    cap.release()
    cv2.destroyAllWindows()

In [None]:
from sixdrepnet import SixDRepNet
import sixdrepnet.utils as utils
from opencv_transforms import transforms as cv_transforms
import torch
import numpy as np

# Spatial preprocessing applied to every face crop: Resize(224) followed by CenterCrop(224).
crop_resize = cv_transforms.Compose([
    cv_transforms.Resize(224),
    cv_transforms.CenterCrop(224),
])

# Tensor conversion followed by per-channel normalisation.
# NOTE(review): mean/std look like the usual ImageNet channel statistics - confirm.
normalize = cv_transforms.Compose([
    cv_transforms.ToTensor(),
    cv_transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])


def chunk_generator(l, n):
    """Lazily yield consecutive slices of ``l`` holding at most ``n`` items each.

    The final slice is shorter when ``len(l)`` is not a multiple of ``n``.
    """
    yield from (l[start:start + n] for start in range(0, len(l), n))

def flatten(l):
    """Concatenate the items of every sub-iterable in ``l`` into one flat list.

    Only one level of nesting is removed.
    """
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat

def chunk_call(model, chunk_size, input_tensor):
    """Apply ``model`` to ``input_tensor`` in slices of ``chunk_size`` and merge the results.

    When the first per-chunk result is a ``torch.Tensor`` the results are
    concatenated along dimension 0; otherwise each result is treated as a
    sequence and the sequences are flattened into a single list.
    """
    results = [model(piece) for piece in chunk_generator(input_tensor, chunk_size)]
    if not isinstance(results[0], torch.Tensor):
        return flatten(results)
    return torch.cat(results, dim=0)

class SixDRep:
    """Head-pose estimator that runs a ``SixDRepNet`` model on face crops of one image.

    Calling an instance crops every face box out of the image, pushes all crops
    through the wrapped network and returns one ``{'pitch', 'yaw', 'roll'}``
    dict per face.
    """

    # Face-box formats understood by __call__.
    _FACE_TYPES = ('tuple', 'dict', 'box')

    def __init__(self, gpu_id: int= -1, dict_path: str='') -> None:
        """Create the wrapped model and pick the device input tensors are moved to.

        gpu_id: int, -1 selects the CPU, any other value selects ``cuda:<gpu_id>``
        dict_path: str, forwarded unchanged to ``SixDRepNet``
        """
        self.model = SixDRepNet(gpu_id=gpu_id, dict_path=dict_path)
        if gpu_id == -1:
            self.device = torch.device('cpu')
        else:
            self.device = torch.device('cuda:{}'.format(gpu_id))

    @staticmethod
    def _expand_box(box):
        """Return ``(x_min, y_min, x_max, y_max, bbox_width, bbox_height)`` for a padded box.

        The width and height are those of the original (unpadded) box. The
        top-left corner is clamped at 0; the bottom-right corner is not clamped.
        """
        x_min = int(box[0])
        y_min = int(box[1])
        x_max = int(box[2])
        y_max = int(box[3])
        bbox_width = abs(x_max - x_min)
        bbox_height = abs(y_max - y_min)

        # NOTE(review): the horizontal padding is derived from the height and the
        # vertical padding from the width; kept exactly as before - confirm intent.
        pad_x = int(0.2*bbox_height)
        pad_y = int(0.2*bbox_width)
        x_min = max(0, x_min - pad_x)
        y_min = max(0, y_min - pad_y)
        x_max = x_max + pad_x
        y_max = y_max + pad_y
        return x_min, y_min, x_max, y_max, bbox_width, bbox_height

    def plot_pose_cube(self, frame, box, yaw, pitch, roll):
        """Draw a pose cube on ``frame`` centred on the padded face ``box``.

        frame: image passed through to ``utils.plot_pose_cube``
        box: indexable with at least four entries (x_min, y_min, x_max, y_max)
        yaw, pitch, roll: angles forwarded to ``utils.plot_pose_cube``
        The cube size is the width of the unpadded box. Nothing is returned.
        """
        x_min, y_min, x_max, y_max, bbox_width, _ = self._expand_box(box)
        center_x = x_min + int(.5*(x_max-x_min))
        center_y = y_min + int(.5*(y_max-y_min))
        utils.plot_pose_cube(frame,  yaw, pitch, roll, center_x, center_y, size=bbox_width)

    def __call__(self, all_faces, frames, batch_size=None, input_face_type='tuple', update_dict=True):
        '''
        Estimate the head pose of every face in ``all_faces``.

        all_faces: sequence of faces located in the single image ``frames``
        frames: one image as np.ndarray indexed [row, column, ...]; despite the plural
            name it is used as a single frame, not as a list of frames
            NOTE(review): this docstring used to say "0~255, uint8, rgb order", while the
            live loop in this notebook passes frames straight from cv2 - confirm the
            channel order the model expects
        batch_size: int, if None, no chunking
        input_face_type: str, 'tuple' (box is face[0]) or 'dict' (box is face['box'])
            or 'box' (the face is the box itself)
        update_dict: bool, if True and input_face_type is 'dict', store each pose in
            the matching face dictionary under 'head_pose'

        Returns a list with one {'pitch', 'yaw', 'roll'} dict per face, in input order.
        The values are the model's Euler angles multiplied by 180/pi.
        An empty ``all_faces`` yields an empty list.

        Raises ValueError when input_face_type is not one of the supported values.
        '''
        if input_face_type not in self._FACE_TYPES:
            raise ValueError(
                "input_face_type should be one of {}, got {!r}".format(self._FACE_TYPES, input_face_type))
        if len(all_faces) == 0:
            # torch.stack cannot handle an empty list, and there is nothing to predict
            return []
        if batch_size is None:
            batch_size = len(all_faces) # no chunking

        frame = frames
        imgs_for_model = []
        for face in all_faces:
            if input_face_type == 'tuple':
                box = face[0]
            elif input_face_type == 'dict':
                box = face['box']
            else:
                box = face
            x_min, y_min, x_max, y_max, _, _ = self._expand_box(box)
            img = frame[y_min:y_max, x_min:x_max]
            imgs_for_model.append(normalize(crop_resize(img)))

        imgs_for_model = torch.stack(imgs_for_model).to(self.device)
        with torch.no_grad():
            pred = chunk_call(self.model.model, batch_size, imgs_for_model)

        euler = utils.compute_euler_angles_from_rotation_matrices(pred)*180/np.pi
        p = euler[:, 0].cpu().detach().numpy()
        y = euler[:, 1].cpu().detach().numpy()
        r = euler[:, 2].cpu().detach().numpy()

        # reorganize the output: one pose per face, in the same order as all_faces
        outputs = []
        for face, pitch, yaw, roll in zip(all_faces, p, y, r):
            head_pose = {
                'pitch': pitch,
                'yaw': yaw,
                'roll': roll
            }
            outputs.append(head_pose)
            if update_dict and input_face_type == 'dict':
                # attach the pose to the face it belongs to (not always the first face)
                face['head_pose'] = head_pose
        return outputs