## LangGraph Essay Writer Agent

You are an expert writer tasked with writing a high-level outline of an essay. Write such an outline for the user-provided topic. Give an outline of the essay along with notes and instructions for the sections.

##### Note: Open a JupyterLab terminal and run the following command before getting started
<pre>
    uv add langgraph
</pre>

In [1]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List, Optional
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
import os
import requests
import json
from openai import OpenAI
import httpx
from langchain.prompts import ChatPromptTemplate
from langchain.chains import LLMChain
from langchain_openai import ChatOpenAI
from langchain.agents import tool
from langchain.agents import initialize_agent, AgentType, load_tools
from langchain_core.tools import Tool
from pydantic import BaseModel, ValidationError
#from langchain_core.pydantic_v1 import BaseModel
from langchain.tools import tool
from langchain.chains.router import MultiPromptChain
from langchain.chains.router.llm_router import LLMRouterChain,RouterOutputParser
from langchain.prompts import PromptTemplate
from langgraph.checkpoint.sqlite import SqliteSaver
from tavily import TavilyClient
from langchain_community.tools.tavily_search import TavilySearchResults
from IPython.display import Image
import re
import json
from typing import Dict, Any


# Connect to the Tavily search tool.
# The API key is read from the environment (export TAVILY_API_KEY before
# starting the notebook) instead of being hard-coded here: a key committed in
# a notebook is readable by everyone with access to the file. Any key that was
# previously committed should be rotated.
_tavily_api_key = os.environ.get("TAVILY_API_KEY")
if not _tavily_api_key:
    raise RuntimeError(
        "TAVILY_API_KEY is not set; export your Tavily API key before running this notebook."
    )
tavily = TavilyClient(api_key=_tavily_api_key)
#tool = TavilySearchResults(max_results=2)

# Agent state: the dictionary schema shared by every node of the graph.
# Each node receives the current state and returns a partial update.
class AgentState(TypedDict):
    task: str  # user request / essay topic; sent as the human message by plan_node
    lnode: str  # not read or written in this section — presumably the last node run; TODO confirm
    plan: str  # essay outline written by plan_node, read by generation_node
    draft: str  # latest essay text written by generation_node, read by reflection_node
    critique: str  # feedback written by reflection_node, read by research_critique_node
    content: List[str]  # search-result snippets accumulated by the research nodes
    queries: List[str]  # not written in this section — presumably generated search queries; TODO confirm
    revision_number: int  # incremented by generation_node each time a draft is produced
    max_revisions: int  # should_continue ends the run once revision_number exceeds this
    count: Annotated[int, operator.add]  # operator.add is attached as annotation metadata (LangGraph's reducer convention); no node in this section updates it


#define and configure the model
# NOTE(review): verify=False disables TLS certificate verification for every
# request sent through this client. It is kept as-is because the existing
# setup relies on it (presumably an internal endpoint with a private CA —
# confirm); prefer passing the CA bundle path via verify=... instead.
httpx_client = httpx.Client(http2=True, verify=False, timeout=10.0)

# VCAP_SERVICES must contain a JSON document describing the bound services.
# Fail with an explicit message instead of the opaque TypeError that
# json.loads(None) raises when the variable is missing.
vcapservices = os.getenv('VCAP_SERVICES')
if not vcapservices:
    raise RuntimeError(
        "VCAP_SERVICES environment variable is not set; cannot locate the chat model credentials."
    )
services = json.loads(vcapservices)

def is_chatservice(service, name="gen-ai-qwen3-ultra"):
    """Return True when *service* is the chat-model service entry.

    Args:
        service: One service entry (a mapping) from the parsed VCAP_SERVICES data.
        name: Service name to match; defaults to the model used by this notebook.

    Returns:
        bool: True if the entry's "name" equals *name*. Entries without a
        "name" key simply do not match instead of raising KeyError.
    """
    return service.get("name") == name

# Select the chat-model entry among the bound "genai" services and fail with a
# readable message (instead of a bare KeyError/IndexError) when none matches.
chat_services = list(filter(is_chatservice, services.get("genai", [])))
if not chat_services:
    raise RuntimeError(
        'No matching chat service found under "genai" in VCAP_SERVICES.'
    )
chat_credentials = chat_services[0]["credentials"]

# temperature=0 requests the least random sampling the endpoint offers.
model = ChatOpenAI(
    temperature=0,
    model=chat_credentials["model_name"],
    base_url=chat_credentials["api_base"],
    api_key=chat_credentials["api_key"],
    http_client=httpx_client,
)


#define prompts
# System prompt for plan_node: asks for a high-level essay outline.
PLAN_PROMPT = """
You are an expert writer tasked with writing a high level outline of an essay. \
Write such an outline for the user provided topic. Give an outline of the essay along \
with any relevant notes or instructions for the sections.
"""

# System prompt for generation_node; the {content} placeholder is filled with
# the research text via str.format, so no other braces may appear in it.
WRITER_PROMPT = """
You are an essay assistant tasked with writing excellent 5-paragraph essays. \
Generate the best essay possible for the user's request and the initial outline. \
If the user provides critique, respond with a revised version of your previous attempts. \

--------

{content}"""

# System prompt for reflection_node: the model critiques the current draft.
REFLECTION_PROMPT = """
You are a teacher grading an essay submission. \
Generate critique and recommendations for the user's submission. \
Provide detailed recommendations, including requests for length, depth, style, etc.
"""

# System prompt for research_plan_node: asks for search queries about the task.
RESEARCH_PLAN_PROMPT = """
You are a researcher charged with providing information that can \
be used when writing the following essay. Generate a list of search queries that will gather \
any relevant information. Only generate 3 queries max.
"""

# System prompt for research_critique_node: asks for search queries that help
# address the critique passed as the human message.
RESEARCH_CRITIQUE_PROMPT = """
You are a researcher charged with providing information that can \
be used when making any requested revisions (as outlined below). \
Generate a list of search queries that will gather any relevant information. \
Only generate 3 queries max.
"""
    
def extract_json(text):
    """Extract and parse the first JSON object embedded in an LLM response.

    Args:
        text: Raw model output, possibly containing ``<think>...</think>``
            reasoning blocks, other bare tags such as ``<speak>``, and prose
            around the JSON payload.

    Returns:
        The decoded JSON object (a dict).

    Raises:
        ValueError: If no parseable JSON object is present.
    """
    # Drop whole reasoning blocks first. Stripping only the tags would leave
    # the reasoning text behind, and any brace in it would corrupt the search.
    without_reasoning = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)

    # Remove any remaining bare tags like <speak> (the tag only; its content stays)
    cleaned_text = re.sub(r'<\/?[\w\d]+>', '', without_reasoning).strip()

    # Try every "{" as a candidate start and let the JSON decoder find the
    # matching end. A greedy "{.*}" regex would span from the first "{" to the
    # last "}" and fail whenever the text contains more than one brace group.
    decoder = json.JSONDecoder()
    start = cleaned_text.find("{")
    while start != -1:
        try:
            parsed, _end = decoder.raw_decode(cleaned_text, start)
        except json.JSONDecodeError:
            start = cleaned_text.find("{", start + 1)
            continue
        return parsed

    raise ValueError("No JSON object found in response")

def normalize_to_queries(output: str) -> Dict[str, Any]:
    """
    Normalize LLM output into a dict matching the Queries schema.

    Args:
        output: Raw model text. It may be a JSON object with a "queries" key,
            a JSON list, or a markdown/bulleted/numbered list with one query
            per line. ``<think>...</think>`` blocks are ignored.

    Returns:
        A dict with a "queries" key. For a JSON object the parsed dict is
        returned as-is; otherwise ``{"queries": [...]}`` is built. In the
        line-based fallback, empty entries are dropped and duplicates are
        removed while preserving order.
    """

    # Remove <think>...</think> blocks if present
    output = re.sub(r"<think>.*?</think>", "", output, flags=re.DOTALL).strip()

    # Try strict JSON parse first (json.JSONDecodeError is a ValueError)
    try:
        parsed = json.loads(output)
    except ValueError:
        parsed = None

    if isinstance(parsed, dict) and "queries" in parsed:
        return parsed
    if isinstance(parsed, list):
        return {"queries": parsed}

    # Fallback: treat as markdown/bullet/numbered list.
    # Only real list markers are removed: runs of "-"/"*" or a number followed
    # by "." or ")" and whitespace. Leading digits that belong to the query
    # itself ("2024 AI trends") are kept.
    list_marker = re.compile(r'^\s*(?:[-*]+\s*|\d+[.)]\s+)')

    unique_lines = []
    seen = set()
    for line in output.splitlines():
        query = list_marker.sub('', line).strip(' *"`')
        # Separator lines such as "---" collapse to "" and are skipped
        if query and query not in seen:
            seen.add(query)
            unique_lines.append(query)

    return {"queries": unique_lines}
    
# Pydantic schema for a list of search queries.
# NOTE(review): not referenced by any node in this section (the research nodes
# parse plain text instead) — confirm whether it is still needed.
class Queries(BaseModel):
    queries: List[str]  # the search query strings

#implement nodes
def plan_node(state: AgentState):
    """Ask the model for an essay outline and return it under the ``plan`` key.

    The task text from the state is sent as the human message, with
    PLAN_PROMPT as the system message.
    """
    reply = model.invoke(
        [
            SystemMessage(content=PLAN_PROMPT),
            HumanMessage(content=state["task"]),
        ]
    )
    if hasattr(reply, 'content'):
        outline = reply.content
    else:
        outline = str(reply)

    # Strip any <think>...</think> reasoning block from the reply
    think_block = re.compile(r"<think>.*?</think>", flags=re.DOTALL)
    outline = think_block.sub("", outline).strip()
    return {"plan": outline}
    
def research_plan_node(state: dict):
    """
    Ask the model for search queries about the task and gather Tavily results.

    Args:
        state: Current agent state; reads "task" and, if present, "content".

    Returns:
        A partial state update ``{"content": [...]}`` holding the previously
        gathered snippets followed by the new search-result snippets.
    """
    # Invoke the model (plain text output)
    raw_response = model.invoke(
        [
            SystemMessage(content=RESEARCH_PLAN_PROMPT),
            HumanMessage(content=state["task"]),
        ]
    )
    response_content = getattr(raw_response, "content", str(raw_response))

    # Remove <think>...</think> if present
    response_content = re.sub(r"<think>.*?</think>", "", response_content, flags=re.DOTALL).strip()

    # Reuse the shared parser (JSON or bullet/numbered lines) rather than a
    # private copy of its logic.
    parsed_queries = normalize_to_queries(response_content)["queries"]
    if not isinstance(parsed_queries, list):
        parsed_queries = [parsed_queries]

    # Keep non-empty strings only and deduplicate while preserving order, so
    # no search is spent on an empty or repeated query.
    queries_list = list(dict.fromkeys(
        q.strip() for q in parsed_queries if isinstance(q, str) and q.strip()
    ))

    # Work on a copy: appending to the list held by the state would mutate
    # the shared state object in place instead of returning an update.
    content = list(state.get("content") or [])

    # Perform Tavily searches (best effort: a failed query is reported and skipped)
    for q in queries_list:
        try:
            response = tavily.search(query=q, max_results=2)
            for r in response.get("results", []):
                content_piece = r.get("content", "")
                if content_piece:
                    content.append(str(content_piece))
        except Exception as e:
            print(f"Search failed for query '{q}': {e}")

    return {
        "content": content,
    }
def generation_node(state: AgentState):
    """Write (or rewrite) the essay draft from the task, the plan and the research.

    Args:
        state: Current agent state; reads "task", "plan", and, if present,
            "content" and "revision_number".

    Returns:
        A partial state update with the new "draft" and the incremented
        "revision_number".
    """
    # Pass the gathered research snippets to the writer. The previous code
    # joined the literal list ["content"], so the prompt only ever contained
    # the word "content" and the research was never used.
    content = "\n\n".join(state.get("content") or [])
    user_message = HumanMessage(content=f"{state['task']}\n\nHere is my plan:\n\n{state['plan']}")
    messages = [
        SystemMessage(content=WRITER_PROMPT.format(content=content)),
        user_message,
    ]
    response = model.invoke(messages)
    response_content = response.content if hasattr(response, 'content') else str(response)

    # Remove <think>...</think> blocks completely
    response_content = re.sub(r"<think>.*?</think>", "", response_content, flags=re.DOTALL).strip()
    return {
        "draft": response_content,
        "revision_number": state.get("revision_number", 1) + 1,
    }
def reflection_node(state: AgentState):
    """Have the model critique the current draft and return it under ``critique``.

    The draft from the state is sent as the human message, with
    REFLECTION_PROMPT as the system message.
    """
    reply = model.invoke(
        [
            SystemMessage(content=REFLECTION_PROMPT),
            HumanMessage(content=state['draft']),
        ]
    )
    if hasattr(reply, 'content'):
        feedback = reply.content
    else:
        feedback = str(reply)

    # Strip any <think>...</think> reasoning block from the reply
    think_block = re.compile(r"<think>.*?</think>", flags=re.DOTALL)
    feedback = think_block.sub("", feedback).strip()

    return {"critique": feedback}

def research_critique_node(state: AgentState):
    """
    Ask the model for search queries addressing the critique and gather Tavily results.

    Args:
        state: Current agent state; reads "critique" and, if present, "content".

    Returns:
        A partial state update ``{"content": [...]}`` holding the previously
        gathered snippets followed by the new search-result snippets.
    """
    # Invoke the model (plain text output)
    raw_response = model.invoke(
        [
            SystemMessage(content=RESEARCH_CRITIQUE_PROMPT),
            HumanMessage(content=state["critique"]),
        ]
    )
    response_content = getattr(raw_response, "content", str(raw_response))

    # Remove <think>...</think> if present
    response_content = re.sub(r"<think>.*?</think>", "", response_content, flags=re.DOTALL).strip()

    # Reuse the shared parser (JSON or bullet/numbered lines) rather than a
    # private copy of its logic.
    parsed_queries = normalize_to_queries(response_content)["queries"]
    if not isinstance(parsed_queries, list):
        parsed_queries = [parsed_queries]

    # Keep non-empty strings only and deduplicate while preserving order, so
    # no search is spent on an empty or repeated query.
    queries_list = list(dict.fromkeys(
        q.strip() for q in parsed_queries if isinstance(q, str) and q.strip()
    ))

    # Work on a copy: appending to the list held by the state would mutate
    # the shared state object in place instead of returning an update.
    content = list(state.get("content") or [])

    # Perform Tavily searches (best effort: a failed query is reported and skipped)
    for q in queries_list:
        try:
            response = tavily.search(query=q, max_results=2)
            for r in response.get("results", []):
                content_piece = r.get("content", "")
                if content_piece:
                    content.append(str(content_piece))
        except Exception as e:
            print(f"Search failed for query '{q}': {e}")

    return {
        "content": content,
    }

def should_continue(state):
    """Route after a draft: finish once the revision budget is used up, else reflect.

    Returns END when ``revision_number`` is greater than ``max_revisions``;
    otherwise the name of the reflection node.
    """
    budget_exhausted = state["revision_number"] > state["max_revisions"]
    return END if budget_exhausted else "reflect"


#build graph
builder = StateGraph(AgentState)

# Register each node function under the name used by the edges below
for node_name, node_fn in (
    ("planner", plan_node),
    ("generate", generation_node),
    ("reflect", reflection_node),
    ("research_plan", research_plan_node),
    ("research_critique", research_critique_node),
):
    builder.add_node(node_name, node_fn)

#set entry point
builder.set_entry_point("planner")

#define conditional edges
# After every draft, should_continue either ends the run or routes to "reflect"
routes_after_draft = {END: END, "reflect": "reflect"}
builder.add_conditional_edges("generate", should_continue, routes_after_draft)

#define edges
# First pass: planner -> research_plan -> generate
# Revision loop: reflect -> research_critique -> generate
for edge_start, edge_end in (
    ("planner", "research_plan"),
    ("research_plan", "generate"),
    ("reflect", "research_critique"),
    ("research_critique", "generate"),
):
    builder.add_edge(edge_start, edge_end)



#print graph
#Image(graph.get_graph().draw_png())


# Run configuration: the checkpointer stores state under this thread id
thread = {"configurable": {"thread_id": "1"}}

# Initial state for the run; keys not listed here are filled in by the nodes
initial_state = {
    'task': "what is the difference between langchain and langsmith",
    "max_revisions": 2,
    "revision_number": 1,
}

# Compile with an in-memory SQLite checkpointer and print each streamed update
with SqliteSaver.from_conn_string(":memory:") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)
    for s in graph.stream(initial_state, thread):
        print(s)

{'planner': {'plan': '### **Essay Outline: Understanding the Difference Between LangChain and LangSmith**\n\n---\n\n#### **I. Introduction**  \n- **Purpose**: Introduce the growing importance of language model frameworks in AI development.  \n- **Thesis**: While LangChain is a framework for building language model applications, LangSmith is a platform for developing, testing, and deploying these applications.  \n- **Relevance**: Highlight how both tools complement each other in the AI ecosystem.\n\n---\n\n#### **II. What is LangChain?**  \n- **Definition**: LangChain is an open-source framework designed to simplify the creation of applications powered by language models (e.g., GPT, LLaMA).  \n- **Key Features**:  \n  - **Modular Architecture**: Allows developers to connect language models, data sources, and outputs via "chains."  \n  - **Support for Multiple Models**: Integrates with various LLMs (e.g., OpenAI, Hugging Face).  \n  - **Customization**: Enables building complex workflows

In [2]:
# Build the essay-writer agent from the project-local `helper` module and wrap
# its graph in the GUI object (helper is not shown here; presumably a
# Gradio-based UI, judging by the recorded output — confirm).
from helper import ewriter, writer_gui
MultiAgent = ewriter()
app = writer_gui(MultiAgent.graph)
# NOTE(review): with share=True the recorded output shows a public
# *.gradio.live URL in addition to the local one; keep it only if public
# access to the app is intended.
app.launch(share=True)

* Running on local URL:  http://0.0.0.0:7860
* Running on public URL: https://465722abfb47c07dec.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


In [3]:
# Print an ASCII rendering of the graph's nodes and edges
app.graph.get_graph().print_ascii()

                     +-----------+                  
                     | __start__ |                  
                     +-----------+                  
                            *                       
                            *                       
                            *                       
                      +---------+                   
                      | planner |                   
                      +---------+                   
                            *                       
                            *                       
                            *                       
                   +---------------+                
                   | research_plan |                
                   +---------------+                
                            *                       
                            *                       
                            *                       
                      +----------+            