# Multi-agent: 다중 에이전의 구조화된 출력


'구조화된 출력'은 JSON 모드와 함수 호출을 기반으로 구축된 새로운 기능으로, 모델 출력이 엄격한 스키마를 따르도록 강제한다. 새로운 매개변수인 `strict: true`를 사용하여, 응답이 제공된 스키마를 준수함을 보장할 수 있다.

## 환경 설정

In [1]:
# 필요한 라이브러리들을 가져온다.
import json
from textwrap import dedent
from openai import OpenAI

import os
from dotenv import load_dotenv  

load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")

MODEL = "gpt-4.1-mini"
client = OpenAI()

In [2]:
from IPython.display import Image
import json
import pandas as pd
import matplotlib.pyplot as plt
from io import StringIO
import numpy as np

## 에이전트 설정

먼저 4개의 에이전트로 구성된 시스템을 설정하자:

1.  **분류 에이전트(Triaging agent):** 어떤 에이전트를 호출할지 결정한다.
2.  **데이터 전처리 에이전트(Data pre-processing Agent):** 분석을 위해 데이터를 준비한다. (예: 데이터 정제)
3.  **데이터 분석 에이전트(Data Analysis Agent):** 데이터에 대한 분석을 수행한다.
4.  **데이터 시각화 에이전트(Data Visualization Agent):** 분석 결과를 시각화하여 통찰력을 추출한다.

각 에이전트에 대한 시스템 프롬프트를 정의하는 것으로 시작할 것이다.

In [3]:
# 분류 에이전트를 위한 시스템 프롬프트이다.
triaging_system_prompt = """당신은 분류 에이전트다. 당신의 역할은 사용자의 쿼리를 평가하고 관련 에이전트에게 전달하는 것이다. 사용 가능한 에이전트는 다음과 같다:
- 데이터 처리 에이전트: 데이터를 정제, 변환, 집계한다.
- 분석 에이전트: 통계, 상관관계, 회귀 분석을 수행한다.
- 시각화 에이전트: 막대 차트, 선 차트, 파이 차트를 생성한다.

send_query_to_agents 도구를 사용하여 사용자의 쿼리를 관련 에이전트에게 전달하라. 또한, 필요한 경우 speak_to_user 도구를 사용하여 사용자로부터 추가 정보를 얻어라."""

# 데이터 처리 에이전트를 위한 시스템 프롬프트이다.
processing_system_prompt = """당신은 데이터 처리 에이전트다. 당신의 역할은 다음 도구들을 사용하여 데이터를 정제, 변환, 집계하는 것이다:
- clean_data
- transform_data
- aggregate_data"""

# 분석 에이전트를 위한 시스템 프롬프트이다.
analysis_system_prompt = """당신은 분석 에이전트다. 당신의 역할은 다음 도구들을 사용하여 통계, 상관관계, 회귀 분석을 수행하는 것이다:
- stat_analysis
- correlation_analysis
- regression_analysis"""

# 시각화 에이전트를 위한 시스템 프롬프트이다.
visualization_system_prompt = """당신은 시각화 에이전트다. 당신의 역할은 다음 도구들을 사용하여 막대 차트, 선 차트, 파이 차트를 생성하는 것이다:
- create_bar_chart
- create_line_chart
- create_pie_chart"""

## Agent Tools 정의

그런 다음 각 에이전트에 대한 도구를 정의할 것이다.

분류 에이전트를 제외하고, 각 에이전트는 자신의 역할에 특화된 도구를 갖게 될 것이다:

#### 데이터 전처리 에이전트

1.  데이터 정제 (Clean data)
2.  데이터 변환 (Transform data)
3.  데이터 집계 (Aggregate data)

#### 데이터 분석 에이전트

1.  통계 분석 (Statistical analysis)
2.  상관관계 분석 (Correlation analysis)
3.  회귀 분석 (Regression Analysis)

#### 데이터 시각화 에이전트

1.  막대 차트 생성 (Create bar chart)
2.  선 차트 생성 (Create line chart)
3.  파이 차트 생성 (Create pie chart)

<!-- end list -->

In [4]:
# 각 에이전트가 사용할 도구의 스키마를 정의한다.
# 'strict: True'는 모델이 이 스키마를 엄격하게 따르도록 강제한다.

# 분류 에이전트의 도구이다.
triage_tools = [
    {
        "type": "function",
        "function": {
            "name": "send_query_to_agents",
            "description": "사용자 쿼리를 기능에 따라 관련 에이전트에게 보낸다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "agents": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "쿼리를 보낼 에이전트 이름의 배열이다."
                    },
                    "query": {
                        "type": "string",
                        "description": "보낼 사용자 쿼리이다."
                    }
                },
                "required": ["agents", "query"]
            }
        },
        "strict": True
    }
]

# 데이터 전처리 에이전트의 도구이다.
preprocess_tools = [
    {
        "type": "function",
        "function": {
            "name": "clean_data",
            "description": "중복을 제거하고 결측값을 처리하여 제공된 데이터를 정제한다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "정제할 데이터셋이다. JSON이나 CSV와 같은 적절한 형식이어야 한다."
                    }
                },
                "required": ["data"],
                "additionalProperties": False
            }
        },
        "strict": True
    },
    # ... (transform_data, aggregate_data 도구 정의는 생략)
]

# 데이터 분석 에이전트의 도구이다.
analysis_tools = [
    {
        "type": "function",
        "function": {
            "name": "stat_analysis",
            "description": "주어진 데이터셋에 대한 통계 분석을 수행한다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "분석할 데이터셋이다. JSON이나 CSV와 같은 적절한 형식이어야 한다."
                    }
                },
                "required": ["data"],
                "additionalProperties": False
            }
        },
        "strict": True
    },
    # ... (correlation_analysis, regression_analysis 도구 정의는 생략)
]

# 데이터 시각화 에이전트의 도구이다.
visualization_tools = [
    {
        "type": "function",
        "function": {
            "name": "create_bar_chart",
            "description": "제공된 데이터로 막대 차트를 생성한다.",
            "parameters": {
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "막대 차트에 사용할 데이터이다. JSON이나 CSV와 같은 적절한 형식이어야 한다."
                    },
                    "x": {
                        "type": "string",
                        "description": "x축에 대한 컬럼이다."
                    },
                    "y": {
                        "type": "string",
                        "description": "y축에 대한 컬럼이다."
                    }
                },
                "required": ["data", "x", "y"],
                "additionalProperties": False
            }
        },
        "strict": True
    },
    # ... (create_line_chart, create_pie_chart 도구 정의는 생략)
]

## 도구 실행

  - 사용자 쿼리를 다중 에이전트 시스템에 전달하는 것을 처리한다.
  - 다중 에이전트 시스템의 내부 작동을 처리한다.
  - 도구 호출을 실행한다.

In [5]:
# 예제 사용자 쿼리이다.
user_query = """
아래는 일부 데이터다. 먼저 중복을 제거한 다음 데이터의 통계를 분석하고 선 차트를 그려달라.

house_size (m3), house_price ($)
90, 100
80, 90
100, 120
90, 100
"""

In [6]:
# 데이터 정제 함수이다.
def clean_data(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")
    df_deduplicated = df.drop_duplicates()
    return df_deduplicated

# 통계 분석 함수이다.
def stat_analysis(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")
    return df.describe()

# 선 차트를 그리는 함수이다.
def plot_line_chart(data):
    data_io = StringIO(data)
    df = pd.read_csv(data_io, sep=",")

    x = df.iloc[:, 0]
    y = df.iloc[:, 1]

    # 최적 적합선을 계산한다.
    coefficients = np.polyfit(x, y, 1)
    polynomial = np.poly1d(coefficients)
    y_fit = polynomial(x)

    plt.figure(figsize=(10, 6))
    plt.plot(x, y, 'o', label='데이터 포인트')
    plt.plot(x, y_fit, '-', label='최적 적합선')
    plt.title('최적 적합선을 포함한 선 차트')
    plt.xlabel(df.columns[0])
    plt.ylabel(df.columns[1])
    plt.legend()
    plt.grid(True)
    plt.show()

# 도구를 실행하는 함수를 정의한다.
def execute_tool(tool_calls, messages):
    for tool_call in tool_calls:
        tool_name = tool_call.function.name
        tool_arguments = json.loads(tool_call.function.arguments)

        if tool_name == 'clean_data':
            # 데이터 정제를 시뮬레이션한다.
            cleaned_df = clean_data(tool_arguments['data'])
            cleaned_data = {"cleaned_data": cleaned_df.to_dict()}
            messages.append({"role": "tool", "name": tool_name, "content": json.dumps(cleaned_data)})
            print('정제된 데이터: ', cleaned_df)
        
        elif tool_name == 'stat_analysis':
            # 통계 분석을 시뮬레이션한다.
            stats_df = stat_analysis(tool_arguments['data'])
            stats = {"stats": stats_df.to_dict()}
            messages.append({"role": "tool", "name": tool_name, "content": json.dumps(stats)})
            print('통계 분석: ', stats_df)
        
        elif tool_name == 'create_line_chart':
            # 선 차트 생성을 시뮬레이션한다.
            line_chart = {"line_chart": "sample_line_chart"}
            messages.append({"role": "tool", "name": tool_name, "content": json.dumps(line_chart)})
            plot_line_chart(tool_arguments['data'])
        # ... (다른 도구 실행 로직은 생략)
    return messages

In [7]:
# 각 에이전트의 처리를 담당하는 함수들을 정의한다.

# 데이터 처리 에이전트 핸들러이다.
def handle_data_processing_agent(query, conversation_messages):
    messages = [{"role": "system", "content": processing_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=preprocess_tools,
    )

    conversation_messages.append([tool_call.function for tool_call in response.choices[0].message.tool_calls])
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)

# 분석 에이전트 핸들러이다.
def handle_analysis_agent(query, conversation_messages):
    messages = [{"role": "system", "content": analysis_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=analysis_tools,
    )

    conversation_messages.append([tool_call.function for tool_call in response.choices[0].message.tool_calls])
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)

# 시각화 에이전트 핸들러이다.
def handle_visualization_agent(query, conversation_messages):
    messages = [{"role": "system", "content": visualization_system_prompt}]
    messages.append({"role": "user", "content": query})

    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=visualization_tools,
    )

    conversation_messages.append([tool_call.function for tool_call in response.choices[0].message.tool_calls])
    execute_tool(response.choices[0].message.tool_calls, conversation_messages)

마지막으로, 사용자 쿼리 처리를 담당하는 포괄적인 도구를 생성한다.

이 함수는 사용자 쿼리를 받아 모델로부터 응답을 얻고, 이를 다른 에이전트에게 전달하여 실행하도록 처리한다. 이 외에도, 진행 중인 대화의 상태를 유지할 것이다.

In [8]:
# 사용자 메시지를 처리하고 분류하는 함수이다.
def handle_user_message(user_query, conversation_messages=[]):
    user_message = {"role": "user", "content": user_query}
    conversation_messages.append(user_message)

    messages = [{"role": "system", "content": triaging_system_prompt}]
    messages.extend(conversation_messages)

    # 분류 에이전트를 호출하여 어떤 하위 에이전트를 사용할지 결정한다.
    response = client.chat.completions.create(
        model=MODEL,
        messages=messages,
        temperature=0,
        tools=triage_tools,
    )

    conversation_messages.append([tool_call.function for tool_call in response.choices[0].message.tool_calls])

    # 결정된 하위 에이전트들을 순차적으로 호출한다.
    for tool_call in response.choices[0].message.tool_calls:
        if tool_call.function.name == 'send_query_to_agents':
            agents = json.loads(tool_call.function.arguments)['agents']
            query = json.loads(tool_call.function.arguments)['query']
            for agent in agents:
                if agent == "Data Processing Agent":
                    handle_data_processing_agent(query, conversation_messages)
                elif agent == "Analysis Agent":
                    handle_analysis_agent(query, conversation_messages)
                elif agent == "Visualization Agent":
                    handle_visualization_agent(query, conversation_messages)

    return conversation_messages

## 다중 에이전트 시스템 실행

마지막으로, 사용자 쿼리에 대해 포괄적인 `handle_user_message` 함수를 실행하고 출력을 확인한다.

In [9]:
# 사용자 쿼리로 다중 에이전트 시스템을 실행한다.
handle_user_message(user_query)

[{'role': 'user',
  'content': '\n아래는 일부 데이터다. 먼저 중복을 제거한 다음 데이터의 통계를 분석하고 선 차트를 그려달라.\n\nhouse_size (m3), house_price ($)\n90, 100\n80, 90\n100, 120\n90, 100\n'},
 [Function(arguments='{"agents": ["데이터 처리 에이전트"], "query": "중복을 제거한 데이터: house_size (m3), house_price ($)\\n90, 100\\n80, 90\\n100, 120\\n90, 100"}', name='send_query_to_agents'),
  Function(arguments='{"agents": ["분석 에이전트"], "query": "중복을 제거한 데이터에 대해 통계 분석을 수행하라. 데이터는 house_size (m3), house_price ($)이다."}', name='send_query_to_agents'),
  Function(arguments='{"agents": ["시각화 에이전트"], "query": "중복을 제거한 데이터에 대해 선 차트를 그려라. 데이터는 house_size (m3), house_price ($)이다."}', name='send_query_to_agents')]]