In [44]:
# HTTP Requests and Async Handling
import requests
import asyncio
import json
# LangChain and Pydantic for Tool Construction and Input Validation
from langchain_community.tools.requests.tool import RequestsPostTool, GenericRequestsWrapper
from langchain_core.pydantic_v1 import BaseModel, Field

# Typing for Type Annotations
from typing import List, Dict, Any, Optional, Union

# LangChain Core for Tool and Runnable Interfaces
from langchain_core.runnables import RunnableLambda, RunnableConfig
from langchain_core.tools import tool, StructuredTool

# Correct imports for Base classes and exceptions in LangChain
from langchain.tools import BaseTool
from langchain_core.callbacks import BaseCallbackManager, BaseCallbackHandler
from langchain_core.tools import BaseTool, ToolException, ValidationError

# Additional Utilities for Asynchronous Operations
from typing_extensions import TypedDict, AsyncIterator



In [63]:
class InputResumoFinanceiro(BaseModel):
    """
    Arguments for the financial summary retrieval tool.
    """
    dDia: str = Field(description='The day for which to get the financial summary', examples=['10/08/2024'])
    lApenasResumo: bool = Field(default=True, description='Whether to get only the summary')


In [65]:

@tool("resumo-financas", args_schema=InputResumoFinanceiro)
class ResumoFinanceiroTool(StructuredTool):
    def __init__(self):
        self.url = 'https://app.omie.com.br/api/v1/financas/resumo/'
        self.app_key = '3568546098117'
        self.app_secret = 'd063098e99c6535528c08a26ab77aff2'

    def invoke(self, input: InputResumoFinanceiro) -> dict:
        params = {
            "call": "ObterResumoFinancas",
            "app_key": self.app_key,
            "app_secret": self.app_secret,
            "param": [input.dict()]
        }
        response = requests.post(self.url, json=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise ToolException(f"Failed to retrieve financial summary: HTTP {response.status_code} - {response.text}")


In [59]:
@tool("resumo-financas", args_schema=InputResumoFinanceiro) 
class ResumoFinanceiroTool(StructuredTool):
    url = 'https://app.omie.com.br/api/v1/financas/resumo/'
    app_key = '3568546098117'
    app_secret = 'd063098e99c6535528c08a26ab77aff2'

    def invoke(self, input: InputResumoFinanceiro) -> dict:
        params = {
            "call": "ObterResumoFinancas",
            "app_key": self.app_key,
            "app_secret": self.app_secret,
            "param": [input.dict()]
        }
        response = requests.post(self.url, json=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise ToolException(f"Failed to retrieve financial summary: HTTP {response.status_code} - {response.text}")

In [71]:
def fetch_financial_summary(dDia: str, lApenasResumo: bool):
    tool_input = ("03/08/2024", True)
    finance_tool = ResumoFinanceiroTool()  # Ensure this instantiation is correct
    try:
        result = finance_tool(tool_input)  # Directly passing the tool_input
        print("Financial Summary Retrieved:", result)
    except Exception as e:  # Catch more broadly if needed
        print("Error:", str(e))


In [70]:
print(result)

NameError: name 'result' is not defined