# Use bulkInsert to test GPU index

## Environment preparation

1. Docker installation: https://docs.docker.com/engine/install/ubuntu/
2. Install nvidia-docker2:

In [None]:
# Add NVIDIA's apt repository for nvidia-docker2, matched to this distribution/version.
# NOTE(review): apt-key is deprecated on recent Ubuntu releases and NVIDIA now documents
# the nvidia-container-toolkit repository instead — confirm which one your host needs.
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list
sudo apt-get update
# -y answers the confirmation prompt so the step does not hang when run non-interactively.
sudo apt-get install -y nvidia-docker2
# Restart Docker so the newly installed package is picked up.
sudo systemctl restart docker.service

3. Install the NVIDIA driver:

In [None]:
# -y answers the confirmation prompt so the install does not hang when run non-interactively;
# apt-get (rather than apt) has a stable command-line interface for scripted use.
sudo apt-get install -y --no-install-recommends nvidia-headless-535 nvidia-utils-535

4. (Optional) Mount a high-performance disk for the test. All of the following operations should run on that disk. On AWS hosts that come with a local NVMe SSD (for example, g4dn instances), the NVMe SSD has to be mounted manually.

In [None]:
lsblk # see device path
# WARNING: mkfs erases everything on the device — double-check the path reported by lsblk.
sudo mkfs -t ext4 /dev/nvme1n1
sudo mkdir -p /data
sudo mount /dev/nvme1n1 /data
# Persist the mount without an interactive editor. "nofail" keeps the host bootable if the
# device is missing at boot (instance-store NVMe disks do not survive a stop/start on AWS).
echo "UUID=$(sudo blkid -s UUID -o value /dev/nvme1n1) /data ext4 defaults,nofail 0 2" | sudo tee -a /etc/fstab
cd /data

## Download the Milvus image

In [None]:
# Pull the exact image that docker-compose.yml runs for the "standalone" service;
# the previously pulled v2.4.0-rc.1 tag is not referenced by the compose file.
sudo docker pull milvusdb/milvus:v2.4.0.1-gpu-beta

## Use Docker Compose to start the Milvus service

Save the following file as docker-compose.yml.

# docker-compose.yml for the GPU bulk-insert benchmark: etcd, MinIO and a
# Milvus standalone server. Service data is kept under ./volumes/ unless the
# DOCKER_VOLUME_DIRECTORY environment variable points somewhere else.
# NOTE(review): recent Docker Compose releases treat "version" as obsolete and
# only warn about it — confirm and drop it if so.
version: '3.5'

services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      # 4294967296 bytes = 4 GiB backend quota.
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    healthcheck:
      test: ["CMD", "etcdctl", "endpoint", "health"]

  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    environment:
      # Must match MINIO_ACCESS_KEY / MINIO_SECRET_KEY in the bulk-insert script.
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    ports:
      # 9001: web console (see --console-address below).
      - "9001:9001"
      # 9000: MinIO API, used by the health check and by the host-side upload script.
      - "9000:9000"
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data --console-address ":9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 20s
      retries: 3

  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.4.0.1-gpu-beta
    command: ["milvus", "run", "standalone"]
    environment:
      # Milvus reaches etcd and MinIO through their compose service names.
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      # 19530: the port the Python scripts connect to.
      - "19530:19530"
      # NOTE(review): 9091 is presumably the HTTP health/metrics port — confirm.
      - "9091:9091"
    deploy:
      resources:
        reservations:
          # Reserve one NVIDIA GPU (device 0) for this container.
          devices:
            - driver: nvidia
              capabilities: ["gpu"]
              device_ids: ["0"]
    # Start order only: the short syntax does not wait for health checks to pass.
    depends_on:
      - "etcd"
      - "minio"

# Name the default network "milvus" instead of the project-derived name.
networks:
  default:
    name: milvus


Start the services with:

In [None]:
# Start every service from docker-compose.yml in the current directory, detached (-d).
sudo docker compose up -d

## Prepare data 

Note: because we are working in /data, commands may need `sudo`, and all `pip` commands must be run with `sudo`.
Set the dataset as an environment variable:

In [None]:
export DATASET="cohere" # or "openai"
# The Python cells call env.read_env(".env"); an `export` only reaches processes started
# from this same shell, so also record the choice in .env (this overwrites the file).
echo "DATASET=$DATASET" > .env


In [None]:
# Dependencies of the data-preparation cell, installed one at a time so that a failure
# in one package does not stop the remaining installs.
for pkg in polars numpy s3fs environs; do
    pip3 install "$pkg"
done


In [None]:
import polars as pl
import numpy as np
import os
import shutil
import s3fs
import environs
env = environs.Env()
env.read_env(".env")
dataset_name = env.str("DATASET", "cohere")
parquet_path = dataset_name + "_data/"
npy_path = dataset_name + "_npy_data/"

base_file = parquet_path + "shuffle_train.parquet"
query_file = parquet_path + "test.parquet"
output_base = npy_path + "base.npy"
output_id = npy_path + "id.npy"

# Public S3 prefix of each supported benchmark dataset.
S3_DATASET_PATHS = {
    "cohere": "assets.zilliz.com/benchmark/cohere_medium_1m",
    "openai": "assets.zilliz.com/benchmark/openai_medium_500k",
}
# Validate before touching the disk: an unknown name used to fall through to a
# NameError on s3_path only after the output folders had already been wiped.
if dataset_name not in S3_DATASET_PATHS:
    raise ValueError(
        f"Unsupported DATASET {dataset_name!r}; expected one of {sorted(S3_DATASET_PATHS)}"
    )
s3_path = S3_DATASET_PATHS[dataset_name]

# Start from empty output folders; a folder that does not exist yet is not an error.
shutil.rmtree(parquet_path, ignore_errors=True)
shutil.rmtree(npy_path, ignore_errors=True)
os.mkdir(parquet_path)
os.mkdir(npy_path)

# Download every object under the dataset prefix (anonymous access).
fs = s3fs.S3FileSystem(anon=True, client_kwargs={"region_name": "us-west-2"})
dataset_info = fs.ls(s3_path, detail=True)
downloads = [info["Key"] for info in dataset_info]
print("download files:", downloads)
fs.download(downloads, parquet_path)

# Convert the training split to the two .npy files used by the bulk insert:
# base.npy holds the float32 vectors, id.npy the int64 primary keys.
df_train = pl.read_parquet(base_file)
base = np.stack(df_train["emb"]).astype(np.float32)
ids = np.stack(df_train["id"]).astype(np.int64)  # "ids", not "id": don't shadow the builtin
# Scale every vector to unit L2 norm.
all_embeddings = base / np.linalg.norm(base, axis=1)[:, np.newaxis]
np.save(output_base, all_embeddings)
np.save(output_id, ids)

## Script for bulk insert

In [None]:
# The bulk-insert cell imports both pymilvus and minio (`from minio import Minio`),
# so install both explicitly.
pip3 install pymilvus minio

In [None]:
from pymilvus import CollectionSchema, FieldSchema, DataType, utility, connections, Collection, list_collections

import time
import struct
import numpy as np
import os
import random
import json
import time
import os

from minio import Minio
from minio.error import S3Error

import environs
env = environs.Env()
env.read_env(".env")
dataset_name = env.str("DATASET", "cohere")
# Local folder holding id.npy / base.npy from the data-preparation step;
# it is uploaded to MinIO unchanged.
data_minio_path = f"{dataset_name}_npy_data/"

collection_name = "VectorDBBenchCollection"
id_path = f"{data_minio_path}/id.npy"
data_path = f"{data_minio_path}/base.npy"
def get_base_shape():
    """Return the shape of the base-vector array stored at ``data_path``.

    The file is opened with ``mmap_mode='r'``, so the vectors are memory-mapped
    rather than read into memory just to learn the shape.
    """
    return np.load(data_path, mmap_mode='r').shape
# nb = number of base vectors (used below to detect when the bulk insert is done),
# dim = vector width (used for the FLOAT_VECTOR field of the schema).
(nb, dim) = get_base_shape()
connections.connect(host="localhost", port=19530)

# MinIO settings; they must match the minio service in docker-compose.yml.
# NOTE(review): the bucket must be the one Milvus itself reads from
# ("a-bucket" is assumed to be Milvus's default minio.bucketName — confirm).
DEFAULT_BUCKET_NAME = "a-bucket"
# Connect via localhost: 0.0.0.0 is a listen/bind address and only works as a
# client destination on some platforms (e.g. Linux), not portably.
MINIO_ADDRESS = "localhost:9000"
MINIO_SECRET_KEY = "minioadmin"
MINIO_ACCESS_KEY = "minioadmin"

def upload(data_folder: str,
           bucket_name: str=DEFAULT_BUCKET_NAME) -> "tuple[bool, list[str]]":
    """Upload the ``.npy`` and ``.json`` files under ``data_folder`` to MinIO.

    ``data_folder`` is walked recursively. Each file is stored as
    ``milvus_bulkinsert/<name of its directory>/<file name>`` in
    ``bucket_name``; the bucket is not created here and must already exist.

    Args:
        data_folder: local folder holding the bulk-insert files.
        bucket_name: MinIO bucket to upload into.

    Returns:
        ``(True, object_names)`` on success, listing every object written, or
        ``(False, [])`` if the folder or bucket is missing or MinIO raises
        ``S3Error``.
    """
    if not os.path.exists(data_folder):
        print("Data path '{}' doesn't exist".format(data_folder))
        return False, []

    remote_data_path = "milvus_bulkinsert"
    remote_files = []
    try:
        print("Prepare upload files")
        minio_client = Minio(endpoint=MINIO_ADDRESS, access_key=MINIO_ACCESS_KEY,
                             secret_key=MINIO_SECRET_KEY, secure=False)
        if not minio_client.bucket_exists(bucket_name):
            print("MinIO bucket '{}' doesn't exist".format(bucket_name))
            return False, []

        # One top-down os.walk visits every directory exactly once, in the same
        # depth-first order as before. The old code re-walked every subtree and
        # relied on `parent is folder` (string identity) to skip the repeats.
        for parent, _dirnames, filenames in os.walk(data_folder):
            # abspath drops a trailing "/" (and resolves "."), so the directory
            # name is never empty: basename("cohere_npy_data/") would be "".
            folder_name = os.path.basename(os.path.abspath(parent))
            for filename in filenames:
                if os.path.splitext(filename)[1] not in (".json", ".npy"):
                    continue
                local_full_path = os.path.join(parent, filename)
                minio_file_path = os.path.join(remote_data_path, folder_name, filename)
                minio_client.fput_object(bucket_name, minio_file_path, local_full_path)
                print("Upload file '{}' to '{}'".format(local_full_path, minio_file_path))
                remote_files.append(minio_file_path)

    except S3Error as e:
        print("Failed to connect MinIO server {}, error: {}".format(MINIO_ADDRESS, e))
        return False, []

    print("Successfully upload files: {}".format(remote_files))
    return True, remote_files

print(f"\nList collections...")
collection_list = list_collections()
print(collection_list)

# Drop a previous run's collection so the benchmark always starts empty.
if collection_name in collection_list:
    print(collection_name, " exist, and drop it")
    collection = Collection(collection_name)
    collection.drop()
    print("drop")
print("create collection")
# Two fields matching the two uploaded files (id.npy and base.npy).
# NOTE(review): bulk insert presumably pairs each .npy file with the field of
# the same name — keep the names in sync.
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema(name="base", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields)

coll = Collection(collection_name, schema)

# The reported "bulk insert time" starts here, so it includes the MinIO upload.
begin_t = time.time()
ok, remote_files = upload(data_folder=data_minio_path)
if not ok or not remote_files:
    # Previously the failure was ignored and an empty file list was submitted.
    raise RuntimeError(f"Upload of '{data_minio_path}' to MinIO failed; nothing to bulk insert")

print("do_bulk_insert")
task_id = utility.do_bulk_insert(
    collection_name=collection_name,
    files=remote_files)

print("wait insert")
from pymilvus import BulkInsertState

while True:
    if coll.num_entities == nb:
        coll.flush()
        break
    # Abort instead of polling forever when the import task itself has failed.
    task = utility.get_bulk_insert_state(task_id=task_id)
    if task.state in (BulkInsertState.ImportFailed, BulkInsertState.ImportFailedAndCleaned):
        raise RuntimeError(f"Bulk insert task {task_id} failed: {task.failed_reason}")
    time.sleep(1)
insert_t = time.time()
print("bulk insert time:", insert_t - begin_t)

print("create index")
# NOTE(review): intermediate_graph_degree / graph_degree are the GPU_CAGRA build
# parameters; M, efConstruction (HNSW-style) and nlist (IVF-style) are presumably
# ignored for this index type — confirm before tuning them.
try:
    coll.create_index(field_name="base",
        index_params={'index_type': 'GPU_CAGRA',
            'metric_type': 'L2',
            'params': {
                'intermediate_graph_degree':64,
                'graph_degree': 32,
                'M':14,
                'efConstruction': 360,
                "nlist":1024,
                }})
except Exception as e:
    print(f"index error: {e}")
    raise  # bare raise keeps the original traceback

def wait_index(poll_interval=5):
    """Block until Milvus reports the index on ``collection_name`` as built.

    Polls ``utility.index_building_progress`` and prints every report.

    Args:
        poll_interval: seconds to sleep between polls (default 5, as before).
    """
    while True:
        progress = utility.index_building_progress(collection_name)
        print(progress)
        if "pending_index_rows" in progress:
            done = progress["pending_index_rows"] == 0
        else:
            # NOTE(review): fallback for reports without "pending_index_rows";
            # the old check (default -1) could never succeed on those and
            # looped forever. Compare indexed rows against total rows instead.
            done = (
                "total_rows" in progress
                and progress.get("indexed_rows", -1) >= progress["total_rows"]
            )
        if done:
            break
        time.sleep(poll_interval)
        
# Phase timings: index build (counted from the end of the bulk insert, so it
# includes the create_index call) and collection load.
print("wait index")
wait_index()
index_t = time.time()
index_seconds = index_t - insert_t
print("create index time :", index_seconds)

print("load index")
coll.load()
load_t = time.time()
load_seconds, total_seconds = load_t - index_t, load_t - begin_t
print("load index time :", load_seconds)
print("total time:", total_seconds)

## Script for recall and QPS

In [None]:
from multiprocessing import Process, Pool
from pymilvus import CollectionSchema, FieldSchema, DataType, utility, connections, Collection
import time
import struct
import polars as pl
import numpy as np
import environs
env = environs.Env()
env.read_env(".env")
dataset_name = env.str("DATASET", "cohere")
data_path = f"{dataset_name}_data/"

# Parquet files downloaded by the data-preparation step.
gt_path = f"{data_path}/neighbors.parquet"
queries_path = f"{data_path}/test.parquet"
topk = 10
# Search parameters passed to Collection.search for the GPU_CAGRA index.
search_params = dict(
    metric_type="L2",
    params=dict(
        ef=200,  # 200 for Cohere, 100 for OpenAI to get 98% recall
        search_width=1,
        itopk_size=128,
        min_iterations=0,
        max_iterations=34,  # 34 for Cohere, 19 for OpenAI to get 98% recall
    ),
)
collection_name = "VectorDBBenchCollection"
connections.connect(host="localhost", port=19530)

def get_gt():
    """Return the ground-truth neighbour ids, one plain list per test query.

    Reads the ``neighbors_id`` column of ``gt_path``; row ``k`` is compared
    against the results of query ``k`` by ``search``.
    """
    neighbors = pl.read_parquet(gt_path)["neighbors_id"]
    print(len(neighbors), type(neighbors))
    return [list(row) for row in neighbors]

def get_total_quries():
    """Load every test query embedding from ``queries_path``, scaled to unit L2 norm.

    Returns:
        A list of float lists, one per query row.
    """
    frame = pl.read_parquet(queries_path)
    vectors = np.stack(frame["emb"]).astype(np.float32)
    unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    return unit.tolist()

def get_queries(nq):
    """Split all test queries into consecutive batches of ``nq`` vectors.

    The last batch holds the remainder and may be shorter than ``nq``.

    Args:
        nq: batch size; must be a positive integer.

    Returns:
        A list of batches, each a list of query vectors.

    Raises:
        ValueError: if ``nq`` is not positive (the old loop never terminated).
    """
    if nq <= 0:
        raise ValueError(f"nq must be a positive integer, got {nq}")
    test_emb = get_total_quries()
    return [test_emb[start:start + nq] for start in range(0, len(test_emb), nq)]

def search(search_params, loop, limit = -1):
    """Run the test queries one by one and print recall, latency and QPS.

    Each query is sent as its own single-vector ``Collection.search`` call, and
    its hits are compared with the first ``topk`` ground-truth ids.

    Args:
        search_params: Milvus search parameters forwarded to ``Collection.search``.
        loop: number of passes over the query set; a summary is printed after
            every pass, averaged over all searches issued so far.
        limit: if positive, only the first ``limit`` queries are used.

    Raises:
        ValueError: if there are no queries to run.
    """
    queries = get_total_quries()
    collection = Collection(collection_name)
    collection.load()
    print("loaded")
    to_query = queries[0:limit] if limit > 0 else queries
    if not to_query:
        raise ValueError("no queries to search")
    # Load the ground truth before starting the clock so reading the parquet
    # file is not counted in "time usage".
    gt = get_gt()
    start = time.time()
    passed = 0.0
    recall = 0.0
    for i in range(loop):
        # Query j always corresponds to gt[j]. A running index that kept growing
        # across passes read past the end of the ground truth on pass 2.
        for j, query in enumerate(to_query):
            t1 = time.time()
            results = collection.search(
                data=[query],
                anns_field="base",
                param=search_params,
                limit=topk,
                expr=None,
                consistency_level="Eventually"
            )
            passed += (time.time() - t1)
            expected = set(gt[j][0:topk])
            hit = sum(1 for r in results for t in r if t.id in expected)
            recall += float(hit) / topk
        searched = len(to_query) * (i + 1)
        # recall accumulates over all passes, so average over every search so far.
        print("recall:", recall / float(searched))
        qps = searched / passed if passed > 0 else float("inf")
        print("time usage: " + str(time.time() - start) + ", latency: " + str(passed / len(to_query) / (i + 1)) + ", qps: " + str(qps))

def non_stop_search(name, search_params, queries, run_time):
    """Issue searches back-to-back until ``run_time`` seconds of search time have passed.

    Cycles through ``queries`` repeatedly; each element is passed as the
    ``data`` argument of one ``Collection.search`` call.

    Args:
        name: collection to search.
        search_params: Milvus search parameters.
        queries: non-empty list of query batches.
        run_time: stop once the accumulated time spent in ``search`` exceeds
            this many seconds.

    Returns:
        ``[calls, seconds]``: the number of search calls issued and the total
        time spent inside them.

    Raises:
        ValueError: if ``queries`` is empty (the loop could never finish).
    """
    if not queries:
        raise ValueError("queries must contain at least one batch")
    collection = Collection(name)
    nq = 0  # number of search calls, not number of query vectors
    passed = 0
    while True:
        for query in queries:
            t1 = time.time()
            collection.search(
                data=query,
                anns_field="base",
                param=search_params,
                limit=topk,
                expr=None,
                consistency_level="Eventually"
            )
            nq += 1
            passed += (time.time() - t1)
            if passed > run_time:
                return [nq, passed]

def parallel_search(name, search_params, queries, num_threads, run_time):
    """Run ``non_stop_search`` in ``num_threads`` pool workers and print throughput.

    Despite the parameter name, the workers are processes from a
    ``multiprocessing.Pool``. Each one searches until it has spent ``run_time``
    seconds inside ``Collection.search``.

    Args:
        name: collection to search.
        search_params: Milvus search parameters.
        queries: list of query batches given to every worker.
        num_threads: number of worker processes.
        run_time: per-worker search-time budget in seconds.
    """
    inputs = [(name, search_params, queries, run_time)] * num_threads
    # Use the pool as a context manager so its worker processes are shut down
    # when this call ends. Previously they lingered until garbage collection,
    # one full pool per call.
    with Pool(num_threads) as pool:
        t1 = time.time()
        outputs = pool.starmap(non_stop_search, inputs)
        t2 = time.time()

    sumq = sum(calls for calls, _ in outputs)
    sumt = sum(seconds for _, seconds in outputs)
    # latency: mean time per search call; qps: total calls divided by the mean
    # per-worker search time (sumt / num_threads).
    print("time usage: " + str(t2 - t1) + ", latency: " + str(sumt / sumq) + ", qps: " + str(sumq * num_threads / sumt))

def do(num_threads=500, run_time=60, nq_list=(1, 10, 100)):
    """Load the collection and measure concurrent search throughput per batch size.

    For each batch size ``nq`` the test queries are split into batches of
    ``nq`` vectors and handed to ``parallel_search``.

    Args:
        num_threads: number of parallel search workers (default 500, the
            previously hard-coded value).
        run_time: per-worker search-time budget in seconds (default 60).
        nq_list: batch sizes to benchmark, in order (default 1, 10, 100).
    """
    collection = Collection(collection_name)
    collection.load()
    print("loaded")
    for nq in nq_list:
        print("Run for NQ: ", nq)
        queries = get_queries(nq)
        parallel_search(collection_name, search_params, queries,
                        num_threads=num_threads, run_time=run_time)

print("serial execution to get search recall:")
search(search_params, 1)
# do() runs parallel_search with 500 workers (num_threads=500); the message used
# to say 50, which did not match what was actually measured.
print("concurrent execution(500 threads) to get search qps:")
do()