In [None]:
# NOTE(review): both installs are unpinned, so a fresh session may pull releases newer
# than the ones this notebook was written against (the saved output below reports
# Ultralytics YOLOv8.0.231) — consider pinning the versions once they are confirmed.
%pip install -q ultralytics
%pip install -q supervision

from tqdm.notebook import tqdm
import supervision as sv
import numpy as np
import ultralytics
from ultralytics import YOLO
from google.colab import drive

ultralytics.checks()

Ultralytics YOLOv8.0.231 🚀 Python-3.10.12 torch-2.1.0+cu121 CUDA:0 (Tesla T4, 15102MiB)
Setup complete ✅ (2 CPUs, 12.7 GB RAM, 26.3/78.2 GB disk)


In [None]:
# Mount Google Drive so that files can be loaded from it
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [None]:
# Paths to the source video and to the resulting (annotated) video
SOURCE_PATH = '/content/drive/MyDrive/Стажировка/video_test/rakurs1-snyali_kaski.mp4'
RESULT_PATH = '/content/drive/MyDrive/Стажировка/video_test/RESULT/result1-snyali_kaski.mp4'

# Detection model loaded with our own weights
model_det = YOLO("/content/drive/MyDrive/Стажировка/weights/best-3_classes-m-a100.pt")
# Stock segmentation model
model_seg = YOLO('yolov8m-seg.pt')

# Video metadata; resolution, FPS and duration (total frames // FPS) are printed as a sanity check
video_info = sv.VideoInfo.from_video_path(SOURCE_PATH)
print(f'\nИсходное видео; Res: {video_info.resolution_wh}, FPS: {video_info.fps}, Dur.: {video_info.total_frames//video_info.fps} sec.')

Downloading https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8m-seg.pt to 'yolov8m-seg.pt'...


100%|██████████| 52.4M/52.4M [00:00<00:00, 234MB/s]



Исходное видео; Res: (2048, 1536), FPS: 100, Dur.: 11 sec.


In [None]:
# Annotator color
color = sv.Color.from_hex("ff0000")
# Palette for the 3 classes (currently disabled)
#color_palette = sv.ColorPalette.from_hex(["#0f0", "#00f", "#f00"])

# Annotator text color
text_color = sv.Color.from_hex("#0ff")

# Create the tracker
byte_tracker = sv.ByteTrack()

# Create the annotators that are actually used: halo and text labels
halo_annotator = sv.HaloAnnotator(color=color)
label_annotator = sv.LabelAnnotator(text_position=sv.Position.TOP_CENTER, color=color, text_color=text_color, text_padding=2, text_thickness=1)

# Alternative annotators, kept disabled
#corner_annotator = sv.BoxCornerAnnotator(color=color_palette, thickness=1)
#ellipse_annotator = sv.EllipseAnnotator(color=color, thickness=2)
#box_annotator = sv.BoxAnnotator(thickness=4, text_thickness=2, text_scale=1)
#triangle_annotator = sv.TriangleAnnotator(color = color_palette)

### Способ 1

In [None]:
# Annotators used by process_frame. They are created in this cell because the
# only other definitions in the notebook are commented out, which made the
# callback fail with NameError on its very first frame.
triangle_annotator = sv.TriangleAnnotator(color=color)
ellipse_annotator = sv.EllipseAnnotator(color=color, thickness=2)


def process_frame(frame: np.ndarray, _) -> np.ndarray:
    """Frame callback: detect, track, filter by class and annotate one frame.

    Args:
        frame: Image of the current video frame.
        _: Second positional argument supplied by the caller of the callback
            (unused here).

    Returns:
        The frame with triangle and ellipse annotations drawn for the
        detections that passed the class filter.
    """
    # Model prediction for this frame
    results = model_det(frame, imgsz=video_info.width, conf=0.4, verbose=False)[0]

    # Convert the prediction into supervision detections
    detections = sv.Detections.from_ultralytics(results)

    # Update the tracker with every detection before filtering
    detections = byte_tracker.update_with_detections(detections)

    # Keep class 2 ("no helmet").
    # NOTE(review): id 5 is kept from the original filter; the weights file name
    # suggests a 3-class model, in which case it never matches — confirm.
    detections = detections[np.isin(detections.class_id, [2, 5])]

    # Annotate the frame
    frame = triangle_annotator.annotate(scene=frame, detections=detections)
    frame = ellipse_annotator.annotate(scene=frame, detections=detections)

    return frame

In [None]:
# Process the whole video with the per-frame callback.
# The source path constant defined in the configuration cell is SOURCE_PATH;
# VIDEO_PATH was never defined and raised NameError.
sv.process_video(source_path=SOURCE_PATH, target_path=RESULT_PATH, callback=process_frame)

### Способ 2: с полигоном и трекером (последний вариант)

In [None]:
def is_iopa(ground_truth, pred, conf=.95):
    """Check whether `pred` is (almost) fully contained in `ground_truth`.

    The measure is IoPA — intersection over predicted area: the area of the
    overlap of the two boxes divided by the area of `pred`. Widths and heights
    are computed as ``max - min + 1``.

    Args:
        ground_truth: Box indexable as ``[x1, y1, x2, y2]``.
        pred: Box indexable as ``[x1, y1, x2, y2]``.
        conf: Minimum IoPA for the check to succeed.

    Returns:
        True when IoPA is greater than or equal to `conf`, otherwise False.
    """
    # Corners of the overlap rectangle
    left = np.maximum(ground_truth[0], pred[0])
    top = np.maximum(ground_truth[1], pred[1])
    right = np.minimum(ground_truth[2], pred[2])
    bottom = np.minimum(ground_truth[3], pred[3])

    # Overlap height and width, clamped at zero for boxes that do not intersect
    overlap_height = np.maximum(bottom - top + 1, np.array(0.))
    overlap_width = np.maximum(right - left + 1, np.array(0.))
    overlap_area = overlap_height * overlap_width

    # Area of the predicted box
    pred_area = (pred[3] - pred[1] + 1) * (pred[2] - pred[0] + 1)

    return bool(overlap_area / pred_area >= conf)

In [None]:
import supervision as sv

def mrg_detection(detections_seg, detections_det, conf=0.95) -> "sv.Detections":
  """Select the segmentation detections that contain at least one detector box.

  A segmentation detection is kept when `is_iopa` reports that some box of
  `detections_det` lies inside its box with IoPA >= `conf`.

  Compared with the previous element-by-element copy this version:
    * keeps every matching detection once, even when several detector boxes
      fall inside the same segmentation box;
    * returns a well-formed empty result (``xyxy`` of shape ``(0, 4)``) when
      nothing matches;
    * carries over every field of `detections_seg` through indexing, so a
      field that is None stays None.

  Args:
    detections_seg: Detections of the segmentation model; its ``xyxy`` boxes
      are the containers and its fields are what gets returned.
    detections_det: Detections of the detection model; only ``xyxy`` is read.
    conf: IoPA threshold forwarded to `is_iopa`.

  Returns:
    The subset of `detections_seg` selected with a boolean index.
  """
  det_boxes = detections_det.xyxy

  # One flag per segmentation box; any() stops at the first detector box found inside it
  keep = np.array(
      [
          any(is_iopa(seg_box, det_box, conf) for det_box in det_boxes)
          for seg_box in detections_seg.xyxy
      ],
      dtype=bool,
  )

  return detections_seg[keep]

In [None]:
# Initialise the polygon: a rectangle inset 10 px from every edge of the frame
polygon = np.array([
    [10, 10],
    [10, video_info.height-10],
    [video_info.width-10, video_info.height-10],
    [video_info.width-10, 10]
])

# Create the zone
zone = sv.PolygonZone(polygon=polygon, frame_resolution_wh=video_info.resolution_wh)
# Create the zone annotator
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.white(), thickness=5, text_thickness=2, text_scale=1)

# Start from a fresh tracker so that re-running this cell, or having run another
# cell that uses the tracker first, does not carry track state into this pass.
byte_tracker = sv.ByteTrack()

# Create the frame generator
frames_generator = sv.get_video_frames_generator(SOURCE_PATH)

with sv.VideoSink(target_path=RESULT_PATH, video_info=video_info) as sink:
  # tqdm wraps the loop to show video processing progress
  for frame in tqdm(frames_generator, total=video_info.total_frames):

    # Segment the frame and keep class 0 only
    result_seg = model_seg(frame, imgsz=video_info.width, verbose=False, conf=.5)[0]
    detections_seg = sv.Detections.from_ultralytics(result_seg)
    detections_seg = detections_seg[np.isin(detections_seg.class_id, 0)]

    # Detect
    result_det = model_det(frame, imgsz=video_info.width, verbose=False, conf=.5)[0]
    detections_det = sv.Detections.from_ultralytics(result_det)
    # Filter by class.
    # NOTE(review): Method 1 treats class 2 as "no helmet" while this filter keeps
    # class 1 — confirm which id is intended.
    detections_det = detections_det[np.isin(detections_det.class_id, 1)]
    # Update the tracker and the zone trigger with the filtered detections
    detections_det = byte_tracker.update_with_detections(detections_det)
    zone.trigger(detections=detections_det)

    # Merge the two models: a head box must lie inside a person box with IoPA >= 0.89
    detections_sd = mrg_detection(detections_seg, detections_det, conf=0.89)

    # Build the labels from the fields directly: the number of values produced by
    # iterating over sv.Detections differs between supervision releases, so
    # unpacking a fixed number of names breaks on an unpinned install.
    labels = [
        f"id#{tracker_id} {confidence:.0%}"
        for confidence, tracker_id in zip(detections_det.confidence, detections_det.tracker_id)
    ]

    # Annotate one private copy of the frame; each annotator's return value is
    # passed on, so no further full-frame copies are needed.
    annotated_frame = label_annotator.annotate(scene=frame.copy(), detections=detections_det, labels=labels)
    if len(detections_sd)>0:
      annotated_frame = halo_annotator.annotate(scene=annotated_frame, detections=detections_sd)
    annotated_frame = zone_annotator.annotate(scene=annotated_frame)

    sink.write_frame(frame=annotated_frame)

  0%|          | 0/1127 [00:00<?, ?it/s]

### Тест: попытка объединить предсказания det- и seg-моделей, чтобы получить маску

In [None]:
import supervision as sv
'''
    Обработка единичного фрейма
'''
# Инициализируем полигон
polygon = np.array([
    [10, 10],
    [10, video_info.height-10],
    [video_info.width-10, video_info.height-10],
    [video_info.width-10, 10]
])

zone = sv.PolygonZone(polygon=polygon, frame_resolution_wh=video_info.resolution_wh)

# Создаем генератор фреймов
frames_generator = sv.get_video_frames_generator(SOURCE_PATH)

# Создаем итератор по генератору
iterator = iter(frames_generator)

# ...
for i in range(720):
  frame = next(iterator)

# detect seg
results_seg = model_seg(frame, imgsz=video_info.width, verbose=False)[0]
detections_seg = sv.Detections.from_ultralytics(results_seg)
detections_seg = detections_seg[np.isin(detections_seg.class_id, 0)]
#print('Люди:', detections_seg.xyxy, len(detections_seg.xyxy))

# detect det
results_det = model_det(frame, imgsz=video_info.width, verbose=False)[0]
detections_det = sv.Detections.from_ultralytics(results_det)
detections_det = detections_det[np.isin(detections_det.class_id, 1)]

detections_det = byte_tracker.update_with_detections(detections_det)
zone.trigger(detections=detections_det)
#print('Нет каски:', detections_det.xyxy, len(detections_det.xyxy))

detections_sd = mrg_detection(detections_seg, detections_det, conf=0.89)

#print(sv.box_iou_batch(detections_det.xyxy, detections_seg.xyxy,), len(sv.box_iou_batch(detections_det.xyxy, detections_seg.xyxy)))

# annotate
#halo_annotator = sv.HaloAnnotator(color=color)
#polygon_annotator = sv.PolygonAnnotator()
#triangle_annotator = sv.TriangleAnnotator(color=color)
zone_annotator = sv.PolygonZoneAnnotator(zone=zone, color=sv.Color.white(), thickness=5, text_thickness=2, text_scale=1)
#box_annotator = sv.BoxAnnotator(thickness=4, text_thickness=4, text_scale=2)
#labels = [f"{model_seg.names[class_id]} {confidence:0.2f}" for _, _, confidence, class_id, _ in detections]
#frame = box_annotator.annotate(scene=frame, detections=detections, labels=labels)
#frame = polygon_annotator.annotate(scene=frame.copy(), detections=detections_seg)
labels = [f"id#{tracker_id} {confidence:.0%}" for _, _, confidence, _, tracker_id in detections_det]
frame = label_annotator.annotate(scene=frame.copy(), detections=detections_det, labels=labels)
if len(detections_sd)>0:
  frame = halo_annotator.annotate(scene=frame.copy(), detections=detections_sd)
#frame = triangle_annotator.annotate(scene=frame.copy(), detections=detections_det)
frame = zone_annotator.annotate(scene=frame.copy())

%matplotlib inline
sv.plot_image(frame, (16, 16))

#### Проверки типов для функций

In [None]:
detections_sd

Detections(xyxy=array([], dtype=float32), mask=array([], dtype=float64), confidence=array([], dtype=float32), class_id=array([], dtype=int64), tracker_id=None)

In [None]:
# Print the is_iopa verdict (default threshold) for every person-box / detector-box pair
for i, seg_box in enumerate(detections_seg.xyxy):
  for k, det_box in enumerate(detections_det.xyxy):
    print(i, k, is_iopa(seg_box, det_box))

In [None]:
detections_det

Detections(xyxy=array([], shape=(0, 4), dtype=float32), mask=None, confidence=array([], dtype=float32), class_id=array([], dtype=int64), tracker_id=array([], dtype=int64))

In [None]:
detections_seg

Detections(xyxy=array([[     313.78,      95.675,      433.47,      385.16],
       [      164.3,      107.08,      261.68,      387.23]], dtype=float32), mask=array([[[False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        ...,
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False]],

       [[False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        ...,
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False]]]), confidence=array([    0.92998,     0.91328], dtype=float32), class_id=array([0, 0]), tracker_id=None)

In [None]:
def filter_seg(ground_truth, pred, conf=.95):
  """Return the unique `ground_truth` boxes that contain at least one `pred` box.

  Containment is decided by `is_iopa` with threshold `conf`.

  Args:
    ground_truth: Sequence of boxes; each box must support ``.tolist()``.
    pred: Sequence of boxes tested against every ground-truth box.
    conf: IoPA threshold forwarded to `is_iopa`.

  Returns:
    2-D float array of the matching boxes, de-duplicated and row-sorted by
    ``np.unique``. When nothing matches the result is an empty array with
    zero rows and the box width as its second dimension, instead of the 1-D
    empty array ``np.unique`` produces for an empty list.
  """
  # any() stops at the first matching pred box; duplicates were removed by
  # np.unique anyway, so the result is unchanged.
  matched = [
      ground_truth[s].tolist()
      for s in range(len(ground_truth))
      if any(is_iopa(ground_truth[s], pred[d], conf) for d in range(len(pred)))
  ]

  if not matched:
    gt = np.asarray(ground_truth)
    # Fall back to 4 coordinates when the input does not reveal the box width
    box_width = gt.shape[1] if gt.ndim == 2 else 4
    return np.empty((0, box_width))

  return np.unique(matched, axis=0)

In [None]:
print('Нет каски:', detections_det.xyxy, len(detections_det.xyxy))

Нет каски: [[     118.72      526.88      178.24      595.52]
 [     126.88      527.49      170.55       557.9]
 [     49.505      662.78      78.431      711.32]
 [     253.01       142.8      266.39      153.18]] 4


In [None]:
print('Люди:', detections_seg.xyxy, len(detections_seg.xyxy))

Люди: [[     120.16      527.83      293.45      796.61]
 [    0.13452      553.89      192.48      850.74]
 [     242.68      143.95      293.42      231.81]] 3


In [None]:
detections_seg

Detections(xyxy=array([[     120.16,      527.83,      293.45,      796.61],
       [    0.13451,      553.89,      192.48,      850.74],
       [     242.68,      143.95,      293.42,      231.81]], dtype=float32), mask=array([[[False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        ...,
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False]],

       [[False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        ...,
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False]],

       [[False, False, False, ..., False, False, False],
        

In [None]:
detections_det

Detections(xyxy=array([[     118.72,      526.88,      178.24,      595.52],
       [     126.88,      527.49,      170.55,       557.9],
       [     49.505,      662.78,      78.431,      711.32],
       [     253.01,       142.8,      266.39,      153.18]], dtype=float32), mask=None, confidence=array([    0.55087,     0.51214,     0.44107,     0.37997], dtype=float32), class_id=array([2, 2, 2, 2]), tracker_id=array([1, 2, 3, 4]))

In [None]:
mrg_detection(detections_seg, detections_det, conf=.89)

Detections(xyxy=array([[     120.16,      527.83,      293.45,      796.61],
       [     120.16,      527.83,      293.45,      796.61],
       [    0.13451,      553.89,      192.48,      850.74],
       [     242.68,      143.95,      293.42,      231.81]], dtype=float32), mask=array([[[False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        ...,
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False]],

       [[False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        ...,
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False],
        [False, False, False, ..., False, False, False]],

    

In [None]:

# Scratch version of mrg_detection, unrolled at cell level so that the
# intermediate lists can be inspected in the following cells.
# Fixes relative to the saved cell: consistent indentation (the cell did not
# parse), `ret_detections` is actually created, the `confidenc` typo is gone,
# and the presence checks look at `detections_seg`, the object the values are
# copied from.
xyxy_lst=[]
mask_lst=[]
confidence_lst=[]
class_id_lst=[]
tracker_id_lst=[]
ret_lst=[]

empty_detections = sv.Detections.empty()
ret_detections = sv.Detections.empty()

for s in range(len(detections_seg.xyxy)):
  for d in range(len(detections_det.xyxy)):
    iopa = is_iopa(detections_seg.xyxy[s],detections_det.xyxy[d], conf=.85)
    if iopa:

      # For every field: collect the value of the matching person detection,
      # or switch the list to None when the field is absent.
      if detections_seg.xyxy is not None:
        xyxy_lst.append(detections_seg.xyxy[s])
      else:
        xyxy_lst = None

      if detections_seg.mask is not None:
        mask_lst.append(detections_seg.mask[s])
      else:
        mask_lst = None

      if detections_seg.confidence is not None:
        confidence_lst.append(detections_seg.confidence[s])
      else:
        confidence_lst = None

      if detections_seg.class_id is not None:
        class_id_lst.append(detections_seg.class_id[s])
      else:
        class_id_lst = None

      if detections_seg.tracker_id is not None:
        tracker_id_lst.append(detections_seg.tracker_id[s])
      else:
        tracker_id_lst = None

# Copy the collected values into the result object
if detections_seg.xyxy is not None:
  ret_detections.xyxy=np.asarray(xyxy_lst, dtype=np.float32)
else:
  ret_detections.xyxy=None

if detections_seg.mask is not None:
  ret_detections.mask=np.asarray(mask_lst)
else:
  ret_detections.mask=None

if detections_seg.confidence is not None:
  ret_detections.confidence=np.asarray(confidence_lst, dtype=np.float32)
else:
  ret_detections.confidence=None

if detections_seg.class_id is not None:
  ret_detections.class_id=np.asarray(class_id_lst, dtype=np.int64)
else:
  ret_detections.class_id=None

if detections_seg.tracker_id is not None:
  ret_detections.tracker_id=np.asarray(tracker_id_lst, dtype=np.int64)
else:
  ret_detections.tracker_id=None

# De-duplicated view of every collected field. This part was wrapped in a string
# literal (i.e. disabled), but the cells below read `ret_lst` and `mask_arr`, so
# it is executed again.
# NOTE(review): each field is de-duplicated independently, so rows of the
# different arrays are not guaranteed to stay aligned — inspection only.
xyxy_arr = np.unique(xyxy_lst,axis=0)
mask_arr = np.unique(mask_lst,axis=0)
confidence_arr = np.unique(confidence_lst)
class_id_arr = np.unique(class_id_lst)
tracker_id_arr = np.unique(tracker_id_lst)

ret_lst.append(xyxy_arr)
ret_lst.append(mask_arr)
ret_lst.append(confidence_arr)
ret_lst.append(class_id_arr)
ret_lst.append(tracker_id_arr)


IndentationError: ignored

In [None]:
ret_lst

[array([[    0.13451,      553.89,      192.48,      850.74],
        [     120.16,      527.83,      293.45,      796.61]], dtype=float32),
 array([[[False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         ...,
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False]],
 
        [[False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         ...,
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False],
         [False, False, False, ..., False, False, False]]]),
 array([    0.89873,     0.91864], dtype=float32),
 array([0]),
 array([None], dtype=object)]

In [None]:
len(mask_arr[0])

1080

In [None]:
# One box written out with full float precision
arr=np.array([120.16272735595703, 527.8292846679688, 293.450927734375, 796.6121215820312])
# NOTE(review): `new` is not defined in any visible cell — presumably an array of
# boxes left over from a deleted cell; on a fresh kernel this line raises NameError.
# Element-wise membership of `new` in `arr`; only the first entry of the result is kept
new1=np.isin(new,arr)[0]
new1

array([False, False, False, False])

In [None]:
np.any(np.isin(new,arr))

True

In [None]:
arr

array([     120.16,      527.83,      293.45,      796.61])

In [None]:
np.any(np.in1d(new,arr))

True