<a href="https://colab.research.google.com/github/milvus-io/bootcamp/blob/master/tutorials/quickstart/build_RAG_with_milvus.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>   <a href="https://github.com/milvus-io/bootcamp/blob/master/tutorials/quickstart/build_RAG_with_milvus.ipynb" target="_blank">
    <img src="https://img.shields.io/badge/View%20on%20GitHub-555555?style=flat&logo=github&logoColor=white" alt="GitHub Repository"/>
</a>

# Build RAG with Milvus

<img src="https://raw.githubusercontent.com/milvus-io/bootcamp/master/tutorials/quickstart/apps/rag_search_with_milvus/pics/rag_demo.png"/>

In this tutorial, we will show you how to build a RAG (Retrieval-Augmented Generation) pipeline with Milvus.

A RAG system combines a retrieval system with a generative model to generate new text based on a given prompt. It first retrieves relevant documents from a corpus using Milvus, and then passes them to a generative model, which produces new text grounded in those documents.


## Preparation
### Dependencies and Environment

In [1]:
! pip install PyPDF2 sentence-transformers pymilvus langchain tqdm PyMuPDF pdfplumber python-docx


> If you are using Google Colab, to enable dependencies just installed, you may need to **restart the runtime** (click on the "Runtime" menu at the top of the screen, and select "Restart session" from the dropdown menu).

In [2]:
import zipfile

zip_filename = "/home/p4ultr4n/Workplace/VulCan/hacktricks.zip"  # Replace with the path to your own archive

# Extract every file in the archive into the current working directory
with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
    zip_ref.extractall(".")

print("\n✅ Extracted successfully!")


✅ Extracted successfully!


We will use OpenAI as the LLM in this example. Prepare an OpenAI [API key](https://platform.openai.com/docs/quickstart) and set it as the `OPENAI_API_KEY` environment variable.

### Prepare the data

We use the FAQ pages from the [Milvus Documentation 2.4.x](https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip) as the private knowledge in our RAG; they make a good data source for a simple RAG pipeline.

Download the zip file and extract the documents to the folder `milvus_docs`.

We load all markdown files from the folder `milvus_docs/en/faq`. For each document, we simply split the content on "# ", which roughly separates the main sections of the markdown file.
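A minimal sketch of that splitting step, assuming the FAQ files have already been extracted to `milvus_docs/en/faq`; the helper names `split_markdown_sections` and `load_markdown_sections` are illustrative, not part of any library:

```python
from glob import glob


def split_markdown_sections(markdown: str) -> list[str]:
    """Split a markdown document on the "# " separator and drop empty pieces."""
    return [part.strip() for part in markdown.split("# ") if part.strip()]


def load_markdown_sections(folder: str) -> list[str]:
    """Read every .md file in `folder` and collect the sections of all files."""
    sections = []
    for path in glob(f"{folder}/*.md"):
        with open(path, encoding="utf-8") as f:
            sections.extend(split_markdown_sections(f.read()))
    return sections


# Example (assumes the archive was extracted as described above):
# text_lines = load_markdown_sections("milvus_docs/en/faq")
```

Because `"## "` also contains `"# "`, subsections are split as well and the preceding piece keeps a trailing `#`; this is part of why the split is only rough.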

### Prepare the Embedding Model

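We prepare the embedding model. This notebook uses the open-source SentenceTransformer model `all-MiniLM-L6-v2`, which produces 384-dimensional embeddings, together with LangChain's `RecursiveCharacterTextSplitter` to split documents into chunks.

If you prefer a hosted model, OpenAI's [text-embedding-3-small](https://platform.openai.com/docs/guides/embeddings) is an alternative; its default output has 1536 dimensions, so the vector dimension of the collection would have to change accordingly.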

In [3]:
from pymilvus import MilvusClient
from sentence_transformers import SentenceTransformer
from langchain.text_splitter import RecursiveCharacterTextSplitter



In [5]:
import os

# Replace with your Zilliz Cloud details, or set them as environment variables
# (MILVUS_URI / MILVUS_TOKEN are just the names used here). Avoid hard-coding real credentials.
URI = os.environ.get("MILVUS_URI", "https://<your-cluster-endpoint>")
TOKEN = os.environ.get("MILVUS_TOKEN", "<your-api-key>")  # e.g., "root:your-secret-password"

# Create a MilvusClient connected to the remote (Zilliz Cloud) endpoint
def create_milvus_client(uri, token):
    client = MilvusClient(
        uri=uri,
        token=token,
        secure=True
    )
    print("Connected to Milvus Online!")
    return client
client = create_milvus_client(URI, TOKEN)

Connected to Milvus Online!


In [6]:
collections = client.list_collections()
collections

[]

In [7]:
model = SentenceTransformer('all-MiniLM-L6-v2')
text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)

Generate a test embedding and print its dimension and first few elements.
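A hedged sketch of that check. It accepts any encoder function, so in this notebook you could pass `model.encode` (the `SentenceTransformer('all-MiniLM-L6-v2')` loaded above), whose vectors should have 384 dimensions to match the `dim=384` embedding field; the helper name `check_embedding` is illustrative:

```python
from typing import Callable, Sequence


def check_embedding(encode: Callable[[str], Sequence[float]],
                    expected_dim: int = 384,
                    sample_text: str = "This is a test") -> list[float]:
    """Embed a sample text, print its dimension and first few values, and verify the size."""
    embedding = [float(x) for x in encode(sample_text)]
    print(len(embedding))
    print(embedding[:10])
    if len(embedding) != expected_dim:
        raise ValueError(
            f"Embedding has {len(embedding)} dimensions, but the collection expects {expected_dim}"
        )
    return embedding


# Usage in this notebook (requires the model loaded above):
# test_embedding = check_embedding(model.encode)
```

Checking the dimension early catches a mismatch between the model and the collection schema before any insert fails.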

## Load data into Milvus

### Create the Collection

> As for the arguments of `MilvusClient`:
> - Setting the `uri` as a local file, e.g. `./milvus.db`, is the most convenient method, as it automatically uses [Milvus Lite](https://milvus.io/docs/milvus_lite.md) to store all data in this file.
> - If you have a large amount of data, you can set up a more performant Milvus server on [docker or kubernetes](https://milvus.io/docs/quickstart.md). In this setup, use the server URI, e.g. `http://localhost:19530`, as your `uri`.
> - If you want to use [Zilliz Cloud](https://zilliz.com/cloud), the fully managed cloud service for Milvus, set the `uri` and `token` to the [Public Endpoint and API key](https://docs.zilliz.com/docs/on-zilliz-cloud-console#free-cluster-details) of your Zilliz Cloud cluster.
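The three options above differ only in the `uri` (and, for Zilliz Cloud, the `token`) passed to `MilvusClient`. A minimal sketch that picks them from environment variables; the names `MILVUS_URI` and `MILVUS_TOKEN` and the helper `milvus_client_kwargs` are hypothetical conventions, not something Milvus reads on its own:

```python
import os


def milvus_client_kwargs(env=None) -> dict:
    """Build MilvusClient keyword arguments from hypothetical MILVUS_URI / MILVUS_TOKEN variables.

    - no MILVUS_URI             -> Milvus Lite, data stored in the local file ./milvus.db
    - MILVUS_URI only           -> self-hosted server, e.g. http://localhost:19530
    - MILVUS_URI + MILVUS_TOKEN -> Zilliz Cloud public endpoint + API key
    """
    env = os.environ if env is None else env
    kwargs = {"uri": env.get("MILVUS_URI", "./milvus.db")}
    token = env.get("MILVUS_TOKEN")
    if token:
        kwargs["token"] = token
    return kwargs


# Usage (requires pymilvus):
# from pymilvus import MilvusClient
# client = MilvusClient(**milvus_client_kwargs())
```

Keeping the endpoint and key outside the notebook also means the same code runs against Milvus Lite, a local server, or Zilliz Cloud without edits.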

Check if the collection already exists and drop it if it does.
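With `MilvusClient` this check is typically `has_collection` followed by `drop_collection`. A minimal sketch, where the helper name `reset_collection` is illustrative; note that `create_rag_collection` in this notebook keeps an existing collection instead, so dropping is only appropriate when you want to re-ingest from scratch:

```python
def reset_collection(client, collection_name: str) -> bool:
    """Drop `collection_name` if it exists and report whether anything was dropped.

    `client` is expected to be a pymilvus MilvusClient (or any object exposing
    has_collection / drop_collection with the same call shape).
    """
    if client.has_collection(collection_name):
        client.drop_collection(collection_name)
        return True
    return False


# Usage in this notebook:
# reset_collection(client, "vulcan_rag")
```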

## Build RAG

### Retrieve data for a query

Let's specify a frequent question about Milvus.



In [9]:
client.list_collections()

[]

Search for the question in the collection and retrieve the semantic top-3 matches.
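Milvus performs this search server-side through the vector index (IVF_FLAT with the COSINE metric in `create_rag_collection`). To illustrate what "semantic top-3" means, here is a brute-force NumPy sketch that scores every stored vector by cosine similarity and keeps the three best; it is an exact local stand-in, not how Milvus's approximate IVF index works internally:

```python
import numpy as np


def top_k_cosine(query: np.ndarray, vectors: np.ndarray, k: int = 3) -> list[tuple[int, float]]:
    """Return (row index, cosine similarity) for the k rows of `vectors` most similar to `query`."""
    q = query / np.linalg.norm(query)
    m = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    scores = m @ q                    # cosine similarity of each stored vector with the query
    order = np.argsort(-scores)[:k]   # indices of the k highest scores, best first
    return [(int(i), float(scores[i])) for i in order]
```

The scores follow the same convention Milvus uses for COSINE: a higher value means a more similar vector.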

Let's take a look at the search results of the query.


### Use LLM to get a RAG response

Convert the retrieved documents into a string format.
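A minimal sketch, assuming the result shape returned by `query_rag_with_context` in this notebook (a list of dicts with `full_context` and `source_path` keys); the helper name `format_context` and the source-label layout are illustrative:

```python
def format_context(results: list[dict]) -> str:
    """Join retrieved chunks into one context string, labelling each with its source file."""
    blocks = []
    for r in results:
        blocks.append(f"[Source: {r['source_path']}]\n{r['full_context']}")
    return "\n\n".join(blocks)
```

Labelling each block with its source path lets the LLM (and you) trace an answer back to the file it came from.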

Define the system and user prompts for the language model. The user prompt is assembled from the documents retrieved from Milvus.
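One possible way to assemble them, as a sketch; the wording of `SYSTEM_PROMPT`, the `<context>`/`<question>` tags and the helper `build_prompts` are illustrative choices, not requirements of any API:

```python
SYSTEM_PROMPT = (
    "You are an AI assistant. Answer the question using only the information "
    "in the provided context passages. If the context does not contain the "
    "answer, say that you don't know."
)

USER_PROMPT_TEMPLATE = """Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>"""


def build_prompts(context: str, question: str) -> tuple[str, str]:
    """Return (system_prompt, user_prompt) with the retrieved context filled in."""
    return SYSTEM_PROMPT, USER_PROMPT_TEMPLATE.format(context=context, question=question)
```

Because `str.format` does not re-interpret braces inside the substituted values, retrieved text such as HackTricks' `{{#ref}}` markers passes through unchanged.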

In [10]:
import os
import pdfplumber
import docx  # provided by the python-docx package
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from pymilvus import CollectionSchema, FieldSchema, DataType

# Initialize global resources
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    length_function=len,
    separators=["\n\n", "\n", " ", ""]
)
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')  # 384-dim vectors, matching dim=384 below

def create_rag_collection(collection_name="vulcan_rag"):
    """Create the Milvus collection and its vector index if the collection does not exist yet."""
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
        FieldSchema(name="source_path", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="file_type", dtype=DataType.VARCHAR, max_length=10),
        FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="chunk_idx", dtype=DataType.INT64),
        FieldSchema(name="total_chunks", dtype=DataType.INT64)
    ]
    schema = CollectionSchema(
        fields,
        description="Unified RAG collection for multi-format documents"
    )

    # Create collection if not exists
    if collection_name not in client.list_collections():
        client.create_collection(collection_name=collection_name, schema=schema)
        print(f"✅ Created collection: {collection_name}")

        # Build the vector index right after creating the collection, before any data is inserted.
        # prepare_index_params returns an IndexParams object describing the index on "embedding".
        index_params = client.prepare_index_params(
            field_name="embedding",
            index_type="IVF_FLAT",
            metric_type="COSINE",
            params={"nlist": 128}  # a small number of IVF clusters suits a small dataset
        )
        client.create_index(
            collection_name=collection_name,
            index_params=index_params
        )
        # Load the collection into memory so that it can be searched
        client.load_collection(collection_name)
        print("🔨 Index created for 'embedding' field")
    else:
        print(f"🔄 Collection {collection_name} already exists")

    return collection_name

def extract_text_from_pdf(pdf_path):
    """PDF extraction (simplified for clarity)"""
    try:
        with pdfplumber.open(pdf_path) as pdf:
            return "\n\n".join(page.extract_text() or "" for page in pdf.pages)
    except Exception as e:
        if "Password required" in str(e):
            raise ValueError("Encrypted PDF! Use password parameter") from e
        raise RuntimeError(f"PDF processing failed: {str(e)}") from e

def extract_text_from_docx(docx_path):
    """DOCX extraction with paragraph structure"""
    try:
        doc = docx.Document(docx_path)
        return "\n\n".join([para.text for para in doc.paragraphs if para.text.strip()])
    except Exception as e:
        raise RuntimeError(f"DOCX processing failed: {str(e)}") from e

def extract_text_from_md(md_path):
    """Markdown extraction"""
    try:
        with open(md_path, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        raise RuntimeError(f"MD processing failed: {str(e)}") from e

In [11]:
def ingest_directory(start_path, collection_name="vulcan_rag"):
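    """
    Walk `start_path`, extract text from PDF, DOCX and Markdown files, split it into
    chunks, embed the chunks and insert them into Milvus.
    The collection and its vector index are created before any data is inserted.
    """
    # 1. Create the collection and its index first
    create_rag_collection(collection_name)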

    # 2. Process files and insert data
    processors = {
        ".pdf": extract_text_from_pdf,
        ".docx": extract_text_from_docx,
        ".md": extract_text_from_md
    }

    print(f"\nStarting ingestion from: {start_path}")
    processed_files = 0
    total_chunks = 0

    for root, _, files in os.walk(start_path):
        for file in files:
            file_path = os.path.join(root, file)
            ext = os.path.splitext(file)[1].lower()

            if ext not in processors:
                continue

            try:
                rel_path = os.path.relpath(file_path, start_path).replace("\\", "/")
                print(f"\nProcessing: {rel_path}")

                # Extract text
                text = processors[ext](file_path)
                if not text.strip():
                    print(f"  ⚠️ Empty content in {rel_path}, skipping")
                    continue

                # Generate chunks
                chunks = text_splitter.split_text(text)
                doc_id = os.path.splitext(os.path.basename(file_path))[0]

                # Generate embeddings
                embeddings = embedding_model.encode(chunks, convert_to_tensor=False)

                # Prepare for Milvus
                data = [{
                    "embedding": emb.tolist(),
                    "text": chunk,
                    "source_path": rel_path,
                    "file_type": ext[1:],
                    "doc_id": doc_id,
                    "chunk_idx": i,
                    "total_chunks": len(chunks)
                } for i, (chunk, emb) in enumerate(zip(chunks, embeddings))]

                # Insert data
                client.insert(collection_name, data)
                print(f"  ✅ Inserted {len(data)} chunks from {rel_path}")

                processed_files += 1
                total_chunks += len(data)

            except Exception as e:
                print(f"  ❌ Failed to process {file_path}: {str(e)}")
                continue

    print(f"\n✅ Ingestion complete!")
    print(f"- Processed files: {processed_files}")
    print(f"- Total chunks: {total_chunks}")
    print(f"- Stored in collection: {collection_name}")

    return collection_name

In [12]:
def query_rag_with_context(
    query: str,
    collection_name: str = "vulcan_rag",
    filter_str: str = "",
    limit: int = 3,
    context_window: int = 2
) -> list:
    """
    Context-aware RAG query function.
    """
    # 1. Encode the query
    try:
        # Encode with the same model that was used at ingestion time
        query_embedding = embedding_model.encode([query], convert_to_tensor=False).tolist()
    except NameError:
        raise RuntimeError(
            "Embedding model not initialized. Please run:\n"
            "from sentence_transformers import SentenceTransformer\n"
            "embedding_model = SentenceTransformer('all-MiniLM-L6-v2')"
        )

    # 2. Perform the search; if it fails, load the collection and retry once
    search_kwargs = dict(
        collection_name=collection_name,
        data=query_embedding,
        anns_field="embedding",   # name of the vector field to search
        filter=filter_str,        # optional scalar filter expression ("" = no filter)
        limit=limit,
        output_fields=["text", "source_path", "file_type", "chunk_idx", "doc_id"],
        search_params={"metric_type": "COSINE", "params": {"nprobe": 16}}
    )
    try:
        results = client.search(**search_kwargs)
    except Exception as e:
        # A common cause is a collection that has not been loaded into memory yet
        try:
            client.load_collection(collection_name)
            results = client.search(**search_kwargs)
        except Exception as load_error:
            raise RuntimeError(
                f"Search failed: {e}\n"
                f"Collection loading failed: {load_error}\n"
                "Possible causes:\n"
                f"1. Collection '{collection_name}' doesn't exist\n"
                "2. Milvus server disconnected\n"
                f"3. Collection not loaded (run client.load_collection('{collection_name}'))"
            ) from e

    # 3. Process results and fetch neighboring chunks for context
    full_results = []
    # Search returns one list of hits per query vector, so iterate over the outer list first
    for hits in results:
        for hit in hits:
            try:
                chunk_idx = hit.entity.get("chunk_idx")
                source_path = hit.entity.get("source_path")
                main_text = hit.entity.get("text")

                # Calculate context range
                start_idx = max(0, chunk_idx - context_window)
                end_idx = chunk_idx + context_window

                # Fetch neighboring chunks of the same file. Filter on source_path, which is
                # unique per file; doc_id is only the bare file name and can repeat across folders.
                expr = f'source_path == "{source_path}" and chunk_idx >= {start_idx} and chunk_idx <= {end_idx}'
                context_chunks = client.query(
                    collection_name=collection_name,
                    filter=expr,
                    output_fields=["chunk_idx", "text", "total_chunks"]
                )

                # Sort chunks by index
                context_chunks.sort(key=lambda x: x['chunk_idx'])

                # Build full context
                full_context = "\n\n".join(chunk['text'] for chunk in context_chunks)
                total = context_chunks[0]['total_chunks'] if context_chunks else '?'

                full_results.append({
                    "main_chunk": main_text,
                    "full_context": full_context,
                    "source_path": source_path,
                    # With the COSINE metric, Milvus returns the similarity itself (higher = more relevant)
                    "relevance_score": hit.distance,
                    "context_chunks": len(context_chunks),
                    "chunk_position": f"{chunk_idx + 1}/{total}"
                })

            except Exception as e:
                print(f"⚠️ Error processing hit: {str(e)}")
                continue

    return full_results

In [14]:
# Ingest your documents first
ingest_directory("/home/p4ultr4n/Workplace/VulCan/data/")

# Now query with context

🔄 Collection vulcan_rag already exists

Starting ingestion from: /home/p4ultr4n/Workplace/VulCan/data/

Processing: hacktricks/src/pentesting-web/xs-search.md
  ✅ Inserted 261 chunks from hacktricks/src/pentesting-web/xs-search.md

Processing: hacktricks/src/pentesting-web/websocket-attacks.md
  ✅ Inserted 39 chunks from hacktricks/src/pentesting-web/websocket-attacks.md

Processing: hacktricks/src/pentesting-web/rsql-injection.md
  ✅ Inserted 68 chunks from hacktricks/src/pentesting-web/rsql-injection.md

Processing: hacktricks/src/pentesting-web/json-xml-yaml-hacking.md
  ✅ Inserted 10 chunks from hacktricks/src/pentesting-web/json-xml-yaml-hacking.md

Processing: hacktricks/src/pentesting-web/dapps-DecentralizedApplications.md
  ✅ Inserted 21 chunks from hacktricks/src/pentesting-web/dapps-DecentralizedApplications.md

Processing: hacktricks/src/pentesting-web/xslt-server-side-injection-extensible-stylesheet-language-transformations.md
  ✅ Inserted 41 chunks from hacktricks/src/pent

'vulcan_rag'

Use OpenAI ChatGPT to generate a response based on the prompts.
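A minimal sketch using the official `openai` Python package (v1+ client interface). The model name `gpt-4o-mini` is an assumption, so substitute any chat model your key can access; the `OpenAI()` client reads `OPENAI_API_KEY` from the environment, and the helper name `generate_answer` is illustrative:

```python
def generate_answer(openai_client, system_prompt: str, user_prompt: str,
                    model: str = "gpt-4o-mini") -> str:
    """Send the system/user prompts to a chat model and return the text of its reply."""
    response = openai_client.chat.completions.create(
        model=model,  # assumed model name; replace with one available to your account
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content


# Usage (requires `pip install openai` and OPENAI_API_KEY to be set):
# from openai import OpenAI
# answer = generate_answer(OpenAI(), system_prompt, user_prompt)
# print(answer)
```

Passing the client in as a parameter keeps the function easy to test and lets you reuse one client across queries.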

In [17]:
results = query_rag_all_collections(
    query="How to do account takeover?",
    context_window=3,  # Get 3 chunks before and after each match
    client=client,
    embedding_model=model
)

# Print results with full context
for i, result in enumerate(results, 1):
    print(f"\n{'='*50}")
    print(result)

🔍 Searching in collection: vulcan_rag

page_content='- Create an account in the third party identity with similar email to the victim using some unicode character (`vićtim@company.com`).
  - The third party provider shouldn't verify the email
  - If the identity provider verifies the email, maybe you can attack the domain part like: `victim@ćompany.com` and register that domain and hope that the identity provider generates the ascii version of the domain while the victim platform normalize the domain name.

- Login via this identity provider in the victim platform who should normalize the unicode character and allow you to access the victim account.

For further details, refer to the document on Unicode Normalization:

{{#ref}}
unicode-injection/unicode-normalization.md
{{#endref}}

## **Reusing Reset Token**

Should the target system allow the **reset link to be reused**, efforts should be made to **find more reset links** using tools such as `gau`, `wayback`, or `scan.io`.

## **Pre 

## Quick Deploy

To learn about how to start an online demo with this tutorial, please refer to [the example application](https://github.com/milvus-io/bootcamp/tree/master/tutorials/quickstart/apps/rag_search_with_milvus).

In [16]:
from typing import List
from langchain.schema import Document  # Or use a simple class if you don't want LangChain

def query_rag_all_collections(
    query: str,
    filter_str: str = "",
    limit: int = 3,
    context_window: int = 2,
    client=None,
    embedding_model=None
) -> List[Document]:
    """
    Search across ALL Milvus collections using RAG with context.
    For each match, retrieves neighboring chunks for richer context.
    Returns a list of LangChain Document-like objects.

    Args:
        query: The user query
        filter_str: Optional metadata filter (e.g., "file_type == 'pdf'")
        limit: Number of nearest neighbors per collection
        context_window: How many neighboring chunks to include before/after
        client: Milvus/Pymilvus client
        embedding_model: SentenceTransformer model for encoding query

    Returns:
        List of Document objects with full context and metadata
    """
    if client is None:
        raise ValueError("Milvus client is required")
    if embedding_model is None:
        raise ValueError("Embedding model is required")

    # 1. Encode the query
    try:
        query_embedding = embedding_model.encode([query], convert_to_tensor=False).tolist()
    except Exception as e:
        raise RuntimeError(f"Failed to encode query: {str(e)}")

    # 2. Get all collections
    try:
        collections = client.list_collections()
    except Exception as e:
        raise RuntimeError(f"Failed to list collections: {str(e)}")

    retrieved_docs = []

    for collection_name in collections:
        print(f"🔍 Searching in collection: {collection_name}")

        try:
            # Perform vector search
            results = client.search(
                collection_name=collection_name,
                data=query_embedding,
                anns_field="embedding",
                filter=filter_str,
                limit=limit,
                output_fields=["text", "source_path", "file_type", "chunk_idx", "doc_id"],
                search_params={"metric_type": "COSINE", "params": {"nprobe": 16}}
            )
        except Exception as e:
            # Try loading the collection first
            try:
                print(f"📦 Loading collection: {collection_name}")
                client.load_collection(collection_name)
                results = client.search(
                    collection_name=collection_name,
                    data=query_embedding,
                    anns_field="embedding",
                    filter=filter_str,
                    limit=limit,
                    output_fields=["text", "source_path", "file_type", "chunk_idx", "doc_id"],
                    search_params={"metric_type": "COSINE", "params": {"nprobe": 16}}
                )
            except Exception as load_error:
                print(f"❌ Failed to search in {collection_name}: {str(load_error)}")
                continue

        # Process each hit in this collection
        for hits in results:
            for hit in hits:
                try:
                    entity = hit.entity
                    chunk_idx = entity.get("chunk_idx")
                    source_path = entity.get("source_path")
                    file_type = entity.get("file_type", "unknown")

                    # Define context range
                    start_idx = max(0, chunk_idx - context_window)
                    end_idx = chunk_idx + context_window

                    # Query neighboring chunks of the same file. Filter on source_path, which is
                    # unique per file; doc_id is only the bare file name and can repeat across folders.
                    expr = f'source_path == "{source_path}" and chunk_idx >= {start_idx} and chunk_idx <= {end_idx}'
                    context_chunks = client.query(
                        collection_name=collection_name,
                        filter=expr,
                        output_fields=["chunk_idx", "text", "total_chunks"]
                    )

                    # Sort by chunk index
                    context_chunks.sort(key=lambda x: x['chunk_idx'])

                    # Combine all context text
                    full_context = "\n\n".join(chunk['text'] for chunk in context_chunks)
                    total = context_chunks[0]['total_chunks'] if context_chunks else '?'

                    # Create metadata
                    metadata = {
                        "source_collection": collection_name,
                        "source_path": source_path,
                        "file_type": file_type,
                        # With the COSINE metric, Milvus returns the similarity itself (higher = more relevant)
                        "relevance_score": hit.distance,
                        "chunk_position": f"{chunk_idx + 1}/{total}",
                        "total_context_chunks": len(context_chunks)
                    }

                    # Create Document object
                    doc = Document(
                        page_content=full_context,
                        metadata=metadata
                    )
                    retrieved_docs.append(doc)

                except Exception as e:
                    print(f"⚠️ Error processing hit in {collection_name}: {str(e)}")
                    continue

    return retrieved_docs