## Create a `QueryEngine` for retrieval augmented generation

### Set up the environment first

In [1]:
!pip install -q -r requirements.txt

In [2]:
from my_config import MyConfig
my_config = MyConfig()
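`my_config` is a local module that isn't shown in this notebook. The only setting used below is `VLLM_INFERENCE_RUNPOD_ID`, so a hypothetical `my_config.py` could read it from an environment variable. Reading it from the environment is an assumption for illustration, not the author's actual file:

```python
import os
from dataclasses import dataclass, field


def _env(name: str) -> str:
    """Read a required environment variable, failing loudly if it is missing."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


@dataclass(frozen=True)
class MyConfig:
    # Hypothetical: the RunPod pod ID that hosts the vLLM server,
    # read from the environment when MyConfig() is instantiated.
    VLLM_INFERENCE_RUNPOD_ID: str = field(
        default_factory=lambda: _env("VLLM_INFERENCE_RUNPOD_ID")
    )
```

Keeping the pod ID out of the notebook means a restarted pod only requires updating one environment variable.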

### Setting up the persona database
We will use personas from the `dvilasuero/finepersonas-v0.1-tiny` dataset, which contains 5K personas that will be attending the party!
Let's load the dataset and store each persona as a text file in the `data` directory. The cell below is commented out here; run it once to create the files.

In [3]:
# from datasets import load_dataset
# from pathlib import Path

# dataset = load_dataset(path="dvilasuero/finepersonas-v0.1-tiny", split="train")

# Path("data").mkdir(parents=True, exist_ok=True)
# for i, persona in enumerate(dataset):
#     with open(Path("data") / f"persona_{i}.txt", "w") as f:
#         f.write(persona["persona"])
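Since the download only needs to happen once, a re-runnable variant can skip files that already exist. A minimal sketch with a hypothetical helper `write_personas`, which takes any iterable of persona strings (for example the `persona` column of the dataset) and keeps the original `persona_{i}.txt` naming:

```python
from pathlib import Path
from typing import Iterable


def write_personas(personas: Iterable[str], out_dir: str = "data") -> int:
    """Write each persona to out_dir/persona_{i}.txt, skipping existing files.

    Returns the number of files written during this call.
    """
    root = Path(out_dir)
    root.mkdir(parents=True, exist_ok=True)
    written = 0
    for i, text in enumerate(personas):
        path = root / f"persona_{i}.txt"
        if path.exists():
            continue  # already written on a previous run
        path.write_text(text, encoding="utf-8")
        written += 1
    return written
```

Because file names depend only on the position in the dataset, re-running after a partial write fills in just the missing files.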

### Loading and embedding persona documents
We will use the `SimpleDirectoryReader` to load the persona descriptions from the `data` directory. This will return a list of `Document` objects.

In [4]:
from llama_index.core import SimpleDirectoryReader

reader = SimpleDirectoryReader(input_dir="data")
documents = reader.load_data()
len(documents)



5000

Now that we have a list of `Document` objects, we can use the `IngestionPipeline` to create nodes from the documents and prepare them for the `QueryEngine`.

We will use the `SentenceSplitter` to split the documents into smaller chunks and an embedding model to embed the chunks. Instead of running `HuggingFaceEmbedding` locally, we call an embedding model served by vLLM on a RunPod endpoint.
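`SentenceSplitter` prefers to keep sentences whole while packing them into chunks of a target token size (`chunk_size`), with some overlap (`chunk_overlap`) between neighbouring chunks. A simplified stand-in shows the idea. The function `toy_sentence_chunks` is hypothetical: it counts words instead of tokens and splits sentences with a naive regex, so it is not the library's algorithm:

```python
import re
from typing import List


def toy_sentence_chunks(text: str, max_words: int = 50, overlap_words: int = 10) -> List[str]:
    """Pack whole sentences into chunks of at most max_words words.

    Neighbouring chunks share trailing sentences (at most overlap_words words).
    Words stand in for tokens; a sentence longer than max_words becomes its own chunk.
    """
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    chunks: List[str] = []
    current: List[str] = []
    current_len = 0
    for sentence in sentences:
        n = len(sentence.split())
        if current and current_len + n > max_words:
            chunks.append(" ".join(current))
            # Carry trailing sentences over as overlap for the next chunk.
            overlap: List[str] = []
            overlap_len = 0
            for prev in reversed(current):
                m = len(prev.split())
                if overlap_len + m > overlap_words:
                    break
                overlap.insert(0, prev)
                overlap_len += m
            current, current_len = overlap, overlap_len
            # Drop overlap sentences if the new sentence would not fit otherwise.
            while current and current_len + n > max_words:
                current_len -= len(current.pop(0).split())
        current.append(sentence)
        current_len += n
    if current:
        chunks.append(" ".join(current))
    return chunks
```

The overlap means a fact that straddles a chunk boundary still appears intact in at least one chunk, at the cost of embedding some text twice.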

#### Create a Custom Embedding Class
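We subclass `BaseEmbedding` from `llama_index` and override the async `_aget_text_embedding` and `_aget_query_embedding` methods so that embeddings come from our vLLM server on RunPod. The sync methods raise `NotImplementedError`, so the class only works through the async APIs (`pipeline.arun`, `query_engine.aquery`).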

In [14]:
from typing import List

import aiohttp
from llama_index.core.embeddings import BaseEmbedding


class RunPodEmbedding(BaseEmbedding):
    endpoint_url: str  # Pydantic field: the vLLM /v1/embeddings URL

    def __init__(self, endpoint_url: str, **kwargs):
        super().__init__(endpoint_url=endpoint_url, **kwargs)

    async def _aget_text_embedding(self, text: str) -> List[float]:
        # One request per text against the OpenAI-compatible embeddings endpoint
        payload = {"input": text}
        async with aiohttp.ClientSession() as session:
            async with session.post(self.endpoint_url, json=payload) as resp:
                resp.raise_for_status()  # fail on HTTP errors instead of a KeyError below
                result = await resp.json()
                return result["data"][0]["embedding"]

    async def _aget_query_embedding(self, query: str) -> List[float]:
        # Queries are embedded exactly like documents
        return await self._aget_text_embedding(query)

    # Sync variants are required by BaseEmbedding but not supported here
    def _get_text_embedding(self, text: str) -> List[float]:
        raise NotImplementedError("Sync embedding not implemented; use the async APIs.")

    def _get_query_embedding(self, query: str) -> List[float]:
        raise NotImplementedError("Sync embedding not implemented; use the async APIs.")
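`RunPodEmbedding` sends one HTTP request per text. OpenAI-compatible embeddings endpoints, vLLM's `/v1/embeddings` included, also accept a list of strings under `input` and return one item per input, each carrying an `index`. A sketch of two hypothetical helpers for a batched variant; the commented override assumes `BaseEmbedding`'s batch hook `_aget_text_embeddings(texts)`, which by default gathers one `_aget_text_embedding` call per text:

```python
from typing import Any, Dict, List


def build_embedding_payload(texts: List[str]) -> Dict[str, Any]:
    """OpenAI-style embeddings request body carrying a whole batch of texts."""
    return {"input": texts}


def parse_embedding_response(result: Dict[str, Any]) -> List[List[float]]:
    """Return embeddings in input order, using each item's "index" field."""
    items = sorted(result["data"], key=lambda item: item["index"])
    return [item["embedding"] for item in items]


# Hypothetical batched override inside RunPodEmbedding:
# async def _aget_text_embeddings(self, texts: List[str]) -> List[List[float]]:
#     async with aiohttp.ClientSession() as session:
#         async with session.post(self.endpoint_url, json=build_embedding_payload(texts)) as resp:
#             resp.raise_for_status()
#             return parse_embedding_response(await resp.json())
```

Sorting by `index` rather than trusting response order is a cheap safeguard when several inputs share one request.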

In [None]:
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.ingestion import IngestionPipeline

# Instantiate custom class with your RunPod URL
runpod_url = f"https://{my_config.VLLM_INFERENCE_RUNPOD_ID}-8000.proxy.runpod.net/v1/embeddings"
embedding_model = RunPodEmbedding(endpoint_url=runpod_url)

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        embedding_model,
    ]
)

import asyncio

nodes = list()
for batch_number in range(0, len(documents), 200):
    batch = documents[batch_number:batch_number + 200]
    nodes.extend(await pipeline.arun(documents=batch))
    print(f"Processed batch {batch_number // 200 + 1} => {batch_number}:{batch_number + 200}")
    await asyncio.sleep(2)  # pause between batches (rate limiting) without blocking the event loop
len(nodes)

Processed batch 1 of 26
Processed batch 2 of 26
Processed batch 3 of 26
Processed batch 4 of 26
Processed batch 5 of 26
Processed batch 6 of 26
Processed batch 7 of 26
Processed batch 8 of 26
Processed batch 9 of 26
Processed batch 10 of 26
Processed batch 11 of 26
Processed batch 12 of 26
Processed batch 13 of 26
Processed batch 14 of 26
Processed batch 15 of 26


ClientConnectorDNSError: Cannot connect to host v6irazi2v8xomb-8000.proxy.runpod.net:443 ssl:default [nodename nor servname provided, or not known]
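The run above aborted at batch 16 when the RunPod proxy host could not be resolved, so one failure stopped the whole loop. A minimal sketch of a batch loop that retries each batch with exponential backoff; `run_in_batches` is a hypothetical helper, and `process_batch` stands in for `pipeline.arun(documents=batch)`. Retrying only helps with transient failures: if the pod has been stopped, you need a new URL.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, Tuple, Type, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def run_in_batches(
    items: Sequence[T],
    process_batch: Callable[[Sequence[T]], Awaitable[List[R]]],
    batch_size: int = 200,
    max_retries: int = 3,
    base_delay: float = 2.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
) -> List[R]:
    """Process items in batches, retrying a failed batch with exponential backoff."""
    results: List[R] = []
    total = (len(items) + batch_size - 1) // batch_size  # ceiling division
    for number, start in enumerate(range(0, len(items), batch_size), start=1):
        batch = items[start:start + batch_size]
        for attempt in range(max_retries + 1):
            try:
                results.extend(await process_batch(batch))
                break
            except exceptions:
                if attempt == max_retries:
                    raise  # give up: the endpoint is likely down, not just flaky
                await asyncio.sleep(base_delay * 2 ** attempt)  # 2s, 4s, 8s, ...
        print(f"Processed batch {number} of {total}")
    return results
```

In the notebook this could be called as `nodes = await run_in_batches(documents, lambda b: pipeline.arun(documents=b), exceptions=(aiohttp.ClientError,))`; narrowing `exceptions` to network errors keeps genuine bugs from being retried.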

### Storing and indexing documents
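Since we are using an ingestion pipeline, we can attach a vector store directly to it, so running the pipeline also populates the store. In this case, we will use `Chroma` to store our documents. Let's run the pipeline again with the vector store attached. Don't count on the cache to make this fast, though: the `IngestionPipeline` cache lives on the pipeline object (unless you pass one in) and is keyed on the transformations' settings, so this new pipeline, which also points at a different embedding URL, most likely re-embeds everything.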
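Conceptually, a transformation cache hashes the input text together with a description of the transformation and reuses the stored output on a hit. A toy in-memory version (hypothetical `ToyTransformCache`, not LlamaIndex's `IngestionCache` implementation) shows why a fresh cache, or a changed transformation setting such as a new endpoint URL, misses:

```python
import hashlib
from typing import Callable, Dict, List


class ToyTransformCache:
    """In-memory cache keyed on (input texts, transformation description)."""

    def __init__(self) -> None:
        self._store: Dict[str, List[str]] = {}
        self.hits = 0

    @staticmethod
    def key(texts: List[str], transform_desc: str) -> str:
        # Any change to the inputs or to the transformation's settings changes the key.
        raw = "\x00".join(texts) + "\x00" + transform_desc
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def run(
        self,
        texts: List[str],
        transform: Callable[[List[str]], List[str]],
        transform_desc: str,
    ) -> List[str]:
        k = self.key(texts, transform_desc)
        if k in self._store:
            self.hits += 1
            return self._store[k]
        out = transform(texts)
        self._store[k] = out
        return out
```

To get real reuse across pipeline objects in LlamaIndex, the usual approach is to hand the same cache object to each pipeline via its `cache` argument, and keep the transformation settings unchanged.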

In [None]:
import chromadb
from llama_index.vector_stores.chroma import ChromaVectorStore

db = chromadb.PersistentClient(path="./alfred_chroma_db")
chroma_collection = db.get_or_create_collection(name="alfred")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)


# Instantiate custom class with your RunPod URL
runpod_url = "https://h8zdxcaagexdrc-8000.proxy.runpod.net/v1/embeddings"
embedding_model = RunPodEmbedding(endpoint_url=runpod_url)

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        embedding_model
    ],
    vector_store=vector_store,
)

nodes = list()
for start in range(0, len(documents), 200):
    nodes.extend(await pipeline.arun(documents=documents[start:start + 200]))
    print(f"Processed batch {start // 200 + 1} => {start}:{start + 200}")
len(nodes)

10

We can create a `VectorStoreIndex` on top of the vector store by passing the vector store and the embedding model to the `from_vector_store()` method. Use the same embedding model as during ingestion, so that query vectors live in the same space as the stored ones.

In [6]:
from llama_index.core import VectorStoreIndex


# Instantiate custom class with your RunPod URL
runpod_url = "https://h8zdxcaagexdrc-8000.proxy.runpod.net/v1/embeddings"
embedding_model = RunPodEmbedding(endpoint_url=runpod_url)
# embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
index = VectorStoreIndex.from_vector_store(
    vector_store=vector_store, embed_model=embedding_model
)

We don't need to persist the index to disk ourselves: the embeddings already live in the Chroma `PersistentClient` directory (`./alfred_chroma_db`), and `from_vector_store()` rebuilds the index on top of them.
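When this index is queried, the retriever embeds the query with the same model and asks the vector store for the most similar stored vectors (LlamaIndex's `similarity_top_k`, which defaults to 2). Chroma does this with an approximate nearest-neighbour (HNSW) index; a brute-force toy version using cosine similarity, with hypothetical helpers `cosine` and `top_k`, shows the ranking idea:

```python
import math
from typing import List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(
    query: Sequence[float],
    stored: List[Tuple[str, Sequence[float]]],
    k: int = 2,
) -> List[Tuple[str, float]]:
    """Rank stored (id, vector) pairs by cosine similarity to the query vector."""
    scored = [(node_id, cosine(query, vec)) for node_id, vec in stored]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```

Brute force is O(n) per query, which is fine for a few thousand personas; an HNSW index trades a little recall for much faster lookups at larger scale.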

### Querying the index
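Now that we have our index, let's create a `QueryEngine` from it and query the documents using a specific response mode. The query engine also needs an LLM to turn the retrieved personas into an answer, so first we wrap a Qwen model served by vLLM on RunPod in a custom `LLM` class.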

In [57]:
from llama_index.core.llms import ChatMessage
import aiohttp
from typing import List

In [68]:
from typing import Any, Dict, List, Sequence

import aiohttp
from llama_index.core.llms import (
    ChatMessage,
    ChatResponse,
    CompletionResponse,
    LLMMetadata,
)
from llama_index.core.llms.llm import LLM

QWEN_MODEL = "Qwen/Qwen2.5-Coder-7B-Instruct"  # must match the model vLLM is serving


class RunPodQwenLLM(LLM):
    api_url: str  # Pydantic field: the vLLM /v1/chat/completions URL

    def __init__(self, api_url: str, **kwargs: Any):
        super().__init__(api_url=api_url, **kwargs)

    async def _apost(self, messages: List[Dict[str, str]], **params: Any) -> Dict[str, Any]:
        """Send an OpenAI-style chat completion request and return the parsed JSON."""
        payload = {"model": QWEN_MODEL, "messages": messages, **params}
        async with aiohttp.ClientSession() as session:
            async with session.post(self.api_url, json=payload) as resp:
                resp.raise_for_status()  # surface HTTP errors instead of a KeyError later
                return await resp.json()

    # Async chat: LlamaIndex expects a ChatResponse, not a bare ChatMessage or str
    async def achat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        result = await self._apost(
            [{"role": m.role.value, "content": m.content} for m in messages],
            temperature=0.7,
        )
        output_text = result["choices"][0]["message"]["content"]
        return ChatResponse(
            message=ChatMessage(role="assistant", content=output_text),
            raw=result,
        )

    # Async completion: with is_chat_model left at its default (False),
    # LlamaIndex sends formatted prompts here
    async def acomplete(self, prompt: str, formatted: bool = False, **kwargs: Any) -> CompletionResponse:
        result = await self._apost(
            [{"role": "user", "content": prompt}],
            max_tokens=1024,
            temperature=0.7,
            top_p=0.95,
        )
        output_text = result["choices"][0]["message"]["content"]
        return CompletionResponse(text=output_text, raw=result)

    # Sync and streaming methods are abstract on LLM, so they must exist; not supported here
    def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        raise NotImplementedError("Use achat(); sync chat is not implemented.")

    def complete(self, prompt: str, formatted: bool = False, **kwargs: Any) -> CompletionResponse:
        raise NotImplementedError("Use acomplete(); sync completion is not implemented.")

    def stream_chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> Any:
        raise NotImplementedError()

    def stream_complete(self, prompt: str, formatted: bool = False, **kwargs: Any) -> Any:
        raise NotImplementedError()

    async def astream_chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> Any:
        raise NotImplementedError()

    async def astream_complete(self, prompt: str, formatted: bool = False, **kwargs: Any) -> Any:
        raise NotImplementedError()

    @property
    def metadata(self) -> LLMMetadata:
        return LLMMetadata(
            model_name=QWEN_MODEL,
            context_window=2048,  # conservative; raise it to match vLLM's --max-model-len
            num_output=1024,  # keep in sync with max_tokens in acomplete
        )

In [69]:
vllm_api_base = "https://e9q6qi5px4g2md-8000.proxy.runpod.net/v1/chat/completions"
llm = RunPodQwenLLM(api_url=vllm_api_base)
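The notebook builds RunPod proxy URLs by hand in several cells, with a different pod ID each time (the first from `my_config`, the others hardcoded). The pattern visible in those URLs is `https://{pod_id}-{port}.proxy.runpod.net{path}`; a small hypothetical helper, `runpod_proxy_url`, keeps it in one place:

```python
def runpod_proxy_url(pod_id: str, path: str, port: int = 8000) -> str:
    """Build a RunPod HTTP proxy URL following the pattern used in this notebook."""
    if not path.startswith("/"):
        path = "/" + path
    return f"https://{pod_id}-{port}.proxy.runpod.net{path}"
```

For example, `runpod_proxy_url(my_config.VLLM_INFERENCE_RUNPOD_ID, "/v1/embeddings")` reproduces the embedding URL from the first ingestion cell.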

In [70]:
query_engine = index.as_query_engine(
    llm=llm,
    response_mode="tree_summarize",
)
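The `tree_summarize` response mode packs the retrieved chunks into as few prompts as fit the LLM's context window, asks the LLM for an answer per prompt, then repeats on those answers until a single one remains. A toy version shows the recursion; `summarize` is a hypothetical callable standing in for the LLM, and a character budget replaces token counting, so this is not LlamaIndex's code:

```python
from typing import Callable, List


def pack(texts: List[str], budget: int) -> List[str]:
    """Greedily join texts (newline-separated) into groups of at most `budget` characters."""
    groups: List[str] = []
    current = ""
    for text in texts:
        candidate = f"{current}\n{text}" if current else text
        if current and len(candidate) > budget:
            groups.append(current)
            current = text
        else:
            current = candidate
    if current:
        groups.append(current)
    return groups


def tree_summarize(
    query: str,
    chunks: List[str],
    summarize: Callable[[str, str], str],
    budget: int = 200,
    max_depth: int = 10,
) -> str:
    """Answer over packed groups, then recurse on the answers until one is left."""
    groups = pack(chunks, budget)
    answers = [summarize(query, group) for group in groups]
    if len(answers) == 1:
        return answers[0]
    if max_depth <= 1:
        # Safety valve for the toy: stop recursing and answer over everything at once.
        return summarize(query, "\n".join(answers))
    return tree_summarize(query, answers, summarize, budget, max_depth - 1)
```

The recursion only shrinks if each answer is shorter than its input, which is why real implementations reserve output space (`num_output`) when sizing prompts.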

In [71]:
response = await query_engine.aquery(
    "Respond using a persona that describes author and travel experiences?"
)
response

Response(response="**Persona:** Dr. Elena Vassiliou, an accomplished anthropologist specializing in Cypriot culture, history, and society. With over two decades of dedicated research and firsthand experience living in Cyprus, Dr. Vassiliou has become an authority on the intricate tapestry of Cypriot life. Her extensive fieldwork has provided her with invaluable insights into the daily customs, traditions, and social dynamics of the island's diverse communities. As a resident of Nicosia for ten years, she has witnessed firsthand how historical events have shaped contemporary Cypriot society and how traditional practices coexist with modern influences. Through her extensive travels across the Mediterranean, Dr. Vassiliou has also gained a broader perspective on regional cultures, enriching her understanding of Cyprus within a wider context. Her work continues to inspire and educate those interested in exploring the rich heritage and evolving identity of Cyprus.", source_nodes=[NodeWithSc