# The semi-automatic gesture annotation tool in ELAN format.

## Mount Google Drive

In [1]:
# Mount Google Drive at /content/drive so the paths configured below can be
# read and written (this import is only available inside Google Colab).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Parameters that should be set manually.

In [41]:
# < MANDATORY >
# Path to the zip archive of the JSON files generated by OpenPose
path_openpose_json_zip = "/content/drive/My Drive/Colab Notebooks/json.zip"
# Path to the ELAN file with some manual annotations of the input video (the ELAN file with predicted annotations is written to the same location)
path_elan = "/content/drive/My Drive/Colab Notebooks/me_anno_tool.eaf"
# FPS of the input video
fps = 25.0
# Length (sec.) of the query segment proposed for annotation at each cycle
query_sec = 5
# Predicted annotations up to this length (sec.) are smoothed (smooth_sec <= 0 disables smoothing)
smooth_sec = 0.2


# < Advanced >
# Keypoints with confidence values below this value will be interpolated
th_opconf = 0.5
# Window size (sec.) of the feature array
time_window = 0.5
# LightGBM parameters: (num_leaves factor, max_bin, min_data_in_leaf, max_depth);
# num_leaves is computed as round(param[0] / 10 * 2 ** param[3])
param = (5, 31, 30, 7)
# Value for splitting annotated data into train data and validation data (X means, train : validation = X-1 : 1)
n_splits = 3
# Maximum number of boosting rounds (training normally ends earlier through early stopping)
epochs = 99999
# Early stopping rounds
early_stopping_rounds = 50

## If you would just like to try this tool, simply run this cell (please do not change anything).

In [42]:
import json, copy, time, zipfile, re, ast
import numpy as np
import pandas as pd
import lightgbm as lgbm
from sklearn.model_selection import StratifiedKFold

def my_round_int(x):
    """Round x to the nearest integer, with exact halves rounded upwards.

    Computed as floor((2x + 1) / 2), i.e. floor(x + 0.5), which avoids the
    round-half-to-even behaviour of the built-in round().
    """
    doubled_plus_one = x * 2 + 1
    return int(doubled_plus_one // 2)

"""
Make input feature for LGBM based on the keypoints detected by OpenPose
"""
def make_input_feature(zipfile_, jsons_path, time_window, th_opconf):
    """Build the per-window feature matrix from OpenPose keypoint JSON files.

    Args:
        zipfile_: An open ``zipfile.ZipFile`` containing the OpenPose JSON files.
        jsons_path: Archive member names, one per video frame, in frame order.
        time_window: Window length in frames.
        th_opconf: Keypoints whose confidence is below this value are treated
            as missing and filled by linear interpolation over time.

    Returns:
        ``np.ndarray`` with one row per window position
        (``len(jsons_path) - time_window + 1`` rows). Each row holds the
        window-averaged (x, y) of the 48 keypoints followed by the average
        distance of each keypoint to its position in the window's centre frame
        (48 * 2 + 48 = 144 values).

    Only the first detected person of every frame is used. Keypoints are
    translated to the neck and scaled by the shoulder distance when both
    shoulders and the neck are available in every frame; otherwise a message
    is printed and the raw coordinates are used.
    """
    body_idxes = [2, 5, 3, 6, 4, 7]  # shoulders, elbows, wrists (right, left)
    neck_idx = 1
    num_hand_kpts = 21

    # rows 0-5: body, rows 6-26: right hand, rows 27-47: left hand, last row: neck
    keypoints = np.full((len(jsons_path), 48+1, 2), np.nan, dtype=np.float64)

    def store(frm_idx, row, flat_kpts, src_idx):
        # OpenPose stores keypoints as a flat [x0, y0, c0, x1, y1, c1, ...] list.
        if flat_kpts[src_idx*3+2] >= th_opconf:
            keypoints[frm_idx, row, :] = flat_kpts[src_idx*3], flat_kpts[src_idx*3+1]

    for frm_idx, json_path in enumerate(jsons_path):
        # The files are JSON, so use the JSON parser (ast.literal_eval rejects
        # JSON-only tokens such as true/false/null).
        with zipfile_.open(json_path) as tmp:
            json_data = json.loads(tmp.read().decode("UTF-8"))
        if not json_data["people"]:
            continue

        person = json_data["people"][0]
        pose_kpts = person["pose_keypoints_2d"]
        rhand_kpts = person["hand_right_keypoints_2d"]
        lhand_kpts = person["hand_left_keypoints_2d"]

        store(frm_idx, -1, pose_kpts, neck_idx)

        row = 0
        for body_idx in body_idxes:
            store(frm_idx, row, pose_kpts, body_idx)
            row += 1
        for hand_kpts in (rhand_kpts, lhand_kpts):
            for hand_idx in range(num_hand_kpts):
                store(frm_idx, row, hand_kpts, hand_idx)
                row += 1

    # interpolation (keypoints with all frames nan are not interpolated)
    for kpt_idx in range(keypoints.shape[1]):
        missing = np.isnan(keypoints[:, kpt_idx, :])
        if missing.any() and not missing.all():
            df = pd.DataFrame(keypoints[:, kpt_idx, :])
            df = df.interpolate('linear', limit_direction='both')
            keypoints[:, kpt_idx, :] = df.values

    # Normalization needs both shoulders (rows 0, 1) and the neck (last row)
    # in every frame, and a non-zero shoulder distance to divide by.
    shoul_len = np.linalg.norm(keypoints[:, 0, :] - keypoints[:, 1, :], axis=1)
    is_normalization = (
        not np.isnan(keypoints[:, [0, 1, -1], :]).any()
        and bool(np.all(shoul_len > 0))
    )
    if is_normalization:
        keypoints = (keypoints - keypoints[:, -1:, :]) / shoul_len[:, np.newaxis, np.newaxis]
    else:
        print("Since OpenPose did not detect enough keypoints, the keypoint normalization will not be performed. If the normalization is required, consider lowering \"th_opconf\".")

    # delete neck
    keypoints = np.delete(keypoints, -1, axis=1)

    # keypoints -> feature
    center = int((time_window-1)/2)
    x_data = []
    for idx in range(keypoints.shape[0]-time_window+1):
        window = keypoints[idx:idx+time_window]
        pos_avg = np.average(window, axis=0)  # 48 x 2

        # averaged distance to the centre frame (the centre frame itself is excluded)
        dsts = np.linalg.norm(window - window[center], axis=2)  # time_window x 48
        dsts = np.delete(dsts, center, axis=0)
        dst_avg = np.average(dsts, axis=0)  # 48

        x_data.append(np.hstack((np.ravel(pos_avg), dst_avg)))

    return np.array(x_data)

"""
Read annotation from .eaf file
"""
def read_eaf(path_elan, num_frm, fps):
    """Read the manual "rest"/"gesture" annotations from an ELAN (.eaf) file.

    Args:
        path_elan: Path to the .eaf file.
        num_frm: Number of video frames (length of the returned array).
        fps: Frames per second used to convert time-slot values in
            milliseconds to frame indices.

    Returns:
        ``np.ndarray`` of dtype uint8 with one label per frame:
        0 = rest, 1 = gesture, 2 = no annotation.

    The file is scanned line by line: the line preceding an
    ``<ANNOTATION_VALUE>`` line is expected to carry the two time-slot
    references of that annotation as its 2nd and 3rd quoted attributes.
    """
    # 0:rest, 1:gesture, 2:no annotation
    annotation = np.full(num_frm, 2, dtype=np.uint8)
    times = {}
    prev_line = ""

    with open(path_elan, encoding="utf-8") as eaf_file:
        for line in eaf_file:
            # time slot
            if "TIME_SLOT_ID=\"ts" in line:
                split_ = line.split('"')
                # slots without a TIME_VALUE attribute carry no time; skip them
                if len(split_) > 3:
                    times[split_[1]] = float(split_[3])

            # time slot -> frame
            elif "<ANNOTATION_VALUE>gesture" in line or "<ANNOTATION_VALUE>rest" in line:
                split_ = prev_line.split('"')
                start = my_round_int(fps * times[split_[3]] / 1000.)
                end = my_round_int(fps * times[split_[5]] / 1000.)
                annotation[start:end] = 1 if "gesture" in line else 0

            prev_line = line

    return annotation

"""
Do active learning
"""
def active_learning(feature, annotation, param, n_splits, epochs, early_stopping_rounds, query_frm):
    """Train LightGBM on the annotated frames and pick the next query segment.

    Args:
        feature: Feature matrix, one row per frame.
        annotation: Per-frame labels aligned with ``feature``
            (0 = rest, 1 = gesture, 2 = not annotated).
        param: ``(num_leaves factor, max_bin, min_data_in_leaf, max_depth)``.
        n_splits: Number of stratified folds; the first fold is used as the
            validation set, the remaining folds as the training set.
        epochs: Maximum number of boosting rounds.
        early_stopping_rounds: Early-stopping patience in rounds.
        query_frm: Length of the query segment in unannotated frames.

    Returns:
        ``(y_pred, query)`` where ``y_pred`` holds the predicted label (0/1)
        of every unannotated frame, in frame order, and ``query`` is
        ``[start, end]`` (end exclusive, indices into ``annotation``) of the
        chunk of unannotated frames with the largest summed uncertainty.

    Raises:
        ValueError: If there is no unannotated frame, if the training split
            lacks one of the two classes, or if the model predicts NaN.
    """
    # train, validation, test data
    labelled = annotation != 2
    x_data = feature[labelled]
    y_data = annotation[labelled]
    unannotated = np.where(~labelled)[0]
    x_test = feature[unannotated]
    if unannotated.shape[0] == 0:
        raise ValueError("There is no unannotated frame left to predict.")

    # divide x/y_data into train and validation (only the first fold is used)
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True)
    train_idx, valid_idx = next(skf.split(x_data, y_data))
    x_train, y_train = x_data[train_idx], y_data[train_idx]
    x_valid, y_valid = x_data[valid_idx], y_data[valid_idx]

    num_rest = np.count_nonzero(y_train == 0)
    num_gest = np.count_nonzero(y_train == 1)
    if num_rest == 0 or num_gest == 0:
        raise ValueError("Both \"rest\" and \"gesture\" annotations are required to train the model.")

    # LightGBM parameters
    # NOTE(review): 'weight_column' looks intended as inverse-frequency class
    # weights, but LightGBM documents this parameter as selecting a weight
    # column when loading data from a file, so it presumably has no effect
    # here. Kept unchanged to preserve the training behaviour; confirm and
    # consider per-sample weights on the Dataset instead.
    weight_column = [1./float(num_rest), 1./float(num_gest)]
    weight_total = sum(weight_column)
    weight_column = [w/weight_total for w in weight_column]
    lgbm_param = {
        'objective': "binary",
        'learning_rate': 1e-1,
        'feature_fraction': 0.5,
        'bagging_fraction': 0.5,
        'bagging_freq': 1,
        'verbosity': -1,
        'num_leaves': my_round_int(float(param[0])/10. * (2**param[3])),
        'max_bin': param[1],
        'min_data_in_leaf': param[2],
        'max_depth': param[3],
        'weight_column': weight_column
    }

    # training
    # NOTE(review): the early_stopping_rounds / verbose_eval keyword arguments
    # are believed to have been removed from lgbm.train() in LightGBM 4.x in
    # favour of callbacks; kept as-is for the version this notebook was
    # written against - confirm before upgrading.
    lgb_train = lgbm.Dataset(x_train, label=y_train)
    lgb_valid = lgbm.Dataset(x_valid, label=y_valid, reference=lgb_train)
    model = lgbm.train(lgbm_param, lgb_train, valid_sets=lgb_valid, num_boost_round=epochs, early_stopping_rounds=early_stopping_rounds, verbose_eval=False)

    # prediction
    y_pred_prob = np.asarray(model.predict(x_test, num_iteration=model.best_iteration))
    if np.isnan(y_pred_prob).any():
        raise ValueError("The model predicted NaN probabilities.")

    # select query: per-frame uncertainty is the distance of the predicted
    # probability to the nearest class (0 or 1)
    uncertainty = np.minimum(y_pred_prob, 1. - y_pred_prob)
    num_unannotated = unannotated.shape[0]
    best_uncertainty, query = -1., []
    for st in range(0, num_unannotated, query_frm):
        ed = min(st + query_frm, num_unannotated)
        sum_uncertainty = float(uncertainty[st:ed].sum())

        # Starting below zero guarantees that a query is always selected,
        # even when every prediction is fully confident.
        if sum_uncertainty > best_uncertainty:
            best_uncertainty = sum_uncertainty
            # The end is exclusive: one past the last frame of this chunk.
            # (Indexing unannotated[ed] would run past the array for the
            # final chunk.)
            query = [int(unannotated[st]), int(unannotated[ed-1]) + 1]

    y_pred = (y_pred_prob >= 0.5).astype(np.uint8)
    return y_pred, query

"""
Smooth predicted annotation to output elan file
"""
def postprocessing(annotation, pred_anno, y_pred, cutout):
    """Merge predicted labels into the annotation and split it into runs.

    ``pred_anno`` is the annotation without its first and last ``cutout``
    frames; its unannotated entries (2) are overwritten in place with
    ``y_pred + 2`` so that manual labels stay 0/1 and predicted labels become
    2/3. The result is zero-padded back to full length and copied into the
    unannotated entries of ``annotation`` (also modified in place).

    Returns:
        ``(annotation, separated)`` where ``separated`` is a list of
        ``[start, end, label]`` runs of equal labels (end exclusive) that
        covers the whole annotation.
    """
    # manual(0,1), predicted(2,3)
    pred_anno[pred_anno == 2] = y_pred + 2

    border = np.zeros(cutout, dtype=np.uint8)
    padded = np.hstack((border, pred_anno, border))

    unlabelled = annotation == 2
    annotation[unlabelled] = padded[unlabelled]

    # separate annotation: a new run starts wherever the label changes
    num_frm = annotation.shape[0]
    change_points = (np.flatnonzero(annotation[1:] != annotation[:-1]) + 1).tolist()
    run_starts = [0] + change_points
    run_ends = change_points + [num_frm]
    separated = [[begin, finish, annotation[begin]] for begin, finish in zip(run_starts, run_ends)]

    return annotation, separated

def smooth_annotation(annotation, delete_frm, separated):
    """Smooth short predicted segments and merge neighbouring equal labels.

    Args:
        annotation: Per-frame labels (0/1 = manual rest/gesture,
            2/3 = predicted rest/gesture).
        delete_frm: Predicted segments of at most this many frames are
            relabelled; ``delete_frm <= 0`` disables smoothing.
        separated: List of ``[start, end, label]`` runs (end exclusive)
            describing ``annotation``. It is modified in place.

    Returns:
        A new list of ``[start, end, label]`` segments with labels 0 (rest)
        or 1 (gesture), in which adjacent segments always differ in label.
        An empty ``separated`` yields an empty list.
    """
    if not separated:
        return []

    def is_short_predicted(seg):
        return seg[1]-seg[0] <= delete_frm and seg[2] > 1

    if delete_frm > 0:
        # first loop (integrate consecutive short annotations)
        idx, dels = 0, []
        while idx < len(separated):
            if not is_short_predicted(separated[idx]):
                idx += 1
                continue

            # end of the run of short predicted segments (exclusive); the
            # bound check keeps a run that reaches the last segment from
            # indexing past the list
            ed = idx + 1
            while ed < len(separated) and is_short_predicted(separated[ed]):
                ed += 1

            # integration: the merged segment takes the majority label
            if idx != ed - 1:
                separated[idx][1] = separated[ed-1][1]
                tmp = annotation[separated[idx][0]:separated[idx][1]]
                num_rest = np.count_nonzero(tmp == 2)
                num_gest = np.count_nonzero(tmp == 3)
                assert np.count_nonzero(tmp == 0) == 0 and np.count_nonzero(tmp == 1) == 0
                separated[idx][2] = 2 if num_rest >= num_gest else 3
                dels.extend(range(idx+1, ed))

            # segment `ed` is not short-and-predicted, so it can be skipped
            idx = ed + 1

        # delete from the back so the remaining indices stay valid
        for del_ in reversed(dels):
            del separated[del_]

        # second loop: relabel each remaining short predicted segment by the
        # majority label of itself plus its direct neighbours
        last = len(separated) - 1
        for idx, seg in enumerate(separated):
            if is_short_predicted(seg):
                st = max(idx-1, 0)
                ed = min(idx+1, last)

                # Real neighbours must not be short predicted segments after
                # the first loop. At either boundary st/ed is the segment
                # itself, which must not be checked against this invariant.
                assert st == idx or not is_short_predicted(separated[st])
                assert ed == idx or not is_short_predicted(separated[ed])

                tmp = annotation[separated[st][0]:separated[ed][1]]
                num_rest = np.count_nonzero(tmp == 0) + np.count_nonzero(tmp == 2)
                num_gest = np.count_nonzero(tmp == 1) + np.count_nonzero(tmp == 3)
                seg[2] = 0 if num_rest >= num_gest else 1
            elif seg[2] > 1:
                # predicted (2, 3) -> plain label (0, 1)
                seg[2] -= 2
    else:
        for seg in separated:
            if seg[2] > 1:
                seg[2] -= 2

    # integration: merge adjacent segments that now share a label
    new_separated = []
    current_anno, st = separated[0][2], 0
    for idx, sp in enumerate(separated):
        assert sp[2] <= 1, sp[2]
        if sp[2] != current_anno:
            new_separated.append([separated[st][0], separated[idx-1][1], current_anno])
            current_anno = sp[2]
            st = idx
    new_separated.append([separated[st][0], separated[-1][1], current_anno])

    return new_separated

"""
Output elan file with predicted annotation and query
"""
def output_eaf(path_elan, separated, max_uncertainty, fps):
    """Write a copy of the ELAN file extended with PREDICTED and QUERY tiers.

    Args:
        path_elan: Path to the input .eaf file. The result is written next to
            it as ``<name>_predicted.eaf``.
        separated: List of ``[start_frame, end_frame, label]`` segments
            (label 0 = rest, otherwise gesture). Modified in place: frame
            numbers are replaced by the ids of the time slots created for
            them.
        max_uncertainty: ``[start_frame, end_frame]`` of the query segment.
        fps: Frames per second used to convert frames to milliseconds.

    The input is processed line by line and the first tier, annotation and
    time-slot lines found are reused as formatting templates for the new
    entries. The new time slots are inserted before ``</TIME_ORDER>`` and the
    new tiers after the last ``</TIER>`` of the input.
    """
    out_path = path_elan.split(".eaf")[0]+"_predicted.eaf"
    prev_line, examples = "", ["" for _ in range(6)]

    # Context managers make sure both files are closed even if parsing fails.
    with open(path_elan, "r", encoding="utf-8") as input_eaf, \
            open(out_path, "w", encoding="utf-8") as out_file:
        for line in input_eaf:
            # TIME_SLOT for manual
            if "TIME_SLOT_ID=\"ts" in line:
                split_ = line.split('"')
                last_ts = int(split_[1].split("s")[-1])

            elif "</TIME_ORDER>" in line:
                # TIME_SLOT for predicted (the previous TIME_SLOT line is the template)
                split_ = prev_line.split('"')
                for sp_idx, sp in enumerate(separated):
                    for sp_idx2 in range(2):
                        ms = int(float(sp[sp_idx2]) * 1000. / fps)
                        last_ts += 1
                        separated[sp_idx][sp_idx2] = "ts"+str(last_ts)
                        out_file.write('"'.join([split_[0], "ts"+str(last_ts), split_[2], str(ms), split_[4]]))
                        # Only the last segment needs its own end slot; every
                        # other segment ends at the start slot of its successor.
                        if sp_idx != len(separated)-1: break

                # TIME_SLOT for query
                for idx in range(2):
                    ms = int(float(max_uncertainty[idx]) * 1000. / fps)
                    last_ts += 1
                    out_file.write('"'.join([split_[0], "ts"+str(last_ts), split_[2], str(ms), split_[4]]))
                max_uncertainty = ["ts"+str(last_ts-1), "ts"+str(last_ts)]

            # references (first occurrence of each element is kept as a template)
            elif examples[0] == "" and "TIER_ID=\"" in line:
                examples[0] = line.split("TIER_ID=\"")[0]
                tab = line.split("<")[0]
            elif examples[1] == "" and "<ANNOTATION>" in line:
                examples[1] = line
            elif examples[2] == "" and "<ALIGNABLE_ANNOTATION " in line:
                examples[2] = line.split('"')
            elif examples[3] == "" and "<ANNOTATION_VALUE>" in line:
                examples[3] = line
                r_ann = "rest" if "rest" in line else "gesture"
                # The template line may itself be a manual rest/gesture
                # annotation; record its id too, otherwise a file with a
                # single manual annotation leaves last_a undefined.
                if "<ANNOTATION_VALUE>rest" in line or "<ANNOTATION_VALUE>gesture" in line:
                    last_a = int(prev_line.split('"')[1].split('a')[-1])
            elif examples[4] == "" and "</ALIGNABLE_ANNOTATION>" in line:
                examples[4] = line
            elif examples[5] == "" and "</ANNOTATION>" in line:
                examples[5] = line

            # manual annotation
            elif "<ANNOTATION_VALUE>rest" in line or "<ANNOTATION_VALUE>gesture" in line:
                last_a = int(prev_line.split('"')[1].split('a')[-1])

            # write (first line after the last tier of the input)
            elif "</TIER>" in prev_line and "TIER_ID=\"" not in line:
                out_file.write(examples[0] + "TIER_ID=\"PREDICTED\">\n")

                # write predicted annotations
                for idx, sp in enumerate(separated):
                    last_a += 1
                    out_file.write(examples[1])
                    tmp = separated[idx+1][0] if idx != len(separated)-1 else sp[1]
                    out_file.write('"'.join([examples[2][0], "a"+str(last_a), examples[2][2], sp[0], examples[2][4], tmp, examples[2][6]]))
                    label = "rest" if sp[2] == 0 else "gesture"
                    out_file.write(examples[3].replace(r_ann, label))
                    out_file.write(examples[4])
                    out_file.write(examples[5])
                out_file.write(tab + "</TIER>\n")

                # write query
                out_file.write(examples[0] + "TIER_ID=\"QUERY\">\n")
                out_file.write(examples[1])
                out_file.write('"'.join([examples[2][0], "a"+str(last_a+1), examples[2][2], max_uncertainty[0], examples[2][4], max_uncertainty[1], examples[2][6]]))
                out_file.write(tab+tab+tab+tab + "<ANNOTATION_VALUE>query</ANNOTATION_VALUE>\n")
                out_file.write(examples[4])
                out_file.write(examples[5])
                out_file.write(tab + "</TIER>\n")

            # update: every input line is copied through unchanged
            out_file.write(line)
            prev_line = line

    print("Done! {} has been outputted.".format(out_path))

# Driver: build features, read the manual annotation, train/predict, and write
# the ELAN file with the PREDICTED and QUERY tiers.
st_time = time.time()

print("Making input feature array to train...")
# Keep the window length in frames in its own variable: overwriting the
# user-facing `time_window` (seconds) would rescale it again on every re-run
# of this cell.
window_frm = max(my_round_int(float(time_window)*float(fps)), 1)
# The window must be odd so that it has a centre frame and the same number of
# frames is trimmed on both sides (otherwise features and labels misalign).
if window_frm % 2 == 0:
    window_frm += 1
cutout = (window_frm - 1) // 2

with zipfile.ZipFile(path_openpose_json_zip) as zipfile_:
    json_paths = [i for i in zipfile_.namelist() if i.endswith('.json')]
    # Sort by the last number of the file name (the frame index); using the
    # first number of the whole path breaks when a directory or video name
    # contains digits.
    json_paths = sorted(json_paths, key=lambda x: int(re.findall(r"\d+", x.rsplit("/", 1)[-1])[-1]))
    feature = make_input_feature(zipfile_, json_paths, window_frm, float(th_opconf))

print("Read annotation from .eaf file...")
annotation = read_eaf(path_elan, len(json_paths), float(fps))
# Explicit end index: annotation[cutout:-cutout] would be empty for cutout == 0.
tmp_annotation = annotation[cutout:annotation.shape[0]-cutout].copy()

print("Training LGBM and predicting annotation...")
y_pred, max_uncertainty = active_learning(feature, tmp_annotation, param, int(n_splits), int(epochs), int(early_stopping_rounds), my_round_int(float(query_sec)*float(fps)))

print("Outputing elan file...")
annotation, separated = postprocessing(annotation, tmp_annotation, y_pred, cutout)
separated = smooth_annotation(annotation, my_round_int(float(smooth_sec)*float(fps)), separated)
# The query indices refer to the trimmed annotation; shift them back to video frames.
output_eaf(path_elan, separated, [max_uncertainty[0]+cutout, max_uncertainty[1]+cutout], float(fps))

print("It took {} seconds.".format(time.time()-st_time))

Making input feature array to train...
Read annotation from .eaf file...
Training LGBM and predicting annotation...
Outputing elan file...
Done! /content/drive/My Drive/Colab Notebooks/me_anno_tool_predicted.eaf has been outputted.
It took 6.012408971786499 seconds.
