# Question Answering with LangChain, OpenAI, and MultiQuery Retriever

This interactive workbook demonstrates how to use LangChain's [MultiQuery Retriever](https://api.python.langchain.com/en/latest/retrievers/langchain.retrievers.multi_query.MultiQueryRetriever.html) to generate similar queries for a given user input and apply all of them to retrieve a larger set of relevant documents from a vector store.

Before we begin, we split the fictional workplace documents into passages with `langchain`, use OpenAI to transform these passages into embeddings, and store them in Elasticsearch.

We will then ask a question, generate similar questions using langchain and OpenAI, retrieve relevant passages from the vector store, and use langchain and OpenAI again to provide a summary for the questions.
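The idea behind multi-query retrieval can be sketched in plain Python without any of the services used in this notebook. The example below is only an illustration under stated assumptions: the passages, the keyword-overlap retriever, and the hard-coded `fake_rewrite` function are all hypothetical stand-ins (in the notebook an LLM writes the query variants and Elasticsearch does the retrieval). The sketch also runs the original question alongside the variants, which is a choice made here for clarity.

```python
# Toy corpus standing in for the vector store (hypothetical passages).
PASSAGES = {
    "p1": "The sales team is split into regions",
    "p2": "Vacation policy allows employees to carry over unused days",
    "p3": "The NASA sales organization covers North America and South America",
    "p4": "Regional directors manage North America and South America accounts",
}


def keyword_retriever(query, k=2):
    """Rank passages by how many distinct words they share with the query."""
    query_words = set(query.lower().split())
    scored = []
    for passage_id, text in PASSAGES.items():
        overlap = len(query_words & set(text.lower().split()))
        if overlap > 0:
            scored.append((overlap, passage_id))
    # Highest overlap first; ties are broken by passage id for determinism.
    scored.sort(key=lambda pair: (-pair[0], pair[1]))
    return [passage_id for _, passage_id in scored[:k]]


def fake_rewrite(question):
    """Hard-coded query variant; a stand-in for the LLM-generated queries."""
    return ["who manages north america and south america accounts"]


def multi_query_retrieve(question, rewrite, retrieve):
    """Run the question and each variant, then return the de-duplicated union."""
    seen = set()
    merged = []
    for query in [question, *rewrite(question)]:
        for passage_id in retrieve(query):
            if passage_id not in seen:
                seen.add(passage_id)
                merged.append(passage_id)
    return merged


question = "what is the nasa sales team"
single = keyword_retriever(question)
combined = multi_query_retrieve(question, fake_rewrite, keyword_retriever)
print("single query :", single)
print("multi query  :", combined)
```

The combined result contains a passage that the original wording alone does not reach, which is the effect the retriever in this notebook aims for.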

## Install packages and import modules

In [None]:
!python3 -m pip install -qU jq lark langchain langchain-elasticsearch langchain_openai tiktoken

In [None]:
!pip install langchain_elasticsearch

In [None]:
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai.llms import OpenAI
from langchain.retrievers.multi_query import MultiQueryRetriever
from getpass import getpass

## Connect to Elasticsearch

ℹ️ We're using an Elastic Cloud deployment of Elasticsearch for this notebook. If you don't have an Elastic Cloud deployment, sign up [here](https://cloud.elastic.co/registration?utm_source=github&utm_content=elasticsearch-labs-notebook) for a free trial. 

Because we are using an Elastic Cloud deployment, we'll use the **Cloud ID** to identify it. To find the Cloud ID for your deployment, go to https://cloud.elastic.co/deployments and select your deployment.

We will use [ElasticsearchStore](https://api.python.langchain.com/en/latest/vectorstores/langchain.vectorstores.elasticsearch.ElasticsearchStore.html) to connect to our Elastic Cloud deployment. This makes it easy to create the index and index data. The documents we prepare in the following steps will be sent to Elasticsearch through this store.

In [None]:
# https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#finding-your-cloud-id
ELASTIC_CLOUD_ID = getpass("Elastic Cloud ID: ")

# https://www.elastic.co/search-labs/tutorials/install-elasticsearch/elastic-cloud#creating-an-api-key
ELASTIC_API_KEY = getpass("Elastic Api Key: ")

# https://platform.openai.com/api-keys
OPENAI_API_KEY = getpass("OpenAI API key: ")

embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

vectorstore = ElasticsearchStore(
    es_cloud_id=ELASTIC_CLOUD_ID,
    es_api_key=ELASTIC_API_KEY,
    index_name="multi_query_index", #give it a meaningful name
    embedding=embeddings,
)

## Indexing Data into Elasticsearch
Let's download the sample dataset and deserialize the document.

In [5]:
from urllib.request import urlopen
import json

url = "https://raw.githubusercontent.com/elastic/elasticsearch-labs/main/example-apps/chatbot-rag-app/data/data.json"

response = urlopen(url)
data = json.load(response)

with open("temp.json", "w") as json_file:
    json.dump(data, json_file)

In [None]:
# To confirm the expected data content
with open("temp.json", "r") as f:
    data = json.load(f)
print(data[:5])  # Print the first 5 records

In [None]:
count = 0
for i in data:
    display(count, len(i), i)
    count += 1
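As a more compact alternative to printing every record, a small check can report only the records that lack a field the later cells rely on. This is a sketch, not part of the original notebook: the key list is taken from the fields that the loader and `metadata_func` read further down, and `sample_records` is hypothetical data used so the snippet runs on its own. In the notebook you would pass `data` instead.

```python
# Fields read later by the JSON loader (content) and metadata_func (the rest).
REQUIRED_KEYS = ("name", "summary", "url", "category", "updated_at", "content")


def find_incomplete_records(records):
    """Return (index, missing_keys) for each record with an absent or empty field."""
    problems = []
    for index, record in enumerate(records):
        missing = [key for key in REQUIRED_KEYS if not record.get(key)]
        if missing:
            problems.append((index, missing))
    return problems


# Hypothetical records for illustration only.
sample_records = [
    {
        "name": "Doc A",
        "summary": "A summary.",
        "url": "http://example.com/a",
        "category": "sharepoint",
        "updated_at": "2020-03-01",
        "content": "Some text.",
    },
    {
        "name": "Doc B",
        "url": "http://example.com/b",
        "category": "sharepoint",
        "updated_at": "2020-03-01",
        "content": "",
    },
]

for index, missing in find_incomplete_records(sample_records):
    print(f"Record {index} is missing: {', '.join(missing)}")
```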

In [None]:
# Function to check the length of page_content for all documents
# Note: `docs` is created in the "Split Documents into Passages" step below; run that step first.
def check_document_lengths(docs):
    for i, doc in enumerate(docs):
        content_length = len(doc.page_content) if doc.page_content else 0
        print(f"Document {i+1} - Length: {content_length} characters")

# Call the function to inspect lengths
check_document_lengths(docs)

### Split Documents into Passages

We’ll chunk documents into passages in order to improve the retrieval specificity and to ensure that we can provide multiple passages within the context window of the final question answering prompt.

Here we are chunking documents into 800 token passages with an overlap of 400 tokens.

Here we are using a simple splitter, but LangChain offers more advanced splitters to reduce the chance of context being lost.
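To see what a chunk size of 800 with an overlap of 400 means in practice, the following sketch cuts a token list into fixed windows. It is a simplified illustration only: LangChain's recursive splitter splits on separators and merges the pieces, so its chunk boundaries will differ, and the integer "tokens" here are placeholders rather than real tiktoken output. The function name `sliding_window_chunks` is made up for this example.

```python
def sliding_window_chunks(tokens, chunk_size, chunk_overlap):
    """Cut tokens into windows of chunk_size that share chunk_overlap tokens."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        # Stop once a window has reached the end of the input.
        if start + chunk_size >= len(tokens):
            break
    return chunks


# Placeholder tokens: a 2000-token document represented as integers.
document_tokens = list(range(2000))
chunks = sliding_window_chunks(document_tokens, chunk_size=800, chunk_overlap=400)

for number, chunk in enumerate(chunks, start=1):
    print(f"Chunk {number}: tokens {chunk[0]}..{chunk[-1]} ({len(chunk)} tokens)")
```

With a 50% overlap, most tokens appear in two chunks, so the index roughly doubles in size in exchange for fewer passages that cut a thought in half.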

In [None]:
!pip install jq

In [None]:
from langchain.document_loaders import JSONLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter


def metadata_func(record: dict, metadata: dict) -> dict:
    """
    Populate the metadata dictionary with relevant fields from the record.
    This metadata will be added to each document chunk.
    """
    # Extract specific metadata fields from the record.
    # record.get() returns None when a field is missing.
    metadata["name"] = record.get("name")  # Document name
    metadata["summary"] = record.get("summary")  # Document summary
    metadata["url"] = record.get("url")  # Source URL of the document
    metadata["category"] = record.get("category")  # Category of the document
    metadata["updated_at"] = record.get("updated_at")  # Last update timestamp

    # Return the updated metadata dictionary
    return metadata


# For more loaders https://python.langchain.com/docs/modules/data_connection/document_loaders/
# And 3rd party loaders https://python.langchain.com/docs/modules/data_connection/document_loaders/#third-party-loaders
loader = JSONLoader(
    file_path="temp.json",
    jq_schema=".[]",
    content_key="content",
    metadata_func=metadata_func,
)

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=800, chunk_overlap=400 #define chunk size and chunk overlap
)
docs = loader.load_and_split(text_splitter=text_splitter)

In [None]:
#Test the metadata_func by printing metadata for a sample record

sample_record = {
    "name": "Document 1",
    "summary": "A brief summary.",
    "url": "http://example.com",
    "category": "Tutorial",
    "updated_at": "2025-01-30"
}
metadata = {}
print(metadata_func(sample_record, metadata))

In [None]:
print(docs[:2])  # Check the first two chunks of the documents

### Bulk Import Passages

Now that we have split each document into chunks of up to 800 tokens, we will index the data into Elasticsearch using [ElasticsearchStore.from_documents](https://api.python.langchain.com/en/latest/vectorstores/langchain.vectorstores.elasticsearch.ElasticsearchStore.html#langchain.vectorstores.elasticsearch.ElasticsearchStore.from_documents).

We will use the Cloud ID, API key, and index name values set in the "Connect to Elasticsearch" step.

In [None]:
# Step 1: Print a Sample of the Documents
# Check the first few documents to identify inconsistencies
def debug_documents(docs):
    for i, doc in enumerate(docs[:5]):
        print(f"Document {i + 1}:\n", doc, "\n")

# Call the function to print the first few documents
debug_documents(docs)

In [None]:
# Inspect the specific document causing the issue (document 6)
print(f"Inspecting problematic document:\n{docs[5]}")

In [None]:
# Validate the metadata and page content of the document
problematic_doc = docs[5]
print("Page Content:", problematic_doc.page_content)
print("Metadata:", problematic_doc.metadata)

In [None]:
# Step 2: Validate the Metadata for Each Document
# Ensure each document has all required fields and valid values
def validate_metadata(docs):
    for i, doc in enumerate(docs):
        if not doc.metadata:
            print(f"Document {i} is missing metadata.")
        if not doc.page_content:
            print(f"Document {i} is missing page_content.")

# Call the function to validate metadata
validate_metadata(docs)

In [None]:
# Index the split documents into Elasticsearch
try:
    documents = vectorstore.from_documents(
        docs,  # List of document chunks
        embeddings,  # Embedding model
        index_name="multi_query_index",  # Ensure this matches the created index name
        es_cloud_id=ELASTIC_CLOUD_ID,  # Cloud ID for Elasticsearch
        es_api_key=ELASTIC_API_KEY,  # API Key for Elasticsearch
    )
except Exception as e:
    print(f"Error during bulk indexing: {e}")

# Initialize the OpenAI language model (LLM)
llm = OpenAI(temperature=0, openai_api_key=OPENAI_API_KEY)  # temperature controls randomness (0 = least random)

# Set up the MultiQueryRetriever using the LLM and vectorstore
retriever = MultiQueryRetriever.from_llm(vectorstore.as_retriever(), llm) # Elasticsearch retriever with LLM

# Question Answering with MultiQuery Retriever

Now that the passages are stored in Elasticsearch, we can ask a question and retrieve the relevant passages.

In [None]:
from langchain.schema.runnable import RunnableParallel, RunnablePassthrough
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain.schema import format_document

import logging

logging.basicConfig()
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)

LLM_CONTEXT_PROMPT = ChatPromptTemplate.from_template(
    """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Be as verbose and educational in your response as possible. 
    
    context: {context}
    Question: "{question}"
    Answer:
    """
)

LLM_DOCUMENT_PROMPT = PromptTemplate.from_template(
    """
---
SOURCE: {name}
{page_content}
---
"""
)


def _combine_documents(
    docs, document_prompt=LLM_DOCUMENT_PROMPT, document_separator="\n\n"
):
    doc_strings = [format_document(doc, document_prompt) for doc in docs]
    return document_separator.join(doc_strings)


_context = RunnableParallel(
    context=retriever | _combine_documents,
    question=RunnablePassthrough(),
)

chain = _context | LLM_CONTEXT_PROMPT | llm

ans = chain.invoke("what is the nasa sales team?")

print("---- Answer ----")
print(ans)
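To make explicit what the chain above assembles before the model is called, here is a plain-Python sketch of the same two steps: render each retrieved passage with a document template, then place the joined result and the question into the context prompt. It does not use LangChain. The `Passage` class, the shortened templates, and the sample passages are hypothetical stand-ins for the retrieved documents and the prompts defined in the cell above.

```python
from dataclasses import dataclass, field


@dataclass
class Passage:
    """Minimal stand-in for a retrieved document."""
    page_content: str
    metadata: dict = field(default_factory=dict)


# Shortened versions of the document and context prompts used in the notebook.
DOCUMENT_TEMPLATE = "---\nSOURCE: {name}\n{page_content}\n---"
CONTEXT_TEMPLATE = 'context: {context}\nQuestion: "{question}"\nAnswer:'


def combine_passages(passages, separator="\n\n"):
    """Render every passage with the document template and join the results."""
    rendered = [
        DOCUMENT_TEMPLATE.format(page_content=p.page_content, **p.metadata)
        for p in passages
    ]
    return separator.join(rendered)


def build_prompt(question, passages):
    """Fill the context prompt with the combined passages and the question."""
    return CONTEXT_TEMPLATE.format(
        context=combine_passages(passages), question=question
    )


# Hypothetical retrieved passages.
retrieved = [
    Passage("The team covers two regions.", {"name": "Sales Overview"}),
    Passage("Each region has a director.", {"name": "Org Chart"}),
]
print(build_prompt("what is the nasa sales team?", retrieved))
```

Printing the assembled prompt like this is a quick way to check how much retrieved text reaches the model and whether the source names appear as intended.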

**Generate at least two new iterations of the previous cells - Be creative.** Did you master MultiQuery Retriever concepts through this lab?

# **Iteration No. 01: Modified Context Prompt**

In [None]:
# Custom context prompt for more structured and formal responses
CUSTOM_CONTEXT_PROMPT = ChatPromptTemplate.from_template(
    """You are an assistant specialized in delivering structured and precise answers. Use the following retrieved context to answer the question. If you don't know the answer, clearly state so. Provide bullet points for key aspects and explain thoroughly.

    context: {context}
    Question: "{question}"
    Answer (structured response):
    """
)

# Use the existing _context pipeline
custom_chain = _context | CUSTOM_CONTEXT_PROMPT | llm

# Test with a new question
ans01 = custom_chain.invoke("What are the key responsibilities of a Senior Software Engineer?")

print("---- Answer ----")
print(ans01)


# **Iteration No. 02: Modified Document Prompt**

In [None]:

# Custom document prompt for retrieved documents
CUSTOM_DOCUMENT_PROMPT = PromptTemplate.from_template(
    """
==== Document Metadata ====
SOURCE: {name}
CATEGORY: {category}
UPDATED: {updated_at}

CONTENT:
{page_content}
===========================
"""
)

# Update the document formatting in the chain
def _combine_custom_documents(
    docs, document_prompt=CUSTOM_DOCUMENT_PROMPT, document_separator="\n\n=== NEXT DOCUMENT ===\n\n"
):
    doc_strings = [format_document(doc, document_prompt) for doc in docs]
    return document_separator.join(doc_strings)

# Update the context pipeline with custom document formatting
custom_context = RunnableParallel(
    context=retriever | _combine_custom_documents,
    question=RunnablePassthrough(),
)

# Create the new chain with the custom context
custom_chain_documents = custom_context | LLM_CONTEXT_PROMPT | llm

# Test with the same or new question
ans02 = custom_chain_documents.invoke("What is the NASA sales team?")

print("---- Custom Document Answer ----")
print(ans02)

# **Iteration No. 03: Dynamic Filters Into the Retriever**

In [None]:
from langchain.schema.runnable import RunnableMap

# Define a filter function for the retriever
def filter_documents_by_metadata(docs, category=None, updated_after=None):
    """
    Filters documents based on metadata conditions.

    Args:
    - docs: List of retrieved documents.
    - category: Filter by document category (e.g., 'sharepoint').
    - updated_after: Filter by update date (e.g., '2025-01-01').

    Returns:
    - Filtered list of documents.
    """
    filtered_docs = []
    for doc in docs:
        doc_category = doc.metadata.get("category", None)
        doc_updated_at = doc.metadata.get("updated_at", None)

        # Apply category filter
        if category and doc_category != category:
            continue

        # Apply updated_after filter (assuming dates are formatted as 'YYYY-MM-DD',
        # so plain string comparison orders them chronologically)
        if updated_after and doc_updated_at:
            try:
                if doc_updated_at < updated_after:
                    continue
            except TypeError:
                pass  # Keep the document if the values cannot be compared

        filtered_docs.append(doc)
    return filtered_docs

# Modify the context pipeline to include filtering
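filtered_context = RunnableMap(
    {
        # Filter the retrieved documents, then format them with _combine_documents
        # so the prompt receives readable text instead of a list of Document objects.
        "context": retriever
        | (
            lambda docs: _combine_documents(
                filter_documents_by_metadata(
                    docs, category="sharepoint", updated_after="2025-01-01"
                )
            )
        ),
        "question": RunnablePassthrough(),
    }
)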

# Create a new chain with the filtered context
filtered_chain = filtered_context | LLM_CONTEXT_PROMPT | llm

# Test with a filtered query
ans03 = filtered_chain.invoke("What documents are relevant for NASA projects?")
print("---- Filtered Answer ----")
print(ans03)
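The filter above compares dates as strings, which works only while every value follows the same 'YYYY-MM-DD' layout. A variant that parses the dates is sketched below using only the standard library. It rests on assumptions worth stating: the helper names are made up for this example, values are expected to start with an ISO date, and documents with a missing or unparseable date are excluded, which differs from the notebook's filter (that one keeps documents without a date).

```python
from datetime import date


def parse_updated_at(value):
    """Return a date for strings that start with 'YYYY-MM-DD', otherwise None."""
    if not isinstance(value, str):
        return None
    try:
        # Keep only the date part so that timestamps such as
        # '2025-01-30T12:00:00' are accepted as well.
        return date.fromisoformat(value[:10])
    except ValueError:
        return None


def is_updated_on_or_after(metadata, cutoff):
    """True when metadata['updated_at'] parses to a date on or after cutoff."""
    updated = parse_updated_at(metadata.get("updated_at"))
    return updated is not None and updated >= cutoff


cutoff = date(2025, 1, 1)
# Hypothetical metadata dictionaries for illustration.
examples = [
    {"name": "recent", "updated_at": "2025-01-30"},
    {"name": "old", "updated_at": "2024-12-31"},
    {"name": "timestamp", "updated_at": "2025-02-01T08:00:00"},
    {"name": "malformed", "updated_at": "not-a-date"},
    {"name": "undated"},
]
kept = [m["name"] for m in examples if is_updated_on_or_after(m, cutoff)]
print(kept)
```

Whether undated documents should be kept or dropped depends on the use case; the point of the sketch is that the decision becomes explicit instead of depending on how strings happen to compare.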

# **Iteration No. 04: Combining Dynamic Filters Into the Retriever and Summarization**

In [None]:
from langchain.schema.runnable import RunnableLambda, RunnableMap
from langchain.prompts import ChatPromptTemplate

# Define the filtering function
def filter_documents_by_metadata(docs, category=None, updated_after=None):
    """
    Filters documents based on metadata conditions.

    Args:
    - docs: List of retrieved documents.
    - category: Filter by document category (e.g., 'sharepoint').
    - updated_after: Filter by update date (e.g., '2025-01-01').

    Returns:
    - Filtered list of documents.
    """
    filtered_docs = []
    for doc in docs:
        doc_category = doc.metadata.get("category", None)
        doc_updated_at = doc.metadata.get("updated_at", None)

        # Apply category filter
        if category and doc_category != category:
            continue

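        # Apply updated_after filter (assuming dates are formatted as 'YYYY-MM-DD',
        # so plain string comparison orders them chronologically)
        if updated_after and doc_updated_at:
            try:
                if doc_updated_at < updated_after:
                    continue
            except TypeError:
                pass  # Keep the document if the values cannot be compared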

        filtered_docs.append(doc)
    return filtered_docs

# Define the summarization function
def summarize_document(doc):
    """
    Summarizes the content of a single document using the LLM.
    """
    summary_prompt = f"""
    Here is the content of a document:
    ---
    {doc.page_content}
    ---
    Please summarize the main idea of this document in 1-2 sentences.
    """
    return llm.invoke(summary_prompt)  # invoke() replaces the deprecated direct call llm(...)

# Combine filtering and summarization
def filter_and_summarize(docs, category=None, updated_after=None):
    """
    Filters documents by metadata and generates summaries for each document.
    """
    filtered_docs = filter_documents_by_metadata(docs, category, updated_after)
    summarized_docs = []
    for doc in filtered_docs:
        summary = summarize_document(doc)
        summarized_docs.append({"metadata": doc.metadata, "summary": summary})
    return summarized_docs

# Test the pipeline with filtered and summarized output
query = "What documents are relevant for NASA projects?"
retrieved_docs = retriever.invoke(query)  # invoke() replaces the deprecated get_relevant_documents()

# Filter and summarize the documents
filtered_docs = filter_and_summarize(retrieved_docs, category="sharepoint", updated_after="2025-01-01")

# Print filtered and summarized documents
print("---- Filtered and Summarized Documents ----")
if filtered_docs:
    for doc in filtered_docs:
        print(f"Source: {doc['metadata']['name']}")
        print(f"Summary: {doc['summary']}\n")
else:
    print("No relevant documents found.")

# Generate the final response using the LLM
if filtered_docs:
    context = "\n".join([f"Source: {doc['metadata']['name']}\nSummary: {doc['summary']}" for doc in filtered_docs])
    final_prompt = f"""
    Here is the context from relevant documents:
    {context}
    Question: {query}
    Answer:
    """
    ans04 = llm.invoke(final_prompt)  # invoke() replaces the deprecated direct call llm(...)
else:
    ans04 = "No relevant documents found."

# Print the final answer
print("---- Final Answer ----")
print(ans04)