### 实现vibe算法：视频流实时场景抑制+动态目标检测

In [None]:
import cv2
import numpy as np
from numba import jit, prange

class Vibe:
    def __init__(self, N=20, minMatches=2, threshold=30, updateFactor=0.05):
        self.N = N
        self.minMatches = minMatches
        self.threshold = threshold
        self.updateFactor = updateFactor

    def initialize(self, first_frame):
        height, width, channels = first_frame.shape
        self.background_model = np.zeros((height, width, self.N, channels), dtype=np.uint8)
        self.height = height
        self.width = width
        self.channels = channels
        # 预计算邻域列表
        self.neighbors_list = np.empty((height, width, 9, 2), dtype=np.int32)
        for i in range(height):
            for j in range(width):
                k = 0
                for di in [-1, 0, 1]:
                    for dj in [-1, 0, 1]:
                        ni, nj = i + di, j + dj
                        if 0 <= ni < height and 0 <= nj < width:
                            self.neighbors_list[i,j,k,0] = ni
                            self.neighbors_list[i,j,k,1] = nj
                            k += 1
                for m in range(k, 9):
                    self.neighbors_list[i,j,m,0] = -1
                    self.neighbors_list[i,j,m,1] = -1
        # 初始化背景模型
        for i in range(height):
            for j in range(width):
                valid_neighbors = self.neighbors_list[i,j]
                valid_pixels = first_frame[valid_neighbors[:,0], valid_neighbors[:,1], :]
                valid_count = np.count_nonzero(valid_neighbors[:,0] != -1)
                indices = np.random.choice(valid_count, size=self.N, replace=True)
                self.background_model[i,j] = valid_pixels[indices]

    def process_frame(self, frame):
        height, width, channels = frame.shape
        assert height == self.height and width == self.width and channels == self.channels
        # 计算距离
        current_frame_expanded = np.expand_dims(frame, axis=2)
        distance = np.sum(np.abs(current_frame_expanded - self.background_model), axis=3)
        # 计数匹配
        matches = (distance < self.threshold).sum(axis=2)
        # 分类前景
        foreground_mask = (matches < self.minMatches).astype(np.uint8) * 255
        # 更新背景模型
        background_pixels = (matches >= self.minMatches)
        rand_update_own = np.random.uniform(size=(height, width)) < self.updateFactor
        pixels_to_update_own = background_pixels & rand_update_own
        rand_update_neighbor = np.random.uniform(size=(height, width)) < self.updateFactor
        pixels_to_update_neighbor = background_pixels & rand_update_neighbor
        update_background(self.background_model, frame, pixels_to_update_own, pixels_to_update_neighbor, self.neighbors_list, self.N)
        return foreground_mask

@jit(nopython=True, parallel=True)
def update_background(background_model, current_frame, pixels_to_update_own, pixels_to_update_neighbor, neighbors_list, N):
    height, width, _, channels = background_model.shape
    # 更新自身样本
    for i in prange(height):
        for j in range(width):
            if pixels_to_update_own[i,j]:
                sample_index = np.random.randint(0, N)
                background_model[i,j,sample_index,:] = current_frame[i,j,:]
    # 更新邻域样本
    for i in prange(height):
        for j in range(width):
            if pixels_to_update_neighbor[i,j]:
                # 选择随机邻域索引（0到8）
                neighbor_index = np.random.randint(0, 9)
                ni, nj = neighbors_list[i,j,neighbor_index,0], neighbors_list[i,j,neighbor_index,1]
                # 检查邻域是否有效
                if ni != -1:
                    sample_index = np.random.randint(0, N)
                    background_model[ni,nj,sample_index,:] = current_frame[i,j,:]

if __name__ == '__main__':
    # 定义视频文件路径
    video_path = 'path_to_your_video_file.mp4'

    # 读取视频
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("无法打开视频文件")
        exit()

    # 读取第一帧
    ret, first_frame = cap.read()
    if not ret:
        print("无法读取第一帧")
        exit()

    # 创建Vibe实例并初始化
    vibe = Vibe()
    vibe.initialize(first_frame)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # 处理帧
        foreground_mask = vibe.process_frame(frame)

        # 显示前景掩码
        cv2.imshow('Foreground Mask', foreground_mask)

        # 显示原始帧与前景标记
        output_frame = frame.copy()
        output_frame[foreground_mask == 255] = [0, 0, 255]  # 前景用蓝色标记
        cv2.imshow('Output', output_frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()