# OpenSearch Serverless Collection Creation
This notebook demonstrates how to create an Amazon OpenSearch Serverless collection using the AWS SDK for Python (Boto3). OpenSearch Serverless is an on-demand, serverless configuration of Amazon OpenSearch Service. Instead of launching and managing clusters, you create collections, and the service automatically provisions, configures, and scales the resources needed to run them.

## Install required libraries
The following cell installs the required Python libraries listed in the `requirements.txt` file. Uncomment the `pip` line if the libraries are not already installed.

In [None]:
#This cell installs the required libraries specified in the 'requirements.txt' file
#!pip install -r requirements.txt --quiet

In [1]:
import os
import pandas as pd
import sagemaker
import boto3
import json
import pprint
import random 
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, helpers
import time

session = boto3.session.Session()
region_name = session.region_name

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


## Required permissions
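Your IAM role or user needs several policies attached to run the code below, including the AWS managed policies `AmazonBedrockFullAccess` and `AmazonOpenSearchServiceFullAccess`, plus the following policy for OpenSearch Serverless. This policy grants full access to the OpenSearch Serverless (`aoss`) API, allowing you to create, manage, and delete OpenSearch Serverless resources. Because the cells below create and attach this policy programmatically, the identity also needs IAM permissions such as `iam:CreatePolicy`, `iam:GetUser`, and `iam:AttachRolePolicy` (or `iam:AttachUserPolicy`).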

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "aoss:*",
            "Resource": "*"
        }
    ]
}
```
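The `aoss:*` policy above is convenient for a walkthrough. If you prefer a narrower policy, a minimal sketch like the following builds one from an explicit action list. The action names are an assumption about what this notebook's calls need (collection and policy creation, status checks, and `aoss:APIAccessAll` for index and document requests); verify them against the IAM service authorization reference for OpenSearch Serverless before relying on them. `build_scoped_aoss_policy` is a hypothetical helper.

```python
import json

# Hypothetical, narrower alternative to "aoss:*". This action list is an
# assumption; check it against the IAM service authorization reference.
NOTEBOOK_AOSS_ACTIONS = [
    "aoss:CreateCollection",
    "aoss:BatchGetCollection",
    "aoss:CreateAccessPolicy",
    "aoss:CreateSecurityPolicy",
    "aoss:APIAccessAll",  # data-plane access (index and document APIs)
]


def build_scoped_aoss_policy(actions=NOTEBOOK_AOSS_ACTIONS, resource="*"):
    """Return an IAM policy document (dict) that allows only the given aoss actions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(actions),
                "Resource": resource,
            }
        ],
    }


print(json.dumps(build_scoped_aoss_policy(), indent=2))
```

The resulting document could be passed to `iam.create_policy(PolicyDocument=json.dumps(...))` in place of the `aoss:*` document.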

The following cells create this policy and attach it to the current user or role. When running in a SageMaker notebook, the code attaches the policy to the SageMaker execution role.

In [2]:
# Create an IAM client
iam = boto3.client('iam')

suffix = random.randrange(200, 900)

# Define the policy document
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "aoss:*",
            "Resource": "*"
        }
    ]
}

# Create the IAM policy
aossAccessPolicy = iam.create_policy(
    PolicyName='AOSSAccessPolicy-{0}'.format(suffix),
    PolicyDocument=json.dumps(policy_document)
)


aossAccessPolicyArn = aossAccessPolicy["Policy"]["Arn"]

# Wait for the new policy to propagate (IAM is eventually consistent)
time.sleep(10)
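As an alternative to the fixed `time.sleep(10)`, Boto3 ships an IAM `policy_exists` waiter that polls until IAM returns the policy. The sketch below wraps it in a hypothetical helper, `wait_for_iam_policy`; the delay and attempt values are arbitrary choices. A waiter only confirms that IAM can read the policy, so a short extra pause may still help before the policy takes effect everywhere.

```python
def wait_for_iam_policy(iam_client, policy_arn, delay=2, max_attempts=15):
    """Block until IAM reports the managed policy as existing.

    Uses Boto3's built-in IAM "policy_exists" waiter instead of a fixed sleep.
    IAM is eventually consistent, so this does not guarantee the policy is
    already effective in every region.
    """
    waiter = iam_client.get_waiter("policy_exists")
    waiter.wait(
        PolicyArn=policy_arn,
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )


# Usage in this notebook (not executed here):
# wait_for_iam_policy(iam, aossAccessPolicyArn)
```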

In [3]:
# Get the current identity ARN.
# When running in SageMaker, this should be the SageMaker execution role.
identity_arn = ""

try:
    # Get the execution role ARN
    identity_arn = sagemaker.get_execution_role()
    
except Exception as e:
    print("Not a sagemaker role, trying to retrieve the user identity")
    # Create an STS client
    sts_client = boto3.client('sts')

    # Get the caller identity
    caller_identity = sts_client.get_caller_identity()
    identity_arn = caller_identity['Arn']

print(f"Identity ARN:{identity_arn}")

Identity ARN:arn:aws:iam::207390309313:role/service-role/AmazonSageMaker-ExecutionRole-20220409T200590


In [4]:
# Check if the identity ARN is for a user or a role

try:
    # Try to get the user information
    user = iam.get_user(UserName=identity_arn.split('/')[-1])
    print(f"The identity ARN '{identity_arn}' is for a user.")

    # Attach the policy to the user
    iam.attach_user_policy(
        UserName=user['User']['UserName'],
        PolicyArn=aossAccessPolicyArn
    )
except iam.exceptions.NoSuchEntityException:
    # If the identity ARN is not for a user, it must be for a role
    print(f"The identity ARN '{identity_arn}' is for a role.")

    # Attach the policy to the role
    iam.attach_role_policy(
        RoleName=identity_arn.split('/')[-1],
        PolicyArn=aossAccessPolicyArn
    )

The identity ARN 'arn:aws:iam::207390309313:role/service-role/AmazonSageMaker-ExecutionRole-20220409T200590' is for a role.
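The cell above decides between user and role by calling `get_user` with the last path segment of the ARN and falling back to a role. An alternative is to read the identity type directly from the ARN's resource part. The sketch below is a hypothetical helper, `parse_iam_identity`. It also covers STS assumed-role ARNs (`assumed-role/RoleName/session`), which can appear when running with temporary credentials outside SageMaker; for those ARNs, the last segment is the session name, not the role name.

```python
def parse_iam_identity(arn):
    """Split an IAM or STS identity ARN into (kind, name).

    Resource parts handled:
      user/alice                         -> ("user", "alice")
      role/service-role/MyRole           -> ("role", "MyRole")
      assumed-role/MyRole/session-name   -> ("assumed-role", "MyRole")
    """
    # ARN layout: arn:partition:service:region:account-id:resource
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    kind, _, rest = parts[5].partition("/")
    segments = rest.split("/")
    if kind in ("user", "role"):
        # The name is the last segment; anything before it is the IAM path.
        return kind, segments[-1]
    if kind == "assumed-role":
        # For assumed-role ARNs the role name comes before the session name.
        return kind, segments[0]
    raise ValueError(f"unsupported identity type: {kind!r}")


# Usage in this notebook (not executed here):
# kind, name = parse_iam_identity(identity_arn)
# if kind == "user":
#     iam.attach_user_policy(UserName=name, PolicyArn=aossAccessPolicyArn)
# else:
#     iam.attach_role_policy(RoleName=name, PolicyArn=aossAccessPolicyArn)
```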


## Open Search Collection Creation
Now that the policy granting full access to OpenSearch Serverless is created and attached, we are ready to create an OpenSearch Serverless (OSS) collection to house our embeddings and enriched metadata. Before we can create the collection, OpenSearch Serverless requires a few additional policies:

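1. Data access policy - grants the current identity (the principal) permission to manage items in the collection and to create, update, delete, and describe indexes and read and write documents.
2. Encryption (security) policy - encrypts the collection with an AWS owned key. A matching encryption policy must exist before the collection can be created.
3. Network policy - allows access to the collection from the public internet. NOTE: this is not recommended in production environments; define a policy that limits access to specific resources, such as VPC endpoints.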
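For production, a network policy can deny public access and allow only specific VPC endpoints. A minimal sketch, assuming you have already created an OpenSearch Serverless VPC endpoint; `build_vpc_network_policy` is a hypothetical helper and `vpce-0123456789abcdef0` is a placeholder ID.

```python
import json


def build_vpc_network_policy(collection_name, vpc_endpoint_ids):
    """Return a network policy that allows access only through the given VPC endpoints.

    With AllowFromPublic set to False, the collection is not reachable from
    the public internet.
    """
    return [
        {
            "Rules": [
                {
                    "Resource": [f"collection/{collection_name}"],
                    "ResourceType": "collection",
                }
            ],
            "AllowFromPublic": False,
            "SourceVPCEs": list(vpc_endpoint_ids),
        }
    ]


# Placeholder endpoint ID for illustration only
policy = build_vpc_network_policy("media-search-229", ["vpce-0123456789abcdef0"])
print(json.dumps(policy, indent=2))
# It would then be passed as:
# oss_client.create_security_policy(name=..., policy=json.dumps(policy), type="network")
```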

The following cells instantiate a Boto3 OpenSearch Serverless client and create the data access, encryption, and network policies required for the collection and its index.



In [6]:
# data access policy for OSS

collection_name = 'media-search-{0}'.format(suffix)
# Create an OpenSearch Serverless client
oss_client = boto3.client('opensearchserverless')

# Define the data access policy (serialized to a JSON string)
data_access_policy = json.dumps([
      {
        "Rules": [
          {
            "Resource": [
              f"collection/{collection_name}"
            ],
            "Permission": [
              "aoss:CreateCollectionItems",
              "aoss:DeleteCollectionItems",
              "aoss:UpdateCollectionItems",
              "aoss:DescribeCollectionItems"
            ],
            "ResourceType": "collection"
          },
          {
            "Resource": [
              f"index/{collection_name}/*"
            ],
            "Permission": [
              "aoss:CreateIndex",
              "aoss:DeleteIndex",
              "aoss:UpdateIndex",
              "aoss:DescribeIndex",
              "aoss:ReadDocument",
              "aoss:WriteDocument"
            ],
            "ResourceType": "index"
          }
        ],
        "Principal": [
          identity_arn
        ],
        "Description": "data-access-rule"
      }
    ], indent=2)

data_access_policy_name_nb = f"{collection_name}-policy-notebook"

# Create the data access policy
response = oss_client.create_access_policy(
    description='Data access policy for semantic search collection',
    name=data_access_policy_name_nb,
    policy=data_access_policy,  # already a JSON string
    type='data'
)

pprint.pp(response)

{'accessPolicyDetail': {'createdDate': 1724211614949,
                        'description': 'Data access policy for semantic search '
                                       'collection',
                        'lastModifiedDate': 1724211614949,
                        'name': 'media-search-229-policy-notebook',
                        'policy': [{'Rules': [{'Resource': ['collection/media-search-229'],
                                               'Permission': ['aoss:CreateCollectionItems',
                                                              'aoss:DeleteCollectionItems',
                                                              'aoss:UpdateCollectionItems',
                                                              'aoss:DescribeCollectionItems'],
                                               'ResourceType': 'collection'},
                                              {'Resource': ['index/media-search-229/*'],
                                               'Permiss
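The policy names in these cells are built by appending suffixes such as `-policy-notebook` to `collection_name`, which pushes them toward the service's name-length limit: `media-search-229-policy-notebook` is exactly 32 characters. A small pre-flight check can catch an invalid name before the API call fails. This sketch assumes the rule documented for OpenSearch Serverless collection and policy names (3 to 32 characters, starting with a lowercase letter, followed by lowercase letters, digits, or hyphens); `check_aoss_name` is a hypothetical helper.

```python
import re

# Assumed naming rule: 3 to 32 characters, a lowercase first letter, then
# lowercase letters, digits, or hyphens. Verify against the API reference.
_AOSS_NAME_RE = re.compile(r"[a-z][a-z0-9-]{2,31}")


def check_aoss_name(name):
    """Return name unchanged, or raise ValueError if it breaks the assumed rule."""
    if not _AOSS_NAME_RE.fullmatch(name):
        raise ValueError(
            f"invalid OpenSearch Serverless name: {name!r} ({len(name)} chars)"
        )
    return name


# Usage in this notebook (not executed here):
# for n in (collection_name, data_access_policy_name_nb):
#     check_aoss_name(n)
```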

In [7]:
# Create the encryption security policy (AWS owned key)
encryption_policy_name = f"{collection_name}-sp-notebook"

encryption_policy = oss_client.create_security_policy(
    name=encryption_policy_name,
    policy=json.dumps(
        {
            'Rules': [{'Resource': ['collection/' + collection_name],
                       'ResourceType': 'collection'}],
            'AWSOwnedKey': True
        }),
    type='encryption'
)
pprint.pp(encryption_policy)

{'securityPolicyDetail': {'createdDate': 1724211618059,
                          'lastModifiedDate': 1724211618059,
                          'name': 'media-search-229-sp-notebook',
                          'policy': {'Rules': [{'Resource': ['collection/media-search-229'],
                                                'ResourceType': 'collection'}],
                                     'AWSOwnedKey': True},
                          'policyVersion': 'MTcyNDIxMTYxODA1OV8x',
                          'type': 'encryption'},
 'ResponseMetadata': {'RequestId': '7ca6625d-d164-49c8-97fc-220ddfae63f9',
                      'HTTPStatusCode': 200,
                      'HTTPHeaders': {'x-amzn-requestid': '7ca6625d-d164-49c8-97fc-220ddfae63f9',
                                      'date': 'Wed, 21 Aug 2024 03:40:18 GMT',
                                      'content-type': 'application/x-amz-json-1.0',
                                      'content-length': '297',
                         

In [8]:
# Create the network policy (public access; not recommended for production)
network_policy_name = f"{collection_name}-np-notebook"
network_policy = oss_client.create_security_policy(
    name=network_policy_name,
    policy=json.dumps(
        [
            {'Rules': [{'Resource': ['collection/' + collection_name],
                        'ResourceType': 'collection'}],
             'AllowFromPublic': True}
        ]),
    type='network'
)

pprint.pp(network_policy)

{'securityPolicyDetail': {'createdDate': 1724211620293,
                          'lastModifiedDate': 1724211620293,
                          'name': 'media-search-229-np-notebook',
                          'policy': [{'Rules': [{'Resource': ['collection/media-search-229'],
                                                 'ResourceType': 'collection'}],
                                      'AllowFromPublic': True}],
                          'policyVersion': 'MTcyNDIxMTYyMDI5M18x',
                          'type': 'network'},
 'ResponseMetadata': {'RequestId': 'cbaea349-de68-47b5-bd17-9f6b1f1a1ee7',
                      'HTTPStatusCode': 200,
                      'HTTPHeaders': {'x-amzn-requestid': 'cbaea349-de68-47b5-bd17-9f6b1f1a1ee7',
                                      'date': 'Wed, 21 Aug 2024 03:40:20 GMT',
                                      'content-type': 'application/x-amz-json-1.0',
                                      'content-length': '300',
                    

We are now ready to create the OSS collection and its index. The following cells create a vector search collection and an index whose schema holds our metadata, including a vector field that stores our embeddings. A `search_client` (an `opensearchpy.OpenSearch` client) is used to create the index and to issue subsequent requests to the collection.

In [9]:
# Create a collection of type VECTORSEARCH
oss_client = boto3.client('opensearchserverless')
collection = oss_client.create_collection(name=collection_name, type='VECTORSEARCH')
collection_id = collection['createCollectionDetail']['id']
# The collection endpoint has the form <collection-id>.<region>.aoss.amazonaws.com
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'
print("OSS host: {0}".format(host))

# Sign data-plane requests with SigV4 for the 'aoss' service
service = 'aoss'
credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, region_name, service)

# Build the OpenSearch client
search_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# Collection creation is asynchronous (it can take a few minutes to become
# ACTIVE), and data access rules can take up to a minute to be enforced
time.sleep(60)

OSS host: vpdj3ctgv0rmukhasyi.us-east-1.aoss.amazonaws.com
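The fixed 60-second sleep may not be long enough for a new collection to leave the `CREATING` state. A sketch that instead polls `batch_get_collection` until the status is `ACTIVE` (or `FAILED`); `wait_for_collection_active` is a hypothetical helper, and the polling interval and attempt count are arbitrary. The returned detail dict also includes the collection endpoint.

```python
import time


def wait_for_collection_active(oss_client, name, delay=30, max_attempts=40):
    """Poll BatchGetCollection until the named collection reports ACTIVE.

    Returns the collection detail dict. Raises RuntimeError if the collection
    reports FAILED or is not ACTIVE after max_attempts checks.
    """
    for _ in range(max_attempts):
        details = oss_client.batch_get_collection(names=[name])["collectionDetails"]
        if details:
            status = details[0]["status"]
            if status == "ACTIVE":
                return details[0]
            if status == "FAILED":
                raise RuntimeError(f"collection {name} failed to create")
        time.sleep(delay)
    raise RuntimeError(f"collection {name} not ACTIVE after {max_attempts} checks")


# Usage in this notebook (not executed here):
# detail = wait_for_collection_active(oss_client, collection_name)
# print(detail["status"], detail.get("collectionEndpoint"))
```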


### Defining the OpenSearch Serverless index
The following cell defines an index schema (mapping) for our dataset and creates the index using the OpenSearch client.

In [11]:
index_name = 'smart-search-index'
index_body = {
   "settings": {
      "index.knn": "true"
   },
   "mappings": {
      "properties": {
          "image_vector": {
              "type": "knn_vector",
              "dimension": 1024  # Amazon Titan Multimodal Embeddings G1 outputs 1,024 (default), 384, or 256 dimensions
          },
          "image_id" : {"type": "text"},
          "path": {"type": "text"},
          "title": {"type": "text"},
          "description": {"type": "text"},
          "keywords": {"type": "text"},
          "tags": {"type": "text"}
      }
   }
}

# If the index already exists, create() raises an exception; we print it and continue.
try:
    response = search_client.indices.create(index=index_name, body=index_body)
    print(f"response received for the create index -> {response}")
except Exception as e:
    print(f"error in creating index={index_name}, exception={e}")

response received for the create index -> {'acknowledged': True, 'shards_acknowledged': True, 'index': 'smart-search-index'}
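Once the index is populated, similarity search against the `image_vector` field uses the OpenSearch k-NN query clause. A minimal sketch of the request body; `build_knn_query` is a hypothetical helper, and the query vector must come from the same embedding model and have the same 1,024 dimensions as the mapping.

```python
def build_knn_query(query_vector, k=5, field="image_vector", source_fields=None):
    """Build a k-NN search body for the knn_vector field defined in index_body."""
    body = {
        "size": k,
        "query": {
            "knn": {
                field: {
                    "vector": list(query_vector),
                    "k": k,
                }
            }
        },
    }
    if source_fields is not None:
        # Return only these fields in each hit's _source
        body["_source"] = list(source_fields)
    return body


# Usage in this notebook (not executed here); query_embedding is hypothetical:
# results = search_client.search(
#     index=index_name,
#     body=build_knn_query(query_embedding, k=3, source_fields=["title", "description"]),
# )
```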


In [13]:
#display information on the index you just created

# Get index mapping
response = search_client.indices.get_mapping(index=index_name)
pprint.pp(response)

# Get index settings
response = search_client.indices.get_settings(index=index_name)
pprint.pp(response)

# Get index aliases
response = search_client.indices.get_alias(index=index_name) 
pprint.pp(response)

{'smart-search-index': {'mappings': {'properties': {'description': {'type': 'text'},
                                                    'image_id': {'type': 'text'},
                                                    'image_vector': {'type': 'knn_vector',
                                                                     'dimension': 1024},
                                                    'keywords': {'type': 'text'},
                                                    'path': {'type': 'text'},
                                                    'tags': {'type': 'text'},
                                                    'title': {'type': 'text'}}}}}
{'smart-search-index': {'settings': {'index': {'knn': 'true',
                                               'number_of_shards': '2',
                                               'number_of_replicas': '0',
                                               'uuid': '4cgHc5EBSSHZ9_4rQ5K8',
                                              

In [None]:
# deleting indices
# search_client.indices.delete(index=index_name)

## Loading the data into the index

The index is created and ready for use. Before we can issue any queries, the following cells reload the enriched dataset from the previous notebook and populate the index.
If the stored variable is unavailable, uncomment the `pd.read_csv` line to reload the dataset from the CSV file saved in notebook 2.

In [27]:
# load the dataset from notebook 2 
%store -r df_metadata
# df_metadata = pd.read_csv('./data/enriched_dataset.csv')
df_metadata.head()
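If you reload from CSV instead of `%store`, list-valued columns such as `embeddings` come back as strings, which would not index as a `knn_vector`. A sketch of a converter that parses them and checks the dimension against the mapping; it assumes the embeddings were saved as Python lists (a NumPy array written to CSV may be abbreviated with `...` and cannot be parsed this way). `parse_embedding` is a hypothetical helper.

```python
import ast

EXPECTED_DIM = 1024  # must match "dimension" in the image_vector mapping


def parse_embedding(value, expected_dim=EXPECTED_DIM):
    """Return an embedding as a list of floats, parsing it first if it is a string."""
    if isinstance(value, str):
        # e.g. "[0.012, -0.034, ...]" as written by DataFrame.to_csv for a list column
        value = ast.literal_eval(value)
    vector = [float(x) for x in value]
    if len(vector) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, got {len(vector)}")
    return vector


# Usage in this notebook (not executed here):
# df_metadata['embeddings'] = df_metadata['embeddings'].apply(parse_embedding)
```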

The following cell loads the entire contents of the DataFrame into the OpenSearch index by iterating over its rows and indexing one document per row.

In [30]:
%%time
import tqdm.notebook as tq

for idx, record in tq.tqdm(df_metadata.iterrows(), total=len(df_metadata)):
    document = {
        'image_vector': record['embeddings'],
        "description": record['description'],
        "image_id": record['image_id'],
        # 'image_url' is not in the explicit mapping (which defines 'path'),
        # so OpenSearch adds it through dynamic mapping
        "image_url": record['path'],
        "title": record['title'],
        "keywords": record['keywords'],
        "tags": record['tags'],
    }
    response = search_client.index(
        index=index_name,
        body=document
    )

  0%|          | 0/292 [00:00<?, ?it/s]

CPU times: user 1.34 s, sys: 125 ms, total: 1.46 s
Wall time: 1min 10s
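Indexing one document per request took over a minute for this dataset. The notebook already imports `helpers` from `opensearchpy`, whose `helpers.bulk` sends documents in batches. A sketch of an action generator producing the same documents as the loop above; `build_bulk_actions` is a hypothetical helper, the chunk size is arbitrary, and, like the loop, it omits `_id` so the service assigns document IDs.

```python
def build_bulk_actions(records, index_name):
    """Yield one bulk 'index' action per record dict (e.g. from df.to_dict('records'))."""
    for record in records:
        yield {
            "_op_type": "index",
            "_index": index_name,
            "_source": {
                "image_vector": record["embeddings"],
                "description": record["description"],
                "image_id": record["image_id"],
                "image_url": record["path"],
                "title": record["title"],
                "keywords": record["keywords"],
                "tags": record["tags"],
            },
        }


# Usage in this notebook (not executed here):
# success, errors = helpers.bulk(
#     search_client,
#     build_bulk_actions(df_metadata.to_dict("records"), index_name),
#     chunk_size=50,
# )
```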


In [34]:
# save variables for use in search notebook
%store index_name
%store collection_name
%store host

Stored 'index_name' (str)
Stored 'collection_name' (str)
Stored 'host' (str)
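When you are finished with the collection (for example, after running the search notebook), you may want to delete the resources this notebook created to avoid ongoing charges. A sketch, assuming the policy was attached to a role as in the output above; `cleanup_notebook_resources` is a hypothetical helper. Collection deletion is asynchronous, so if a policy deletion is rejected while the collection is still being deleted, wait and retry.

```python
def cleanup_notebook_resources(oss_client, iam_client, *, collection_id,
                               data_policy_name, encryption_policy_name,
                               network_policy_name, role_name, policy_arn):
    """Delete the collection, its OpenSearch Serverless policies, and the IAM policy."""
    oss_client.delete_collection(id=collection_id)
    oss_client.delete_access_policy(name=data_policy_name, type="data")
    oss_client.delete_security_policy(name=network_policy_name, type="network")
    oss_client.delete_security_policy(name=encryption_policy_name, type="encryption")
    # A managed IAM policy must be detached before it can be deleted
    iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    iam_client.delete_policy(PolicyArn=policy_arn)


# Usage in this notebook (not executed here):
# cleanup_notebook_resources(
#     oss_client, iam,
#     collection_id=collection_id,
#     data_policy_name=data_access_policy_name_nb,
#     encryption_policy_name=encryption_policy_name,
#     network_policy_name=network_policy_name,
#     role_name=identity_arn.split('/')[-1],
#     policy_arn=aossAccessPolicyArn,
# )
```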
