# Connect to Elasticsearch

In [2]:
from elasticsearch import Elasticsearch
from dotenv import load_dotenv
import os
from elasticsearch import Elasticsearch

# Pull ES_USER / ES_PASSWORD / ES_ENDPOINT from a local .env file (if present)
# into the process environment.
load_dotenv()

elastic_user = os.getenv('ES_USER')
elastic_password = os.getenv('ES_PASSWORD')
elastic_endpoint = os.getenv("ES_ENDPOINT")

# Fail fast with a readable message instead of silently building a URL such as
# "https://None:None@None:9200" when a setting is missing.
_missing_settings = [
    name
    for name, value in (
        ("ES_USER", elastic_user),
        ("ES_PASSWORD", elastic_password),
        ("ES_ENDPOINT", elastic_endpoint),
    )
    if not value
]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

# Credentials are passed through `basic_auth` rather than being interpolated
# into the URL: a password containing characters such as '@', ':' or '/' would
# otherwise corrupt the URL, and the URL no longer carries the secret around.
url = f"https://{elastic_endpoint}:9200"
client = Elasticsearch(
    url,
    basic_auth=(elastic_user, elastic_password),
    ca_certs="./http_ca.crt",
    verify_certs=True,
)

# Sanity check: prints the cluster information returned by the server.
print(client.info())

{'name': 'liuxgm.local', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'h2QwONxsT4Kt-lTRKmPrhg', 'version': {'number': '8.12.0', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': '1665f706fd9354802c02146c1e6b5c0fbcddfbc9', 'build_date': '2024-01-11T10:05:27.953830042Z', 'build_snapshot': False, 'lucene_version': '9.9.1', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


# Prepare the dataset

In [8]:
from langchain.document_loaders import JSONLoader 

def metadata_func(record: dict, metadata: dict) -> dict:
    """Copy selected fields of a raw JSON record into a metadata dict.

    Args:
        record: One JSON record; fields absent from it are stored as ``None``.
        metadata: Metadata dict to enrich. It is modified in place.

    Returns:
        The same ``metadata`` object, with ``name``, ``summary``, ``url``,
        ``category`` and ``updated_at`` set from ``record``.
    """
    copied_fields = ("name", "summary", "url", "category", "updated_at")
    for field_name in copied_fields:
        metadata[field_name] = record.get(field_name)
    return metadata

# For more loaders https://python.langchain.com/docs/modules/data_connection/document_loaders/
# And 3rd party loaders https://python.langchain.com/docs/modules/data_connection/document_loaders/#third-party-loaders
#
# Configure a JSON loader for the local "workplace-docs.json" file. The jq
# expression ".[]" selects the elements of the top-level value, the "content"
# key is used as the document content, and metadata_func (defined above)
# copies the remaining descriptive fields into each document's metadata.
# Nothing is read here; the documents are loaded later via loader.load().
loader = JSONLoader(
    file_path="workplace-docs.json",
    jq_schema=".[]",
    content_key="content",
    metadata_func=metadata_func,
)

# Load the model from Hugging Face

In [9]:
MODEL_ID = "sentence-transformers__all-minilm-l6-v2"

!eland_import_hub_model --url https://elastic:xnLj56lTrH98Lf_6n76y@localhost:9200 \
	--hub-model-id sentence-transformers/all-MiniLM-L6-v2 \
	--task-type text_embedding \
	--ca-cert ./http_ca.crt \
	--clear-previous \
	--start

2024-02-07 15:47:51,334 INFO : Establishing connection to Elasticsearch
2024-02-07 15:47:51,356 INFO : Connected to cluster named 'elasticsearch' (version: 8.12.0)
2024-02-07 15:47:51,356 INFO : Loading HuggingFace transformer tokenizer and model 'sentence-transformers/all-MiniLM-L6-v2'
STAGE:2024-02-07 15:48:57 34543:2762445 ActivityProfilerController.cpp:312] Completed Stage: Warm Up
STAGE:2024-02-07 15:48:57 34543:2762445 ActivityProfilerController.cpp:318] Completed Stage: Collection
STAGE:2024-02-07 15:48:57 34543:2762445 ActivityProfilerController.cpp:322] Completed Stage: Post Processing
2024-02-07 15:48:58,423 INFO : Creating model with id 'sentence-transformers__all-minilm-l6-v2'
2024-02-07 15:48:58,488 INFO : Uploading model definition
100%|███████████████████████████████████████| 87/87 [00:03<00:00, 24.11 parts/s]
2024-02-07 15:49:02,159 INFO : Uploading model vocabulary
2024-02-07 15:49:02,203 INFO : Starting model deployment
2024-02-07 15:49:03,339 INFO : Model successfull

# Setting up our Elasticsearch Index

In [10]:
PIPELINE_ID = "chunk_text_to_passages"
MODEL_DIMS = 384
INDEX_NAME = "nb_parent_retriever_index"

# Create the ingest pipeline: for every element of the document's "passages"
# array, run the embedding model on the element's "text" and store the result
# under the element's "vector" field.
client.ingest.put_pipeline(
  id=PIPELINE_ID,
  processors=[
    {
      "foreach": {
        "field": "passages",
        "processor": {
          "inference": {
            # The model reads its input from "text_field".
            "field_map": {
              "_ingest._value.text": "text_field"
            },
            "model_id": MODEL_ID,
            "target_field": "_ingest._value.vector",
            "on_failure": [
              {
                "append": {
                  "field": "_source._ingest.inference_errors",
                  "value": [
                    {
                      # Report the pipeline that actually failed (PIPELINE_ID),
                      # not a name copied from another example. The
                      # {{ ... }} placeholders are expanded by Elasticsearch.
                      "message": "Processor 'inference' in pipeline '" + PIPELINE_ID + "' failed with message '{{ _ingest.on_failure_message }}'",
                      "pipeline": PIPELINE_ID,
                      "timestamp": "{{{ _ingest.timestamp }}}"
                    }
                  ]
                }
              }
            ]
          }
        }
      }
    }
  ]
)

# Create the index
# Every document written to the index goes through PIPELINE_ID by default.
# "passages" is mapped as a nested field so each passage keeps its own vector;
# the vector itself is an indexed dense_vector of MODEL_DIMS dimensions
# compared with dot-product similarity. Other fields are mapped dynamically.
client.indices.create( 
  index=INDEX_NAME, 
  settings={
    "index": {
      "default_pipeline": PIPELINE_ID
    }
  },
  mappings={
    "dynamic": "true",
    "properties": {
      "passages": {
        "type": "nested",
        "properties": {
          "vector": {
            "properties": {
              # Target of the inference processor's output for each passage.
              "predicted_value": {
                "type": "dense_vector",
                "index": True,
                "dims": MODEL_DIMS,
                "similarity": "dot_product"
              }
            }
          }
        }
      }
    }
  }
)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'nb_parent_retriever_index'})

# Utils: Parent Child Splitter Function

In [11]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

def parent_child_splitter(documents, chunk_size: int = 200):
  """Turn each document into a parent record carrying its child passages.

  Args:
    documents: Iterable of documents exposing ``page_content`` and ``metadata``.
    chunk_size: Chunk size handed to ``RecursiveCharacterTextSplitter``.

  Returns:
    A list with one dict per input document, holding the full text under
    ``"content"``, the document metadata under ``"metadata"`` and the list of
    ``{"text": ...}`` chunks under ``"passages"``.
  """
  splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size)

  def build_record(parent):
    # Split one parent at a time so the chunks stay attached to their parent.
    chunks = splitter.split_documents([parent])
    return {
      "content": parent.page_content,
      "metadata": parent.metadata,
      "passages": [{"text": chunk.page_content} for chunk in chunks],
    }

  return [build_record(parent) for parent in documents]

# Utils: Pretty Response

In [12]:
def pretty_response(response, show_parent_text=False):
  """Print a readable summary of every hit in a search response.

  Args:
    response: Search response indexable as ``response['hits']['hits']``; each
      hit must provide ``_id``, ``_score`` and ``_source.metadata.name``.
    show_parent_text: When true, also print the hit's ``_source.content``.

  Hits without a ``passages`` inner-hits section are printed with an empty
  passage text instead of raising ``KeyError``.
  """
  hits = response['hits']['hits']
  if not hits:
    print('Your search returned no results.')
    return

  for hit in hits:
    doc_id = hit['_id']  # avoid shadowing the builtin `id`
    score = hit['_score']
    doc_title = hit['_source']["metadata"]['name']
    parent_text = hit['_source']["content"] if show_parent_text else ""

    # inner_hits is only present when the query asked for it.
    passage_hits = (
      hit.get('inner_hits', {})
         .get('passages', {})
         .get('hits', {})
         .get('hits', [])
    )
    passage_text = "".join(
      passage["fields"]["passages"][0]['text'][0] + "\n\n"
      for passage in passage_hits
    )

    pretty_output = (f"\nID: {doc_id}\nDoc Title: {doc_title}\nparent text:\n{parent_text}\nPassage Text:\n{passage_text}\nScore: {score}\n")
    print(pretty_output)
    print("---")

# Full Document, nested passages

In [13]:
from elasticsearch import helpers

# Split every loaded document into a parent record with 600-character passages.
chunked_docs = parent_child_splitter(loader.load(), chunk_size=600)

# Bulk-index the parent records; the index's default pipeline embeds each passage.
count, errors = helpers.bulk(
  client,
  chunked_docs,
  index=INDEX_NAME
)

print(f"Indexed {count} documents with {errors} errors")

import time  # no longer used by this cell; kept in case other cells rely on it

# Make the new documents searchable deterministically instead of sleeping for
# an arbitrary number of seconds and hoping a refresh has happened.
client.indices.refresh(index=INDEX_NAME)

Indexed 15 documents with [] errors


# Perform a Nested Search

In [14]:
# kNN search over the nested passage vectors. The query text is embedded
# server-side with the same model the ingest pipeline uses (MODEL_ID), so the
# model id is referenced from one place instead of being repeated as a literal.
response = client.search(
  index=INDEX_NAME,
  knn={
    # Return only the single best-matching passage's text for each hit.
    "inner_hits": {
      "size": 1,
      "_source": False,
      "fields": [
        "passages.text"
      ]
    },
    "field": "passages.vector.predicted_value",
    "k": 5,
    "num_candidates": 100,
    "query_vector_builder": {
      "text_embedding": {
        "model_id": MODEL_ID,
        "model_text": "Whats the work from home policy?"
      }
    }
  }
)

pretty_response(response)


ID: pLOcgo0B8HkRDilJp-Mp
Doc Title: Work From Home Policy
parent text:

Passage Text:
Effective: March 2020
Purpose

The purpose of this full-time work-from-home policy is to provide guidelines and support for employees to conduct their work remotely, ensuring the continuity and productivity of business operations during the COVID-19 pandemic and beyond.
Scope

This policy applies to all employees who are eligible for remote work as determined by their role and responsibilities. It is designed to allow employees to work from home full time while maintaining the same level of performance and collaboration as they would in the office.
Eligibility


Score: 0.8483097

---

ID: q7Ocgo0B8HkRDilJp-Mp
Doc Title: Intellectual Property Policy
parent text:

Passage Text:
Purpose
The purpose of this Intellectual Property Policy is to establish guidelines and procedures for the ownership, protection, and utilization of intellectual property generated by employees during their employment. This poli

# With LangChain

In [15]:
from langchain.vectorstores.elasticsearch import ElasticsearchStore, ApproxRetrievalStrategy
from typing import List, Union
from langchain_core.documents import Document

class CustomRetrievalStrategy(ApproxRetrievalStrategy):
    """Retrieval strategy that runs a kNN search over the nested passage vectors."""

    def query(
      self,
      query: Union[str, None],
      filter: List[dict],
      **kwargs,
    ):
      """Build the Elasticsearch request body for a passage-level kNN search.

      Args:
        query: Text to embed server-side and search for.
        filter: Filter clauses placed in the kNN section as given.
        **kwargs: Accepted for signature compatibility; not used here.

      Returns:
        A dict with a single ``"knn"`` section. ``k`` is fixed at 5 and
        ``num_candidates`` at 100; the text of matching passages is requested
        through ``inner_hits``.
      """
      return {
        "knn": {
          "inner_hits": {
            "_source": False,
            "fields": ["passages.text"],
          },
          "field": "passages.vector.predicted_value",
          "filter": filter,
          "k": 5,
          "num_candidates": 100,
          "query_vector_builder": {
            "text_embedding": {
              # Same model id as the ingest pipeline, referenced from one place.
              "model_id": MODEL_ID,
              "model_text": query,
            }
          },
        }
      }
    

# Wrap the existing index in a LangChain vector store that reuses the
# already-open Elasticsearch client and delegates query construction to
# CustomRetrievalStrategy (defined above).
vector_store = ElasticsearchStore(
    index_name=INDEX_NAME,
    es_connection=client,
    query_field="content",
    strategy=CustomRetrievalStrategy(),
)

def doc_builder(hit):
  """Build a Document from a search hit using the text of its matched passages.

  Args:
    hit: A search hit dict. Passage texts are read from
      ``inner_hits.passages.hits.hits[*].fields.passages[0].text[0]`` and the
      metadata from ``_source.metadata``.

  Returns:
    A Document whose ``page_content`` is every matched passage text, each
    followed by a blank line. ``page_content`` is empty when the hit carries
    no passage inner hits.
  """
  passage_hits = hit.get("inner_hits", {}).get("passages", {}).get("hits", {}).get("hits", [])

  passage_texts = []
  for passage_hit in passage_hits:
    passage_entries = passage_hit.get("fields", {}).get("passages", [])
    if not passage_entries:
      continue  # inner hit without the requested field: nothing to add
    text_values = passage_entries[0].get("text", [])
    if text_values:
      passage_texts.append(text_values[0])

  # Build the Document only after ALL passages are collected. Returning from
  # inside the loop kept just the first passage and yielded None for hits
  # without any passages.
  return Document(
    page_content="".join(text + "\n\n" for text in passage_texts),
    metadata=hit["_source"]["metadata"],
  )

# Run the same question through the LangChain vector store; doc_builder turns
# each hit into a Document whose text comes from the matched passages.
results = vector_store.similarity_search(query="Whats the work from home policy?", doc_builder=doc_builder)
# Print the title and the passage text of each returned document.
for result in results:
    print(f'Doc title: {result.metadata["name"]}')
    print(f'Text:\n{result.page_content}')

Doc title: Work From Home Policy
Text:
Effective: March 2020
Purpose

The purpose of this full-time work-from-home policy is to provide guidelines and support for employees to conduct their work remotely, ensuring the continuity and productivity of business operations during the COVID-19 pandemic and beyond.
Scope

This policy applies to all employees who are eligible for remote work as determined by their role and responsibilities. It is designed to allow employees to work from home full time while maintaining the same level of performance and collaboration as they would in the office.
Eligibility


Doc title: Intellectual Property Policy
Text:
Purpose
The purpose of this Intellectual Property Policy is to establish guidelines and procedures for the ownership, protection, and utilization of intellectual property generated by employees during their employment. This policy aims to encourage creativity and innovation while ensuring that the interests of both the company and its employees