
[data] implement per-op resource reservation #43026

Merged
merged 22 commits into ray-project:master from the per-op-resource-limit2 branch on Feb 14, 2024

Conversation

@raulchen (Contributor) commented Feb 7, 2024

Why are these changes needed?

Implement per-op resource reservation. This PR doesn't enable it by default, as it depends on #42930 and still needs to be integrated with the backpressure policies.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(


@bveeramani (Member) left a comment:
Overall LGTM

Comment on lines 29 to 34
@classmethod
def zero(cls) -> "ExecutionResources":
    """Returns an ExecutionResources object with zero resources."""
    if cls._ZERO is None:
        cls._ZERO = ExecutionResources(0.0, 0.0, 0)
    return cls._ZERO

bveeramani (Member):
What's our motivation for caching this?

Suggested change:

  @classmethod
  def zero(cls) -> "ExecutionResources":
      """Returns an ExecutionResources object with zero resources."""
-     if cls._ZERO is None:
-         cls._ZERO = ExecutionResources(0.0, 0.0, 0)
-     return cls._ZERO
+     return ExecutionResources(0.0, 0.0, 0)


bveeramani (Member):
One concern I have is that ExecutionResources objects are mutable.

>>> ExecutionResources.zero().cpu = 1
>>> ExecutionResources.zero()
ExecutionResources(cpu=1, gpu=0.0, object_store_memory=0)

This might become a gotcha.


raulchen (Contributor Author):
Good catch. We should make it immutable in the future.
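
For illustration, one way to make it immutable is a frozen dataclass; this is only a sketch, and FrozenExecutionResources plus its three fields are assumptions based on the snippet above, not code from this PR:

from dataclasses import dataclass
from typing import Optional

# Sketch of an immutable variant; the real ExecutionResources may carry more state.
@dataclass(frozen=True)
class FrozenExecutionResources:
    cpu: Optional[float] = None
    gpu: Optional[float] = None
    object_store_memory: Optional[float] = None

# Mutating a frozen instance raises dataclasses.FrozenInstanceError,
# so a cached zero() singleton could then be shared safely.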

def max(self, other: "ExecutionResources") -> "ExecutionResources":
    """Returns the maximum for each resource type."""
    return ExecutionResources(
        cpu=max(self.cpu or 0.0, other.cpu or 0.0),

bveeramani (Member):
Nit: if both fields are None, do we expect the max to be zero? I'm wondering if it'd be more intuitive for the field to still be None.

For example:

>>> result = ExecutionResources().max(ExecutionResources())
>>> result.cpu
0.0  # You might expect this to still be `None`?


raulchen (Contributor Author):
This is super confusing and annoying, but I think defaulting to 0 makes sense here.
The reason is that ExecutionResources is used for two different purposes simultaneously:

  1. representing "resource usage": values should default to 0 in this case.
  2. representing "resource limits": values should default to inf in this case.

The min and max APIs should only be used for (1). I think we should think more about how to sanitize this API and introduce (potentially breaking) changes before 2.10.
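
For illustration only, a tiny sketch of the two defaulting conventions described above (usage_value and limit_value are hypothetical helpers, not APIs from this PR):

def usage_value(v):
    # "Resource usage" convention: a missing value counts as 0.
    return v if v is not None else 0.0

def limit_value(v):
    # "Resource limit" convention: a missing value counts as unlimited.
    return v if v is not None else float("inf")

assert max(usage_value(None), usage_value(2.0)) == 2.0  # None behaves like 0
assert min(limit_value(None), limit_value(2.0)) == 2.0  # None behaves like inf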


raulchen (Contributor Author):
I will use a follow-up PR to make these values non-nullable.

def min(self, other: "ExecutionResources") -> "ExecutionResources":
    """Returns the minimum for each resource type."""
    return ExecutionResources(
        cpu=min(self.cpu or float("inf"), other.cpu or float("inf")),

bveeramani (Member):
Similar to max(), I'm wondering what the behavior should be if the fields in both self and other are None:

>>> ExecutionResources().min(ExecutionResources())
ExecutionResources(cpu=inf, gpu=inf, object_store_memory=inf)

@@ -239,6 +239,10 @@ def __init__(
        # The max number of blocks that can be buffered at the streaming generator of
        # each `DataOpTask`.
        self._max_num_blocks_in_streaming_gen_buffer = None
        # Whether to enable ReservationOpResourceLimiter.
        self.op_resource_reservation_enabled = False

bveeramani (Member):
Are we planning to add an environment variable for this? More generally, when do we not want to add an environment variable for a setting? It seems like most settings have one right now.


c21 (Contributor):
We should add an environment variable and an __init__ parameter, like the other configs.


raulchen (Contributor Author):
Will add an env var for this.
But regarding the __init__ parameter, I don't think it's useful anymore: there are too many parameters already, and we no longer construct a DataContext with __init__ params.
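
As a sketch of what that could look like (the RAY_DATA_OP_RESOURCE_RESERVATION_ENABLED name and the _env_bool helper are assumptions, not the actual variable added later):

import os

def _env_bool(name: str, default: bool) -> bool:
    # Read a boolean flag from the environment, e.g. "0" or "1".
    return bool(int(os.environ.get(name, str(int(default)))))

# Default stays off; users could opt in via the environment.
op_resource_reservation_enabled = _env_bool(
    "RAY_DATA_OP_RESOURCE_RESERVATION_ENABLED", False
)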


c21 (Contributor):
> but regarding the __init__ parameter, I don't think it's useful anymore: there are too many parameters already, and we no longer construct a DataContext with __init__ params.

We can clean it up later. @raulchen, is it that you cannot add it to __init__ because something is failing?


raulchen (Contributor Author):
oh no, nothing was failing. I just think we should stop adding new params, and clean up the existing ones later.

        return self._op_resource_limiter.get_op_limits(op)


class OpResourceLimiter(metaclass=ABCMeta):

bveeramani (Member):
Suggested change:

- class OpResourceLimiter(metaclass=ABCMeta):
+ class OpResourceLimiter(abc.ABC):

Using ABC as a base class has essentially the same effect as specifying metaclass=abc.ABCMeta, but is simpler to type and easier to read.
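
For reference, a minimal sketch of the suggested style; only get_op_limits is known from the surrounding diff, and the docstrings are illustrative:

import abc

class OpResourceLimiter(abc.ABC):
    """Abstract base for per-operator resource limiters (sketch)."""

    @abc.abstractmethod
    def get_op_limits(self, op) -> "ExecutionResources":
        """Return the resource limits for the given operator."""
        ...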

Comment on lines 257 to 258

return

bveeramani (Member):
Suggested change (delete the bare return):

- return

@@ -108,6 +121,8 @@ def get_global_limits(self) -> ExecutionResources:
            gpu=gpu,
            object_store_memory=object_store_memory,
        )
        if self._op_resource_limiter is not None:
            self._op_resource_limiter.on_global_limits_updated(self._global_limits)

bveeramani (Member):
I felt confused at first about why we're calling on_global_limits_updated in get_global_limits. get_and_update_global_limits might be a clearer name, although maybe that's too verbose.


raulchen (Contributor Author) commented Feb 13, 2024:
I think the name get_global_limits makes sense for the users of this function.
Basically, this function is responsible for returning the current global limits; the internal cache is an implementation detail that users don't need to know about.
And regarding on_global_limits_updated: from OpResourceLimiter's perspective, ResourceManager is responsible for calling this callback, and when to call it is internal to ResourceManager.


raulchen (Contributor Author):
Actually, there is an issue: on_global_limits_updated may not get called before update_resources.
I've changed the code to call on_global_limits_updated within the OpResourceLimiter itself.
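
A rough sketch of the resulting call order (class and attribute names other than on_global_limits_updated, update_usages, and get_global_limits are assumptions):

class _LimiterSketch:
    def __init__(self, resource_manager):
        self._resource_manager = resource_manager
        self._global_limits = None

    def on_global_limits_updated(self, limits):
        self._global_limits = limits

    def update_usages(self):
        # Pull the latest global limits up front so the limiter never depends
        # on an external callback having fired first.
        self.on_global_limits_updated(self._resource_manager.get_global_limits())
        # ... then recompute per-op reservations from self._global_limits ...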

            op.incremental_resource_usage()
        )

    def update_usages(self):

bveeramani (Member):
Is update_usages always called whenever the global limits change?


raulchen (Contributor Author):
It doesn't need to be. It only needs to be updated when ResourceManager.update_usages is called.

"""Returns an ExecutionResources object with zero resources."""
if cls._ZERO is None:
cls._ZERO = ExecutionResources(0.0, 0.0, 0)
return cls._ZERO

c21 (Contributor):
Shall we always return a new object, i.e., return ExecutionResources(0.0, 0.0, 0)? I'm worried about a caller accidentally changing the value and affecting other callers.

"""
total = ExecutionResources()
if self.cpu is not None or other.cpu is not None:
total.cpu = (self.cpu or 0.0) - (other.cpu or 0.0)

c21 (Contributor):
Should we add a check for when the value is negative? What does it mean when the value is negative?


raulchen (Contributor Author):
good point. I'll add a post-create check.


raulchen (Contributor Author):
oops, I misunderstood your question. If the value is negative, it just means that self is less than other. I think it's okay here. For the concrete use cases, I'll do something like a.subtract(b).max(ExecutionResources.zero()) to make it non-negative.


c21 (Contributor):
OK, let's make sure we do the .max() part. Negative execution resources are quite confusing.
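
For illustration, the clamping idea with plain floats (ExecutionResources.subtract and max would apply the same logic field by field):

used, budget = 2.0, 5.0
remaining = max(budget - used, 0.0)  # subtraction stays positive here
assert remaining == 3.0

used, budget = 5.0, 2.0
remaining = max(budget - used, 0.0)  # would be -3.0; clamp to zero instead
assert remaining == 0.0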

Comment on lines +202 to +204
        self._eligible_ops = [
            op for op in self._resource_manager._topology if isinstance(op, MapOperator)
        ]

c21 (Contributor):
Let's add a TODO to handle sub-plans of map operators, i.e., map().map().repartition().map().map()...


raulchen (Contributor Author):
Discussed offline: we'll only enable this feature when the dataset only has map/limit/streaming_split, because memory accounting is not correct for other ops yet.
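
A sketch of such an eligibility check; MapOperator appears in the snippet above, while the other class names and the function itself are stand-ins for illustration:

# Stand-in operator classes; the real ones live in Ray Data's internals.
class MapOperator: ...
class LimitOperator: ...
class OutputSplitter: ...  # the operator backing streaming_split

def reservation_eligible(topology) -> bool:
    # Only turn on per-op reservation when every operator is a
    # map / limit / streaming_split, since memory accounting for
    # other operators isn't correct yet.
    allowed = (MapOperator, LimitOperator, OutputSplitter)
    return all(isinstance(op, allowed) for op in topology)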



class TestReservationOpResourceLimiter:
    def test_basic(self, restore_data_context):

c21 (Contributor):
Can we also add a unit test for when the DAG has a non-map operator as well?

@bveeramani bveeramani self-assigned this Feb 13, 2024

@bveeramani (Member) left a comment:
Nice! LGTM!

Comment on lines +283 to +286
        if op in self._op_limits:
            return self._op_limits[op]
        else:
            return ExecutionResources.inf()

bveeramani (Member):
Nit: Optionally,

Suggested change:

- if op in self._op_limits:
-     return self._op_limits[op]
- else:
-     return ExecutionResources.inf()
+ return self._op_limits.get(op, ExecutionResources.inf())


raulchen (Contributor Author):
Not a big deal, but the current way will be slightly more efficient, since inf() still returns a new object each time, and dict.get would construct that default even when the key is present.

2.5, float("inf"), 100
)

# Test global_limits exceeded.

bveeramani (Member):
Nit: global_limits isn't exceeded in this test, right? 500 + 125 + 150 = 775 < 800? This is testing what happens when the shared memory is full?

@@ -138,3 +145,145 @@ def test_calculating_usage(self):
        assert resource_manager.get_downstream_object_store_memory(o1) == 1700
        assert resource_manager.get_downstream_object_store_memory(o2) == 1700
        assert resource_manager.get_downstream_object_store_memory(o3) == 1000


class TestReservationOpResourceLimiter:

bveeramani (Member):
Nit: Not sure if I already missed this, but we could also add a test that we always reserve at minimum incremental_resource_usage.


raulchen (Contributor Author):
yes, I missed that.
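
A sketch of the invariant such a test would assert, using the field-wise max from this PR (reserve_for_op is hypothetical, not the actual implementation):

def reserve_for_op(fair_share, incremental_usage):
    # Field-wise max guarantees the reservation never drops below what the
    # operator needs to launch a single task.
    return fair_share.max(incremental_usage)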


@c21 (Contributor) left a comment:
LGTM

"""
total = ExecutionResources()
if self.cpu is not None or other.cpu is not None:
total.cpu = (self.cpu or 0.0) - (other.cpu or 0.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, let's make sure do the .max() part. The negative execution resource is quite confusing.

@raulchen raulchen merged commit ccf79b2 into ray-project:master Feb 14, 2024
8 of 9 checks passed
@raulchen raulchen deleted the per-op-resource-limit2 branch February 14, 2024 02:02
kevin85421 pushed a commit to kevin85421/ray that referenced this pull request Feb 17, 2024