In [1]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Prompt with a single template variable, `topic`.
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")

# Chat model built with the default constructor arguments.
model = ChatOpenAI()

# `|` composes the two steps: the prompt's output becomes the model's input.
chain = prompt | model

### Input Schema

In [2]:
# The input schema of a chain is the input schema of its first step — here, the prompt —
# so the only expected field is the template variable `topic`.
chain.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [5]:
# The prompt's own input schema; it is the same schema the chain reports.
prompt.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [None]:
# Input schema of the chat model on its own (the second step of the chain).
model.input_schema.schema()

### Output Schema

In [None]:
# The output schema of a chain is the output schema of its last step — here, the chat
# model, which returns a chat message (e.g. an `AIMessage`).
chain.output_schema.schema()

### Stream

In [15]:
# Print each streamed message chunk as soon as it arrives; `end=""` keeps the
# pieces on one line and `flush=True` makes them appear immediately.
for chunk in chain.stream({"topic": "bears"}):
    print(chunk.content, end="", flush=True)

Why don't bears like fast food?

Because they can't catch it!

### Invoke

In [16]:
# Run the chain once on a single input; the result is a single `AIMessage`.
chain.invoke({"topic": "bears"})

AIMessage(content="Why don't bears use cell phones?\n\nBecause they already have bear-y good reception in the woods!")

### Batch

In [17]:
# Run the chain over a list of inputs; one `AIMessage` is returned per input.
chain.batch([{"topic": "bears"}, {"topic": "cats"}])

[AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
 AIMessage(content='Why did the cat sit on the computer?\n\nBecause it wanted to keep an eye on the mouse!')]

### Async Stream

In [20]:
async for s in chain.astream({"topic": "bears"}):
    print(s.content, end="", flush=True)

Why don't bears wear shoes?

Because they have bear feet!

### Async Invoke

In [21]:
# Async counterpart of `invoke`; awaited directly at the top level of the cell.
await chain.ainvoke({"topic": "bears"})

AIMessage(content="Sure, here's a bear-themed joke for you:\n\nWhy don't bears wear shoes?\n\nBecause they already have bear feet!")

### Async Batch

In [22]:
# Async counterpart of `batch`; returns one `AIMessage` per input.
await chain.abatch([{"topic": "bears"}, {"topic": "monkeys"}])

[AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
 AIMessage(content="Why don't monkeys play cards in the wild?\n\nBecause there are too many cheetahs!")]

### Async Stream Events

In [23]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

# Prompt that tells the model to answer only from the supplied context.
template = (
    "Answer the question based only on the following context:\n"
    "{context}\n"
    "\n"
    "Question: {question}\n"
)
prompt = ChatPromptTemplate.from_template(template)

# A one-document FAISS index, embedded with OpenAI embeddings, exposed as a retriever.
vectorstore = FAISS.from_texts(
    texts=["harrison worked at kensho"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

# The dict feeds the same input to both keys: the retriever produces `context`
# and the passthrough forwards the raw question. The `run_name` values
# ("Docs", "my_llm") label those steps so later cells can filter on them.
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model.with_config(run_name="my_llm")
    | StrOutputParser()
)

In [34]:
async for event in retrieval_chain.astream_events(
    "where did harrison work?", include_names=["Docs", "my_llm"]
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # print("4. Streaming ouput")
        print(event["data"]["chunk"].content, end="")
    elif kind in {"on_chat_model_start"}:
        print("3. Start streaming LLM:")
    elif kind in {"on_chat_model_end"}:
        print()
        print("5. Done streaming LLM.")
    elif kind == "on_retriever_end":
        print("2. Retrieved the following documents:")
        print(event["data"]["output"]["documents"])
    elif kind == "on_retriever_start":
        print("1. Start retrieving document for input")
    elif kind == "on_tool_end":
        print(f"Ended tool: {event['name']}")
    else:
        pass

1. Start retrieving document for input
2. Retrieved the following documents:
[Document(page_content='harrison worked at kensho')]
3. Start streaming LLM:
Harrison worked at Kensho.
5. Done streaming LLM.


### Async Stream Intermediate Steps

#### Streaming JSONPatch chunks

In [35]:
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"]
):
    print("-" * 40)
    print(chunk)

----------------------------------------
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': 'd675b4ac-b0a2-40f0-9ea4-004cea32d9ba',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'ece154f1-36d8-484c-82ff-6ab263144b12',
            'metadata': {},
            'name': 'Docs',
            'start_time': '2024-01-25T14:38:06.825+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
            'type': 'retriever'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs/final_output',
  'value': {'documents': [Document(page_content='harrison worked at kensho')]}},
 

#### Streaming the incremental RunState

In [36]:
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"], diff=False
):
    print("-" * 70)
    print(chunk)

----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': '16861f2e-07d6-4522-9658-bef983623476',
 'logs': {},
 'name': 'RunnableSequence',
 'streamed_output': [],
 'type': 'chain'})
----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': '16861f2e-07d6-4522-9658-bef983623476',
 'logs': {'Docs': {'end_time': None,
                   'final_output': None,
                   'id': 'c8d3bb5e-1578-43e1-8852-4fcb73e19934',
                   'metadata': {},
                   'name': 'Docs',
                   'start_time': '2024-01-25T14:39:45.139+00:00',
                   'streamed_output': [],
                   'streamed_output_str': [],
                   'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
                   'type': 'retriever'}},
 'name': 'RunnableSequence',
 'streamed_output': [],
 'type': 'chain'})
-------------------------------------------------------------

### Parallelism

In [43]:
from langchain_core.runnables import RunnableParallel

# Two independent prompts that share the same `topic` variable and the same model.
joke_prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
poem_prompt = ChatPromptTemplate.from_template(
    "write a short (2 line) poem about {topic}"
)

chain1 = joke_prompt | model
chain2 = poem_prompt | model

# Run both chains on the same input; results are keyed by "joke" and "poem".
combined = RunnableParallel(joke=chain1, poem=chain2)
combined

{
  joke: ChatPromptTemplate(input_variables=['topic'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], template='tell me a joke about {topic}'))])
        | ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x7f24f8c1d710>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x7f24e457d2d0>, openai_api_key='sk-...REDACTED', openai_proxy=''),
  poem: ChatPromptTemplate(input_variables=['topic'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], template='write a short (2 line) poem about {topic}'))])
        | ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x7f24f8c1d710>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x7f24e457d2d0>, openai_api_key='sk-...REDACTED', openai_proxy='')
}

In [38]:
%%time
# Baseline: time a single call to the joke chain on its own.
chain1.invoke({"topic": "bears"})

CPU times: user 11.1 ms, sys: 0 ns, total: 11.1 ms
Wall time: 1.8 s


AIMessage(content="Why don't bears wear shoes?\n\nBecause they already have bear feet!")

In [39]:
%%time
# Baseline: time a single call to the poem chain on its own.
chain2.invoke({"topic": "bears"})

CPU times: user 11.6 ms, sys: 0 ns, total: 11.6 ms
Wall time: 1.83 s


AIMessage(content='Roaming wild, fierce and free,\nBears embody strength and majesty.')

In [40]:
%%time
# Time both chains run together on one input; compare the wall time with the
# two single-chain cells to see the effect of running them in parallel.
combined.invoke({"topic": "bears"})

CPU times: user 16.8 ms, sys: 0 ns, total: 16.8 ms
Wall time: 1.9 s


{'joke': AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
 'poem': AIMessage(content="Majestic creatures roam,\nNature's strength, beauty shown.")}

#### Parallelism on batches

In [41]:
%%time
# Time the joke chain over a batch of two inputs.
chain1.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 25.3 ms, sys: 0 ns, total: 25.3 ms
Wall time: 1.76 s


[AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
 AIMessage(content="Why don't cats play poker in the wild?\n\nToo many cheetahs!")]

In [42]:
%%time
# Time the poem chain over a batch of two inputs.
# NOTE(review): the wall time recorded for this cell is far higher than for the
# equivalent `chain1.batch` cell — presumably a transient API latency spike;
# re-run before drawing conclusions from it.
chain2.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 38.2 ms, sys: 1.82 ms, total: 40 ms
Wall time: 22.3 s


[AIMessage(content="Bears roam forest wild,\nNature's strength, untamed and mild."),
 AIMessage(content='Whiskers whisper, tails dance,\nCats prowl in a moonlit trance.')]

In [44]:
%%time
# Time both chains run together over a batch of two inputs; each result is a
# dict with a "joke" and a "poem" entry.
# NOTE(review): compare against the single-chain batch timings; the recorded
# wall time here may reflect API latency rather than the chain itself —
# re-run to confirm.
combined.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 34.5 ms, sys: 18.3 ms, total: 52.8 ms
Wall time: 22.5 s


[{'joke': AIMessage(content="Why don't bears wear shoes? \n\nBecause they have bear feet!"),
  'poem': AIMessage(content='Majestic creatures roam with grace,\nIn wilderness, bears find their sacred space.')},
 {'joke': AIMessage(content="Sure, here's a cat joke for you:\n\nWhy don't cats play poker in the wild?\n\nToo many cheetahs!"),
  'poem': AIMessage(content='Whiskers dance, tails entwine,\nIn feline grace, secrets we find.')}]