<a href="https://colab.research.google.com/github/suyashmarathe512/suyash/blob/main/Crowd%20detection%20model/Crowd_detection_using_YOLO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [23]:
!pip install opencv-python numpy pandas scikit-learn

# Import necessary libraries
import cv2
import numpy as np
import pandas as pd
from collections import deque
from sklearn.cluster import DBSCAN
import os

# The YOLOv3 weights and configuration files are required by cv2.dnn.readNet().
# The next cell downloads them into the current working directory,
# skipping any file that is already present.




In [24]:
!pip install wget
import wget
if not os.path.exists('yolov3.weights'):
    wget.download("https://pjreddie.com/media/files/yolov3.weights")
if not os.path.exists('yolov3.cfg'):
    wget.download("https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg")




In [25]:
# Sanity check: list the working directory to confirm that the YOLO files
# (and the input video, if one was uploaded) are in place.
print(os.listdir("."))

['.config', 'yolov3.cfg', '.ipynb_checkpoints', 'yolov3.weights', 'dataset_video.mp4', 'sample_data']


In [26]:
# Load the YOLOv3 network from the weights and configuration files on disk.
net = cv2.dnn.readNet("yolov3.weights", "yolov3.cfg")
layer_names = net.getLayerNames()
unconnected_layers = net.getUnconnectedOutLayers()

# getUnconnectedOutLayers() yields 1-based layer indices, either flat or nested one
# level deep depending on the OpenCV version. Flattening first makes both shapes look
# the same, so a single lookup resolves the output layer names.
output_layers = [
    layer_names[layer_index - 1]
    for layer_index in np.asarray(unconnected_layers).flatten()
]

In [27]:
def detect_persons(frame, conf_threshold=0.5, nms_threshold=0.4):
    """Detect people in one frame using the module-level YOLO network.

    Args:
        frame: Image array whose first two dimensions are (height, width); it is
            handed to ``cv2.dnn.blobFromImage`` unchanged.
        conf_threshold: Minimum class score a detection needs to be kept. It is
            also used as the score threshold of ``cv2.dnn.NMSBoxes``.
        nms_threshold: Overlap threshold passed to ``cv2.dnn.NMSBoxes``.

    Returns:
        A list of ``[x, y, w, h]`` boxes made of plain Python ints, in pixels,
        where ``(x, y)`` is the top-left corner. One entry per person that
        survives non-maximum suppression; an empty list if there are none.
    """
    height, width = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), swapRB=True, crop=False)
    net.setInput(blob)
    outputs = net.forward(output_layers)

    # Detections are expressed relative to the frame size; this rescales to pixels.
    to_pixels = np.array([width, height, width, height])

    boxes, confidences = [], []
    for output in outputs:
        for detection in output:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            if class_id == 0 and confidence > conf_threshold:  # 'person' class
                # YOLO reports the box *centre* plus width/height, whereas NMSBoxes
                # and the downstream clustering expect the top-left corner.
                center_x, center_y, w, h = detection[0:4] * to_pixels
                x = int(center_x - w / 2)
                y = int(center_y - h / 2)
                boxes.append([x, y, int(w), int(h)])
                confidences.append(float(confidence))

    if not boxes:
        return []

    indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold)
    # Depending on the OpenCV version the result may be an empty tuple, a 1-D array
    # or an N x 1 array, so test the length and flatten instead of relying on `.size`.
    if indices is None or len(indices) == 0:
        return []
    return [boxes[int(i)] for i in np.asarray(indices).flatten()]

In [28]:
def is_crowd(persons, eps=100, min_samples=3):
    """Group detected persons into crowds based on how close their boxes are.

    Args:
        persons: Sequence of ``(x, y, w, h)`` boxes; the centre of each box is
            computed as ``(x + w/2, y + h/2)``.
        eps: ``eps`` argument forwarded to DBSCAN (neighbourhood radius, in the
            same units as the box coordinates).
        min_samples: ``min_samples`` argument forwarded to DBSCAN; also the
            minimum number of persons a returned group must contain.

    Returns:
        A list of groups, each a list of ``(x, y, w, h)`` tuples. The list is
        empty when fewer than ``min_samples`` persons are given or when no
        cluster is large enough.
    """
    if len(persons) < min_samples:
        return []

    # Cluster on box centres rather than corners.
    centers = np.array([(x + w / 2, y + h / 2) for (x, y, w, h) in persons])
    labels = DBSCAN(eps=eps, min_samples=min_samples).fit(centers).labels_

    # Bucket the boxes by cluster label; DBSCAN marks noise points with -1.
    clusters = {}
    for label, (x, y, w, h) in zip(labels, persons):
        if label != -1:
            clusters.setdefault(label, []).append((x, y, w, h))

    # Apply the caller's threshold (previously a hard-coded 3) so that a
    # non-default min_samples is honoured consistently.
    return [group for group in clusters.values() if len(group) >= min_samples]


In [29]:

def process_video(video_path, output_csv="crowd_log.csv", consecutive_frames=10):
    """Scan a video for persistent crowds and write the events to a CSV file.

    Every frame is run through ``detect_persons`` and ``is_crowd``. Each time a
    crowd has been present for ``consecutive_frames`` frames in a row, one row
    is logged with the current frame number and the size of the largest crowd
    in that frame, and the streak counter is reset.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        output_csv: Destination of the CSV log. It always contains the header
            ``Frame Number,Person Count in Crowd``, even if nothing was logged.
        consecutive_frames: Length of the uninterrupted crowd streak that
            triggers a log entry.

    Raises:
        ValueError: If ``consecutive_frames`` is smaller than 1.
        IOError: If the video cannot be opened.
    """
    if consecutive_frames < 1:
        raise ValueError("consecutive_frames must be at least 1")

    log_data = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            # Fail loudly instead of silently producing an empty log.
            raise IOError(f"Could not open video: {video_path}")

        frame_count = 0
        # Rolling window holding the crowd / no-crowd flag of the latest frames.
        crowd_buffer = deque(maxlen=consecutive_frames)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            persons = detect_persons(frame)
            crowd_groups = is_crowd(persons)
            crowd_buffer.append(len(crowd_groups) > 0)

            # A full window of True flags means an unbroken crowd streak.
            if len(crowd_buffer) == consecutive_frames and all(crowd_buffer):
                # The current frame's flag is in the window, so crowd_groups is non-empty.
                max_size = max(len(group) for group in crowd_groups)
                log_data.append({"Frame Number": frame_count, "Person Count in Crowd": max_size})
                crowd_buffer.clear()  # Start a fresh streak after logging
    finally:
        # Release the capture even if detection or clustering raised.
        cap.release()

    # Fix the column set so the header is written even when no event was logged.
    columns = ["Frame Number", "Person Count in Crowd"]
    pd.DataFrame(log_data, columns=columns).to_csv(output_csv, index=False)

# If no input video is present, have ffmpeg synthesise a 10-second, 640x480 solid-blue
# placeholder clip so that the call below still has a file to read.
if not os.path.exists('dataset_video.mp4'):
    !ffmpeg -f lavfi -i color=c=blue:s=640x480:d=10 -pix_fmt yuv420p dataset_video.mp4
# Example usage: analyse the video; the log is written to the default output, "crowd_log.csv".
process_video("dataset_video.mp4")