# Lab 6: MAF Dev UI - 통합 워크플로우 예제

## 개요

하나의 워크플로우에서 **Sequential → Concurrent** 패턴을 모두 체험합니다.

**워크플로우 구조:**
```
Planner (순차) → Broadcast → (Culture || Food || Nature) 병렬 → Combiner → Finalizer
```

---

## 1. 환경 설정

In [None]:
# 라이브러리 import
import json
import threading
from dataclasses import dataclass

from azure.identity import (
    AzureCliCredential,
    ChainedTokenCredential,
    ManagedIdentityCredential,
)
from azure.identity.aio import (
    AzureCliCredential as AsyncAzureCliCredential,
    ChainedTokenCredential as AsyncChainedTokenCredential,
    ManagedIdentityCredential as AsyncManagedIdentityCredential,
)

from agent_framework import WorkflowBuilder, WorkflowContext, executor
from agent_framework.azure import AzureAIAgentClient
from agent_framework.devui import serve

print("✅ 라이브러리 로드 완료")

In [None]:
# Config 로드
with open('config.json', 'r') as f:
    config = json.load(f)

PROJECT_CONNECTION_STRING = config['project_connection_string']
MODEL_DEPLOYMENT_NAME = config.get('model_deployment_name', 'gpt-4o')

endpoint = PROJECT_CONNECTION_STRING.split(';')[0]

print(f"✅ Endpoint: {endpoint}")
print(f"   Model: {MODEL_DEPLOYMENT_NAME}")

In [None]:
# Agent Client 초기화 (Tracing 활성화)
from azure.ai.projects.aio import AIProjectClient

async_credential = AsyncChainedTokenCredential(
    AsyncManagedIdentityCredential(),
    AsyncAzureCliCredential()
)

# Connection String 파싱
# 형식: "endpoint;subscription_id;resource_group;project_name"
parts = PROJECT_CONNECTION_STRING.split(';')
subscription_id = parts[1] if len(parts) > 1 else None
resource_group = parts[2] if len(parts) > 2 else None
project_name = parts[3] if len(parts) > 3 else None

# AI Project Client for Tracing
project_client = AIProjectClient(
    endpoint=endpoint,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    project_name=project_name,
    credential=async_credential
)

agent_client = AzureAIAgentClient(
    project_endpoint=endpoint,
    model_deployment_name=MODEL_DEPLOYMENT_NAME,
    async_credential=async_credential,
    project_client=project_client  # Tracing 활성화
)

# Agent 생성
travel_agent = agent_client.create_agent(
    name="TravelAgent",
    instructions="여행 정보 전문가입니다. 간결하게 답변하세요."
)

print("✅ Agent 초기화 완료")

## 2. 통합 워크플로우 정의

In [None]:
# 컨텍스트 정의
@dataclass
class TravelWorkflowContext(WorkflowContext):
    destination: str = ""
    initial_plan: str = ""
    culture_info: str = ""
    food_info: str = ""
    nature_info: str = ""
    final_guide: str = ""

print("✅ 컨텍스트 정의 완료")

In [None]:
# 노드 정의

# Sequential 노드
@executor(id="planner")
async def planner_node(context: TravelWorkflowContext, ctx: WorkflowContext) -> None:
    """1단계: 초안 계획 작성 (순차)"""
    print(f"\n📝 [Planner] {context.destination} 여행 계획 작성 중...")
    
    thread = travel_agent.get_new_thread()
    result = await travel_agent.run(
        f"{context.destination} 2박3일 여행 개요를 간단히 작성해주세요.",
        thread=thread
    )
    context.initial_plan = result.text if hasattr(result, 'text') else str(result)
    
    print("✅ [Planner] 완료")
    await ctx.send_message(context, target_id="broadcast")

# Broadcast 노드
@executor(id="broadcast")
async def broadcast_node(context: TravelWorkflowContext, ctx: WorkflowContext) -> None:
    """2단계: 병렬 분석 시작"""
    print("\n📢 [Broadcast] 병렬 분석 시작...")
    await ctx.send_message(context, target_id="culture")
    await ctx.send_message(context, target_id="food")
    await ctx.send_message(context, target_id="nature")
    print("✅ [Broadcast] 완료")

# Concurrent 노드들
@executor(id="culture")
async def culture_node(context: TravelWorkflowContext, ctx: WorkflowContext) -> None:
    """3-1단계: 문화 분석 (병렬)"""
    print("🏛️ [Culture] 분석 중...")
    thread = travel_agent.get_new_thread()
    result = await travel_agent.run(
        f"{context.destination} 주요 문화/역사 명소 3곳 추천",
        thread=thread
    )
    context.culture_info = result.text if hasattr(result, 'text') else str(result)
    await ctx.send_message(context, target_id="combiner")
    print("✅ [Culture] 완료")

@executor(id="food")
async def food_node(context: TravelWorkflowContext, ctx: WorkflowContext) -> None:
    """3-2단계: 음식 분석 (병렬)"""
    print("🍜 [Food] 분석 중...")
    thread = travel_agent.get_new_thread()
    result = await travel_agent.run(
        f"{context.destination} 대표 음식 3가지 추천",
        thread=thread
    )
    context.food_info = result.text if hasattr(result, 'text') else str(result)
    await ctx.send_message(context, target_id="combiner")
    print("✅ [Food] 완료")

@executor(id="nature")
async def nature_node(context: TravelWorkflowContext, ctx: WorkflowContext) -> None:
    """3-3단계: 자연 분석 (병렬)"""
    print("🌿 [Nature] 분석 중...")
    thread = travel_agent.get_new_thread()
    result = await travel_agent.run(
        f"{context.destination} 자연 경관 3곳 추천",
        thread=thread
    )
    context.nature_info = result.text if hasattr(result, 'text') else str(result)
    await ctx.send_message(context, target_id="combiner")
    print("✅ [Nature] 완료")

# Combiner 노드
@executor(id="combiner")
async def combiner_node(context: TravelWorkflowContext, ctx: WorkflowContext) -> None:
    """4단계: 결과 통합"""
    print("\n🔗 [Combiner] 결과 통합 중...")
    context.final_guide = f"""
# {context.destination} 여행 가이드

## 개요
{context.initial_plan}

## 🏛️ 문화 & 역사
{context.culture_info}

## 🍜 음식
{context.food_info}

## 🌿 자연
{context.nature_info}
"""
    print("✅ [Combiner] 완료")
    await ctx.send_message(context, target_id="finalizer")

# Finalizer 노드
@executor(id="finalizer")
async def finalizer_node(context: TravelWorkflowContext, ctx: WorkflowContext) -> None:
    """5단계: 최종 출력"""
    print("\n✨ [Finalizer] 최종 정리 중...")
    await ctx.yield_output(context)
    print("✅ [Finalizer] 완료")

print("✅ 노드 정의 완료")

In [None]:
# 워크플로우 빌드
travel_workflow = (
    WorkflowBuilder(name="Travel Guide Workflow")
    # Sequential 부분
    .set_start_executor(planner_node)
    .add_edge(planner_node, broadcast_node)
    # Concurrent 부분 (Fan-out)
    .add_edge(broadcast_node, culture_node)
    .add_edge(broadcast_node, food_node)
    .add_edge(broadcast_node, nature_node)
    # Concurrent → Sequential (Fan-in)
    .add_edge(culture_node, combiner_node)
    .add_edge(food_node, combiner_node)
    .add_edge(nature_node, combiner_node)
    # 최종 단계
    .add_edge(combiner_node, finalizer_node)
    .build()
)

print("✅ 워크플로우 빌드 완료")
print("   Planner → Broadcast → (Culture || Food || Nature) → Combiner → Finalizer")

## 3. Dev UI 서버 시작

In [None]:
# Dev UI 서버 시작
import socket

def is_port_in_use(port):
    """포트 사용 여부 확인"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(('127.0.0.1', port)) == 0

def start_devui():
    print("="*70)
    print("🌐 Dev UI 서버 시작 중...")
    print("="*70)
    print("\n🔗 브라우저에서 http://localhost:8080 접속\n")
    
    serve(
        entities=[travel_workflow],
        port=8080,
        host='127.0.0.1',
        auto_open=False,
        ui_enabled=True,
        tracing_enabled=True  # ✅ Tracing 활성화
    )

# 포트 사용 확인
if is_port_in_use(8080):
    print("⚠️  포트 8080이 이미 사용 중입니다.")
    print("   Dev UI 서버가 이미 실행 중이거나 다른 프로세스가 사용 중입니다.")
    print("\n✅ 기존 서버 사용: http://localhost:8080")
    print("\n💡 서버를 재시작하려면:")
    print("   1. Jupyter Notebook 커널 재시작 (Kernel > Restart)")
    print("   2. 또는 터미널에서: lsof -ti:8080 | xargs kill -9")
else:
    # 백그라운드 스레드에서 실행
    server_thread = threading.Thread(target=start_devui, daemon=True)
    server_thread.start()

    import time
    time.sleep(2)

    print("✅ Dev UI 서버 실행 중!")
    print("   http://localhost:8080")
    print("\n💡 사용 방법:")
    print("   1. 브라우저에서 위 URL 접속")
    print("   2. 'Run' 버튼 클릭")
    print("   3. Input: {\"destination\": \"제주도\"}")
    print("   4. 워크플로우 실행 모니터링!")

{
    "resource_metrics": [
        {
            "resource": {
                "attributes": {
                    "telemetry.sdk.language": "python",
                    "telemetry.sdk.name": "opentelemetry",
                    "telemetry.sdk.version": "1.37.0",
                    "service.name": "agent_framework"
                },
                "schema_url": ""
            },
            "scope_metrics": [
                {
                    "scope": {
                        "name": "agent_framework",
                        "version": "1.0.0b251016",
                        "schema_url": "",
                        "attributes": null
                    },
                    "metrics": [
                        {
                            "name": "gen_ai.client.token.usage",
                            "description": "Captures the token usage of chat clients",
                            "unit": "tokens",
                            "data": {
                            

## 4. (선택) 노트북에서 직접 실행

**⚠️ Dev UI에서 실행하는 것을 권장합니다!**

In [None]:
# 노트북에서 워크플로우 실행 (참고용)
async def run_workflow():
    context = TravelWorkflowContext(destination="부산")
    
    print("🎯 워크플로우 실행 시작...\n")
    
    outputs = []
    async for event in travel_workflow.run_stream(context):
        if hasattr(event, 'output') and event.output is not None:
            outputs.append(event.output)
    
    result = outputs[-1] if outputs else context
    
    print("\n" + "="*70)
    print("✅ 실행 완료!")
    print("="*70)
    print(result.final_guide)
    
    return result

# 실행
# result = await run_workflow()

print("💡 위 셀의 주석을 해제하여 실행하거나")
print("   Dev UI에서 실행하세요!")

---

## ✅ 완료!

**워크플로우 구조:**
- **Sequential**: Planner → Broadcast
- **Concurrent**: Culture || Food || Nature (병렬 실행)
- **Sequential**: Combiner → Finalizer

**Dev UI에서 확인:**
1. http://localhost:8080 접속
2. 그래프 시각화 확인
3. Run 버튼으로 실행
4. 노드별 상태 실시간 모니터링 🎉