# Private AI Search with LangChain and Elasticsearch

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/elastic/blog-langchain-elasticsearch/blob/main/Notebooks/Privacy_first_AI_search_using_LangChain_and_Elasticsearch.ipynb)

Motivations: 
* **freshness** - there aren't enough GPUs in the world to retrain large language models for every problem, and data goes stale very fast. Semantic search lets us push real-time context into LLM prompts.
* **privacy** - sending our most sensitive data to the big hosted LLMs isn't really an option when that data is private or is a company's competitive advantage, big or small. Instead, let's use a smaller local LLM that can be deployed privately, inside a closed network if necessary.

First, let's set up the environment.


In [1]:
# !pip install --force-reinstall torch==1.12.1+cu113 --extra-index-url https://download.pytorch.org/whl/
!pip install -q langchain==0.0.230 eland elasticsearch huggingface-hub tqdm requests ipython GitPython
# !pip install -q unstructured==0.8.5 pdf2image pytesseract pypdf

In [2]:
import re
import requests
import json
from tqdm import tqdm
from langchain.document_loaders import GitLoader, UnstructuredPDFLoader, OnlinePDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter,Language
from langchain.text_splitter import MarkdownTextSplitter
from langchain.vectorstores import ElasticVectorSearch
from langchain.document_loaders import TextLoader
from IPython.display import display
from IPython.display import Markdown
from pathlib import Path
from elasticsearch import Elasticsearch

In [132]:
# Load repository with .md files as targets to split and store
def load_repo(remote_repo_url, local_repo_path, branch, file_filter=None):
    """Load the files of a git repository as LangChain documents.

    ``clone_url`` is handed to ``GitLoader`` only when ``local_repo_path`` is not
    an existing directory (first run). Later runs reuse the local checkout.

    Args:
        remote_repo_url: URL to clone from when no local checkout exists yet.
        local_repo_path: Directory holding (or receiving) the checkout.
        branch: Branch to load.
        file_filter: Optional predicate on file paths that selects which files
            are loaded (e.g. only ``.md`` files).

    Returns:
        The documents produced by ``GitLoader.load()``.
    """
    loader_kwargs = {
        "repo_path": local_repo_path,
        "branch": branch,
        "file_filter": file_filter,
    }
    if not Path(local_repo_path).is_dir():
        # First load: there is no checkout yet, so give GitLoader the remote to clone.
        loader_kwargs["clone_url"] = remote_repo_url
    return GitLoader(**loader_kwargs).load()

# Load PDF documents
def load_pdf(file_path):
    """Parse the local PDF at ``file_path`` with ``UnstructuredPDFLoader``; return its documents."""
    return UnstructuredPDFLoader(file_path).load()

def load_online_pdf(file_url):
    """Fetch and parse the PDF at ``file_url`` with ``OnlinePDFLoader``; return its documents."""
    return OnlinePDFLoader(file_url).load()

# Load single md document from drive/filesystem
def load_md(markdown_path):
    """Load a single Markdown file as raw text and return its documents.

    ``TextLoader`` is used on purpose so the Markdown is not parsed; see
    https://github.com/langchain-ai/langchain/issues/3591
    """
    return TextLoader(markdown_path).load()

def _collect_splits(docs, text_splitter, force_category, prepend_md_title=False):
    """Split every document and pair each chunk with its own metadata dict.

    Shared implementation of ``split_docs``, ``split_pdf`` and ``split_md``.

    Args:
        docs: Documents exposing ``page_content`` (str) and ``metadata`` (dict).
        text_splitter: Object with a ``split_text(str) -> list[str]`` method.
        force_category: Value stored under ``metadata['category']`` for every chunk.
        prepend_md_title: When true, chunks of documents whose ``source`` ends in
            ``.md`` are prefixed with a ``# <file name without extension>`` heading,
            so every chunk still names the file it came from.

    Returns:
        ``{'all_splits': [...], 'all_metadatas': [...]}``: two parallel lists with
        one entry per chunk.
    """
    all_splits = []
    all_metadatas = []
    for doc in docs:
        # Work on a copy: the caller's Document.metadata must not be mutated.
        metadata = {**doc.metadata, 'category': force_category}
        splits = text_splitter.split_text(doc.page_content)

        source = str(metadata.get('source', ''))
        if prepend_md_title and source.endswith('.md'):
            # GitLoader records 'file_name'; fall back to the source path otherwise.
            file_name = metadata.get('file_name') or Path(source).name
            title = f"# {Path(file_name).stem}"
            splits = [f"{title}\n\n{chunk}" for chunk in splits]

        all_splits.extend(splits)
        # One dict per chunk, so editing one chunk's metadata later cannot
        # silently change every other chunk of the same document.
        all_metadatas.extend(dict(metadata) for _ in splits)

    return {
        'all_splits': all_splits,
        'all_metadatas': all_metadatas,
    }


def split_docs(docs, language, chunk_size, chunk_overlap, force_category):
    """Split documents with a language-aware splitter (e.g. ``Language.MARKDOWN``).

    Chunks of ``.md`` sources get a ``# <file name>`` heading prepended.
    Input documents are not modified.

    Args:
        docs: Documents to split (e.g. from ``load_repo``).
        language: LangChain ``Language`` used to pick the separators.
        chunk_size: Maximum chunk length, in characters.
        chunk_overlap: Characters shared between consecutive chunks.
        force_category: Category stored in every chunk's metadata.

    Returns:
        ``{'all_splits': [...], 'all_metadatas': [...]}`` parallel lists.
    """
    text_splitter = RecursiveCharacterTextSplitter.from_language(
        language=language, chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    return _collect_splits(docs, text_splitter, force_category, prepend_md_title=True)


def split_pdf(docs, chunk_size, chunk_overlap, force_category):
    """Split PDF documents, which are parsed as plain text, with the generic splitter.

    Same arguments and return value as ``split_docs``, minus ``language``.
    Input documents are not modified.
    """
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    return _collect_splits(docs, text_splitter, force_category)


def split_md(docs, chunk_size, chunk_overlap, force_category):
    """Split Markdown documents with ``MarkdownTextSplitter`` (no title is prepended).

    Same arguments and return value as ``split_docs``, minus ``language``.
    Input documents are not modified.
    """
    text_splitter = MarkdownTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    return _collect_splits(docs, text_splitter, force_category)

In [133]:
# Source repositories. Each is cloned into /tmp on the first run; later runs reuse
# the existing local checkout (see load_repo).

# Main protocol documentation repo (public on GitHub)
remote_repo_docsV2_url="https://github.com/yieldprotocol/docs-v2" # public GitHub repo with the protocol docs
local_repo_docsV2_path="/tmp/yield_docs_v2_repo"

# Cookbook / addendum repo
remote_repo_addendum_url="https://github.com/yieldprotocol/addendum-docs"
local_repo_addendum_path="/tmp/yield_addendum-docs"

# Governance repo. Its documents have little content of their own; most of them
# only link to external files.
# TODO: consider also loading the files affected by governance-v2.
remote_repo_governance_url = "https://github.com/yieldprotocol/governance-v2"
local_repo_governance_path = "/tmp/governance-v2"

branch="main" # branch loaded from every repo above
file_filter=lambda file_path: file_path.endswith(".md") # keep only Markdown files as documentation

# Max characters per chunk.
# NOTE(review): the author noted that the SFT data was also chunked to 1000 at
# evaluation; keep the two in sync.
chunk_size_chars = 1000 # max characters per chunk
chunk_overlap_chars = 0 # no overlap, because the docs contain code snippets

documentation_docs = load_repo(remote_repo_docsV2_url, local_repo_docsV2_path, branch, file_filter)
addendum_docs = load_repo(remote_repo_addendum_url, local_repo_addendum_path, branch, file_filter)
governance_docs = load_repo(remote_repo_governance_url, local_repo_governance_path, branch, file_filter)

documentation_splits = split_docs(documentation_docs, Language.MARKDOWN, chunk_size_chars, chunk_overlap_chars, force_category='general')
addendum_splits = split_docs(addendum_docs, Language.MARKDOWN, chunk_size_chars, chunk_overlap_chars, force_category='technical')
governance_splits = split_docs(governance_docs, Language.MARKDOWN, chunk_size_chars, chunk_overlap_chars, force_category='governance')

# Drop the first governance chunk, which is a placeholder. Texts and metadatas are
# sliced identically so the two lists stay aligned.
# NOTE(review): the placeholders were described as the first AND last file, but only
# the first *chunk* is removed here. Confirm whether trailing placeholder chunks
# should be dropped too.
governance_splits['all_splits'] = governance_splits['all_splits'][1:]
governance_splits['all_metadatas'] = governance_splits['all_metadatas'][1:]

In [None]:
import nltk
# NLTK data packages; presumably required by the `unstructured` parser behind the
# PDF loaders. Confirm before removing.
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

whitepaper_data = load_online_pdf(file_url="https://yieldprotocol.com/Yield.pdf")
yieldspace_data = load_md(markdown_path="./YieldSpace.md")

# Sanitize the sources: replace whatever `source` the loaders recorded (e.g. a local
# file path) with the public URL of each paper, so retrieved chunks cite something a
# user can open. Every returned document is updated, not only the first one, in case
# a loader yields several documents for one file.
for doc in whitepaper_data:
    doc.metadata['source'] = "https://yieldprotocol.com/Yield.pdf"
for doc in yieldspace_data:
    doc.metadata['source'] = "https://yieldprotocol.com/YieldSpace.pdf"

# Chunking for the papers: 1000-character chunks with 200 characters of overlap
# between consecutive chunks.
paper_chunk_size_chars = 1000
paper_chunk_overlap_chars = 200

whitepaper_splits = split_pdf(whitepaper_data, paper_chunk_size_chars, paper_chunk_overlap_chars, force_category="paper")
yieldspace_splits = split_md(yieldspace_data, paper_chunk_size_chars, paper_chunk_overlap_chars, force_category="paper")

In [None]:
# Sanity check: render every YieldSpace chunk as Markdown under a numbered heading.
for chunk_number, chunk_text in enumerate(yieldspace_splits['all_splits'], start=1):
    display(Markdown(f"# Chunk {chunk_number}"))
    display(Markdown(chunk_text))

# Using LangChain to generate vectors and store in Elasticsearch

First we'll create the embeddings model

In [3]:
from langchain.embeddings import HuggingFaceEmbeddings

def setup_embeddings(model_name):
    """Return a LangChain ``HuggingFaceEmbeddings`` wrapper for ``model_name``.

    Prints a progress line before creating the wrapper.
    """
    print(">> Prep. Huggingface embedding setup")
    embeddings = HuggingFaceEmbeddings(model_name=model_name)
    return embeddings

# The model is loaded locally only to embed chunks before storing them in
# Elasticsearch; this is not meant as a deployment setup.
emb_model = "BAAI/bge-base-en"
hf = setup_embeddings(emb_model)

>> Prep. Huggingface embedding setup


# Elastic Cloud and Connection Details

While you can definitely pull this Colab down and run it locally as a Python notebook, the simplest way to get this working is to create an Elastic cluster at https://cloud.elastic.co/. Make sure to configure an ML node; you won't need more than the default spec for this project. Once the cluster is up and running, grab your connection info and edit the cell below before running it.

In [6]:
from langchain.vectorstores import ElasticKnnSearch

import os
from getpass import getpass
from urllib.parse import quote

# Elastic Cloud connection details. Secrets are read from environment variables (or
# prompted for) instead of being written into the notebook: anything typed here ends
# up in version control and shared copies. Rotate any credential that has ever been
# committed to this notebook.
es_cloud_id = os.environ.get("ES_CLOUD_ID") or getpass("Elastic Cloud ID: ")
endpoint = os.environ.get("ES_ENDPOINT", "yield-rag-b0f384.kb.us-east-1.aws.found.io:9243/")
es_user = os.environ.get("ES_USER", "elastic")
es_pass = os.environ.get("ES_PASSWORD") or getpass("Elastic password: ")

# URL form of the same credentials (the client below connects via cloud_id instead).
# The endpoint already carries its port, so only the trailing slash is stripped.
# The password is percent-encoded so characters like '@' or ':' cannot break the URL.
# NOTE(review): the host has a `kb.` prefix, which looks like a Kibana endpoint rather
# than Elasticsearch. Confirm before relying on es_url.
es_url = f"https://{es_user}:{quote(es_pass, safe='')}@{endpoint.rstrip('/')}"

index_name = "search-yield-data"
es = Elasticsearch(cloud_id=es_cloud_id,
                   basic_auth=(es_user, es_pass)
                   )
print(es.info()) # should return cluster info
# db = ElasticKnnSearch(embedding=hf, es_connection=es, es_cloud_id=es_cloud_id, es_user=es_user, es_password=es_pass, index_name=index_name)

{'name': 'instance-0000000001', 'cluster_name': 'e676bcbabc614a61b038a91a02b2c6dc', 'cluster_uuid': 'AlptJ-AASPGPzHy33ahRMQ', 'version': {'number': '8.9.0', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '8aa461beb06aa0417a231c345a1b8c38fb498a0d', 'build_date': '2023-07-19T14:43:58.555259655Z', 'build_snapshot': False, 'lucene_version': '9.7.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


Next, we'll create our Elasticsearch vector store, LangChain style:

Here goes the load. I like how small the code is, but eventually I'd love more flexibility in how we model the data, since I'd like to try more hybrid search techniques.

In [138]:
# only load them once with the respective ESearch index, otherwise creating duplicates
# documentation_splits_ids = db.add_texts(documentation_splits['all_splits'], documentation_splits['all_metadatas'])
# addendum_splits_ids = db.add_texts(addendum_splits['all_splits'], addendum_splits['all_metadatas'])
# governance_splits_ids = db.add_texts(governance_splits['all_splits'], governance_splits['all_metadatas'])
# whitepaper_splits_ids = db.add_texts(whitepaper_splits['all_splits'], whitepaper_splits['all_metadatas'])
# yieldspace_splits_ids = db.add_texts(yieldspace_splits['all_splits'], yieldspace_splits['all_metadatas'])

Now we build the prompt: retrieve the most relevant passages from Elasticsearch with a hybrid (BM25 keyword + kNN vector) search, then pass them as context in the prompt to the LLM.

In [5]:
!pip install "transformers==4.31.0" "datasets[s3]==2.13.0" sagemaker --upgrade --quiet
!pip install "sagemaker>=2.163.0" --upgrade --quiet

In [6]:
!huggingface-cli login --token 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

Token will not been saved to git credential helper. Pass `add_to_git_credential=True` if you want to set the git credential as well.
Token is valid (permission: write).
Your token has been saved to /home/ec2-user/.cache/huggingface/token
Login successful


In [141]:
import sagemaker
import boto3
sess = sagemaker.Session()

# S3 bucket for uploading data, models and logs. None means "use the session's
# default bucket", which SageMaker creates if it does not exist yet.
sagemaker_session_bucket=None
if sess is not None and sagemaker_session_bucket is None:
    sagemaker_session_bucket = sess.default_bucket()

# Execution role. When get_execution_role() raises ValueError (presumably when not
# running inside SageMaker), look up the named IAM role instead.
try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

# Recreate the session pinned to the chosen bucket.
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")
print(f"sagemaker session region: {sess.boto_region_name}")

sagemaker role arn: arn:aws:iam::116961472995:role/service-role/AmazonSageMaker-ExecutionRole-20230804T154387
sagemaker bucket: sagemaker-us-east-1-116961472995
sagemaker session region: us-east-1


In [142]:
from sagemaker.huggingface import get_huggingface_llm_image_uri

# Resolve the ECR URI of the Hugging Face LLM inference container (TGI).
llm_image = get_huggingface_llm_image_uri("huggingface", version="0.9")

# Show which container image will be deployed.
print(f"llm image uri: {llm_image}")


import json
from sagemaker.huggingface import HuggingFaceModel

# Endpoint hardware.
# NOTE(review): the author questioned why the deployment instance is larger than the
# fine-tuning one; confirm this sizing is intended.
instance_type = "ml.g5.12xlarge"
number_of_gpu = 4
health_check_timeout = 300  # seconds, passed as container_startup_health_check_timeout

# Environment for the inference container; every value is a string.
config = dict(
    HF_MODEL_ID="/opt/ml/model",            # model location inside the container
    SM_NUM_GPUS=json.dumps(number_of_gpu),  # number of GPUs used per replica
    MAX_INPUT_LENGTH=json.dumps(3072),      # max length of the input text
    MAX_TOTAL_TOKENS=json.dumps(4096),      # max length of input + generated text
    # HF_MODEL_QUANTIZE="bitsandbytes",     # uncomment to quantize
)

# S3 archive produced by the (QLoRA, judging by the path) training job.
model_path = "s3://sagemaker-us-east-1-116961472995/huggingface-qlora-2023-08-10-00-53-52-2023-08-10-00-53-52-626/output/model.tar.gz"

# Bind weights, container image, role and environment into a deployable model.
llm_model = HuggingFaceModel(
    model_data=model_path,  # change to your model path
    role=role,
    image_uri=llm_image,
    env=config,
)

llm image uri: 763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-pytorch-tgi-inference:2.0.1-tgi0.9.3-gpu-py39-cu118-ubuntu20.04


In [144]:
# Deploy the model to a SageMaker endpoint. `llm` is the returned predictor, used
# later for llm.predict(...) and for cleanup (delete_model / delete_endpoint).
llm = llm_model.deploy(
  initial_instance_count=1,
  instance_type=instance_type,
  # volume_size=40, # If using an instance with local SSD storage, volume_size must be None, e.g. p4 but not p3
  container_startup_health_check_timeout=health_check_timeout, # seconds allowed for the container to load the model (value set by health_check_timeout)
) # ETA: 5 - 6 minutes to deploy

-----------!

In [4]:
from datasets import load_dataset
from random import randrange

# Load the QA-pair dataset from the Hub and keep only its "test" split for evaluation.
data_files = dict(train="qa_pairs_train.jsonl", test="qa_pairs_test.jsonl")
dataset = load_dataset(
    "YieldInc/chatbot_qa_dataset_splitted",
    data_files=data_files,
)['test']

print(f"dataset size: {len(dataset)}")

Found cached dataset json (/home/ec2-user/.cache/huggingface/datasets/YieldInc___json/YieldInc--chatbot_qa_dataset_splitted-aba7a1aa340b6c02/0.0.0/e347ab1c932092252e717ff3f949105a4dd28b27e842dd53157d2f72e276c2e4)


  0%|          | 0/2 [00:00<?, ?it/s]

dataset size: 389


In [3]:
# ID of the text-embedding model, deployed in Elasticsearch, that embeds queries for kNN.
# NOTE(review): presumably the eland import of BAAI/bge-base-en. Confirm it matches the
# model used to embed the indexed chunks.
model_id = "baai__bge-base-en"


def search_question(query_text, size=5, k=5, num_candidates=20, text_boost=15, knn_boost=5, index=None):
    """Retrieve passages for ``query_text`` with hybrid (BM25 + kNN) search.

    Combines a BM25 ``match`` query on the ``text`` field with an approximate
    kNN search on the ``title-vector`` field. The query vector is computed inside
    Elasticsearch by the ``model_id`` model. The two boosts set the relative
    weight of the keyword and vector parts.

    Args:
        query_text: The user question.
        size: Maximum number of hits returned.
        k: Number of nearest neighbours requested from the kNN search.
        num_candidates: Candidate pool size for the approximate kNN search.
        text_boost: Boost of the BM25 clause.
        knn_boost: Boost of the kNN clause.
        index: Index to search; defaults to the module-level ``index_name``.

    Returns:
        The first ``text`` value of every hit, in ranking order. Hits without a
        ``text`` field are skipped.
    """
    print("Query text is", query_text)
    query = {
        "bool": {
            "must": [
                {"match": {"text": {"query": query_text, "boost": text_boost}}}
            ]
        }
    }
    # NOTE(review): the vector field is named "title-vector"; confirm it holds the
    # embeddings of the chunk text.
    knn = {
        "field": "title-vector",
        "k": k,
        "num_candidates": num_candidates,
        "query_vector_builder": {
            "text_embedding": {"model_id": model_id, "model_text": query_text}
        },
        "boost": knn_boost,
    }
    resp = es.search(
        index=index_name if index is None else index,
        query=query,
        knn=knn,
        fields=["text"],  # ask for the chunk text through the `fields` option
        size=size,
        source=True,
    )
    return [
        hit["fields"]["text"][0]
        for hit in resp["hits"]["hits"]
        if hit.get("fields", {}).get("text")
    ]

def format_yield_validate(sample, relevant_documents):
    """Build the evaluation prompt: instruction, retrieved context, then an answer cue.

    Args:
        sample: Mapping with a ``'question'`` entry.
        relevant_documents: Iterable of passage strings, joined one per line.

    Returns:
        The three ``%%%`` sections separated by blank lines, ending with
        ``%%% Answer`` for the model to continue.
    """
    question = sample['question']
    context_body = '\n'.join(relevant_documents)
    return "\n\n".join((
        f"%%% Instruction\n{question}",
        f"%%% Context\n{context_body}",
        "%%% Answer",
    ))

In [12]:
# Pick a random test question, retrieve context for it and build the prompt.
sample = dataset[randrange(len(dataset))]
relevant_docs = search_question(sample['question'])
prompt = format_yield_validate(sample, relevant_docs)
print(prompt)

# Generation settings for the endpoint. A low temperature (0.1) keeps sampling
# nearly deterministic; generation stops at the "</s>" token.
payload = dict(
    inputs=prompt,
    parameters=dict(
        do_sample=True,
        top_p=0.9,
        temperature=0.1,
        top_k=50,
        max_new_tokens=1024,
        repetition_penalty=1.03,
        stop=["</s>"],
    ),
)

# `llm` is the predictor returned by llm_model.deploy(...); run that cell first.
output = llm.predict(payload)[0]['generated_text']
print(f"GENERATED ANSWER: {output}\n\n")
print(f"REFERENCE ANSWER: {sample['answer']}\n\n")

Query text is I am a user who wants to withdraw Ether collateral from my vault. How does the Yield protocol ensure that the Ether I withdraw is securely held until the end of the transaction?
%%% Instruction
I am a user who wants to withdraw Ether collateral from my vault. How does the Yield protocol ensure that the Ether I withdraw is securely held until the end of the transaction?

%%% Context
# COOKBOOK_VARIABLE

### Withdraw Ether collateral

This batch removes an amount of Ether collateral from a vault. Destroying the vault at the end is optional and possible only if the vault holds no collateral and no debt.

The Ether withdrawn will be temporarily held by the Ladle until the end of the transaction.

```
  await ladle.batch([
    ladle.pourAction(vaultId, ladle, withdrawn.mul(-1), 0),
    ladle.unwrapEtherAction(receiver),
    ladle.destroy(vaultId),
  ])
# COOKBOOK

### Withdraw Ether collateral

This batch removes an amount of Ether collateral from a vault. Destroying the vault

NameError: name 'llm' is not defined

In [127]:
# Call the deployed SageMaker endpoint directly through the SageMaker runtime API,
# sending the JSON-encoded `payload` built earlier.
ENDPOINT = "huggingface-pytorch-tgi-inference-2023-08-12-21-55-29-603"  # newest model version
runtime = boto3.client("runtime.sagemaker")
response = runtime.invoke_endpoint(
    EndpointName=ENDPOINT,
    ContentType="application/json",
    Body=json.dumps(payload),
)
print(response)

{'ResponseMetadata': {'RequestId': '7d06c634-0840-4c05-919f-770d9b929df8', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '7d06c634-0840-4c05-919f-770d9b929df8', 'x-amzn-invoked-production-variant': 'AllTraffic', 'date': 'Sat, 12 Aug 2023 22:22:03 GMT', 'content-type': 'application/json', 'content-length': '1271', 'connection': 'keep-alive'}, 'RetryAttempts': 0}, 'ContentType': 'application/json', 'InvokedProductionVariant': 'AllTraffic', 'Body': <botocore.response.StreamingBody object at 0x7f3eda929120>}


In [128]:
# Decode the JSON body returned by invoke_endpoint and display the generated text.
# NOTE(review): botocore's StreamingBody is normally readable only once, so re-running
# this cell without re-invoking the endpoint will likely fail.
prediction = json.loads(response['Body'].read().decode('utf-8'))
prediction[0]['generated_text']

{'generated_text': '\nWhen Bob mints 10 new liquidity tokens (10% of the total supply), the total supply and the virtual fyDai reserves increase. The total supply increases by 10 (the number of new liquidity tokens minted) and the virtual fyDai reserves increase proportionally to the total supply. This is because the total supply of liquidity tokens, denoted as $s$, is used as the virtual fyDai reserves. \n\nAs stated in the document, "With this optimization, the contract has zero inaccessible fyDai upon initialization. However, as soon as there is a trade, the fees increase the value of liquidity shares. As the pool accumulates trading fees and the value of its fyDai increases due to interest, the invariant above increases, meaning that an increasing share of fyDai becomes inaccessible." \n\nSo, whenever a trade occurs, the virtual fyDai reserves are added to the actual fyDai reserves to determine the reserves used to calculate the outcome of the trade. But whenever liquidity tokens a

In [14]:
# Test the deployed model end to end through the Lambda function URL.
import urllib.parse
from urllib.parse import quote

url = "https://z3iarvxmx2j35ar4yll62dtplm0twevi.lambda-url.us-east-1.on.aws/"
# Seconds to wait for the Lambda response. Without a timeout, requests can block
# forever. Adjust this to the Lambda's configured timeout.
lambda_timeout_s = 300

sample = dataset[randrange(len(dataset))]
relevant_docs = search_question(sample['question'])
prompt = format_yield_validate(sample, relevant_docs)
print(prompt)

# URL-encode the whole prompt into the `query` parameter (spaces become %20). The
# prompt can reach ~3k tokens, so the URL gets very long and may be rejected.
encoded_query = urllib.parse.quote(prompt)
full_url = f"{url}?query={encoded_query}"
print(full_url)

# Send the GET request to the Lambda function.
response = requests.get(full_url, timeout=lambda_timeout_s)

if response.status_code == 200:
    # The body is JSON; print it next to the dataset's reference answer.
    answer = response.json()
    print(f"Answer:{answer}\n")
    print(f"Reference Answer:{sample['answer']}\n")
else:
    print(f"{response.status_code} An error occurred while processing your request.")
    print(response.text)  # show the server's error body for debugging

Query text is I am a user who has experienced a loss of funds due to an issue in the Yield Protocol. How can I be assured that the immediate corrective actions taken by the team are effective and will stop the immediate risk?
%%% Instruction
I am a user who has experienced a loss of funds due to an issue in the Yield Protocol. How can I be assured that the immediate corrective actions taken by the team are effective and will stop the immediate risk?

%%% Context
# emergency_procedure

- Is there agreement in the team that the situation is under control and that the War Room can be closed?
4. Once the issue has been confirmed as valid, the next stop is to take immediate corrective action to prevent further loss of funds. If root cause requires further research, the team must err on the side of caution and take emergency preventive actions while the situation continues to be assessed. A few questions to guide the decisions of the team:
   - Disable deposits? Should features be removed fr

In [129]:
# Clean up when the endpoint is no longer needed: delete the SageMaker model, then the
# endpoint, both created by llm_model.deploy(). Requires `llm` from the deploy cell.
llm.delete_model()
llm.delete_endpoint()