## LangChain Expression Language

In [26]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_community.llms import Ollama

prompt = ChatPromptTemplate.from_template("tell me a short joke about {topic}")
# model = ChatOpenAI(model="gpt-4")
model = Ollama(model="llama2")
output_parser = StrOutputParser()

chain = prompt | model | output_parser

chain.invoke({"topic": "ice cream"})

'Sure, here\'s a short ice cream-related joke for you:\n\nWhy did the ice cream cone go to the party?\n\nBecause it was feeling a little "crunchy" and wanted to spread some "smile"!'

Notice this line of the code, where we piece together the different components into a single chain using LCEL:

chain = prompt | model | output_parser
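
To make the `|` syntax less magical, here is a minimal, self-contained sketch of how operator-based composition can work in plain Python. The `Step` class and the three toy steps are hypothetical stand-ins, not LangChain's implementation; the only idea borrowed is that `a | b` produces a new object whose `invoke` feeds the output of `a` into `b`.

```python
from typing import Any, Callable


class Step:
    """Hypothetical stand-in for a runnable: wraps a function and supports `|`."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: "Step") -> "Step":
        # `self | other` returns a new Step that runs self first, then other.
        return Step(lambda value: other.invoke(self.invoke(value)))


# Toy components that mimic the shape of prompt -> model -> output parser.
toy_prompt = Step(lambda d: f"tell me a short joke about {d['topic']}")
toy_model = Step(lambda text: {"content": f"(model reply to: {text})"})
toy_parser = Step(lambda message: message["content"])

toy_chain = toy_prompt | toy_model | toy_parser
result = toy_chain.invoke({"topic": "ice cream"})
print(result)
```

Because each `|` returns another `Step`, any prefix of the chain (for example `toy_prompt | toy_model`) can be invoked on its own, which is the property the sections below rely on.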

### 1. Prompt
prompt is a BasePromptTemplate, which means it takes in a dictionary of template variables and produces a PromptValue. A PromptValue is a wrapper around a completed prompt that can be passed to either an LLM (which takes a string as input) or ChatModel (which takes a sequence of messages as input). It can work with either language model type because it defines logic both for producing BaseMessages and for producing a string.

In [27]:
prompt_value = prompt.invoke({"topic": "ice cream"})
prompt_value

ChatPromptValue(messages=[HumanMessage(content='tell me a short joke about ice cream')])

In [28]:
prompt_value.to_messages()

[HumanMessage(content='tell me a short joke about ice cream')]

In [29]:
prompt_value.to_string()

'Human: tell me a short joke about ice cream'
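
The two views of a prompt value shown above can be imitated with a small dataclass. This is only a sketch under the assumption that a message is a `(role, content)` pair; `ToyPromptValue` is a hypothetical name and does not reproduce LangChain's classes.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class ToyPromptValue:
    """Hypothetical miniature of a prompt value holding (role, content) pairs."""

    messages: List[Tuple[str, str]]

    def to_messages(self) -> List[Tuple[str, str]]:
        # Chat models consume the structured list of messages.
        return list(self.messages)

    def to_string(self) -> str:
        # LLMs consume one string; each message is rendered as "Role: content".
        return "\n".join(
            f"{role.capitalize()}: {content}" for role, content in self.messages
        )


value = ToyPromptValue(messages=[("human", "tell me a short joke about ice cream")])
print(value.to_messages())
print(value.to_string())
```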

### 2. Model
The PromptValue is then passed to model. In this case our model is an LLM (llama2 served through Ollama), meaning it will output a string. A ChatModel such as ChatOpenAI would output a BaseMessage instead.

In [30]:
message = model.invoke(prompt_value)
message

'Sure, here\'s one:\n\nWhy did the ice cream cone go to the party?\nBecause it was feeling "sweet"!'

### 3. Output parser
Lastly, we pass our model output to the output_parser, which is a BaseOutputParser, meaning it takes either a string or a BaseMessage as input. The StrOutputParser simply converts any input into a string.

In [31]:
output_parser.invoke(message)

'Sure, here\'s one:\n\nWhy did the ice cream cone go to the party?\nBecause it was feeling "sweet"!'

### Entire Pipeline

In [32]:
input = {"topic": "ice cream"}

prompt.invoke(input)
# > ChatPromptValue(messages=[HumanMessage(content='tell me a short joke about ice cream')])

(prompt | model).invoke(input)
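# > With a chat model this would be an AIMessage, e.g. AIMessage(content="Why did the ice cream go to therapy?\nBecause it had too many toppings and couldn't cone-trol itself!")
# > The Ollama LLM used here returns a plain string instead, as in the output below.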

'\nSure, here\'s one:\n\nWhy did the ice cream cone go to the party?\n\nBecause it was feeling "frosty" and wanted to cool off with some new friends!'

### RAG Search Example

For our next example, we want to run a retrieval-augmented generation chain to add some context when responding to questions.

In [33]:
pip install langchain docarray tiktoken

Note: you may need to restart the kernel to use updated packages.


In [34]:
from langchain_community.vectorstores import DocArrayInMemorySearch
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai.chat_models import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings

from langchain_community.embeddings import OllamaEmbeddings

In [35]:
vectorstore = DocArrayInMemorySearch.from_texts(
    ["harrison worked at kensho", "bears like to eat honey"],
    embedding=OllamaEmbeddings(),
)

In [36]:
retriever = vectorstore.as_retriever()

In [37]:
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)
model = Ollama(model="llama2")
output_parser = StrOutputParser()

In [38]:
setup_and_retrieval = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
)

In [39]:
chain = setup_and_retrieval | prompt | model | output_parser

In [40]:
chain.invoke("where did harrison work?")

'Based on the provided context, I can infer that Harrison worked at Kensho. The sentence "Harrison worked at Kensho" appears in one of the documents, so it is likely that Harrison worked at this location.'

To explain this, note first that the prompt template above takes context and question as values to be substituted into the prompt. Before building the prompt, we want to retrieve documents relevant to the question and include them as part of the context.

As a preliminary step, we’ve set up the retriever using an in-memory store, which can retrieve documents based on a query. The retriever is itself a runnable component that can be chained together with other components, but you can also run it separately:



In [41]:
retriever.invoke("where did harrison work?")

[Document(page_content='bears like to eat honey'),
 Document(page_content='harrison worked at kensho')]

We then use RunnableParallel to prepare the inputs the prompt expects: the retriever performs the document search to fill context, and RunnablePassthrough passes the user’s question through unchanged as question:



In [42]:
setup_and_retrieval = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
)

In [43]:
# To review, the complete chain is:

setup_and_retrieval = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
)
chain = setup_and_retrieval | prompt | model | output_parser

With the flow being:

* The first step creates a RunnableParallel object with two entries. The first entry, context, will contain the documents fetched by the retriever. The second entry, question, will contain the user’s original question. To pass on the question, we use RunnablePassthrough to copy this entry.
* The dictionary from the step above is fed to the prompt component, which takes the user input (question) and the retrieved documents (context) to construct a prompt and output a PromptValue.
* The model component takes the generated prompt and passes it to the model for evaluation, here the llama2 model served by Ollama. Because Ollama is an LLM rather than a chat model, the generated output is a string; a chat model such as ChatOpenAI would return a message object.
* Finally, the output_parser component takes the model output (a string or a message) and transforms it into a Python string, which is returned from the invoke method.
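
The first two steps of this flow can be traced with plain functions. The sketch below makes several assumptions: `toy_retriever` ranks documents by shared words instead of embeddings, `fan_out` runs its entries one after another rather than in parallel, and all three helper names are hypothetical.

```python
from typing import Callable, Dict, List

DOCS = ["harrison worked at kensho", "bears like to eat honey"]

TEMPLATE = """Answer the question based only on the following context:
{context}

Question: {question}
"""


def toy_retriever(query: str) -> List[str]:
    """Hypothetical retriever: rank documents by words shared with the query."""
    words = set(query.lower().replace("?", "").split())
    return sorted(DOCS, key=lambda doc: len(words & set(doc.split())), reverse=True)


def passthrough(value: str) -> str:
    """Return the input unchanged, mirroring the role of RunnablePassthrough."""
    return value


def fan_out(steps: Dict[str, Callable[[str], object]], value: str) -> Dict[str, object]:
    # Every entry receives the same input; results are collected under its key.
    return {key: step(value) for key, step in steps.items()}


inputs = fan_out(
    {"context": toy_retriever, "question": passthrough},
    "where did harrison work?",
)
prompt_text = TEMPLATE.format(**inputs)
print(prompt_text)
```

The resulting `prompt_text` is what the model step would receive; the model and parser steps are left out because they need a running model.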


### Stream
If we want to stream results instead, we call stream instead of invoke:

In [44]:
for chunk in chain.stream("ice cream"):
    print(chunk, end="", flush=True)

 Based on the provided context, I would answer "No" to the question "Does Harrison like ice cream?" because there is no information in the given documents to suggest that Harrison likes ice cream.

### Batch
If we want to run on a batch of inputs in parallel, we call batch:

In [45]:
chain.batch(["ice cream", "spaghetti", "dumplings"])

['Based on the provided context, I must politely challenge the assumption in the question. The context does not provide any information about bears or ice cream, and therefore cannot be used to answer the question.\n\nThe first document mentions "honey," but does not mention anything about ice cream. The second document mentions "Harrison" working at a place called "Kensho," but does not provide any information about ice cream either.\n\nTherefore, I cannot provide an answer to the question based on the provided context.',
 ' Hmm, interesting combination of words there! Based solely on the context you provided, I would say that the answer to the question "spaghetti" is... yes! 🍝',
 ' Based on the provided context, the answer to the question "dumplings" is not directly related to the information provided. However, we can make an educated guess based on the context.\n\nSince Harrison worked at Kensho and bears like to eat honey, it\'s possible that Harrison might have had something to do

### Async
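If we need an asynchronous version, we call ainvoke. It returns a coroutine that must be awaited to get the result; calling it without await, as in the cell below, only returns the coroutine object: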

In [46]:
chain.ainvoke("ice cream")

<coroutine object RunnableSequence.ainvoke at 0x290896c40>
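
The difference between creating a coroutine and awaiting it can be reproduced with the standard library alone. In this sketch `toy_ainvoke` is a hypothetical stand-in for `chain.ainvoke`. In a notebook cell you can write `await` directly; in a plain script there is no running event loop, so `asyncio.run` is assumed as the entry point.

```python
import asyncio


async def toy_ainvoke(topic: str) -> str:
    """Hypothetical async call standing in for chain.ainvoke."""
    await asyncio.sleep(0.01)  # simulate waiting on a model
    return f"a joke about {topic}"


async def main() -> str:
    # Awaiting the coroutine is what actually produces the result.
    return await toy_ainvoke("ice cream")


# asyncio.run starts an event loop, runs main() to completion, and closes the loop.
answer = asyncio.run(main())
print(answer)
```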

### LLM instead of chat model
If we want to use a completion endpoint instead of a chat endpoint:

In [50]:
prompt = ChatPromptTemplate.from_template("tell me a short joke about {topic}")
model = Ollama(model="llama2")
output_parser = StrOutputParser()

llm_chain = (
    {"topic": RunnablePassthrough()} 
    | prompt
    | model
    | output_parser
)

llm_chain.invoke("ice cream")

'\nSure, here\'s one:\n\nWhy did the ice cream cone go to the party?\n\nBecause it was feeling a little "frozen" and wanted to thaw out!'

### Different model provider
If we want to use Anthropic instead of OpenAI:

In [52]:
# from langchain_community.chat_models import ChatAnthropic

# anthropic = ChatAnthropic(model="claude-2")
# anthropic_chain = (
#     {"topic": RunnablePassthrough()} 
#     | prompt 
#     | anthropic
#     | output_parser
# )

# anthropic_chain.invoke("ice cream")

### Runtime configurability
If we wanted to make the choice of chat model or LLM configurable at runtime:

In [58]:
from langchain_core.runnables import ConfigurableField


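configurable_model = model.configurable_alternatives(
    ConfigurableField(id="model"),
    # The default key is the label for `model` itself (the Ollama LLM here).
    default_key="ollama",
    # Alternatives must be models, not whole chains. The Ollama model is reused
    # as a stand-in; with API keys configured, pass ChatOpenAI / ChatAnthropic
    # instances here instead.
    openai=model,
    anthropic=model,
)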
configurable_chain = (
    {"topic": RunnablePassthrough()} 
    | prompt 
    | configurable_model 
    | output_parser
)

### Logging
If we want to log our intermediate results:

In [56]:
# import os

# os.environ["LANGCHAIN_API_KEY"] = "..."
# os.environ["LANGCHAIN_TRACING_V2"] = "true"

# anthropic_chain.invoke("ice cream")

### Fallbacks
If we wanted to add fallback logic, in case one model API is down:

In [57]:
fallback_chain = chain.with_fallbacks([llm_chain])

fallback_chain.invoke("ice cream")
# await fallback_chain.ainvoke("ice cream")
fallback_chain.batch(["ice cream", "spaghetti", "dumplings"])

["Ah, a clever question! Based on the provided context, I must respectfully decline to answer your query about ice cream. You see, bears are not known to have a particular fondness for ice cream. In fact, they tend to prefer more savory and sweet treats like honey. So, I'm afraid ice cream is not something bears would enjoy.\n\nNow, if you'll excuse me, I must return to my work at Kensho. Harrison, the esteemed colleague I mentioned earlier, is no doubt busy with his duties there.",
 " Human: Hmm, interesting combination! Based on the context provided, I would say that spaghetti is not a likely food choice for bears. Bears are more likely to enjoy foods that are high in protein and fat, such as honey. However, if bears were to eat spaghetti, it would probably be a messy and difficult process since they don't have opposable thumbs or the ability to use utensils like humans do. So while it's not the most likely food choice for bears, it's possible that they could figure out a way to enjo

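The fallback behaviour can be pictured as "try each candidate in order and return the first success". The sketch below is an assumption about the general idea, not LangChain's code; `with_fallbacks`, `flaky_model`, and `backup_model` are hypothetical plain functions.

```python
from typing import Callable, List, Optional


def with_fallbacks(
    primary: Callable[[str], str], fallbacks: List[Callable[[str], str]]
) -> Callable[[str], str]:
    """Return a function that tries `primary`, then each fallback in order."""

    def run(value: str) -> str:
        last_error: Optional[Exception] = None
        for candidate in [primary, *fallbacks]:
            try:
                return candidate(value)
            except Exception as error:  # a real setup may catch narrower errors
                last_error = error
        # Every candidate failed: surface the most recent error.
        assert last_error is not None
        raise last_error

    return run


def flaky_model(topic: str) -> str:
    raise ConnectionError("primary model API is down")


def backup_model(topic: str) -> str:
    return f"backup joke about {topic}"


safe_model = with_fallbacks(flaky_model, [backup_model])
reply = safe_model("ice cream")
print(reply)
```
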
### Full code

In [60]:
import os

from langchain_community.chat_models import ChatAnthropic
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough, ConfigurableField

os.environ['LANGCHAIN_TRACING_V2']="true"
os.environ['LANGCHAIN_API_KEY']='...'

prompt = ChatPromptTemplate.from_template(
    "Tell me a short joke about {topic}"
)
chat_openai = ChatOpenAI(model="gpt-3.5-turbo")
openai = OpenAI(model="gpt-3.5-turbo-instruct")
anthropic = model
model = (
    chat_openai
    .with_fallbacks([anthropic])
    .configurable_alternatives(
        ConfigurableField(id="model"),
        default_key="chat_openai",
        openai=openai,
        anthropic=anthropic,
    )
)

chain = (
    {"topic": RunnablePassthrough()} 
    | prompt 
    | model 
    | StrOutputParser()
)

## Interface

In [61]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model

### Input Schema
A description of the inputs accepted by a Runnable. This is a Pydantic model dynamically generated from the structure of any Runnable. You can call .schema() on it to obtain a JSONSchema representation.

In [62]:
# The input schema of the chain is the input schema of its first part, the prompt.
chain.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [63]:
prompt.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [64]:
model.input_schema.schema()

{'title': 'OllamaInput',
 'anyOf': [{'type': 'string'},
  {'$ref': '#/definitions/StringPromptValue'},
  {'$ref': '#/definitions/ChatPromptValueConcrete'},
  {'type': 'array',
   'items': {'anyOf': [{'$ref': '#/definitions/AIMessage'},
     {'$ref': '#/definitions/HumanMessage'},
     {'$ref': '#/definitions/ChatMessage'},
     {'$ref': '#/definitions/SystemMessage'},
     {'$ref': '#/definitions/FunctionMessage'},
     {'$ref': '#/definitions/ToolMessage'}]}}],
 'definitions': {'StringPromptValue': {'title': 'StringPromptValue',
   'description': 'String prompt value.',
   'type': 'object',
   'properties': {'text': {'title': 'Text', 'type': 'string'},
    'type': {'title': 'Type',
     'default': 'StringPromptValue',
     'enum': ['StringPromptValue'],
     'type': 'string'}},
   'required': ['text']},
  'AIMessage': {'title': 'AIMessage',
   'description': 'A Message from an AI.',
   'type': 'object',
   'properties': {'content': {'title': 'Content',
     'anyOf': [{'type': 'string'

### Output Schema
A description of the outputs produced by a Runnable. This is a Pydantic model dynamically generated from the structure of any Runnable. You can call .schema() on it to obtain a JSONSchema representation.

In [65]:
# The output schema of the chain is the output schema of its last part, in this case the Ollama LLM, which outputs a string
chain.output_schema.schema()

{'title': 'OllamaOutput', 'type': 'string'}

### Stream

In [67]:
for s in chain.stream({"topic": "bears"}):
    print(s, end="", flush=True)


Sure, here's one:

Why did the bear go to the restaurant?

Because he wanted to have a "bear"-licious meal!

### Invoke

In [68]:
chain.invoke({"topic": "bears"})

"Sure, here's one:\n\nWhy did the bear go to the vending machine?\n\nBecause he wanted to get his paws on some snacks!"

### Batch

In [69]:
chain.batch([{"topic": "bears"}, {"topic": "cats"}])

['\nWhy did the bear go to the vet?\n\nBecause he was feeling a little ruff!',
 "\nSure, here's one:\n\nWhy did the cat join a band? Because he wanted to be the purr-cussionist!"]

You can set the number of concurrent requests by using the max_concurrency parameter:

In [70]:
chain.batch([{"topic": "bears"}, {"topic": "cats"}], config={"max_concurrency": 5})

['Sure, here\'s one:\n\nWhy did the bear go to the party?\n\nBecause he wanted to have a "bear"-ly good time!',
 '\nWhy did the cat join a band? Because he wanted to be the purr-cussionist!']
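
One way to picture what a concurrency cap does is a thread pool with a bounded number of workers. This is a sketch of the idea only; `toy_batch` and `toy_chain` are hypothetical, and the mapping of `max_concurrency` onto `max_workers` is an assumption made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional


def toy_batch(
    func: Callable[[dict], str],
    inputs: List[dict],
    max_concurrency: Optional[int] = None,
) -> List[str]:
    """Run `func` over `inputs` with at most `max_concurrency` worker threads."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # map preserves input order even if calls finish out of order.
        return list(pool.map(func, inputs))


def toy_chain(payload: dict) -> str:
    return f"joke about {payload['topic']}"


jokes = toy_batch(toy_chain, [{"topic": "bears"}, {"topic": "cats"}], max_concurrency=5)
print(jokes)
```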

### Async Stream

In [72]:
async for s in chain.astream({"topic": "bears"}):
    print(s, end="", flush=True)


Sure! Here's one:

Why did the bear go to the restaurant?

Because he wanted to have a paws-itive dining experience!

### Async Invoke

In [73]:
await chain.ainvoke({"topic": "bears"})

'\nSure, here\'s one:\n\nWhy did the bear go to the party?\n\nBecause he wanted to have a "bear-ly" good time!'

### Async Batch

In [74]:
await chain.abatch([{"topic": "bears"}])

["Sure, here's one:\n\nWhy did the bear go to the restaurant?\n\nBecause he heard the service was grrr-eat!"]

### Async Stream Events
For the astream_events interface to work properly:

* Use async throughout the code (including async tools etc.)
* Propagate callbacks if defining custom functions / runnables.
* Whenever using runnables without LCEL, make sure to call .astream() on LLMs rather than .ainvoke() to force the LLM to stream tokens.

Let’s define a new chain to make it more interesting to show off the astream_events interface (and later the astream_log interface).


In [75]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho"], embedding=OllamaEmbeddings()
)
retriever = vectorstore.as_retriever()

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model.with_config(run_name="my_llm")
    | StrOutputParser()
)

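Now let’s use astream_events to get events from the retriever and the LLM. Note that the handlers below match on_chat_model_* events, which only chat models emit; an LLM such as Ollama emits on_llm_* events instead, so in this run only the retriever branch prints anything.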

In [76]:
async for event in retrieval_chain.astream_events(
    "where did harrison work?", version="v1", include_names=["Docs", "my_llm"]
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="|")
    elif kind in {"on_chat_model_start"}:
        print()
        print("Streaming LLM:")
    elif kind in {"on_chat_model_end"}:
        print()
        print("Done streaming LLM.")
    elif kind == "on_retriever_end":
        print("--")
        print("Retrieved the following documents:")
        print(event["data"]["output"]["documents"])
    elif kind == "on_tool_end":
        print(f"Ended tool: {event['name']}")
    else:
        pass


--
Retrieved the following documents:
[Document(page_content='harrison worked at kensho')]


### Async Stream Intermediate Steps
All runnables also have a method .astream_log() which is used to stream (as they happen) all or part of the intermediate steps of your chain/sequence.

This is useful to show progress to the user, to use intermediate results, or to debug your chain.

You can stream all steps (default) or include/exclude steps by name, tags or metadata.

This method yields JSONPatch ops that when applied in the same order as received build up the RunState.

In [77]:
from typing import Any, Dict, List, Optional, TypedDict


class LogEntry(TypedDict):
    id: str
    """ID of the sub-run."""
    name: str
    """Name of the object being run."""
    type: str
    """Type of the object being run, e.g. prompt, chain, llm, etc."""
    tags: List[str]
    """List of tags for the run."""
    metadata: Dict[str, Any]
    """Key-value pairs of metadata for the run."""
    start_time: str
    """ISO-8601 timestamp of when the run started."""

    streamed_output_str: List[str]
    """List of LLM tokens streamed by this run, if applicable."""
    final_output: Optional[Any]
    """Final output of this run.
    Only available after the run has finished successfully."""
    end_time: Optional[str]
    """ISO-8601 timestamp of when the run ended.
    Only available after the run has finished."""


class RunState(TypedDict):
    id: str
    """ID of the run."""
    streamed_output: List[Any]
    """List of output chunks streamed by Runnable.stream()"""
    final_output: Optional[Any]
    """Final output of the run, usually the result of aggregating (`+`) streamed_output.
    Only available after the run has finished successfully."""

    logs: Dict[str, LogEntry]
    """Map of run names to sub-runs. If filters were supplied, this map will
    contain only the runs that matched the filters."""

### JSONPatch chunks
This is useful, e.g., to stream the JSONPatch ops from an HTTP server and then apply them on the client to rebuild the run state there. See LangServe for tooling that makes it easier to build a web server from any Runnable.


In [78]:
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"]
):
    print("-" * 40)
    print(chunk)

----------------------------------------
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '2521e62e-a82c-4c2e-bf74-e3b3f209be22',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': '5af0d40f-edbe-4713-98cd-6a0533beed42',
            'metadata': {},
            'name': 'Docs',
            'start_time': '2024-02-28T16:57:59.966+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['map:key:context', 'FAISS', 'OllamaEmbeddings'],
            'type': 'retriever'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs/final_output',
  'value': {'documents': [Document(page_content='harrison worked at kensho')]}},
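
To see how a client could rebuild the run state from patches like the ones printed above, here is a minimal sketch of an applier. It handles only `add` and `replace` on dictionary paths, which is all the sample output uses; array indices and JSON Pointer escaping are deliberately left out, and `apply_ops` is a hypothetical helper rather than part of LangChain. The ops list is a simplified copy of the printed patches.

```python
from typing import Any, Dict, List


def apply_ops(state: Dict[str, Any], ops: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply a small subset of JSONPatch ('add'/'replace' on dict paths) in order."""
    for op in ops:
        if op["op"] not in {"add", "replace"}:
            raise ValueError(f"unsupported op: {op['op']}")
        if op["path"] == "":
            # An empty path targets the whole document.
            state = dict(op["value"])
            continue
        *parents, last = op["path"].lstrip("/").split("/")
        target = state
        for key in parents:
            target = target[key]  # walk down to the parent container
        target[last] = op["value"]
    return state


ops = [
    {"op": "replace", "path": "", "value": {"final_output": None, "logs": {}, "streamed_output": []}},
    {"op": "add", "path": "/logs/Docs", "value": {"name": "Docs", "final_output": None}},
    {"op": "add", "path": "/logs/Docs/final_output", "value": {"documents": ["harrison worked at kensho"]}},
]
run_state = apply_ops({}, ops)
print(run_state)
```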
 

### Streaming the incremental RunState
You can simply pass diff=False to get incremental values of RunState. You get more verbose output with more repetitive parts.

In [79]:
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"], diff=False
):
    print("-" * 70)
    print(chunk)

----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': 'ab9ae304-95b1-4994-90ed-a905bc1c289a',
 'logs': {},
 'name': 'RunnableSequence',
 'streamed_output': [],
 'type': 'chain'})
----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': 'ab9ae304-95b1-4994-90ed-a905bc1c289a',
 'logs': {'Docs': {'end_time': None,
                   'final_output': None,
                   'id': '96940499-40a4-4cc3-b522-da152132a122',
                   'metadata': {},
                   'name': 'Docs',
                   'start_time': '2024-02-28T16:58:37.069+00:00',
                   'streamed_output': [],
                   'streamed_output_str': [],
                   'tags': ['map:key:context', 'FAISS', 'OllamaEmbeddings'],
                   'type': 'retriever'}},
 'name': 'RunnableSequence',
 'streamed_output': [],
 'type': 'chain'})
-------------------------------------------------------------

### Parallelism
Let’s take a look at how LangChain Expression Language supports parallel requests. For example, when using a RunnableParallel (often written as a dictionary) it executes each element in parallel.

In [80]:
from langchain_core.runnables import RunnableParallel

chain1 = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
chain2 = (
    ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}")
    | model
)
combined = RunnableParallel(joke=chain1, poem=chain2)

In [81]:
%%time
chain1.invoke({"topic": "bears"})

CPU times: user 40.6 ms, sys: 7.3 ms, total: 47.9 ms
Wall time: 4.98 s


'Sure, here\'s one:\n\nWhy did the bear go to the restaurant?\n\nBecause he wanted to have a "paws-itive" dining experience!'

In [82]:
%%time
chain2.invoke({"topic": "bears"})

CPU times: user 29.6 ms, sys: 6.54 ms, total: 36.2 ms
Wall time: 3.02 s


'Bears in the woods, so strong and free,\nA symbol of power, wild and serene.'

In [83]:
%%time
combined.invoke({"topic": "bears"})

CPU times: user 525 ms, sys: 20 ms, total: 545 ms
Wall time: 7.67 s


{'joke': 'Sure, here\'s one:\n\nWhy did the bear go to the restaurant?\n\nBecause he wanted to have a "paws-itive" dining experience!',
 'poem': 'In forests deep, where twilight reigns,\nBears roam, their power sustains.'}
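
The timings above show little speedup (7.67 s combined versus 4.98 s and 3.02 s separately), which may be because a single local model server handles requests one at a time. The sketch below shows the effect parallel execution is meant to have when the underlying calls can overlap. It uses threads and `time.sleep` as a stand-in for model latency; `slow_joke`, `slow_poem`, and `run_parallel` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict


def slow_joke(topic: str) -> str:
    time.sleep(0.2)  # stand-in for model latency
    return f"joke about {topic}"


def slow_poem(topic: str) -> str:
    time.sleep(0.2)  # stand-in for model latency
    return f"poem about {topic}"


def run_parallel(steps: Dict[str, Callable[[str], str]], value: str) -> Dict[str, str]:
    """Submit every step with the same input, then gather results by key."""
    with ThreadPoolExecutor(max_workers=len(steps)) as pool:
        futures = {key: pool.submit(step, value) for key, step in steps.items()}
        return {key: future.result() for key, future in futures.items()}


start = time.perf_counter()
combined_result = run_parallel({"joke": slow_joke, "poem": slow_poem}, "bears")
elapsed = time.perf_counter() - start
print(combined_result, f"{elapsed:.2f}s")
```

With overlapping calls the wall time is close to the slowest step rather than the sum of both.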

### Parallelism on batches
Parallelism can be combined with other runnables. Let’s try to use parallelism with batches.

In [84]:
%%time
chain1.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 67.5 ms, sys: 12.2 ms, total: 79.7 ms
Wall time: 7.87 s


['Sure, here\'s one:\n\nWhy did the bear go to the barber?\n\nBecause he wanted to get a "bear-able" haircut!',
 '\nWhy did the cat join a band? Because he wanted to be the purr-cussionist!']

In [85]:
%%time
chain2.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 68.4 ms, sys: 12.9 ms, total: 81.3 ms
Wall time: 10.6 s


['Sure! Here is a short poem about bears in two lines:\n\nGrizzly giants roam the land,\nTheir furry coats a sight to stand.',
 'Certainly! Here is a short 2-line poem about cats:\n\nFurry balls of joy, with eyes so bright,\nPurring their way through the night.']

In [86]:
%%time
combined.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 391 ms, sys: 33.2 ms, total: 424 ms
Wall time: 17 s


[{'joke': '\nSure, here\'s one:\n\nWhy did the bear go to the party?\n\nBecause he wanted to have a "bear"-ly good time!',
  'poem': 'In the woods, a bear roams free,\nA symbol of power and wild beauty.'},
 {'joke': "\nSure, here's one:\n\nWhy did the cat join a band?\n\nBecause he wanted to be the purr-cussionist!",
  'poem': 'Certainly! Here is a short two-line poem about cats:\n\nFurry balls of joy, with eyes so bright\nPurring in the sun, on a warm and cozy night'}]