### Shot-Detection

In [None]:
# NOTE: disabled earlier draft of detect_scenes, kept for reference only.
# It samples just the midpoint frame of each scene; the active definition
# further down in this notebook samples several frames per scene instead.
# def detect_scenes(video_path, scene_list):
#     cap = cv2.VideoCapture(video_path)   
#     embeddings = []
#     metadatas = []
#     ids = []
#     with tqdm(total = len(scene_list), desc = "Processing frames") as pbar:
#         for i, scene in enumerate(scene_list):
#             start_time, end_time = scene[0].get_seconds(), scene[1].get_seconds()
#             mid_time = (start_time + end_time) / 2
#             timestamps = mid_time
#             labels = "middle"
            
            
#             cap.set(cv2.CAP_PROP_POS_MSEC, timestamps * 1000)
#             ret, frame = cap.read()
            
#             if ret:
#                 img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#                 image = Image.fromarray(img_rgb)
#                 inputs = processor(images = image, return_tensors = 'pt').to(device)
#                 blip_input = blip_processor(images = image, return_tensors = 'pt').to(device)
#                 with torch.no_grad():
#                     outputs = model.get_image_features(inputs.pixel_values)
#                     blip_outputs = blip_model.generate(**blip_input,
#                                                     # max_length = 60,
#                                                     # min_length = 20,
#                                                     # no_repeat_ngram_size=2,
#                                                     # num_beams = 5,
#                                                     )
                
#                 caption = blip_processor.decode(blip_outputs[0], skip_special_tokens=True)
#                 image_embedding = outputs.pooler_output
#                 image_embedding = image_embedding / image_embedding.norm(dim = -1, keepdim= True)
#                 image_embedding = image_embedding.squeeze(0).cpu().numpy().tolist()
#                 timestamp_sec = timestamps*1000
#                 frame_id = f"{video_path}:{timestamp_sec}"
            
#                 ids.append(frame_id)
#                 embeddings.append(image_embedding)

#                 metadatas.append({
#                     "frame_idx": f"frame_no_{i}_{labels}",
#                     "caption": caption,
#                     "timestamp_ms": timestamp_sec,
#                     "source_path": video_path
#                 })
#             pbar.update(1)
                
#     return embeddings, metadatas, ids

In [1]:
from scenedetect import open_video, SceneManager
import scenedetect
from scenedetect.detectors import ContentDetector
import os
from PIL import Image
from transformers import CLIPProcessor, CLIPModel, BlipProcessor, BlipForConditionalGeneration
import torch
from tqdm import tqdm
import cv2
# Use the GPU whenever torch can see one; every model and input tensor below
# is moved to this device.
device = "cuda" if torch.cuda.is_available() else "cpu"

# CLIP checkpoint: produces the image features used as frame embeddings.
CLIP_CHECKPOINT = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(CLIP_CHECKPOINT, use_safetensors=True)
model = model.to(device)
processor = CLIPProcessor.from_pretrained(CLIP_CHECKPOINT)

# BLIP checkpoint: generates the text caption stored with each frame.
BLIP_CHECKPOINT = "Salesforce/blip-image-captioning-large"
blip_processor = BlipProcessor.from_pretrained(BLIP_CHECKPOINT)
blip_model = BlipForConditionalGeneration.from_pretrained(BLIP_CHECKPOINT)
blip_model = blip_model.to(device)

  from .autonotebook import tqdm as notebook_tqdm
Loading weights: 100%|██████████| 398/398 [00:01<00:00, 394.86it/s, Materializing param=visual_projection.weight]                                
[1mCLIPModel LOAD REPORT[0m from: openai/clip-vit-base-patch32
Key                                  | Status     |  | 
-------------------------------------+------------+--+-
vision_model.embeddings.position_ids | UNEXPECTED |  | 
text_model.embeddings.position_ids   | UNEXPECTED |  | 

[3mNotes:
- UNEXPECTED[3m	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.[0m
The image processor of type `CLIPImageProcessor` is now loaded as a fast processor by default, even if the model checkpoint was saved with a slow processor. This is a breaking change and may produce slightly different outputs. To continue using the slow processor, instantiate this class with `use_fast=False`. 
The image processor of type `BlipImageProcessor` is now loaded as a fa

In [2]:
def scene_split(video_path):
    """Split a video into shots with PySceneDetect's ContentDetector.

    Args:
        video_path: Path of the video file handed to ``open_video``.

    Returns:
        The scene list reported by the ``SceneManager``. If detection raises,
        the error is printed and an empty list is returned instead.
        Failures while opening the video are not caught here.
    """
    print("--- Detecting shot boundaries with PySceneDetect ---")
    source = open_video(video_path)

    manager = SceneManager()
    manager.add_detector(ContentDetector())

    try:
        manager.detect_scenes(source, show_progress=False)
        return manager.get_scene_list()
    except Exception as err:
        # Best effort: report the problem and let the caller see "no scenes".
        print("Scene detection failed:", err)
        return []

In [3]:
def _embed_and_caption(frame):
    """Return ``(embedding, caption)`` for a single frame read by OpenCV.

    Args:
        frame: Image array as returned by ``cv2.VideoCapture.read``.

    Returns:
        A tuple of the L2-normalised CLIP image feature as a plain Python
        list and the caption string decoded from BLIP's generated tokens.
    """
    # Convert the OpenCV frame from BGR to RGB before wrapping it in a PIL image.
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    clip_inputs = processor(images=image, return_tensors="pt").to(device)
    blip_inputs = blip_processor(images=image, return_tensors="pt").to(device)

    with torch.no_grad():
        clip_outputs = model.get_image_features(clip_inputs.pixel_values)
        # Captioning runs with the model's default generation settings.
        caption_tokens = blip_model.generate(**blip_inputs)

    # NOTE(review): depending on the installed transformers release,
    # get_image_features appears to return either the feature tensor itself
    # or an output object exposing it as `pooler_output`; accept both.
    features = getattr(clip_outputs, "pooler_output", clip_outputs)
    features = features / features.norm(dim=-1, keepdim=True)
    embedding = features.squeeze(0).cpu().numpy().tolist()

    caption = blip_processor.decode(caption_tokens[0], skip_special_tokens=True)
    return embedding, caption


def detect_scenes(video_path, scene_list):
    """Embed and caption three representative frames of every scene.

    For each scene the first, middle and last frame are read from the video,
    embedded with CLIP and captioned with BLIP.

    Args:
        video_path: Path of the video file; also used as the id prefix and
            stored as ``source_path`` in the metadata.
        scene_list: Sequence of ``(start, end)`` pairs whose items expose
            ``get_seconds()`` (as returned by ``scene_split``).

    Returns:
        ``(embeddings, metadatas, ids)``: three parallel lists with one entry
        per frame that could be read. Ids have the form
        ``"<video_path>:<timestamp in ms>"`` and are unique within the result.
    """
    embeddings, metadatas, ids = [], [], []
    # Ids are later used as keys when the frames are added to the collection,
    # so the same timestamp must never be emitted twice.
    seen_ids = set()

    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Length of one frame in seconds; 0.0 when the container reports no
        # usable frame rate, which falls back to sampling at the end time.
        frame_duration = 1.0 / fps if fps > 0 else 0.0

        with tqdm(total=len(scene_list), desc="Processing frames") as pbar:
            for i, scene in enumerate(scene_list):
                start_time, end_time = scene[0].get_seconds(), scene[1].get_seconds()
                mid_time = (start_time + end_time) / 2
                # In a PySceneDetect scene list the end timecode of a scene is
                # the start of the next one. Sampling exactly there would grab
                # the next shot's first frame (or nothing at the end of the
                # video), so step back one frame to stay inside this scene.
                final_time = max(start_time, end_time - frame_duration)

                samples = (
                    ("initial", start_time),
                    ("middle", mid_time),
                    ("final", final_time),
                )
                for label, t in samples:
                    timestamp_ms = t * 1000
                    frame_id = f"{video_path}:{timestamp_ms}"
                    if frame_id in seen_ids:
                        # E.g. a one-frame scene where "initial" == "final".
                        continue

                    cap.set(cv2.CAP_PROP_POS_MSEC, timestamp_ms)
                    ret, frame = cap.read()
                    if not ret:
                        # Unreadable position: skip this sample, keep going.
                        continue

                    image_embedding, caption = _embed_and_caption(frame)

                    seen_ids.add(frame_id)
                    ids.append(frame_id)
                    embeddings.append(image_embedding)
                    metadatas.append({
                        "frame_idx": f"frame_no_{i}_{label}",
                        "caption": caption,
                        "timestamp_ms": timestamp_ms,
                        "source_path": video_path
                    })
                pbar.update(1)
    finally:
        # Always free the decoder handle, even if a model call raises.
        cap.release()

    return embeddings, metadatas, ids

In [4]:
# Detect shot boundaries once; the resulting scene list is the input of detect_scenes.
scene_list = scene_split("../This Integral Breaks Math.mp4")

--- Detecting shot boundaries with PySceneDetect ---


In [5]:
# Embed and caption the sampled frames of every detected scene.
# This is the slow step: CLIP and BLIP run once per sampled frame.
emb, met, ids = detect_scenes(video_path="../This Integral Breaks Math.mp4", scene_list= scene_list)

Processing frames:   0%|          | 0/49 [00:00<?, ?it/s]

Processing frames: 100%|██████████| 49/49 [02:47<00:00,  3.42s/it]


In [7]:
import chromadb
# Open the on-disk Chroma store at ../db_path and fetch the frame collection,
# creating it on first use.
client = chromadb.PersistentClient(path = "../db_path")
collection = client.get_or_create_collection("frame_collection")

In [8]:
# Store every sampled frame; ids, emb and met are the parallel lists
# returned by detect_scenes.
collection.add(ids=ids, embeddings=emb, metadatas=met)

In [13]:
# Peek at the first detected scene, a (start, end) pair of timecodes.
# `w` was never defined in this notebook, so the cell failed on a fresh kernel.
scene_list[0]

(00:00:00.000 [frame=0, fps=30.000], 00:00:01.233 [frame=37, fps=30.000])