# Ingest massive amounts of data to a Vector DB (Amazon OpenSearch)
**_Use of Amazon OpenSearch as a vector database for storing embeddings_**

This notebook works well with the `Data Science 2.0` kernel on a SageMaker Studio `ml.t3.medium` instance.

Here is a list of packages that are used in this notebook.
```
!pip freeze | grep -E "sagemaker|boto3"
------------------------------------------
boto3==1.28.42
sagemaker==2.188.0
sagemaker-data-insights @ https://files.pythonhosted.org/packages/70/8b/7c964508afe1dc3535422df8383c022c762c1f1254acb68b29d26b33fe30/sagemaker_data_insights-0.3.3-py3-none-any.whl
sagemaker-datawrangler @ https://files.pythonhosted.org/packages/6a/29/6d3da0518cbe72647b164bbdee23f4df3936cf5691fff9b29dc8714115ff/sagemaker_datawrangler-0.4.3-py3-none-any.whl
sagemaker-scikit-learn-extension==2.5.0
sagemaker-studio-analytics-extension==0.0.19
sagemaker-studio-image-build==0.6.0
sagemaker-studio-sparkmagic-lib==0.1.4
```

## Step 1: Setup
Install the required packages.

In [1]:
!pip install -U sagemaker --quiet
!pip install -U sagemaker-studio-image-build==0.6.0 --quiet

In [2]:
!pip freeze | grep -E "sagemaker|boto3"

boto3==1.28.42
sagemaker==2.188.0
sagemaker-data-insights @ https://files.pythonhosted.org/packages/70/8b/7c964508afe1dc3535422df8383c022c762c1f1254acb68b29d26b33fe30/sagemaker_data_insights-0.3.3-py3-none-any.whl
sagemaker-datawrangler @ https://files.pythonhosted.org/packages/6a/29/6d3da0518cbe72647b164bbdee23f4df3936cf5691fff9b29dc8714115ff/sagemaker_datawrangler-0.4.3-py3-none-any.whl
sagemaker-scikit-learn-extension==2.5.0
sagemaker-studio-analytics-extension==0.0.19
sagemaker-studio-image-build==0.6.0
sagemaker-studio-sparkmagic-lib==0.1.4


## Step 2: Download the data 

In this step we use `wget` to download prepared documents which have been crawled from the OpenSearch documentation and website.


In [3]:
%%sh

mkdir -p data
cd ./data
wget https://raw.githubusercontent.com/deepset-ai/haystack-sagemaker/main/data/opensearch-documentation-2.7.json
wget https://raw.githubusercontent.com/deepset-ai/haystack-sagemaker/main/data/opensearch-website.json

In [4]:
import boto3
import sagemaker

sagemaker_session = sagemaker.session.Session()
aws_region = boto3.Session().region_name
bucket = sagemaker_session.default_bucket()

aws_region, bucket

## Step 3: Load data into OpenSearch

We now have a working script that can ingest data into an OpenSearch index. To handle massive amounts of data, we need to scale up the processing by running this code in a distributed fashion. We will do this with a SageMaker Processing job, which involves the following steps:

1. Create a custom container in which we install the `langchain` and `opensearch-py` packages, then upload the container image to Amazon Elastic Container Registry (ECR).
2. Use the SageMaker `ScriptProcessor` class to create a SageMaker Processing job that runs on multiple nodes.
    - The data files in S3 are automatically distributed across the Processing job instances by setting `s3_data_distribution_type='ShardedByS3Key'` on the `ProcessingInput` passed to the job.
    - Each node processes a subset of the files, which brings down the overall time required to ingest the data into OpenSearch.
    - Each node also uses Python `multiprocessing` to parallelize the file processing internally. Thus, **there are two levels of parallelization: one at the cluster level, where the nodes divide the files amongst themselves, and another at the node level, where the files on a node are split between multiple processes running on that node**.
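
The node-level half of this scheme can be sketched as follows. This is a minimal illustration, not the code shipped in the container: `split_into_chunks`, `process_files`, and `ingest_directory` are hypothetical names, the `*.json` file pattern is an assumption based on the downloaded data, and the worker only counts files where a real one would parse and index them.

```python
import multiprocessing as mp
from pathlib import Path
from typing import List


def split_into_chunks(items: List[str], n_chunks: int) -> List[List[str]]:
    """Deal items round-robin into at most n_chunks groups, dropping empty groups."""
    chunks = [items[i::n_chunks] for i in range(n_chunks)]
    return [chunk for chunk in chunks if chunk]


def process_files(paths: List[str]) -> int:
    """Placeholder worker: a real worker would read each file and index its documents."""
    return len(paths)


def ingest_directory(input_dir: str, n_procs: int = 2) -> int:
    """Fan the files found on this node out to up to n_procs worker processes."""
    files = sorted(str(p) for p in Path(input_dir).glob("*.json"))
    chunks = split_into_chunks(files, n_procs)
    if not chunks:
        return 0  # nothing landed on this node
    with mp.Pool(processes=len(chunks)) as pool:
        # Each process receives one chunk; the results are per-chunk file counts.
        return sum(pool.map(process_files, chunks))
```

Inside a processing container, something like `ingest_directory("/opt/ml/processing/input", n_procs=mp.cpu_count())` would then use every core on the node for the files that SageMaker assigned to it.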

### Create custom container

We will now create a container locally and push the container image to ECR. **The container creation process takes about 1 minute**.

In [5]:
DOCKER_IMAGE = "haystack-opensearch-indexing-pipeline"
DOCKER_IMAGE_TAG = "latest"

In [6]:
!cd ./container && sm-docker build . --repository {DOCKER_IMAGE}:{DOCKER_IMAGE_TAG}

### Create and run the SageMaker Processing job

Now we run the SageMaker Processing job to ingest the data into OpenSearch.

In [7]:
import sys
import time
import logging

logger = logging.getLogger()
logging.basicConfig(format='%(asctime)s,%(module)s,%(processName)s,%(levelname)s,%(message)s', level=logging.INFO, stream=sys.stderr)

In [8]:
import json
from typing import (
    List,
    Dict
)

import boto3


def get_cfn_outputs(stack_name: str, region_name: str) -> Dict:
    cf_client = boto3.client('cloudformation', region_name=region_name)
    response = cf_client.describe_stacks(StackName=stack_name)
    outputs = response["Stacks"][0]["Outputs"]
    return {elem['OutputKey']: elem['OutputValue'] for elem in outputs}


def get_opensearch_domain_name(stack_name: str, region_name: str = 'us-east-1'):
    outputs = get_cfn_outputs(stack_name, region_name=region_name)
    return outputs.get('OpenSearchDomainName', None)

def get_opensearch_endpoint(stack_name: str, region_name: str = 'us-east-1'):
    outputs = get_cfn_outputs(stack_name, region_name=region_name)
    return outputs.get('OpenSearchDomainEndpoint', None)

def get_opensearch_client_security_group_id(stack_name: str, region_name: str = 'us-east-1'):
    outputs = get_cfn_outputs(stack_name, region_name=region_name)
    return outputs.get('OpenSearchClientSecurityGroupId', None)


def get_opensearch_subnet_ids(domain_name: str, region_name: str = 'us-east-1') -> List:
    assert domain_name
    # use the region passed in by the caller rather than the notebook-level global
    client = boto3.client('opensearch', region_name=region_name)
    response = client.describe_domain_config(
        DomainName=domain_name
    )
    subnet_ids = response['DomainConfig']['VPCOptions']['Options']['SubnetIds']
    return subnet_ids


def get_secret_name(stack_name: str, region_name: str = 'us-east-1'):
    outputs = get_cfn_outputs(stack_name, region_name=region_name)
    return outputs.get('MasterUserSecretId', None)


def get_secret(secret_name: str, region_name: str = 'us-east-1'):
    client = boto3.client('secretsmanager', region_name=region_name)
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    secret = get_secret_value_response['SecretString']

    return json.loads(secret)

In [9]:
CFN_STACK_NAME = "RAGHaystackOpenSearchStack"

opensearch_domain_name = get_opensearch_domain_name(CFN_STACK_NAME, region_name=aws_region)
opensearch_domain_endpoint = get_opensearch_endpoint(CFN_STACK_NAME, region_name=aws_region)
opensearch_client_security_group_id = get_opensearch_client_security_group_id(CFN_STACK_NAME, region_name=aws_region)
opensearch_subnet_ids = get_opensearch_subnet_ids(opensearch_domain_name, region_name=aws_region)
opensearch_secret_id = get_secret_name(CFN_STACK_NAME, region_name=aws_region)

In [10]:
app_name = "haystack-rag-app"
account_id = boto3.client("sts").get_caller_identity()["Account"]
aws_role = sagemaker.get_execution_role()

In [11]:
from sagemaker.processing import (
    ProcessingInput,
    ScriptProcessor
)
from sagemaker.network import NetworkConfig

# set up the parameters for the job
base_job_name = f"{app_name}-job"
tags = [{"Key": "data", "Value": app_name}]

# use the custom container we just created
image_uri = f"{account_id}.dkr.ecr.{aws_region}.amazonaws.com/{DOCKER_IMAGE}:{DOCKER_IMAGE_TAG}"

# instance type and count determined via trial and error: how much overall processing time
# and what compute cost works best for your use-case
instance_type = "ml.c5.2xlarge"
instance_count = 1
logger.info(f"base_job_name={base_job_name}, tags={tags}, image_uri={image_uri}, instance_type={instance_type}, instance_count={instance_count}")

# set up the ScriptProcessor with the above parameters
processor = ScriptProcessor(base_job_name=base_job_name,
                            image_uri=image_uri,
                            role=aws_role,
                            instance_type=instance_type,
                            instance_count=instance_count,
                            command=["python3"],
                            network_config=NetworkConfig(
                                security_group_ids=[opensearch_client_security_group_id],
                                subnets=opensearch_subnet_ids,
                            ),
                            tags=tags)

In [12]:
# Set up the input. The SDK uploads the local ./data directory to S3; ShardedByS3Key
# then gives each instance its own subset of those objects instead of a full copy.
inputs = [ProcessingInput(source="./data",
                          destination='/opt/ml/processing/input',
                          s3_data_distribution_type='ShardedByS3Key',
                          s3_data_type='S3Prefix')]
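
To make the effect of `ShardedByS3Key` concrete, here is a small simulation. How SageMaker actually assigns keys to instances is not described in this notebook, so the round-robin dealing below is an assumption for illustration only, and `simulate_sharding` is a hypothetical helper rather than part of the SageMaker SDK.

```python
from typing import Dict, List


def simulate_sharding(s3_keys: List[str], instance_count: int) -> Dict[int, List[str]]:
    """Illustrative only: deal the sorted keys out to the instances in turn."""
    shards: Dict[int, List[str]] = {i: [] for i in range(instance_count)}
    for idx, key in enumerate(sorted(s3_keys)):
        shards[idx % instance_count].append(key)
    return shards


# The two files downloaded in Step 2, as they might appear under an S3 prefix.
keys = ["data/opensearch-documentation-2.7.json", "data/opensearch-website.json"]

for count in (1, 2, 3):
    print(count, simulate_sharding(keys, instance_count=count))
```

Because sharding works on whole objects, two input files can keep at most two instances busy. Under this model a third instance would receive nothing, so splitting the input into more files is what makes a larger `instance_count` worthwhile.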

In [13]:
logger.info("creating an opensearch index with name=document")

# ready to run the processing job
st = time.time()
processor.run(code="container/load_data_into_opensearch.py",
              inputs=inputs,
              outputs=[],
              arguments=["--opensearch-endpoint", opensearch_domain_endpoint,
                         "--opensearch-secret-id", opensearch_secret_id,
                         "--aws-region", aws_region,
                         "--input-data-dir", "/opt/ml/processing/input"
])

time_taken = time.time() - st
logger.info(f"processing job completed, total time taken={time_taken}s")
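
The `arguments` list above reaches the script as ordinary command-line flags. The script itself is not shown in this notebook, so the following is only a sketch of how `container/load_data_into_opensearch.py` might parse them. The flag names come from the cell above; the defaults, the `parse_args` helper, and the example endpoint and secret values are assumptions.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the four flags that the processing job passes to the script."""
    parser = argparse.ArgumentParser(description="Load documents into OpenSearch")
    parser.add_argument("--opensearch-endpoint", required=True)
    parser.add_argument("--opensearch-secret-id", required=True)
    parser.add_argument("--aws-region", default="us-east-1")  # assumed default
    parser.add_argument("--input-data-dir", default="/opt/ml/processing/input")
    return parser.parse_args(argv)


# Hypothetical values, standing in for what the notebook resolves from CloudFormation.
args = parse_args([
    "--opensearch-endpoint", "vpc-example.us-east-1.es.amazonaws.com",
    "--opensearch-secret-id", "example-secret-id",
    "--aws-region", "us-east-1",
    "--input-data-dir", "/opt/ml/processing/input",
])
print(args.opensearch_endpoint, args.input_data_dir)
```

`argparse` converts the dashes in each flag name to underscores, which is why the endpoint is read back as `args.opensearch_endpoint`.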

In [14]:
preprocessing_job_description = processor.jobs[-1].describe()
logger.info(preprocessing_job_description)
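
The full job description is verbose, so it can help to pull out just the fields that say whether the job succeeded and how long it ran. The sketch below assumes the dictionary follows the `DescribeProcessingJob` response shape. `summarize_job` is a hypothetical helper, and the sample values are made up so that the example runs without AWS access.

```python
from datetime import datetime, timedelta
from typing import Any, Dict


def summarize_job(description: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce a processing job description to name, status, failure reason and duration."""
    start = description.get("ProcessingStartTime")
    end = description.get("ProcessingEndTime")
    duration = (end - start).total_seconds() if start and end else None
    return {
        "name": description.get("ProcessingJobName"),
        "status": description.get("ProcessingJobStatus"),
        "failure_reason": description.get("FailureReason"),
        "duration_seconds": duration,
    }


# Made-up sample standing in for processor.jobs[-1].describe().
sample_start = datetime(2023, 1, 1, 12, 0, 0)
sample = {
    "ProcessingJobName": "haystack-rag-app-job-example",
    "ProcessingJobStatus": "Completed",
    "ProcessingStartTime": sample_start,
    "ProcessingEndTime": sample_start + timedelta(seconds=90),
}
print(summarize_job(sample))
```

In the notebook, `summarize_job(preprocessing_job_description)` would give a compact record to log in place of the whole response.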

## Cleanup

To avoid incurring future charges, delete the resources. You can do this by deleting the CloudFormation stack that was used to create the IAM role and SageMaker notebook.
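
If you prefer to script the cleanup, a stack can be deleted with the CloudFormation client from `boto3`. This is a minimal sketch: `delete_stack_and_wait` is a hypothetical helper, and which stack name applies to your deployment is an assumption you should verify before running it, since deletion is irreversible.

```python
def delete_stack_and_wait(cf_client, stack_name: str) -> None:
    """Request deletion of a CloudFormation stack and block until it completes."""
    cf_client.delete_stack(StackName=stack_name)
    # The waiter polls the stack status until deletion finishes or fails.
    waiter = cf_client.get_waiter("stack_delete_complete")
    waiter.wait(StackName=stack_name)


# Usage (left commented out because it deletes real resources):
# import boto3
# cf_client = boto3.client("cloudformation", region_name=aws_region)
# delete_stack_and_wait(cf_client, "<your-stack-name>")
```

Taking the client as a parameter keeps the helper easy to exercise against a stub before pointing it at a real account.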

---

## Conclusion
In this notebook we saw how to use LLMs deployed on a SageMaker endpoint to generate embeddings, ingest those embeddings into OpenSearch, and run a similarity search that matches user input against the documents (embeddings) stored in OpenSearch. We used langchain as an abstraction layer to talk to both the SageMaker endpoint and OpenSearch.

---

## References

  * [Build production-ready generative AI applications for enterprise search using Haystack pipelines and Amazon SageMaker JumpStart with LLMs (2023-0-14)](https://aws.amazon.com/blogs/machine-learning/build-production-ready-generative-ai-applications-for-enterprise-search-using-haystack-pipelines-and-amazon-sagemaker-jumpstart-with-llms/)
    * [Haystack Retrieval-Augmented Generative QA Pipelines with SageMaker JumpStart](https://github.com/deepset-ai/haystack-sagemaker/)
  * [Using the Amazon SageMaker Studio Image Build CLI to build container images from your Studio notebooks](https://aws.amazon.com/blogs/machine-learning/using-the-amazon-sagemaker-studio-image-build-cli-to-build-container-images-from-your-studio-notebooks/)
  * [Haystack](https://docs.haystack.deepset.ai/docs) - The open source Python framework by deepset for building custom apps with large language models (LLMs).