Based on https://www.pinecone.io/learn/series/faiss/vector-indexes/ and https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html

# Set up Elasticsearch client

In [1]:
%pip install elasticsearch
%pip install humanize

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [1]:
import os
import json
import shutil
import humanize
from datetime import datetime
import random
from elasticsearch import Elasticsearch, helpers
import numpy as np
import math
import pandas as pd
pd.set_option('display.max_colwidth', 190)
import urllib.request as request
from contextlib import closing
import warnings
warnings.filterwarnings('ignore')

# Load and prepare demo data

In [3]:
# first we download the Sift1M dataset; the download is skipped when the archive
# is already present so that re-running the notebook does not repeat it
sift_archive = 'sift.tar.gz'
if not os.path.exists(sift_archive):
    partial_archive = sift_archive + '.part'
    with closing(request.urlopen('ftp://ftp.irisa.fr/local/texmex/corpus/sift.tar.gz')) as r:
        with open(partial_archive, 'wb') as f:
            shutil.copyfileobj(r, f)
    # publish the file under its final name only once it is complete, so an
    # interrupted transfer is never mistaken for a finished download
    os.replace(partial_archive, sift_archive)

In [4]:
import tarfile

# the download leaves us with a tar.gz file, we unpack it into the current directory;
# the context manager closes the archive handle even if extraction fails
with tarfile.open('sift.tar.gz', "r:gz") as tar:
    if hasattr(tarfile, "data_filter"):
        # extraction filters are available: reject members that would be written
        # outside the target directory (absolute paths, "..", unsafe links)
        tar.extractall(filter="data")
    else:
        # older Python without extraction filters
        tar.extractall()

In [7]:
import numpy as np

# now define a function to read the fvecs file format of Sift1M dataset
def read_fvecs(fp):
    """Load an .fvecs file into a 2-D float32 array with one vector per row.

    The file is read as a flat int32 stream. Its first value is taken as the
    vector dimension d; the stream is then cut into rows of d + 1 values, the
    leading value of every row is dropped, and the remaining bytes are
    reinterpreted as float32.
    """
    raw = np.fromfile(fp, dtype=np.int32)
    dim = raw[0]
    rows = raw.reshape(-1, dim + 1)
    # copy() makes the sliced data contiguous before its bytes are reinterpreted
    payload = rows[:, 1:].copy()
    return payload.view(np.float32)

In [8]:
# corpus that the queries run against
xb = read_fvecs('./sift/sift_base.fvecs')  # 1M samples
# query vectors come from sift_query.fvecs; only the first one is kept,
# as a 2-D array with a single row
xq = read_fvecs('./sift/sift_query.fvecs')[:1]

In [9]:
# Shape of the query array: (number of query vectors, vector dimension)
xq.shape

(1, 128)

In [10]:
# Shape of the vector search space: (number of base vectors, vector dimension)
xb.shape

(1000000, 128)

In [11]:
# The input vector to be used for the vector queries, shown as a one-row table
# (the cell text is cut off according to the display.max_colwidth option set with the imports)
pd.DataFrame({'embedding' : [xq]})

Unnamed: 0,embedding
0,"[[1.0, 3.0, 11.0, 110.0, 62.0, 22.0, 4.0, 0.0, 43.0, 21.0, 22.0, 18.0, 6.0, 28.0, 64.0, 9.0, 11.0, 1.0, 0.0, 0.0, 1.0, 40.0, 101.0, 21.0, 20.0, 2.0, 4.0, 2.0, 2.0, 9.0, 18.0, 35.0, 1.0, ..."


# Connect to Elastic

In [2]:
# Connection settings are read from the environment so that no credentials live
# in the notebook. os.environ[...] raises KeyError for a missing variable instead
# of silently building an endpoint such as "https://None:None@None:None".
esuser = os.environ["ESUSER"]
espassword = os.environ["ESPASSWORD"]
eshost = os.environ["ESHOST"]
esport = os.environ["ESPORT"]
client = Elasticsearch(
    f"https://{eshost}:{esport}",  # Elasticsearch endpoint
    # credentials are passed separately from the URL so that special characters
    # in the password (e.g. "@", ":" or "/") cannot corrupt the endpoint
    basic_auth=(esuser, espassword),
    # NOTE(review): TLS certificate verification is switched off here; prefer
    # passing ca_certs=<path to the CA certificate> and removing this line
    verify_certs=False,
    request_timeout=120
)

In [3]:
# Ask the cluster for its basic info to confirm that the connection works;
# dict() turns the response object into a plain dict for display
dict(client.info())

{'name': 'm-2.30378e5f-16fe-488c-b12a-ff1e21906722.f1a236d6fe2348b9a4d2b297d12fbfa5.bc28ac43cf10402584b5f01db462d330.databases.appdomain.cloud',
 'cluster_name': '30378e5f-16fe-488c-b12a-ff1e21906722',
 'cluster_uuid': '2k4t1vNQSr67MnxoJloCbA',
 'version': {'number': '8.7.0',
  'build_flavor': 'default',
  'build_type': 'tar',
  'build_hash': '09520b59b6bc1057340b55750186466ea715e30e',
  'build_date': '2023-03-27T16:31:09.816451435Z',
  'build_snapshot': False,
  'lucene_version': '9.5.0',
  'minimum_wire_compatibility_version': '7.17.0',
  'minimum_index_compatibility_version': '7.0.0'},
 'tagline': 'You Know, for Search'}

# Upload vector embeddings to Elastic without a vector index

In [5]:
# Create an elastic index whose vector field is a dense_vector that is NOT indexed
# for kNN ("index": False): vector script functions such as l2norm need a
# dense_vector field, while no HNSW graph should be built for this baseline index.
if client.indices.exists(index="embeddings_noindex"):
    client.indices.delete(index="embeddings_noindex")
noindex_mappings = {
    "properties": {
        "embedding_noindex": {
            "type": "dense_vector",
            "dims": 128,
            "index": False
        }
    }
}
client.indices.create(index="embeddings_noindex", mappings=noindex_mappings)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'embeddings_noindex'})

In [12]:
%%time
# Upload the documents into the Elasticsearch index:
document_list = []
batch_size=1000
for i in range(0, len(xb)):
    document = {"_id": i, "embedding_noindex": xb[i]}
    document_list.append(document)
    if i % batch_size == batch_size-1:
        helpers.bulk(client, document_list, index='embeddings_noindex')
        document_list = []

# Make sure the search index is refreshed
client.indices.refresh(index="embeddings_noindex")

CPU times: user 1min 3s, sys: 3.97 s, total: 1min 7s
Wall time: 11min 50s


ObjectApiResponse({'_shards': {'total': 2, 'successful': 2, 'failed': 0}})

In [None]:
# Verify the number of documents in the index (expected to equal len(xb) once the upload is complete)
client.count(index='embeddings_noindex')["count"]

In [None]:
# Look up a random document for test. Document ids are the row numbers used during
# upload (0 .. len(xb)-1); random.randint includes both bounds, hence the -1:
client.get(index="embeddings_noindex", id=random.randint(0, len(xb)-1))

In [None]:
# Report the store size and the accumulated indexing time of the primary shards:
index_stats = client.indices.stats(index="embeddings_noindex").get('_all').get('primaries')
store_bytes = index_stats.get('store').get('size_in_bytes')
# the stats report milliseconds; precisedelta is given seconds
indexing_seconds = index_stats.get('indexing').get('index_time_in_millis') / 1000
print(f"Index size:    {humanize.naturalsize(store_bytes)}")
print(f"Indexing time: {humanize.precisedelta(indexing_seconds, minimum_unit='minutes')}")

## Exact nearest neighbor query with euclidean distance on dense_vector field without vector index

In [53]:
%%time
query = {
    "script_score": {
        "query" : {
            "match_all": {}
        },
        "script": {
            "source": "1 / (1 + l2norm(params.queryVector, 'embedding'))",
            "params": {
                "queryVector": xq[0]
            }
        }
    }
}
list=dict(client.search(index="embeddings_noindex", query=query, size=10))['hits']['hits']
euclidean_baseline=pd.DataFrame([(i.get('_id'), i.get('_score')) for i in list], columns=['id', 'distance'])
euclidean_baseline

CPU times: user 4.1 ms, sys: 2.61 ms, total: 6.71 ms
Wall time: 2.57 s


Unnamed: 0,id,distance
0,932085,0.004276
1,934876,0.004242
2,561813,0.004082
3,708177,0.003899
4,706771,0.003886
5,695756,0.003848
6,435345,0.003813
7,701258,0.00377
8,455537,0.003727
9,872728,0.003717


# Upload vector embeddings to Elastic with HNSW vector index

In [157]:
# Create an elastic index together with a mapping for an HNSW index for euclidean distance
if client.indices.exists(index="embeddings"):
    client.indices.delete(index="embeddings")
mapping = {
  "mappings": {
    "properties": {
      "embedding": {
        "type": "dense_vector",
        "dims": 128,
        # a real boolean instead of the string "true": `index` is a boolean mapping parameter
        "index": True,
        "similarity": "l2_norm",
        "index_options": {
            "type": "hnsw",
            # HNSW build parameters: graph connectivity (m) and candidate list size
            # during construction (ef_construction)
            "m": 16,
            "ef_construction": 100
        }
      }
    }
  }
}
# pass the mappings through the dedicated keyword, like the other typed API calls in this notebook
client.indices.create(index="embeddings", mappings=mapping["mappings"])

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'embeddings'})

In [158]:
%%time
# Upload the documents into the Elasticsearch index (includes HNSW index building):
document_list = []
batch_size=1000
for i in range(0, len(xb)):
    document = {"_id": i, "embedding": xb[i]}
    document_list.append(document)
    if i % batch_size == batch_size-1:
        helpers.bulk(client, document_list, index='embeddings')
        document_list = []


CPU times: user 1min 9s, sys: 6.06 s, total: 1min 15s
Wall time: 52min 22s


In [159]:
# Make sure the search index is refreshed, so that the uploaded documents
# are visible to the count and search requests that follow
client.indices.refresh(index="embeddings")

ObjectApiResponse({'_shards': {'total': 2, 'successful': 2, 'failed': 0}})

In [160]:
# Verify the number of documents in the index (expected to equal len(xb) once the upload is complete)
client.count(index='embeddings')["count"]

1000000

In [19]:
# Look up a random document for test. Document ids are the row numbers used during
# upload (0 .. len(xb)-1); random.randint includes both bounds, hence the -1:
client.get(index="embeddings", id=random.randint(0, len(xb)-1))

ObjectApiResponse({'_index': 'embeddings', '_id': '874182', '_version': 1, '_seq_no': 874182, '_primary_term': 1, 'found': True, '_source': {'embedding': [77.0, 1.0, 0.0, 11.0, 40.0, 2.0, 4.0, 34.0, 120.0, 23.0, 11.0, 18.0, 9.0, 0.0, 0.0, 17.0, 11.0, 24.0, 27.0, 47.0, 20.0, 0.0, 1.0, 2.0, 4.0, 12.0, 7.0, 11.0, 26.0, 5.0, 1.0, 0.0, 114.0, 2.0, 0.0, 44.0, 123.0, 1.0, 0.0, 35.0, 134.0, 6.0, 6.0, 21.0, 8.0, 0.0, 0.0, 45.0, 13.0, 4.0, 10.0, 44.0, 77.0, 42.0, 19.0, 3.0, 1.0, 1.0, 1.0, 2.0, 26.0, 96.0, 68.0, 1.0, 103.0, 2.0, 0.0, 40.0, 97.0, 4.0, 8.0, 26.0, 134.0, 7.0, 0.0, 4.0, 17.0, 17.0, 4.0, 23.0, 12.0, 0.0, 0.0, 3.0, 134.0, 134.0, 8.0, 3.0, 0.0, 0.0, 0.0, 3.0, 51.0, 119.0, 52.0, 2.0, 42.0, 3.0, 4.0, 35.0, 18.0, 4.0, 4.0, 6.0, 134.0, 3.0, 2.0, 6.0, 9.0, 48.0, 23.0, 47.0, 0.0, 0.0, 0.0, 1.0, 90.0, 134.0, 25.0, 1.0, 0.0, 0.0, 6.0, 21.0, 35.0, 19.0, 3.0, 1.0]}})

In [12]:
# Report the store size and the accumulated indexing time of the primary shards:
index_stats = client.indices.stats(index="embeddings").get('_all').get('primaries')
store_bytes = index_stats.get('store').get('size_in_bytes')
# the stats report milliseconds; precisedelta is given seconds
indexing_seconds = index_stats.get('indexing').get('index_time_in_millis') / 1000
print(f"Index size:    {humanize.naturalsize(store_bytes)}")
print(f"Indexing time: {humanize.precisedelta(indexing_seconds, minimum_unit='minutes')}")

Index size:    922.0 MB
Indexing time: 27.30 minutes


## Approximate nearest neighbor query with euclidean distance using HNSW index on dense_vector

In [54]:
%%time
query = {
    "field": "embedding",
    "query_vector": xq[0],
    "k": 10,
    "num_candidates": 10
}
dict(client.knn_search(index="embeddings", knn=query))['hits']['hits']
euclidean_hnsw=pd.DataFrame([(i.get('_id'), i.get('_score')) for i in list], columns=['id', 'distance'])
euclidean_hnsw

CPU times: user 4.06 ms, sys: 2.57 ms, total: 6.63 ms
Wall time: 66.8 ms


Unnamed: 0,id,distance
0,932085,0.004276
1,934876,0.004242
2,561813,0.004082
3,708177,0.003899
4,706771,0.003886
5,695756,0.003848
6,435345,0.003813
7,701258,0.00377
8,455537,0.003727
9,872728,0.003717


In [55]:
# Calculate Recall Percentage for euclidean distance HNSW lookup:
# recall = share of the exact (baseline) neighbours that the HNSW search returned as well.
# Documents are matched on their ids, not on their scores: the script_score baseline
# scores with 1 / (1 + d) while kNN search with l2_norm similarity scores with
# 1 / (1 + d^2), so the scores of one and the same document differ between the two tables.
baseline_ids = euclidean_baseline["id"].to_numpy()
hnsw_ids = euclidean_hnsw["id"].to_numpy()
np.isin(baseline_ids, hnsw_ids).sum() / baseline_ids.size * 100

100.0