<a href="https://colab.research.google.com/github/nirvanesque/llama_index_methods/blob/main/query_pipeline_present.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# An Introduction to LlamaIndex Query Pipelines

## Overview
LlamaIndex provides a declarative query API that allows you to chain together different modules in order to orchestrate simple-to-advanced workflows over your data.

This is centered around our `QueryPipeline` abstraction. Load in a variety of modules (from LLMs to prompts to retrievers to other pipelines), connect them all together into a sequential chain or DAG, and run it end2end.
- Link to llama_index QueryPipelines: https://docs.llamaindex.ai/en/stable/examples/pipeline/query_pipeline.html
- Link to llama_index QueryPipelines (async): https://docs.llamaindex.ai/en/stable/examples/pipeline/query_pipeline_async.html

**NOTE**: You can orchestrate all these workflows without the declarative pipeline abstraction (by using the modules imperatively and writing your own functions). So what are the advantages of `QueryPipeline`?
- Express common workflows with fewer lines of code/boilerplate
- Greater readability
- Greater parity / better integration points with common low-code / no-code solutions (e.g. LangFlow)
- [In the future] A declarative interface allows easy serializability of pipeline components, providing portability of pipelines/easier deployment to different systems.

## Cookbook

In this cookbook we give you an introduction to our `QueryPipeline` interface and show you some basic workflows you can tackle.

- Chain together prompt and LLM
- Chain together query rewriting (prompt + LLM) with retrieval
- Chain together a full RAG query pipeline (query rewriting, retrieval, reranking, response synthesis)
- Setting up a custom query component

In [None]:
!pip install --upgrade llama-index==0.9.45.post1 arize-phoenix==2.2.1

## Setup

Here we setup some data + indexes (from PG's essay) that we'll be using in the rest of the cookbook.

In [None]:
# setup Arize Phoenix for logging/observability
import phoenix as px
px.launch_app()
import llama_index
llama_index.set_global_handler("arize_phoenix")

In [None]:
from llama_index.query_pipeline import QueryPipeline
from llama_index.llms import OpenAI
from llama_index.prompts import PromptTemplate
from llama_index import (
    VectorStoreIndex,
    ServiceContext,
    SimpleDirectoryReader,
    load_index_from_storage,
)

In [None]:
reader = SimpleDirectoryReader("../data/paul_graham")

In [None]:
docs = reader.load_data()
print(docs[0].get_content())

In [None]:
import os
from llama_index.storage import StorageContext

if not os.path.exists("storage"):
    index = VectorStoreIndex.from_documents(docs)
    # save index to disk
    index.set_index_id("vector_index")
    index.storage_context.persist("./storage")
else:
    # rebuild storage context
    storage_context = StorageContext.from_defaults(persist_dir="storage")
    # load index
    index = load_index_from_storage(storage_context, index_id="vector_index")

## 1. Chain Together Prompt and LLM

In this section we show a super simple workflow of chaining together a prompt with LLM.

We simply define `chain` on initialization. This is a special case of a query pipeline where the components are purely sequential, and we automatically convert outputs into the right format for the next inputs.

In [None]:
# try chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")

p = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)

In [None]:
output = p.run(movie_name="The Departed")

In [None]:
print(str(output))

In [None]:
# if you implemented this imperatively - you can still do so!
# just make sure you format the prompt and call the right method on the LLM
from llama_index.llms import ChatMessage, MessageRole

# try chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")


# format prompt, pass to LLM
movie_name = "The Departed"
full_prompt_tmpl = prompt_tmpl.format(movie_name=movie_name)
response = llm.chat([ChatMessage(content=full_prompt_tmpl, role=MessageRole.USER)])
print(str(response))

### Try Output Parsing

Let's parse the outputs into a structured Pydantic object.

In [None]:
from typing import List
from pydantic import BaseModel, Field
from llama_index.output_parsers import PydanticOutputParser


class Movie(BaseModel):
    """Object representing a single movie."""

    name: str = Field(..., description="Name of the movie.")
    year: int = Field(..., description="Year of the movie.")


class Movies(BaseModel):
    """Object representing a list of movies."""

    movies: List[Movie] = Field(..., description="List of movies.")


llm = OpenAI(model="gpt-3.5-turbo")
output_parser = PydanticOutputParser(Movies)
json_prompt_str = """\
Please generate related movies to {movie_name}.
"""
json_prompt_str = output_parser.format(json_prompt_str)

In [None]:
print(json_prompt_str)

In [None]:
# add JSON spec to prompt template
json_prompt_tmpl = PromptTemplate(json_prompt_str)

p = QueryPipeline(chain=[json_prompt_tmpl, llm, output_parser], verbose=True)
output = p.run(movie_name="Toy Story")

In [None]:
output

### Streaming Support

The query pipelines have LLM streaming support (simply do `as_query_component(streaming=True)`). Intermediate outputs will get autoconverted, and the final output can be a streaming output. Here's some examples.

**1. Chain multiple Prompts with Streaming**

In [None]:
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
# let's add some subsequent prompts for fun
prompt_str2 = """\
Here's some text:

{text}

Can you rewrite this with a summary of each movie?
"""
prompt_tmpl2 = PromptTemplate(prompt_str2)
llm = OpenAI(model="gpt-3.5-turbo")
llm_c = llm.as_query_component(streaming=True)

p = QueryPipeline(
    chain=[prompt_tmpl, llm_c, prompt_tmpl2, llm_c], verbose=True
)
# p = QueryPipeline(chain=[prompt_tmpl, llm_c], verbose=True)

In [None]:
output = p.run(movie_name="The Dark Knight")
for o in output:
    print(o.delta, end="")

**2. Feed streaming output to output parser**

In [None]:
p = QueryPipeline(
    chain=[
        json_prompt_tmpl,
        llm.as_query_component(streaming=True),
        output_parser,
    ],
    verbose=True,
)
output = p.run(movie_name="Toy Story")
print(output)

## Chain Together Query Rewriting Workflow (prompts + LLM) with Retrieval

Here we try a slightly more complex workflow where we send the input through two prompts before initiating retrieval.

1. Generate question about given topic.
2. Hallucinate answer given question, for better retrieval.

Since each prompt only takes in one input, note that the `QueryPipeline` will automatically chain LLM outputs into the prompt and then into the LLM.

You'll see how to define links more explicitly in the next section.

In [None]:
# from llama_index.postprocessor import CohereRerank

# generate question regarding topic
prompt_str1 = "Please generate a concise question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl1 = PromptTemplate(prompt_str1)
# use HyDE to hallucinate answer.
prompt_str2 = (
    "Please write a passage to answer the question\n"
    "Try to include as many key details as possible.\n"
    "\n"
    "\n"
    "{query_str}\n"
    "\n"
    "\n"
    'Passage:"""\n'
)
prompt_tmpl2 = PromptTemplate(prompt_str2)

llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=5)
p = QueryPipeline(
    chain=[prompt_tmpl1, llm, prompt_tmpl2, llm, retriever], verbose=True
)

In [None]:
nodes = p.run(topic="college")
len(nodes)

## Create a Full RAG Pipeline as a DAG

Here we chain together a full RAG pipeline consisting of query rewriting, retrieval, reranking, and response synthesis.

Here we can't use `chain` syntax because certain modules depend on multiple inputs (for instance, response synthesis expects both the retrieved nodes and the original question). Instead we'll construct a DAG explicitly, through `add_modules` and then `add_link`.

### 1. RAG Pipeline with Query Rewriting

We use an LLM to rewrite the query first before passing it to our downstream modules - retrieval/reranking/synthesis.

In [None]:
from llama_index.postprocessor import CohereRerank
from llama_index.response_synthesizers import TreeSummarize
from llama_index import ServiceContext

# define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = CohereRerank()
## NOTE: we are deprecating ServiceContext soon in v0.10 and letting you pass in `llm` directly.
summarizer = TreeSummarize(
    service_context=ServiceContext.from_defaults(llm=llm)
)

In [None]:
# define query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "llm": llm,
        "prompt_tmpl": prompt_tmpl,
        "retriever": retriever,
        "summarizer": summarizer,
        "reranker": reranker,
    }
)

Next we draw links between modules with `add_link`. `add_link` takes in the source/destination module ids, and optionally the `source_key` and `dest_key`. Specify the `source_key` or `dest_key` if there are multiple outputs/inputs respectively.

You can view the set of input/output keys for each module through `module.as_query_component().input_keys` and `module.as_query_component().output_keys`.

Here we explicitly specify `dest_key` for the `reranker` and `summarizer` modules because they take in two inputs (query_str and nodes).

In [None]:
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")

# look at summarizer input keys
print(summarizer.as_query_component().input_keys)

We use `networkx` to store the graph representation. This gives us an easy way to view the DAG!

In [None]:
## create graph
from pyvis.network import Network

net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(p.dag)
net.show("rag_dag.html")

In [None]:
response = p.run(topic="YC")

In [None]:
print(str(response))

In [None]:
# you can do async too
response = await p.arun(topic="YC")
print(str(response))

### 2. RAG Pipeline without Query Rewriting

Here we setup a RAG pipeline without the query rewriting step.

Here we need a way to link the input query to both the retriever, reranker, and summarizer. We can do this by defining a special `InputComponent`, allowing us to link the inputs to multiple downstream modules.

In [None]:
from llama_index.postprocessor import CohereRerank
from llama_index.response_synthesizers import TreeSummarize
from llama_index import ServiceContext
from llama_index.query_pipeline import InputComponent

retriever = index.as_retriever(similarity_top_k=5)
summarizer = TreeSummarize(
    service_context=ServiceContext.from_defaults(
        llm=OpenAI(model="gpt-3.5-turbo")
    )
)
reranker = CohereRerank()

In [None]:
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "retriever": retriever,
        "summarizer": summarizer,
    }
)
p.add_link("input", "retriever")
p.add_link("input", "summarizer", dest_key="query_str")
p.add_link("retriever", "summarizer", dest_key="nodes")

In [None]:
output = p.run(input="what did the author do in YC")

In [None]:
print(str(output))

## Defining a Custom Component in a Query Pipeline

You can easily define a custom component. Simply subclass a `QueryComponent`, implement validation/run functions + some helpers, and plug it in.

Let's wrap the related movie generation prompt+LLM chain from the first example into a custom component.

In [None]:
from llama_index.query_pipeline import (
    CustomQueryComponent,
    InputKeys,
    OutputKeys,
)
from typing import Dict, Any
from llama_index.llms.llm import BaseLLM
from pydantic import Field


class RelatedMovieComponent(CustomQueryComponent):
    """Related movie component."""

    llm: BaseLLM = Field(..., description="OpenAI LLM")

    def _validate_component_inputs(
        self, input: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate component inputs during run_component."""
        # NOTE: this is OPTIONAL but we show you here how to do validation as an example
        return input

    @property
    def _input_keys(self) -> set:
        """Input keys dict."""
        # NOTE: These are required inputs. If you have optional inputs please override
        # `optional_input_keys_dict`
        return {"movie"}

    @property
    def _output_keys(self) -> set:
        return {"output"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component."""
        # use QueryPipeline itself here for convenience
        prompt_str = "Please generate related movies to {movie_name}"
        prompt_tmpl = PromptTemplate(prompt_str)
        p = QueryPipeline(chain=[prompt_tmpl, llm])
        return {"output": p.run(movie_name=kwargs["movie"])}


# from llama_index.query_pipeline import FunctionComponent

# def foo(x: str) -> str:
#     return x + ":hello"

# component = FunctionComponent(fn=foo)

Let's try the custom component out! We'll also add a step to convert the output to Shakespeare.

In [None]:
llm = OpenAI(model="gpt-3.5-turbo")
component = RelatedMovieComponent(llm=llm)

# let's add some subsequent prompts for fun
prompt_str = """\
Here's some text:

{text}

Can you rewrite this in the voice of Shakespeare?
"""
prompt_tmpl = PromptTemplate(prompt_str)

p = QueryPipeline(chain=[component, prompt_tmpl, llm], verbose=True)

In [None]:
output = p.run(movie="Love Actually")

In [None]:
print(str(output))

## Async / Parallel Execution

Here we showcase our query pipeline with async + parallel execution.

We do this by setting up a RAG pipeline that does the following:
1. Send query to multiple RAG query engines.
2. Combine results.

In the process we'll also show some nice abstractions for joining results (e.g. our `ArgPackComponent()`)

### Define Multiple Query Engines (One per Chunk Size)

In [None]:
from llama_index.query_pipeline import (
    QueryPipeline,
    InputComponent,
    ArgPackComponent,
)
from typing import Dict, Any, List, Optional
from llama_index.llama_pack.base import BaseLlamaPack
from llama_index.llms.llm import LLM
from llama_index.llms.openai import OpenAI
from llama_index import Document, VectorStoreIndex, ServiceContext
from llama_index.response_synthesizers import TreeSummarize
from llama_index.schema import NodeWithScore, TextNode
from llama_index.node_parser import SentenceSplitter


llm = OpenAI(model="gpt-3.5-turbo")
chunk_sizes = [128, 256, 512, 1024]
query_engines = {}
for chunk_size in chunk_sizes:
    splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=0)
    nodes = splitter.get_nodes_from_documents(docs)
    vector_index = VectorStoreIndex(nodes)
    query_engines[str(chunk_size)] = vector_index.as_query_engine()

### Construct a Query Pipeline

In [None]:
# construct query pipeline
p = QueryPipeline(verbose=True)
module_dict = {
    **query_engines,
    "input": InputComponent(),
    "summarizer": TreeSummarize(),
    "join": ArgPackComponent(
        convert_fn=lambda x: NodeWithScore(node=TextNode(text=str(x)))
    ),
}
p.add_modules(module_dict)
# add links from input to query engine (id'ed by chunk_size)
for chunk_size in chunk_sizes:
    p.add_link("input", str(chunk_size))
    p.add_link(str(chunk_size), "join", dest_key=str(chunk_size))
p.add_link("join", "summarizer", dest_key="nodes")
p.add_link("input", "summarizer", dest_key="query_str")

#### Visualize

In [None]:
## create graph
from pyvis.network import Network

net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(p.dag)
net.show("rag_dag.html")

### Run Pipeline

In [None]:
import time

start_time = time.time()
response = await p.arun(input="What did the author do during his time in YC?")
print(str(response))
end_time = time.time()
print(f"Time taken: {end_time - start_time}")

In [None]:
# compare with sync method

start_time = time.time()
response = p.run(input="What did the author do during his time in YC?")
print(str(response))
end_time = time.time()
print(f"Time taken: {end_time - start_time}")