In [5]:
import os
import cv2
import csv
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector

def detect_scenes(video_path):
    video_manager = VideoManager([video_path])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=30.0))

    video_manager.set_downscale_factor()
    video_manager.start()

    scene_manager.detect_scenes(frame_source=video_manager)
    scene_list = scene_manager.get_scene_list()
    print(f"[INFO] Detected {len(scene_list)} scenes.")
    video_manager.release()
    return scene_list


def extract_frames_per_scene(video_path, scene_list, output_dir, fps=0.5, save_csv=True):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    cap = cv2.VideoCapture(video_path)
    original_fps = cap.get(cv2.CAP_PROP_FPS)

    metadata = []

    for i, (start_time, end_time) in enumerate(scene_list):
        scene_start_frame = int(start_time.get_seconds() * original_fps)
        scene_end_frame = int(end_time.get_seconds() * original_fps)
        step = int(original_fps / fps)  # ⬅ this is now large for low fps (e.g., 60 if 30fps input and 0.5 fps target)

        cap.set(cv2.CAP_PROP_POS_FRAMES, scene_start_frame)
        frame_id = scene_start_frame
        frame_count = 0

        while frame_id < scene_end_frame:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
            ret, frame = cap.read()
            if not ret:
                break

            timestamp_sec = frame_id / original_fps
            out_filename = f'scene_{i:03}_frame_{frame_count:04}.jpg'
            out_path = os.path.join(output_dir, out_filename)
            cv2.imwrite(out_path, frame)

            metadata.append({
                "scene": i,
                "frame_count": frame_count,
                "frame_id": frame_id,
                "timestamp_sec": round(timestamp_sec, 3),
                "file_name": out_filename
            })

            frame_id += step
            frame_count += 1

    cap.release()
    print(f"[INFO] Frames saved in {output_dir}")

    if save_csv and metadata:
        csv_path = os.path.join(output_dir, "frame_metadata.csv")
        with open(csv_path, mode='w', newline='') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=metadata[0].keys())
            writer.writeheader()
            writer.writerows(metadata)
        print(f"[INFO] Frame metadata saved to {csv_path}")



In [2]:
# ==== MAIN ====
video_path = "../notebook/input.mp4"
output_dir = "output_frames"

# 1. Detect scenes
scene_list = detect_scenes(video_path)

VideoManager is deprecated and will be removed.


[INFO] Detected 47 scenes.


In [6]:
# 2. Extract 2 frames per second from each scene
extract_frames_per_scene(video_path, scene_list, output_dir, fps=0.5)

[INFO] Frames saved in output_frames
[INFO] Frame metadata saved to output_frames\frame_metadata.csv


In [12]:
import clip
import torch
from PIL import Image
from tqdm import tqdm

device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the model (ViT-B/32 is fast, ViT-L/14 is more accurate)
model, preprocess = clip.load("ViT-B/32", device=device)


In [2]:
def get_image_embedding(image_path):
    image = preprocess(Image.open(image_path).convert("RGB")).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = model.encode_image(image)
    return image_features.cpu().numpy()[0]  # Shape: (512,)


In [3]:
def get_image_embeddings_batch(image_paths, batch_size=32):
    all_embeddings = []
    for i in tqdm(range(0, len(image_paths), batch_size), desc="Extracting CLIP embeddings"):
        batch_paths = image_paths[i:i+batch_size]
        batch_images = [preprocess(Image.open(p).convert("RGB")) for p in batch_paths]
        batch_tensor = torch.stack(batch_images).to(device)
        with torch.no_grad():
            batch_features = model.encode_image(batch_tensor).cpu().numpy()
        all_embeddings.extend(batch_features)
    return all_embeddings  # Shape: (N, 512)

def get_text_embedding(text_query):
    text = clip.tokenize([text_query]).to(device)
    with torch.no_grad():
        text_features = model.encode_text(text)
    return text_features.cpu().numpy()[0]


from sklearn.metrics.pairwise import cosine_similarity

def find_similar_images(text_query, image_embeddings, image_paths, top_k=5):
    text_emb = get_text_embedding(text_query)
    sims = cosine_similarity([text_emb], image_embeddings)[0]
    top_indices = sims.argsort()[-top_k:][::-1]
    return [(image_paths[i], sims[i]) for i in top_indices]


In [5]:
import os

image_folder = "../notebook/output_frames/"  # Replace with your folder
image_paths = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.lower().endswith(('png', 'jpg', 'jpeg'))]

image_embeddings = get_image_embeddings_batch(image_paths)

Extracting CLIP embeddings: 100%|██████████| 10/10 [00:14<00:00,  1.48s/it]


AttributeError: 'list' object has no attribute 'T'

In [23]:
query = "transformer architecture paper"
results = find_similar_images(query, image_embeddings, image_paths)

for path, score in results:
    print(f"{path} (score: {score:.4f})")

../notebook/output_frames/scene_015_frame_0002.jpg (score: 0.2983)
../notebook/output_frames/scene_015_frame_0001.jpg (score: 0.2969)
../notebook/output_frames/scene_003_frame_0001.jpg (score: 0.2929)
../notebook/output_frames/scene_003_frame_0000.jpg (score: 0.2904)
../notebook/output_frames/scene_027_frame_0003.jpg (score: 0.2716)


In [36]:
image_embeddings[0].shape[0], len(image_embeddings)

(512, 299)