# Making Retrievers

`langchain-core.retrievers`의 `BaseRetriever`를 상속하여 여러가지 개념의 커스텀 리트리버를 구현합니다.  
왜 직접 구현하냐?  
LangChain V1.0 이 되면서 모든 구현체들이 langchain-classic 으로 묶음이 되었습니다.  
더이상 지원하지 않겠다는 뜻이고, V2.0 이 언제될진 모르겠으나 그때 지워진다는 의미로 보시면 됩니다.  

## 학습 목표

1. **MultiQueryRetriever**: LLM으로 쿼리를 여러 버전으로 변형하여 검색 재현율 향상
2. **ConvexEnsembleRetriever**: 여러 검색기의 결과를 가중 결합(Convex Combination)
3. **MultiVectorRetriever**: 여러 벡터를 활용해 검색을 다중으로 수행할 수 있는 구조
4. **EnsembleRetriever**: Convex Combination, RRF 를 동시에 쓸 수 있는 EnsembleRetriever 구현체
5. **점수 정규화 전략**: Min-Max, Reciprocal Rank, Softmax 비교

## 참고 자료

- [Multi-Query 개념](https://wikidocs.net/234109)
- [Multi-Vector 개념](https://wikidocs.net/234281)
- [Convex Combination](https://wikidocs.net/263833)

## langchain-classic vs langchain-core

본 구현은 `langchain-classic`을 사용하지 않고 `langchain-core`의 `BaseRetriever`만으로 구현합니다.

| 패키지 | 상태 | 사용 여부 |
|--------|------|-----------|
| langchain-classic | Legacy | 사용 안함 - LangChain v0.X 버전용 |
| langchain-core | 최신 | O 사용 |


In [9]:
import asyncio
import os

import numpy as np
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables import Runnable
from pydantic import ConfigDict, Field


In [None]:
# ----------------------------------------------------------------------------
# OpenAI / OpenRouter 모델 초기화 헬퍼
# ----------------------------------------------------------------------------
from typing import Literal

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

load_dotenv()


def _resolve_api_context() -> tuple[str, str]:
    """선택된 API 키와 베이스 URL 정보를 반환합니다."""
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        raise RuntimeError("OPENROUTER_API_KEY가 필요합니다.")

    base_url = os.getenv("OPENROUTER_API_BASE") or "https://openrouter.ai/api/v1"

    return (api_key, base_url)


def create_openrouter_llm(
    model: str = "openai/gpt-4.1-mini",
    temperature: float = 0.3,
    max_tokens: int | None = None,
    **kwargs,
) -> ChatOpenAI:
    """OpenAI 호환 LLM 생성 헬퍼.

    Args:
        model: 모델 이름. OpenRouter에서는 provider/model 형식 사용 가능
               (예: openai/gpt-4o, anthropic/claude-3-sonnet, google/gemini-pro)
        temperature: 생성 온도 (0.0-2.0)
        max_tokens: 최대 생성 토큰 수

    Returns:
        ChatOpenAI: 설정된 LLM 인스턴스
    """
    api_key, base_url = _resolve_api_context()

    openai_kwargs: dict = {
        "model": model,
        "api_key": api_key,
        "temperature": temperature,
        "max_retries": 3,
        "timeout": 60,
        **kwargs,
    }
    if max_tokens is not None:
        openai_kwargs["max_tokens"] = max_tokens
    if base_url:
        openai_kwargs["base_url"] = base_url
    return ChatOpenAI(**openai_kwargs)


def create_embedding_model(
    model: str = "openai/text-embedding-3-small",
    **kwargs,
) -> OpenAIEmbeddings:
    """OpenAI 호환 임베딩 모델 생성.

    Args:
        model: 임베딩 모델 이름. OpenRouter에서는 provider/model 형식 사용 가능
               (예: openai/text-embedding-3-small, openai/text-embedding-3-large)
        **kwargs: 추가 파라미터 (encoding_format 등은 model_kwargs로 전달됨)

    Returns:
        OpenAIEmbeddings: 설정된 임베딩 모델 인스턴스
    """
    api_key, base_url = _resolve_api_context()

    # 전달받은 kwargs에서 model_kwargs로 전달할 파라미터 분리
    # encoding_format, extra_headers 등은 model_kwargs로 전달
    model_kwargs: dict = {}
    embedding_kwargs: dict = {
        "model": model,
        "api_key": api_key,
        "show_progress_bar": True,
        "skip_empty": True,
    }

    # 전달받은 kwargs 처리
    for key, value in kwargs.items():
        # OpenRouter API 특정 파라미터는 model_kwargs로 전달
        if key in ("encoding_format"):
            model_kwargs[key] = value
        else:
            # 나머지는 OpenAIEmbeddings에 직접 전달
            embedding_kwargs[key] = value

    if base_url:
        embedding_kwargs["base_url"] = base_url

    # model_kwargs가 있으면 전달
    if model_kwargs:
        embedding_kwargs["model_kwargs"] = model_kwargs

    return OpenAIEmbeddings(**embedding_kwargs)


def create_embedding_model_direct(
    model: str = "qwen/qwen3-embedding-0.6b",
    encoding_format: Literal["float", "base64"] = "float",
    input_text: str | list[str] = "",
    **kwargs,
) -> list[float] | list[list[float]]:
    """OpenAI SDK를 직접 사용하여 임베딩 생성 (encoding_format 지원).

    LangChain의 OpenAIEmbeddings가 encoding_format을 지원하지 않을 때 사용.

    Args:
        model: 임베딩 모델 이름
        encoding_format: 인코딩 형식 ("float")
        input_text: 임베딩할 텍스트 (문자열 또는 문자열 리스트)
        **kwargs: 추가 파라미터

    Returns:
        임베딩 벡터 리스트 (단일 텍스트) 또는 리스트의 리스트 (여러 텍스트)
    """
    from openai import OpenAI

    api_key, base_url = _resolve_api_context()

    client = OpenAI(
        base_url=base_url,
        api_key=api_key,
    )

    # input_text가 비어있으면 kwargs에서 가져오기
    if not input_text:
        input_text = kwargs.get("input", "")

    response = client.embeddings.create(
        model=model,
        input=input_text,
        encoding_format=encoding_format,
    )

    # 단일 텍스트인 경우 첫 번째 임베딩 반환
    if isinstance(input_text, str):
        return response.data[0].embedding
    else:
        # 여러 텍스트인 경우 모든 임베딩 반환
        return [item.embedding for item in response.data]


def get_available_model_types() -> dict[str, list[str]]:
    """OpenRouter에서 사용 가능한 모델 유형을 반환합니다.

    Returns:
        dict[str, list[str]]: 모델 유형별 모델 목록
    """
    return {
        "chat": [
            "openai/gpt-4.1",
            "openai/gpt-4.1-mini",
            "openai/gpt-5",
            "openai/gpt-5-mini",
            "anthropic/claude-sonnet-4.5",
            "anthropic/claude-haiku-4.5",
            "google/gemini-2.5-flash-preview-09-2025",
            "google/gemini-pro-2.5",
            "x-ai/grok-4-fast",
            "moonshotai/kimi-k2-thinking",
            "liquid/lfm-2.2-6b",
            "z-ai/glm-4.6",
        ],
        "embedding": [
            "openai/text-embedding-3-small",
            "openai/text-embedding-3-large",
            "google/gemini-embedding-001",
            "qwen/qwen3-embedding-0.6b",
            "qwen/qwen3-embedding-4b",
            "qwen/qwen3-embedding-8b",
        ],
    }


embeddings = create_embedding_model()
llm = create_openrouter_llm()

### MultiQueryRetriever

In [4]:
class MultiQueryRetriever(BaseRetriever):
    """
    Multi-Query Retriever: LLM을 사용하여 쿼리를 여러 버전으로 변형하고
    각각 검색한 결과를 결합합니다.

    장점:
    - 단일 쿼리의 한계를 극복
    - 다양한 표현으로 재현율(Recall) 향상
    - 쿼리 의도의 다각적 해석

    단점:
    - LLM 호출 비용
    - 레이턴시 증가
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    base_retriever: BaseRetriever = Field(description="기본 검색기")
    llm: Runnable = Field(description="쿼리 생성용 LLM")
    num_queries: int = Field(default=4, description="생성할 쿼리 개수")
    merge_strategy: str = Field(default="rrf", description="결합 전략: rrf, max, sum")
    rrf_k: int = Field(default=60, description="RRF 상수")
    include_original: bool = Field(default=True, description="원본 쿼리 포함 여부")

    def _generate_queries(self, query: str) -> list[str]:
        """LLM으로 쿼리 변형 생성"""
        prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """당신은 검색 쿼리 전문가입니다.
사용자의 질문을 다양한 관점에서 재구성하여 {num} 개의 문서 검색용 질문을 생성하세요.

[요구사항]
- 원본 질문의 핵심 의도를 유지하되, 다른 표현 사용
- 구체적이고 명확한 쿼리
- 각 쿼리는 한 줄로 작성
- 번호나 불릿 없이 쿼리만 작성
- 각 쿼리는 새 줄로 구분

예시:
입력: "연차휴가는 어떻게 계산되나요?"
출력:
연차 유급휴가 계산 방법\n
근로자 연차 일수 산정 기준\n
1년 근무 시 연차 개수\n
연차휴가 발생 조건""",
                ),
                ("user", "{query}"),
            ]
        )

        chain = prompt | self.llm | StrOutputParser()

        try:
            result = chain.invoke({"query": query, "num": self.num_queries})

            # 결과 파싱 (줄바꿈으로 분리)
            queries = [q.strip() for q in result.split("\n") if q.strip()]

            # 개수 제한
            queries = queries[: self.num_queries]

            # 원본 쿼리 포함
            if self.include_original:
                queries = [query] + queries

            return queries

        except Exception as e:
            print(f"쿼리 생성 실패: {e}")
            return [query]  # 실패 시 원본만 반환

    def _merge_results(
        self,
        results_list: list[list[Document]],
    ) -> list[Document]:
        """여러 검색 결과를 결합"""

        if self.merge_strategy == "rrf":
            return self._rrf_merge(results_list)
        elif self.merge_strategy == "max":
            return self._max_merge(results_list)
        elif self.merge_strategy == "sum":
            return self._sum_merge(results_list)
        else:
            # 기본값: RRF
            return self._rrf_merge(results_list)

    def _rrf_merge(self, results_list: list[list[Document]]) -> list[Document]:
        """Reciprocal Rank Fusion"""
        doc_scores = {}
        doc_objects = {}

        for results in results_list:
            for rank, doc in enumerate(results, start=1):
                doc_id = hash(doc.page_content)

                if doc_id not in doc_scores:
                    doc_scores[doc_id] = 0.0
                    doc_objects[doc_id] = doc

                # RRF 점수: 1 / (k + rank)
                doc_scores[doc_id] += 1.0 / (self.rrf_k + rank)

        # 점수 기준 정렬
        sorted_docs = sorted(
            doc_scores.items(),
            key=lambda x: x[1],
            reverse=True,
        )

        # 메타데이터에 점수 추가
        results = []
        for doc_id, score in sorted_docs:
            doc = doc_objects[doc_id]
            new_metadata = {**doc.metadata, "rrf_score": score}
            new_doc = Document(
                page_content=doc.page_content,
                metadata=new_metadata,
            )
            results.append(new_doc)

        return results

    def _max_merge(self, results_list: list[list[Document]]) -> list[Document]:
        """최대 점수 선택 (점수가 메타데이터에 있다고 가정)"""
        doc_scores = {}
        doc_objects = {}

        for results in results_list:
            for doc in results:
                doc_id = hash(doc.page_content)
                score = doc.metadata.get("score", 0.0)

                if doc_id not in doc_scores or score > doc_scores[doc_id]:
                    doc_scores[doc_id] = score
                    doc_objects[doc_id] = doc

        sorted_docs = sorted(
            doc_scores.items(),
            key=lambda x: x[1],
            reverse=True,
        )

        return [doc_objects[doc_id] for doc_id, _ in sorted_docs]

    def _sum_merge(self, results_list: list[list[Document]]) -> list[Document]:
        """점수 합계"""
        doc_scores = {}
        doc_objects = {}

        for results in results_list:
            for doc in results:
                doc_id = hash(doc.page_content)
                score = doc.metadata.get("score", 1.0)

                if doc_id not in doc_scores:
                    doc_scores[doc_id] = 0.0
                    doc_objects[doc_id] = doc

                doc_scores[doc_id] += score

        sorted_docs = sorted(
            doc_scores.items(),
            key=lambda x: x[1],
            reverse=True,
        )

        return [doc_objects[doc_id] for doc_id, _ in sorted_docs]

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun | None = None,
    ) -> list[Document]:
        """검색 실행"""
        # 1. 쿼리 변형 생성
        queries = self._generate_queries(query)

        print(f"\n생성된 쿼리 {len(queries)}개:")
        for idx, q in enumerate(queries, 1):
            print(f"  {idx}. {q}")

        # 2. 각 쿼리로 검색
        results_list = []
        for q in queries:
            results = self.base_retriever.invoke(q)
            results_list.append(results)

        # 3. 결과 병합
        merged = self._merge_results(results_list)

        return merged

    async def _aget_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun | None = None,
    ) -> list[Document]:
        """비동기 검색 실행"""
        # 1. 쿼리 변형 생성
        queries = self._generate_queries(query)

        # 2. 병렬 검색
        tasks = [self.base_retriever.ainvoke(q) for q in queries]
        results_list = await asyncio.gather(*tasks)

        # 3. 결과 병합
        merged = self._merge_results(results_list)

        return merged

In [None]:
try:
    # 베이스 리트리버 (KiwiBM25 사용)
    base_retriever = KiwiBM25Retriever.from_documents(
        documents=sample_docs,
        k=5,
    )

    # MultiQueryRetriever 생성
    multi_query_retriever = MultiQueryRetriever(
        base_retriever=base_retriever,
        llm=llm,
        num_queries=3,
        merge_strategy="rrf",
        include_original=True,
    )

    # 테스트 쿼리
    test_query = "휴가는 얼마나 받을 수 있나요?"

    print(f"\n원본 쿼리: {test_query}")
    print("\n" + "=" * 80)

    # 검색 실행
    results = multi_query_retriever.invoke(test_query)

    print("\n최종 결과 (Top-3):")
    for idx, doc in enumerate(results[:3], 1):
        rrf_score = doc.metadata.get("rrf_score", 0)
        print(f"\n[{idx}] RRF 점수: {rrf_score:.6f}")
        print(f"    내용: {doc.page_content[:80]}...")
        print(f"    출처: {doc.metadata.get('source', 'N/A')}")

except ImportError:
    print("⚠️  OpenAI 패키지가 설치되지 않았습니다.")
except Exception as e:
    print(f"⚠️  데모 실행 실패: {e}")

### ConvexEnsembleRetriever

In [5]:
class ConvexEnsembleRetriever(BaseRetriever):
    """
    Convex Ensemble Retriever: 여러 검색기의 결과를 가중 결합합니다.

    Convex Combination:
    - 최종 점수 = Σ (weight_i × normalized_score_i)
    - 가중치 합 = 1.0 (Convex 조건)

    점수 정규화 전략:
    1. Min-Max: (score - min) / (max - min)
    2. Reciprocal Rank: 1 / rank
    3. Softmax: exp(score) / Σ exp(scores)

    장점:
    - 각 검색기의 강점 활용
    - 가중치 조절로 도메인 최적화
    - 점수 스케일 불일치 해결

    참고:
    - https://wikidocs.net/263833
    - https://github.com/teddylee777/langchain-teddynote
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    retrievers: list[BaseRetriever] = Field(description="검색기 리스트")
    weights: list[float] = Field(description="가중치 리스트")
    normalize_strategy: str = Field(
        default="reciprocal_rank",
        description="정규화 전략: min_max, reciprocal_rank, softmax",
    )
    c: int = Field(
        default=60,
        description="Reciprocal Rank 상수",
    )

    def __init__(self, **data):
        super().__init__(**data)

        # 가중치 검증
        if len(self.retrievers) != len(self.weights):
            raise ValueError("검색기와 가중치 개수가 일치해야 합니다.")

        # Convex 조건 검증 (가중치 합 = 1.0)
        weight_sum = sum(self.weights)
        if not (0.99 <= weight_sum <= 1.01):
            print(f"⚠️  가중치 합이 {weight_sum:.3f}입니다. 1.0으로 정규화합니다.")
            self.weights = [w / weight_sum for w in self.weights]

    def _normalize_scores(
        self,
        docs_with_scores: list[tuple[Document, float]],
    ) -> list[tuple[Document, float]]:
        """점수 정규화"""

        if not docs_with_scores:
            return []

        if self.normalize_strategy == "min_max":
            return self._min_max_normalize(docs_with_scores)
        elif self.normalize_strategy == "reciprocal_rank":
            return self._reciprocal_rank_normalize(docs_with_scores)
        elif self.normalize_strategy == "softmax":
            return self._softmax_normalize(docs_with_scores)
        else:
            # 기본값: reciprocal rank
            return self._reciprocal_rank_normalize(docs_with_scores)

    def _min_max_normalize(
        self,
        docs_with_scores: list[tuple[Document, float]],
    ) -> list[tuple[Document, float]]:
        """Min-Max 정규화: [0, 1] 범위로 스케일링"""
        scores = [score for _, score in docs_with_scores]

        min_score = min(scores)
        max_score = max(scores)

        if max_score - min_score < 1e-6:
            # 모든 점수가 같으면 동일 점수 부여
            return [(doc, 1.0) for doc, _ in docs_with_scores]

        normalized = []
        for doc, score in docs_with_scores:
            norm_score = (score - min_score) / (max_score - min_score)
            normalized.append((doc, norm_score))

        return normalized

    def _reciprocal_rank_normalize(
        self,
        docs_with_scores: list[tuple[Document, float]],
    ) -> list[tuple[Document, float]]:
        """Reciprocal Rank 정규화: 순위 기반"""
        # 점수 기준 정렬
        sorted_docs = sorted(
            docs_with_scores,
            key=lambda x: x[1],
            reverse=True,
        )

        normalized = []
        for rank, (doc, _) in enumerate(sorted_docs, start=1):
            # 1 / (c + rank)
            norm_score = 1.0 / (self.c + rank)
            normalized.append((doc, norm_score))

        # 원래 순서로 복원 (hash 기반)
        doc_score_map = {hash(doc.page_content): score for doc, score in normalized}

        result = []
        for doc, _ in docs_with_scores:
            norm_score = doc_score_map.get(hash(doc.page_content), 0.0)
            result.append((doc, norm_score))

        return result

    def _softmax_normalize(
        self,
        docs_with_scores: list[tuple[Document, float]],
    ) -> list[tuple[Document, float]]:
        """Softmax 정규화: 확률 분포로 변환"""
        scores = np.array([score for _, score in docs_with_scores])

        # Softmax
        exp_scores = np.exp(scores - np.max(scores))
        softmax_scores = exp_scores / exp_scores.sum()

        normalized = []
        for (doc, _), norm_score in zip(docs_with_scores, softmax_scores, strict=False):
            normalized.append((doc, float(norm_score)))

        return normalized

    def _weighted_fusion(
        self,
        results_list: list[list[tuple[Document, float]]],
    ) -> list[Document]:
        """가중 결합 (Convex Combination)"""

        # 각 검색기의 결과 정규화
        normalized_results = []
        for results in results_list:
            normalized = self._normalize_scores(results)
            normalized_results.append(normalized)

        # 문서별 가중 점수 계산
        doc_scores = {}
        doc_objects = {}

        for weight, results in zip(self.weights, normalized_results, strict=False):
            for doc, norm_score in results:
                doc_id = hash(doc.page_content)

                if doc_id not in doc_scores:
                    doc_scores[doc_id] = 0.0
                    doc_objects[doc_id] = doc

                # Convex Combination: Σ (weight_i × score_i)
                doc_scores[doc_id] += weight * norm_score

        # 점수 기준 정렬
        sorted_docs = sorted(
            doc_scores.items(),
            key=lambda x: x[1],
            reverse=True,
        )

        # 메타데이터에 점수 추가
        results = []
        for doc_id, score in sorted_docs:
            doc = doc_objects[doc_id]
            new_metadata = {**doc.metadata, "convex_score": score}
            new_doc = Document(
                page_content=doc.page_content,
                metadata=new_metadata,
            )
            results.append(new_doc)

        return results

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun | None = None,
    ) -> list[Document]:
        """검색 실행"""
        # 각 검색기로 검색
        results_list = []

        for idx, retriever in enumerate(self.retrievers):
            # 검색 실행
            docs = retriever.invoke(query)

            # 점수 추출 (메타데이터 또는 기본값 1.0)
            docs_with_scores = []
            for doc in docs:
                score = doc.metadata.get("score", 1.0)
                docs_with_scores.append((doc, score))

            results_list.append(docs_with_scores)

        # 가중 결합
        merged = self._weighted_fusion(results_list)

        return merged

In [None]:
"""
앙상블 리트리버: 여러 리트리버의 결과를 결합하는 검색기

이 모듈은 두 가지 앙상블 방법을 지원합니다:

1. RRF (Reciprocal Rank Fusion): 순위 기반 점수 융합
   - 각 검색기의 순위를 활용하여 점수 계산
   - 순위만 필요하므로 실제 유사도 점수가 없어도 사용 가능
   - 공식: score = Σ(weight_i / (rank_i + c))

2. CC (Convex Combination): 볼록 조합 - 정규화된 점수의 가중 평균
   - 각 검색기의 실제 유사도 점수를 정규화하여 가중 평균
   - 가중치의 합이 1이어야 함 (볼록 조합의 수학적 정의)
   - 공식: score = Σ(weight_i * normalized_score_i)
"""

from collections import defaultdict
from collections.abc import Callable, Hashable, Iterable, Iterator
from enum import Enum
from itertools import chain
from typing import (
    Any,
    TypeVar,
    cast,
)

from langchain_core.callbacks import (
    AsyncCallbackManagerForRetrieverRun,
    CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever, RetrieverLike
from langchain_core.runnables import RunnableConfig
from langchain_core.runnables.config import ensure_config, patch_config
from langchain_core.runnables.utils import (
    ConfigurableFieldSpec,
    get_unique_config_specs,
)
from pydantic import model_validator

# 타입 변수: 제네릭 타입을 위한 선언
T = TypeVar("T")  # 임의의 타입
H = TypeVar("H", bound=Hashable)  # 해시 가능한 타입 (dict의 키로 사용 가능)


class EnsembleMethod(str, Enum):
    """앙상블 방법을 정의하는 열거형

    RRF: Reciprocal Rank Fusion - 순위 기반 융합
    CC: Convex Combination - 볼록 조합 (점수 기반 융합)
    """

    RRF = "rrf"
    CC = "cc"


def unique_by_key(iterable: Iterable[T], key: Callable[[T], H]) -> Iterator[T]:
    """주어진 키 함수를 기준으로 iterable에서 고유한 요소만 반환

    중복 제거 로직:
    - 각 요소에 대해 key 함수를 적용하여 고유 식별자 생성
    - 이미 본 식별자는 건너뛰고, 처음 보는 식별자만 yield

    예시:
        docs = [doc1, doc2, doc1, doc3]  # doc1이 중복
        unique_docs = unique_by_key(docs, lambda d: d.id)
        # 결과: [doc1, doc2, doc3]

    Args:
        iterable: 필터링할 iterable 객체
        key: 각 요소에서 해시 가능한 키를 추출하는 함수

    Yields:
        키 함수 기준으로 중복이 제거된 고유한 요소들
    """
    seen = set()  # 이미 본 키들을 저장하는 집합
    for e in iterable:
        if (k := key(e)) not in seen:  # Walrus 연산자: 할당과 동시에 조건 검사
            seen.add(k)  # 새로운 키를 집합에 추가
            yield e  # 고유한 요소 반환


class EnsembleRetriever(BaseRetriever):
    """여러 리트리버의 결과를 앙상블하는 리트리버

    두 가지 앙상블 방법을 지원:
    1. RRF (Reciprocal Rank Fusion): 순위 기반 융합
    2. CC (Convex Combination): 볼록 조합 방식의 점수 기반 융합

    사용 예시:
        # BM25와 Dense Retriever를 3:7 비율로 결합 (CC 방법)
        ensemble = EnsembleRetriever(
            retrievers=[bm25_retriever, dense_retriever],
            weights=[0.3, 0.7],  # CC 방법은 합이 1이어야 함
            method=EnsembleMethod.CC
        )

        # RRF 방법으로 동일 가중치 적용
        ensemble = EnsembleRetriever(
            retrievers=[bm25_retriever, dense_retriever],
            weights=[1.0, 1.0],  # RRF는 합이 1일 필요 없음
            method=EnsembleMethod.RRF,
            c=60  # RRF 상수
        )

    Args:
        retrievers: 앙상블할 리트리버들의 리스트
        weights: 각 리트리버에 대응하는 가중치 리스트
                 CC 방법 사용 시 반드시 합이 1이어야 함
        method: 사용할 앙상블 방법 ("rrf" 또는 "cc")
        c: RRF 방법에서 사용할 상수. 기본값 60
           - 작을수록 상위 순위에 더 큰 가중치
           - 클수록 순위 간 점수 차이가 완만해짐
        id_key: 문서 고유성 판단에 사용할 metadata의 키
                지정하지 않으면 page_content를 사용
    """

    retrievers: list[RetrieverLike]  # 앙상블할 리트리버 목록
    weights: list[float]  # 각 리트리버의 가중치
    method: EnsembleMethod = EnsembleMethod.RRF  # 앙상블 방법 (기본: RRF)
    c: int = 60  # RRF 상수
    id_key: str | None = None  # 문서 고유 식별 키

    @model_validator(mode="before")
    @classmethod
    def validate_weights(cls, values: dict[str, Any]) -> Any:
        """가중치 유효성 검증

        1. 가중치가 없으면 균등 가중치 자동 설정
        2. CC 방법 사용 시 가중치 합이 1인지 검증

        볼록 조합(Convex Combination)의 수학적 정의:
        - Σw_i = 1 (가중치의 합이 1)
        - w_i ≥ 0 (모든 가중치가 0 이상)
        이 조건을 만족해야 결과가 입력 점수들의 볼록 포(convex hull) 내에 존재
        """
        weights = values.get("weights")
        method = values.get("method", EnsembleMethod.RRF)

        if not weights:
            # 가중치가 없으면 균등 분배
            n_retrievers = len(values["retrievers"])
            values["weights"] = [1 / n_retrievers] * n_retrievers
        elif method == EnsembleMethod.CC and abs(sum(weights) - 1.0) > 1e-6:
            # CC 방법은 가중치 합이 1이어야 함 (볼록 조합의 정의)
            raise ValueError("Weights must sum to 1.0 for CC method")

        return values

    @property
    def config_specs(self) -> list[ConfigurableFieldSpec]:
        """이 runnable의 설정 가능한 필드 목록을 반환"""
        return get_unique_config_specs(
            spec for retriever in self.retrievers for spec in retriever.config_specs
        )

    def invoke(
        self, input: str, config: RunnableConfig | None = None, **kwargs: Any
    ) -> list[Document]:
        from langchain_core.callbacks import CallbackManager

        config = ensure_config(config)
        callback_manager = CallbackManager.configure(
            config.get("callbacks"),
            None,
            verbose=kwargs.get("verbose", False),
            inheritable_tags=config.get("tags", []),
            local_tags=self.tags,
            inheritable_metadata=config.get("metadata", {}),
            local_metadata=self.metadata,
        )
        run_manager = callback_manager.on_retriever_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            **kwargs,
        )
        try:
            result = self.rank_fusion(input, run_manager=run_manager, config=config)
        except Exception as e:
            run_manager.on_retriever_error(e)
            raise e
        else:
            run_manager.on_retriever_end(
                result,
                **kwargs,
            )
            return result

    async def ainvoke(
        self, input: str, config: RunnableConfig | None = None, **kwargs: Any
    ) -> list[Document]:
        from langchain_core.callbacks import AsyncCallbackManager

        config = ensure_config(config)
        callback_manager = AsyncCallbackManager.configure(
            config.get("callbacks"),
            None,
            verbose=kwargs.get("verbose", False),
            inheritable_tags=config.get("tags", []),
            local_tags=self.tags,
            inheritable_metadata=config.get("metadata", {}),
            local_metadata=self.metadata,
        )
        run_manager = await callback_manager.on_retriever_start(
            None,
            input,
            name=config.get("run_name") or self.get_name(),
            **kwargs,
        )
        try:
            result = await self.arank_fusion(input, run_manager=run_manager, config=config)
        except Exception as e:
            await run_manager.on_retriever_error(e)
            raise e
        else:
            await run_manager.on_retriever_end(
                result,
                **kwargs,
            )
            return result

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun,
    ) -> list[Document]:
        """주어진 쿼리에 대해 관련 문서들을 검색

        BaseRetriever의 추상 메서드 구현.
        실제 로직은 rank_fusion 메서드에 위임.

        Args:
            query: 검색할 쿼리 문자열

        Returns:
            앙상블되어 재순위화된 문서 리스트
        """

        # 모든 리트리버의 결과를 융합
        fused_documents = self.rank_fusion(query, run_manager)

        return fused_documents

    async def _aget_relevant_documents(
        self,
        query: str,
        *,
        run_manager: AsyncCallbackManagerForRetrieverRun,
    ) -> list[Document]:
        """주어진 쿼리에 대해 관련 문서들을 비동기로 검색

        BaseRetriever의 비동기 추상 메서드 구현.
        실제 로직은 arank_fusion 메서드에 위임.

        Args:
            query: 검색할 쿼리 문자열

        Returns:
            앙상블되어 재순위화된 문서 리스트
        """

        # 모든 리트리버의 결과를 비동기로 융합
        fused_documents = await self.arank_fusion(query, run_manager)

        return fused_documents

    def ensemble_results(self, doc_lists: list[list[Document]]) -> list[Document]:
        """RRF 또는 CC 방법을 사용하여 결과를 앙상블

        여러 검색기의 결과를 하나로 융합하는 진입점.
        설정된 method에 따라 적절한 앙상블 함수로 분기.

        Args:
            doc_lists: 각 리트리버의 검색 결과 리스트들
                      예: [[doc1, doc2, doc3], [doc3, doc1, doc4]]

        Returns:
            최종 점수 기준 내림차순으로 정렬된 문서 리스트

        Raises:
            ValueError: 문서 리스트 개수와 가중치 개수가 다른 경우
        """
        if len(doc_lists) != len(self.weights):
            raise ValueError("Number of rank lists must be equal to the number of weights.")

        # 설정된 앙상블 방법에 따라 분기
        if self.method == EnsembleMethod.RRF:
            return self.reciprocal_rank_fusion(doc_lists)
        elif self.method == EnsembleMethod.CC:
            return self.convex_combination(doc_lists)
        else:
            raise ValueError("Invalid ensemble method")

    def reciprocal_rank_fusion(self, doc_lists: list[list[Document]]) -> list[Document]:
        """여러 순위 리스트에 대해 Reciprocal Rank Fusion 수행

        RRF 개념:
        - 각 검색기에서 문서의 순위만을 사용 (실제 점수 불필요)
        - 상위 순위일수록 높은 점수 부여 (역수 사용)
        - 여러 검색기에서 공통으로 상위에 랭크된 문서가 최종적으로 높은 점수

        RRF 공식:
            score(doc) = Σ weight_i / (rank_i + c)

        예시 (c=60, weights=[1.0, 1.0]):
            Retriever1: [A(1위), B(2위), C(3위)]
            Retriever2: [B(1위), C(2위), A(5위)]

            A의 점수 = 1.0/(1+60) + 1.0/(5+60) = 0.0164 + 0.0154 = 0.0318
            B의 점수 = 1.0/(2+60) + 1.0/(1+60) = 0.0161 + 0.0164 = 0.0325 ← 최고
            C의 점수 = 1.0/(3+60) + 1.0/(2+60) = 0.0159 + 0.0161 = 0.0320

            최종 순위: B > C > A

        장점:
        - 실제 유사도 점수가 없어도 작동 (순위만 필요)
        - 서로 다른 스케일의 점수를 가진 검색기도 공정하게 결합
        - 구현이 단순하고 효과적
        """
        rrf_score: dict[str, float] = defaultdict(float)  # 문서별 RRF 점수 누적

        # 각 리트리버의 결과를 순회하며 점수 계산
        for doc_list, weight in zip(doc_lists, self.weights, strict=True):
            for rank, doc in enumerate(doc_list, start=1):  # 순위는 1부터 시작
                # 문서의 고유 ID 추출
                doc_id = doc.page_content if self.id_key is None else doc.metadata[self.id_key]
                # RRF 공식: weight / (rank + c)
                # rank가 작을수록 (상위일수록) 높은 점수
                rrf_score[doc_id] += weight / (rank + self.c)

        # 모든 문서를 하나의 리스트로 결합
        all_docs = chain.from_iterable(doc_lists)

        # 중복 제거하고 RRF 점수 기준 내림차순 정렬
        sorted_docs = sorted(
            unique_by_key(
                all_docs,
                lambda doc: (
                    doc.page_content if self.id_key is None else doc.metadata[self.id_key]
                ),
            ),
            key=lambda doc: rrf_score[
                doc.page_content if self.id_key is None else doc.metadata[self.id_key]
            ],
            reverse=True,  # 높은 점수가 먼저
        )
        return sorted_docs

    def convex_combination(self, doc_lists: list[list[Document]]) -> list[Document]:
        """여러 순위 리스트에 대해 Convex Combination(볼록 조합) 수행

        ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
        Convex Combination이란?
        ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

        수학적 정의:
            볼록 조합(Convex Combination)은 여러 값들을 가중 평균하는 방법으로,
            가중치의 합이 1이고 모든 가중치가 0 이상인 선형 결합입니다.

            공식: score(doc) = Σ w_i × normalized_score_i
            제약: Σ w_i = 1, w_i ≥ 0

        핵심 아이디어:
            1. 각 검색기의 점수를 정규화 (0~1 범위로)
            2. 정규화된 점수에 가중치를 곱해서 합산
            3. 가중치 합이 1이므로 최종 점수도 0~1 범위 유지

        ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
        구체적인 예시
        ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

        상황: BM25와 Dense Retriever를 3:7 비율로 결합

        BM25 결과 (weight=0.3):
            문서A: 15.3점
            문서B: 12.7점
            문서C: 8.5점
            max_score = 15.3

        Dense Retriever 결과 (weight=0.7):
            문서A: 0.82점
            문서B: 0.91점
            문서C: 0.75점
            max_score = 0.91

        Step 1: 정규화 (각 검색기 내에서 최댓값으로 나눔)
            BM25 정규화:
                문서A: 15.3 / 15.3 = 1.00
                문서B: 12.7 / 15.3 = 0.83
                문서C: 8.5 / 15.3 = 0.56

            Dense 정규화:
                문서A: 0.82 / 0.91 = 0.90
                문서B: 0.91 / 0.91 = 1.00
                문서C: 0.75 / 0.91 = 0.82

        Step 2: 가중 조합 (정규화된 점수 × 가중치의 합)
            문서A: 0.3 × 1.00 + 0.7 × 0.90 = 0.30 + 0.63 = 0.93
            문서B: 0.3 × 0.83 + 0.7 × 1.00 = 0.25 + 0.70 = 0.95 ← 최고!
            문서C: 0.3 × 0.56 + 0.7 × 0.82 = 0.17 + 0.57 = 0.74

        Step 3: 최종 순위
            B (0.95) > A (0.93) > C (0.74)

        ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
        RRF vs CC 비교
        ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

        RRF (Reciprocal Rank Fusion):
            ✓ 순위만 필요 (점수 불필요)
            ✓ 스케일이 다른 점수 체계도 공정하게 처리
            ✓ 구현 간단
            ✗ 실제 점수 정보 활용 안 함

        CC (Convex Combination):
            ✓ 실제 유사도 점수 활용
            ✓ 검색기별 신뢰도 반영 가능 (가중치)
            ✓ 수학적으로 명확한 의미 (볼록 집합 내 점)
            ✗ 점수가 필수 (metadata에 'score' 필요)
            ✗ 가중치 합이 1이어야 함 (제약)

        선택 가이드:
            - 검색기 점수가 신뢰할 만하고, 각 검색기의 중요도를 명확히
              조절하고 싶다면 → CC 사용
            - 검색기 점수가 없거나, 단순히 순위만 중요하다면 → RRF 사용

        ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
        """
        cc_scores: dict[str, float] = defaultdict(float)  # 문서별 CC 점수 누적

        # 각 리트리버의 결과를 순회하며 점수 계산
        for doc_list, weight in zip(doc_lists, self.weights, strict=True):
            # Step 1: 정규화를 위한 최댓값 찾기
            # 해당 검색기 결과 중 가장 높은 점수를 찾음 (0 방지를 위해 or 1)
            max_score = max(doc.metadata.get("score", 0) for doc in doc_list) or 1

            # Step 2: 각 문서의 점수를 정규화하고 가중치 적용
            for doc in doc_list:
                # 문서의 고유 ID 추출
                doc_id = doc.page_content if self.id_key is None else doc.metadata[self.id_key]

                # 정규화: 원본 점수 / 최댓값 → [0, 1] 범위로 변환
                # 최고 점수 문서는 1.0, 나머지는 비율에 따라 0~1 사이 값
                normalized_score = doc.metadata.get("score", 0) / max_score

                # 볼록 조합 공식 적용: weight × normalized_score
                # 여러 검색기에서 같은 문서가 나오면 점수가 누적됨
                cc_scores[doc_id] += weight * normalized_score

        # Step 3: 중복 제거 - 모든 검색기 결과에서 고유 문서만 추출
        all_docs = list(
            unique_by_key(
                chain.from_iterable(doc_lists),
                lambda doc: (
                    doc.page_content if self.id_key is None else doc.metadata[self.id_key]
                ),
            )
        )

        # Step 4: CC 점수 기준 내림차순 정렬
        sorted_docs = sorted(
            all_docs,
            key=lambda doc: cc_scores[
                doc.page_content if self.id_key is None else doc.metadata[self.id_key]
            ],
            reverse=True,  # 높은 점수가 먼저
        )

        return sorted_docs

    def rank_fusion(
        self,
        query: str,
        run_manager: CallbackManagerForRetrieverRun,
        *,
        config: RunnableConfig | None = None,
    ) -> list[Document]:
        """모든 리트리버의 결과를 수집하고 융합

        전체 프로세스:
        1. 각 리트리버에 쿼리를 전달하여 결과 수집
        2. 결과가 Document 타입인지 확인 및 변환
        3. 설정된 앙상블 방법(RRF/CC)으로 결과 융합

        Args:
            query: 검색 쿼리
            run_manager: 콜백 관리자
            config: 런타임 설정

        Returns:
            앙상블된 최종 문서 리스트
        """
        # Step 1: 모든 리트리버의 결과 수집
        # 각 리트리버를 순회하며 동일한 쿼리로 검색 수행
        retriever_docs = [
            retriever.invoke(
                query,
                patch_config(config, callbacks=run_manager.get_child(tag=f"retriever_{i + 1}")),
            )
            for i, retriever in enumerate(self.retrievers)
        ]

        # Step 2: 결과가 모두 Document 객체인지 확인
        # 일부 리트리버는 문자열을 반환할 수 있으므로 Document로 변환
        for i in range(len(retriever_docs)):
            retriever_docs[i] = [
                Document(page_content=cast(str, doc)) if isinstance(doc, str) else doc
                for doc in retriever_docs[i]
            ]

        # Step 3: 앙상블 방법 적용 (RRF 또는 CC)
        fused_documents = self.ensemble_results(retriever_docs)

        return fused_documents

    async def arank_fusion(
        self,
        query: str,
        run_manager: AsyncCallbackManagerForRetrieverRun,
        *,
        config: RunnableConfig | None = None,
    ) -> list[Document]:
        """모든 리트리버의 결과를 비동기로 수집하고 융합

        rank_fusion의 비동기 버전.
        여러 리트리버를 병렬로 실행하여 성능 향상.

        전체 프로세스:
        1. asyncio.gather로 모든 리트리버를 병렬 실행
        2. 결과가 Document 타입인지 확인 및 변환
        3. 설정된 앙상블 방법(RRF/CC)으로 결과 융합

        Args:
            query: 검색 쿼리
            run_manager: 비동기 콜백 관리자
            config: 런타임 설정

        Returns:
            앙상블된 최종 문서 리스트
        """
        # Step 1: 모든 리트리버를 병렬로 실행 (asyncio.gather)
        # 순차 실행보다 훨씬 빠름 (특히 네트워크 I/O가 있는 경우)
        retriever_docs = await asyncio.gather(
            *[
                retriever.ainvoke(
                    query,
                    patch_config(config, callbacks=run_manager.get_child(tag=f"retriever_{i + 1}")),
                )
                for i, retriever in enumerate(self.retrievers)
            ]
        )

        # Step 2: 결과가 모두 Document 객체인지 확인
        # 일부 리트리버는 문자열을 반환할 수 있으므로 Document로 변환
        for i in range(len(retriever_docs)):
            retriever_docs[i] = [
                Document(page_content=doc) if not isinstance(doc, Document) else doc  # type: ignore[arg-type]
                for doc in retriever_docs[i]
            ]

        # Step 3: 앙상블 방법 적용 (RRF 또는 CC)
        # 이 부분은 동기 함수(하지만 CPU Bound 부하량이 작아서 속도는 크게 영향을 끼치지 않음)
        # 필요하면 asyncio.to_thread로 넣어서 asyncio eventloop 깨지지 않도록 가능.
        fused_documents = self.ensemble_results(retriever_docs)

        return fused_documents

### Multi-Vector Search

Multi-Vector Search는 문서의 여러 측면(제목, 본문, 요약 등)을 별도 벡터로 인덱싱하여 검색 정확도를 높이는 전략입니다.

## 학습 목표

1. **Named Vectors 활용**: Qdrant의 Named Vectors 기능으로 다중 벡터 저장
  > Embedding 모델은 Context Size 에 따라서 적절하게 '큰 것' 부터 '작은 것'까지 배치하도록 합니다.
2. **다중 표현 전략**: `title, summary, body, parent_docs_summary` 등을 각각 위 규칙에 맞게 Dense Embedding
3. **가중 검색**: 벡터별로 가중치를 달리하여 검색
4. **파이프라인**: LLM 요약 생성 + 임베딩 + 인덱싱

## Multi-Vector vs Single-Vector

| 항목 | Single-Vector | Multi-Vector |
|------|---------------|--------------|
| 인덱스 크기 | 작음 | 2~3배 |
| 검색 정확도 | 보통 | 높음 |
| 레이턴시 | 빠름 | 약간 느림 |
| 구현 복잡도 | 낮음 | 중간 |

## 참고 자료

- [Multi-Vector 개념](https://wikidocs.net/234281)
- [Qdrant Named Vectors](https://qdrant.tech/documentation/concepts/vectors/#named-vectors)


In [None]:
# ============================================================================
# 1. Multi-Vector 인덱싱 파이프라인
# ============================================================================


def generate_summary(text: str, llm: Runnable, max_length: int = 100) -> str:
    """
    LLM을 사용하여 텍스트 요약 생성

    Args:
        text: 원본 텍스트
        llm: LLM 모델
        max_length: 최대 요약 길이

    Returns:
        요약 텍스트
    """
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                f"다음 텍스트를 {max_length}자 이내로 요약하세요. 핵심 내용만 간결하게 작성하세요.",
            ),
            ("user", "{text}"),
        ]
    )

    chain = prompt | llm | StrOutputParser()

    try:
        summary = chain.invoke({"text": text})
        return summary.strip()
    except Exception as e:
        print(f" 요약 생성 실패: {e}")
        # 실패 시 앞 부분 잘라서 반환
        return text[:max_length]


def create_multi_vector_collection(
    collection_name: str,
    dimension: int = 1536,
    on_disk: bool = False,
):
    """
    Multi-Vector용 Qdrant 컬렉션 생성

    Named Vectors:
    - dense_title: 제목 임베딩
    - dense_summary: 요약 임베딩
    - dense_body: 본문 임베딩
    """
    from qdrant_client.models import Distance, VectorParams

    client = QdrantClient(url=os.getenv("QDRANT_URL", "http://localhost:6333"))

    # 기존 컬렉션 삭제 (테스트용)
    try:
        client.delete_collection(collection_name)
        print(f"기존 컬렉션 '{collection_name}' 삭제")
    except Exception as e:
        print(f"기존 컬렉션 '{collection_name}' 삭제 실패: {e}")

    # Named Vectors 정의
    vectors_config = {
        "dense_title": VectorParams(
            size=dimension,
            distance=Distance.COSINE,
            on_disk=on_disk,
        ),
        "dense_summary": VectorParams(
            size=dimension,
            distance=Distance.COSINE,
            on_disk=on_disk,
        ),
        "dense_body": VectorParams(
            size=dimension,
            distance=Distance.COSINE,
            on_disk=on_disk,
        ),
    }

    # 컬렉션 생성
    client.create_collection(
        collection_name=collection_name,
        vectors_config=vectors_config,
    )

    print(f"Multi-Vector 컬렉션 '{collection_name}' 생성 완료")
    print(f"   - Named Vectors: {list(vectors_config.keys())}")

    return client


def index_multi_vector_documents(
    documents: list[Document],
    collection_name: str,
    embeddings: OpenAIEmbeddings,
    llm: Runnable | None = None,
    generate_summaries: bool = True,
):
    """
    문서를 Multi-Vector로 인덱싱

    Args:
        documents: 문서 리스트
        collection_name: 컬렉션 이름
        embeddings: 임베딩 모델
        llm: 요약 생성용 LLM (선택)
        generate_summaries: 요약 자동 생성 여부
    """
    client = QdrantClient(url=os.getenv("QDRANT_URL", "http://localhost:6333"))

    points = []
    max_length = 500

    for idx, doc in enumerate(documents):
        # 제목 추출 (메타데이터 또는 첫 문장)
        title = doc.metadata.get("title", doc.page_content.split("\n")[0][:100])

        # 요약 생성
        if generate_summaries and llm:
            summary = generate_summary(doc.page_content, llm, max_length=max_length)
        else:
            # 요약이 없으면 앞 150자 사용
            summary = doc.page_content[:max_length]

        # 본문
        body = doc.page_content

        # 각각 임베딩
        title_vector = embeddings.embed_query(title)
        summary_vector = embeddings.embed_query(summary)
        body_vector = embeddings.embed_query(body)

        # Named Vectors로 저장
        point = PointStruct(
            id=idx,
            vector={
                "dense_title": title_vector,
                "dense_summary": summary_vector,
                "dense_body": body_vector,
            },
            payload={
                "page_content": body,
                "metadata": doc.metadata,
                "title": title,
                "summary": summary,
            },
        )

        points.append(point)

    # 배치 업로드
    client.upsert(
        collection_name=collection_name,
        points=points,
    )

    print(f"{len(points)}개 문서 인덱싱 완료")
    print("   - Title, Summary, Body 각각 임베딩")


In [None]:
# ============================================================================
# Multi-Vector 검색 전략
# ============================================================================


def multi_vector_search(
    query: str,
    collection_name: str,
    embeddings: OpenAIEmbeddings,
    vector_weights: dict | None = None,
    k: int = 5,
) -> list[tuple[Document, float]]:
    """
    Multi-Vector 검색: 여러 Named Vector를 가중 결합

    Args:
        query: 검색 쿼리
        collection_name: 컬렉션 이름
        embeddings: 임베딩 모델
        vector_weights: 벡터별 가중치 {"dense_title": 0.3, "dense_summary": 0.3, "dense_body": 0.4}
        k: 반환할 문서 개수

    Returns:
        (Document, score) 리스트
    """
    client = QdrantClient(url=os.getenv("QDRANT_URL", "http://localhost:6333"))

    # 기본 가중치
    if vector_weights is None:
        vector_weights = {
            "dense_title": 0.3,
            "dense_summary": 0.3,
            "dense_body": 0.4,
        }

    # 쿼리 임베딩
    query_vector = embeddings.embed_query(query)

    # 각 Named Vector로 검색
    results_dict = {}

    for vector_name, weight in vector_weights.items():
        search_result = client.search(
            collection_name=collection_name,
            query_vector=(vector_name, query_vector),
            limit=k * 2,  # 여유있게 가져오기
            with_payload=True,
        )

        # 점수 저장
        for point in search_result:
            doc_id = point.id

            if doc_id not in results_dict:
                results_dict[doc_id] = {
                    "payload": point.payload,
                    "score": 0.0,
                }

            # 가중 점수 누적
            results_dict[doc_id]["score"] += weight * point.score

    # 점수 기준 정렬
    sorted_results = sorted(
        results_dict.items(),
        key=lambda x: x[1]["score"],
        reverse=True,
    )[:k]

    # Document 객체로 변환
    documents = []
    for _, data in sorted_results:
        doc = Document(
            page_content=data["payload"]["page_content"],
            metadata=data["payload"].get("metadata", {}),
        )
        documents.append((doc, data["score"]))

    return documents


print("""
가중치 전략:
1. Title-Boost (제목 중심)
   - dense_title: 0.5
   - dense_summary: 0.3
   - dense_body: 0.2
   사용: 제목이 중요한 논문, 법률 문서

2. Balanced (균형)
   - dense_title: 0.3
   - dense_summary: 0.3
   - dense_body: 0.4
   사용: 일반 문서 검색

3. Content-Focused (본문 중심)
   - dense_title: 0.1
   - dense_summary: 0.2
   - dense_body: 0.7
   사용: 긴 문서, 상세 내용 검색
""")