In [1]:
import pickle
from os import listdir, getcwd
from os.path import dirname, abspath, join
import cv2
import numpy as np

In [2]:
# Load every file whose name contains '.pickle' from the parent of the current
# working directory into `data`, keyed by the part of the file name before the
# first dot (e.g. 'l_s.pickle' -> data['l_s']).
# SECURITY: pickle.load can execute arbitrary code; only point this at
# recordings you created yourself.
data = {}
data_dir = abspath(dirname(getcwd()))
for f in listdir(data_dir):
    if '.pickle' in f:
        data_path = join(data_dir, f)
        with open(data_path, 'rb') as fp:
            # `f` is already the bare file name returned by listdir, so the key
            # does not depend on which path separator the platform uses.
            data[f.split('.')[0]] = pickle.load(fp)

In [3]:


def data_to_numpy(data: list) -> np.ndarray:
    '''Convert nested landmark dictionaries into a float64 array.

    Parameters
    ----------
    data: list
        Sequence of frames; each frame is a sequence of mappings that provide
        at least the keys ``'x'``, ``'y'`` and ``'z'``. Any additional keys
        are ignored.

    Returns
    -------
    np.ndarray
        Array of shape ``(len(data), len(data[0]), 3)`` whose last axis holds
        ``(x, y, z)``.

    Raises
    ------
    IndexError
        If ``data`` is empty, or a frame has fewer entries than ``data[0]``.
    '''
    # The first frame fixes the number of points read from every frame.
    n_points = len(data[0])
    rows = [
        [(frame[j]['x'], frame[j]['y'], frame[j]['z']) for j in range(n_points)]
        for frame in data
    ]
    # The last axis is always 3 (x, y, z); it must not be derived from the
    # number of keys in a landmark dict, which may carry extra fields.
    return np.array(rows, dtype=np.float64).reshape(len(data), n_points, 3)

def standardize_data(data, axis_, center=True, scale=True):
    '''Center and/or min-max scale an array along one axis.

    Parameters
    ----------
    data: np.ndarray
        The array to transform; it is not modified in place.
    axis_: int
        The axis along which mean, minimum and maximum are computed.
    center: bool
        Subtract the mean along ``axis_``.
    scale: bool
        Shift by the minimum and divide by the range along ``axis_``, which
        maps every slice onto [0, 1]. Because this step is invariant to a
        constant shift, ``center`` has no effect on the result (beyond
        floating-point rounding) when ``scale`` is also True.

    Returns
    -------
    np.ndarray
        Array with the same shape as ``data``. Slices that are constant along
        ``axis_`` are returned as zeros when ``scale`` is True instead of
        NaN/inf.
    '''
    if center:
        data = data - data.mean(axis=axis_, keepdims=True)
    if scale:
        data = data - data.min(axis=axis_, keepdims=True)
        span = data.max(axis=axis_, keepdims=True) - data.min(axis=axis_, keepdims=True)
        # A constant slice has zero range; divide by 1 there so the shifted
        # values (all zero) are kept rather than turned into NaN.
        data = data / np.where(span == 0, 1, span)

    return data

def to_sequential(data: np.ndarray, seq_length: int=10, axis: int=1) -> np.ndarray:
    '''Stack consecutive samples into overlapping sliding windows.

    Window ``i`` contains ``data[i], ..., data[i + seq_length - 1]`` (stride
    1), so ``data.shape[0] - seq_length + 1`` windows are produced.

    Parameters
    ----------
    data: np.ndarray
        The data to be processed; samples are indexed along axis 0. Any
        number of trailing dimensions is accepted.
    seq_length: int
        The number of consecutive samples that make up one window.
    axis: int
        The position at which the new window axis (of length ``seq_length``)
        is inserted into ``(n_windows,) + data.shape[1:]``. Negative values
        count from the end of that shape, as in slicing.

    Returns
    -------
    np.ndarray
        float64 array holding the windows. With the default ``axis=1`` its
        shape is ``(n_windows, seq_length) + data.shape[1:]``.

    Raises
    ------
    ValueError
        If ``data`` has fewer than ``seq_length`` samples, or ``axis`` lies
        outside ``[-data.ndim, data.ndim]``.
    '''
    n_windows = data.shape[0] - seq_length + 1
    if n_windows < 0:
        raise ValueError(
            f'seq_length ({seq_length}) exceeds the number of samples ({data.shape[0]})'
        )
    if not -data.ndim <= axis <= data.ndim:
        raise ValueError(f'axis {axis} is out of range for {data.ndim}-d data')

    # Resolve a negative axis the same way the slice `shape[:axis]` would.
    position = axis + data.ndim if axis < 0 else axis
    base_shape = (n_windows,) + data.shape[1:]
    seq_data = np.zeros(base_shape[:position] + (seq_length,) + base_shape[position:])

    # View with layout (n_windows, seq_length, ...) regardless of where the
    # window axis really sits; writes go straight through to `seq_data`.
    windows = np.moveaxis(seq_data, position, 1)
    for offset in range(seq_length):
        # Slot `offset` of every window is the input shifted by `offset`.
        windows[:, offset] = data[offset:offset + n_windows]
    return seq_data

def generate_data(data, type_='all', is_sequential=False, seq_length=10):
    '''Assemble features, integer-valued labels and label names.

    Parameters
    ----------
    data: dict
        Mapping with the keys ``'l_s'``, ``'nl_s'``, ``'l_ns'`` and
        ``'nl_ns'``; each value is passed to ``data_to_numpy``.
    type_: str
        ``'l'``   -> 2 classes: (l_s + l_ns) vs (nl_s + nl_ns);
        ``'s'``   -> 2 classes: (l_s + nl_s) vs (l_ns + nl_ns);
        ``'all'`` -> 4 classes: l_s, nl_s, l_ns, nl_ns.
    is_sequential: bool
        When True every class array is passed through ``to_sequential``.
    seq_length: int
        Window length forwarded to ``to_sequential``; only used when
        ``is_sequential`` is True.

    Returns
    -------
    list
        ``[X, y, label_text]`` where ``y`` is a float array holding the class
        index of every row of ``X`` and ``label_text[k]`` names class ``k``.
        An unrecognised ``type_`` yields an empty list.
    '''
    # For each mode: the recordings merged into each class (in class-index
    # order) and the matching class names.
    groupings = {
        'l': ([('l_s', 'l_ns'), ('nl_s', 'nl_ns')],
              ['looking', 'not_looking']),
        's': ([('l_s', 'nl_s'), ('l_ns', 'nl_ns')],
              ['speaking', 'not_speaking']),
        'all': ([('l_s',), ('nl_s',), ('l_ns',), ('nl_ns',)],
                ['looking_speaking', 'not_looking_speaking',
                 'looking_not_speaking', 'not_looking_not_speaking']),
    }
    if type_ not in groupings:
        return []

    key_groups, label_text = groupings[type_]
    class_arrays = []
    for keys in key_groups:
        arr = np.concatenate([data_to_numpy(data[key]) for key in keys], axis=0)
        if is_sequential:
            # NOTE: windows are cut after the recordings of a class were
            # joined, so some windows span the seam between two recordings.
            arr = to_sequential(arr, seq_length=seq_length)
        class_arrays.append(arr)

    X = np.concatenate(class_arrays, axis=0)
    y = np.concatenate(
        [np.full(arr.shape[0], float(label)) for label, arr in enumerate(class_arrays)],
        axis=0,
    )
    return [X, y, label_text]

In [4]:
# Window length in frames. No active code reads this value before the video
# cell further down reassigns seq_length to 30.
seq_length = 10
# Forwarded to generate_data(); True stacks the samples into sliding windows.
is_sequential = False
# Only referenced from commented-out inference code in the video cell.
is_pca = True

In [5]:
# Landmark arrays of the two speaking recordings (arr_s) and of the two
# not-speaking recordings (arr_ns), each joined along the sample axis.
arr_s, arr_ns = (
    np.concatenate([data_to_numpy(data[key]) for key in keys], axis=0)
    for keys in (('l_s', 'nl_s'), ('l_ns', 'nl_ns'))
)

In [6]:
# Speaking vs. not-speaking dataset: X holds the landmark arrays, y the class
# index of every sample (0 = 'speaking', 1 = 'not_speaking'), label_text the names.
X, y, label_text = generate_data(data, type_='s', is_sequential=is_sequential)

In [8]:
# Landmark indices used to pick lip points from the landmark axis of X and of
# the per-frame array in the video cell (presumably MediaPipe Face Mesh
# indices — confirm against the landmark map). Each list has 36 entries and the
# video cell pairs them positionally: upper_lip[k] is compared with lower_lip[k].
# NOTE(review): 208 in the sixth row of upper_lip does not follow the pattern of
# the neighbouring rows (its counterpart in the fourth row is 38); possibly a
# typo for 268 — verify before changing, since features and thresholds depend on it.
upper_lip = [185, 184, 183, 191, 
             40, 74, 42, 80,
             39, 73, 41, 81, 
             37, 72, 38, 82, 
             0, 11, 12, 13, 
             267, 302, 208, 312,
             269, 303, 271, 311,
             270, 304, 272, 310,
             409, 408, 407, 415]

lower_lip = [146, 77, 96, 95,
             91, 90, 89, 88,
             181, 180, 179, 178,
             84, 85, 86, 87,
             17, 16, 15, 14,
             314, 315, 316, 317,
             405, 404, 403, 402,
             321, 320, 319, 318,
             375, 307, 325, 324]

In [45]:
# Sanity check: total number of lip landmark indices across both lists.
len(upper_lip + lower_lip)

72

In [8]:
# Keep only the lip landmarks (upper-lip indices first, then lower-lip) and
# rescale them along the landmark axis. The per-pair difference
# X[:, upper_lip, :] - X[:, lower_lip, :] is an alternative feature that is not used.
X_lip = standardize_data(X[:, upper_lip + lower_lip], axis_=1)

In [9]:
from sklearn import svm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier

# One flat feature vector per sample, then an 80/20 split with a fixed seed.
X_lip = X_lip.reshape(len(X_lip), -1)
X_train, X_test, y_train, y_test = train_test_split(
    X_lip, y, test_size=0.2, random_state=42
)

# 3-nearest-neighbour classifier. Estimators that were tried here but are left
# disabled: MLPClassifier(hidden_layer_sizes=400, max_iter=50, tol=0.001),
# GaussianNB(), svm.SVC(kernel='linear'), AdaBoostClassifier(n_estimators=150),
# GradientBoostingClassifier(n_estimators=100, learning_rate=1.0, max_depth=1).
clf = KNeighborsClassifier(n_neighbors=3)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

# Held-out accuracy (shown as the cell output).
accuracy_score(y_test, y_pred)

0.9312210200927357

In [10]:
# Shape of the flattened lip feature matrix: (samples, features).
X_lip.shape

(6469, 216)

In [11]:
from sklearn.decomposition import PCA

# Project the flattened lip features onto 50 principal components.
# NOTE(review): the projection is fitted on every row, before the train/test
# split performed in the next cell, so held-out rows influence the components —
# confirm this is acceptable for the reported accuracy.
pca = PCA(n_components=50)
X_dec = pca.fit_transform(np.copy(X_lip.reshape(X_lip.shape[0], -1)))

In [12]:
from sklearn import svm
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier

# Same experiment as on the raw features, but on the PCA-reduced matrix and
# with a 50/50 split (fixed seed).
X_train, X_test, y_train, y_test = train_test_split(
    X_dec, y, test_size=0.5, random_state=42
)

# 3-nearest-neighbour classifier. Estimators that were tried here but are left
# disabled: MLPClassifier(hidden_layer_sizes=400, max_iter=50, tol=0.001),
# AdaBoostClassifier(n_estimators=150, random_state=0),
# GradientBoostingClassifier(n_estimators=100, learning_rate=1.0, max_depth=1),
# GaussianNB(), svm.SVC(kernel='linear').
clf = KNeighborsClassifier(n_neighbors=3)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

# Held-out accuracy (shown as the cell output).
accuracy_score(y_test, y_pred)

0.9230293663060278

In [6]:
import mediapipe as mp
from protobuf_to_dict import protobuf_to_dict

# Short aliases for MediaPipe solution modules. Only mp_face_mesh is used by
# the video cell below; mp_drawing and mp_drawing_styles are not referenced in
# the visible cells.
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_face_mesh = mp.solutions.face_mesh

In [11]:
import cv2
import numpy as np

# Create a VideoCapture object and read from input file
# If the input is the camera, pass 0 instead of the video file name
# ft = cv2.freetype.createFreeType2()
# ft.loadFontData(fontFileName='fonts/JetBrainsMono-Medium.ttf',id=0)
# ---- Settings and rolling state for the video loop below ----
record_video = False       # also write the annotated frames to a video file
seq_length = 30            # number of most recent frames kept in the window
weight_func = 'constant'   # 'logspace', 'linear', or anything else for uniform

# Per-frame weights applied to the rolling window of 0/1 frame flags (sp_cls).
# The name must be `weight_val` in every branch: the loop reads it
# unconditionally, so a misspelt assignment leaves it undefined.
if weight_func == 'logspace':
    # NOTE(review): the numerator spans exponents 0.1..1 while the normaliser
    # spans 0.1..10, so these weights do not sum to 1. Confirm which range is
    # intended before changing it; the score thresholds used in the loop may
    # have been tuned against these values.
    weight_val = np.logspace(0.1, 1, num=seq_length, base=2.0)/np.sum(np.logspace(0.1, 10, num=seq_length, base=2.0))
elif weight_func == 'linear':
    weight_val = np.arange(1, seq_length+1)/np.sum(np.arange(1, seq_length + 1))
else:
    # Uniform weights that sum to 1.
    weight_val = np.ones(seq_length)/seq_length

# Linearly increasing weights (sum to 1) used for the displayed confidence.
confidence_weight = np.arange(1, seq_length+1)/np.sum(np.arange(1, seq_length + 1))
# Rolling window of per-frame flags; the newest frame is stored at the end.
sp_cls = np.zeros(seq_length)

# Arguments forwarded to mp_face_mesh.FaceMesh.
max_num_faces=1
refine_landmarks=True
min_detection_confidence=0.5
min_tracking_confidence=0.5

# Per-label log filled by the loop: number of frames and their confidences.
conf_analysis = {
    'speaking': {
        'count': 0,
        'conf': []
    },
    'not_speaking': {
        'count': 0,
        'conf': []
    }
}
# Run MediaPipe Face Mesh over 'vid2.mp4' frame by frame and label each frame
# 'speaking' / 'not speaking' with a geometric heuristic on the lip landmarks.
# `clf` and `pca` from the cells above are not used here.
#
# Per frame:
#   1. the vertical (y) gaps between paired upper/lower lip landmarks are
#      divided by their range and summed; a sum above LIP_OPEN_THRESHOLD flags
#      the frame with 1, otherwise 0;
#   2. the flag is pushed into the rolling window `sp_cls`;
#   3. the weighted window score decides the label that is shown and logged.
LIP_OPEN_THRESHOLD = 5       # on the summed, range-normalised lip gaps
SPEAKING_THRESHOLD = 0.006   # on the weighted window score; decides the label
HIGHLIGHT_THRESHOLD = 0.010  # on the same score; only picks the overlay colour

cap = None
out = None
try:
    with mp_face_mesh.FaceMesh(
        max_num_faces=max_num_faces,
        refine_landmarks=refine_landmarks,
        min_detection_confidence=min_detection_confidence,
        min_tracking_confidence=min_tracking_confidence,
    ) as face_mesh:
        cap = cv2.VideoCapture('vid2.mp4')

        if not cap.isOpened():
            print("Error opening video stream or file")
        elif record_video:
            # The writer needs the frame size of the source video.
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            size = (frame_width, frame_height)
            out = cv2.VideoWriter('output_videos/is_speaking.avi',
                                  cv2.VideoWriter_fourcc(*'MJPG'),
                                  10, size)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of the video (or a read failure).
                break

            # Face Mesh is fed an RGB image; drawing happens on a BGR copy.
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = face_mesh.process(image)
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            if not results.multi_face_landmarks:
                # No face found: the frame is neither scored nor displayed.
                continue
            for face_landmarks in results.multi_face_landmarks:
                # With several faces the landmarks of the last one are kept.
                keypoint = protobuf_to_dict(face_landmarks)["landmark"]

            # Landmarks as an array of shape (1, n_landmarks, 3) holding x, y, z.
            s = np.array([[i['x'], i['y'], i['z']] for i in keypoint])
            s = np.expand_dims(s, axis=0)

            # Range-normalised vertical gaps between paired lip landmarks.
            lip_distance = np.abs(s[0, upper_lip, 1] - s[0, lower_lip, 1])
            lip_distance = lip_distance/(lip_distance.max() - lip_distance.min())
            lip_total = lip_distance.sum()

            # Shift the window left by one and store the newest flag at the end.
            sp_cls = np.roll(sp_cls, -1, axis=0)
            sp_cls[seq_length - 1] = 1 if lip_total > LIP_OPEN_THRESHOLD else 0

            seq_score = np.sum(sp_cls * weight_val)
            decision_val = 'speaking' if seq_score >= SPEAKING_THRESHOLD else 'not speaking'
            confidence = np.sum(sp_cls * confidence_weight)

            # Log the confidence under the label assigned to this frame.
            bucket = 'speaking' if decision_val == 'speaking' else 'not_speaking'
            conf_analysis[bucket]['count'] += 1
            conf_analysis[bucket]['conf'].append(confidence)

            # Overlay: black panel with four status lines.
            cv2.rectangle(image, (40, 30), (400, 150), (0,0,0), -1)
            font = cv2.FONT_HERSHEY_SIMPLEX
            org = (50, 50)
            fontScale = 0.5
            thickness = 1

            color_ = (0,255,0) if lip_total > LIP_OPEN_THRESHOLD else (0,0,255)
            image = cv2.putText(image, f'Cumulative Lip Distance: {lip_total:.5f}', org, font, fontScale, color_, thickness, cv2.LINE_AA)
            # NOTE: the colour of the next three lines uses HIGHLIGHT_THRESHOLD,
            # which is stricter than the threshold that decides the label.
            color_ = (0,255,0) if seq_score > HIGHLIGHT_THRESHOLD else (0,0,255)
            image = cv2.putText(image, f'Cumulative Sequence Distance: {seq_score:.5f}', (50, 80), font, fontScale, color_, thickness, cv2.LINE_AA)
            image = cv2.putText(image, f'User is: {decision_val}', (50, 110), font, fontScale, color_, thickness, cv2.LINE_AA)
            image = cv2.putText(image, f'Confidence is: {confidence*100:.2f}%', (50, 140), font, fontScale, color_, thickness, cv2.LINE_AA)

            if out is not None:
                out.write(image)
            cv2.imshow('Frame',image)

            # Press Q on keyboard to exit
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break
finally:
    # Release the capture, the writer and the windows even if a frame raised.
    if cap is not None:
        cap.release()
    if out is not None:
        out.release()
    cv2.destroyAllWindows()

QObject::moveToThread: Current thread (0x56125e748020) is not the object's thread (0x56125e02c210).
Cannot move to target thread (0x56125e748020)

QObject::moveToThread: Current thread (0x56125e748020) is not the object's thread (0x56125e02c210).
Cannot move to target thread (0x56125e748020)

QObject::moveToThread: Current thread (0x56125e748020) is not the object's thread (0x56125e02c210).
Cannot move to target thread (0x56125e748020)

QObject::moveToThread: Current thread (0x56125e748020) is not the object's thread (0x56125e02c210).
Cannot move to target thread (0x56125e748020)

QObject::moveToThread: Current thread (0x56125e748020) is not the object's thread (0x56125e02c210).
Cannot move to target thread (0x56125e748020)

QObject::moveToThread: Current thread (0x56125e748020) is not the object's thread (0x56125e02c210).
Cannot move to target thread (0x56125e748020)

QObject::moveToThread: Current thread (0x56125e748020) is not the object's thread (0x56125e02c210).
Cannot move to tar

In [16]:
# Confidence values logged by the video cell, split by the label each frame received.
sp, nsp = (
    np.array(conf_analysis[label]['conf'])
    for label in ('speaking', 'not_speaking')
)

In [25]:
# Mean logged confidence over the frames labelled 'speaking'.
sp.mean()

0.8827943892184983