# 1. Detector de personas y vehículos

Para conseguir localizar y hacer seguimiento de personas y vehículos en un vídeo, se utiliza YOLOv11 en su versión *nano* como detector de objetos. Se carga el vídeo de entrada y se configura el modelo con las clases de interés (`person`, `bicycle`, `car` y `bus`) junto con un umbral mínimo de confianza para filtrar detecciones poco fiables.  

A medida que se procesan los fotogramas, se generan cuadros delimitadores sobre los objetos detectados y se asigna un identificador de seguimiento único a cada instancia. Esto permite mantener el seguimiento de cada objeto a lo largo del vídeo, incluso cuando puede quedar parcialmente ocluido.  

Todas las detecciones y sus características se registran en un archivo CSV, mientras que el vídeo resultante se guarda anotado con los delimitadores.


In [None]:
import csv
from ultralytics import YOLO


VIDEO_IN = "videos/C0142.mp4"
PROJECT = "outputs"
NAME = "C0142_annotated"
CSV_OUT = "C0142_tracks.csv"

MODEL_WEIGHTS = "yolo11n.pt"
TRACKER_CFG = "botsort.yaml"
CONF_THRESH = 0.25
CLASSES = {"person", "bicycle", "car", "bus"}

model = YOLO(MODEL_WEIGHTS)

csv_f = open(CSV_OUT, "w", newline="", encoding="utf-8")
csv_writer = csv.writer(csv_f)
csv_writer.writerow(["fotograma", "tipo_objeto", "confianza", "identificador_tracking", "x1", "y1", "x2", "y2"])

frame_idx = 0
stream = model.track(
    source=VIDEO_IN,
    tracker=TRACKER_CFG,
    persist=True,
    stream=True,
    save=True,
    project=PROJECT,
    name=NAME
)

unique_tracks = {k: set() for k in CLASSES}
total_detections = {k: 0 for k in CLASSES}

for results in stream:
    frame_idx += 1
    boxes = results.boxes
    if boxes is None:
        continue

    xyxy = boxes.xyxy.cpu().numpy() if hasattr(boxes, "xyxy") else []
    confs = boxes.conf.cpu().numpy() if hasattr(boxes, "conf") else []
    cls_ids = boxes.cls.cpu().numpy().astype(int) if hasattr(boxes, "cls") else []
    try:
        ids = boxes.id.cpu().numpy().astype(int)
    except Exception:
        ids = [-1] * len(xyxy)

    names = results.names if hasattr(results, "names") else model.names

    for i, box in enumerate(xyxy):
        conf = float(confs[i])
        if conf < CONF_THRESH:
            continue
        cls_name = names.get(int(cls_ids[i]), str(int(cls_ids[i])))
        if cls_name not in CLASSES:
            continue
        track_id = int(ids[i]) if i < len(ids) else -1
        x1, y1, x2, y2 = map(int, box)
        identifier = f"{cls_name}_{track_id}" if track_id >= 0 else f"{cls_name}_-1"

        csv_writer.writerow([frame_idx, cls_name, f"{conf:.4f}", identifier, x1, y1, x2, y2])

        total_detections[cls_name] = total_detections.get(cls_name, 0) + 1
        if track_id >= 0:
            unique_tracks[cls_name].add(track_id)

csv_f.close()

print("Resumen:")
for c in CLASSES:
    print(f" {c}: {total_detections.get(c,0)} detecciones, {len(unique_tracks.get(c,set()))} tracks únicos")
print("Vídeo anotado guardado por Ultralytics en la carpeta:", PROJECT, "/", NAME)
print("CSV:", CSV_OUT)


## Resultados del detector de personas y vehículos

Después de procesar todo el vídeo, se presentan los resultados en términos de detecciones y tracks únicos por clase (`bicycle`, `person`, `car`, `bus`).  

La salida de la ejecución de la celda da:

```text
bicycle: 49 detecciones, 3 tracks únicos
person: 3387 detecciones, 48 tracks únicos
car: 21841 detecciones, 250 tracks únicos
bus: 221 detecciones, 9 tracks únicos
```

# 2. Detector de matrículas

Para poder extraer información de las matrículas de los vehículos, primero es necesario localizarlas en el vídeo. Para ello se utiliza un modelo YOLO entrenado específicamente para detectar la clase `plate`.  

El modelo se configura para procesar únicamente la clase de interés, y se realiza un seguimiento de cada matrícula para registrar sus coordenadas y nivel de confianza en un CSV. A medida que se procesan los fotogramas, cada detección se asigna a un identificador que permite localizar la misma matrícula a lo largo del vídeo.  

In [None]:
import csv
import numpy as np
from ultralytics import YOLO

VIDEO_IN = "videos/C0142.mp4"
PROJECT = "outputs"
NAME = "C0142_annotated"
CSV_OUT = "C0142_plate_tracks.csv"

MODEL_WEIGHTS = "best.pt"
TRACKER_CFG = "botsort.yaml"
CONF_THRESH = 0.25
CLASSES = {"plate"}

model = YOLO(MODEL_WEIGHTS)

id2name = model.names
name2id = {v: k for k, v in id2name.items()}

desired_class_ids = set()
for cls_name in CLASSES:
    if cls_name in name2id:
        desired_class_ids.add(int(name2id[cls_name]))
    else:
        print(f"!! Clase '{cls_name}' no encontrada en model.names.")

print("IDs filtrados (clase -> id):", {id2name[i]: i for i in desired_class_ids})

with open(CSV_OUT, "w", newline="", encoding="utf-8") as csv_f:
    csv_writer = csv.writer(csv_f)
    csv_writer.writerow(["fotograma", "tipo_objeto", "confianza", "identificador_tracking", "x1", "y1", "x2", "y2"])

    frame_idx = 0

    stream = model.track(
        source=VIDEO_IN,
        tracker=TRACKER_CFG,
        persist=True,
        stream=True,
        save=True,
        project=PROJECT,
        name=NAME
    )

    # métricas
    total_detections = {k: 0 for k in CLASSES}
    unique_tracks = {k: set() for k in CLASSES}

    for results in stream:
        frame_idx += 1

        boxes = results.boxes
        xyxy = boxes.xyxy.cpu().numpy()                 # shape (N,4)
        confs = boxes.conf.cpu().numpy()                # shape (N,)
        cls_ids = boxes.cls.cpu().numpy().astype(int)   # shape (N,)
        if boxes.id is None:
            continue
        ids = boxes.id.cpu().numpy().astype(int)        # shape (N,)

        # Por si acaso trabajamos con el mínimo común
        n = min(xyxy.shape[0], confs.shape[0], cls_ids.shape[0], ids.shape[0])

        for i in range(n):
            conf = float(confs[i])
            if conf < CONF_THRESH:
                continue

            cls_id = int(cls_ids[i])
            # filtrar id
            if cls_id not in desired_class_ids:
                continue

            cls_name = id2name[cls_id]
            track_id = int(ids[i])
            x1, y1, x2, y2 = map(int, xyxy[i])
            identifier = f"{cls_name}_{track_id}" if track_id >= 0 else f"{cls_name}_-1"

            csv_writer.writerow([frame_idx, cls_name, f"{conf:.4f}", identifier, x1, y1, x2, y2])

            total_detections[cls_name] = total_detections.get(cls_name, 0) + 1
            if track_id >= 0:
                unique_tracks[cls_name].add(track_id)

print("----- RESUMEN -----")
for c in CLASSES:
    print(f"{c}: {total_detections.get(c,0)} detecciones, {len(unique_tracks.get(c,set()))} tracks únicos")


# 3. Comparativa de OCRs

Para determinar qué modelo de OCR es más adecuado para reconocer matrículas, se realiza una comparativa entre EasyOCR y PaddleOCR. Se procesan imágenes recortadas de matrículas previamente detectadas y se normaliza el texto para eliminar espacios y caracteres no deseados.  

Cada OCR predice el texto de la matrícula y se calcula la precisión utilizando la distancia de Levenshtein entre la matrícula real y la predicción obtenida. Además, se registra el tiempo de inferencia de cada modelo.  

In [None]:
import os
import time
import csv
import re
from typing import Tuple

import easyocr
from paddleocr import PaddleOCR

def normalize_plate(s: str) -> str:
    """
    Normaliza el texto de matrícula:
    - .strip()
    - elimina espacios
    - deja solo letras y dígitos
    - pasa a mayúsculas
    """
    if s is None:
        return ''
    s = s.strip()
    # eliminar todo lo que NO sea letra o dígito
    s = re.sub(r'[^A-Za-z0-9]+', '', s)
    return s.upper()

def levenshtein(a: str, b: str) -> int:
    """Distancia de Levenshtein (implementación iterativa)."""
    if a == b:
        return 0
    la, lb = len(a), len(b)
    if la == 0:
        return lb
    if lb == 0:
        return la
    # usar solo dos filas para memoria
    prev = list(range(lb + 1))
    cur = [0] * (lb + 1)
    for i in range(1, la + 1):
        cur[0] = i
        for j in range(1, lb + 1):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            cur[j] = min(prev[j] + 1,          # borrado
                         cur[j - 1] + 1,       # inserción
                         prev[j - 1] + cost)   # sustitución
        prev, cur = cur, prev
    return prev[lb]

def accuracy_from_distance(true: str, pred: str) -> float:
    """
    Calcula una tasa de acierto en [0.0, 1.0] a partir de la distancia
    de Levenshtein normalizada por la longitud máxima entre true y pred.
    """
    t = normalize_plate(true)
    p = normalize_plate(pred)
    maxlen = max(len(t), len(p))
    if maxlen == 0:
        return 0.0
    dist = levenshtein(t, p)
    acc = max(0.0, 1.0 - (dist / maxlen))
    return acc

# ----

IMAGES_DIR = 'plates-images'
OUTPUT_CSV = 'plates_ocr_results.csv'

easy_reader = easyocr.Reader(['en'], gpu=False)
paddle = PaddleOCR(use_angle_cls=True, lang='en')

with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as fcsv:
    writer = csv.writer(fcsv)
    writer.writerow([
        'plate_number',
        'easyocr_predict',
        'easyocr_accuracy',
        'easyocr_time',
        'paddleocr_predict',
        'paddle_accuracy',
        'paddle_time'
    ])

    files = sorted(os.listdir(IMAGES_DIR))
    for fname in files:
        img_path = os.path.join(IMAGES_DIR, fname)
        plate_true = os.path.splitext(fname)[0]

        t0 = time.time()
        res_easy = easy_reader.readtext(img_path)   # lista de (bbox, text, prob)
        easy_time = time.time() - t0

        # (bbox, text, prob)
        easy_pred_raw = ''.join([t for (_, t, _) in res_easy]) if res_easy else ''
        easy_pred = normalize_plate(easy_pred_raw)

        print(f"[easyocr]: '{easy_pred}' tiempo: {easy_time:.3f}s")


        t0 = time.time()
        paddle_res0 = paddle.predict(img_path)[0]
        paddle_time = time.time() - t0

        rec_texts = paddle_res0.get('rec_texts', [])

        paddle_pred_raw = ''.join(rec_texts) if rec_texts else ''
        paddle_pred = normalize_plate(paddle_pred_raw)
        print(f"[paddleocr]: '{paddle_pred}' tiempo: {paddle_time:.3f}s\n")

        easy_acc = accuracy_from_distance(plate_true, easy_pred)
        paddle_acc = accuracy_from_distance(plate_true, paddle_pred)

        writer.writerow([
            plate_true,
            easy_pred,
            f"{easy_acc:.4f}",
            f"{easy_time:.4f}",
            paddle_pred,
            f"{paddle_acc:.4f}",
            f"{paddle_time:.4f}"
        ])

print(f"CSV guardado en: {OUTPUT_CSV}")

# 3.1 Extracción de matrículas en vídeo de ejemplo

Para obtener las matrículas directamente desde el vídeo, se combinan los resultados del detector de matrículas con OCR. Se carga el vídeo y las coordenadas de las matrículas detectadas previamente, y se recorta la región de interés correspondiente a cada placa en cada fotograma.  

A continuación, se aplica EasyOCR para reconocer el texto de la matrícula, registrando la confianza de la predicción, el tiempo de procesamiento y el identificador de seguimiento en un archivo CSV.  

In [None]:
import cv2
import csv
import time
import easyocr


VIDEO_PATH = "videos/C0142.mp4"
CSV_PATH = "C0142_plate_tracks.csv"
OUTPUT_CSV = "C0142_plates_detected.csv"


reader = easyocr.Reader(['en'])


detections = []
with open(CSV_PATH, newline='', encoding='utf-8') as f:
    reader_csv = csv.DictReader(f)
    for row in reader_csv:
        row['fotograma'] = int(row['fotograma'])
        row['x1'] = int(row['x1'])
        row['y1'] = int(row['y1'])
        row['x2'] = int(row['x2'])
        row['y2'] = int(row['y2'])
        detections.append(row)

detections.sort(key=lambda x: x['fotograma'])

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError("No se pudo abrir el video.")

fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Video abierto ({total_frames} frames, {fps:.2f} fps)\n")

procesadas = set()
resultados = []

for det in detections:
    frame_num = det['fotograma']
    track_id = det['identificador_tracking']
    x1, y1, x2, y2 = det['x1'], det['y1'], det['x2'], det['y2']

    if track_id in procesadas:
        continue

    procesadas.add(track_id)
    print(f"\n Nueva matrícula detectada: {track_id} (frame {frame_num})")

    # Ir al frame indicado
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
    ret, frame = cap.read()
    if not ret:
        print(f"!! No se pudo leer el frame {frame_num}")
        continue

    h, w = frame.shape[:2]
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(w - 1, x2), min(h - 1, y2)
    roi = frame[y1:y2, x1:x2]

    t0 = time.time()
    results = reader.readtext(roi)
    ocr_time = time.time() - t0

    if results:
        # Tomar el texto con mayor probabilidad
        best = max(results, key=lambda x: x[2])
        text = best[1].strip().replace(" ", "")
        prob = best[2]
        print(f"   Texto detectado: '{text}'  (confianza: {prob:.3f}, tiempo: {ocr_time:.3f}s)")
    else:
        text = ""
        prob = 0.0
        print(f"   No se detectó texto. Tiempo: {ocr_time:.3f}s")

    resultados.append({
        "identificador_tracking": track_id,
        "frame": frame_num,
        "texto": text,
        "confianza": prob,
        "tiempo": ocr_time
    })

cap.release()

with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=["identificador_tracking", "frame", "texto", "confianza", "tiempo"])
    writer.writeheader()
    writer.writerows(resultados)

print(f"\nProcesamiento finalizado. Resultados guardados en: {OUTPUT_CSV}")