## Mount GOOGLE COLAB

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Install Libraries

In [None]:
!pip install mediapipe
!pip install ultralytics
!pip install opencv-python numpy
!pip uninstall torch torchvision torchaudio

In [None]:
!pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu112 #Modify CUDA version based on Installed version of CUDA i have the 11.2

## Import Dependencies

In [1]:
import torch

print(torch.__version__)
print(torch.cuda.is_available())
print(torch.version.cuda)

2.1.2+cpu
False
None


In [31]:
import cv2
import numpy as np
import mediapipe as mp
from ultralytics import YOLO
from ultralytics.utils.checks import check_imshow
from ultralytics.utils.plotting import Annotator, colors
from collections import defaultdict
import random
import string
from datetime import datetime
import time

## MediaPipe Pose Detector

In [32]:
# Initialize MediaPipe Pose Detector
class poseDetector():
    def __init__(self, mode=False, upBody=False, smooth=True, detectionCon=0.8, trackCon=0.8, landmarkThickness=1):
        self.mode = mode
        self.upBody = upBody
        self.smooth = smooth
        self.detectionCon = detectionCon
        self.trackCon = trackCon
        self.landmarkThickness = landmarkThickness  # New parameter for landmark thickness
        self.mpDraw = mp.solutions.drawing_utils
        self.mpPose = mp.solutions.pose
        self.pose = self.mpPose.Pose(static_image_mode=self.mode,
                                     model_complexity=1,
                                     smooth_landmarks=self.smooth,
                                     enable_segmentation=False,
                                     smooth_segmentation=False,
                                     min_detection_confidence=self.detectionCon,
                                     min_tracking_confidence=self.trackCon)

    def findPose(self, img, draw=True):
        imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.results = self.pose.process(imgRGB)
        if self.results.pose_landmarks and draw:
            self.mpDraw.draw_landmarks(img, self.results.pose_landmarks, self.mpPose.POSE_CONNECTIONS)
        return img

    def findPosition(self, img, draw=True):
        lmList = []
        if self.results.pose_landmarks:
            for id, lm in enumerate(self.results.pose_landmarks.landmark):
                h, w, c = img.shape
                cx, cy = int(lm.x * w), int(lm.y * h)
                lmList.append([id, cx, cy])
                if draw:
                    # Use the specified thickness for landmarks
                    cv2.circle(img, (cx, cy), 1, (255, 99, 51), self.landmarkThickness)
        return lmList

## MediaPipe Face Detector

In [33]:
# Initialize MediaPipe face Detector
class FaceDetector():
    
    def __init__(self, minDetectionCon = 0.7):
        
        self.minDetectionCon = minDetectionCon      
        self.mpFaceDetection = mp.solutions.face_detection
        self.mpDraw = mp.solutions.drawing_utils
        self.faceDetection = self.mpFaceDetection.FaceDetection(self.minDetectionCon)

        
    def fancyDraw(self, img, bbox, color=(255, 0, 255), thickness=2):
        x, y, w, h = bbox
        cv2.rectangle(img, (x, y), (x+w, y+h), color, thickness)
        # Add more fancy drawing code here if needed
        return img


    def findFaces(self, img, draw = True):
        
        imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        self.results = self.faceDetection.process(imgRGB)
        # print(self.results)
        
        bboxs = []
        if self.results.detections:
            for id, detection in enumerate(self.results.detections):
                # mpDraw.draw_detection(img, detection)            
                # print(id, detection)
                # print(detection.score)
                # print(detection.location_data.relative_bounding_box)
                
                bboxC = detection.location_data.relative_bounding_box
                ih , iw ,ic = img.shape
                bbox = int(bboxC.xmin * iw), int(bboxC.ymin * ih),\
                    int(bboxC.width * iw), int(bboxC.height * ih)
                
                bboxs.append([id, bbox, detection.score])
                if draw:
                    cv2.rectangle(img, bbox, (255, 0, 255), 1)
        return img, bboxs

## Initialize ultralytics - YOLOv8 

In [34]:
# Initialize YOLOv8 
model = YOLO("YOLOv8 - models/yolov8x.pt")  # Use 0 for webcam input
names = model.model.names

## Initialize - MediaPipe Detectors

In [35]:
# Initialize MediaPipe Detectors
detector = poseDetector(landmarkThickness=1)
face_detector = FaceDetector(minDetectionCon = 0.7)

## Video processing

In [36]:
# Video processing
video_path = "input_dir/6.mp4"
cap = cv2.VideoCapture(video_path)
assert cap.isOpened(), "Error reading video file"

## Frame processing 

In [37]:
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_rate = cap.get(cv2.CAP_PROP_FPS)  # Use the input video's frame rate

### Generate a unique random filename for the output video

## Generate a filename based on the current date and time

In [38]:
def generate_timestamp_filename(extension=".mp4"):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}{extension}"

# Generate a timestamp-based filename for the output video
timestamp_filename = generate_timestamp_filename()

timestamp_filename

'20240112_170753.mp4'

## Output Video File

In [39]:
# Using 'mp4v' codec for MP4 file format
# out = cv2.VideoWriter("output.mp4", cv2.VideoWriter_fourcc(*'mp4v'), frame_rate, (frame_width, frame_height))


# Using 'mp4v' codec for MP4 file format with the random filename
# out = cv2.VideoWriter(random_filename, cv2.VideoWriter_fourcc(*'mp4v'), frame_rate, (frame_width, frame_height))


In [40]:
# Using 'mp4v' codec for MP4 file format with the timestamp filename
out = cv2.VideoWriter(timestamp_filename, cv2.VideoWriter_fourcc(*'mp4v'), frame_rate, (frame_width, frame_height)) 

# Version 2.0

### Tracking History 

In [41]:
track_history = defaultdict(list)

## Drawing Options

In [42]:
# Initialize flags
draw_bounding_box = True
draw_face_bounding_box = False
draw_landmarks = False
draw_tracking = True
draw_nose_to_corner_line = True

In [43]:

#specific landmark to the bottom-left corner if the flag is set

# 0 is the nose

Landmark_Bottom_Left_Corner_Line = 0  # Set your desired landmark index here

## Video

### Display Real-Time video frame process or no

In [44]:
# Initialize flags
display_video = False  # Set to False to process in background without display

## Main Loop

In [45]:
try:
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break  # Exit loop if there are no more frames to read      
    
        # Apply YOLO object detection on the frame to find humans
        results = model.track(frame, persist=True, verbose=False)
        boxes = results[0].boxes.xyxy.cpu()  # Extract bounding boxes

        # Process each detected object
        if results[0].boxes.id is not None:
            clss = results[0].boxes.cls.cpu().tolist()  # Class IDs of detections
            track_ids = results[0].boxes.id.int().cpu().tolist()  # Tracking IDs

            # Initialize annotator for drawing bounding boxes if the flag is set
            annotator = Annotator(frame, line_width=2) if draw_bounding_box else None

            for box, cls, track_id in zip(boxes, clss, track_ids):
                if names[int(cls)] != "person":
                    continue

                if draw_bounding_box:
                    annotator.box_label(box, color=colors(int(cls), True), label=names[int(cls)])

                x1, y1, x2, y2 = [int(b) for b in box[:4]]
                human_img = frame[y1:y2, x1:x2]

                # Perform face detection within the YOLO bounding box if the flag is set
                if draw_face_bounding_box:
                    face_img_rgb = cv2.cvtColor(human_img, cv2.COLOR_BGR2RGB)
                    human_img, face_detections = face_detector.findFaces(face_img_rgb, draw=False)
                    if face_detections:
                        for _, bbox, _ in face_detections:
                            bx, by, bw, bh = bbox
                            cv2.rectangle(frame, (x1 + bx, y1 + by), (x1 + bx + bw, y1 + by + bh), (0, 255, 0), 2)

                # Perform pose estimation on the cropped human image
                human_img = detector.findPose(human_img, draw=False)
                lmList = detector.findPosition(human_img, draw=draw_landmarks)

                # Tracking logic: Update tracking history and draw tracking lines if the flag is set
                if draw_tracking:
                    center = (int((x1 + x2) / 2), int((y1 + y2) / 2))
                    track_history[track_id].append(center)
                    if len(track_history[track_id]) > 30:
                        track_history[track_id].pop(0)

                    if len(track_history[track_id]) > 1:
                        points = np.array(track_history[track_id], dtype=np.int32).reshape((-1, 1, 2))
                        cv2.polylines(frame, [points], isClosed=False, color=colors(int(cls), True), thickness=1)

                # Draw a line from a specific landmark to the bottom-left corner if the flag is set
                if draw_nose_to_corner_line:
                    for lm in lmList:
                        if lm[0] == Landmark_Bottom_Left_Corner_Line:  # Use the variable here
                            chosen_landmark_point = (lm[1] + x1, lm[2] + y1)
                            bottom_left_corner = (0, frame_height)
                            cv2.line(frame, chosen_landmark_point, bottom_left_corner, (0, 255, 255), 1)

        # Write the processed frame to output
        if draw_bounding_box:
            out.write(annotator.result())
        else:
            out.write(frame)

        # Display the processed frame if the flag is set (optional, based on your requirement)
        if display_video:
            cv2.imshow("Processed Frame", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

finally:
    # Release resources
    cap.release()
    out.release()  # Ensure this is called to finalize the video file
    cv2.destroyAllWindows()
    track_history.clear()
    print("Processing complete.")

Processing complete.


# Process all videos in a directory and export an output video for each one into an output directory

In [46]:
import os
import glob
import cv2
from pathlib import Path
from ultralytics import YOLO
from collections import defaultdict

In [47]:
# Initialize YOLOv8 and MediaPipe Detectors
# model = YOLO("YOLOv8 - models/yolov8x.pt")
detector = poseDetector(landmarkThickness=1)
face_detector = FaceDetector(minDetectionCon=0.9)
names = model.model.names

In [48]:
input_directory = "input_dir"
output_directory = "output_dir"

In [49]:
# Create output directory if it doesn't exist
Path(output_directory).mkdir(parents=True, exist_ok=True)

In [50]:
# List all video files in the input directory
video_files = glob.glob(os.path.join(input_directory, "*.mp4"))

In [51]:
# Initialize flags
draw_bounding_box = True
draw_face_bounding_box = False
draw_landmarks = False
draw_tracking = True
draw_nose_to_corner_line = True
draw_global_face_detection = True

In [52]:

#specific landmark to the bottom-left corner if the flag is set

# 0 is the nose

Landmark_Bottom_Left_Corner_Line = 0  # Set your desired landmark index here

In [53]:
# Process each video file
for video_path in video_files:
    # Reinitialize the YOLOv8 model at the start of each video
    model = YOLO("YOLOv8 - models/yolov8x.pt")  # Adjust model path as needed
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening video file: {video_path}")
        continue

    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_rate = cap.get(cv2.CAP_PROP_FPS)

    # Generate a unique output filename
    output_filename = os.path.join(output_directory, os.path.basename(video_path))

    # Initialize VideoWriter for the current video dimensions
    out = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*'mp4v'), frame_rate, (frame_width, frame_height))

    track_history = defaultdict(list)  # Initialize tracking history for each video

    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break  # Exit loop if there are no more frames to read
                
        if draw_global_face_detection:
            # Perform global face detection on the entire frame
            frame, bboxs = face_detector.findFaces(frame, draw=True)
        
    
        # Apply YOLO object detection on the frame to find humans
        results = model.track(frame, persist=True, verbose=False)
        boxes = results[0].boxes.xyxy.cpu()  # Extract bounding boxes

        # Process each detected object
        if results[0].boxes.id is not None:
            clss = results[0].boxes.cls.cpu().tolist()  # Class IDs of detections
            track_ids = results[0].boxes.id.int().cpu().tolist()  # Tracking IDs

            # Initialize annotator for drawing bounding boxes if the flag is set
            annotator = Annotator(frame, line_width=2) if draw_bounding_box else None

            for box, cls, track_id in zip(boxes, clss, track_ids):
                if names[int(cls)] != "person":
                    continue

                if draw_bounding_box:
                    annotator.box_label(box, color=colors(int(cls), True), label=names[int(cls)])

                x1, y1, x2, y2 = [int(b) for b in box[:4]]
                human_img = frame[y1:y2, x1:x2]

                # Perform face detection within the YOLO bounding box if the flag is set
                if draw_face_bounding_box:
                    face_img_rgb = cv2.cvtColor(human_img, cv2.COLOR_BGR2RGB)
                    human_img, face_detections = face_detector.findFaces(face_img_rgb, draw=False)
                    if face_detections:
                        for _, bbox, _ in face_detections:
                            bx, by, bw, bh = bbox
                            cv2.rectangle(frame, (x1 + bx, y1 + by), (x1 + bx + bw, y1 + by + bh), (0, 255, 0), 2)

                # Perform pose estimation on the cropped human image
                human_img = detector.findPose(human_img, draw=False)
                lmList = detector.findPosition(human_img, draw=draw_landmarks)

                # Tracking logic: Update tracking history and draw tracking lines if the flag is set
                if draw_tracking:
                    center = (int((x1 + x2) / 2), int((y1 + y2) / 2))
                    track_history[track_id].append(center)
                    if len(track_history[track_id]) > 30:
                        track_history[track_id].pop(0)

                    if len(track_history[track_id]) > 1:
                        points = np.array(track_history[track_id], dtype=np.int32).reshape((-1, 1, 2))
                        cv2.polylines(frame, [points], isClosed=False, color=colors(int(cls), True), thickness=1)

                # Draw a line from a specific landmark to the bottom-left corner if the flag is set
                if draw_nose_to_corner_line:
                    for lm in lmList:
                        if lm[0] == Landmark_Bottom_Left_Corner_Line:  # Use the variable here
                            chosen_landmark_point = (lm[1] + x1, lm[2] + y1)
                            bottom_left_corner = (0, frame_height)
                            cv2.line(frame, chosen_landmark_point, bottom_left_corner, (255, 0, 255), 1)

       # Write the processed frame to output
        if draw_bounding_box:
            out.write(annotator.result())
        else:
            out.write(frame)

    # After processing each video, clear the tracking history
    track_history.clear()

    # Release resources for the current video
    cap.release()
    out.release()

print("Processing complete.")

Processing complete.
