In [1]:
import boto3
from botocore.exceptions import BotoCoreError
import json
import requests
from requests.auth import HTTPBasicAuth
from requests_aws4auth import AWS4Auth
import time

# This Python code is compatible with AWS OpenSearch versions 2.9 and higher.
class AIConnectorHelper:
    
    def __init__(self, region, opensearch_domain_name, opensearch_domain_username, opensearch_domain_password, aws_user_name):
        self.region = region
        self.opensearch_domain_url, self.opensearch_domain_arn = AIConnectorHelper.get_opensearch_domain_info(region, opensearch_domain_name)
        self.opensearch_domain_username = opensearch_domain_username
        self.opensearch_domain_opensearch_domain_password = opensearch_domain_password
        self.aws_user_name = aws_user_name
        
    @staticmethod    
    def get_opensearch_domain_info(region, domain_name):
        # Create a boto3 client for OpenSearch
        opensearch_client = boto3.client('es', region_name=region)

        try:
            # Describe the domain to get its information
            response = opensearch_client.describe_elasticsearch_domain(DomainName=domain_name)
            domain_info = response['DomainStatus']

            # Extract the domain URL and ARN
            domain_url = domain_info['Endpoint']
            domain_arn = domain_info['ARN']

            return f'https://{domain_url}', domain_arn

        except opensearch_client.exceptions.ResourceNotFoundException:
            print(f"Domain '{domain_name}' not found.")
            return None, None
        
    def get_user_arn(self, username):
        # Create a boto3 client for IAM
        iam_client = boto3.client('iam')

        try:
            # Get information about the IAM user
            response = iam_client.get_user(UserName=username)
            user_arn = response['User']['Arn']
            return user_arn
        except iam_client.exceptions.NoSuchEntityException:
            print(f"IAM user '{username}' not found.")
            return None

    def secret_exists(self, secret_name):
        secretsmanager = boto3.client('secretsmanager', region_name=self.region)
        try:
            # Try to get the secret
            secretsmanager.get_secret_value(SecretId=secret_name)
            # If no exception was raised by get_secret_value, the secret exists
            return True
        except secretsmanager.exceptions.ResourceNotFoundException:
            # If a ResourceNotFoundException was raised, the secret does not exist
            return False

    def get_secret_arn(self, secret_name):
        secretsmanager = boto3.client('secretsmanager', region_name=self.region)
        try:
            response = secretsmanager.describe_secret(SecretId=secret_name)
            # Return ARN of the secret 
            return response['ARN']
        except secretsmanager.exceptions.ResourceNotFoundException:
            print(f"The requested secret {secret_name} was not found")
            return None
        except Exception as e:
            print(f"An error occurred: {e}")
            return None
        
    def get_secret(self, secret_name):
        secretsmanager = boto3.client('secretsmanager', region_name=self.region)
        try:
            response = secretsmanager.get_secret_value(SecretId=secret_name)
        except secretsmanager.exceptions.NoSuchEntityException:
            print("The requested secret was not found")
            return None
        except Exception as e:
            print(f"An error occurred: {e}")
            return None
        else:
            return response.get('SecretString')

    def create_secret(self, secret_name, secret_value):
        secretsmanager = boto3.client('secretsmanager', region_name=self.region)

        try:
            response = secretsmanager.create_secret(
                Name=secret_name,
                SecretString=json.dumps(secret_value),
            )
            print(f'Secret {secret_name} created successfully.')
            return response['ARN']  # Return the ARN of the created secret
        except BotoCoreError as e:
            print(f'Error creating secret: {e}')
            return None


    def role_exists(self, role_name):
        iam_client = boto3.client('iam')

        try:
            iam_client.get_role(RoleName=role_name)
            return True
        except iam_client.exceptions.NoSuchEntityException:
            return False

    def delete_role(self, role_name):
        iam_client = boto3.client('iam')

        try:
            # Detach managed policies
            policies = iam_client.list_attached_role_policies(RoleName=role_name)['AttachedPolicies']
            for policy in policies:
                iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy['PolicyArn'])
            print(f'All managed policies detached from role {role_name}.')

            # Delete inline policies
            inline_policies = iam_client.list_role_policies(RoleName=role_name)['PolicyNames']
            for policy_name in inline_policies:
                iam_client.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
            print(f'All inline policies deleted from role {role_name}.')

            # Now, delete the role
            iam_client.delete_role(RoleName=role_name)
            print(f'Role {role_name} deleted.')

        except iam_client.exceptions.NoSuchEntityException:
            print(f'Role {role_name} does not exist.')


    def create_iam_role(self, role_name, trust_policy_json, inline_policy_json):
        iam_client = boto3.client('iam')

        try:
            # Create the role with the trust policy
            create_role_response = iam_client.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy_json),
                Description='Role with custom trust and inline policies',
            )

            # Get the ARN of the newly created role
            role_arn = create_role_response['Role']['Arn']

            # Attach the inline policy to the role
            iam_client.put_role_policy(
                RoleName=role_name,
                PolicyName='InlinePolicy',  # you can replace this with your preferred policy name
                PolicyDocument=json.dumps(inline_policy_json)
            )

            print(f'Created role: {role_name}')
            return role_arn

        except Exception as e:
            print(f"Error creating the role: {e}")
            return None

    def get_role_arn(self, role_name):
        iam_client = boto3.client('iam')
        try:
            response = iam_client.get_role(RoleName=role_name)
            # Return ARN of the role
            return response['Role']['Arn']
        except iam_client.exceptions.NoSuchEntityException:
            print(f"The requested role {role_name} does not exist")
            return None
        except Exception as e:
            print(f"An error occurred: {e}")
            return None
    
    
    def get_role_details(self, role_name):
        iam = boto3.client('iam')

        try:
            response = iam.get_role(RoleName=role_name)
            role = response['Role']

            print(f"Role Name: {role['RoleName']}")
            print(f"Role ID: {role['RoleId']}")
            print(f"ARN: {role['Arn']}")
            print(f"Creation Date: {role['CreateDate']}")
            print("Assume Role Policy Document:")
            print(json.dumps(role['AssumeRolePolicyDocument'], indent=4, sort_keys=True))

            list_role_policies_response = iam.list_role_policies(RoleName=role_name)

            for policy_name in list_role_policies_response['PolicyNames']:
                get_role_policy_response = iam.get_role_policy(RoleName=role_name, PolicyName=policy_name)
                print(f"Role Policy Name: {get_role_policy_response['PolicyName']}")
                print("Role Policy Document:")
                print(json.dumps(get_role_policy_response['PolicyDocument'], indent=4, sort_keys=True))

        except iam.exceptions.NoSuchEntityException:
            print(f'Role {role_name} does not exist.')

    def map_iam_role_to_backend_role(self, role_arn, os_security_role='ml_full_access'):
        url = f'{self.opensearch_domain_url}/_plugins/_security/api/rolesmapping/{os_security_role}'
        r=requests.get(url, auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password))
        role_mapping = json.loads(r.text)
        headers = {"Content-Type": "application/json"}
        if 'status' in role_mapping and role_mapping['status'] == 'NOT_FOUND':
            data = {'backend_roles': [ role_arn ] }
            response = requests.put(url, headers=headers, data=json.dumps(data), auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password))
            print(response.text)
        else:
            role_mapping = role_mapping[os_security_role]
            role_mapping['backend_roles'].append(role_arn)
            data = [
              {
                "op": "replace", "path": "/backend_roles", "value": list(set(role_mapping['backend_roles']))
              }
            ]
            response = requests.patch(url, headers=headers, data=json.dumps(data), auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password))
            print(response.text)

    def assume_role(self, create_connector_role_arn, role_session_name="your_session_name"):
        sts_client = boto3.client('sts')

        #role_arn = f"arn:aws:iam::{aws_account_id}:role/{role_name}"
        assumed_role_object = sts_client.assume_role(
            RoleArn=create_connector_role_arn,
            RoleSessionName=role_session_name,
        )

        # Obtain the temporary credentials from the assumed role 
        temp_credentials = assumed_role_object["Credentials"]

        return temp_credentials

    def create_connector(self, create_connector_role_name, payload):
        create_connector_role_arn = self.get_role_arn(create_connector_role_name)
        temp_credentials = self.assume_role(create_connector_role_arn)
        awsauth = AWS4Auth(
            temp_credentials["AccessKeyId"],
            temp_credentials["SecretAccessKey"],
            self.region,
            'es',
            session_token=temp_credentials["SessionToken"],
        )

        path = '/_plugins/_ml/connectors/_create'
        url = self.opensearch_domain_url + path

        headers = {"Content-Type": "application/json"}

        r = requests.post(url, auth=awsauth, json=payload, headers=headers)
        print(r.text)
        connector_id = json.loads(r.text)['connector_id']
        return connector_id
    
    def search_model_group(self, model_group_name):
        payload = {
          "query": {
            "term": {
              "name.keyword": {
                "value": model_group_name
              }
            }
          }
        }
        headers = {"Content-Type": "application/json"}
        r = requests.post(f'{self.opensearch_domain_url}/_plugins/_ml/model_groups/_search',
                          auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password),
                          json=payload,
                          headers=headers)
        #print(r.text)
        response = json.loads(r.text)
        return response
    
    def create_model_group(self, model_group_name, description):
        search_model_group_response = self.search_model_group(model_group_name)
        if 'no such index [.plugins-ml-model-group]' not in str(search_model_group_response) and search_model_group_response['hits']['total']['value'] > 0:
            return search_model_group_response['hits']['hits'][0]['_id']
        payload = {
          "name": model_group_name,
          "description": description
        }
        headers = {"Content-Type": "application/json"}
        r = requests.post(f'{self.opensearch_domain_url}/_plugins/_ml/model_groups/_register',
                          auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password),
                          json=payload,
                          headers=headers)
        print(r.text)
        response = json.loads(r.text)
        return response['model_group_id']
    
    def get_task(self, task_id):
        return requests.get(f'{self.opensearch_domain_url}/_plugins/_ml/tasks/{task_id}',
                          auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password))
    
    def create_model(self, model_name, description, connector_id, deploy=True):
        model_group_id = self.create_model_group(model_name, description)
        payload = {
          "name": model_name,
          "function_name": "remote",
          "description": description,
          "model_group_id": model_group_id,
          "connector_id": connector_id
        }
        headers = {"Content-Type": "application/json"}
        deploy_str = str(deploy).lower()
        r = requests.post(f'{self.opensearch_domain_url}/_plugins/_ml/models/_register?deploy={deploy_str}',
                          auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password),
                          json=payload,
                          headers=headers)
        print(r.text)
        response = json.loads(r.text)
        if 'model_id' in response:
            return response['model_id']
        else:
            time.sleep(2) # sleep two seconds for task complete
            r = self.get_task(response['task_id'])
            print(r.text)
            return json.loads(r.text)['model_id']
    
    def deploy_model(self, model_id):
        return requests.post(f'{self.opensearch_domain_url}/_plugins/_ml/models/{model_id}/_deploy',
                          auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password),
                          headers=headers)
    
    def predict(self, model_id, payload):
        headers = {"Content-Type": "application/json"}

        r = requests.post(f'{self.opensearch_domain_url}/_plugins/_ml/models/{model_id}/_predict',
                          auth=HTTPBasicAuth(self.opensearch_domain_username, self.opensearch_domain_opensearch_domain_password),
                          json=payload,
                          headers=headers)
        return r.text
    
    def create_connector_with_secret(self, secret_name, secret_value, connector_role_name, create_connector_role_name, create_connector_input, sleep_time_in_seconds=10):
        # Step1: Create Secret
        print('Step1: Create Secret')
        if not self.secret_exists(secret_name):
            secret_arn = self.create_secret(secret_name, secret_value)
        else:
            print('secret exists, skip creating')
            secret_arn = self.get_secret_arn(secret_name)
        #print(secret_arn)
        print('----------')
        
        # Step2: Create IAM role configued in connector
        trust_policy = {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "es.amazonaws.com"
              },
              "Action": "sts:AssumeRole"
            }
          ]
        }

        inline_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Action": [
                        "secretsmanager:GetSecretValue"
                    ],
                    "Effect": "Allow",
                    "Resource": secret_arn
                }
            ]
        }

        print('Step2: Create IAM role configued in connector')
        if not self.role_exists(connector_role_name):
            connector_role_arn = self.create_iam_role(connector_role_name, trust_policy, inline_policy)
        else:
            print('role exists, skip creating')
            connector_role_arn = self.get_role_arn(connector_role_name)
        #print(connector_role_arn)
        print('----------')
        
        # Step 3: Configure IAM role in OpenSearch
        # 3.1 Create IAM role for Signing create connector request
        user_arn = self.get_user_arn(self.aws_user_name)
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "AWS": user_arn
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }

        inline_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": "iam:PassRole",
                    "Resource": connector_role_arn
                },
                {
                    "Effect": "Allow",
                    "Action": "es:ESHttpPost",
                    "Resource": self.opensearch_domain_arn
                }
            ]
        }

        print('Step 3: Configure IAM role in OpenSearch')
        print('Step 3.1: Create IAM role for Signing create connector request')
        if not self.role_exists(create_connector_role_name):
            create_connector_role_arn = self.create_iam_role(create_connector_role_name, trust_policy, inline_policy)
        else:
            print('role exists, skip creating')
            create_connector_role_arn = self.get_role_arn(create_connector_role_name)
        #print(create_connector_role_arn)
        print('----------')
        
        # 3.2 Map backend role
        print(f'Step 3.2: Map IAM role {create_connector_role_name} to OpenSearch permission role')
        self.map_iam_role_to_backend_role(create_connector_role_arn)
        print('----------')
        
        # 4. Create connector
        print('Step 4: Create connector in OpenSearch')
        # When you create an IAM role, it can take some time for the changes to propagate across AWS systems.
        # During this time, some services might not immediately recognize the new role or its permissions.
        # So we wait for some time before creating connector.
        # If you see such error: ClientError: An error occurred (AccessDenied) when calling the AssumeRole operation
        # you can rerun this function.
        
        # Wait for some time
        time.sleep(sleep_time_in_seconds)
        payload = create_connector_input
        payload['credential'] = {
            "secretArn": secret_arn,
            "roleArn": connector_role_arn
        }
        connector_id = self.create_connector(create_connector_role_name, payload)
        #print(connector_id)
        print('----------')
        return connector_id
    
    def create_connector_with_role(self, connector_role_inline_policy, connector_role_name, create_connector_role_name, create_connector_input, sleep_time_in_seconds=10):
        # Step1: Create IAM role configued in connector
        trust_policy = {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": "es.amazonaws.com"
              },
              "Action": "sts:AssumeRole"
            }
          ]
        }

        print('Step1: Create IAM role configued in connector')
        if not self.role_exists(connector_role_name):
            connector_role_arn = self.create_iam_role(connector_role_name, trust_policy, connector_role_inline_policy)
        else:
            print('role exists, skip creating')
            connector_role_arn = self.get_role_arn(connector_role_name)
        #print(connector_role_arn)
        print('----------')

        # Step 2: Configure IAM role in OpenSearch
        # 2.1 Create IAM role for Signing create connector request
        user_arn = self.get_user_arn(self.aws_user_name)
        trust_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "AWS": user_arn
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }

        inline_policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": "iam:PassRole",
                    "Resource": connector_role_arn
                },
                {
                    "Effect": "Allow",
                    "Action": "es:ESHttpPost",
                    "Resource": self.opensearch_domain_arn
                }
            ]
        }

        print('Step 2: Configure IAM role in OpenSearch')
        print('Step 2.1: Create IAM role for Signing create connector request')
        if not self.role_exists(create_connector_role_name):
            create_connector_role_arn = self.create_iam_role(create_connector_role_name, trust_policy, inline_policy)
        else:
            print('role exists, skip creating')
            create_connector_role_arn = self.get_role_arn(create_connector_role_name)
        #print(create_connector_role_arn)
        print('----------')

        # 2.2 Map backend role
        print(f'Step 2.2: Map IAM role {create_connector_role_name} to OpenSearch permission role')
        self.map_iam_role_to_backend_role(create_connector_role_arn)
        print('----------')

        # 3. Create connector
        print('Step 3: Create connector in OpenSearch')
        # When you create an IAM role, it can take some time for the changes to propagate across AWS systems.
        # During this time, some services might not immediately recognize the new role or its permissions.
        # So we wait for some time before creating connector.
        # If you see such error: ClientError: An error occurred (AccessDenied) when calling the AssumeRole operation
        # you can rerun this function.

        # Wait for some time
        time.sleep(sleep_time_in_seconds)
        payload = create_connector_input
        payload['credential'] = {
            "roleArn": connector_role_arn
        }
        connector_id = self.create_connector(create_connector_role_name, payload)
        #print(connector_id)
        print('----------')
        return connector_id

# 0. Create helper

In [2]:
region = '...' # set your AWS region, for example us-west-2
opensearch_domain_name = '...' # set your AWS OpenSearch domain name
opensearch_domain_username = '...' # set your AWS OpenSearch domain admin username
opensearch_domain_password = '...' # set your domain password

aws_user_name = '...' # set your AWS IAM user name, not IAM user ARN. 
            # To avoid permission issue and quick start, you can use user whith AdministratorAccess policy
            # Configure this user's access key and secret key in ~/.aws/credential 
            # You can configure ~/.aws/credential as:
'''
[default]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
'''

helper = AIConnectorHelper(region, 
                           opensearch_domain_name, 
                           opensearch_domain_username, 
                           opensearch_domain_password, 
                           aws_user_name)

# 1. Create Connector of Bedrock Embedding Model

Read [semantic_search_with_bedrock_titan_embedding_model](./semantic_search_with_bedrock_titan_embedding_model.md) for more details.

In [3]:
connector_role_inline_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "bedrock:InvokeModel"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v1"
        }
    ]
}

# You can use existing role if the role permission and trust relationship are correct. 
# To quick start, you can specify new role names. AIConnectorHelper will create role automatically.
connector_role_name = 'my_test_bedrock_connector_role'
create_connector_role_name = 'my_test_create_bedrock_connector_role'

bedrock_region = 'us-west-2' # bedrock region could be different with OpenSearch domain region
create_connector_input = {
  "name": "Amazon Bedrock Connector: titan embedding v1",
  "description": "The connector to bedrock Titan embedding model",
  "version": 1,
  "protocol": "aws_sigv4",
  "parameters": {
    "region": bedrock_region,
    "service_name": "bedrock"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": f"https://bedrock-runtime.{bedrock_region}.amazonaws.com/model/amazon.titan-embed-text-v1/invoke",
      "headers": {
        "content-type": "application/json",
        "x-amz-content-sha256": "required"
      },
      "request_body": "{ \"inputText\": \"${parameters.inputText}\" }",
      "pre_process_function": "\n    StringBuilder builder = new StringBuilder();\n    builder.append(\"\\\"\");\n    String first = params.text_docs[0];\n    builder.append(first);\n    builder.append(\"\\\"\");\n    def parameters = \"{\" +\"\\\"inputText\\\":\" + builder + \"}\";\n    return  \"{\" +\"\\\"parameters\\\":\" + parameters + \"}\";",
      "post_process_function": "\n      def name = \"sentence_embedding\";\n      def dataType = \"FLOAT32\";\n      if (params.embedding == null || params.embedding.length == 0) {\n        return params.message;\n      }\n      def shape = [params.embedding.length];\n      def json = \"{\" +\n                 \"\\\"name\\\":\\\"\" + name + \"\\\",\" +\n                 \"\\\"data_type\\\":\\\"\" + dataType + \"\\\",\" +\n                 \"\\\"shape\\\":\" + shape + \",\" +\n                 \"\\\"data\\\":\" + params.embedding +\n                 \"}\";\n      return json;\n    "
    }
  ]
}

connector_id = helper.create_connector_with_role(connector_role_inline_policy,
                                                 connector_role_name,
                                                 create_connector_role_name,
                                                 create_connector_input,
                                                 sleep_time_in_seconds=10)

Step1: Create IAM role configued in connector
Created role: my_test_bedrock_connector_role
----------
Step 2: Configure IAM role in OpenSearch
Step 2.1: Create IAM role for Signing create connector request
Created role: my_test_create_bedrock_connector_role
----------
Step 2.2: Map IAM role my_test_create_bedrock_connector_role to OpenSearch permission role
{"status":"OK","message":"'ml_full_access' updated."}
----------
Step 3: Create connector in OpenSearch
{"connector_id":"7p2gMI4BWbTmLN9FeY7x"}
----------


In [4]:
model_name = 'Bedrock embedding model'
description = 'This is my test model'
model_id = helper.create_model(model_name, description, connector_id)

{"task_id":"ikqgMI4BOhavBOmffCmA","status":"CREATED","model_id":"i0qgMI4BOhavBOmffCma"}


In [5]:
payload = {
  "parameters": {
    "inputText": "hello world"
  }
}
r=helper.predict(model_id, payload)
r

'{"inference_results":[{"output":[{"name":"sentence_embedding","data_type":"FLOAT32","shape":[1536],"data":[0.7265625,-0.0703125,0.34765625,...]}],"status_code":200}]}'

# 2. Create Connector of Sagemaker Embedding Model

Read [semantic_search_with_sagemaker_embedding_model](./semantic_search_with_sagemaker_embedding_model.md) for more details.

In [6]:
sagemaker_inference_endpoint_arn = "..." # set your SageMaker inference endpoint ARN
connector_role_inline_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:InvokeEndpoint"
            ],
            "Resource": [
                sagemaker_inference_endpoint_arn
            ]
        }
    ]
}

# You can use existing role if the role permission and trust relationship are correct. 
# To quick start, you can specify new role names. AIConnectorHelper will create role automatically.
connector_role_name = 'my_test_sagemaker_connector_role'
create_connector_role_name = 'my_test_create_sagemaker_connector_role'
sagemaker_inference_endpoint_url = '...' # set your SageMaker inference endpoint URL

sagemaker_endpoint_region = 'us-west-2' # SageMaker endpoint region could be different with OpenSearch domain region
create_connector_input = {
  "name": "Sagemaker embedding model connector",
  "description": "Connector for my Sagemaker embedding model",
  "version": "1.0",
  "protocol": "aws_sigv4",
  "parameters": {
    "region": sagemaker_endpoint_region,
    "service_name": "sagemaker"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "headers": {
        "content-type": "application/json"
      },
      "url": sagemaker_inference_endpoint_url,
      "request_body": "${parameters.input}",
      "pre_process_function": "connector.pre_process.default.embedding",
      "post_process_function": "connector.post_process.default.embedding"
    }
  ]
}

connector_id = helper.create_connector_with_role(connector_role_inline_policy,
                                                 connector_role_name,
                                                 create_connector_role_name,
                                                 create_connector_input,
                                                 sleep_time_in_seconds=10)

Step1: Create IAM role configued in connector
Created role: my_test_sagemaker_connector_role
----------
Step 2: Configure IAM role in OpenSearch
Step 2.1: Create IAM role for Signing create connector request
Created role: my_test_create_sagemaker_connector_role
----------
Step 2.2: Map IAM role my_test_create_sagemaker_connector_role to OpenSearch permission role
{"status":"OK","message":"'ml_full_access' updated."}
----------
Step 3: Create connector in OpenSearch
{"connector_id":"jUqhMI4BOhavBOmfhCnz"}
----------


In [7]:
model_name = 'Sagemaker embedding model'
description = 'This is my test model'
model_id = helper.create_model(model_name, description, connector_id)

{"task_id":"752hMI4BWbTmLN9FjY7M","status":"CREATED","model_id":"8J2hMI4BWbTmLN9FjY7k"}


In [8]:
payload = {
  "parameters": {
    "input": ["hello world", "how are you"]
  }
}
r=helper.predict(model_id, payload)
r

'{"inference_results":[{"output":[{"name":"sentence_embedding","data_type":"FLOAT32","shape":[384],"data":[0.0067263525,0.034647945,0.051665697,0.045790758,...]},{"name":"sentence_embedding","data_type":"FLOAT32","shape":[384],"data":[0.17081982,-0.035508048,0.017017538,...]}],"status_code":200}]}'

# 3. Create Connector of Cohere Embedding Model with secret
Read [semantic_search_with_cohere_embedding_model](./semantic_search_with_cohere_embedding_model.md) for more details.

In [9]:
secret_name = 'my_test_cohere_secret'
secret_key = 'my_cohere_key'
secret_value = '...' # set your Cohere API key
secret_value = { secret_key: secret_value }
# You can use existing role if the role permission and trust relationship are correct. 
# To quick start, you can specify new role names. AIConnectorHelper will create role automatically.
connector_role_name = 'my_test_cohere_connector_role'
create_connector_role_name = 'my_test_create_cohere_connector_role'

create_connector_input = {
  "name": "cohere-embed-v3",
  "description": "The connector to public Cohere model service for embed",
  "version": "1",
  "protocol": "http",
  "parameters": {
    "model": "embed-english-v3.0",
    "input_type":"search_document",
    "truncate": "END"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://api.cohere.ai/v1/embed",
      "headers": {
        "Authorization": f"Bearer ${{credential.secretArn.{secret_key}}}",
        "Request-Source": "unspecified:opensearch"
      },
      "request_body": "{ \"texts\": ${parameters.texts}, \"truncate\": \"${parameters.truncate}\", \"model\": \"${parameters.model}\", \"input_type\": \"${parameters.input_type}\" }",
      "pre_process_function": "connector.pre_process.cohere.embedding",
      "post_process_function": "connector.post_process.cohere.embedding"
    }
  ]
}

connector_id = helper.create_connector_with_secret(secret_name,
                                                   secret_value,
                                                   connector_role_name,
                                                   create_connector_role_name, 
                                                   create_connector_input,
                                                   sleep_time_in_seconds=10)

Step1: Create Secret
secret exists, skip creating
----------
Step2: Create IAM role configued in connector
Created role: my_test_cohere_connector_role
----------
Step 3: Configure IAM role in OpenSearch
Step 3.1: Create IAM role for Signing create connector request
Created role: my_test_create_cohere_connector_role
----------
Step 3.2: Map IAM role my_test_create_cohere_connector_role to OpenSearch permission role
{"status":"OK","message":"'ml_full_access' updated."}
----------
Step 4: Create connector in OpenSearch
{"connector_id":"8p2iMI4BWbTmLN9Fi47_"}
----------


In [10]:
model_name = 'Cohere embedding model'
description = 'This is my test model'
model_id = helper.create_model(model_name, description, connector_id)

{"task_id":"jkqiMI4BOhavBOmfjSnB","status":"CREATED","model_id":"j0qiMI4BOhavBOmfjSn1"}


In [11]:
payload = {
  "parameters": {
    "texts": ["hello world", "how are you"]
  }
}
r=helper.predict(model_id, payload)
r

'{"inference_results":[{"output":[{"name":"sentence_embedding","data_type":"FLOAT32","shape":[1024],"data":[-0.029510498,-0.023223877,-0.059631348,...]},{"name":"sentence_embedding","data_type":"FLOAT32","shape":[1024],"data":[0.02279663,0.014976501,-0.04058838,...]}],"status_code":200}]}'

# 4. Create Connector of OpenAI Embedding Model with secret
Read [semantic_search_with_openai_embedding_model](./semantic_search_with_openai_embedding_model.md) for more details.

In [12]:
secret_name = 'my_test_openai_secret'
secret_key = 'my_openai_key'
secret_value = '...' # set your OpenAI API key
secret_value = { secret_key : secret_value }
# You can use existing role if the role permission and trust relationship are correct. 
# To quick start, you can specify new role names. AIConnectorHelper will create role automatically.
connector_role_name = 'my_test_openai_connector_role'
create_connector_role_name = 'my_test_create_openai_connector_role'

create_connector_input = {
  "name": "OpenAI embedding model connector",
  "description": "Connector for OpenAI embedding model",
  "version": "1.0",
  "protocol": "http",
  "parameters": {
    "model": "text-embedding-ada-002"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://api.openai.com/v1/embeddings",
      "headers": {
        "Authorization": f"Bearer ${{credential.secretArn.{secret_key}}}",
      },
      "request_body": "{ \"input\": ${parameters.input}, \"model\": \"${parameters.model}\" }",
      "pre_process_function": "connector.pre_process.openai.embedding",
      "post_process_function": "connector.post_process.openai.embedding"
    }
  ]
}

connector_id = helper.create_connector_with_secret(secret_name,
                                                   secret_value, 
                                                   connector_role_name, 
                                                   create_connector_role_name, 
                                                   create_connector_input,
                                                   sleep_time_in_seconds=10)

Step1: Create Secret
secret exists, skip creating
----------
Step2: Create IAM role configued in connector
Created role: my_test_openai_connector_role
----------
Step 3: Configure IAM role in OpenSearch
Step 3.1: Create IAM role for Signing create connector request
Created role: my_test_create_openai_connector_role
----------
Step 3.2: Map IAM role my_test_create_openai_connector_role to OpenSearch permission role
{"status":"OK","message":"'ml_full_access' updated."}
----------
Step 4: Create connector in OpenSearch
{"connector_id":"cRWzMI4BTaDH9c7t0Rfd"}
----------


In [13]:
model_name = 'OpenAI embedding model'
description = 'This is my test model'
model_id = helper.create_model(model_name, description, connector_id)

{"task_id":"kkqzMI4BOhavBOmf9im-","status":"CREATED","model_id":"k0qzMI4BOhavBOmf9inW"}


In [14]:
payload = {
  "parameters": {
    "input": ["hello world", "how are you"]
  }
}
r=helper.predict(model_id, payload)
r

'{"inference_results":[{"output":[{"name":"sentence_embedding","data_type":"FLOAT32","shape":[1536],"data":[-0.014903838,0.0013317588,-0.018488139,...]},{"name":"sentence_embedding","data_type":"FLOAT32","shape":[1536],"data":[-0.014025201,-0.006837123,-0.01223793,...]}],"status_code":200}]}'