From 59adaa2d3011666a369947fab87bbb78105e3c87 Mon Sep 17 00:00:00 2001 From: Luis Gaspar Schroeder Date: Fri, 13 Jun 2025 13:20:30 +0200 Subject: [PATCH 1/8] Minor structural changes and small bug fixes --- benchmarks/_plotter_helper.py | 2 +- benchmarks/benchmark.py | 89 +++++++++---------- tests/ReadMe.md | 5 ++ .../Benchmark/{test.py => test_benchmark.py} | 0 .../{test.py => test_embedding_engine.py} | 0 .../{test.py => test_embedding_metadata.py} | 0 .../{test.py => test_eviction_policy.py} | 0 .../{test.py => test_inference_engine.py} | 0 .../{test.py => test_similarity_evaluator.py} | 0 .../{test.py => test_vector_db.py} | 0 vcache/vcache_core/cache/cache.py | 6 +- 11 files changed, 49 insertions(+), 53 deletions(-) rename tests/unit/Benchmark/{test.py => test_benchmark.py} (100%) rename tests/unit/EmbeddingEngineStrategy/{test.py => test_embedding_engine.py} (100%) rename tests/unit/EmbeddingMetadataStrategy/{test.py => test_embedding_metadata.py} (100%) rename tests/unit/EvictionPolicyStrategy/{test.py => test_eviction_policy.py} (100%) rename tests/unit/InferenceEngineStrategy/{test.py => test_inference_engine.py} (100%) rename tests/unit/SimilarityEvalutatorStrategy/{test.py => test_similarity_evaluator.py} (100%) rename tests/unit/VectorDBStrategy/{test.py => test_vector_db.py} (100%) diff --git a/benchmarks/_plotter_helper.py b/benchmarks/_plotter_helper.py index 4030085..4555728 100644 --- a/benchmarks/_plotter_helper.py +++ b/benchmarks/_plotter_helper.py @@ -19,7 +19,7 @@ def convert_to_dataframe_from_benchmark(benchmark: "Benchmark") -> tuple: "tn_list": benchmark.tn_list, "fn_list": benchmark.fn_list, "latency_direct_list": benchmark.latency_direct_list, - "latency_vectorq_list": benchmark.latency_vcach_list, + "latency_vectorq_list": benchmark.latency_vcache_list, } df = pd.DataFrame(data) diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index a9c920f..b39de34 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -33,6 +33,7 @@ HNSWLibVectorDB, SimilarityMetricType, ) +from vcache.vcache_core.similarity_evaluator import SimilarityEvaluator from vcache.vcache_core.similarity_evaluator.strategies.llm_comparison import ( LLMComparisonSimilarityEvaluator, ) @@ -101,7 +102,7 @@ class Dataset(Enum): ECOMMERCE_DATASET = "ecommerce_dataset" -class GenerateResultsOnly(Enum): +class GeneratePlotsOnly(Enum): YES = True NO = False @@ -112,62 +113,37 @@ class GenerateResultsOnly(Enum): MAX_SAMPLES: int = 60000 -CONFIDENCE_INTERVALS_ITERATIONS: int = 5 -IS_LLM_JUDGE_BENCHMARK: bool = False -DISABLE_PROGRESS_BAR: bool = True +CONFIDENCE_INTERVALS_ITERATIONS: int = 2 +DISABLE_PROGRESS_BAR: bool = False KEEP_SPLIT: int = 100 RUN_COMBINATIONS: List[ - Tuple[EmbeddingModel, LargeLanguageModel, Dataset, GenerateResultsOnly] + Tuple[EmbeddingModel, LargeLanguageModel, Dataset, GeneratePlotsOnly] ] = [ ( EmbeddingModel.GTE, LargeLanguageModel.LLAMA_3_8B, - Dataset.SEM_BENCHMARK_SEARCH_QUERIES, - GenerateResultsOnly.YES, + Dataset.SEM_BENCHMARK_CLASSIFICATION, + GeneratePlotsOnly.NO, + StringComparisonSimilarityEvaluator(), ), ( EmbeddingModel.GTE, LargeLanguageModel.GPT_4O_MINI, Dataset.SEM_BENCHMARK_ARENA, - GenerateResultsOnly.YES, - ), - ( - EmbeddingModel.E5_LARGE_V2, - LargeLanguageModel.GPT_4O_MINI, - Dataset.SEM_BENCHMARK_ARENA, - GenerateResultsOnly.YES, - ), - ( - EmbeddingModel.E5_LARGE_V2, - LargeLanguageModel.LLAMA_3_8B, - Dataset.SEM_BENCHMARK_CLASSIFICATION, - GenerateResultsOnly.YES, - ), - ( - EmbeddingModel.GTE, - LargeLanguageModel.LLAMA_3_8B, - 
Dataset.SEM_BENCHMARK_CLASSIFICATION, - GenerateResultsOnly.YES, - ), - ( - EmbeddingModel.GTE, - LargeLanguageModel.LLAMA_3_70B, - Dataset.SEM_BENCHMARK_CLASSIFICATION, - GenerateResultsOnly.YES, + GeneratePlotsOnly.NO, + LLMComparisonSimilarityEvaluator(), ), ] BASELINES_TO_RUN: List[Baseline] = [ # Baseline.IID, # Baseline.GPTCache, - # Baseline.VCacheLocal, + Baseline.VCacheLocal, # Baseline.BerkeleyEmbedding, # Baseline.VCacheBerkeleyEmbedding, ] -DATASETS_TO_RUN: List[str] = [Dataset.SEM_BENCHMARK_SEARCH_QUERIES] - STATIC_THRESHOLDS: List[float] = [ 0.80, 0.81, @@ -220,7 +196,7 @@ def stats_set_up(self): self.tn_list: List[int] = [] self.fn_list: List[int] = [] self.latency_direct_list: List[float] = [] - self.latency_vcach_list: List[float] = [] + self.latency_vcache_list: List[float] = [] self.observations_dict: Dict[str, Dict[str, float]] = {} self.gammas_dict: Dict[str, float] = {} self.t_hats_dict: Dict[str, float] = {} @@ -465,7 +441,7 @@ def dump_results_to_json(self): "tn_list": self.tn_list, "fn_list": self.fn_list, "latency_direct_list": self.latency_direct_list, - "latency_vectorq_list": self.latency_vcach_list, + "latency_vectorq_list": self.latency_vcache_list, "observations_dict": self.observations_dict, "gammas_dict": self.gammas_dict, "t_hats_dict": self.t_hats_dict, @@ -498,12 +474,8 @@ def __run_baseline( timestamp: str, delta: float, threshold: float, + similarity_evaluator: SimilarityEvaluator, ): - if IS_LLM_JUDGE_BENCHMARK: - similarity_evaluator = LLMComparisonSimilarityEvaluator() - else: - similarity_evaluator = StringComparisonSimilarityEvaluator() - vcache_config: VCacheConfig = VCacheConfig( inference_engine=BenchmarkInferenceEngine(), embedding_engine=BenchmarkEmbeddingEngine(), @@ -547,8 +519,15 @@ def main(): timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M") - for embedding_model, llm_model, dataset, generate_results_only in RUN_COMBINATIONS: + for ( + embedding_model, + llm_model, + dataset, + generate_plots_only, + similarity_evaluator, + ) in RUN_COMBINATIONS: try: + print(f"DatasetPath: {datasets_dir}, Dataset: {dataset.value}") dataset_file = os.path.join(datasets_dir, f"{dataset.value}.json") logging.info( f"Running benchmark for dataset: {dataset}, embedding model: {embedding_model.value[1]}, LLM model: {llm_model.value[1]}\n" @@ -557,7 +536,10 @@ def main(): ##################################################### ### Baseline: vCache Local - if Baseline.VCacheLocal in BASELINES_TO_RUN and not generate_results_only: + if ( + Baseline.VCacheLocal in BASELINES_TO_RUN + and not generate_plots_only.value + ): for delta in DELTAS: for i in range(0, CONFIDENCE_INTERVALS_ITERATIONS): path = os.path.join( @@ -583,11 +565,15 @@ def main(): timestamp=timestamp, delta=delta, threshold=-1, + similarity_evaluator=similarity_evaluator, ) ##################################################### ### Baseline: vCache Global - if Baseline.VCacheGlobal in BASELINES_TO_RUN and not generate_results_only: + if ( + Baseline.VCacheGlobal in BASELINES_TO_RUN + and not generate_plots_only.value + ): for delta in DELTAS: path = os.path.join( results_dir, @@ -612,13 +598,14 @@ def main(): timestamp=timestamp, delta=delta, threshold=-1, + similarity_evaluator=similarity_evaluator, ) ##################################################### ### Baseline: Berkeley Embedding if ( Baseline.BerkeleyEmbedding in BASELINES_TO_RUN - and not generate_results_only + and not generate_plots_only.value ): for threshold in STATIC_THRESHOLDS: if embedding_model == EmbeddingModel.E5_MISTRAL_7B: 
@@ -658,13 +645,14 @@ def main(): timestamp=timestamp, delta=-1, threshold=threshold, + similarity_evaluator=similarity_evaluator, ) ##################################################### ### Baseline: vCache + Berkeley Embedding if ( Baseline.VCacheBerkeleyEmbedding in BASELINES_TO_RUN - and not generate_results_only + and not generate_plots_only.value ): for delta in DELTAS: for i in range(0, CONFIDENCE_INTERVALS_ITERATIONS): @@ -707,11 +695,12 @@ def main(): timestamp=timestamp, delta=delta, threshold=-1, + similarity_evaluator=similarity_evaluator, ) ##################################################### ### Baseline: IID Local - if Baseline.IID in BASELINES_TO_RUN and not generate_results_only: + if Baseline.IID in BASELINES_TO_RUN and not generate_plots_only.value: for delta in DELTAS: for i in range(0, CONFIDENCE_INTERVALS_ITERATIONS): path = os.path.join( @@ -737,11 +726,12 @@ def main(): timestamp=timestamp, delta=delta, threshold=-1, + similarity_evaluator=similarity_evaluator, ) ##################################################### ### Baseline: GPTCache - if Baseline.GPTCache in BASELINES_TO_RUN and not generate_results_only: + if Baseline.GPTCache in BASELINES_TO_RUN and not generate_plots_only.value: for threshold in STATIC_THRESHOLDS: path = os.path.join( results_dir, @@ -764,6 +754,7 @@ def main(): timestamp=timestamp, delta=-1, threshold=threshold, + similarity_evaluator=similarity_evaluator, ) ##################################################### diff --git a/tests/ReadMe.md b/tests/ReadMe.md index 7ab2408..e4c36a1 100644 --- a/tests/ReadMe.md +++ b/tests/ReadMe.md @@ -25,6 +25,11 @@ vCache includes both **unit tests** and **integration tests** to ensure correctn Unit tests verify the **logic of individual module strategies** (e.g., caching policies, embedding engines, similarity evaluators) in isolation. They are designed to be fast, deterministic, and independent of external services. 
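To make this concrete, here is a minimal sketch of the kind of isolated strategy test this section describes. The test case and the exact-match expectation are illustrative assumptions, not taken from the repository's test suite; only the evaluator class and the `answers_similar(a=..., b=...)` signature appear in the codebase.

```python
# Hypothetical example of an isolated strategy unit test
# (assumes exact string matches count as similar for this evaluator).
import unittest

from vcache.vcache_core.similarity_evaluator.strategies.string_comparison import (
    StringComparisonSimilarityEvaluator,
)


class TestStringComparisonSimilarityEvaluator(unittest.TestCase):
    def test_identical_answers_are_similar(self):
        evaluator = StringComparisonSimilarityEvaluator()
        # Pure string logic: no LLM, no network access, fast and deterministic.
        self.assertTrue(evaluator.answers_similar(a="Berlin", b="Berlin"))


if __name__ == "__main__":
    unittest.main()
```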
+#### Running Unit Tests + +```bash +python -m pytest tests/unit/ +``` ### Integration Tests diff --git a/tests/unit/Benchmark/test.py b/tests/unit/Benchmark/test_benchmark.py similarity index 100% rename from tests/unit/Benchmark/test.py rename to tests/unit/Benchmark/test_benchmark.py diff --git a/tests/unit/EmbeddingEngineStrategy/test.py b/tests/unit/EmbeddingEngineStrategy/test_embedding_engine.py similarity index 100% rename from tests/unit/EmbeddingEngineStrategy/test.py rename to tests/unit/EmbeddingEngineStrategy/test_embedding_engine.py diff --git a/tests/unit/EmbeddingMetadataStrategy/test.py b/tests/unit/EmbeddingMetadataStrategy/test_embedding_metadata.py similarity index 100% rename from tests/unit/EmbeddingMetadataStrategy/test.py rename to tests/unit/EmbeddingMetadataStrategy/test_embedding_metadata.py diff --git a/tests/unit/EvictionPolicyStrategy/test.py b/tests/unit/EvictionPolicyStrategy/test_eviction_policy.py similarity index 100% rename from tests/unit/EvictionPolicyStrategy/test.py rename to tests/unit/EvictionPolicyStrategy/test_eviction_policy.py diff --git a/tests/unit/InferenceEngineStrategy/test.py b/tests/unit/InferenceEngineStrategy/test_inference_engine.py similarity index 100% rename from tests/unit/InferenceEngineStrategy/test.py rename to tests/unit/InferenceEngineStrategy/test_inference_engine.py diff --git a/tests/unit/SimilarityEvalutatorStrategy/test.py b/tests/unit/SimilarityEvalutatorStrategy/test_similarity_evaluator.py similarity index 100% rename from tests/unit/SimilarityEvalutatorStrategy/test.py rename to tests/unit/SimilarityEvalutatorStrategy/test_similarity_evaluator.py diff --git a/tests/unit/VectorDBStrategy/test.py b/tests/unit/VectorDBStrategy/test_vector_db.py similarity index 100% rename from tests/unit/VectorDBStrategy/test.py rename to tests/unit/VectorDBStrategy/test_vector_db.py diff --git a/vcache/vcache_core/cache/cache.py b/vcache/vcache_core/cache/cache.py index f0ee3fe..7d7afe0 100644 --- a/vcache/vcache_core/cache/cache.py +++ b/vcache/vcache_core/cache/cache.py @@ -43,7 +43,7 @@ def add(self, prompt: str, response: str) -> int: The id of the embedding. """ embedding = self.embedding_engine.get_embedding(prompt) - self.embedding_store.add_embedding(embedding, response) + return self.embedding_store.add_embedding(embedding, response) def remove(self, embedding_id: int) -> int: """ @@ -55,7 +55,7 @@ def remove(self, embedding_id: int) -> int: Returns: The id of the embedding. """ - self.embedding_store.remove(embedding_id) + return self.embedding_store.remove(embedding_id) def get_knn(self, prompt: str, k: int) -> List[tuple[float, int]]: """ @@ -102,7 +102,7 @@ def update_metadata( Returns: The updated metadata of the embedding. 
""" - self.embedding_store.update_metadata(embedding_id, embedding_metadata) + return self.embedding_store.update_metadata(embedding_id, embedding_metadata) def get_current_capacity(self) -> int: """ From 215cd06390ba26457cd892b22c612754aaf9650a Mon Sep 17 00:00:00 2001 From: Luis Gaspar Schroeder Date: Fri, 13 Jun 2025 13:54:20 +0200 Subject: [PATCH 2/8] Implemented async label generation with queue logic --- .../strategies/dynamic_local_threshold.py | 224 ++++++++++++++++-- 1 file changed, 210 insertions(+), 14 deletions(-) diff --git a/vcache/vcache_policy/strategies/dynamic_local_threshold.py b/vcache/vcache_policy/strategies/dynamic_local_threshold.py index af9010a..77b96b4 100644 --- a/vcache/vcache_policy/strategies/dynamic_local_threshold.py +++ b/vcache/vcache_policy/strategies/dynamic_local_threshold.py @@ -1,4 +1,7 @@ +import queue import random +import threading +from concurrent.futures import ThreadPoolExecutor from enum import Enum from typing import Dict, List, Optional, Tuple @@ -22,6 +25,57 @@ from vcache.vcache_policy.vcache_policy import VCachePolicy +class CallbackQueue(queue.Queue): + """ + A queue that processes items with a callback function in a worker thread. + """ + + def __init__(self, callback_function): + """ + Initializes the CallbackQueue. + + Args: + callback_function: The function to call for each item in the queue. + It will be executed by the worker thread. + """ + super().__init__() + self.callback_function = callback_function + self._stop_event = threading.Event() + self.worker_thread = threading.Thread(target=self._worker, daemon=True) + + def _worker(self): + """ + The main loop for the worker thread. + + It continuously fetches items from the queue and processes them using the + callback function. The loop includes a timeout to allow for graceful + shutdown checks. + """ + while True: + should_stop = self._stop_event.is_set() + if should_stop: + break + + try: + item = self.get(timeout=1) + if item is None: # Sentinel value to stop + break + self.callback_function(item) + self.task_done() + except queue.Empty: + continue + + def start(self): + """Starts the worker thread.""" + self.worker_thread.start() + + def stop(self): + """Stops the worker thread gracefully.""" + if self.worker_thread.is_alive(): + self.put(None) + self.worker_thread.join() + + class DynamicLocalThresholdPolicy(VCachePolicy): """ Dynamic local threshold policy that computes optimal thresholds for each embedding. @@ -31,19 +85,30 @@ def __init__(self, delta: float = 0.01): """ Initialize dynamic local threshold policy. + Initializes the core algorithm and sets up placeholders for the thread + pool executor and callback queue which will be created in `setup`. + Args: delta: The delta value to use for threshold computation. """ self.bayesian = _Algorithm(delta=delta) - self.similarity_evaluator: SimilarityEvaluator = None - self.inference_engine: InferenceEngine = None - self.cache: Cache = None + self.similarity_evaluator: Optional[SimilarityEvaluator] = None + self.inference_engine: Optional[InferenceEngine] = None + self.cache: Optional[Cache] = None + + self.executor: Optional[ThreadPoolExecutor] = None + self.callback_queue: Optional[CallbackQueue] = None @override def setup(self, config: VCacheConfig): """ Setup the policy with the given configuration. + This method initializes the cache, similarity evaluator, and inference + engine. 
It also sets up and starts the background processing components: + a ThreadPoolExecutor for concurrent tasks and a CallbackQueue for + serialized cache updates. + Args: config: The VCache configuration to use. """ @@ -58,6 +123,21 @@ def setup(self, config: VCacheConfig): eviction_policy=config.eviction_policy, ) + self.callback_queue = CallbackQueue( + callback_function=self.__perform_cache_update + ) + self.callback_queue.start() + self.executor = ThreadPoolExecutor(max_workers=64) + + def shutdown(self): + """ + Shuts down the thread pool and callback queue gracefully. + """ + if self.callback_queue: + self.callback_queue.stop() + if self.executor: + self.executor.shutdown(wait=True) + @override def process_request( self, prompt: str, system_prompt: Optional[str] @@ -65,6 +145,11 @@ def process_request( """ Process a request using dynamic local threshold policy. + It determines whether to serve a cached response or generate a new one. + If the policy decides to 'explore', it generates a new response and + triggers an asynchronous background task to evaluate the decision and + update the cache, without blocking the current request. + Args: prompt: The prompt to check for cache hit. system_prompt: The optional system prompt to use for the response. It will override the system prompt in the VCacheConfig if provided. @@ -96,21 +181,132 @@ def process_request( response = self.inference_engine.create( prompt=prompt, system_prompt=system_prompt ) - should_have_exploited = self.similarity_evaluator.answers_similar( - a=response, b=metadata.response - ) - self.bayesian.update_metadata( - similarity_score=similarity_score, - is_correct=should_have_exploited, + + self.__update_cache( + response=response, metadata=metadata, + similarity_score=similarity_score, + embedding_id=embedding_id, + prompt=prompt, ) - if not should_have_exploited: - self.cache.add(prompt=prompt, response=response) - self.cache.update_metadata( - embedding_id=embedding_id, embedding_metadata=metadata - ) + return False, response, metadata.response + def __update_cache( + self, + response: str, + metadata: EmbeddingMetadataObj, + similarity_score: float, + embedding_id: int, + prompt: str, + ) -> None: + """ + Asynchronously validates the correctness of the cached response and updates the cache. + + Validating whether the response is correct can involve a latency-expensive LLM-judge call. + Because this evaluation does not impact the returned response, we process it in the background. + The LLM-judge call (or any other strategy, such as an embedding- or string-based similarity check) runs in its own thread + and returns a label (True/False) indicating whether the response is correct. + vCache maintains a global queue that waits for the labels. When a label becomes available, + vCache updates the metadata and the vector database accordingly. + + Args: + response: The response to check for correctness. + metadata: The metadata of the embedding. + similarity_score: The similarity score between the query and the embedding. + embedding_id: The id of the embedding. + prompt: The prompt that was used to generate the response. + """ + if self.executor is None: + raise ValueError("Executor not initialized.
Call setup() first.") + + self.executor.submit( + self.__submit_for_background_update, + response, + metadata, + similarity_score, + embedding_id, + prompt, + ) + + def __submit_for_background_update( + self, + response: str, + metadata: EmbeddingMetadataObj, + similarity_score: float, + embedding_id: int, + prompt: str, + ): + """ + Submits a task to check answer similarity and queue a cache update. + + This method is executed by the ThreadPoolExecutor. It performs the + potentially slow `answers_similar` check and then puts the result + and context onto the `callback_queue` for sequential processing. + + Args: + response: The newly generated response. + metadata: The metadata of the nearest neighbor embedding from the cache. + similarity_score: The similarity between the prompt and the nearest neighbor. + embedding_id: The ID of the nearest neighbor embedding. + prompt: The original user prompt. + """ + should_have_exploited = self.similarity_evaluator.answers_similar( + a=response, b=metadata.response + ) + self.callback_queue.put( + ( + should_have_exploited, + response, + metadata, + similarity_score, + embedding_id, + prompt, + ) + ) + + def __perform_cache_update(self, update_args: tuple) -> None: + """ + Performs the actual cache update based on the background check. + + This method is executed sequentially by the CallbackQueue's worker + thread, ensuring thread-safe updates to the cache metadata and + vector database. + + Args: + update_args: A tuple containing the context required for the update, + as passed from `__submit_for_background_update`. It + contains the following elements in order: + + - should_have_exploited (bool): Whether the cache hit + should have been exploited. + - response (str): The newly generated response. + - metadata (EmbeddingMetadataObj): The metadata of the + nearest neighbor. + - similarity_score (float): The similarity score. + - embedding_id (int): The ID of the nearest neighbor. + - prompt (str): The original user prompt. + """ + ( + should_have_exploited, + response, + metadata, + similarity_score, + embedding_id, + prompt, + ) = update_args + + self.bayesian.update_metadata( + similarity_score=similarity_score, + is_correct=should_have_exploited, + metadata=metadata, + ) + if not should_have_exploited: + self.cache.add(prompt=prompt, response=response) + self.cache.update_metadata( + embedding_id=embedding_id, embedding_metadata=metadata + ) + class _Action(Enum): """ From 56cb50bf54e7858ade67f54d28eca4adab85cecd Mon Sep 17 00:00:00 2001 From: Luis Gaspar Schroeder Date: Fri, 13 Jun 2025 14:18:11 +0200 Subject: [PATCH 3/8] Decoupled metadata logic to synchronous thread --- .../strategies/dynamic_local_threshold.py | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/vcache/vcache_policy/strategies/dynamic_local_threshold.py b/vcache/vcache_policy/strategies/dynamic_local_threshold.py index 77b96b4..df561dd 100644 --- a/vcache/vcache_policy/strategies/dynamic_local_threshold.py +++ b/vcache/vcache_policy/strategies/dynamic_local_threshold.py @@ -223,19 +223,19 @@ def __update_cache( self.executor.submit( self.__submit_for_background_update, response, - metadata, similarity_score, embedding_id, prompt, + metadata.response, ) def __submit_for_background_update( self, - response: str, - metadata: EmbeddingMetadataObj, + new_response: str, similarity_score: float, embedding_id: int, prompt: str, + cached_response: str, ): """ Submits a task to check answer similarity and queue a cache update. 
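The pattern these hunks implement is easier to see in isolation: slow correctness checks fan out to a thread pool, while a single queue-fed worker thread serializes all cache updates. Below is a minimal, self-contained sketch of that producer/consumer pattern under assumed names; none of these identifiers are part of the vCache API.

```python
# Sketch of the async-labeling pattern: parallel slow checks, serialized updates.
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

updates: "queue.Queue[tuple | None]" = queue.Queue()


def slow_label(new_response: str, cached_response: str) -> None:
    # Stand-in for the potentially slow `answers_similar` LLM-judge call.
    label = new_response.strip().lower() == cached_response.strip().lower()
    updates.put((label, new_response, cached_response))


def update_worker() -> None:
    # Single consumer: all cache updates are applied sequentially, so no
    # two updates can race on the same metadata object.
    while True:
        item = updates.get()
        if item is None:  # sentinel value signals graceful shutdown
            break
        label, new, cached = item
        print(f"apply update: label={label}, new={new!r}, cached={cached!r}")
        updates.task_done()


worker = threading.Thread(target=update_worker, daemon=True)
worker.start()
with ThreadPoolExecutor(max_workers=4) as pool:
    pool.submit(slow_label, "Berlin", "berlin")
    pool.submit(slow_label, "Paris", "Berlin")
updates.put(None)  # stop the worker only after the pool has drained
worker.join()
```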
@@ -245,20 +245,19 @@ def __submit_for_background_update( and context onto the `callback_queue` for sequential processing. Args: - response: The newly generated response. - metadata: The metadata of the nearest neighbor embedding from the cache. + new_response: The newly generated response. similarity_score: The similarity between the prompt and the nearest neighbor. embedding_id: The ID of the nearest neighbor embedding. prompt: The original user prompt. + cached_response: The response from the cached nearest neighbor. """ should_have_exploited = self.similarity_evaluator.answers_similar( - a=response, b=metadata.response + a=new_response, b=cached_response ) self.callback_queue.put( ( should_have_exploited, - response, - metadata, + new_response, similarity_score, embedding_id, prompt, @@ -271,40 +270,45 @@ def __perform_cache_update(self, update_args: tuple) -> None: This method is executed sequentially by the CallbackQueue's worker thread, ensuring thread-safe updates to the cache metadata and - vector database. + vector database. It fetches the latest metadata before updating to + prevent race conditions with evictions or other updates. Args: update_args: A tuple containing the context required for the update, as passed from `__submit_for_background_update`. It contains the following elements in order: - - - should_have_exploited (bool): Whether the cache hit - should have been exploited. - - response (str): The newly generated response. - - metadata (EmbeddingMetadataObj): The metadata of the - nearest neighbor. - - similarity_score (float): The similarity score. - - embedding_id (int): The ID of the nearest neighbor. - - prompt (str): The original user prompt. + - should_have_exploited (bool): Whether the cache hit + should have been exploited. + - new_response (str): The newly generated response. + - similarity_score (float): The similarity score. + - embedding_id (int): The ID of the nearest neighbor. + - prompt (str): The original user prompt. """ ( should_have_exploited, - response, - metadata, + new_response, similarity_score, embedding_id, prompt, ) = update_args + # Fetch the latest metadata within the synchronized queue to avoid race conditions + latest_metadata_object = self.cache.get_metadata(embedding_id=embedding_id) + item_was_evicted = latest_metadata_object is None + if item_was_evicted: + return + self.bayesian.update_metadata( similarity_score=similarity_score, is_correct=should_have_exploited, - metadata=metadata, + metadata=latest_metadata_object, ) + if not should_have_exploited: - self.cache.add(prompt=prompt, response=response) + self.cache.add(prompt=prompt, response=new_response) + self.cache.update_metadata( - embedding_id=embedding_id, embedding_metadata=metadata + embedding_id=embedding_id, embedding_metadata=latest_metadata_object ) From 32b3af739b45247f906bc3470ce26c3811e34323 Mon Sep 17 00:00:00 2001 From: Luis Gaspar Schroeder Date: Fri, 13 Jun 2025 14:42:57 +0200 Subject: [PATCH 4/8] Added locks to vector db to avoid race between embedding add and metadata creation --- vcache/vcache_core/cache/cache.py | 6 +++- .../cache/embedding_store/embedding_store.py | 33 ++++++++++++------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/vcache/vcache_core/cache/cache.py b/vcache/vcache_core/cache/cache.py index 7d7afe0..60adf17 100644 --- a/vcache/vcache_core/cache/cache.py +++ b/vcache/vcache_core/cache/cache.py @@ -33,7 +33,11 @@ def __init__( def add(self, prompt: str, response: str) -> int: """ - Add a prompt-response pair to the cache.
+ Compute the embedding for the prompt, add it to the vector database, and create a new metadata object. + + IMPORTANT: The embedding is computed first and then added to the vector database. + The metadata object is added last. + Consider this when implementing asynchronous logic to prevent race conditions. Args: prompt: The prompt to add to the cache. diff --git a/vcache/vcache_core/cache/embedding_store/embedding_store.py b/vcache/vcache_core/cache/embedding_store/embedding_store.py index ba664dd..c36d10e 100644 --- a/vcache/vcache_core/cache/embedding_store/embedding_store.py +++ b/vcache/vcache_core/cache/embedding_store/embedding_store.py @@ -1,3 +1,4 @@ +import threading from typing import List from vcache.vcache_core.cache.embedding_store.embedding_metadata_storage import ( @@ -28,10 +29,14 @@ def __init__( """ self.vector_db = vector_db self.embedding_metadata_storage = embedding_metadata_storage + self._add_lock = threading.Lock() + self._remove_lock = threading.Lock() def add_embedding(self, embedding: List[float], response: str) -> int: """ - Add an embedding and its associated response to the store. + Add an embedding to the vector database and create a new metadata object. + + This operation is thread-safe. Args: embedding: The embedding vector to add. @@ -40,28 +45,32 @@ def add_embedding(self, embedding: List[float], response: str) -> int: Returns: The ID of the added embedding. """ - embedding_id = self.vector_db.add(embedding) - metadata = EmbeddingMetadataObj( - embedding_id=embedding_id, - response=response, - ) - self.embedding_metadata_storage.add_metadata( - embedding_id=embedding_id, metadata=metadata - ) - return embedding_id + with self._add_lock: + embedding_id = self.vector_db.add(embedding) + metadata = EmbeddingMetadataObj( + embedding_id=embedding_id, + response=response, + ) + self.embedding_metadata_storage.add_metadata( + embedding_id=embedding_id, metadata=metadata + ) + return embedding_id def remove(self, embedding_id: int) -> int: """ Remove an embedding and its metadata from the store. + This operation is thread-safe. + Args: embedding_id: The ID of the embedding to remove. Returns: The ID of the removed embedding.
""" - self.embedding_metadata_storage.remove_metadata(embedding_id) - return self.vector_db.remove(embedding_id) + with self._remove_lock: + self.embedding_metadata_storage.remove_metadata(embedding_id) + return self.vector_db.remove(embedding_id) def get_knn(self, embedding: List[float], k: int) -> List[tuple[float, int]]: """ From e5f42e23ee301a1fafb7ff2915fa3dfc9cac0f06 Mon Sep 17 00:00:00 2001 From: Luis Gaspar Schroeder Date: Fri, 13 Jun 2025 15:13:34 +0200 Subject: [PATCH 5/8] Added fallback for evicted embeddings and waiting time for benchmark --- benchmarks/benchmark.py | 3 +++ .../strategies/dynamic_local_threshold.py | 12 +++++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index b39de34..14f6ef6 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -267,6 +267,9 @@ def test_run_benchmark(self): if not is_cache_hit: latency_vcache += llm_generation_latency + # This is important for the async logic + time.sleep(0.002) + # 3) Update Stats self.update_stats( is_cache_hit=is_cache_hit, diff --git a/vcache/vcache_policy/strategies/dynamic_local_threshold.py b/vcache/vcache_policy/strategies/dynamic_local_threshold.py index df561dd..7be033f 100644 --- a/vcache/vcache_policy/strategies/dynamic_local_threshold.py +++ b/vcache/vcache_policy/strategies/dynamic_local_threshold.py @@ -169,7 +169,17 @@ def process_request( return False, response, "" similarity_score, embedding_id = knn[0] - metadata = self.cache.get_metadata(embedding_id=embedding_id) + + try: + metadata = self.cache.get_metadata(embedding_id=embedding_id) + except Exception: + # Cache eviction fallback + new_response = self.inference_engine.create( + prompt=prompt, system_prompt=system_prompt + ) + self.cache.add(prompt=prompt, response=new_response) + return False, new_response, new_response + action = self.bayesian.select_action( similarity_score=similarity_score, metadata=metadata ) From 2ff10c1de546b5215251bb3531da0b7c57df792b Mon Sep 17 00:00:00 2001 From: Luis Gaspar Schroeder Date: Fri, 13 Jun 2025 16:08:45 +0200 Subject: [PATCH 6/8] Implemented test to validate the correctness of the DynamicLocalThresholdPolicy --- .../test_vcache_policy.py | 250 ++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 tests/unit/VCachePolicyStrategy/test_vcache_policy.py diff --git a/tests/unit/VCachePolicyStrategy/test_vcache_policy.py b/tests/unit/VCachePolicyStrategy/test_vcache_policy.py new file mode 100644 index 0000000..cb9fcc7 --- /dev/null +++ b/tests/unit/VCachePolicyStrategy/test_vcache_policy.py @@ -0,0 +1,250 @@ +import unittest +from concurrent.futures import ThreadPoolExecutor, as_completed +from unittest.mock import MagicMock, patch + +from vcache.config import VCacheConfig +from vcache.vcache_core.cache.embedding_store.embedding_metadata_storage.embedding_metadata_obj import ( + EmbeddingMetadataObj, +) +from vcache.vcache_policy.strategies.dynamic_local_threshold import ( + DynamicLocalThresholdPolicy, + _Action, +) + + +class TestDynamicLocalThresholdPolicy(unittest.TestCase): + def setUp(self): + """Set up a new policy and mock dependencies for each test.""" + self.mock_inference_engine = MagicMock() + self.mock_similarity_evaluator = MagicMock() + self.mock_cache = MagicMock() + + # Create a stateful mock for the cache + self.mock_metadata_store = {} + self.next_embedding_id = 0 + + def add_to_cache(prompt, response): + self.next_embedding_id += 1 + # Simulate adding metadata + mock_meta = 
MagicMock(spec=EmbeddingMetadataObj) + mock_meta.response = response + mock_meta.observations = [] + self.mock_metadata_store[self.next_embedding_id] = mock_meta + return self.next_embedding_id + + def get_metadata(embedding_id): + # Allow raising KeyError to simulate not found + if embedding_id not in self.mock_metadata_store: + raise KeyError("Metadata not found") + return self.mock_metadata_store[embedding_id] + + def update_metadata(embedding_id, embedding_metadata): + self.mock_metadata_store[embedding_id] = embedding_metadata + + self.mock_cache.get_metadata.side_effect = get_metadata + self.mock_cache.update_metadata.side_effect = update_metadata + self.mock_cache.add.side_effect = add_to_cache + + mock_config = MagicMock(spec=VCacheConfig) + mock_config.inference_engine = self.mock_inference_engine + mock_config.similarity_evaluator = self.mock_similarity_evaluator + # Add all required attributes for Cache creation + mock_config.embedding_engine = MagicMock() + mock_config.embedding_metadata_storage = MagicMock() + mock_config.vector_db = MagicMock() + mock_config.eviction_policy = MagicMock() + + self.policy = DynamicLocalThresholdPolicy() + self.policy.setup(mock_config) + + # After setup, replace the real cache with our stateful mock + self.policy.cache = self.mock_cache + + def tearDown(self): + """Shutdown the policy to clean up threads.""" + self.policy.shutdown() + + def test_empty_cache_is_miss(self): + """Test that the first request to an empty cache is a miss.""" + self.mock_cache.get_knn.return_value = [] + self.mock_inference_engine.create.return_value = "new response" + + is_hit, response, _ = self.policy.process_request("prompt", None) + + self.assertFalse(is_hit) + self.assertEqual(response, "new response") + self.mock_cache.add.assert_called_once_with( + prompt="prompt", response="new response" + ) + + @patch( + "vcache.vcache_policy.strategies.dynamic_local_threshold._Algorithm.select_action" + ) + def test_exploit_is_cache_hit(self, mock_select_action): + """Test that an EXPLOIT action results in a cache hit.""" + mock_select_action.return_value = _Action.EXPLOIT + self.mock_cache.get_knn.return_value = [(0.95, 1)] + # Pre-populate cache + mock_meta = MagicMock(spec=EmbeddingMetadataObj) + mock_meta.response = "cached response" + mock_meta.observations = [] + self.mock_metadata_store[1] = mock_meta + + is_hit, response, _ = self.policy.process_request("prompt", None) + + self.assertTrue(is_hit) + self.assertEqual(response, "cached response") + self.mock_inference_engine.create.assert_not_called() + + @patch( + "vcache.vcache_policy.strategies.dynamic_local_threshold._Algorithm.select_action" + ) + def test_explore_updates_in_background(self, mock_select_action): + """Test that an EXPLORE action queues a background update.""" + mock_select_action.return_value = _Action.EXPLORE + self.mock_cache.get_knn.return_value = [(0.8, 1)] + + mock_meta = MagicMock(spec=EmbeddingMetadataObj) + mock_meta.response = "cached response" + mock_meta.observations = [] + self.mock_metadata_store[1] = mock_meta + + self.mock_inference_engine.create.return_value = "new response" + self.mock_similarity_evaluator.answers_similar.return_value = True + + is_hit, response, _ = self.policy.process_request("prompt", None) + self.policy.shutdown() # Wait for background tasks to finish + + self.assertFalse(is_hit) + self.assertEqual(response, "new response") + # Check that the background update was performed + self.assertEqual(len(mock_meta.observations), 1) + 
self.assertEqual(mock_meta.observations[0], (0.8, 1)) + + def test_concurrent_add_and_read_stability(self): + """ + Stress-test the system's stability with concurrent reads and writes. + This test validates the atomic add fix (C1) and the graceful read + failure fix (C2). + """ + num_threads = 10 + ops_per_thread = 20 + + def writer_task(i): + for j in range(ops_per_thread): + prompt = f"writer-{i}-prompt-{j}" + self.mock_cache.get_knn.return_value = [] # Force miss + self.mock_inference_engine.create.return_value = "new response" + self.policy.process_request(prompt, None) + return True + + def reader_task(i): + for _ in range(ops_per_thread): + self.mock_cache.get_knn.return_value = [(0.9, 1)] # Force hit + self.policy.process_request("reader-prompt", None) + return True + + with ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [] + # Half writers, half readers + for i in range(num_threads // 2): + futures.append(executor.submit(writer_task, i)) + futures.append(executor.submit(reader_task, i)) + + for future in as_completed(futures): + # The test passes if no exceptions were raised from the threads. + self.assertTrue(future.result()) + + def test_concurrent_explore_and_update_integrity(self): + """ + Stress-test the background update logic for data integrity (C4) and + resilience against eviction (C3). + """ + num_threads = 20 + ops_per_thread = 5 + + # Pre-populate cache + mock_meta = MagicMock(spec=EmbeddingMetadataObj) + mock_meta.response = "cached" + mock_meta.observations = [] + self.mock_metadata_store[1] = mock_meta + + with patch.object( + self.policy.bayesian, "select_action", return_value=_Action.EXPLORE + ): + + def explore_task(): + self.mock_cache.get_knn.return_value = [(0.8, 1)] + self.mock_inference_engine.create.return_value = "new response" + self.mock_similarity_evaluator.answers_similar.return_value = True + self.policy.process_request("similar_prompt", None) + return True + + with ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [ + executor.submit(explore_task) + for _ in range(num_threads * ops_per_thread) + ] + for future in as_completed(futures): + self.assertTrue(future.result()) + + # We must shut down to ensure all queued updates are processed + self.policy.shutdown() + + # The final number of observations should be exactly the total number of tasks + expected_observations = num_threads * ops_per_thread + self.assertEqual(len(mock_meta.observations), expected_observations) + + def test_race_condition_read_side_graceful_failure(self): + """Test graceful failure when metadata disappears after knn.""" + self.mock_cache.get_knn.return_value = [(0.9, 1)] + # Simulate metadata not being found by raising an error + self.mock_cache.get_metadata.side_effect = KeyError("Metadata not found") + self.mock_inference_engine.create.return_value = "fallback response" + + is_hit, response, _ = self.policy.process_request("prompt", None) + + self.assertFalse(is_hit) + self.assertEqual(response, "fallback response") + # It should have been treated as a cache miss, so add is called + self.mock_cache.add.assert_called_once_with( + prompt="prompt", response="fallback response" + ) + + def test_race_condition_update_vs_eviction(self): + """Test graceful failure when metadata is evicted before background update.""" + with patch.object( + self.policy.bayesian, "select_action", return_value=_Action.EXPLORE + ): + self.mock_cache.get_knn.return_value = [(0.8, 1)] + mock_meta = MagicMock(spec=EmbeddingMetadataObj) + mock_meta.response = "cached" + 
mock_meta.observations = [] + self.mock_metadata_store[1] = mock_meta + self.mock_inference_engine.create.return_value = "new response" + + # In the background task, simulate metadata having been evicted + original_get_metadata = self.mock_cache.get_metadata.side_effect + + def get_metadata_for_update(embedding_id): + # First call from main thread works + if self.mock_cache.get_metadata.call_count == 1: + return original_get_metadata(embedding_id) + # Second call from background worker fails + return None + + self.mock_cache.get_metadata.side_effect = get_metadata_for_update + + # This call will queue the background update + self.policy.process_request("prompt", None) + + # Allow background tasks to run and complete + self.policy.shutdown() + + # The test passes if no exception was raised during the background update + # and the original observations list is unchanged + self.assertEqual(len(mock_meta.observations), 0) + + +if __name__ == "__main__": + unittest.main() From 88ab533a83384f2eaf3e2605e027afc9f3abcf45 Mon Sep 17 00:00:00 2001 From: Luis Gaspar Schroeder Date: Fri, 13 Jun 2025 16:30:11 +0200 Subject: [PATCH 7/8] Unified naming conventions --- README.md | 6 +- benchmarks/benchmark.py | 36 +++--- test.py | 6 +- tests/integration/test_concurrency.py | 114 ++++++++++++++++++ tests/integration/test_dynamic_threshold.py | 4 +- tests/integration/test_static_threshold.py | 4 +- .../test_vcache_policy.py | 16 +-- vcache/__init__.py | 16 +-- vcache/main.py | 6 +- vcache/vcache_policy/__init__.py | 24 ++-- ...threshold.py => benchmark_iid_verified.py} | 4 +- ...lobal_threshold.py => benchmark_static.py} | 4 +- ...eshold.py => benchmark_verified_global.py} | 4 +- ...dynamic_local_threshold.py => verified.py} | 20 ++- 14 files changed, 201 insertions(+), 63 deletions(-) create mode 100644 tests/integration/test_concurrency.py rename vcache/vcache_policy/strategies/{iid_local_threshold.py => benchmark_iid_verified.py} (98%) rename vcache/vcache_policy/strategies/{static_global_threshold.py => benchmark_static.py} (95%) rename vcache/vcache_policy/strategies/{dynamic_global_threshold.py => benchmark_verified_global.py} (98%) rename vcache/vcache_policy/strategies/{dynamic_local_threshold.py => verified.py} (97%) diff --git a/README.md b/README.md index e1e85e2..63b4f05 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ By default, vCache uses: - `InMemoryEmbeddingMetadataStorage` - `NoEvictionPolicy` - `StringComparisonSimilarityEvaluator` -- `DynamicLocalThresholdPolicy` with a maximum failure rate of 2% +- `VerifiedDecisionPolicy` with a maximum failure rate of 2% @@ -77,14 +77,14 @@ from vcache.inference_engine.strategies.open_ai import OpenAIInferenceEngine from vcache.vcache_core.cache.embedding_engine.strategies.open_ai import OpenAIEmbeddingEngine from vcache.vcache_core.cache.embedding_store.embedding_metadata_storage.strategies.in_memory import InMemoryEmbeddingMetadataStorage from vcache.vcache_core.similarity_evaluator.strategies.string_comparison import StringComparisonSimilarityEvaluator -from vcache.vcache_policy.strategies.dynamic_local_threshold import DynamicLocalThresholdPolicy +from vcache.vcache_policy.strategies.dynamic_local_threshold import VerifiedDecisionPolicy from vcache.vcache_policy.vcache_policy import VCachePolicy from vcache.vcache_core.cache.embedding_store.vector_db import HNSWLibVectorDB, SimilarityMetricType ``` ```python -vcache_policy: VCachePolicy = DynamicLocalThresholdPolicy(delta=0.02) +vcache_policy: VCachePolicy = 
VerifiedDecisionPolicy(delta=0.02) vcache_config: VCacheConfig = VCacheConfig( inference_engine=OpenAIInferenceEngine(), embedding_engine=OpenAIEmbeddingEngine(), diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index 14f6ef6..34efa30 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -40,17 +40,17 @@ from vcache.vcache_core.similarity_evaluator.strategies.string_comparison import ( StringComparisonSimilarityEvaluator, ) -from vcache.vcache_policy.strategies.dynamic_global_threshold import ( - DynamicGlobalThresholdPolicy, +from vcache.vcache_policy.strategies.benchmark_iid_verified import ( + BenchmarkVerifiedIIDDecisionPolicy, ) -from vcache.vcache_policy.strategies.dynamic_local_threshold import ( - DynamicLocalThresholdPolicy, +from vcache.vcache_policy.strategies.benchmark_static import ( + BenchmarkStaticDecisionPolicy, ) -from vcache.vcache_policy.strategies.iid_local_threshold import ( - IIDLocalThresholdPolicy, +from vcache.vcache_policy.strategies.benchmark_verified_global import ( + BenchmarkVerifiedGlobalDecisionPolicy, ) -from vcache.vcache_policy.strategies.static_global_threshold import ( - StaticGlobalThresholdPolicy, +from vcache.vcache_policy.strategies.verified import ( + VerifiedDecisionPolicy, ) from vcache.vcache_policy.vcache_policy import VCachePolicy @@ -560,7 +560,7 @@ def main(): ) __run_baseline( - vcache_policy=DynamicLocalThresholdPolicy(delta=delta), + vcache_policy=VerifiedDecisionPolicy(delta=delta), path=path, dataset_file=dataset_file, embedding_model=embedding_model.value, @@ -593,7 +593,9 @@ def main(): ) __run_baseline( - vcache_policy=DynamicGlobalThresholdPolicy(delta=delta), + vcache_policy=BenchmarkVerifiedGlobalDecisionPolicy( + delta=delta + ), path=path, dataset_file=dataset_file, embedding_model=embedding_model.value, @@ -640,7 +642,9 @@ def main(): logging.info(f"Using static threshold: {threshold}") __run_baseline( - vcache_policy=StaticGlobalThresholdPolicy(threshold=threshold), + vcache_policy=BenchmarkStaticDecisionPolicy( + threshold=threshold + ), path=path, dataset_file=dataset_file, embedding_model=berkeley_embedding_model.value, @@ -690,7 +694,7 @@ def main(): ) __run_baseline( - vcache_policy=DynamicLocalThresholdPolicy(delta=delta), + vcache_policy=VerifiedDecisionPolicy(delta=delta), path=path, dataset_file=dataset_file, embedding_model=berkeley_embedding_model.value, @@ -721,7 +725,9 @@ def main(): ) __run_baseline( - vcache_policy=IIDLocalThresholdPolicy(delta=delta), + vcache_policy=BenchmarkVerifiedIIDDecisionPolicy( + delta=delta + ), path=path, dataset_file=dataset_file, embedding_model=embedding_model.value, @@ -749,7 +755,9 @@ def main(): logging.info(f"Using static threshold: {threshold}") __run_baseline( - vcache_policy=StaticGlobalThresholdPolicy(threshold=threshold), + vcache_policy=BenchmarkStaticDecisionPolicy( + threshold=threshold + ), path=path, dataset_file=dataset_file, embedding_model=embedding_model.value, diff --git a/test.py b/test.py index 998f8e3..62b2da7 100644 --- a/test.py +++ b/test.py @@ -14,12 +14,12 @@ from vcache.vcache_core.similarity_evaluator.strategies.string_comparison import ( StringComparisonSimilarityEvaluator, ) -from vcache.vcache_policy.strategies.dynamic_local_threshold import ( - DynamicLocalThresholdPolicy, +from vcache.vcache_policy.strategies.verified import ( + VerifiedDecisionPolicy, ) from vcache.vcache_policy.vcache_policy import VCachePolicy -vcache_policy: VCachePolicy = DynamicLocalThresholdPolicy(delta=0.02) +vcache_policy: VCachePolicy = 
VerifiedDecisionPolicy(delta=0.02) vcache_config: VCacheConfig = VCacheConfig( inference_engine=OpenAIInferenceEngine(), embedding_engine=OpenAIEmbeddingEngine(), diff --git a/tests/integration/test_concurrency.py b/tests/integration/test_concurrency.py new file mode 100644 index 0000000..4174303 --- /dev/null +++ b/tests/integration/test_concurrency.py @@ -0,0 +1,114 @@ +import random +import time +import unittest +from concurrent.futures import ThreadPoolExecutor +from unittest.mock import MagicMock, patch + +from dotenv import load_dotenv + +from vcache import ( + HNSWLibVectorDB, + InMemoryEmbeddingMetadataStorage, + LangChainEmbeddingEngine, + StringComparisonSimilarityEvaluator, + VCache, + VCacheConfig, + VerifiedDecisionPolicy, +) +from vcache.vcache_policy.strategies.verified import _Action + +load_dotenv() + + +class TestConcurrency(unittest.TestCase): + def test_async_label_generation_and_timeout(self): + similarity_evaluator = StringComparisonSimilarityEvaluator() + + mock_answers_similar = MagicMock() + + def answers_similar(a, b): + if "Return 'xxxxxxxxx' as the answer" in a: + time.sleep(10) + print(f"Answers Similar (Execution time: 10s) => a: {a}, b: {b}\n") + return True + else: + execution_time = random.uniform(0.5, 3) + time.sleep(execution_time) + print( + f"Answers Similar (Execution time: {execution_time}s) => a: {a}, b: {b}\n" + ) + return True + + mock_answers_similar.side_effect = answers_similar + + config = VCacheConfig( + embedding_engine=LangChainEmbeddingEngine( + model_name="sentence-transformers/all-mpnet-base-v2" + ), + vector_db=HNSWLibVectorDB(), + embedding_metadata_storage=InMemoryEmbeddingMetadataStorage(), + similarity_evaluator=similarity_evaluator, + ) + + with VerifiedDecisionPolicy(delta=0.05) as policy: + vcache: VCache = VCache(config, policy) + vcache.vcache_policy.setup(config) + + with ( + patch.object( + policy.similarity_evaluator, + "answers_similar", + new=mock_answers_similar, + ), + patch.object( + policy.bayesian, "select_action", return_value=_Action.EXPLORE + ), + ): + initial_prompt = "What is the capital of Germany?" 
+ vcache.infer(prompt=initial_prompt) + + concurrent_prompts_chunk_1 = [ + "What is the capital of Germany?Germany's capital?", + "Capital of Germany is...", + "Return 'xxxxxxxxx' as the answer", # This is the slow prompt + "Berlin is the capital of what country?", + ] + concurrent_prompts_chunk_2 = [ + "Which city is the seat of the German government?", + "What is Germany's primary city?", + "Tell me about Berlin.", + "Is Frankfurt the capital of Germany?", + "What's the main city of Germany?", + "Where is the German government located?", + ] + + def do_inference(prompt): + prompt_index = total_prompts.index(prompt) + print(f"Inferring prompt {prompt_index}: {prompt}\n") + vcache.infer(prompt=prompt) + + total_prompts = concurrent_prompts_chunk_1 + concurrent_prompts_chunk_2 + with ThreadPoolExecutor(max_workers=len(total_prompts)) as executor: + executor.map(do_inference, concurrent_prompts_chunk_1) + time.sleep(1.5) + executor.map(do_inference, concurrent_prompts_chunk_2) + + all_metadata_objects = vcache.vcache_config.embedding_metadata_storage.get_all_embedding_metadata_objects() + final_observation_count = len(all_metadata_objects) + + for i, metadata_object in enumerate(all_metadata_objects): + print(f"metadata_object {i}: {metadata_object}") + + print(f"\nfinal_observation_count: {final_observation_count}") + + assert final_observation_count == 1, ( + f"Expected 1 metadata object, got {final_observation_count}" + ) + # We expect the 'slow prompt' to be the only prompt not being part of the observations + assert len(all_metadata_objects[0].observations) == 12, ( + f"Expected 12 observations (10 + 2 initial labels), got {len(all_metadata_objects[0].observations)}" + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/integration/test_dynamic_threshold.py b/tests/integration/test_dynamic_threshold.py index 86885ea..4d4d919 100644 --- a/tests/integration/test_dynamic_threshold.py +++ b/tests/integration/test_dynamic_threshold.py @@ -3,13 +3,13 @@ from dotenv import load_dotenv from vcache import ( - DynamicLocalThresholdPolicy, HNSWLibVectorDB, InMemoryEmbeddingMetadataStorage, LangChainEmbeddingEngine, OpenAIInferenceEngine, VCache, VCacheConfig, + VerifiedDecisionPolicy, ) load_dotenv() @@ -28,7 +28,7 @@ def create_default_config_and_policy(): embedding_metadata_storage=InMemoryEmbeddingMetadataStorage(), system_prompt="Please answer in a single word with the first letter capitalized. 
Example: London", ) - policy = DynamicLocalThresholdPolicy(delta=0.05) + policy = VerifiedDecisionPolicy(delta=0.05) return config, policy diff --git a/tests/integration/test_static_threshold.py b/tests/integration/test_static_threshold.py index 86a3efe..d75300e 100644 --- a/tests/integration/test_static_threshold.py +++ b/tests/integration/test_static_threshold.py @@ -3,11 +3,11 @@ from dotenv import load_dotenv from vcache import ( + BenchmarkStaticDecisionPolicy, HNSWLibVectorDB, InMemoryEmbeddingMetadataStorage, LangChainEmbeddingEngine, OpenAIInferenceEngine, - StaticGlobalThresholdPolicy, VCache, VCacheConfig, ) @@ -27,7 +27,7 @@ def create_default_config_and_policy(): vector_db=HNSWLibVectorDB(), embedding_metadata_storage=InMemoryEmbeddingMetadataStorage(), ) - policy = StaticGlobalThresholdPolicy(threshold=0.8) + policy = BenchmarkStaticDecisionPolicy(threshold=0.8) return config, policy diff --git a/tests/unit/VCachePolicyStrategy/test_vcache_policy.py b/tests/unit/VCachePolicyStrategy/test_vcache_policy.py index cb9fcc7..50bdcbc 100644 --- a/tests/unit/VCachePolicyStrategy/test_vcache_policy.py +++ b/tests/unit/VCachePolicyStrategy/test_vcache_policy.py @@ -6,13 +6,13 @@ from vcache.vcache_core.cache.embedding_store.embedding_metadata_storage.embedding_metadata_obj import ( EmbeddingMetadataObj, ) -from vcache.vcache_policy.strategies.dynamic_local_threshold import ( - DynamicLocalThresholdPolicy, +from vcache.vcache_policy.strategies.verified import ( + VerifiedDecisionPolicy, _Action, ) -class TestDynamicLocalThresholdPolicy(unittest.TestCase): +class TestVerifiedDecisionPolicy(unittest.TestCase): def setUp(self): """Set up a new policy and mock dependencies for each test.""" self.mock_inference_engine = MagicMock() @@ -54,7 +54,7 @@ def update_metadata(embedding_id, embedding_metadata): mock_config.vector_db = MagicMock() mock_config.eviction_policy = MagicMock() - self.policy = DynamicLocalThresholdPolicy() + self.policy = VerifiedDecisionPolicy() self.policy.setup(mock_config) # After setup, replace the real cache with our stateful mock @@ -77,9 +77,7 @@ def test_empty_cache_is_miss(self): prompt="prompt", response="new response" ) - @patch( - "vcache.vcache_policy.strategies.dynamic_local_threshold._Algorithm.select_action" - ) + @patch("vcache.vcache_policy.strategies.verified._Algorithm.select_action") def test_exploit_is_cache_hit(self, mock_select_action): """Test that an EXPLOIT action results in a cache hit.""" mock_select_action.return_value = _Action.EXPLOIT @@ -96,9 +94,7 @@ def test_exploit_is_cache_hit(self, mock_select_action): self.assertEqual(response, "cached response") self.mock_inference_engine.create.assert_not_called() - @patch( - "vcache.vcache_policy.strategies.dynamic_local_threshold._Algorithm.select_action" - ) + @patch("vcache.vcache_policy.strategies.verified._Algorithm.select_action") def test_explore_updates_in_background(self, mock_select_action): """Test that an EXPLORE action queues a background update.""" mock_select_action.return_value = _Action.EXPLORE diff --git a/vcache/__init__.py b/vcache/__init__.py index 1cccbbd..e5140a1 100644 --- a/vcache/__init__.py +++ b/vcache/__init__.py @@ -49,12 +49,12 @@ # vCache Policies from vcache.vcache_policy import ( - DynamicGlobalThresholdPolicy, - DynamicLocalThresholdPolicy, - IIDLocalThresholdPolicy, + BenchmarkStaticDecisionPolicy, + BenchmarkVerifiedGlobalDecisionPolicy, + BenchmarkVerifiedIIDDecisionPolicy, NoCachePolicy, - StaticGlobalThresholdPolicy, VCachePolicy, + VerifiedDecisionPolicy, ) 
__all__ = [ @@ -86,9 +86,9 @@ "InMemoryEmbeddingMetadataStorage", # vCache Policies "VCachePolicy", - "DynamicLocalThresholdPolicy", - "DynamicGlobalThresholdPolicy", - "StaticGlobalThresholdPolicy", + "VerifiedDecisionPolicy", + "BenchmarkVerifiedGlobalDecisionPolicy", + "BenchmarkStaticDecisionPolicy", "NoCachePolicy", - "IIDLocalThresholdPolicy", + "BenchmarkVerifiedIIDDecisionPolicy", ] diff --git a/vcache/main.py b/vcache/main.py index fb5e7a7..bc8af9b 100644 --- a/vcache/main.py +++ b/vcache/main.py @@ -1,8 +1,8 @@ from typing import List, Optional, Tuple from vcache.config import VCacheConfig -from vcache.vcache_policy.strategies.dynamic_local_threshold import ( - DynamicLocalThresholdPolicy, +from vcache.vcache_policy.strategies.verified import ( + VerifiedDecisionPolicy, ) from vcache.vcache_policy.vcache_policy import VCachePolicy @@ -15,7 +15,7 @@ class VCache: def __init__( self, config: VCacheConfig = VCacheConfig(), - policy: VCachePolicy = DynamicLocalThresholdPolicy(delta=0.02), + policy: VCachePolicy = VerifiedDecisionPolicy(delta=0.02), ): """ Initialize VCache with configuration and policy. diff --git a/vcache/vcache_policy/__init__.py b/vcache/vcache_policy/__init__.py index 78f3fec..890489d 100644 --- a/vcache/vcache_policy/__init__.py +++ b/vcache/vcache_policy/__init__.py @@ -1,23 +1,23 @@ -from vcache.vcache_policy.strategies.dynamic_global_threshold import ( - DynamicGlobalThresholdPolicy, +from vcache.vcache_policy.strategies.benchmark_iid_verified import ( + BenchmarkVerifiedIIDDecisionPolicy, ) -from vcache.vcache_policy.strategies.dynamic_local_threshold import ( - DynamicLocalThresholdPolicy, +from vcache.vcache_policy.strategies.benchmark_static import ( + BenchmarkStaticDecisionPolicy, ) -from vcache.vcache_policy.strategies.iid_local_threshold import ( - IIDLocalThresholdPolicy, +from vcache.vcache_policy.strategies.benchmark_verified_global import ( + BenchmarkVerifiedGlobalDecisionPolicy, ) from vcache.vcache_policy.strategies.no_cache import NoCachePolicy -from vcache.vcache_policy.strategies.static_global_threshold import ( - StaticGlobalThresholdPolicy, +from vcache.vcache_policy.strategies.verified import ( + VerifiedDecisionPolicy, ) from vcache.vcache_policy.vcache_policy import VCachePolicy __all__ = [ "VCachePolicy", - "StaticGlobalThresholdPolicy", - "DynamicLocalThresholdPolicy", - "DynamicGlobalThresholdPolicy", - "IIDLocalThresholdPolicy", + "BenchmarkStaticDecisionPolicy", + "VerifiedDecisionPolicy", + "BenchmarkVerifiedGlobalDecisionPolicy", + "BenchmarkVerifiedIIDDecisionPolicy", "NoCachePolicy", ] diff --git a/vcache/vcache_policy/strategies/iid_local_threshold.py b/vcache/vcache_policy/strategies/benchmark_iid_verified.py similarity index 98% rename from vcache/vcache_policy/strategies/iid_local_threshold.py rename to vcache/vcache_policy/strategies/benchmark_iid_verified.py index 135ee35..69827c1 100644 --- a/vcache/vcache_policy/strategies/iid_local_threshold.py +++ b/vcache/vcache_policy/strategies/benchmark_iid_verified.py @@ -18,9 +18,11 @@ from vcache.vcache_policy.vcache_policy import VCachePolicy -class IIDLocalThresholdPolicy(VCachePolicy): +class BenchmarkVerifiedIIDDecisionPolicy(VCachePolicy): """ Policy that uses the vCache IID algorithm to compute optimal thresholds for each embedding. + + IMPORTANT: This policy is used for benchmark purposes and should not be used in production. 
""" def __init__( diff --git a/vcache/vcache_policy/strategies/static_global_threshold.py b/vcache/vcache_policy/strategies/benchmark_static.py similarity index 95% rename from vcache/vcache_policy/strategies/static_global_threshold.py rename to vcache/vcache_policy/strategies/benchmark_static.py index e7efc75..b192586 100644 --- a/vcache/vcache_policy/strategies/static_global_threshold.py +++ b/vcache/vcache_policy/strategies/benchmark_static.py @@ -8,9 +8,11 @@ from vcache.vcache_policy.vcache_policy import VCachePolicy -class StaticGlobalThresholdPolicy(VCachePolicy): +class BenchmarkStaticDecisionPolicy(VCachePolicy): """ Policy that uses a static global threshold to determine cache hits. + + IMPORTANT: This policy is used for benchmark purposes and should not be used in production. """ def __init__( diff --git a/vcache/vcache_policy/strategies/dynamic_global_threshold.py b/vcache/vcache_policy/strategies/benchmark_verified_global.py similarity index 98% rename from vcache/vcache_policy/strategies/dynamic_global_threshold.py rename to vcache/vcache_policy/strategies/benchmark_verified_global.py index c4b95b7..df26f29 100644 --- a/vcache/vcache_policy/strategies/dynamic_global_threshold.py +++ b/vcache/vcache_policy/strategies/benchmark_verified_global.py @@ -20,9 +20,11 @@ from vcache.vcache_policy.vcache_policy import VCachePolicy -class DynamicGlobalThresholdPolicy(VCachePolicy): +class BenchmarkVerifiedGlobalDecisionPolicy(VCachePolicy): """ Policy that uses the vCache algorithm to compute optimal global thresholds across all embeddings. + + IMPORTANT: This policy is used for benchmark purposes and should not be used in production. """ def __init__( diff --git a/vcache/vcache_policy/strategies/dynamic_local_threshold.py b/vcache/vcache_policy/strategies/verified.py similarity index 97% rename from vcache/vcache_policy/strategies/dynamic_local_threshold.py rename to vcache/vcache_policy/strategies/verified.py index 7be033f..92d5b73 100644 --- a/vcache/vcache_policy/strategies/dynamic_local_threshold.py +++ b/vcache/vcache_policy/strategies/verified.py @@ -1,3 +1,4 @@ +import os import queue import random import threading @@ -24,6 +25,10 @@ ) from vcache.vcache_policy.vcache_policy import VCachePolicy +# Disable Hugging Face tokenizer parallelism to prevent deadlocks when using +# vCache in multi-threaded applications. This is a library-level fix. +os.environ["TOKENIZERS_PARALLELISM"] = "true" + class CallbackQueue(queue.Queue): """ @@ -76,7 +81,7 @@ def stop(self): self.worker_thread.join() -class DynamicLocalThresholdPolicy(VCachePolicy): +class VerifiedDecisionPolicy(VCachePolicy): """ Dynamic local threshold policy that computes optimal thresholds for each embedding. """ @@ -99,6 +104,14 @@ def __init__(self, delta: float = 0.01): self.executor: Optional[ThreadPoolExecutor] = None self.callback_queue: Optional[CallbackQueue] = None + def __enter__(self): + """Enter the context manager.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit the context manager and shutdown the policy.""" + self.shutdown() + @override def setup(self, config: VCacheConfig): """ @@ -133,11 +146,12 @@ def shutdown(self): """ Shuts down the thread pool and callback queue gracefully. 
""" - if self.callback_queue: - self.callback_queue.stop() if self.executor: self.executor.shutdown(wait=True) + if self.callback_queue: + self.callback_queue.stop() + @override def process_request( self, prompt: str, system_prompt: Optional[str] From f455e407d9931c131f65726a325bb5dc5d5070bf Mon Sep 17 00:00:00 2001 From: Luis Gaspar Schroeder Date: Sat, 14 Jun 2025 14:28:16 +0200 Subject: [PATCH 8/8] Fixed docstrings --- vcache/vcache_policy/strategies/verified.py | 23 ++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/vcache/vcache_policy/strategies/verified.py b/vcache/vcache_policy/strategies/verified.py index 92d5b73..ffd93e0 100644 --- a/vcache/vcache_policy/strategies/verified.py +++ b/vcache/vcache_policy/strategies/verified.py @@ -105,11 +105,26 @@ def __init__(self, delta: float = 0.01): self.callback_queue: Optional[CallbackQueue] = None def __enter__(self): - """Enter the context manager.""" + """ + Allows the policy to be used as a context manager. + + Returns: + The policy instance itself. + """ return self def __exit__(self, exc_type, exc_val, exc_tb): - """Exit the context manager and shutdown the policy.""" + """ + Ensures graceful shutdown of background threads when exiting the context. + + This method is called automatically when exiting a `with` block, + triggering the shutdown of the ThreadPoolExecutor and CallbackQueue. + + Args: + exc_type: The exception type if an exception was raised in the `with` block. + exc_val: The exception value if an exception was raised. + exc_tb: The traceback if an exception was raised. + """ self.shutdown() @override @@ -162,7 +177,9 @@ def process_request( It determines whether to serve a cached response or generate a new one. If the policy decides to 'explore', it generates a new response and triggers an asynchronous background task to evaluate the decision and - update the cache, without blocking the current request. + update the cache, without blocking the current request. The functions returns + the actual response and some metadata information—whether the response is a + cache hit and the nearest neighbor response—to enable further analysis. Args: prompt: The prompt to check for cache hit.