In [1]:
from influxdb_client import InfluxDBClient
from dotenv import load_dotenv
import os
load_dotenv()

client = InfluxDBClient(
    url=os.getenv("INFLUXDB_URL"),
    token=os.getenv("INFLUXDB_TOKEN"),
    org=os.getenv("INFLUXDB_ORG"),
)
query_api = client.query_api()

In [2]:
from typing import Annotated
from typing_extensions import TypedDict

from langchain.agents import Tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list[str], add_messages]
    db_output: list
    next_inspection: list
    
graph_builder = StateGraph(State)

In [12]:
def query_tool(process_id):
    """InfluxDB에서 프로세스 ID에 대한 로그를 조회합니다."""
    query = f'''
    from(bucket: "{process_id}_status")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "status_log")
    '''
    try:
        tables = query_api.query(query)
        logs = []
        for table in tables:
            for record in table.records:
                logs.append(f"{record.get_time()}: {record.get_value()}")
        
        if not logs:
            # 검색 결과가 없는 경우
            if "-" in process_id:
                return f"라인 ID '{process_id}'에 대한 로그가 없습니다."
            else:
                return f"프로세스 ID '{process_id}'에 대한 로그가 없습니다."
        output = "\n".join(logs)
        return {"message": [output]}
    except Exception as e:
        return f"로그 조회 중 오류 발생: {e}"

In [4]:
from langgraph.prebuilt import ToolNode, tools_condition
tool = Tool(
    name="query_tool",
    func=query_tool,
    description="""Get recent status logs from InfluxDB for the given process_id or line_id.
        The function returns the most recent status logs from the last hour.
        Input should be a process ID (e.g., 'P1') or a line ID (e.g., 'P1-A').
        Process ID is used to query by process_id field, while input with hyphen like 'P1-A' is used to query by line_id field.
        """)
tools = [tool]
tool_node = ToolNode(tools=tools)
graph_builder.add_node("tools", tool_node)

<langgraph.graph.state.StateGraph at 0x10eda8990>

In [5]:
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage
from langchain_core.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o")
llm_with_tools = llm.bind_tools(tools)

In [6]:
GENERATE_SYSTEM_TEMPLATE = """
You are a database inspection assistant. You will be given a list of database queries and their results. Your task is to analyze the results and provide insights or suggestions for further actions."""
GENERATE_USER_TEMPLATE = """
You are a process monitoring assistant.
Your job is to analyze process logs and help diagnose issues.

When logs show errors, timeouts, or abnormal patterns, you can:
1. Request more information about specific processes
2. Send maintenance commands when necessary"""


def supervisor(state: State):
    msgs = [
        ("system", GENERATE_SYSTEM_TEMPLATE),
        ("user", GENERATE_USER_TEMPLATE)
    ]
    prompt = ChatPromptTemplate.from_messages(msgs)
    response = llm_with_tools.invoke(
        prompt.format_prompt(messages=state["messages"]),
    )
    outputs = []
    outputs.append(
        AIMessage(
            content=response.content,
        )
    )
    return {"messages": outputs}

graph_builder.add_node("supervisor", supervisor)

<langgraph.graph.state.StateGraph at 0x10eda8990>

In [7]:
graph_builder.add_edge(START, "supervisor")
graph_builder.add_edge("supervisor", END)
graph_builder.add_conditional_edges("supervisor", tools_condition)
graph_builder.add_edge("tools", "supervisor")
graph = graph_builder.compile()

#graph

In [9]:
def stream_graph_updates(user_input: str):
    for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}): # graph 노드 호출 결과 받아옴
        for value in event.values():
            print(value, "\n")

In [11]:
while True:
    try:
        user_input = input()
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break

        stream_graph_updates(user_input)
    except:
        break

{'messages': [AIMessage(content="Please provide the specific process or line IDs whose logs you would like me to analyze, along with any recent logs or patterns you've observed. If there are specific errors or abnormalities, please include those details as well.", additional_kwargs={}, response_metadata={}, id='a925d6c7-159e-4f7a-acda-3d7ad50c538b')]} 

{'messages': [AIMessage(content='Please provide the list of database queries and their results so I can analyze them for any issues or abnormal patterns.', additional_kwargs={}, response_metadata={}, id='18992eee-f87a-4ce2-867d-afc4270550c5')]} 

{'messages': [AIMessage(content='Please provide the specific process logs you would like me to analyze for errors, timeouts, or abnormal patterns.', additional_kwargs={}, response_metadata={}, id='e0b88f66-d0d0-4172-804f-43f5192fa6d3')]} 

Goodbye!
