# Use Trained Model in OpenSearch Search Pipeline

This notebook demonstrates how to use a trained model inside an OpenSearch search pipeline.

In [12]:
import requests
import json
import time

## Cluster Settings

Your OpenSearch cluster needs to trust the API. We add one or more regular expressions to the set of trusted ones.
In this example we trust API endpoints running on `localhost` and `inference-api`. Additionally, we enable connections to private IPs which is blocked by default but important when connecting to external APIs.

In [2]:
headers = {
    'Content-Type': 'application/json'
}
host = "http://localhost:9200/"

In [5]:
# Make OS trust your API
url = host + "_cluster/settings"

payload = {
    "persistent": {
        "plugins.ml_commons.trusted_connector_endpoints_regex": [
          "^http://localhost:.*$",
          "^http://127.0.0.1:.*$",
          "^http://inference-api:.*$"
        ]
    }
}

response = requests.request("PUT", url, headers=headers, data=json.dumps(payload))
print(response)

<Response [200]>


## API Connector

Next we create a connector with all the configuration details necessary to call the API endpoint.

In [14]:
url = host + "_plugins/_ml/connectors/_create"

payload = {
  "name": "hybrid_search_optimizer",
  "description": "predict neuralness and keywordness",
  "version": 1,
  "protocol": "http",
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "headers": {
        "content-type": "application/json",
        "accept": "application/json"
      },
      "url": "http://inference-api:8000/predict/",
      "request_body": "{ \"data\": \"${parameters.input}\"}"
    }
  ]
}

response = requests.request("POST", url, headers=headers, data=json.dumps(payload))

if response.status_code == 200:
    connector_id = response.json()['connector_id']
    print(f"Connector with id {connector_id} successfully created")    
else:
    print(f"Connector creation failed: {response}")

Connector with id yxG2YpMBDpIvIC8uGRjp successfully created


## Model Group

We register the external API as a model in OpenSearch.

To do so we create a model group first and then add a model by referencing the created connector.

In [15]:
# Register a model group
url = host + "_plugins/_ml/model_groups/_register"

payload = {
    "name": "hybrid_search_model_group",
    "description": "This is the model group for all models we use for hybrid search"
}

response = requests.request("POST", url, headers=headers, data=json.dumps(payload))

if response.status_code == 200:
    model_group_id = response.json()['model_group_id']
    print(f"Model group with id {model_group_id} successfully created")    
else:
    print(f"Model group creation failed: {response}")

Model group creation failed: <Response [400]>


In [16]:
# Register model in model group
url = host + "_plugins/_ml/models/_register"

payload = {
    "name": "predict_neuralness",
    "function_name": "remote",
    "model_group_id": model_group_id,
    "description": "predict neuralness and keywordness",
    "connector_id": connector_id
}

response = requests.request("POST", url, headers=headers, data=json.dumps(payload))

if response.status_code == 200:
    task_id = response.json()['task_id']
    model_id = response.json()['model_id']
    print(f"Task with id {task_id} to create model with id {model_id} successfully created")    
else:
    print(f"Model creation failed: {response}")

Task with id zBG2YpMBDpIvIC8uJxgi to create model with id zRG2YpMBDpIvIC8uJxhB successfully created


In [17]:
# Check for task completion
url = host + "_plugins/_ml/tasks/" + task_id
response = requests.request("GET", url, headers=headers)

max_attempts = 5
attempts = 0

state = None
while state != 'COMPLETED' and attempts < max_attempts:
    time.sleep(5) # wait five second and then check again.
    response = requests.request("GET", url, headers=headers)
    print(response.json()['state'])
    state = response.json()['state']
    attempts += 1

model_id = response.json()['model_id']

print(f"Task finished and model with id {model_id} successfully created")

COMPLETED
Task finished and model with id zRG2YpMBDpIvIC8uJxhB successfully created


## Try the Model

In [18]:
url = host + f"_plugins/_ml/models/{model_id}/_predict"
payload = {
  "parameters": {"input": "2, 0, 169, 1.1657, 8.58744, 0.67777"}
}

response = requests.request("POST", url, headers=headers, data=json.dumps(payload))
print(response.json())

{'inference_results': [{'output': [{'name': 'response', 'dataAsMap': {'neuralness': 0.51, 'keywordness': 0.49}}], 'status_code': 200}]}


## Integrate the Model in a Query

With the [ML Inference](https://opensearch.org/docs/latest/search-plugins/search-pipelines/ml-inference-search-request/#configuration-parameters) search pipeline we can make inference within search requests and use the output in a query.

Currently unknown is if we can use the pipeline output as part of another pipeline (for hybrid search). See the [thread in Slack](https://opensearch.slack.com/archives/C05BGJ1N264/p1732270893567709) for details.

The following pipeline uses the prdicted values as search weights for two fields, not as keyword/neural search weights in a hybrid search scenario.

In [28]:
# Creating the pipeline

url = host + "_search/pipeline/ml_inference_pipeline" 

payload = {
  "description": "search with predictions",
  "request_processors": [
    {
      "ml_inference": {
        "function_name": "remote",
        "full_response_path": True,
        "model_id": model_id,
        "model_input": """{ "parameters": {"input": "${input_map.features}"}}""",
        "query_template": """{
  "query": {
    "multi_match": {
      "type": "best_fields",
      "fields": [
        "product_id^100",
        "product_bullet_point^3",
        "product_color^2",
        "product_brand^5",
        "product_description^${keywordness}",
        "product_title^${neuralness}"
      ],
      "operator": "and",
      "query": "iphone"
    }
  },"explain": true
}""",
        "input_map": [
          {
            "features": "query.term.features.value"
          }
        ],
        "output_map": [
          {
            "neuralness": "$.inference_results[0]output[0]dataAsMap.neuralness",
            "keywordness": "$.inference_results[0]output[0]dataAsMap.keywordness"
          }
        ],
        "ignore_missing": False,
        "ignore_failure": False
      }
    }
  ]
}

response = requests.request("PUT", url, headers=headers, data=json.dumps(payload))
print(response.json())

{'acknowledged': True}


In [30]:
# Run the query
url = host + "ecommerce/_search?search_pipeline=ml_inference_pipeline"

payload = {
"query": {
  "term": {
    "features": {
      "value": "2, 0, 169, 1.1657, 8.58744, 0.67777"
      }
    }
  }
}

response = requests.request("POST", url, headers=headers, data=json.dumps(payload))
print(response.json())

{'took': 141, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 10000, 'relation': 'gte'}, 'max_score': 26.413527, 'hits': [{'_shard': '[ecommerce][0]', '_node': 'RsKfw9aKSqyvO8W6Rey7TA', '_index': 'ecommerce', '_id': 'B00NOGWZKK', '_score': 26.413527, '_source': {'product_bullet_point': 'iphone 6s plus apple iphone 6 plus case iphone 6 plus cover case apple iphone 6s plus case iphone 6s case heavy duty iphone 6 plus case black iphone 6 plus case iphone 6s plus cover case black iphone 6s plus case slim iphone 6 plus case iphone 6 plus hard case iphone 6s plus hard case\ncase iphone 6s plus iphone 6 plus cover cover iphone 6 plus iphone 6s plus cover iphone 6 soft case iphone 6 plus clear iphone 6 case apple iphone 6 plus cover iphone case 6 plus case iphone 6s phone case\niphone 6 plus back cover apple iphone 6 case apple iphone 6s case apple iphone 6 plus cover case iphone 6 case cover apple iphone 6 plus phone case bl

In [33]:
# this hybrid search pipeline is not working right now
url = host + "_search/pipeline/ml_inference_pipeline" 

payload = {
  "description": "search with predictions",
  "request_processors": [
    {
      "ml_inference": {
        "function_name": "remote",
        "full_response_path": True,
        "model_id": model_id,
        "model_input": """{ "parameters": {"input": "${input_map.features}"}}""",
        "query_template": """{
  "_source": {
    "excludes": [
      "title_embedding"
    ]
  },
  "query": {
    "hybrid": {
      "queries": [
        {
          "multi_match": {
            "type": "best_fields",
            "fields": [
              "product_id^100",
              "product_bullet_point^3",
              "product_color^2",
              "product_brand^5",
              "product_description",
              "product_title^10"
            ],
            "operator": "and",
            "query": "iphone"
          }
        },
        {
          "neural": {
            "title_embedding": {
              "query_text": "iphone",
              "k": 5
            }
          }
        }
      ]
    }
  },
  "search_pipeline": {
    "request_processors": [
      {
        "neural_query_enricher": {
          "description": "one of many search pipelines for experimentation",
          "default_model_id": "i6jHTZMBflg_ePyfu9EK",
          "neural_field_default_id": {
            "title_embeddings": "i6jHTZMBflg_ePyfu9EK"
          }
        }
      }
    ],
    "phase_results_processors": [
      {
        "normalization-processor": {
          "normalization": {
            "technique": "l2"
          },
          "combination": {
            "technique": "arithmetic_mean",
            "parameters": {
              "weights": [
                ${keywordness},
                ${neuralness}
              ]
            }
          }
        }
      }
    ]
  },
  "size": 1
}""",
        "input_map": [
          {
            "features": "query.term.features.value"
          }
        ],
        "output_map": [
          {
            "neuralness": "$.inference_results[0]output[0]dataAsMap.neuralness",
            "keywordness": "$.inference_results[0]output[0]dataAsMap.keywordness"
          }
        ],
        "ignore_missing": False,
        "ignore_failure": False
      }
    }
  ]
}

response = requests.request("PUT", url, headers=headers, data=json.dumps(payload))
print(response.json())

{'acknowledged': True}


In [34]:
# Run the query
url = host + "ecommerce/_search?search_pipeline=ml_inference_pipeline"

payload = {
"query": {
  "term": {
    "features": {
      "value": "2, 0, 169, 1.1657, 8.58744, 0.67777"
      }
    }
  }
}

response = requests.request("POST", url, headers=headers, data=json.dumps(payload))
print(response.json())

{'error': {'root_cause': [{'type': 'null_pointer_exception', 'reason': 'modelId is marked non-null but is null'}], 'type': 'null_pointer_exception', 'reason': 'modelId is marked non-null but is null'}, 'status': 500}


When changing the pipeline config by extracting the neural_query_enricher outside of the `query_template` there are results butr they look not normalized (negative scores for what apparently are neural search results, regular keyword search result scores)

In [35]:
# this hybrid search pipeline is not working right now
url = host + "_search/pipeline/ml_inference_pipeline" 

payload = {
  "description": "search with predictions",
  "request_processors": [
    {
      "ml_inference": {
        "function_name": "remote",
        "full_response_path": True,
        "model_id": "zRG2YpMBDpIvIC8uJxhB",
        "model_input": """{ "parameters": {"input": "${input_map.features}"}}""",
        "query_template": """{
  "_source": {
    "excludes": [
      "title_embedding"
    ]
  },
  "query": {
    "hybrid": {
      "queries": [
        {
          "multi_match": {
            "type": "best_fields",
            "fields": [
              "product_id^100",
              "product_bullet_point^3",
              "product_color^2",
              "product_brand^5",
              "product_description",
              "product_title^10"
            ],
            "operator": "and",
            "query": "iphone"
          }
        },
        {
          "neural": {
            "title_embedding": {
              "query_text": "iphone",
              "k": 50
            }
          }
        }
      ]
    }
  },
  "search_pipeline": {
    "phase_results_processors": [
      {
        "normalization-processor": {
          "normalization": {
            "technique": "l2"
          },
          "combination": {
            "technique": "arithmetic_mean",
            "parameters": {
              "weights": [
                ${keywordness},
                ${neuralness}
              ]
            }
          }
        }
      }
    ]
  },
  "size": 100
}""",
        "input_map": [
          {
            "features": "query.term.features.value"
          }
        ],
        "output_map": [
          {
            "neuralness": "$.inference_results[0]output[0]dataAsMap.neuralness",
            "keywordness": "$.inference_results[0]output[0]dataAsMap.keywordness"
          }
        ],
        "ignore_missing": False,
        "ignore_failure": False
      }
    },
    {
      "neural_query_enricher": {
        "description": "one of many search pipelines for experimentation",
        "default_model_id": "i6jHTZMBflg_ePyfu9EK",
        "neural_field_default_id": {
            "title_embeddings": "i6jHTZMBflg_ePyfu9EK"
          }
      }
    }
  ]
}

response = requests.request("PUT", url, headers=headers, data=json.dumps(payload))
print(response.json())

{'acknowledged': True}
