In [1]:
import os
from getpass import getpass

from elasticsearch import Elasticsearch

# --- Tunable notebook configuration -----------------------------------------
# Passed to the ingest pipeline's chunking script as `model_limit`.
CHUNK_SIZE = 400
INDEX_NAME = "chunk_passages_example"
# Hugging Face hub id of the embedding model, and the id it is referred to by
# inside Elasticsearch (in the inference processor and the kNN query).
MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
MODEL_ID_ES = "sentence-transformers__all-minilm-l6-v2"
# `dims` and `similarity` of the dense_vector field that stores the embeddings.
MODEL_DIM = 384
MODEL_SIMILARITY = "dot_product"


def _secret(env_name, prompt):
    """Return a secret from the environment, prompting for it when unset.

    Args:
        env_name: Name of the environment variable to look up.
        prompt: Text shown by ``getpass`` when the variable is missing or empty.

    Returns:
        The secret as a string. It is never echoed and never stored in the
        notebook file.
    """
    return os.environ.get(env_name) or getpass(prompt)


# Secrets must not be committed with the notebook: they are read from the
# environment (ES_PASS / COHERE_API_KEY) or asked for interactively.
# NOTE(review): the literal values previously stored in this cell have been
# exposed in the notebook history and should be rotated.
ES_HOST = os.environ.get("ES_HOST", "https://localhost:9200/")
ES_PASS = _secret("ES_PASS", "Elasticsearch password for user 'elastic': ")
COHERE_API_KEY = _secret("COHERE_API_KEY", "Cohere API key: ")

In [2]:
# Connect to the cluster as the built-in `elastic` user.
# Certificate verification is switched off, which is only acceptable for the
# local development cluster this notebook targets.
connection_options = {
    "hosts": [ES_HOST],
    "basic_auth": ("elastic", ES_PASS),
    "verify_certs": False,
}
client = Elasticsearch(**connection_options)

# Sanity check: show the cluster info returned by the server.
print(client.info())

{'name': 'es01', 'cluster_name': 'docker-cluster', 'cluster_uuid': '8KpPKPbgQKiA5VpXnNoX3Q', 'version': {'number': '8.13.0', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '09df99393193b2c53d92899662a8b8b3c55b45cd', 'build_date': '2024-03-22T03:35:46.757803203Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


  _transport = transport_class(


In [3]:
# Import the Hugging Face model named by MODEL_ID into the cluster with the
# `eland_import_hub_model` command-line tool, as a `text_embedding` model.
# The $NAME references are notebook variables from the configuration cell;
# they are substituted into the command line before the shell runs it.
# With --clear-previous, the log shows an existing deployment of the model
# being stopped and the model deleted before the fresh import.
# NOTE(review): --insecure presumably disables TLS verification (matching
# verify_certs=False on the Python client) and --start presumably starts the
# deployment after upload -- confirm against the Eland CLI help.
# NOTE(review): $ES_PASS is interpolated unquoted, so a password containing
# shell metacharacters would break (or alter) this command.
!eland_import_hub_model \
    -u elastic -p $ES_PASS \
    --url $ES_HOST \
    --hub-model-id $MODEL_ID \
    --task-type text_embedding \
    --insecure \
    --clear-previous \
    --start

2024-05-01 19:31:52,423 INFO : Establishing connection to Elasticsearch
  _transport = transport_class(
2024-05-01 19:31:52,439 INFO : Connected to cluster named 'docker-cluster' (version: 8.13.0)
2024-05-01 19:31:52,439 INFO : Loading HuggingFace transformer tokenizer and model 'sentence-transformers/all-MiniLM-L6-v2'
INFO:2024-05-01 19:31:59 30881:30881 init.cpp:158] If you see CUPTI_ERROR_INSUFFICIENT_PRIVILEGES, refer to https://developer.nvidia.com/nvidia-development-tools-solutions-err-nvgpuctrperm-cupti
STAGE:2024-05-01 19:31:59 30881:30881 ActivityProfilerController.cpp:312] Completed Stage: Warm Up
STAGE:2024-05-01 19:31:59 30881:30881 ActivityProfilerController.cpp:318] Completed Stage: Collection
STAGE:2024-05-01 19:31:59 30881:30881 ActivityProfilerController.cpp:322] Completed Stage: Post Processing
2024-05-01 19:32:01,691 INFO : Stopping deployment for model with id 'sentence-transformers__all-minilm-l6-v2'
2024-05-01 19:32:01,883 INFO : Deleting model with id 'sentence-t

In [4]:
# Setup the pipeline
# Id of the ingest pipeline; also reported in the inference failure records so
# that errors point at the pipeline that actually produced them.
PIPELINE_ID = "chunk_text_to_passages"

# Painless script run against every incoming document:
#   1. Split ctx['text'] on a space that follows '.', '!' or '?'. A period that
#      ends "Mr.", "Ms." or "Mrs." does not count as a sentence boundary.
#   2. Greedily join consecutive sentences (separated by a single space) into
#      one passage while the summed character lengths stay below
#      params.model_limit; a sentence that is longer than the limit on its own
#      still becomes a passage by itself.
#   3. Store the result in ctx['passages'] as a list of {'text': ...} maps.
CHUNK_SCRIPT = """
          String[] envSplit = /((?<!M(r|s|rs)\.)(?<=\.) |(?<=\!) |(?<=\?) )/.split(ctx['text']);
          ctx['passages'] = new ArrayList();
          int i = 0;
          boolean remaining = true;
          if (envSplit.length == 0) {
            return
          } else if (envSplit.length == 1) {
            Map passage = ['text': envSplit[0]];ctx['passages'].add(passage)
          } else {
            while (remaining) {
              Map passage = ['text': envSplit[i++]];
              while (i < envSplit.length && passage.text.length() + envSplit[i].length() < params.model_limit) {passage.text = passage.text + ' ' + envSplit[i++]}
              if (i == envSplit.length) {remaining = false}
              ctx['passages'].add(passage)
            }
          }
          """

# Record appended to `_source._ingest.inference_errors` when embedding a
# passage fails. The mustache placeholders are filled in by Elasticsearch at
# ingest time, so they are kept out of any Python string formatting.
inference_error_record = {
    "message": (
        "Processor 'inference' in pipeline '"
        + PIPELINE_ID
        + "' failed with message '{{ _ingest.on_failure_message }}'"
    ),
    "pipeline": PIPELINE_ID,
    "timestamp": "{{{ _ingest.timestamp }}}",
}

client.ingest.put_pipeline(
    id=PIPELINE_ID,
    processors=[
        # Step 1: chunk the document text into passages.
        {
            "script": {
                "description": "Chunk body_content into sentences by looking for . followed by a space",
                "lang": "painless",
                "source": CHUNK_SCRIPT,
                "params": {"model_limit": CHUNK_SIZE},
            }
        },
        # Step 2: embed every passage; the vector is written next to the
        # passage text under `vector.predicted_value`.
        {
            "foreach": {
                "field": "passages",
                "processor": {
                    "inference": {
                        "model_id": MODEL_ID_ES,
                        "input_output": [
                            {
                                "input_field": "_ingest._value.text",
                                "output_field": "_ingest._value.vector.predicted_value",
                            }
                        ],
                        "on_failure": [
                            {
                                "append": {
                                    "field": "_source._ingest.inference_errors",
                                    "value": [inference_error_record],
                                }
                            }
                        ],
                    }
                },
            }
        },
    ],
)



ObjectApiResponse({'acknowledged': True})

In [5]:
# Drop any previous copy of the index so the cell can be re-run safely.
client.indices.delete(index=INDEX_NAME, ignore_unavailable=True)

# Setup the index
# Every document written to the index goes through the chunking pipeline.
index_settings = {"index": {"default_pipeline": "chunk_text_to_passages"}}

# Each passage is a nested object whose embedding is an indexed dense_vector.
passage_vector_field = {
    "type": "dense_vector",
    "index": True,
    "dims": MODEL_DIM,
    "similarity": MODEL_SIMILARITY,
}
index_mappings = {
    "dynamic": "true",
    "properties": {
        "passages": {
            "type": "nested",
            "properties": {
                "vector": {"properties": {"predicted_value": passage_vector_field}}
            },
        }
    },
}

client.indices.create(
    index=INDEX_NAME,
    settings=index_settings,
    mappings=index_mappings,
)



ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'chunk_passages_example'})

In [6]:
import json
from urllib.request import urlopen
from elasticsearch import helpers

# Download the sample workplace documents (a JSON array of objects; the
# "content" and "name" keys of each object are used below).
url = "https://raw.githubusercontent.com/elastic/elasticsearch-labs/main/datasets/workplace-documents.json"
# The context manager closes the HTTP response once the payload is parsed.
with urlopen(url) as http_response:
    docs = json.load(http_response)

# One bulk action per document; the position in the dataset is used as `_id`.
operations = [
    {"_index": INDEX_NAME, "_id": i, "text": doc["content"], "name": doc["name"]}
    for i, doc in enumerate(docs)
]

# Add the documents to the index directly
# The index's default pipeline chunks and embeds each document at ingest
# time, hence the generous 10-minute timeout. The timeout is a transport
# option, so it is set through `client.options()` rather than passed to the
# helper as a per-call keyword.
response = helpers.bulk(
    client.options(request_timeout=60 * 10),
    operations,
    refresh=True,
)

  response = helpers.bulk(


In [7]:
def pretty_response(response):
    """Print the hits of a search response in a readable form.

    For every hit the document id, the document title (``_source.name``), the
    text of each matched passage (taken from the ``passages`` inner hits) and
    the score are printed, followed by a ``---`` separator line.

    Args:
        response: Search response mapping with a ``hits.hits`` list.

    Returns:
        None. The report is written to standard output. When there are no
        hits, a single "no results" line is printed instead.
    """
    hits = response["hits"]["hits"]
    if len(hits) == 0:
        print("Your search returned no results.")
        return

    for hit in hits:
        doc_id = hit["_id"]  # not named `id`, which would shadow the builtin
        score = hit["_score"]
        doc_title = hit["_source"]["name"]

        # A hit carries no "inner_hits" when the query did not request them;
        # treat that as "no passages" instead of raising KeyError.
        passage_hits = (
            hit.get("inner_hits", {})
            .get("passages", {})
            .get("hits", {})
            .get("hits", [])
        )
        # Every passage is followed by a blank line.
        passage_text = "".join(
            passage["fields"]["passages"][0]["text"][0] + "\n\n"
            for passage in passage_hits
        )

        pretty_output = f"\nID: {doc_id}\nDoc Title: {doc_title}\nPassage Text:\n{passage_text}\nScore: {score}\n"
        print(pretty_output)
        print("---")

In [8]:
# Nested kNN search: the question is embedded by the deployed model at query
# time and compared against the passage vectors. The three nearest documents
# are requested, and for each one only the text of its single best-matching
# passage is returned via inner hits.
question_embedding = {
    "text_embedding": {
        "model_id": MODEL_ID_ES,
        "model_text": "Whats the work from home policy?",
    }
}
knn_query = {
    "inner_hits": {"size": 1, "_source": False, "fields": ["passages.text"]},
    "field": "passages.vector.predicted_value",
    "k": 3,
    "num_candidates": 100,
    "query_vector_builder": question_embedding,
}

response = client.search(index=INDEX_NAME, knn=knn_query)

pretty_response(response)


ID: 0
Doc Title: Work From Home Policy
Passage Text:
Effective: March 2020
Purpose

The purpose of this full-time work-from-home policy is to provide guidelines and support for employees to conduct their work remotely, ensuring the continuity and productivity of business operations during the COVID-19 pandemic and beyond.
Scope

This policy applies to all employees who are eligible for remote work as determined by their role and responsibilities.


Score: 0.854961

---

ID: 7
Doc Title: Intellectual Property Policy
Passage Text:
This policy aims to encourage creativity and innovation while ensuring that the interests of both the company and its employees are protected.

Scope
This policy applies to all employees, including full-time, part-time, temporary, and contract employees.

Definitions
a.


Score: 0.7664343

---

ID: 4
Doc Title: Company Vacation Policy
Passage Text:
Purpose

The purpose of this vacation policy is to outline the guidelines and procedures for requesting and takin

