Copyright (c) Meta Platforms, Inc. and affiliates.

<a target="_blank" href="https://colab.research.google.com/github/facebookresearch/co-tracker/blob/main/notebooks/demo.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# CoTracker: It is Better to Track Together
This is a demo for <a href="https://co-tracker.github.io/">CoTracker</a>, a model that can track any point in a video.

<img src="https://www.robots.ox.ac.uk/~nikita/storage/cotracker/bmx-bumps.gif" alt="Logo" width="50%">

Don't forget to turn on GPU support if you're running this demo in Colab. 

**Runtime** -> **Change runtime type** -> **Hardware accelerator** -> **GPU**

Let's install dependencies for Colab:

In [107]:
import os
import torch

from base64 import b64encode
from cotracker.utils.visualizer import Visualizer, read_video_from_path
from IPython.display import HTML

Read a video from CO3D:

In [109]:
video_dir = '../paper_data/video_1/'
video_path = os.path.join(video_dir, 'orig/video_1.mov')
video = read_video_from_path(video_path)
video = torch.from_numpy(video).permute(0, 3, 1, 2)[None].float()

In [None]:
def show_video(video_path):
    video_file = open(video_path, "r+b").read()
    video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
    return HTML(f"""<video width="640" height="480" autoplay loop controls><source src="{video_url}"></video>""")
 
show_video(video_path)

Import CoTrackerPredictor and create an instance of it. We'll use this object to estimate tracks:

In [322]:
from cotracker.predictor import CoTrackerPredictor

model = CoTrackerPredictor(
    checkpoint=os.path.join(
        '../checkpoints/cotracker2v1.pth'
    ), 
    window_len=16
)

In [323]:
if torch.cuda.is_available():
    model = model.cuda()
    video = video.cuda()

Track points sampled on a regular grid of size 30\*30 on the first frame:

In [324]:
pred_tracks, pred_visibility = model(video, grid_size=30)

Visualize and save the result: 

In [None]:
vis = Visualizer(save_dir=os.path.join(video_dir, "tracking"), pad_value=100)
vis.visualize(video=video, tracks=pred_tracks, visibility=pred_visibility, filename='teaser');

In [None]:
show_video(os.path.join(video_dir, "tracking/teaser.mp4"))

## Tracking manually selected points

We will start by tracking points queried manually.
We define a queried point as: [time, x coord, y coord] 

So, the code below defines points with different x and y coordinates sampled on frames 0, 10, 20, and 30:

In [None]:
import cv2
import numpy as np

def select_roi(frame):
    """Allow user to select a region of interest (ROI) on the frame."""
    roi = cv2.selectROI("Select Region of Interest", frame, fromCenter=False, showCrosshair=True)
    cv2.destroyWindow("Select Region of Interest")
    return roi

def detect_points(frame, roi, blue_lower, blue_upper):
    """Detect blue points in the frame within the ROI."""
    roi_frame = frame[int(roi[1]):int(roi[1]+roi[3]), int(roi[0]):int(roi[0]+roi[2])]
    hsv = cv2.cvtColor(roi_frame, cv2.COLOR_BGR2HSV)
    blue_mask = cv2.inRange(hsv, blue_lower, blue_upper)
    contours, _ = cv2.findContours(blue_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    contours = sorted(contours, key=lambda x: cv2.contourArea(x), reverse=True)[:68]
    
    points = []
    for contour in contours:
        (x, y), radius = cv2.minEnclosingCircle(contour)
        x, y = int(x) + int(roi[0]), int(y) + int(roi[1])
        points.append((x, y))
    
    return points

def correct_points(frame, points):
    """Allow user to correct point positions by dragging."""
    corrected_points = points.copy()
    window_name = "Correct Points (Drag to move, press Enter when done, 'r' to reset a point)"
    cv2.namedWindow(window_name)
    
    dragging = False
    drag_point_index = -1

    def mouse_callback(event, x, y, flags, param):
        nonlocal dragging, drag_point_index, corrected_points
        
        if event == cv2.EVENT_LBUTTONDOWN:
            drag_point_index = min(range(len(corrected_points)), 
                                   key=lambda i: (corrected_points[i][0]-x)**2 + (corrected_points[i][1]-y)**2)
            dragging = True
        
        elif event == cv2.EVENT_MOUSEMOVE:
            if dragging:
                corrected_points[drag_point_index] = (x, y)
        
        elif event == cv2.EVENT_LBUTTONUP:
            dragging = False
            drag_point_index = -1
        
        frame_copy = frame.copy()
        for i, (px, py) in enumerate(corrected_points):
            cv2.circle(frame_copy, (px, py), 5, (255, 0, 0), -1)
            cv2.putText(frame_copy, f'ID:{i}', (px, py - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        cv2.imshow(window_name, frame_copy)
    
    cv2.setMouseCallback(window_name, mouse_callback)
    
    while True:
        frame_copy = frame.copy()
        for i, (x, y) in enumerate(corrected_points):
            cv2.circle(frame_copy, (x, y), 5, (255, 0, 0), -1)
            cv2.putText(frame_copy, f'ID:{i}', (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        cv2.imshow(window_name, frame_copy)
        
        key = cv2.waitKey(1) & 0xFF
        if key == 13:  # Press Enter to finish correction
            break
        elif key == ord('r'):  # Press 'r' to reset a point
            print("Click on the point you want to reset.")
            reset_point = cv2.waitKey(0) & 0xFF
            if reset_point >= ord('0') and reset_point <= ord('9'):
                index = reset_point - ord('0')
                if index < len(corrected_points):
                    corrected_points[index] = points[index]  # Reset to original position
    
    cv2.destroyWindow(window_name)
    return corrected_points

# Load the video
# video_path = "/Users/mitchfogelson/Downloads/trimmed_scissor_video.mov"  # Replace with your video file path
cap = cv2.VideoCapture(video_path)

# Read the first frame
ret, first_frame = cap.read()
if not ret:
    print("Failed to read the video")
    exit()

# Let the user select the ROI
roi = select_roi(first_frame)

# Define HSV color range for blue dots
blue_lower = np.array([100, 20, 14])
blue_upper = np.array([180, 255, 255])

# Detect initial points
initial_points = detect_points(first_frame, roi, blue_lower, blue_upper)

# Allow user to correct initial points
corrected_points = correct_points(first_frame, initial_points)

# Create the output vector
output_vector = [[0, x, y] for x, y in corrected_points]

# Print the output vector
print("Output vector [frame #, xpos, ypos]:")
for point in output_vector:
    print(point)

# Release everything
cap.release()
cv2.destroyAllWindows()

In [328]:
queries = torch.tensor(output_vector).float()[None].reshape(-1, 3)
if torch.cuda.is_available():
    queries = queries.cuda()

In [None]:
queries[:,0]

We pass these points as input to the model and track them:

In [330]:
pred_tracks, pred_visibility = model(video, queries=queries[None])

Finally, we visualize the results with tracks leaving traces from the frame where the tracking starts.
Color encodes time:

In [331]:
vis = Visualizer(
    save_dir=os.path.join(video_dir, "tracking"),
    linewidth=6,
    mode='cool',
    tracks_leave_trace=-1
)


In [None]:

vis.visualize(
    video=video,
    tracks=pred_tracks,
    visibility=pred_visibility,
    filename='queries');

In [None]:
show_video(os.path.join(video_dir, "tracking/queries.mp4"))

Notice that points queried at frames 10, 20, and 30 are tracked **incorrectly** before the query frame. This is because CoTracker is an online algorithm and only tracks points in one direction. However, we can also run it backward from the queried point to track in both directions. Let's correct this:

In [None]:
import cv2
import numpy as np

def select_roi(frame):
    """Allow user to select a region of interest (ROI) on the frame."""
    roi = cv2.selectROI("Select Region of Interest", frame, fromCenter=False, showCrosshair=True)
    cv2.destroyWindow("Select Region of Interest")
    return roi

def detect_points(frame, roi, blue_lower, blue_upper):
    """Detect blue points in the frame within the ROI."""
    roi_frame = frame[int(roi[1]):int(roi[1]+roi[3]), int(roi[0]):int(roi[0]+roi[2])]
    hsv = cv2.cvtColor(roi_frame, cv2.COLOR_BGR2HSV)
    blue_mask = cv2.inRange(hsv, blue_lower, blue_upper)
    contours, _ = cv2.findContours(blue_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    contours = sorted(contours, key=lambda x: cv2.contourArea(x), reverse=True)[:68]
    
    points = []
    for contour in contours:
        (x, y), radius = cv2.minEnclosingCircle(contour)
        x, y = int(x) + int(roi[0]), int(y) + int(roi[1])
        points.append((x, y))
    
    return points

def correct_points(frame, points):
    """Allow user to correct point positions by dragging."""
    corrected_points = points.copy()
    window_name = "Correct Points (Drag to move, press Enter when done, 'r' to reset a point)"
    cv2.namedWindow(window_name)
    
    dragging = False
    drag_point_index = -1

    def mouse_callback(event, x, y, flags, param):
        nonlocal dragging, drag_point_index, corrected_points
        
        if event == cv2.EVENT_LBUTTONDOWN:
            drag_point_index = min(range(len(corrected_points)), 
                                   key=lambda i: (corrected_points[i][0]-x)**2 + (corrected_points[i][1]-y)**2)
            dragging = True
        
        elif event == cv2.EVENT_MOUSEMOVE:
            if dragging:
                corrected_points[drag_point_index] = (x, y)
        
        elif event == cv2.EVENT_LBUTTONUP:
            dragging = False
            drag_point_index = -1
        
        frame_copy = frame.copy()
        for i, (px, py) in enumerate(corrected_points):
            cv2.circle(frame_copy, (px, py), 5, (255, 0, 0), -1)
            cv2.putText(frame_copy, f'ID:{i}', (px, py - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        cv2.imshow(window_name, frame_copy)
    
    cv2.setMouseCallback(window_name, mouse_callback)
    
    while True:
        frame_copy = frame.copy()
        for i, (x, y) in enumerate(corrected_points):
            cv2.circle(frame_copy, (x, y), 5, (255, 0, 0), -1)
            cv2.putText(frame_copy, f'ID:{i}', (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        cv2.imshow(window_name, frame_copy)
        
        key = cv2.waitKey(1) & 0xFF
        if key == 13:  # Press Enter to finish correction
            break
        elif key == ord('r'):  # Press 'r' to reset a point
            print("Click on the point you want to reset.")
            reset_point = cv2.waitKey(0) & 0xFF
            if reset_point >= ord('0') and reset_point <= ord('9'):
                index = reset_point - ord('0')
                if index < len(corrected_points):
                    corrected_points[index] = points[index]  # Reset to original position
    
    cv2.destroyWindow(window_name)
    return corrected_points

# Load the video
# video_path = "/Users/mitchfogelson/Downloads/trimmed_scissor_video.mov"  # Replace with your video file path
cap = cv2.VideoCapture(video_path)

# Initialize last_frame
last_frame = None
frame_count = 0

# Read frames until the end of the video
while True:
    ret, frame = cap.read()
    if not ret:
        break
    last_frame = frame
    frame_count += 1

if last_frame is None:
    print("Failed to read any frames from the video")
    # exit()

print(f"Total frames in video: {frame_count}")

# Let the user select the ROI
roi = select_roi(last_frame)

# Define HSV color range for blue dots
blue_lower = np.array([100, 20, 14])
blue_upper = np.array([180, 255, 255])

# Detect initial points
initial_points = detect_points(last_frame, roi, blue_lower, blue_upper)

# Allow user to correct initial points
corrected_points = correct_points(last_frame, initial_points)


In [None]:

# Create the output vector
output_vector = [[frame_count-1, x, y] for x, y in corrected_points]

# Print the output vector
print("Output vector [frame #, xpos, ypos]:")
for point in output_vector:
    print(point)

# Release everything
cap.release()
cv2.destroyAllWindows()

In [336]:
queries = torch.tensor(output_vector).float()[None].reshape(-1, 3)
if torch.cuda.is_available():
    queries = queries.cuda()

In [337]:
pred_tracks, pred_visibility = model(video, queries=queries[None], backward_tracking=True)


In [None]:
pred_tracks[0,0,1,:]

In [None]:

vis.visualize(
    video=video,
    tracks=pred_tracks,
    visibility=pred_visibility,
    filename='queries_backward');

In [None]:
show_video(os.path.join(video_dir, "tracking/queries_backward.mp4"))

In [None]:
import cv2
import numpy as np

def select_roi(frame):
    """Allow user to select a region of interest (ROI) on the frame."""
    roi = cv2.selectROI("Select Region of Interest", frame, fromCenter=False, showCrosshair=True)
    cv2.destroyWindow("Select Region of Interest")
    return roi

def select_scale_points(frame):
    """Allow user to select two points for setting the physical scale."""
    points = []
    
    def mouse_callback(event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN:
            points.append((x, y))
            cv2.circle(frame, (x, y), 5, (0, 255, 0), -1)
            cv2.imshow("Select Scale Points", frame)
            
            if len(points) == 2:
                cv2.line(frame, points[0], points[1], (0, 255, 0), 2)
                cv2.imshow("Select Scale Points", frame)
    
    cv2.namedWindow("Select Scale Points")
    cv2.setMouseCallback("Select Scale Points", mouse_callback)
    
    while len(points) < 2:
        cv2.imshow("Select Scale Points", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cv2.destroyWindow("Select Scale Points")
    return points

def calculate_scale(points, distance_meters):
    """Calculate the scale (pixels per meter) based on two points and their real-world distance."""
    pixel_distance = np.sqrt((points[1][0] - points[0][0])**2 + (points[1][1] - points[0][1])**2)
    return pixel_distance / distance_meters

def pixels_to_meters(points, scale, origin):
    """Convert pixel coordinates to meters."""
    return [((p[0] - origin[0]) / scale, (p[1] - origin[1]) / scale) for p in points]

# Load the video
# video_path = '/Users/mitchfogelson/Downloads/IMG_2071 2.mov'  # Replace with your video file path
cap = cv2.VideoCapture(video_path)

# Read the first frame (or last frame, depending on your preference)
ret, frame = cap.read()
if not ret:
    print("Failed to read the video")
    exit()

# Let the user select the ROI
roi = select_roi(frame)

# Let the user select two points for scale
scale_points = select_scale_points(frame)

# Ask for the real-world distance between the two points
distance_meters = float(input("Enter the distance between the two points in meters: "))

# Calculate the scale (pixels per meter)
scale = calculate_scale(scale_points, distance_meters)
print(f"Scale: {scale} pixels per meter")

# Set the origin (top-left corner of ROI)
origin = (int(roi[0]), int(roi[1]))

# Now, let's process the pred_tracks
# Assuming pred_tracks is already loaded and has shape [1, # frames, # nodes, 2]
# pred_tracks = np.random.rand(1, 100, 10, 2) * 100  # Replace this with your actual pred_tracks

# Convert pred_tracks from pixels to meters
pred_tracks_meters = np.zeros_like(pred_tracks)
for i in range(pred_tracks.shape[1]):  # For each frame
    for j in range(pred_tracks.shape[2]):  # For each node
        x_pixels, y_pixels = pred_tracks[0, i, j]
        x_meters, y_meters = pixels_to_meters([(x_pixels, y_pixels)], scale, origin)[0]
        pred_tracks_meters[0, i, j] = [x_meters, y_meters]

# Print some sample results
print("\nSample results (first 5 frames, first 3 nodes):")
print("Frame | Node | Pixels (x, y) | Meters (x, y)")
print("-" * 50)
for i in range(min(5, pred_tracks.shape[1])):
    for j in range(min(3, pred_tracks.shape[2])):
        pixels = pred_tracks[0, i, j]
        meters = pred_tracks_meters[0, i, j]
        print(f"{i:5d} | {j:4d} | ({pixels[0]:6.2f}, {pixels[1]:6.2f}) | ({meters[0]:6.2f}, {meters[1]:6.2f})")

# Release everything
cap.release()
cv2.destroyAllWindows()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from moviepy.editor import VideoFileClip
import matplotlib.animation as animation

def create_animation(pred_tracks_meters, output_file='animation.mp4', fps=30, dot_size=5):
    """
    Create an animation of the points from pred_tracks_meters.
    
    Args:
    pred_tracks_meters (np.array): Array of shape [1, # frames, # nodes, 2] containing the x, y coordinates in meters.
    output_file (str): Name of the output video file.
    fps (int): Frames per second for the output video.
    dot_size (int): Size of the dots representing the points.
    
    Returns:
    None
    """
    # Extract relevant dimensions
    num_frames = pred_tracks_meters.shape[1]
    num_nodes = pred_tracks_meters.shape[2]
    
    # Create a new figure and axis
    fig, ax = plt.subplots(figsize=(10, 10))
    
    #square aspect ratio
    ax.set_aspect('equal', adjustable='box')
    
    # Set the limits of the plot
    x_min, x_max = pred_tracks_meters[0, :, :, 0].min(), pred_tracks_meters[0, :, :, 0].max()
    y_min, y_max = pred_tracks_meters[0, :, :, 1].min(), pred_tracks_meters[0, :, :, 1].max()
    ax.set_xlim(x_min, x_max)
    ax.set_ylim(y_min, y_max)
    
    # Create a scatter plot for the points
    scatter = ax.scatter([], [], s=dot_size)
    
    # Set labels and title
    ax.set_xlabel('X position (meters)')
    ax.set_ylabel('Y position (meters)')
    ax.set_title('Point Tracking Animation')
    
    # Add a text annotation for the frame number
    frame_text = ax.text(0.02, 0.98, '', transform=ax.transAxes, verticalalignment='top')
    
    def init():
        scatter.set_offsets(np.c_[[], []])
        frame_text.set_text('')
        return scatter, frame_text
    
    def update(frame):
        # Update the positions of the points
        positions = pred_tracks_meters[0, frame, :, :]
        scatter.set_offsets(positions)
        
        # Update the frame number text
        frame_text.set_text(f'Frame: {frame}')
        
        return scatter, frame_text
    
    # Create the animation
    anim = FuncAnimation(fig, update, frames=num_frames, init_func=init, blit=True, interval=1000/fps)
    
    # Save the animation as a video file
    writer = animation.writers['ffmpeg'](fps=fps)
    anim.save(output_file, writer=writer)
    
    plt.close(fig)
    
    print(f"Animation saved as {output_file}")

# Example usage:
# Assuming pred_tracks_meters is already defined
create_animation(pred_tracks_meters, output_file=os.path.join(video_dir, 'tracking/point_tracking_animation.mp4'), fps=30, dot_size=5)

In [None]:
import numpy as np
import pandas as pd

def save_pred_tracks_to_csv(pred_tracks_meters, filename):
    """
    Save pred_tracks_meters to a CSV file with the following format:
    Headers: node ids
    Rows: (x,y) for each frame

    Args:
    pred_tracks_meters (np.array): Array of shape [1, # frames, # nodes, 2] containing the x, y coordinates in meters.
    filename (str): Name of the output CSV file.
    """
    # Remove the first dimension if it's 1
    if pred_tracks_meters.shape[0] == 1:
        pred_tracks_meters = pred_tracks_meters.squeeze(0)

    num_frames, num_nodes, _ = pred_tracks_meters.shape

    # Create headers (node IDs)
    headers = [f'Node_{i}' for i in range(num_nodes)]

    # Create a list to store each row of data
    data_rows = []

    # Process each frame
    for frame in range(num_frames):
        row = []
        for node in range(num_nodes):
            x, y = pred_tracks_meters[frame, node]
            row.append(f'({x:.4f},{y:.4f})')
        data_rows.append(row)

    # Create a DataFrame
    df = pd.DataFrame(data_rows, columns=headers)

    # Add a frame number column
    df.insert(0, 'Frame', range(num_frames))

    # Save to CSV
    df.to_csv(filename, index=False)
    print(f"Data saved to {filename}")

# Example usage:
# Assuming pred_tracks_meters is already defined
if not os.path.exists(os.path.join(video_dir, 'csv')):
    os.makedirs(os.path.join(video_dir, 'csv'))
save_pred_tracks_to_csv(pred_tracks_meters, os.path.join(video_dir, 'csv/pred_tracks_formatted.csv'))

## Get slop from video

In [110]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt

In [111]:
df = pd.read_csv(os.path.join(video_dir, 'csv/pred_tracks_formatted.csv'))

In [112]:
frame = 1

In [None]:
positions = np.asarray([[float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[0]), float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[1])] for i in range(df.shape[1]-1)])

In [None]:

# plot the data 
plt.figure(figsize=(10, 10))
# for i in range(1, df.shape[1]):
plt.scatter(positions[:,0], positions[:,1],  s=10)
ax = plt.gca()
ax.set_aspect('equal', adjustable='box')
plt.xlabel('Frame')
plt.ylabel('Position (meters)')
plt.title('Point Tracking Data')
# plt.legend()
plt.show()


In [115]:
# get the 3 points with the smallest x values
sorted_positions = positions[positions[:,0].argsort()]

In [116]:
center_pos = sorted_positions[1, 1]

In [117]:
center_points = [p for p in sorted_positions if np.linalg.norm(p[1]-center_pos) <= 1.055e-2]

In [118]:
def group_points(sorted_positions, tolerance=5e-3):
    grouped_points = []
    adjacency_matrix = np.zeros((len(sorted_positions), len(sorted_positions)))

    center = sorted_positions[1,1]

    # Get the center points that are close to the scissor center
    center_points = [p for p in sorted_positions if np.linalg.norm(p[1] - center) <= tolerance]
    center_points_ind = [i for i, p in enumerate(sorted_positions) if np.linalg.norm(p[1] - center) <= tolerance]
    
    # get all the points that are not the center points 
    sorted_positions_reduced = np.delete(sorted_positions, center_points_ind, axis=0)
    
    for center_point in center_points:
        # Find the 4 points in the [top right, bottom left, bottom right, top left] quadrants
        top_right = min([p for p in sorted_positions_reduced if p[0] > center_point[0] and p[1] > center_point[1]],
                        key=lambda p: np.linalg.norm(p - center_point), default=None)
        bottom_left = min([p for p in sorted_positions_reduced if p[0] < center_point[0] and p[1] < center_point[1]],
                            key=lambda p: np.linalg.norm(p - center_point), default=None)
        bottom_right = min([p for p in sorted_positions_reduced if p[0] > center_point[0] and p[1] < center_point[1]],
                            key=lambda p: np.linalg.norm(p - center_point), default=None)
        top_left = min([p for p in sorted_positions_reduced if p[0] < center_point[0] and p[1] > center_point[1]],
                        key=lambda p: np.linalg.norm(p - center_point), default=None)
        connected_points = [top_right, bottom_left, bottom_right, top_left]

        grouped_points.append({
            'center': center_point,
            'points': connected_points
        })

        # Find indices of the center and connected points in the original sorted_positions
        center_idx = np.where((sorted_positions == center_point).all(axis=1))[0][0]
        adjacency_matrix[center_idx][center_idx] = 1  # Set connection to itself

        for point in connected_points:
            if point is None:  # If the point exists
                continue
            point_idx = np.where((sorted_positions == point).all(axis=1))[0][0]
            adjacency_matrix[center_idx][point_idx] = 1  # Set connection in adjacency matrix
            adjacency_matrix[point_idx][center_idx] = 1  # Make it bidirectional
            adjacency_matrix[point_idx][point_idx] = 1

    return grouped_points, adjacency_matrix

In [None]:
len(center_points)

In [120]:
grouped_points_real, adj_mat = group_points(sorted_positions, 1.055e-2)

In [121]:
# get lower triangle of the adjacency matrix
adj_mat = np.tril(adj_mat)


In [None]:
adj_mat

In [None]:
len(center_points)

In [None]:
sorted_positions[1,:]

In [125]:
# get the angle between the 3 points
def angle_between_points(p1, p2, p3):
    """Calculate the angle between three points."""
    v1 = p1 - p2
    v2 = p3 - p2
    angle = np.arccos(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))
    return np.degrees(angle)

In [126]:
angle = angle_between_points(sorted_positions[0], sorted_positions[1], sorted_positions[2])

In [127]:
def angle_between_2_points(p1, p2):
    """
    Calculate the angle (in degrees) between the line formed by two points (p1, p2) and the x-axis.
    
    Parameters:
    p1 (tuple): Coordinates of the first point (x1, y1)
    p2 (tuple): Coordinates of the second point (x2, y2)
    
    Returns:
    float: Angle between the line through the points and the x-axis, in degrees.
    """
    # Calculate the difference in x and y coordinates
    delta_x = p2[0] - p1[0]
    delta_y = p2[1] - p1[1]
    
    # Calculate the angle using atan2, then convert it to degrees
    angle_rad = np.arctan2(delta_y, delta_x)
    angle_deg = np.rad2deg(angle_rad)
    
    return angle_deg

In [128]:
offset_angle = angle_between_2_points(sorted_positions[0], sorted_positions[2])

In [129]:
length = np.linalg.norm(sorted_positions[0] - sorted_positions[1])*2

In [178]:
class ScissorMechanism:
    def __init__(self, link_length, angle, offset_position, offset_rotation, num_cells):
        self.link_length = link_length
        self.angle = np.deg2rad(angle)  # Convert angle to radians
        self.offset_position = np.array(offset_position)
        self.offset_rotation = np.deg2rad(offset_rotation)  # Convert offset rotation to radians
        self.num_cells = num_cells

    def calculate_endpoints(self):
        """
        Calculate the positions of the center and end points for each scissor unit.
        Returns a list of center and endpoint positions.
        """
        # List to hold the position of the centers and points of all scissor units
        positions = []

        # Loop over the number of scissor cells
        for i in range(self.num_cells):
            # Calculate the x and y positions based on the link length and angle for each cell
            center_x = self.offset_position[0] + i * self.link_length * np.cos(self.angle/2)
            center_y = self.offset_position[1]
            center_position = np.array([center_x, center_y])

            # Endpoints of the scissor link (left and right)
            endpoint_1_x = center_x + 0.5 * self.link_length * np.cos(self.angle/2)
            endpoint_1_y = center_y + 0.5 * self.link_length * np.sin(self.angle/2)
            endpoint_2_x = center_x - 0.5 * self.link_length * np.cos(self.angle/2)
            endpoint_2_y = center_y - 0.5 * self.link_length * np.sin(self.angle/2)
            # Endpoints of the scissor link (left and right)
            endpoint_3_x = center_x + 0.5 * self.link_length * np.cos(-self.angle/2)
            endpoint_3_y = center_y + 0.5 * self.link_length * np.sin(-self.angle/2)
            endpoint_4_x = center_x - 0.5 * self.link_length * np.cos(-self.angle/2)
            endpoint_4_y = center_y - 0.5 * self.link_length * np.sin(-self.angle/2)


            endpoint_1 = np.array([endpoint_1_x, endpoint_1_y])
            endpoint_2 = np.array([endpoint_2_x, endpoint_2_y])
            endpoint_3 = np.array([endpoint_3_x, endpoint_3_y])
            endpoint_4 = np.array([endpoint_4_x, endpoint_4_y])

            # Append positions of center and endpoints for this scissor unit
            positions.append({
                'center': center_position,
                'points': (endpoint_1, endpoint_2, endpoint_3, endpoint_4)
            })

        return positions

    def display_positions(self):
        """
        Display the center and endpoints for each scissor unit.
        """
        positions = self.calculate_endpoints()
        for i, pos in enumerate(positions):
            print(f"Scissor Unit {i+1}:")
            print(f"  Center: {pos['center']}")
            print(f"  Endpoints: {pos['points']}\n")

In [None]:
# Define parameters for the scissor mechanism
link_length = length # Length of each scissor link
angle = 180-angle  # Angle between scissor arms
offset_position = sorted_positions[1,:]  # Starting position of the first scissor center
offset_rotation = offset_angle  # No rotation applied to the entire scissor mechanism
num_cells = 23  # Number of scissor units

# Create the scissor mechanism and display positions
scissor = ScissorMechanism(link_length, angle, offset_position, offset_rotation, num_cells)
scissor.display_positions()

In [217]:
def plot_real_v_sim(scissor_positions, grouped_points_real, frame_id, show=False, save=False, video_dir=video_dir, active_units=None):
    plt.figure(figsize=(10, 10))
    # for i in range(1, df.shape[1]):
    # scissor_positions = scissor.calculate_endpoints()
    # Plot the scissor mechanism
    for unit in scissor_positions:
        center = unit['center']
        plt.scatter(center[0], center[1], color='r')
    
        for endpoint in unit['points']:
            if endpoint is None:
                continue
            plt.scatter(endpoint[0], endpoint[1], color='g')
            plt.plot([center[0], endpoint[0]], [center[1], endpoint[1]], color='r')

        
    for unit in grouped_points_real:
        center = unit['center']
        plt.scatter(center[0], center[1], color='b')

        for endpoint in unit['points']:
            if endpoint is None:
                continue
            plt.scatter(endpoint[0], endpoint[1], color='k')
            plt.plot([center[0], endpoint[0]], [center[1], endpoint[1]], color='b')

    if active_units is not None:
        grouped_points = [grouped_points_real[i] for i in active_units]
        for unit in grouped_points:
            center = unit['center']
            plt.scatter(center[0], center[1], color='y')

            for endpoint in unit['points']:
                if endpoint is None:
                    continue
                plt.scatter(endpoint[0], endpoint[1], color='y')
                plt.plot([center[0], endpoint[0]], [center[1], endpoint[1]], color='y')

    # Formatting the plot
    ax = plt.gca()
    ax.set_aspect('equal', adjustable='box')
    plt.xlabel('Frame')
    plt.ylabel('Position (meters)')
    plt.title('Point Tracking Data with Scissor Mechanism')
    # set x and y limits 
    plt.xlim(0, 0.6)
    plt.ylim(0, 0.06)
    # plt.legend()
    if show:
        plt.show()
        
    if save: 
        plt.savefig(os.path.join(video_dir, f'images/scissor_mechanism_{frame_id}.png'));

In [None]:
slops = []
for real_unit, sim_unit in zip(grouped_points_real, scissor_positions):
    x_slop = real_unit['center'][0] - sim_unit['center'][0]
    y_slop = real_unit['center'][1] - sim_unit['center'][1]
    slops.append([x_slop, y_slop])
    
    for endpoint_real, endpoint_sim in zip(real_unit['points'], sim_unit['points']):
        if endpoint_real is None or endpoint_sim is None:
            continue
        x_slop = endpoint_real[0] - endpoint_sim[0]
        y_slop = endpoint_real[1] - endpoint_sim[1]
        slops.append([x_slop, y_slop])
    

In [211]:
def plot_slops_unadjusted(slops, frame_id, show=False, save=False, video_dir=video_dir):
    slops = np.array(slops)
    plt.figure(figsize=(10, 10))
    plt.scatter(range(len(slops)), slops[:,0], label='X Slop')
    plt.scatter(range(len(slops)), slops[:,1], label='Y Slop')
    plt.legend()
    plt.xlabel('point id')
    plt.ylabel('Slop')
    plt.title("Slops between real and simulated points")
    plt.xlim(0, 120)
    plt.ylim(-0.25, 0.05)
    if show:
        plt.show()
        
    if save: 
        plt.savefig(os.path.join(video_dir, f'images/slops_unadjusteed_{frame_id}.png'));



In [183]:
slops = np.array(slops)
n = np.array(slops).shape[0]
# Create a lower triangular matrix with 1's
lower_triangular_matrix = np.tril(np.ones((n, n)))

In [184]:
x_slops = np.matmul(np.linalg.inv(lower_triangular_matrix),slops[:,0])
y_slops = np.matmul(np.linalg.inv(lower_triangular_matrix),slops[:,1])

In [212]:
def plot_slops_adjusted(x_slops, y_slops, frame_id, show=False, save=False,video_dir=video_dir):
    plt.figure(figsize=(10, 10))
    plt.scatter(range(len(x_slops)), x_slops, label='X Slop')
    plt.scatter(range(len(y_slops)), y_slops, label='Y Slop')
    plt.legend()
    plt.xlabel('point id')
    plt.ylabel('Slop')
    plt.title("Slops between real and simulated points")
    plt.xlim(0, 120)
    plt.ylim(-0.05, 0.05)
    if show:
        plt.show()
        
    if save: 
        plt.savefig(os.path.join(video_dir, f'images/slops_adjusted_{frame_id}.png'));

In [None]:
center_points[:3,:]

In [None]:
grouped_points_real[0]

In [213]:
def get_active_nodes(df, frame, angle_diff_threshold=5.0):
    first_positions = np.asarray([[float(df.iloc[0,1:][i].replace('(','').replace(')','').split(',')[0]), float(df.iloc[0,1:][i].replace('(','').replace(')','').split(',')[1])] for i in range(df.shape[1]-1)])
    first_positions = first_positions[first_positions[:,0].argsort()]

    last_positions = np.asarray([[float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[0]), float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[1])] for i in range(df.shape[1]-1)])
    last_positions = last_positions[last_positions[:,0].argsort()]

    grouped_points_real_first, _ = group_points(first_positions, 1.055e-2)
    grouped_points_real_last, _ = group_points(last_positions, 1.055e-2)
    
    active_units = []
    for i, (unit_first, unit_last) in enumerate(zip(grouped_points_real_first, grouped_points_real_last)):
        if any(p is None for p in unit_first['points'][0:4:3]) or any(p is None for p in unit_last['points'][0:4:3]):
            continue
        angle_first = angle_between_points(unit_first['points'][0], unit_first['center'], unit_first['points'][-1])
        angle_last = angle_between_points(unit_last['points'][0], unit_last['center'], unit_last['points'][-1])
        angle_diff = abs(angle_last - angle_first)
            
        if angle_diff > angle_diff_threshold:
            active_units.append(i)
            
    return active_units

In [None]:
if not os.path.exists(os.path.join(video_dir, 'images')):
    os.makedirs(os.path.join(video_dir, 'images'))
for frame in range(df.shape[0]):
    positions = np.asarray([[float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[0]), float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[1])] for i in range(df.shape[1]-1)])
    
    # get the 3 points with the smallest x values
    sorted_positions = positions[positions[:,0].argsort()]
    
    center_pos = sorted_positions[1, 1]
    
    center_points = [p for p in sorted_positions if np.linalg.norm(p[1]-center_pos) <= 5e-3]

    grouped_points_real, adj_mat = group_points(sorted_positions, 1.055e-2)
    angle = angle_between_points(grouped_points_real[0]['points'][0], grouped_points_real[0]['center'], grouped_points_real[0]['points'][-1])
    length = np.linalg.norm(sorted_positions[0] - sorted_positions[1])*2
    # Define parameters for the scissor mechanism
    link_length = length # Length of each scissor link
    angle = 180-angle  # Angle between scissor arms
    offset_position = sorted_positions[1,:]  # Starting position of the first scissor center
    offset_rotation = offset_angle  # No rotation applied to the entire scissor mechanism
    num_cells = 23  # Number of scissor units

    # Create the scissor mechanism and display positions
    scissor = ScissorMechanism(link_length, angle, offset_position, offset_rotation, num_cells)
    
    scissor_positions = scissor.calculate_endpoints()
    slops = []
    for i, (real_unit, sim_unit) in enumerate(zip(grouped_points_real, scissor_positions)):
        x_slop = real_unit['center'][0] - sim_unit['center'][0]
        y_slop = real_unit['center'][1] - sim_unit['center'][1]
        slops.append([x_slop, y_slop])
        
        if i == 0:
            for endpoint_real, endpoint_sim in zip(real_unit['points'], sim_unit['points']):
            
                if endpoint_real is None or endpoint_sim is None:
                    continue
                x_slop = endpoint_real[0] - endpoint_sim[0]
                y_slop = endpoint_real[1] - endpoint_sim[1]
                slops.append([x_slop, y_slop])
        else:
            for endpoint_real, endpoint_sim in zip(real_unit['points'][0:4:2], sim_unit['points'][0:4:2]):

                #only do top right and bottom right
                
                if endpoint_real is None or endpoint_sim is None:
                    continue
                x_slop = endpoint_real[0] - endpoint_sim[0]
                y_slop = endpoint_real[1] - endpoint_sim[1]
                slops.append([x_slop, y_slop])

    
    slops = np.array(slops)
            
    n = slops.shape[0]
    # Create a lower triangular matrix with 1's
    # lower_triangular_matrix = np.tril(np.ones((n, n)))
    lower_triangular_matrix = np.tril(adj_mat)
    # print(lower_triangular_matrix.shape)
    # print(slops.shape)
    try:
        x_slops = np.matmul(np.linalg.inv(lower_triangular_matrix),slops[:,0])
        y_slops = np.matmul(np.linalg.inv(lower_triangular_matrix),slops[:,1])
    except:
        print(lower_triangular_matrix.shape)
        print(slops.shape)
         
    active_units = get_active_nodes(df, frame, angle_diff_threshold=5.0)
    active_units = [active_units[i] for i in range(len(active_units)) if (active_units[i]-active_units[i-1] == 1 or i == 0) and (active_units[i]-active_units[i-2] == 2 or i <= 1)]
    
    plot_real_v_sim(scissor_positions, grouped_points_real, frame_id=frame, save=True, video_dir=video_dir, active_units=active_units);
    plot_slops_unadjusted(slops, frame_id=frame, save=True, video_dir=video_dir);
    plot_slops_adjusted(x_slops, y_slops, frame_id=frame, save=True, video_dir=video_dir);
        

In [None]:
active_units = get_active_nodes(df, -1, angle_diff_threshold=5.0)
active_units = [active_units[i] for i in range(len(active_units)) if (active_units[i]-active_units[i-1] == 1 or i == 0) and (active_units[i]-active_units[i-2] == 2 or i <= 1)]

print(f"Active units: {active_units}")

In [None]:
y_slops

In [None]:
# frame = 0
# first_positions = np.asarray([[float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[0]), float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[1])] for i in range(df.shape[1]-1)])
# first_positions = first_positions[first_positions[:,0].argsort()]

# frame = -1 
# last_positions = np.asarray([[float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[0]), float(df.iloc[frame,1:][i].replace('(','').replace(')','').split(',')[1])] for i in range(df.shape[1]-1)])
# last_positions = last_positions[last_positions[:,0].argsort()]

# grouped_points_real_first, adj_mat = group_points(first_positions, 1.055e-2)
# grouped_points_real_last, adj_mat = group_points(last_positions, 1.055e-2)
# plot_real_v_sim(grouped_points_real_first[0:2], grouped_points_real_last[0:2], frame_id=frame, save=False, video_dir=video_dir)
# for i, (unit_first, unit_last) in enumerate(zip(grouped_points_real_first, grouped_points_real_last)):
#     if any(p is None for p in unit_first['points'][0:4:3]) or any(p is None for p in unit_last['points'][0:4:3]):
#         continue
#     angle_first = angle_between_points(unit_first['points'][0], unit_first['center'], unit_first['points'][-1])
#     angle_last = angle_between_points(unit_last['points'][0], unit_last['center'], unit_last['points'][-1])
#     angle_diff = abs(angle_last - angle_first)
#     print(i)
#     print(angle_first)
#     print(angle_last)
#     print(angle_diff)
    
#     if angle_diff > 5:
#         print(f"Scissor unit {i} with center at {unit_first['center']} has an angle difference of {angle_diff:.2f} degrees between the first and last frames")

In [None]:
import re
from PIL import Image
import glob
if not os.path.exists(os.path.join(video_dir, 'gifs')):
    os.makedirs(os.path.join(video_dir, 'gifs'))
for image_type in ['scissor_mechanism', 'slops_unadjusteed', 'slops_adjusted']:
    # Set the path pattern for the PNG images
    png_files = glob.glob(os.path.join(video_dir, f"images/{image_type}_*.png"))

    # Custom sorting function to sort by the numeric frame ID
    def sort_by_frame_id(file_name):
        # Use regular expression to extract the numeric part of the file name
        match = re.search(r"(\d+).png", file_name)
        return int(match.group(1)) if match else 0

    # Sort the files by the numeric frame ID
    png_files_sorted = sorted(png_files, key=sort_by_frame_id)

    # Load the images
    images = [Image.open(png) for png in png_files_sorted]

    # Save the images as a GIF
    images[0].save(os.path.join(video_dir, f'gifs/{image_type}.gif'), save_all=True, append_images=images[1:], optimize=False, duration=100, loop=0)

    print(f"GIF saved as '{image_type}.gif'")

In [232]:
print(np.median(abs(x_slops[1:10])))
print(np.median(abs(y_slops[1:10])))
print(np.mean(abs(x_slops[1:10])))
print(np.mean(abs(y_slops[1:10])))

0.0009431131074313839
0.0007153089100620982
0.0008568868925686243
0.0008900826172946735


In [224]:
first_positions = np.asarray([[float(df.iloc[0,1:][i].replace('(','').replace(')','').split(',')[0]), float(df.iloc[0,1:][i].replace('(','').replace(')','').split(',')[1])] for i in range(df.shape[1]-1)])
first_positions = first_positions[first_positions[:,0].argsort()]
angle = angle_between_points(first_positions[0], first_positions[1], first_positions[2])

  first_positions = np.asarray([[float(df.iloc[0,1:][i].replace('(','').replace(')','').split(',')[0]), float(df.iloc[0,1:][i].replace('(','').replace(')','').split(',')[1])] for i in range(df.shape[1]-1)])


In [228]:
np.pi-np.deg2rad(angle)

np.float64(2.4824251646528257)

In [231]:
print(abs(x_slops[1:10]))
print(abs(y_slops[1:10]))

[4.56886893e-04 1.10000000e-03 2.13773785e-04 1.92754757e-03
 4.43113107e-04 9.43113107e-04 1.64132136e-03 4.31131074e-05
 9.43113107e-04]
[0.00015765 0.00071531 0.         0.00071531 0.00025765 0.00127296
 0.00123062 0.00017296 0.00348827]
