# 车辆检测Demo

本文直接调用 [yolov5](https://github.com/ultralytics/yolov5) 预训练好的模型。该算法使用起来很简单，以下代码仅作测试；后续可以另写一份代码进行训练并保存模型，再由本代码加载相应的模型。

In [1]:
# import modules
import os
import sys
import time

import cv2
import numpy as np
import torch

# from yolov5.utils.general import non_max_suppression
from deep_sort_pytorch.utils.parser import get_config
from deep_sort_pytorch.deep_sort import DeepSort

In [2]:
def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False,
                        labels=()):
    """Runs Non-Maximum Suppression (NMS) on inference results

    Args:
        prediction: 3-D tensor indexed as [image, candidate, 5 + nc]. Columns 0-3
            are the box (converted below with xywh2xyxy), column 4 is compared
            against conf_thres as the objectness score, and columns 5+ are the
            per-class scores.
        conf_thres: threshold applied first to column 4 and then to the combined
            objectness * class score.
        iou_thres: IoU threshold handed to torchvision.ops.nms.
        classes: optional collection of class indices to keep; None keeps all.
        agnostic: when True, boxes of different classes may suppress each other.
        multi_label: when True (and nc > 1), a box may yield one detection per
            class whose score exceeds conf_thres instead of only its best class.
        labels: optional per-image a-priori labels appended to the candidates;
            rows are read as [cls, box(4)] — presumably in the same box format
            as `prediction`, TODO confirm.

    Returns:
         list of detections, on (n,6) tensor per image [xyxy, conf, cls]

    NOTE(review): `xywh2xyxy`, `torchvision` and `box_iou` are used below but are
    neither defined nor imported in the visible part of this file (the yolov5
    import near the top is commented out). Calling this function as-is would
    raise NameError unless they are provided elsewhere.
    """

    nc = prediction.shape[2] - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    # Settings
    # NOTE: min_wh is only referenced by the commented-out width-height constraint below.
    min_wh, max_wh = 2, 4096  # (pixels) minimum and maximum box width and height
    max_det = 300  # maximum number of detections per image
    max_nms = 30000  # maximum number of boxes into torchvision.ops.nms()
    time_limit = 10.0  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    t = time.time()
    # NOTE: list multiplication makes every entry reference the SAME empty tensor;
    # images that produce detections get their own tensor assigned further down.
    output = [torch.zeros((0, 6), device=prediction.device)] * prediction.shape[0]
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        # x[((x[..., 2:4] < min_wh) | (x[..., 2:4] > max_wh)).any(1), 4] = 0  # width-height
        x = x[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            l = labels[xi]
            v = torch.zeros((len(l), nc + 5), device=x.device)
            v[:, :4] = l[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(l)), l[:, 0].long() + 5] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box (center x, center y, width, height) to (x1, y1, x2, y2)
        box = xywh2xyxy(x[:, :4])

        # Detections matrix nx6 (xyxy, conf, cls)
        if multi_label:
            # one row per (box, class) pair whose combined score passes the threshold
            i, j = (x[:, 5:] > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, j + 5, None], j[:, None].float()), 1)
        else:  # best class only
            conf, j = x[:, 5:].max(1, keepdim=True)
            x = torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]

        # Apply finite constraint
        # if not torch.isfinite(x).all():
        #     x = x[torch.isfinite(x).all(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        elif n > max_nms:  # excess boxes
            x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence

        # Batched NMS
        # Shifting each box by class_index * max_wh keeps boxes of different classes
        # apart, so a single nms() call only suppresses within a class; with
        # agnostic=True the shift is zero and all classes compete.
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        if i.shape[0] > max_det:  # limit detections
            i = i[:max_det]
        # Disabled by default because merge is hard-coded to False above.
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        # The time budget is checked only after an image is finished; remaining
        # images keep the empty placeholder tensor.
        if (time.time() - t) > time_limit:
            print(f'WARNING: NMS time limit {time_limit}s exceeded')
            break  # time limit exceeded

    return output

## 初始化模型

利用 `PyTorch Hub` 加载 **YOLOv5** 模型。为了方便运行，可将 `yolov5s.pt` 拷贝到与本代码同级的目录下直接调用，省去从网络下载权重的时间。我这里的环境如下：

In [3]:
# Show the installed PyTorch version string (the cell output below is from the author's run).
torch.__version__

'1.8.0+cu111'

In [4]:
# Check the CUDA toolkit version on Linux (notebook shell command, left disabled)
# !nvcc -V

In [5]:
# Load the 'yolov5s' entry point of the 'ultralytics/yolov5' repository via PyTorch Hub.
# `model` is a notebook-wide global used by every inference cell below.
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')

Using cache found in /home/madao/.cache/torch/hub/ultralytics_yolov5_master

                 from  n    params  module                                  arguments                     
  0                -1  1      3520  models.common.Focus                     [3, 32, 3]                    
  1                -1  1     18560  models.common.Conv                      [32, 64, 3, 2]                
  2                -1  1     18816  models.common.C3                        [64, 64, 1]                   
  3                -1  1     73984  models.common.Conv                      [64, 128, 3, 2]               
  4                -1  1    156928  models.common.C3                        [128, 128, 3]                 
  5                -1  1    295424  models.common.Conv                      [128, 256, 3, 2]              
  6                -1  1    625152  models.common.C3                        [256, 256, 3]                 
  7                -1  1   1180672  models.common.Conv             

## yolov5预测

In [6]:
# Directory that holds the test images.
img_path = "../datasets/objectDetection/testing_images/"
# os.listdir returns entries in arbitrary, platform-dependent order; sorting makes
# index-based selections such as imgs[1:20] reproducible between runs and machines.
imgfiles = sorted(os.listdir(img_path))
len(imgfiles)

175

In [7]:
# Load every listed file as an RGB array.
# cv2.imread signals an undecodable entry (sub-directory, non-image file, ...) by
# returning None instead of raising, so slicing its result directly would fail
# with a TypeError. Such entries are reported and skipped.
imgs = []
for imgfile in imgfiles:
    bgr = cv2.imread(os.path.join(img_path, imgfile))
    if bgr is None:
        print(f'skip unreadable image: {imgfile}')
        continue
    imgs.append(bgr[:, :, ::-1])  # reverse the channel axis: BGR -> RGB
len(imgs)

175

In [8]:
# Smoke test: run inference on one randomly chosen image from `imgs`
chose = np.random.randint(len(imgs))
result = model(imgs[chose], size=640)
# print(result.xyxy)
# result.show()

In [9]:
# Batched inference on the images at indices 1..19 of `imgs`
results = model(imgs[1:20], size=640)

In [10]:
# Keep the `names` attribute of the batch results (presumably the class-index -> name mapping; confirm)
names = results.names
# results.xyxy

In [11]:
# Single-image test (disabled): draw the first detected box of the first batch image
# boxes = results.xyxy[0].cpu().numpy()
# img_ = imgs[1].copy()
# cv2.rectangle(img_, (boxes[0, 0], boxes[0, 1]), (boxes[0, 2], boxes[0, 3]), (0, 255, 0), 2)
# cv2.imshow('', img_)
# cv2.waitKey(0)
# cv2.destroyAllWindows()

## 检测视频测试

In [12]:
video_path = '../datasets/video/cars2.mp4' # path to the input video
# Open the video for frame-by-frame reading; `video` is re-bound to another file in the tracking cell further down.
video = cv2.VideoCapture(video_path)

In [13]:
def addBox(img, boxes, cls_id=2):
    """Draw a green rectangle on `img` for every detection of class `cls_id`.

    Args:
        img: image array that cv2.rectangle draws on in place.
        boxes: iterable of rows laid out as [x1, y1, x2, y2, conf, cls]; only
            indices 0-3 and 5 are read.
        cls_id: class index to draw. Defaults to 2, the index the rest of this
            notebook treats as "vehicle".

    Returns:
        None; the image is modified in place.
    """
    for box in boxes:
        if int(box[5]) != cls_id:
            continue
        # Detections arrive as floats; OpenCV drawing functions expect integer
        # pixel coordinates, so cast before drawing.
        x1, y1, x2, y2 = (int(v) for v in box[:4])
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)

In [14]:
# Vehicle detection test on the video (disabled)
# while True:
#     start_time = time.time()
#     rc, image = video.read()
#     if image is None:
#         cv2.destroyAllWindows()
#         break
#     resultImage = image
#     boxes = model(resultImage).xyxy[0].cpu().numpy()
#     addBox(resultImage, boxes)
#     cv2.imshow('result', resultImage)
    
#     # press q to quit
#     if cv2.waitKey(1) & 0xFF == ord('q'):
#         cv2.destroyAllWindows()
#         break

In [15]:
def bbox_rel(xyxy):
    """Convert a corner-format box into centre/size format.

    `xyxy` is indexed at positions 0-3 as two opposite corners (x, y, x, y);
    the corners may be given in either order. Returns the tuple
    (x_center, y_center, width, height).
    """
    xa, ya, xb, yb = xyxy[0], xyxy[1], xyxy[2], xyxy[3]

    # Size is order-independent thanks to abs().
    w = abs(xa - xb)
    h = abs(ya - yb)

    # Centre = smallest coordinate plus half the extent.
    x_c = min([xa, xb]) + w / 2
    y_c = min([ya, yb]) + h / 2
    return x_c, y_c, w, h

In [16]:
# Three fixed multipliers, one per colour channel.
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)
def compute_color_for_labels(label):
    """Map a label to a fixed 3-channel colour.

    Every palette entry is multiplied by (label² - label + 1) and reduced
    modulo 255, so the same label always yields the same tuple of ints.
    """
    weight = label ** 2 - label + 1
    return tuple(int((channel * weight) % 255) for channel in palette)

In [17]:
def checkTraffic(speed=None, vthresh = 10):
    """Decide from a speed dictionary whether traffic is congested.

    Args:
        speed: mapping of vehicle id -> speed value, or None.
        vthresh: a vehicle counts as "slow" when its speed is below this value.

    Returns:
        True when strictly more than half of the vehicles in `speed` are slow
        (congested), False otherwise. None or an empty mapping gives False,
        since there is no evidence of congestion.

    Note:
        The previous implementation returned the opposite value of what its
        documentation promised (a slow majority produced False, and an empty
        mapping produced True); the result now matches the documented contract.
    """
    if not speed:
        return False
    lowcars = sum(1 for v in speed.values() if v < vthresh)
    return lowcars > len(speed) * 0.5

In [18]:
def draw_boxes(img, bbox, identities=None, speed=None, offset=(0, 0), line=(0.7, 0.8), framecounter=25, status=False):
    """Annotate `img` in place with boxes, the measuring band and status text.

    Args:
        img: image array with shape (height, width, channels); drawn on in place.
        bbox: iterable of boxes, each unpacked as (x1, y1, x2, y2).
        identities: optional sequence parallel to `bbox` giving each box's id;
            when None every box is labelled 0.
        speed: optional mapping id -> speed; when a box's id is present its
            label becomes "<id>-v-<speed>".
        offset: (dx, dy) added to every box coordinate before drawing.
        line: (upper, lower) heights of the two horizontal guide lines, as
            fractions of the image height.
        framecounter: value shown after "f:" in the top-left corner.
        status: value shown after "traffic:" in the top-left corner.

    Returns:
        The same `img` object.
    """
    height, width, _cha = img.shape

    for idx, box in enumerate(bbox):
        x1, y1, x2, y2 = [int(v) for v in box]
        x1 += offset[0]
        x2 += offset[0]
        y1 += offset[1]
        y2 += offset[1]
        center_x = int((x1 + x2) / 2)
        center_y = int((y1 + y2) / 2)

        # Box, plus a filled tag holding the id (and the speed when known).
        track_id = int(identities[idx]) if identities is not None else 0
        color = compute_color_for_labels(track_id)
        label = '{}{:d}'.format("", track_id)
        if speed is not None and track_id in speed:
            label += "-v-" + str(speed[track_id])
        t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 1, 1)[0]
        cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
        cv2.rectangle(
            img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
        cv2.putText(img, label, (x1, y1 + t_size[1] + 4),
                    cv2.FONT_HERSHEY_PLAIN, 1, [255, 255, 255], 1)

        # Mark boxes whose centre lies strictly between the two guide lines.
        if line[0] * height < center_y < line[1] * height:
            cv2.drawMarker(img, (center_x, center_y), [0, 255, 255], markerType=3)

    # The guide lines do not depend on any box, so draw them once per frame
    # rather than once per box; they also stay visible when `bbox` is empty.
    for fraction in line[:2]:
        y = int(fraction * height)
        cv2.line(img, (0, y), (width, y), [255, 0, 0], 1)

    cv2.putText(img, "f:" + str(framecounter), (20, 20), cv2.FONT_HERSHEY_PLAIN, 1, [0, 255, 0], 2)
    cv2.putText(img, 'traffic:' + str(status), (20, 40), cv2.FONT_HERSHEY_PLAIN, 1, [0, 255, 0], 2)
    return img

In [19]:
def getdict(outputs):
    """Build a mapping from track id to the centre point of its box.

    Args:
        outputs: 2-D array-like whose rows are [x1, y1, x2, y2, ..., id] — the
            corner layout the rest of this notebook unpacks tracker rows with.
            The id is taken from the last column.

    Returns:
        dict mapping id -> np.array([center_x, center_y]).

    Note:
        The previous version summed columns (0, 1) and (2, 3), i.e. x1 + y1 and
        x2 + y2, which is not a centre for corner-format rows. The centre is now
        the midpoint of the two x values and of the two y values.
    """
    centerdict = dict()
    for row in outputs:
        center_x = (row[0] + row[2]) / 2
        center_y = (row[1] + row[3]) / 2
        centerdict[row[-1]] = np.array([center_x, center_y])
    return centerdict

In [20]:
def getspeed(preoutputs, outputs, framecounter=10, fps=30, offset=1, height=1080, width=1920):
    """Estimate a per-id speed value from two sets of tracker rows.

    Both row sets are turned into id -> centre mappings with getdict(). For an
    id present in both, the Euclidean distance between the two centres is
    divided by `framecounter`, multiplied by `fps` and `offset`, divided by the
    square of the current centre's second component, multiplied by `height`,
    and rounded to one decimal. Ids that only appear in `outputs` get 0.

    `width` is accepted but not used.

    Returns:
        dict mapping id -> rounded speed value, in the iteration order of the
        current centres.
    """
    previous_centers = getdict(preoutputs)
    current_centers = getdict(outputs)

    speed = {}
    for track_id, center in current_centers.items():
        if track_id not in previous_centers:
            # Newly seen id: no displacement can be measured yet.
            speed[track_id] = np.around(0, decimals=1)
            continue
        squared_diff = (previous_centers[track_id] - center) ** 2
        distance = np.sqrt(squared_diff.sum())
        speed[track_id] = np.around(
            distance / framecounter * fps * offset / center[1] ** 2 * height,
            decimals=1,
        )
    return speed

In [21]:
def updateSpeed(prespeed, speed):
    """Blend the newest speed values with the previous ones.

    For every id in `speed`: when the id also exists in `prespeed`, the result
    is the mean of the old and new value rounded to one decimal; otherwise the
    new value is taken unchanged. Ids found only in `prespeed` are dropped.
    """
    return {
        track_id: (
            np.around((prespeed[track_id] + current) / 2, decimals=1)
            if track_id in prespeed
            else current
        )
        for track_id, current in speed.items()
    }

In [22]:
# ---------------------------------------------------------------------------
# Track vehicles through the video with YOLOv5 + DeepSort and estimate a speed
# value for every track from the number of frames it spends inside a
# horizontal band of the image.
# ---------------------------------------------------------------------------
framecounter = 0   # number of frames read so far (1 for the first frame)
speed = dict()     # track id -> latest speed value
fscount = dict()   # track id -> frame number at which the track was first seen inside the band
ages = dict()      # track id -> frame number at which the track was first seen outside the band again
offset = 10000     # not read in this cell; kept in case other cells use it
high = 0.6         # lower edge of the band, as a fraction of the frame height
low = 0.5          # upper edge of the band, as a fraction of the frame height
coef = 20          # scale factor applied to the pixels-per-frame value
vthresh = 20       # "slow vehicle" threshold handed to checkTraffic
status = False     # latest checkTraffic result, shown on every frame
CAR_CLASS_ID = 2   # detector class index this notebook treats as a vehicle

# Path of the video to read
video_path = '../datasets/video/cars3.mp4'
video = cv2.VideoCapture(video_path)
fps = video.get(cv2.CAP_PROP_FPS)  # same property id as the literal 5

# Initialise DeepSort
cfg = get_config()
cfg.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")
deepsort = DeepSort(cfg.DEEPSORT.REID_CKPT,
                    max_dist=cfg.DEEPSORT.MAX_DIST, min_confidence=cfg.DEEPSORT.MIN_CONFIDENCE,
                    nms_max_overlap=cfg.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg.DEEPSORT.MAX_IOU_DISTANCE,
                    max_age=cfg.DEEPSORT.MAX_AGE, n_init=cfg.DEEPSORT.N_INIT, nn_budget=cfg.DEEPSORT.NN_BUDGET,
                    # Fall back to the CPU instead of failing on machines without CUDA.
                    use_cuda=torch.cuda.is_available())

try:
    while True:
        # Read the next frame
        rc, image = video.read()
        framecounter += 1

        if not rc or image is None:  # end of stream or read failure
            break
        # Press q to quit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        height, width, _channel = image.shape
        resimg = image.copy()  # BGR copy used for tracking features and drawing
        # OpenCV frames are BGR; hand the detector an RGB view, exactly as the
        # still-image cells of this notebook do.
        res = model(resimg[:, :, ::-1])
        pred = res.xyxy

        for det in pred:
            if det is None or not len(det):
                deepsort.increment_ages()
                continue

            # One device->host transfer per frame; rows are [x1, y1, x2, y2, conf, cls].
            bbox_xywh = []
            confs = []
            for *xyxy, conf, label in det.tolist():
                if int(label) == CAR_CLASS_ID:
                    x_c, y_c, bbox_w, bbox_h = bbox_rel(xyxy)
                    bbox_xywh.append([x_c, y_c, bbox_w, bbox_h])
                    confs.append([conf])

            if not bbox_xywh:
                # Detections exist but none is a vehicle: treat the frame like an
                # empty one rather than passing zero-size tensors to the tracker.
                deepsort.increment_ages()
                continue

            xywhs = torch.Tensor(bbox_xywh)
            confss = torch.Tensor(confs)
            outputs = deepsort.update(xywhs, confss, resimg)

            if len(outputs) > 0:
                # Tracker rows are used as [x1, y1, x2, y2, ..., id].
                bbox_xywh = outputs[:, :4]
                indentities = outputs[:, -1]

                band_top = int(low * height)
                band_bottom = int(high * height)
                for box, track_id in zip(bbox_xywh, indentities):
                    x1, y1, x2, y2 = [int(v) for v in box]
                    center_y = int((y1 + y2) / 2)

                    if band_top < center_y < band_bottom:
                        # Remember the first frame in which the track is inside the band.
                        if track_id not in fscount:
                            fscount[track_id] = framecounter
                    elif track_id in fscount:
                        if track_id not in ages:
                            # First frame outside the band after having been inside.
                            ages[track_id] = framecounter
                        elif framecounter - ages[track_id] >= 5:
                            # The track has been outside for at least 5 frames: finalise.
                            # Those frames are subtracted from the elapsed count, the band
                            # height in pixels is divided by it and scaled by `coef`.
                            speed[track_id] = int(
                                (high - low) * height / (framecounter - fscount[track_id] - 5) * coef)
                            status = checkTraffic(speed, vthresh)
                            fscount.pop(track_id)
                            ages.pop(track_id)

                draw_boxes(resimg, bbox_xywh, indentities, speed, line=(low, high),
                           framecounter=framecounter, status=status)

        # time.sleep(3)
        cv2.imshow('', resimg)
finally:
    # Always free the capture handle and close the preview window, including
    # when the loop is interrupted by an exception.
    video.release()
    cv2.destroyAllWindows()

  self.update(yaml.load(fo.read()))
Loading weights from /home/madao/Documents/partjob/B10078/codes/deep_sort_pytorch/deep_sort/deep/checkpoint/ckpt.t7... Done!
