In [13]:
import logging
from langgraph.prebuilt import ToolInvocation
import json
from langchain_core.messages import FunctionMessage, BaseMessage
from typing import TypedDict, Annotated, Sequence
import operator
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage
from tools.logger import JsonFormatter
from pathlib import Path
from datetime import datetime

def add_timestamp_to_file(file_path):
    fp = Path(file_path)
    if not fp.exists():
        fp.touch()
        abs_fp = Path(file_path).resolve().absolute()
        abs_fp = abs_fp.with_stem(abs_fp.stem + '_' + datetime.now().strftime('%Y-%m-%d-%H%M%S.%f'))
    return abs_fp

def configure_logger(state, log_name=None, log_filename=None):
    configurable = state['configurable']
    # check logger is already configured
    if configurable.get('logger') is None:
        # set up logger
        if log_name is None:
            log_name = 'default_logger'
        if log_filename is None:
            log_filename = 'default_log.log'
        configurable['logger'] = {
            'log_filename': log_filename,
            'log_name': log_name 
        }
    log_name = configurable['logger']['log_name']
    log_filename = configurable['logger']['log_filename']
    logger = logging.getLogger(log_name)
    logger.setLevel(logging.INFO)
    # Create file handler which logs even debug messages
    fh = logging.FileHandler(log_filename)
    fh.setLevel(logging.INFO)
    # Create formatter and add it to the handlers
    formatter = JsonFormatter({"level": "levelname", 
                                    "message": "message", 
                                    "timestamp": "asctime"})
    # formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    # Add the handlers to logger
    if not logger.hasHandlers():
        logger.addHandler(fh)
    return logger

def delete_logger(state):
    configurable = state['configurable']
    if configurable.get('logger') is not None:
        log_name = configurable['logger']['log_name']
        logger = logging.getLogger(log_name)
        # remove all handlers
        for handler in logger.handlers[:]:
            logger.removeHandler(handler)
        # remove logger from state
        del logger
        state['configurable'].pop('logger', None)

# Define the function that determines whether to continue or not
def should_continue(state):
    logger = configure_logger(state)
    messages = state['messages']
    last_message = messages[-1]
    logger.info("Deciding whether to continue based on the last message: %s", last_message)
    # If there is no function call then we finish
    if "function_call" not in last_message.additional_kwargs:
        logger.info("should_continue: end")
        # remove logger from state
        delete_logger(state)
        return "end"
    # Otherwise if there is we continue
    else:
        logger.info("should_continue: continue")
        return "continue"

# Define the function that calls the model
def call_model(state):
    logger = configure_logger(state)
    messages = state['messages']
    logger.info("Calling model with messages: %s", messages)
    response = model.invoke(messages)
    logger.info("Model response: %s", response)
    # We return a list because this will get added to the existing list
    return {"messages": [response]}

# Define the function to execute tools
def call_tool(state):
    logger = configure_logger(state)
    messages = state['messages']
    last_message = messages[-1]
    logger.info("Calling tool with last message: %s", last_message)
    action = ToolInvocation(
        tool=last_message.additional_kwargs["function_call"]["name"],
        tool_input=json.loads(last_message.additional_kwargs["function_call"]["arguments"])
    )
    response = tool_executor.invoke(action)
    function_message = FunctionMessage(content=str(response), name=action.tool)
    logger.info("Tool response: %s", function_message)
    # We return a list because this will get added to the existing list
    return {"messages": [function_message]}

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    configurable: TypedDict  # Add configurable attribute for log filename

# Define a new graph
workflow = StateGraph(AgentState)

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", call_tool)

# Set the entrypoint as `agent`
workflow.set_entry_point("agent")

# Add conditional edges
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "action",
        "end": END
    }
)

# Add a normal edge from `action` to `agent`
workflow.add_edge('action', 'agent')

# Compile the graph
app = workflow.compile()

In [14]:
from langchain_community. chat_models.fake import FakeMessagesListChatModel
from langchain_core.messages import HumanMessage, AIMessage


# Define a list of responses that the fake model will use
responses = [
    AIMessage(content="The weather in San Francisco is sunny."),
    AIMessage(content="I'm sorry, I couldn't fetch the weather details.")
]

model = FakeMessagesListChatModel(responses=responses)

# Set up the tool executor (assuming you have defined your tools)
from langgraph.prebuilt import ToolExecutor
tools = []  # Define your tools here
tool_executor = ToolExecutor(tools)

# Example state with configurable log file name
state = {
    "messages": [HumanMessage(content="what is the weather in sf")],
    "configurable": {}
}

logger = configure_logger(state, 'my_logger', 'my_log.log')
logger.info("Initial input: %s", state)

output = app.invoke(state)

In [35]:

abs_fp = add_timestamp_to_file('my_log.log')
# add current datetime in yyyy-mm-dd-hMS.ms to the abs_fp stem


PosixPath('/home/ubuntu/workspace/XMODE-LLMCompiler/my_log_2024-07-19-115546.479404.log')