# TwelveLabs on Amazon Bedrock Workshop

TwelveLabs is a leading provider of multimodal AI models specializing in video understanding and analysis. TwelveLabs' advanced models enable sophisticated video search, analysis, and content generation capabilities through state-of-the-art computer vision and natural language processing technologies. Amazon Bedrock now offers two TwelveLabs models: TwelveLabs Pegasus 1.2, which provides comprehensive video understanding and analysis, and TwelveLabs Marengo Embed 2.7, which generates high-quality embeddings for video, text, audio, and image content. These models empower developers to build applications that can intelligently process, analyze, and derive insights from video data at scale.

### TwelveLabs Video Understanding Models
TwelveLabs’ video understanding models consist of a family of deep neural networks built on our multimodal foundation model for video understanding that you can use for the following downstream tasks:
- Search using natural language queries
- Analyze videos to generate text

Videos contain multiple types of information, including visuals, sounds, spoken words, and text. The human brain combines all of these types of information, and the relations between them, to comprehend the overall meaning of a scene. For example, suppose you’re watching a muted video of a person jumping and clapping, both of which are visual cues. You might realize they’re happy, but without the sound you can’t tell why. With the sound on, you could realize they’re cheering for a soccer team that scored a goal.

Thus, an application that analyzes a single type of information can’t provide a comprehensive understanding of a video. TwelveLabs’ video understanding models, however, analyze and combine information from all the modalities to accurately interpret the meaning of a video holistically, similar to how humans watch, listen, and read simultaneously to understand videos.

Our video understanding models have the ability to identify, analyze, and interpret a variety of elements, including but not limited to the following:
| Element | Modality | Example |
|---------|----------|---------|
| People, including famous individuals | Visual | Michael Jordan, Steve Jobs |
| Actions | Visual | Running, dancing, kickboxing |
| Objects | Visual | Cars, computers, stadiums |
| Animals or pets | Visual | Monkeys, cats, horses |
| Nature | Visual | Mountains, lakes, forests |
| Text displayed on the screen (OCR) | Visual | License plates, handwritten words, number on a player's jersey |
| Brand logos | Visual | Nike, Starbucks, Mercedes |
| Shot techniques and effects | Visual | Aerial shots, slow motion, time-lapse |
| Counting objects | Visual | Number of people in a crowd, items on a shelf, vehicles in traffic |
| Sounds | Audio | Chirping (birds), applause, fireworks popping or exploding |
| Human speech | Audio | "Good morning. How may I help you?" |
| Music | Audio | Ominous music, whistling, lyrics |

### Modalities
Modalities represent the types of information that the models process and analyze in a video. These modalities are central to both indexing and searching video content.

The models support the following modalities: 

- **Visual**: Analyzes visual content in a video, including actions, objects, events, text (through Optical Character Recognition, or OCR), and brand logos.
- **Audio**: Analyzes audio content in a video, including ambient sounds, music, and human speech.

## Part 0: Setup

### Dependencies

In [None]:
%pip install -r requirements.txt -Uq

In [None]:
import boto3, botocore
import json
import re
import pandas as pd
import numpy as np
import uuid
import time
import base64
from IPython.display import clear_output, HTML, display, Image
from sklearn.metrics.pairwise import cosine_similarity

### Configure boto3

In [None]:
# Initialize AWS session
session = boto3.Session() # TODO: (optional) replace with your AWS profile, keep as is to use the default profile

In [None]:
# Get AWS Region from session
AWS_REGION = session.region_name
# AWS_REGION = "us-east-1" # OPTIONAL: Manual override for workshop region

print(f"AWS Region: {AWS_REGION}")

workshop_supported_regions = [
    "us-east-1", # N. Virginia
    "eu-west-1", # Ireland
    "ap-northeast-2" # Seoul
]

if AWS_REGION not in workshop_supported_regions:
    raise ValueError(f"AWS Region {AWS_REGION} is not supported for this workshop. Please use one of the following regions: {workshop_supported_regions}")

In [None]:
# Initialize AWS clients
bedrock_client = session.client('bedrock-runtime')
s3_client = session.client('s3')
s3vectors_client = session.client('s3vectors')

### Configure S3 bucket

In [None]:
# S3 Configuration
S3_BUCKET_NAME = "<YOUR_S3_BUCKET_NAME>" # TODO: Replace with your S3 bucket name
S3_VIDEOS_PATH = "videos"
S3_IMAGES_PATH = "images"
S3_EMBEDDINGS_PATH = "embeddings"

# Validate S3 bucket name
if S3_BUCKET_NAME == "<YOUR_S3_BUCKET_NAME>" or S3_BUCKET_NAME == "":
    raise ValueError("Please replace <YOUR_S3_BUCKET_NAME> with your S3 bucket name")

### Configure Amazon S3 Vector bucket

In [None]:
# S3 Vector Bucket Configuration
# The S3 vector bucket name must be unique within your AWS account and Region
# S3 vector bucket names may only consist of lowercase letters, numbers, and hyphens
S3_VECTOR_BUCKET_NAME = "<YOUR_S3_VECTOR_BUCKET_NAME>" # TODO: Replace with your S3 vector bucket name

S3_VECTOR_INDEX_NAME = "my-vector-index"

# Validate S3 Vector bucket and index names
if S3_VECTOR_BUCKET_NAME == "<YOUR_S3_VECTOR_BUCKET_NAME>" or S3_VECTOR_BUCKET_NAME == "":
    raise ValueError("Please replace <YOUR_S3_VECTOR_BUCKET_NAME> with your S3 vector bucket name")
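The naming rule stated in the comments above can be checked locally before any AWS call is made. The sketch below uses a hypothetical helper, `is_valid_vector_bucket_name`, which is not part of boto3. The 3 to 63 character length limit and the requirement to start and end with a letter or number are assumptions borrowed from general S3 bucket naming rules, so confirm them against the S3 Vectors documentation.

```python
import re

# Assumed rule set: lowercase letters, numbers, and hyphens only,
# 3-63 characters, starting and ending with a letter or number.
_VECTOR_BUCKET_NAME_RE = re.compile(r"[a-z0-9][a-z0-9-]{1,61}[a-z0-9]")


def is_valid_vector_bucket_name(name: str) -> bool:
    """Return True if `name` satisfies the assumed naming rules (hypothetical helper)."""
    return _VECTOR_BUCKET_NAME_RE.fullmatch(name) is not None


print(is_valid_vector_bucket_name("my-video-vectors"))  # accepted under the assumed rules
print(is_valid_vector_bucket_name("My_Video_Vectors"))  # uppercase and underscores are rejected
```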

### Bedrock model access


Follow the [Bedrock model access documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access-modify.html) to enable access to TwelveLabs models on Bedrock. Make sure to enable access in the same Region in which you are running this workshop.

## Part 1: Multimodal Embeddings with Marengo

### Part 1a: What is an embedding?

Use TwelveLabs Marengo to create multimodal embeddings for videos, texts, images, and audio files. These embeddings are contextual vector representations (a series of numbers) that capture interactions between modalities, such as visual expressions, body language, spoken words, and video context. You can apply these embeddings to downstream tasks like training custom multimodal models for anomaly detection, diversity sorting, sentiment analysis, recommendations, or building Retrieval-Augmented Generation (RAG) systems.

Key features:
- **Native multimodal support**: Process all modalities natively without separate models or frame conversion.
- **State-of-the-art performance**: Captures motion and temporal information for accurate video interpretation.
- **Unified vector space**: Combines embeddings from different modalities for holistic understanding.
- **Fast and reliable**: Reduces processing time for large video sets.
- **Flexible segmentation**: Generate embeddings for video segments or the entire video.

Use cases:
- **Anomaly detection**: Identify unusual patterns, such as corrupt videos with black backgrounds, to improve data set quality.
- **Diversity sorting**: Organize data for broad representation, reducing bias and improving AI model training.
- **Sentiment analysis**: Combine vocal tone, facial expressions, and spoken language for accurate insights, which is particularly useful for customer service.
- **Recommendations**: Use embeddings in similarity-based retrieval and ranking systems for recommendations.

To learn more about embeddings, check out [The Multimodal Evolution of Vector Embeddings](https://www.twelvelabs.io/blog/multimodal-embeddings) on the TwelveLabs Blog!

In [None]:
# Sample embeddings
sample_embedding_1 = np.random.rand(1, 1024)
sample_embedding_2 = np.random.rand(1, 1024)

df_embedding_1 = pd.DataFrame(sample_embedding_1)
df_embedding_2 = pd.DataFrame(sample_embedding_2)

df_embedding_1


In [None]:
# Sample video embedding
sample_video_embedding = np.random.rand(5, 1024)
df_video_embedding = pd.DataFrame(sample_video_embedding)
df_video_embedding

### Part 1b: Calculating cosine similarity

Cosine similarity measures the similarity between two vectors by calculating the cosine of the angle between them in high-dimensional space. Unlike distance metrics that consider magnitude, cosine similarity focuses purely on the orientation or direction of vectors, making it particularly useful for comparing text embeddings, documents, and other high-dimensional data.

The multimodal vector embeddings from TwelveLabs Marengo can be used to calculate the similarity across text, image, audio, and video.

***Formula***

The cosine similarity between two vectors **A** and **B** is calculated as:

```
cos(θ) = (A · B) / (||A|| × ||B||)
```

Where:
- **A · B** is the dot product of vectors A and B
- **||A||** and **||B||** are the magnitudes (norms) of vectors A and B respectively
- **θ** is the angle between the two vectors
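To make the formula concrete, here is a minimal sketch that computes it term by term using only the Python standard library. The function name `cosine_similarity_manual` and the sample vectors are illustrative and not part of the workshop code. Raising an error for zero vectors is a choice made for this sketch, since the formula is undefined when either norm is zero.

```python
import math


def cosine_similarity_manual(a: list, b: list) -> float:
    """Compute cos(theta) = (A . B) / (||A|| * ||B||) for two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("Vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))     # A . B
    norm_a = math.sqrt(sum(x * x for x in a))  # ||A||
    norm_b = math.sqrt(sum(y * y for y in b))  # ||B||
    if norm_a == 0 or norm_b == 0:
        raise ValueError("Cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


a = [1.0, 2.0, 2.0]  # ||A|| = 3
b = [2.0, 0.0, 0.0]  # ||B|| = 2
print(cosine_similarity_manual(a, b))  # (1*2) / (3*2) = 1/3
```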

***Key Characteristics***
- **Range**: Values range from -1 to 1
  - **1**: Identical direction (perfect similarity)
  - **0**: Orthogonal vectors (no similarity)
  - **-1**: Opposite directions (perfect dissimilarity)
- **Magnitude Independence**: Only considers vector direction, not size
- **Symmetric**: cos(A,B) = cos(B,A)
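These characteristics can be checked with small two-dimensional vectors. The sketch below is illustrative only: `cos_sim` is a hypothetical helper written for this example, and the vectors are chosen so the arithmetic is easy to follow by hand.

```python
import math


def cos_sim(a, b):
    """Cosine similarity of two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))


v = [3.0, 4.0]

same_direction = cos_sim(v, [6.0, 8.0])  # v scaled by 2: magnitude differs, direction does not
orthogonal = cos_sim(v, [-4.0, 3.0])     # perpendicular to v
opposite = cos_sim(v, [-3.0, -4.0])      # v negated

print(same_direction, orthogonal, opposite)

# Symmetry: swapping the arguments does not change the result
print(cos_sim(v, [1.0, 2.0]) == cos_sim([1.0, 2.0], v))
```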

***Benefits***
- **Scale Invariant**: Ideal for comparing vectors of different magnitudes
- **Computationally Efficient**: Fast calculation, especially with sparse vectors
- **Robust for Text Analysis**: Perfect for document similarity and text embeddings
- **Handles High Dimensions**: Remains a practical choice in high-dimensional spaces, where magnitude-based distance metrics tend to become less informative
- **Intuitive Results**: Easy to interpret similarity scores between 0 and 1 for most applications

***Drawbacks***
- **Ignores Magnitude**: Completely disregards vector size, which may contain important information
- **Limited with Negative Values**: Can be less meaningful when dealing with vectors containing negative components
- **Not Always Intuitive**: May not align with human perception of similarity in certain domains
- **Loses Information**: Discarding magnitude means losing potentially valuable signal strength data
- **Poor for Sparse Positive Data**: May not distinguish well between vectors with very few non-zero elements

In [None]:
# Cosine similarity between two single segment embeddings
similarity = cosine_similarity(df_embedding_1, df_embedding_2)
pd.DataFrame(similarity)

In [None]:
# Cosine similarity with a multi-segment embedding
similarities = cosine_similarity(df_video_embedding, df_embedding_1)
pd.DataFrame(similarities)

In [None]:
# Getting the max similarity and the index of the max similarity
max_similarity = np.max(similarities)
max_similarity_index = np.argmax(similarities)

print(f"Max similarity: {max_similarity}")
print(f"Index of max similarity: {max_similarity_index}")
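`np.argmax` returns only the single best segment. For search, it is often more useful to rank several segments. The sketch below shows one way to do that with the standard library. The scores are made-up values rather than model output, and `top_k_segments` is a hypothetical helper.

```python
import heapq

# Hypothetical similarity scores, one per video segment (not real model output)
segment_similarities = [0.12, 0.87, 0.45, 0.91, 0.33]


def top_k_segments(scores: list, k: int) -> list:
    """Return (segment_index, score) pairs for the k highest scores, best first."""
    return heapq.nlargest(k, enumerate(scores), key=lambda pair: pair[1])


print(top_k_segments(segment_similarities, k=3))
```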

---
## Part 2: Building Multimodal Video Search


### Part 2a: Storing videos in S3

#### Set up sample dataset to S3 bucket

In [None]:
# AWS Account ID for S3 bucket ownership
aws_account_id = session.client('sts').get_caller_identity()["Account"]

print(f"AWS Account ID: {aws_account_id}")
print(f"S3 Bucket: {S3_BUCKET_NAME}")
print(f"S3 Videos Path: {S3_VIDEOS_PATH}")
print(f"S3 Images Path: {S3_IMAGES_PATH}")
print(f"S3 Embeddings Path: {S3_EMBEDDINGS_PATH}")

# Verify bucket access
try:
    s3_client.head_bucket(Bucket=S3_BUCKET_NAME)
    print(f"✅ Successfully connected to S3 bucket: {S3_BUCKET_NAME}")
except Exception as e:
    print(f"❌ Error accessing S3 bucket: {e}")
    print("Please ensure the bucket exists and you have proper permissions.")


#### Netflix Open Content

[Netflix Open Content](https://opencontent.netflix.com/) is open source content available under the [Creative Commons Attribution 4.0 International Public License](https://creativecommons.org/licenses/by/4.0/legalcode).

The assets are available for download at: http://download.opencontent.netflix.com/

We will be utilizing a subset of the videos for demonstrating how to utilize the TwelveLabs models on Amazon Bedrock.

In [None]:
# Sample video S3 URIs
sample_videos = [
    # 's3://download.opencontent.netflix.com/TechblogAssets/CosmosLaundromat/encodes/CosmosLaundromat_2048x858_24fps_SDR.mp4',
    # 's3://download.opencontent.netflix.com/TechblogAssets/Meridian/encodes/Meridian_3840x2160_5994fps_SDR.mp4',
    's3://download.opencontent.netflix.com/TechblogAssets/Sparks/encodes/Sparks_4096x2160_5994fps_SDR.mp4'
]

In [None]:
# Unsigned S3 client
public_s3_client = boto3.client('s3', config=botocore.client.Config(signature_version=botocore.UNSIGNED))

In [None]:
def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
    """
    Parses an S3 URI like s3://bucket-name/path/to/object and returns (bucket, key)

    Args:
        s3_uri (str): The S3 URI to parse
        
    Returns:
        tuple[str, str]: The bucket and key
    """
    pattern = r'^s3://([^/]+)/(.+)$'
    match = re.match(pattern, s3_uri)
    if not match:
        raise ValueError(f"Invalid S3 URI format: {s3_uri}")
    return match.group(1), match.group(2)


def copy_public_s3_object_to_private_bucket(public_s3_uri: str, dest_bucket: str, dest_key: str, aws_profile: str = 'default') -> None:
    """
    Copies a public S3 object to a private bucket

    Args:
        public_s3_uri (str): The S3 URI of the public object to copy
        dest_bucket (str): The name of the private bucket to copy to
        dest_key (str): The key of the object to copy to
        aws_profile (str): Currently unused; the upload goes through the notebook-level s3_client
    """

    # Parse source bucket and key
    source_bucket, source_key = parse_s3_uri(public_s3_uri)

    # Anonymous client to read public object
    anon_s3 = boto3.client('s3', config=botocore.client.Config(signature_version=botocore.UNSIGNED))

    print(f"Downloading from {public_s3_uri}...")
    response = anon_s3.get_object(Bucket=source_bucket, Key=source_key)

    # Stream the body to the destination instead of loading the whole video into memory
    print(f"Uploading to s3://{dest_bucket}/{dest_key} ...")
    s3_client.upload_fileobj(response['Body'], dest_bucket, dest_key)

    print("✅ Copy completed successfully!")

In [None]:
# Copy videos to the S3 bucket
for video_uri in sample_videos:
    # Extract the filename from the S3 key
    _, src_key = parse_s3_uri(video_uri)
    filename = src_key.split("/")[-1]
    dest_key = f"{S3_VIDEOS_PATH}/{filename}"
    copy_public_s3_object_to_private_bucket(
        public_s3_uri=video_uri,
        dest_bucket=S3_BUCKET_NAME,
        dest_key=dest_key
    )

### Part 2b: Creating vector embeddings with Marengo on Bedrock

#### TwelveLabs Marengo

Marengo is an embedding model for comprehensive video understanding. Marengo analyzes multiple modalities in video content, including visuals, audio, and text, to provide a holistic understanding similar to human comprehension.

***Key features***
- **Multimodal processing:** Combines visual, audio, and text elements for comprehensive understanding
- **Fine-grained search:** Detects brand logos, text, and small objects (as small as 10% of the video frame)
- **Motion search:** Identifies and analyzes movement within videos
- **Counting capabilities:** Accurately counts objects in video frames
- **Audio comprehension:** Analyzes music, lyrics, sound, and silence

***Use cases***
- **Search:** Use natural language queries to find specific content within videos
- **Embeddings:** Create video embeddings for various downstream applications

#### Marengo Embed 2.7 on Bedrock

A multimodal embedding model that generates high-quality vector representations of video, text, audio, and image content for similarity search, clustering, and other machine learning tasks. The model supports multiple input modalities and provides specialized embeddings optimized for different use cases.

The model supports synchronous inference through the [InvokeModel API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html) and asynchronous inference through the [StartAsyncInvoke API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_StartAsyncInvoke.html).
- Provider — TwelveLabs
- Categories — Embeddings, multimodal
- Model ID — `twelvelabs.marengo-embed-2-7-v1:0`
- Input modality — Video, Text, Audio, Image
- Output modality — Embeddings
- Max video size — up to 2 hours in length (file size under 2 GB)

| API operation | Supported model types | Input modalities | Output modalities |
|---------------|-----------------------|------------------|-------------------|
| InvokeModel | [Inference profiles](https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-support.html) | Text, Image | Embedding |
| StartAsyncInvoke | [Base models](https://docs.aws.amazon.com/bedrock/latest/userguide/models-supported.html) | Video, Audio, Image, Text | Embedding |
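The table above implies a simple routing rule: text and image inputs can use the synchronous API, while video and audio inputs require the asynchronous one. A minimal sketch of that rule follows. `choose_marengo_api` is a hypothetical helper, and preferring the synchronous API whenever both are available is a design choice made for this sketch, not a requirement of the service.

```python
# Input types accepted by each API, taken from the table above
SYNC_INPUT_TYPES = {"text", "image"}                     # InvokeModel
ASYNC_INPUT_TYPES = {"video", "audio", "image", "text"}  # StartAsyncInvoke


def choose_marengo_api(input_type: str) -> str:
    """Pick an API operation for an input type, preferring the synchronous one."""
    input_type = input_type.lower()
    if input_type in SYNC_INPUT_TYPES:
        return "InvokeModel"
    if input_type in ASYNC_INPUT_TYPES:
        return "StartAsyncInvoke"
    raise ValueError(f"Unsupported input type: {input_type}")


for kind in ["text", "image", "video", "audio"]:
    print(kind, "->", choose_marengo_api(kind))
```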

**Resources:**
- [AWS Docs: TwelveLabs Marengo Embed 2.7](https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-marengo.html)
- [TwelveLabs Docs: Marengo](https://docs.twelvelabs.io/v1.3/docs/concepts/models/marengo)


In [None]:
# Marengo model configuration
MARENGO_MODEL_ID = 'twelvelabs.marengo-embed-2-7-v1:0'

In [None]:
MARENGO_INFERENCE_ID_REGIONS = {
    "us-east-1": "us.twelvelabs.marengo-embed-2-7-v1:0",
    "eu-west-1": "eu.twelvelabs.marengo-embed-2-7-v1:0",
    "ap-northeast-2": "apac.twelvelabs.marengo-embed-2-7-v1:0"
}

In [None]:
try:
    MARENGO_INFERENCE_ID = MARENGO_INFERENCE_ID_REGIONS[AWS_REGION]
    print(MARENGO_INFERENCE_ID)
except KeyError:
    raise ValueError(f"Marengo is not supported for {AWS_REGION}")

##### Creating a text embedding with Marengo InvokeModel API

Marengo Embed 2.7 supports synchronous invocation for text and image inputs using the [InvokeModel API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html) on Amazon Bedrock.

**Text** input for the InvokeModel API can be used with the following model input:
```
model_input = { 
    "inputType": "text",
    "inputText": text_query
}
```

In [None]:
# Wrapper function to create text embedding
def create_text_embedding(text_query: str) -> list:
    """
    Create embeddings for text using Marengo on Bedrock

    Args:
        text_query (str): The text query to create an embedding for
        
    Returns:
        list: A list of embedding data
    """
    
    model_input = { 
        "inputType": "text",
        "inputText": text_query
    }

    response = bedrock_client.invoke_model(
        modelId=MARENGO_INFERENCE_ID,
        body=json.dumps(model_input)
    )
    
    embedding_data = json.loads(response['body'].read().decode('utf-8'))['data']
    
    return embedding_data

In [None]:
# Example: Create text embedding
text_query = "two people having a conversation in a car"

print(f"Creating text embedding for query: '{text_query}'")
text_embedding_data = create_text_embedding(text_query)

print(f"✅ Text embedding created successfully with {len(text_embedding_data)} segment(s) and {len(text_embedding_data[0]['embedding'])} dimensions.")

##### Creating an image embedding with Marengo InvokeModel API

**Image** input for the InvokeModel API can be defined as a Base64-encoded string or as an S3 location.

1. Base64-encoded string
```
    {
        "mediaSource": {
            "base64String": "base64-encoded string"
        }
    }
```
- `base64String` – The Base64-encoded string for the media.

2. S3 location – Specify the S3 URI and the AWS account ID of the bucket owner.
```
    {
        "mediaSource": {
            "s3Location": {
                "uri": "string",
                "bucketOwner": "string"
            }
        }
    }
```
- `uri` – The S3 URI containing the media.
- `bucketOwner` – The AWS account ID of the S3 bucket owner.
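The two `mediaSource` shapes can be produced by one small function. The sketch below is a hypothetical helper, `build_media_source`, that accepts either raw bytes or an S3 URI. It treats `bucketOwner` as mandatory only because the examples above always include it, which is an assumption of this sketch.

```python
import base64
from typing import Optional, Union


def build_media_source(media: Union[bytes, str], bucket_owner: Optional[str] = None) -> dict:
    """Build the value of the `mediaSource` field from raw bytes or an S3 URI."""
    if isinstance(media, bytes):
        # Inline media: Base64-encode the raw bytes
        return {"base64String": base64.b64encode(media).decode("utf-8")}
    if isinstance(media, str) and media.startswith("s3://"):
        if not bucket_owner:
            raise ValueError("bucket_owner is required for S3 locations in this sketch")
        return {"s3Location": {"uri": media, "bucketOwner": bucket_owner}}
    raise TypeError("media must be raw bytes or an s3:// URI")


# Placeholder bytes and a made-up bucket and account ID, for illustration only
inline_source = build_media_source(b"hello")
s3_source = build_media_source("s3://example-bucket/images/image.jpg", "123456789012")

print(inline_source)
print(s3_source)
```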


In [None]:
# Choose image
image_path = "assets/images/image.jpg"

In [None]:
# Wrapper function to create image embedding
def create_image_embedding(image_path: str) -> list:
    """
    Create embeddings for image using Marengo on Bedrock
    
    Args:
        image_path (str): The path to the image to create an embedding for
        
    Returns:
        list: A list of embedding data
    """

    pattern = r'^s3://([^/]+)/(.+)$'
    match = re.match(pattern, image_path)
    if match:
        # image is located on S3
        media_source = {
            "s3Location": {
                "uri": image_path,
                "bucketOwner": aws_account_id
            }
        }
    else:
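        # image is a local file path; read it with a context manager so the file is closed
        with open(image_path, "rb") as image_file:
            image_bytes = image_file.read()
        media_source = {
            "base64String": base64.b64encode(image_bytes).decode('utf-8')
        }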

    model_input = { 
        "inputType": "image",
        "mediaSource": media_source
    }

    response = bedrock_client.invoke_model(
        modelId=MARENGO_INFERENCE_ID,
        body=json.dumps(model_input)
    )
    
    embedding_data = json.loads(response['body'].read().decode('utf-8'))['data']
    
    return embedding_data

In [None]:
# Example: Create image embedding
print(f"Creating embeddings for image at {image_path}")
image_embedding_data = create_image_embedding(image_path)

print(f"✅ Image embedding created successfully with {len(image_embedding_data)} segment(s)")

##### Creating a video embedding with Marengo StartAsyncInvoke API

**Video** and **audio** inputs can be processed by Marengo with the [StartAsyncInvoke API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_StartAsyncInvoke.html). The model outputs will land in the S3 location specified by `outputDataConfig`.

Since the StartAsyncInvoke API asynchronously executes the task, the helper function below triggers the task and waits for it to complete. It then retrieves the outputs from the output S3 location.
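The trigger-then-wait pattern can be sketched independently of Bedrock. The example below polls a status callback until it reports a terminal status, and gives up after a timeout so a stuck task cannot block the notebook forever. `poll_until_terminal`, its parameters, and the default timeout are assumptions made for this sketch. The `"InProgress"` value stands in for any non-terminal status, and the lambda stands in for a call such as `bedrock_client.get_async_invoke(...)['status']`.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"Completed", "Failed", "Expired"}


def poll_until_terminal(
    get_status: Callable[[], str],
    interval_sec: float = 5.0,
    timeout_sec: float = 1800.0,
) -> str:
    """Call get_status() repeatedly until it returns a terminal status or the timeout passes."""
    deadline = time.monotonic() + timeout_sec
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task still '{status}' after {timeout_sec} seconds")
        time.sleep(interval_sec)


# Simulated status sequence in place of a real Bedrock call
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = poll_until_terminal(lambda: next(fake_statuses), interval_sec=0.0)
print(final_status)
```

Checking the status before sleeping means the function returns as soon as the task finishes, without waiting one extra interval.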

In [None]:
# Helper function to wait for async embedding results
def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, invocation_arn: str, verbose: bool = False) -> list:
    """
    Wait for Bedrock async embedding task to complete and retrieve results

    Args:
        s3_bucket (str): The S3 bucket name
        s3_prefix (str): The S3 prefix for the embeddings
        invocation_arn (str): The ARN of the Bedrock async embedding task
        verbose (bool): If True, print the task status on each poll

    Returns:
        list: A list of embedding data
        
    Raises:
        Exception: If the embedding task fails or no output.json is found
    """
    
    # Poll until the task reaches a terminal status
    while True:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        if verbose:
            clear_output(wait=True)
            print(f"Embedding task status: {status}")
        if status in ["Completed", "Failed", "Expired"]:
            break
        time.sleep(5)
    
    if status != "Completed":
        raise Exception(f"Embedding task ended with status: {status}")
    
    # Retrieve the output from S3
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            output_key = obj['Key']
            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
            content = obj['Body'].read().decode('utf-8')
            data = json.loads(content).get("data", [])
            return data
    
    raise Exception("No output.json found in S3 prefix")

In [None]:
# Wrapper function to create and retrieve video embeddings
def create_video_embedding(video_s3_uri: str) -> list:
    """
    Create embeddings for video using Marengo on Bedrock
    
    Args:
        video_s3_uri (str): The S3 URI of the video to create an embedding for
        
    Returns:
        list: A list of embedding data
    """
    
    unique_id = uuid.uuid4()
    s3_output_prefix = f'{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{unique_id}'
    
    response = bedrock_client.start_async_invoke(
        modelId=MARENGO_MODEL_ID,
        modelInput={
            "inputType": "video",
            "mediaSource": {
                "s3Location": {
                    "uri": video_s3_uri,
                    "bucketOwner": aws_account_id
                }
            }
        },
        outputDataConfig={
            "s3OutputDataConfig": {
                "s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'
            }
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Video embedding task started: {invocation_arn}")
    
    # Wait for completion and get results
    try:
        embedding_data = wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)
    except Exception as e:
        # Re-raise so callers never receive None where a list is expected
        print(f"Error waiting for embedding output: {e}")
        raise
    
    return embedding_data


In [None]:
# Example: Create video embeddings
videos = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=S3_VIDEOS_PATH)["Contents"]
video_uri = f"s3://{S3_BUCKET_NAME}/{videos[0]['Key']}"

print(f"Creating embeddings for video: {video_uri}")
video_embedding_data = create_video_embedding(video_uri)

print(f"✅ Video embedding created successfully with {len(video_embedding_data)} segment(s)")

### Part 2c: Creating a vector index in Amazon S3 Vectors

Amazon S3 Vectors is an Amazon Simple Storage Service (S3) feature designed for storing and querying large collections of vector embeddings. It's a purpose-built, durable vector store that integrates vector data directly into the S3 object storage infrastructure, enabling sub-second query performance on vector embeddings. For more information, please refer to the [Amazon S3 Vectors User Guide](https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-vectors.html). 

#### Configure Amazon S3 Vector Index

In [None]:
try:
    s3vectors_client.get_vector_bucket(vectorBucketName=S3_VECTOR_BUCKET_NAME)
    print(f'S3 Vector Bucket {S3_VECTOR_BUCKET_NAME} already exists')
except botocore.exceptions.ClientError as e:
    # bucket does not exist, create it
    s3vectors_client.create_vector_bucket(vectorBucketName=S3_VECTOR_BUCKET_NAME)

try:
    s3vectors_client.get_index(vectorBucketName=S3_VECTOR_BUCKET_NAME, indexName=S3_VECTOR_INDEX_NAME)
    print(f'Amazon S3 Vector Index {S3_VECTOR_INDEX_NAME} already exists')
except botocore.exceptions.ClientError as e:
    # Twelvelabs Marengo embeddings are 1024 dimensions with 32-bit floating point
    s3vectors_client.create_index(vectorBucketName=S3_VECTOR_BUCKET_NAME, 
        indexName=S3_VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=1024,
        distanceMetric='cosine')

print(f'S3 Vector Bucket {S3_VECTOR_BUCKET_NAME} and Index {S3_VECTOR_INDEX_NAME} ready')

#### Bulk process videos in S3 with Marengo

In [None]:
def index_video_embeddings(s3_vector_bucket: str, s3_vector_index: str, video_s3_uri: str, video_embeddings: list) -> int:
    """
    Index video embeddings in S3 Vector Index

    Args:
        s3_vector_bucket (str): The name of the bucket to use
        s3_vector_index (str): The name of the index to use
        video_s3_uri (str): The S3 URI of the source video, stored as metadata
        video_embeddings (list): The list of video embeddings

    Returns:
        int: The number of documents indexed
    """
    embeddings = []
    for ve in video_embeddings:
        embeddings.append({
            # Include the video URI so segments from different videos don't overwrite each other
            "key": f'{video_s3_uri} {ve["embeddingOption"]} {ve["startSec"]} {ve["endSec"]}',
            "data": {"float32": ve["embedding"]},
            "metadata": {
                "embeddingOption": ve["embeddingOption"], 
                "startSec": ve["startSec"], 
                "endSec": ve["endSec"],
                "video_s3_uri": video_s3_uri,
            }
        })

    # Write embeddings into vector index with metadata.
    s3vectors_client.put_vectors(
        vectorBucketName=s3_vector_bucket,   
        indexName=s3_vector_index,   
        vectors=embeddings
    )

    return len(embeddings)
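A long video can yield many segment embeddings, and a single `put_vectors` request may be subject to a per-request limit on the number of vectors. That limit is an assumption here, so check the S3 Vectors documentation for the current quota. The sketch below shows one way to split a list into fixed-size batches. `chunked` and the batch size of 3 are illustrative only. Each batch could then be passed as the `vectors` argument of a separate `put_vectors` call.

```python
from typing import Iterator


def chunked(items: list, batch_size: int) -> Iterator[list]:
    """Yield consecutive slices of `items`, each with at most `batch_size` elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Hypothetical vectors shaped like the entries built in index_video_embeddings
vectors = [{"key": f"segment-{i}", "data": {"float32": [0.0] * 4}} for i in range(7)]

batches = list(chunked(vectors, batch_size=3))
print([len(batch) for batch in batches])  # [3, 3, 1]
```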

In [None]:
# Clear existing video embeddings in S3 bucket
response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=f"{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}")

# Empty video embeddings in S3 bucket
try:
    if 'Contents' in response:
        objects_to_delete = [{'Key': obj['Key']} for obj in response['Contents']]
        s3_client.delete_objects(
            Bucket=S3_BUCKET_NAME,
            Delete={'Objects': objects_to_delete}
        )
        print(f"✅ Removed existing video embeddings successfully.")
    else:
        print(f"✅ No existing video embeddings found.")
except Exception as e:
    print(f"❌ Error emptying video embeddings: {e}")


In [None]:
# Retrieve the list of videos in the s3 bucket and loop through them to create embeddings
videos = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=S3_VIDEOS_PATH)["Contents"]

video_embeddings = []

for video in videos:
    video_uri = f"s3://{S3_BUCKET_NAME}/{video['Key']}"
    print(f"Creating embeddings for video: {video_uri}")
    results = create_video_embedding(video_uri)

    print(f"✅ Video embedding created successfully with {len(results)} segment(s) from {video['Key']}")

    video_embeddings.append({
        "video_s3_uri": video_uri,
        "embeddings_data": results
    })

#### Insert embeddings into S3 Vector Index

In [None]:
for video_embedding in video_embeddings:

    # Use the index_video_embeddings function to index the embedding data into Amazon S3 vectors
    num_indexed = index_video_embeddings(S3_VECTOR_BUCKET_NAME, S3_VECTOR_INDEX_NAME, video_embedding['video_s3_uri'], video_embedding['embeddings_data'])

    print(f"✅ Indexed {num_indexed} segments from {video_embedding['video_s3_uri']}")

### Part 2d: Querying for multimodal video search

In [None]:
# Helper function to play a video at a specific start time
def play_video(video_url: str, start_time: float) -> None:
    """
    Play a video at a specific start time.

    Args:
        video_url (str): The URL of the video to play.
        start_time (float): The start time of the video in seconds.
    """

    # HTML code for the video player
    html_code = f"""
    <video width="640" controls>
        <source src="{video_url}#t={start_time}" type="video/mp4">
    </video>
    """
    display(HTML(html_code))
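The `#t=` suffix used in `play_video` is a temporal media fragment. The same syntax also accepts an end time in the form `#t=start,end`, which can limit playback to the matched segment. Player support for the end time may vary. The sketch below uses a hypothetical helper, `with_time_fragment`, and a made-up URL.

```python
from typing import Optional


def with_time_fragment(video_url: str, start_sec: float, end_sec: Optional[float] = None) -> str:
    """Append a temporal media fragment (#t=start or #t=start,end) to a video URL."""
    if start_sec < 0:
        raise ValueError("start_sec must be non-negative")
    if end_sec is not None and end_sec <= start_sec:
        raise ValueError("end_sec must be greater than start_sec")
    fragment = f"t={start_sec:.3f}"
    if end_sec is not None:
        fragment += f",{end_sec:.3f}"
    return f"{video_url}#{fragment}"


print(with_time_fragment("https://example.com/video.mp4", 12.5))
print(with_time_fragment("https://example.com/video.mp4", 12.5, 18))
```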

#### Query with text

In [None]:
# Text Query Search Function
def search_videos_by_text(query_text: str, top_k: int=5) -> list:
    """
    Search for video segments using text queries

    Args:
        query_text (str): The text query to search for.
        top_k (int): The number of video segments to return.

    Returns:
        list: A list of video segments that match the query.
    """
    # Generate embedding for the text query
    print(f"Generating embedding for query: '{query_text}'")
    query_embedding_data = create_text_embedding(query_text)
    query_embedding = query_embedding_data[0]["embedding"]

    # Search S3 Vector Index
    response = s3vectors_client.query_vectors(
        vectorBucketName=S3_VECTOR_BUCKET_NAME,
        indexName=S3_VECTOR_INDEX_NAME,
        queryVector={"float32": query_embedding}, 
        topK=top_k, 
        returnDistance=True,
        returnMetadata=True
    )
    print(json.dumps(response["vectors"], indent=2))

    print(f"\n✅ Found {len(response['vectors'])} matching segments:")
    results = []
    
    for hit in response['vectors']:
        result = {
            "distance": hit["distance"],
            "video_s3_uri": hit["metadata"]['video_s3_uri'],
            "start_time": hit["metadata"]["startSec"],
            "end_time": hit["metadata"]["endSec"]
        }
        results.append(result)
        
        print(f"  Distance: {result['distance']:.4f} | "
              f"Video : {result['video_s3_uri']} | Time: {result['start_time']:.1f}s - {result['end_time']:.1f}s")
    
    return results

In [None]:
text_query = "a person wearing safety gear and welding with a forest in the background"

In [None]:
# Example text search
text_search_results = search_videos_by_text(text_query, top_k=3)
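The search results report a distance, where a smaller value means a closer match. If a similarity score is easier to read, the distance can be converted. The sketch below assumes the index's cosine distance is defined as 1 minus the cosine similarity, which should be confirmed in the S3 Vectors documentation. `add_similarity` is a hypothetical helper, and the sample results are made up to mirror the shape returned by `search_videos_by_text`.

```python
# Hypothetical results shaped like the output of search_videos_by_text
sample_results = [
    {"distance": 0.42, "video_s3_uri": "s3://example-bucket/videos/a.mp4", "start_time": 12.0, "end_time": 18.0},
    {"distance": 0.35, "video_s3_uri": "s3://example-bucket/videos/a.mp4", "start_time": 30.0, "end_time": 36.0},
    {"distance": 0.58, "video_s3_uri": "s3://example-bucket/videos/b.mp4", "start_time": 0.0, "end_time": 6.0},
]


def add_similarity(results: list) -> list:
    """Sort by ascending distance and add similarity = 1 - distance (assumed definition)."""
    ranked = sorted(results, key=lambda r: r["distance"])
    return [{**r, "similarity": 1.0 - r["distance"]} for r in ranked]


for row in add_similarity(sample_results):
    print(f"{row['similarity']:.2f} | {row['video_s3_uri']} | {row['start_time']:.1f}s - {row['end_time']:.1f}s")
```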

In [None]:
# View top result
top_text_result = text_search_results[0]
video_bucket, video_key = parse_s3_uri(top_text_result["video_s3_uri"])

# Generate presigned URL for the video
presigned_url = s3_client.generate_presigned_url(
    "get_object",
    Params={"Bucket": video_bucket, "Key": video_key},
    ExpiresIn=3600
)

In [None]:
# Set the video stream URL and the start time
video_url = presigned_url
start_time = top_text_result["start_time"]
print(f"\nVideo URL: {video_url}")
print(f"Start time: {start_time}")

# Play the video
play_video(video_url, start_time)
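
The player above seeks to the start of the matching segment by appending a `#t=` temporal fragment to the video URL. The same fragment syntax also accepts an end time (`#t=start,end`). The sketch below builds such a URL; the helper name `build_segment_url` and the example URL are hypothetical, and how strictly a browser honors the end time may vary.

```python
from typing import Optional


def build_segment_url(video_url: str, start_time: float, end_time: Optional[float] = None) -> str:
    """Append a temporal media fragment (#t=start[,end]) to a video URL."""
    if start_time < 0:
        raise ValueError("start_time must be non-negative")
    fragment = f"#t={start_time}"
    if end_time is not None:
        if end_time <= start_time:
            raise ValueError("end_time must be greater than start_time")
        fragment += f",{end_time}"
    return video_url + fragment


# Hypothetical URL; in the notebook this would be the presigned URL
segment_url = build_segment_url("https://example.com/video.mp4", 12.5, 18.0)
print(segment_url)
```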

#### Query with image

In [None]:
# Image Query Search Function
def search_videos_by_image(image_path: str, top_k: int=5) -> list:
    """
    Search for video segments that are similar to the given image.

    Args:
        image_path (str): The path to the image to search for.
        top_k (int): The maximum number of video segments to return.

    Returns:
        list: A list of video segments that match the query.
    """
    # Generate embedding for the image
    print(f"Generating embedding for query: '{image_path}'")
    query_embedding_data = create_image_embedding(image_path)
    query_embedding = query_embedding_data[0]["embedding"]

    # Search S3 Vector Index
    response = s3vectors_client.query_vectors(
        vectorBucketName=S3_VECTOR_BUCKET_NAME,
        indexName=S3_VECTOR_INDEX_NAME,
        queryVector={"float32": query_embedding}, 
        topK=top_k, 
        returnDistance=True,
        returnMetadata=True
    )
    print(json.dumps(response["vectors"], indent=2))

    print(f"\n✅ Found {len(response['vectors'])} matching segments:")
    results = []
    
    for hit in response['vectors']:
        result = {
            "distance": hit["distance"],
            "video_s3_uri": hit["metadata"]['video_s3_uri'],
            "start_time": hit["metadata"]["startSec"],
            "end_time": hit["metadata"]["endSec"]
        }
        results.append(result)
        
        # The value is a distance, so lower means a closer match
        print(f"  Distance: {result['distance']:.4f} | "
              f"Video : {result['video_s3_uri']} | Time: {result['start_time']:.1f}s - {result['end_time']:.1f}s")

    return results

In [None]:
image_query = "assets/images/image.jpg"

display(Image(filename=image_query, width=200))

In [None]:
# Example image search
image_search_results = search_videos_by_image(image_path=image_query, top_k=3)

In [None]:
# View top result
top_image_result = image_search_results[0]
video_bucket, video_key = parse_s3_uri(top_image_result["video_s3_uri"])

# Generate presigned URL for the video
presigned_url = s3_client.generate_presigned_url(
    "get_object",
    Params={"Bucket": video_bucket, "Key": video_key},
    ExpiresIn=3600
)

In [None]:
# Set the video stream URL and the start time
video_url = presigned_url
start_time = top_image_result["start_time"]
print(f"\nVideo URL: {video_url}")
print(f"Start time: {start_time}")

# Play the video
play_video(video_url, start_time)

---

## Part 3: Using Pegasus on Bedrock

### Bedrock model access

Follow the [Bedrock model access documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access-modify.html) to enable access to TwelveLabs models on Bedrock. Make sure to enable access in the same region where you are running this workshop.

In [None]:
PEGASUS_MODEL_ID_REGIONS = {
    "us-east-1": "us.twelvelabs.pegasus-1-2-v1:0",
    "us-west-2": "us.twelvelabs.pegasus-1-2-v1:0",
    "eu-west-1": "eu.twelvelabs.pegasus-1-2-v1:0",
    "ap-northeast-2": "apac.twelvelabs.pegasus-1-2-v1:0"
}

In [None]:
try:
    PEGASUS_MODEL_ID = PEGASUS_MODEL_ID_REGIONS[AWS_REGION]
    print(PEGASUS_MODEL_ID)
except KeyError:
    raise ValueError(f"Pegasus 1.2 is not supported for {AWS_REGION}")
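
Each ID in the mapping above is the base model ID with a geography prefix (`us`, `eu`, or `apac`). As a minimal sketch, the lookup can be written as a helper that also lists the supported regions when it fails. The names `REGION_PREFIXES` and `resolve_pegasus_model_id` are hypothetical, and the region list simply mirrors the mapping in this notebook, so it may not reflect current availability.

```python
PEGASUS_BASE_MODEL_ID = "twelvelabs.pegasus-1-2-v1:0"

# Region-to-prefix table copied from the mapping in this notebook
REGION_PREFIXES = {
    "us-east-1": "us",
    "us-west-2": "us",
    "eu-west-1": "eu",
    "ap-northeast-2": "apac",
}


def resolve_pegasus_model_id(region: str) -> str:
    """Return the prefixed Pegasus model ID for a region, or raise ValueError."""
    try:
        prefix = REGION_PREFIXES[region]
    except KeyError:
        supported = ", ".join(sorted(REGION_PREFIXES))
        raise ValueError(
            f"Pegasus 1.2 is not supported for {region}. Supported regions: {supported}"
        ) from None
    return f"{prefix}.{PEGASUS_BASE_MODEL_ID}"


print(resolve_pegasus_model_id("eu-west-1"))
```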

### Select the video to analyze

In [None]:
s3_response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=S3_VIDEOS_PATH)

# List all object keys
if 'Contents' in s3_response:
    object_keys = [obj['Key'] for obj in s3_response['Contents']]
    for key in object_keys:
        print(key)
    print(f"\nTotal objects found: {len(object_keys)}")
else:
    print("No objects found in the specified bucket and prefix.")

In [None]:
video_s3_key = "videos/Sparks_4096x2160_5994fps_SDR.mp4" # TODO: Replace with your video S3 key

# Validate video S3 key
if video_s3_key == "<YOUR_VIDEO_S3_KEY>" or video_s3_key == "":
    raise ValueError("Please replace <YOUR_VIDEO_S3_KEY> with your video S3 key")

### View the video

In [None]:
# Generate presigned URL for the video
presigned_url = s3_client.generate_presigned_url(
    "get_object",
    Params={"Bucket": S3_BUCKET_NAME, "Key": video_s3_key},
    ExpiresIn=3600
)

# Play the video
play_video(presigned_url, 0)

### Part 3a: Analyze with Pegasus on Bedrock

#### TwelveLabs Pegasus

Pegasus is a generative model for video-to-text generation. Pegasus analyzes multiple modalities to generate contextually relevant text based on the content of your videos.

***Key features***
- **Video-to-text generation**: Creates detailed textual descriptions based on video content
- **Extended processing capacity**: Processes videos up to 1 hour in length
- **Granular visual comprehension**: Analyzes objects, on-screen text, and numerical content
- **Temporal grounding**: Accurately identifies timestamps of specific events
- **Multimodal understanding**: Combines visual, audio, and textual information for comprehensive analysis

***Use cases***
- **Content summarization**: Generate concise summaries of video content
- **Detailed descriptions**: Create comprehensive textual descriptions of visual scenes
- **Timestamp identification**: Answer questions about when specific events occur in videos
- **Content analysis**: Extract key information from video content for further processing


#### Pegasus 1.2 on Bedrock

The TwelveLabs Pegasus 1.2 model provides comprehensive video understanding and analysis capabilities. It can analyze video content and generate textual descriptions, insights, and answers to questions about the video.

Use this information to make inference calls to TwelveLabs models with the [InvokeModel](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html) or [InvokeModelWithResponseStream](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModelWithResponseStream.html) (streaming) operations.

- Provider — TwelveLabs
- Categories — Video understanding, content analysis
- Model ID — `twelvelabs.pegasus-1-2-v1:0`
- Input modality — Video
- Output modality — Text
- Max video size — up to 1 hour in length (file size under 2 GB)

**Resources:**
- [AWS Docs: TwelveLabs Pegasus 1.2](https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-pegasus.html)
- [TwelveLabs Docs: Pegasus](https://docs.twelvelabs.io/v1.3/docs/concepts/models/pegasus)
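
The size and duration limits listed above can be checked before a video is sent to the model. The sketch below is one way to do that. The helper name `check_video_limits` is hypothetical, and interpreting "2 GB" as 2 × 1024³ bytes is an assumption, so confirm the exact limits in the AWS documentation.

```python
from typing import List

MAX_VIDEO_SECONDS = 60 * 60      # 1 hour, as listed above
MAX_VIDEO_BYTES = 2 * 1024 ** 3  # Assumption: "2 GB" read as 2 GiB


def check_video_limits(size_bytes: int, duration_seconds: float) -> List[str]:
    """Return a list of limit violations; an empty list means the video fits."""
    problems = []
    if size_bytes >= MAX_VIDEO_BYTES:
        problems.append(f"File size {size_bytes} bytes is not under {MAX_VIDEO_BYTES} bytes")
    if duration_seconds > MAX_VIDEO_SECONDS:
        problems.append(f"Duration {duration_seconds}s exceeds {MAX_VIDEO_SECONDS}s")
    return problems


# Hypothetical values for a 500 MB, two-minute clip
print(check_video_limits(size_bytes=500_000_000, duration_seconds=120.0))
```

For objects already in S3, the size could be read from the `ContentLength` field returned by `head_object`; the duration has to come from the video file itself.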

In [None]:
prompt = "What is the video about?"

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {
        "s3Location": {
            "uri": f"s3://{S3_BUCKET_NAME}/{video_s3_key}",
            "bucketOwner": aws_account_id
        }
    },
    "temperature": 0
}

response = bedrock_client.invoke_model(
    modelId=PEGASUS_MODEL_ID,
    body=json.dumps(request_body),
    contentType="application/json",
    accept="application/json"
)

# Parse the response and print the model outputs
response_body = json.loads(response.get("body").read())
print(response_body["message"])
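
The same request body shape is reused in the remaining cells of this notebook. As a minimal sketch, it can be built by a small helper; the name `build_pegasus_request` and the example bucket, key, and account ID are hypothetical, while the field names are the ones used in the cell above.

```python
import json
from typing import Optional


def build_pegasus_request(
    prompt: str,
    bucket: str,
    key: str,
    bucket_owner: str,
    temperature: float = 0,
    response_format: Optional[dict] = None,
) -> dict:
    """Build a Pegasus request body that points at a video stored in S3."""
    body = {
        "inputPrompt": prompt,
        "mediaSource": {
            "s3Location": {
                "uri": f"s3://{bucket}/{key}",
                "bucketOwner": bucket_owner,
            }
        },
        "temperature": temperature,
    }
    # Only include responseFormat when a schema is supplied
    if response_format is not None:
        body["responseFormat"] = response_format
    return body


example_body = build_pegasus_request(
    "What is the video about?", "example-bucket", "videos/example.mp4", "123456789012"
)
print(json.dumps(example_body, indent=2))
```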

### Part 3b: Pegasus streaming response

In [None]:
prompt = "What is the video about?"

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {
        "s3Location": {
            "uri": f"s3://{S3_BUCKET_NAME}/{video_s3_key}",
            "bucketOwner": aws_account_id
        }
    },
    "temperature": 0
}

streaming_response = bedrock_client.invoke_model_with_response_stream(
    modelId=PEGASUS_MODEL_ID,
    body=json.dumps(request_body),
    contentType="application/json",
    accept="application/json"
)

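# Extract and print the response text in real time, accumulating the full message.
message = ""
for event in streaming_response["body"]:
    chunk = json.loads(event["chunk"]["bytes"])
    text = chunk.get("message", "")
    message += text
    print(text, end="")

print()  # End the streamed line; the complete text is kept in `message`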
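
The chunk-handling loop can be wrapped in a helper that prints text as it arrives and also returns the complete response. The sketch below assumes each event has the `chunk` / `bytes` shape used in the cell above; the helper name `collect_stream_text` and the fake events are hypothetical and stand in for `streaming_response["body"]`.

```python
import json
from typing import Iterable


def collect_stream_text(events: Iterable[dict]) -> str:
    """Print streamed text chunks as they arrive and return the full message."""
    parts = []
    for event in events:
        chunk = json.loads(event["chunk"]["bytes"])
        text = chunk.get("message", "")  # Tolerate chunks without text
        print(text, end="", flush=True)
        parts.append(text)
    print()  # Finish the streamed line
    return "".join(parts)


# Fake events that imitate the shape of the streaming response body
fake_events = [
    {"chunk": {"bytes": json.dumps({"message": "Sparks "}).encode("utf-8")}},
    {"chunk": {"bytes": json.dumps({"message": "fly from a welder."}).encode("utf-8")}},
]
full_text = collect_stream_text(fake_events)
```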

## Part 4: Video Analysis with Pegasus

Pegasus analyzes videos to generate text based on their content using a multimodal approach. This method analyzes the visuals, sounds, spoken words, and relationships between them. As a result, it provides a comprehensive understanding of your videos, capturing nuances that might be overlooked when using a unimodal interpretation.

The platform generates the following types of text:
- **Topics and hashtags:** Represent a swift breakdown of the essence of a video.
- **Summaries:** Encapsulate the key points of a video, presenting the most important information clearly and concisely.
- **Highlights:** List the key events in order. Unlike chapters, they spotlight primary topics.
- **Chapters:** A chapter in a video typically focuses on a particular topic or theme. The platform chronologically lists all the chapters in your video for a thorough content breakdown.
- **Open-ended text (your own prompt):** Custom outputs based on your prompts, including, but not limited to, tables of content, action items, memos, reports, marketing copy, and comprehensive analyses.

### Part 4a: Summaries, hashtags, and highlights

In [None]:
# Generate a summary of the video
prompt = "Summarize the video"

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {
        "s3Location": {
            "uri": f"s3://{S3_BUCKET_NAME}/{video_s3_key}",
            "bucketOwner": aws_account_id
        }
    },
    "temperature": 0
}

streaming_response = bedrock_client.invoke_model_with_response_stream(
    modelId=PEGASUS_MODEL_ID,
    body=json.dumps(request_body),
    contentType="application/json",
    accept="application/json"
)

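# Extract and print the response text in real time, accumulating the full message.
message = ""
for event in streaming_response["body"]:
    chunk = json.loads(event["chunk"]["bytes"])
    text = chunk.get("message", "")
    message += text
    print(text, end="")

print()  # End the streamed line; the complete text is kept in `message`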

In [None]:
# Generate relevant hashtags for the video
prompt = "Generate hashtags for the video"

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {
        "s3Location": {
            "uri": f"s3://{S3_BUCKET_NAME}/{video_s3_key}",
            "bucketOwner": aws_account_id
        }
    },
    "temperature": 0
}

streaming_response = bedrock_client.invoke_model_with_response_stream(
    modelId=PEGASUS_MODEL_ID,
    body=json.dumps(request_body),
    contentType="application/json",
    accept="application/json"
)

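# Extract and print the response text in real time, accumulating the full message.
message = ""
for event in streaming_response["body"]:
    chunk = json.loads(event["chunk"]["bytes"])
    text = chunk.get("message", "")
    message += text
    print(text, end="")

print()  # End the streamed line; the complete text is kept in `message`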

In [None]:
# Generate highlights of the video
prompt = "What are the highlighted moments of this video?"

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {
        "s3Location": {
            "uri": f"s3://{S3_BUCKET_NAME}/{video_s3_key}",
            "bucketOwner": aws_account_id
        }
    },
    "temperature": 0
}

streaming_response = bedrock_client.invoke_model_with_response_stream(
    modelId=PEGASUS_MODEL_ID,
    body=json.dumps(request_body),
    contentType="application/json",
    accept="application/json"
)

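# Extract and print the response text in real time, accumulating the full message.
message = ""
for event in streaming_response["body"]:
    chunk = json.loads(event["chunk"]["bytes"])
    text = chunk.get("message", "")
    message += text
    print(text, end="")

print()  # End the streamed line; the complete text is kept in `message`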
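
The three cells above differ only in their prompt, so the prompts can be kept in a table and run in a loop. The following sketch takes the model call as a parameter so that it runs without AWS access. The names `ANALYSIS_PROMPTS`, `run_analysis`, and `fake_invoke` are hypothetical; in the notebook, the callable would wrap the Bedrock request shown above.

```python
from typing import Callable, Dict

# Prompts taken from the cells above
ANALYSIS_PROMPTS = {
    "summary": "Summarize the video",
    "hashtags": "Generate hashtags for the video",
    "highlights": "What are the highlighted moments of this video?",
}


def run_analysis(prompts: Dict[str, str], invoke: Callable[[str], str]) -> Dict[str, str]:
    """Call invoke(prompt) for each named prompt and collect the outputs by name."""
    return {name: invoke(prompt) for name, prompt in prompts.items()}


def fake_invoke(prompt: str) -> str:
    """Stand-in for a real model call, used only for illustration."""
    return f"[model output for: {prompt}]"


analysis = run_analysis(ANALYSIS_PROMPTS, fake_invoke)
for name, text in analysis.items():
    print(f"{name}: {text}")
```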

### Part 4b: Structured outputs

Structured outputs for Pegasus let you specify the output format as a JSON schema. They are useful for building automated integrations with application workflows, such as metadata enrichment.

In [None]:
# Using JSON Schema to generate structured output
prompt = """
Generate metadata for the video with the following fields:
- title: (string) The title of the video
- description: (string) The description of the video
- mood: (string) The mood of the video
- genre: (string) The genre of the video
"""

request_body = {
    "inputPrompt": prompt,
    "mediaSource": {
        "s3Location": {
            "uri": f"s3://{S3_BUCKET_NAME}/{video_s3_key}",
            "bucketOwner": aws_account_id
        }
    },
    "temperature": 0,
    "responseFormat": {
        "jsonSchema": {
            "type": "object",
            "properties": {
                "title": {
                    "type": "string"
                },
                "description": {
                    "type": "string"
                },
                "mood": {
                    "type": "string"
                },
                "genre": {
                    "type": "string"
                }
            },
            "required": ["title", "description", "mood", "genre"]
        }
    }
}

response = bedrock_client.invoke_model(
    modelId=PEGASUS_MODEL_ID,
    body=json.dumps(request_body),
    contentType="application/json",
    accept="application/json"
)

# Parse the response and print the model outputs
response_body = json.loads(response.get("body").read())
message_data = json.loads(response_body["message"])
print(json.dumps(message_data, indent=4))
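
Before passing structured output to a downstream workflow, it can be worth checking that the parsed message actually contains the expected fields. The sketch below does this with the standard library only. The helper name `parse_metadata` and the sample message are hypothetical; the required field names come from the schema above.

```python
import json

REQUIRED_FIELDS = ("title", "description", "mood", "genre")


def parse_metadata(message: str, required=REQUIRED_FIELDS) -> dict:
    """Parse a JSON message and verify that each required field is a string."""
    data = json.loads(message)
    if not isinstance(data, dict):
        raise ValueError("Expected a JSON object")
    missing = [field for field in required if field not in data]
    wrong_type = [field for field in required if field in data and not isinstance(data[field], str)]
    if missing or wrong_type:
        raise ValueError(f"Missing fields: {missing}; non-string fields: {wrong_type}")
    return data


# Hypothetical message in the shape requested by the schema
sample_message = json.dumps({
    "title": "Welding at dusk",
    "description": "A welder works on a metal frame outdoors.",
    "mood": "focused",
    "genre": "documentary",
})
metadata = parse_metadata(sample_message)
print(metadata["title"])
```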

---
## Cleanup


#### Delete S3 Vector Bucket and Index

In [None]:
response = s3vectors_client.list_indexes(
    vectorBucketName=S3_VECTOR_BUCKET_NAME
)

if 'indexes' in response:
    for index in response['indexes']:
        s3vectors_client.delete_index(
            vectorBucketName=S3_VECTOR_BUCKET_NAME,
            indexName=index['indexName']
        )

s3vectors_client.delete_vector_bucket(vectorBucketName=S3_VECTOR_BUCKET_NAME)

print(f"S3 Vector Bucket {S3_VECTOR_BUCKET_NAME} removed successfully.")

#### Empty S3 bucket

In [None]:
# List objects and prepare for deletion
# Note: list_objects_v2 returns at most 1,000 objects per call; larger buckets require pagination
response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME)

# Empty S3 bucket
try:
    if 'Contents' in response:
        objects_to_delete = [{'Key': obj['Key']} for obj in response['Contents']]
        s3_client.delete_objects(
            Bucket=S3_BUCKET_NAME,
            Delete={'Objects': objects_to_delete}
        )
        print(f"✅ Bucket '{S3_BUCKET_NAME}' emptied successfully.")
    else:
        print(f"✅ Bucket '{S3_BUCKET_NAME}' is already empty.")
except Exception as e:
    print(f"❌ Error emptying bucket '{S3_BUCKET_NAME}': {e}")
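
A single `list_objects_v2` call returns at most 1,000 keys, and a single `delete_objects` request accepts at most 1,000 keys, so the cell above only fully empties buckets below that size. The sketch below shows the batching half of a more general approach; the helper name `delete_batches` and the sample keys are hypothetical.

```python
from typing import Iterable, Iterator


def delete_batches(keys: Iterable[str], batch_size: int = 1000) -> Iterator[dict]:
    """Yield Delete payloads containing at most batch_size keys each."""
    batch = []
    for key in keys:
        batch.append({"Key": key})
        if len(batch) == batch_size:
            yield {"Objects": batch}
            batch = []
    if batch:  # Emit the final partial batch
        yield {"Objects": batch}


# Hypothetical keys standing in for a large bucket listing
sample_keys = [f"videos/clip_{i}.mp4" for i in range(2500)]
batches = list(delete_batches(sample_keys))
print([len(b["Objects"]) for b in batches])
```

With boto3, the keys could come from the pages of `s3_client.get_paginator("list_objects_v2")`, and each payload could be passed as the `Delete` argument of `delete_objects`.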