In [1]:
import os
import torch
import imageio.v3 as iio
import numpy as np

from base64 import b64encode
from cotracker.utils.visualizer import Visualizer, read_video_from_path
from IPython.display import HTML
from cotracker.predictor import CoTrackerPredictor, CoTrackerOnlinePredictor

# Disabled alternative: fetch the "cotracker2_online" entry point through torch.hub
# instead of loading a local checkpoint.
# model = torch.hub.load("facebookresearch/co-tracker", "cotracker2_online").to("cuda")
# Device used by later cells when they build tensors: CUDA when available, otherwise CPU.
DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Predictor restored from a checkpoint file inside the local co-tracker checkout.
# NOTE(review): later cells read `model.step` and pass `is_first_step=...` to the model;
# confirm CoTrackerPredictor supports both, or whether those cells expect
# CoTrackerOnlinePredictor (imported above but never instantiated in this file).
model = CoTrackerPredictor(checkpoint=os.path.join('./co-tracker/checkpoints/cotracker2.pth'))#.to(DEFAULT_DEVICE)

# Move the predictor to the GPU when one is present.
if torch.cuda.is_available():
    model = model.cuda()


In [2]:
def _process_step(window_frames, is_first_step, queries = None):
    """Run the global ``model`` on the most recent frames of ``window_frames``.

    The last ``2 * model.step`` frames are stacked, converted to a float tensor
    on ``DEFAULT_DEVICE`` and rearranged to (1, T, 3, H, W) (layout as stated by
    the original inline comment; assumes each frame is H x W x 3).

    Args:
        window_frames: sequence of frames to take the trailing window from.
        is_first_step: forwarded unchanged to the model as ``is_first_step``.
        queries: optional query tensor. ``None`` or an empty value means the
            model is called without a ``queries`` argument; otherwise the
            queries are moved to the GPU (when available) and given a leading
            batch dimension.

    Returns:
        Whatever the model call returns.
    """
    recent_frames = window_frames[-model.step * 2 :]
    stacked = torch.tensor(np.stack(recent_frames), device=DEFAULT_DEVICE)
    video_chunk = stacked.float().permute(0, 3, 1, 2)[None]  # (1, T, 3, H, W)

    # Only forward `queries` when there is something to track.
    optional_kwargs = {}
    if not (queries is None or len(queries) == 0):
        if torch.cuda.is_available():
            queries = queries.cuda()
        optional_kwargs["queries"] = queries[None]

    return model(video_chunk, is_first_step=is_first_step, **optional_kwargs)

In [3]:
import cv2

def crop_video(input_path, output_path, x_start, y_start, width, height, fps=30.0):
    """Crop every frame of a video to a rectangle and write the result as mp4v.

    Args:
        input_path: path of the video to read.
        output_path: path of the video file to create.
        x_start: left edge of the crop rectangle, in pixels.
        y_start: top edge of the crop rectangle, in pixels.
        width: width of the crop rectangle and of the output frames.
        height: height of the crop rectangle and of the output frames.
        fps: frame rate written to the output container (defaults to the
            previously hard-coded 30.0).

    Raises:
        IOError: if ``input_path`` cannot be opened. Raising replaces the
            former ``exit()`` call, which would terminate the whole
            interpreter/kernel instead of reporting the failure to the caller.
    """
    cap = cv2.VideoCapture(input_path)
    out = None
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video file: {input_path}")

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Change the codec here if required
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        while True:
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a read failure): nothing more to write.
                break

            cropped_frame = frame[y_start:y_start + height, x_start:x_start + width]

            # A rectangle that extends past the frame border yields a smaller
            # slice; resizing keeps every written frame at the writer's size.
            out.write(cv2.resize(cropped_frame, (width, height)))
    finally:
        # Release both handles even when reading or writing raised.
        cap.release()
        if out is not None:
            out.release()


In [10]:
import os
import cv2
import csv
import numpy as np
import time
import peakutils
from KeyFrameDetector.utils import convert_frame_to_grayscale, prepare_dirs, plot_metrics

def _find_keyframe_indices(video_path, threshold=0.3, min_dist=1):
    """Return the indices of frames where the frame-to-frame change peaks.

    For each decoded frame the blurred grayscale image is subtracted from the
    previous one and the number of non-zero pixels is recorded; peaks of that
    signal (after removing a degree-2 baseline) are returned.

    Raises:
        IOError: if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    diff_magnitudes = []
    previous_blur = None
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video file: {video_path}")

        # Read until the decoder reports the end instead of trusting
        # CAP_PROP_FRAME_COUNT, and never touch a frame that failed to decode.
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            _, blur_gray = convert_frame_to_grayscale(frame)

            # The first frame is compared with itself, giving a zero difference.
            if previous_blur is None:
                previous_blur = blur_gray

            diff = cv2.subtract(blur_gray, previous_blur)
            diff_magnitudes.append(cv2.countNonZero(diff))
            previous_blur = blur_gray
    finally:
        cap.release()

    if not diff_magnitudes:
        return np.array([], dtype=int)

    y = np.array(diff_magnitudes)
    base = peakutils.baseline(y, 2)
    return peakutils.indexes(y - base, float(threshold), min_dist=min_dist)


def _stack_keyframe_queries(queries_dict, keyframe_indices):
    """Stack the queries of frame 0 with those of every usable keyframe."""
    initial = queries_dict[0]
    if initial is None or len(initial) == 0:
        raise ValueError("queries_dict[0] holds no query points to start tracking from.")

    selected = [initial]
    for index in keyframe_indices:
        if index < len(queries_dict):
            extra = queries_dict[index]
            # Frames without annotated points are stored as None / empty.
            if extra is not None and len(extra) > 0:
                selected.append(extra)

    # vstack promotes 1-D (single point) entries to rows, so the result is (N, 3).
    return torch.vstack(selected)


def cotracker_model(video, filename, queries, queries_dict):
    """Track annotated points through ``video`` and save a visualization.

    The query set is rebuilt from ``queries_dict``: the points of frame 0 plus
    the points of every detected keyframe of ``./data/videos/<filename>``.

    Args:
        video: video tensor handed to the global ``model`` and the visualizer.
        filename: video file name inside ``./data/videos/``; also used (without
            ``.mp4``) as the name of the rendered result.
        queries: accepted for interface compatibility; not used, because the
            query set is derived from ``queries_dict``.
        queries_dict: mapping from frame index to that frame's query tensor
            (or ``None`` / empty when the frame has no points).

    Returns:
        ``(pred_tracks, pred_visibility)`` as returned by the model.

    Raises:
        IOError: if the video file cannot be opened.
        ValueError: if frame 0 has no query points.
    """
    keyframe_indices = _find_keyframe_indices('./data/videos/' + filename)

    queries = _stack_keyframe_queries(queries_dict, keyframe_indices)
    # Keep the queries on the same device as the video they refer to.
    queries = queries.to(video.device)

    pred_tracks, pred_visibility = model(
        video,
        queries=queries[None]
    )

    vis = Visualizer(save_dir="./videos/results/cotracker/", linewidth=3)
    vis.visualize(video, pred_tracks, pred_visibility, filename=filename.replace('.mp4', ''), query_frame=0)

    return pred_tracks, pred_visibility

In [32]:

def cotracker_model_2(filename, queries, queries_dict):
    """Track ``queries`` through a video window by window and save a visualization.

    Frames of ``./data/videos/<filename>`` are read one at a time; every
    ``model.step`` frames the frames buffered so far are handed to
    ``_process_step``. A last call without queries covers the trailing frames.

    NOTE(review): this relies on ``model.step`` and on the model accepting
    ``is_first_step`` - confirm the global ``model`` provides both.

    Args:
        filename: video file name inside ``./data/videos/``.
        queries: query tensor forwarded to the windowed calls.
        queries_dict: not used by this function.

    Returns:
        ``(pred_tracks, pred_visibility)`` from the final ``_process_step`` call.
    """
    frame_stream = iio.imiter(
        './data/videos/' + filename,
        plugin="FFMPEG",
        fps = 30
    )
    buffered_frames = []
    first_call = True

    for index, image in enumerate(frame_stream):
        # Run the model on each window boundary, before buffering the new frame.
        if index % model.step == 0 and index != 0:
            pred_tracks, pred_visibility = _process_step(
                buffered_frames,
                first_call,
                queries=queries #queries_dict[i]
            )
            first_call = False
        buffered_frames.append(image)

    # Final pass for the remaining frames when the length is not a multiple of model.step.
    tail_start = -(index % model.step) - model.step - 1
    pred_tracks, pred_visibility = _process_step(buffered_frames[tail_start:], first_call)

    clip = torch.tensor(np.stack(buffered_frames), device=DEFAULT_DEVICE).permute(0, 3, 1, 2)[None]
    renderer = Visualizer(save_dir="./videos/results/cotracker/", linewidth=3)
    renderer.visualize(
        clip,
        pred_tracks,
        pred_visibility,
        filename=filename.replace('.mp4', ''),
        query_frame=0,
    )

    return pred_tracks, pred_visibility

In [5]:
import PoseEstimation.customutils as customutils

# Annotation table shared by later cells, which read its 'file_name', 'frame'
# and 'image_id' columns.
df_running_annotations = customutils.load_images_dataframe()
# Distinct video file names present in the annotation table.
filenames = df_running_annotations['file_name'].unique()

The `CoTrackerPredictor` instance created at the top of the notebook is the object we use to estimate tracks. The helper below loads a video as a tensor that can be passed to it:

In [6]:

def load_video(filename):
    """Read ``./data/videos/<filename>`` into a batched float tensor.

    The array returned by ``read_video_from_path`` has its last axis moved to
    position 1 and gains a leading batch axis of size 1. The tensor is moved to
    the GPU when CUDA is available.
    """
    frames = read_video_from_path('./data/videos/' + filename)
    clip = torch.from_numpy(frames).permute(0, 3, 1, 2)[None].float()
    return clip.cuda() if torch.cuda.is_available() else clip

Tracking manually selected points

In [17]:
import os, argparse, json, re
from collections import defaultdict
import pandas as pd

def load_annotations(file_path):
    """Load per-frame skeleton keypoints from an annotation JSON file.

    The file is expected to contain ``{"annotations": [person, ...]}`` where
    each person has a ``"frames"`` mapping from a frame index (as a string) to
    ``{"skeleton": {"nodes": [{"name", "x", "y"}, ...]}}``.

    Args:
        file_path: path of the JSON annotation file.

    Returns:
        A ``defaultdict(list)`` mapping every frame index from 0 up to the last
        frame that has points to a list of ``{'person': {'points': [...]}}``
        entries, one per person with points in that frame. Frames without
        points inside that range map to an empty list. The mapping is empty
        when the file does not exist.
    """
    frames = defaultdict(list)
    if not os.path.exists(file_path):
        return frames

    with open(file_path, 'r') as f:
        annotations = json.load(f)

    keypoints = defaultdict(list)
    for person in annotations['annotations']:
        # Iterate over the keys that are present rather than assuming they are
        # the contiguous strings "0".."n-1".
        for frame_key, frame in person['frames'].items():
            points = [
                {'id': node['name'], 'x': node['x'], 'y': node['y']}
                for node in frame['skeleton']['nodes']
            ]
            if points:
                keypoints[int(frame_key)].append({'person': {'points': points}})

    if keypoints:
        # Size the result by the highest annotated frame index, not by the
        # number of annotated frames: with a gap, the count is smaller than
        # the last index and the final frames would be lost.
        for frame_index in range(max(keypoints) + 1):
            frames[frame_index] = keypoints.get(frame_index, [])

    return frames


def get_queries_for_frame(frame_number, annotations):
    """Build the query tensor for all annotated points of one frame.

    Args:
        frame_number: index of the frame whose points are wanted.
        annotations: mapping from frame index to a list of
            ``{'person': {'points': [{'x', 'y', ...}, ...]}}`` entries.

    Returns:
        A tensor of shape (N, 3) with one ``[frame_number, x, y]`` row per
        point, in annotation order. The shape is (0, 3) when the frame has no
        points and (1, 3) for a single point, so callers always receive a
        two-dimensional tensor.
    """
    rows = [
        [float(frame_number), point['x'], point['y']]
        for person in annotations[frame_number]
        for point in person['person']['points']
    ]
    if not rows:
        # Previously this case hit an unbound local variable.
        return torch.empty((0, 3))
    return torch.tensor(rows)


def get_queries_for_frames(start_frame, end_frame, annotations, step = 4):
    """Build query tensors for a range of frames.

    Args:
        start_frame: first frame index to include.
        end_frame: exclusive end of the range; ``-1`` means ``len(annotations)``.
        annotations: mapping from frame index to a list of
            ``{'person': {'points': [{'x', 'y', ...}, ...]}}`` entries.
        step: stride between visited frame indices.

    Returns:
        ``(all_queries, frames_dict)``. ``all_queries`` is an (N, 3) tensor of
        ``[frame, x, y]`` rows over all visited frames, or ``None`` when no
        point was collected. ``frames_dict`` maps each visited frame index to
        that frame's (M, 3) tensor, or to ``None`` when the frame contributed
        no points. Each point appears exactly once in its frame's tensor.
    """
    frames_dict = defaultdict(list)
    all_rows = []

    if end_frame == -1:
        end_frame = len(annotations)

    for frame in range(start_frame, end_frame, step):
        frame_rows = []
        for person in annotations[frame]:
            for point in person['person']['points']:
                # NOTE(review): this skips points whose 'occluded' value is the
                # string 'false', i.e. it keeps occluded points and drops
                # visible ones - confirm against the annotation schema.
                if 'occluded' in point and point['occluded'] == 'false':
                    continue
                frame_rows.append([float(frame), point['x'], point['y']])

        # Build each frame's tensor from its own rows only, so the first point
        # of a later frame is not stacked with itself.
        frames_dict[frame] = torch.tensor(frame_rows) if frame_rows else None
        all_rows.extend(frame_rows)

    all_queries_for_frames = torch.tensor(all_rows) if all_rows else None
    return all_queries_for_frames, frames_dict

In [8]:
def get_image_id(filename):
    """Return the ``image_id`` of frame 0 of ``filename``.

    Looks the value up in the global ``df_running_annotations`` table and takes
    the first matching row.
    """
    first_frame_of_file = (
        (df_running_annotations['file_name'] == filename)
        & (df_running_annotations['frame'] == 0)
    )
    matching_rows = df_running_annotations.loc[first_frame_of_file]
    return matching_rows['image_id'].iloc[0]
    
def _tracks_to_pose_records(tracks, first_image_id, keypoints_per_pose=26):
    """Turn a (T, N, 2) array of tracks into one keypoint record per pose and frame.

    The N tracks of every frame are split into consecutive groups of
    ``keypoints_per_pose``; each group becomes its own record.
    """
    records = []
    for frame_offset, frame_tracks in enumerate(tracks):
        for start in range(0, len(frame_tracks), keypoints_per_pose):
            keypoints = []
            for x, y in frame_tracks[start:start + keypoints_per_pose]:
                # Third value is the constant flag used throughout this notebook.
                keypoints.extend([float(x), float(y), 2])
            # A fresh dict per pose, so records never share a keypoint list.
            records.append({
                'image_id': first_image_id + frame_offset,
                'category_id': 1,
                'keypoints': keypoints,
            })
    return records


def run_cotracker(filename, video, keypoints_per_pose=26):
    """Track the annotated keypoints of one video and export them as JSON.

    Args:
        filename: video file name; its annotation file is
            ``videos/annotations/<name>.json`` and the result is written to
            ``videos/results/cotracker/<name>.json``.
        video: video tensor passed on to ``cotracker_model``.
        keypoints_per_pose: number of consecutive tracks that form one pose
            (defaults to the previously hard-coded 26).

    Returns:
        A list of records with ``image_id``, ``category_id`` and a flat
        ``keypoints`` list (``x, y, 2`` per keypoint). Every frame contributes
        one record per pose; ``image_id`` is the id of the video's frame 0 plus
        the frame offset.
    """
    annot_json = 'videos/annotations/' + filename.replace('.mp4', '') + '.json'
    annotations = load_annotations(annot_json)

    queries, queries_dict = get_queries_for_frames(0, -1, annotations, 1)

    pred_tracks, pred_visibility = cotracker_model(video, filename, queries, queries_dict)

    np_pred = pred_tracks.detach().cpu().numpy()
    # The base id is the same for every frame, so look it up once.
    first_image_id = int(get_image_id(filename))
    frames = _tracks_to_pose_records(np_pred[0], first_image_id, keypoints_per_pose)

    customutils.writeJson(frames,'videos/results/cotracker/' + filename.replace('.mp4', '') + '.json')

    return frames

In [18]:
# Accumulates the records returned by run_cotracker for the processed clip.
frames = []
for filename in filenames:
    # Only this one clip is processed; every other file name is skipped.
    if filename != "World_Athletics_Women_5000m_Oregon_2022_26.mp4":
        continue
    video = load_video(filename)
    frames += run_cotracker(filename, video)
    # Stop as soon as the matching clip has been handled.
    break

    
# customutils.writeJson(frames,'videos/results/cotracker/person_keypoints_running.json')