# 🔍 RAG with Unsloth Dynamic 4-bit Quantization
This Colab notebook demonstrates a Retrieval-Augmented Generation (RAG) pipeline with **Unsloth’s dynamic 4-bit quantization** for memory‑efficient LLM inference.

**Pipeline Stages:**
1. Install & Setup  
2. Define Data Models & Utilities  
3. Load Quantized Model (Unsloth / BitsAndBytes)  
4. Document Processing & Embedding Indexing  
5. Vector Retrieval  
6. RAG Orchestration  
7. Demo Queries 🚀

### 📌 1) Setup & Installation

In [None]:
# Install the notebook's dependencies. The leading "!" hands the line to the
# shell from the notebook cell, and -q keeps pip's output quiet.
!pip install -q unsloth transformers accelerate sentence-transformers faiss-cpu bitsandbytes torch
# Printed unconditionally: this line does not check that the install succeeded.
print("✅ Dependencies installed (if not already).")

### 📌 2) Data Models & Utilities

In [None]:
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
import numpy as np
import gc, torch

@dataclass
class DocumentChunk:
    """One chunk of a source document, optionally carrying its embedding.

    Field order matters: DocumentProcessor.chunk_documents constructs
    instances positionally.
    """
    # Unique identifier of the chunk (chunk_documents passes a UUID4 string).
    id: str
    # Text of the chunk.
    content: str
    # Free-form metadata; chunk_documents stores {"source": <doc id>} here.
    metadata: dict
    # Embedding vector; None until DocumentProcessor.generate_embeddings sets it.
    embedding: Optional[np.ndarray]
    # Identifier of the document the chunk was cut from.
    source_document: str
    # Zero-based position of the chunk within its source document.
    chunk_index: int

@dataclass
class RetrievalResult:
    """A single retrieved chunk together with its score and rank."""
    # The chunk that matched the query.
    chunk: DocumentChunk
    # Similarity score reported by the vector index search, as a Python float.
    relevance_score: float
    # One-based position in the result list (VectorRetriever passes rank+1).
    rank: int

@dataclass
class RAGResponse:
    """Everything produced for one query by RAGPipeline.process_query."""
    # The user's original question.
    query: str
    # The generated answer text.
    response: str
    # Retrieval results that were used to build the context.
    retrieved_chunks: List[RetrievalResult]
    # Generation details; process_query stores {"model": <model name>} here.
    generation_metadata: dict
    # Memory snapshot as returned by UnslothModelManager.get_memory_usage().
    memory_usage: dict

class MemoryMonitor:
    """Static helpers for reporting and releasing accelerator memory."""

    @staticmethod
    def vram():
        """Return a snapshot of CUDA memory usage for the current device.

        Returns:
            A dict with the keys ``device``, ``allocated_gb`` and
            ``reserved_gb``. Without CUDA the device is ``"cpu"`` and both
            figures are ``None``; otherwise ``device`` is the CUDA device name
            and the figures are byte counts divided by 1024**3, rounded to
            three decimals.
        """
        if torch.cuda.is_available():
            device_index = torch.cuda.current_device()
            bytes_per_gib = 1024 ** 3
            allocated = torch.cuda.memory_allocated(device_index) / bytes_per_gib
            reserved = torch.cuda.memory_reserved(device_index) / bytes_per_gib
            return dict(
                device=torch.cuda.get_device_name(device_index),
                allocated_gb=round(allocated, 3),
                reserved_gb=round(reserved, 3),
            )
        return dict(device="cpu", allocated_gb=None, reserved_gb=None)

    @staticmethod
    def cleanup():
        """Run the garbage collector, then empty the CUDA cache if CUDA exists."""
        gc.collect()
        if not torch.cuda.is_available():
            return
        torch.cuda.empty_cache()


### 📌 3) Model Management – Unsloth 4-bit Loader

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from unsloth import FastLanguageModel

class UnslothModelManager:
    """Load and hold an Unsloth 4-bit quantized causal LM and its tokenizer.

    Attributes:
        model_name: Hub identifier of the (pre-quantized) model to load.
        model: The loaded model, or None until load_quantized_model() is called.
        tokenizer: The matching tokenizer, or None until the model is loaded.
    """

    def __init__(self, model_name="unsloth/Meta-Llama-3.1-8B-Instruct-bnb-4bit"):
        self.model_name = model_name
        self.model, self.tokenizer = None, None

    def load_quantized_model(self, max_seq_length=4096, dtype=None, load_in_4bit=True):
        """Load the model and tokenizer through Unsloth and cache them on self.

        Args:
            max_seq_length: Maximum sequence length passed to Unsloth.
            dtype: Compute dtype. None lets Unsloth auto-detect a dtype the
                GPU supports, so the notebook also runs on GPUs without
                bfloat16 (for example the free Colab T4).
            load_in_4bit: Whether to load the weights 4-bit quantized.

        Returns:
            The ``(model, tokenizer)`` pair, also stored on the instance.
        """
        model, tokenizer = FastLanguageModel.from_pretrained(
            self.model_name,
            max_seq_length=max_seq_length,
            dtype=dtype,
            load_in_4bit=load_in_4bit,
        )
        # This notebook only generates text, so switch the model to Unsloth's
        # inference mode before anything calls generate().
        FastLanguageModel.for_inference(model)
        self.model, self.tokenizer = model, tokenizer
        return model, tokenizer

    def get_memory_usage(self):
        """Return the current memory snapshot from MemoryMonitor.vram()."""
        return MemoryMonitor.vram()


### 📌 4) Document Processing & Embeddings

In [None]:
from sentence_transformers import SentenceTransformer
import faiss, uuid

class DocumentProcessor:
    """Chunk documents, embed the chunks, and build a FAISS inner-product index.

    Attributes:
        embedder: SentenceTransformer used to embed chunk texts.
        chunk_size: Maximum number of whitespace-separated tokens per chunk.
        overlap: Number of tokens shared by consecutive chunks.
        chunks: DocumentChunk objects from the latest chunk_documents() call.
        index: FAISS index built by create_index(), or None if not built.
    """

    def __init__(self, embed_model="sentence-transformers/all-MiniLM-L6-v2", chunk_size=800, overlap=200):
        self.embedder = SentenceTransformer(embed_model)
        self.chunk_size, self.overlap = chunk_size, overlap
        self.chunks, self.index = [], None

    def chunk_documents(self, docs):
        """Split documents into overlapping chunks of whitespace tokens.

        Args:
            docs: Iterable of ``(doc_id, text)`` pairs.

        Returns:
            The list of DocumentChunk objects, which also replaces self.chunks.
            Any previously built index is discarded because its row numbers
            would no longer correspond to the new chunk list.
        """
        # The stride is the same for every document; max() keeps it positive
        # even when overlap >= chunk_size.
        step = max(1, self.chunk_size - self.overlap)
        out = []
        for doc_id, text in docs:
            tokens = text.split()
            for chunk_index, start in enumerate(range(0, len(tokens), step)):
                chunk = " ".join(tokens[start:start + self.chunk_size])
                if not chunk:
                    continue
                out.append(DocumentChunk(str(uuid.uuid4()), chunk, {"source": doc_id}, None, doc_id, chunk_index))
                # This window already reached the end of the document, so any
                # further window would only repeat its tail.
                if start + self.chunk_size >= len(tokens):
                    break
        self.chunks = out
        # A stale index would map search hits onto the wrong chunks.
        self.index = None
        return out

    def generate_embeddings(self):
        """Embed every chunk, L2-normalize, and attach vectors to the chunks.

        Returns:
            A float32 array with one row per chunk. With no chunks the array
            has zero rows and as many columns as the embedder reports.
        """
        texts = [c.content for c in self.chunks]
        if not texts:
            # Ask the embedder for its width instead of assuming a fixed model.
            dim = self.embedder.get_sentence_embedding_dimension() or 0
            return np.zeros((0, dim), dtype="float32")
        embeddings = self.embedder.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        embeddings = embeddings.astype("float32")
        # Normalize for inner product similarity
        faiss.normalize_L2(embeddings)
        for c, e in zip(self.chunks, embeddings):
            c.embedding = e
        return embeddings

    def create_index(self):
        """Build a flat inner-product FAISS index over all chunk embeddings.

        Embeddings are generated first if any chunk does not have one yet.

        Raises:
            ValueError: If there are no chunks to index.
        """
        if not self.chunks:
            raise ValueError("No chunks to index.")
        if any(c.embedding is None for c in self.chunks):
            self.generate_embeddings()
        embs = np.vstack([c.embedding for c in self.chunks]).astype("float32")
        index = faiss.IndexFlatIP(int(embs.shape[1]))
        index.add(embs)
        self.index = index


### 📌 5) Retrieval Component

In [None]:
class VectorRetriever:
    """Retrieve the chunks most similar to a query from a processor's index.

    Attributes:
        processor: The DocumentProcessor that owns the embedder, chunks, index.
        top_k: Maximum number of results returned per query.
    """

    def __init__(self, processor: "DocumentProcessor", top_k=4):
        """Bind the retriever to an already indexed processor.

        Raises:
            ValueError: If the processor has no index yet, or top_k < 1.
        """
        if processor.index is None:
            raise ValueError("DocumentProcessor must have an index before creating VectorRetriever.")
        if top_k < 1:
            raise ValueError("top_k must be a positive integer.")
        self.processor = processor
        self.top_k = top_k

    def retrieve_chunks(self, query: str):
        """Return up to top_k RetrievalResult objects for the query, best first.

        Raises:
            ValueError: If the processor no longer has an index.
        """
        index = self.processor.index
        if index is None:
            raise ValueError("DocumentProcessor has no index; call create_index() first.")
        chunks = self.processor.chunks
        # Never ask for more neighbours than the index holds: FAISS pads the
        # missing slots with label -1, which Python indexing would silently
        # resolve to the last chunk.
        k = min(self.top_k, index.ntotal)
        if k < 1:
            return []
        q_emb = self.processor.embedder.encode([query], convert_to_numpy=True)
        q_emb = q_emb.astype("float32")
        # Chunk vectors are L2-normalized, so normalize the query the same way.
        faiss.normalize_L2(q_emb)
        scores, idxs = index.search(q_emb, k)
        results = []
        for score, idx in zip(scores[0], idxs[0]):
            # Skip padding labels and anything outside the chunk list.
            if idx < 0 or idx >= len(chunks):
                continue
            results.append(RetrievalResult(chunks[int(idx)], float(score), len(results) + 1))
        return results


### 📌 6) RAG Pipeline Orchestration

In [None]:
class RAGPipeline:
    """Tie retrieval and generation together into a single query call.

    Attributes:
        mm: Model manager providing ``model``, ``tokenizer``, ``model_name``
            and ``get_memory_usage()``.
        dp: The document processor (kept for reference).
        retriever: Object whose ``retrieve_chunks(query)`` returns results.
    """

    def __init__(self, mm: "UnslothModelManager", dp: "DocumentProcessor", retriever: "VectorRetriever"):
        self.mm, self.dp, self.retriever = mm, dp, retriever

    def format_context(self, results):
        """Render retrieval results as one context string.

        Each result becomes a header line with rank, score and source followed
        by the chunk text; results are separated by a blank line.
        """
        return "\n\n".join(
            f"[Rank {r.rank} | Score {r.relevance_score:.4f} | Source {r.chunk.metadata.get('source')}]\n{r.chunk.content}"
            for r in results
        )

    def generate_response(self, query, context, max_new_tokens=256):
        """Generate an answer to the query grounded in the given context.

        Args:
            query: The user's question.
            context: Context text inserted into the prompt.
            max_new_tokens: Upper bound on the number of generated tokens.

        Returns:
            The generated continuation only (prompt excluded), stripped.

        Raises:
            RuntimeError: If the model or tokenizer has not been loaded.
        """
        tokenizer = self.mm.tokenizer
        model = self.mm.model
        if model is None or tokenizer is None:
            raise RuntimeError("Model is not loaded; call load_quantized_model() first.")
        prompt = (
            "You are a helpful assistant that answers only using the provided CONTEXT. "
            "If context is insufficient, say so.\n\nCONTEXT:\n" + context + f"\n\nQuestion: {query}\nAnswer:"
        )
        # Put the inputs where the model actually lives; fall back to the
        # CUDA-availability guess only if the model exposes no device.
        device = getattr(model, "device", None)
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        prompt_len = inputs["input_ids"].shape[1]
        with torch.no_grad():
            output = model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=False)
        # The output sequence starts with the prompt tokens. Slicing by token
        # count removes the echo reliably, whereas comparing decoded text
        # fails whenever decoding does not reproduce the prompt exactly.
        new_tokens = output[0][prompt_len:]
        return tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

    def process_query(self, query: str, max_new_tokens=256):
        """Retrieve context for the query, generate an answer, and package both.

        Args:
            query: The user's question.
            max_new_tokens: Forwarded to generate_response().

        Returns:
            A RAGResponse with the answer, the retrieval results, the model
            name, and a memory snapshot taken after generation.
        """
        results = self.retriever.retrieve_chunks(query)
        context = self.format_context(results) if results else "(no relevant context)"
        answer = self.generate_response(query, context, max_new_tokens=max_new_tokens)
        return RAGResponse(query, answer, results, {"model": self.mm.model_name}, self.mm.get_memory_usage())


### 📌 7) Demo Queries 🚀

In [None]:
# End-to-end demo: load the model, index two tiny documents, ask one question.

# 1) Load quantized model (uses UnslothModelManager's default model name)
mm = UnslothModelManager()
mm.load_quantized_model()
# get_memory_usage() returns the device name plus allocated/reserved memory.
print("Model ready on:", mm.get_memory_usage())

# 2) Sample documents as (doc_id, text) pairs, the shape chunk_documents expects
docs = [
    ("design.md", "The RAG pipeline uses Unsloth dynamic 4-bit quantization to run an LLM efficiently. "
                 "It retrieves chunks from a vector store and generates grounded responses."),
    ("requirements.md", "Users can load quantized models, index documents, retrieve top-k chunks with scores, "
                        "and generate responses grounded in context with memory monitoring."),
]

# 3) Build processor + index. With the default chunk_size of 800 tokens each
#    of these short documents fits in a single chunk.
dp = DocumentProcessor()
dp.chunk_documents(docs)
dp.generate_embeddings()
dp.create_index()

# 4) Build retriever + pipeline. The index must exist before VectorRetriever
#    is constructed, otherwise its constructor raises ValueError.
# NOTE(review): only two chunks are indexed while top_k defaults to 4 —
# confirm how the index search fills the slots it has no hit for.
retriever = VectorRetriever(dp)
pipeline = RAGPipeline(mm, dp, retriever)

# 5) Ask a question: retrieval, prompt building and generation in one call
resp = pipeline.process_query("What is this pipeline about?")
print("\n\n=== Answer ===\n", resp.response)

# Show rank, score and metadata of every chunk that went into the context.
print("\n=== Retrieved Chunks ===")
for r in resp.retrieved_chunks:
    print(r.rank, f"score={r.relevance_score:.4f}", r.chunk.metadata)

# Memory snapshot taken by process_query after generation finished.
print("\n=== VRAM ===\n", resp.memory_usage)
