In [1]:
import os
import re
from elasticsearch import Elasticsearch, helpers
from getpass import getpass

# celery_tasks presumably reads RABBITMQ_HOST when it is imported, so the
# variable is set before the import below — TODO confirm in celery_tasks.
os.environ["RABBITMQ_HOST"] = "localhost"

from celery_tasks import ingest_data

# Passage length limit (in characters) used by the chunking ingest pipeline.
CHUNK_SIZE = 400
# Number of documents handed to each Celery ingest task.
ES_CHUNK_SIZE = 50
INDEX_NAME = "es_french_revo_idx"
# Hugging Face model id, and the id the model gets inside Elasticsearch after
# the eland import (lower-cased, "/" replaced by "__", as in the eland log).
MODEL_ID = "BAAI/bge-large-zh-v1.5"
MODEL_ID_ES = "baai__bge-large-zh-v1.5"
# Embedding size and similarity metric for the dense_vector mapping.
MODEL_DIM = 1024
MODEL_SIMILARITY = "cosine"

ES_HOST = "https://localhost:9200/"
# Never hard-code the password in the notebook: read it from the ES_PASS
# environment variable, or prompt for it when the variable is unset.
# NOTE(review): a literal password used to be written here; if it was ever
# committed or shared, rotate it.
ES_PASS = os.environ.get("ES_PASS") or getpass("ElasticSearch Password: ")

  _transport = transport_class(


In [2]:
import torch

# Show which CUDA device (if any) this kernel can see. The value is only
# displayed; nothing below uses it. torch.cuda.get_device_name raises when no
# CUDA device is available, so guard it to keep "Restart & Run All" working on
# CPU-only machines.
device_name = (
    torch.cuda.get_device_name(0)
    if torch.cuda.is_available()
    else "CPU only (no CUDA device)"
)
device_name

'NVIDIA GeForce RTX 2070 with Max-Q Design'

In [3]:
# Connect to the local Elasticsearch cluster over HTTPS with basic auth as user
# `elastic`. verify_certs=False disables TLS certificate verification, which is
# only acceptable for local development (presumably the docker cluster uses a
# self-signed certificate — confirm); never use it against a remote cluster.
elastic_credentials = ("elastic", ES_PASS)
client = Elasticsearch(
    hosts=[ES_HOST],
    basic_auth=elastic_credentials,
    verify_certs=False,
)
cluster_info = client.info()
print(cluster_info)

{'name': 'es01', 'cluster_name': 'docker-cluster', 'cluster_uuid': '2d2hAdh2RK2B7QCE92Kbdw', 'version': {'number': '8.13.0', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '09df99393193b2c53d92899662a8b8b3c55b45cd', 'build_date': '2024-03-22T03:35:46.757803203Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


  _transport = transport_class(


### Load the model from Hugging Face

The first thing you need is a model that turns the text chunks into embeddings. You can use any text-embedding model you like; this notebook runs end to end on `BAAI/bge-large-zh-v1.5` (`MODEL_ID`), a Chinese embedding model that produces 1024-dimensional vectors (`MODEL_DIM`). With an Elastic Cloud deployment or another Elasticsearch cluster ready, upload the model with the `eland_import_hub_model` tool from the eland library and start its deployment:

In [4]:
# Upload the Hugging Face model MODEL_ID into Elasticsearch with eland and start
# its deployment. IPython expands $ES_PASS, $ES_HOST and $MODEL_ID from the
# Python variables of the same names defined in the first cell.
#   --task-type text_embedding : import the model as a text-embedding model
#   --insecure                 : presumably skips TLS certificate verification,
#                                matching verify_certs=False on the client — confirm
#   --clear-previous           : stop and delete an existing model with the same id
#                                (see "Stopping deployment" / "Deleting model" in the log)
#   --start                    : start the model deployment after the upload
# NOTE(review): the password is passed on the command line, so it is visible in
# the process list while the command runs; consider a safer way to supply it.
!eland_import_hub_model \
    -u elastic -p $ES_PASS \
    --url $ES_HOST \
    --hub-model-id $MODEL_ID \
    --task-type text_embedding \
    --insecure \
    --clear-previous \
    --start

2024-05-15 13:27:02,334 INFO : Establishing connection to Elasticsearch
  _transport = transport_class(
2024-05-15 13:27:02,366 INFO : Connected to cluster named 'docker-cluster' (version: 8.13.0)
2024-05-15 13:27:02,367 INFO : Loading HuggingFace transformer tokenizer and model 'BAAI/bge-large-zh-v1.5'
Asking to pad to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no padding.
STAGE:2024-05-15 13:27:13 11058:11058 ActivityProfilerController.cpp:312] Completed Stage: Warm Up
STAGE:2024-05-15 13:27:14 11058:11058 ActivityProfilerController.cpp:318] Completed Stage: Collection
STAGE:2024-05-15 13:27:14 11058:11058 ActivityProfilerController.cpp:322] Completed Stage: Post Processing
2024-05-15 13:27:21,422 INFO : Stopping deployment for model with id 'baai__bge-large-zh-v1.5'
2024-05-15 13:27:21,631 INFO : Deleting model with id 'baai__bge-large-zh-v1.5'
2024-05-15 13:27:29,236 INFO : Creating model with id 'baai__bge-large-zh-v1.5'

In [5]:
# Set up the ingest pipeline that the index uses as its default pipeline:
#   1. a Painless script splits the document's `text` field into passages of
#      fewer than CHUNK_SIZE characters (characters, not model tokens), cutting
#      only right after a Chinese sentence terminator (。！？…);
#   2. a foreach/inference step embeds every passage with the deployed model
#      (MODEL_ID_ES) and stores the result under passages[].vector.predicted_value.
client.ingest.put_pipeline(
    id="chunk_text_to_passages",
    processors=[
        {
            "script": {
                "description": "Chunk text into passages at Chinese sentence boundaries (。！？…), each shorter than params.model_limit characters where possible",
                "lang": "painless",
                "source": """
          // Always create the field so the foreach processor below finds a list.
          ctx['passages'] = new ArrayList();
          // Documents without text get no passages instead of failing with a
          // null pointer in split().
          if (ctx['text'] == null) {
            return;
          }
          // Split right after each sentence terminator. Do not require a
          // following space: Chinese text rarely has one after 。, so such a
          // pattern would leave whole paragraphs unsplit. The lookbehind is
          // zero-width, so nothing is removed and concatenating the pieces
          // reproduces the original text.
          String[] sentences = /(?<=[。！？…])/.split(ctx['text']);
          int i = 0;
          while (i < sentences.length) {
            Map passage = ['text': sentences[i++]];
            // Greedily append following sentences while the passage stays
            // below the limit; a single longer sentence becomes its own passage.
            while (i < sentences.length && passage.text.length() + sentences[i].length() < params.model_limit) {
              passage.text = passage.text + sentences[i++];
            }
            ctx['passages'].add(passage);
          }
          """,
                "params": {"model_limit": CHUNK_SIZE},
            }
        },
        {
            "foreach": {
                "field": "passages",
                "processor": {
                    "inference": {
                        "model_id": MODEL_ID_ES,
                        "input_output": [
                            {
                                "input_field": "_ingest._value.text",
                                "output_field": "_ingest._value.vector.predicted_value"
                            }
                        ],
                        # Record inference failures on the document instead of
                        # rejecting it; the message names this pipeline.
                        "on_failure": [
                            {
                                "append": {
                                    "field": "_source._ingest.inference_errors",
                                    "value": [
                                        {
                                            "message": "Processor 'inference' in pipeline 'chunk_text_to_passages' failed with message '{{ _ingest.on_failure_message }}'",
                                            "pipeline": "chunk_text_to_passages",
                                            "timestamp": "{{{ _ingest.timestamp }}}",
                                        }
                                    ],
                                }
                            }
                        ],
                    }
                },
            }
        },
    ],
)



ObjectApiResponse({'acknowledged': True})

### Setup Index

The next step is to prepare the mappings for the array of passages and the vector objects that the ingest pipeline creates. `passages` is a `nested` field, and each passage's `vector.predicted_value` is a `dense_vector`. For this text embedding model it has `MODEL_DIM` (1024) dimensions, and `MODEL_SIMILARITY` (cosine) similarity is used for nearest-neighbor calculations:

In [6]:
# Drop any index left over from a previous run so the mapping below applies.
client.indices.delete(index=INDEX_NAME, ignore_unavailable=True)

# The ingest pipeline stores each passage's embedding under
# vector.predicted_value; map it as an indexed dense_vector so kNN can use it.
passage_vector_field = {
    "type": "dense_vector",
    "index": True,
    "dims": MODEL_DIM,
    "similarity": MODEL_SIMILARITY,
}
# `nested` indexes every passage as its own hidden document, which lets kNN
# inner_hits return the specific passage that matched.
passages_mapping = {
    "type": "nested",
    "properties": {
        "vector": {"properties": {"predicted_value": passage_vector_field}},
    },
}

# Requests that do not name a pipeline go through chunk_text_to_passages.
client.indices.create(
    index=INDEX_NAME,
    settings={"index": {"default_pipeline": "chunk_text_to_passages"}},
    mappings={"dynamic": "true", "properties": {"passages": passages_mapping}},
)



ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'es_french_revo_idx'})

## Add some Documents through Celery

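Now we can add documents whose `text` field holds large amounts of text. The default ingest pipeline chunks each document automatically, and the model embeds every chunk into a vector. The documents are handed to the `ingest_data` Celery task in batches of `ES_CHUNK_SIZE`: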

In [7]:
# Read MD File
def read_MD(md_file):
    """Parse a Markdown file into a title and a list of plain-text paragraphs.

    Markdown markup is stripped with regexes, the text is split on blank lines,
    "---" rules and single newlines, and empty or whitespace-only pieces are
    dropped.

    Args:
        md_file: path to a UTF-8 encoded Markdown file (a leading BOM is ignored).

    Returns:
        dict with "title" (the first remaining piece, or "" when the file has
        no text) and "docs" (every remaining piece after the title).
    """
    # Decode explicitly as UTF-8 (the corpus is Chinese text) instead of the
    # platform default, and close the file handle.
    with open(md_file, 'r', encoding='utf-8-sig') as f:
        text = f.read()
    # Remove Markdown markers: headings ("# "), every run of "*" or "_",
    # block quotes ("> "), footnote references ("[^1]") and every ": "
    # (presumably aimed at footnote definitions such as "[^1]: ").
    text = re.sub(r'#+ |\*+|_+|\> |\[\^[0-9]+\]|: ', '', text)
    # Remove ordered ("1. ") and unordered ("- ") list markers at line starts.
    text = re.sub(r'\n *[0-9]+\. +|\n- ', '\n', text)
    # One piece per paragraph/line; "---" horizontal rules vanish as separators.
    pieces = re.split(r'\n\n---\n\n|\n\n|\n', text)
    # Drop empty and whitespace-only pieces.
    pieces = [piece for piece in pieces if piece.strip()]
    if not pieces:
        return {"title": "", "docs": []}
    return {
        "title": pieces[0],
        "docs": pieces[1:]
    }

In [8]:
root_directory = '../french_revo'

# Corpus sub-directories, excluding git metadata and the root itself
# (presumably the root only holds repository files such as a README — confirm).
# Sorted so the document order is reproducible across runs and platforms.
directories = sorted(
    dirpath
    for dirpath, _subdirs, _files in os.walk(root_directory)
    if '.git' not in dirpath and dirpath != root_directory
)

# One document per paragraph, tagged with its file's title and path. Only
# regular *.md files are read: sub-directories or other files in a directory
# would otherwise be passed to read_MD and crash it.
docs = []
for directory in directories:
    md_files = sorted(
        f"{directory}/{name}"
        for name in os.listdir(directory)
        if name.lower().endswith(".md") and os.path.isfile(f"{directory}/{name}")
    )
    for md_file in md_files:
        parsed = read_MD(md_file)
        docs.extend(
            {"text": doc, "title": parsed["title"], "file": md_file, "_index": INDEX_NAME}
            for doc in parsed["docs"]
        )

len(docs)

6668


In [9]:
# Queue the documents for indexing in batches of ES_CHUNK_SIZE. apply_async does
# not wait for the task, so this cell returns immediately while ingestion and
# embedding continue in the Celery worker(s).
for batch_start in range(0, len(docs), ES_CHUNK_SIZE):
    batch = docs[batch_start:batch_start + ES_CHUNK_SIZE]
    ingest_data.apply_async(kwargs={"docs": batch})

### Aside: Pretty printing Elasticsearch responses

Your API calls return hard-to-read nested JSON. We'll create a small function called `pretty_response` that prints nice, human-readable output for our examples.

In [10]:
def pretty_response(response):
    """Print the hits of a kNN search response in a human-readable form.

    For each hit, prints its _id, the document title from _source, the text of
    every inner hit returned for the nested "passages" field (each followed by
    a blank line) and the score, then a "---" separator. Prints a notice
    instead when there are no hits.
    """
    hits = response["hits"]["hits"]
    if not hits:
        print("Your search returned no results.")
        return

    for hit in hits:
        hit_id, score = hit["_id"], hit["_score"]
        doc_title = hit["_source"]["title"]
        matched_passages = hit["inner_hits"]["passages"]["hits"]["hits"]
        passage_text = "".join(
            passage["fields"]["passages"][0]["text"][0] + "\n\n"
            for passage in matched_passages
        )
        print(f"ID: {hit_id}\nDoc Title: {doc_title}\nPassage Text:\n{passage_text}Score: {score}")
        print("---")

### Making queries

To find which chunk best matches the query, add `inner_hits` to the `knn` clause. Each document in the hits then includes just its best-matching chunk.

Below you will see the response which returns the best document and the most relevant passage.

In [12]:
# kNN search over the passage vectors. The query text is embedded server-side
# by the deployed model (query_vector_builder); inner_hits returns, for each
# matching document, only its best passage (size 1, passages.text only).
#
# The request gets its own, longer timeout. Embedding the query runs model
# inference, which can outlast the client's default request timeout. This cell
# previously failed with ConnectionTimeout, presumably because the ML node was
# still busy embedding the passages queued by the Celery ingest above.
SEARCH_REQUEST_TIMEOUT = 120  # seconds

response = client.options(request_timeout=SEARCH_REQUEST_TIMEOUT).search(
    index=INDEX_NAME,
    knn={
        "inner_hits": {"size": 1, "_source": False, "fields": ["passages.text"]},
        "field": "passages.vector.predicted_value",
        "k": 20,
        "num_candidates": 100,
        "query_vector_builder": {
            "text_embedding": {
                "model_id": MODEL_ID_ES,
                "model_text": "资产阶级由什么样的人构成？",
            }
        },
    },
)

pretty_response(response)



ConnectionTimeout: Connection timed out