In [2]:
import cv2
import numpy as np
import time
import imutils
import os
import csv
from datetime import datetime

In [3]:
YOLO_CONFIG = 'yolov4.cfg'
YOLO_WEIGHTS = 'yolov4.weights'
YOLO_CLASSES = 'coco.names'

In [4]:
# Database file
DB_FILE = 'object_database.csv'

# Detection parameters
CONFIDENCE_THRESHOLD = 0.5
NMS_THRESHOLD = 0.4
TARGET_CLASSES = ['car', 'truck', 'bus', 'motorcycle', 'bicycle', 'person']  # Objects we're interested in

# ROI parameters (same as original)
ROI_X, ROI_Y = 400, 200    # ROI position
ROI_WIDTH, ROI_HEIGHT = 500, 150  # ROI dimensions

# Tracking parameters
MAX_DISAPPEARED = 30  # Maximum number of frames an object can disappear before we delete its ID
MIN_DISTANCE = 50     # Minimum distance between centroids to consider it a new object


In [5]:
def is_colliding_with_counting_line(car_box, roi_height):
    """Check if car's bounding box is colliding with the counting line in the middle of ROI"""
    # Counting line is at the middle of ROI
    line_y = roi_height / 2
    
    # Car's top and bottom edges
    car_top = car_box["y"]
    car_bottom = car_box["y"] + car_box["height"]
    
    # Check if car is crossing the line
    return car_top <= line_y <= car_bottom

In [6]:
class ObjectTracker:
    def __init__(self):
        self.next_object_id = 0
        self.objects = {}  # Dictionary format: {ID: {"centroid": (x, y), "class": class_name, "disappeared": count, "counted": bool}}
        self.disappeared = {}
        self.object_counts = {}  # Will be populated as objects are detected
        self.total_count = 0  # Total objects counted
        self.displayed_classes = []  # Track which classes have been displayed
        
        # Initialize database file if it doesn't exist
        if not os.path.exists(DB_FILE):
            with open(DB_FILE, 'w', newline='') as file:
                writer = csv.writer(file)
                writer.writerow(['ID', 'Class', 'Timestamp', 'Confidence'])
    
    def register(self, centroid, class_name, confidence):
        """Register a new object"""
        object_id = f"{class_name}_{self.next_object_id}"
        self.objects[object_id] = {
            "centroid": centroid, 
            "class": class_name, 
            "disappeared": 0,
            "counted": False,
            "confidence": confidence,
            "first_seen": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        }
        self.disappeared[object_id] = 0
        self.next_object_id += 1
        
        # Add to database
        with open(DB_FILE, 'a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow([
                object_id, 
                class_name, 
                datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                f"{confidence:.2f}"
            ])
        
        return object_id
    
    def deregister(self, object_id):
        """Deregister an object"""
        del self.objects[object_id]
        del self.disappeared[object_id]
    
    def update(self, detections):
        """Update object locations and handle new/missing objects"""
        # If no detections, mark all existing objects as disappeared
        if len(detections) == 0:
            for object_id in list(self.disappeared.keys()):
                self.disappeared[object_id] += 1
                self.objects[object_id]["disappeared"] = self.disappeared[object_id]
                
                # Deregister if disappeared for too long
                if self.disappeared[object_id] > MAX_DISAPPEARED:
                    self.deregister(object_id)
            
            return self.objects
        
        # Initialize centroids of current frame
        current_centroids = []
        current_classes = []
        current_confidences = []
        
        for detection in detections:
            (class_name, confidence, x, y, w, h) = detection
            cx = int(x + w/2.0)
            cy = int(y + h/2.0)
            current_centroids.append((cx, cy))
            current_classes.append(class_name)
            current_confidences.append(confidence)
        
        # If we have no existing objects, register all as new
        if len(self.objects) == 0:
            for i in range(len(current_centroids)):
                self.register(current_centroids[i], current_classes[i], current_confidences[i])
        else:
            # Get existing object IDs and centroids
            object_ids = list(self.objects.keys())
            object_centroids = [self.objects[object_id]["centroid"] for object_id in object_ids]
            
            # Compute distances between each pair of existing and new centroids
            D = self._calculate_distances(object_centroids, current_centroids)
            
            # Find the smallest value in each row, then sort by rows
            rows = D.min(axis=1).argsort()
            
            # Find the smallest value in each column, then sort by cols
            cols = D.argmin(axis=1)[rows]
            
            # Keep track of which rows and columns we've already examined
            used_rows = set()
            used_cols = set()
            
            # Loop over combinations of rows and columns
            for (row, col) in zip(rows, cols):
                # If we've already used this row or column, skip
                if row in used_rows or col in used_cols:
                    continue
                
                # If the distance is greater than our threshold, skip
                if D[row, col] > MIN_DISTANCE:
                    continue
                
                # Otherwise, get the object ID, set its new centroid, and reset disappeared
                object_id = object_ids[row]
                self.objects[object_id]["centroid"] = current_centroids[col]
                self.objects[object_id]["disappeared"] = 0
                self.disappeared[object_id] = 0
                
                # Check if object crossed the counting line (middle of ROI)
                centroid = current_centroids[col]
                if (centroid[1] > ROI_Y + ROI_HEIGHT/3 and 
                    centroid[1] < ROI_Y + 2*ROI_HEIGHT/3 and 
                    not self.objects[object_id]["counted"]):
                    self.objects[object_id]["counted"] = True
                    class_name = self.objects[object_id]["class"]
                    
                    # Initialize counter for this class if we haven't seen it before
                    if class_name not in self.object_counts:
                        self.object_counts[class_name] = 0
                        
                    self.object_counts[class_name] += 1
                    self.total_count += 1
                
                # Mark row and column as used
                used_rows.add(row)
                used_cols.add(col)
            
            # Compute set of unused row and column indexes
            unused_rows = set(range(D.shape[0])).difference(used_rows)
            unused_cols = set(range(D.shape[1])).difference(used_cols)
            
            # If we have more existing objects than new centroids, check if some disappeared
            if D.shape[0] >= D.shape[1]:
                for row in unused_rows:
                    object_id = object_ids[row]
                    self.disappeared[object_id] += 1
                    self.objects[object_id]["disappeared"] = self.disappeared[object_id]
                    
                    if self.disappeared[object_id] > MAX_DISAPPEARED:
                        self.deregister(object_id)
            # Otherwise, register each new object centroid as a new object
            else:
                for col in unused_cols:
                    self.register(current_centroids[col], current_classes[col], current_confidences[col])
        
        return self.objects
    
    def _calculate_distances(self, existing_centroids, new_centroids):
        """Calculate Euclidean distances between existing centroids and new centroids"""
        distances = np.zeros((len(existing_centroids), len(new_centroids)))
        
        for i, existing_centroid in enumerate(existing_centroids):
            for j, new_centroid in enumerate(new_centroids):
                distances[i, j] = np.sqrt(
                    (existing_centroid[0] - new_centroid[0]) ** 2 +
                    (existing_centroid[1] - new_centroid[1]) ** 2
                )
        
        return distances


In [7]:
def main():
    # Load YOLO
    print("Loading YOLO model...")
    try:
        # Check if files exist
        if not (os.path.exists(YOLO_CONFIG) and os.path.exists(YOLO_WEIGHTS) and os.path.exists(YOLO_CLASSES)):
            print(f"YOLO files not found. Please download and place in the current directory:")
            print(f"  - {YOLO_CONFIG}")
            print(f"  - {YOLO_WEIGHTS}")
            print(f"  - {YOLO_CLASSES}")
            return
        
        # Load YOLO network
        net = cv2.dnn.readNetFromDarknet(YOLO_CONFIG, YOLO_WEIGHTS)
        
        # Set preferred backend (CUDA if available)
        if cv2.cuda.getCudaEnabledDeviceCount() > 0:
            print("CUDA backend available")
            net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
            net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
        else:
            print("Using CPU backend")
        
        # Get output layer names
        layer_names = net.getLayerNames()
        try:
            output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
        except:
            output_layers = [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()]
        
        # Load class names
        with open(YOLO_CLASSES, 'r') as f:
            classes = [line.strip() for line in f.readlines()]
        
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        return
    
    # Initialize video capture
    video_path = 'test_cars.mp4'
    video_cap = cv2.VideoCapture(video_path)
    
    # Create object tracker
    tracker = ObjectTracker()
    
    try:
        # Read first frame
        success, frame = video_cap.read()
        if not success:
            print("Failed to read from video source")
            return
        
        frame = cv2.resize(frame, (1920, 1080))
        
        while True:
            success, frame = video_cap.read()
            if not success:
                print("End of video or failed to read frame")
                break
            
            frame = cv2.resize(frame, (1920, 1080))
            
            # Extract ROI
            roi_frame = frame[ROI_Y:ROI_Y+ROI_HEIGHT, ROI_X:ROI_X+ROI_WIDTH].copy()
            
            # Prepare frame for YOLO
            height, width, _ = roi_frame.shape
            blob = cv2.dnn.blobFromImage(roi_frame, 1/255.0, (416, 416), swapRB=True, crop=False)
            
            # Set input and forward pass
            net.setInput(blob)
            layer_outputs = net.forward(output_layers)
            
            # Initialize lists
            boxes = []
            confidences = []
            class_ids = []
            detected_objects = []
            
            # Process detections
            for output in layer_outputs:
                for detection in output:
                    scores = detection[5:]
                    class_id = np.argmax(scores)
                    confidence = scores[class_id]
                    
                    if confidence > CONFIDENCE_THRESHOLD and classes[class_id] in TARGET_CLASSES:
                        # Object detected
                        center_x = int(detection[0] * width)
                        center_y = int(detection[1] * height)
                        w = int(detection[2] * width)
                        h = int(detection[3] * height)
                        
                        # Rectangle coordinates
                        x = int(center_x - w / 2)
                        y = int(center_y - h / 2)
                        
                        boxes.append([x, y, w, h])
                        confidences.append(float(confidence))
                        class_ids.append(class_id)
            
            # Apply non-max suppression
            indices = cv2.dnn.NMSBoxes(boxes, confidences, CONFIDENCE_THRESHOLD, NMS_THRESHOLD)
            
            # Prepare detections for tracker
            if len(indices) > 0:
                indices = indices.flatten()
                for i in indices:
                    box = boxes[i]
                    x, y, w, h = box
                    class_id = class_ids[i]
                    class_name = classes[class_id]
                    confidence = confidences[i]
                    
                    detected_objects.append((class_name, confidence, x, y, w, h))
            
            # Update tracker with detected objects
            # Update tracker with detected objects
            previous_counts = tracker.object_counts.copy()
            objects = tracker.update(detected_objects)
            
            # Check if any counts changed and update display flag
            counts_changed = any(previous_counts[cls] != tracker.object_counts[cls] 
                                for cls in tracker.object_counts)
                
            # Draw ROI
            cv2.rectangle(frame, (ROI_X, ROI_Y), (ROI_X + ROI_WIDTH, ROI_Y + ROI_HEIGHT), (0, 0, 255), 2)
            
            # Draw counting line
            cv2.line(frame, 
                        (ROI_X, ROI_Y + int(ROI_HEIGHT/2)), 
                        (ROI_X + ROI_WIDTH, ROI_Y + int(ROI_HEIGHT/2)), 
                        (255, 0, 0), 2)
            
            # Draw objects on frame
            for object_id, attributes in objects.items():
                # Get object details
                centroid = attributes["centroid"]
                class_name = attributes["class"]
                disappeared = attributes["disappeared"]
                counted = attributes["counted"]
                confidence = attributes.get("confidence", 0)
                
                # Adjust centroid to main frame coordinates
                centroid_x = centroid[0] + ROI_X
                centroid_y = centroid[1] + ROI_Y
                
                # Draw on main frame with more visible color (red)
                color = (0, 0, 255)  # Bright red for better visibility
                cv2.circle(frame, (centroid_x, centroid_y), 6, color, -1)  # Larger circle
                cv2.putText(frame, f"ID: {object_id}", (centroid_x - 10, centroid_y - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                
                # Draw on ROI frame for better visualization
                cv2.circle(roi_frame, centroid, 6, color, -1)  # Larger circle
                cv2.putText(roi_frame, f"{class_name}: {confidence:.2f}", (centroid[0] - 10, centroid[1] + 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                
                # Display object counts with dynamic class visibility
                y_offset = 30
                cv2.putText(frame, f"Total Objects Passed: {tracker.total_count}", (10, y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 0, 0), 2)
                
                # Get list of classes that have been detected
                active_classes = [cls for cls, count in tracker.object_counts.items() if count > 0]
                
                # Create a dynamic counter display that only shows classes we've seen
                for class_name in active_classes:
                    y_offset += 35  # More spacing between lines
                    count = tracker.object_counts[class_name]
                    cv2.putText(frame, f"{class_name.capitalize()}: {count}", (10, y_offset),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                
                # Show frames
                cv2.imshow("Object Detection and Tracking", frame)
                cv2.imshow("ROI", roi_frame)
                
                # Key handling
                key = cv2.waitKey(100) & 0xFF
                if key == 27 or key == ord('q'):  # ESC or 'q' key
                    print("User requested exit")
                    break
    
    except Exception as e:
        print(f"An error occurred: {e}")
        import traceback
        traceback.print_exc()
    
    finally:
        # Cleanup
        print("Cleaning up resources")
        video_cap.release()
        cv2.destroyAllWindows()
        for i in range(1, 5):
            cv2.waitKey(1)

In [None]:
main()

Loading YOLO model...
Using CPU backend


2025-03-23 13:26:21.413 python[86886:2067662] +[IMKClient subclass]: chose IMKClient_Modern
2025-03-23 13:26:21.413 python[86886:2067662] +[IMKInputSession subclass]: chose IMKInputSession_Modern


User requested exit
User requested exit
Cleaning up resources


KeyboardInterrupt: 

: 