# Object Detection

In [1]:
!git clone https://github.com/ultralytics/yolov5

!pip install -r https://raw.githubusercontent.com/ultralytics/yolov5/master/requirements.txt

fatal: destination path 'yolov5' already exists and is not an empty directory.


Defaulting to user installation because normal site-packages is not writeable




In [2]:
import cv2 as cv
import os
import random
import torch
import torch.nn as nn
import torchvision
from torchvision import models
from torch.utils.data import Dataset, DataLoader
import albumentations as A
import torch.nn.functional as F
from albumentations.pytorch import ToTensorV2
import xml.etree.ElementTree as ET
from utils import ModelTrainer, displayImage, importImage, plotTrainingHistory
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor


In [3]:
# Get train and validation datasets
images_directory = "../dataset/images"
annotations_directory = "../dataset/annotations"

train_split = 0.8

with open("train.txt") as train:
    train_images_filenames_total = train.read().splitlines()

    split_idx = int(train_split * len(train_images_filenames_total))
    train_images_filenames = train_images_filenames_total[:split_idx]
    val_images_filenames = train_images_filenames_total[split_idx:]


with open("test.txt") as test:
    test_images_filenames = test.read().splitlines()

# Filter out images that can not be loaded properly
train_images_filenames = [i for i in train_images_filenames if cv.imread(os.path.join(images_directory, i + ".png")) is not None]
val_images_filenames = [i for i in val_images_filenames if cv.imread(os.path.join(images_directory, i + ".png")) is not None]
test_images_filenames = [i for i in test_images_filenames if cv.imread(os.path.join(images_directory, i + ".png")) is not None]

random.seed(42)
random.shuffle(train_images_filenames)
random.shuffle(val_images_filenames)
random.shuffle(test_images_filenames)

print(len(train_images_filenames), len(val_images_filenames), len(test_images_filenames))

490 123 264


## Convert to YOLO annotations

In [4]:
import shutil
classes = {
    "trafficlight": 0,
    "stop": 1,
    "speedlimit": 2,
    "crosswalk": 3,
}

os.makedirs("../dataset/labels", exist_ok=True)

os.makedirs("dataset/images/train", exist_ok=True)
os.makedirs("dataset/images/val", exist_ok=True)
os.makedirs("dataset/images/test", exist_ok=True)
os.makedirs("dataset/labels/train", exist_ok=True)
os.makedirs("dataset/labels/val", exist_ok=True)
os.makedirs("dataset/labels/test", exist_ok=True)


for img in train_images_filenames:
    shutil.copy(
        f"../dataset/images/{img}.png", f"dataset/images/train/{img}.png")

for img in val_images_filenames:
    shutil.copy(
        f"../dataset/images/{img}.png", f"dataset/images/val/{img}.png")

for img in test_images_filenames:
    shutil.copy(
        f"../dataset/images/{img}.png", f"dataset/images/test/{img}.png")


for path in os.listdir(annotations_directory):
    with open(os.path.join(annotations_directory, path), "r") as xml:
        folder = ""
        if path.strip(".xml") in train_images_filenames:
            folder = "train"
        elif path.strip(".xml") in val_images_filenames:
            folder = "val"
        else:
            folder = "test"


        with open(f"dataset/labels/{folder}/{path.replace('xml', 'txt')}", "w") as txt:
            tree = ET.parse(xml)
            root = tree.getroot()

            size = root.find("size")
            height = int(size.find("height").text)
            width = int(size.find("width").text)

            objects = root.findall("object")

            lines = []
            for object in objects:
                class_index = classes[object.find("name").text]

                xmin = int(object.find("bndbox/xmin").text)
                ymin = int(object.find("bndbox/ymin").text)
                xmax = int(object.find("bndbox/xmax").text)
                ymax = int(object.find("bndbox/ymax").text)

                # middle of bbox
                bbox_x = ((xmax + xmin) / 2) / width
                bbox_y = ((ymax + ymin) / 2) / height
                bbox_width = (xmax - xmin) / width
                bbox_height = (ymax - ymin) / height

                lines.append(
                    f"{class_index} {bbox_x} {bbox_y} {bbox_width} {bbox_height}")

            txt.write("\n".join(lines))


## Dataset

In [5]:
classes = {
    "trafficlight": 0,
    "stop": 1,
    "speedlimit": 2,
    "crosswalk": 3,
}


class TrafficSignDataset(Dataset):
    def __init__(self, annotations_directory, images_filenames, images_directory, transform=None):
        self.annotations_directory = annotations_directory
        self.images_filenames = images_filenames
        self.images_directory = images_directory
        self.transform = transform

    def __len__(self):
        return len(self.images_filenames)

    def __getitem__(self, idx):
        image_filename = self.images_filenames[idx]
        image = cv.imread(os.path.join(
            self.images_directory, image_filename + ".png"))
        image = cv.cvtColor(image, cv.COLOR_BGR2RGB)

        image = image / 255.

        boxes, labels = self._get_boxes_and_labels(image_filename)

        image_id = torch.tensor([idx])
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = image_id

        if self.transform is not None:
            transformed = self.transform(
                image=image, bboxes=target["boxes"], labels=target["labels"])
            image = transformed["image"]
            target["boxes"] = transformed["bboxes"]
            target["labels"] = transformed["labels"]
            target["boxes"] = torch.as_tensor(target["boxes"], dtype=torch.float32)
            target["labels"] = torch.as_tensor(target["labels"], dtype=torch.int64)

        return image.float(), target

    # https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html
    def _get_boxes_and_labels(self, filename):
        boxes = []
        labels = []
        with open(os.path.join(self.annotations_directory, filename + ".xml")) as xml:
            tree = ET.parse(xml)
            root = tree.getroot()

            objects = root.findall("object")
            for object in objects:
                class_index = classes[object.find("name").text]

                xmin = int(object.find("bndbox/xmin").text)
                ymin = int(object.find("bndbox/ymin").text)
                xmax = int(object.find("bndbox/xmax").text)
                ymax = int(object.find("bndbox/ymax").text)

                boxes.append([xmin, ymin, xmax, ymax])
                labels.append(class_index)

        return boxes, labels


In [6]:
num_classes = len(classes)

# Get CPU or GPU device for training
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

batch_size = 32
num_workers = 0  # how many processes are used to load the data

train_transform = A.Compose(
    [
        A.Resize(256, 256),
        A.RandomCrop(224, 224),
        A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2,
                           rotate_limit=30, p=0.5),
        ToTensorV2(),
    ],
    bbox_params=A.BboxParams(format="pascal_voc", label_fields=['labels'])
)

val_transform = A.Compose(
    [A.Resize(256, 256), A.CenterCrop(224, 224), ToTensorV2()],
    bbox_params=A.BboxParams(format="pascal_voc", label_fields=['labels'])
)

train = TrafficSignDataset(annotations_directory,
                           train_images_filenames, images_directory, train_transform)
val = TrafficSignDataset(annotations_directory,
                         val_images_filenames, images_directory, val_transform)
test = TrafficSignDataset(annotations_directory,
                          test_images_filenames, images_directory)

train_dataloader = DataLoader(
    train, batch_size=batch_size, shuffle=True, num_workers=num_workers, drop_last=True, collate_fn=lambda batch: tuple(zip(*batch)))
val_dataloader = DataLoader(val, batch_size=batch_size,
                            shuffle=False, num_workers=num_workers, drop_last=False, collate_fn=lambda batch: tuple(zip(*batch)))
test_dataloader = DataLoader(
    test, batch_size=1, shuffle=False, num_workers=num_workers, drop_last=False)


Using cuda device


## Two-Stage

In [7]:
model = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)

in_features = model.roi_heads.box_predictor.cls_score.in_features

model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
model.to(device)

from torchmetrics import JaccardIndex
metric = JaccardIndex(num_classes=num_classes)

In [19]:
def train_one_epoch(dataloader, model, epoch, optimizer=None, is_train=True):
    if is_train:
        assert optimizer is not None, "When training, please provide an optimizer."

    num_batches = len(dataloader)

    if is_train:
        model.train()
    else:
        model.eval()

    lr_scheduler = None
    if epoch == 0:
        warmup_factor = 1.0 / 1000
        warmup_iters = min(1000, len(dataloader) - 1)

        lr_scheduler = torch.optim.lr_scheduler.LinearLR(
            optimizer, start_factor=warmup_factor, total_iters=warmup_iters
        )

    total_loss = 0.0

    with torch.set_grad_enabled(is_train):
        for images, targets in tqdm(dataloader):

            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
            # loss_value = losses_reduced.item()
            # losses_reduced = sum(loss for loss in loss_dict.values())

            if is_train:
                optimizer.zero_grad()
                losses.backward()
                optimizer.step()

                if lr_scheduler is not None:
                    lr_scheduler.step()

            # IMPORTANT: call .item() to obtain the value of the loss WITHOUT the computational graph attached
            total_loss += losses.item()
            # total_jaccard += metric(final_pred.cpu(), target.cpu())

        return total_loss / num_batches # , total_jaccard / num_batches


def evaluate(dataloader, model, metric):
    model.eval()
    
    with torch.set_grad_enabled(False):
        for images, targets in tqdm(dataloader):
            images = list(image.to(device) for image in images)

            outputs = model(images)
            outputs = [{k: v.to("cpu") for k, v in t.items()} for t in outputs]

            print(outputs)
            res = {target["image_id"].item(): output for target, output in zip(targets, outputs)}
            # print(res)

            metric.update(outputs, targets)
        
    return metric

In [20]:
import numpy as np
from tqdm import tqdm
from torchmetrics.detection.mean_ap import MeanAveragePrecision

# torch.cuda.empty_cache()

# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005,
                            momentum=0.9, weight_decay=0.0005)
# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                step_size=3,
                                                gamma=0.1)

metric = MeanAveragePrecision()

# let's train it for 10 epochs
num_epochs = 10


train_history = {'loss': []}
val_history = {'meanap': []}
best_val_loss = np.inf

# for epoch in range(num_epochs):
#     # train for one epoch, printing every 10 iterations
#     train_one_epoch()
#     train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
#     # evaluate on the test dataset
#     evaluate(model, data_loader_test, device=device)



print("Start training...")
for t in range(num_epochs):
  print(f"\nEpoch {t+1}")
  train_loss = train_one_epoch(train_dataloader, model, t, optimizer)
  print(f"Train loss: {train_loss:.3f}")
  # val_loss = train_one_epoch(val_dataloader, model, t, optimizer, is_train=False)
  # meanap = evaluate(val_dataloader, model, metric)
  # print(f"Val loss: {val_loss:.3f}")
  print(meanap)

  # save model when val loss improves
  # if val_loss < best_val_loss:
  #   best_val_loss = val_loss
  #   save_dict = {'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'epoch': t}
  #   torch.save(save_dict, 'best_model.pth')

  # # save latest model
  # save_dict = {'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'epoch': t}
  # torch.save(save_dict, 'latest_model.pth')

  # save training history for plotting purposes
  train_history["loss"].append(train_loss)
  # train_history["jaccard"].append(train_jaccard)

  val_history["meanap"].append(meanap)
  # val_history["jaccard"].append(val_jaccard)

Start training...

Epoch 1


 40%|████      | 6/15 [00:44<01:06,  7.38s/it]


ValueError: Expected target boxes to be a tensor of shape [N, 4], got torch.Size([0]).

## One-Stage

In [17]:
# Might need to be run on linux/WSL
!python3 yolov5/train.py --batch 10 --epochs 30 --data trafficsigns.yaml
# python yolov5/train.py --batch -1 --epochs 3 --data trafficsigns.yaml --workers 0

SyntaxError: invalid syntax (3741266108.py, line 1)

In [None]:
# Detection
!python3 yolov5/detect.py --weights yolov5/runs/train/exp22/weights/best.pt --img 640 --conf 0.25 --source dataset/images/train/road2.png

In [8]:
# yolo_model = torch.hub.load('ultralytics/yolov5', 'yolov5s', channels=3, classes=4, autoshape=False)
# yolo_model.to(device)

# yolo = {
#     "model": yolo_model,
#     "name": 'yolov5s',
#     "num_epochs": 10,
#     "loss": nn.CrossEntropyLoss(),  # already includes the Softmax activation
#     "optimizer": torch.optim.SGD(yolo_model.parameters(), lr=1e-3)
# }

# print(yolo_model)

    exitcode = _main(fd, parent_sentinel)
  File "c:\Program Files\Python310\lib\multiprocessing\spawn.py", line 125, in _main
    prepare(preparation_data)
  File "c:\Program Files\Python310\lib\multiprocessing\spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "c:\Program Files\Python310\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "c:\Program Files\Python310\lib\runpy.py", line 269, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "c:\Program Files\Python310\lib\runpy.py", line 96, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "c:\Program Files\Python310\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "c:\Users\Ricardo\Desktop\VC2022\Part2\yolov5\train.py", line 26, in <module>
    import torch
  File "C:\Users\Ricardo\AppData\Roaming\Python\Python310\site-packages\torch\__init__.py", line 