In [1]:
# https://storage.googleapis.com/mediapipe-models/hand_landmarker/hand_landmarker/float16/latest/hand_landmarker.task
import mediapipe as mp
import numpy as np
from mediapipe.framework.formats import landmark_pb2
import cv2

In [2]:
import time
# Most recent result handed to `print_result`; stays None until the callback
# has been invoked at least once.
lm_result = None


def print_result(result: mp.tasks.vision.HandLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
    """Result callback: remember the newest hand-landmarker result.

    Despite its name this function prints nothing. It only stores `result`
    in the module-level `lm_result` so other cells can read it.
    `output_image` and `timestamp_ms` are accepted but not used.
    """
    global lm_result
    lm_result = result

def detect_async(frame, landmarker):
    """Submit one frame to the landmarker without waiting for the result.

    Parameters
    ----------
    frame : numpy array holding the image. It is wrapped as
        ``mp.ImageFormat.SRGB``, so it is presumably expected in RGB channel
        order -- NOTE(review): OpenCV frames are BGR; confirm callers convert.
    landmarker : object exposing ``detect_async(image=..., timestamp_ms=...)``.

    The timestamp is taken from a monotonic clock instead of the wall clock:
    wall-clock time can jump backwards (e.g. clock adjustments), while the
    live-stream API expects timestamps that do not decrease between calls.
    """
    # convert np frame to mp image
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
    # integer milliseconds from a clock that never goes backwards
    timestamp_ms = time.monotonic_ns() // 1_000_000
    # detect landmarks; the result arrives through the configured callback
    landmarker.detect_async(image=mp_image, timestamp_ms=timestamp_ms)

def close(landmarker):
    """Release the landmarker by calling its ``close()`` method."""
    landmarker.close()

model_path = 'hand_landmarker.task'

# Options for the landmarker in live-stream mode. Results are delivered
# asynchronously to `print_result`. The three confidence thresholds are set
# to a low value (0.3) so that predictions are produced more often.
options = mp.tasks.vision.HandLandmarkerOptions(
    base_options=mp.tasks.BaseOptions(model_asset_path=model_path),
    running_mode=mp.tasks.vision.RunningMode.LIVE_STREAM,
    num_hands=2,
    min_hand_detection_confidence=0.3,
    min_hand_presence_confidence=0.3,
    min_tracking_confidence=0.3,
    result_callback=print_result,
)

# Options for the landmarker in image mode (one still image per call).
# `num_hands` is intentionally not passed here, so the library default applies.
options_image = mp.tasks.vision.HandLandmarkerOptions(
    base_options=mp.tasks.BaseOptions(model_asset_path=model_path),
    running_mode=mp.tasks.vision.RunningMode.IMAGE,
    min_hand_detection_confidence=0.2,
    min_hand_presence_confidence=0.2,
    min_tracking_confidence=0.2,
)

In [3]:
# NOTE(review): `first` is not read anywhere in the visible notebook; it is
# kept only in case another cell still relies on it.
first = True

def draw_landmarks_on_image(rgb_image, detection_result: "mp.tasks.vision.HandLandmarkerResult"):
    """Return a copy of `rgb_image` with the detected hand landmarks drawn on it.

    Courtesy of https://github.com/googlesamples/mediapipe/blob/main/examples/hand_landmarker/python/hand_landmarker.ipynb

    Parameters
    ----------
    rgb_image : image array to annotate (it is copied, never modified).
    detection_result : landmarker result, or None when no result is available
        yet (e.g. before the first asynchronous callback).

    Returns
    -------
    The annotated copy, or `rgb_image` itself when there is nothing to draw
    or when drawing fails.
    """
    # No result yet, or a result without hands: nothing to draw. Checking this
    # up front avoids raising (and printing) an AttributeError on every frame
    # while the result is still None.
    hand_landmarks_list = getattr(detection_result, 'hand_landmarks', None)
    if not hand_landmarks_list:
        return rgb_image

    try:
        annotated_image = np.copy(rgb_image)

        # Loop through the detected hands to visualize.
        for hand_landmarks in hand_landmarks_list:
            # The drawing utility expects a NormalizedLandmarkList proto.
            hand_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
            hand_landmarks_proto.landmark.extend([
                landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z)
                for landmark in hand_landmarks])
            mp.solutions.drawing_utils.draw_landmarks(
                annotated_image,
                hand_landmarks_proto,
                mp.solutions.hands.HAND_CONNECTIONS,
                mp.solutions.drawing_styles.get_default_hand_landmarks_style(),
                mp.solutions.drawing_styles.get_default_hand_connections_style())

        return annotated_image
    except Exception as e:
        # Best effort: a drawing failure must not stop the caller, so report
        # it and fall back to the unannotated image.
        print(e, 'exception')
        return rgb_image

In [4]:
def create_coord_list(detected_landmarks):
    """Turn landmarks (objects with .x, .y, .z) into a list of [x, y, z] lists."""
    return [[landmark.x, landmark.y, landmark.z] for landmark in detected_landmarks]
    
def orthonormal_basis_from_triangle(p1, p2, p3):
    """Build an orthonormal triad (e1, e2, e3) from three points.

    e1 points along (p2 - p1), e2 lies in the plane of the triangle and is
    perpendicular to e1, and e3 = e1 x e2 is the plane normal.

    Raises ValueError when the cross product of the two edges is (numerically)
    zero, i.e. the points do not span a plane.
    """
    edge_12 = p2 - p1
    edge_13 = p3 - p1
    normal = np.cross(edge_12, edge_13)
    if np.linalg.norm(normal) < 1e-12:
        raise ValueError("Anchor points are collinear; cannot define a plane.")

    e1 = edge_12 / np.linalg.norm(edge_12)
    # remove the e1 component from the second edge, then normalise the rest
    in_plane = edge_13 - np.dot(edge_13, e1) * e1
    e2 = in_plane / np.linalg.norm(in_plane)
    # cross product of two orthogonal unit vectors is already unit length
    return e1, e2, np.cross(e1, e2)

def solve_affine2d(src_xy, dst_xy):
    """Solve the 2D affine map (x, y) -> (X, Y) defined by 3 point pairs.

    Returns the 2x2 matrix M and the 2-vector t such that
    [X, Y] = M @ [x, y] + t for every pair.
    """
    rows, rhs = [], []
    for (x, y), (X, Y) in zip(src_xy, dst_xy):
        # one equation for X and one for Y per point pair
        rows.extend(([x, y, 1, 0, 0, 0],
                     [0, 0, 0, x, y, 1]))
        rhs.extend((X, Y))

    # unknowns are ordered (m11, m12, tx, m21, m22, ty)
    solution = np.linalg.solve(np.asarray(rows, float), np.asarray(rhs, float))
    params = solution.reshape(2, 3)
    M = params[:, :2].copy()
    t = params[:, 2].copy()
    return M, t

def tilt_from_z_original(P, idx0=0, idx1=17, degrees=False):
    """
    Angle between the original vector P[idx1]-P[idx0] and the global z-axis.

    Parameters
    ----------
    P : indexable collection of 3D points.
    idx0, idx1 : indices of the start and end point of the vector.
    degrees : when True the angle is returned in degrees, otherwise in
        radians (default). Previously this flag was accepted but ignored.

    Returns
    -------
    The angle (0 = parallel to +z, pi/2 = perpendicular to z), or NaN when
    the two points coincide.
    """
    v = np.asarray(P[idx1]) - np.asarray(P[idx0])
    n = np.linalg.norm(v)
    if n < 1e-12:
        return np.nan  # zero-length vector has no direction
    # dot(v_unit, z_unit) is just the z-component of the unit vector;
    # clip guards arccos against rounding slightly outside [-1, 1]
    cos_theta = np.clip(v[2] / n, -1.0, 1.0)
    theta = np.arccos(cos_theta)
    return np.degrees(theta) if degrees else theta

def normalize_landmarks(points, anchor_idx=(0,17,5), lambda_normal=1.0):
    """
    Map the points into a frame defined by three anchor points.

    A 3D affine transform P' = A @ P + t is built so that
      P[i1] -> (0,0,0), P[i2] -> (0,1,0), P[i3] -> (1,1,0),
    with (i1, i2, i3) = anchor_idx, and the normal of the anchor plane maps to
    the Z axis scaled by lambda_normal.

    Returns
    -------
    (normalized_points, tilt): the transformed points as an (N, 3) array and
    the value of `tilt_from_z_original(points)`, which is computed on the
    untransformed points with that function's default indices (independent of
    `anchor_idx`). The transform (A, t) itself is not returned.
    """
    P = np.asarray(points, float)
    i1, i2, i3 = anchor_idx
    origin = P[i1]

    # Rows of `to_local` are the basis vectors: local <- world
    e1, e2, e3 = orthonormal_basis_from_triangle(origin, P[i2], P[i3])
    to_local = np.column_stack([e1, e2, e3]).T

    # Anchors expressed in the local frame; their z component is ~0 because
    # all three lie in the plane spanned by e1 and e2.
    anchors_local = [to_local @ (P[i] - origin) for i in (i1, i2, i3)]
    src_xy = np.vstack([q[:2] for q in anchors_local])
    dst_xy = np.array([[0.,0.],[0.,1.],[1.,1.]], float)

    # Exact 2D affine that hits the three targets in XY
    M2, t2 = solve_affine2d(src_xy, dst_xy)

    # Full local linear map: XY through M2, Z scaled by lambda_normal
    L_local = np.zeros((3, 3), float)
    L_local[:2, :2] = M2
    L_local[2, 2] = lambda_normal

    # The first anchor is the local origin and maps to the origin, so t2 is
    # expected to be ~0; it is kept for completeness.
    t_local = np.array([t2[0], t2[1], 0.0], float)

    # Fold everything into one world-space affine: P' = A @ P + t
    A = L_local @ to_local
    t = -L_local @ (to_local @ origin) + t_local

    normalized_points = (A @ P.T).T + t

    return normalized_points, tilt_from_z_original(points)

In [5]:
def store_result(result, clazz, normalize = False):
    """Flatten the first detected hand of `result` into one feature dict.

    Parameters
    ----------
    result : object whose `hand_landmarks[0]` is a sequence of landmarks with
        .x, .y and .z attributes. Only the first hand is used.
    clazz : label stored under 'clazz'; omitted from the dict when None.
    normalize : when True the coordinates are passed through
        `normalize_landmarks` and the returned tilt is stored under 'tilt'.

    Returns
    -------
    dict with keys '<k>_x', '<k>_y', '<k>_z' per landmark k (in landmark
    order), then 'tilt' (only when normalizing), then 'clazz' (only when
    given). The caller collects these dicts in a list.

    Raises
    ------
    IndexError when `result.hand_landmarks` is empty; any error raised by
    `normalize_landmarks` propagates unchanged.
    """
    first_hand = result.hand_landmarks[0]

    if normalize:
        coords = create_coord_list(first_hand)
        norm_coords, tilt = normalize_landmarks(coords)
        triples = [(norm_coords[k][0], norm_coords[k][1], norm_coords[k][2])
                   for k in range(len(coords))]
    else:
        triples = [(landmark.x, landmark.y, landmark.z) for landmark in first_hand]

    d = {}
    for k, (x, y, z) in enumerate(triples):
        d[f'{k}_x'] = x
        d[f'{k}_y'] = y
        d[f'{k}_z'] = z

    if normalize:
        d['tilt'] = tilt

    # identity check: `!= None` would invoke a custom __ne__ on exotic labels
    if clazz is not None:
        d['clazz'] = clazz
    return d

In [6]:
# Convert a list of per-sample dicts into a pandas DataFrame.
def get_data_frame(result_list):
    """Return a DataFrame with one row per dict in `result_list`."""
    frame = pd.DataFrame(data=result_list)
    return frame

def get_keypoints(landmarker, base_path, file_name, clazz):
    """Load the image at `base_path + file_name` and run `detect` on it.

    The two path parts are concatenated as-is, so `base_path` must already end
    with a path separator. Returns whatever `detect` returns.
    """
    image_path = base_path + file_name
    image = mp.Image.create_from_file(image_path)
    return detect(image, clazz, landmarker)

def detect(image, clazz, landmarker):
    """Run `landmarker.detect` on `image`; return (detection result, clazz)."""
    detection = landmarker.detect(image=image)
    return detection, clazz

In [7]:
#!pip install pandas==2.1.4  # numpy < 2 must be used for mediapipe!
import pandas as pd

# Landmarker used for the still images of the dataset (image mode).
landmarker_image = mp.tasks.vision.HandLandmarker.create_from_options(options_image)

# Dataset root; each split folder holds its images and an `_annotations.csv`.
data_path = './../data/American Sign Language Letters.v1-v1.tensorflow/'

# One annotation table per split.
train_data = pd.read_csv(f'{data_path}train/_annotations.csv')
valid_data = pd.read_csv(f'{data_path}valid/_annotations.csv')
test_data = pd.read_csv(f'{data_path}test/_annotations.csv')

# NOTE(review): `counter` is not used in the visible cells.
counter = 0
    
def store_data(data, folder, normalize=False):
    """Detect hand landmarks for every annotated image of one split.

    `data` must provide 'filename' and 'class' columns; images are read from
    `data_path + folder` with the module-level `landmarker_image`. Rows whose
    image yields no hand are skipped. Returns a list with one `store_result`
    dict per remaining row.
    """
    result_list = []
    # walk the two columns in row order instead of slicing row by row
    for file_name, clazz in zip(data['filename'], data['class']):
        detection = get_keypoints(landmarker_image, data_path + folder, file_name, clazz)[0]
        if len(detection.hand_landmarks) > 0:
            result_list.append(store_result(detection, clazz, normalize))
    return result_list

In [8]:
# Load normalized results into DataFrames, one per split
df_train_normalized = get_data_frame(store_data(train_data, 'train/', normalize=True))
df_valid_normalized = get_data_frame(store_data(valid_data, 'valid/', normalize=True))
df_test_normalized = get_data_frame(store_data(test_data, 'test/', normalize=True))

# Only the last expression of a cell is rendered, so preview just this frame.
df_test_normalized.head()

Unnamed: 0,0_x,0_y,0_z,1_x,1_y,1_z,2_x,2_y,2_z,3_x,...,18_y,18_z,19_x,19_y,19_z,20_x,20_y,20_z,tilt,clazz
0,0.0,4.440892e-16,0.0,0.32712,0.090737,0.045319,0.626213,0.228807,0.075154,0.607811,...,1.275446,0.019063,-0.188385,1.415672,0.016589,-0.239304,1.589045,0.002881,1.935054,J
1,4.1633360000000003e-17,5.5511150000000004e-17,0.0,0.525119,0.20624,-0.07191,0.828477,0.543234,-0.132616,0.932525,...,1.177242,-0.129969,0.229686,1.037169,-0.145758,0.188796,0.954804,-0.093303,2.952533,Q
2,-2.220446e-16,-4.440892e-16,0.0,0.444774,0.216007,-0.078405,0.73004,0.448115,-0.118868,0.519361,...,0.901946,-0.066201,-0.210894,0.662981,-0.085913,-0.107566,0.66115,-0.064426,2.187825,Z
3,1.110223e-16,4.440892e-16,0.0,0.897693,0.348256,-0.091335,1.310136,0.783704,-0.194932,1.397869,...,1.469281,-0.072216,0.291336,1.651877,-0.135644,0.35867,1.770191,-0.184458,1.861912,R
4,-2.220446e-16,0.0,0.0,0.38662,0.130031,-0.04709,0.742501,0.262637,-0.082818,0.643822,...,0.670696,-0.064583,-0.138432,0.604676,-0.066883,-0.059686,0.703596,-0.051676,2.247806,Z


In [14]:
# train neural network
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score

# training features: x, y and z of landmarks 0..20 in landmark order,
# followed by the tilt angle
X_features = [f'{landmark_idx}_{axis}' for landmark_idx in range(21) for axis in 'xyz'] + ['tilt']

y_target = 'clazz'

# prepare data for training: feature columns (x) and label column (y) per split
df_training_normalized_x, df_training_normalized_y = (
    df_train_normalized[X_features], df_train_normalized[y_target])
df_valid_normalized_x, df_valid_normalized_y = (
    df_valid_normalized[X_features], df_valid_normalized[y_target])
df_test_normalized_x, df_test_normalized_y = (
    df_test_normalized[X_features], df_test_normalized[y_target])

def train_nn_model(X_train, y_train, hidden_layer_sizes=(26, 26), alpha=1e-5,
                   random_state=42, max_iter=2000):
    """Fit an MLPClassifier (adam solver) on the training data and return it.

    Parameters
    ----------
    X_train, y_train : training features and labels, passed straight to
        `MLPClassifier.fit`.
    hidden_layer_sizes, alpha, random_state, max_iter : forwarded to
        `MLPClassifier`. The defaults reproduce the values that used to be
        hard-coded, so existing two-argument calls behave as before.

    Returns
    -------
    The fitted classifier.
    """
    clf = MLPClassifier(
        solver='adam',
        alpha=alpha,
        hidden_layer_sizes=hidden_layer_sizes,
        random_state=random_state,
        max_iter=max_iter,
    )
    clf.fit(X_train, y_train)
    return clf

def test_nn_model(model, title, X, y):
    # y_pred = model.predict_proba(X_test)[:,1]
    print(f'Statistics: {title}')
    print('accuracy: {:5.4f}'.format(accuracy_score(y_true=y, y_pred=model.predict(X=X))))
    print('f1: {:5.4f}'.format(f1_score(average='weighted', y_true=y, y_pred=model.predict(X=X))))
    print(classification_report(y_true=y, y_pred=model.predict(X=X)))

nn_model = train_nn_model(df_training_normalized_x, df_training_normalized_y)

# Report the same statistics for every split, in train / validation / test order.
for split_title, split_x, split_y in (
    ('Training', df_training_normalized_x, df_training_normalized_y),
    ('Validation', df_valid_normalized_x, df_valid_normalized_y),
    ('Test', df_test_normalized_x, df_test_normalized_y),
):
    test_nn_model(nn_model, split_title, split_x, split_y)

Statistics: Training
accuracy: 0.9622
f1: 0.9624
              precision    recall  f1-score   support

           A       0.89      0.97      0.93        64
           B       1.00      1.00      1.00        35
           C       0.94      0.97      0.96        33
           D       0.92      0.88      0.90        50
           E       0.98      0.92      0.95        51
           F       1.00      0.98      0.99        53
           G       0.92      0.98      0.95        49
           H       1.00      0.97      0.99        36
           I       0.99      0.99      0.99        71
           J       0.90      0.90      0.90        71
           K       1.00      0.96      0.98        45
           L       1.00      0.98      0.99        64
           M       1.00      0.96      0.98        45
           N       0.96      0.96      0.96        53
           O       0.90      0.98      0.94        47
           P       0.97      0.97      0.97        39
           Q       1.00      0.9

In [10]:
import re
import heapq
import time
from wordfreq import word_frequency

In [11]:
def is_misspelled(word, lan):
    """Return True when `word_frequency(word, lan)` is not greater than zero."""
    frequency = word_frequency(word, lan)
    return not frequency > 0

def probabilities_letter(landmark_x, model):
    """Map every class of `model` to its predicted probability.

    Only the first row returned by `model.predict_proba(landmark_x)` is used.
    """
    first_row = model.predict_proba(landmark_x)[0]
    return dict(zip(model.classes_, first_row))

def find_top_options(captured_probabilities, num_options = 5, max_misspell = 2, include_vocab_freq = True, language = 'en', timeout = 5):
    """
    Suggest known words that are close to a sequence of captured letters.

    Letter combinations are explored best-first, ordered by the sum over all
    positions of log(p_letter) - log(p_top_letter). A combination is kept when
    `is_misspelled` accepts it and at most `max_misspell` positions use a
    letter other than the top-ranked one.

    Parameters
    ----------
    captured_probabilities : list with one {letter: probability} dict per
        captured position.
    num_options : stop after this many words have been found.
    max_misspell : maximum number of positions that may deviate from the
        top-ranked letter.
    include_vocab_freq : when True (default) a word's score is its relative
        letter probability times `word_frequency(word, language)`; when False
        the score is the relative letter probability only. Words rejected by
        `is_misspelled` are excluded in both cases. (This flag used to be
        ignored.)
    language : language code passed to `is_misspelled` / `word_frequency`.
    timeout : search budget in seconds; a message is printed when exceeded.

    Returns
    -------
    (start_word, results): `start_word` joins the top-ranked letter of every
    position; `results` is a list of (word, score) sorted by descending score.

    Raises
    ------
    IndexError when a position has no letter with non-zero probability.
    """
    # Per position: letters with non-zero probability, most likely first, each
    # paired with its log-probability relative to the top letter (top -> 0.0).
    # Zero probabilities are dropped because log(0) is undefined.
    log_probs_diff = []
    for probs in captured_probabilities:
        ranked = [(letter, p)
                  for letter, p in sorted(probs.items(), key=lambda x: x[1], reverse=True)
                  if p != 0]
        top_log = np.log(ranked[0][1]) if ranked else 0.0
        log_probs_diff.append([(letter, np.log(p) - top_log) for letter, p in ranked])

    # initial state: pick the best (index 0) letter at every position
    start_indices = (0,) * len(log_probs_diff)
    start_word = ''.join(candidates[0][0] for candidates in log_probs_diff)
    start_sum = sum(candidates[0][1] for candidates in log_probs_diff)

    # max-heap (negate sums because heapq is min-heap); entries are
    # (-score, index tuple, number of positions that left index 0)
    heap = [(-start_sum, start_indices, 0)]
    seen = {start_indices}

    results = []
    # monotonic clock: elapsed time must not be affected by clock adjustments
    start_time = time.monotonic()
    run_time = 0

    while heap and len(results) < num_options and run_time <= timeout:
        neg_sum, idx_tuple, changes = heapq.heappop(heap)
        current_sum = -neg_sum

        # build the actual word for this index combination
        word = ''.join(log_probs_diff[dim][j][0] for dim, j in enumerate(idx_tuple))

        if changes <= max_misspell and not is_misspelled(word, language):
            word_prob = np.exp(current_sum)
            if include_vocab_freq:
                word_prob = word_prob * word_frequency(word, language)
            results.append((word, word_prob))

        # push neighbors: move one position to its next-best letter
        for dim, j in enumerate(idx_tuple):
            if j + 1 >= len(log_probs_diff[dim]):
                continue
            nxt = idx_tuple[:dim] + (j + 1,) + idx_tuple[dim + 1:]
            if nxt in seen:
                continue

            # leaving index 0 is what counts as a changed position
            new_changes = changes + (1 if j == 0 else 0)
            if new_changes > max_misspell:
                continue

            # update the score incrementally instead of re-summing
            old_val = log_probs_diff[dim][j][1]
            new_val = log_probs_diff[dim][j + 1][1]
            heapq.heappush(heap, (-(current_sum - old_val + new_val), nxt, new_changes))
            seen.add(nxt)

        run_time = time.monotonic() - start_time

    if run_time > timeout:
        print("Time limit of {} seconds reached.".format(timeout), "{} word(s) found".format(len(results)), "for original {}.".format(start_word))

    return start_word, sorted(results, key=lambda x: x[1], reverse=True)

In [13]:
cam = cv2.VideoCapture(0)
# Fail fast with a clear message instead of crashing later on an empty frame.
if not cam.isOpened():
    cam.release()
    raise RuntimeError('Could not open camera 0.')

# NOTE(review): frame size is read but not used in the visible cells.
frame_width = int(cam.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cam.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Live-stream landmarker; its results arrive through the callback in `options`.
landmarker = mp.tasks.vision.HandLandmarker.create_from_options(options)

# State shared between the capture loop and the mouse callback.
captured_probabilities = []   # one probability dict per captured letter
captured_chars = []           # the captured letters themselves
probabilities = {}            # probabilities of the most recent prediction
prediction = None             # most recently predicted letter, None until one exists

def store_char_from_mouse_click(event,x,y,flags,param):
    """Mouse callback: on event code 1, capture the current prediction.

    Reads the module-level `prediction` and `probabilities` and appends them
    to `captured_chars` / `captured_probabilities`. Nothing happens for other
    events or while `prediction` is None. `x`, `y`, `flags` and `param` are
    not used.
    """
    left_button_down = 1  # event code 1 is cv2.EVENT_LBUTTONDOWN in OpenCV
    if event != left_button_down or prediction is None:
        return
    print(prediction)
    captured_chars.append(prediction)
    captured_probabilities.append(probabilities)

cv2.namedWindow('Camera')
# The callback reads its state from module-level variables, so no user data
# needs to be passed along.
cv2.setMouseCallback('Camera', store_char_from_mouse_click)

# Capture loop: 's' (or a left mouse click) stores the current prediction,
# 'q' quits. The try/finally releases camera, landmarker and window even when
# an exception interrupts the loop.
try:
    while True:
        ret, frame = cam.read()
        if not ret or frame is None:
            print('Failed to read a frame from the camera; stopping.')
            break

        # OpenCV delivers BGR frames while the landmarker input is declared as
        # SRGB, so convert the channel order for detection only; the BGR frame
        # is kept for display.
        detect_async(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), landmarker)

        # Snapshot the shared result once: the callback may replace
        # `lm_result` from another thread between two reads.
        result = lm_result
        frame = draw_landmarks_on_image(frame, result)

        if result is not None and result.hand_landmarks:
            try:
                data = store_result(result, None, True)
            except (ValueError, np.linalg.LinAlgError):
                # degenerate anchor geometry (e.g. collinear points): skip the
                # classification for this frame but keep the loop running
                data = None
            if data is not None:
                data.pop('clazz', None)
                probabilities = probabilities_letter(pd.DataFrame([data]), nn_model)
                prediction = max(probabilities, key=probabilities.get)
                # only show the letter when the classifier is reasonably sure
                if probabilities[prediction] > 0.7:
                    cv2.putText(frame, prediction, (250, 70), cv2.FONT_HERSHEY_SIMPLEX, 3, (255, 0, 0), 4)
        cv2.imshow('Camera', frame)

        # Poll the keyboard once per frame: a second waitKey call would wait
        # for a new key press and silently drop the one just read.
        key = cv2.waitKey(1) & 0xFF
        if key == ord('s'):
            # same guard as the mouse callback: never store a missing prediction
            if prediction is not None:
                print(prediction)
                captured_chars.append(prediction)
                captured_probabilities.append(probabilities)
        elif key == ord('q'):
            break
finally:
    close(landmarker)
    cam.release()
    cv2.destroyAllWindows()

# Report the captured word and, when it is long enough, likely corrections.
if not captured_chars:
    print('No characters captured')
else:
    print(f'Captured word: {"".join(captured_chars)}')

    if len(captured_chars) <= 3:
        print('To few characters to predict corrections')
    else:
        start_word, improved_words = find_top_options(
            captured_probabilities,
            num_options=10,
            max_misspell=3,
            language='en',
            timeout=10,
        )
        print(f'Start word: {start_word}')
        print('Improved words:')
        for alternative_word, probability in improved_words:
            print(alternative_word, "has probability {:.10f}".format(probability))

W
H
A
T
C
Captured word: WHATC
Start word: WHATC
Improved words:
WRATH has probability 0.0000000189
WHANG has probability 0.0000000025
WHATD has probability 0.0000000002
WHATA has probability 0.0000000000
WRANG has probability 0.0000000000
WHITH has probability 0.0000000000
WPATH has probability 0.0000000000
WHETH has probability 0.0000000000
WHATE has probability 0.0000000000
WUSTL has probability 0.0000000000
