From c3e153ee883d87eff0576a3f6b55d8f871c9b4d4 Mon Sep 17 00:00:00 2001
From: DNXie
Date: Wed, 15 Oct 2025 11:34:22 -0700
Subject: [PATCH 1/5] add load step. still testing

---
 apps/grpo/main.py           |  2 +-
 apps/grpo/qwen3_1_7b.yaml   |  5 ++++-
 src/forge/actors/trainer.py | 13 +++++--------
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/apps/grpo/main.py b/apps/grpo/main.py
index 1dbef0b76..badc60e68 100644
--- a/apps/grpo/main.py
+++ b/apps/grpo/main.py
@@ -322,7 +322,7 @@ async def main(cfg: DictConfig):
         DatasetActor.options(**cfg.actors.dataset).as_actor(**cfg.dataset),
         Policy.options(**cfg.services.policy).as_service(**cfg.policy),
         RLTrainer.options(**cfg.actors.trainer).as_actor(
-            **cfg.trainer, loss=simple_grpo_loss
+            **cfg.trainer, loss=simple_grpo_loss, step=cfg.trainer.checkpoint.load_step
         ),
         ReplayBuffer.options(**cfg.actors.replay_buffer).as_actor(
             **cfg.replay_buffer, collate=collate
diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml
index 14e4871cf..1b36f7229 100644
--- a/apps/grpo/qwen3_1_7b.yaml
+++ b/apps/grpo/qwen3_1_7b.yaml
@@ -74,7 +74,9 @@ trainer:
   disable_loss_parallel: true
   checkpoint:
     enable: true
-    initial_load_path: hf://${model}
+    folder: "./checkpoint" # Directory to save or resume checkpoints (default: ./checkpoints)
+    load_step: -1 # Step to load from; -1 means load from initial_load_path. (default: -1)
+    initial_load_path: hf://${model} # Optional: path or HF identifier to load model weights initially, will be ignored if `folder` exists
     initial_load_in_hf: true
     last_save_in_hf: true
     interval: 500
@@ -109,6 +111,7 @@ ref_model:
   expert_parallel_degree: 1
   checkpoint:
     enable: true
+    folder: ""
     initial_load_path: hf://${model}
     initial_load_in_hf: true
 
diff --git a/src/forge/actors/trainer.py b/src/forge/actors/trainer.py
index dd85b3c82..d730b5f3e 100644
--- a/src/forge/actors/trainer.py
+++ b/src/forge/actors/trainer.py
@@ -136,6 +136,7 @@ class RLTrainer(ForgeActor):
         TORCHSTORE_USE_RDMA.get_value() == 0
     )  # torchstore currently only accepts 0 or 1
     dcp_path: str = "forge_dcp_tmp"
+    step: int = 1
 
     def __post_init__(self):
         """Initializes config types and env variables.
@@ -159,8 +160,9 @@ def __post_init__(self):
                 raise TypeError(
                     f"{f.name} should be a {f.type} type or a dict like object"
                 )
-
-        self.step = 1  # fragile contract.
+        self.step = max(
+            self.step, 1
+        )  # start from 1 if not loading from a saved checkpoint
         self.num_training_steps = self.training.steps
         self.gradient_accumulation_steps = 1
         self.rank = current_rank().rank
@@ -186,12 +188,7 @@ def __post_init__(self):
     async def setup(self):
         # TODO: update ForgeEngine to not use ForgeJobConfig
         engine_config = {f.name: getattr(self, f.name) for f in fields(self)}
-        for key in {
-            "loss",
-            "state_dict_key",
-            "use_dcp",
-            "dcp_path",
-        }:
+        for key in {"loss", "state_dict_key", "use_dcp", "dcp_path", "step"}:
             engine_config.pop(key)  # Not part of job config
         self.engine = ForgeEngine(ForgeJobConfig(**engine_config))
         self.engine.checkpointer.load(step=self.step)

From a6aa3730d65abb13322174ea38a4e2a601d47e87 Mon Sep 17 00:00:00 2001
From: DNXie
Date: Wed, 15 Oct 2025 12:29:24 -0700
Subject: [PATCH 2/5] a working version

---
 apps/grpo/main.py         | 11 ++++++++++-
 apps/grpo/qwen3_1_7b.yaml |  8 ++++----
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/apps/grpo/main.py b/apps/grpo/main.py
index badc60e68..c67854283 100644
--- a/apps/grpo/main.py
+++ b/apps/grpo/main.py
@@ -353,6 +353,14 @@ async def main(cfg: DictConfig):
     )
     print("Torchstore successfully initialized with local rank strategy")
 
+    start_version = int(cfg.trainer.checkpoint.load_step or 0)
+    if start_version > 0:
+        # Ensure the trainer’s loaded checkpoint is materialized in torchstore at `start_version`
+        await trainer.push_weights.call(start_version)
+
+        # Warm the policy to that exact version so new rollouts carry generator_version == start_version
+        await policy.update_weights.fanout(start_version)
+
     # ---- Core RL loops ---- #
     async def continuous_rollouts():
         rollout_count = 0
@@ -420,12 +428,13 @@ async def continuous_rollouts():
            t.stop()
 
     async def continuous_training():
-        training_step = 0
+        training_step = max(cfg.trainer.checkpoint.load_step, 0)
         restart_tracer = True  # Flag to control when to restart tracer
 
         while max_steps == -1 or training_step < max_steps:
            # Restart tracer when needed (initial start or after completing a training step)
            # Otherwise, we cannot measure time waiting for buffer
+           print(f"[DEBUG], training_step: {training_step}")
            if restart_tracer:
                t = Tracer("main_perf/continuous_training")
                t.start()
diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml
index 1b36f7229..5576f6301 100644
--- a/apps/grpo/qwen3_1_7b.yaml
+++ b/apps/grpo/qwen3_1_7b.yaml
@@ -74,12 +74,12 @@ trainer:
   disable_loss_parallel: true
   checkpoint:
     enable: true
-    folder: "./checkpoint" # Directory to save or resume checkpoints (default: ./checkpoints)
-    load_step: -1 # Step to load from; -1 means load from initial_load_path. (default: -1)
+    folder: "/home/dxie/tmp/checkpoint" # Directory to save or resume checkpoints (default: ./checkpoints)
+    load_step: 200 # Step to load from; cannot be hf ckpt; -1 means load from initial_load_path. (default: -1)
     initial_load_path: hf://${model} # Optional: path or HF identifier to load model weights initially, will be ignored if `folder` exists
-    initial_load_in_hf: true
+    initial_load_in_hf: true # If true, interpret initial_load_path as a HuggingFace model repo
     last_save_in_hf: true
-    interval: 500
+    interval: 2
     async_mode: "disabled"
  activation_checkpoint:
    mode: selective

From 863f3794c6580804eb6ceefc256a58180ab54bcf Mon Sep 17 00:00:00 2001
From: DNXie
Date: Wed, 15 Oct 2025 14:22:24 -0700
Subject: [PATCH 3/5] clean up

---
 apps/grpo/main.py         | 4 ++--
 apps/grpo/qwen3_1_7b.yaml | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/apps/grpo/main.py b/apps/grpo/main.py
index c67854283..52f6e65dc 100644
--- a/apps/grpo/main.py
+++ b/apps/grpo/main.py
@@ -355,7 +355,7 @@ async def main(cfg: DictConfig):
     start_version = int(cfg.trainer.checkpoint.load_step or 0)
     if start_version > 0:
-        # Ensure the trainer’s loaded checkpoint is materialized in torchstore at `start_version`
+        # Ensure the trainer’s loaded checkpoint is pushed to torchstore at `start_version`
         await trainer.push_weights.call(start_version)
 
         # Warm the policy to that exact version so new rollouts carry generator_version == start_version
         await policy.update_weights.fanout(start_version)
@@ -428,7 +428,7 @@ async def continuous_rollouts():
            t.stop()
 
     async def continuous_training():
-        training_step = max(cfg.trainer.checkpoint.load_step, 0)
+        training_step = start_version
         restart_tracer = True  # Flag to control when to restart tracer
 
         while max_steps == -1 or training_step < max_steps:
diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml
index 5576f6301..926676d7f 100644
--- a/apps/grpo/qwen3_1_7b.yaml
+++ b/apps/grpo/qwen3_1_7b.yaml
@@ -74,12 +74,12 @@ trainer:
   disable_loss_parallel: true
   checkpoint:
     enable: true
-    folder: "/home/dxie/tmp/checkpoint" # Directory to save or resume checkpoints (default: ./checkpoints)
-    load_step: 200 # Step to load from; cannot be hf ckpt; -1 means load from initial_load_path. (default: -1)
+    folder: ./checkpoint # Directory to save or resume checkpoints (default: ./checkpoints)
+    load_step: -1 # Step to load from; cannot be hf ckpt; -1 means load from initial_load_path. (default: -1)
     initial_load_path: hf://${model} # Optional: path or HF identifier to load model weights initially, will be ignored if `folder` exists
     initial_load_in_hf: true # If true, interpret initial_load_path as a HuggingFace model repo
     last_save_in_hf: true
-    interval: 2
+    interval: 500
     async_mode: "disabled"
  activation_checkpoint:
    mode: selective

From 03b582392489125d4c1d61035c6f8c30167c4208 Mon Sep 17 00:00:00 2001
From: DNXie
Date: Wed, 15 Oct 2025 14:31:15 -0700
Subject: [PATCH 4/5] cleanup

---
 apps/grpo/main.py         | 3 +--
 apps/grpo/qwen3_1_7b.yaml | 1 -
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/apps/grpo/main.py b/apps/grpo/main.py
index 52f6e65dc..7aeaaec15 100644
--- a/apps/grpo/main.py
+++ b/apps/grpo/main.py
@@ -353,7 +353,7 @@ async def main(cfg: DictConfig):
     )
     print("Torchstore successfully initialized with local rank strategy")
 
-    start_version = int(cfg.trainer.checkpoint.load_step or 0)
+    start_version = max(cfg.trainer.checkpoint.load_step or 0)
     if start_version > 0:
         # Ensure the trainer’s loaded checkpoint is pushed to torchstore at `start_version`
         await trainer.push_weights.call(start_version)
@@ -434,7 +434,6 @@ async def continuous_training():
         while max_steps == -1 or training_step < max_steps:
            # Restart tracer when needed (initial start or after completing a training step)
            # Otherwise, we cannot measure time waiting for buffer
-           print(f"[DEBUG], training_step: {training_step}")
            if restart_tracer:
                t = Tracer("main_perf/continuous_training")
                t.start()
diff --git a/apps/grpo/qwen3_1_7b.yaml b/apps/grpo/qwen3_1_7b.yaml
index 926676d7f..748225510 100644
--- a/apps/grpo/qwen3_1_7b.yaml
+++ b/apps/grpo/qwen3_1_7b.yaml
@@ -111,7 +111,6 @@ ref_model:
   expert_parallel_degree: 1
   checkpoint:
     enable: true
-    folder: ""
     initial_load_path: hf://${model}
     initial_load_in_hf: true
 

From 1eeabe04ac499bdf97c07d6475f452df2f5dd76a Mon Sep 17 00:00:00 2001
From: DNXie
Date: Wed, 15 Oct 2025 14:42:37 -0700
Subject: [PATCH 5/5] typo

---
 apps/grpo/main.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/grpo/main.py b/apps/grpo/main.py
index 7aeaaec15..d5208c4f8 100644
--- a/apps/grpo/main.py
+++ b/apps/grpo/main.py
@@ -353,7 +353,7 @@ async def main(cfg: DictConfig):
     )
     print("Torchstore successfully initialized with local rank strategy")
 
-    start_version = max(cfg.trainer.checkpoint.load_step or 0)
+    start_version = max(cfg.trainer.checkpoint.load_step, 0)
     if start_version > 0:
         # Ensure the trainer’s loaded checkpoint is pushed to torchstore at `start_version`
         await trainer.push_weights.call(start_version)
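
For reference, a minimal sketch of the resume flow these five patches wire up, assuming the actor handles (`trainer`, `policy`) and config layout (`cfg.trainer.checkpoint.load_step`) shown in the diffs above; the helper name `resume_from_checkpoint` is illustrative and not part of the series, and the body mirrors the final form after PATCH 5/5:

    async def resume_from_checkpoint(cfg, trainer, policy) -> int:
        # load_step <= 0 keeps the original behavior: weights come from
        # initial_load_path and training counts from the start.
        start_version = max(cfg.trainer.checkpoint.load_step, 0)
        if start_version > 0:
            # Push the restored trainer weights into torchstore at that version...
            await trainer.push_weights.call(start_version)
            # ...and warm the policy so new rollouts report
            # generator_version == start_version.
            await policy.update_weights.fanout(start_version)
        # continuous_training then begins counting from start_version
        # instead of 0.
        return start_version

To resume, point `checkpoint.folder` at an existing run directory and set `load_step` to a saved step (for example 200, as in PATCH 2/5); leaving it at -1 loads from `initial_load_path` as before.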