In [None]:
from typing import List
class Analyst(BaseModel):
    affiliation: str=Field(
        description="Primary affiliation of an analyst."
    )
    name: str=Field(
        description="Name of the analyst."
    )
    role: str=Field(
        description="Role of the analyst in the context of the topic."
    )
    description: str=Field(
        description="Description of analyst focus, concerns and motives."
    )
    @property
    def persona(self)->str:
        return f"Name:{self.name}\nRole: {self.role}\nAffiliation:{self.affiliation}\nDescription:{self.description}\n"
    

In [None]:
class Perspectives(BaseModel):
    analysts: List[Analyst]=Field(
        description="comprehensive list of analysts with their roles and affirmations."
    )

class GenerateAnalystsState(TypedDict):
     topic: str
     max_analysts: int
     human_analyst_feedback: str
     analysts: List[Analyst]

In [None]:
analyst_instructions="""You are tasked with creating a set of AI analyst personas.Follow these instructions carefully:
1.First, review the research topic:
{topic}

2.Examine the editorial feedback that has been optionally provided to guide creation of analysts:
{human_analyst_feedback}

3.Determine the most interesting themes based upon documents or feedback above.

4.Pick the top {max_analysts} themes.

5.Assign one analyst to each theme.
"""

def create_analysts(state:GenerateAnalystsState):
    topic=state["topic"]
    max_analysts=state["max_analysts"]
    human_analyst_feedback=state.get('human_analyst_feedback','')
    structured_llm=llm.with_structured_output(Perspectives)
    system_message=analyst_instructions.format(topic=topic,human_analyst_feedback=human_analyst_feedback,max_analysts=max_analysts)
    analysts=structured_llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content="Generate a set of analysts")])
    return {"analysts":analysts.analysts}


def human_feedback(state:GenerateAnalystsState):
    pass

def should_continue(state:GenerateAnalystsState):
    human_analyst_feedback=state.get('human_analyst_feedback',None)
    if human_analyst_feedback:
        return "create_analysts"
    return END

builder=StateGraph(GenerateAnalystsState)
builder.add_node("create_analysts",create_analysts)
builder.add_node("human_feedback",human_feedback)
builder.add_edge(START,"create_analysts")
builder.add_edge("create_analysts","human_feedback")
builder.add_conditional_edges("human_feedback",should_continue,["create_analysts",END])

memory=MemorySaver()
graph=builder.compile(interrupt_before=['human_feedback'],checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
max_analysts=3
topic='''Benefits of drinking water'''
thread={"configurable":{"thread_id":"1"}}
for event in graph.stream({"topic":topic,"max_analysts":max_analysts},thread,stream_mode="values"):
    analysts=event.get('analysts','')
    if analysts:
        for analyst in analysts:
            print(f"Name:{analyst.name}")
            print(f"Affiliation:{analyst.affiliation}")
            print(f"Role: {analyst.role}")
            print(f"Description: {analyst.description}")

In [None]:
for event in graph.stream(None,thread,stream_mode="values"):
    analysts=event.get('analysts','')
    if analysts:
        for analyst in analysts:
            print(f"Name:{analyst.name}")
            print(f"Affiliation:{analyst.affiliation}")
            print(f"Role: {analyst.role}")
            print(f"Description: {analyst.description}")

In [None]:
analysts=final_state.values.get('analysts')

In [None]:
for analyst in analysts:
    print(f"Name:{analyst.name}")
    print(f"Affiliation:{analyst.affiliation}")
    print(f"Role: {analyst.role}")
    print(f"Description: {analyst.description}")
    print("-"*50)

In [None]:
class InterviewState(MessagesState):
    max_num_turns: int
    context:Annotated[list,operator.add]
    analyst: Analyst
    interview: str
    sections: list

class SearchQuery(BaseModel):
    search_query: str =Field(None,description="search query for retrieval")
    

In [None]:
questions_instructions="""You are an analyst tasked with interviewing an expert to learn a specific topic.
Your goal is to boil down to interesting and specific insights related to the topic.

1.Interesting: Insights that people find non obvious.
2.Specific: Insights that avoid generalities and include examples from the expert.

Here is your topic of focus and a set of goals {goals}

Begin introducing yourself by using your name that fits your persona, and then ask your question.
Continue to ask questions to drill down and refine your understanding of the topic.
When you are satisfied with your undersatnding. Complete the interview with a "Thank you for your help".
Remember to stay in character throughout your response, reflecting your persona and goals provided to you.
"""
def generate_question(state: InterviewState):
    analyst=state["analyst"]
    messages=state["messages"]

    system_message=questions_instructions.format(goals=analyst.persona)
    question=llm.invoke([SystemMessage(content=system_message)]+messages)

    return {"messages":question}


In [None]:
search_instructions=SystemMessage(content=f"""You will be given a conversation between an analyst and an expert.
Your goal is to generate a well structured query for us in retrieval or web search related to the conversation.
First analyse the full conversation.
Pay particular attention to the final question posed by the analyst.
Convert this final question into a well structured web search query""")



def search_web(state: InterviewState):
    structured_llm=llm.with_structured_output(SearchQuery)
    search_query=structured_llm.invoke([search_instructions]+state["messages"])
    tavily_search=TavilySearchResults(api_wrapper=tav,max_results=3)
    search_docs=tavily_search.invoke(search_query.search_query)
    formatted_search_docs="\n\n---\n\n".join([
        f'<Document href="{doc["url"]}"/>\n{doc["content"]}\n</Document>'
        for doc in search_docs
    ])
    return {"context":[formatted_search_docs]}

def search_wikipedia(state: InterviewState):
    structured_llm=llm.with_structured_output(SearchQuery)
    search_query=structured_llm.invoke([search_instructions]+state["messages"])
    search_docs=WikipediaLoader(query=search_query.search_query,load_max_docs=2).load()
    formatted_search_docs = "\n\n---\n\n".join([
    f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
    for doc in search_docs
])

    return {"context":[formatted_search_docs]}


answer_instructions=""" You are an expert being interviewed by an analyst.
Here is analyst area of focus: {goals}.
Your goal is to answer a question posed by the interviewer.
To answer the question use this context:
{context}

When answering questions follow these guidelines:
1. Use only the information provided in the context.
2. Do not make assumptions or introduce external information.
3. The context contain sources at the topic of each individual document.
4. Include these sources in your answer next to any relevant statements.
5. Skip addition of brackets as well as Document source preamble in your citation. 
"""

def generate_answer(state:InterviewState):
    analyst=state["analyst"]
    messages=state["messages"]
    context=state["context"]
    system_message=answer_instructions.format(goals=analyst.persona,context=context)
    answer=llm.invoke([SystemMessage(content=system_message)]+messages)
    answer.name="expert"
    return {"messages":[answer]}

def save_interview(state:InterviewState):
    messages=state["messages"]
    interview=get_buffer_string(messages)
    return {"interview":interview}


def route_messages(state:InterviewState,name:str='expert'):
    messages=state["messages"]
    max_num_turns=state.get('max_num_turns',2)

    num_responses=len(
        [m for m in messages if isinstance(m,AIMessage) and m.name==name]
    )

    if num_responses>=max_num_turns:
        return 'save_interview'
    last_question=messages[-2]

    if "Thank you so much for your help" in last_question.content:
        return "save_interview"
    return "ask_question"

section_writer_instructions="""You are an expert technical writer.
Your task is to create a short, easily digestible section of a report based on a set of source documents.
1.Analyse the content of the source documents.
Write the report in a proper understandable manner.
"""

def write_section(state: InterviewState):
    interview=state["interview"]
    context=state["context"]
    analyst=state["analyst"]
    system_message=section_writer_instructions.format(focus=analyst.description)
    section=llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content=f"Use this source to write your section: {context}")])
    return {"sections":[section.content]}


interview_builder=StateGraph(InterviewState)
interview_builder.add_node("ask_question",generate_question)
interview_builder.add_node("search_web",search_web)
interview_builder.add_node("search_wiki",search_wikipedia)
interview_builder.add_node("answer_question",generate_answer)
interview_builder.add_node("save_interview",save_interview)
interview_builder.add_node("write_section",write_section)


interview_builder.add_edge(START,"ask_question")
interview_builder.add_edge("ask_question","search_web")
interview_builder.add_edge("ask_question","search_wiki")
interview_builder.add_edge("search_web","answer_question")
interview_builder.add_edge("search_wiki","answer_question")
interview_builder.add_conditional_edges("answer_question",route_messages,['ask_question','save_interview'])
interview_builder.add_edge("save_interview","write_section")
interview_builder.add_edge("write_section",END)
memory=MemorySaver()
interview_graph=interview_builder.compile(checkpointer=memory).with_config(run_name="conduct_interviews")
display(Image(interview_graph.get_graph().draw_mermaid_png()))




In [None]:
messages=[HumanMessage(content=f"So you were writing an article on {topic}")]
thread={"configurable":{"thread_id":"7"}}
interview=interview_graph.invoke({"analyst":analysts[0],"messages":messages,"max_num_turns":1},thread)

In [None]:
class ResearchGraphstate(TypedDict):
    topic: str
    max_analysts: int
    human_analyst_feedback: str
    analysts: List[Analyst]
    sections: Annotated[list,operator.add]
    introduction: str
    content: str
    conclusion: str
    final_report: str

In [None]:
from langgraph.constants import Send

def initiate_all_interviews(state:ResearchGraphstate):
    human_analyst_feedback=state.get('human_analyst_feedback')
    if human_analyst_feedback:
        return "create_analysts"
    else:
        topic=state["topic"]
    return [Send("conduct_interview", {"analyst": analyst, "topic": topic, "messages": [HumanMessage(content=f"So you said you were writing an article on {topic}?")]}) for analyst in state["analysts"]]

report_writer_instructions="""You are a technical writer  creating a report on this overall topic:
{topic}

You have a team of analysts.Each analyst has done 2 things:
1.They conducted an interview with an expert on a specific sub topic
2.They write up their finding on a memo

Your task:
1.You will be given a collection of memos.
2.Think carefullt about the insights from each memo.
3.Consolidate all into a final summary with ideas from all reports.
4.It should be a single narrative

Here are the memos from your analysts to build your report from:
{context}
"""                                                                                

def write_report(state: ResearchGraphstate):
    sections=state["sections"]
    topic=state["topic"]

    formatted_sec="\n\n".join([f"{sections} for section in sections"])

    system_message=report_writer_instructions.format(topic=topic,context=formatted_sec)
    report=llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content="fWrite a report based on these memos")])
    return {"content":report.content}     

builder=StateGraph(ResearchGraphstate)
builder.add_node("create_analysts",create_analysts)
builder.add_node("human_feedback",human_feedback)
builder.add_node("conduct_interview",interview_builder.compile())
builder.add_node("write_report",write_report)

builder.add_edge(START,"create_analysts")
builder.add_edge("create_analysts","human_feedback")
builder.add_conditional_edges("human_feedback",initiate_all_interviews,["create_analysts","conduct_interview"])
builder.add_edge("conduct_interview","write_report")
builder.add_edge("write_report",END)

memory=MemorySaver()
app=builder.compile(interrupt_before=["human_feedback"],checkpointer=memory)
display(Image(app.get_graph(xray=1).draw_mermaid_png()))


In [None]:
max_analysts=2
topic='''2024 Presidential Election'''
thread={"configurable":{"thread_id":"11"}}
for event in app.stream({"topic":topic,"max_analysts":max_analysts},thread,stream_mode="values"):
    analysts=event.get('analysts','')
    if analysts:
        for analyst in analysts:
            print(f"Name:{analyst.name}")
            print(f"Affiliation:{analyst.affiliation}")
            print(f"Role: {analyst.role}")
            print(f"Description: {analyst.description}")

In [None]:
for event in app.stream(None,thread,stream_mode="values"):
    analysts=event.get('analysts','')
    if analysts:
        for analyst in analysts:
            print(f"Name:{analyst.name}")
            print(f"Affiliation:{analyst.affiliation}")
            print(f"Role: {analyst.role}")
            print(f"Description: {analyst.description}")