<a href="https://colab.research.google.com/github/silent-doom/agentic-ai/blob/feature%2Feditor-agent/Faizan_Editor_agent_v2_0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [36]:
import os
import json
import time
import re
import sys
import subprocess
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed

In [37]:
# =================================================================
# 0. LIGHTWEIGHT SETUP (No MediaPipe/TensorFlow)
# =================================================================

def install_lightweight_dependencies():
    """Install the Python and system packages the pipeline needs, on demand.

    When ``moviepy``, ``yt_dlp`` and ``whisper`` can all be imported, the
    function returns without doing anything. If any of them is missing, all
    three are pip-installed into the running interpreter and the ImageMagick
    system package is installed and its policy file patched.
    """
    try:
        import moviepy, yt_dlp, whisper
    except ImportError:
        pass
    else:
        # Everything is already importable; nothing to install.
        return

    print("üì¶ Installing lightweight dependencies...")
    pip_command = [
        sys.executable, "-m", "pip", "install", "-q",
        "moviepy==1.0.3", "yt-dlp", "git+https://github.com/openai/whisper.git",
    ]
    # check_call raises CalledProcessError if pip exits with a non-zero status.
    subprocess.check_call(pip_command)

    # System deps for MoviePy. Both commands are best-effort (check=False):
    # the first installs ImageMagick, the second rewrites 'none' to
    # 'read,write' in its policy file -- presumably so MoviePy's text
    # rendering is allowed to run; confirm against the target image.
    system_commands = (
        "apt update -qq && apt install -qq imagemagick",
        "sed -i 's/none/read,write/' /etc/ImageMagick-6/policy.xml",
    )
    for shell_command in system_commands:
        subprocess.run(shell_command, shell=True, check=False)

install_lightweight_dependencies()

In [38]:
# Imports
import cv2
import numpy as np
import yt_dlp
import whisper
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip
from moviepy.video.fx.all import crop
from google.colab import drive

In [39]:
# =================================================================
# 1. CONFIGURATION & ROBUST PATH DISCOVERY
# =================================================================

# Padding to prevent abrupt starts/ends (seconds): download_segment subtracts
# this from each clip's start time (clamped at 0) and adds it to the end time.
TIME_PADDING = 1.5

def get_robust_paths():
    """
    Locate the AI_Transcripts folder among the known Drive mount points.

    Each candidate folder is accepted only if it exists and contains the
    planning file. When no candidate qualifies, paths under the default
    location are returned without checking that they exist.

    Returns (TRANSCRIPT_FOLDER, INPUT_PLAN_FILE, OUTPUT_FOLDER)
    """
    plan_filename = 'viral_clip_plan_new.json'
    output_dirname = 'final_shorts'
    candidate_roots = (
        "/content/drive/My Drive/AI_Transcripts",
        "/content/gdrive/MyDrive/AI_Transcripts",
        "/content/gdrive/My Drive/AI_Transcripts",
        "/content/drive/MyDrive/AI_Transcripts",
    )

    for root in candidate_roots:
        if not os.path.exists(root):
            continue
        # Listing the folder is kept from the original ("force refresh");
        # presumably it nudges the Drive mount to sync before the file check.
        os.listdir(root)
        candidate_plan = os.path.join(root, plan_filename)
        if not os.path.exists(candidate_plan):
            continue
        print(f"‚úÖ Found planning data at: {root}")
        return root, candidate_plan, os.path.join(root, output_dirname)

    # Nothing matched: fall back to the conventional location.
    fallback_root = "/content/drive/My Drive/AI_Transcripts"
    fallback_plan = os.path.join(fallback_root, plan_filename)
    fallback_output = os.path.join(fallback_root, output_dirname)
    return fallback_root, fallback_plan, fallback_output

# Haar Cascade for Face Detection: setup_face_detector downloads HAAR_URL to
# HAAR_PATH when that local file does not exist yet.
HAAR_URL = "https://raw.githubusercontent.com/opencv/opencv/master/data/haarcascades/haarcascade_frontalface_default.xml"
HAAR_PATH = "/content/haarcascade_frontalface_default.xml"

# Video URL (Ideally read this from the JSON plan in future updates).
# process_single_clip passes this single URL to download_segment for every clip.
YOUTUBE_URL = "https://www.youtube.com/watch?v=HAnw168huqA"

In [40]:
# =================================================================
# 2. UTILITY: SEGMENT DOWNLOAD (Partial)
# =================================================================

def _parse_time_to_seconds(time_str):
    """Handles HH:MM:SS or MM:SS."""
    parts = list(map(int, time_str.split(':')))
    if len(parts) == 3:
        return parts[0] * 3600 + parts[1] * 60 + parts[2]
    return parts[0] * 60 + parts[1]

def download_segment(url, start_time_str, end_time_str, output_path, cid):
    """Download only the padded time range of a video with yt-dlp.

    The requested range is widened by TIME_PADDING seconds on both sides (the
    start is clamped at 0). A file already present at ``output_path`` is
    reused only when it is non-empty, so an empty leftover from an earlier
    failed attempt does not count as a finished download.

    Args:
        url: Address of the source video.
        start_time_str: Clip start, in a form _parse_time_to_seconds accepts.
        end_time_str: Clip end, in a form _parse_time_to_seconds accepts.
        output_path: Where the segment is written.
        cid: Clip identifier, used only in log messages.

    Returns:
        ``output_path`` when a non-empty file is available there, else None.
        Errors raised while parsing the timestamps are not caught here.
    """
    def _has_content(path):
        # True only for an existing regular file with at least one byte.
        try:
            return os.path.isfile(path) and os.path.getsize(path) > 0
        except OSError:
            return False

    def _discard(path):
        # Best-effort removal of an unusable leftover file.
        try:
            if os.path.isfile(path):
                os.remove(path)
        except OSError:
            pass

    if _has_content(output_path):
        return output_path
    # An empty file is a failed earlier attempt; clear it before retrying.
    _discard(output_path)

    # Apply padding to capture the 'essence' of the speech
    start_s = max(0, _parse_time_to_seconds(start_time_str) - TIME_PADDING)
    end_s = _parse_time_to_seconds(end_time_str) + TIME_PADDING
    if end_s <= start_s:
        print(f"üî¥ [Clip {cid}] Download error: end time {end_time_str} is not after start time {start_time_str}", flush=True)
        return None

    print(f"‚¨áÔ∏è [Clip {cid}] Downloading segment: {start_s}s to {end_s}s...", flush=True)

    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': output_path,
        'quiet': True,
        # yt-dlp calls this with (info_dict, ydl); only the fixed range matters.
        'download_ranges': lambda _, __: [{'start_time': start_s, 'end_time': end_s}],
        'force_keyframes_at_cuts': True,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
    except Exception as e:
        print(f"üî¥ [Clip {cid}] Download error: {e}", flush=True)
        # Do not leave a partial file that a later call would mistake for a cache hit.
        _discard(output_path)
        return None

    # yt-dlp finishing without an exception does not prove the file exists
    # at the expected path, so check before reporting success.
    if not _has_content(output_path):
        print(f"üî¥ [Clip {cid}] Download error: no file was produced at {output_path}", flush=True)
        _discard(output_path)
        return None
    return output_path

In [41]:
# =================================================================
# 3. LIGHTWEIGHT FACE TRACKING
# =================================================================

def setup_face_detector():
    """Return an OpenCV Haar-cascade face classifier, fetching its model if needed.

    The cascade XML is downloaded from HAAR_URL when HAAR_PATH is missing or
    empty. The download goes to a temporary file in the same directory and is
    then moved into place, so concurrent callers never read a half-written
    file.

    Returns:
        A loaded ``cv2.CascadeClassifier``.

    Raises:
        RuntimeError: If the file at HAAR_PATH cannot be loaded as a cascade.
            The unusable file is removed so the next call downloads it again.
        Exception: Download errors from ``urllib.request.urlretrieve``
            propagate unchanged.
    """
    import tempfile  # local import: only needed on the download path

    cascade_ready = os.path.isfile(HAAR_PATH) and os.path.getsize(HAAR_PATH) > 0
    if not cascade_ready:
        target_dir = os.path.dirname(HAAR_PATH) or "."
        fd, tmp_path = tempfile.mkstemp(suffix=".xml", dir=target_dir)
        os.close(fd)
        try:
            urllib.request.urlretrieve(HAAR_URL, tmp_path)
            # Atomic within one filesystem: readers see the old state or the full file.
            os.replace(tmp_path, HAAR_PATH)
        finally:
            # Still present only if the download or the move failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    detector = cv2.CascadeClassifier(HAAR_PATH)
    # CascadeClassifier does not raise on a bad file; it just stays empty.
    if detector.empty():
        try:
            os.remove(HAAR_PATH)
        except OSError:
            pass
        raise RuntimeError(f"Could not load Haar cascade from {HAAR_PATH}")
    return detector

def detect_face_x_center(frame, face_cascade):
    """Locate the largest detected face and return its horizontal centre.

    The result is the centre's x coordinate divided by the frame width, or
    None when the cascade reports no face.
    """
    # NOTE(review): the conversion assumes BGR channel order -- confirm what
    # the caller's frame source actually delivers.
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    detections = face_cascade.detectMultiScale(grayscale, 1.3, 5)
    if len(detections) == 0:
        return None

    # Each detection is (x, y, width, height); keep the one with the largest area.
    left, _top, box_width, _box_height = max(detections, key=lambda box: box[2] * box[3])
    frame_width = frame.shape[1]
    return (left + (box_width / 2)) / frame_width

def vertical_crop_smart(clip, cid, sample_interval=0.5):
    """Crop a clip to 9:16 around the median face position and scale to 1920 high.

    Frames are sampled every ``sample_interval`` seconds over the whole-second
    part of the clip's duration. The median of the detected face centres is
    used as the crop centre; when no face is found the crop is centred.

    Args:
        clip: MoviePy clip exposing ``duration``, ``size`` and ``get_frame``.
        cid: Clip identifier, used only in log messages.
        sample_interval: Seconds between sampled frames (must be positive).

    Returns:
        The cropped clip resized to a height of 1920.

    Raises:
        ValueError: If ``sample_interval`` is not positive.
    """
    if sample_interval <= 0:
        raise ValueError("sample_interval must be positive")

    print(f"ü§ñ [Clip {cid}] Tracking face for smart crop...", flush=True)
    face_cascade = setup_face_detector()
    face_x_positions = []
    failed_samples = 0

    duration = int(clip.duration)
    for t in np.arange(0, duration, sample_interval):
        # Tracking is best-effort: an unreadable frame is skipped but counted,
        # and KeyboardInterrupt/SystemExit are no longer swallowed.
        try:
            x_pos = detect_face_x_center(clip.get_frame(t), face_cascade)
        except Exception:
            failed_samples += 1
            continue
        if x_pos is not None:
            face_x_positions.append(x_pos)

    if failed_samples:
        print(f"[Clip {cid}] Warning: {failed_samples} sampled frame(s) could not be analysed.", flush=True)

    # MEDIAN filters out background noise or transient face detections
    center_x = np.median(face_x_positions) if face_x_positions else 0.5

    w, h = clip.size
    ideal_width = h * (9 / 16)
    # A source narrower than 9:16 cannot be cropped wider than it is.
    crop_width = min(int(ideal_width), w)
    x1 = int((center_x * w) - (min(ideal_width, w) / 2))
    x1 = max(0, min(x1, w - crop_width))  # Keep in frame

    cropped = crop(clip, x1=x1, y1=0, width=crop_width, height=h)
    return cropped.resize(height=1920)


In [42]:
# =================================================================
# 4. CAPTIONING ENGINE (Hardware Aware)
# =================================================================

def generate_dynamic_captions(video_clip, clip_id, device):
    """Build one timed TextClip per spoken word using a local Whisper model.

    The clip's audio is written to a temporary WAV file, transcribed with
    word-level timestamps, and each word lasting at least 0.05 s becomes an
    upper-case yellow caption. The temporary file is removed even when
    transcription or caption rendering fails.

    Args:
        video_clip: MoviePy clip whose ``audio`` track is transcribed.
        clip_id: Clip identifier, used for log messages and the temp file name.
        device: Device name passed to ``whisper.load_model`` (e.g. "cuda", "cpu").

    Returns:
        A list of TextClip objects; empty when the clip has no audio track.
    """
    print(f"üìù [Clip {clip_id}] Generating captions...", flush=True)

    # Without an audio track there is nothing to transcribe.
    if video_clip.audio is None:
        print(f"[Clip {clip_id}] No audio track found; skipping captions.", flush=True)
        return []

    audio_path = f"/content/temp_audio_{clip_id}.wav"
    caption_clips = []
    try:
        video_clip.audio.write_audiofile(audio_path, logger=None)

        # FP16 is only for CUDA
        use_fp16 = (device == "cuda")
        model = whisper.load_model("base", device=device)

        result = model.transcribe(audio_path, word_timestamps=True, fp16=use_fp16)

        for segment in result['segments']:
            for word in segment.get('words', []):
                txt = word['word'].strip().upper()
                start, end = word['start'], word['end']
                # Skip blank tokens and words too short to be readable.
                if not txt or end - start < 0.05:
                    continue

                # Karaoke-style yellow captions
                c = (TextClip(txt, fontsize=95, color='yellow', font='Arial-Bold', stroke_color='black', stroke_width=2)
                     .set_position(('center', 0.8), relative=True)
                     .set_start(start)
                     .set_duration(end - start))
                caption_clips.append(c)
    finally:
        # Always clean up the temporary audio file, including on errors.
        if os.path.exists(audio_path):
            os.remove(audio_path)
    return caption_clips

In [43]:
# =================================================================
# 5. WORKER LOGIC
# =================================================================

def process_single_clip(clip_data, output_folder, device):
    """Run download, reframing, captioning and rendering for one planned clip.

    Reads ``clip_id``, ``viral_hook``, ``start_time`` and ``end_time`` from
    ``clip_data`` and writes the result into ``output_folder``. Returns a
    status string describing success or the failure that occurred. The raw
    downloaded segment is deleted once processing has been attempted.
    """
    cid = clip_data['clip_id']
    hook_text = clip_data['viral_hook']
    raw_video_path = f"/content/raw_{cid}.mp4"
    # File-name suffix: the alphanumeric characters of the hook's first 15 chars.
    hook_slug = re.sub(r'[^A-Za-z0-9]', '', hook_text[:15])
    rendered_path = os.path.join(output_folder, f"Short_{cid}_{hook_slug}.mp4")

    downloaded = download_segment(
        YOUTUBE_URL, clip_data['start_time'], clip_data['end_time'], raw_video_path, cid
    )
    if not downloaded:
        return f"Clip {cid} failed at download."

    try:
        with VideoFileClip(raw_video_path) as source_clip:
            # Reframe to vertical, then build the word-level caption overlays.
            reframed = vertical_crop_smart(source_clip, cid)
            word_overlays = generate_dynamic_captions(reframed, cid, device)

            # Video first, captions layered on top.
            layers = [reframed]
            layers.extend(word_overlays)
            composite = CompositeVideoClip(layers)

            print(f"üíæ [Clip {cid}] Rendering final file...", flush=True)
            # 'ultrafast' preset trades file size for encoding speed.
            composite.write_videofile(
                rendered_path,
                codec='libx264',
                audio_codec='aac',
                fps=24,
                preset='ultrafast',
                threads=4,
                logger=None,
            )
    except Exception as err:
        return f"üî¥ Error in clip {cid}: {err}"
    else:
        return f"‚úÖ Clip {cid} saved to Drive."
    finally:
        # The raw segment is only an intermediate; drop it in every case.
        if os.path.exists(raw_video_path):
            os.remove(raw_video_path)

In [44]:
# =================================================================
# 6. MAIN RUNNER (Auto-Scaling)
# =================================================================

def run_editor_agent():
    """Entry point: load the clip plan and process every clip in parallel.

    Steps: mount Google Drive if "/content/drive" is absent, pick the device
    and worker count from whether ``nvidia-smi`` succeeds, locate and load the
    JSON plan, then run process_single_clip for each entry of its
    ``viral_clips`` list on a thread pool, printing each result as it
    completes. Returns None; problems are reported with printed messages.
    """
    # 1. Mount Drive
    if not os.path.exists("/content/drive"):
        print("Mounting Google Drive...")
        drive.mount('/content/drive')

    # 2. Hardware Detection
    gpu_check = subprocess.run("nvidia-smi", shell=True, capture_output=True)
    if gpu_check.returncode == 0:
        device = "cuda"
        num_workers = 4 # Parallel workers for GPU
        print("üöÄ CUDA device detected! Scaling up to 4 parallel workers.")
    else:
        device = "cpu"
        num_workers = 2 # Safely fallback to 2 workers for CPU
        print("üê¢ No GPU found. Falling back to 2 workers on CPU.")

    # 3. Path Discovery
    _, PLAN_FILE, OUT_DIR = get_robust_paths()

    if not os.path.exists(PLAN_FILE):
        print(f"üî¥ Error: Planning file not found at {PLAN_FILE}")
        return

    # Explicit UTF-8 so the plan reads the same regardless of the locale default.
    try:
        with open(PLAN_FILE, 'r', encoding='utf-8') as f:
            plan = json.load(f)
    except (OSError, json.JSONDecodeError) as exc:
        print(f"üî¥ Error: Could not read planning file {PLAN_FILE}: {exc}")
        return

    # Tolerate a plan whose top level is not an object.
    clips = plan.get('viral_clips', []) if isinstance(plan, dict) else []
    print(f"üé¨ Scheduled {len(clips)} clips for processing. Please wait for logs below...")
    if not clips:
        return

    # Create the output folder only once there is a valid plan to render.
    os.makedirs(OUT_DIR, exist_ok=True)

    # 4. Parallel Execution with Future Tracking for better visibility
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Map futures to clip IDs for tracking. A missing 'clip_id' falls back
        # to the entry's position so one malformed entry cannot abort the batch.
        future_to_clip = {}
        for position, clip in enumerate(clips, start=1):
            label = clip.get('clip_id', position) if isinstance(clip, dict) else position
            future_to_clip[executor.submit(process_single_clip, clip, OUT_DIR, device)] = label

        for future in as_completed(future_to_clip):
            clip_id = future_to_clip[future]
            try:
                data = future.result()
                print(f"üèÅ Update: {data}", flush=True)
            except Exception as exc:
                print(f"üî¥ Clip {clip_id} generated an exception: {exc}", flush=True)

if __name__ == "__main__":
    run_editor_agent()

üöÄ CUDA device detected! Scaling up to 4 parallel workers.
‚úÖ Found planning data at: /content/drive/My Drive/AI_Transcripts
üé¨ Scheduled 5 clips for processing. Please wait for logs below...
‚¨áÔ∏è [Clip 1] Downloading segment: 1558.5s to 1651.5s...
‚¨áÔ∏è [Clip 2] Downloading segment: 418.5s to 476.5s...
ü§ñ [Clip 3] Tracking face for smart crop...
ü§ñ [Clip 4] Tracking face for smart crop...




‚úÖ Success: /content/drive/My Drive/AI_Transcripts/final_shorts/Short_4_StopsayingNo.mp4
üé¨ Starting Clip 5...
‚úÖ Success: /content/drive/My Drive/AI_Transcripts/final_shorts/Short_3_Didyoumissth.mp4





‚¨áÔ∏è [Clip 5] Downloading segment: 2338.5s to 2401.5s...
üèÅ Update: üî¥ Error in clip 4: MoviePy error: failed to read the first frame of video file /content/raw_4.mp4. That might mean that the file is corrupted. That may also mean that you are using a deprecated version of FFMPEG. On Ubuntu/Debian for instance the version in the repos is deprecated. Please update to a recent version from the website.





üèÅ Update: üî¥ Error in clip 3: MoviePy error: failed to read the first frame of video file /content/raw_3.mp4. That might mean that the file is corrupted. That may also mean that you are using a deprecated version of FFMPEG. On Ubuntu/Debian for instance the version in the repos is deprecated. Please update to a recent version from the website.
ü§ñ [Clip 2] Tracking face for smart crop...
üìù [Clip 2] Generating captions...
üíæ [Clip 2] Rendering final file...
ü§ñ [Clip 5] Tracking face for smart crop...
üèÅ Update: ‚úÖ Clip 2 saved to Drive.
üìù [Clip 5] Generating captions...
üíæ [Clip 5] Rendering final file...
ü§ñ [Clip 1] Tracking face for smart crop...
üèÅ Update: ‚úÖ Clip 5 saved to Drive.
üìù [Clip 1] Generating captions...
üíæ [Clip 1] Rendering final file...
üèÅ Update: ‚úÖ Clip 1 saved to Drive.
