In [1]:
import math
import random
import numpy as np
from PIL import Image

import cv2
import matplotlib.pyplot as plt
import plotly.graph_objects as go

from tqdm import tqdm

import json

import torch
import torch.nn as nn

from torch.utils.data import Dataset, DataLoader

import torch
from torchvision import transforms
import torchvision.transforms.functional as F
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import fasterrcnn_resnet50_fpn, fasterrcnn_resnet50_fpn_v2

import os

import kagglehub


In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [3]:

# Download latest version
path = kagglehub.dataset_download("pkdarabi/vehicle-detection-image-dataset")

print("Path to dataset files:", path)

file_path = os.path.join(path, 'No_Apply_Grayscale/No_Apply_Grayscale/Vehicles_Detection.v8i.coco/train/_annotations.coco.json')

# Read the Json Annotations file
with open(file_path, 'r') as file:
    coco_data = json.load(file)



Downloading from https://www.kaggle.com/api/v1/datasets/download/pkdarabi/vehicle-detection-image-dataset?dataset_version_number=7...


100%|██████████| 262M/262M [00:14<00:00, 18.5MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/pkdarabi/vehicle-detection-image-dataset/versions/7


In [4]:

# Extract the categories
categories = {category['id']: category['name'] for category in coco_data['categories']}

cat_frequencies = {}
for annotation in coco_data['annotations']:
    category_id = annotation['category_id']
    category_name = categories.get(category_id, 'Unknown')
    cat_frequencies[category_name] = cat_frequencies.get(category_name, 0) + 1

# Visualize if there is a class imbalance
classes = list(cat_frequencies.keys())
frequencies = list(cat_frequencies.values())
fig = go.Figure(data=[go.Bar(x=classes, y=frequencies)])
fig.update_layout(title='Frequency of Classes', xaxis_title='Classes', yaxis_title='Frequency')
fig.show()

In [5]:
# Split Data into training and Testing sets
train_image_path = os.path.join(path, 'No_Apply_Grayscale/No_Apply_Grayscale/Vehicles_Detection.v8i.coco/train')
train_label_file = os.path.join(path, 'No_Apply_Grayscale/No_Apply_Grayscale/Vehicles_Detection.v8i.coco/train/_annotations.coco.json')
test_image_path = os.path.join(path, 'No_Apply_Grayscale/No_Apply_Grayscale/Vehicles_Detection.v8i.coco/test')
test_label_file = os.path.join(path, 'No_Apply_Grayscale/No_Apply_Grayscale/Vehicles_Detection.v8i.coco/test/_annotations.coco.json')

In [6]:
with open(train_label_file, 'r') as f:
    train_data_dict = json.load(f)

with open(test_label_file, 'r') as f:
    test_data_dict = json.load(f)

train_data_dict['annotations'][0]

{'id': 0,
 'image_id': 0,
 'category_id': 3,
 'bbox': [310, 493, 22, 78],
 'area': 1716,
 'segmentation': [],
 'iscrowd': 0}

In [7]:
# Map class id to class name
cat_mapping = {}
for cat in train_data_dict['categories']:
    cat_mapping[cat['id']] = cat['name']

# Custom PyTorch Dataset and DataLoader
To format and process the data in a way that supports data preprocessing steps such as data augmentation, a custom DataLoader class and function was written. This was done using the PyTorch Data Tutorial.

In [8]:
class Compose(object):
  def __init__(self, transforms):
    """
    Argument:
      transforms (list): list of transforms to compose
    """
    self.transforms = transforms
  def __call__(self, image, bboxes):
    """
    Function that loops through all the augmentations in the augmentation list using the input data

    Arguments:
    - image (PIL.Image): Input image
    - bboxes (list): List of bounding boxes

    Returns:
    - image (PIL.Image): Transformed image
    - bboxes (list): List of transformed bounding boxes
    """
    for t in self.transforms:
      image, bboxes = t(image, bboxes)
    return image, bboxes

class RandomHorizontalFlip(object):
    def __init__(self, p=0.5):
        """
        Argument:
          p (float): Probability of an image being flipped
        """
        self.p = p

    def __call__(self, image, bboxes):
        '''
        Function that applies the horizontal flip.
        Arguments:
        -----------
            - image(PIL.Image.Image): Image.
            - bboxes(torch.Tensor): Bounding boxes related to the image.
        Return:
        ---------
            - image(PIL.Image.Image): Image after horizontal flip.
            - bboxes(torch.Tensor): Bounding boxes related to the image after horizontal flip.
        '''
        if random.random() < self.p:
            image = F.hflip(image)
            width, _ = image.size
            if len(bboxes) != 0:
                bboxes = bboxes.clone()
                bboxes[:, [0, 2]] = width - bboxes[:, [2, 0]]  # Flip x_min and x_max
        return image, bboxes

class ToTensor(object):
  def __call__(self, image, bboxes):
    """
    Function that converts input data into torch.Tensor format

    Arguments:
    - image (PIL.Image): Input image
    - bboxes (list): List of bounding boxes

    Returns:
    - image (torch.Tensor): Transformed image
    - bboxes (list): List of transformed bounding boxes
    """
    image = F.to_tensor(image)
    return image, bboxes


In [9]:
# Train transforms with data augmentation
train_transforms = Compose([
    RandomHorizontalFlip(p=0.5),
    ToTensor()
])


# Validation transforms (no augmentation, only preprocessing)
val_transforms = Compose([
    ToTensor()
])

# Test transforms (usually same as validation for fair evaluation)
test_transforms = val_transforms


## Custom Dataset Class
PyTorch allows for writing custom dataset classes which allow for more flexibility when reading and processing the data which will likely result in a model that achieves better accuracy. This can help in a variety of ways, such as data preprocessing, efficiency in loading, and adding custom logic. For this usecase, the dataset is relatively small, so data preprocessing is very important as we want to ensure the model is not simply memorizing the data, so applying custom filters, augmentations, scaling and other preprocessing steps is important in ensuring the model can be applied to more than just this dataset.

In [10]:
class VehicleDataset(Dataset):
  def __init__(self, image_path, data_dict, transforms=None):
    """
    Initialize function with input values

    Arguments:
    - image_path (np.ndarray): Array with all images (X)
    - data_dict (np.ndarray): Array with all labels (y)
    - transforms (list): List of transforms to apply

    Return:
    None
    """
    self.image_path = image_path
    self.data_dict = data_dict
    self.transforms = transforms
    self.dataset = self.create_dataset()

  def create_dataset(self):
    """
    Function to create dataset list, achieved by paring each image path to the annotations (y-values) in that image
    Arguments:
    None
    Returns:
    dataset (list): List of tuples (image, annotations)
    """
    dataset = []
    for element in self.data_dict['images']:
      image_id = element['id']
      annotations = []
      for annotation in self.data_dict['annotations']:
        if annotation['image_id'] == image_id:
          annotations.append(annotation)
      dataset.append((element['file_name'], annotations))
    return dataset

  def __len__(self):
    """
    Function to return length of dataset

    Arguments:
    None
    Returns:
    length (int): Length of dataset
    """
    return len(self.data_dict['images'])

  def collate_fn(self, batch):
      """
      Since each image may have a different number of objects,
      we need a collate function (to be passed to the DataLoader).

      Arguments:
      ---------
          - batch: an iterable of N sets from __getitem__()
      Return:
      ---------
          - images(torch.Tensor): a tensor of batch_size images
          - boxes(list[torch.Tensor]): list with batch_size varying-size tensors of bounding boxes
          - labels(list[torch.Tensor]): list with batch_size varying-size tensors of labels
          - areas(list[torch.Tensor]): list with batch_size varying-size tensors of areas
          - image_id(list[torch.Tensor]): list with batch_size varying-size tensors of image ids
      """
      images = list()
      boxes = list()
      labels = list()
      areas = list()
      image_id = list()

      for b in batch:
          images.append(b[0])
          boxes.append(b[1])
          labels.append(b[2])
          areas.append(b[3])
          image_id.append(b[4])

      images = torch.stack(images, dim=0)

      return images, boxes, labels, areas, image_id

  def __getitem__(self, idx):
      image = cv2.imread(os.path.join(self.image_path, self.dataset[idx][0]))
      # image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
      category_ids = []
      bounding_boxes = []
      areas = []
      iscrowd = []
      image_id = []
      for annotation in self.dataset[idx][1]:
          category_ids.append(annotation['category_id'])
          bounding_boxes.append(annotation['bbox'])
          areas.append(annotation['area'])
          iscrowd.append(annotation['iscrowd'])
          image_id.append(annotation['image_id'])

      # Convert metadata into tensors
      category_ids = torch.tensor(category_ids, dtype=torch.int64)

      if torch.any(category_ids < 0):
        print(f"Invalid label found for image {idx}. Category IDs: {category_ids}")

      areas = torch.tensor(areas, dtype=torch.float32)
      iscrowd = torch.tensor(iscrowd)
      image_id = torch.tensor(image_id)
      bounding_boxes = np.array(bounding_boxes, dtype=np.float32)

      for i, bounding_box in enumerate(bounding_boxes):
        x_min, y_min, width, height = bounding_box
        if width <= 0 or height <= 0:
            print(f"Invalid bounding box for image {idx}: {bounding_box}")
        x1, y1 = x_min, y_min
        x2, y2 = x_min + width, y_min + height
        bounding_boxes[i] = np.array([x1, y1, x2, y2])
      # Convert Bounding boxes to tensors
      bounding_boxes = torch.tensor(bounding_boxes, dtype=torch.float32)

      if self.transforms:
        image = Image.fromarray(image)
        image, bounding_boxes = self.transforms(image, bounding_boxes)

      bounding_boxes = torch.from_numpy(np.array(bounding_boxes))
      # category_ids = torch.Tensor(category_ids).long()

      return image, bounding_boxes, category_ids, areas, image_id

In [11]:
train_dataset = VehicleDataset(train_image_path, train_data_dict, train_transforms)
test_dataset = VehicleDataset(test_image_path, test_data_dict, test_transforms)

train_dataloader = DataLoader(
    train_dataset, batch_size=16, shuffle=True, collate_fn=train_dataset.collate_fn
)
test_dataloader = DataLoader(
    test_dataset, batch_size=16, shuffle=False, collate_fn=test_dataset.collate_fn
)

In [12]:
# Load a pre-trained Faster R-CNN model with a ResNet50 FPN backbone
model = fasterrcnn_resnet50_fpn(pretrained=True)

# Modify the model for your custom number of classes (if necessary)
# For example, if you have 3 classes (background + 2 custom classes):
num_classes = len(categories) # 1 class (background) + 2 custom classes
print(num_classes)

# Replace the classifier with a new one (this part is for fine-tuning)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Move the model to the GPU if available
# device = torch.device("cpu")
model.to(device)



The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead.


Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=FasterRCNN_ResNet50_FPN_Weights.COCO_V1`. You can also use `weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT` to get the most up-to-date weights.

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:00<00:00, 219MB/s]


6


FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [14]:
# Train the model
import torch
import numpy as np
import matplotlib.pyplot as plt
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from tqdm import tqdm

# Optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(params, lr=1e-4)

# Learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

def calculate_iou(box1, box2):
    """
    Function that calculates IoU metric.
    Arguments:
    ---------
        - box1(np.ndarray|list): First bounding box in the format (x_min, y_min, x_max, y_max).
        - box2(np.ndarray|list): Second bounding box in the format (x_min, y_min, x_max, y_max).
    Return:
    ---------
        - iou(float): Intersection over Union metric from the two bounding boxes.
    """
    x1_min, y1_min, x1_max, y1_max = box1
    x2_min, y2_min, x2_max, y2_max = box2

    inter_min_x = max(x1_min, x2_min)
    inter_min_y = max(y1_min, y2_min)
    inter_max_x = min(x1_max, x2_max)
    inter_max_y = min(y1_max, y2_max)

    inter_area = max(0, inter_max_x - inter_min_x) * max(0, inter_max_y - inter_min_y)

    box1_area = (x1_max - x1_min) * (y1_max - y1_min)
    box2_area = (x2_max - x2_min) * (y2_max - y2_min)

    iou = inter_area / float(box1_area + box2_area - inter_area)
    return iou

# Training function
def train_one_epoch(model, data_loader, optimizer, epoch, lr_scheduler=None):
    model.train()
    print(f"Epoch {epoch} - Training...")
    i = 0
    total_train_loss = 0
    loss_classifier = 0
    loss_box_reg = 0
    loss_objectness = 0
    loss_rpn_box_reg = 0
    total_samples = 0

    for images, bounding_boxes, labels, areas, image_id in data_loader:
        images = images.float()
        targets = [{'boxes':bounding_boxes[i], 'labels':labels[i]} for i, bb in enumerate(bounding_boxes)]
        # # Ensure target boxes are never empty, even if there are no objects
        # targets = []
        # for i in range(len(bounding_boxes)):
        #     # Handle cases where there are no bounding boxes (empty tensor for 'boxes' and 'labels')
        #     if bounding_boxes[i].numel() == 0:
        #         targets.append({'boxes': torch.empty(0, 4), 'labels': torch.empty(0)})
        #     else:
        #         targets.append({'boxes': bounding_boxes[i], 'labels': labels[i] - 1})

        # Zero gradients
        optimizer.zero_grad()

        if 0 in list(map(len, bounding_boxes)):
            continue
        # Forward pass
        loss_dict = model(images, targets)

        # Get total loss and backpropagate
        losses = sum(loss for loss in loss_dict.values())
        losses.backward()

        # Update parameters
        optimizer.step()

        total_train_loss += losses.item() * images.size(0)
        loss_classifier += loss_dict['loss_classifier'].item() * images.size(0)
        loss_box_reg += loss_dict['loss_box_reg'].item() * images.size(0)
        loss_objectness += loss_dict['loss_objectness'].item() * images.size(0)
        loss_rpn_box_reg += loss_dict['loss_rpn_box_reg'].item() * images.size(0)
        total_samples += images.size(0)
        i += 1

    average_train_loss = total_train_loss / total_samples
    average_classifier_loss = loss_classifier / total_samples
    average_box_reg_loss = loss_box_reg / total_samples
    average_objectness_loss = loss_objectness / total_samples
    average_rpn_box_reg_loss = loss_rpn_box_reg / total_samples
    lr_scheduler.step()

    print(f"Epoch {epoch + 1}, Total Loss: {average_train_loss:.4f}, Class. Loss: {average_classifier_loss:.4f}, Box Reg. Loss: {average_box_reg_loss:.4f}, LR: {lr_scheduler.get_last_lr()[0]:.7f}", end='\r')

    return average_train_loss, average_classifier_loss, average_box_reg_loss


# Validation function (with visualization)
def evaluate(model, data_loader):
    model.eval()
    iou_scores = []
    with torch.no_grad():
        for images, bounding_boxes, labels, areas, image_id in data_loader:
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Get model predictions
            predictions = model(images)

            for prediction, target in zip(predictions, targets):
                pred_boxes = prediction['boxes']
                true_boxes = target['boxes']
                iou = calculate_iou(pred_boxes, true_boxes)
                iou_scores.extend(iou.tolist())

                # Visualize predictions and ground truth
                visualize_predictions(images[0], pred_boxes, target['labels'], iou)

    avg_iou = np.mean(iou_scores) if iou_scores else 0
    print(f"Average IoU: {avg_iou:.4f}")
    return avg_iou

def visualize_predictions(image, pred_boxes, pred_labels, target):
    image = image.permute(1, 2, 0).cpu().numpy()
    plt.figure(figsize=(10, 10))
    plt.imshow(image)

    # Plot ground truth boxes
    for box in target['boxes']:
        xmin, ymin, xmax, ymax = box.cpu().numpy()
        plt.gca().add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, fill=False, color='green', linewidth=2))

    # Plot predicted boxes
    for box, label in zip(pred_boxes, pred_labels):
        xmin, ymin, xmax, ymax = box.cpu().numpy()
        plt.gca().add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, fill=False, color='red', linewidth=2))
        plt.text(xmin, ymin, f"Label {label.item()}", color='red', fontsize=12, bbox=dict(facecolor='white', alpha=0.5))

    plt.show()



# Training the model
num_epochs = 1
model.train()
for epoch in range(num_epochs):
    average_train_loss, average_classifier_loss, average_box_reg_loss = train_one_epoch(model, train_dataloader, optimizer, epoch, lr_scheduler)
    avg_iou = evaluate(model, test_dataloader)
    # avg_loss, accuracy = train_one_epoch(model, train_dataloader, optimizer, epoch, lr_scheduler)
    # avg_iou = evaluate(model, test_dataloader)
    # print(f"Epoch {epoch} - Training Loss: {average_train_loss:.4f} - Training IoU: {accuracy:.4f} - Validation IoU: {avg_iou:.4f}")
    print(f"Epoch {epoch} - Training Loss: {average_train_loss:.4f} - Trrainig IoU: {average_classifier_loss:.4f} - Validation IoU: {avg_iou:.4f}}}")



Epoch 0 - Training...


RuntimeError: Input type (torch.FloatTensor) and weight type (torch.cuda.FloatTensor) should be the same or input should be a MKLDNN tensor and weight is a dense tensor