## LCEL and Chain Interface

LCEL (LangChain Expression Language) is a declarative way to describe chains concisely.

An LLM on its own is enough for simple applications, but complex applications usually need to chain LLMs with other components. LangChain offers two ways to do this:

    Chain interface: The conventional method
    LCEL: LangChain Expression Language

LCEL is recommended for new applications. Existing Chain objects can also be used inside an LCEL pipeline, so the two approaches can be mixed.


## Advantages of LCEL

The benefits of using LCEL include:

    Asynchronous, Batch, and Streaming Support:
    Chains crafted in LCEL inherently cater to both synchronous and asynchronous interfaces, as well as batch and streaming capabilities. This flexibility simplifies the process of starting with a synchronous prototype and later transitioning it to an asynchronous streaming interface.

    Fallback in Chains:
    With LCEL, any chain can readily incorporate fallbacks. This feature streamlines error management and fosters graceful handling.

    Parallel Processing:
    LCEL runs the independent branches of a chain in parallel where possible. Because LLM applications often wait on slow API calls, the importance of parallelism cannot be overstated.

    Seamless LangSmith Trace Integration:
    With LangSmith tracing enabled, every step of an LCEL chain is logged automatically, which makes chains easier to observe and debug.
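
The fallback idea can be illustrated without any LangChain machinery. The sketch below is plain Python and not LangChain's implementation: `call_with_fallbacks`, `flaky_primary`, and `stable_backup` are hypothetical names chosen for this example. In LangChain itself, the comparable behavior is attached to a runnable with `with_fallbacks([...])`.

```python
from typing import Callable, Optional, Sequence


def call_with_fallbacks(
    primary: Callable[[str], str],
    fallbacks: Sequence[Callable[[str], str]],
    value: str,
) -> str:
    """Try `primary`, then each fallback in order, returning the first success."""
    last_error: Optional[Exception] = None
    for step in (primary, *fallbacks):
        try:
            return step(value)
        except Exception as err:  # a real system would catch narrower error types
            last_error = err
    # Every candidate failed, so surface the most recent error.
    raise RuntimeError("all candidates failed") from last_error


def flaky_primary(topic: str) -> str:
    # Hypothetical stand-in for a model call that times out.
    raise TimeoutError("primary model timed out")


def stable_backup(topic: str) -> str:
    # Hypothetical stand-in for a backup model.
    return f"backup joke about {topic}"


print(call_with_fallbacks(flaky_primary, [stable_backup], "cars"))
```

The caller never sees the `TimeoutError`; it only sees an error if every candidate fails.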

In [1]:
# !pip install langchain==0.0.321 langchain-openai langchain-community

In [2]:
import os
os.environ["OPENAI_API_KEY"] = ""

## Describing a chain with LCEL
A chain that connects "prompt template → model" is written as "prompt | model".

In [3]:
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompt_values import ChatPromptValue
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# A high temperature makes the jokes more varied between runs.
model = ChatOpenAI(temperature=0.9)
prompt = ChatPromptTemplate.from_template("Tell a joke about {topic}")

# The | operator pipes the formatted prompt into the model.
chain = prompt | model

In [4]:
chain

ChatPromptTemplate(input_variables=['topic'], messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['topic'], template='Tell a joke about {topic}'))])
| ChatOpenAI(client=<openai.resources.chat.completions.Completions object at 0x7f7d8aa4cc10>, async_client=<openai.resources.chat.completions.AsyncCompletions object at 0x7f7d8aa60220>, temperature=0.9, openai_api_key=SecretStr('**********'), openai_proxy='')
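
To make the `|` syntax less mysterious, the following plain-Python sketch shows one way pipe composition can be built on the `__or__` operator. It is a toy under stated assumptions, not LangChain's actual classes: `Step`, `fake_prompt`, and `fake_model` are hypothetical names, and the "model" here only uppercases its input.

```python
from typing import Any, Callable


class Step:
    """Wrap a function so that `a | b` builds a new Step running a, then b."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: "Step") -> "Step":
        # Compose left to right: the output of self feeds the input of other.
        return Step(lambda value: other.invoke(self.invoke(value)))


# Hypothetical stand-ins for a prompt template and a chat model.
fake_prompt = Step(lambda inputs: f"Tell a joke about {inputs['topic']}")
fake_model = Step(lambda text: text.upper())

toy_chain = fake_prompt | fake_model
print(toy_chain.invoke({"topic": "cars"}))  # TELL A JOKE ABOUT CARS
```

Because composition returns another `Step`, a composed chain can itself be piped into further steps.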

## Streaming responses

In [5]:
for s in chain.stream({"topic": "cars"}):
    print(s.content, end="", flush=True)

Why did the car refuse to play hide and seek?

Because it was always in the garage!

## LangChain Components and the Runnable Protocol

LangChain components implement the "Runnable" protocol. This protocol provides a standard interface for defining custom chains and invoking them in a standardized manner.

### Standard Interface

The standard interface includes the following methods:

    stream: Stream back chunks of response.
    invoke: Invoke chain with input.
    batch: Invoke chain with a list of inputs.

And there's a corresponding set of asynchronous methods:

    astream: Asynchronously stream back chunks of the response.
    ainvoke: Asynchronously invoke the chain with an input.
    abatch: Asynchronously invoke the chain with a list of inputs.
    astream_log: Stream back intermediate steps in addition to the final response.
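
How these methods relate to one another can be sketched with a toy class. This is an illustration in plain Python, not the real Runnable base class: `EchoRunnable` is a hypothetical name, `batch` is simply `invoke` applied to each input, `stream` yields the result word by word, and `ainvoke` hands the synchronous call to a worker thread.

```python
import asyncio
from typing import Iterator, List


class EchoRunnable:
    """Toy component exposing invoke, batch, stream, and ainvoke."""

    def invoke(self, text: str) -> str:
        return f"echo: {text}"

    def batch(self, texts: List[str]) -> List[str]:
        # One result per input, in the same order as the inputs.
        return [self.invoke(text) for text in texts]

    def stream(self, text: str) -> Iterator[str]:
        # Yield the result in chunks instead of returning it all at once.
        for word in self.invoke(text).split(" "):
            yield word + " "

    async def ainvoke(self, text: str) -> str:
        # Run the blocking call in a worker thread so the event loop stays free.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.invoke, text)


runnable = EchoRunnable()
print(runnable.invoke("hi"))
print(runnable.batch(["a", "b"]))
print("".join(runnable.stream("hello world")))
# In a notebook cell, write `await runnable.ainvoke("async hi")` instead.
print(asyncio.run(runnable.ainvoke("async hi")))
```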


### Input Type and Output Type
Input Types

Depending on the component, the types of input can be:

    Prompt: Dictionary
    Retriever: Single string
    LLM, ChatModel: Single string, message list, PromptValue
    Tool: Single string or dictionary (depends on the tool)
    OutputParser: Output of LLM or ChatModel
    
Output Types

Output types vary based on the component:

    LLM: String
    ChatModel: Chat message
    Prompt: PromptValue
    Retriever: List of documents
    Tool: Depends on the tool
    OutputParser: Depends on the parser   

### Checking Input and Output Schemas

You can inspect the schemas for input and output types using:

    input_schema: Input type schema (Pydantic)
    output_schema: Output type schema (Pydantic)


In [6]:
chain.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [7]:
prompt.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [8]:
model.input_schema.schema()

{'title': 'ChatOpenAIInput',
 'anyOf': [{'type': 'string'},
  {'$ref': '#/definitions/StringPromptValue'},
  {'$ref': '#/definitions/ChatPromptValueConcrete'},
  {'type': 'array',
   'items': {'anyOf': [{'$ref': '#/definitions/AIMessage'},
     {'$ref': '#/definitions/HumanMessage'},
     {'$ref': '#/definitions/ChatMessage'},
     {'$ref': '#/definitions/SystemMessage'},
     {'$ref': '#/definitions/FunctionMessage'},
     {'$ref': '#/definitions/ToolMessage'}]}}],
 'definitions': {'StringPromptValue': {'title': 'StringPromptValue',
   'description': 'String prompt value.',
   'type': 'object',
   'properties': {'text': {'title': 'Text', 'type': 'string'},
    'type': {'title': 'Type',
     'default': 'StringPromptValue',
     'enum': ['StringPromptValue'],
     'type': 'string'}},
   'required': ['text']},
  'AIMessage': {'title': 'AIMessage',
   'description': 'Message from an AI.',
   'type': 'object',
   'properties': {'content': {'title': 'Content',
     'anyOf': [{'type': 'strin

In [9]:
chain.output_schema.schema()

{'title': 'ChatOpenAIOutput',
 'anyOf': [{'$ref': '#/definitions/AIMessage'},
  {'$ref': '#/definitions/HumanMessage'},
  {'$ref': '#/definitions/ChatMessage'},
  {'$ref': '#/definitions/SystemMessage'},
  {'$ref': '#/definitions/FunctionMessage'},
  {'$ref': '#/definitions/ToolMessage'}],
 'definitions': {'AIMessage': {'title': 'AIMessage',
   'description': 'Message from an AI.',
   'type': 'object',
   'properties': {'content': {'title': 'Content',
     'anyOf': [{'type': 'string'},
      {'type': 'array',
       'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]},
    'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},
    'type': {'title': 'Type',
     'default': 'ai',
     'enum': ['ai'],
     'type': 'string'},
    'example': {'title': 'Example', 'default': False, 'type': 'boolean'}},
   'required': ['content']},
  'HumanMessage': {'title': 'HumanMessage',
   'description': 'Message from a human.',
   'type': 'object',
   'properties': {'content': 

In [10]:
chain.invoke({"topic": "Football"})

AIMessage(content='Why did the football coach go to the bank?\n\nTo get his quarterback!')

In [11]:
chain.batch([{"topic": "Love"}, {"topic": "Romance"}])

[AIMessage(content='Why did the man bring a ladder to his date? Because he heard love is in the air!'),
 AIMessage(content='Why did the romance writer break up with their partner? Because they realized they were just a chapter in their life, not the whole book!')]

In [12]:
chain.batch([{"topic": "Coding"}, {"topic": "Travelling"}], config={"max_concurrency": 5})

[AIMessage(content='Why was the computer cold?\n\nBecause it left its Windows open!'),
 AIMessage(content="Why don't scientists trust atoms when they're traveling?\n\nBecause they make up everything!")]
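
The effect of max_concurrency can be pictured as a worker pool with a fixed size. The sketch below uses only the standard library and reflects an assumption about the general idea, not a copy of LangChain's internals: `bounded_batch` and `slow_call` are hypothetical names, and `time.sleep` stands in for a slow API request.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

active = 0  # calls currently in flight
peak = 0    # highest number of simultaneous calls observed
lock = threading.Lock()


def slow_call(topic: str) -> str:
    """Hypothetical stand-in for a network-bound model call."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)
    with lock:
        active -= 1
    return f"joke about {topic}"


def bounded_batch(
    func: Callable[[str], str], inputs: List[str], max_concurrency: int
) -> List[str]:
    # executor.map preserves input order even though calls overlap in time.
    with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
        return list(executor.map(func, inputs))


results = bounded_batch(slow_call, ["a", "b", "c", "d", "e", "f"], max_concurrency=2)
print(results)
print("peak concurrency:", peak)
```

A lower limit trades throughput for gentler load on the API, which can help when a provider enforces rate limits.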

### Async Stream

In [13]:
async for s in chain.astream({"topic": "Satellites"}):
    print(s.content, end="", flush=True)

Why did the satellite bring a camera to the party?   
Because it wanted to take a groupie!

### Async Invoke

In [14]:
result = await chain.ainvoke({"topic": "Food"})
print(result.content)

Why did the tomato turn red?

Because it saw the salad dressing!

### Async Batch

In [15]:
await chain.abatch([{"topic": "Food"}, {"topic": "Sweets"}])

[AIMessage(content='Why was the math book sad?\n\nBecause it had too many problems.'),
 AIMessage(content='Why did the cookie go to the doctor? Because it was feeling crumbly!')]
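
Conceptually, an asynchronous batch starts all calls and waits for them together. The following is a minimal sketch using `asyncio.gather`, with `fake_ainvoke` and `toy_abatch` as hypothetical stand-ins. It is not how abatch is implemented inside LangChain.

```python
import asyncio
from typing import List


async def fake_ainvoke(inputs: dict) -> str:
    """Hypothetical async model call; the sleep imitates network latency."""
    await asyncio.sleep(0.05)
    return f"joke about {inputs['topic']}"


async def toy_abatch(batch_inputs: List[dict]) -> List[str]:
    # gather runs the coroutines concurrently and returns results in input order.
    return await asyncio.gather(*(fake_ainvoke(item) for item in batch_inputs))


# In a notebook cell, write `await toy_abatch([...])` instead of asyncio.run.
results = asyncio.run(toy_abatch([{"topic": "Food"}, {"topic": "Sweets"}]))
print(results)
```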

### Async Stream with intermediate steps

astream_log is useful for displaying progress to the user, working with intermediate results, and debugging chains. You can stream all steps (the default) or include or exclude steps by name, tag, or metadata.

In [16]:
# !pip install faiss-cpu tiktoken

Run the retriever chain and print its intermediate steps with astream_log():

In [17]:
from langchain_openai import OpenAIEmbeddings
from langchain_core.output_parsers import StrOutputParser

from langchain_community.vectorstores import FAISS
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

template = """Please answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(["Sonu is the creator of AI Anytime Youtube Channel"], embedding=OpenAIEmbeddings())
retriever = vectorstore.as_retriever()

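# The dict is coerced to a RunnableParallel: the retriever fills "context"
# while the raw question passes through unchanged as "question".
retrieval_chain = (
    {"context": retriever.with_config(run_name="Docs"), "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)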

async for chunk in retrieval_chain.astream_log("Who is the creator of AI Anytime?", include_names=['Docs']):
    print("-"*40)
    print(chunk)

----------------------------------------
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '86d7c5bb-7113-485c-8ec9-e6ecea6bd86a',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'caf070d5-fb6e-4bae-aed0-1a12754bd29e',
            'metadata': {},
            'name': 'Docs',
            'start_time': '2024-03-29T11:40:18.067+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
            'type': 'retriever'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs/final_output',
  'value': {'documents': [Document(page_content='Sonu is the creator of AI Anytime
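
Each RunLogPatch printed above carries JSON Patch style operations ("op", "path", "value") that, applied in order, rebuild the state of the run. The sketch below applies three such operations to a plain dictionary. It handles only "replace" and "add" on dictionary paths, the operation values are abbreviated stand-ins for the output above, and `apply_op` is a hypothetical helper rather than a LangChain function.

```python
from typing import Any, Dict


def apply_op(state: Any, op: Dict[str, Any]) -> Any:
    """Apply one 'add' or 'replace' operation and return the updated state."""
    if op["op"] not in ("add", "replace"):
        raise ValueError(f"unsupported op: {op['op']}")
    if op["path"] == "":
        # An empty path targets the whole document.
        return op["value"]
    keys = op["path"].lstrip("/").split("/")
    target = state
    for key in keys[:-1]:
        target = target[key]  # walk down to the parent of the final key
    target[keys[-1]] = op["value"]
    return state


# Abbreviated, illustrative operations in the same shape as the log output.
ops = [
    {"op": "replace", "path": "", "value": {"final_output": None, "logs": {}}},
    {"op": "add", "path": "/logs/Docs", "value": {"name": "Docs", "final_output": None}},
    {"op": "add", "path": "/logs/Docs/final_output", "value": {"documents": ["..."]}},
]

state: Any = None
for op in ops:
    state = apply_op(state, op)

print(state)
```

Reading the stream this way explains why the first patch replaces the root and later patches only touch nested paths such as /logs/Docs.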

## Parallel Processing

"RunnableParallel" runs each of its named branches concurrently on the same input and returns a dictionary with one result per branch.

In [18]:
from langchain_core.runnables import RunnableParallel
chain1 = ChatPromptTemplate.from_template("Tell a joke about {topic}") | model
chain2 = ChatPromptTemplate.from_template("Write a short (2 line) poem about {topic}") | model
combined = RunnableParallel(joke=chain1, poem=chain2)

In [19]:
%%time
chain1.batch([{"topic": "AI"}, {"topic": "Math"}])

CPU times: user 13.1 ms, sys: 2.53 ms, total: 15.7 ms
Wall time: 1.01 s


[AIMessage(content="Why did the AI break up with his girlfriend?\nBecause she couldn't handle all his algorithms!"),
 AIMessage(content="Why was the equal sign so humble?\n\nBecause he knew he wasn't less than or greater than anyone else!")]

In [20]:
%%time
chain2.batch([{"topic": "Science"}, {"topic": "Mango"}])

CPU times: user 11.3 ms, sys: 1.99 ms, total: 13.3 ms
Wall time: 2.05 s


[AIMessage(content='In the laboratory, truths are found,\nThrough microscopes and chemicals profound.'),
 AIMessage(content='Juicy, sweet mango\nGolden flesh that brings delight')]

In [21]:
%%time

# Run the joke and poem chains in parallel for each input
combined.batch([{"topic": "AI"}, {"topic": "Mountains"}])

CPU times: user 23.2 ms, sys: 3.22 ms, total: 26.4 ms
Wall time: 942 ms


[{'joke': AIMessage(content='Why did the robot go to school? To get a little byte of knowledge!'),
  'poem': AIMessage(content='Artificial intelligence\nMinds of silicon, thoughts immense.')},
 {'joke': AIMessage(content="Why are mountains so funny? Because they're hill-arious!"),
  'poem': AIMessage(content='Majestic peaks touch the sky,\nSilent guardians watching from up high.')}]
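
The timings above suggest that the combined chain does not cost the sum of both branches. A plain-Python sketch of the same fan-out pattern, using a thread pool, shows why. The names `run_parallel`, `slow_joke`, and `slow_poem` are hypothetical, and `time.sleep` stands in for API latency; this is an illustration of the idea, not RunnableParallel's implementation.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(steps: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Run every step on the same input at once and collect results by key."""
    with ThreadPoolExecutor(max_workers=len(steps)) as executor:
        futures = {name: executor.submit(step, value) for name, step in steps.items()}
        return {name: future.result() for name, future in futures.items()}


def slow_joke(inputs: dict) -> str:
    time.sleep(0.2)  # imitates one slow model call
    return f"joke about {inputs['topic']}"


def slow_poem(inputs: dict) -> str:
    time.sleep(0.2)  # imitates a second, independent model call
    return f"poem about {inputs['topic']}"


start = time.perf_counter()
output = run_parallel({"joke": slow_joke, "poem": slow_poem}, {"topic": "AI"})
elapsed = time.perf_counter() - start

print(output)
print(f"elapsed: {elapsed:.2f}s (sequential would take about 0.40s)")
```

Since both branches spend their time waiting rather than computing, overlapping them brings the total close to the duration of the slower branch.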