# Introduction

* Connect to Elasticsearch
* Download and deploy embedding model to Elasticsearch
* Create Pipeline and Index

These notebooks were tested with Python 3.11.9

In [1]:
import time
from elasticsearch import Elasticsearch, helpers, exceptions
import csv
import pandas as pd
import os

In [2]:
# credential file
df = pd.read_csv('./elastic credential/watsonx_discovery_credential_tzwx24.csv') #change the path as per your need

In [3]:
# setting up the credential
ELASTIC_USER = df.iloc[0]['username']
ELASTIC_PW = df.iloc[0]['password']
ELASTIC_HOST = df.iloc[0]['watsonx_discovery_url'] # WxD Endpoint
ELASTIC_PORT = df.iloc[0]['port']  #  port number
ELASTIC_CERT_FILE = "./elastic_certificate/es.cert" # path containing certiifcation
INGEST_PIPELINE_NAME="e5-pipeline"
INDEX_NAME_DOC="index-e5"
MODEL_ID=".multilingual-e5-small"
MODEL_ID_E5 = MODEL_ID

In [4]:
client = Elasticsearch(
     ELASTIC_HOST+':'+str(ELASTIC_PORT),  # Elasticsearch endpoint
     basic_auth=(ELASTIC_USER, ELASTIC_PW),
     ca_certs=ELASTIC_CERT_FILE,
     verify_certs=True,
     request_timeout=120
)

In [None]:
print(client.info())

In [None]:
# delete model if already downloaded and deployed
try:
  client.ml.delete_trained_model(model_id=MODEL_ID_E5,force=True)
  print("Model deleted successfully, We will proceed with creating one")
except exceptions.NotFoundError:
  print("Model doesn't exist, but We will proceed with creating one")

# Creates the embedding model configuration. Automatically downloads the model if it doesn't exist. 
client.ml.put_trained_model(
    model_id=MODEL_ID_E5,
    input={
      "field_names": ["text_field"]
    }
  )


In [7]:
MODEL_ID_ELSER = '.elser_model_2'
MODEL_ID_e5_small = '.multilingual-e5-small'

model_to_delete = MODEL_ID_E5

client.ml.stop_trained_model_deployment(
  model_id=model_to_delete,
 # number_of_allocations=2, # it can increase depending on volume of data
 # threads_per_allocation=2,
 # wait_for="starting"
)

ObjectApiResponse({'stopped': True})

In [8]:
while True:
    status = client.ml.get_trained_models(
        model_id=MODEL_ID_E5,
        include="definition_status"
    )
    
    if (status["trained_model_configs"][0]["fully_defined"]):
        print("E5 Model is downloaded and ready to be deployed.")
        break
    else:
        print("E5 Model is downloaded but not ready to be deployed.")
    time.sleep(5)

E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be deployed.
E5 Model is downloaded but not ready to be dep

In [9]:
# Start trained model deployment if not already deployed

client.ml.start_trained_model_deployment(
  model_id=MODEL_ID_E5,
  number_of_allocations=2, # it can increase depending on volume of data
  threads_per_allocation=2,
  wait_for="starting"
)

while True:
  status = client.ml.get_trained_models_stats(
    model_id=MODEL_ID_E5,
  )
  if (status["trained_model_stats"][0]["deployment_stats"]["state"] == "started"):
    print("E5 Model has been successfully deployed.")
    break
  else:
    print("E5 Model is currently being deployed.")
  time.sleep(5)

E5 Model is currently being deployed.
E5 Model is currently being deployed.
E5 Model is currently being deployed.
E5 Model is currently being deployed.
E5 Model is currently being deployed.
E5 Model has been successfully deployed.


# Create Ingest Pipeline

* In the definition below, a field from our index is mapped to the expected input parameter for the embedding model.
* In this case, the input is "ErrorMessage" and the E5 model expects it to be mapped to "text_field".
* The output of the E5 model (the embedding tokens) will be written to the target field "ml".
* The "inference_config" section configures the pipeline to perform a text_expansion using the model_id. The resulting text expansion is a list named "tokens" stored to the target_field.

In [10]:
INGEST_PIPELINE_NAME_E5 = INGEST_PIPELINE_NAME
client.ingest.put_pipeline(
    id=INGEST_PIPELINE_NAME_E5, 
    description="Ingest pipeline for E5",
    processors=[
    {
      "inference": {
        "model_id": MODEL_ID_E5,
        "target_field": "passage_embedding",
        "field_map": { 
          "text": "text_field"
        }
      }
    }
  ], on_failure= [
    {
      "set": {
        "description": "Index document to '\''failed-<index>'\''",
        "field": "_index",
        "value": "failed-{{{_index}}}"
      }
    },
    {
      "set": {
        "description": "Set error message",
        "field": "ingest.failure",
        "value": "{{_ingest.on_failure_message}}"
      }
    }
  ]
)



ObjectApiResponse({'acknowledged': True})

# Create index
To use the ELSER model at index time, we'll need to create an index mapping that supports a text_expansion query.

ELSER output must be ingested into a field with the rank_features field type.

In [11]:
INDEX_NAME_E5 = INDEX_NAME_DOC
client.indices.delete(index=INDEX_NAME_E5, ignore_unavailable=True)

ObjectApiResponse({'acknowledged': True})

In [42]:
#Additional field mappings can be added in the index, or allow ES to automatically define the field types as data is loaded into the index
#client.indices.create(
#  index=INDEX_NAME_E5,
#  mappings={
#    "properties": {
#      "title": { "type": "keyword" },
#     "text": { "type": "text" },
#      "passage_embedding.predicted_value": {
#            "type": "dense_vector",
#            "dims": 384,
#            "index": "true",
#            "similarity": "cosine"
#        }
#    },
#    "_source": {
#        "includes": [
#             "title",
#            "text",
#            "url",
#            "passage_embedding.predicted_value"
#        ]
#    }
#  }
#)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'sbm-e5-ar-en-saudi-tourism-v2-embedding'})

In [47]:
# Delete index if already exists

client.indices.delete(index=INDEX_NAME_E5, ignore=[400, 404])

  client.indices.delete(index=INDEX_NAME_E5, ignore=[400, 404])


ObjectApiResponse({'acknowledged': True})

In [12]:
#Additional field mappings can be added in the index, or allow ES to automatically define the field types as data is loaded into the index
client.indices.create(
  index=INDEX_NAME_E5,
  mappings={
    "properties": {
      "title": { "type": "keyword" },
      "text": { "type": "text" },
      "url": { "type": "keyword" },  # Ensure the url field is defined if needed
      'chunk_num': { "type": "text" },
      "passage_embedding.predicted_value": {
            "type": "dense_vector",
            "dims": 384,
            "index": "true",
            "similarity": "cosine"
        }
    },
    "_source": {
        "includes": [
            "chunk_num"
            "title",
            "text",
            "url",
            "passage_embedding.predicted_value"
        ]
    }
  }
)


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'indra-e5'})