# Video search - start embedding jobs

Nova Multimodal Embedding (MME) enables intelligent video search by representing video, text, and images within a unified semantic space. This allows users to search videos using natural language prompts or visual examples, and retrieve relevant moments with high accuracy.
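
To make the idea of a unified semantic space more concrete, the sketch below ranks a few clip embeddings against a text-query embedding by cosine similarity. The vectors are made-up three-dimensional toy values, and the names `cosine_similarity` and `rank_clips` are hypothetical. Real Nova MME embeddings have far more dimensions, and the tool's search function may score results differently.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

def rank_clips(query_vec, clip_vecs):
    """Return clip names sorted from most to least similar to the query."""
    scored = [(cosine_similarity(query_vec, vec), name) for name, vec in clip_vecs.items()]
    return [name for _, name in sorted(scored, reverse=True)]

# Toy embeddings (assumed values, not produced by any model)
toy_clips = {
    "burger_closeup": [0.9, 0.1, 0.0],
    "rainy_street": [0.0, 0.2, 0.9],
    "family_picnic": [0.6, 0.6, 0.1],
}
toy_query = [1.0, 0.2, 0.0]  # stands in for the embedding of a text prompt
print(rank_clips(toy_query, toy_clips))
```

Because text, image, and video embeddings share one space, the same ranking step applies whether the query vector comes from a text prompt or from an example image.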

![nova-mme](./statics/nova-mme-diagram.png)

You can upload videos either directly through the Video Understanding Tool UI or via the programming interface.

In this sample, we'll experiment with Nova Multimodal Embedding for video search. Instead of uploading videos through the UI, we'll use this notebook to upload multiple videos, which will then be available in the tool's UI for review.

You can also find sample notebooks in the same folder that demonstrate how to start video tasks for other workflows, such as frame-based and clip-based processing.

In this sample notebook, we will:
1. Download videos from public URLs
2. Upload them to the S3 data bucket
3. Start Nova MME embedding tasks for each video
4. Monitor task progress with heartbeat checking
5. Search the embeddings via the tool's search function
6. Clean up

In [None]:
import boto3
import json
import uuid
import requests
import os
import time
from urllib.parse import urlparse
from pathlib import Path

## Setup AWS Clients and Configuration

Initialize AWS clients and get account information. The Video Understanding Tool stores data in an S3 bucket following the naming convention: `bedrock-mm-{account_id}-{region}`.
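
As a quick illustration of that convention, the sketch below builds the bucket name and an upload key from plain strings. The account ID, region, and task ID are placeholder values, and the key layout `tasks/{task_id}/upload/{file_name}` mirrors the `S3_KEY_TASK_TEMPLATE` constant defined in this notebook. The helper names are hypothetical.

```python
def data_bucket_name(account_id: str, region: str) -> str:
    """Compose the data bucket name from the account ID and region."""
    return f"bedrock-mm-{account_id}-{region}"

def upload_key(task_id: str, file_name: str) -> str:
    """Compose the S3 key under which a task's uploaded video is stored."""
    return f"tasks/{task_id}/upload/{file_name}"

# Placeholder values for illustration only
example_bucket = data_bucket_name("123456789012", "us-east-1")
example_key = upload_key("example-task-id", "demo.mp4")
print(f"s3://{example_bucket}/{example_key}")
```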

In [None]:
# Initialize AWS clients
session = boto3.session.Session()
sts = session.client("sts")
lambda_client = boto3.client("lambda")
s3_client = boto3.client("s3")

# Get account and region information
account_id = sts.get_caller_identity()["Account"]
region = session.region_name

print(f"Account ID: {account_id}")
print(f"Region: {region}")

In [None]:
# Configuration constants
S3_BUCKET = f"bedrock-mm-{account_id}-{region}"
S3_KEY_TASK_TEMPLATE = "tasks/{task_id}/upload/{file_name}"
LAMBDA_FUN_NAME_NOVA_MME_START_TASK = "bedrock-mm-nova-srv-start-task"
LAMBDA_FUN_NAME_NOVA_MME_GET_TASK = "bedrock-mm-nova-srv-get-video-task"
LAMBDA_FUN_NAME_NOVA_MME_SEARCH_VECTOR = "bedrock-mm-nova-srv-search-vector"
LAMBDA_FUN_NAME_NOVA_MME_DELETE_TASK = "bedrock-mm-nova-srv-delete-video-task"
NOVA_MME_MODEL_ID = "amazon.nova-2-multimodal-embeddings-v1:0"

In [None]:
# Create local download directory
DOWNLOAD_DIR = "./downloaded_videos"
Path(DOWNLOAD_DIR).mkdir(exist_ok=True)

## Define Public Video URLs

Add your public video URLs here. These should be direct links to video files (mp4, mov, avi, etc.) that are publicly accessible.
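
Before adding a URL to the list, it can help to check that it looks like a direct link to a video file rather than a web page. The following is a minimal sketch, assuming a small set of extensions. The function name `looks_like_direct_video_url` and the `VIDEO_EXTENSIONS` set are hypothetical, and the check inspects only the URL, not the content served.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse

# Assumed subset of extensions; adjust to the formats you actually use
VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi"}

def looks_like_direct_video_url(url: str) -> bool:
    """Return True if the URL is http(s) and its path ends in a video extension."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        return False
    # Only the path is inspected, so query strings do not affect the result
    return PurePosixPath(parsed.path).suffix.lower() in VIDEO_EXTENSIONS

print(looks_like_direct_video_url("https://example.com/videos/demo.mp4?token=abc"))
print(looks_like_direct_video_url("https://example.com/watch?v=abc"))
```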

In [None]:
# List of public video URLs to process
video_urls = [
    {
        "url": "https://ws-assets-prod-iad-r-iad-ed304a55c2ca1aee.s3.us-east-1.amazonaws.com/8082573f-f39e-4e39-a48f-f3562cc6e597/aws-ads-magical-day-out.mp4",
        "name": "aws-ads-magical-day-out.mp4",
        "description": "Sample video - AWS Ads Magical Day Out"
    },
    {
        "url": "https://ws-assets-prod-iad-r-iad-ed304a55c2ca1aee.s3.us-east-1.amazonaws.com/8082573f-f39e-4e39-a48f-f3562cc6e597/aws-ads-rainy-day.mp4",
        "name": "aws-ads-rainy-day.mp4",
        "description": "Sample video - AWS Ads Rainy Day Out"
    },
    {
        "url": "https://ws-assets-prod-iad-r-iad-ed304a55c2ca1aee.s3.us-east-1.amazonaws.com/8082573f-f39e-4e39-a48f-f3562cc6e597/aws-ads-transform-genai.mp4",
        "name": "aws-ads-transform-genai.mp4",
        "description": "Sample video - AWS Ads Transform GenAI"
    },
    {
        "url": "https://ws-assets-prod-iad-r-iad-ed304a55c2ca1aee.s3.us-east-1.amazonaws.com/8082573f-f39e-4e39-a48f-f3562cc6e597/nova-reel-burger.mp4",
        "name": "nova-reel-burger.mp4",
        "description": "Sample video - Nova Reel generated burger Ads"
    },
    # Add more video URLs as needed
]

print(f"Found {len(video_urls)} videos to process")
for i, video in enumerate(video_urls, 1):
    print(f"{i}. {video['name']} - {video['description']}")

## Download Videos from Public URLs

Download each video from the public URLs to local storage before uploading to S3.
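
If a download is interrupted, a partially written file could later be uploaded as if it were complete. One way to guard against this, sketched below, is to write the chunks to a temporary file and rename it only after all chunks have arrived. The helper `write_chunks_atomically` is hypothetical and not part of the tool. It accepts any iterable of byte chunks, such as `response.iter_content(chunk_size=8192)`.

```python
import os
import tempfile

def write_chunks_atomically(chunks, dest_path):
    """Write byte chunks to a temp file, then rename it to dest_path.

    Returns the number of bytes written. If anything fails, the temp
    file is removed and dest_path is left untouched.
    """
    dest_dir = os.path.dirname(os.path.abspath(dest_path))
    os.makedirs(dest_dir, exist_ok=True)
    # Create the temp file in the destination directory so the rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir, suffix=".part")
    try:
        total = 0
        with os.fdopen(fd, "wb") as f:
            for chunk in chunks:
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)
                    total += len(chunk)
        os.replace(tmp_path, dest_path)
        return total
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

In the download loop, the inner `with open(...)` block could be swapped for `write_chunks_atomically(response.iter_content(chunk_size=8192), local_path)`.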

In [None]:
def download_videos(video_list, download_dir="/tmp"):
    """
    Download a list of videos from their URLs and add a 'local_path' key to each entry.

    Args:
        video_list (list): List of dicts, each containing 'url' and 'name'.
        download_dir (str): Local directory to save videos (default: /tmp).

    Returns:
        list: Updated list with 'local_path' added to each entry.
    """
    os.makedirs(download_dir, exist_ok=True)
    
    for video in video_list:
        url = video.get("url")
        filename = video.get("name")
        local_path = os.path.join(download_dir, filename)

        print(f"Downloading {filename}...")
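        # Stream the body in chunks; the timeout keeps a stalled connection from hanging the notebook
        response = requests.get(url, stream=True, timeout=60)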
        response.raise_for_status()

        with open(local_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)

        video["local_path"] = local_path
        print(f"Saved to {local_path}")

    return video_list

downloaded_videos = download_videos(video_urls, DOWNLOAD_DIR)
print(json.dumps(downloaded_videos, indent=2))

## Upload Videos to S3 and Start Embedding Tasks

Upload each downloaded video to S3 and start a Nova MME embedding task for processing. Each task will be monitored with heartbeat checking until completion.
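
The heartbeat pattern itself is independent of AWS and can be sketched as a small polling loop. In the version below, `poll_until_done` is a hypothetical helper that takes the status lookup as a function argument, which makes it easy to try out with fake statuses. The terminal status strings (`completed`, `failed`, `error`) are the ones this notebook checks for, and the `sleep` and `clock` parameters are added purely for testability.

```python
import time

def poll_until_done(get_status, check_interval=5, max_wait_time=1800,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it reports a terminal state or time runs out.

    Returns 'completed', 'failed', 'error', or 'timeout'.
    """
    deadline = clock() + max_wait_time
    while clock() < deadline:
        status = get_status()
        if status == "completed":
            return "completed"
        if status in ("failed", "error"):
            return status
        # Not finished yet: wait before the next heartbeat check
        sleep(check_interval)
    return "timeout"

# Simulated task that finishes on the third check; no real waiting occurs
fake_statuses = iter(["processing", "processing", "completed"])
result = poll_until_done(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(result)
```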

In [None]:
def get_task_status(task_id):
    """Get the current status of a task"""
    try:
        request = {"TaskId": task_id}
        response = lambda_client.invoke(
            FunctionName=LAMBDA_FUN_NAME_NOVA_MME_GET_TASK,
            InvocationType="RequestResponse",
            Payload=json.dumps(request),
        )
        
        response_payload = json.loads(response["Payload"].read())
        if response.get('StatusCode') == 200:
            # Parse the response body if it's a string
            if isinstance(response_payload.get('body'), str):
                body = json.loads(response_payload['body'])
            else:
                body = response_payload.get('body', response_payload)
            
            return body.get('Status')
        else:
            print(f"❌ Failed to get task status: {response_payload}")
            return 'error'
            
    except Exception as e:
        print(f"❌ Error getting task status for {task_id}: {str(e)}")
        return 'error'

def wait_for_task_completion(task_id, file_name, check_interval=5, max_wait_time=1800):
    """Wait for task to complete with heartbeat monitoring"""
    print(f"⏳ Monitoring task {task_id} for {file_name}...")
    start_time = time.time()
    
    while (time.time() - start_time) < max_wait_time:
        status = get_task_status(task_id)
        print(f"  📋 Status: {status}")
        
        if status == 'completed':
            print(f"✅ Task {task_id} completed successfully!")
            return True
        elif status in ['failed', 'error']:
            print(f"❌ Task {task_id} failed with status: {status}")
            return False
        
        # Wait before next check
        time.sleep(check_interval)
    
    print(f"⚠️ Task {task_id} timed out after {max_wait_time} seconds")
    return False

In [None]:
def upload_and_start_task(video_info):
    """Upload video to S3 and start Nova MME embedding task"""
    try:
        # Generate unique task ID
        task_id = str(uuid.uuid4())
        file_name = video_info['name']
        local_path = video_info['local_path']
        
        # Construct S3 key
        s3_key = S3_KEY_TASK_TEMPLATE.format(task_id=task_id, file_name=file_name)
        
        print(f"📤 Uploading {file_name} to S3...")
        s3_client.upload_file(local_path, S3_BUCKET, s3_key)
        print(f"✓ Uploaded to s3://{S3_BUCKET}/{s3_key}")
        
        # Prepare Nova MME task request
        request = {
            "ModelId": NOVA_MME_MODEL_ID,
            "EmbedMode": "AUDIO_VIDEO_COMBINED",
            "DurationS": 5,
            "TaskId": task_id,
            "FileName": file_name,
            "TaskName": video_info.get('description',file_name),
            "Video": {
                "S3Object": {
                    "Bucket": S3_BUCKET,
                    "Key": s3_key
                }
            },
            "RequestBy": "workshop_user"
        }
        
        print(f"🚀 Starting Nova MME embedding task for {file_name}...")
        response = lambda_client.invoke(
            FunctionName=LAMBDA_FUN_NAME_NOVA_MME_START_TASK,
            InvocationType="RequestResponse",
            Payload=json.dumps(request),
        )
        
        response_payload = json.loads(response["Payload"].read())
        if response.get('StatusCode') == 200:
            if "body" not in response_payload:
                print(response_payload)
            print(f"✓ Successfully started task {task_id} for {file_name}")
            
            # Wait for task completion with heartbeat monitoring
            task_completed = wait_for_task_completion(task_id, file_name)
            
            return {
                'task_id': task_id,
                'file_name': file_name,
                's3_key': s3_key,
                'status': 'completed' if task_completed else 'failed',
                'response': response_payload,
                **video_info
            }
        else:
            print(f"❌ Failed to start task for {file_name}: {response_payload}")
            return None
        
    except Exception as e:
        print(f"❌ Error processing {video_info['name']}: {str(e)}")
        return None

# Process all downloaded videos
processing_tasks = []
for video_info in downloaded_videos:
    task_info = upload_and_start_task(video_info)
    if task_info:
        processing_tasks.append(task_info)
    print()

print(f"Processed {len(processing_tasks)} embedding tasks")
successful_tasks = [t for t in processing_tasks if t['status'] == 'completed']
failed_tasks = [t for t in processing_tasks if t['status'] == 'failed']

print(f"✅ Successful: {len(successful_tasks)}")
print(f"❌ Failed: {len(failed_tasks)}")

for task in processing_tasks:
    print(f"  - Task ID: {task['task_id']} | File: {task['file_name']} | Status: {task['status']}")

## Search Embedding

You can also use the Video Understanding Tool UI to perform embedding searches using either text or image inputs. For programmatic access, the same API or Lambda function that powers the UI can be used for application integration.

The Video Understanding Tool stores embeddings in the Amazon S3 vector store, which you can query directly. Alternatively, you can search through the same Lambda function that backs the UI, as shown in the example below. The UI provides additional task-related metadata and generates a pre-signed S3 URL to display the corresponding video.
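
For application integration, the request payload and the response handling can be wrapped in two small helpers. The sketch below reuses the field names from the search request in this notebook. It assumes, without confirmation from the tool's documentation, that `FromIndex` is a zero-based offset used for paging and that the Lambda response may wrap its result in a `body` field that is either a dict or a JSON string (the same cases `get_task_status` handles). Both helper names are hypothetical.

```python
import json

def build_text_search_request(search_text, page_size=8, from_index=0,
                              request_by="workshop_user"):
    """Build a text-search payload using the field names from this notebook."""
    return {
        "SearchText": search_text,
        "Source": "",
        "InputType": "text",
        "InputBytes": None,
        "InputFormat": "",
        "RequestBy": request_by,
        "PageSize": page_size,
        "FromIndex": from_index,  # assumed to be a zero-based offset
    }

def extract_body(response_payload):
    """Return the decoded 'body' of a Lambda response, or the payload itself."""
    body = response_payload.get("body", response_payload)
    return json.loads(body) if isinstance(body, str) else body

first_page = build_text_search_request("aws")
# Under the offset assumption, the second page starts where the first one ended
second_page = build_text_search_request("aws", from_index=first_page["PageSize"])
print(json.dumps(second_page, indent=2))
```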

In [None]:
request = {
    "SearchText": "aws",
    "Source": "",
    "InputType": "text",
    "InputBytes": None,
    "InputFormat": "",
    "RequestBy": "workshop_user",
    "PageSize": 8,
    "FromIndex": 0,
}
response = lambda_client.invoke(
    FunctionName=LAMBDA_FUN_NAME_NOVA_MME_SEARCH_VECTOR,
    InvocationType="RequestResponse",
    Payload=json.dumps(request),
)
response_payload = json.loads(response["Payload"].read())
print(json.dumps(response_payload, indent=2))

## Cleanup (Optional)

Clean up resources by deleting tasks and removing downloaded files. **Warning: This will permanently delete all task data including embeddings.**
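
When deleting many tasks, an exception in one call would stop the loop and leave the remaining tasks in place. The sketch below shows one way to keep going and report failures at the end. `delete_all` is a hypothetical helper that takes the delete function as an argument, so it can be tried with a stand-in before pointing it at real tasks.

```python
def delete_all(task_ids, delete_fn):
    """Call delete_fn for every task ID and return (task_id, error) pairs for failures."""
    failed = []
    for task_id in task_ids:
        try:
            delete_fn(task_id)
        except Exception as exc:
            # Record the failure and continue with the remaining tasks
            failed.append((task_id, str(exc)))
    return failed

# Stand-in delete function that fails for one ID, for illustration only
def fake_delete(task_id):
    if task_id == "task-b":
        raise RuntimeError("simulated failure")

failures = delete_all(["task-a", "task-b", "task-c"], fake_delete)
print(failures)
```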

In [None]:
def delete_nova_mme_task(task_id):
    """Delete a Nova MME task by its task ID."""
    request = {"TaskId": task_id}
    response = lambda_client.invoke(
        FunctionName=LAMBDA_FUN_NAME_NOVA_MME_DELETE_TASK,
        InvocationType="RequestResponse",
        Payload=json.dumps(request),
    )

    response_payload = json.loads(response["Payload"].read())

    if response.get('StatusCode') == 200:
        print(f"✓ Successfully deleted task {task_id}")
    else:
        print(f"❌ Failed to delete task {task_id}: {response_payload}")

import shutil
import os

def delete_folder(folder_path):
    if os.path.exists(folder_path):
        shutil.rmtree(folder_path)
        print(f"Deleted folder: {folder_path}")
    else:
        print(f"Folder not found: {folder_path}")

# Delete all tasks
for task in processing_tasks:
    print(f"Deleting task for file: {task['file_name']}")
    delete_nova_mme_task(task["task_id"])

# Delete downloaded video on local disk
delete_folder(DOWNLOAD_DIR)