In [2]:
import getpass
import os

import dotenv
dotenv.load_dotenv(dotenv_path='secrets.env')

from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_core.documents import Document

# 1. Setup Embeddings
# NOTE(review): the Google API key is presumably supplied through the
# environment loaded from secrets.env; nothing in this cell passes it explicitly.
embeddings = GoogleGenerativeAIEmbeddings(model="models/gemini-embedding-001")

# 2. Define Dummy Data
docs = [
    Document(
        page_content="LangChain provides abstractions to make working with LLMs easy.",
        metadata={"source": "documentation"}
    ),
    Document(
        page_content="Elasticsearch is a distributed, RESTful search and analytics engine.",
        metadata={"source": "documentation"}
    ),
    Document(
        page_content="Hybrid search combines vector and keyword search for better results.",
        metadata={"source": "concept"}
    ),
]

INDEX_NAME = 'my-index'
PIPELINE_ID = 'my-rrf-pipeline'

# Connection settings are read from the environment (e.g. secrets.env) so that
# no credential is stored in the notebook. URL and user fall back to the
# local-development values; the password is prompted for when it is not set.
OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL", "https://localhost:9200")
OPENSEARCH_USER = os.environ.get("OPENSEARCH_USER", "admin")
OPENSEARCH_PASSWORD = os.environ.get("OPENSEARCH_PASSWORD") or getpass.getpass(
    f"OpenSearch password for {OPENSEARCH_USER!r}: "
)

# 3. Create the vector store
vector_store = OpenSearchVectorSearch(
    embedding_function=embeddings,
    index_name=INDEX_NAME,
    opensearch_url=OPENSEARCH_URL,
    engine="faiss",
    # NOTE(review): TLS certificate verification is disabled, which is only
    # acceptable against a local cluster with a self-signed certificate.
    verify_certs=False,
    ssl_show_warn=False,
    http_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD)
)

# 4. Create the RRF Search Pipeline
# We use the specific 'search_pipeline' client namespace
# NOTE(review): the 'score-ranker-processor' must be supported by the cluster
# version in use -- confirm against the OpenSearch release notes.
pipeline_body = {
    "description": "Post-processor for hybrid search using RRF",
    "phase_results_processors": [
        {
            "score-ranker-processor": {
                "combination": {
                    "technique": "rrf",
                    "rank_constant": 60
                }
            }
        }
    ]
}

print(f"Creating/Updating pipeline: {PIPELINE_ID}...")
vector_store.client.search_pipeline.put(
    id=PIPELINE_ID,
    body=pipeline_body
)

# 5. Load Documents into OpenSearch
# This embeds the documents using your GoogleGenerativeAIEmbeddings and indexes them.
# Stable ids are supplied so that re-running this cell overwrites the same
# three documents instead of indexing a fresh copy of each one every time.
DOC_IDS = [f"{INDEX_NAME}-demo-{position}" for position in range(len(docs))]
print("Indexing documents...")
ids = vector_store.add_documents(docs, ids=DOC_IDS)
print(f"Indexed {len(ids)} documents.")

# Optional: Force a refresh so documents are immediately searchable
vector_store.client.indices.refresh(index=INDEX_NAME)


Creating/Updating pipeline: my-rrf-pipeline...
Indexing documents...
Indexed 3 documents.


{'_shards': {'total': 2, 'successful': 1, 'failed': 0}}

In [3]:
from typing import List
from langchain_core.retrievers import BaseRetriever
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document

class OpenSearchRRFRetriever(BaseRetriever):
    """
    A custom retriever that uses an OpenSearch Search Pipeline
    to perform Hybrid Search (Vector + Keyword) with RRF.

    For each query it embeds the text with the vector store's embedding
    function, sends a ``hybrid`` query (one ``match`` clause and one ``knn``
    clause) through the configured search pipeline, and converts every hit
    into a ``Document`` whose metadata carries the hit score as ``rrf_score``.
    """
    # Vector store that provides the OpenSearch client, the index name and
    # the embedding function used for the query.
    vector_store: OpenSearchVectorSearch
    # Id of the search pipeline passed along with every search request.
    pipeline_id: str
    # Number of hits requested, also used as ``k`` for the k-NN clause.
    k: int = 5
    # Names of the index fields holding the document text, the embedding and
    # the metadata. The defaults are the literals this retriever previously
    # hardcoded (presumably the OpenSearchVectorSearch defaults -- confirm if
    # the index was created with custom field names).
    text_field: str = "text"
    vector_field: str = "vector_field"
    metadata_field: str = "metadata"

    def _build_query_body(self, query: str, query_vector: List[float]) -> dict:
        """Build the hybrid (keyword + k-NN) search request body."""
        keyword_clause = {"match": {self.text_field: {"query": query}}}
        knn_clause = {
            "knn": {self.vector_field: {"vector": query_vector, "k": self.k}}
        }
        return {
            # Leave the raw embedding out of the response; it is not needed
            # to build Documents. "excludes" is the documented key name.
            "_source": {"excludes": [self.vector_field]},
            "size": self.k,
            "query": {"hybrid": {"queries": [keyword_clause, knn_clause]}},
        }

    def _hit_to_document(self, hit: dict) -> Document:
        """Convert one search hit into a Document carrying its score."""
        source = hit.get("_source") or {}
        # Copy the metadata (tolerating a missing or null value) so the score
        # can be attached without touching the response payload.
        metadata = dict(source.get(self.metadata_field) or {})
        # Capture the RRF score in metadata for debugging/ranking
        metadata["rrf_score"] = hit.get("_score")
        return Document(
            page_content=source.get(self.text_field, ""),
            metadata=metadata,
        )

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """
        Run the hybrid search for ``query`` and return the hits as Documents.

        Args:
            query: The user's query text; used both for the keyword clause
                and, after embedding, for the k-NN clause.
            run_manager: Callback manager supplied by the retriever
                framework; not used by this implementation.

        Returns:
            One Document per hit, in the order returned by OpenSearch.
        """
        # 1. Generate the embedding for the user's query
        # We reuse the embedding function defined in the vector store
        query_vector = self.vector_store.embedding_function.embed_query(query)

        # 2. Execute the hybrid search via the low-level OpenSearch client,
        # routing it through the configured search pipeline
        response = self.vector_store.client.search(
            index=self.vector_store.index_name,
            body=self._build_query_body(query, query_vector),
            params={"search_pipeline": self.pipeline_id}
        )

        # 3. Map results back to LangChain Documents
        hits = response.get("hits", {}).get("hits", [])
        return [self._hit_to_document(hit) for hit in hits]

In [6]:
# Instantiate the custom retriever on top of the existing vector store
retriever = OpenSearchRRFRetriever(
    vector_store=vector_store,
    pipeline_id=PIPELINE_ID,
    k=3
)

# --- Example 1: Direct Usage ---
results = retriever.invoke("What is LangChain?")

# The search can legitimately return no hits (for example when the index is
# empty), so guard the access instead of failing with an IndexError.
if results:
    top_hit = results[0]
    print(f"Top Result: {top_hit.page_content}")
    print(f"RRF Score: {top_hit.metadata['rrf_score']}")
else:
    print("No documents were returned for the query.")


# --- Example 2: Use in a RAG Chain (LCEL) ---
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_google_genai import ChatGoogleGenerativeAI

# Chat model that generates the final answer
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash")

# Prompt that restricts the answer to the supplied context; the template is
# split over several literals purely for readability.
prompt = ChatPromptTemplate.from_template(
    "Answer the question based only on the context provided:\n\n"
    "Context: {context}\n\n"
    "Question: {question}"
)

def format_docs(docs):
    """Return the page content of every document, separated by blank lines."""
    page_texts = [doc.page_content for doc in docs]
    return "\n\n".join(page_texts)

# The incoming question is routed to the retriever (whose documents are
# flattened into one string by format_docs) and is also passed through
# unchanged, giving the prompt both of its inputs.
chain_inputs = {"context": retriever | format_docs, "question": RunnablePassthrough()}
rag_chain = chain_inputs | prompt | llm | StrOutputParser()

# Ask a question through the whole chain and show the generated answer
response = rag_chain.invoke("How does hybrid search work?")
print("\nLLM Response:", response, sep="\n")

Top Result: LangChain provides abstractions to make working with LLMs easy.
RRF Score: 0.032786883

LLM Response:
Hybrid search works by combining vector and keyword search.
