# RunnableParallel: 데이터 병렬 조작


## 입력 및 출력 조작

`RunnableParallel` 은 시퀀스 내에서 하나의 `Runnable` 의 출력을 다음 `Runnable` 의 입력 형식에 맞게 조작하는 데 유용하게 사용될 수 있습니다.

여기서 prompt에 대한 입력은 "context"와 "question"이라는 키를 가진 map 형태로 예상됩니다.

사용자 입력은 단순히 질문 내용입니다. 따라서 retriever를 사용하여 컨텍스트를 가져오고, 사용자 입력을 "question" 키 아래에 전달해야 합니다.


In [None]:
# 필요한 모듈 설치
%pip install -qU langchain langchain-openai

In [8]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 텍스트로부터 FAISS 벡터 저장소를 생성합니다.
vectorstore = FAISS.from_texts(
    ["Teddy is an AI engineer who loves programming!"], embedding=OpenAIEmbeddings()
)
# 벡터 저장소를 검색기로 사용합니다.
retriever = vectorstore.as_retriever()
# 템플릿을 정의합니다.
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
# 템플릿으로부터 채팅 프롬프트를 생성합니다.
prompt = ChatPromptTemplate.from_template(template)

# ChatOpenAI 모델을 초기화합니다.
model = ChatOpenAI()

# 검색 체인을 구성합니다.
retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# 검색 체인을 실행하여 질문에 대한 답변을 얻습니다.
retrieval_chain.invoke("What is Teddy's occupation?")

"Teddy's occupation is an AI engineer."

다른 `Runnable` 과 함께 `RunnableParallel` 을 구성할 때, 유형 변환이 자동으로 처리되므로 `RunnableParallel` 클래스에서 입력으로 주입되는 dict 입력을 별도 래핑할 필요도 없다는 점에 유의하세요.

아래의 3가지 방식은 모두 동일하게 처리합니다.

```python
# 자체 RunnableParallel 로 래핑됨
1. {"context": retriever, "question": RunnablePassthrough()}

2. RunnableParallel({"context": retriever, "question": RunnablePassthrough()})

3. RunnableParallel(context=retriever, question=RunnablePassthrough())
```


## itemgetter를 단축어로 사용하기

RunnableParallel과 결합할 때 Python의 `itemgetter`를 단축어로 사용하여 map에서 데이터를 추출할 수 있습니다.

- [참고] itemgetter에 대한 자세한 정보는 [Python Documentation](https://docs.python.org/3/library/operator.html#operator.itemgetter)에서 확인할 수 있습니다.

아래 예제에서는 `itemgetter` 를 사용하여 map에서 특정 키를 추출합니다.


In [5]:
from operator import itemgetter

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 텍스트로부터 FAISS 벡터 저장소를 생성합니다.
vectorstore = FAISS.from_texts(
    ["Teddy is an AI engineer who loves programming!"], embedding=OpenAIEmbeddings()
)
# 벡터 저장소를 검색기로 사용합니다.
retriever = vectorstore.as_retriever()

# 템플릿을 정의합니다.
template = """Answer the question based only on the following context:
{context}

Question: {question}

Answer in the following language: {language}
"""
# 템플릿으로부터 채팅 프롬프트를 생성합니다.
prompt = ChatPromptTemplate.from_template(template)

# 체인을 구성합니다.
chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
        "language": itemgetter("language"),
    }
    | prompt
    | ChatOpenAI()
    | StrOutputParser()
)

# 체인을 호출하여 질문에 답변합니다.
chain.invoke({"question": "What is Teddy's occupation?", "language": "Korean"})

'테디의 직업은 AI 엔지니어입니다.'

## 병렬처리를 단계별로 이해

`RunnableParallel` (또는 `RunnableMap`)을 사용하면 여러 `Runnable` 을 병렬로 실행하고, 이러한 `Runnable` 의 출력을 맵(map)으로 반환하는 것이 쉬워집니다.


In [27]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()  # ChatOpenAI 모델을 초기화합니다.

# 수도를 묻는 질문에 대한 체인을 정의합니다.
capital_chain = (
    ChatPromptTemplate.from_template("{country} 의 수도는 어디입니까?")
    | model
    | StrOutputParser()
)

# 면적을 묻는 질문에 대한 체인을 정의합니다.
area_chain = (
    ChatPromptTemplate.from_template("{country} 의 면적은 얼마입니까?")
    | model
    | StrOutputParser()
)

# capital_chain, area_chain 을 병렬로 실행할 수 있는 RunnableParallel 객체를 생성합니다.
map_chain = RunnableParallel(capital=capital_chain, area=area_chain)

# map_chain을 호출하여 대한민국의 수도와 면적을 묻습니다.
map_chain.invoke({"country": "대한민국"})

{'capital': '서울입니다.', 'area': '대한민국의 면적은 약 100,363km² 입니다.'}

아래와 같이 chain 별로 입력 템플릿의 변수가 달라도 상관없이 실행 가능합니다.


In [24]:
# 수도를 묻는 질문에 대한 체인을 정의합니다.
capital_chain2 = (
    ChatPromptTemplate.from_template("{country1} 의 수도는 어디입니까?")
    | model
    | StrOutputParser()
)

# 면적을 묻는 질문에 대한 체인을 정의합니다.
area_chain2 = (
    ChatPromptTemplate.from_template("{country2} 의 면적은 얼마입니까?")
    | model
    | StrOutputParser()
)

# capital_chain, area_chain 을 병렬로 실행할 수 있는 RunnableParallel 객체를 생성합니다.
map_chain = RunnableParallel(capital=capital_chain2, area=area_chain2)

# map_chain을 호출합니다. 이때 각각의 key에 대한 value를 전달합니다.
map_chain.invoke({"country1": "대한민국", "country2": "미국"})

{'capital': '대한민국의 수도는 서울입니다.', 'area': '미국의 면적은 9,826,675 km² 입니다.'}

## 병렬 처리

`RunnableParallel` 은 맵에 있는 각 `Runnable` 이 병렬로 실행되기 때문에 독립적인 프로세스를 병렬로 실행하는 데에도 유용합니다.

예를 들어, 앞서 살펴본 `area_chain`, `capital_chain`, `map_chain`은 `map_chain`이 다른 두 체인을 모두 실행함에도 불구하고 **거의 동일한 실행 시간** 을 가지는 것을 확인할 수 있습니다.


In [29]:
%%timeit

# 면저을 묻는 체인을 호출하고 실행 시간을 측정합니다.
area_chain.invoke({"country": "대한민국"})

946 ms ± 242 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [30]:
%%timeit

# 수도를 묻는 체인을 호출하고 실행 시간을 측정합니다.
capital_chain.invoke({"country": "대한민국"})

895 ms ± 218 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [32]:
%%timeit

# Parallel 하게 구성된 체인을 호출하고 실행 시간을 측정합니다.
map_chain.invoke({"country": "대한민국"})

967 ms ± 202 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
