# Qdrant Collection Creation

## Prerequisites

In [1]:
import random
from pprint import pprint
import time

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    VectorParams,
    OptimizersConfig,
    ScalarQuantizationConfig,
    ScalarType,
    ScalarQuantization,
    OptimizersConfigDiff,
    HnswConfigDiff,
    PointStruct,
    Filter,
    FieldCondition,
    MatchValue,
    Nested,
    NestedCondition,
    MatchAny
)
from sklearn.datasets import make_blobs
from ray.util.multiprocessing import Pool

## Global Variables

In [2]:
# Connection settings for the Qdrant instance under test.
QDRANT_HOST = 'localhost'
QDRANT_PORT = 6333
QDRANT_GRPC_PORT = 6334

# Shared client used by the collection, index and search helpers.
# gRPC is not enabled here; to try it, additionally pass
# grpc_port=QDRANT_GRPC_PORT and prefer_grpc=True.
CLIENT = QdrantClient(url=QDRANT_HOST, port=QDRANT_PORT)

COLLECTION_NAME = 'test_collection'

# Dimensions of the synthetic data set.
NUMBER_OF_FEATURES = 192
NUMBER_OF_POINTS = 150_000
NUMBER_OF_CENTERS = 25

# Value pools that point payloads are sampled from.
CATEGORIES = [
    'c1', 'c2', 'c3',
    'c4', 'c5', 'c6',
]
ASSORTMENT = [
    ['a1', 'a3'], ['a3'], ['a1', 'a2'], ['a4'],
    ['a2', 'a7'], ['a2'], ['a4', 'a2'],
]

## Functions

In [3]:
def get_collection_info():
    """Print a summary of ``COLLECTION_NAME`` as reported by the shared client.

    The summary lists the collection status, the point / vector / indexed
    vector counts, and finally the full collection-info object.
    """
    details = CLIENT.get_collection(COLLECTION_NAME)
    print(f"""
name: {COLLECTION_NAME} status: {details.status.value}
=======================================================================
points count: {details.points_count}
vector count: {details.vectors_count}
indexed vectors count: {details.indexed_vectors_count}

collection info: {details}
""")

In [4]:
def create_vectors(n: int = NUMBER_OF_POINTS) -> list:
    """Generate ``n`` clustered sample vectors with ``make_blobs``.

    Args:
        n: Number of vectors to generate.

    Returns:
        A list with one entry per sample; each entry is that sample's
        ``tolist()`` conversion (``NUMBER_OF_FEATURES`` values per sample).

    The generator is called with a fixed ``random_state`` of 1.
    """
    blob_settings = dict(
        n_samples=n,
        n_features=NUMBER_OF_FEATURES,
        centers=NUMBER_OF_CENTERS,
        cluster_std=2.0,
        center_box=(-5, 5),
        shuffle=True,
        random_state=1,
    )
    # make_blobs also returns the cluster labels, which are not needed here.
    samples, _labels = make_blobs(**blob_settings)
    return list(map(lambda row: row.tolist(), samples))


def create_points(n: int = NUMBER_OF_POINTS) -> list:
    """Build ``n`` point dictionaries with a vector and a random payload.

    Args:
        n: Number of points to create.

    Returns:
        A list of dicts with the keys ``'id'`` (1-based position),
        ``'vector'`` and ``'payload'``. Each payload holds a random
        ``'category'`` from ``CATEGORIES`` and a ``'stores'`` list of 25
        entries (ids 0..24), each with a random ``'assortment'`` from
        ``ASSORTMENT``.
    """
    def random_payload() -> dict:
        # The category is drawn before the stores, one draw per store.
        category = random.choice(CATEGORIES)
        stores = [
            {'id': store_id, 'assortment': random.choice(ASSORTMENT)}
            for store_id in range(25)
        ]
        return {'category': category, 'stores': stores}

    return [
        {'id': position, 'vector': vector, 'payload': random_payload()}
        for position, vector in enumerate(create_vectors(n), start=1)
    ]

In [5]:
def upsert_points(points: list):
    """Upsert a batch of point dictionaries into ``COLLECTION_NAME``.

    A dedicated client (gRPC preferred) is created for every call instead of
    reusing the notebook-level ``CLIENT``; this function is what
    ``create_collection`` maps over a process pool. The client is closed
    again before returning so batches do not leave connections open.

    Args:
        points: Dicts with the keys ``'id'``, ``'vector'`` and ``'payload'``.
            An empty batch is a no-op.
    """
    if not points:
        # Nothing to send; do not open a connection for an empty batch.
        return

    client = QdrantClient(host=QDRANT_HOST,
                          port=QDRANT_PORT,
                          grpc_port=QDRANT_GRPC_PORT,
                          prefer_grpc=True,
                          timeout=100_000)
    try:
        client.upsert(
            collection_name=COLLECTION_NAME,
            points=[PointStruct(id=p['id'],
                                vector=p['vector'],
                                payload=p['payload']) for p in points]
        )
    finally:
        # Release the connection even when the upsert fails.
        client.close()


def chunks(l: list, n: int):
    """Lazily yield consecutive slices of ``l`` holding at most ``n`` items each.

    The final slice is shorter when ``len(l)`` is not a multiple of ``n``;
    an empty ``l`` yields nothing.
    """
    yield from (l[start:start + n] for start in range(0, len(l), n))

In [6]:
# Vector configuration: NUMBER_OF_FEATURES dimensions, cosine distance.
VECTOR_PARAMS = VectorParams(
    size=NUMBER_OF_FEATURES,
    distance=Distance.COSINE,
)

# Scalar quantization to int8 with always_ram switched on.
QUANTIZATION_CONFIG = ScalarQuantization(
    scalar=ScalarQuantizationConfig(
        type=ScalarType.INT8,
        always_ram=True,
    ),
)

# Optimizer thresholds used when the collection is created.
OPTIMIZERS_CONFIG = OptimizersConfigDiff(
    memmap_threshold=20_000,
    indexing_threshold=1,
)

# HNSW graph parameters.
HNSW_CONFIG = HnswConfigDiff(
    m=16,
    ef_construct=100,
    full_scan_threshold=10_000,
)


def create_index():
    """Create the payload indexes on ``COLLECTION_NAME``.

    Indexes are requested, in this order, for ``category`` (keyword),
    ``stores[].id`` (integer) and ``stores[].assortment`` (keyword).
    The function does not poll for index completion itself.
    """
    indexed_fields = (
        ('category', 'keyword'),
        ('stores[].id', 'integer'),
        ('stores[].assortment', 'keyword'),
    )
    for field_name, schema in indexed_fields:
        CLIENT.create_payload_index(COLLECTION_NAME, field_name, schema)

                             
def create_collection(points: list, chunk_size: int = 2_000):
    """Recreate ``COLLECTION_NAME``, index its payload fields and load ``points``.

    The collection is recreated with the module-level vector, HNSW, optimizer
    and quantization settings. Payload indexes are created before any data
    is inserted. The points are then upserted in batches through a process
    pool.

    Args:
        points: Point dictionaries as produced by ``create_points``.
        chunk_size: Number of points sent per upsert batch.
    """
    CLIENT.recreate_collection(collection_name=COLLECTION_NAME,
                               vectors_config=VECTOR_PARAMS,
                               shard_number=1,
                               on_disk_payload=False,
                               # HNSW_CONFIG was defined but never applied.
                               hnsw_config=HNSW_CONFIG,
                               optimizers_config=OPTIMIZERS_CONFIG,
                               quantization_config=QUANTIZATION_CONFIG)
    create_index()

    pool = Pool()
    try:
        pool.map(upsert_points, chunks(points, chunk_size))
    finally:
        # Always shut the workers down, even when a batch fails.
        pool.close()
        pool.join()

In [7]:
def search_collection(query_vector: list, filter=None, n=10) -> float:
    """Return the mean duration, in seconds, of ``n`` identical searches.

    Each search asks ``COLLECTION_NAME`` for the 10 nearest hits of
    ``query_vector`` without returning payloads or vectors; the hits
    themselves are discarded, only the elapsed time is kept.

    Args:
        query_vector: Vector to search for.
        filter: Optional query filter passed through to the search call.
        n: Number of timed repetitions; must be at least 1.

    Returns:
        The arithmetic mean of the per-search durations.

    Raises:
        ValueError: If ``n`` is smaller than 1 (no measurement to average).
    """
    if n < 1:
        raise ValueError(f"n must be at least 1, got {n}")

    durations = []
    for _ in range(n):
        # perf_counter is monotonic, so clock adjustments cannot skew a sample.
        started = time.perf_counter()
        CLIENT.search(collection_name=COLLECTION_NAME,
                      query_vector=query_vector,
                      query_filter=filter,
                      limit=10,
                      with_payload=False,
                      with_vectors=False)
        durations.append(time.perf_counter() - started)
    return sum(durations) / len(durations)

In [8]:
def get_top_level_filter() -> Filter:
    """Build a filter requiring the top-level ``category`` field to equal ``'c3'``."""
    category_is_c3 = FieldCondition(key='category', match=MatchValue(value='c3'))
    return Filter(must=[category_is_c3])


def get_nested_filter() -> Filter:
    """Build a nested filter on the ``stores`` payload field.

    The inner filter requires ``id`` to equal 4 and ``assortment`` to match
    any of ``['a1']``; both conditions sit inside one ``Nested`` clause
    keyed on ``stores``.
    """
    store_id_is_4 = FieldCondition(key='id', match=MatchValue(value=4))
    assortment_has_a1 = FieldCondition(key='assortment', match=MatchAny(any=['a1']))
    per_store_filter = Filter(must=[store_id_is_4, assortment_has_a1])
    stores_clause = NestedCondition(nested=Nested(key='stores', filter=per_store_filter))
    return Filter(must=[stores_clause])

## Create Collection & Index

In [9]:
# Build the collection (including its payload indexes) from a single generated point.
create_collection(create_points(n=1))

2023-07-17 13:05:31,026	INFO worker.py:1636 -- Started a local Ray instance.


In [10]:
# Poll while the collection reports status 'yellow'; after every
# 10-second wait the current collection info is printed.
while True:
    current_status = CLIENT.get_collection(COLLECTION_NAME).status.value
    if current_status != 'yellow':
        break
    time.sleep(10)
    get_collection_info()

## Search Qdrant

In [11]:
# Generate exactly one vector and unpack it for use as the search query.
[query_vector] = create_vectors(n=1)

### Without Filter

In [12]:
# Mean duration in seconds over 100 searches without a filter.
search_collection(query_vector, n=100)

0.005563859939575195

### Top Level Filter

In [13]:
# Mean duration in seconds over 100 searches filtered on the top-level `category` field.
search_collection(query_vector, filter=get_top_level_filter(), n=100)

0.0025892186164855955

### Nested Filter

In [14]:
# Mean duration in seconds over 100 searches filtered on the nested `stores` field.
search_collection(query_vector, filter=get_nested_filter(), n=100)

0.0035550570487976074

# Big Experiment

In [None]:
results = []

# The stop value is NUMBER_OF_POINTS + 1 so that the largest collection size,
# NUMBER_OF_POINTS itself, is measured too; range() excludes its stop value.
for step in range(0, NUMBER_OF_POINTS + 1, 10_000):
    # The first step would be an empty collection; use a single point instead.
    number_of_points = max(step, 1)

    print(f"Creating collection with {number_of_points} points.")

    points = create_points(number_of_points)
    create_collection(points)

    print("Waiting for collection status to be green.")
    while CLIENT.get_collection(COLLECTION_NAME).status.value == 'yellow':
        time.sleep(30)
        get_collection_info()

    # Time the same query without a filter, with the top-level filter and
    # with the nested filter.
    [query_vector] = create_vectors(n=1)
    duration_wo_filter = search_collection(query_vector, n=100)
    duration_top_level_filter = search_collection(query_vector, filter=get_top_level_filter(), n=100)
    duration_nested_filter = search_collection(query_vector, filter=get_nested_filter(), n=100)

    results.append({
        'number_of_points': number_of_points,
        'duration_wo_filter': duration_wo_filter,
        'duration_top_level_filter': duration_top_level_filter,
        'duration_nested_filter': duration_nested_filter,
    })

Creating collection with 1 points.
Waiting for collection status to be green.
Creating collection with 10000 points.
Waiting for collection status to be green.
Creating collection with 20000 points.
Waiting for collection status to be green.
Creating collection with 30000 points.
Waiting for collection status to be green.
Creating collection with 40000 points.
Waiting for collection status to be green.

name: test_collection status: green
points count: 40000
vector count: 40000
indexed vectors count: 40000

collection info: status=<CollectionStatus.GREEN: 'green'> optimizer_status=<OptimizersStatusOneOf.OK: 'ok'> vectors_count=40000 indexed_vectors_count=40000 points_count=40000 segments_count=4 config=CollectionConfig(params=CollectionParams(vectors=VectorParams(size=192, distance=<Distance.COSINE: 'Cosine'>, hnsw_config=None, quantization_config=None, on_disk=None), shard_number=1, replication_factor=1, write_consistency_factor=1, on_disk_payload=False), hnsw_config=HnswConfig(m=16, 

## Result

In [None]:
import matplotlib.pyplot as plt

# One series per search variant, all plotted against the collection size.
x = [r['number_of_points'] for r in results]
y_wo_filter = [r['duration_wo_filter'] for r in results]
y_top_level_filter = [r['duration_top_level_filter'] for r in results]
y_nested_filter = [r['duration_nested_filter'] for r in results]

plt.plot(x, y_wo_filter, label='w/o filter')
plt.plot(x, y_top_level_filter, label='top level filter')
plt.plot(x, y_nested_filter, label='nested filter')

# Without a legend the labels above are never shown on the figure.
plt.legend()
plt.xlabel("Number of Points")
plt.ylabel("Duration in Seconds")
plt.title('Duration Search')
plt.show()