From bc3c351d4d5eec7d8eb86e1fe6b42c95df5b3848 Mon Sep 17 00:00:00 2001 From: xunyoyo <33387866+xunyoyo@users.noreply.github.com> Date: Thu, 13 Nov 2025 01:52:55 +0800 Subject: [PATCH 1/2] Add cache messager unit tests --- tests/cache_manager/test_cache_messager.py | 851 +++++++++++++++++++++ tests/model_executor/test_tp_utils.py | 488 ++++++++++++ 2 files changed, 1339 insertions(+) create mode 100644 tests/cache_manager/test_cache_messager.py create mode 100644 tests/model_executor/test_tp_utils.py diff --git a/tests/cache_manager/test_cache_messager.py b/tests/cache_manager/test_cache_messager.py new file mode 100644 index 00000000000..f6cafdba71a --- /dev/null +++ b/tests/cache_manager/test_cache_messager.py @@ -0,0 +1,851 @@ +"""Unit tests for the cache messager helpers.""" + +from __future__ import annotations + +import importlib.util +import math +import sys +import types +import unittest +from pathlib import Path +from unittest import mock + +import numpy as np + + +PROJECT_ROOT = Path(__file__).resolve().parents[2] + + +def _ensure_module(name: str) -> types.ModuleType: + module = sys.modules.get(name) + if module is None: + module = types.ModuleType(name) + sys.modules[name] = module + return module + + +class _FakePlace: + def __init__(self, device: str): + self._device = device + + def __str__(self): # pragma: no cover - representation helper + return f"Place({self._device})" + + +class _FakeTensor: + def __init__(self, array, dtype="float32", device="gpu:0"): + self._array = np.array(array) + self.shape = tuple(self._array.shape) + self.dtype = dtype + self.place = _FakePlace(device) + + def data_ptr(self): + return int(self._array.__array_interface__["data"][0]) + + def numel(self): + return int(self._array.size) + + def numpy(self): + return self._array + + def tolist(self): # pragma: no cover - convenience helper + return self.numpy().tolist() + + def __len__(self): + return len(self._array) + + def __iter__(self): # pragma: no cover - container helper + return iter(self._array) + + def __getitem__(self, idx): + value = self._array[idx] + if isinstance(value, np.ndarray): + return _FakeTensor(value, dtype=self.dtype) + return _FakeScalar(value) + + def __setitem__(self, idx, value): + self._array[idx] = value + + +class _FakeScalar: + def __init__(self, value): + self._value = value.item() if hasattr(value, "item") else value + + def numpy(self): + return np.array(self._value) + + def tolist(self): # pragma: no cover - compatibility helper + return self.numpy().tolist() + + def __int__(self): + return int(self._value) + + def __index__(self): # pragma: no cover - required for range() + return int(self._value) + + def __eq__(self, other): # pragma: no cover - comparison helper + return int(self._value) == other + + +class ParseArgsTest(unittest.TestCase): + def test_parse_args_reads_cli_values(self): + module = _load_cache_messager() + argv = [ + "prog", + "--splitwise_role", + "decode", + "--rank", + "3", + "--device_id", + "5", + "--num_layers", + "4", + "--key_cache_shape", + "2,3,4,5", + "--value_cache_shape", + "2,3,4,5", + "--rdma_port", + "1234", + "--mp_num", + "2", + "--engine_pid", + "abc", + "--protocol", + "ipc,rdma", + "--pod_ip", + "1.2.3.4", + "--cache_queue_port", + "9100", + "--engine_worker_queue_port", + "9101", + "--cache_dtype", + "uint8", + "--speculative_config", + "{\"num_extra_cache_layer\":1}", + "--local_data_parallel_id", + "7", + ] + with mock.patch.object(sys, "argv", argv): + args = module.parse_args() + + self.assertEqual(args.splitwise_role, 
"decode") + self.assertEqual(args.rank, 3) + self.assertEqual(args.device_id, 5) + self.assertEqual(args.num_layers, 4) + self.assertEqual(args.protocol, "ipc,rdma") + self.assertEqual(args.cache_dtype, "uint8") + self.assertEqual(args.local_data_parallel_id, 7) + self.assertEqual(args.speculative_config["num_extra_cache_layer"], 1) + + +class _Barrier: + def __init__(self): + self.wait_calls = 0 + + def wait(self): + self.wait_calls += 1 + + +class _IPCCommManager: + def __init__(self, rank, gpu_id, cache_k, cache_v): # pylint: disable=unused-argument + self.rank = rank + self.gpu_id = gpu_id + self.cache_k = cache_k + self.cache_v = cache_v + self.write_calls = [] + self.sync_targets = [] + + def write_cache(self, target_ip, target_id, src_block_ids, dest_block_ids, layer_idx): + self.write_calls.append( + (target_ip, target_id, tuple(src_block_ids), tuple(dest_block_ids), layer_idx) + ) + return 0 + + def write_block_by_sync(self, target_id): + self.sync_targets.append(target_id) + + +class _RDMACommManager: + def __init__( + self, + splitwise_role, + rank, + gpu_id, + cache_k_ptr_list, + cache_v_ptr_list, + max_block_num, + block_bytes, + rdma_port, + ): # pylint: disable=unused-argument + self.rank = rank + self.calls = [] + self.connect_results = [] + + def connect(self, target_ip, target_id): + result = True if not self.connect_results else self.connect_results.pop(0) + self.calls.append((target_ip, target_id, result)) + return result + + def write_cache(self, *args, **kwargs): # pragma: no cover - compatibility helper + return 0 + + +class _IPCSignal: + instances: dict[str, "_IPCSignal"] = {} + + def __init__(self, name, array, dtype=None, suffix=None, create=False): # noqa: D401 + # pylint: disable=unused-argument + self.name = name + self.value = np.array(array) + _IPCSignal.instances[name if suffix is None else f"{name}_{suffix}"] = self + + +class _EngineWorkerQueue: + def __init__( + self, + address, + is_server, + num_client, + client_id, + local_data_parallel_id, + ): + self.address = address + self.is_server = is_server + self.num_client = num_client + self.client_id = client_id + self.local_data_parallel_id = local_data_parallel_id + self.cache_info_barrier = _Barrier() + self.finish_send_cache_barrier = _Barrier() + self.finish_add_cache_task_barrier = _Barrier() + self.begin_send_cache_barrier = _Barrier() + self.connect_task_barrier = _Barrier() + self.connect_task_response_barrier = _Barrier() + self.cache_info_sequence = [] + self.cache_info_calls = 0 + self.stop_after_cache_info = False + self.signal_initializer = None + self.connect_tasks = [] + self.connect_task_calls = 0 + self.stop_after_connect_tasks = False + self.finished_requests = [] + self.connect_responses = [] + self.finished_add_cache_task_req = [] + + def get_cache_info(self): + if self.cache_info_calls == 0 and self.signal_initializer: + self.signal_initializer() + if self.cache_info_calls < len(self.cache_info_sequence): + info = self.cache_info_sequence[self.cache_info_calls] + self.cache_info_calls += 1 + return info + if self.stop_after_cache_info: + raise SystemExit("stop cache info") + return [] + + def put_finished_req(self, request_payload): + self.finished_requests.append(request_payload) + + def put_finished_add_cache_task_req(self, req_ids): + self.finished_add_cache_task_req.append(req_ids) + + def get_connect_rdma_task(self): + if self.connect_task_calls < len(self.connect_tasks): + task = self.connect_tasks[self.connect_task_calls] + self.connect_task_calls += 1 + return task, None + if 
self.stop_after_connect_tasks: + raise SystemExit("stop connect task") + return None, None + + def put_connect_rdma_task_response(self, response): + self.connect_responses.append(response) + + +def _install_dependency_stubs(): + paddle = _ensure_module("paddle") + paddle.Tensor = _FakeTensor + paddle.bfloat16 = "bfloat16" + + def _full(shape, fill_value=0, dtype="float32"): + dtype_str = dtype if isinstance(dtype, str) else str(dtype) + return _FakeTensor(np.full(shape, fill_value), dtype=dtype_str) + + def _to_tensor(data, dtype="float32", place=None): # pylint: disable=unused-argument + dtype_str = dtype if isinstance(dtype, str) else str(dtype) + return _FakeTensor(np.array(data), dtype=dtype_str) + + paddle.full = _full + paddle.to_tensor = _to_tensor + + def _set_device(_name): + return None + + paddle.set_device = _set_device + + device_mod = types.ModuleType("paddle.device") + device_mod.set_device = lambda _name: None + cuda_mod = types.ModuleType("paddle.device.cuda") + cuda_mod.memory_allocated = lambda: 0 + device_mod.cuda = cuda_mod + paddle.device = device_mod + sys.modules["paddle.device"] = device_mod + sys.modules["paddle.device.cuda"] = cuda_mod + + fastdeploy_pkg = _ensure_module("fastdeploy") + fastdeploy_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy")] + + utils_module = types.ModuleType("fastdeploy.utils") + envs_module = types.ModuleType("fastdeploy.utils.envs") + envs_module.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + envs_module.ENABLE_V1_KVCACHE_SCHEDULER = False + + class _Logger: + def __init__(self): + self.messages = {"info": [], "debug": [], "error": []} + + def info(self, msg): + self.messages["info"].append(msg) + + def debug(self, msg): + self.messages["debug"].append(msg) + + def error(self, msg): + self.messages["error"].append(msg) + + def _get_logger(_name, _filename=None): # pylint: disable=unused-argument + return _Logger() + + utils_module.envs = envs_module + utils_module.get_logger = _get_logger + sys.modules["fastdeploy.utils"] = utils_module + sys.modules["fastdeploy.utils.envs"] = envs_module + fastdeploy_pkg.utils = utils_module + + transfer_factory = types.ModuleType("fastdeploy.cache_manager.transfer_factory") + transfer_factory.IPCCommManager = _IPCCommManager + transfer_factory.RDMACommManager = _RDMACommManager + sys.modules["fastdeploy.cache_manager.transfer_factory"] = transfer_factory + + config_module = types.ModuleType("fastdeploy.config") + + class _SpeculativeConfig: + def __init__(self, config_dict): + self.num_extra_cache_layer = config_dict.get("num_extra_cache_layer", 0) + self.num_gpu_block_expand_ratio = config_dict.get("num_gpu_block_expand_ratio", 0) + + config_module.SpeculativeConfig = _SpeculativeConfig + sys.modules["fastdeploy.config"] = config_module + fastdeploy_pkg.config = config_module + + inter_comm_module = types.ModuleType("fastdeploy.inter_communicator") + inter_comm_module.EngineWorkerQueue = _EngineWorkerQueue + inter_comm_module.IPCSignal = _IPCSignal + inter_comm_module.shared_memory_exists = lambda _name: False + sys.modules["fastdeploy.inter_communicator"] = inter_comm_module + + ops_gpu_module = types.ModuleType("fastdeploy.model_executor.ops.gpu") + + def _get_output_kv_signal(buffer, rank_id, flag): # pylint: disable=unused-argument + sequence = getattr(_get_output_kv_signal, "sequence", None) + if not sequence: + raise SystemExit("kv signal stop") + + step = sequence.pop(0) + if step.get("stop"): + raise SystemExit("kv signal stop") + + data = buffer.numpy() + data.fill(-1) + tasks = step.get("tasks", 
-1) + data[0] = tasks + if tasks == -1: + return + data[1] = step.get("layer", 0) + data[2] = step.get("engine", 0) + data[3] = step.get("offset", 0) + data[4] = step.get("current", 0) + + ops_gpu_module.get_output_kv_signal = _get_output_kv_signal + ops_gpu_module.set_data_ipc = lambda *args, **kwargs: None + sys.modules["fastdeploy.model_executor.ops.gpu"] = ops_gpu_module + + +def _load_cache_messager(): + module_name = "fastdeploy.cache_manager.cache_messager" + if module_name in sys.modules: + return sys.modules[module_name] + + _install_dependency_stubs() + + spec = importlib.util.spec_from_file_location( + module_name, PROJECT_ROOT / "fastdeploy" / "cache_manager" / "cache_messager.py" + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + sys.modules[module_name] = module + return module + + +def _make_cache_tensors(num_layers, dtype="bfloat16"): + cache = {} + for layer in range(num_layers): + cache[f"key_caches_{layer}_rank0_device0"] = _FakeTensor( + np.zeros((2, 3, 4, 5)), dtype=dtype + ) + cache[f"value_caches_{layer}_rank0_device0"] = _FakeTensor( + np.zeros((2, 3, 4, 5)), dtype=dtype + ) + return cache + + +class CacheMessagerInitTest(unittest.TestCase): + def setUp(self): + self.module = _load_cache_messager() + envs = sys.modules["fastdeploy.utils.envs"] + envs.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + _IPCSignal.instances.clear() + ops_gpu = sys.modules["fastdeploy.model_executor.ops.gpu"] + ops_gpu.get_output_kv_signal.sequence = [] + + def test_initializes_with_ipc_and_rdma(self): + cache = _make_cache_tensors(num_layers=2) + messager = self.module.CacheMessager( + splitwise_role="mixed", + transfer_protocol="ipc,rdma", + pod_ip="127.0.0.1", + engine_worker_queue_port=9000, + local_data_parallel_id=0, + gpu_cache_kvs=cache, + rank=0, + nranks=1, + num_layers=2, + gpu_id=0, + rdma_port=55, + ) + + self.assertIsInstance(messager.engine_worker_queue, _EngineWorkerQueue) + self.assertEqual(messager.engine_worker_queue.address, ("127.0.0.1", 9000)) + self.assertIn("ipc", messager.messager) + self.assertIn("rdma", messager.messager) + expected_block_bytes = math.prod(cache["key_caches_0_rank0_device0"].shape[1:]) * 2 + self.assertEqual(messager.block_bytes, expected_block_bytes) + + def test_shm_socket_address_and_uint8_dtype(self): + envs = sys.modules["fastdeploy.utils.envs"] + envs.FD_ENGINE_TASK_QUEUE_WITH_SHM = True + cache = _make_cache_tensors(num_layers=1, dtype="uint8") + messager = self.module.CacheMessager( + splitwise_role="mixed", + transfer_protocol="ipc", + pod_ip="127.0.0.1", + engine_worker_queue_port=9010, + local_data_parallel_id=0, + gpu_cache_kvs=cache, + rank=0, + nranks=1, + num_layers=1, + gpu_id=0, + ) + + self.assertTrue(str(messager.engine_worker_queue.address).startswith("/dev/shm/fd_task_queue_")) + expected_block_bytes = math.prod(cache["key_caches_0_rank0_device0"].shape[1:]) + self.assertEqual(messager.block_bytes, expected_block_bytes) + + +class PrefillThreadTest(unittest.TestCase): + def setUp(self): + self.module = _load_cache_messager() + envs = sys.modules["fastdeploy.utils.envs"] + envs.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + _IPCSignal.instances.clear() + ops_gpu = sys.modules["fastdeploy.model_executor.ops.gpu"] + ops_gpu.get_output_kv_signal.sequence = [] + + def test_prefill_thread_transfers_and_marks_finished(self): + cache = _make_cache_tensors(num_layers=1) + messager = self.module.CacheMessager( + splitwise_role="mixed", + transfer_protocol="ipc", + pod_ip="127.0.0.1", + 
engine_worker_queue_port=9001, + local_data_parallel_id=0, + gpu_cache_kvs=cache, + rank=0, + nranks=1, + num_layers=1, + gpu_id=0, + ) + + queue = messager.engine_worker_queue + queue.cache_info_sequence = [ + [ + { + "request_id": "req-1", + "transfer_protocol": "ipc", + "src_block_ids": [0, 1], + "dest_block_ids": [2, 3], + "current_id": 0, + "status": "init", + "layer_idx": 0, + "device_ids": {0: 0}, + } + ] + ] + queue.stop_after_cache_info = True + + def _set_signals(instance): + step_key = f"splitwise_complete_prefilled_step_{instance.rank_id}_{instance.gpu_id}" + layer_key = f"splitwise_complete_prefilled_layer_{instance.rank_id}_{instance.gpu_id}" + _IPCSignal.instances[step_key].value[0] = 0 + _IPCSignal.instances[layer_key].value[0] = 0 + + queue.signal_initializer = lambda: _set_signals(messager) + + with self.assertRaises(SystemExit): + messager.prefill_layerwise_send_cache_thread() + + self.assertEqual(queue.finish_send_cache_barrier.wait_calls, 1) + self.assertEqual(queue.finished_requests, [[["req-1", "finished"]]]) + self.assertEqual( + messager.messager["ipc"].write_calls, + [("0.0.0.0", 0, (0, 1), (2, 3), 0)], + ) + + +class HandleConnectTaskTest(unittest.TestCase): + def setUp(self): + self.module = _load_cache_messager() + envs = sys.modules["fastdeploy.utils.envs"] + envs.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + _IPCSignal.instances.clear() + ops_gpu = sys.modules["fastdeploy.model_executor.ops.gpu"] + ops_gpu.get_output_kv_signal.sequence = [] + + def test_handle_connect_task_success_and_failure(self): + cache = _make_cache_tensors(num_layers=1) + messager = self.module.CacheMessager( + splitwise_role="decode", + transfer_protocol="rdma", + pod_ip="127.0.0.1", + engine_worker_queue_port=9002, + local_data_parallel_id=0, + gpu_cache_kvs=cache, + rank=0, + nranks=1, + num_layers=1, + gpu_id=0, + rdma_port=88, + ) + + rdma_manager = messager.messager["rdma"] + rdma_manager.connect_results = [True, False] + + queue = messager.engine_worker_queue + queue.connect_tasks = [ + { + "task_id": 1, + "ip": "10.0.0.1", + "rdma_ports": {0: 7}, + }, + { + "task_id": 2, + "ip": "10.0.0.2", + "rdma_ports": {0: 9}, + }, + ] + queue.stop_after_connect_tasks = True + + with self.assertRaises(SystemExit): + messager._handle_connect_task() + + self.assertEqual( + queue.connect_responses, + [ + {"task_id": 1, "success": True}, + {"task_id": 2, "success": False}, + ], + ) + + +class CacheMessagerV1Test(unittest.TestCase): + def setUp(self): + self.module = _load_cache_messager() + envs = sys.modules["fastdeploy.utils.envs"] + envs.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + _IPCSignal.instances.clear() + ops_gpu = sys.modules["fastdeploy.model_executor.ops.gpu"] + ops_gpu.get_output_kv_signal.sequence = [] + + def test_consume_signals_populates_queue(self): + cache = _make_cache_tensors(num_layers=1) + envs = sys.modules["fastdeploy.utils.envs"] + envs.ENABLE_V1_KVCACHE_SCHEDULER = True + + with mock.patch("threading.Thread") as thread_cls: + def _fake_thread(*_args, **_kwargs): + return types.SimpleNamespace(start=lambda: None) + + thread_cls.side_effect = _fake_thread + messager = self.module.CacheMessagerV1( + splitwise_role="prefill", + transfer_protocol="ipc", + pod_ip="127.0.0.1", + engine_worker_queue_port=9003, + local_data_parallel_id=0, + gpu_cache_kvs=cache, + rank=0, + nranks=1, + num_layers=1, + gpu_id=0, + ) + + ops_gpu = sys.modules["fastdeploy.model_executor.ops.gpu"] + ops_gpu.get_output_kv_signal.sequence = [ + {"tasks": -1}, + {"tasks": 1, "layer": 0, "engine": 0, "offset": 0, 
"current": 4}, + {"stop": True}, + ] + messager.cache_info = {"req": {"status": "init"}} + + with self.assertRaises(SystemExit): + messager.consume_signals() + + queued = messager.cache_prefilled_engine_ids_queue.get_nowait() + self.assertEqual(queued, [(0, 4)]) + + def test_add_cache_task_thread_updates_state(self): + cache = _make_cache_tensors(num_layers=1) + envs = sys.modules["fastdeploy.utils.envs"] + envs.ENABLE_V1_KVCACHE_SCHEDULER = True + + with mock.patch("threading.Thread") as thread_cls: + def _fake_thread(*_args, **_kwargs): + return types.SimpleNamespace(start=lambda: None) + + thread_cls.side_effect = _fake_thread + messager = self.module.CacheMessagerV1( + splitwise_role="prefill", + transfer_protocol="ipc", + pod_ip="127.0.0.1", + engine_worker_queue_port=9006, + local_data_parallel_id=0, + gpu_cache_kvs=cache, + rank=0, + nranks=1, + num_layers=1, + gpu_id=0, + ) + + messager.cache_info = { + "req-existing": { + "request_id": "req-existing", + "src_block_ids": [0, 1, 2, 3], + "dest_block_ids": [0, 1], + "current_id": 5, + "transfer_protocol": "ipc", + "status": "pending", + "rdma_ports": {0: 0}, + } + } + + queue = messager.engine_worker_queue + queue.cache_info_sequence = [ + [ + { + "request_id": "req-existing", + "src_block_ids": [0, 1, 2, 3], + "dest_block_ids": [0, 1], + "current_id": 5, + "transfer_protocol": "ipc", + }, + { + "request_id": "req-new", + "src_block_ids": [10, 11], + "dest_block_ids": [12, 13], + "current_id": 7, + "transfer_protocol": "rdma", + "status": "pending", + "ip": "10.0.0.5", + "rdma_ports": {0: 4}, + "device_ids": {0: 1}, + }, + ] + ] + queue.stop_after_cache_info = True + + with self.assertRaises(SystemExit): + messager._add_cache_task_thread() + + self.assertEqual(queue.cache_info_barrier.wait_calls, 1) + self.assertEqual(queue.finish_add_cache_task_barrier.wait_calls, 1) + self.assertEqual(queue.finished_add_cache_task_req, [["req-existing"]]) + updated = messager.cache_info["req-existing"] + self.assertEqual(updated["decode_cached_tokens"], 2 * messager.block_size) + self.assertEqual(updated["sended_block_num"], 2) + self.assertIn(5, messager.idx_cache_task_dict) + self.assertIn("req-new", messager.cache_info) + + def test_prefill_layerwise_send_cache_thread_finishes_request(self): + cache = _make_cache_tensors(num_layers=1) + envs = sys.modules["fastdeploy.utils.envs"] + envs.ENABLE_V1_KVCACHE_SCHEDULER = True + + with mock.patch("threading.Thread") as thread_cls: + def _fake_thread(*_args, **_kwargs): + return types.SimpleNamespace(start=lambda: None) + + thread_cls.side_effect = _fake_thread + messager = self.module.CacheMessagerV1( + splitwise_role="prefill", + transfer_protocol="ipc", + pod_ip="127.0.0.1", + engine_worker_queue_port=9007, + local_data_parallel_id=0, + gpu_cache_kvs=cache, + rank=0, + nranks=1, + num_layers=1, + gpu_id=0, + ) + + class _QueueStub: + def __init__(self, payloads): + self._payloads = list(payloads) + + def get(self): + if not self._payloads: + raise SystemExit("stop prefill v1") + return self._payloads.pop(0) + + task = { + "request_id": "req-1", + "transfer_protocol": "ipc", + "device_ids": {0: 0}, + "rdma_ports": {0: 0}, + "src_block_ids": [0, 1], + "dest_block_ids": [2, 3], + "status": "init", + "sended_layer_id": -1, + "sended_block_num": 0, + "current_id": 0, + "need_prefill_tokens": 4, + } + + messager.idx_cache_task_dict = {0: task} + messager.cache_info = {"req-1": task} + messager.engine_cache_tasks[0] = {"prefilled_layer_idx": 0, "prefilled_token_num": 4} + 
messager.cache_prefilled_engine_ids_queue = _QueueStub([[(0, 4)]]) + + with self.assertRaises(SystemExit): + messager.prefill_layerwise_send_cache_thread() + + queue = messager.engine_worker_queue + self.assertEqual(queue.begin_send_cache_barrier.wait_calls, 1) + self.assertEqual(queue.finish_send_cache_barrier.wait_calls, 1) + self.assertEqual(queue.finished_requests, [[["req-1", "finished"]]]) + self.assertEqual(messager.messager["ipc"].sync_targets, [0]) + self.assertNotIn("req-1", messager.cache_info) + + +class CacheMessagerV1ConnectTest(unittest.TestCase): + def setUp(self): + self.module = _load_cache_messager() + envs = sys.modules["fastdeploy.utils.envs"] + envs.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + _IPCSignal.instances.clear() + ops_gpu = sys.modules["fastdeploy.model_executor.ops.gpu"] + ops_gpu.get_output_kv_signal.sequence = [] + + def test_handle_connect_task_rdma_paths(self): + cache = _make_cache_tensors(num_layers=1) + with mock.patch("threading.Thread") as thread_cls: + def _fake_thread(*_args, **_kwargs): + return types.SimpleNamespace(start=lambda: None) + + thread_cls.side_effect = _fake_thread + messager = self.module.CacheMessagerV1( + splitwise_role="decode", + transfer_protocol="ipc,rdma", + pod_ip="127.0.0.1", + engine_worker_queue_port=9008, + local_data_parallel_id=0, + gpu_cache_kvs=cache, + rank=0, + nranks=1, + num_layers=1, + gpu_id=0, + ) + + rdma_manager = messager.messager["rdma"] + rdma_manager.connect_results = [True, False] + + queue = messager.engine_worker_queue + queue.connect_tasks = [ + { + "task_id": 11, + "ip": "10.0.0.1", + "rdma_ports": {0: 5}, + }, + { + "task_id": 12, + "ip": "10.0.0.2", + "rdma_ports": {0: 6}, + }, + ] + queue.stop_after_connect_tasks = True + + with self.assertRaises(SystemExit): + messager._handle_connect_task() + + self.assertEqual( + queue.connect_responses, + [ + {"task_id": 11, "success": True}, + {"task_id": 12, "success": False}, + ], + ) + +class MainEntryTest(unittest.TestCase): + def setUp(self): + self.module = _load_cache_messager() + envs = sys.modules["fastdeploy.utils.envs"] + envs.FD_ENGINE_TASK_QUEUE_WITH_SHM = False + envs.ENABLE_V1_KVCACHE_SCHEDULER = False + _IPCSignal.instances.clear() + ops_gpu = sys.modules["fastdeploy.model_executor.ops.gpu"] + ops_gpu.get_output_kv_signal.sequence = [] + + def test_main_initializes_and_triggers_prefill(self): + args = types.SimpleNamespace( + splitwise_role="prefill", + device_id=0, + rank=0, + num_layers=1, + key_cache_shape="2,3,4,5", + value_cache_shape="2,3,4,5", + rdma_port=None, + mp_num=1, + pod_ip="127.0.0.1", + cache_queue_port=9004, + engine_worker_queue_port=9005, + cache_dtype="bfloat16", + speculative_config={"num_extra_cache_layer": 1, "num_gpu_block_expand_ratio": 0}, + protocol="ipc", + engine_pid="42", + local_data_parallel_id=0, + ) + self.module.args = args + + with mock.patch.object( + self.module.CacheMessager, + "prefill_layerwise_send_cache_thread", + side_effect=SystemExit("stop prefill"), + ) as prefill_mock: + with self.assertRaises(SystemExit): + self.module.main() + + prefill_mock.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/model_executor/test_tp_utils.py b/tests/model_executor/test_tp_utils.py new file mode 100644 index 00000000000..a956d7bdd5e --- /dev/null +++ b/tests/model_executor/test_tp_utils.py @@ -0,0 +1,488 @@ +"""Unit tests for tensor parallel utility helpers.""" + +from __future__ import annotations + +import importlib.util +import sys +import types +import unittest +from 
functools import partial +from pathlib import Path + +import numpy as np + + +PROJECT_ROOT = Path(__file__).resolve().parents[2] + + +class _DummyLogger: + def __init__(self): + self.errors = [] + + def error(self, message): + self.errors.append(message) + + def clear(self): + self.errors.clear() + + +def _ensure_module(name: str) -> types.ModuleType: + module = sys.modules.get(name) + if module is None: + module = types.ModuleType(name) + sys.modules[name] = module + return module + + +def _install_dependency_stubs(): + # Stub paddle and paddle.distributed used during module imports. + paddle = _ensure_module("paddle") + paddle.__dict__.setdefault("__version__", "0.0.0") + paddle.Tensor = np.ndarray + + def _split(array, sections, axis=0): + if isinstance(sections, int): + return np.array_split(array, sections, axis=axis) + raise NotImplementedError("sections must be an integer in tests") + + def _concat(arrays, axis=0): + return np.concatenate(list(arrays), axis=axis) + + def _to_tensor(array, dtype=None): + return np.asarray(array, dtype=dtype) + + def _get_default_dtype(): + return np.float32 + + class _CUDAPinnedPlace: + def __repr__(self): # pragma: no cover - representation helper + return "CUDAPinnedPlace()" + + paddle.split = _split + paddle.concat = _concat + paddle.to_tensor = _to_tensor + paddle.get_default_dtype = _get_default_dtype + paddle.CUDAPinnedPlace = _CUDAPinnedPlace + dist = types.ModuleType("paddle.distributed") + dist.get_world_size = lambda: 1 + dist.get_rank = lambda: 0 + dist.is_initialized = lambda: False + sys.modules["paddle.distributed"] = dist + paddle.distributed = dist + + # Stub paddleformers pieces referenced by tp_utils. + paddleformers = _ensure_module("paddleformers") + paddleformers.__path__ = [] + + transformers = types.ModuleType("paddleformers.transformers") + + class _PretrainedModel: + @classmethod + def _get_tensor_parallel_mappings(cls, *_args, **_kwargs): + return {} + + @classmethod + def _resolve_prefix_keys(cls, keys, _safetensor_keys): + return {k: k for k in keys} + + transformers.PretrainedModel = _PretrainedModel + sys.modules["paddleformers.transformers"] = transformers + paddleformers.transformers = transformers + + conversion_utils = types.ModuleType("paddleformers.transformers.conversion_utils") + + def _split_or_merge_func(is_split, tensor_parallel_degree, tensor_parallel_rank, **_kwargs): + axis = -1 + + def _fn(weight, *, is_column=True, is_naive_2fuse=False): # pylint: disable=unused-argument + current_axis = axis if is_column else 0 + if is_split: + chunks = np.array_split(weight, tensor_parallel_degree, axis=current_axis) + if tensor_parallel_rank is None: + return chunks + return chunks[tensor_parallel_rank] + return np.concatenate(weight, axis=current_axis) + + return _fn + + conversion_utils.split_or_merge_func = _split_or_merge_func + sys.modules["paddleformers.transformers.conversion_utils"] = conversion_utils + + utils_pkg = types.ModuleType("paddleformers.utils") + utils_pkg.__path__ = [] + sys.modules["paddleformers.utils"] = utils_pkg + + log_module = types.ModuleType("paddleformers.utils.log") + log_module.logger = _DummyLogger() + sys.modules["paddleformers.utils.log"] = log_module + utils_pkg.log = log_module + + # Provide a lightweight FDConfig replacement consumed by tp_utils. 
+ fastdeploy_pkg = _ensure_module("fastdeploy") + fastdeploy_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy")] + + fd_config_module = types.ModuleType("fastdeploy.config") + + class _ParallelConfig: + def __init__(self, tensor_parallel_size): + self.tensor_parallel_size = tensor_parallel_size + + class _ModelConfig: + def __init__(self, pretrained_config): + self.pretrained_config = pretrained_config + + class FDConfig: + def __init__(self, tensor_parallel_size=1, pretrained_config=None): + self.parallel_config = _ParallelConfig(tensor_parallel_size) + self.model_config = _ModelConfig(pretrained_config) + + fd_config_module.FDConfig = FDConfig + sys.modules["fastdeploy.config"] = fd_config_module + fastdeploy_pkg.config = fd_config_module + + model_executor_pkg = _ensure_module("fastdeploy.model_executor") + model_executor_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy" / "model_executor")] + models_pkg = _ensure_module("fastdeploy.model_executor.models") + models_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy" / "model_executor" / "models")] + + # Load the real utils module so enums are shared with production code. + utils_name = "fastdeploy.model_executor.models.utils" + if utils_name not in sys.modules: + utils_spec = importlib.util.spec_from_file_location( + utils_name, PROJECT_ROOT / "fastdeploy" / "model_executor" / "models" / "utils.py" + ) + utils_module = importlib.util.module_from_spec(utils_spec) + utils_spec.loader.exec_module(utils_module) + sys.modules[utils_name] = utils_module + models_pkg.utils = utils_module + + +def _load_tp_utils(): + module_name = "fastdeploy.model_executor.models.tp_utils" + if module_name in sys.modules: + return sys.modules[module_name] + + _install_dependency_stubs() + + spec = importlib.util.spec_from_file_location( + module_name, PROJECT_ROOT / "fastdeploy" / "model_executor" / "models" / "tp_utils.py" + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + sys.modules[module_name] = module + + parent = sys.modules["fastdeploy.model_executor.models"] + parent.tp_utils = module + return module + + +_tp_utils = _load_tp_utils() +_logger = sys.modules["paddleformers.utils.log"].logger + + +class CheckTensorParallelPrerequisitesTest(unittest.TestCase): + def setUp(self): + _logger.clear() + + def test_tensor_parallel_disabled_noop(self): + cfg = sys.modules["fastdeploy.config"].FDConfig(tensor_parallel_size=1, pretrained_config={}) + filtered = {} + + _tp_utils.check_tensor_parallel_prerequisites( + cfg, _tp_utils.PretrainedModel, filtered, safetensor_keys=[] + ) + + self.assertEqual(filtered, {}) + self.assertEqual(_logger.errors, []) + + def test_tensor_parallel_mappings_populated(self): + calls = {"is_split": [], "keys": None, "safetensor": None} + + class _PopulatedModel(_tp_utils.PretrainedModel): + @classmethod + def _get_tensor_parallel_mappings(cls, _config, is_split=True): + calls["is_split"].append(is_split) + return {"encoder": partial(lambda prefix, value: (prefix, value), "encoder")} + + @classmethod + def _resolve_prefix_keys(cls, keys, safetensor_keys): + calls["keys"] = tuple(keys) + calls["safetensor"] = tuple(safetensor_keys) + return {"encoder": "encoder.layer.weight"} + + cfg = sys.modules["fastdeploy.config"].FDConfig(tensor_parallel_size=2, pretrained_config={}) + filtered = {} + + _tp_utils.check_tensor_parallel_prerequisites( + cfg, + _PopulatedModel, + filtered, + safetensor_keys=["encoder.layer.weight", "decoder.layer.weight"], + ) + + self.assertEqual(list(filtered.keys()), 
["encoder.layer.weight"]) + self.assertEqual(filtered["encoder.layer.weight"]("data"), ("encoder", "data")) + self.assertEqual(_logger.errors, []) + self.assertEqual(calls["is_split"], [True]) + self.assertEqual(calls["keys"], ("encoder",)) + self.assertEqual(calls["safetensor"], ("encoder.layer.weight", "decoder.layer.weight")) + + def test_missing_tensor_parallel_map_logs_error(self): + class _EmptyModel(_tp_utils.PretrainedModel): + @classmethod + def _get_tensor_parallel_mappings(cls, *_args, **_kwargs): + return {} + + cfg = sys.modules["fastdeploy.config"].FDConfig(tensor_parallel_size=4, pretrained_config={}) + filtered = {} + + _tp_utils.check_tensor_parallel_prerequisites( + cfg, _EmptyModel, filtered, safetensor_keys=["encoder.layer.weight"] + ) + + self.assertEqual(filtered, {}) + self.assertTrue(any("filtered_quant_map" in msg for msg in _logger.errors)) + + def test_inconsistent_tensor_parallel_keys_logs_error(self): + class _InconsistentModel(_tp_utils.PretrainedModel): + @classmethod + def _get_tensor_parallel_mappings(cls, *_args, **_kwargs): + return {"encoder": partial(lambda: None)} + + @classmethod + def _resolve_prefix_keys(cls, keys, safetensor_keys): + return {} + + cfg = sys.modules["fastdeploy.config"].FDConfig(tensor_parallel_size=8, pretrained_config={}) + filtered = {} + + _tp_utils.check_tensor_parallel_prerequisites( + cfg, _InconsistentModel, filtered, safetensor_keys=["encoder.layer.weight"] + ) + + self.assertEqual(filtered, {}) + self.assertTrue(any("tensor_parallel_filtered_map" in msg for msg in _logger.errors)) + + +class HelperFunctionTest(unittest.TestCase): + def test_extract_prefix_variants(self): + self.assertEqual(_tp_utils.extract_prefix("layer.weight"), "layer") + self.assertEqual(_tp_utils.extract_prefix("bias"), "") + self.assertEqual(_tp_utils.extract_prefix(".hidden"), "") + + def test_has_prefix(self): + self.assertTrue(_tp_utils.has_prefix("layer", "layer.weight")) + self.assertFalse(_tp_utils.has_prefix("layer", "other.weight")) + + def test_extract_placeholders(self): + placeholders = _tp_utils.extract_placeholders("proj.{layer_id}.weight") + self.assertEqual(placeholders, {"layer_id"}) + + def test_safe_dict_preserves_unknown(self): + mapping = _tp_utils.SafeDict({"known": "value"}) + self.assertEqual(mapping["known"], "value") + self.assertEqual(mapping["missing"], "{missing}") + + def test_has_placeholders(self): + self.assertTrue(_tp_utils.has_placeholders({"a"})) + self.assertFalse(_tp_utils.has_placeholders(set())) + + def test_update_final_actions_formats_keys(self): + final_actions = {} + _tp_utils.update_final_actions({"layer_id": 3}, final_actions, "proj.{layer_id}", "action") + self.assertEqual(final_actions, {"proj.3": "action"}) + + +class BuildExpandedKeysTest(unittest.TestCase): + def test_no_placeholder_keys_pass_through(self): + actions = {"weight": "copy"} + expanded = _tp_utils.build_expanded_keys(actions, num_layers=2) + self.assertEqual(expanded, actions) + + def test_layer_id_placeholder(self): + actions = {"layer.{layer_id}.weight": "split"} + expanded = _tp_utils.build_expanded_keys(actions, num_layers=3) + expected = { + "layer.0.weight": "split", + "layer.1.weight": "split", + "layer.2.weight": "split", + } + self.assertEqual(expanded, expected) + + def test_ffn_layer_id_requires_start(self): + actions = {"ffn.{ffn_layer_id}.weight": "split"} + expanded = _tp_utils.build_expanded_keys(actions, num_layers=4, start_layer=3) + expected = { + "ffn.0.weight": "split", + "ffn.1.weight": "split", + "ffn.2.weight": 
"split", + } + self.assertEqual(expanded, expected) + + def test_moe_layer_and_expert_id(self): + actions = {"moe.{moe_layer_id}.expert.{export_id}": "dispatch"} + expanded = _tp_utils.build_expanded_keys( + actions, num_layers=4, start_layer=1, num_experts=2 + ) + expected_keys = { + "moe.1.expert.0", + "moe.1.expert.1", + "moe.2.expert.0", + "moe.2.expert.1", + "moe.3.expert.0", + "moe.3.expert.1", + } + self.assertEqual(set(expanded.keys()), expected_keys) + self.assertTrue(all(value == "dispatch" for value in expanded.values())) + + def test_moe_layer_and_text_expert_id(self): + actions = {"moe.{moe_layer_id}.text.{text_export_id}": "dispatch"} + expanded = _tp_utils.build_expanded_keys( + actions, num_layers=3, start_layer=0, text_num_experts=2 + ) + expected_keys = { + "moe.0.text.0", + "moe.0.text.1", + "moe.1.text.0", + "moe.1.text.1", + "moe.2.text.0", + "moe.2.text.1", + } + self.assertEqual(set(expanded.keys()), expected_keys) + + def test_moe_layer_and_image_expert_id(self): + actions = {"moe.{moe_layer_id}.img.{img_export_id}": "dispatch"} + expanded = _tp_utils.build_expanded_keys( + actions, + num_layers=2, + start_layer=0, + text_num_experts=1, + img_num_experts=2, + ) + expected_keys = { + "moe.0.img.1", + "moe.0.img.2", + "moe.1.img.1", + "moe.1.img.2", + } + self.assertEqual(set(expanded.keys()), expected_keys) + + def test_moe_layer_only(self): + actions = {"moe.{moe_layer_id}.shared": "collect"} + expanded = _tp_utils.build_expanded_keys(actions, num_layers=4, start_layer=2) + self.assertEqual( + expanded, + { + "moe.2.shared": "collect", + "moe.3.shared": "collect", + }, + ) + + def test_invalid_placeholder_raises(self): + actions = {"unsupported.{unknown}": "noop"} + with self.assertRaises(ValueError): + _tp_utils.build_expanded_keys(actions, num_layers=1) + + +class GQATensorOpsTest(unittest.TestCase): + def test_gqa_split_returns_all_partitions(self): + func = _tp_utils.gqa_qkv_split_func( + tensor_parallel_degree=2, + tensor_parallel_rank=None, + num_attention_heads=4, + num_key_value_heads=2, + head_dim=1, + ) + weights = np.arange(8, dtype=np.float32) + shards = func(weights, is_column=True) + + self.assertEqual(len(shards), 2) + np.testing.assert_array_equal(shards[0], np.array([0, 1, 4, 6], dtype=np.float32)) + np.testing.assert_array_equal(shards[1], np.array([2, 3, 5, 7], dtype=np.float32)) + + def test_gqa_split_with_rank_and_repeat_kv(self): + func = _tp_utils.gqa_qkv_split_func( + tensor_parallel_degree=2, + tensor_parallel_rank=1, + num_attention_heads=2, + num_key_value_heads=1, + head_dim=2, + ) + weights = np.arange(8, dtype=np.float32) + shard = func(weights, is_column=True) + np.testing.assert_array_equal(shard, np.array([2, 3, 4, 5, 6, 7], dtype=np.float32)) + + def test_gqa_split_on_matrix_rows(self): + func = _tp_utils.gqa_qkv_split_func( + tensor_parallel_degree=2, + tensor_parallel_rank=None, + num_attention_heads=4, + num_key_value_heads=2, + head_dim=1, + ) + weights = np.arange(16, dtype=np.float32).reshape(2, 8) + shards = func(weights, is_column=False) + self.assertEqual(len(shards), 2) + np.testing.assert_array_equal( + shards[0], np.array([[0, 1, 2, 3, 4, 5, 6, 7]], dtype=np.float32) + ) + + def test_gqa_merge_reconstructs_weights(self): + weight_list = [ + np.array([0, 1, 4, 6], dtype=np.float32), + np.array([2, 3, 5, 7], dtype=np.float32), + ] + merge = _tp_utils.gqa_qkv_merge_func( + num_attention_heads=4, num_key_value_heads=2, head_dim=1 + ) + merged = merge(weight_list, is_column=True) + np.testing.assert_array_equal(merged, 
np.arange(8, dtype=np.float32)) + + def test_split_or_merge_qkv_dispatch(self): + weights = np.arange(8, dtype=np.float32) + split = _tp_utils.split_or_merge_qkv_func( + True, 2, None, 4, 2, 1 + ) + shards = split(weights, is_column=True) + merge = _tp_utils.split_or_merge_qkv_func( + False, 2, None, 4, 2, 1 + ) + restored = merge(shards, is_column=True) + np.testing.assert_array_equal(restored, weights) + + def test_split_or_merge_func_v1_row_bias(self): + fn = _tp_utils.split_or_merge_func_v1( + is_split=True, + tensor_parallel_degree=4, + tensor_parallel_rank=0, + ) + bias = np.ones(4, dtype=np.float32) + scaled = fn(bias, is_tp_row_bias=True) + np.testing.assert_array_equal(scaled, np.ones(4, dtype=np.float32) / 4) + + def test_split_or_merge_func_v1_gqa_path(self): + fn = _tp_utils.split_or_merge_func_v1( + is_split=True, + tensor_parallel_degree=2, + tensor_parallel_rank=None, + num_attention_heads=4, + num_key_value_heads=2, + head_dim=1, + ) + weights = np.arange(8, dtype=np.float32).reshape(2, 4) + shards = fn(weights, is_gqa=True, is_column=False) + self.assertEqual(len(shards), 2) + + def test_split_or_merge_func_v1_default_path(self): + fn = _tp_utils.split_or_merge_func_v1( + is_split=False, + tensor_parallel_degree=2, + tensor_parallel_rank=None, + num_attention_heads=4, + ) + parts = [np.array([0, 1], dtype=np.float32), np.array([2, 3], dtype=np.float32)] + merged = fn(parts, is_column=True, is_naive_2fuse=True) + np.testing.assert_array_equal(merged, np.array([0, 1, 2, 3], dtype=np.float32)) + +if __name__ == "__main__": # pragma: no cover - entry point for python -m unittest + unittest.main() From 9182521cd3e581e18fd129b0208dd5ef58d4e105 Mon Sep 17 00:00:00 2001 From: xunyoyo <33387866+xunyoyo@users.noreply.github.com> Date: Thu, 13 Nov 2025 01:55:04 +0800 Subject: [PATCH 2/2] Delete tests/model_executor/test_tp_utils.py --- tests/model_executor/test_tp_utils.py | 488 -------------------------- 1 file changed, 488 deletions(-) delete mode 100644 tests/model_executor/test_tp_utils.py diff --git a/tests/model_executor/test_tp_utils.py b/tests/model_executor/test_tp_utils.py deleted file mode 100644 index a956d7bdd5e..00000000000 --- a/tests/model_executor/test_tp_utils.py +++ /dev/null @@ -1,488 +0,0 @@ -"""Unit tests for tensor parallel utility helpers.""" - -from __future__ import annotations - -import importlib.util -import sys -import types -import unittest -from functools import partial -from pathlib import Path - -import numpy as np - - -PROJECT_ROOT = Path(__file__).resolve().parents[2] - - -class _DummyLogger: - def __init__(self): - self.errors = [] - - def error(self, message): - self.errors.append(message) - - def clear(self): - self.errors.clear() - - -def _ensure_module(name: str) -> types.ModuleType: - module = sys.modules.get(name) - if module is None: - module = types.ModuleType(name) - sys.modules[name] = module - return module - - -def _install_dependency_stubs(): - # Stub paddle and paddle.distributed used during module imports. 
- paddle = _ensure_module("paddle") - paddle.__dict__.setdefault("__version__", "0.0.0") - paddle.Tensor = np.ndarray - - def _split(array, sections, axis=0): - if isinstance(sections, int): - return np.array_split(array, sections, axis=axis) - raise NotImplementedError("sections must be an integer in tests") - - def _concat(arrays, axis=0): - return np.concatenate(list(arrays), axis=axis) - - def _to_tensor(array, dtype=None): - return np.asarray(array, dtype=dtype) - - def _get_default_dtype(): - return np.float32 - - class _CUDAPinnedPlace: - def __repr__(self): # pragma: no cover - representation helper - return "CUDAPinnedPlace()" - - paddle.split = _split - paddle.concat = _concat - paddle.to_tensor = _to_tensor - paddle.get_default_dtype = _get_default_dtype - paddle.CUDAPinnedPlace = _CUDAPinnedPlace - dist = types.ModuleType("paddle.distributed") - dist.get_world_size = lambda: 1 - dist.get_rank = lambda: 0 - dist.is_initialized = lambda: False - sys.modules["paddle.distributed"] = dist - paddle.distributed = dist - - # Stub paddleformers pieces referenced by tp_utils. - paddleformers = _ensure_module("paddleformers") - paddleformers.__path__ = [] - - transformers = types.ModuleType("paddleformers.transformers") - - class _PretrainedModel: - @classmethod - def _get_tensor_parallel_mappings(cls, *_args, **_kwargs): - return {} - - @classmethod - def _resolve_prefix_keys(cls, keys, _safetensor_keys): - return {k: k for k in keys} - - transformers.PretrainedModel = _PretrainedModel - sys.modules["paddleformers.transformers"] = transformers - paddleformers.transformers = transformers - - conversion_utils = types.ModuleType("paddleformers.transformers.conversion_utils") - - def _split_or_merge_func(is_split, tensor_parallel_degree, tensor_parallel_rank, **_kwargs): - axis = -1 - - def _fn(weight, *, is_column=True, is_naive_2fuse=False): # pylint: disable=unused-argument - current_axis = axis if is_column else 0 - if is_split: - chunks = np.array_split(weight, tensor_parallel_degree, axis=current_axis) - if tensor_parallel_rank is None: - return chunks - return chunks[tensor_parallel_rank] - return np.concatenate(weight, axis=current_axis) - - return _fn - - conversion_utils.split_or_merge_func = _split_or_merge_func - sys.modules["paddleformers.transformers.conversion_utils"] = conversion_utils - - utils_pkg = types.ModuleType("paddleformers.utils") - utils_pkg.__path__ = [] - sys.modules["paddleformers.utils"] = utils_pkg - - log_module = types.ModuleType("paddleformers.utils.log") - log_module.logger = _DummyLogger() - sys.modules["paddleformers.utils.log"] = log_module - utils_pkg.log = log_module - - # Provide a lightweight FDConfig replacement consumed by tp_utils. 
- fastdeploy_pkg = _ensure_module("fastdeploy") - fastdeploy_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy")] - - fd_config_module = types.ModuleType("fastdeploy.config") - - class _ParallelConfig: - def __init__(self, tensor_parallel_size): - self.tensor_parallel_size = tensor_parallel_size - - class _ModelConfig: - def __init__(self, pretrained_config): - self.pretrained_config = pretrained_config - - class FDConfig: - def __init__(self, tensor_parallel_size=1, pretrained_config=None): - self.parallel_config = _ParallelConfig(tensor_parallel_size) - self.model_config = _ModelConfig(pretrained_config) - - fd_config_module.FDConfig = FDConfig - sys.modules["fastdeploy.config"] = fd_config_module - fastdeploy_pkg.config = fd_config_module - - model_executor_pkg = _ensure_module("fastdeploy.model_executor") - model_executor_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy" / "model_executor")] - models_pkg = _ensure_module("fastdeploy.model_executor.models") - models_pkg.__path__ = [str(PROJECT_ROOT / "fastdeploy" / "model_executor" / "models")] - - # Load the real utils module so enums are shared with production code. - utils_name = "fastdeploy.model_executor.models.utils" - if utils_name not in sys.modules: - utils_spec = importlib.util.spec_from_file_location( - utils_name, PROJECT_ROOT / "fastdeploy" / "model_executor" / "models" / "utils.py" - ) - utils_module = importlib.util.module_from_spec(utils_spec) - utils_spec.loader.exec_module(utils_module) - sys.modules[utils_name] = utils_module - models_pkg.utils = utils_module - - -def _load_tp_utils(): - module_name = "fastdeploy.model_executor.models.tp_utils" - if module_name in sys.modules: - return sys.modules[module_name] - - _install_dependency_stubs() - - spec = importlib.util.spec_from_file_location( - module_name, PROJECT_ROOT / "fastdeploy" / "model_executor" / "models" / "tp_utils.py" - ) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - sys.modules[module_name] = module - - parent = sys.modules["fastdeploy.model_executor.models"] - parent.tp_utils = module - return module - - -_tp_utils = _load_tp_utils() -_logger = sys.modules["paddleformers.utils.log"].logger - - -class CheckTensorParallelPrerequisitesTest(unittest.TestCase): - def setUp(self): - _logger.clear() - - def test_tensor_parallel_disabled_noop(self): - cfg = sys.modules["fastdeploy.config"].FDConfig(tensor_parallel_size=1, pretrained_config={}) - filtered = {} - - _tp_utils.check_tensor_parallel_prerequisites( - cfg, _tp_utils.PretrainedModel, filtered, safetensor_keys=[] - ) - - self.assertEqual(filtered, {}) - self.assertEqual(_logger.errors, []) - - def test_tensor_parallel_mappings_populated(self): - calls = {"is_split": [], "keys": None, "safetensor": None} - - class _PopulatedModel(_tp_utils.PretrainedModel): - @classmethod - def _get_tensor_parallel_mappings(cls, _config, is_split=True): - calls["is_split"].append(is_split) - return {"encoder": partial(lambda prefix, value: (prefix, value), "encoder")} - - @classmethod - def _resolve_prefix_keys(cls, keys, safetensor_keys): - calls["keys"] = tuple(keys) - calls["safetensor"] = tuple(safetensor_keys) - return {"encoder": "encoder.layer.weight"} - - cfg = sys.modules["fastdeploy.config"].FDConfig(tensor_parallel_size=2, pretrained_config={}) - filtered = {} - - _tp_utils.check_tensor_parallel_prerequisites( - cfg, - _PopulatedModel, - filtered, - safetensor_keys=["encoder.layer.weight", "decoder.layer.weight"], - ) - - self.assertEqual(list(filtered.keys()), 
["encoder.layer.weight"]) - self.assertEqual(filtered["encoder.layer.weight"]("data"), ("encoder", "data")) - self.assertEqual(_logger.errors, []) - self.assertEqual(calls["is_split"], [True]) - self.assertEqual(calls["keys"], ("encoder",)) - self.assertEqual(calls["safetensor"], ("encoder.layer.weight", "decoder.layer.weight")) - - def test_missing_tensor_parallel_map_logs_error(self): - class _EmptyModel(_tp_utils.PretrainedModel): - @classmethod - def _get_tensor_parallel_mappings(cls, *_args, **_kwargs): - return {} - - cfg = sys.modules["fastdeploy.config"].FDConfig(tensor_parallel_size=4, pretrained_config={}) - filtered = {} - - _tp_utils.check_tensor_parallel_prerequisites( - cfg, _EmptyModel, filtered, safetensor_keys=["encoder.layer.weight"] - ) - - self.assertEqual(filtered, {}) - self.assertTrue(any("filtered_quant_map" in msg for msg in _logger.errors)) - - def test_inconsistent_tensor_parallel_keys_logs_error(self): - class _InconsistentModel(_tp_utils.PretrainedModel): - @classmethod - def _get_tensor_parallel_mappings(cls, *_args, **_kwargs): - return {"encoder": partial(lambda: None)} - - @classmethod - def _resolve_prefix_keys(cls, keys, safetensor_keys): - return {} - - cfg = sys.modules["fastdeploy.config"].FDConfig(tensor_parallel_size=8, pretrained_config={}) - filtered = {} - - _tp_utils.check_tensor_parallel_prerequisites( - cfg, _InconsistentModel, filtered, safetensor_keys=["encoder.layer.weight"] - ) - - self.assertEqual(filtered, {}) - self.assertTrue(any("tensor_parallel_filtered_map" in msg for msg in _logger.errors)) - - -class HelperFunctionTest(unittest.TestCase): - def test_extract_prefix_variants(self): - self.assertEqual(_tp_utils.extract_prefix("layer.weight"), "layer") - self.assertEqual(_tp_utils.extract_prefix("bias"), "") - self.assertEqual(_tp_utils.extract_prefix(".hidden"), "") - - def test_has_prefix(self): - self.assertTrue(_tp_utils.has_prefix("layer", "layer.weight")) - self.assertFalse(_tp_utils.has_prefix("layer", "other.weight")) - - def test_extract_placeholders(self): - placeholders = _tp_utils.extract_placeholders("proj.{layer_id}.weight") - self.assertEqual(placeholders, {"layer_id"}) - - def test_safe_dict_preserves_unknown(self): - mapping = _tp_utils.SafeDict({"known": "value"}) - self.assertEqual(mapping["known"], "value") - self.assertEqual(mapping["missing"], "{missing}") - - def test_has_placeholders(self): - self.assertTrue(_tp_utils.has_placeholders({"a"})) - self.assertFalse(_tp_utils.has_placeholders(set())) - - def test_update_final_actions_formats_keys(self): - final_actions = {} - _tp_utils.update_final_actions({"layer_id": 3}, final_actions, "proj.{layer_id}", "action") - self.assertEqual(final_actions, {"proj.3": "action"}) - - -class BuildExpandedKeysTest(unittest.TestCase): - def test_no_placeholder_keys_pass_through(self): - actions = {"weight": "copy"} - expanded = _tp_utils.build_expanded_keys(actions, num_layers=2) - self.assertEqual(expanded, actions) - - def test_layer_id_placeholder(self): - actions = {"layer.{layer_id}.weight": "split"} - expanded = _tp_utils.build_expanded_keys(actions, num_layers=3) - expected = { - "layer.0.weight": "split", - "layer.1.weight": "split", - "layer.2.weight": "split", - } - self.assertEqual(expanded, expected) - - def test_ffn_layer_id_requires_start(self): - actions = {"ffn.{ffn_layer_id}.weight": "split"} - expanded = _tp_utils.build_expanded_keys(actions, num_layers=4, start_layer=3) - expected = { - "ffn.0.weight": "split", - "ffn.1.weight": "split", - "ffn.2.weight": 
"split", - } - self.assertEqual(expanded, expected) - - def test_moe_layer_and_expert_id(self): - actions = {"moe.{moe_layer_id}.expert.{export_id}": "dispatch"} - expanded = _tp_utils.build_expanded_keys( - actions, num_layers=4, start_layer=1, num_experts=2 - ) - expected_keys = { - "moe.1.expert.0", - "moe.1.expert.1", - "moe.2.expert.0", - "moe.2.expert.1", - "moe.3.expert.0", - "moe.3.expert.1", - } - self.assertEqual(set(expanded.keys()), expected_keys) - self.assertTrue(all(value == "dispatch" for value in expanded.values())) - - def test_moe_layer_and_text_expert_id(self): - actions = {"moe.{moe_layer_id}.text.{text_export_id}": "dispatch"} - expanded = _tp_utils.build_expanded_keys( - actions, num_layers=3, start_layer=0, text_num_experts=2 - ) - expected_keys = { - "moe.0.text.0", - "moe.0.text.1", - "moe.1.text.0", - "moe.1.text.1", - "moe.2.text.0", - "moe.2.text.1", - } - self.assertEqual(set(expanded.keys()), expected_keys) - - def test_moe_layer_and_image_expert_id(self): - actions = {"moe.{moe_layer_id}.img.{img_export_id}": "dispatch"} - expanded = _tp_utils.build_expanded_keys( - actions, - num_layers=2, - start_layer=0, - text_num_experts=1, - img_num_experts=2, - ) - expected_keys = { - "moe.0.img.1", - "moe.0.img.2", - "moe.1.img.1", - "moe.1.img.2", - } - self.assertEqual(set(expanded.keys()), expected_keys) - - def test_moe_layer_only(self): - actions = {"moe.{moe_layer_id}.shared": "collect"} - expanded = _tp_utils.build_expanded_keys(actions, num_layers=4, start_layer=2) - self.assertEqual( - expanded, - { - "moe.2.shared": "collect", - "moe.3.shared": "collect", - }, - ) - - def test_invalid_placeholder_raises(self): - actions = {"unsupported.{unknown}": "noop"} - with self.assertRaises(ValueError): - _tp_utils.build_expanded_keys(actions, num_layers=1) - - -class GQATensorOpsTest(unittest.TestCase): - def test_gqa_split_returns_all_partitions(self): - func = _tp_utils.gqa_qkv_split_func( - tensor_parallel_degree=2, - tensor_parallel_rank=None, - num_attention_heads=4, - num_key_value_heads=2, - head_dim=1, - ) - weights = np.arange(8, dtype=np.float32) - shards = func(weights, is_column=True) - - self.assertEqual(len(shards), 2) - np.testing.assert_array_equal(shards[0], np.array([0, 1, 4, 6], dtype=np.float32)) - np.testing.assert_array_equal(shards[1], np.array([2, 3, 5, 7], dtype=np.float32)) - - def test_gqa_split_with_rank_and_repeat_kv(self): - func = _tp_utils.gqa_qkv_split_func( - tensor_parallel_degree=2, - tensor_parallel_rank=1, - num_attention_heads=2, - num_key_value_heads=1, - head_dim=2, - ) - weights = np.arange(8, dtype=np.float32) - shard = func(weights, is_column=True) - np.testing.assert_array_equal(shard, np.array([2, 3, 4, 5, 6, 7], dtype=np.float32)) - - def test_gqa_split_on_matrix_rows(self): - func = _tp_utils.gqa_qkv_split_func( - tensor_parallel_degree=2, - tensor_parallel_rank=None, - num_attention_heads=4, - num_key_value_heads=2, - head_dim=1, - ) - weights = np.arange(16, dtype=np.float32).reshape(2, 8) - shards = func(weights, is_column=False) - self.assertEqual(len(shards), 2) - np.testing.assert_array_equal( - shards[0], np.array([[0, 1, 2, 3, 4, 5, 6, 7]], dtype=np.float32) - ) - - def test_gqa_merge_reconstructs_weights(self): - weight_list = [ - np.array([0, 1, 4, 6], dtype=np.float32), - np.array([2, 3, 5, 7], dtype=np.float32), - ] - merge = _tp_utils.gqa_qkv_merge_func( - num_attention_heads=4, num_key_value_heads=2, head_dim=1 - ) - merged = merge(weight_list, is_column=True) - np.testing.assert_array_equal(merged, 
np.arange(8, dtype=np.float32)) - - def test_split_or_merge_qkv_dispatch(self): - weights = np.arange(8, dtype=np.float32) - split = _tp_utils.split_or_merge_qkv_func( - True, 2, None, 4, 2, 1 - ) - shards = split(weights, is_column=True) - merge = _tp_utils.split_or_merge_qkv_func( - False, 2, None, 4, 2, 1 - ) - restored = merge(shards, is_column=True) - np.testing.assert_array_equal(restored, weights) - - def test_split_or_merge_func_v1_row_bias(self): - fn = _tp_utils.split_or_merge_func_v1( - is_split=True, - tensor_parallel_degree=4, - tensor_parallel_rank=0, - ) - bias = np.ones(4, dtype=np.float32) - scaled = fn(bias, is_tp_row_bias=True) - np.testing.assert_array_equal(scaled, np.ones(4, dtype=np.float32) / 4) - - def test_split_or_merge_func_v1_gqa_path(self): - fn = _tp_utils.split_or_merge_func_v1( - is_split=True, - tensor_parallel_degree=2, - tensor_parallel_rank=None, - num_attention_heads=4, - num_key_value_heads=2, - head_dim=1, - ) - weights = np.arange(8, dtype=np.float32).reshape(2, 4) - shards = fn(weights, is_gqa=True, is_column=False) - self.assertEqual(len(shards), 2) - - def test_split_or_merge_func_v1_default_path(self): - fn = _tp_utils.split_or_merge_func_v1( - is_split=False, - tensor_parallel_degree=2, - tensor_parallel_rank=None, - num_attention_heads=4, - ) - parts = [np.array([0, 1], dtype=np.float32), np.array([2, 3], dtype=np.float32)] - merged = fn(parts, is_column=True, is_naive_2fuse=True) - np.testing.assert_array_equal(merged, np.array([0, 1, 2, 3], dtype=np.float32)) - -if __name__ == "__main__": # pragma: no cover - entry point for python -m unittest - unittest.main()
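
Both test modules lean on the same stub-then-import pattern: lightweight fake modules are registered in sys.modules before the module under test is loaded directly from its source file, so heavy dependencies such as paddle or paddleformers never need to be installed in the test environment. Below is a condensed, standalone sketch of that pattern; the single stubbed paddle attribute is only an illustrative subset of what the tests actually install.

    import importlib.util
    import sys
    import types
    from pathlib import Path

    PROJECT_ROOT = Path(__file__).resolve().parents[2]

    def ensure_module(name: str) -> types.ModuleType:
        """Return the module registered under `name`, creating an empty one if needed."""
        module = sys.modules.get(name)
        if module is None:
            module = types.ModuleType(name)
            sys.modules[name] = module
        return module

    # 1) Register stand-ins before the real import happens.
    paddle = ensure_module("paddle")
    paddle.set_device = lambda _name: None  # only the attributes the target module touches

    # 2) Load the module under test straight from its file so it binds against the stubs.
    spec = importlib.util.spec_from_file_location(
        "fastdeploy.cache_manager.cache_messager",
        PROJECT_ROOT / "fastdeploy" / "cache_manager" / "cache_messager.py",
    )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    sys.modules[spec.name] = module

The long-running worker loops are exercised the same way in both files: a fake queue or kv-signal sequence raises SystemExit once its scripted payloads are exhausted, and each test wraps the loop call in assertRaises(SystemExit), so the thread body runs exactly once over deterministic data instead of spinning forever.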