# 🦜LangChain을 사용한 스트리밍
- 스트리밍은 LLM 기반 애플리케이션이 최종 사용자에게 반응하는 느낌을 주는 데 매우 중요합니다.
- LLM, 파서, 프롬프트, 리트리버, 에이전트와 같은 중요한 LangChain 기본 요소는 LangChain 실행 가능 인터페이스를 구현합니다.

이 인터페이스는 콘텐츠 스트리밍에 대한 두 가지 일반적인 접근 방식을 제공합니다:

- 동기화 스트림과 비동기 아스트림: 체인에서 최종 출력을 스트리밍하는 스트리밍의 기본 구현입니다.
- async astream_events 및 async astream_log: 중간 단계와 체인의 최종 출력을 모두 스트리밍하는 방법을 제공합니다.
두 가지 접근 방식을 살펴보고 사용 방법을 이해해 보겠습니다. 🥷

In [1]:
!pip install -qU langchain-anthropic

TIP. anthrophic 가입하고 API key 설정 할때 번호 인증 하면 크레딧 5불을 줍니다.

In [2]:
api_key = "your_api_key"

In [7]:
# Showing the example using anthropic, but you can use
# your favorite chat model!
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model='claude-3-opus-20240229',temperature=0, anthropic_api_key=api_key)

chunks = []
async for chunk in model.astream("안녕 나는 김덕배야. 너는 누구냐?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

안|녕|하|세|요| |김|덕|배|님|.| |저|는| |Claude|라|는| |인|공|지|능| |어|시|스|턴|트|입|니|다|.| |사|람|들|과| |대|화|를| |나|누|고| |다|양|한| |주|제|에| |대|해| |도|움|을| |드|리|는| |것|이| |제| |역|할|이|에|요|.| |반|갑|습|니|다|!|

In [8]:
chunks[0]

AIMessageChunk(content='안')

In [9]:
chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]

AIMessageChunk(content='안녕하세요')

In [10]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("{topic}이거에 대한 정보를 나에게 말해줘.") # prompt template
parser = StrOutputParser() # output parser 문자열을 그대로 출력
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "일론머스크"}):
    print(chunk, end="|", flush=True)

일|론| |머|스|크|는| |테|슬|라|,| |스|페|이|스|X|,| |뉴|럴|링|크|,| |보|링| |컴|퍼|니| |등| |여|러| |혁|신|적|인| |기|업|의| |창|업|자|이|자| |CEO|입|니|다|.| |그|는| |전|기|차|,| |우|주| |탐|사|,| |인|공|지|능|,| |신|경| |기|술| |등| |다|양|한| |분|야|에|서| |혁|신|을| |주|도|하|고| |있|습|니|다|.| |

1|.| |테|슬|라|:| |전|기|차| |및| |청|정|에|너|지| |솔|루|션| |기|업|으|로|,| |머|스|크|는| |2|004|년| |투|자|자|로| |참|여|한| |후| |CEO|가| |되|었|습|니|다|.|

2|.| |스|페|이|스|X|:| |우|주| |탐|사| |기|술| |기|업|으|로|,| |재|사|용| |가|능|한| |로|켓| |개|발| |및| |화|성| |colon|ization|을| |목|표|로| |합|니|다|.|

3|.| |뉴|럴|링|크|:| |인|간|의| |두|뇌|와| |컴|퓨|터|를| |연|결|하|는| |기|술|을| |개|발|하|는| |신|경| |기|술| |기|업|입|니|다|.|

4|.| |보|링| |컴|퍼|니|:| |터|널| |굴|착| |기|술|을| |개|발|하|여| |교|통| |혼|잡|을| |해|소|하|고|자| |하|는| |기|업|입|니|다|.|

머|스|크|는| |혁|신|적|인| |아|이|디|어|와| |사|업| |추|진|력|으|로| |유|명|하|지|만|,| |때|로|는| |논|란|의| |중|심|에| |서|기|도| |합|니|다|.| |그|는| |트|위|터|를| |통|해| |활|발|히| |소|통|하|며|,| |암|호|화|폐| |등| |다|양|한| |주|제|에| |대|해| |의|견|을| |표|명|합|니|다|.| |머|스|크|의| |비|전|과| |리|더|십|은| |전| |세|계|적|으|로| |주|목|받|고| |있|습|니|다|.|

# 입력 스트림으로 작업하기
생성 중인 출력에서 JSON을 스트리밍하고 싶다면 어떻게 해야 할까요?

json.loads에 의존하여 부분 JSON을 구문 분석하려고 하면 부분 JSON이 유효한 JSON이 아니므로 구문 분석이 실패할 것입니다.

어떻게 해야 할지 완전히 망설이게 되고 JSON을 스트리밍할 수 없다고 주장하게 될 것입니다.

파서가 입력 스트림에서 작동하여 부분 json을 유효한 상태로 "자동 완성"을 시도해야 하는 방법이 있다는 것이 밝혀졌습니다.

이것이 무엇을 의미하는지 이해하기 위해 이러한 구문 분석기가 실제로 작동하는 모습을 살펴봅시다.

In [11]:
from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser() #json 형태로 출력
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
    print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 67391}]}
{'countries': [{'name': 'France', 'population': 67391582}]}
{'countries': [{'name': 'France', 'population': 67391582}, {}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47351}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47351567}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 47351567}, {}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 4735156

In [12]:
from langchain_core.output_parsers import (
    JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
    """A function that does not operates on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
    print(text, end="|", flush=True)

[None, '', 'France', 'France', 'France', 'France', 'France', None, 'France', '', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', 'France', 'Spain', None, 'France', 'Spain', '', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan', 'France', 'Spain', 'Japan']|

In [13]:
from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
    print(text, end="|", flush=True)

France|Spain|Japan|