WIP: hybrid search with fastembed #553
Conversation
qdrant_client/qdrant_fastembed.py
(Outdated)

```diff
@@ -89,6 +105,36 @@ def set_model(
         )
         self._embedding_model_name = embedding_model_name

+    def set_sparse_model(
+        self,
+        model_name: Optional[str],
```
`set_model` has `embedding_model_name`, however `set_sparse_model` has `model_name`.

We have no tests for `query_batch`, I'll add them.
* WIP: hybrid search with fastembed
* hybrid queries with fastembed
* test for hybrid
* fix typo
* new: extend hybrid search tests, fix mypy, small refactoring (#554)
* refactor: align model name parameters in setters, update tests
* fix: fix async
* fix: add a good test, fix sparse vectors in query batch
* refactoring: reduce branching, refactor fastembed tests

Co-authored-by: George <george.panchuk@qdrant.tech>
```python
for response in responses:
    for i, scored_point in enumerate(response):
        if scored_point.id in scores:
            scores[scored_point.id] += compute_score(i)
        else:
            point_pile[scored_point.id] = scored_point
            scores[scored_point.id] = compute_score(i)
```
Can we limit the responses that get processed inside the loop? Since we know that we've only returned 10, perhaps we could process some multiple of that?

Here is the scenario I'm trying to avoid: the responses are quite large, say a thousand points each, and we end up running the loop over all of them. Alternatively, we could implement this as a NumPy matrix operation.
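One way to bound the work in the loop, as the comment suggests, is to truncate each response before fusing. This is a hypothetical sketch, not part of the PR: `fuse_truncated`, `max_per_response`, and the `Point` stand-in are all invented names; the scoring matches the `compute_score` formula with `ranking_constant = 2`.

```python
from collections import namedtuple
from typing import Dict, List

Point = namedtuple("Point", ["id"])  # minimal stand-in for models.ScoredPoint


def fuse_truncated(
    responses: List[List[Point]], max_per_response: int = 100
) -> Dict[int, float]:
    """RRF scores over only the top `max_per_response` hits of each response."""
    scores: Dict[int, float] = {}
    for response in responses:
        # slicing bounds the inner loop regardless of how large a response is
        for i, point in enumerate(response[:max_per_response]):
            scores[point.id] = scores.get(point.id, 0.0) + 1.0 / (2 + i)
    return scores
```

Points beyond the cap simply never enter the score table, so the loop cost is `O(len(responses) * max_per_response)` rather than proportional to the full response sizes.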
Could you elaborate on limiting the responses?
We might try rewriting it with NumPy, but I am not sure whether it is actually worth it: we would still need to iterate over all the responses to map the ids to the scores, and then we would need to sum up the scores, etc.
Can we limit the number of responses we evaluate? For loops can be slow when someone passes too large a list.

A NumPy implementation saves some of this compute, fwiw: qdrant/fastembed#165
The implementation in qdrant/fastembed#165 involves three more loops:

```python
# 1
all_items = set(item for rank_list in rank_lists for item, _ in rank_list)
# 2
item_to_index = {item: idx for idx, item in enumerate(all_items)}
# 3
for list_idx, rank_list in enumerate(rank_lists):
    for item, rank in rank_list:
        rank_matrix[item_to_index[item], list_idx] = rank
```
So the question is whether it is actually more efficient. (I don't dispute that it might be faster thanks to the vectorized arithmetic, I am just a little doubtful.)

> Can we limit the number of responses we evaluate? For loops can be slow when someone passes too large a list.

I don't really see how to do it at the moment; we can't sort the responses in less than O(n).
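One place where the full sort can be avoided, when `limit` is much smaller than the number of fused points, is the final ranking step: `heapq.nlargest` selects the top k in O(n log k) instead of O(n log n). A sketch under that assumption (`top_k` is an invented helper, not what the PR implements):

```python
import heapq
from typing import Dict, List, Tuple


def top_k(scores: Dict[str, float], limit: int) -> List[Tuple[str, float]]:
    """Return the `limit` highest-scoring (id, score) pairs without a full sort."""
    # heapq.nlargest keeps a heap of size `limit`: O(n log limit) overall
    return heapq.nlargest(limit, scores.items(), key=lambda item: item[1])
```

This only helps the sort at the end; the score-accumulation loop still has to visit every returned point once.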
The NumPy version is actually slower:
```python
from typing import Dict, List
import random
import time

import numpy as np

from qdrant_client import models


def rrf(rank_lists, alpha=2, default_rank=1000):
    """
    Reciprocal Rank Fusion (RRF) using NumPy for large rank lists.

    :param rank_lists: A list of rank lists. Each rank list should be a list of (item, rank) tuples.
    :param alpha: The parameter alpha used in the RRF formula. Default is 2.
    :param default_rank: The default rank assigned to items not present in a rank list. Default is 1000.
    :return: Sorted list of items based on their RRF scores.
    """
    # Consolidate all unique items from all rank lists
    all_items = set(item for rank_list in rank_lists for item, _ in rank_list)

    # Create a mapping of items to indices
    item_to_index = {item: idx for idx, item in enumerate(all_items)}

    # Initialize a matrix to hold the ranks, filled with the default rank
    rank_matrix = np.full((len(all_items), len(rank_lists)), default_rank)

    # Fill in the actual ranks from the rank lists
    for list_idx, rank_list in enumerate(rank_lists):
        for item, rank in rank_list:
            rank_matrix[item_to_index[item], list_idx] = rank

    # Calculate RRF scores using NumPy operations
    rrf_scores = np.sum(1.0 / (alpha + rank_matrix), axis=1)

    # Sort items based on RRF scores
    sorted_indices = np.argsort(-rrf_scores)  # Negative for descending order

    # Retrieve sorted items
    sorted_items = [
        (list(item_to_index.keys())[idx], rrf_scores[idx]) for idx in sorted_indices
    ]
    return sorted_items


def rank_list(search_result: List[models.ScoredPoint]):
    return [(point.id, rank + 1) for rank, point in enumerate(search_result)]


def reciprocal_rank_fusion(
    responses: List[List[models.ScoredPoint]], limit: int = 1000
) -> List[models.ScoredPoint]:
    def compute_score(pos: int) -> float:
        ranking_constant = (
            2  # the constant mitigates the impact of high rankings by outlier systems
        )
        return 1 / (ranking_constant + pos)

    scores: Dict[models.ExtendedPointId, float] = {}
    point_pile = {}
    for response in responses:
        for i, scored_point in enumerate(response):
            if scored_point.id in scores:
                scores[scored_point.id] += compute_score(i)
            else:
                point_pile[scored_point.id] = scored_point
                scores[scored_point.id] = compute_score(i)

    sorted_scores = sorted(scores.items(), key=lambda item: item[1], reverse=True)
    return sorted_scores[:limit]
    # sorted_points = []  # commented out to make the output the same as the numpy version
    # for point_id, score in sorted_scores[:limit]:
    #     point = point_pile[point_id]
    #     point.score = score
    #     sorted_points.append(point)
    # return sorted_points


if __name__ == "__main__":
    num_points = 1000
    ids = list(range(num_points))
    a = [
        models.ScoredPoint(id=ids[i], score=random.random(), version=1)
        for i in range(num_points)
    ]
    random.shuffle(ids)
    b = [
        models.ScoredPoint(id=ids[i], score=random.random(), version=1)
        for i in range(num_points)
    ]
    random.shuffle(ids)
    c = [
        models.ScoredPoint(id=ids[i], score=random.random(), version=1)
        for i in range(num_points)
    ]

    start = time.perf_counter()
    rrf([rank_list(a), rank_list(b), rank_list(c)])
    print('numpy, scored point conversion included', time.perf_counter() - start)

    l_a = rank_list(a)
    l_b = rank_list(b)
    l_c = rank_list(c)
    start = time.perf_counter()
    rrf([l_a, l_b, l_c])
    print('numpy, scored point conversion excluded', time.perf_counter() - start)

    start = time.perf_counter()
    reciprocal_rank_fusion([a, b, c], limit=len(a))
    print('lists', time.perf_counter() - start)
```

```
numpy, scored point conversion included 0.004260916990460828
numpy, scored point conversion excluded 0.0036911249917466193
lists 0.0007186669972725213
```
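As a worked check of the score formula used by `reciprocal_rank_fusion` above (ranking constant 2, 0-based positions), here is the fused score of a point ranked first in two response lists and third in a third list:

```python
def compute_score(pos: int, ranking_constant: int = 2) -> float:
    # reciprocal rank, damped by a small constant to soften outlier rankings
    return 1 / (ranking_constant + pos)


# first (position 0) in two lists, third (position 2) in one:
fused = compute_score(0) + compute_score(0) + compute_score(2)  # 0.5 + 0.5 + 0.25 = 1.25
```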
All Submissions:

* Did you create your branch from `dev`?

New Feature Submissions:

* Did you install `pre-commit` with `pip3 install pre-commit` and set up hooks with `pre-commit install`?

Changes to Core Features: