# LangChain 스트리밍

스트리밍을 사용하면 LLM의 출력이 생성되는 대로 점진적으로 표시할 수 있어 더 나은 사용자 경험을 제공합니다. LangChain 에이전트는 기본적으로 스트리밍 모드로 실행되며, 실시간으로 응답을 제공합니다.

여러 가지 스트리밍 모드가 있으며, 각각 다른 종류의 정보를 제공합니다:

- **`updates`**: 에이전트의 진행 상황 (기본값)
- **`messages`**: LLM의 토큰 스트리밍
- **`custom`**: 커스텀 업데이트

## 사전 준비

환경 변수를 설정합니다.

In [None]:
from dotenv import load_dotenv

load_dotenv(override=True)

## 에이전트 진행 상황 스트리밍

`stream_mode="updates"`는 에이전트의 진행 상황을 추적하는 기본 스트리밍 모드입니다. 각 노드가 실행을 완료할 때마다 업데이트를 생성합니다.

In [None]:
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool

@tool
def get_weather(city: str) -> str:
    """Get the weather for a specific city."""
    return f"The weather in {city} is sunny!"

model = ChatOpenAI(model="gpt-4.1-mini")
agent = create_agent(model=model, tools=[get_weather])

# 기본 스트리밍 (updates 모드)
for chunk in agent.stream({"messages": [{"role": "user", "content": "What's the weather in Seoul?"}]}):
    print(chunk)
    print("---")

## LLM 토큰 스트리밍

`stream_mode="messages"`를 사용하면 LLM이 생성하는 토큰을 실시간으로 스트리밍할 수 있습니다. 이는 사용자에게 즉각적인 피드백을 제공하는 데 유용합니다.

In [None]:
# messages 스트리밍 모드
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Tell me a short story about a robot."}]},
    stream_mode="messages"  # LLM 토큰 스트리밍
):
    print(chunk, end="", flush=True)

### 실용적인 예제: 타자기 효과

LLM 토큰을 스트리밍하여 타자기처럼 텍스트를 출력하는 실용적인 예제입니다.

In [None]:
import sys
import time

print("AI: ", end="", flush=True)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Write a haiku about technology."}]},
    stream_mode="messages"
):
    # AIMessageChunk에서 텍스트 추출
    if hasattr(chunk, 'content'):
        print(chunk.content, end="", flush=True)
        time.sleep(0.02)  # 타자기 효과를 위한 약간의 지연

print()  # 줄바꿈

## 커스텀 업데이트

`get_stream_writer()`를 사용하면 에이전트 실행 중 커스텀 업데이트를 스트리밍할 수 있습니다. 이는 진행 상황, 중간 결과 또는 디버그 정보를 전송하는 데 유용합니다.

커스텀 업데이트는 `stream_mode="custom"`으로 수신할 수 있습니다.

In [None]:
from langchain.tools import tool, ToolRuntime

@tool
def process_data(data_size: int, runtime: ToolRuntime) -> str:
    """Process data with progress updates."""
    writer = runtime.get_stream_writer()
    
    # 진행 상황을 커스텀 업데이트로 전송
    for i in range(0, data_size, 10):
        progress = min(i + 10, data_size)
        writer({"progress": progress, "total": data_size})
    
    return f"Processed {data_size} items successfully!"

agent = create_agent(model=model, tools=[process_data])

# 커스텀 스트리밍 모드로 진행 상황 추적
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Process 50 items of data"}]},
    stream_mode="custom"  # 커스텀 업데이트 수신
):
    if "progress" in chunk:
        percentage = (chunk["progress"] / chunk["total"]) * 100
        print(f"Progress: {chunk['progress']}/{chunk['total']} ({percentage:.0f}%)")

## 다중 스트리밍 모드

여러 스트리밍 모드를 동시에 사용할 수 있습니다. 리스트로 전달하면 각 모드의 업데이트를 모두 받을 수 있습니다.

In [None]:
# 여러 스트리밍 모드 동시 사용
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Process 30 items"}]},
    stream_mode=["updates", "custom"]  # 여러 모드 동시 사용
):
    # chunk는 (stream_mode, data) 튜플 형태
    mode, data = chunk
    
    if mode == "updates":
        print(f"[UPDATE] Node completed: {list(data.keys())}")
    elif mode == "custom":
        if "progress" in data:
            print(f"[PROGRESS] {data['progress']}/{data['total']}")

## 스트리밍 비활성화

개별 모델 또는 도구에 대해 스트리밍을 비활성화하려면 해당 객체를 생성할 때 `streaming=False`를 설정합니다.

In [None]:
from langchain_openai import ChatOpenAI

# 스트리밍 비활성화된 모델
non_streaming_model = ChatOpenAI(model="gpt-4.1-mini", streaming=False)

agent = create_agent(model=non_streaming_model, tools=[get_weather])

# messages 모드를 사용해도 토큰이 스트리밍되지 않음
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What's the weather in Tokyo?"}]},
    stream_mode="messages"
):
    # 전체 응답이 한 번에 전달됨
    print(chunk)

## 종합 예제: 진행률 바가 있는 데이터 처리

여러 스트리밍 기능을 결합한 실용적인 예제입니다.

In [None]:
import time
from langchain.tools import tool, ToolRuntime
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI

@tool
def analyze_data(dataset_name: str, num_records: int, runtime: ToolRuntime) -> str:
    """Analyze a dataset with detailed progress reporting."""
    writer = runtime.get_stream_writer()
    
    # 단계별 분석 프로세스
    steps = [
        ("loading", "Loading data", 0.2),
        ("cleaning", "Cleaning data", 0.3),
        ("processing", "Processing data", 0.3),
        ("finalizing", "Finalizing results", 0.2)
    ]
    
    for step_name, step_desc, duration in steps:
        writer({
            "step": step_name,
            "description": step_desc,
            "status": "started"
        })
        
        time.sleep(duration)  # 작업 시뮬레이션
        
        writer({
            "step": step_name,
            "description": step_desc,
            "status": "completed"
        })
    
    return f"Successfully analyzed {num_records} records from {dataset_name}!"

@tool
def get_summary(analysis_result: str, runtime: ToolRuntime) -> str:
    """Generate a summary of the analysis."""
    return f"Analysis complete. {analysis_result}"

model = ChatOpenAI(model="gpt-4.1-mini")
agent = create_agent(model=model, tools=[analyze_data, get_summary])

print("Starting data analysis...\n")

# 다중 스트리밍 모드로 실행
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Analyze the sales dataset with 1000 records"}]},
    stream_mode=["custom", "updates"]
):
    mode, data = chunk
    
    if mode == "custom":
        # 커스텀 진행 상황 표시
        if "step" in data:
            status_icon = "✓" if data["status"] == "completed" else "→"
            print(f"{status_icon} {data['description']}... {data['status']}")
    
    elif mode == "updates":
        # 노드 완료 정보 (선택적으로 표시)
        if "messages" in data:
            last_msg = data["messages"][-1]
            if hasattr(last_msg, "content") and last_msg.content:
                print(f"\n[Result] {last_msg.content}")

print("\nAnalysis finished!")

## 실전 팁

### 1. 적절한 스트리밍 모드 선택

- **`updates`**: 에이전트의 전체적인 진행 상황을 추적할 때
- **`messages`**: 사용자에게 LLM의 응답을 실시간으로 보여줄 때
- **`custom`**: 복잡한 작업의 세부 진행 상황을 보고할 때

In [None]:
# 채팅 인터페이스에 적합한 설정
def chat_interface():
    for chunk in agent.stream(
        {"messages": [{"role": "user", "content": "Hello!"}]},
        stream_mode="messages"  # 실시간 응답 표시
    ):
        if hasattr(chunk, 'content'):
            yield chunk.content

# 백그라운드 작업 모니터링에 적합한 설정
def background_task():
    for chunk in agent.stream(
        {"messages": [{"role": "user", "content": "Process data"}]},
        stream_mode=["updates", "custom"]  # 진행 상황 추적
    ):
        mode, data = chunk
        # 진행 상황을 데이터베이스나 로그에 기록
        pass

### 2. 에러 처리

스트리밍 중 에러가 발생할 수 있으므로 적절한 에러 처리가 중요합니다.

In [None]:
try:
    for chunk in agent.stream(
        {"messages": [{"role": "user", "content": "Test query"}]},
        stream_mode="messages"
    ):
        print(chunk, end="", flush=True)
except Exception as e:
    print(f"\nError during streaming: {e}")

### 3. 성능 최적화

불필요한 스트리밍 모드를 사용하지 않으면 성능이 향상됩니다.

In [None]:
# 좋은 예: 필요한 모드만 사용
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Query"}]},
    stream_mode="messages"  # 필요한 모드만
):
    pass

# 나쁜 예: 모든 모드 사용 (불필요한 오버헤드)
# for chunk in agent.stream(
#     {"messages": [{"role": "user", "content": "Query"}]},
#     stream_mode=["updates", "messages", "custom"]
# ):
#     pass