In [None]:
# 1. Install Libraries
!pip install -q moviepy ultralytics transformers torch torchvision opencv-python

# 2. Import Dependencies
import cv2
import numpy as np
import torch
import os
from ultralytics import YOLO
from transformers import pipeline
from PIL import Image
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip
from google.colab import files

In [None]:
print("Loading AI Models...")

# Choose the compute device: CUDA when torch reports a usable GPU, otherwise the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print(f"Using Device: {device}")

# Object detector: YOLOv8 built from the 'yolov8n.pt' weights.
yolo_model = YOLO('yolov8n.pt')

# Depth estimator: Depth Anything V2 (Small) through the transformers pipeline.
# The pipeline is given a device index: 0 when running on CUDA, -1 otherwise.
depth_pipe = pipeline(
    task="depth-estimation",
    model="depth-anything/Depth-Anything-V2-Small-hf",
    device=-1 if device != "cuda" else 0,
)

print("‚úÖ Models Loaded Successfully!")

In [None]:
# Detector classes that are annotated and can trigger an alert.
_ALERT_LABELS = ('person', 'car', 'truck', 'bus', 'motorcycle')

# Frame rate used when the container does not report a usable one.
_FALLBACK_FPS = 30.0


def _estimate_distance(depth_map, x1, y1, x2, y2):
    """Return the heuristic distance for the box (x1, y1)-(x2, y2) on `depth_map`.

    The 70th percentile of the depth values inside the box is used as the score
    and converted with the hand-tuned constant 250 (distance = 250 / score), so
    larger depth-map values yield smaller distances. Scores of 10 or less, and
    empty boxes, map to the default distance 10.0. The result is a heuristic,
    not a calibrated measurement.
    """
    map_h, map_w = depth_map.shape[:2]
    # Clamp to the map first: a negative start index would otherwise wrap around
    # (Python negative indexing) and sample the wrong region or nothing at all.
    left, right = max(0, x1), min(map_w, x2)
    top, bottom = max(0, y1), min(map_h, y2)
    roi = depth_map[top:bottom, left:right]
    score = np.percentile(roi, 70) if roi.size > 0 else 0
    return 250 / score if score > 10 else 10.0


def _alert_style(dist):
    """Map a distance to (BGR box colour, status text, is_danger)."""
    if dist < 1.5:
        return (0, 0, 255), "STOP!", True       # Red: triggers the beep
    if dist < 2.5:
        return (0, 165, 255), "Warning", False  # Orange
    return (0, 255, 0), "", False               # Green


def _render_frame(frame, width, height):
    """Annotate one BGR frame and build the side-by-side output image.

    Runs the module-level `depth_pipe` and `yolo_model` on the frame, draws a
    coloured box and label for every detection in `_ALERT_LABELS`, and stacks
    the annotated frame next to a colour-mapped depth image.

    Returns:
        (combined, danger): the stacked image and whether any detection in this
        frame was closer than the danger threshold.
    """
    # 1. Depth estimation (the pipeline takes a PIL RGB image)
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    depth_map = np.array(depth_pipe(pil_image)["depth"])
    depth_map_resized = cv2.resize(depth_map, (width, height))

    # Heatmap visualisation of the depth map
    depth_display = cv2.normalize(depth_map_resized, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
    depth_color = cv2.applyColorMap(depth_display, cv2.COLORMAP_INFERNO)

    # 2. Object detection
    results = yolo_model(frame, verbose=False)
    annotated_frame = frame.copy()
    danger = False

    for result in results:
        for box in result.boxes:
            label = yolo_model.names[int(box.cls[0])]
            if label not in _ALERT_LABELS:
                continue

            x1, y1, x2, y2 = map(int, box.xyxy[0])
            dist = _estimate_distance(depth_map_resized, x1, y1, x2, y2)
            color, status, is_danger = _alert_style(dist)
            danger = danger or is_danger

            # Box, then a filled strip above it carrying the label text
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
            label_text = f"{label} {dist:.1f}m {status}"
            (text_w, _text_h), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
            cv2.rectangle(annotated_frame, (x1, y1-20), (x1+text_w, y1), color, -1)
            cv2.putText(annotated_frame, label_text, (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2)

    return np.hstack((annotated_frame, depth_color)), danger


def _write_final_video(temp_video_path, output_filename, beep_timestamps):
    """Re-encode the temp video to `output_filename`, overlaying a beep at each timestamp.

    "beep.mp3" is only opened when there is at least one timestamp. Returns True
    when the final file was written, False when anything failed (the error is
    printed). The moviepy clips are closed in every case so the temp file is not
    left open.
    """
    video_clip = None
    beep_sound = None
    try:
        video_clip = VideoFileClip(temp_video_path)
        final_clip = video_clip  # No warnings, no sound

        if beep_timestamps:
            beep_sound = AudioFileClip("beep.mp3")
            # One copy of the beep starting at every recorded timestamp
            final_audio = CompositeAudioClip([beep_sound.set_start(t) for t in beep_timestamps])
            # Limit the audio track to the length of the video
            final_audio = final_audio.set_duration(video_clip.duration)
            final_clip = video_clip.set_audio(final_audio)

        # Write the final file (web friendly)
        final_clip.write_videofile(output_filename, codec='libx264', audio_codec='aac', logger=None)
        return True
    except Exception as e:
        print(f"‚ö†Ô∏è Audio Error: {e}")
        print("Falling back to silent video...")
        return False
    finally:
        for clip in (beep_sound, video_clip):
            if clip is not None:
                clip.close()


def process_video_pipeline(input_path, output_filename):
    """Annotate a video with detections, depth and audio alerts, then download it.

    Every frame is passed through the module-level `depth_pipe` and `yolo_model`.
    The output is twice as wide as the input: annotated frame on the left, depth
    heatmap on the right. Whenever a detection is closer than the danger
    threshold, a beep is scheduled (at most one every 0.8 seconds) and mixed into
    the final file from "beep.mp3". If the audio/encoding step fails, the silent
    intermediate video is used as the output instead.

    Args:
        input_path: Path of the video to process.
        output_filename: Path of the file to write and hand to `files.download`.

    Returns:
        None. Problems with the input are reported with a printed message.
    """
    if not os.path.exists(input_path):
        print(f"‚ùå Error: {input_path} not found. Please upload it.")
        return

    print(f"üé¨ Processing Video: {input_path}")

    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        print(f"Error: could not open {input_path} as a video.")
        return

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # The frame rate is the divisor for every timestamp below; an unreported
    # rate (0 or non-finite) would raise ZeroDivisionError on the first frame.
    if not np.isfinite(fps) or fps <= 0:
        print(f"Warning: {input_path} reports no valid FPS; assuming {_FALLBACK_FPS}.")
        fps = _FALLBACK_FPS

    # Temp video (visuals only, no sound yet)
    temp_video_path = "temp_visuals.mp4"
    out = cv2.VideoWriter(temp_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width * 2, height))
    if not out.isOpened():
        cap.release()
        out.release()
        print(f"Error: could not create {temp_video_path} for writing.")
        return

    # --- AUDIO LOGIC VARS ---
    beep_timestamps = []
    last_beep_time = -1.0  # Cooldown to prevent "machine gun" beeping
    frame_count = 0

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            combined, danger_detected_this_frame = _render_frame(frame, width, height)

            # Audio trigger: danger detected AND no beep in the last 0.8 seconds
            current_time = frame_count / fps
            if danger_detected_this_frame and (current_time - last_beep_time > 0.8):
                beep_timestamps.append(current_time)
                last_beep_time = current_time

            out.write(combined)
            frame_count += 1
            if frame_count % 20 == 0:
                print(f"  Processed {frame_count}/{total_frames} frames...", end="\r")
    finally:
        # Release the handles even if a model call or write fails mid-video.
        cap.release()
        out.release()

    # --- STEP 4: Post-Production (Adding Audio) ---
    print(f"\nüéß Adding {len(beep_timestamps)} audio alerts to video...")

    if _write_final_video(temp_video_path, output_filename, beep_timestamps):
        print(f"‚úÖ DONE! Saved: {output_filename}")
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
    else:
        # Fallback if audio fails: ship the silent intermediate video.
        os.replace(temp_video_path, output_filename)

    # Kept outside the encode error handling: a failed download must not cause
    # a correctly written output file to be replaced by the silent fallback.
    files.download(output_filename)

In [None]:
# Run the pipeline on each uploaded video, in order: (input file, output file).
for source_video, result_video in (
    ("input-video-01.mp4", "vision-mate-output-01.mp4"),
    ("input-video-02.mp4", "vision-mate-output-02.mp4"),
    ("input-video-03.mp4", "vision-mate-output-03.mp4"),
):
    process_video_pipeline(source_video, result_video)