In [1]:
import cv2
import numpy as np

def initialize_video_capture(video_path):
    """
    Open a video source with OpenCV and hand back the capture object.

    Args:
        video_path: Value forwarded unchanged to ``cv2.VideoCapture``.

    Returns:
        The opened ``cv2.VideoCapture`` instance.

    Raises:
        ValueError: If the capture reports that it is not opened.
    """
    capture = cv2.VideoCapture(video_path)
    if capture.isOpened():
        return capture
    raise ValueError("Error opening video file")

def get_video_properties(cap):
    """
    Read the frame width, frame height and frame rate from a capture.

    Each property is fetched with ``cap.get`` and converted with ``int``, so a
    fractional frame rate (e.g. 29.97) is truncated toward zero.

    Args:
        cap: Object exposing ``get(prop_id)``, e.g. a ``cv2.VideoCapture``.

    Returns:
        tuple: ``(width, height, fps)`` as integers.
    """
    property_ids = (
        cv2.CAP_PROP_FRAME_WIDTH,
        cv2.CAP_PROP_FRAME_HEIGHT,
        cv2.CAP_PROP_FPS,
    )
    width, height, fps = (int(cap.get(property_id)) for property_id in property_ids)
    return width, height, fps

def initialize_video_writer(output_path, fourcc, fps, width, height):
    """
    Create a ``cv2.VideoWriter`` for the given destination and frame format.

    Args:
        output_path: Destination passed to ``cv2.VideoWriter``.
        fourcc: Codec code passed to ``cv2.VideoWriter``.
        fps: Frame rate passed to ``cv2.VideoWriter``.
        width: Frame width; first element of the frame-size tuple.
        height: Frame height; second element of the frame-size tuple.

    Returns:
        The constructed ``cv2.VideoWriter``. Whether it actually opened is not
        checked here.
    """
    frame_size = (width, height)
    writer = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
    return writer

def save_logs(logs_path, data):
    """
    Append the entries of a mapping to a text file, one ``key: value`` per line.

    The file is opened in append mode, so earlier content is preserved and the
    file is created if it does not exist.

    Args:
        logs_path: Path of the log file.
        data: Mapping whose ``items()`` are written in iteration order.
    """
    with open(logs_path, "a") as log_file:
        # Render every dictionary entry as its own line of text.
        log_file.writelines(f"{key}: {value}\n" for key, value in data.items())

def detect_shoplifting(frame, model, confidence_threshold=0.8):
    """
    Run the shoplifting model on a frame and draw its confident detections.

    Every detection whose confidence is at least ``confidence_threshold`` gets
    a rectangle and a "<status> <confidence>%" label: red when the predicted
    class is 1 (shoplifting), green for any other class.

    Args:
        frame (numpy.ndarray): Video frame; it is annotated in place.
        model (YOLO): Model exposing ``predict(frame)`` whose first result has
            a ``boxes`` attribute with ``data`` and ``xyxy``.
        confidence_threshold (float): Minimum confidence for a detection to be
            drawn.

    Returns:
        numpy.ndarray: The same ``frame`` object, with annotations drawn on it.
    """

    # Colours used to tell people apart (OpenCV drawing calls take BGR).
    ROBBERY_COLOR = (0, 0, 255)  # Red: shoplifting
    NORMAL_COLOR = (0, 255, 0)   # Green: not shoplifting

    # Label text per detection state (runtime strings, shown on the frame).
    shoplifting_status = "Robando"
    not_shoplifting_status = "No robando"

    def to_numpy(values):
        # Ultralytics hands back torch tensors that may live on the GPU, and
        # np.array() cannot convert those directly. Move to the CPU first when
        # possible; plain arrays/lists pass straight through np.asarray.
        if hasattr(values, "cpu"):
            values = values.cpu()
        if hasattr(values, "numpy"):
            values = values.numpy()
        return np.asarray(values)

    boxes = model.predict(frame)[0].boxes
    detections = to_numpy(boxes.data)
    if len(detections) == 0:
        return frame

    corners = to_numpy(boxes.xyxy).astype("int32")

    for corner, row in zip(corners, detections):
        # Confidence and class are the last two columns of each row; indexing
        # from the end keeps this working if an extra column (such as a track
        # id) sits between the coordinates and the confidence.
        conf, clas = row[-2], row[-1]
        if conf < confidence_threshold:
            continue  # Only keep high-confidence detections

        if int(clas) == 1:  # Class 1: shoplifting
            color, status = ROBBERY_COLOR, shoplifting_status
        else:  # Any other class: not shoplifting
            color, status = NORMAL_COLOR, not_shoplifting_status

        # Plain Python ints are the safest point type for OpenCV drawing calls.
        x1, y1, x2, y2 = (int(value) for value in corner)

        # Draw the box around the person
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Show the confidence as text just above the box; clamp the baseline
        # so the label stays visible when the box touches the top of the frame.
        text = f"{status} {conf * 100:.2f}%"
        label_y = max(y1 - 10, 15)
        cv2.putText(frame, text, (x1, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return frame

In [2]:
import os
import cv2
import numpy as np
import time
import json
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors

# Input video opened by the tracking cell (relative path).
video_path = "../data/par_nieve.mp4"
# Weights file handed to YOLO(...) when the model is loaded.
model_path = "../src/models/yolo11n.pt"
# Destination file for the annotated video written by the tracking cell.
output_path = "../outputs/par_nieve.mp4"

In [22]:
# Load the YOLO model
model = YOLO(model_path)

# Open the video file and get properties
cap = initialize_video_capture(video_path)
w, h, fps = get_video_properties(cap)

# cv2.VideoWriter does not raise when it cannot create its file; it silently
# drops every frame instead. Make sure the destination folder exists first.
output_dir = os.path.dirname(output_path)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

# Initialize video writer
out = initialize_video_writer(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, w, h)

# Fail loudly (same exception type as the capture helper) rather than running
# the whole video and ending up with no output file.
if not out.isOpened():
    cap.release()
    raise ValueError(f"Error opening video writer for {output_path}")

# Function to configure the line and directions
def configure_line(line_type, line_position, entry_direction, exit_direction):
    """
    Build the counting line and a predicate that detects crossings of it.

    The line spans the whole frame; the frame size is taken from the
    module-level ``w`` (horizontal line) or ``h`` (vertical line).

    Args:
        line_type (str): "horizontal" for a line at a fixed Y; any other value
            is handled as a vertical line at a fixed X.
        line_position (int): Y coordinate (horizontal) or X coordinate (vertical).
        entry_direction (str): Accepted for the caller's convenience; not read
            by this function.
        exit_direction (str): Accepted for the caller's convenience; not read
            by this function.

    Returns:
        tuple: ``(line_start, line_end, crossed_line)``. ``crossed_line`` is
        called as ``crossed_line(current_pos, last_pos, line_pos, direction)``
        with ``(x, y)`` positions and returns True when the move from
        ``last_pos`` to ``current_pos`` reaches or passes ``line_pos`` in the
        given direction ("up"/"down" for a horizontal line, "left"/"right"
        for a vertical one). Any other direction yields False.
    """
    if line_type == "horizontal":
        line_start, line_end = (0, line_position), (w, line_position)
        # Compare Y coordinates; "down" means Y grows, "up" means Y shrinks.
        axis, increasing, decreasing = 1, "down", "up"
    else:  # Vertical
        line_start, line_end = (line_position, 0), (line_position, h)
        # Compare X coordinates; "right" means X grows, "left" means X shrinks.
        axis, increasing, decreasing = 0, "right", "left"

    def crossed_line(current_pos, last_pos, line_pos, direction):
        previous, current = last_pos[axis], current_pos[axis]
        if direction == increasing:
            return previous < line_pos and current >= line_pos
        if direction == decreasing:
            return previous > line_pos and current <= line_pos
        # Direction does not apply to this line orientation.
        return False

    return line_start, line_end, crossed_line

# Configure the line and directions
line_type = "horizontal"  # Change to "vertical" if needed
line_position = h // 2 + 125  # 125 px below the vertical middle of the frame
entry_direction = "up"  # Enter when the centroid crosses the line moving up
exit_direction = "down"  # Exit when the centroid crosses the line moving down

line_start, line_end, crossed_line = configure_line(line_type, line_position, entry_direction, exit_direction)

# Initialize variables
prev_time = time.perf_counter()  # Monotonic clock, used only for the FPS overlay
last_positions = {}  # Track last positions of people
entry_times = {}  # Track entry times for each person
time_spent_list = []  # List to store time spent inside the area

# Dwell times are measured on the video's own timeline (frame index divided by
# the source frame rate). Wall-clock time would make the result depend on how
# fast inference happens to run on this machine.
source_fps = cap.get(cv2.CAP_PROP_FPS)
frame_index = 0

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("End of video.")
            break

        frame_index += 1
        # Fall back to wall-clock time only when the source reports no usable
        # frame rate.
        video_time = frame_index / source_fps if source_fps > 0 else time.time()

        annotator = Annotator(frame, line_width=2)
        results = model.track(frame, persist=True)

        # Draw the line
        cv2.line(frame, line_start, line_end, (0, 255, 0), 2)  # Green line

        if results[0].boxes.id is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy()
            confs = results[0].boxes.conf.cpu().numpy()
            class_ids = results[0].boxes.cls.cpu().numpy()
            track_ids = results[0].boxes.id.int().cpu().tolist()

            for box, conf, class_id, track_id in zip(boxes, confs, class_ids, track_ids):
                # Only people are tracked (class_id 0 is "person" in the COCO-trained YOLO models)
                if int(class_id) != 0:
                    continue

                label = f"{model.names[int(class_id)]} {conf:.2f} ID: {track_id}"
                annotator.box_label(box, label, color=colors(track_id, True))

                # Calculate the centroid of the bounding box
                centroid_x = int((box[0] + box[2]) / 2)
                centroid_y = int((box[1] + box[3]) / 2)
                current_pos = (centroid_x, centroid_y)

                # Draw the centroid
                cv2.circle(frame, (centroid_x, centroid_y), 5, (0, 0, 255), -1)  # Red dot

                # A crossing needs a previous position, so the first sighting
                # of a track only records where it is.
                if track_id in last_positions:
                    last_pos = last_positions[track_id]

                    # Check for entry
                    if crossed_line(current_pos, last_pos, line_position, entry_direction):
                        if track_id not in entry_times:
                            entry_times[track_id] = video_time  # Record entry time
                            print(f"Person {track_id} entered from {entry_direction} at {entry_times[track_id]}")

                    # Check for exit
                    if crossed_line(current_pos, last_pos, line_position, exit_direction):
                        if track_id in entry_times:
                            time_spent = video_time - entry_times[track_id]
                            time_spent_list.append(time_spent)  # Add to the list
                            print(f"Person {track_id} exited to {exit_direction} after {time_spent:.2f} seconds")
                            del entry_times[track_id]  # Remove from entry times

                # Update last position
                last_positions[track_id] = current_pos

        # Processing rate for the overlay. Kept in its own name so the source
        # frame rate stored in `fps` is not overwritten, and guarded against a
        # zero interval between two clock reads.
        current_time = time.perf_counter()
        elapsed = current_time - prev_time
        processing_fps = 1 / elapsed if elapsed > 0 else 0.0
        prev_time = current_time

        # Display statistics on the frame
        cv2.putText(frame, f"FPS: {processing_fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.putText(frame, f"People in area: {len(entry_times)}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        out.write(frame)
        cv2.imshow("object-detection-tracking", frame)

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Always finalize the output file and free the capture/window, even when
    # the loop is interrupted or a frame raises.
    out.release()
    cap.release()
    cv2.destroyAllWindows()

# Calculate the average time spent inside the area (0 when nobody completed an
# entry followed by an exit; people still inside at the end are not counted)
average_time_spent = sum(time_spent_list) / len(time_spent_list) if time_spent_list else 0

# Save the average time to a JSON file
with open("average_time_log.json", "w") as f:
    json.dump({"average_time_spent": average_time_spent}, f, indent=4)

print(f"Average time spent inside the area: {average_time_spent:.2f} seconds")


0: 384x640 2 persons, 83.3ms
Speed: 4.4ms preprocess, 83.3ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 149.4ms
Speed: 2.0ms preprocess, 149.4ms inference, 2.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 77.3ms
Speed: 4.0ms preprocess, 77.3ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 77.6ms
Speed: 4.6ms preprocess, 77.6ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 76.5ms
Speed: 5.0ms preprocess, 76.5ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 77.9ms
Speed: 4.7ms preprocess, 77.9ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 73.4ms
Speed: 3.3ms preprocess, 73.4ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 69.5ms
Speed: 3.0ms preprocess, 69.5ms inference, 2.0ms postprocess per image at shape