In [32]:
import json
import os
from typing import TypedDict, Optional, Dict, Any

from docling.datamodel.document import ConversionResult
from dotenv import load_dotenv
from langchain.chains import create_retrieval_chain
from langchain.output_parsers import ResponseSchema, StructuredOutputParser
from langchain.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_docling import DoclingLoader
from langchain_docling.loader import ExportType
from langchain_openai import ChatOpenAI, OpenAI, OpenAIEmbeddings

from document_extractor import FinancialDocumentProcessor

# Populate os.environ from a local .env file (if one is found) so that the
# os.getenv("OPENAI_API_KEY") lookup in ResultEvaluator can find the key.
load_dotenv()


True

In [31]:
class ResultEvaluator():
    """Judges whether extracted financial figures look right.

    Two kinds of evaluation are offered:

    * rule-based sanity checks on an extracted dictionary
      (``evaluate_bs_item`` and ``evaluate_is_item``), and
    * an LLM-backed check that compares an extracted dictionary with the
      source document text (``evaluate``).

    Every evaluation method returns a dict shaped like
    ``{"verdict": "Correct" | "Incorrect", "justification": str}``.
    """

    def __init__(self, 
        model_name: str = "gpt-4o-mini",
        openai_api_key: Optional[str] = None,
        temperature: float = 0.0,
        max_tokens: int = 512):
        """Store the model settings and build the LLM client.

        Args:
            model_name: OpenAI chat model used by ``evaluate``.
            openai_api_key: API key; when omitted, the ``OPENAI_API_KEY``
                environment variable is used.
            temperature: Sampling temperature forwarded to the model.
            max_tokens: Completion token limit forwarded to the model.
        """
        self.model_name = model_name
        self.api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
        self.temperature = temperature
        self.max_tokens = max_tokens

        # The default model is a chat model, which the legacy completions
        # wrapper (`OpenAI`) cannot call; use the chat-completions client.
        self.llm = ChatOpenAI(
            api_key=self.api_key,
            model=self.model_name,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )

    @staticmethod
    def _run_checks(item, checks):
        """Apply ``(predicate, failure_message)`` pairs in order and build a verdict.

        The first predicate that evaluates falsy produces an "Incorrect"
        verdict carrying its message. A predicate that cannot be evaluated
        (missing key, ``None`` or otherwise non-comparable value, or ``item``
        not being subscriptable) also produces "Incorrect" instead of raising.
        Explicit checks are used rather than ``assert`` so the validation is
        not stripped when Python runs with ``-O``.
        """
        for predicate, failure_message in checks:
            try:
                passed = predicate(item)
            except (KeyError, TypeError) as exc:
                return {
                    "verdict": "Incorrect",
                    "justification": f"Could not verify check '{failure_message}': {exc!r}",
                }
            if not passed:
                return {"verdict": "Incorrect", "justification": failure_message}
        return {"verdict": "Correct", "justification": "All checks passed"}

    @staticmethod
    def _parse_verdict(raw_text):
        """Turn raw LLM output into a ``{"verdict", "justification"}`` dict.

        The outermost ``{...}`` span is decoded as JSON, which tolerates a
        surrounding Markdown code fence or extra chatter. Anything that does
        not decode to a dict with a valid verdict is reported as "Incorrect"
        with the (truncated) raw text as justification.
        """
        text = str(raw_text)
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            try:
                parsed = json.loads(text[start:end + 1])
            except ValueError:
                parsed = None
            if isinstance(parsed, dict) and parsed.get("verdict") in ("Correct", "Incorrect"):
                return {
                    "verdict": parsed["verdict"],
                    "justification": str(parsed.get("justification", "")),
                }
        return {
            "verdict": "Incorrect",
            "justification": f"Could not parse evaluator output: {text.strip()[:200]}",
        }

    def evaluate_bs_item(self, bs_item):
        """Run rule-based sanity checks on the ``bs_item`` dictionary.

        Checks, in order: assets cover liabilities, investment properties do
        not exceed total assets, debt does not exceed liabilities, and NTA per
        unit is positive.

        Returns:
            A verdict dict; "Incorrect" names the first failed (or
            unverifiable) check.
        """
        checks = (
            (lambda d: d["total_assets"] >= d["total_liabilities"], "Liabilities exceed assets"),
            (lambda d: d["investment_properties"] <= d["total_assets"], "Investment properties exceed total assets"),
            (lambda d: d["total_debt"] <= d["total_liabilities"], "Debt exceeds liabilities"),
            (lambda d: d["nta_per_unit"] > 0, "NTA per unit is non-positive"),
        )
        return self._run_checks(bs_item, checks)

    def evaluate_is_item(self, is_item):
        """Run rule-based sanity checks on the ``is_item`` dictionary.

        Currently only verifies that ``total_revenue`` is non-negative.

        Returns:
            A verdict dict; "Incorrect" names the failed (or unverifiable) check.
        """
        checks = (
            (lambda d: d["total_revenue"] >= 0, "Total revenue cannot be negative"),
        )
        return self._run_checks(is_item, checks)

    def evaluate(self, context, input:dict):
        """Ask the LLM whether ``input`` is consistent with ``context``.

        Args:
            context: Source document text the extraction is checked against.
            input: Extracted values to audit (stringified into the prompt).

        Returns:
            A verdict dict parsed from the model's JSON answer. Output that
            cannot be parsed is reported as "Incorrect" rather than raising.
        """
        system_prompt = """
        You are a rigorous auditor evaluating the output of a financial information extraction system.
        Return your verdict in the following format:
        ```json
        {
        "verdict": "Correct" or "Incorrect",
        "justification": "Short explanation of why the output was deemed correct or not."
        }
        ```
        """

        template = PromptTemplate.from_template(
            "{system_prompt}\n"
            "Context information is below.\n---------------------\n{context}\n---------------------\n"
            "Given the context information and not prior knowledge, answer the query.\n"
            "Query: Evaluate whether the extracted information in the json {input} is consistent with what is available in the context. Note that unit conversion may have occurred",
        )

        chain = template | self.llm
        prompt_values = {'input': str(input),"system_prompt": system_prompt,"context": context}
        result = chain.invoke(prompt_values)
        # Chat models return a message object; plain LLMs return a string.
        raw_text = getattr(result, "content", result)
        print("llm_output",raw_text)

        return self._parse_verdict(raw_text)
        

   

In [33]:
class ExtractorState(TypedDict):
    """State dictionary handed between the actor and critic graph nodes."""
    # Name of the document to process; actor_node passes it to fdp.ingest_document.
    file_name: str
    # conversion_result = Optional[ConversionResult]
    # First element of fdp.extract_information(), set by actor_node
    # (presumably the balance-sheet fields — confirm against document_extractor).
    bs_result: Optional[dict]
    # Second element of fdp.extract_information(), set by actor_node
    # (presumably the income-statement fields — confirm against document_extractor).
    is_result: Optional[dict]
    # Combined critic verdict ("Correct" / "Incorrect"), set by critic_node.
    feedback: Optional[str]
    # Number of actor runs so far; actor_node increments it on every pass.
    retries: int
    

In [None]:
from langchain_core.runnables import RunnableLambda

# Module-level singletons shared by the graph nodes defined below:
# `fdp` performs ingestion/extraction, `evaluator` audits the extracted results.
fdp = FinancialDocumentProcessor(data_dir="datasets/co_presentations")
evaluator = ResultEvaluator()

# Actor Node
def actor_node(state: ExtractorState) -> ExtractorState:
    """Run the extraction pipeline for the file named in ``state``.

    Ingests ``state["file_name"]`` with the module-level ``fdp`` processor,
    extracts its tables and then the structured information. The first two
    elements of the extraction output are stored under ``bs_result`` and
    ``is_result`` and ``retries`` is increased by one; every other state key
    is carried over unchanged.
    """
    target_file = state["file_name"]
    print(f"Running actor on {target_file}")

    fdp.ingest_document(target_file)
    fdp.extract_tables()
    extraction = fdp.extract_information()

    # Copy first so the incoming state object is never mutated.
    next_state = dict(state)
    next_state.update(
        bs_result=extraction[0],
        is_result=extraction[1],
        retries=state["retries"] + 1,
    )
    return next_state

# Critic Node
def critic_node(state: ExtractorState) -> ExtractorState:
    """Audit the extracted results against the source document.

    Both ``bs_result`` and ``is_result`` are checked by
    ``evaluator.evaluate`` against the Markdown export of the document held
    by the module-level ``fdp``. The combined verdict ("Correct" only when
    both evaluations are "Correct") is written to ``feedback``; all other
    state keys are carried over unchanged.
    """
    print("Critic evaluating...")

    # Export once and reuse: the document text is identical for both checks.
    document_markdown = fdp.result.document.export_to_markdown()

    verdict_bs = _verdict_from(evaluator.evaluate(context=document_markdown, input=state["bs_result"]))
    print("verdict_bs", verdict_bs)
    verdict_is = _verdict_from(evaluator.evaluate(context=document_markdown, input=state["is_result"]))
    print("verdict_is", verdict_is)

    verdict = "Correct" if verdict_bs == "Correct" and verdict_is == "Correct" else "Incorrect"

    if verdict == "Correct":
        print("Critic: All checks passed.")
    else:
        print("Critic: Some checks failed.")
    return {**state, "feedback": verdict}


def _verdict_from(evaluation) -> str:
    """Extract the verdict label from an evaluator response.

    Accepts either a verdict dict or raw LLM output (a string, or a message
    object exposing ``.content``) containing a JSON object. Subscripting raw
    text with ``["verdict"]`` would raise ``TypeError``, so text is decoded
    first. Anything without a readable verdict counts as "Incorrect".
    """
    if isinstance(evaluation, dict):
        return evaluation.get("verdict", "Incorrect")

    text = str(getattr(evaluation, "content", evaluation))
    start, end = text.find("{"), text.rfind("}")
    try:
        return json.loads(text[start:end + 1])["verdict"]
    except (ValueError, KeyError, TypeError):
        return "Incorrect"



# Wrap the plain node functions as LangChain runnables so they can be
# registered as graph nodes.
actor_runnable = RunnableLambda(actor_node)
critic_runnable = RunnableLambda(critic_node)


In [40]:
from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda

# Upper bound on actor runs before the graph stops retrying and ends.
MAX_RETRIES = 3


def route_after_critic(state: ExtractorState) -> str:
    """Pick the node that follows the critic.

    Returns ``"actor"`` to retry extraction while the critic's feedback is
    not "Correct" and fewer than ``MAX_RETRIES`` actor runs have happened;
    otherwise returns ``END`` to finish the graph.
    """
    needs_retry = state["feedback"] != "Correct" and state["retries"] < MAX_RETRIES
    return "actor" if needs_retry else END


graph = StateGraph(ExtractorState)

graph.add_node("actor", actor_runnable)
graph.add_node("critic", critic_runnable)

# Sequence: start at actor → critic
graph.set_entry_point("actor")
graph.add_edge("actor", "critic")

# Conditional: loop back if incorrect. The explicit path map declares every
# destination the router can return.
graph.add_conditional_edges(
    "critic",
    route_after_critic,
    {"actor": "actor", END: END},
)

app = graph.compile()

In [None]:
# Seed state for a fresh run: nothing extracted yet, no critic feedback, zero attempts.
initial_state = ExtractorState(
    file_name="CLW_HY25_IP.pdf",
    retries=0,
    bs_result=None,
    is_result=None,
    feedback=None,
)

# Drive the actor → critic loop to completion and show the resulting state.
final_state = app.invoke(initial_state)
print(final_state)

Running actor on CLW_HY25_IP.pdf
Processing file: CLW_HY25_IP.pdf
