# HTX AI Engineering Take-Home Assessment

Done by Lee Zhan Peng

# Setup and Dependencies

In [None]:
!pip install PyMuPDF==1.26.5
!pip install python-multipart==0.0.20
!pip install langchain-core==1.0.2
!pip install langchain_openai==1.0.1
!pip install langchain-community==0.4.1
!pip install mcp==1.20.0
!pip install python-dateutil==2.9.0.post0
!pip install langgraph==1.0.2

In [1]:
import json
from dateutil import parser
from pydantic import create_model, Field

from langchain_openai import ChatOpenAI
from langchain_core.messages import ToolMessage, SystemMessage, HumanMessage, AIMessage
from langchain_community.document_loaders.parsers.pdf import PyMuPDFParser
from langchain_core.document_loaders import Blob
from langgraph.graph import StateGraph, END

from mcp.types import Tool, TextContent
from typing import TypedDict

In [None]:
PDF_PATH = "../documents/fy2024_analysis_of_revenue_and_expenditure.pdf"

LLM_API_KEY="<REDACTED>"
LLM_BASE_URL="<REDACTED>"
LLM_MODEL_NAME="gemini-2.0-flash"

# Part 1: Document Extraction & Prompt Engineering with LangChain

### PDF Reading Functions

To read the PDF, my initial idea was to extract text with a general-purpose extractor such as pypdf, then classify each page (possibly with an LLM) by the structures it contains, e.g. tables, graphs, or plain text.

With that classification, each page would be reread with the tool best suited to its structure, e.g. Camelot for tables and PyMuPDF for text.

However, the text and tables in the given PDF are generally well organised. To reduce research overhead, I decided to pick a single reader that offers the best balance of speed and accuracy, and chose `PyMuPDF`.

- The benchmarks at https://github.com/py-pdf/benchmarks suggest that PyMuPDF is the fastest reader, with good extraction quality.
- The paper at https://arxiv.org/pdf/2410.09871 also suggests that PyMuPDF performs well, so it was chosen without much hesitation.

In [3]:
# api/app.py: L43-54
def read_pdf(file_path):
    pdf_texts = {}
    
    with open(file_path, 'rb') as f:
        pdf_bytes = f.read()
    
    blob = Blob.from_data(pdf_bytes, mime_type="application/pdf")
    
    # Named pdf_parser so it does not shadow the dateutil `parser` imported above
    pdf_parser = PyMuPDFParser()
    documents = pdf_parser.parse(blob)
    
    for i, doc in enumerate(documents, start=1):
        pdf_texts[i] = doc.page_content
    
    return pdf_texts

# api/app.py: L87-88
def get_pages_text(pdf_texts, pages):
    texts = []
    
    for page_num in pages:
        if page_num in pdf_texts:
            texts.append(f"\n--- Page {page_num} ---\n{pdf_texts[page_num]}")
    
    return "\n".join(texts)
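
As a quick illustration of how the page lookup behaves, the sketch below repeats `get_pages_text` so that it runs on its own and feeds it a toy dictionary in place of the output of `read_pdf`. The page contents are placeholders, not text from the actual PDF.

```python
def get_pages_text(pdf_texts, pages):
    # Same logic as the cell above, repeated so this snippet is self-contained
    texts = []
    for page_num in pages:
        if page_num in pdf_texts:
            texts.append(f"\n--- Page {page_num} ---\n{pdf_texts[page_num]}")
    return "\n".join(texts)


# Toy stand-in for the dictionary returned by read_pdf (placeholder content)
toy_pdf_texts = {5: "text of page five", 6: "text of page six"}

# Page 99 is not in the dictionary, so it is skipped without raising
combined = get_pages_text(toy_pdf_texts, [5, 6, 99])
print(combined)
```

A requested page that does not exist is silently dropped, so a caller that needs to know about missing pages would have to check for them separately.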

### Prompts

The general strategy is to provide examples, similar to few-shot prompting, to guide the LLM on how each field should be extracted.

I considered chain-of-thought prompting, but it did not seem appropriate for data extraction, so I left it out and instead enforce reasoning through the LLM's structured output later.

I have also included the instructions for the `date_normaliser` tool (exposed to the LLM as `normalise_date`), in anticipation of Part 2 of the assessment.

In [4]:
# api\llm\prompts\data_extraction.py
DATA_EXTRACTION_SYSTEM_MESSAGE="""You are a data extraction assistant. Your task is to extract specific information from documents with high accuracy.

Guidelines:
- Extract ONLY the requested information, nothing more
- Return the minimal exact value without surrounding context or explanatory text
- Be precise and literal with numbers, values and quotes. Meaning if the units exist, include them.
- If the requested information is not found, respond with null
- For lists, extract all items that match the criteria
- Do not infer or calculate values not explicitly stated in the document
- Do not include explanatory phrases like "does not apply to" or "refers to" - extract only the specific data point requested

Examples:
- If asked for "the date related to Maxwell's concert" from "Maxwell wanted to host his concert on 28 July 2018", extract only "28 July 2018"
- If asked for "the price" from "The total cost is $500", extract only "$500"
- If a list of items is requested, then sure, extract all relevant items exactly as they appear

CRITICAL - Tool Usage for Dates:
- You have access to a normalise_date tool that MUST be used for ANY date extraction
- If you extract ANY date (day, month, year, or any combination), you are REQUIRED to call the normalise_date tool
- Examples of dates that require the tool: "3 January 2028", "11 December 2009", etc.
- The tool will format the date correctly - you cannot skip this step
- After calling the tool, use the normalised result as your value
- Failure to use the normalise_date tool for date extractions is incorrect"""

DATA_EXTRACTION_USER_MESSAGE="""Extract the exact string information from the provided text:

Request: {request}

Document text:
{text}

MANDATORY: If the extracted information contains a date in ANY format, you MUST call the normalise_date tool before providing your final answer. This is not optional - all dates must be normalised using the tool."""

DATA_EXTRACTION_FINAL_INSTRUCTION="""Provide your final answer with: original_text (the exact extracted text), value (formatted extracted text), and reason.
Your formatted value must have the type of: {output_type}
If units are involved, say 20 million, the final formatted value should be 20000000 if int is given or 20000000.0 if float. This is important, so if the extracted text is unclear, refer back to the full text.

Also, you should refer back to the full text if the extracted text has made the value unclear as to which is correct."""

### Data Extraction Chain

A custom chain that handles data extraction.
The extraction function takes the requested field, the output data type, and the text from the PDF.

The chain also accepts tools, which are needed for Part 2 of the assessment.

In my original code, the chain takes in an MCP server. However, since running a local MCP server from a Jupyter notebook is not ideal, the chain here takes the tools directly.

In [5]:
# api\llm\chains\data_extraction.py
class DataExtractionChain:
    def __init__(self, model, tools=None):
        self.llm = ChatOpenAI(
            api_key=LLM_API_KEY,
            base_url=LLM_BASE_URL,
            model=model,
            temperature=0.0
        )

        if tools:
            # Basically the schema of the tools
            self.tool_definitions = [
                {
                    "type": "function",
                    "function": {
                        "name": tool["definition"].name,
                        "description": tool["definition"].description,
                        "parameters": tool["definition"].inputSchema
                    }
                } for tool in tools
            ]

            # The functions to run for the tools
            self.tool_executables = {tool["definition"].name: tool["executable"] for tool in tools}

    def extract(self, request, output_type, text):
        # Build initial messages
        user_message = DATA_EXTRACTION_USER_MESSAGE.format(
            request=request,
            text=text
        ) + "\n\n" + DATA_EXTRACTION_FINAL_INSTRUCTION.format(output_type=output_type)

        messages = [
            SystemMessage(content=DATA_EXTRACTION_SYSTEM_MESSAGE),
            HumanMessage(content=user_message)
        ]

        return self._extract_with_structure(messages, output_type)
    
    # For part 2 of the assessment
    def extract_with_tools(self, request, output_type, text):
        llm_with_tools = self.llm.bind(tools=self.tool_definitions)

        messages = [
            SystemMessage(content=DATA_EXTRACTION_SYSTEM_MESSAGE),
            HumanMessage(content=DATA_EXTRACTION_USER_MESSAGE.format(
                request=request,
                text=text
            ))
        ]

        # Setting a max iteration so that if the LLM decides to go crazy and loop the tool call many times, it will be capped.
        max_iterations = 10
        for _ in range(max_iterations):
            response = llm_with_tools.invoke(messages)

            if not response.tool_calls:
                # No tool calls
                messages.append(AIMessage(content=response.content))
                messages.append(HumanMessage(content=DATA_EXTRACTION_FINAL_INSTRUCTION.format(output_type=output_type)))
                return self._extract_with_structure(messages, output_type)

            messages.append(response)

            for tool_call in response.tool_calls:
                # Printing here so that we can see it when we run part 2.
                print(f'Tool calling executed! Running {tool_call["name"]}...')

                tool = self.tool_executables[tool_call["name"]]
                tool_result = tool(tool_call["args"])

                tool_message = ToolMessage(
                    content=str(tool_result),
                    tool_call_id=tool_call["id"]
                )
                messages.append(tool_message)

        # Iteration cap reached without a final answer: ask for the structured output
        # with what we have instead of silently returning None.
        messages.append(HumanMessage(content=DATA_EXTRACTION_FINAL_INSTRUCTION.format(output_type=output_type)))
        return self._extract_with_structure(messages, output_type)

    def _extract_with_structure(self, messages, output_type):
        format_class = self._get_format_class(output_type)
        llm_with_structure = self.llm.with_structured_output(format_class)
        return llm_with_structure.invoke(messages)

    def _get_format_class(self, output_type):
        type_map = {
            "str": str,
            "int": int,
            "float": float,
            "list[str]": list[str],
            "list[int]": list[int],
            "list[float]": list[float],
        }
        
        value_type = type_map[output_type]
        
        return create_model(
            "Format",
            original_text=(str, ...),
            value=(value_type | None, None),
            reason=(str, ...),
        )


### Fields to extract

We create a list of fields to iterate through later.

In [6]:
# Extracted from assessment PDF
fields_to_extract = [
    {
        "pages": [5],
        "description": "Amount of Corporate Income Tax in 2023",
        "output_type": "float"
    },
    {
        "pages": [5],
        "description": "YOY percentage difference of Corp Income Tax in 2023",
        "output_type": "float"
    },
    {
        "pages": [20],
        "description": "Total amount of top ups in 2024",
        "output_type": "float"
    },
    {
        "pages": [5, 6],
        "description": "List of taxes mentioned in section 'Operating Revenue'",
        "output_type": "list[str]"
    },
    {
        "pages": [8],
        "description": "Latest Actual Fiscal Position in billions",
        "output_type": "float"
    }
]

## Part 1 Execution

We read the PDF, iterate through the fields to extract, and pass each of them into our `DataExtractionChain`.

In [7]:
pdf_texts = read_pdf(PDF_PATH)

extraction_chain = DataExtractionChain(model=LLM_MODEL_NAME)

for fields in fields_to_extract:
    pages, description, output_type = fields["pages"], fields["description"], fields["output_type"]

    print(f'Extracting \"{description}\", Pages: {", ".join(map(str, pages))}, Expected data type: <{output_type}>')

    # Combine extracted text if multiple pages are involved
    stringified_pdf_text = get_pages_text(pdf_texts, pages)

    # Because we use structured output in our chain, the output will look like this:
    # {
    #     original_text: str
    #     value: output_type
    #     reason: str
    # }
    output = extraction_chain.extract(
        description,
        output_type,
        stringified_pdf_text
    )
    extracted_value = output.value

    print(f"EXTRACTED: {extracted_value}")
    print(f"DATA TYPE: {type(extracted_value)}")
    print('='*80)

Extracting "Amount of Corporate Income Tax in 2023", Pages: 5, Expected data type: <float>
EXTRACTED: 28400000000.0
DATA TYPE: <class 'float'>
Extracting "YOY percentage difference of Corp Income Tax in 2023", Pages: 5, Expected data type: <float>
EXTRACTED: 17.0
DATA TYPE: <class 'float'>
Extracting "Total amount of top ups in 2024", Pages: 20, Expected data type: <float>
EXTRACTED: 20352000000.0
DATA TYPE: <class 'float'>
Extracting "List of taxes mentioned in section 'Operating Revenue'", Pages: 5, 6, Expected data type: <list[str]>
EXTRACTED: ['Corporate Income Tax', 'Other Taxes', 'Vehicle Quota Premiums', 'Personal Income Tax', 'Assets Taxes', 'Betting Taxes']
DATA TYPE: <class 'list'>
Extracting "Latest Actual Fiscal Position in billions", Pages: 8, Expected data type: <float>
EXTRACTED: 1720000000.0
DATA TYPE: <class 'float'>


# Part 2: Tool Calling & Reasoning Integration

### Tool: Date Normaliser

It would be more appropriate to use LangChain's own tool implementation for this, but my codebase implements the tool with the Python MCP library for a local MCP server, so I will stick to that here.

In this notebook, the tool is passed directly into the LLM call instead of being served through a local MCP server as in my application. The logic stays the same, except that we skip the MCP communication and call the tool directly where applicable.

The tool call for date normalisation is not hard-coded; the LLM decides whether to make it.

The tool definition is the schema presented to the LLM, and the executable is the function that runs when the tool is called.

In [8]:
# api\mcp_servers\tools\date_normaliser.py: L4-17
date_normaliser_definition = Tool(
    name="normalise_date",
    description="Normalise date strings like '21 March 2021' or '4 December 2002' to ISO format (YYYY-MM-DD)",
    inputSchema={
        "type": "object",
        "properties": {
            "date_string": {
                "type": "string",
                "description": "The date string to normalise (e.g., '21 March 2021')"
            }
        },
        "required": ["date_string"]
    }
)

# api\mcp_servers\tools\date_normaliser.py: L19-29
def date_normaliser_executable(arguments):
    date_string = arguments.get("date_string", "")
    try:
        parsed_date = parser.parse(date_string)
        normalized = parsed_date.strftime("%Y-%m-%d")
        return [TextContent(type="text", text=normalized)]
    except Exception as e:
        error_msg = f"Error: Could not parse date '{date_string}'. {str(e)}"
        return [TextContent(type="text", text=error_msg)]
    
date_normaliser = {"definition": date_normaliser_definition, "executable": date_normaliser_executable}
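
The definition/executable pairing can be exercised without an LLM. The sketch below is a dependency-free stand-in, not the code from my application: `normalise_date_stub` and `dispatch` are hypothetical names, the stub uses `datetime.strptime` instead of `dateutil` (so it only understands the "21 March 2021" style), and the tool call is written by hand in the same shape as an entry of `response.tool_calls`.

```python
from datetime import datetime


def normalise_date_stub(arguments):
    # Stand-in executable: standard library only, handles "21 March 2021" style input
    date_string = arguments.get("date_string", "")
    try:
        return datetime.strptime(date_string, "%d %B %Y").strftime("%Y-%m-%d")
    except ValueError as e:
        return f"Error: Could not parse date '{date_string}'. {e}"


# Registry keyed by tool name, mirroring tool_executables in DataExtractionChain
tool_executables = {"normalise_date": normalise_date_stub}


def dispatch(tool_call):
    # tool_call mimics one entry of response.tool_calls: name, args, id
    executable = tool_executables.get(tool_call["name"])
    if executable is None:
        return f"Error: Unknown tool '{tool_call['name']}'"
    return executable(tool_call["args"])


# Hand-written tool call; in the chain this comes from the LLM response
fake_call = {"name": "normalise_date", "args": {"date_string": "21 March 2021"}, "id": "call_0"}
result = dispatch(fake_call)
print(result)
```

Returning an error string rather than raising keeps the conversation going, so the LLM can see the failure in the tool message and decide how to proceed.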

### Date fields to extract

We create a list of fields to iterate through later.

In [9]:
# Extracted from assessment PDF
date_fields_to_extract = [
    {
        "pages": [1],
        "description": "The document's distribution date",
        "output_type": "str"
    },
    {
        "pages": [36],
        "description": "The date relating to estate duty",
        "output_type": "str"
    }
]

### Date Classifier Chain

For this part of the assessment, we will additionally use an LLM to reason and classify the extracted date against `2024-01-01`.

As the extracted values are single dates rather than date ranges, we assume the following:
- Expired: The date is before `2024-01-01`
- Upcoming: The date is after `2024-01-01`
- Ongoing: The date is `2024-01-01`
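
For reference, the same three rules can be written deterministically. The notebook uses an LLM for this step, so the sketch below is only a cross-check for the LLM's answer; `classify_date` is a hypothetical helper and assumes both inputs are ISO strings (YYYY-MM-DD).

```python
from datetime import date


def classify_date(normalised_date, reference_date="2024-01-01"):
    # Both arguments are ISO strings (YYYY-MM-DD), as produced by normalise_date
    target = date.fromisoformat(normalised_date)
    reference = date.fromisoformat(reference_date)
    if target < reference:
        return "Expired"
    if target > reference:
        return "Upcoming"
    return "Ongoing"


for candidate in ("2023-12-31", "2024-01-01", "2024-01-02"):
    print(candidate, classify_date(candidate))
```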

We will write a set of prompts, plus another custom `DateClassifierChain`, to help us with this.

In [10]:
# api\llm\prompts\date_classifier.py
DATE_CLASSIFIER_SYSTEM_MESSAGE = """You are a date classification assistant. Your task is to classify dates relative to a reference date.

You must categorise each date into one of three states:
- Expired: The date is before the reference date
- Upcoming: The date is after the reference date
- Ongoing: The date is the same as the reference date

Guidelines:
- Compare the normalised date against the reference date carefully
- Be precise with date comparisons, considering day, month, and year
- Use simple comparison: before = Expired, after = Upcoming, same = Ongoing"""

DATE_CLASSIFIER_USER_MESSAGE = """Classify the following normalised date against the reference date:

Normalised Date: {normalised_date}
Reference Date: {reference_date}

Analyse the date, determine and give reason on whether it is Expired, Upcoming, or Ongoing relative to the reference date."""

In [11]:
# api\llm\chains\date_classifier.py
class DateClassifierChain:
    def __init__(self, model):
        # Use ChatOpenAI for OpenAI compatible models
        self.llm = ChatOpenAI(
            api_key=LLM_API_KEY,
            base_url=LLM_BASE_URL,
            model=model,
            temperature=0.0
        )

    def classify(self, normalised_date, reference_date="2024-01-01"):
        messages = [
            SystemMessage(content=DATE_CLASSIFIER_SYSTEM_MESSAGE),
            HumanMessage(content=DATE_CLASSIFIER_USER_MESSAGE.format(
                normalised_date=normalised_date,
                reference_date=reference_date
            ))
        ]

        format_class = self._get_format_class()
        llm_with_structure = self.llm.with_structured_output(format_class)
        return llm_with_structure.invoke(messages)

    def _get_format_class(self):
        return create_model(
            "DateClassification",
            normalised_date=(str, ...),
            reference_date=(str, ...),
            reason=(str, ...),
            classification=(str, ...),
        )

## Part 2 Execution

Unlike Part 1, we pass the `date_normaliser` tool into the `DataExtractionChain`.

We will also classify the dates after date extraction is done.

In [12]:
extraction_chain = DataExtractionChain(model=LLM_MODEL_NAME, tools=[date_normaliser])
date_classifier_chain = DateClassifierChain(model=LLM_MODEL_NAME)

expected_outputs = []

for fields in date_fields_to_extract:
    pages, description, output_type = fields["pages"], fields["description"], fields["output_type"]

    print(f'Extracting \"{description}\", Pages: {", ".join(map(str, pages))}, Expected data type: <{output_type}>')
    expected_output = {}
    
    # Combine extracted text if multiple pages are involved
    stringified_pdf_text = get_pages_text(pdf_texts, pages)
    
    # Because we use structured output in our chain, the output will look like this:
    # {
    #     original_text: str
    #     value: output_type
    #     reason: str
    # }
    extraction_output = extraction_chain.extract_with_tools(
        description,
        output_type,
        stringified_pdf_text
    )
    extracted_value = extraction_output.value

    print(f"EXTRACTED: {extracted_value}")
    print('-'*10)
    print("Classifying the date...")

    classifier_output = date_classifier_chain.classify(
        normalised_date=extracted_value,
        reference_date="2024-01-01"
    )

    classification = classifier_output.classification
    reason = classifier_output.reason

    print(f"CLASSIFICATION: {classification}")
    print(f"REASONING: {reason}")
    print('='*80)

    expected_output["original_text"] = extraction_output.original_text
    expected_output["normalized_date"] = extracted_value
    expected_output["status"] = classification

    expected_outputs.append(expected_output)

print("Expected output from assessment PDF:")
print(expected_outputs)

Extracting "The document's distribution date", Pages: 1, Expected data type: <str>
Tool calling executed! Running normalise_date...
EXTRACTED: 2024-02-16
----------
Classifying the date...
CLASSIFICATION: Upcoming
REASONING: The normalised date 2024-02-16 is after the reference date 2024-01-01.
Extracting "The date relating to estate duty", Pages: 36, Expected data type: <str>
Tool calling executed! Running normalise_date...
EXTRACTED: 2008-02-15
----------
Classifying the date...
CLASSIFICATION: Expired
REASONING: The normalised date 2008-02-15 is before the reference date 2024-01-01.
Expected output from assessment PDF:
[{'original_text': '16 February 2024', 'normalized_date': '2024-02-16', 'status': 'Upcoming'}, {'original_text': '15 February 2008', 'normalized_date': '2008-02-15', 'status': 'Expired'}]


# Part 3: Multi-Agent Supervisor

For this task, we will create three entities:
- Supervisor
- Revenue Agent
- Expenditure Agent

The Supervisor takes in the user query, delegates the task to one or both agents based on the query, and then consolidates the agents' output.

The implementation will be written with LangGraph.

### AgentState

It's the schema of the state that saves information across the various agents.

In [13]:
# api\llm\graphs\state.py
class AgentState(TypedDict):
    # User question
    query: str

    # Dictionary of PDF text, with key as page number, and value as the text itself
    pdf_text: dict[int, str]

    # We intend to have the revenue/expenditure agent pick up revenue/expenditure from the PDF text and structure it as a dictionary.
    revenue_findings: dict | None
    expenditure_findings: dict | None

    # The output of the decision from the supervisor, which agent to delegate task to.
    supervisor_decision: str

    # The final answer synthesised by the supervisor
    final_answer: str
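
In LangGraph, a node returns only the keys it wants to change, and the update is merged into the shared state; with no reducer declared on a key, the returned value overwrites the stored one. The sketch below imitates that merge in plain Python so it runs without LangGraph. The node functions are stubs with hard-coded return values and `apply_update` is a hypothetical helper, not a LangGraph API.

```python
def supervisor_node(state):
    # Stub: a real node would call the supervisor's routing logic
    return {"supervisor_decision": "revenue"}


def revenue_node(state):
    # Stub: a real node would call the revenue agent on state["pdf_text"]
    return {"revenue_findings": {"revenue_streams": []}}


def apply_update(state, update):
    # Keys in the update overwrite matching keys; everything else is carried over
    return {**state, **update}


# Toy initial state with the same keys as AgentState
state = {
    "query": "toy question",
    "pdf_text": {1: "toy page"},
    "revenue_findings": None,
    "expenditure_findings": None,
    "supervisor_decision": "",
    "final_answer": "",
}

for node in (supervisor_node, revenue_node):
    state = apply_update(state, node(state))

print(state["supervisor_decision"], state["revenue_findings"])
```

Because an agent that was not called never writes its key, `expenditure_findings` stays `None` here, which is why the findings fields are declared as `dict | None`.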

### Supervisor

Job scope:
- Delegate task and route to correct agents for execution.
- Synthesise the final output based on the results from the agents.

We will create a `Supervisor` class that contains functions that the Supervisor is tasked to do.

Prompts specific to the `Supervisor` will be here as well.

In [14]:
# api\llm\prompts\agents\supervisor.py
SUPERVISOR_ROUTING_SYSTEM_MESSAGE = """You are a Supervisor Agent coordinating a team of specialised agents.

Your team consists of:
1. Revenue Agent - Expert in identifying MONEY COMING IN (revenue, income, sales, donations, etc.)
2. Expenditure Agent - Expert in identifying MONEY GOING OUT (spending, costs, expenses, allocations, etc.)

Your responsibilities:
1. Analyse user queries to understand what information is needed
2. Decide which agent(s) should handle the query
3. Synthesise responses from multiple agents into a coherent answer

ROUTING LOGIC:

Route to REVENUE Agent if query asks about:
- Money coming IN: revenue, income, earnings, receipts, collections, sales
- Sources of funding: taxes, fees, donations, grants, sales, subscriptions
- Revenue trends: growth, decline, year-over-year changes
- Revenue categories or breakdowns
- Keywords: "revenue", "income", "sales", "earnings", "collections", "taxes", "fees", "donations", "grants"

Route to EXPENDITURE Agent if query asks about:
- Money going OUT: spending, costs, expenses, outlays, allocations
- Budgets: departmental, program, project, or initiative budgets
- Spending purposes: what money is being spent on
- Cost breakdowns or expense categories
- Keywords: "expenditure", "spending", "costs", "expenses", "budget", "allocation", "fund", "appropriation"

Route to BOTH agents if query asks about:
- Both income AND spending (comprehensive financial overview)
- How spending is funded/supported (requires knowing both sources and uses)
- Financial balance, surplus, deficit (needs revenue vs expenditure)
- Comparisons or relationships between income and spending
- Keywords: Contains BOTH revenue-related AND expenditure-related terms
- Causal questions: "How will X be supported?" (needs expenditure X + revenue sources)

IMPORTANT:
- Base your decision on the INTENT of the query, not just keyword matching
- If unsure, route to both agents (better to have extra information than miss something)
- Consider implicit needs: "Is X affordable?" requires knowing both cost and available funds

Return your decision in this exact format:
{
    "agents_to_call": ["revenue", "expenditure"],  // or ["revenue"] or ["expenditure"]
    "reasoning": "Explanation of why these agents were selected based on query intent",
    "query_type": "revenue_only / expenditure_only / combined"
}"""

SUPERVISOR_ROUTING_USER_MESSAGE = """Analyse this user query and decide which specialised agent(s) should handle it.

User Query: {query}

Return your routing decision."""

SUPERVISOR_SYNTHESIS_SYSTEM_MESSAGE = """You are a Supervisor Agent synthesising findings from specialised agents into a comprehensive answer.

Given all the content, you must be clear and precise in the response you provide. This is especially the case when you are given information that is beyond the scope of the user query.
In such cases, you must only include information that is relevant to the user query.
"""

SUPERVISOR_SYNTHESIS_USER_MESSAGE = """Synthesise the final answer from the findings of the agents.

Original User Query: {query}

Revenue Agent Findings:
{revenue_findings}

Expenditure Agent Findings:
{expenditure_findings}

Instructions:
1. Combine the findings into a comprehensive, well-structured answer
2. Address all aspects of the user's query
3. Cite page numbers when mentioning specific information
4. If agents found contradictions or gaps, note them
5. Structure the answer logically with clear sections
6. If the query asks "how will X be supported", explicitly connect revenue sources to expenditure items
7. When given information beyond the scope of the user query, only include information that is relevant to the user query.

Provide a clear, professional response that directly answers the user's question."""


In [15]:
# api\llm\agents\supervisor.py
class Supervisor:
    def __init__(self, model):
        self.llm = ChatOpenAI(
            api_key=LLM_API_KEY,
            base_url=LLM_BASE_URL,
            model=model,
            temperature=0.0
        )

    def _get_routing_output_structure(self):
        return create_model(
            "RoutingDecision",
            agents_to_call=(list[str], ...),
            reasoning=(str, ...),
            query_type=(str, ...)
        )

    def route_query(self, query):
        messages = [
            SystemMessage(content=SUPERVISOR_ROUTING_SYSTEM_MESSAGE),
            HumanMessage(content=SUPERVISOR_ROUTING_USER_MESSAGE.format(query=query))
        ]

        RoutingDecision = self._get_routing_output_structure()
        routing_llm = self.llm.with_structured_output(RoutingDecision)
        result = routing_llm.invoke(messages)

        return {
            "agents_to_call": result.agents_to_call,
            "reasoning": result.reasoning,
            "query_type": result.query_type
        }

    def synthesise_response(self, query, revenue_findings, expenditure_findings):
        revenue_text = "Not analysed" if revenue_findings is None else json.dumps(revenue_findings)
        expenditure_text = "Not analysed" if expenditure_findings is None else json.dumps(expenditure_findings)

        synthesis_content = SUPERVISOR_SYNTHESIS_USER_MESSAGE.format(
            query=query,
            revenue_findings=revenue_text,
            expenditure_findings=expenditure_text
        )

        messages = [
            SystemMessage(content=SUPERVISOR_SYNTHESIS_SYSTEM_MESSAGE),
            HumanMessage(content=synthesis_content)
        ]

        result = self.llm.invoke(messages)
        return result.content
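
Since `agents_to_call` is typed as a plain `list[str]`, the LLM could in principle return a name that matches no agent. A small guard such as the one below could sit between `route_query` and the graph routing. This is a sketch under that assumption: `KNOWN_AGENTS` and `resolve_agents` are hypothetical names, and the fallback mirrors the prompt's "if unsure, route to both agents" rule.

```python
KNOWN_AGENTS = ("revenue", "expenditure")


def resolve_agents(agents_to_call):
    # Normalise case and whitespace, then keep only names of known agents
    cleaned = [name.strip().lower() for name in agents_to_call]
    selected = [agent for agent in KNOWN_AGENTS if agent in cleaned]
    # Nothing usable came back: fall back to both agents, as the prompt advises
    return selected or list(KNOWN_AGENTS)


print(resolve_agents(["Revenue"]))
print(resolve_agents(["finance"]))
```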

### RevenueAgent

Job scope:
- Given the PDF text, determine any forms of revenue that are mentioned within.
- Each revenue source must be tagged with its amount, unit ("Thousand", "Million", etc.), year, and the page of the PDF where it was retrieved.
- It will also compute the total revenue, if possible.
- It should tell us how confident it is with the result, given the quality of text retrieved from the PDF.

Like the `Supervisor`, it will have its own `RevenueAgent` class, as well as its specific prompts.

In [16]:
# api\llm\prompts\agents\revenue_agent.py
REVENUE_AGENT_SYSTEM_MESSAGE = """You are a specialised Revenue Analysis Agent.

Your expertise is in identifying, extracting, and analysing INCOME, REVENUE, and MONEY INFLOWS.

CORE APPROACH:

1. IDENTIFY REVENUE INDICATORS - Look for money coming IN:
   - Keywords: "revenue", "income", "receipts", "earnings", "collections", "inflows", "sales", "fees"
   - For governments: taxes, duties, fees, grants, transfers
   - For businesses: sales, services, subscriptions, licensing
   - For nonprofits: donations, grants, fundraising
   - For institutions: tuition, endowments, research funding

2. UNDERSTAND THE DOCUMENT FIRST:
   - Scan the structure: Is it a budget? Financial statement? Annual report?
   - Identify the entity type: Government? Business? Nonprofit? Other?
   - Note the organization: Tables? Narratives? Line items? Mixed?
   - Let the document's own terminology guide you

3. EXTRACT WITH CONTEXT:
   - Use the document's exact category names (don't rename or standardize)
   - Find amounts with their units (millions, billions, $, %, etc.)
   - Note time periods: fiscal years, quarters, projections vs actuals
   - Capture who collects it, from whom, and why if stated
   - Look for trends: year-over-year changes, growth rates, comparisons

4. HANDLE TABLES & NARRATIVES:
   - Tables: Extract systematically row by row
   - Narratives: Parse sentences for embedded figures
   - Mixed formats: Prioritize explicit numbers over descriptions
   - Cross-references: Note if document points to other sections/appendices

5. CITE SOURCES:
   - Always include page number
   - Note section headings when present
   - Include surrounding text for ambiguous items
   - If multiple pages have related info, reference all

6. ASSESS CONFIDENCE:
   - High: Clear labels, explicit amounts, standard formats
   - Medium: Requires some interpretation or calculation
   - Low: Ambiguous terminology, unclear units, scattered data

OUTPUT STRUCTURE:
{
    "revenue_streams": [
        {
            "category": "<use exact term from document>",
            "amount": <number or null if not found>,
            "unit": "<million/billion/$/thousand/etc or null>",
            "year": "<fiscal year/period or null>",
            "page": <page number>,
            "context": "<additional details, source description, caveats>"
        }
    ],
    "total_revenue": {"amount": X, "unit": "Y"} or null,
    "key_insights": [
        "Describe patterns, trends, notable observations",
        "Note document structure and how revenue is presented",
        "Highlight any gaps, inconsistencies, or ambiguities"
    ],
    "confidence_level": "high/medium/low",
    "confidence_explanation": "<brief reason for the confidence level>"
}

If information is not found, state what you searched for and where you looked."""

REVENUE_AGENT_USER_MESSAGE = """Based on the user's query, find and analyze revenue-related information.

User Query: {query}

Document Content:
{text}

Please extract all relevant revenue information that helps answer the query."""

In [17]:
# api\llm\agents\revenue_agent.py
class RevenueAgent:
    def __init__(self, model):
        self.llm = ChatOpenAI(
            api_key=LLM_API_KEY,
            base_url=LLM_BASE_URL,
            model=model,
            temperature=0.0
        )

    def _get_revenue_output_structure(self):
        RevenueStreamItem = create_model(
            "RevenueStreamItem",
            category=(str, ...),
            amount=(float | None, None),
            unit=(str | None, None),
            year=(str | None, None),
            page=(int | None, None),
            context=(str | None, None)
        )

        TotalRevenue = create_model(
            "TotalRevenue",
            amount=(float | None, None),
            unit=(str | None, None)
        )

        RevenueFinding = create_model(
            "RevenueFinding",
            revenue_streams=(list[RevenueStreamItem], Field(default_factory=list)),
            total_revenue=(TotalRevenue | None, None),
            key_insights=(list[str], Field(default_factory=list)),
            confidence_level=(str, "medium"),
            confidence_explanation=(str | None, None)
        )

        return RevenueFinding

    def analyse(self, query, pdf_text):
        combined_text = "\n\n".join([
            f"[Page {page_num}]\n{text}"
            for page_num, text in sorted(pdf_text.items())
        ])

        messages = [
            SystemMessage(content=REVENUE_AGENT_SYSTEM_MESSAGE),
            HumanMessage(content=REVENUE_AGENT_USER_MESSAGE.format(query=query, text=combined_text))
        ]

        RevenueFinding = self._get_revenue_output_structure()
        llm_with_structure = self.llm.with_structured_output(RevenueFinding)
        result = llm_with_structure.invoke(messages)

        return {
            "revenue_streams": [stream.model_dump() for stream in result.revenue_streams],
            "total_revenue": result.total_revenue.model_dump() if result.total_revenue else None,
            "key_insights": result.key_insights,
            "confidence_level": result.confidence_level,
            "confidence_explanation": result.confidence_explanation
        }

### ExpenditureAgent

Job scope:
- Given the PDF text, determine any forms of expenditure that are mentioned within.
- Each expenditure item must be tagged with its amount, unit ("Thousand", "Million", etc.), year, type of expenditure (recurring, one-time, etc.) and the page of the PDF it was retrieved from.
- It will also compute the total expenditure, if possible.
- It should tell us how confident it is with the result, given the quality of text retrieved from the PDF.

Like the `Supervisor` and `RevenueAgent`, it has its own class, `ExpenditureAgent`, and its own prompts.
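
Page citations depend on how the text is presented to the model. Below is a minimal sketch of the page-tagging step, mirroring the join used in `RevenueAgent.analyse`. It assumes `pdf_text` maps page numbers to page text; `tag_pages` and the sample pages are hypothetical and only for illustration.

```python
def tag_pages(pdf_text: dict[int, str]) -> str:
    """Join per-page text in page order, prefixing each page with a [Page N] marker."""
    return "\n\n".join(
        f"[Page {page_num}]\n{text}"
        for page_num, text in sorted(pdf_text.items())
    )


# Hypothetical sample data: keys are page numbers, values are extracted text.
sample_pages = {2: "second page text", 1: "first page text"}
combined = tag_pages(sample_pages)
print(combined)
```

Because the markers sit directly above each page's text, the agent can copy the page number into the `page` field of every item it extracts.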

In [18]:
# api/llm/prompts/agents/expenditure_agent.py
EXPENDITURE_AGENT_SYSTEM_MESSAGE = """You are a specialised Expenditure Analysis Agent.

Your expertise is in identifying, extracting, and analysing SPENDING, EXPENDITURE, COSTS, and MONEY OUTFLOWS.

CORE APPROACH:

1. IDENTIFY EXPENDITURE INDICATORS - Look for money going OUT:
   - Keywords: "expenditure", "spending", "costs", "expenses", "outlays", "allocations", "budgets", "appropriations"
   - For governments: budgets, programs, departments, capital projects, transfers
   - For businesses: COGS, operating expenses, capex, R&D, marketing
   - For nonprofits: program expenses, admin costs, fundraising costs
   - For institutions: salaries, facilities, research, student services

2. UNDERSTAND THE DOCUMENT FIRST:
   - Scan the structure: Is it a budget? P&L? Cost breakdown? Spending plan?
   - Identify the entity type: Government? Business? Nonprofit? Other?
   - Note the organization: By department? By function? By project? By account?
   - Let the document's own categories guide you

3. EXTRACT WITH CONTEXT:
   - Use the document's exact category names (don't rename or standardize)
   - Find amounts with their units (millions, billions, $, %, etc.)
   - Note time periods: fiscal years, quarters, planned vs actual
   - Capture purpose, beneficiaries, or objectives when stated
   - Look for funding sources: How is this spending financed?
   - Distinguish: one-time vs recurring, capital vs operating

4. HANDLE TABLES & NARRATIVES:
   - Tables: Extract systematically, noting column headers
   - Narratives: Parse for allocation announcements, spending plans
   - Mixed formats: Cross-reference numbers mentioned in text with tables
   - Hierarchies: Note parent/child relationships (total vs components)

5. CITE SOURCES:
   - Always include page number
   - Note section headings when present
   - Include surrounding context for clarity
   - If spending spans multiple pages, reference all

6. ASSESS CONFIDENCE:
   - High: Clear labels, explicit amounts, well-structured
   - Medium: Requires interpretation or aggregation
   - Low: Ambiguous terms, unclear scope, scattered data

OUTPUT STRUCTURE:
{
    "expenditure_items": [
        {
            "category": "<use exact term from document>",
            "amount": <number or null if not found>,
            "unit": "<million/billion/$/thousand/etc or null>",
            "year": "<fiscal year/period or null>",
            "type": "<one-time/recurring/capital/operating or null>",
            "page": <page number>,
            "purpose": "<what the spending is for, if stated>",
            "funding_source": "<how it's financed, if stated>"
        }
    ],
    "total_expenditure": {"amount": X, "unit": "Y"} or null,
    "key_insights": [
        "Describe patterns, priorities, notable spending areas",
        "Note document structure and how expenditure is organized",
        "Highlight any gaps, inconsistencies, or ambiguities",
        "Note any funding mechanisms or financing approaches mentioned"
    ],
    "confidence_level": "high/medium/low",
    "confidence_explanation": "<brief reason for the confidence level>"
}

If information is not found, state what you searched for and where you looked."""

EXPENDITURE_AGENT_USER_MESSAGE = """Based on the user's query, find and analyze expenditure-related information.

User Query: {query}

Document Content:
{text}

Please extract all relevant expenditure, budget, and spending information that helps answer the query."""


In [19]:
# api/llm/agents/expenditure_agent.py
class ExpenditureAgent:
    def __init__(self, model):
        self.llm = ChatOpenAI(
            api_key=LLM_API_KEY,
            base_url=LLM_BASE_URL,
            model=model,
            temperature=0.0
        )

    def _get_expenditure_output_structure(self):
        ExpenditureItem = create_model(
            "ExpenditureItem",
            category=(str, ...),
            amount=(float | None, None),
            unit=(str | None, None),
            year=(str | None, None),
            type=(str | None, None),
            page=(int | None, None),
            purpose=(str | None, None),
            funding_source=(str | None, None)
        )

        TotalExpenditure = create_model(
            "TotalExpenditure",
            amount=(float | None, None),
            unit=(str | None, None)
        )

        ExpenditureFinding = create_model(
            "ExpenditureFinding",
            expenditure_items=(list[ExpenditureItem], Field(default_factory=list)),
            total_expenditure=(TotalExpenditure | None, None),
            key_insights=(list[str], Field(default_factory=list)),
            confidence_level=(str, "medium"),
            confidence_explanation=(str | None, None)
        )

        return ExpenditureFinding

    def analyse(self, query, pdf_text):
        # Prefix every page with a "[Page N]" marker so the model can cite page numbers
        combined_text = "\n\n".join([
            f"[Page {page_num}]\n{text}"
            for page_num, text in sorted(pdf_text.items())
        ])

        messages = [
            SystemMessage(content=EXPENDITURE_AGENT_SYSTEM_MESSAGE),
            HumanMessage(content=EXPENDITURE_AGENT_USER_MESSAGE.format(query=query, text=combined_text))
        ]

        ExpenditureFinding = self._get_expenditure_output_structure()
        llm_with_structure = self.llm.with_structured_output(ExpenditureFinding)
        result = llm_with_structure.invoke(messages)

        return {
            "expenditure_items": [item.model_dump() for item in result.expenditure_items],
            "total_expenditure": result.total_expenditure.model_dump() if result.total_expenditure else None,
            "key_insights": result.key_insights,
            "confidence_level": result.confidence_level,
            "confidence_explanation": result.confidence_explanation
        }

### MultiAgentGraph

This is the crux of the LangGraph implementation: it defines the complete workflow that is executed when the user passes in a query.

Steps:
1. Pass the user query to the Supervisor for agent routing. If the query calls for revenue streams, income, etc., RevenueAgent is selected; likewise ExpenditureAgent for expenditure.
  - If both agents are selected, route to RevenueAgent first.
  - If only RevenueAgent or only ExpenditureAgent is selected, route to that agent.

2. After RevenueAgent, route to ExpenditureAgent if the Supervisor selected both agents; otherwise route back to the Supervisor for output synthesis.
3. After ExpenditureAgent, route back to the Supervisor for output synthesis. There is no route from ExpenditureAgent to RevenueAgent: when both agents are needed, RevenueAgent has already run.
4. Synthesise the final response via the Supervisor.
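
The steps above can be sketched as a plain function that returns the order in which nodes run for a given routing label. This is only an illustration of the routing order, not the LangGraph implementation; `plan_route` is a hypothetical helper, and it assumes the three labels used in this notebook (`revenue_only`, `expenditure_only`, `combined`).

```python
def plan_route(decision: str) -> list[str]:
    """Return the node order for a supervisor routing label."""
    if decision == "revenue_only":
        agents = ["revenue_agent"]
    elif decision == "expenditure_only":
        agents = ["expenditure_agent"]
    else:
        # Assumption: any other label (e.g. "combined") means both agents run,
        # with RevenueAgent first and ExpenditureAgent second.
        agents = ["revenue_agent", "expenditure_agent"]
    return ["supervisor_route", *agents, "supervisor_synthesise"]


for label in ("revenue_only", "expenditure_only", "combined"):
    print(label, "->", " -> ".join(plan_route(label)))
```

Running the agents one after the other keeps the graph simple; the trade-off is that a combined query waits for RevenueAgent before ExpenditureAgent starts.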

Note that while the graph runs, `run` continuously `yield`s updates so that the progress of the pipeline can be streamed.
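
The streaming pattern can be tried without an LLM or LangGraph. The sketch below assumes each streamed event has the shape `{node_name: partial_state}`, which is how `run` iterates over the graph's events; `stream_progress` and `fake_events` are hypothetical stand-ins for the real method and for `self.graph.stream(initial_state)`.

```python
from typing import Any, Dict, Iterable, Iterator


def stream_progress(
    events: Iterable[Dict[str, Dict[str, Any]]],
    initial_state: Dict[str, Any],
) -> Iterator[Dict[str, Any]]:
    """Merge per-node updates into a running state and yield one update per node."""
    state = dict(initial_state)
    for event in events:
        for node_name, node_output in event.items():
            # Each node returns only the keys it changed, so merge them in.
            state.update(node_output)
            yield {"node": node_name, "keys_updated": sorted(node_output)}
    # Final update carries the fully accumulated state.
    yield {"node": "done", "state": state}


# Hypothetical events standing in for the graph's streamed output.
fake_events = [
    {"supervisor_route": {"supervisor_decision": "revenue_only"}},
    {"revenue_agent": {"revenue_findings": {"revenue_streams": []}}},
    {"supervisor_synthesise": {"final_answer": "stub answer"}},
]

progress = list(stream_progress(fake_events, {"query": "q"}))
for update in progress:
    print(update)
```

Because the function is a generator, a caller such as an API endpoint can forward each update to the client as soon as the corresponding node finishes.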

In [20]:
# api/llm/graphs/multi_agent_graph.py
class MultiAgentGraph:
    def __init__(self, model):
        self.supervisor = Supervisor(model=model)
        self.revenue_agent = RevenueAgent(model=model)
        self.expenditure_agent = ExpenditureAgent(model=model)
        self.graph = self._build_graph()

    def _build_graph(self):
        workflow = StateGraph(AgentState)

        workflow.add_node("supervisor_route", self._supervisor_route_node)
        workflow.add_node("revenue_agent", self._revenue_agent_node)
        workflow.add_node("expenditure_agent", self._expenditure_agent_node)
        workflow.add_node("supervisor_synthesise", self._supervisor_synthesise_node)

        workflow.set_entry_point("supervisor_route")

        workflow.add_conditional_edges(
            "supervisor_route",
            self._route_to_agents,
            {
                "revenue_only": "revenue_agent",
                "expenditure_only": "expenditure_agent",
                "both": "revenue_agent"
            }
        )

        workflow.add_conditional_edges(
            "revenue_agent",
            self._after_revenue_routing,
            {
                "expenditure": "expenditure_agent",
                "synthesise": "supervisor_synthesise"
            }
        )

        workflow.add_edge("expenditure_agent", "supervisor_synthesise")
        workflow.add_edge("supervisor_synthesise", END)

        return workflow.compile()

    def _supervisor_route_node(self, state):
        decision = self.supervisor.route_query(state["query"])
        return {"supervisor_decision": decision["query_type"]}

    def _revenue_agent_node(self, state):
        findings = self.revenue_agent.analyse(
            query=state["query"],
            pdf_text=state["pdf_text"]
        )
        return {"revenue_findings": findings}

    def _expenditure_agent_node(self, state):
        findings = self.expenditure_agent.analyse(
            query=state["query"],
            pdf_text=state["pdf_text"]
        )
        return {"expenditure_findings": findings}

    def _supervisor_synthesise_node(self, state):
        final_answer = self.supervisor.synthesise_response(
            query=state["query"],
            revenue_findings=state.get("revenue_findings"),
            expenditure_findings=state.get("expenditure_findings")
        )
        return {"final_answer": final_answer}

    def _route_to_agents(self, state):
        decision = state["supervisor_decision"]
        if "revenue_only" in decision:
            return "revenue_only"
        elif "expenditure_only" in decision:
            return "expenditure_only"
        else:
            return "both"

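    def _after_revenue_routing(self, state):
        decision = state["supervisor_decision"]
        # Mirror _route_to_agents: every label other than "revenue_only" reached
        # this node through the "both" branch, so ExpenditureAgent still has to run.
        if "revenue_only" in decision:
            return "synthesise"
        else:
            return "expenditure"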

    def run(self, query, pdf_text):
        initial_state = {
            "query": query,
            "pdf_text": pdf_text,
            "revenue_findings": None,
            "expenditure_findings": None,
            "supervisor_decision": "",
            "final_answer": ""
        }

        accumulated_state = initial_state.copy()

        for event in self.graph.stream(initial_state):
            for node_name, node_output in event.items():
                accumulated_state.update(node_output)

                if node_name == "supervisor_route":
                    yield {"type": "routing", "decision": accumulated_state["supervisor_decision"]}

                elif node_name == "revenue_agent":
                    findings = node_output.get("revenue_findings", {})
                    yield {
                        "type": "revenue_analysis",
                        "findings": findings,
                        "num_streams": len(findings.get("revenue_streams", [])),
                        "confidence_level": findings.get("confidence_level"),
                        "confidence_explanation": findings.get("confidence_explanation")
                    }

                elif node_name == "expenditure_agent":
                    findings = node_output.get("expenditure_findings", {})
                    yield {
                        "type": "expenditure_analysis",
                        "findings": findings,
                        "num_items": len(findings.get("expenditure_items", [])),
                        "confidence_level": findings.get("confidence_level"),
                        "confidence_explanation": findings.get("confidence_explanation")
                    }

                elif node_name == "supervisor_synthesise":
                    yield {"type": "synthesis"}
                    yield {
                        "type": "final_result",
                        "final_answer": node_output.get("final_answer", ""),
                        "revenue_findings": accumulated_state.get("revenue_findings"),
                        "expenditure_findings": accumulated_state.get("expenditure_findings")
                    }

## Part 3 Execution

We initialise the MultiAgentGraph and test it with queries that require different agents.

In [21]:
# Extracted from assessment PDF
query_for_both_agents = "What are the key government revenue streams, and how will the Budget for the Future Energy Fund be supported?"

# Curated to test for fun
query_for_revenue_agent = "What are the key government revenue streams?"
query_for_expenditure_agent = "What are the major government expenditures?"

queries = [query_for_both_agents, query_for_revenue_agent, query_for_expenditure_agent]

In [29]:
multi_agent_graph = MultiAgentGraph(model=LLM_MODEL_NAME)

agent_name_mapping = {
    "revenue_only": "RevenueAgent",
    "expenditure_only": "ExpenditureAgent",
    "combined": "RevenueAgent and ExpenditureAgent"
}

for query in queries:
    print(f"QUERY: {query}")
    print()
    execute_multi_agent_pipeline = multi_agent_graph.run(query=query, pdf_text=pdf_texts)
    print("Supervisor will begin routing decision...")
    for update in execute_multi_agent_pipeline:
        update_type = update["type"]
        if update_type == "routing":
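            decision = update["decision"]
            # Fall back to the raw label if the supervisor returns an unmapped value
            print(f"Supervisor has decided to route to: {agent_name_mapping.get(decision, decision)}")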
            print()

        elif update_type == "revenue_analysis":
            num_streams = update.get('num_streams', 0)
            # `or` also covers a key that is present but set to None
            confidence_level = update.get('confidence_level') or 'N/A'
            print("REVENUE Agent has completed its task!")
            print(f"Number of revenue streams found: {num_streams}")
            print(f"Confidence level of the Agent: {confidence_level.upper()}")
            print()

        elif update_type == "expenditure_analysis":
            num_items = update.get('num_items', 0)
            # `or` also covers a key that is present but set to None
            confidence_level = update.get('confidence_level') or 'N/A'
            print("EXPENDITURE Agent has completed its task!")
            print(f"Number of expenditure items found: {num_items}")
            print(f"Confidence level of the Agent: {confidence_level.upper()}")
            print()

        elif update_type == "final_result":
            print("Supervisor has collated and synthesised a final response!")
            print()
            print("RESPONSE:")
            print(update.get("final_answer", ""))

    print('='*80)

QUERY: What are the key government revenue streams, and how will the Budget for the Future Energy Fund be supported?

Supervisor will begin routing decision...
Supervisor has decided to route to: RevenueAgent and ExpenditureAgent

REVENUE Agent has completed its task!
Number of revenue streams found: 14
Confidence level of the Agent: HIGH

EXPENDITURE Agent has completed its task!
Number of expenditure items found: 22
Confidence level of the Agent: HIGH

Supervisor has collated and synthesised a final response!

RESPONSE:
The key government revenue streams include:

*   **Corporate Income Tax:** $28.38 billion (FY2023) (p. 5)
*   **Personal Income Tax:** $17.5 billion (FY2023), estimated $18.1 billion (FY2024) (pp. 5, 13)
*   **Goods and Services Tax (GST):** $16.4 billion (FY2023), estimated $19.4 billion (FY2024) (pp. 6, 13)
*   **Assets Taxes:** $5.9 billion (FY2023), estimated $6.7 billion (FY2024) (pp. 6, 13)
*   **Other Taxes:** $8.8 billion (FY2023) (p. 5). This includes Foreign