# RAG Indexing Notebook

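This notebook demonstrates how to set up a minimal RAG (Retrieval-Augmented Generation) pipeline: it indexes a few sample documents with mock embeddings, retrieves the most similar ones by cosine similarity, and produces rule-based recommendations.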

In [None]:
import json
from typing import List, Dict, Any
import numpy as np

# Sample corpus for the RAG demo; every entry becomes a dict with
# "id", "title" and "content" keys (in that order).
documents = [
    {"id": doc_id, "title": title, "content": content}
    for doc_id, title, content in (
        (
            "spark_tuning_1",
            "Performance Tuning Guide",
            "Spark performance tuning involves optimizing resource allocation, partition count, and shuffle operations.",
        ),
        (
            "spark_partitioning",
            "Partitioning Strategy",
            "Proper partitioning is crucial for Spark performance. Consider data distribution and executor count.",
        ),
        (
            "delta_optimization",
            "Delta Lake Optimization",
            "Delta Lake offers ACID transactions and optimizations like Z-ordering and compaction.",
        ),
        (
            "broadcast_joins",
            "Broadcast Join Strategy",
            "Use broadcast joins when one DataFrame is small enough to fit in memory.",
        ),
        (
            "skew_handling",
            "Handling Data Skew",
            "Data skew can cause performance issues. Use salting or repartitioning to handle it.",
        ),
    )
]

# Report how many documents were loaded, then list their titles one per line.
print(f"Loaded {len(documents)} documents")
for doc in documents:
    print(f"- {doc['title']}")

In [None]:
# Simulate document embeddings (in a real system, use an embedding model instead)
def create_mock_embeddings(
    documents: List[Dict], dim: int = 384, seed: int = 42
) -> Dict[str, np.ndarray]:
    """
    Create mock (random) embeddings for documents.

    The vectors are drawn from a standard normal distribution and carry no
    semantic information: they depend only on ``seed`` and on the position of
    each document in ``documents``, not on the document's id or content.
    In production, use actual embedding models like sentence-transformers.

    Args:
        documents: Dicts that each contain an ``'id'`` key.
        dim: Length of every embedding vector (384 by default).
        seed: Value passed to ``np.random.seed`` before sampling (42 by default).

    Returns:
        Mapping from document id to a 1-D array of length ``dim``. If two
        documents share an id, the later one overwrites the earlier one.

    Raises:
        ValueError: If ``dim`` is not positive.
        KeyError: If a document has no ``'id'`` key.

    Note:
        This reseeds NumPy's *global* random state, so subsequent
        ``np.random`` calls in the same session become deterministic too.
    """
    if dim <= 0:
        raise ValueError(f"dim must be a positive integer, got {dim}")

    # Global reseed is kept deliberately: code that runs afterwards may rely
    # on the random stream continuing from this known state.
    np.random.seed(seed)

    # One draw per document, in input order, so results are reproducible.
    return {doc['id']: np.random.randn(dim) for doc in documents}

# Generate the mock embeddings, then report how many there are and their vector length.
embeddings = create_mock_embeddings(documents)
print(f"\nCreated embeddings for {len(embeddings)} documents")
print(f"Embedding dimension: {list(embeddings.values())[0].shape[0]}")

In [None]:
# Create vector store
class SimpleVectorStore:
    """
    Minimal in-memory vector store.

    Keeps documents and their embeddings in two dicts keyed by document id and
    answers queries with a brute-force cosine-similarity scan.
    """

    def __init__(self):
        # document id -> document dict
        self.documents = {}
        # document id -> embedding vector
        self.embeddings = {}

    def add_documents(self, documents: List[Dict], embeddings: Dict):
        """
        Add documents and their embeddings to the store.

        Args:
            documents: Dicts that each contain an ``'id'`` key.
            embeddings: Mapping from document id to embedding vector; must
                contain an entry for every document being added.

        Raises:
            KeyError: If any document has no matching embedding. The check
                runs before anything is stored, so a failed call leaves the
                store unchanged.
        """
        documents = list(documents)  # allow a second pass even for one-shot iterables
        missing = [doc['id'] for doc in documents if doc['id'] not in embeddings]
        if missing:
            raise KeyError(f"No embedding provided for document id(s): {missing}")

        for doc in documents:
            self.documents[doc['id']] = doc
            self.embeddings[doc['id']] = embeddings[doc['id']]

    def search(self, query_embedding: np.ndarray, top_k: int = 3) -> List[Dict]:
        """
        Return the ``top_k`` stored documents most similar to the query.

        Similarity is cosine similarity. When either vector has zero norm the
        cosine is undefined, so the similarity is reported as 0.0 instead of
        NaN (NaN would make the sort order meaningless).

        Args:
            query_embedding: Query vector with the same length as the stored
                embeddings.
            top_k: Maximum number of results; values <= 0 yield an empty list.

        Returns:
            Shallow copies of the matching documents, best match first, each
            with an added ``'similarity'`` key (float rounded to 4 decimals).
            The stored documents themselves are not modified.
        """
        if top_k <= 0:
            # A negative slice bound would silently drop results from the end.
            return []

        # Loop-invariant: compute the query norm once.
        query_norm = np.linalg.norm(query_embedding)

        similarities = []
        for doc_id, doc_embedding in self.embeddings.items():
            denominator = query_norm * np.linalg.norm(doc_embedding)
            if denominator == 0:
                similarity = 0.0
            else:
                similarity = np.dot(query_embedding, doc_embedding) / denominator
            similarities.append((doc_id, similarity))

        # Stable sort: ties keep insertion order.
        similarities.sort(key=lambda pair: pair[1], reverse=True)

        results = []
        for doc_id, similarity in similarities[:top_k]:
            doc = self.documents[doc_id].copy()
            doc['similarity'] = round(float(similarity), 4)
            results.append(doc)

        return results

# Build the store and load every document together with its embedding
vector_store = SimpleVectorStore()
vector_store.add_documents(documents=documents, embeddings=embeddings)
print("Vector store initialized with documents and embeddings")

In [None]:
# Test RAG retrieval
# Create a mock query embedding (in production, encode the actual query text
# with the same model used for the documents). The vector is random, so the
# ranking printed below is not semantically meaningful.
query = "How to optimize Spark performance with data skew?"
# Use a dedicated seeded generator instead of NumPy's global stream so that
# re-running this cell always produces the same query vector.
# 384 must match the dimension of the document embeddings.
query_embedding = np.random.default_rng(0).standard_normal(384)

# Retrieve relevant documents
results = vector_store.search(query_embedding, top_k=3)

print(f"Query: {query}")
print(f"\nTop {len(results)} Relevant Documents:")
print("=" * 60)

for i, result in enumerate(results, 1):
    print(f"\n{i}. {result['title']} (similarity: {result['similarity']})")
    content = result['content']
    # Only append an ellipsis when the preview was actually truncated
    preview = content if len(content) <= 80 else content[:80] + "..."
    print(f"   Content: {preview}")

In [None]:
# Generate recommendations using RAG
def generate_rag_recommendations(query: str, retrieved_docs: List[Dict]) -> List[str]:
    """
    Generate canned recommendations for a query.

    This is a placeholder for the generation step: recommendations are chosen
    purely by case-insensitive keyword matching on ``query``. In production,
    use an LLM like GPT-4 or Claude and pass it the retrieved documents as
    context.

    Args:
        query: The user's question.
        retrieved_docs: Documents returned by retrieval. Accepted for
            interface compatibility but not consulted by this placeholder.

    Returns:
        Recommendations in rule order ('skew' first, then 'partition');
        an empty list when no keyword matches.
    """
    # keyword -> recommendations emitted when the keyword occurs in the query
    rules = (
        ('skew', (
            "Use salting technique for join operations",
            "Consider repartitioning with even distribution",
            "Use adaptive partitioning strategy",
        )),
        ('partition', (
            "Optimize partition count based on data size",
            "Consider date-based partitioning strategy",
        )),
    )

    query_lower = query.lower()  # lower-case once instead of once per rule
    recommendations = []
    for keyword, advice in rules:
        if keyword in query_lower:
            recommendations.extend(advice)

    return recommendations

# Produce recommendations for the query and print them as a numbered list
recommendations = generate_rag_recommendations(query, results)

print("\n" + "=" * 60, "Generated Recommendations:", "=" * 60, sep="\n")

for i, rec in enumerate(recommendations, start=1):
    print(f"{i}. {rec}")

## RAG Pipeline Summary

This notebook demonstrates the Retrieval-Augmented Generation (RAG) pipeline:

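1. **Document Collection**: Gather Spark documentation (here: a handful of hard-coded sample documents)
2. **Embedding**: Convert documents to embeddings using transformers (here: mocked with seeded random vectors)
3. **Vector Store**: Store embeddings in a vector database such as Pinecone/Weaviate (here: an in-memory `SimpleVectorStore`)
4. **Retrieval**: Semantic search for relevant documents (here: cosine similarity against a mock query embedding)
5. **Generation**: Use retrieved context to generate recommendations (here: keyword-based rules stand in for an LLM)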

This approach combines the benefits of:
- Retrieval-based systems (factual accuracy)
- Generative models (natural language generation)