diff --git a/supporting-blog-content/twelvelabs-bedrock-marengo/.gitignore b/supporting-blog-content/twelvelabs-bedrock-marengo/.gitignore
new file mode 100644
index 00000000..5298e35f
--- /dev/null
+++ b/supporting-blog-content/twelvelabs-bedrock-marengo/.gitignore
@@ -0,0 +1,3 @@
+.env
+data
+.venv
\ No newline at end of file
diff --git a/supporting-blog-content/twelvelabs-bedrock-marengo/blog_post_marengo_elasticsearch_bedrock.ipynb b/supporting-blog-content/twelvelabs-bedrock-marengo/blog_post_marengo_elasticsearch_bedrock.ipynb
new file mode 100644
index 00000000..c9c98b36
--- /dev/null
+++ b/supporting-blog-content/twelvelabs-bedrock-marengo/blog_post_marengo_elasticsearch_bedrock.ipynb
@@ -0,0 +1,1161 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "9b493efb",
+ "metadata": {},
+ "source": [
+ "## TwelveLabs Marengo Video Embedding Model + Bedrock + Elasticsearch"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "e216d914",
+ "metadata": {},
+ "source": [
+ "In this notebook we'll create a small app that uses video embeddings from TwelveLabs' [Marengo](https://docs.twelvelabs.io/docs/concepts/models/marengo) model to search trailers from recent summer box office hits. We'll use the [AWS Bedrock integration for TwelveLabs](https://docs.twelvelabs.io/docs/cloud-partner-integrations/amazon-bedrock) so that our data never has to be persisted outside of our own S3 buckets.\n",
+ "\n",
+ "![Search Result](./images/marengo1.jpg) "
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "25815b49",
+ "metadata": {},
+ "source": [
+ "To run this notebook:\n",
+ "\n",
+ "* You'll need an S3 bucket that can be written to by your AWS identity.\n",
+ " \n",
+ "* You will need the host URL and an API key for your Elasticsearch cluster, either [deployed locally](https://www.elastic.co/docs/deploy-manage/deploy/self-managed/local-development-installation-quickstart) or in [Elastic Cloud](https://www.elastic.co/cloud/serverless). This notebook assumes you are running on Elasticsearch Serverless or 9+, but it should be adaptable back to Elasticsearch 8.16 if you pin a version in the ```pip install```.\n",
+ "\n",
+ "* You'll need a ```.env``` file with the following content. Alternatively, you can hard code your keys and configuration below.\n",
+ "\n",
+ "\n",
+ "```bash\n",
+ "DATA_DIR = \"./data\"\n",
+ "AWS_ACCESS_KEY_ID = \"your_access_key_id\"\n",
+ "AWS_SECRET_ACCESS_KEY = \"your_secret_access_key\"\n",
+ "S3_BUCKET_NAME = \"your_bucket_name\"\n",
+ "\n",
+ "ELASTICSEARCH_API_KEY = \"your_elasticsearch_api_key\"\n",
+ "ELASTICSEARCH_ENDPOINT = \"your_elasticsearch_endpoint_including_port_number\"\n",
+ "```\n",
+ "\n",
+ "* Additionally, you'll need to enable the Marengo model in Bedrock for your account.\n",
+ "\n",
+ "* Note: if you are behind a VPN or running on a cloud-hosted notebook (like Google Colab), you'll likely be blocked from grabbing the source data with yt-dlp."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "527bfdfe",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "! pip -qqq install yt-dlp boto3 ipython tqdm elasticsearch ipywidgets python-dotenv"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "b7e22c9d",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "#########\n",
+ "## Python Imports\n",
+ "#########\n",
+ "\n",
+ "import os\n",
+ "import json\n",
+ "import time\n",
+ "from pathlib import Path\n",
+ "\n",
+ "import yt_dlp\n",
+ "import boto3, botocore\n",
+ "from IPython.display import clear_output, HTML, display, Image\n",
+ "import tqdm\n",
+ "import copy\n",
+ "import uuid\n",
+ "from elasticsearch import Elasticsearch\n",
+ "from elasticsearch.helpers import bulk\n",
+ "from time import sleep\n",
+ "from dotenv import load_dotenv\n",
+ "\n",
+ "load_dotenv()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "9ad20274",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "#########\n",
+ "## Configuration Details\n",
+ "#########\n",
+ "\n",
+ "\n",
+ "## Source Data ... public trailers for the top five grossing US box office movies of 2025\n",
+ "videos = [\n",
+ "    \"https://www.youtube.com/watch?v=VWqJifMMgZE\", ## Lilo and Stitch 2025\n",
+ "    \"https://www.youtube.com/watch?v=Ox8ZLF6cGM0\", ## Superman 2025 trailer\n",
+ "    \"https://www.youtube.com/watch?v=jan5CFWs9ic\", ## Jurassic World Rebirth\n",
+ "    \"https://www.youtube.com/watch?v=qpoBjOg5RHU\", ## Fantastic Four: First Steps\n",
+ "    \"https://www.youtube.com/watch?v=22w7z_lT6YM\", ## How to Train Your Dragon 2025\n",
+ "]\n",
+ "\n",
+ "## Local file system location for downloads\n",
+ "DATA_PATH = os.getenv(\"DATA_DIR\", \"./data\")\n",
+ "\n",
+ "## AWS Configuration\n",
+ "AWS_REGION = \"us-east-1\"\n",
+ "AWS_ACCESS_KEY_ID = os.getenv(\"AWS_ACCESS_KEY_ID\", \"your_access_key_id\")\n",
+ "AWS_SECRET_ACCESS_KEY = os.getenv(\"AWS_SECRET_ACCESS_KEY\", \"your_secret_access_key\")\n",
+ "\n",
+ "\n",
+ "# S3 Configuration\n",
+ "# S3_BUCKET_NAME = \"\" # TODO: Replace with your S3 bucket name\n",
+ "S3_BUCKET_NAME = os.getenv(\"S3_BUCKET_NAME\", \"your_s3_bucket_name\")\n",
+ "S3_VIDEOS_PATH = \"videos\"\n",
+ "S3_IMAGES_PATH = \"images\"\n",
+ "S3_EMBEDDINGS_PATH = \"embeddings\"\n",
+ "MARENGO_MODEL_ID = \"twelvelabs.marengo-embed-2-7-v1:0\"\n",
+ "TEXT_EMBEDDING_MODEL_ID = \"us.twelvelabs.marengo-embed-2-7-v1:0\"\n",
+ "\n",
+ "\n",
+ "ELASTICSEARCH_API_KEY = os.getenv(\"ELASTICSEARCH_API_KEY\", \"your_elasticsearch_api_key\")\n",
+ "ELASTICSEARCH_ENDPOINT = os.getenv(\n",
+ "    \"ELASTICSEARCH_ENDPOINT\", \"your_elasticsearch_endpoint_including_port_number\"\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "28c65352",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "#########\n",
+ "## Data Class\n",
+ "#########\n",
+ "\n",
+ "\n",
+ "class VideoIntelligence:\n",
+ "    def __init__(self, url, platform, video_id):\n",
+ "\n",
+ "        self.url = url\n",
+ "        self.platform = platform\n",
+ "        self.video_id = video_id\n",
+ "        self.video_string = f\"{self.platform}_{self.video_id}\"\n",
+ "        self.base_path = f\"{DATA_PATH}/videos/{self.video_string}\"\n",
+ "\n",
+ "        self.images = []\n",
+ "        self.video_path = None\n",
+ "        self.metadata_file = None\n",
+ "        self.s3_key = None\n",
+ "\n",
+ "        self.metadata = None\n",
+ "        self.title = None\n",
+ "        self.description = None\n",
+ "\n",
+ "        self.embeddings_list = None\n",
+ "\n",
+ "    def 
get_images(self):\n", + " return self.images\n", + "\n", + " def set_images(self, images):\n", + " self.images = images\n", + "\n", + " def add_image(self, image):\n", + " self.images.append(image)\n", + "\n", + " def get_video_string(self):\n", + " return self.video_string\n", + "\n", + " def get_url(self):\n", + " return self.url\n", + "\n", + " def get_platform(self):\n", + " return self.platform\n", + "\n", + " def get_video_id(self):\n", + " return self.video_id\n", + "\n", + " def get_base_path(self):\n", + " return self.base_path\n", + "\n", + " def get_video_path(self):\n", + " return self.video_path\n", + "\n", + " def set_video_path(self, video_path):\n", + " self.video_path = video_path\n", + "\n", + " def get_metadata_file(self):\n", + " return self.metadata_file\n", + "\n", + " def set_metadata_file(self, metadata_file):\n", + " self.metadata_file = metadata_file\n", + "\n", + " def get_metadata(self):\n", + " return self.metadata\n", + "\n", + " def set_metadata(self, metadata):\n", + " self.metadata = metadata\n", + " self.title = metadata.get(\"title\", \"\")\n", + " self.description = metadata.get(\"description\", \"\")\n", + "\n", + " def get_title(self):\n", + " return self.title\n", + "\n", + " def get_description(self):\n", + " return self.description\n", + "\n", + " def set_title(self, title):\n", + " self.title = title\n", + "\n", + " def set_description(self, description):\n", + " self.description = description\n", + "\n", + " def set_s3_key(self, s3_key):\n", + " self.s3_key = s3_key\n", + "\n", + " def get_s3_key(self):\n", + " return self.s3_key\n", + "\n", + " def set_embeddings_list(self, embeddings_list):\n", + " self.embeddings_list = embeddings_list\n", + "\n", + " def get_embeddings_list(self):\n", + " return self.embeddings_list\n", + "\n", + " def to_json(self):\n", + " return self.__dict__\n", + "\n", + " def get_video_object(self):\n", + " return {\n", + " \"url\": self.url,\n", + " \"platform\": self.platform,\n", + " \"video_id\": self.video_id,\n", + " \"title\": self.title,\n", + " }" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "78d25cf1", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Video already exists. Skipping video download.\n", + "Metadata already exists. Loading metadata from file...\n", + "Video already exists. Skipping video download.\n", + "Metadata already exists. Loading metadata from file...\n", + "Video already exists. Skipping video download.\n", + "Metadata already exists. Loading metadata from file...\n", + "Video already exists. Skipping video download.\n", + "Metadata already exists. Loading metadata from file...\n", + "Video already exists. Skipping video download.\n", + "Metadata already exists. 
Loading metadata from file...\n"
+ ]
+ }
+ ],
+ "source": [
+ "#########\n",
+ "## Retrieve Videos and put them on local file system\n",
+ "#########\n",
+ "\n",
+ "\n",
+ "def get_video(video: VideoIntelligence):\n",
+ "\n",
+ "    metadata = {}\n",
+ "    # NOTE: this also creates the video ID directory since we have parents=True\n",
+ "    # Create base path directory\n",
+ "    base_directory = Path(video.get_base_path())\n",
+ "    base_directory.mkdir(parents=True, exist_ok=True)\n",
+ "\n",
+ "    video_path = video.get_base_path() + f\"/{video.get_video_string()}.mp4\"\n",
+ "    metadata_path = video.get_base_path() + \"/metadata.json\"\n",
+ "\n",
+ "    ydl_opts = {\n",
+ "        \"format\": \"bestvideo+bestaudio/best\",\n",
+ "        \"outtmpl\": video_path,\n",
+ "        \"merge_output_format\": \"mp4\",\n",
+ "    }\n",
+ "\n",
+ "    # Download video if it doesn't exist\n",
+ "    if not os.path.exists(video_path):\n",
+ "        print(\"Downloading video...\")\n",
+ "        with yt_dlp.YoutubeDL(ydl_opts) as ydl:\n",
+ "            metadata = ydl.extract_info(video.url, download=False)\n",
+ "            ydl.download([video.url])\n",
+ "        with open(metadata_path, \"w\") as f:\n",
+ "            json.dump(metadata, f)\n",
+ "    else:\n",
+ "        print(\"Video already exists. Skipping video download.\")\n",
+ "\n",
+ "    # Download metadata if it doesn't exist\n",
+ "    if not os.path.exists(metadata_path) and metadata == {}:\n",
+ "        print(\"Downloading metadata...\")\n",
+ "        with yt_dlp.YoutubeDL(ydl_opts) as ydl:\n",
+ "            metadata = ydl.extract_info(video.url, download=False)\n",
+ "        with open(metadata_path, \"w\") as f:\n",
+ "            json.dump(metadata, f)\n",
+ "    else:\n",
+ "        print(\"Metadata already exists. Loading metadata from file...\")\n",
+ "        metadata = json.load(open(metadata_path, \"r\"))\n",
+ "\n",
+ "    video.set_metadata(metadata)\n",
+ "    video.set_video_path(video_path)\n",
+ "\n",
+ "\n",
+ "def get_video_with_retries(video, max_retries=3):\n",
+ "\n",
+ "    retry_count = 0\n",
+ "    last_exception = None\n",
+ "\n",
+ "    while retry_count < max_retries:\n",
+ "        try:\n",
+ "            get_video(video)\n",
+ "            print(f\"Successfully processed {video.get_video_id()}\")\n",
+ "            return True # Success\n",
+ "        except Exception as e:\n",
+ "            retry_count += 1\n",
+ "            last_exception = e\n",
+ "            if retry_count < max_retries:\n",
+ "                print(\n",
+ "                    f\"Attempt {retry_count} failed for {video.get_video_id()}: {e}. Retrying...\"\n",
+ "                )\n",
+ "                # Wait a short time before retrying to allow for temporary issues to resolve\n",
+ "                time.sleep(2)\n",
+ "            else:\n",
+ "                print(\n",
+ "                    f\"All {max_retries} attempts failed for {video.get_video_id()}. 
Last error: {e}\"\n", + " )\n", + "\n", + " # If we reached here, all retries failed\n", + " if last_exception:\n", + " raise last_exception\n", + " return False\n", + "\n", + "\n", + "video_objects = []\n", + "\n", + "for video_str in videos:\n", + " if \"youtube.com\" in video_str:\n", + " platform = \"youtube\"\n", + " video_id = video_str.split(\"v=\")[1]\n", + " video_objects.append(VideoIntelligence(video_str, platform, video_id))\n", + "\n", + "for video_object in video_objects:\n", + " get_video(video_object)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "446cc5f8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Titan's response:\n", + "Here are the top 10 highest-grossing movies of all time, according to Box Office Mojo:\n", + "\n", + "1. Avatar (2009) - $2.84 billion\n", + "2. Avengers: Endgame (2019) - $2.79 billion\n", + "3. Titanic (1997) - $2.19 billion\n", + "4. Star Wars: The Force Awakens (2015) - $2.07 billion\n", + "5. Avengers: Infinity War (2018) - $2.05 billion\n", + "6. Spider-Man: No Way Home (2021) - $1.92 billion\n", + "7. Jurassic World (2015) - $1.67 billion\n", + "8. The Lion King (2019) - $1.66 billion\n", + "9. The Avengers (2012) - $1.52 billion\n", + "10. Furious 7 (2015) - $1.51 billion\n", + "\n", + "It's worth noting that these numbers are adjusted for inflation and are based on worldwide box office receipts.\n" + ] + } + ], + "source": [ + "#########\n", + "## Setup Bedrock, test connection\n", + "#########\n", + "\n", + "# Initialize AWS session\n", + "session = boto3.session.Session(\n", + " aws_access_key_id=AWS_ACCESS_KEY_ID,\n", + " aws_secret_access_key=AWS_SECRET_ACCESS_KEY,\n", + " region_name=AWS_REGION,\n", + ")\n", + "\n", + "# Initialize AWS clients\n", + "bedrock_client = session.client(\"bedrock-runtime\", region_name=AWS_REGION)\n", + "modelId = \"amazon.titan-text-premier-v1:0\"\n", + "\n", + "\n", + "def get_bedrock_completion(prompt, max_tokens=500, temperature=0.7):\n", + " try:\n", + " body = json.dumps(\n", + " {\n", + " \"inputText\": prompt,\n", + " \"textGenerationConfig\": {\n", + " \"maxTokenCount\": max_tokens,\n", + " \"temperature\": temperature,\n", + " \"topP\": 0.9,\n", + " },\n", + " }\n", + " )\n", + "\n", + " response = bedrock_client.invoke_model(modelId=modelId, body=body)\n", + "\n", + " response_body = json.loads(response[\"body\"].read())\n", + " return response_body.get(\"results\", [{}])[0].get(\"outputText\", \"\")\n", + " except Exception as e:\n", + " print(f\"Error getting completion: {e}\")\n", + " return str(e)\n", + "\n", + "\n", + "# Test with a simple prompt\n", + "test_prompt = \"Hello, what are the biggest blockbuster movies of all time?\"\n", + "try:\n", + " completion = get_bedrock_completion(test_prompt)\n", + " print(\"Titan's response:\")\n", + " print(completion)\n", + "except Exception as e:\n", + " print(\"Bedrock API call failed:\")\n", + " print(f\"Error type: {type(e).__name__}\")\n", + " print(f\"Error message: {str(e)}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "eed56bbb", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "AWS Account ID: REDACTED\n", + "✅ Successfully connected to S3 bucket: REDACTED\n" + ] + } + ], + "source": [ + "#########\n", + "## Validate S3 Configuration\n", + "#########\n", + "\n", + "aws_account_id = session.client(\"sts\").get_caller_identity()[\"Account\"]\n", + "print(f\"AWS Account ID: {aws_account_id}\")\n", 
+ "s3_client = session.client(\"s3\")\n", + "\n", + "# Verify bucket access\n", + "try:\n", + " s3_client.head_bucket(Bucket=S3_BUCKET_NAME)\n", + " print(f\"✅ Successfully connected to S3 bucket: {S3_BUCKET_NAME}\")\n", + "except Exception as e:\n", + " print(f\"❌ Error accessing S3 bucket: {e}\")\n", + " print(\"Please ensure the bucket exists and you have proper permissions.\")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "25ac6c1c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Uploading youtube_VWqJifMMgZE to S3...\n", + "Successfully uploaded youtube_VWqJifMMgZE to S3\n", + "Uploading youtube_Ox8ZLF6cGM0 to S3...\n", + "Successfully uploaded youtube_Ox8ZLF6cGM0 to S3\n", + "Uploading youtube_jan5CFWs9ic to S3...\n", + "Successfully uploaded youtube_jan5CFWs9ic to S3\n", + "Uploading youtube_qpoBjOg5RHU to S3...\n", + "Successfully uploaded youtube_qpoBjOg5RHU to S3\n", + "Uploading youtube_22w7z_lT6YM to S3...\n", + "Successfully uploaded youtube_22w7z_lT6YM to S3\n" + ] + } + ], + "source": [ + "#########\n", + "## Upload videos to S3, and make note of where we put them in data object\n", + "#########\n", + "\n", + "for video_object in video_objects:\n", + " # Get the video file path\n", + " video_path = video_object.get_video_path()\n", + "\n", + " # Skip if video path is not set\n", + " if not video_path:\n", + " print(f\"Skipping {video_object.get_video_string()} - No video path set\")\n", + " continue\n", + "\n", + " # Define S3 destination key - organize by platform and video ID\n", + " # put this information in our data object for later\n", + " s3_key = video_object.get_s3_key()\n", + " if not s3_key:\n", + " s3_key = f\"{S3_VIDEOS_PATH}/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}\"\n", + " video_object.set_s3_key(s3_key)\n", + "\n", + " try:\n", + " # Check if file already exists in S3\n", + " try:\n", + " s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)\n", + " print(\n", + " f\"Video {video_object.get_video_string()} already exists in S3. 
Skipping upload.\"\n",
+ "            )\n",
+ "            continue\n",
+ "        except botocore.exceptions.ClientError as e:\n",
+ "            if e.response[\"Error\"][\"Code\"] == \"404\":\n",
+ "                # File doesn't exist in S3, proceed with upload\n",
+ "                pass\n",
+ "            else:\n",
+ "                # Some other error occurred\n",
+ "                raise e\n",
+ "\n",
+ "        # Upload the video to S3\n",
+ "        print(f\"Uploading {video_object.get_video_string()} to S3...\")\n",
+ "        s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)\n",
+ "        print(f\"Successfully uploaded {video_object.get_video_string()} to S3\")\n",
+ "\n",
+ "    except Exception as e:\n",
+ "        print(f\"Error uploading {video_object.get_video_string()} to S3: {str(e)}\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "id": "746eb422",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "Processing videos:   0%|          | 0/5 [00:00<?, ?it/s]"
+ ]
+ }
+ ],
+ "source": [
+ "def wait_for_embedding_output(\n",
+ "    s3_bucket: str, s3_prefix: str, invocation_arn: str, verbose: bool = True\n",
+ ") -> list:\n",
+ "\n",
+ "    # Wait until task completes\n",
+ "    status = None\n",
+ "    while status not in [\"Completed\", \"Failed\", \"Expired\"]:\n",
+ "        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)\n",
+ "        status = response[\"status\"]\n",
+ "        if verbose:\n",
+ "            clear_output(wait=True)\n",
+ "            tqdm.tqdm.write(f\"Embedding task status: {status}\")\n",
+ "        time.sleep(5)\n",
+ "\n",
+ "    if status != \"Completed\":\n",
+ "        raise Exception(f\"Embedding task failed with status: {status}\")\n",
+ "\n",
+ "    # Retrieve the output from S3\n",
+ "    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)\n",
+ "\n",
+ "    for obj in response.get(\"Contents\", []):\n",
+ "        if obj[\"Key\"].endswith(\"output.json\"):\n",
+ "            output_key = obj[\"Key\"]\n",
+ "            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)\n",
+ "            content = obj[\"Body\"].read().decode(\"utf-8\")\n",
+ "            data = json.loads(content).get(\"data\", [])\n",
+ "            return data\n",
+ "\n",
+ "    raise Exception(\"No output.json found in S3 prefix\")\n",
+ "\n",
+ "\n",
+ "# Create video embedding\n",
+ "def create_video_embedding(video_s3_uri: str, video_id: str) -> list:\n",
+ "\n",
+ "    unique_id = video_id\n",
+ "    s3_output_prefix = f\"{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{unique_id}\"\n",
+ "\n",
+ "    response = bedrock_client.start_async_invoke(\n",
+ "        modelId=MARENGO_MODEL_ID,\n",
+ "        modelInput={\n",
+ "            \"inputType\": \"video\",\n",
+ "            \"mediaSource\": {\n",
+ "                \"s3Location\": {\"uri\": video_s3_uri, \"bucketOwner\": aws_account_id}\n",
+ "            },\n",
+ "        },\n",
+ "        outputDataConfig={\n",
+ "            \"s3OutputDataConfig\": {\"s3Uri\": f\"s3://{S3_BUCKET_NAME}/{s3_output_prefix}\"}\n",
+ "        },\n",
+ "    )\n",
+ "\n",
+ "    invocation_arn = response[\"invocationArn\"]\n",
+ "    print(f\"Video embedding task started: {invocation_arn}\")\n",
+ "\n",
+ "    # Wait for completion and get results\n",
+ "    try:\n",
+ "        embedding_data = wait_for_embedding_output(\n",
+ "            S3_BUCKET_NAME, s3_output_prefix, invocation_arn\n",
+ "        )\n",
+ "    except Exception as e:\n",
+ "        print(f\"Error waiting for embedding output: {e}\")\n",
+ "        return None\n",
+ "\n",
+ "    return embedding_data\n",
+ "\n",
+ "\n",
+ "def check_existing_embedding(video_id: str) -> list:\n",
+ "\n",
+ "    s3_output_prefix = f\"{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{video_id}\"\n",
+ "    print(s3_output_prefix)\n",
+ "\n",
+ "    try:\n",
+ "        # Check if any files exist at this prefix\n",
+ "        response = s3_client.list_objects_v2(\n",
+ "            Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix\n",
+ "        )\n",
+ "\n",
+ "        if \"Contents\" in response and any(\n",
+ "            obj[\"Key\"].endswith(\"output.json\") for obj in response.get(\"Contents\", [])\n",
+ "        ):\n",
+ "            print(\n",
+ "                f\"Video {video_id} already has an embedding. Skipping embedding creation.\"\n",
+ "            )\n",
+ "            # Find the output.json file\n",
+ "            for obj in response.get(\"Contents\", []):\n",
+ "                if obj[\"Key\"].endswith(\"output.json\"):\n",
+ "                    output_key = obj[\"Key\"]\n",
+ "                    # Get the object from S3\n",
+ "                    obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)\n",
+ "                    # Read the content and parse as JSON\n",
+ "                    content = obj[\"Body\"].read().decode(\"utf-8\")\n",
+ "                    embedding_data = json.loads(content).get(\"data\", [])\n",
+ "                    return embedding_data\n",
+ "        else:\n",
+ "            print(f\"No existing embedding found for {video_id}.\")\n",
+ "            return None\n",
+ "    except botocore.exceptions.ClientError as e:\n",
+ "        if e.response[\"Error\"][\"Code\"] == \"404\":\n",
+ "            # File doesn't exist in S3, proceed with upload\n",
+ "            print(\"Did not find embedding in s3\")\n",
+ "            return None\n",
+ "        else:\n",
+ "            # Some other error occurred\n",
+ "            raise e\n",
+ "\n",
+ "\n",
+ "def create_s3_uri(bucket_name: str, key: str) -> str:\n",
+ "    video_uri = f\"s3://{bucket_name}/{key}\"\n",
+ "    return video_uri\n",
+ "\n",
+ "\n",
+ "## Generate the embeddings one at a time, use S3 as cache to prevent double embedding generations\n",
+ "for video_object in tqdm.tqdm(video_objects, desc=\"Processing videos\"):\n",
+ "    s3_key = video_object.get_s3_key()\n",
+ "    video_id = video_object.get_video_id()\n",
+ "    video_uri = create_s3_uri(S3_BUCKET_NAME, s3_key)\n",
+ "\n",
+ "    retrieved_embeddings = check_existing_embedding(video_id)\n",
+ "    if retrieved_embeddings:\n",
+ "        video_object.set_embeddings_list(retrieved_embeddings)\n",
+ "    else:\n",
+ "        video_embedding_data = create_video_embedding(video_uri, video_id)\n",
+ "        video_object.set_embeddings_list(video_embedding_data)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "id": "bff90833",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "0\n",
+ "\tembedding: len 1024\n",
+ "\tembeddingOption: visual-text\n",
+ "\tstartSec: 0.0\n",
+ "\tendSec: 6.199999809265137\n",
+ "1\n",
+ "\tembedding: len 1024\n",
+ "\tembeddingOption: visual-text\n",
+ "\tstartSec: 6.199999809265137\n",
+ "\tendSec: 10.399999618530273\n",
+ "2\n",
+ "\tembedding: len 1024\n",
+ "\tembeddingOption: visual-text\n",
+ "\tstartSec: 10.399999618530273\n",
+ "\tendSec: 17.299999237060547\n"
+ ]
+ }
+ ],
+ "source": [
+ "video_embedding_data = video_objects[0].get_embeddings_list()\n",
+ "\n",
+ "## Preview Print\n",
+ "for i, embedding in enumerate(video_embedding_data[:3]):\n",
+ "    print(f\"{i}\")\n",
+ "    for key in embedding:\n",
+ "        if \"embedding\" == key:\n",
+ "            print(f\"\\t{key}: len {len(embedding[key])}\")\n",
+ "        else:\n",
+ "            print(f\"\\t{key}: {embedding[key]}\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "id": "1f0e00aa",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "✅ Successfully connected to Elasticsearch: serverless\n"
+ ]
+ }
+ ],
+ "source": [
+ "es = Elasticsearch(hosts=[ELASTICSEARCH_ENDPOINT], api_key=ELASTICSEARCH_API_KEY)\n",
+ "\n",
+ "es_detail = es.info().body\n",
+ "if \"version\" in es_detail:\n",
+ "    version_info = es_detail[\"version\"]\n",
+ "    build_flavor = version_info.get(\"build_flavor\", \"N/A\")\n",
+ "    
version_number = version_info.get(\"number\", \"N/A\")\n", + "\n", + " identifier = build_flavor if build_flavor != \"N/A\" else version_number\n", + " print(f\"✅ Successfully connected to Elasticsearch: {identifier}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "0651bd8e", + "metadata": {}, + "outputs": [], + "source": [ + "docs = []\n", + "\n", + "for video_object in video_objects:\n", + "\n", + " persist_object = video_object.get_video_object()\n", + " embeddings = video_object.get_embeddings_list()\n", + "\n", + " for embedding in embeddings:\n", + " if embedding[\"embeddingOption\"] == \"visual-image\":\n", + "\n", + " # Create a copy of the persist object and add embedding details\n", + " doc = copy.deepcopy(persist_object)\n", + " doc[\"embedding\"] = embedding[\"embedding\"]\n", + " doc[\"start_sec\"] = embedding[\"startSec\"]\n", + " doc[\"end_sec\"] = embedding[\"endSec\"]\n", + "\n", + " docs.append(doc)\n", + "\n", + " ### Documents should be of format\n", + " # {\n", + " # \"url\": \"https://www.youtube.com/watch?v=VWqJifMMgZE\",\n", + " # \"platform\": \"youtube\",\n", + " # \"video_id\": \"VWqJifMMgZE\",\n", + " # \"title\": \"Lilo & Stitch | Official Trailer | In Theaters May 23\",\n", + " # \"embedding\": [\n", + " # 0.049530029296875,\n", + " # -0.0153350830078125,\n", + " # 0.04205322265625,\n", + " # ... <1024 dimensions total>\n", + " # 417327880859375,\n", + " # 0.01041412353515625\n", + " # ],\n", + " # \"start_sec\": 0.0,\n", + " # \"end_sec\": 6.199999809265137\n", + " # }" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "15bb091f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Deleting Index 'twelvelabs-movie-trailer-flat' and then sleeping for 2 seconds\n", + "Index 'twelvelabs-movie-trailer-flat' created successfully\n", + "Deleting Index 'twelvelabs-movie-trailer-hnsw' and then sleeping for 2 seconds\n", + "Index 'twelvelabs-movie-trailer-hnsw' created successfully\n", + "Deleting Index 'twelvelabs-movie-trailer-int8_hnsw' and then sleeping for 2 seconds\n", + "Index 'twelvelabs-movie-trailer-int8_hnsw' created successfully\n", + "Deleting Index 'twelvelabs-movie-trailer-bbq_hnsw' and then sleeping for 2 seconds\n", + "Index 'twelvelabs-movie-trailer-bbq_hnsw' created successfully\n", + "Deleting Index 'twelvelabs-movie-trailer-bbq_flat' and then sleeping for 2 seconds\n", + "Index 'twelvelabs-movie-trailer-bbq_flat' created successfully\n", + "Indexing 155 documents into twelvelabs-movie-trailer-flat...\n", + "\tSuccessfully indexed 155 documents into twelvelabs-movie-trailer-flat\n", + "Completed indexing documents into twelvelabs-movie-trailer-flat\n", + "Indexing 155 documents into twelvelabs-movie-trailer-hnsw...\n", + "\tSuccessfully indexed 155 documents into twelvelabs-movie-trailer-hnsw\n", + "Completed indexing documents into twelvelabs-movie-trailer-hnsw\n", + "Indexing 155 documents into twelvelabs-movie-trailer-int8_hnsw...\n", + "\tSuccessfully indexed 155 documents into twelvelabs-movie-trailer-int8_hnsw\n", + "Completed indexing documents into twelvelabs-movie-trailer-int8_hnsw\n", + "Indexing 155 documents into twelvelabs-movie-trailer-bbq_hnsw...\n", + "\tSuccessfully indexed 155 documents into twelvelabs-movie-trailer-bbq_hnsw\n", + "Completed indexing documents into twelvelabs-movie-trailer-bbq_hnsw\n", + "Indexing 155 documents into twelvelabs-movie-trailer-bbq_flat...\n", + "\tSuccessfully indexed 155 documents into 
twelvelabs-movie-trailer-bbq_flat\n", + "Completed indexing documents into twelvelabs-movie-trailer-bbq_flat\n" + ] + } + ], + "source": [ + "index_varieties = [\"flat\", \"hnsw\", \"int8_hnsw\", \"bbq_hnsw\", \"bbq_flat\"]\n", + "\n", + "for index_variety in index_varieties:\n", + " # Create an index for the movie trailer embeddings\n", + " # Define mapping with proper settings for dense vector search\n", + " index_name = f\"twelvelabs-movie-trailer-{index_variety}\"\n", + " mappings = {\n", + " \"properties\": {\n", + " \"url\": {\"type\": \"keyword\"},\n", + " \"platform\": {\"type\": \"keyword\"},\n", + " \"video_id\": {\"type\": \"keyword\"},\n", + " \"title\": {\"type\": \"text\", \"analyzer\": \"standard\"},\n", + " \"embedding\": {\n", + " \"type\": \"dense_vector\",\n", + " \"dims\": 1024,\n", + " \"similarity\": \"cosine\",\n", + " \"index_options\": {\"type\": index_variety},\n", + " },\n", + " \"start_sec\": {\"type\": \"float\"},\n", + " \"end_sec\": {\"type\": \"float\"},\n", + " }\n", + " }\n", + "\n", + " # Check if index already exists\n", + " if es.indices.exists(index=index_name):\n", + " print(f\"Deleting Index '{index_name}' and then sleeping for 2 seconds\")\n", + " es.indices.delete(index=index_name)\n", + " sleep(2)\n", + " # Create the index\n", + " es.indices.create(index=index_name, mappings=mappings)\n", + " print(f\"Index '{index_name}' created successfully\")\n", + "\n", + "for index_variety in index_varieties:\n", + " # Create an index for the movie trailer embeddings\n", + " # Define mapping with proper settings for dense vector search\n", + " index_name = f\"twelvelabs-movie-trailer-{index_variety}\"\n", + "\n", + " # Bulk insert docs into Elasticsearch index\n", + " print(f\"Indexing {len(docs)} documents into {index_name}...\")\n", + "\n", + " # Create actions for bulk API\n", + " actions = []\n", + " for doc in docs:\n", + " actions.append({\"_index\": index_name, \"_source\": doc})\n", + "\n", + " # Perform bulk indexing with error handling\n", + " try:\n", + " success, failed = bulk(\n", + " es,\n", + " actions,\n", + " chunk_size=100,\n", + " max_retries=3,\n", + " initial_backoff=2,\n", + " max_backoff=60,\n", + " )\n", + " print(f\"\\tSuccessfully indexed {success} documents into {index_name}\")\n", + " if failed:\n", + " print(f\"\\tFailed to index {len(failed)} documents\")\n", + " except Exception as e:\n", + " print(f\"Error during bulk indexing: {e}\")\n", + "\n", + " print(f\"Completed indexing documents into {index_name}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "1e60c1d6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'took': 8, 'timed_out': False, '_shards': {'total': 5, 'successful': 5, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 10, 'relation': 'eq'}, 'max_score': 0.6417193, 'hits': [{'_index': 'twelvelabs-movie-trailer-flat', '_id': 'kjVNPpkBpEb67Rey0h3W', '_score': 0.6417193, 'fields': {'title': ['Jurassic World Rebirth | Official Trailer'], 'start_sec': [134.5], 'video_id': ['jan5CFWs9ic']}}, {'_index': 'twelvelabs-movie-trailer-flat', '_id': 'wDVNPpkBpEb67Rey1B3x', '_score': 0.6409597, 'fields': {'title': ['How To Train Your Dragon | Official Trailer'], 'start_sec': [59.0], 'video_id': ['22w7z_lT6YM']}}, {'_index': 'twelvelabs-movie-trailer-flat', '_id': 'uzVNPpkBpEb67Rey1B3x', '_score': 0.64061135, 'fields': {'title': ['How To Train Your Dragon | Official Trailer'], 'start_sec': [34.6], 'video_id': ['22w7z_lT6YM']}}, {'_index': 
'twelvelabs-movie-trailer-flat', '_id': 'vjVNPpkBpEb67Rey1B3x', '_score': 0.63972485, 'fields': {'title': ['How To Train Your Dragon | Official Trailer'], 'start_sec': [50.399998], 'video_id': ['22w7z_lT6YM']}}, {'_index': 'twelvelabs-movie-trailer-flat', '_id': 'vTVNPpkBpEb67Rey1B3x', '_score': 0.6388538, 'fields': {'title': ['How To Train Your Dragon | Official Trailer'], 'start_sec': [46.3], 'video_id': ['22w7z_lT6YM']}}, {'_index': 'twelvelabs-movie-trailer-flat', '_id': 'ezVNPpkBpEb67Rey0h3W', '_score': 0.6381582, 'fields': {'title': ['Jurassic World Rebirth | Official Trailer'], 'start_sec': [18.8], 'video_id': ['jan5CFWs9ic']}}, {'_index': 'twelvelabs-movie-trailer-flat', '_id': 'vzVNPpkBpEb67Rey1B3x', '_score': 0.63632464, 'fields': {'title': ['How To Train Your Dragon | Official Trailer'], 'start_sec': [54.399998], 'video_id': ['22w7z_lT6YM']}}, {'_index': 'twelvelabs-movie-trailer-flat', '_id': 'zDVNPpkBpEb67Rey1B3x', '_score': 0.63516366, 'fields': {'title': ['How To Train Your Dragon | Official Trailer'], 'start_sec': [116.7], 'video_id': ['22w7z_lT6YM']}}, {'_index': 'twelvelabs-movie-trailer-flat', '_id': 'tDVNPpkBpEb67Rey1B3x', '_score': 0.6347714, 'fields': {'title': ['How To Train Your Dragon | Official Trailer'], 'start_sec': [0.0], 'video_id': ['22w7z_lT6YM']}}, {'_index': 'twelvelabs-movie-trailer-flat', '_id': 'vDVNPpkBpEb67Rey1B3x', '_score': 0.6343807, 'fields': {'title': ['How To Train Your Dragon | Official Trailer'], 'start_sec': [39.1], 'video_id': ['22w7z_lT6YM']}}]}}\n" + ] + } + ], + "source": [ + "# Create text embedding\n", + "def create_text_embedding(text_query: str) -> list:\n", + " text_model_id = TEXT_EMBEDDING_MODEL_ID\n", + " text_model_input = {\"inputType\": \"text\", \"inputText\": text_query}\n", + " response = bedrock_client.invoke_model(\n", + " modelId=text_model_id, body=json.dumps(text_model_input)\n", + " )\n", + " response_body = json.loads(response[\"body\"].read().decode(\"utf-8\"))\n", + " embedding_data = response_body.get(\"data\", [])\n", + " if embedding_data:\n", + " return embedding_data[0][\"embedding\"]\n", + " else:\n", + " return None\n", + "\n", + "\n", + "def vector_query(index_name: str, text_query: str) -> dict:\n", + "\n", + " query_embedding = create_text_embedding(text_query)\n", + " query = {\n", + " \"retriever\": {\n", + " \"knn\": {\n", + " \"field\": \"embedding\",\n", + " \"query_vector\": query_embedding,\n", + " \"k\": 10,\n", + " \"num_candidates\": \"25\",\n", + " }\n", + " },\n", + " \"size\": 10,\n", + " \"_source\": False,\n", + " \"fields\": [\"title\", \"video_id\", \"start_sec\"],\n", + " }\n", + " return es.search(index=index_name, body=query).body\n", + "\n", + "\n", + "text_query = \"Show me scenes with dinosaurs\"\n", + "print(vector_query(\"twelvelabs-movie-trailer-flat\", text_query))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "abab8a65", + "metadata": {}, + "outputs": [], + "source": [ + "from ipywidgets import widgets, HTML as WHTML, HBox, Layout\n", + "from IPython.display import display\n", + "\n", + "\n", + "def display_search_results_html(query):\n", + " results = vector_query(\"twelvelabs-movie-trailer-flat\", query)\n", + " hits = results.get(\"hits\", {}).get(\"hits\", [])\n", + "\n", + " if not hits:\n", + " return \"

<p>No results found</p>\"\n",
+ "\n",
+ "    items = []\n",
+ "    for hit in hits:\n",
+ "        fields = hit.get(\"fields\", {})\n",
+ "        title = fields.get(\"title\", [\"No Title\"])[0]\n",
+ "        score = hit.get(\"_score\", 0)\n",
+ "        video_id = fields.get(\"video_id\", [\"\"])[0]\n",
+ "        start_sec = fields.get(\"start_sec\", [0])[0]\n",
+ "        url = f\"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s\"\n",
+ "        items.append(\n",
+ "            f'<div>  • <a href=\"{url}\" target=\"_blank\">{title}</a> (Start: {float(start_sec):.1f}s) Score: {score}</div>'\n",
+ "        )\n",
+ "\n",
+ "    return \"<h4>Search Results:</h4>\" + \"\".join(items)\n",
+ "\n",
+ "\n",
+ "def search_videos():\n",
+ "    search_input = widgets.Text(\n",
+ "        value=\"\",\n",
+ "        placeholder=\"Enter your search query…\",\n",
+ "        description=\"Search:\",\n",
+ "        layout=Layout(width=\"70%\"),\n",
+ "    )\n",
+ "\n",
+ "    search_button = widgets.Button(\n",
+ "        description=\"Search Videos\", button_style=\"primary\", layout=Layout(width=\"20%\")\n",
+ "    )\n",
+ "\n",
+ "    # Use a single HTML widget for output; update its .value to avoid double-rendering\n",
+ "    results_box = WHTML(value=\"\")\n",
+ "\n",
+ "    def on_button_click(_):\n",
+ "        q = search_input.value.strip()\n",
+ "        if not q:\n",
+ "            results_box.value = \"<p>Please enter a search query</p>\"\n",
+ "            return\n",
+ "        results_box.value = \"<p>Searching…</p>\"\n",
+ "        results_box.value = display_search_results_html(q)\n",
+ "\n",
+ "    # Avoid multiple handler attachments if the cell is re-run\n",
+ "    try:\n",
+ "        search_button._click_handlers.callbacks.clear()\n",
+ "    except Exception:\n",
+ "        pass\n",
+ "    search_button.on_click(on_button_click)\n",
+ "\n",
+ "    display(HBox([search_input, search_button]))\n",
+ "    display(results_box)\n",
+ "\n",
+ "\n",
+ "# Call this to create the UI\n",
+ "search_videos()"
+ ]
+ },

+ {
+ "cell_type": "markdown",
+ "id": "f1d8e3d1",
+ "metadata": {},
+ "source": [
+ "---\n",
+ "\n",
+ "#### Screenshot of UI when done:\n",
+ "\n",
+ "![Screenshot](./images/marengo2.jpg) "
+ ]
+ },

+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": ".venv",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.13.3"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/supporting-blog-content/twelvelabs-bedrock-marengo/images/marengo1.jpg b/supporting-blog-content/twelvelabs-bedrock-marengo/images/marengo1.jpg
new file mode 100644
index 00000000..e0cf06e2
Binary files /dev/null and b/supporting-blog-content/twelvelabs-bedrock-marengo/images/marengo1.jpg differ
diff --git a/supporting-blog-content/twelvelabs-bedrock-marengo/images/marengo2.jpg b/supporting-blog-content/twelvelabs-bedrock-marengo/images/marengo2.jpg
new file mode 100644
index 00000000..7577ca93
Binary files /dev/null and b/supporting-blog-content/twelvelabs-bedrock-marengo/images/marengo2.jpg differ