In [1]:
# Install the third-party packages used by the cells below
# (python-multipart is what FastAPI relies on to parse File/Form uploads).
%pip install fastapi uvicorn python-multipart opencv-python-headless numpy ultralytics supervision

Collecting python-multipart
  Downloading python_multipart-0.0.20-py3-none-any.whl (24 kB)
Collecting opencv-python-headless
  Downloading opencv_python_headless-4.11.0.86-cp37-abi3-win_amd64.whl (39.4 MB)
     --------------------------------------- 39.4/39.4 MB 27.3 MB/s eta 0:00:00
Collecting supervision
  Using cached supervision-0.25.1-py3-none-any.whl (181 kB)
Collecting defusedxml<0.8.0,>=0.7.1
  Downloading defusedxml-0.7.1-py2.py3-none-any.whl (25 kB)
Installing collected packages: python-multipart, opencv-python-headless, defusedxml, supervision
Successfully installed defusedxml-0.7.1 opencv-python-headless-4.11.0.86 python-multipart-0.0.20 supervision-0.25.1




[notice] A new release of pip available: 22.2.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
# 1. Import libraries
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse
import uvicorn
import nest_asyncio
import shutil
import os
from pathlib import Path

import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict, deque
import supervision as sv

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [4]:
# Patch asyncio so FastAPI/uvicorn can run inside the notebook
nest_asyncio.apply()

# 2. Create the FastAPI app
app = FastAPI()

In [5]:
# 3. Corners of the monitored polygon in the source video frame (adjust per video).
# Used both as the detection zone and as the source quad of the perspective transform.
SOURCE = np.array([[1252, 787], [2298, 803], [5039, 2159], [-550, 2159]])
# Size of the rectangle the polygon is mapped onto.
# NOTE(review): presumably metres, since speeds derived from it are labelled km/h — confirm.
TARGET_WIDTH = 25
TARGET_HEIGHT = 250
# Corners of the target rectangle, listed in the same order as SOURCE.
TARGET = np.array([[0, 0], [TARGET_WIDTH-1, 0], [TARGET_WIDTH-1, TARGET_HEIGHT-1], [0, TARGET_HEIGHT-1]])

class ViewTransformer:
    """Projects 2-D points from a source quadrilateral onto a target quadrilateral."""

    def __init__(self, source, target):
        """Compute and store the perspective matrix that maps `source` corners to `target` corners."""
        src_quad = source.astype(np.float32)
        dst_quad = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(src_quad, dst_quad)

    def transform_points(self, points):
        """Apply the stored perspective matrix to `points`.

        An empty array is returned unchanged; otherwise the result is a
        float32 array reshaped to (N, 2).
        """
        if points.size != 0:
            # cv2.perspectiveTransform expects the points as an (N, 1, 2) float array.
            as_float = points.reshape(-1, 1, 2).astype(np.float32)
            return cv2.perspectiveTransform(as_float, self.m).reshape(-1, 2)
        return points

In [6]:
# 4. Main processing: detect vehicles and estimate their speed
def _crop_to_frame(frame, xyxy):
    """Return the region of `frame` covered by box `xyxy`, clamped to the frame bounds.

    The result is empty (size 0) when the box lies completely outside the frame.
    """
    frame_h, frame_w = frame.shape[:2]
    x1, y1, x2, y2 = (int(v) for v in xyxy)
    # Clamp every coordinate: a negative index would silently wrap around when slicing.
    x1, x2 = (max(0, min(frame_w, x)) for x in (x1, x2))
    y1, y2 = (max(0, min(frame_h, y)) for y in (y1, y2))
    return frame[y1:y2, x1:x2]


def run_speed_detection(video_path: str, speed_limit: float, output_path: str = "output.mp4") -> dict:
    """Annotate a video with per-vehicle speed estimates and report speeding vehicles.

    Every frame of `video_path` is run through YOLO, filtered to the SOURCE
    polygon, tracked with ByteTrack, and the bottom-centre anchor of each box is
    projected through the SOURCE -> TARGET perspective transform. Speed is
    derived from how far the projected y coordinate moved over the samples kept
    for that tracker (at most one second's worth).

    Args:
        video_path: Path of the input video.
        speed_limit: Threshold above which a vehicle is reported; compared with
            the value shown as km/h (assumes TARGET is expressed in metres —
            TODO confirm).
        output_path: Where the annotated video is written.

    Returns:
        A dict with keys "message", "total_violations", "violators" and
        "output_video". "violators" holds one {"id", "speed"} entry per tracked
        vehicle that exceeded the limit, with "speed" being its highest
        measured value.

    Side effects:
        Writes the annotated video to `output_path` and saves a cropped JPEG to
        ./violations/ for each frame in which a vehicle is over the limit.
    """
    video_info = sv.VideoInfo.from_video_path(video_path)
    model = YOLO("yolov8n.pt")  # small model, sufficient for a demo
    byte_track = sv.ByteTrack(frame_rate=video_info.fps, track_activation_threshold=0.3)

    thickness = sv.calculate_optimal_line_thickness(video_info.resolution_wh)
    text_scale = sv.calculate_optimal_text_scale(video_info.resolution_wh)
    # BoxAnnotator.annotate() takes no `color=` argument. Per-detection colours are
    # selected with a palette plus `custom_color_lookup` (palette indices):
    # index 0 = within the limit (green), index 1 = speeding (red).
    box_annotator = sv.BoxAnnotator(
        color=sv.ColorPalette(colors=[sv.Color.GREEN, sv.Color.RED]),
        thickness=thickness,
    )
    label_annotator = sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER)
    trace_annotator = sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps*2, position=sv.Position.BOTTOM_CENTER)

    frame_generator = sv.get_video_frames_generator(source_path=video_path)
    polygon_zone = sv.PolygonZone(polygon=SOURCE)
    view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

    # Per tracker: projected y positions of the most recent `fps` frames.
    coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))
    # Per tracker: highest speed measured while over the limit. Keyed by id so a
    # vehicle is reported once instead of once per frame.
    peak_speeds = {}

    output_dir = Path("violations")
    output_dir.mkdir(exist_ok=True)

    with sv.VideoSink(output_path, video_info) as sink:
        for frame_index, frame in enumerate(frame_generator):
            result = model(frame)[0]
            detections = sv.Detections.from_ultralytics(result)
            detections = detections[detections.confidence > 0.3]
            detections = detections[polygon_zone.trigger(detections)]
            detections = detections.with_nms(threshold=0.7)
            detections = byte_track.update_with_detections(detections=detections)

            points = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
            points = view_transformer.transform_points(points=points).astype(int)

            for tracker_id, [_, y] in zip(detections.tracker_id, points):
                coordinates[tracker_id].append(y)

            labels = []
            color_indices = []
            # Iterate the box array directly: iterating an `sv.Detections` object
            # yields plain tuples, which have no `.xyxy` attribute.
            for xyxy, tracker_id in zip(detections.xyxy, detections.tracker_id):
                history = coordinates[tracker_id]
                if len(history) < video_info.fps / 2:
                    # Fewer than half a second of samples: show the id only.
                    labels.append(f"#{tracker_id}")
                    color_indices.append(0)
                    continue

                distance = abs(history[-1] - history[0])
                # Elapsed time is approximated as (number of samples) / fps.
                elapsed = len(history) / video_info.fps
                speed = distance / elapsed * 3.6

                if speed > speed_limit:
                    labels.append(f"#{tracker_id} {int(speed)} km/h 🚨")
                    color_indices.append(1)

                    # Save a snapshot of the offending vehicle; skip boxes that
                    # fall outside the frame because cv2.imwrite rejects empty images.
                    car_img = _crop_to_frame(frame, xyxy)
                    if car_img.size:
                        cv2.imwrite(str(output_dir / f"violation_{tracker_id}_{frame_index}.jpg"), car_img)

                    # Log the violation, keeping only the peak speed per vehicle.
                    vehicle_id = int(tracker_id)
                    peak_speeds[vehicle_id] = max(peak_speeds.get(vehicle_id, 0.0), float(speed))
                else:
                    labels.append(f"#{tracker_id} {int(speed)} km/h")
                    color_indices.append(0)

            annotated = frame.copy()
            annotated = trace_annotator.annotate(scene=annotated, detections=detections)
            annotated = box_annotator.annotate(
                scene=annotated,
                detections=detections,
                custom_color_lookup=np.array(color_indices, dtype=int),
            )
            annotated = label_annotator.annotate(scene=annotated, detections=detections, labels=labels)

            sink.write_frame(annotated)

    violations = [{"id": vehicle_id, "speed": round(peak, 2)} for vehicle_id, peak in peak_speeds.items()]

    return {
        "message": "Done",
        "total_violations": len(violations),
        "violators": violations,
        "output_video": output_path
    }


In [7]:
# 5. API endpoint
@app.post("/detect-speed")
async def detect_speed_api(video: UploadFile = File(...), speed_limit: float = Form(...)):
    """Accept an uploaded video, run speed detection on it and return the summary as JSON.

    Form fields:
        video: The video file to analyse.
        speed_limit: Threshold passed through to `run_speed_detection`.

    Returns:
        A JSONResponse carrying the dict produced by `run_speed_detection`.
        The annotated video is written to `out_<uploaded file name>` in the
        working directory.
    """
    import asyncio
    import uuid

    # `video.filename` is client-controlled: keep only its last path component so
    # it cannot point outside the working directory (both separator styles handled).
    raw_name = (video.filename or "").replace("\\", "/")
    safe_name = os.path.basename(raw_name) or "upload.mp4"

    # A unique prefix keeps concurrent uploads with the same name from clobbering each other.
    video_path = f"temp_{uuid.uuid4().hex}_{safe_name}"
    output_video = f"out_{safe_name}"

    try:
        # Stream the upload to disk in chunks instead of holding the whole video in memory.
        with open(video_path, "wb") as f:
            while True:
                chunk = await video.read(1024 * 1024)
                if not chunk:
                    break
                f.write(chunk)

        # Detection is long-running and CPU-bound; run it in a worker thread so the
        # event loop stays responsive.
        result = await asyncio.to_thread(run_speed_detection, video_path, speed_limit, output_video)
    finally:
        # The temporary copy of the upload is no longer needed, success or failure.
        if os.path.exists(video_path):
            os.remove(video_path)

    return JSONResponse(content=result)

# 6. Run the API (for use inside Jupyter); this call blocks the cell until the server is stopped
uvicorn.run(app, host="0.0.0.0", port=8000)

INFO:     Started server process [367]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [367]


In [3]:
# NOTE(review): paddlepaddle is not imported by any cell visible here — confirm it is still needed.
%pip install paddlepaddle

Collecting paddlepaddle
  Downloading paddlepaddle-3.0.0-cp310-cp310-win_amd64.whl (97.0 MB)
     ---------------------------------------- 97.0/97.0 MB 2.5 MB/s eta 0:00:00
Collecting httpx
  Using cached httpx-0.28.1-py3-none-any.whl (73 kB)
Collecting astor
  Using cached astor-0.8.1-py2.py3-none-any.whl (27 kB)
Collecting opt-einsum==3.3.0
  Using cached opt_einsum-3.3.0-py3-none-any.whl (65 kB)
Collecting httpcore==1.*
  Using cached httpcore-1.0.8-py3-none-any.whl (78 kB)
Installing collected packages: opt-einsum, httpcore, astor, httpx, paddlepaddle
  Attempting uninstall: opt-einsum
    Found existing installation: opt_einsum 3.4.0
    Uninstalling opt_einsum-3.4.0:
      Successfully uninstalled opt_einsum-3.4.0
Successfully installed astor-0.8.1 httpcore-1.0.8 httpx-0.28.1 opt-einsum-3.3.0 paddlepaddle-3.0.0
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip available: 22.2.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip
