In [2]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model

# Input Schema

In [3]:
chain

ChatPromptTemplate(input_variables=['topic'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], template='tell me a joke about {topic}'))])
| ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x0000014CE902B790>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x0000014CE9036A10>, openai_api_key=SecretStr('**********'), openai_proxy='')

In [4]:
chain.input_schema

pydantic.main.PromptInput

In [7]:
# The input schema of the chain is the input schema of its first part, the prompt.
chain.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [8]:
prompt.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [9]:
model.input_schema.schema()

{'title': 'ChatOpenAIInput',
 'anyOf': [{'type': 'string'},
  {'$ref': '#/definitions/StringPromptValue'},
  {'$ref': '#/definitions/ChatPromptValueConcrete'},
  {'type': 'array',
   'items': {'anyOf': [{'$ref': '#/definitions/AIMessage'},
     {'$ref': '#/definitions/HumanMessage'},
     {'$ref': '#/definitions/ChatMessage'},
     {'$ref': '#/definitions/SystemMessage'},
     {'$ref': '#/definitions/FunctionMessage'},
     {'$ref': '#/definitions/ToolMessage'}]}}],
 'definitions': {'StringPromptValue': {'title': 'StringPromptValue',
   'description': 'String prompt value.',
   'type': 'object',
   'properties': {'text': {'title': 'Text', 'type': 'string'},
    'type': {'title': 'Type',
     'default': 'StringPromptValue',
     'enum': ['StringPromptValue'],
     'type': 'string'}},
   'required': ['text']},
  'AIMessage': {'title': 'AIMessage',
   'description': 'A Message from an AI.',
   'type': 'object',
   'properties': {'content': {'title': 'Content',
     'anyOf': [{'type': 'str

# Output Schema

In [10]:
# The output schema of the chain is the output schema of its last part, in this case a ChatModel, which outputs a ChatMessage
chain.output_schema.schema()

{'title': 'ChatOpenAIOutput',
 'anyOf': [{'$ref': '#/definitions/AIMessage'},
  {'$ref': '#/definitions/HumanMessage'},
  {'$ref': '#/definitions/ChatMessage'},
  {'$ref': '#/definitions/SystemMessage'},
  {'$ref': '#/definitions/FunctionMessage'},
  {'$ref': '#/definitions/ToolMessage'}],
 'definitions': {'AIMessage': {'title': 'AIMessage',
   'description': 'A Message from an AI.',
   'type': 'object',
   'properties': {'content': {'title': 'Content',
     'anyOf': [{'type': 'string'},
      {'type': 'array',
       'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]},
    'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},
    'type': {'title': 'Type',
     'default': 'ai',
     'enum': ['ai'],
     'type': 'string'},
    'example': {'title': 'Example', 'default': False, 'type': 'boolean'}},
   'required': ['content']},
  'HumanMessage': {'title': 'HumanMessage',
   'description': 'A Message from a human.',
   'type': 'object',
   'properties': {'conten

# Stream

In [11]:
import time

In [13]:
for s in chain.stream({"topic": "bears"}):
    time.sleep(0.05)
    print(s.content, end="", flush=True)

Why do bears have fur coats?

Because they look grizzly without them!

# Invoke

In [14]:
chain.invoke({"topic": "bears"})

AIMessage(content='Why did the bear bring a flashlight to the party? \n\nBecause he heard it was going to be a "beary" bright affair!')

# Batch

In [15]:
chain.batch([{"topic": "bears"}, {"topic": "cats"}])

[AIMessage(content="Why don't bears wear shoes?\n\nBecause they prefer bear feet!"),
 AIMessage(content='Why was the cat sitting on the computer?\n\nBecause it wanted to keep an eye on the mouse!')]

In [16]:
chain.batch([{"topic": "bears"}, {"topic": "cats"}], config={"max_concurrency": 5})

[AIMessage(content='Why did the bear dissolve in water?\n\nBecause it was polar!'),
 AIMessage(content='Why was the cat sitting on the computer?\n\nBecause it wanted to keep an eye on the mouse!')]

# Async Stream


In [17]:
async for s in chain.astream({"topic": "bears"}):
    print(s.content, end="", flush=True)

Why don't bears like fast food?

Because they can't catch it!

# Async Invoke

In [18]:
await chain.ainvoke({"topic": "bears"})

AIMessage(content="Why don't bears like fast food?\n\nBecause they can't catch it!")

# Async Batch

In [21]:
await chain.abatch([{"topic": "bears"}])

[AIMessage(content="Why don't bears like fast food? Because they can't catch it!")]

# Async Stream Events (beta)

In [32]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model.with_config(run_name="my_llm")
    | StrOutputParser()
)

In [34]:
async for event in retrieval_chain.astream_events(
    "where did harrison work?", version="v1", include_names=["Docs", "my_llm"]
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="|")
    elif kind in {"on_chat_model_start"}:
        print()
        print("Streaming LLM:")
    elif kind in {"on_chat_model_end"}:
        print()
        print("Done streaming LLM.")
    elif kind == "on_retriever_end":
        print("--")
        print("Retrieved the following documents:")
        print(event["data"]["output"]["documents"])
    elif kind == "on_tool_end":
        print(f"Ended tool: {event['name']}")
    else:
        pass

--
Retrieved the following documents:
[Document(page_content='harrison worked at kensho')]

Streaming LLM:
|H|arrison| worked| at| Kens|ho|.||
Done streaming LLM.


# Async Stream Intermediate Steps

In [37]:
%pip install typeddic

Note: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement typeddic (from versions: none)
ERROR: No matching distribution found for typeddic


In [45]:
from typing import TypedDict, List, Dict, Any, Optional

class LogEntry(TypedDict):
    id: str
    """ID of the sub-run."""
    name: str
    """Name of the object being run."""
    type: str
    """Type of the object being run, eg. prompt, chain, llm, etc."""
    tags: List[str]
    """List of tags for the run."""
    metadata: Dict[str, Any]
    """Key-value pairs of metadata for the run."""
    start_time: str
    """ISO-8601 timestamp of when the run started."""

    streamed_output_str: List[str]
    """List of LLM tokens streamed by this run, if applicable."""
    final_output: Optional[Any]
    """Final output of this run.
    Only available after the run has finished successfully."""
    end_time: Optional[str]
    """ISO-8601 timestamp of when the run ended.
    Only available after the run has finished."""


class RunState(TypedDict):
    id: str
    """ID of the run."""
    streamed_output: List[Any]
    """List of output chunks streamed by Runnable.stream()"""
    final_output: Optional[Any]
    """Final output of the run, usually the result of aggregating (`+`) streamed_output.
    Only available after the run has finished successfully."""

    logs: Dict[str, LogEntry]
    """Map of run names to sub-runs. If filters were supplied, this list will
    contain only the runs that matched the filters."""

## Streaming JSONPatch chunks

In [46]:
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"]
):
    print("-" * 40)
    print(chunk)

----------------------------------------
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '12fcac21-f78f-4687-87ae-42c81d48b6cf',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'f9bb8b61-cf14-46fa-b6ed-5f4cccc9f70b',
            'metadata': {},
            'name': 'Docs',
            'start_time': '2024-02-18T12:23:52.968+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
            'type': 'retriever'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs/final_output',
  'value': {'documents': [Document(page_content='harrison worked at kensho')]}},
 

## Streaming the incremental RunState

In [47]:
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"], diff=False
):
    print("-" * 70)
    print(chunk)

----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': '18587155-d9af-4d21-b4cd-a080a7c593e6',
 'logs': {},
 'name': 'RunnableSequence',
 'streamed_output': [],
 'type': 'chain'})
----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': '18587155-d9af-4d21-b4cd-a080a7c593e6',
 'logs': {'Docs': {'end_time': None,
                   'final_output': None,
                   'id': '38daf989-1435-421a-827e-346f0cd132b5',
                   'metadata': {},
                   'name': 'Docs',
                   'start_time': '2024-02-18T12:24:38.108+00:00',
                   'streamed_output': [],
                   'streamed_output_str': [],
                   'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
                   'type': 'retriever'}},
 'name': 'RunnableSequence',
 'streamed_output': [],
 'type': 'chain'})
-------------------------------------------------------------

# Parallelism

In [22]:
from langchain_core.runnables import RunnableParallel

chain1 = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
chain2 = ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}")  | model 

combined = RunnableParallel(joke=chain1, poem=chain2)

In [26]:
%%time
chain1.invoke({"topic": "bears"})

CPU times: total: 15.6 ms
Wall time: 1.04 s


AIMessage(content='Why do bears have hairy coats?\n\nFur protection!')

In [27]:
%%time
chain2.invoke({"topic": "bears"})

CPU times: total: 31.2 ms
Wall time: 672 ms


AIMessage(content='In the forest they roam, majestic and rare,\nBears are gentle giants, with a wild, untamed flair.')

In [28]:
%%time
combined.invoke({"topic": "bears"})

CPU times: total: 78.1 ms
Wall time: 43.3 s


{'joke': AIMessage(content="Why did the bear break up with his girlfriend?\n\nBecause he couldn't bear the relationship any longer!"),
 'poem': AIMessage(content='In the forest deep, bears roam free\nMajestic creatures of wild beauty')}

# Parallelism on batches

In [29]:
%%time
chain1.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: total: 78.1 ms
Wall time: 21.1 s


[AIMessage(content="Why did the bear break up with his girlfriend? \n\nBecause he couldn't bear the relationship any longer!"),
 AIMessage(content='Why was the cat sitting on the computer?\n\nBecause it wanted to keep an eye on the mouse!')]

In [30]:
%%time
chain2.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: total: 109 ms
Wall time: 41.6 s


[AIMessage(content='In the forest deep, the bear roams free\nMajestic and wild, a sight to see'),
 AIMessage(content='Whiskers soft, eyes gleaming bright,\nStealthy hunters of the night.')]

In [31]:
%%time
combined.batch([{"topic": "bears"}, {"topic": "cats"}])

RateLimitError: Error code: 429 - {'error': {'message': 'Rate limit reached for gpt-3.5-turbo in organization org-1NcNxm58gFN5a38KyBr8Qp2A on requests per min (RPM): Limit 3, Used 3, Requested 1. Please try again in 20s. Visit https://platform.openai.com/account/rate-limits to learn more. You can increase your rate limit by adding a payment method to your account at https://platform.openai.com/account/billing.', 'type': 'requests', 'param': None, 'code': 'rate_limit_exceeded'}}