## Using Elasticsearch to explore Hugging Face Datasets

### Connecting to the ES client

In [2]:
from getpass import getpass  
from elasticsearch import Elasticsearch

# Prompt the user to enter their Elastic Cloud ID and API Key securely
ELASTIC_CLOUD_ID = getpass("Elastic Cloud ID: ")
ELASTIC_API_KEY = getpass("Elastic API Key: ")

# Create an Elasticsearch client using the provided credentials
client = Elasticsearch(
    cloud_id=ELASTIC_CLOUD_ID,  # cloud id can be found under deployment management
    api_key=ELASTIC_API_KEY,  # API key used to authenticate with Elastic, found under Deployments - Security
)
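
If you rerun the notebook often, typing both secrets each time gets tedious. A minimal sketch of one alternative, assuming you are happy to export the values as environment variables: the helper below (`read_secret` is a hypothetical name, not part of any library) prefers an environment variable and falls back to an interactive prompt.

```python
import os
from getpass import getpass


def read_secret(env_var, prompt, ask=getpass):
    """Return the value of `env_var` if it is set, otherwise prompt for it.

    `read_secret` is an illustrative helper, not part of the Elastic client.
    `ask` is injectable so the function can be exercised without a terminal.
    """
    value = os.environ.get(env_var)
    if value:
        return value
    return ask(prompt)


# Usage in the notebook (commented out so this cell never blocks on a prompt):
# ELASTIC_CLOUD_ID = read_secret("ELASTIC_CLOUD_ID", "Elastic Cloud ID: ")
# ELASTIC_API_KEY = read_secret("ELASTIC_API_KEY", "Elastic API Key: ")
```

The environment variable names are an assumption; use whatever your deployment tooling already defines.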

Hugging Face lets us get started with datasets quickly. This collection of 2 million posts from Bluesky lets us explore social media text data and find some interesting insights.

https://huggingface.co/datasets/alpindale/two-million-bluesky-posts

In [5]:
from datasets import load_dataset

ds = load_dataset("alpindale/two-million-bluesky-posts", split="train")



Here's an example of a post:

In [6]:
ds[0:1]

{'text': ["This is really interesting polling data about national public attitudes re: California.  It's from the LA Times, in January.  I wonder if this will change substantially in the next two years?  5233025.fs1.hubspotusercontent-na1.net/hubfs/523302..."],
 'created_at': ['2024-11-27T07:53:47.202Z'],
 'author': ['did:plc:5ug6fzthlj6yyvftj3alekpj'],
 'uri': ['at://did:plc:5ug6fzthlj6yyvftj3alekpj/app.bsky.feed.post/3lbw33zxvik24'],
 'has_images': [False],
 'reply_to': [None]}
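
Note that slicing (`ds[0:1]`) returns a dictionary of columns, each holding a list, whereas indexing with a single integer (`ds[0]`) returns one row as a plain dictionary. If you have a column-oriented batch and want rows, a small conversion helps. This is a minimal sketch on a made-up batch; `batch_to_rows` is a hypothetical helper, not a `datasets` function.

```python
def batch_to_rows(batch):
    """Turn {'col': [v1, v2], ...} into [{'col': v1, ...}, {'col': v2, ...}]."""
    columns = list(batch.keys())
    # zip(*values) walks the columns in lockstep, yielding one tuple per row.
    return [dict(zip(columns, values)) for values in zip(*batch.values())]


# Illustrative batch with the same shape as a Dataset slice (values are made up).
batch = {
    "text": ["first post", "second post"],
    "has_images": [False, True],
    "reply_to": [None, None],
}

rows = batch_to_rows(batch)
print(rows[1])
```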

The most interesting thing we can do with such a dataset is to search through the posts. Hugging Face Datasets integrates with Elasticsearch, which lets us add search capabilities to the data.

[These docs](https://huggingface.co/docs/datasets/en/faiss_es#elasticsearch) show how to add a search index to your Dataset.

In [52]:
index_name = "bluesky"
ds.add_elasticsearch_index(column="text", es_client=client, es_index_name=index_name)

100%|██████████| 2107530/2107530 [08:13<00:00, 4270.93docs/s]


Dataset({
    features: ['text', 'created_at', 'author', 'uri', 'has_images', 'reply_to'],
    num_rows: 2107530
})

In [12]:
# Once the index has been created, later sessions can attach to it again instead of re-indexing.
ds.load_elasticsearch_index("text", es_client=client, es_index_name=index_name)

This created the "bluesky" index in Elasticsearch and loaded the `text` column of our Hugging Face dataset into it. It also registers that index on the dataset under the name "text", so the dataset's own search methods can use it.

This means that we can run our usual commands to interact with this data through the regular elastic client (or any other methods like direct API calls or the Dev Console):

In [42]:
query = {
    "match": {
        "text": "travelling"
    }
}

response = client.search(index=index_name, query=query)

for hit in response["hits"]["hits"][0:5]:
    print(hit['_source']['text'])


Bleurgh!!!! Why is it that travelling anywhere involves so much, well, travelling?
i am TRAVELLING not DRIVING
Travelling squad gonna be hilarious
Very nice! Are you stull travelling?
Amsterdam, Netherlands 🇳🇱

#Amsterdam #Holland #travelling


Alternatively, we can continue to use the Hugging Face functions and leverage the ES index that has been added to our dataset:

In [13]:
scores, retrieved_examples = ds.get_nearest_examples(index_name="text", query="travelling", k=5)
retrieved_examples["text"]

['Bleurgh!!!! Why is it that travelling anywhere involves so much, well, travelling?',
 'i am TRAVELLING not DRIVING',
 'Travelling squad gonna be hilarious',
 'Very nice! Are you stull travelling?',
 'Amsterdam, Netherlands 🇳🇱\n\n#Amsterdam #Holland #travelling']
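
The two approaches can also be combined. As far as I can tell from the `datasets` source, `add_elasticsearch_index` stores only the indexed column in Elasticsearch and uses each row's position in the dataset as the document `_id`; treat that as an assumption to verify against your installed version. Under that assumption, the other columns of a hit (author, timestamp, and so on) can be recovered by looking up `_id` in the dataset. The response below is a made-up stand-in with illustrative ids and scores, and `hit_row_indices` is a hypothetical helper.

```python
def hit_row_indices(response):
    """Extract dataset row positions from an Elasticsearch search response.

    Assumes the index was built by `add_elasticsearch_index`, so that each
    document `_id` is the row position in the original dataset.
    """
    return [int(hit["_id"]) for hit in response["hits"]["hits"]]


# Made-up response with the same shape as client.search(...) output.
sample_response = {
    "hits": {
        "hits": [
            {"_id": "17", "_score": 9.1, "_source": {"text": "i am TRAVELLING not DRIVING"}},
            {"_id": "4", "_score": 8.7, "_source": {"text": "Travelling squad gonna be hilarious"}},
        ]
    }
}

row_indices = hit_row_indices(sample_response)
print(row_indices)

# With the real dataset loaded, the full rows would then be:
# full_rows = ds.select(row_indices)
```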

One benefit of indexing your data, however, is that you no longer need to load the dataset locally: search queries run on the Elasticsearch cluster your client connects to instead of being processed on your own machine.

### Leveraging models

While simple search is already useful, we can get even more out of Hugging Face and Elasticsearch by adding models into the mix.

As with the index, we can use the storage and compute of your Elastic deployment to host your chosen models, then call them through either the Elastic client or compatible Hugging Face functions.

For example, we can start with one of the Elasticsearch models registered on Hugging Face: [the E5 multilingual transformer](https://huggingface.co/elastic/multilingual-e5-small-optimized) will help us search through all posts, including those written in other languages.

In [None]:
!docker pull docker.elastic.co/eland/eland

!docker run -it --rm docker.elastic.co/eland/eland \
    eland_import_hub_model \
      --cloud-id $ELASTIC_CLOUD_ID \
      --es-api-key $ELASTIC_API_KEY \
      --hub-model-id elastic/multilingual-e5-small-optimized \
      --task-type text_embedding \
      --clear-previous \
      --start

This model will now show up under `Trained Models` in your Elastic Cloud interface.

We can now call this model and run inference tasks on it. Here's an example to get us started:

In [37]:
model_id = "elastic__multilingual-e5-small-optimized"
models = client.ml.get_trained_models(model_id=model_id)
models.body

{'count': 1,
 'trained_model_configs': [{'model_id': 'elastic__multilingual-e5-small-optimized',
   'model_type': 'pytorch',
   'created_by': 'api_user',
   'version': '12.0.0',
   'create_time': 1733235941414,
   'model_size_bytes': 0,
   'estimated_operations': 0,
   'license_level': 'platinum',
   'description': "Model elastic/multilingual-e5-small-optimized for task type 'text_embedding'",
   'tags': [],
   'input': {'field_names': ['text_field']},
   'inference_config': {'text_embedding': {'vocabulary': {'index': '.ml-inference-native-000002'},
     'tokenization': {'xlm_roberta': {'do_lower_case': False,
       'with_special_tokens': True,
       'max_sequence_length': 512,
       'truncate': 'first',
       'span': -1}},
     'embedding_size': 384}},
   'location': {'index': {'name': '.ml-inference-native-000002'}}}]}
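
One value in this response is worth reading programmatically: `embedding_size`. Any `dense_vector` field that stores this model's output needs a matching `dims` value. A minimal sketch, using a trimmed copy of the response above (`embedding_size_of` is a hypothetical helper):

```python
def embedding_size_of(models_body, position=0):
    """Read the embedding size of a text_embedding model from a
    get_trained_models response body."""
    config = models_body["trained_model_configs"][position]
    return config["inference_config"]["text_embedding"]["embedding_size"]


# Trimmed copy of the response shown above; only the keys used here are kept.
models_body = {
    "count": 1,
    "trained_model_configs": [
        {
            "model_id": "elastic__multilingual-e5-small-optimized",
            "inference_config": {"text_embedding": {"embedding_size": 384}},
        }
    ],
}

dims = embedding_size_of(models_body)
print(dims)  # 384

# With a live client: dims = embedding_size_of(models.body)
```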

In [53]:
# Run a query against the model. The input must use this format (a `text_field` key);
# you can later map your own fields into it through an ingest pipeline.
doc_test = {'text_field': 'i am TRAVELLING not DRIVING'}

# `docs` expects a list of documents, so wrap the single document in a list
result = client.ml.infer_trained_model(model_id=model_id, docs=[doc_test])
result["inference_results"]

ApiError: ApiError(500, 'status_exception', "Error in inference process: [forward() is missing value for argument 'token_type_ids'. Declaration: forward(__torch__.eland.ml.pytorch.transformers.___torch_mangle_889._SentenceTransformerWrapper self, Tensor input_ids, Tensor attention_mask, Tensor token_type_ids, Tensor position_ids) -> Tensor\nException raised from checkAndNormalizeInputs at /usr/src/pytorch/aten/src/ATen/core/function_schema_inl.h:413 (most recent call first):\nframe #0: c10::Error::Error(c10::SourceLocation, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >) + 0x68 (0x7f2be1d5b3c8 in /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/../lib/libc10.so)\nframe #1: c10::detail::torchCheckFail(char const*, char const*, unsigned int, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) + 0xf3 (0x7f2be1d09671 in /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/../lib/libc10.so)\nframe #2: <unknown function> + 0x1476f7d (0x7f2be3235f7d in /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/../lib/libtorch_cpu.so)\nframe #3: torch::jit::Method::operator()(std::vector<c10::IValue, std::allocator<c10::IValue> >, std::unordered_map<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, c10::IValue, std::hash<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::equal_to<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, c10::IValue> > > const&) const + 0x16c (0x7f2be64a9e1c in /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/../lib/libtorch_cpu.so)\nframe #4: <unknown function> + 0x19cb3 (0x562367ca2cb3 in ./pytorch_inference)\nframe #5: <unknown function> + 0x1ac33 (0x562367ca3c33 in ./pytorch_inference)\nframe #6: 
<unknown function> + 0x40194 (0x562367cc9194 in ./pytorch_inference)\nframe #7: <unknown function> + 0x17c61 (0x562367ca0c61 in ./pytorch_inference)\nframe #8: <unknown function> + 0x17d38 (0x562367ca0d38 in ./pytorch_inference)\nframe #9: ml::core::CStaticThreadPool::CWrappedTask::operator()() + 0x2e (0x7f2be1c2288e in /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/../lib/libMlCore.so)\nframe #10: ml::core::CStaticThreadPool::worker(unsigned long) + 0xac (0x7f2be1c22f3c in /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/../lib/libMlCore.so)\nframe #11: <unknown function> + 0x109acf (0x7f2be1536acf in /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/../lib/libstdc++.so.6)\nframe #12: <unknown function> + 0x8609 (0x7f2be18b9609 in /lib/x86_64-linux-gnu/libpthread.so.0)\nframe #13: clone + 0x43 (0x7f2be1141353 in /lib/x86_64-linux-gnu/libc.so.6)\n]")

We can apply the same principle in an Elastic ingest pipeline, running inference on each of the saved Bluesky posts and adding the generated embeddings as a new field.

In [49]:
client.ingest.put_pipeline(
    id="pipeline_e5",
    processors=[
        {
            "inference": {
                "model_id": model_id,
                "field_map": {"text": "text_field"},  # field to embed: text
                "target_field": "text_embeddings",  # embedded field: text_embeddings
            }
        }
    ],
)

ObjectApiResponse({'acknowledged': True})
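
Inference can fail for individual documents, and by default a failing processor rejects the whole document. A minimal sketch of one way to make that visible instead: the same processor with an `on_failure` handler that records the error message on the document. The helper name, the `ingest_error` field, and the `pipeline_e5_safe` id are all hypothetical; `on_failure`, the `set` processor, and `_ingest.on_failure_message` are standard ingest features.

```python
def build_inference_processors(model_id, source_field="text",
                               target_field="text_embeddings",
                               error_field="ingest_error"):
    """Return an ingest `processors` list with a per-processor failure handler."""
    return [
        {
            "inference": {
                "model_id": model_id,
                "field_map": {source_field: "text_field"},
                "target_field": target_field,
                # If inference fails, keep the document and store the reason.
                "on_failure": [
                    {
                        "set": {
                            "field": error_field,
                            "value": "{{ _ingest.on_failure_message }}",
                        }
                    }
                ],
            }
        }
    ]


processors = build_inference_processors("elastic__multilingual-e5-small-optimized")
print(processors[0]["inference"]["field_map"])

# With a live client (pipeline id is illustrative):
# client.ingest.put_pipeline(id="pipeline_e5_safe", processors=processors)
```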

In [50]:
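mappings = {
    "properties": {
        # A dict cannot hold two "type" keys (the last one wins), so keep only "text"
        "text": {
            "type": "text"
        },
        "text_embeddings.predicted_value": {
            "type": "dense_vector",
            "dims": 384,  # must match the model's embedding_size (384 in the config above)
            "index": True,
            "similarity": "cosine",
        }
    }
}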

In [None]:
# Create the index (deleting any existing index)
client.indices.delete(index="bluesky_embedd", ignore_unavailable=True)
client.indices.create(index="bluesky_embedd", mappings=mappings)

In [47]:
client.indices.create(index="test_skye")
docs = [{'text': 'Bleurgh!!!! Why is it that travelling anywhere involves so much, well, travelling?'},
{'text': 'i am TRAVELLING not DRIVING'},
{'text': 'Travelling squad gonna be hilarious'},
{'text': 'Very nice! Are you stull travelling?'},
{'text': 'Amsterdam, Netherlands 🇳🇱\n\n#Amsterdam #Holland #travelling'}]
for doc in docs:
    client.index(index="test_skye", document=doc)

In [51]:
# Create the new index with enriched data by reindexing through the inference pipeline
client.reindex(
    source={"index": "test_skye"},
    dest={"index": "test_skye_emb", "pipeline": "pipeline_e5"},
    wait_for_completion=False,
)

ObjectApiResponse({'task': 'VCXSZIvKQjSq81_QCeN32w:254538329'})
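
Once the reindex task has finished, the embeddings can be used for semantic search. The following is a minimal sketch of a kNN search body that lets Elasticsearch embed the query text with the same deployed model (`query_vector_builder`). It assumes that `text_embeddings.predicted_value` is mapped as an indexed `dense_vector` in the target index, for example by reindexing into an index created with the mapping above. `build_knn_search` is a hypothetical helper and the default `k` and `num_candidates` are arbitrary.

```python
def build_knn_search(query_text, model_id,
                     field="text_embeddings.predicted_value",
                     k=5, num_candidates=50):
    """Build the `knn` section of a search request.

    Elasticsearch embeds `query_text` with `model_id` at query time, then
    returns the `k` nearest vectors among `num_candidates` per shard.
    """
    if num_candidates < k:
        raise ValueError("num_candidates must be at least k")
    return {
        "field": field,
        "k": k,
        "num_candidates": num_candidates,
        "query_vector_builder": {
            "text_embedding": {"model_id": model_id, "model_text": query_text}
        },
    }


knn = build_knn_search("going on a trip", "elastic__multilingual-e5-small-optimized")
print(knn["query_vector_builder"])

# With a live client:
# response = client.search(index="test_skye_emb", knn=knn, source=["text"])
# for hit in response["hits"]["hits"]:
#     print(hit["_score"], hit["_source"]["text"])
```

Unlike the `match` query earlier, this does not require the query words to appear in the post, which is what makes it useful across languages.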

### Alternatively, you can run the model locally via the HuggingFace integration.

In [None]:
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("elastic/multilingual-e5-small-optimized")

In [30]:
sentences = [
    "The weather is lovely today.",
    "It's so sunny outside!",
    "This is random."
]
embeddings = model.encode(sentences)

similarities = model.similarity(embeddings, embeddings)
print(similarities)

tensor([[1.0000, 0.9414, 0.9525],
        [0.9414, 1.0000, 0.9261],
        [0.9525, 0.9261, 1.0000]])
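
By default, `model.similarity` computes cosine similarity. To make the matrix above less of a black box, here is a minimal pure-Python sketch of the same computation on toy 2-dimensional vectors (the vectors are made up and are not model output). Note also that the E5 model cards describe cosine scores clustering in a high range, so the ranking of scores is usually more informative than their absolute values; that would be consistent with "This is random." scoring above 0.9 here.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def similarity_matrix(vectors):
    """Pairwise cosine similarities, analogous to model.similarity(e, e)."""
    return [[cosine_similarity(u, v) for v in vectors] for u in vectors]


# Toy vectors standing in for embeddings.
toy_vectors = [[1.0, 0.0], [1.0, 1.0], [0.0, 1.0]]

for row in similarity_matrix(toy_vectors):
    print([round(value, 4) for value in row])
```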
