In [None]:
"""
ClipInsight Pipeline
Input: video file path
Output: selected keyframes, per-frame captions and importance weights, transcript, and a text summary

Prereqs (install these in your environment):
    pip install open-clip-torch==2.0.0  # or `open-clip-torch`
    pip install transformers
    pip install accelerate
    pip install sentence-transformers
    pip install git+https://github.com/openai/whisper.git   # or the 'openai-whisper' package from PyPI
    pip install torchaudio
    pip install moviepy
    pip install scikit-learn
    pip install tqdm
    pip install opencv-python
"""

import os
import math
import json
import tempfile
from typing import List, Dict, Tuple, Optional

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import numpy as np
from PIL import Image
import cv2
from tqdm import tqdm

# optional model libs
try:
    import open_clip
except Exception as e:
    open_clip = None
try:
    import whisper
except Exception as e:
    whisper = None
from transformers import T5ForConditionalGeneration, T5TokenizerFast, pipeline
from sentence_transformers import SentenceTransformer, util
from sklearn.cluster import KMeans

# ---------------------------
# CONFIG - change as needed
# ---------------------------
# Global pipeline settings. The functions below read these via CONFIG[...]
# rather than taking them as arguments, so edit this dict before running.
CONFIG = {
    "VIDEO_PATH": "input.mp4",  # input video used by the __main__ entry point
    "WORKDIR": "clipinsight_output",  # directory that receives frames, transcript and JSON output
    "FRAME_EXTRACT_FPS": 3,       # frames sampled per second of video; a value >= the source fps keeps every frame
    "CLIP_MODEL": ("ViT-B-32", "openai"),   # open-clip model name + pretrained source
    "USE_WHISPER": True,  # when False, pipeline_run skips transcription and the transcript stays empty
    "WHISPER_MODEL": "base",      # tiny, base, small, medium, large
    # NOTE(review): LITE_MODE is not read by any code visible in this file.
    "LITE_MODE": True,            # if True uses smaller models for speed (recommended)
    "NUM_SCENES": 8,              # number of clusters/scenes for summarization
    "SUMMARY_MODEL": "t5-small",  # or "t5-base" / "facebook/bart-large-cnn"
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",  # chosen once, at import time
    "BATCH_SIZE": 32,             # embedding batch size
    "CACHE": True,                # cache embeddings / captions to disk
}

# Create the output directory as soon as the module is imported/executed.
os.makedirs(CONFIG["WORKDIR"], exist_ok=True)

# ---------------------------
# Utilities
# ---------------------------
def ffmpeg_extract_audio(video_path: str, out_audio: str):
    """Write the audio track of ``video_path`` to the file ``out_audio``.

    moviepy is imported lazily so the rest of the module can be used without it.

    Args:
        video_path: Path of the video to read.
        out_audio: Destination path for the extracted audio.

    Raises:
        ValueError: If the video has no audio track.
    """
    from moviepy.editor import VideoFileClip

    clip = VideoFileClip(video_path)
    try:
        if clip.audio is None:
            # Without this check the failure surfaces as an opaque AttributeError.
            raise ValueError(f"Video has no audio track: {video_path}")
        clip.audio.write_audiofile(out_audio, verbose=False, logger=None)
    finally:
        # Always release the reader, even if writing the audio fails.
        clip.close()

# ---------------------------
# 1) Frame extraction
#    extracts images at sample_fps (not necessarily original fps)
# ---------------------------
def extract_frames(video_path: str, out_dir: str, sample_fps: float = 3.0) -> List[str]:
    """Save every N-th frame of a video as a JPEG and return the saved paths.

    N is ``round(source_fps / sample_fps)`` (at least 1), so a ``sample_fps``
    at or above the source rate keeps every frame. Files are named
    ``frame_000000.jpg``, ``frame_000001.jpg``, ... inside ``out_dir``.

    Args:
        video_path: Path of the video to read.
        out_dir: Directory for the JPEG files; created if missing.
        sample_fps: Desired number of saved frames per second of video.

    Returns:
        Paths of the written frame images, in temporal order.

    Raises:
        ValueError: If ``sample_fps`` is not a positive number.
        IOError: If the video cannot be opened or a frame cannot be written.
    """
    # Written as `not (> 0)` so that NaN is rejected as well.
    if not sample_fps > 0:
        raise ValueError(f"sample_fps must be positive, got {sample_fps!r}")

    os.makedirs(out_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    saved_paths: List[str] = []
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video: {video_path}")

        orig_fps = cap.get(cv2.CAP_PROP_FPS)
        if not orig_fps or not math.isfinite(orig_fps):
            # Fall back to 30 fps when the container reports no usable rate.
            orig_fps = 30.0
        interval = max(1, int(round(orig_fps / sample_fps)))

        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if idx % interval == 0:
                path = os.path.join(out_dir, f"frame_{len(saved_paths):06d}.jpg")
                # imwrite reports failure through its return value; do not
                # record a path for a file that was never written.
                if not cv2.imwrite(path, frame):
                    raise IOError(f"Failed to write frame image: {path}")
                saved_paths.append(path)
            idx += 1
    finally:
        cap.release()
    return saved_paths

# ---------------------------
# 2) Speech-to-text (Whisper wrapper)
# ---------------------------
def transcribe_audio_with_whisper(video_path: str, out_txt: str, model_name: str = "base"):
    """Transcribe the audio track of a video with Whisper and save the text.

    The audio is extracted into a private temporary directory that is removed
    afterwards, so concurrent runs do not overwrite each other's audio file.

    Args:
        video_path: Path of the video whose audio is transcribed.
        out_txt: Path of the UTF-8 text file that receives the transcript.
        model_name: Whisper model name passed to ``whisper.load_model``.

    Returns:
        The transcript text (empty string if the result has no ``"text"`` key).

    Raises:
        RuntimeError: If the ``whisper`` package could not be imported.
    """
    if whisper is None:
        raise RuntimeError("whisper is not installed. pip install git+https://github.com/openai/whisper.git")

    device = CONFIG["DEVICE"]
    with tempfile.TemporaryDirectory(prefix="clipinsight_") as tmp_dir:
        audio_tmp = os.path.join(tmp_dir, "clipinsight_audio.wav")
        ffmpeg_extract_audio(video_path, audio_tmp)
        model = whisper.load_model(model_name, device=device)
        # Half precision is only requested when running on CUDA.
        result = model.transcribe(audio_tmp, fp16=(device == "cuda"))

    transcript = result.get("text", "")
    with open(out_txt, "w", encoding="utf-8") as f:
        f.write(transcript)
    return transcript

# ---------------------------
# 3) Frame embedding using CLIP (open-clip)
# ---------------------------
def load_openclip(model_name: str = "ViT-B-32", pretrained: str = "openai"):
    """Build an open-clip model in eval mode on the configured device.

    Args:
        model_name: Architecture name passed to open-clip.
        pretrained: Pretrained-weights tag passed to open-clip.

    Returns:
        ``(model, preprocess)`` where ``preprocess`` is the third element of
        the tuple returned by ``open_clip.create_model_and_transforms``.

    Raises:
        RuntimeError: If ``open_clip`` could not be imported.
    """
    if open_clip is None:
        raise RuntimeError("open_clip not installed. pip install open-clip-torch")

    created = open_clip.create_model_and_transforms(model_name, pretrained=pretrained)
    clip_model, _unused_transform, image_transform = created
    clip_model.eval()
    clip_model = clip_model.to(CONFIG["DEVICE"])
    return clip_model, image_transform

def batch_embed_images_openclip(model, preprocess, image_paths: List[str], batch_size: int = 32):
    """Embed images with an open-clip model and return L2-normalised vectors.

    Images are opened and preprocessed one batch at a time, so memory use is
    bounded by ``batch_size`` rather than by the number of frames.

    Args:
        model: Object exposing ``encode_image(batch)``.
        preprocess: Callable turning a PIL image into a tensor.
        image_paths: Paths of the images to embed.
        batch_size: Number of images encoded per forward pass.

    Returns:
        A NumPy array with one row per image. For an empty ``image_paths`` a
        ``(0, 512)`` float32 array is returned.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

    all_embs = []
    with torch.no_grad():
        for start in range(0, len(image_paths), batch_size):
            tensors = []
            for p in image_paths[start:start + batch_size]:
                # The context manager closes the file handle right after decoding.
                with Image.open(p) as img:
                    tensors.append(preprocess(img.convert("RGB")))
            batch = torch.stack(tensors).to(CONFIG["DEVICE"])
            emb = model.encode_image(batch)
            emb = emb / emb.norm(dim=-1, keepdim=True)
            all_embs.append(emb.cpu().numpy())

    if not all_embs:
        # 512 matches the default ViT-B-32 configuration used by this file.
        return np.zeros((0, 512), dtype=np.float32)
    return np.vstack(all_embs)

# Alternative: SentenceTransformer image model (fallback)
def batch_embed_images_sbert(model_name: str, image_paths: List[str], batch_size: int=32):
    """Embed images with an image-capable SentenceTransformer model.

    The images are opened and passed to ``encode`` as PIL images. Passing the
    path strings directly would embed the *text* of each path instead of the
    picture. Images are loaded one batch at a time to bound memory use.

    Args:
        model_name: SentenceTransformer model name; it must accept images
            (for example a CLIP checkpoint such as ``"clip-ViT-B-32"``).
        image_paths: Paths of the images to embed.
        batch_size: Number of images opened and encoded per step.

    Returns:
        The embeddings as returned by ``SentenceTransformer.encode`` with
        ``convert_to_tensor=False``, stacked row-wise in input order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")

    s = SentenceTransformer(model_name, device=CONFIG["DEVICE"])
    if not image_paths:
        # Defer to the library for the shape of an empty result.
        return s.encode([], batch_size=batch_size, convert_to_tensor=False, show_progress_bar=False)

    chunks = []
    for start in tqdm(range(0, len(image_paths), batch_size), desc="Embedding images"):
        images = []
        for p in image_paths[start:start + batch_size]:
            with Image.open(p) as img:
                images.append(img.convert("RGB"))
        chunks.append(
            s.encode(images, batch_size=batch_size, convert_to_tensor=False, show_progress_bar=False)
        )
    return np.vstack(chunks)

# ---------------------------
# 4) Caption generation (BLIP)
# ---------------------------
def generate_blip_captions(frames: List[str], cache_path: Optional[str] = None) -> Dict[str, str]:
    """Caption each frame image with the BLIP base captioning checkpoint.

    Args:
        frames: Paths of the frame images to caption.
        cache_path: If given (and non-empty), the captions are also written to
            this path as UTF-8 JSON.

    Returns:
        A dict mapping each frame's base file name to its generated caption.
    """
    from transformers import BlipProcessor, BlipForConditionalGeneration

    checkpoint = "Salesforce/blip-image-captioning-base"
    target = CONFIG["DEVICE"]
    blip_processor = BlipProcessor.from_pretrained(checkpoint)
    blip_model = BlipForConditionalGeneration.from_pretrained(checkpoint).to(target)

    def describe(frame_path: str) -> str:
        # One image per forward pass; generation is capped at 30 new tokens.
        rgb = Image.open(frame_path).convert("RGB")
        encoded = blip_processor(rgb, return_tensors="pt").to(target)
        token_ids = blip_model.generate(**encoded, max_new_tokens=30)
        return blip_processor.decode(token_ids[0], skip_special_tokens=True)

    captions = {}
    for frame_path in tqdm(frames, desc="Captioning frames"):
        captions[os.path.basename(frame_path)] = describe(frame_path)

    if cache_path:
        with open(cache_path, "w", encoding="utf-8") as handle:
            json.dump(captions, handle, indent=2, ensure_ascii=False)
    return captions

# ---------------------------
# 5) Temporal model (lightweight BiLSTM baseline)
#    This is optional for supervised scoring; here we implement a small BiLSTM + MLP
# ---------------------------
class TemporalScorer(nn.Module):
    """Bidirectional LSTM followed by an MLP that emits one score per time step.

    The MLP ends in a sigmoid, so every score lies in (0, 1).

    Args:
        input_dim: Size of each input feature vector.
        hidden: LSTM hidden size per direction (also the MLP's inner width).
        n_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_dim: int = 512, hidden: int = 256, n_layers:int=1):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden,
            num_layers=n_layers,
            batch_first=True,
            bidirectional=True,
        )
        # The LSTM output concatenates both directions, hence 2 * hidden inputs.
        head_layers = [
            nn.Linear(2 * hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
            nn.Sigmoid(),
        ]
        self.mlp = nn.Sequential(*head_layers)

    def forward(self, x):
        """Score a batch of sequences.

        Args:
            x: Tensor of shape (B, T, D).

        Returns:
            Tensor of shape (B, T) with one score per time step.
        """
        sequence_features = self.lstm(x)[0]
        per_step = self.mlp(sequence_features)
        return per_step.squeeze(-1)

# ---------------------------
# 6) Multimodal fusion (joint self-attention over image + text tokens)
#    Project image and text embeddings to the same dimension, concatenate them
#    into one sequence, then run a small Transformer encoder to mix them.
# ---------------------------
class FusionModule(nn.Module):
    """Fuse image and text embeddings into a single vector.

    Both modalities are linearly projected to ``hidden`` dimensions,
    concatenated along the sequence axis, passed through a Transformer
    encoder, and mean-pooled over the sequence.

    Args:
        img_dim: Size of each image embedding.
        txt_dim: Size of each text embedding.
        hidden: Shared model dimension (must be divisible by ``nhead``).
        num_layers: Number of Transformer encoder layers.
        nhead: Number of attention heads per layer.
    """

    def __init__(self, img_dim=512, txt_dim=384, hidden=512, num_layers=2, nhead=8):
        super().__init__()
        self.img_proj = nn.Linear(img_dim, hidden)
        self.txt_proj = nn.Linear(txt_dim, hidden)
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden, nhead=nhead, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # Not used by forward(); kept so code that accesses `.pool` keeps working.
        self.pool = nn.AdaptiveAvgPool1d(1)

    def forward(self, img_embs: torch.Tensor, txt_embs: torch.Tensor):
        """Fuse one sample or a batch of samples.

        Args:
            img_embs: ``(T_img, img_dim)`` or batched ``(B, T_img, img_dim)``.
            txt_embs: ``(T_txt, txt_dim)`` or batched ``(B, T_txt, txt_dim)``;
                must have the same rank as ``img_embs``.

        Returns:
            ``(hidden,)`` for unbatched input, ``(B, hidden)`` for batched input.

        Raises:
            ValueError: If the inputs are not both 2-D or both 3-D.
        """
        if img_embs.dim() != txt_embs.dim() or img_embs.dim() not in (2, 3):
            raise ValueError(
                "img_embs and txt_embs must both be 2-D (T, D) or both be 3-D (B, T, D); "
                f"got {tuple(img_embs.shape)} and {tuple(txt_embs.shape)}"
            )

        img = self.img_proj(img_embs)        # (..., T_img, hidden)
        txt = self.txt_proj(txt_embs)        # (..., T_txt, hidden)

        unbatched = img.dim() == 2
        if unbatched:
            # Add a batch axis of size 1 for the batch_first Transformer.
            img = img.unsqueeze(0)
            txt = txt.unsqueeze(0)

        tokens = torch.cat([img, txt], dim=1)   # (B, T_img + T_txt, hidden)
        fused = self.transformer(tokens)        # (B, T, hidden)
        pooled = fused.mean(dim=1)              # (B, hidden)
        return pooled.squeeze(0) if unbatched else pooled

# ---------------------------
# 7) Summary generation (T5)
# ---------------------------
def generate_summary_from_texts(texts: List[str], model_name: str = "t5-small", max_len=120):
    """Summarise a list of texts with a Hugging Face seq2seq model.

    The texts are joined with spaces and summarised in a single pass using
    beam search. The model is loaded through the ``Auto*`` classes so that
    non-T5 checkpoints (e.g. ``facebook/bart-large-cnn``) load as well; the
    ``"summarize: "`` task prefix is only added for T5-family models.

    Args:
        texts: Text snippets to summarise (weight/cluster them before calling).
        model_name: Hugging Face model name or path of a seq2seq checkpoint.
        max_len: Maximum length of the generated summary, in tokens.

    Returns:
        The decoded summary, or ``""`` when ``texts`` holds no non-blank text
        (in which case no model is loaded).
    """
    joined = " ".join(texts)
    if not joined.strip():
        # Nothing to summarise; avoid loading a model to summarise a bare prompt.
        return ""

    # Imported here so the generic classes are in scope without touching the
    # module-level imports.
    from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

    device = CONFIG["DEVICE"]
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(device)

    # T5-family checkpoints (t5, mt5, longt5, ...) expect a task prefix.
    is_t5_family = "t5" in getattr(model.config, "model_type", "")
    prompt = "summarize: " + joined if is_t5_family else joined

    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    out = model.generate(**inputs, max_length=max_len, num_beams=4, early_stopping=True)
    return tokenizer.decode(out[0], skip_special_tokens=True)

# ---------------------------
# 8) High-level pipeline orchestration
# ---------------------------
def _l2_normalize_rows(mat: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Scale each row of ``mat`` to unit L2 norm; all-zero rows stay zero."""
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    return mat / np.maximum(norms, eps)


def _load_or_generate_captions(frames: List[str], cache_path: str, use_cache: bool) -> Dict[str, str]:
    """Return captions for ``frames``, reusing the cache only if it covers every frame."""
    names = [os.path.basename(p) for p in frames]
    if use_cache and os.path.exists(cache_path):
        try:
            with open(cache_path, encoding="utf-8") as f:
                cached = json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt cache: fall through and regenerate.
            cached = None
        if isinstance(cached, dict) and all(name in cached for name in names):
            # Drop entries left over from earlier runs with more frames.
            return {name: cached[name] for name in names}
    return generate_blip_captions(frames, cache_path=cache_path if use_cache else None)


def pipeline_run(video_path: str, workdir: str):
    """Run the full ClipInsight pipeline on one video.

    Steps: sample frames, transcribe audio (optional, best effort), embed
    frames with CLIP, caption frames with BLIP, embed captions, score each
    frame by how dissimilar it is from the others, cluster frames into scenes
    and pick one keyframe per cluster, then summarise the captions with the
    configured summary model. The transcript is stored in the output but is
    not fed into the summary.

    Settings are read from the module-level ``CONFIG``. When
    ``CONFIG["CACHE"]`` is true, image embeddings are saved and captions are
    cached to / reused from ``workdir``.

    Args:
        video_path: Path of the input video.
        workdir: Output directory; created if missing.

    Returns:
        A dict with keys ``frames``, ``selected_keyframes``, ``captions``,
        ``transcript``, ``summary`` and ``weights``. The same dict is written
        to ``<workdir>/clipinsight_output.json``.

    Raises:
        RuntimeError: If no frames could be extracted or open_clip is missing.
    """
    os.makedirs(workdir, exist_ok=True)
    frames_dir = os.path.join(workdir, "frames")
    os.makedirs(frames_dir, exist_ok=True)

    # 1) extract frames (sampled)
    print("1) extracting frames ...")
    frames = extract_frames(video_path, frames_dir, sample_fps=CONFIG["FRAME_EXTRACT_FPS"])
    print(f"   frames saved: {len(frames)}")
    if not frames:
        # Every later step needs at least one frame; fail with a clear message.
        raise RuntimeError(f"No frames could be extracted from video: {video_path}")

    # 2) transcribe audio (best effort: a failure leaves the transcript empty)
    transcript_path = os.path.join(workdir, "transcript.txt")
    transcript = ""
    if CONFIG["USE_WHISPER"]:
        print("2) transcribing audio with Whisper ...")
        try:
            transcript = transcribe_audio_with_whisper(video_path, transcript_path, model_name=CONFIG["WHISPER_MODEL"])
        except Exception as e:
            print("Whisper failed:", e)
    else:
        print("2) skipping transcript")

    # 3) compute image embeddings (CLIP)
    print("3) computing CLIP embeddings ...")
    clip_name, clip_pretrained = CONFIG["CLIP_MODEL"]
    if open_clip is None:
        raise RuntimeError("open_clip not installed. Install open-clip-torch")
    clip_model, preprocess = load_openclip(model_name=clip_name, pretrained=clip_pretrained)
    img_embeddings = batch_embed_images_openclip(clip_model, preprocess, frames, batch_size=CONFIG["BATCH_SIZE"])
    if CONFIG["CACHE"]:
        np.save(os.path.join(workdir, "img_embeddings.npy"), img_embeddings)

    # 4) caption every sampled frame (BLIP)
    print("4) generating captions (BLIP) ...")
    captions_cache = os.path.join(workdir, "captions.json")
    captions = _load_or_generate_captions(frames, captions_cache, use_cache=CONFIG["CACHE"])

    # 5) compute caption/text embeddings (SentenceTransformer)
    print("5) computing text embeddings for captions ...")
    txt_model_name = "all-MiniLM-L6-v2"  # small and fast; swap for larger if needed
    txt_encoder = SentenceTransformer(txt_model_name, device=CONFIG["DEVICE"])
    frame_names = [os.path.basename(p) for p in frames]
    caption_texts = [captions[name] for name in frame_names]
    txt_embeddings = np.array(txt_encoder.encode(caption_texts, batch_size=CONFIG["BATCH_SIZE"], show_progress_bar=True))

    # 6) compute weights (multimodal uniqueness)
    print("6) computing importance weights ...")
    # Concatenate the unit-normalised image and text embeddings, then score a
    # frame as 1 - (mean cosine similarity to all frames).
    combined = np.concatenate(
        [_l2_normalize_rows(img_embeddings), _l2_normalize_rows(txt_embeddings)], axis=1
    )
    combined_t = torch.tensor(combined)
    sim = util.cos_sim(combined_t, combined_t).cpu().numpy()
    weights = 1 - sim.mean(axis=1)   # uniqueness score

    # 7) cluster scenes and pick the most unique frame of each cluster
    print("7) clustering scenes and selecting keyframes ...")
    n_clusters = min(CONFIG["NUM_SCENES"], len(frames))
    kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(combined)
    selected_frames = []
    for cidx in range(n_clusters):
        members = np.where(kmeans.labels_ == cidx)[0]
        if len(members) == 0:
            continue
        best = members[np.argmax(weights[members])]
        selected_frames.append(frames[best])

    # 8) create weighted caption list so summary emphasizes important frames
    print("8) building weighted caption list ...")
    weights_by_name = {name: float(w) for name, w in zip(frame_names, weights)}
    max_w = float(np.max(weights))
    weighted_caps = []
    for name in frame_names:
        # Repeat a caption 1..4 times in proportion to its relative weight
        # (tune the factor 4 as needed; the clip allows up to 6).
        repeats = int(np.clip((weights_by_name[name] / (max_w + 1e-9)) * 4, 1, 6))
        weighted_caps.extend([captions[name]] * repeats)

    # 9) generate text summary
    print("9) generating summary with T5 ...")
    summary = generate_summary_from_texts(weighted_caps, model_name=CONFIG["SUMMARY_MODEL"], max_len=120)

    # 10) produce final outputs
    outputs = {
        "frames": frames,
        "selected_keyframes": [os.path.basename(p) for p in selected_frames],
        "captions": captions,
        "transcript": transcript,
        "summary": summary,
        "weights": weights_by_name,
    }
    with open(os.path.join(workdir, "clipinsight_output.json"), "w", encoding="utf-8") as f:
        json.dump(outputs, f, indent=2, ensure_ascii=False)

    print("\n✅ Pipeline finished. Results written to:", workdir)
    print("Summary:\n", summary)
    return outputs

# ---------------------------
# If run as script
# ---------------------------
if __name__ == "__main__":
    # Run the whole pipeline on the configured video. The returned results
    # dict is bound to `out` at module level.
    out = pipeline_run(CONFIG["VIDEO_PATH"], CONFIG["WORKDIR"])


In [1]:

import os
import math
import json
import tempfile
from typing import List, Dict, Tuple, Optional

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import numpy as np
from PIL import Image
import cv2
from tqdm import tqdm