In [1]:
# NOTE: order is kept as in the original so that `from utilz import *`
# shadows (and is shadowed by) exactly the same names as before.
import mediapipe as mp
import pickle as pkl
import os
import subprocess
import numpy as np
from utilz import *
import cv2
import clip
import torch
from PIL import Image
import pandas
import collections

In [2]:
# Root folder of the raw clips. It is joined by plain string concatenation
# further down, so the trailing '/' is required.
video_path = '../SIMS/Raw/'

# load_data (star-imported from utilz) yields five parallel per-clip sequences.
label_columns = load_data('../SIMS/label.csv')
video_ids, clip_ids, texts, annotations, modes = label_columns

# mediapipe

In [None]:
# Number of frames kept per clip by the extraction cell below.
max_len = 10
# Which visual stream the extraction cell produces:
#   'ori'  -> whole frames, 'face' -> face crops.
MODE = 'ori'

In [None]:
# Build one fixed-length (max_len) visual sequence per clip, grouped by split.
#   MODE == 'ori'  -> whole frames, grayscale, resized to 64x64
#   MODE == 'face' -> MediaPipe face crops, grayscale, resized to 28x28
# Clips longer than max_len are sub-sampled with an integer stride and then
# truncated; shorter clips are zero-padded at the end.
if MODE not in ('ori', 'face'):
    # Previously an unknown MODE silently produced an empty feature file.
    raise ValueError("MODE must be 'ori' or 'face', got {!r}".format(MODE))

visual = {'train':[], 'valid':[], 'test':[]}
for video_id, clip_id, mode in zip(video_ids, clip_ids, modes):
    # Clip files are named with a zero-padded 4-digit id, e.g. 0007.mp4.
    clip_id_ = '000' + str(clip_id)
    file = video_path + str(video_id) + '/' + clip_id_[-4:] + '.mp4'

    if MODE == 'ori':
        cap = cv2.VideoCapture(file)
        images = []
        try:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    break
                image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                images.append(cv2.resize(image, (64, 64)))
        finally:
            # Always free the decoder handle, even if a frame fails to convert.
            cap.release()

        if len(images) > max_len:
            images = images[::len(images) // max_len]
        if len(images) == 0:
            # Unreadable/missing video: np.concatenate cannot pad an empty
            # list against 3-D zeros, so emit an all-zero sequence instead.
            print("no frames", file)
            images = np.zeros((max_len, 64, 64))
        elif len(images) < max_len:
            images = np.concatenate([images, np.zeros((max_len-len(images), 64, 64))], axis=0)
        visual[mode].append(images[:max_len])

    elif MODE == 'face':
        cap = cv2.VideoCapture(file)
        faces = []
        try:
            # The context manager closes the detector's resources per clip.
            with mp.solutions.face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) as mp_face_detection:
                while cap.isOpened():
                    success, image = cap.read()
                    if not success:
                        break
                    img_h, img_w = image.shape[:2]
                    image.flags.writeable = False
                    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                    results = mp_face_detection.process(image)
                    image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
                    if not results.detections:
                        continue
                    for detection in results.detections:
                        box = detection.location_data.relative_bounding_box
                        # Relative coords: x/width scale with the image width,
                        # y/height with the image height.
                        xmin = max(0, int(box.xmin * img_w))
                        ymin = max(0, int(box.ymin * img_h))
                        width = int(box.width * img_w)
                        height = int(box.height * img_h)
                        crop = image[ymin:ymin+height, xmin:xmin+width]
                        if crop.size == 0:
                            # Degenerate box (e.g. entirely off-frame); cv2.resize
                            # would raise on an empty array.
                            continue
                        # Every detected face is kept, in detection order.
                        # NOTE(review): if one face per frame is wanted, pick a
                        # single detection here instead — confirm with author.
                        faces.append(cv2.resize(crop, (28, 28)))
        finally:
            cap.release()

        if len(faces) > max_len:
            faces = faces[::len(faces) // max_len]
        if len(faces) == 0:
            faces = np.zeros((max_len, 28, 28))
        elif len(faces) < max_len:
            faces = np.concatenate([faces, np.zeros((max_len-len(faces), 28, 28))], axis=0)
        visual[mode].append(faces[:max_len])

# The file name follows MODE so a 'face' run no longer overwrites the 'ori'
# features ('ori' still writes ./data/visual_mpori.pkl as before).
save_features(visual, './data/visual_mp{}.pkl'.format(MODE))

# OpenFace

In [None]:
# Run OpenFace's FaceLandmarkVidMulti on every clip; each clip gets its own
# output folder ./data/OFprocessed/<video_id>/<clip_id>/.
# The command is passed as an argument list (no shell), so paths containing
# spaces or shell metacharacters are handled correctly. A missing binary
# raises immediately instead of failing silently once per clip.
openface_bin = '../../../OpenFace/build/bin/FaceLandmarkVidMulti'
for video_id, clip_id, mode in zip(video_ids, clip_ids, modes):
    clip_id_ = '000' + str(clip_id)
    file = video_path + str(video_id) + '/' + clip_id_[-4:] + '.mp4'
    out_dir = './data/OFprocessed/{}/{}/'.format(str(video_id), str(clip_id))
    completed = subprocess.run([openface_bin, '-f', file, '-out_dir', out_dir])
    if completed.returncode != 0:
        # Keep going (best effort, as before) but make the failure visible.
        print("OpenFace failed", file, "exit code", completed.returncode)

In [None]:
# Turn the per-clip OpenFace CSVs into fixed-length (max_len) sequences of
# the columns named by OP_para(), grouped by dataset split.
FAU_list = OP_para()
fau_columns = list(FAU_list)
n_fau = len(fau_columns)  # feature width; replaces the hard-coded 49

feature_path = './data/OFprocessed/'
FAU_features = {'train':[], 'valid':[], 'test':[]}

max_len = 30

for video_id, clip_id, mode in zip(video_ids, clip_ids, modes):
    clip_id_ = '000' + str(clip_id)
    file =  feature_path + str(video_id) + '/' + str(clip_id) + '/' + clip_id_[-4:] + '.csv'

    # Reset per clip so a failure can never reuse the previous clip's rows.
    features = []
    try:
        file_ct = pandas.read_csv(file)
        # Dominant face = the face id that occurs in the most rows
        # (the first-seen id was used before, which is not the same thing).
        dominant_face = collections.Counter(file_ct['face_id']).most_common(1)[0][0]
        # Keep only successfully tracked rows of the dominant face; rejected
        # rows are dropped rather than re-appending the previous row.
        keep = (file_ct['face_id'] == dominant_face) & (file_ct['success'] == 1)
        features = file_ct.loc[keep, fau_columns].to_numpy()
    except Exception as err:
        # Best effort: a missing/empty/malformed CSV becomes an all-zero clip.
        # `Exception` (not a bare except) lets KeyboardInterrupt through.
        print("no faces", file, err)
        features = []

    if len(features) > max_len:
        features = features[::len(features) // max_len]
    if len(features) == 0:
        # Nothing usable: np.concatenate cannot pad an empty selection.
        features = np.zeros((max_len, n_fau))
    elif len(features) < max_len:
        features = np.concatenate([features, np.zeros((max_len-len(features), n_fau))], axis=0)

    FAU_features[mode].append(features[:max_len])

save_features(FAU_features, './data/visual_OFfts.pkl')

# CLIP

In [4]:
# CLIP reference implementation: https://github.com/openai/CLIP
# Prefer the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# `preprocess` is applied to each PIL frame before `model.encode_image` below.
model, preprocess = clip.load("ViT-B/32", device=device)

In [7]:
# Encode every frame of every clip with CLIP's image encoder and keep a
# fixed-length (max_len) sequence of embeddings per clip, grouped by split.
max_len = 10
clip_dim = 512  # width of the ViT-B/32 image embedding used for zero padding

visual = {'train':[], 'valid':[], 'test':[]}

with torch.no_grad():
    for video_id, clip_id, mode in zip(video_ids, clip_ids, modes):
        clip_id_ = '000' + str(clip_id)
        file = video_path + str(video_id) + '/' + clip_id_[-4:] + '.mp4'

        cap = cv2.VideoCapture(file)
        image_features = []
        try:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    break
                # NOTE(review): frames are fed to CLIP as grayscale, as in the
                # original; confirm this is intended rather than COLOR_BGR2RGB.
                image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                image_ = preprocess(Image.fromarray(np.uint8(image))).unsqueeze(0).to(device)
                image_ft = model.encode_image(image_).cpu().detach().numpy()[0]
                image_features.append(image_ft)
        finally:
            # Release the decoder handle for every clip.
            cap.release()

        if len(image_features) > max_len:
            image_features = image_features[::len(image_features) // max_len]
        if len(image_features) == 0:
            # Unreadable/missing video: emit zeros instead of crashing the
            # whole run inside np.concatenate.
            print("no frames", file)
            image_features = np.zeros((max_len, clip_dim))
        elif len(image_features) < max_len:
            image_features = np.concatenate([image_features, np.zeros((max_len-len(image_features), clip_dim))], axis=0)

        visual[mode].append(image_features[:max_len])

save_features(visual, './data/visual_clip.pkl')

In [8]:
# Sanity check: the train split should come out as (clips, frames, features).
np.asarray(visual['train']).shape

(1368, 10, 512)