In [1]:
# stage6_end_to_end.py
from langgraph.graph import StateGraph, MessagesState, END
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain_core.messages import SystemMessage, HumanMessage, ToolMessage
import wikipedia
import json
import os
from typing import List
from dotenv import load_dotenv
from typing import List, Dict, Annotated
from langgraph.graph.message import add_messages # Import add_messages
from langchain_core.runnables import RunnableConfig
load_dotenv("../../../config/local.env")

  from .autonotebook import tqdm as notebook_tqdm


True

In [2]:
import operator # Import the operator module

class CustomMessagesState(MessagesState):
    draft: str
    feedback: str
    score: int
    task: str
    revise_iter: int
    subtasks: str
    subtask_index: int
    research: Annotated[List[Dict], operator.add]
    next_node: str
    tool_args: str
    last_research_msg: str
    error: str

In [3]:

# ---------- Config ----------
MODEL = "gpt-4o-mini"
EVAL_THRESHOLD = 8
MAX_REVISE_ITER = 2
CHECKPOINT_FILE = "agent_checkpoints.json"

# ---------- Helpers: simple file memory ----------
def load_checkpoints():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}

def save_checkpoint(key, value):
    data = load_checkpoints()
    data[key] = value
    with open(CHECKPOINT_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

In [4]:
# ---------- Tools ----------

@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a summary on the given query."""
    try:
        return wikipedia.summary(query, sentences=2)
    except Exception:
        return "No summary found."

@tool
def calculator(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        return str(eval(expression))
    except Exception as e:
        return f"Error: {e}"


import pandas as pd
import io

@tool
def read_csv_tool(input_str: str) -> str:
    """
    Read a CSV file from a string or base64-encoded bytes and return its contents.
    Input format:
    - For text CSV: pass CSV text directly
    - For file bytes: pass base64 string starting with 'base64:'
    Returns:
        CSV content preview + column names.
    """
    try:
        print("REACHED read_csv_tool")
        if input_str.startswith("base64:"):
            import base64
            bdata = base64.b64decode(input_str.replace("base64:", ""))
            df = pd.read_csv(io.BytesIO(bdata))
        else:
            df = pd.read_csv(io.StringIO(input_str))

        preview = df.head().to_string()
        cols = ", ".join(df.columns)

        return f"Columns: {cols}\n\nPreview:\n{preview}"

    except Exception as e:
        return f"Error reading CSV: {str(e)}"


TOOLS = [wiki_search, calculator, read_csv_tool]

# ---------- LLM ----------
llm = ChatOpenAI(model=MODEL)

In [5]:
import re

def extract_numerator(feedback: str, default: int = 7) -> int:
    """
    Extracts the first integer before a slash (/) in the given feedback string.
    Returns `default` if no such pattern is found.
    """
    match = re.search(r'\b(\d+)(?=\s*/)', feedback)
    return int(match.group(1)) if match else default

In [6]:

# ---------- System prompts ----------
SYSTEM_PLANNER = SystemMessage(content=("You are a planner that breaks a high-level task into subtasks."
    "Understand the user's main task and create clear, manageable subtasks to achieve it."
    "If the task involves CSV data, ensure at least one subtask addresses it. and if required include the data context from the user provided context"
))
SYSTEM_RESEARCH = SystemMessage(content=(
    "You are a researcher that decides which tool to call. "
    "Return an LLM response that may include a tool_call if you need external data."
    "If the user provides a CSV or requests analysis of CSV data, you MUST call the 'read_csv_tool' tool."
))
SYSTEM_WRITER = SystemMessage(content="You are a writer. Synthesize the provided research into a concise, clear answer.")
SYSTEM_EVALUATOR = SystemMessage(content="You are an evaluator. Score from 1-10 and provide short critique and improvement points.")

In [None]:
# ---------- Node implementations ----------
def planner_node(state: CustomMessagesState):
    try:
        user_task = state["task"]
        # simple decomposition prompt
        prompt = f"Task: {user_task}\nBreak this into 2 short subtasks (one sentence each). Make the subtasks pointed, coincise and task oriented to cover the task requirements. \
                you only have access to tools like wiki_search, calculator and read_csv_tool. if the task involves csv data, ensure one subtask is about reading and understanding the csv data provided."
        resp = llm.invoke([SYSTEM_PLANNER, HumanMessage(content=prompt)])
        subtasks = [s.strip() for s in resp.content.split("\n") if s.strip()]
        # fallback if LLM didn't give 3 lines
        if len(subtasks) < 1:
            subtasks = [user_task]
        print("Planner generated subtasks:", subtasks)
        return {"subtasks": subtasks, "subtask_index": 0, "research": []}
    except Exception as e:
        return {"error": str(e)}


def researcher_node(state: CustomMessagesState):
    try:
        idx = state["subtask_index"]
        subtasks: List[str] = state["subtasks"]
        if idx >= len(subtasks):
            # nothing left
            return {} #{"done": True}
        current = subtasks[idx]
        # create messages including system and brief history
        # messages = [SYSTEM_RESEARCH, HumanMessage(content=f"Research this: {current}")]

        messages = [
            SYSTEM_RESEARCH,
            HumanMessage(
                content=(
                    f"Research this subtask:\n{current}\n\n"
                    f"Original user data (for reference):\n{state.get('task','')}"
                )
            )
        ]

        # allow model to call tools
        response = llm.bind_tools(TOOLS).invoke(messages)
        # If tool call -> handle via tool node (route will be to 'tool')
        if response.tool_calls:
            # carry needed context for tool node
            return {
                "research": state.get("research", []),
                "subtasks": subtasks,
                "subtask_index": idx,
                "next_node": response.tool_calls[0]["name"],
                "tool_args": response.tool_calls[0]["args"],
                "last_research_msg": response  # store the model message that requested the tool
            }

        # If no tool call, treat response as observation/result
        observations = state.get("research", []) + [{"subtask": current, "result": response.content}]
        return {
            "research": observations,
            "subtasks": subtasks,
            "subtask_index": idx + 1
        }
    except Exception as e:
        return {"error": str(e)}


def tool_node(state: CustomMessagesState):
    try:
        subtasks = state.get("subtasks", [])
        idx = state.get("subtask_index", 0)

        # Safety check
        if not subtasks or idx >= len(subtasks):
            print("[tool_node] No valid subtask found, skipping tool execution.")
            #return {"done": True}
            return {
                "research": state.get("research", []),
                "subtasks": subtasks,
                "subtask_index": idx,
                "next_node": None,
            }
        tool_name = state["next_node"]
        tool_args = state["tool_args"]
        last_model_msg = state["last_research_msg"]
        # execute appropriate tool
        result = None
        for t in TOOLS:
            if t.name == tool_name:
                result = t.invoke(tool_args)
                break
        if result is None: 
            result = f"Tool {tool_name} not found."

        # Build a ToolMessage linked to the tool_call_id from the stored model message
        tool_call_id = None
        if hasattr(last_model_msg, "tool_calls") and last_model_msg.tool_calls:
            tool_call_id = last_model_msg.tool_calls[0]["id"]
        # if no id, generate a safe fallback
        if not tool_call_id:
            tool_call_id = "auto_tool_call"

        tool_msg = ToolMessage(content=result, tool_call_id=tool_call_id)
        # append tool observation into research history
        observations = state.get("research", []) + [{"subtask": state["subtasks"][state["subtask_index"]], "result": result}]
        return {
            "research": observations,
            "subtasks": state["subtasks"],
            "subtask_index": state["subtask_index"] + 1,  # move to next subtask after tool result
        }
    except Exception as e:
        return {"error": str(e)}


def writer_node(state: CustomMessagesState):
    try:
        print("Reached writer node")
        # gather all research into a single prompt
        research = state.get("research", [])
        topic = state.get("task")
        research_text = "\n".join([f"- {r['subtask']}: {r['result']}" for r in research])
        prompt = f"Topic: {topic}\nResearch gathered:\n{research_text}\n\nWrite final answer (concise)."
        resp = llm.invoke([SYSTEM_WRITER, HumanMessage(content=prompt)])
        return {"draft": resp.content, "research": research, "task": topic, "revise_iter": 0}
    except Exception as e:
        return {"error": str(e)}

def evaluator_node(state: CustomMessagesState):
    try:
        print("Reached Eval Node")
        draft = state["draft"]
        prompt = f"Text:\n{draft}\n\nGive a score from 1–10 in the format `score: x/10`, then give a 1-line critique and 1 improvement suggestion."
        resp = llm.invoke([SYSTEM_EVALUATOR, HumanMessage(content=prompt)])
        # crude score parsing
        score = extract_numerator(resp.content, default=7)
        print(f"Evaluation score: {score}, feedback: {resp.content}")
        return {"draft": draft, "feedback": resp.content, "score": score, "revise_iter": state.get("revise_iter", 0)}
    except Exception as e:
        print(f"Error in evaluator_node: {str(e)}")
        return {"error": str(e)}


def reviser_node(state: CustomMessagesState):
    try:
        draft = state["draft"]
        feedback = state["feedback"]
        iter_count = state.get("revise_iter", 0) + 1
        prompt = f"Improve the draft based on this feedback:\nFeedback: {feedback}\n\nDraft:\n{draft}"
        resp = llm.invoke([HumanMessage(content=prompt)])
        return {"draft": resp.content, "revise_iter": iter_count, "feedback": state["feedback"], "score": state["score"], "task": state.get("task")}
    except Exception as e:
        return {"error": str(e)}

def error_node(state: CustomMessagesState):
    return {"draft": f"Agent failed safely. Error: {state.get('error')}"}

def route_researcher(state: CustomMessagesState):
    if ("next_node" in state) and (state["next_node"] is not None):
        return "tool"
    
    idx = state.get("subtask_index", 0)
    subtasks = state.get("subtasks", [])
    
    # Check if there are still subtasks left to process
    if idx < len(subtasks):
        return "researcher"
    else:
        # All subtasks complete
        return "writer"
    

def route_eval(state:CustomMessagesState):
    score = state.get("score", 0)
    revise_iter = state.get("revise_iter", 0)
    if score < EVAL_THRESHOLD and revise_iter < MAX_REVISE_ITER:
        return "reviser"
    else:
        return END

def route_errors(state):
    if "error" in state:
        return "error"

In [None]:

#-----------------------------------------------

builder1 = StateGraph(CustomMessagesState)

builder1.add_node("planner", planner_node)
builder1.add_node("researcher", researcher_node)
builder1.add_node("tool", tool_node)
builder1.add_node("writer", writer_node)
builder1.add_node("evaluator", evaluator_node)
builder1.add_node("reviser", reviser_node)
# builder1.add_node("error", error_node)

builder1.set_entry_point("planner")
builder1.add_edge("planner", "researcher")

builder1.add_conditional_edges("researcher", route_researcher, {"tool": "tool", "writer": "writer"})
# builder1.add_conditional_edges("tool", route_tool, {"researcher": "researcher", END: END})
builder1.add_edge("tool", "researcher")

builder1.add_edge("writer", "evaluator")
builder1.add_conditional_edges("evaluator", route_eval, {"reviser": "reviser", END: END})
builder1.add_edge("reviser", "evaluator")

# builder1.add_edge("planner", "error")
# builder1.add_edge("researcher", "error")
# builder1.add_edge("tool", "error")
# builder1.add_edge("writer", "error")

graph1 = builder1.compile()

In [8]:
custom_recursion_limit = 25

# Create a RunnableConfig object with the specified recursion limit
config = RunnableConfig(recursion_limit=custom_recursion_limit)

In [10]:
# graph1
# prompt1 -> "Write a detailed analysis of the following CSV data:\n\nName,Age,Occupation,Salary\nAlice,30,Engineer,70000\nBob,25,Designer,50000\n;Diana,28,Doctor,80000"


In [9]:
init_state = {"task": "Explain the theory of relativity using data from the provided CSV file:\n\nName,Age,Occupation,Salary\nAlice,30,Engineer,70000\nBob,25,Designer,50000\nDiana,28,Doctor,80000"}
result = graph1.invoke(init_state, config=config)

Planner generated subtasks: ["1. Use the read_csv_tool to analyze the provided CSV file and extract relevant data about individuals' ages and occupations, which can help illustrate points in explaining the theory of relativity.", '2. Research the theory of relativity using wiki_search and compile a concise explanation that incorporates insights from the CSV data.']
REACHED read_csv_tool
REACHED read_csv_tool
[tool_node] No valid subtask found, skipping tool execution.
Reached writer node
Reached Eval Node
Evaluation score: 8, feedback: score: 8/10  
Critique: The explanation of relativity is clear and well-structured, but the connection to the CSV data feels somewhat forced.  
Improvement suggestion: Strengthen the link between relativity and the data by incorporating a more direct analysis or visualization of how different scenarios could illustrate the concepts presented.


In [10]:
print(result["draft"])

The theory of relativity, proposed by Albert Einstein, encompasses two main concepts: special relativity and general relativity. Special relativity addresses the physics of objects moving at constant speeds, particularly close to the speed of light, and introduces the principle that the laws of physics are the same for all observers, regardless of their relative motion. It also highlights the interdependence of time and space, stating that time can dilate and lengths can contract based on an observer's speed.

General relativity extends these ideas to include gravity as a curvature of spacetime caused by mass. Essentially, massive objects like planets and stars warp the fabric of spacetime around them, affecting the motion of other objects.

To illustrate these concepts using the provided CSV data, let's consider the ages and occupations of individuals. For instance, Alice (30), Bob (25), and Diana (28), while different ages, all perceive time similarly in their daily lives as they go 