## Cyclist Detection

This notebook will finetune models using Ultralytics YOLOv11 to detect cyclists

In [1]:
# Imports
import os
import random

import numpy as np
import torch
from typing import List, Tuple, Type
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset, TensorDataset
from sklearn.model_selection import train_test_split

import colorsys
from PIL import Image, ImageFont, ImageDraw
import imghdr

%pip install ultralytics
from ultralytics import YOLO

import cv2

!pip install -q memory_profiler

  import imghdr


Collecting ultralytics
  Downloading ultralytics-8.3.89-py3-none-any.whl.metadata (35 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading nv

In [7]:
# Delete data.zip and /content/data/
import shutil, gdown, os
if os.path.exists('data'):
    shutil.rmtree('data')
if os.path.exists('data.zip'):
    !rm data.zip

# Redownload from Drive
data_path = "data.zip"
print("Downloading data.zip...")
# url = "https://drive.google.com/file/d/1h3KDDFFmtW9CJGjK9WYJ9SHO4apSP0QK/view?usp=sharing" # URL to CT dataset
url = "https://drive.google.com/file/d/1x5uYOX9qLZI4mkcK5UdnkLurzzav7pgC/view?usp=sharing" #URL to CIMAT dataset
gdown.download(url, output=data_path, fuzzy=True)

# Unzip data
!unzip -q "/content/$data_path"

Downloading data.zip...


Downloading...
From (original): https://drive.google.com/uc?id=1x5uYOX9qLZI4mkcK5UdnkLurzzav7pgC
From (redirected): https://drive.google.com/uc?id=1x5uYOX9qLZI4mkcK5UdnkLurzzav7pgC&confirm=t&uuid=03aeb5a7-96c3-47ef-8a59-51b979415701
To: /content/data.zip
100%|██████████| 5.98G/5.98G [01:18<00:00, 76.1MB/s]


replace cimat-cyclist-master-research-object_detection-images-cyclist-orientation/research/object_detection/images/cyclist-orientation/.gitkeep? [y]es, [n]o, [A]ll, [N]one, [r]ename: A


In [15]:
# Import model
import os
from google.colab import drive
drive.mount('/content/drive')

base_path = "/content/drive/MyDrive"
folder = "CyclistDetectionModel/yolo"
file_name = "CT_original.pt" # EDIT FILENAME HERE BASED ON YOUR MODEL NAME, "<model_name>.pt"
# yaml_name = "data.yaml" #.yaml extension for CT dataset
yaml_name = "cimat.yaml" #.yaml extension for CIMAT dataset

MODEL_PATH = os.path.join(base_path, folder, file_name)
YAML_PATH = os.path.join(base_path, folder, yaml_name)
print(MODEL_PATH)
print(YAML_PATH)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/CyclistDetectionModel/yolo/CT_original.pt
/content/drive/MyDrive/CyclistDetectionModel/yolo/cimat.yaml


In [4]:
%load_ext memory_profiler

### LEGACY: Data Processor

In [5]:
class DataProcessor(Dataset):
    def __init__(self, image_dir: str = '/data/images', label_dir: str = '/data/labels'):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.image_paths = [os.path.join(image_dir, fname) for fname in os.listdir(image_dir) if fname.endswith('.jpg')]
        self.label_paths = [os.path.join(label_dir, fname) for fname in os.listdir(label_dir) if fname.endswith('.txt')]

        # Transform the images to tensors:
        self.transform = transforms.Compose([
            transforms.ToTensor(),
        ])

        self.images = []
        self.labels = []

        self.images, self.labels = self.load_data()

    def __len__(self):
        return len(self.image_paths)

    def load_data(self) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
        # Iterate over each image and label file
        for img_path, label_path in zip(self.image_paths, self.label_paths):

            # Loading images
            try:
                image = self.transform(Image.open(img_path).convert('RGB'))
                self.images.append(image)
            except Exception as e:
                print(f'Error opening image: {img_path}')
                continue

            # Loading labels/bounding boxes
            bounding_boxes = []
            try:
                with open(label_path, 'r') as f:
                    for line in f:
                        # Iterate over each line in the label file
                        label: list[str] = line.strip().split()
                        if len(label) == 5:
                            try:
                                # Compute each bounding box
                                class_id = int(label[0])
                                bbox_values = [float(x) for x in label[1:]]
                                bounding_box = torch.tensor([class_id+1] + bbox_values, dtype=torch.float32) #class_id = 1 corresponds to cyclist in Ultralytics YOLO
                                bounding_boxes.append(bounding_box)
                            except ValueError:
                                print(f"invalid value in label line: {line} in {label_path}")

                # Stack the tensors of bounding boxes
                if bounding_boxes:
                    labels = torch.stack(bounding_boxes)
                else:
                    labels = torch.empty((0, 5), dtype=torch.float32)
            except Exception as e:
                print(f'Error opening label: {label_path}')
                continue

            self.labels.append(labels)

        return self.images, self.labels

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.images[idx], self.labels[idx]

    def split_data(self, test_size: float = 0.2, random_state: int = 42):
        self.train_images, self.val_images, self.train_labels, self.val_labels = train_test_split(
            self.images, self.labels, test_size=test_size, random_state=random_state
        )

    def get_train_data(self) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
        return self.train_images, self.train_labels

    def get_val_data(self) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
        return self.val_images, self.val_labels

### XML to TXT Handler
CIMAT-Cyclist Dataset is not separated into `labels` folders and labels are in `.xml` format. This handler takes all the `.xml` label files and compiles them into a separate `labels` directory with `.txt` conversion for Ultralytics YOLO compatibility.

In [8]:
import os
import shutil
import xml.etree.ElementTree as ET

# Define original directories for train and test
original_dirs = {
    "train": "cimat-cyclist-master-research-object_detection-images-cyclist-orientation/research/object_detection/images/cyclist-orientation/train",
    "test": "cimat-cyclist-master-research-object_detection-images-cyclist-orientation/research/object_detection/images/cyclist-orientation/test"
}

# Define new nested directories
nested_dirs = {
    "train": os.path.join(os.path.dirname(original_dirs["train"]), "training"),
    "test": os.path.join(os.path.dirname(original_dirs["test"]), "testing")
}

# Create nested directories and move train/test into them
for key in ["train", "test"]:
    os.makedirs(nested_dirs[key], exist_ok=True)
    new_path = os.path.join(nested_dirs[key], key)
    shutil.move(original_dirs[key], new_path)
    original_dirs[key] = new_path  # Update path to point to the new nested structure

# Create corresponding labels directories inside nested_train/ and nested_test/
labels_dirs = {}
for key in ["train", "test"]:
    labels_dir = os.path.join(nested_dirs[key], "labels")  # Create labels inside nested_train/ and nested_test/
    labels_dirs[key] = labels_dir
    os.makedirs(labels_dir, exist_ok=True)

# Function to convert XML annotations to YOLO format (all cyclists → class 0)
def convert_xml_to_txt(xml_path, txt_path):
    tree = ET.parse(xml_path)
    root = tree.getroot()

    # Image dimensions
    width = int(root.find("size/width").text)
    height = int(root.find("size/height").text)

    with open(txt_path, "w") as f:
        for obj in root.findall("object"):
            bbox = obj.find("bndbox")

            # Bounding box coordinates
            xmin = int(bbox.find("xmin").text)
            ymin = int(bbox.find("ymin").text)
            xmax = int(bbox.find("xmax").text)
            ymax = int(bbox.find("ymax").text)

            # Convert to YOLO format
            x_center = (xmin + xmax) / 2 / width
            y_center = (ymin + ymax) / 2 / height
            bbox_width = (xmax - xmin) / width
            bbox_height = (ymax - ymin) / height

            # Write to file with class ID 0 for all cyclists
            f.write(f"0 {x_center:.6f} {y_center:.6f} {bbox_width:.6f} {bbox_height:.6f}\n")

# Process XML files in train and test directories
for key in ["train", "test"]:
    combined_dir = original_dirs[key]
    labels_dir = labels_dirs[key]

    for file in os.listdir(combined_dir):
        if file.endswith(".xml"):
            xml_path = os.path.join(combined_dir, file)
            txt_filename = file.replace(".xml", ".txt")
            txt_path = os.path.join(labels_dir, txt_filename)

            # Move XML to labels directory first
            new_xml_path = os.path.join(labels_dir, file)
            shutil.move(xml_path, new_xml_path)

            # Convert XML to YOLO TXT format
            convert_xml_to_txt(new_xml_path, txt_path)

            # Delete XML file after conversion
            os.remove(new_xml_path)

# Move the label directories

# Test: Print a sample .txt file from the train labels directory
test_files = [f for f in os.listdir(labels_dirs["train"]) if f.endswith(".txt")]
if test_files:
    test_file_path = os.path.join(labels_dirs["train"], test_files[0])
    print(f"Contents of {test_files[0]}:")
    with open(test_file_path, "r") as test_file:
        print(test_file.read())

print("Processing complete!")


Contents of 12433.txt:
0 0.726318 0.497074 0.216309 0.817118

Processing complete!


### YOLO

In [11]:
class YOLO_Detection():
    def __init__(self, model_path: str=MODEL_PATH):
        self.CLASSES: list[str] = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
           'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
           'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
           'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
           'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
           'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
           'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
           'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear',
           'hair drier', 'toothbrush']

        self.model = YOLO(model_path)

    def filter_boxes(self, box_confidence: torch.Tensor, boxes: torch.Tensor, box_class_probs: torch.Tensor, threshold: float = .6) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        '''
        This function filters boxes using confidence and class probabilities and seeing if they lie above the certain threshold.
        '''

        # Compute the score of a box as the confidence that there's some object * the probability of it being in a certain class
        box_scores = box_confidence * box_class_probs

        box_classes = torch.argmax(box_scores, dim=-1)
        box_class_scores, _ = torch.max(box_scores, dim=-1, keepdim=False)
        filtering_mask = (box_class_scores >= threshold) # Only filter & keep boxes above the threshold

        # Convert scores to boolean values using the filtering mask
        scores = torch.masked_select(box_class_scores[filtering_mask])
        boxes = torch.masked_select(boxes[filtering_mask])
        classes = torch.masked_select(box_classes[filtering_mask])

        return scores, boxes, classes

    def iou(self, box1: Tuple[float, float, float, float], box2: Tuple[float, float, float, float]) -> float:
        '''
        Design IOU for non-max suppression (NMS) -- we want to use NMS to only select the most accurate (highest probability of the 3 boxes)
        '''
        (box1_x1, box1_y1, box1_x2, box1_y2) = box1
        (box2_x1, box2_y1, box2_x2, box2_y2) = box2

        # Compute intersections
        xi1 = np.maximum(box1[0], box2[0])
        yi1 = np.maximum(box1[1], box2[1])
        xi2 = np.minimum(box1[2], box2[2])
        yi2 = np.minimum(box1[3], box2[3])
        intersection_width = xi2 - xi1
        intersection_height = yi2 - yi1
        intersection_area = max(intersection_width, 0) * max(intersection_height, 0) #Case where areas do not intersect

        # Compute Union Area and return the iou
        box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
        box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union_area = box1_area + box2_area - intersection_area

        return float(intersection_area) / float(union_area)

    def non_max_suppression(self, scores: torch.Tensor, boxes: torch.Tensor, classes: torch.Tensor, max_boxes: int = 10, iou_threshold: float = 0.5) -> Tuple[torch.tensor, torch.tensor, torch.tensor]:
        '''
        Non-max suppression: Select the highest-score box, overlap the box and remove boxes that overlap significantly
        '''
        nms_detections: list = torch.ops.torchvision.nms(boxes, scores, iou_threshold)
        nms_detections = nms_detections[:max_boxes]

        return scores[nms_detections], boxes[nms_detections], classes[nms_detections]

    def train(self, resume: bool):
        '''
        Finetune the pre-trained model using .yaml file
        '''
        device = torch.device('cuda' if torch.cuda.is_available() else 0) # 0 for GPU

        if not resume:
          self.model.train(data=YAML_PATH, epochs=55, imgsz=640, batch=-1, device=device, patience=5) #Epochs, img_size, batch_size (-1 to find optimal), early_stopping
        else:
          self.model.train(resume=resume)
        self.model.export(format="onnx")

### LEGACY: Inference

Refer to Python `cyclist-cv.py` file for live inference updated code

In [None]:
class Inference():
    # Pass in a yolo class and model path
    def __init__(self, yolo: Type[object], model_path: str = 'yolo/yolo11n.onnx'):
        self.model = YOLO(model_path)
        self.CLASSES = yolo.CLASSES
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def predict(self, video_src=0, score_threshold=0.6, iou_threshold=0.5, max_boxes=10, use_webcam=False):
        camera = cv2.VideoCapture(video_src)
        if not camera.isOpened():
            raise Exception("Could not open video device")

        if use_webcam:
            capture = cv2.VideoCapture(f'http://192.168.205.149:8080/video') #IP when connected to hotspot data
        else:
            capture = cv2.VideoCapture(0)

        while True:
            ret, frame = capture.read()
            if not ret:
                break

            # Run model prediction
            prediction = self.model(frame)

            # Evaluate the predictions
            scores, boxes, classes = self.evaluate(prediction, img_shape=(frame.shape[0], frame.shape[1]), max_boxes=max_boxes, score_threshold=score_threshold, iou_threshold=iou_threshold)

            # Draw the bounding boxes
            self.draw_boxes(frame, scores, boxes, classes, self.CLASSES, self.generate_colors(self.CLASSES))
            cv2.imshow("Cyclist Detection", frame)

            if (cv2.waitKey(1) & 0xFF == ord('q')):
                break

        capture.release()
        cv2.destroyAllWindows()

    # Evaluation functions
    def evaluate(self, model_output: Tuple[Tuple[torch.tensor, torch.tensor, torch.tensor, torch.tensor]], img_shape = (720., 1280.), max_boxes=10, score_threshold = 0.6, iou_threshold = 0.5) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        # Unpack outputs of the model
        box_confidence, boxes, box_class_probs, classes = model_output

        # Convert the boxes to the corners
        boxes = self.boxes_to_corners(boxes)

        # Filter the boxes
        scores, boxes, classes = self.filter_boxes(box_confidence, boxes, box_class_probs, threshold=score_threshold)

        # Scale boxes to the original image shape
        boxes = self.scale_boxes(boxes, img_shape)

        # Perform and return non-max suppression
        return self.non_max_suppression(scores, boxes, classes, max_boxes, iou_threshold)

    def boxes_to_corners(boxes: torch.Tensor) -> torch.Tensor:
        '''
        Helper function to convert YOLO boxes to bounding box corners
        '''
        x_center, y_center, width, height = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
        x_min = x_center - (width / 2)
        y_min = y_center - (height / 2)
        x_max = x_center + (width / 2)
        y_max = y_center + (height / 2)

        return torch.stack([x_min, y_min, x_max, y_max], dim=1)

    '''
    Helper functions for YOLO inference, drawing on webcam:
    '''
    def generate_colors(class_names):
        '''
        Generates random HSV --> RGB colors for each class
        '''
        hsv_tuples = [(x / len(class_names), 1., 1.) for x in range(len(class_names))]
        colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
        colors = list(map(lambda x: (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), colors))
        random.seed(10101)  # Fixed seed for consistent colors across runs.
        random.shuffle(colors)  # Shuffle colors to decorrelate adjacent classes.
        random.seed(None)  # Reset seed to default.
        return colors

    def scale_boxes(boxes, image_shape):
        """
        Scales the predicted boxes in order to be drawable on the image
        """
        height = image_shape[0]
        width = image_shape[1]
        image_dims = torch.tensor([height, width, height, width])
        image_dims = torch.reshape(image_dims, [1, 4])
        boxes = boxes * image_dims
        return boxes

    def preprocess_frame(frame, model_image_size):
        '''
        Preprocess frame into data that can be inputted into the model
        '''
        image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        resized_image = image.resize(tuple(reversed(model_image_size)), Image.BICUBIC)
        image_data = np.array(resized_image, dtype='float32')
        image_data /= 255.
        image_data = np.expand_dims(image_data, 0)  # Add batch dimension.
        return image_data

    def draw_boxes(frame, out_scores, out_boxes, out_classes, class_names, colors):
        '''
        This function draws the bounding box with class labels/scores over the frame.
        '''
        thickness = (frame.shape[0] + frame.shape[1]) // 300

        for i, c in reversed(list(enumerate(out_classes))):
            predicted_class = class_names[c]
            box = out_boxes[i]
            score = out_scores[i]

            label = '{} {:.2f}'.format(predicted_class, score)

            top, left, bottom, right = box
            top = max(0, np.floor(top + 0.5).astype('int32'))
            left = max(0, np.floor(left + 0.5).astype('int32'))
            bottom = min(frame.shape[0], np.floor(bottom + 0.5).astype('int32'))
            right = min(frame.shape[1], np.floor(right + 0.5).astype('int32'))
            print(label, (left, top), (right, bottom))

            # Draw bounding box
            cv2.rectangle(frame, (left, top), (right, bottom), colors[c], thickness)

            # Draw label
            label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            label_top = max(top, label_size[1])
            cv2.rectangle(frame, (left, label_top - label_size[1]), (left + label_size[0], label_top + 5), colors[c], cv2.FILLED)
            cv2.putText(frame, label, (left, label_top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

        return frame


In [None]:
yolo = YOLO_Detection(model_path=MODEL_PATH)
train = True
resume = False

if train:
    yolo.train(resume=resume)

### Download model (if not Windows/Linux)

In [None]:
yolo.model.export(format="onnx", optimize=True, dynamic=True) #Downoad. onnx if on MacOS, since MacOS does not support TensorRT

### LEGACY: Sample Inference

In [None]:
inference = Inference(yolo)
inference.predict(video_src=0, score_threshold=0.6, iou_threshold=0.5, max_boxes=10, use_webcam=True)