In [None]:
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from prompts import JSON_PROMPT

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0)
parser = StrOutputParser() 
prompt = ChatPromptTemplate.from_template(JSON_PROMPT)
json_chain =  prompt | llm | parser


test_instruction = "콘텐츠 제작을 효율적으로 하는 시스템을 구축해줘. {텍스트작성Agent}, {이미지생성Agent}, {동영상편집Agent}가 동시에 작업하고 {콘텐츠통합Agent}가 최종 결과물을 만들도록 해"
result = json_chain.invoke({"instruction": test_instruction})
print(result)

{"flow_name": "ContentCreationPipeline", "type": "Sequential", "sub_agents": [{"flow": {"flow_name": "ConcurrentContentCreation", "type": "Parallel", "sub_agents": [{"agent_name": "텍스트작성Agent"}, {"agent_name": "이미지생성Agent"}, {"agent_name": "동영상편집Agent"}]}}, {"agent_name": "콘텐츠통합Agent"}]}


## JsonOutputParser

In [5]:
from langchain_core.output_parsers import JsonOutputParser
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field


class Flow(BaseModel):
    """
    단일 공개 모델.
    - type == 'LLM'       → tools 필수, sub_agents 금지
    - type in others      → sub_agents 필수, tools 금지
    - sub_agents 원소는 {"agent_name": str} 또는 {"flow": Flow} (재귀)
    - 모든 dict는 예상 키만 허용
    """

    flow_name: str = Field(description="플로우 이름")
    type: Literal["LLM", "Sequential", "Loop", "Parallel"]

    # 단일 모델을 유지하기 위해 도메인 타입을 dict로 받고, 후처리 검증으로 엄격히 검사
    tools: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="LLM 전용: [{'agent_name': str}, ...]"
    )
    sub_agents: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="Sequential/Loop/Parallel 전용: [{'agent_name': str} | {'flow': Flow}, ...]"
    )


parser_1 = JsonOutputParser(pydantic_object=Flow)
# print(parser.get_format_instructions())

prompt_1 = ChatPromptTemplate.from_template(JSON_PROMPT)
prompt_1 = prompt_1.partial(format_instructions=parser_1.get_format_instructions())
chain_1 = prompt_1 | llm | parser_1  # 체인을 구성합니다.

result_1 = chain_1.invoke({"instruction": test_instruction})
print(result_1)

{'flow_name': 'ContentCreationPipeline', 'type': 'Sequential', 'sub_agents': [{'flow': {'flow_name': 'ConcurrentContentCreation', 'type': 'Parallel', 'sub_agents': [{'agent_name': '텍스트작성Agent'}, {'agent_name': '이미지생성Agent'}, {'agent_name': '동영상편집Agent'}]}}, {'agent_name': '콘텐츠통합Agent'}]}


## PydanticParser

In [12]:
from langchain_core.output_parsers import PydanticOutputParser
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field


class Flow(BaseModel):
    """
    단일 공개 모델.
    - type == 'LLM'       → tools 필수, sub_agents 금지
    - type in others      → sub_agents 필수, tools 금지
    - sub_agents 원소는 {"agent_name": str} 또는 {"flow": Flow} (재귀)
    - 모든 dict는 예상 키만 허용
    """

    flow_name: str = Field(description="플로우 이름")
    type: Literal["LLM", "Sequential", "Loop", "Parallel"]

    # 단일 모델을 유지하기 위해 도메인 타입을 dict로 받고, 후처리 검증으로 엄격히 검사
    tools: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="LLM 전용: [{'agent_name': str}, ...]"
    )
    sub_agents: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="Sequential/Loop/Parallel 전용: [{'agent_name': str} | {'flow': Flow}, ...]"
    )

    

# Parser 생성
parser_2 = PydanticOutputParser(pydantic_object=Flow)

# Prompt 템플릿 준비
prompt_2 = ChatPromptTemplate.from_template(JSON_PROMPT)
prompt_2 = prompt_2.partial(format_instructions=parser_2.get_format_instructions())

# 체인 구성
chain_2 = prompt_2 | llm | parser_2

# 실행
result: Flow = chain_2.invoke({"instruction": test_instruction})


# 2. Dict 변환
result_dict = result.dict()
print(result_dict)

{'flow_name': 'ContentCreationPipeline', 'type': 'Sequential', 'tools': None, 'sub_agents': [{'flow': {'flow_name': 'ConcurrentContentCreation', 'type': 'Parallel', 'sub_agents': [{'agent_name': '텍스트작성Agent'}, {'agent_name': '이미지생성Agent'}, {'agent_name': '동영상편집Agent'}]}}, {'agent_name': '콘텐츠통합Agent'}]}


## Structured Output

In [15]:
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

class Flow(BaseModel):
    """
    단일 공개 모델.
    - type == 'LLM'       → tools 필수, sub_agents 금지
    - type in others      → sub_agents 필수, tools 금지
    - sub_agents 원소는 {"agent_name": str} 또는 {"flow": Flow} (재귀)
    - 모든 dict는 예상 키만 허용
    """

    flow_name: str = Field(description="플로우 이름")
    type: Literal["LLM", "Sequential", "Loop", "Parallel"]

    # 단일 모델을 유지하기 위해 도메인 타입을 dict로 받고, 후처리 검증으로 엄격히 검사
    tools: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="LLM 전용: [{'agent_name': str}, ...]"
    )
    sub_agents: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="Sequential/Loop/Parallel 전용: [{'agent_name': str} | {'flow': Flow}, ...]"
    )


llm_with_strucutre = llm.with_structured_output(Flow)
# Bind responseformatter schema as a tool to the model

prompt_3 = ChatPromptTemplate.from_template(JSON_PROMPT)
chain_3 = prompt_3 | llm_with_strucutre

ai_msg = chain_3.invoke({"instruction": test_instruction})

result_dict = ai_msg.dict()
print(result_dict)

{'flow_name': 'ContentCreationPipeline', 'type': 'Sequential', 'tools': None, 'sub_agents': [{'flow': {'flow_name': 'SimultaneousContentCreation', 'type': 'Parallel', 'sub_agents': [{'agent_name': '텍스트작성Agent'}, {'agent_name': '이미지생성Agent'}, {'agent_name': '동영상편집Agent'}]}}, {'agent_name': '콘텐츠통합Agent'}]}


In [24]:
import time
import pandas as pd
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser, PydanticOutputParser, StrOutputParser
from langchain_openai import ChatOpenAI
from prompts import JSON_PROMPT

# ==== 모델 정의 ====
class Flow(BaseModel):
    """
    단일 공개 모델.
    - type == 'LLM'       → tools 필수, sub_agents 금지
    - type in others      → sub_agents 필수, tools 금지
    - sub_agents 원소는 {"agent_name": str} 또는 {"flow": Flow} (재귀)
    - 모든 dict는 예상 키만 허용
    """
    flow_name: str = Field(description="플로우 이름")
    type: Literal["LLM", "Sequential", "Loop", "Parallel"]
    tools: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="LLM 전용: [{'agent_name': str}, ...]"
    )
    sub_agents: Optional[List[Dict[str, Any]]] = Field(
        default=None, description="Sequential/Loop/Parallel 전용: [{'agent_name': str} | {'flow': Flow}, ...]"
    )


# ==== 공통 세팅 ====
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
prompt = ChatPromptTemplate.from_template(JSON_PROMPT)
test_instruction = "콘텐츠 제작을 효율적으로 하는 시스템을 구축해줘. {텍스트작성Agent}, {이미지생성Agent}, {동영상편집Agent}가 동시에 작업하고 {콘텐츠통합Agent}가 최종 결과물을 만들도록 해"


# ==== 0. Baseline: StrOutputParser ====
parser_0 = StrOutputParser()
prompt_0 = ChatPromptTemplate.from_template((JSON_PROMPT))
chain_0 = prompt_0 | llm | parser_0


# ==== 1. JsonOutputParser ====
parser_1 = JsonOutputParser(pydantic_object=Flow)
prompt_1 = ChatPromptTemplate.from_template(JSON_PROMPT).partial(
    format_instructions=parser_1.get_format_instructions()
)
chain_1 = prompt_1 | llm | parser_1


# ==== 2. PydanticOutputParser ====
parser_2 = PydanticOutputParser(pydantic_object=Flow)
prompt_2 = ChatPromptTemplate.from_template(JSON_PROMPT).partial(
    format_instructions=parser_2.get_format_instructions()
)
chain_2 = prompt_2 | llm | parser_2


# ==== 3. with_structured_output ====
llm_structured = llm.with_structured_output(Flow)
prompt_3 = ChatPromptTemplate.from_template("Instruction: {instruction}")
chain_3 = prompt_3 | llm_structured


# ==== 성능 테스트 ====
def run_chain(chain, name: str, n: int = 5):
    success, fail, durations = 0, 0, []
    results = []

    for i in range(n):
        start = time.time()
        try:
            result = chain.invoke({"instruction": test_instruction})

            if isinstance(result, BaseModel):
                result = result.dict()
            print(result)

            results.append(result)
            success += 1
        except Exception as e:
            print(f"[{name}] Run {i+1} failed: {e}")
            fail += 1
        durations.append(time.time() - start)

    avg_time = sum(durations) / len(durations)
    return {
        "Method": name,
        "Success": success,
        "Fail": fail,
        "Avg_Time(s)": round(avg_time, 2),
        "Samples": results[:2],  # 샘플 2개만 표시
    }


if __name__ == "__main__":
    N = 1  # 반복 횟수
    results = []
    results.append(run_chain(chain_0, "Baseline(StrOutputParser)", N))
    results.append(run_chain(chain_1, "JsonOutputParser", N))
    results.append(run_chain(chain_2, "PydanticOutputParser", N))
    results.append(run_chain(chain_3, "with_structured_output", N))

    # 표 출력
    df = pd.DataFrame(results)
    print("\n=== 성능 비교 결과 ===")
    print(df[["Method", "Success", "Fail", "Avg_Time(s)"]])

{"flow_name": "ContentCreationPipeline", "type": "Sequential", "sub_agents": [{"flow": {"flow_name": "ConcurrentContentCreation", "type": "Parallel", "sub_agents": [{"agent_name": "텍스트작성Agent"}, {"agent_name": "이미지생성Agent"}, {"agent_name": "동영상편집Agent"}]}}, {"agent_name": "콘텐츠통합Agent"}]}
{'flow_name': 'ContentCreationPipeline', 'type': 'Sequential', 'sub_agents': [{'flow': {'flow_name': 'ConcurrentContentCreation', 'type': 'Parallel', 'sub_agents': [{'agent_name': '텍스트작성Agent'}, {'agent_name': '이미지생성Agent'}, {'agent_name': '동영상편집Agent'}]}}, {'agent_name': '콘텐츠통합Agent'}]}
{'flow_name': 'ContentCreationPipeline', 'type': 'Sequential', 'tools': None, 'sub_agents': [{'flow': {'flow_name': 'ConcurrentContentCreation', 'type': 'Parallel', 'sub_agents': [{'agent_name': '텍스트작성Agent'}, {'agent_name': '이미지생성Agent'}, {'agent_name': '동영상편집Agent'}]}}, {'agent_name': '콘텐츠통합Agent'}]}
{'flow_name': '콘텐츠 제작 시스템', 'type': 'Parallel', 'tools': None, 'sub_agents': [{'agent_name': '텍스트작성Agent'}, {'agent_na