# Approximate Nearest Neighbors on Elastic Search with Docker
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/stephenleo/adventures-with-ann/blob/main/ann_es_docker.ipynb)

Scaling ANNs to "Big" Data Volumes

![Header Image](images/ann_es_docker.png)

Docker containers are crucial for Data Science at scale [Link](https://blogs.nvidia.com/blog/2020/09/03/what-is-mlops/), and that is just as true for Approximate Nearest Neighbors (ANNs) on "big" data!

> Everything must run in a container

Speed and Accuracy (or Recall) are the top two considerations when choosing a Nearest Neighbors or Similarity Search algorithm. In my previous post, [KNN is Dead](https://medium.com/towards-artificial-intelligence/knn-k-nearest-neighbors-is-dead-fc16507eb3e), I demonstrated the tremendous (>300X) speed advantage ANNs have over KNN at comparable accuracy. I also discussed how to choose the fastest, most accurate ANN algorithm for your own dataset [Link](https://medium.com/towards-artificial-intelligence/how-to-choose-the-best-nearest-neighbors-algorithm-8d75d42b16ab).

However, in addition to speed and accuracy, you sometimes also need to scale to large data volumes. In these cases, the ease of distributing data across multiple machines becomes a third consideration. In this post, let's address all three considerations at once with the fantastic OpenDistro for Elastic Search [Link](https://opendistro.github.io/for-elasticsearch/).

## Why Elastic Search (ES)?
ES might be a foreign concept to many Data Scientists reading this post, so let me explain why it matters beyond its typical usage by your IT team.

ES is a search engine database that famously powers search across the massive volume of Wikipedia [Link](https://opendistro.github.io/for-elasticsearch-docs/docs/elasticsearch/). It provides full-text search [Link](https://en.wikipedia.org/wiki/Full-text_search), similar to Apache Solr for those from the Hadoop world. Thanks to its distributed nature, ES can scale to huge data volumes by adding more servers/nodes, much like Hadoop.

Here are three reasons why Data Scientists would be interested in Elastic Search:
1. ***Scalable ANN***: The OpenDistro ES distribution implements the HNSW ANN algorithm as a plugin [Link](https://opendistro.github.io/for-elasticsearch/features/knn.html). ES's distributed nature enables it to scale this high-speed, high-accuracy HNSW ANN search to many millions of records.
2. ***Support***: Most modern IT and Infrastructure teams are already familiar with (and heavily using) ES. As a Data Scientist, you probably want the IT Infra team to set up and maintain the server hardware, so using a technology that they are already familiar with increases your chance of securing their support. This support can literally make or break the productionizing of your Data Science project, so don't underestimate it! 
3. ***Simpler Stack***: ES can be used as a database to store additional fields, which can then be queried together with the nearest neighbors. For example, if you search for nearest neighbors using product name embeddings, you can also get the product price, category, date added, etc. As long as a field is stored in ES, you can retrieve it during the ANN search. This simplifies your application stack because you get all the information you need from one place, instead of querying the nearest neighbors from one location and then hitting another database to fetch the other fields for all these neighbors.
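To make the "Simpler Stack" point concrete, here is a minimal sketch of a k-NN query body that also asks ES to return stored fields with each neighbor via `_source`. It uses the same `knn` query shape as the querying code later in this post. The helper name `build_knn_query` is hypothetical, and the `price` and `category` fields are hypothetical stand-ins for whatever you stored alongside your embeddings.

```python
from typing import Dict, List, Sequence


def build_knn_query(vector: Sequence[float], k: int, field: str,
                    return_fields: List[str]) -> Dict:
    """Build an OpenDistro k-NN query body that also returns stored fields.

    `field` is the knn_vector field. `return_fields` are extra stored fields
    (hypothetical examples: price, category) fetched in the same request.
    """
    return {
        "size": k,  # number of hits to return
        "query": {
            "knn": {
                field: {"vector": list(vector), "k": k}
            }
        },
        "_source": return_fields,  # fetch these fields with every neighbor
    }


# One request returns the neighbors *and* their metadata
body = build_knn_query([0.1, 0.2, 0.3], k=5, field="emb",
                       return_fields=["title", "price", "category"])
# With a running cluster: es.search(index="amazon-500k", body=body)
```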

# Installing OpenDistro ES (which includes HNSW)
This is super-duper simple using Docker. We'll follow the instructions from the official documentation [Link](https://opendistro.github.io/for-elasticsearch-docs/docs/install/docker/). 

***Important!*** This setup is only for experimenting with HNSW ANN on ES! For a production setup, please ask your IT Infra team to set up the necessary security protocols.

## Single Machine
1. Install `docker` on your machine following instructions from the docker website [Link](https://docs.docker.com/engine/install/)
2. Pull the docker image for OpenDistro: `sudo docker pull amazon/opendistro-for-elasticsearch:1.12.0`
3. Run the docker image: `sudo docker run --rm -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" amazon/opendistro-for-elasticsearch:1.12.0`

That's it! You can now interact with the ES service.

In [None]:
%%bash
sudo docker pull amazon/opendistro-for-elasticsearch:1.12.0
sudo docker run --rm -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" amazon/opendistro-for-elasticsearch:1.12.0
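Because `docker run` keeps the container in the foreground, it can be handy to poll the service from another terminal or notebook until it is ready. Below is a minimal sketch that uses only the standard library. It assumes the default `admin`/`admin` credentials and the self-signed certificate of the demo image, which is why certificate verification is switched off, the same trade-off as `--insecure` and `verify=False` elsewhere in this post. The helper name `wait_for_es` is hypothetical.

```python
import base64
import http.client
import ssl
import time
import urllib.request


def wait_for_es(url: str = "https://localhost:9200",
                user: str = "admin", password: str = "admin",
                timeout: float = 60.0, interval: float = 2.0) -> bool:
    """Poll `url` until it returns HTTP 200 or `timeout` seconds elapse."""
    # The demo image ships a self-signed certificate, so skip verification
    # (experimentation only, never in production).
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})

    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(request, context=ctx, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except (OSError, http.client.HTTPException):
            pass  # not ready yet: connection refused, TLS error, 401/503, ...
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Usage: prints True once the container accepts requests
# print(wait_for_es())
```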

## Cluster of Multiple Machines
Run the following steps on **each of the machines** you want to set up in your cluster. ***Again Important!*** This is just for experimentation; DO NOT use it in production without the proper security protocols.

1. Install `docker` on your machine following instructions from the docker website [Link](https://docs.docker.com/engine/install/)
2. Pull the docker image for OpenDistro: `sudo docker pull amazon/opendistro-for-elasticsearch:1.12.0`
3. Install `docker-compose` following instructions from the docker website [Link](https://docs.docker.com/compose/install/)
4. Create a `docker-compose.yml` file
5. Create an `elasticsearch.yml` file
6. Start the ES service: `sudo docker-compose up`

If you encounter any errors, try increasing the virtual memory map count (`vm.max_map_count`) as described in [Link](https://github.com/opendistro-for-elasticsearch/opendistro-build/issues/329): `sudo sysctl -w vm.max_map_count=262144`
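If you would rather check this setting before starting the service, a small Python pre-flight check can do it. This is a minimal sketch that assumes a Linux host, where the value is exposed at `/proc/sys/vm/max_map_count`. The helper names are hypothetical, and the threshold is the 262144 value used in the command above.

```python
from pathlib import Path

# Minimum value used in the sysctl command above
REQUIRED_MAX_MAP_COUNT = 262144


def read_max_map_count(proc_file: str = "/proc/sys/vm/max_map_count") -> int:
    """Read the current vm.max_map_count (Linux only)."""
    return int(Path(proc_file).read_text().strip())


def max_map_count_ok(current: int, required: int = REQUIRED_MAX_MAP_COUNT) -> bool:
    """True if the kernel allows at least `required` memory-map areas."""
    return current >= required


# Usage on a Linux host:
# if not max_map_count_ok(read_max_map_count()):
#     print("Run: sudo sysctl -w vm.max_map_count=262144")
```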

The whole process is shown below

In [None]:
%%bash
# Run this script on each machine in your cluster
# Provide a unique node_name for each machine in your cluster
node_name=odfe-node01

# JVM heap size, update as per your machine's hardware
Xms=10g
Xmx=10g

# Fixed (same on every machine)
cluster_name=odfe-cluster01
initial_master=odfe-node01
# First IP address reported for this host (command substitution is required here)
internal_ip=$(hostname -I | awk '{print $1}')

# Pull the docker container
sudo docker pull amazon/opendistro-for-elasticsearch:1.12.0

# Create docker-compose.yml
echo '# Instructions from
# https://opendistro.github.io/for-elasticsearch-docs/docs/install/docker/
# https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-docker.html

version: "3"
services:
    '$node_name':
        image: amazon/opendistro-for-elasticsearch:1.12.0
        container_name: odfe-node
        environment:
            - cluster.name='$cluster_name'
            - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
            - "ES_JAVA_OPTS=-Xms'$Xms' -Xmx'$Xmx'" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
        ulimits:
            memlock:
                soft: -1
                hard: -1
            nofile:
                soft: 65536 # maximum number of open files for the Elasticsearch user, set to at least 65536 on modern systems
                hard: 65536
        volumes:
            - odfe-data:/usr/share/elasticsearch/data
            - ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
        ports:
            - 9200:9200
            - 9300:9300 # required for adding more nodes
            - 9600:9600 # required for Performance Analyzer
        networks:
            - odfe-net

volumes:
    odfe-data:
        driver: local

networks:
    odfe-net:
        driver: bridge' > docker-compose.yml

# Create elasticsearch.yml
echo 'cluster.name: '$cluster_name'
node.name: '$node_name'
node.roles: [master, data]
opendistro_security.disabled: true
http.host: 0.0.0.0
transport.host: 0.0.0.0
transport.publish_host: '$internal_ip'
http.publish_host: '$internal_ip'
http.port: 9200
transport.tcp.port: 9300
network.host: [127.0.0.1, '$internal_ip']
cluster.initial_master_nodes:
    - '$initial_master'
discovery.seed_hosts:
    - '$internal_ip'
    - <ip address of 2nd machine on this cluster>
    - <ip address of 3rd machine on this cluster>
path:
    data: /usr/share/elasticsearch/data
' > elasticsearch.yml

# Start the ES service
sudo docker-compose up
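The nested quoting in the `echo` commands above is easy to break when you edit them. As an alternative, here is a minimal Python sketch that renders the same `elasticsearch.yml` content with the real seed-host IPs filled in. The function name `render_elasticsearch_yml` is hypothetical, and the example IPs (`10.0.0.x`) are placeholders for your machines' internal addresses.

```python
from typing import List


def render_elasticsearch_yml(cluster_name: str, node_name: str,
                             internal_ip: str, initial_master: str,
                             seed_hosts: List[str]) -> str:
    """Render the elasticsearch.yml used by the docker-compose setup above."""
    # One YAML list item per machine in the cluster
    seeds = "\n".join(f"    - {ip}" for ip in seed_hosts)
    return f"""cluster.name: {cluster_name}
node.name: {node_name}
node.roles: [master, data]
opendistro_security.disabled: true
http.host: 0.0.0.0
transport.host: 0.0.0.0
transport.publish_host: {internal_ip}
http.publish_host: {internal_ip}
http.port: 9200
transport.tcp.port: 9300
network.host: [127.0.0.1, {internal_ip}]
cluster.initial_master_nodes:
    - {initial_master}
discovery.seed_hosts:
{seeds}
path:
    data: /usr/share/elasticsearch/data
"""


yml = render_elasticsearch_yml("odfe-cluster01", "odfe-node02", "10.0.0.2",
                               "odfe-node01", ["10.0.0.1", "10.0.0.2", "10.0.0.3"])
# pathlib.Path("elasticsearch.yml").write_text(yml)
```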

## Check ES Status
After running the steps above on each of your machines, you can check the status of your cluster with: `curl -XGET <ip_address_of_any_node>:9200/_cat/nodes?v -u <username>:<password> --insecure`. You should see all the nodes in your cluster.

In [2]:
!curl -XGET https://localhost:9200/_cat/nodes?v -u 'admin:admin' --insecure

ip         heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
172.17.0.2           28          18   1    0.39    0.20     0.13 dimr      *      b8d08220be95
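With several machines, scanning this table by eye gets tedious. The cat APIs accept `format=json`, so a script can check whether every node has joined and which node is the elected master (marked `*` in the `master` column). Below is a minimal sketch with a hypothetical `summarize_nodes` helper. The sample input mirrors the single-node output above, and in practice you would fetch it with `curl -XGET "https://localhost:9200/_cat/nodes?format=json" -u 'admin:admin' --insecure`.

```python
import json
from typing import Dict, List


def summarize_nodes(cat_nodes_json: str, expected_nodes: int) -> Dict:
    """Summarize the output of GET _cat/nodes?format=json."""
    nodes: List[Dict] = json.loads(cat_nodes_json)
    # The elected master has "*" in the `master` column, other nodes have "-"
    masters = [n["name"] for n in nodes if n.get("master") == "*"]
    return {
        "names": sorted(n["name"] for n in nodes),
        "master": masters[0] if masters else None,
        "all_joined": len(nodes) == expected_nodes,
    }


sample = '[{"ip": "172.17.0.2", "node.role": "dimr", "master": "*", "name": "b8d08220be95"}]'
print(summarize_nodes(sample, expected_nodes=1))
```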


# Using ES for ANN

## Open a DataFrame to load into ES
We shall use the same Amazon 500K product dataset used in my previous post [KNN is Dead!](https://medium.com/towards-artificial-intelligence/knn-k-nearest-neighbors-is-dead-fc16507eb3e)

In [1]:
import pandas as pd
df = pd.read_pickle("df.pkl")
embedding_col = "emb"

print(df.shape)
df.head()

(527543, 2)


|   | title | emb |
|---|---|---|
| 0 | Puppies Faceplate Hard Case Protector for Net1... | [0.01443001, 0.0070197457, 0.011907284, 0.0197... |
| 1 | White Wolf Faceplate Protector Hard Case for S... | [0.007030556, 0.03295864, 0.028047869, 0.01396... |
| 2 | Camo Duck Grass Rubberized Hard Case Phone Fac... | [0.0013388951, 0.03315278, 0.03617841, -0.0105... |
| 3 | Camoflague Camo Usa Deer Combo Hybrid Hard Cas... | [0.01922137, -0.0041188085, 0.022043534, -0.00... |
| 4 | Motorola H700 Black - Non-Retail Packaging | [0.004890033, 0.019337852, 0.026887875, -0.037... |


## Creating an ES Index with HNSW
Once OpenDistro ES is set up, we need to create an ES index to hold all our data. This can be done with either the Python requests library or the Python [ElasticSearch library](https://elasticsearch-py.readthedocs.io/en/7.10.0/) (`pip install elasticsearch`). I'll use the ElasticSearch library in this post.

In this step, we specify the HNSW parameters `ef_construction` and `M` in the ES index settings. We also need to choose between `l2` (Euclidean) and `cosinesimil` (cosine/angular) distance for finding neighbors. Some of the settings follow best practices from [Link](https://medium.com/@kumon/how-to-realize-similarity-search-with-elasticsearch-3dd5641b9adb).
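A quick way to reason about `l2` versus `cosinesimil`: for unit-length vectors, squared Euclidean distance and cosine similarity are tied by the identity ||a − b||² = 2 − 2·cos(a, b). Smaller L2 distance therefore always means higher cosine similarity, so both spaces rank neighbors identically. The small pure-Python check below verifies the identity on random vectors. Whether your own embeddings are already normalized is an assumption you should verify before relying on this.

```python
import math
import random
from typing import List


def normalize(v: List[float]) -> List[float]:
    """Scale v to unit Euclidean length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def squared_l2(a: List[float], b: List[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


random.seed(0)
a = normalize([random.gauss(0, 1) for _ in range(8)])
b = normalize([random.gauss(0, 1) for _ in range(8)])

# For unit vectors: ||a - b||^2 == 2 - 2 * cos(a, b)
lhs = squared_l2(a, b)
rhs = 2 - 2 * cosine(a, b)
print(round(lhs, 6), round(rhs, 6))
```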

In [14]:
!pip install elasticsearch

Collecting elasticsearch
  Downloading elasticsearch-7.10.1-py2.py3-none-any.whl (322 kB)
Installing collected packages: elasticsearch
Successfully installed elasticsearch-7.10.1


In [3]:
# Imports
from elasticsearch import Elasticsearch

# Suppress the insecure warning while using verify = False
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

In [4]:
# ES constants
index_name = "amazon-500k"

es = Elasticsearch(["https://localhost:9200"], 
                   http_auth=("admin", "admin"), 
                   verify_certs=False)



In [5]:
# ES settings
body = {
  "settings": {
    "index": {
      "knn": True,
      "knn.space_type": "l2", 
      "knn.algo_param.ef_construction": 100, 
      "knn.algo_param.m": 16
    },
    "number_of_shards": 10, 
    "number_of_replicas": 0,
    "refresh_interval": -1,
    "translog.flush_threshold_size": "10gb"
  }
}

mapping = {
  "properties": {
    embedding_col: {
      "type": "knn_vector", 
      "dimension": len(df.loc[0,embedding_col])
    }
  }
}

In [6]:
# Create the Index
es.indices.create(index_name, body=body)
es.indices.put_mapping(mapping, index_name)

{'acknowledged': True}
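The settings and the mapping can also be sent together in a single `indices.create` call, because the create-index API accepts a `mappings` section next to `settings`. Below is a minimal sketch of a helper that assembles that body. The function name `hnsw_index_body` and its defaults are hypothetical and simply mirror the values used above.

```python
from typing import Dict


def hnsw_index_body(field: str, dimension: int, space_type: str = "l2",
                    m: int = 16, ef_construction: int = 100,
                    shards: int = 10) -> Dict:
    """Assemble settings + mappings for an OpenDistro k-NN (HNSW) index."""
    return {
        "settings": {
            "index": {
                "knn": True,
                "knn.space_type": space_type,  # "l2" or "cosinesimil"
                "knn.algo_param.ef_construction": ef_construction,
                "knn.algo_param.m": m,
            },
            "number_of_shards": shards,
            "number_of_replicas": 0,
            "refresh_interval": -1,  # no periodic refresh during bulk loading
        },
        "mappings": {
            "properties": {
                field: {"type": "knn_vector", "dimension": dimension}
            }
        },
    }


# With a running cluster (this replaces the separate put_mapping call):
# es.indices.create(index="amazon-500k",
#                   body=hnsw_index_body("emb", len(df.loc[0, "emb"])))
```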

After running the code above, we can check the available indices on our ES cluster using: `curl -XGET <ip_address_of_any_node>:9200/_cat/indices?v -u <username>:<password> --insecure`

In [7]:
!curl -XGET https://localhost:9200/_cat/indices?v -u 'admin:admin' --insecure

health status index                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   security-auditlog-2020.12.20 c9IG9Uc5Sn2cF68UAiQJTw   1   1          2            0     30.2kb         30.2kb
green  open   amazon-500k                  Mb3C3OFvSvKIseAINNTAIQ  10   0          0            0       624b           624b
green  open   .opendistro_security         BEb84-71RcSEZ3oAJUaPJg   1   0          9            0     55.4kb         55.4kb


## Uploading Data

Now that the index has been created, we can upload data to it either by using the Python requests library or using the Python [ElasticSearch library](https://elasticsearch-py.readthedocs.io/en/7.10.0/), which is shown below.
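The `bulk` helper consumes an iterable of action dicts. Metadata keys such as `_op_type`, `_index` and (optionally) `_id` control the operation, and the remaining keys become the document. As a minimal sketch, here is a hypothetical `to_bulk_action` helper that performs the same per-row conversion as the generator below. It includes one variation that is not part of this post's code: an optional deterministic `_id`, so that re-running the upload overwrites documents instead of duplicating them.

```python
from typing import Any, Dict, Optional


def to_bulk_action(doc: Dict[str, Any], index: str, vector_field: str,
                   doc_id: Optional[str] = None) -> Dict[str, Any]:
    """Turn one record into an action dict for elasticsearch.helpers.bulk."""
    action: Dict[str, Any] = {"_op_type": "index", "_index": index}
    if doc_id is not None:
        action["_id"] = doc_id  # deterministic id: re-uploads overwrite
    action.update(doc)
    vec = action[vector_field]
    # JSON cannot serialize NumPy arrays, so convert to a plain Python list
    action[vector_field] = vec.tolist() if hasattr(vec, "tolist") else list(vec)
    return action


action = to_bulk_action({"title": "Motorola H700", "emb": (0.1, 0.2)},
                        index="amazon-500k", vector_field="emb", doc_id="4")
```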

In [8]:
# Imports
from elasticsearch.helpers import bulk
from tqdm import tqdm

In [9]:
# Data Generator
def gen():
    for row in tqdm(df.itertuples(), total=len(df)):
        output_yield = { 
            "_op_type": "index", 
            "_index": index_name
        }
        output_yield.update(row._asdict())
        output_yield.update({
            embedding_col: output_yield[embedding_col].tolist()
        })

        yield output_yield

# Upload data to ES in bulk
_, errors = bulk(es, gen(), chunk_size=500, max_retries=2)
assert len(errors) == 0, errors

# Refresh the data
es.indices.refresh(index_name, request_timeout=1000)   

# Warmup API: load the k-NN graphs of every shard into memory so the first queries are fast
res = requests.get("https://localhost:9200/_opendistro/_knn/warmup/" + index_name + "?pretty",
                   auth=("admin", "admin"), verify=False)
print(res.text)

100%|██████████| 527543/527543 [04:29<00:00, 1955.64it/s]


{
  "_shards" : {
    "total" : 10,
    "successful" : 10,
    "failed" : 0
  }
}
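The index was created with `refresh_interval: -1`, which stops Elasticsearch from refreshing periodically during bulk loading. After loading, the explicit refresh above makes the current data searchable. If you plan to keep writing to the index, you may want to restore periodic refresh so that new documents become visible without manual refreshes. Below is a minimal sketch: `"1s"` is Elasticsearch's default refresh interval, and the helper name is hypothetical.

```python
from typing import Dict


def restore_refresh_body(interval: str = "1s") -> Dict:
    """Settings body that re-enables periodic refresh after a bulk load."""
    return {"index": {"refresh_interval": interval}}


# With a running cluster:
# es.indices.put_settings(body=restore_refresh_body(), index="amazon-500k")
```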



## Querying Nearest Neighbors
Once all our data is uploaded into the ES index, we can query the "K" nearest neighbors of any data point. Below, we sample 1,000 rows from the DataFrame and send them to ES in batches of 200 using the multi-search (`msearch`) API.

In [10]:
# Query parameters
query_df = df.sample(1000).copy()
K = 5 # Number of neighbors
step = 200 # Number of items to query at a time

cols_to_query = ["Index", "title"]

In [11]:
# Update the search settings: a larger ef_search explores more candidates, improving recall at the cost of query speed
body = {
  "settings": {
    "index": {"knn.algo_param.ef_search": 100}
  }        
}   

es.indices.put_settings(body=body, index=index_name)

{'acknowledged': True}

In [12]:
# Run the Query
responses = []
for n in tqdm(range(0, len(query_df), step)):
    subset_df = query_df.iloc[n:n+step,:]
    request = []

    for row in subset_df.itertuples():
        req_head = {"index": index_name}
        req_body = {
            "query": {
                "knn": {
                    embedding_col: {
                        "vector": getattr(row, embedding_col).tolist(), 
                        "k": K}
                }
            },
            "size": K,
            "_source": cols_to_query
        }

        request.extend([req_head, req_body])

    r = es.msearch(body=request)
    responses.extend(r['responses'])

# Convert the responses to dataframe columns
nn_data = {f'es_neighbors_{key}': [] for key in cols_to_query}

for item in tqdm(responses):
    nn = pd.concat(map(pd.DataFrame.from_dict, 
                       item['hits']['hits']), 
                   axis=1)['_source'].T.reset_index(drop=True)
    for key in cols_to_query:
        nn_data[f'es_neighbors_{key}'].append(nn[key]
                                              .values
                                              .tolist())

query_df = query_df.assign(**nn_data)
query_df.head()

100%|██████████| 5/5 [00:34<00:00,  6.85s/it]
100%|██████████| 1000/1000 [00:06<00:00, 163.84it/s]


|   | title | emb | es_neighbors_Index | es_neighbors_title |
|---|---|---|---|---|
| 137539 | MOTOROLA DROID RAZR M XT907 IMPACT CASE COVER ... | [0.031467807, -0.048276246, 0.014576058, -0.02... | [137539, 135533, 135529, 135540, 192993] | [MOTOROLA DROID RAZR M XT907 IMPACT CASE COVER... |
| 186522 | Forever Collectibles NHL Dual Hybrid iPhone 4/... | [0.023334723, 0.017239397, 0.03823308, -0.0193... | [158876, 158881, 158875, 186522, 186523] | [Forever Collectibles NHL Dual Hybrid iPhone 4... |
| 344437 | Samsung Galaxy S6 Edge, White Pearl 64GB (Sprint) | [-0.0013037474, -0.0038629419, 0.05036281, 0.0... | [344437, 344439, 344445, 478780, 344444] | [Samsung Galaxy S6 Edge, White Pearl 64GB (Spr... |
| 27490 | Amzer Super Clear Screen Protector with Cleani... | [-0.021655105, -0.007051119, 0.024543112, -0.0... | [27490, 27484, 27483, 23949, 33349] | [Amzer Super Clear Screen Protector with Clean... |
| 387250 | [Benly-5] 1/8&quot; 3.5mm Stereo Mini Jack / F... | [0.025798462, 0.028967546, 0.013909815, 0.0072... | [387250, 401900, 440491, 440501, 440465] | [[Benly-5] 1/8&quot; 3.5mm Stereo Mini Jack / ... |


Let's take a closer look at the neighbors of the first row in `query_df`. They're all cases for the same phone (the Motorola Droid RAZR M XT907), which shows just how relevant the nearest neighbors are!

In [13]:
query_df["es_neighbors_title"].iloc[0]

['MOTOROLA DROID RAZR M XT907 IMPACT CASE COVER + BLACK RUBBER SKIN PROTECTOR ACCESSORY CAMO DEER',
 'MOTOROLA DROID RAZR M XT907 CAMO DEER HEAVY DUTY CASE + BLACK GEL SKIN SNAP-ON PROTECTOR ACCESSORY',
 'MOTOROLA DROID RAZR M XT907 CAMO DEER HEAVY DUTY CASE + LIME GREEN GEL SKIN SNAP-ON PROTECTOR ACCESSORY',
 'MOTOROLA DROID RAZR M XT907 CAMO DRIED LEAVES HEAVY DUTY CASE + BLACK GEL SKIN SNAP-ON PROTECTOR ACCESSORY',
 'MOTOROLA DROID RAZR M XT907 NON SLIP GRAY HEAVY DUTY CASE + RED GEL SKIN SNAP-ON PROTECTOR ACCESSORY']
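Inspecting titles is anecdotal. To put a number on "how good", you can compare the ES neighbors against exact (brute-force) neighbors and compute recall@K: the fraction of the true top-K neighbors that the ANN search returned. Below is a minimal pure-Python sketch using `l2` distance to match the index settings. In practice you would compute the exact neighbors with a vectorized library, and the toy vectors in the example are made up for illustration.

```python
import heapq
from typing import Dict, List, Sequence


def exact_knn(query: Sequence[float], vectors: Dict[int, Sequence[float]],
              k: int) -> List[int]:
    """Brute-force K nearest ids by squared Euclidean (l2) distance."""
    def dist(item):
        _, v = item
        return sum((q - x) ** 2 for q, x in zip(query, v))
    return [i for i, _ in heapq.nsmallest(k, vectors.items(), key=dist)]


def recall_at_k(approx: List[List[int]], exact: List[List[int]]) -> float:
    """Fraction of exact neighbors recovered by the approximate search."""
    hits = sum(len(set(a) & set(e)) for a, e in zip(approx, exact))
    total = sum(len(e) for e in exact)
    return hits / total


# Toy example: 4 points in 2-D, K = 2
vectors = {0: (0.0, 0.0), 1: (1.0, 0.0), 2: (0.0, 1.0), 3: (5.0, 5.0)}
truth = exact_knn((0.1, 0.0), vectors, k=2)
print(truth)                            # [0, 1]
print(recall_at_k([[0, 3]], [truth]))   # 0.5
```

On the real data, the approximate lists would come from `query_df["es_neighbors_Index"].tolist()`.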

## Delete Index
Finally, after our experimentation is done (or if we made a mistake and need to delete everything to start from scratch), we can delete the whole index with a single line of code as below.

In [15]:
es.indices.delete(index=index_name)

{'acknowledged': True}

# OpenDistro's HNSW vs. Other ES ANNs

OpenDistro ES is now available in the [ann-benchmarks](https://github.com/erikbern/ann-benchmarks) repository that I introduced in [How to Choose the Best Nearest Neighbors Algorithm](https://medium.com/towards-artificial-intelligence/how-to-choose-the-best-nearest-neighbors-algorithm-8d75d42b16ab). As shown below, OpenDistro's HNSW is ~3X faster than other ES ANN algorithms at comparable Recall. Although ANN on ES cannot match standalone ANN algorithms on single-core performance because of ES overheads, its scalability to huge data makes OpenDistro ES's HNSW ANN plugin well worth considering for large datasets!

![ANN Benchmarks comparison between various ES ANNs](images/custom-euclidean-onlyES.png)

Thank you for reading! All the code in this post is available on my [GitHub repository](https://github.com/stephenleo/adventures-with-ann/blob/main/ann_es_docker.ipynb).