In [2]:
!pip install ultralytics -qq

In [3]:
import numpy as np
print(np.__version__)


1.26.4


In [4]:
!pip install numpy==1.26.4



In [3]:

# Object Detecion 
import cv2
from ultralytics import YOLO
#plots
import matplotlib.pyplot as plt
import seaborn as sns

#basics
import pandas as pd
import numpy as np
import os
import subprocess

from tqdm.notebook import tqdm

# Display image and videos
import IPython
from IPython.display import Video, display
%matplotlib inline


import urllib.request 
import shutil


# A People Detection and Counting project in a ROI based on the Yolo V8 Model.
----------------------
 **The objectives of the project are:**
- To detect people that are passing in a Region of interest (ROI) 
- Track each individual with a unique ID in the ROI 

---------------------

In [4]:
!cd

d:\PROJECT_ON_Github\Opencv_Smart_Door


In [5]:
# Video  path for experiment
path_zip = 'https://github.com/freedomwebtech/roiinyolo/raw/main/vid1.zip' # credits to the github repo for the video
urllib.request.urlretrieve(path_zip, "vid1.zip")
shutil.unpack_archive('vid1.zip')

In [6]:
path = 'D:\\PROJECT_ON_Github\\Opencv_Smart_Door\\vid1.mp4'

In [8]:
#loading a YOLO model 
model = YOLO('yolov8x.pt')

#geting names from classes
dict_classes = model.model.names

In [9]:
# Auxiliary functions
def risize_frame(frame, scale_percent):
    """Function to resize an image in a percent scale"""
    width = int(frame.shape[1] * scale_percent / 100)
    height = int(frame.shape[0] * scale_percent / 100)
    dim = (width, height)

    # resize image
    resized = cv2.resize(frame, dim, interpolation = cv2.INTER_AREA)
    return resized



def filter_tracks(centers, patience):
    """Function to filter track history"""
    filter_dict = {}
    for k, i in centers.items():
        d_frames = i.items()
        filter_dict[k] = dict(list(d_frames)[-patience:])

    return filter_dict


def update_tracking(centers_old,obj_center, thr_centers, lastKey, frame, frame_max):
    """Function to update track of objects"""
    is_new = 0
    lastpos = [(k, list(center.keys())[-1], list(center.values())[-1]) for k, center in centers_old.items()]
    lastpos = [(i[0], i[2]) for i in lastpos if abs(i[1] - frame) <= frame_max]
    # Calculating distance from existing centers points
    previous_pos = [(k,obj_center) for k,centers in lastpos if (np.linalg.norm(np.array(centers) - np.array(obj_center)) < thr_centers)]
    # if distance less than a threshold, it will update its positions
    if previous_pos:
        id_obj = previous_pos[0][0]
        centers_old[id_obj][frame] = obj_center
    
    # Else a new ID will be set to the given object
    else:
        if lastKey:
            last = lastKey.split('D')[1]
            id_obj = 'ID' + str(int(last)+1)
        else:
            id_obj = 'ID0'
            
        is_new = 1
        centers_old[id_obj] = {frame:obj_center}
        lastKey = list(centers_old.keys())[-1]

    
    return centers_old, id_obj, is_new, lastKey


# Detecting People in ROI 

In [10]:
### Configurations
verbose = False
scale_percent = 100
conf_level = 0.8
thr_centers = 20
frame_max = 5
patience = 100
alpha = 0.1

# Reading video with cv2
video = cv2.VideoCapture(0)

# Objects to detect (YOLO)
class_IDS = [0]
centers_old = {}
obj_id = 0 
end = []
frames_list = []
count_p = 0
lastKey = ''
print(f'[INFO] - Verbose during Prediction: {verbose}')

# Original video information
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
fps = video.get(cv2.CAP_PROP_FPS)
print('[INFO] - Original Dim: ', (width, height))

# Scaling video for better performance
if scale_percent != 100:
    print('[INFO] - Scaling change may cause errors in pixel lines')
    width = int(width * scale_percent / 100)
    height = int(height * scale_percent / 100)
    print('[INFO] - Dim Scaled: ', (width, height))
    
### Video output setup
video_name = 'result.mp4'
output_path = "rep_" + video_name
tmp_output_path = "tmp_" + output_path
VIDEO_CODEC = "MP4V"

output_video = cv2.VideoWriter(tmp_output_path, 
                               cv2.VideoWriter_fourcc(*VIDEO_CODEC), 
                               fps, (width, height))

# Executing Recognition 
for i in tqdm(range(int(video.get(cv2.CAP_PROP_FRAME_COUNT)))):
    ret, frame = video.read()
    if not ret:
        break
    
    # Resize frame
    frame = risize_frame(frame, scale_percent)
    area_roi = [np.array([(1250, 400), (750, 400), (700, 800), (1200, 800)], np.int32)]
    ROI = frame[390:800, 700:1300]

    if verbose:
        print('Dimension Scaled(frame): ', (frame.shape[1], frame.shape[0]))

    # Getting predictions
    y_hat = model.predict(ROI, conf=conf_level, classes=class_IDS, device=0, verbose=False)
    
    # Retrieve bounding boxes, confidence scores, and class labels
    boxes = y_hat[0].boxes.xyxy.cpu().numpy()       # shape (n, 4)
    conf = y_hat[0].boxes.conf.cpu().numpy()          # shape (n,)
    classes = y_hat[0].boxes.cls.cpu().numpy()        # shape (n,)
    
    # Combine into a DataFrame
    positions_frame = pd.DataFrame(
        np.hstack((boxes, conf.reshape(-1, 1), classes.reshape(-1, 1))),
        columns=['xmin', 'ymin', 'xmax', 'ymax', 'conf', 'class']
    )
    
    # Translate numeric class labels to text (assuming dict_classes exists)
    labels = [dict_classes[int(c)] for c in classes]
    
    # Process each detection
    for ix, row in positions_frame.iterrows():
        xmin, ymin, xmax, ymax, confidence, category = row[['xmin', 'ymin', 'xmax', 'ymax', 'conf', 'class']]
        xmin, ymin, xmax, ymax = int(xmin), int(ymin), int(xmax), int(ymax)
        
        # Calculate center of the bounding box
        center_x, center_y = int((xmin + xmax) / 2), int((ymin + ymax) / 2)
        
        # Update tracking for each object
        centers_old, id_obj, is_new, lastKey = update_tracking(centers_old, (center_x, center_y), thr_centers, lastKey, i, frame_max)
        
        # Update count for new detections
        count_p += is_new
        
        # Draw bounding box and centers on ROI
        cv2.rectangle(ROI, (xmin, ymin), (xmax, ymax), (0, 0, 255), 2)
        for cx, cy in centers_old[id_obj].values():
            cv2.circle(ROI, (cx, cy), 5, (0, 0, 255), -1)
        
        # Draw label above bounding box
        cv2.putText(ROI, f"{id_obj}:{np.round(conf[ix], 2)}", (xmin, ymin - 10),
                    cv2.FONT_HERSHEY_TRIPLEX, 0.8, (0, 0, 255), 1)
    
    # Draw the count on the original frame
    cv2.putText(frame, f'Counts People in ROI: {count_p}', (30, 40),
                cv2.FONT_HERSHEY_TRIPLEX, 1.5, (255, 0, 0), 1)
    
    # Filter track history
    centers_old = filter_tracks(centers_old, patience)
    if verbose:
        print(contador_in, contador_out)
    
    # Draw ROI area
    overlay = frame.copy()
    cv2.polylines(overlay, pts=area_roi, isClosed=True, color=(255, 0, 0), thickness=2)
    cv2.fillPoly(overlay, area_roi, (255, 0, 0))
    frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
    
    # Save processed frame and write to video
    frames_list.append(frame)
    output_video.write(frame)
    
# Release video writer
output_video.release()

# Post processing: Fixing video output codec
if os.path.exists(output_path):
    os.remove(output_path)
    
subprocess.run(
    ["ffmpeg", "-i", tmp_output_path, "-crf", "18", "-preset", "veryfast",
     "-hide_banner", "-loglevel", "error", "-vcodec", "libx264", output_path]
)
os.remove(tmp_output_path)


[INFO] - Verbose during Prediction: False
[INFO] - Original Dim:  (640, 480)


0it [00:00, ?it/s]

# Sampling Transformed Frames Results

In [12]:
print(len(frames_list))


0


In [13]:
# Checking samples of processed frames
for i in [62,63, 64, 65, 66]:
    plt.figure(figsize =( 14, 10))
    plt.imshow(frames_list[i])
    plt.show()

IndexError: list index out of range

<Figure size 1400x1000 with 0 Axes>

In [13]:
import cv2

def show_webcam():
    cap = cv2.VideoCapture("rtsp://admin:ZAMOUF@192.168.0.137:554/H.264")  # Open the default webcam
    if not cap.isOpened():
        print("Cannot open webcam")
        return

    while True:
        ret, frame = cap.read()  # Read frame from webcam
        if not ret:
            print("Can't receive frame. Exiting...")
            break

        cv2.imshow('Webcam Feed', frame)  # Display the resulting frame

        # Exit when 'q' key is pressed
        if cv2.waitKey(1) == ord('q'):
            break

    cap.release()  # Release the capture
    cv2.destroyAllWindows()  # Close any open windows

show_webcam()    

# Executing Result Video 

In [2]:
import torch

print("Number of GPU: ", torch.cuda.device_count())
print("GPU Name: ", torch.cuda.get_device_name())


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Number of GPU:  1
GPU Name:  NVIDIA GeForce RTX 3060 Laptop GPU
Using device: cuda


In [1]:
import torch
print(torch.cuda.is_available())


True


In [14]:
#output video result
frac = 0.7 
Video(data='rep_result.mp4', embed=True, height=int(720 * frac), width=int(1280 * frac))