WIP: hybrid search with fastembed #553

Merged · 9 commits · Mar 27, 2024
Changes from 5 commits
198 changes: 102 additions & 96 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -27,7 +27,7 @@ grpcio-tools = ">=1.41.0"
urllib3 = ">=1.26.14,<3"
portalocker = "^2.7.0"
fastembed = [
{ version = "0.2.2", optional = true, python = "<3.13" }
{ version = "0.2.5", optional = true, python = "<3.13" }
]

[tool.poetry.group.dev.dependencies]
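For context (not part of this diff, and assuming the optional extra keeps its current name), the bumped pin applies when the client is installed with the fastembed extra, e.g. pip install "qdrant-client[fastembed]".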
285 changes: 245 additions & 40 deletions qdrant_client/async_qdrant_fastembed.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions qdrant_client/conversions/common_types.py
@@ -85,6 +85,7 @@ def get_args_subscribed(tp: type): # type: ignore
SnapshotDescription: TypeAlias = rest.SnapshotDescription
NamedVector: TypeAlias = rest.NamedVector
NamedSparseVector: TypeAlias = rest.NamedSparseVector
+SparseVector: TypeAlias = rest.SparseVector
PointVectors: TypeAlias = rest.PointVectors
Vector: TypeAlias = rest.Vector
VectorStruct: TypeAlias = rest.VectorStruct
5 changes: 4 additions & 1 deletion qdrant_client/fastembed_common.py
@@ -1,11 +1,14 @@
from typing import Any, Dict, List, Optional, Union

-from pydantic import BaseModel
+from pydantic import BaseModel, Field

+from qdrant_client.conversions.common_types import SparseVector


class QueryResponse(BaseModel, extra="forbid"):  # type: ignore
    id: Union[str, int]
    embedding: Optional[List[float]]
+    sparse_embedding: Optional[SparseVector] = Field(default=None)
    metadata: Dict[str, Any]
    document: str
    score: float
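A minimal usage sketch (not part of the diff; the values are made up for illustration) of how the extended QueryResponse can carry a sparse embedding alongside the dense one:

from qdrant_client.conversions.common_types import SparseVector
from qdrant_client.fastembed_common import QueryResponse

# Illustrative values only; sparse_embedding defaults to None when absent.
hit = QueryResponse(
    id=1,
    embedding=[0.12, 0.80, 0.05],
    sparse_embedding=SparseVector(indices=[3, 17], values=[0.9, 0.4]),
    metadata={"source": "docs"},
    document="hybrid search with fastembed",
    score=0.87,
)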
Empty file.
31 changes: 31 additions & 0 deletions qdrant_client/hybrid/fusion.py
@@ -0,0 +1,31 @@
from typing import Dict, List

from qdrant_client.http import models


def reciprocal_rank_fusion(
    responses: List[List[models.ScoredPoint]], limit: int = 10
) -> List[models.ScoredPoint]:
    def compute_score(pos: int) -> float:
        ranking_constant = (
            2  # the constant mitigates the impact of high rankings by outlier systems
        )
        return 1 / (ranking_constant + pos)

    scores: Dict[models.ExtendedPointId, float] = {}
    point_pile = {}
    for response in responses:
        for i, scored_point in enumerate(response):
            if scored_point.id in scores:
                scores[scored_point.id] += compute_score(i)
            else:
                point_pile[scored_point.id] = scored_point
                scores[scored_point.id] = compute_score(i)
Comment on lines +17 to +23

@NirantK (Contributor), Mar 28, 2024:

Can we limit the responses that get processed inside the loop?

Since we know that we've only returned 10, perhaps responses can be some multiple of that?

Here is the scenario I'm trying to avoid: the responses are quite large, say a thousand, and we end up running the loop over all of them. Alternatively, we could implement this as a NumPy matrix operation.

Member:

Could you elaborate on limiting the responses?

Member:

We might try rewriting it with numpy, but I am not sure whether it is actually worth it; we would still need to iterate over all the responses to map the ids to the scores, and then we would need to sum up the scores, etc.

@NirantK (Contributor), Mar 28, 2024:

Can we limit the number of responses we evaluate? For loops can be large and slow when someone passes too large a list.

A NumPy implementation saves some of this compute, fwiw: qdrant/fastembed#165

Member:

The implementation in qdrant/fastembed#165 involves three times more loops:

1. all_items = set(item for rank_list in rank_lists for item, _ in rank_list)
2.  item_to_index = {item: idx for idx, item in enumerate(all_items)}
3. for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank

So the question is whether it is actually more efficient. (I don't argue that it might be more efficient due to the arithmetic operations; I am just a little bit doubtful.)

> Can we limit the number of responses we evaluate? For loops can be large and slow when someone passes too large a list.

I don't really see how to do it at the moment; we can't sort the responses in less than O(n).

Member:

The numpy version is actually slower:

from typing import List, Dict
import time

import numpy as np

from qdrant_client import models


def rrf(rank_lists, alpha=2, default_rank=1000):
    """
    Optimized Reciprocal Rank Fusion (RRF) using NumPy for large rank lists.

    :param rank_lists: A list of rank lists. Each rank list should be a list of (item, rank) tuples.
    :param alpha: The parameter alpha used in the RRF formula. Default is 2.
    :param default_rank: The default rank assigned to items not present in a rank list. Default is 1000.
    :return: Sorted list of items based on their RRF scores.
    """
    # Consolidate all unique items from all rank lists
    all_items = set(item for rank_list in rank_lists for item, _ in rank_list)

    # Create a mapping of items to indices
    item_to_index = {item: idx for idx, item in enumerate(all_items)}

    # Initialize a matrix to hold the ranks, filled with the default rank
    rank_matrix = np.full((len(all_items), len(rank_lists)), default_rank)

    # Fill in the actual ranks from the rank lists
    for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank

    # Calculate RRF scores using NumPy operations
    rrf_scores = np.sum(1.0 / (alpha + rank_matrix), axis=1)

    # Sort items based on RRF scores
    sorted_indices = np.argsort(-rrf_scores)  # Negative for descending order

    # Retrieve sorted items
    sorted_items = [(list(item_to_index.keys())[idx], rrf_scores[idx]) for idx in sorted_indices]

    return sorted_items


def rank_list(search_result: List[models.ScoredPoint]):
    return [(point.id, rank + 1) for rank, point in enumerate(search_result)]


def reciprocal_rank_fusion(
    responses: List[List[models.ScoredPoint]], limit: int = 1000
) -> List[models.ScoredPoint]:
    def compute_score(pos: int) -> float:
        ranking_constant = (
            2  # the constant mitigates the impact of high rankings by outlier systems
        )
        return 1 / (ranking_constant + pos)

    scores: Dict[models.ExtendedPointId, float] = {}
    point_pile = {}
    for response in responses:
        for i, scored_point in enumerate(response):
            if scored_point.id in scores:
                scores[scored_point.id] += compute_score(i)
            else:
                point_pile[scored_point.id] = scored_point
                scores[scored_point.id] = compute_score(i)

    sorted_scores = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return sorted_scores[:limit]
    # sorted_points = []  # commented out to make the output the same as the numpy version
    # for point_id, score in sorted_scores[:limit]:
    #     point = point_pile[point_id]
    #     point.score = score
    #     sorted_points.append(point)
    # return sorted_points


if __name__ == '__main__':
    import random

    num_points = 1000
    ids = list(range(num_points))
    a = [
        models.ScoredPoint(
            id=ids[i],
            score=random.random(),
            version=1,
        ) for i in range(num_points)
    ]
    random.shuffle(ids)
    b = [
        models.ScoredPoint(
            id=ids[i],
            score=random.random(),
            version=1,
        ) for i in range(num_points)
    ]

    random.shuffle(ids)
    c = [
        models.ScoredPoint(
            id=ids[i],
            score=random.random(),
            version=1,
        ) for i in range(num_points)
    ]

    start = time.perf_counter()
    rrf([rank_list(a), rank_list(b), rank_list(c)])
    print('numpy, scored point conversion included', time.perf_counter() - start)

    l_a = rank_list(a)
    l_b = rank_list(b)
    l_c = rank_list(c)

    start = time.perf_counter()
    rrf([l_a, l_b, l_c])
    print('numpy, scored point conversion excluded', time.perf_counter() - start)

    start = time.perf_counter()
    reciprocal_rank_fusion([a, b, c], limit=len(a))
    print('lists', time.perf_counter() - start)
Output:

numpy, scored point conversion included 0.004260916990460828
numpy, scored point conversion excluded 0.0036911249917466193
lists 0.0007186669972725213


    sorted_scores = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    sorted_points = []
    for point_id, score in sorted_scores[:limit]:
        point = point_pile[point_id]
        point.score = score
        sorted_points.append(point)
    return sorted_points
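For intuition (illustrative only, not part of the PR): with ranking_constant = 2, each appearance of a point contributes 1 / (2 + position), so agreement across responses outweighs a single high rank.

# Illustrative arithmetic only, using the same formula as compute_score above.
score_in_two_lists = 1 / (2 + 0) + 1 / (2 + 3)  # ranked 1st and 4th -> 0.5 + 0.2 = 0.7
score_in_one_list = 1 / (2 + 0)                 # ranked 1st only    -> 0.5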
24 changes: 24 additions & 0 deletions qdrant_client/hybrid/test_reranking.py
@@ -0,0 +1,24 @@
from qdrant_client.http import models
from qdrant_client.hybrid.fusion import reciprocal_rank_fusion


def test_reciprocal_rank_fusion() -> None:
    responses = [
        [
            models.ScoredPoint(id="1", score=0.1, version=1),
            models.ScoredPoint(id="2", score=0.2, version=1),
            models.ScoredPoint(id="3", score=0.3, version=1),
        ],
        [
            models.ScoredPoint(id="5", score=12.0, version=1),
            models.ScoredPoint(id="6", score=8.0, version=1),
            models.ScoredPoint(id="7", score=5.0, version=1),
            models.ScoredPoint(id="2", score=3.0, version=1),
        ],
    ]

    fused = reciprocal_rank_fusion(responses)

    assert fused[0].id == "2"
    assert fused[1].id in ["1", "5"]
    assert fused[2].id in ["1", "5"]