# RAG with HuggingFace and Milvus - Student Notebook

In this assignment, you will implement a complete RAG (Retrieval-Augmented Generation) pipeline using:
- **Dataset**: HuggingFace Documentation (`m-ric/huggingface_doc`)
- **Vector Store**: Milvus
- **Embeddings**: BGE-small-en-v1.5
- **LLM**: Microsoft Phi-3-mini-4k-instruct (`microsoft/Phi-3-mini-4k-instruct`) or Qwen2-1.5B-Instruct (`Qwen/Qwen2-1.5B-Instruct`)
- **Evaluation**: Opik (AnswerRelevance, Hallucination)

## Instructions
1. Read through each section carefully
2. Complete the code in cells marked with `# TODO`
3. Run all cells in order
4. Verify your implementation with the evaluation cells


---

Milvus repository: https://github.com/milvus-io/milvus

## 1. Setup

Install required dependencies and configure environment.

In [None]:
# Install dependencies
!pip install -q pymilvus sentence-transformers datasets transformers torch accelerate opik tqdm

In [None]:
import os
import json
from typing import List, Dict, Tuple
from tqdm import tqdm

# Set your HuggingFace token for model access
# You can get one at: https://huggingface.co/settings/tokens
os.environ["HF_TOKEN"] = "hf_..."  # Replace with your token

# Opik configuration (optional - for generation evaluation)
# Get your API key at: https://www.comet.com/
os.environ["OPIK_API_KEY"] = ""  # Replace with your Opik API key if available

print("Environment configured!")

## 2. Data Loading

Load the HuggingFace documentation dataset.

In [None]:
from datasets import load_dataset

# Load the HuggingFace documentation dataset
dataset = load_dataset("m-ric/huggingface_doc", split="train")

print(f"Dataset loaded with {len(dataset)} documents")
print(f"Columns: {dataset.column_names}")
print(f"\nSample document (first 500 chars):")
print(dataset[0]["text"][:500])

In [None]:
# Extract text and source information
documents = []
for item in dataset:
    documents.append({
        "text": item["text"],
        "source": item["source"]
    })

print(f"Extracted {len(documents)} documents")

# For this assignment, we'll use a subset to keep things manageable
MAX_DOCS = 500
documents = documents[:MAX_DOCS]
print(f"Using {len(documents)} documents for this assignment")

## 3. Chunking

Split documents into smaller chunks for better retrieval.

### Your Task
Implement the `chunk_document` function that:
1. Takes a text string, chunk_size, and chunk_overlap as parameters
2. Splits the text into overlapping chunks of the specified size
3. Returns a list of chunk strings

### Hints
- Use a sliding window approach with step = chunk_size - chunk_overlap
- Handle edge cases: empty text, text shorter than chunk_size
- Make sure each chunk is non-empty before adding it
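
The sliding-window arithmetic in the hints can be checked on offsets alone before touching any text. The sketch below is an illustration, not the required solution: `window_spans` is a hypothetical helper name, and it assumes that windows start every `chunk_size - chunk_overlap` characters and are clipped at the end of the text.

```python
from typing import List, Tuple


def window_spans(text_len: int, chunk_size: int, chunk_overlap: int) -> List[Tuple[int, int]]:
    """Return (start, end) character offsets for each sliding-window chunk."""
    # A non-positive step would loop forever, so reject it up front.
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap
    # Each window starts `step` characters after the previous one and is
    # clipped so that it never runs past the end of the text.
    return [(start, min(start + chunk_size, text_len)) for start in range(0, text_len, step)]


spans = window_spans(2500, chunk_size=1000, chunk_overlap=200)
print(spans)
```

For a 2500-character text this yields four windows. The last one, `(2400, 2500)`, lies entirely inside the previous window, so an implementation may reasonably stop once a window reaches the end of the text and return three chunks instead.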

In [None]:
# ============================================================
# TODO: IMPLEMENT CHUNKING (15 points)
# ============================================================

def chunk_document(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """
    Split a document into overlapping chunks of fixed size.

    Args:
        text: The document text to chunk
        chunk_size: Maximum size of each chunk in characters
        chunk_overlap: Number of overlapping characters between chunks

    Returns:
        List of text chunks
    """
    chunks = []

    # TODO: Implement fixed-size chunking with overlap
    #
    # Step 1: Handle edge cases (empty text, short text)
    # Hint: Return an empty list for empty text; if the text is shorter than
    #       chunk_size, return it as a single chunk

    # Step 2: Calculate step size for sliding window
    # Hint: step = chunk_size - chunk_overlap

    # Step 3: Create chunks using a while loop
    # Hint: Start at position 0, extract chunk_size characters,
    #       then move forward by step size

    # Step 4: Only add non-empty chunks (use .strip() to check)

    # YOUR CODE HERE
    pass

    return chunks


def chunk_all_documents(documents: List[Dict], chunk_size: int = 1000, chunk_overlap: int = 200) -> List[Dict]:
    """
    Chunk all documents and preserve metadata.

    Args:
        documents: List of document dicts with 'text' and 'source' keys
        chunk_size: Maximum chunk size
        chunk_overlap: Overlap between chunks

    Returns:
        List of chunk dicts with 'text', 'source', and 'chunk_id' keys
    """
    all_chunks = []
    chunk_id = 0

    # TODO: Iterate through documents, chunk each one, and add metadata
    #
    # For each document:
    #   1. Get text and source from document dict
    #   2. Call chunk_document() to get chunks
    #   3. For each chunk, create a dict with chunk_id, text, and source
    #   4. Append to all_chunks and increment chunk_id

    # YOUR CODE HERE
    pass

    return all_chunks

In [None]:
# Test your chunking implementation
test_text = "A" * 2500  # 2500 characters
test_chunks = chunk_document(test_text, chunk_size=1000, chunk_overlap=200)

print(f"Test: 2500 char text with chunk_size=1000, overlap=200")
print(f"Expected chunks: ~4")
print(f"Your chunks: {len(test_chunks)}")

if len(test_chunks) >= 3 and len(test_chunks) <= 5:
    print("✅ Chunking test passed!")
else:
    print("❌ Check your chunking implementation")

In [None]:
# Create chunks from all documents
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

chunks = chunk_all_documents(documents, CHUNK_SIZE, CHUNK_OVERLAP)

print(f"\nCreated {len(chunks)} chunks from {len(documents)} documents")
print(f"Average chunks per document: {len(chunks) / len(documents):.2f}")

# Show sample chunk
if chunks:
    print(f"\nSample chunk:")
    print(f"  ID: {chunks[0]['chunk_id']}")
    print(f"  Source: {chunks[0]['source']}")
    print(f"  Text (first 200 chars): {chunks[0]['text'][:200]}...")

## 4. Embeddings

Generate vector embeddings for each chunk using BGE-small-en-v1.5.

### Your Task
Implement the `generate_embeddings` function that:
1. Processes texts in batches for memory efficiency
2. Uses the SentenceTransformer model to generate embeddings
3. Returns embeddings as a list of lists (for Milvus compatibility)

### Hints
- Use `model.encode()` with `normalize_embeddings=True` for cosine similarity
- Process in batches to avoid memory issues
- Convert numpy arrays to lists using `.tolist()`
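
The batching and normalization ideas in the hints can be tried without loading the model. The following is a minimal sketch under stated assumptions: `toy_encode` is a hypothetical stand-in for `model.encode()`, and `l2_normalize` reproduces by hand what `normalize_embeddings=True` is expected to do. It also checks that the inner product of two unit-length vectors equals the cosine similarity of the raw vectors.

```python
import math
from typing import Callable, List


def l2_normalize(vec: List[float]) -> List[float]:
    """Scale a vector to unit length (zero vectors are returned unchanged)."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm > 0 else list(vec)


def embed_in_batches(
    texts: List[str],
    encode_batch: Callable[[List[str]], List[List[float]]],
    batch_size: int = 32,
) -> List[List[float]]:
    """Encode texts batch by batch and collect normalized vectors as plain lists."""
    all_vectors: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        all_vectors.extend(l2_normalize(v) for v in encode_batch(batch))
    return all_vectors


def toy_encode(batch: List[str]) -> List[List[float]]:
    # Hypothetical stand-in for a real model: [length, vowel count, 1.0].
    return [[float(len(t)), float(sum(c in "aeiou" for c in t)), 1.0] for t in batch]


toy_texts = ["Hello world", "This is a test", "RAG is cool"]
toy_vectors = embed_in_batches(toy_texts, toy_encode, batch_size=2)

# Inner product of unit vectors vs. cosine similarity of the raw vectors.
raw_a, raw_b = toy_encode(toy_texts[:2])
inner = sum(a * b for a, b in zip(toy_vectors[0], toy_vectors[1]))
cosine = sum(a * b for a, b in zip(raw_a, raw_b)) / (
    math.sqrt(sum(a * a for a in raw_a)) * math.sqrt(sum(b * b for b in raw_b))
)
print(len(toy_vectors), abs(inner - cosine) < 1e-9)
```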

In [None]:
from sentence_transformers import SentenceTransformer

# Load the embedding model
EMBEDDING_MODEL = "BAAI/bge-small-en-v1.5"  # Or any other Sentence Transformers model of your choice
embedding_model = SentenceTransformer(EMBEDDING_MODEL)

print(f"Loaded embedding model: {EMBEDDING_MODEL}")

# Test embedding
test_embedding = embedding_model.encode(["This is a test"], normalize_embeddings=True)
EMBEDDING_DIM = len(test_embedding[0])
print(f"Embedding dimension: {EMBEDDING_DIM}")

In [None]:
# ============================================================
# TODO: IMPLEMENT EMBEDDING GENERATION (15 points)
# ============================================================

def generate_embeddings(texts: List[str], model: SentenceTransformer, batch_size: int = 32) -> List[List[float]]:
    """
    Generate embeddings for a list of texts.

    Args:
        texts: List of text strings to embed
        model: SentenceTransformer model
        batch_size: Number of texts to process at once

    Returns:
        List of embedding vectors (as lists of floats)
    """
    all_embeddings = []

    # TODO: Generate embeddings in batches
    #
    # Step 1: Loop through texts in batches of size batch_size
    # Hint: Use range(0, len(texts), batch_size) to get batch start indices

    # Step 2: For each batch, call model.encode() with:
    #   - The batch of texts
    #   - normalize_embeddings=True (important for cosine similarity)
    #   - show_progress_bar=False (we use tqdm at the outer level)

    # Step 3: Convert to list format and extend all_embeddings
    # Hint: Use .tolist() to convert numpy array to Python list

    # YOUR CODE HERE
    pass

    return all_embeddings

In [None]:
# Test your embedding generation
test_texts = ["Hello world", "This is a test", "RAG is cool"]
test_embeddings = generate_embeddings(test_texts, embedding_model)

print(f"Generated {len(test_embeddings)} embeddings")
print(f"Embedding dimension: {len(test_embeddings[0]) if test_embeddings else 0}")

if len(test_embeddings) == 3 and len(test_embeddings[0]) == EMBEDDING_DIM:
    print("✅ Embedding generation test passed!")
else:
    print("❌ Check your embedding implementation")

In [None]:
# Generate embeddings for all chunks
chunk_texts = [chunk["text"] for chunk in chunks]
embeddings = generate_embeddings(chunk_texts, embedding_model)

print(f"\nGenerated {len(embeddings)} embeddings")
if embeddings:
    print(f"Embedding dimension: {len(embeddings[0])}")
    print(f"Sample embedding (first 10 values): {embeddings[0][:10]}")

## 5. Vector Store (Milvus)

Store embeddings in Milvus for efficient similarity search.

### Your Task
1. Implement `setup_milvus_collection` to create a new collection
2. Implement `insert_data_to_milvus` to insert chunks and embeddings

### Hints
- Use `client.has_collection()` to check if collection exists
- Use `client.drop_collection()` to remove existing collection
- Use `client.create_collection()` with dimension and metric_type parameters
- Use `client.insert()` to add data
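
Before calling `client.insert()`, it can help to build and inspect the rows in plain Python. The sketch below stops short of talking to Milvus: `build_rows` and `iter_batches` are hypothetical helper names, and the row layout (`id`, `vector`, `text`, `source`) follows the field names this notebook uses for insertion.

```python
from typing import Dict, Iterator, List


def build_rows(chunks: List[Dict], embeddings: List[List[float]]) -> List[Dict]:
    """Pair each chunk with its embedding in the row layout used for insertion."""
    if len(chunks) != len(embeddings):
        raise ValueError("chunks and embeddings must have the same length")
    return [
        {"id": chunk["chunk_id"], "vector": vector, "text": chunk["text"], "source": chunk["source"]}
        for chunk, vector in zip(chunks, embeddings)
    ]


def iter_batches(rows: List[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Tiny made-up data, only to show the shapes involved.
demo_chunks = [{"chunk_id": i, "text": f"chunk {i}", "source": "demo.md"} for i in range(5)]
demo_embeddings = [[0.0, 1.0] for _ in demo_chunks]
demo_rows = build_rows(demo_chunks, demo_embeddings)
batch_sizes = [len(batch) for batch in iter_batches(demo_rows, batch_size=2)]
print(batch_sizes)
```

Each yielded batch is what would be passed as `data=` to a single insert call; the length check catches a chunk list and an embedding list that have drifted out of sync.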

In [None]:
from pymilvus import MilvusClient

# Initialize Milvus client (uses Milvus Lite - stores data locally)
MILVUS_DB_PATH = "./hf_docs_milvus.db"
milvus_client = MilvusClient(uri=MILVUS_DB_PATH)

COLLECTION_NAME = "hf_documentation"

print(f"Milvus client initialized with database: {MILVUS_DB_PATH}")

In [None]:
# ============================================================
# TODO: IMPLEMENT MILVUS COLLECTION SETUP (10 points)
# ============================================================

def setup_milvus_collection(client: MilvusClient, collection_name: str, embedding_dim: int):
    """
    Create a Milvus collection for storing document embeddings.

    Args:
        client: MilvusClient instance
        collection_name: Name of the collection to create
        embedding_dim: Dimension of the embedding vectors
    """
    # TODO: Create a Milvus collection
    #
    # Step 1: Check if collection already exists using client.has_collection()
    # Step 2: If exists, drop it using client.drop_collection()
    # Step 3: Create new collection using client.create_collection() with:
    #   - collection_name: the name parameter
    #   - dimension: embedding_dim parameter
    #   - metric_type: "IP" (inner product; equals cosine similarity for normalized embeddings)
    #   - consistency_level: "Strong"

    # YOUR CODE HERE
    pass

    print(f"Created collection: {collection_name} with dimension {embedding_dim}")

In [None]:
# Setup the collection
setup_milvus_collection(milvus_client, COLLECTION_NAME, EMBEDDING_DIM)

In [None]:
# ============================================================
# TODO: IMPLEMENT DATA INSERTION (10 points)
# ============================================================

def insert_data_to_milvus(
    client: MilvusClient,
    collection_name: str,
    chunks: List[Dict],
    embeddings: List[List[float]],
    batch_size: int = 100
):
    """
    Insert document chunks and embeddings into Milvus.

    Args:
        client: MilvusClient instance
        collection_name: Name of the collection
        chunks: List of chunk dictionaries with text and metadata
        embeddings: List of embedding vectors
        batch_size: Number of records to insert at once

    Returns:
        Total number of inserted records
    """
    total_inserted = 0

    # TODO: Insert data into Milvus
    #
    # Step 1: Prepare data as a list of dictionaries, where each dict has:
    #   - "id": chunk["chunk_id"]
    #   - "vector": the corresponding embedding
    #   - "text": chunk["text"]
    #   - "source": chunk["source"]

    # Step 2: Insert in batches using client.insert()
    # Hint: Loop through data in batches and call:
    #   result = client.insert(collection_name=collection_name, data=batch)
    #   total_inserted += result["insert_count"]

    # YOUR CODE HERE
    pass

    return total_inserted

In [None]:
# Insert data into Milvus
inserted_count = insert_data_to_milvus(milvus_client, COLLECTION_NAME, chunks, embeddings)

print(f"\nInserted {inserted_count} records into Milvus")

if inserted_count == len(chunks):
    print("✅ All chunks inserted successfully!")
else:
    print("❌ Not all chunks were inserted. Check your implementation.")

## 6. Retrieval

Implement semantic search to retrieve relevant documents for a query.

### Your Task
Implement the `retrieve_documents` function that:
1. Generates an embedding for the query
2. Searches Milvus for similar vectors
3. Returns the top-K most relevant documents

### Hints
- Use `embedding_model.encode()` to embed the query
- Use `client.search()` to find similar vectors
- Extract text and source from the search results
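
As a mental model of what the vector search computes, the sketch below ranks a handful of rows by inner product in plain Python. It is a brute-force illustration, not how Milvus searches internally, and `brute_force_search` is a hypothetical name. On a small sample it can serve as a sanity check for the ordering of your Milvus results.

```python
from typing import Dict, List


def brute_force_search(query_vec: List[float], rows: List[Dict], top_k: int = 5) -> List[Dict]:
    """Score every row by inner product with the query and keep the top_k."""
    scored = []
    for row in rows:
        score = sum(q * v for q, v in zip(query_vec, row["vector"]))
        scored.append({"text": row["text"], "source": row["source"], "score": score})
    # Higher inner product means more similar, so sort in descending order.
    scored.sort(key=lambda item: item["score"], reverse=True)
    return scored[:top_k]


# Made-up unit vectors standing in for real embeddings.
search_rows = [
    {"vector": [1.0, 0.0], "text": "about training", "source": "a.md"},
    {"vector": [0.0, 1.0], "text": "about datasets", "source": "b.md"},
    {"vector": [0.6, 0.8], "text": "about both", "source": "c.md"},
]
demo_hits = brute_force_search([1.0, 0.0], search_rows, top_k=2)
for hit in demo_hits:
    print(hit["source"], hit["score"])
```

The returned dictionaries use the same `text`, `source`, and `score` keys that `retrieve_documents` is asked to return.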

In [None]:
# ============================================================
# TODO: IMPLEMENT RETRIEVAL (25 points)
# ============================================================

def retrieve_documents(
    query: str,
    client: MilvusClient,
    collection_name: str,
    embedding_model: SentenceTransformer,
    top_k: int = 5
) -> List[Dict]:
    """
    Retrieve the most relevant documents for a query.

    Args:
        query: The search query
        client: MilvusClient instance
        collection_name: Name of the collection to search
        embedding_model: Model to generate query embedding
        top_k: Number of results to return

    Returns:
        List of dictionaries with 'text', 'source', and 'score' keys
    """
    # TODO: Implement semantic search
    #
    # Step 1: Generate embedding for the query
    # Hint: Use embedding_model.encode([query], normalize_embeddings=True)
    #       Then convert to list: .tolist()[0]

    # Step 2: Search in Milvus using client.search()
    # Required parameters:
    #   - collection_name: collection_name
    #   - data: [query_embedding] (list containing the embedding)
    #   - limit: top_k
    #   - search_params: {"metric_type": "IP", "params": {}}
    #   - output_fields: ["text", "source"]

    # Step 3: Format results as list of dicts
    # Hint: client.search() returns one list of hits per query vector,
    #       so iterate over results[0]
    # Each dict should have:
    #   - "text": result["entity"]["text"]
    #   - "source": result["entity"]["source"]
    #   - "score": result["distance"]

    # YOUR CODE HERE
    retrieved_docs = []

    return retrieved_docs

In [None]:
# Test retrieval
test_query = "How do I fine-tune a transformer model?"

retrieved = retrieve_documents(
    query=test_query,
    client=milvus_client,
    collection_name=COLLECTION_NAME,
    embedding_model=embedding_model,
    top_k=3
)

print(f"Query: {test_query}")
print(f"\nRetrieved {len(retrieved)} documents:")
for i, doc in enumerate(retrieved):
    print(f"\n--- Document {i+1} (Score: {doc.get('score', 'N/A')}) ---")
    print(f"Source: {doc.get('source', 'N/A')}")
    print(f"Text: {doc.get('text', 'N/A')[:300]}...")

if len(retrieved) == 3 and all('text' in d for d in retrieved):
    print("\n✅ Retrieval test passed!")
else:
    print("\n❌ Check your retrieval implementation")

## 7. Generation

Generate answers using Microsoft Phi-3-mini-4k-instruct or a Qwen instruct model.

### Your Task
Implement the `generate_answer` function that:
1. Combines retrieved documents into a context string
2. Formats the prompt using the provided template
3. Generates an answer using the language model
4. Returns a structured result dictionary

### Hints
- Join document texts with newlines to create context
- Use the PROMPT_TEMPLATE.format() to fill in context and question
- Call the generator pipeline with appropriate parameters
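
The prompt-building steps can be exercised without loading a language model. In the sketch below, `DEMO_TEMPLATE` is a shortened, hypothetical stand-in for the notebook's prompt template, and `fake_generator` is a hypothetical stub that only mimics the output shape this notebook expects from the pipeline (a list of dicts with a `generated_text` key).

```python
from typing import Dict, List

# Shortened stand-in for the real template; same two placeholders.
DEMO_TEMPLATE = (
    "<context>\n{context}\n</context>\n\n"
    "<question>\n{question}\n</question>\n\n"
    "Answer:"
)


def build_prompt(query: str, retrieved_docs: List[Dict], template: str = DEMO_TEMPLATE) -> str:
    """Join document texts with blank lines and fill in the template."""
    context = "\n\n".join(doc["text"] for doc in retrieved_docs)
    return template.format(context=context, question=query)


def fake_generator(prompt: str, **kwargs) -> List[Dict[str, str]]:
    # Hypothetical stub: returns a fixed answer in the expected output shape.
    return [{"generated_text": "  A placeholder answer.  "}]


demo_docs = [{"text": "Doc one."}, {"text": "Doc two."}]
demo_prompt = build_prompt("What is RAG?", demo_docs)
demo_outputs = fake_generator(demo_prompt, max_new_tokens=16, return_full_text=False)
demo_answer = demo_outputs[0]["generated_text"].strip()
print(demo_prompt)
print(demo_answer)
```

Printing the prompt before generating is a cheap way to confirm that the context and question landed in the right places. Braces inside the retrieved text are safe with `str.format`, but literal braces in the template itself would need to be doubled.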

### Model Options
- https://huggingface.co/microsoft/Phi-3-mini-4k-instruct
- https://huggingface.co/microsoft/Phi-3.5-mini-instruct
- https://huggingface.co/Qwen/Qwen2-1.5B-Instruct

Feel free to use a proprietary model instead, such as one from OpenAI or Anthropic (Claude).

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import torch

# Load the language model
LLM_MODEL = "microsoft/Phi-3-mini-4k-instruct"  # Or another Hugging Face model, e.g. "Qwen/Qwen2-1.5B-Instruct"

print(f"Loading model: {LLM_MODEL}")
print("This may take a few minutes...")

tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    LLM_MODEL,
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
    device_map="auto",
    trust_remote_code=True
)

# Create text generation pipeline
generator = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer
)

print("Model loaded successfully!")

### Modify the prompt template to suit your model

In [None]:
# Prompt template for RAG (YOU ARE FREE TO MODIFY)
PROMPT_TEMPLATE = """Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
If the context doesn't contain enough information to answer the question, say "I don't have enough information to answer this question."

<context>
{context}
</context>

<question>
{question}
</question>

Answer:"""

In [None]:
# ============================================================
# TODO: IMPLEMENT GENERATION (25 points)
# ============================================================

def generate_answer(
    query: str,
    retrieved_docs: List[Dict],
    generator: pipeline,
    max_new_tokens: int = 256
) -> Dict:
    """
    Generate an answer using retrieved documents as context.

    Args:
        query: The user's question
        retrieved_docs: List of retrieved document dictionaries
        generator: HuggingFace text generation pipeline
        max_new_tokens: Maximum tokens to generate

    Returns:
        Dictionary with 'answer', 'context', 'query', and 'retrieved_docs'
    """
    # TODO: Generate an answer using the RAG pattern
    #
    # Step 1: Combine retrieved documents into context
    # Hint: Join doc["text"] for each doc with "\n\n" separator

    # Step 2: Format the prompt using PROMPT_TEMPLATE
    # Hint: prompt = PROMPT_TEMPLATE.format(context=context, question=query)

    # Step 3: Generate response using the generator pipeline
    # Call generator() with:
    #   - prompt (first argument)
    #   - max_new_tokens=max_new_tokens
    #   - do_sample=True
    #   - temperature=0.7
    #   - top_p=0.9
    #   - return_full_text=False

    # Step 4: Extract the generated text
    # Hint: outputs[0]["generated_text"].strip()

    # Step 5: Return result dictionary

    # YOUR CODE HERE
    context = ""
    answer = ""

    return {
        "query": query,
        "answer": answer,
        "context": context,
        "retrieved_docs": retrieved_docs
    }

In [None]:
# Test generation
test_query = "How do I fine-tune a transformer model?"

# Retrieve relevant documents
retrieved = retrieve_documents(
    query=test_query,
    client=milvus_client,
    collection_name=COLLECTION_NAME,
    embedding_model=embedding_model,
    top_k=3
)

# Generate answer
result = generate_answer(
    query=test_query,
    retrieved_docs=retrieved,
    generator=generator
)

print(f"Question: {result['query']}")
print(f"\nAnswer: {result['answer']}")

if result['answer'] and len(result['answer']) > 10:
    print("\n✅ Generation test passed!")
else:
    print("\n❌ Check your generation implementation")

In [None]:
# Complete RAG pipeline function (DO NOT MODIFY)

def rag_query(
    query: str,
    client: MilvusClient,
    collection_name: str,
    embedding_model: SentenceTransformer,
    generator: pipeline,
    top_k: int = 5,
    max_new_tokens: int = 256
) -> Dict:
    """
    Complete RAG pipeline: retrieve then generate.
    """
    # Retrieve
    retrieved_docs = retrieve_documents(
        query=query,
        client=client,
        collection_name=collection_name,
        embedding_model=embedding_model,
        top_k=top_k
    )

    # Generate
    result = generate_answer(
        query=query,
        retrieved_docs=retrieved_docs,
        generator=generator,
        max_new_tokens=max_new_tokens
    )

    return result

In [None]:
# Test complete pipeline with multiple queries
test_queries = [
    "What is the Trainer class in transformers?",
    "How do I load a dataset from HuggingFace?",
    "What is Gradio used for?"
]

for query in test_queries:
    print(f"\n{'='*60}")
    result = rag_query(
        query=query,
        client=milvus_client,
        collection_name=COLLECTION_NAME,
        embedding_model=embedding_model,
        generator=generator,
        top_k=3
    )
    print(f"Q: {result['query']}")
    print(f"A: {result['answer']}")