# YOLO + RTSP 推流示例

本 Notebook 展示如何使用 toolkit 实现 YOLO 目标检测 + RTSP 推流。

**功能:**
- 从 CSI/USB 摄像头获取视频
- 使用 YOLO 进行目标检测
- 将检测结果通过 RTSP 推流输出

**依赖:**
```bash
pip install ultralytics
```

## 1. 导入库和初始化

In [None]:
# 导入必要的库
import sys
import os
import time
import threading
from queue import Queue

# 添加 toolkit 模块路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath("."))))

from camera import VideoSource, setup_logging, get_ips
from rtsp import RTSPServer

print("导入成功!")

## 2. 配置参数

In [None]:
# ========== 配置区域 ==========

# 视频源配置
SOURCE = "csi://0"  # 可选: "csi://0", "usb://0", 0 (自动检测)

# YOLO 模型配置
MODEL_PATH = "best.engine"  # 可选: "yolov8n.pt", "best.engine"

# 输出配置
WIDTH = 640
HEIGHT = 480
FPS = 30

# 处理间隔（每 N 帧处理一次，降低性能消耗）
INTERVAL = 1

# RTSP 配置
RTSP_PORT = 8554

# 日志级别
LOG_LEVEL = "INFO"  # DEBUG, INFO, WARNING, ERROR

print("配置完成!")
print(f"视频源: {SOURCE}")
print(f"模型: {MODEL_PATH}")
print(f"分辨率: {WIDTH}x{HEIGHT} @ {FPS}fps")

## 3. 初始化日志

In [None]:
# 设置日志
setup_logging(LOG_LEVEL)
print("日志已设置!")

## 4. 打开视频源

In [None]:
# 解析视频源
source = SOURCE
if source.isdigit():
    source = int(source)

# 创建视频源
video_src = VideoSource(source, WIDTH, HEIGHT, FPS)

# 打开摄像头
if video_src.open():
    print(f"\n摄像头已打开: {SOURCE}")
else:
    print(f"\n无法打开摄像头: {SOURCE}")
    raise SystemExit("摄像头打开失败!")

## 5. 创建 YOLO 处理器

In [None]:
class YOLOProcessor:
    """YOLO 图像处理器"""
    
    def __init__(self, model_path, cap, output_size=(640, 480), interval=1):
        from ultralytics import YOLO
        
        self.model = YOLO(model_path, task='detect')
        self.cap = cap
        self.out_w, self.out_h = output_size
        self.interval = interval
        self.frame_count = 0
        self.running = False
        
        self.input_buf = Queue(maxsize=2)
        self.output_buf = Queue(maxsize=2)
        self.fps_times = []
        self.fps_lock = threading.Lock()
    
    def start(self):
        """启动处理"""
        self.running = True
        threading.Thread(target=self._capture_thread, daemon=True).start()
        threading.Thread(target=self._process_thread, daemon=True).start()
        print("YOLO 处理器已启动")
    
    def _capture_thread(self):
        """捕获线程"""
        while self.running:
            if self.cap is None or not self.cap.isOpened():
                time.sleep(0.1)
                continue
            ret, frame = self.cap.read()
            if ret:
                if not self.input_buf.full():
                    self.input_buf.put(frame)
            else:
                time.sleep(0.01)
    
    def _process_thread(self):
        """处理线程"""
        import cv2
        while self.running:
            try:
                frame = self.input_buf.get(timeout=1.0)
                if frame is None:
                    continue
                
                self.frame_count += 1
                
                if self.frame_count % self.interval == 0:
                    results = self.model.track(frame)
                    frame = results[0].plot()
                
                with self.fps_lock:
                    self.fps_times.append(time.time())
                    if len(self.fps_times) > 30:
                        self.fps_times.pop(0)
                
                fps = self.get_fps()
                cv2.putText(frame, f"FPS: {fps:.1f}", (10, 30),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
                
                frame = cv2.resize(frame, (self.out_w, self.out_h))
                if not self.output_buf.full():
                    self.output_buf.put(frame)
            except Exception as e:
                pass
    
    def get_fps(self):
        """获取 FPS"""
        with self.fps_lock:
            if len(self.fps_times) < 2:
                return 0.0
            return (len(self.fps_times) - 1) / (self.fps_times[-1] - self.fps_times[0])
    
    def get_frame(self):
        """获取处理后的帧"""
        try:
            return self.output_buf.get_nowait()
        except:
            return None
    
    def stop(self):
        """停止处理"""
        self.running = False
        if self.cap:
            self.cap.release()

# 创建处理器
processor = YOLOProcessor(
    model_path=MODEL_PATH,
    cap=video_src.cap,
    output_size=(WIDTH, HEIGHT),
    interval=INTERVAL
)

print("YOLO 处理器已创建!")

## 6. 创建 RTSP 服务器

In [None]:
# 创建 RTSP 服务器
rtsp = RTSPServer(port=RTSP_PORT)
rtsp.add_app_stream(
    frame_provider=processor.get_frame,
    width=WIDTH,
    height=HEIGHT,
    fps=FPS
)

print("RTSP 服务器已创建!")

## 7. 显示服务信息

In [None]:
# 获取 IP 地址
local_ip, public_ip = get_ips()

print("=" * 50)
print("YOLO + RTSP 服务")
print("=" * 50)
print(f"模型: {MODEL_PATH}")
print(f"视频源: {SOURCE}")
print(f"分辨率: {WIDTH}x{HEIGHT} @ {FPS}fps")
print(f"处理间隔: 每 {INTERVAL} 帧处理一次")
print(f"\n访问地址:")
print(f"  rtsp://localhost:{RTSP_PORT}/stream")
print(f"  rtsp://{local_ip}:{RTSP_PORT}/stream")
if public_ip:
    print(f"  rtsp://{public_ip}:{RTSP_PORT}/stream")
print("=" * 50)

print("\n可以使用以下方式访问:")
print("  VLC: vlc rt://<IP>:8554/stream")
print("  FFplay: ffplay rtsp://<IP>:8554/stream")

## 8. 启动服务

**注意:** 下方单元格运行后会阻塞，直到按 Ctrl+C 停止。

你可以：
1. 点击上方工具栏的 "停止" 按钮中断
2. 或在菜单中选择 "Kernel" → "Interrupt"
3. 或使用键盘快捷键 (通常是 Ctrl+C)

In [None]:
# 启动 YOLO 处理器
processor.start()

print("\n服务正在运行...")
print("按停止按钮中断服务")

In [None]:
# 启动 RTSP 服务器（阻塞运行）
try:
    rtsp.start()
except KeyboardInterrupt:
    print("\n服务已停止")
finally:
    processor.stop()
    video_src.release()
    print("资源已释放")

## 使用说明

### 快速开始

1. 修改上方配置区域的参数
2. 依次运行所有单元格
3. 使用 VLC 或其他 RTSP 客户端访问显示的地址

### 参数调整

| 参数 | 说明 | 推荐值 |
|------|------|--------|
| SOURCE | 视频源 | csi://0, usb://0 |
| MODEL_PATH | YOLO 模型 | best.engine (快), yolov8n.pt (兼容) |
| INTERVAL | 处理间隔 | 1 (每帧), 2 (降负载) |
| WIDTH/HEIGHT | 分辨率 | 640x480 (平衡), 1280x720 (高清) |

### 常见问题

**Q: 模型加载失败**

A: 确保已安装 ultralytics 并下载了模型：
```bash
pip install ultralytics
# 下载模型到当前目录
wget https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt
```