# 모듈 10: 병렬 실행(Parallel Execution) 워크플로우

이 모듈에서는 여러 에이전트를 동시에 실행하여 작업 시간을 단축하고, 다양한 관점의 분석 결과를 한 번에 종합하는 **병렬 실행 패턴**을 배운다.

## 학습 목표

- **비동기 처리(Asyncio)**: Python의 비동기 기능을 활용하여 여러 에이전트에게 동시에 작업을 시킨다.
- **결과 취합(Aggregation)**: 각 에이전트가 내놓은 결과를 하나의 세션 상태(State)에 모은다.
- **성능 향상**: 순차 실행 대비 처리 시간을 얼마나 단축할 수 있는지 이해한다.

## 라이브러리 설치

먼저, 이 노트북이 빌드된 특정 버전의 Google Agent Development Kit (ADK)를 설치한다. 버전을 고정하면 코드가 항상 예상대로 작동한다.

In [None]:
%pip install -q google-adk

In [2]:
import os
from dotenv import load_dotenv
from IPython.display import display, Markdown

from google.adk.tools import ToolContext, google_search
from google.adk.agents import Agent, ParallelAgent, SequentialAgent
from google.adk.sessions import Session, InMemorySessionService
from google.genai.types import Content, Part
from google.adk.runners import Runner

## 인증: API 키 구성

다음으로 Google API 키를 안전하게 제공해야 한다. 이 코드는 키를 붙여넣을 수 있는 보안 입력 프롬프트를 생성한다. 그런 다음 키를 환경 변수로 설정하며, 이는 ADK가 요청을 인증하는 표준 방식이다.

In [3]:
load_dotenv()
MODEL = "gemini-2.5-flash"

## 상태 업데이트 도구 생성

워크플로를 더 지능적으로 만들기 위해 먼저 추출된 정보를 세션의 메모리에 저장할 수 있는 사용자 정의 도구가 필요하다. 이 함수는 `event_type`과 `city`를 받아 `tool_context.state`에 저장한다.

In [4]:
def update_session_state(
    tool_context: ToolContext, event_type: str, city: str
) -> str:
  """추출된 이벤트 유형과 도시를 세션 상태에 저장한다."""
  print(f"  [도구 호출] 세션 상태 업데이트 중: event_type='{event_type}', city='{city}'")
  
  # 도구 컨텍스트를 통해 세션 상태에 접근하여 값을 저장한다
  tool_context.state['event_type'] = event_type
  tool_context.state['city'] = city
  return "세션 상태가 이벤트 매개변수로 업데이트되었다."

## "Intake" 에이전트 생성

이 에이전트는 워크플로의 정문 역할을 한다. 유일한 임무는 사용자의 자연어 쿼리(예: "오스틴에서 열리는 기업 런칭 파티를 위한 공급업체 찾기")를 받아 주요 매개변수를 추출한 다음, `update_session_state` 도구를 호출하여 저장하는 것이다.

In [5]:
# 이 에이전트의 유일한 임무는 사용자 쿼리를 파싱하는 것이다.
intake_agent = Agent(
    name="intake_agent",
    model="gemini-2.5-flash",
    instruction="""
    당신은 매개변수 추출기다. 사용자 쿼리에서 이벤트 유형(event type)과 도시(city)를 식별하라.
    그런 다음, 추출된 값으로 반드시 `update_session_state` 도구를 호출해야 한다.
    """,
    tools=[update_session_state]
)

## 전문가 에이전트 1 생성: 케이터링 스카우트

이것은 병렬 워크플로를 위한 첫 번째 전문가 에이전트다. 유일한 임무는 `intake_agent`에 의해 세션 상태에 저장된 `{{event_type}}` 및 `{{city}}` 변수를 사용하여 케이터링 옵션을 검색하는 것이다.

In [6]:
catering_scout_agent = Agent(
    name="catering_scout_agent",
    model="gemini-2.5-flash",
    instruction="""
    당신은 케이터링 스카우트다. 당신의 유일한 임무는 {{city}}에서 열리는 {{event_type}}에 적합한
    케이터링 업체 3곳을 찾는 것이다.

    결과를 간단한 목록으로 제시하라.
    """,
    tools=[google_search],
    output_key="catering_options", # 출력을 이 변수에 저장하여 나중에 병합할 수 있게 한다
)

## 전문가 에이전트 2 생성: 엔터테인먼트 스카우트

이것은 두 번째 전문가 에이전트다. 유일한 임무는 엔터테인먼트 옵션을 검색하는 것이다. 이 작업은 케이터링 업체를 찾는 것과 완전히 독립적이므로 병렬 실행에 완벽한 후보이다.

In [7]:
entertainment_scout_agent = Agent(
    name="entertainment_scout_agent",
    model="gemini-2.5-flash",
    instruction="""
    당신은 엔터테인먼트 스카우트다. 당신의 유일한 임무는 {{city}}에서 열리는 {{event_type}}에 적합한
    엔터테인먼트 옵션(예: DJ, 라이브 밴드, 마술사) 3가지를 찾는 것이다.

    결과를 간단한 목록으로 제시하라.
    """,
    tools=[google_search],
    output_key="entertainment_options" # 출력을 이 변수에 저장한다
)

## 병렬 워크플로 구축

여기서는 `ParallelAgent`를 사용하여 두 스카우트 에이전트를 동시에 실행하는 워크플로를 생성한다. ADK는 두 검색을 동시에 실행하여 정보 수집 속도를 크게 높일 수 있다.

In [8]:
# 두 에이전트를 병렬로 실행하는 에이전트 정의
parallel_vendor_search = ParallelAgent(
    name="parallel_vendor_search",
    sub_agents=[catering_scout_agent, entertainment_scout_agent]
)

## 종합(Synthesizer) 에이전트 생성

병렬 검색이 완료된 후, 결과를 하나의 일관된 응답으로 결합할 에이전트가 필요하다. `vendor_coordinator_agent`의 임무는 `catering_options`와 `entertainment_options`를 가져와 체계적인 목록으로 제시하는 것이다.

In [9]:
vendor_coordinator_agent = Agent(
    name="vendor_coordinator_agent",
    model="gemini-2.5-flash",
    instruction="""
    당신은 도움이 되는 공급업체 코디네이터다.

    `{{catering_options}}`와 `{{entertainment_options}}`의 결과를
    사용자를 위해 하나의 체계적인 목록으로 결합하라.
    """
)

## 전체 "Fan-Out, Fan-In" 워크플로 조립

이것은 최종적인 전체 워크플로다. `SequentialAgent`를 사용하여 전체 프로세스를 조율한다. 첫째, `intake_agent`가 쿼리를 파싱하고, 둘째, `parallel_vendor_search`가 공급업체를 찾기 위해 "Fan-Out"(병렬 실행)하며, 셋째, `vendor_coordinator_agent`가 결과를 종합하기 위해 "Fan-In"한다.

In [10]:
full_parallel_workflow = SequentialAgent(
    name="full_parallel_workflow",
    sub_agents=[
        intake_agent,
        parallel_vendor_search,
        vendor_coordinator_agent
    ],
    description="매개변수를 추출하고, 여러 항목을 병렬로 찾은 다음, 결과를 요약하는 워크플로다."
)

## 실행 엔진 구축

이것은 쿼리를 실행하기 위한 헬퍼 함수이며, 이전 글과 변경되지 않았다. Runner를 초기화하고 이벤트를 스트리밍하는 핵심 ADK 로직을 처리한다. 상태는 이제 `intake_agent`에 의해 관리되므로 여기서는 `state_delta`가 필요하지 않다.

In [11]:
async def run_agent_query(agent: Agent, query: str, session: Session, user_id: str):
    """Runner를 초기화하고 주어진 에이전트 및 세션에 대해 쿼리를 실행한다."""
    print(f"\n에이전트 쿼리 실행 중: '{agent.name}', 세션: '{session.id}'...")

    runner = Runner(
        agent=agent,
        session_service=session_service,
        app_name=agent.name
    )

    final_response = ""
    try:
        async for event in runner.run_async(
            user_id=user_id,
            session_id=session.id,
            new_message=Content(parts=[Part(text=query)], role="user")
        ):
            if event.is_final_response():
                final_response = event.content.parts[0].text
    except Exception as e:
        final_response = f"오류 발생: {e}"


    print("\n" + "-"*50)
    print("최종 응답:")
    display(Markdown(final_response))
    print("-"*50 + "\n")

    return final_response

## 세션 초기화 및 워크플로 실행

마지막으로 `InMemorySessionService`와 메인 실행 블록을 설정한다. 세션을 생성한 다음 단일 자연어 쿼리로 `full_parallel_workflow`를 호출한다. Intake 에이전트가 자동으로 매개변수 추출을 처리할 것이다.

In [12]:
# --- 세션 서비스 초기화 ---
# 이 하나의 서비스가 이 노트북의 모든 다른 세션을 관리한다.
session_service = InMemorySessionService()
user_id = "adk_event_planner_001"

In [13]:
async def run_stateful_orchestrator():
  # 세션 생성
  session = await session_service.create_session(
        app_name=full_parallel_workflow.name,
        user_id=user_id
  )
  
  # 사용자 쿼리 정의 (영어 쿼리를 한국어 상황에 맞게 번역)
  query = "서울에서 열리는 기업 런칭 파티를 위한 공급업체를 찾아줘."
  print(f"사용자: {query}\n")
  
  # 워크플로 실행
  await run_agent_query(full_parallel_workflow, query, session, user_id)

# 전체 시스템 실행
await run_stateful_orchestrator()

사용자: 서울에서 열리는 기업 런칭 파티를 위한 공급업체를 찾아줘.


에이전트 쿼리 실행 중: 'full_parallel_workflow', 세션: '5f2f74f7-6466-43f6-8623-a9b54b69c709'...




  [도구 호출] 세션 상태 업데이트 중: event_type='기업 런칭 파티', city='서울'

--------------------------------------------------
최종 응답:


서울 기업 런칭 파티를 위한 추천 공급업체 정보입니다.

### 케이터링 업체

*   **쉐프랑케이터링 (Chefrang Catering)**: 기업 행사, 브랜드 런칭 등 다양한 행사에 적합하며, 르 꼬르동 블루 출신 셰프들이 신선한 식재료로 만든 요리와 품격 있는 컨셉을 제공합니다.
*   **퀴진서울 (Cuisine Seoul)**: 고급 기업, VIP, 의전 패키징 케이터링 서비스를 전문으로 제공하여 기업 미팅, 연회, 오피스 식사, 파티 등 다양한 행사에 적합합니다.
*   **매치케이터링 (Matchcatering)**: 창의성, 최고 품질의 재료, 탁월한 기업 케이터링 및 VIP 프리미엄 케이터링 전문 업체로, 행사 컨셉에 맞춰 공간 디자인과 분위기 연출에도 신경을 씁니다.

### 엔터테인먼트 옵션

*   **전문 DJ**: 런칭 파티의 분위기를 활기차게 이끌고, 참석자들의 취향에 맞는 다양한 음악을 선곡하여 세련된 분위기를 연출할 수 있습니다. 기업 행사 및 런칭 파티 전문 DJ 섭외가 가능합니다.
*   **라이브 밴드 (재즈 밴드 또는 팝 밴드)**: 고급스럽고 감성적인 분위기를 선호한다면 재즈 밴드나 팝 밴드를 섭외하여 라이브 공연을 즐길 수 있습니다. "김혜미 밴드"와 같은 재즈 밴드는 기업 행사에 추천됩니다.
*   **마술사**: 참석자들에게 특별하고 신비로운 경험을 선사할 수 있는 마술 공연은 런칭 파티에 흥미로운 볼거리를 제공합니다. 클로즈업 매직이나 스테이지 마술쇼 등 다양한 형태의 공연이 가능합니다.

--------------------------------------------------

