
# Similar Questions Retrieval - Milvus - CAGRA-HNSW

This notebook is inspired by the [similar questions retrieval example of Sentence-Transformers](https://www.sbert.net/examples/applications/semantic-search/README.html#similar-questions-retrieval), and adapted for use with [Milvus](https://milvus.io) and [cuVS](https://rapids.ai/cuvs/).

The model used here (`nq-distilbert-base-v1`) was trained on the [Natural Questions dataset](https://ai.google.com/research/NaturalQuestions), which consists of about 100k real Google search queries, each paired with an annotated passage from Wikipedia that provides the answer. This makes it an example of an asymmetric search task: short queries are matched against longer passages. As the corpus, we use the smaller [Simple English Wikipedia](http://sbert.net/datasets/simplewiki-2020-11-01.jsonl.gz) so that it fits easily into memory.

The steps to install the latest Milvus package are available in the [Milvus documentation](https://milvus.io/docs/quickstart.md).

In [None]:
!pip install sentence_transformers torch pymilvus pymilvus[bulk_writer] minio dask dask[distributed]

# Note: if you have a Hopper-based GPU, like an H100, use these to install:
# pip install torch --index-url https://download.pytorch.org/whl/cu118
# pip install sentence_transformers

In [None]:
!nvidia-smi

In [None]:
import dask.array as da
import gzip
import json
import math
import numpy as np
import os
import pymilvus
import time
import torch

from minio import Minio
from multiprocessing import Process
from sentence_transformers import SentenceTransformer, CrossEncoder, util
from typing import List


from pymilvus import (
    connections, utility
)
from pymilvus.bulk_writer import LocalBulkWriter, BulkFileType  # pip install pymilvus[bulk_writer]

if not torch.cuda.is_available():
    print("Warning: No GPU found. Please add a GPU to your notebook.")

# Set up the Milvus collection

In [None]:
DIM = 768
MILVUS_PORT = 30004
MILVUS_HOST = f"http://localhost:{MILVUS_PORT}"
ID_FIELD="id"
EMBEDDING_FIELD="embedding"

collection_name = "simple_wiki"

def get_milvus_client():
    return pymilvus.MilvusClient(uri=MILVUS_HOST)

client = get_milvus_client()

fields = [
    pymilvus.FieldSchema(name=ID_FIELD, dtype=pymilvus.DataType.INT64, is_primary=True),
    pymilvus.FieldSchema(name=EMBEDDING_FIELD, dtype=pymilvus.DataType.FLOAT_VECTOR, dim=DIM)
]

schema = pymilvus.CollectionSchema(fields)
schema.verify()

if collection_name in client.list_collections():
    print(f"Collection '{collection_name}' already exists. Deleting collection...")
    client.drop_collection(collection_name)

client.create_collection(collection_name, schema=schema, dimension=DIM, vector_field_name=EMBEDDING_FIELD)
collection = pymilvus.Collection(name=collection_name, using=client._using)
collection.release()
collection.drop_index()


# Set up the Sentence Transformer model

In [None]:
# We use the Bi-Encoder to encode all passages, so that we can use it with semantic search
model_name = 'nq-distilbert-base-v1'
bi_encoder = SentenceTransformer(model_name)

# As the dataset, we use Simple English Wikipedia. Compared to the full English Wikipedia, it has only
# about 170k articles. We split these articles into paragraphs and encode them with the bi-encoder

wikipedia_filepath = 'data/simplewiki-2020-11-01.jsonl.gz'

if not os.path.exists(wikipedia_filepath):
    util.http_get('http://sbert.net/datasets/simplewiki-2020-11-01.jsonl.gz', wikipedia_filepath)

passages = []
with gzip.open(wikipedia_filepath, 'rt', encoding='utf8') as fIn:
    for line in fIn:
        data = json.loads(line.strip())
        for paragraph in data['paragraphs']:
            # We encode the passages as [title, text]
            passages.append([data['title'], paragraph])

# If you like, you can also limit the number of passages to use (the pre-computed embeddings must then be sliced to match)
print("Passages:", len(passages))

# To speed things up, pre-computed embeddings are downloaded.
# The provided file encoded the passages with the model 'nq-distilbert-base-v1'
if model_name == 'nq-distilbert-base-v1':
    embeddings_filepath = 'simplewiki-2020-11-01-nq-distilbert-base-v1.pt'
    if not os.path.exists(embeddings_filepath):
        util.http_get('http://sbert.net/datasets/simplewiki-2020-11-01-nq-distilbert-base-v1.pt', embeddings_filepath)

    corpus_embeddings = torch.load(embeddings_filepath, map_location='cpu', weights_only=True).float()  # Convert embedding file to float
    #if torch.cuda.is_available():
    #    corpus_embeddings = corpus_embeddings.to('cuda')
else:  # Here, we compute the corpus_embeddings from scratch (which can take a while depending on the GPU)
    corpus_embeddings = bi_encoder.encode(passages, convert_to_tensor=True, show_progress_bar=True).to('cpu')
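The cell above expects each line of the `.jsonl.gz` file to hold one JSON object with a `title` and a list of `paragraphs`. The self-contained sketch below runs the same flattening on two made-up records (the records and the `demo_passages` name are illustrative, not part of the dataset). It shows that a passage's position in the list is the id it later receives in Milvus, which is why `passages[hits[0][k].id]` recovers the text of a hit.

```python
import gzip
import io
import json

# Two made-up records with the same shape as the dump: one JSON object per line
demo_records = [
    {"title": "Article A", "paragraphs": ["First paragraph of A.", "Second paragraph of A."]},
    {"title": "Article B", "paragraphs": ["Only paragraph of B."]},
]

# Write them as gzip-compressed JSON Lines into an in-memory buffer
buffer = io.BytesIO()
with gzip.open(buffer, "wt", encoding="utf8") as fOut:
    for record in demo_records:
        fOut.write(json.dumps(record) + "\n")
buffer.seek(0)

# Same flattening as in the notebook: one [title, paragraph] passage per paragraph
demo_passages = []
with gzip.open(buffer, "rt", encoding="utf8") as fIn:
    for line in fIn:
        data = json.loads(line.strip())
        for paragraph in data["paragraphs"]:
            demo_passages.append([data["title"], paragraph])

# The list position is the id this passage gets in the Milvus collection
for passage_id, (title, text) in enumerate(demo_passages):
    print(passage_id, title, "|", text)
```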

# Vector Search using Milvus and RAPIDS cuVS
Now that the embeddings are ready to be indexed and the model has been loaded, we can use Milvus and RAPIDS cuVS to run the vector search.

This is done in three steps: first we ingest all the vectors into the Milvus collection, then we build the Milvus index, and finally we search it.
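Every index built below approximates the same exact answer: the `top_k` corpus vectors with the smallest L2 distance to the query embedding. A brute-force NumPy version of that search is sketched here on random data; `exact_l2_search` and the random corpus are illustrative and not part of the notebook. Milvus documents its L2 metric as the squared Euclidean distance, so the sketch uses squared distances as well. Applied to `np.array(corpus_embeddings)`, memory permitting, it could serve as ground truth for the approximate indexes.

```python
import numpy as np

def exact_l2_search(corpus: np.ndarray, query: np.ndarray, top_k: int = 5):
    """Brute-force top-k search under squared L2 distance (exact, no index)."""
    # Squared Euclidean distance from the query to every corpus row
    distances = ((corpus - query) ** 2).sum(axis=1)
    # argpartition isolates the top_k smallest distances without a full sort...
    candidate_ids = np.argpartition(distances, top_k - 1)[:top_k]
    # ...and only those top_k candidates are then sorted
    ids = candidate_ids[np.argsort(distances[candidate_ids])]
    return ids, distances[ids]

rng = np.random.default_rng(0)
demo_corpus = rng.standard_normal((1000, 768)).astype(np.float32)
# A query placed very close to row 42 of the random corpus
demo_query = demo_corpus[42] + 0.01 * rng.standard_normal(768).astype(np.float32)
demo_ids, demo_dists = exact_l2_search(demo_corpus, demo_query, top_k=5)
print(demo_ids, demo_dists)
```

The cost is one distance per corpus vector for every query, which is exactly what the IVF and graph indexes avoid.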

In [None]:
# minio
MINIO_PORT = 30009
MINIO_URL = f"localhost:{MINIO_PORT}"
MINIO_SECRET_KEY = "minioadmin"
MINIO_ACCESS_KEY = "minioadmin"

def upload_to_minio(file_paths: List[List[str]], remote_paths: List[List[str]], bucket_name="milvus-bucket"):
    minio_client = Minio(endpoint=MINIO_URL, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False)
    if not minio_client.bucket_exists(bucket_name):
        minio_client.make_bucket(bucket_name)

    for local_batch, remote_batch in zip(file_paths, remote_paths):
        for local_file, remote_file in zip(local_batch, remote_batch):
            minio_client.fput_object(bucket_name, 
                                     object_name=remote_file,
                                     file_path=local_file,
                                     part_size=512 * 1024 * 1024,
                                     num_parallel_uploads=5)
     
    
def ingest_data_bulk(collection_name, vectors, schema: pymilvus.CollectionSchema, log_times=True, bulk_writer_type="milvus", debug=False):
    print(f"-  Ingesting {len(vectors) // 1000}k vectors, Bulk")
    tic = time.perf_counter()
    collection = pymilvus.Collection(collection_name, using=get_milvus_client()._using)
    remote_path = None

    if bulk_writer_type == 'milvus':
        # Prepare source data for faster ingestion
        writer = LocalBulkWriter(
            schema=schema,
            local_path='bulk_data',
            segment_size=512 * 1024 * 1024, # Default value
            file_type=BulkFileType.NPY
        )
        for id, vec in enumerate(vectors):
            writer.append_row({ID_FIELD: id, EMBEDDING_FIELD: vec})

        if debug:
            print(writer.batch_files)
        def callback(file_list):
            if debug:
                print(f"  -  Commit successful")
                print(file_list)
        writer.commit(call_back=callback)
        files_to_upload = writer.batch_files
    elif bulk_writer_type == 'dask':
        # Prepare source data for faster ingestion
        if not os.path.isdir("bulk_data"):
            os.mkdir("bulk_data")

        from dask.distributed import Client, LocalCluster
        cluster = LocalCluster(n_workers=1, threads_per_worker=1)
        dask_client = Client(cluster)

        chunk_size = 100000
        da_vectors = da.from_array(vectors, chunks=(chunk_size, vectors.shape[1]))
        da_ids = da.arange(len(vectors), chunks=(chunk_size,))
        # Writes one <chunk_nb>.npy file per chunk into each directory
        da.to_npy_stack("bulk_data/da_embedding/", da_vectors)
        da.to_npy_stack("bulk_data/da_id/", da_ids)
        dask_client.close()
        cluster.close()

        files_to_upload = []
        remote_path = []
        for chunk_nb in range(math.ceil(len(vectors) / chunk_size)):
            files_to_upload.append([f"bulk_data/da_embedding/{chunk_nb}.npy", f"bulk_data/da_id/{chunk_nb}.npy"])
            # Each import batch lives in its own folder, with files named after the fields
            remote_path.append([f"bulk_data/da_{chunk_nb}/embedding.npy", f"bulk_data/da_{chunk_nb}/id.npy"])

    elif bulk_writer_type == 'numpy':
        # Directly save NPY files, named after the fields
        os.makedirs("bulk_data", exist_ok=True)
        np.save("bulk_data/embedding.npy", vectors)
        np.save("bulk_data/id.npy", np.arange(len(vectors)))
        files_to_upload = [["bulk_data/embedding.npy", "bulk_data/id.npy"]]
    else:
        raise ValueError("Invalid bulk writer type")
    
    toc = time.perf_counter()
    if log_times:
        print(f"  -  File save time: {toc - tic:.2f} seconds")
    # Import data
    if remote_path is None:
        remote_path = files_to_upload
    upload_to_minio(files_to_upload, remote_path)
    
    job_ids = [utility.do_bulk_insert(collection_name, batch, using=get_milvus_client()._using) for batch in remote_path]

    while True:
        tasks = [utility.get_bulk_insert_state(job_id, using=get_milvus_client()._using) for job_id in job_ids]
        success = all(task.state_name == "Completed" for task in tasks)
        # Treat every state whose name starts with "Failed" as a failure
        failure = any(task.state_name.startswith("Failed") for task in tasks)
        for i, task in enumerate(tasks):
            if debug:
                print(f"  -  Task {i}/{len(tasks)} state: {task.state_name}, Progress percent: {task.infos['progress_percent']}, Imported row count: {task.row_count}")
            if task.state_name.startswith("Failed"):
                print(task)
        if success or failure:
            break
        time.sleep(2)

    added_entities = sum(task.row_count for task in tasks)
    failure = failure or added_entities != len(vectors)
    if failure:
        print(f"-  Ingestion failed. Added entities: {added_entities}")
    toc = time.perf_counter()
    if log_times:
        datasize = vectors.nbytes / 1024 / 1024
        print(f"-  Ingestion time: {toc - tic:.2f} seconds. ({(datasize / (toc-tic)):.2f}MB/s)")

ingest_data_bulk(collection_name, np.array(corpus_embeddings), schema, bulk_writer_type='dask', log_times=True)
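With `bulk_writer_type='dask'`, the vectors are split into chunks of `chunk_size` rows, and each chunk becomes one `do_bulk_insert` job whose files sit in one folder and are named after the collection fields (`embedding.npy`, `id.npy`), since Milvus matches NumPy files to fields by file name. The sketch below reproduces only that path bookkeeping in plain Python, so the job layout can be checked without Dask, MinIO, or Milvus. `plan_npy_batches` is a hypothetical helper, and the 250,000-vector figure is just an example.

```python
import math
from typing import List, Tuple

def plan_npy_batches(num_vectors: int, chunk_size: int) -> List[Tuple[int, List[str], List[str]]]:
    """Hypothetical helper mirroring the path bookkeeping of the 'dask' branch."""
    batches = []
    for chunk_nb in range(math.ceil(num_vectors / chunk_size)):
        # The last chunk holds whatever rows remain
        rows = min(chunk_size, num_vectors - chunk_nb * chunk_size)
        # da.to_npy_stack writes <chunk_nb>.npy into one directory per array
        local_files = [f"bulk_data/da_embedding/{chunk_nb}.npy", f"bulk_data/da_id/{chunk_nb}.npy"]
        # One folder per import job, files named after the fields
        remote_files = [f"bulk_data/da_{chunk_nb}/embedding.npy", f"bulk_data/da_{chunk_nb}/id.npy"]
        batches.append((rows, local_files, remote_files))
    return batches

batches = plan_npy_batches(num_vectors=250_000, chunk_size=100_000)
for rows, local_files, remote_files in batches:
    print(rows, local_files, "->", remote_files)
```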

In [None]:
# Set up the IVF-PQ index

index_params = dict(
    index_type="GPU_IVF_PQ",
    metric_type="L2",
    params={"nlist": 150,  # Number of clusters (inverted lists)
            "m": 96})      # Number of PQ sub-vectors, each covering DIM / m = 8 dimensions

# Drop the index if it exists
if collection.has_index():
    collection.release()
    collection.drop_index()

# Create the index
tic = time.perf_counter()
collection.create_index(field_name=EMBEDDING_FIELD, index_params=index_params)
collection.load()
toc = time.perf_counter()
print(f"-  Index creation time: {toc - tic:.4f} seconds. ({index_params})")

In [8]:
# Search the index
def search_cuvs_pq(query, top_k = 5, n_probe = 30):
    # Encode the query using the bi-encoder and find potentially relevant passages
    question_embedding = bi_encoder.encode(query, convert_to_tensor=True)

    # nprobe: number of clusters (out of nlist) scanned per query
    search_params = {"metric_type": "L2", "params": {"nprobe": n_probe}}
    tic = time.perf_counter()
    hits = collection.search(
                data=np.array(question_embedding[None].cpu()), anns_field=EMBEDDING_FIELD, param=search_params, limit=top_k
            )
    toc = time.perf_counter()

    # Output of top-k hits
    print("Input question:", query)
    print("Results (after {:.3f} ms):".format((toc - tic)*1000))
    for k in range(top_k):
        print("\t{:.3f}\t{}".format(hits[0][k].distance, passages[hits[0][k].id]))

IVF-PQ is best suited to cases where the memory footprint must be reduced while keeping good accuracy: each vector is stored as a compact product-quantization code instead of its full float32 values.
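To put the memory argument in numbers: with `DIM = 768` float32 components, each raw vector takes 3,072 bytes, while IVF-PQ with `m = 96` stores one code per 8-dimensional sub-vector. The sketch below does that arithmetic. `pq_footprint` is a hypothetical helper, and `nbits = 8` bits per code is an assumed default (the index parameters above do not set it). It ignores the IVF centroids, the PQ codebooks and per-list overhead, so real index sizes are somewhat larger.

```python
import math

def pq_footprint(dim: int, m: int, nbits: int = 8):
    """Bytes per vector for raw float32 storage vs. an m-code PQ encoding."""
    if dim % m != 0:
        raise ValueError("dim must be divisible by m")
    raw_bytes = dim * 4                      # float32: 4 bytes per component
    code_bytes = math.ceil(m * nbits / 8)    # one nbits-bit code per sub-vector
    return raw_bytes, code_bytes, raw_bytes / code_bytes

raw_bytes, code_bytes, ratio = pq_footprint(dim=768, m=96)
print(f"raw: {raw_bytes} B, PQ code: {code_bytes} B, ~{ratio:.0f}x smaller, "
      f"{768 // 96} dimensions per sub-vector")
```

Lowering `nbits` or `m` shrinks the codes further, at the cost of a coarser approximation of each vector.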

In [None]:
search_cuvs_pq(query="Who was Grace Hopper?")

In [None]:
search_cuvs_pq(query="Who was Alan Turing?")

In [None]:
search_cuvs_pq(query="What is creating tides?")

In [None]:
# Drop the current index if it exists
if collection.has_index():
    collection.release()
    collection.drop_index()

# Create the IVF Flat index
index_params = dict(
    index_type="GPU_IVF_FLAT",
    metric_type="L2",
    params={"nlist": 150})  # Number of clusters
tic = time.perf_counter()
collection.create_index(field_name=EMBEDDING_FIELD, index_params=index_params)
collection.load()
toc = time.perf_counter()
print(f"-  Index creation time: {toc - tic:.4f} seconds. ({index_params})")
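The IVF indexes partition the corpus into `nlist` clusters and, at query time, scan only the `nprobe` clusters whose centroids are closest to the query; IVF-Flat then compares the query against the full vectors in those clusters. The toy CPU sketch below illustrates that idea with a few rounds of k-means in NumPy. It is not the cuVS implementation (which trains and searches on the GPU); `build_ivf`, `ivf_search`, and the random data are hypothetical.

```python
import numpy as np

def squared_l2(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Pairwise squared L2 distances between the rows of a and the rows of b."""
    return ((a[:, None, :] - b[None, :, :]) ** 2).sum(axis=-1)

def build_ivf(corpus: np.ndarray, nlist: int, n_iter: int = 10, seed: int = 0):
    """Toy k-means plus inverted lists: the structure behind IVF-Flat."""
    rng = np.random.default_rng(seed)
    centroids = corpus[rng.choice(len(corpus), nlist, replace=False)].copy()
    for _ in range(n_iter):
        assign = squared_l2(corpus, centroids).argmin(axis=1)
        for c in range(nlist):
            members = corpus[assign == c]
            if len(members) > 0:  # keep the old centroid if a cluster empties
                centroids[c] = members.mean(axis=0)
    assign = squared_l2(corpus, centroids).argmin(axis=1)
    inverted_lists = [np.flatnonzero(assign == c) for c in range(nlist)]
    return centroids, inverted_lists

def ivf_search(corpus, centroids, inverted_lists, query, top_k=5, n_probe=3):
    """Exhaustive (flat) scan restricted to the n_probe closest lists."""
    probed = np.argsort(squared_l2(query[None, :], centroids)[0])[:n_probe]
    candidates = np.concatenate([inverted_lists[c] for c in probed])
    dists = squared_l2(query[None, :], corpus[candidates])[0]
    order = np.argsort(dists)[:top_k]
    return candidates[order], dists[order]

rng = np.random.default_rng(1)
toy_corpus = rng.standard_normal((2000, 32)).astype(np.float32)
toy_query = rng.standard_normal(32).astype(np.float32)
centroids, inverted_lists = build_ivf(toy_corpus, nlist=20)

approx_ids, _ = ivf_search(toy_corpus, centroids, inverted_lists, toy_query, n_probe=4)
print("nprobe=4 :", approx_ids)
all_ids, _ = ivf_search(toy_corpus, centroids, inverted_lists, toy_query, n_probe=20)
print("nprobe=20:", all_ids)  # probing every list degenerates to an exhaustive search
```

A larger `n_probe` scans more candidates and raises recall, at the cost of latency; with `n_probe == nlist` the result is exact.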

In [13]:
def search_cuvs_flat(query, top_k = 5, n_probe = 30):
    # Encode the query using the bi-encoder and find potentially relevant passages
    question_embedding = bi_encoder.encode(query, convert_to_tensor=True)
    
    # nprobe: number of clusters (out of nlist) scanned per query
    search_params = {"metric_type": "L2", "params": {"nprobe": n_probe}}
    tic = time.perf_counter()
    hits = collection.search(
                data=np.array(question_embedding[None].cpu()), anns_field=EMBEDDING_FIELD, param=search_params, limit=top_k
            )
    toc = time.perf_counter()

    # Output of top-k hits
    print("Input question:", query)
    print("Results (after {:.3f} ms):".format((toc - tic)*1000))
    for k in range(top_k):
        print("\t{:.3f}\t{}".format(hits[0][k].distance, passages[hits[0][k].id]))

In [None]:
search_cuvs_flat(query="Who was Grace Hopper?")

In [None]:
search_cuvs_flat(query="Who was Alan Turing?")

In [None]:
search_cuvs_flat(query="What is creating tides?")
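The IVF-PQ and IVF-Flat results can so far only be compared by reading them. A standard way to measure the accuracy an approximate index trades for speed or memory is recall@k: the fraction of the exact top-k neighbors (for example, from a brute-force L2 search over `corpus_embeddings`) that the approximate search also returns. The helper below is a hypothetical sketch; to use it here, the search functions would have to return `hits` so that ids such as `hits[0][k].id` can be collected per query.

```python
import numpy as np

def recall_at_k(approx_ids, exact_ids) -> float:
    """Average fraction of the exact top-k neighbors found by the approximate search."""
    approx_ids = np.asarray(approx_ids)
    exact_ids = np.asarray(exact_ids)
    k = exact_ids.shape[1]
    found = sum(len(set(a[:k].tolist()) & set(e.tolist()))
                for a, e in zip(approx_ids, exact_ids))
    return found / exact_ids.size

# Two queries, k = 3: the first misses one true neighbor, the second finds all three
print(recall_at_k([[1, 2, 3], [4, 5, 6]], [[1, 2, 9], [6, 5, 4]]))
```

Recall ignores the order of the returned ids, which suits approximate indexes whose main failure mode is missing a neighbor altogether.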

## Using CAGRA: Hybrid GPU-CPU graph-based Vector Search

CAGRA is a graph-based nearest neighbors implementation with state-of-the-art performance for both small- and large-batch vector searches.

CAGRA follows the same steps as IVF-Flat and IVF-PQ in Milvus, but the index can also be adapted for querying on the CPU.
This lets CAGRA combine fast index building on the GPU with low-latency search on the CPU, which helps even for the smallest query batches.
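A graph-based index links every vector to a few of its nearest neighbors and answers a query by walking that graph toward closer and closer nodes. With `adapt_for_cpu` enabled, Milvus requires an `ef` search parameter, which points to an HNSW-style search on the CPU (consistent with this notebook's title). The sketch below is not CAGRA: it is a simplified, single-layer best-first search over an exact k-NN graph, meant only to show what such a walk does and what `ef` controls (the number of candidates kept during the walk). `knn_graph`, `graph_search`, and the toy points are hypothetical.

```python
import heapq
import numpy as np

def knn_graph(vectors: np.ndarray, degree: int) -> np.ndarray:
    """Exact k-NN graph: row i lists the `degree` nearest other points of point i."""
    sq_norms = (vectors ** 2).sum(axis=1)
    dists = sq_norms[:, None] - 2 * vectors @ vectors.T + sq_norms[None, :]
    np.fill_diagonal(dists, np.inf)  # no self-loops
    return np.argsort(dists, axis=1)[:, :degree]

def graph_search(vectors, neighbors, query, entry=0, top_k=5, ef=16):
    """Best-first search keeping at most ef results (single layer, HNSW-style)."""
    def dist(i):
        return float(((vectors[i] - query) ** 2).sum())

    ef = max(ef, top_k)
    visited = {entry}
    candidates = [(dist(entry), entry)]      # min-heap: closest unexpanded node first
    results = [(-candidates[0][0], entry)]   # max-heap via negation: worst kept node first
    while candidates:
        d, node = heapq.heappop(candidates)
        if d > -results[0][0]:
            break  # every remaining candidate is farther than the worst kept result
        for nb in neighbors[node]:
            nb = int(nb)
            if nb in visited:
                continue
            visited.add(nb)
            d_nb = dist(nb)
            if len(results) < ef or d_nb < -results[0][0]:
                heapq.heappush(candidates, (d_nb, nb))
                heapq.heappush(results, (-d_nb, nb))
                if len(results) > ef:
                    heapq.heappop(results)  # drop the current worst result
    best = sorted((-neg_d, i) for neg_d, i in results)[:top_k]
    return [i for _, i in best], [d for d, _ in best]

# Points 0..9 on a line, each linked to its 2 nearest neighbors
line_points = np.arange(10, dtype=np.float64).reshape(-1, 1)
graph = knn_graph(line_points, degree=2)
print(graph_search(line_points, graph, np.array([7.2]), entry=0, top_k=2, ef=2))
```

Starting far away at node 0, the walk still reaches the neighbors of 7.2 because each step moves to a closer node; a larger `ef` keeps more alternatives alive, which usually improves recall on less regular data at the cost of more distance computations.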

In [None]:
# Drop the current index if it exists
if collection.has_index():
    collection.release()
    collection.drop_index()

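# Create the CAGRA index
index_params = dict(
    index_type="GPU_CAGRA",
    metric_type="L2",
    params={"graph_degree": 64,                # Degree of the final graph, after pruning
            "intermediate_graph_degree": 128,  # Degree of the graph built before pruning
            "build_algo": "NN_DESCENT",        # Algorithm that builds the graph before pruning
            "adapt_for_cpu": True})            # Build on GPU, search on CPU ("ef" required at search time)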
tic = time.perf_counter()
collection.create_index(field_name=EMBEDDING_FIELD, index_params=index_params)
collection.load()
toc = time.perf_counter()
print(f"-  Index creation time: {toc - tic:.4f} seconds. ({index_params})")

In [18]:
def search_cuvs_cagra(query, top_k = 5, itopk = 32):
    # Encode the query using the bi-encoder and find potentially relevant passages
    question_embedding = bi_encoder.encode(query, convert_to_tensor=True)

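    # itopk_size applies to GPU search; with adapt_for_cpu=True the CPU search uses ef (should be >= top_k)
    search_params = {"params": {"itopk_size": itopk, "ef": 35}}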
    tic = time.perf_counter()
    hits = collection.search(
                data=np.array(question_embedding[None].cpu()), anns_field=EMBEDDING_FIELD, param=search_params, limit=top_k
            )
    toc = time.perf_counter()

    # Output of top-k hits
    print("Input question:", query)
    print("Results (after {:.3f} ms):".format((toc - tic)*1000))
    for k in range(top_k):
        print("\t{:.3f}\t{}".format(hits[0][k].distance, passages[hits[0][k].id]))

In [None]:
search_cuvs_cagra(query="Who was Grace Hopper?")

In [None]:
search_cuvs_cagra(query="Who was Alan Turing?")

In [None]:
search_cuvs_cagra(query="What is creating tides?")