<center><h1> Using Automated Movement Detection to Assist with Gesture Annotation </h1>


<h4> James Trujillo ( j.p.trujillo@uva.nl )<br>
    Updated: 19-06-2025 </h4>
    
<img src="./images/envisionbox_logo.png" width="300"/> 
<br>
<h3> Info </h3>
This module is meant to be a brief tutorial on how you can use the automatic movement detection tool SPUDNIG (Ripperda, Drijvers & Holler, 2020) as part of your gesture coding pipeline. This method can be handy because SPUDNIG can capture all of the hand movements performed by a person, allowing the coder to simply go through these annotations, determine which are gestures and which are not, and fix the timing boundaries of the annotations, rather than going through every second of the video. The authors of the original SPUDNIG paper estimate that this can cut the time you spend gesture coding nearly in half. <br>
So, in this tutorial, we'll see how you can easily set everything up, run the program, and then use the output to code some gestures.
<br><br>

<b>Packages:</b>
    
* opencv-python

* pympi-ling

* SciPy

* numpy

* pandas

* mediapipe

* ultralytics (multi-person section only)

* pydantic (multi-person section only)
<br><br>
 
    
<b>Module citation: </b><br>
Trujillo, J.P. (2025). <i> Using Automated Movement Detection to Assist with Gesture Annotation </i> \[day you visited the site]. Retrieved from: https://envisionbox.org/Assisted_Gesture_Annotation.html 
<br>

<b>SPUDNIG reference</b>
<br>Ripperda, J., Drijvers, L. & Holler, J. (2020). Speeding up the detection of non-iconic and iconic gestures (SPUDNIG): A toolkit for the automatic detection of hand movements and gestures in video data. <i>Behavior Research Methods</i>: 52, 1783–1794. https://doi.org/10.3758/s13428-020-01350-2 <br>

<b>Location Repository:</b><br>
https://github.com/jptrujillo/SPUDNIG_assisted_gesture_annotation_module
<br><br>

 



In [1]:
import cv2
import utils
import movements2
import run_MP_module
import os
import sort_output
import argparse
import pympi
import pandas as pd

In [2]:
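def init(filename, keypoints_left, keypoints_right, keypoints_body):
    # the keypoint selections are passed in as arguments: reading them as globals
    # and reassigning them inside the function would raise an UnboundLocalError
    keypoints_left = utils.keypoint_check(keypoints_left)
    keypoints_right = utils.keypoint_check(keypoints_right)
    keypoints_body = utils.keypoint_check(keypoints_body)
    
    fps = utils.get_fps(filename)
    return fps, keypoints_left, keypoints_right, keypoints_body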




## Running SPUDNIG
In the next step, we will actually run SPUDNIG on our videos. Most of the functions have been tucked away into imported modules to make this tutorial more concise, so there's not much to see here. However, one thing to note is the two-step process. First, we check if there is already some MediaPipe data in the <i>motion_data</i> folder. If not, we go ahead and do the motion tracking. After that, we run the SPUDNIG algorithm which does the actual movement estimation. What is important to note here is that if you already have motion tracking data, you won't have to run it again. This code block will just use what's already there (assuming it's in the motion_data folder). This also makes it a lot easier to go back and adjust parameters for movement detection, without re-running any motion tracking.

### Set some parameters
Here we set some parameters for how the automated detection will work, such as movement thresholds. For the first run, we'll just leave them as is. You can always adjust them and re-run the notebook if you want to fine-tune your movement detection. The default values (used here) are chosen to balance false negatives against false positives, but they deliberately skew towards keeping the number of false negatives as low as possible, at the cost of additional false positives. In other words, they aim to ensure that you get every gesture in the video, although you will need to clean out the 'noise'.<br>
- <b>Threshold:</b> If the reliability of a key point in a frame is below the reliability threshold, the script stops checking for potential movement and indicates that no movement is detected in the respective frame, before continuing to the next frame. If the reliability is above the threshold, the script continues and determines whether the key point in question is part of a rest state or part of a movement. <i>range: 0 - 1</i>
- <b>minimum cutoff:</b> minimum number of frames to be considered a movement. Lower value = more precise tracking, i.e. even small movements are detected; higher value = more lenient tracking, i.e. only large movements that extend over several frames are detected. <i>range: 0 - 10</i>
- <b>gap cutoff:</b> minimum number of frames that must separate two "movements" for them to stay separate; movements with a smaller gap between them are merged together (i.e., this defines what constitutes a gap). Lower value = more individual submovements; higher value = fewer movements, more merging. <i>range: 0 - 10</i>
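
To make the interplay of the three parameters concrete, here is a minimal sketch of how a per-frame movement signal could be turned into movement segments. It is not SPUDNIG's actual implementation: the function name `segment_movements`, the inputs `moved` and `reliability`, and the order of the merging and filtering steps are all assumptions made for illustration.

```python
def segment_movements(moved, reliability, threshold=0.3, min_cutoff=4, gap_cutoff=4):
    """Return (first_frame, last_frame) pairs for each detected movement.

    moved:       per-frame booleans, True if the keypoint left its rest state
                 (hypothetical input)
    reliability: per-frame tracking confidence between 0 and 1
                 (hypothetical input)
    """
    # 1. frames below the reliability threshold count as "no movement"
    moving = [m and r >= threshold for m, r in zip(moved, reliability)]

    # 2. collect runs of consecutive moving frames as [start, end] (inclusive)
    runs, start = [], None
    for i, m in enumerate(moving):
        if m and start is None:
            start = i
        elif not m and start is not None:
            runs.append([start, i - 1])
            start = None
    if start is not None:
        runs.append([start, len(moving) - 1])

    # 3. merge runs separated by fewer than gap_cutoff still frames
    merged = []
    for run in runs:
        if merged and run[0] - merged[-1][1] - 1 < gap_cutoff:
            merged[-1][1] = run[1]
        else:
            merged.append(run)

    # 4. drop movements shorter than min_cutoff frames
    return [(s, e) for s, e in merged if e - s + 1 >= min_cutoff]


# toy signal: two bursts of movement close together, then a one-frame blip
moved = [False, True, True, False, False, True, True, True,
         False, False, False, False, False, True, False]
reliability = [0.9] * len(moved)

print(segment_movements(moved, reliability))
print(segment_movements(moved, reliability, min_cutoff=2, gap_cutoff=1))
```

With the default values, the two bursts are merged into a single movement spanning frames 1 to 7 and the one-frame blip is discarded. With `gap_cutoff=1` and `min_cutoff=2`, the bursts stay separate as frames 1 to 2 and frames 5 to 7.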

In [3]:
# minimum number of frames to be considered a "movement" (values: 0-10)
min_cutoff = 4
# minimum number of frames between two "movements" before they are merged together (values: 0-10)
gap_cutoff = 4
# threshold for reliability: lower values correspond to lower reliability/precision of tracking. 
# Frames with lower reliability than this are discarded (values: 0-1)
threshold = 0.3
# these numbers just correspond to keypoints in MediaPipe
keypoints_left = range(4,83)
keypoints_right = range(4,83)
keypoints_body = range(9,23)

In [4]:
video_dir = "./videos_to_process/"
motion_output_folder = "./motion_data/"


This next code block will:
- loop through each video in your <i>videos_to_process</i> folder
- check whether tracking data already exists and, if not, run the motion tracking script
- get the fps of the video, which is used when determining what is considered a "movement"
- run SPUDNIG, resulting in a dataframe containing the detected movements
- save an Elan file with these annotations, into the <i>annotations</i> folder

In [5]:
for video_name in os.listdir(video_dir):

    video_name_short =  video_name.split(".")[0]
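    # motion_output_folder already ends with "/", so no extra separator is needed
    data_output_folder = motion_output_folder + video_name_short + "/"
    
    # only run the motion tracking if its output is not already in the motion_data folder
    if not os.path.isfile(data_output_folder + video_name):
        run_MP_module.process_video(video_name, video_dir, data_output_folder)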
        
    #### Now run SPUDNIG ####
    # first, restructure the data for SPUDNIG
    keypoints_left, keypoints_right, keypoints_body = sort_output.sort_MP(
       data_output_folder, keypoints_left, keypoints_right, keypoints_body, video_name_short)
    # then, get the video's fps
    fps = utils.get_fps(video_dir + video_name)

    # now process
    data = movements2.main(data_output_folder, threshold, keypoints_left, keypoints_right, keypoints_body, fps,
                           min_cutoff, gap_cutoff)
    
    print(data)

    # convert each detected movement into a (begin_ms, end_ms, label) tuple
    Annotation = [(utils.timestamp_to_ms(row["Begin"]), utils.timestamp_to_ms(row["End"]), row["Annotation"])
                  for _, row in data.iterrows()]
    tiers = {'Movements':Annotation}
    new_eaf = utils.to_eaf(tiers)
                    
    new_eaf.to_file("./annotations/" + video_name_short + ".eaf")
    

Filename = WTD_short.mp4
outpath = ./motion_data//WTD_short/
1920.0 1080.0 29.97002997002997


TypeError: Holistic.__init__() got an unexpected keyword argument 'num_poses'
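
The fps value retrieved in the loop above matters because the cutoff parameters are expressed in frames, whereas the annotations written to the Elan file are expressed in milliseconds. As a rough illustration of that relationship (the helper `frame_to_ms` is hypothetical and not part of `utils`), the conversion amounts to:

```python
def frame_to_ms(frame_index, fps):
    """Convert a frame index to the time of that frame in whole milliseconds."""
    return round(frame_index / fps * 1000)


fps = 29.97002997002997  # the frame rate reported for WTD_short.mp4 above
print(frame_to_ms(30, fps))   # 30 frames at 29.97 fps
print(frame_to_ms(30, 30.0))  # 30 frames at exactly 30 fps
```

At 29.97 fps, 30 frames last 1001 ms rather than 1000 ms. The difference is small per annotation but accumulates over a long video, which is why the video's own frame rate is used rather than a nominal value.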

## Checking the motion tracking
This first step is basically just a sanity check to make sure that the tracking data makes sense. All this entails is that you open the video file in the motion_data folder and check whether the keypoints make sense. Is it (relatively consistently) putting keypoints on the shoulders, hands, etc.? It won't be perfect, but you want to make sure there isn't anything strange going on in the video. <br>
Ideally, you have something like this:<br>
<img src="./images/good_tracking.png" width=500 />

Sometimes there will be tracking errors. Take a look at the example below: <br>
<img src="./images/poor_tracking.png" width=500 />
<br>
Here we see some points where tracking did not work well. However, most of the video looks okay. This is really a judgment call on whether you trust the quality of the tracking, and there's not really a standard on this. 
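
If you would like a number to go with that judgment, one option is to summarize how often the tracker was unsure. The sketch below is only an illustration: it assumes you have a per-frame confidence (or visibility) value for a keypoint, and the function name `low_confidence_fraction` and the 0.3 cutoff are placeholders rather than an established standard.

```python
def low_confidence_fraction(confidences, threshold=0.3):
    """Fraction of frames whose tracking confidence falls below threshold.

    Frames with no detection at all (None) are counted as low confidence.
    """
    if not confidences:
        raise ValueError("no frames to summarize")
    low = sum(1 for c in confidences if c is None or c < threshold)
    return low / len(confidences)


# hypothetical per-frame confidence values for one wrist keypoint
wrist_conf = [0.95, 0.91, 0.12, None, 0.88, 0.90, 0.25, 0.93]
print(f"{low_confidence_fraction(wrist_conf):.1%} of frames are poorly tracked")
```

A value like this does not replace watching the video, but it can help you compare recordings or flag videos that deserve a closer look.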

## Cleaning the annotations
Finally, in this step we're going to get to some usable annotations. Go ahead and open the Elan file, which will be in the <i>annotations</i> folder. You can then go to Edit >> Linked Files to add the video back in: <br>
<img src="./images/linked_files1.png" width=500 /> <br>
<br>

Now, as noted before, we can do our "assisted annotation". We'll create a new tier called "gesture". <br>
<img src="./images/add_tier.png" width=500 /> <br>
<br>

Now, we can move through each of the annotations in the "movements" tier and see if it corresponds to a true gesture or not. If so, we can select the annotation and use this to create a new annotation in the gesture tier. When the "movement" annotation isn't a gesture (or perhaps just not the kind that we are interested in), we just ignore it and move on. <br>
<img src="./images/add_gesture_annot.png" width=500 /> <br>
<br>



## Tracking and Capturing Annotations for Multi-Person Videos
...Intro coming soon...

In [5]:
from ultralytics import YOLO
from pydantic import BaseModel
import cv2
import csv
import numpy as np
import glob as glob
import os

# identify all videos in all folders
alldatafol = './videos_to_process/'
allvids = glob.glob(alldatafol + '*.mp4')+glob.glob(alldatafol + '*.avi')+glob.glob(alldatafol + '*.mov')
print("the following videos are found: ", allvids)

# output folder
outputfol = './motion_data/'

# lets define the keypoint names, see: https://github.com/Alimustoofaa/YoloV8-Pose-Keypoint-Classification
class GetKeypoint(BaseModel):
    NOSE:           int = 0
    LEFT_EYE:       int = 1
    RIGHT_EYE:      int = 2
    LEFT_EAR:       int = 3
    RIGHT_EAR:      int = 4
    LEFT_SHOULDER:  int = 5
    RIGHT_SHOULDER: int = 6
    LEFT_ELBOW:     int = 7
    RIGHT_ELBOW:    int = 8
    LEFT_WRIST:     int = 9
    RIGHT_WRIST:    int = 10
    LEFT_HIP:       int = 11
    RIGHT_HIP:      int = 12
    LEFT_KNEE:      int = 13
    RIGHT_KNEE:     int = 14
    LEFT_ANKLE:     int = 15
    RIGHT_ANKLE:    int = 16

get_keypoint = GetKeypoint()

# Define skeleton connections
skeleton = [
    (get_keypoint.LEFT_SHOULDER, get_keypoint.RIGHT_SHOULDER),
    (get_keypoint.LEFT_SHOULDER, get_keypoint.LEFT_ELBOW),
    (get_keypoint.RIGHT_SHOULDER, get_keypoint.RIGHT_ELBOW),
    (get_keypoint.LEFT_ELBOW, get_keypoint.LEFT_WRIST),
    (get_keypoint.RIGHT_ELBOW, get_keypoint.RIGHT_WRIST),
    (get_keypoint.LEFT_SHOULDER, get_keypoint.LEFT_HIP),
    (get_keypoint.RIGHT_SHOULDER, get_keypoint.RIGHT_HIP),
    (get_keypoint.LEFT_HIP, get_keypoint.RIGHT_HIP),
    (get_keypoint.LEFT_HIP, get_keypoint.LEFT_KNEE),
    (get_keypoint.RIGHT_HIP, get_keypoint.RIGHT_KNEE),
    (get_keypoint.LEFT_KNEE, get_keypoint.LEFT_ANKLE),
    (get_keypoint.RIGHT_KNEE, get_keypoint.RIGHT_ANKLE),
]


# Load/download the model
model = YOLO('yolov8n-pose.pt')  # 'yolov8n-pose' is the smallest and fastest pose model; if you need more accuracy and can afford slower processing, try a larger one such as 'yolov8x-pose-p6'



the following videos are found:  ['./videos_to_process\\WTD_short.mp4']


In [6]:
%%capture 
import sys
for vid in allvids:
    # get only the filename, without its extension
    vidlocname = os.path.splitext(os.path.basename(vid))[0]
    # Open the video
    cap = cv2.VideoCapture(vid)
    # Get video properties (fps stays a float so that e.g. 29.97 is not truncated to 29)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) / 2)  # we're making the video half the size (so as to not exceed the 100MB GitHub threshold)
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) / 2)  # we're making the video half the size
    # Define the output video writer
    output_path = outputfol + '/' + vidlocname + '_YOLO.mp4'
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (int(width), int(height)))
    # Prepare CSV file
    csv_path = outputfol + '/' + vidlocname + '_YOLO.csv'
    csv_file = open(csv_path, 'w', newline='')
    csv_writer = csv.writer(csv_file)
    # Write header for the csv file
    header = ['frame', 'person', 'keypoint', 'x', 'y']
    csv_writer.writerow(header)
    frame_count = 0
    # person_tracker
    person_tracker = []
    
    # now lets loop through the video frames
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break
        # Run YOLOv8 inference on the frame (this is where the action happens)
        results = model(frame)
        
        # Visualize the results on the frame
        #annotated_frame = results[0].plot()
        annotated_frame = frame
        boxes = results[0].boxes.xywh.cpu()
        #track_ids = results[0].boxes.id.int().cpu().tolist()
        #track_ids = list(range(len(results[0].boxes.conf)))
        confs = results[0].boxes.conf.float().cpu().tolist()
        clss = results[0].boxes.cls.cpu().tolist()
        #namess = results[0].names
        track_ids = []

        populate = 0 # this is to ensure that in the first frame with detections, all detections are given a unique id
        for box, conf in zip(boxes, confs):

            if conf > 0.6:
                # a detection counts as a known person if its center lies within
                # one bounding-box width of that person's previous center
                person_track_threshold = float(box[2])
    
                person_track_distance = []
                if person_tracker:
                    for person in person_tracker:
                        a = np.array(person[0:2]) # the previous person's x,y coordinates (center of bounding box)
                        b = np.array([box[0],box[1]]) # this is current person
        
                        person_track_distance.append(np.linalg.norm(a-b)) # calculates distance
                else: # this is here so that if no one has been tracked yet, they will be added
                    populate = 1
                    
                if populate == 1:
                    person_track_distance = [9999]
    
                min_distance = np.min(person_track_distance)

                if min_distance < person_track_threshold:
                    # known person: reuse their position in person_tracker as the id
                    track_id_loc = int(np.argmin(person_track_distance))
                    track_ids.append(track_id_loc)
                    person_tracker[track_id_loc] = box
                else:
                    # new person: their id is the position they get in person_tracker,
                    # so that it matches the id assigned when they are found again
                    track_ids.append(len(person_tracker))
                    person_tracker.append(box)
            else:
                # placeholder so that track_ids stays aligned with boxes and confs
                track_ids.append(None)

        populate = 0
        
        for box, track_id, conf in zip(boxes, track_ids,confs):
            
            if conf > 0.6:
                x, y, w, h = box
                
                x1, y1,x2, y2 = x-w/2, y-h/2, x+w/2, y+h/2
                cv2.rectangle(frame, (int(x1),int(y1)),(int(x2),int(y2)), (0, 0, 255), 1)
                label = "person " +str(int(track_id)) + "  conf: " + str(conf)
                t_size = cv2.getTextSize(label, 0, fontScale=3 / 3, thickness=1)[0]
                cv2.rectangle(frame, (int(x1), int(y1) - t_size[1] - 3),(int(x1) + t_size[0], int(y1)+3),  (0,255,0),-1)
                cv2.putText(frame, label, (int(x1), int(y1) - 2), 0, 3 / 3,[0,0,0], thickness=2, lineType=cv2.LINE_AA)
        

        ###################
        # write empty rows in the csv if no person is detected
        if len(results[0].keypoints.xy) == 0:
            csv_writer.writerow([frame_count, None, None, None, None])
            annotated_frame = frame
        # If people are detected
        if len(results[0].keypoints.xy) > 0:
            # Process the results
            for person_idx, person_keypoints in enumerate(results[0].keypoints.xy):
                if confs[person_idx] > 0.6:
                    for keypoint_idx, keypoint in enumerate(person_keypoints):
                        x, y = keypoint
                        # Write to CSV
                        csv_writer.writerow([frame_count, track_ids[person_idx], keypoint_idx, x.item(), y.item()])       
                        # Draw keypoint on the frame
                        cv2.circle(annotated_frame, (int(x), int(y)), 5, (0, 255, 0), -1)
                    # Draw skeleton
                    for connection in skeleton:
                        if connection[0] < len(person_keypoints) and connection[1] < len(person_keypoints):
                            start_point = tuple(map(int, person_keypoints[connection[0]]))
                            end_point = tuple(map(int, person_keypoints[connection[1]]))
                            if all(start_point) and all(end_point):  # Check if both points are valid
                                cv2.line(annotated_frame, start_point, end_point, (255, 0, 0), 2)
        # Downsize the frame by half, then write it to the output video
        annotated_frame = cv2.resize(annotated_frame, (width, height))
        out.write(annotated_frame)
        frame_count += 1

    # Release everything
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    csv_file.close()

    print(f"Output video saved as {output_path}")
    print(f"Keypoints data saved as {csv_path}")


0: 384x640 3 persons, 182.9ms
Speed: 3.8ms preprocess, 182.9ms inference, 9.3ms postprocess per image at shape (1, 3, 384, 640)

<b> Formatting the YOLO data</b><br>
This next code block just takes the raw YOLO output and does some formatting to prepare it for SPUDNIG. Specifically, it:<br>
- creates one file per detected speaker
- removes detections below a certain threshold (60% is the default used here)
- pivots the data from long format to wide format, as is expected by SPUDNIG

In [32]:
import sys

root = motion_output_folder + "WTD_short/"

for files in os.listdir(root):
    if files == files.split(".")[0] + ".csv":
       # with open(root + "rootfiles.pkl","wb") as f:
        #    pickle.dump([root, files],f)
        df = pd.read_csv(root + files)
        # now we need to split into files for each speaker
        # ignore rows without a detected person (their "person" value is missing)
        speakers = df["person"].dropna().unique()

        for speaker in speakers:
            speaker_df = df[df["person"] == speaker]
            
            YOLO_keypoints = [6,7,8,9,10,11]

            wide_cols = []
            for val in range(1,18):
                if val in YOLO_keypoints:
                    wide_cols.append("x"+str(val))
                    wide_cols.append("y"+str(val))
            
                
            wide_df = pd.DataFrame(columns=wide_cols)
            
            df_idx = 0
            for frame in range(int(np.max(speaker_df["frame"])) + 1):  # +1 so that the last frame is included
                this_row = []
                
                if frame in speaker_df.frame.values:
                    frame_df = speaker_df[speaker_df["frame"]==frame].reset_index(drop=True)
                    for row_idx, row in frame_df.iterrows():
                        if row_idx in YOLO_keypoints:
                            this_row.append(row["x"])
                            this_row.append(row["y"])
            
                        
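                    if len(this_row) > 12:
                        # two detections share this id in this frame: keep the one
                        # closest to the previous frame's keypoints
                        diff1 = np.mean(abs(wide_df.loc[df_idx-1] - this_row[0:12]))
                        diff2 = np.mean(abs(wide_df.loc[df_idx-1] - this_row[12:24]))
                        if diff1 < diff2:
                            this_row =  this_row[0:12]
                        else:
                            this_row = this_row[12:24]
            
                if not this_row:
                    # no detection in this frame: fill the row with missing values
                    this_row = [np.nan] * len(wide_cols)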
                        
                wide_df.loc[df_idx] = this_row

                df_idx += 1
                
            wide_df.to_csv(root + files.split(".")[0] + "_speaker" + str(speaker) + ".csv")
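
The long-to-wide reshaping is the core of the block above, so here it is in isolation. This is a simplified sketch in plain Python rather than the tutorial's own code: the function `long_to_wide` and the toy rows are made up for illustration, and frames without a detection are filled with `None`.

```python
def long_to_wide(rows, keypoints):
    """Pivot (frame, keypoint, x, y) rows into one row per frame.

    Returns (header, table), where header is ['frame', 'x9', 'y9', ...] and
    every table row holds the frame number followed by x and y per keypoint.
    """
    header = ["frame"] + [f"{axis}{k}" for k in keypoints for axis in ("x", "y")]
    # index the long rows by (frame, keypoint) for quick lookup
    lookup = {(frame, kp): (x, y) for frame, kp, x, y in rows}
    last_frame = max(frame for frame, _, _, _ in rows)

    table = []
    for frame in range(last_frame + 1):  # +1 so the final frame is included
        wide_row = [frame]
        for kp in keypoints:
            x, y = lookup.get((frame, kp), (None, None))
            wide_row.extend([x, y])
        table.append(wide_row)
    return header, table


# toy data: two keypoints (9 and 10), and no detection in frame 1
long_rows = [
    (0, 9, 100.0, 200.0), (0, 10, 300.0, 210.0),
    (2, 9, 104.0, 198.0), (2, 10, 297.0, 215.0),
]
header, table = long_to_wide(long_rows, keypoints=[9, 10])
print(header)
for row in table:
    print(row)
```

Each frame becomes a single row with one x and one y column per keypoint, and the undetected frame 1 is kept as a row of missing values so that row position still corresponds to frame number.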

<b>Running SPUDNIG</b><br>
Now, we can run SPUDNIG. The main difference between this and the single-speaker format shown above (in terms of code) is that we want to process each of the individual speakers, then put everything into one Elan file for output. 

TODO: make SPUDNIG loop<br>
    ---- make sure output is put into ONE elan file
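
Until that loop is written, the intended structure can be sketched as follows. This is an outline under stated assumptions, not the final implementation: it assumes that `utils.to_eaf` accepts a dictionary with several tiers (above it is only used with a single 'Movements' tier), and the function `build_speaker_tiers`, the tier naming scheme, and the example annotations are invented for illustration.

```python
def build_speaker_tiers(annotations_per_speaker):
    """Collect per-speaker annotations into one {tier_name: annotations} dict.

    annotations_per_speaker maps a speaker id to a list of
    (begin_ms, end_ms, label) tuples, the same shape used for the
    single-speaker 'Movements' tier.
    """
    tiers = {}
    for speaker, annotations in sorted(annotations_per_speaker.items()):
        # sort by start time so each tier is in chronological order
        tiers[f"Movements_speaker{speaker}"] = sorted(annotations)
    return tiers


# made-up detections for two speakers
detected = {
    1: [(2500, 3100, "movement"), (400, 1200, "movement")],
    0: [(900, 1500, "movement")],
}
tiers = build_speaker_tiers(detected)
for name, annotations in tiers.items():
    print(name, annotations)
```

If the assumption about `utils.to_eaf` holds, the resulting dictionary could be passed to it and saved with `to_file`, as in the single-speaker loop, giving one Elan file with one movement tier per speaker.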