# Build RAG using Zilliz Pipeline

In this notebook, we show how to use [Zilliz Cloud Pipelines](https://docs.zilliz.com/docs/pipelines) to build a simple [RAG](https://research.ibm.com/blog/retrieval-augmented-generation-RAG) (retrieval-augmented generation) application.

We first create two Zilliz pipelines and then build RAG on top of runs of these pipelines. You can also create pipelines through the [Web UI](https://docs.zilliz.com/docs/create-piplines-gui), but this tutorial uses the [RESTful API](https://docs.zilliz.com/docs/create-piplines-rest), which is easier to integrate into your RAG application.


## Prerequisites
You first need to create a new cluster in Zilliz Cloud. You can then obtain your CLOUD_REGION, CLUSTER_ID, and API_KEY. Refer to this [page](https://docs.zilliz.com/docs/create-cluster) for more details.

In [1]:
import os

CLOUD_REGION = os.getenv('ZILLIZ_CLOUD_REGION')  # your CLOUD_REGION
CLUSTER_ID = os.getenv('ZILLIZ_CLUSTER_ID')  # your CLUSTER_ID
API_KEY = os.getenv('ZILLIZ_API_KEY')  # your API_KEY
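
Because `os.getenv` returns `None` for an unset variable, a missing value would only surface later as a failed request. A minimal sketch of an early check follows. The helper name `missing_env_vars` is hypothetical and is not part of any Zilliz SDK.

```python
import os

def missing_env_vars(names, environ=None):
    """Return the names from `names` that are unset or empty in `environ`."""
    # Default to the real process environment; a dict can be passed for testing.
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]

# The three variables read in the cell above.
required = ["ZILLIZ_CLOUD_REGION", "ZILLIZ_CLUSTER_ID", "ZILLIZ_API_KEY"]
missing = missing_env_vars(required)
if missing:
    print(f"Set these environment variables before continuing: {missing}")
```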

## Create Pipelines
### Create an ingestion pipeline
We build an ingestion pipeline first. [Ingestion pipelines](https://docs.zilliz.com/docs/understanding-pipelines#ingestion-pipelines) can process unstructured data into searchable vector embeddings and store them in Zilliz Cloud Vector Databases.

When creating an ingestion pipeline, you must specify its functions. An ingestion pipeline allows two types of functions.

- The `INDEX_DOC` function splits the input text document into chunks and generates a vector embedding for each chunk. This function maps an input field (doc_url) to four output fields (doc_name, chunk_id, chunk_text, and embedding) as the scalar and vector fields of the auto-generated collection. The field names cannot be changed.
- The `PRESERVE` function stores user-defined input fields as additional scalar fields in the auto-generated collection for storing metadata. You can use PRESERVE functions to preserve the metadata of your documents. A single PRESERVE function preserves one field, and up to five PRESERVE functions can be added to one Ingestion pipeline.

Next, we create one INDEX_DOC function and one PRESERVE function. Together they produce a collection in the cluster named `my_rag_collection`, which contains five fields:
- `doc_name`, `chunk_id`, `chunk_text`, and `embedding` come from the INDEX_DOC function
- `publish_year` comes from the PRESERVE function
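
The function rules above can be sketched as a small payload builder that enforces the five-PRESERVE limit before any request is sent. The helper `build_ingestion_payload` and the `keep_<field>` naming scheme are hypothetical; the dictionary keys mirror the request body used in this notebook, and nothing here calls the Zilliz API.

```python
MAX_PRESERVE_FUNCTIONS = 5  # limit stated in the function description above

def build_ingestion_payload(name, cluster_id, collection_name,
                            preserve_fields, description=""):
    """Build an ingestion-pipeline request body.

    `preserve_fields` maps a metadata field name to its field type,
    for example {"publish_year": "Int16"}.
    """
    if len(preserve_fields) > MAX_PRESERVE_FUNCTIONS:
        raise ValueError(
            f"At most {MAX_PRESERVE_FUNCTIONS} PRESERVE functions are allowed, "
            f"got {len(preserve_fields)}"
        )
    # One INDEX_DOC function that chunks and embeds the document at doc_url.
    functions = [{
        "name": "index_my_doc",
        "action": "INDEX_DOC",
        "inputField": "doc_url",
        "language": "ENGLISH",
    }]
    # One PRESERVE function per metadata field.
    for field, field_type in preserve_fields.items():
        functions.append({
            "name": f"keep_{field}",
            "action": "PRESERVE",
            "inputField": field,
            "outputField": field,
            "fieldType": field_type,
        })
    return {
        "name": name,
        "description": description,
        "type": "INGESTION",
        "functions": functions,
        "clusterId": cluster_id,
        "newCollectionName": collection_name,
    }

payload = build_ingestion_payload(
    "my_ingestion_pipeline", "my-cluster-id", "my_rag_collection",
    {"publish_year": "Int16"},
)
print([fn["action"] for fn in payload["functions"]])
```

Validating on the client side keeps the error close to the mistake, at the cost of duplicating a rule that the service may change.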




In [2]:
import requests

headers = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}

create_pipeline_url = f"https://controller.api.{CLOUD_REGION}.zillizcloud.com/v1/pipelines"

collection_name = 'my_rag_collection'

data = {
    "name": "my_ingestion_pipeline",
    "description": "A pipeline that splits a text file into chunks and generates embeddings. It also stores the publish_year with each chunk.",
    "type": "INGESTION",
    "functions": [
        {
            "name": "index_my_doc",
            "action": "INDEX_DOC",
            "inputField": "doc_url",
            "language": "ENGLISH"
        },
        {
            "name": "keep_doc_info",
            "action": "PRESERVE",
            "inputField": "publish_year",
            "outputField": "publish_year",
            "fieldType": "Int16"
        }
    ],
    "clusterId": f"{CLUSTER_ID}",
    "newCollectionName": f"{collection_name}"
}

response = requests.post(create_pipeline_url, headers=headers, json=data)
print(response.json())
ingestion_pipe_id = response.json()["data"]["pipelineId"]

{'code': 200, 'data': {'pipelineId': 'pipe-62f9acbdbdb3ec832bfe2a', 'name': 'my_ingestion_pipeline', 'type': 'INGESTION', 'description': 'A pipeline that splits a text file into chunks and generates embeddings. It also stores the publish_year with each chunk.', 'status': 'SERVING', 'functions': [{'action': 'INDEX_DOC', 'name': 'index_my_doc', 'inputField': 'doc_url', 'language': 'ENGLISH'}, {'action': 'PRESERVE', 'name': 'keep_doc_info', 'inputField': 'publish_year', 'outputField': 'publish_year', 'fieldType': 'Int16'}], 'clusterId': 'in03-423dca989cc7410', 'newCollectionName': 'my_rag_collection'}}


On success, the response contains a pipeline ID. We store it because we need it to run this pipeline later.
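
Reading `["data"]["pipelineId"]` directly raises an unhelpful `KeyError` when creation fails. The sketch below wraps that lookup with a check. The helper name `extract_pipeline_id` is hypothetical, and treating any `code` other than 200 as a failure is an assumption: this notebook only shows the success shape of the response.

```python
def extract_pipeline_id(body):
    """Return data.pipelineId from a create-pipeline response body.

    Assumes a successful response carries code == 200, as in the output above.
    """
    if body.get("code") != 200:
        raise RuntimeError(f"Pipeline creation failed: {body}")
    return body["data"]["pipelineId"]

# Trimmed copy of the success response printed above.
sample = {"code": 200, "data": {"pipelineId": "pipe-62f9acbdbdb3ec832bfe2a"}}
print(extract_pipeline_id(sample))
```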

### Create a search pipeline
We build a search pipeline next. [Search pipelines](https://docs.zilliz.com/docs/understanding-pipelines#search-pipelines) enable semantic search by converting a query string into a vector embedding and then retrieving the top-K similar vectors along with their corresponding text and other metadata.

A search pipeline allows one type of function, `SEARCH_DOC_CHUNK`. You need to specify the cluster and the name of the collection to search, along with the input field.



In [3]:
data = {
    "name": "my_search_pipeline",
    "description": "A pipeline that receives text and search for semantically similar doc chunks",
    "type": "SEARCH",
    "functions": [
        {
            "name": "search_chunk_text_and_title",
            "action": "SEARCH_DOC_CHUNK",
            "inputField": "query_text",
            "clusterId": f"{CLUSTER_ID}",
            "collectionName": f"{collection_name}"
        }
    ]
}

response = requests.post(create_pipeline_url, headers=headers, json=data)

print(response.json())
search_pipe_id = response.json()["data"]["pipelineId"]

{'code': 200, 'data': {'pipelineId': 'pipe-09cba473985a1b3108e588', 'name': 'my_search_pipeline', 'type': 'SEARCH', 'description': 'A pipeline that receives text and search for semantically similar doc chunks', 'status': 'SERVING', 'functions': [{'action': 'SEARCH_DOC_CHUNK', 'name': 'search_chunk_text_and_title', 'inputField': 'query_text', 'clusterId': 'in03-423dca989cc7410', 'collectionName': 'my_rag_collection'}]}}


As before, the response contains a pipeline ID on success. We store it because we need it to run the search pipeline later.


## Run pipelines
### Run ingestion pipeline
Before running the pipeline, upload your document to [AWS S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) or [Google Cloud Storage (GCS)](https://cloud.google.com/storage/docs/uploads-downloads). Supported file types include .txt, .pdf, .md, .html, .epub, .csv, .doc, .docx, .xls, .xlsx, .ppt, .pptx.
We have stored a short knowledge document about Zilliz concepts in GCS, and we pass its URL to this run.
The material is recent and specialized, so it is unlikely to be part of an LLM's training data.

In the `data` field, provide a value for each input field defined in the ingestion pipeline, here `doc_url` and `publish_year`.

In [4]:
run_pipeline_url = f"https://controller.api.{CLOUD_REGION}.zillizcloud.com/v1/pipelines/{ingestion_pipe_id}/run"

gcs_url = 'https://storage.googleapis.com/...'  # your gcs url

data = {
    "data":
        {
            "doc_url": f"{gcs_url}",
            "publish_year": 2023
        }
}

response = requests.post(run_pipeline_url, headers=headers, json=data)

print(response.json())

{'code': 200, 'data': {'doc_name': 'zilliz_concept.md', 'num_chunks': 4}}


Great, we successfully ingested the document stored on GCS. The document chunks and their vectors are now stored in the corresponding Zilliz collection, where you can inspect them.
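
Since a run must supply a value for every input field declared by the pipeline's functions, it can help to check the `data` dictionary before sending it. This is a minimal sketch under that reading; `missing_input_fields` is a hypothetical helper, and the function list is copied from the ingestion pipeline created earlier.

```python
def missing_input_fields(functions, data):
    """Return the inputField names declared by `functions` that `data` lacks."""
    expected = [fn["inputField"] for fn in functions]
    return [field for field in expected if field not in data]

# Functions of the ingestion pipeline defined earlier in this notebook.
functions = [
    {"name": "index_my_doc", "action": "INDEX_DOC", "inputField": "doc_url"},
    {"name": "keep_doc_info", "action": "PRESERVE", "inputField": "publish_year"},
]

# A run payload that forgets publish_year.
incomplete = {"doc_url": "https://storage.googleapis.com/..."}
print(missing_input_fields(functions, incomplete))
```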

### Run search pipeline
Next, we run the search pipeline. We wrap the run in a function so that it is easy to reuse when we build RAG later.

In [5]:
import pprint

def retrieval_from_pipeline(question, search_pipe_id, top_k=2, verbose=False):
    run_pipeline_url = f"https://controller.api.{CLOUD_REGION}.zillizcloud.com/v1/pipelines/{search_pipe_id}/run"

    data = {
        "data": {
            "query_text": question
        },
        "params": {
            "limit": top_k,
            "offset": 0,
            "outputFields": [
                "chunk_text",
                "chunk_id",
                "doc_name",
                "publish_year"
            ],
        }
    }
    response = requests.post(run_pipeline_url, headers=headers, json=data)
    if verbose:
        pprint.pprint(response.json())
    results = response.json()["data"]["result"]
    retrieval_texts = [result['chunk_text'] for result in results]
    return retrieval_texts


question = 'What are the three types of CUs in Zilliz Cloud cluster?'
retrieval_from_pipeline(question, search_pipe_id, top_k=2, verbose=True)

{'code': 200,
 'data': {'result': [{'chunk_id': 0,
                      'chunk_text': 'Cluster, Collection & Entities\n'
                                    'A Zilliz Cloud cluster is a managed '
                                    'Milvus instance associated with certain '
                                    'computing resources. You can create '
                                    'collections in the cluster and insert '
                                    'entities into them. In comparison to a '
                                    'relational database, a collection in a '
                                    'cluster is similar to a table in the '
                                    'database, and an entity in a collection '
                                    'is similar to a record in the table.\n'
                                    'Cluster\n'
                                    'When creating a cluster on Zilliz Cloud, '
                                    'you must specify th

['Cluster, Collection & Entities\nA Zilliz Cloud cluster is a managed Milvus instance associated with certain computing resources. You can create collections in the cluster and insert entities into them. In comparison to a relational database, a collection in a cluster is similar to a table in the database, and an entity in a collection is similar to a record in the table.\nCluster\nWhen creating a cluster on Zilliz Cloud, you must specify the type of CU associated with the cluster. There are three types of CUs available: performance-optimized, capacity-optimized, and cost-optimized. You can learn how to choose among these types in Select the Right CU.\nAfter determining the CU type, you must also specify its size. Note that the number of collections a cluster can hold varies based on its CU size. A cluster with less than 8 CUs can hold no more than 32 collections, while a cluster with more than 8 CUs can hold as many as 256 collections.\nAll collections in a cluster share the CUs asso

When we ask a question, the search run returns the top-k most relevant knowledge fragments. This retrieval step is the foundation of RAG.
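
The verbose output is long, so a compact view of each hit can make results easier to scan. The sketch below assumes the response shape printed above (`data.result` as a list of dictionaries) and uses only the output fields requested in `retrieval_from_pipeline`. The helper `summarize_results` is hypothetical.

```python
def summarize_results(body, preview_chars=60):
    """Return one (doc_name, chunk_id, preview) tuple per retrieved chunk."""
    rows = []
    for item in body.get("data", {}).get("result", []):
        text = item.get("chunk_text", "")
        # Keep a short single-line preview of the chunk text.
        preview = text[:preview_chars].replace("\n", " ")
        rows.append((item.get("doc_name"), item.get("chunk_id"), preview))
    return rows

# Hand-written sample in the same shape as the search response above.
sample = {
    "code": 200,
    "data": {"result": [{
        "chunk_id": 0,
        "chunk_text": "Cluster, Collection & Entities\nA Zilliz Cloud cluster",
        "doc_name": "zilliz_concept.md",
    }]},
}
for row in summarize_results(sample):
    print(row)
```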

## Build RAG
We have built a function named `retrieval_from_pipeline` that retrieves the knowledge we ingested into the Zilliz vector database.
Below, we build a simple RAG on top of it. The LLM we use is an OpenAI model, so you need to prepare your OpenAI API key.

In [6]:
from openai import OpenAI

# Pass the key at construction time; OpenAI() also reads OPENAI_API_KEY by default.
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))  # your OpenAI API key

class RAG:
    def __init__(self, search_pipe_id):
        self._search_pipe_id = search_pipe_id

    def retrieve(self, query: str) -> list:
        """
        Retrieve relevant text from zilliz pipeline.
        """
        results = retrieval_from_pipeline(query, self._search_pipe_id, top_k=2)
        return results

    def generate_completion(self, query: str, context_str: list) -> str:
        """
        Generate answer from context.
        """
        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            temperature=0,
            messages=
            [
                {"role": "user",
                 "content":
                     f"We have provided context information below. \n"
                     f"---------------------\n"
                     f"{context_str}"
                     f"\n---------------------\n"
                     f"Given this information, please answer the question: {query}"
                 }
            ]
        ).choices[0].message.content
        return completion

    def query(self, query: str) -> str:
        context_str = self.retrieve(query)
        completion = self.generate_completion(query, context_str)
        return completion

rag = RAG(search_pipe_id)
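
`generate_completion` interpolates the list of chunks directly into the f-string, so the model sees Python's list representation, including brackets, quotes, and escaped newlines. One possible variation, offered as a sketch and not as this notebook's method, joins the chunks into plain text first. The helper `build_prompt` is hypothetical and reuses the prompt wording from the class above.

```python
def build_prompt(query, chunks):
    """Assemble the RAG prompt with chunks joined as plain text."""
    # Separate chunks with a blank line instead of relying on list repr.
    context = "\n\n".join(chunks)
    return (
        "We have provided context information below. \n"
        "---------------------\n"
        f"{context}"
        "\n---------------------\n"
        f"Given this information, please answer the question: {query}"
    )

prompt = build_prompt(
    "What are the three types of CUs in Zilliz Cloud cluster?",
    ["first retrieved chunk", "second retrieved chunk"],
)
print(prompt)
```

Whether this changes answer quality depends on the model and the data; it mainly keeps the prompt readable when you log or debug it.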

We have initialized a RAG instance. Now we test it by asking about the content of the document we uploaded.

In [7]:
question = 'What are the three types of CUs in Zilliz Cloud cluster?'
rag.query(question)

'The three types of CUs in Zilliz Cloud cluster are performance-optimized, capacity-optimized, and cost-optimized.'

The ground truth content in the original knowledge text is:
> When creating a cluster on Zilliz Cloud, you must specify the type of CU associated with the cluster. **There are three types of CUs available: performance-optimized, capacity-optimized, and cost-optimized.**

We see that the RAG we built answers this specialized question correctly.