In [1]:
import pathlib
import numpy as np

from blink_detector.blink_detector import (
    calculate_optical_flow, 
    predict_class_probas, 
    smooth_probas, 
    threshold_probas, 
    compile_into_events, 
    filter_events, 
    extract_blink_events,
)

from blink_detector.helper import (
    preprocess_frames, 
    get_video_frames, 
    get_timestamps,
    OfParams,
    PPParams,
    create_grid,
    get_classifier,
)

  from pandas import MultiIndex, Int64Index


**Run blink detection pipeline with example Neon recording**

Load example recording

In [2]:
# Folder of the example recording.
# NOTE(review): this is an absolute, machine-specific path — point it at your local copy.
recording_path = pathlib.Path(
    "/users/tom/experiments/neon_blink_detection/datasets/train_data/padel_tennis_neon_03-2ded8f56"
)

# Read the left/right eye frames and the timestamps from the recording folder.
left_eye_images, right_eye_images = get_video_frames(recording_path)
timestamps = get_timestamps(recording_path)

# Preprocess both eye image sequences with the Neon setting (replaces the loaded frames).
left_eye_images, right_eye_images = preprocess_frames(
    left_eye_images,
    right_eye_images,
    is_neon=True,
)

# Obtain the classifier for the Neon setting.
clf = get_classifier(is_neon=True)


**Define optical flow and postprocessing parameters** <br>
Use the default parameters for both.

In [None]:
# Default-constructed optical-flow (of_params) and postprocessing (pp_params) parameter objects.
of_params, pp_params = OfParams(), PPParams()

# Grid built from the image shape and grid size stored in the optical-flow parameters.
grid = create_grid(
    of_params.img_shape,
    of_params.grid_size,
)

**Predict blink events from video frames**

In [6]:
# Pair every (left, right) eye-image tuple with its timestamp.
eye_image_pairs = zip(left_eye_images, right_eye_images)
images_timestamps = zip(eye_image_pairs, timestamps)

# Chain the pipeline stages; `x` always holds the output of the most recent stage.
x = calculate_optical_flow(images_timestamps, of_params, grid)
x = predict_class_probas(x, clf, of_params)
x = threshold_probas(smooth_probas(x, pp_params), pp_params)
x = filter_events(compile_into_events(x))

# Collect everything produced by the final stage into a list.
blink_events = [*extract_blink_events(x, pp_params)]

**Visualize blink events**

**Using the real-time API**

In [10]:
from pupil_labs.realtime_api.simple import discover_one_device
from blink_detector.blink_detector import blink_detection_pipeline
from itertools import tee
import time
import nest_asyncio
import cv2

**Set up the real-time API**

In [None]:
# needed when running in a notebook
nest_asyncio.apply()

# calling the two functions too close together seems to cause a crash, therefore we wait a second
time.sleep(1)

device = discover_one_device()

# discover_one_device() returns None when no device was found; stop here with a clear
# message instead of failing with an AttributeError on the attribute accesses below
if device is None:
    raise RuntimeError("No device found by discover_one_device().")

# check if correct device is connected
print(f"Phone IP address: {device.phone_ip}")
print(f"Phone name: {device.phone_name}")

In [None]:
# stream and resize frames from realtime API
def video_steam(device):
    """Endlessly yield resized left/right eye images together with the frame timestamp.

    Each iteration pulls one frame from ``device.receive_eyes_video_frame()``,
    keeps only channel 0, splits it at column 192 into a left part
    (columns 0-191) and a right part (columns 192 onwards), and resizes each
    part to 64x64 with ``cv2.resize``.

    Args:
        device: object exposing ``receive_eyes_video_frame()``, which must
            return a ``(pixels, timestamp)`` pair.

    Yields:
        tuple: ``(left_image, right_image, frame_datetime)``.
    """
    split_column = 192
    output_size = (64, 64)

    while True:
        bgr_pixels, frame_datetime = device.receive_eyes_video_frame()

        # only the first colour channel is used
        first_channel = bgr_pixels[:, :, 0]

        left_images = cv2.resize(
            first_channel[:, :split_column],
            output_size,
            interpolation=cv2.INTER_AREA,  # same value as the literal 3
        )
        right_images = cv2.resize(
            first_channel[:, split_column:],
            output_size,
            interpolation=cv2.INTER_AREA,
        )

        yield left_images, right_images, frame_datetime

In [None]:
# Split the single video stream into three independent iterators with tee(), then
# project each one onto a single field: left image, right image, or timestamp.
sl, sr, st = tee(video_steam(device), 3)
frames_left = (left for left, _right, _ts in sl)
frames_right = (right for _left, right, _ts in sr)
# timestamps are scaled by 1e9 (presumably seconds -> nanoseconds; confirm against the API)
timestamps = (1e9 * ts for _left, _right, ts in st)

# Feed the three streams to the blink detection pipeline and print blink events in quasi real time.
# NOTE(review): `clf_path` is not defined in any visible cell — define it before running this cell.
for blinks in blink_detection_pipeline(
    frames_left,
    frames_right,
    timestamps,
    clf_path=clf_path,
):
    print(blinks)

    