# Part 1

In [None]:
# To run this notebook, the PDF must be in ./data
# Link is invalid as of 4 Nov 2025; this cell cannot run

#!wget -P ./data https://www.mof.gov.sg/docs/librariesprovider3/budget2024/download/pdf/fy2024_analysis_of_revenue_and_expenditure.pdf

The `.env` file in the project root should contain:

```.env
GOOGLE_API_KEY="your-google-api-key"
LANGCHAIN_TRACING_V2="true"
LANGCHAIN_PROJECT="budget-2024-pipeline"
LANGCHAIN_API_KEY="your-langchain-api-key"
```

### Evaluation of PDF Extraction Libraries

When extracting data from the *FY2024 Analysis of Revenue and Expenditure*, the main goal is accurate recovery of fiscal tables and numerical information, not visual fidelity. Below is a concise evaluation of three libraries—`pdfplumber`, `PyMuPDF`, and `Unstructured`—followed by justification for selecting `pdfplumber`.

---

### pdfplumber

**Pros**  
- Exposes word-level and coordinate-level data, allowing controlled reconstruction of tables using positional logic.  
- Reliable for structured text and numeric extraction; text is rarely dropped or merged.  
- Fully deterministic and reproducible, ideal for analytical pipelines.  
- Lightweight, with few dependencies (it builds on `pdfminer.six`), and easy to integrate with pandas or JSON-based workflows.  
- Offers visual debugging tools for verifying extraction boundaries.

**Cons**  
- Cannot fully preserve original table layout.  
- Slower on large PDFs and lacks built-in OCR support.  
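As an illustration of the positional logic mentioned above, the sketch below groups words into table rows by their vertical coordinate. It assumes input shaped like pdfplumber's `page.extract_words()` output (dicts with at least `text`, `x0` and `top` keys). The `group_words_into_rows` helper and its `y_tolerance` default are hypothetical and are not used elsewhere in this notebook.

```python
from typing import Any, Dict, List


def group_words_into_rows(
    words: List[Dict[str, Any]], y_tolerance: float = 3.0
) -> List[List[str]]:
    """Group words into rows by `top`, then order each row left to right by `x0`.

    A word joins the current row if its `top` is within `y_tolerance` points
    of the first word in that row; otherwise it starts a new row.
    """
    rows: List[List[Dict[str, Any]]] = []
    for word in sorted(words, key=lambda w: (w["top"], w["x0"])):
        if rows and abs(word["top"] - rows[-1][0]["top"]) <= y_tolerance:
            rows[-1].append(word)
        else:
            rows.append([word])
    # Sort each row by horizontal position so values line up with their labels
    return [[w["text"] for w in sorted(row, key=lambda w: w["x0"])] for row in rows]


# Synthetic words standing in for page.extract_words() output
words = [
    {"text": "Alpha", "x0": 50.0, "top": 100.2},
    {"text": "1.0", "x0": 400.0, "top": 100.0},
    {"text": "Beta", "x0": 50.0, "top": 120.1},
    {"text": "2.0", "x0": 400.0, "top": 119.9},
]
print(group_words_into_rows(words))
```

On a real page this would be called as `group_words_into_rows(pdf.pages[0].extract_words())` inside a `pdfplumber.open(...)` block; the tolerance usually needs tuning per document.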

---

### PyMuPDF (fitz)

**Pros**  
- Very fast and memory efficient.  
- Good for mixed content and extracting images or metadata.  
- Maintains reading order in simple, single-column layouts.

**Cons**  
- Treats text as large blocks, often merging table columns and misaligning data.  
- Requires manual heuristics to rebuild tables.  
- Less transparent control for structured data recovery.

---

### Unstructured

**Pros**  
- Uses heuristic and ML-based segmentation to divide documents into logical elements like titles, paragraphs, and tables.  
- Supports multiple file formats and integrates well with document-to-LLM pipelines.  
- Can include OCR and layout models for scanned documents.

**Cons**  
- Converts tables into plain text, losing numeric alignment.  
- High dependency overhead and slower performance.  
- Geared toward semantic understanding, not numerical precision.

---

### Justification for Choosing pdfplumber

`pdfplumber` is chosen not because it preserves layout perfectly, but because it offers consistent, coordinate-based extraction that can be empirically validated and programmatically controlled. In testing with the FY2024 budget PDF, it reliably captured all key fiscal values—operating revenue, tax categories, fund top-ups—without omission or distortion.  

Empirically, its output required less postprocessing to align figures with labels than PyMuPDF or Unstructured. Beyond empirical accuracy, it provides transparency, reproducibility, and fine-grained positional access, all critical for a data-centric workflow.  

Thus, even though it lacks native table layout reconstruction, `pdfplumber` remains the most practical and reliable tool for structured financial document analysis, balancing interpretability, precision, and minimal setup.


In [7]:
import warnings
warnings.filterwarnings("ignore")

import io
import yaml
import pdfplumber
from typing import List, Dict, Any
import json

from utils.call_gemini import GeminiAPIClient

from dotenv import load_dotenv
load_dotenv()

class PdfplumberLoader:
    """
    Extracts structured text and tables with pdfplumber.
    Falls back to Gemini OCR and table reconstruction when needed.
    """

    def __init__(self, pdf_path: str, ocr_threshold: int = 30):
        self.pdf_path = pdf_path
        self.ocr_threshold = ocr_threshold
        self.gemini = GeminiAPIClient()

    def load(self) -> Dict[str, Any]:
        structured = {"metadata": {"source": self.pdf_path}, "elements": []}
        ocr_pages: List[int] = []

        with pdfplumber.open(self.pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages, start=1):
                text = (page.extract_text() or "").strip()

                #  OCR fallback if text missing
                if len(text) < self.ocr_threshold:
                    img = page.to_image(resolution=300).original
                    buf = io.BytesIO()
                    img.save(buf, format="PNG")

                    ocr_resp = self.gemini.generate_content(
                        "Extract all visible text and numbers from this document image.",
                        image_bytes=buf.getvalue()
                    )
                    text = ocr_resp.text
                    ocr_pages.append(page_num)

                # Extract tables (pdfplumber returns rows of cells; cells may be None)
                tables = page.extract_tables()
                table_md_blocks = []
                for t in tables or []:
                    if not t:
                        continue
                    try:
                        _header, *rows = t
                        if not any("|" in (cell or "") for row in rows for cell in (row or [])):
                            # Clean cells: build a proper Markdown table
                            table_md = self._to_markdown(t)
                        else:
                            # Cells already contain pipes: join raw cells, treating None as ""
                            table_md = "\n".join(
                                "|".join(cell or "" for cell in (r or [])) for r in t
                            )
                        # Fall back to Gemini if no column delimiters were produced
                        if "|" not in table_md:
                            table_prompt = (
                                "Convert the following messy or OCRed text into a valid Markdown table "
                                "preserving numeric precision:\n\n" + table_md
                            )
                            table_md = self.gemini.generate_content(table_prompt).text
                        table_md_blocks.append(table_md)
                    except Exception as e:
                        print(f"[WARN] Table parsing failed on page {page_num}: {e}")
                        continue
                # NOTE: table_md_blocks is currently not stored; each element below keeps only the page text.

                structured["elements"].append({
                    "page": page_num,
                    "content_markdown": text,
                })


        print(f"[INFO] Gemini OCR triggered on pages: {ocr_pages or 'None'}")
        return structured

    def _to_markdown(self, table: List[List[str]]) -> str:
        """Convert list-of-lists table to Markdown."""
        md = []
        for i, row in enumerate(table):
            md.append("| " + " | ".join(cell.strip() if cell else "" for cell in row) + " |")
            if i == 0:
                md.append("|" + "|".join(["---"] * len(row)) + "|")
        return "\n".join(md)
    
def main():
    # Load config file
    with open("config.yaml", "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    pdf_fp = config.get("pdf_fp")
    if not pdf_fp:
        raise ValueError(" Missing 'pdf_fp' in config.yaml.")

    loader = PdfplumberLoader(pdf_path=pdf_fp)
    structured_output = loader.load()

    output_path = config["extracted_text_path"]
    
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(structured_output, f, ensure_ascii=False, indent=2)

    print(f"Extraction complete. Output saved to: {output_path}")


main()    

Cannot set gray non-stroke color because /'P115' is an invalid float value
Cannot set gray non-stroke color because /'P117' is an invalid float value


[INFO] Gemini OCR triggered on pages: None
Extraction complete. Output saved to: ./data/extracted_text.json


In [8]:
import json
with open("data/extracted_text.json", "r", encoding="utf-8") as f:
    data = json.load(f)
data

{'metadata': {'source': './data/fy2024_analysis_of_revenue_and_expenditure.pdf'},
 'elements': [{'page': 1,
   'content_markdown': 'ANALYSIS OF\nREVENUE AND\nEXPENDITURE\nFinancial Year 2024\nDistributed on Budget Day: 16 February 2024'},
  {'page': 2,
   'content_markdown': 'EXPLANATORY NOTES\nThis document summarises and provides relevant\nhighlights of the FY2024 Revenue and Expenditure\nEstimates presented to Parliament on 16 February 2024.\nMINISTRY OF FINANCE 2'},
  {'page': 3,
   'content_markdown': 'CONTENTS\nANALYSIS OF REVENUE AND EXPENDITURE ............... 4\n01 Update on Financial Year 2023 ........................................................................ 5\n1.1 Expected Overall Fiscal Position for FY2023 5\n1.2 Operating Revenue 5\n1.3 Total Expenditure 6\n1.4 Special Transfers 7\n1.5 Net Investment Returns Contribution 7\n1.6 Spending from Endowment Funds and Trust Funds 7\n1.7 Capitalisation of Nationally Significant Infrastructure and SINGA\nInterest Costs and L

## Part 1.2

Prompting Approach:

- A dedicated extraction prompt runs per page.

- Outputs are merged into a unified JSON file (`extracted_field.json`).

Rationale:

- Avoids context truncation in long documents.

- Enables parallel or incremental processing of sections (future-scalable for multi-threaded or distributed runs).

- Long documents often lose mid-section fidelity when fed as a single context window, especially if fields are sparsely located.
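The extraction cell below processes pages sequentially. A minimal sketch of the parallel variant mentioned above, assuming each page can be extracted independently: pages run concurrently in a thread pool, and results are merged in page order so the output does not depend on which call finishes first. `extract_pages_in_parallel` and `fake_extract` are hypothetical names; a real run would pass a function wrapping the Gemini call and would need to respect API rate limits.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def extract_pages_in_parallel(
    pages: Dict[int, str],
    extract_fn: Callable[[str], Dict[str, Any]],
    max_workers: int = 4,
) -> Dict[str, Any]:
    """Run extract_fn on every page concurrently, then merge results in page order.

    Merge rules (same idea as FieldExtractionChain._merge_results):
    - None values are ignored;
    - lists are unioned, keeping first-seen order and skipping duplicates;
    - other values are overwritten, so the highest page number wins.
    """
    page_nums = sorted(pages)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, whatever order the calls finish in
        per_page = list(pool.map(lambda p: extract_fn(pages[p]), page_nums))

    merged: Dict[str, Any] = {}
    for page_result in per_page:
        for key, value in page_result.items():
            if value is None:
                continue
            if isinstance(value, list):
                bucket = merged.setdefault(key, [])
                for item in value:
                    if item not in bucket:
                        bucket.append(item)
            else:
                merged[key] = value
    return merged


def fake_extract(text: str) -> Dict[str, Any]:
    """Hypothetical stand-in for one per-page LLM extraction call."""
    return {"taxes": text.split(","), "last_text": text, "missing": None}


print(extract_pages_in_parallel({2: "GST,Stamp Duty", 1: "CIT,GST"}, fake_extract))
```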

LLM:

- `ChatGoogleGenerativeAI(model="gemini-2.5-flash")` is used for both free-form reasoning and structured schema extraction; it was also chosen for its long context window.

- A specific schema must be provided with each call to enforce it; otherwise Gemini may not conform to the JSON structure, which can require additional invocations.
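In the notebook, the schema is enforced by passing `FinancialFields` to `with_structured_output`. For a client without that feature, a standard-library-only sketch of the same idea is to validate the raw JSON reply against the required fields (taken from `FIELD_EXTRACTION_PROMPT`) and re-invoke the model when it does not conform. `conforms`, `call_with_retry` and the fake `llm_call` are hypothetical.

```python
import json
from typing import Any, Callable, Dict, Optional

# Expected types, mirroring the "Required fields" block of FIELD_EXTRACTION_PROMPT
REQUIRED_FIELDS: Dict[str, type] = {
    "corporate_income_tax_2024_billion": float,
    "corporate_income_tax_yoy_percent": float,
    "total_topups_2024_billion": float,
    "operating_revenue_taxes_list": list,
    "latest_actual_fiscal_position_billion": float,
}


def conforms(payload: Dict[str, Any]) -> bool:
    """True if every required key is present and is null or of the expected type."""
    for key, expected in REQUIRED_FIELDS.items():
        if key not in payload:
            return False
        value = payload[key]
        if value is None:
            continue
        # JSON numbers may parse as int; accept them for float fields (but not bools)
        if expected is float and isinstance(value, (int, float)) and not isinstance(value, bool):
            continue
        if not isinstance(value, expected):
            return False
    return True


def call_with_retry(
    llm_call: Callable[[str], str], prompt: str, max_attempts: int = 3
) -> Optional[Dict[str, Any]]:
    """Re-invoke llm_call until its reply parses as JSON and conforms, or give up."""
    for _ in range(max_attempts):
        raw = llm_call(prompt)
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            continue  # not JSON at all: ask again
        if isinstance(payload, dict) and conforms(payload):
            return payload
    return None


# Fake model: first reply is malformed, second conforms (all fields null)
replies = iter(["not json", json.dumps({k: None for k in REQUIRED_FIELDS})])
print(call_with_retry(lambda _prompt: next(replies), "prompt"))
```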


In [9]:
import warnings
warnings.filterwarnings("ignore")

import os
import json
import yaml
from typing import Any, Dict, List
from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI

# Load GOOGLE_API_KEY and LangSmith settings from .env before importing project utils
load_dotenv()

from utils.prompts import FIELD_EXTRACTION_PROMPT
from utils.model import FinancialFields


class FieldExtractionChain:
    """
    Uses Gemini (LangChain wrapper) to extract structured key metrics
    from selected pages in a parsed Budget document.
    """

    def __init__(self, model: str = "gemini-2.5-flash"):
        self.model = ChatGoogleGenerativeAI(
            model=model,
            temperature=0,
            convert_system_message_to_human=True,
        ).with_structured_output(FinancialFields)

    def run(
        self,
        structured_text: Dict[str, Any],
        target_pages: List[int],
        prompt_template: str,
    ) -> Dict[str, Any]:
        """
        Iterate through selected pages, extract structured data per page,
        and merge into a single dictionary.
        """
        elements = structured_text.get("elements", [])
        results = {
            "corporate_income_tax_2024_billion": None,
            "corporate_income_tax_yoy_percent": None,
            "total_topups_2024_billion": None,
            "operating_revenue_taxes_list": [],
            "latest_actual_fiscal_position_billion": None,
        }
        for page in target_pages:
            page_text = "\n\n".join(
                el["content_markdown"] for el in elements if el["page"] == page
            ).strip()
            if not page_text:
                print(f"[INFO] Skipping empty page {page}")
                continue

            prompt = prompt_template.format(text_block=page_text)

            structured_resp = self.model.invoke(prompt)
            page_data = structured_resp.model_dump()
            results = self._merge_results(results, page_data)
            print(f"[INFO] Structured extraction successful for page {page}")

        return results

    def _merge_results(self, base: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
        """Merge structured page-level results."""
        for k, v in update.items():
            if v is None:
                continue
            if isinstance(v, list):
                base[k].extend([x for x in v if x not in base[k]])
            else:
                base[k] = v
        return base


def main():
    # Load config
    with open("config.yaml", "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    structured_json_fp = config.get("extracted_text_path")
    target_pages = config.get("target_pages_part_1", [])
    gemini_model = config.get("gemini_model", "gemini-2.5-flash")

    if not structured_json_fp or not os.path.exists(structured_json_fp):
        raise FileNotFoundError("extracted_text_path not found in config.yaml or file missing.")

    with open(structured_json_fp, "r", encoding="utf-8") as f:
        structured_text = json.load(f)

    extractor = FieldExtractionChain(model=gemini_model)
    results = extractor.run(structured_text, target_pages, FIELD_EXTRACTION_PROMPT)

    # Save results to JSON
    output_fp = config["extracted_field_path"]
    with open(output_fp, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    print(f"Field extraction complete. Results saved to: {output_fp}")

main()

[INFO] Structured extraction successful for page 5
[INFO] Structured extraction successful for page 6
[INFO] Structured extraction successful for page 8
[INFO] Structured extraction successful for page 20
Field extraction complete. Results saved to: ./data/extracted_field.json


In [12]:
## Show the Field extraction prompt
print(FIELD_EXTRACTION_PROMPT)

You are a financial document analysis assistant.
Think step by step.
1) If there are any tables, attempt to reconstruct from the text. The text is parsed from left to right.
2) Identify and extract the following fields. Perform calculation only if necessary.
3) Output only valid JSON.

Required fields:
{{
  "corporate_income_tax_2024_billion": float,
  "corporate_income_tax_yoy_percent": float,
  "total_topups_2024_billion": float,
  "operating_revenue_taxes_list": [string], 
  "latest_actual_fiscal_position_billion": float
}}

Guidelines:
- These are the field meaning:
    "corporate_income_tax_2024_billion" <Total sum of amount of Corporate Income Tax in 2024>
    "corporate_income_tax_yoy_percent": <YOY percentage difference of Corp Income Tax in 2024>
    "total_topups_2024_billion": <Total amount of top ups in 2024>
    "operating_revenue_taxes_list": <List of taxes mentioned in section “Operating Revenue>
    "latest_actual_fiscal_position_billion":<Latest Actual Fiscal Position 

In [10]:
import json
with open("data/extracted_field.json", "r", encoding="utf-8") as f:
    data = json.load(f)
data

{'corporate_income_tax_2024_billion': 28.38,
 'corporate_income_tax_yoy_percent': 23.0,
 'total_topups_2024_billion': 20.352,
 'operating_revenue_taxes_list': ['Corporate Income Tax',
  'Other Taxes',
  'Vehicle Quota Premiums',
  'Personal Income Tax',
  'Assets Taxes',
  'Betting Taxes',
  'Goods and Services Tax',
  'Foreign Worker Levy',
  'Water Conservation Tax',
  'Land Betterment Charge',
  'Annual Tonnage Tax',
  'Withholding Tax',
  'Customs, Excise and Carbon Taxes',
  'Motor Vehicle Taxes',
  'Stamp Duty'],
 'latest_actual_fiscal_position_billion': -3.57}

# Part 2

Steps 2.1 and 2.2 are executed in the same loop.

## Step 2.1 – Normalization via Local MCP

A `normalize_date` tool is exposed locally using FastMCP and executed via a subprocess.

- Ensures deterministic ISO date conversion independent of LLM parsing noise.
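The server code in `mcp_server/normalize_date_server.py` is not shown here. A minimal sketch of the deterministic core such a tool could wrap, assuming budget-style dates like "16 February 2024": try a fixed list of `strptime` formats and return an ISO string. The format list and function body are assumptions; in a FastMCP server, a function like this would be registered with the `@mcp.tool()` decorator.

```python
import re
from datetime import datetime
from typing import Optional

# Formats tried in order (an illustrative assumption); DD/MM/YYYY is day-first
DATE_FORMATS = ("%d %B %Y", "%d %b %Y", "%B %d, %Y", "%Y-%m-%d", "%d/%m/%Y")


def normalize_date(date_string: str) -> Optional[str]:
    """Return the date as ISO YYYY-MM-DD, or None if no known format matches."""
    # Collapse whitespace and drop a trailing full stop left over from a sentence
    cleaned = re.sub(r"\s+", " ", date_string).strip().rstrip(".")
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(cleaned, fmt).date().isoformat()
        except ValueError:
            continue
    return None


print(normalize_date("16 February 2024"))
print(normalize_date("15 February 2008."))
```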

## Step 2.2 – Temporal Reasoning

- Prompt directs the LLM to reason over normalized dates relative to 2024-01-01.

- Enforces structured output with a Pydantic schema.

Advantages:

- Deterministic, schema-safe outputs.

- Anchoring the reasoning to a fixed reference date (2024-01-01) makes results reproducible.
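The notebook asks the LLM to assign the status. As a deterministic cross-check, the comparison against 2024-01-01 can also be done in code. The prompt does not define "Ongoing" for a single date, so the rule below is an assumption: a date equal to the reference, or a hypothetical start/end range that contains it, counts as Ongoing. `classify_status` is not part of the pipeline.

```python
from datetime import date
from typing import Optional

REFERENCE_DATE = date(2024, 1, 1)


def classify_status(
    start_iso: str, end_iso: Optional[str] = None, reference: date = REFERENCE_DATE
) -> str:
    """Classify an ISO date (or date range) as Expired, Ongoing or Upcoming.

    Assumed rule: entirely before the reference -> Expired; entirely after
    -> Upcoming; otherwise (equal to it, or a range spanning it) -> Ongoing.
    """
    start = date.fromisoformat(start_iso)
    end = date.fromisoformat(end_iso) if end_iso else start
    if end < reference:
        return "Expired"
    if start > reference:
        return "Upcoming"
    return "Ongoing"


print(classify_status("2024-02-16"), classify_status("2008-02-15"))
```

For the two normalized dates in the cell output below (2024-02-16 and 2008-02-15), this returns Upcoming and Expired, which agrees with the LLM's labels.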

In [14]:
import warnings
warnings.filterwarnings("ignore")

import os
import json
import yaml
from typing import List, Dict, Any

from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.prebuilt import create_react_agent

from utils.prompts import REASONING_NORMALIZED_DATE_PROMPT, NORMALIZED_DATE_AGENT_PROMPT
from utils.model import ExtractedTextModel, Part2AnswerSchema, ConfigModel
from mcp_client.mcp_client import MCPClient


class BudgetDatePipeline:
    def __init__(self, config: ConfigModel, extracted: ExtractedTextModel):
        self.config = config
        self.extracted = extracted
        
        # Initialize MCP client
        self.mcp_client = MCPClient("mcp_server/normalize_date_server.py")

        # Register tool
        @tool("normalize_date", return_direct=True)
        def normalize_date(date_string: str) -> str:
            """Normalize budget-style dates to ISO (YYYY-MM-DD)."""
            return self.mcp_client.call(date_string)

        # LLM setup
        self.model = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            temperature=0,
            convert_system_message_to_human=True,
        )

        # Same model, constrained to the Part2AnswerSchema output
        self.structured_model = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            temperature=0,
            convert_system_message_to_human=True,
        ).with_structured_output(Part2AnswerSchema)

        # ReAct agent that calls the MCP-backed normalize_date tool
        self.agent = create_react_agent(
            model=self.model,
            tools=[normalize_date],
            prompt=NORMALIZED_DATE_AGENT_PROMPT,
        )

    def process_pages(self) -> List[Dict[str, Any]]:
        page_results: List[Dict[str, Any]] = []

        for page in self.config.target_pages_part_2:
            page_elems = [e for e in self.extracted.elements if e["page"] == page]

            for elem in page_elems:
                text = elem.get("content_markdown", "")
                if not text.strip():
                    continue

                # Step 2.1: normalize the relevant date via the agent and MCP tool
                norm_response = self.agent.invoke({"messages": [{"role": "user", "content": "Text: " + text}]})
                normalized_output = norm_response["messages"][-1].content
                # Step 2.2: classify the date relative to 2024-01-01 (structured output)
                summary_response = self.structured_model.invoke(
                    REASONING_NORMALIZED_DATE_PROMPT.format(normalized_output, text)
                )

                page_results.append(summary_response.model_dump())
                # Prints the new record followed by all results collected so far
                print(summary_response.model_dump(), page_results)
        return page_results

# Load environment variables
load_dotenv()
if not os.getenv("GOOGLE_API_KEY"):
    raise EnvironmentError("Missing GOOGLE_API_KEY in .env")

# Load YAML config
with open("config.yaml", "r") as f:
    cfg_dict = yaml.safe_load(f)
config = ConfigModel(**cfg_dict)

# Load extracted JSON
with open(config.extracted_text_path, "r") as f:
    extracted_json = json.load(f)

extracted = ExtractedTextModel(**extracted_json)

# Initialize pipeline
pipeline = BudgetDatePipeline(config=config, extracted=extracted)

# Run pipeline 
results = pipeline.process_pages()
print(results)

# Save final combined output
os.makedirs(config.output_dir, exist_ok=True)
output_path = os.path.join(config.output_dir, "normalized_part2.json")

with open(output_path, "w", encoding="utf-8") as f:
    json.dump(results, f, indent=2, ensure_ascii=False)

print(f"\n Full normalization + summarization results saved to: {output_path}")

{'original_text': 'Distributed on Budget Day: 16 February 2024.', 'normalized_date': '2024-02-16', 'status': 'Upcoming'} [{'original_text': 'Distributed on Budget Day: 16 February 2024.', 'normalized_date': '2024-02-16', 'status': 'Upcoming'}]
{'original_text': 'Estate Duty does not apply to a person who dies after 15 February 2008.', 'normalized_date': '2008-02-15', 'status': 'Expired'} [{'original_text': 'Distributed on Budget Day: 16 February 2024.', 'normalized_date': '2024-02-16', 'status': 'Upcoming'}, {'original_text': 'Estate Duty does not apply to a person who dies after 15 February 2008.', 'normalized_date': '2008-02-15', 'status': 'Expired'}]
[{'original_text': 'Distributed on Budget Day: 16 February 2024.', 'normalized_date': '2024-02-16', 'status': 'Upcoming'}, {'original_text': 'Estate Duty does not apply to a person who dies after 15 February 2008.', 'normalized_date': '2008-02-15', 'status': 'Expired'}]

 Full normalization + summarization results saved to: ./data\norma

In [18]:
print(REASONING_NORMALIZED_DATE_PROMPT)

Given the normalized date {} and text below, Categorize the date as Expired, Ongoing, or Upcoming with respect to 2024-01-01.You must output the original text, the normalized date, and the status.Text: {}.


In [19]:
print(NORMALIZED_DATE_AGENT_PROMPT)

INSTRUCTIONS:
Given the text, normalize the date as ISO Format. Only the relevant date relating to document distribution date or to date of the estate duty.


# Part 3

The system employs a supervisor pattern implemented in LangGraph.

Agent Roles:

- Revenue Agent: Extracts and analyzes all government income sources (Corporate Tax, GST, Excise, etc.).

- Expenditure Agent: Interprets expenditure allocations and identifies funding mechanisms (e.g., Future Energy Fund, Infrastructure Support).

Supervisor Responsibilities:

- Parse user query.

- Route sub-queries to agents based on domain relevance.

- Aggregate responses into a cohesive answer.

## Comparison: Fan-in/Fan-out vs Supervisor Pattern

In this project, two multi-agent orchestration strategies were considered: **Fan-in/Fan-out** and the **Supervisor Pattern**. Both have distinct strengths and trade-offs that influence scalability, accuracy, and interpretability.

### Fan-in/Fan-out Pattern with Router as the Supervisor

![Fan-in/Fan-out Pattern](images/fan-in-out.png)

This approach decomposes a complex query into smaller sub-tasks through a router or decomposer. Each sub-task is dispatched to specialized agents that run in parallel. The results are then aggregated by an aggregator model to form the final answer.  

It is highly structured and deterministic, allowing parallel execution and efficient handling of well-defined problems. However, its rigidity can lead to missed context if the router fails to allocate subtasks correctly. The absence of a review loop also means that partial or incomplete results might go unnoticed. Fan-in/fan-out patterns are best suited for pipelines where sub-tasks are independent, and the decomposition logic is fixed (for example, extraction of multiple unrelated entities).
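A minimal sketch of fan-out/fan-in, with a toy keyword router standing in for the LLM decomposer. `route`, `fan_out_fan_in` and the stub agents are hypothetical. The empty result for an unmatched query illustrates the rigidity described above: if the router allocates nothing, nothing is answered.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict

Agent = Callable[[str], str]


def route(query: str) -> Dict[str, str]:
    """Toy keyword router: assign the query to every agent whose keywords appear."""
    keywords = {
        "revenue": ("revenue", "tax", "income"),
        "expenditure": ("expenditure", "fund", "spending"),
    }
    lowered = query.lower()
    return {name: query for name, words in keywords.items() if any(w in lowered for w in words)}


def fan_out_fan_in(
    query: str,
    agents: Dict[str, Agent],
    aggregate: Callable[[Dict[str, str]], str],
) -> str:
    tasks = route(query)
    with ThreadPoolExecutor() as pool:
        # Fan-out: dispatch every sub-task concurrently
        futures = {name: pool.submit(agents[name], task) for name, task in tasks.items()}
        # Fan-in: block until every partial answer is available
        partials = {name: fut.result() for name, fut in futures.items()}
    return aggregate(partials)


def join_sorted(parts: Dict[str, str]) -> str:
    """Stub aggregator: combine partial answers in a stable order."""
    return "; ".join(f"{k}={v}" for k, v in sorted(parts.items()))


agents = {"revenue": lambda q: "R", "expenditure": lambda q: "E"}
print(fan_out_fan_in("List revenue streams and explain the Future Energy Fund", agents, join_sorted))
print(repr(fan_out_fan_in("Summarise the budget", agents, join_sorted)))
```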

### Supervisor Pattern

![Supervisor Pattern](images/supervisor.png)

The supervisor pattern introduces a central coordinating agent that maintains reasoning control throughout the workflow. The supervisor dynamically decides which specialized agent to call, interprets intermediate responses, and iteratively refines the conversation until a complete answer is formed.  

This approach offers interpretability and flexibility, allowing dynamic re-prompting and oversight of agent performance. However, it can be less parallel and may risk missing simultaneous sub-tasks if not explicitly designed for concurrency. It is ideal for open-ended reasoning tasks where interpretive understanding and contextual synthesis are more important than raw throughput.

However, this pattern is harder to implement in LangGraph: the out-of-the-box agent implementation is simplistic, and even with detailed context engineering it is difficult to instruct the agent to address every part of a multi-part question. Other designs, such as attaching a decomposition tool to the agent, were tried but also failed to achieve good results. The pattern might be easier to implement with CrewAI, but LangGraph (with customization) is used here for ease of telemetry.


### Chosen Approach

This project adopts the **Supervisor Pattern**. The reasoning tasks—identifying revenue streams and explaining how the Future Energy Fund will be supported—require interpretive synthesis across multiple fiscal dimensions. The supervisor model provides traceable, iterative reasoning, ensuring that both revenue and expenditure perspectives are considered before producing the final comprehensive answer.

The supervisor is a custom-made agent with the following logic:

- It takes in the previous messages and state and determines the next step, including which agent to call.
- The next step is `FINISH` once a terminal condition is met: either the LLM judges that the question has been fully answered and all relevant workers have been called, or `max_loop` is reached. Routing to the same worker twice in a row also forces `FINISH`.
- Its only job is routing: it never answers the question directly. When it finishes, a separate reviewer call combines the revenue and expenditure results into the final answer.
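The control flow of the custom supervisor can be sketched without LangGraph. This is a simplified, hypothetical version of the routing rules in `supervisor_node`: stop on `FINISH`, on reaching `max_loop`, or when the same worker is chosen twice in a row. `decide` stands in for the structured `Router` LLM call, and the final reviewer step is omitted.

```python
from typing import Callable, Dict, Optional

FINISH = "FINISH"

# decide(query, results_so_far, last_node) -> next worker name or FINISH
Decider = Callable[[str, Dict[str, str], Optional[str]], str]


def run_supervisor(
    query: str,
    decide: Decider,
    workers: Dict[str, Callable[[str], str]],
    max_loop: int = 4,
) -> Dict[str, str]:
    """Route between workers and return each worker's latest output."""
    results: Dict[str, str] = {}
    last_node: Optional[str] = None
    for _ in range(max_loop):  # hard cap, playing the role of config.max_loop
        nxt = decide(query, results, last_node)
        # Stop when the router says so or repeats the previous worker
        if nxt == FINISH or nxt == last_node:
            break
        # The supervisor only routes; the worker produces the content
        results[nxt] = workers[nxt](query)
        last_node = nxt
    return results


def decide(query: str, results: Dict[str, str], last_node: Optional[str]) -> str:
    """Toy router: call each worker once, then finish."""
    for name in ("revenue_node", "expenditure_node"):
        if name not in results:
            return name
    return FINISH


workers = {
    "revenue_node": lambda q: "revenue notes",
    "expenditure_node": lambda q: "expenditure notes",
}
print(run_supervisor("What are the revenue streams?", decide, workers))
```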

### How is max_loop chosen?

- Business Logic: There are only two agents, so the same agent is called at most two or three times.
- Rate Limits: `max_loop` is kept low to accommodate rate limits and manage costs.
- Empirical Performance: The lowest `max_loop` that did not cause a drop in performance during testing was chosen.

### Revenue and Expenditure Agents

There are also several ways to supply document content to the agents:
- Include the full document in the context.
- Include only the fields extracted in the previous step.
- Search for the pages that contain revenue or expenditure information.

Search is the most thorough method, but it has challenges. The task is extractive summarization rather than ranking: every relevant part of the document has to be found, not just the best matches. Fuzzy or embedding search returns a ranked list, which makes it hard to use here; a similarity threshold could be applied instead, at the cost of some accuracy.

Therefore, case-insensitive keyword search with regular expressions is used, and the agent is prompted to try different keywords. This could be improved further, for example with hybrid search, stemming and lemmatization, or a small trained classifier.
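The MCP search server's code is not shown. A minimal sketch of the case-insensitive regex search described above, run over the Part 1 JSON structure (`elements` with `page` and `content_markdown`): `re.escape` makes the keyword literal, and each hit returns the page number with a surrounding snippet. The function signature and `window` parameter are assumptions; the notebook's tool passes a file path (`structured_json_path`) rather than the loaded dict.

```python
import re
from typing import Any, Dict, List


def search_budget_text(structured: Dict[str, Any], keyword: str, window: int = 80) -> List[Dict[str, Any]]:
    """Case-insensitive literal keyword search over the pages extracted in Part 1.

    Returns one hit per match: the page number plus `window` characters of
    context on each side of the match.
    """
    pattern = re.compile(re.escape(keyword), re.IGNORECASE)
    hits: List[Dict[str, Any]] = []
    for element in structured.get("elements", []):
        text = element.get("content_markdown", "")
        for match in pattern.finditer(text):
            start = max(0, match.start() - window)
            end = min(len(text), match.end() + window)
            hits.append({"page": element["page"], "snippet": text[start:end]})
    return hits


doc = {
    "elements": [
        {"page": 1, "content_markdown": "Goods and Services Tax rose."},
        {"page": 2, "content_markdown": "No match here."},
    ]
}
print(search_budget_text(doc, "services tax", window=0))
```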


## Observability

LangSmith is used to trace LLM calls and outputs. It sends traces to LangChain's hosted LangSmith service (or, alternatively, to a self-hosted LangSmith deployment on one's own cluster).

Other options, such as OpenTelemetry, can be used for local tracing.

In [None]:
import warnings
warnings.filterwarnings("ignore")

import os
import json
import yaml
import argparse
from typing import List, Dict, Any

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from langchain_core.tools import tool
from langchain.agents import create_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from langsmith import traceable

from utils.model import RevenueOutput, ExpenditureOutput, FinalAnswer, Router, BudgetState, Part3ConfigModel
from utils.prompts import REVENUE_AGENT_PROMPT, EXPENDITURE_AGENT_PROMPT, SUPERVISOR_SYSTEM_PROMPT, REVIEWER_SYSTEM_PROMPT, ROUTER_PROMPT
from mcp_client.mcp_client import MCPClient


from dotenv import load_dotenv
load_dotenv()

def load_config(path: str = "config.yaml") -> Part3ConfigModel:
    if not os.path.exists(path):
        raise FileNotFoundError(f"Configuration file not found: {path}")
    with open(path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    return Part3ConfigModel(**cfg)


class BudgetSupervisorPipeline:
    def __init__(self, config: Part3ConfigModel):
        self.config = config
        self.llm = ChatGoogleGenerativeAI(
            model=self.config.model_name,
            temperature=0,
        )

        self.RevenueParser = self.llm.with_structured_output(RevenueOutput)
        self.ExpenditureParser = self.llm.with_structured_output(ExpenditureOutput)
        self.Reviewer = self.llm.with_structured_output(FinalAnswer)

        self.mcp_client = MCPClient(server_path="mcp_server/search_budget_server.py")
        self._init_tools()
        self._init_agents()
        self._init_graph()

        


    def _init_tools(self):
        mcp_ref = self.mcp_client
        json_path = self.config.extracted_text_path

        @tool("search_budget_text", return_direct=True)
        def search_budget_text(keyword: str) -> List[Dict[str, Any]]:
            """Call the MCP budget text search server to find text containing the keyword."""
            # Send both keyword and file path to the MCP server
            return mcp_ref.call(
                "search_budget_text",
                arguments={"keyword": keyword, "structured_json_path": json_path},
            )

        self.search_budget_text = search_budget_text

    def _init_agents(self):
        self.RevenueAgent = create_agent(
            model=self.llm,
            tools=[self.search_budget_text],
            system_prompt=REVENUE_AGENT_PROMPT,
        )

        self.ExpenditureAgent = create_agent(
            model=self.llm,
            tools=[self.search_budget_text],
            system_prompt=EXPENDITURE_AGENT_PROMPT,
        )

    @traceable(name="SupervisorNode")
    def supervisor_node(self, state: Dict[str, Any]) -> Command:
        user_query = state.get("query", "")
        loop_count = state.get("loop_count", 0)
        last_node = state.get("last_node")
        revenue = state.get("revenue", "")
        expenditure = state.get("expenditure", "")
        cur_reasoning = state.get("cur_reasoning", "")

        print(f"\n[Supervisor Loop {loop_count}] - Deciding next worker...")
        print(f"last_node={last_node}")

        if loop_count >= self.config.max_loop:
            print("Max loop count reached. Ending process.")
            goto = "FINISH"
            reasoning = "Stopped after maximum allowed loops."
        else:
            members_dict = {
                "revenue_node": "Handles revenue/tax/income-related queries.",
                "expenditure_node": "Handles expenditure/fund/budget-related queries.",
            }
            worker_info = "\n\n".join(
                [f"WORKER: {k}\nDESCRIPTION: {v}" for k, v in members_dict.items()]
            ) + "\n\nWORKER: FINISH\nDESCRIPTION: Stop when query fully answered."

            system_prompt = SUPERVISOR_SYSTEM_PROMPT.format(worker_info= worker_info)

            router_llm = self.llm.with_structured_output(Router)
            messages = [
                {"role": "system", "content": system_prompt},
                {
                    "role": "user",
                    "content": ROUTER_PROMPT.format(
                        user_query= user_query,
                        last_node = last_node,
                        loop_count=loop_count,
                        cur_reasoning= cur_reasoning,
                        revenue= revenue or '<empty>',
                        expenditure=expenditure  or '<empty>'
                    ),
                },
            ]
            response = router_llm.invoke(messages)
            goto = response["next"]
            reasoning = response["reasoning"]

        print(f"Supervisor routed to: {goto}")
        print(f"Reasoning: {reasoning}")

        if last_node == goto and goto != "FINISH":
            print("Same route repeated → forcing FINISH.")
            goto = "FINISH"
            reasoning += " (Stopped because same node repeated.)"

        if goto == "FINISH":
            combined = self.Reviewer.invoke(REVIEWER_SYSTEM_PROMPT.format(revenue=revenue, expenditure= expenditure, user_query= user_query))
            print("Supervisor completed summary.\n")
            return Command(
                goto=END,
                update={"final_output": combined, "cur_reasoning": reasoning},
            )

        return Command(
            goto=goto,
            update={
                "query": user_query,
                "cur_reasoning": reasoning,
                "loop_count": loop_count + 1,
                "last_node": goto,
            },
        )

    @traceable(name="RevenueAgentNode")
    def node_revenue(self, state: Dict[str, Any]) -> Dict[str, Any]:
        print("Running Revenue Agent...")
        resp = self.RevenueAgent.invoke({"messages": [{"role": "user", "content": "Past actions: " + state["query"]}]})
        raw = resp["messages"][-1].content
        try:
            structured = self.RevenueParser.invoke(
                f"Convert to JSON with field 'revenue_streams': {raw}"
            )
            revenue_value = getattr(structured, "revenue_streams", raw)
        except Exception as e:
            print(f"RevenueParser failed: {e}")
            revenue_value = raw
        # Keep the other worker's result and carry the query forward (same shape as node_expenditure)
        return {"revenue": revenue_value, "expenditure": state.get("expenditure"), "query": "Past actions: " + state["query"]}

    @traceable(name="ExpenditureAgentNode")
    def node_expenditure(self, state: Dict[str, Any]) -> Dict[str, Any]:
        print("Running Expenditure Agent...")
        resp = self.ExpenditureAgent.invoke({"messages": [{"role": "user", "content": "Past actions: " + state["query"]}]})
        raw = resp["messages"][-1].content
        try:
            structured = self.ExpenditureParser.invoke(
                f"Convert to JSON with field 'expenditure_streams': {raw}"
            )
            expenditure_value = getattr(structured, "expenditure_streams", raw)
        except Exception as e:
            print(f"ExpenditureParser failed: {e}")
            expenditure_value = raw
        return {"expenditure": expenditure_value, "revenue": state.get("revenue"), "query": "Past actions: " + state["query"]}

    def _init_graph(self):
        graph = StateGraph(BudgetState)
        graph.add_node("supervisor", self.supervisor_node)
        graph.add_node("revenue_node", self.node_revenue)
        graph.add_node("expenditure_node", self.node_expenditure)
        graph.add_edge(START, "supervisor")
        graph.add_edge("revenue_node", "supervisor")
        graph.add_edge("expenditure_node", "supervisor")
        graph.add_edge("supervisor", END)
        self.app = graph.compile()

    def run(self, user_query: str):
        print("\nSTARTING GRAPH EXECUTION\n")
        result = self.app.invoke({"query": user_query, "loop_count": 0, "last_node": None})        
        return result["final_output"].model_dump()
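The supervisor's stopping rules can be exercised without any LLM. The sketch below is a framework-free simulation, not the pipeline itself: `run_supervisor_loop` and the scripted `route` callable are hypothetical, the repeated-node guard mirrors the one in `supervisor_node`, and the `max_loops` cap is an assumed stand-in (the visible code only increments `loop_count`).

```python
from typing import Callable, Dict, List, Optional

# Hypothetical router signature: (last_node, loop_count) -> next node name or "FINISH".
Router = Callable[[Optional[str], int], str]


def run_supervisor_loop(
    route: Router,
    workers: Dict[str, Callable[[], None]],
    max_loops: int = 5,  # assumed cap; not taken from the pipeline
) -> List[str]:
    """Simulate supervisor routing and return the sequence of decisions."""
    history: List[str] = []
    last_node: Optional[str] = None
    for loop_count in range(max_loops):
        goto = route(last_node, loop_count)
        # Same guard as supervisor_node: repeating the previous worker forces FINISH.
        if goto == last_node and goto != "FINISH":
            goto = "FINISH"
        history.append(goto)
        if goto == "FINISH":
            return history
        workers[goto]()  # run the chosen worker, then loop back to the supervisor
        last_node = goto
    history.append("FINISH")  # cap reached without the router choosing FINISH
    return history


# Usage: a scripted router that tries to call revenue_node twice in a row.
calls: List[str] = []
workers = {
    "revenue_node": lambda: calls.append("revenue"),
    "expenditure_node": lambda: calls.append("expenditure"),
}
script = iter(["revenue_node", "revenue_node"])
print(run_supervisor_loop(lambda last, n: next(script), workers))  # ['revenue_node', 'FINISH']
```

Keeping the guard in plain Python like this makes it easy to unit-test routing edge cases before spending tokens on real agent calls.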


In [None]:
print(REVENUE_AGENT_PROMPT)

You are the Revenue Agent, search strictly only for government revenue information.
You will be provided a context of what the user query is and the searches which were done so far, but you must continue solely with the search for revenue. Use `search_budget_text` to find information about revenue, taxes, NIRC, or income.
Search using keywords and synonyms like revenue, income, GST, etc., until you get the answer.
Strictly only search using ONE keyword at a time. Try minimum 5 different keywords, but only ONE keyword at a time. Summarize key government revenue sources and their values.
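The prompt asks the agent to call `search_budget_text` with one keyword per call and at least five keywords. A minimal sketch of that discipline, assuming a plain substring search: the `search_budget_text` below is a hypothetical stand-in over placeholder chunks (not the notebook's real tool or real budget data), and deduplicating hits across keywords is an added design choice.

```python
from typing import Dict, List

# Placeholder chunks standing in for the extracted PDF text (not real budget data).
CHUNKS = [
    "placeholder: operating revenue includes Corporate Income Tax",
    "placeholder: GST collections",
    "placeholder: NIRC contribution",
    "placeholder: development expenditure",
]


def search_budget_text(keyword: str) -> List[str]:
    """Hypothetical stand-in for the real tool: case-insensitive substring match."""
    kw = keyword.lower()
    return [chunk for chunk in CHUNKS if kw in chunk.lower()]


def search_one_keyword_at_a_time(
    keywords: List[str], min_keywords: int = 5
) -> Dict[str, List[str]]:
    """Issue one search per keyword and keep only chunks not already returned."""
    if len(keywords) < min_keywords:
        raise ValueError(f"expected at least {min_keywords} keywords, got {len(keywords)}")
    seen: set = set()
    results: Dict[str, List[str]] = {}
    for kw in keywords:  # exactly one keyword per tool call
        new_hits = [c for c in search_budget_text(kw) if c not in seen]
        seen.update(new_hits)
        results[kw] = new_hits
    return results


print(search_one_keyword_at_a_time(["revenue", "tax", "GST", "NIRC", "income"]))
```

Dropping already-seen chunks keeps the agent's final summary from repeating the same passage once per synonym.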



In [22]:
print(EXPENDITURE_AGENT_PROMPT)

You are the Expenditure Agent, search strictly only for government expenditure information.
You will be provided a context of what the user query is and the searches which were done so far, but you must continue solely with the search for expenditure. Use `search_budget_text` to find information about government expenditures, spending, or budgets.
Search using keywords and synonyms like expenditure, spending, fund, or allocation.
Strictly only search using ONE keyword at a time. Try minimum 5 different keywords, but only ONE keyword at a time. Summarize fund allocations and how they are supported.
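The Revenue and Expenditure prompts are near-duplicates that differ only in their domain words, so it is easy for one prompt to end up naming the other's domain. One option, sketched here with a hypothetical `AGENT_PROMPT_TEMPLATE` and `build_agent_prompt` (neither exists in the notebook), is to render both from a single template so each domain word is supplied exactly once.

```python
# Hypothetical shared template; the notebook defines the two prompts separately.
AGENT_PROMPT_TEMPLATE = (
    "You are the {role} Agent, search strictly only for government {domain} information.\n"
    "You will be provided a context of what the user query is and the searches which were "
    "done so far, but you must continue solely with the search for {domain}. "
    "Use `search_budget_text` to find information about {topics}.\n"
    "Search using keywords and synonyms like {synonyms}.\n"
    "Strictly only search using ONE keyword at a time. Try minimum 5 different keywords. "
    "{summary_instruction}"
)


def build_agent_prompt(domain: str, topics: str, synonyms: str, summary_instruction: str) -> str:
    """Fill the shared template so the domain word cannot drift between sentences."""
    return AGENT_PROMPT_TEMPLATE.format(
        role=domain.capitalize(),
        domain=domain,
        topics=topics,
        synonyms=synonyms,
        summary_instruction=summary_instruction,
    )


revenue_prompt = build_agent_prompt(
    "revenue", "revenue, taxes, NIRC, or income", "revenue, income, GST",
    "Summarize key government revenue sources and their values.",
)
expenditure_prompt = build_agent_prompt(
    "expenditure", "government expenditures, spending, or budgets",
    "expenditure, spending, fund, or allocation",
    "Summarize fund allocations and how they are supported.",
)
print(revenue_prompt)
```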



In [23]:
print(SUPERVISOR_SYSTEM_PROMPT)

You are a SUPERVISOR managing specialized government budget agents.
{worker_info}

Choose which worker should act next. Each worker performs a task and returns results.
When you believe the query is fully answered, route to FINISH.
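The `{worker_info}` placeholder is filled in elsewhere in the notebook. As an illustration only, a renderer like the hypothetical `render_worker_info` below could list every route name the supervisor may return; the worker descriptions are assumptions paraphrased from the agent prompts.

```python
from typing import Dict

# Hypothetical descriptions; the notebook's real worker_info text is not shown here.
WORKERS: Dict[str, str] = {
    "revenue_node": "searches the budget text for government revenue (taxes, NIRC, income).",
    "expenditure_node": "searches the budget text for government expenditure and fund allocations.",
}

# Abbreviated copy of the supervisor prompt printed above.
supervisor_template = (
    "You are a SUPERVISOR managing specialized government budget agents.\n"
    "{worker_info}\n\n"
    "Choose which worker should act next. Each worker performs a task and returns results.\n"
    "When you believe the query is fully answered, route to FINISH."
)


def render_worker_info(workers: Dict[str, str]) -> str:
    """List every route name the supervisor is allowed to return, plus FINISH."""
    lines = [f"- {name}: {desc}" for name, desc in workers.items()]
    lines.append("- FINISH: stop and hand the findings to the reviewer.")
    return "Available workers:\n" + "\n".join(lines)


print(supervisor_template.format(worker_info=render_worker_info(WORKERS)))
```

Using the exact node names (`revenue_node`, `expenditure_node`) in the listing keeps the router's choices aligned with the node names registered in `_init_graph`.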


In [24]:
print(REVIEWER_SYSTEM_PROMPT)


Directly answer the query concisely using revenue and expenditure information:

REVENUE:
{revenue}

EXPENDITURE:
{expenditure}

QUERY:
{user_query}



In [25]:
print(ROUTER_PROMPT)

USER QUERY:
{user_query}

- Last node executed: {last_node}
- Current loop count: {loop_count}
- Current reasoning: {cur_reasoning}
- Revenue findings:
{revenue}
- Expenditure findings:
{expenditure}
Decide which worker should act next. Return JSON.
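`supervisor_node` reads `response["next"]` and `response["reasoning"]`, which suggests the router is configured to return a dict with those two keys; how that is configured is not shown here. If the reply instead arrives as raw JSON text, a defensive parser along these lines (the hypothetical `parse_router_reply`) can validate it against the three routes seen in the logs and fall back to `FINISH` on anything unexpected.

```python
import json
from typing import Tuple

# Route names taken from the supervisor logs in this notebook.
ALLOWED_ROUTES = {"revenue_node", "expenditure_node", "FINISH"}


def parse_router_reply(raw: str) -> Tuple[str, str]:
    """Parse a raw JSON router reply into (next, reasoning), defaulting to FINISH."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return "FINISH", "Router reply was not valid JSON."
    if not isinstance(data, dict):
        return "FINISH", "Router reply was not a JSON object."
    goto = data.get("next")
    reasoning = str(data.get("reasoning", ""))
    if not isinstance(goto, str) or goto not in ALLOWED_ROUTES:
        return "FINISH", f"Unknown route {goto!r}; stopping. {reasoning}".strip()
    return goto, reasoning


print(parse_router_reply('{"next": "revenue_node", "reasoning": "Revenue question."}'))
```

Failing closed to `FINISH` mirrors the pipeline's own repeated-node guard: an unexpected router reply ends the run instead of looping.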


**Note:** Tracing and telemetry use LangSmith cloud; a local tracing option could be implemented instead.

![Langsmith Tracing](images/langsmith.png)
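The local tracing option mentioned above is not implemented in the notebook. A minimal sketch, assuming a JSON Lines file is enough: the hypothetical `local_trace` decorator records each call's name, status, and duration, and could sit where `@traceable(name=...)` is used today. Unlike LangSmith, it does not capture inputs, outputs, or nested spans.

```python
import functools
import json
import time
from typing import Any, Callable


def local_trace(name: str, path: str = "traces.jsonl") -> Callable:
    """Hypothetical local stand-in for @traceable: append one JSON line per call."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            status = "ok"
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                status = f"error: {exc!r}"
                raise  # re-raise so the graph still sees the failure
            finally:
                record = {
                    "name": name,
                    "status": status,
                    "duration_s": round(time.perf_counter() - start, 6),
                    "timestamp": time.time(),
                }
                with open(path, "a", encoding="utf-8") as fh:
                    fh.write(json.dumps(record) + "\n")
        return wrapper
    return decorator
```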


In [16]:

config = load_config("config.yaml")
pipeline = BudgetSupervisorPipeline(config)


In [31]:
query = "What are the key government revenue streams, and how will the Budget for the Future Energy Fund be supported?"
result = pipeline.run(query)
print("Final Result: ", result["direct_answer"])


STARTING GRAPH EXECUTION


[Supervisor Loop 0] - Deciding next worker...
last_node=None
Supervisor routed to: revenue_node
Reasoning: The user is asking about key government revenue streams, which is handled by the revenue_node. After this, the expenditure_node will be needed to address the question about the Future Energy Fund.
Running Revenue Agent...

[Supervisor Loop 1] - Deciding next worker...
last_node=revenue_node
Supervisor routed to: expenditure_node
Reasoning: The user is asking about how the Budget for the Future Energy Fund will be supported, which is an expenditure-related query. The revenue streams have already been addressed by the previous node.
Running Expenditure Agent...

[Supervisor Loop 2] - Deciding next worker...
last_node=expenditure_node
Supervisor routed to: FINISH
Reasoning: Both parts of the user query have been addressed. The revenue streams have been identified, and the support for the Future Energy Fund has been found in the expenditure findings. The ne

In [26]:
query = "What are the key government expenditure and income streams?"
result = pipeline.run(query)
print("Final Result: ", result["direct_answer"])


STARTING GRAPH EXECUTION


[Supervisor Loop 0] - Deciding next worker...
last_node=None
Supervisor routed to: expenditure_node
Reasoning: The user is asking about both expenditure and income streams. I will start by addressing the expenditure part of the query using the expenditure_node.
Running Expenditure Agent...

[Supervisor Loop 1] - Deciding next worker...
last_node=expenditure_node
Supervisor routed to: FINISH
Reasoning: The user asked for both expenditure and income streams. The previous execution of the expenditure_node provided information on both operating revenue (income streams) and total expenditure (expenditure streams), thus fully answering the query.
Supervisor completed summary.

Final Result:  The key government income streams are Corporate Income Tax, Personal Income Tax, and Goods and Services Tax. The key government expenditure streams are Operating Expenditure and Development Expenditure.


In [27]:
query = "What are the key government main operating revenue streams?"
result = pipeline.run(query)
print("Final Result: ", result["direct_answer"])


STARTING GRAPH EXECUTION


[Supervisor Loop 0] - Deciding next worker...
last_node=None
Supervisor routed to: revenue_node
Reasoning: The user is asking about government revenue streams, which is directly handled by the revenue_node.
Running Revenue Agent...

[Supervisor Loop 1] - Deciding next worker...
last_node=revenue_node
Supervisor routed to: FINISH
Reasoning: The revenue_node has provided a comprehensive list and breakdown of the government's main operating revenue streams for FY2023 and FY2024, as well as a definition highlighting the main components. The user's query is fully answered.
Supervisor completed summary.

Final Result:  The key government main operating revenue streams are Corporate Income Tax, Personal Income Tax, and Goods and Services Tax.
