In [1]:
import os
import random
import traceback
from dotenv import load_dotenv
from pymilvus import connections, utility, Collection, FieldSchema, DataType, CollectionSchema, db
from sentence_transformers import SentenceTransformer
from pymilvus import MilvusClient
from openai import OpenAI

load_dotenv()

MILVUS_HOST = os.getenv("MILVUS_HOST")
MILVUS_PORT = os.getenv("MILVUS_PORT")

DB_NAME = os.getenv("VIDEO_DATABASE")
COLLECTION_NAME = os.getenv("VIDEO_COLLECTION")

TWELVE_LABS_API_KEY=os.getenv("TWELVE_LABS_API_KEY")

print(f"http://{MILVUS_HOST}:{MILVUS_PORT}")

http://127.0.0.1:19530


In [2]:
client = MilvusClient(
            uri=f"http://{MILVUS_HOST}:{MILVUS_PORT}",
            db_name=DB_NAME
        )

In [3]:
def create_database(database_name):
    conn = connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)
    if database_name in db.list_database():
        return
    database = db.create_database(database_name)
create_database(DB_NAME)





In [4]:

# Check if the collection already exists and drop it if it does
if client.has_collection(collection_name=COLLECTION_NAME):
    client.drop_collection(collection_name=COLLECTION_NAME)


In [5]:

# Create the collection
client.create_collection(
    collection_name=COLLECTION_NAME,
    dimension=1024  # The dimension of the Twelve Labs embeddings
)

print(f"Collection '{COLLECTION_NAME}' created successfully")

Collection 'video_embeddings' created successfully


In [6]:
from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)


In [7]:
def generate_embedding(path):
    """
    Generate embeddings for a given video URL using the Twelve Labs API.

    This function creates an embedding task for the specified video URL using
    the Marengo-retrieval-2.6 engine. It monitors the task progress and waits
    for completion. Once done, it retrieves the task result and extracts the
    embeddings along with their associated metadata.

    Args:
        path (str): The path of the video to generate embeddings for.

    Returns:
        raw_data

    Raises:
        Any exceptions raised by the Twelve Labs API during task creation,
        execution, or retrieval.
    """

    # Create an embedding task
    task = twelvelabs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=video_file # video_url=video_url
    )
    print(f"Created task: id={task.id} model_name={task.model_name} status={task.status}")

    # Define a callback function to monitor task progress
    def on_task_update(task: EmbeddingsTask):
        print(f"  Status={task.status}")

    # Wait for the task to complete
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    # Retrieve the task result
    task_result = twelvelabs_client.embed.task.retrieve(task.id)
    
    return task_result

In [8]:
video_file="/home/ec2-user/ai-summit-lab-2025/sample_codes/videos/big_buck_bunny_480p_1mb.mp4" # from https://sample-videos.com/
task_result = generate_embedding(video_file)


Created task: id=67b7c9a49461ba0f74686e47 model_name=Marengo-retrieval-2.7 status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Statu

In [9]:
from typing import List
from twelvelabs.models.embed import SegmentEmbedding


In [10]:
def insert_embeddings(milvus_client, collection_name, task_result, video_path):
    """
    Insert embeddings into the Milvus collection.

    Args:
        milvus_client: The Milvus client instance.
        collection_name (str): The name of the Milvus collection to insert into.
        task_result (EmbeddingsTaskResult): The task result containing video embeddings.
        video_path (str): The path of the video associated with the embeddings.

    Returns:
        MutationResult: The result of the insert operation.

    This function takes the video embeddings from the task result and inserts them
    into the specified Milvus collection. Each embedding is stored with additional
    metadata including its scope, start and end times, and the associated video URL.
    """
    data = []

    def prapare_data(video_path: str, segments: List[SegmentEmbedding]):
        data=[]
        idx=1
        for segment in segments:
           # print(
           #     f"  embedding_scope={segment.embedding_scope}, start_offset_sec={segment.start_offset_sec}, end_offset_sec={segment.end_offset_sec}"
           # )
           # print(f"  embeddings: {segment.embeddings_float}")
            data.append({
                "id": idx,
                "vector":segment.embeddings_float,
                "embedding_scope": segment.embedding_scope,
                "start_offset_sec": segment.start_offset_sec,
                "end_offset_sec": segment.end_offset_sec,
                "video_path": video_path
            })
            
            idx+=1
        print(data)
        return data

    
    if task_result.video_embedding is not None and task_result.video_embedding.segments is not None:
        data=prapare_data(video_path, task_result.video_embedding.segments)
        insert_result = milvus_client.insert(collection_name=collection_name, data=data)
        print(f"Inserted {len(data)} embeddings into Milvus")
        return data, insert_result
    else:
        return None
embedding_data, insert_result=insert_embeddings(client,COLLECTION_NAME, task_result, video_file)

[{'id': 1, 'vector': [0.011293273, 0.011134445, 0.018624293, -0.024905238, -0.030021077, -0.032317936, -0.019025005, -0.00030190055, -0.016772691, -0.05898016, 0.009755327, -0.054828964, -0.045286957, 0.022679515, -0.0127837695, -0.0076851672, -0.068526424, -0.08044255, -0.015463732, -0.016182408, 0.018166859, -0.0031694872, 0.04374777, 0.052766092, 0.0016972977, 0.004626846, 0.011100962, 0.054884955, -0.021003945, 0.032034136, -0.00080434815, -0.049462974, -0.0013499564, 0.050285224, -0.022039011, 0.011312073, -0.008946639, 0.0061128805, -0.0017873591, -0.037824295, -0.013623947, -0.00805492, -0.029822953, -0.02412476, 0.032041248, 0.014222868, -0.016141023, -0.013313575, -0.039279792, 0.039204363, -0.0102516925, 0.015833903, 0.015445854, 0.025808094, 0.010192145, -0.051433302, -0.0013952032, -0.0075264703, 0.007911947, 0.038381714, -0.017480174, 0.030641701, -0.022933312, 0.006986844, 0.021974033, 0.046490073, -0.00957011, 0.024434194, -0.0114549445, -0.0057887062, 0.075518504, 0.013

In [11]:
def perform_similarity_search(milvus_client, collection_name, query_vector, limit=5):
    """
    Perform a similarity search on the Milvus collection.

    Args:
        milvus_client: The Milvus client instance.
        collection_name (str): The name of the Milvus collection to search in.
        query_vector (list): The query vector to search for similar embeddings.
        limit (int, optional): The maximum number of results to return. Defaults to 5.

    Returns:
        list: A list of search results, where each result is a dictionary containing
              the matched entity's metadata and similarity score.

    This function searches the specified Milvus collection for embeddings similar to
    the given query vector. It returns the top matching results, including metadata
    such as the embedding scope, time range, and associated video URL for each match.
    """
    search_results = milvus_client.search(
        collection_name=collection_name,
        data=[query_vector],
        limit=limit,
        output_fields=["embedding_scope", "start_offset_sec", "end_offset_sec", "video_path"]
    )

    return search_results
   

In [12]:
 
query_vector = embedding_data[0]['vector']

search_results = perform_similarity_search(client, COLLECTION_NAME, query_vector)

print("Search Results:")
for i, result in enumerate(search_results[0]):
    print(f"Result {i+1}:")
    print(f"  Video URL: {result['entity']['video_path']}")
    print(f"  Time Range: {result['entity']['start_offset_sec']} - {result['entity']['end_offset_sec']} seconds")
    print(f"  Similarity Score: {result['distance']}")
    print()


Search Results:
Result 1:
  Video URL: /home/ec2-user/ai-summit-lab-2025/sample_codes/videos/big_buck_bunny_480p_1mb.mp4
  Time Range: 0.0 - 5.76 seconds
  Similarity Score: 1.0



---
# References


- [video sample](https://sample-videos.com/0)
- [twelveslabs](https://docs.twelvelabs.io/docs/create-video-embeddings)
- [twelveslabs&milvus](https://milvus.io/docs/zh-hant/video_search_with_twelvelabs_and_milvus.md)