In [1]:
from ultraleap_demo.training_history import *
from ultraleap_demo.load_demo import *

Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



In [3]:
from pathlib import PureWindowsPath

# Model Parameters
# Hyper-parameters used to rebuild the architecture before the trained weights are loaded.
d_model = 512
nhead = 8
num_layers = 6
# NOTE(review): `dropout` is defined but never passed to TransformerModel below —
# confirm the class default matches the value used for training.
dropout = 0.3

# Load the models
# timeseries_models[sequence_length][output_window] -> loaded model
# (an entry stays None until a matching checkpoint is found in MODEL_FILES).
timeseries_models = {
    16: {
        1: None,
        4: None,
        8: None,
        12: None
    },
    32: {
        1: None,
        8: None,
        16: None,
        24: None
    }
}
for model_file in MODEL_FILES:
    # PureWindowsPath accepts both "\\" and "/" as separators, so the file name
    # is extracted correctly whether the path was produced on Windows or POSIX.
    file_name = PureWindowsPath(model_file).name
    # The text between "step_" and "_batch" encodes the configuration, e.g.
    # "...sequence_<N>_..." and "...output_<M>_...".
    keyname = file_name.split("step_")[-1].split("_batch")[0]
    sequence_length = int(keyname.split("sequence_")[-1].split("_")[0])
    output_window = int(keyname.split("output_")[-1].split("_")[0])

    # Vocabulary size of each of the three features for this configuration.
    config_ntokens = ntokens[sequence_length][output_window]
    moving_ntokens = config_ntokens["moving"]
    palm_ntokens = config_ntokens["palm"]
    hand_ntokens = config_ntokens["hand"]
    feature_ntokens = [moving_ntokens, palm_ntokens, hand_ntokens]

    model = TransformerModel(len(feature_ntokens), feature_ntokens, d_model, nhead, num_layers, max_len = sequence_length)
    # map_location="cpu" lets checkpoints saved on a GPU be read on CPU-only
    # machines; load_state_dict then copies the values into the model's own
    # parameters, so the model's device is unaffected.
    model.load_state_dict(torch.load(model_file, map_location="cpu"))
    model.eval()  # inference only: disables dropout

    timeseries_models[sequence_length][output_window] = model

In [4]:
# Registry of every loaded model, grouped by task.
# NOTE(review): `classifier_models` is not defined in any visible cell —
# presumably it comes from one of the star imports; confirm.
all_models = dict(timeseries=timeseries_models, classifier=classifier_models)

In [None]:
def normalize_sequence(sequence, target_length):
    """Resample ``sequence`` to exactly ``target_length`` items.

    Output item ``i`` is ``sequence[floor(i * len(sequence) / target_length)]``,
    so shorter inputs are stretched by repeating items and longer inputs are
    shrunk by skipping items.

    Args:
        sequence: Indexable sequence with a length (e.g. a list of states).
        target_length: Number of items in the result.

    Returns:
        np.ndarray built from the selected items (empty when
        ``target_length`` is 0 or negative).

    Raises:
        IndexError: if ``sequence`` is empty and ``target_length`` is positive.
    """
    sequence_length = len(sequence)

    # Integer arithmetic on purpose: the float form
    # int((i / target_length) * sequence_length) can land one index too low
    # (e.g. (1 / 49) * 49 == 0.9999999999999999 -> 0), which duplicates one
    # item and drops another.
    return np.array([
        sequence[(i * sequence_length) // target_length]
        for i in range(target_length)
    ])


def get_predict_sequence(mapped_moving_directions, mapped_palm_orientations, mapped_similarity_states, sequence_length, output_window):
    """Turn the three per-frame feature streams into one model input.

    The streams are merged with ``combine_mapped_separated_sequences``, the
    merged sequence is cut to its last ``sequence_length - output_window``
    states, and the result is handed to ``make_predict_frame_sequence``,
    whose return value is returned unchanged.
    """
    merged = combine_mapped_separated_sequences(
        [mapped_moving_directions],
        [mapped_palm_orientations],
        [mapped_similarity_states],
    )
    # Negative start index: keep only the most recent history states.
    recent_history = merged[0][output_window - sequence_length:]
    return make_predict_frame_sequence(recent_history, sequence_length, output_window)

### Add every n-th frame

In [None]:
from IPython.display import clear_output
from dtw import dtw
import keyboard
import copy

import leap
import time

# Label <-> id tables used to print human-readable states in the listener below.
orientation_mapping = {'up': 0, 'down': 1, 'opposite': 2}
inverted_orientation_mapping = {v: k for k, v in orientation_mapping.items()}
direction_mapping_2d = {'up': 0, 'down': 1, 'left': 2, 'right': 3, 'stationary': 4}
inverted_direction_mapping_2d = {v: k for k, v in direction_mapping_2d.items()}

# Display names for classifier outputs (only referenced by the commented-out
# classification code inside the listener in this cell).
gesture_mapped_names = {
    "gesture_7":"Swipe Right",
    "gesture_8":"Swipe Left",
    "gesture_9":"Swipe Up",
    "gesture_10":"Swipe Down",
}

# NOTE(review): top_dirs is not read by any code visible in this notebook.
top_dirs = 3
# NOTE(review): the comment below says to multiply by 4, but 7 is not a multiple of 4 — confirm the intended value.
window_size = 7 # Original data used 30 fps camera, Leap 2 uses 120 fps camera so we need to multiply by 4
stationary_threshold_ratio = 1.5
# Only used by the commented-out classification code in this cell.
moving_percentage = 0.45
# moving_direction_indexes = [0, 1, 2, 6, 18, 21, 5, 13, 9, 17, 10, 14]
moving_direction_indexes = None

similarity_lookback = 4
similarity_threshold = 0.8

# Selects which time-series model is used (see timeseries_model below).
sequence_length = 16
output_window = 1
# Passed to Frames and also used by main() as the stop threshold.
max_frames = 100
# Only every n-th tracking event is turned into a frame.
add_every_n_frame = 10
# Counters mutated by MyListener.on_tracking_event through `global`.
frame_num = 0
interpolated_frame_num = 0
# Only used by the commented-out classification code in this cell.
target_length = 64

# Buffer that receives the sampled frames; the listener reads its
# mapped_moving_directions / mapped_palm_orientations / similarity_states lists.
frames = Frames(
    handpose=handpose_filtered, 
    sequence_length=sequence_length, 
    max_frames=max_frames, 
    window_size=window_size, 
    similarity_lookback=similarity_lookback,
    stationary_threshold_ratio=stationary_threshold_ratio,
    moving_direction_indexes=moving_direction_indexes,
    similarity_threshold=similarity_threshold,
    moving_direction_mapping=moving_direction_state_mapping, 
    palm_orientation_mapping=palm_orientation_state_mapping, 
    hand_pose_mapping=hand_pose_state_mapping)


# Model lookup: time-series models are keyed by sequence_length, then output_window;
# the classifier is simply the first entry of the classifier dict.
timeseries_model = all_models["timeseries"][sequence_length][output_window]
classifier_model = list(all_models["classifier"].values())[0]





class MyListener(leap.Listener):
    """Leap listener that feeds every ``add_every_n_frame``-th tracking event into ``frames``.

    Works on notebook globals: it increments ``frame_num`` and
    ``interpolated_frame_num`` and appends to the shared ``frames`` buffer.
    """

    def on_connection_event(self, event):
        """Report that the connection has been established."""
        print("Connected")

    def on_device_event(self, event):
        """Print the serial number of the reported device."""
        try:
            with event.device.open():
                info = event.device.get_info()
        except leap.LeapCannotOpenDeviceError:
            # The device could not be opened; query it without opening.
            info = event.device.get_info()

        print(f"Found device {info.serial}")

    def on_tracking_event(self, event):
        """Sample the hands of every n-th tracking event and print the latest mapped state."""
        global frame_num, interpolated_frame_num

        frame_num += 1
        # Nothing below runs unless this is an n-th tracking event.
        if frame_num % add_every_n_frame != 0:
            return

        # One frame is appended per visible hand.
        # NOTE(review): with two hands in view both end up in the same sequence — confirm this is intended.
        for hand in event.hands:
            interpolated_frame_num += 1
            frames.add_frame(Frame(Landmarks(hand)), True)
            print(f"Added frame {interpolated_frame_num}")

        if (len(frames.mapped_moving_directions) > 0) and len(event.hands) > 0:
            # Mapped state -> raw id -> readable label.
            moving_id = int(inverted_moving_direction_state_mapping[frames.mapped_moving_directions[-1]])
            print(f"moving: {inverted_direction_mapping_2d[moving_id]}")
            orientation_id = int(inverted_palm_orientation_state_mapping[frames.mapped_palm_orientations[-1]])
            print(f"orientation: {inverted_orientation_mapping[orientation_id]}")
            print(f"hand pose: {frames.similarity_states[-1]}")
            # wait=True defers the clear until the next output arrives.
            clear_output(wait=True)

        #         # count the number of '4' in the mapped_moving_direction[-sequence_length:] (stationary state)
        #         stationary_count = frames.mapped_moving_directions[-sequence_length:].count(4)

        #         if stationary_count <= sequence_length * moving_percentage:

        #             mapped_moving_directions = frames.mapped_moving_directions
        #             mapped_palm_orientations = frames.mapped_palm_orientations
        #             mapped_similarity_states = frames.mapped_similarity_states

        #             combined = combine_mapped_separated_sequences([mapped_moving_directions], [mapped_palm_orientations], [mapped_similarity_states])[0][output_window-sequence_length:]
        #             to_predict_sequence = make_predict_frame_sequence(combined, sequence_length, output_window)
        #             predicted_states = combine_predicted_features(make_prediction(timeseries_model, to_predict_sequence, output_window))

        #             performed_states = to_predict_sequence.tolist()[0][:-output_window]
        #             combined_states = performed_states + predicted_states

        #             print(combined_states)

        #             norm_combined_states = torch.tensor(normalize_sequence(combined_states, target_length), dtype=torch.float32).unsqueeze(0).to('cpu')
        #             pred = torch.max(classifier_model(norm_combined_states), 1)
        #             pred_score = pred[0].item()
        #             pred_index = pred[1].item()
        #             mapped_pred = gesture_mapped_names[classifier_mappings["inverted_gesture_state_mapping"][pred_index]]
        #             if pred_score > 10:
        #                 print(f"Predicted: {mapped_pred} with score {pred_score}")
                    
        #             clear_output(wait=True)

        #         else:
        #             print("Stationary")
        #             clear_output(wait=True)


                


def main():
    """Stream tracking events into ``MyListener`` until the demo is stopped.

    Polls every 10 ms and returns as soon as 'q' is pressed or
    ``interpolated_frame_num`` has reached ``max_frames``.
    """
    listener = MyListener()
    connection = leap.Connection()
    connection.add_listener(listener)

    with connection.open():
        connection.set_tracking_mode(leap.TrackingMode.Desktop)
        # The quit key is checked first, then the frame budget.
        while not keyboard.is_pressed('q') and interpolated_frame_num < max_frames:
            time.sleep(0.01)


if __name__ == "__main__":
    main()

### Add frames while the space bar is held

In [None]:
from IPython.display import clear_output
from dtw import dtw
import keyboard
import copy

import leap
import time

# NOTE: this cell re-declares the same global names as the "every n frame" demo,
# so running it replaces that demo's configuration, counters and `frames` buffer.

# Label <-> id tables used to print human-readable states in the listener below.
orientation_mapping = {'up': 0, 'down': 1, 'opposite': 2}
inverted_orientation_mapping = {v: k for k, v in orientation_mapping.items()}
direction_mapping_2d = {'up': 0, 'down': 1, 'left': 2, 'right': 3, 'stationary': 4}
inverted_direction_mapping_2d = {v: k for k, v in direction_mapping_2d.items()}

# NOTE(review): not read by the code in this cell.
gesture_mapped_names = {
    "gesture_7":"Swipe Right",
    "gesture_8":"Swipe Left",
    "gesture_9":"Swipe Up",
    "gesture_10":"Swipe Down",
}

# NOTE(review): top_dirs is not read by any code visible in this notebook.
top_dirs = 2
# NOTE(review): the comment below says to multiply by 4, but 7 is not a multiple of 4 — confirm the intended value.
window_size = 7 # Original data used 30 fps camera, Leap 2 uses 120 fps camera so we need to multiply by 4
stationary_threshold_ratio = 1
# NOTE(review): moving_percentage is not read by the code in this cell.
moving_percentage = 0.45
# moving_direction_indexes = [0, 1, 2, 6, 18, 21, 5, 13, 9, 17, 10, 14]
moving_direction_indexes = None

similarity_lookback = 4
similarity_threshold = 0.8

# Selects which time-series model is used (see timeseries_model below).
sequence_length = 16
output_window = 1
max_frames = 100
# A frame can only be added on every n-th tracking event (and only while space is held).
add_every_n_frame = 4
# Counters mutated by MyListener.on_tracking_event through `global`.
frame_num = 0
added_frame_num = 0
previous_added_frame_num = 0


# NOTE(review): target_length is not read by the code in this cell.
target_length = 64

# Buffer that receives the captured frames; the listener reads its
# mapped_moving_directions / mapped_palm_orientations / similarity_states lists.
frames = Frames(
    handpose=handpose_filtered, 
    sequence_length=sequence_length, 
    max_frames=max_frames, 
    window_size=window_size, 
    similarity_lookback=similarity_lookback,
    stationary_threshold_ratio=stationary_threshold_ratio,
    moving_direction_indexes=moving_direction_indexes,
    similarity_threshold=similarity_threshold,
    moving_direction_mapping=moving_direction_state_mapping, 
    palm_orientation_mapping=palm_orientation_state_mapping, 
    hand_pose_mapping=hand_pose_state_mapping)


# Model lookup: time-series models are keyed by sequence_length, then output_window;
# the classifier is simply the first entry of the classifier dict.
timeseries_model = all_models["timeseries"][sequence_length][output_window]
classifier_model = list(all_models["classifier"].values())[0]

# def increment_frame_num():
#     global added_frame_num
#     added_frame_num += 1
#     print(f'Counter: {added_frame_num}')


class MyListener(leap.Listener):
    """Leap listener that captures frames while the space bar is held and prints a prediction.

    Works on notebook globals: it updates ``frame_num``, ``added_frame_num``
    and ``previous_added_frame_num`` and appends to the shared ``frames`` buffer.
    """

    def on_connection_event(self, event):
        """Report that the connection has been established."""
        print("Connected")

    def on_device_event(self, event):
        """Print the serial number of the reported device."""
        try:
            with event.device.open():
                info = event.device.get_info()
        except leap.LeapCannotOpenDeviceError:
            # The device could not be opened; query it without opening.
            info = event.device.get_info()

        print(f"Found device {info.serial}")

    def on_tracking_event(self, event):
        """Optionally capture the current hands, then print the latest state and the model's prediction."""
        global frame_num, added_frame_num, previous_added_frame_num

        frame_num += 1

        for hand in event.hands:
            # Capture only while space is held, and only on every n-th tracking event.
            capture_requested = keyboard.is_pressed('space') and frame_num % add_every_n_frame == 0
            if not capture_requested:
                continue

            added_frame_num += 1
            # NOTE(review): the counter was just incremented, so this guard looks
            # like it can never trigger — confirm whether it can be removed.
            if added_frame_num == previous_added_frame_num:
                continue

            frames.add_frame(Frame(Landmarks(hand)), True)
            print(f"Added frame {added_frame_num}")
            previous_added_frame_num = added_frame_num

        # Nothing to report until at least one frame is stored and a hand is visible.
        if len(frames.mapped_moving_directions) == 0 or len(event.hands) == 0:
            return

        # Mapped state -> raw id -> readable label.
        moving_id = int(inverted_moving_direction_state_mapping[frames.mapped_moving_directions[-1]])
        print(f"moving: {inverted_direction_mapping_2d[moving_id]}")
        orientation_id = int(inverted_palm_orientation_state_mapping[frames.mapped_palm_orientations[-1]])
        print(f"orientation: {inverted_orientation_mapping[orientation_id]}")
        print(f"hand pose: {frames.similarity_states[-1]}")

        # NOTE(review): this runs on every tracking event with a visible hand,
        # even when no new frame was captured — confirm that is intended.
        to_predict = get_predict_sequence(
            frames.mapped_moving_directions,
            frames.mapped_palm_orientations,
            frames.similarity_states,
            sequence_length,
            output_window,
        )
        predicted_states = combine_predicted_features(make_prediction(timeseries_model, to_predict, output_window))

        # History part of the model input (placeholder slots stripped) followed by the prediction.
        performed_states = to_predict.tolist()[0][:-output_window]
        print(performed_states + predicted_states)

                



def main():
    """Stream tracking events into ``MyListener`` until the demo is stopped.

    Polls every 10 ms and returns as soon as 'q' is pressed or ``frames``
    holds more than 64 mapped moving directions.
    """
    listener = MyListener()
    connection = leap.Connection()
    connection.add_listener(listener)

    with connection.open():
        connection.set_tracking_mode(leap.TrackingMode.Desktop)
        # The quit key is checked first, then the number of stored frames.
        while not keyboard.is_pressed('q') and len(frames.mapped_moving_directions) <= 64:
            time.sleep(0.01)


if __name__ == "__main__":
    main()

    

# Compare time series prediction 

In [None]:
def make_predict_frame_sequence(mapped_frame_sequence, sequence_length, output_window):
    """Build a batch-of-one model input: history states followed by placeholder slots.

    The result always holds ``sequence_length`` states: the first
    ``sequence_length - output_window`` are history and the last
    ``output_window`` are ``[0, 0, 0]`` placeholders for the model to fill in.

    How the history is chosen from ``mapped_frame_sequence``:
      * exactly ``sequence_length`` states: the trailing ``output_window``
        states are dropped (they are the ones to be predicted);
      * more than the history length otherwise: the most recent states are kept;
      * fewer: the history is left-padded with ``[0, 0, 0]``.

    Args:
        mapped_frame_sequence: list of 3-element states.
        sequence_length: total number of states in the model input.
        output_window: number of trailing placeholder states.

    Returns:
        Tensor built from a single-item list holding the sequence, moved to
        the notebook-level ``device``.

    Raises:
        ValueError: if ``output_window`` is not between 0 and ``sequence_length``.
    """
    if not 0 <= output_window <= sequence_length:
        raise ValueError(
            f"output_window must be between 0 and sequence_length ({sequence_length}), got {output_window}"
        )

    history_length = sequence_length - output_window
    placeholder = [[0, 0, 0]]
    available = len(mapped_frame_sequence)

    if available == sequence_length:
        history = mapped_frame_sequence[:history_length]
    elif available > history_length:
        # Explicit start index instead of seq[-history_length:]: when
        # history_length is 0, seq[-0:] would return the whole list.
        history = mapped_frame_sequence[available - history_length:]
    else:
        history = placeholder * (history_length - available) + mapped_frame_sequence

    return torch.tensor([history + placeholder * output_window]).to(device)

In [None]:
# Offline check of the sequence_length=16 / output_window=12 model on the frames
# recorded above, after resampling the recording to 16 states.
# NOTE: this cell rebinds the notebook-level sequence_length, output_window and
# timeseries_model used by the live demos.
mapped_moving_directions = frames.mapped_moving_directions
mapped_palm_orientations = frames.mapped_palm_orientations
mapped_similarity_states = frames.similarity_states


target_length = 64
sequence_length = 16
output_window = 12

timeseries_model = all_models["timeseries"][sequence_length][output_window]
classifier_model = list(all_models["classifier"].values())[0]

combined = combine_mapped_separated_sequences(
    [mapped_moving_directions],
    [mapped_palm_orientations],
    [mapped_similarity_states],
)[0]
combined_normalized = normalize_sequence(combined, sequence_length)
print(combined_normalized.tolist())
# Ground truth: the final output_window states of the resampled recording.
true_state = np.array(combined_normalized.tolist()[-output_window:])

to_predict_sequence = make_predict_frame_sequence(combined_normalized.tolist(), sequence_length, output_window)
print(to_predict_sequence.tolist())

predicted_states = combine_predicted_features(make_prediction(timeseries_model, to_predict_sequence, output_window))
# History part of the model input followed by the model's prediction.
performed_states = to_predict_sequence.tolist()[0][:-output_window]
combined_states = performed_states + predicted_states
print(combined_states)
predicted_states = np.array(predicted_states)

# Fraction of individual feature values predicted correctly.
n_states, n_features = true_state.shape[0], true_state.shape[1]
feature_wise_accuracy = (true_state == predicted_states).sum() / (n_features * n_states)
print(f"Feature-wise accuracy: {feature_wise_accuracy}")

# A state only counts as correct when all of its features match.
state_matches = [np.array_equal(truth, pred) for truth, pred in zip(true_state, predicted_states)]
correct = sum(state_matches)
total = len(state_matches)

print(f"State-wise accuracy: {correct}/{total}")

In [None]:
# Offline check of the sequence_length=32 / output_window=1 model on the most
# recent recorded frames.
# NOTE: this cell rebinds the notebook-level sequence_length, output_window and
# timeseries_model used by the live demos.
mapped_moving_directions = frames.mapped_moving_directions
mapped_palm_orientations = frames.mapped_palm_orientations
mapped_similarity_states = frames.similarity_states


target_length = 64
sequence_length = 32
output_window = 1

timeseries_model = all_models["timeseries"][sequence_length][output_window]
classifier_model = list(all_models["classifier"].values())[0]

combined = combine_mapped_separated_sequences([mapped_moving_directions], [mapped_palm_orientations], [mapped_similarity_states])[0][-sequence_length:]
# print(combined_normalized.tolist())
print(len(combined))
# Ground truth: the last output_window recorded states.
true_state = np.array(combined[-output_window:])
print(true_state.tolist())

# Hand over the history only. With a full window this is the same input as
# before; with fewer than sequence_length recorded states it keeps the
# ground-truth states out of the model input (previously they were included,
# which made the comparison meaningless).
to_predict_sequence = make_predict_frame_sequence(combined[:-output_window], sequence_length, output_window)
print(to_predict_sequence.tolist())

predicted_states = combine_predicted_features(make_prediction(timeseries_model, to_predict_sequence, output_window))
# History part of the model input followed by the model's prediction.
performed_states = to_predict_sequence.tolist()[0][:-output_window]
combined_states = performed_states + predicted_states
print(combined_states)
predicted_states = np.array(predicted_states)

matches = true_state == predicted_states
# Fraction of individual feature values predicted correctly.
feature_wise_accuracy = matches.sum() / (true_state.shape[1] * true_state.shape[0])
# Fraction of states whose features ALL match (previously this repeated the
# feature-wise formula under a different label).
state_wise_accuracy = matches.all(axis=1).mean()
print(f"Feature-wise accuracy: {feature_wise_accuracy}")
print(f"State-wise accuracy: {state_wise_accuracy}")

In [None]:
# Number of mapped frames currently stored in `frames` (shown as the cell output).
len(frames.mapped_moving_directions)

In [None]:
import keyboard

# Initialize the variable
# Must exist before the hotkey fires, otherwise `counter += 1` raises NameError.
counter = 0

def increment_counter(event=None):
    """Increase the global ``counter`` by one and print its new value.

    Args:
        event: Keyboard event supplied by the ``keyboard`` hook; unused.
            Optional so the function can still be called without arguments.
    """
    global counter
    counter += 1
    print(f'Counter: {counter}')

# Add hotkey
# Pass the function itself: writing increment_counter() would run it once
# immediately and register its return value (None) as the callback.
keyboard.on_release_key('enter', increment_counter)

# Block forever, to keep the script running.
# keyboard.wait()


In [None]:
"""Uses interpolation in Leap API to determine the location of hands based on 
previous data. We use the LatestEventListener to wait until we have tracking 
events. We delay by 0.02 seconds each frame to simulate some delay, we get a 
frame size of the frame closest to the time we want to interpolate from and 
then interpolate on that frame"""
import leap
import time
from timeit import default_timer as timer
from typing import Callable
from leap.events import TrackingEvent
from leap.event_listener import LatestEventListener
from leap.datatypes import FrameData


def wait_until(condition: Callable[[], bool], timeout: float = 5, poll_delay: float = 0.01):
    """Poll ``condition`` until it holds or ``timeout`` seconds have passed.

    Args:
        condition: Zero-argument callable evaluated repeatedly.
        timeout: Maximum time to keep polling, in seconds.
        poll_delay: Pause between two evaluations, in seconds.

    Returns:
        True if ``condition`` held during polling or at the final check after
        the timeout, False otherwise.
    """
    start_time = timer()
    while timer() - start_time < timeout:
        if condition():
            return True
        time.sleep(poll_delay)
    # One last check after the deadline. Always return a bool: falling off the
    # end here used to yield None when the condition had just become true.
    return bool(condition())


def main():
    """Print hand data interpolated slightly ahead of the latest tracking event, forever.

    Each iteration takes the newest tracking event, sleeps 20 ms to simulate
    processing delay, asks the Leap API for the buffer size needed at the
    event's timestamp, interpolates a frame at a later timestamp and prints
    the hands found in it. Stop with ctrl-c.
    """
    tracking_listening = LatestEventListener(leap.EventType.Tracking)

    connection = leap.Connection()
    connection.add_listener(tracking_listening)

    with connection.open() as open_connection:
        wait_until(lambda: tracking_listening.event is not None)
        # ctrl-c to exit
        while True:
            event = tracking_listening.event
            if event is None:
                # Still no tracking data (wait_until timed out): back off
                # briefly instead of spinning at full CPU.
                time.sleep(0.01)
                continue
            event_timestamp = event.timestamp

            target_frame_size = leap.ffi.new("uint64_t*")
            frame_time = leap.ffi.new("int64_t*")
            frame_time[0] = event_timestamp

            # simulate 20 ms delay
            time.sleep(0.02)

            try:
                # we need to query the storage required for our interpolation
                # request, the size will depend on the number visible hands in
                # this frame
                leap.get_frame_size(open_connection, frame_time, target_frame_size)
            except Exception as e:
                print("get_frame_size() failed with: ", e)
                continue

            frame_data = FrameData(target_frame_size[0])
            try:
                # actually interpolate and get frame data from the Leap API
                # this is the time of the frame plus the 20ms artificial
                # delay and an estimated 10ms processing time which should
                # get close to real time hand tracking with interpolation
                # NOTE(review): the offset below is 40000 (presumably
                # microseconds, i.e. 40 ms), not 20 ms + 10 ms = 30 ms —
                # confirm which value is intended.
                leap.interpolate_frame(
                    open_connection,
                    event_timestamp + 40000,
                    frame_data.frame_ptr(),
                    target_frame_size[0],
                )
            except Exception as e:
                print("interpolate_frame() failed with: ", e)
                continue

            # Wrap the interpolated buffer so it can be read like a normal event.
            event = TrackingEvent(frame_data)
            print(
                "Frame ",
                event.tracking_frame_id,
                " with ",
                len(event.hands),
                "hands with a delay of ",
                leap.get_now() - event.timestamp,
            )
            for hand in event.hands:
                hand_type = "left" if str(hand.type) == "HandType.Left" else "right"
                print(
                    f"Hand id {hand.id} is a {hand_type} hand with position ({hand.palm.position.x}, {hand.palm.position.y}, {hand.palm.position.z})."
                )


if __name__ == "__main__":
    main()

In [None]:
!python -m pip install traitlets==5.1.1