
refactor: in-process torchrun + DeepSpeed launchers; fix multi-GPU SFT/DPO/PPO spawn #12

Merged

timzsu merged 7 commits into main from zsu/torchrun-inprocess on May 3, 2026

Conversation

@timzsu timzsu (Collaborator) commented May 1, 2026

Purpose

Bandit follow-up (#6). Migrate the SFT / DPO / PPO distributed launchers to in-process torch.distributed.run.main and deepspeed.launcher.runner.main (the canonical entry points the torchrun and deepspeed console scripts themselves invoke). Swap B404 ↔ B603 in the bandit skip list so every subprocess call site has a per-line written rationale instead of a blanket skip. After this PR, no # nosec TODO remains in src/.

Once the launcher swap was in, the multi-GPU spawn path was actually exercised end-to-end for the first time. Three pre-existing bugs surfaced and are fixed in this PR — see the second bullet group under Changes.

Changes

Launcher swap (the original PR scope)

  • src/worker/executors/utils/distributed.py (new) — run_torchrun(), run_deepspeed(), deepspeed_available(). Lazy DeepSpeed import; capability check tolerates the package's CUDA-op probe at init. run_torchrun passes --tee 3 so rank stdout+stderr land on the parent console and in the per-rank elastic log dir; without it the elastic agent reports rank crashes only as an opaque ChildFailedError with error_file: <N/A>. A sketch of both helpers follows this list.
  • sft_executor.py / dpo_executor.py / ppo_executor.py — route the multi-GPU spawn through the new helpers; drop import subprocess (and import shutil from SFT).
  • pyproject.toml — [tool.bandit] skips: add B404, drop B603. Add deepspeed.* to mypy follow_untyped_imports.
  • checkpoints.py — drop the # nosec B404 — TODO on the import (now project-skipped); add per-line # nosec B603 — argv list, no shell=True, absolute path via shutil.which() on the two pigz/tar subprocess.run calls.
  • AGENTS.md — adds B603 rule bullet.
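
For reference, a minimal sketch of what the two launch helpers can look like. The function names match the bullet above, but the signatures and argv flags are illustrative and may differ from the actual src/worker/executors/utils/distributed.py:

```python
from torch.distributed.run import main as torchrun_main  # the torchrun console-script entry point


def run_torchrun(script: str, script_args: list[str], nproc_per_node: int) -> None:
    """Launch `script` across `nproc_per_node` local ranks in-process (no $PATH lookup)."""
    torchrun_main([
        "--standalone",                        # single-node rendezvous
        f"--nproc-per-node={nproc_per_node}",
        "--tee", "3",                          # mirror rank stdout+stderr to the parent console
        script,
        *script_args,
    ])


def run_deepspeed(script: str, script_args: list[str], num_gpus: int) -> None:
    """Launch `script` via DeepSpeed's runner in-process."""
    # Lazy import: DeepSpeed's package init probes CUDA ops and can raise on CUDA-less hosts.
    from deepspeed.launcher.runner import main as deepspeed_main

    deepspeed_main(["--num_gpus", str(num_gpus), script, *script_args])
```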

Multi-GPU path fixes (newly tested e2e)

  • sft_executor.py — SFT spawn was dumping the raw WorkerTaskMessage pydantic model with json.dump(task, fh), raising Object of type WorkerTaskMessage is not JSON serializable on every multi-GPU SFT submission. Match the DPO/PPO line: json.dump(task.model_dump(mode="json", by_alias=True), fh).
  • *_dist_entry.py (all three) — deserializer was task = json.load(fh), then executor.run(task, ...) called task.spec and crashed with 'dict' object has no attribute 'spec'. WorkerTaskMessage serializes through dedup_json in mode="json" and has a model_validator(mode="before") that auto-restores the deduped form on model_validate. Route the loaded JSON through WorkerTaskMessage.model_validate so the dist entry hands executor.run a real envelope. The resulting writer/reader contract is sketched after this list.
  • tests/worker/test_distributed_launcher.py — round-trip test that locks the writer/reader contract; updated argv assertion to include --tee 3.
  • templates/{sft,dpo_training,ppo_training}_llama_1b_multi_gpu.yaml — three smoke templates so the multi-GPU path stays reachable from flowmesh workflow submit. Bring up a single worker with multiple cuda_devices via flowmesh stack worker up --config <yaml> (the up gpu --targets a,b shortcut creates one worker per GPU, not one multi-GPU worker).
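
The writer/reader contract those two fixes establish, sketched in its final (post-review) form. WorkerTaskMessage is the project's pydantic envelope and is assumed imported; the function names here are illustrative:

```python
import json
from pathlib import Path


def write_task_spec(task: "WorkerTaskMessage", spec_path: Path) -> None:
    # Serialize the envelope itself; json.dump(task, fh) on the raw model
    # raises "Object of type WorkerTaskMessage is not JSON serializable".
    spec_path.write_text(task.model_dump_json(by_alias=True))


def read_task_spec(spec_path: Path) -> "WorkerTaskMessage":
    # model_validate runs the model_validator(mode="before") that restores
    # the dedup_json form; a bare json.load dict has no .spec attribute.
    return WorkerTaskMessage.model_validate(json.loads(spec_path.read_text()))
```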

Test Plan

  • CI.
  • Multi-GPU e2e on two H200 NVL GPUs:
    uv run flowmesh stack build flowmesh_worker_gpu
    uv run flowmesh stack worker up --config <multi-gpu-worker.yaml>   # cuda_devices: [<id-a>, <id-b>]
    uv run flowmesh workflow submit templates/sft_llama_1b_multi_gpu.yaml         # run_deepspeed (auto-DS ZeRO-2)
    uv run flowmesh workflow submit templates/dpo_training_llama_1b_multi_gpu.yaml # run_torchrun
    uv run flowmesh workflow submit templates/ppo_training_llama_1b_multi_gpu.yaml # run_torchrun
    
    All three reach the rank training loop. SFT (DeepSpeed) finishes in ~40s on a 16-sample subset; DPO finishes on the inline 5-pair preference set. PPO ranks load both policy and reward models, enter PPOTrainer.train(), and then hang inside accelerate.synchronize_rng_state (see caveat below).

Caveat — PPO hangs upstream

PPO does not finish under run_torchrun with our current trl / accelerate pin: both ranks reach PPOTrainer.train(), the data loader iterator calls accelerate.synchronize_rng_state, and that collective never returns. Reproduced on two different GPU pairs ([0,2] and [0,3]), so it is not an NVLink-topology quirk on this host. Stack from py-spy (both ranks identical):

synchronize_rng_state    (accelerate/utils/random.py:146)
synchronize_rng_states   (accelerate/utils/random.py:156)
__iter__                 (accelerate/data_loader.py:560)
repeat_generator         (trl/trainer/ppo_trainer.py:397)
train                    (trl/trainer/ppo_trainer.py:450)

This is not a regression from this PR: DPO uses the same run_torchrun entry point and finishes cleanly, and SFT (DeepSpeed branch) finishes too. The bug is in TRL's PPOTrainer.repeat_generator interaction with accelerate.synchronize_rng_state and is independent of the launcher swap. PR #12's scope (launcher swap + the three multi-GPU bugs the e2e surfaced) is verified end-to-end; the PPO upstream hang is left as a known issue to be addressed in a follow-up dep bump or trainer patch.


Pre-submission Checklist
  • I have read the contribution guidelines.
  • I have run pre-commit run --all-files and fixed any issues.
  • I have added or updated tests covering my changes (if applicable).
  • I have verified that uv run pytest tests/ passes locally.
  • If I changed shared schemas or proto definitions, I have checked downstream compatibility across Server and Worker. N/A.
  • If I changed the SDK or CLI, I have verified the affected packages work. N/A.
  • If this is a breaking change, I have prefixed the PR title with [BREAKING]. Not breaking.
  • I have updated documentation or config examples if user-facing behavior changed. AGENTS.md gains the B603 rule bullet.

…n.main

The training executors (SFT / DPO / PPO) launched multi-GPU runs by
shelling out through ``subprocess.check_call(["torchrun", ...])``, which
required a top-level ``import subprocess`` (B404, suppressed via
``# nosec``) and an implicit dependency on ``torchrun`` being on $PATH.

Adds ``worker.executors.utils.distributed.run_torchrun`` — a thin wrapper
that calls ``torch.distributed.run.main`` directly, the same entry point
``torchrun`` uses. Worker ranks are still spawned by torch's elastic
agent under the hood, so the runtime semantics are unchanged. The
``PYTHONPATH`` and launcher-flag env mutations are scoped to the call so
reusing an executor instance does not see the launcher flag pre-set.

DPO and PPO now have no subprocess imports at all. SFT still uses
subprocess for the DeepSpeed-CLI branch (handled separately under its
own nosec/TODO); the torchrun branch routes through the new helper.

Closes the torchrun half of the bandit B404 follow-up tracked in #6.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
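
A sketch of the env scoping this commit message describes, assuming a context-manager approach (the actual helper may restore state differently):

```python
import os
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def scoped_env(**overrides: str) -> Iterator[None]:
    """Apply env-var overrides for the duration of a launch, then restore."""
    saved = {key: os.environ.get(key) for key in overrides}
    os.environ.update(overrides)
    try:
        yield
    finally:
        for key, value in saved.items():
            if value is None:
                os.environ.pop(key, None)  # was unset before the call
            else:
                os.environ[key] = value    # restore the prior value
```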
@timzsu timzsu force-pushed the zsu/torchrun-inprocess branch from 58c874b to edadf60 on May 1, 2026 11:32
@timzsu timzsu changed the title from "refactor: launch torchrun in-process via torch.distributed.run.main" to "refactor: launch torchrun + deepspeed in-process; drop pigz/tar shellout" on May 1, 2026
@timzsu timzsu force-pushed the zsu/torchrun-inprocess branch from 345c99f to 3bf8389 on May 1, 2026 11:45
…her.runner

Twin to the in-process torchrun migration. The DeepSpeed branch of SFT's
multi-GPU launcher now calls ``deepspeed.launcher.runner.main`` directly
(the canonical entry point — ``.venv/bin/deepspeed`` is literally
``from deepspeed.launcher.runner import main; main()``). Worker ranks are
still spawned by DeepSpeed's launcher under the hood, so the runtime
semantics are unchanged.

``shutil.which("deepspeed")`` is replaced with ``importlib.util.find_spec``
so the capability check matches the actual import path the in-process
call needs. The check tolerates DeepSpeed's CUDA-op probe at package
init: a CUDA-less host raises ``MissingCUDAException`` during spec
resolution, which we treat as "not available" and fall back to torchrun.

After this commit, ``sft_executor.py`` no longer imports ``subprocess``
or ``shutil``, closing the SFT torchrun + DeepSpeed B404 nosec entirely.
The three remaining B404 sites are gone (``sft_executor``, ``dpo_executor``,
``ppo_executor``); ``checkpoints.archive_model_dir`` still has its
pigz/tar accelerator (a separate PR will install ``pigz`` in the GPU image
and document the B404 rationale instead of dropping the feature).

Adds three launcher unit tests covering ``run_deepspeed`` argv,
env-scoping, and the ``deepspeed_available()`` capability check on
hosts where DeepSpeed's package init fails.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
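
The capability check described above, sketched; the exception handling is deliberately broad for illustration, and the real deepspeed_available() may be narrower:

```python
import importlib.util


def deepspeed_available() -> bool:
    try:
        # Resolving a submodule spec imports the parent package, which runs
        # DeepSpeed's CUDA-op probe at init, the same import path the
        # in-process call actually needs.
        return importlib.util.find_spec("deepspeed.launcher.runner") is not None
    except Exception:
        # e.g. MissingCUDAException on a CUDA-less host (or ModuleNotFoundError
        # when DeepSpeed is absent): treat as unavailable, fall back to torchrun.
        return False
```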
@timzsu timzsu force-pushed the zsu/torchrun-inprocess branch from 3bf8389 to 63c8482 on May 1, 2026 12:35
@timzsu timzsu changed the title from "refactor: launch torchrun + deepspeed in-process; drop pigz/tar shellout" to "refactor: launch torchrun + DeepSpeed CLI in-process" on May 1, 2026
Brings the bandit policy in line with how subprocess shellouts are
actually governed:

- ``B404`` (subprocess module import) joins the project-skip list. The
  blanket import warning has no follow-up action — the actually-dangerous
  patterns are caught by ``B602`` (``shell=True``) and ``B607`` (partial
  executable path), both still enforced.
- ``B603`` (subprocess.* called with argv list) leaves the project-skip
  list. Every call site now needs a per-line ``# nosec B603`` with a
  one-line written rationale, so each shellout is visible at the call
  line rather than hidden behind a blanket skip.

After this commit, the only remaining ``# nosec`` annotations in
``src/`` are the two on the pigz/tar accelerator in
``checkpoints.archive_model_dir``, both with rationale (no TODO):
``argv list, no shell=True, absolute path via shutil.which()``. The
audit no longer carries any "TODO" nosecs — every silenced rule is
either a project-skip with rationale in ``[tool.bandit]`` or a per-line
nosec with rationale at the call site.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
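
For illustration, the shape of a per-line B603 annotation with its rationale; the argv below is made up, and the real checkpoints.py call sites differ:

```python
import shutil
import subprocess

tar = shutil.which("tar")
pigz = shutil.which("pigz")
if tar and pigz:
    subprocess.run(  # nosec B603 — argv list, no shell=True, absolute path via shutil.which()
        [tar, "-I", pigz, "-cf", "checkpoint.tar.gz", "checkpoint_dir"],
        check=True,
    )
```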
@timzsu timzsu changed the title from "refactor: launch torchrun + DeepSpeed CLI in-process" to "refactor: launch torchrun + DeepSpeed in-process; swap B404↔B603 skip" on May 1, 2026
@timzsu timzsu requested a review from kaiitunnz May 1, 2026 13:17
@timzsu timzsu force-pushed the zsu/torchrun-inprocess branch from 24da439 to a3fa6e5 on May 3, 2026 06:04
The pre-existing multi-GPU spawn path was untested ('multi-GPU worker is
not supported officially') and broken in three places that only show up
when the path actually fires.

- SFT spawn dumped the raw `WorkerTaskMessage` pydantic model with
  `json.dump(task, fh)`, which raises 'Object of type WorkerTaskMessage
  is not JSON serializable'. Match the DPO/PPO line:
  `json.dump(task.model_dump(mode='json', by_alias=True), fh)`.

- All three `*_dist_entry.py` modules deserialize the spec back via
  vanilla `json.load(fh)` and pass the resulting dict to
  `executor.run`, which calls `task.spec` and crashes with
  `'dict' object has no attribute 'spec'`. `WorkerTaskMessage`
  serializes through `dedup_json` in `mode='json'` and has a
  `model_validator(mode='before')` that auto-restores the deduped
  form on `model_validate`. Route the loaded JSON through
  `WorkerTaskMessage.model_validate` so the dist entry reaches
  `executor.run` with a real envelope.

- `run_torchrun` did not pass `--tee`/`--redirects`, so when a
  rank crashed the elastic agent reported only an opaque
  `ChildFailedError` with `error_file: <N/A>` and the rank stderr
  was silently lost. Pass `--tee 3` so rank stdout+stderr land on
  the parent console and in the per-rank elastic log dir.

Adds a round-trip test that locks the writer/reader contract and
ships three multi-GPU templates (SFT/DPO/PPO) so the path stays
reachable.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu force-pushed the zsu/torchrun-inprocess branch from a3fa6e5 to 8364221 on May 3, 2026 06:11
@timzsu timzsu changed the title from "refactor: launch torchrun + DeepSpeed in-process; swap B404↔B603 skip" to "refactor(worker): in-process torchrun + DeepSpeed launchers; fix multi-GPU SFT/DPO/PPO spawn" on May 3, 2026
@timzsu timzsu changed the title from "refactor(worker): in-process torchrun + DeepSpeed launchers; fix multi-GPU SFT/DPO/PPO spawn" to "refactor: in-process torchrun + DeepSpeed launchers; fix multi-GPU SFT/DPO/PPO spawn" on May 3, 2026
@kaiitunnz kaiitunnz (Collaborator) left a comment


Minor comments. PTAL.

Comment thread src/worker/executors/utils/distributed.py
Comment thread src/worker/executors/utils/distributed.py Outdated
Comment thread src/worker/executors/utils/distributed.py Outdated
Comment thread src/worker/executors/utils/distributed.py Outdated
Comment thread src/worker/executors/sft_executor.py Outdated
Comment thread templates/dpo_training_llama_1b_multi_gpu.yaml
Comment thread tests/worker/test_distributed_launcher.py Outdated
timzsu added 3 commits May 3, 2026 15:25
…riptions

Replaces the changelog-style narration with standalone documentation per
review feedback (kaiitunnz): the module and function docs now describe
what the helpers do rather than what they replace, and ``deepspeed_available``
no longer references "the previous implementation". Renames ``_REPO_ROOT``
to ``_SRC_DIR`` to match the path it resolves to.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Drops the dict round-trip through ``json.dump(task.model_dump(...))`` in
favour of ``fh.write(task.model_dump_json(by_alias=True))`` per review
suggestion (kaiitunnz). Same on-disk shape, one fewer hop.

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
- Add ``gpu.memory`` minimums to the three multi-GPU smoke templates
  (16Gi for SFT, 24Gi for DPO, 32Gi for PPO) so the scheduler picks a
  worker with enough VRAM for each trainer's working set
  (kaiitunnz on dpo_training_llama_1b_multi_gpu.yaml).
- Extract the test-only launcher env-var placeholder into a named
  constant ``_TEST_LAUNCHER_FLAG`` with a comment noting why the value
  is arbitrary (it just needs to not collide with the runtime
  ``KV_*_DISTRIBUTED`` flags) (kaiitunnz on
  test_distributed_launcher.py).

Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
@timzsu timzsu requested a review from kaiitunnz May 3, 2026 15:38
@kaiitunnz kaiitunnz (Collaborator) left a comment


LGTM.

@timzsu timzsu merged commit f0828e3 into main May 3, 2026
10 checks passed
@timzsu timzsu deleted the zsu/torchrun-inprocess branch May 3, 2026 16:00