In [13]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment

# 加载YOLOv8模型（使用YOLOv8n以提高实时性）
model = YOLO("yolov8m.pt")

# 定义车辆类别（COCO数据集中的车辆类别ID：2=car, 3=motorcycle, 5=bus, 7=truck）
vehicle_classes = [2, 3, 5, 7]

# 卡尔曼滤波器类，用于目标跟踪
class KalmanTracker:
    def __init__(self, bbox):
        self.kf = KalmanFilter(dim_x=4, dim_z=2)
        self.kf.F = np.array([[1, 0, 1, 0],
                              [0, 1, 0, 1],
                              [0, 0, 1, 0],
                              [0, 0, 0, 1]])  # 状态转移矩阵
        self.kf.H = np.array([[1, 0, 0, 0],
                              [0, 1, 0, 0]])  # 测量矩阵
        self.kf.P *= 1000.0  # 初始协方差矩阵
        self.kf.R = np.array([[1, 0],
                              [0, 1]]) * 10  # 测量噪声
        self.kf.x[:2] = bbox[:2].reshape(2, 1)  # 初始状态 [x, y, vx, vy]

    def predict(self):
        self.kf.predict()
        return self.kf.x[:2].reshape(2)

    def update(self, bbox):
        self.kf.update(bbox[:2].reshape(2, 1))

# 匈牙利算法匹配检测和跟踪目标
def hungarian_matching(tracker_boxes, detections):
    cost_matrix = np.zeros((len(tracker_boxes), len(detections)))
    for i, tracker_box in enumerate(tracker_boxes):
        for j, det in enumerate(detections):
            cost_matrix[i, j] = np.linalg.norm(tracker_box - det[:2])
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    return row_ind, col_ind

# 强光抑制：使用CLAHE（对比度受限的自适应直方图均衡化）
def apply_clahe(frame):
    lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    l_clahe = clahe.apply(l)
    lab_clahe = cv2.merge((l_clahe, a, b))
    return cv2.cvtColor(lab_clahe, cv2.COLOR_LAB2BGR)

# 车辆计数类
class VehicleCounter:
    def __init__(self, line_position):
        self.line_position = line_position  # 计数线的y坐标
        self.count = 0
        self.tracked_vehicles = defaultdict(lambda: None)

    def update(self, trackers):
        for track_id, tracker in trackers.items():
            y = tracker.predict()[1]
            if y > self.line_position and track_id not in self.tracked_vehicles:
                self.count += 1
                self.tracked_vehicles[track_id] = True

# 主函数
def main(video_path):
    # 打开视频流
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("无法打开视频流或文件")
        return

    # 获取视频参数
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # 定义计数线位置（例如，屏幕中间）
    line_position = 860 #height // 2

    # 初始化车辆计数器和跟踪器
    counter = VehicleCounter(line_position)
    trackers = {}
    track_id = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # 应用强光抑制
        frame = apply_clahe(frame)

        # 使用YOLOv8进行目标检测
        results = model(frame, device='cpu',verbose=False)  # 使用GPU加速
        detections = results[0].boxes.data.cpu().numpy()  # [x1, y1, x2, y2, conf, cls]

        # 过滤出车辆类别
        vehicle_detections = [det for det in detections if int(det[5]) in vehicle_classes]

        # 预测现有跟踪器位置
        tracker_boxes = []
        for tracker in trackers.values():
            pred = tracker.predict()
            tracker_boxes.append(pred)

        # 匈牙利算法匹配检测和跟踪目标
        if tracker_boxes and vehicle_detections:
            row_ind, col_ind = hungarian_matching(tracker_boxes, vehicle_detections)
            for r, c in zip(row_ind, col_ind):
                trackers[list(trackers.keys())[r]].update(vehicle_detections[c])
            unmatched_dets = set(range(len(vehicle_detections))) - set(col_ind)
        else:
            unmatched_dets = range(len(vehicle_detections))

        # 为未匹配的检测创建新跟踪器
        for i in unmatched_dets:
            trackers[track_id] = KalmanTracker(vehicle_detections[i])
            track_id += 1

        # 更新车辆计数
        counter.update(trackers)

        # 绘制检测框、计数线和计数结果
        for det in vehicle_detections:
            x1, y1, x2, y2, conf, cls = det
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            cv2.putText(frame, f'ID: {track_id}', (int(x1), int(y1) - 10), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        
        cv2.line(frame, (0, line_position), (width, line_position), (0, 0, 255), 2)
        cv2.putText(frame, f'counter: {counter.count}', (10, 30), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # 显示处理后的帧
        cv2.imshow('高速公路车辆计数', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # 释放资源
    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    video_path = "highway_n.avi"  # 替换为你的高速公路视频路径
    main(video_path)