<a href="https://colab.research.google.com/github/johnkapustka/skierClassification/blob/main/SkierClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project for Intro to AI: Skier vs Snowboarder Classification

# 0. Setup

In [2]:
!nvidia-smi

Mon Nov  6 01:17:36 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.105.17   Driver Version: 525.105.17   CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   43C    P8     9W /  70W |      0MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [3]:
import torch
import os
import sys

HOME = os.getcwd()

In [4]:
%cd {HOME}
!git clone https://github.com/WongKinYiu/yolov7
%cd {HOME}/yolov7
!pip install -r requirements.txt

sys.path.append(f"{HOME}/yolov7/")

/content
Cloning into 'yolov7'...
remote: Enumerating objects: 1197, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 1197 (delta 2), reused 3 (delta 1), pack-reused 1191[K
Receiving objects: 100% (1197/1197), 74.23 MiB | 29.96 MiB/s, done.
Resolving deltas: 100% (517/517), done.
/content/yolov7
Collecting thop (from -r requirements.txt (line 36))
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Collecting jedi>=0.16 (from ipython->-r requirements.txt (line 34))
  Downloading jedi-0.19.1-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jedi, thop
Successfully installed jedi-0.19.1 thop-0.1.1.post2209072238


# 1. Video Data Path

In [5]:
SKI_VIDEO_PATH = f"{HOME}/input/trail.mp4"

# 2. Utility Functions

In [6]:
from typing import Generator

import matplotlib.pyplot as plt
import numpy as np

import cv2

%matplotlib inline


def generate_frames(video_file: str) -> Generator[np.ndarray, None, None]:
    video = cv2.VideoCapture(video_file)

    while video.isOpened():
        success, frame = video.read()

        if not success:
            break

        yield frame

    video.release()


def plot_image(image: np.ndarray, size: int = 12) -> None:
    plt.figure(figsize=(size, size))
    plt.axis('off')
    plt.imshow(image[...,::-1])
    plt.show()

In [7]:
from typing import Optional, Tuple
from dataclasses import dataclass


@dataclass(frozen=True)
class Point:
    x: float
    y: float

    @property
    def int_xy_tuple(self) -> Tuple[int, int]:
        return int(self.x), int(self.y)


@dataclass(frozen=True)
class Rect:
    x: float
    y: float
    width: float
    height: float

    @property
    def top_left(self) -> Point:
        return Point(x=self.x, y=self.y)

    @property
    def bottom_right(self) -> Point:
        return Point(x=self.x + self.width, y=self.y + self.height)

    @property
    def bottom_center(self) -> Point:
        return Point(x=self.x + self.width / 2, y=self.y + self.height)

@dataclass
class Detection:
    rect: Rect
    class_id: int
    confidence: float
    tracker_id: Optional[int] = None


@dataclass(frozen=True)
class Color:
    r: int
    g: int
    b: int

    @property
    def bgr_tuple(self) -> Tuple[int, int, int]:
        return self.b, self.g, self.r

In [8]:
import cv2

import numpy as np

def draw_rect(image: np.ndarray, rect: Rect, color: Color, thickness: int = 2) -> np.ndarray:
    cv2.rectangle(image, rect.top_left.int_xy_tuple, rect.bottom_right.int_xy_tuple, color.bgr_tuple, thickness)
    return image

# 3. Load YOLOv7 Models

In [9]:
%cd {HOME}/yolov7
!wget https://github.com/WongKinYiu/yolov7/releases/download/v0.1/yolov7-e6e.pt --quiet
!wget https://github.com/WongKinYiu/yolov7/releases/download/v0.1/yolov7-w6-pose.pt --quiet

/content/yolov7


In [10]:
DETECTION_MODEL_WEIGHTS_PATH = f"{HOME}/yolov7/yolov7-e6e.pt"
POSE_MODEL_WEIGHTS_PATH = f"{HOME}/yolov7/yolov7-w6-pose.pt"

In [11]:
import torch

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [12]:
from utils.general import check_img_size
from models.experimental import attempt_load

detection_model = attempt_load(weights=DETECTION_MODEL_WEIGHTS_PATH, map_location=device)

Fusing layers... 


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


In [13]:
weigths = torch.load(POSE_MODEL_WEIGHTS_PATH, map_location=device)
pose_model = weigths["model"]
_ = pose_model.float().eval()

if torch.cuda.is_available():
    pose_model.half().to(device)

# 4. Detection and Annotation

### Settings

In [21]:
DETECTION_IMAGE_SIZE = 1920
POSE_IMAGE_SIZE = 960
STRIDE = 64
CONFIDENCE_TRESHOLD = 0.25
IOU_TRESHOLD = 0.65

### Pre-process

In [22]:
from utils.datasets import letterbox
from torchvision import transforms

import numpy as np


def detection_pre_process_frame(frame: np.ndarray, device: torch.device) -> torch.Tensor:
    img = letterbox(frame, DETECTION_IMAGE_SIZE, STRIDE, auto=True)[0]
    img = img[:, :, ::-1].transpose(2, 0, 1)
    img = np.ascontiguousarray(img)
    img = torch.from_numpy(img).to(device).float()
    img /= 255.0
    if img.ndimension() == 3:
        img = img.unsqueeze(0)
    return img

def pose_pre_process_frame(frame: np.ndarray, device: torch.device) -> torch.Tensor:
    image = letterbox(frame, POSE_IMAGE_SIZE, stride=STRIDE, auto=True)[0]
    image = transforms.ToTensor()(image)
    image = torch.tensor(np.array([image.numpy()]))

    if torch.cuda.is_available():
        image = image.half().to(device)

    return image

### Post-process

In [23]:
from typing import Tuple

from utils.general import non_max_suppression_kpt, non_max_suppression
from utils.plots import output_to_keypoint

import numpy as np


def clip_coords(boxes: np.ndarray, img_shape: Tuple[int, int]):
    # Clip bounding xyxy bounding boxes to image shape (height, width)
    boxes[:, 0] = np.clip(boxes[:, 0], 0, img_shape[1]) # x1
    boxes[:, 1] = np.clip(boxes[:, 1], 0, img_shape[0]) # y1
    boxes[:, 2] = np.clip(boxes[:, 2], 0, img_shape[1]) # x2
    boxes[:, 3] = np.clip(boxes[:, 3], 0, img_shape[0]) # y2


def detection_post_process_output(
    output: torch.tensor,
    confidence_trashold: float,
    iou_trashold: float,
    image_size: Tuple[int, int],
    scaled_image_size: Tuple[int, int]
) -> np.ndarray:
    output = non_max_suppression(
        prediction=output,
        conf_thres=confidence_trashold,
        iou_thres=iou_trashold
    )
    coords = output[0].detach().cpu().numpy()

    v_gain = scaled_image_size[0] / image_size[0]
    h_gain = scaled_image_size[1] / image_size[1]

    coords[:, 0] /= h_gain
    coords[:, 1] /= v_gain
    coords[:, 2] /= h_gain
    coords[:, 3] /= v_gain

    clip_coords(coords, image_size)
    return coords


def post_process_pose(pose: np.ndarray, image_size: Tuple, scaled_image_size: Tuple) -> np.ndarray:
    height, width = image_size
    scaled_height, scaled_width = scaled_image_size
    vertical_factor = height / scaled_height
    horizontal_factor = width / scaled_width
    result = pose.copy()
    for i in range(17):
        result[i * 3] = horizontal_factor * result[i * 3]
        result[i * 3 + 1] = vertical_factor * result[i * 3 + 1]
    return result


def pose_post_process_output(
    output: torch.tensor,
    confidence_trashold: float,
    iou_trashold: float,
    image_size: Tuple[int, int],
    scaled_image_size: Tuple[int, int]
) -> np.ndarray:
    output = non_max_suppression_kpt(
        prediction=output,
        conf_thres=confidence_trashold,
        iou_thres=iou_trashold,
        nc=pose_model.yaml['nc'],
        nkpt=pose_model.yaml['nkpt'],
        kpt_label=True)

    with torch.no_grad():
        output = output_to_keypoint(output)

        for idx in range(output.shape[0]):
            output[idx, 7:] = post_process_pose(
                output[idx, 7:],
                image_size=image_size,
                scaled_image_size=scaled_image_size
            )

    return output

### Annotate

In [24]:
from utils.plots import plot_skeleton_kpts
from scipy.spatial import distance

class SkeletonTracker:
    def __init__(self, max_distance=30):
        self.max_distance = max_distance
        self.trackers = {}
        self.next_id = 0

    def update(self, poses):
        # Initialize current_ids at the beginning of the update method
        current_ids = list(self.trackers.keys())

        # If it's the first frame or no skeletons are currently tracked, assign new IDs to all poses
        if not self.trackers or all(len(t[0]) == 0 for t in self.trackers.values()):
            self.trackers = {self.next_id + i: (pose, self.calculate_centroid(pose)) for i, pose in enumerate(poses)}
            self.next_id += len(poses)
        else:
            # Update trackers with new poses
            for pose in poses:
                pose_centroid = self.calculate_centroid(pose)

                # Find the closest tracker to each pose
                distances = {tid: np.linalg.norm(pose_centroid - self.trackers[tid][1])
                             for tid in current_ids}
                if current_ids:  # Make sure there are still IDs to be compared
                    closest_tid, closest_dist = min(distances.items(), key=lambda item: item[1])
                    if closest_dist < self.max_distance:
                    # If the closest tracker is near enough, update it with the new pose
                        self.trackers[closest_tid] = (pose, pose_centroid)
                        current_ids.remove(closest_tid)
                    else:
                    # If no tracker is close enough, start a new one
                        self.trackers[self.next_id] = (pose, pose_centroid)
                        self.next_id += 1
                else:
                # If all current IDs have been matched, start a new tracker for the remaining poses
                    self.trackers[self.next_id] = (pose, pose_centroid)
                    self.next_id += 1


        # Clear trackers without poses
        for tid in current_ids:
            del self.trackers[tid]

        return self.trackers

    def calculate_centroid(self, pose):
        x = pose[::3]  # Extract every third element starting at index 0
        y = pose[1::3] # Extract every third element starting at index 1
        x_mean = np.mean(x[x != 0])  # Calculate the mean, excluding zeros (assuming zero is not a valid coordinate)
        y_mean = np.mean(y[y != 0])  # Same for y
        return np.array([x_mean, y_mean])

def detect_annotate(image: np.ndarray, detections: np.ndarray, color: Color, thickness: int = 2) -> np.ndarray:
    annotated_image = image.copy()
    for x_min, y_min, x_max, y_max, confidence, class_id in detections:
        rect = Rect(
            x=float(x_min),
            y=float(y_min),
            width=float(x_max - x_min),
            height=float(y_max - y_min)
        )
        annotated_image = draw_rect(image=annotated_image, rect=rect, color=color, thickness=thickness)

    return annotated_image

tracker = SkeletonTracker(max_distance=30)
def pose_annotate(image: np.ndarray, detections: np.ndarray) -> np.ndarray:
    annotated_frame = image.copy()

    # Extract poses for the tracker
    poses = [detections[idx, 7:].T for idx in range(detections.shape[0])]

    # Update the tracker with new poses
    tracked_poses = tracker.update(poses)

    for track_id, (pose, _) in tracked_poses.items():
        plot_skeleton_kpts(annotated_frame, pose, 3)

        x = []
        y = []
        for kid in range(len(pose) // 3):
          x.append(pose[3 * kid])
          y.append(pose[3 * kid + 1])

        x_coord = int(np.mean(x))
        y_coord = int(np.mean(y))

        cv2.putText(annotated_frame, f'ID: {track_id}', (x_coord, y_coord), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

    return annotated_frame

### Single frame detection

In [25]:
COLOR = Color(r=255, g=255, b=255)

frame_iterator = iter(generate_frames(video_file=SKI_VIDEO_PATH))

frame = next(frame_iterator)

detection_pre_processed_frame = detection_pre_process_frame(
    frame=frame,
    device=device
)

image_size = frame.shape[:2]
scaled_image_size = tuple(detection_pre_processed_frame.size())[2:]

with torch.no_grad():

    detection_output = detection_model(detection_pre_processed_frame)[0].detach().cpu()

    detection_output = detection_post_process_output(
        output=detection_output,
        confidence_trashold=CONFIDENCE_TRESHOLD,
        iou_trashold=IOU_TRESHOLD,
        image_size=image_size,
        scaled_image_size=scaled_image_size
    )

annotated_frame = detect_annotate(image=frame, detections=detection_output, color=COLOR)

plot_image(annotated_frame, 16)

### Single frame pose

In [26]:
def process_frame_and_annotate(frame: np.ndarray) -> np.ndarray:
    pose_pre_processed_frame = pose_pre_process_frame(frame=frame.copy(), device=device)

    image_size = frame.shape[:2]
    scaled_image_size = tuple(pose_pre_processed_frame.size())[2:]

    with torch.no_grad():
        pose_output, _ = pose_model(pose_pre_processed_frame)
        pose_output = pose_post_process_output(
            output=pose_output,
            confidence_trashold=CONFIDENCE_TRESHOLD,
            iou_trashold=IOU_TRESHOLD,
            image_size=image_size,
            scaled_image_size=scaled_image_size
        )

    annotated_frame = pose_annotate(image=frame, detections=pose_output)

    return annotated_frame

In [27]:
frame_iterator = iter(generate_frames(video_file=SKI_VIDEO_PATH))

frame = next(frame_iterator)

annotated_frame = process_frame_and_annotate(frame=frame)

plot_image(annotated_frame, 16)

# 5. Process video

In [28]:
from dataclasses import dataclass

import cv2


"""
usage example:

video_config = VideoConfig(
    fps=30,
    width=1920,
    height=1080)
video_writer = get_video_writer(
    target_video_path=TARGET_VIDEO_PATH,
    video_config=video_config)

for frame in frames:
    ...
    video_writer.write(frame)

video_writer.release()
"""


# stores information about output video file, width and height of the frame must be equal to input video
@dataclass(frozen=True)
class VideoConfig:
    fps: float
    width: int
    height: int


# create cv2.VideoWriter object that we can use to save output video
def get_video_writer(target_video_path: str, video_config: VideoConfig) -> cv2.VideoWriter:
    video_target_dir = os.path.dirname(os.path.abspath(target_video_path))
    os.makedirs(video_target_dir, exist_ok=True)
    return cv2.VideoWriter(
        target_video_path,
        fourcc=cv2.VideoWriter_fourcc(*"mp4v"),
        fps=video_config.fps,
        frameSize=(video_config.width, video_config.height),
        isColor=True
    )


def get_frame_count(path: str) -> int:
    cap = cv2.VideoCapture(path)
    return int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

In [29]:
SOURCE_VIDEO_PATH = SKI_VIDEO_PATH
TARGET_VIDEO_PATH = f"{HOME}/output/pose-estimation.mp4"

In [30]:
from tqdm.notebook import tqdm

COLOR = Color(r=255, g=255, b=255)

# initiate video writer
video_config = VideoConfig(
    fps=25,
    width=1920,
    height=1080)
video_writer = get_video_writer(
    target_video_path=TARGET_VIDEO_PATH,
    video_config=video_config)

# get fresh video frame generator
frame_iterator = iter(generate_frames(video_file=SOURCE_VIDEO_PATH))

total = get_frame_count(SOURCE_VIDEO_PATH)

for frame in tqdm(frame_iterator, total=total):
    annotated_frame = frame.copy()

    with torch.no_grad():
        image_size = frame.shape[:2]

        #detection
        detection_pre_processed_frame = detection_pre_process_frame(
            frame=frame,
            device=device)
        detection_scaled_image_size = tuple(detection_pre_processed_frame.size())[2:]

        detection_output = detection_model(detection_pre_processed_frame)[0].detach().cpu()
        detection_output = detection_post_process_output(
            output=detection_output,
            confidence_trashold=CONFIDENCE_TRESHOLD,
            iou_trashold=IOU_TRESHOLD,
            image_size=image_size,
            scaled_image_size=detection_scaled_image_size
        )
        # annotated_frame = detect_annotate(image=annotated_frame, detections=detection_output, color=COLOR)

        # pose
        pose_pre_processed_frame = pose_pre_process_frame(
            frame=frame,
            device=device)
        pose_scaled_image_size = tuple(pose_pre_processed_frame.size())[2:]

        pose_output = pose_model(pose_pre_processed_frame)[0].detach().cpu()
        pose_output = pose_post_process_output(
            output=pose_output,
            confidence_trashold=CONFIDENCE_TRESHOLD,
            iou_trashold=IOU_TRESHOLD,
            image_size=image_size,
            scaled_image_size=pose_scaled_image_size
        )
        annotated_frame = pose_annotate(
            image=annotated_frame, detections=pose_output)

        # save video frame
        video_writer.write(annotated_frame)

# close output video
video_writer.release()

  0%|          | 0/422 [00:00<?, ?it/s]