In [3]:
import numpy as np
import cv2
from tqdm.notebook import tqdm

## Helper Function

In [4]:
import torch
import torch.nn as nn
from torchvision.models import ResNet18_Weights
from torchvision.models import resnet18

class BasicBlock(nn.Module):
    expansion = 1  # No channel expansion in BasicBlock

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.downsample = downsample

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1000):
        super(ResNet, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3,
                               bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64,  layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None

        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion)
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion

        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

def build_model_classification(class_num=2, use_pretrained=True):
    # Create custom model
    custom_resnet18 = ResNet(BasicBlock, [2, 2, 2, 2], class_num)

    # Load pretrained weights from torchvision
    if use_pretrained:
        official_resnet18 = resnet18(weights=ResNet18_Weights.DEFAULT)

        pretrained_dict = official_resnet18.state_dict()
        model_dict = custom_resnet18.state_dict()

        #Filtered the last layer, because the difference number of class
        filtered_dict = {
            k: v for k, v in pretrained_dict.items()
            if k in model_dict and not k.startswith('fc')
        }

        custom_resnet18.load_state_dict(filtered_dict, strict=False)

    return custom_resnet18

In [5]:
import torchvision
from torchvision.models.detection.faster_rcnn import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone

def build_model_object_detection(backbone='resnet50', num_class=2, use_pretrained=True):
    # 1. Create the same backbone used in the pretrained model
    # This will create resnet50 with FPN (Feature Pyramid Network)
    backbone = resnet_fpn_backbone('resnet50', pretrained=True)

    # 2. Define AnchorGenerator (same as default)
    # Correct the aspect_ratios to match the default pretrained model's RPN head expectation.
    # The default config uses 3 aspect ratios per spatial location.
    anchor_generator = AnchorGenerator(
      sizes=((32,), (64,), (128,), (256,), (512,)),
      aspect_ratios=((0.5, 1.0, 2.0),) * 5
    )

    # 3. Define ROI Pooler (same as in torchvision)
    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
      featmap_names=['0', '1', '2', '3'], # The default resnet_fpn_backbone returns 5 feature maps (p2, p3, p4, p5, p6).
                                              # Check the return_layers of the default backbone.
                                              # Looking at resnet_fpn_backbone source, it returns {"0": p2, "1": p3, "2": p4, "3": p5, "4": p6}.
                                              # So we need 5 feature map names.
      output_size=7,
      sampling_ratio=2
    )

    # 4. Assemble the Faster R-CNN model
    model = FasterRCNN(
      backbone=backbone,
      num_classes=num_class,  # use 2 for our case, car and background
      rpn_anchor_generator=anchor_generator,
      box_roi_pool=roi_pooler
    )

    if use_pretrained:

        pretrained_model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

        pretrained_dict = pretrained_model.state_dict()
        model_dict = model.state_dict()

        #Filtered the last layer, because the difference number of class
        filtered_dict = {
            k: v for k, v in pretrained_dict.items()
            if k in model_dict and not k.startswith('roi_heads.box_predictor')
        }
        model.load_state_dict(filtered_dict, strict=False)

    return model

In [6]:
import torch
def load_model(model, path_load_from, device):
    """
    Save the model weights to the specified path.

    Args:
        model: The trained model.
        path_to_save: Path to save the model weights.
    """
    # path_load_from = '/content/best_model_weights.pth'
    model.load_state_dict(torch.load(path_load_from, map_location=device))
    print(f"Model weights loaded successfully from {path_load_from}")

## Main Code

In [7]:
file_name="traffic_test.mp4"
output_file = "output_traffic_test_model.mp4"

In [8]:
# Use the file_name argument
# file_name = args.file_name
# output_file = args.output_file


# Check if the file exists
if not file_name:
    raise ValueError("No input file provided. Please specify a valid video file path.")
if not file_name.endswith(('.mp4', '.avi', '.mov', '.mkv')):
    raise ValueError("Invalid file format. Please provide a valid video file (mp4, avi, mov, mkv).")
if not cv2.os.path.exists(file_name):
    raise FileNotFoundError(f"The specified file does not exist: {file_name}")

class_names = {
    0: 'City Car',
    1: 'Big Truck',
    2: 'Multi Purpose Vehicle',
    3: 'Sedan',
    4: 'Sport Utility Vehicle',
    5: 'Truck',
    6: 'Van'
}

# Load the models
model_detection = build_model_object_detection(backbone='resnet50', num_class=2, use_pretrained=False)
model_classification = build_model_classification(class_num=7, use_pretrained=False)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# cpu_device = torch.device("cpu")

load_model(model_detection, 'logs/detection/best_model/best_model_weights.pth', device)
load_model(model_classification, 'logs/classification/best_model/best_model_weights.pth', device)

model_detection = model_detection.to(device)
model_classification = model_classification.to(device)
model_detection.eval()
model_classification.eval()

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 214MB/s]


Model weights loaded successfully from logs/detection/best_model/best_model_weights.pth
Model weights loaded successfully from logs/classification/best_model/best_model_weights.pth


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [9]:
# Open the video file
cap = cv2.VideoCapture(file_name)

# Get video properties
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Get total number of frames


# Define the codec and create VideoWriter object
# fourcc = cv2.VideoWriter_fourcc(*'XVID')  # Use 'XVID' for .avi files
fourcc = cv2.VideoWriter_fourcc(*'XVID') # Use 'mp4v' for MP4 files
out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))

In [10]:
for _ in tqdm(range(total_frames), desc="Processing Video"):
    ret, frame = cap.read()
    if not ret:
        break

    # Preprocess the frame
    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB).astype(np.float32)
    image /= 255.0
    image_preprocess = torch.from_numpy(image).permute(2, 0, 1).to(device).unsqueeze(0)

    # Object detection
    outputs = model_detection(image_preprocess)
    outputs = [{k: v.to(device) for k, v in t.items()} for t in outputs]

    boxes = outputs[0]['boxes']
    scores = outputs[0]['scores']

    confidence_threshold = 0.5  # Adjust as needed
    filtered_boxes = boxes[scores > confidence_threshold].detach().cpu().numpy().astype(np.int32)

    img_predictions = []

    # Iterate for each box prediction
    for bbox in filtered_boxes:
        if bbox.ndim != 1 or bbox.shape[0] != 4:
            img_predictions.append(-2)  # Indicate skipped due to format
            continue
        x_min, y_min, x_max, y_max = [int(b.item()) for b in bbox]

        height, width = image_preprocess[0].shape[-2:]  # If the format is cxhxw

        x_min = max(0, x_min)
        y_min = max(0, y_min)
        x_max = min(width, x_max)
        y_max = min(height, y_max)

        classifier_device = model_classification.fc.weight.device

        roi_np = image_preprocess[0][:, y_min:y_max, x_min:x_max].unsqueeze(0)
        roi_np = roi_np.to(classifier_device)

        with torch.no_grad():  # No need to calculate gradients for inference
            output = model_classification(roi_np)
        probabilities = torch.softmax(output, dim=1)

        _, predicted_class = torch.max(probabilities, 1)
        img_predictions.append(predicted_class.item())

    # Draw bounding boxes and labels on the frame
    for i, box in enumerate(filtered_boxes):
        x_min, y_min, x_max, y_max = box.astype(np.int32)
        label_index = img_predictions[i]
        label = class_names.get(label_index, f'Class {label_index}')  # Get class name or use index

        # Draw the bounding box
        cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (220, 0, 0), 3)

        # Put the label and score text
        text = f'{label}'
        text_x = x_min
        text_y = y_min - 10 if y_min - 10 > 10 else y_min + 10
        cv2.putText(frame, text, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    # Write the processed frame to the output video
    out.write(frame)

# Release resources
cap.release()
out.release()
print(f"Processed video saved to {output_file}")

Processing Video:   0%|          | 0/5920 [00:00<?, ?it/s]

Processed video saved to output_traffic_test_model.mp4
