Add cache messager unit tests #3
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
```python
def _install_dependency_stubs():
    paddle = _ensure_module("paddle")
    paddle.Tensor = _FakeTensor
    paddle.bfloat16 = "bfloat16"

    def _full(shape, fill_value=0, dtype="float32"):
        dtype_str = dtype if isinstance(dtype, str) else str(dtype)
        return _FakeTensor(np.full(shape, fill_value), dtype=dtype_str)

    def _to_tensor(data, dtype="float32", place=None):  # pylint: disable=unused-argument
        dtype_str = dtype if isinstance(dtype, str) else str(dtype)
        return _FakeTensor(np.array(data), dtype=dtype_str)

    paddle.full = _full
    paddle.to_tensor = _to_tensor

    def _set_device(_name):
        return None

    paddle.set_device = _set_device

    device_mod = types.ModuleType("paddle.device")
    device_mod.set_device = lambda _name: None
    cuda_mod = types.ModuleType("paddle.device.cuda")
    cuda_mod.memory_allocated = lambda: 0
    device_mod.cuda = cuda_mod
    paddle.device = device_mod
    sys.modules["paddle.device"] = device_mod
    sys.modules["paddle.device.cuda"] = cuda_mod

    fastdeploy_pkg = _ensure_module("fastdeploy")
    fastdeploy_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy")]

    utils_module = types.ModuleType("fastdeploy.utils")
    envs_module = types.ModuleType("fastdeploy.utils.envs")
    envs_module.FD_ENGINE_TASK_QUEUE_WITH_SHM = False
    envs_module.ENABLE_V1_KVCACHE_SCHEDULER = False

    class _Logger:
        def __init__(self):
            self.messages = {"info": [], "debug": [], "error": []}

        def info(self, msg):
            self.messages["info"].append(msg)

        def debug(self, msg):
            self.messages["debug"].append(msg)

        def error(self, msg):
            self.messages["error"].append(msg)

    def _get_logger(_name, _filename=None):  # pylint: disable=unused-argument
        return _Logger()

    utils_module.envs = envs_module
    utils_module.get_logger = _get_logger
    sys.modules["fastdeploy.utils"] = utils_module
    sys.modules["fastdeploy.utils.envs"] = envs_module
    fastdeploy_pkg.utils = utils_module

    transfer_factory = types.ModuleType("fastdeploy.cache_manager.transfer_factory")
    transfer_factory.IPCCommManager = _IPCCommManager
    transfer_factory.RDMACommManager = _RDMACommManager
    sys.modules["fastdeploy.cache_manager.transfer_factory"] = transfer_factory

    config_module = types.ModuleType("fastdeploy.config")

    class _SpeculativeConfig:
        def __init__(self, config_dict):
            self.num_extra_cache_layer = config_dict.get("num_extra_cache_layer", 0)
            self.num_gpu_block_expand_ratio = config_dict.get("num_gpu_block_expand_ratio", 0)

    config_module.SpeculativeConfig = _SpeculativeConfig
    sys.modules["fastdeploy.config"] = config_module
    fastdeploy_pkg.config = config_module

    inter_comm_module = types.ModuleType("fastdeploy.inter_communicator")
    inter_comm_module.EngineWorkerQueue = _EngineWorkerQueue
    inter_comm_module.IPCSignal = _IPCSignal
    inter_comm_module.shared_memory_exists = lambda _name: False
    sys.modules["fastdeploy.inter_communicator"] = inter_comm_module

    ops_gpu_module = types.ModuleType("fastdeploy.model_executor.ops.gpu")

    def _get_output_kv_signal(buffer, rank_id, flag):  # pylint: disable=unused-argument
        sequence = getattr(_get_output_kv_signal, "sequence", None)
        if not sequence:
            raise SystemExit("kv signal stop")

        step = sequence.pop(0)
        if step.get("stop"):
            raise SystemExit("kv signal stop")

        data = buffer.numpy()
        data.fill(-1)
        tasks = step.get("tasks", -1)
        data[0] = tasks
        if tasks == -1:
            return
        data[1] = step.get("layer", 0)
        data[2] = step.get("engine", 0)
        data[3] = step.get("offset", 0)
        data[4] = step.get("current", 0)

    ops_gpu_module.get_output_kv_signal = _get_output_kv_signal
    ops_gpu_module.set_data_ipc = lambda *args, **kwargs: None
    sys.modules["fastdeploy.model_executor.ops.gpu"] = ops_gpu_module
```
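For illustration, a sketch of how a test could drive the `get_output_kv_signal` stub above. It assumes `_FakeTensor.numpy()` returns the backing array (which the stub's in-place writes rely on) and that `numpy as np` and `pytest` are imported; the test name and values are hypothetical:

```python
def test_kv_signal_steps_then_stops():
    # Each dict is consumed by one call; a {"stop": True} step (or an
    # exhausted list) makes the stub raise SystemExit to end the loop.
    _get_output_kv_signal.sequence = [
        {"tasks": 1, "layer": 2, "engine": 0, "offset": 3, "current": 4},
        {"stop": True},
    ]
    buffer = _FakeTensor(np.zeros(8, dtype=np.int32))
    _get_output_kv_signal(buffer, rank_id=0, flag=0)
    assert buffer.numpy()[0] == 1  # tasks
    assert buffer.numpy()[1] == 2  # layer
    with pytest.raises(SystemExit):
        _get_output_kv_signal(buffer, rank_id=0, flag=0)
```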
Avoid leaking stubbed modules into global test environment
The test helper _install_dependency_stubs replaces paddle, fastdeploy, and several submodules in sys.modules without restoring the originals after the suite finishes. Because this runs at module import time, every subsequent test in the same process will see the fake implementations instead of the real ones, which can cause unrelated tests to fail or pass incorrectly once real dependencies are available. Please scope the stubs to the tests (e.g., with fixtures or context managers) or restore the previous modules in tearDown/addCleanup to prevent cross‑test contamination.
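A minimal sketch of the restore-on-cleanup option, assuming the helper is refactored to run per test class rather than at import time; the module names listed and the test class are illustrative:

```python
import sys
import unittest

# Illustrative subset of the names the helper writes into sys.modules.
STUBBED = ("paddle", "paddle.device", "paddle.device.cuda",
           "fastdeploy.utils", "fastdeploy.utils.envs")

class CacheMessagerStubTest(unittest.TestCase):
    def setUp(self):
        # Snapshot whatever was importable before the stubs go in.
        self._saved = {name: sys.modules.get(name) for name in STUBBED}
        _install_dependency_stubs()
        self.addCleanup(self._restore_modules)

    def _restore_modules(self):
        for name, module in self._saved.items():
            if module is None:
                sys.modules.pop(name, None)  # key did not exist before
            else:
                sys.modules[name] = module   # put the real module back
```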
```python
def _install_dependency_stubs():
    # Stub paddle and paddle.distributed used during module imports.
    paddle = _ensure_module("paddle")
    paddle.__dict__.setdefault("__version__", "0.0.0")
    paddle.Tensor = np.ndarray

    def _split(array, sections, axis=0):
        if isinstance(sections, int):
            return np.array_split(array, sections, axis=axis)
        raise NotImplementedError("sections must be an integer in tests")

    def _concat(arrays, axis=0):
        return np.concatenate(list(arrays), axis=axis)

    def _to_tensor(array, dtype=None):
        return np.asarray(array, dtype=dtype)

    def _get_default_dtype():
        return np.float32

    class _CUDAPinnedPlace:
        def __repr__(self):  # pragma: no cover - representation helper
            return "CUDAPinnedPlace()"

    paddle.split = _split
    paddle.concat = _concat
    paddle.to_tensor = _to_tensor
    paddle.get_default_dtype = _get_default_dtype
    paddle.CUDAPinnedPlace = _CUDAPinnedPlace
    dist = types.ModuleType("paddle.distributed")
    dist.get_world_size = lambda: 1
    dist.get_rank = lambda: 0
    dist.is_initialized = lambda: False
    sys.modules["paddle.distributed"] = dist
    paddle.distributed = dist

    # Stub paddleformers pieces referenced by tp_utils.
    paddleformers = _ensure_module("paddleformers")
    paddleformers.__path__ = []

    transformers = types.ModuleType("paddleformers.transformers")

    class _PretrainedModel:
        @classmethod
        def _get_tensor_parallel_mappings(cls, *_args, **_kwargs):
            return {}

        @classmethod
        def _resolve_prefix_keys(cls, keys, _safetensor_keys):
            return {k: k for k in keys}

    transformers.PretrainedModel = _PretrainedModel
    sys.modules["paddleformers.transformers"] = transformers
    paddleformers.transformers = transformers

    conversion_utils = types.ModuleType("paddleformers.transformers.conversion_utils")

    def _split_or_merge_func(is_split, tensor_parallel_degree, tensor_parallel_rank, **_kwargs):
        axis = -1

        def _fn(weight, *, is_column=True, is_naive_2fuse=False):  # pylint: disable=unused-argument
            current_axis = axis if is_column else 0
            if is_split:
                chunks = np.array_split(weight, tensor_parallel_degree, axis=current_axis)
                if tensor_parallel_rank is None:
                    return chunks
                return chunks[tensor_parallel_rank]
            return np.concatenate(weight, axis=current_axis)

        return _fn

    conversion_utils.split_or_merge_func = _split_or_merge_func
    sys.modules["paddleformers.transformers.conversion_utils"] = conversion_utils

    utils_pkg = types.ModuleType("paddleformers.utils")
    utils_pkg.__path__ = []
    sys.modules["paddleformers.utils"] = utils_pkg

    log_module = types.ModuleType("paddleformers.utils.log")
    log_module.logger = _DummyLogger()
    sys.modules["paddleformers.utils.log"] = log_module
    utils_pkg.log = log_module
```
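For illustration, how the stubbed `split_or_merge_func` behaves under the definitions above (values are hypothetical; `np` is `numpy`):

```python
weight = np.arange(32, dtype=np.float32).reshape(4, 8)

# Split column-wise (axis -1) across two ranks; rank 0 keeps columns 0..3.
split_fn = _split_or_merge_func(is_split=True, tensor_parallel_degree=2, tensor_parallel_rank=0)
assert split_fn(weight, is_column=True).shape == (4, 4)

# With is_split=False the inner function concatenates shards back together.
merge_fn = _split_or_merge_func(is_split=False, tensor_parallel_degree=2, tensor_parallel_rank=None)
assert np.array_equal(merge_fn([weight[:, :4], weight[:, 4:]], is_column=True), weight)
```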
Module stubs persist beyond tensor-parallel tests
Similar to the cache messager tests, _install_dependency_stubs here injects stubbed versions of paddle, paddleformers, and fastdeploy into sys.modules as a global side effect and never cleans them up. When the wider test suite runs in the same Python process, later tests will interact with these stubs instead of the actual libraries, leading to incorrect failures or silently masking regressions. Consider using fixtures to temporarily patch sys.modules and restore originals after each test module completes.
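A sketch of the fixture approach, using pytest's `monkeypatch.setitem` on `sys.modules` so each entry is restored automatically at teardown; the stub contents shown are a hypothetical subset of what `_install_dependency_stubs` builds:

```python
import sys
import types

import pytest

@pytest.fixture
def stubbed_paddle(monkeypatch):
    # monkeypatch.setitem records the old value (or its absence) and
    # undoes the edit after the test, so nothing leaks into later tests.
    paddle = types.ModuleType("paddle")
    paddle.get_default_dtype = lambda: "float32"
    dist = types.ModuleType("paddle.distributed")
    dist.get_world_size = lambda: 1
    dist.get_rank = lambda: 0
    paddle.distributed = dist
    monkeypatch.setitem(sys.modules, "paddle", paddle)
    monkeypatch.setitem(sys.modules, "paddle.distributed", dist)
    return paddle

def test_world_size_is_stubbed(stubbed_paddle):
    import paddle  # resolves to the stub for the duration of this test
    assert paddle.distributed.get_world_size() == 1
```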