[Data] Introduce `concurrency` argument to replace ComputeStrategy in map-like APIs #41461
Conversation
Will update all documentation + unit test code after we agree on the API change (otherwise there are too many places to change).
Sorry, I don't have the context here. What is the issue with ActorPoolStrategy?
This came from batch inference CUJ feedback: users found it unnecessary to have to learn and import a separate class.
Also, regular Data users may not know the difference between Ray tasks and actors. It would be better not to expose the implementation details.
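To make the motivation concrete, here is a rough before/after sketch of the user-facing change (illustrative only; exact signatures are discussed later in this thread and may differ):

```python
# Before: users must learn and import a separate strategy class.
#
#   from ray.data import ActorPoolStrategy
#   ds.map_batches(MyModel, compute=ActorPoolStrategy(size=4))
#
# After: a single `concurrency` argument, no extra import, and no need to
# know whether Ray tasks or actors run underneath.
#
#   ds.map_batches(MyModel, concurrency=4)       # fixed-size worker pool
#   ds.map_batches(MyModel, concurrency=(2, 8))  # autoscaling pool, min/max
```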
Nice, really like this API improvement!
Can you add unit tests for the case where `concurrency` is set but `fn` is not a callable class?
@stephanie-wang, yes, I plan to add all unit tests for the new argument.
@@ -115,19 +100,87 @@ uses tasks by default.
    .map_batches(increase_brightness)
)

.. _transforming_data_actors:
.. _configure_batch_format:
GitHub somehow shows a change here, but this section (Configuring batch format) is not changed.
    .map_batches(drop_nas, batch_format="pandas")
)

Configuring batch size
GitHub somehow shows a change here, but this section (Configuring batch size) is not changed.
All comments are addressed, please take another look whenever you have time, thanks!
python/ray/data/_internal/util.py
Outdated
# Check if `fn` is a function or not.
# NOTE: use `inspect.isfunction(fn)` instead of `isinstance(fn, CallableClass)`,
# because the latter returns False for an object instance of a callable class.
if inspect.isfunction(fn):
This is the major behavior change. Previously we relied on checking `isinstance(fn, CallableClass)`, but that's not right for an object instance of a callable class. Changed to use `inspect.isfunction(fn)` to check whether it's a function, which is more stable. Added a unit test to cover it.
import ray
from ray.data.block import CallableClass

def foo(x):
    return x

class BarClass:
    def __init__(self, x):
        self._x = x
    def __call__(self, x):
        return x

>>> isinstance(foo, CallableClass)
False
>>> isinstance(BarClass, CallableClass)
True
>>> isinstance(BarClass(1), CallableClass)
False  # <-- This does not work with the current logic
Isn't this intended, though? The issue is that `BarClass(1)` is usually a bug, since it means the user is instantiating the model on the driver. It also requires being able to serialize the instance, which is usually problematic.
By allowing this, it seems we are opening the API up to more confusion and performance issues.
@ericl - ah, that makes sense. This came from one test failure - https://github.com/ray-project/ray/blob/master/doc/source/ray-core/_examples/datasets_train/datasets_train.py#L702 . I changed the test code instead.
By the way, on master we have always allowed running this kind of object instance (as a function):
>>> class BarClass:
... def __init__(self, x):
... self._x = x
... def __call__(self, x):
... return x
...
>>>
>>> fn = BarClass(1)
>>> ds = ray.data.range(10)
>>> ds = ds.map(fn)
>>> ds.take_all()
[{'id': 0}, {'id': 2}, {'id': 3}, {'id': 5}, {'id': 7}, {'id': 8}, {'id': 9}, {'id': 1}, {'id': 4}, {'id': 6}]
Are you thinking we should explicitly disallow this and throw an error if the user passes an object instance? I'm a little hesitant, given this has always been allowed.
I see. In this case, I think we should probably raise an error, since it's definitely not doing what the user expects: the `__init__` won't be cached, even though it looks like they're passing in an actor class.
@ericl - updated to throw an error, and added a unit test.
Actually hit more unit test failures after making the change. It turns out `inspect.isfunction()` only works for plain functions, not for class methods or partial functions. For all of our Preprocessor classes, the function is passed as a class method - `Preprocessor._transform_pandas`/`_transform_numpy`. An example of the test failure is here.
So I reverted the change to not throw an error here, and added a TODO. I'd prefer to handle this case in the future.
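The limitation described above is easy to reproduce with the standard library alone (the `Preprocessor` class here is a minimal stand-in, not the real one):

```python
import inspect
from functools import partial

def plain(x):
    return x

class Preprocessor:  # stand-in for ray.data.preprocessor.Preprocessor
    def _transform(self, batch):
        return batch

# `inspect.isfunction` only recognizes plain Python functions:
print(inspect.isfunction(plain))                      # True
print(inspect.isfunction(Preprocessor()._transform))  # False -- bound method
print(inspect.isfunction(partial(plain, 1)))          # False -- partial object

# Bound methods need `inspect.ismethod`; all three are still `callable`.
print(inspect.ismethod(Preprocessor()._transform))    # True
```

So a check based on `inspect.isfunction` alone would misclassify class-method and `functools.partial` UDFs as callable-class instances.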
For the Preprocessor classes, could you wrap the class method in a lambda to fix this?
For our own code, we can work around it with a lambda, since we know the function signature and what arguments to pass. What about arbitrary user-defined class methods? Also, I noticed `inspect.isfunction()` does not work for partial functions. I'm open to adopting another approach if there's anything better I'm not aware of.
I don't think we should allow callable instances, for the reasons above.
Signed-off-by: Cheng Su <scnju13@gmail.com>
One more request: with this API change, can we disallow specifying unbounded autoscaling (e.g., `None`)? The reason is that the vast majority of the time, users get a better experience with a fixed-size pool. So we want to encourage that as the default, and make sure that autoscaling is an advanced feature users explicitly opt into.
The only reason this wasn't done before was backwards compatibility.
@ericl - what's the good default value you are thinking of? I'm thinking we could use a fixed actor pool with 1 actor, but that would be quite slow and not really usable, so users would have to set a higher value anyway. Another option is to always require users to provide a value.
Changed to require `concurrency` to be specified when using a callable class.
All comments are addressed except for this request - #41461 (comment). Can you help take another look? Thanks @ericl and @stephanie-wang.
Approve docs changes.
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
LGTM
# Test concurrency not set.
result = ds.map(udf).take_all()
assert sorted(extract_values("id", result)) == list(range(10)), result
error_message = "``concurrency`` must be specified when using a callable class."
👍
This PR is to fix the unit test failure (it was marked as flaky, so it didn't show up in the previous PR) - #41461.
Signed-off-by: Cheng Su <scnju13@gmail.com>
Why are these changes needed?
Generated doc for review - https://anyscale-ray--41461.com.readthedocs.build/en/41461/data/transforming-data.html#transforming-with-python-class .
This PR adds an extra `concurrency` argument to all map-like APIs (`map_batches`, `map`, `filter`, `flat_map`, `add_column`, `drop_columns`, `select_columns`), with the motivation to deprecate the `compute` argument.

The typing for the new `concurrency` is `Optional[Union[int, Tuple[int, int]]]`, so it allows users to set a fixed-size actor pool, or an auto-scaling actor pool. For 2.9, the `compute` argument still works, but prints a warning message asking users to migrate to `concurrency`. So this PR does not break any existing code and maintains backward compatibility.

Several other alternatives:

- Use two arguments, `min_concurrency` and `max_concurrency`: `max_concurrency` is already a reserved parameter in Ray Core, where it represents the number of concurrent actor tasks, so this would introduce extra confusion for users. In addition, we are recommending users use a fixed-size actor pool for now; these two arguments are only useful for an auto-scaling actor pool.
- Introduce a class like `ConcurrencyOption`: there is no need right now, and it would reintroduce the same issue as `ActorPoolStrategy`. We can always overload the type of `concurrency` and add more types later without breaking backward compatibility.
- Overload the type of the existing `compute` argument: this would also work and requires minimal change on the user side, but the name `compute` is more vague than `concurrency`.
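A minimal sketch of the dispatch logic the new argument implies (the helper name `resolve_concurrency` is hypothetical, not Ray's internal API; the error message matches the one added in this PR):

```python
from typing import Optional, Tuple, Union

def resolve_concurrency(
    concurrency: Optional[Union[int, Tuple[int, int]]],
    fn_is_callable_class: bool,
) -> str:
    """Map the `concurrency` argument onto an execution mode (illustrative)."""
    if concurrency is None:
        if fn_is_callable_class:
            # Callable classes run in an actor pool, so a size is required.
            raise ValueError(
                "``concurrency`` must be specified when using a callable class."
            )
        return "tasks"  # plain functions default to Ray tasks
    if isinstance(concurrency, int):
        return f"fixed actor pool of size {concurrency}"
    min_size, max_size = concurrency
    return f"autoscaling actor pool [{min_size}, {max_size}]"

# Usage:
print(resolve_concurrency(4, True))       # fixed actor pool of size 4
print(resolve_concurrency((2, 8), True))  # autoscaling actor pool [2, 8]
print(resolve_concurrency(None, False))   # tasks
```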
though.Related issue number
#40725
Checks

- I've signed off every commit (i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've added a new method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.