In [1]:
import numpy as np
import os
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression as LR
from sklearn.model_selection import StratifiedKFold as KFold
from sklearn.model_selection import train_test_split
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.metrics import classification_report
from sktime.transformations.panel.rocket import MiniRocketMultivariate as minirocket
from sklearn.ensemble import RandomForestClassifier as RF
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegressionCV as LRCV
from sklearn import metrics
from sklearn.preprocessing import StandardScaler

In [2]:
# get relative indices
# manually entered via looking at
#https://raw.githubusercontent.com/google/mediapipe/a908d668c730da128dfa8d9f6bd25d519d006692/mediapipe/modules
#/face_geometry/data/canonical_face_model_uv_visualization.png

lower = [76, 77, 90, 180, 85, 16, 315, 404, 320, 307]

upper = [184, 74, 73, 72, 11, 302, 303, 304, 408, 306]

u2 = [61, 185, 40, 39, 37, 0, 267, 269, 270, 409]
l2 = [291, 375, 321, 405, 314, 17, 84, 181, 91, 146]

u3 = [57, 186, 92, 165, 167, 164, 393, 391, 322, 410]
l3 = [287, 273, 335, 406, 313, 18, 83, 182, 106, 43]

combo_indices = lower + upper + u2 + l2 + u3 + l3

In [3]:
opath = "/Users/nraman/Documents/thesis_videos/"
# make sure to get 
subfolders = [i for i in os.listdir(opath) if ".mp4" not in i and ".DS_Store" not in i and ".MOV" not in i and "test" not in i]

def find_pointfolder(subfolder, path = "/Users/nraman/Documents/thesis_videos/"):
    cur_path = f"{path}{subfolder}"
    # get names of files within the subfolder
    subfiles = os.listdir(f"{path}{subfolder}")
    nps = [i for i in subfiles if ".npy" in i]
    
    if(len(nps) == 1):
        array = np.load(f"{cur_path}/{nps[0]}")
        array = array[:, combo_indices]
        return(array.reshape(array.shape[0], len(combo_indices)*2))

In [4]:
ground_folders = [i for i in subfolders if "ground" in i]
speak_folders = [i for i in subfolders if "ground" not in i]
bad_ones = []

ground_data = []
speak_data = []

for s in speak_folders:
    data = find_pointfolder(s)
    if(type(data) == np.ndarray):
        speak_data.append(data)
    else:
        bad_ones.append(s)



for g in ground_folders:
    ar_init = find_pointfolder(g)
    if(type(ar_init) == np.ndarray):
        ground_data.append(ar_init)
    else:
        bad_ones.append(g)
        
s_lengths = [s.shape[0] for s in speak_data]
g_lengths = [s.shape[0] for s in ground_data]
trainmax = np.max(s_lengths + g_lengths)

In [5]:
testfolders = [i for i in os.listdir(opath) if ".mp4" not in i and ".DS_Store" not in i and ".MOV" not in i and "test" in i]

In [6]:
testground = [i for i in testfolders if "ground" in i]
testspeak = [i for i in testfolders if "ground" not in i]
test_bad = []

test_ground_data = []
test_speak_data = []

for s in testspeak:
    data = find_pointfolder(s)
    if(type(data) == np.ndarray):
        test_speak_data.append(data)
    else:
        test_bad.append(s)

s_lengths = [s.shape[0] for s in test_speak_data]

for g in testground:
    ar_init = find_pointfolder(g)
    if(type(ar_init) == np.ndarray):
        test_ground_data.append(ar_init)
    else:
        test_bad.append(g)
        
ts_lengths = [s.shape[0] for s in test_speak_data]
tg_lengths = [s.shape[0] for s in test_ground_data]
testmax = np.max(ts_lengths + tg_lengths)

maxlen = max([trainmax, testmax])

In [7]:
# each input is list of arrays (each 2d, of dimension #timepoints x #points*2)--ground corresponds to no speech
# get dataframe of dimension #samples x #features
def timeseries_df(groundlist, speaklist, num_features = len(combo_indices)*2, const = 0, pd_mode = True):
    sz = len(groundlist) + len(speaklist)
    numspeak = len(speaklist)
    
    if(pd_mode == True):
        df = pd.DataFrame(np.zeros([sz, num_features])).astype(object)
    else:
        df = np.zeros((sz, num_features, maxlen))
    
    for j in range(int(sz)):
        for a in range(len(combo_indices)*2):
            if(j < numspeak):
                cur = speaklist[j][:, a]
            elif (j >= numspeak):
                it = j - numspeak
                cur = groundlist[it][:, a]
            
            if(pd_mode == True):
                df.iloc[j, a] = np.pad(cur, (0, maxlen - len(cur)), constant_values = (const))
            else:
                df[a, j] = np.pad(cur, (0, maxlen - len(cur)), constant_values = (const))
    
    true = np.concatenate([np.ones(numspeak), np.zeros(sz - numspeak)])
    
    return df, true

In [8]:
train_X, train_y = timeseries_df(ground_data, speak_data)

In [9]:
# # train Rocket embedding
rocket = minirocket()
rocket.fit(train_X)
inter = rocket.transform(train_X)

scaler = StandardScaler().fit(inter)
X_train_transform = scaler.transform(inter)

In [16]:
# # train logistic regression classifier
classifier = LR(max_iter = 10000, class_weight = {0:0.01, 1:1})#SVC(C = 1e-1)
classifier.fit(X_train_transform, train_y)

LogisticRegression(class_weight={0: 0.01, 1: 1}, max_iter=10000)

In [17]:
testX, testy = timeseries_df(test_ground_data, test_speak_data)

In [18]:
preds = classifier.predict(scaler.transform(rocket.transform(testX)))

In [19]:
c = metrics.confusion_matrix(testy, preds)
print("Confusion Matrix: " + str(c))
print("False rejection rate: " + str(c[1, 0]/np.sum(c)))

Confusion Matrix: [[ 9  4]
 [ 4 37]]
False rejection rate: 0.07407407407407407


In [20]:
print(classification_report(testy, preds))

              precision    recall  f1-score   support

         0.0       0.69      0.69      0.69        13
         1.0       0.90      0.90      0.90        41

    accuracy                           0.85        54
   macro avg       0.80      0.80      0.80        54
weighted avg       0.85      0.85      0.85        54

