# Multi-Agent Workflows + RAG — LangGraph (Session 6)

This notebook builds a **multi-agentic LangGraph** application with two teams (Research + Document Writing) and a meta-supervisor. It includes fixes for:
- Tavily import deprecation (`langchain_tavily`)
- `read_document` start/end bug
- RAG context serialization (convert `Document` objects to text)
- `reference_previous_responses` returns text instead of raw Document repr
- Clean streaming print helpers

You will need **OpenAI** and **Tavily** API keys.


## 🔧 Dependencies & Environment

In [None]:
import os, getpass, nest_asyncio
# Patch asyncio so event loops can be nested (notebooks already run one).
nest_asyncio.apply()
# Prompt for each API key without echoing it and export it for the client libraries.
for _env_var, _key_prompt in (
    ('OPENAI_API_KEY', 'OpenAI API Key:'),
    ('TAVILY_API_KEY', 'TAVILY_API_KEY:'),
):
    os.environ[_env_var] = getpass.getpass(_key_prompt)

## 🧱 Task 1 — Simple LangGraph RAG
We load every PDF found under `data/`, split the text into token-sized chunks, and index the chunks in an in-memory Qdrant vector store. We then build a simple two-node graph: `retrieve -> generate`.

In [None]:
from langchain_community.document_loaders import DirectoryLoader, PyMuPDFLoader
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Qdrant
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph
from typing import TypedDict, List
from typing_extensions import Annotated
import tiktoken
import operator
from langchain.text_splitter import RecursiveCharacterTextSplitter
def tiktoken_len(text: str) -> int:
    """Return the number of tokens in *text*.

    The tokenizer registered for 'gpt-4o' is used when tiktoken can resolve
    it; otherwise the 'cl100k_base' encoding is used as a fallback.
    """
    try:
        encoder = tiktoken.encoding_for_model('gpt-4o')
    except Exception:
        # Any lookup failure falls back to a named encoding.
        encoder = tiktoken.get_encoding('cl100k_base')
    token_ids = encoder.encode(text)
    return len(token_ids)
# Load every PDF matched by '**/*.pdf' under data/, parsing each with PyMuPDFLoader.
directory_loader = DirectoryLoader('data', glob='**/*.pdf', loader_cls=PyMuPDFLoader)
how_people_use_ai_documents = directory_loader.load()
# chunk_size=750 is measured with tiktoken_len (tokens, not characters); chunks do not overlap.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=750, chunk_overlap=0, length_function=tiktoken_len)
how_people_use_ai_chunks = text_splitter.split_documents(how_people_use_ai_documents)
# Bare expression: as the last line of a notebook cell it displays the chunk count.
len(how_people_use_ai_chunks)

In [None]:
# Embedding model shared by both vector stores in this notebook.
embedding_model = OpenAIEmbeddings(model='text-embedding-3-small')
# location=':memory:' keeps the Qdrant collection in process memory (nothing is persisted).
qdrant_vectorstore = Qdrant.from_documents(documents=how_people_use_ai_chunks, embedding=embedding_model, location=':memory:')
# Retriever with default search settings; used by the `retrieve` graph node.
qdrant_retriever = qdrant_vectorstore.as_retriever()

In [None]:
# Prompt for the RAG generator. `{context}` and `{query}` are filled in by `generate`;
# the final instruction restricts the model to the supplied context.
HUMAN_TEMPLATE = """
# CONTEXT:
{context}

QUERY:
{query}

Use the provided context to answer the provided user query. Only use the provided context to answer the query. If you do not know the answer, or it's not contained in the provided context respond with "I don't know".
"""
# Single human-message prompt built from the template above.
chat_prompt = ChatPromptTemplate.from_messages([('human', HUMAN_TEMPLATE)])
# temperature=0 to keep generated answers as repeatable as the API allows.
generator_llm = ChatOpenAI(model='gpt-4o-mini', temperature=0)
class RAGState(TypedDict):
    """State dictionary passed between the nodes of the simple RAG graph."""
    # The user's question; read by both `retrieve` and `generate`.
    question: str
    # Documents returned by the retriever; written by `retrieve`.
    context: List[Document]
    # Generated answer text; written by `generate`.
    response: str
def _format_docs(docs: List[Document]) -> str:
    """Join the `page_content` of each document, separated by a blank line."""
    page_texts = (doc.page_content for doc in docs)
    return '\n\n'.join(page_texts)
def retrieve(state: RAGState):
    """Graph node: fetch documents for the question and store them under 'context'."""
    question = state['question']
    retrieved_docs = qdrant_retriever.invoke(question)
    return {'context': retrieved_docs}
def generate(state: RAGState):
    """Graph node: answer the question from the retrieved context.

    The retrieved documents are flattened to plain text before being placed
    in the prompt, and the model output is parsed to a string stored under
    'response'.
    """
    prompt_inputs = {
        'query': state['question'],
        'context': _format_docs(state['context']),
    }
    answer_chain = chat_prompt | generator_llm | StrOutputParser()
    answer = answer_chain.invoke(prompt_inputs)
    return {'response': answer}
# Register `retrieve` then `generate` as consecutive nodes. The node names presumably
# come from the function names, which is what the 'retrieve' edge below relies on.
rag_graph = StateGraph(RAGState).add_sequence([retrieve, generate])
# Entry point: execution starts at the retrieval node.
rag_graph.add_edge(START, 'retrieve')
compiled_rag_graph = rag_graph.compile()
# Smoke test; in a notebook the returned state dict is displayed as the cell output.
compiled_rag_graph.invoke({'question': 'How does the average person use AI?'})

## 🧩 Task 2 — Helper Functions for Agent Graphs

In [None]:
from typing import Any, Dict
import functools
import operator
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_core.tools import BaseTool
from langgraph.graph import END
from langchain_openai import ChatOpenAI
def agent_node(state, agent, name):
    """Run *agent* on the graph state and wrap its output as a named message.

    The agent's 'output' value is returned as a single HumanMessage tagged
    with *name*, inside a one-element 'messages' list.
    """
    agent_result = agent.invoke(state)
    reply = HumanMessage(content=agent_result['output'], name=name)
    return {'messages': [reply]}
def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str) -> AgentExecutor:
    """Build an OpenAI-functions agent executor around *llm* and *tools*.

    A fixed set of "work autonomously" instructions is appended to
    *system_prompt*. The prompt also takes the conversation ('messages') and
    the agent's scratchpad ('agent_scratchpad') as message placeholders.
    """
    autonomy_instructions = ''.join((
        '\nWork autonomously according to your specialty, using the tools available to you.',
        ' Do not ask for clarification.',
        ' Your other team members (and other teams) will collaborate with you with their own specialties.',
        ' You are chosen for a reason!',
    ))
    full_system_prompt = system_prompt + autonomy_instructions
    prompt_messages = [
        ('system', full_system_prompt),
        MessagesPlaceholder(variable_name='messages'),
        MessagesPlaceholder(variable_name='agent_scratchpad'),
    ]
    prompt = ChatPromptTemplate.from_messages(prompt_messages)
    functions_agent = create_openai_functions_agent(llm, tools, prompt)
    return AgentExecutor(agent=functions_agent, tools=tools)
def create_team_supervisor(llm: ChatOpenAI, system_prompt: str, members: list):
    """Build a routing chain that picks the next team member (or FINISH).

    The model is forced to call a single `route` function whose `next`
    argument must be one of 'FINISH' or the given *members*; the parsed
    function arguments are the chain's output.
    """
    options = ['FINISH'] + members
    route_parameters = {
        'title': 'routeSchema',
        'type': 'object',
        'properties': {
            'next': {'title': 'Next', 'anyOf': [{'enum': options}]},
        },
        'required': ['next'],
    }
    function_def = {
        'name': 'route',
        'description': 'Select the next role.',
        'parameters': route_parameters,
    }
    routing_messages = [
        ('system', system_prompt),
        MessagesPlaceholder(variable_name='messages'),
        ('system', 'Given the conversation above, who should act next? Or should we FINISH? Select one of: {options}'),
    ]
    # Pre-fill the template variables that do not come from the graph state.
    prompt = ChatPromptTemplate.from_messages(routing_messages).partial(
        options=str(options),
        team_members=', '.join(members),
    )
    # function_call='route' forces the model to answer through the routing function.
    routing_llm = llm.bind_functions(functions=[function_def], function_call='route')
    return prompt | routing_llm | JsonOutputFunctionsParser()

## 🔎 Task 3 — Research Team (Search + RAG)

In [None]:
from typing import Annotated, Optional
from langchain_core.tools import tool
from langchain_tavily import TavilySearch
# Build the Tavily search tool up front. If construction fails (the message below
# assumes a missing API key), fall back to a stub so the rest of the notebook still runs.
tavily_tool = None
try:
    tavily_tool = TavilySearch(max_results=5)
except Exception as e:
    print(f"Tavily tool initialization failed (API key required): {e}")
    # Substitute a mock tool that echoes the query instead of searching the web.
    # NOTE(review): `tool` is already imported earlier in this cell; this re-import is redundant.
    from langchain_core.tools import tool
    @tool
    def mock_tavily_search(query: str) -> str:
        """Mock Tavily search tool for testing."""
        return f"Mock search results for: {query}"
    tavily_tool = mock_tavily_search
@tool
def retrieve_information(query: Annotated[str, 'query to ask the retrieve information tool']):
    """Use Retrieval Augmented Generation to retrieve information about how people use AI"""
    # The docstring above is the tool description shown to the model, so it is kept verbatim.
    rag_input = {'question': query}
    # Returns the RAG graph's full final state, not only the answer text.
    return compiled_rag_graph.invoke(rag_input)
class ResearchTeamState(TypedDict):
    """State shared by the nodes of the research team graph."""
    # Conversation so far; operator.add is the reducer, so node outputs are appended.
    messages: Annotated[List[BaseMessage], operator.add]
    # Names of the team's workers.
    team_members: List[str]
    # Routing decision written by the supervisor and read by the conditional edge.
    next: str
# One model instance is shared by both research workers and their supervisor.
research_llm = ChatOpenAI(model='gpt-4o-mini', temperature=0)
# Worker 1: web search through the Tavily tool (or its mock fallback).
search_agent = create_agent(research_llm, [tavily_tool], 'You are a research assistant who can search for up-to-date info using the tavily search engine.')
search_node = functools.partial(agent_node, agent=search_agent, name='Search')
# Worker 2: retrieval-augmented answers through the `retrieve_information` tool.
research_agent = create_agent(research_llm, [retrieve_information], 'You are a research assistant who can provide specific information on how people use AI')
research_node = functools.partial(agent_node, agent=research_agent, name='HowPeopleUseAIRetriever')
# Supervisor: picks which worker acts next, or FINISH.
research_supervisor_agent = create_team_supervisor(
    research_llm,
    ('You are a supervisor tasked with managing a conversation between the following workers:  Search, HowPeopleUseAIRetriever. '
     'Given the following user request, determine the subject to be researched and respond with the worker to act next. '
     'Each worker will perform a task and respond with their results and status. '
     'You should never ask your team to do anything beyond research. They are not required to write content or posts. '
     'You should only pass tasks to workers that are specifically research focused. When finished, respond with FINISH.'),
    ['Search','HowPeopleUseAIRetriever']
)
# Wire the graph: every worker reports back to the supervisor, and the supervisor's
# 'next' value selects the following node (FINISH ends the run).
research_graph = StateGraph(ResearchTeamState)
research_graph.add_node('Search', search_node)
research_graph.add_node('HowPeopleUseAIRetriever', research_node)
research_graph.add_node('ResearchSupervisor', research_supervisor_agent)
research_graph.add_edge('Search','ResearchSupervisor')
research_graph.add_edge('HowPeopleUseAIRetriever','ResearchSupervisor')
research_graph.add_conditional_edges('ResearchSupervisor', lambda x: x['next'], {'Search':'Search','HowPeopleUseAIRetriever':'HowPeopleUseAIRetriever','FINISH':END})
# The supervisor runs first and decides who handles the request.
research_graph.set_entry_point('ResearchSupervisor')
compiled_research_graph = research_graph.compile()
def enter_research_chain(message: str):
    """Wrap a plain question string as the initial state of the research graph."""
    opening_message = HumanMessage(content=message)
    return {'messages': [opening_message]}
# Lets callers pass a plain string: the adapter builds the state dict, then the graph runs.
research_chain = enter_research_chain | compiled_research_graph
def pretty(s):
    """Print a one-line summary for each node update in a streamed graph step.

    *s* maps node names to that node's update dict. When the update carries
    messages, the first 200 characters of the last message's content are
    printed with newlines flattened to spaces. Otherwise a routing decision
    under 'next' is printed, if there is one.
    """
    for node_name, update in s.items():
        messages = update.get('messages', [])
        if not messages:
            next_step = update.get('next')
            if next_step:
                print(f'[{node_name}] -> {next_step}')
            continue
        # Objects without a `content` attribute are shown as an empty preview.
        preview = getattr(messages[-1], 'content', '')[:200]
        preview = preview.replace('\n', ' ')
        print(f'[{node_name}] {preview}...')
# Stream the research team's steps, printing a summary for each one except the end marker.
for s in research_chain.stream(
    'How are people using AI to improve their lives?',
    {'recursion_limit': 60},
):
    if '__end__' in s:
        continue
    pretty(s)

### ❓ Question #1: Why a “powerful” LLM here?
**A1.1**: Multi-step routing + tool choice under uncertainty (when to search vs use RAG; merging heterogeneous evidence) requires stronger planning and calibration.

**A1.2**: The agent must decompose goals, decide tool order, judge sufficiency of evidence, and synthesize for a target audience—weak models loop or hallucinate here.

## ✍️ Task 4 — Document Writing Team (Plan, Write, Edit)

In [None]:
from langchain_community.document_loaders import CSVLoader
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict
import uuid, os
# Load the previous-cohort project CSV; only the two domain columns are given as content_columns.
previous_cohort_loader = CSVLoader('data/AIE7_Projects_with_Domains.csv', content_columns=['Project Domain','Secondary Domain (if any)'])
previous_cohort = previous_cohort_loader.load()
# Separate in-memory Qdrant store for these rows, reusing the embedding model defined earlier.
qdrant_previous_cohort_vectorstore = Qdrant.from_documents(documents=previous_cohort, embedding=embedding_model, location=':memory:')
qdrant_previous_cohort_retriever = qdrant_previous_cohort_vectorstore.as_retriever()
# Root folder under which the per-run working directories are created.
os.makedirs('./content/data', exist_ok=True)
def create_random_subdirectory():
    """Create a uniquely named folder under ./content/data and return its path.

    The folder name is the first 8 hex characters of a random UUID; the path
    is returned as a string.
    """
    short_id = uuid.uuid4().hex[:8]
    subdirectory = os.path.join('./content/data', short_id)
    os.makedirs(subdirectory, exist_ok=True)
    return subdirectory
# Folder the document tools read from and write to; they look this global up when called.
WORKING_DIRECTORY = Path(create_random_subdirectory())

In [None]:
from typing import Annotated, Optional
from langchain_core.tools import tool
@tool
def create_outline(points: Annotated[List[str],'List of main points or sections.'], file_name: Annotated[str,'File path to save the outline.']) -> Annotated[str,'Path of the saved outline file.']:
    """Create an outline file with the given points."""
    # The docstring above is the tool description shown to the model, so it is kept verbatim.
    # Each point becomes its own line, numbered from 1.
    numbered_lines = [f"{number}. {point}\n" for number, point in enumerate(points, start=1)]
    outline_path = WORKING_DIRECTORY / file_name
    with outline_path.open('w') as handle:
        handle.writelines(numbered_lines)
    return f'Outline saved to {file_name}'
@tool
def read_document(file_name: Annotated[str,'File path to read.'], start: Annotated[Optional[int],'1-indexed start line (default 1)']=1, end: Annotated[Optional[int],'1-indexed inclusive end line (default None)']=None) -> str:
    """Read a document file with optional line range."""
    # The docstring above is the tool description shown to the model, so it is kept verbatim.
    with (WORKING_DIRECTORY / file_name).open('r') as handle:
        all_lines = list(handle)
    # 1-indexed start -> 0-indexed offset; None, 0 and negative values mean "from the first line".
    first_index = max(1, start or 1) - 1
    # A 1-indexed inclusive end is the same number as the exclusive 0-indexed slice bound.
    stop_index = max(1, end) if end is not None else None
    return ''.join(all_lines[first_index:stop_index])
@tool
def write_document(content: Annotated[str,'Text content to be written into the document.'], file_name: Annotated[str,'File path to save the document.']) -> Annotated[str,'Path of the saved document file.']:
    """Write content to a document file."""
    # The docstring above is the tool description shown to the model, so it is kept verbatim.
    # Any existing file of the same name is overwritten.
    destination = WORKING_DIRECTORY / file_name
    destination.write_text(content)
    return f'Document saved to {file_name}'
@tool
def reference_previous_responses(query: Annotated[str,'The query to search for in the previous responses.']) -> Annotated[str,'The previous responses that match the query.']:
    """Search for previous responses using the query."""
    # The docstring above is the tool description shown to the model, so it is kept verbatim.
    matches = qdrant_previous_cohort_retriever.invoke(query)
    # Hand the agent plain text rather than Document objects.
    page_texts = (match.page_content for match in matches)
    return '\n\n'.join(page_texts)
@tool
def edit_document(file_name: Annotated[str,'Path of the document to be edited.'], inserts: Annotated[Dict[int,str],'{line_number: text} using 1-indexed lines'] = {}) -> Annotated[str,'Path of the edited document file.']:
    """Edit a document by inserting text at specified line numbers."""
    # The docstring above is the tool description shown to the model, so it is kept verbatim.
    # The `{}` default is kept so the tool's argument schema is unchanged; it is never mutated here.
    #
    # Inserts are applied in ascending line order, so each text ends up on the
    # requested line of the resulting document. Returns a confirmation message,
    # or an 'Error: ...' message if a line number is out of range (the file is
    # then left untouched, because nothing is written until all inserts succeed).
    p = WORKING_DIRECTORY / file_name
    with p.open('r') as f:
        lines = f.readlines()
    for ln, txt in sorted(inserts.items()):
        if not 1 <= ln <= len(lines) + 1:
            return f'Error: Line number {ln} is out of range.'
        if ln == len(lines) + 1 and lines and not lines[-1].endswith('\n'):
            # Appending after a final line that has no trailing newline would glue the
            # new text onto that line, so terminate the last line first.
            lines[-1] += '\n'
        lines.insert(ln - 1, txt + '\n')
    with p.open('w') as f:
        f.writelines(lines)
    return f'Document edited and saved to {file_name}'

### Document Writing State, Prelude, and Nodes

In [None]:
class DocWritingState(TypedDict):
    """State shared by the nodes of the document-writing team graph."""
    # Conversation so far; operator.add is the reducer, so node outputs are appended.
    messages: Annotated[List[BaseMessage], operator.add]
    # Comma-separated member names (a single string, as built by enter_authoring_chain).
    team_members: str
    # Routing decision written by the supervisor and read by the conditional edge.
    next: str
    # Description of the files in the working directory, filled in by `prelude`.
    current_files: str
def prelude(state):
    """Return a copy of *state* with 'current_files' describing the working directory.

    The working directory is created if it is missing, then every regular
    file below it is listed (relative to the directory, in sorted order) so
    the agent prompt can show which files the team has written so far. When
    there are no files, or the directory cannot be read, 'current_files' is
    set to 'No files written.'.
    """
    try:
        # exist_ok=True makes this a no-op for an existing directory, so no prior exists() check is needed.
        WORKING_DIRECTORY.mkdir(parents=True, exist_ok=True)
        # Only regular files are listed (directories are not "files written"), sorted so
        # the prompt text does not depend on filesystem enumeration order.
        written_files = sorted(
            f.relative_to(WORKING_DIRECTORY)
            for f in WORKING_DIRECTORY.rglob('*')
            if f.is_file()
        )
    except OSError:
        # Best effort: a filesystem problem must not stop the agent, but unrelated
        # programming errors are no longer silently swallowed.
        written_files = []
    if not written_files:
        return {**state, 'current_files': 'No files written.'}
    listing = '\n'.join(f' - {f}' for f in written_files)
    return {**state, 'current_files': '\nBelow are files your team has written to the directory:\n' + listing}
# One model instance is shared by the three writing workers and their supervisor.
authoring_llm = ChatOpenAI(model='gpt-4o-mini', temperature=0)
# Each worker is piped after `prelude`, which fills the {current_files} placeholder in its prompt.
# Worker 1: drafts documents (write / edit / read tools).
doc_writer_agent = create_agent(authoring_llm, [write_document, edit_document, read_document], 'You are an expert writing customer assistance responses.\nBelow are files currently in your directory:\n{current_files}')
context_aware_doc_writer_agent = prelude | doc_writer_agent
doc_writing_node = functools.partial(agent_node, agent=context_aware_doc_writer_agent, name='DocWriter')
# Worker 2: outlines and notes, with access to the previous-cohort retriever tool.
note_taking_agent = create_agent(authoring_llm, [create_outline, read_document, reference_previous_responses], 'You are an expert senior researcher tasked with writing a customer assistance outline and taking notes to craft a customer assistance response.\n{current_files}')
context_aware_note_taking_agent = prelude | note_taking_agent
note_taking_node = functools.partial(agent_node, agent=context_aware_note_taking_agent, name='NoteTaker')
# Worker 3: copy editing (same tools as the writer).
copy_editor_agent = create_agent(authoring_llm, [write_document, edit_document, read_document], 'You are an expert copy editor who focuses on fixing grammar, spelling, and tone issues\nBelow are files currently in your directory:\n{current_files}')
context_aware_copy_editor_agent = prelude | copy_editor_agent
copy_editing_node = functools.partial(agent_node, agent=context_aware_copy_editor_agent, name='CopyEditor')
# Supervisor: picks which worker acts next, or FINISH.
authoring_supervisor_agent = create_team_supervisor(authoring_llm, 'You are a supervisor tasked with managing a conversation between the following workers: {team_members}. You should always verify the technical contents after any edits are made. Given the following user request, respond with the worker to act next. Each worker will perform a task and respond with their results and status. When each team is finished, you must respond with FINISH.', ['DocWriter','NoteTaker','CopyEditor'])
from langgraph.graph import StateGraph
# Wire the graph: every worker reports back to the supervisor, and the supervisor's
# 'next' value selects the following node (FINISH ends the run).
authoring_graph = StateGraph(DocWritingState)
authoring_graph.add_node('DocWriter', doc_writing_node)
authoring_graph.add_node('NoteTaker', note_taking_node)
authoring_graph.add_node('CopyEditor', copy_editing_node)
authoring_graph.add_node('AuthoringSupervisor', authoring_supervisor_agent)
authoring_graph.add_edge('DocWriter','AuthoringSupervisor')
authoring_graph.add_edge('NoteTaker','AuthoringSupervisor')
authoring_graph.add_edge('CopyEditor','AuthoringSupervisor')
authoring_graph.add_conditional_edges('AuthoringSupervisor', lambda x: x['next'], {'DocWriter':'DocWriter','NoteTaker':'NoteTaker','CopyEditor':'CopyEditor','FINISH':END})
# The supervisor runs first and decides who handles the request.
authoring_graph.set_entry_point('AuthoringSupervisor')
compiled_authoring_graph = authoring_graph.compile()
def enter_authoring_chain(message: str, members: List[str]):
    """Build the initial authoring-graph state from a request and the member names.

    The member names are joined into one comma-separated string, which is the
    form the state's 'team_members' field holds.
    """
    opening_message = HumanMessage(content=message)
    member_names = ', '.join(members)
    return {'messages': [opening_message], 'team_members': member_names}
# Lets callers pass a plain string: the adapter builds the state dict, then the graph runs.
# NOTE(review): `authoring_graph.nodes` presumably includes 'AuthoringSupervisor' as well
# as the three workers, so the supervisor is listed among its own team members - confirm.
authoring_chain = (
    functools.partial(enter_authoring_chain, members=authoring_graph.nodes)
    | compiled_authoring_graph
)
# Stream the authoring team's steps, printing a summary for each one except the end marker.
for s in authoring_chain.stream(
    'What are the most common use-cases in this data. What are the most common domains?',
    {'recursion_limit': 80},
):
    if '__end__' in s:
        continue
    pretty(s)

### 🧠 Q2: How to force specific tools / flows?
- Constrain tools per agent (each agent only sees the tools it should use).
- Use a supervisor with a routing schema that only lists allowed next steps.
- Insert guard nodes (e.g., always call RAG after Search before writing).
- Add checks (e.g., if no citations, route back to Search).

## 🕸️ Task 5 — Meta-Supervisor (Full Graph)

In [None]:
# Top-level supervisor: routes between the two teams instead of between individual workers.
super_llm = ChatOpenAI(model='gpt-4o-mini', temperature=0)
super_supervisor_agent = create_team_supervisor(super_llm, 'You are a supervisor tasked with managing a conversation between the following teams: {team_members}. Given the following user request, respond with the worker to act next. Each worker will perform a task and respond with their results and status. When all workers are finished, you must respond with FINISH.', ['Research team','Response team'])
class SuperState(TypedDict):
    """State shared by the nodes of the top-level (meta-supervisor) graph."""
    # Conversation so far; operator.add is the reducer, so node outputs are appended.
    messages: Annotated[List[BaseMessage], operator.add]
    # Routing decision written by the supervisor and read by the conditional edge.
    next: str
def get_last_message(state: SuperState) -> str:
    """Return the content of the most recent message in the state."""
    latest_message = state['messages'][-1]
    return latest_message.content
def join_graph(response: dict):
    """Reduce a team's result to its final message only.

    Returns a state update whose 'messages' list holds just the last message
    of *response*.
    """
    final_message = response['messages'][-1]
    return {'messages': [final_message]}
from langgraph.graph import StateGraph
# Top-level graph: each team is a single node that takes the latest message text,
# runs the whole team chain on it, and hands back only that chain's final message.
super_graph = StateGraph(SuperState)
super_graph.add_node('Research team', get_last_message | research_chain | join_graph)
super_graph.add_node('Response team', get_last_message | authoring_chain | join_graph)
super_graph.add_node('SuperSupervisor', super_supervisor_agent)
# Both teams report back to the supervisor, whose 'next' value picks the following team or ends the run.
super_graph.add_edge('Research team','SuperSupervisor')
super_graph.add_edge('Response team','SuperSupervisor')
super_graph.add_conditional_edges('SuperSupervisor', lambda x: x['next'], {'Response team':'Response team','Research team':'Research team','FINISH':END})
# The supervisor runs first and decides which team handles the request.
super_graph.set_entry_point('SuperSupervisor')
compiled_super_graph = super_graph.compile()
from pathlib import Path
def create_random_subdirectory():
    """Create a uniquely named folder under ./content/data and return it as a Path.

    The folder name is the first 8 hex characters of a random UUID; missing
    parent folders are created as well.
    """
    import os
    import uuid
    short_id = uuid.uuid4().hex[:8]
    subdirectory = Path('./content/data', short_id)
    subdirectory.mkdir(parents=True, exist_ok=True)
    return subdirectory
# Point the document tools at a fresh working directory for this full-graph run.
WORKING_DIRECTORY = Path(create_random_subdirectory())
# Stream the full graph's steps, printing a summary for each one except the end marker.
for s in compiled_super_graph.stream(
    {'messages': [HumanMessage(content='Write a report on the rise of context engineering in the LLM Space in 2025, and how it\'s impacting how people are using AI.')]},
    {'recursion_limit': 40},
):
    if '__end__' in s:
        continue
    pretty(s)

### ✅ Notes
- Ensure the `data/` folder exists and contains the source PDF(s) and `AIE7_Projects_with_Domains.csv`, which the notebook loads.
- For reproducibility, pin versions in `requirements.txt` as suggested in the instructions.