<a href="https://www.kaggle.com/code/rabbi2k3/rickshaw-detection-with-pytorch-and-fasterrcnn?scriptVersionId=216471180" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [25]:
import os
import warnings
from glob import glob

import torch
import torch.nn as nn
import numpy as np
import cv2
import torchvision
from PIL import Image
from bs4 import BeautifulSoup
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.transforms import v2
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from skimage import io

warnings.filterwarnings("ignore")

## Add utility functions

In [26]:
def collate_fn(batch):
    """Transpose a batch of samples into per-field tuples.

    Args:
        batch: Iterable of samples, each itself an iterable of fields
            (the dataset in this notebook yields ``(image, target)`` pairs).

    Returns:
        tuple: One tuple per field position, e.g. ``(images, targets)``.
        An empty batch yields an empty tuple.
    """
    # zip(*batch) regroups the i-th field of every sample together, which
    # keeps images of different sizes as separate items instead of stacking.
    fields = zip(*batch)
    return tuple(fields)


def get_model_instance_segmentation(num_classes, pretrained=True):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a fresh box head.

    Args:
        num_classes (int): Number of output classes of the new box predictor.
        pretrained (bool): When truthy, start from the COCO-trained detector
            checkpoint; otherwise no detector checkpoint is loaded.

    Returns:
        The torchvision detection model with its box predictor replaced.
    """
    # `pretrained=` is the deprecated spelling; the `weights=` argument is the
    # supported one. "COCO_V1" is the checkpoint `pretrained=True` resolved to.
    weights = "COCO_V1" if pretrained else None
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=weights)

    # Swap the pre-trained classification/regression head for one sized to
    # `num_classes`, reusing the feature width of the existing head.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model


def generate_box(obj):
    """Read the bounding box of one annotation object.

    Args:
        obj: Parsed annotation element exposing ``find(tag).text`` for the
            tags ``xmin``, ``ymin``, ``xmax`` and ``ymax``.

    Returns:
        list[int]: ``[x_min, y_min, x_max, y_max]`` in pixels.

    Raises:
        AttributeError: If one of the four tags is missing.
        ValueError: If a coordinate is not numeric text.
    """
    def _coordinate(tag):
        # Go through float() so decimal text such as "12.0" is accepted;
        # plain integer text yields exactly the same value as int(text).
        return int(round(float(obj.find(tag).text)))

    return [_coordinate(tag) for tag in ('xmin', 'ymin', 'xmax', 'ymax')]


def generate_label(obj):
    """Map an annotation object's class name to an integer label.

    Args:
        obj: Parsed annotation element exposing ``find('name').text``.

    Returns:
        int: ``1`` when the lower-cased name contains ``'rikshaw'``,
        otherwise ``0``.
    """
    # NOTE(review): only the spelling "rikshaw" matches; a name spelled
    # "rickshaw" yields 0 -- confirm against the annotation files.
    class_name = obj.find('name').text.lower()
    return 1 if 'rikshaw' in class_name else 0


def get_transform():
    """Return the image-only transform applied to each sample.

    The pipeline only converts the image array to a tensor:

    * No random horizontal flip: this transform receives the image alone, so
      a flip here would mirror the pixels while the bounding boxes stay
      unchanged, leaving the targets pointing at the wrong region.
    * No mean/std normalisation: torchvision's Faster R-CNN expects inputs in
      the 0-1 range and normalises them inside the model, so normalising here
      as well would apply it twice.

    Returns:
        torchvision.transforms.Compose: The transform pipeline.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),  # HWC array -> CHW tensor
    ])
    return transform

## Create Rickshaw Dataset class

In [27]:
class RickshawDataset(Dataset):
    """Object-detection dataset of ``*.jpg`` images with ``*.xml`` annotations.

    Every image is resized to ``target_size`` and its bounding boxes are
    rescaled to the same coordinate frame.

    Args:
        dataset_dir (str): Directory holding the ``.jpg`` and ``.xml`` files.
        transform (callable, optional): Applied to the float32 RGB image array
            (H x W x 3, values in 0-1). It sees the image only, so it must not
            change geometry -- the boxes are not transformed with it. When
            ``None`` the array is converted to a C x H x W tensor.
        target_size (tuple): ``(width, height)`` every image is resized to.
    """

    def __init__(self, dataset_dir, transform=None, target_size=(112, 112)):
        self.transform = transform
        self.dataset_dir = dataset_dir
        self.target_size = target_size
        # Images and annotations are paired purely by position in these two
        # sorted listings, so each image needs an XML that sorts alongside it.
        self.images = sorted(glob(os.path.join(self.dataset_dir, '*.jpg')))
        self.annotations = sorted(glob(os.path.join(self.dataset_dir, '*.xml')))

    def __getitem__(self, index):
        """Return ``(image, target)`` for the sample at ``index``.

        Raises:
            OSError: If the image file cannot be read.
        """
        img_path = self.images[index]

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f"Could not read image: {img_path}")
        original_height, original_width = img.shape[:2]

        # cv2.resize takes (width, height), the same order as target_size.
        img = cv2.resize(img, self.target_size)
        # OpenCV loads BGR; convert to RGB and scale to the 0-1 range.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)
        img /= 255.0

        if self.transform is not None:
            resized_img = self.transform(img)
        else:
            # Fallback: H x W x C array -> C x H x W tensor.
            resized_img = torch.from_numpy(np.ascontiguousarray(img.transpose(2, 0, 1)))

        # Pass the ORIGINAL image size: __resize_box divides by it to map the
        # annotated boxes into the resized frame.
        target = self.__generate_target(
            index, self.annotations[index], (original_width, original_height))

        return resized_img, target

    def __len__(self):
        return len(self.images)

    def __resize_box(self, box, original_size):
        """Rescale a box from the original image frame to ``target_size``.

        Args:
            box (list): ``[x_min, y_min, x_max, y_max]`` in original pixels.
            original_size (tuple): ``(original_width, original_height)``.

        Returns:
            list: The box in resized-image coordinates.
        """
        original_width, original_height = original_size
        target_width, target_height = self.target_size

        x_min, y_min, x_max, y_max = box
        return [
            x_min * target_width / original_width,
            y_min * target_height / original_height,
            x_max * target_width / original_width,
            y_max * target_height / original_height,
        ]

    def __generate_target(self, image_id, file, original_size):
        """Parse one annotation file into a detection target dict.

        Args:
            image_id (int): Index of the sample, stored under ``"image_id"``.
            file (str): Path of the XML annotation file.
            original_size (tuple): ``(original_width, original_height)`` of the
                image before resizing.

        Returns:
            dict: See ``_build_target``.
        """
        with open(file) as f:
            data = f.read()

        soup = BeautifulSoup(data, 'lxml')
        objects = soup.find_all('object')

        boxes = [self.__resize_box(generate_box(obj), original_size) for obj in objects]
        labels = [generate_label(obj) for obj in objects]
        return self._build_target(image_id, boxes, labels)

    def _build_target(self, image_id, boxes, labels):
        """Pack already-rescaled boxes and labels into tensors.

        Args:
            image_id (int): Identifier stored under ``"image_id"``.
            boxes (list): ``[x_min, y_min, x_max, y_max]`` lists; may be empty.
            labels (list): One integer label per box.

        Returns:
            dict: ``boxes`` (N x 4 float32), ``labels`` (N int64),
            ``image_id`` (1-element tensor), ``area`` (N) and ``iscrowd``
            (N int64 zeros).
        """
        # reshape(-1, 4) keeps the column axis even with zero boxes, so the
        # column indexing below also works for images without any object.
        boxes = torch.from_numpy(np.asarray(boxes, dtype=np.float32).reshape(-1, 4))
        labels = torch.from_numpy(np.asarray(labels, dtype=np.int64).reshape(-1))

        img_id = torch.tensor([image_id])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        iscrowd = torch.zeros((boxes.shape[0],), dtype=torch.int64)

        return {
            "boxes": boxes,
            "labels": labels,
            "image_id": img_id,
            "area": area,
            "iscrowd": iscrowd,
        }

## Create the loss averager and the Rickshaw training class

In [28]:
class Averager:
    """Running arithmetic mean of the values passed to ``send``."""

    def __init__(self):
        # Same starting state as after reset(): nothing accumulated yet.
        self.current_total = 0.0
        self.iterations = 0.0

    def send(self, value):
        """Add one observation to the running total."""
        self.iterations += 1
        self.current_total += value

    @property
    def value(self):
        """Mean of the observations so far, or ``0`` when there are none."""
        if self.iterations != 0:
            return 1.0 * self.current_total / self.iterations
        return 0

    def reset(self):
        """Forget every observation recorded so far."""
        self.iterations = 0.0
        self.current_total = 0.0

In [29]:
class TrainRickshaw:
    """Trains the Faster R-CNN detector and saves its weights.

    Typical use: construct, call ``load_dataset()``, then ``train_data()``.
    """

    def __init__(self):
        # DataLoaders; only the training one is populated (by load_dataset).
        self.train_iterator = None
        self.valid_iterator = None
        self.test_iterator = None

        self.batch_size = 16

        # Number of training epochs
        self.num_epochs = 100

        # Learning rate passed to the SGD optimizer below.
        self.lr = 0.005

        # Use the GPU when one is available, otherwise fall back to the CPU.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Detector with a two-class box head, moved to the chosen device.
        self.net = get_model_instance_segmentation(2).to(self.device)

        # Optimise only the parameters that require gradients.
        params = [p for p in self.net.parameters() if p.requires_grad]
        self.optimizer = torch.optim.SGD(params, lr=self.lr, momentum=0.9, weight_decay=0.0005)
        self.lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(self.optimizer, T_max=self.num_epochs)

    def load_dataset(self, dataset_dir="/kaggle/input/annotated-rickshaw-images-from-bangladesh/RIckshaw Data/"):
        """Create the training DataLoader.

        Args:
            dataset_dir (str): Directory with the images and XML annotations.
                Defaults to the Kaggle input path this notebook was written for.
        """
        train_datasets = RickshawDataset(
            dataset_dir=dataset_dir,
            transform=get_transform())

        self.train_iterator = DataLoader(dataset=train_datasets,
                                         shuffle=True,
                                         num_workers=8,
                                         batch_size=self.batch_size,
                                         collate_fn=collate_fn)

        print('Load data done!')

    def train_data(self, model_path='/kaggle/working/rickshaw_net.pth'):
        """Run the training loop, then save the model's ``state_dict``.

        Args:
            model_path (str): Where the trained weights are written.

        Raises:
            RuntimeError: If ``load_dataset()`` has not been called yet.
        """
        if self.train_iterator is None:
            raise RuntimeError(
                "No training data loaded; call load_dataset() before train_data().")

        loss_hist = Averager()
        for epoch in range(self.num_epochs):  # loop over the dataset multiple times
            torch.cuda.empty_cache()
            print(f'starting epoch: {epoch}')
            print(f"Is CUDA available? {torch.cuda.is_available()}")
            print(f"Number of GPUs: {torch.cuda.device_count()}")
            # current_device() raises when CUDA is unavailable, which would
            # break the CPU fallback selected in __init__.
            if torch.cuda.is_available():
                print(f"Current device: {torch.cuda.current_device()}")
            loss_hist.reset()
            self.net.train()

            for imgs, annotations in self.train_iterator:
                imgs = [img.to(self.device) for img in imgs]
                annotations = [{k: v.to(self.device) for k, v in t.items()} for t in annotations]

                # Called with targets, the model returns a dict of losses;
                # their sum is the quantity being minimised.
                loss_dict = self.net(imgs, annotations)
                losses = sum(loss for loss in loss_dict.values())
                loss_hist.send(losses.item())

                self.optimizer.zero_grad()
                losses.backward()
                self.optimizer.step()

            print(f"Epoch #{epoch} loss: {loss_hist.value}")
            # The scheduler is stepped once per epoch (T_max is num_epochs).
            self.lr_scheduler.step()

        print('Finished Training')

        # save model
        model_path = os.path.abspath(model_path)
        torch.save(self.net.state_dict(), model_path)

## Run training

In [None]:
# Entry point: build the trainer, create the training DataLoader, then run
# the training loop (train_data also saves the model's state_dict at the end).
if __name__ == '__main__':   
    trainer = TrainRickshaw()
    trainer.load_dataset()
    trainer.train_data()

Load data done!
starting epoch: 0
Is CUDA available? True
Number of GPUs: 1
Current device: 0
