Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions mlpstorage_py/benchmarks/dlio.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,26 @@ def _raise_unsupported_workload(self, workload_abs):
_OBJECT_URI_SCHEMES = frozenset({'s3', 's3a', 'az', 'gs'})
_LOCAL_URI_SCHEMES = frozenset({'file', 'direct'})

def _is_object_storage(self) -> bool:
    """Return True when ``storage.storage_type`` resolves to an object backend.

    Reads the same signal as ``_check_storage_scheme_consistency`` —
    what we actually told DLIO to use after ``_apply_object_storage_params``
    has run — rather than the ``data_access_protocol`` CLI positional
    (``file|object``). That makes the check robust to the
    ``--params storage.storage_type=s3`` path where an advanced user
    wires up object storage manually under the ``file`` positional
    instead of using the ``object`` positional. ``direct_fs`` (the
    ``--o-direct`` mode) is NOT object storage — it still resolves
    to a local path and statvfs works.

    Lookup order (first truthy value wins):

    1. ``self.params_dict['storage.storage_type']``
    2. ``self.combined_params['storage']['storage_type']``
    3. the literal ``'local'``

    A ``params_dict``, ``combined_params`` or nested ``storage`` section
    that is ``None`` is treated as empty instead of raising
    ``AttributeError``.

    Returns:
        ``True`` if the resolved storage type is a member of
        ``self._OBJECT_STORAGE_TYPES``, ``False`` otherwise.
    """
    cli_overrides = self.params_dict or {}
    # ``.get('storage', {})`` only substitutes the default when the key is
    # absent; a present-but-None section (e.g. an empty ``storage:`` block)
    # would come back as None, so coalesce with ``or {}``.
    storage_section = (self.combined_params or {}).get('storage') or {}
    storage_type = (
        cli_overrides.get('storage.storage_type')
        or storage_section.get('storage_type')
        or 'local'
    )
    return storage_type in self._OBJECT_STORAGE_TYPES

def _check_storage_scheme_consistency(self):
"""Fail fast on storage.storage_type vs data/checkpoint folder mismatch.

Expand Down Expand Up @@ -816,13 +836,13 @@ def _capacity_gate_destination(self) -> Optional[str]:
"""Return ``args.data_dir`` — the training dataset destination per
REQUIREMENTS.md CAP-01.

Object-mode runs (data_access_protocol == 'object') target an
``s3://`` URI; statvfs against a URI walks to filesystem root and
aborts with ``[E401] CAP-01: no valid parent``. Return None to
fire the A8 remote-backend escape hatch in ``_pre_execution_gate``.
See issue #568.
Object-storage runs have no local filesystem to statvfs; the URI
parent-walk in ``check_capacity_4field`` exhausts and aborts with
``[E401] CAP-01: no valid parent for s3://…``. Return ``None`` to
fire the A8 remote-backend escape hatch in
``_pre_execution_gate``. See issue #568.
"""
if getattr(self.args, 'data_access_protocol', None) == 'object':
if self._is_object_storage():
return None
return self.args.data_dir

Expand Down Expand Up @@ -972,14 +992,14 @@ def _capacity_gate_destination(self):
upstream CLI validation already requires checkpoint_folder for
real runs; this is defensive.
"""
# Object-storage runs target an s3:// URI; statvfs walks to root
# and aborts with [E401] CAP-01: no valid parent. A8 escape hatch —
# see issue #568.
if self._is_object_storage():
return None
cf = getattr(self.args, "checkpoint_folder", None)
if not cf:
return None
# Object-mode runs target an s3:// URI; statvfs against a URI
# walks to filesystem root and aborts with [E401] CAP-01: no
# valid parent. A8 escape hatch — see issue #568.
if getattr(self.args, 'data_access_protocol', None) == 'object':
return None
return os.path.join(cf, self.args.model)

def datasize(self):
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/test_systemname_yaml_end_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,9 @@ def test_checkpointing_uses_checkpoint_folder_joined_with_model_path(self):

bm = MagicMock(spec=CheckpointingBenchmark)
bm.args = SimpleNamespace(checkpoint_folder='/tmp/ck', model='llama3-8b')
# Issue #568: isolate from _is_object_storage so this A7 lock
# holds for the local path regardless of the helper.
bm._is_object_storage = MagicMock(return_value=False)
result = CheckpointingBenchmark._capacity_gate_destination(bm)
assert result == _os.path.join('/tmp/ck', 'llama3-8b')

Expand Down
151 changes: 114 additions & 37 deletions tests/unit/test_capacity_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,37 +331,25 @@ def test_destination_is_args_data_dir(self):

bm = MagicMock(spec=TrainingBenchmark)
bm.args = SimpleNamespace(data_dir="/data/foo")
# Isolate this from _is_object_storage so the test holds whether
# or not the gate consults that helper. See issue #568.
bm._is_object_storage = MagicMock(return_value=False)
assert TrainingBenchmark._capacity_gate_destination(bm) == "/data/foo"

def test_destination_is_none_in_object_mode(self):
"""Issue #568: CAP-01 must fire the A8 escape hatch when the
destination is an object-storage URI. statvfs on s3:// walks the
parent chain to the filesystem root and aborts with
``[E401] CAP-01: no valid parent for s3://…``. ``data_access_protocol
== 'object'`` is the same signal _apply_object_storage_params keys on,
so returning None here mirrors the existing VectorDB/KVCache hatch.
def test_destination_is_none_when_object_storage(self):
"""Issue #568: object-mode runs target an s3:// URI; the local
statvfs in check_capacity_4field walks to filesystem root and
aborts with [E401] CAP-01: no valid parent. The destination
must return None so _pre_execution_gate fires the A8 escape
hatch and logs `CAP-01 skipped: destination not local`.
"""
from mlpstorage_py.benchmarks.dlio import TrainingBenchmark

bm = MagicMock(spec=TrainingBenchmark)
bm.args = SimpleNamespace(
data_dir="s3://mybucket/unet3d/data",
data_access_protocol="object",
)
bm.args = SimpleNamespace(data_dir="s3://mybucket/unet3d/data")
bm._is_object_storage = MagicMock(return_value=True)
assert TrainingBenchmark._capacity_gate_destination(bm) is None

def test_destination_is_data_dir_when_protocol_attribute_missing(self):
"""Defensive: not every code path attaches data_access_protocol to
args (older test fixtures, internal callers). Absence must NOT be
interpreted as object mode — fall through to the existing local
statvfs path.
"""
from mlpstorage_py.benchmarks.dlio import TrainingBenchmark

bm = MagicMock(spec=TrainingBenchmark)
bm.args = SimpleNamespace(data_dir="/data/foo")
assert TrainingBenchmark._capacity_gate_destination(bm) == "/data/foo"

def test_datasize_invokes_pre_execution_gate_before_calculate_training_data_size(self):
from mlpstorage_py.benchmarks.dlio import TrainingBenchmark

Expand Down Expand Up @@ -528,6 +516,8 @@ def test_destination_is_checkpoint_folder_joined_with_model(self):

bm = MagicMock(spec=CheckpointingBenchmark)
bm.args = SimpleNamespace(checkpoint_folder="/cp", model="llama3-8b")
# Isolate from _is_object_storage — see issue #568.
bm._is_object_storage = MagicMock(return_value=False)
result = CheckpointingBenchmark._capacity_gate_destination(bm)
assert result == os.path.join("/cp", "llama3-8b")

Expand All @@ -536,10 +526,11 @@ def test_destination_is_none_when_checkpoint_folder_empty(self):

bm = MagicMock(spec=CheckpointingBenchmark)
bm.args = SimpleNamespace(checkpoint_folder=None, model="llama3-8b")
bm._is_object_storage = MagicMock(return_value=False)
result = CheckpointingBenchmark._capacity_gate_destination(bm)
assert result is None

def test_destination_is_none_in_object_mode(self):
def test_destination_is_none_when_object_storage(self):
"""Issue #568: same A8 escape hatch as TrainingBenchmark — the
checkpoint_folder is an s3:// URI under `checkpointing run object`,
so the local statvfs would walk to root and abort with
Expand All @@ -549,23 +540,109 @@ def test_destination_is_none_in_object_mode(self):

bm = MagicMock(spec=CheckpointingBenchmark)
bm.args = SimpleNamespace(
checkpoint_folder="s3://mybucket/checkpoints",
model="llama3-8b",
data_access_protocol="object",
checkpoint_folder="s3://mybucket/checkpoints", model="llama3-8b"
)
bm._is_object_storage = MagicMock(return_value=True)
assert CheckpointingBenchmark._capacity_gate_destination(bm) is None

def test_destination_joined_when_protocol_attribute_missing(self):
"""Defensive: absence of data_access_protocol must NOT be read as
object mode — fall through to the existing join behavior so file/
POSIX runs and older callers keep working.
"""
from mlpstorage_py.benchmarks.dlio import CheckpointingBenchmark

bm = MagicMock(spec=CheckpointingBenchmark)
bm.args = SimpleNamespace(checkpoint_folder="/cp", model="llama3-8b")
result = CheckpointingBenchmark._capacity_gate_destination(bm)
assert result == os.path.join("/cp", "llama3-8b")
class TestDLIOIsObjectStorage:
    """Issue #568: ``DLIOBenchmark._is_object_storage`` — the signal both
    Training/Checkpointing ``_capacity_gate_destination`` overrides consult
    to decide whether to skip the local statvfs gate.

    Reads ``storage.storage_type`` (the value we actually hand to DLIO
    after ``_apply_object_storage_params`` has run), not the
    ``data_access_protocol`` CLI positional (``file|object``). That makes
    the gate skip even for the ``--params storage.storage_type=s3`` edge
    case where an advanced user wires up object storage manually under
    the ``file`` positional instead of using the ``object`` positional.
    """

    @staticmethod
    def _bm_with_state(params_dict=None, combined_params=None):
        """Build a spec'd mock benchmark carrying only the state the helper reads.

        ``None`` for either argument is replaced with an empty dict, so a
        test that needs a literal ``None`` attribute must assign it on the
        returned mock itself.
        """
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark, TrainingBenchmark
        bm = MagicMock(spec=TrainingBenchmark)
        bm.params_dict = params_dict if params_dict is not None else {}
        bm.combined_params = combined_params if combined_params is not None else {}
        # MagicMock(spec=…) auto-mocks class attributes too, so the
        # ``in self._OBJECT_STORAGE_TYPES`` membership check would land
        # on a mock. Pin the real frozenset.
        bm._OBJECT_STORAGE_TYPES = DLIOBenchmark._OBJECT_STORAGE_TYPES
        return bm

    def test_returns_true_for_s3(self):
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(params_dict={'storage.storage_type': 's3'})
        assert DLIOBenchmark._is_object_storage(bm) is True

    def test_returns_true_for_s3_torch(self):
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(params_dict={'storage.storage_type': 's3_torch'})
        assert DLIOBenchmark._is_object_storage(bm) is True

    def test_returns_false_for_local(self):
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(params_dict={'storage.storage_type': 'local'})
        assert DLIOBenchmark._is_object_storage(bm) is False

    def test_returns_false_for_direct_fs_odirect_mode(self):
        """--o-direct sets storage_type='direct_fs', which routes I/O
        through s3dlio's direct:// scheme but still lands on a LOCAL
        filesystem (O_DIRECT, bypasses page cache). statvfs is valid
        there — must NOT skip CAP-01. See dlio.py _apply_odirect_params
        (storage_type='direct_fs' comment)."""
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(params_dict={'storage.storage_type': 'direct_fs'})
        assert DLIOBenchmark._is_object_storage(bm) is False

    def test_returns_false_when_storage_type_unset(self):
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(params_dict={}, combined_params={})
        assert DLIOBenchmark._is_object_storage(bm) is False

    def test_falls_back_to_combined_params_when_params_dict_empty(self):
        """params_dict carries CLI --params overrides; combined_params
        carries the resolved YAML config. Either source may declare the
        storage backend, so both must be consulted (matches the lookup
        pattern in _check_storage_scheme_consistency at dlio.py:413)."""
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(
            params_dict={},
            combined_params={'storage': {'storage_type': 's3'}},
        )
        assert DLIOBenchmark._is_object_storage(bm) is True

    def test_params_dict_takes_precedence_over_combined_params(self):
        """CLI --params is the user's explicit override and must win
        over the YAML default."""
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(
            params_dict={'storage.storage_type': 's3'},
            combined_params={'storage': {'storage_type': 'local'}},
        )
        assert DLIOBenchmark._is_object_storage(bm) is True

    def test_storage_type_set_via_params_under_file_positional(self):
        """Locks the edge case the data_access_protocol signal would
        miss: an advanced user passes the `file` positional but adds
        `--params storage.storage_type=s3` (and the matching storage_root
        / storage_options) to wire up object storage manually. The gate
        must still skip — there's no local filesystem to statvfs
        regardless of which CLI form the user took to get there."""
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(params_dict={'storage.storage_type': 's3'})
        # Even with the file positional resolved onto args, the resolved
        # storage_type wins.
        bm.args = SimpleNamespace(data_access_protocol='file')
        assert DLIOBenchmark._is_object_storage(bm) is True

    def test_handles_none_combined_params(self):
        """Defensive — early in __init__ combined_params may not yet
        be set. The `or {}` guard at the call site must not blow up."""
        from mlpstorage_py.benchmarks.dlio import DLIOBenchmark
        bm = self._bm_with_state(params_dict={})
        # _bm_with_state replaces a None argument with {}, which would make
        # this test identical to the "unset" case and leave the guard
        # unexercised. Assign None on the mock directly instead.
        bm.combined_params = None
        assert bm.combined_params is None
        assert DLIOBenchmark._is_object_storage(bm) is False


class TestVectorDBBenchmarkRequiredBytes:
Expand Down
Loading