In [1]:
# Requires Python 3.10+.
# Prints the version of the `python` executable that the shell resolves.
# NOTE(review): this may differ from the interpreter the kernel runs on — confirm.
!python --version

Python 3.10.12


In [2]:
# Install ipywidgets from conda-forge (presumably needed to render the
# tqdm.notebook progress bars used later in this notebook).
# The explicit %conda magic is used so the cell does not depend on IPython's
# automagic being enabled.
%conda install -c conda-forge ipywidgets

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 4.14.0
  latest version: 23.7.4

Please update conda by running

    $ conda update -n base -c conda-forge conda



# All requested packages already installed.

Retrieving notices: ...working... done

Note: you may need to restart the kernel to use updated packages.


In [6]:
# Install the Python dependencies of this notebook.
# %pip (rather than !pip) installs into the environment of the running kernel,
# even when a different `pip` comes first on the shell PATH.
%pip install --no-cache-dir opensearch-py python-dotenv boto3 tqdm h5py matplotlib ipywidgets jedi ipython




In [None]:
# For autocomplete use shift+tab
# Turns on the "greedy" option of IPython's completer for this kernel session.
%config IPCompleter.greedy=True

In [2]:
# Download the sift-128 dataset (about 500 MB) into the working directory.
# --fail makes curl exit with an error on an HTTP failure instead of saving the
# server's error page as the .hdf5 file; -L follows redirects.
!curl --fail -o sift-128-euclidean.hdf5 -L http://ann-benchmarks.com/sift-128-euclidean.hdf5

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   167  100   167    0     0   1018      0 --:--:-- --:--:-- --:--:--  1056
100  500M  100  500M    0     0  4017k      0  0:02:07  0:02:07 --:--:-- 1424k0     0  9777k      0  0:00:52  0:00:15  0:00:37 4401k 0  7036k      0  0:01:12  0:00:22  0:00:50 1078kk      0  0:01:28  0:00:34  0:00:54 4710k  0:00:36  0:00:55 3817k    0     0  4691k      0  0:01:49  0:00:47  0:01:02 2174k  0     0  4516k      0  0:01:53  0:00:50  0:01:03 1863k   0     0  4264k      0  0:02:00  0:00:54  0:01:06 1067k      0  0:02:30  0:01:15  0:01:15 1455k4330k      0  0:01:58  0:01:52  0:00:06 7234k 0     0  4084k      0  0:02:05  0:02:05 --:--:-- 3152k


In [21]:
# Read Data set
import numpy as np
import h5py

# Copy the ingest ("train") and query ("test") vectors into memory. The file is
# opened in a `with` block so the HDF5 handle is closed as soon as the arrays
# have been copied out, instead of staying open for the life of the kernel.
with h5py.File('sift-128-euclidean.hdf5', "r") as dataset:
    X_TRAIN = np.array(dataset["train"])
    X_TEST = np.array(dataset["test"])
    # Prefer the dimension recorded in the file attributes; otherwise fall back
    # to the length of the first training vector.
    if "dimension" in dataset.attrs:
        dimension = int(dataset.attrs["dimension"])
    else:
        dimension = len(X_TRAIN[0])

print(f"Ingest dataset size is : {len(X_TRAIN)}")
print(f"Queries dataset size is : {len(X_TEST)}")
print(f"dataset dimensions is : {dimension}")

Ingest dataset size is : 1000000
Queries dataset size is : 10000
dataset dimensions is : 128


In [22]:
from dotenv import load_dotenv
from opensearchpy import OpenSearch, RequestsHttpConnection
import os


# Load the connection settings from the local dotenv file. load_dotenv returns
# False when it did not set any variable (e.g. the file is missing), in which
# case the values must already be present in the process environment.
res = load_dotenv("environment.txt")
if not res:
    print("Warning: nothing was loaded from environment.txt; using the existing process environment")

OS_HOST = os.getenv('OS_HOST')
OS_PORT = os.getenv('OS_PORT')
OS_USER = os.getenv('USER_NAME')
OS_PASSWORD = os.getenv('PASSWORD')

# Fail early with a readable message instead of an obscure connection error
# against a host literally named "None".
if not OS_HOST:
    raise ValueError("OS_HOST is not set; define it in environment.txt or in the process environment")

# Environment values are strings; validate the port as an integer when it is
# provided. An unset port is passed through as None, as before.
os_port_number = int(OS_PORT) if OS_PORT else None

client = OpenSearch(
    hosts = [{'host': OS_HOST, 'port': os_port_number}],
    http_compress = True, # enables gzip compression for request bodies
    http_auth = (OS_USER, OS_PASSWORD),
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection,
    timeout=6000,
    pool_maxsize = 20
)

# Last expression of the cell: displays the cluster info and proves the
# connection and credentials work.
client.info()



{'name': '428d6ce63b054e8d1dd55d36eb0ea810',
 'cluster_name': '199552501713:go-daddy-xlarge',
 'cluster_uuid': 'Sc_tsdMeQ-6R8gdYNCY1FQ',
 'version': {'distribution': 'opensearch',
  'number': '2.7.0',
  'build_type': 'tar',
  'build_hash': 'unknown',
  'build_date': '2023-08-08T16:51:18.396423063Z',
  'build_snapshot': False,
  'lucene_version': '9.5.0',
  'minimum_wire_compatibility_version': '7.10.0',
  'minimum_index_compatibility_version': '7.0.0'},
 'tagline': 'The OpenSearch Project: https://opensearch.org/'}

In [57]:
# The index name is taken from the environment, with a fallback default.
vector_index_name = os.getenv('VECTOR_INDEX_NAME', "test-vector")

print(f"vector name from env is : {vector_index_name}")

# Graph-construction configuration for the vector field.
hnsw_method = {
    "name": "hnsw",
    "space_type": "l2", # l2 for SIFT, cosinesimil for typical
    "engine": "nmslib",
    "parameters": {
        "ef_construction": 128
    }
}

# Definition of the single vector field stored in each document.
vector_field = {
    "type": "knn_vector",
    "dimension": dimension,
    "index": "true",
    "method": hnsw_method
}

index_mappings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        #"refresh_interval": "-1",
        "index": {
            "knn": True,
            # Adjust to tune precision: higher values improve recall & precision
            # but increase latency; lower values do the opposite.
            "knn.algo_param.ef_search": 128
        }
    },
    "mappings": {
        "properties": {
            "vec": vector_field
        }
    }
}

# Start from a clean slate: drop any index left over from a previous run
# before creating it again.
index_already_exists = client.indices.exists(index=vector_index_name)
if index_already_exists:
    response = client.indices.delete(index=vector_index_name)
    print(f"Deleting the index. Response : {response}")

response = client.indices.create(index=vector_index_name, body=index_mappings)
print(f"Creating the index. Response : {response}")



vector name from env is : test_vector
Deleting the index. Response : {'acknowledged': True}
Creating the index. Response : {'acknowledged': True, 'shards_acknowledged': True, 'index': 'test_vector'}


In [58]:
# ingest data in the index
from tqdm.notebook import tqdm
from opensearchpy.helpers import bulk
import time


# Number of documents sent per bulk request.
bulk_size = 1000


def dataGen():
    """Yield one bulk action per vector in X_TRAIN.

    Each action targets `vector_index_name`, uses the 1-based position of the
    vector (as a string) for `_id`, and stores the vector as a plain list
    under the "vec" field.
    """
    for i, vec in enumerate(X_TRAIN):
        yield { "_index": vector_index_name, "_id": str(i + 1), "vec": vec.tolist() }


def _send_batch(actions):
    """Send one batch through the bulk helper and return the elapsed seconds.

    Raises:
        RuntimeError: if the bulk helper reports errors for the batch. The
            original code only evaluated a bare `StopIteration` expression
            here, which did nothing, so ingestion silently carried on.
    """
    start = time.time()
    (_, errors) = bulk(client, actions)
    elapsed = time.time() - start
    if errors:
        raise RuntimeError(f"Bulk ingestion reported errors, stopping: {errors}")
    return elapsed


data_to_ingest = []
total_time_to_ingest = 0.
ingest_latency = []
for data in tqdm(dataGen(), total=len(X_TRAIN)):
    data_to_ingest.append(data)
    # Flush as soon as a full batch has been collected.
    if len(data_to_ingest) == bulk_size:
        elapsed = _send_batch(data_to_ingest)
        total_time_to_ingest += elapsed
        ingest_latency.append(elapsed)
        data_to_ingest = []

# Flush the trailing partial batch, if any. Its latency is recorded as well so
# that ingest_latency holds one entry per bulk request.
if data_to_ingest:
    elapsed = _send_batch(data_to_ingest)
    total_time_to_ingest += elapsed
    ingest_latency.append(elapsed)
    data_to_ingest = []

print(f"Ingestion completed. Total time to ingest = {total_time_to_ingest} seconds, average time per document: {total_time_to_ingest/(len(X_TRAIN))}")


  0%|          | 0/1000000 [00:00<?, ?it/s]

Ingestion completed. Total time to ingest = 1092.6282296180725 seconds, average time per document: 0.0010926282296180726


In [62]:
# Explicitly refresh the index after ingestion.
# NOTE(review): the "refresh_interval": "-1" entry in the index settings of this
# notebook is commented out, so the refresh interval is not actually disabled;
# this call is kept as an explicit refresh before the checks that follow.
client.indices.refresh(index=vector_index_name)



{'_shards': {'total': 1, 'successful': 1, 'failed': 0}}

In [75]:
import json
# Check index details, you should see 1M documents in the index.
print(client.cat.indices(index=vector_index_name))

# Print the segments response for the index as a single JSON line.
# NOTE(review): the message below mentions a force merge, but no force-merge
# call appears in the visible cells of this notebook — confirm whether one is
# run elsewhere before relying on the wording.
print("Segments Info After refresh and force merge...")
print(json.dumps(client.indices.segments(index=vector_index_name)))

green open test_vector ifyr-_kLRgC2wKk9h40dBw 1 0 1000000 0 1.4gb 1.4gb

Segments Info After refresh and force merge...
{"_shards": {"total": 1, "successful": 1, "failed": 0}, "indices": {"test_vector": {"shards": {"0": [{"routing": {"state": "STARTED", "primary": true, "node": "e1N5crlpTI6LSejpgVckMQ"}, "num_committed_segments": 10, "num_search_segments": 10, "segments": {"_0": {"generation": 0, "num_docs": 1000, "deleted_docs": 0, "size_in_bytes": 1556128, "memory_in_bytes": 0, "committed": true, "search": true, "version": "9.5.0", "compound": true, "attributes": {"Lucene90StoredFieldsFormat.mode": "BEST_SPEED"}}, "_1": {"generation": 1, "num_docs": 1000, "deleted_docs": 0, "size_in_bytes": 1556730, "memory_in_bytes": 0, "committed": true, "search": true, "version": "9.5.0", "compound": true, "attributes": {"Lucene90StoredFieldsFormat.mode": "BEST_SPEED"}}, "_2": {"generation": 2, "num_docs": 1000, "deleted_docs": 0, "size_in_bytes": 1557526, "memory_in_bytes": 0, "committed": true, 

In [80]:
# Setup for Search
import numpy as np
from tqdm.notebook import tqdm
# search in the index
def searchQueryGen(input_array=None, k=100):
    """Yield one k-NN search request body per query vector.

    Args:
        input_array: iterable of query vectors; each item must provide
            `.tolist()` (e.g. the rows of a numpy array). When omitted, the
            notebook-level X_TEST is used. It is looked up when the generator
            starts running, so a reloaded X_TEST is picked up without having
            to re-run this definition.
        k: number of nearest neighbours to request. The same value is used for
            the response `size`. Defaults to 100, the previously hard-coded
            value.

    Yields:
        dict: a search body with `_source` disabled and a `knn` query on the
        "vec" field.
    """
    if input_array is None:
        input_array = X_TEST
    for vec in input_array:
        yield {
            "_source": False, # Don't get the source as this impacts latency
            "size": k,
            "query": {
                "knn": {
                    "vec": {
                        "vector": vec.tolist(),
                        "k": k
                    }
                }
            }
        }

In [82]:
# Uncomment the code below to warm up the cluster before measuring search latency.
# It runs a few queries (10 here) so that the data is loaded into cache.

# X_WARMUP = X_TEST[0:10]
# for query in tqdm(searchQueryGen(X_WARMUP), total=len(X_WARMUP)):
#     search_response = client.search(body=query, index=vector_index_name, _source=False, docvalue_fields=["_id"], stored_fields="_none_")

# print("--- Warmuped up the Cluster----")



  0%|          | 0/10 [00:00<?, ?it/s]

--- Warmup up the Cluster----


In [69]:



# Run every test query once and record, per query:
#   - search_latency: client-side wall-clock time in seconds,
#   - took_time: the "took" value reported in the search response,
#   - neighbors_lists: the returned document ids as integers.
neighbors_lists = []
search_latency = []
took_time = []
for query in tqdm(searchQueryGen(), total=len(X_TEST)):
    start = time.time()
    search_response = client.search(body=query, index=vector_index_name, _source=False, docvalue_fields=["_id"], stored_fields="_none_")
    end = time.time()
    search_latency.append(end - start)
    took_time.append(search_response["took"])
    search_hits = search_response['hits']['hits']
    # Ids are read from the requested docvalue field of each hit.
    search_neighbors = [int(hit["fields"]["_id"][0]) for hit in search_hits]
    neighbors_lists.append(search_neighbors)

# Client-side latencies are measured in seconds, hence the conversion to ms.
print(f"\n\naverage Latency(ms): {np.average(search_latency) *1000}")
print(f"p50 Latency(ms): {np.percentile(search_latency, 50) *1000}")
print(f"p90 Latency(ms): {np.percentile(search_latency, 90) *1000}")
print(f"p99 Latency(ms): {np.percentile(search_latency, 99) *1000}")

print(f"average took_time(ms): {np.average(took_time)}")
print(f"p50 took_time(ms): {np.percentile(took_time, 50)}")
print(f"p90 took_time(ms): {np.percentile(took_time, 90)}")
# Fixed label: this is the 99th percentile (it was printed as "p90").
print(f"p99 took_time(ms): {np.percentile(took_time, 99)}")



  0%|          | 0/10000 [00:00<?, ?it/s]



average Latency(ms): 114.25395605564118
p50 Latency(ms): 109.2754602432251
p90 Latency(ms): 125.4896640777588
p99 Latency(ms): 179.84636783599856
average took_time(ms): 3.3558
p50 took_time(ms): 3.0
p90 took_time(ms): 4.0
p90 took_time(ms): 4.0


In [71]:
# Useful Metrics
# Percentiles are printed from a single loop per section so that each label is
# derived from the percentile actually computed (the 99th percentile of
# took_time was previously printed under a "p90" label).
reported_percentiles = (50, 90, 99)

print("========================== Search Metrics ===================================")
print("========================== Server Side Latency ===================================")
print(f"average took_time(ms): {np.average(took_time)}")
for pct in reported_percentiles:
    print(f"p{pct} took_time(ms): {np.percentile(took_time, pct)}")


print("========================== Client side latency ===================================")
# search_latency holds seconds; convert to milliseconds for display.
print(f"\n\naverage Latency(ms): {np.average(search_latency) *1000}")
for pct in reported_percentiles:
    print(f"p{pct} Latency(ms): {np.percentile(search_latency, pct) *1000}")






average Latency(ms): 114.25395605564118
p50 Latency(ms): 109.2754602432251
p90 Latency(ms): 125.4896640777588
p99 Latency(ms): 179.84636783599856
average took_time(ms): 3.3558
p50 took_time(ms): 3.0
p90 took_time(ms): 4.0
p90 took_time(ms): 4.0


In [None]:
# Parallelization for Search