In [None]:
# !pip install voyageai
# !pip install langchain

In [None]:
from google.colab import userdata

In [None]:
"""
Contextualized Embedding RAG Pipeline using Voyage Context-3
Designed for comparing performance against traditional embeddings
"""

import voyageai
import numpy as np
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
# from langchain.text_splitter import SentenceSplitter # Corrected import
import json
import time

@dataclass
class Document:
    """A source document to be chunked and indexed, together with its metadata."""
    id: str  # key under which the pipeline stores the document; copied to each chunk as doc_id
    title: str  # copied into every chunk's metadata as 'doc_title'
    content: str  # full text that the splitter cuts into chunks
    metadata: Dict  # free-form mapping; merged into every chunk's metadata
    doc_type: str  # 'legal', 'technical', 'financial', etc.

@dataclass
class Chunk:
    """One piece of a document's text plus the bookkeeping that ties it back to its source."""
    text: str  # chunk text exactly as returned by the splitter
    doc_id: str  # id of the Document this chunk was cut from
    chunk_id: int  # 0-based position of the chunk within its document
    start_char: int  # offset of the chunk inside Document.content
    end_char: int  # start_char + len(text), i.e. exclusive end offset
    metadata: Dict  # document metadata plus doc_title, doc_type, chunk_position and total_chunks

@dataclass
class RetrievalResult:
    """Outcome of a single search call: ranked chunks plus timing information."""
    chunks: List[Tuple[Chunk, float]]  # (chunk, similarity_score), highest score first
    latency_ms: float  # elapsed time of the search in milliseconds, query embedding included
    method: str  # 'contextualized' or 'standard'

In [None]:
import re

class SentenceSplitter:
    """Minimal regex-based sentence splitter with a ``split_text`` method."""

    def split_text(self, text):
        """
        Break ``text`` into sentences.

        A boundary is any run of whitespace that directly follows ``.``, ``!``
        or ``?``. Each piece is stripped and empty pieces are discarded.
        """
        pieces = re.split(r'(?<=[.!?])\s+', text)

        sentences = []
        for piece in pieces:
            trimmed = piece.strip()
            if trimmed:
                sentences.append(trimmed)
        return sentences

# Use it in your pipeline:
# self.text_splitter = SentenceSplitter()

In [None]:
class ContextualizedRAGPipeline:
    """
    Retrieval pipeline that indexes the same chunks with two Voyage models so
    contextualized embeddings (``voyage-context-3``) can be compared against
    standard embeddings (``voyage-3-large``).

    Alignment invariant kept by ``index_documents``: ``contextualized_embeddings[i]``
    and ``standard_embeddings[i]`` always describe ``chunks_store[i]``. Either
    list may be shorter than ``chunks_store`` (that method has not been run for
    the newest chunks yet) but never out of order.
    """

    # Model settings shared by indexing and querying
    CONTEXTUAL_MODEL = "voyage-context-3"
    STANDARD_MODEL = "voyage-3-large"
    OUTPUT_DIMENSION = 1024  # requested for the contextualized model
    SEARCH_METHODS = ('contextualized', 'standard')

    def __init__(self, api_key: str, chunk_size: int = 512, chunk_overlap: int = 0):
        """
        Args:
            api_key: Voyage AI API key used to build the client.
            chunk_size: Stored on the instance only; chunking is sentence-based
                and does not consult this value.
            chunk_overlap: Stored on the instance only (see ``chunk_size``).
        """
        # Honour the key the caller passed instead of re-reading a Colab secret
        self.client = voyageai.Client(api_key=api_key)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = SentenceSplitter()

        # Storage for embeddings and chunks (see the alignment invariant above)
        self.chunks_store: List["Chunk"] = []
        self.contextualized_embeddings: List[np.ndarray] = []
        self.standard_embeddings: List[np.ndarray] = []
        self.documents: Dict[str, "Document"] = {}

    def chunk_document(self, document: "Document") -> List["Chunk"]:
        """
        Split a document into sentence chunks carrying offsets and metadata.

        Args:
            document: The document to split.

        Returns:
            One ``Chunk`` per sentence, in document order. Empty when the
            splitter yields nothing (e.g. blank content).
        """
        sentences = self.text_splitter.split_text(document.content)
        total = len(sentences)

        chunks = []
        cursor = 0  # search from here so repeated sentences map to successive offsets

        for i, text in enumerate(sentences):
            start_pos = document.content.find(text, cursor)
            if start_pos == -1:
                # The splitter returned text that is not a verbatim slice of the
                # content; fall back to the cursor so offsets stay non-negative
                # and monotonic instead of going to -1.
                start_pos = cursor
            end_pos = start_pos + len(text)

            chunks.append(Chunk(
                text=text,
                doc_id=document.id,
                chunk_id=i,
                start_char=start_pos,
                end_char=end_pos,
                metadata={
                    **document.metadata,
                    'doc_title': document.title,
                    'doc_type': document.doc_type,
                    'chunk_position': i / total,  # Relative position
                    'total_chunks': total
                }
            ))
            cursor = end_pos

        return chunks

    @staticmethod
    def _group_chunks_by_document(chunks) -> List[List[str]]:
        """Group consecutive chunk texts into one list per document (a chunk_id of 0 opens a new group)."""
        groups: List[List[str]] = []
        for chunk in chunks:
            if chunk.chunk_id == 0 or not groups:
                groups.append([])
            groups[-1].append(chunk.text)
        return groups

    @staticmethod
    def _check_embedding_count(received: int, expected: int, method: str) -> None:
        """Raise if the API returned a different number of vectors than chunks sent."""
        if received != expected:
            raise RuntimeError(
                f"{method} embedding call returned {received} vectors for {expected} chunks"
            )

    def index_documents(self, documents: List["Document"],
                       use_contextualized: bool = True,
                       use_standard: bool = True) -> Dict:
        """
        Chunk the given documents and embed every chunk that is not embedded yet.

        Each enabled method embeds all chunks in ``chunks_store`` that it has not
        covered so far, so a method skipped in an earlier call catches up here
        and the embedding lists stay aligned with ``chunks_store``. Passing a
        document whose id was indexed before appends its chunks again.

        Args:
            documents: Documents to add to the index.
            use_contextualized: Build/extend the contextualized index.
            use_standard: Build/extend the standard index.

        Returns:
            Stats keyed by method name. ``embeddings_generated`` is the number of
            vectors the index holds after the call; ``total_tokens`` (contextualized
            only) is the API-reported usage of this call, 0 if nothing was sent.

        Raises:
            RuntimeError: If the API returns a different number of vectors than
                chunks submitted (the index is left unchanged for that method).
        """
        print(f"Indexing {len(documents)} documents...")

        # Register and chunk every document. Chunks of one document are appended
        # contiguously, which is what lets them be regrouped per document later
        # even when two documents share an id.
        for doc in documents:
            self.documents[doc.id] = doc
            self.chunks_store.extend(self.chunk_document(doc))

        indexing_stats = {}

        # Generate contextualized embeddings
        if use_contextualized:
            start_time = time.perf_counter()

            pending = self.chunks_store[len(self.contextualized_embeddings):]
            # One inner list per document; documents without chunks never appear
            inputs = self._group_chunks_by_document(pending)

            total_tokens = 0
            if inputs:
                embeds_obj = self.client.contextualized_embed(
                    inputs=inputs,
                    model=self.CONTEXTUAL_MODEL,
                    input_type="document",
                    output_dimension=self.OUTPUT_DIMENSION
                )

                # Flatten while keeping order (assumes results come back in
                # input order -- TODO confirm against the client docs)
                new_embeddings = [
                    embedding
                    for result in embeds_obj.results
                    for embedding in result.embeddings
                ]
                self._check_embedding_count(len(new_embeddings), len(pending), 'contextualized')
                self.contextualized_embeddings.extend(new_embeddings)
                total_tokens = embeds_obj.total_tokens

            indexing_stats['contextualized'] = {
                'time_seconds': time.perf_counter() - start_time,
                'total_tokens': total_tokens,
                'embeddings_generated': len(self.contextualized_embeddings)
            }

        # Generate standard embeddings for comparison
        if use_standard:
            start_time = time.perf_counter()

            # Only embed what is new instead of re-embedding the whole store
            pending_texts = [
                chunk.text
                for chunk in self.chunks_store[len(self.standard_embeddings):]
            ]

            if pending_texts:
                standard_embeds = self.client.embed(
                    texts=pending_texts,
                    model=self.STANDARD_MODEL,
                    input_type="document"
                )
                new_embeddings = list(standard_embeds.embeddings)
                self._check_embedding_count(len(new_embeddings), len(pending_texts), 'standard')
                self.standard_embeddings.extend(new_embeddings)

            indexing_stats['standard'] = {
                'time_seconds': time.perf_counter() - start_time,
                'embeddings_generated': len(self.standard_embeddings)
            }

        return indexing_stats

    def search(self, query: str, top_k: int = 10,
              method: str = 'contextualized') -> "RetrievalResult":
        """
        Return the ``top_k`` chunks most similar to ``query``.

        Similarity is the dot product between the query vector and each stored
        chunk vector.

        Args:
            query: Natural-language query.
            top_k: Maximum number of chunks to return; values <= 0 yield an
                empty result.
            method: ``'contextualized'`` or ``'standard'``.

        Returns:
            ``RetrievalResult`` with chunks sorted by descending score. Empty
            when the chosen index holds no embeddings (no API call is made).

        Raises:
            ValueError: If ``method`` is not one of the two supported names.
        """
        if method not in self.SEARCH_METHODS:
            # Previously any other value silently ran the standard search while
            # the result was labelled with the misspelled name.
            raise ValueError(
                f"Unknown search method {method!r}; expected one of {self.SEARCH_METHODS}"
            )

        start_time = time.perf_counter()

        if method == 'contextualized':
            embeddings = self.contextualized_embeddings
        else:
            embeddings = self.standard_embeddings

        # Only rows that have a matching chunk can be returned
        usable = min(len(embeddings), len(self.chunks_store))
        if top_k <= 0 or usable == 0:
            return RetrievalResult(
                chunks=[],
                latency_ms=(time.perf_counter() - start_time) * 1000,
                method=method
            )

        if method == 'contextualized':
            # A query is embedded as a single-chunk "document"
            query_embed = self.client.contextualized_embed(
                inputs=[[query]],
                model=self.CONTEXTUAL_MODEL,
                input_type="query",
                output_dimension=self.OUTPUT_DIMENSION
            ).results[0].embeddings[0]
        else:
            query_embed = self.client.embed(
                texts=[query],
                model=self.STANDARD_MODEL,
                input_type="query"
            ).embeddings[0]

        # Calculate similarities
        similarities = np.dot(
            np.asarray(embeddings[:usable], dtype=float),
            np.asarray(query_embed, dtype=float)
        )

        # Highest scores first; slicing copes with top_k > usable
        top_indices = np.argsort(similarities)[-top_k:][::-1]

        results = [
            (self.chunks_store[int(idx)], float(similarities[idx]))
            for idx in top_indices
        ]

        return RetrievalResult(
            chunks=results,
            latency_ms=(time.perf_counter() - start_time) * 1000,
            method=method
        )

    @staticmethod
    def _summarize_result(result, preview: int = 3) -> Dict:
        """Condense a RetrievalResult into the per-method dict used in comparison reports."""
        leading = result.chunks[:preview]
        return {
            # Key name kept for compatibility; the value is a list of the first `preview` texts
            'top_chunk': [chunk.text for chunk, _ in leading],
            'top_score': result.chunks[0][1] if result.chunks else 0,
            'latency_ms': result.latency_ms,
            'top_doc_ids': [chunk.doc_id for chunk, _ in leading]
        }

    def compare_retrieval_methods(self, queries: List[str], top_k: int = 5) -> Dict:
        """
        Run every query through both retrieval methods and collect the results.

        NOTE(review): the two scores come from different embedding models, so the
        printed "Score improvement" is indicative only -- check the retrieved
        chunks and doc ids before drawing conclusions.

        Args:
            queries: Queries to evaluate.
            top_k: Number of chunks requested from each method.

        Returns:
            ``{'queries': [...per-query comparison...], 'summary': {...averages...}}``.
        """
        comparison_results = []

        for query in queries:
            print(f"\nQuery: {query}")

            # Search with both methods
            context_results = self.search(query, top_k, 'contextualized')
            standard_results = self.search(query, top_k, 'standard')

            query_comparison = {
                'query': query,
                'contextualized': self._summarize_result(context_results),
                'standard': self._summarize_result(standard_results)
            }
            comparison_results.append(query_comparison)

            # Print summary
            context_top = query_comparison['contextualized']['top_score']
            standard_top = query_comparison['standard']['top_score']
            print(f"  Contextualized top score: {context_top:.4f}")
            print(f"  Standard top score: {standard_top:.4f}")
            print(f"  Score improvement: {(context_top - standard_top):.4f}")

        return {
            'queries': comparison_results,
            'summary': self._calculate_summary_metrics(comparison_results)
        }

    def _calculate_summary_metrics(self, results: List[Dict]) -> Dict:
        """
        Average the per-query top scores and latencies of both methods.

        Returns plain Python floats; every average is 0.0 when ``results`` is
        empty (instead of NaN plus a NumPy warning).
        """
        def average(values) -> float:
            return float(np.mean(values)) if values else 0.0

        contextualized_scores = [r['contextualized']['top_score'] for r in results]
        standard_scores = [r['standard']['top_score'] for r in results]

        return {
            'avg_contextualized_score': average(contextualized_scores),
            'avg_standard_score': average(standard_scores),
            'avg_score_improvement': average(
                [c - s for c, s in zip(contextualized_scores, standard_scores)]
            ),
            'avg_latency_contextualized_ms': average(
                [r['contextualized']['latency_ms'] for r in results]
            ),
            'avg_latency_standard_ms': average(
                [r['standard']['latency_ms'] for r in results]
            )
        }

In [None]:
# Example usage for testing long technical documentation
def create_test_documents() -> List[Document]:
    """
    Build the three-document demo corpus (AWS S3, Azure Blob Storage and a
    FinTech compliance guide) used to compare the two retrieval methods.

    The documents share vocabulary (encryption, storage tiers, retention
    periods), so many sentences are ambiguous once cut out of their document.
    """
    # NOTE: the whitespace in front of each closing triple quote is part of the
    # string literal and is kept exactly as it was.
    aws_s3_docs = Document(
        id="aws_s3_docs",
        title="AWS S3 Technical Documentation",
        content="""
Amazon Simple Storage Service (Amazon S3) is an object storage service offering
industry-leading scalability, data availability, security, and performance.

Storage Classes:
S3 Standard is designed for frequently accessed data with low latency requirements.
It delivers low latency and high throughput performance.

S3 Standard-IA (Infrequent Access) is designed for data that is accessed less
frequently but requires rapid access when needed. It offers lower storage pricing
than S3 Standard.

S3 Glacier Instant Retrieval delivers low-cost storage for long-lived data that
is rarely accessed and requires retrieval in milliseconds.

Encryption:
All Amazon S3 buckets have encryption configured by default. The default encryption
uses server-side encryption with Amazon S3 managed keys (SSE-S3). You can also use
server-side encryption with AWS KMS keys (SSE-KMS) or customer-provided keys (SSE-C).

For SSE-KMS, AWS KMS generates and manages the cryptographic keys. KMS uses
envelope encryption with AES-256-GCM. The data key is encrypted under a KMS key
that never leaves AWS KMS unencrypted.

Access Control:
S3 Block Public Access provides settings for access points, buckets, and accounts
to help you manage public access to Amazon S3 resources. By default, new buckets,
access points, and objects don't allow public access.

Bucket policies are JSON-based access policy language that can be used to add or
deny permissions for different principals and actions. IAM policies specify what
actions are allowed or denied on AWS resources.
            """,
        metadata={'source': 'AWS Documentation', 'version': '2024-Q4'},
        doc_type='technical'
    )

    azure_blob_docs = Document(
        id="azure_blob_docs",
        title="Azure Blob Storage Documentation",
        content="""
Azure Blob Storage is Microsoft's object storage solution for the cloud optimized
for storing massive amounts of unstructured data.

Storage Tiers:
Hot tier is optimized for storing data that is accessed frequently. It offers the
highest storage costs but lowest access costs.

Cool tier is optimized for storing data that is infrequently accessed and stored
for at least 30 days. It has lower storage costs but higher access costs than Hot.

Archive tier is optimized for data that is rarely accessed and stored for at least
180 days with flexible latency requirements. It offers the lowest storage costs.

Encryption:
Azure Storage encryption is enabled by default for all storage accounts using
256-bit AES encryption. Data is encrypted using Microsoft-managed keys by default.

Customer-managed keys with Azure Key Vault can be configured. These keys are
stored in your Azure Key Vault instance. The storage service uses envelope
encryption where data is encrypted with a DEK, which is then encrypted with a KEK.

Access Management:
Azure role-based access control (Azure RBAC) is the authorization system built on
Azure Resource Manager that provides fine-grained access management. You can assign
roles to users, groups, and applications at a certain scope.

Shared Access Signatures (SAS) provide secure delegated access to resources in your
storage account. With a SAS, you have granular control over how a client can access
your data including validity interval and allowed IP addresses.
            """,
        metadata={'source': 'Azure Documentation', 'version': '2024-Q4'},
        doc_type='technical'
    )

    fintech_compliance = Document(
        id="fintech_compliance",
        title="FinTech Compliance Requirements Document",
        content="""
Regulatory Compliance Framework for Digital Payment Platforms

PCI DSS Requirements:
All payment card data must be encrypted both in transit and at rest. The encryption
standard must meet AES-256 minimum requirements. Network segmentation must isolate
cardholder data environment from other networks.

Requirement 3.4 specifically mandates that PAN must be rendered unreadable anywhere
it is stored including portable digital media, backup media, and logs. Strong
cryptography with associated key-management processes must be implemented.

GDPR Compliance:
The platform must implement privacy by design principles. Users must provide explicit
consent for data processing. The right to erasure (right to be forgotten) must be
technically implemented within 30 days of request.

Data minimization principles require that only data necessary for the specified
purpose should be collected. The platform must maintain detailed records of all
data processing activities including purpose, categories, and retention periods.

AML/KYC Procedures:
Customer identification program (CIP) must verify identity within a reasonable time
after account opening. This includes collecting name, date of birth, address, and
identification number.

Transaction monitoring systems must flag suspicious activities including structuring,
rapid movement of funds, and transactions with high-risk jurisdictions. The system
must generate SARs (Suspicious Activity Reports) within 30 days of detection.

Data Retention:
Transaction records must be retained for 5 years from the date of transaction. KYC
documentation must be retained for 5 years after the business relationship ends.
Audit logs must be retained for 7 years and must be immutable.
            """,
        metadata={'source': 'Compliance Department', 'version': '2024-Q3'},
        doc_type='legal'
    )

    return [aws_s3_docs, azure_blob_docs, fintech_compliance]


In [None]:

# Test queries that benefit from context
def create_test_queries() -> List[str]:
    """
    Return the evaluation queries for the contextualized-vs-standard comparison.

    Most answers sit in a sentence that does not itself name the product or
    regulation asked about. The misspellings in the last two queries are
    reproduced as-is (presumably deliberate noisy-input probes).
    """
    queries = (
        # Answer "AES-256-GCM" sits in a sentence that never mentions AWS
        "What encryption does AWS use for SSE-KMS?",
        # Answer "5 years" is separated from "KYC" by a line break / other retention rules
        "How long must KYC documentation be retained?",
        # "lowest storage costs" only makes sense with the Azure tier context
        "What are the storage costs for Archive tier?",
        # "30 days" needs the surrounding GDPR section
        "How quickly must right to erasure be implemented?",
        # Expected answer: "7 years"
        "What is the data retion policy for audit logs?",
        "What is the policty arond CIP?",
    )
    return list(queries)

# Main execution example
if __name__ == "__main__":
    # Build the pipeline with the Voyage key kept in Colab's secret store
    pipeline = ContextualizedRAGPipeline(
        api_key=userdata.get('VOYAGE'),
        chunk_size=512,
        chunk_overlap=0,  # As recommended by Voyage
    )

    # Index the demo corpus with both embedding methods
    documents = create_test_documents()
    indexing_stats = pipeline.index_documents(documents)
    print("Indexing Statistics:\n" + json.dumps(indexing_stats, indent=2))

    # Run every test query through both retrieval methods.
    # `comparison` stays a module-level name; aggregate averages are in comparison['summary'].
    queries = create_test_queries()
    comparison = pipeline.compare_retrieval_methods(queries=queries, top_k=5)

Indexing 3 documents...
Indexing Statistics:
{
  "contextualized": {
    "time_seconds": 0.6755492687225342,
    "total_tokens": 933,
    "embeddings_generated": 49
  },
  "standard": {
    "time_seconds": 0.6564440727233887,
    "embeddings_generated": 49
  }
}

Query: What encryption does AWS use for SSE-KMS?
  Contextualized top score: 0.6144
  Standard top score: 0.7523
  Score improvement: -0.1380

Query: How long must KYC documentation be retained?
  Contextualized top score: 0.5676
  Standard top score: 0.6816
  Score improvement: -0.1140

Query: What are the storage costs for Archive tier?
  Contextualized top score: 0.5613
  Standard top score: 0.5958
  Score improvement: -0.0346

Query: How quickly must right to erasure be implemented?
  Contextualized top score: 0.5024
  Standard top score: 0.7033
  Score improvement: -0.2009

Query: What is the data retion policy for audit logs?
  Contextualized top score: 0.4802
  Standard top score: 0.7008
  Score improvement: -0.2206

Qu

In [None]:
# Per-query report: the same four lines for each retrieval method
for query_result in comparison['queries']:
    print(f"Query: {query_result['query']}")
    for heading, method_key in (("Contextualized", "contextualized"), ("Standard", "standard")):
        method_summary = query_result[method_key]
        print(f"  {heading}:")
        print(f"    Top Chunk: {method_summary['top_chunk']}")
        print(f"    Top Doc IDs: {method_summary['top_doc_ids']}")
        print(f"    Top Score: {method_summary['top_score']}")
    print("-" * 20)

Query: What encryption does AWS use for SSE-KMS?
  Contextualized:
    Top Chunk: ['KMS uses \nenvelope encryption with AES-256-GCM.', 'You can also use \nserver-side encryption with AWS KMS keys (SSE-KMS) or customer-provided keys (SSE-C).', 'The data key is encrypted under a KMS key \nthat never leaves AWS KMS unencrypted.']
    Top Doc IDs: ['aws_s3_docs', 'aws_s3_docs', 'aws_s3_docs']
    Top Score: 0.6143524661891981
  Standard:
    Top Chunk: ['KMS uses \nenvelope encryption with AES-256-GCM.', 'For SSE-KMS, AWS KMS generates and manages the cryptographic keys.', 'The default encryption \nuses server-side encryption with Amazon S3 managed keys (SSE-S3).']
    Top Doc IDs: ['aws_s3_docs', 'aws_s3_docs', 'aws_s3_docs']
    Top Score: 0.7523243177605102
--------------------
Query: How long must KYC documentation be retained?
  Contextualized:
    Top Chunk: ['KYC \ndocumentation must be retained for 5 years after the business relationship ends.', 'Data Retention:\nTransaction record