# How to stream runnables
Streaming is critical to making LLM-based applications feel responsive to end users. This interface provides two general approaches:
1. sync `stream` and async `astream`: a default implementation of streaming that streams the final output from the chain.
2. async `astream_events` and async `astream_log`: these provide a way to stream both intermediate steps and final output from the chain.

In [1]:
from langchain_community.llms import Ollama
model = Ollama(model="llama3")

## Using Stream
NOTE: pick the API that matches your execution environment. If the model is streamed from inside an async function, use the `astream` API.
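Notebook cells allow a top-level `async for`, but a plain Python script does not, so the async API has to run inside a coroutine driven by an event loop. A minimal sketch of that pattern follows. It assumes a hypothetical stand-in generator, `fake_astream`, in place of `model.astream`, so that it runs without an Ollama server:

```python
import asyncio


async def fake_astream(prompt):
    """Hypothetical stand-in for `model.astream(prompt)`; yields fixed chunks."""
    for chunk in ["The", " sky", " is", " blue", "."]:
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield chunk


async def collect(prompt):
    """Consume the async stream inside a coroutine and return the chunks."""
    chunks = []
    async for chunk in fake_astream(prompt):
        chunks.append(chunk)
    return chunks


# In a plain script, start the event loop explicitly.
# In a notebook, which already runs a loop, use `await collect(...)` instead.
chunks = asyncio.run(collect("what color is the sky?"))
print("|".join(chunks))
```

With a real model, the body of `collect` would iterate over `model.astream(prompt)` in the same way.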

In [10]:
# sync stream API:
chunks = []
for chunk in model.stream("what color is the sky? Answer in short sentence."):
  chunks.append(chunk)
  print(chunk, end="|", flush=True)

The| sky| appears| blue| to| our| eyes| during| the| daytime|,| but| it| can| take| on| hues| of| red|,| orange|,| and| purple| during| sunrise| and| sunset|.||

In [11]:
# async astream API:
chunks = []
async for chunk in model.astream("what color is the sky? Answer in short sentence."):
  chunks.append(chunk)
  print(chunk, end="|", flush=True)
  # print(chunk.content, end="|", flush=True)  # depending on the model (e.g. chat models return message chunks)

The| sky| appears| blue| to| our| eyes|,| but| it| can| also| appear| other| colors| depending| on| the| time| of| day| and| atmospheric| conditions|.||

In [12]:
chunks[0]
# With some models (e.g. chat models), each chunk is an AIMessageChunk rather than a string:
# from langchain_core.messages.ai import AIMessageChunk
# AIMessageChunk(content="The", id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')

'The'
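Each chunk is only a fragment of the answer, so the text received so far is the concatenation of the chunks in order. A small sketch, assuming plain string chunks like the ones this model returns (the values are copied from the start of the sync output above):

```python
# Chunks as plain strings, copied from the start of the sync output above.
chunks = ["The", " sky", " appears", " blue"]

# String chunks concatenate directly into the text received so far.
text_so_far = "".join(chunks)
print(text_so_far)  # The sky appears blue
```

Message chunks such as `AIMessageChunk` are not strings, so they are combined with `+` rather than `str.join`.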

In [16]:
# Chains
# A chain involves more steps. Using LangChain Expression Language (LCEL), combine a prompt, a model and a parser, then verify that streaming still works.
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)

Here|'s| one|:

|Why| did| the| par|rot| go| to| the| doctor|?

|Because| it| had| a| f|owl| temper|!

|Hope| that| made| you| squ|awk| with| laughter|!||

In [17]:
from langchain_core.output_parsers import JsonOutputParser

# Due to a bug in older versions of LangChain, JsonOutputParser did not stream results from some models.
chain = model | JsonOutputParser()
async for text in chain.astream(
  "output a list of the countries france, spain and japan and their populations in JSON format. "
  'Use a dict with an outer key of "countries" which contains a list of countries. '
  "Each country should have the key `name` and `population`"
):
  print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 653}]}
{'countries': [{'name': 'France', 'population': 653451}]}
{'countries': [{'name': 'France', 'population': 65345100}]}
{'countries': [{'name': 'France', 'population': 65345100}, {}]}
{'countries': [{'name': 'France', 'population': 65345100}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 65345100}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 65345100}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 65345100}, {'name': 'Spain', 'population': 467527}]}
{'countries': [{'name': 'France', 'population': 65345100}, {'name': 'Spain', 'population': 46752716}]}
{'countries': [{'name': 'France', 'population': 65345100}, {'name': 'Spain', 'population': 46752716}, {}]}
{'countries': [{'name': 'France', 'population': 65345100}, {'name': 'Spain', 'population': 467
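The growing dictionaries above come from parsing an incomplete JSON string after every new token. The sketch below illustrates one way to do that with the standard library only. It is not LangChain's implementation, and `parse_partial_json`, `_close_open_delimiters` and `stream_parsed` are hypothetical names. The idea is to close any open string, object or array, and to fall back to a shorter prefix when the result is still invalid.

```python
import json


def _close_open_delimiters(prefix):
    """Append the quote and brackets needed to balance a JSON prefix."""
    stack = []
    in_string = False
    escaped = False
    for ch in prefix:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            stack.append("}" if ch == "{" else "]")
        elif ch in "}]" and stack:
            stack.pop()
    closing_quote = '"' if in_string else ""
    return prefix + closing_quote + "".join(reversed(stack))


def parse_partial_json(text):
    """Best-effort parse of a JSON prefix; returns None if nothing parses yet."""
    # Try progressively shorter prefixes until one can be completed into valid JSON.
    for end in range(len(text), 0, -1):
        try:
            return json.loads(_close_open_delimiters(text[:end]))
        except json.JSONDecodeError:
            continue
    return None


def stream_parsed(tokens):
    """Yield a new snapshot each time the parsed value changes."""
    buffer = ""
    last = None
    for token in tokens:
        buffer += token
        parsed = parse_partial_json(buffer)
        if parsed is not None and parsed != last:
            last = parsed
            yield parsed


# Made-up token boundaries, for illustration only.
tokens = ['{"countries"', ": [", '{"name": "Fr', 'ance"', ', "population": 653', "45100}", "]}"]
for snapshot in stream_parsed(tokens):
    print(snapshot)
```

Retrying on shorter prefixes is quadratic in the worst case, which is acceptable for a sketch but not for long outputs.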

In [18]:
# Any steps in the chain that operate on finalized inputs rather than on input streams can break streaming functionality via stream or astream.
from langchain_core.output_parsers import JsonOutputParser

# A function that operates on finalized inputs
# Streaming will not work with this function
def _extract_country_names(inputs):
    """A function that does not operate on input streams and therefore breaks streaming."""
    if not isinstance(inputs, dict):
        return ""
    if "countries" not in inputs:
        return ""
    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""
    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, end="|", flush=True)

['France', 'Spain', 'Japan']|

In [19]:
# Solve the problem using generator functions
# generator functions (yield): a function that returns an iterator and can be paused and resumed
from langchain_core.output_parsers import JsonOutputParser

async def _extract_country_names_streaming(input_stream):
  """A function that operates on input streams."""
  country_names_so_far = set()

  async for input in input_stream:
    if not isinstance(input, dict):
      continue

    if "countries" not in input:
      continue

    countries = input["countries"]

    if not isinstance(countries, list):
      continue

    for country in countries:
      name = country.get("name")
      if not name:
        continue
      if name not in country_names_so_far:
        yield name
        country_names_so_far.add(name)

chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
  "output a list of the countries france, spain and japan and their populations in JSON format. "
  'Use a dict with an outer key of "countries" which contains a list of countries. '
  "Each country should have the key `name` and `population`",
):
  print(text, end="|", flush=True)

France|Spain|Japan|
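The difference between the two versions is when each step emits output relative to when its input arrives. The following standard-library sketch makes that ordering visible by logging every event. All names (`token_source`, `finalized_step`, `streaming_step`, `trace`) are hypothetical, and no LangChain component is involved.

```python
import asyncio


async def token_source(tokens, log):
    """Hypothetical upstream step that produces one token at a time."""
    for token in tokens:
        log.append(f"produced {token}")
        yield token


async def finalized_step(stream, log):
    """Needs the complete input, so it drains the upstream before emitting."""
    items = [token async for token in stream]
    log.append("emitted all")
    yield items


async def streaming_step(stream, log):
    """Handles each token as it arrives, so output interleaves with input."""
    async for token in stream:
        log.append(f"emitted {token}")
        yield token


async def trace(step):
    """Run `step` over a two-token source and return the order of events."""
    log = []
    async for _ in step(token_source(["France", "Spain"], log), log):
        pass
    return log


print(asyncio.run(trace(finalized_step)))
print(asyncio.run(trace(streaming_step)))
```

The first trace shows both tokens produced before anything is emitted. The second alternates between producing and emitting, which is the behaviour the generator-based `_extract_country_names_streaming` relies on.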

In [27]:
# Non-streaming components
# Some built-in components, such as retrievers (which fetch documents from a data source), do not support streaming: they yield their whole result as a single chunk.
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
# from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import OllamaEmbeddings

template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
# pip install faiss-gpu or pip install faiss-cpu
vectorstore = FAISS.from_texts(
  ["harrison worked at kensho", "harrison likes spicy food"],
  # embedding=OpenAIEmbeddings(),
  embedding=OllamaEmbeddings(model="llama3"),
)
retriever = vectorstore.as_retriever()
chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

[[Document(page_content='harrison likes spicy food'),
  Document(page_content='harrison worked at kensho')]]

In [28]:
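# A chain that contains a non-streaming component can still stream: partial output starts after the last non-streaming step.
# Here the retriever runs to completion first, then the model's answer is streamed. RunnablePassthrough forwards the question unchanged.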
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
retrieval_chain = (
  {
    "context": retriever.with_config(run_name="Docs"),
    "question": RunnablePassthrough(),
  }
  | prompt
  | model
  | StrOutputParser()
)
for chunk in retrieval_chain.stream("Where did harrison work? " "Write 3 made up sentences about this place."):
  print(chunk, end="|", flush=True)

Based| on| the| given| context|,| Harrison| worked| at| Kens|ho|.

|Here| are| three| made|-up| sentences| about| Kens|ho|:

|K|ens|ho| is| a| trendy| restaurant| that| serves| an| array| of| international| dishes| with| a| focus| on| bold| flavors| and| spices|.| The| vibrant| atmosphere| and| eclectic| decor| make| it| a| popular| spot| for| food|ies| and| social|ites| alike|.| As| the| go|-to| place| for| spicy| food| enthusiasts|,| Kens|ho|'s| chefs| are| always| experimenting| with| new| hot| sauces| and| marin|ades| to| tantal|ize| taste| buds|.||

## Using Stream Events
These notes follow the LangChain v0.2 documentation; the examples pass `version="v2"` to `astream_events`.
Notes on the `astream_events` API:
- It is async, so call it from async code.
- Propagate callbacks when defining custom functions / runnables.
- Whenever using runnables without LCEL, make sure to call `.astream()` on LLMs rather than `.ainvoke` to force the LLM to stream tokens.

Event Type Reference: [Link](https://python.langchain.com/v0.2/docs/how_to/streaming/#event-reference)

In [2]:
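# Model on its own (`Ollama` is an LLM, so the events below are `on_llm_*`; a chat model would emit `on_chat_model_*` events)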
events = []
async for event in model.astream_events("hello", version="v2"):
    events.append(event)


In [3]:
events[:3]

[{'event': 'on_llm_start',
  'data': {'input': 'hello'},
  'name': 'Ollama',
  'tags': [],
  'run_id': 'd55416c6-b085-4998-9491-c4102c34a758',
  'metadata': {}},
 {'event': 'on_llm_stream',
  'run_id': 'd55416c6-b085-4998-9491-c4102c34a758',
  'name': 'Ollama',
  'tags': [],
  'metadata': {},
  'data': {'chunk': 'Hello'}},
 {'event': 'on_llm_stream',
  'run_id': 'd55416c6-b085-4998-9491-c4102c34a758',
  'name': 'Ollama',
  'tags': [],
  'metadata': {},
  'data': {'chunk': '!'}}]
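Each event is a plain dictionary, so the streamed text can be rebuilt by selecting the `on_llm_stream` events and joining their chunks. A minimal sketch, assuming string chunks as in the output above; the sample events are trimmed copies of that output and `text_from_events` is a hypothetical helper:

```python
# Event dicts trimmed to the fields used here, following the shape printed above.
events = [
    {"event": "on_llm_start", "name": "Ollama", "data": {"input": "hello"}},
    {"event": "on_llm_stream", "name": "Ollama", "data": {"chunk": "Hello"}},
    {"event": "on_llm_stream", "name": "Ollama", "data": {"chunk": "!"}},
]


def text_from_events(events):
    """Concatenate the chunks carried by `on_llm_stream` events."""
    return "".join(
        event["data"]["chunk"]
        for event in events
        if event["event"] == "on_llm_stream"
    )


print(text_from_events(events))  # Hello!
```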

In [5]:
events[-2:]

[{'event': 'on_llm_stream',
  'run_id': 'd55416c6-b085-4998-9491-c4102c34a758',
  'name': 'Ollama',
  'tags': [],
  'metadata': {},
  'data': {'chunk': ''}},
 {'event': 'on_llm_end',
  'data': {'output': {'generations': [[{'text': "Hello! It's nice to meet you. Is there something I can help you with, or would you like to chat?",
       'generation_info': {'model': 'llama3',
        'created_at': '2024-06-12T17:23:09.985896086Z',
        'response': '',
        'done': True,
        'done_reason': 'stop',
        'context': [128006,
         882,
         128007,
         198,
         198,
         15339,
         128009,
         128006,
         78191,
         128007,
         198,
         198,
         9906,
         0,
         1102,
         6,
         82,
         6555,
         311,
         3449,
         499,
         13,
         2209,
         1070,
         2555,
         358,
         649,
         1520,
         499,
         449,
         11,
         477,
         10

In [7]:
from langchain_core.output_parsers import JsonOutputParser
# Chain
chain = model | JsonOutputParser()
events = [
  event async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
  )
]
# The stream opens with start events: first the chain (model + parser), then the model.
# The parser's start event appears once the model's output reaches it.
events[:3]

[{'event': 'on_chain_start',
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
  'name': 'RunnableSequence',
  'tags': [],
  'run_id': '20d1a9f7-51bf-4e11-8e27-4692baafa44d',
  'metadata': {}},
 {'event': 'on_llm_start',
  'data': {'input': {'prompts': ['output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`']}},
  'name': 'Ollama',
  'tags': ['seq:step:1'],
  'run_id': '76d04bb5-ff65-47eb-91f1-4055f600988d',
  'metadata': {}},
 {'event': 'on_llm_stream',
  'data': {'chunk': GenerationChunk(text='Here')},
  'run_id': '76d04bb5-ff65-47eb-91f1-4055f600988d',
  'name': 'Ollama',
  'tags': ['seq:step:1'],
  'metadata'

In [10]:
# Print only the streamed output events; ignore start events, end events and events from the chain itself
num_events = 0

async for event in chain.astream_events(
  "output a list of the countries france, spain and japan and their populations in JSON format. "
  'Use a dict with an outer key of "countries" which contains a list of countries. '
  "Each country should have the key `name` and `population`",
  version="v2",
):
  kind = event["event"]
  # `Ollama` is an LLM, so it emits `on_llm_stream` events; this branch only fires for chat models.
  if kind == "on_chat_model_stream":
    print(
      f"Chat model chunk: {repr(event['data']['chunk'].content)}",
      flush=True,
    )
  if kind == "on_parser_stream":
    print(f"Parser chunk: {event['data']['chunk']}", flush=True)
  num_events += 1
  if num_events > 100:
    # Truncate the output
    print("...")
    break

Parser chunk: {}
Parser chunk: {'countries': []}
Parser chunk: {'countries': [{}]}
Parser chunk: {'countries': [{'name': ''}]}
Parser chunk: {'countries': [{'name': 'France'}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 670}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 670000}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 67000000}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 67000000}, {}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 67000000}, {'name': ''}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 67000000}, {'name': 'Spain'}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 67000000}, {'name': 'Spain', 'population': 467}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 67000000}, {'name': 'Spain', 'population': 467547}]}
Parser chunk: {'countries': [{'name': 'France', 'population': 67000000}, {'name': 'Spain', 'population': 46754794}]}
Parse

In [11]:
# filter events
# You can filter by either component name, component tags or component type.
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

# 1. By Name
max_events = 0
async for event in chain.astream_events(
  "output a list of the countries france, spain and japan and their populations in JSON format. "
  'Use a dict with an outer key of "countries" which contains a list of countries. '
  "Each country should have the key `name` and `population`",
  version="v2",
  include_names=["my_parser"],
):
  print(event)
  max_events += 1
  if max_events > 10:
    # Truncate output
    print("...")
    break
# 2. By Type
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
  {"run_name": "my_parser"}
)
max_events = 0
async for event in chain.astream_events(
  'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
  version="v2",
  include_types=["llm"],  # `Ollama` is an LLM; use "chat_model" when the chain contains a chat model
):
  print(event)
  max_events += 1
  if max_events > 10:
    # Truncate output
    print("...")
    break

# 3. By Tags
chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})
max_events = 0
async for event in chain.astream_events(
  'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
  version="v2",
  include_tags=["my_chain"],
):
  print(event)
  max_events += 1
  if max_events > 10:
    # Truncate output
    print("...")
    break


{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': 'c8f8e749-51d6-48a0-9253-ae5f5eafa883', 'metadata': {}}
{'event': 'on_parser_stream', 'run_id': 'c8f8e749-51d6-48a0-9253-ae5f5eafa883', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}}
{'event': 'on_parser_stream', 'run_id': 'c8f8e749-51d6-48a0-9253-ae5f5eafa883', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}}
{'event': 'on_parser_stream', 'run_id': 'c8f8e749-51d6-48a0-9253-ae5f5eafa883', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}}
{'event': 'on_parser_stream', 'run_id': 'c8f8e749-51d6-48a0-9253-ae5f5eafa883', 'nam
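To make the three filters concrete, the sketch below applies the same idea to a list of event dictionaries. It approximates the semantics and is not LangChain's implementation. `run_type_of` and `filter_events` are hypothetical helpers, the sample events are made up, and the sketch assumes an event is kept when it matches any of the supplied criteria.

```python
def run_type_of(event):
    """Derive the component type from the event name, e.g. on_parser_stream -> parser."""
    return event["event"][len("on_"):].rsplit("_", 1)[0]


def filter_events(events, include_names=None, include_types=None, include_tags=None):
    """Keep events matching any supplied criterion; keep everything if none is given."""
    if include_names is None and include_types is None and include_tags is None:
        return list(events)
    kept = []
    for event in events:
        matches = (
            (include_names is not None and event["name"] in include_names)
            or (include_types is not None and run_type_of(event) in include_types)
            or (include_tags is not None and any(tag in include_tags for tag in event["tags"]))
        )
        if matches:
            kept.append(event)
    return kept


# Made-up events in the shape printed above, trimmed to the fields used here.
events = [
    {"event": "on_chain_start", "name": "RunnableSequence", "tags": ["my_chain"]},
    {"event": "on_llm_stream", "name": "model", "tags": ["seq:step:1", "my_chain"]},
    {"event": "on_parser_stream", "name": "my_parser", "tags": ["seq:step:2", "my_chain"]},
]

print([e["event"] for e in filter_events(events, include_names=["my_parser"])])
print([e["event"] for e in filter_events(events, include_types=["llm"])])
print([e["event"] for e in filter_events(events, include_tags=["my_chain"])])
```

In the sample data every event carries the `my_chain` tag, mirroring a tag set on the whole chain, so the tag filter keeps all three events.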

In [None]:
# Non-streaming components: such components can break streaming of the final output when using astream, but astream_events will still yield streaming events from the intermediate steps that support streaming.
# ref: https://python.langchain.com/v0.2/docs/how_to/streaming/#non-streaming-components-1

# Propagating callbacks: when invoking runnables inside your tools, pass the callbacks on to those runnables; otherwise no stream events are generated for them.
# ref: https://python.langchain.com/v0.2/docs/how_to/streaming/#propagating-callbacks