# End-to-End Evaluation for RAG Pipeline with Recursive Document Agents

Let's evaluate a RAG pipeline for DevSecOps that was implemented with recursive document agents.

## Set up the query engine

### Install LlamaIndex and set up

In [24]:
!pip install llama_index==0.8.12
!pip install pypdf



In [25]:
from llama_index import (
    VectorStoreIndex,
    ListIndex,
    SimpleDirectoryReader,
    ServiceContext,
    Response
)
from llama_index.evaluation import (
    DatasetGenerator,
    QueryResponseEvaluator,
    ResponseEvaluator
)
from llama_index.retrievers import RecursiveRetriever
from llama_index.query_engine import RetrieverQueryEngine
from llama_index.response_synthesizers import get_response_synthesizer
from llama_index.schema import IndexNode
from llama_index.tools import QueryEngineTool, ToolMetadata
from llama_index.llms import OpenAI
from llama_index.agent import OpenAIAgent
import pandas as pd
import openai
import os

In [26]:
# Read the OpenAI key from the environment so that no secret is stored in the
# notebook source or its saved outputs. Set OPENAI_API_KEY before starting the
# kernel; a missing variable fails fast with a KeyError.
openai.api_key = os.environ["OPENAI_API_KEY"]

# Define the LLM service used when building the indexes and the query engine.
# The model is selected with `model=`, as the other OpenAI(...) call in this
# notebook does.
llm = OpenAI(temperature=0.1, model="gpt-3.5-turbo")
service_context = ServiceContext.from_defaults(llm=llm)

In [4]:
# Mount Google Drive at /content/drive (Colab-specific helper).
# NOTE(review): later cells read from the relative `data/` directory — confirm
# whether this mount is still required.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Load documents

In [27]:
# Titles of the source PDFs; each title is also the file stem under data/.
titles = [
    "DevOps Self-Service Pipeline Architecture and Its 3–2–1 Rule",
    "DevOps Self-Service Centric Terraform Project Structure",
    "DevOps Self-Service Centric Pipeline Security and Guardrails",
]

# Map every title to the result of loading its PDF.
documents = {
    title: SimpleDirectoryReader(input_files=[f"data/{title}.pdf"]).load_data()
    for title in titles
}
print(f"loaded documents with {len(documents)} documents")

loaded documents with 3 documents


### Build document agents

In [28]:
# Map each document title to its own OpenAI agent
agents = {}

for title in titles:
    title_documents = documents[title]

    # index the same document twice: a vector index and a list index
    vector_index = VectorStoreIndex.from_documents(title_documents, service_context=service_context)
    list_index = ListIndex.from_documents(title_documents, service_context=service_context)

    # expose each index's query engine as a named tool
    vector_tool = QueryEngineTool(
        query_engine=vector_index.as_query_engine(),
        metadata=ToolMetadata(
            name="vector_tool",
            description=f"Useful for retrieving specific context related to {title}",
        ),
    )
    summary_tool = QueryEngineTool(
        query_engine=list_index.as_query_engine(),
        metadata=ToolMetadata(
            name="summary_tool",
            description=f"Useful for summarization questions related to {title}",
        ),
    )

    # build the agent over both tools and register it under the document title
    agents[title] = OpenAIAgent.from_tools(
        [vector_tool, summary_tool],
        llm=OpenAI(model="gpt-3.5-turbo-0613"),
        verbose=False,
    )


### Build index nodes, indexes, and query engine

In [29]:
# One index node per document; its index_id is the title, which is also the
# key of the matching agent in `agents`.
nodes = [
    IndexNode(
        text=(
            f"This content contains details about {title}. "
            f"Use this index if you need to lookup specific facts about {title}.\n"
            "Do not use this index if you want to query multiple documents."
        ),
        index_id=title,
    )
    for title in titles
]

# Top-level retriever: return only the single best-matching index node
vector_index = VectorStoreIndex(nodes)
vector_retriever = vector_index.as_retriever(similarity_top_k=1)

# Recursive retriever rooted at "vector"; the `agents` dict is passed as
# `query_engine_dict` since every agent can be used as a query engine
retriever_dict = {"vector": vector_retriever}
recursive_retriever = RecursiveRetriever(
    "vector",
    retriever_dict=retriever_dict,
    query_engine_dict=agents,
    verbose=False,
)

# Final query engine: recursive retrieval followed by "compact" response synthesis
response_synthesizer = get_response_synthesizer(response_mode="compact")
query_engine = RetrieverQueryEngine.from_args(
    recursive_retriever,
    response_synthesizer=response_synthesizer,
    service_context=service_context,
)


## End-to-End Evaluation

### Generate the dataset

In [30]:
import random
random.seed(42)
from llama_index.prompts import Prompt

document_list = SimpleDirectoryReader("data").load_data()

# Select the evaluation model with `model=`, as the other OpenAI(...) call in
# this notebook does, so that this context really runs on GPT-4.
gpt4_service_context = ServiceContext.from_defaults(llm=OpenAI(temperature=0.1, model="gpt-4"))

# upper bound on how many generated questions are kept for the evaluation
max_questions = 30

question_dataset = []
if os.path.exists("question_dataset.txt"):
    # reuse the cached questions; blank lines would otherwise become empty queries
    with open("question_dataset.txt", "r") as f:
        question_dataset = [line.strip() for line in f if line.strip()]
else:
    # generate questions
    data_generator = DatasetGenerator.from_documents(
        document_list,
        text_question_template=Prompt(
            "A sample from the documents is below.\n"
            "---------------------\n"
            "{context_str}\n"
            "---------------------\n"
            "Using the documentation sample, carefully follow the instructions below:\n"
            "{query_str}"
        ),
        question_gen_query=(
            "You are an evaluator for a search pipeline. Your task is to write a list of summarization "
            "questions or question/answer questions using the provided documents. Restrict the questions to the "
            "context information provided.\n"
            "Question: "
        ),
        # questions are generated with the GPT-4 service context
        service_context=gpt4_service_context
    )
    generated_questions = data_generator.generate_questions_from_nodes()
    print(f"Generated {len(generated_questions)} questions.")

    # randomly pick up to `max_questions` questions; random.sample raises
    # ValueError when asked for more items than are available
    sample_size = min(max_questions, len(generated_questions))
    question_dataset.extend(random.sample(generated_questions, sample_size))

    print(f"Randomly picked {len(question_dataset)} questions.")

    # save the questions so later runs reuse the same dataset
    with open("question_dataset.txt", "w") as f:
        for question in question_dataset:
            f.write(f"{question.strip()}\n")

Generated 187 questions.
Randomly picked 30 questions.


### Print the questions

In [31]:
# List the evaluation questions, numbered from 1.
for number, text in enumerate(question_dataset, 1):
    print(f"{number}. {text}")

1. What is the high-level design of DevOps pipelines?
2. What is a recently introduced feature in Infracost Cloud?
3. What is the purpose of Infracost in cloud cost management?
4. Why is it important to include TruffleHog in your pipelines?
5. How can you fix the vulnerability in the base image according to the provided instructions?
6. What is the purpose of the aquasecurity/trivy-action in the GitHub Actions CI workflow?
7. What are the optional parameters that can be used with the Checkov action?
8. How can Infracost be integrated into the infrastructure pipeline?
9. How are application pipelines triggered?
10. What is the topic of the second part in the series?
11. What command is used to generate the Infracost report in HTML format?
12. How does Terraform enable the creation of reusable infrastructure?
13. How can the GitHub Actions workflow be configured to dynamically select the backend configuration file based on the environment?
14. What is the diff feature in Infracost and ho

In [32]:
# helper that renders one evaluation as a table in the notebook
def display_eval_df(query: str, response: Response, eval_result: str) -> None:
    """Display a single-row table with the query, response, sources and evaluation.

    Args:
        query: the question that was asked.
        response: the engine's response; shown via ``str(response)`` together
            with ``response.get_formatted_sources(500)`` followed by "...".
        eval_result: the evaluator's verdict for this response.
    """
    row = {
        "Query": query,
        "Response": str(response),
        "Source": response.get_formatted_sources(500) + "...",
        "Evaluation Result": eval_result,
    }
    # CSS applied to the long text columns only
    wrap_css = {
        "inline-size": "600px",
        "overflow-wrap": "break-word",
    }
    styled = pd.DataFrame(row, index=[0]).style.set_properties(
        subset=["Response", "Source"],
        **wrap_css,
    )
    display(styled)

### Evaluating Response, test with one question first

In [45]:
# Try the evaluation on the first question before running the full set.
pd.set_option("display.max_colwidth", 0)

evaluator = ResponseEvaluator(service_context=gpt4_service_context)
sample_question = question_dataset[0]
response_vector = query_engine.query(sample_question)
eval_result = evaluator.evaluate(response_vector)
display_eval_df(query=sample_question, response=response_vector, eval_result=eval_result)

Unnamed: 0,Query,Response,Source,Evaluation Result
0,What is the high-level design of DevOps pipelines?,"The high-level design of DevOps pipelines typically involves several stages, including source code management, build and compilation, automated testing, artifact repository, deployment, configuration management, continuous monitoring, feedback loop, and continuous improvement. This design aims to automate and streamline the software delivery process, enabling faster and more reliable releases.","> Source (Doc id: f5889f12-1ea8-418b-a068-ca731dda4f3a): Query: What is the high-level design of DevOps pipelines? Response: The high-level design of DevOps pipelines typically follows a continuous integration and continuous delivery (CI/CD) approach. Here is a general outline of the high-level design: 1. Source Code Management: Developers commit their code changes to a version control system (e.g., Git) to track and manage code changes. 2. Build and Compilation: The pipeline triggers a build process to compile the source code and generate execut......",YES


### Evaluating Response for Hallucination

In [34]:
import time
import asyncio
import nest_asyncio
nest_asyncio.apply()

def evaluate_query_engine(evaluator, query_engine, questions, batch_size=5):
    """Query the engine for every question and score each response.

    Questions are sent concurrently in batches of ``batch_size``. After each
    batch, every response is passed to ``evaluator.evaluate(response)`` and
    counts as correct when the result contains "YES".

    Args:
        evaluator: object exposing ``evaluate(response)``.
        query_engine: object exposing an awaitable ``aquery(question)``.
        questions: sequence of questions to ask.
        batch_size: number of queries issued concurrently per batch.

    Returns:
        A ``(total_correct, all_results)`` tuple: the number of passing
        responses and a list of 0/1 scores in the same order as ``questions``.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    async def run_query(q):
        try:
            return await query_engine.aquery(q)
        except Exception as exc:
            # Only ordinary errors are turned into a placeholder response;
            # cancellation and KeyboardInterrupt still propagate.
            print(f"query failed for {q!r}: {exc}")
            return Response(response="Error, query failed.")

    async def run_batch(batch_qs):
        # asyncio.run expects a coroutine, so gather is awaited inside one.
        return await asyncio.gather(*(run_query(q) for q in batch_qs))

    # ceiling division, so a final partial batch is counted as well
    num_batches = -(-len(questions) // batch_size)

    total_correct = 0
    all_results = []
    for batch_num, start in enumerate(range(0, len(questions), batch_size), start=1):
        batch_qs = questions[start:start + batch_size]

        responses = asyncio.run(run_batch(batch_qs))
        print(f"finished batch {batch_num} out of {num_batches}")

        for response in responses:
            eval_result = 1 if "YES" in evaluator.evaluate(response) else 0
            total_correct += eval_result
            all_results.append(eval_result)

        # helps avoid rate limits
        time.sleep(1)

    return total_correct, all_results

In [35]:
# Score every question with the current evaluator and report the pass count.
total_correct, all_results = evaluate_query_engine(
    evaluator=evaluator,
    query_engine=query_engine,
    questions=question_dataset,
)

print(f"Hallucination? Scored {total_correct} out of {len(question_dataset)} questions correctly.")

finished batch 1 out of 6
finished batch 2 out of 6
finished batch 3 out of 6
finished batch 4 out of 6
finished batch 5 out of 6
finished batch 6 out of 6
Hallucination? Scored 30 out of 30 questions correctly.


### Find out the hallucinated questions and investigate why

In [14]:
import numpy as np

# Boolean mask selecting the questions whose evaluation score was 0.
failed_mask = np.array(all_results) == 0
hallucinated_questions = np.array(question_dataset)[failed_mask]
print(hallucinated_questions)

[]


In [None]:
# Re-run every question flagged as hallucinated and show the answer with its
# sources. Nothing is sent to the engine when no question was flagged, so an
# empty query is never issued.
for question in hallucinated_questions:
    response = query_engine.query(str(question))
    print(question)
    print(str(response))
    print("-----------------")
    print(response.get_formatted_sources(length=1000))

### Evaluating Response for Answer Quality

In [36]:
import time
import asyncio
import nest_asyncio
nest_asyncio.apply()
from llama_index import Response

def evaluate_query_engine(evaluator, query_engine, questions, batch_size=5):
    """Query the engine for every question and score each answer against its question.

    Questions are sent concurrently in batches of ``batch_size``. After each
    batch, every (question, response) pair is passed to
    ``evaluator.evaluate(question, response)`` and counts as correct when the
    result contains "YES".

    Args:
        evaluator: object exposing ``evaluate(question, response)``.
        query_engine: object exposing an awaitable ``aquery(question)``.
        questions: sequence of questions to ask.
        batch_size: number of queries issued concurrently per batch.

    Returns:
        A ``(total_correct, all_results)`` tuple: the number of passing
        responses and a list of 0/1 scores in the same order as ``questions``.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    async def run_query(q):
        try:
            return await query_engine.aquery(q)
        except Exception as exc:
            # Only ordinary errors are turned into a placeholder response;
            # cancellation and KeyboardInterrupt still propagate.
            print(f"query failed for {q!r}: {exc}")
            return Response(response="Error, query failed.")

    async def run_batch(batch_qs):
        # asyncio.run expects a coroutine, so gather is awaited inside one.
        return await asyncio.gather(*(run_query(q) for q in batch_qs))

    # ceiling division, so a final partial batch is counted as well
    num_batches = -(-len(questions) // batch_size)

    total_correct = 0
    all_results = []
    for batch_num, start in enumerate(range(0, len(questions), batch_size), start=1):
        batch_qs = questions[start:start + batch_size]

        responses = asyncio.run(run_batch(batch_qs))
        print(f"finished batch {batch_num} out of {num_batches}")

        for question, response in zip(batch_qs, responses):
            eval_result = 1 if "YES" in evaluator.evaluate(question, response) else 0
            total_correct += eval_result
            all_results.append(eval_result)

        # helps avoid rate limits
        time.sleep(1)

    return total_correct, all_results

In [37]:
# Switch to the query/response evaluator and score every question again.
evaluator = QueryResponseEvaluator(service_context=gpt4_service_context)

total_correct, all_results = evaluate_query_engine(
    evaluator=evaluator,
    query_engine=query_engine,
    questions=question_dataset,
)

print(f"Response satisfies the query? Scored {total_correct} out of {len(question_dataset)} questions correctly.")

finished batch 1 out of 6
finished batch 2 out of 6
finished batch 3 out of 6
finished batch 4 out of 6
finished batch 5 out of 6
finished batch 6 out of 6
Response satisfies the query? Scored 30 out of 30 questions correctly.


### Find out unanswered queries and investigate why

In [None]:
import numpy as np

# Boolean mask selecting the questions whose evaluation score was 0.
failed_mask = np.array(all_results) == 0
unanswered_queries = np.array(question_dataset)[failed_mask]
print(unanswered_queries)

[]


In [44]:
# Spot-check one question and show the sources behind the answer.
followup_question = 'What is the topic of the second part in the series?'
response = query_engine.query(followup_question)
print(response)
print("-----------------")
print(response.get_formatted_sources(length=256))

The topic of the second part in the series is "DevOps Self-Service Pipeline Architecture and Its 3-2-1 Rule".
-----------------
> Source (Doc id: 509a158b-728b-4266-b1d6-2454b2b088dc): Query: What is the topic of the second part in the series?
Response: The topic of the second part in the series is "DevOps Self-Service Pipeline Architecture and Its 3-2-1 Rule".
