In [None]:
import os, sys
from os import listdir
from os.path import isfile, isdir, join
from tqdm.notebook import tqdm
from argparse import ArgumentParser

In [None]:
import numpy as np
import cv2, imutils
from PIL import Image
import json, yaml, collections
import pickle as pkl
import time
from datetime import datetime, timedelta
from scipy.spatial.transform import Rotation

In [None]:
import torch
import torch.nn as nn
from torch import functional as F
from torchvision import transforms as T
from omegaconf import DictConfig, OmegaConf
from typing import List
import tempfile

In [None]:
from ptgaze.common import Camera, Face, FacePartsName, Visualizer
from ptgaze.utils import get_3d_face_model
from ptgaze.head_pose_estimation.head_pose_normalizer import HeadPoseNormalizer
from ptgaze.models import create_model
from ptgaze.transforms import create_transform

In [None]:
class GazeEstimator:
    EYE_KEYS = [FacePartsName.REYE, FacePartsName.LEYE]

    def __init__(self, config: DictConfig):
        self._config = config
        self._face_model3d = get_3d_face_model(config)
        self.camera = Camera(config.gaze_estimator.camera_params)
        self._normalized_camera = Camera(
            config.gaze_estimator.normalized_camera_params)

        # self._landmark_estimator = LandmarkEstimator(config)
        self._head_pose_normalizer = HeadPoseNormalizer(
            self.camera, self._normalized_camera,
            self._config.gaze_estimator.normalized_camera_distance)

        self.device = torch.device(self._config.device)
        self._gaze_estimation_model = self._load_model()
        self._transform = create_transform(config)

    def _load_model(self) -> torch.nn.Module:
        model = create_model(self._config)
        checkpoint = torch.load(self._config.gaze_estimator.checkpoint,
                                map_location='cpu')
        model.load_state_dict(checkpoint['model'])
        model.to(self.device)
        model.eval()
        return model

    def detect_faces(self, image: np.ndarray, landmarks_arr:list) -> List[Face]:
        # return self._landmark_estimator.detect_faces(image)
        detected = []
        for l_idx in range(len(landmarks_arr)):
            pts = landmarks_arr[l_idx][:, :2]*np.array([[image.shape[1], image.shape[0]]])
            bbox = np.vstack([pts.min(axis=0), pts.max(axis=0)])
            bbox = np.round(bbox).astype(np.int32)
            detected.append(Face(bbox, pts))
        return detected

    def estimate_gaze(self, image: np.ndarray, face: Face) -> None:
        self._face_model3d.estimate_head_pose(face, self.camera)
        self._face_model3d.compute_3d_pose(face)
        self._face_model3d.compute_face_eye_centers(face, self._config.mode)

        if self._config.mode == 'MPIIGaze':
            for key in self.EYE_KEYS:
                eye = getattr(face, key.name.lower())
                self._head_pose_normalizer.normalize(image, eye)
            self._run_mpiigaze_model(face)
        elif self._config.mode == 'MPIIFaceGaze':
            self._head_pose_normalizer.normalize(image, face)
            self._run_mpiifacegaze_model(face)
        elif self._config.mode == 'ETH-XGaze':
            self._head_pose_normalizer.normalize(image, face)
            self._run_ethxgaze_model(face)
        else:
            raise ValueError

    @torch.no_grad()
    def _run_mpiigaze_model(self, face: Face) -> None:
        images = []
        head_poses = []
        for key in self.EYE_KEYS:
            eye = getattr(face, key.name.lower())
            image = eye.normalized_image
            normalized_head_pose = eye.normalized_head_rot2d
            if key == FacePartsName.REYE:
                image = image[:, ::-1].copy()
                normalized_head_pose *= np.array([1, -1])
            image = self._transform(image)
            images.append(image)
            head_poses.append(normalized_head_pose)
        images = torch.stack(images)
        head_poses = np.array(head_poses).astype(np.float32)
        head_poses = torch.from_numpy(head_poses)

        images = images.to(self.device)
        head_poses = head_poses.to(self.device)
        predictions = self._gaze_estimation_model(images, head_poses)
        predictions = predictions.cpu().numpy()

        for i, key in enumerate(self.EYE_KEYS):
            eye = getattr(face, key.name.lower())
            eye.normalized_gaze_angles = predictions[i]
            if key == FacePartsName.REYE:
                eye.normalized_gaze_angles *= np.array([1, -1])
            # eye.angle_to_vector()
            # eye.denormalize_gaze_vector()

    @torch.no_grad()
    def _run_mpiifacegaze_model(self, face: Face) -> None:
        image = self._transform(face.normalized_image).unsqueeze(0)

        image = image.to(self.device)
        prediction = self._gaze_estimation_model(image)
        prediction = prediction.cpu().numpy()

        face.normalized_gaze_angles = prediction[0]
        # face.angle_to_vector()
        # face.denormalize_gaze_vector()

    @torch.no_grad()
    def _run_ethxgaze_model(self, face: Face) -> None:
        image = self._transform(face.normalized_image).unsqueeze(0)

        image = image.to(self.device)
        prediction = self._gaze_estimation_model(image)
        prediction = prediction.cpu().numpy()

        face.normalized_gaze_angles = prediction[0]
        # face.angle_to_vector()
        # face.denormalize_gaze_vector()

In [None]:
def generate_dummy_camera_params(config: DictConfig, cap=None, downscale=1) -> None:
    if cap is None:
        w = int(1280/downscale)
        h = int(720/downscale)
    elif type(cap)==cv2.VideoCapture:
        w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)/downscale), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)/downscale)
    else:
        raise ValueError
    
    out_file = config.gaze_estimator.camera_params
    c_dict = {
        'image_width': w,
        'image_height': h,
        'camera_matrix': {
            'rows': 3,
            'cols': 3,
            'data': [w, 0., w / 2, 0., w, h / 2, 0., 0., 1.]
        },
        'distortion_coefficients': {
            'rows': 1,
            'cols': 5,
            'data': [0., 0., 0., 0., 0.]
        }
    }
    with open(out_file, 'w') as fp:
        yaml.safe_dump(c_dict, fp)
        # yaml.dump(c_dict, fp, default_flow_style = True)

In [None]:
def disp_gaze_perpective(frame, x1, y1, rot_mat, dist=100., color=(0,0,255), thickness=2):
    focal_length = frame.shape[1]/2
    camera_origin = np.array([0., 0., focal_length], dtype=float)
    camera_center = (frame.shape[1] / 2, frame.shape[0] / 2)
    camera_matrix = np.array(
        [[focal_length, 0., camera_center[0]],
        [0., focal_length, camera_center[1]],
        [0., 0., 1.]], dtype=float)
    # print(camera_matrix)
    # print(x1, y1)
    
    src_pts = np.array([[0., 0., 1e-5, 1.], [0., 0., dist, 1.]], dtype=float)
    Tmat = np.identity(4, dtype=float)
    Tmat[:3, :3] = rot_mat
    Tmat[:3, 3] = np.array([x1, y1, 0])
    dst_pts = (Tmat @ src_pts.T).T
    dst_pts[:, :3] -= camera_origin
    dst_pts[:, 2] *= -1
    dst_pts /= dst_pts[:, [2]]
    img_pts = (camera_matrix @ dst_pts[:, :3].T).T
    # img_pts /= img_pts[:, [2]]
    
    img = cv2.arrowedLine(frame, tuple(np.round(img_pts[0, :2]).astype(np.int32)),
                   tuple(np.round(img_pts[1, :2]).astype(int)), color,
                   thickness, cv2.LINE_AA, tipLength=0.18)
    return img

def disp_gaze_ROIperpective(frame, x1, y1, cx, cy, width, height, rot_mat, dist=100., color=(0,0,255), thickness=2):
    focal_length = frame.shape[1]/2
    camera_origin = np.array([0, 0, focal_length], dtype=float)
    camera_center = (width / 2, height / 2)
    camera_matrix = np.array(
        [[focal_length, 0., camera_center[0]],
        [0., focal_length, camera_center[1]],
        [0., 0., 1.]], dtype=float)
    # print(camera_matrix)
    # print(x1, y1)
    
    src_pts = np.array([[0., 0., 1e-5, 1.], [0., 0., dist, 1.]], dtype=float)
    Tmat = np.identity(4, dtype=float)
    Tmat[:3, :3] = rot_mat
    Tmat[:3, 3] = np.array([x1-width/2, y1-height/2, 0])
    dst_pts = (Tmat @ src_pts.T).T
    dst_pts[:, :3] -= camera_origin
    dst_pts[:, 2] *= -1
    dst_pts /= dst_pts[:, [2]]
    img_pts = (camera_matrix @ dst_pts[:, :3].T).T
    img_pts -= np.array([[-cx, -cy, 0]])
    
    img = cv2.arrowedLine(frame, tuple(np.round(img_pts[0, :2]).astype(np.int32)),
                   tuple(np.round(img_pts[1, :2]).astype(int)), color,
                   thickness, cv2.LINE_AA, tipLength=0.18)
    return img

def disp_gaze_orthographic(frame, x1, y1, rot_mat, dist=100., color=(0,0,255), thickness=2):
    focal_length = frame.shape[1]/2
    camera_origin = np.array([0., 0., focal_length], dtype=float)
    camera_center = (frame.shape[1] / 2, frame.shape[0] / 2)
    
    src_pts = np.array([[0., 0., 1e-5, 1.], [0., 0., dist, 1.]], dtype=float)
    Tmat = np.identity(4, dtype=float)
    Tmat[:3, :3] = rot_mat
    Tmat[:3, 3] = np.array([x1, y1, 0])
    dst_pts = (Tmat @ src_pts.T).T
    dst_pts[:, :3] -= camera_origin
    dst_pts[:, 2] *= -1

    img = cv2.arrowedLine(frame, tuple(np.round(dst_pts[0, :2]).astype(np.int32)),
                   tuple(np.round(dst_pts[1, :2]).astype(int)), color,
                   thickness, cv2.LINE_AA, tipLength=0.18)
    return img

In [None]:
left_eye = [33, 246, 161, 160, 159, 158, 157, 173, 133, 155, 154, 153, 145, 144, 163, 7]
right_eye = [362, 398, 384, 385, 386, 387, 388, 466, 263, 249, 390, 373, 374, 380, 381, 382]
nose_tip = [1, 4]
left_lipend = [61, 76]
right_lipend = [306, 291]
landmark_keys = [left_eye, right_eye, nose_tip, left_lipend, right_lipend]
landmark_color = [(255, 0, 64), (255, 64, 0), (0, 255, 0), (64, 0, 255), (0, 64, 255)]

def get_five_landmarks(frame, landmarks):
    pts = []
    for idx in range(len(landmark_keys)):
        vertices = landmarks[landmark_keys[idx], :]
        pt = np.mean(vertices[:, :2], axis=0)*np.array([frame.shape[1], frame.shape[0]])
        pts.append(pt)
    pts = np.stack(pts)
    return pts

def draw_landmarks(frame, landmarks):
    for idx in range(landmarks.shape[0]):
        cv2.circle(frame, (*landmarks[idx].astype(int),), 3, landmark_color[idx], -1)
    return frame

In [None]:
sys.argv = ['code.py', '-use_dummy_camera_params']
parser = ArgumentParser()
parser.add_argument('-mode', metavar='mode', default='eth-xgaze', type=str, help='{mpiigaze,mpiifacegaze,eth-xgaze}')
parser.add_argument('-model', metavar='model', default='./ptgaze/models/eth-xgaze_resnet18.pth', type=str)
parser.add_argument('-config', metavar='config', default='./ptgaze/data/configs/eth-xgaze.yaml', type=str)
parser.add_argument('-device', metavar='device', default='cuda:0', type=str)
parser.add_argument('-use_dummy_camera_params', action='store_true')
args = parser.parse_args()
print(args)

In [None]:
os.environ['PACKAGE_ROOT'] = 'C:\\Users\\Rishabh\\Documents\\Repos\\face\\Gaze\\gazeEstimation\\ptgaze'
config = OmegaConf.load(args.config)
config['PACKAGE_ROOT'] = './ptgaze'
config['gaze_estimator'].camera_params = config['gaze_estimator'].camera_params.format(config['PACKAGE_ROOT'])
config['gaze_estimator'].normalized_camera_params = config['gaze_estimator'].normalized_camera_params.format(config['PACKAGE_ROOT'])
config['gaze_estimator'].checkpoint = args.model
config.device = args.device

config.gaze_estimator.use_dummy_camera_params = args.use_dummy_camera_params
if config.gaze_estimator.use_dummy_camera_params:
    generate_dummy_camera_params(config)

print(config)

In [None]:
root = 'D:/Datasets/JALI/'
video_dir = join(root, 'reencoded')
video_names = [f for f in listdir(video_dir) if isfile(join(video_dir, f))]
video_names.sort()
print('\n'.join(video_names))

In [None]:
exit_flag = False
pose_dir = join(root, 'pose')
save_dir = join(root, 'ETHGaze')
if not isdir(save_dir):
    os.mkdir(save_dir)
fixed_height = 480
downsample = 1

for vid_num in tqdm(range(len(video_names))):
    vid_prefix = video_names[vid_num].split('.mp4')[0]
    vid_path = join(video_dir, video_names[vid_num])
    pose_path = join(pose_dir, vid_prefix+'.pkl')
    
    cap = cv2.VideoCapture(vid_path)
    width, height = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps, frame_count = cap.get(cv2.CAP_PROP_FPS), int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(vid_prefix)
    print('FPS:{},RES:{}x{}|COUNT:{}'.format(fps, width, height, frame_count))
    
    pose_writer = None
    save_path = join(save_dir, vid_prefix+'.mp4')
    downscale = height/fixed_height
    generate_dummy_camera_params(config, cap, downscale)

    gaze_estimator = GazeEstimator(config)
    gaze_estimator.camera.camera_matrix = gaze_estimator.camera.camera_matrix.astype(np.float32)
    gaze_estimator.camera.dist_coefficients = gaze_estimator.camera.dist_coefficients.reshape(5).astype(np.float32)
    print(gaze_estimator.camera.camera_matrix, gaze_estimator.camera.dist_coefficients)
    
    prev_rotation=None
    deltatime = 0
    kf_obj = cv2.KalmanFilter(4, 2, 0)
    kf_obj.transitionMatrix = np.array([[1,0,1,0], [0,1,0,1], [0,0,1,0], [0,0,0,1]], dtype=np.float32)
    kf_obj.measurementMatrix = np.array([[1,0,0,0], [0,1,0,0]], dtype=np.float32)

    pose_dict = {}
    with open(pose_path, 'rb') as fp:
        pose_dict = pkl.load(fp)
    landmarks_arr = pose_dict['LANDMARKS']
    faces_arr = pose_dict['BBOX']
    
    nface_dict = {}
    raw_faces = []
    raw_landmarks = []
    raw_rot_eulers = []
    raw_rot_matrices = []
    smooth_rot_eulers = []
    smooth_rot_matrices = []

    for frame_num in range(frame_count):
        ret, frame = cap.read()
        if not ret:
            break
        frame_resized = imutils.resize(frame, height=fixed_height)
        undistorted_frame = cv2.undistort(
                                frame_resized, gaze_estimator.camera.camera_matrix,
                                gaze_estimator.camera.dist_coefficients)
        frame_copy = frame_resized.copy()
        
        # nface_dict[str(frame_num)] = []
        raw_face = faces_arr[frame_num]#*np.array([width, height, width, height])
        raw_bbox = (raw_face/downscale).astype(int)
        raw_bbox = (raw_bbox[0], raw_bbox[1], raw_bbox[2]-raw_bbox[0], raw_bbox[3]-raw_bbox[1])
        face_ar = raw_bbox[2]*raw_bbox[3]
        start_time = time.time()

        if face_ar>0:
            landmark_pts = landmarks_arr[frame_num].astype(np.float32)
            five_landmarks = get_five_landmarks(frame, landmark_pts)/downscale
            # nface_dict[str(frame_num)] = [raw_face, five_landmarks.astype(float)]
            raw_faces.append(raw_face.astype(np.float32))
            raw_landmarks.append(landmark_pts[:468, :2]*np.array([[frame.shape[1], frame.shape[0]]]))

            faces_list = gaze_estimator.detect_faces(undistorted_frame, [landmark_pts[:468]])
            cv2.rectangle(frame_copy, raw_bbox, (0,0,255), 2)
            cv2.circle(frame_copy, (int(five_landmarks[2, 0]), int(five_landmarks[2, 1])), 3, (0,255,0), -1)
            # draw_landmarks(frame_copy, five_landmarks)
            for face_obj in faces_list:
                gaze_estimator.estimate_gaze(undistorted_frame, face_obj)
                pitch_predicted, yaw_predicted = face_obj.normalized_gaze_angles
                rot_obj = Rotation.from_euler('xyz', [pitch_predicted, -yaw_predicted, 0], degrees=False)
                rot_mat = rot_obj.as_matrix()
                end_time = time.time()
                estimated_mat = None
                raw_rot_eulers.append(rot_obj.as_euler('xyz', degrees=True))
                raw_rot_matrices.append(rot_mat)

                if prev_rotation==None:
                    # angular_velocity = rot_obj.as_euler('yxz')/(deltatime+end_time-start_time)
                    kf_obj.statePre = np.array([pitch_predicted, -yaw_predicted, 0, 0], dtype=np.float32)
                    kf_obj.processNoiseCov = np.identity(4, dtype=np.float32) * 1e-4
                    kf_obj.measurementNoiseCov = np.identity(2, dtype=np.float32) * 1e-1
                    kf_obj.errorCovPost = np.identity(4, dtype=np.float32) * 1e-1
                    smooth_rot_eulers.append(rot_obj.as_euler('xyz', degrees=True))
                    smooth_rot_matrices.append(rot_mat)
                else:
                    angular_velocity = (rot_obj.as_euler('yxz', \
                                        degrees=False)-prev_rotation.as_euler('yxz', \
                                        degrees=False))/(deltatime+end_time-start_time)
                    estimated = kf_obj.correct(np.array([pitch_predicted, -yaw_predicted], dtype=np.float32))
                    kf_obj.statePre = np.array([pitch_predicted, -yaw_predicted, angular_velocity[0], angular_velocity[1]], dtype=np.float32)
                    predicted = kf_obj.predict()
                    estimated_mat = Rotation.from_euler('xyz', np.array([predicted[0], predicted[1], 0.]), degrees=False)
                    estrot_mat = estimated_mat.as_matrix()
                    smooth_rot_eulers.append(estimated_mat.as_euler('xyz', degrees=True))
                    smooth_rot_matrices.append(estrot_mat)
                prev_rotation = rot_obj
                deltatime = 0

                frame_copy = disp_gaze_orthographic(frame_copy, 
                                                    five_landmarks[2, 0], 
                                                    five_landmarks[2, 1], 
                                                    rot_mat, dist=frame_resized.shape[0]//3, color=(0,0,255))
                # frame_copy = disp_gaze_ROIperpective(frame_copy, 
                #                            five_landmarks[2, 0]-raw_bbox[0], 
                #                            five_landmarks[2, 1]-raw_bbox[1], 
                #                            raw_bbox[0], raw_bbox[1], raw_bbox[2], raw_bbox[3], 
                #                            rot_mat, dist=frame_resized.shape[0]//3, color=(0,0,255))
                if estimated_mat is not None:
                    frame_copy = disp_gaze_orthographic(frame_copy, 
                                                        five_landmarks[2, 0], 
                                                        five_landmarks[2, 1], 
                                                        estrot_mat, dist=frame_resized.shape[0]//3, color=(0,255,0))
                    # frame_copy = disp_gaze_ROIperpective(frame_copy, 
                    #                                     five_landmarks[2, 0]-raw_bbox[0], 
                    #                                     five_landmarks[2, 1]-raw_bbox[1], 
                    #                                     raw_bbox[0], raw_bbox[1], raw_bbox[2], raw_bbox[3], 
                    #                                     estrot_mat, dist=frame_resized.shape[0]//3, color=(0,255,0))
        else:
            end_time = time.time()
            deltatime += end_time-start_time
            raw_faces.append(np.zeros((4), dtype=np.float32))
            raw_landmarks.append(np.zeros((468, 2), dtype=np.float32))
            raw_rot_eulers.append(np.zeros((3,), dtype=float))
            raw_rot_matrices.append(np.identity(3, dtype=float))
            smooth_rot_eulers.append(np.zeros((3,), dtype=float))
            smooth_rot_matrices.append(np.identity(3, dtype=float))

        frame_copy = imutils.resize(frame_copy, height=480)
        if pose_writer is None:
            pose_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc('m', 'p', '4', 'v'),
                                         fps/downsample, (frame_copy.shape[1], frame_copy.shape[0]))
        pose_writer.write(frame_copy)
        cv2.imshow('Video', frame_copy)
        ch = cv2.waitKey(1)
        if ch<0:
            continue
        elif chr(ch)=='x':
            exit_flag = True
            break
        elif chr(ch)=='n':
            break
    
    raw_faces = np.stack(raw_faces, axis=0)
    raw_landmarks = np.stack(raw_landmarks, axis=0)
    raw_rot_eulers = np.stack(raw_rot_eulers, axis=0)
    raw_rot_matrices = np.stack(raw_rot_matrices, axis=0)
    smooth_rot_eulers = np.stack(smooth_rot_eulers, axis=0)
    smooth_rot_matrices = np.stack(smooth_rot_matrices, axis=0)
    with open(join(save_dir, vid_prefix+'.pkl'), 'wb') as fp:
        pkl.dump({'RAW_GAZE':{'EULER':raw_rot_eulers, 'ROTMAT':raw_rot_matrices},
                  'SMOOTH_GAZE':{'EULER':smooth_rot_eulers, 'ROTMAT':smooth_rot_matrices},
                  'FACES':raw_faces, 'LANDMARKS':raw_landmarks}, fp)

    if pose_writer is not None:
        pose_writer.release()
        pose_writer = None
        ffmpeg_cmd = 'ffmpeg {} -i "{}" -i "{}" -map 0:v -map 1:a -c:v libx264 -c:a copy -strict -2 "{}"'.format(
                                                                                '-hide_banner -loglevel error',
                                                                                save_path,
                                                                                vid_path,
                                                                                join(save_dir, 'vid.mp4'))
        os.system(ffmpeg_cmd)
        os.remove(save_path)
        os.rename(join(save_dir, 'vid.mp4'), save_path)
    if exit_flag:
        cv2.destroyAllWindows()
        break

In [None]:
cv2.destroyAllWindows()