### Simple Sequential Workflow

이 과정에서는 Agent Framework Workflows를 사용하여 간단한 순차 워크플로우를 만드는 방법을 보여줍니다.

순차적 워크플로우는 복잡한 AI 에이전트 시스템을 구축하는 데 있어 기본 토대입니다. 여기서는 각 단계에서 데이터를 처리하고 다음 단계로 전달하는 간단한 2단계 워크플로우를 만드는 방법을 보여줍니다.

#### 개요
이 과정에서는 2개의 실행기(executor)를 포함하는 워크플로우를 생성합니다.  

대문자 변환 실행기 - 입력 텍스트를 대문자로 변환합니다.  
역방향 텍스트 실행기 - 텍스트를 역방향으로 출력합니다.  

이 워크플로우는 다음의 핵심 개념을 보여줍니다.

- 작업 단위(Executor 노드)를 정의하는 두 가지 방법:
    1. @handler로 표시된 비동기 메서드를 사용하여 Executor를 하위 클래스로 분류하는 커스텀 클래스
    2. @executor로 데코레이트된 독립형 비동기 함수

- WorkflowBuilder와 executor를 연결하기
- ctx.send_message()를 사용해서 단계들 간에 데이터 전달하기
- ctx.yield_output()으로 최종 출력 생성하기.
- 실시간 옵저빌리티를 위해 이벤트 스트리밍하기.


#### 1단계: 필요한 모듈 가져오기

먼저 Agent Framework에서 필요한 모듈을 가져옵니다.

In [4]:
import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, Executor, handler

#### 2단계: 첫 번째 Executor 생성

처리기 메서드를 포함하는 Executor를 구현하여 텍스트를 대문자로 변환하는 실행기를 생성합니다.

In [5]:
class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

**핵심 사항:**

- Executor 서브클래싱을 활용하면 필요에 따라 라이프사이클 후크(hook)가 있는 명명된 노드를 정의할 수 있습니다.
- @handler 데코레이터는 작업을 수행하는 비동기 메서드를 나타냅니다.
- 처리기(handler)의 시그니처는 다음과 같아야 합니다.
    - 첫 번째 매개변수는 이 노드에 대해 입력 type 형식입니다(여기서는: text: str).
    - 두 번째 매개변수는 WorkflowContext[T_Out]이며, T_Out은 노드가 ctx.send_message()를 통해 내보낼 데이터의 형식입니다 (여기서는 str).
- 처리기 내부에서는 일반적으로 결과를 계산하고 ctx.send_message(result)를 사용해서 이를 하위 노드로 전달합니다.

#### 3단계: 두 번째 Executor 생성

간단하게 작성하려면, 서브클래싱을 생략하고 앞서와 동일한 시그니처 패턴(입력 type + WorkflowContext)을 갖는 비동기 함수를 정의하고, 앞 쪽에 @executor를 추가하면 됩니다 이렇게 하면 플로우에 연결할 수 있는 완전한 기능을 갖춘 노드가 생성됩니다.


In [7]:
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

**핵심 사항**

- @executor 데코레이터는 독립적인 비동기 함수를 워크플로우 노드로 변환합니다.
- WorkflowContext는 매개변수로 2개의 형식을 갖습니다.
    - T_Out = Never : 이 노드는 하위 노드로 메시지를 보내지 않습니다.
    - T_W_Out = str : 이 노드는 str 형식의 워크플로우 출력을 생성합니다.
- 터미널 노드는 ctx.yield_output()를 사용하여 출력을 생성하고 이를 워크플로우 결과로 제공합니다.
- 워크플로우는 작업이 더 이상 진행되지 않는, 유휴 상태가 되면 완료됩니다.

### 4단계: 워크플로우 구축

WorkflowBuilder를 사용하여 executor를 연결합니다.


In [8]:
upper_case = UpperCase(id="upper_case_executor")

workflow = (
    WorkflowBuilder()
    .add_edge(upper_case, reverse_text)
    .set_start_executor(upper_case)
    .build()
)

**핵심 요점**

- add_edge()를 사용해서 executor 간의 직접적인 연결을 생성합니다  
- set_start_executor()를 사용해서 진입점을 정의합니다  
- build()를 사용해서 워크플로우 구성을 완료합니다  

### 5단계: 스트리밍을 사용하여 워크플로우를 실행합니다.

워크플로우를 실행하고 이벤트를 실시간으로 관찰합니다.

In [10]:
async def main():
    # Run the workflow and stream events
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")
        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

if __name__ == "__main__":
    await main()

Event: WorkflowStartedEvent(origin=WorkflowEventSource.FRAMEWORK, data=None)
Event: WorkflowStatusEvent(state=WorkflowRunState.IN_PROGRESS, data=None, origin=WorkflowEventSource.FRAMEWORK)
Event: ExecutorInvokedEvent(executor_id=upper_case_executor, data=hello world)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor, data=['HELLO WORLD'])
Event: SuperStepStartedEvent(iteration=1, data=None)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor, data=HELLO WORLD)
Event: WorkflowOutputEvent(data=DLROW OLLEH, executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor, data=['DLROW OLLEH'])
Event: SuperStepCompletedEvent(iteration=1, data=None)
Event: WorkflowStatusEvent(state=WorkflowRunState.IDLE, data=None, origin=WorkflowEventSource.FRAMEWORK)


### 6단계: 출력 결과 이해하기

워크플로우를 실행하면 다음과 같은 이벤트가 표시됩니다.

### 핵심 개념 설명

#### Executor를 정의하는 두 가지 방법

1. 사용자 정의 클래스(Executor 서브클래싱) : 라이프사이클 후크나 복잡한 상태 관리가 필요할 때 가장 적합합니다. @handler 데코레이터를 사용하여 비동기 메서드를 정의합니다.
2. 함수 기반(@executor 데코레이터) : 간단한 구현에 적합합니다. 동일한 시그니처 패턴을 가진 독립적인 비동기 함수를 정의할 수 있습니다.

두 접근 방식 모두 동일한 처리기 시그니처를 사용합니다.  
- 첫 번째 매개변수: 이 노드에 대한 입력 형식  
- 두 번째 매개변수: WorkflowContext[T_Out, T_W_Out]  

#### 워크플로우 컨텍스트 유형
WorkflowContext 제네릭 형식은 실행기 간에 어떤 데이터가 흐르는지를 정의합니다.

- WorkflowContext[T_Out] - ctx.send_message()를 통해서 하위 노드로 T_Out 형식의 메시지를 전송하는 노드에 사용됩니다.
- WorkflowContext[T_Out, T_W_Out] - ctx.yield_output()을 통해서 T_W_Out 형식의 워크플로우 출력도 생성하는 노드에 사용됩니다.
- 형식 매개변수가 없는 WorkflowContext는 WorkflowContext[Never, Never]와 동일하며, 이는 이 노드가 하위 노드로 메시지를 보내지도 않고 워크플로우 출력을 생성하지도 않음을 의미합니다.

#### 이벤트 유형
스트리밍 실행 중에는 다음과 같은 이벤트 유형을 살펴 볼 수 있습니다.

- ExecutorInvokedEvent- executor가 처리를 시작할 때
- ExecutorCompletedEvent- executor가 처리를 완료했을 때
- WorkflowOutputEvent- 최종 워크플로우 결과가 포함되어 있습니다.

#### 파이썬 워크플로우 빌더 패턴

WorkflowBuilder는 워크플로우 구축을 위한 직관적인 API를 제공합니다.

- add_edge() : executor간에 직접적인 연결을 생성합니다.
- set_start_executor() : 워크플로우 진입점을 정의합니다.
- build() : 불변의 워크플로우 개체를 만들고 반환합니다.

