# Short-form Video Search Playground
This notebook recreates the core CLIP-based video indexing/search pipeline from the `CLIP-video-search` repository,
and adapts it so you can experiment directly on the MSRVTT clips that live under `shortform-video-ranker/data`.

Use it end-to-end or cherry-pick the pieces you need for your short-form video + LLM reranking project.


## Environment setup
Make sure the runtime has GPU access plus the following system dependencies:
- `ffmpeg/ffprobe` (for duration metadata)
- Python packages: `torch`, `transformers`, `opencv-python`, `scenedetect`, `tqdm`, `pillow`

If anything is missing, uncomment the `%pip` cell below or install the packages however you prefer before running the rest of the notebook.


In [32]:
# Optional: install runtime deps (requires internet access)
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
%pip install transformers scenedetect opencv-python tqdm pillow
%pip install faiss-cpu


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Looking in indexes: https://download.pytorch.org/whl/cu118
Note: you may need to restart the kernel to use updated packages.


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Note: you may need to restart the kernel to use updated packages.


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Collecting faiss-cpu
  Downloading faiss_cpu-1.13.0-cp39-abi3-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (23.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.6/23.6 MB[0m [31m37.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: faiss-cpu
Successfully installed faiss-cpu-1.13.0
Note: you may need to restart the kernel to use updated packages.


In [50]:
import json
import math
import os
import subprocess
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

import cv2
import scenedetect as sd
import torch
from PIL import Image
from tqdm import tqdm
from transformers import CLIPModel, CLIPProcessor
import numpy as np
import faiss
import torch.nn.functional as F


In [51]:
DATASET_ROOT = Path('data/1/TrainValVideo').resolve()
INDEX_ROOT = Path('notebook_artifacts/index').resolve()
TENSOR_CACHE = INDEX_ROOT / 'scene_tensors'
FAISS_SCENE_IDS_PATH = INDEX_ROOT / 'faiss_scene_ids.json'
FAISS_META_PATH = INDEX_ROOT / 'faiss_scene_meta.json'
FAISS_INDEX_PATH = INDEX_ROOT / 'faiss.index'
INDEX_ROOT.mkdir(parents=True, exist_ok=True)
TENSOR_CACHE.mkdir(parents=True, exist_ok=True)

VIDEO_EXTENSIONS = {'.mp4', '.mov', '.avi', '.mkv'}
print(f'Dataset root: {DATASET_ROOT}')
print(f'Index root: {INDEX_ROOT}')


Dataset root: /home/khushpatel/shortform-video-ranker/data/1/TrainValVideo
Index root: /home/khushpatel/shortform-video-ranker/notebook_artifacts/index


In [52]:
def list_video_files(root: Path, limit: Optional[int] = None) -> List[Path]:
    files = []
    for path in root.rglob('*'):
        if path.suffix.lower() in VIDEO_EXTENSIONS:
            files.append(path)
            if limit and len(files) >= limit:
                break
    return sorted(files)

def ffprobe_duration(path: Path) -> float:
    try:
        completed = subprocess.run(
            ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of',
             'default=noprint_wrappers=1:nokey=1', str(path)],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )
        return float(completed.stdout)
    except Exception:
        return 0.0


In [53]:
class SceneFeatureExtractor:
    def __init__(self, samples_per_scene: int = 3, threshold: float = 27.0, tensor_cache: Path = TENSOR_CACHE):
        self.samples_per_scene = samples_per_scene
        self.threshold = threshold
        self.tensor_cache = Path(tensor_cache)
        self.tensor_cache.mkdir(parents=True, exist_ok=True)
        self.clip_processor = CLIPProcessor.from_pretrained('openai/clip-vit-base-patch32')
    def _collect_scenes(self, video_path: Path) -> List[Tuple[sd.FrameTimecode, sd.FrameTimecode]]:
        video = sd.open_video(str(video_path))
        manager = sd.SceneManager()
        manager.add_detector(sd.ContentDetector(threshold=self.threshold))
        manager.detect_scenes(video)
        return manager.get_scene_list()
    def _sample_frames(self, frame_start: int, frame_end: int) -> List[int]:
        length = max(frame_end - frame_start, 1)
        every_n = max(round(length / self.samples_per_scene), 1)
        return [frame_start + min(idx * every_n, length - 1) for idx in range(self.samples_per_scene)]
    def _save_tensor(self, tensor: torch.Tensor) -> str:
        target = self.tensor_cache / f'{uuid.uuid4().hex}.pt'
        torch.save(tensor.cpu(), target)
        return str(target)
    def extract(self, video_path: Path) -> Dict:
        scenes = self._collect_scenes(video_path)
        cap = cv2.VideoCapture(str(video_path))
        if not scenes:
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            class _FrameStub:
                def __init__(self, frame_num: int):
                    self.frame_num = int(frame_num)
            scenes = [(_FrameStub(0), _FrameStub(max(frame_count - 1, 0)))]
        clip_pixel_scenes = []
        for scene_no, (start_tc, end_tc) in enumerate(scenes):
            samples = self._sample_frames(start_tc.frame_num, end_tc.frame_num)
            tensors = []
            for sample in samples:
                cap.set(cv2.CAP_PROP_POS_FRAMES, sample)
                ok, frame = cap.read()
                if not ok:
                    continue
                pil_image = Image.fromarray(frame)
                inputs = self.clip_processor(images=pil_image, return_tensors='pt', padding=True)
                tensors.append(inputs['pixel_values'].squeeze(0))
            if not tensors:
                continue
            stacked = torch.stack(tensors)
            tensor_path = self._save_tensor(torch.mean(stacked, dim=0))
            clip_pixel_scenes.append({
                'local_path': tensor_path,
                'scene_no': scene_no,
                'scene': {
                    'start_frame_num': start_tc.frame_num,
                    'end_frame_num': end_tc.frame_num,
                }
            })
        cap.release()
        return {
            'num_of_scenes': len(clip_pixel_scenes),
            'clip_pixel_scenes': clip_pixel_scenes,
        }


In [54]:
class FrameTextScorer:
    def __init__(self, device: Optional[str] = None):
        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.processor = CLIPProcessor.from_pretrained('openai/clip-vit-base-patch32')
        self.model = CLIPModel.from_pretrained('openai/clip-vit-base-patch32').to(self.device)

    def score(self, tensor_paths: Iterable[str], text: str) -> float:
        tensor_paths = list(tensor_paths)
        if not tensor_paths:
            return 0.0
        total = 0.0
        for tensor_path in tensor_paths:
            image_tensor = torch.load(tensor_path, map_location=self.device)
            if image_tensor.ndim == 3:
                image_tensor = image_tensor.unsqueeze(0)
            inputs = self.processor(text=[text], return_tensors='pt', padding=True).to(self.device)
            inputs['pixel_values'] = image_tensor.to(self.device)
            with torch.inference_mode():
                outputs = self.model(**inputs)
            probs = outputs.logits_per_image.squeeze()
            total += probs.item()
        return float(total / len(tensor_paths))


In [55]:
@dataclass
class SceneRecord:
    scene_id: str
    tensor_path: str
    start_frame: int
    end_frame: int
    
class IndexStorage:
    def __init__(self, index_root: Path = INDEX_ROOT):
        self.index_root = Path(index_root)
        self.index_root.mkdir(parents=True, exist_ok=True)
        self.metadata_path = self.index_root / 'video_metadata.json'
        self.scene_path = self.index_root / 'video_scenes.json'
        self.scene_records_path = self.index_root / 'scene_records.json'
        self.uri_map_path = self.index_root / 'uri_to_id.json'
        self._metadata = self._load(self.metadata_path)
        self._video_scenes = self._load(self.scene_path)
        self._scene_records = self._load(self.scene_records_path)
        self._uri_map = self._load(self.uri_map_path)
    def _load(self, path: Path) -> Dict:
        if path.exists():
            return json.loads(path.read_text())
        return {}
    def _save(self, data: Dict, path: Path) -> None:
        path.write_text(json.dumps(data, indent=2))
    def has_video(self, video_uri: str) -> bool:
        return video_uri in self._uri_map
    def add_video(self, video_uri: str, video_duration: float, scenes: List[SceneRecord]) -> str:
        video_id = str(uuid.uuid4())
        self._metadata[video_id] = {
            'video_uri': video_uri,
            'video_duration': video_duration,
        }
        self._video_scenes[video_id] = [scene.scene_id for scene in scenes]
        for scene in scenes:
            self._scene_records[scene.scene_id] = {
                'tensor_path': scene.tensor_path,
                'start_frame': scene.start_frame,
                'end_frame': scene.end_frame,
                'video_id': video_id,
            }
        self._uri_map[video_uri] = video_id
        self._commit()
        return video_id
    def _commit(self):
        self._save(self._metadata, self.metadata_path)
        self._save(self._video_scenes, self.scene_path)
        self._save(self._scene_records, self.scene_records_path)
        self._save(self._uri_map, self.uri_map_path)
    def iter_videos(self):
        for video_id, metadata in self._metadata.items():
            yield video_id, metadata
    def iter_scenes(self):
        for scene_id, record in self._scene_records.items():
            yield scene_id, record
    def get_video_metadata(self, video_id: str):
        return self._metadata.get(video_id)
    def scene_tensor_paths(self, video_id: str) -> List[str]:
        scene_ids = self._video_scenes.get(video_id, [])
        return [self._scene_records[s]['tensor_path'] for s in scene_ids if s in self._scene_records]


In [56]:
class VideoIndexer:
    def __init__(self, extractor: SceneFeatureExtractor, storage: IndexStorage):
        self.extractor = extractor
        self.storage = storage

    def index_video(self, video_path: Path) -> Optional[str]:
        video_path = Path(video_path).resolve()
        if self.storage.has_video(str(video_path)):
            return None
        features = self.extractor.extract(video_path)
        scene_records = []
        for clip_scene in features['clip_pixel_scenes']:
            scene_records.append(SceneRecord(
                scene_id=str(uuid.uuid4()),
                tensor_path=clip_scene['local_path'],
                start_frame=clip_scene['scene']['start_frame_num'],
                end_frame=clip_scene['scene']['end_frame_num'],
            ))
        duration = ffprobe_duration(video_path)
        return self.storage.add_video(str(video_path), duration, scene_records)

    def bulk_index(self, video_paths: Iterable[Path], max_items: Optional[int] = None) -> Dict[str, int]:
        processed = 0
        skipped = 0
        for video_path in tqdm(video_paths, desc='Indexing videos'):
            if max_items and processed >= max_items:
                break
            video_id = self.index_video(video_path)
            if video_id:
                processed += 1
            else:
                skipped += 1
        return {'processed': processed, 'skipped': skipped}


In [57]:
class VideoSearchEngine:
    def __init__(self, storage: IndexStorage, scorer: FrameTextScorer):
        self.storage = storage
        self.scorer = scorer

    def search(self, text: str, top_k: int = 5) -> List[Dict]:
        results = []
        for video_id, metadata in self.storage.iter_videos():
            tensor_paths = self.storage.scene_tensor_paths(video_id)
            if not tensor_paths:
                continue
            score = self.scorer.score(tensor_paths, text)
            results.append({
                'video_id': video_id,
                'video_uri': metadata['video_uri'],
                'video_duration': metadata['video_duration'],
                'score': score,
            })
        results.sort(key=lambda item: item['score'], reverse=True)
        return results[:top_k]


In [58]:
scene_extractor = SceneFeatureExtractor(samples_per_scene=3)
storage = IndexStorage(INDEX_ROOT)
indexer = VideoIndexer(scene_extractor, storage)
frame_scorer = FrameTextScorer()
engine = VideoSearchEngine(storage, frame_scorer)
print('Components instantiated.')

Components instantiated.


In [None]:
video_files = list_video_files(DATASET_ROOT)
print(f'Found {len(video_files)} candidate video files. Showing a sample:')
video_files[:5]

Found 7010 candidate video files. Showing a sample:


[PosixPath('/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video0.mp4'),
 PosixPath('/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video1.mp4'),
 PosixPath('/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video10.mp4'),
 PosixPath('/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video100.mp4'),
 PosixPath('/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video1000.mp4')]

In [30]:
# Index a manageable subset first to verify everything works.
subset_to_index = video_files  # index all files
index_summary = indexer.bulk_index(subset_to_index)
index_summary

Indexing videos: 100%|██████████| 7010/7010 [1:06:26<00:00,  1.76it/s]


{'processed': 7007, 'skipped': 3}

In [31]:
example_query = 'a person talking to the camera'
search_results = engine.search(example_query, top_k=10)
search_results


[{'video_id': '3ad0ae0c-6ae8-47e1-9753-5ca7caa89b54',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video1758.mp4',
  'video_duration': 0.0,
  'score': 30.197233200073242},
 {'video_id': '2f1b9870-0c66-45be-abd7-d3e0b5204948',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video117.mp4',
  'video_duration': 0.0,
  'score': 29.495466232299805},
 {'video_id': 'b1057168-556b-45c0-8ca6-dfc748a266c1',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video1451.mp4',
  'video_duration': 0.0,
  'score': 28.711292266845703},
 {'video_id': 'd40a33dc-3813-445e-bbdb-fbe151d4193f',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video4005.mp4',
  'video_duration': 0.0,
  'score': 28.614532470703125},
 {'video_id': '3b77a04c-0bdf-44f3-8e1c-5ca03c59a61b',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video4701.mp4',
  'video_duration': 0.0,
  'score': 28

## Hooking in an LLM reranker
Once you have coarse CLIP scores, you can push the top-k clips (with metadata or transcripts) through any LLM to
build a second-pass ranking. Provide a textual description of each candidate (scene summaries, ASR text, etc.),
ask the model to order them by relevance, and merge the ordering back into your structured results.


In [None]:
def rerank_with_llm_stub(results: List[Dict], prompt_builder):
    # Placeholder that shows how you could integrate an LLM client.
    # `prompt_builder` should accept the candidate list and return a prompt string.
    # Replace the body with real LLM calls (OpenAI, Azure, local models, etc.).
    prompt = prompt_builder(results)
    print('LLM prompt preview:')
    print(prompt[:500])
    # TODO: send prompt to your model and parse its ordering.
    # For now, just return the incoming list so you can wire this up later.
    return results

def simple_prompt_builder(results: List[Dict]) -> str:
    lines = ['You are a helpful assistant. Rank the following video clips by relevance.']
    for idx, item in enumerate(results, start=1):
        lines.append(f"Clip {idx}: URI={item['video_uri']} | Duration={item['video_duration']:.2f}s | CLIP score={item['score']:.3f}")
    lines.append('Respond with the clip numbers in best-to-worst order separated by commas.')
    return '
'.join(lines)

reranked_results = rerank_with_llm_stub(search_results, simple_prompt_builder)
reranked_results


## Accelerating search with FAISS
These helper cells embed each cached scene tensor once, load them into a FAISS index, and
run top-K scene retrieval using CLIP's text encoder. The resulting scene hits are aggregated
back to video-level recommendations before any downstream reranking.


In [59]:
faiss_index = None
faiss_scene_ids = []
faiss_scene_meta = []
faiss_scene_matrix = None


def build_faiss_index(storage: IndexStorage, scorer: FrameTextScorer):
    global faiss_index, faiss_scene_ids, faiss_scene_meta, faiss_scene_matrix
    if not storage._scene_records:
        raise ValueError('No scene records available. Index videos first.')
    scene_ids = []
    embeddings = []
    metadata = []
    for scene_id, record in storage.iter_scenes():
        tensor_path = record['tensor_path']
        tensor = torch.load(tensor_path, map_location=scorer.device)
        if tensor.ndim == 3:
            tensor = tensor.unsqueeze(0)
        tensor = tensor.to(scorer.device)
        with torch.inference_mode():
            features = scorer.model.get_image_features(pixel_values=tensor)
        features = F.normalize(features, dim=-1)
        embeddings.append(features.squeeze(0).cpu().numpy().astype('float32'))
        enriched = dict(record)
        enriched['scene_id'] = scene_id
        metadata.append(enriched)
        scene_ids.append(scene_id)
    matrix = np.stack(embeddings).astype('float32')
    index = faiss.IndexFlatIP(matrix.shape[1])
    index.add(matrix)
    faiss_index = index
    faiss_scene_ids = scene_ids
    faiss_scene_meta = metadata
    faiss_scene_matrix = matrix
    return {'scenes_indexed': len(scene_ids), 'dimension': matrix.shape[1]}


In [60]:
faiss_stats = build_faiss_index(storage, frame_scorer)
faiss_stats


{'scenes_indexed': 23820, 'dimension': 512}

In [62]:
def search_videos_with_faiss(text: str, top_scene_hits: int = 20, top_videos: int = 5):
    if faiss_index is None:
        raise ValueError('Build the FAISS index first.')
    text_inputs = frame_scorer.processor(text=[text], return_tensors='pt', padding=True)
    text_inputs = {k: v.to(frame_scorer.device) for k, v in text_inputs.items()}
    with torch.inference_mode():
        text_features = frame_scorer.model.get_text_features(**text_inputs)
    text_features = F.normalize(text_features, dim=-1).cpu().numpy().astype('float32')
    k = min(top_scene_hits, faiss_index.ntotal)
    scores, indices = faiss_index.search(text_features, k)
    scene_hits = []
    for score, idx in zip(scores[0], indices[0]):
        if idx == -1:
            continue
        meta = faiss_scene_meta[idx]
        scene_hits.append({
            'scene_id': meta['scene_id'],
            'video_id': meta['video_id'],
            'score': float(score),
            'start_frame': meta['start_frame'],
            'end_frame': meta['end_frame'],
        })
    video_scores = {}
    for hit in scene_hits:
        vid = hit['video_id']
        current = video_scores.get(vid)
        if not current or hit['score'] > current['score']:
            video_scores[vid] = hit
    ranked = sorted(video_scores.values(), key=lambda item: item['score'], reverse=True)[:top_videos]
    results = []
    for hit in ranked:
        metadata = storage.get_video_metadata(hit['video_id']) or {}
        results.append({
            'video_id': hit['video_id'],
            'video_uri': metadata.get('video_uri'),
            'video_duration': metadata.get('video_duration'),
            'score': hit['score'],
            'scene_id': hit['scene_id'],
            'scene_frames': (hit['start_frame'], hit['end_frame']),
        })
    return {'scene_hits': scene_hits, 'video_results': results}

faiss_search_results = search_videos_with_faiss('car', top_scene_hits=30, top_videos=5)
faiss_search_results['video_results']


[{'video_id': '8f81179f-4213-49fb-a582-03c05d831cbd',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video5334.mp4',
  'video_duration': 0.0,
  'score': 0.30587273836135864,
  'scene_id': 'fbe3324c-063a-4e7d-a52e-4d96f0c0cd60',
  'scene_frames': (0, 15)},
 {'video_id': '3b671d45-9fc0-4b5e-b5ad-cc6b5e735541',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video6948.mp4',
  'video_duration': 0.0,
  'score': 0.30110543966293335,
  'scene_id': '8a229896-7da4-4ef7-8f81-1a00bd409990',
  'scene_frames': (196, 223)},
 {'video_id': 'b9ca730e-21b2-45b2-83e8-88e57da4bef2',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video3215.mp4',
  'video_duration': 0.0,
  'score': 0.2981030344963074,
  'scene_id': 'e9cb4d4f-a06c-4bbe-8389-e5904ae7e2bb',
  'scene_frames': (0, 22)},
 {'video_id': 'edd1b066-bd1e-44d8-9453-cf35b7281576',
  'video_uri': '/home/khushpatel/shortform-video-ranker/data/1/TrainValVideo/video5736

### Persisting the FAISS index
Save the trained FAISS structure plus scene metadata to disk so you can reload them
without recomputing embeddings after a kernel restart.


In [None]:
def save_faiss_artifacts(index_path=FAISS_INDEX_PATH, meta_path=FAISS_META_PATH, ids_path=FAISS_SCENE_IDS_PATH):
    if faiss_index is None:
        raise ValueError('Build the FAISS index before saving.')
    faiss.write_index(faiss_index, str(index_path))
    Path(meta_path).write_text(json.dumps(faiss_scene_meta))
    Path(ids_path).write_text(json.dumps(faiss_scene_ids))
    return {
        'index_path': str(index_path),
        'meta_path': str(meta_path),
        'ids_path': str(ids_path),
        'entries': len(faiss_scene_ids),
    }


def load_faiss_artifacts(index_path=FAISS_INDEX_PATH, meta_path=FAISS_META_PATH, ids_path=FAISS_SCENE_IDS_PATH):
    global faiss_index, faiss_scene_meta, faiss_scene_ids
    if not Path(index_path).exists():
        raise FileNotFoundError(index_path)
    faiss_index = faiss.read_index(str(index_path))
    faiss_scene_meta = json.loads(Path(meta_path).read_text())
    faiss_scene_ids = json.loads(Path(ids_path).read_text())
    return {
        'index_path': str(index_path),
        'entries': len(faiss_scene_ids),
    }


In [None]:
saved_faiss = save_faiss_artifacts()
saved_faiss


In [None]:
loaded_faiss = load_faiss_artifacts()
loaded_faiss
