# Streaming

Often in Q&A applications it's important to show users the sources that were used to generate the answer. The simplest way to do this is for the chain to return the Documents that were retrieved in each generation.

We'll work off of the Q&A app with sources we built over the [LLM Powered Autonomous Agents](https://lilianweng.github.io/posts/2023-06-23-agent/) blog post by Lilian Weng in the [Returning sources](/docs/use_cases/question_answering/sources) guide.

## Setup
### Dependencies

We’ll use an OpenAI chat model and embeddings and a Memory vector store in this walkthrough, but everything shown here works with any [ChatModel](/docs/modules/model_io/chat) or [LLM](/docs/modules/model_io/llms), [Embeddings](https://js.langchain.com/docs/modules/data_connection/text_embedding/), and [VectorStore](https://js.langchain.com/docs/modules/data_connection/vectorstores/) or [Retriever](/docs/modules/data_connection/retrievers/).

We’ll use the following packages:

```bash
npm install --save langchain @langchain/community @langchain/openai cheerio
```

We need to set the `OPENAI_API_KEY` environment variable:

```bash
export OPENAI_API_KEY=YOUR_KEY
```


### LangSmith

Many of the applications you build with LangChain will contain multiple steps with multiple invocations of LLM calls. As these applications get more and more complex, it becomes crucial to be able to inspect what exactly is going on inside your chain or agent. The best way to do this is with [LangSmith](https://smith.langchain.com/).

Note that LangSmith is not needed, but it is helpful. If you do want to use LangSmith, after you sign up at the link above, make sure to set your environment variables to start logging traces:


```bash
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=YOUR_KEY
```

## Chain with sources

Here is the Q&A app with sources that we built over the [LLM Powered Autonomous Agents](https://lilianweng.github.io/posts/2023-06-23-agent/) blog post by Lilian Weng in the [Returning sources](/docs/use_cases/question_answering/sources) guide:

```typescript
import "cheerio";
import { CheerioWebBaseLoader } from "langchain/document_loaders/web/cheerio";
import { RecursiveCharacterTextSplitter } from "langchain/text_splitter";
import { MemoryVectorStore } from "langchain/vectorstores/memory";
import { OpenAIEmbeddings, ChatOpenAI } from "@langchain/openai";
import { pull } from "langchain/hub";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { formatDocumentsAsString } from "langchain/util/document";
import { RunnableSequence, RunnablePassthrough, RunnableMap } from "@langchain/core/runnables";
import { StringOutputParser } from "@langchain/core/output_parsers";
```


```typescript
const loader = new CheerioWebBaseLoader(
  "https://lilianweng.github.io/posts/2023-06-23-agent/"
);

const docs = await loader.load();

// Split the post into overlapping chunks and index them in an in-memory vector store.
const textSplitter = new RecursiveCharacterTextSplitter({ chunkSize: 1000, chunkOverlap: 200 });
const splits = await textSplitter.splitDocuments(docs);
const vectorStore = await MemoryVectorStore.fromDocuments(splits, new OpenAIEmbeddings());

// Retrieve and generate using the relevant snippets of the blog.
const retriever = vectorStore.asRetriever();
const prompt = await pull<ChatPromptTemplate>("rlm/rag-prompt");
const llm = new ChatOpenAI({ modelName: "gpt-3.5-turbo", temperature: 0 });

// Format the retrieved documents as a string, fill the prompt and generate the answer.
const ragChainFromDocs = RunnableSequence.from([
  RunnablePassthrough.assign({ context: (input) => formatDocumentsAsString(input.context) }),
  prompt,
  llm,
  new StringOutputParser()
]);

// Retrieve documents and pass the question through in parallel, then add the answer
// while keeping the retrieved documents in the output. Chaining `.assign()` on a
// `const` avoids reassigning a `RunnableMap`-typed variable to a different Runnable type.
const ragChainWithSource = new RunnableMap({
  steps: { context: retriever, question: new RunnablePassthrough() },
}).assign({ answer: ragChainFromDocs });

await ragChainWithSource.invoke("What is Task Decomposition");
```

```
{
  question: "What is Task Decomposition",
  context: [
    Document {
      pageContent: "Fig. 1. Overview of a LLM-powered autonomous agent system.\n" +
        "Component One: Planning#\n" +
        "A complicated ta"... 898 more characters,
      metadata: {
        source: "https://lilianweng.github.io/posts/2023-06-23-agent/",
        loc: { lines: [Object] }
      }
    },
    Document {
      pageContent: 'Task decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\\n1.", "What are'... 887 more characters,
      metadata: {
        source: "https://lilianweng.github.io/posts/2023-06-23-agent/",
        loc: { lines: [Object] }
      }
    },
    Document {
      pageContent: "Agent System Overview\n" +
        "                \n" +
        "                    Component One: Planning\n" +
        "                 "... 850 more characters,
...
```

## Streaming final outputs

With LCEL it's easy to stream final outputs:

```typescript
for await (const chunk of await ragChainWithSource.stream("What is task decomposition?")) {
  console.log(chunk);
}
```

```
{ question: "What is task decomposition?" }
{
  context: [
    Document {
      pageContent: "Fig. 1. Overview of a LLM-powered autonomous agent system.\n" +
        "Component One: Planning#\n" +
        "A complicated ta"... 898 more characters,
      metadata: {
        source: "https://lilianweng.github.io/posts/2023-06-23-agent/",
        loc: { lines: [Object] }
      }
    },
    Document {
      pageContent: 'Task decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\\n1.", "What are'... 887 more characters,
      metadata: {
        source: "https://lilianweng.github.io/posts/2023-06-23-agent/",
        loc: { lines: [Object] }
      }
    },
    Document {
      pageContent: "Agent System Overview\n" +
        "                \n" +
        "                    Component One: Planning\n" +
        "                 "... 850 more characters,
      metadata: {
        source: "https://lilianweng.github.io/posts/2023-06-23-agent/",
        loc: { lines: [Object] }
...
```
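Each object printed by the loop holds only the key that produced new output: the question arrives first, then the retrieved documents, and then the answer in small string chunks. A consumer that only cares about the answer can therefore ignore the other keys. The sketch below models this with a hypothetical async generator, `fakeStream`. Its chunk shape is assumed from the output above, and its tokens are made up.

```typescript
// Assumed chunk shape, based on the output above: each chunk carries only the
// key that produced new output.
type Chunk = { question?: string; context?: unknown[]; answer?: string };

// Hypothetical stand-in for ragChainWithSource.stream(...); the tokens are made up.
async function* fakeStream(question: string): AsyncGenerator<Chunk> {
  yield { question };
  yield { context: [{ pageContent: "..." }] };
  for (const token of ["Task", " decomposition", " breaks", " tasks", " down."]) {
    yield { answer: token };
  }
}

// Keep only the answer tokens and join them, ignoring chunks for other keys.
async function collectAnswer(stream: AsyncIterable<Chunk>): Promise<string> {
  let answer = "";
  for await (const chunk of stream) {
    if (chunk.answer !== undefined) answer += chunk.answer;
  }
  return answer;
}
```

With the real chain, something like `collectAnswer(await ragChainWithSource.stream(question))` should follow the same pattern, assuming its chunk type is compatible with `Chunk`.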

We can add some logic to compile our stream as it's being returned:

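```typescript
// Accumulate each key's chunks and print them as they arrive.
const output: Record<string, any> = {};
let currentKey: string | null = null;

for await (const chunk of await ragChainWithSource.stream("What is task decomposition?")) {
  for (const [key, value] of Object.entries(chunk)) {
    if (output[key] === undefined) {
      output[key] = value;
    } else {
      // Here only `answer` arrives in several chunks, so `+=` concatenates its string tokens.
      output[key] += value;
    }

    if (key !== currentKey) {
      // A new key started streaming: print its name and first chunk.
      console.log(`\n\n${key}: ${JSON.stringify(value)}`);
    } else {
      console.log(value);
    }
    currentKey = key;
  }
}
```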



```
question: "What is task decomposition?"


context: [{"pageContent":"Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.\nTree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) w...
```
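The loop above relies on `+=`, which works because in this run only `answer` arrives in more than one chunk. If a key holding an array were ever split across chunks, `+=` would coerce it to a string. Here is a minimal sketch of a type-aware merge that uses a policy of our own choosing: concatenate strings, concatenate arrays, and otherwise keep the latest value. `mergeChunk` and `aggregate` are hypothetical helpers, not LangChain APIs.

```typescript
type Chunk = Record<string, unknown>;

// Merge one chunk into the accumulated output, key by key.
// Policy assumed by this sketch (not LangChain's own chunk concatenation):
// strings are concatenated, arrays are concatenated, anything else replaces.
function mergeChunk(acc: Chunk, chunk: Chunk): Chunk {
  const out: Chunk = { ...acc };
  for (const [key, value] of Object.entries(chunk)) {
    const prev = out[key];
    if (typeof prev === "string" && typeof value === "string") {
      out[key] = prev + value;
    } else if (Array.isArray(prev) && Array.isArray(value)) {
      out[key] = [...prev, ...value];
    } else {
      out[key] = value;
    }
  }
  return out;
}

// Fold a sequence of streamed chunks into one object.
function aggregate(chunks: Chunk[]): Chunk {
  let acc: Chunk = {};
  for (const chunk of chunks) acc = mergeChunk(acc, chunk);
  return acc;
}
```

Returning a new object from `mergeChunk` keeps each intermediate state intact. This is handy if you want to render partial results while the stream is still running.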

## Streaming intermediate steps

Suppose we want to stream not only the final outputs of the chain, but also some intermediate steps. As an example, let's take our [Chat history](/docs/use_cases/question_answering/chat_history) chain. Here we reformulate the user question before passing it to the retriever. This reformulated question is not returned as part of the final output. We could modify our chain to return the new question, but for demonstration purposes we'll leave it as is.

```typescript
import { StringOutputParser } from "@langchain/core/output_parsers";
import { ChatPromptTemplate, MessagesPlaceholder } from "@langchain/core/prompts";

const contextualizeQSystemPrompt = `Given a chat history and the latest user question
which might reference context in the chat history, formulate a standalone question
which can be understood without the chat history. Do NOT answer the question,
just reformulate it if needed and otherwise return it as is.`;

const contextualizeQPrompt = ChatPromptTemplate.fromMessages([
  ["system", contextualizeQSystemPrompt],
  new MessagesPlaceholder("chatHistory"),
  ["human", "{question}"]
]);
// Tag this sub-chain so its runs can be selected later with `includeTags`.
const contextualizeQChain = contextualizeQPrompt
  .pipe(llm)
  .pipe(new StringOutputParser())
  .withConfig({ tags: ["contextualizeQChain"] });

const qaSystemPrompt = `You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Use three sentences maximum and keep the answer concise.

{context}`;
const qaPrompt = ChatPromptTemplate.fromMessages([
  ["system", qaSystemPrompt],
  new MessagesPlaceholder("chatHistory"),
  ["human", "{question}"]
]);

const ragChain = RunnableSequence.from([
  RunnablePassthrough.assign({
    context: (input: Record<string, unknown>) => {
      if ("chatHistory" in input) {
        // Reformulate the question into a standalone one, retrieve with it and
        // format the documents. The reformulated question itself is not returned.
        return contextualizeQChain.pipe(retriever).pipe(formatDocumentsAsString);
      }
      return "";
    },
  }),
  qaPrompt,
  llm
]);
```
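The reformulation step can be hard to see inside the LCEL chain, so here is the same data flow written with plain async functions. `reformulate` and `retrieve` are hypothetical stand-ins for `contextualizeQChain` and `retriever`. Joining page texts with blank lines is assumed to approximate `formatDocumentsAsString`. Unlike the chain above, this sketch skips the LLM call when the history is empty and searches with the original question. That is a design choice made here for illustration only.

```typescript
// Hypothetical message and dependency shapes for this sketch.
interface Message {
  role: "human" | "ai";
  content: string;
}

interface Deps {
  // Stands in for contextualizeQChain.
  reformulate: (question: string, history: Message[]) => Promise<string>;
  // Stands in for the retriever; returns page texts.
  retrieve: (query: string) => Promise<string[]>;
}

// Build the `context` string the QA prompt receives. The standalone question is
// used only as the retrieval query and is not part of the returned value.
async function buildContext(
  input: { question: string; chatHistory: Message[] },
  deps: Deps,
): Promise<string> {
  const query =
    input.chatHistory.length > 0
      ? await deps.reformulate(input.question, input.chatHistory)
      : input.question; // sketch's choice: no LLM call for an empty history
  const pages = await deps.retrieve(query);
  return pages.join("\n\n");
}
```

Because `buildContext` returns only the formatted context, the standalone question disappears from the output. That is the same reason it is missing from `ragChain`'s final output and has to be observed through the intermediate steps.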

To stream intermediate steps, we'll use the `streamLog` method. It yields JSONPatch operations that, when applied in the order they are received, build up the `RunState`:

```typescript
import { type LogEntry } from "@langchain/core/tracers/log_stream";

interface RunState {
    /**
     * ID of the run.
     */ 
    id: string;
    /**
     * List of output chunks streamed by Runnable.stream()
     */ 
    streamed_output: any[];
    /**
     * Final output of the run, usually the result of aggregating (`+`) streamed_output.
     * Only available after the run has finished successfully.
     */ 
    final_output?: any;
    /**
     * Map of run names to sub-runs. If filters were supplied, this list will
     * contain only the runs that matched the filters.
     */ 
    logs: Record<string, LogEntry>;
}
```
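To see how the patches build up a `RunState`, here is a minimal JSON Patch (RFC 6902) applier. It supports only `add`, `replace` and `remove`, along with RFC 6901 pointer escaping (`~1` for `/`, `~0` for `~`) and the `-` index for appending to an array. It does not validate that a `replace` target exists. The example operations are hypothetical values shaped like the `RunState` fields above. For production use, a dedicated JSON Patch library is likely a more robust choice.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

interface Operation {
  op: "add" | "replace" | "remove";
  path: string; // RFC 6901 JSON Pointer; "" is the whole document
  value?: Json;
}

// Decode a JSON Pointer into its reference tokens.
function parsePointer(path: string): string[] {
  if (path === "") return [];
  return path
    .slice(1)
    .split("/")
    .map((token) => token.replace(/~1/g, "/").replace(/~0/g, "~"));
}

// Step from a container to one of its children.
function child(node: Json, token: string): Json {
  if (Array.isArray(node)) return node[Number(token)];
  if (node !== null && typeof node === "object") return node[token];
  throw new Error(`Cannot descend into a ${typeof node} at "${token}"`);
}

// Deep-copy values so later patches never mutate an operation's payload.
const clone = (value: Json): Json => JSON.parse(JSON.stringify(value));

// Apply one operation, mutating `doc` in place; returns the (possibly new) root.
function applyOperation(doc: Json, operation: Operation): Json {
  const tokens = parsePointer(operation.path);
  const value = operation.value === undefined ? null : clone(operation.value);
  if (tokens.length === 0) {
    return operation.op === "remove" ? null : value;
  }
  const parent = tokens.slice(0, -1).reduce(child, doc);
  const key = tokens[tokens.length - 1];
  if (Array.isArray(parent)) {
    const index = key === "-" ? parent.length : Number(key);
    if (operation.op === "add") parent.splice(index, 0, value);
    else if (operation.op === "replace") parent[index] = value;
    else parent.splice(index, 1);
  } else if (parent !== null && typeof parent === "object") {
    if (operation.op === "remove") delete parent[key];
    else parent[key] = value;
  } else {
    throw new Error(`Cannot apply "${operation.op}" at "${operation.path}"`);
  }
  return doc;
}

// Apply operations in the order received, starting from `state`.
function applyPatches(ops: Operation[], state: Json = null): Json {
  return ops.reduce(applyOperation, state);
}

// Hypothetical operations shaped like the RunState fields described above.
const ops: Operation[] = [
  { op: "replace", path: "", value: { id: "run-1", streamed_output: [], logs: {} } },
  { op: "add", path: "/logs/contextualizeQChain", value: { name: "contextualizeQChain", streamed_output_str: [] } },
  { op: "add", path: "/logs/contextualizeQChain/streamed_output_str/-", value: "What" },
  { op: "add", path: "/streamed_output/-", value: "Common" },
];
```

Under these assumptions, you could keep a running `state` and call `state = applyPatches(patch.ops, state)` for each patch yielded by `streamLog`. Order matters, because later paths such as `/logs/<name>/...` only resolve once an earlier patch has created them.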

You can stream all steps (the default) or include or exclude steps by name, type, or tag. In this case we'll only stream intermediate steps that are part of the `contextualizeQChain`, plus the final output. Notice that when defining `contextualizeQChain` we gave it a corresponding tag, which we can now filter on.
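In the sketch below, the filters combine as follows: a run is kept if it matches at least one include filter (or if no include filter is given) and none of the exclude filters. The option names mirror the `includeNames`/`includeTypes`/`includeTags` and `exclude*` stream options. However, `shouldInclude`, `RunInfo` and the run names used with it are hypothetical. This is a model of the matching rule, not LangChain's source.

```typescript
// Simplified, hypothetical view of a run as the log stream sees it.
interface RunInfo {
  name: string;
  type: string; // e.g. "llm", "prompt", "retriever", "chain"
  tags: string[];
}

// Option names mirror the streamLog filter options.
interface Filters {
  includeNames?: string[];
  includeTypes?: string[];
  includeTags?: string[];
  excludeNames?: string[];
  excludeTypes?: string[];
  excludeTags?: string[];
}

// Keep a run if it matches any include filter (or none are given)
// and matches no exclude filter.
function shouldInclude(run: RunInfo, f: Filters): boolean {
  const hasTag = (tags?: string[]) => tags?.some((t) => run.tags.includes(t)) ?? false;
  let include = !f.includeNames && !f.includeTypes && !f.includeTags;
  if (f.includeNames?.includes(run.name)) include = true;
  if (f.includeTypes?.includes(run.type)) include = true;
  if (hasTag(f.includeTags)) include = true;
  if (f.excludeNames?.includes(run.name)) include = false;
  if (f.excludeTypes?.includes(run.type)) include = false;
  if (hasTag(f.excludeTags)) include = false;
  return include;
}
```

Under this rule, exclusions always win over inclusions. This lets you ask for "everything tagged X except its LLM calls" with one include and one exclude filter.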

We only show the first 20 chunks of the stream for readability:

```typescript
import { BaseMessage, HumanMessage } from "@langchain/core/messages";

let chatHistory: Array<BaseMessage> = [];

// First turn: ask a question and record it, together with the answer, in the history.
const question = "What is task decomposition?";
const aiMsg = await ragChain.invoke({ question, chatHistory });
chatHistory = chatHistory.concat([new HumanMessage(question), aiMsg]);
console.log(chatHistory);

// Second turn: a follow-up that only makes sense given the history.
const secondQuestion = "What are common ways of doing it?";
let count = 0;
const streamLog = await ragChain.streamLog(
  {
    question: secondQuestion,
    chatHistory
  },
  undefined,
  // Only log runs tagged "contextualizeQChain" (plus the final output).
  { includeTags: ["contextualizeQChain"] }
);
for await (const jsonPatchOp of streamLog) {
  console.log(jsonPatchOp);
  count++;
  // Stop after the first 20 patches.
  if (count >= 20) {
    break;
  }
}
```

```
[
  HumanMessage {
    lc_serializable: true,
    lc_kwargs: { content: "What is task decomposition?", additional_kwargs: {} },
    lc_namespace: [ "langchain_core", "messages" ],
    content: "What is task decomposition?",
    name: undefined,
    additional_kwargs: {}
  },
  AIMessage {
    lc_serializable: true,
    lc_kwargs: {
      content: "Task decomposition is a technique used to break down complex tasks into smaller and more manageable "... 278 more characters,
      additional_kwargs: { function_call: undefined, tool_calls: undefined }
    },
    lc_namespace: [ "langchain_core", "messages" ],
    content: "Task decomposition is a technique used to break down complex tasks into smaller and more manageable "... 278 more characters,
    name: undefined,
    additional_kwargs: { function_call: undefined, tool_calls: undefined }
  }
]
RunLogPatch {
  ops: [
    {
      op: "replace",
      path: "",
      value: {
        id: "3d7bf12c-b781-4e84-94c9-ac6d007794c9",
...
```
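The cell above counts patches by hand and breaks out of the loop. The same limit can be expressed as a small reusable async generator; `take` is a hypothetical helper, not part of LangChain. Leaving a `for await` loop early calls the source iterator's `return()` method, so a generator-based source gets a chance to clean up.

```typescript
// Yield at most `limit` items from any async iterable, then stop.
// Returning from inside the for-await loop closes the source iterator.
async function* take<T>(source: AsyncIterable<T>, limit: number): AsyncGenerator<T> {
  if (limit <= 0) return;
  let count = 0;
  for await (const item of source) {
    yield item;
    count += 1;
    if (count >= limit) return;
  }
}
```

For example, `for await (const op of take(streamLog, 20)) console.log(op);` should print at most 20 patches, assuming `streamLog` is consumed for the first time there.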

If we wanted to get our retrieved docs, we could filter on the run type `retriever` instead:

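```typescript
// New names avoid redeclaring `count` and `streamLog` from the previous cell.
let retrieverCount = 0;
const retrieverStreamLog = await ragChain.streamLog(
  {
    question: secondQuestion,
    chatHistory
  },
  undefined,
  // Select retriever runs by run type rather than by tag.
  { includeTypes: ["retriever"] }
);
for await (const jsonPatchOp of retrieverStreamLog) {
  console.log(jsonPatchOp);
  retrieverCount++;
  if (retrieverCount >= 20) {
    break;
  }
}
```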
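After the retriever patches have been applied to a `RunState`, the documents can be read out of `logs`. The sketch below assumes two things: each log entry exposes `type` and `final_output` fields, and a retriever's `final_output` is either `{ documents: [...] }` or a bare array of documents. Check both shapes against your own stream output. All type and function names here are hypothetical.

```typescript
// Minimal, hypothetical shapes: only the fields this sketch reads.
interface DocLike {
  pageContent: string;
  metadata?: Record<string, unknown>;
}
interface LogEntryLike {
  name: string;
  type: string;
  final_output?: unknown;
}
interface RunStateLike {
  logs: Record<string, LogEntryLike>;
}

// Collect documents from finished retriever runs. Entries without a
// final_output (still running) or of another type are skipped.
function retrievedDocs(state: RunStateLike): DocLike[] {
  const docs: DocLike[] = [];
  for (const entry of Object.values(state.logs)) {
    if (entry.type !== "retriever" || entry.final_output == null) continue;
    // Assumed output shapes: { documents: [...] } or a bare array.
    const out = entry.final_output as { documents?: DocLike[] } | DocLike[];
    const found = Array.isArray(out) ? out : out.documents ?? [];
    docs.push(...found);
  }
  return docs;
}
```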