# Create a dataset that can be used to classify a set of face mesh landmarks as a viseme

expressions to capture
- nothing
- oo
- ee
- ar/ah
- random talking - without exaggerating expressions ...
    - this should be ignored by pointer control
    - MAYBE we should remove random talking examples that are classified as 0:nothing?

while recording data
- keep fingers on keyboard
- exaggerate expression - unless we're doing nothing/random talking
- change lighting over different capture sessions
- move around slowly
    - up,down,left,right,corners etc
- move forward and backward a bit

In [None]:
#export
import ipywidgets as widgets
import numpy as np
import pandas as pd
import cv2, datetime, time, math, json, shutil
import win32api, win32con

import mediapipe as mp
mp_face_mesh = mp.solutions.face_mesh

from pathlib import Path

In [None]:
#export
def _now(): 
    return datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S')

In [None]:
#export
def _new_capture_metadata(stop_after, path, video_capture, expression_id, comments):
    """Build the metadata dict for a capture session that is about to start.

    The frame size is read from `video_capture`; `count` starts at 0 and `path` is
    stored as a resolved string. Asserts that the frame is at least as wide as it is tall.
    """
    capture_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    capture_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    assert capture_width >= capture_height
    return {
        'count': 0,
        'stop_after': stop_after,
        'path': str(path.resolve()),
        'expression_id': expression_id,
        'capture_width': capture_width,
        'capture_height': capture_height,
        'start_date': _now(),
        'comments': comments,
    }

In [None]:
def _setup_variables(expression_id):
    """Validate `expression_id`, create the session directory and open camera 0.

    Returns `(expression_id, expression_name, path, video_capture, width, height)` where
    `expression_id` has been converted to `str`, `expression_name` is looked up in the
    `expressions` section of `../data/viseme-config.json` and `path` is the newly created
    `viseme_capture_session_{timestamp}_{expression_id}` directory.

    Raises `Exception` if `expression_id` is not configured and `AssertionError` if the
    camera frame is taller than it is wide. The caller owns `video_capture` and must release it.
    """
    expression_id = str(expression_id)
    with open('../data/viseme-config.json') as f: config = json.load(f)
    if expression_id not in config.get('expressions', {}):
        raise Exception(f'{expression_id} is missing from expressions section of data/viseme-config.json')
    expression_name = config['expressions'][expression_id]
    path = Path(f'../data/viseme_capture_session_{_now()}_{expression_id}')
    path.mkdir(parents=True, exist_ok=True)
    video_capture = cv2.VideoCapture(0)
    try:
        width, height = [int(video_capture.get(p)) for p in [cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT]]
        assert width >= height
    except Exception:
        # nobody else holds a reference to the camera yet, so release it here or it stays open
        video_capture.release()
        raise
    return expression_id, expression_name, path, video_capture, width, height

In [None]:
def _setup_data():
    data = dict(img_path=[])
    for i in range(468):
        for j in ['x','y','z']: 
            data[f'{i}{j}']=[]
    return data

In [None]:
def _update_image(image, image_widget, text):
    "Draw `text` on `image` at pixel position (20, 40) and show the result in `image_widget` as PNG bytes"
    text_origin = (20, 40)
    text_colour = (200, 200, 200)
    labelled = cv2.putText(image, text, text_origin, cv2.FONT_HERSHEY_COMPLEX, 1, text_colour)
    encoded = cv2.imencode('.png', labelled)
    image_widget.value = encoded[1].tobytes()

In [None]:
def _capture_and_process(video_capture, face_mesh):
    retval, image = video_capture.read() # TODO: check retval
    image = cv2.flip(image, 1)
    return image, face_mesh.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

In [None]:
def _countdown(video_capture, expression_name, image_widget, face_mesh):
    "Show a live 3, 2, 1 countdown in `image_widget`, refreshing the frame once per second"
    for seconds_left in (3, 2, 1):
        frame, _ = _capture_and_process(video_capture, face_mesh)
        caption = f'Capture: {expression_name} in {seconds_left}s'
        _update_image(frame, image_widget, caption)
        time.sleep(1)

when we `_save_results`
- we want to save some images so we can check why things are being misclassified etc
- so that we don't create too much data, we can
    - save one in 10 images at high quality or
    - save all images at low quality
- if we wanted to be able to re-calculate landmarks (i.e. if mediapipe changed) we might need to save all images at high quality
    - TODO: see if we get the same landmark data from low res images

In [None]:
def _save_results(path, results, data, image, capture_count):
    # save all landmarks calculated
    for landmark_id in range(468):
        landmark = results.multi_face_landmarks[0].landmark[landmark_id]
        for coord in ['x','y','z']: 
            data[f'{landmark_id}{coord}'].append(getattr(landmark, coord))
    # save one in 10 images
#     if capture_count % 10 == 0: 
#         img_name = f'{_now()}_{capture_count}.png'
#         assert cv2.imwrite(f'{path}/{img_name}', image)
#         data['img_path'].append(img_name)
#     else:
#         data['img_path'].append('')
    # but we can save all as low res - using less space than a single png (even at max compression)
    img_name = f'{_now()}_{capture_count}.jpeg'
    assert cv2.imwrite(f'{path}/{img_name}', image, [cv2.IMWRITE_JPEG_QUALITY, 50])
    data['img_path'].append(img_name)

In [None]:
def dry_run():
    """Show a mirrored live preview from camera 0 until ESC is pressed.

    Returns the last frame that was displayed. Raises `RuntimeError` if the camera stops
    delivering frames. The camera is released and the widget closed however the preview
    ends (ESC, an error or an interrupted kernel).
    """
    video_capture = cv2.VideoCapture(0)
    image_widget = None
    try:
        retval, image = video_capture.read()
        if not retval:
            raise RuntimeError('Failed to read a frame from the video capture device')
        image = cv2.flip(image, 1)
        image_widget = widgets.Image(value=cv2.imencode('.png', image)[1].tobytes())
        display(image_widget)
        while not win32api.GetAsyncKeyState(win32con.VK_ESCAPE):
            retval, frame = video_capture.read()
            if not retval:
                raise RuntimeError('Failed to read a frame from the video capture device')
            image = cv2.flip(frame, 1)
            image_widget.value = cv2.imencode('.png', image)[1].tobytes()
            time.sleep(.05)
    finally:
        video_capture.release()
        if image_widget is not None:
            image_widget.close()
    return image

In [None]:
# preview the mirrored webcam feed; `image` is the last frame that was shown
image = dry_run() # press ESC to stop

Image(value=b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x02\x80\x00\x00\x01\xe0\x08\x02\x00\x00\x00\xba\xb3K…

In [None]:
#export
def capture_session(expression_id, stop_after, comments):
    """Run a video capture session

    Shows a countdown, then captures `stop_after` frames of the expression `expression_id`,
    saving each frame and its landmarks. On success `capture_metadata.json` and `data.csv`
    are written to the new session directory, which is returned.

    `comments` is free text stored in the metadata. Any error raised while capturing
    propagates to the caller; the camera is always released.
    """
    expression_id, expression_name, path, video_capture, *_unused = _setup_variables(expression_id)
    capture_count = 0 # stays 0 when `stop_after` < 1 so the metadata can still be written
    try:
        # the context manager closes the face mesh graph when the session ends
        with mp_face_mesh.FaceMesh(max_num_faces=1) as face_mesh:
            data = _setup_data()
            image, _ = _capture_and_process(video_capture, face_mesh)
            image_widget = widgets.Image(value=cv2.imencode('.png', image)[1].tobytes())
            display(image_widget)
            _countdown(video_capture, expression_name, image_widget, face_mesh)
            capture_metadata = _new_capture_metadata(stop_after, path, video_capture, expression_id, comments)
            for capture_count in range(1, stop_after+1):
                image, results = _capture_and_process(video_capture, face_mesh)
                # NOTE(review): the caption is drawn onto `image` itself, so the saved jpeg
                # presumably includes the overlay text too - confirm this is intended
                _update_image(image, image_widget, f'{expression_name} {capture_count}')
                _save_results(path, results, data, image, capture_count)
                time.sleep(.05)
        capture_metadata['count'] = capture_count
        # key was previously misspelt `end_data`; `end_date` pairs with `start_date`
        capture_metadata['end_date'] = _now()
        with open(path/'capture_metadata.json', 'w') as f: json.dump(capture_metadata, f, indent=2)
        pd.DataFrame(data).to_csv(path/'data.csv', index=False)
    finally:
        video_capture.release()
    return path

In [None]:
# capture 500 frames for expression id 3 (ids are listed in ../data/viseme-config.json);
# `path` is the session directory that was created
path = capture_session(3, 500, 'nearly clean shaven')

Image(value=b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x02\x80\x00\x00\x01\xe0\x08\x02\x00\x00\x00\xba\xb3K…

In [None]:
# pd.read_csv(path/'data.csv')

# clear out old viseme data

In [None]:
# data_path = Path('../data')
# for path in data_path.glob('viseme_capture_session*'):
#     print('removing', path)
#     shutil.rmtree(path)

# create a single csv of all data.csv files

In [None]:
# Combine the data.csv of every capture session into one timestamped csv in `data_path`,
# adding an `expression_id` column taken from the session directory name.
data_path = Path('../data')
frames = []
# sorted so the combined csv has a reproducible row order
for path in sorted(data_path.glob('viseme_capture_session*')):
    # directory names end in `_{expression_id}`; take everything after the last
    # underscore (not just the last character) so ids of 10 or more still work
    expression_id = int(path.name.rsplit('_', 1)[-1])
    print(path, expression_id)
    _df = pd.read_csv(path/'data.csv')
    _df['expression_id'] = expression_id
    frames.append(_df)
# concatenate once instead of re-copying a growing dataframe on every iteration
df = pd.concat(frames) if frames else pd.DataFrame()
file_name = data_path/f'data_{_now()}.csv'
df.to_csv(file_name, index=False)
print('file_name', file_name)

..\data\viseme_capture_session_20211013101459_0 0
..\data\viseme_capture_session_20211013101550_0 0
..\data\viseme_capture_session_20211013101745_1 1
..\data\viseme_capture_session_20211013101930_1 1
..\data\viseme_capture_session_20211013102156_2 2
..\data\viseme_capture_session_20211013102325_2 2
..\data\viseme_capture_session_20211013102443_3 3
..\data\viseme_capture_session_20211013102614_3 3
..\data\viseme_capture_session_20211014132309_4 4
..\data\viseme_capture_session_20211014132409_4 4
..\data\viseme_capture_session_20211014132511_0 0
..\data\viseme_capture_session_20211014132600_0 0
..\data\viseme_capture_session_20211014132750_1 1
..\data\viseme_capture_session_20211014132840_1 1
..\data\viseme_capture_session_20211014141645_2 2
..\data\viseme_capture_session_20211014141805_2 2
..\data\viseme_capture_session_20211014141927_3 3
..\data\viseme_capture_session_20211014142018_3 3
..\data\viseme_capture_session_20211014142904_4 4
..\data\viseme_capture_session_20211014142958_4 4


In [None]:
df