From 7751ccd0826a7406e0585331073b8b7262d4699b Mon Sep 17 00:00:00 2001 From: Martin Dimitrov Date: Tue, 2 Sep 2025 13:36:05 -0700 Subject: [PATCH 1/3] removed progress bar reporting --- engine/base_client/search.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/engine/base_client/search.py b/engine/base_client/search.py index bfbe80cf..0c93ae82 100644 --- a/engine/base_client/search.py +++ b/engine/base_client/search.py @@ -156,13 +156,10 @@ def cycling_query_generator(queries, total_count): total_query_count = len(used_queries) if parallel == 1: - # Create a progress bar with the correct total - pbar = tqdm.tqdm(total=total_query_count, desc="Processing queries", unit="queries") - # Single-threaded execution start = time.perf_counter() - # Process queries with progress updates + # Process queries results = [] total_insert_count = 0 total_search_count = 0 @@ -180,10 +177,7 @@ def cycling_query_generator(queries, total_count): total_search_count += 1 all_search_latencies.append(latency) results.append(('search', precision, latency)) - pbar.update(1) - # Close the progress bar - pbar.close() total_time = time.perf_counter() - start else: # Dynamically calculate chunk size based on total_query_count From 01dfafe3ce2ae4acf0f40224ea336f32249c46b8 Mon Sep 17 00:00:00 2001 From: Martin Dimitrov Date: Thu, 4 Sep 2025 11:25:42 -0700 Subject: [PATCH 2/3] added periodic printing of interval results to file --- engine/base_client/search.py | 270 ++++++++++++++++++++++++----------- 1 file changed, 190 insertions(+), 80 deletions(-) diff --git a/engine/base_client/search.py b/engine/base_client/search.py index 0c93ae82..94e9e111 100644 --- a/engine/base_client/search.py +++ b/engine/base_client/search.py @@ -109,6 +109,9 @@ def search_all( ) self.setup_search() + # Reset the doc_id counter to prevent any initialization during client setup + self.__class__._doc_id_counter = None + search_one = functools.partial(self.__class__._search_one, top=top) insert_one = functools.partial(self.__class__._insert_one) @@ -155,88 +158,184 @@ def cycling_query_generator(queries, total_count): used_queries = queries_list total_query_count = len(used_queries) - if parallel == 1: - # Single-threaded execution - start = time.perf_counter() - - # Process queries - results = [] - total_insert_count = 0 - total_search_count = 0 - all_insert_latencies = [] - all_search_latencies = [] - - for query in used_queries: - if random.random() < insert_fraction: - precision, latency = insert_one(query) - total_insert_count += 1 - all_insert_latencies.append(latency) - results.append(('insert', precision, latency)) - else: - precision, latency = search_one(query) - total_search_count += 1 - all_search_latencies.append(latency) - results.append(('search', precision, latency)) - - total_time = time.perf_counter() - start + # Interval reporting setup + interval_size = 10000 # Report every 10K operations + need_interval_reporting = total_query_count >= interval_size + interval_counter = 0 + overall_start_time = time.perf_counter() + + # Calculate total number of intervals for progress tracking + total_intervals = (total_query_count + interval_size - 1) // interval_size # Ceiling division + + # Initialize progress bar for intervals if needed (only if output is to terminal) + if need_interval_reporting and os.isatty(1): # Check if stdout is a terminal + interval_pbar = tqdm.tqdm(total=total_intervals, desc="Intervals", unit="interval") else: - # Dynamically calculate chunk size based on total_query_count - chunk_size 
= max(1, total_query_count // parallel) - - # If used_queries is a generator, we need to handle it differently - if hasattr(used_queries, '__next__'): - # For generators, we'll create chunks on-the-fly - query_chunks = [] - remaining = total_query_count - while remaining > 0: - current_chunk_size = min(chunk_size, remaining) - chunk = [next(used_queries) for _ in range(current_chunk_size)] - query_chunks.append(chunk) - remaining -= current_chunk_size + interval_pbar = None + + # Initialize global doc_id offset to ensure uniqueness across intervals + global_doc_id_offset = 0 + + # Overall accumulators + overall_results = [] + overall_insert_count = 0 + overall_search_count = 0 + overall_insert_latencies = [] + overall_search_latencies = [] + + # Interval statistics for output file + interval_stats = [] + + # Convert generator to iterator for interval processing + query_iterator = iter(used_queries) + + # Process queries in intervals of 10K + while True: + # Get next interval chunk (up to 10K queries) + interval_queries = list(islice(query_iterator, interval_size)) + if not interval_queries: + break # No more queries + + interval_counter += 1 + current_interval_size = len(interval_queries) + + if parallel == 1: + # Single-threaded execution for this interval + interval_start = time.perf_counter() + + # Force reset and set doc_id counter offset for single-threaded execution + # This ensures we override any previous initialization + self.__class__._doc_id_counter = itertools.count(global_doc_id_offset) + + # Process queries for this interval + interval_results = [] + interval_insert_count = 0 + interval_search_count = 0 + interval_insert_latencies = [] + interval_search_latencies = [] + + for query in interval_queries: + if random.random() < insert_fraction: + precision, latency = insert_one(query) + interval_insert_count += 1 + interval_insert_latencies.append(latency) + interval_results.append(('insert', precision, latency)) + else: + precision, latency = search_one(query) + interval_search_count += 1 + interval_search_latencies.append(latency) + interval_results.append(('search', precision, latency)) + + interval_time = time.perf_counter() - interval_start else: - # For lists, we can use the chunked_iterable function - query_chunks = list(chunked_iterable(used_queries, chunk_size)) - - # Create a queue to collect results - result_queue = Queue() - - # Create worker processes - processes = [] - for chunk in query_chunks: - process = Process(target=worker_function, args=(self, distance, search_one, insert_one, - chunk, result_queue, insert_fraction)) - processes.append(process) - - # Start worker processes - for process in processes: - process.start() - - # Collect results from all worker processes - results = [] - total_insert_count = 0 - total_search_count = 0 - all_insert_latencies = [] - all_search_latencies = [] - min_start_time = time.perf_counter() - - for _ in processes: - proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get() - results.extend(chunk_results) - total_insert_count += insert_count - total_search_count += search_count - all_insert_latencies.extend(insert_latencies) - all_search_latencies.extend(search_latencies) + # Parallel execution for this interval + # Dynamically calculate chunk size based on current interval size + chunk_size = max(1, current_interval_size // parallel) + + # For interval queries (always a list), use chunked_iterable + query_chunks = list(chunked_iterable(interval_queries, chunk_size)) + + 
# Create a queue to collect results + result_queue = Queue() + + # Create worker processes + processes = [] + for i, chunk in enumerate(query_chunks): + # Calculate unique doc_id offset for this worker in this interval + worker_doc_id_offset = global_doc_id_offset + (i * 1000000) + process = Process(target=worker_function, args=(self, distance, search_one, insert_one, + chunk, result_queue, insert_fraction, worker_doc_id_offset)) + processes.append(process) + + # Start worker processes + for process in processes: + process.start() + + # Collect results from all worker processes + interval_results = [] + interval_insert_count = 0 + interval_search_count = 0 + interval_insert_latencies = [] + interval_search_latencies = [] + min_start_time = time.perf_counter() + + for _ in processes: + proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get() + interval_results.extend(chunk_results) + interval_insert_count += insert_count + interval_search_count += search_count + interval_insert_latencies.extend(insert_latencies) + interval_search_latencies.extend(search_latencies) + + # Update min_start_time if necessary + if proc_start_time < min_start_time: + min_start_time = proc_start_time + + # Stop measuring time for the critical work + interval_time = time.perf_counter() - min_start_time + + # Wait for all worker processes to finish + for process in processes: + process.join() + + # Accumulate overall results + overall_results.extend(interval_results) + overall_insert_count += interval_insert_count + overall_search_count += interval_search_count + overall_insert_latencies.extend(interval_insert_latencies) + overall_search_latencies.extend(interval_search_latencies) + + # Update global doc_id offset for next interval + if parallel == 1: + # For single-threaded, reserve space based on actual inserts in this interval + global_doc_id_offset += max(1000000, interval_insert_count * 2) # Some buffer + else: + # Reserve space for all parallel workers in this interval + global_doc_id_offset += parallel * 1000000 + + # Report interval metrics if needed + if need_interval_reporting: + interval_search_precisions = [result[1] for result in interval_results if result[0] == 'search'] - # Update min_start_time if necessary - if proc_start_time < min_start_time: - min_start_time = proc_start_time - - # Stop measuring time for the critical work - total_time = time.perf_counter() - min_start_time - - # Wait for all worker processes to finish - for process in processes: - process.join() + # Create interval statistics for output file + interval_stat = { + "interval": interval_counter, + "operations": current_interval_size, + "time_seconds": float(interval_time), # Ensure it's a float + "rps": float(current_interval_size / interval_time), # Ensure it's a float + "searches": interval_search_count, + "inserts": interval_insert_count, + "search_precision": float(np.mean(interval_search_precisions)) if interval_search_precisions else None + } + interval_stats.append(interval_stat) + + # Debug: Print number of intervals collected so far + print(f"DEBUG: Collected {len(interval_stats)} intervals so far", flush=True) + + # Update progress bar with same metrics (this goes to terminal) + if interval_pbar: + interval_pbar.update(1) + interval_pbar.set_postfix({ + 'RPS': f"{current_interval_size / interval_time:.1f}", + 'Searches': interval_search_count, + 'Inserts': interval_insert_count, + 'Precision': f"{np.mean(interval_search_precisions):.4f}" if 
interval_search_precisions else "N/A" + }) + + # Close progress bar when done + if interval_pbar: + interval_pbar.close() + print() # Add a blank line after progress bar + + # Calculate total time for overall metrics + total_time = time.perf_counter() - overall_start_time + + # Use overall accumulated results + results = overall_results + total_insert_count = overall_insert_count + total_search_count = overall_search_count + all_insert_latencies = overall_insert_latencies + all_search_latencies = overall_search_latencies # Extract overall precisions and latencies all_precisions = [result[1] for result in results] @@ -247,6 +346,11 @@ def cycling_query_generator(queries, total_count): self.__class__.delete_client() + + if len(interval_stats) > 0: + print(f"DEBUG: First interval: {interval_stats[0]}", flush=True) + print(f"DEBUG: Last interval: {interval_stats[-1]}", flush=True) + return { # Overall metrics "total_time": total_time, @@ -274,6 +378,9 @@ def cycling_query_generator(queries, total_count): "actual_insert_fraction": total_insert_count / len(all_latencies) if len(all_latencies) > 0 else 0, "target_insert_fraction": insert_fraction, + # Interval statistics (only included if intervals were used) + "interval_stats": interval_stats if interval_stats else None, + # Legacy compatibility (for existing code that expects these) "mean_time": np.mean(all_latencies), "mean_precisions": np.mean(search_precisions) if search_precisions else 1.0, # Only search precisions @@ -326,7 +433,7 @@ def process_chunk(chunk, search_one, insert_one, insert_fraction): return results, insert_count, search_count, insert_latencies, search_latencies # Function to be executed by each worker process -def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0): +def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0, doc_id_offset=0): self.init_client( self.host, distance, @@ -335,6 +442,9 @@ def worker_function(self, distance, search_one, insert_one, chunk, result_queue, ) self.setup_search() + # Force set the doc_id counter offset for this worker (overrides any previous state) + self.__class__._doc_id_counter = itertools.count(doc_id_offset) + start_time = time.perf_counter() results, insert_count, search_count, insert_latencies, search_latencies = process_chunk( chunk, search_one, insert_one, insert_fraction From da61ef001e952753bf644120d96c457a39e99344 Mon Sep 17 00:00:00 2001 From: Martin Dimitrov Date: Wed, 10 Sep 2025 13:07:56 -0700 Subject: [PATCH 3/3] remove redundant mixed-workload flag --- engine/base_client/client.py | 3 ++- engine/base_client/search.py | 14 +++++++++++--- run.py | 8 ++++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/engine/base_client/client.py b/engine/base_client/client.py index f0fc7ab9..8f648bed 100644 --- a/engine/base_client/client.py +++ b/engine/base_client/client.py @@ -180,8 +180,9 @@ def run_experiment( # Extract mixed workload parameters insert_fraction = 0.0 + seed = None if mixed_workload_params: - insert_fraction = mixed_workload_params.get("insert_fraction", 0.1) + insert_fraction = mixed_workload_params.get("insert_fraction", 0.0) seed = mixed_workload_params.get("seed", None) if seed is not None: random.seed(seed) # Set seed for reproducible patterns diff --git a/engine/base_client/search.py b/engine/base_client/search.py index 94e9e111..b66f3a8a 100644 --- a/engine/base_client/search.py +++ b/engine/base_client/search.py @@ -297,12 +297,18 @@ def 
cycling_query_generator(queries, total_count): if need_interval_reporting: interval_search_precisions = [result[1] for result in interval_results if result[0] == 'search'] + # Calculate separate RPS for searches and inserts + search_rps = interval_search_count / interval_time if interval_search_count > 0 else 0 + insert_rps = interval_insert_count / interval_time if interval_insert_count > 0 else 0 + # Create interval statistics for output file interval_stat = { "interval": interval_counter, "operations": current_interval_size, "time_seconds": float(interval_time), # Ensure it's a float - "rps": float(current_interval_size / interval_time), # Ensure it's a float + "total_rps": float(current_interval_size / interval_time), # Overall RPS + "search_rps": float(search_rps), # Search-only RPS + "insert_rps": float(insert_rps), # Insert-only RPS "searches": interval_search_count, "inserts": interval_insert_count, "search_precision": float(np.mean(interval_search_precisions)) if interval_search_precisions else None @@ -312,11 +318,13 @@ def cycling_query_generator(queries, total_count): # Debug: Print number of intervals collected so far print(f"DEBUG: Collected {len(interval_stats)} intervals so far", flush=True) - # Update progress bar with same metrics (this goes to terminal) + # Update progress bar with separate RPS metrics if interval_pbar: interval_pbar.update(1) interval_pbar.set_postfix({ - 'RPS': f"{current_interval_size / interval_time:.1f}", + 'Total_RPS': f"{current_interval_size / interval_time:.1f}", + 'Search_RPS': f"{search_rps:.1f}", + 'Insert_RPS': f"{insert_rps:.1f}", 'Searches': interval_search_count, 'Inserts': interval_insert_count, 'Precision': f"{np.mean(interval_search_precisions):.4f}" if interval_search_precisions else "N/A" diff --git a/run.py b/run.py index f389b9da..5698633e 100644 --- a/run.py +++ b/run.py @@ -30,14 +30,13 @@ def run( ef_runtime: List[int] = typer.Option([], help="Filter search experiments by ef runtime values. Only experiments with these ef values will be run."), describe: str = typer.Option(None, help="Describe available options: 'datasets' or 'engines'. When used, shows information and exits."), verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed information when using --describe"), - mixed_workload: bool = typer.Option(False, help="Enable mixed workload mode"), - insert_fraction: float = typer.Option(0.1, help="Fraction of operations that are inserts (0.0-1.0)"), + insert_fraction: float = typer.Option(0.0, help="Fraction of operations that are inserts (0.0-1.0). Mixed workload is automatically enabled when > 0.0"), mixed_workload_seed: int = typer.Option(None, help="Random seed for reproducible mixed workload patterns"), ): """ Example: python3 run.py --engines *-m-16-* --engines qdrant-* --datasets glove-* - python3 run.py --engines redis --datasets glove-* --mixed-workload --insert-fraction 0.2 + python3 run.py --engines redis --datasets glove-* --insert-fraction 0.2 python3 run.py --describe datasets python3 run.py --describe engines --verbose """ @@ -69,7 +68,8 @@ def run( } mixed_params = {} - if mixed_workload: + # Automatically enable mixed workload when insert_fraction > 0 + if insert_fraction > 0: mixed_params = { "insert_fraction": insert_fraction, "seed": mixed_workload_seed
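
A hedged consumption sketch, not part of the patch series above: it illustrates one way the new `interval_stats` entries returned by `search_all` could be read back for plotting or regression checks. It assumes the surrounding harness serializes the returned result dict to JSON — the `results.json` path below is hypothetical — and it relies only on the per-interval keys built in PATCH 3/3 (`interval`, `total_rps`, `search_rps`, `insert_rps`, `searches`, `search_precision`).

    import json
    import statistics

    # Hypothetical file name: assumes the benchmark harness dumps the dict
    # returned by search_all() to JSON somewhere; adjust the path as needed.
    with open("results.json") as f:
        run = json.load(f)

    # interval_stats is None when the run had fewer than 10K operations
    # (need_interval_reporting is False), so fall back to an empty list.
    intervals = run.get("interval_stats") or []

    for stat in intervals:
        print(
            f"interval {stat['interval']:>4}: "
            f"total {stat['total_rps']:.1f} rps "
            f"(search {stat['search_rps']:.1f}, insert {stat['insert_rps']:.1f}), "
            f"precision {stat['search_precision']}"
        )

    # Mean search-only RPS across intervals that actually ran searches.
    search_rps = [s["search_rps"] for s in intervals if s["searches"] > 0]
    if search_rps:
        print("mean search RPS:", statistics.mean(search_rps))

One motivation for splitting `total_rps` into `search_rps` and `insert_rps` in PATCH 3/3 is that it lets a reader distinguish a genuine search slowdown from an interval that simply happened to contain more inserts.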