# 스트리밍 (Streaming)

LangChain의 **스트리밍 시스템(Streaming system)** 은
에이전트 실행(agent run) 중 발생하는 **실시간 피드백(live feedback)** 을
애플리케이션으로 직접 전달할 수 있게 해줍니다.

스트리밍을 사용하면 **전체 응답이 완성되기 전에** 부분적으로 결과를 점진적으로 표시할 수 있습니다.
이를 통해 **LLM의 지연(latency)** 으로 인한 대기 시간을 줄이고,
사용자에게 더 빠르고 자연스러운 **실시간 인터랙션 경험(UX)** 을 제공합니다.

### LangChain 스트리밍으로 가능한 기능들

* **에이전트 진행 상황 스트리밍 (Stream agent progress)**
  → 각 에이전트 단계가 끝날 때마다 상태 업데이트(state update)를 수신할 수 있습니다.

* **LLM 토큰 스트리밍 (Stream LLM tokens)**
  → 언어 모델이 토큰을 생성할 때마다, 생성되는 즉시 스트리밍으로 받아볼 수 있습니다.

* **사용자 정의 업데이트 스트리밍 (Stream custom updates)**
  → 예를 들어 `"100개 중 10개 레코드 가져옴"` 같은
  사용자 정의 신호(user-defined signals)를 실시간으로 전송할 수 있습니다.

* **다중 모드 스트리밍 (Stream multiple modes)**
  → 다음과 같은 모드 중 선택적으로 사용할 수 있습니다:

  1. **updates** – 에이전트 진행 상황 중심의 업데이트
  2. **messages** – LLM 토큰 및 관련 메타데이터
  3. **custom** – 임의의 사용자 정의 데이터 (예: 로그, 상태 메시지 등)
  4. **values** - LLM의 최종 응답 객체들을 순서대로 반환

In [1]:
from dotenv import load_dotenv

load_dotenv()

True

In [2]:
from langchain.chat_models import init_chat_model

# model = init_chat_model("gpt-5-nano", model_provider="openai")
model = init_chat_model("gemini-2.5-flash", model_provider="google_genai")

### 에이전트 진행 상황 스트리밍 (Stream agent progress)
각 에이전트 단계가 끝날 때마다 상태 업데이트(state update)를 수신할 수 있습니다.

In [3]:
from langchain.agents import create_agent

def get_weather(city: str) -> str:
    """특정 도시의 날씨를 가져옵니다."""
    return f"{city}의 날씨는 언제나 맑습니다!"

# 에이전트 생성
agent = create_agent(
    model=model,  # 사용할 모델 지정
    tools=[get_weather],        # 사용할 도구(함수) 등록
)

# 스트리밍 실행
for chunk in agent.stream(  
    {"messages": [{"role": "user", "content": "SF(샌프란시스코)의 날씨는 어때?"}]},
    stream_mode="updates",  # 스트리밍 모드를 'updates'(단계별 진행 상황)로 설정
):
    for step, data in chunk.items():
        print(f"단계(step): {step}")
        print(f"내용(content): {data['messages'][-1].content_blocks}")

단계(step): model
내용(content): [{'type': 'tool_call', 'id': '382ab5fe-434f-4df6-8043-d8b904d0dbb8', 'name': 'get_weather', 'args': {'city': 'San Francisco'}}]
단계(step): tools
내용(content): [{'type': 'text', 'text': 'San Francisco의 날씨는 언제나 맑습니다!'}]
단계(step): model
내용(content): [{'type': 'text', 'text': '샌프란시스코의 날씨는 언제나 맑습니다!'}]


### LLM 토큰 (LLM tokens)

LLM이 **토큰을 생성하는 즉시 실시간으로 스트리밍**하려면
`stream_mode="messages"` 옵션을 사용하면 됩니다.

이 모드를 사용하면, **에이전트가 실행 중 호출하는 도구(tool)들의 스트리밍 출력**과
**최종 응답(final response)** 을 모두 실시간으로 확인할 수 있습니다.

In [4]:
from langchain.agents import create_agent

def get_weather(city: str) -> str:
    """특정 도시의 날씨를 가져옵니다."""
    return f"{city}의 날씨는 언제나 맑습니다!"

# 에이전트 생성
agent = create_agent(
    model=model,  # 사용할 언어 모델 지정
    tools=[get_weather],        # 사용할 도구(함수) 등록
)

# LLM이 토큰을 생성할 때마다 실시간으로 스트리밍 받기
for token, metadata in agent.stream(  
    {"messages": [{"role": "user", "content": "SF(샌프란시스코)의 날씨는 어때?"}]},
    stream_mode="messages",  # 'messages' 모드: LLM 토큰 스트리밍
):
    print(f"노드(node): {metadata['langgraph_node']}")     # 현재 실행 중인 LangGraph 노드 이름
    print(f"내용(content): {token.content_blocks}")       # 실시간으로 생성되는 텍스트 블록(토큰)
    print("\n")

노드(node): model
내용(content): [{'type': 'tool_call', 'id': '8db27d81-e934-4379-8b25-75780a0e85f5', 'name': 'get_weather', 'args': {'city': 'San Francisco'}}]


노드(node): model
내용(content): []


노드(node): tools
내용(content): [{'type': 'text', 'text': 'San Francisco의 날씨는 언제나 맑습니다!'}]


노드(node): model
내용(content): [{'type': 'text', 'text': '샌프란시'}]


노드(node): model
내용(content): [{'type': 'text', 'text': '스코의 날씨는 언제나 맑습니다!\n'}]


노드(node): model
내용(content): []




### 사용자 정의 업데이트 (Custom updates)

도구(tool)가 실행되는 동안 실시간으로 상태 업데이트를 스트리밍(streaming) 하려면
`get_stream_writer`를 사용할 수 있습니다.

즉, 도구가 실행되는 과정에서 중간 진행 상황이나 결과를 실시간으로 전송할 수 있게 하는 기능입니다.
이 기능을 사용하면, 모델의 최종 응답을 기다리지 않고도 사용자는 즉시 진행 상태를 확인할 수 있습니다.

In [5]:
from langchain.agents import create_agent
from langgraph.config import get_stream_writer  

def get_weather(city: str) -> str:
    """주어진 도시의 날씨를 가져옵니다."""
    writer = get_stream_writer()  # 스트리밍 writer 생성
    # 임의의 데이터를 스트리밍 방식으로 출력
    writer(f"도시 {city}의 데이터를 조회 중입니다...")
    writer(f"도시 {city}의 데이터를 성공적으로 가져왔습니다.")
    return f"{city}의 날씨는 언제나 맑습니다!"

# 에이전트 생성
agent = create_agent(
    model,
    tools=[get_weather],
)

# 사용자 질의에 대해 커스텀 스트리밍 실행
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "샌프란시스코의 날씨는 어때?"}]},
    stream_mode=["custom", "updates"],
):
    print(chunk)

('updates', {'model': {'messages': [AIMessage(content='', additional_kwargs={'function_call': {'name': 'get_weather', 'arguments': '{"city": "San Francisco"}'}}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash', 'safety_ratings': [], 'grounding_metadata': {}, 'model_provider': 'google_genai'}, id='lc_run--09b061a7-87ac-4c77-97d6-9e2b811a3981-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': '9b543783-0fc7-4b36-aaad-4e9e4eed2775', 'type': 'tool_call'}], usage_metadata={'input_tokens': 56, 'output_tokens': 69, 'total_tokens': 125, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 53}})]}})
('custom', '도시 San Francisco의 데이터를 조회 중입니다...')
('custom', '도시 San Francisco의 데이터를 성공적으로 가져왔습니다.')
('updates', {'tools': {'messages': [ToolMessage(content='San Francisco의 날씨는 언제나 맑습니다!', name='get_weather', id='303d7835-792c-4de5-8d60-dbcb00df66a3', tool

## 스트리밍과 청크 (Streaming and Chunks)

스트리밍 중에는 **`AIMessageChunk` 객체**들이 순차적으로 전달되며,
이 청크(chunk)들을 결합하면 **완전한 메시지 객체(full message object)** 를 구성할 수 있습니다.


In [7]:
chunks = []            # 스트리밍으로 받은 청크(부분 응답)들을 저장할 리스트
full_message = None    # 전체 메시지를 누적할 변수

# 모델로부터 스트리밍 방식으로 응답을 받음 (stream_mode=values)
for chunk in model.stream("LangChain이 뭔지 한줄로 답해주세요."):
    chunks.append(chunk)               # 각 청크를 리스트에 추가
    print(chunk.text)                  # 청크의 텍스트 부분을 바로 출력 (실시간 스트리밍처럼)
    # full_message가 비어있다면 첫 청크를 넣고, 이후에는 청크를 이어붙임
    full_message = chunk if full_message is None else full_message + chunk

# 모든 청크를 합친 최종 메시지 출력
full_message

LLM(대규모 언어 모델)을 활용한 애플리케이션 개발을 돕는 오픈소스 프레임워크입니다.



AIMessageChunk(content='LLM(대규모 언어 모델)을 활용한 애플리케이션 개발을 돕는 오픈소스 프레임워크입니다.', additional_kwargs={}, response_metadata={'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash', 'safety_ratings': [], 'grounding_metadata': {}, 'model_provider': 'google_genai'}, id='lc_run--480582aa-1594-4b2a-8433-be80688c848d', usage_metadata={'input_tokens': 12, 'output_tokens': 527, 'total_tokens': 539, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 498}}, chunk_position='last')

In [12]:
for event in agent.stream(  
    {"messages": [{"role": "user", "content": "LangChain이 뭔지 한줄로 답해주세요."}]},
    stream_mode="values"
):
    event['messages'][-1].pretty_print()


LangChain이 뭔지 한줄로 답해주세요.

[{'type': 'text', 'text': 'LangChain은 LLM(대규모 언어 모델)을 기반으로 애플리케이션을 개발할 수 있도록 돕는 프레임워크입니다.', 'extras': {'signature': 'CssBAdHtim/P2zbi5L3hucwa+pbNEcIFbrTPAq1Qlic7Kj4tBlsdAFLnWt3+4vcstgO3QAVQd9a6u27dRXoUCgtgdh0+Mg+Nh+usM/7QqwQn7NsDwf/NJotGzFnwes3mk75eXVLCfiAgrdvz1C3C2+W5fsMH0EyhN7nI9Pl5btdBvL43CXqjbRT2iCSccDX7GB5cUKFJmVfSJMstxgq7PFtyNWytCF2uPnDw3qNMxf/8bq2RzhcUy5kFGGdnnrz+DIvKzZ+alqkP6pJgGJI='}}]
