1. Imports & Config

In [None]:
import os, re, json
from pathlib import Path
from typing import Dict, Any, Optional, List
import faiss
import pandas as pd
from groq import Groq
from transformers import pipeline
# Reuse functions from Task-1 retriever
from retriever import jina_text_embed, parse_page
# Paths
# Locations of the FAISS index, its metadata file and the run-history
# ("memory") file; these are the default arguments of the classes below.
FAISS_INDEX_PATH = "data/faiss.idx"
META_PATH = "data/meta.json"
MEMORY_PATH = "data/memory.json"
# Default number of neighbours requested per search (default `k` of RetrieverAgent).
TOP_K = 5
# Make sure the data directory exists and, on first run, seed the memory file
# with an empty JSON list so it can be read back and appended to.
Path("data").mkdir(exist_ok=True)
if not Path(MEMORY_PATH).exists():
    Path(MEMORY_PATH).write_text("[]")
# Groq client
# The API key is taken from the GROQ_API_KEY environment variable
# (os.getenv yields None when the variable is not set).
client = Groq(api_key=os.getenv("GROQ_API_KEY"))


2. Retriever Agent

In [None]:
def read_meta(meta_path=META_PATH):
    """Load the JSON document stored at ``meta_path`` and return it.

    Args:
        meta_path: Path of the metadata file; defaults to ``META_PATH``.

    Returns:
        The decoded JSON content.
    """
    with open(meta_path, "r") as handle:
        metadata = json.load(handle)
    return metadata

def load_faiss(index_path=FAISS_INDEX_PATH):
    """Read the FAISS index saved at ``index_path`` and return it.

    Args:
        index_path: Path of the serialized index; defaults to ``FAISS_INDEX_PATH``.
    """
    index = faiss.read_index(index_path)
    return index

class RetrieverAgent:
    """Agent that embeds the query, searches the FAISS index and collects page text."""

    def __init__(self, index_path=FAISS_INDEX_PATH, meta_path=META_PATH, k=TOP_K):
        """Load the index and its metadata.

        Args:
            index_path: Path of the FAISS index file.
            meta_path: Path of the JSON metadata file.
            k: Number of neighbours requested per search.
        """
        self.k = k
        self.index = load_faiss(index_path)
        self.meta = read_meta(meta_path)

    def act(self, context: Dict[str,Any]) -> Dict[str,Any]:
        """Run a similarity search for ``context["query"]``.

        Returns:
            A trace step whose ``output`` holds ``hits`` (score plus metadata
            entry per result) and ``retrieved_text`` (the first 1000
            characters of each hit's parsed page, joined by single spaces).
        """
        query = context["query"]
        embedding = jina_text_embed(query)
        # The return value is not used: the embedding is normalised in place.
        faiss.normalize_L2(embedding)
        scores, ids = self.index.search(embedding, self.k)

        hits, snippets = [], []
        for score, idx in zip(scores[0], ids[0]):
            # An id of -1 is skipped (presumably FAISS's "no neighbour" marker).
            if idx != -1:
                entry = self.meta[idx]
                hits.append({"score": float(score), "meta": entry})
                page = parse_page(entry["path"])
                snippets.append(page["text"][:1000])

        output = {"hits": hits, "retrieved_text": " ".join(snippets)}
        return {
            "agent": "RetrieverAgent",
            "tool": "faiss_search",
            "input": {"query": query},
            "output": output,
        }


3. Table Agent

In [None]:
class TableAgent:
    """Extract whitespace-aligned tables from the retrieved text.

    The text is split into blocks on blank lines. A block is treated as a
    table when it contains at least three number-like tokens and each of its
    first three non-empty lines splits into more than one cell on runs of two
    or more whitespace characters. The first line becomes the header.
    """

    NUM_RE = re.compile(r"\d+[\d,\.]*")

    @staticmethod
    def _unique_headers(headers: List[str]) -> List[str]:
        """Return ``headers`` with repeated names suffixed ``.1``, ``.2``, ..."""
        seen = set()
        unique = []
        for header in headers:
            name = header
            suffix = 0
            while name in seen:
                suffix += 1
                name = f"{header}.{suffix}"
            seen.add(name)
            unique.append(name)
        return unique

    @staticmethod
    def _to_numeric_if_possible(column: pd.Series) -> pd.Series:
        """Strip ``,`` and ``%`` and convert to numbers when every cell parses.

        When any cell is not numeric the cleaned string column is returned,
        which is what ``pd.to_numeric(..., errors="ignore")`` used to do.
        """
        cleaned = (
            column.astype(str)
            .str.replace(",", "", regex=False)
            .str.replace("%", "", regex=False)
        )
        try:
            return pd.to_numeric(cleaned)
        except (ValueError, TypeError):
            return cleaned

    def act(self, context: Dict[str,Any]) -> Dict[str,Any]:
        """Parse tables out of ``context["retrieved_text"]``.

        Returns:
            A trace step whose ``output`` holds ``n_tables`` and ``tables``
            (each table as ``DataFrame.to_dict()``).
        """
        text = context.get("retrieved_text", "") or ""
        blocks = [b for b in re.split(r"\n\s*\n", text) if len(self.NUM_RE.findall(b)) >= 3]
        tables = []
        for blk in blocks:
            lines = [ln.strip() for ln in blk.splitlines() if ln.strip()]
            if not lines:
                continue
            if not all(len(re.split(r"\s{2,}", ln)) > 1 for ln in lines[:3]):
                continue
            rows = [re.split(r"\s{2,}", ln) for ln in lines]
            try:
                # Unique headers keep df[c] a Series and to_dict() lossless.
                df = pd.DataFrame(rows[1:], columns=self._unique_headers(rows[0]))
            except ValueError:
                # Rows wider than the header: not a well-formed table, skip
                # this block instead of aborting the whole agent.
                continue
            for c in df.columns:
                df[c] = self._to_numeric_if_possible(df[c])
            tables.append(df)
        return {
            "agent":"TableAgent",
            "tool":"parse_tables",
            "input":{"text_snippet":text[:200]},
            "output":{"n_tables":len(tables),"tables":[t.to_dict() for t in tables]}
        }


4. Math Agent

In [None]:
class MathAgent:
    """Compute year-over-year (YoY) growth for each table in the context.

    For every table the agent looks for a column containing year-like cells
    and a column with numeric values, then reports the relative change
    between the two most recent years.
    """

    # A cell counts as a year when it is 20xx, optionally rendered as a
    # float ("2021.0") after a numeric round trip.
    YEAR_PATTERN = r"20\d{2}(?:\.0+)?"

    def act(self, context: Dict[str,Any]) -> Dict[str,Any]:
        """Compute one result per entry of ``context["tables"]``.

        Returns:
            A trace step whose ``output["numeric_results"]`` holds, per
            table, either ``{"ok": bool, "yoy": float | None}`` or
            ``{"ok": False, "error": str}`` when the table could not be
            processed.
        """
        tables = context.get("tables") or []
        results = []
        for t in tables:
            try:
                yoy = self._compute_yoy(pd.DataFrame(t))
                results.append({"ok": yoy is not None, "yoy": yoy})
            except Exception as e:
                # Best effort: one malformed table must not abort the others.
                results.append({"ok": False, "error": str(e)})
        return {
            "agent":"MathAgent",
            "tool":"compute_yoy",
            "input":{"n_tables":len(tables)},
            "output":{"numeric_results":results}
        }

    def _compute_yoy(self, df: pd.DataFrame) -> Optional[float]:
        """Return the growth between the two latest years, or ``None``."""
        # First column with at least one year-like cell is the year column.
        year_col = None
        year_mask = None
        for c in df.columns:
            mask = df[c].astype(str).str.fullmatch(self.YEAR_PATTERN)
            if mask.any():
                year_col, year_mask = c, mask
                break
        if year_col is None:
            return None

        # Only rows whose year cell really is a year take part, so rows such
        # as "Total" no longer break the integer conversion.
        rows = df[year_mask]

        # First other column holding at least one number is the value column;
        # text label columns are skipped.
        values = None
        for c in df.columns:
            if c == year_col:
                continue
            candidate = pd.to_numeric(rows[c], errors="coerce")
            if candidate.notna().any():
                values = candidate
                break
        if values is None:
            return None

        years = pd.to_numeric(rows[year_col], errors="coerce")
        # Coerce first, then drop: unparseable values must not survive as NaN.
        pairs = pd.DataFrame({"year": years, "value": values}).dropna()
        mapping = dict(zip(pairs["year"].astype(int), pairs["value"]))
        ordered = sorted(mapping)
        if len(ordered) < 2:
            return None
        previous, latest = mapping[ordered[-2]], mapping[ordered[-1]]
        if previous == 0:
            return None
        return float((latest - previous) / previous)


5. Summarizer Agent (Groq)

In [None]:
class SummarizerAgent:
    """Agent that asks a chat model to answer the query from the gathered context."""

    def __init__(self, client, model="llama3-70b-8192"):
        """Store the chat client and the model name used for completions."""
        self.model = model
        self.client = client

    def act(self, context: Dict[str,Any]) -> Dict[str,Any]:
        """Build the prompt, request one completion and return the answer.

        Reads ``query`` (required), ``numeric_results`` and ``retrieved_text``
        from ``context``.

        Returns:
            A trace step whose ``input`` is the first 200 characters of the
            prompt and whose ``output["final_answer"]`` is the model's reply.
        """
        question = context["query"]
        numeric = context.get("numeric_results", {})
        passage = context.get("retrieved_text", "")

        prompt = (
            f"Context:\n{passage}\n\n"
            f"Numeric: {json.dumps(numeric)}\n\n"
            f"Question: {question}\n"
            "Answer concisely:"
        )
        messages = [{"role": "user", "content": prompt}]
        completion = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.2,
        )
        answer = completion.choices[0].message.content

        return {
            "agent": "SummarizerAgent",
            "tool": "groq_llm",
            "input": prompt[:200],
            "output": {"final_answer": answer},
        }


6. Groq Supervisor (Dynamic)

In [None]:
class GroqSupervisor:
    """LLM-backed router that picks the next agent (or ``STOP``) for a query."""

    # The only answers the prompt allows the model to give.
    VALID_DECISIONS = ("RetrieverAgent", "TableAgent", "MathAgent", "SummarizerAgent", "STOP")
    _DECISION_RE = re.compile(r"\b(" + "|".join(VALID_DECISIONS) + r")\b")

    def __init__(self, client, model="mixtral-8x7b-32768"):
        """Store the chat client and the model name used for routing."""
        self.client = client
        self.model = model

    def decide_next(self, query: str, context: Dict[str,Any]) -> str:
        """Ask the model which agent should act next.

        Args:
            query: The user's question.
            context: Everything gathered so far; it is embedded in the prompt
                as JSON.

        Returns:
            The first of ``VALID_DECISIONS`` mentioned in the reply. Chat
            models often wrap the name in quotes, punctuation or a short
            sentence, so the reply is searched rather than used verbatim.
            When no valid name is found the stripped raw reply is returned
            (an empty string if the reply had no content).
        """
        # default=str keeps the prompt renderable when the context holds a
        # value json cannot encode natively; JSON-native contexts render
        # exactly as before.
        prompt = f"""
        You are a supervisor of a financial QA system.
        Agents available: RetrieverAgent, TableAgent, MathAgent, SummarizerAgent.
        Query: {query}
        Current context: {json.dumps(context, indent=2, default=str)}

        Decide which agent should act next.
        If final answer is ready, output 'STOP'.
        Only return one of: RetrieverAgent, TableAgent, MathAgent, SummarizerAgent, STOP.
        """
        resp = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role":"user","content":prompt}],
            temperature=0.2
        )
        raw = (resp.choices[0].message.content or "").strip()
        found = self._DECISION_RE.search(raw)
        return found.group(1) if found else raw


7. Dynamic Orchestrator

In [None]:
class DynamicOrchestrator:
    """Run agents in the order chosen by the supervisor and log each run."""

    def __init__(self, agents, supervisor, memory_path=MEMORY_PATH, max_steps=10):
        """Store the collaborators.

        Args:
            agents: Mapping from agent name to an object with ``act(context)``.
            supervisor: Object with ``decide_next(query, context)`` returning
                an agent name or ``"STOP"``.
            memory_path: JSON file holding the list of past run results.
            max_steps: Upper bound on agent invocations per run, so a
                supervisor that never answers ``"STOP"`` cannot loop forever.
        """
        self.agents = agents
        self.supervisor = supervisor
        self.memory_path = memory_path
        self.max_steps = max_steps

    def log_memory(self, record):
        """Append ``record`` to the JSON list stored at ``memory_path``.

        A missing or empty file is treated as an empty history; the parent
        directory is created when needed.
        """
        path = Path(self.memory_path)
        try:
            raw = path.read_text()
        except FileNotFoundError:
            raw = ""
        mem = json.loads(raw) if raw.strip() else []
        mem.append(record)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(mem, indent=2))

    def run(self, query: str) -> Dict[str,Any]:
        """Answer ``query`` by repeatedly asking the supervisor what to do.

        Each chosen agent's ``output`` is merged into the shared context. The
        loop ends when the supervisor returns ``"STOP"``, names an agent that
        is not registered, or ``max_steps`` agents have run.

        Returns:
            ``{"query", "trace", "final_answer"}``; ``final_answer`` is the
            empty string when no agent produced one. The same record is
            appended to the memory file.
        """
        trace, context = [], {"query": query}

        for _ in range(self.max_steps):
            agent_name = self.supervisor.decide_next(query, context)
            if agent_name == "STOP":
                break
            agent = self.agents.get(agent_name)
            if agent is None:
                # Record the bad decision and stop, so the steps completed so
                # far are still returned and logged instead of being lost to
                # a KeyError.
                trace.append({
                    "agent": "Supervisor",
                    "tool": "decide_next",
                    "input": {"query": query},
                    "output": {"error": f"unknown agent: {agent_name!r}"},
                })
                break
            step = agent.act(context)
            trace.append(step)
            context.update(step["output"])

        result = {"query": query, "trace": trace, "final_answer": context.get("final_answer","")}
        self.log_memory(result)
        return result


8. Initialize System

In [None]:
# One instance of every agent; only the summarizer needs the Groq client.
retriever_agent, table_agent, math_agent = RetrieverAgent(), TableAgent(), MathAgent()
summ_agent = SummarizerAgent(client)

# Registry keyed by the agent names listed in the supervisor's prompt.
agents = dict(
    RetrieverAgent=retriever_agent,
    TableAgent=table_agent,
    MathAgent=math_agent,
    SummarizerAgent=summ_agent,
)

supervisor = GroqSupervisor(client)
system = DynamicOrchestrator(agents=agents, supervisor=supervisor)


9. Run Example Query & Save Trace

In [None]:
# Example query run through the orchestrator.
query = (
    "Compare the YoY revenue growth and R&D spending between 2021 and 2022, "
    "and summarize the risks affecting future revenue."
)
result = system.run(query)

print("=== Final Answer ===\n", result["final_answer"])

# Save the full result (query, trace and final answer) as indented JSON.
Path("outputs").mkdir(exist_ok=True)
Path("outputs", "query1.json").write_text(json.dumps(result, indent=2))
print("Trace saved to outputs/query1.json")