In [2]:
from dotenv import load_dotenv

# API 키 정보 로드
load_dotenv()
from langchain_teddynote import logging

# 프로젝트 이름을 입력합니다.
logging.langsmith("LCEL-Advanced")

LangSmith 추적을 시작합니다.
[프로젝트명]
LCEL-Advanced


In [3]:
from typing import Iterator, List
from langchain.prompts.chat import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template(
    # 주어진 회사와 유사한 5개의 회사를 쉼표로 구분된 목록으로 작성하세요.
    "Write a comma-separated list of 5 comanies similar to: {company}"
)

# 온도를 0.0으로 설정하여 ChatOpenAI 모델을 초기화
model = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)

# 프롬프트와 모델을 연결하고 문자열 출력 파서를 적용하여 체인을 생성
str_chain = prompt | model | StrOutputParser()

In [4]:
# 데이터를 스트리밍
for chunk in str_chain.stream({"company": "Google"}):
    # 각 청크를 출력하고, 줄 바꿈 없이 버퍼를 즉시 플러시
    print(chunk, end="", flush=True)

Microsoft, Amazon, Facebook (Meta), Apple, Alibaba

In [5]:
# 체인에 데이터를 invoke 합니다.
str_chain.invoke({"company": "Google"})

'Microsoft, Amazon, Facebook (Meta), Apple, Alibaba'

In [6]:
# 입력으로 llm 토큰의 반복자를 받아 쉼표로 구분된 문자열 리스트로 분할하는 사용자 정의 파서입니다.
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    # 쉼표가 나올 때까지 부분 입력을 보관
    buffer = ""
    for chunk in input:
        # 현재 청크를 버퍼에 추가
        buffer += chunk
        # 버퍼에 쉼표가 있는 동안 반복
        while "," in buffer:
            # 버퍼를 쉼표로 분할
            comma_index = buffer.index(",")
            # 쉼표 이전의 모든 내용을 반환
            yield [buffer[:comma_index].strip()]
            # 나머지는 다음 반복을 위해 저장
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]

In [7]:
list_chain = str_chain | split_into_list  # 문자열 체인을 리스트로 분할합니다.

In [8]:
# 생성한 list_chain 이 문제없이 스트리밍되는지 확인합니다.
for chunk in list_chain.stream({"company": "Google"}):
    print(chunk, flush=True)  # 각 청크를 출력하고, 버퍼를 즉시 플러시합니다.

['Microsoft']
['Amazon']
['Facebook (Meta)']
['Apple']
['Alibaba']


In [9]:
# list_chain 에 데이터를 invoke 합니다.
list_chain.invoke({"company": "Google"})

['Microsoft', 'Amazon', 'Facebook (Meta)', 'Apple', 'Alibaba']

# 비동기(Asynchronous)

In [10]:
from typing import AsyncIterator


# 비동기 함수 정의
async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    # `input`은 `async_generator` 객체이므로 `async for`를 사용
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [
                buffer[:comma_index].strip()
            ]  # 쉼표를 기준으로 분할하여 리스트로 반환
            buffer = buffer[comma_index + 1:]
    yield [buffer.strip()]  # 남은 버퍼 내용을 리스트로 반환


# alist_chain 과 asplit_into_list 를 파이프라인으로 연결
alist_chain = str_chain | asplit_into_list

In [11]:
# async for 루프를 사용하여 데이터를 스트리밍합니다.
async for chunk in alist_chain.astream({"company": "Google"}):
    # 각 청크를 출력하고 버퍼를 비웁니다.
    print(chunk, flush=True)

['Microsoft']
['Amazon']
['Facebook (Meta)']
['Apple']
['Alibaba']


In [12]:
# 리스트 체인을 비동기적으로 호출합니다.
await alist_chain.ainvoke({"company": "Google"})

['Microsoft', 'Amazon', 'Facebook (Meta)', 'Apple', 'Alibaba']