# LanceDB + Twelve Labs Video AI Tutorial

In this notebook, we will build a semantic video search and summarization system capable of understanding video content and responding to natural language queries.
We'll integrate two technologies:

1. Twelve Labs: Marengo for multimodal video embeddings and Pegasus for video understanding
2. LanceDB: Open-source multimodal lakehouse for managing AI data at scale

By the end of this tutorial, you'll have a working system that can:
1. Load videos from a Hugging Face dataset
2. Generate semantic embeddings with Marengo
3. Store videos and embeddings in LanceDB
4. Search videos using natural language queries (text embeddings)
5. Generate intelligent summaries with Pegasus

**Prerequisites**: Twelve Labs API key from [twelvelabs.io](https://twelvelabs.io)

## Install Dependencies

First, install the required packages.

In [None]:
# Install all required packages
!uv pip install -q geneva datasets==3.5.0 twelvelabs "ray[default]" kubernetes

## Import Libraries

Import all necessary libraries for video processing and database operations.

In [None]:
# Standard library imports
import os
import time
import tempfile
import shutil
from typing import Callable

# Data processing and scientific computing
import numpy as np
import pandas as pd
import pyarrow as pa

# Database and ML frameworks
import geneva
from geneva import udf
import lancedb
from twelvelabs import TwelveLabs

# Dataset handling
from datasets import load_dataset

# Environment-specific imports (optional)
try:
    from google.colab import userdata
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

## Configuration

Set up the database path and processing parameters.

In [None]:
# Configuration
GENEVA_DB_PATH = "/content/quickstart/"  # Colab-style path; change it when running locally
NUM_VIDEOS = 10  # number of videos to stream from the dataset

## API Authentication

Configure Twelve Labs API credentials.

In [None]:
# API Key Setup
if IN_COLAB:
    # Google Colab environment
    os.environ['TWELVE_LABS_API_KEY'] = userdata.get('TWELVE_LABS_API_KEY')
else:
    # Local environment - ensure TWELVE_LABS_API_KEY is set in your environment
    # export TWELVE_LABS_API_KEY='your-api-key'
    if 'TWELVE_LABS_API_KEY' not in os.environ:
        raise ValueError("Please set TWELVE_LABS_API_KEY environment variable")
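
The key lookup above can also be wrapped in a small function so that a blank value is rejected as well as a missing one. This is a minimal sketch, not part of the Twelve Labs SDK: `get_api_key` and its `env` parameter are hypothetical names introduced here so the logic can be exercised without touching the real environment.

```python
import os
from typing import Mapping, Optional


def get_api_key(env: Optional[Mapping[str, str]] = None,
                name: str = "TWELVE_LABS_API_KEY") -> str:
    """Return the API key from `env`, rejecting missing or blank values."""
    # Fall back to the process environment only when no mapping is passed.
    source = os.environ if env is None else env
    value = source.get(name, "").strip()
    if not value:
        raise ValueError(f"Please set the {name} environment variable")
    return value
```

Calling `get_api_key()` with no arguments reads `os.environ`; passing a plain dictionary makes the check easy to test.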

## Load Video Dataset

Stream the first `NUM_VIDEOS` videos from the Hugging Face FineVideo dataset and build a caption for each one from its metadata.

In [None]:
# Start from a clean database directory, then load videos from the Hugging Face FineVideo dataset
shutil.rmtree(GENEVA_DB_PATH, ignore_errors=True)

def load_videos():
    dataset = load_dataset("HuggingFaceFV/finevideo", split="train", streaming=True)
    batch = []
    processed = 0

    for row in dataset:
        if processed >= NUM_VIDEOS:
            break

        video_bytes = row['mp4']
        json_metadata = row['json']

        # Create rich caption from metadata
        caption_parts = []
        if json_metadata.get("youtube_title"):
            caption_parts.append(json_metadata["youtube_title"])
        if json_metadata.get("youtube_description"):
            desc = json_metadata["youtube_description"][:200]
            caption_parts.append(desc + "..." if len(json_metadata["youtube_description"]) > 200 else desc)

        batch.append({
            "video": video_bytes,
            "caption": " | ".join(filter(None, caption_parts)) or "No description",
            "youtube_title": json_metadata.get("youtube_title", ""),
            "video_id": f"video_{processed}",
            "duration": json_metadata.get("duration_seconds", 0),
            "resolution": json_metadata.get("resolution", "")
        })
        processed += 1

    return pa.RecordBatch.from_pylist(batch)

db = geneva.connect(GENEVA_DB_PATH)

In [None]:
# Create table with video data
tbl = db.create_table("videos", load_videos(), mode="overwrite")

In [None]:
# Display the video data table
tbl.to_pandas().head()
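
The caption logic inside `load_videos` can be read in isolation as the sketch below. `build_caption` and `max_desc_len` are hypothetical names introduced for illustration; the field names `youtube_title` and `youtube_description` are the ones used in the cell above.

```python
from typing import Any, Mapping


def build_caption(metadata: Mapping[str, Any], max_desc_len: int = 200) -> str:
    """Join the title and a truncated description with ' | '."""
    parts = []
    title = metadata.get("youtube_title")
    if title:
        parts.append(title)
    description = metadata.get("youtube_description")
    if description:
        short = description[:max_desc_len]
        # Mark the cut only when something was actually removed.
        if len(description) > max_desc_len:
            short += "..."
        parts.append(short)
    # Fall back to a placeholder when both fields are missing or empty.
    return " | ".join(parts) or "No description"


print(build_caption({"youtube_title": "Knots 101",
                     "youtube_description": "How to tie a bowline."}))
# prints: Knots 101 | How to tie a bowline.
```

Keeping the caption builder as a pure function makes the truncation rule easy to check before streaming any video data.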

## Initialize Twelve Labs Client

Initialize the Twelve Labs client for API access.

In [None]:
# Initialize Twelve Labs client
client = TwelveLabs(api_key=os.environ['TWELVE_LABS_API_KEY'])

## Define Video Embedding UDF

Create a User-Defined Function (UDF) to generate video embeddings using the Marengo model.

In [None]:
# Create video embeddings UDF using Twelve Labs Marengo model
@udf(data_type=pa.list_(pa.float32(), 1024))
class GenVideoEmbeddings(Callable):
    def __init__(self, twelve_labs_api_key):
        self.client = None
        self.is_loaded = False
        self.twelve_labs_api_key = twelve_labs_api_key

    def setup(self):
        self.client = TwelveLabs(api_key=self.twelve_labs_api_key)
        self.is_loaded = True

    def __call__(self, video: pa.Array) -> pa.Array:
        if not self.is_loaded:
            self.setup()

        embeddings = []

        for i, v in enumerate(video):
            video_bytes = v.as_buffer().to_pybytes()

            # Save video to temporary file
            with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
                tmp.write(video_bytes)
                video_path = tmp.name

            try:
                # Create embedding task - API requires 'clip' to be included
                with open(video_path, 'rb') as video_file:
                    task = self.client.embed.tasks.create(
                        model_name="Marengo-retrieval-2.7",
                        video_file=video_file,
                        video_embedding_scope=["clip", "video"]
                    )

                # Wait for completion and retrieve embedding
                status = self.client.embed.tasks.wait_for_done(task.id)
                result = self.client.embed.tasks.retrieve(task.id)

                video_segments = [seg for seg in result.video_embedding.segments
                                if seg.embedding_scope == "video"]

                if not video_segments:
                    print(f"No video embedding segments found for video {i}")
                    embeddings.append(pa.array(np.zeros(1024, dtype=np.float32)))
                    continue

                # Extract embedding using .float_ (confirmed working)
                embedding_data = video_segments[0].float_
                embedding_array = np.array(embedding_data, dtype=np.float32)

                if np.allclose(embedding_array, 0):
                    print(f"Warning: Got zero embedding for video {i}")
                else:
                    print(f"✅ Valid embedding for video {i}, sum: {sum(embedding_data):.4f}")

                embeddings.append(pa.array(embedding_array))

            except Exception as e:
                print(f"Error processing video {i}: {str(e)}")
                embeddings.append(pa.array(np.zeros(1024, dtype=np.float32)))
            finally:
                if os.path.exists(video_path):
                    os.unlink(video_path)

        return pa.FixedSizeListArray.from_arrays(pa.concat_arrays(embeddings), 1024)

# Add embedding column to table
tbl.add_columns({"embedding": GenVideoEmbeddings(twelve_labs_api_key=os.environ['TWELVE_LABS_API_KEY'])})
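
The segment-selection step in the UDF can be sketched without calling the API. The `Segment` dataclass below is a hypothetical stand-in that only mirrors the two attributes the notebook reads (`embedding_scope` and `float_`); it is not the SDK's response type, and `pick_video_embedding` is a name introduced here.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence

EMBEDDING_DIM = 1024  # matches the fixed-size list declared on the UDF


@dataclass
class Segment:
    """Hypothetical stand-in for one segment of an embedding response."""
    embedding_scope: str
    float_: List[float]


def pick_video_embedding(segments: Sequence[Segment],
                         dim: int = EMBEDDING_DIM) -> Optional[List[float]]:
    """Return the first video-scope vector, or None if absent or mis-sized."""
    for seg in segments:
        if seg.embedding_scope == "video":
            vec = list(seg.float_)
            return vec if len(vec) == dim else None
    return None
```

Returning `None` is a choice made for this sketch so the caller decides what to do. The UDF above substitutes a zero vector instead, which keeps the fixed-size column dense but leaves rows whose cosine similarity to any query is undefined.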

## Generate Embeddings

Apply the embedding function to process all videos in the table.

In [None]:
# Generate embeddings for all videos
tbl.backfill("embedding", concurrency=1)

In [None]:
# Display the updated table with embeddings
tbl.to_pandas()

## Add Summary Column

Add an empty `video_summary` column that will later hold the Pegasus-generated summaries.

In [None]:
# Add empty summary column using a simple UDF
@udf(data_type=pa.string())
class InitSummary(Callable):
    def __call__(self, video_id: pa.Array) -> pa.Array:
        # Return empty strings for now
        return pa.array([""] * len(video_id))

# Add the summary column
tbl.add_columns({"video_summary": InitSummary()})

# Verify the column was added
print("Table columns:", tbl.to_pandas().columns.tolist())

## Create Pegasus Index

Create a Twelve Labs index configured with the Pegasus model for video summarization.

In [None]:
# Create Twelve Labs index for summarization
index = client.indexes.create(
    index_name=f"lancedb_demo_{int(time.time())}",
    models=[{
        "model_name": "pegasus1.2",
        "model_options": ["visual", "audio"]
    }]
)

## Convert Query to Embedding

Embed the text query with the same Marengo model used for the videos, so that query and video vectors can be compared directly.

In [None]:
# Generate query embedding using Marengo
query = "educational tutorial"
query_result = client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=query
)
query_embedding = np.array(query_result.text_embedding.segments[0].float_, dtype=np.float32)

## Search Videos by Similarity

Execute vector similarity search to find relevant videos.

In [None]:
# Execute vector search using LanceDB
lance_db = lancedb.connect(GENEVA_DB_PATH)
lance_tbl = lance_db.open_table("videos")

search_results = (lance_tbl
                 .search(query_embedding)
                 .metric("cosine")
                 .limit(3)
                 .to_pandas())

search_results[['youtube_title', 'duration', '_distance', 'caption']].head()
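
Conceptually, the cosine search above ranks rows by `1 - cos(query, row)` and keeps the closest ones. The brute-force sketch below illustrates that ranking in plain Python; it is not how LanceDB executes the query, and `cosine_distance` and `top_k` are hypothetical helpers. Treating a zero vector as distance 1.0 is a convention of this sketch only.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Return 1 - cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        # Cosine is undefined for zero vectors; this sketch treats them
        # as orthogonal so they never rank as close matches.
        return 1.0
    return 1.0 - dot / (norm_a * norm_b)


def top_k(query: Sequence[float],
          rows: Dict[str, Sequence[float]],
          k: int = 3) -> List[Tuple[str, float]]:
    """Return the k (row_id, distance) pairs with the smallest distance."""
    scored = [(row_id, cosine_distance(query, vec)) for row_id, vec in rows.items()]
    return sorted(scored, key=lambda item: item[1])[:k]
```

Smaller `_distance` values in the result table therefore mean closer matches, which is why the final display later converts them with `1 - _distance`.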

## Create Video Summarizer

Define a helper class that uploads each video to the Pegasus index once, caches the returned Twelve Labs video ID, and then requests a summary.

In [None]:
# Initialize video summarizer
class VideoSummarizer:
    def __init__(self, twelve_labs_api_key, index_id):
        self.client = TwelveLabs(api_key=twelve_labs_api_key)
        self.index_id = index_id
        self.uploaded_videos = {}

    def summarize(self, video_bytes, video_id):
        try:
            # Upload video if needed
            if video_id not in self.uploaded_videos:
                with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
                    tmp.write(video_bytes)
                    tmp_path = tmp.name

                try:
                    with open(tmp_path, 'rb') as video_file:
                        task = self.client.tasks.create(
                            index_id=self.index_id,
                            video_file=video_file
                        )

                    status = self.client.tasks.wait_for_done(task.id)
                    self.uploaded_videos[video_id] = status.video_id

                finally:
                    if os.path.exists(tmp_path):
                        os.unlink(tmp_path)

            # Generate summary
            twelve_labs_video_id = self.uploaded_videos[video_id]
            summary_result = self.client.summarize(
                video_id=twelve_labs_video_id,
                type="summary",
                prompt="Provide a concise summary of the main content and key moments in this video."
            )

            return summary_result.summary

        except Exception as e:
            return f"Error: {str(e)}"

summarizer = VideoSummarizer(
    twelve_labs_api_key=os.environ['TWELVE_LABS_API_KEY'],
    index_id=index.id
)
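
The `uploaded_videos` dictionary in `VideoSummarizer` is a simple upload cache. The pattern can be sketched on its own with an injected upload function; `UploadCache` and `fake_upload` are hypothetical names, and the fake uploader only stands in for the real indexing call.

```python
from typing import Callable, Dict, List


class UploadCache:
    """Remember the remote ID for each local video so it is uploaded once."""

    def __init__(self, upload: Callable[[bytes], str]):
        self._upload = upload
        self._remote_ids: Dict[str, str] = {}

    def get_or_upload(self, video_id: str, video_bytes: bytes) -> str:
        # Only call the (slow, billable) uploader on a cache miss.
        if video_id not in self._remote_ids:
            self._remote_ids[video_id] = self._upload(video_bytes)
        return self._remote_ids[video_id]


calls: List[int] = []


def fake_upload(data: bytes) -> str:
    """Stand-in uploader that records each call and returns a fake ID."""
    calls.append(len(data))
    return f"remote_{len(calls)}"


cache = UploadCache(fake_upload)
first = cache.get_or_upload("video_0", b"abc")
second = cache.get_or_upload("video_0", b"abc")
```

The cache lives in memory, so it is lost when the kernel restarts; a rerun would upload the same videos to the index again.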

## Generate Video Summaries

Process search results to generate video summaries using Pegasus.

In [None]:
%%time
# Generate summaries for search results
total_videos = len(search_results)

for idx, (_, result) in enumerate(search_results.iterrows(), 1):
    video_id = result['video_id']
    print(f"Processing video {idx}/{total_videos}: {result['youtube_title'][:50]}...")

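    summary = summarizer.summarize(result['video'], video_id)
    # summarize() returns an "Error: ..." string on failure, so check before reporting success
    if str(summary).startswith("Error:"):
        print(f"❌ Summary failed for {video_id}: {summary}")
        continue
    print(f"✅ Generated summary for {video_id}")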

    # Update LanceDB with the summary
    try:
        lance_tbl.update(
            where=f"video_id = '{video_id}'",
            values={"video_summary": summary}
        )
        print(f"✅ Saved summary to LanceDB for {video_id}")
    except Exception as e:
        print(f"❌ Failed to save summary for {video_id}: {str(e)}")

print(f"\n🎉 Completed processing {total_videos} videos!")
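
The `where` filter above interpolates `video_id` directly into a SQL string. That is safe for the generated IDs such as `video_0`, but an ID containing a single quote would break the expression. The sketch below escapes quotes by doubling them, which is the standard SQL convention; `sql_string_literal` and `video_id_filter` are hypothetical helpers, not part of LanceDB.

```python
def sql_string_literal(value: str) -> str:
    """Wrap a value in single quotes, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def video_id_filter(video_id: str) -> str:
    """Build the filter expression used to select one row by video_id."""
    return f"video_id = {sql_string_literal(video_id)}"


print(video_id_filter("video_3"))
# prints: video_id = 'video_3'
```

The result of `video_id_filter(video_id)` could be passed as the `where` argument in place of the inline f-string.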

## Final Results

Display search results with similarity scores and AI-generated summaries.

In [None]:
# Verify summaries were saved - query the table again
updated_results = (lance_tbl
                  .search(query_embedding)
                  .metric("cosine")
                  .limit(3)
                  .to_pandas())

# Display results with summaries
updated_results[['video_id', 'youtube_title', '_distance', 'video_summary']].head()

In [None]:
# Format results for better display
results_display = []
for _, result in updated_results.iterrows():
    similarity = 1 - result['_distance']
    summary = result['video_summary']

    results_display.append({
        'video_id': result['video_id'],
        'title': result['youtube_title'],
        'similarity': f"{similarity:.3f}",
        'duration': f"{result['duration']}s",
        'summary': str(summary)[:150] + '...' if len(str(summary)) > 150 else str(summary)
    })

pd.DataFrame(results_display)
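
The two conversions used in the formatting loop can be pulled out as small helpers. This is a sketch with hypothetical names (`truncate`, `distance_to_similarity`); the 150-character limit and the `1 - distance` conversion are taken from the cell above.

```python
from typing import Optional


def truncate(text: Optional[str], limit: int = 150) -> str:
    """Shorten text to `limit` characters, appending '...' when it was cut."""
    text = "" if text is None else str(text)
    return text[:limit] + "..." if len(text) > limit else text


def distance_to_similarity(cosine_distance: float) -> float:
    """Convert a cosine distance (0 = identical direction) to a similarity."""
    return 1.0 - cosine_distance
```

Converting the value to a string before slicing keeps the helper safe for rows whose summary is still empty or missing.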

## Complete Video AI Pipeline

You've successfully built a semantic video search system with the following capabilities:
- Video embedding generation using Twelve Labs Marengo
- Efficient video storage and retrieval with LanceDB
- Natural language search through vector similarity
- AI-powered video summarization with Twelve Labs Pegasus

To apply the same pipeline to a larger collection, raise `NUM_VIDEOS` and the `concurrency` passed to `tbl.backfill`, staying within your Twelve Labs API limits.