# Opensearch ML

This notebook tests neural/semantic search with OpenSearch.

In [1]:
!pip install requests opensearch-py -q


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## ML model into OS

Reference: https://opensearch.org/docs/latest/ml-commons-plugin/custom-local-models/ 

Let's start by connecting to OpenSearch and configuring ML Commons; after that we create a connector and register a model group.

In [1]:
# import requests
from opensearchpy import OpenSearch

# Shared client used by every cell below; points at the local node on port 9216
osclient = OpenSearch(hosts=["localhost:9216"])

# Only report readiness when the ping call comes back truthy
cluster_reachable = osclient.ping()
if cluster_reachable:
    print("OS Ready")

OS Ready


In [2]:
# dummy wrapper for calls that are not wrapped by opensearch-py

def do_request(method, path, body=None, params=None):
    """Send a raw request through the shared ``osclient`` transport.

    Prints the call as ``[METHOD] path`` and then forwards all arguments to
    ``osclient.transport.perform_request``, returning whatever that call
    returns.

    Args:
        method: HTTP verb, e.g. ``"POST"``.
        path: Request path, e.g. ``"/_plugins/_ml/models/_search"``.
        body: Optional request body, passed through unchanged.
        params: Optional query parameters, passed through unchanged.
    """
    print("[{}] {}".format(method, path))
    transport = osclient.transport
    response = transport.perform_request(method, path, body=body, params=params)
    return response

In [3]:
# Some settings for opensearch docker local

# ML Commons plugin settings, sent nested under persistent -> plugins -> ml_commons
ml_commons_settings = {
    "allow_registering_model_via_url": "true",
    "only_run_on_ml_node": "false",
    "model_access_control_enabled": "true",
    "native_memory_threshold": "99",
}
osclient.cluster.put_settings(
    body={"persistent": {"plugins": {"ml_commons": ml_commons_settings}}}
)

# Connector settings, sent as flat dotted keys; the regex matches
# http://ai_inference with an optional port followed by any path
connector_settings = {
    "plugins.ml_commons.connector.private_ip_enabled": True,
    "plugins.ml_commons.trusted_connector_endpoints_regex": [
        "^http://ai_inference:?\\d*/.*$",
    ],
}
osclient.cluster.put_settings(body={"persistent": connector_settings})

# Display the resulting cluster settings as the cell output
osclient.cluster.get_settings()

{'persistent': {'plugins': {'ml_commons': {'only_run_on_ml_node': 'false',
    'trusted_connector_endpoints_regex': ['^http://ai_inference:?\\d*/.*$'],
    'model_access_control_enabled': 'true',
    'native_memory_threshold': '99',
    'connector': {'private_ip_enabled': 'true'},
    'allow_registering_model_via_url': 'true'},
   'index_state_management': {'template_migration': {'control': '-1'}}}},
 'transient': {}}

In [4]:
# Look up the connectors that already exist in the cluster
search_connector_res = do_request("POST", "/_plugins/_ml/connectors/_search", {})

# Reuse the first connector returned, if any; otherwise leave the id unset
existing_connectors = search_connector_res["hits"]["hits"]
connector_id = existing_connectors[0]["_id"] if existing_connectors else None

existing_connectors

[POST] /_plugins/_ml/connectors/_search


[]

In [5]:
# Remove connectors (this may require to undeploy/delete model first)

DELETE_EXISTING_CONNECTORS = True  # set to False to keep the connectors found by the search

if DELETE_EXISTING_CONNECTORS:
    for connector in search_connector_res["hits"]["hits"]:
        delete_res = do_request("DELETE", f"/_plugins/_ml/connectors/{connector['_id']}")
        print(delete_res)
        # Forget the cached id once its connector is gone. Otherwise the
        # creation step is skipped (connector_id is still truthy) and the
        # model would later be registered against a deleted connector.
        if connector["_id"] == connector_id:
            connector_id = None

In [6]:
# Create the connector only when no existing one was picked up earlier
if not connector_id:
    # NOTE(review): the predict call at the end of this notebook fails with
    # 'host must not be null.' -- possibly because the underscore in the
    # `ai_inference` hostname is rejected when the URL is parsed server-side.
    # TODO confirm, e.g. by exposing the service under an underscore-free
    # alias (the trusted endpoint regex would then need the same change).
    predict_action = {
        "action_type": "predict",
        "method": "POST",
        "url": "http://ai_inference:8080/invocations",
        "headers": {"content-type": "application/json"},
        "post_process_function": "connector.post_process.default.embedding",
        "request_body": "{ \"text\": ${parameters.input} }",
    }
    connector_payload = {
        "name": "Local app connector",
        "description": "The connector",
        "version": 1,
        "protocol": "http",
        "actions": [predict_action],
    }
    connector_res = do_request("POST", "/_plugins/_ml/connectors/_create", connector_payload)
    connector_id = connector_res["connector_id"]

connector_id

[POST] /_plugins/_ml/connectors/_create


'OsMrnJABV_DL0Puipgjk'

In [7]:
# Register a model group, or reuse the existing group that carries the same name
MODEL_GROUP_NAME = "doofinder_ml_group"

try:
    model_group_res = do_request("POST", "/_plugins/_ml/model_groups/_register", body={
        "name": MODEL_GROUP_NAME,
        "description": "sample model group",
    })
    model_group_id = model_group_res["model_group_id"]
except Exception as register_error:
    # Registration presumably fails when the group already exists (TODO confirm);
    # show the reason instead of swallowing it, then look the group up by name.
    print(f"Model group registration failed: {register_error}")
    os_res = do_request("POST", "/_plugins/_ml/model_groups/_search", body={
        "query": {
            "match": {"name": MODEL_GROUP_NAME}
        },
        "size": 10
    })
    # Keep only exact name matches so an unrelated group is never picked up
    matching_group_ids = [
        hit["_id"]
        for hit in os_res["hits"]["hits"]
        if hit.get("_source", {}).get("name") == MODEL_GROUP_NAME
    ]
    if not matching_group_ids:
        # No group with this name exists: the registration error is the real problem
        raise
    model_group_id = matching_group_ids[0]

model_group_id

[POST] /_plugins/_ml/model_groups/_register
[POST] /_plugins/_ml/model_groups/_search


'PgrZl5AB0l4keGHcXGiw'

In [8]:
# Look for an already registered model whose name matches "remotemodel"
res = do_request("POST", "/_plugins/_ml/models/_search", {
    "query": {"match": {"name": "remotemodel"}}
})

model_hits = res["hits"]["hits"]
model_registered = bool(model_hits)
# NOTE(review): a found model is assumed to be deployed; its state is not checked here
deployed = model_registered
# The first hit wins; False marks "no model known yet"
model_id = model_hits[0]["_id"] if model_registered else False

model_hits, model_registered

[POST] /_plugins/_ml/models/_search


([], False)

In [12]:
DELETE_EXISTING_MODELS = True  # set to False to keep the "remotemodel" models found by the search

if DELETE_EXISTING_MODELS:  # delete deployed "remotemodel" models
    for item in res["hits"]["hits"]:
        model_path = "/_plugins/_ml/models/" + item["_id"]
        undeploy_res = do_request("POST", model_path + "/_undeploy")
        print(undeploy_res)
        delete_res = do_request("DELETE", model_path)
        print(delete_res)
        # Reset the cached state once the model it refers to is gone. Otherwise
        # the registration step is skipped and predict targets a deleted model.
        if item["_id"] == model_id:
            model_id = False
            model_registered = False
            deployed = False

In [13]:
# Register external model

if model_registered:
    print("model already in cluster")
else:
    # Remote model bound to the model group and connector prepared in earlier cells
    registration_body = {
        "name": "remotemodel",
        "function_name": "remote",
        "model_group_id": model_group_id,
        "description": "test model",
        "connector_id": connector_id,
    }
    register_res = do_request("POST", "/_plugins/_ml/models/_register", body=registration_body)
    model_registered = True
    model_id = register_res["model_id"]

    print(register_res)

[POST] /_plugins/_ml/models/_register
{'task_id': 'O8MvnJABV_DL0Puiawhi', 'status': 'CREATED', 'model_id': 'PMMvnJABV_DL0Puiawh4'}


### Note

In my case the model is auto-deployed: by default, remote models are deployed automatically unless the respective cluster setting is changed.

In [16]:
# Run a predict call against the registered model with two sample inputs
predict_path = "/_plugins/_ml/models/" + model_id + "/_predict"
predict_payload = {"parameters": {"input": ["foo", "bar"]}}
predict_res = do_request("POST", predict_path, predict_payload)

predict_res

[POST] /_plugins/_ml/models/PMMvnJABV_DL0Puiawh4/_predict


TransportError: TransportError(500, 'null_pointer_exception', 'host must not be null.')