<a href="https://colab.research.google.com/github/sb8vk/ML/blob/master/Accelerating_the_RAG_Retriever_A_%22Zero_Copy%22_GPU_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **The Context**
Most RAG discussions focus heavily on the LLM. However, the bottleneck is often the Retriever, specifically the ingestion, cleaning, and indexing of massive enterprise datasets. If a CPU-based pipeline takes 12 hours to re-index the data, the context the LLM retrieves can be up to 12 hours out of date.

## **The Approach**
We are going to move the "ETL" (Extract, Transform, Load) part of RAG onto the GPU using NVIDIA RAPIDS. By keeping data on the GPU, we avoid the latency penalty of moving data back and forth between system RAM and VRAM.

## **What We're Building**

- Setup: A script to handle Colab's varying driver versions automatically.
- Ingestion: Compare pandas (CPU) vs cudf (GPU) on a large dataset.
- Cleaning: Perform regex redaction on millions of rows in parallel.
- Indexing: Build a vector search index using cuVS (CAGRA).

# **Hardware Requirement**
Go to Runtime > Change runtime type.

Select T4 GPU (or better).

Setting up a GPU environment on cloud notebooks can be tricky. Three moving parts must align:
- The NVIDIA Driver (managed by Google)
- The CUDA Runtime
- The Python Version

To solve this reliably, we use the RAPIDS Colab Installer. This detects the specific environment at runtime and fetches the correct pre-compiled wheels, avoiding the memory-crashing "build from source" failures common on smaller instances.
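Before running the installer, it can help to see what the runtime actually provides. The sketch below is our own illustration, not part of the RAPIDS installer: the helper name `describe_environment` is hypothetical. It reports the Python version and, when `nvidia-smi` is on the path, the GPU name and driver version. It does not inspect the CUDA runtime, which arrives with the installed wheels.

```python
import shutil
import subprocess
import sys


def describe_environment():
    """Report the Python version plus GPU name and driver version, if visible."""
    info = {
        "python": f"{sys.version_info.major}.{sys.version_info.minor}",
        "driver": None,  # stays None when no NVIDIA driver is visible
        "gpu": None,
    }
    # nvidia-smi ships with the driver; if it is missing, no GPU runtime is attached.
    if shutil.which("nvidia-smi") is None:
        return info
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=name,driver_version", "--format=csv,noheader"],
            capture_output=True, text=True, timeout=30, check=True,
        )
    except (subprocess.SubprocessError, OSError):
        return info
    lines = result.stdout.strip().splitlines()
    if not lines:
        return info
    # Each line looks like "<gpu name>, <driver version>"; use the first GPU.
    name, _, driver = lines[0].rpartition(",")
    info["gpu"], info["driver"] = name.strip(), driver.strip()
    return info


print(describe_environment())
```

If `driver` comes back as `None`, the runtime type is most likely still set to CPU.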

In [None]:
!nvidia-smi

In [None]:
# @title 1. Auto-Install RAPIDS (The Smart Way)
# This script detects your specific Colab config (Driver + Python)
# and installs the matching RAPIDS version to prevent conflicts.

!git clone https://github.com/rapidsai/rapidsai-csp-utils.git
!python rapidsai-csp-utils/colab/pip-install.py

print("\n" + "="*80)
print("STOP! Please go to 'Runtime > Restart Session' now.")
print("This is required to load the new CUDA libraries correctly.")
print("Then, proceed directly to Cell 2.")
print("="*80)

## **2. Restore Dependencies & Import**

Because Step 1 installs low-level system libraries, the Python kernel has to be restarted. That gives us a clean slate, but it can also wipe out ephemeral packages. Here, we quickly restore sentence-transformers and import our GPU stack (cudf, cupy, cuvs).
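A quick way to see what survived the restart is to ask Python which modules it can locate without importing them. This is a minimal sketch using only the standard library. The helper name `missing_packages` is ours, and the package list simply mirrors the imports used in this notebook.

```python
import importlib.util


def missing_packages(names):
    """Return the top-level module names that Python cannot locate."""
    return [name for name in names if importlib.util.find_spec(name) is None]


required = ["cudf", "cupy", "cuvs", "sentence_transformers"]
missing = missing_packages(required)
print("Missing:", missing if missing else "none")
```

Anything listed as missing needs to be reinstalled before the import cell will succeed.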

In [None]:
# @title 2. Restore Dependencies & Import
import os
import time
import gc

# 1. Restore Sentence-Transformers (often wiped on reset)
try:
    import sentence_transformers
except ImportError:
    print("Installing sentence-transformers...")
    !pip install -q sentence-transformers

# 2. Import the RAPIDS Stack
import cudf
import cupy as cp
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from cuvs.neighbors import cagra

print(f"   Environment Ready!")
print(f"   RAPIDS cuDF Version: {cudf.__version__}")
print(f"   GPU Detected: {cp.cuda.runtime.getDeviceCount()} device(s)")

# **3. Data Generation: Latency vs. Throughput**
A common misconception is that GPUs are always faster than CPUs. In practice, a GPU pays a fixed startup cost (initializing the CUDA context) before it does any useful work.

- Small Data (100k rows): The CPU often wins because it starts instantly.
- Big Data (5M rows): The GPU wins because its parallelism amortizes that startup cost.

We will generate both sizes to demonstrate this crossover point.
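The crossover can be reasoned about with a simple linear cost model: CPU time is `n / cpu_rate`, while GPU time is `startup + n / gpu_rate`. The sketch below solves for the row count where the two are equal. The throughput and startup numbers are illustrative assumptions, not measurements from this notebook, and real pipelines are rarely this linear.

```python
def crossover_rows(gpu_startup_s, cpu_rows_per_s, gpu_rows_per_s):
    """Row count above which the GPU finishes first under a linear cost model.

    CPU time: n / cpu_rows_per_s
    GPU time: gpu_startup_s + n / gpu_rows_per_s
    """
    if gpu_rows_per_s <= cpu_rows_per_s:
        return float("inf")  # the GPU never catches up in this model
    per_row_saving = 1.0 / cpu_rows_per_s - 1.0 / gpu_rows_per_s
    return gpu_startup_s / per_row_saving


# Illustrative numbers only, not measurements from this notebook.
n_star = crossover_rows(
    gpu_startup_s=1.0,
    cpu_rows_per_s=1_000_000,
    gpu_rows_per_s=10_000_000,
)
print(f"GPU wins above roughly {n_star:,.0f} rows")
```

With these made-up rates the break-even point sits a little above one million rows, between our small and big datasets.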

In [None]:
# @title 3. Generate Synthetic Data (Small vs. Big)
# Define sizes
num_rows_small = 100_000
num_rows_big   = 5_000_000

print(f"--- Generating Datasets ---")

# --- 1. Small Dataset (100k rows) ---
print(f"Creating Small Dataset ({num_rows_small:,} rows)...")
# We use a simple dictionary structure for speed
data_small = {
    'id': range(num_rows_small),
    'val': np.random.rand(num_rows_small),
    'text': [f"CONFIDENTIAL: Project Alpha data {i} Contact: user{i}@corp.com" for i in range(num_rows_small)]
}
pd.DataFrame(data_small).to_parquet('small_dataset.parquet')

# --- 2. Large Dataset (5M rows) ---
print(f"Creating Large Dataset ({num_rows_big:,} rows)...")
# This simulates a raw data dump (e.g., from S3)
data_big = {
    'id': range(num_rows_big),
    'val': np.random.rand(num_rows_big),
    'text': [f"CONFIDENTIAL: Project Alpha data {i} Contact: user{i}@corp.com" for i in range(num_rows_big)]
}
pd.DataFrame(data_big).to_parquet('large_dataset.parquet')

# Cleanup memory to be safe on T4
del data_small, data_big
gc.collect()

print("Data generation complete.")

# **4. The GPU Pipeline: Ingest, Clean, Dedup**
The Zero-Copy Architecture: Here is the core advantage. In a standard pipeline, you load data into RAM, process it on the CPU, then copy it to the GPU for the model. That copy over the PCIe bus is often a major bottleneck.

In this pipeline:

- Ingest: cudf loads directly to VRAM.

- Clean: We use vectorized Regex (executing on thousands of CUDA cores) to redact PII.

- Dedup: We hash and filter out duplicates without the data leaving VRAM.
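To check what the Clean and Dedup steps should produce before running them on millions of rows, the same logic can be exercised on a few strings on the CPU. This is a small reference sketch using Python's `re` module with the pattern from this pipeline. The sample rows are made up, and cuDF's regex engine may differ from `re` on more exotic patterns.

```python
import re

# Same pattern the pipeline passes to cuDF; here it runs on the CPU via `re`.
EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")


def redact_emails(text, placeholder="<REDACTED>"):
    """Replace every email-like substring with a placeholder."""
    return EMAIL_PATTERN.sub(placeholder, text)


samples = [
    "CONFIDENTIAL: Project Alpha data 7 Contact: user7@corp.com",
    "No address in this row",
    "Two: a.b@x.org and c_d@y.co.uk",
]
cleaned = [redact_emails(s) for s in samples]

# Exact dedup: append a repeat of the first row, then keep first occurrences only.
unique = list(dict.fromkeys(cleaned + [cleaned[0]]))

for row in unique:
    print(row)
```

The duplicate appended at the end is dropped, so three rows are printed.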

In [None]:
# @title 4. Run GPU Pipeline (Ingest -> Clean -> Dedup)

print("--- 1. Ingestion Benchmark (5M Rows) ---")
# Benchmark CPU
start = time.time()
_ = pd.read_parquet('large_dataset.parquet')
print(f"CPU Load Time: {time.time() - start:.2f}s")

# Benchmark GPU
# (The first cuDF call also pays one-time CUDA warmup; that overhead is usually negligible at this scale)
start = time.time()
gdf = cudf.read_parquet('large_dataset.parquet')
print(f"GPU Load Time: {time.time() - start:.2f}s")

print("\n--- 2. High-Speed Cleaning (Regex) ---")
# Scenario: Redact all email addresses
# cuDF compiles this regex into a state machine and runs it across rows in parallel
start = time.time()
gdf['clean_text'] = gdf['text'].str.replace(
    r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
    '<REDACTED>',
    regex=True
)
print(f"Regex Time: {time.time() - start:.2f}s")
print(f"Sample: {gdf['clean_text'][0]}")

print("\n--- 3. Deduplication ---")
# Exact deduplication via hashing
# (distinct strings can in principle share a hash; dedup on 'clean_text' directly if that matters)
start = time.time()
gdf['hash'] = gdf['clean_text'].hash_values()
gdf = gdf.drop_duplicates(subset=['hash'])
print(f"Dedup Time: {time.time() - start:.2f}s")
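
The timings above come from single `time.time()` calls, so the first GPU measurement includes any one-time initialization. A minimal sketch of a fairer harness follows: it discards warmup runs and reports the best of several timed runs. The helper name `time_call` is hypothetical, and the workload here is a CPU-only stand-in so the sketch runs anywhere.

```python
import time


def time_call(fn, repeats=3, warmup=1):
    """Return the best wall-clock time of `fn()` after discarding warmup runs."""
    for _ in range(warmup):
        fn()  # absorbs one-time costs such as CUDA context creation
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return min(timings)


# CPU-only stand-in workload so the sketch runs anywhere.
best = time_call(lambda: sum(range(100_000)))
print(f"Best of 3: {best:.4f}s")
```

In this notebook, the same helper could wrap `lambda: cudf.read_parquet('large_dataset.parquet')`. Note that repeated reads also benefit from the operating system's file cache, for the CPU and the GPU alike.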

# **5. Vector Search**
The final step is Indexing.

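**Pro Tip:** cuVS returns its search results as a raw `device_ndarray`, which is essentially a pointer to GPU memory plus shape and dtype metadata. We wrap that output in cp.asarray() so that CuPy (the GPU-accelerated NumPy equivalent) knows how to interpret the pointer, which lets us safely transfer only the results back to the CPU for the final display. The data path is cuDF -> PyTorch -> cuVS -> CuPy.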
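To make "interpreting the pointer" concrete: GPU libraries commonly describe their buffers through the CUDA Array Interface, a dictionary exposed as `__cuda_array_interface__`. The sketch below reads that metadata without touching device memory. We are assuming here that the cuVS result exposes this attribute, since that is the protocol `cp.asarray()` looks for. The `FakeDeviceArray` class and its address are made up so the example runs without a GPU.

```python
def describe_device_array(obj):
    """Summarize an object that exposes the CUDA Array Interface.

    Only metadata is read; the device memory itself is never touched.
    """
    iface = getattr(obj, "__cuda_array_interface__", None)
    if iface is None:
        raise TypeError(
            f"{type(obj).__name__} does not expose __cuda_array_interface__"
        )
    pointer, read_only = iface["data"]
    return {
        "shape": tuple(iface["shape"]),
        "typestr": iface["typestr"],  # e.g. "<u4" = little-endian uint32
        "pointer": hex(pointer),
        "read_only": read_only,
    }


class FakeDeviceArray:
    """Hypothetical stand-in so the sketch runs without a GPU."""

    __cuda_array_interface__ = {
        "shape": (1, 3),
        "typestr": "<u4",
        "data": (0x7F0000000000, False),  # made-up device address
        "version": 3,
    }


print(describe_device_array(FakeDeviceArray()))
```

Any consumer that understands this dictionary can wrap the same memory without copying it.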

In [None]:
# @title 5. Vector Search (With Interoperability Fix)

print("--- Embedding & Indexing ---")
model = SentenceTransformer('all-MiniLM-L6-v2', device='cuda')

# 1. Embed a subset (10k docs)
# We take a subset to keep the embedding time short for this demo.
# The tokenizer needs Python strings, so this subset is copied to host memory.
subset_size = 10_000
subset_texts = gdf['clean_text'].iloc[:subset_size].to_arrow().to_pylist()
print(f"Embedding {subset_size} docs...")

# Generate Embeddings (PyTorch Tensor on GPU)
embeddings = model.encode(subset_texts, convert_to_tensor=True)

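# 2. Build Index
# Convert PyTorch Tensor -> CuPy Array without leaving the GPU.
# PyTorch CUDA tensors expose __cuda_array_interface__, so cp.asarray() can wrap
# the same device memory instead of round-tripping through .cpu().numpy().
embeddings_cp = cp.asarray(embeddings.contiguous(), dtype=cp.float32)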

print("Building Index (CAGRA)...")
build_params = cagra.IndexParams(metric="sqeuclidean")
index = cagra.build(build_params, embeddings_cp)

# 3. Search
query = "project alpha confidential"
print(f"Querying: '{query}'")
query_vec = model.encode([query])
query_cp = cp.asarray(query_vec, dtype=cp.float32)

search_params = cagra.SearchParams()
distances, neighbors = cagra.search(search_params, index, query_cp, k=3)

# --- THE INTEROP FIX ---
# cuVS returns a raw 'device_ndarray'.
# We wrap it in cp.asarray() so we can use standard .get() methods.
neighbors_cp = cp.asarray(neighbors)
indices = neighbors_cp.get().flatten() # Move only the results to CPU

print("\n--- Results ---")
for i, idx in enumerate(indices):
    if idx < len(subset_texts):
        print(f"Match {i+1}: {subset_texts[idx]}")
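
CAGRA is an approximate index, so it is worth spot-checking its answers against an exact search on a small sample. The sketch below is a brute-force reference in NumPy using the same squared Euclidean metric. The helper names `exact_top_k` and `recall_at_k` are ours, and the vectors are random stand-ins for the embeddings.

```python
import numpy as np


def exact_top_k(vectors, query, k=3):
    """Exact k-nearest neighbours under squared Euclidean distance."""
    diffs = vectors - query  # broadcasts the query across all rows
    dists = np.einsum("ij,ij->i", diffs, diffs)  # row-wise squared norms
    top = np.argpartition(dists, k - 1)[:k]  # the k smallest, unordered
    return top[np.argsort(dists[top])]  # order them by distance


def recall_at_k(approx_ids, exact_ids):
    """Fraction of the exact neighbours that the approximate search also found."""
    found = set(approx_ids.tolist()) & set(exact_ids.tolist())
    return len(found) / len(exact_ids)


# Random stand-in data; in the notebook these would be the embeddings copied
# to host memory and the encoded query vector.
rng = np.random.default_rng(0)
vectors = rng.standard_normal((1_000, 8)).astype(np.float32)
query = vectors[42] + 0.001  # a point very close to row 42

exact = exact_top_k(vectors, query, k=3)
print("Nearest row:", exact[0])
print("Recall of exact vs itself:", recall_at_k(exact, exact))
```

Passing the `indices` array from the search cell as `approx_ids` would give a recall figure for the CAGRA results on that query.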