[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/kgweber-cwru/coding-with-ai-wn26/blob/main/week-5-building-rag-systems/assignment.ipynb)

# Week 5 Assignment: Build a Domain-Specific RAG System

## Objective
Build a complete, production-quality RAG system for your domain with proper chunking, metadata, and retrieval strategies.

## Requirements
1. Use the RAGSystem class (or build your own)
2. Process at least 15 documents with metadata
3. Implement appropriate chunking
4. Test with 5+ realistic questions
5. Demonstrate metadata filtering
6. Evaluate retrieval quality

In [None]:
import os
import sys
from pathlib import Path

IN_COLAB = "google.colab" in sys.modules

if IN_COLAB:
    !pip install -q google-genai google-auth python-dotenv numpy chromadb
    from google.colab import auth
    auth.authenticate_user()
    try:
        PROJECT_ID = input("Enter your Google Cloud Project ID (press Enter to use default ADC): ").strip()
    except Exception:
        PROJECT_ID = ""
    if PROJECT_ID:
        os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
else:
    def find_service_account_json(max_up=6):
        p = Path.cwd()
        for _ in range(max_up):
            candidate = p / "series-2-coding-llms" / "creds"
            if candidate.exists():
                for f in candidate.glob("*.json"):
                    return str(f.resolve())
            candidate2 = p / "creds"
            if candidate2.exists():
                for f in candidate2.glob("*.json"):
                    return str(f.resolve())
            p = p.parent
        return None

    sa_path = find_service_account_json()
    if sa_path:
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = sa_path
    else:
        try:
            from dotenv import load_dotenv
            load_dotenv()
        except Exception:
            pass


In [None]:
import os
import numpy as np
import chromadb
from datetime import datetime
from google import genai
from google.genai import types
import google.auth

# Resolve Application Default Credentials; the second element is the project
# associated with them.
creds, project = google.auth.default()

# An explicitly exported GOOGLE_CLOUD_PROJECT takes precedence over the
# project reported by the credentials.
project = os.environ.get("GOOGLE_CLOUD_PROJECT", project)

# Shared Gen AI client used by every cell below for embeddings and generation.
client = genai.Client(
    location="us-central1",
    project=project,
    vertexai=True,
)

print("Using project: {}".format(project))
print("✅ Environment loaded successfully!")

## Copy Classes from Concepts Notebook

(Include the Document, DocumentProcessor, ChromaDocumentStore, and RAGSystem classes)

In [None]:
class Document:
    """A piece of text together with its descriptive metadata.

    Attributes:
        content: The raw document text.
        metadata: A dict owned by this instance. It always contains a
            'created' key holding the ISO-8601 timestamp of construction,
            overwriting any 'created' value supplied by the caller.
    """

    def __init__(self, content, metadata=None):
        """Store ``content`` and a private, timestamped copy of ``metadata``.

        Args:
            content: The document text.
            metadata: Optional mapping of metadata; it is copied, never
                modified.
        """
        self.content = content
        # Copy the mapping: stamping 'created' on the caller's own dict would
        # silently modify data the caller may reuse elsewhere.
        self.metadata = dict(metadata) if metadata else {}
        self.metadata['created'] = datetime.now().isoformat()

    def __repr__(self):
        return f"Document(content={self.content[:50]}..., metadata={self.metadata})"

class DocumentProcessor:
    """Split document text into overlapping chunks for embedding."""

    @staticmethod
    def _stride(chunk_size, overlap):
        """Return how far the chunk window advances between chunks.

        ``overlap`` is clamped to ``[0, chunk_size - 1]`` so the stride is
        always at least 1. Without the clamp, ``overlap == chunk_size`` makes
        ``range()`` fail with a zero step and a larger overlap silently
        produces no chunks at all.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")
        return chunk_size - max(0, min(overlap, chunk_size - 1))

    @staticmethod
    def chunk_by_sentences(text, chunk_size=3, overlap=1):
        """Chunk by sentence count.

        Sentences are obtained by splitting on '.', so every sentence in the
        output ends with a single '.'.

        Args:
            text: The text to split.
            chunk_size: Maximum number of sentences per chunk.
            overlap: Number of sentences shared by consecutive chunks.

        Returns:
            A list of chunk strings; empty when ``text`` has no sentences.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        step = DocumentProcessor._stride(chunk_size, overlap)
        sentences = [s.strip() + '.' for s in text.split('.') if s.strip()]
        return [
            ' '.join(sentences[i:i + chunk_size])
            for i in range(0, len(sentences), step)
        ]

    @staticmethod
    def chunk_by_words(text, chunk_size=200, overlap=50):
        """Chunk by word count.

        Words are whitespace-separated tokens; runs of whitespace (including
        newlines) collapse to single spaces in the output.

        Args:
            text: The text to split.
            chunk_size: Maximum number of words per chunk.
            overlap: Number of words shared by consecutive chunks.

        Returns:
            A list of chunk strings; empty when ``text`` has no words.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        step = DocumentProcessor._stride(chunk_size, overlap)
        words = text.split()
        return [
            ' '.join(words[i:i + chunk_size])
            for i in range(0, len(words), step)
        ]

class ChromaDocumentStore:
    """Chunk, embed and index documents in a ChromaDB collection.

    Depends on names defined elsewhere in this notebook: the module-level
    Gen AI ``client``, the ``IN_COLAB`` flag and ``DocumentProcessor``.
    """

    def __init__(self, collection_name="rag_collection"):
        """Open (or create) the collection named ``collection_name``.

        In Colab a persistent client rooted at
        ``/content/drive/MyDrive/chroma_db`` is used; elsewhere the default
        ``chromadb.Client()`` is used.
        """
        if IN_COLAB:
            # NOTE(review): this only lands on Google Drive if Drive is already
            # mounted at /content/drive; nothing in this class mounts it.
            db_path = "/content/drive/MyDrive/chroma_db"
            os.makedirs(db_path, exist_ok=True)
            self.client = chromadb.PersistentClient(path=db_path)
        else:
            # chromadb.Client() is, by default, an in-memory client: nothing is
            # written to disk and the index is gone when the kernel stops.
            self.client = chromadb.Client()

        # search() reports similarity as 1 - distance, which is only a
        # similarity under the cosine metric; Chroma's default is squared L2.
        # The metric is fixed when a collection is first created, so a
        # collection that already exists keeps whatever metric it was built with.
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    def add_document(self, document, chunk_strategy='sentences', chunk_size=3, overlap=None):
        """Chunk ``document``, embed every chunk and add them to the collection.

        Args:
            document: Object exposing ``content`` (str) and ``metadata`` (dict).
            chunk_strategy: 'sentences' for sentence chunking; any other value
                selects word chunking.
            chunk_size: Sentences or words per chunk, depending on the strategy.
            overlap: Optional overlap forwarded to the chunker; when ``None``
                the chunker's own default is used.

        Returns:
            The number of chunks stored (0 when chunking produced nothing).
        """
        processor = DocumentProcessor()
        if chunk_strategy == 'sentences':
            chunker = processor.chunk_by_sentences
        else:
            chunker = processor.chunk_by_words

        if overlap is None:
            chunks = chunker(document.content, chunk_size)
        else:
            chunks = chunker(document.content, chunk_size, overlap)

        if not chunks:
            return 0

        # Get embeddings for all chunks in one API call
        response = client.models.embed_content(
            model="gemini-embedding-001",
            contents=[c.replace("\n", " ") for c in chunks]
        )
        embeddings = [e.values for e in response.embeddings]

        # The timestamp suffix keeps ids distinct when the same source is
        # added more than once.
        document_id = f"{document.metadata.get('source', 'doc')}_{datetime.now().timestamp()}"

        # Each chunk carries the document metadata plus its position, so a
        # retrieved chunk can be traced back to its document.
        total = len(chunks)
        metadatas = [
            {
                **document.metadata,
                'document_id': document_id,
                'chunk_index': i,
                'total_chunks': total,
            }
            for i in range(total)
        ]
        ids = [f"{document_id}_{i}" for i in range(total)]

        self.collection.add(
            embeddings=embeddings,
            documents=chunks,
            metadatas=metadatas,
            ids=ids
        )

        return total

    def search(self, query, top_k=5, filters=None):
        """Return the ``top_k`` chunks closest to ``query``.

        Args:
            query: The question text to embed and search for.
            top_k: Maximum number of chunks to return.
            filters: Optional metadata filter passed to Chroma as ``where``.

        Returns:
            A list of dicts with keys 'chunk', 'similarity' (1 - distance)
            and 'metadata', in the order Chroma returned them.
        """
        response = client.models.embed_content(
            model="gemini-embedding-001",
            contents=query
        )
        query_embedding = response.embeddings[0].values

        # ChromaDB handles filtering with the 'where' clause
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=filters
        )

        if not results or not results['documents']:
            return []

        # Results are nested one list per query; only one query was sent.
        return [
            {"chunk": doc, "similarity": 1 - distance, "metadata": meta}
            for doc, distance, meta in zip(
                results['documents'][0],
                results['distances'][0],
                results['metadatas'][0],
            )
        ]

    def __len__(self):
        """Return the number of chunks stored in the collection."""
        return self.collection.count()

class RAGSystem:
    """Retrieval-augmented question answering on top of a document store.

    The store is created from ``store_impl`` and must provide
    ``add_document``, ``search`` and ``__len__``.
    """

    def __init__(self, store_impl=ChromaDocumentStore):
        self.store = store_impl()

    def add_document(self, content, metadata=None, **kwargs):
        """Wrap ``content`` in a Document and index it.

        Extra keyword arguments are forwarded to the store's
        ``add_document``. Returns whatever the store returns.
        """
        return self.store.add_document(Document(content, metadata), **kwargs)

    def query(self, question, top_k=3, filters=None, temperature=0.3):
        """Answer ``question`` using the retrieved chunks as context.

        Returns a dict with 'answer' (the model's text) and 'sources' (the
        search results used). When nothing is retrieved, a fixed message and
        an empty source list are returned without calling the model.
        """
        hits = self.store.search(question, top_k=top_k, filters=filters)
        if not hits:
            return {"answer": "No relevant documents found.", "sources": []}

        # Number each retrieved chunk so the model sees distinct sources.
        labelled = []
        for position, hit in enumerate(hits, start=1):
            labelled.append(f"[Source {position}] {hit['chunk']}")
        context = "\n\n".join(labelled)

        prompt = (
            "Answer the question based on the provided context. \n"
            "If the answer is not in the context, say so.\n"
            "\n"
            "Context:\n"
            f"{context}\n"
            "\n"
            f"Question: {question}\n"
            "\n"
            "Answer:"
        )

        generation_config = types.GenerateContentConfig(temperature=temperature)
        reply = client.models.generate_content(
            model="gemini-2.5-flash-lite",
            contents=prompt,
            config=generation_config,
        )

        return {"answer": reply.text, "sources": hits}

    def stats(self):
        """Summarise the store: chunk count, distinct documents, store class.

        Distinct documents are counted from the 'document_id' metadata and
        are only available for ChromaDocumentStore; other stores get "N/A".
        """
        store = self.store
        summary = {"total_chunks": len(store)}

        if isinstance(store, ChromaDocumentStore):
            stored = store.collection.get(include=['metadatas'])
            document_ids = {meta.get('document_id', '') for meta in stored['metadatas']}
            summary["unique_documents"] = len(document_ids)
        else:
            summary["unique_documents"] = "N/A"

        summary["store_implementation"] = store.__class__.__name__
        return summary


print("✅ RAG system classes ready")


print("✅ RAG system classes ready")

## Step 1: Describe Your System

**YOUR DESCRIPTION**

Domain: [your field]

Use case: [what problem does this solve?]

Document types: [what you're indexing]

Target users: [who will use this?]

Metadata strategy: [what metadata will you track and why?]

## Step 2: Prepare Your Documents

In [None]:
# Create your document collection with metadata.
# Each entry is a dict with a "content" string and a "metadata" dict; the
# build cell in Step 3 passes both to rag.add_document().
documents = [
    {
        "content": """YOUR DOCUMENT CONTENT""",
        "metadata": {
            # "source" is used as the prefix of the stored document id.
            "source": "...",
            "category": "...",
            # Add relevant metadata; any key here can be used as a filter in Step 5
        }
    },
    # Add at least 15 documents (assignment requirement 2)
]

print(f"Prepared {len(documents)} documents")

## Step 3: Build Your RAG System

In [None]:
# Create the RAG system with its default document store
rag = RAGSystem()

# Chunking settings applied to every document
chunking_options = {
    "chunk_strategy": 'sentences',  # or 'words'
    "chunk_size": 3,  # adjust as needed
}

# Index every prepared document and report how many chunks each produced
print("Building knowledge base...")
for doc in documents:
    chunks = rag.add_document(doc["content"], doc["metadata"], **chunking_options)
    print(f"  Added document: {chunks} chunks")

print(f"\nSystem ready! Stats: {rag.stats()}")

## Step 4: Test with Questions

In [None]:
# Questions to run through the RAG system; replace with realistic ones
test_questions = [
    "YOUR QUESTION 1",
    "YOUR QUESTION 2",
    "YOUR QUESTION 3",
    "YOUR QUESTION 4",
    "YOUR QUESTION 5",
]

# Horizontal rule printed above and below each question
divider = '=' * 70

for q in test_questions:
    print(f"\n{divider}")
    print(f"Q: {q}")
    print(divider)

    result = rag.query(q, top_k=3)

    print(f"\nA: {result['answer']}")
    print("\nSources:")
    # Show score, a 100-character preview and the metadata of each source
    for rank, src in enumerate(result['sources'], start=1):
        preview = src['chunk'][:100]
        print(f"  {rank}. [Score: {src['similarity']:.3f}]")
        print(f"     {preview}...")
        print(f"     Metadata: {src['metadata']}")

## Step 5: Demonstrate Filtering

In [None]:
# Restrict retrieval to chunks whose metadata matches filter_criteria
filtered_question = "YOUR QUESTION"
filter_criteria = dict(YOUR_METADATA_KEY="YOUR_VALUE")

# Echo the question and the filter, followed by a blank line
print(
    f"Question: {filtered_question}",
    f"Filter: {filter_criteria}\n",
    sep="\n",
)

result = rag.query(filtered_question, top_k=3, filters=filter_criteria)
print(f"Answer: {result['answer']}")

## Step 6: Evaluate Retrieval Quality

In [None]:
# Manually evaluate: for each question, were the right chunks retrieved?

evaluation = [
    {
        "question": "YOUR QUESTION",
        # Set to True or False. A literal True/False here would be evaluated
        # as a division and raise ZeroDivisionError.
        "relevant_chunks_found": False,
        "answer_quality": "good/fair/poor",
        "notes": "YOUR OBSERVATIONS"
    },
    # Evaluate each test question
]

# Calculate metrics: share of questions whose relevant chunks were retrieved
if evaluation:
    success_rate = sum(1 for e in evaluation if e['relevant_chunks_found']) / len(evaluation)
    print(f"Retrieval success rate: {success_rate:.1%}")
else:
    # Nothing evaluated yet; avoid dividing by zero.
    success_rate = 0.0
    print("No evaluations recorded yet.")

## Reflection

### 1. How did chunking affect retrieval quality?

**YOUR ANSWER**

### 2. Was metadata filtering useful? When would you use it?

**YOUR ANSWER**

### 3. What improvements would make this production-ready?

**YOUR ANSWER**

### 4. What challenges did you encounter?

**YOUR ANSWER**

## Bonus: Save/Load System

Implement persistence so your system can be saved and loaded:

In [None]:
# BONUS CODE HERE