# Lie Detection
#### Based on Facial Micro-expressions, Body Posture, Eye Movements

> Mayur Sharma\
> Rohan deep Kujur\
> Khushi Tulsian\
> Atharva Karve

### Package Imports

In [None]:
import math
import cv2
import numpy as np
import pandas as pd

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

import threading as thd

mp_drawing  = mp.solutions.drawing_utils

## Detectors

### Face bounds detector

In [None]:
# https://mediapipe.readthedocs.io/en/latest/solutions/face_detection.html

class FaceDetector:
    '''
    FaceDetector is used to get the 'bounds' for a face.
    'bounds' are used to crop the image befor eseonding it for face/iris landmark detection.
    '''
    def __init__(self):
        # creating detector object
        with open(r".\data\blaze_face_short_range.tflite", "rb") as model_file:
            model_data = model_file.read()
        
        options = vision.FaceDetectorOptions(
            base_options = python.BaseOptions(model_asset_buffer=model_data),
            running_mode = vision.RunningMode.IMAGE )
        
        self.face_detector = vision.FaceDetector.create_from_options(options)

    
    def detect_face_bounds(self, image:np.array) -> tuple:
        # convert nump image to mediapipe format
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
        
        # detect face in image
        self.face_detection_result = self.face_detector.detect(mp_image)
    
        # if face detected, draw on image and return bounds
        if self.face_detection_result.detections:
            bbox = self.face_detection_result.detections[0].bounding_box
            print(f"Face in region: ({bbox.origin_x}, {bbox.origin_y}) \t Width: {bbox.width}px Height:{bbox.height}px")
            
            #self.draw_bounds(image, bbox)
            
            return self.expand_bounds(bbox)
        print("No face detected")
        return None

    
    def draw_bounds(self, image:np.array, bbox) -> tuple:
        height, width, _ = image.shape
        start_point = bbox.origin_x, bbox.origin_y
        end_point   = bbox.origin_x + bbox.width, bbox.origin_y + bbox.height
        
        cv2.rectangle(image, start_point, end_point, (255,0,0), 3)

    
    def expand_bounds(self, bbox, scale:int=1.3):
        height, width = bbox.height, bbox.width
        x = max(0, round( bbox.origin_x -  width*(scale-1)/2 )  )
        y = max(0, round( bbox.origin_y - height*(scale-1)/2 - height*0.2) )

        #print(x,y)
        bbox.x = x
        bbox.y = y
        bbox.width  = round(scale*width)
        bbox.height = round(scale*height)

        return bbox

### Facemesh detector

In [None]:
class FaceMeshDetector:
    def __init__(self):
        self.LEFT_IRIS  = list(LANDMARKS_LOC['iris_L'])
        self.RIGHT_IRIS = list(LANDMARKS_LOC['iris_R'])
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_mesh    = self.mp_face_mesh.FaceMesh(refine_landmarks=True)

    
    def detect(self, image:np.array):
        results = self.face_mesh.process(image)
        self.draw_face_landmarks(image, results)   # OPTIONAL
        
        return results

    
    def draw_face_landmarks(self, image, results):
        if not results.multi_face_landmarks:
            return

        img_h, img_w = image.shape[:2]
        mesh_points = np.array([np.multiply([p.x, p.y], [img_w, img_h]).astype(int) for p in results.multi_face_landmarks[0].landmark])
        cv2.polylines(image, [mesh_points[self.LEFT_IRIS]], True, (0,255,0), 1, cv2.LINE_AA)
        cv2.polylines(image, [mesh_points[self.RIGHT_IRIS]], True, (0,255,0), 1, cv2.LINE_AA)

        for face_landmark in results.multi_face_landmarks:
            mp_drawing.draw_landmarks(
                image = image,
                landmark_list = face_landmark,
                connections   = self.mp_face_mesh.FACEMESH_TESSELATION,
    
                landmark_drawing_spec = mp_drawing.DrawingSpec(
                    color=(255,0,0),
                    thickness=0,
                    circle_radius=1),
                
                connection_drawing_spec = mp_drawing.DrawingSpec(
                    color=(255,255,255),
                    thickness=1,
                    circle_radius=1)    )

### Holistic detector

In [None]:
class HolisticDetector:
    def __init__(self):
        self.mp_holistic = mp.solutions.holistic
        self.holistic    = self.mp_holistic.Holistic()

    
    def detect(self, image:np.array):
        results = self.holistic.process(image)
        self.draw_landmarks(image, results)     # OPTIONAL
    
        return results

    
    def draw_landmarks(self, image:np.array, results):
        mp_drawing.draw_landmarks(
            image = image,
            landmark_list = results.pose_landmarks,
            connections   = self.mp_holistic.POSE_CONNECTIONS,
            
            landmark_drawing_spec = mp_drawing.DrawingSpec(
                color=(0,230,255),
                thickness=2,
                circle_radius=1),
            
            connection_drawing_spec = mp_drawing.DrawingSpec(
                color=(255,255,255),
                thickness=2,
                circle_radius=1)
            )

## Data Accumulater

In [None]:
'''
- eyebrow (L&R)                x2
- lips    (L&R)                x2
- face: left right top bottom  x4
- iris    (L&R)                x2
- body                         x2   shoulder, knees
- neck & body angle            x1
= 13 columns : x and y         = 26 features
'''

columns0 = ['lip_L', 'lip_R', 'brow_L', 'brow_R', 'iris_L', 'iris_R'] + [f"face{i}" for i in range(4)] + ["shoulder", "knee"]
columns  = list()
for col in columns0:
    columns.append(col+'x')
    columns.append(col+'y')
columns.extend(['neck_angle', 'waist_angle', 'frame', 'question_no', 'TRUTH'])


# https://raw.githubusercontent.com/google/mediapipe/master/mediapipe/modules/face_geometry/data/canonical_face_model_uv_visualization.png
# https://github.com/google/mediapipe/blob/master/docs/solutions/hands.md
LANDMARKS_LOC = {
    'brow_L'     : {107, 66, 105, 63, 70, 46, 53, 52, 65, 55},
    'brow_R'     : {336, 285, 296, 295, 334, 282, 293, 283, 276, 300},
    'iris_L'     : {474, 475, 476, 477},
    'iris_R'     : {469, 470, 471, 472},
    'lip_L'      : {78, 191, 80, 81, 82, 95, 88, 178, 87},
    'lip_R'      : {308, 415, 324, 310, 318, 311, 402, 312, 317},
    'face0'      : {54, 68, 103, 104, 108, 69, 67, 10, 151, 338, 337, 397, 333, 332, 298, 284, 251, 301, 21, 71, 109, 297, 299},                                                                       #forehead
    'face1'      : {18, 32, 83, 140, 148, 152, 171, 175, 176, 199, 200, 201, 208, 262, 313, 369, 377, 396, 400, 421, 428},                                                       #chin
    'face2'      : {36, 50, 58, 93, 101, 111, 116, 117, 118, 123, 132, 137, 138, 147, 172, 177, 186, 187, 192, 203, 205, 206, 207, 212, 213, 214, 215, 216, 227, 228, 234},      #left_face
    'face3'      : {266, 280, 288, 323, 330, 340, 345, 346, 347, 352, 361, 366, 367, 376, 397, 401, 410, 411, 416, 423, 425, 426, 427, 432, 433, 434, 435, 436, 447, 448, 454}   #right_face
}
POSTURE_LOC = {'nose':0, 'shoulder':12, 'elbow':14, 'knee':26} #nose, shoulder, elbow, knee


# video intervals where the subject answers
# INTERVALS = (
#     (90,  140, 1),
#     (170, 220, 1),
#     (250, 340, 0),
#     (370, 450, 1),    # TESTING INTERVAL
#     (480, 540, 1),
#     (600, 660, 0),
#     (inf, inf,0) )    # prevents detection on remainder video

# TEST_INTERVAL = 4-1

# TRAINING_FEATURES = pd.DataFrame(columns=columns)
# TESTING_FEATURES  = pd.DataFrame(columns=columns)
# TRAINING_FEATURES.shape

print(columns)
pd.DataFrame(columns=columns).set_index("frame")

In [None]:
class CaptureData:
    def __init__(self, name="data"):
        self.name = name
        
        self.df_face = pd.DataFrame(columns=columns)
        self.df_face.set_index("frame", inplace=True)

        self.df_pos = pd.DataFrame(columns=columns)
        self.df_pos.set_index("frame", inplace=True)

    
    def __repr__(self):
        print(self.df)
        return self.name

    
    def record_facelandmarks(self, frame_no, results, question_no, truth):
        nose_fix = (0.5,0.5)
        nose = results.multi_face_landmarks[0].landmark[4]  #nose at index 4
        dist = np.array([nose.x-nose_fix[0], nose.y-nose_fix[1]])           #dist is for keeping the nose at (0.5,0.5)
        features = {"TRUTH": truth, "question_no":question_no}
        
        for feature in LANDMARKS_LOC:
            feature_loc = np.array([0,0], dtype=np.float64)
            
            for idx in LANDMARKS_LOC[feature]:
                mark = results.multi_face_landmarks[0].landmark[idx]
                feature_loc += np.array( [mark.x, mark.y] )
                
            feature_loc /= len(LANDMARKS_LOC[feature])           # average feature location
            
            result = feature_loc - dist                          # normalize
            features[feature+'x'] = result[0]
            features[feature+'y'] = result[1]
        
        new_row = pd.Series(features, name = frame_no)
        self.df_face = pd.concat([self.df_face, new_row.to_frame().T])

    
    def calc_angle(self, p1, p2, p3):
        a = p1[0] - p2[0], p1[1] - p2[1]
        b = p1[0] - p3[0], p1[1] - p3[1]
        ab = a[0]*b[0] + a[1]*b[1]

        norm_a = math.sqrt(a[0]**2 + a[1]**2)
        norm_b = math.sqrt(b[0]**2 + b[1]**2)

        theta = round(math.degrees(math.acos(ab/(norm_a*norm_b))), 6)
        return theta
    
    def record_posture(self, frame_no, results, question_no, truth):
        hip_fix = (0.25, 0.55)
        hip  = results.pose_landmarks.landmark[24]               #hip at index 24
        dist = np.array([hip.x-hip_fix[0], hip.y-hip_fix[1]])    #dist is for keeping the hip at (0.25,0.55)
        features = {"TRUTH": truth, "question_no":question_no}

        locations = {}
        
        for feature in POSTURE_LOC:
            idx  = POSTURE_LOC[feature]
            mark = results.pose_landmarks.landmark[idx]
            feature_loc = np.array([mark.x,mark.y], dtype=np.float64)
            
            result = feature_loc - dist                          #normalize
            locations[feature] = (result[0], result[1])

        
        features["waist_angle"] = self.calc_angle(hip_fix, locations["knee"], locations["shoulder"])
        features[ "neck_angle"] = self.calc_angle(locations["shoulder"], hip_fix, locations["nose"])
        
        features["shoulderx"]   = locations["shoulder"][0]
        features["shouldery"]   = locations["shoulder"][1]
        features["kneex"]       = locations["knee"][0]
        features["kneey"]       = locations["knee"][1]
        
        new_row = pd.Series(features, name = frame_no)
        self.df_pos = pd.concat([self.df_pos, new_row.to_frame().T])

    
    def comiple_and_save_data(self):
        df = pd.merge(self.df_pos, self.df_face, on='frame', how='inner')
        return df

In [None]:
CaptureData.calc_angle([0,0], [0,0], [0,1], [1,0])

## Live video detecting

### testing

### Offset calculator test

### CamConnector

In [None]:
class CamVideoManager:
    def __init__(self, TRAIN_VIDEO_NO:int, camF, camS):
        self.offsetF   = self.offsetS = 0
        self.camF      = camF
        self.camS      = camS
        
        segments       = pd.read_csv(r"./train_videos/segments.csv", index_col="Train_no")
        self.selection = segments.loc[TRAIN_VID_NO]
        offset         = self.selection["offsetF"]
        
        if offset < 0:
            self.offsetS = abs(offset)
        else:
            self.offsetF = abs(offset)

        self.set_frame() # initialize
        self.INTERVALS = self.get_intervals()
        
    
    def get_intervals(self) -> list[dict]:
        intervals = []
        row = self.selection[3:].to_list()
        for i in range(0, len(row), 3):
            intervals.append( {"start": row[i], "end":row[i+1], "truth":row[i+2]} )
        return(intervals)

    
    def set_frame(self, frame_no:int=0, cam:int=0):
        #cam: 0 = both,   1 = front,   2 = side
        if cam in {0,1}:
            self.camF.set(cv2.CAP_PROP_POS_FRAMES, frame_no+self.offsetF-1)
        if cam in {0,2}:
            self.camS.set(cv2.CAP_PROP_POS_FRAMES, frame_no+self.offsetS-1)

## Process_tasks (threads)

In [None]:
def process_face(camF):
    print("PROCESSING FACE")
    CURR_FRAME    = 1
    CURR_INTERVAL = 0


    bbox = None
    while bbox is None:
        _, frameF = camF.read()
        bbox = face_bounds_detect.detect_face_bounds(frameF)
    
    CROP = { "y1" : bbox.y,
             "y2" : bbox.y+bbox.width,
             "x1" : bbox.x,
             "x2" : bbox.x+bbox.height }


    CURR_FRAME = CAMS.INTERVALS[CURR_INTERVAL]["start"]
    CAMS.set_frame(CURR_FRAME, 1)
    
    while True:
        # print(CURR_FRAME)
        _, frameF = camF.read()

        cropFx256    = cv2.resize(frameF[CROP["y1"]:CROP["y2"], CROP["x1"]:CROP["x2"] ], (350,350))
        results_face = face_mesh_detector.detect(cropFx256)
        capture_data.record_facelandmarks(CURR_FRAME, results_face, CURR_INTERVAL, CAMS.INTERVALS[CURR_INTERVAL]["truth"])
        
        cv2.imshow("front", cropFx256)
        key = cv2.waitKey(1)
        if key == ord('q'):
            break

        CURR_FRAME += 1
        if CURR_FRAME == CAMS.INTERVALS[CURR_INTERVAL]['end']:
            if CURR_FRAME == LAST_FRAME:
                cv2.destroyWindow("front")
                print("END OF STREAM")
                break
            CURR_INTERVAL += 1
            CURR_FRAME = CAMS.INTERVALS[CURR_INTERVAL]["start"]
            CAMS.set_frame(CURR_FRAME, 1)
            print("JUMP"*10, CURR_INTERVAL, CAMS.INTERVALS[CURR_INTERVAL])

In [None]:
def process_posture(camS):
    print("PROCESSING BODY123456")
    CURR_FRAME    = 1
    CURR_INTERVAL = 0

    
    bbox = None
    while bbox is None:
        _, frameS = camS.read()
        bbox = holistic_detector.detect(frameS).pose_landmarks.landmark
    
    height, width, _ = frameS.shape
    CROP = { "y1" : round(height*bbox[ 0].y -150),                                   # crop.x = bbox.y, because crop is matrix
             "y2" : round(height*bbox[32].y +100),
             "x1" : round( width*bbox[24].x -150),
             "x2" : round( width*bbox[26].x +180) }
    print(CROP)

    CURR_FRAME = CAMS.INTERVALS[CURR_INTERVAL]["start"]
    CAMS.set_frame(CURR_FRAME, 2)
    
    while True:
        # print(CURR_FRAME)
        _, frameS = camS.read()
        frameS = frameS[CROP["y1"]:CROP["y2"], CROP["x1"]:CROP["x2"] ]
        
        results_posture = holistic_detector.detect(frameS)
        capture_data.record_posture(CURR_FRAME, results_posture, CURR_INTERVAL, CAMS.INTERVALS[CURR_INTERVAL]["truth"])
        
        cv2.imshow("side", cv2.resize(frameS, (0,0), fx=0.4, fy=0.4))
        key = cv2.waitKey(1)
        if key == ord('q'):
            break

        CURR_FRAME += 1
        if CURR_FRAME == CAMS.INTERVALS[CURR_INTERVAL]['end']:
            if CURR_FRAME == LAST_FRAME:
                cv2.destroyWindow("side")
                print("END OF STREAM")
                break
            CURR_INTERVAL += 1
            CURR_FRAME = CAMS.INTERVALS[CURR_INTERVAL]["start"]
            CAMS.set_frame(CURR_FRAME, 2)
            print("JUMP"*10, CURR_INTERVAL, CAMS.INTERVALS[CURR_INTERVAL])

## MAIN

In [None]:
TRAIN_VID_NO  = 3

face_bounds_detect = FaceDetector()
face_mesh_detector = FaceMeshDetector()
holistic_detector  = HolisticDetector()
capture_data = CaptureData()

In [None]:
camF = cv2.VideoCapture(rf".\train_videos\train{TRAIN_VID_NO}-f.mp4")
camS = cv2.VideoCapture(rf".\train_videos\train{TRAIN_VID_NO}-s.mp4")

CAMS = CamVideoManager(TRAIN_VID_NO, camF, camS)
CAMS.set_frame(CAMS.INTERVALS[0]["start"])
LAST_FRAME = CAMS.INTERVALS[-1]["end"]
print(f"{CAMS.INTERVALS}")

## MultiThreaded

In [None]:
# process_posture(camS)
# process_face(camF)
# cv2.destroyAllWindows()

if __name__ == "__main__":
    thd1 = thd.Thread(target=process_posture, args=(camS,))
    thd2 = thd.Thread(target=process_face,    args=(camF,))
    
    thd1.start()
    thd2.start()
    
    thd1.join()
    thd2.join()
    print("DONE")
    cv2.destroyAllWindows()

In [None]:
name = "khushi"

c1 = ['lip_Lx', 'lip_Ly', 'lip_Rx', 'lip_Ry', 'brow_Lx', 'brow_Ly', 'brow_Rx', 'brow_Ry', 'iris_Lx', 'iris_Ly', 'iris_Rx', 'iris_Ry', 'face0x', 'face0y', 'face1x', 'face1y', 'face2x', 'face2y', 'face3x', 'face3y']
c2 = ['shoulderx', 'shouldery', 'kneex', 'kneey', 'neck_angle', 'waist_angle', 'question_no', 'TRUTH']

d1 = capture_data.df_face.drop(columns=c2)
d2 = capture_data.df_pos.drop( columns=c1)

#capture_data.df_pos, capture_data.df_face, how="inner", on="frame")

DATASET = pd.concat([d1,d2], axis=1)
DATASET.to_csv(rf'.\data\{name}_landmarka.csv')

## Synced (non-multi-thread)