3 changes: 2 additions & 1 deletion engine/base_client/client.py
@@ -180,8 +180,9 @@ def run_experiment(

# Extract mixed workload parameters
insert_fraction = 0.0
seed = None
if mixed_workload_params:
insert_fraction = mixed_workload_params.get("insert_fraction", 0.1)
insert_fraction = mixed_workload_params.get("insert_fraction", 0.0)
seed = mixed_workload_params.get("seed", None)
if seed is not None:
random.seed(seed) # Set seed for reproducible patterns
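
With the default now at 0.0, inserts only happen when explicitly requested, and the optional seed makes the insert/search interleaving repeatable. A minimal illustration (not the benchmark's own code) of why seeding the module-level RNG gives reproducible operation mixes:

```python
import random

def decision_stream(insert_fraction, n, seed=None):
    """Toy reproduction of the insert-vs-search coin flips."""
    if seed is not None:
        random.seed(seed)
    return ["insert" if random.random() < insert_fraction else "search" for _ in range(n)]

# Identical seeds produce identical interleavings across runs and engines.
assert decision_stream(0.2, 10, seed=42) == decision_stream(0.2, 10, seed=42)
```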
284 changes: 198 additions & 86 deletions engine/base_client/search.py
@@ -109,6 +109,9 @@ def search_all(
)
self.setup_search()

# Reset the doc_id counter to prevent any initialization during client setup
self.__class__._doc_id_counter = None

search_one = functools.partial(self.__class__._search_one, top=top)
insert_one = functools.partial(self.__class__._insert_one)
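
Clearing `_doc_id_counter` above discards anything an engine client may have set up during `init_client`/`setup_search`; the counter is then re-created with an explicit offset before inserts run. A hedged sketch of the lazy class-level counter pattern this assumes (the real `_insert_one` lives in the engine clients and may differ):

```python
import itertools

class ClientSketch:
    _doc_id_counter = None  # reset to None before a run, re-created with an offset

    @classmethod
    def _insert_one(cls, vector, offset=0):
        if cls._doc_id_counter is None:
            cls._doc_id_counter = itertools.count(offset)  # lazy initialisation
        doc_id = next(cls._doc_id_counter)
        # ... a real client would upload `vector` under `doc_id` and time the call
        return doc_id

ClientSketch._doc_id_counter = None          # what the reset above does
print(ClientSketch._insert_one([0.1, 0.2]))  # 0
print(ClientSketch._insert_one([0.3, 0.4]))  # 1
```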

@@ -155,94 +158,192 @@ def cycling_query_generator(queries, total_count):
used_queries = queries_list
total_query_count = len(used_queries)

if parallel == 1:
# Create a progress bar with the correct total
pbar = tqdm.tqdm(total=total_query_count, desc="Processing queries", unit="queries")

# Single-threaded execution
start = time.perf_counter()

# Process queries with progress updates
results = []
total_insert_count = 0
total_search_count = 0
all_insert_latencies = []
all_search_latencies = []

for query in used_queries:
if random.random() < insert_fraction:
precision, latency = insert_one(query)
total_insert_count += 1
all_insert_latencies.append(latency)
results.append(('insert', precision, latency))
else:
precision, latency = search_one(query)
total_search_count += 1
all_search_latencies.append(latency)
results.append(('search', precision, latency))
pbar.update(1)

# Close the progress bar
pbar.close()
total_time = time.perf_counter() - start
# Interval reporting setup
interval_size = 10000 # Report every 10K operations
need_interval_reporting = total_query_count >= interval_size
interval_counter = 0
overall_start_time = time.perf_counter()

# Calculate total number of intervals for progress tracking
total_intervals = (total_query_count + interval_size - 1) // interval_size # Ceiling division

# Initialize progress bar for intervals if needed (only if output is to terminal)
if need_interval_reporting and os.isatty(1): # Check if stdout is a terminal
interval_pbar = tqdm.tqdm(total=total_intervals, desc="Intervals", unit="interval")
else:
# Dynamically calculate chunk size based on total_query_count
chunk_size = max(1, total_query_count // parallel)

# If used_queries is a generator, we need to handle it differently
if hasattr(used_queries, '__next__'):
# For generators, we'll create chunks on-the-fly
query_chunks = []
remaining = total_query_count
while remaining > 0:
current_chunk_size = min(chunk_size, remaining)
chunk = [next(used_queries) for _ in range(current_chunk_size)]
query_chunks.append(chunk)
remaining -= current_chunk_size
interval_pbar = None

# Initialize global doc_id offset to ensure uniqueness across intervals
global_doc_id_offset = 0

# Overall accumulators
overall_results = []
overall_insert_count = 0
overall_search_count = 0
overall_insert_latencies = []
overall_search_latencies = []

# Interval statistics for output file
interval_stats = []

# Convert generator to iterator for interval processing
query_iterator = iter(used_queries)

# Process queries in intervals of 10K
while True:
# Get next interval chunk (up to 10K queries)
interval_queries = list(islice(query_iterator, interval_size))
if not interval_queries:
break # No more queries

interval_counter += 1
current_interval_size = len(interval_queries)

if parallel == 1:
# Single-threaded execution for this interval
interval_start = time.perf_counter()

# Force reset and set doc_id counter offset for single-threaded execution
# This ensures we override any previous initialization
self.__class__._doc_id_counter = itertools.count(global_doc_id_offset)

# Process queries for this interval
interval_results = []
interval_insert_count = 0
interval_search_count = 0
interval_insert_latencies = []
interval_search_latencies = []

for query in interval_queries:
if random.random() < insert_fraction:
precision, latency = insert_one(query)
interval_insert_count += 1
interval_insert_latencies.append(latency)
interval_results.append(('insert', precision, latency))
else:
precision, latency = search_one(query)
interval_search_count += 1
interval_search_latencies.append(latency)
interval_results.append(('search', precision, latency))

interval_time = time.perf_counter() - interval_start
else:
# For lists, we can use the chunked_iterable function
query_chunks = list(chunked_iterable(used_queries, chunk_size))

# Create a queue to collect results
result_queue = Queue()

# Create worker processes
processes = []
for chunk in query_chunks:
process = Process(target=worker_function, args=(self, distance, search_one, insert_one,
chunk, result_queue, insert_fraction))
processes.append(process)

# Start worker processes
for process in processes:
process.start()

# Collect results from all worker processes
results = []
total_insert_count = 0
total_search_count = 0
all_insert_latencies = []
all_search_latencies = []
min_start_time = time.perf_counter()

for _ in processes:
proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
results.extend(chunk_results)
total_insert_count += insert_count
total_search_count += search_count
all_insert_latencies.extend(insert_latencies)
all_search_latencies.extend(search_latencies)
# Parallel execution for this interval
# Dynamically calculate chunk size based on current interval size
chunk_size = max(1, current_interval_size // parallel)

# For interval queries (always a list), use chunked_iterable
query_chunks = list(chunked_iterable(interval_queries, chunk_size))

# Create a queue to collect results
result_queue = Queue()

# Create worker processes
processes = []
for i, chunk in enumerate(query_chunks):
# Calculate unique doc_id offset for this worker in this interval
worker_doc_id_offset = global_doc_id_offset + (i * 1000000)
process = Process(target=worker_function, args=(self, distance, search_one, insert_one,
chunk, result_queue, insert_fraction, worker_doc_id_offset))
processes.append(process)

# Start worker processes
for process in processes:
process.start()

# Collect results from all worker processes
interval_results = []
interval_insert_count = 0
interval_search_count = 0
interval_insert_latencies = []
interval_search_latencies = []
min_start_time = time.perf_counter()

for _ in processes:
proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
interval_results.extend(chunk_results)
interval_insert_count += insert_count
interval_search_count += search_count
interval_insert_latencies.extend(insert_latencies)
interval_search_latencies.extend(search_latencies)

# Update min_start_time if necessary
if proc_start_time < min_start_time:
min_start_time = proc_start_time

# Stop measuring time for the critical work
interval_time = time.perf_counter() - min_start_time

# Wait for all worker processes to finish
for process in processes:
process.join()

# Accumulate overall results
overall_results.extend(interval_results)
overall_insert_count += interval_insert_count
overall_search_count += interval_search_count
overall_insert_latencies.extend(interval_insert_latencies)
overall_search_latencies.extend(interval_search_latencies)

# Update global doc_id offset for next interval
if parallel == 1:
# For single-threaded, reserve space based on actual inserts in this interval
global_doc_id_offset += max(1000000, interval_insert_count * 2) # Some buffer
else:
# Reserve space for all parallel workers in this interval
global_doc_id_offset += parallel * 1000000

# Report interval metrics if needed
if need_interval_reporting:
interval_search_precisions = [result[1] for result in interval_results if result[0] == 'search']

# Update min_start_time if necessary
if proc_start_time < min_start_time:
min_start_time = proc_start_time

# Stop measuring time for the critical work
total_time = time.perf_counter() - min_start_time

# Wait for all worker processes to finish
for process in processes:
process.join()
# Calculate separate RPS for searches and inserts
search_rps = interval_search_count / interval_time if interval_search_count > 0 else 0
insert_rps = interval_insert_count / interval_time if interval_insert_count > 0 else 0

# Create interval statistics for output file
interval_stat = {
"interval": interval_counter,
"operations": current_interval_size,
"time_seconds": float(interval_time), # Ensure it's a float
"total_rps": float(current_interval_size / interval_time), # Overall RPS
"search_rps": float(search_rps), # Search-only RPS
"insert_rps": float(insert_rps), # Insert-only RPS
"searches": interval_search_count,
"inserts": interval_insert_count,
"search_precision": float(np.mean(interval_search_precisions)) if interval_search_precisions else None
}
interval_stats.append(interval_stat)

# Debug: Print number of intervals collected so far
print(f"DEBUG: Collected {len(interval_stats)} intervals so far", flush=True)

# Update progress bar with separate RPS metrics
if interval_pbar:
interval_pbar.update(1)
interval_pbar.set_postfix({
'Total_RPS': f"{current_interval_size / interval_time:.1f}",
'Search_RPS': f"{search_rps:.1f}",
'Insert_RPS': f"{insert_rps:.1f}",
'Searches': interval_search_count,
'Inserts': interval_insert_count,
'Precision': f"{np.mean(interval_search_precisions):.4f}" if interval_search_precisions else "N/A"
})

# Close progress bar when done
if interval_pbar:
interval_pbar.close()
print() # Add a blank line after progress bar

# Calculate total time for overall metrics
total_time = time.perf_counter() - overall_start_time

# Use overall accumulated results
results = overall_results
total_insert_count = overall_insert_count
total_search_count = overall_search_count
all_insert_latencies = overall_insert_latencies
all_search_latencies = overall_search_latencies

# Extract overall precisions and latencies
all_precisions = [result[1] for result in results]
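
The offset bookkeeping above is what keeps inserted doc ids unique across workers and across intervals; a minimal sketch of the arithmetic, assuming no worker inserts more than 1,000,000 documents in a single interval:

```python
# Sketch of the doc-id block allocation used above (assumption: at most
# 1_000_000 inserts per worker per interval).
WORKER_BLOCK = 1_000_000

def interval_offsets(global_offset, parallel):
    """Disjoint starting ids for each worker within one interval."""
    return [global_offset + i * WORKER_BLOCK for i in range(parallel)]

global_offset = 0
for interval in range(3):
    starts = interval_offsets(global_offset, parallel=4)
    # worker i inserts ids in [starts[i], starts[i] + WORKER_BLOCK)
    global_offset += 4 * WORKER_BLOCK

print(starts)  # third interval: [8000000, 9000000, 10000000, 11000000]
```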
@@ -253,6 +354,11 @@ def cycling_query_generator(queries, total_count):

self.__class__.delete_client()


if len(interval_stats) > 0:
print(f"DEBUG: First interval: {interval_stats[0]}", flush=True)
print(f"DEBUG: Last interval: {interval_stats[-1]}", flush=True)

return {
# Overall metrics
"total_time": total_time,
@@ -280,6 +386,9 @@ def cycling_query_generator(queries, total_count):
"actual_insert_fraction": total_insert_count / len(all_latencies) if len(all_latencies) > 0 else 0,
"target_insert_fraction": insert_fraction,

# Interval statistics (only included if intervals were used)
"interval_stats": interval_stats if interval_stats else None,

# Legacy compatibility (for existing code that expects these)
"mean_time": np.mean(all_latencies),
"mean_precisions": np.mean(search_precisions) if search_precisions else 1.0, # Only search precisions
@@ -332,7 +441,7 @@ def process_chunk(chunk, search_one, insert_one, insert_fraction):
return results, insert_count, search_count, insert_latencies, search_latencies

# Function to be executed by each worker process
def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0):
def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0, doc_id_offset=0):
self.init_client(
self.host,
distance,
@@ -341,6 +450,9 @@ def worker_function(self, distance, search_one, insert_one, chunk, result_queue,
)
self.setup_search()

# Force set the doc_id counter offset for this worker (overrides any previous state)
self.__class__._doc_id_counter = itertools.count(doc_id_offset)

start_time = time.perf_counter()
results, insert_count, search_count, insert_latencies, search_latencies = process_chunk(
chunk, search_one, insert_one, insert_fraction
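
Because each worker runs in its own process, `_doc_id_counter` is per-process state, and re-seeding it inside `worker_function` is what actually applies the offset. A small illustrative sketch (not the benchmark's code) of that isolation:

```python
import itertools
from multiprocessing import Process, Queue

# Illustrative only: each process gets an independent copy of the class
# attribute, so per-worker counters cannot interfere with one another.
class CounterSketch:
    _doc_id_counter = None

def work(offset, out):
    CounterSketch._doc_id_counter = itertools.count(offset)
    out.put([next(CounterSketch._doc_id_counter) for _ in range(3)])

if __name__ == "__main__":
    out = Queue()
    workers = [Process(target=work, args=(i * 1_000_000, out)) for i in range(2)]
    for w in workers:
        w.start()
    results = sorted(out.get() for _ in workers)
    for w in workers:
        w.join()
    print(results)  # [[0, 1, 2], [1000000, 1000001, 1000002]]
```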
8 changes: 4 additions & 4 deletions run.py
@@ -30,14 +30,13 @@ def run(
ef_runtime: List[int] = typer.Option([], help="Filter search experiments by ef runtime values. Only experiments with these ef values will be run."),
describe: str = typer.Option(None, help="Describe available options: 'datasets' or 'engines'. When used, shows information and exits."),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed information when using --describe"),
mixed_workload: bool = typer.Option(False, help="Enable mixed workload mode"),
insert_fraction: float = typer.Option(0.1, help="Fraction of operations that are inserts (0.0-1.0)"),
insert_fraction: float = typer.Option(0.0, help="Fraction of operations that are inserts (0.0-1.0). Mixed workload is automatically enabled when > 0.0"),
mixed_workload_seed: int = typer.Option(None, help="Random seed for reproducible mixed workload patterns"),
):
"""
Example:
python3 run.py --engines *-m-16-* --engines qdrant-* --datasets glove-*
python3 run.py --engines redis --datasets glove-* --mixed-workload --insert-fraction 0.2
python3 run.py --engines redis --datasets glove-* --insert-fraction 0.2
python3 run.py --describe datasets
python3 run.py --describe engines --verbose
"""
@@ -69,7 +68,8 @@ def run(
}

mixed_params = {}
if mixed_workload:
# Automatically enable mixed workload when insert_fraction > 0
if insert_fraction > 0:
mixed_params = {
"insert_fraction": insert_fraction,
"seed": mixed_workload_seed