In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only Kaggle input directory and print the full path of every file in it.
for root_dir, _subdirs, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        full_path = os.path.join(root_dir, file_name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/input-data/skateboarding.mp4
/kaggle/input/input-data/people_02.mp4
/kaggle/input/input-data/people_01.mp4


In [2]:
import numpy as np
import torch
import os
import warnings

from segment_anything import sam_model_registry, SamAutomaticMaskGenerator, SamPredictor

# Create the weights folder in the working directory. Unlike a bare shell
# `mkdir`, exist_ok=True keeps the cell re-runnable once the folder exists.
os.makedirs("weights", exist_ok=True)
class SAM_handler:
    """Box-prompted Segment Anything (SAM, ViT-H) predictor.

    Loads the ``sam_vit_h_4b8939.pth`` checkpoint (downloading it on first
    use), moves the model to the requested device and exposes helpers that
    turn detector boxes into SAM prompts and predict one mask per box.
    """

    CHECKPOINT_NAME = "sam_vit_h_4b8939.pth"
    CHECKPOINT_URL = "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth"

    def __init__(self, device, weights_dir="/kaggle/working/weights"):
        """Load the SAM model and build a ``SamPredictor`` for it.

        Args:
            device: torch device (e.g. ``"cuda"`` or ``"cpu"``) the model is moved to.
            weights_dir: directory holding (or receiving) the checkpoint file.
                The same path is used for the existence check, the download
                and the load, so a fresh download is always the file loaded.
        """
        # Local import: only needed for the one-off checkpoint download.
        import urllib.request

        checkpoint_path = os.path.join(weights_dir, self.CHECKPOINT_NAME)
        if not os.path.exists(checkpoint_path):
            warnings.warn("sam not available, downloading...")
            os.makedirs(weights_dir, exist_ok=True)
            # Download to a temporary name first so an interrupted transfer is
            # never mistaken for a complete checkpoint on the next run.
            partial_path = checkpoint_path + ".part"
            urllib.request.urlretrieve(self.CHECKPOINT_URL, partial_path)
            os.replace(partial_path, checkpoint_path)

        self.m_sam_checkpoint = checkpoint_path
        self.m_model_type = "vit_h"
        self.m_device = device
        self.sam = sam_model_registry[self.m_model_type](checkpoint=self.m_sam_checkpoint)
        self.sam.to(device=self.m_device)
        self.m_predictor = SamPredictor(self.sam)

    def transformBoxes(self, video_dims, detections):
        """Rescale detector boxes into SAM's resized input frame.

        Args:
            video_dims: size of the original image, forwarded unchanged as the
                ``original_size`` argument of ``apply_boxes_torch``. SAM reads
                it as ``(height, width)``.
            detections: detector results; the boxes are read from
                ``detections[0].boxes.xyxy``.

        Returns:
            The transformed boxes as returned by ``apply_boxes_torch``, on the
            handler's device.
        """
        # predict_torch needs the prompts on the same device as the model.
        boxes = detections[0].boxes.xyxy.to(self.m_device)
        return self.m_predictor.transform.apply_boxes_torch(boxes, video_dims)

    def predict(self, frame, transformed_boxes, image_format="RGB"):
        """Predict one mask per box for a single image.

        Args:
            frame: image passed to ``SamPredictor.set_image``.
            transformed_boxes: boxes produced by :meth:`transformBoxes`.
            image_format: channel order of ``frame``, forwarded to
                ``set_image``. Pass ``"BGR"`` for frames read with OpenCV.

        Returns:
            ``(masks, scores, logits)`` where ``masks`` is a NumPy array on the
            CPU and the other two are returned as produced by ``predict_torch``.
        """
        self.m_predictor.set_image(frame, image_format=image_format)
        masks, scores, logits = self.m_predictor.predict_torch(
            boxes=transformed_boxes,
            multimask_output=False,
            point_coords=None,
            point_labels=None
        )
        masks = masks.cpu().numpy()
        return masks, scores, logits

In [3]:
# Remember the notebook's current working directory; the bare name on the
# last line displays it as the cell output.
HOME = os.getcwd()
HOME

'/kaggle/working'

In [4]:
!pip install ultralytics
from ultralytics import YOLO
import os
import numpy as np
import cv2

class YOLOHandler:
    """Wrapper around an Ultralytics YOLO model with per-class drawing colors.

    Attributes:
        m_yolo_model: the loaded YOLO model.
        chosen_class_ids: list of class ids the rest of the pipeline keeps.
        colors: integer array with one random 3-channel color per model class.
    """

    def __init__(self, chosen_class_ids=None, weights_path=None):
        """Load the model and validate the selected class ids.

        Args:
            chosen_class_ids: list (or tuple) of class ids to keep; defaults to
                ``[0]``. ``None`` is used as the sentinel so that instances
                never share one mutable default list.
            weights_path: path of the YOLO weights file; defaults to
                ``/kaggle/working/weights/yolov8n.pt``.

        Raises:
            TypeError: ``chosen_class_ids`` is not a list or tuple.
            ValueError: ``chosen_class_ids`` is empty or contains an id the
                model does not know.
        """
        if weights_path is None:
            weights_path = '/kaggle/working/weights/yolov8n.pt'
        self.m_yolo_model = YOLO(weights_path)
        if chosen_class_ids is None:
            chosen_class_ids = [0]
        self.chosen_class_ids = self._validated_class_ids(chosen_class_ids)
        self.colors = np.random.randint(0, 256, size=(len(self.m_yolo_model.names), 3))

    def _validated_class_ids(self, chosen_class_ids):
        """Return a fresh list of ``chosen_class_ids`` after checking them against the model."""
        if not isinstance(chosen_class_ids, (list, tuple)):
            raise TypeError("Chosen class ids is not a list type.")
        if len(chosen_class_ids) == 0:
            raise ValueError("Chosen class ids must not be empty.")
        known_ids = self.m_yolo_model.names.keys()
        max_id, min_id = max(known_ids), min(known_ids)
        if max(chosen_class_ids) > max_id:
            raise ValueError(f"Invalid class id, max id is: {max_id}")
        if min(chosen_class_ids) < min_id:
            raise ValueError(f"Invalid class id, min id is: {min_id}")
        # The range check alone would accept ids missing from a sparse name table.
        for class_id in chosen_class_ids:
            if class_id not in known_ids:
                raise ValueError(f"Invalid class id: {class_id}")
        return list(chosen_class_ids)

    def PrintAvailableModelNames(self):
        """Print the model's ``names`` mapping (class id -> class name)."""
        print(self.m_yolo_model.names)

    def set_chosenClassIds(self, chosen_class_ids):
        """Replace the selected class ids, applying the same validation as ``__init__``.

        Raises:
            TypeError / ValueError: see ``__init__``. The previous selection is
                kept when validation fails.
        """
        self.chosen_class_ids = self._validated_class_ids(chosen_class_ids)

    def predict(self, frame, conf=0.7):
        """Run the model on ``frame`` and return its detections.

        Args:
            frame: image forwarded to ``YOLO.predict``.
            conf: confidence threshold forwarded to ``YOLO.predict``.
        """
        return self.m_yolo_model.predict(frame, conf=conf)

    def get_color(self, color):
        """Convert the first three entries of ``color`` to a tuple of Python ints."""
        return int(color[0]), int(color[1]), int(color[2])

    def visualizeDetectionsBbox(self, frame, boxes, conf_thresholds, class_ids):
        """Draw labelled bounding boxes on a copy of ``frame``.

        Args:
            frame: image to annotate; it is copied, not modified.
            boxes: sequence of boxes indexed as ``(x1, y1, x2, y2)``.
            conf_thresholds: per-box confidence values shown in the label.
            class_ids: per-box class ids (used for label text and color).

        Returns:
            The annotated copy of ``frame``.
        """
        frame_copy = np.copy(frame)
        for box, confidence, raw_class_id in zip(boxes, conf_thresholds, class_ids):
            class_id = int(raw_class_id)
            x1, y1, x2, y2 = (int(coord) for coord in box[:4])
            color = self.get_color(self.colors[class_id])
            label = f"{self.m_yolo_model.names[class_id]}: {float(confidence):.2f}"
            cv2.rectangle(frame_copy, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame_copy, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
        return frame_copy

Collecting ultralytics
  Downloading ultralytics-8.1.9-py3-none-any.whl.metadata (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.2/40.2 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
Collecting thop>=0.1.1 (from ultralytics)
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Downloading ultralytics-8.1.9-py3-none-any.whl (709 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m709.3/709.3 kB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: thop, ultralytics
Successfully installed thop-0.1.1.post2209072238 ultralytics-8.1.9


In [5]:
import torch
import warnings
import cv2
import os
import re
class VideoMTT:
    """Runs YOLO detection and SAM segmentation over a video.

    ``run`` writes two videos next to each other: ``<output_video>_boxes.mp4``
    with the detector's bounding boxes and ``<output_video>_masks.mp4`` with the
    colored segmentation masks blended over the frames.
    """

    def __init__(self, input_video=None, MTT=None, SAM=None, YOLO=None, output_video=None):
        """Store the pipeline components.

        Args:
            input_video: path of an existing ``.mp4`` file, or ``None`` to set
                it later with :meth:`set_inputVideo`.
            MTT: tracker object; only checked for being set.
            SAM: ``SAM_handler`` instance.
            YOLO: ``YOLOHandler`` instance.
            output_video: path prefix of the two output videos.
        """
        self.m_output_video = None
        self.m_YOLO = None
        self.m_SAM = None
        self.m_MTT = None
        self.m_input_video = None
        # None means "not set yet"; checkClassMembers reports it when run() is called.
        if input_video is not None:
            self.set_inputVideo(input_video)
        self.set_MTT(MTT)
        self.set_SAM(SAM)
        self.set_YOLO(YOLO)
        self.set_outputVideo(output_video)
        self.m_device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

    def set_inputVideo(self, input_video):
        """Set the input video path.

        Raises:
            FileNotFoundError: the path does not exist.
            ValueError: the file extension is not ``.mp4`` (case-insensitive).
        """
        if not os.path.exists(input_video):
            raise FileNotFoundError("Input file does not exists.")
        # Compare the real extension; an unanchored pattern also accepts
        # names such as "xmp4" or "clip.mp4.txt".
        if os.path.splitext(input_video)[1].lower() != ".mp4":
            raise ValueError("Input file is not mp4 format.")
        self.m_input_video = input_video

    def set_MTT(self, MTT):
        """Set the tracker object."""
        self.m_MTT = MTT

    def set_SAM(self, SAM):
        """Set the SAM handler."""
        self.m_SAM = SAM

    def set_YOLO(self, YOLO):
        """Set the YOLO handler."""
        self.m_YOLO = YOLO

    def set_outputVideo(self, output_video):
        """Set the output path prefix."""
        self.m_output_video = output_video

    def set_device(self, device):
        """Select the device, falling back to ``"cpu"`` when CUDA is unavailable."""
        if device != "cpu" and not torch.cuda.is_available():
            warnings.warn("GPU is not available, setting device to cpu.")
            self.m_device = "cpu"
            return
        self.m_device = device

    def checkClassMembers(self):
        """Raise ``Exception`` naming the first pipeline component that is not set."""
        if self.m_MTT is None:
            raise Exception("MTT is not set")
        if not isinstance(self.m_SAM, SAM_handler):
            raise Exception("SAM is not set")
        if not isinstance(self.m_YOLO, YOLOHandler):
            raise Exception("YOLO is not set")
        if self.m_output_video is None:
            raise Exception("Output video is not set")
        if self.m_input_video is None:
            raise Exception("Input video is not set")

    def get_videoDimensions(self, cap):
        """Return ``(width, height)`` of the capture as ints."""
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        return width, height

    def get_videoFps(self, cap):
        """Return the capture's frame rate truncated to an int."""
        return int(cap.get(cv2.CAP_PROP_FPS))

    def get_outputVideoWriter(self, input_cap, output_path):
        """Create an ``mp4v`` writer with the size and frame rate of ``input_cap``."""
        width, height = self.get_videoDimensions(input_cap)
        fps = self.get_videoFps(input_cap)
        output_codec = cv2.VideoWriter_fourcc(*"mp4v")  # MP4 codec
        return cv2.VideoWriter(output_path, output_codec, fps, (width, height))

    def get_color(self, color):
        """Convert the first three entries of ``color`` to a tuple of Python ints."""
        return int(color[0]), int(color[1]), int(color[2])

    def add_color(self, mask, color):
        """Turn a mask into a 3-channel image that is ``color`` where the mask is set."""
        next_mask = mask.astype(np.uint8)
        # Stack three copies and move them to the last axis: (..., 3).
        next_mask = np.expand_dims(next_mask, 0).repeat(3, axis=0)
        next_mask = np.moveaxis(next_mask, 0, -1)
        return next_mask * color

    def merge_masks_colored(self, masks, class_ids):
        """Merge the masks of the chosen classes into one multi-colored mask.

        Args:
            masks: per-detection masks; ``masks[i][0]`` is the mask of detection ``i``.
            class_ids: per-detection class ids, aligned with ``masks``.

        Returns:
            A ``uint8`` image with the colored masks combined by bitwise OR. It
            is all zeros when no detection belongs to a chosen class.
        """
        filtered_class_ids = []
        filtered_masks = []
        for idx, cid in enumerate(class_ids):
            if int(cid) in self.m_YOLO.chosen_class_ids:
                filtered_class_ids.append(cid)
                filtered_masks.append(masks[idx])

        if not filtered_masks:
            # Nothing to draw: return an empty overlay of the right size
            # instead of failing on filtered_masks[0].
            height, width = np.shape(masks[0][0])[-2:]
            return np.zeros((height, width, 3), dtype=np.uint8)

        merged_with_colors = self.add_color(
            filtered_masks[0][0],
            self.get_color(self.m_YOLO.colors[int(filtered_class_ids[0])])).astype(np.uint8)

        if len(filtered_masks) == 1:
            return merged_with_colors

        for mask, cid in zip(filtered_masks[1:], filtered_class_ids[1:]):
            curr_mask_with_colors = self.add_color(mask[0], self.get_color(self.m_YOLO.colors[int(cid)]))
            merged_with_colors = np.bitwise_or(merged_with_colors, curr_mask_with_colors)
        print("merged with colors shape: ", merged_with_colors.shape)
        return merged_with_colors.astype(np.uint8)

    def _overlay_masks(self, frame, frame_num, detections):
        """Return ``frame`` with the SAM masks for ``detections`` blended in.

        The unmodified frame is returned when there are no boxes or no masks.
        """
        # SAM reads the original size as (height, width), which is frame.shape[:2].
        transformedBoxes = self.m_SAM.transformBoxes(detections=detections,
                                                     video_dims=frame.shape[:2])
        print("transformed boxes shape: ", transformedBoxes.shape)
        if len(transformedBoxes) == 0:
            print("No boxes found on frame", frame_num)
            return frame

        # OpenCV decodes frames as BGR, while SamPredictor.set_image defaults to RGB.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        masks, scores, logits = self.m_SAM.predict(rgb_frame, transformedBoxes)
        if masks is None or len(masks) == 0:
            print("No masks found on frame", frame_num)
            return frame
        print("masks shape: ", masks.shape)

        merged_colored_mask = self.merge_masks_colored(masks, detections[0].boxes.cls)
        return cv2.addWeighted(frame, 0.7, merged_colored_mask, 0.7, 0)

    def run(self, first_frame=50, last_frame=55):
        """Process the input video and write the boxes and masks videos.

        Args:
            first_frame: 1-based number of the first frame to process; earlier
                frames are read and skipped.
            last_frame: 1-based number of the last frame to process, or
                ``None`` to process until the end of the video.

        Raises:
            Exception: a pipeline component is not set, or the input video
                cannot be opened.
        """
        self.checkClassMembers()
        videoCap = cv2.VideoCapture(self.m_input_video)
        writers = []
        try:
            if not videoCap.isOpened():
                raise Exception("Input video could not be opened.")
            output_video_boxes = self.get_outputVideoWriter(videoCap, self.m_output_video + "_boxes.mp4")
            writers.append(output_video_boxes)
            output_video_masks = self.get_outputVideoWriter(videoCap, self.m_output_video + "_masks.mp4")
            writers.append(output_video_masks)

            frame_num = 0
            while True:
                ret, frame = videoCap.read()
                if not ret:
                    # End of stream: stop right away instead of counting phantom frames.
                    break
                frame_num += 1
                print("frame: ", frame_num)

                if frame_num >= first_frame:
                    yoloDetections = self.m_YOLO.predict(frame)
                    boxes = yoloDetections[0].boxes.cpu()
                    frameWithYoloDetections = self.m_YOLO.visualizeDetectionsBbox(frame,
                                                                                  boxes.xyxy,
                                                                                  boxes.conf,
                                                                                  boxes.cls)
                    output_video_boxes.write(frameWithYoloDetections)
                    print("frame with yolo detections shape: ", frameWithYoloDetections.shape)
                    output_video_masks.write(self._overlay_masks(frame, frame_num, yoloDetections))

                # Checked for every frame, so frames without detections cannot
                # slip past the limit.
                if last_frame is not None and frame_num >= last_frame:
                    break
        finally:
            # Release the capture and any writer already created, even on errors.
            videoCap.release()
            for writer in writers:
                writer.release()

In [6]:
# Driver cell: build the detector and segmenter, then run the pipeline on one clip.
HOME = os.getcwd()
# exist_ok=True keeps the cell re-runnable (a bare shell `mkdir` errors once the folder exists).
os.makedirs(os.path.join(HOME, "output"), exist_ok=True)
of = f"{HOME}/output/people"  # output prefix; "_boxes.mp4" / "_masks.mp4" are appended by run()
yolo = YOLOHandler()
mtt = "XXX"  # placeholder: VideoMTT only checks that MTT is not None
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device: ", device)
sam = SAM_handler(device = device)
# Named input_video so the builtin input() is not shadowed for the rest of the session.
input_video = "/kaggle/input/input-data/people_01.mp4"
vid = VideoMTT(input_video=input_video, MTT = mtt, SAM=sam, YOLO=yolo, output_video=of)
vid.run()

Downloading https://github.com/ultralytics/assets/releases/download/v8.1.0/yolov8n.pt to '/kaggle/working/weights/yolov8n.pt'...


100%|██████████| 6.23M/6.23M [00:00<00:00, 75.8MB/s]


device:  cuda
frame:  1
frame:  2
frame:  3
frame:  4
frame:  5
frame:  6
frame:  7
frame:  8
frame:  9
frame:  10
frame:  11
frame:  12
frame:  13
frame:  14
frame:  15
frame:  16
frame:  17
frame:  18
frame:  19
frame:  20
frame:  21
frame:  22
frame:  23
frame:  24
frame:  25
frame:  26
frame:  27
frame:  28
frame:  29
frame:  30
frame:  31
frame:  32
frame:  33
frame:  34
frame:  35
frame:  36
frame:  37
frame:  38
frame:  39
frame:  40
frame:  41
frame:  42
frame:  43
frame:  44
frame:  45
frame:  46
frame:  47
frame:  48
frame:  49
frame:  50

0: 384x640 8 persons, 1 bird, 128.2ms
Speed: 12.6ms preprocess, 128.2ms inference, 628.5ms postprocess per image at shape (1, 3, 384, 640)
frame with yolo detections shape:  (720, 1280, 3)
transformed boxes shape:  torch.Size([9, 4])
masks shape:  (9, 1, 720, 1280)
merged with colors shape:  (720, 1280, 3)
frame:  51

0: 384x640 8 persons, 1 bird, 8.1ms
Speed: 2.0ms preprocess, 8.1ms inference, 1.5ms postprocess per image at shape (1, 3, 38

In [7]:
# Display the working directory stored in HOME as the cell output.
HOME

'/kaggle/working'