In [None]:
# 20250816

In [None]:
# https://csp.gitbook.io/langchain-for-beginners/ch13-langchain-expression-language-lcel/09.-custom-generator

In [None]:
# 09. Custom generator

In [2]:
from typing import Iterator, List
from langchain.prompts.chat import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import os
from dotenv import load_dotenv
from langchain_groq import ChatGroq

In [3]:
# Load environment variables from .env file
load_dotenv()

True

In [4]:
prompt = ChatPromptTemplate.from_template(
    # List five companies similar to the given company, separated by commas.
    "Write a comma-separated list of 5 companies similar to: {company}"
)
# Initialize the ChatOpenAI model by setting the temperature to 0.0.
model = ChatGroq(temperature=0.0,model="llama3-70b-8192")
# Create a chain by connecting prompts and models and applying a string output parser.
str_chain = prompt | model | StrOutputParser()

In [5]:
# Stream data.
for chunk in str_chain.stream({"company": "Google"}):
    # Outputs each chunk, flushing the buffer immediately without a newline.
    print(chunk, end="", flush=True)


Here is a list of 5 companies similar to Google:

Microsoft, Amazon, Facebook, Apple, Yahoo

In [6]:
# Invoke data into the chain.
str_chain.invoke({"company": "Google"})

'Here is a list of 5 companies similar to Google:\n\nMicrosoft, Amazon, Facebook, Apple, Yahoo'

In [11]:
from typing import Iterator, List
from langchain_core.runnables import Runnable
from langchain_core.messages import HumanMessage

In [12]:
# Your parser: splits streamed text into comma-separated list
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    # Yield remaining buffer
    if buffer.strip():
        yield [buffer.strip()]


In [13]:
# Create a custom Runnable to wrap the parser
class ListParserRunnable(Runnable):
    def invoke(self, input_text: str) -> List[List[str]]:
        return list(split_into_list(iter(input_text)))

    def stream(self, input_text: str) -> Iterator[List[str]]:
        return split_into_list(iter(input_text))


In [17]:
# Example usage: simulate an LLM response
# In practice, replace `llm_response` with streaming tokens from your LLM chain
llm_response = "Google, Microsoft, Apple, Amazon,Facebook"


In [18]:
# Create the chain
list_chain = ListParserRunnable()

In [19]:
# Stream and print each chunk
for chunk in list_chain.stream(llm_response):
    print(chunk, flush=True)

['Google']
['Microsoft']
['Apple']
['Amazon']
['Facebook']


In [21]:
# Pass a string (e.g., company names)
list_chain.invoke("Google")

[['Google']]

In [22]:
# Asynchronous

In [23]:
from typing import AsyncIterator

In [24]:
# Defining an asynchronous function
async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    # `input` is an `async_generator` object, so use `async for`
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [
                buffer[:comma_index].strip()
            ]  # Split by comma and return as a list
            buffer = buffer[comma_index + 1:]
    yield [buffer.strip()]  # Returns the remaining buffer contents as a list.

In [25]:
# Pipeline alist_chain and asplit_into_list
alist_chain = str_chain | asplit_into_list

In [26]:
# Stream data using an async for loop.
async for chunk in alist_chain.astream({"company": "Google"}):
    # Output each chunk and empty the buffer.
    print(chunk, flush=True)

['Here is a list of 5 companies similar to Google:\n\nMicrosoft']
['Amazon']
['Facebook']
['Apple']
['Yahoo']


In [27]:
# Calls a list chain asynchronously.
await alist_chain.ainvoke({"company": "Google"})

['Here is a list of 5 companies similar to Google:\n\nMicrosoft',
 'Amazon',
 'Facebook',
 'Apple',
 'Yahoo']