diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py index ddb0e94..8382b3e 100644 --- a/langchain_postgres/v2/async_vectorstore.py +++ b/langchain_postgres/v2/async_vectorstore.py @@ -670,6 +670,7 @@ async def __query_collection( dense_results, sparse_results, **hybrid_search_config.fusion_function_parameters, + distance_strategy=self.distance_strategy, ) return combined_results return dense_results diff --git a/langchain_postgres/v2/hybrid_search_config.py b/langchain_postgres/v2/hybrid_search_config.py index 7f6c277..7946549 100644 --- a/langchain_postgres/v2/hybrid_search_config.py +++ b/langchain_postgres/v2/hybrid_search_config.py @@ -4,6 +4,8 @@ from sqlalchemy import RowMapping +from .indexes import DistanceStrategy + def weighted_sum_ranking( primary_search_results: Sequence[RowMapping], @@ -11,6 +13,7 @@ def weighted_sum_ranking( primary_results_weight: float = 0.5, secondary_results_weight: float = 0.5, fetch_top_k: int = 4, + **kwargs: Any, ) -> Sequence[dict[str, Any]]: """ Ranks documents using a weighted sum of scores from two sources. @@ -69,6 +72,7 @@ def reciprocal_rank_fusion( secondary_search_results: Sequence[RowMapping], rrf_k: float = 60, fetch_top_k: int = 4, + **kwargs: Any, ) -> Sequence[dict[str, Any]]: """ Ranks documents using Reciprocal Rank Fusion (RRF) of scores from two sources. @@ -87,35 +91,45 @@ def reciprocal_rank_fusion( A list of (document_id, rrf_score) tuples, sorted by rrf_score in descending order. """ + distance_strategy = kwargs.get( + "distance_strategy", DistanceStrategy.COSINE_DISTANCE + ) rrf_scores: dict[str, dict[str, Any]] = {} # Process results from primary source - for rank, row in enumerate( - sorted(primary_search_results, key=lambda item: item["distance"], reverse=True) - ): - values = list(row.values()) - doc_id = str(values[0]) - row_values = dict(row) - primary_score = rrf_scores[doc_id]["distance"] if doc_id in rrf_scores else 0.0 - primary_score += 1.0 / (rank + rrf_k) - row_values["distance"] = primary_score - rrf_scores[doc_id] = row_values + # Determine sorting order based on the vector distance strategy. + # For COSINE & EUCLIDEAN(distance), we sort ascending (reverse=False). + # For INNER_PRODUCT (similarity), we sort descending (reverse=True). + is_similarity_metric = distance_strategy == DistanceStrategy.INNER_PRODUCT + sorted_primary = sorted( + primary_search_results, + key=lambda item: item["distance"], + reverse=is_similarity_metric, + ) + + for rank, row in enumerate(sorted_primary): + doc_id = str(list(row.values())[0]) + if doc_id not in rrf_scores: + rrf_scores[doc_id] = dict(row) + rrf_scores[doc_id]["distance"] = 0.0 + # Add the "normalized" rank score + rrf_scores[doc_id]["distance"] += 1.0 / (rank + rrf_k) # Process results from secondary source - for rank, row in enumerate( - sorted( - secondary_search_results, key=lambda item: item["distance"], reverse=True - ) - ): - values = list(row.values()) - doc_id = str(values[0]) - row_values = dict(row) - secondary_score = ( - rrf_scores[doc_id]["distance"] if doc_id in rrf_scores else 0.0 - ) - secondary_score += 1.0 / (rank + rrf_k) - row_values["distance"] = secondary_score - rrf_scores[doc_id] = row_values + # Keyword search relevance is always "higher is better" -> sort descending + sorted_secondary = sorted( + secondary_search_results, + key=lambda item: item["distance"], + reverse=True, + ) + + for rank, row in enumerate(sorted_secondary): + doc_id = str(list(row.values())[0]) + if doc_id not in rrf_scores: + rrf_scores[doc_id] = dict(row) + rrf_scores[doc_id]["distance"] = 0.0 + # Add the rank score from this list to the existing score + rrf_scores[doc_id]["distance"] += 1.0 / (rank + rrf_k) # Sort the results by rrf score in descending order # Sort the results by weighted score in descending order diff --git a/tests/unit_tests/v2/test_hybrid_search_config.py b/tests/unit_tests/v2/test_hybrid_search_config.py index 7ea000e..70a48d0 100644 --- a/tests/unit_tests/v2/test_hybrid_search_config.py +++ b/tests/unit_tests/v2/test_hybrid_search_config.py @@ -1,13 +1,17 @@ +from typing import cast + import pytest +from sqlalchemy import RowMapping from langchain_postgres.v2.hybrid_search_config import ( reciprocal_rank_fusion, weighted_sum_ranking, ) +from langchain_postgres.v2.indexes import DistanceStrategy # Helper to create mock input items that mimic RowMapping for the fusion functions -def get_row(doc_id: str, score: float, content: str = "content") -> dict: +def get_row(doc_id: str, score: float, content: str = "content") -> RowMapping: """ Simulates a RowMapping-like dictionary. The fusion functions expect to extract doc_id as the first value and @@ -17,7 +21,8 @@ def get_row(doc_id: str, score: float, content: str = "content") -> dict: # Python dicts maintain insertion order (Python 3.7+). # This structure ensures list(row.values())[0] is doc_id and # list(row.values())[-1] is score. - return {"id_val": doc_id, "content_field": content, "distance": score} + row_dict = {"id_val": doc_id, "content_field": content, "distance": score} + return cast(RowMapping, row_dict) class TestWeightedSumRanking: @@ -102,30 +107,31 @@ def test_fetch_top_k(self) -> None: class TestReciprocalRankFusion: def test_empty_inputs(self) -> None: + """Tests that the function handles empty inputs gracefully.""" results = reciprocal_rank_fusion([], []) assert results == [] def test_primary_only(self) -> None: - primary = [ - get_row("p1", 0.8), - get_row("p2", 0.6), - ] # p1 rank 0, p2 rank 1 + """Tests RRF with only primary results using default cosine (lower is better).""" + primary = [get_row("p1", 0.8), get_row("p2", 0.6)] rrf_k = 60 - # p1_score = 1 / (0 + 60) - # p2_score = 1 / (1 + 60) + # --- Calculation (Cosine: lower is better) --- + # Sorted order: p2 (0.6) -> rank 0; p1 (0.8) -> rank 1 + # p2_score = 1 / (0 + 60) + # p1_score = 1 / (1 + 60) results = reciprocal_rank_fusion(primary, [], rrf_k=rrf_k) # type: ignore assert len(results) == 2 - assert results[0]["id_val"] == "p1" + assert results[0]["id_val"] == "p2" assert results[0]["distance"] == pytest.approx(1.0 / (0 + rrf_k)) - assert results[1]["id_val"] == "p2" + assert results[1]["id_val"] == "p1" assert results[1]["distance"] == pytest.approx(1.0 / (1 + rrf_k)) def test_secondary_only(self) -> None: - secondary = [ - get_row("s1", 0.9), - get_row("s2", 0.7), - ] # s1 rank 0, s2 rank 1 + """Tests RRF with only secondary results (higher is better).""" + secondary = [get_row("s1", 0.9), get_row("s2", 0.7)] rrf_k = 60 + # --- Calculation (Keyword: higher is better) --- + # Sorted order: s1 (0.9) -> rank 0; s2 (0.7) -> rank 1 results = reciprocal_rank_fusion([], secondary, rrf_k=rrf_k) # type: ignore assert len(results) == 2 assert results[0]["id_val"] == "s1" @@ -134,96 +140,130 @@ def test_secondary_only(self) -> None: assert results[1]["distance"] == pytest.approx(1.0 / (1 + rrf_k)) def test_mixed_results_default_k(self) -> None: - primary = [get_row("common", 0.8), get_row("p_only", 0.7)] - secondary = [get_row("common", 0.9), get_row("s_only", 0.6)] + """Tests fusion with default cosine (lower better) and keyword (higher better).""" + primary = [ + get_row("common", 0.8), + get_row("p_only", 0.7), + ] # Order: p_only, common + secondary = [ + get_row("common", 0.9), + get_row("s_only", 0.6), + ] # Order: common, s_only rrf_k = 60 - # common_score = (1/(0+k))_prim + (1/(0+k))_sec = 2/k - # p_only_score = (1/(1+k))_prim = 1/(k+1) - # s_only_score = (1/(1+k))_sec = 1/(k+1) + # --- Calculation --- + # common: rank 1 in P (1/61) + rank 0 in S (1/60) -> highest score + # p_only: rank 0 in P (1/60) + # s_only: rank 1 in S (1/61) results = reciprocal_rank_fusion(primary, secondary, rrf_k=rrf_k) # type: ignore assert len(results) == 3 assert results[0]["id_val"] == "common" - assert results[0]["distance"] == pytest.approx(2.0 / rrf_k) - # Check the next two elements, their order might vary due to tie in score - next_ids = {results[1]["id_val"], results[2]["id_val"]} - next_scores = {results[1]["distance"], results[2]["distance"]} - assert next_ids == {"p_only", "s_only"} - for score in next_scores: - assert score == pytest.approx(1.0 / (1 + rrf_k)) + assert results[0]["distance"] == pytest.approx(1 / 61 + 1 / 60) + assert results[1]["id_val"] == "p_only" + assert results[1]["distance"] == pytest.approx(1 / 60) + assert results[2]["id_val"] == "s_only" + assert results[2]["distance"] == pytest.approx(1 / 61) def test_fetch_top_k_rrf(self) -> None: + """Tests that fetch_top_k limits results correctly after fusion.""" + # Using cosine distance (lower is better) primary = [get_row(f"p{i}", (10 - i) / 10.0) for i in range(5)] - rrf_k = 1 - results = reciprocal_rank_fusion(primary, [], rrf_k=rrf_k, fetch_top_k=2) # type: ignore + # Scores: [1.0, 0.9, 0.8, 0.7, 0.6] + # Sorted order: p4 (0.6), p3 (0.7), p2 (0.8), ... + results = reciprocal_rank_fusion(primary, [], fetch_top_k=2) # type: ignore assert len(results) == 2 - assert results[0]["id_val"] == "p0" - assert results[0]["distance"] == pytest.approx(1.0 / (0 + rrf_k)) - assert results[1]["id_val"] == "p1" - assert results[1]["distance"] == pytest.approx(1.0 / (1 + rrf_k)) + assert results[0]["id_val"] == "p4" + assert results[1]["id_val"] == "p3" def test_rrf_content_preservation(self) -> None: + """Tests that the data from the first time a document is seen is kept.""" primary = [get_row("doc1", 0.9, content="Primary Content")] secondary = [get_row("doc1", 0.8, content="Secondary Content")] - # RRF processes primary then secondary. If a doc is in both, - # the content from the secondary list will overwrite primary's. - results = reciprocal_rank_fusion(primary, secondary, rrf_k=60) # type: ignore + # RRF processes primary first. When "doc1" is seen, its data is stored. + # It will not be overwritten by the "doc1" from the secondary list. + results = reciprocal_rank_fusion(primary, secondary) # type: ignore assert len(results) == 1 assert results[0]["id_val"] == "doc1" - assert results[0]["content_field"] == "Secondary Content" + assert results[0]["content_field"] == "Primary Content" - # If only in primary - results_prim_only = reciprocal_rank_fusion(primary, [], rrf_k=60) # type: ignore - assert results_prim_only[0]["content_field"] == "Primary Content" + # If only in secondary + results_prim_only = reciprocal_rank_fusion([], secondary, rrf_k=60) # type: ignore + assert results_prim_only[0]["content_field"] == "Secondary Content" def test_reordering_from_inputs_rrf(self) -> None: - """ - Tests that RRF fused ranking can be different from both primary and secondary - input rankings. - Primary Order: A, B, C - Secondary Order: C, B, A - Fused Order: (A, C) tied, then B - """ - primary = [ - get_row("docA", 0.9), - get_row("docB", 0.8), - get_row("docC", 0.1), - ] - secondary = [ - get_row("docC", 0.9), - get_row("docB", 0.5), - get_row("docA", 0.2), - ] - rrf_k = 1.0 # Using 1.0 for k to simplify rank score calculation - # docA_score = 1/(0+1) [P] + 1/(2+1) [S] = 1 + 1/3 = 4/3 - # docB_score = 1/(1+1) [P] + 1/(1+1) [S] = 1/2 + 1/2 = 1 - # docC_score = 1/(2+1) [P] + 1/(0+1) [S] = 1/3 + 1 = 4/3 + """Tests that RRF can produce a ranking different from the inputs.""" + primary = [get_row("docA", 0.9), get_row("docB", 0.8), get_row("docC", 0.1)] + secondary = [get_row("docC", 0.9), get_row("docB", 0.5), get_row("docA", 0.2)] + rrf_k = 1.0 + # --- Calculation (Primary sorted ascending, Secondary descending) --- + # Primary ranks: docC (0), docB (1), docA (2) + # Secondary ranks: docC (0), docB (1), docA (2) + # docC_score = 1/(0+1) [P] + 1/(0+1) [S] = 2.0 + # docB_score = 1/(1+1) [P] + 1/(1+1) [S] = 1.0 + # docA_score = 1/(2+1) [P] + 1/(2+1) [S] = 2/3 results = reciprocal_rank_fusion(primary, secondary, rrf_k=rrf_k) # type: ignore assert len(results) == 3 - assert {results[0]["id_val"], results[1]["id_val"]} == {"docA", "docC"} - assert results[0]["distance"] == pytest.approx(4.0 / 3.0) - assert results[1]["distance"] == pytest.approx(4.0 / 3.0) - assert results[2]["id_val"] == "docB" - assert results[2]["distance"] == pytest.approx(1.0) - - def test_reordering_from_inputs_weighted_sum(self) -> None: - """ - Tests that the fused ranking can be different from both primary and secondary - input rankings. - Primary Order: A (0.9), B (0.7) - Secondary Order: B (0.8), A (0.2) - Fusion (0.5/0.5 weights): - docA_score = (0.9 * 0.5) + (0.2 * 0.5) = 0.45 + 0.10 = 0.55 - docB_score = (0.7 * 0.5) + (0.8 * 0.5) = 0.35 + 0.40 = 0.75 - Expected Fused Order: docB (0.75), docA (0.55) - This is different from Primary (A,B) and Secondary (B,A) in terms of - original score, but the fusion logic changes the effective contribution). - """ - primary = [get_row("docA", 0.9), get_row("docB", 0.7)] - secondary = [get_row("docB", 0.8), get_row("docA", 0.2)] + assert results[0]["id_val"] == "docC" + assert results[0]["distance"] == pytest.approx(2.0) + assert results[1]["id_val"] == "docB" + assert results[1]["distance"] == pytest.approx(1.0) + assert results[2]["id_val"] == "docA" + assert results[2]["distance"] == pytest.approx(2.0 / 3.0) - results = weighted_sum_ranking(primary, secondary) # type: ignore + # -------------------------------------------------------------------------- + ## New Tests for Other Strategies and Edge Cases + + def test_mixed_results_max_inner_product(self) -> None: + """Tests fusion with MAX_INNER_PRODUCT (higher is better) for primary.""" + primary = [get_row("best", 0.9), get_row("worst", 0.1)] # Order: best, worst + secondary = [get_row("best", 20.0), get_row("worst", 5.0)] # Order: best, worst + rrf_k = 10 + # best: rank 0 in P + rank 0 in S -> 1/10 + 1/10 = 0.2 + # worst: rank 1 in P + rank 1 in S -> 1/11 + 1/11 + results = reciprocal_rank_fusion( + primary, # type: ignore + secondary, # type: ignore + rrf_k=rrf_k, + distance_strategy=DistanceStrategy.INNER_PRODUCT, + ) + assert len(results) == 2 + assert results[0]["id_val"] == "best" + assert results[0]["distance"] == pytest.approx(0.2) + assert results[1]["id_val"] == "worst" + assert results[1]["distance"] == pytest.approx(2.0 / 11.0) + + def test_mixed_results_euclidean(self) -> None: + """Tests fusion with EUCLIDEAN (lower is better) for primary.""" + primary = [ + get_row("closer", 10.5), + get_row("farther", 25.5), + ] # Order: closer, farther + secondary = [ + get_row("closer", 100.0), + get_row("farther", 10.0), + ] # Order: closer, farther + rrf_k = 10 + # closer: rank 0 in P + rank 0 in S -> 1/10 + 1/10 = 0.2 + # farther: rank 1 in P + rank 1 in S -> 1/11 + 1/11 + results = reciprocal_rank_fusion( + primary, # type: ignore + secondary, # type: ignore + rrf_k=rrf_k, + distance_strategy=DistanceStrategy.EUCLIDEAN, + ) assert len(results) == 2 - assert results[0]["id_val"] == "docB" - assert results[0]["distance"] == pytest.approx(0.75) - assert results[1]["id_val"] == "docA" - assert results[1]["distance"] == pytest.approx(0.55) + assert results[0]["id_val"] == "closer" + assert results[0]["distance"] == pytest.approx(0.2) + assert results[1]["id_val"] == "farther" + assert results[1]["distance"] == pytest.approx(2.0 / 11.0) + + def test_rrf_with_identical_scores(self) -> None: + """Tests that stable sort is preserved for identical scores.""" + # Python's sorted() is stable. p1 appears before p2 in the list. + primary = [get_row("p1", 0.5), get_row("p2", 0.5)] + rrf_k = 60 + # Expected order (stable sort): p1 (rank 0), p2 (rank 1) + results = reciprocal_rank_fusion(primary, []) # type: ignore + assert results[0]["id_val"] == "p1" + assert results[0]["distance"] == pytest.approx(1 / 60) + assert results[1]["id_val"] == "p2" + assert results[1]["distance"] == pytest.approx(1 / 61)