In [1]:
# Install the notebook's runtime dependencies into the kernel's environment.
# NOTE(review): no versions are pinned and transformers is installed from the
# GitHub main branch, so a later re-run may pick up different code — consider pinning.
%pip install -q ultralytics opencv-python pyrealsense2 git+https://github.com/huggingface/transformers.git

Note: you may need to restart the kernel to use updated packages.


## Person class

In [2]:
from typing import List

import cv2
import numpy as np


class Person:
    """One detected person: an integer bounding box, its centre, and an optional depth.

    Attributes:
        x1, y1, x2, y2: box corners, each coordinate truncated to ``int``.
        center: ``(x, y)`` midpoint of the box, truncated to ``int``.
        depth: value supplied by the caller (``None`` until assigned).
    """

    def __init__(self, bbox: List[float], depth: float = None):
        """Store the box given as ``[x1, y1, x2, y2]`` and derive its centre.

        Args:
            bbox: exactly four coordinates in x1, y1, x2, y2 order.
            depth: optional depth value kept as-is on the instance.
        """
        left, top, right, bottom = (int(coord) for coord in bbox)
        self.x1 = left
        self.y1 = top
        self.x2 = right
        self.y2 = bottom

        mid_x = int((left + right) / 2)
        mid_y = int((top + bottom) / 2)
        self.center = (mid_x, mid_y)

        self.depth = depth

    def draw(self, image: np.ndarray):
        """Draw this person's box on ``image`` and return what ``cv2.rectangle`` returns.

        The rectangle uses colour ``(0, 0, 255)`` and a thickness of 2.
        """
        top_left = (self.x1, self.y1)
        bottom_right = (self.x2, self.y2)
        box_colour = (0, 0, 255)
        return cv2.rectangle(image, top_left, bottom_right, box_colour, 2)

## Crowd class

In [3]:
from typing import Tuple


class Crowd:
    """A group of people that stand close to each other horizontally and in depth.

    Attributes:
        people: members added so far (objects exposing ``x1, y1, x2, y2, depth``).
        x1, y1, x2, y2: union of the members' bounding boxes.  They start at
            ``(+inf, +inf, -inf, -inf)`` so the first box applied always wins.
        resolution: value passed to the constructor; ``resolution[0]`` is the
            divisor used to normalise horizontal pixel gaps.
    """

    def __init__(self, resolution: Tuple[int, int]):
        """Create an empty crowd.

        Args:
            resolution: frame size; this notebook passes ``(width, height)``.
        """
        self.people = []
        self.x1, self.y1, self.x2, self.y2 = (
            float("inf"),
            float("inf"),
            float("-inf"),
            float("-inf"),
        )
        self.resolution = resolution

    def is_close(
        self, person: "Person", threshold_x: float = 0.05, threshold_z: float = 0.05
    ) -> bool:
        """Tell whether ``person`` is near at least one current member.

        Two people are "near" when both normalised distances are strictly below
        their thresholds:

        * horizontal gap between the two boxes (0 when they overlap in x),
          divided by ``resolution[0]``;
        * absolute depth difference divided by 255.

        Args:
            person: candidate exposing ``x1``, ``x2`` and ``depth``.
            threshold_x: upper bound (exclusive) on the normalised x gap.
            threshold_z: upper bound (exclusive) on the normalised depth gap.

        Returns:
            ``True`` if any member is near, otherwise ``False`` (including when
            the crowd is empty).
        """
        for other in self.people:
            z_diff = abs(person.depth - other.depth) / 255.0

            if person.x2 < other.x1:
                # person is entirely to the left of other
                x_gap = other.x1 - person.x2
            elif other.x2 < person.x1:
                # person is entirely to the right of other
                x_gap = person.x1 - other.x2
            else:
                # boxes overlap horizontally
                x_gap = 0
            x_diff = x_gap / self.resolution[0]

            if x_diff < threshold_x and z_diff < threshold_z:
                return True

        # Always return a real bool; previously this fell through and gave None.
        return False

    def add_person(self, person: "Person"):
        """Append ``person`` to the crowd and grow the crowd box to contain it."""
        self.people.append(person)
        self.update_bbox([person.x1, person.y1, person.x2, person.y2])

    def merge_crowd(self, crowd: "Crowd"):
        """Absorb every member of ``crowd`` and grow the box to contain its box.

        ``crowd`` itself is left unmodified.
        """
        self.people.extend(crowd.people)
        self.update_bbox([crowd.x1, crowd.y1, crowd.x2, crowd.y2])

    def update_bbox(self, bbox: List[int]):
        """Expand the crowd box so it also covers ``bbox`` (``[x1, y1, x2, y2]``).

        Each side is compared before the ``int`` cast, so the +/-inf sentinels of
        an empty crowd's box never reach ``int()``.
        """
        if bbox[0] < self.x1:
            self.x1 = int(bbox[0])
        if bbox[1] < self.y1:
            self.y1 = int(bbox[1])
        if bbox[2] > self.x2:
            self.x2 = int(bbox[2])
        if bbox[3] > self.y2:
            self.y2 = int(bbox[3])

    def draw(self, image: np.ndarray):
        """Draw the crowd box on ``image`` (colour ``(255, 0, 255)``, thickness 2).

        Returns:
            The result of ``cv2.rectangle``; if no box has been recorded yet
            (the sentinels are still in place) ``image`` is returned untouched.
        """
        if self.x1 > self.x2 or self.y1 > self.y2:
            # Box is still at its +/-inf sentinels: nothing to draw.
            return image
        return cv2.rectangle(
            image, (self.x1, self.y1), (self.x2, self.y2), (255, 0, 255), 2
        )

## Crowd Detection

In [4]:
import cv2
import numpy as np
import pyrealsense2 as rs
import torch
import transformers
from PIL import Image
from ultralytics import YOLO

# --- Configuration ----------------------------------------------------------
resolution = (1920, 1080)  # (width, height) requested from the colour stream
device = "cuda" if torch.cuda.is_available() else "cpu"

# --- Models -----------------------------------------------------------------
# Loaded before the camera is started so a failed model load cannot leave the
# RealSense pipeline streaming.
depth_model = transformers.pipeline(
    task="depth-estimation",
    model="LiheYoung/depth-anything-small-hf",
    device=0 if torch.cuda.is_available() else -1,
)
od_model = YOLO("yolov8n.pt").to(device)

# --- Camera -----------------------------------------------------------------
rs_pipeline = rs.pipeline()
rs_config = rs.config()
rs_config.enable_stream(
    rs.stream.color, resolution[0], resolution[1], rs.format.bgr8, 30
)
rs_pipeline.start(rs_config)


def _assign_to_crowds(crowds, person, resolution):
    """Place ``person`` into ``crowds`` (modified in place).

    * Close to no crowd  -> a new single-person crowd is appended.
    * Close to one crowd -> the person joins it.
    * Close to several   -> the person joins the first one and the others are
      merged into it and removed from ``crowds``.
    """
    close_idx = [
        idx for idx, candidate in enumerate(crowds) if candidate.is_close(person)
    ]

    if not close_idx:
        new_crowd = Crowd(resolution)
        new_crowd.add_person(person)
        crowds.append(new_crowd)
        return

    main_crowd = crowds[close_idx[0]]
    main_crowd.add_person(person)

    # Pop from the highest index downwards so the remaining indices stay valid.
    for idx in reversed(close_idx[1:]):
        main_crowd.merge_crowd(crowds.pop(idx))


try:
    # Create the windows once instead of on every frame.
    cv2.namedWindow("Object Detection", cv2.WINDOW_NORMAL)
    cv2.namedWindow("Depth Estimation", cv2.WINDOW_NORMAL)

    while True:
        rs_frames = rs_pipeline.wait_for_frames()
        rs_color_frame = rs_frames.get_color_frame()
        if not rs_color_frame:
            continue

        # Frame arrives in BGR channel order (stream format is rs.format.bgr8).
        color_img = np.asanyarray(rs_color_frame.get_data())

        # OD: detect class 0 only, on the same device the model was moved to.
        od_preds = od_model.predict(color_img, classes=[0], conf=0.6, device=device)
        boxes = od_preds[0].boxes
        nPeople = len(boxes)

        # Annotate a copy: cv2 drawing functions write into the array they are
        # given, and the untouched frame is still needed for depth estimation.
        od_img = cv2.putText(
            color_img.copy(),
            f"Number of people: {nPeople}",
            (int(resolution[0] // 2 - 400), 100),
            cv2.FONT_HERSHEY_SIMPLEX,
            2,
            (0, 255, 0),
            3,
            cv2.LINE_AA,
        )

        # DE: PIL interprets a 3-channel array as RGB, so convert from BGR first.
        rgb_img = cv2.cvtColor(color_img, cv2.COLOR_BGR2RGB)
        depth_img = np.array(
            depth_model(Image.fromarray(rgb_img))["depth"].convert("L")
        )

        # Find Crowds
        crowds = []
        for bbox in boxes.xyxy.tolist():
            person = Person(bbox)
            od_img = person.draw(od_img)

            person_depth = depth_img[person.y1 : person.y2, person.x1 : person.x2]
            person_depth = person_depth[person_depth != 0]
            if person_depth.size == 0:
                # No usable depth pixels: the mean would be NaN, which can never
                # satisfy Crowd.is_close, so leave this person ungrouped.
                continue
            person.depth = float(person_depth.mean())

            _assign_to_crowds(crowds, person, resolution)

        # Only groups of two or more people are outlined as a crowd.
        for crowd in crowds:
            if len(crowd.people) > 1:
                od_img = crowd.draw(od_img)

        cv2.imshow("Object Detection", od_img)
        cv2.imshow("Depth Estimation", depth_img)

        # 'q' or Esc (27) ends the loop.
        key = cv2.waitKey(1)
        if key & 0xFF == ord("q") or key == 27:
            break

finally:
    # Release the camera and close the windows on every exit path, not only
    # when the quit key was pressed.
    try:
        rs_pipeline.stop()
    finally:
        cv2.destroyAllWindows()


0: 384x640 (no detections), 62.1ms
Speed: 4.4ms preprocess, 62.1ms inference, 20.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 6.1ms
Speed: 2.3ms preprocess, 6.1ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 5.8ms
Speed: 2.1ms preprocess, 5.8ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 5.8ms
Speed: 2.2ms preprocess, 5.8ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 6.2ms
Speed: 2.1ms preprocess, 6.2ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 2.9ms
Speed: 1.2ms preprocess, 2.9ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 3.2ms
Speed: 1.1ms preprocess, 3.2ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 2.7ms
Speed: 2.1ms preprocess, 2.7ms inference, 0.3




0: 384x640 (no detections), 2.6ms
Speed: 1.6ms preprocess, 2.6ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 2.7ms
Speed: 1.4ms preprocess, 2.7ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 3.0ms
Speed: 1.3ms preprocess, 3.0ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 3.0ms
Speed: 1.3ms preprocess, 3.0ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 5.9ms
Speed: 2.0ms preprocess, 5.9ms inference, 298.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 5.8ms
Speed: 2.1ms preprocess, 5.8ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 5.8ms
Speed: 2.1ms preprocess, 5.8ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 3.0ms
Speed: 1.6ms preprocess, 3.0ms inference, 0.7ms postprocess per image at 

## Object Detection + Depth Estimation

In [15]:
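# NOTE: this entire cell is commented out (disabled experiment): YOLO person count
# plus a depth-map preview, showing "Crowd Detected!" when 3 or more people are found.
# import cv2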
# import numpy as np
# import pyrealsense2 as rs
# import torch
# import transformers
# from PIL import Image
# from ultralytics import YOLO

# rs_pipeline = rs.pipeline()
# rs_config = rs.config()
# resolution = (1920, 1080)
# rs_config.enable_stream(
#     rs.stream.color, resolution[0], resolution[1], rs.format.bgr8, 30
# )
# rs_pipeline.start(rs_config)

# depth_model = transformers.pipeline(
#     task="depth-estimation",
#     model="LiheYoung/depth-anything-small-hf",
#     device=0 if torch.cuda.is_available() else -1,
# )
# od_model = YOLO("yolov8n.pt").to("cuda" if torch.cuda.is_available() else "cpu")

# try:
#     while True:
#         rs_frames = rs_pipeline.wait_for_frames()
#         rs_color_frame = rs_frames.get_color_frame()
#         if not rs_color_frame:
#             continue

#         color_img = np.asanyarray(rs_color_frame.get_data())

#         # OD
#         od_preds = od_model.predict(color_img, classes=[0], conf=0.4, device="cuda")
#         nPeople = len(od_preds[0].boxes)
#         od_img = od_preds[0].plot()

#         od_img = cv2.putText(
#             od_img,
#             f"Number of people: {nPeople}",
#             (int(resolution[0] // 2 - 400), 100),
#             cv2.FONT_HERSHEY_SIMPLEX,
#             2,
#             (0, 255, 0),
#             3,
#             cv2.LINE_AA,
#         )
#         if nPeople >= 3:
#             od_img = cv2.putText(
#                 od_img,
#                 f"Crowd Detected!",
#                 (int(resolution[0] // 2 - 400), 50),
#                 cv2.FONT_HERSHEY_SIMPLEX,
#                 2,
#                 (0, 0, 255),
#                 3,
#                 cv2.LINE_AA,
#             )

#         # DE
#         depth_img = np.array(
#             depth_model(Image.fromarray(color_img))["depth"].convert("L")
#         )

#         cv2.namedWindow("Object Detection", cv2.WINDOW_NORMAL)
#         cv2.imshow("Object Detection", od_img)
#         cv2.namedWindow("Depth Estimation", cv2.WINDOW_NORMAL)
#         cv2.imshow("Depth Estimation", depth_img)

#         key = cv2.waitKey(1)
#         # Press esc or 'q' to close the image window
#         if key & 0xFF == ord("q") or key == 27:
#             cv2.destroyAllWindows()
#             break

# finally:
#     rs_pipeline.stop()


0: 384x640 1 person, 7.0ms
Speed: 3.6ms preprocess, 7.0ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 6.4ms
Speed: 1.7ms preprocess, 6.4ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 6.4ms
Speed: 2.2ms preprocess, 6.4ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 3.4ms
Speed: 1.2ms preprocess, 3.4ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 3.5ms
Speed: 1.2ms preprocess, 3.5ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 3.5ms
Speed: 1.4ms preprocess, 3.5ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 5.8ms
Speed: 1.7ms preprocess, 5.8ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 6.1ms
Speed: 1.6ms preprocess, 6.1ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x

## Depth Anything

In [7]:
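# NOTE: this entire cell is commented out (disabled experiment): shows only the
# estimated depth map of the RealSense colour stream.
# import cv2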
# import numpy as np
# import pyrealsense2 as rs
# import torch
# import transformers
# from PIL import Image

# rs_pipeline = rs.pipeline()
# rs_config = rs.config()
# resolution = (1920, 1080)
# rs_config.enable_stream(
#     rs.stream.color, resolution[0], resolution[1], rs.format.bgr8, 30
# )
# rs_pipeline.start(rs_config)

# depth_model = transformers.pipeline(
#     task="depth-estimation",
#     model="LiheYoung/depth-anything-small-hf",
#     device=0 if torch.cuda.is_available() else -1,
# )

# try:
#     while True:
#         rs_frames = rs_pipeline.wait_for_frames()
#         rs_color_frame = rs_frames.get_color_frame()
#         if not rs_color_frame:
#             continue

#         color_img = np.asanyarray(rs_color_frame.get_data())
#         pil_img = Image.fromarray(color_img)
#         depth_img = np.array(depth_model(pil_img)["depth"].convert("L"))

#         cv2.namedWindow("RealSense", cv2.WINDOW_AUTOSIZE)
#         cv2.imshow("RealSense", depth_img)

#         key = cv2.waitKey(1)
#         # Press esc or 'q' to close the image window
#         if key & 0xFF == ord("q") or key == 27:
#             cv2.destroyAllWindows()
#             break

# finally:
#     rs_pipeline.stop()



## Only detection

In [8]:
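# NOTE: this entire cell is commented out (disabled experiment): YOLO person detection
# only, showing "Crowd Detected!" when 3 or more people are found.
# import cv2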
# import numpy as np
# import pyrealsense2 as rs
# from ultralytics import YOLO

# od_model = YOLO("yolov8n.pt").to("cuda")
# rs_pipeline = rs.pipeline()
# rs_config = rs.config()
# resolution = (1920, 1080)
# rs_config.enable_stream(
#     rs.stream.color, resolution[0], resolution[1], rs.format.bgr8, 30
# )
# rs_pipeline.start(rs_config)

# try:
#     while True:
#         rs_frames = rs_pipeline.wait_for_frames()
#         rs_color_frame = rs_frames.get_color_frame()
#         if not rs_color_frame:
#             continue

#         color_img = np.asanyarray(rs_color_frame.get_data())
#         od_preds = od_model.predict(color_img, classes=[0], conf=0.4, device="cuda")
#         nPeople = len(od_preds[0].boxes)

#         color_img = od_preds[0].plot()
#         color_img = cv2.putText(
#             color_img,
#             f"Number of people: {nPeople}",
#             (int(resolution[0] // 2 - 400), 100),
#             cv2.FONT_HERSHEY_SIMPLEX,
#             2,
#             (0, 255, 0),
#             3,
#             cv2.LINE_AA,
#         )

#         if nPeople >= 3:
#             color_img = cv2.putText(
#                 color_img,
#                 f"Crowd Detected!",
#                 (int(resolution[0] // 2 - 400), 50),
#                 cv2.FONT_HERSHEY_SIMPLEX,
#                 2,
#                 (0, 0, 255),
#                 3,
#                 cv2.LINE_AA,
#             )

#         cv2.namedWindow("RealSense", cv2.WINDOW_AUTOSIZE)
#         cv2.imshow("RealSense", color_img)

#         key = cv2.waitKey(1)
#         # Press esc or 'q' to close the image window
#         if key & 0xFF == ord("q") or key == 27:
#             cv2.destroyAllWindows()
#             break

# finally:
#     rs_pipeline.stop()


0: 384x640 1 person, 58.6ms
Speed: 1.8ms preprocess, 58.6ms inference, 225.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 2.9ms
Speed: 1.5ms preprocess, 2.9ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 2.9ms
Speed: 1.5ms preprocess, 2.9ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 3.8ms
Speed: 1.7ms preprocess, 3.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 6.0ms
Speed: 2.6ms preprocess, 6.0ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 6.3ms
Speed: 2.7ms preprocess, 6.3ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 3.0ms
Speed: 1.4ms preprocess, 3.0ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 3.0ms
Speed: 1.2ms preprocess, 3.0ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 

## Record mp4

In [None]:
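# NOTE: this entire cell is commented out (disabled utility): previews the RealSense
# colour stream and records it to "output.mp4".
# import cv2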
# import numpy as np
# import pyrealsense2 as rs

# pipeline = rs.pipeline()
# config = rs.config()
# resolution = (1920, 1080)
# config.enable_stream(rs.stream.color, resolution[0], resolution[1], rs.format.bgr8, 30)
# pipeline.start(config)

# fourcc = cv2.VideoWriter_fourcc(*"mp4v")
# out = cv2.VideoWriter("output.mp4", fourcc, 20.0, resolution)

# try:
#     while True:
#         frames = pipeline.wait_for_frames()
#         color_frame = frames.get_color_frame()
#         if not color_frame:
#             continue

#         color_image = np.asanyarray(color_frame.get_data())

#         cv2.namedWindow("RealSense", cv2.WINDOW_AUTOSIZE)
#         cv2.imshow("RealSense", color_image)

#         out.write(color_image)

#         key = cv2.waitKey(1)
#         if key & 0xFF == ord("q") or key == 27:
#             cv2.destroyAllWindows()
#             break

# finally:
#     pipeline.stop()

#     out.release()