# **Pipeline for object detection and tracking in 3D:**

### **Required packages:**

In [2]:
#!pip install -r requirements.txt 
#!pip install setuptools==69.2.0
import torch
import numpy as np
import cv2
from matplotlib import pyplot as plt
import os
from itertools import zip_longest

from pathlib import Path
from models.common import DetectMultiBackend
from utils.general import (Profile, cv2, non_max_suppression, scale_boxes, xyxy2xywh)
from utils.augmentations import letterbox
from utils.torch_utils import select_device, smart_inference_mode
from sort import Sort

### **1. Definition of Objects and Functions**

##### 2D object detection: YOLOv5

In [3]:
# ---------------------------------------------------------------------------
# Notebook-wide configuration. These are plain module-level names; the cells
# below read them either as constructor arguments or directly as globals.
# ---------------------------------------------------------------------------

# Model checkpoint, input folders (one per view) and output locations.
weights = "weights/X-704.pt"
source_1 = "data/view1"
source_2 = "data/view2"
device = "cpu"
project = "runs"
name = "exp"
save_path = "outputs"

# Inference settings.
image_size = (512, 512)
conf_thres = 0.25
iou_thres = 0.45
max_det = 20
line_thickness = 2

# Output / display switches.
save_txt = True
save_csv = False
nosave = False
hide_labels = False
hide_conf = True

# SORT tracker settings (read as globals when the tracker is constructed).
sort_max_age = 5
sort_min_hits = 2
sort_iou_thresh = 0.2
track_color_id = 0

In [48]:
class ObjectDetector():
    """Detect objects with a YOLOv5 model and track them with SORT.

    The detector lists the images found in two source folders (one per view),
    yields them pair by pair when iterated, and for every processed frame runs
    detection, feeds the kept detections to a SORT tracker, draws the tracked
    boxes and writes the annotated frame to ``save_path``.

    The SORT settings (``sort_max_age``, ``sort_min_hits``,
    ``sort_iou_thresh``) are read from module-level globals in ``__init__``.
    """

    # File extensions accepted as input images and as video frames.
    IMAGE_SUFFIXES = ('.jpg', '.jpeg', '.png')
    # Class indices that are forwarded to the tracker; all others are dropped.
    TRACKED_CLASSES = (0, 1, 4)

    def __init__(self, device, weights, source_1, source_2, image_size, save_path):
        """Load the model, list both image folders and create the tracker.

        Args:
            device: Device specifier passed to ``select_device``.
            weights: Path of the model weights passed to ``DetectMultiBackend``.
            source_1: Folder holding the images of the first view.
            source_2: Folder holding the images of the second view.
            image_size: ``(height, width)`` used for warm-up and letterboxing.
            save_path: Folder where annotated frames are written (created if
                it does not exist).
        """
        self.device = select_device(device)
        self.model = DetectMultiBackend(weights, self.device)
        # Keep the first two name entries and everything from the fifth on,
        # i.e. drop the entries at positions 2 and 3 of the model's name table.
        name_items = list(self.model.names.items())
        self.model.names = dict(name_items[:2] + name_items[4:])
        self.stride, self.names, self.pt = self.model.stride, self.model.names, self.model.pt
        self.imgsz = image_size
        self.model.warmup(imgsz=(1, 3, *self.imgsz))
        self.dt = Profile(device=self.device)
        self.source_1 = Path(source_1)
        self.source_2 = Path(source_2)
        # glob() returns files in filesystem order; sort them so that frame
        # order and the pairing between both views are deterministic.
        self.files_1 = self._list_images(self.source_1)
        self.files_2 = self._list_images(self.source_2)

        self.save_path = save_path
        os.makedirs(self.save_path, exist_ok=True)
        self.track_color_id = 0
        self.sort_tracker = Sort(max_age=sort_max_age, min_hits=sort_min_hits, iou_threshold=sort_iou_thresh)

    @staticmethod
    def _natural_sort_key(filename):
        """Return a sort key that orders digit runs numerically ("2" < "10")."""
        # Local import: only zip_longest is imported from itertools at file level.
        from itertools import groupby

        chunks = []
        for is_number, chars in groupby(filename, key=str.isdecimal):
            text = ''.join(chars)
            # Same tuple shape for both kinds so keys are always comparable.
            chunks.append((0, int(text), '') if is_number else (1, 0, text))
        # The raw name breaks ties such as "01.png" vs "1.png".
        return chunks, filename

    @classmethod
    def _list_images(cls, folder):
        """Return the image files directly inside *folder*, naturally sorted."""
        return sorted(
            (f for f in folder.glob('*') if f.suffix.lower() in cls.IMAGE_SUFFIXES),
            key=lambda f: cls._natural_sort_key(f.name),
        )

    @staticmethod
    def _read_image(path):
        """Read *path* with OpenCV; return None when there is no file to read."""
        return None if path is None else cv2.imread(str(path))

    def __iter__(self):
        """Yield ``(img_1, img_2)`` pairs, one per frame.

        When one folder holds fewer images than the other, the missing side
        of the pair is ``None``.
        """
        for file_1, file_2 in zip_longest(self.files_1, self.files_2, fillvalue=None):
            yield (self._read_image(file_1), self._read_image(file_2))

    def draw_boxes(self, img, bbox, identities=None, categories=None, names=None, offset=(0, 0)):
        """Draw each box, its track id label and its centre dot on *img*.

        Args:
            img: Image drawn on in place.
            bbox: Iterable of ``(x1, y1, x2, y2)`` boxes.
            identities: Optional per-box track ids; id 0 is used when omitted.
            categories: Accepted for interface compatibility; not used.
            names: Accepted for interface compatibility; not used.
            offset: ``(dx, dy)`` added to every drawn coordinate.

        Returns:
            The same *img* object.
        """
        for idx, box in enumerate(bbox):
            x1, y1, x2, y2 = [int(v) for v in box]
            x1 += offset[0]
            x2 += offset[0]
            y1 += offset[1]
            y2 += offset[1]
            track_id = int(identities[idx]) if identities is not None else 0
            # The centre dot is shifted by the offset like the box itself.
            center = (int((box[0] + box[2]) / 2) + offset[0],
                      int((box[1] + box[3]) / 2) + offset[1])
            label = str(track_id)

            color = self.compute_color_for_labels(track_id)
            (w, _h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
            # Filled background strip behind the id label.
            cv2.rectangle(img, (x1, y1 - 20), (x1 + w, y1), (255, 191, 0), -1)
            cv2.putText(img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6,
                        [255, 255, 255], 1)
            cv2.circle(img, center, 3, color, -1)

        return img

    @smart_inference_mode()
    def detect_object_2D(self, im, frame_num, save_images):
        """Detect and track objects in one frame and write the annotated frame.

        One line per tracker is printed in the form
        ``<frame> <track id> <class name> <0|1> - -``.

        Args:
            im: Image as returned by ``cv2.imread``.
            frame_num: Frame index; used in the printed lines and as the
                output file name (``<frame_num>.png``).
            save_images: When true, the centroid history of every track is
                drawn on the frame. NOTE(review): the annotated frame is
                written to ``save_path`` regardless of this flag.

        Raises:
            ValueError: If *im* is ``None`` (missing or unreadable image).
        """
        if im is None:
            raise ValueError(f"Frame {frame_num}: no image to process (missing or unreadable file).")

        with self.dt:
            image_original = im
            im = letterbox(im, self.imgsz, stride=self.stride, auto=True)[0]
            im = im.transpose((2, 0, 1))[::-1]  # HWC -> CHW, reverse channel order
            im = np.ascontiguousarray(im)
            im = torch.from_numpy(im).to(self.device)
            im = im.float().unsqueeze(0)
            im /= 255

            pred = self.model(im)
            # NOTE(review): called without thresholds, so the callee's defaults
            # apply; the module-level conf_thres / iou_thres / max_det are not
            # forwarded here.
            pred = non_max_suppression(pred)

            for det in pred:
                # Rescale boxes from the letterboxed size to the original image.
                det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], image_original.shape).round()

                rows = []
                for *xyxy, conf, cls in reversed(det):
                    c = int(cls)
                    if c not in self.TRACKED_CLASSES:
                        continue
                    x1, y1, x2, y2 = [int(coord) for coord in xyxy]
                    rows.append([x1, y1, x2, y2, float(conf), c])
                # reshape keeps the (0, 6) shape when nothing was detected.
                dets_to_sort = np.array(rows, dtype=float).reshape(-1, 6)

                # Run SORT:
                tracked_dets = self.sort_tracker.update(dets_to_sort)
                tracks = self.sort_tracker.getTrackers()

                # OUTPUT:
                for track in tracks:
                    if save_images:
                        color = self.compute_color_for_labels(self.track_color_id)
                        points = list(track.centroidarr)
                        # Connect consecutive centroids to draw the trajectory.
                        for start, end in zip(points, points[1:]):
                            cv2.line(image_original,
                                     (int(start[0]), int(start[1])),
                                     (int(end[0]), int(end[1])),
                                     color, thickness=3)
                        self.track_color_id += 1
                    line = (f"{frame_num} {track.id} {self.names[track.detclass]} {0 if track.time_since_update < 5 else 1} {'-'} {'-'}")
                    print(line)

                if len(tracked_dets) > 0:
                    bbox_xyxy = tracked_dets[:, :4]
                    identities = tracked_dets[:, 8]
                    categories = tracked_dets[:, 4]
                    self.draw_boxes(image_original, bbox_xyxy, identities, categories, self.names)

            cv2.imwrite(os.path.join(str(self.save_path), f"{frame_num}.png"), image_original)

    def compute_color_for_labels(self, label, palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)):
        """Return a deterministic 3-component colour tuple derived from *label*."""
        return tuple(int(p * (label ** 2 - label + 1)) % 255 for p in palette)

    def create_video(self, images_dir, output_video_path, fps=30):
        """Assemble the images of *images_dir* into an mp4v-encoded video.

        Frames are ordered naturally (``2.png`` before ``10.png``) so that the
        numbered frames written by ``detect_object_2D`` stay in sequence.

        Args:
            images_dir: Folder containing the frame images.
            output_video_path: Path of the video file to write.
            fps: Frame rate of the output video.

        Raises:
            ValueError: If the folder holds no images or the first image
                cannot be read.
            RuntimeError: If the video writer cannot be opened.
        """
        image_files = sorted(
            (f for f in os.listdir(images_dir) if f.lower().endswith(self.IMAGE_SUFFIXES)),
            key=self._natural_sort_key,
        )

        if not image_files:
            raise ValueError("No se encontraron imágenes en el directorio especificado.")

        # The first image defines the video frame size.
        first_image_path = os.path.join(images_dir, image_files[0])
        first_image = cv2.imread(first_image_path)
        if first_image is None:
            raise ValueError(f"Could not read image: {first_image_path}")
        height, width = first_image.shape[:2]

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for .mp4 files
        video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not video_writer.isOpened():
            video_writer.release()
            raise RuntimeError(f"Could not open video writer for: {output_video_path}")

        try:
            for image_file in image_files:
                image_path = os.path.join(images_dir, image_file)
                frame = cv2.imread(image_path)
                if frame is None:
                    print(f"Skipping unreadable image: {image_path}")
                    continue
                video_writer.write(frame)
        finally:
            # Always release the writer so the file is finalized/closed.
            video_writer.release()
        print(f"Video creado exitosamente: {output_video_path}")

In [49]:
# Build the detector from the notebook-level parameters defined above.
detector = ObjectDetector(
    device=device,
    weights=weights,
    source_1=source_1,
    source_2=source_2,
    image_size=image_size,
    save_path=save_path,
)

YOLOv5  v7.0-383-g1435a8ee Python-3.10.5 torch-2.5.1+cpu CPU

Fusing layers... 
Model summary: 484 layers, 88417530 parameters, 0 gradients


##### Z ESTIMATION:

In [None]:
def calculate_distance_to_object(left_center, right_center, im_left, im_right, focal_length = 707.0493, baseline = 0.06):
    """Estimate depth Z from the horizontal disparity of two matched points.

    Z = (focal_length * baseline) / |x_left - x_right|

    Args:
        left_center: Point in the left image; only index 0 (x) is read.
        right_center: Point in the right image; only index 0 (x) is read.
        im_left: Accepted but not used.
        im_right: Accepted but not used.
        focal_length: Focal length used in the formula above.
        baseline: Camera baseline used in the formula above.

    Returns:
        The depth estimate, or ``float('inf')`` when the disparity is zero.
    """
    x_left, x_right = left_center[0], right_center[0]
    disparity = abs(x_left - x_right)

    if disparity != 0:
        return (focal_length * baseline) / disparity

    # Zero disparity: the formula would divide by zero.
    return float('inf')

In [50]:
# Run detection + tracking on the first image of every pair the detector yields.
for frame_idx, image_pair in enumerate(detector):
    first_view = image_pair[0]
    detector.detect_object_2D(first_view, frame_num=frame_idx, save_images=False)
    # The second image of the pair (image_pair[1]) is not processed here.

0 486 pedestrian 0 - -
0 487 pedestrian 0 - -
0 488 pedestrian 0 - -
0 489 pedestrian 0 - -
0 490 pedestrian 0 - -
1 486 pedestrian 0 - -
1 487 pedestrian 0 - -
1 488 pedestrian 0 - -
1 489 pedestrian 0 - -
1 490 pedestrian 0 - -
1 491 vehicle 0 - -
2 486 pedestrian 0 - -
2 487 pedestrian 0 - -
2 488 pedestrian 0 - -
2 489 pedestrian 0 - -
2 490 pedestrian 0 - -
2 491 vehicle 0 - -
2 492 pedestrian 0 - -
2 493 pedestrian 0 - -
2 494 cyclist 0 - -
2 495 pedestrian 0 - -
3 486 pedestrian 0 - -
3 487 pedestrian 0 - -
3 488 pedestrian 0 - -
3 489 pedestrian 0 - -
3 490 pedestrian 0 - -
3 491 vehicle 0 - -
3 492 pedestrian 0 - -
3 493 pedestrian 0 - -
3 494 cyclist 0 - -
3 495 pedestrian 0 - -
3 496 pedestrian 0 - -
4 486 pedestrian 0 - -
4 487 pedestrian 0 - -
4 488 pedestrian 0 - -
4 489 pedestrian 0 - -
4 490 pedestrian 0 - -
4 491 vehicle 0 - -
4 492 pedestrian 0 - -
4 493 pedestrian 0 - -
4 494 cyclist 0 - -
4 495 pedestrian 0 - -
4 496 pedestrian 0 - -
5 486 pedestrian 0 - -
5 487 ped

KeyboardInterrupt: 

In [117]:
# Assemble the frames stored under ./outputs into ./output_video.mp4 at 30 fps.
detector.create_video(
    images_dir="./outputs",
    output_video_path="./output_video.mp4",
    fps=30,
)

Video creado exitosamente: ./output_video.mp4


In [None]:
# Planning sketch of the intended 3D tracking loop.
# NOTE(review): the helpers and variables used below (get_next_frame,
# yolo_model, disparity_to_depth, disparity_map, bbox_center_*, cx, cy, fx, fy,
# point_3d, KALMAN, draw_track) are not defined in the visible part of this
# notebook, and the loop has no exit condition, so this cell reads as
# pseudo-code rather than runnable code.
while True:
    # 1. New frame (2)
    frame = get_next_frame()
    
    # 2. YOLO detection in the 2 stereo images
    detections_2d = yolo_model.detect(frame)
    
    # 3. 3D reconstruction (center x, y in the 2 stereo images)
    detections_3d = []
    for bbox in detections_2d:

        # Depth from the disparity between the box centers of both views.
        Z = disparity_to_depth(disparity_map[bbox_center_y1, bbox_center_x1, bbox_center_y2, bbox_center_x2 ])

        # Back-project the box center to X/Y using Z, the offsets cx/cy and fx/fy.
        # NOTE(review): X and Y are computed but never used; point_3d is never
        # assigned in this cell — presumably it should be built from (X, Y, Z).
        X = (bbox_center_x - cx) * Z / fx
        Y = (bbox_center_y - cy) * Z / fy
        detections_3d.append(point_3d)
    
    # 4. Detect occlusion (if occluded: Kalman; if not: linear interpolation of the line tracker.)
    
        # NOTE(review): from here on the code is indented at the level of the
        # `for bbox` body, so it runs once per bounding box — confirm whether
        # it was meant to run once per frame instead.
        # 4A. Update tracker
        active_tracks = KALMAN.tracker.update(detections_3d)
        
        # 5A. Visualize results:
        for track_id, track in active_tracks.items():
            position = track['kalman'].state[:3]  # Filtered position
            velocity = track['kalman'].state[3:]  # Estimated velocity
            draw_track(frame, position, track_id)
            
        # 4A. Linear interpolation (not implemented yet)