In [1]:

from typing import Dict, Any, Optional, List, Annotated, TypedDict, Union
import operator

# ----- Shared State Schema -----
class AgentState(TypedDict):
    user_input: str
    supervisor_decision: Optional[str] = None
    maximo_payload: Optional[str] = None
    maximo_agent_response: Optional[str] = None
    vector_search_result: Optional[str] = None
    final_response: Optional[str] = None
    memory_chain: Annotated[List[Dict[str, Any]], operator.add] = []


In [4]:
from langchain.agents import tool
import ast
from connectors.maximo_connector import MaximoConnector

# instantiate maximo connector.
maximo_connector = MaximoConnector()

In [5]:
from pydantic import BaseModel, Field
class MaiximoPayloadInput(BaseModel):
    maximo_payload: str = Field(description="Thing to search for")

In [6]:

@tool(args_schema=MaiximoPayloadInput)
def perform_maximo_operation(maximo_payload: Union[Dict, str]):
    """
    Uses the maximo tools at the llm with tools to perform the maximo operation.
    :param state: The state of the agent containing the user input and states to be updated.
    :return: A dictionary containing the Maximo payload.
    """
    # Check to see the maximo payload returned, and the response type to perform the correct action.
    if isinstance(maximo_payload, str):
        maximo_payload = ast.literal_eval(maximo_payload)

    request_type = maximo_payload.get("request_type")
    params = maximo_payload.get("params")

    if request_type.lower() == 'get':
        result = maximo_connector.get_workorder_details(params)
    elif request_type.lower() == 'post':
        result = maximo_connector.post_workorder_details(params)
    else:
        result = {
            "message": "This query is not related to Maximo."
        }

    return {
        "maximo_agent_response": result
    }


In [None]:
perform_maximo_operation.description

In [None]:
perform_maximo_operation.name

In [None]:
perform_maximo_operation.args