# Yolov3、卡尔曼滤波、匈牙利匹配算法实现目标跟踪

In [1]:
# -*- coding: utf-8 -*-
import colorsys
import os
import cv2
from timeit import default_timer as timer

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from keras import backend as K
from keras.models import load_model
from keras.layers import Input
from PIL import Image, ImageFont, ImageDraw

from yolo3.model import yolo_eval, yolo_body, tiny_yolo_body
from yolo3.utils import letterbox_image
from keras.utils import multi_gpu_model

from tracker import Tracker
%matplotlib inline

Using TensorFlow backend.


yolo

In [6]:
class YOLO(object):
    _defaults= {
        "model_path": '/home/wang/PycharmProjects/keras-yolo3-master/model_data/yolo.h5',
        "anchors_path": '/home/wang/PycharmProjects/keras-yolo3-master/model_data/yolo_anchors.txt',
        "classes_path": '/home/wang/PycharmProjects/keras-yolo3-master/model_data/coco_classes.txt',
        "score" : 0.4,
        "iou" : 0.35,
        "model_image_size" : (416, 416),
        "gpu_num" : 0,
        
    }
    @classmethod
    def get_defaults(cls,n):
        if n in cls._defaults:
            return cls._defaults[n]
        else:
            return "unrecognized attribute name ' "+ n +"'"
    def __init__(self,**kwargs):
        self.__dict__.update(self._defaults)
        self.__dict__.update(kwargs)
        self.class_names = self._get_class()
        self.anchors = self._get_anchors()
        self.sess = K.get_session()
        self.boxes,self.scores,self.classes = self.generate()
        
    def _get_class(self):
        classes_path = os.path.expanduser(self.classes_path)
        with open(classes_path) as f:
            class_names = f.readlines()
        class_names = [c.strip() for c in class_names]
        return class_names
    
    def _get_anchors(self):
        anchors_path = os.path.expanduser(self.anchors_path)
        with open(anchors_path) as f:
            anchors = f.readline()
        anchors = [float(x) for x in anchors.split(',')]
        return np.array(anchors).reshape(-1,2)
    
    def generate(self):
        model_path = os.path.expanduser(self.model_path)
        assert model_path.endswith('.h5'),'Keras model must be a .h5 file'
        # load model
        num_anchors = len(self.anchors)
        num_classes = len(self.class_names)
        is_tiny_version = num_anchors == 6 # default setting
        try:
            self.yolo_model = load_model(model_path,compile=False)
        except:
            self.yolo_model = tiny_yolo_body(Input(shape=(None,None,3)),num_anchors//2,num_classes) \
                if is_tiny_version else yolo_body(Input(shape=(None,None,3)),num_anchors//3,num_classes)
            self.yolo_model.load_weights(self.model_path)
        else:
            assert self.yolo_model.layers[-1].output_shape[-1] == \
                num_anchors/len(self.yolo_model.output)* (num_classes+5), \
                'Mismatch between model and anchor and class sizes'
        print('{} model,anchors,and classes loaded'.format(model_path))
        
        # Generate colors for drawing bounding boxes
        hsv_tuples = [(x/len(self.class_names),1.,1.)
                     for x in range(len(self.class_names))]
        self.colors = list(map(lambda x:colorsys.hsv_to_rgb(*x),hsv_tuples))
        self.colors = list(map(lambda x: (int(x[0]*255),int(x[1]*255),int(x[2]*255)),self.colors))
        np.random.seed(10101)  # 运行时固定颜色种子
        np.random.shuffle(self.colors) # 混合颜色使相邻类去相关
        np.random.seed(None)  # reset seed to default
        
        # Generate output tensor targets for filtered bounding
        # 生成tensor变量输出 for 滤框
        self.input_image_shape = K.placeholder(shape=(2,))
        if self.gpu_num>=2:
            self.yolo_model = multi_gpu_model(self.yolo_model,gpus=self.gpu_num)
        boxes,scores,classes = yolo_eval(self.yolo_model.output,self.anchors,len(self.class_names),self.input_image_shape,score_threshold=self.score,iou_threshold=self.iou)
        return boxes,scores,classes
    
    def detect_image(self, image):
        start = timer()
        centers = []
        if self.model_image_size != (None, None):
            assert self.model_image_size[0]%32 == 0, 'Multiples of 32 required'
            assert self.model_image_size[1]%32 == 0, 'Multiples of 32 required'
            boxed_image = letterbox_image(image, tuple(reversed(self.model_image_size)))
        else:
            new_image_size = (image.width - (image.width % 32),
                              image.height - (image.height % 32))
            boxed_image = letterbox_image(image, new_image_size)
        image_data = np.array(boxed_image, dtype='float32')

        # print(image_data.shape)
        image_data /= 255.
        image_data = np.expand_dims(image_data, 0)  # 添加批量处理

        out_boxes, out_scores, out_classes = self.sess.run(
            [self.boxes, self.scores, self.classes],
            feed_dict={
                self.yolo_model.input: image_data,
                self.input_image_shape: [image.size[1], image.size[0]],
                K.learning_phase(): 0
            })
        # 打印边框信息
        print('Found {} boxes for {}'.format(len(out_boxes), 'img'))

        font = ImageFont.truetype(font='Course_4/font/FiraMono-Medium.otf',
                    size=np.floor(3e-2 * image.size[1] + 0.5).astype('int32'))
        thickness = (image.size[0] + image.size[1]) // 300
        
        
        number = len(out_boxes)
        for i, c in reversed(list(enumerate(out_classes))):
            predicted_class = self.class_names[c]
            box = out_boxes[i]
            score = out_scores[i]

            # label = '{} {:.2f}'.format(predicted_class, score)
            label = '{}'.format(predicted_class)
            draw = ImageDraw.Draw(image)
            label_size = draw.textsize(label, font)
            if c in [2,8]:
                top, left, bottom, right = box
                center=(int ((left+right)//2),int((top+bottom)//2))
                b=np.array([[(left+right)//2],[(top+bottom)//2]])
                centers.append(b)

                top = max(0, np.floor(top + 0.5).astype('int32'))
                left = max(0, np.floor(left + 0.5).astype('int32'))
                bottom = min(image.size[1], np.floor(bottom + 0.5).astype('int32'))
                right = min(image.size[0], np.floor(right + 0.5).astype('int32'))
                # 打印边框信息
                # print(label, (left, top), (right, bottom))

                if top - label_size[1] >= 0:
                    text_origin = np.array([left, top - label_size[1]])
                else:
                    text_origin = np.array([left, top + 1])

                # My kingdom for a good redistributable image drawing library.
                for i in range(thickness):
                    draw.rectangle(
                        [left + i, top + i, right - i, bottom - i],
                        outline=self.colors[c])
                draw.rectangle(
                    [tuple(text_origin), tuple(text_origin + label_size)],
                    fill=self.colors[c])
                draw.text(text_origin, label, fill=(0, 0, 0), font=font)
                del draw

        end = timer()
        print("耗时：{} s".format(end - start))
        return image,centers,number

    def close_session(self):
        self.sess.close()

    

In [7]:
yolov3=YOLO()
print(yolov3.class_names)
print(yolov3.anchors)

In [8]:
dir='Course_4/images/'
out_dir='out/'
plt.figure(figsize=(20, 15))
j=1
number = 0
for i in range(1111,1127):
     #计算需要在前面填充几个0
    num_fill = int( len("0000") - len(str(1))) + 1
    #对索引进行填充
    filename = str(i).zfill(num_fill) + ".jpg"
    print("当前文件：" + str(filename),end=" ")
    file=os.path.join(dir,filename)
    image=Image.open(file)
    #开始绘制
    r_image,centers,number = yolov3.detect_image(image)
    plt.subplot(4, 4, j) 
    j += 1
    plt.imshow(r_image)
    out_file=os.path.join(out_dir,filename)
    image.save(out_file, quality=90)


print("绘制完成！")

In [9]:
image='Course_4/images/1116.jpg'
image=Image.open(image)
r_image,centers,number=yolov3.detect_image(image)
plt.figure(figsize=(10, 10))
plt.imshow(r_image)

In [10]:
def detect_video(yolo, video_path, output_path=""):
    # video_path = "http://admin:admin@192.168.1.108:8081/"
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
        raise IOError("Couldn't open webcam or video")
    video_FourCC    = int(vid.get(cv2.CAP_PROP_FOURCC))   #表示codec的四个字符
    video_fps       = vid.get(cv2.CAP_PROP_FPS)   #帧率
    video_size      = (int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)),    #视频流中的帧宽度
                        int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    isOutput = True if output_path != "" else False
    if isOutput:
        print("!!! TYPE:", type(output_path), type(video_FourCC), type(video_fps), type(video_size))
        # 直接赋值Fourcc = 1734701162时可创建新文件 
        out = cv2.VideoWriter(output_path,1734701162, video_fps, video_size)
        # out = cv2.VideoWriter(output_path, video_FourCC, video_fps, video_size)
    accum_time = 0
    curr_fps = 0
    fps = "FPS: ??"
    prev_time = timer()
    
    # 创建跟踪器对象
    tracker = Tracker(160,20,20,100)
    # 变量初始化
    skip_frame_count = 0
    track_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
                    (0, 255, 255), (255, 0, 255), (255, 127, 255),
                    (127, 0, 255), (127, 0, 127)]
    pause = False
    
    while True:
        return_value, frame = vid.read()
        if return_value != True:
            break
        image = Image.fromarray(frame)
        image,centers,number = yolo.detect_image(image)
        result = np.asarray(image)
        curr_time = timer()
        exec_time = curr_time - prev_time
        prev_time = curr_time
        accum_time = accum_time + exec_time
        curr_fps = curr_fps + 1
        if accum_time > 1:
            accum_time = accum_time - 1
            fps = "FPS: " + str(curr_fps)
            curr_fps = 0
        cv2.putText(result, text=fps, org=(3, 15), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=0.50, color=(255, 0, 0), thickness=2)
        # 显示跟踪数量
        #font = cv2.FONT_HERSHEY_SIMPLEX
        #cv2.putText(result, str(number), (1000,  40), font, 1, (0, 0, 255), 5)
        track = "tracks:"+str(number)
        cv2.putText(result,text=track , org=(3, 40), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=0.50, color=(0, 0, 255), thickness=2)
        if (len(centers) > 0):

            # Track object using Kalman Filter
            tracker.Update(centers)

            # For identified object tracks draw tracking line
            # Use various colors to indicate different track_id
            for i in range(len(tracker.tracks)):
                if (len(tracker.tracks[i].trace) > 1):
                    for j in range(len(tracker.tracks[i].trace) - 1):
                        if j>2:
                            # Draw trace line
                            x1 = tracker.tracks[i].trace[j][0][0]
                            y1 = tracker.tracks[i].trace[j][1][0]
                            x2 = tracker.tracks[i].trace[j + 1][0][0]
                            y2 = tracker.tracks[i].trace[j + 1][1][0]
                            print(x1)
                            clr = tracker.tracks[i].track_id % 9
                            cv2.line(result, (int(x1), int(y1)), (int(x2), int(y2)),
                                     track_colors[clr], 4)
            #cv2.imshow('Tracking', result)
        cv2.namedWindow("result", cv2.WINDOW_NORMAL)
        cv2.imshow("result", result)
        if isOutput:
            out.write(result)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break


    # yolo.close_session()
    # return 0

In [11]:
video_path = '/home/wang/PycharmProjects/keras-yolo3-master/video/tes9.mov'
out_path = 'out/test09.avi'

In [12]:
detect_video(yolov3,video_path,out_path)

In [10]:
len([])

0