# RAG using OpenSearch Flow Framework


OpenSearch provides strong ML capabilities as building blocks for powerful use cases. ML features such as neural semantic search, hybrid search, multimodal search, and other ML inference capabilities are often combined into a complex workflow to retrieve the final search results. The Flow Framework plugin in OpenSearch is designed to create such complex workflows with a single API call. It offers out-of-the-box templates for common use cases, which speeds up the development of complex search and generative AI workflows.

For example, the conversational search use case lets users ask questions of their knowledge base, much like what we built in previous labs. However, instead of going through multiple steps to register separate models for embedding and generation, we make one API call with initialization parameters. That single call deploys an embedding connector to create vectors, creates the index used as the knowledge base, configures an ingest pipeline, and registers an LLM to answer the user's conversational search questions.

Let us go through some basic setup required for the lab.


## 1. Lab Pre-requisites

#### a. Download and install Python dependencies

This notebook requires a few libraries. We'll use the Python clients for OpenSearch and SageMaker, and the `opensearch-py-ml` client library for OpenSearch ML Commons.

In [None]:
!pip install opensearch-py-ml accelerate tqdm --quiet
!pip install sagemaker --upgrade --quiet
!pip install requests_aws4auth --quiet
!pip install deprecated --quiet

# OpenSearch Python SDK
!pip install opensearch_py --quiet
# Progress bar used while bulk-loading data
!pip install alive-progress --quiet

# Import PyTorch and confirm which version is running.
# The version should already be 1.13.1 or higher. If not, we restart the kernel.

import torch
from IPython.display import display_html

pytorch_version = torch.__version__
print(f"PyTorch version: {pytorch_version}")

def restartkernel():
    # Restarts the kernel through the classic Jupyter Notebook JavaScript API
    display_html("<script>Jupyter.notebook.kernel.restart()</script>", raw=True)

if pytorch_version.startswith('1.1'):
    restartkernel()

#### b. Import libraries & initialize resource information
The cell below imports all the libraries and modules used in this notebook.

In [None]:
import boto3
import os
import time
import json
import pandas as pd
from tqdm import tqdm
import sagemaker
from opensearchpy import OpenSearch, RequestsHttpConnection
from sagemaker import get_execution_role
import random 
import string
import s3fs
from urllib.parse import urlparse
from IPython.display import display, HTML
from alive_progress import alive_bar
from opensearch_py_ml.ml_commons import MLCommonClient
from requests_aws4auth import AWS4Auth
import requests 

#### c. Get CloudFormation stack output variables

We have preconfigured a few resources by creating a CloudFormation stack in the account. The names and ARNs of these resources are used throughout this lab, so we load them from the stack outputs here.
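The `get_cfn_outputs` helper in the next cell flattens the stack outputs into a dictionary. As a minimal sketch (the function name and the `REQUIRED_OUTPUT_KEYS` tuple are hypothetical; the key names are the ones this notebook reads), the same logic can be written against a plain `describe_stacks()` response and made to fail early with a clear message when an expected output is missing, for example because the stack name differs:

```python
# Hypothetical helper: flatten a CloudFormation DescribeStacks response into
# {OutputKey: OutputValue} and fail early if an output the notebook needs is missing.
REQUIRED_OUTPUT_KEYS = (
    "OpenSearchDomainEndpoint",
    "s3BucketTraining",
    "BedrockBatchInferenceRole",
    "BedrockBatchInferenceRoleArn",
    "SageMakerNotebookURL",
    "OpenSearchSecret",
)

def outputs_from_describe_stacks(response, required=REQUIRED_OUTPUT_KEYS):
    """Return the first stack's outputs as a dict, checking required keys."""
    stack = response["Stacks"][0]
    outputs = {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}
    missing = [key for key in required if key not in outputs]
    if missing:
        raise KeyError(f"Stack outputs missing: {', '.join(missing)}")
    return outputs

# Offline example with a trimmed-down response shaped like DescribeStacks output
sample = {"Stacks": [{"Outputs": [
    {"OutputKey": "OpenSearchDomainEndpoint",
     "OutputValue": "search-example.us-east-1.es.amazonaws.com"},
]}]}
print(outputs_from_describe_stacks(sample, required=("OpenSearchDomainEndpoint",)))
```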

In [None]:
# Create a Boto3 session
session = boto3.Session()

# Get the account id
account_id = boto3.client('sts').get_caller_identity().get('Account')

# Get the current region
region = session.region_name

cfn = boto3.client('cloudformation')

# Method to obtain output variables from Cloudformation stack. 
def get_cfn_outputs(stackname):
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

## Setup variables to use for the rest of the demo
cloudformation_stack_name = "advanced-rag-opensearch"

outputs = get_cfn_outputs(cloudformation_stack_name)
aos_host = outputs['OpenSearchDomainEndpoint']
s3_bucket = outputs['s3BucketTraining']
bedrock_inf_iam_role = outputs['BedrockBatchInferenceRole']
bedrock_inf_iam_role_arn = outputs['BedrockBatchInferenceRoleArn']
sagemaker_notebook_url = outputs['SageMakerNotebookURL']

# We will just print all the variables so you can easily copy if needed.
outputs

## 2. Prepare data
The code below loads a dataset of wine reviews. We'll use this dataset to recommend wines that resemble the description the user provides.

#### Sampling a subset of the records to load into OpenSearch quickly
The dataset contains nearly 130,000 records, so converting all of them into vectors and loading them into a vector store could take a long time. Therefore, we take a random subset of 300 records. We also add a `record_id` column that numbers the sampled records from 1 to 300.

In [None]:
url = "https://raw.githubusercontent.com/davestroud/Wine/master/winemag-data-130k-v2.json"
df = pd.read_json(url)
df_sample = df.sample(300,random_state=37).reset_index()
df_sample['record_id'] = range(1, len(df_sample) + 1)
df_sample[:5]

## 3. Create a connection with the Amazon OpenSearch Service domain
Next, we use the OpenSearch Python client to set up a connection with the OpenSearch domain.

#### Important pre-requisite
You should have followed the steps in the lab instructions to map the SageMaker notebook role to the OpenSearch `ml_full_access`, `flow_framework_full_access`, and `all_access` roles. If not, visit the lab instructions and complete the **Setting up permission for IAM Roles in OpenSearch** section.

#### Retrieving credentials from Secrets Manager
We use the Amazon SageMaker notebook IAM role to configure the workflows in OpenSearch. This IAM role has permission to pass the BedrockInference IAM role to OpenSearch, which OpenSearch then uses to call Amazon Bedrock models.

##### NOTE:
_At any point in this lab, if you get the failure message **The security token included in the request is expired.**, you can resolve it by running this cell again._ This cell refreshes the security credentials required for the rest of the lab.

In [None]:
secrets_client = boto3.client('secretsmanager')
aos_credentials = json.loads(secrets_client.get_secret_value(SecretId=outputs['OpenSearchSecret'])['SecretString'])

service = 'es'

# Retrieve credentials from the current notebook session, which runs as the notebook (NBRole) IAM role.
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

aos_client = OpenSearch(
    hosts = [{'host': aos_host, 'port': 443}],
    http_auth = awsauth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)
ml_client = MLCommonClient(aos_client)
ml_client



## 4. Deploy the Amazon Titan Text Embeddings V2 connector with a single API call

Amazon OpenSearch Service supports the Flow Framework starting with version 2.13. The framework offers a few out-of-the-box workflow templates. For more information, see the [Amazon OpenSearch Service documentation](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ml-workflow-framework.html).

You will use one of these out-of-the-box templates, `bedrock_titan_embedding_model_deploy`, to deploy an Amazon Bedrock connector with a single API call. Once it is deployed, we can invoke the model with the ML Commons Predict API.
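As a sketch of what the cell below sends, the request can be packaged into a small function. The function name is hypothetical; the query parameters and the parameter keys are copied from the cell below, and `urlencode` just builds the same query string. Only the overridden parameters are sent; the template supplies everything else.

```python
from urllib.parse import urlencode

# Hypothetical helper mirroring the cell below for the
# bedrock_titan_embedding_model_deploy use-case template.
def titan_embedding_provision_request(host, region, role_arn,
                                      model_id="amazon.titan-embed-text-v2:0"):
    """Return (url, body) that create and provision the workflow in one call."""
    query = urlencode({"use_case": "bedrock_titan_embedding_model_deploy",
                       "provision": "true"})
    url = f"https://{host}/_plugins/_flow_framework/workflow?{query}"
    # Only the parameters we override; the rest keep their template defaults.
    body = {
        "template.name": "bedrock_titan_embedding_model_deploy",
        "create_connector.region": region,
        "create_connector.credential.roleArn": role_arn,
        "create_connector.actions.url": (
            f"https://bedrock-runtime.{region}.amazonaws.com/model/{model_id}/invoke"
        ),
    }
    return url, body

example_url, example_body = titan_embedding_provision_request(
    "search-example.us-west-2.es.amazonaws.com", "us-west-2",
    "arn:aws:iam::123456789012:role/ExampleBedrockRole")
print(example_url)
```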


In [None]:
# Build the path to the workflow API; provision=true creates and provisions the workflow in one call
use_case_template_name = 'bedrock_titan_embedding_model_deploy'
path = f'_plugins/_flow_framework/workflow?use_case={use_case_template_name}&provision=true'
url = f'https://{aos_host}/{path}'
titan_v2_embedding_model_id = "amazon.titan-embed-text-v2:0"
headers = {"Content-Type": "application/json"}

print(url)

# Parameters that override the defaults defined in the use case template;
# any parameter we don't supply keeps its template default

default_params = {
    "template.name": "bedrock_titan_embedding_model_deploy",
    "create_connector.region": f"{region}",
    "create_connector.credential.roleArn": f"{bedrock_inf_iam_role_arn}",
    "create_connector.actions.url": f"https://bedrock-runtime.{region}.amazonaws.com/model/{titan_v2_embedding_model_id}/invoke",
}

# Call the API to create and provision the workflow.
r = requests.post(url, auth=awsauth, json=default_params, headers=headers)

# Print the status of the API call.
print(f"Status: {r.status_code}. Response:{r.text}")


# If the call succeeded, capture the workflow ID for the status check in the next step
if r.status_code in (200, 201):
    workflow_id = r.json()["workflow_id"]
    print(f"Workflow ID: {workflow_id}")
else:
    print(f"Failed: {r.text}")


#### Check the status of provisioning
The following cell returns the status of the provisioning request and stores the embedding model ID in a variable, which we use to generate text embeddings.
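Provisioning runs asynchronously, so a single status check may return a state other than `COMPLETED`. A minimal polling sketch follows. `wait_for_workflow` is a hypothetical helper, and it assumes the `state` field ends in either `COMPLETED` (checked by the cell below) or `FAILED` (mentioned under Troubleshooting). The status fetcher is passed in as a callable, so the loop can be tried without a cluster.

```python
import time

# Assumed terminal values of the "state" field returned by the _status API.
TERMINAL_STATES = {"COMPLETED", "FAILED"}

def wait_for_workflow(fetch_status, timeout_s=300, interval_s=5, sleep=time.sleep):
    """Call fetch_status() until the workflow reaches a terminal state.

    fetch_status: zero-argument callable returning the parsed JSON of
    GET _plugins/_flow_framework/workflow/<id>/_status.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        if status.get("state") in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Workflow still {status.get('state')} after {timeout_s}s")
        sleep(interval_s)

# Offline example: two in-progress responses, then a completed one.
responses = iter([{"state": "PROVISIONING"}, {"state": "PROVISIONING"},
                  {"state": "COMPLETED", "resources_created": []}])
final_status = wait_for_workflow(lambda: next(responses), sleep=lambda s: None)
print(final_status["state"])
```

In the notebook, you could call it as `wait_for_workflow(lambda: requests.get(url, auth=awsauth).json())`.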

In [None]:
# Put together URL to _status API of flow framework
path = f'_plugins/_flow_framework/workflow/{workflow_id}/_status'
url = f'https://{aos_host}/{path}'
print(url)


# Capture the embedding model ID so that we can test it on its own.
embedding_model_id = ""

# Call the _status API (a GET request, so no request body is needed)
r = requests.get(url, auth=awsauth, headers=headers)

print(f"Status: {r.status_code}.")

# If the _status API call succeeds, extract the data we need.
if r.status_code == 200:
    response_json = r.json()
    status = response_json["state"]
    if status == "COMPLETED":
        print(json.dumps(response_json, indent=4))
        for resources in response_json["resources_created"]:
            if resources["workflow_step_id"] == "register_model":
                embedding_model_id = resources["resource_id"]
        print(f"Embedding model successfully deployed. ID: {embedding_model_id}")
    else:
        print(f"Workflow state is {status}. If it is still in progress, re-run this cell. Details:\n{r.text}")
else:
    print(f"Status request failed: {r.text}")


#### Troubleshooting
If the workflow status is FAILED, you need to deprovision the workflow. Use the last two cells of this notebook to deprovision and delete the workflow, then retry deploying the model from step 4.

#### Let's test the deployed Amazon Titan Text Embeddings V2 model connector
We generate a text embedding and print the first 5 of its 1,024 dimensions.
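The cell below reads the vector from `inference_results[0].output[0].data`. A small helper (hypothetical name) makes that path explicit and checks the vector length. The length matters because it must match the `dimension` of the `knn_vector` field that the workflow in step 5 maps (1024). Titan Text Embeddings V2 returns 1,024 dimensions by default, and the sketch assumes that default is used.

```python
def extract_embedding(predict_response, expected_dim=1024):
    """Return the vector from a parsed ML Commons _predict response.

    Uses the same JSON path as the cell below:
    inference_results[0].output[0].data
    """
    vector = predict_response["inference_results"][0]["output"][0]["data"]
    if expected_dim is not None and len(vector) != expected_dim:
        raise ValueError(f"Expected {expected_dim} dimensions, got {len(vector)}")
    return vector

# Offline example with a fake response of the same shape
fake_response = {"inference_results": [{"output": [{"data": [0.01] * 1024}]}]}
print(extract_embedding(fake_response)[:5])
```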

In [None]:
# You can test the embedding generation here.
payload = {
    "parameters":
    { 
        "inputText": "Best wine that goes with beef"
    } 
}

path = '_plugins/_ml/models/'+embedding_model_id+'/_predict'
url = 'https://'+aos_host + '/' + path
r = requests.post(url, auth=awsauth, json=payload , headers=headers )

if r.status_code == 200:
    embedding = json.loads(r.text)['inference_results'][0]['output'][0]['data']
    print(embedding[:5])
else:
    print(f"Error: {r.text}")
    print("You may need to re-run the previous cell until it reports that the embedding model was successfully deployed.")

### What did we do?
We deployed an Amazon Titan Text Embeddings V2 model with just one API call, by providing the template name `bedrock_titan_embedding_model_deploy` and supplying only the parameters we wanted to override. In our case, we replaced the template's default model, `Titan G1`, with the latest Titan embedding model as of July 2024, Amazon Titan Text Embeddings V2.

## 5. Flow framework workflows
The example above was a minimal single-API-call deployment. The following is a more elaborate example: a RAG architecture created as a workflow. OpenSearch can call Amazon Bedrock not only for embedding generation but also for text generation, so we can create a complete RAG architecture using a custom Flow Framework template. This custom template does the following:

1. Creates a connector to the Anthropic Claude 3 Sonnet model through Amazon Bedrock, and registers and deploys the model.
2. Creates a connector to the Amazon Titan Text Embeddings V2 model on Amazon Bedrock for text-to-vector embedding generation, and registers and deploys the model.
3. Creates an ingest pipeline that uses the Titan embedding model above to convert text to vectors and store them in the `description_embedding` field.
4. Creates an index and sets the ingest pipeline above as its default pipeline.
5. Creates a [RAG Tool](https://opensearch.org/docs/latest/ml-commons-plugin/agents-tools/tools/rag-tool/), an OpenSearch ML Commons plugin feature that helps build RAG architectures. It takes the user's question as input, runs a semantic search, and passes the search results to an LLM such as Anthropic Claude to generate an answer, much like what we did with our own code in previous labs.
6. Finally, the workflow creates a root agent that runs the RAG tool to answer the user's question.
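Flow Framework determines the order of these steps from the workflow graph; as we understand it, every `previous_node_inputs` entry also implies an edge from the referenced node. The sketch below is not the plugin's scheduler, which may run independent steps in parallel. It applies Kahn's topological sort to the node IDs and `previous_node_inputs` copied from the workflow definition in this section, to show one valid provisioning order. The function name is hypothetical.

```python
from collections import deque

# Node IDs and previous_node_inputs copied from the workflow definition.
WORKFLOW_NODES = [
    {"id": "create_bedrock_connector"},
    {"id": "create_embedding_connector"},
    {"id": "register_bedrock_model",
     "previous_node_inputs": {"create_bedrock_connector": "connector_id"}},
    {"id": "register_bedrock_embedding_model",
     "previous_node_inputs": {"create_embedding_connector": "connector_id"}},
    {"id": "create_ingest_pipeline",
     "previous_node_inputs": {"register_bedrock_embedding_model": "model_id"}},
    {"id": "create_index",
     "previous_node_inputs": {"create_ingest_pipeline": "pipeline_id"}},
    {"id": "rag_tool",
     "previous_node_inputs": {"register_bedrock_model": "model_id",
                              "register_bedrock_embedding_model": "model_id"}},
    {"id": "root_agent", "previous_node_inputs": {"rag_tool": "tools"}},
]

def provisioning_order(nodes):
    """Kahn's algorithm over the edges implied by previous_node_inputs."""
    ids = [n["id"] for n in nodes]
    deps = {n["id"]: set(n.get("previous_node_inputs", {})) for n in nodes}
    dependents = {i: [] for i in ids}
    for node_id, parents in deps.items():
        for parent in parents:
            if parent not in dependents:
                raise ValueError(f"{node_id} depends on unknown node {parent}")
            dependents[parent].append(node_id)
    indegree = {i: len(deps[i]) for i in ids}
    ready = deque(i for i in ids if indegree[i] == 0)
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for child in dependents[current]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(ids):
        raise ValueError("Cycle detected in workflow nodes")
    return order

print(provisioning_order(WORKFLOW_NODES))
```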

First, we define the workflow; then, in a second step, we provision it.


### Define a custom workflow
We are going to define a complete RAG workflow. Each node in the workflow represents one of the steps listed above.

The RAG application we are creating is the same as the one in the previous lab: a wine sommelier bot that answers users' questions about wine from a selected list of wines.
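The workflow definition uses `${{...}}` placeholders, for example `${{text_embedding.field_map.input}}`, whose values are supplied later as provisioning parameters. The plugin performs this substitution on the server. The following is only a rough Python approximation (the helper name is hypothetical, and the exact server-side rules may differ) that shows which strings change, including dictionary keys such as the `field_map` entry. Single-brace `${parameters...}` strings are left alone because they are filled at run time instead.

```python
import re

# Matches ${{key}} placeholders; single-brace ${parameters.x} strings do not match.
TEMPLATE_PLACEHOLDER = re.compile(r"\$\{\{\s*([\w.]+)\s*\}\}")

def fill_placeholders(template, params):
    """Recursively replace ${{key}} in dict keys, dict values, and list items.

    Placeholders with no matching entry in params are left unchanged.
    """
    if isinstance(template, dict):
        return {fill_placeholders(k, params): fill_placeholders(v, params)
                for k, v in template.items()}
    if isinstance(template, list):
        return [fill_placeholders(item, params) for item in template]
    if isinstance(template, str):
        return TEMPLATE_PLACEHOLDER.sub(
            lambda m: str(params.get(m.group(1), m.group(0))), template)
    return template

field_map = {"${{text_embedding.field_map.input}}": "${{text_embedding.field_map.output}}"}
params = {"text_embedding.field_map.input": "description",
          "text_embedding.field_map.output": "description_embedding"}
print(fill_placeholders(field_map, params))  # {'description': 'description_embedding'}
```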

In [None]:
# Create the path to the Flow Framework workflow creation API
path = '_plugins/_flow_framework/workflow'
url = f'https://{aos_host}/{path}'

# Most API calls from here on use the application/json content type.
headers = {"Content-Type": "application/json"}


# Initialize variables to hold the connector, model, and workflow IDs.
connector_id = ""
model_id = ""
workflow_id = ""

payload = {
    "name": "deploy-bedrock-chat-model",
    "description": "Deploys a Wine sommelier RAG chatbot",
    "use_case": "my_rag_chat",
    "version": {
        "template": "1.0.0",
        "compatibility": [
            "2.12.0",
            "3.0.0"
        ]
    },
    "workflows": {
        "provision": {
            "nodes": [
                     {
                      # This node creates a Bedrock connector for the Anthropic Claude 3 Sonnet model.
                      "id": "create_bedrock_connector",
                      "type": "create_connector",
                      "user_inputs": {                          
                        "name": "Amazon Bedrock Connector: Claude Sonnet 3",
                        "version": "1",
                        "protocol": "aws_sigv4",
                        "description": "The connector to bedrock Claude Sonnet model",
                        "actions": [
                          {
                            "headers": {
                              "content-type": "application/json"
                            },
                            "method": "POST",
                            "request_body": "{\"system\": \"You are a sommelier that uses their vast knowledge of wine to make great recommendations people will enjoy.\", \"messages\":[{\"role\": \"user\", \"content\":[{\"type\":\"text\", \"text\":\"${parameters.prompt}. Customer question: ${parameters.question}\"}]}], \"max_tokens\":${parameters.max_tokens_to_sample}, \"temperature\":${parameters.temperature},  \"anthropic_version\":\"${parameters.anthropic_version}\" }",                    
                            "action_type": "predict",
                            "url": f"https://bedrock-runtime.{region}.amazonaws.com/model/anthropic.claude-3-sonnet-20240229-v1:0/invoke",
                        }
                        ],
                        "credential": {
                            "roleArn": f"{bedrock_inf_iam_role_arn}" 
                         },
                        "parameters": {
                          "endpoint": f"bedrock-runtime.{region}.amazonaws.com",
                          "content_type": "application/json",
                          "auth": "Sig_V4",
                          "max_tokens_to_sample": "4096",  # Claude 3 Sonnet generates at most 4,096 output tokens
                          "service_name": "bedrock",
                          "temperature": "0.0001",
                          "response_filter": "$.content[0].text",
                          "region": f"{region}",
                          "anthropic_version": "bedrock-2023-05-31"
                        }
                      }
                },
                {
                    # This node creates a connector for the Amazon Titan Text Embeddings V2 model on Amazon Bedrock
                    "id": "create_embedding_connector",
                    "type": "create_connector",
                    "user_inputs": {
                        "name": "${{create_embedding_connector.name}}",
                        "description": "${{create_embedding_connector.description}}",
                        "version": "1",
                        "protocol": "aws_sigv4",
                        "credential": {
                            "roleArn" : f"{bedrock_inf_iam_role_arn}"
                        },
                        "parameters": {
                            "service_name": "bedrock",
                            "model": "amazon.titan-embed-text-v2:0",
                            "region": f"{region}",
                            "anthropic_version": "bedrock-2023-05-31",
                        },
                        "actions": [
                              {
                                "action_type": "PREDICT",
                                "method": "POST",
                                "url": f"https://bedrock-runtime.{region}.amazonaws.com/model/amazon.titan-embed-text-v2:0/invoke",
                                "headers": {
                                  "content-type": "application/json"
                                },
                                "request_body": "{ \"inputText\": \"${parameters.inputText}\" }",
                                # Built-in OpenSearch functions that pre-/post-process Bedrock embedding requests and responses
                                "pre_process_function": "connector.pre_process.bedrock.embedding",
                                "post_process_function": "connector.post_process.bedrock.embedding"
                              }
                        ]
                    }
                },

                {
                    # This node registers and deploys the Anthropic Claude 3 Sonnet model.
                    "id": "register_bedrock_model",
                    "type": "register_remote_model",
                    
                    "previous_node_inputs": {
                        "create_bedrock_connector": "connector_id"
                    },
                    "user_inputs": {
                        "name": "anthropic.claude-v3",
                        "function_name": "remote",
                        "description": "bedrock-chat-model",
                        "deploy": True
                    }
                },
                {
                    # This node registers and deploys the Amazon Titan Text Embeddings V2 model.
                    "id": "register_bedrock_embedding_model",
                    "type": "register_remote_model",
                    "previous_node_inputs": {
                        "create_embedding_connector": "connector_id"
                    },
                    "user_inputs": {
                        "name": "Bedrock embedding model v2",
                        "description": "Bedrock embedding model v2",
                        "function_name": "remote",
                        "deploy": True
                    }
                },
                {
                    # This node creates an ingest pipeline that calls the Bedrock embedding model
                    # to generate embeddings before documents are written to OpenSearch
                    
                    "id": "create_ingest_pipeline",
                    "type": "create_ingest_pipeline",
                    "previous_node_inputs": {
                        "register_bedrock_embedding_model": "model_id"
                    },
                    "user_inputs": {
                        "pipeline_id": "${{create_ingest_pipeline.pipeline_id}}",
                        "configurations": {
                            "description": "A neural ingest pipeline",
                            "processors": [
                                {
                                    "text_embedding": {
                                      "model_id": "${{register_bedrock_embedding_model.model_id}}",
                                      "field_map": {
                                        "${{text_embedding.field_map.input}}": "${{text_embedding.field_map.output}}"
                                      }
                                    }
                                }
                            ]
                        }
                    }
                },
                {
                    # This node creates an index to store wine reviews and their embeddings
                    "id": "create_index",
                    "type": "create_index",
                    "previous_node_inputs": {
                        "create_ingest_pipeline": "pipeline_id"
                    },
                    "user_inputs": {
                        "index_name": "wine_knowledge_base2",
                        "configurations": {
                            "settings": {
                                "index": {
                                    "default_pipeline": "${{create_ingest_pipeline.pipeline_id}}",
                                    "knn": "true"
                                }
                            },
                            "mappings": {
                                "properties": {
                                  "${{text_embedding.field_map.input}}": {
                                    "type": "text"
                                  },
                                  "${{text_embedding.field_map.output}}": {
                                    "type": "knn_vector",
                                    "method": {
                                        "engine": "lucene",
                                        "space_type": "l2",
                                        "name": "hnsw",
                                        "parameters": {}
                                    },
                                    "dimension": "1024"
                                  }
                                }
                            }
                        }
                    }
                },
                {
                    # This node creates a RAG tool that first uses the embedding model to
                    # generate an embedding for the user's question, then searches OpenSearch
                    # for semantically similar reviews, and finally passes the results together
                    # with a prompt to the Anthropic Claude 3 model to generate an answer.
                    "id": "rag_tool",
                    "type": "create_tool",
                     "previous_node_inputs": {
                        "register_bedrock_model": "model_id",
                        "register_bedrock_embedding_model": "model_id"
                    },
                    "user_inputs": {
                        "type": "RAGTool",
                        "name": "RAGTool",
                        "parameters": {
                            "inference_model_id": "${{register_bedrock_model.model_id}}",
                            "embedding_model_id": "${{register_bedrock_embedding_model.model_id}}",
                            "index": "${{create_index.name}}",
                            "embedding_field": "${{text_embedding.field_map.output}}",
                            "source_field": "[\"${{text_embedding.field_map.input}}\",\"winery\", \"points\",\"designation\",\"country\",\"wine_name\"]",
                            "query_type": "neural",
                            "input": "${parameters.question}",
                            "prompt": "${{rag_tool.parameters.prompt}}",
                            "include_output_in_agent_response": True
                        }
                    }
                },
                {
                    # To run the RAG tool we need an agent. This node creates an agent
                    # configured with the RAG tool as the tool to execute.
                    "id": "root_agent",
                    "type": "register_agent",
                    "previous_node_inputs": {
                        "rag_tool": "tools"
                    },
                    "user_inputs": {
                        "parameters": {
                            "prompt": "${{root_agent.parameters.parameters}}"
                        },
                        "app_type": "chatbot",
                        "name": "Root agent",
                        "description": "this is the root agent",
                        "tools_order": [
                            "rag_tool"
                        ],
                        "memory": {
                            "type": "conversation_index"
                        },
                        "type": "flow"
                    }
                }
            ]
        }
    }
}


r = requests.post(url, auth=awsauth, json=payload, headers=headers)

# Print the status code
print(f"Status code {r.status_code}")

# If the call succeeded, capture the workflow ID for provisioning in the next step
if r.status_code in (200, 201):
    print(r.text)
    workflow_id = r.json()["workflow_id"]
else:
    print(f"Failed: {r.text}")

### Provision the workflow
The following cell builds the prompt for the LLM and passes it as a provisioning parameter. The prompt contains the placeholder `${parameters.output_field:-}`, into which the RAG tool inserts the semantic search results, so that the LLM answers from the information returned by the search rather than from its pre-trained knowledge. The cell also deletes any existing `wine_knowledge_base2` index before provisioning.
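In `${parameters.output_field:-}`, the `:-` separates the parameter name from a default value, which here is empty, so the prompt still renders when no search results are supplied. We believe ML Commons fills these placeholders with Apache Commons Text's `StringSubstitutor`; the regex-based helper below is hypothetical, not the library's code, and only approximates that behavior to show the effect.

```python
import re

# ${parameters.NAME} or ${parameters.NAME:-DEFAULT}; ":-" introduces a default value.
PARAMETER_PLACEHOLDER = re.compile(r"\$\{parameters\.([\w.]+)(?::-(.*?))?\}")

def render_prompt(prompt, parameters):
    """Approximate how a ${parameters...} prompt is filled at run time."""
    def substitute(match):
        name, default = match.group(1), match.group(2)
        if name in parameters:
            return str(parameters[name])
        # With ":-" the (possibly empty) default is used; otherwise keep the placeholder.
        return default if default is not None else match.group(0)
    return PARAMETER_PLACEHOLDER.sub(substitute, prompt)

template = "<Wine data>: ${parameters.output_field:-} \nQuestion: ${parameters.question}"
print(render_prompt(template, {"question": "Best wine with turkey?"}))
print(render_prompt(template, {"question": "Best wine with turkey?",
                               "output_field": "[{'winery': 'Darioush', 'points': 92}]"}))
```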

In [None]:
# Create the path to the workflow provisioning API
path = f'_plugins/_flow_framework/workflow/{workflow_id}/_provision'
url = f'https://{aos_host}/{path}'
print(url)


# Delete any existing wine_knowledge_base2 index; the workflow creates it during provisioning.
index_name = "wine_knowledge_base2"

if aos_client.indices.exists(index=index_name):
    aos_client.indices.delete(index=index_name)
    print(f"Index {index_name} deleted from the cluster successfully.")
else:
    print(f"Index '{index_name}' not found. The workflow will create it during provisioning.")
    
    

# Create an example that shows the model how to answer the user's question using search results.

one_shot_description_example = "_id: xB2GTpABZyO2331xxdm \n _source: {'description': 'This perfumey white dances in intense and creamy layers of stone fruit and vanilla, remaining vibrant and balanced from start to finish. The generous fruit is grown in the relatively cooler Oak Knoll section of the Napa Valley. This should develop further over time and in the glass.', 'winery': 'Darioush', 'points': 92, 'designation': None, 'country': 'US'}"
one_shot_response_example = "I have a wonderful wine for you. It's a white wine from Darioush winery in the Oak Knoll section of Napa Valley, US, with intense, creamy layers of stone fruit and vanilla. It scored 92 points in Wine Enthusiast."

# Prompt for the RAG tool. It references ${parameters.output_field:-}, which is where
# the search results are inserted.


user_prompt = (
    "[Meta instructions]"
    "You must pick the one wine in the \"Wine data\" section that best matches the user question. The Wine data section is in JSON format. "
    "For debugging your results, we need you to output all wine data after your recommendation, in a human-readable record format.\n"
    "[Instructions]"
    "As a sommelier, you must include the wine variety, the country of origin, and a colorful description relating to the user question. Do not suggest anything outside of the wine data provided. You don't necessarily have to pick the top-rated wine if it does not best match the user question, but you must select from within the wine data only. If the wine data is empty, respond by saying: Sorry, I do not have enough information.\n"
    f"Example Wine data: {one_shot_description_example} \n "
    f"Example recommendation: {one_shot_response_example} \n"
    "<Wine data>: ${parameters.output_field:-} \n"
)

# Provisioning parameters that fill the ${{...}} placeholders referenced in the
# workflow template created above

default_params = {
    "create_ingest_pipeline.pipeline_id": "wine-ingest-pipeline2",
    "text_embedding.field_map.input": "description",
    "text_embedding.field_map.output": "description_embedding",
    "create_index.name": "wine_knowledge_base2",
    "rag_tool.parameters.prompt": user_prompt,
    "root_agent.parameters.parameters": "root_agent.parameters.parameters",
}

# Print the default parameters
print(json.dumps(default_params, indent=4))

# Call the API to provision the workflow.
r = requests.post(url, auth=awsauth, json=default_params, headers=headers)

# Print the status of the API call.
print(f"Status: {r.status_code}. Response:{r.text}")


### Check the status of workflow provisioning
The API call above only starts provisioning, which runs asynchronously. Use the `_status` API to check whether all connectors, models, and other resources were created successfully.
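Each entry in `resources_created` pairs a `workflow_step_id` from the template with the `resource_id` it produced, and the cell below picks out four of them. A small helper (hypothetical name) can turn the whole list into a dictionary, assuming the response shape used in the cell below. If a step reports more than one resource, the last one listed wins, which matches how the loop below overwrites its variables.

```python
def resources_by_step(status_response):
    """Map workflow_step_id -> resource_id for a COMPLETED _status response."""
    state = status_response.get("state")
    if state != "COMPLETED":
        raise RuntimeError(f"Workflow is not complete yet (state: {state})")
    return {resource["workflow_step_id"]: resource["resource_id"]
            for resource in status_response.get("resources_created", [])}

# Offline example shaped like the _status response parsed in the cell below
example_status = {
    "state": "COMPLETED",
    "resources_created": [
        {"workflow_step_id": "root_agent", "resource_id": "example-agent-id"},
        {"workflow_step_id": "create_index", "resource_id": "wine_knowledge_base2"},
    ],
}
ids = resources_by_step(example_status)
print(ids.get("root_agent"), ids.get("create_index"))
```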


In [None]:
# Put together URL to _status API of flow framework
path = f'_plugins/_flow_framework/workflow/{workflow_id}/_status'
url = f'https://{aos_host}/{path}'
print(url)

# We will capture various model ids so that we can test them individually if we wanted to.
inference_model_id=""
embedding_model_id=""
agent_id=""
index_name = ""

# Call the _status API (a GET request, so no request body is needed)
r = requests.get(url, auth=awsauth, headers=headers)

print(f"Status: {r.status_code}.")

# if _status API call returns successfully, we should extract the data we need.
if r.status_code == 200: 
    status = json.loads(r.text)["state"]
    if status == "COMPLETED":
        response_json = json.loads(r.text)
        #print(json.dumps( response_json , indent=4))
        for resources in response_json["resources_created"]:
            if resources["workflow_step_id"] == "register_bedrock_model":
                inference_model_id=resources["resource_id"]
            elif resources["workflow_step_id"] == "register_bedrock_embedding_model":
                embedding_model_id=resources["resource_id"]    
            elif resources["workflow_step_id"] == "root_agent":
                agent_id=resources["resource_id"]  
            elif resources["workflow_step_id"] == "create_index":
                index_name=resources["resource_id"]          
        print(f"inference model: {inference_model_id}")
        print(f"Embedding model: {embedding_model_id}")    
        print(f"Agent id: {agent_id}")    
        print(f"Index name: {index_name}")    
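    else:
        print(f"Workflow state is {status}. If it is still in progress, re-run this cell. Details:\n{r.text}")
else:
    print(f"Status request failed: {r.text}")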

#### Troubleshooting guidance

If the call above returns a FAILED state, check the failure message. You then need to deprovision the workflow: use the last two cells of this notebook to deprovision and delete the workflow, then retry from step 5.

If the failure is caused by a pre-existing resource, you may need to delete the `wine_knowledge_base2` index or the `wine-ingest-pipeline2` ingest pipeline.

## 6. Load the data into the index
Now that we have set up the index and the ingest pipeline together with the models, we are ready to ingest the data. Ingestion automatically uses the `wine-ingest-pipeline2` pipeline and loads the data into `wine_knowledge_base2`.
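The cell below sends a bulk request every 100 documents. A generator-based sketch of the same batching follows, with two hedged additions. It always emits a final partial batch, and it converts float `NaN` values (which pandas may use for missing fields such as `designation`) to `None`, because `NaN` is not valid JSON. The helper names are hypothetical.

```python
import json
import math

def to_json_safe(record):
    """Replace float NaN (pandas' marker for missing values) with None (JSON null)."""
    return {key: (None if isinstance(value, float) and math.isnan(value) else value)
            for key, value in record.items()}

def bulk_batches(documents, index_name, batch_size=100):
    """Yield newline-delimited _bulk bodies of at most batch_size documents,
    including a final partial batch."""
    action = json.dumps({"index": {"_index": index_name}})
    lines = []
    for doc in documents:
        lines.append(action)
        # allow_nan=False raises instead of emitting the non-standard NaN token
        lines.append(json.dumps(to_json_safe(doc), allow_nan=False))
        if len(lines) == 2 * batch_size:
            yield "\n".join(lines) + "\n"  # bulk bodies must end with a newline
            lines = []
    if lines:
        yield "\n".join(lines) + "\n"

docs = [{"description": f"wine {i}", "designation": float("nan")} for i in range(250)]
batches = list(bulk_batches(docs, "wine_knowledge_base2"))
print(len(batches))  # 3 batches: 100 + 100 + 50 documents
```

With the notebook's data, a loop such as `for body in bulk_batches(records, index_name): aos_client.bulk(body=body)` would replace the manual counter, where `records` is a list of the payload dictionaries built in the cell below.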

In [None]:
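cnt = 0
batch = 0
action = json.dumps({ "index": { "_index": index_name } })
body_ = ''


with alive_bar(len(df_sample), force_tty = True) as bar:
    for index, record in (df_sample.iterrows()):

        payload={
           "description": record["description"],
           "points":record["points"],
           "variety":record["variety"],
           "country":record["country"],
           "designation":record["designation"],
           "winery":record["winery"],
           "wine_name":record["title"],
        }
        body_ = body_ + action + "\n" + json.dumps(payload) + "\n"
        cnt = cnt + 1

        # Send a bulk request for every 100 documents
        if cnt == 100:
            response = aos_client.bulk(index=index_name, body=body_)
            if response.get("errors"):
                print(f"Some documents in batch {batch + 1} failed to index.")
            cnt = 0
            batch = batch + 1
            body_ = ''

        bar()

# Send any remaining documents that did not fill a complete batch of 100
if body_:
    response = aos_client.bulk(index=index_name, body=body_)
    if response.get("errors"):
        print(f"Some documents in batch {batch + 1} failed to index.")
    batch = batch + 1

print("Total bulk batches completed: " + str(batch))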

#### Checking the number of records loaded in the index

In [None]:
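# Refresh so recently indexed documents are visible, then count them.
# A count request is not capped at 10,000 like the default search hit total.
aos_client.indices.refresh(index=index_name)
res = aos_client.count(index=index_name)
print("Records found: %d." % res['count'])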

## 7. Call the agent created by the flow framework
We are going to call the root agent, which calls the RAG tool to run the RAG workflow that we just created. The response comes from Anthropic's Claude 3 Sonnet model, which references the search results retrieved from OpenSearch to answer the question.
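The agent's answer sits inside the `_execute` response at `inference_results[0].output[0].result`, the path the next cell uses. The minimal sketch below pulls it out of a raw response body. The helper name `agent_answer` is hypothetical, and the preference for an output named `response` is an assumption for agents that return several named outputs; otherwise it falls back to the first output, as the notebook does.

```python
import json


def agent_answer(response_text: str) -> str:
    """Extract the model's answer from an agent _execute response body."""
    data = json.loads(response_text)
    outputs = data["inference_results"][0]["output"]
    # Assumption: some agents return several named outputs; prefer "response".
    for item in outputs:
        if item.get("name") == "response":
            return item["result"]
    # Otherwise use the first output, matching the notebook's path.
    return outputs[0]["result"]


# Hand-written example body, not real model output.
sample = '{"inference_results": [{"output": [{"name": "response", "result": "Try a Pinot Noir."}]}]}'
print(agent_answer(sample))
```

In the next cell, `agent_answer(r.text)` could replace the inline indexing expression.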

In [None]:
# Ask the root agent a question; it runs the RAG tool and returns the LLM answer.
question = "Best wine that goes with turkey?"
payload = {
    "parameters": {
        "question": question
    }
}

path = '_plugins/_ml/agents/' + agent_id + '/_execute'
url = 'https://' + aos_host + '/' + path
r = requests.post(url, auth=awsauth, json=payload, headers=headers)
if r.status_code == 200:
    # The answer text is in inference_results[0].output[0].result
    print(json.loads(r.text)['inference_results'][0]['output'][0]['result'])
else:
    print(r.text)

## That's it!
By deploying the template, you can see how quickly we were able to put together a complete RAG solution.

Now let's do some validation.

### Check the question with semantic retrieval 
You can run a simple semantic search on the created index to see which records Claude 3 Sonnet may have referred to when answering the question.

In [None]:
def retrieve_opensearch_with_semantic_search(phrase, n=2):
    """Run a neural (semantic) query and return the top n wine records."""
    osquery = {
        # Leave the large embedding vector out of the returned documents
        "_source": {
            "excludes": ["description_embedding"]
        },
        "size": n,
        "query": {
            "neural": {
                "description_embedding": {
                    "query_text": phrase,
                    "model_id": embedding_model_id,
                    # Ask for as many nearest neighbours as results requested
                    "k": n
                }
            }
        }
    }

    res = aos_client.search(index=index_name, body=osquery)
    top_result = res['hits']['hits']

    results = []
    for entry in top_result:
        source = entry['_source']
        result = {
            "description": source['description'],
            "winery": source['winery'],
            "points": source['points'],
            "designation": source['designation'],
            "country": source['country'],
            "variety": source['variety'],
            "_id": entry['_id'],
            "wine_name": source['wine_name']
        }
        results.append(result)

    return results

retrieve_opensearch_with_semantic_search(question, 3)
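To make the comparison concrete, you can check which retrieved wine names appear in the agent's answer. This is a minimal sketch: `wines_mentioned` is a hypothetical helper that does a case-insensitive substring match, so it will miss wines the model paraphrases rather than names exactly.

```python
def wines_mentioned(answer: str, results: list) -> list:
    """Return wine names from the search results that appear in the answer text."""
    answer_lower = answer.lower()
    return [r["wine_name"] for r in results if r["wine_name"].lower() in answer_lower]


# Hand-written example data, not real search results.
hits = [{"wine_name": "Foo Pinot Noir 2015"}, {"wine_name": "Bar Merlot"}]
print(wines_mentioned("I suggest the foo pinot noir 2015 with turkey.", hits))
```

In this notebook, `answer` would be the agent's result string and `results` the list returned by `retrieve_opensearch_with_semantic_search(question, 3)`.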

In [None]:
# You can also test embedding generation directly. This model was deployed by
# the flow framework as part of the RAG workflow.
payload = {
    "parameters": {
        "inputText": question
    }
}

path = '_plugins/_ml/models/' + embedding_model_id + '/_predict'
url = 'https://' + aos_host + '/' + path
r = requests.post(url, auth=awsauth, json=payload, headers=headers)

if r.status_code == 200:
    embedding = json.loads(r.text)['inference_results'][0]['output'][0]['data']
    # Show the first five dimensions of the vector
    print(embedding[:5])
else:
    print(r.text)

In [None]:
# You can also call the embedding model using the ML Commons Python client
# (opensearch-py-ml). This cell and the one above serve the same purpose.
input_sentences = [question]
embedding_output = ml_client.generate_embedding(embedding_model_id, input_sentences)
embed = embedding_output['inference_results'][0]['output'][0]['data']
print(embed[:5])
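Since both cells call the same model with the same text, one way to confirm they agree is to compare the two vectors with cosine similarity, the same kind of measure vector search relies on. This is a minimal sketch in plain Python; `cosine_similarity` is a hypothetical helper, not part of either client library.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors (1.0 means same direction)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0
```

With the notebook's variables, `cosine_similarity(embedding, embed)` is expected to be very close to 1.0 if both calls reached the same model.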

### What did we learn
We learned that we can define a complex workflow in OpenSearch with a JSON configuration. OpenSearch can deploy such a workflow with different default parameters, so you can create multiple RAG use cases from a single template. OpenSearch Service will add more templates in the future to help you quickly develop semantic search, hybrid search, and other generative AI use cases.
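Reusing one template with different parameters can be sketched as building several requests for the Flow Framework Create Workflow API (`POST _plugins/_flow_framework/workflow` with the `use_case` and `provision` query parameters, where the body holds parameter overrides). This is a minimal sketch: the helper name `create_workflow_request`, the use case name `my_rag_use_case`, the host, and the parameter key `create_index.name` are illustrative assumptions, not values taken from this lab, so check the defaults of the use case you deploy for the real keys.

```python
from urllib.parse import urlencode


def create_workflow_request(host, use_case, params, provision=True):
    """Build the URL and JSON body for creating a workflow from a use case template."""
    query = urlencode({"use_case": use_case, "provision": str(provision).lower()})
    url = f"https://{host}/_plugins/_flow_framework/workflow?{query}"
    # The body carries only the parameters that override the template defaults.
    return url, params


# Hypothetical parameter sets: same template, two different index names.
base = {"create_index.name": "wine_knowledge_base2"}
variant = {**base, "create_index.name": "wine_knowledge_base3"}
for p in (base, variant):
    url, body = create_workflow_request("my-domain.example.com", "my_rag_use_case", p)
    print(url, body)
```

Sending each pair with `requests.post(url, auth=awsauth, json=body, headers=headers)` would create one workflow per parameter set.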

## Deprovision a workflow

After you have tested the workflow with several different questions, you may want to deprovision it. Deprovisioning a workflow undeploys all the model connections it originally created. You must deprovision a workflow before you can delete it. The following code shows how to deprovision a workflow.

*Note that any index the workflow created is not deleted: our `wine_knowledge_base2` index remains.*
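Deprovisioning may not complete instantly, so you might want to poll the workflow status before deleting the workflow. This is a minimal sketch: `wait_for_state` is a hypothetical helper that calls a function you supply until it returns one of the target states. The assumption that a deprovisioned workflow reports `NOT_STARTED` should be checked against your cluster's status response.

```python
import time


def wait_for_state(get_state, targets, timeout_s=60.0, interval_s=2.0):
    """Poll get_state() until it returns a state in targets or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state in targets:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"last state was {state!r}")
        time.sleep(interval_s)
```

For example, `get_state` could be `lambda: requests.get(status_url, auth=awsauth).json().get("state")`, where `status_url` is the workflow's `_status` URL and `targets` is `{"NOT_STARTED"}` (assumed).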

In [None]:
# Build the URL for the deprovisioning API
path = f'_plugins/_flow_framework/workflow/{workflow_id}/_deprovision'
url = f'https://{aos_host}/{path}'
print(url)

headers = {"Content-Type": "application/json"}

# The deprovision API does not take a request body
r = requests.post(url, auth=awsauth, headers=headers)
print(r.status_code)
print(r.text)

## Delete a workflow
Deprovisioning a workflow leaves its template behind. To remove the template, you need to delete the workflow. The template does not occupy much space, but deleting it completes the cleanup.

In [None]:
# Build the URL that we will use with the DELETE HTTP method
path = f'_plugins/_flow_framework/workflow/{workflow_id}'
url = f'https://{aos_host}/{path}'
print(url)

# The delete API does not take a request body
r = requests.delete(url, auth=awsauth, headers=headers)
print(r.status_code)
print(r.text)

### Lab finished
You can now return to the lab instructions.
