# Ingest massive amounts of data into a vector DB (Amazon Aurora PostgreSQL with pgvector)
**_Use of Amazon Aurora PostgreSQL as a vector database for storing embeddings_**

This notebook works well with the `Data Science 3.0` kernel on a SageMaker Studio `ml.t3.medium` instance.

Here is a list of packages that are used in this notebook.
```
!pip list | grep -E -w "sagemaker|ipython-sql|langchain|psycopg2|pgvector|numpy|sh"
----------------------------------------------------------------------------------------
ipython-sql                          0.5.0
langchain                            0.1.0
langchain-community                  0.0.20
langchain-core                       0.1.52
langchain-text-splitters             0.0.1
numpy                                1.24.3
pgvector                             0.2.0
psycopg2-binary                      2.9.6
sagemaker                            2.155.0
sagemaker-studio-image-build         0.6.0
sh                                   2.0.4
```

## Step 1: Setup
Install the required packages.

In [None]:
!pip install -U langchain==0.1.0 --quiet
!pip install -U ipython-sql==0.5.0 --quiet
!pip install -U psycopg2-binary==2.9.6 --quiet
!pip install -U pgvector==0.2.0 --quiet
!pip install -U sh==2.0.4 --quiet
!pip install -U sagemaker-studio-image-build==0.6.0 --quiet

In [None]:
!pip list | grep -E -w "sagemaker|ipython-sql|langchain|psycopg2|pgvector|numpy|sh"

## Step 2: Download the data from the web and upload to S3

In this step we use `wget` to crawl a website that follows the Python documentation style and download its pages. All files other than `html`, `txt` and `md` are removed. **The download takes a few minutes**.
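
The cleanup rule described above can be sketched as follows. This is a minimal illustration, not the contents of `scripts/get_data.py`: the function name `remove_unwanted_files` and the constant `KEEP_SUFFIXES` are hypothetical, and the sketch assumes the crawl has already written its files under a local directory.

```python
from pathlib import Path

# Extensions kept after the crawl (assumption: matched case-insensitively).
KEEP_SUFFIXES = {".html", ".txt", ".md"}


def remove_unwanted_files(data_dir: str) -> int:
    """Delete every file under data_dir whose suffix is not in KEEP_SUFFIXES.

    Hypothetical helper for illustration; returns the number of files removed.
    """
    removed = 0
    # Materialize the listing first so that deleting does not disturb iteration.
    for path in list(Path(data_dir).rglob("*")):
        if path.is_file() and path.suffix.lower() not in KEEP_SUFFIXES:
            path.unlink()
            removed += 1
    return removed
```

Calling `remove_unwanted_files(DATA_DIR)` after a crawl would leave only the text-like pages that are useful for generating embeddings.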

In [None]:
WEBSITE = "https://sagemaker.readthedocs.io/en/stable/"
DOMAIN = "sagemaker.readthedocs.io"
DATA_DIR = "docs"

In [None]:
!python ./scripts/get_data.py --website {WEBSITE} --domain {DOMAIN} --output-dir {DATA_DIR}

In [None]:
import boto3
import sagemaker

sagemaker_session = sagemaker.session.Session()
aws_region = boto3.Session().region_name
bucket = sagemaker_session.default_bucket()

In [None]:
CREATE_OS_INDEX_HINT_FILE = "_create_index_hint"
app_name = 'llm-app-rag'

In [None]:
# create an empty file called _create_index_hint to provide a hint for PostgreSQL index creation
# this is needed for the SageMaker Processing Job when there are multiple instance nodes
# all running the same code for data ingestion, but only one node needs to create the index
!touch {DATA_DIR}/{CREATE_OS_INDEX_HINT_FILE}

# upload this data to S3, to be used when we run the SageMaker Processing Job
!aws s3 cp --recursive {DATA_DIR}/ s3://{bucket}/{app_name}/{DOMAIN}
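
The hint file is a single S3 object, so when the input is sharded across processing nodes it should land on only one of them. A node can then decide whether to create the index by checking for the file in its local input directory. The following is a minimal sketch of such a check; the function name `should_create_index` is hypothetical, and the actual logic in `container/load_data_into_pgvector.py` may differ.

```python
import os


def should_create_index(input_data_dir: str, hint_file_name: str) -> bool:
    """Return True only on the node whose input shard contains the hint file.

    Hypothetical helper: assumes the hint file is downloaded into
    input_data_dir alongside the documents assigned to this node.
    """
    return os.path.isfile(os.path.join(input_data_dir, hint_file_name))
```

The appeal of this design is that the nodes need no coordination protocol: the data distribution itself elects the node that creates the index.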

## Step 3: Load data into Aurora PostgreSQL with pgvector

We now have a working script that can ingest data into Aurora PostgreSQL. To handle massive amounts of data, however, we need to scale up the processing by running this code in a distributed fashion, which we do with a SageMaker Processing Job. This involves the following steps:

1. Create a custom container in which we install the `langchain`, `psycopg2` and `pgvector` packages, and upload the container image to Amazon Elastic Container Registry (ECR).
2. Use the SageMaker `ScriptProcessor` class to create a SageMaker Processing Job that runs on multiple nodes.
    - The data files in S3 are automatically distributed across the SageMaker Processing Job instances by setting `s3_data_distribution_type='ShardedByS3Key'` in the `ProcessingInput` provided to the processing job.
    - Each node processes a subset of the files, which brings down the overall time required to ingest the data into Aurora PostgreSQL.
    - Each node also uses Python `multiprocessing` to parallelize file processing internally. Thus, **there are two levels of parallelization: one at the cluster level, where the files are distributed across the individual nodes, and another at the node level, where the files on a node are split between multiple processes running on that node**.
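
The node-level half of this scheme can be sketched as follows. This is an illustrative outline under stated assumptions, not the code in `container/load_data_into_pgvector.py`: the names `split_into_chunks`, `process_chunk` and `process_files_on_node` are hypothetical, and the worker only counts files where a real worker would generate embeddings and insert them into the database.

```python
from multiprocessing import Pool
from typing import List


def split_into_chunks(files: List[str], n_chunks: int) -> List[List[str]]:
    """Split files round-robin into n_chunks lists of near-equal size."""
    return [files[i::n_chunks] for i in range(n_chunks)]


def process_chunk(chunk: List[str]) -> int:
    """Placeholder worker: a real worker would embed and insert each file."""
    return len(chunk)


def process_files_on_node(files: List[str], process_count: int) -> int:
    """Node-level parallelism: one chunk of this node's files per process."""
    chunks = split_into_chunks(files, process_count)
    with Pool(processes=process_count) as pool:
        return sum(pool.map(process_chunk, chunks))
```

The cluster-level split is handled by SageMaker before this code runs, so `files` here would be only the subset of files that the node received. On platforms that start worker processes with `spawn`, call `process_files_on_node` from under an `if __name__ == "__main__":` guard.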

### Create custom container

We will now create a container locally and push the container image to ECR. **The container creation process takes about 1 minute**.

1. The container includes all the Python packages we need, i.e. `langchain`, `psycopg2`, `pgvector`, `sagemaker` and `beautifulsoup4`.
2. The container also includes the `credentials.py` script for retrieving credentials from Secrets Manager and `sm_helper.py` for helping to create SageMaker endpoint classes that langchain uses.

In [None]:
DOCKER_IMAGE = "load-data-pgvector-custom"
DOCKER_IMAGE_TAG = "latest"

In [None]:
!cd ./container && sm-docker build . --repository {DOCKER_IMAGE}:{DOCKER_IMAGE_TAG}

### Create and run the SageMaker Processing Job

Now we will run the SageMaker Processing Job to ingest the data into Aurora PostgreSQL.

In [None]:
import sys
import logging


logger = logging.getLogger()
logging.basicConfig(format='%(asctime)s,%(module)s,%(processName)s,%(levelname)s,%(message)s', level=logging.INFO, stream=sys.stderr)

In [None]:
import json
from typing import Dict
import boto3


def get_cfn_outputs(stack_name: str, region_name: str = 'us-east-1') -> Dict[str, str]:
    cfn = boto3.client('cloudformation', region_name=region_name)
    outputs = {}
    for output in cfn.describe_stacks(StackName=stack_name)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

def get_secret_name(stack_name: str, region_name: str = 'us-east-1'):
    cf_client = boto3.client('cloudformation', region_name=region_name)
    response = cf_client.describe_stacks(StackName=stack_name)
    outputs = response["Stacks"][0]["Outputs"]

    secrets = [e for e in outputs if e.get('ExportName') == 'VectorDBSecret'][0]
    secret_name = secrets['OutputValue']
    return secret_name

def get_secret(secret_name: str, region_name: str = 'us-east-1'):
    client = boto3.client('secretsmanager', region_name=region_name)
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    secret = get_secret_value_response['SecretString']

    return json.loads(secret)

def get_db_subnet_ids(stack_name: str, region_name: str = 'us-east-1'):
    cfn_outputs = get_cfn_outputs(stack_name, region_name)
    db_cluster_id = cfn_outputs['DBClusterId']

    rds_client = boto3.client('rds', region_name=region_name)
    db_cluster_info = rds_client.describe_db_clusters(DBClusterIdentifier=db_cluster_id)
    db_subnet_group_name = db_cluster_info['DBClusters'][0]['DBSubnetGroup']
    db_subnet_info = rds_client.describe_db_subnet_groups(DBSubnetGroupName=db_subnet_group_name)
    db_subnet_ids = [e['SubnetIdentifier'] for e in db_subnet_info['DBSubnetGroups'][0]['Subnets']]

    return db_subnet_ids
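
To make the helpers above easier to follow, here is a minimal sketch of the response shape they read, using made-up values. The output key `SecretName` and the values `example-cluster` and `example-secret` are invented for illustration, and the function names `outputs_by_key` and `output_by_export_name` are hypothetical. Only the field names `Stacks`, `Outputs`, `OutputKey`, `OutputValue` and `ExportName` come from the CloudFormation `describe_stacks` response.

```python
from typing import Dict, List, Optional

# Made-up response in the shape returned by CloudFormation describe_stacks.
sample_response = {
    "Stacks": [{
        "Outputs": [
            {"OutputKey": "DBClusterId", "OutputValue": "example-cluster"},
            {"OutputKey": "SecretName", "OutputValue": "example-secret",
             "ExportName": "VectorDBSecret"},
        ]
    }]
}


def outputs_by_key(outputs: List[dict]) -> Dict[str, str]:
    """Index stack outputs by their OutputKey."""
    return {o["OutputKey"]: o["OutputValue"] for o in outputs}


def output_by_export_name(outputs: List[dict], export_name: str) -> Optional[str]:
    """Find an output by ExportName; not every output carries that field."""
    for o in outputs:
        if o.get("ExportName") == export_name:
            return o["OutputValue"]
    return None
```

With the sample above, looking up by key serves outputs such as `DBClusterId`, while looking up by export name serves the secret.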

##### Create the pgvector extension on your Aurora PostgreSQL database (DB) cluster

[pgvector](https://github.com/pgvector/pgvector) is an open-source extension for PostgreSQL that adds the ability to store and search over ML-generated vector embeddings. pgvector provides different capabilities that let you identify both exact and approximate nearest neighbors. It’s designed to work seamlessly with other PostgreSQL features, including indexing and querying. Using ChatGPT and other LLM tooling often requires storing the output of these systems, i.e., vector embeddings, in a permanent storage system for retrieval at a later time.
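
To make "exact nearest neighbors" concrete, the sketch below computes in plain Python what an exact search does conceptually: it measures the distance from the query to every stored vector and keeps the closest `k`. In pgvector, the `<->` operator computes L2 distance and `<=>` computes cosine distance; an approximate search instead uses an index (such as IVFFlat, or HNSW in newer pgvector versions) to avoid scanning every row, at some cost in recall. The function names here are hypothetical and the vectors are toy data.

```python
import math
from typing import Callable, List, Sequence, Tuple


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance, the quantity pgvector's <-> operator computes."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cosine similarity, the quantity pgvector's <=> operator computes."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def exact_nearest(
    query: Sequence[float],
    vectors: List[Sequence[float]],
    k: int,
    distance: Callable[[Sequence[float], Sequence[float]], float] = l2_distance,
) -> List[Tuple[int, float]]:
    """Brute-force k-nearest neighbors: score every vector, keep the k closest."""
    scored = [(i, distance(query, v)) for i, v in enumerate(vectors)]
    return sorted(scored, key=lambda pair: pair[1])[:k]
```

The brute-force scan is linear in the number of stored vectors, which is why approximate indexes become attractive as the collection grows.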

In [None]:
%load_ext sql

In [None]:
import urllib.parse

CFN_STACK_NAME = "RAGPgVectorStack" # name of CloudFormation stack

secret_name = get_secret_name(CFN_STACK_NAME)
secret = get_secret(secret_name)

db_username = secret['username']
db_password = urllib.parse.quote_plus(secret['password'])
db_port = secret['port']
db_host = secret['host']

driver = 'psycopg2'

connection_string = f"postgresql+{driver}://{db_username}:{db_password}@{db_host}:{db_port}/"
connection_string
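
The password is passed through `quote_plus` because characters such as `@`, `/` and `:` have a structural meaning inside a URL and would otherwise corrupt the connection string. The sketch below shows the effect with a made-up password; the user name `user` and host `db.example.com` are placeholders, not values from this notebook.

```python
from urllib.parse import quote_plus, unquote_plus, urlsplit

raw_password = "p@ss/w:rd"  # made-up password containing URL-reserved characters
encoded = quote_plus(raw_password)

# Placeholder user, host and port, for illustration only.
url = f"postgresql+psycopg2://user:{encoded}@db.example.com:5432/"
parts = urlsplit(url)

print(encoded)                       # percent-encoded form of the password
print(parts.hostname, parts.port)    # host and port are parsed unambiguously
print(unquote_plus(parts.password))  # decoding recovers the original password
```

Without the encoding, the `@` in the password would be read as the separator between the credentials and the host.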

In [None]:
%sql $connection_string

In [None]:
%%sql

CREATE EXTENSION IF NOT EXISTS vector;

In [None]:
%%sql

SELECT typname
FROM pg_type
WHERE typname = 'vector';

##### Load the embeddings and LLM into Aurora PostgreSQL DB cluster

In [None]:
CFN_STACK_NAME = 'EmbeddingEndpointStack'

cfn_stack_outputs = get_cfn_outputs(CFN_STACK_NAME, aws_region)
embeddings_model_endpoint_name = cfn_stack_outputs['EmbeddingEndpointName']

In [None]:
CFN_STACK_NAME = "RAGPgVectorStack"

pgvector_secret_id = get_secret_name(CFN_STACK_NAME, aws_region)
pgvector_collection_name = 'llm_rag_embeddings'

In [None]:
account_id = boto3.client("sts").get_caller_identity()["Account"]
aws_role = sagemaker_session.get_caller_identity_arn()

In [None]:
CHUNK_SIZE_FOR_DOC_SPLIT = 500
CHUNK_OVERLAP_FOR_DOC_SPLIT = 20
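
These two settings control how documents are cut into pieces before embedding: each chunk holds at most `CHUNK_SIZE_FOR_DOC_SPLIT` units, and consecutive chunks share `CHUNK_OVERLAP_FOR_DOC_SPLIT` units so that text near a boundary appears in both. The sketch below illustrates the idea with a plain character-based sliding window. It is an assumption that the units are characters, and the function `split_with_overlap` is hypothetical; the splitter used by the ingestion script may behave differently, for example by preferring to break at separators.

```python
from typing import List


def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Cut text into windows of chunk_size characters that overlap by chunk_overlap."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the current window already reaches the end of the text
    return chunks
```

With a chunk size of 500 and an overlap of 20, each window starts 480 characters after the previous one.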

In [None]:
db_subnet_ids = get_db_subnet_ids('RAGPgVectorStack', aws_region)
db_client_security_group_id = get_cfn_outputs('RAGPgVectorStack', aws_region)['DBClientSecurityGroupId']
sagemaker_domain_security_group_id = get_cfn_outputs('RAGSageMakerStudioStack', aws_region)['DomainSecurityGroupId']

In [None]:
from sagemaker.network import NetworkConfig


# For more information, see https://docs.aws.amazon.com/sagemaker/latest/dg/process-vpc.html
network_config = NetworkConfig(security_group_ids=[sagemaker_domain_security_group_id,
                                                   db_client_security_group_id],
                               subnets=db_subnet_ids)

In [None]:
import time

from sagemaker.processing import (
    ProcessingInput,
    ScriptProcessor
)

# setup the parameters for the job
base_job_name = f"{app_name}-job"
tags = [{"Key": "data", "Value": "embeddings-for-llm-apps"}]

# use the custom container we just created
image_uri = f"{account_id}.dkr.ecr.{aws_region}.amazonaws.com/{DOCKER_IMAGE}:{DOCKER_IMAGE_TAG}"

# instance type and count determined via trial and error: how much overall processing time
# and what compute cost works best for your use-case
instance_type = "ml.m5.xlarge"
instance_count = 3
logger.info(f"base_job_name={base_job_name}, tags={tags}, image_uri={image_uri}, instance_type={instance_type}, instance_count={instance_count}")

# setup the ScriptProcessor with the above parameters
processor = ScriptProcessor(base_job_name=base_job_name,
                            image_uri=image_uri,
                            role=aws_role,
                            instance_type=instance_type,
                            instance_count=instance_count,
                            command=["python3"],
                            tags=tags,
                            network_config=network_config)

# setup input from S3, note the ShardedByS3Key, this ensures that
# each instance gets a random and equal subset of the files in S3.
inputs = [ProcessingInput(source=f"s3://{bucket}/{app_name}/{DOMAIN}",
                          destination='/opt/ml/processing/input_data',
                          s3_data_distribution_type='ShardedByS3Key',
                          s3_data_type='S3Prefix')]


logger.info(f"creating a pgvector collection with name={pgvector_collection_name}")

# ready to run the processing job
st = time.time()
processor.run(code="container/load_data_into_pgvector.py",
              inputs=inputs,
              outputs=[],
              arguments=["--pgvector-secretid", pgvector_secret_id,
                         "--pgvector-collection-name", pgvector_collection_name,
                         "--aws-region", aws_region,
                         "--embeddings-model-endpoint-name", embeddings_model_endpoint_name,
                         "--chunk-size-for-doc-split", str(CHUNK_SIZE_FOR_DOC_SPLIT),
                         "--chunk-overlap-for-doc-split", str(CHUNK_OVERLAP_FOR_DOC_SPLIT),
                         "--input-data-dir", "/opt/ml/processing/input_data",
                         "--create-index-hint-file", CREATE_OS_INDEX_HINT_FILE,
                         "--process-count", "2"])

time_taken = time.time() - st
logger.info(f"processing job completed, total time taken={time_taken}s")

preprocessing_job_description = processor.jobs[-1].describe()
logger.info(preprocessing_job_description)
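
The `arguments` list passed to `processor.run` reaches the script as command-line arguments, which is why numeric values are converted with `str()` first. The sketch below shows one way a script could parse them with `argparse`. It is not the parser in `container/load_data_into_pgvector.py`: the flag names are taken from the call above, but the function `build_parser`, the defaults and the `required` settings are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the flags passed to processor.run."""
    parser = argparse.ArgumentParser(description="Load documents into pgvector")
    parser.add_argument("--pgvector-secretid", required=True)
    parser.add_argument("--pgvector-collection-name", required=True)
    parser.add_argument("--aws-region", default="us-east-1")
    parser.add_argument("--embeddings-model-endpoint-name", required=True)
    # Numeric values arrive as strings and are converted back with type=int.
    parser.add_argument("--chunk-size-for-doc-split", type=int, default=500)
    parser.add_argument("--chunk-overlap-for-doc-split", type=int, default=20)
    parser.add_argument("--input-data-dir", default="/opt/ml/processing/input_data")
    parser.add_argument("--create-index-hint-file", default="_create_index_hint")
    parser.add_argument("--process-count", type=int, default=1)
    return parser
```

`argparse` maps each flag to an attribute by replacing hyphens with underscores, so `--process-count` becomes `args.process_count`.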

## Step 4: Run a similarity search of user input against the documents (embeddings) in Aurora PostgreSQL

In [None]:
import urllib.parse

from langchain.vectorstores import PGVector

from container.credentials import get_credentials
from container.sm_helper import create_sagemaker_embeddings_from_js_model


secret = get_credentials(pgvector_secret_id, aws_region)

db_username = secret['username']
db_password = urllib.parse.quote_plus(secret['password'])
db_port = secret['port']
db_host = secret['host']

connection_string = PGVector.connection_string_from_db_params(
    driver='psycopg2',
    user=db_username,
    password=db_password,
    host=db_host,
    port=db_port,
    database=''
)

docsearch = PGVector.from_existing_index(
                                   embedding=create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name,
                                                                                       aws_region),
                                   collection_name=pgvector_collection_name,
                                   connection_string=connection_string)

q = "Which XGBoost versions does SageMaker support?"
docs = docsearch.similarity_search(q, k=3)
for doc in docs:
    logger.info("----------")
    logger.info(f"content=\"{doc.page_content}\",\nmetadata=\"{doc.metadata}\"")

## Cleanup

To avoid incurring future charges, delete the resources. You can do this by deleting the CloudFormation stacks that created the resources used in this notebook (for example `RAGPgVectorStack`, `EmbeddingEndpointStack` and `RAGSageMakerStudioStack`).

---

## Conclusion
In this notebook we saw how to use a model deployed on a SageMaker Endpoint to generate embeddings, ingest those embeddings into Aurora PostgreSQL, and finally run a similarity search of user input against the documents (embeddings) stored in Aurora PostgreSQL. We used langchain as an abstraction layer to talk to both the SageMaker Endpoint and Aurora PostgreSQL.

---

## Appendix

In [None]:
from container.sm_helper import create_sagemaker_embeddings_from_js_model

CFN_STACK_NAME = 'EmbeddingEndpointStack'
cfn_stack_outputs = get_cfn_outputs(CFN_STACK_NAME, aws_region)
embeddings_model_endpoint_name = cfn_stack_outputs['EmbeddingEndpointName']

embeddings = create_sagemaker_embeddings_from_js_model(embeddings_model_endpoint_name, aws_region)

text = "This is a sample query."
query_result = embeddings.embed_query(text)

print(query_result)
print(f"length: {len(query_result)}")

## References

  * [Leverage pgvector and Amazon Aurora PostgreSQL for Natural Language Processing, Chatbots and Sentiment Analysis](https://aws.amazon.com/blogs/database/leverage-pgvector-and-amazon-aurora-postgresql-for-natural-language-processing-chatbots-and-sentiment-analysis/)
  * [Building AI-powered search in PostgreSQL using Amazon SageMaker and pgvector](https://aws.amazon.com/blogs/database/building-ai-powered-search-in-postgresql-using-amazon-sagemaker-and-pgvector/)
  * [Using the Amazon SageMaker Studio Image Build CLI to build container images from your Studio notebooks](https://aws.amazon.com/blogs/machine-learning/using-the-amazon-sagemaker-studio-image-build-cli-to-build-container-images-from-your-studio-notebooks/)
  * [Give SageMaker Processing Jobs Access to Resources in Your Amazon VPC](https://docs.aws.amazon.com/sagemaker/latest/dg/process-vpc.html)
    * **Configure the VPC Security Group**
      * In distributed processing, you must allow communication between the different containers in the same processing job. To do that, configure a rule for your security group that allows inbound connections between members of the same security group.
  * [How can I troubleshoot the InternalServerError response on Amazon SageMaker? - AWS re:Post](https://repost.aws/knowledge-center/sagemaker-http-500-internal-server-error)
  * [LangChain](https://python.langchain.com/docs/get_started/introduction.html) - A framework for developing applications powered by language models.