In [None]:
import argparse
import logging
import time

import cv2
import numpy as np
from hobot_dnn import pyeasy_dnn as dnn  # BSP Python API
from scipy.special import expit, softmax



In [4]:
def log_info(content, log_file="log.txt"):
    """
    Print a timestamped log line to the console.

    :param content: message to log
    :param log_file: path of the log file (defaults to log.txt in the current
        directory). Currently unused: the step that appended the line to this
        file is disabled, so the message only goes to stdout.
    """
    # Local wall-clock time, second resolution.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    # Emit "[<timestamp>] <content>" on the console.
    print("[{}] {}".format(timestamp, content))


In [2]:
def preprocess(frame, target_size=(640, 640)):
    """
    Letterbox an image: aspect-preserving resize followed by constant padding.

    The image is scaled so that it fits inside ``target_size`` and is then
    centred on a canvas of exactly ``target_size`` filled with (114, 114, 114).

    :param frame: input image as an array whose first two dimensions are
        (height, width), e.g. the result of ``cv2.imread``
    :param target_size: ``(width, height)`` of the output canvas; defaults to
        the previously hard-coded 640x640
    :return: the padded image only. The scale factor and padding offsets are
        NOT returned, so callers that need to map boxes back to the source
        image have to recompute them.
    :raises ValueError: if ``frame`` is None (``cv2.imread`` returns None when
        it cannot read a file)
    """
    if frame is None:
        raise ValueError("preprocess() received None; the image was probably not loaded (check the path passed to cv2.imread)")

    orig_h, orig_w = frame.shape[:2]
    target_w, target_h = target_size
    scale = min(target_w / orig_w, target_h / orig_h)

    # int() truncates, so an extremely thin image could end up with a 0-pixel
    # side, which cv2.resize rejects; keep at least one pixel.
    new_w = max(1, int(orig_w * scale))
    new_h = max(1, int(orig_h * scale))
    resized = cv2.resize(frame, (new_w, new_h))

    # Split the leftover space so the image sits in the middle; an odd
    # remainder puts the extra pixel on the bottom / right side.
    pad_w = target_w - new_w
    pad_h = target_h - new_h
    top = pad_h // 2
    bottom = pad_h - top
    left = pad_w // 2
    right = pad_w - left
    padded = cv2.copyMakeBorder(resized, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114))
    return padded

In [5]:
image_path = "/home/sunrise/Project20250627/demo.jpg"
frame_raw = cv2.imread(image_path)
# cv2.imread does not raise on a missing or unreadable file, it returns None;
# fail here with a clear message instead of deep inside preprocess().
if frame_raw is None:
    raise FileNotFoundError(f"Could not read image: {image_path}")
# `frame` is the letterboxed image; later cells run inference and draw on it.
frame = preprocess(frame_raw)

In [10]:
class BaseModel:
    """
    Thin wrapper around a quantized ``*.bin`` model loaded through ``hobot_dnn``.

    It loads the model, logs its input/output tensor properties, and offers
    helpers to prepare an image (plain resize to NCHW, or resize + NV12
    packing), run the forward pass and expose the output buffers.

    Relies on a module-level ``logger`` being defined before an instance is
    created.
    """

    def __init__(
        self,
        model_file: str
        ) -> None:
        """
        Load the model and record its input resolution.

        Args:
            model_file (str): Path to the quantized ``*.bin`` model.

        Raises:
            Exception: whatever ``dnn.load`` raised is logged and re-raised.
        """
        # Load the quantized *.bin model and print its parameters
        try:
            begin_time = time.time()
            self.quantize_model = dnn.load(model_file)
            logger.debug("\033[1;31m" + "Load D-Robotics Quantize model time = %.2f ms"%(1000*(time.time() - begin_time)) + "\033[0m")
        except Exception as e:
            logger.error("❌ Failed to load model file: %s"%(model_file))
            logger.error("You can download the model file from the following docs: ./models/download.md") 
            logger.error(e)
            # Re-raise instead of calling exit(1): inside a notebook exit() can
            # end the session and hides the original exception and traceback.
            raise

        logger.info("\033[1;32m" + "-> input tensors" + "\033[0m")
        for i, quantize_input in enumerate(self.quantize_model[0].inputs):
            logger.info(f"intput[{i}], name={quantize_input.name}, type={quantize_input.properties.dtype}, shape={quantize_input.properties.shape}")

        logger.info("\033[1;32m" + "-> output tensors" + "\033[0m")
        for i, quantize_output in enumerate(self.quantize_model[0].outputs):
            logger.info(f"output[{i}], name={quantize_output.name}, type={quantize_output.properties.dtype}, shape={quantize_output.properties.shape}")

        # Dimensions 2 and 3 of the first input's shape are taken as
        # (height, width). NOTE: the attribute is spelled "weight" but holds
        # the width; the name is kept so existing users keep working.
        self.model_input_height, self.model_input_weight = self.quantize_model[0].inputs[0].properties.shape[2:4]

    def resizer(self, img: np.ndarray)->np.ndarray:
        """
        Resize ``img`` to the model input resolution (nearest neighbour).

        Also stores ``self.y_scale`` / ``self.x_scale`` (source size divided by
        model input size) for mapping results back to the source image.

        Args:
            img (np.ndarray): Image whose first two dimensions are (height, width).

        Returns:
            np.ndarray: A newly allocated, resized image.
        """
        img_h, img_w = img.shape[0:2]
        self.y_scale, self.x_scale = img_h/self.model_input_height, img_w/self.model_input_weight
        # cv2.resize takes dsize as (width, height). The previous
        # (height, width) order only worked because the input was square.
        return cv2.resize(img, (self.model_input_weight, self.model_input_height), interpolation=cv2.INTER_NEAREST) # resize also allocates fresh memory
    
    def preprocess(self, img: np.ndarray)->np.ndarray:
        """
        Preprocesses an input image to prepare it for model inference.

        Args:
            img (np.ndarray): The input image in BGR format as a NumPy array.

        Returns:
            np.ndarray: The preprocessed uint8 image tensor in NCHW format.

        Procedure:
            1. Resizes the image to the model input size using nearest neighbor interpolation.
            2. Converts the image color space from BGR to RGB.
            3. Transposes the dimensions of the image tensor to channel-first order (CHW).
            4. Adds a batch dimension, thus conforming to the NCHW format.
            Note: Normalization to [0, 1] is not done here; per the original
            comment it is configured in the model's yaml file.
        """
        begin_time = time.time()

        input_tensor = self.resizer(img)
        input_tensor = cv2.cvtColor(input_tensor, cv2.COLOR_BGR2RGB)
        # input_tensor = np.array(input_tensor) / 255.0  # preprocessing is already configured in the yaml file
        input_tensor = np.transpose(input_tensor, (2, 0, 1))
        input_tensor = np.expand_dims(input_tensor, axis=0).astype(np.uint8)  # NCHW

        logger.debug("\033[1;31m" + f"pre process time = {1000*(time.time() - begin_time):.2f} ms" + "\033[0m")
        return input_tensor

    def bgr2nv12(self, bgr_img: np.ndarray) -> np.ndarray:
        """
        Resize a BGR image to the model input size and convert it to NV12.

        NV12 stores a full-resolution Y plane followed by half-resolution,
        interleaved UV samples. The image is first converted to planar
        YUV 4:2:0 (I420), then the separate U and V planes are interleaved.

        Parameters:
        bgr_img (np.ndarray): The input BGR image array.

        Returns:
        np.ndarray: Flat array of length height * width * 3 // 2 in NV12 layout.
        """
        begin_time = time.time()
        bgr_img = self.resizer(bgr_img)
        height, width = bgr_img.shape[0], bgr_img.shape[1]
        area = height * width
        yuv420p = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2YUV_I420).reshape((area * 3 // 2,))
        y = yuv420p[:area]
        # I420 keeps U then V as two planes of area // 4 samples each;
        # transposing (2, n) -> (n, 2) interleaves them as U0 V0 U1 V1 ...
        uv_planar = yuv420p[area:].reshape((2, area // 4))
        uv_packed = uv_planar.transpose((1, 0)).reshape((area // 2,))
        nv12 = np.zeros_like(yuv420p)
        nv12[:area] = y
        nv12[area:] = uv_packed

        logger.debug("\033[1;31m" + f"bgr8 to nv12 time = {1000*(time.time() - begin_time):.2f} ms" + "\033[0m")
        return nv12


    def forward(self, input_tensor: np.ndarray) -> list[dnn.pyDNNTensor]:
        """Run the first model of the loaded bundle on ``input_tensor`` and return its raw outputs."""
        begin_time = time.time()
        quantize_outputs = self.quantize_model[0].forward(input_tensor)
        logger.debug("\033[1;31m" + f"forward time = {1000*(time.time() - begin_time):.2f} ms" + "\033[0m")
        return quantize_outputs


    def c2numpy(self, outputs) -> list[np.ndarray]:
        """Return the ``buffer`` attribute of every output tensor, in order."""
        begin_time = time.time()
        outputs = [dnnTensor.buffer for dnnTensor in outputs]
        logger.debug("\033[1;31m" + f"c to numpy time = {1000*(time.time() - begin_time):.2f} ms" + "\033[0m")
        return outputs

In [11]:
class YOLO11_Detect(BaseModel):
    """
    Detection post-processing for a three-scale, anchor-free head with DFL boxes.

    ``postProcess`` expects six output arrays: indices 0-2 are the box
    branches (reshaped to 64 channels = 4 sides x 16 DFL bins) and indices 3-5
    are the class branches (reshaped to 80 channels), each ordered
    small / medium / large with strides 8 / 16 / 32 on a 640x640 input.
    """

    def __init__(self, 
                model_file: str, 
                conf: float, 
                iou: float
                ):
        """
        Args:
            model_file (str): Path to the quantized ``*.bin`` model.
            conf (float): Confidence threshold (probability scale).
            iou (float): IoU threshold used by NMS.
        """
        super().__init__(model_file)
        # Prepare the dequantization scales once.
        # NOTE: postProcess currently does not apply them (the multiply is
        # disabled), i.e. the box branches are consumed as delivered.
        self.s_bboxes_scale = self.quantize_model[0].outputs[0].properties.scale_data[np.newaxis, :]
        self.m_bboxes_scale = self.quantize_model[0].outputs[1].properties.scale_data[np.newaxis, :]
        self.l_bboxes_scale = self.quantize_model[0].outputs[2].properties.scale_data[np.newaxis, :]
        logger.info(f"{self.s_bboxes_scale.shape=}, {self.m_bboxes_scale.shape=}, {self.l_bboxes_scale.shape=}")

        # DFL expectation weights 0..15, shaped (1, 1, 16); generated once.
        self.weights_static = np.arange(16).astype(np.float32)[np.newaxis, np.newaxis, :]
        logger.info(f"{self.weights_static.shape = }")

        # Grid-cell centres for the three feature maps; generated once.
        self.s_anchor = self._grid_centers(80)
        self.m_anchor = self._grid_centers(40)
        self.l_anchor = self._grid_centers(20)
        logger.info(f"{self.s_anchor.shape = }, {self.m_anchor.shape = }, {self.l_anchor.shape = }")

        # Input size and thresholds, computed up front.
        self.input_image_size = 640
        self.conf = conf
        self.iou = iou
        # Inverse sigmoid of the threshold, so raw logits can be filtered
        # without evaluating the sigmoid on every cell.
        self.conf_inverse = -np.log(1/conf - 1)
        logger.info("iou threshol = %.2f, conf threshol = %.2f"%(iou, conf))
        logger.info("sigmoid_inverse threshol = %.2f"%self.conf_inverse)

    @staticmethod
    def _grid_centers(grid: int) -> np.ndarray:
        """Return the (grid*grid, 2) array of cell centres (x, y), row-major, i.e. 0.5, 1.5, ..."""
        centers = np.arange(grid, dtype=np.float64) + 0.5
        return np.stack([np.tile(centers, reps=grid),
                         np.repeat(centers, grid)], axis=0).transpose(1, 0)

    def _decode_branch(self, bbox_raw: np.ndarray, cls_raw: np.ndarray, anchor: np.ndarray, stride: int):
        """Threshold one scale and decode its surviving cells to xyxy boxes in model-input pixels."""
        bboxes = bbox_raw.reshape(-1, 64)
        clses = cls_raw.reshape(-1, 80)

        # Classification: keep cells whose best class logit passes the
        # (inverse-sigmoid) threshold, then convert only those to probabilities.
        max_scores = np.max(clses, axis=1)
        valid_indices = np.flatnonzero(max_scores >= self.conf_inverse)
        ids = np.argmax(clses[valid_indices, :], axis=1)
        scores = 1 / (1 + np.exp(-max_scores[valid_indices]))

        # Boxes: DFL expectation per side -> (left, top, right, bottom)
        # distances from the cell centre, then dist2bbox (ltrb -> xyxy).
        ltrb = np.sum(softmax(bboxes[valid_indices, :].reshape(-1, 4, 16), axis=2) * self.weights_static, axis=2)
        centers = anchor[valid_indices, :]
        x1y1 = centers - ltrb[:, 0:2]
        x2y2 = centers + ltrb[:, 2:4]
        return np.hstack([x1y1, x2y2]) * stride, scores, ids

    def postProcess(self, outputs: list[np.ndarray]) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Decode raw head outputs into final detections.

        Args:
            outputs: six arrays, box branches at 0-2 and class branches at 3-5
                (small, medium, large).

        Returns:
            (ids, scores, bboxes): class indices, confidences and int32 xyxy
            boxes scaled by ``self.x_scale`` / ``self.y_scale`` (set by
            ``resizer``), for the detections kept by NMS.
        """
        begin_time = time.time()

        branches = (
            (outputs[0], outputs[3], self.s_anchor, 8),
            (outputs[1], outputs[4], self.m_anchor, 16),
            (outputs[2], outputs[5], self.l_anchor, 32),
        )
        decoded = [self._decode_branch(bbox_raw, cls_raw, anchor, stride)
                   for bbox_raw, cls_raw, anchor, stride in branches]

        # Concatenate the thresholded results of the three scales.
        dbboxes = np.concatenate([d[0] for d in decoded], axis=0)
        scores = np.concatenate([d[1] for d in decoded], axis=0)
        ids = np.concatenate([d[2] for d in decoded], axis=0)

        # NMS. cv2.dnn.NMSBoxes interprets each box as (x, y, width, height),
        # so the xyxy corners must be converted first; feeding xyxy directly
        # makes it compute IoU on the wrong rectangles.
        if dbboxes.shape[0] == 0:
            indices = np.empty((0,), dtype=np.int64)
        else:
            xywh = np.hstack([dbboxes[:, 0:2], dbboxes[:, 2:4] - dbboxes[:, 0:2]])
            indices = cv2.dnn.NMSBoxes(xywh.tolist(), scores.tolist(), self.conf, self.iou)
            # Depending on the OpenCV version this is an (N,) array, an (N, 1)
            # array or an empty tuple; normalise to a flat integer index array.
            indices = np.asarray(indices, dtype=np.int64).reshape(-1)

        # Scale back to the original image.
        bboxes = dbboxes[indices] * np.array([self.x_scale, self.y_scale, self.x_scale, self.y_scale])
        bboxes = bboxes.astype(np.int32)

        logger.debug("\033[1;31m" + f"Post Process time = {1000*(time.time() - begin_time):.2f} ms" + "\033[0m")

        return ids[indices], scores[indices], bboxes
    

In [13]:
# Root logging configuration for the notebook: DEBUG level, lines formatted as
# "[logger] [HH:MM:SS.mmm] [LEVEL] message".
logging.basicConfig(
    format='[%(name)s] [%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s',
    datefmt='%H:%M:%S',
    level=logging.DEBUG,
)
# Shared logger referenced by BaseModel and YOLO11_Detect.
logger = logging.getLogger("RDK_YOLO")

In [15]:
# Model file and detection thresholds.
model_path = "/home/sunrise/Project20250627/Chili.bin"
iou_thres = 0.45   # IoU threshold passed to the detector
conf_thres = 0.25  # confidence threshold passed to the detector
# The next two values are not passed to YOLO11_Detect.
classes_num = 1
reg = 16
# NOTE(review): the log printed by this cell reports three 18-channel outputs,
# while YOLO11_Detect.postProcess reshapes six outputs to 64 / 80 channels, so
# only the loading and forward helpers of this object fit this model.
model = YOLO11_Detect(model_path, conf_thres, iou_thres)

[RDK_YOLO] [11:42:02.053] [DEBUG] [1;31mLoad D-Robotics Quantize model time = 128.85 ms[0m
[RDK_YOLO] [11:42:02.057] [INFO] [1;32m-> input tensors[0m
[RDK_YOLO] [11:42:02.059] [INFO] intput[0], name=images, type=uint8, shape=(1, 3, 640, 640)
[RDK_YOLO] [11:42:02.062] [INFO] [1;32m-> output tensors[0m
[RDK_YOLO] [11:42:02.064] [INFO] output[0], name=small, type=float32, shape=(1, 80, 80, 18)
[RDK_YOLO] [11:42:02.067] [INFO] output[1], name=medium, type=float32, shape=(1, 40, 40, 18)
[RDK_YOLO] [11:42:02.070] [INFO] output[2], name=big, type=float32, shape=(1, 20, 20, 18)
[RDK_YOLO] [11:42:02.073] [INFO] self.s_bboxes_scale.shape=(1, 0), self.m_bboxes_scale.shape=(1, 0), self.l_bboxes_scale.shape=(1, 0)
[RDK_YOLO] [11:42:02.076] [INFO] self.weights_static.shape = (1, 1, 16)
[RDK_YOLO] [11:42:02.088] [INFO] self.s_anchor.shape = (6400, 2), self.m_anchor.shape = (1600, 2), self.l_anchor.shape = (400, 2)
[RDK_YOLO] [11:42:02.091] [INFO] iou threshol = 0.45, conf threshol = 0.25
[RDK_Y

[W][DNN]packed_model.cpp:114][Model](2025-06-27,11:42:02.52.264) File /home/sunrise/Project20250627/Chili.bin has loaded, Discard the latest. 


In [22]:
# Cell-centre grids (x, y) for the 80x80, 40x40 and 20x20 feature maps, built
# the same way as the attributes of the same names inside YOLO11_Detect.
s_anchor, m_anchor, l_anchor = (
    np.stack([np.tile(np.linspace(0.5, grid - 0.5, grid), reps=grid),
              np.repeat(np.arange(0.5, grid + 0.5, 1), grid)], axis=0).transpose(1, 0)
    for grid in (80, 40, 20)
)
# Resize the letterboxed frame to the model input size and pack it as NV12.
input_tensor = model.bgr2nv12(frame)

[RDK_YOLO] [11:42:14.110] [DEBUG] [1;31mbgr8 to nv12 time = 5.78 ms[0m


In [23]:
# Run the forward pass and collect each output tensor's buffer into a list.
outputs = model.c2numpy(model.forward(input_tensor))

[RDK_YOLO] [11:42:36.222] [DEBUG] [1;31mforward time = 18.45 ms[0m
[RDK_YOLO] [11:42:36.227] [DEBUG] [1;31mc to numpy time = 0.62 ms[0m


In [37]:
# Inspect the shape of the third output (displayed as the cell result).
outputs[2].shape

(1, 20, 20, 18)

In [29]:

def sigmoid(x):
    """
    Logistic sigmoid 1 / (1 + exp(-x)), evaluated without overflow.

    The naive formula overflows in ``np.exp(-x)`` for large negative ``x`` and
    emits a RuntimeWarning; ``scipy.special.expit`` returns the same values
    and saturates cleanly to 0 and 1 at the extremes.

    :param x: scalar or array-like of logits
    :return: sigmoid of ``x``, element-wise
    """
    return expit(x)

def postprocess_yolov5s_output(output, conf_thres=0.25, iou_thres=0.45, input_size=640, anchors=None):
    """
    NumPy post-processing for ONE anchor-based detection head (single class).

    The grid size is read from ``output`` instead of being fixed to 80x80, so
    the same function can decode the 40x40 and 20x20 heads when given their
    matching anchors.

    Args:
        output: ndarray of shape (1, H, W, A * 6), where each anchor carries
            (tx, ty, tw, th, objectness, class) logits.
        conf_thres: minimum objectness * class probability to keep a box.
        iou_thres: IoU threshold passed to ``nms``.
        input_size: side of the network input in pixels; stride is
            ``input_size // H``.
        anchors: A anchor sizes ``[[w, h], ...]`` in pixels; defaults to
            [[10, 13], [16, 30], [33, 23]].

    Returns:
        List of arrays ``[x1, y1, x2, y2, confidence]`` in network-input
        pixels, as returned by ``nms`` (an empty list when nothing passes).

    NOTE(review): boxes are decoded as ``(sigmoid(t) + cell) * stride`` and
    ``exp(t) * anchor`` (YOLOv3-style). Current YOLOv5 releases train with
    ``(2*sigmoid(t) - 0.5 + cell) * stride`` and ``(2*sigmoid(t))**2 * anchor``;
    confirm which convention the exported model uses. The original formula is
    kept here so results do not change.
    """
    if anchors is None:
        anchors = np.array([[10,13], [16,30], [33,23]])  # default anchors
    anchors = np.asarray(anchors, dtype=np.float64)
    num_anchors = len(anchors)

    output = np.asarray(output)
    grid_h, grid_w = output.shape[1], output.shape[2]
    stride = input_size // grid_h  # e.g. 640 / 80 = 8

    # (1, H, W, A*6) -> (H, W, A, 6); float64 keeps the arithmetic deterministic.
    preds = output.reshape(1, grid_h, grid_w, num_anchors, 6)[0].astype(np.float64)
    tx, ty, tw, th, obj_logit, cls_logit = (preds[..., k] for k in range(6))

    score = sigmoid(obj_logit) * sigmoid(cls_logit)
    # Same keep rule as "skip if score < conf_thres".
    keep = ~(score < conf_thres)

    # np.nonzero walks the mask in row-major order (y, then x, then anchor),
    # which is the order the nested loops used to visit the cells.
    ys, xs, anchor_idx = np.nonzero(keep)

    # Decode to network-input pixel coordinates.
    cx = (sigmoid(tx[keep]) + xs) * stride
    cy = (sigmoid(ty[keep]) + ys) * stride
    w = np.exp(tw[keep]) * anchors[anchor_idx, 0]
    h = np.exp(th[keep]) * anchors[anchor_idx, 1]

    boxes = np.stack([cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2, score[keep]], axis=1)
    return nms(boxes, iou_thres)

def nms(boxes, iou_threshold):
    """
    Greedy non-maximum suppression.

    boxes: array of shape (N, 5) with rows [x1, y1, x2, y2, score]
    iou_threshold: a remaining box survives only if its IoU with the box just
        selected is strictly below this value
    return: list of the selected rows, highest score first ([] for empty input)
    """
    if len(boxes) == 0:
        return []

    # Highest score first.
    order = np.argsort(-boxes[:, 4])
    remaining = boxes[order]
    selected = []

    while len(remaining) > 0:
        top, others = remaining[0], remaining[1:]
        selected.append(top)
        # Drop everything that overlaps the selected box too much.
        overlaps = compute_iou_batch(top, others)
        remaining = others[overlaps < iou_threshold]

    return selected

def compute_iou_batch(box1, boxes):
    """
    IoU between one reference box and a batch of boxes.

    box1: shape (5,)  [x1, y1, x2, y2, conf]
    boxes: shape (N, 5)
    return: array of shape (N,) with the IoU of box1 against every row
    """
    # Corners of the intersection rectangle.
    left = np.maximum(box1[0], boxes[:, 0])
    top = np.maximum(box1[1], boxes[:, 1])
    right = np.minimum(box1[2], boxes[:, 2])
    bottom = np.minimum(box1[3], boxes[:, 3])

    # Non-overlapping boxes give a negative extent; clamp it to zero.
    overlap_w = np.maximum(0, right - left)
    overlap_h = np.maximum(0, bottom - top)
    intersection = overlap_w * overlap_h

    ref_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    batch_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    union = ref_area + batch_area - intersection

    # The small epsilon avoids division by zero for degenerate boxes.
    return intersection / (union + 1e-6)

In [31]:
# Decode only the first output (the 80x80 head) with the default thresholds and anchors.
results=postprocess_yolov5s_output(outputs[0])

In [32]:
# Print every kept detection: corner coordinates and confidence.
for x1, y1, x2, y2, conf in results:
    print(f"BBox: ({x1:.1f}, {y1:.1f}) → ({x2:.1f}, {y2:.1f}), Confidence: {conf:.2f}")

BBox: (75.9, 215.4) → (109.6, 272.1), Confidence: 0.68


In [33]:
import cv2

def draw_bboxes(frame, results, color=(0, 255, 0)):
    """
    Draw bounding boxes and confidence labels onto an image.

    The drawing happens IN PLACE: ``frame`` itself is modified and the same
    array is returned. Pass ``frame.copy()`` to keep the input untouched.

    :param frame: image to draw on (BGR)
    :param results: detections [[x1, y1, x2, y2, conf], ...] in the pixel
        coordinates of ``frame``
    :param color: box and text colour, green by default
    :return: ``frame``, with the boxes drawn on it
    """
    for box in results:
        x1, y1, x2, y2, conf = box
        x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Put the label just above the box. When the box touches the top of
        # the image there is no room there (the text would be drawn outside
        # the image), so fall back to just inside the box.
        # 12 px is an approximate height for this font at scale 0.5.
        label_y = y1 - 5
        if label_y < 12:
            label_y = max(y1, 0) + 15
        cv2.putText(frame, f"{conf:.2f}", (x1, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
    return frame


In [34]:
# Draw the detections on the letterboxed frame; draw_bboxes modifies the array
# it receives and returns that same array.
frame = draw_bboxes(frame, results)

In [35]:
# Save the annotated image; the boolean returned by cv2.imwrite is shown as the cell result.
cv2.imwrite("/home/sunrise/Project20250627/render.jpg",frame)

True