In [None]:
# requirements
!pip3 install -U elasticsearch

In [1]:
import torch
import re
from urllib.parse import urlparse, parse_qs

In [2]:
from elasticsearch import Elasticsearch, helpers, NotFoundError

In [3]:
# connection URL, layout: es://<host>:<port>?<options>#<collection>
# - <host>:<port> is turned into the HTTP endpoint of the cluster
# - the `dropifexists` option asks for an already existing collection to be dropped
# - the fragment is the collection name, used as the Elasticsearch index name
url = 'es://localhost:9200?dropifexists#mytestcollection'

In [4]:
# split the URL into its components and decode the query string;
# keep_blank_values=True keeps value-less flags such as `?dropifexists`
o = urlparse(url, allow_fragments=True)
qargs = parse_qs(o.query, keep_blank_values=True)
print(o)
print(qargs)

# HTTP endpoint of the cluster, built from the host:port part of the URL
esuri = f'http://{o.netloc}'

# parse dropifexists param: a bare flag, '1', or any prefix of 'true' / 'yes'
# (case-insensitive) switches it on; a missing parameter or any other value
# leaves it off instead of raising KeyError
drop_values = qargs.get('dropifexists')
drop_if_exists = (
    drop_values is not None
    and re.fullmatch(r'|1|t(r(ue?)?)?|y(es?)?', drop_values[0].lower()) is not None
)
print(esuri)
print(drop_if_exists)

ParseResult(scheme='es', netloc='localhost:9200', path='', params='', query='dropifexists', fragment='mytestcollection')
{'dropifexists': ['']}
http://localhost:9200
True


In [39]:
# connect to the cluster; credentials can be supplied through the ES_USERNAME /
# ES_PASSWORD environment variables so real secrets never have to be written
# into the notebook; the fallbacks are the defaults of the local test cluster
import os

client = Elasticsearch(
    hosts=[esuri],
    basic_auth=(
        os.environ.get('ES_USERNAME', 'elastic'),
        os.environ.get('ES_PASSWORD', 'letmein'),
    ),
)

In [68]:
def create_es_index(index_name=None, dims=5, es_client=None):
    """Create the index that stores integer keys and their embedding vectors.

    Args:
        index_name: Name of the index to create. Defaults to the collection
            name taken from the URL fragment (``o.fragment``).
        dims: Number of dimensions of the ``embedding`` dense vector.
        es_client: Client used to send the request. Defaults to the
            notebook-level ``client``.

    Prints the response of the create-index request and returns nothing.
    """
    # fall back to the notebook globals only when the caller did not pass a value
    if index_name is None:
        index_name = o.fragment
    if es_client is None:
        es_client = client

    mappings = {
        'properties': {
            'key': {
                'type': 'integer'
            },
            'embedding': {
                'type': 'dense_vector',
                'dims': dims,
                'index': True,  # set to True if KNN search is desired, False otherwise
                'index_options': {
                    'type': 'int8_hnsw',  # hnsw, int8_hnsw, flat, int8_flat
                },
                'similarity': 'cosine',  # cosine, dot_product, l2_norm, max_inner_product
            }
        }
    }
    settings = {
        # default=1, but then the cluster status fails to resolve to GREEN
        # in case of a single-node cluster
        'number_of_replicas': 0
    }
    response = es_client.indices.create(index=index_name, mappings=mappings, settings=settings)
    print(response)

# make sure the collection is there: a missing one is created, an existing one
# is reported and only dropped and re-created when drop_if_exists is set
index_exists = client.indices.exists(index=o.fragment).body

if not index_exists:
    create_es_index()
else:
    print(f"Collection '{o.fragment}' exists.")
    if drop_if_exists:
        print('dropping')
        client.indices.delete(index=o.fragment)
        print('re-creating')
        create_es_index()
    


Collection 'mytestcollection' exists.
dropping
re-creating
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'mytestcollection'}


In [61]:
# utilities: maintenance and inspection calls for the index and the cluster

# clear the caches of the index
response = client.indices.clear_cache(index=o.fragment)
print(response)

# flush the index
response = client.indices.flush(index=o.fragment)
print(response)

# index definition: aliases, mappings and settings
response = client.indices.get(index=o.fragment)
print(response)

# shard recovery information of the index
response = client.indices.recovery(index=o.fragment)
print(response)

# refresh the index
response = client.indices.refresh(index=o.fragment)
print(response)

# index statistics
response = client.indices.stats(index=o.fragment)
print(response)

# close the index, then open it again
response = client.indices.close(index=o.fragment)
print(response)

response = client.indices.open(index=o.fragment)
print(response)

# send a null value for the persistent shard-allocation setting
response = client.cluster.put_settings(persistent={'cluster.routing.allocation.enable': None})
print(response)

response = client.cluster.reroute(metric=None)
print(response)

# the quotes inside the replacement fields differ from the outer f-string quotes
# so the cell also parses on Python versions older than 3.12
response = client.cluster.health()
print(response)
print(f"cluster status: {response['status']}")

response = client.indices.stats(index=o.fragment)
print(response)
print(f"index status: {response['indices'][o.fragment]['health']}")

response = client.cat.allocation()
print(response)

# response = client.cluster.put_settings(body={'index.routing.allocation.disable_allocation': False})
# print(response)
# "number_of_replicas" : 0



{'_shards': {'total': 1, 'successful': 1, 'failed': 0}}
{'_shards': {'total': 1, 'successful': 1, 'failed': 0}}
{'mytestcollection': {'aliases': {}, 'mappings': {'properties': {'embedding': {'type': 'dense_vector', 'dims': 5, 'index': False}, 'key': {'type': 'integer'}}}, 'settings': {'index': {'routing': {'allocation': {'include': {'_tier_preference': 'data_content'}}}, 'number_of_shards': '1', 'provided_name': 'mytestcollection', 'creation_date': '1716383588754', 'number_of_replicas': '0', 'uuid': '9e8QTrWMRFSzwH31tgFeLg', 'version': {'created': '8503000'}}}}}
{'mytestcollection': {'shards': [{'id': 0, 'type': 'EMPTY_STORE', 'stage': 'DONE', 'primary': True, 'start_time_in_millis': 1716383588881, 'stop_time_in_millis': 1716383588965, 'total_time_in_millis': 84, 'source': {}, 'target': {'id': 'SY8j7_DZRAqIujsEXVHZMA', 'host': '172.20.0.2', 'transport_address': '172.20.0.2:9300', 'ip': '172.20.0.2', 'name': 'esnode01'}, 'index': {'size': {'total_in_bytes': 0, 'reused_in_bytes': 0, 'rec

  response = client.cluster.reroute(metric=None)


In [62]:
# random test embeddings, one row of 5 values per document;
# a dedicated seeded generator yields the same data on every run without
# touching torch's global random state
embedding_rng = torch.Generator().manual_seed(0)
a = torch.rand(int(1e4), 5, generator=embedding_rng)
print(a.size())

torch.Size([10000, 5])


In [70]:
## add data in a bulk
def _index_action(row):
    """Build the bulk action that stores row `row` of `a` under id and key `row`."""
    return {'_index': o.fragment, '_id': row, 'key': row, 'embedding': a[row].tolist()}


# first 5000 rows: the actions are materialised as a document list
actions = list(map(_index_action, range(5_000)))
response = helpers.bulk(client=client, actions=actions)
print(response)

# remaining 5000 rows: the actions are produced lazily by a document generator
actions_gen = (_index_action(row) for row in range(5_000, 10_000))
response = helpers.bulk(client=client, actions=actions_gen)
print(response)


(5000, [])
(5000, [])


In [71]:
# delete some ids one request at a time; an id that is not in the index is
# reported instead of stopping the loop
ids_to_delete = [4, 5, 6]
for doc_id in ids_to_delete:
    try:
        response = client.delete(index=o.fragment, id=doc_id)
    except NotFoundError as err:
        print(f'{type(err)}, {err.message}, {err.body}')
    else:
        print(response)


{'_index': 'mytestcollection', '_id': '4', '_version': 3, 'result': 'deleted', '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 15000, '_primary_term': 1}
{'_index': 'mytestcollection', '_id': '5', '_version': 3, 'result': 'deleted', '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 15001, '_primary_term': 1}
{'_index': 'mytestcollection', '_id': '6', '_version': 3, 'result': 'deleted', '_shards': {'total': 1, 'successful': 1, 'failed': 0}, '_seq_no': 15002, '_primary_term': 1}


In [29]:
# delete multiple documents at once, selected by the value of their `key` field
more_ids_to_delete = [40, 50, 60]
key_filter = {'terms': {'key': more_ids_to_delete}}
response = client.delete_by_query(index=o.fragment, query=key_filter)
print(response)

{'took': 5, 'timed_out': False, 'total': 0, 'deleted': 0, 'batches': 0, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}


In [30]:
# delete multiple documents at once, selected by their document `_id`
even_more_ids_to_delete = [41, 51, 61]
id_filter = {'terms': {'_id': even_more_ids_to_delete}}
response = client.delete_by_query(index=o.fragment, query=id_filter)
print(response)

{'took': 26, 'timed_out': False, 'total': 3, 'deleted': 3, 'batches': 1, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}


In [31]:
# fetch a single document by its ID
response = client.get(index=o.fragment, id=1)
print(response)

{'_index': 'mytestcollection', '_id': '1', '_version': 3, '_seq_no': 20026, '_primary_term': 5, 'found': True, '_source': {'key': 1, 'embedding': [0.5171189904212952, 0.5124468207359314, 0.6883494257926941, 0.697633683681488, 0.5548747777938843]}}


In [32]:
# fetch several documents by ID in one request; the list deliberately mixes
# ids that exist, ids that do not, and a repeated id
ids_to_retrieve = [0, 1, 4, 5, 8, 4139812, 1]
response = client.mget(index=o.fragment, ids=ids_to_retrieve)
print(response)

{'docs': [{'_index': 'mytestcollection', '_id': '0', '_version': 3, '_seq_no': 20025, '_primary_term': 5, 'found': True, '_source': {'key': 0, 'embedding': [0.9673076272010803, 0.19709092378616333, 0.3503369092941284, 0.8820860385894775, 0.9244221448898315]}}, {'_index': 'mytestcollection', '_id': '1', '_version': 3, '_seq_no': 20026, '_primary_term': 5, 'found': True, '_source': {'key': 1, 'embedding': [0.5171189904212952, 0.5124468207359314, 0.6883494257926941, 0.697633683681488, 0.5548747777938843]}}, {'_index': 'mytestcollection', '_id': '4', 'found': False}, {'_index': 'mytestcollection', '_id': '5', 'found': False}, {'_index': 'mytestcollection', '_id': '8', '_version': 3, '_seq_no': 20033, '_primary_term': 5, 'found': True, '_source': {'key': 8, 'embedding': [0.8588331341743469, 0.7824482917785645, 0.7422242164611816, 0.40977078676223755, 0.6987150311470032]}}, {'_index': 'mytestcollection', '_id': '4139812', 'found': False}, {'_index': 'mytestcollection', '_id': '1', '_version'

In [33]:
# stack the embeddings of the retrieved documents into one tensor, one row per
# document; entries without a truthy 'found' flag are skipped, which covers
# both ids that do not exist and entries that carry no 'found' key at all
found_embeddings = [
    doc['_source']['embedding']
    for doc in response['docs']
    if doc.get('found', False)
]
b = torch.tensor(found_embeddings, dtype=torch.float)
print(b.size())


torch.Size([4, 5])


In [72]:
# term query: search for the documents whose `key` equals a single value
single_key_query = {'term': {'key': 1}}
response = client.search(index=o.fragment, query=single_key_query)
print(response)


{'took': 3, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'mytestcollection', '_id': '1', '_score': 1.0, '_source': {'key': 1, 'embedding': [0.08199286460876465, 0.37916743755340576, 0.8131399154663086, 0.3192172050476074, 0.40480709075927734]}}]}}


In [74]:
# terms query: search for the documents whose `key` is any of several values
multi_key_query = {'terms': {'key': ids_to_retrieve}}
response = client.search(index=o.fragment, query=multi_key_query)
print(response)

{'took': 3, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 3, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'mytestcollection', '_id': '0', '_score': 1.0, '_source': {'key': 0, 'embedding': [0.5990166664123535, 0.06569528579711914, 0.33004242181777954, 0.5136980414390564, 0.7231322526931763]}}, {'_index': 'mytestcollection', '_id': '1', '_score': 1.0, '_source': {'key': 1, 'embedding': [0.08199286460876465, 0.37916743755340576, 0.8131399154663086, 0.3192172050476074, 0.40480709075927734]}}, {'_index': 'mytestcollection', '_id': '8', '_score': 1.0, '_source': {'key': 8, 'embedding': [0.9858206510543823, 0.4045566916465759, 0.6138359308242798, 0.7088015079498291, 0.960491955280304]}}]}}


In [47]:
# get count: ask the index how many documents it holds and print the response
response = client.count(index=o.fragment)
print(response)

{'count': 0, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}


In [48]:
# gather some info: node / cluster / version details first,
# then the one-line cluster health summary
print(client.info())
print(client.cat.health())

{'name': 'esnode01', 'cluster_name': 'escluster01', 'cluster_uuid': 'qDCDDpHhTx6N2Y26r7DgUw', 'version': {'number': '8.13.4', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': 'da95df118650b55a500dcc181889ac35c6d8da7c', 'build_date': '2024-05-06T22:04:45.107454559Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}
1716383435 13:10:35 escluster01 green 1 1 3 3 0 0 0 0 - 100.0%



In [37]:
# finally close connections; run this once the cells that use `client` are done
client.close()

In [None]:
# KNN search
# response = client.search(
#     index="book_index",
#     knn={
#         "field": "title_vector",
#         "query_vector": model.encode("javascript books"),
#         "k": 10,
#         "num_candidates": 100,
#     },
# )
#

# KNN with filter on field
# response = client.search(
#     index="book_index",
#     knn={
#         "field": "title_vector",
#         "query_vector": model.encode("javascript books"),
#         "k": 10,
#         "num_candidates": 100,
#         "filter": {"term": {"publisher.keyword": "addison-wesley"}},
#     },
# )


In [None]:
# HYBRID SEARCH
# response = client.search(
#     index="book_index",
#     size=5,
#     query={"match": {"summary": "python programming"}},
#     knn={
#         "field": "title_vector",
#         "query_vector": model.encode(
#             "python programming"
#         ).tolist(),  # generate embedding for query so it can be compared to `title_vector`
#         "k": 5,
#         "num_candidates": 10,
#     },
#     rank={"rrf": {}},
# )