In [None]:
#1. Install & Import Dependencies
# Colab cell 1: Install necessary packages
!pip install yt-dlp
!pip install scenedetect
!pip install openai                 # needed by OpenRouter if using OpenAI client
!pip install aiofiles fastapi uvicorn python-multipart  # for optional local hosting
!pip install huggingface_hub        # HuggingFace Inference client
!pip install requests               # simpler HTTP calls
!pip install ffmpeg-python          # wrapper around ffmpeg
!pip install faster-whisper         # CPU Whisper alternative

Why these?

yt-dlp for downloading YouTube shorts.

ffmpeg-python (and native ffmpeg) to extract frames/audio.

scenedetect for optional scene detection.

huggingface_hub to call BLIP-2 & Whisper Inference endpoints.

faster_whisper if you want purely local CPU transcription.

requests to call OpenRouter LLMs.


In [None]:
#1. Install & Import Dependencies
# Colab cell 2: Import libraries
import os, shutil, json, base64, uuid
import yt_dlp
import ffmpeg                       # ffmpeg-python
import requests
import subprocess
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector
from huggingface_hub import InferenceApi
from faster_whisper import WhisperModel


In [None]:
!pip install yt-dlp

In [None]:
import yt_dlp
import os, shutil, json, base64, uuid

In [None]:
#2. Set Up API Keys & Folders
# Colab cell 3: Environment variables (replace with your own)
# API credentials are read from environment variables so that real keys never
# have to be written into (and shared with) the notebook. Set them before
# running this cell, for example:
#   os.environ["HF_API_KEY"] = "<your Hugging Face token>"
#   os.environ["OPENROUTER_API_KEY"] = "<your OpenRouter key>"
# When a variable is unset the name is still defined, as an empty string.
HF_API_KEY = os.environ.get("HF_API_KEY", "")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")

# Working directories used by the rest of the notebook.
ROOT = "/content/video_pipeline"
VIDEOS_DIR = f"{ROOT}/videos"
FRAMES_DIR = f"{ROOT}/frames"
OUTPUT_DIR = f"{ROOT}/output"

# Start from a clean slate: a directory left over from an earlier run is
# deleted together with its contents before being recreated empty.
for d in [VIDEOS_DIR, FRAMES_DIR, OUTPUT_DIR]:
    if os.path.exists(d):
        shutil.rmtree(d)
    os.makedirs(d, exist_ok=True)


Purpose:

VIDEOS_DIR holds downloaded or uploaded videos.

FRAMES_DIR holds extracted frames or scene snapshots.

OUTPUT_DIR holds transcripts, captions, analysis JSONs.

In [None]:
# 3. Ingest Video (YouTube URL or Local File)
# Colab cell 4: Function to download video via yt-dlp
def download_video(url: str) -> str:
    """
    Fetch a video with yt-dlp and store it inside VIDEOS_DIR.

    The file is written under a random 8-character name with an ``.mp4``
    extension; that path is returned to the caller.
    """
    short_id = uuid.uuid4().hex[:8]
    target = f"{VIDEOS_DIR}/{short_id}.mp4"

    # Format selector: an MP4 video stream combined with an M4A audio stream,
    # otherwise whatever the plain "mp4" selector matches.
    options = dict(
        format="bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4",
        outtmpl=target,
        quiet=True,
    )

    with yt_dlp.YoutubeDL(options) as session:
        session.download([url])

    return target

# Example usage:
video_path = download_video("https://youtu.be/F_Wp-rYKix4?si=er3dNva6sHESDhxi")


In [None]:
# Colab cell 5: Alternatively, if you have a local file:
def copy_local_video(local_path: str) -> str:
    """
    Place a copy of a local video file in VIDEOS_DIR.

    The copy is named ``<uuid4>_<original file name>`` so repeated imports of
    the same file do not overwrite each other. Returns the path of the copy.
    """
    unique_prefix = str(uuid.uuid4())
    file_name = os.path.basename(local_path)
    destination = "{}/{}_{}".format(VIDEOS_DIR, unique_prefix, file_name)
    shutil.copy(local_path, destination)
    return destination

# Example usage:
# video_path = copy_local_video("/content/1.mp4")


Note: If yt-dlp fails (for example, because of restrictions on some Shorts), supply a local file with copy_local_video instead.

In [None]:
# 4. Scene/Frame Extraction
# Option A: Fixed‐Interval Frames
# Colab cell 6A: Extract fixed-interval frames (e.g. every N seconds)
def extract_frames_interval(video_path: str, interval: float = 2.0) -> list[str]:
    """
    Extracts frames every 'interval' seconds from video_path into FRAMES_DIR.

    Args:
        video_path: Path of the video to sample.
        interval: Spacing between extracted frames, in seconds. Must be > 0.

    Returns:
        Sorted list of the JPEG paths found in the video's frame folder
        (``FRAMES_DIR/<video file name without extension>``).

    Raises:
        ValueError: If ``interval`` is zero or negative.
    """
    # Reject unusable intervals before touching the disk or ffmpeg: the value
    # is interpolated into the fps expression below, so 0 would become "1/0".
    if interval <= 0:
        raise ValueError(
            f"interval must be a positive number of seconds, got {interval!r}"
        )

    # Create a unique subfolder for this video's frames
    vid_id = os.path.splitext(os.path.basename(video_path))[0]
    folder = f"{FRAMES_DIR}/{vid_id}"
    os.makedirs(folder, exist_ok=True)

    # Use ffmpeg to extract one frame every 'interval' seconds:
    (
        ffmpeg
        .input(video_path)
        .filter('fps', fps=f'1/{interval}')
        .output(f"{folder}/frame_%04d.jpg", start_number=1)
        .overwrite_output()
        .run(quiet=True)
    )

    # The zero-padded numbering makes lexicographic order match frame order.
    frames = sorted(f"{folder}/{f}" for f in os.listdir(folder) if f.endswith(".jpg"))
    return frames

# Example:
# frames = extract_frames_interval(video_path, interval=2)


In [None]:
# # Option B: Scene‐Change Detection (Optional)
# # Colab cell 6B: Extract keyframes by scene detection
# from scenedetect import VideoManager, SceneManager
# from scenedetect.detectors import ContentDetector

# def extract_frames_scenes(video_path: str) -> list[str]:
#     """
#     Uses PySceneDetect to find scene changes and save the first frame of each scene.
#     Returns list of saved frame file paths.
#     """
#     vid_manager = VideoManager([video_path])
#     scene_manager = SceneManager()
#     scene_manager.add_detector(ContentDetector(threshold=30.0))  # adjust threshold

#     vid_manager.start()
#     scene_manager.detect_scenes(frame_source=vid_manager)
#     scene_list = scene_manager.get_scene_list()
#     vid_manager.release()

#     frame_paths = []
#     vid_filename = os.path.splitext(os.path.basename(video_path))[0]
#     out_folder = f"{FRAMES_DIR}/{vid_filename}"
#     os.makedirs(out_folder, exist_ok=True)

#     for idx, (start, end) in enumerate(scene_list):
#         cap = ffmpeg.input(video_path)
#         time = start.get_seconds()
#         # Seek to start and extract one frame
#         out_file = f"{out_folder}/scene_{idx+1:03d}.jpg"
#         (
#             ffmpeg
#             .input(video_path, ss=time)
#             .filter('scale', -1, 360)  # resize to height 360 px if you like
#             .output(out_file, vframes=1)
#             .overwrite_output()
#             .run(quiet=True)
#         )
#         frame_paths.append(out_file)
#     return frame_paths

# # Example:
# # frames = extract_frames_scenes(video_path)

# Option B: Scene‐Change Detection (Updated)
# Colab cell 6B: Extract keyframes by scene detection (Modern PySceneDetect API)

from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector
import ffmpeg
import os

# Replace this with your actual frames directory constant
FRAMES_DIR = f"{ROOT}/frames"

def extract_frames_scenes(video_path: str) -> tuple[list[str], list[float]]:
    """
    Uses PySceneDetect to find scene changes and save the first frame of each scene.

    Args:
        video_path: Path of the video to analyse.

    Returns:
        A ``(frame_paths, timestamps)`` pair. ``frame_paths`` holds the saved
        JPEG files and ``timestamps`` holds, at the same positions, the start
        time in seconds of the corresponding scene. When no scene boundary is
        detected, a single frame taken at 0 seconds is returned so the result
        is never empty.
    """
    # Open the video using the modern PySceneDetect interface
    video = open_video(video_path)
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=30.0))  # Adjust threshold as needed

    # Run scene detection
    scene_manager.detect_scenes(video)
    scene_list = scene_manager.get_scene_list()

    # Release the stream only when the returned object offers close(); calling
    # it unconditionally raises AttributeError on stream types that lack it.
    close_stream = getattr(video, "close", None)
    if callable(close_stream):
        close_stream()

    # Prepare output directory
    vid_filename = os.path.splitext(os.path.basename(video_path))[0]
    out_folder = os.path.join(FRAMES_DIR, vid_filename)
    os.makedirs(out_folder, exist_ok=True)

    # A video without any detected cut produces an empty scene list; fall back
    # to one snapshot at the very start so callers always get a frame.
    start_times = [start.get_seconds() for start, _end in scene_list]
    if not start_times:
        start_times = [0.0]

    # Extract one frame at the start of each detected scene
    frame_paths = []
    timestamps = []
    for idx, time in enumerate(start_times):
        out_file = f"{out_folder}/scene_{idx+1:03d}.jpg"
        (
            ffmpeg
            .input(video_path, ss=time)
            .filter('scale', -1, 360)  # Resize height to 360px while keeping aspect ratio
            .output(out_file, vframes=1)
            .overwrite_output()
            .run(quiet=True)
        )
        frame_paths.append(out_file)
        timestamps.append(time)

    return frame_paths, timestamps

# # Example usage:
# # frames = extract_frames_scenes(video_path)

# Option B: Scene‐Change Detection (Fixed)
# from scenedetect import open_video, SceneManager
# from scenedetect.detectors import ContentDetector
# import ffmpeg
# import os

# FRAMES_DIR = "frames"  # Update if needed

# def extract_frames_scenes(video_path: str) -> list[str]:
#     """
#     Uses PySceneDetect to find scene changes and save the first frame of each scene.
#     Returns list of saved frame file paths.
#     """
#     # Open video
#     video = open_video(video_path)
#     scene_manager = SceneManager()
#     scene_manager.add_detector(ContentDetector(threshold=30.0))  # Adjust as needed

#     # Run scene detection
#     scene_manager.detect_scenes(video)
#     scene_list = scene_manager.get_scene_list()

#     # video.close()  # ❌ Not needed (and not supported in newer PySceneDetect)

#     # Prepare output directory
#     vid_filename = os.path.splitext(os.path.basename(video_path))[0]
#     out_folder = os.path.join(FRAMES_DIR, vid_filename)
#     os.makedirs(out_folder, exist_ok=True)

#     # Extract one frame at start of each scene
#     frame_paths = []
#     for idx, (start, end) in enumerate(scene_list):
#         time = start.get_seconds()
#         out_file = f"{out_folder}/scene_{idx+1:03d}.jpg"
#         (
#             ffmpeg
#             .input(video_path, ss=time)
#             .filter('scale', -1, 360)
#             .output(out_file, vframes=1)
#             .overwrite_output()
#             .run(quiet=True)
#         )
#         frame_paths.append(out_file)

#     return frame_paths


Which to choose?

If you want uniform spacing: use 6A.

If you want semantic breaks: use 6B.

In [None]:
# 5. (Optional) Extract & Transcribe Audio
# A. Extract Audio with ffmpeg
# Colab cell 7A: Extract audio track as WAV (16 kHz mono)
def extract_audio(video_path: str) -> str:
    """
    Write the video's audio track to a WAV file and return that file's path.

    The output is 16-bit PCM, one channel, sampled at 16 kHz, stored in
    VIDEOS_DIR under the video's base name with a ``.wav`` extension. An
    existing file of the same name is overwritten.
    """
    stem, _extension = os.path.splitext(os.path.basename(video_path))
    wav_path = f"{VIDEOS_DIR}/{stem}.wav"

    source = ffmpeg.input(video_path)
    job = source.output(wav_path, acodec='pcm_s16le', ac=1, ar='16000')
    job = job.overwrite_output()
    job.run(quiet=True)

    return wav_path

# Example:
# audio_path = extract_audio(video_path)


In [None]:
# B. Transcribe with faster-whisper (Local, CPU)
# Colab cell 7B: Transcribe with faster-whisper
def transcribe_audio_local(audio_path: str) -> str:
    """
    Uses faster-whisper to transcribe audio locally.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The full transcript: the text of every segment joined with spaces.
    """
    # device="auto" lets faster-whisper use a GPU when one is available and
    # fall back to the CPU otherwise, so the same cell runs on a Colab GPU
    # runtime and on a CPU-only machine. 'small' is fast; 'tiny' is faster.
    model = WhisperModel("small", device="auto")
    segments, info = model.transcribe(audio_path, beam_size=5)
    transcript = " ".join(seg.text for seg in segments)
    return transcript

# Example:
# transcript = transcribe_audio_local(audio_path)


Note: device="cuda" requires a GPU runtime (for example, Colab with a GPU enabled). On a CPU-only machine such as an HP 15s laptop, use device="cpu".

Which to pick?

Local Whisper is CPU‐heavy but free.

HF Inference is faster for short audio, but its free tier has a limited daily quota.

In [None]:
# C. Transcribe with HuggingFace Inference (API)
# Colab cell 7C: Transcribe using HuggingFace Whisper via API
def transcribe_audio_hf(audio_path: str) -> str:
    """
    Sends raw WAV audio to the HuggingFace Whisper-large inference endpoint.

    Args:
        audio_path: Path of the audio file to upload.

    Returns:
        The transcript text, or an empty string when the response carries no
        ``"text"`` field (for example when the API reports an error).
    """
    hf = InferenceApi(repo_id="openai/whisper-large", token=HF_API_KEY)
    # Read the WAV file as bytes
    with open(audio_path, "rb") as f:
        data = f.read()

    # Binary payloads go through `data`; `inputs` is the JSON field and cannot
    # carry raw bytes.
    result = hf(data=data)

    # Expected response: { "text": "..." }
    if not isinstance(result, dict):
        print(f"[HF API Error] {audio_path}: unexpected response {result!r}")
        return ""
    if "error" in result:
        print(f"[HF API Error] {audio_path}: {result['error']}")
    return result.get("text", "")

# Example:
# transcript = transcribe_audio_hf(audio_path)


In [None]:
# # 6. Caption Frames (BLIP-2 / Image2Prompt)
# # A. BLIP-2 via HuggingFace Inference
# # Colab cell 8A: Caption single image using BLIP-2 HF API
# def caption_image_blip(frame_path: str) -> str:
#     """
#     Sends a frame JPEG to HuggingFace BLIP-2 for captioning.
#     Returns the generated caption text.
#     """
#     hf = InferenceApi(repo_id="Salesforce/blip2-opt-2.7b", token=HF_API_KEY)
#     with open(frame_path, "rb") as f:
#         img_bytes = f.read()
#     response = hf(inputs=img_bytes)
#     # response example: ["a cat sitting on a sofa..."]
#     return response[0] if isinstance(response, list) else str(response)

# # Batch caption all frames:
# def batch_caption(frames: list[str]) -> dict[int,str]:
#     captions = {}
#     for idx, fp in enumerate(frames):
#         cap = caption_image_blip(fp)
#         captions[idx] = cap
#     return captions

# # Example:
# # captions = batch_caption(frames)
# 🔍 6. Caption Frames (BLIP-2 / Image2Prompt)
# A. BLIP-2 via HuggingFace Inference (Fixed with direct request)

# import requests
# import json

# def caption_image_blip(frame_path: str, hf_token: str) -> str:
#     """
#     Sends a frame JPEG to HuggingFace BLIP-2 for captioning via direct requests.
#     Returns the generated caption text.
#     Requires HF_API_KEY to be passed.
#     """
#     api_url = "https://api-inference.huggingface.co/models/Salesforce/blip2-opt-2.7b"
#     headers = {
#         "Authorization": f"Bearer {hf_token}"
#     }

#     with open(frame_path, "rb") as f:
#         files = {
#             "file": (frame_path, f, "image/jpeg")
#         }
#         response = requests.post(api_url, headers=headers, files=files)

#     if response.status_code != 200:
#         print(f"HF API error {response.status_code}: {response.text}")
#         try:
#             error_data = response.json()
#             raise Exception(f"HF API error {response.status_code}: {error_data.get('error', response.text)}")
#         except json.JSONDecodeError:
#             raise Exception(f"HF API error {response.status_code}: Could not decode response as JSON. Response text: {response.text}")

#     result = response.json()
#     if isinstance(result, list) and result:
#         return result[0]
#     elif isinstance(result, dict) and "error" in result:
#         raise Exception(f"HF API returned error: {result['error']}")
#     else:
#         return str(result)

def caption_image_blip(frame_path: str, hf_token: str, timeout: float = 60.0) -> str:
    """
    Uses Hugging Face Inference API to caption an image via BLIP-2.

    Args:
        frame_path: Path of the image file to caption.
        hf_token: Hugging Face token sent as a Bearer credential.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The caption text. The sentinel ``"Caption generation failed"`` is
        returned when the API answers with a non-200 status or a body that is
        not JSON. A JSON body of an unrecognised shape is returned serialised
        as a JSON string.

    Raises:
        OSError: If ``frame_path`` cannot be read.
        requests.RequestException: On network failures, including timeouts.
    """
    url = "https://api-inference.huggingface.co/models/Salesforce/blip2-opt-2.7b"
    headers = {
        "Authorization": f"Bearer {hf_token}"
    }

    with open(frame_path, "rb") as f:
        image_bytes = f.read()

    # A timeout keeps one stalled request from blocking the whole batch.
    response = requests.post(url, headers=headers, data=image_bytes, timeout=timeout)

    if response.status_code != 200:
        print(f"[HF API Error] {frame_path} -> {response.status_code}: {response.text}")
        return "Caption generation failed"

    try:
        result = response.json()
    except ValueError:
        # ValueError is the common base of the JSON decode errors.
        print(f"[JSON Error] Could not decode response: {response.text}")
        return "Caption generation failed"

    # Answers may arrive wrapped in a list, e.g. [{"generated_text": "..."}];
    # look at the first entry in that case instead of dumping the whole list.
    payload = result[0] if isinstance(result, list) and result else result

    if isinstance(payload, dict) and "generated_text" in payload:
        return payload["generated_text"]
    if isinstance(payload, str):
        return payload
    return json.dumps(result)


# 🚀 Batch caption all frames
# def batch_caption(frames: list[str], hf_token: str) -> dict[int, str]:
#     """
#     Batch captions frames using the updated caption_image_blip function.
#     """
#     captions = {}
#     for idx, fp in enumerate(frames):
#         try:
#             cap = caption_image_blip(fp, hf_token)
#             captions[idx] = cap
#         except Exception as e:
#             print(f"Error captioning frame {idx}: {e}")
#             captions[idx] = "Captioning failed"
#     return captions

# # ✅ Example usage:
# # captions = batch_caption(frames, HF_API_KEY)

# def batch_caption(frames: list[str], hf_token: str) -> dict[int, str]:
#     captions = {}
#     for idx, frame_path in enumerate(frames):
#         blip_caption = caption_image_blip(frame_path, hf_token)
#         if blip_caption == "Caption generation failed":
#             # Try CLIP Interrogator as fallback
#             clip_caption = image_to_prompt_clip(frame_path)
#             captions[idx] = clip_caption
#         else:
#             captions[idx] = blip_caption
#     return captions

def batch_caption(frames: list[str], hf_token: str) -> dict[int, str]:
    """
    Caption every frame, preferring BLIP-2 and falling back to CLIP Interrogator.

    Each frame's position in ``frames`` becomes its key in the returned dict.
    A path that does not exist on disk is recorded as "Frame missing". When
    BLIP-2 raises or reports failure, CLIP Interrogator is tried; if that
    raises as well, the entry is "Caption generation failed".
    """
    failure_marker = "Caption generation failed"
    results = {}

    for position, path in enumerate(frames):
        # Nothing to caption when the file is not there.
        if not os.path.exists(path):
            print(f"[Missing Frame] Skipping: {path}")
            results[position] = "Frame missing"
            continue

        # First choice: BLIP-2 through the HF API.
        try:
            text = caption_image_blip(path, hf_token)
        except Exception as err:
            print(f"[BLIP Error] {path}: {err}")
            text = failure_marker

        if text != failure_marker:
            results[position] = text
            continue

        # Second choice: local CLIP Interrogator.
        try:
            text = image_to_prompt_clip(path)
        except Exception as err:
            print(f"[CLIP Fallback Error] {path}: {err}")
            text = failure_marker
        results[position] = text

    return results






Tip: BLIP-2 outputs a short descriptive caption, which you’ll later refine.

B. Image→Prompt via CLIP Interrogator (Local, GPU/CPU)
If you want a more precise SD-style prompt rather than just a caption, use a local CLIP Interrogator (requires GPU for speed, but CPU can work slowly). On Colab with GPU:

In [None]:
# Colab cell 8B: Install CLIP Interrogator (only on GPU-enabled notebook)
!pip install git+https://github.com/pharmapsychotic/clip-interrogator.git


In [None]:
# # Colab cell 8C: Use CLIP Interrogator
# from clip_interrogator import Config, Interrogator

# # Set up once
# ci = Interrogator(Config(clip_model_name="ViT-L-14/openai"))

# def image_to_prompt_clip(frame_path: str) -> str:
#     """
#     Reverse-engineer an SD-like prompt from an image frame using CLIP Interrogator.
#     """
#     prompt = ci.interrogate(frame_path)
#     return prompt

# # Example:
# # refined_prompt = image_to_prompt_clip(frames[0])

from clip_interrogator import Config, Interrogator

# Set up globally once
ci = Interrogator(Config(clip_model_name="ViT-L-14/openai"))

# def image_to_prompt_clip(frame_path: str) -> str:
#     """
#     Reverse-engineer an SD-like prompt from an image frame using CLIP Interrogator.
#     If it fails, logs and returns fallback.
#     """
#     try:
#         prompt = ci.interrogate(frame_path)
#         return prompt
#     except Exception as e:
#         print(f"[CLIP Error] Failed on {frame_path}: {e}")
#         return "Caption generation failed"

def image_to_prompt_clip(frame_path: str) -> str:
    """
    Ask CLIP Interrogator for a descriptive prompt of one frame.

    Returns "File missing" when ``frame_path`` does not exist, the
    interrogator's output on success, and "Caption generation failed" when
    anything raises along the way (the error is printed, not propagated).

    NOTE(review): ``frame_path`` is handed to ``ci.interrogate`` as a plain
    string. Confirm the interrogator accepts a path rather than an already
    opened image; if it does not, every call ends in the failure branch.
    """
    try:
        if os.path.exists(frame_path):
            return ci.interrogate(frame_path)

        print(f"[CLIP Error] File not found: {frame_path}")
        return "File missing"
    except Exception as err:
        print(f"[CLIP Error] {frame_path}: {err}")
        return "Caption generation failed"



# from clip_interrogator import Config, Interrogator

# ci = Interrogator(Config(clip_model_name="ViT-B-16"))  # adjust as needed

# def image_to_prompt_clip(frame_path: str) -> str:
#     try:
#         return ci.interrogate(frame_path)
#     except Exception as e:
#         print(f"[CLIP Error] on {frame_path}: {e}")
#         return "Caption generation failed"



When to use?

If you need exactly SD-style prompts (e.g., “ultra-detailed, cinematic lighting, 4k…”).

If your HP 15s has no GPU, skip CLIP and stay with BLIP-2 captions or HF Image2Prompt.

7. Scene / Frame-by-Frame LLM Analysis
Now that you have either:

captions (BLIP-2)

refined prompts (CLIP Interrogator)

and, optionally, a transcript, you can call an LLM to produce:

Final refined prompt

Scene narration summary (if audio)

Scene meaning/storyline

Optional style/tone

In [None]:
# # A. Crafting the LLM Prompt
# # Colab cell 9A: Create a prompt for each scene/frame
# def create_llm_prompt(
#     scene_index: int,
#     raw_caption: str,
#     refined_prompt: str,
#     transcript_segment: str,
#     user_context: str = ""
# ) -> str:
#     """
#     Returns a single string prompt to send to the LLM for one scene/frame.
#     """
#     return f"""
# You are a world-class AI content strategist and video analyst.

# User Context: {user_context or 'None'}

# Scene #{scene_index + 1}:
# - Raw Caption: "{raw_caption}"
# - Refined Prompt (image2prompt style): "{refined_prompt}"
# - Transcript (audio) snippet: "{transcript_segment or 'No audio'}"

# Please return (as JSON):
# 1. caption: A polished description of the scene.
# 2. prompt: The most accurate, detailed Stable-Diffusion style prompt for generating a similar image.
# 3. narration: A short voiceover or dialogue based on the transcript/snippet.
# 4. meaning: What this scene represents, its significance in the overall story.
# 5. scene_type: One of ["action","static","transition","dialogue"].

# Do NOT return any extra text. Only return valid JSON.
# """
# ## For English video
# def create_llm_prompt(
#     scene_index: int,
#     raw_caption: str,
#     refined_prompt: str,
#     transcript_segment: str,
#     user_context: str = ""
# ) -> str:
#     return f"""
# You are a world-class AI content strategist and video analyst.

# User Context: {user_context or 'None'}

# Scene #{scene_index + 1}:
# - Raw Caption: "{raw_caption}"
# - Refined Prompt (image2prompt style): "{refined_prompt}"
# - Transcript (audio) snippet: "{transcript_segment or 'No audio'}"

# Please return ONLY valid JSON like this:
# {{
#   "caption": "...",
#   "prompt": "...",
#   "narration": "...",
#   "meaning": "...",
#   "scene_type": "static"  // one of ["action","static","transition","dialogue"]
# }}

# DO NOT return any explanation or text outside the JSON.
# """
## for hindi video
def create_llm_prompt(
    scene_index: int,
    raw_caption: str,
    refined_prompt: str,
    transcript_segment: str,
    user_context: str = ""
) -> str:
    """
    Build the text prompt sent to the LLM for one scene/frame.

    Args:
        scene_index: Zero-based scene position; shown to the model as 1-based.
        raw_caption: Caption produced for the frame.
        refined_prompt: Image-to-prompt style description of the frame.
        transcript_segment: Transcript text for this scene (may be in Hindi);
            an empty value is shown as "No audio".
        user_context: Free-form context from the user; an empty value is
            shown as "None".

    Returns:
        The prompt string. It asks for a single JSON object with the keys
        caption, prompt, narration, meaning and scene_type.
    """
    # The JSON example must itself be valid JSON: a "//" comment inside it
    # tends to be echoed back by the model and then breaks json.loads. The
    # allowed scene_type values are therefore listed after the example.
    return f"""
You are a world-class AI content strategist and video analyst.

User Context: {user_context or 'None'}

Scene #{scene_index + 1}:
- Raw Caption: "{raw_caption}"
- Refined Prompt (image2prompt style): "{refined_prompt}"
- Transcript (audio) snippet (may be in Hindi): "{transcript_segment or 'No audio'}"

Your task is to analyze the scene. If the transcript is in Hindi, translate it to English.
Return all fields below ONLY in English.

Please return ONLY valid JSON like this:
{{
  "caption": "...",
  "prompt": "...",
  "narration": "...",
  "meaning": "...",
  "scene_type": "static"
}}

"scene_type" must be exactly one of: "action", "static", "transition", "dialogue".

DO NOT return any explanation or text outside the JSON.
"""


In [None]:
# # B. Call OpenRouter (Free GPT-3.5)
# # Colab cell 9B: Send prompt to OpenRouter
# def call_openrouter_llm(system_prompt: str, user_prompt: str) -> dict:
#     """
#     Calls OpenRouter’s GPT-3.5-turbo (free tier) to process the combined prompt.
#     Returns the parsed JSON result.
#     """
#     url = "https://openrouter.ai/api/v1/chat/completions"
#     headers = {
#         "Authorization": f"Bearer {OPENROUTER_API_KEY}",
#         "Content-Type": "application/json"
#     }
#     payload = {
#         "model": "openai/gpt-3.5-turbo",
#         "messages": [
#             {"role": "system", "content": system_prompt},
#             {"role": "user", "content": user_prompt}
#         ]
#     }
#     resp = requests.post(url, headers=headers, json=payload)
#     if resp.status_code != 200:
#         raise Exception(f"OpenRouter error {resp.status_code}: {resp.text}")
#     data = resp.json()
#     # data["choices"][0]["message"]["content"] is the JSON string
#     content = data["choices"][0]["message"]["content"]
#     return json.loads(content)

# # Example usage for one scene:
# # sys_msg = "You are a world-class AI content strategist…"
# # user_msg = create_llm_prompt(0, captions[0], refined_prompts[0], transcript_snippets[0], "My context")
# # analysis = call_openrouter_llm(sys_msg, user_msg)
# # → analysis is a dict with keys: caption, prompt, narration, meaning, scene_type

# Colab cell 9B: Local LLM using Mistral in transformers (no API or credit needed)
# !pip install -q transformers accelerate bitsandbytes

# from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# import torch

# # Load Mistral-7B-Instruct model (quantized for Colab GPU)
# model_id = "mistralai/Mistral-7B-Instruct-v0.1"
# tokenizer = AutoTokenizer.from_pretrained(model_id)
# model = AutoModelForCausalLM.from_pretrained(
#     model_id,
#     device_map="auto",
#     torch_dtype=torch.float16,
#     load_in_4bit=True
# )
# generator = pipeline("text-generation", model=model, tokenizer=tokenizer)

# # Updated local LLM call function (replaces call_openrouter_llm)
# def call_local_llm(system_prompt: str, user_prompt: str) -> dict:
#     """
#     Calls a local open-source LLM (e.g., Mistral) for processing.
#     Returns a parsed JSON dict with the structured scene analysis.
#     """
#     prompt = f"{system_prompt.strip()}\n\n{user_prompt.strip()}"
#     output = generator(prompt, max_new_tokens=512, do_sample=True, temperature=0.7)[0]["generated_text"]

#     # Try to extract JSON substring
#     try:
#         json_str = output[output.index("{"):output.rindex("}") + 1]
#         return json.loads(json_str)
#     except Exception:
#         print("⚠️ Could not parse JSON. Returning raw output.")
#         return {"response": output}


# from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# import torch
# from huggingface_hub import login

# # Paste your Hugging Face token here
# login("#add your api")
# # ✅ Use an open-access variant
# model_id = "mistralai/Mistral-7B-v0.1"  # No access gate
# tokenizer = AutoTokenizer.from_pretrained(model_id)
# model = AutoModelForCausalLM.from_pretrained(
#     model_id,
#     device_map="auto",
#     torch_dtype=torch.float16,
#     load_in_4bit=True
# )
# generator = pipeline("text-generation", model=model, tokenizer=tokenizer)

# def call_local_llm(system_prompt: str, user_prompt: str) -> dict:
#     prompt = f"{system_prompt.strip()}\n\n{user_prompt.strip()}"
#     output = generator(prompt, max_new_tokens=512, do_sample=True, temperature=0.7)[0]["generated_text"]
#     try:
#         json_str = output[output.index("{"):output.rindex("}") + 1]
#         return json.loads(json_str)
#     except Exception:
#         print("⚠️ Could not parse JSON. Returning raw output.")
#         return {"response": output}

# !pip install -U bitsandbytes transformers accelerate

# !pip uninstall -y bitsandbytes
# !pip install --no-cache-dir --force-reinstall bitsandbytes==0.41.1
# !pip install --upgrade transformers accelerate

# Install required packages
# Install required packages

#latest

# !pip install -U transformers accelerate

# from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# from huggingface_hub import login
# import torch
# import json

# # Hugging Face login (replace with your token)
# login("#add your api")

# # Load model
# model_id = "microsoft/phi-2"

# # Load tokenizer and model in float16 for GPU
# tokenizer = AutoTokenizer.from_pretrained(model_id)
# model = AutoModelForCausalLM.from_pretrained(
#     model_id,
#     device_map="auto",
#     torch_dtype=torch.float16
# )

# # Create text generation pipeline
# generator = pipeline("text-generation", model=model, tokenizer=tokenizer)

# # Define generation function
# def call_local_llm(system_prompt: str, user_prompt: str) -> dict:
#     prompt = f"{system_prompt.strip()}\n\n{user_prompt.strip()}"
#     output = generator(prompt, max_new_tokens=512, do_sample=True, temperature=0.7)[0]["generated_text"]
#     try:
#         json_str = output[output.index("{"):output.rindex("}") + 1]
#         return json.loads(json_str)
#     except Exception:
#         print("⚠️ Could not parse JSON. Returning raw output.")
#         return {"response": output}


In [None]:
# 1. Install and Login
!pip install -U transformers accelerate bitsandbytes
from huggingface_hub import login
login("")  # Use your actual HF token

# 2. Import and load Mistral-7B-Instruct or Zephyr
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
import json

model_id = "mistralai/Mistral-7B-Instruct-v0.2"  # or "HuggingFaceH4/zephyr-7b-beta"

tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype=torch.float16,
    load_in_8bit=True  # Optional: saves VRAM
)

generator = pipeline("text-generation", model=model, tokenizer=tokenizer)

# 3. JSON-capable generation wrapper
def call_local_llm(system_prompt: str, user_prompt: str) -> dict:
    """
    Run the local text-generation pipeline and parse its answer as JSON.

    Args:
        system_prompt: Instructions placed in the system section of the prompt.
        user_prompt: The per-scene request, expected to ask for a JSON object.

    Returns:
        The parsed JSON object when the completion contains one; otherwise
        ``{"response": <completion text>}`` after printing a warning.
    """
    # NOTE(review): the <<SYS>> wrapper is the Llama-2 chat layout; confirm it
    # is the intended format for the model loaded above.
    full_prompt = f"<s>[INST] <<SYS>>\n{system_prompt.strip()}\n<</SYS>>\n\n{user_prompt.strip()} [/INST]"
    result = generator(
        full_prompt,
        max_new_tokens=512,
        do_sample=True,          # temperature only takes effect when sampling
        temperature=0.7,
        # Return the completion only. By default the prompt is echoed back,
        # and a prompt containing a JSON template would make the "{" search
        # below start inside the prompt instead of inside the answer.
        return_full_text=False,
    )[0]["generated_text"]

    try:
        # Take the span from the first "{" to the last "}" so text the model
        # adds around the object does not break parsing.
        json_str = result[result.index("{"):result.rindex("}") + 1]
        return json.loads(json_str)
    except ValueError:
        # Covers both a missing brace (str.index / str.rindex) and invalid
        # JSON (json.JSONDecodeError is a ValueError subclass).
        print("⚠️ Could not parse JSON. Returning raw output.")
        return {"response": result}


Why OpenRouter?

OpenRouter offers a GPT-3.5 endpoint at no cost.

Enough for a few dozen scenes per day.

In [None]:
# Colab cell 10: Main pipeline function
def process_video_pipeline(
    source: str,          # either a YouTube URL or local file path
    use_audio: bool = True,
    frame_interval: float = 2,
    scene_mode: str = "interval",  # "interval" or "scene_detect"
    user_context: str = ""
) -> dict:
    """
    End-to-end pipeline. Returns a structured dict with all scene data.

    Args:
        source: Video location. Values starting with "http" are downloaded;
            anything else is treated as a local file path and copied.
        use_audio: When True, extract the audio track and transcribe it.
        frame_interval: Spacing passed to the interval frame extractor and
            used to derive timestamps in "interval" mode.
        scene_mode: "scene_detect" uses the scene-based extractor and its
            timestamps; any other value uses fixed-interval extraction.
        user_context: Free-form guidance forwarded to the LLM prompt builder.

    Returns:
        A dict with keys "video_path", "transcript" and "scenes". The same
        dict is written to ``{OUTPUT_DIR}/analysis_result.json``. Scenes whose
        LLM step raises are reported on stdout and omitted from the list.
    """
    # 1. Ingest video
    if source.startswith("http"):
        video_path = download_video(source)
    else:
        video_path = copy_local_video(source)

    # 2. Extract audio if requested
    transcript = ""
    if use_audio:
        audio_path = extract_audio(video_path)
        # You can choose local or HF transcription:
        transcript = transcribe_audio_local(audio_path)  # or transcribe_audio_hf(audio_path)

    # 3. Extract frames
    if scene_mode == "scene_detect":
        frames, timestamps = extract_frames_scenes(video_path)
    else:
        frames = extract_frames_interval(video_path, interval=frame_interval)
        timestamps = [idx * frame_interval for idx in range(len(frames))]
    if not frames:
        print("⚠️ No frames were extracted; the result will contain no scenes.")

    # 4. Caption frames
    captions = batch_caption(frames, HF_API_KEY)  # uses BLIP-2 via HF Inference API

    # 5. Refine prompts per frame, falling back to the raw caption on failure
    refined_prompts = {}
    for idx, fp in enumerate(frames):
        refined = image_to_prompt_clip(fp)
        if refined == "Caption generation failed":
            refined = captions.get(idx, "No caption available")
        refined_prompts[idx] = refined

    # 6. Split transcript into segments aligned to frames (approximate)
    if use_audio:
        transcript_snippets = _split_transcript(transcript, len(frames))
    else:
        transcript_snippets = [""] * len(frames)

    # 7. LLM analysis using local model
    scenes = []
    for idx, fp in enumerate(frames):
        # Prefer the extractor's own timestamp (this is what makes
        # "scene_detect" mode report real scene times); fall back to the
        # fixed-interval estimate if the list is shorter than `frames`.
        if idx < len(timestamps):
            timestamp = timestamps[idx]
        else:
            timestamp = idx * frame_interval
        try:
            raw_cap = captions[idx]
            ref_prompt = refined_prompts[idx]
            snippet = transcript_snippets[idx]
            user_msg = create_llm_prompt(idx, raw_cap, ref_prompt, snippet, user_context)
            system_msg = "You are a top‐tier AI content strategist and video analyst."

            result = call_local_llm(system_msg, user_msg)

            scenes.append({
                "scene_id": idx + 1,
                "timestamp": timestamp,
                "frame_path": fp,
                "raw_caption": raw_cap,
                "refined_prompt": result.get("prompt", ref_prompt),
                "narration": result.get("narration", snippet),
                "meaning": result.get("meaning", ""),
                "scene_type": result.get("scene_type", "static")
            })
        except Exception as e:
            # Best effort: one failing scene must not abort the whole video.
            print(f"[LLM Error] Scene {idx}: {e}")

    # 8. Save structured JSON & return
    out = {
        "video_path": video_path,
        "transcript": transcript,
        "scenes": scenes
    }
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    with open(f"{OUTPUT_DIR}/analysis_result.json", "w", encoding="utf-8") as f:
        # The element type of scene-detect timestamps is not visible from
        # here; default=str keeps the dump from failing on non-JSON values.
        json.dump(out, f, indent=2, default=str)
    return out


def _split_transcript(transcript: str, n_parts: int) -> list:
    """Split `transcript` into `n_parts` contiguous word chunks that together cover every word."""
    if n_parts <= 0:
        return []
    words = transcript.split()
    total = len(words)
    # Proportional boundaries: chunk i spans [i*total/n, (i+1)*total/n), so no
    # trailing words are lost to integer division.
    return [
        " ".join(words[i * total // n_parts:(i + 1) * total // n_parts])
        for i in range(n_parts)
    ]

# Example: run the pipeline on a YouTube Short and keep the returned dict in `result`.
result = process_video_pipeline(
    "https://youtube.com/shorts/8-jFqdBBV0M?si=SNSHVFvv2zPJOwMz",
    use_audio=False,  # skip audio extraction and transcription for this run
    frame_interval=2.0,
    scene_mode="interval",  # "interval" or "scene_detect" (scene_detect to be revisited in a later iteration)
    user_context="I want a cinematic and animatic tone"
)


What happens here:

Downloads or copies video.

Extracts audio + transcript (skipped when use_audio=False, as in the example call above).

Splits into frames (every 2 s).

Captions via BLIP-2.

Runs CLIP Interrogator on each frame, falling back to the BLIP-2 caption when it fails.

Breaks transcript into equal pieces per frame.

Sends each scene’s data to the LLM through call_local_llm (the locally loaded model; GPT-3.5 via OpenRouter is the hosted alternative).

Saves everything in /output/analysis_result.json (plus all frames under /frames/…).

In [None]:
# 9. Download or Clean Up
# Colab cell 11: Functions to download or clean output

# Download the entire OUTPUT_DIR as .zip for local storage
# !zip -r /content/analysis_output.zip {OUTPUT_DIR}

# To clean (delete) all files in /videos, /frames, /output:
def clean_all():
    """Reset the videos, frames and output folders to empty directories."""
    working_dirs = (VIDEOS_DIR, FRAMES_DIR, OUTPUT_DIR)
    for folder in working_dirs:
        # Drop the folder with everything inside it, then recreate it empty.
        present = os.path.exists(folder)
        if present:
            shutil.rmtree(folder)
        os.makedirs(folder, exist_ok=True)
    print("Cleaned all directories.")

# Example:
# clean_all()
# On Colab, after zipping, click the file icon in the left panel
# and download analysis_output.zip.

💡 Additional Enhancements & “Impactful” Add‐Ons
Adaptive Frame Interval

Instead of a fixed 2 s, adjust interval based on scene complexity:

Use scene detection for abrupt changes (PySceneDetect).

Fallback to fixed if scene detection fails.

Prompt Paraphrasing

After obtaining a raw prompt, run a “paraphrase” step (via a second LLM call) to get 3–5 variant prompts (e.g. “fantasy style,” “cinematic,” “noir,” etc.).

Emotion/Tone Tagging

In your LLM prompt, ask for a tone tag (“Scene is comedic/tense/dark”) and store it.

Metadata Export in Multiple Formats

Save not only JSON but also:

A Markdown storyboard (analysis_result.md)

A CSV for easy spreadsheet import

Interactive Visualization (in Colab)

Display the first few frames inline with their captions and refined prompts:

```python
from IPython.display import display, Image
for i in range(min(5, len(result["scenes"]))):
    img_path = result["scenes"][i]["frame_path"]
    display(Image(filename=img_path, width=320))
    print("Prompt:", result["scenes"][i]["refined_prompt"])
    print("---")
```
Local HTML Export

Generate a tiny index.html that loads from /frames/… and displays scene cards (thumbnail + captions), so you can open it offline in any browser.

Audio Clip Extraction

If audio has multiple speakers or segments, split audio by silence to get per-scene audio clips.

Then transcribe individually for better alignment.

Model Checkpoint Caching

If you run multiple videos in one Colab session, BLIP-2 and Whisper caches will speed up subsequent calls.

