# Streaming

## Setup

In [14]:
import bs4
from operator import itemgetter
import nest_asyncio

# LANGCHAIN
from langchain import hub
from langchain.vectorstores import Chroma

from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.llms import Ollama

from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_core.tracers.log_stream import LogStreamCallbackHandler

from langchain_text_splitters import RecursiveCharacterTextSplitter

## Chain with sources

In [2]:
# Indexing
bs_strainer = bs4.SoupStrainer(class_=("post-content", "post-title", "post-header"))
loader = WebBaseLoader(
    web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
    bs_kwargs={"parse_only": bs_strainer},
)
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(
    documents=splits, embedding=OllamaEmbeddings(model="llama2:13b")
)

In [3]:
# Retrieval and Generation
retriever = vectorstore.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = Ollama(model="llama2:13b")

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain_from_docs = (
    RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))
    | prompt
    | llm
    | StrOutputParser()
)

In [4]:
rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)

## Streaming final outputs

In [5]:
for chunk in rag_chain_with_source.stream("What is task decomposition?"):
    print(chunk)

{'question': 'What is task decomposition?'}
{'context': [Document(page_content='Fig. 11. Illustration of how HuggingGPT works. (Image source: Shen et al. 2023)\nThe system comprises of 4 stages:\n(1) Task planning: LLM works as the brain and parses the user requests into multiple tasks. There are four attributes associated with each task: task type, ID, dependencies, and arguments. They use few-shot examples to guide LLM to do task parsing and planning.\nInstruction:', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}), Document(page_content='LSH (Locality-Sensitive Hashing): It introduces a hashing function such that similar input items are mapped to the same buckets with high probability, where the number of buckets is much smaller than the number of inputs.\nANNOY (Approximate Nearest Neighbors Oh Yeah): The core data structure are random projection trees, a set of binary trees where each non-leaf node represents a hyperplane splitting the input space into 

In [8]:
output = {}
curr_key = None
for chunk in rag_chain_with_source.stream("What is task decomposition?"):
    for key in chunk:
        if key not in output:
            output[key] = chunk[key]
        else:
            output[key] += chunk[key]
        
        if key != curr_key:
            print(f"\n\n{key}: {chunk[key]}", end="", flush=True)
        else:
            print(chunk[key], end="", flush=True)
        curr_key = key



question: What is task decomposition?

context: [Document(page_content='Fig. 11. Illustration of how HuggingGPT works. (Image source: Shen et al. 2023)\nThe system comprises of 4 stages:\n(1) Task planning: LLM works as the brain and parses the user requests into multiple tasks. There are four attributes associated with each task: task type, ID, dependencies, and arguments. They use few-shot examples to guide LLM to do task parsing and planning.\nInstruction:', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}), Document(page_content='LSH (Locality-Sensitive Hashing): It introduces a hashing function such that similar input items are mapped to the same buckets with high probability, where the number of buckets is much smaller than the number of inputs.\nANNOY (Approximate Nearest Neighbors Oh Yeah): The core data structure are random projection trees, a set of binary trees where each non-leaf node represents a hyperplane splitting the input space into half a

## Streaming intermediate steps

In [11]:
contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is."""

contextualize_q_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{question}")
    ]
)
contextualize_q_chain = (
    contextualize_q_prompt | llm | StrOutputParser()
).with_config(tags=["contextualize_q_chain"])

qa_system_prompt = """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.\

{context}"""
qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", qa_system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{question}")
    ]
)

def contextualized_question(input: dict):
    if input.get("chat_history"):
        return contextualize_q_chain
    else:
        return input["question"]

rag_chain = (
    RunnablePassthrough.assign(context=contextualize_q_chain | retriever | format_docs)
    | qa_prompt
    | llm
)


In [13]:
nest_asyncio.apply()

In [15]:
chat_history = []
question = "What is task decomposition?"
ai_msg = rag_chain.invoke({"question": question, "chat_history": chat_history})
chat_history.extend([HumanMessage(content=question), ai_msg])

question = "What are common wawys of doint it?"
ct = 0
async for jsonpatch_op in rag_chain.astream_log(
    {"question": question, "chat_history": chat_history},
    include_tags=["contextualize_q_chain"],
):
    print(jsonpatch_op)
    print("\n" + "-" * 30 + "\n")
    ct += 1 
    if ct > 20:
        break

RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '59bbf000-f9ff-40b5-b516-45e9ec1b6050',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})

------------------------------

RunLogPatch({'op': 'add',
  'path': '/logs/RunnableSequence',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'e6558081-b646-485b-b98a-b6a90b87dbeb',
            'metadata': {},
            'name': 'RunnableSequence',
            'start_time': '2024-04-17T01:46:42.387+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['seq:step:1', 'contextualize_q_chain'],
            'type': 'chain'}})

------------------------------

RunLogPatch({'op': 'add',
  'path': '/logs/ChatPromptTemplate',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'ffa8df56-ca22-4fb3-be8b-f909201d8a8f',
     

In [16]:
ct = 0
async for jsonpatch_op in rag_chain.astream_log(
    {"question":question, "chat_history": chat_history},
    include_names=["Retriever"],
    with_streamed_output_list=False
):
    print(jsonpatch_op)
    print("\n" + "-" * 30 + "\n")
    ct += 1
    if ct > 20:
        break

RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '16b44684-bff1-45ae-9587-2101488004d7',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})

------------------------------

RunLogPatch({'op': 'add',
  'path': '/logs/Retriever',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'a8df5630-30f7-4740-9090-301ce73c16df',
            'metadata': {},
            'name': 'Retriever',
            'start_time': '2024-04-17T01:48:31.634+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['seq:step:2', 'Chroma', 'OllamaEmbeddings'],
            'type': 'retriever'}})

------------------------------

RunLogPatch({'op': 'add',
  'path': '/logs/Retriever/final_output',
  'value': {'documents': [Document(page_content='This benchmark evaluates the agent’s tool use capabilities at three levels:\n\n