# OpenSearch + DeepSeek 实现 RAG 知识库

# 传统的 RAG 方案

![image-tradition](RAG-Tradition.png)

<br>
<br>
<br>
<br>
<br>

# OpenSearch 实现 RAG 知识库方案架构

![arch](RAG-OpenSearch.png)

# 操作步骤

1. 环境准备
2. 设置相关权限（IAM Role/IAM Policy/OpenSearch Security）
3. 在 OpenSearch 创建 ML Connector(DeepSeek Connector/Embedding Connector)
4. 在 OpenSearch 创建 Model（DeepSeek Model/Embedding Model），并从 OpenSearch 测试 DeepSeek Model
5. 导入知识库
6. 在 OpenSearch 创建 Search Pipeline
7. 测试问答

# 1. 环境准备

## 1.1 前提条件

* 预先在 SageMaker 中部署好两个模型（endpoint）：1. DeepSeek；2. BGE Embedding
* 创建 OpenSearch 集群

## 1.2 初始化

In [None]:
# 安装依赖
!pip install opensearch-py
!pip install requests-aws4auth

In [None]:
import boto3
import json
from botocore.exceptions import ClientError

def get_opensearch_credentials(secret_name, region_name="cn-north-1"):
    """
    Fetch the OpenSearch credentials stored in AWS Secrets Manager.

    Args:
        secret_name (str): Name (or ARN) of the Secrets Manager secret that
            holds the credentials.
        region_name (str): AWS region of the secret. Defaults to "cn-north-1".

    Returns:
        dict: The secret value parsed as JSON. Callers in this notebook read
        the "password" key from it, so the secret should be a JSON object.

    Raises:
        Exception: With a readable message for the well-known Secrets Manager
            error codes; the original ClientError is chained as ``__cause__``.
        botocore.exceptions.ClientError: For any other Secrets Manager error.
    """
    # base64 is only needed for binary secrets. Importing it here fixes the
    # NameError the binary branch used to raise (base64 was never imported).
    import base64

    # Readable messages for the Secrets Manager error codes we translate.
    error_messages = {
        'DecryptionFailureException': "无法解密密钥值",
        'InternalServiceErrorException': "内部服务错误",
        'InvalidParameterException': "参数无效",
        'InvalidRequestException': "请求无效",
        'ResourceNotFoundException': f"找不到指定的密钥: {secret_name}",
    }

    # Create a Secrets Manager client for the requested region.
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code in error_messages:
            raise Exception(error_messages[error_code]) from e
        raise

    # The secret is stored either as a string or as binary data.
    if 'SecretString' in get_secret_value_response:
        return json.loads(get_secret_value_response['SecretString'])
    decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
    return json.loads(decoded_binary_secret)

## 1.3 设置环境变量

In [None]:
# Set the environment variables shared by all later cells.
import os

os.environ["ACCOUNT_ID"] = "245788323638"

# Name of the DeepSeek endpoint deployed in SageMaker
os.environ["LLM_MODEL_NAME"] = "DeepSeek-R1-Distill-Qwen-7B-g5-endpoint"

# Name of the embedding-model endpoint deployed in SageMaker
os.environ["EMBEDDING_MODEL_NAME"] = "bge-m3-2025-03-30-12-58-53-929-auto-endpoint"

# Deployment region; cn-north-1 is the Beijing (BJS) region. The ARNs and URLs
# below use the aws-cn partition and the .amazonaws.com.cn domain, so this must
# be an AWS China region.
os.environ["DEEPSEEK_AWS_REGION"] = "cn-north-1"

# OpenSearch access configuration. The admin password comes from the
# Secrets Manager secret "opensearch-credentials" (JSON key "password").
os.environ["OPENSEARCH_SERVICE_DOMAIN_ENDPOINT"] = "https://search-deepseek-demo-ajierlgzomqjnbpqggl3qzqqky.cn-north-1.es.amazonaws.com.cn"
os.environ["OPENSEARCH_SERVICE_ADMIN_USER"] = "admin"
_opensearch_admin_password = get_opensearch_credentials("opensearch-credentials", os.environ["DEEPSEEK_AWS_REGION"]).get("password")
if _opensearch_admin_password is None:
    # Fail with a clear message instead of os.environ's "str expected" TypeError.
    raise KeyError('Secret "opensearch-credentials" does not contain a "password" value')
os.environ["OPENSEARCH_SERVICE_ADMIN_PASSWORD"] = _opensearch_admin_password

# Model endpoint ARNs / invocation URLs and the OpenSearch domain ARN
os.environ["SAGEMAKER_LLM_ENDPOINT_INFERENCE_ARN"] = f"arn:aws-cn:sagemaker:{os.environ['DEEPSEEK_AWS_REGION']}:{os.environ['ACCOUNT_ID']}:endpoint/{os.environ['LLM_MODEL_NAME']}"
os.environ["SAGEMAKER_EMBEDDING_MODEL_ENDPOINT_INFERENCE_ARN"] = f"arn:aws-cn:sagemaker:{os.environ['DEEPSEEK_AWS_REGION']}:{os.environ['ACCOUNT_ID']}:endpoint/{os.environ['EMBEDDING_MODEL_NAME']}"
os.environ["SAGEMAKER_MODEL_INFERENCE_ENDPOINT"] = f"https://runtime.sagemaker.{os.environ['DEEPSEEK_AWS_REGION']}.amazonaws.com.cn/endpoints/{os.environ['LLM_MODEL_NAME']}/invocations"
os.environ["OPENSEARCH_SERVICE_DOMAIN_ARN"] = f"arn:aws-cn:es:{os.environ['DEEPSEEK_AWS_REGION']}:{os.environ['ACCOUNT_ID']}:domain/deepseek-demo"
os.environ["SAGEMAKER_EMBEDDING_MODEL_INFERENCE_ENDPOINT"] = f"https://runtime.sagemaker.{os.environ['DEEPSEEK_AWS_REGION']}.amazonaws.com.cn/endpoints/{os.environ['EMBEDDING_MODEL_NAME']}/invocations"

# Print each derived value under its own name. Previously the embedding ARN
# label printed the LLM ARN.
for _env_name in (
    "SAGEMAKER_LLM_ENDPOINT_INFERENCE_ARN",
    "SAGEMAKER_EMBEDDING_MODEL_ENDPOINT_INFERENCE_ARN",
    "SAGEMAKER_MODEL_INFERENCE_ENDPOINT",
    "OPENSEARCH_SERVICE_DOMAIN_ARN",
    "SAGEMAKER_EMBEDDING_MODEL_INFERENCE_ENDPOINT",
):
    print(_env_name + ':' + os.environ[_env_name])

# 2.设置相关权限（IAM Role/IAM Policy/OpenSearch Security）

## 2.1 创建 IAM 角色 invoke_deepseek_role

创建角色 invoke_deepseek_role，供 OpenSearch ML Connector 远程调用 SageMaker endpoint 时使用

In [None]:
import boto3
import json
import os


# The script creates a role and a policy with the names below. It reads the
# ARNs of the two SageMaker endpoints from the environment.
invoke_deepseek_policy_name = 'invoke_deepseek_policy'
invoke_deepseek_role_name = 'invoke_deepseek_role'
sagemaker_llm_model_inference_endpoint = os.environ['SAGEMAKER_LLM_ENDPOINT_INFERENCE_ARN']
sagemaker_embedding_model_inference_endpoint = os.environ['SAGEMAKER_EMBEDDING_MODEL_ENDPOINT_INFERENCE_ARN']


# Permission policy: allow sagemaker:InvokeEndpoint on exactly the LLM and
# embedding endpoints.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:InvokeEndpoint"
            ],
            "Resource": [
                sagemaker_llm_model_inference_endpoint,
                sagemaker_embedding_model_inference_endpoint
            ]
        }
    ]
}


# Trust policy: allow OpenSearch Service to assume the role. Together, the
# role and the policy let OpenSearch call SageMaker to invoke DeepSeek and
# the embedding model.
trust_relationship = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "es.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}


iam = boto3.client('iam')
sts = boto3.client('sts')

# Refuse to overwrite an existing policy with the same name. The policy ARN is
# built for the current account in the aws-cn partition.
account_id = sts.get_caller_identity()['Account']
policy_arn = f'arn:aws-cn:iam::{account_id}:policy/{invoke_deepseek_policy_name}'
existing_policy = None
try:
    existing_policy = iam.get_policy(PolicyArn=policy_arn)['Policy']
except iam.exceptions.NoSuchEntityException:
    pass  # Expected: the policy does not exist yet.
if existing_policy:
    raise Exception(f"Policy {invoke_deepseek_policy_name} already exists. Please set another policy name")

# Refuse to overwrite an existing role with the same name.
existing_role = None
try:
    existing_role = iam.get_role(RoleName=invoke_deepseek_role_name)
except iam.exceptions.NoSuchEntityException:
    pass  # Expected: the role does not exist yet.
if existing_role:
    raise Exception(f"Role {invoke_deepseek_role_name} already exists. Please set another role name")


# Create the permission policy. The response is kept in its own variable so
# the policy document above is not overwritten.
create_policy_response = iam.create_policy(
    PolicyName=invoke_deepseek_policy_name,
    PolicyDocument=json.dumps(policy)
)
policy_arn = create_policy_response['Policy']['Arn']

# Create the role with the trust relationship, then attach the policy.
create_role_response = iam.create_role(
    RoleName=invoke_deepseek_role_name,
    AssumeRolePolicyDocument=json.dumps(trust_relationship)
)
role_arn = create_role_response['Role']['Arn']
iam.attach_role_policy(
    RoleName=invoke_deepseek_role_name,
    PolicyArn=policy_arn
)

print(f'Created policy {policy_arn}')
print(f'Created role {role_arn}')

# Share the role ARN with later cells (no manual step needed).
os.environ["INVOKE_DEEPSEEK_ROLE"] = role_arn
print(f'\nSet os.environ["INVOKE_DEEPSEEK_ROLE"] = "{role_arn}"\n')

## 2.2 创建 IAM 角色 create_deepseek_connector_role

创建一个用于操作 OpenSearch 的角色，后续将使用该角色向 OpenSearch 发送创建/删除/导入数据等操作

In [None]:
# Copyright 2025 Norris
# MIT-0
#
'''
This module constructs an IAM role and policy that enables your account
to create an OpenSearch connector in your Amazon OpenSearch Service domain.
'''

import boto3
import json
import os


# This script creates a role and a policy document with the following names.
create_connector_policy_name = 'create_deepseek_connector_policy'
create_connector_role_name = 'create_deepseek_connector_role'

# Read the invoke-role ARN and the domain ARN from the environment.
invoke_connector_role_arn = os.environ['INVOKE_DEEPSEEK_ROLE']
opensearch_service_domain_arn = os.environ['OPENSEARCH_SERVICE_DOMAIN_ARN']


def get_current_role_arn():
    """
    Return an IAM ARN for the identity this notebook is running as.

    STS reports an ``assumed-role`` session ARN when the notebook runs under a
    role (e.g. a SageMaker execution role), and that ARN cannot be used as a
    trust-policy principal. In that case the role name is extracted and the
    role's real IAM ARN, including its path, is looked up with ``iam:GetRole``.
    If the lookup fails, the function falls back to assuming the
    ``service-role/`` path. The partition is taken from the STS ARN.

    Returns:
        str | None: The ARN, or None if the caller identity could not be
        determined (the error is printed).
    """
    sts_client = boto3.client('sts')
    try:
        response = sts_client.get_caller_identity()
        arn = response['Arn']

        if ':assumed-role/' not in arn:
            # Non-session identity (e.g. an IAM user): usable as-is.
            return arn

        # Session ARN shape: arn:<partition>:sts::<account>:assumed-role/<role>/<session>
        role_name = arn.split(':assumed-role/')[1].split('/')[0]
        partition = arn.split(':')[1]
        try:
            return boto3.client('iam').get_role(RoleName=role_name)['Role']['Arn']
        except Exception as lookup_error:
            print(f"Could not look up role {role_name}, assuming the service-role/ path: {lookup_error}")
            account_id = response['Account']
            return f"arn:{partition}:iam::{account_id}:role/service-role/{role_name}"
    except Exception as e:
        print(f"获取当前角色 ARN 时出错: {e}")
        return None


# This identity becomes the only principal allowed to assume the new role.
current_role_arn = get_current_role_arn()
if current_role_arn is None:
    # Without a principal the trust policy would be invalid; stop before
    # creating any IAM resources.
    raise RuntimeError("Could not determine the caller ARN; cannot build the trust policy for create_deepseek_connector_role")


# This policy allows POST operations on the OpenSearch Service domain. It also
# allows passing the invoke role, so OpenSearch can validate the connector.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": invoke_connector_role_arn
        },
        {
            "Effect": "Allow",
            "Action": "es:ESHttpPost",
            "Resource": opensearch_service_domain_arn
        }
    ]
}


# Trust policy: the identity running this notebook may assume the role and
# use it to sign the create_connector API calls.
trust_relationship = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": current_role_arn
            },
            "Action": "sts:AssumeRole"
        }
    ]
}


iam = boto3.client('iam')
sts = boto3.client('sts')


# Make sure neither the policy nor the role already exists. If either check
# fails, change the corresponding name at the top of this cell.
account_id = sts.get_caller_identity()['Account']
policy_arn = f'arn:aws-cn:iam::{account_id}:policy/{create_connector_policy_name}'
existing_policy = None
try:
    existing_policy = iam.get_policy(PolicyArn=policy_arn)['Policy']
except iam.exceptions.NoSuchEntityException:
    pass  # Expected: the policy does not exist yet.
if existing_policy:
    raise Exception(f"Policy {create_connector_policy_name} already exists. Please set another policy name")

existing_role = None
try:
    existing_role = iam.get_role(RoleName=create_connector_role_name)
except iam.exceptions.NoSuchEntityException:
    pass  # Expected: the role does not exist yet.
if existing_role:
    raise Exception(f"Role {create_connector_role_name} already exists. Please set another role name")


# Create the policy, then the role, then attach the policy to the role.
create_policy_response = iam.create_policy(
    PolicyName=create_connector_policy_name,
    PolicyDocument=json.dumps(policy)
)
policy_arn = create_policy_response['Policy']['Arn']

create_role_response = iam.create_role(
    RoleName=create_connector_role_name,
    AssumeRolePolicyDocument=json.dumps(trust_relationship)
)
role_arn = create_role_response['Role']['Arn']
iam.attach_role_policy(
    RoleName=create_connector_role_name,
    PolicyArn=policy_arn
)

print(f'Created policy {policy_arn}')
print(f'Created role {role_arn}')

# NOTE: IAM is eventually consistent. If the assume_role call in step 3
# fails with AccessDenied right after this cell, wait a moment and retry.
os.environ["CREATE_DEEPSEEK_CONNECTOR_ROLE"] = role_arn

## 2.3 配置 OpenSearch 中的安全策略

将 create_deepseek_connector_role 添加到 OpenSearch 的安全策略中

In [None]:
import boto3
from opensearchpy import OpenSearch
import os


# Settings for this cell, taken from the environment prepared earlier.
opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
create_deepseek_connector_role = os.environ['CREATE_DEEPSEEK_CONNECTOR_ROLE']
lambda_invoke_ml_commons_role_name = 'LambdaInvokeOpenSearchMLCommonsRole'
opensearch_port = 443

# opensearch-py expects a bare host name. Strip a single leading "https://"
# and a single trailing "/" from the configured endpoint.
https_prefix = 'https://'
if opensearch_service_api_endpoint.startswith(https_prefix):
    opensearch_service_api_endpoint = opensearch_service_api_endpoint[len(https_prefix):]
if opensearch_service_api_endpoint[-1:] == '/':
    opensearch_service_api_endpoint = opensearch_service_api_endpoint[:-1]

# Backend roles to map onto ml_full_access. Fine-grained access control
# matches SigV4-signed requests from these IAM roles to that OpenSearch role.
sts = boto3.client('sts')
account_id = sts.get_caller_identity()['Account']
lambda_invoke_ml_commons_role_arn = f'arn:aws-cn:iam::{account_id}:role/{lambda_invoke_ml_commons_role_name}'
role_mapping = {
    "backend_roles": [
        create_deepseek_connector_role,
        lambda_invoke_ml_commons_role_arn,
    ]
}

# Basic-auth client using the admin user.
hosts = [{"host": opensearch_service_api_endpoint, "port": opensearch_port}]
client = OpenSearch(
    hosts=hosts,
    http_auth=(opensearch_user_name, opensearch_user_password),
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)

# Call the security REST API directly through the transport layer.
# NOTE(review): PUT on rolesmapping creates *or replaces* the mapping, so any
# users/backend roles already mapped to ml_full_access are dropped. Confirm
# this is intended, or switch to PATCH.
ml_full_access_mapping_path = '/_plugins/_security/api/rolesmapping/ml_full_access'
client.transport.perform_request('PUT', ml_full_access_mapping_path, body=role_mapping)

# Read the mapping back to show the result.
response = client.transport.perform_request('GET', ml_full_access_mapping_path)
print(f'ml_full_access role mapping is now {response}')



#### 更新 trusted_connector_endpoints_regex 的正则表达式
需要在 **OpenSearch Dev Tools** 中执行以下命令，重新设置 ml_connector URL 的正则表达式规则，以支持亚马逊云科技中国区的 URL。
```
PUT _cluster/settings
{
  "persistent": {
    "plugins.ml_commons.trusted_connector_endpoints_regex": [
      "https://runtime\\.sagemaker\\.cn-north-1\\.amazonaws\\.com\\.cn/endpoints/.*"
    ]
  }
}
```

In [None]:
# Copyright 2025 Norris
# MIT-0
#
# 更新 trusted_connector_endpoints_regex 的正则表达式

import os
import requests


opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
region = os.environ['DEEPSEEK_AWS_REGION']

# Set up user/password auth
userauth = (opensearch_user_name, opensearch_user_password)
headers = {"Content-Type": "application/json"}

########################################################################################
# Allow ML connectors to call SageMaker runtime endpoints in the configured
# China region. The regex is built from `region` instead of hard-coding
# cn-north-1, so it always matches the endpoint URLs built in step 1.3.
# For cn-north-1 it is identical to the previous hard-coded value.
sagemaker_endpoint_regex = f"https://runtime\\.sagemaker\\.{region}\\.amazonaws\\.com\\.cn/endpoints/.*"

path = '/_cluster/settings'
url = opensearch_service_api_endpoint + path
payload = {
    "persistent": {
        "plugins.ml_commons.trusted_connector_endpoints_regex": [
            sagemaker_endpoint_regex
        ]
    }
}
r = requests.put(url, auth=userauth, json=payload, headers=headers)

response = r.json()
print(f'reponse: {response}')
# Stop here if the settings update was rejected; connector creation would fail later.
r.raise_for_status()

# 3. 创建 ML Connector

## 3.1 创建 DeepSeek Model Connector

In [None]:
import boto3
import json
import os
import requests
from requests_aws4auth import AWS4Auth

# Settings for this cell, taken from the environment.
opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
region = os.environ['DEEPSEEK_AWS_REGION']
invoke_role_arn = os.environ['INVOKE_DEEPSEEK_ROLE']
create_deepseek_connector_role_arn = os.environ['CREATE_DEEPSEEK_CONNECTOR_ROLE']
sagemaker_endpoint_url = os.environ['SAGEMAKER_MODEL_INFERENCE_ENDPOINT']

# The create-connector request is SigV4-signed for the "es" service using
# temporary credentials of create_deepseek_connector_role.
sts_client = boto3.client('sts')
assume_role_result = sts_client.assume_role(
    RoleArn=create_deepseek_connector_role_arn,
    RoleSessionName='create_connector_session',
)
credentials = assume_role_result['Credentials']
awsauth = AWS4Auth(
    credentials['AccessKeyId'],
    credentials['SecretAccessKey'],
    region,
    'es',
    session_token=credentials['SessionToken'],
)


# Prepare the API call parameters.
path = '/_plugins/_ml/connectors/_create'
url = opensearch_service_api_endpoint + path
# See the documentation
# https://opensearch.org/docs/latest/ml-commons-plugin/remote-models/blueprints/ for
# details on the connector payload, and additional blueprints for other models.
#
# Connector definition:
#  - "protocol": "aws_sigv4" with "credential.roleArn" makes OpenSearch sign the
#    SageMaker calls with invoke_role_arn.
#  - "parameters" provide values for the ${parameters.*} placeholders in
#    "request_body". "inputs" has no default here; the caller supplies it
#    (e.g. the _predict call in step 4.3).
#  - NOTE(review): "do_sample" is defined but not referenced by request_body,
#    so it is presumably never sent to the endpoint. Confirm whether intended.
#  - NOTE(review): ${parameters.inputs} is placed inside a JSON string literal;
#    if the input contains unescaped quotes or newlines the body may not be
#    valid JSON. Verify how ML Commons escapes parameters.
#  - "post_process_function" is a Painless script. It reads choices[0].text from
#    the endpoint response and returns {"name": "response", "dataAsMap":
#    {"completion": ...}}. Step 4.3 reads dataAsMap from the prediction output.
payload = {
  "name": "DeepSeek R1 model connector v2",
  "description": "Connector for my Sagemaker DeepSeek model",
  "version": "1.0",
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": invoke_role_arn
  },
  "parameters": {
    "service_name": "sagemaker",
    "region": region,
    "do_sample": True,
    "top_p": 0.7,
    "temperature": 0.5,
    "max_tokens": 1024
  },
  "actions": [
    {
      "action_type": "PREDICT",
      "method": "POST",
      "url": sagemaker_endpoint_url,
      "headers": {
        "content-type": "application/json"
      },
      "request_body": """{ "prompt": "${parameters.inputs}", "temperature": ${parameters.temperature}, "top_p": ${parameters.top_p}, "max_tokens": ${parameters.max_tokens} }""",
      "post_process_function": """
            try {
            // 从响应中提取生成的文本
            def completion = "";
            
            // 检查是否存在 choices 数组
            if (params.choices.size() > 0) {
              // 获取第一个选择的文本
              completion = params.choices[0].text;
              
            } else {
              completion = "无法从模型响应中提取文本";
            }
            
            
            def json = "{" +
                             "\\"name\\": \\"response\\","+
                            "\\"dataAsMap\\": {" +
                             "\\"completion\\":\\"" + escape(completion) + "\\"}" +
                             "}";
            return json;
            
        
          } catch (Exception e) {
            // 返回错误信息
            return '{' +
                     '"name": "response",' +
                     '"dataAsMap": {' +
                        '"completion": "处理错误: ' + escape(e) + '"' +
                     '}' +
                   '}';
          }
        """
    }
  ]
}

# Send the create-connector request, SigV4-signed with the assumed role.
# Fail with the response body when no connector id comes back, instead of
# raising an opaque KeyError.
headers = {"Content-Type": "application/json"}
r = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(r.text)
try:
    create_connector_response = r.json()
except ValueError:
    create_connector_response = {}
if not r.ok or 'connector_id' not in create_connector_response:
    raise RuntimeError(f'Failed to create the DeepSeek connector (HTTP {r.status_code}): {r.text}')
connector_id = create_connector_response['connector_id']

print(f'DEEPSEEK_CONNECTOR_ID="{connector_id}"\n')
os.environ["DEEPSEEK_CONNECTOR_ID"] = connector_id

## 3.2 创建 BGE Embedding Model Connector

In [None]:
import boto3
import json
import os
import requests 
from requests_aws4auth import AWS4Auth

# Settings for this cell, taken from the environment.
opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
region = os.environ['DEEPSEEK_AWS_REGION']
invoke_role_arn = os.environ['INVOKE_DEEPSEEK_ROLE']
create_deepseek_connector_role_arn = os.environ['CREATE_DEEPSEEK_CONNECTOR_ROLE']
sagemaker_endpoint_url = os.environ['SAGEMAKER_EMBEDDING_MODEL_INFERENCE_ENDPOINT']

# Temporary credentials of create_deepseek_connector_role, used to SigV4-sign
# the create-connector request for the "es" service.
session_credentials = boto3.client('sts').assume_role(
    RoleArn=create_deepseek_connector_role_arn,
    RoleSessionName='create_connector_session',
)['Credentials']
access_key_id = session_credentials['AccessKeyId']
secret_access_key = session_credentials['SecretAccessKey']
session_token = session_credentials['SessionToken']
awsauth = AWS4Auth(access_key_id, secret_access_key, region, 'es', session_token=session_token)


# Prepare the API call parameters.
path = '/_plugins/_ml/connectors/_create'
url = opensearch_service_api_endpoint + path
# See the documentation
# https://opensearch.org/docs/latest/ml-commons-plugin/remote-models/blueprints/ for
# details on the connector payload, and additional blueprints for other models.
#
# Connector for the BGE embedding endpoint ("BEG" in name/description is
# presumably a typo for BGE; the strings are left unchanged).
#  - pre_process_function turns the first text document into the parameter
#    "input", which request_body sends as {"input": "..."}.
#    NOTE(review): the text is wrapped in quotes without visible escaping, so
#    documents containing quotes or raw newlines may yield invalid JSON.
#    Verify, and consider escaping.
#  - post_process_function expects a response shaped like
#    {"data": [{"embedding": [...]}]} and returns it as a FLOAT32 tensor named
#    "sentence_embedding". If the embedding is null or empty, it returns
#    params.message instead.
payload = {
  "name": "BEG Embedding model connector",
  "description": "Connector for Sagemaker BEG model",
  "version": "1.0",
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": invoke_role_arn
  },
  "parameters": {
    "service_name": "sagemaker",
    "region": region
  },
    "actions": [
        {
            "action_type": "PREDICT",
            "method": "POST",
            "url": sagemaker_endpoint_url,
            "headers": {
                "content-type": "application/json"
            },
            "request_body": "{ \"input\": \"${parameters.input}\"}",
            "pre_process_function": """
                StringBuilder builder = new StringBuilder();
                builder.append("\\"");
                builder.append(params.text_docs[0]);
                builder.append("\\"");
                def parameters = "{" +"\\"input\\":" + builder + "}";
                return "{" +"\\"parameters\\":" + parameters + "}";
                """,
            "post_process_function": """
                  def name = "sentence_embedding";
                  def dataType = "FLOAT32";
                  if (params.data[0].embedding == null || params.data[0].embedding.length == 0) {
                    return params.message;
                  }
                  def shape = [params.data[0].embedding.length];
                  def json = "{" +
                             "\\"name\\":\\"" + name + "\\"," +
                             "\\"data_type\\":\\"" + dataType + "\\"," +
                             "\\"shape\\":" + shape + "," +
                             "\\"data\\":" + params.data[0].embedding +
                             "}";
                  return json;
                """
        }
    ]
}

# Send the create-connector request, SigV4-signed with the assumed role.
# Fail with the response body when no connector id comes back, instead of
# raising an opaque KeyError.
headers = {"Content-Type": "application/json"}
r = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(r.text)
try:
    create_connector_response = r.json()
except ValueError:
    create_connector_response = {}
if not r.ok or 'connector_id' not in create_connector_response:
    raise RuntimeError(f'Failed to create the embedding connector (HTTP {r.status_code}): {r.text}')
connector_id = create_connector_response['connector_id']

print(f'EMBEDDING_CONNECTOR_ID="{connector_id}"\n')
os.environ["EMBEDDING_CONNECTOR_ID"] = connector_id

# 4.创建 ML Model

## 4.1 注册 DeepSeek Model

In [None]:
import os
import requests


opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
connector_id = os.environ['DEEPSEEK_CONNECTOR_ID']


# Set up user/password auth
userauth = (opensearch_user_name, opensearch_user_password)
headers = {"Content-Type": "application/json"}


########################################################################################
# Register a remote model backed by the DeepSeek connector.
path = '/_plugins/_ml/models/_register'
url = opensearch_service_api_endpoint + path
payload = {
    "name": "Sagemaker DeepSeek R1 model",
    "function_name": "remote",
    "description": "DeepSeek R1 model on Sagemaker",
    "connector_id": connector_id
}
r = requests.post(url, auth=userauth, json=payload, headers=headers)
register_response = r.json()
if 'model_id' not in register_response:
    raise RuntimeError(f'Model registration failed (HTTP {r.status_code}): {register_response}')

model_id = register_response['model_id']
print(f'model_id: {model_id}')


########################################################################################
# Deploy the registered model. The id must be interpolated into the path;
# the previous literal ".../models/model_id/_deploy" targeted a model named
# "model_id", so the real model was never deployed.
path = f'/_plugins/_ml/models/{model_id}/_deploy'
url = opensearch_service_api_endpoint + path
r = requests.post(url, auth=userauth, headers=headers)
if not r.ok:
    raise RuntimeError(f'Model deployment failed (HTTP {r.status_code}): {r.text}')


print(f'DEEPSEEK_MODEL_ID="{model_id}"\n')
os.environ["DEEPSEEK_MODEL_ID"] = model_id

## 4.2 注册 BGE Embedding Model

In [None]:
import os
import requests


opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
connector_id = os.environ['EMBEDDING_CONNECTOR_ID']


# Set up user/password auth
userauth = (opensearch_user_name, opensearch_user_password)
headers = {"Content-Type": "application/json"}


########################################################################################
# Register a remote model backed by the BGE embedding connector.
path = '/_plugins/_ml/models/_register'
url = opensearch_service_api_endpoint + path
payload = {
    "name": "Sagemaker BGE m3 model",
    "function_name": "remote",
    "description": "BGE m3 model on Sagemaker",
    "connector_id": connector_id
}
r = requests.post(url, auth=userauth, json=payload, headers=headers)
register_response = r.json()
if 'model_id' not in register_response:
    raise RuntimeError(f'Model registration failed (HTTP {r.status_code}): {register_response}')

model_id = register_response['model_id']
print(f'model_id: {model_id}')


########################################################################################
# Deploy the registered model. The id must be interpolated into the path;
# the previous literal ".../models/model_id/_deploy" targeted a model named
# "model_id", so the real model was never deployed.
path = f'/_plugins/_ml/models/{model_id}/_deploy'
url = opensearch_service_api_endpoint + path
r = requests.post(url, auth=userauth, headers=headers)
if not r.ok:
    raise RuntimeError(f'Model deployment failed (HTTP {r.status_code}): {r.text}')


print(f'EMBEDDING_MODEL_ID="{model_id}"\n')
os.environ["EMBEDDING_MODEL_ID"] = model_id

#### Show MODEL_ID and Connector_ID

In [None]:
# Summarize the connector and model ids stored in the environment by the
# previous cells: one section per model, with a blank line between sections.
id_summary = (
    ("#####  LLM - DEEPSEEK #####", ("DEEPSEEK_CONNECTOR_ID", "DEEPSEEK_MODEL_ID")),
    ("#####  EMBEDDING MODEL - BGE #####", ("EMBEDDING_CONNECTOR_ID", "EMBEDDING_MODEL_ID")),
)
for section_index, (section_title, env_keys) in enumerate(id_summary):
    if section_index > 0:
        print("")
    print(section_title)
    for env_key in env_keys:
        print(env_key + ":" + os.environ[env_key])


## 4.3 测试模型

在 OpenSearch Dashboards 的 Dev Tools 中执行如下命令（将 `<modelid>` 替换为上一步输出的 DEEPSEEK_MODEL_ID）：
```shell
POST _plugins/_ml/models/<modelid>/_predict
{  
   "parameters": {    
      "inputs": "OpenSearch Serverless 是什么，和OpenSearch集群模式有什么区别，使用 OpenSearch Serverless，还需要管理服务器资源么？"
   }
}
```

In [None]:
# Copyright 2025 Norris
# MIT-0
#

import os
import requests
from pprint import pprint


opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
deepseek_model_id = os.environ["DEEPSEEK_MODEL_ID"]

# Set up user/password auth
userauth = (opensearch_user_name, opensearch_user_password)
headers = {"Content-Type": "application/json"}


########################################################################################
# Send one question to the deployed DeepSeek model through ML Commons.
path = f'/_plugins/_ml/models/{deepseek_model_id}/_predict'
url = opensearch_service_api_endpoint + path
payload = {
    "parameters": {
        "inputs": "OpenSearch Serverless 是什么，和OpenSearch集群模式有什么区别，使用 OpenSearch Serverless，还需要管理服务器资源么？"
    }
}
r = requests.post(url, auth=userauth, json=payload, headers=headers)
if not r.ok:
    raise RuntimeError(f'Prediction failed (HTTP {r.status_code}): {r.text}')

# The connector's post_process_function puts the generated text under
# dataAsMap.completion. Pretty-print the dict itself: the previous version
# pretty-printed an f-string, which only showed an escaped repr.
response_content = r.json()['inference_results'][0]['output'][0]['dataAsMap']
print('response:')
pprint(response_content)

# 5.导入知识库数据

In [None]:
from opensearchpy import OpenSearch
import os

# Connection settings and the name of the knowledge-base index.
opensearch_port = 443
opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
embedding_model_id = os.environ['EMBEDDING_MODEL_ID']
index_name = "opensearch_kl_index"

# opensearch-py wants a bare host: drop a single leading "https://" and a
# single trailing "/" from the configured endpoint.
https_scheme = 'https://'
if opensearch_service_api_endpoint.startswith(https_scheme):
    opensearch_service_api_endpoint = opensearch_service_api_endpoint[len(https_scheme):]
if opensearch_service_api_endpoint[-1:] == '/':
    opensearch_service_api_endpoint = opensearch_service_api_endpoint[:-1]


# The mapping sets kNN to true to enable vector search for the index. It defines
# the text field as type text, and a text_embedding field that uses the FAISS engine
# for storage and retrieval, using the HNSW algorithm.
# NOTE(review): "dimension": 1024 must equal the length of the embeddings
# returned by the BGE endpoint (presumably bge-m3's dense vector size). Confirm.
mapping = {
    "settings": {
        "index": {
            "knn": True,
            "number_of_shards": 1,
            "number_of_replicas": 1
        }
    },
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "text_embedding": {
              "type": "knn_vector",
              "dimension": 1024,
              "method": {
                  "name": "hnsw",
                  "space_type": "l2",
                  "engine": "faiss",
                  "parameters": {"ef_construction": 128, "m": 24},
              }
            }
        }
    }
}


# The data for the knowledge base, in bulk-API form: each {"index": ...}
# action line is followed by the document it indexes (only a "text" field;
# text_embedding is added by the ingest pipeline below).
# NOTE(review): "\\n" in the regular string literals (doc 3, and the first line
# of doc 4) is a literal backslash followed by "n", not a line break, so the
# stored text contains those two characters. Confirm whether real newlines were
# intended. Docs 4 and 5 do contain real newlines (triple-quoted strings);
# check that the embedding connector's pre_process_function handles them.
population_data = [
  {"index": {"_index": index_name, "_id": "1"}},
  {"text": "Amazon OpenSearch Service 是一项托管服务，可以轻松地在 AWS 云中部署、操作和扩展 OpenSearch 集群。 OpenSearch 服务域是 OpenSearch 集群的同义词。域是包含您指定的设置、实例类型、实例计数和存储资源的集群。亚马逊 OpenSearch 服务支持 OpenSearch 传统的 Elasticsearch OSS（最高 7.10，即该软件的最终开源版本）。创建域时，您可以选择使用哪种搜索引擎。"},
  {"index": {"_index": index_name, "_id": "2"}},
  {"text": "OpenSearch是一个完全开源的搜索和分析引擎，用于日志分析、实时应用程序监控和点击流分析等用例。"},
  {"index": {"_index": index_name, "_id": "3"}},
  {"text": "Amazon OpenSearch 服务的特点\\n大量 CPU、内存和存储容量配置，也称为实例类型，包括具有成本效益的 Graviton 实例。\\n支持多达 1002 个数据节点\\n高达 25 PB 的连接存储\\n为只读数据提供经济实惠UltraWarm的冷存储"},
  {"index": {"_index": index_name, "_id": "4"}},
  {"text": """OpenSearch 与亚马逊 OpenSearch 服务相比，何时使用该服务\\n对于开源OpenSearch 您的组织愿意手动监控和维护自行预置的集群，并且拥有具备相应技能的人员。
您需要对代码拥有完全的编译级别控制。
您的组织希望或非常独特地使用开源软件。
您执行多云战略，需要不特定于供应商的技术。
您的团队有能力解决任何关键的生产问题。
您希望能够灵活地以任何需要的方式使用、修改和扩展产品。
您希望在新功能发布后立即使用这些功能。"""},
  {"index": {"_index": index_name, "_id": "5"}},
  {"text": """OpenSearch 与亚马逊 OpenSearch 服务相比，何时使用该服务，对于Amazon OpenSearch托管服务，您不想手动管理、监控和维护基础设施。
您想利用 Amazon S3 的持久性和低成本优势，通过跨存储层进行数据分层，从而管理不断增加的分析成本。
你想利用与其他数据库的集成， AWS 服务 例如 DynamoDB、Amazon DocumentDB（兼容 MongoDB）、IAM 和。 CloudWatch CloudFormation
在预防性维护和生产期间出现问题时，您需要轻松获得帮助。 支持
您想利用自我修复、主动维护、韧性和备份等功能。"""},
  {"index": {"_index": index_name, "_id": "6"}},
  {"text": "Amazon OpenSearch Serverless 是 Amazon OpenSearch 服务的按需无服务器选项，它消除了配置、配置和调整 OpenSearch 集群的操作复杂性。它非常适合那些不愿自行管理集群或缺乏用于大规模部署的专用资源和专业知识的组织。借助 OpenSearch Serverless，您可以搜索和分析大量数据，而无需管理底层基础架构。"},
    {"index": {"_index": index_name, "_id": "7"}},
  {"text": "OpenSearch Serverless 集合是一组 OpenSearch 索引，它们协同工作以支持特定的工作负载或用例。与需要手动配置的自我管理 OpenSearch 集群相比，集合简化了操作。"},
    {"index": {"_index": index_name, "_id": "8"}},
  {"text": "OpenSearch Serverless 支持与 OpenSearch 开源套件相同的采集和查询 API 操作，因此您可以继续使用现有的客户端和应用程序。您的客户端必须与 OpenSearch 2.x 兼容，才能使用 OpenSearch Serverless。"},
    {"index": {"_index": index_name, "_id": "9"}},
  {"text": "Amazon OpenSearch Ingestion 是一个完全托管的无服务器数据收集器，可将实时日志、指标和跟踪数据流式传输到亚马逊 OpenSearch 服务域和 OpenSearch 无服务器集合。"},
    {"index": {"_index": index_name, "_id": "10"}},
    {"text": "借助 OpenSearch Ingestion，您不再需要像 Logstash 或 Jaeger 这样的第三方工具来摄取数据。您可以将数据生成器配置为将数据发送到 OpenSearch Ingestion，然后它会自动将其传输到您的指定域或集合。您也可以在交付数据之前转换数据。"}
]


# OpenSearch ingest pipelines let you define processors to apply to your documents at ingest
# time. The text_embedding processor lets you select a source field and a destination field
# for the embedding. At ingest, OpenSearch will call the model, via its model id, to
# generate the embedding. Here the "text" field is embedded into "text_embedding"
# (the knn_vector field defined in the mapping above).
ingest_pipeline_definition = {
  "processors": [
    {
      "text_embedding": {
        "model_id": embedding_model_id,
        "field_map": {
            "text": "text_embedding"
        }
      }
    }
  ]
}


# Set up the client to call OpenSearch Service (basic auth with the admin
# user; certificate verification is disabled, as elsewhere in this notebook).
hosts = [{"host": opensearch_service_api_endpoint, "port": opensearch_port}]
client = OpenSearch(
    hosts=hosts,
    http_auth=(opensearch_user_name, opensearch_user_password),
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)


# Refuse to reuse an existing index. opensearch-py's indices.exists() returns
# a bool, so test it directly. The previous `== 200` comparison was always
# False, letting the run continue until indices.create() failed.
if client.indices.exists(index=index_name):
    raise Exception(f'Index {index_name} already exists. Please choose a different index_name, or delete the existing index first.')

# Create the k-NN index and register the ingest pipeline that computes
# text_embedding. Then bulk-load the documents through that pipeline;
# refresh=True makes them searchable immediately.
client.indices.create(index=index_name, body=mapping)
client.ingest.put_pipeline(id="embedding_pipeline",
                           body=ingest_pipeline_definition)
bulk_response = client.bulk(index=index_name,
                            body=population_data,
                            pipeline="embedding_pipeline",
                            refresh=True)

# The bulk API reports per-document failures (e.g. a failed embedding call in
# the ingest pipeline) in its response instead of raising, so check them.
if bulk_response.get('errors'):
    failed_items = [
        result
        for item in bulk_response.get('items', [])
        for result in item.values()
        if 'error' in result
    ]
    raise Exception(f'{len(failed_items)} document(s) failed to load into {index_name}: {failed_items}')

print(f'Loaded data into {index_name}')

# 6.创建 Search Pipeline

**Retrieval-augmented generation (RAG) processor**

RAG processor 是一个搜索结果处理器，主要用于对话式搜索中的检索增强生成<br>
* 将向量引擎的检索能力与 LLM 的生成能力无缝结合，无需手动编写复杂管道代码，只需配置即可实现“检索-增强生成”的端到端流程。
* 隐藏底层向量搜索、提示词拼接、模型调用等细节，将提示（prompt）发送给大语言模型（LLM）
* 自动将 OpenSearch 检索到的相关文档作为上下文注入 LLM 提示词，提升生成结果的准确性和相关性，减少幻觉问题。

在 OpenSearch Dashboard 中的 Dev Tool 中执行如下命令
```
PUT /_search/pipeline/my-conversation-search-pipeline-deepseek-zh
{
  "response_processors": [
    {
      "retrieval_augmented_generation": {
        "tag": "Demo pipeline",
        "description": "Demo pipeline Using DeepSeek R1",
        "model_id": "<llm-mode-id>",
        "context_field_list": [
          "text"
        ],
        "system_prompt": "你是一个智能助手.",
        "user_instructions": "针对给定的问题，用少于 200 个字给出简洁、翔实的答案"
      }
    }
  ]
}
```

In [None]:
# Copyright 2025 Norris
# MIT-0
#

import os
import requests


# Connection settings come from environment variables set earlier in the
# notebook; a missing variable raises KeyError here, before any request.
opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
# The next three values are not used in this cell.
region = os.environ['DEEPSEEK_AWS_REGION']
connector_id = os.environ['EMBEDDING_CONNECTOR_ID']
create_deepseek_connector_role = os.environ['CREATE_DEEPSEEK_CONNECTOR_ROLE']
deepseek_model_id = os.environ["DEEPSEEK_MODEL_ID"]

# Set up user/password auth
userauth = (opensearch_user_name, opensearch_user_password)
headers = {"Content-Type": "application/json"}

# Client-side HTTP timeout in seconds; without one, `requests` may wait forever.
request_timeout = 30


########################################################################################
# Create (or update) the RAG search pipeline: its response processor sends the
# retrieved "text" fields to the DeepSeek model as context for the answer.
path = '/_search/pipeline/my-conversation-search-pipeline-deepseek-zh'
url = opensearch_service_api_endpoint + path
payload = {
  "response_processors": [
    {
      "retrieval_augmented_generation": {
        "tag": "Demo pipeline",
        "description": "Demo pipeline Using DeepSeek R1",
        "model_id": deepseek_model_id,
        "context_field_list": [
          "text"
        ],
        "system_prompt": "你是一个智能助手.",
        "user_instructions": "针对给定的问题，用少于 200 个字给出简洁、翔实的答案，请使用中文回答"
      }
    }
  ]
}
r = requests.put(url, auth=userauth, json=payload, headers=headers,
                 timeout=request_timeout)

# Show the raw error body first (it may not be JSON), then fail the cell loudly.
if not r.ok:
    print(f'error {r.status_code}: {r.text}')
r.raise_for_status()

response = r.json()
print(f'response: {response}')

# 7.测试问答

在 OpenSearch Dashboards 的 Dev Tools 中执行如下命令：
```
GET opensearch_kl_index/_search?search_pipeline=my-conversation-search-pipeline-deepseek-zh
{
  "query": {
    "neural": {
    "text_embedding": {
        "query_text": "OpenSearch Serverless 是什么，和OpenSearch集群模式有什么区别，使用 OpenSearch Serverless，还需要管理服务器资源么？",
        "model_id": "<embedding-model-id>",
        "k": 5
      }
    }
  },
  "size": 2,
  "_source": [
    "text"
  ],
  "ext": {
    "generative_qa_parameters": {
      "llm_model": "bedrock/claude",
      "llm_question": "OpenSearch Serverless 是什么，和OpenSearch集群模式有什么区别，使用 OpenSearch Serverless，还需要管理服务器资源么？",
      "context_size": 5,
      "timeout": 15
    }
  }
}
```

In [None]:
# First-turn question sent to the RAG search below.
question = (
    "OpenSearch Serverless 是什么，和OpenSearch集群模式有什么区别，"
    "使用 OpenSearch Serverless，还需要管理服务器资源么？"
)

In [None]:
# Follow-up question. The search in this section sends no memory_id, so the
# model receives this question without the earlier turn.
question = (
    "能否再详细解释一下"
)

In [None]:
# Copyright 2025 Norris
# MIT-0
#

# Test

import os
import requests
from pprint import pprint


# Connection settings come from environment variables set earlier in the
# notebook; a missing variable raises KeyError here, before any request.
opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
# The next three values are not used in this cell.
region = os.environ['DEEPSEEK_AWS_REGION']
connector_id = os.environ['EMBEDDING_CONNECTOR_ID']
create_deepseek_connector_role = os.environ['CREATE_DEEPSEEK_CONNECTOR_ROLE']
embedding_mode_id = os.environ["EMBEDDING_MODEL_ID"]

# Set up user/password auth
userauth = (opensearch_user_name, opensearch_user_password)
headers = {"Content-Type": "application/json"}

# Client-side HTTP timeout in seconds. It is larger than the server-side LLM
# "timeout" (15) in generative_qa_parameters, so the client does not give up
# before the generated answer arrives.
request_timeout = 60


########################################################################################
# Run a neural (vector) search through the RAG search pipeline. The top hits'
# "text" fields become context for the LLM answer to `question`, which is
# defined in a preceding cell. No memory_id is sent, so each call is stateless.
path = '/opensearch_kl_index/_search?search_pipeline=my-conversation-search-pipeline-deepseek-zh'
url = opensearch_service_api_endpoint + path
payload = {
  "query": {
    "neural": {
      "text_embedding": {
        "query_text": question,
        "model_id": embedding_mode_id,
        "k": 5
      }
    }
  },
  "size": 2,
  "_source": [
    "text"
  ],
  "ext": {
    "generative_qa_parameters": {
      "llm_model": "bedrock/claude",
      "llm_question": question,
      "context_size": 5,
      "timeout": 15
    }
  }
}
response = requests.post(url, auth=userauth, json=payload, headers=headers,
                         timeout=request_timeout)

# Show the raw error body first (it may not be JSON), then fail the cell loudly.
if not response.ok:
    print(f'error {response.status_code}: {response.text}')
response.raise_for_status()

pprint(response.json())

<br>
<br>
<br>
<br>

# 8.对话式搜索(创建对话记忆)

对话式搜索允许您用自然语言提问，并通过提出后续问题来完善答案。因此，搜索过程变成了您与大型语言模型（LLM）之间的多轮对话。<br>为此，模型需要记住整个对话的上下文，而不是孤立地回答每个问题。
在 OpenSearch Dashboards 的 Dev Tools 中执行如下命令：
```
POST /_plugins/_ml/memory/
{
  "name": "Conversation about DeepSeek Demo"
}
```

In [None]:
# Copyright 2025 Norris
# MIT-0
#

import os
import requests


# Connection settings come from environment variables set earlier in the
# notebook; a missing variable raises KeyError here, before any request.
opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
# The next four values are not used in this cell.
region = os.environ['DEEPSEEK_AWS_REGION']
connector_id = os.environ['EMBEDDING_CONNECTOR_ID']
create_deepseek_connector_role = os.environ['CREATE_DEEPSEEK_CONNECTOR_ROLE']
deepseek_model_id = os.environ["DEEPSEEK_MODEL_ID"]

# Set up user/password auth
userauth = (opensearch_user_name, opensearch_user_password)
headers = {"Content-Type": "application/json"}

# Client-side HTTP timeout in seconds; without one, `requests` may wait forever.
request_timeout = 30


########################################################################################
# Create a conversation memory. Its ID is later sent as "memory_id" in the
# conversational search so follow-up questions can see earlier turns.
path = '/_plugins/_ml/memory/'
url = opensearch_service_api_endpoint + path
payload = {
  "name": "Conversation about DeepSeek Demo"
}
r = requests.post(url, auth=userauth, json=payload, headers=headers,
                  timeout=request_timeout)

# Fail with the server's error instead of an opaque KeyError on 'memory_id'.
if not r.ok:
    print(f'error {r.status_code}: {r.text}')
r.raise_for_status()

memory_id = r.json()['memory_id']
print(f'memory_id: {memory_id}')
# Export the ID so later cells, which re-read their settings from os.environ,
# can pick it up.
os.environ["MEMORY_ID"] = memory_id

### 带有记忆功能的搜索

In [None]:
# First-turn question for the memory-backed conversational search.
question = (
    "OpenSearch Serverless 是什么，和OpenSearch集群模式有什么区别，"
    "使用 OpenSearch Serverless，还需要管理服务器资源么？"
)

In [None]:
# Follow-up question. With memory_id attached to the search, the earlier turn
# is available to the model as conversation history.
question = (
    "能否再详细解释一下"
)

In [None]:
# Copyright 2025 Norris
# MIT-0
#

# Test

import os
import requests
from pprint import pprint


# Connection settings come from environment variables set earlier in the
# notebook; a missing variable raises KeyError here, before any request.
opensearch_service_api_endpoint = os.environ['OPENSEARCH_SERVICE_DOMAIN_ENDPOINT']
opensearch_user_name = os.environ['OPENSEARCH_SERVICE_ADMIN_USER']
opensearch_user_password = os.environ['OPENSEARCH_SERVICE_ADMIN_PASSWORD']
# The next three values are not used in this cell.
region = os.environ['DEEPSEEK_AWS_REGION']
connector_id = os.environ['EMBEDDING_CONNECTOR_ID']
create_deepseek_connector_role = os.environ['CREATE_DEEPSEEK_CONNECTOR_ROLE']
embedding_mode_id = os.environ["EMBEDDING_MODEL_ID"]
# Set by the memory-creation cell.
memory_id = os.environ["MEMORY_ID"]

# Set up user/password auth
userauth = (opensearch_user_name, opensearch_user_password)
headers = {"Content-Type": "application/json"}

# Client-side HTTP timeout in seconds. It is larger than the server-side LLM
# "timeout" (15) in generative_qa_parameters, so the client does not give up
# before the generated answer arrives.
request_timeout = 60


########################################################################################
# Run a RAG search that is attached to the conversation memory. "memory_id"
# makes the pipeline use and record conversation history, and "message_size"
# presumably limits how many past messages are included (see the OpenSearch
# conversational search docs). `question` is defined in a preceding cell.
path = '/opensearch_kl_index/_search?search_pipeline=my-conversation-search-pipeline-deepseek-zh'
url = opensearch_service_api_endpoint + path
payload = {
  "query": {
    "neural": {
      "text_embedding": {
        "query_text": question,
        "model_id": embedding_mode_id,
        "k": 5
      }
    }
  },
  "size": 2,
  "_source": [
    "text"
  ],
  "ext": {
    "generative_qa_parameters": {
      "llm_model": "bedrock/claude",
      "llm_question": question,
      "memory_id": memory_id,
      "context_size": 2,
      "message_size": 2,
      "timeout": 15
    }
  }
}
response = requests.post(url, auth=userauth, json=payload, headers=headers,
                         timeout=request_timeout)

# Show the raw error body first (it may not be JSON), then fail the cell loudly.
if not response.ok:
    print(f'error {response.status_code}: {response.text}')
response.raise_for_status()

pprint(response.json())

#### 查看记忆中的消息
```
GET /_plugins/_ml/memory/<memory-id>/messages
```