In [1]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image
import torch
from torchvision import transforms
from torch.utils.data import Dataset
import torchvision.transforms.functional as F
import os
from torchvision.ops import nms
from torchvision.models.detection.transform import GeneralizedRCNNTransform
from torchvision.models.detection.backbone_utils import BackboneWithFPN
from torchvision.models import mobilenet_v3_small
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models import MobileNet_V3_Small_Weights
import cv2


In [2]:
data_transform = transforms.Compose([
    transforms.Resize(size=(640, 640)),
    transforms.ToTensor()
])

class_to_idx = {
    "bus": 0, "car": 1, "motorbike": 2, "truck": 3,
}

class CustomDataset(Dataset):
    def __init__(self,
                 image_dir,
                 class_to_idx=class_to_idx,
                 transform=data_transform):
        self.image_dir = image_dir
        self.transform = transform
        self.class_to_idx = class_to_idx if class_to_idx else {}

        self.image_files = [f for f in os.listdir(
            self.image_dir) if f.endswith(".jpg") or f.endswith(".png")]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_filename = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_filename)

        img = Image.open(img_path).convert("RGB")
        original_width, original_height = img.size

        if self.transform:
            img = self.transform(img)

        new_width, new_height = 640, 640
        scale_x = new_width / original_width
        scale_y = new_height / original_height

        return img

In [3]:
class CustomRCNNTransform(GeneralizedRCNNTransform):
    def __init__(self):
        super().__init__(min_size=640, max_size=640, image_mean=[
            0.485, 0.456, 0.406], image_std=[0.229, 0.224, 0.225])

    def resize(self, image, target):
        image = F.resize(image, [640, 640])

        if target is not None and "boxes" in target:
            w_old, h_old = image.shape[-1], image.shape[-2]
            w_new, h_new = 640, 640
            scale_w = w_new / w_old
            scale_h = h_new / h_old
            target["boxes"][:, [0, 2]] *= scale_w
            target["boxes"][:, [1, 3]] *= scale_h
        return image, target


class FRCNN(torch.nn.Module):
    def __init__(self,
                 num_classes,
                 pretrained=MobileNet_V3_Small_Weights.DEFAULT):
        super(FRCNN, self).__init__()
        self.num_classes = num_classes
        self.backbone = self.get_backbone(pretrained)

        self.anchor_sizes = (32, 64, 128, 256)
        self.aspect_ratios = ((0.5, 1.0, 2.0),) * len(self.anchor_sizes)

        self.anchor_generator = AnchorGenerator(
            sizes=self.anchor_sizes,
            aspect_ratios=self.aspect_ratios
        )

        self.model = FasterRCNN(
            backbone=self.backbone,
            num_classes=num_classes,
            rpn_anchor_generator=self.anchor_generator
        )

        self.model.transform = CustomRCNNTransform()

    def get_backbone(self, pretrained):
        backbone = mobilenet_v3_small(weights=pretrained).features
        return_layers = {'2': '0', '7': '1', '12': '2'}
        in_channels = [24, 48, 576]

        backbone.out_channels = 64
        fpn = BackboneWithFPN(
            backbone=backbone,
            return_layers=return_layers,
            in_channels_list=in_channels,
            out_channels=64
        )

        return fpn

    def forward(self, images, targets=None):
        if self.training:
            if targets is None:
                raise ValueError("In training mode, targets should be passed")
            return self.model(images, targets)
        else:
            return self.model(images)



# Define a custom collate function outside other functions
def detection_collate_fn(batch):
    return tuple(zip(*batch))

In [5]:
model = FRCNN(num_classes=5)  # Khởi tạo model với số lớp cần thiết
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Total parameters: {total_params}")
print(f"Trainable parameters: {trainable_params}")

Total parameters: 5404872
Trainable parameters: 5404872


In [4]:
def draw_bounding_boxes(image, boxes, labels, scores, class_names, threshold=0.7):
    fig, ax = plt.subplots(1, figsize=(12, 12))
    ax.imshow(image)

    for box, label, score in zip(boxes, labels, scores):
        if score >= threshold:  # Chỉ hiển thị các box có score cao hơn ngưỡng
            x_min, y_min, x_max, y_max = box
            width = x_max - x_min
            height = y_max - y_min

            # Vẽ hình chữ nhật
            rect = patches.Rectangle(
                (x_min, y_min), width, height, linewidth=2, edgecolor='r', facecolor='none'
            )
            ax.add_patch(rect)

            # Thêm tên class và score
            class_name = class_names[label - 1]  # -1 vì label bắt đầu từ 1, class_names từ 0
            label_text = f"{class_name}: {score:.2f}"
            plt.text(
                x_min, y_min - 10,  # Vị trí văn bản (phía trên box)
                label_text,
                color='white',
                fontsize=12,
                bbox=dict(facecolor='red', alpha=0.5)  # Hộp nền đỏ cho văn bản
            )

    plt.axis('off')
    plt.show()


In [7]:
def test_model(model_path, video_path, class_names, nms_threshold=0.5, output_path=None, threshold=0.7):
    # Device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load model từ checkpoint
    model = FRCNN(num_classes=len(class_names) + 1)  # +1 cho background
    checkpoint = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(checkpoint["model_state_dict"])
    model.to(device)
    model.eval()

    # Mở file video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Không thể mở file video!")
        return

    # Lấy thông tin video
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Nếu có output_path, tạo video đầu ra với codec H.264
    if output_path:
        # Sử dụng codec H.264 (AVC). Lưu ý: Cần OpenCV build với FFmpeg để H264 hoạt động
        fourcc = cv2.VideoWriter_fourcc(*'H264')  # Thử với H264
        # Nếu H264 không hoạt động, có thể thay bằng 'X264' hoặc kiểm tra build OpenCV
        # fourcc = cv2.VideoWriter_fourcc(*'X264')  # Một lựa chọn thay thế
        out = cv2.VideoWriter(output_path, fourcc, fps, (640, 640))  # Kích thước 640x640
        if not out.isOpened():
            print("Không thể khởi tạo VideoWriter. Kiểm tra codec H.264 có được hỗ trợ không.")
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Fallback về mp4v nếu H264 không hoạt động
            out = cv2.VideoWriter(output_path, fourcc, fps, (640, 640))
            print("Đã chuyển sang codec mp4v.")

    # Data transform cho frame
    data_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(size=(640, 640)),
    ])

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Chuyển frame từ BGR sang RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image_pil = Image.fromarray(frame_rgb)
        image_tensor = data_transform(image_pil).unsqueeze(0).to(device)

        # Dự đoán
        with torch.no_grad():
            predictions = model(image_tensor)[0]

        # Lấy dự đoán
        boxes = predictions["boxes"]
        labels = predictions["labels"]
        scores = predictions["scores"]

        # Áp dụng NMS
        keep = nms(boxes, scores, iou_threshold=nms_threshold)
        boxes = boxes[keep].cpu().numpy()
        labels = labels[keep].cpu().numpy()
        scores = scores[keep].cpu().numpy()

        # Resize frame về 640x640
        frame_640 = cv2.resize(frame, (640, 640), interpolation=cv2.INTER_NEAREST)

        # Vẽ bounding box
        for box, label, score in zip(boxes, labels, scores):
            if score >= threshold:  # Ngưỡng score
                x_min, y_min, x_max, y_max = map(int, box)
                class_name = class_names[label - 1]
                label_text = f"{class_name}: {score:.2f}"
                cv2.rectangle(frame_640, (x_min, y_min), (x_max, y_max), (0, 0, 255), 2)
                cv2.putText(frame_640, label_text, (x_min, y_min - 10), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)

        # Hiển thị frame
        cv2.imshow('Object Detection', frame_640)

        # Ghi frame vào video đầu ra nếu có
        if output_path:
            out.write(frame_640)

        # Thoát nếu nhấn 'q'
        if cv2.waitKey(1) & 0xFF == ord('q'):
            print("Đã dừng bởi người dùng. Đang lưu video...")
            break

    # Giải phóng tài nguyên
    cap.release()
    if output_path:
        out.release()  # Đảm bảo video được lưu khi dừng
        print(f"Video đã được lưu tại: {output_path}")
    cv2.destroyAllWindows()
    
def test_main():
    video_path = 'data\Traffic Camera VN(4).mp4'  # Đường dẫn tới file video
    model_path = r"model\best_model (1).pt"  # Đường dẫn tới model
    class_names = ["bus", "car", "motorbike", "truck"]
    nms_threshold = 0.8
    output_path = 'data\output\output_video.mp4'  # Đường dẫn để lưu video đầu ra (tùy chọn)
    threshold = 0.7

    test_model(model_path, video_path, class_names, nms_threshold, output_path, threshold)

In [8]:
if __name__ == "__main__":
    test_main()

Đã dừng bởi người dùng. Đang lưu video...
Video đã được lưu tại: data\output\output_video.mp4
