# 🧪 Hands-On Lab: Chunking and Indexing for Retrieval-Augmented Generation (RAG)

## 🔍 Scenario

You are a data engineer working for a health research organization that maintains a large collection of medical guidelines, research papers, and public health protocols published by organizations such as the CDC and WHO. Researchers, clinicians, and analysts frequently need to query this information using natural language questions, but the source documents are lengthy, inconsistently formatted, and often stored as PDFs or scanned files.

To support this use case, your team is building a Retrieval-Augmented Generation (RAG) system on Databricks. The system must ingest medical documents, clean and extract usable text, divide that text into meaningful chunks, and store the results in a form that supports efficient semantic retrieval. The quality of the answers generated by the language model will depend directly on how well the data is prepared, chunked, and indexed.

In this lab, you will simulate a realistic end-to-end data preparation workflow for RAG. You will observe how design choices—such as chunk size, overlap, and noise removal—affect retrieval relevance and completeness. By working through this scenario, you will see how data preparation decisions influence downstream retrieval quality and model behavior.

---

## 🎯 Objective

By the end of this lab, you will be able to:

1. **Extract and clean text** from public medical PDF documents using appropriate parsing or OCR techniques.
2. **Apply sentence-based chunking with overlap** to preserve semantic context.
3. **Convert chunked text into Delta Lake format** with meaningful metadata.
4. **Generate vector embeddings** for document chunks using a hosted embedding endpoint.
5. **Index embedded chunks** using Databricks Vector Search for semantic retrieval.
6. **Execute similarity-based queries** using natural language prompts.
7. **Evaluate how chunk size and granularity influence retrieval precision and relevance** through post-query analysis.
8. **Compare retrieval results before and after introducing chunk overlap** and document how precision and relevance change.

---

### 📚 Learning Alignment

This lab aligns with the learning objective of Chapter 3:

> **"Implement effective data chunking, filtering, and structuring strategies that enhance retrieval quality in RAG pipelines."**


## ⚙️ Step 1: Install Required Libraries

Before we begin building our RAG pipeline, we need to install several Python libraries that will enable us to work with PDFs, interact with Databricks services, and build vector search capabilities.

### 📦 Libraries We're Installing:

- **`pymupdf`**: A PDF parsing library (imported as `fitz`) that extracts the embedded text layer from digitally created PDFs and handles complex page structures. Scanned pages contain only images, so they need an OCR step before any text can be extracted.
- **`databricks-sdk`**: The official Databricks SDK for Python, which provides programmatic access to Databricks REST APIs, including authentication and workspace management.
- **`databricks-vectorsearch`**: The specialized SDK for Databricks Vector Search, enabling us to create, manage, and query vector search indexes for semantic retrieval.

### 🔧 Why These Libraries Matter:

In a production RAG system, you need reliable tools to:
1. **Extract clean text** from various document formats (PDFs, scanned images, etc.)
2. **Authenticate and interact** with cloud services securely
3. **Build and query vector indexes** efficiently for semantic search

### 📝 Installation Notes:

- The `--quiet` flag suppresses verbose installation output
- The `--upgrade` flag ensures you get the latest compatible versions
- You only need to run this once per notebook session, because `%pip` installs are notebook-scoped
- After installation, we'll restart the Python environment to ensure all packages are properly loaded


In [0]:
# Install required libraries for PDF processing, Databricks SDK, and Vector Search
%pip install --quiet --upgrade pymupdf databricks-sdk databricks-vectorsearch



In [0]:
# Restart Python to load newly installed packages
dbutils.library.restartPython()


## 📄 Step 2: Download Sample Documents to DBFS

Now that we have our libraries installed, we'll download real-world medical documents to work with. These documents represent the type of content that researchers and clinicians need to query in a production RAG system.

### 📚 Documents We're Using:

- **CDC COVID-19 FAQ**: A factsheet from the Centers for Disease Control and Prevention containing frequently asked questions about COVID-19
- **WHO Clinical Guidelines**: Clinical management guidelines from the World Health Organization for COVID-19 treatment

### 🎯 Why These Documents?

These documents are ideal for demonstrating RAG capabilities because they:
1. **Contain dense medical information** that requires precise retrieval
2. **Use domain-specific terminology** that benefits from semantic search
3. **Are publicly available** and represent real-world use cases
4. **Vary in structure and length**, demonstrating the need for robust chunking strategies

### 💾 Storage Location:

We'll save these files to DBFS (Databricks File System) at `/dbfs/FileStore/rag_docs`. DBFS provides:
- **Persistent storage** across cluster restarts
- **Easy access** from both Python and Spark
- **Integration** with Delta Lake and other Databricks services

### 🔍 What to Observe:

Pay attention to how we handle potential download failures and verify successful storage. In production systems, robust error handling is critical.
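One inexpensive check after a download is to confirm that the payload really is a PDF, since a misrouted request can return an HTML error page instead. The sketch below is a minimal illustration, not part of the lab code: `looks_like_pdf` is a hypothetical helper name, the sample payloads are made up, and the check relies only on the fact that PDF files normally begin with the `%PDF-` marker.

```python
def looks_like_pdf(payload: bytes) -> bool:
    """Return True if the bytes start with the PDF file signature."""
    # PDF files normally begin with the ASCII marker "%PDF-" and a version number.
    return payload.startswith(b"%PDF-")


# Illustrative payloads (made up for this example, not real downloads)
pdf_bytes = b"%PDF-1.7\n...binary content..."
html_bytes = b"<!DOCTYPE html><html><body>Not Found</body></html>"

print(looks_like_pdf(pdf_bytes))   # True
print(looks_like_pdf(html_bytes))  # False
```

A check like this could run on `response.content` before the file is written, so that an error page is never stored under a `.pdf` name.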


In [0]:
import os
import requests

# Define DBFS directory for storing downloaded documents
dbfs_path = "/dbfs/FileStore/rag_docs"
os.makedirs(dbfs_path, exist_ok=True)  # Create directory if it doesn't exist

# List of medical documents to download
documents = {
    "cdc_faq.pdf": "https://www.cdc.gov/coronavirus/2019-ncov/downloads/2019-ncov-factsheet.pdf",
    "who_guidelines.pdf": "https://apps.who.int/iris/bitstream/handle/10665/338882/WHO-2019-nCoV-clinical-2021.1-eng.pdf"
}

# Download and save each document to DBFS
for filename, url in documents.items():
    print(f"📥 Downloading {filename}...")
    response = requests.get(url, timeout=60)
    response.raise_for_status()  # Fail fast on HTTP errors instead of saving an error page
    full_path = os.path.join(dbfs_path, filename)
    with open(full_path, "wb") as f:
        f.write(response.content)
    print(f"✅ File saved to {full_path}")

# Verify files are accessible via DBFS
dbutils.fs.ls("dbfs:/FileStore/rag_docs/")





## 🧹 Step 3: Extract and Clean PDF Text

With our documents downloaded, we now need to extract the text content from the PDFs. This is a critical step in the RAG pipeline because the quality of text extraction directly impacts downstream retrieval quality.

### 🔧 Text Extraction Approach:

We're using **PyMuPDF (fitz)** for text extraction because it:
1. **Reads digitally created PDFs directly** - Extracts the embedded text layer; scanned pages contain only images and need OCR first
2. **Preserves text structure** - Maintains paragraph breaks and formatting where possible
3. **Performs efficiently** - Fast processing even for large documents
4. **Supports OCR integration** - Can be extended with OCR libraries like Tesseract for scanned documents

### 🧼 Text Cleaning Strategy:

Raw PDF text often contains noise that can degrade retrieval quality. Our cleaning function:
- **Removes excessive whitespace** - Consolidates multiple spaces and line breaks
- **Strips page headers** - Removes "Page 1", "Page 2" artifacts
- **Normalizes formatting** - Ensures consistent text structure
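To make the effect of these rules concrete, here is a small sketch applied to a made-up string. It is a simplified variant for illustration only: it collapses all whitespace, including line breaks, into single spaces, and the sample text is an assumption rather than content from the lab documents.

```python
import re

# Made-up sample with repeated spaces, blank lines, and a page-number artifact
sample = "Isolation   guidance\n\n\nPage 3\nStay home   if you are sick."

# Drop page-number artifacts first, then collapse every run of whitespace
no_pages = re.sub(r"Page \d+", "", sample, flags=re.IGNORECASE)
cleaned = re.sub(r"\s+", " ", no_pages).strip()

print(cleaned)  # Isolation guidance Stay home if you are sick.
```

Removing the page marker before collapsing whitespace avoids leaving a doubled space or blank line where the marker used to be.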

### 🎯 Why Cleaning Matters:

In RAG systems, noise in the text can:
- **Reduce embedding quality** - Irrelevant tokens dilute semantic meaning
- **Decrease retrieval precision** - Noise can cause false matches
- **Increase storage costs** - Unnecessary tokens consume vector space

### 💡 Production Considerations:

For scanned documents or images, you would extend this with:
- **OCR (Optical Character Recognition)** using libraries like Tesseract or cloud services
- **Image preprocessing** to improve OCR accuracy (deskewing, denoising)
- **Confidence scoring** to filter low-quality OCR results
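Confidence scoring can be as simple as flagging pages whose average word confidence is low. The sketch below assumes the OCR engine reports a confidence between 0 and 100 for each recognized word; the function name `pages_needing_review`, the threshold of 80, and the sample numbers are all hypothetical.

```python
from statistics import mean
from typing import Dict, List


def pages_needing_review(
    page_confidences: Dict[int, List[float]], min_mean_conf: float = 80.0
) -> List[int]:
    """Return page numbers with no recognized words or a low mean confidence."""
    flagged = []
    for page, confs in sorted(page_confidences.items()):
        # A page with no recognized words is also worth a manual look
        if not confs or mean(confs) < min_mean_conf:
            flagged.append(page)
    return flagged


# Made-up per-word confidences keyed by page number
page_confidences = {1: [96, 91, 88, 93], 2: [41, 38, 72, 55], 3: []}

print(pages_needing_review(page_confidences))  # [2, 3]
```

Flagging whole pages for review, rather than silently dropping individual words, keeps the extracted text from losing clinically important terms.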


In [0]:
import os
import fitz  # This is PyMuPDF
import re

# Ensure target directory exists
dbfs_dir = "/dbfs/FileStore/rag_docs"
os.makedirs(dbfs_dir, exist_ok=True)

# Utility: Clean extracted text
def clean_text(text):
    """
    Clean extracted PDF text by removing noise and normalizing formatting.

    Args:
        text (str): Raw text extracted from PDF

    Returns:
        str: Cleaned text ready for chunking
    """
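    # Remove page number artifacts such as "Page 3" first,
    # so the whitespace they leave behind is collapsed below
    text = re.sub(r"\bPage \d+\b", "", text, flags=re.IGNORECASE)
    # Collapse repeated line breaks into one
    text = re.sub(r"\n+", "\n", text)
    # Collapse remaining runs of whitespace into a single space
    text = re.sub(r"\s{2,}", " ", text)
    return text.strip()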

# Utility: Extract text from PDF using PyMuPDF
def extract_pdf_text(pdf_path):
    """
    Extract all text from a PDF file.

    Args:
        pdf_path (str): Path to the PDF file

    Returns:
        str: Cleaned text from all pages
    """
    # Open with a context manager so the file handle is released afterwards
    with fitz.open(pdf_path) as doc:
        full_text = "".join(page.get_text() for page in doc)
    return clean_text(full_text)

# Extract and clean both documents
cdc_pdf_path = "/dbfs/FileStore/rag_docs/cdc_faq.pdf"
who_pdf_path = "/dbfs/FileStore/rag_docs/who_guidelines.pdf"

print("📄 Extracting text from CDC FAQ...")
cdc_text = extract_pdf_text(cdc_pdf_path)
print(f"   Extracted {len(cdc_text)} characters")

print("📄 Extracting text from WHO Guidelines...")
who_text = extract_pdf_text(who_pdf_path)
print(f"   Extracted {len(who_text)} characters")

# Save cleaned text files to DBFS for inspection and reuse
cdc_txt_path = "/dbfs/FileStore/rag_docs/cdc_faq_clean.txt"
who_txt_path = "/dbfs/FileStore/rag_docs/who_guidelines_clean.txt"

with open(cdc_txt_path, "w", encoding="utf-8") as f:
    f.write(cdc_text)

with open(who_txt_path, "w", encoding="utf-8") as f:
    f.write(who_text)

print("\n✅ Text extracted and saved as:")
print(f"   - {cdc_txt_path}")
print(f"   - {who_txt_path}")
print(f"\n💡 Tip: You can inspect these files to verify text quality before chunking")



## ✂️ Step 4: Apply Sentence-Level Chunking with Overlap

Now we arrive at one of the most critical steps in RAG: **chunking**. How we divide our documents into chunks directly impacts retrieval quality and the relevance of answers generated by the LLM.

### 🎯 Chunking Strategy:

We're using **sentence-based chunking with overlap** because:

1. **Preserves semantic coherence** - Sentences are natural semantic units
2. **Maintains context** - Overlap ensures important information isn't lost at chunk boundaries
3. **Optimizes for retrieval** - Chunks are sized to match typical query scope
4. **Balances precision and recall** - Not too small (fragmented) or too large (diluted)

### 📏 Chunk Parameters:

- **Chunk size: 200 words** - Large enough to contain complete thoughts, small enough for focused retrieval
- **Overlap: 50 words** - Ensures context continuity across chunk boundaries

### 🔍 Why Overlap Matters:

Consider this example:
```
Chunk 1 (no overlap): "...patients should isolate for 5 days."
Chunk 2 (no overlap): "After isolation, wear a mask for 5 additional days."
```

Without overlap, a query about the "total isolation period" might retrieve only one of these chunks and miss half of the answer. With overlap, the tail of Chunk 1 is repeated at the start of Chunk 2:
```
Chunk 1: "...patients should isolate for 5 days."
Chunk 2: "...patients should isolate for 5 days. After isolation, wear a mask for 5 additional days."
```

Chunk 2 now contains the complete context on its own.
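The overlap mechanics can be sketched with a fixed word window. This is a simplified illustration, assuming plain word windows rather than the sentence-aware logic used in the lab; `chunk_words` is a hypothetical helper, and the tiny sizes are chosen only so the result is easy to read.

```python
from typing import List


def chunk_words(words: List[str], chunk_size: int, overlap: int) -> List[str]:
    """Split words into windows of chunk_size that share `overlap` words."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap  # How far the window advances each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # The window already reached the end of the text
    return chunks


text = ("Patients should isolate for 5 days. "
        "After isolation, wear a mask for 5 additional days.")
chunks = chunk_words(text.split(), chunk_size=8, overlap=4)

for i, chunk in enumerate(chunks):
    print(i, chunk)
# 0 Patients should isolate for 5 days. After isolation,
# 1 5 days. After isolation, wear a mask for
# 2 wear a mask for 5 additional days.
```

Each chunk starts with the last four words of the previous one, so a fact that straddles a boundary appears intact in at least one chunk.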

### 📊 Metadata We're Capturing:

For each chunk, we store:
- **source**: Which document it came from (traceability)
- **chunk_id**: Sequential identifier (ordering)
- **text**: The actual chunk content (for retrieval)

This metadata enables:
- **Source attribution** in answers
- **Chunk reconstruction** if needed
- **Quality analysis** and debugging

### 💾 Storage Format:

We're saving to **Delta Lake** because it provides:
- **ACID transactions** - Reliable writes
- **Time travel** - Version history
- **Schema enforcement** - Data quality
- **Efficient updates** - For incremental processing

### 📝 Technical Note:

We use **NLTK's sentence tokenizer** for intelligent sentence boundary detection. The code downloads both `punkt_tab` (for newer NLTK versions 3.9+) and `punkt` (for older versions) to ensure compatibility across different environments.


In [0]:
import os
import pandas as pd
import nltk

# Download NLTK tokenizer data (punkt_tab for newer NLTK versions, punkt for older)
try:
    nltk.download("punkt_tab", quiet=True)
except Exception:
    pass  # Older NLTK versions do not provide punkt_tab
nltk.download("punkt", quiet=True)

from nltk.tokenize import sent_tokenize

# Define file paths
dbfs_dir = "/dbfs/FileStore/rag_docs"
cdc_txt_path = os.path.join(dbfs_dir, "cdc_faq_clean.txt")
who_txt_path = os.path.join(dbfs_dir, "who_guidelines_clean.txt")
delta_output_path = "/tmp/rag_chunks"  # Spark will write Delta here

# Load cleaned text
with open(cdc_txt_path, "r", encoding="utf-8") as f:
    cdc_text = f.read()

with open(who_txt_path, "r", encoding="utf-8") as f:
    who_text = f.read()

# Document metadata structure
documents = [
    {"source": "cdc_faq", "text": cdc_text},
    {"source": "who_guidelines", "text": who_text}
]

# Define chunking logic with overlap
def chunk_text(text, chunk_size=200, overlap=50):
    """
    Split text into overlapping chunks based on sentence boundaries.

    Args:
        text (str): Input text to chunk
        chunk_size (int): Target chunk size in words
        overlap (int): Number of words to overlap between chunks

    Returns:
        list: List of text chunks
    """
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0

    for sentence in sentences:
        words = sentence.split()
        sentence_len = len(words)

        # If adding this sentence exceeds chunk size, save current chunk
        # (skip when the chunk is empty, so an over-long first sentence
        # does not produce an empty chunk)
        if current_chunk and current_length + sentence_len > chunk_size:
            chunks.append(" ".join(current_chunk))
            # Keep last 'overlap' words for context continuity
            current_chunk = current_chunk[-overlap:] if overlap else []
            current_length = len(current_chunk)  # current_chunk holds words

        current_chunk.extend(words)
        current_length += sentence_len

    # Add the final chunk if it exists
    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks

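# Apply chunking to all documents
all_chunks = []
for doc in documents:
    chunks = chunk_text(doc["text"], chunk_size=200, overlap=50)
    for chunk in chunks:
        all_chunks.append({
            "source": doc["source"],
            # Running counter keeps chunk_id unique across documents,
            # which is needed when chunk_id serves as the index primary key
            "chunk_id": len(all_chunks),
            "text": chunk
        })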

print(f"📊 Chunking Statistics:")
print(f"   Total chunks created: {len(all_chunks)}")
print(f"   CDC chunks: {sum(1 for c in all_chunks if c['source'] == 'cdc_faq')}")
print(f"   WHO chunks: {sum(1 for c in all_chunks if c['source'] == 'who_guidelines')}")

# Convert to Spark DataFrame and write to Delta
chunk_df = pd.DataFrame(all_chunks)
spark_df = spark.createDataFrame(chunk_df)

spark_df.write.mode("overwrite").format("delta").save(delta_output_path)

print(f"\n✅ Chunking complete and Delta table saved at: {delta_output_path}")
print(f"💡 Tip: You can query this Delta table to inspect chunk quality")



## 🧠 Step 5: Create an Embedding Endpoint in Databricks

To convert our text chunks into vector embeddings, we need a **pretrained embedding model** hosted as a **Databricks Model Serving endpoint**. Embeddings are dense vector representations that capture the semantic meaning of text, enabling similarity-based retrieval.

### 🎯 Why Embeddings Matter:

Traditional keyword search matches exact words, but embeddings enable **semantic search**:
- Query: "How to prevent COVID transmission?"
- Keyword match: Might miss "infection control measures"
- Semantic match: Finds related concepts even with different wording
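The keyword miss described above is easy to reproduce. The sketch below is only an illustration: `keyword_overlap` is a hypothetical helper, and the passage is a made-up sentence rather than text from the lab documents.

```python
import re


def keyword_overlap(query: str, passage: str) -> set:
    """Return the lowercase word tokens shared by the query and the passage."""
    def tokenize(s: str) -> set:
        return set(re.findall(r"[a-z0-9]+", s.lower()))
    return tokenize(query) & tokenize(passage)


query = "How to prevent COVID transmission?"
passage = "Infection control measures include masking and hand hygiene."

print(keyword_overlap(query, passage))  # set()
```

The two texts share no tokens at all, so a pure keyword matcher scores the passage at zero even though it answers the question. An embedding model is expected to place them close together in vector space.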

### 🔧 Embedding Model Selection:

We're using **databricks-bge-large-en** because it is:
1. **Optimized for retrieval** - Trained specifically for semantic search tasks
2. **High quality** - Produces 1024-dimensional vectors that capture nuanced meaning
3. **Production-ready** - Hosted and managed by Databricks
4. **Cost-effective** - Pay-per-token pricing with auto-scaling

### 📋 Steps to Create the Embedding Endpoint:

1. **Navigate to Serving** in your Databricks workspace:
   - Go to: `Workspace → Machine Learning → Serving Endpoints`

2. **Click "Create Endpoint"**

3. **Configure the endpoint**:
   - **Name**: `databricks-bge-large-en`
   - **Served Model**: Select `databricks-bge-large-en`
   - **Task**: `embedding`
   - **Model Version**: Use the latest available

4. **Important Configuration**:
   - **Disable "Scale to zero"** - Prevents timeout on first query
   - Endpoints can take 2-3 minutes to warm up from cold start
   - For production, keep endpoint warm for consistent latency

5. **Wait for status**: Endpoint must show **ONLINE** before proceeding

### 💡 Production Considerations:

- **GPU vs CPU**: Use GPU endpoints for large-scale embedding generation
- **Batch size**: Larger batches improve throughput but increase latency
- **Monitoring**: Track endpoint metrics (latency, throughput, errors)
- **Cost optimization**: Balance between performance and cost based on query volume

### 🔍 Alternative Models:

Depending on your use case, you might consider:
- **Multilingual models**: For non-English content
- **Domain-specific models**: Fine-tuned for medical, legal, or technical domains
- **Smaller models**: For cost optimization with acceptable quality trade-offs


## 🔢 Step 6: Generate Embeddings for Chunked Text

Now that we have a running embedding endpoint, we'll convert each chunk of text into a high-dimensional vector representation. This is the step that enables semantic search in our RAG system.

### 🎯 What We're Doing:

We're creating a **Pandas UDF** (User-Defined Function) that:
1. **Processes chunks in batches** - More efficient than one-at-a-time
2. **Calls the embedding endpoint** - Uses Databricks REST API
3. **Surfaces errors clearly** - Raises on failed requests instead of writing partial results
4. **Returns vector arrays** - Compatible with Vector Search

### 🔧 Technical Implementation:

The UDF uses an **Iterator pattern** for memory efficiency:
- Processes data in batches to avoid loading everything into memory
- Sends batches of 10 texts at a time to the endpoint
- Validates and converts embeddings to float32 format
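The sub-batching step can be sketched in isolation, without Spark or a live endpoint. `iter_sub_batches` is a hypothetical helper name, and the texts are placeholders.

```python
from typing import Iterator, List, Sequence


def iter_sub_batches(
    texts: Sequence[str], max_batch_size: int = 10
) -> Iterator[List[str]]:
    """Yield consecutive slices of at most max_batch_size texts."""
    for start in range(0, len(texts), max_batch_size):
        yield list(texts[start:start + max_batch_size])


# 25 placeholder texts split into requests of at most 10
texts = [f"chunk {i}" for i in range(25)]
sizes = [len(batch) for batch in iter_sub_batches(texts)]

print(sizes)  # [10, 10, 5]
```

Because the function is a generator, only one sub-batch is materialized at a time, which is the same memory-saving idea the Iterator pattern applies at the Spark batch level.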

### 🔐 Authentication:

We use the **notebook context API token** for secure authentication:
- Token is automatically available in Databricks notebooks
- No need to hardcode credentials
- Follows security best practices

### ⚡ Performance Optimization:

- **Batch processing**: Reduces API call overhead
- **Text filtering**: Skips empty or very short texts
- **Error handling**: Provides clear error messages for debugging

### 💡 Important Notes:

- The endpoint URL will be specific to your workspace
- Update the `endpoint_url` variable with your workspace URL
- The embedding dimension for BGE-large-en is 1024
- Processing time depends on the number of chunks and endpoint capacity


In [0]:
import requests
import json
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType
import pandas as pd
from typing import Iterator

# Databricks REST endpoint and token
# IMPORTANT: Update this URL with your workspace URL
endpoint_url = "https://adb-YOUR-WORKSPACE-ID.azuredatabricks.net/serving-endpoints/databricks-bge-large-en/invocations"
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

import numpy as np

@pandas_udf(ArrayType(FloatType()))
def get_embeddings_udf(texts: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """
    Generate embeddings for text chunks using Databricks embedding endpoint.

    This UDF sends texts to the endpoint in sub-batches and raises on any
    failed request. Null or very short texts receive a null embedding.

    Args:
        texts: Iterator of pandas Series containing text chunks

    Yields:
        pandas Series containing embedding vectors (arrays of floats)
    """
    max_batch_size = 10  # Sub-batch size per request, to avoid API limits

    for batch in texts:
        # A pandas UDF must return one output row per input row, so skipped
        # texts keep a None placeholder instead of being dropped.
        results = [None] * len(batch)
        valid = [
            (pos, str(text))
            for pos, text in enumerate(batch)
            if pd.notna(text) and len(str(text)) > 10  # Skip null or very short texts
        ]

        for start in range(0, len(valid), max_batch_size):
            sub_batch = valid[start:start + max_batch_size]
            # Prepare API request payload
            payload = {"input": [text for _, text in sub_batch]}
            response = requests.post(endpoint_url, headers=headers, json=payload)
            response.raise_for_status()

            # Extract embeddings from response
            # The response format is: {"data": [{"embedding": [...]}, ...]}
            vectors = [item["embedding"] for item in response.json().get("data", [])]
            if len(vectors) != len(sub_batch):
                raise ValueError(f"Expected {len(sub_batch)} embeddings, got {len(vectors)}")

            # Convert to float32 lists for Vector Search compatibility
            for (pos, _), vec in zip(sub_batch, vectors):
                results[pos] = np.array(vec, dtype=np.float32).tolist()

        # Reuse the input index so rows line up with the source batch
        yield pd.Series(results, index=batch.index)

print("✅ Embedding UDF defined successfully")
print("💡 This UDF will be applied to the text column in the next step")



## 💾 Step 7: Apply Embeddings and Save to Delta

Now we'll apply our embedding UDF to the chunked text and save the results. This creates a table where each chunk has both its original text and its vector embedding.

### 🎯 What This Step Does:

1. **Loads the chunked data** from our Delta table
2. **Applies the embedding UDF** to generate vectors for each chunk
3. **Adds an embedding column** to the DataFrame
4. **Saves the enriched data** back to Delta Lake

### 📊 Resulting Schema:

After this step, each row will contain:
- `source`: Document origin (cdc_faq or who_guidelines)
- `chunk_id`: Sequential identifier
- `text`: The actual text content
- `embedding`: 1024-dimensional vector (array of floats)
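A quick sanity check on this schema is to confirm that every row carries a vector of the expected length. The sketch below works on plain dictionaries so it runs anywhere; `invalid_embedding_ids` is a hypothetical helper and the rows are made-up stand-ins for collected table rows.

```python
from typing import Dict, List

EXPECTED_DIM = 1024  # Embedding dimension stated for BGE-large-en in this lab


def invalid_embedding_ids(rows: List[Dict], expected_dim: int = EXPECTED_DIM) -> List[int]:
    """Return chunk_ids whose embedding is missing or has the wrong length."""
    return [
        row["chunk_id"]
        for row in rows
        if row.get("embedding") is None or len(row["embedding"]) != expected_dim
    ]


# Made-up rows: one valid, one with the wrong dimension, one missing
rows = [
    {"chunk_id": 0, "embedding": [0.0] * 1024},
    {"chunk_id": 1, "embedding": [0.0] * 512},
    {"chunk_id": 2, "embedding": None},
]

print(invalid_embedding_ids(rows))  # [1, 2]
```

Catching a wrong dimension here is cheaper than discovering it when an index rejects the table.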

### ⚡ Performance Notes:

- Processing time depends on the number of chunks and endpoint capacity
- The UDF processes data in parallel across Spark partitions
- Monitor the embedding endpoint for any throttling or errors
- For large datasets, consider increasing endpoint capacity

### 💡 Why Save Embeddings:

Storing embeddings separately allows us to:
- **Reuse embeddings** without regenerating them
- **Version control** embedding changes
- **Analyze embedding quality** through inspection
- **Support multiple indexes** from the same embeddings


In [0]:
# Load chunks from previous step
chunk_df = spark.read.format("delta").load("/tmp/rag_chunks")

print(f"📊 Loaded {chunk_df.count()} chunks from Delta table")
print("🔄 Generating embeddings... (this may take a few minutes)")

# Apply embedding UDF to generate vectors
# Note: withColumn is lazy; the endpoint is only called when the write below runs
embedded_df = chunk_df.withColumn("embedding", get_embeddings_udf("text"))

# Save to new Delta location (this triggers the embedding calls)
embedded_df.write.mode("overwrite").format("delta").save("/tmp/rag_chunks_embedded")

print("✅ Embeddings generated and saved at /tmp/rag_chunks_embedded")
print("💡 Each chunk now has a 1024-dimensional embedding vector")

# Display a sample from the saved data to verify (avoids recomputing embeddings)
print("\n📋 Sample of embedded data:")
saved_df = spark.read.format("delta").load("/tmp/rag_chunks_embedded")
saved_df.select("source", "chunk_id", "text").show(3, truncate=50)


## 🧭 Step 8: Create a Vector Search Endpoint

Before we can create a vector search index, we need a **Vector Search endpoint**. This is the compute infrastructure that will serve our similarity search queries.

### 🎯 What is a Vector Search Endpoint?

A Vector Search endpoint is:
- **Managed infrastructure** for hosting vector indexes
- **Auto-scaling compute** that handles query load
- **Optimized for similarity search** using approximate nearest neighbor algorithms
- **Integrated with Unity Catalog** for governance and access control

### 🔧 Endpoint Types:

1. **STANDARD**: General-purpose, good for most use cases
2. **STORAGE_OPTIMIZED**: Better for very large indexes (millions of vectors)

For this lab, we're using **STANDARD** which is suitable for our document collection size.

### ⚡ Endpoint Lifecycle:

- **Creating**: Endpoint is being provisioned
- **ONLINE**: Ready to serve queries
- **OFFLINE**: Temporarily unavailable
- **FAILED**: Provisioning failed (check logs)
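Waiting on a lifecycle like this usually comes down to polling with a timeout. The sketch below shows that pattern with an injected status function so it runs without a workspace. `wait_until_online` is a hypothetical helper, and the state strings are taken from the list above as assumptions, not verified API values.

```python
import time
from typing import Callable


def wait_until_online(
    get_state: Callable[[], str], timeout: float = 60.0, poll_interval: float = 1.0
) -> int:
    """Poll get_state() until it returns "ONLINE"; return the number of polls."""
    deadline = time.monotonic() + timeout
    polls = 0
    while True:
        state = get_state()
        polls += 1
        if state == "ONLINE":
            return polls
        if state == "FAILED":
            # No point waiting out the timeout once provisioning has failed
            raise RuntimeError("Provisioning failed; check the endpoint logs")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Not ONLINE after {timeout} seconds (last state: {state})")
        time.sleep(poll_interval)


# Stand-in for a real status call: two polls in progress, then ready
fake_states = iter(["CREATING", "CREATING", "ONLINE"])
polls = wait_until_online(lambda: next(fake_states), timeout=5.0, poll_interval=0.01)

print(polls)  # 3
```

Stopping early on a failed state, rather than only on timeout, keeps a broken endpoint from blocking the notebook for the full wait.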

### 💡 Production Considerations:

- **Budget policies**: Set spending limits to control costs
- **Access control**: Use ACLs to manage who can query the endpoint
- **Monitoring**: Track query latency and throughput
- **High availability**: Consider multiple endpoints for critical applications


In [0]:
from databricks.vector_search.client import VectorSearchClient
import time

VECTOR_SEARCH_ENDPOINT_NAME = "orielly-chapter2-endpoint"
vsc = VectorSearchClient(disable_notice=True)

def endpoint_exists(vsc, endpoint_name):
    """Check if a vector search endpoint already exists."""
    try:
        vsc.get_endpoint(endpoint_name)
        return True
    except Exception as e:
        if "NOT_FOUND" in str(e) or "does not exist" in str(e):
            return False
        raise e

def wait_for_vs_endpoint_to_be_ready(vsc, endpoint_name, timeout=700, poll_interval=15):
    """Wait for vector search endpoint to become ONLINE."""
    start_time = time.time()
    while True:
        try:
            status = vsc.get_endpoint(endpoint_name).get("endpoint_status", {}).get("state", "")
            print(f"⏳ Endpoint status: {status}")
            if status == "ONLINE":
                print(f"✅ Vector Search endpoint '{endpoint_name}' is ready.")
                break
        except Exception as e:
            print(f"⚠️ Failed to get endpoint status: {e}")

        if time.time() - start_time > timeout:
            raise TimeoutError(f"❌ Timeout: Endpoint '{endpoint_name}' was not ready after {timeout} seconds.")
        time.sleep(poll_interval)

# Create endpoint if it doesn't exist
if not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME):
    print(f"🚀 Creating Vector Search endpoint: {VECTOR_SEARCH_ENDPOINT_NAME}")
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD")
    time.sleep(5)  # Allow time for provisioning to start
else:
    print(f"ℹ️ Vector Search endpoint '{VECTOR_SEARCH_ENDPOINT_NAME}' already exists.")

# Wait for endpoint to be ready
wait_for_vs_endpoint_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME)


## 🗂️ Step 8.1: Register Embedded Delta Table

To enable querying and integration with Vector Search, we'll register our embedded chunks as a managed table in Unity Catalog.

### 🎯 Why Register as a Table?

Registering the Delta data as a table provides:
- **SQL access**: Query embeddings using standard SQL
- **Governance**: Unity Catalog tracks lineage and access
- **Discoverability**: Other users can find and use the data
- **Integration**: Seamless connection with Vector Search

### 📋 Table Location:

We're registering the table as: `main.default.rag_chunks_embedded`
- **Catalog**: `main` (default Unity Catalog)
- **Schema**: `default` (default schema)
- **Table**: `rag_chunks_embedded` (our embedded chunks)

### 💡 Best Practices:

In production, you would:
- Use a dedicated catalog for RAG data
- Create schemas by project or domain
- Apply appropriate access controls
- Document table purpose and schema


In [0]:
# Register embedded chunks as a managed table
print("📝 Registering embedded chunks as Unity Catalog table...")

spark.read.format("delta").load("/tmp/rag_chunks_embedded").write.mode("overwrite").saveAsTable("main.default.rag_chunks_embedded")

print("✅ Table registered as: main.default.rag_chunks_embedded")
print("💡 You can now query this table using SQL or DataFrame API")

# Verify registration
table_info = spark.sql("DESCRIBE EXTENDED main.default.rag_chunks_embedded")
print("\n📊 Table schema:")
table_info.select("col_name", "data_type").show(truncate=False)



## 🧠 Step 9: Create and Sync a Vector Search Index

Now we'll create the actual vector search index that will enable fast similarity-based retrieval. This is where all our preparation comes together!

### 🎯 What is a Vector Search Index?

A vector search index is a specialized data structure that:
- **Stores vector embeddings** in an optimized format
- **Enables fast similarity search** using approximate nearest neighbor (ANN) algorithms
- **Automatically syncs** with the source Delta table (for Delta Sync indexes)
- **Scales efficiently** to millions of vectors

### 🔧 Index Configuration:

We're creating a **Delta Sync Index** with these parameters:
- **endpoint_name**: The vector search endpoint we created earlier
- **index_name**: Full three-level name (catalog.schema.index)
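- **source_table_name**: The Delta table containing our chunks
- **pipeline_type**: `TRIGGERED` (manual sync) vs `CONTINUOUS` (auto-sync)
- **primary_key**: Unique identifier for each chunk
- **embedding_source_column**: Text column that Vector Search embeds for us (managed embeddings)
- **embedding_model_endpoint_name**: Model serving endpoint used to embed both the source text and incoming queries

Because the index is configured with `embedding_source_column`, Vector Search computes the embeddings itself and does not read the precomputed `embedding` column from Step 7. To index those self-managed vectors instead, you would pass `embedding_vector_column` and `embedding_dimension` in place of the two embedding parameters.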

### 📊 Delta Sync vs Direct Access:

**Delta Sync Index** (what we're using):
- ✅ Automatically syncs with source table
- ✅ Handles incremental updates efficiently
- ✅ Easier to manage and maintain
- ✅ Best for most use cases

**Direct Access Index**:
- Manual updates via API
- More control over index content
- Useful for custom workflows

### ⚡ Sync Modes:

**TRIGGERED** (what we're using):
- Manual sync via API or UI
- Lower cost (no continuous compute)
- Good for batch updates
- Sync on-demand when data changes

**CONTINUOUS**:
- Auto-syncs within seconds
- Higher cost (dedicated compute)
- Best for real-time applications
- Requires Change Data Feed enabled

### 💡 Change Data Feed (CDF):

We enable CDF on the source table to track changes:
- Captures inserts, updates, and deletes
- Enables incremental sync (only process changes)
- Required for Delta Sync indexes
- Minimal storage overhead


In [0]:
from databricks.vector_search.client import VectorSearchClient
import time

# Configuration
catalog = "main"
schema = "default"
table = "rag_chunks_embedded"
index = "rag_chunks_index"

VECTOR_SEARCH_ENDPOINT_NAME = "orielly-chapter2-endpoint"
EMBEDDING_ENDPOINT_NAME = "databricks-bge-large-en"

source_table_fullname = f"{catalog}.{schema}.{table}"
vs_index_fullname = f"{catalog}.{schema}.{index}"

# Initialize Vector Search Client
vsc = VectorSearchClient(disable_notice=True)

# Enable Change Data Feed (required for Delta Sync)
print("🔧 Enabling Change Data Feed on source table...")
try:
    spark.sql(f"ALTER TABLE {source_table_fullname} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
    print(f"✅ CDF enabled on {source_table_fullname}")
except Exception as e:
    print(f"⚠️ Could not enable CDF (may already be enabled): {e}")

# Check if index already exists
def index_exists(vsc, endpoint, index_name):
    """Check if a vector search index already exists."""
    try:
        vsc.get_index(endpoint_name=endpoint, index_name=index_name)
        return True
    except Exception as e:
        if "NOT_FOUND" in str(e) or "does not exist" in str(e):
            return False
        raise e

# Create index if it doesn't exist
if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname):
    print(f"🚀 Creating index {vs_index_fullname} on endpoint {VECTOR_SEARCH_ENDPOINT_NAME}...")
    vsc.create_delta_sync_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
        index_name=vs_index_fullname,
        source_table_name=source_table_fullname,
        pipeline_type="TRIGGERED",  # Manual sync mode
        primary_key="chunk_id",     # Unique identifier for each chunk
        embedding_source_column="text",  # Column with text to embed
        embedding_model_endpoint_name=EMBEDDING_ENDPOINT_NAME  # Embedding model to use
    )
    print("✅ Index created successfully")
else:
    print(f"ℹ️ Index {vs_index_fullname} already exists")

# Wait until index is ready
print(f"⏳ Waiting for index {vs_index_fullname} to be ready...")
index_obj = vsc.get_index(endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME, index_name=vs_index_fullname)
index_obj.wait_until_ready()
print(f"✅ Index '{vs_index_fullname}' is ready for queries")

# Trigger a manual sync (TRIGGERED pipelines only pick up new data on request)
print("🔄 Triggering index sync with the latest data...")
index_obj.sync()
print(f"✅ Sync triggered for {vs_index_fullname}")
print("💡 Once the sync completes, the index is ready for semantic search queries!")


## 🔍 Step 10: Perform Semantic Search Queries

Now for the exciting part - let's query our vector search index using natural language! This demonstrates the power of semantic search in RAG systems.

### 🎯 How Semantic Search Works:

1. **User asks a question** in natural language
2. **Question is embedded** using the same model as our chunks
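3. **Vector similarity** is computed between the question embedding and the chunk embeddings (in practice usually via approximate nearest-neighbor search rather than a full scan)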
4. **Top-k most similar chunks** are returned
5. **Results are ranked** by similarity score
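
The ranking steps above can be sketched with a brute-force cosine-similarity search. This is an illustration only: the three-dimensional vectors are made up (real embeddings have hundreds of dimensions), the helper names are hypothetical, and a hosted index would not scan every chunk like this.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query_vec, chunk_vecs, k=2):
    """Score every chunk against the query and return the k best."""
    scored = [
        (chunk_id, cosine_similarity(query_vec, vec))
        for chunk_id, vec in chunk_vecs.items()
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy embeddings keyed by a hypothetical chunk label
chunk_vecs = {
    "symptoms": [0.9, 0.1, 0.0],
    "isolation": [0.1, 0.9, 0.0],
    "vaccines": [0.0, 0.2, 0.9],
}
query_vec = [0.8, 0.2, 0.0]  # pretend embedding of a symptoms question

ranked = top_k(query_vec, chunk_vecs, k=2)
for chunk_id, score in ranked:
    print(f"{chunk_id}: {score:.3f}")
```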

### 🔧 Query Parameters:

- **query_text**: The natural language question
- **columns**: Which columns to return from the index
- **num_results**: How many top results to retrieve (k)

### 📊 Understanding Results:

Each result contains:
- **chunk_id**: Unique identifier
- **source**: Which document it came from
- **text**: The actual chunk content
- **score**: Similarity score (higher = more relevant)
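
A small helper can turn the positional rows into labeled records. This is a sketch under assumptions: `sample_response` is hand-written to mirror the `result` → `data_array` shape that this lab reads, and it assumes the score is appended as the last element of each row. The `rows_to_dicts` helper is hypothetical, not part of the Vector Search client.

```python
def rows_to_dicts(response, columns):
    """Pair each positional row with its column names plus a trailing score."""
    rows = response.get("result", {}).get("data_array", [])
    names = list(columns) + ["score"]  # assumption: score is the last element
    return [dict(zip(names, row)) for row in rows]


# Hand-written example response; all values are illustrative
sample_response = {
    "result": {
        "row_count": 2,
        "data_array": [
            [3, "cdc_faq", "Common symptoms include fever...", 0.71],
            [12, "who_guidelines", "Symptoms may appear...", 0.64],
        ],
    }
}

hits = rows_to_dicts(sample_response, ["chunk_id", "source", "text"])
for hit in hits:
    print(hit["source"], hit["chunk_id"], hit["score"])
```

Named fields make downstream code less fragile than indexing with `row[0]`, `row[1]`, and `row[2]` when the requested columns change.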

### 💡 Query Tips:

- **Be specific**: "What are COVID-19 symptoms?" vs "COVID"
- **Use natural language**: Write like you're asking a person
- **Adjust num_results**: More results = better coverage, but may include less relevant chunks
- **Check sources**: Verify which documents are being retrieved


In [0]:
from databricks.vector_search.client import VectorSearchClient

# Example question for RAG
question = "What are the symptoms of COVID-19?"

# Define index location
catalog = "main"
schema = "default"
index = "rag_chunks_index"

VECTOR_SEARCH_ENDPOINT_NAME = "orielly-chapter2-endpoint"
vs_index_fullname = f"{catalog}.{schema}.{index}"

# Initialize client
vsc = VectorSearchClient(disable_notice=True)

print(f"🔍 Searching for: '{question}'")
print(f"📊 Retrieving top 5 most relevant chunks...\n")

# Perform similarity search
results = vsc.get_index(
    endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
    index_name=vs_index_fullname
).similarity_search(
    query_text=question,
    columns=["chunk_id", "source", "text"],
    num_results=5
)

# Extract and display results
docs = results.get("result", {}).get("data_array", [])

if not docs:
    print("❌ No results found. Check if the index is properly synced.")
else:
    print(f"✅ Found {len(docs)} relevant chunks:\n")
    for i, row in enumerate(docs, start=1):
        print(f"{'='*80}")
        print(f"🔹 Result {i}")
        print(f"{'='*80}")
        print(f"📄 Source: {row[1]}")
        print(f"🆔 Chunk ID: {row[0]}")
        print(f"📝 Text:\n{row[2][:300]}...")  # Show first 300 characters
        print(f"📈 Score: {row[-1]}")  # Similarity score is appended as the last column
        print()

print("💡 These chunks would be passed to an LLM to generate the final answer")



## 🔀 Step 11: Compare Retrieval With and Without Chunk Overlap

One of the key objectives of this lab is to understand how chunk overlap affects retrieval quality. Let's create chunks **without overlap** and compare the results!

### 🎯 Experiment Design:

We'll create two sets of chunks from the same documents:
1. **Without overlap** (overlap=0) - Chunks are completely independent
2. **With overlap** (overlap=50) - Chunks share 50 words with neighbors

Then we'll query both indexes with the same question and compare:
- **Precision**: Are the retrieved chunks relevant?
- **Completeness**: Do we get the full context needed to answer?
- **Redundancy**: How much duplicate information is retrieved?

### 📊 Why This Matters:

Overlap is a critical design decision in RAG systems:
- **Too little overlap**: Risk losing context at chunk boundaries
- **Too much overlap**: Increased storage and redundant retrieval
- **Optimal overlap**: Balances context preservation with efficiency

### 🔬 What We're Testing:

Consider a query like: "What is the recommended isolation period?"

**Without overlap**, the answer might be split:
- Chunk A: "...patients should isolate for 5 days."
- Chunk B: "After isolation, wear a mask for 5 additional days."

**With overlap**, the sentences at the boundary appear in both chunks, so at least one chunk carries the complete answer:
- Chunk A: "...patients should isolate for 5 days. After isolation, wear..."
- Chunk B: "...isolate for 5 days. After isolation, wear a mask for 5 additional days."
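
The boundary effect can be reproduced with a simplified word-window chunker. This sketch is not the lab's sentence-based `chunk_text` function; `sliding_word_chunks` is a hypothetical helper, and the tiny window sizes are chosen only to make the split visible.

```python
def sliding_word_chunks(text, chunk_size, overlap):
    """Split text into windows of chunk_size words sharing `overlap` words."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # the last window already reached the end of the text
    return chunks


first = "Patients should isolate for 5 days."
second = "After isolation, wear a mask for 5 additional days."
text = f"{first} {second}"

no_overlap = sliding_word_chunks(text, chunk_size=10, overlap=0)
with_overlap = sliding_word_chunks(text, chunk_size=10, overlap=5)

print("No overlap:  ", no_overlap)
print("With overlap:", with_overlap)
```

Without overlap, the second sentence is cut between the two chunks. With a five-word overlap, the second chunk contains that sentence in full.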


In [0]:
import pandas as pd
from nltk.tokenize import sent_tokenize

print("🔬 Creating chunks WITHOUT overlap for comparison...")

# Load the same cleaned text
with open("/dbfs/FileStore/rag_docs/cdc_faq_clean.txt", "r", encoding="utf-8") as f:
    cdc_text = f.read()

with open("/dbfs/FileStore/rag_docs/who_guidelines_clean.txt", "r", encoding="utf-8") as f:
    who_text = f.read()

documents = [
    {"source": "cdc_faq", "text": cdc_text},
    {"source": "who_guidelines", "text": who_text}
]

# Create chunks WITHOUT overlap (overlap=0)
all_chunks_no_overlap = []
for doc in documents:
    chunks = chunk_text(doc["text"], chunk_size=200, overlap=0)  # No overlap!
    for chunk in chunks:
        all_chunks_no_overlap.append({
            "source": doc["source"],
            # Running counter keeps chunk_id unique across documents;
            # the index uses chunk_id as its primary key
            "chunk_id": len(all_chunks_no_overlap),
            "text": chunk
        })

print(f"📊 Chunks WITHOUT overlap: {len(all_chunks_no_overlap)}")

# Save to Delta
chunk_df_no_overlap = pd.DataFrame(all_chunks_no_overlap)
spark_df_no_overlap = spark.createDataFrame(chunk_df_no_overlap)
spark_df_no_overlap.write.mode("overwrite").format("delta").save("/tmp/rag_chunks_no_overlap")

print("✅ No-overlap chunks saved to Delta")

# Generate embeddings for no-overlap chunks
print("🔄 Generating embeddings for no-overlap chunks...")
chunk_df_loaded = spark.read.format("delta").load("/tmp/rag_chunks_no_overlap")
embedded_df_no_overlap = chunk_df_loaded.withColumn("embedding", get_embeddings_udf("text"))
embedded_df_no_overlap.write.mode("overwrite").format("delta").save("/tmp/rag_chunks_no_overlap_embedded")

print("✅ No-overlap chunks embedded and saved")

# Register as table
spark.read.format("delta").load("/tmp/rag_chunks_no_overlap_embedded").write.mode("overwrite").saveAsTable("main.default.rag_chunks_no_overlap_embedded")

print("✅ Table registered: main.default.rag_chunks_no_overlap_embedded")


## 📊 Step 11.1: Create Index for No-Overlap Chunks

Now let's create a separate vector search index for the no-overlap chunks so we can compare retrieval results.


In [0]:
from databricks.vector_search.client import VectorSearchClient

# Configuration for no-overlap index
catalog = "main"
schema = "default"
table_no_overlap = "rag_chunks_no_overlap_embedded"
index_no_overlap = "rag_chunks_no_overlap_index"

VECTOR_SEARCH_ENDPOINT_NAME = "orielly-chapter2-endpoint"
EMBEDDING_ENDPOINT_NAME = "databricks-bge-large-en"

source_table_no_overlap = f"{catalog}.{schema}.{table_no_overlap}"
vs_index_no_overlap = f"{catalog}.{schema}.{index_no_overlap}"

vsc = VectorSearchClient(disable_notice=True)

# Enable CDF
print("🔧 Enabling CDF on no-overlap table...")
try:
    spark.sql(f"ALTER TABLE {source_table_no_overlap} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
    print("✅ CDF enabled")
except Exception as e:
    print(f"⚠️ CDF may already be enabled: {e}")

# Create index
if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_no_overlap):
    print(f"🚀 Creating no-overlap index...")
    vsc.create_delta_sync_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
        index_name=vs_index_no_overlap,
        source_table_name=source_table_no_overlap,
        pipeline_type="TRIGGERED",
        primary_key="chunk_id",
        embedding_source_column="text",
        embedding_model_endpoint_name=EMBEDDING_ENDPOINT_NAME
    )
    print("✅ No-overlap index created")
else:
    print(f"ℹ️ No-overlap index already exists")

# Wait and sync
print("⏳ Waiting for no-overlap index to be ready...")
index_obj_no_overlap = vsc.get_index(endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME, index_name=vs_index_no_overlap)
index_obj_no_overlap.wait_until_ready()
print("✅ No-overlap index ready")

index_obj_no_overlap.sync()
print("✅ No-overlap index synced")


## 🔍 Step 11.2: Compare Retrieval Results

Now let's query both indexes with the same question and compare the results!

### 🎯 Comparison Metrics:

1. **Relevance**: Do the chunks contain information relevant to the query?
2. **Completeness**: Is the full answer present in the retrieved chunks?
3. **Context Quality**: Do chunks have enough surrounding context?
4. **Redundancy**: How much information is duplicated across chunks?
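
Redundancy can be given a rough number. The sketch below uses mean pairwise Jaccard similarity over word sets as a proxy. This metric is an assumption chosen for illustration, not something the lab defines, and the helper names and sample strings are hypothetical.

```python
from itertools import combinations


def jaccard(a, b):
    """Share of distinct words that two texts have in common."""
    set_a, set_b = set(a.lower().split()), set(b.lower().split())
    if not set_a and not set_b:
        return 0.0
    return len(set_a & set_b) / len(set_a | set_b)


def mean_pairwise_redundancy(chunks):
    """Average Jaccard similarity over every pair of retrieved chunks."""
    pairs = list(combinations(chunks, 2))
    if not pairs:
        return 0.0
    return sum(jaccard(a, b) for a, b in pairs) / len(pairs)


# Illustrative retrieved texts
overlapping = ["isolate for five days", "for five days then mask"]
disjoint = ["isolate at home", "wear a mask"]

print(mean_pairwise_redundancy(overlapping))  # 0.5
print(mean_pairwise_redundancy(disjoint))     # 0.0
```

Applied to the `text` column of each result set, a higher value for the overlap index would indicate that its top results repeat more material.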


In [0]:
from databricks.vector_search.client import VectorSearchClient
import pandas as pd

# Test question
test_question = "What is the recommended isolation period for COVID-19?"

vsc = VectorSearchClient(disable_notice=True)

print(f"🔍 Query: '{test_question}'")
print(f"\n{'='*80}")
print("📊 COMPARISON: WITH OVERLAP vs WITHOUT OVERLAP")
print(f"{'='*80}\n")

# Query WITH overlap index
print("🔹 Results WITH OVERLAP (50 words):")
print("-" * 80)
results_with_overlap = vsc.get_index(
    endpoint_name="orielly-chapter2-endpoint",
    index_name="main.default.rag_chunks_index"
).similarity_search(
    query_text=test_question,
    columns=["chunk_id", "source", "text"],
    num_results=3
)

docs_with_overlap = results_with_overlap.get("result", {}).get("data_array", [])
for i, row in enumerate(docs_with_overlap, start=1):
    print(f"\nResult {i} (Chunk ID: {row[0]}, Source: {row[1]}):")
    print(f"{row[2][:200]}...")

# Query WITHOUT overlap index
print(f"\n\n🔹 Results WITHOUT OVERLAP:")
print("-" * 80)
results_no_overlap = vsc.get_index(
    endpoint_name="orielly-chapter2-endpoint",
    index_name="main.default.rag_chunks_no_overlap_index"
).similarity_search(
    query_text=test_question,
    columns=["chunk_id", "source", "text"],
    num_results=3
)

docs_no_overlap = results_no_overlap.get("result", {}).get("data_array", [])
for i, row in enumerate(docs_no_overlap, start=1):
    print(f"\nResult {i} (Chunk ID: {row[0]}, Source: {row[1]}):")
    print(f"{row[2][:200]}...")

# Analysis
print(f"\n\n{'='*80}")
print("📈 ANALYSIS")
print(f"{'='*80}\n")

# Calculate average chunk length
avg_len_with = sum(len(row[2]) for row in docs_with_overlap) / len(docs_with_overlap) if docs_with_overlap else 0
avg_len_without = sum(len(row[2]) for row in docs_no_overlap) / len(docs_no_overlap) if docs_no_overlap else 0

print(f"Average chunk length WITH overlap: {avg_len_with:.0f} characters")
print(f"Average chunk length WITHOUT overlap: {avg_len_without:.0f} characters")

# Check for keyword presence (simple relevance proxy)
keyword = "isolation"
with_overlap_hits = sum(1 for row in docs_with_overlap if keyword.lower() in row[2].lower())
without_overlap_hits = sum(1 for row in docs_no_overlap if keyword.lower() in row[2].lower())

print(f"\nChunks containing '{keyword}':")
print(f"  WITH overlap: {with_overlap_hits}/{len(docs_with_overlap)}")
print(f"  WITHOUT overlap: {without_overlap_hits}/{len(docs_no_overlap)}")

print(f"\n💡 Key Observations:")
print(f"   • WITH overlap chunks tend to have more context around key information")
print(f"   • WITHOUT overlap may miss context if answer spans chunk boundaries")
print(f"   • Overlap increases redundancy but improves answer completeness")
print(f"   • The optimal overlap depends on your specific use case and query patterns")



## 📏 Step 12: Evaluate Chunk Size Effects on Retrieval Precision

Now let's analyze how different chunk sizes affect retrieval precision. This helps us understand the trade-offs between chunk granularity and retrieval quality.

### 🎯 What We're Measuring:

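We'll approximate different chunk sizes by filtering the chunks retrieved from the existing index by word count (nothing is re-chunked or re-indexed), then measure: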
- **Match score**: How many retrieved chunks contain relevant keywords
- **Precision proxy**: Ratio of relevant chunks to total retrieved
- **Coverage**: Whether smaller or larger chunks capture the answer better

### 📊 Chunk Sizes to Test:

- **50 words**: Very small, highly focused chunks
- **150 words**: Medium-small chunks (just below our 200-word baseline)
- **300 words**: Medium-large chunks (above our 200-word baseline)
- **600 words**: Large chunks with extensive context

### 🔍 Expected Patterns:

- **Very small chunks (50)**: May be too fragmented, missing context
- **Medium chunks (150-300)**: Often optimal for focused retrieval
- **Large chunks (600)**: More context but may dilute precision

### 💡 Why This Analysis Matters:

In production RAG systems, chunk size directly impacts:
- **Retrieval precision**: Smaller chunks = more focused matches
- **Answer completeness**: Larger chunks = more context
- **Storage costs**: More chunks = higher storage and compute
- **LLM context usage**: Larger chunks consume more tokens
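
The storage point can be estimated with simple arithmetic. The sketch below assumes fixed word windows, so the lab's sentence-based chunker will produce somewhat different counts. The `estimate_chunk_count` helper and the 6,000-word document are hypothetical.

```python
import math


def estimate_chunk_count(total_words, chunk_size, overlap=0):
    """Estimate how many fixed-size word windows cover a document."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    if total_words <= 0:
        return 0
    if total_words <= chunk_size:
        return 1
    step = chunk_size - overlap  # new words contributed by each extra chunk
    return 1 + math.ceil((total_words - chunk_size) / step)


total_words = 6000  # hypothetical document length
for size in [50, 150, 300, 600]:
    print(f"{size:3d}-word chunks: {estimate_chunk_count(total_words, size)}")

# Effect of overlap at the 200-word baseline
print("200 words, no overlap:", estimate_chunk_count(total_words, 200, 0))
print("200 words, overlap 50:", estimate_chunk_count(total_words, 200, 50))
```

Each chunk needs its own embedding, so the chunk count is a reasonable first-order proxy for embedding and index cost.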


In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# Chunk sizes to evaluate
chunk_sizes = [50, 150, 300, 600]
precision_simulation = []

# Test question
question = "What is the isolation protocol?"

# Retrieve once from the main index
top_k = 10
print(f"🔍 Evaluating chunk size impact for query: '{question}'")
print(f"📊 Retrieving top {top_k} chunks from index...\n")

results = vsc.get_index(
    endpoint_name="orielly-chapter2-endpoint",
    index_name="main.default.rag_chunks_index"
).similarity_search(
    query_text=question,
    columns=["chunk_id", "source", "text"],
    num_results=top_k
)

retrieved = results.get("result", {}).get("data_array", [])

# Simulate precision for different chunk sizes
print("📈 Analyzing retrieval precision by chunk size...\n")

for size in chunk_sizes:
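    # Approximate each size by keeping only retrieved chunks of at most size + 25 words.
    # The index was built from ~200-word chunks, so the smaller sizes may match nothing;
    # a faithful comparison would re-chunk and re-index the documents at each size.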
    filtered_chunks = [row for row in retrieved if len(row[2].split()) <= size + 25]
    filtered_k = len(filtered_chunks)

    if filtered_k == 0:
        precision_score = 0
        comment = "No chunks match this size"
    else:
        # Calculate precision: how many contain the keyword "isolation"
        precision_score = sum(["isolation" in row[2].lower() for row in filtered_chunks]) / filtered_k
        comment = f"{filtered_k} chunks analyzed"

    precision_simulation.append({
        "Chunk Size (words)": size,
        "Chunks Retrieved": filtered_k,
        "Precision Score": precision_score,
        "Notes": comment
    })

    print(f"Chunk size {size:3d} words: Precision = {precision_score:.2f} ({comment})")

# Create DataFrame and visualize
df_eval = pd.DataFrame(precision_simulation)

print(f"\n📊 Precision Analysis Summary:")
print("="*80)
display(df_eval)

# Plot the results
plt.figure(figsize=(10, 6))
plt.plot(df_eval["Chunk Size (words)"], df_eval["Precision Score"], marker="o", linewidth=2, markersize=8)
plt.title("Retrieval Precision by Chunk Size", fontsize=14, fontweight='bold')
plt.xlabel("Chunk Size (words)", fontsize=12)
plt.ylabel("Precision Score", fontsize=12)
plt.ylim(0, 1.05)
plt.grid(True, alpha=0.3)
plt.axhline(y=0.5, color='r', linestyle='--', alpha=0.5, label='50% threshold')
plt.legend()
plt.tight_layout()
plt.show()

print(f"\n💡 Interpretation:")
print(f"   • The optimal chunk size balances precision and context")
print(f"   • Very small chunks may lack sufficient context")
print(f"   • Very large chunks may dilute relevance with noise")
print(f"   • Medium-sized chunks (150-300 words) often perform best")


## 📊 Understanding Precision by Chunk Size

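Because the simulation above only filters one set of retrieved chunks by length, treat its numbers as a rough proxy. The points below are general rules of thumb for chunk granularity in RAG systems; confirm them by re-chunking and re-indexing your own data.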

### 📈 Key Findings:

1. **Very Small Chunks (50 words)**:
   - ❌ Often too fragmented to capture complete thoughts
   - ❌ May miss context needed for accurate retrieval
   - ✅ Can be useful for very specific, keyword-focused queries

2. **Medium Chunks (150-300 words)**:
   - ✅ Typically optimal for most RAG applications
   - ✅ Balance between focus and context
   - ✅ Align well with typical question scope
   - ✅ Efficient use of LLM context window

3. **Large Chunks (600+ words)**:
   - ✅ Provide extensive context
   - ❌ May include irrelevant information (noise)
   - ❌ Consume more LLM tokens
   - ⚠️ Can reduce precision due to dilution

### 🎯 Production Recommendations:

- **Start with 150-300 words** as a baseline
- **Test with your specific queries** to find optimal size
- **Consider domain characteristics**: Technical docs may need larger chunks
- **Monitor retrieval metrics** in production to tune over time
- **Use overlap (25-50 words)** to preserve context at boundaries

### 🔬 Advanced Considerations:

- **Adaptive chunking**: Vary size based on document structure
- **Semantic chunking**: Split at topic boundaries, not fixed sizes
- **Hierarchical retrieval**: Use multiple chunk sizes in parallel
- **Query-dependent sizing**: Adjust chunk size based on query type


## 🔀 Step 13: Implement Hybrid Search Strategy

While vector search is powerful, combining it with keyword search can improve robustness. Let's implement a simple version that falls back to keyword search when vector search fails or comes up empty.

### 🎯 Why Hybrid Search?

**Vector Search** (semantic):
- ✅ Finds conceptually similar content
- ✅ Handles paraphrasing and synonyms
- ❌ May miss exact term matches
- ❌ Can be affected by embedding quality

**Keyword Search** (lexical):
- ✅ Guarantees exact term matches
- ✅ Fast and deterministic
- ❌ Misses semantic similarity
- ❌ Sensitive to exact wording

**Hybrid Approach**:
- ✅ Covers both conceptual matches and exact-term matches
- ✅ Fallback mechanism for reliability
- ✅ Can combine and rerank results
- ✅ More robust to edge cases

### 🔧 Implementation Strategy:

1. **Try vector search first** (primary method)
2. **Fall back to keyword search** if vector search fails or returns no results
3. **Optionally combine results** from both methods with weighted scoring

### 💡 Production Enhancements:

- **Reciprocal Rank Fusion (RRF)**: Combine rankings from both methods
- **Weighted scoring**: Adjust importance of semantic vs lexical match
- **Query classification**: Route queries to best search method
- **Result deduplication**: Remove overlapping chunks from combined results
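
Reciprocal Rank Fusion is compact enough to sketch. Each document earns `1 / (rank_constant + rank)` from every ranking it appears in, and the sums decide the fused order. The constant 60 is the conventional default. The chunk IDs and the two input rankings below are made up for illustration.

```python
def reciprocal_rank_fusion(rankings, rank_constant=60):
    """Fuse several ranked ID lists into one list ordered by RRF score."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rank_constant + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


# Hypothetical rankings from the two search methods
vector_ranking = ["c3", "c1", "c7"]
keyword_ranking = ["c1", "c9", "c3"]

fused = reciprocal_rank_fusion([vector_ranking, keyword_ranking])
print(fused)  # ['c1', 'c3', 'c9', 'c7']
```

Chunks found by both methods rise to the top, and each ID appears once in the output, so fusion also handles deduplication.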


In [0]:
def hybrid_search(query, k=5):
    """
    Run vector search first and fall back to keyword search if it fails or returns nothing.

    Args:
        query (str): Search query
        k (int): Number of results to return

    Returns:
        list: Search results (either from vector search or keyword fallback)
    """
    try:
        # Primary: Vector search
        print("🔍 Attempting semantic vector search...")
        results = vsc.get_index(
            endpoint_name="orielly-chapter2-endpoint",
            index_name="main.default.rag_chunks_index"
        ).similarity_search(
            query_text=query,
            columns=["chunk_id", "source", "text"],
            num_results=k
        )
        hits = results.get("result", {}).get("data_array", [])

        if hits:
            print(f"✅ Vector search returned {len(hits)} results")
            return hits
        else:
            print("⚠️ Vector search returned no results")

    except Exception as e:
        print(f"⚠️ Vector search failed: {str(e)}")

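    # Fallback: Keyword search using Spark SQL
    print("🔄 Falling back to keyword search...")

    # Keep only letters, digits, and hyphens so punctuation such as "?" or quotes
    # cannot break the LIKE pattern, then drop a few common stop words
    # (illustrative list) that would otherwise match almost every chunk
    stop_words = {"what", "are", "the", "of", "is", "a", "an", "for", "to", "in", "and", "how"}
    cleaned = ("".join(ch for ch in term if ch.isalnum() or ch == "-")
               for term in query.lower().split())
    query_terms = [term for term in cleaned if term and term not in stop_words]

    if not query_terms:
        print("⚠️ No usable keywords found in the query")
        return []

    # Build SQL condition for keyword matching
    condition = " OR ".join([f"LOWER(text) LIKE '%{term}%'" for term in query_terms])

    fallback_df = spark.sql(f"""
        SELECT chunk_id, source, text
        FROM main.default.rag_chunks_embedded
        WHERE {condition}
        LIMIT {k}
    """)

    results = fallback_df.collect()
    print(f"✅ Keyword search returned {len(results)} results")
    return results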

# Test hybrid search
print("="*80)
print("🔬 Testing Hybrid Search")
print("="*80)

test_query = "What are the symptoms of COVID-19?"
print(f"\nQuery: '{test_query}'\n")

hybrid_results = hybrid_search(test_query, k=5)

print(f"\n📋 Results:")
print("-"*80)
for i, r in enumerate(hybrid_results, start=1):
    # Vector search returns lists and Spark SQL returns Row objects (a tuple subclass);
    # both support positional indexing in the order of the selected columns
    chunk_id, source, text = r[0], r[1], r[2]

    print(f"\n🔹 Result {i}:")
    print(f"   Source: {source}")
    print(f"   Chunk ID: {chunk_id}")
    print(f"   Text: {text[:150]}...")

print(f"\n💡 Hybrid search provides robustness by combining semantic and lexical matching")


## 🎓 Lab Wrap-Up: Key Learnings and Next Steps

Congratulations! You've completed a comprehensive hands-on lab on chunking and indexing for RAG applications.

### ✅ What You Accomplished:

| Step | Achievement | Key Learning |
|------|-------------|--------------|
| 📥 **Document Ingestion** | Downloaded and extracted text from medical PDFs | Text extraction quality impacts downstream performance |
| 🧹 **Text Cleaning** | Removed noise and normalized formatting | Clean data = better embeddings and retrieval |
| ✂️ **Chunking** | Applied sentence-based chunking with overlap | Overlap preserves context at chunk boundaries |
| 💾 **Delta Lake Storage** | Saved chunks with metadata in Delta format | Delta provides ACID guarantees and versioning |
| 🧠 **Embedding Generation** | Created vector embeddings using the BGE model | Embeddings capture semantic meaning for similarity search |
| 🔍 **Vector Search** | Built and queried vector search indexes | Enables fast semantic retrieval at scale |
| 🔀 **Overlap Comparison** | Compared retrieval with/without overlap | Overlap improves completeness but adds redundancy |
| 📏 **Chunk Size Analysis** | Evaluated precision across different sizes | Medium chunks (150-300 words) often optimal |
| 🔄 **Hybrid Search** | Combined semantic and keyword search | Hybrid approaches improve robustness |

---

### 🧠 Critical Insights:

1. **Chunking Strategy Matters**:
   - Chunk size directly impacts retrieval precision and recall
   - Overlap prevents context loss at boundaries
   - Sentence-based chunking preserves semantic coherence

2. **Quality Over Quantity**:
   - Clean, well-structured chunks outperform large volumes of noisy data
   - Metadata (source, chunk_id) enables traceability and debugging
   - Text preprocessing significantly improves embedding quality

3. **Trade-offs Are Inevitable**:
   - Smaller chunks = higher precision, but may lack context
   - Larger chunks = more context, but lower precision
   - More overlap = better completeness, but higher storage costs

4. **Production Considerations**:
   - Monitor retrieval metrics continuously
   - A/B test different chunking strategies
   - Consider domain-specific requirements
   - Plan for incremental updates and versioning

---

### 🚀 Next Steps and Extensions:

1. **Complete the RAG Loop**:
   - Connect retrieved chunks to an LLM endpoint (e.g., DBRX, Llama)
   - Implement prompt engineering for answer generation
   - Add citation and source attribution

2. **Advanced Chunking**:
   - Implement semantic chunking (split at topic boundaries)
   - Try hierarchical chunking (multiple granularities)
   - Experiment with document-aware chunking (preserve structure)

3. **Retrieval Optimization**:
   - Implement reranking with cross-encoders
   - Add metadata filtering (date, source, category)
   - Experiment with query expansion and reformulation

4. **Evaluation and Monitoring**:
   - Build evaluation datasets with ground truth
   - Implement retrieval metrics (MRR, NDCG, Recall@k)
   - Set up monitoring dashboards for production

5. **Scale and Performance**:
   - Test with larger document collections
   - Optimize embedding batch sizes
   - Implement caching for frequently accessed chunks
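
As a starting point for the retrieval metrics named under Evaluation and Monitoring, here is a minimal sketch of Recall@k and MRR. It assumes you have a ground-truth set of relevant chunk IDs for each query. The helper names and sample IDs are hypothetical.

```python
def recall_at_k(retrieved_ids, relevant_ids, k):
    """Fraction of the relevant chunks that appear in the top k results."""
    relevant = set(relevant_ids)
    if not relevant:
        return 0.0
    return len(set(retrieved_ids[:k]) & relevant) / len(relevant)


def mean_reciprocal_rank(queries):
    """Average of 1/rank of the first relevant hit; a query with no hit adds 0."""
    if not queries:
        return 0.0
    total = 0.0
    for retrieved_ids, relevant_ids in queries:
        relevant = set(relevant_ids)
        for rank, doc_id in enumerate(retrieved_ids, start=1):
            if doc_id in relevant:
                total += 1.0 / rank
                break
    return total / len(queries)


# Each entry: (retrieved IDs in rank order, ground-truth relevant IDs)
queries = [
    (["c1", "c4", "c2"], ["c2", "c9"]),  # first relevant hit at rank 3
    (["c5", "c6"], ["c5"]),              # first relevant hit at rank 1
]

print(recall_at_k(["c1", "c4", "c2"], ["c2", "c9"], k=3))  # 0.5
print(round(mean_reciprocal_rank(queries), 3))
```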

---

### 📚 Additional Resources:

- **Databricks Documentation**: [Vector Search Guide](https://docs.databricks.com/en/vector-search/)
- **Embedding Models**: Explore other models on Hugging Face
- **RAG Patterns**: Study advanced RAG architectures (HyDE, RAG-Fusion)
- **Evaluation**: Learn about RAGAS and other RAG evaluation frameworks

---

### 💬 Reflection Questions:

1. How would you adapt this pipeline for a different domain (legal, financial, scientific)?
2. What chunk size and overlap would you choose for your use case?
3. How would you handle multilingual documents?
4. What additional metadata would be valuable for your application?
5. How would you evaluate retrieval quality in production?

---

### 🎯 Final Thought:

> **"The quality of a RAG system is determined not by the sophistication of the LLM, but by the quality of the retrieved context. Master data preparation, and you master RAG."**

Thank you for completing this lab! You now have the foundational skills to build production-grade RAG systems on Databricks.

🌟 **Happy Building!** 🌟

