In [None]:
!pip3 install ultralytics gdown

In [4]:
import os
from pathlib import Path
from ultralytics import YOLO
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm

# Unzip Videos into LeftVideo and RightVideo folders

In [None]:
# care that original zip files are named slightly differently
!mkdir -p LeftVideo
!unzip Videos_LeftCamera.zip -d LeftVideo
!mkdir -p RightVideo
!unzip Video_RightCamera.zip -d RightVideo

# Split videos into frames

In [None]:
# jpg files would inherit Right/Left marker from video so we can put them together
!mkdir -p raw_images/
!cd LeftVideo && for FILE_NAME in *.avi; do ffmpeg -i "$FILE_NAME" "${FILE_NAME%.*}"%05d.jpg -hide_banner; done
!mv LeftVideo/*.jpg raw_images/
!cd RightVideo && for FILE_NAME in *.avi; do ffmpeg -i "$FILE_NAME" "${FILE_NAME%.*}"%05d.jpg -hide_banner; done
!mv RightVideo/*.jpg raw_images/

# Crop faces from frames

In [35]:
# yolo weights from https://github.com/derronqi/yolov8-face/tree/main
!gdown "https://drive.google.com/uc?id=1qcr9DbgsX3ryrz2uU8w4Xm3cOrRywXqb"

Downloading...
From: https://drive.google.com/uc?id=1qcr9DbgsX3ryrz2uU8w4Xm3cOrRywXqb
To: /data2/disfa/yolov8n-face.pt
100%|██████████████████████████████████████| 6.39M/6.39M [00:00<00:00, 39.6MB/s]


In [29]:
os.chdir('/data2/disfa/')
!mkdir -p cropped_jpgs
!ls

cropped_jpgs	  disfa_labels_test.csv   left_train.csv
crops		  disfa_labels_train.csv  left_val.csv
disfa_labels.csv  left_all.csv		  raw_components


In [None]:
def crop_or_pad(orig, bbox):
    max_side = max(bbox[3] - bbox[1], bbox[2] - bbox[0])
    bbox_xc, bbox_yc = (bbox[2] + bbox[0]) / 2, (bbox[3] + bbox[1]) / 2,

    orig_h, orig_w, _ = orig.shape
    on_orig_x1 = int(max(bbox_xc - np.floor(max_side / 2), 0))
    on_orig_x2 = int(min(bbox_xc + np.ceil(max_side / 2), orig_w))
    on_orig_y1 = int(max(bbox_yc - np.floor(max_side / 2), 0))
    on_orig_y2 = int(min(bbox_yc + np.ceil(max_side / 2), orig_h))

    pad = np.zeros((max_side, max_side, 3), dtype=np.uint8)

    on_pad_x1 = max_side // 2 - (on_orig_x2 - on_orig_x1) // 2
    on_pad_x2 = on_pad_x1 + on_orig_x2 - on_orig_x1
    on_pad_y1 = max_side // 2 - (on_orig_y2 - on_orig_y1) // 2
    on_pad_y2 = on_pad_y1 + on_orig_y2 - on_orig_y1

    pad[on_pad_y1:on_pad_y2, on_pad_x1:on_pad_x2, :] = orig[on_orig_y1:on_orig_y2, on_orig_x1:on_orig_x2, :].copy()
    return pad

root_path = Path('.')
raw_path = root_path/'raw_components/LeftVideo/jpg' # where are raw images
img_paths = [raw_path/f for f in os.listdir(raw_path)]
print(img_paths)
crops_path = Path('cropped_images') # where to put crops

crops_min_size = 1
crops_max_size = 300
crops_conf_thresh = 0.6

weight_path = "yolov8n-face.pt"
model = YOLO(weight_path)

meta = []

for img_path in tqdm(img_paths):
    img = cv2.imread(img_path.as_posix())[:,:,::-1]
    if img is None:
        continue

    results = model.predict(img, verbose=False, show=False, conf=0.25)[0]

    for result_idx, result in enumerate(results):
        xc, yc, w, h = [int(n) for n in result.boxes.xywh.tolist()[0]]
        x1, y1, x2, y2 = [int(n) for n in result.boxes.xyxy.tolist()[0]]
        confidence = result.boxes.conf.tolist()[0]

        if confidence < crops_conf_thresh:
            continue
        if result.keypoints is None:
            print(img_path, None)
            continue
        left_eye, left_eye_conf = result.keypoints.xy[0][0].cpu().numpy(), result.keypoints.conf[0][0]
        right_eye, right_eye_conf = result.keypoints.xy[0][1].cpu().numpy(), result.keypoints.conf[0][1]

        if left_eye_conf > 0.5 and right_eye_conf > 0.5:
            dy = right_eye[1] - left_eye[1]
            dx = right_eye[0] - left_eye[0]
            angle = np.degrees(np.arctan2(dy, dx))
            rot_mat = cv2.getRotationMatrix2D(center=(xc, yc), angle=angle, scale=1.)
            img_rot = cv2.warpAffine(img, rot_mat, (img.shape[1], img.shape[0]), flags=cv2.INTER_LINEAR)

            crop = crop_or_pad(img_rot, [x1, y1, x2, y2])

            crop_size, _, _ = crop.shape
            if crop_size < crops_min_size:
                continue
            elif crop_size > crops_max_size:
                crop = cv2.resize(crop, (crops_max_size, crops_max_size), interpolation=cv2.INTER_LINEAR)

            cv2.imwrite(crops_path / img_path.name, crop[:,:,::-1])

            meta.append({"raw_name": f"{raw_path}/{img_path.name}",
                         "bbox_crop": [x1, y1, x2, y2]})

meta_df = pd.DataFrame(meta)

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)

 10%|███████████████▎                                                                                                                                        | 13217/130814 [10:19<1:31:16, 21.47it/s]

### Do the same for RightVideo, merge jpg files into one folder

# Prepare labels

In [51]:
label_path = Path('/data2/disfa/raw_components/ActionUnit_Labels')
res = []
for movie in os.listdir(label_path):
    for txt in os.listdir((label_path/movie).as_posix()):
        with open((label_path/movie/txt).as_posix(), 'r') as f:
            for l in f.readlines():
                res.append([movie, txt.split('.')[0].split('_')[1].replace('au',''), *l.strip().split(',')])

In [52]:
df = pd.DataFrame(res)
df.columns = ['video', 'au', 'frame', 'value']

In [53]:
df = df.pivot_table(values='value', index=['video', 'frame'], columns='au', aggfunc='first').reset_index()
df.columns = [x if x in ['video', 'frame'] else f'AU{x}' for x in df.columns]

In [54]:
# here we match jpgs with labels
df_tmp = df.copy()
df['filename'] = df.apply(lambda r: f'LeftVideo{r.video}_comp{int(r.frame):05d}.jpg', axis=1)
df_tmp['filename'] = df_tmp.apply(lambda r: f'RightVideo{r.video}_comp{int(r.frame):05d}.jpg', axis=1)
df = pd.concat([df, df_tmp]).reset_index()
del df_tmp

In [55]:
df.head()

Unnamed: 0,index,video,frame,AU1,AU12,AU15,AU17,AU2,AU20,AU25,AU26,AU4,AU5,AU6,AU9,filename
0,0,SN001,1,0,2,0,0,0,0,0,0,0,0,0,0,LeftVideoSN001_comp00001.jpg
1,1,SN001,10,0,2,0,0,0,0,0,0,0,0,0,0,LeftVideoSN001_comp00010.jpg
2,2,SN001,100,0,2,0,0,0,0,0,0,0,0,0,0,LeftVideoSN001_comp00100.jpg
3,3,SN001,1000,0,0,0,0,0,0,0,0,0,0,0,0,LeftVideoSN001_comp01000.jpg
4,4,SN001,1001,0,0,0,0,0,0,0,0,0,0,0,0,LeftVideoSN001_comp01001.jpg


In [56]:
# movie, frame, filename, AU1, AU2, ...
df.to_csv('disfa_labels.csv', index=False)

# Split into train/test by video

alternatively you can train on the whole dataset and test on another dataset

In [57]:
videos = df.video.unique()
np.random.seed(1337)
test_videos = np.random.choice(videos, size=int(len(videos)/5))
df[~df['video'].isin(test_videos)].to_csv('disfa_labels_train.csv', index=False)
df[df['video'].isin(test_videos)].to_csv('disfa_labels_test.csv', index=False)


In [59]:
df_proc_tmp = df[df.filename.isin(os.listdir('cropped_images'))].iloc[:100]
df_proc_tmp.shape

(100, 16)

In [61]:
df_proc_tmp[~df_proc_tmp['video'].isin(test_videos)].to_csv('df_proc_tmp_train.csv', index=False)
df_proc_tmp[df_proc_tmp['video'].isin(test_videos)].to_csv('df_proc_tmp_test.csv', index=False)
