In [9]:
# Topic used by every demo cell below; it is interpolated into prompts, so
# spelling matters ("million-plus", previously misspelled "millon-plus").
example_topic = "Impact of million-plus token context window language models on RAG"

In [10]:
from dotenv import load_dotenv
load_dotenv()

True

In [11]:
from langchain_google_genai import ChatGoogleGenerativeAI
from pydantic import BaseModel, Field
from typing import List, Optional
from langchain_core.prompts import ChatPromptTemplate

# Model handle shared by expand_chain, generate_question and gen_answer_chain
# further down; temperature is pinned to 0.0.
fast_llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    temperature=0.0,)
# Second model handle, also at temperature 0.0. It is not referenced by any
# other cell visible here.
# NOTE(review): presumably reserved for a later long-context step — confirm.
long_context_llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash",
    temperature=0.0)

# Prompt that asks the model for Wikipedia pages on subjects adjacent to
# {topic}. The wording is sent to the model verbatim, so the misspellings in
# the earlier text ("mentionde", "recomend", "pagens", "clasely") are fixed.
gen_related_topics_prompt = ChatPromptTemplate.from_template(
    """I'm writing a Wikipedia page for a topic mentioned below. Please identify and recommend some Wikipedia pages on closely related topics.

    Please list as many subjects and urls as you can.

    Topic of interest: {topic}"""
)

# Structured-output schema passed to fast_llm.with_structured_output in
# expand_chain: the model's reply is returned as an instance of this class.
class RelatedSubject(BaseModel):
    # Names of the related subjects; survey_subjects feeds each entry to the
    # Wikipedia retriever as a query.
    topics: List[str] = Field(
    description="Comprehensive list of related subjects as background research",
    )

# Pipe the related-topics prompt into fast_llm with RelatedSubject as the
# structured-output schema.
expand_chain = gen_related_topics_prompt | fast_llm.with_structured_output(RelatedSubject)

# Top-level await: this only works inside the notebook's running event loop.
related_subjects = await expand_chain.ainvoke({'topic': example_topic})
# Bare last expression so the notebook displays the parsed result.
related_subjects

RelatedSubject(topics=['Large language models', 'Retrieval augmented generation', 'Context window', 'Natural language processing', 'Artificial intelligence', 'Machine learning', 'Deep learning', 'Transformer networks', 'Wikipedia'])

In [12]:
from langchain_community.retrievers import WikipediaRetriever
from langchain_core.runnables import RunnableLambda, chain as as_runnable

# Retriever configured with load_all_available_meta=True and top_k_results=1.
# format_doc reads the 'title' and 'categories' metadata keys of the documents
# this retriever returns.
wikipedia_retriever = WikipediaRetriever(load_all_available_meta=True, top_k_results=1)

def format_doc(doc, max_length=1000):
    """Render one retrieved document as a short markdown-style snippet.

    Args:
        doc: Object exposing ``metadata`` (a mapping, may be empty or None)
            and ``page_content``.
        max_length: Maximum number of characters kept from the rendered text.

    Returns:
        A ``### <title>`` heading, the summary and a comma-separated list of
        related categories, truncated to ``max_length`` characters.
    """
    metadata = doc.metadata or {}
    # Missing keys fall back to placeholders instead of raising KeyError, so a
    # single sparse document cannot abort the whole survey.
    title = metadata.get('title', 'Untitled')
    categories = metadata.get('categories') or []
    # Use a real separator: joining with "- " rendered "A- B- C".
    related = ", ".join(categories)
    return f"### {title}\n\nSummary: {doc.page_content}\nRelated: {related}\n\n"[:max_length]

def format_docs(docs):
    """Format each document with format_doc and join the snippets with a blank line."""
    sections = []
    for document in docs:
        sections.append(format_doc(document))
    return "\n\n".join(sections)


# Schema for one simulated Wikipedia editor. It is used as the element type of
# Perspectives.editors and as the `editor` entry of InterviewState.
# The field descriptions are part of the schema handed to the model, so the
# typo ("concers") and stray backtick in the last one are corrected.
class Editor(BaseModel):
    affiliation: str = Field(
        description="Primary affiliation of the editor"
    )
    name: str = Field(
        description="Name of the editor",
    )
    role: str = Field(
        description="Role of the editor in the context of the topic."
    )
    description: str = Field(
        description="Description of the editor's focus, concerns, and motives."
    )

    @property
    def persona(self) -> str:
        """Return name, role, affiliation and description as one newline-separated block.

        generate_question interpolates this text into the {persona} slot of
        the question prompt.
        """
        return f"Name: {self.name}\nRole: {self.role}\nAffiliation: {self.affiliation}\nDescription: {self.description}"
    
# Structured-output schema for gen_perspectives_chain: a wrapper around the
# list of Editor personas produced for a topic.
class Perspectives(BaseModel):
    # The interview cells below index into this list (perspectives.editors[0]).
    editors: List[Editor] = Field(
        description="List of editors with their perspectives on the topic"
    )

# System + user prompt asking the model to invent a set of editors for {topic},
# using {examples} (formatted Wikipedia snippets) as inspiration. The wording
# is sent verbatim, so the typos "diverse(and distinc)" and the missing article
# in "add description" are fixed.
gen_perspectives_prompt = ChatPromptTemplate.from_messages(
    [
        ("system",
         """You need to select a diverse (and distinct) group of Wikipedia editors who will work together to create a comprehensive article on the topic.
         You can use other Wikipedia pages of related topics for inspiration. For each editor, add a description of what they will focus on.

         Wiki page outlines of related topics for inspiration:
         {examples}"""),
        ("user", "Topic of interest: {topic}"),
    ]
)

# The reply is parsed into a Perspectives instance.
gen_perspectives_chain = gen_perspectives_prompt | ChatGoogleGenerativeAI(model='gemini-2.0-flash').with_structured_output(Perspectives)

@as_runnable
async def survey_subjects(topics: str):
    """Build editor perspectives for a topic from related Wikipedia pages.

    Expands ``topics`` into related subjects via expand_chain, looks each one
    up with wikipedia_retriever, formats whatever was retrieved and passes it
    to gen_perspectives_chain together with the topic.

    Returns:
        Whatever gen_perspectives_chain returns for the assembled input.
    """
    expansion = await expand_chain.ainvoke({'topic': topics})
    batches = await wikipedia_retriever.abatch(expansion.topics, return_exceptions=True)
    # With return_exceptions=True a failed lookup comes back as an exception
    # object in place of a result list; those entries are dropped.
    all_docs = [
        doc
        for batch in batches
        if not isinstance(batch, BaseException)
        for doc in batch
    ]
    payload = {
        "examples": format_docs(all_docs),
        "topic": topics
    }
    return await gen_perspectives_chain.ainvoke(payload)

perspectives = await survey_subjects.ainvoke(example_topic)
perspectives.dict()

C:\Users\Admin\AppData\Local\Temp\ipykernel_20164\833876501.py:66: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  perspectives.dict()


{'editors': [{'affiliation': 'Independent AI Researcher',
   'name': 'Dr. Anya Sharma',
   'role': 'Technical Accuracy Reviewer',
   'description': 'Focuses on the practical implications of large context windows for RAG, particularly concerning information retrieval accuracy and the reduction of hallucination.'},
  {'affiliation': 'Enterprise Solutions Architect',
   'name': 'Kenji Tanaka',
   'role': 'Scalability and Cost Analyst',
   'description': 'Concerned with the scalability and cost-effectiveness of using million-plus token context windows in enterprise RAG applications. Focuses on optimizing infrastructure and reducing latency.'},
  {'affiliation': 'NLP Ethicist',
   'name': 'Fatima Hassan',
   'role': 'Ethics and Bias Auditor',
   'description': 'Examines the ethical considerations of using large context windows, including potential biases amplified by the increased data processing and the impact on user privacy.'},
  {'affiliation': 'Academic Linguist',
   'name': 'Professor

In [33]:
from langgraph.graph import StateGraph, END
from typing_extensions import TypedDict
from langchain_core.messages import AnyMessage, AIMessage, BaseMessage, HumanMessage, ToolMessage
from typing import Annotated, Sequence
from langchain_core.prompts import MessagesPlaceholder

def add_messages(left, right):
    """Concatenate two message collections into a new list.

    Any operand that is not already a ``list`` is wrapped in a one-element
    list first, so single messages can be passed directly.
    """
    left_items = left if isinstance(left, list) else [left]
    right_items = right if isinstance(right, list) else [right]
    return left_items + right_items

def update_references(references, new_references):
    """Merge ``new_references`` into ``references`` and return a new dict.

    Args:
        references: Existing mapping, or a falsy value (None / empty) when
            nothing has been recorded yet.
        new_references: Mapping of entries to add; a falsy value (None /
            empty) adds nothing.

    Returns:
        A fresh dict holding the existing entries overlaid with the new ones.
        Neither argument is modified, so a caller that still holds the
        previous mapping does not see it change underneath it.
    """
    merged = dict(references) if references else {}
    # Tolerate an update that carries no references instead of failing inside
    # dict.update(None).
    if new_references:
        merged.update(new_references)
    return merged

def update_editor(editor, new_editor):
    """Keep the editor that is already set; accept ``new_editor`` only when none is."""
    # Can only set at the outset: once a truthy editor exists, later values
    # are ignored.
    return editor if editor else new_editor


# State dictionary passed between the interview graph nodes. The second
# argument of each Annotated type names the function defined above that
# combines an existing value with an incoming update for that key.
class InterviewState(TypedDict):
    # Conversation so far; add_messages concatenates old and new messages.
    messages: Annotated[list[AnyMessage], add_messages]
    # Mapping merged by update_references; gen_answer supplies url -> content
    # entries for the URLs it cited.
    references: Annotated[Optional[dict], update_references]
    # Persona asking the questions; update_editor keeps the first one set.
    editor: Annotated[Optional[Editor], update_editor]


# System prompt for the question-asking editor, followed by the conversation
# history. A trailing backslash joins a line to the next one with no
# separator, so each continued sentence needs an explicit space before the
# backslash; without it the model saw "conversation.Please" and "before.Your".
# The backslash after "possible." is dropped so the blank line that follows
# remains a paragraph break.
gen_qn_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are an experienced Wikipedia writer and want to edit a specific page. \
Besides your identity as a Wikipedia writer, you have a specific focus when researching the topic. \
Now, you are chatting with an expert to get information. Ask good questions to get more useful information.

When you have no more questions to ask, say "Thank you so much for your help!" to end the conversation. \
Please only ask one question at a time and don't ask what you have asked before. \
Your questions should be related to the topic you want to write.
Be comprehensive and curious, gaining as much unique insight from the expert as possible.

Stay true to your specific perspective:

{persona}""",
        ),
        MessagesPlaceholder(variable_name="messages", optional=True),
    ]
)

def tag_with_name(ai_message: AIMessage, name:str):
    """Set ``name`` on the given message in place and return that same message."""
    setattr(ai_message, "name", name)
    return ai_message


def swap_roles(state: InterviewState, name):
    """Re-cast the conversation from the point of view of speaker ``name``.

    Every AIMessage whose ``name`` differs from ``name`` is rebuilt as a
    HumanMessage carrying the same fields; all other messages are kept as-is.

    Args:
        state: Interview state; only ``state['messages']`` is read.
        name: Name of the participant that should remain the "AI" speaker.

    Returns:
        A new dict with a single ``'messages'`` key holding a new list; the
        input state and its message list are not modified.
    """
    converted = []
    for message in state['messages']:
        if isinstance(message, AIMessage) and message.name != name:
            # `exclude` must be a set of field names. The earlier ("type") is
            # not a tuple but the plain string "type". Dropping "type" lets
            # HumanMessage supply its own value for that field.
            message = HumanMessage(**message.model_dump(exclude={"type"}))
        converted.append(message)
    return {'messages': converted}


@as_runnable
async def generate_question(state:InterviewState):
    """Have the state's editor ask the next interview question.

    The conversation is first re-cast with swap_roles so the editor is the AI
    speaker, then sent through gen_qn_prompt (with the editor's persona filled
    in) and fast_llm; the reply is tagged with the editor's name.

    Returns:
        A state update of the form ``{"messages": [<tagged reply>]}``.
    """
    persona_editor = state['editor']
    as_speaker = RunnableLambda(swap_roles).bind(name=persona_editor.name)
    prompt = gen_qn_prompt.partial(persona=persona_editor.persona)
    label_reply = RunnableLambda(tag_with_name).bind(name=persona_editor.name)
    question_chain = as_speaker | prompt | fast_llm | label_reply
    reply = await question_chain.ainvoke(state)
    return {"messages": [reply]}


print(perspectives.editors[0])

messages = [
    HumanMessage(f"So you said you were wrting an article on {example_topic}")
]

question = await generate_question.ainvoke(
    {'editor':perspectives.editors[0],
     "messages":messages}
)

question['messages'][0].content

affiliation='Independent AI Researcher' name='Dr. Anya Sharma' role='Technical Accuracy Reviewer' description='Focuses on the practical implications of large context windows for RAG, particularly concerning information retrieval accuracy and the reduction of hallucination.'


"Hello Dr. Sharma, it's a pleasure to speak with you.  My name is Dr. Anya Sharma, and I'm working on a Wikipedia article about the impact of million-plus token context window language models on Retrieval Augmented Generation (RAG). My focus is on the practical implications for information retrieval accuracy and hallucination reduction.  My first question is:  What are some of the most significant challenges you've encountered in deploying million-plus token context window models within a RAG pipeline, specifically concerning the management and processing of such large contexts?"

## Expert

In [34]:
# Structured-output schema for gen_queries_chain. The description is part of
# the schema handed to the model, so the typo "questons" is corrected.
class Queries(BaseModel):
    # Search strings; gen_answer sends each one to the search_engine tool.
    queries: List[str] = Field(
        description="Comprehensive list of search engine queries to answer the user's questions."
    )

# System instruction followed by the conversation so far.
gen_queries_prompt = ChatPromptTemplate.from_messages(
    [
        (
            'system',
            "You are a helpful research assistant. Query the search engine to answer the user's questions."
        ),
        MessagesPlaceholder(variable_name="messages", optional=True)
    ]
)
# include_raw=True: the chain result is indexed below (and in gen_answer) by
# 'parsed' (the Queries instance) and 'raw' (the underlying model message).
gen_queries_chain = gen_queries_prompt | ChatGoogleGenerativeAI(model="gemini-2.0-flash").with_structured_output(Queries, include_raw=True)

# Try the chain on the question generated in the previous cell.
queries = await gen_queries_chain.ainvoke({
    'messages': [HumanMessage(content=question['messages'][0].content)]
})
print(f"Queries: {queries['parsed'].queries}")

Queries: ['challenges of deploying million-plus token context window models in RAG pipeline']


In [None]:
import json
from langchain_core.runnables import RunnableConfig

# Structured-output schema for gen_answer_chain: the answer text plus the URLs
# it cites.
class AnswerWithCitations(BaseModel):
    answer: str = Field(description="Comprehensive answer to the user's question with citations.")
    cited_urls: List[str] = Field(description="List of urls cited in the answer")

    @property
    def as_str(self) -> str:
        """Return the answer followed by a numbered ``[n]: url`` citation list (1-based)."""
        footnotes = [
            f"[{number}]: {url}"
            for number, url in enumerate(self.cited_urls, start=1)
        ]
        return f"{self.answer}\n\nCitations:\n\n" + "\n".join(footnotes)
    

# System prompt for the answering expert, followed by the conversation.
# gen_answer appends the search results to that conversation before invoking
# the chain built from this prompt.
# The backslash at the end of the first prompt line joins it to the next line,
# which starts with a space, so the text reads "... who wants to write ...".
gen_answer_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are an expert who can use information effectively. You are chatting with a Wikipedia writer who wants\
 to write a Wikipedia page on the topic you know. You have gathered the related information and will now use the information to form a response.

Make your response as informative as possible and make sure every sentence is supported by the gathered information.
Each response must be backed up by a citation from a reliable source, formatted as a footnote, reproducing the URLS after your response.""",
        ),
        MessagesPlaceholder(variable_name="messages", optional=True),
    ]
)

# include_raw=True: gen_answer reads both 'parsed' and 'raw' from the result.
# The config key is `run_name`; the earlier spelling `rum_name` is not a
# recognised key, so the intended "GenerateAnswer" name was not being set.
gen_answer_chain = gen_answer_prompt | fast_llm.with_structured_output(
    AnswerWithCitations, include_raw=True
).with_config(run_name="GenerateAnswer")
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool

# Search client limited to 4 results per query.
tavily_search = TavilySearchResults(max_results=4)

# The docstring below is left untouched on purpose: it is the text written
# for the tool itself, not developer documentation.
@tool
async def search_engine(query:str):
    """Search engine to the internet"""
    # Await the async entry point: a blocking .invoke() inside a coroutine
    # stalls the event loop and serialises the searches gen_answer issues
    # through abatch.
    results = await tavily_search.ainvoke(query)
    # Keep only the two fields gen_answer consumes.
    return [{'content': r['content'], "url": r["url"]} for r in results]


async def gen_answer(
        state: InterviewState,
        config: Optional[RunnableConfig] = None,
        name: str = "Subject Matter Expert",
        max_str_len: int = 15000
):
    """Answer the latest interview question using web search results.

    The conversation is re-cast with swap_roles so that ``name`` is the AI
    speaker, gen_queries_chain proposes search queries, search_engine runs
    them, and gen_answer_chain writes a cited answer from the results.

    Args:
        state: Interview state; only its messages are used.
        config: Runnable config forwarded to the search batch.
        name: Name attached to the answer message.
        max_str_len: Maximum number of characters of serialized search
            results passed to the answer prompt.

    Returns:
        A state update with ``"messages"`` (one AIMessage named ``name``) and
        ``"references"`` (url -> content for cited URLs that were actually
        retrieved).
    """
    swapped_state = swap_roles(state, name)
    queries = await gen_queries_chain.ainvoke(swapped_state)

    # 'parsed' can be None when the model reply could not be turned into a
    # Queries object; treat that as "no queries" instead of crashing.
    parsed_queries = queries['parsed']
    search_queries = parsed_queries.queries if parsed_queries is not None else []
    query_results = await search_engine.abatch(
        search_queries, config, return_exceptions=True
    )
    # Failed searches come back as exception objects; keep the successes.
    successful_results = [
        res for res in query_results if not isinstance(res, BaseException)
    ]
    all_query_results = {
        res['url']: res['content'] for results in successful_results for res in results
    }
    # Plain character truncation: the tail of the JSON may be cut mid-entry.
    dumped = json.dumps(all_query_results)[:max_str_len]

    ai_message: AIMessage = queries['raw']
    # The results are attached as the response to the model's first tool call.
    # Without a tool call there is no id to attach them to, so the answer
    # chain then runs on the conversation alone.
    if ai_message.tool_calls:
        tool_id = ai_message.tool_calls[0]['id']
        tool_message = ToolMessage(tool_call_id=tool_id, content=dumped)
        swapped_state['messages'].extend([ai_message, tool_message])

    generated = await gen_answer_chain.ainvoke(swapped_state)
    parsed_answer = generated['parsed']
    if parsed_answer is None:
        # Structured parsing failed: fall back to the raw reply text and
        # record no citations rather than raising AttributeError.
        answer_text = str(generated['raw'].content)
        cited_urls = set()
    else:
        answer_text = parsed_answer.as_str
        cited_urls = set(parsed_answer.cited_urls)
    # Only URLs that were really retrieved count as references.
    cited_references = {k: v for k, v in all_query_results.items() if k in cited_urls}
    formatted_message = AIMessage(name=name, content=answer_text)
    return {"messages": [formatted_message], "references": cited_references}


# Call gen_answer directly, outside the graph, on the question generated
# earlier; the state passed here has only a 'messages' key.
example_answer = await gen_answer(
    {"messages": [HumanMessage(content=question['messages'][0].content)]},
)


# Text of the last (and only) message in the returned update.
example_answer['messages'][-1].content

'The most significant challenges in deploying million-plus token context window models within a RAG pipeline center around the management and processing of such extensive contexts.  These challenges include the need for significant architectural changes and specialized training data to efficiently handle the increased data volume.  Furthermore, the expanded context window amplifies security risks, particularly when dealing with sensitive or proprietary information.  Debugging and tracing the source of information also becomes significantly more complex, making it difficult to pinpoint the origin of errors or unexpected outputs.  Finally, the computational resources required to process such large contexts are substantial, leading to increased latency and higher operational costs.\n\nCitations:\n\n[1]: https://aiagentslist.com/blog/is-rag-still-relevant-with-million-tokens-llms\n[2]: https://arxiv.org/html/2503.00353v1\n[3]: https://fabrity.com/blog/will-large-context-windows-kill-rag-pi

In [52]:
# Upper bound on the number of expert-tagged messages before the interview
# is ended.
max_num_turns = 5


def route_messages(state: InterviewState, name: str = "Subject Matter Expert"):
    """Decide whether the interview continues after an answer.

    Args:
        state: Interview state; only ``state["messages"]`` is read.
        name: Name carried by the expert's messages. The default matches the
            name gen_answer puts on its answers; the earlier default
            ("Subject_Matter_Expert", with underscores) never matched, so the
            turn limit below could not trigger.

    Returns:
        ``END`` once ``max_num_turns`` expert messages exist or the editor's
        last message ends with the sign-off phrase; otherwise
        ``"ask_question"``.
    """
    messages = state["messages"]
    num_responses = len(
        [m for m in messages if isinstance(m, AIMessage) and m.name == name]
    )
    if num_responses >= max_num_turns:
        return END
    # The editor's question is the message just before the latest answer;
    # with fewer than two messages there is nothing to inspect yet.
    if len(messages) < 2:
        return "ask_question"
    last_question = messages[-2]
    content = last_question.content
    # Only string content can carry the sign-off; trailing whitespace after
    # the phrase is tolerated.
    if isinstance(content, str) and content.rstrip().endswith("Thank you so much for your help!"):
        return END
    return "ask_question"


# Assemble the interview loop over InterviewState.
builder = StateGraph(InterviewState)

# Two nodes: the editor asks, the expert answers.
builder.add_node("ask_question", generate_question)
builder.add_node("answer_question", gen_answer)
# After each answer, route_messages returns either END or "ask_question".
builder.add_conditional_edges("answer_question", route_messages)
# Every question is followed by an answer.
builder.add_edge("ask_question", "answer_question")

# The run starts with the editor's question.
builder.set_entry_point("ask_question")
interview_graph = builder.compile().with_config(run_name="Conduct Interviews")

In [None]:
# Holds the most recent streamed step; stays None if the stream yields nothing.
final_step = None
# Seed the conversation with an opening line tagged with the same name that
# gen_answer uses for the expert by default.
initial_state = {
    "editor": perspectives.editors[0],
    "messages": [AIMessage(
        content=f"So you said you were writing an article on {example_topic}?",
        name="Subject Matter Expert"
    )]
}

# Each streamed step is a dict keyed by the node that produced it
# (e.g. {'ask_question': {...}}).
async for step in interview_graph.astream(initial_state):
    # First key of the step dict, i.e. the node name.
    name=next(iter(step))
    # Print the whole step so the run can be followed in the cell output.
    print("-- ", step)
    final_step = step
    

# Value stored under the first key of the last step.
# NOTE(review): raises AttributeError if the stream produced no steps
# (final_step is still None) — confirm that cannot happen.
final_state = next(iter(final_step.values()))

--  {'ask_question': {'messages': [AIMessage(content="Hello Dr. Sharma, it's a pleasure to speak with you.  I'm writing a Wikipedia article on the impact of million-plus token context window language models on Retrieval Augmented Generation (RAG) systems, specifically focusing on how increased context windows affect the accuracy and hallucination rates of these systems. My first question is:  What are some of the most significant, empirically observed improvements in information retrieval accuracy that you've seen in RAG systems as a result of using million-plus token context windows, and what datasets or benchmarks were used to demonstrate these improvements?", additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'model_name': 'gemini-1.5-flash', 'safety_ratings': []}, name='Dr. Anya Sharma', id='run--2fca0290-3aae-451f-b0d3-8b6dd0ef88d0-0', usage_metadata={'input_tokens': 204, 'output_tokens': 116, 'total_toke

'answer_question'