From c91649b1ee874fb1870b2fd408cdebf780019ed6 Mon Sep 17 00:00:00 2001 From: DNXie Date: Wed, 24 Sep 2025 15:15:03 -0700 Subject: [PATCH 01/13] grpo main --- apps/grpo/main.py | 12 ++++++------ apps/grpo/qwen3_1_7b.yaml | 6 ------ apps/grpo/qwen3_8b.yaml | 6 ------ apps/grpo/qwen3_multinode.yaml | 6 ------ 4 files changed, 6 insertions(+), 24 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 15642272c..61d46f292 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -259,17 +259,17 @@ async def main(cfg: DictConfig): ref_model, reward_actor, ) = await asyncio.gather( - DatasetActor.options(**cfg.services.dataset).as_service(**cfg.dataset), + DatasetActor.options(**cfg.services.dataset).as_actor(**cfg.dataset), Policy.options(**cfg.services.policy).as_service(**cfg.policy), - RLTrainer.options(**cfg.services.trainer).as_service( + RLTrainer.options(**cfg.services.trainer).as_actor( **cfg.trainer, loss=simple_grpo_loss ), - ReplayBuffer.options(**cfg.services.replay_buffer).as_service( + ReplayBuffer.options(**cfg.services.replay_buffer).as_actor( **cfg.replay_buffer, collate=collate ), - ComputeAdvantages.options(**cfg.services.compute_advantages).as_service(), - ReferenceModel.options(**cfg.services.ref_model).as_service(**cfg.ref_model), - RewardActor.options(**cfg.services.reward_actor).as_service( + ComputeAdvantages.options(**cfg.services.compute_advantages).as_actor(), + ReferenceModel.options(**cfg.services.ref_model).as_actor(**cfg.ref_model), + RewardActor.options(**cfg.services.reward_actor).as_actor( reward_functions=[MathReward(), ThinkingReward()] ), ) diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml index 5c0481528..619b173cf 100644 --- a/apps/grpo/qwen3_1_7b.yaml +++ b/apps/grpo/qwen3_1_7b.yaml @@ -102,7 +102,6 @@ ref_model: services: dataset: procs: 1 - num_replicas: 1 with_gpus: false policy: procs: ${policy.engine_config.tensor_parallel_size} @@ -110,21 +109,16 @@ services: with_gpus: true trainer: procs: 1 - num_replicas: 1 with_gpus: true replay_buffer: procs: 1 - num_replicas: 1 with_gpus: false ref_model: procs: 1 - num_replicas: 1 with_gpus: true compute_advantages: procs: 1 - num_replicas: 1 with_gpus: false reward_actor: procs: 1 - num_replicas: 1 with_gpus: false diff --git a/apps/grpo/qwen3_8b.yaml b/apps/grpo/qwen3_8b.yaml index 1e562f873..80069d461 100644 --- a/apps/grpo/qwen3_8b.yaml +++ b/apps/grpo/qwen3_8b.yaml @@ -103,7 +103,6 @@ ref_model: services: dataset: procs: 1 - num_replicas: 1 with_gpus: false policy: procs: ${policy.engine_config.tensor_parallel_size} @@ -111,21 +110,16 @@ services: with_gpus: true trainer: procs: 2 - num_replicas: 1 with_gpus: true replay_buffer: procs: 1 - num_replicas: 1 with_gpus: false ref_model: procs: 1 - num_replicas: 1 with_gpus: true compute_advantages: procs: 1 - num_replicas: 1 with_gpus: false reward_actor: procs: 1 - num_replicas: 1 with_gpus: false diff --git a/apps/grpo/qwen3_multinode.yaml b/apps/grpo/qwen3_multinode.yaml index ade01855f..70c19e8b0 100644 --- a/apps/grpo/qwen3_multinode.yaml +++ b/apps/grpo/qwen3_multinode.yaml @@ -48,7 +48,6 @@ ref_model: services: dataset: procs: 1 - num_replicas: 1 with_gpus: false policy: procs: 1 @@ -58,21 +57,16 @@ services: trainer: procs: 1 hosts: 1 - num_replicas: 1 with_gpus: true replay_buffer: procs: 1 - num_replicas: 1 with_gpus: false compute_advantages: procs: 1 - num_replicas: 1 with_gpus: false ref_model: procs: 1 - num_replicas: 1 with_gpus: true reward_actor: procs: 1 - num_replicas: 1 with_gpus: false From 
e7ba81ea9d9d1e4b82cc21fafe8c1c4089a08ccc Mon Sep 17 00:00:00 2001 From: DNXie Date: Wed, 24 Sep 2025 15:24:23 -0700 Subject: [PATCH 02/13] move service config to actor section --- apps/grpo/main.py | 12 ++++++------ apps/grpo/qwen3_1_7b.yaml | 8 +++++--- apps/grpo/qwen3_8b.yaml | 8 +++++--- apps/grpo/qwen3_multinode.yaml | 8 +++++--- 4 files changed, 21 insertions(+), 15 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 61d46f292..db40e4411 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -259,17 +259,17 @@ async def main(cfg: DictConfig): ref_model, reward_actor, ) = await asyncio.gather( - DatasetActor.options(**cfg.services.dataset).as_actor(**cfg.dataset), + DatasetActor.options(**cfg.actors.dataset).as_actor(**cfg.dataset), Policy.options(**cfg.services.policy).as_service(**cfg.policy), - RLTrainer.options(**cfg.services.trainer).as_actor( + RLTrainer.options(**cfg.actors.trainer).as_actor( **cfg.trainer, loss=simple_grpo_loss ), - ReplayBuffer.options(**cfg.services.replay_buffer).as_actor( + ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor( **cfg.replay_buffer, collate=collate ), - ComputeAdvantages.options(**cfg.services.compute_advantages).as_actor(), - ReferenceModel.options(**cfg.services.ref_model).as_actor(**cfg.ref_model), - RewardActor.options(**cfg.services.reward_actor).as_actor( + ComputeAdvantages.options(**cfg.actors.compute_advantages).as_actor(), + ReferenceModel.options(**cfg.actors.ref_model).as_actor(**cfg.ref_model), + RewardActor.options(**cfg.actors.reward_actor).as_actor( reward_functions=[MathReward(), ThinkingReward()] ), ) diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml index 619b173cf..2eb2414c6 100644 --- a/apps/grpo/qwen3_1_7b.yaml +++ b/apps/grpo/qwen3_1_7b.yaml @@ -100,13 +100,15 @@ ref_model: # All resource allocations services: - dataset: - procs: 1 - with_gpus: false policy: procs: ${policy.engine_config.tensor_parallel_size} num_replicas: 1 with_gpus: true + +actors: + dataset: + procs: 1 + with_gpus: false trainer: procs: 1 with_gpus: true diff --git a/apps/grpo/qwen3_8b.yaml b/apps/grpo/qwen3_8b.yaml index 80069d461..bf74bb619 100644 --- a/apps/grpo/qwen3_8b.yaml +++ b/apps/grpo/qwen3_8b.yaml @@ -101,13 +101,15 @@ ref_model: # All resource allocations services: - dataset: - procs: 1 - with_gpus: false policy: procs: ${policy.engine_config.tensor_parallel_size} num_replicas: 1 with_gpus: true + +actors: + dataset: + procs: 1 + with_gpus: false trainer: procs: 2 with_gpus: true diff --git a/apps/grpo/qwen3_multinode.yaml b/apps/grpo/qwen3_multinode.yaml index 70c19e8b0..463a1ee58 100644 --- a/apps/grpo/qwen3_multinode.yaml +++ b/apps/grpo/qwen3_multinode.yaml @@ -46,14 +46,16 @@ ref_model: model_name: ${model} services: - dataset: - procs: 1 - with_gpus: false policy: procs: 1 hosts: 1 num_replicas: 1 with_gpus: true + +actors: + dataset: + procs: 1 + with_gpus: false trainer: procs: 1 hosts: 1 From df2e04c9c1fa13d7bf20a4daa8d9160cddb4bd14 Mon Sep 17 00:00:00 2001 From: DNXie Date: Wed, 24 Sep 2025 15:40:04 -0700 Subject: [PATCH 03/13] update toyrl --- apps/toy_rl/sumdigits.py | 14 ++++++-------- apps/toy_rl/sumdigits.yaml | 13 +++++-------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/apps/toy_rl/sumdigits.py b/apps/toy_rl/sumdigits.py index 4ef0833c2..7803eedb5 100644 --- a/apps/toy_rl/sumdigits.py +++ b/apps/toy_rl/sumdigits.py @@ -4,7 +4,7 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
-# Usage: python -m apps.grpo.main --config apps/grpo/qwen3_1_7b.yaml +# Usage: python -m apps.toy_rl.sumdigits --config apps/toy_rl/sumdigits.yaml import asyncio import random @@ -449,14 +449,12 @@ async def main(cfg: DictConfig): reward_actor, ref_model, ) = await asyncio.gather( - DatasetActor.options(**cfg.services.dataset).as_service(**cfg.dataset), + DatasetActor.options(**cfg.actors.dataset).as_actor(**cfg.dataset), Policy.options(**cfg.services.policy).as_service(**cfg.policy), - Trainer.options(**cfg.services.trainer).as_service(**cfg.trainer), - ReplayBuffer.options(**cfg.services.replay_buffer).as_service( - **cfg.replay_buffer - ), - RewardActor.options(**cfg.services.reward_actor).as_service(), - RefModel.options(**cfg.services.ref_model).as_service(**cfg.ref_model), + Trainer.options(**cfg.actors.trainer).as_actor(**cfg.trainer), + ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor(**cfg.replay_buffer), + RewardActor.options(**cfg.actors.reward_actor).as_actor(), + RefModel.options(**cfg.actors.ref_model).as_actor(**cfg.ref_model), ) print("All services initialized successfully!") diff --git a/apps/toy_rl/sumdigits.yaml b/apps/toy_rl/sumdigits.yaml index 86c6a3c52..8e2392504 100644 --- a/apps/toy_rl/sumdigits.yaml +++ b/apps/toy_rl/sumdigits.yaml @@ -40,27 +40,24 @@ replay_buffer: dp_size: 1 services: - dataset: - procs: 1 - num_replicas: 1 - with_gpus: false policy: procs: 1 num_replicas: 1 with_gpus: true + +actors: + dataset: + procs: 1 + with_gpus: false trainer: procs: 1 - num_replicas: 1 with_gpus: true replay_buffer: procs: 1 - num_replicas: 1 with_gpus: false reward_actor: procs: 1 - num_replicas: 1 with_gpus: false ref_model: procs: 1 - num_replicas: 1 with_gpus: true From e9fb976d470c801d29c9468a8e0288c0743762b7 Mon Sep 17 00:00:00 2001 From: DNXie Date: Thu, 25 Sep 2025 09:53:40 -0700 Subject: [PATCH 04/13] app/rl --- apps/rl/main.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/rl/main.py b/apps/rl/main.py index 9f7314f53..d626b4df7 100644 --- a/apps/rl/main.py +++ b/apps/rl/main.py @@ -135,10 +135,8 @@ def simple_grpo_loss( async def run(cfg: DictConfig): - trainer = await RLTrainer.options( - procs=1, with_gpus=True, num_replicas=4 - ).as_service(**cfg.trainer) - replay_buffer = await ReplayBuffer.options(procs=1, num_replicas=1).as_service( + trainer = await RLTrainer.options(procs=1, with_gpus=True).as_actor(**cfg.trainer) + replay_buffer = await ReplayBuffer.options(procs=1, num_replicas=1).as_actor( **cfg.replay_buffer ) From 17c7b5bd8c9676e46829c3f48fc43a47bc9070d2 Mon Sep 17 00:00:00 2001 From: DNXie Date: Thu, 25 Sep 2025 10:17:33 -0700 Subject: [PATCH 05/13] fix rl/main --- apps/rl/llama3_8b.yaml | 4 ++-- apps/rl/main.py | 25 +++++++++++++++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/apps/rl/llama3_8b.yaml b/apps/rl/llama3_8b.yaml index a7d2e3d96..e366685d6 100644 --- a/apps/rl/llama3_8b.yaml +++ b/apps/rl/llama3_8b.yaml @@ -35,7 +35,7 @@ trainer: parallelism: data_parallel_replicate_degree: 1 - data_parallel_shard_degree: 4 + data_parallel_shard_degree: 1 tensor_parallel_degree: 1 pipeline_parallel_degree: 1 context_parallel_degree: 1 @@ -59,4 +59,4 @@ replay_buffer: batch_size: 4 max_policy_age: 2 seed: None - dp_size: 4 + dp_size: ${trainer.parallelism.data_parallel_shard_degree} diff --git a/apps/rl/main.py b/apps/rl/main.py index d626b4df7..9eb656698 100644 --- a/apps/rl/main.py +++ b/apps/rl/main.py @@ -94,6 +94,25 @@ def collate(batches: list[list[Episode]]): return inputs, 
targets +def simple_grpo_loss( + logits: torch.Tensor, + response: torch.Tensor, + ref_logprobs: torch.Tensor, + advantages: torch.Tensor, + padding_mask: torch.Tensor, + beta: float = 0.1, +) -> torch.Tensor: + logprobs = compute_logprobs(logits, response) + kl = torch.exp(ref_logprobs - logprobs) - (ref_logprobs - logprobs) - 1 + per_token_policy_loss = torch.exp(logprobs - logprobs.detach()) * advantages + per_token_loss = -(per_token_policy_loss - beta * kl) + loss = ( + ((per_token_loss * padding_mask).sum(dim=1)) + / (padding_mask.sum(dim=1).clamp(min=1.0)) + ).mean() + return loss + + def compute_logprobs( logits: Tensor, input_ids: torch.Tensor, temperature: float = 1.0 ) -> Tensor: @@ -135,9 +154,11 @@ def simple_grpo_loss( async def run(cfg: DictConfig): - trainer = await RLTrainer.options(procs=1, with_gpus=True).as_actor(**cfg.trainer) + trainer = await RLTrainer.options(procs=1, with_gpus=True).as_actor( + **cfg.trainer, loss=simple_grpo_loss + ) replay_buffer = await ReplayBuffer.options(procs=1, num_replicas=1).as_actor( - **cfg.replay_buffer + **cfg.replay_buffer, collate=collate ) print("Services initialized....") From 5e79423844e4330cea934b49cf4bb052c2eacb49 Mon Sep 17 00:00:00 2001 From: DNXie Date: Thu, 25 Sep 2025 10:35:15 -0700 Subject: [PATCH 06/13] fix rl.main --- apps/rl/llama3_8b.yaml | 14 ++++++++++++-- apps/rl/main.py | 23 ++--------------------- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/apps/rl/llama3_8b.yaml b/apps/rl/llama3_8b.yaml index e366685d6..b1aa4b8e4 100644 --- a/apps/rl/llama3_8b.yaml +++ b/apps/rl/llama3_8b.yaml @@ -29,13 +29,14 @@ trainer: max_norm: 1.0 steps: 5 dataset: "c4" + dtype: bfloat16 compile: enable: false parallelism: data_parallel_replicate_degree: 1 - data_parallel_shard_degree: 1 + data_parallel_shard_degree: -1 tensor_parallel_degree: 1 pipeline_parallel_degree: 1 context_parallel_degree: 1 @@ -59,4 +60,13 @@ replay_buffer: batch_size: 4 max_policy_age: 2 seed: None - dp_size: ${trainer.parallelism.data_parallel_shard_degree} + dp_size: 2 + + +actors: + trainer: + procs: 2 + with_gpus: true + replay_buffer: + procs: 1 + with_gpus: false diff --git a/apps/rl/main.py b/apps/rl/main.py index 9eb656698..3a361535c 100644 --- a/apps/rl/main.py +++ b/apps/rl/main.py @@ -94,25 +94,6 @@ def collate(batches: list[list[Episode]]): return inputs, targets -def simple_grpo_loss( - logits: torch.Tensor, - response: torch.Tensor, - ref_logprobs: torch.Tensor, - advantages: torch.Tensor, - padding_mask: torch.Tensor, - beta: float = 0.1, -) -> torch.Tensor: - logprobs = compute_logprobs(logits, response) - kl = torch.exp(ref_logprobs - logprobs) - (ref_logprobs - logprobs) - 1 - per_token_policy_loss = torch.exp(logprobs - logprobs.detach()) * advantages - per_token_loss = -(per_token_policy_loss - beta * kl) - loss = ( - ((per_token_loss * padding_mask).sum(dim=1)) - / (padding_mask.sum(dim=1).clamp(min=1.0)) - ).mean() - return loss - - def compute_logprobs( logits: Tensor, input_ids: torch.Tensor, temperature: float = 1.0 ) -> Tensor: @@ -154,10 +135,10 @@ def simple_grpo_loss( async def run(cfg: DictConfig): - trainer = await RLTrainer.options(procs=1, with_gpus=True).as_actor( + trainer = await RLTrainer.options(**cfg.actors.trainer).as_actor( **cfg.trainer, loss=simple_grpo_loss ) - replay_buffer = await ReplayBuffer.options(procs=1, num_replicas=1).as_actor( + replay_buffer = await ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor( **cfg.replay_buffer, collate=collate ) From 
fca81566af5394f3d4a1725fb2d5079a9dbb921d Mon Sep 17 00:00:00 2001 From: DNXie Date: Thu, 25 Sep 2025 12:31:01 -0700 Subject: [PATCH 07/13] fix toy_rl, drop rl --- apps/rl/__init__.py | 5 - apps/rl/llama3_8b.yaml | 72 --------------- apps/rl/main.py | 182 ------------------------------------- apps/toy_rl/sumdigits.yaml | 2 + src/forge/actors/policy.py | 4 +- 5 files changed, 5 insertions(+), 260 deletions(-) delete mode 100644 apps/rl/__init__.py delete mode 100644 apps/rl/llama3_8b.yaml delete mode 100644 apps/rl/main.py diff --git a/apps/rl/__init__.py b/apps/rl/__init__.py deleted file mode 100644 index 2e41cd717..000000000 --- a/apps/rl/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. diff --git a/apps/rl/llama3_8b.yaml b/apps/rl/llama3_8b.yaml deleted file mode 100644 index b1aa4b8e4..000000000 --- a/apps/rl/llama3_8b.yaml +++ /dev/null @@ -1,72 +0,0 @@ -# Config for GRPO finetuning using a Llama3.1 8B Instruct model -# -# This config assumes that you've run the following command before launching -# this run: -# export HF_HUB_DISABLE_XET=1 -# uv run forge download meta-llama/Meta-Llama-3.1-8B-Instruct - - -trainer: - comm: - trace_buf_size: 0 - - model: - name: llama3 - flavor: 8B - hf_assets_path: /tmp/Meta-Llama-3.1-8B-Instruct - - optimizer: - name: AdamW - lr: 1e-5 - eps: 1e-8 - - lr_scheduler: - warmup_steps: 1 - - training: - local_batch_size: 1 - seq_len: 2048 - max_norm: 1.0 - steps: 5 - dataset: "c4" - dtype: bfloat16 - - compile: - enable: false - - parallelism: - data_parallel_replicate_degree: 1 - data_parallel_shard_degree: -1 - tensor_parallel_degree: 1 - pipeline_parallel_degree: 1 - context_parallel_degree: 1 - expert_parallel_degree: 1 - disable_loss_parallel: false - - checkpoint: - enable: true - folder: /tmp/Meta-Llama-3.1-8B-Instruct/saved_checkpoints - initial_load_path: /tmp/Meta-Llama-3.1-8B-Instruct/ - initial_load_in_hf: true - last_save_in_hf: true - interval: 500 - async_mode: "disabled" - - activation_checkpoint: - mode: selective - selective_ac_option: op - -replay_buffer: - batch_size: 4 - max_policy_age: 2 - seed: None - dp_size: 2 - - -actors: - trainer: - procs: 2 - with_gpus: true - replay_buffer: - procs: 1 - with_gpus: false diff --git a/apps/rl/main.py b/apps/rl/main.py deleted file mode 100644 index 3a361535c..000000000 --- a/apps/rl/main.py +++ /dev/null @@ -1,182 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. -"""A working example showcasing a practical example of forge with RL. 
- -Run this with: - python -m apps.rl.main --config apps/rl/llama3_8b.yaml - -""" - -import asyncio -import logging -import sys -from dataclasses import dataclass -from typing import Any - -import torch -import torch.nn.functional as F -from forge.actors import ReplayBuffer, RLTrainer -from forge.cli.config import parse - -from omegaconf import DictConfig -from torch import Tensor - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - - -@dataclass -class Episode: - # TODO: add adtional layer for multi-turn - episode_id: str - request: str - policy_version: int - pad_id: int - request_len: int - response_len: int - target: Any | None = None - # processed data - response: str | None = None - request_tokens: list[int] | None = None - response_tokens: list[int] | None = None - ref_logprobs: Tensor | None = None - reward: float | None = None - advantage: float | None = None - - @property - def request_tensor(self): - tensor = torch.tensor(self.request_tokens, dtype=torch.long) - if tensor.shape[0] < self.request_len: # left pad - diff = self.request_len - tensor.shape[0] - tensor = F.pad(tensor, (diff, 0), value=self.pad_id) - return tensor - - @property - def response_tensor(self): - tensor = torch.tensor(self.response_tokens, dtype=torch.long) - if tensor.shape[0] < self.response_len: # right pad - diff = self.response_len - tensor.shape[0] - tensor = F.pad(tensor, (0, diff), value=self.pad_id) - return tensor - - -def collate(batches: list[list[Episode]]): - inputs = [] - targets = [] - for batch in batches: - request = [e.request_tensor for e in batch] - request = torch.stack(request) # [b x s] - - response = [e.response_tensor for e in batch] - response = torch.stack(response) # [b x s] - - ref_logprobs = [e.ref_logprobs for e in batch] - ref_logprobs = torch.stack(ref_logprobs).squeeze() # [b x s] - - advantages = [e.advantage for e in batch] - advantages = torch.tensor(advantages).unsqueeze(-1) # [b x 1] - - pad_id = batch[0].pad_id - mask = response != pad_id - - input = {"tokens": torch.cat([request, response], dim=1)} - target = { - "response": response, - "ref_logprobs": ref_logprobs, - "advantages": advantages, - "padding_mask": mask, - } - inputs.append(input) - targets.append(target) - return inputs, targets - - -def compute_logprobs( - logits: Tensor, input_ids: torch.Tensor, temperature: float = 1.0 -) -> Tensor: - context_length = logits.shape[1] - input_ids.shape[1] - - # Truncate request logits and drop last - logits = logits[:, context_length - 1 : -1] - - # Compute logprobs - logprobs = torch.log_softmax(logits / temperature, dim=-1) - logprobs = torch.gather(logprobs, 2, input_ids.unsqueeze(-1)).squeeze(-1) - - return logprobs - - -def simple_grpo_loss( - logits: Tensor, - response: Tensor, - ref_logprobs: Tensor, - advantages: Tensor, - padding_mask: Tensor, - beta: float = 0.1, -): - """Simplified GRPO Loss for simplified single step updates - Copied from https://github.com/pytorch/torchtune/blob/main/torchtune/dev/grpo/loss.py. 
- """ - logprobs = compute_logprobs(logits, response) - per_token_kl = ( - torch.exp(ref_logprobs.detach() - logprobs) - - (ref_logprobs.detach() - logprobs) - - 1 - ) - per_token_policy_loss = torch.exp(logprobs - logprobs.detach()) * advantages - per_token_loss = -(per_token_policy_loss - beta * per_token_kl) - loss = ( - (per_token_loss * padding_mask).sum(dim=1) / (padding_mask.sum(dim=1) + 1e-8) - ).mean() - return loss - - -async def run(cfg: DictConfig): - trainer = await RLTrainer.options(**cfg.actors.trainer).as_actor( - **cfg.trainer, loss=simple_grpo_loss - ) - replay_buffer = await ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor( - **cfg.replay_buffer, collate=collate - ) - - print("Services initialized....") - - print("Collecting Data...") - g = torch.manual_seed(0) - global_batch_size = cfg.replay_buffer.batch_size * cfg.replay_buffer.dp_size - for i in range(global_batch_size): - req_len, res_len = torch.randint(64, 256, (2,), generator=g).tolist() - e = Episode( - episode_id=i, - request="", - policy_version=0, - pad_id=0, - request_len=256, - response_len=256, - request_tokens=torch.randint(64_000, (req_len,), generator=g).tolist(), - response_tokens=torch.randint(64_000, (res_len,), generator=g).tolist(), - ref_logprobs=torch.randn((256,), generator=g), - advantage=torch.randn((1,), generator=g), - ) - await replay_buffer.add.choose(e) - - print("Train step...") - inputs, targets = await replay_buffer.sample.choose(curr_policy_version=0) - outputs = await trainer.train_step.choose(inputs, targets) - print("Loss: ", outputs["loss"]) - - print("Shutting down...") - await trainer.shutdown() - await replay_buffer.shutdown() - - -@parse -def recipe_main(cfg: DictConfig) -> None: - asyncio.run(run(cfg)) - - -if __name__ == "__main__": - sys.exit(recipe_main()) diff --git a/apps/toy_rl/sumdigits.yaml b/apps/toy_rl/sumdigits.yaml index 8e2392504..7171e3263 100644 --- a/apps/toy_rl/sumdigits.yaml +++ b/apps/toy_rl/sumdigits.yaml @@ -13,6 +13,7 @@ dataset: # Policy configuration policy: + use_dcp: false engine_config: model: ${model} tensor_parallel_size: 1 @@ -24,6 +25,7 @@ policy: temperature: 1.0 top_p: 1.0 + # Trainer configuration trainer: model_name: ${model} diff --git a/src/forge/actors/policy.py b/src/forge/actors/policy.py index a281fb414..0fd68cfde 100644 --- a/src/forge/actors/policy.py +++ b/src/forge/actors/policy.py @@ -128,6 +128,7 @@ class Policy(PolicyInterface): engine_config: EngineConfig | Mapping = field(default_factory=EngineConfig) sampling_config: SamplingConfig | Mapping = field(default_factory=SamplingConfig) available_devices: str | None = None + use_dcp: bool = True # Gets set up by setup sampling_params: SamplingParams | None = None lora_request: LoRARequest | None = None @@ -153,6 +154,7 @@ async def launch( # pyright: ignore[reportIncompatibleMethodOverride] engine_config: EngineConfig | Mapping = EngineConfig(), sampling_config: SamplingConfig | Mapping = SamplingConfig(), available_devices: str | None = None, + use_dcp: bool = True, **kwargs, ) -> "Policy": # Note - get_proc_mesh will set MASTER_ADDR, MASTER_PORT and CUDA_VISIBLE_DEVICES @@ -182,7 +184,7 @@ async def launch( # pyright: ignore[reportIncompatibleMethodOverride] vllm_config = engine_config.create_vllm_config() workers = await worker_procs.spawn( - "vllm_worker", PolicyWorker, vllm_config=vllm_config + "vllm_worker", PolicyWorker, vllm_config=vllm_config, use_dcp=use_dcp ) if isinstance(sampling_config, Mapping): From 5aa40292fa6d19c17154a50909a16e5fbd3c352a Mon Sep 17 
00:00:00 2001 From: DNXie Date: Thu, 25 Sep 2025 21:54:47 -0700 Subject: [PATCH 08/13] only trainer and rb are actors --- apps/grpo/main.py | 22 +++++++++++----------- apps/grpo/qwen3_1_7b.yaml | 20 ++++++++++++-------- apps/grpo/qwen3_8b.yaml | 21 +++++++++++++-------- apps/grpo/qwen3_multinode.yaml | 22 +++++++++++++--------- apps/toy_rl/sumdigits.py | 20 ++++++++++---------- apps/toy_rl/sumdigits.yaml | 19 +++++++++++-------- 6 files changed, 70 insertions(+), 54 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 5e916a095..a599451f9 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -259,7 +259,7 @@ async def main(cfg: DictConfig): ref_model, reward_actor, ) = await asyncio.gather( - DatasetActor.options(**cfg.actors.dataset).as_actor(**cfg.dataset), + DatasetActor.options(**cfg.services.dataset).as_service(**cfg.dataset), Policy.options(**cfg.services.policy).as_service(**cfg.policy), RLTrainer.options(**cfg.actors.trainer).as_actor( **cfg.trainer, loss=simple_grpo_loss @@ -267,9 +267,9 @@ async def main(cfg: DictConfig): ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor( **cfg.replay_buffer, collate=collate ), - ComputeAdvantages.options(**cfg.actors.compute_advantages).as_actor(), - ReferenceModel.options(**cfg.actors.ref_model).as_actor(**cfg.ref_model), - RewardActor.options(**cfg.actors.reward_actor).as_actor( + ComputeAdvantages.options(**cfg.services.compute_advantages).as_service(), + ReferenceModel.options(**cfg.services.ref_model).as_service(**cfg.ref_model), + RewardActor.options(**cfg.services.reward_actor).as_service( reward_functions=[MathReward(), ThinkingReward()] ), ) @@ -326,14 +326,14 @@ async def continuous_rollouts(): advantages = await compute_advantages.compute.route(group) for episode, advantage in zip(group.episodes, advantages): episode.advantage = advantage - await replay_buffer.add.route(episode) + await replay_buffer.add.choose(episode) # Log metrics avg_response_len = ( sum(len(e.response_tokens) for e in group.episodes) / group_size ) mlogger.log("avg_response_len/rollout", avg_response_len, rollout_count) - buffer_size = await replay_buffer._numel.route() + buffer_size = await replay_buffer._numel.choose() mlogger.log("buffer_size/rollout", buffer_size, rollout_count) avg_reward = sum(e.reward for e in group.episodes) / group_size mlogger.log("avg_reward/rollout", avg_reward, rollout_count) @@ -343,15 +343,15 @@ async def continuous_rollouts(): async def continuous_training(): training_step = 0 while True: - batch = await replay_buffer.sample.route(curr_policy_version=training_step) + batch = await replay_buffer.sample.choose(curr_policy_version=training_step) if batch is None: await asyncio.sleep(0.1) else: inputs, targets = batch - loss = await trainer.train_step.route(inputs, targets) + loss = await trainer.train_step.choose(inputs, targets) training_step += 1 mlogger.log("loss/training_step", loss, training_step) - await trainer.push_weights.fanout(training_step) + await trainer.push_weights.call(training_step) await policy.update_weights.fanout(training_step) print("Starting GRPO training loops...") @@ -370,8 +370,8 @@ async def continuous_training(): await asyncio.gather( dataloader.shutdown(), policy.shutdown(), - trainer.shutdown(), - replay_buffer.shutdown(), + RLTrainer.shutdown(trainer), + ReplayBuffer.shutdown(replay_buffer), compute_advantages.shutdown(), ref_model.shutdown(), reward_actor.shutdown(), diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml index 2eb2414c6..3391730af 100644 --- 
a/apps/grpo/qwen3_1_7b.yaml +++ b/apps/grpo/qwen3_1_7b.yaml @@ -104,23 +104,27 @@ services: procs: ${policy.engine_config.tensor_parallel_size} num_replicas: 1 with_gpus: true - -actors: dataset: procs: 1 - with_gpus: false - trainer: - procs: 1 - with_gpus: true - replay_buffer: - procs: 1 + num_replicas: 1 with_gpus: false ref_model: procs: 1 + num_replicas: 1 with_gpus: true compute_advantages: procs: 1 + num_replicas: 1 with_gpus: false reward_actor: + procs: 1 + num_replicas: 1 + with_gpus: false + +actors: + trainer: + procs: 1 + with_gpus: true + replay_buffer: procs: 1 with_gpus: false diff --git a/apps/grpo/qwen3_8b.yaml b/apps/grpo/qwen3_8b.yaml index bf74bb619..4cfbf129d 100644 --- a/apps/grpo/qwen3_8b.yaml +++ b/apps/grpo/qwen3_8b.yaml @@ -105,23 +105,28 @@ services: procs: ${policy.engine_config.tensor_parallel_size} num_replicas: 1 with_gpus: true - -actors: dataset: procs: 1 - with_gpus: false - trainer: - procs: 2 - with_gpus: true - replay_buffer: - procs: 1 + num_replicas: 1 with_gpus: false ref_model: procs: 1 + num_replicas: 1 with_gpus: true compute_advantages: procs: 1 + num_replicas: 1 with_gpus: false reward_actor: + procs: 1 + num_replicas: 1 + with_gpus: false + + +actors: + trainer: + procs: 2 + with_gpus: true + replay_buffer: procs: 1 with_gpus: false diff --git a/apps/grpo/qwen3_multinode.yaml b/apps/grpo/qwen3_multinode.yaml index 463a1ee58..e8cde6a62 100644 --- a/apps/grpo/qwen3_multinode.yaml +++ b/apps/grpo/qwen3_multinode.yaml @@ -51,24 +51,28 @@ services: hosts: 1 num_replicas: 1 with_gpus: true - -actors: dataset: procs: 1 - with_gpus: false - trainer: - procs: 1 - hosts: 1 - with_gpus: true - replay_buffer: - procs: 1 + num_replicas: 1 with_gpus: false compute_advantages: procs: 1 + num_replicas: 1 with_gpus: false ref_model: procs: 1 + num_replicas: 1 with_gpus: true reward_actor: + procs: 1 + num_replicas: 1 + with_gpus: false + +actors: + trainer: + procs: 1 + hosts: 1 + with_gpus: true + replay_buffer: procs: 1 with_gpus: false diff --git a/apps/toy_rl/sumdigits.py b/apps/toy_rl/sumdigits.py index 6600383f7..ae59880e2 100644 --- a/apps/toy_rl/sumdigits.py +++ b/apps/toy_rl/sumdigits.py @@ -449,12 +449,12 @@ async def main(cfg: DictConfig): reward_actor, ref_model, ) = await asyncio.gather( - DatasetActor.options(**cfg.actors.dataset).as_actor(**cfg.dataset), + DatasetActor.options(**cfg.services.dataset).as_service(**cfg.dataset), Policy.options(**cfg.services.policy).as_service(**cfg.policy), Trainer.options(**cfg.actors.trainer).as_actor(**cfg.trainer), ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor(**cfg.replay_buffer), - RewardActor.options(**cfg.actors.reward_actor).as_actor(), - RefModel.options(**cfg.actors.ref_model).as_actor(**cfg.ref_model), + RewardActor.options(**cfg.services.reward_actor).as_service(), + RefModel.options(**cfg.services.ref_model).as_service(**cfg.ref_model), ) print("All services initialized successfully!") @@ -495,7 +495,7 @@ async def continuous_rollouts(): ) episode.advantage = episode.reward # simple case for now for episode in group.episodes: - await replay_buffer.add.route(episode) + await replay_buffer.add.choose(episode) avg_response_len = ( sum(len(e.response_tokens) for e in group.episodes) / group_size ) @@ -508,18 +508,18 @@ async def continuous_rollouts(): async def continuous_training(): training_step = 0 while True: - batch = await replay_buffer.sample.route(curr_policy_version=training_step) + batch = await replay_buffer.sample.choose(curr_policy_version=training_step) if batch is None: await 
asyncio.sleep(0.1) else: - loss = await trainer.train_step.route(batch[0]) + loss = await trainer.train_step.choose(batch[0]) training_step += 1 mlogger.log("loss/training_step", loss, training_step) print(f"loss/training_step: {loss} at training step {training_step}") - await trainer.push_weights.fanout(training_step) + await trainer.push_weights.call(training_step) await policy.update_weights.fanout(training_step) # NOTE: hard-coded to be on-policy for faster convergence - await replay_buffer.clear.fanout() + await replay_buffer.clear.call() print("Starting training loop.") # TODO: Start multiple rollouts once all serivces support it @@ -537,8 +537,8 @@ async def continuous_training(): await asyncio.gather( dataloader.shutdown(), policy.shutdown(), - trainer.shutdown(), - replay_buffer.shutdown(), + Trainer.shutdown(trainer), + ReplayBuffer.shutdown(replay_buffer), reward_actor.shutdown(), ) # TODO - add a global shutdown that implicitly shuts down all services diff --git a/apps/toy_rl/sumdigits.yaml b/apps/toy_rl/sumdigits.yaml index 7171e3263..6e6dba6f9 100644 --- a/apps/toy_rl/sumdigits.yaml +++ b/apps/toy_rl/sumdigits.yaml @@ -46,20 +46,23 @@ services: procs: 1 num_replicas: 1 with_gpus: true - -actors: dataset: procs: 1 - with_gpus: false - trainer: - procs: 1 - with_gpus: true - replay_buffer: - procs: 1 + num_replicas: 1 with_gpus: false reward_actor: procs: 1 + num_replicas: 1 with_gpus: false ref_model: procs: 1 + num_replicas: 1 with_gpus: true + +actors: + trainer: + procs: 1 + with_gpus: true + replay_buffer: + procs: 1 + with_gpus: false From ff7b43555e30f9c8da820a01c0c6836bb5d28846 Mon Sep 17 00:00:00 2001 From: DNXie Date: Thu, 25 Sep 2025 21:56:43 -0700 Subject: [PATCH 09/13] for easier comparison --- apps/grpo/qwen3_1_7b.yaml | 8 ++++---- apps/grpo/qwen3_8b.yaml | 8 ++++---- apps/grpo/qwen3_multinode.yaml | 8 ++++---- apps/toy_rl/sumdigits.yaml | 8 ++++---- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml index 3391730af..ace750e85 100644 --- a/apps/grpo/qwen3_1_7b.yaml +++ b/apps/grpo/qwen3_1_7b.yaml @@ -100,14 +100,14 @@ ref_model: # All resource allocations services: - policy: - procs: ${policy.engine_config.tensor_parallel_size} - num_replicas: 1 - with_gpus: true dataset: procs: 1 num_replicas: 1 with_gpus: false + policy: + procs: ${policy.engine_config.tensor_parallel_size} + num_replicas: 1 + with_gpus: true ref_model: procs: 1 num_replicas: 1 diff --git a/apps/grpo/qwen3_8b.yaml b/apps/grpo/qwen3_8b.yaml index 4cfbf129d..d65b8e69c 100644 --- a/apps/grpo/qwen3_8b.yaml +++ b/apps/grpo/qwen3_8b.yaml @@ -101,14 +101,14 @@ ref_model: # All resource allocations services: - policy: - procs: ${policy.engine_config.tensor_parallel_size} - num_replicas: 1 - with_gpus: true dataset: procs: 1 num_replicas: 1 with_gpus: false + policy: + procs: ${policy.engine_config.tensor_parallel_size} + num_replicas: 1 + with_gpus: true ref_model: procs: 1 num_replicas: 1 diff --git a/apps/grpo/qwen3_multinode.yaml b/apps/grpo/qwen3_multinode.yaml index e8cde6a62..6c63eff8d 100644 --- a/apps/grpo/qwen3_multinode.yaml +++ b/apps/grpo/qwen3_multinode.yaml @@ -46,15 +46,15 @@ ref_model: model_name: ${model} services: + dataset: + procs: 1 + num_replicas: 1 + with_gpus: false policy: procs: 1 hosts: 1 num_replicas: 1 with_gpus: true - dataset: - procs: 1 - num_replicas: 1 - with_gpus: false compute_advantages: procs: 1 num_replicas: 1 diff --git a/apps/toy_rl/sumdigits.yaml b/apps/toy_rl/sumdigits.yaml index 
6e6dba6f9..c07f88d37 100644 --- a/apps/toy_rl/sumdigits.yaml +++ b/apps/toy_rl/sumdigits.yaml @@ -42,14 +42,14 @@ replay_buffer: dp_size: 1 services: - policy: - procs: 1 - num_replicas: 1 - with_gpus: true dataset: procs: 1 num_replicas: 1 with_gpus: false + policy: + procs: 1 + num_replicas: 1 + with_gpus: true reward_actor: procs: 1 num_replicas: 1 From 5e521d0ff12f49667a12dc22a59c3f204b9c2cef Mon Sep 17 00:00:00 2001 From: DNXie Date: Fri, 26 Sep 2025 10:22:16 -0700 Subject: [PATCH 10/13] change dataloader etc to actor --- apps/grpo/main.py | 14 +++++++------- apps/grpo/qwen3_1_7b.yaml | 14 ++++++-------- apps/grpo/qwen3_8b.yaml | 15 ++++++--------- apps/grpo/qwen3_multinode.yaml | 14 ++++++-------- apps/toy_rl/sumdigits.py | 8 ++++---- apps/toy_rl/sumdigits.yaml | 7 +++---- 6 files changed, 32 insertions(+), 40 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index a599451f9..74fb0e7c5 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -259,7 +259,7 @@ async def main(cfg: DictConfig): ref_model, reward_actor, ) = await asyncio.gather( - DatasetActor.options(**cfg.services.dataset).as_service(**cfg.dataset), + DatasetActor.options(**cfg.actors.dataset).as_actor(**cfg.dataset), Policy.options(**cfg.services.policy).as_service(**cfg.policy), RLTrainer.options(**cfg.actors.trainer).as_actor( **cfg.trainer, loss=simple_grpo_loss @@ -267,7 +267,7 @@ async def main(cfg: DictConfig): ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor( **cfg.replay_buffer, collate=collate ), - ComputeAdvantages.options(**cfg.services.compute_advantages).as_service(), + ComputeAdvantages.options(**cfg.actors.compute_advantages).as_actor(), ReferenceModel.options(**cfg.services.ref_model).as_service(**cfg.ref_model), RewardActor.options(**cfg.services.reward_actor).as_service( reward_functions=[MathReward(), ThinkingReward()] @@ -278,9 +278,9 @@ async def main(cfg: DictConfig): # ---- Core RL loops ---- # async def continuous_rollouts(): rollout_count = 0 - pad_id = await dataloader.pad_token.route() + pad_id = await dataloader.pad_token.choose() while True: - sample = await dataloader.sample.route() + sample = await dataloader.sample.choose() if sample is None: print("Dataloader is empty, exiting continuous rollout") return @@ -323,7 +323,7 @@ async def continuous_rollouts(): del ref_logits, ref_logprobs, input_ids # Calculate advantages and add to replay buffer - advantages = await compute_advantages.compute.route(group) + advantages = await compute_advantages.compute.choose(group) for episode, advantage in zip(group.episodes, advantages): episode.advantage = advantage await replay_buffer.add.choose(episode) @@ -368,11 +368,11 @@ async def continuous_training(): finally: print("Shutting down...") await asyncio.gather( - dataloader.shutdown(), + DatasetActor.shutdown(dataloader), policy.shutdown(), RLTrainer.shutdown(trainer), ReplayBuffer.shutdown(replay_buffer), - compute_advantages.shutdown(), + ComputeAdvantages.shutdown(compute_advantages), ref_model.shutdown(), reward_actor.shutdown(), ) diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml index ace750e85..56d5f6589 100644 --- a/apps/grpo/qwen3_1_7b.yaml +++ b/apps/grpo/qwen3_1_7b.yaml @@ -100,10 +100,6 @@ ref_model: # All resource allocations services: - dataset: - procs: 1 - num_replicas: 1 - with_gpus: false policy: procs: ${policy.engine_config.tensor_parallel_size} num_replicas: 1 @@ -112,19 +108,21 @@ services: procs: 1 num_replicas: 1 with_gpus: true - compute_advantages: - procs: 1 - num_replicas: 1 - 
with_gpus: false reward_actor: procs: 1 num_replicas: 1 with_gpus: false actors: + dataset: + procs: 1 + with_gpus: false trainer: procs: 1 with_gpus: true replay_buffer: procs: 1 with_gpus: false + compute_advantages: + procs: 1 + with_gpus: false diff --git a/apps/grpo/qwen3_8b.yaml b/apps/grpo/qwen3_8b.yaml index d65b8e69c..2af581d1a 100644 --- a/apps/grpo/qwen3_8b.yaml +++ b/apps/grpo/qwen3_8b.yaml @@ -101,10 +101,6 @@ ref_model: # All resource allocations services: - dataset: - procs: 1 - num_replicas: 1 - with_gpus: false policy: procs: ${policy.engine_config.tensor_parallel_size} num_replicas: 1 @@ -113,20 +109,21 @@ services: procs: 1 num_replicas: 1 with_gpus: true - compute_advantages: - procs: 1 - num_replicas: 1 - with_gpus: false reward_actor: procs: 1 num_replicas: 1 with_gpus: false - actors: + dataset: + procs: 1 + with_gpus: false trainer: procs: 2 with_gpus: true replay_buffer: procs: 1 with_gpus: false + compute_advantages: + procs: 1 + with_gpus: false diff --git a/apps/grpo/qwen3_multinode.yaml b/apps/grpo/qwen3_multinode.yaml index 6c63eff8d..cc0c913cf 100644 --- a/apps/grpo/qwen3_multinode.yaml +++ b/apps/grpo/qwen3_multinode.yaml @@ -46,19 +46,11 @@ ref_model: model_name: ${model} services: - dataset: - procs: 1 - num_replicas: 1 - with_gpus: false policy: procs: 1 hosts: 1 num_replicas: 1 with_gpus: true - compute_advantages: - procs: 1 - num_replicas: 1 - with_gpus: false ref_model: procs: 1 num_replicas: 1 @@ -69,6 +61,12 @@ services: with_gpus: false actors: + dataset: + procs: 1 + with_gpus: false + compute_advantages: + procs: 1 + with_gpus: false trainer: procs: 1 hosts: 1 diff --git a/apps/toy_rl/sumdigits.py b/apps/toy_rl/sumdigits.py index ae59880e2..053dd3bfb 100644 --- a/apps/toy_rl/sumdigits.py +++ b/apps/toy_rl/sumdigits.py @@ -449,7 +449,7 @@ async def main(cfg: DictConfig): reward_actor, ref_model, ) = await asyncio.gather( - DatasetActor.options(**cfg.services.dataset).as_service(**cfg.dataset), + DatasetActor.options(**cfg.actors.dataset).as_actor(**cfg.dataset), Policy.options(**cfg.services.policy).as_service(**cfg.policy), Trainer.options(**cfg.actors.trainer).as_actor(**cfg.trainer), ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor(**cfg.replay_buffer), @@ -462,10 +462,10 @@ async def main(cfg: DictConfig): # ---- Core RL loops ---- # async def continuous_rollouts(): rollout_count = 0 - pad_id = await dataloader.pad_token.route() + pad_id = await dataloader.pad_token.choose() while True: # Pass rollout_count for curriculum learning - sample = await dataloader.sample.route(rollout_count) + sample = await dataloader.sample.choose(rollout_count) if sample is None: print("Dataloader is empty, exiting continuous rollout") return @@ -535,7 +535,7 @@ async def continuous_training(): finally: print("Shutting down...") await asyncio.gather( - dataloader.shutdown(), + DatasetActor.shutdown(dataloader), policy.shutdown(), Trainer.shutdown(trainer), ReplayBuffer.shutdown(replay_buffer), diff --git a/apps/toy_rl/sumdigits.yaml b/apps/toy_rl/sumdigits.yaml index c07f88d37..767bf7f3b 100644 --- a/apps/toy_rl/sumdigits.yaml +++ b/apps/toy_rl/sumdigits.yaml @@ -42,10 +42,6 @@ replay_buffer: dp_size: 1 services: - dataset: - procs: 1 - num_replicas: 1 - with_gpus: false policy: procs: 1 num_replicas: 1 @@ -60,6 +56,9 @@ services: with_gpus: true actors: + dataset: + procs: 1 + with_gpus: false trainer: procs: 1 with_gpus: true From bc95d9c191326c36e8c44d7a10f8925261cf0dfc Mon Sep 17 00:00:00 2001 From: DNXie Date: Fri, 26 Sep 2025 10:28:26 -0700 
Subject: [PATCH 11/13] fix lint --- apps/grpo/main.py | 2 +- apps/toy_rl/sumdigits.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index b32df9355..6fd88ef80 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -359,7 +359,7 @@ async def continuous_training(): training_step += 1 mlogger.log("loss/training_step", loss, training_step) await trainer.push_weights.call( - training_step, vllm_tp_DEPRECATED=policy_tp_size + training_step, vllm_tp_DEPRECATED=policy_tp_size ) await policy.update_weights.fanout(training_step) diff --git a/apps/toy_rl/sumdigits.py b/apps/toy_rl/sumdigits.py index 05d5be3e8..a9342e4e2 100644 --- a/apps/toy_rl/sumdigits.py +++ b/apps/toy_rl/sumdigits.py @@ -523,7 +523,7 @@ async def continuous_training(): mlogger.log("loss/training_step", loss, training_step) print(f"loss/training_step: {loss} at training step {training_step}") await trainer.push_weights.call( - training_step, vllm_tp_DEPRECATED=policy_tp_size + training_step, vllm_tp_DEPRECATED=policy_tp_size ) await policy.update_weights.fanout(training_step) # NOTE: hard-coded to be on-policy for faster convergence From 1d07edecfbe3b13309b0048f536784009bae684a Mon Sep 17 00:00:00 2001 From: DNXie Date: Mon, 29 Sep 2025 11:01:36 -0700 Subject: [PATCH 12/13] use call_one instead of choose --- apps/grpo/main.py | 16 +++++++++------- apps/toy_rl/sumdigits.py | 12 +++++++----- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 6ed3a4e2c..7d942e04b 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -283,9 +283,9 @@ async def main(cfg: DictConfig): # ---- Core RL loops ---- # async def continuous_rollouts(): rollout_count = 0 - pad_id = await dataloader.pad_token.choose() + pad_id = await dataloader.pad_token.call_one() while True: - sample = await dataloader.sample.choose() + sample = await dataloader.sample.call_one() if sample is None: print("Dataloader is empty, exiting continuous rollout") return @@ -332,17 +332,17 @@ async def continuous_rollouts(): del ref_logits, ref_logprobs, input_ids # Calculate advantages and add to replay buffer - advantages = await compute_advantages.compute.choose(group) + advantages = await compute_advantages.compute.call_one(group) for episode, advantage in zip(group.episodes, advantages): episode.advantage = advantage - await replay_buffer.add.choose(episode) + await replay_buffer.add.call_one(episode) # Log metrics avg_response_len = ( sum(len(e.response_tokens) for e in group.episodes) / group_size ) mlogger.log("avg_response_len/rollout", avg_response_len, rollout_count) - buffer_size = await replay_buffer._numel.choose() + buffer_size = await replay_buffer._numel.call_one() mlogger.log("buffer_size/rollout", buffer_size, rollout_count) avg_reward = sum(e.reward for e in group.episodes) / group_size mlogger.log("avg_reward/rollout", avg_reward, rollout_count) @@ -352,12 +352,14 @@ async def continuous_rollouts(): async def continuous_training(): training_step = 0 while True: - batch = await replay_buffer.sample.choose(curr_policy_version=training_step) + batch = await replay_buffer.sample.call_one( + curr_policy_version=training_step + ) if batch is None: await asyncio.sleep(0.1) else: inputs, targets = batch - loss = await trainer.train_step.choose(inputs, targets) + loss = await trainer.train_step.call_one(inputs, targets) training_step += 1 mlogger.log("loss/training_step", loss, training_step) diff --git a/apps/toy_rl/sumdigits.py 
b/apps/toy_rl/sumdigits.py index 204cde0d0..7903ca8bb 100644 --- a/apps/toy_rl/sumdigits.py +++ b/apps/toy_rl/sumdigits.py @@ -494,10 +494,10 @@ async def main(cfg: DictConfig): # ---- Core RL loops ---- # async def continuous_rollouts(): rollout_count = 0 - pad_id = await dataloader.pad_token.choose() + pad_id = await dataloader.pad_token.call_one() while True: # Pass rollout_count for curriculum learning - sample = await dataloader.sample.choose(rollout_count) + sample = await dataloader.sample.call_one(rollout_count) if sample is None: print("Dataloader is empty, exiting continuous rollout") return @@ -529,7 +529,7 @@ async def continuous_rollouts(): ) episode.advantage = episode.reward # simple case for now for episode in group.episodes: - await replay_buffer.add.choose(episode) + await replay_buffer.add.call_one(episode) avg_response_len = ( sum(len(e.response_tokens) for e in group.episodes) / group_size ) @@ -542,11 +542,13 @@ async def continuous_rollouts(): async def continuous_training(): training_step = 0 while True: - batch = await replay_buffer.sample.choose(curr_policy_version=training_step) + batch = await replay_buffer.sample.call_one( + curr_policy_version=training_step + ) if batch is None: await asyncio.sleep(0.1) else: - loss = await trainer.train_step.choose(batch[0]) + loss = await trainer.train_step.call_one(batch[0]) training_step += 1 mlogger.log("loss/training_step", loss, training_step) print(f"loss/training_step: {loss} at training step {training_step}") From de7a47aec3629b5aa89022780a25dd685b6137c7 Mon Sep 17 00:00:00 2001 From: DNXie Date: Mon, 29 Sep 2025 11:25:26 -0700 Subject: [PATCH 13/13] fix toy_rl errors. --- apps/toy_rl/sumdigits-tp.yaml | 23 +++++++++++------------ apps/toy_rl/sumdigits.py | 6 ++---- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/apps/toy_rl/sumdigits-tp.yaml b/apps/toy_rl/sumdigits-tp.yaml index 1f2b30d44..87f58d5ea 100644 --- a/apps/toy_rl/sumdigits-tp.yaml +++ b/apps/toy_rl/sumdigits-tp.yaml @@ -42,22 +42,10 @@ replay_buffer: dp_size: 1 services: - dataset: - procs: 1 - num_replicas: 1 - with_gpus: false policy: procs: 1 num_replicas: 1 with_gpus: true - trainer: - procs: 1 - num_replicas: 1 - with_gpus: true - replay_buffer: - procs: 1 - num_replicas: 1 - with_gpus: false reward_actor: procs: 1 num_replicas: 1 @@ -66,3 +54,14 @@ services: procs: 1 num_replicas: 1 with_gpus: true + +actors: + dataset: + procs: 1 + with_gpus: false + trainer: + procs: 1 + with_gpus: true + replay_buffer: + procs: 1 + with_gpus: false diff --git a/apps/toy_rl/sumdigits.py b/apps/toy_rl/sumdigits.py index 7903ca8bb..57971e9b9 100644 --- a/apps/toy_rl/sumdigits.py +++ b/apps/toy_rl/sumdigits.py @@ -16,9 +16,9 @@ import torch import torch.nn.functional as F import torchstore as ts +from forge.actors._torchstore_utils import get_param_key from forge.actors.policy import Policy from forge.actors.replay_buffer import ReplayBuffer -from forge.actors.torchstore_utils import get_param_key from forge.actors.trainer import _qwen3_hf_to_vllm from forge.cli.config import parse from forge.controller.actor import ForgeActor @@ -552,9 +552,7 @@ async def continuous_training(): training_step += 1 mlogger.log("loss/training_step", loss, training_step) print(f"loss/training_step: {loss} at training step {training_step}") - await trainer.push_weights.call( - training_step, vllm_tp_DEPRECATED=policy_tp_size - ) + await trainer.push_weights.call(training_step) await policy.update_weights.fanout(training_step) # NOTE: hard-coded to be on-policy for faster 
convergence await replay_buffer.clear.call()
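
Taken together, the series lands on the following split: single-instance components (dataset, trainer, replay buffer, advantage computation) are spawned as actors configured under cfg.actors and invoked with call_one()/call(), while replicated components (policy, reference model, reward actor) stay services configured under cfg.services and keep their service-style calls such as policy.update_weights.fanout(step). The condensed sketch below restates that wiring from apps/grpo/main.py; it is illustrative, not a verbatim excerpt: the wire_up() wrapper and the literal version argument 0 are invented for brevity, loop bodies, metric logging, and error handling are omitted, and the actor/service classes are assumed to be the ones defined in or imported by apps/grpo/main.py.

import asyncio

# DatasetActor, RLTrainer, ReplayBuffer, ComputeAdvantages, Policy, ReferenceModel,
# RewardActor, MathReward, ThinkingReward, collate, and simple_grpo_loss are the
# names used in apps/grpo/main.py; they are assumed to be in scope here.
async def wire_up(cfg):
    # Single-instance components: actors, configured under cfg.actors.
    dataloader, trainer, replay_buffer, compute_advantages = await asyncio.gather(
        DatasetActor.options(**cfg.actors.dataset).as_actor(**cfg.dataset),
        RLTrainer.options(**cfg.actors.trainer).as_actor(
            **cfg.trainer, loss=simple_grpo_loss
        ),
        ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor(
            **cfg.replay_buffer, collate=collate
        ),
        ComputeAdvantages.options(**cfg.actors.compute_advantages).as_actor(),
    )

    # Replicated components: services, configured under cfg.services.
    policy, ref_model, reward_actor = await asyncio.gather(
        Policy.options(**cfg.services.policy).as_service(**cfg.policy),
        ReferenceModel.options(**cfg.services.ref_model).as_service(**cfg.ref_model),
        RewardActor.options(**cfg.services.reward_actor).as_service(
            reward_functions=[MathReward(), ThinkingReward()]
        ),
    )

    # Actor endpoints are invoked with call_one()/call(); service endpoints keep
    # their fanned-out calls.
    pad_id = await dataloader.pad_token.call_one()
    batch = await replay_buffer.sample.call_one(curr_policy_version=0)
    await trainer.push_weights.call(0)
    await policy.update_weights.fanout(0)

    # Actors are shut down through the class; services through the handle.
    await asyncio.gather(
        DatasetActor.shutdown(dataloader),
        RLTrainer.shutdown(trainer),
        ReplayBuffer.shutdown(replay_buffer),
        ComputeAdvantages.shutdown(compute_advantages),
        policy.shutdown(),
        ref_model.shutdown(),
        reward_actor.shutdown(),
    )

This mirrors the direction set in patch 08 ("only trainer and rb are actors"), extended to the dataset and advantage computation in patch 10, with patch 12 switching the remaining actor calls from choose() to call_one().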