##### 탐지된 객체 종류별 그룹화

In [None]:
import numpy as np
from collections import defaultdict
from mytool import centroid_tracking, tracking_id, corn2xywh

def groupby_detection_to_dic(names, detection):
    """Group one frame's detections by class name.

    Args:
        names: Lookup from integer class index to class name; it is indexed
            as ``names[int(idx)]``.
        detection: 2-D array with one row per detection. The first four
            columns are handed to ``corn2xywh`` as the box and the last
            column is used as the class index.

    Returns:
        A ``defaultdict`` keyed by class name. Every populated entry holds
        ``'boxes'`` and ``'centroids'`` stacked with ``np.stack`` and an
        empty ``'ids'`` list (ids are not assigned in this function).
        Looking up a missing key creates an entry with three empty lists.
        An empty ``detection`` produces an empty mapping.
    """
    grouped = defaultdict(lambda: {'ids': [], 'boxes': [], 'centroids': []})
    if len(detection) == 0:
        return grouped

    class_indices = detection[:, -1]
    corner_boxes = detection[:, :4]
    # assumes corn2xywh returns rows starting with the box center (x, y) —
    # TODO confirm against mytool
    centers = corn2xywh(corner_boxes)[:, :2]

    for class_idx, corner_box, center in zip(class_indices, corner_boxes, centers):
        entry = grouped[names[int(class_idx)]]
        entry['boxes'].append(corner_box)
        entry['centroids'].append(center)

    # Turn the per-class Python lists into arrays; an empty list stays a list.
    for entry in grouped.values():
        for field in ('boxes', 'centroids'):
            entry[field] = np.stack(entry[field]) if entry[field] else []

    return grouped

def groupby_dic_updater(next_id_dic, groupby_dic_t0, names, detection):
    """Build the current frame's per-class groups and carry track ids over.

    Args:
        next_id_dic: Mapping from class name to the id counter handed to
            ``tracking_id``; it is updated in place with the counter that
            ``tracking_id`` returns.
        groupby_dic_t0: Grouping of the previous frame (same layout as the
            return value). It is indexed for every class seen in either
            frame, so it must tolerate missing keys (e.g. a ``defaultdict``).
        names: Class-index-to-name lookup forwarded to
            ``groupby_detection_to_dic``.
        detection: Detections of the current frame.

    Returns:
        The grouping of the current frame with ``'ids'`` filled in for every
        class present in the previous or the current frame.
    """
    # Layout: {'car': {'ids': [], 'boxes': ..., 'centroids': ...}, 'bus': {...}}
    groupby_dic_t1 = groupby_detection_to_dic(names, detection)

    for key in set(groupby_dic_t0.keys()) | set(groupby_dic_t1.keys()):
        previous = groupby_dic_t0[key]
        current = groupby_dic_t1[key]

        # presumably index pairs linking previous to current centroids —
        # verify against mytool.centroid_tracking
        pairs = centroid_tracking(previous['centroids'], current['centroids'])

        # A copy of the previous ids is passed so the old frame's list is
        # never shared with the callee.
        new_ids, next_count = tracking_id(
            previous['ids'][:],
            pairs,
            len(current['boxes']),
            next_id_dic[key],
        )
        next_id_dic[key] = next_count
        current['ids'] = new_ids

    return groupby_dic_t1

##### 영역안 탐지된 객체 카운트

In [None]:
def is_in_area(centroid, count_area):
    """Return True when ``centroid`` lies inside ``count_area``.

    Args:
        centroid: Pair ``(x, y)``.
        count_area: Rectangle ``(x1, y1, x2, y2)``; all four edges count as
            inside.

    Returns:
        ``True`` or ``False``.
    """
    left, top, right, bottom = count_area
    cx, cy = centroid
    # Check the horizontal extent first; the vertical test is only
    # evaluated when the point is horizontally inside.
    if not (left <= cx <= right):
        return False
    return bool(top <= cy <= bottom)

def counter_dic_updater(counter_dic, groupby_dic, count_area):
    """Count each tracked object once, the first time it is seen in the area.

    Args:
        counter_dic: Mapping from class name to ``{'set': <ids already
            counted>, 'cnt': <count>}``; updated in place. An entry is only
            looked up once an object of that class is inside the area, so
            it must either exist already or be created on access (e.g. by a
            ``defaultdict``).
        groupby_dic: Mapping from class name to a dict providing parallel
            ``'ids'`` and ``'centroids'`` sequences.
        count_area: Rectangle ``(x1, y1, x2, y2)`` passed to ``is_in_area``.

    Returns:
        None.
    """
    for key, group in groupby_dic.items():
        for obj_id, center in zip(group['ids'], group['centroids']):
            if not is_in_area(center, count_area):
                continue
            tally = counter_dic[key]
            if obj_id in tally['set']:
                # This id was already counted on an earlier frame.
                continue
            tally['set'].add(obj_id)
            tally['cnt'] += 1

##### 가장자리에 붙어있는 박스 삭제

In [None]:
from mytool import corn2xywh

def delete_edge_boxes(img, detection, margin=20):
    """Drop detections whose box center is within ``margin`` px of the border.

    Args:
        img: Frame whose ``shape`` unpacks as ``(height, width, channels)``.
        detection: Iterable of detection rows. The first four values of
            each row are handed to ``corn2xywh``; the first two values of
            its result are used as the box center.
        margin: Width in pixels of the band along every image edge in which
            centers are rejected. Defaults to 20, the value that used to be
            hard-coded.

    Returns:
        The surviving rows stacked with ``np.stack``, or ``np.array([])``
        (an empty 1-D array) when no row survives.
    """
    height, width, _ = img.shape
    kept = []

    for det in detection:
        # corn2xywh is called on a one-row batch, hence the added leading
        # axis and the [0] to take the single result row back out.
        cx, cy, _, _ = corn2xywh(det[None, :4])[0]
        if cx < margin or cy < margin:
            continue
        if cx > width - margin or cy > height - margin:
            continue
        kept.append(det)

    return np.stack(kept) if kept else np.array([])



##### 카운트 영역과 카운트 수 그리기

In [None]:
import cv2

def draw_area(img, loc):
    """Draw the full-width counting band and return it with the image.

    Args:
        img: Frame whose ``shape`` unpacks as ``(height, width, channels)``.
        loc: Vertical position of the band's top edge as a fraction of the
            frame height (the caller in this file passes ``1/2``).

    Returns:
        ``(img, box)`` where ``img`` is the value returned by
        ``cv2.rectangle`` and ``box`` is the band as
        ``[x1, y1, x2, y2]``: full width, 30 px tall.
    """
    frame_h, frame_w, _ = img.shape
    top = int(frame_h * loc)
    box = [0, top, frame_w - 1, top + 30]  # xyxy
    top_left, bottom_right = box[:2], box[2:]
    # Colour (0, 0, 255) with a 3 px outline; red if the frame is BGR.
    img = cv2.rectangle(img, top_left, bottom_right, (0, 0, 255), 3)
    return img, box

def draw_count(img, counter_dic):
    """Write one ``<class> : <count>`` line per class onto ``img`` in place.

    Lines start at the left edge, the first with its baseline at y=30 and
    each following one 30 px lower, in the iteration order of
    ``counter_dic``.

    Args:
        img: Image to draw on. Its shape is not inspected here, so frames
            without a channel axis are accepted as well.
        counter_dic: Mapping from class name to a dict with a ``'cnt'``
            entry.

    Returns:
        None.
    """
    baseline_y = 30
    for key in counter_dic:
        cnt = counter_dic[key]['cnt']
        # Colour (255, 0, 0), font scale 1, thickness 2; blue if the frame
        # is BGR.
        cv2.putText(img, f"{key} : {cnt}", (0, baseline_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
        baseline_y += 30


In [None]:
import cv2
import numpy as np
from yolov5.detect import run
from IPython.display import clear_output
from collections import defaultdict
import time
from mytool import show_annotation

SOURCE_PATH = "./temp/traffic_30_crop.mp4"
# SOURCE_PATH = "http://192.168.0.57:8090/video"
WEIGHTS_PATH = "./yolov5m.pt"

# Generator of per-frame detections. As used below, its first item is the
# class-name lookup and every following item is an (image, detection) pair.
# NOTE(review): classes 2/3/5/7 are presumably the COCO vehicle classes
# (car, motorcycle, bus, truck) — confirm against `names`.
gen = run(source=SOURCE_PATH, weights=WEIGHTS_PATH, view_img=False, nosave=True,
          classes=[2, 3, 5, 7], conf_thres=0.35, iou_thres=0.3, agnostic_nms=True)
names = next(gen)

# State carried from frame to frame.
groupby_dic = defaultdict(lambda: {'ids': [], 'boxes': [], 'centroids': []})  # previous frame's groups
next_id_dic = defaultdict(int)  # per-class id counter handed to tracking_id
counter_dic = defaultdict(lambda: {'set': set(), 'cnt': 0})  # ids already counted and running count

# The source is opened once only to read the frame size for the writer.
vcap = cv2.VideoCapture(SOURCE_PATH)
width = int(vcap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
vcap.release()

# Writer for the annotated video.
# NOTE(review): the output frame rate is fixed at 20.0 rather than taken
# from the source — confirm this is intended.
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
out = cv2.VideoWriter('output.mp4', fourcc, 20.0, (width, height))

try:
    for img, detection in gen:
        # Downstream code treats the first four columns of each row as the
        # box (passed to corn2xywh) and the last column as the class index.
        detection = detection.numpy()
        detection = delete_edge_boxes(img, detection)
        groupby_dic = groupby_dic_updater(next_id_dic, groupby_dic, names, detection)
        img = show_annotation(img, groupby_dic)
        img, count_area = draw_area(img, 1/2)
        counter_dic_updater(counter_dic, groupby_dic, count_area)
        draw_count(img, counter_dic)
        out.write(img)
        cv2.imshow('test', img)
        cv2.waitKey(1)  # lets the HighGUI window process events and repaint
finally:
    # Always finalize the output file and close the preview window, even
    # when the loop raises or the run is interrupted.
    out.release()
    cv2.destroyAllWindows()

In [None]:
# Standalone cleanup cell: closes every OpenCV HighGUI window that is open.
cv2.destroyAllWindows()

In [None]:
# Show whatever image is currently bound to `img` (the processing loop
# rebinds it on every frame) in a window titled 'test'.
cv2.imshow('test', img)
# A delay of 0 makes waitKey block until a key is pressed.
cv2.waitKey(0)
cv2.destroyAllWindows()