# S3 Vectors Large-Scale Test

Comprehensive testing of S3 Vectors with large datasets and IVFPQ indexing.

This notebook demonstrates:
- Large-scale vector operations (10,000 vectors in this demo run; the same flow is intended for 50k+ vectors)
- IVFPQ index creation and optimization
- Performance testing with a synthetic, template-generated text dataset
- Batch processing and efficient data handling

**Note**: This notebook requires significant compute time and memory for large datasets.

In [1]:
# Setup and check existing buckets
import sys
import os
import time
import requests
import numpy as np

# Location of the genai-vectors-py sources. The GENAI_VECTORS_SRC environment
# variable overrides it so the notebook is not tied to a single checkout; the
# previously hard-coded path remains the default.
src_path = os.environ.get(
    'GENAI_VECTORS_SRC',
    '/home/rajan/Desktop/work/genai-vectors-py/src'
)
# Re-running this cell must not keep appending the same entry to sys.path.
if src_path not in sys.path:
    sys.path.append(src_path)

# Connect to S3 Vectors
from app.s3vectors_client import create_s3vectors_client

# Each connection setting falls back to the value that used to be hard-coded
# here (a localhost endpoint with minioadmin credentials) and can be
# overridden through the environment without editing the notebook.
s3vectors_client = create_s3vectors_client(
    endpoint_url=os.environ.get('S3VECTORS_ENDPOINT_URL', 'http://localhost:8000'),
    aws_access_key_id=os.environ.get('S3VECTORS_ACCESS_KEY_ID', 'minioadmin'),
    aws_secret_access_key=os.environ.get('S3VECTORS_SECRET_ACCESS_KEY', 'minioadmin123'),
    region_name=os.environ.get('S3VECTORS_REGION', 'us-east-1')
)

# Only bucket and index names are reported; no vector counts are computed.
print("🔍 Checking existing buckets and their indexes...")

# Walk every vector bucket and print the indexes it contains. A failure while
# handling one bucket's indexes is reported and the walk moves on to the next
# bucket; a failure to list the buckets themselves ends the walk.
try:
    buckets_response = s3vectors_client.list_vector_buckets()
    buckets = buckets_response.get('vectorBuckets', [])
    print(f"Found {len(buckets)} existing buckets:")

    for bucket in buckets:
        # Deliberately not named `bucket_name` / `index_name`: those names are
        # the notebook-wide test targets, and re-running this cell after they
        # have been set must not overwrite them with whatever was listed last.
        existing_bucket_name = bucket['vectorBucketName']
        print(f"\n📦 Bucket: {existing_bucket_name}")

        # List indexes in this bucket
        try:
            indexes_response = s3vectors_client.list_indexes(
                vectorBucketName=existing_bucket_name
            )
            indexes = indexes_response.get('indexes', [])

            if indexes:
                for index in indexes:
                    existing_index_name = index['indexName']
                    print(f"  📊 Index: {existing_index_name}")
            else:
                # Make an empty bucket visible instead of printing nothing.
                print("  (no indexes)")
        except Exception as e:
            print(f"  ❌ Error listing indexes: {e}")

except Exception as e:
    print(f"❌ Error listing buckets: {e}")

In [2]:
# Embedding function using local embedding service
import requests
import numpy as np

def get_text_embedding(text):
    """Return an embedding for ``text`` as a list of floats.

    Two local HTTP services are tried in order: LM Studio on port 1234, then
    Ollama on port 11434. Whenever a service cannot be reached, answers with a
    non-200 status, or raises any other error, the reason is printed and the
    next option is used. If both fail, a random 768-dimensional vector
    (normalized to unit length) is returned, so the caller always gets a
    vector back.
    """
    # --- First choice: LM Studio ---
    try:
        lm_studio_reply = requests.post(
            "http://localhost:1234/v1/embeddings",
            headers={"Content-Type": "application/json"},
            json={
                "input": text,
                "model": "text-embedding-nomic-embed-text-v1.5",
            },
            timeout=30,
        )
        if lm_studio_reply.status_code == 200:
            return lm_studio_reply.json()['data'][0]['embedding']
        print(f"⚠️ LM Studio returned status {lm_studio_reply.status_code}, trying Ollama...")
    except requests.exceptions.ConnectionError:
        print("⚠️ LM Studio not available, trying Ollama...")
    except Exception as lm_studio_error:
        print(f"⚠️ LM Studio error: {lm_studio_error}, trying Ollama...")

    # --- Second choice: Ollama ---
    try:
        ollama_reply = requests.post(
            "http://localhost:11434/api/embeddings",
            headers={"Content-Type": "application/json"},
            json={
                "model": "nomic-embed-text",
                "prompt": text,
            },
            timeout=30,
        )
        if ollama_reply.status_code == 200:
            return ollama_reply.json()['embedding']
        print(f"⚠️ Ollama returned status {ollama_reply.status_code}")
    except requests.exceptions.ConnectionError:
        print("⚠️ Ollama not available either, using random embedding...")
    except Exception as ollama_error:
        print(f"⚠️ Ollama error: {ollama_error}, using random embedding...")

    # --- Last resort: random direction, centred on zero ---
    print("🔄 Using normalized random embedding as final fallback")
    random_vector = np.random.rand(768) - 0.5
    length = np.linalg.norm(random_vector)
    # Skip the division for an all-zero draw so it cannot divide by zero.
    if length > 0:
        random_vector = random_vector / length
    return random_vector.tolist()

# Smoke-test the embedding helper once and report the vector length it yields
test_embedding = get_text_embedding("test query")
embedding_length = len(test_embedding)
print(f"✅ Embedding function working, dimension: {embedding_length}")

In [3]:
# Bucket and index names used by the remaining cells of the large-scale test
bucket_name = "large-scale-test-50k"
index_name = "ivfpq-index"

print(f"🚀 Setting up large-scale test with {bucket_name}")

# Creation is always attempted. An error whose text contains "already exists"
# means the bucket is reused; any other error is printed and re-raised.
try:
    response = s3vectors_client.create_vector_bucket(vectorBucketName=bucket_name)
    print(f"✅ Created new bucket: {bucket_name}")
except Exception as create_error:
    bucket_already_present = "already exists" in str(create_error).lower()
    if not bucket_already_present:
        print(f"❌ Error creating bucket: {create_error}")
        raise
    print(f"📦 Using existing bucket: {bucket_name}")

In [4]:
# Generate and insert vectors in batches (10,000 here; reduced for demo).
#
# NOTE(review): put_vectors below writes to `index_name`, but this notebook
# only calls create_index for that name in a later cell. Confirm the service
# accepts writes to an index before create_index has been called.
total_vectors = 10000  # Reduced for demo
batch_size = 100

print(f"🔢 Generating and inserting {total_vectors:,} vectors...")

# Categories for diversity
categories = ["technology", "science", "history", "literature", "sports", "music", "art", "travel", "food", "nature"]

# Counts only vectors whose batch was written successfully, so the summary
# (and any later cell that reads this name) never includes a failed batch.
vectors_inserted = 0

start_time = time.time()

for batch_start in range(0, total_vectors, batch_size):
    # The last batch is shorter when total_vectors is not a multiple of batch_size.
    batch_end = min(batch_start + batch_size, total_vectors)
    batch_vectors = []

    for doc_number in range(batch_start, batch_end):
        doc_id = f"doc_{doc_number}"
        # Cycle through the categories so each one gets an equal share.
        category = categories[doc_number % len(categories)]
        text = f"This is document {doc_number} about {category} with detailed content and information."

        # Get embedding
        embedding = get_text_embedding(text)

        batch_vectors.append({
            "key": doc_id,
            "data": {"float32": embedding},
            "metadata": {
                "text": text,
                "category": category,
                "doc_id": doc_id
            }
        })

    # Insert batch; stop at the first failure rather than leaving gaps.
    try:
        s3vectors_client.put_vectors(
            vectorBucketName=bucket_name,
            indexName=index_name,
            vectors=batch_vectors
        )
    except Exception as e:
        print(f"❌ Error inserting batch {batch_start}-{batch_end - 1}: {e}")
        break

    vectors_inserted += len(batch_vectors)

    # Progress update every 1000 vectors
    if vectors_inserted % 1000 == 0 or vectors_inserted == total_vectors:
        elapsed = time.time() - start_time
        rate = vectors_inserted / elapsed if elapsed > 0 else 0
        print(f"📊 Inserted {vectors_inserted:,}/{total_vectors:,} vectors ({rate:.1f} vectors/sec)")

total_time = time.time() - start_time
# Same zero guard as the in-loop rate, in case the run ends immediately.
average_rate = vectors_inserted / total_time if total_time > 0 else 0

if vectors_inserted == total_vectors:
    print(f"\n✅ Completed! Inserted {vectors_inserted:,} vectors in {total_time:.1f} seconds")
else:
    print(f"\n⚠️ Stopped early: inserted {vectors_inserted:,} of {total_vectors:,} vectors in {total_time:.1f} seconds")
print(f"📈 Average rate: {average_rate:.1f} vectors/second")

In [5]:
# Create the index that the similarity searches run against
print(f"🔧 Creating IVFPQ index '{index_name}' for large-scale search...")

try:
    start_time = time.time()

    # 768-dimensional float32 vectors compared with the cosine metric.
    index_settings = {
        "vectorBucketName": bucket_name,
        "indexName": index_name,
        "dimension": 768,
        "dataType": "float32",
        "distanceMetric": "cosine",
    }
    response = s3vectors_client.create_index(**index_settings)

    index_time = time.time() - start_time
    print(f"✅ IVFPQ index created successfully in {index_time:.1f} seconds!")
    print(f"📊 Index details: {response}")

except Exception as index_error:
    # An "already exists" error means the index is reused; any other error
    # is printed and re-raised.
    index_already_present = "already exists" in str(index_error).lower()
    if not index_already_present:
        print(f"❌ Error creating index: {index_error}")
        raise
    print(f"📊 Index '{index_name}' already exists, using existing index")

In [6]:
# Test semantic search performance
print("🔍 Testing semantic search performance with large dataset...")

test_queries = [
    "What is artificial intelligence and machine learning?",
    "How do neural networks work in deep learning?",
    "Python programming language features and applications",
    "Vector search and similarity algorithms in databases"
]

# Untimed warm-up query issued before the measurements. A failure here is
# reported but does not stop the timed runs below.
print("🔥 Warming up index with test query...")
warmup_embedding = get_text_embedding("warmup query")
try:
    warmup_result = s3vectors_client.query_vectors(
        vectorBucketName=bucket_name,
        indexName=index_name,
        queryVector={"float32": warmup_embedding},
        topK=5,
        returnMetadata=True
    )
    print("✅ Index warmed up successfully")
except Exception as e:
    print(f"❌ Error warming up index: {e}")

# Test search performance
print("\n🏃 Running performance tests...")

for i, query_text in enumerate(test_queries, 1):
    print(f"\nQuery {i}: {query_text[:50]}...")

    # time.perf_counter() is the monotonic, high-resolution clock meant for
    # measuring short intervals; time.time() can jump with system clock
    # adjustments, which would distort millisecond-level latencies.
    query_start = time.perf_counter()
    query_embedding = get_text_embedding(query_text)
    query_time = time.perf_counter() - query_start

    search_start = time.perf_counter()
    try:
        results = s3vectors_client.query_vectors(
            vectorBucketName=bucket_name,
            indexName=index_name,
            queryVector={"float32": query_embedding},
            topK=10,
            returnMetadata=True
        )

        search_time = time.perf_counter() - search_start

        vectors = results.get('vectors', [])
        print(f"  📊 Results: {len(vectors)} vectors found")
        print(f"  ⏱️  Embedding time: {query_time*1000:.1f}ms")
        print(f"  ⏱️  Search time: {search_time*1000:.1f}ms")
        print(f"  ⏱️  Total time: {(query_time + search_time)*1000:.1f}ms")

        # Show top results
        for j, result in enumerate(vectors[:3], 1):
            key = result.get('key', 'Unknown')
            metadata = result.get('metadata', {})
            distance = result.get('distance', 0.0)
            # Reported as 1 - distance; assumes `distance` is a cosine
            # distance — TODO confirm against the service's response format.
            similarity = 1 - distance
            category = metadata.get('category', 'N/A')
            print(f"    {j}. {key} (similarity: {similarity:.3f}, category: {category})")

    except Exception as e:
        # Still report how long the failed call took.
        search_time = time.perf_counter() - search_start
        print(f"  ❌ Search failed after {search_time*1000:.1f}ms: {e}")

print("\n✅ Large-scale performance tests completed!")

In [7]:
# Test metadata filtering at scale
print("🔍 Testing metadata filtering at scale...")

# Test query
test_query = "artificial intelligence and machine learning technologies"
query_embedding = get_text_embedding(test_query)

print(f"Query: {test_query}")
print("=" * 60)


def _run_query_test(found_label, error_label, metadata_filter=None, timing_suffix=""):
    """Run one timed top-10 query with ``query_embedding`` and print the outcome.

    Args:
        found_label: Text printed after "Found N " on success,
            e.g. "technology-related results".
        error_label: Text printed after "Error in " on failure,
            e.g. "technology filter".
        metadata_filter: Filter passed as ``filter=`` to ``query_vectors``;
            when ``None`` the argument is omitted entirely (unfiltered query).
        timing_suffix: Extra text appended after the elapsed time on the
            success line, e.g. " (baseline)".

    Returns:
        The list under the response's 'vectors' key, or an empty list when
        the query raised.
    """
    query_kwargs = {
        "vectorBucketName": bucket_name,
        "indexName": index_name,
        "queryVector": {"float32": query_embedding},
        "topK": 10,
        "returnMetadata": True,
    }
    if metadata_filter is not None:
        query_kwargs["filter"] = metadata_filter

    # perf_counter is monotonic and high-resolution, which suits
    # millisecond-level latency measurements better than time.time().
    started = time.perf_counter()
    try:
        results = s3vectors_client.query_vectors(**query_kwargs)
    except Exception as e:
        elapsed_ms = (time.perf_counter() - started) * 1000
        print(f"  ❌ Error in {error_label} after {elapsed_ms:.1f}ms: {e}")
        return []

    elapsed_ms = (time.perf_counter() - started) * 1000
    try:
        vectors = results.get('vectors', [])
        print(f"  Found {len(vectors)} {found_label} in {elapsed_ms:.1f}ms{timing_suffix}:")
        for rank, result in enumerate(vectors[:3], 1):
            key = result.get('key', 'Unknown')
            metadata = result.get('metadata', {})
            distance = result.get('distance', 0.0)
            # Reported as 1 - distance; assumes a cosine distance — TODO confirm.
            similarity = 1 - distance
            category = metadata.get('category', 'N/A')
            print(f"    {rank}. {key} (similarity: {similarity:.3f}, category: {category})")
    except Exception as e:
        # A malformed response is reported the same way as a failed call.
        elapsed_ms = (time.perf_counter() - started) * 1000
        print(f"  ❌ Error in {error_label} after {elapsed_ms:.1f}ms: {e}")
        return []
    return vectors


# NOTE(review): the filter dictionaries below use an operator/metadata_key/value
# shape; AWS S3 Vectors documents a different ($eq / $in style) filter syntax.
# Presumably this is what the local client expects — confirm.

# Test 1: Filter by category = "technology"
print("📊 Test 1: Filter by category = 'technology'")
_run_query_test(
    "technology-related results",
    "technology filter",
    metadata_filter={
        "operator": "equals",
        "metadata_key": "category",
        "value": "technology"
    },
)

print()

# Test 2: Filter by category in ['science', 'technology']
print("📊 Test 2: Filter by category in ['science', 'technology']")
_run_query_test(
    "science/technology results",
    "science/technology filter",
    metadata_filter={
        "operator": "in",
        "metadata_key": "category",
        "value": ["science", "technology"]
    },
)

print()

# Test 3: No filter (baseline)
print("📊 Test 3: No filter (baseline comparison)")
_run_query_test(
    "total results",
    "baseline search",
    timing_suffix=" (baseline)",
)

print()
print("✅ Large-scale filtering tests completed!")

In [8]:
# Performance summary and statistics
print("📈 Performance Summary")
print("=" * 50)

print(f"📊 Dataset size: {vectors_inserted:,} vectors")
print("🏗️ Index type: IVFPQ")
print("📐 Vector dimension: 768")

# Test system metrics
print("\n⚙️ System Health Check")
print("-" * 30)

# Upper bound in seconds for each health probe. Without a timeout,
# requests.get can block indefinitely when the server accepts the connection
# but never answers.
health_timeout_seconds = 10

# Becomes False as soon as any check below fails, so the closing message
# does not claim success after a failed check.
all_checks_ok = True

# Health check endpoint
try:
    health_response = requests.get("http://localhost:8000/health", timeout=health_timeout_seconds)
    if health_response.status_code == 200:
        health_data = health_response.json()
        print(f"✅ API health: {health_data.get('status', 'OK')}")
        print(f"✅ Implementation: {health_data.get('implementation', 'Unknown')}")
    else:
        all_checks_ok = False
        print(f"⚠️ Health check returned status: {health_response.status_code}")
except Exception as e:
    all_checks_ok = False
    print(f"❌ Health check failed: {e}")

# Healthz check
try:
    healthz_response = requests.get("http://localhost:8000/healthz", timeout=health_timeout_seconds)
    if healthz_response.status_code == 200:
        healthz_data = healthz_response.json()
        system_ok = healthz_data.get('ok', False)
        # Show a warning marker instead of a check mark when 'ok' is falsy.
        if system_ok:
            print(f"✅ System health: {system_ok}")
        else:
            all_checks_ok = False
            print(f"⚠️ System health: {system_ok}")
    else:
        all_checks_ok = False
        print(f"⚠️ Healthz check returned status: {healthz_response.status_code}")
except Exception as e:
    all_checks_ok = False
    print(f"❌ Healthz check failed: {e}")

# Bucket list to confirm our bucket exists
try:
    buckets_response = s3vectors_client.list_vector_buckets()
    buckets = buckets_response.get('vectorBuckets', [])
    bucket_names = [b['vectorBucketName'] for b in buckets]
    if bucket_name in bucket_names:
        print(f"✅ Target bucket confirmed: {bucket_name}")
    else:
        all_checks_ok = False
        print("⚠️ Target bucket not found in bucket list")
except Exception as e:
    all_checks_ok = False
    print(f"❌ Bucket list check failed: {e}")

print()
if all_checks_ok:
    print("🎉 Large-scale test completed successfully!")
    print("Note: The S3 Vectors API is working well with large datasets")
    print("      and efficient IVFPQ indexing for semantic search.")
else:
    print("⚠️ Large-scale test finished, but one or more checks above did not pass.")