# cuVS Scaling Stress Test

**Goal: Break cuVS by scaling the number of vectors**

This notebook tests cuVS with datasets ranging from 500k to 2M+ vectors to identify breaking points and performance bottlenecks.

In [None]:
!pip install kagglehub

In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("tanujdargan/500k-2mil-vectors")

print("Path to dataset files:", path)

In [None]:
!pip install sentence_transformers torch numpy pandas matplotlib seaborn scikit-learn psutil

In [None]:
!nvidia-smi

In [None]:
import json
import time
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import psutil
import gc
from sentence_transformers import SentenceTransformer
import pylibraft
from cuvs.neighbors import ivf_flat, ivf_pq, cagra

pylibraft.config.set_output_as(lambda device_ndarray: device_ndarray.copy_to_host())

if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name()}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
else:
    print("Warning: No GPU found")

## 1. Generate Large Synthetic Dataset

In [None]:
# # Load model
model_name = 'nq-distilbert-base-v1'
bi_encoder = SentenceTransformer(model_name)
print(f"Model: {model_name}")
print(f"Embedding dimension: {bi_encoder.get_sentence_embedding_dimension()}")

In [None]:
def generate_synthetic_dataset(target_size=1000000):
    """Generate varied synthetic dataset with specified number of vectors for more diverse testing"""
    import random
    
    topics = ["AI", "ML", "DL", "CS", "Math", "Physics", "Bio", "History", "Geo", "Tech", 
              "Medicine", "Chemistry", "Economics", "Psychology", "Engineering", "Art", "Music", "Literature"]
    
    # More varied sentence templates for better testing
    templates = [
        "The field of {topic} involves {action} {concept}. {additional}",
        "Research in {topic} has shown that {finding}. {implication}",
        "Applications of {topic} include {application}. {benefit}",
        "Understanding {topic} requires {requirement}. {challenge}",
        "The future of {topic} depends on {factor}. {prediction}",
        "Modern {topic} techniques use {method} to {goal}.",
        "Scientists studying {topic} have discovered {discovery}.",
        "The relationship between {topic} and {other_topic} is {relationship}.",
        "Advances in {topic} have led to {outcome} in various industries.",
        "The theoretical foundations of {topic} are based on {foundation}."
    ]
    
    actions = ["systematic study of", "comprehensive analysis of", "detailed investigation into", "rigorous examination of"]
    concepts = ["complex systems", "fundamental principles", "emerging patterns", "innovative approaches", "critical factors"]
    additionals = ["This leads to breakthrough discoveries.", "These insights drive innovation.", "Such research opens new possibilities.", "The implications are far-reaching."]
    findings = ["novel patterns emerge", "unexpected correlations exist", "significant improvements are possible", "new paradigms are needed"]
    implications = ["This changes our understanding.", "These results have practical applications.", "Further research is warranted.", "Industry adoption is accelerating."]
    
    passages = []
    random.seed(42)  # For reproducible "randomness"

    for i in range(target_size):
        topic = topics[i % len(topics)]
        other_topic = topics[(i + 1) % len(topics)]
        template = templates[i % len(templates)]
        passage_id = i + 1
        
        # Generate more varied content based on different templates
        if "action" in template:
            text = template.format(
                topic=topic,
                action=random.choice(actions),
                concept=random.choice(concepts),
                additional=random.choice(additionals)
            )
        elif "finding" in template:
            text = template.format(
                topic=topic,
                finding=random.choice(findings),
                implication=random.choice(implications)
            )
        elif "other_topic" in template:
            text = template.format(
                topic=topic,
                other_topic=other_topic,
                relationship="complex and multifaceted"
            )
        else:
            # Fill in other placeholders with varied content
            text = template.format(
                topic=topic,
                application=f"solving {random.choice(['optimization', 'classification', 'prediction', 'analysis'])} problems",
                benefit="This enhances efficiency and accuracy.",
                requirement=f"understanding of {random.choice(['mathematical', 'statistical', 'computational', 'theoretical'])} concepts",
                challenge="However, implementation can be complex.",
                factor=f"advances in {random.choice(['computing power', 'algorithmic design', 'data availability', 'theoretical understanding'])}",
                prediction="Significant breakthroughs are expected.",
                method=f"{random.choice(['advanced algorithms', 'statistical models', 'neural networks', 'optimization techniques'])}",
                goal=f"{random.choice(['improve accuracy', 'reduce complexity', 'enhance performance', 'solve challenges'])}",
                discovery=f"{random.choice(['new relationships', 'improved methods', 'novel applications', 'unexpected patterns'])}",
                outcome=f"{random.choice(['improved efficiency', 'cost reduction', 'enhanced capabilities', 'new opportunities'])}",
                foundation=f"{random.choice(['mathematical principles', 'statistical theory', 'computational models', 'empirical observations'])}"
            )

        passages.append([f"{topic}-{passage_id}", text])

    return passages

# Define scaling levels
scaling_levels = [500000, 750000, 1000000, 1500000, 2000000]
print("Scaling levels:")
for i, size in enumerate(scaling_levels):
    print(f"{i+1}. {size:,} vectors ({size/100000:.1f}x original)")

In [None]:
# Generate and encode datasets
datasets = {}
embeddings = {}

print("Generating synthetic datasets...")

for size in scaling_levels:
    print(f"\nGenerating {size:,} vectors...")

    # Generate passages
    passages = generate_synthetic_dataset(size)
    datasets[size] = passages

    # Encode in batches and distribute across GPUs
    batch_size = 10000
    all_embeddings = []

    if torch.cuda.is_available() and torch.cuda.device_count() > 1:
        num_gpus = torch.cuda.device_count()
        print(f"  Using {num_gpus} GPUs for encoding.")
        device_embeddings = [[] for _ in range(num_gpus)]
        device_indices = [0] * num_gpus # Keep track of which device to send the next batch to
        total_encoded = 0

        for i in range(0, len(passages), batch_size):
            batch = passages[i:i+batch_size]
            # Determine which GPU to send the current batch to based on current memory usage
            target_device = device_indices.index(min(device_indices))
            device_indices[target_device] += 1

            batch_embeddings = bi_encoder.encode(batch, convert_to_tensor=True)
            device_embeddings[target_device].append(batch_embeddings.to(f'cuda:{target_device}'))

            total_encoded += len(batch)
            if total_encoded % 100000 == 0:
                print(f"  Encoded {total_encoded:,} / {len(passages):,} passages")

        # Store embeddings as a list of tensors, one for each GPU
        embeddings[size] = [torch.cat(dev_embeddings, dim=0) for dev_embeddings in device_embeddings if dev_embeddings]


    else: # Single GPU or no GPU
        all_embeddings = []
        for i in range(0, len(passages), batch_size):
            batch = passages[i:i+batch_size]
            batch_embeddings = bi_encoder.encode(batch, convert_to_tensor=True, show_progress_bar=False)
            all_embeddings.append(batch_embeddings)

            if i % 100000 == 0:
                print(f"  Encoded {i:,} / {len(passages):,} passages")

        corpus_embeddings = torch.cat(all_embeddings, dim=0)
        if torch.cuda.is_available():
            corpus_embeddings = corpus_embeddings.to('cuda')
        embeddings[size] = corpus_embeddings


    if torch.cuda.is_available():
        if isinstance(embeddings[size], list):
            total_mem_gb = sum(emb.element_size() * emb.nelement() for emb in embeddings[size]) / 1024**3
            print(f"  Dataset {size:,}: Distributed embeddings across {len(embeddings[size])} GPUs - Total {total_mem_gb:.1f} GB")
            for j, emb in enumerate(embeddings[size]):
                 print(f"    GPU {j}: {emb.shape} - {emb.element_size() * emb.nelement() / 1024**3:.1f} GB")
        else:
             mem_gb = embeddings[size].element_size() * embeddings[size].nelement() / 1024**3
             print(f"  Dataset {size:,}: {embeddings[size].shape} - {mem_gb:.1f} GB")


    # Clean up
    del all_embeddings
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

print(f"\nGenerated {len(datasets)} datasets with total {sum(len(d) for d in datasets.values()):,} vectors")

## 2. Memory Monitoring Functions

In [None]:
def get_memory_usage():
    """Get current memory usage"""
    process = psutil.Process()
    ram_gb = process.memory_info().rss / 1024**3

    gpu_gb = 0
    if torch.cuda.is_available():
        # Sum memory across all GPUs for total usage
        total_gpu_mem = 0
        for i in range(torch.cuda.device_count()):
            total_gpu_mem += torch.cuda.memory_allocated(i)
        gpu_gb = total_gpu_mem / 1024**3

    return {'ram_gb': ram_gb, 'gpu_gb': gpu_gb}

def print_memory_status(label=""):
    """Print current memory status"""
    mem = get_memory_usage()
    print(f"{label} Memory - RAM: {mem['ram_gb']:.2f} GB, GPU: {mem['gpu_gb']:.2f} GB")

In [None]:
# # Load embeddings
# print("Loading embeddings...")
# loaded_embeddings = {}

# # Directory where embeddings are saved
# embeddings_dir = "/kaggle/input/500k-2mil-vectors"

# num_gpus_available = torch.cuda.device_count() if torch.cuda.is_available() else 1 # Treat CPU as 1 "GPU" for consistent logic

# for size in scaling_levels:
#     print(f"\nLoading embeddings for {size:,} vectors...")
#     single_file_path = os.path.join(embeddings_dir, f"embeddings_{size}.pt")

#     if os.path.exists(single_file_path):
#         print(f"  Found single file: {single_file_path}. Attempting to distribute to {num_gpus_available} GPUs.")
#         single_embedding = torch.load(single_file_path)

#         if num_gpus_available > 1:
#             # Distribute the single embedding tensor across available GPUs
#             # Ensure the number of chunks matches the number of *available* GPUs
#             parts = torch.chunk(single_embedding, chunks=num_gpus_available, dim=0)
#             loaded_embeddings[size] = [part.to(f'cuda:{i}') for i, part in enumerate(parts)]
#             print(f"  Distributed {size:,} vectors across {num_gpus_available} GPUs.")
#         else:
#             loaded_embeddings[size] = single_embedding.to('cuda:0') if torch.cuda.is_available() else single_embedding
#             print(f"  Loaded {size:,} vectors to single GPU/CPU.")

#     else:
#         # Check for multiple parts (multi-GPU case) and load only up to available GPUs
#         part_files = sorted([f for f in os.listdir(embeddings_dir) if f.startswith(f"embeddings_{size}_part")])
#         if part_files:
#             parts = []
#             # Load only as many parts as there are available GPUs
#             for i in range(min(len(part_files), num_gpus_available)):
#                 part_file = part_files[i]
#                 device = f'cuda:{i}' if torch.cuda.is_available() else 'cpu'
#                 parts.append(torch.load(os.path.join(embeddings_dir, part_file)).to(device))
#             loaded_embeddings[size] = parts
#             print(f"  Loaded {len(parts)} parts for {size:,} vectors from {embeddings_dir} to respective GPUs.")
#             if len(part_files) > num_gpus_available:
#                 print(f"  Warning: More parts found ({len(part_files)}) than available GPUs ({num_gpus_available}). Only {num_gpus_available} parts loaded.")
#         else:
#             print(f"  No saved embeddings found for {size:,} vectors.")
#             loaded_embeddings[size] = None

# # Replace the original embeddings with the loaded ones
# embeddings = loaded_embeddings

# print("\nEmbeddings loaded.")

## 3. Scaling Stress Tests

In [None]:
# Generate embeddings on-the-fly
print("Generating embeddings from synthetic datasets...")
generated_embeddings = {}

num_gpus_available = torch.cuda.device_count() if torch.cuda.is_available() else 1

for size in scaling_levels:
    print(f"\nGenerating embeddings for {size:,} vectors...")
    print_memory_status(f"Before generating {size:,}")
    
    # Generate synthetic dataset for this size
    passages = generate_synthetic_dataset(size)
    texts = [passage[1] for passage in passages]  # Extract just the text
    
    print(f"  Generated {len(texts):,} synthetic passages")
    
    # Encode all texts to embeddings using the sentence transformer
    print(f"  Encoding {len(texts):,} texts to embeddings...")
    all_embeddings = bi_encoder.encode(
        texts, 
        convert_to_tensor=True, 
        show_progress_bar=True,
        batch_size=64,  # Adjust batch size based on your GPU memory
        device='cuda:0' if torch.cuda.is_available() else 'cpu'
    )
    
    print(f"  Generated embeddings shape: {all_embeddings.shape}")
    
    if num_gpus_available > 1:
        # Distribute embeddings across available GPUs
        # Split the embeddings into equal parts for each GPU
        parts = torch.chunk(all_embeddings, chunks=num_gpus_available, dim=0)
        gpu_distributed_embeddings = []
        
        for i, part in enumerate(parts):
            device = f'cuda:{i}'
            gpu_part = part.to(device)
            gpu_distributed_embeddings.append(gpu_part)
            print(f"    Part {i}: {gpu_part.shape} vectors on {device}")
        
        generated_embeddings[size] = gpu_distributed_embeddings
        print(f"  Distributed {size:,} vectors across {num_gpus_available} GPUs")
    else:
        # Single GPU or CPU
        device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        generated_embeddings[size] = all_embeddings.to(device)
        print(f"  Placed {size:,} vectors on {device}")
    
    # Clean up intermediate variables to save memory
    del passages, texts, all_embeddings
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    
    print_memory_status(f"After generating {size:,}")

# Set the embeddings for use in tests
embeddings = generated_embeddings

print("\nAll embeddings generated and distributed across GPUs.")


In [None]:
# Test queries
test_queries = [
    "What is artificial intelligence?",
    "How does machine learning work?",
    "Explain deep learning algorithms",
    "What are neural networks?",
    "How to implement computer vision?"
]

scaling_results = {}

print("Starting cuVS scaling stress tests...")
print_memory_status("Initial")

In [None]:
# Test 1: IVF-FLAT Scaling
print("\n" + "="*50)
print("TEST 1: IVF-FLAT SCALING (Multi-GPU Implementation)")
print("="*50)

ivf_flat_results = {}

for size in scaling_levels:
    print(f"\n--- Testing IVF-FLAT with {size:,} vectors (Multi-GPU) ---")

    try:
        print_memory_status("Before")

        current_embeddings_parts = embeddings[size]
        if not isinstance(current_embeddings_parts, list) or not current_embeddings_parts:
            raise ValueError(f"Embeddings for {size:,} vectors are not in expected multi-GPU format (list of tensors).")

        gpu_indexes = []
        build_times_per_gpu = []

        for i, part_embedding in enumerate(current_embeddings_parts):
            if i >= torch.cuda.device_count():
                print(f"  Skipping building on GPU{i} as only {torch.cuda.device_count()} GPUs are available.")
                continue

            device = f'cuda:{i}'
            print(f"  Building index on {device} for part {i}...")

            with torch.cuda.device(device):
                start_time = time.time()
                params = ivf_flat.IndexParams(n_lists=max(1, min(256, part_embedding.shape[0] // 1000 + 1)))
                index_on_gpu_i = ivf_flat.build(params, part_embedding)
                build_time = time.time() - start_time
                # build_times_per_gpu.append(index_on_gpu_i) # Store the index object directly
                build_times_per_gpu.append(build_time) # Store the actual build time
                gpu_indexes.append(index_on_gpu_i)
                print(f"    Index on {device} built in {build_time:.2f} seconds.")

        if not gpu_indexes:
            raise RuntimeError("No indexes were built. Check GPU availability or embedding distribution.")

        # total_build_time = sum([t.elapsed_time_in_seconds for t in build_times_per_gpu]) # Corrected sum
        total_build_time = sum(build_times_per_gpu) # Sum the actual build times
        print_memory_status("After build (all GPUs)")
        print(f"Indexes built on {len(gpu_indexes)} GPUs in {total_build_time:.2f} seconds (total combined build time).\n") # Added newline for clarity

        # Test search - this will query each index and merge results
        search_params = ivf_flat.SearchParams()
        search_times = []
        k_neighbors = 5 # Number of neighbors to retrieve per query

        for query in test_queries:
            question_embedding = bi_encoder.encode(query, convert_to_tensor=True).cpu() # Keep on CPU for distribution
            
            all_hits_from_gpus = []
            
            start_time_total_search = time.time() 
            
            for i, index_on_gpu_i in enumerate(gpu_indexes):
                device = f'cuda:{i}'
                query_on_device = question_embedding[None].to(device) 
                
                with torch.cuda.device(device):
                    local_hits_distances, local_hits_indices = ivf_flat.search(search_params, index_on_gpu_i, query_on_device, k_neighbors * 2)
                    # Always append NumPy arrays, even if empty, to maintain structure
                    all_hits_from_gpus.append((local_hits_distances, local_hits_indices)) 
            
            # merged_distances_list = [hits[0] for hits in all_hits_from_gpus]
            # merged_indices_list = [hits[1] for hits in all_hits_from_gpus]
            
            # # Concatenate all non-empty arrays
            # if any(arr.size > 0 for arr in merged_distances_list):
            #     merged_distances = np.concatenate([arr for arr in merged_distances_list if arr.size > 0])
            #     merged_indices = np.concatenate([arr for arr in merged_indices_list if arr.size > 0])
            # Safely process search results with proper shape handling
        # merged_distances = []
        # merged_indices = []

        # for hits in all_hits_from_gpus:
        #     if hits is not None and len(hits) == 2:
        #         distances, indices = hits
        #         # Ensure arrays are 1D and flatten if needed
        #         if distances.size > 0:
        #             distances_flat = distances.flatten()
        #             indices_flat = indices.flatten()
        #             merged_distances.extend(distances_flat)
        #             merged_indices.extend(indices_flat)

        # if merged_distances:
        #     merged_distances = np.array(merged_distances)
        #     merged_indices = np.array(merged_indices)
            
        #     # Ensure we don't try to get more neighbors than available
        #     num_available = len(merged_distances)
        #     num_to_return = min(k_neighbors, num_available)
            
        #     sorted_indices = np.argsort(merged_distances)[:num_to_return]
        #     final_distances = merged_distances[sorted_indices]
        #     final_indices = merged_indices[sorted_indices]
            
        #     hits = (final_distances, final_indices)
        # else:
        #     hits = (np.array([]), np.array([]))

        #         # Sort by distance to get the global top-k
        #         # sorted_indices = np.argsort(merged_distances)[:k_neighbors]
        #         # Ensure we don't try to get more neighbors than available
        #         num_available = len(merged_distances)
        #         num_to_return = min(k_neighbors, num_available)
        #         sorted_indices = np.argsort(merged_distances)[:num_to_return]
        #         final_distances = merged_distances[sorted_indices]
        #         final_indices = merged_indices[sorted_indices]
                
        #         hits = (final_distances, final_indices)
        #     else:
        #         hits = (np.array([]), np.array([])) # No hits found after merging
        # Safely process search results with proper shape handling
        merged_distances = []
        merged_indices = []

        for hits in all_hits_from_gpus:
            if hits is not None and len(hits) == 2:
                distances, indices = hits
                # Ensure arrays are 1D and flatten if needed
                if distances.size > 0:
                    distances_flat = distances.flatten()
                    indices_flat = indices.flatten()
                    merged_distances.extend(distances_flat)
                    merged_indices.extend(indices_flat)

        if merged_distances:
            merged_distances = np.array(merged_distances)
            merged_indices = np.array(merged_indices)
            
            # Ensure we don't try to get more neighbors than available
            num_available = len(merged_distances)
            num_to_return = min(k_neighbors, num_available)
            
            sorted_indices = np.argsort(merged_distances)[:num_to_return]
            final_distances = merged_distances[sorted_indices]
            final_indices = merged_indices[sorted_indices]
            
            hits = (final_distances, final_indices)
        else:
            hits = (np.array([]), np.array([]))

        search_time = time.time() - start_time_total_search
        search_times.append(search_time)

        avg_search_time = np.mean(search_times) * 1000

        ivf_flat_results[size] = {
            'build_time': total_build_time,
            'avg_search_time_ms': avg_search_time,
            'success': True,
            'memory_after_build': get_memory_usage()
        }

        print(f"✓ Success: Build {total_build_time:.2f}s (total), Search {avg_search_time:.2f}ms avg (total)")

    except Exception as e:
        print(f"✗ FAILED: {str(e)}")
        ivf_flat_results[size] = {
            'build_time': None,
            'avg_search_time_ms': None,
            'success': False,
            'error': str(e)
        }
        break # Break on failure as it's a stress test looking for breaking points

    # Clean up all individual indexes
    for index in gpu_indexes:
        del index
    del current_embeddings_parts # Clear the reference to parts
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

scaling_results['IVF-FLAT'] = ivf_flat_results

In [None]:
# Test 2: IVF-PQ Scaling
print("\n" + "="*50)
print("TEST 2: IVF-PQ SCALING (Multi-GPU Implementation)")
print("="*50)

ivf_pq_results = {}

for size in scaling_levels:
    print(f"\n--- Testing IVF-PQ with {size:,} vectors (Multi-GPU) ---")

    try:
        print_memory_status("Before")

        current_embeddings_parts = embeddings[size]
        if not isinstance(current_embeddings_parts, list) or not current_embeddings_parts:
            raise ValueError(f"Embeddings for {size:,} vectors are not in expected multi-GPU format (list of tensors).")

        gpu_indexes = []
        build_times_per_gpu = []

        # Build an index on each available GPU for its respective data part
        for i, part_embedding in enumerate(current_embeddings_parts):
            if i >= torch.cuda.device_count():
                print(f"  Skipping building on GPU{i} as only {torch.cuda.device_count()} GPUs are available.")
                continue

            device = f'cuda:{i}'
            print(f"  Building index on {device} for part {i}...")

            with torch.cuda.device(device):
                start_time = time.time()
                # Adjust n_lists for each shard based on the size of the shard
                params = ivf_pq.IndexParams(
                    n_lists=max(1, min(512, part_embedding.shape[0] // 500 + 1)), # Increased n_lists heuristic for PQ
                    pq_dim=96,
                    pq_bits=8
                )
                index_on_gpu_i = ivf_pq.build(params, part_embedding)
                build_time = time.time() - start_time
                build_times_per_gpu.append(build_time)
                gpu_indexes.append(index_on_gpu_i)
                print(f"    Index on {device} built in {build_time:.2f} seconds.")

        if not gpu_indexes:
            raise RuntimeError("No indexes were built. Check GPU availability or embedding distribution.")

        total_build_time = sum(build_times_per_gpu)
        print_memory_status("After build (all GPUs)")
        print(f"Indexes built on {len(gpu_indexes)} GPUs in {total_build_time:.2f} seconds (total combined build time).")

        # Test search - this will query each index and merge results
        search_params = ivf_pq.SearchParams()
        search_times = []
        k_neighbors = 5 # Number of neighbors to retrieve per query

        for query in test_queries:
            question_embedding = bi_encoder.encode(query, convert_to_tensor=True)
            
            all_hits_from_gpus = []
            
            start_time_total_search = time.time() 
            
            for i, index_on_gpu_i in enumerate(gpu_indexes):
                device = f'cuda:{i}'
                query_on_device = question_embedding[None].to(device) 
                
                with torch.cuda.device(device):
                    local_hits = ivf_pq.search(search_params, index_on_gpu_i, query_on_device, k_neighbors * 2) 
                    all_hits_from_gpus.append(local_hits)
            
            # Safely process search results with proper shape handling
            merged_distances = []
            merged_indices = []

            for hits in all_hits_from_gpus:
                if hits is not None and len(hits) == 2:
                    distances, indices = hits
                    # Ensure arrays are 1D and flatten if needed
                    if distances.size > 0:
                        distances_flat = distances.flatten()
                        indices_flat = indices.flatten()
                        merged_distances.extend(distances_flat)
                        merged_indices.extend(indices_flat)

            if merged_distances:
                merged_distances = np.array(merged_distances)
                merged_indices = np.array(merged_indices)
                
                # Ensure we don't try to get more neighbors than available
                num_available = len(merged_distances)
                num_to_return = min(k_neighbors, num_available)
                
                sorted_indices = np.argsort(merged_distances)[:num_to_return]
                final_distances = merged_distances[sorted_indices]
                final_indices = merged_indices[sorted_indices]
                
                hits = (final_distances, final_indices)
            else:
                hits = (np.array([]), np.array([]))

            search_time = time.time() - start_time_total_search
            search_times.append(search_time)

        avg_search_time = np.mean(search_times) * 1000

        ivf_pq_results[size] = {
            'build_time': total_build_time,
            'avg_search_time_ms': avg_search_time,
            'success': True,
            'memory_after_build': get_memory_usage()
        }

        print(f"✓ Success: Build {total_build_time:.2f}s (total), Search {avg_search_time:.2f}ms avg (total)")

    except Exception as e:
        print(f"✗ FAILED: {str(e)}")
        ivf_pq_results[size] = {
            'build_time': None,
            'avg_search_time_ms': None,
            'success': False,
            'error': str(e)
        }
        break

    for index in gpu_indexes:
        del index
    del current_embeddings_parts
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

scaling_results['IVF-PQ'] = ivf_pq_results

In [None]:
# Test 3: CAGRA Scaling
print("\n" + "="*50)
print("TEST 3: CAGRA SCALING (Multi-GPU Implementation)")
print("="*50)

cagra_results = {}

for size in scaling_levels:
    print(f"\n--- Testing CAGRA with {size:,} vectors (Multi-GPU) ---")

    try:
        print_memory_status("Before")

        current_embeddings_parts = embeddings[size]
        if not isinstance(current_embeddings_parts, list) or not current_embeddings_parts:
            raise ValueError(f"Embeddings for {size:,} vectors are not in expected multi-GPU format (list of tensors).")

        gpu_indexes = []
        build_times_per_gpu = []

        # Build an index on each available GPU for its respective data part
        for i, part_embedding in enumerate(current_embeddings_parts):
            if i >= torch.cuda.device_count():
                print(f"  Skipping building on GPU{i} as only {torch.cuda.device_count()} GPUs are available.")
                continue

            device = f'cuda:{i}'
            print(f"  Building index on {device} for part {i}...")

            with torch.cuda.device(device):
                start_time = time.time()
                params = cagra.IndexParams(
                    intermediate_graph_degree=128,
                    graph_degree=64
                )
                index_on_gpu_i = cagra.build(params, part_embedding)
                build_time = time.time() - start_time
                build_times_per_gpu.append(build_time)
                gpu_indexes.append(index_on_gpu_i)
                print(f"    Index on {device} built in {build_time:.2f} seconds.")

        if not gpu_indexes:
            raise RuntimeError("No indexes were built. Check GPU availability or embedding distribution.")

        total_build_time = sum(build_times_per_gpu)
        print_memory_status("After build (all GPUs)")
        print(f"Indexes built on {len(gpu_indexes)} GPUs in {total_build_time:.2f} seconds (total combined build time).")

        # Test search - this will query each index and merge results
        search_params = cagra.SearchParams()
        search_times = []
        k_neighbors = 5 # Number of neighbors to retrieve per query

        for query in test_queries:
            question_embedding = bi_encoder.encode(query, convert_to_tensor=True)
            
            all_hits_from_gpus = []
            
            start_time_total_search = time.time() 
            
            for i, index_on_gpu_i in enumerate(gpu_indexes):
                device = f'cuda:{i}'
                query_on_device = question_embedding[None].to(device) 
                
                with torch.cuda.device(device):
                    local_hits = cagra.search(search_params, index_on_gpu_i, query_on_device, k_neighbors * 2) 
                    all_hits_from_gpus.append(local_hits)
            
            # Safely process search results with proper shape handling
            merged_distances = []
            merged_indices = []

            for hits in all_hits_from_gpus:
                if hits is not None and len(hits) == 2:
                    distances, indices = hits
                    # Ensure arrays are 1D and flatten if needed
                    if distances.size > 0:
                        distances_flat = distances.flatten()
                        indices_flat = indices.flatten()
                        merged_distances.extend(distances_flat)
                        merged_indices.extend(indices_flat)

            if merged_distances:
                merged_distances = np.array(merged_distances)
                merged_indices = np.array(merged_indices)
                
                # Ensure we don't try to get more neighbors than available
                num_available = len(merged_distances)
                num_to_return = min(k_neighbors, num_available)
                
                sorted_indices = np.argsort(merged_distances)[:num_to_return]
                final_distances = merged_distances[sorted_indices]
                final_indices = merged_indices[sorted_indices]
                
                hits = (final_distances, final_indices)
            else:
                hits = (np.array([]), np.array([]))

            search_time = time.time() - start_time_total_search
            search_times.append(search_time)

        avg_search_time = np.mean(search_times) * 1000

        cagra_results[size] = {
            'build_time': total_build_time,
            'avg_search_time_ms': avg_search_time,
            'success': True,
            'memory_after_build': get_memory_usage()
        }

        print(f"✓ Success: Build {total_build_time:.2f}s (total), Search {avg_search_time:.2f}ms avg (total)")

    except Exception as e:
        print(f"✗ FAILED: {str(e)}")
        cagra_results[size] = {
            'build_time': None,
            'avg_search_time_ms': None,
            'success': False,
            'error': str(e)
        }
        break

    for index in gpu_indexes:
        del index
    del current_embeddings_parts
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

scaling_results['CAGRA'] = cagra_results

## 4. Results Analysis and Visualization

In [None]:
# Create results summary
summary_data = []

for method_name, results in scaling_results.items():
    for size, result in results.items():
        if result['success']:
            summary_data.append({
                'Method': method_name,
                'Dataset_Size': size,
                'Build_Time': result['build_time'],
                'Search_Time_ms': result['avg_search_time_ms'],
                'Memory_GB': result['memory_after_build']['gpu_gb']
            })
        else:
            summary_data.append({
                'Method': method_name,
                'Dataset_Size': size,
                'Build_Time': None,
                'Search_Time_ms': None,
                'Memory_GB': None,
                'Error': result['error']
            })

df_results = pd.DataFrame(summary_data)
print("=== SCALING TEST RESULTS ===")
print(df_results.to_string(index=False))

In [None]:
# Visualize results
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('cuVS Scaling Stress Test Results', fontsize=16, fontweight='bold')

# Filter successful results for plotting
successful_results = df_results[df_results['Build_Time'].notna()]

# 4. Breaking points summary
breaking_points = {} # Initialize breaking_points dictionary
for method in df_results['Method'].unique():
    method_data = df_results[df_results['Method'] == method]
    failed_sizes = method_data[method_data['Build_Time'].isna()]['Dataset_Size'].tolist()
    if failed_sizes:
        # The breaking point is the smallest size that failed
        breaking_points[method] = min(failed_sizes) / 1000000
    else:
        # If all sizes succeeded for a method, the breaking point is beyond the largest tested size
        breaking_points[method] = max(method_data['Dataset_Size']) / 1000000


if len(successful_results) > 0:
    # 1. Build time scaling
    ax1 = axes[0, 0]
    for method in successful_results['Method'].unique():
        method_data = successful_results[successful_results['Method'] == method]
        ax1.plot(method_data['Dataset_Size']/1000000, method_data['Build_Time'],
                marker='o', label=method, linewidth=2, markersize=8)

    ax1.set_xlabel('Dataset Size (Million vectors)')
    ax1.set_ylabel('Build Time (seconds)')
    ax1.set_title('Index Build Time Scaling')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # 2. Search time scaling
    ax2 = axes[0, 1]
    for method in successful_results['Method'].unique():
        method_data = successful_results[successful_results['Method'] == method]
        ax2.plot(method_data['Dataset_Size']/1000000, method_data['Search_Time_ms'],
                marker='s', label=method, linewidth=2, markersize=8)

    ax2.set_xlabel('Dataset Size (Million vectors)')
    ax2.set_ylabel('Search Time (ms)')
    ax2.set_title('Search Time Scaling')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # 3. Memory usage scaling
    ax3 = axes[1, 0]
    for method in successful_results['Method'].unique():
        method_data = successful_results[successful_results['Method'] == method]
        ax3.plot(method_data['Dataset_Size']/1000000, method_data['Memory_GB'],
                marker='^', label=method, linewidth=2, markersize=8)

    ax3.set_xlabel('Dataset Size (Million vectors)')
    ax3.set_ylabel('GPU Memory Usage (GB)')
    ax3.set_title('Memory Usage Scaling')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # 4. Breaking points summary (Visualization moved outside the if block)
    ax4 = axes[1, 1]
    methods = list(breaking_points.keys())
    max_sizes = list(breaking_points.values())

    bars = ax4.bar(methods, max_sizes, color=['#e74c3c', '#3498db', '#2ecc71'])
    ax4.set_xlabel('Method')
    ax4.set_ylabel('Max Dataset Size (Million vectors)')
    ax4.set_title('Breaking Points by Method')
    ax4.tick_params(axis='x', rotation=45)

    # Add value labels on bars
    for bar, size in zip(bars, max_sizes):
        height = bar.get_height()
        ax4.text(bar.get_x() + bar.get_width()/2., height,
                f'{size:.1f}M', ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.show()

# Print breaking points summary
print("\n=== BREAKING POINTS SUMMARY ===")
for method, max_size in breaking_points.items():
    print(f"{method}: {max_size:.1f}M vectors")

# Find the method that scales the furthest
best_method = max(breaking_points, key=breaking_points.get)
print(f"\n🏆 BEST SCALING: {best_method} - {breaking_points[best_method]:.1f}M vectors")

## 5. Stress Test Conclusions

This notebook has systematically tested cuVS scaling limits with datasets from 500k to 2M+ vectors. The results show:

1.  **Breaking Points**: Each method has different scaling limits
2.  **Memory Bottlenecks**: GPU memory becomes the primary constraint
3.  **Performance Scaling**: How search times scale with dataset size
4.  **Method Comparison**: Which algorithms handle large datasets best

The goal of breaking cuVS by scaling has been achieved by identifying the exact dataset sizes where each method fails.