diff --git a/engine/base_client/client.py b/engine/base_client/client.py
index f0fc7ab9..8f648bed 100644
--- a/engine/base_client/client.py
+++ b/engine/base_client/client.py
@@ -180,8 +180,9 @@ def run_experiment(
         # Extract mixed workload parameters
         insert_fraction = 0.0
+        seed = None
         if mixed_workload_params:
-            insert_fraction = mixed_workload_params.get("insert_fraction", 0.1)
+            insert_fraction = mixed_workload_params.get("insert_fraction", 0.0)
             seed = mixed_workload_params.get("seed", None)
             if seed is not None:
                 random.seed(seed)  # Set seed for reproducible patterns
diff --git a/engine/base_client/search.py b/engine/base_client/search.py
index bfbe80cf..b66f3a8a 100644
--- a/engine/base_client/search.py
+++ b/engine/base_client/search.py
@@ -109,6 +109,9 @@ def search_all(
         )
         self.setup_search()
 
+        # Reset the doc_id counter so any state initialized during client setup is discarded
+        self.__class__._doc_id_counter = None
+
         search_one = functools.partial(self.__class__._search_one, top=top)
         insert_one = functools.partial(self.__class__._insert_one)
 
@@ -155,94 +158,192 @@ def cycling_query_generator(queries, total_count):
             used_queries = queries_list
             total_query_count = len(used_queries)
 
-        if parallel == 1:
-            # Create a progress bar with the correct total
-            pbar = tqdm.tqdm(total=total_query_count, desc="Processing queries", unit="queries")
-
-            # Single-threaded execution
-            start = time.perf_counter()
-
-            # Process queries with progress updates
-            results = []
-            total_insert_count = 0
-            total_search_count = 0
-            all_insert_latencies = []
-            all_search_latencies = []
-
-            for query in used_queries:
-                if random.random() < insert_fraction:
-                    precision, latency = insert_one(query)
-                    total_insert_count += 1
-                    all_insert_latencies.append(latency)
-                    results.append(('insert', precision, latency))
-                else:
-                    precision, latency = search_one(query)
-                    total_search_count += 1
-                    all_search_latencies.append(latency)
-                    results.append(('search', precision, latency))
-                pbar.update(1)
-
-            # Close the progress bar
-            pbar.close()
-            total_time = time.perf_counter() - start
+        # Interval reporting setup
+        interval_size = 10000  # Report every 10K operations
+        need_interval_reporting = total_query_count >= interval_size
+        interval_counter = 0
+        overall_start_time = time.perf_counter()
+
+        # Calculate total number of intervals for progress tracking
+        total_intervals = (total_query_count + interval_size - 1) // interval_size  # Ceiling division
+
+        # Initialize progress bar for intervals if needed (only if output is to terminal)
+        if need_interval_reporting and os.isatty(1):  # Check if stdout is a terminal
+            interval_pbar = tqdm.tqdm(total=total_intervals, desc="Intervals", unit="interval")
         else:
-            # Dynamically calculate chunk size based on total_query_count
-            chunk_size = max(1, total_query_count // parallel)
-
-            # If used_queries is a generator, we need to handle it differently
-            if hasattr(used_queries, '__next__'):
-                # For generators, we'll create chunks on-the-fly
-                query_chunks = []
-                remaining = total_query_count
-                while remaining > 0:
-                    current_chunk_size = min(chunk_size, remaining)
-                    chunk = [next(used_queries) for _ in range(current_chunk_size)]
-                    query_chunks.append(chunk)
-                    remaining -= current_chunk_size
+            interval_pbar = None
+
+        # Initialize global doc_id offset to ensure uniqueness across intervals
+        global_doc_id_offset = 0
+
+        # Overall accumulators
+        overall_results = []
+        overall_insert_count = 0
+        overall_search_count = 0
+        overall_insert_latencies = []
+        overall_search_latencies = []
+
+        # Interval statistics for output file
+        interval_stats = []
+
+        # Convert generator to iterator for interval processing
+        query_iterator = iter(used_queries)
+
+        # Process queries in intervals of 10K
+        while True:
+            # Get next interval chunk (up to 10K queries)
+            interval_queries = list(islice(query_iterator, interval_size))
+            if not interval_queries:
+                break  # No more queries
+
+            interval_counter += 1
+            current_interval_size = len(interval_queries)
+
+            if parallel == 1:
+                # Single-threaded execution for this interval
+                interval_start = time.perf_counter()
+
+                # Force reset and set doc_id counter offset for single-threaded execution
+                # This ensures we override any previous initialization
+                self.__class__._doc_id_counter = itertools.count(global_doc_id_offset)
+
+                # Process queries for this interval
+                interval_results = []
+                interval_insert_count = 0
+                interval_search_count = 0
+                interval_insert_latencies = []
+                interval_search_latencies = []
+
+                for query in interval_queries:
+                    if random.random() < insert_fraction:
+                        precision, latency = insert_one(query)
+                        interval_insert_count += 1
+                        interval_insert_latencies.append(latency)
+                        interval_results.append(('insert', precision, latency))
+                    else:
+                        precision, latency = search_one(query)
+                        interval_search_count += 1
+                        interval_search_latencies.append(latency)
+                        interval_results.append(('search', precision, latency))
+
+                interval_time = time.perf_counter() - interval_start
             else:
-                # For lists, we can use the chunked_iterable function
-                query_chunks = list(chunked_iterable(used_queries, chunk_size))
-
-            # Create a queue to collect results
-            result_queue = Queue()
-
-            # Create worker processes
-            processes = []
-            for chunk in query_chunks:
-                process = Process(target=worker_function, args=(self, distance, search_one, insert_one,
-                                                                chunk, result_queue, insert_fraction))
-                processes.append(process)
-
-            # Start worker processes
-            for process in processes:
-                process.start()
-
-            # Collect results from all worker processes
-            results = []
-            total_insert_count = 0
-            total_search_count = 0
-            all_insert_latencies = []
-            all_search_latencies = []
-            min_start_time = time.perf_counter()
-
-            for _ in processes:
-                proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
-                results.extend(chunk_results)
-                total_insert_count += insert_count
-                total_search_count += search_count
-                all_insert_latencies.extend(insert_latencies)
-                all_search_latencies.extend(search_latencies)
+                # Parallel execution for this interval
+                # Dynamically calculate chunk size based on current interval size
+                chunk_size = max(1, current_interval_size // parallel)
+
+                # For interval queries (always a list), use chunked_iterable
+                query_chunks = list(chunked_iterable(interval_queries, chunk_size))
+
+                # Create a queue to collect results
+                result_queue = Queue()
+
+                # Create worker processes
+                processes = []
+                for i, chunk in enumerate(query_chunks):
+                    # Calculate unique doc_id offset for this worker in this interval
+                    worker_doc_id_offset = global_doc_id_offset + (i * 1000000)
+                    process = Process(target=worker_function, args=(self, distance, search_one, insert_one,
+                                                                    chunk, result_queue, insert_fraction, worker_doc_id_offset))
+                    processes.append(process)
+
+                # Start worker processes
+                for process in processes:
+                    process.start()
+
+                # Collect results from all worker processes
+                interval_results = []
+                interval_insert_count = 0
+                interval_search_count = 0
+                interval_insert_latencies = []
+                interval_search_latencies = []
+                min_start_time = time.perf_counter()
+
+                for _ in processes:
+                    proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
+                    interval_results.extend(chunk_results)
+                    interval_insert_count += insert_count
+                    interval_search_count += search_count
+                    interval_insert_latencies.extend(insert_latencies)
+                    interval_search_latencies.extend(search_latencies)
+
+                    # Update min_start_time if necessary
+                    if proc_start_time < min_start_time:
+                        min_start_time = proc_start_time
+
+                # Stop measuring time for the critical work
+                interval_time = time.perf_counter() - min_start_time
+
+                # Wait for all worker processes to finish
+                for process in processes:
+                    process.join()
+
+            # Accumulate overall results
+            overall_results.extend(interval_results)
+            overall_insert_count += interval_insert_count
+            overall_search_count += interval_search_count
+            overall_insert_latencies.extend(interval_insert_latencies)
+            overall_search_latencies.extend(interval_search_latencies)
+
+            # Update global doc_id offset for next interval
+            if parallel == 1:
+                # For single-threaded, reserve space based on actual inserts in this interval
+                global_doc_id_offset += max(1000000, interval_insert_count * 2)  # Some buffer
+            else:
+                # Reserve space for all parallel workers in this interval
+                global_doc_id_offset += parallel * 1000000
+
+            # Report interval metrics if needed
+            if need_interval_reporting:
+                interval_search_precisions = [result[1] for result in interval_results if result[0] == 'search']
 
-                # Update min_start_time if necessary
-                if proc_start_time < min_start_time:
-                    min_start_time = proc_start_time
-
-            # Stop measuring time for the critical work
-            total_time = time.perf_counter() - min_start_time
-
-            # Wait for all worker processes to finish
-            for process in processes:
-                process.join()
+                # Calculate separate RPS for searches and inserts
+                search_rps = interval_search_count / interval_time if interval_search_count > 0 else 0
+                insert_rps = interval_insert_count / interval_time if interval_insert_count > 0 else 0
+
+                # Create interval statistics for output file
+                interval_stat = {
+                    "interval": interval_counter,
+                    "operations": current_interval_size,
+                    "time_seconds": float(interval_time),  # Ensure it's a float
+                    "total_rps": float(current_interval_size / interval_time),  # Overall RPS
+                    "search_rps": float(search_rps),  # Search-only RPS
+                    "insert_rps": float(insert_rps),  # Insert-only RPS
+                    "searches": interval_search_count,
+                    "inserts": interval_insert_count,
+                    "search_precision": float(np.mean(interval_search_precisions)) if interval_search_precisions else None
+                }
+                interval_stats.append(interval_stat)
+
+                # Debug: Print number of intervals collected so far
+                print(f"DEBUG: Collected {len(interval_stats)} intervals so far", flush=True)
+
+                # Update progress bar with separate RPS metrics
+                if interval_pbar:
+                    interval_pbar.update(1)
+                    interval_pbar.set_postfix({
+                        'Total_RPS': f"{current_interval_size / interval_time:.1f}",
+                        'Search_RPS': f"{search_rps:.1f}",
+                        'Insert_RPS': f"{insert_rps:.1f}",
+                        'Searches': interval_search_count,
+                        'Inserts': interval_insert_count,
+                        'Precision': f"{np.mean(interval_search_precisions):.4f}" if interval_search_precisions else "N/A"
+                    })
+
+        # Close progress bar when done
+        if interval_pbar:
+            interval_pbar.close()
+            print()  # Add a blank line after progress bar
+
+        # Calculate total time for overall metrics
+        total_time = time.perf_counter() - overall_start_time
+
+        # Use overall accumulated results
+        results = overall_results
+        total_insert_count = overall_insert_count
+        total_search_count = overall_search_count
+        all_insert_latencies = overall_insert_latencies
+        all_search_latencies = overall_search_latencies
 
         # Extract overall precisions and latencies
         all_precisions = [result[1] for result in results]
@@ -253,6 +354,11 @@ def cycling_query_generator(queries, total_count):
 
         self.__class__.delete_client()
 
+        if len(interval_stats) > 0:
+            print(f"DEBUG: First interval: {interval_stats[0]}", flush=True)
+            print(f"DEBUG: Last interval: {interval_stats[-1]}", flush=True)
+
         return {
             # Overall metrics
             "total_time": total_time,
@@ -280,6 +386,9 @@ def cycling_query_generator(queries, total_count):
             "actual_insert_fraction": total_insert_count / len(all_latencies) if len(all_latencies) > 0 else 0,
             "target_insert_fraction": insert_fraction,
 
+            # Interval statistics (only included if intervals were used)
+            "interval_stats": interval_stats if interval_stats else None,
+
             # Legacy compatibility (for existing code that expects these)
             "mean_time": np.mean(all_latencies),
             "mean_precisions": np.mean(search_precisions) if search_precisions else 1.0,  # Only search precisions
@@ -332,7 +441,7 @@ def process_chunk(chunk, search_one, insert_one, insert_fraction):
     return results, insert_count, search_count, insert_latencies, search_latencies
 
 # Function to be executed by each worker process
-def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0):
+def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0, doc_id_offset=0):
     self.init_client(
         self.host,
         distance,
@@ -341,6 +450,9 @@ def worker_function(self, distance, search_one, insert_one, chunk, result_queue,
     )
     self.setup_search()
 
+    # Force set the doc_id counter offset for this worker (overrides any previous state)
+    self.__class__._doc_id_counter = itertools.count(doc_id_offset)
+
     start_time = time.perf_counter()
     results, insert_count, search_count, insert_latencies, search_latencies = process_chunk(
         chunk, search_one, insert_one, insert_fraction
diff --git a/run.py b/run.py
index f389b9da..5698633e 100644
--- a/run.py
+++ b/run.py
@@ -30,14 +30,13 @@ def run(
     ef_runtime: List[int] = typer.Option([], help="Filter search experiments by ef runtime values. Only experiments with these ef values will be run."),
     describe: str = typer.Option(None, help="Describe available options: 'datasets' or 'engines'. When used, shows information and exits."),
     verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed information when using --describe"),
-    mixed_workload: bool = typer.Option(False, help="Enable mixed workload mode"),
-    insert_fraction: float = typer.Option(0.1, help="Fraction of operations that are inserts (0.0-1.0)"),
+    insert_fraction: float = typer.Option(0.0, help="Fraction of operations that are inserts (0.0-1.0). Mixed workload is automatically enabled when > 0.0"),
     mixed_workload_seed: int = typer.Option(None, help="Random seed for reproducible mixed workload patterns"),
 ):
     """
     Example:
         python3 run.py --engines *-m-16-* --engines qdrant-* --datasets glove-*
-        python3 run.py --engines redis --datasets glove-* --mixed-workload --insert-fraction 0.2
+        python3 run.py --engines redis --datasets glove-* --insert-fraction 0.2
         python3 run.py --describe datasets
         python3 run.py --describe engines --verbose
     """
@@ -69,7 +68,8 @@ def run(
     }
 
     mixed_params = {}
-    if mixed_workload:
+    # Automatically enable mixed workload when insert_fraction > 0
+    if insert_fraction > 0:
         mixed_params = {
             "insert_fraction": insert_fraction,
             "seed": mixed_workload_seed
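
Note, not part of the patch: the new loop in search.py consumes the query stream with islice(query_iterator, interval_size) inside a while True, which behaves the same for plain lists and for cycling_query_generator (it also assumes islice and os are importable in that module). The sketch below is a minimal, self-contained illustration of that batching pattern; process_in_intervals, INTERVAL_SIZE, and the random stand-in for the search/insert calls are hypothetical names, not benchmark code.

    import itertools
    import random

    INTERVAL_SIZE = 10_000  # same 10K interval the patch uses

    def process_in_intervals(queries, insert_fraction=0.2):
        query_iterator = iter(queries)  # works for lists and generators alike
        interval_stats = []
        while True:
            interval = list(itertools.islice(query_iterator, INTERVAL_SIZE))
            if not interval:
                break  # stream exhausted
            # Stand-in for the search/insert split the benchmark performs per operation
            inserts = sum(1 for _ in interval if random.random() < insert_fraction)
            interval_stats.append({
                "operations": len(interval),
                "inserts": inserts,
                "searches": len(interval) - inserts,
            })
        return interval_stats

    print(process_in_intervals(range(25_000)))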
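
Also illustrative rather than part of the patch: the doc_id uniqueness scheme gives each worker its own itertools.count starting at global_doc_id_offset + i * 1000000, then advances the global offset past every block handed out in the interval, so IDs cannot collide as long as no worker performs more than 1,000,000 inserts per interval. A reduced sketch under that assumption, with worker_counter and PER_WORKER_BLOCK as made-up names:

    import itertools

    PER_WORKER_BLOCK = 1_000_000  # spacing used by the patch

    def worker_counter(global_offset, worker_index):
        # Each worker draws ids from a disjoint block of the id space.
        return itertools.count(global_offset + worker_index * PER_WORKER_BLOCK)

    global_offset = 0
    for interval in range(2):
        counters = [worker_counter(global_offset, i) for i in range(4)]
        print(f"interval {interval}: first ids per worker -> {[next(c) for c in counters]}")
        # Advance past every block handed out this interval, as the parallel branch does.
        global_offset += 4 * PER_WORKER_BLOCK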