In [10]:
import os
import json
from sec_edgar_downloader import Downloader
from huggingface_hub import login, notebook_login
from smolagents import Tool, HfApiModel, ToolCallingAgent
from langchain.docstore.document import Document
from langchain.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores.utils import DistanceStrategy

In [11]:
# Interactive Hugging Face Hub login; renders a login widget in the notebook output.
# NOTE(review): presumably the stored token is what the hosted-model calls below rely on — confirm.
notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [12]:
########################################################################################################################
# 1. Data Fetcher: currently uses SEC filings as an example; replace with your own data source.
########################################################################################################################

class SECDataFetcher:
    """Small wrapper around ``sec_edgar_downloader.Downloader``.

    Args:
        storage_dir: Folder handed to the downloader as ``download_folder``.
        email_address: Contact e-mail handed to the downloader.
        company_name: Company / organisation name handed to the downloader.

    NOTE(review): the default identity values are the ones that were
    hard-coded originally; pass your own when reusing this class.
    """

    def __init__(
        self,
        storage_dir="./sec_filings",
        email_address="jjbigdub@gmail.com",
        company_name="FAC-IITK",
    ):
        # Remembered so callers can locate the download folder without
        # repeating the path literal.
        self.storage_dir = storage_dir
        self.downloader = Downloader(
            email_address=email_address,
            company_name=company_name,
            download_folder=storage_dir,
        )

    def fetch_filings(self, cik: str, form_type: str = "10-Q", limit: int = 1):
        """Download filings for one company and return the downloader's result.

        Args:
            cik: Company identifier forwarded to the downloader.
            form_type: Filing form type to request (default ``"10-Q"``).
            limit: Maximum number of filings to request (default ``1``).

        Returns:
            Whatever ``Downloader.get`` returns for this request.
        """
        return self.downloader.get(form_type, cik, limit=limit)

In [13]:
#################################################################################################################################################
# 2. Extraction Agent: preprocesses the fetched data. Add further preprocessing functions as needed, depending on the structure of the fetched data.
#################################################################################################################################################

class TabularDataAgent(Tool):
    """Tool that asks a chat model to extract financial tables from a filing file.

    The file is read from disk, embedded into a fixed instruction prompt and
    sent to ``model`` as a single user message. The model's reply is returned
    as a string; it is *requested* to be a JSON array of
    ``{"table", "context"}`` objects but is not validated here.

    Args:
        model: Callable taking a list of chat messages and returning the reply.
        max_chars: Optional cap on the number of characters of the filing that
            are placed in the prompt. ``None`` (default) sends the whole file.
        **kwargs: Forwarded to ``Tool.__init__``.
    """

    name = "tabular_data_extractor"
    description = (
        "Extracts and intelligently identifies relevant numerical financial data from an SEC 10-Q filing. "
        "If multiple distinct tabular datasets exist within the document, output them as a JSON array; "
        "each element must be an object with two keys: 'table' (a Markdown formatted table with two columns, "
        "'Financial Metric' and 'Value') and 'context' (detailed excerpts or explanation of where the numbers were found). "
        "Do not include any commentary outside of the JSON object."
    )
    inputs = {
        "file_path": {
            "type": "string",
            "description": "The file path of the SEC 10-Q filing document."
        }
    }
    output_type = "string"

    def __init__(self, model, max_chars=None, **kwargs):
        super().__init__(**kwargs)
        self.model = model
        self.max_chars = max_chars

    def forward(self, file_path: str) -> str:
        """Read ``file_path`` and return the model's extraction reply as text.

        Args:
            file_path: Path of the filing document to read.

        Returns:
            The model reply as a string. If the file cannot be read, a JSON
            array with one object whose ``context`` carries the error message.
        """
        try:
            # errors="ignore" drops undecodable bytes instead of failing the read.
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
        except Exception as e:
            return json.dumps([{"table": "", "context": f"Error reading file: {e}"}])

        # Optional guard against prompts that are larger than the model accepts.
        if self.max_chars is not None and len(content) > self.max_chars:
            content = content[: self.max_chars]

        prompt = (
            "You are an expert financial analyst and data scientist. "
            "You are given the full text of an unstructured SEC 10-Q filing that may have numerical values scattered randomly. "
            "Your task is to intelligently identify which numerical values are relevant to the financial statements, "
            "capture the context (e.g., surrounding sentences or paragraphs) for each value, and output the results as a JSON array. "
            "Each element of the array must be an object with exactly two keys: 'table' and 'context'. "
            "The 'table' must be a Markdown formatted table with two columns: 'Financial Metric' and 'Value'. "
            "If only one dataset is found, output it as an array with a single object. Do not include any extra commentary.\n\n"
            "SEC 10-Q Filing Content:\n"
            f"{content}\n"
        )
        prompt_message = {"role": "user", "content": prompt}
        raw_result = self.model([prompt_message])

        # The declared output type is a string, but the model call may hand back
        # a message object exposing the text as ``.content`` (assumed from the
        # smolagents model API — confirm for the installed version). Plain
        # strings pass through unchanged.
        reply_text = getattr(raw_result, "content", raw_result)
        if reply_text is None:
            return ""
        return reply_text if isinstance(reply_text, str) else str(reply_text)

In [14]:
############################################################
# 3. Evaluation Agent for Tabular Data Format
############################################################

class EvaluateAgent(Tool):
    """Tool that asks a chat model to validate/repair the extraction JSON.

    The model is prompted to return a JSON array of ``{"table", "context"}``
    objects. The reply is accepted only if it parses as JSON; otherwise the
    original input is returned unchanged.

    Args:
        model: Callable taking a list of chat messages and returning the reply.
        **kwargs: Forwarded to ``Tool.__init__``.
    """

    name = "evaluate_tabular_data"
    description = (
        "Evaluates and corrects the format of a JSON output from a tabular data extraction. "
        "Ensure that the output is a JSON array of objects, where each object has exactly two keys: "
        "'table' and 'context'. Return a corrected JSON string if necessary."
    )
    inputs = {
        "extraction_output": {
            "type": "string",
            "description": "The JSON string output from the extraction tool."
        }
    }
    output_type = "string"

    def __init__(self, model, **kwargs):
        super().__init__(**kwargs)
        self.model = model

    def forward(self, extraction_output: str) -> str:
        """Return a JSON-parseable version of ``extraction_output`` when possible.

        Args:
            extraction_output: Text produced by the extraction tool.

        Returns:
            The model's reply if it parses as JSON, else ``extraction_output``.
        """
        prompt = (
            "You are an expert in data formatting. Validate the following JSON data to ensure that it is a JSON array "
            "of objects, where each object has exactly two keys: 'table' and 'context'. If it is not correctly formatted, "
            "return a corrected JSON string. Otherwise, return the input unchanged.\n\n"
            f"Input JSON:\n{extraction_output}\n"
        )
        prompt_message = {"role": "user", "content": prompt}

        # Call the injected model (the same way the extraction tool does);
        # this class defines no ``run`` method of its own.
        raw_result = self.model([prompt_message])

        # The model call may return a message object exposing the text as
        # ``.content`` (assumed from the smolagents model API — confirm);
        # plain strings pass through unchanged.
        result = getattr(raw_result, "content", raw_result)
        if result is not None and not isinstance(result, str):
            result = str(result)

        try:
            json.loads(result)
        except (TypeError, ValueError):
            # Reply is missing or not valid JSON: fall back to the caller's input.
            return extraction_output
        return result

In [15]:
############################################################
# 4. Query Tool to Search the Vector Database
############################################################

class QueryVectorDBTool(Tool):
    """Tool that runs a similarity search on a vector store and formats the hits as text.

    Args:
        vectordb: Object exposing ``similarity_search(query, k=...)`` whose
            results carry ``metadata`` and ``page_content``.
        **kwargs: Forwarded to ``Tool.__init__``.
    """

    name = "query_vector_db"
    description = (
        "Queries the vector database to retrieve stored financial data. "
        "Input a natural language query and return the most relevant documents, each containing a Markdown table and detailed context."
    )
    inputs = {
        "query": {
            "type": "string",
            "description": "A natural language query to search the vector database."
        }
    }
    output_type = "string"

    def __init__(self, vectordb, **kwargs):
        super().__init__(**kwargs)
        self.vectordb = vectordb

    def forward(self, query: str) -> str:
        """Search for ``query`` (top 5 hits) and return one text section per hit.

        Each section is headed by the hit's zero-based position and its
        ``source`` metadata (``'N/A'`` when absent), followed by its content.
        Sections are joined with newlines.
        """
        hits = self.vectordb.similarity_search(query, k=5)
        return "\n".join(
            f"Document {position} (Source: {hit.metadata.get('source', 'N/A')}):\n{hit.page_content}\n"
            for position, hit in enumerate(hits)
        )

In [16]:
# --- Configuration -----------------------------------------------------------
storage_dir = "./sec_filings"
cik = "0000320193"
form_type = "10-Q"
# The model id can be overridden through the HF_MODEL_ID environment variable,
# e.g. when the default model is not available for the logged-in account.
model_id = os.environ.get("HF_MODEL_ID", "meta-llama/Llama-3.1-70B-Instruct")

# --- Fetch filings -----------------------------------------------------------
sec_fetcher = SECDataFetcher(storage_dir=storage_dir)
sec_fetcher.fetch_filings(cik=cik, form_type=form_type)

# Folder layout the script expects the downloader to have produced.
parent_folder = os.path.join(storage_dir, "sec-edgar-filings", cik, form_type)

filings = []
for root, dirs, files in os.walk(parent_folder):
    for file in files:
        if file.endswith(".txt"):
            filings.append(os.path.join(root, file))
# os.walk order is filesystem-dependent; sort for reproducible runs.
filings.sort()

if not filings:
    # Fail early with a clear message instead of failing later while building
    # the vector index from an empty document list.
    raise FileNotFoundError(f"No .txt filings found under {parent_folder!r}")

# --- Extract and validate ----------------------------------------------------
model = HfApiModel(model_id)
extraction_tool = TabularDataAgent(model=model)
evaluation_tool = EvaluateAgent(model=model)

documents = []
for filing in filings:
    extraction_result = extraction_tool.forward(filing)
    evaluated_result = evaluation_tool.forward(extraction_result)
    try:
        extraction_data = json.loads(evaluated_result)
        if not isinstance(extraction_data, list):
            extraction_data = [extraction_data]
    except Exception as e:
        # Keep the raw extraction text so nothing is lost when parsing fails.
        extraction_data = [{"table": extraction_result, "context": f"Parsing error: {e}"}]

    for dataset in extraction_data:
        if not isinstance(dataset, dict):
            # Valid JSON whose elements are not objects (e.g. a bare string or
            # number): keep the value as the table text rather than crashing
            # on ``.get``.
            dataset = {"table": str(dataset), "context": ""}
        table = dataset.get("table", "")
        context_detail = dataset.get("context", "")
        combined_content = f"Table:\n{table}\n\nDetailed Context:\n{context_detail}"
        doc = Document(page_content=combined_content, metadata={"source": filing, "cik": cik})
        documents.append(doc)
        print(f"Stored dataset from filing: {filing}\n{'-'*40}\n{combined_content}\n{'='*40}\n")

if not documents:
    # Reached when every filing yielded an empty JSON array.
    raise ValueError("Extraction produced no datasets; nothing to index.")

# --- Index and query ---------------------------------------------------------
embedding_model = HuggingFaceEmbeddings(model_name="thenlper/gte-small")
vectordb = FAISS.from_documents(
    documents=documents,
    embedding=embedding_model,
    distance_strategy=DistanceStrategy.COSINE,
)

query_tool = QueryVectorDBTool(vectordb=vectordb)
agent = ToolCallingAgent(tools=[query_tool], model=model)

query = "Show me financial metrics related to revenue and net income."
query_result = agent.run(query)
print("Query Result:\n", query_result)


BadRequestError: (Request ID: KsXe--)

Bad request:
Model requires a Pro subscription; check out hf.co/pricing to learn more. Make sure to include your HF token in your query.