# RAG Implementation Workshop
## Setting up RAG with Custom Documentation

### Step 1: Install Required Packages


In [None]:
!pip install -q sentence-transformers chromadb plotly scikit-learn anthropic


### Step 2: Import Required Libraries

In [None]:
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import plotly.express as px
import plotly.graph_objects as go
from typing import List, Dict
import textwrap
from pathlib import Path
from google.colab import files
import os
from sklearn.decomposition import PCA
import anthropic
from IPython.display import display, HTML, clear_output

### Step 3: Document Upload and Management

### Download the 3 `.md` documents from the GitHub repo

In [None]:
def setup_docs_directory():
    """Ensure a ``docs`` folder exists in the working directory.

    Returns:
        The relative ``Path('docs')`` pointing at the folder.
    """
    target = Path('docs')
    # exist_ok keeps repeated calls (e.g. re-running the cell) harmless.
    target.mkdir(exist_ok=True)
    return target

def upload_docs_to_colab():
    """Prompt for a file upload in Colab and store the markdown files.

    Only uploads whose name ends in ``.md`` are written into the docs
    directory; other uploads are left untouched.

    Returns:
        The names of all uploaded files (including any non-markdown ones).
    """
    print("Please upload your markdown files...")
    received = files.upload()

    target_dir = setup_docs_directory()

    # Persist each markdown upload under the docs directory.
    for name, payload in received.items():
        if not name.endswith('.md'):
            continue
        (target_dir / name).write_bytes(payload)
        print(f"✅ Saved {name} to docs directory")

    return list(received)


### Step 4: RAG Implementation

In [None]:
class EnhancedRAG:
    """Small retrieval-augmented generation pipeline with visual diagnostics.

    Documents are split into overlapping word chunks, embedded with a
    SentenceTransformer model, stored in a ChromaDB collection, and retrieved
    by similarity to build the context that is sent to Claude.
    """

    def __init__(self):
        """Load the embedding model and create the ChromaDB client."""
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.chroma_client = chromadb.Client(Settings(
            persist_directory="./chroma_db"
        ))
        self.collection = None  # populated by setup_collection()
        self.chunks = []  # chunk texts of the most recently indexed documents
        self.embeddings = []  # embeddings aligned one-to-one with self.chunks

    def chunk_text(self, text: str, chunk_size: int = 200, overlap: int = 50) -> List[str]:
        """Split text into overlapping word chunks and print a short preview.

        Args:
            text: Raw text; it is split on whitespace.
            chunk_size: Maximum number of words per chunk.
            overlap: Number of words shared by consecutive chunks.

        Returns:
            The chunks in document order; an empty list for blank text.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is
                outside ``0 <= overlap < chunk_size``.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

        words = text.split()
        step = chunk_size - overlap
        chunks = []

        for start in range(0, len(words), step):
            chunks.append(' '.join(words[start:start + chunk_size]))
            # Once a window reaches the end of the text, any further window
            # would only repeat a suffix of this chunk, so stop here.
            if start + chunk_size >= len(words):
                break

        # Visualize chunking
        print(f"\n{'='*20} Chunking Visualization {'='*20}")
        print(f"Total chunks created: {len(chunks)}")

        for idx, chunk in enumerate(chunks[:3]):  # Show first 3 chunks
            print(f"\nChunk {idx + 1} Preview:")
            print(textwrap.fill(chunk[:200] + "..." if len(chunk) > 200 else chunk, width=80))
            if idx == 2 and len(chunks) > 3:
                print("\n... and", len(chunks) - 3, "more chunks")

        return chunks

    def visualize_embeddings(self, embeddings: np.ndarray, chunks: List[str]):
        """Show an interactive 3D PCA scatter plot of the chunk embeddings.

        Args:
            embeddings: One embedding row per chunk.
            chunks: Chunk texts aligned with ``embeddings``; used for labels
                and hover text.

        The plot is skipped when fewer than three embeddings are given, since
        a 3-component PCA needs at least three samples.
        """
        if len(embeddings) < 3:
            print("\nSkipping embedding visualization: at least 3 chunks are needed for a 3D plot.")
            return

        pca = PCA(n_components=3)
        embeddings_3d = pca.fit_transform(embeddings)

        fig = go.Figure(data=[go.Scatter3d(
            x=embeddings_3d[:, 0],
            y=embeddings_3d[:, 1],
            z=embeddings_3d[:, 2],
            mode='markers+text',
            text=[f"Chunk {i+1}" for i in range(len(chunks))],
            hovertext=[chunk[:100] + "..." for chunk in chunks],
            marker=dict(
                size=10,
                color=list(range(len(chunks))),
                colorscale='Viridis',
                opacity=0.8
            )
        )])

        fig.update_layout(
            title="3D Visualization of Document Embeddings",
            scene=dict(
                xaxis_title="PCA Component 1",
                yaxis_title="PCA Component 2",
                zaxis_title="PCA Component 3"
            ),
            width=800,
            height=800
        )

        fig.show()

    def setup_collection(self, documents: List[Dict[str, str]]):
        """Chunk, embed and index the documents in a ChromaDB collection.

        Args:
            documents: Dicts with a ``"text"`` key (content) and a
                ``"source"`` key (label stored in the chunk metadata).

        Returns:
            The ChromaDB collection holding the indexed chunks.

        Raises:
            ValueError: If the documents yield no chunks at all.
        """
        all_chunks = []
        metadata_list = []
        ids = []

        print("\n📄 Processing Documents and Creating Chunks...")

        for idx, doc in enumerate(documents):
            # Create metadata and IDs for each chunk
            for chunk_idx, chunk in enumerate(self.chunk_text(doc["text"])):
                metadata_list.append({
                    "source": doc["source"],
                    "chunk_index": chunk_idx,
                    "original_doc_id": idx
                })
                ids.append(f"doc_{idx}_chunk_{chunk_idx}")
                all_chunks.append(chunk)

        if not all_chunks:
            raise ValueError(
                "No text chunks to index: provide at least one non-empty document."
            )

        # Generate and store embeddings
        print("\n🔢 Generating Embeddings...")
        self.embeddings = np.asarray(self.model.encode(all_chunks))
        # Replace rather than extend so chunks stay aligned with embeddings
        # when this method is called more than once.
        self.chunks = all_chunks

        # Visualize embeddings
        self.visualize_embeddings(self.embeddings, all_chunks)

        # Cosine space makes ``1 - distance`` a cosine similarity, which is
        # what retrieve_documents() displays as the relevance score.
        self.collection = self.chroma_client.get_or_create_collection(
            name="enhanced_documentation_examples",
            metadata={"hnsw:space": "cosine"}
        )

        # Store the embeddings computed above so the index holds the same
        # vectors that were visualized; upsert keeps re-runs with the same
        # chunk IDs from colliding with earlier entries.
        self.collection.upsert(
            ids=ids,
            embeddings=self.embeddings.tolist(),
            metadatas=metadata_list,
            documents=all_chunks
        )

        print(f"\n✅ Successfully processed {len(documents)} documents into {len(all_chunks)} chunks")
        return self.collection

    def retrieve_documents(self, query: str, n_results: int = 3):
        """Query the collection and plot the relevance of the top matches.

        Args:
            query: Natural-language question.
            n_results: Maximum number of chunks to return.

        Returns:
            The raw ChromaDB query result (``documents``, ``metadatas``,
            ``distances`` ...).

        Raises:
            RuntimeError: If ``setup_collection`` has not been called yet.
        """
        if self.collection is None:
            raise RuntimeError(
                "Collection is not initialized; call setup_collection() first."
            )

        # Never ask for more results than the collection holds.
        available = self.collection.count()
        if available:
            n_results = min(n_results, available)

        # Embed the query with the same model used for the stored chunks.
        query_embedding = np.asarray(self.model.encode([query])).tolist()
        results = self.collection.query(
            query_embeddings=query_embedding,
            n_results=n_results
        )

        # Visualize relevance scores
        distances = results['distances'][0]
        metadatas = results['metadatas'][0]

        fig = go.Figure(data=[
            go.Bar(
                x=[f"Chunk {meta['chunk_index']} from {meta['source']}"
                   for meta in metadatas],
                y=[1 - dist for dist in distances],  # Convert distance to similarity
                text=[f"{(1-dist)*100:.1f}%" for dist in distances],
                textposition='auto',
            )
        ])

        fig.update_layout(
            title=f"Relevance Scores for Query: '{query}'",
            xaxis_title="Document Chunks",
            yaxis_title="Relevance Score",
            yaxis_range=[0, 1],
            width=800,
            height=400
        )

        fig.show()

        return results

    def generate_response(self, query: str, api_key: str):
        """Answer a query with Claude, using retrieved chunks as context.

        Args:
            query: The user's question.
            api_key: Anthropic API key. An empty value is passed on as
                ``None`` so the SDK can fall back to its own default lookup.

        Returns:
            The text of the first content block of Claude's reply.
        """
        # Get relevant documents
        results = self.retrieve_documents(query)
        context = "\n".join(results['documents'][0])

        # Use the key supplied by the caller; it must not be overwritten here.
        client = anthropic.Anthropic(api_key=api_key or None)

        # Create the prompt
        system_prompt = """You are an AI assistant helping with Python documentation questions.
        Use the provided context to answer questions, and maintain the style and terminology used in the context."""

        user_message = f"""Context from documentation:
        {context}

        Question: {query}

        Please provide a detailed answer based on the context provided."""

        # Generate response
        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=1000,
            temperature=0.2,
            system=system_prompt,
            messages=[{"role": "user", "content": user_message}]
        )

        return response.content[0].text



### Step 5: Document Loading

In [None]:
class DocumentLoader:
    """Handles loading documents for the RAG system."""

    def __init__(self, docs_dir: str = "docs"):
        """Initialize with the directory containing documentation files."""
        self.docs_dir = Path(docs_dir)

    def load_documents(self) -> List[Dict[str, str]]:
        """Load all markdown files from the docs directory.

        The directory (including missing parents) is created when absent.
        Files are read as UTF-8 in sorted name order so the resulting list,
        and anything derived from positions in it, is reproducible across
        runs. Files that cannot be read are reported and skipped.

        Returns:
            One dict per loaded file with ``"source"`` (file name) and
            ``"text"`` (file content).
        """
        documents = []

        print(f"📚 Loading documents from {self.docs_dir}")

        # Create directory if it doesn't exist
        self.docs_dir.mkdir(parents=True, exist_ok=True)

        # glob() yields entries in arbitrary order; sort for determinism.
        for file_path in sorted(self.docs_dir.glob("*.md")):
            try:
                content = file_path.read_text(encoding='utf-8')
            except (OSError, UnicodeDecodeError) as e:
                print(f"❌ Error loading {file_path}: {str(e)}")
                continue

            documents.append({
                "source": file_path.name,
                "text": content
            })
            print(f"✅ Loaded: {file_path.name}")

        print(f"\nTotal documents loaded: {len(documents)}")
        return documents


### Step 6: Interactive Demo

In [None]:
class RAGSession:
    """Holds an initialized RAG pipeline plus the API key used to query it."""

    def __init__(self, api_key: str):
        """Build the pipeline: load the documents, then index them.

        Args:
            api_key: Key forwarded to the pipeline on every query.
        """
        self.api_key = api_key
        self.rag = EnhancedRAG()

        # Read the documents from the loader's default directory.
        self.documents = DocumentLoader().load_documents()

        # Index the loaded documents.
        print("\n🚀 Initializing RAG System...")
        self.rag.setup_collection(self.documents)

    def process_query(self, query: str):
        """Answer one query.

        Returns:
            The generated response, or ``None`` when any error occurred
            (the error is printed rather than raised).
        """
        try:
            print("\n🔍 Retrieving relevant documents...")
            # Generating the response also displays the retrieval chart.
            answer = self.rag.generate_response(query, self.api_key)
        except Exception as err:
            print(f"❌ Error processing query: {str(err)}")
            return None
        else:
            return answer

def run_rag_demo(api_key: str):
    """Run an interactive question/answer loop on top of the RAG system.

    The pipeline is built once (documents loaded and indexed), then questions
    are read from standard input until the user types ``exit``, interrupts
    with Ctrl+C, or the input stream ends.

    Args:
        api_key: Anthropic API key passed to every response generation.
    """
    # Initialize RAG
    print("\n🚀 Initializing RAG System...")
    rag = EnhancedRAG()

    # Load documents once
    loader = DocumentLoader()
    documents = loader.load_documents()

    # Setup collection once
    rag.setup_collection(documents)
    print("\n✅ RAG System initialized and ready!")

    while True:
        try:
            # Get query
            print("\n" + "="*50)
            query = input("Enter your question (or 'exit' to quit): ").strip()

            if query.lower() == 'exit':
                print("Thank you for using the RAG demo!")
                break

            if not query:
                continue

            # Process query
            print(f"\n🔍 Processing query: {query}")

            # This will show visualizations and generate response
            response = rag.generate_response(query, api_key)

            # Print response
            print("\n🤖 Response:")
            print(response)

            # Wait for user before next query
            print("\nPress Enter for next question...")
            input()

        except (EOFError, KeyboardInterrupt):
            # A closed input stream would otherwise make input() raise on
            # every iteration and spin this loop forever; treat both cases
            # as a request to quit.
            print("\nThank you for using the RAG demo!")
            break

        except Exception as e:
            # Keep the session alive when a single query fails.
            print(f"\n❌ Error: {str(e)}")
            print("Let's try another question...")

### Step 7: Main Execution

In [None]:
def main():
    """Run the whole workflow: upload documents, read the API key, start the demo."""
    # Imported locally so this function brings its own dependency into scope.
    from getpass import getpass

    # Step 1: Upload documents
    print("Step 1: Upload your markdown files")
    uploaded_files = upload_docs_to_colab()
    print(f"\nUploaded files: {uploaded_files}")

    # Step 2: Get API key
    # getpass avoids echoing the secret (plain input() would leave it visible
    # in the output); strip() drops whitespace picked up when pasting.
    api_key = getpass("\nPlease enter your Anthropic API key: ").strip()

    # Step 3: Run the demo
    print("\nStarting RAG demo...")
    run_rag_demo(api_key)

### Run the Implementation

In [None]:
# Start the workflow only when this code runs as the main module,
# not when it is imported from elsewhere.
if __name__ == "__main__":
    main()