##### Copyright 2023 The MediaPipe Authors. All Rights Reserved.

In [1]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

This notebook is based on a tutorial produced by The MediaPipe Authors and retains the license above. The paper "3D Facial Feature Tracking with
Multimodal Depth Fusion" uses the Face Landmarker and Face Detector model structures, weights, and loading procedures from MediaPipe to facilitate the facial feature tracking process. <br>

The use of the <b>laplace_max_step</b> function and the compilation of the results into cumulative .CSV files are where this notebook diverges from the original MediaPipe notebook; both were added for the application detailed in "3D Facial Feature Tracking with
Multimodal Depth Fusion".

Install MediaPipe library and download the Face Landmarker v2 and Face Detector.


Face Landmarker documentation can be found [here](https://developers.google.com/mediapipe/solutions/vision/face_landmarker#models).


We will use the models "as-is" and will not do any further fine-tuning, because the images in our application closely resemble typical facial training data sets.

In [None]:
!pip install -q mediapipe
!wget -O face_landmarker_v2_with_blendshapes.task -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task
!wget -q -O detector.tflite -q https://storage.googleapis.com/mediapipe-models/face_detector/blaze_face_short_range/float16/1/blaze_face_short_range.tflite

Mount files stored in Google Drive for use by the code. Skip if running locally or in a different compute environment.

In [None]:
# Mount Google Drive when running in Colab. Outside Colab the google.colab
# package does not exist, so report that and carry on instead of aborting a
# Run All on an optional step.
try:
  from google.colab import drive
except ImportError:
  drive = None
  print('google.colab is not available; skipping the Google Drive mount.')
else:
  drive.mount('/content/drive')

Defines the function that calculates where the focus is at a maximum for the purposes of 3D absolute depth estimation. Not needed if only doing x and y analysis.

In [None]:
import os

import cv2
import numpy as np
from PIL import Image
def laplace_max_step(pathname, fallback_step=471):
  """Return the step number of the image in `pathname` with the sharpest face.

  Every directory entry is read as a grayscale image. Each face reported by the
  module-level `face_detector` is cropped to its bounding box, smoothed with a
  7x7 Gaussian blur and scored by the standard deviation of its Laplacian. The
  step is parsed from the first three characters of the file name of the
  highest-scoring crop.

  Relies on `cv2`, `np`, `Image`, `mp` and `face_detector` being defined at
  module level before the function is called.

  Args:
    pathname: Directory to scan; its file names must start with the step
      number (first three characters).
    fallback_step: Step returned when no face crop was scored but at least one
      entry could not be read as an image. Defaults to 471, the value that was
      previously hard-coded.

  Returns:
    int: Step of the sharpest face crop, or `fallback_step` as described above.

  Raises:
    UnboundLocalError: No face crop scored above zero and every entry loaded
      (or the directory is empty). Raised explicitly because existing callers
      catch this exception type to skip the folder.
  """
  best_sharpness = 0
  best_step = None
  load_failed = False

  with os.scandir(pathname) as entries:
    for entry in entries:
      # entry.path equals pathname + file name when pathname ends in '/', and
      # stays correct when the trailing separator is missing.
      image = cv2.imread(entry.path, cv2.IMREAD_GRAYSCALE)
      if image is None:
        # Remember the failure rather than overwriting the result: previously
        # an unreadable file replaced an already-found best step with 471.
        load_failed = True
        continue

      image_face = mp.Image.create_from_file(entry.path)
      face_result = face_detector.detect(image_face)

      for detection in face_result.detections:
        bbox = detection.bounding_box
        # PIL crop box is (left, upper, right, lower).
        face = Image.fromarray(image, 'L').crop((
            bbox.origin_x,
            bbox.origin_y,
            bbox.origin_x + bbox.width,
            bbox.origin_y + bbox.height,
        ))
        face = cv2.GaussianBlur(np.array(face), (7, 7), 0)
        sharpness = np.std(cv2.Laplacian(face, cv2.CV_64F))
        if sharpness > best_sharpness:
          best_sharpness = sharpness
          best_step = int(entry.name[0:3])

  if best_step is not None:
    return best_step
  if load_failed:
    return fallback_step
  raise UnboundLocalError(
      f'no face sharpness could be measured for any image in {pathname!r}')

Instantiate the two models and load their weights, as detailed in the original MediaPipe tutorial notebook.

In [None]:
# NOTE(review): `FALSE` is not referenced by any visible cell; this looks like
# an editor auto-import. It is kept so the notebook namespace is unchanged.
from pickle import FALSE

# MediaPipe Tasks API: `python` provides BaseOptions, `vision` the face tasks.
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

# Initialised here; no visible cell reads or updates it.
num_blanks = 0

# Face Landmarker: one face per image, landmarks only (no blendshapes and no
# facial transformation matrices).
base_options = python.BaseOptions(
    model_asset_path='face_landmarker_v2_with_blendshapes.task',
)
options = vision.FaceLandmarkerOptions(
    base_options=base_options,
    num_faces=1,
    output_face_blendshapes=False,
    output_facial_transformation_matrixes=False,
)
detector = vision.FaceLandmarker.create_from_options(options)

# Face Detector, built from detector.tflite with default detector options.
base_options_face = python.BaseOptions(model_asset_path='detector.tflite')
face_options = vision.FaceDetectorOptions(base_options=base_options_face)
face_detector = vision.FaceDetector.create_from_options(face_options)

Key landmark indices (as defined in MediaPipe Model Documentation [here](https://developers.google.com/mediapipe/solutions/vision/face_landmarker#models)):
- 4: tip of nose
- 8: base of nose
- 9: between eyebrows
- 473: right iris
- 468: left iris
- 0: top of lip (center)
- 17: bottom of lip (center)
- 57: L mouth
- 287: R mouth
- 10: center forehead
- 332: R forehead
- 103: L forehead
- 152: bottom of chin
- 48: L nose
- 278: R nose
- 446: R R eye
- 464: L R eye
- 243: R L eye
- 226: L L eye

The below loop performs the following for each session for each participant:
1. Define empty facial landmark dictionary to store the positions of the 478 facial features
2. Detect the face in the in-focus image using the MediaPipe face detector
3. Crop the image to the bounding box of the face
4. Find the step setting of the camera that corresponds to the most in-focus image using laplace_max_step
5. Detect the facial landmarks on the most in-focus image using MediaPipe landmarker
6. Record all positions, timestamps, and estimate depth to overall face based on calibration of steps to depth (in cm) as calibrated in a separate data collection.
7. Loop through the facial landmark values once all detections are complete to create a temporal-based measurement of net distance traveled across x, y, and z axes.


In [None]:
# Batch driver. For every participant in `todo` and every session folder that
# exists, pick the sharpest frame of each second, run the Face Landmarker on
# it, and write two CSVs: per-frame landmark positions with frame-to-frame
# differences, and per-landmark displacement sums.
import pandas as pd

# Participant numbers to process, e.g. todo = [23, 28, 30]; numbers listed in
# `done` are skipped.
todo = [30]
done = []

# Session folder suffixes looked for under each participant.
SESSIONS = ['baseline', 'session1', 'session2', 'session3', 'session4',
            'session5', 'session6', 'session7', 'session8']
# Number of landmark column groups written per frame.
N_LANDMARKS = 478
# Scale applied to the landmark z value (presumably a face width in cm;
# TODO confirm the unit and source of this constant).
face_wid = 17.465
# Folder that receives the result CSVs.
end_pth = '/content/drive/MyDrive/Fatigue Project - Zhang/Results-Dec/'

for sb in todo:
  if sb in done:
    continue
  sub = f'e{sb}'
  print(sub)
  path = f'/content/drive/MyDrive/Fatigue Project - Zhang/1-HRC data collection and processing/Data_Collection_Raw/{sub}/1_Facial_Raw/{sub}/{sub}'

  for test in SESSIONS:
    tst_path = f'{path} {test}'
    # Skip sessions without a folder. isdir replaces an os.scandir() probe
    # whose iterator was opened and never closed.
    if not os.path.isdir(tst_path):
      continue

    # Per-session accumulators; every list gains one value per accepted frame.
    land_dct = {}
    for i in range(N_LANDMARKS):
      land_dct[f'feat_{i}_x'] = []
      land_dct[f'feat_{i}_y'] = []
      land_dct[f'feat_{i}_z'] = []
    face_depth = []
    Second = []
    Testname = []
    sub_name = []

    # Folder layout read here: <session>/<minute>/<second>/<step>.jpg
    for minute in os.scandir(tst_path):
      # Stray files next to the minute folders used to crash os.scandir().
      if not minute.is_dir():
        continue
      for second in os.scandir(minute):
        if not second.is_dir():
          continue
        path2 = tst_path + '/' + minute.name + '/' + second.name + '/'

        try:
          stepmax = laplace_max_step(path2)
        except UnboundLocalError:
          # laplace_max_step raises this when it has no step to return.
          continue
        pathname_max = os.path.join(path2, str(stepmax) + '.jpg')

        try:
          focused_image = mp.Image.create_from_file(pathname_max)
        except RuntimeError:
          continue

        detection_result = detector.detect(focused_image)
        if not detection_result.face_landmarks:
          continue
        rslt = detection_result.face_landmarks[0]

        # MediaPipe normalises x by the image width and y by the image height,
        # so each coordinate is scaled by its own dimension. The previous code
        # unpacked a numpy shape, which is (rows, cols), as (x_max, y_max) and
        # therefore scaled x by the height and y by the width. Taking the size
        # from the mp.Image also removes a second read of the file from disk.
        width = focused_image.width
        height = focused_image.height
        for i, landmark in enumerate(rslt):
          land_dct[f'feat_{i}_x'].append(landmark.x * width)
          land_dct[f'feat_{i}_y'].append(landmark.y * height)
          land_dct[f'feat_{i}_z'].append(landmark.z * face_wid)

        # Folder names give the timestamp: minute * 60 + second.
        Second.append(int(minute.name) * 60 + int(second.name))
        Testname.append(str(test))
        sub_name.append(sub)

        # Convert the step value into depth in centimeters.
        cm_depth = 2.54 * (282.37 - 0.54 * int(stepmax))
        face_depth.append(cm_depth)

    df_og = pd.DataFrame({'Subject Name': sub_name,
                          'Test Name': Testname,
                          'Time (s)': Second,
                          'Face Depth (cm)': face_depth,
                          **land_dct})
    df = (df_og.where(df_og['Test Name'] == test)
          .dropna()
          .sort_values('Time (s)')
          .set_index('Time (s)'))

    lst_xdiff = []
    lst_ydiff = []
    lst_zdiff = []  # collected, but not written to the SUMS file below
    lst_xdiff_abs = []
    lst_ydiff_abs = []
    for i in range(N_LANDMARKS):
      df[f'feat_{i}_ztot'] = -1 * df[f'feat_{i}_z'] + df['Face Depth (cm)']
      # Assign the filled result directly. `df[col].fillna(0, inplace=True)`
      # is chained assignment and leaves `df` untouched under pandas
      # Copy-on-Write, so the first-row NaN would stay in the CSV.
      df[f'feat_{i}_xdiff'] = df[f'feat_{i}_x'].diff().fillna(0)
      df[f'feat_{i}_ydiff'] = df[f'feat_{i}_y'].diff().fillna(0)
      df[f'feat_{i}_zdiff'] = df[f'feat_{i}_ztot'].diff().abs().fillna(0)

      # Signed sums give net displacement; absolute sums give distance moved.
      lst_xdiff.append(df[f'feat_{i}_xdiff'].sum())
      lst_ydiff.append(df[f'feat_{i}_ydiff'].sum())
      lst_zdiff.append(df[f'feat_{i}_zdiff'].sum())
      lst_xdiff_abs.append(df[f'feat_{i}_xdiff'].abs().sum())
      lst_ydiff_abs.append(df[f'feat_{i}_ydiff'].abs().sum())

    df_sums = pd.DataFrame({'Feature Number': range(N_LANDMARKS),
                            'X Disp': lst_xdiff,
                            'Y Disp': lst_ydiff,
                            'X Abs': lst_xdiff_abs,
                            'Y Abs': lst_ydiff_abs})

    csv_nm = f'{sub}-{test}.csv'
    csv_sumsnm = f'{sub}-{test}-SUMS.csv'
    df.to_csv(os.path.join(end_pth, csv_nm))
    df_sums.to_csv(os.path.join(end_pth, csv_sumsnm))