<a href="https://colab.research.google.com/github/krunalhp/avalokan/blob/main/VideoAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import cv2
import os
import numpy as np

# Function to add activity labels and track movement
def annotate_frame(frame, person_id, activity_label, bbox):
    # Set font and text color
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    color = (0, 255, 0)  # Green
    thickness = 2

    # Draw bounding box around detected person
    x, y, w, h = bbox
    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)

    # Add ID and activity label above bounding box
    label = f"ID: {person_id}, Activity: {activity_label}"
    text_size = cv2.getTextSize(label, font, font_scale, thickness)[0]
    text_x = x + (w - text_size[0]) // 2
    text_y = y - 10
    cv2.putText(frame, label, (text_x, text_y), font, font_scale, color, thickness)

    return frame

# Function to process and annotate the video with people detection and activity
def analyze_local_video(video_path):
    # Open the video
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print("Error: Unable to open video.")
        return

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for output video

    # Set output video path
    output_video_path = "/content/sample_data/output_video_with_activity.mp4"

    # Initialize the video writer
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # Create a HOG person detector (pre-trained model)
    hog = cv2.HOGDescriptor()
    hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    person_id = 0  # ID for the people detected
    prev_positions = []  # Store previous positions for activity tracking

    while True:
        ret, frame = cap.read()

        if not ret:
            break  # End of video

        # Detect people in the frame
        boxes, weights = hog.detectMultiScale(frame, winStride=(8, 8), padding=(8, 8), scale=1.05)

        annotated_frame = frame.copy()  # Make sure annotated_frame is defined

        # Iterate over the detected people and track their movement
        for bbox in boxes:
            x, y, w, h = bbox

            # Track movement to classify activity
            centroid = (x + w // 2, y + h // 2)

            # For simplicity, we'll assume each detected box represents a unique person
            matched = False
            for i, (prev_centroid, prev_bbox) in enumerate(prev_positions):
                prev_x, prev_y, prev_w, prev_h = prev_bbox
                # Check if bounding boxes are close (indicating the same person)
                if abs(x - prev_x) < 100 and abs(y - prev_y) < 100:  # Threshold for re-identifying the person
                    # Calculate movement speed (delta in centroid)
                    dx = centroid[0] - prev_centroid[0]
                    dy = centroid[1] - prev_centroid[1]
                    speed = np.sqrt(dx**2 + dy**2)

                    # Define activity based on speed (simple thresholding)
                    if speed > 10:
                        activity_label = "Running"
                    else:
                        activity_label = "Walking"

                    prev_positions[i] = [centroid, bbox]  # Update the tracked position
                    matched = True
                    break

            if not matched:
                # If no match, treat this as a new person
                prev_positions.append([centroid, bbox])
                activity_label = "Walking"  # Default activity for new person
                person_id += 1  # Increment person ID

            # Annotate the frame with bounding box and activity
            annotated_frame = annotate_frame(annotated_frame, person_id, activity_label, bbox)

        # Write the frame to the output video
        out.write(annotated_frame)

    # Release the resources
    cap.release()
    out.release()
    print(f"Processed video saved as {output_video_path}")

# Path to the local video file
video_path = "/content/sample_data/cctv.mp4"  # Path to your local video file

if os.path.exists(video_path):
    analyze_local_video(video_path)
else:
    print(f"Video file not found: {video_path}")


Processed video saved as /content/sample_data/output_video_with_activity.mp4
