From 72d1e541ef009884dffd83bf931d0e9d1371b024 Mon Sep 17 00:00:00 2001 From: "Jiyue (Jennifer) Wang" Date: Mon, 22 Sep 2025 16:10:43 -0400 Subject: [PATCH 1/5] use public api from ValueMesh --- src/forge/actors/policy.py | 2 +- src/forge/controller/service/replica.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py index de21b3e9e..1b391269b 100644 --- a/src/forge/actors/policy.py +++ b/src/forge/actors/policy.py @@ -248,7 +248,7 @@ async def setup(self): # Setup scheduler # TODO: Add support for `log_stats` kv_cache_configs = await self.policy_worker.setup_kv_cache.call() - kv_cache_config = kv_cache_configs._values[0] + kv_cache_config = next(kv_cache_configs.items()) self.vllm_config.cache_config.num_gpu_blocks = kv_cache_config.num_blocks self.vllm_config.cache_config.num_cpu_blocks = 0 diff --git a/src/forge/controller/service/replica.py b/src/forge/controller/service/replica.py index b84e5eec7..7a1083be9 100644 --- a/src/forge/controller/service/replica.py +++ b/src/forge/controller/service/replica.py @@ -244,10 +244,8 @@ async def _process_single_request(self, request: ServiceRequest) -> bool: # Unwrap ValueMesh if configured to return first rank result if ( self.return_first_rank_result - and hasattr(result, "_values") - and result._values ): - result = result._values[0] + result = next(result.items()) request.future.set_result(result) except ActorError as e: logger.warning(f"Got failure on replica {self.idx}. Error:\n{e}") From 90fef97ef4b7dd5ad16373ed4cb087e98ec20aeb Mon Sep 17 00:00:00 2001 From: "Jiyue (Jennifer) Wang" Date: Mon, 22 Sep 2025 16:15:08 -0400 Subject: [PATCH 2/5] nit --- src/forge/controller/service/replica.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/forge/controller/service/replica.py b/src/forge/controller/service/replica.py index 7a1083be9..70063cc60 100644 --- a/src/forge/controller/service/replica.py +++ b/src/forge/controller/service/replica.py @@ -242,9 +242,7 @@ async def _process_single_request(self, request: ServiceRequest) -> bool: try: result = await endpoint_func.call(*request.args, **request.kwargs) # Unwrap ValueMesh if configured to return first rank result - if ( - self.return_first_rank_result - ): + if self.return_first_rank_result: result = next(result.items()) request.future.set_result(result) except ActorError as e: From 548e695faebda6e7d19099dc094520f3f2bc667c Mon Sep 17 00:00:00 2001 From: "Jiyue (Jennifer) Wang" Date: Mon, 22 Sep 2025 16:17:22 -0400 Subject: [PATCH 3/5] tuple --- src/forge/actors/policy.py | 2 +- src/forge/controller/service/replica.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py index 1b391269b..710174e38 100644 --- a/src/forge/actors/policy.py +++ b/src/forge/actors/policy.py @@ -248,7 +248,7 @@ async def setup(self): # Setup scheduler # TODO: Add support for `log_stats` kv_cache_configs = await self.policy_worker.setup_kv_cache.call() - kv_cache_config = next(kv_cache_configs.items()) + _, kv_cache_config = next(kv_cache_configs.items()) self.vllm_config.cache_config.num_gpu_blocks = kv_cache_config.num_blocks self.vllm_config.cache_config.num_cpu_blocks = 0 diff --git a/src/forge/controller/service/replica.py b/src/forge/controller/service/replica.py index 70063cc60..a84e93005 100644 --- a/src/forge/controller/service/replica.py +++ b/src/forge/controller/service/replica.py @@ -243,7 +243,7 @@ async def _process_single_request(self, request: ServiceRequest) -> bool: result = await endpoint_func.call(*request.args, **request.kwargs) # Unwrap ValueMesh if configured to return first rank result if self.return_first_rank_result: - result = next(result.items()) + _, result = next(result.items()) request.future.set_result(result) except ActorError as e: logger.warning(f"Got failure on replica {self.idx}. Error:\n{e}") From ffe80cc64bdb2ed7e5a51e8c8c48b0abcd9280ea Mon Sep 17 00:00:00 2001 From: "Jiyue (Jennifer) Wang" Date: Mon, 22 Sep 2025 17:38:09 -0400 Subject: [PATCH 4/5] one more place --- src/forge/actors/policy.py | 4 +--- src/forge/controller/service/replica.py | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py index 710174e38..8a10baf5d 100644 --- a/src/forge/actors/policy.py +++ b/src/forge/actors/policy.py @@ -351,15 +351,13 @@ def preprocess_add_request(self, request: EngineCoreRequest) -> tuple[Request, i async def run(self): # TODO: add support for `iteration_stats` # TODO: move postprocessing out of loop to not block - parallel_config = self.vllm_config.parallel_config - output_rank = parallel_config.world_size - parallel_config.tensor_parallel_size self.running = True while self.running: scheduler_output = self.scheduler.schedule() worker_outputs = await self.policy_worker.execute_model.call( scheduler_output ) - worker_output = worker_outputs._values[output_rank] + _, worker_output = next(worker_outputs.items()) outputs = self.scheduler.update_from_output(scheduler_output, worker_output) outputs = outputs.get(0) or EngineCoreOutputs() await asyncio.sleep(0) # Release control before processing outputs diff --git a/src/forge/controller/service/replica.py b/src/forge/controller/service/replica.py index a84e93005..85df718df 100644 --- a/src/forge/controller/service/replica.py +++ b/src/forge/controller/service/replica.py @@ -243,7 +243,8 @@ async def _process_single_request(self, request: ServiceRequest) -> bool: result = await endpoint_func.call(*request.args, **request.kwargs) # Unwrap ValueMesh if configured to return first rank result if self.return_first_rank_result: - _, result = next(result.items()) + _, first_result = next(result.items()) + result = first_result request.future.set_result(result) except ActorError as e: logger.warning(f"Got failure on replica {self.idx}. Error:\n{e}") From 3ba8c929cb023d5cf1be61add65f88c9590175a4 Mon Sep 17 00:00:00 2001 From: "Jiyue (Jennifer) Wang" Date: Mon, 22 Sep 2025 22:52:47 -0400 Subject: [PATCH 5/5] comment --- src/forge/actors/policy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py index 8a10baf5d..38889656d 100644 --- a/src/forge/actors/policy.py +++ b/src/forge/actors/policy.py @@ -357,6 +357,7 @@ async def run(self): worker_outputs = await self.policy_worker.execute_model.call( scheduler_output ) + # the results of `execute_model` is gathered on the driver rank (rank 0) _, worker_output = next(worker_outputs.items()) outputs = self.scheduler.update_from_output(scheduler_output, worker_output) outputs = outputs.get(0) or EngineCoreOutputs()