# Deploy ML Model in OSS

In [22]:
import os
import logging

# Module-level logger used by every cell below to report API responses.
logger = logging.getLogger(__name__)
# Show INFO and above; the cells log each response at INFO level.
logging.basicConfig(level=logging.INFO)

In [23]:
import requests
import json

def make_requests(protocol, host, port, path, method, body=None, header=None, timeout=60):
    """Send one HTTP request to ``{protocol}://{host}:{port}{path}``.

    Args:
        protocol: URL scheme, e.g. ``'http'``.
        host: Server host name or address.
        port: Server port.
        path: Request path appended verbatim after the port.
        method: One of ``'GET'``, ``'POST'``, ``'PUT'`` or ``'DELETE'``.
        body: JSON-serialisable payload for POST/PUT. Ignored for GET and
            DELETE. ``None`` sends the request without a body.
        header: Optional dict of HTTP headers.
        timeout: Seconds to wait for the server before giving up, so a
            stalled server cannot hang the caller indefinitely.

    Returns:
        The ``requests.Response`` returned by the server.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
        Exception: Any error raised while serialising or sending the request
            is logged and re-raised unchanged.
    """
    # Reject unknown verbs up front instead of silently returning None, which
    # would only surface later as an AttributeError on ``response.status_code``.
    if method not in ('GET', 'POST', 'PUT', 'DELETE'):
        raise ValueError(f"unsupported HTTP method: {method!r}")

    url = f"{protocol}://{host}:{port}{path}"
    try:
        request_kwargs = {'headers': header, 'timeout': timeout}
        # Only POST/PUT carry a payload; skip it when no body was supplied so
        # the literal JSON text "null" is never sent.
        if method in ('POST', 'PUT') and body is not None:
            request_kwargs['data'] = json.dumps(body)
        return requests.request(method, url, **request_kwargs)
    except Exception as ex:
        logger.error(f"exception: {str(ex)}")
        # Bare raise keeps the original traceback intact.
        raise

In [24]:
# Address of the REST endpoint that every cell below talks to.
protocol, host, port = 'http', 'localhost', 9200

# Headers sent with every request; request bodies are JSON-encoded.
header = {'Content-Type': 'application/json'}

# Arguments

In [25]:
# Name and description of the model group the models are registered under.
MODEL_GROUP = MODEL_GROUP_DESCRIPTION = "proof_of_concepts_model_group"

# Names of the two pretrained models, shared by MODELS and FIELD_MAP.
_DENSE_MODEL_NAME = "huggingface/sentence-transformers/msmarco-distilbert-base-tas-b"
_SPARSE_MODEL_NAME = "amazon/neural-sparse/opensearch-neural-sparse-encoding-v1"

# One entry per model to register: name, version, format and dense/sparse type.
MODELS = [
    dict(
        MODEL_NAME=_DENSE_MODEL_NAME,
        MODEL_VERSION="1.0.2",
        MODEL_FORMAT="TORCH_SCRIPT",
        MODEL_TYPE="dense",
    ),
    dict(
        MODEL_NAME=_SPARSE_MODEL_NAME,
        MODEL_VERSION="1.0.1",
        MODEL_FORMAT="TORCH_SCRIPT",
        MODEL_TYPE="sparse",
    ),
]

# Name of the ingest pipeline created later in the notebook.
PIPELINE_NAME = "proof_of_concept_pipeline"

# Model name -> document field that receives that model's output.
FIELD_MAP = {
    _DENSE_MODEL_NAME: "bert_embeddings",
    _SPARSE_MODEL_NAME: "oss_sparse_embeddings",
}

# Set Cluster Settings

In [26]:
# Apply the ML Commons plugin settings through the cluster settings endpoint.
SET_CLUSTER_SETTINGS_PATH = "/_cluster/settings"

# Values stored under persistent -> plugins -> ml_commons.
_ML_COMMONS_PERSISTENT_SETTINGS = {
    "only_run_on_ml_node": "false",
    "model_access_control_enabled": "true",
    "native_memory_threshold": "99",
}
SET_CLUSTER_SETTINGS_BODY = {
    "persistent": {"plugins": {"ml_commons": _ML_COMMONS_PERSISTENT_SETTINGS}},
    "transient": {"plugins.ml_commons.model_access_control_enabled": "true"},
}

response = make_requests(
    protocol,
    host,
    port,
    SET_CLUSTER_SETTINGS_PATH,
    'PUT',
    body=SET_CLUSTER_SETTINGS_BODY,
    header=header,
)
logger.info(f"SET_CLUSTER_SETTINGS response: {response.status_code} body: {response.json()}")

INFO:__main__:SET_CLUSTER_SETTINGS response: 200 body: {'acknowledged': True, 'persistent': {'plugins': {'ml_commons': {'only_run_on_ml_node': 'false', 'model_access_control_enabled': 'true', 'native_memory_threshold': '99'}}}, 'transient': {'plugins': {'ml_commons': {'model_access_control_enabled': 'true'}}}}


# Register Model Group

In [27]:
# Create the model group and remember the id the server assigns to it.
REGISTER_MODEL_GROUP_PATH = "/_plugins/_ml/model_groups/_register"
REGISTER_MODEL_GROUP_BODY = dict(name=MODEL_GROUP, description=MODEL_GROUP_DESCRIPTION)

response = make_requests(
    protocol,
    host,
    port,
    REGISTER_MODEL_GROUP_PATH,
    'POST',
    body=REGISTER_MODEL_GROUP_BODY,
    header=header,
)
group_payload = response.json()
logger.info(f"REGISTER_MODEL_GROUP response: {response.status_code} body: {group_payload}")
# Later cells attach models to the group through this id.
MODEL_GROUP_ID = group_payload['model_group_id']

INFO:__main__:REGISTER_MODEL_GROUP response: 200 body: {'model_group_id': 'ZsBUIZABJGEOAwdvIoqf', 'status': 'CREATED'}


In [28]:
# Look the freshly registered group up by id and log what the search returns.
TEST_MODEL_GROUP_PATH = "/_plugins/_ml/model_groups/_search"
TEST_MODEL_GROUP_BODY = {"query": {"match": {"_id": MODEL_GROUP_ID}}}

response = make_requests(
    protocol,
    host,
    port,
    TEST_MODEL_GROUP_PATH,
    'POST',
    body=TEST_MODEL_GROUP_BODY,
    header=header,
)
logger.info(f"TEST_MODEL_GROUP response: {response.status_code} body: {response.json()}")

INFO:__main__:TEST_MODEL_GROUP response: 200 body: {'took': 46, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': '.plugins-ml-model-group', '_id': 'ZsBUIZABJGEOAwdvIoqf', '_version': 1, '_seq_no': 16, '_primary_term': 3, '_score': 1.0, '_source': {'created_time': 1718546080397, 'access': 'public', 'latest_version': 0, 'last_updated_time': 1718546080397, 'name': 'proof_of_concepts_model_group', 'description': 'proof_of_concepts_model_group'}}]}}


# Register Model

In [29]:
# Submit one registration request per model and collect the returned task ids,
# in the same order as MODELS.
REGISTER_MODEL_PATH = "/_plugins/_ml/models/_register"
MODEL_TASK_IDS = []

for model_spec in MODELS:
    REGISTER_MODEL_BODY = dict(
        name=model_spec['MODEL_NAME'],
        version=model_spec['MODEL_VERSION'],
        model_group_id=MODEL_GROUP_ID,
        model_format=model_spec['MODEL_FORMAT'],
    )
    response = make_requests(
        protocol,
        host,
        port,
        REGISTER_MODEL_PATH,
        'POST',
        body=REGISTER_MODEL_BODY,
        header=header,
    )
    registration = response.json()
    logger.info(f"REGISTER_MODEL response: {response.status_code} body: {registration}")
    MODEL_TASK_IDS.append(registration['task_id'])

logger.info(f"task ids {MODEL_TASK_IDS}")

INFO:__main__:REGISTER_MODEL response: 200 body: {'task_id': 'Z8BUIZABJGEOAwdvRoqM', 'status': 'CREATED'}
INFO:__main__:REGISTER_MODEL response: 200 body: {'task_id': 'aMBUIZABJGEOAwdvR4oZ', 'status': 'CREATED'}
INFO:__main__:task ids ['Z8BUIZABJGEOAwdvRoqM', 'aMBUIZABJGEOAwdvR4oZ']


In [32]:
import time

# The register call only returns a task id (status CREATED, 'is_async': True),
# so a single status check usually runs before the model exists. Poll each
# task until it leaves the in-progress states or the deadline passes.
REGISTER_POLL_INTERVAL_SECONDS = 5
REGISTER_POLL_TIMEOUT_SECONDS = 600

MODEL_IDS = []      # ids of the models whose registration completed
MODEL_ID_MAP = {}   # model id -> model name, used when building the pipeline
for idx, t in enumerate(MODEL_TASK_IDS):
    TEST_MODEL_PATH = f"/_plugins/_ml/tasks/{t}"
    deadline = time.monotonic() + REGISTER_POLL_TIMEOUT_SECONDS
    while True:
        response = make_requests(protocol, host, port, TEST_MODEL_PATH, 'GET', header=header)
        data = response.json()
        logger.info(f"TEST_MODEL response: {response.status_code} body: {data}")
        state = data.get('state')
        if state == 'COMPLETED':
            # MODEL_TASK_IDS was filled in MODELS order, so idx selects the
            # model this task belongs to.
            MODEL_ID_MAP[data['model_id']] = MODELS[idx]['MODEL_NAME']
            MODEL_IDS.append(data['model_id'])
            break
        if state not in ('CREATED', 'RUNNING'):
            # Failed, cancelled or an error body without a state: skip this
            # model, but say so instead of dropping it silently.
            logger.warning(f"task id {t} stopped in state {state}; model skipped")
            break
        if time.monotonic() >= deadline:
            logger.warning(
                f"task id {t} still {state} after {REGISTER_POLL_TIMEOUT_SECONDS}s; model skipped"
            )
            break
        time.sleep(REGISTER_POLL_INTERVAL_SECONDS)
logger.info(f"model ids {MODEL_IDS}")

INFO:__main__:TEST_MODEL response: 200 body: {'model_id': 'acBUIZABJGEOAwdvUors', 'task_type': 'REGISTER_MODEL', 'function_name': 'TEXT_EMBEDDING', 'state': 'COMPLETED', 'worker_node': ['kRpi4hWvRbS-9fJxZ85iEQ'], 'create_time': 1718546089608, 'last_update_time': 1718546185827, 'is_async': True}
INFO:__main__:TEST_MODEL response: 200 body: {'model_id': 'asBUIZABJGEOAwdvU4oe', 'task_type': 'REGISTER_MODEL', 'function_name': 'SPARSE_ENCODING', 'state': 'COMPLETED', 'worker_node': ['kRpi4hWvRbS-9fJxZ85iEQ'], 'create_time': 1718546089752, 'last_update_time': 1718546207223, 'is_async': True}
INFO:__main__:model ids ['acBUIZABJGEOAwdvUors', 'asBUIZABJGEOAwdvU4oe']


# Deploy Models

In [33]:
# Request deployment of every registered model and keep the returned task ids.
MODEL_DEPLOY_TASK_IDS = []

for model_id in MODEL_IDS:
    DEPLOY_MODEL_PATH = f"/_plugins/_ml/models/{model_id}/_deploy"
    response = make_requests(
        protocol,
        host,
        port,
        DEPLOY_MODEL_PATH,
        'POST',
        body={},
        header=header,
    )
    deployment = response.json()
    logger.info(f"DEPLOY_MODEL response: {response.status_code} body: {deployment}")
    MODEL_DEPLOY_TASK_IDS.append(deployment['task_id'])

logger.info(f"task ids {MODEL_DEPLOY_TASK_IDS}")

INFO:__main__:DEPLOY_MODEL response: 200 body: {'task_id': 'a8BeIZABJGEOAwdvzIpA', 'task_type': 'DEPLOY_MODEL', 'status': 'CREATED'}
INFO:__main__:DEPLOY_MODEL response: 200 body: {'task_id': 'bMBeIZABJGEOAwdvzIrH', 'task_type': 'DEPLOY_MODEL', 'status': 'CREATED'}
INFO:__main__:task ids ['a8BeIZABJGEOAwdvzIpA', 'bMBeIZABJGEOAwdvzIrH']


In [37]:
import time

# The deploy call only returns a task id (status CREATED), so one status check
# can run before deployment has finished. Poll each task until it leaves the
# in-progress states or the deadline passes, and report every outcome.
DEPLOY_POLL_INTERVAL_SECONDS = 5
DEPLOY_POLL_TIMEOUT_SECONDS = 600

for t in MODEL_DEPLOY_TASK_IDS:
    TEST_MODEL_DEPLOY_TASK_PATH = f"/_plugins/_ml/tasks/{t}"
    deadline = time.monotonic() + DEPLOY_POLL_TIMEOUT_SECONDS
    while True:
        response = make_requests(protocol, host, port, TEST_MODEL_DEPLOY_TASK_PATH, 'GET', header=header)
        data = response.json()
        logger.info(f"TEST_MODEL response: {response.status_code} body: {data}")
        state = data.get('state')
        if state == 'COMPLETED':
            logger.info(f"task id {t} is {data['state']}")
            break
        if state not in ('CREATED', 'RUNNING'):
            # Failed, cancelled or an error body without a state.
            logger.warning(f"task id {t} stopped in state {state}")
            break
        if time.monotonic() >= deadline:
            logger.warning(f"task id {t} still {state} after {DEPLOY_POLL_TIMEOUT_SECONDS}s; giving up")
            break
        time.sleep(DEPLOY_POLL_INTERVAL_SECONDS)

INFO:__main__:TEST_MODEL response: 200 body: {'model_id': 'acBUIZABJGEOAwdvUors', 'task_type': 'DEPLOY_MODEL', 'function_name': 'TEXT_EMBEDDING', 'state': 'COMPLETED', 'worker_node': ['kRpi4hWvRbS-9fJxZ85iEQ'], 'create_time': 1718546779180, 'last_update_time': 1718546812902, 'is_async': True}
INFO:__main__:task id a8BeIZABJGEOAwdvzIpA is COMPLETED
INFO:__main__:TEST_MODEL response: 200 body: {'model_id': 'asBUIZABJGEOAwdvU4oe', 'task_type': 'DEPLOY_MODEL', 'function_name': 'SPARSE_ENCODING', 'state': 'COMPLETED', 'worker_node': ['kRpi4hWvRbS-9fJxZ85iEQ'], 'create_time': 1718546779334, 'last_update_time': 1718546830637, 'is_async': True}
INFO:__main__:task id bMBeIZABJGEOAwdvzIrH is COMPLETED


# Create Pipeline

In [38]:
# Build one ingest processor per registered model and create the pipeline.
#
# The model type is resolved through the model's name (MODEL_ID_MAP), not its
# position in MODEL_IDS: MODEL_IDS only holds models whose registration
# completed, so its indices do not necessarily line up with MODELS.
MODEL_TYPE_BY_NAME = {spec['MODEL_NAME']: spec['MODEL_TYPE'] for spec in MODELS}

processors = []
for m in MODEL_IDS:
    model_name = MODEL_ID_MAP[m]
    # 'dense' models use the text_embedding processor; every other type uses
    # sparse_encoding, as before.
    if MODEL_TYPE_BY_NAME[model_name] == 'dense':
        processor_type = "text_embedding"
    else:
        processor_type = "sparse_encoding"
    processors.append({
        processor_type: {
            "model_id": m,
            # Read the "text" field and write the model output to the field
            # configured for this model.
            "field_map": {
                "text": FIELD_MAP[model_name]
            }
        }
    })

CREATE_PIPELINE_BODY = {
    "description": PIPELINE_NAME,
    "processors": processors
}
CREATE_PIPELINE_PATH = f"/_ingest/pipeline/{PIPELINE_NAME}"
response = make_requests(protocol, host, port, CREATE_PIPELINE_PATH, 'PUT', body=CREATE_PIPELINE_BODY, header=header)
logger.info(f"CREATE_PIPELINE response: {response.status_code} body: {response.json()}")

INFO:__main__:CREATE_PIPELINE response: 200 body: {'acknowledged': True}


In [39]:
# List the ingest pipelines known to the cluster and log the result.
GET_PIPELINE_PATH = "/_ingest/pipeline"
response = make_requests(
    protocol,
    host,
    port,
    GET_PIPELINE_PATH,
    'GET',
    header=header,
)
logger.info(f"GET_PIPELINE response: {response.status_code} body: {response.json()}")

INFO:__main__:GET_PIPELINE response: 200 body: {'proof_of_concept_pipeline': {'description': 'proof_of_concept_pipeline', 'processors': [{'text_embedding': {'model_id': 'acBUIZABJGEOAwdvUors', 'field_map': {'text': 'bert_embeddings'}}}, {'sparse_encoding': {'model_id': 'asBUIZABJGEOAwdvU4oe', 'field_map': {'text': 'oss_sparse_embeddings'}}}]}, 'msmarco-distilbert-base-tas-b-ingest-pipeline': {'description': 'msmarco-distilbert ingest pipeline-v2', 'processors': [{'text_embedding': {'model_id': 'g_06_o8B0WrWLUTBacDN', 'field_map': {'text': 'msmarco_distilbert_embedding'}}}]}}


# Cleanup

In [21]:
import time

# Seconds to wait after each undeploy request before deleting the model.
UNDEPLOY_WAIT_SECONDS = 120

# Delete the ingest pipeline. It was created under /_ingest/pipeline, so it
# has to be deleted through the same endpoint (not /_search/pipeline).
response = make_requests(protocol, host, port, f"/_ingest/pipeline/{PIPELINE_NAME}", 'DELETE', header=header)
logger.info(f"response: {response.status_code} body: {response.json()}")

# Undeploy each model, wait, then delete it.
for m in MODEL_IDS:
    response = make_requests(protocol, host, port, f"/_plugins/_ml/models/{m}/_undeploy", 'POST', body={}, header=header)
    logger.info(f"response: {response.status_code} body: {response.json()}")
    time.sleep(UNDEPLOY_WAIT_SECONDS)
    response = make_requests(protocol, host, port, f"/_plugins/_ml/models/{m}", 'DELETE', header=header)
    logger.info(f"response: {response.status_code} body: {response.json()}")

# Delete the model group last, after its models are gone.
response = make_requests(protocol, host, port, f"/_plugins/_ml/model_groups/{MODEL_GROUP_ID}", 'DELETE', header=header)
logger.info(f"response: {response.status_code} body: {response.json()}")

INFO:__main__:response: 200 body: {'acknowledged': True}
INFO:__main__:response: 200 body: {'kRpi4hWvRbS-9fJxZ85iEQ': {'stats': {'YsBEIZABJGEOAwdv7YrN': 'undeployed'}}}
INFO:__main__:response: 200 body: {'_index': '.plugins-ml-model', '_id': 'YsBEIZABJGEOAwdv7YrN', '_version': 6, 'result': 'deleted', 'forced_refresh': True, '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 315, '_primary_term': 3}
INFO:__main__:response: 200 body: {'kRpi4hWvRbS-9fJxZ85iEQ': {'stats': {'Y8BEIZABJGEOAwdv7ooy': 'undeployed'}}}
INFO:__main__:response: 200 body: {'_index': '.plugins-ml-model', '_id': 'Y8BEIZABJGEOAwdv7ooy', '_version': 7, 'result': 'deleted', 'forced_refresh': True, '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 344, '_primary_term': 3}
INFO:__main__:response: 200 body: {'_index': '.plugins-ml-model-group', '_id': 'X8BEIZABJGEOAwdvY4pw', '_version': 4, 'result': 'deleted', '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 15, '_primary_term