In [1]:
from pymilvus import MilvusClient
from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask
import os

In [2]:
milvus_client = MilvusClient("milvus_twelvelabs_demo.db")

print("Successfully connected to Milvus")

collection_name = "twelvelabs_demo_collection"

if milvus_client.has_collection(collection_name=collection_name):
    milvus_client.drop_collection(collection_name=collection_name)

milvus_client.create_collection(
    collection_name=collection_name,
    dimension=1024  # The dimension of the Twelve Labs embeddings
)

print(f"Collection '{collection_name}' created successfully")

Successfully connected to Milvus
Collection 'twelvelabs_demo_collection' created successfully


In [3]:
TWELVE_LABS_API_KEY = os.getenv('TWELVE_LABS_API_KEY')
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)

In [4]:
def generate_embedding(video_url):
    """
    Generate embeddings for a given video URL using the Twelve Labs API.

    This function creates an embedding task for the specified video URL using
    the Marengo-retrieval-2.6 engine. It monitors the task progress and waits
    for completion. Once done, it retrieves the task result and extracts the
    embeddings along with their associated metadata.

    Args:
        video_url (str): The URL of the video to generate embeddings for.

    Returns:
        tuple: A tuple containing two elements:
            1. list: A list of dictionaries, where each dictionary contains:
                - 'embedding': The embedding vector as a list of floats.
                - 'start_offset_sec': The start time of the segment in seconds.
                - 'end_offset_sec': The end time of the segment in seconds.
                - 'embedding_scope': The scope of the embedding (e.g., 'shot', 'scene').
            2. EmbeddingsTaskResult: The complete task result object from Twelve Labs API.

    Raises:
        Any exceptions raised by the Twelve Labs API during task creation,
        execution, or retrieval.
    """

    # Create an embedding task
    video_task = twelvelabs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_url=video_url
    )
    print(f"Created task: id={video_task.id} model_name={video_task.model_name} status={video_task.status}")

    # Define a callback function to monitor task progress
    def on_task_update(video_task: EmbeddingsTask):
        print(f"  Status={video_task.status}")

    # Wait for the task to complete
    status = video_task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    # Retrieve segmented video embeddings
    # task_result = twelvelabs_client.embed.task.retrieve(task.id)
    video_task = video_task.retrieve()

    video_segments = video_task.video_embedding.segments
    print(f"Retrieved {len(video_segments)} video segments")    

    # Extract and return the embeddings
    video_embeddings = []
    for segment in video_segments:
        video_embeddings.append({
            'embedding': segment.embeddings_float,
            'start_time': segment.start_offset_sec,
            'end_time': segment.end_offset_sec,
            'embedding_scope': segment.embedding_scope
        })
    
    return video_embeddings, video_segments


In [None]:
video_url = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
#video_url2= "https://www.youtube.com/watch?v=cBymtFSnJqU"
#video_url3="https://www.youtube.com/watch?v=931ni7_flRo"
video_embeddings, video_segments = generate_embedding(video_url)
#video_embeddings_2, video_segments_2 = generate_embedding(video_url2)
#video_embeddings_3, video_segments_3 = generate_embedding(video_url3)

Created task: id=679591f981c61d7813698ebb model_name=Marengo-retrieval-2.7 status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing
  Status=processing


In [24]:
import pickle
with open('embeddings.pkl', 'wb') as f:
    pickle.dump({'video_embeddings':video_embeddings, 'video_segments':video_segments}, f)
    pickle.dump({'video_embeddings':video_embeddings_2, 'video_segments':video_segments_2}, f)
    pickle.dump({'video_embeddings':video_embeddings_3, 'video_segments':video_segments_3}, f)
    

In [22]:
with open('embeddings.pkl', 'rb') as f:
    data = pickle.load(f)

video_embeddings = data['video_embeddings']
video_segments = data['data_segments']

({'video_embeddings': [{'embedding': [0.0068783234,
     -0.00729163,
     0.032936763,
     0.0147475805,
     0.0004509193,
     -0.058732983,
     0.010216852,
     -0.0107900575,
     0.013492671,
     -0.037403982,
     -0.021999722,
     -0.03039865,
     -0.0019278503,
     0.01788534,
     0.012924736,
     0.011753899,
     -0.024063474,
     -0.09012038,
     0.021896683,
     0.006487544,
     0.010387876,
     -0.02762893,
     0.06674046,
     0.04259518,
     0.012998299,
     -0.01387637,
     0.007397698,
     -0.010834219,
     -0.02403517,
     -0.031014148,
     -0.011390642,
     0.0076335417,
     -0.043314736,
     0.026069751,
     -0.023581678,
     -0.0037833375,
     0.014594813,
     -0.0064795073,
     -0.01648418,
     -0.026304655,
     -0.011824551,
     0.012401233,
     -0.07024261,
     -0.010345327,
     0.033803698,
     0.03137717,
     0.020379616,
     -0.008056862,
     -0.03816837,
     -0.016470522,
     -0.023214314,
     0.051605448,
     0.0

In [8]:
print(f"Generated {len(video_embeddings)} embeddings for the video")
for i, emb in enumerate(video_embeddings):
    print(f"Embedding {i+1}:")
    print(f"  Scope: {emb['embedding_scope']}")
    print(f"  Time range: {emb['start_time']} - {emb['end_time']} seconds")
    print(f"  Embedding vector (first 5 values): {emb['embedding'][:5]}")
    print()


Generated 100 embeddings for the video
Embedding 1:
  Scope: clip
  Time range: 0.0 - 6.0 seconds
  Embedding vector (first 5 values): [0.0068783234, -0.00729163, 0.032936763, 0.0147475805, 0.0004509193]

Embedding 2:
  Scope: clip
  Time range: 6.0 - 12.0 seconds
  Embedding vector (first 5 values): [0.0074522155, -0.008839504, 0.034555014, 0.01722106, -0.010919876]

Embedding 3:
  Scope: clip
  Time range: 12.0 - 18.0 seconds
  Embedding vector (first 5 values): [0.022766022, -0.0035799714, 0.019664006, -0.023654077, -0.003662397]

Embedding 4:
  Scope: clip
  Time range: 18.0 - 24.0 seconds
  Embedding vector (first 5 values): [0.024753511, 0.0034212, 0.021475725, -0.0071854503, -0.01930795]

Embedding 5:
  Scope: clip
  Time range: 24.0 - 30.0 seconds
  Embedding vector (first 5 values): [0.027265737, 0.042607695, 0.012588095, 0.010838325, -0.002919223]

Embedding 6:
  Scope: clip
  Time range: 30.0 - 36.0 seconds
  Embedding vector (first 5 values): [0.017960895, 0.01399288, 0.003

In [20]:
# Insert text and all video segment embeddings
def insert_embeddings(milvus_client, collection_name, video_embeddings, video_segments, video_url):
    video_entry = []

    # Insert each video segment embedding
    id = 0
    for segment in video_segments[0:]:
        print(type(segment.embeddings_float))
        print(type(segment.start_offset_sec))

        segment.embedding_scope
        video_entry.append({
            'id': id,
            'vector': segment.embeddings_float,
            'start_time': segment.start_offset_sec,
            'end_time': segment.end_offset_sec,
            'embedding_scope': segment.embedding_scope,
            "video_url": video_url
        })
        id += 1
        
        #st.write(f"Inserted {len(embeddings_data['video_embeddings'])} video segment embeddings")
        insert_result = milvus_client.insert(collection_name=collection_name, data = video_entry)
        # return insert_result
        
    # except Exception as e:
    #     print(f"Error inserting embeddings: {str(e)}")
    #     return False

insert_result = insert_embeddings(milvus_client, collection_name, video_embeddings, video_segments, video_url)
# print(insert_result)

<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class 'list'>
<class 'float'>
<class '

In [54]:
# Search for similar video segments using text query

def perform_similarity_search(milvus_client, collection_name, query_vector, limit=5):
    """
    Perform a similarity search on the Milvus collection.

    Args:
        milvus_client: The Milvus client instance.
        collection_name (str): The name of the Milvus collection to search in.
        query_vector (list): The query vector to search for similar embeddings.
        limit (int, optional): The maximum number of results to return. Defaults to 5.

    Returns:
        list: A list of search results, where each result is a dictionary containing
              the matched entity's metadata and similarity score.

    This function searches the specified Milvus collection for embeddings similar to
    the given query vector. It returns the top matching results, including metadata
    such as the embedding scope, time range, and associated video URL for each match.
    """
    search_results = milvus_client.search(
        collection_name=collection_name,
        data=[query_vector],
        limit=limit,
        output_fields=["embedding_scope", "start_offset_sec", "end_offset_sec", "video_url"]
    )

    return search_results

In [55]:
# Function to get embedding for a text query
def get_text_embedding(text_query):
    # Twelve Labs Embed API supports text-to-embedding
    text_embedding = twelvelabs_client.embed.create(
      model_name="Marengo-retrieval-2.7",
      text=text_query,
      text_truncate="start"
    )

    return text_embedding.text_embedding.segments[0].embeddings_float

In [58]:
# define the query vector: text-based search
text_query = "A big white rabbit is walking in the forrest. "
query_vector = get_text_embedding(text_query)

# Perform a similarity search on the Milvus collection
search_results = perform_similarity_search(milvus_client, collection_name, query_vector)
print(search_results)



data: ["[{'id': 10, 'distance': 0.39097675681114197, 'entity': {'embedding_scope': 'clip', 'video_url': 'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4'}}]"]
