# Using the Amazon OpenSearch ML Commons REST API for language detection

Amazon OpenSearch 2.15.0 has a new ML inference processor that lets you enrich ingest pipelines with inferences from OpenSearch-provided pretrained models. The `ml_inference` processor invokes machine learning (ML) models registered in the OpenSearch ML Commons plugin and adds the model outputs as new fields to the ingested documents.

ML Commons for OpenSearch makes it easy to develop new machine learning features within OpenSearch. The plugin lets machine learning engineers and developers use existing open-source machine learning algorithms and streamlines the effort needed to build new machine learning features.

We will deploy three models in this notebook. The first is an Amazon Comprehend model, which examines the input text, detects the language using the Amazon Comprehend DetectDominantLanguage API, and sets a corresponding language code.

The second model uses Amazon SageMaker's built-in BlazingText algorithm, a highly optimized implementation of the Word2vec and text classification algorithms that scales easily to large datasets. It is useful for many downstream natural language processing (NLP) tasks. This notebook walks through creating an Amazon OpenSearch connector, model, and ingest pipeline, and then testing them, for both the Amazon Comprehend model and the BlazingText FastText model.

The third model is Amazon's Titan embeddings model v2. In this notebook, we will use this embeddings model to create vectors of text in three languages (English, French, and German). These vectors will then be stored in Amazon OpenSearch so that semantic searches can be run across the language sets.

Note: This functionality is available in **Amazon OpenSearch** 2.15.0 or later (we release odd versions), and in OpenSearch 2.14.0 or later.


#### Step 1. Install dependencies needed for this notebook.

You can ignore any ERROR that pip prints about dependency conflicts.

In [None]:
%pip install sagemaker requests-aws4auth GitPython opensearch-py --upgrade --quiet

#### Step 2. Install git-lfs so that we can clone the model repos to our notebook

In [None]:
!sudo yum install -y amazon-linux-extras
!sudo amazon-linux-extras install epel -y 
!sudo yum-config-manager --enable epel
!sudo yum install git-lfs -y
!git lfs install

#### Step 3. Store the CloudFormation output values as variables

This code block reads your CloudFormation stack outputs and stores them as variables: **S3BucketName**, **SageMakerExecutionRoleArn**, **SageMakerOpenSearchRoleArn**, and **OpenSearchDomainEndpoint**. It also prints **OpenSearchDashboardsURL** if the stack provides it.

In [None]:
import boto3

cf_client = boto3.client('cloudformation')

# Name of your CloudFormation stack; change this if you deployed the stack under a different name
stack_name = 'opensearch-lang-detect-demo'

# Get stack outputs
response = cf_client.describe_stacks(StackName=stack_name)
outputs = response['Stacks'][0]['Outputs']

# Create a dictionary to store the outputs
output_dict = {output['OutputKey']: output['OutputValue'] for output in outputs}

# Retrieve values from the output dictionary
s3BucketName = output_dict.get('S3BucketName', 'Not found')
sageMakerExecutionRoleArn = output_dict.get('SageMakerExecutionRoleArn', 'Not found')
sageMakerOpenSearchRoleArn = output_dict.get('SageMakerOpenSearchRoleArn', 'Not found')
opensearchDomainEndpoint = output_dict.get('OpenSearchDomainEndpoint', 'Not found')

print('S3 Bucket Name: ' + s3BucketName)
print('SageMaker Execution Role Arn: ' + sageMakerExecutionRoleArn)
print('SageMaker OpenSearch Role Arn: ' + sageMakerOpenSearchRoleArn)
print('OpenSearch Domain Endpoint: ' + opensearchDomainEndpoint)

if 'OpenSearchDashboardsURL' in output_dict:
    print('OpenSearch Dashboards URL: ' + output_dict['OpenSearchDashboardsURL'])
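
The cell above falls back to the string `'Not found'` when an output is missing, which would only surface later as a confusing connection error. A minimal sketch of failing fast instead is shown below; the helper name `require_outputs` and the `REQUIRED_OUTPUTS` tuple are illustrative assumptions, not part of boto3.

```python
# Output keys that the rest of this notebook relies on
REQUIRED_OUTPUTS = (
    "S3BucketName",
    "SageMakerExecutionRoleArn",
    "SageMakerOpenSearchRoleArn",
    "OpenSearchDomainEndpoint",
)


def require_outputs(output_dict, required=REQUIRED_OUTPUTS):
    """Return the required outputs, or raise KeyError naming the missing keys."""
    missing = [key for key in required if key not in output_dict]
    if missing:
        raise KeyError("Missing CloudFormation outputs: " + ", ".join(missing))
    return {key: output_dict[key] for key in required}
```

Calling `require_outputs(output_dict)` right after building `output_dict` would stop the notebook at this step if the stack name is wrong or the stack is still being created.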

#### Step 5. Add the SageMaker Execution role to OpenSearch

To interact with OpenSearch from the notebook, we need to allow the SageMaker execution role that was created by the CloudFormation template to perform actions in OpenSearch.

Navigate to the OpenSearch console page in a different tab within your AWS account. Click on the **ml-commons-demo-2-17** domain. Click on the OpenSearch Dashboards URL (IPv4) and log in using the username and password you provided when deploying your CloudFormation stack.  
![Dashboard](images/1_dashboard.png)

Then navigate to Security using the left-hand menu.
![Security](images/2_security.png)

Next select **Roles** from the Security left-hand menu.
![Roles](images/3_roles.png)

From the roles screen select **all_access**.
![all access](images/4_all_access.png)

Select the **Mapped users** tab and then click on the **Manage mapping** button.
![mapped users](images/5_mapped_users.png)

Provide the **SageMaker Execution Role Arn** (this was printed out in Step 3 above).
![mapped users](images/6_backend_roles.png)

Click on the **Map** button.

Navigate back to the **Roles** screen by using the breadcrumb at the top of the dashboard.

Search for the **ml_full_access** role and select it.
![mapped users](images/7_ml_full_access.png)

Select the **Mapped users** tab and then click on the **Manage mapping** button.
![mapped users](images/8_ml_full_access_tabs.png)

Provide the **SageMaker OpenSearch Role Arn** (this was printed out in Step 3 above).
![mapped users](images/9_add_role.png)

Click on the **Map** button.

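#### Step 6. Set up the ML Commons connector

We need to register the endpoints that ML Commons connectors are allowed to call, including SageMaker, Amazon Comprehend, and Amazon Bedrock, by updating the `plugins.ml_commons.trusted_connector_endpoints_regex` cluster setting.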

In [None]:
import requests
import boto3
import time
from requests_aws4auth import AWS4Auth

service = 'es'
session = boto3.Session()
region = session.region_name
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, service, session_token=credentials.token)

host = 'https://' + opensearchDomainEndpoint

# Update the cluster settings with the trusted connector endpoints
path = '/_cluster/settings'
url = host + path

# Raw strings keep the regex backslashes intact without invalid-escape warnings
payload = {
    "persistent": {
        "plugins.ml_commons.trusted_connector_endpoints_regex": [
            r"^https://runtime\.sagemaker\..*[a-z0-9-]\.amazonaws\.com/.*$",
            r"^https://api\.openai\.com/.*$",
            r"^https://api\.cohere\.ai/.*$",
            r"^https://bedrock-runtime\..*[a-z0-9-]\.amazonaws\.com/.*$",
            r"^https://comprehend\..*[a-z0-9-]\.amazonaws\.com$",
            r"^https://textract\..*[a-z0-9-]\.amazonaws\.com$",
            r"^https://translate\..*[a-z0-9-]\.amazonaws\.com$",
            r"^https://rekognition\..*[a-z0-9-]\.amazonaws\.com$",
            r"^https://personalize\..*[a-z0-9-]\.amazonaws\.com.*$",
            r"^https://personalize-runtime\..*[a-z0-9-]\.amazonaws\.com.*$"
        ]
    }
}
headers = {"Content-Type": "application/json"}

response = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(response.json())
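
A connector whose URL does not match any trusted pattern is rejected, so it can help to check the URLs used later in this notebook against the patterns locally. The sketch below uses Python's `re` module as an approximation; OpenSearch evaluates the patterns on the server side, and the assumption here is that these simple patterns behave the same way in both engines. The helper name `is_trusted` is illustrative.

```python
import re

# A subset of the patterns registered above
TRUSTED_ENDPOINT_PATTERNS = [
    r"^https://runtime\.sagemaker\..*[a-z0-9-]\.amazonaws\.com/.*$",
    r"^https://bedrock-runtime\..*[a-z0-9-]\.amazonaws\.com/.*$",
    r"^https://comprehend\..*[a-z0-9-]\.amazonaws\.com$",
]


def is_trusted(url, patterns=TRUSTED_ENDPOINT_PATTERNS):
    """Return True if the URL matches at least one trusted-endpoint pattern."""
    return any(re.match(pattern, url) for pattern in patterns)


# The Comprehend pattern has no trailing path, so the URL must end at ".com"
print(is_trusted("https://comprehend.us-east-1.amazonaws.com"))
print(is_trusted("https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/fasttext3/invocations"))
```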

## Comprehend Language Classification Model

The first model will be built from the Amazon Comprehend service. This service examines the input text, detects the language using the Amazon Comprehend DetectDominantLanguage API, and sets a corresponding language code.
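
As a rough illustration of what "sets a corresponding language code" means, the sketch below picks the highest-scoring entry from a response in the DetectDominantLanguage shape (a `Languages` list of `LanguageCode` and `Score` pairs). The helper name `dominant_language` is hypothetical and the sample values are made up for illustration.

```python
def dominant_language(response):
    """Return (language_code, score) for the highest-scoring language, or None."""
    languages = response.get("Languages", [])
    if not languages:
        return None
    best = max(languages, key=lambda lang: lang["Score"])
    return best["LanguageCode"], best["Score"]


# Illustrative response in the DetectDominantLanguage shape (values are made up)
sample = {
    "Languages": [
        {"LanguageCode": "fr", "Score": 0.91},
        {"LanguageCode": "en", "Score": 0.07},
    ]
}
print(dominant_language(sample))
```

Outside OpenSearch, a response of this shape can be obtained with `boto3.client('comprehend').detect_dominant_language(Text=...)`.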

#### Step 7. Create the connector for the Comprehend Language Classification model

Now we will create the connector for Amazon Comprehend. The model that uses it is registered in Step 8.

In [None]:
comprehend = boto3.client('comprehend', region_name='us-east-1')
path = '/_plugins/_ml/connectors/_create'
url = host + path
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

payload = {
  "name": "Comprehend lang identification",
  "description": "comprehend model",
  "version": 1,
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": sageMakerOpenSearchRoleArn
  },
  "parameters": {
    "region": "us-east-1",
    "service_name": "comprehend",
    "api_version": "20171127",
    "api_name": "DetectDominantLanguage",
    "api": "Comprehend_${parameters.api_version}.${parameters.api_name}",
    "response_filter": "$"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://${parameters.service_name}.${parameters.region}.amazonaws.com",
      "headers": {
        "content-type": "application/x-amz-json-1.1",
        "X-Amz-Target": "${parameters.api}"
      },
      "request_body": "{\"Text\": \"${parameters.Text}\"}" 
    }
  ]
}
headers = {"Content-Type": "application/json"}

comprehend_connector_response = requests.post(url, auth=awsauth, json=payload, headers=headers)
comprehend_connector = comprehend_connector_response.json()["connector_id"]
print('Connector id: ' + comprehend_connector)
# print(comprehend_connector_response.text)
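
The connector payload relies on `${parameters.<name>}` placeholders, including one parameter (`api`) that refers to two others. The sketch below illustrates the substitution idea only; it is not the plugin's implementation, and the `render` helper and its `max_passes` argument are assumptions made for this example.

```python
import re

# Matches placeholders such as ${parameters.region}
PLACEHOLDER = re.compile(r"\$\{parameters\.([A-Za-z0-9_]+)\}")


def render(template, parameters, max_passes=5):
    """Replace ${parameters.name} placeholders, repeating so nested references resolve."""
    for _ in range(max_passes):
        rendered = PLACEHOLDER.sub(
            # Unknown names are left untouched so they remain visible in the output
            lambda m: str(parameters.get(m.group(1), m.group(0))),
            template,
        )
        if rendered == template:
            break
        template = rendered
    return template


params = {
    "region": "us-east-1",
    "service_name": "comprehend",
    "api_version": "20171127",
    "api_name": "DetectDominantLanguage",
    "api": "Comprehend_${parameters.api_version}.${parameters.api_name}",
}
print(render("https://${parameters.service_name}.${parameters.region}.amazonaws.com", params))
print(render("${parameters.api}", params))
```

The `Text` parameter is not part of the connector definition; it is supplied at prediction time, which is why `${parameters.Text}` appears only in `request_body`.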

#### Step 8. Register the Comprehend model

We now register the Comprehend model as a remote model (`function_name` set to `remote`) that uses the connector we created.

In [None]:
path = '/_plugins/_ml/models/_register'
url = host + path

payload = {
    "name": "comprehend lang id model",
    "function_name": "remote",
    "description": "test model",
    "connector_id": comprehend_connector
}
headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
# print(response.json())
comprehend_model_id = response.json()['model_id']
print('Model id: ' + comprehend_model_id)

#### Step 9. Deploy the Comprehend model

In [None]:
path = '/_plugins/_ml/models/'+ comprehend_model_id + '/_deploy'
url = host + path

headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, headers=headers)
print(response.json())
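
The deploy call returns immediately, so a later prediction can race against deployment. A minimal polling sketch is shown below. It takes any zero-argument callable that returns the current state; the helper name `wait_for_state` and its defaults are assumptions, and the commented usage assumes the model document exposes a `model_state` field.

```python
import time


def wait_for_state(get_state, target="DEPLOYED", timeout=120.0, interval=2.0, sleep=time.sleep):
    """Poll get_state() until it returns target; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == target:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"model state is {state!r}, expected {target!r}")
        sleep(interval)


# Hypothetical usage in this notebook (assumes `requests`, `host`, `awsauth`, and
# `comprehend_model_id` from the cells above):
# wait_for_state(lambda: requests.get(
#     host + '/_plugins/_ml/models/' + comprehend_model_id, auth=awsauth
# ).json().get('model_state'))
```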

#### Step 10. Test the Comprehend model through OpenSearch

In [None]:
path = '/_plugins/_ml/models/'+ comprehend_model_id + '/_predict'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
    "parameters": {
        "Text": "你知道厕所在哪里吗"
    }
}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(response.json())


#### Step 11. Create the Comprehend ingest pipeline

Now we will create the ingest pipeline for the index. This is how we tell OpenSearch to send the field(s) we want language detection for to the Comprehend endpoint.

In [None]:
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

path = '/_ingest/pipeline/comprehend_language_identification_pipeline'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
  "description": "ingest reviews and identify lang with the comprehend model",
  "processors": [
    {
      "ml_inference": {
        "model_id": comprehend_model_id,
        "input_map": [
          {
            "Text": "Text"
          }
        ],
        "output_map": [
          {
            "detected_language": "response.Languages[0].LanguageCode",
            "language_score": "response.Languages[0].Score"
          }
        ]
      }
    }
  ]
}
response = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(response.json())
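
The `output_map` values are path expressions into the model output. The sketch below shows how such a path can be read against a nested structure. It is an illustration of the path syntax, not the processor's implementation, and the sample output shape is an assumption inferred from the mapping above.

```python
import re

# A path step is either a key (no dots or brackets) or a [index]
TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def resolve(path, data):
    """Follow a dotted path with optional [index] steps through nested dicts and lists."""
    current = data
    for key, index in TOKEN.findall(path):
        current = current[int(index)] if index else current[key]
    return current


# Assumed model output shape (values are made up for illustration)
model_output = {"response": {"Languages": [{"LanguageCode": "fr", "Score": 0.98}]}}

print(resolve("response.Languages[0].LanguageCode", model_output))
print(resolve("response.Languages[0].Score", model_output))
```

With this mapping, an ingested document gains a `detected_language` field and a `language_score` field alongside its original `Text` field.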

#### Step 12. Create the Comprehend index & test

Next we create the index using the pipeline.  

In [None]:
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

index_name = 'comprehend_lang_ident_test01'
path = '/' + index_name

headers = {"Content-Type": "application/json"}

index_settings = {
    "settings": {
        "index": {
            "default_pipeline": "comprehend_language_identification_pipeline"
        }
    }
}

# Send the PUT request to create the index
url = host + path
response = requests.put(url, auth=awsauth, json=index_settings, headers=headers)

# Print the response
print(response.json())

In [None]:
index_name = 'comprehend_lang_ident_test01'
path = '/' + index_name + '/_doc/'

headers = {"Content-Type": "application/json"}

# Define the document to index
document = {
    "Text": "parlez vous francais?"
}

url = host + path
response = requests.post(url, auth=awsauth, json=document, headers=headers)


print(response.json())

doc_id = response.json()['_id']

# Retrieve the indexed document
get_path = '/' + index_name + '/_doc/' + doc_id
get_url = host + get_path
get_response = requests.get(get_url, auth=awsauth, headers=headers)

# Print the retrieved document
print(get_response.json())

## FastText Language Classification Model

Now that we've tested our Amazon Comprehend model connector, let's create a connector for our SageMaker model, which uses FastText language identification hosted by BlazingText.

More information about this algorithm can be found here: https://docs.aws.amazon.com/sagemaker/latest/dg/blazingtext.html

BlazingText is a GPU-accelerated implementation of FastText, capable of hosting pre-trained Text Classification and Word2Vec models, including FastText models. FastText is a neural network model used for both unsupervised word embedding generation and supervised text classification.

While BlazingText employs custom CUDA kernels to speed up FastText's training process, the core algorithm remains the same for both. This compatibility allows users to leverage BlazingText's hosting capabilities on Amazon SageMaker for real-time predictions using FastText models. This is particularly useful if you have your own FastText-trained model or if one of the pre-trained models provided by the FastText team meets your requirements.

In essence, BlazingText offers a way to deploy FastText models on SageMaker endpoints, combining the benefits of FastText's versatility with the computational efficiency of GPU acceleration.

In [None]:
import git
import os
import os.path
import tarfile
import sagemaker
sess = sagemaker.Session()

region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("blazingtext", region=region_name)
print('Using SageMaker BlazingText container: {} ({})'.format(container, region_name))

!wget -O model.bin https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin
# Create a tar file from the model and upload it to the S3 bucket from Step 3 to store models
!tar -czvf langid.tar.gz model.bin
model_location = sess.upload_data("langid.tar.gz", bucket=s3BucketName, key_prefix='custom_inference/fasttext-language-identification')
!rm langid.tar.gz model.bin
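
The cell above imports `tarfile` but shells out to `tar`. A pure-Python equivalent, sketched below, may be useful where shell commands are unavailable. The helper name `make_model_archive` is an assumption; it mirrors the `tar` command above by storing the file as `model.bin` at the archive root.

```python
import os
import tarfile


def make_model_archive(model_path, archive_path, arcname="model.bin"):
    """Write a gzip-compressed tar containing the model file at the archive root."""
    with tarfile.open(archive_path, "w:gz") as archive:
        # arcname controls the name inside the archive, independent of model_path
        archive.add(model_path, arcname=arcname)
    return archive_path


# Hypothetical usage, replacing the `!tar` line above:
# make_model_archive("model.bin", "langid.tar.gz")
```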

#### Step 1. Deploy the FastText model

Deploy the model as a SageMaker endpoint.

Note: this takes a few minutes.

In [None]:
import boto3
import sagemaker
from sagemaker.predictor import Predictor

sess = sagemaker.Session()

# Use the notebook's execution role; fall back to the role from the CloudFormation outputs
try:
    role = sagemaker.get_execution_role()
except ValueError:
    role = sageMakerExecutionRoleArn

# S3 location of the model archive uploaded in the previous cell
model_location = 's3://' + s3BucketName + '/custom_inference/fasttext-language-identification/langid.tar.gz'

# BlazingText container image for the current region
region_name = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("blazingtext", region=region_name)

lang_id = sagemaker.Model(model_data=model_location, image_uri=container, role=role, sagemaker_session=sess)
endpoint_name = 'fasttext3'
lang_id.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name=endpoint_name)

# Predictor that sends and receives JSON
FT_predictor = Predictor(endpoint_name=endpoint_name, sagemaker_session=sess)
FT_predictor.serializer = sagemaker.serializers.JSONSerializer()
FT_predictor.deserializer = sagemaker.deserializers.JSONDeserializer()

#### Step 2. Test the FastText endpoint

Now we will test the newly created endpoint to see whether it gives us accurate language identification.

In [None]:
sentences = ["hi which language is this?",
             "mon nom est Pierre",
             "Dem Jungen gab ich einen Ball.",
             "আমি বাড়ি যাবো."]
payload = {"instances" : sentences}

predictions = FT_predictor.predict(payload)
print(predictions)
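
The printed predictions carry FastText-style labels such as `__label__fr`. The sketch below turns them into plain language codes. It assumes each prediction is a dict with `label` and `prob` lists, which is consistent with the `response[0].label` path used later in this notebook; the helper name `parse_predictions` and the sample values are illustrative.

```python
def parse_predictions(predictions, prefix="__label__"):
    """Return one (language_code, probability) pair per prediction, taking the top label."""
    results = []
    for item in predictions:
        label = item["label"][0]
        if label.startswith(prefix):
            label = label[len(prefix):]
        results.append((label, item["prob"][0]))
    return results


# Assumed response shape (values are made up for illustration)
sample_predictions = [
    {"label": ["__label__en"], "prob": [0.95]},
    {"label": ["__label__fr"], "prob": [0.99]},
]
print(parse_predictions(sample_predictions))
```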

#### Step 3. Create the connector for the FastText model

Now we will create the connector for the FastText model through Amazon OpenSearch.

In [None]:
path = '/_plugins/_ml/connectors/_create'
url = host + path
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

payload = {
  "name": "lang identification",
  "description": "fasttext model",
  "version": 1,
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": sageMakerOpenSearchRoleArn
  },
  "parameters": {
    # Use the notebook's region so the connector targets the endpoint deployed above
    "region": region,
    "service_name": "sagemaker"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://runtime.sagemaker." + region + ".amazonaws.com/endpoints/" + FT_predictor.endpoint_name + "/invocations",
      "headers": {
        "content-type": "application/json"
      },
      "request_body": "{ \"instances\": [ \"${parameters.text}\" ] }"
    }
  ]
}
headers = {"Content-Type": "application/json"}

ft_connector_response = requests.post(url, auth=awsauth, json=payload, headers=headers)
ft_connector = ft_connector_response.json()["connector_id"]
print('Connector id: ' + ft_connector)
# print(ft_connector_response.text)

#### Step 4. Register the FastText model

We now register the FastText model as a remote model that uses the connector we created.

In [None]:
path = '/_plugins/_ml/models/_register'
url = host + path

payload = {
    "name": "fasttext",
    "function_name": "remote",
    "description": "lang id model",
    "connector_id": ft_connector
}
headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
ft_model_id = response.json()['model_id']
print('Model id: ' + ft_model_id)

#### Step 5. Deploy the FastText model


In [None]:
path = '/_plugins/_ml/models/'+ ft_model_id + '/_deploy'
url = host + path

headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, headers=headers)
print(response.json())

#### Step 6. Test the FastText model through OpenSearch


In [None]:
path = '/_plugins/_ml/models/'+ ft_model_id + '/_predict'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
  "parameters": {
    "text": "It's nice to see the flowers bloom and hear the birds sing in the spring"
  }
}
response = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(response.json())

#### Step 7. Create the FastText ingest pipeline

Now we will create the ingest pipeline for the index. This is how we tell OpenSearch to send the field(s) we want language detection for to the SageMaker endpoint.

In [None]:
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

path = '/_ingest/pipeline/ft_language_identification_pipeline'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
  "description": "ingest reviews and identify lang with the fasttext model via sagemaker endpoint",
  "processors":[
    {
      "ml_inference": {
        "model_id": ft_model_id,
        "input_map": [
            {
               "text": "text"
            }
        ],
        "output_map": [
            {
                "inference":"response[0].label"
            }
        ]
      }
    }
  ]
}
response = requests.put(url, auth=awsauth, json=payload, headers=headers)
print(response.json())
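
Before attaching the pipeline to an index, it can be exercised with the ingest simulate API, which runs documents through the processors without storing them. The sketch below only builds the request; the helper name `build_simulate_request` is an assumption, and the commented call assumes `requests`, `awsauth`, and `headers` from the cells above.

```python
def build_simulate_request(host, pipeline_id, sources):
    """Return (url, body) for POST /_ingest/pipeline/<pipeline_id>/_simulate."""
    url = host.rstrip("/") + "/_ingest/pipeline/" + pipeline_id + "/_simulate"
    # Each simulated document is wrapped in a "_source" object
    body = {"docs": [{"_source": source} for source in sources]}
    return url, body


# Hypothetical usage in this notebook:
# sim_url, sim_body = build_simulate_request(
#     host, "ft_language_identification_pipeline", [{"text": "parlez vous francais?"}])
# print(requests.post(sim_url, auth=awsauth, json=sim_body, headers=headers).json())
```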

#### Step 8. Create the FastText index & test

Next we create the index using the pipeline.  

In [None]:
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

index_name = 'ft_lang_ident_test1'
path = '/' + index_name

headers = {"Content-Type": "application/json"}

index_settings = {
    "settings": {
        "index": {
            "default_pipeline": "ft_language_identification_pipeline"
        }
    }
}

# Send the PUT request to create the index
url = host + path
response = requests.put(url, auth=awsauth, json=index_settings, headers=headers)

# Print the response
print(response.json())

In [None]:

index_name = 'ft_lang_ident_test1'
path = '/' + index_name + '/_doc/'

headers = {"Content-Type": "application/json"}

# Define the document to index
document = {
    "text": "parlez vous francais?"
}

url = host + path
response = requests.post(url, auth=awsauth, json=document, headers=headers)


print(response.json())

doc_id = response.json()['_id']

# Retrieve the indexed document
get_path = '/' + index_name + '/_doc/' + doc_id
get_url = host + get_path
get_response = requests.get(get_url, auth=awsauth, headers=headers)

# Print the retrieved document
print(get_response.json())
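
To index several documents at once, such as the test sentences from Step 2, the `_bulk` API accepts newline-delimited JSON. The sketch below builds such a body; the helper name `build_bulk_body` is an assumption, and it is assumed here that the index's default pipeline is applied to bulk index requests in the same way as to single-document requests.

```python
import json


def build_bulk_body(index_name, documents):
    """Return a newline-delimited JSON body with one index action per document."""
    lines = []
    for doc in documents:
        lines.append(json.dumps({"index": {"_index": index_name}}))
        # ensure_ascii=False keeps non-Latin text readable in the request body
        lines.append(json.dumps(doc, ensure_ascii=False))
    # The bulk format requires a trailing newline
    return "\n".join(lines) + "\n"


# Hypothetical usage in this notebook:
# bulk_body = build_bulk_body(index_name, [{"text": s} for s in sentences])
# requests.post(host + '/_bulk', auth=awsauth, data=bulk_body.encode('utf-8'),
#               headers={"Content-Type": "application/x-ndjson"})
```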