In [1]:
from backend.src.services.tax_deep_research_service import deep_research_service
import pytest
import os
from dotenv import load_dotenv
load_dotenv()

True

## Basic Test

In [2]:
class DummyUploadFile:
    """Minimal stand-in for an uploaded file, backed by a file on disk.

    Exposes ``filename``, ``content_type`` and an async ``read()``.
    Note that ``content`` is a filesystem path, not the file's contents.
    """

    def __init__(self, content: str, filename: str = "dummy.pdf"):
        self._path = content
        self.content_type = "application/pdf"
        self.filename = filename

    async def read(self):
        """Return the full contents of the backing file as bytes."""
        with open(self._path, "rb") as stream:
            payload = stream.read()
        return payload

In [3]:
# Inputs for the end-to-end run. The PDF path is relative to the notebook's working directory.
pdf = "../data/sample_returns/dummy3.pdf"
question = "What are steps that I can take to improve this tax return?"
llm_model = "gpt-4o"

# DummyUploadFile receives the path through its `content` argument and only opens the file in read().
file = DummyUploadFile(content=pdf)

In [4]:
try:
    result = await deep_research_service(files=[file], user_question=question, llm_model=llm_model)
    assert result is not None
    print(f"(DEBUG):\n {result["chain_of_thought"]}\n")
    print(f"FINAL RESULT:\n{result["llm_response"]}")

except Exception as e:
    pytest.fail(f"process_upload raised an error: {e}")

25
17
['Assess the possibility of itemizing deductions versus taking the standard deduction for greater tax savings.', 'Check for any unclaimed credits such as the Child and Dependent Care Credit or Earned Income Tax Credit.', 'Analyze potential for maximizing retirement contributions to reduce taxable income.', 'Evaluate eligibility for education credits or deductions based on any tuition or education-related expenses.', 'Review Schedule C for potential business deductions and ensure all eligible expenses are claimed.', 'Net Investment Income Tax Mitigation: Review Form 8960 to identify strategies to reduce the impact of the Net Investment Income Tax, such as income deferral or reallocation.', 'Additional Medicare Tax Verification: Confirm the accuracy of the Additional Medicare Tax calculation on Form 8959 and explore any potential adjustments.', 'Qualified Business Income Deduction Review: Analyze Form 8995-A to ensure the maximum deduction is claimed and check for any aggregation o

## Parse Testing

### Document Extracting

In [None]:
class DummyUploadFile:
    """File-upload test double that serves bytes from a local path.

    NOTE(review): this repeats the DummyUploadFile definition from the
    "Basic Test" section; presumably kept so this section can run on its
    own. Confirm before removing either copy.
    """

    def __init__(self, content: str, filename: str = "dummy.pdf"):
        # `content` is the path of the file to serve, not its bytes.
        self.filename, self.content_type, self._path = filename, "application/pdf", content

    async def read(self):
        """Open the stored path in binary mode and return everything in it."""
        source = open(self._path, "rb")
        with source:
            return source.read()

# Sample return and question used throughout the parse-testing cells.
pdf = "../data/sample_returns/dummy3.pdf"
question = "What are steps that I can take to improve this tax return?"
# Wrapping the path in DummyUploadFile is only needed when passing bare file paths;
# it is not needed when files arrive from the frontend.
file = DummyUploadFile(content=pdf)
# The parsing cell iterates over `files`, so the single upload is wrapped in a list.
files = [file]

##### Extracting Payloads and Raw Text

In [None]:
from backend.src.controller.document_tools.document_embedder import DocumentEmbedder
from backend.src.controller.document_tools.document_parser import DocumentParser
from langchain_openai import OpenAIEmbeddings
from typing import TypedDict, List, Dict, Any
from langchain_openai import ChatOpenAI
import os
from dotenv import load_dotenv
import re

# Load variables from a .env file so OPENAI_API_KEY can be read from the environment below.
load_dotenv()

llm_model = "gpt-4o"
model_key = os.getenv("OPENAI_API_KEY")  # None if the variable is not set
# Chat model shared by the planner and subtask-scoring cells further down.
llm = ChatOpenAI(model=llm_model, temperature=0, api_key=model_key)

In [None]:
# Extract the text and b64 images from the files for context.
# Also grab Documents used for Vector Embedding for RAG later on
# Both lists accumulate across all uploaded files; they stay empty when `files` is empty.
docs, payloads = [], []
if files:
    parser = DocumentParser(max_page_mb=5, dpi_hint=180)
    for upfile in files:
        # process_file yields three values per file; the third (up_text, presumably the
        # raw text — TODO confirm) is not used anywhere in this cell.
        up_docs, up_payload, up_text = await parser.process_file(upfile)
        docs.extend(up_docs)
        payloads.extend(up_payload)

In [None]:
# A look at what a payload looks like
first = payloads[0]  # raises IndexError when no payloads were extracted
for key, val in first.items():
    if isinstance(val, dict) and "url" in val:
        # strip off the "data:image/jpeg;base64," prefix
        # (everything up to and including the first comma of the data URL)
        b64 = val["url"].split(",", 1)[1]
        print(f"{key!r}: dict with URL → base64 length = {len(b64)}")
    else:
        # Any other value is summarised by type name and len(); assumes it supports len() — TODO confirm
        print(f"{key!r}: {type(val).__name__}, length = {len(val)}")

In [None]:
# Compute lengths of each image's base64 payload
# (the part of the data URL after the first comma).
b64_sizes = [
    len(p["image_url"]["url"].split(",", 1)[1])
    for p in payloads
]

# Base64 text is ASCII (one byte per character), so characters / 1,000,000 approximates the MB
# being sent to OpenAI; the limit to stay under is 20 MB (per this notebook's author — TODO
# confirm against the current API documentation).
# Note: b64_sizes[0] and max() below raise if `payloads` is empty.
print(f"Example image payload size: {b64_sizes[0]} characters")
print(f"Max image payload size:     {max(b64_sizes)} characters")
print(f"Total image payload size:   {sum(b64_sizes)} characters")

##### Creating Subtasks

In [None]:
from pydantic import BaseModel, Field

# Schema passed to llm.with_structured_output(...) in the batching cell so each planner
# reply is parsed into an object with a `subtasks` list.
class PlannerOutput(BaseModel):
    """Structure for planner’s JSON output"""
    # NOTE(review): the class docstring and the Field description presumably become part of
    # the schema sent to the model — treat both as runtime text when editing. TODO confirm.
    subtasks: List[str] = Field(description="A list of distinct, concise subtasks")

def load_scenarios(scenarios_file: str) -> List[str]:
    """Parse a numbered markdown list into "Title: description" strings.

    Recognised entries look like ``N. **Title**: description`` at the start
    of a line. A description runs until the next line that begins with
    ``N.`` or the end of the file, and all runs of whitespace in it
    (newlines included) are collapsed to single spaces.

    Args:
        scenarios_file: Path of the UTF-8 markdown file to read.

    Returns:
        One string per matched entry, in file order; empty if nothing matches.
    """
    with open(scenarios_file, 'r', encoding='utf-8') as handle:
        markdown = handle.read()

    entry_re = re.compile(
        r'^\d+\.\s*\*\*(.*?)\*\*\s*:\s*(.*?)(?=\n\d+\.|\Z)',
        re.MULTILINE | re.DOTALL,
    )
    return [
        f"{heading.strip()}: {' '.join(body.split())}"
        for heading, body in entry_re.findall(markdown)
    ]


In [None]:
# Planner inputs: no prior conversation, and a target of roughly five subtasks.
conversation_history = []
number_of_tasks = 5
llm_model = "gpt-4o"

# The example scenarios only enrich the prompt, so a missing or unreadable file
# degrades to an empty example list instead of stopping the notebook.
try:
    scenarios = load_scenarios("./../data/scenarios.md")
except Exception as e:
    print(f"Failed to load scenarios: {e}")
    scenarios = []

# `scenarios` is already a list of strings; it is interpolated as-is so the prompt
# shows one flat list rather than a list nested inside another list.
planner_prompt = f"""You are an expert CPA planning assistant.
    The user has provided a PDF document with tax information and images that has been sent to you.
    The user's main question is: "{question}".
    Conversation so far (if relevant):
    {conversation_history}

    Based on these inputs, break the user's question into around {number_of_tasks} actionable sub-tasks or key points to investigate.
    Each sub-task should be concise, focusing on a specific aspect of the question or problem.
    You have the user's documents indexed in a vectorstore.
    Plan subtasks that would require retrieving relevant chunks from that vectorstore or searching the online web.
    Only list the sub-tasks, with no extra explanation.
    Example subtasks: {scenarios}
    """

##### Feeding Batches of Documents (Preferred)

In [None]:
def chunk_by_b64(payloads, max_chars=20*1_048_576):
    """Group image payloads, in order, into batches bounded by base64 size.

    A payload's size is the length of the text after the first comma of its
    ``["image_url"]["url"]`` data URL. A new batch is started whenever adding
    the next payload would push the running total past ``max_chars``. A
    payload that is larger than ``max_chars`` on its own still gets a batch
    to itself.

    Args:
        payloads: Iterable of dicts shaped like ``{"image_url": {"url": "<prefix>,<base64>"}}``.
        max_chars: Maximum total base64 characters per batch.

    Returns:
        A list of batches, each a non-empty list of the original payload dicts.
    """
    grouped = []
    current_batch = []
    running_size = 0
    for payload in payloads:
        size = len(payload["image_url"]["url"].split(",", 1)[1])
        # Close the current batch first if this payload would overflow it.
        if current_batch and running_size + size > max_chars:
            grouped.append(current_batch)
            current_batch = []
            running_size = 0
        current_batch.append(payload)
        running_size += size
    # Flush whatever is left after the last payload.
    if current_batch:
        grouped.append(current_batch)
    return grouped

In [None]:
# Cap each request at 5 * 1,048,576 base64 characters of image data.
MAX_B64 = 5 * 1_048_576 #5 MB
batches = chunk_by_b64(payloads, MAX_B64)
collected = []  # NOTE(review): not referenced again in the visible cells

# Wrap the chat model so each reply is parsed into a PlannerOutput instance.
structured_planner = llm.with_structured_output(PlannerOutput)

# Run the planner once per image batch and pool the subtasks from every reply.
all_subtasks: List[str] = []
for batch in batches:
    # One user message: the planner prompt as text, followed by this batch's image payloads.
    content = [{"type":"text","text":planner_prompt}] + batch
    result: PlannerOutput = structured_planner.invoke(input=[{"role": "user", "content": content}])
    print(result)
    all_subtasks.extend(result.subtasks)

# Same list object under the name the later cells use.
generated_subtasks = all_subtasks

In [None]:
# Display the pooled subtasks from all batches.
generated_subtasks

#### Refining the Plan

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

# Embedding model used by deduplicate_subtasks below to embed subtask texts.
embedder = OpenAIEmbeddings(model="text-embedding-3-small")

def tax_return_doc_search(query: str) -> str:
    """Search the embedded tax document for relevant information.

    Reads the notebook-level ``vectorstore`` variable at call time.

    Args:
        query: Text to search the vectorstore for.

    Returns:
        The ``page_content`` of the top 3 matches joined by blank lines, or a
        fixed "not available" message when no vectorstore exists.
    """
    # `vectorstore` is assigned in a later cell, so it may not exist yet when this
    # runs; treat "not defined" the same as "explicitly None".
    try:
        store = vectorstore
    except NameError:
        store = None
    if store is None:
        return "No document vectorstore available for search."

    results = store.similarity_search(query, k=3)
    return "\n\n".join(doc.page_content for doc in results)

def deduplicate_subtasks(subtasks: List[str], threshold: float = 0.8) -> List[str]:
    """Drop subtasks that are near-duplicates of one another by embedding similarity.

    Subtasks are visited last-to-first. One is kept only if its cosine
    similarity to every already-kept subtask is below ``threshold``. Among
    near-duplicates the latest one therefore wins, and the returned list is in
    reverse input order.

    Args:
        subtasks: Subtask texts to filter.
        threshold: Similarity at or above which a subtask counts as a duplicate.

    Returns:
        The kept subtask texts, latest first.
    """
    if not subtasks:
        # Nothing to compare, so skip the embedding request entirely.
        print("Kept 0 of 0 subtasks")
        return []

    # embed the text of each subtask
    embeddings = embedder.embed_documents(subtasks)
    kept_texts: List[str] = []
    kept_embeds: List[List[float]] = []
    for text, emb in zip(reversed(subtasks), reversed(embeddings)):
        if kept_embeds:
            # Highest similarity to anything already kept, computed once.
            top_sim = max(cosine_similarity([emb], kept_embeds)[0])
            if not top_sim < threshold:
                print(f"Dropped '{text}' (similarity {top_sim:.2f})\n")
                continue
        kept_texts.append(text)
        kept_embeds.append(emb)

    print(f"Kept {len(kept_texts)} of {len(subtasks)} subtasks")
    return kept_texts

def score_subtask(subtask: str) -> float:
    """Estimate how researchable a subtask is, on a 0.0–1.0 scale.

    If the document search returns more than 50 characters of real content the
    subtask scores 1.0 outright. Otherwise the LLM is asked for a numeric score.

    Args:
        subtask: The subtask text to score.

    Returns:
        A score clamped to the range [0.0, 1.0].

    Raises:
        ValueError: If the LLM reply contains no number at all.
    """
    docs = tax_return_doc_search(subtask)
    if "No document" not in docs and len(docs) > 50:
        return 1.0
    # lightweight LLM-based fallback
    prompt = f"""
        You are a tax‑research assistant. Here are the relevant document excerpts:
        {docs}

        Subtask:
        \"{subtask}\"

        On a scale from 0.0 (no chance of finding useful information) to 1.0 (very likely to find useful information), how researchable is this subtask given the above documents? Lower the score if the subtask is too vague or general.
        Respond with only the numeric score (e.g. 0.42).
        """.strip()
    reply = llm.invoke(prompt).content.strip()
    try:
        score = float(reply)
    except ValueError:
        # Models sometimes wrap the number in words ("Score: 0.42"); take the first number found.
        match = re.search(r"[-+]?(?:\d+\.?\d*|\.\d+)", reply)
        if match is None:
            raise ValueError(f"Could not parse a numeric score from LLM reply: {reply!r}") from None
        score = float(match.group())
    # Keep the result on the scale the prompt asks for.
    return min(1.0, max(0.0, score))

In [None]:
# Deduplicate with a 0.7 threshold instead of the function's 0.8 default, then display the result.
tasks = deduplicate_subtasks(generated_subtasks, threshold=0.7)
tasks

##### Storing the Documents in a vectorstore for RAG

In [None]:
from dotenv import load_dotenv
load_dotenv()

# Embedding model for the document index.
embedder_model = OpenAIEmbeddings(model="text-embedding-3-small")
# Passed as `save_path` when the documents are embedded in the next cell.
user_embedding_folder = "../data/faiss_index"

In [None]:
# Build the vectorstore from the parsed documents; it stays None when nothing was parsed.
vectorstore = None
# vectorstore.load_local(user_embedding_folder)
if docs:
    # Use the embedding model configured in the cell directly above, rather than
    # depending on `embedder` from the "Refining the Plan" cell having been run.
    document_embedder = DocumentEmbedder(embedder_model)
    vectorstore = document_embedder.embed_documents(docs, save_path=user_embedding_folder)

### Running the Agent

In [None]:
from backend.src.controller.agent_v1 import DeepResearchAgent
import os
from dotenv import load_dotenv
load_dotenv()

# API keys come from the environment; either is None when its variable is not set.
openai_api_key = os.getenv("OPENAI_API_KEY")
tavily_api_key = os.getenv("TAVILY_API_KEY")

# These assignments overwrite any payloads / vectorstore built by earlier cells, so the
# agent below is run with no image payloads, no vectorstore and no document text.
payloads =[]
vectorstore = None
document_text = None

user_question = "What are steps that I can take to improve this tax return?"
number_of_tasks = 3
llm_model = "gpt-4o"

In [None]:
# Build the agent from the configuration cell above and run it on the user question.
research_agent = DeepResearchAgent(llm_model=llm_model, vectorstore=vectorstore, number_of_tasks=number_of_tasks, model_key=openai_api_key, tavily_api_key=tavily_api_key)
response = research_agent.run(user_question, vision_payloads=payloads, document_text=document_text)

# The response is indexed by these two keys.
final_answer = response["final_answer"]
chain_of_thought = response["chain_of_thought"]

print(chain_of_thought)
print(final_answer)

In [None]:
# NOTE(review): this repeats the previous cell's agent construction and run, minus the
# prints; running it performs a second full agent run. Confirm whether it is still needed.
research_agent = DeepResearchAgent(llm_model=llm_model, vectorstore=vectorstore, number_of_tasks=number_of_tasks, model_key=openai_api_key, tavily_api_key=tavily_api_key)
response = research_agent.run(user_question, vision_payloads=payloads, document_text=document_text)

final_answer = response["final_answer"]
chain_of_thought = response["chain_of_thought"]