
[Datasets] Enable lazy execution by default #31286

Merged
merged 6 commits into from Jan 6, 2023

Conversation

Contributor

@c21 c21 commented Dec 22, 2022

Signed-off-by: Cheng Su scnju13@gmail.com

Why are these changes needed?

This PR is to enable lazy execution by default. See ray-project/enhancements#19 for motivation. The change includes:

  • Change Dataset constructor: Dataset.__init__(lazy: bool = True). Also remove defer_execution field, as it's no longer needed.
  • read_api.py:read_datasource() returns a lazy Dataset, while still eagerly computing the first input block.
  • Add ds.fully_executed() calls to required unit tests, to make sure they are passing.
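The lazy-by-default semantics can be illustrated with a small toy sketch (this is a simplified stand-in, not Ray's actual Dataset implementation; names like LazyDataset are hypothetical):

```python
# Toy sketch of lazy-by-default execution: transformations are recorded as
# pending stages, and nothing runs until fully_executed() is called.

class LazyDataset:
    def __init__(self, source, stages=None):
        self._source = source        # input blocks (here, a plain list)
        self._stages = stages or []  # pending transformations
        self._cached = None          # materialized blocks after execution

    def map(self, fn):
        # Lazily record a stage; no computation happens here.
        return LazyDataset(self._source, self._stages + [fn])

    def fully_executed(self):
        # Run all pending stages and cache the resulting blocks.
        if self._cached is None:
            blocks = list(self._source)
            for fn in self._stages:
                blocks = [fn(b) for b in blocks]
            self._cached = blocks
        return self

    def take_all(self):
        return self.fully_executed()._cached

ds = LazyDataset([1, 2, 3]).map(lambda x: x * 2)
# Nothing has executed yet; execution is forced explicitly:
print(ds.take_all())  # [2, 4, 6]
```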

TODO:

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Contributor

ericl commented Dec 22, 2022

Nice. Shall we also update the documentation in the same change?

Contributor Author

c21 commented Dec 22, 2022

Shall we also update the documentation in the same change?

@ericl - I am thinking of doing it in a separate PR for easier doc review (assuming there are more code changes for fixing unit tests). But I can also do it in this PR if people prefer.

Contributor

@ericl ericl left a comment

Just tried this out, seems to work well! A few thoughts:

  1. Currently, any execution will "cache" a snapshot of the final stage of blocks. Should we change this behavior to only "cache" on a call to fully_executed() or add an explicit cache() action?
  2. The str-form of Datasets will include "num_rows=?" and "schema=Unknown schema" a lot now. Can we change this to num_rows=<Pending execution> and schema=<Pending execution> for clarity? Better yet, we could improve the str-form to show the pending stages that will be executed.

These changes could go into separate PRs.

Edit: filed #31417 for (2)
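For illustration, the clearer str-form suggested in (2) could look roughly like the following toy sketch (the class and exact strings here are hypothetical; the real change is tracked in #31417):

```python
# Hypothetical sketch of a Dataset repr that shows pending state explicitly
# instead of "num_rows=?" and "schema=Unknown schema".

class ToyDatasetRepr:
    def __init__(self, num_rows=None, schema=None):
        self.num_rows = num_rows  # None means not yet executed
        self.schema = schema

    def __repr__(self):
        rows = self.num_rows if self.num_rows is not None else "<Pending execution>"
        schema = self.schema if self.schema is not None else "<Pending execution>"
        return f"Dataset(num_rows={rows}, schema={schema})"

print(ToyDatasetRepr())
# Dataset(num_rows=<Pending execution>, schema=<Pending execution>)
```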

@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 3, 2023
@clarkzinzow
Contributor

@c21 We should also audit all benchmarks that involve Datasets, to ensure that setup operations that were previously executed eagerly are still executed eagerly, so we're not accidentally including e.g. reading or setup transformations when we're trying to benchmark a single downstream operation. The .map_batches() benchmarks come to mind.

Contributor Author

c21 commented Jan 5, 2023

We should also audit all benchmarks that involve Datasets, to ensure that setup operations that were previously executed eagerly are still executed eagerly, so we're not accidentally including e.g. reading or setup transformations when we're trying to benchmark a single downstream operation. The .map_batches() benchmarks come to mind.

@clarkzinzow - thanks, will go over all nightly tests.

Contributor Author

c21 commented Jan 5, 2023

Currently, any execution will "cache" a snapshot of the final stage of blocks. Should we change this behavior to only "cache" on a call to fully_executed() or add an explicit cache() action?

Discussed offline with @ericl; we will postpone this, given the impact is low.

The str-form of Datasets will include "num_rows=?" and "schema=Unknown schema" a lot now. Can we change this to num_rows=<Pending execution> and schema=<Pending execution> for clarity? Better yet, we could improve the str-form to show the pending stages that will be executed.

Will do it in a separate PR.

@c21 c21 changed the title [WIP][Datasets] Enable lazy execution by default [Datasets] Enable lazy execution by default Jan 5, 2023
@c21 c21 removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jan 5, 2023
Contributor Author

c21 commented Jan 5, 2023

All CI tests passed. The failed nightly tests will be addressed in #31460.

@@ -147,7 +147,8 @@ def test_automatic_enable_gpu_from_num_gpus_per_worker(shutdown_only):
with pytest.raises(
ValueError, match="DummyPredictor does not support GPU prediction"
):
_ = batch_predictor.predict(test_dataset, num_gpus_per_worker=1)
ds = batch_predictor.predict(test_dataset, num_gpus_per_worker=1)
Contributor Author

This is the major behavior change to call out: batch_predictor.predict() is now lazy and does not force execution. Users need to call batch_predictor.predict().fully_executed() to force the prediction to actually run.

The pro is that we can now chain multiple predictors for free: batch_predictor2.predict(batch_predictor1.predict()).

The con is that this will likely be a surprising behavior change for current users.

If the benefit of chaining is not significant, we can change this to force execution inside batch_predictor.predict().

cc @ericl, @amogkam.
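As a toy illustration of this trade-off (not the real BatchPredictor API; all names here are simplified stand-ins), a lazy predict() merely records work, which is what makes predictor chaining free:

```python
# Toy sketch: predict() returns a lazy dataset with a pending stage, so two
# predictors can be chained without materializing the intermediate result.

class LazyDs:
    def __init__(self, blocks, stages=()):
        self.blocks, self.stages = blocks, tuple(stages)

    def map(self, fn):
        return LazyDs(self.blocks, self.stages + (fn,))

    def fully_executed(self):
        # Force execution of all pending stages.
        out = list(self.blocks)
        for fn in self.stages:
            out = [fn(b) for b in out]
        return out

class ToyPredictor:
    def __init__(self, fn):
        self.fn = fn

    def predict(self, ds):
        # Lazy variant: just append a stage, no computation yet.
        return ds.map(self.fn)

p1 = ToyPredictor(lambda x: x + 1)
p2 = ToyPredictor(lambda x: x * 10)
chained = p2.predict(p1.predict(LazyDs([1, 2])))
print(chained.fully_executed())  # [20, 30]
```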

Contributor

@amogkam amogkam Jan 5, 2023

I'm strongly in favor of making the default behavior call fully_executed inside predict, at least for now.

Laziness for the chained case is something we can handle separately.

Contributor

@ericl ericl Jan 5, 2023

Let's force execution inside batch predictor? Chaining is not really a use case here, and you can always fall back to using the Data API directly.

Contributor Author

Cool, let me make the change to force execution inside predictor.

Contributor Author

Updated to force execution inside batch predictor.

Contributor

@clarkzinzow clarkzinzow left a comment

Mostly LGTM. The big thing we need to resolve, IMO, is the semi-lazy read behavior. Semi-lazy reading (eagerly reading the first block/file) has been confusing for users and results in redundant reading when there's stage fusion or conversion to a pipeline, so I think we should try to get rid of it when switching to lazy execution by default, if possible. It would be much better to have a fully lazy mode and an eager mode, where semi-lazy execution is an execution optimization enabled only when we determine we need to compute a subset of blocks; these semantics should translate well to our new execution planner and streaming execution model (e.g. limit pushdown, metadata peeking, streaming consumption of narrow op chains).

Instead of always computing the first block, I think that we should make the read fully lazy, and if the user immediately calls ds.schema() right after reading, we only then trigger reading of the first block (and only if the schema isn't already available from e.g. file metadata).

I think it should be pretty straightforward to move this lazy_block_list.compute_first_block()/progressive computation logic to the ExecutionPlan, since we can have ExecutionPlan.schema() (and other ops that only require the first block) trigger semi-lazy execution with e.g. a certain flag set, and if that flag is set AND the plan only consists of a read stage AND the input blocks are a lazy block list, we trigger minimal computation of that read stage.
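A rough sketch of that idea, under assumed names (ToyPlan stands in for ExecutionPlan; this is not the actual Ray code): schema() triggers computation of only the first read task, and only when metadata is missing:

```python
# Toy sketch of moving "compute only the first block" into the plan: the
# dataset is fully lazy by default, and schema() performs minimal read work
# only when asked to fetch missing metadata.

class ToyPlan:
    def __init__(self, read_tasks, known_schema=None):
        self._read_tasks = read_tasks  # callables producing (block, schema)
        self._schema = known_schema    # e.g. already known from file metadata
        self._computed = {}            # cache of executed read tasks

    def schema(self, fetch_if_missing=False):
        if self._schema is None and fetch_if_missing:
            # Semi-lazy optimization: execute just the first read task.
            block, schema = self._read_tasks[0]()
            self._computed[0] = block
            self._schema = schema
        return self._schema

tasks = [lambda: ([{"a": 1}], {"a": "int64"})]
plan = ToyPlan(tasks)
print(plan.schema())                       # None: stays fully lazy
print(plan.schema(fetch_if_missing=True))  # {'a': 'int64'}
```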

@@ -580,7 +580,7 @@ def train_func(config):
read_dataset(data_path)
)

num_columns = len(train_dataset.schema().names)
num_columns = len(train_dataset.schema(fetch_if_missing=True).names)
Contributor

Should we change the fetch_if_missing default value to True in ds.schema()? I.e. should we transparently trigger execution when fetching metadata on a lazy dataset?

Contributor Author

should we transparently trigger execution when fetching metadata on a lazy dataset?

Yes, agreed. Do we want to do this in a separate PR? #31286 (comment)

Contributor

Yep doing that in a separate PR sounds good!

Comment on lines 338 to 339
block_list.compute_first_block()
block_list.ensure_metadata_for_first_block()
Contributor

IMO we should get rid of this default behavior of always computing the first block and ensuring the metadata for the first block, and instead make the dataset fully lazy by default. We can still progressively launch read tasks to e.g. fetch the schema, see the first few rows, streaming iteration directly on the read, etc. We'd just move that logic into ExecutionPlan.schema().

Contributor

Hmm, that seems fine as long as we still resolve the metadata for parquet and so on. This would only apply to JSON/CSV presumably. Maybe we can also add an optimized schema resolver for these file types that peeks at the header of the file only.

Contributor Author

Yes, agreed. We should improve the schema resolver in the long term, and I fully agree with getting rid of always computing the first block. Do we want to do it in a separate PR? #31286 (comment)

clarkzinzow added a commit that referenced this pull request Jan 5, 2023
This PR fixes the issue found in #31286. Previously, we always eagerly cleared up non-lazy input blocks (plan._in_blocks) when executing the plan. This is not safe, as the input blocks might be used by downstream operations later.

Signed-off-by: Cheng Su <scnju13@gmail.com>
Co-authored-by: Clark Zinzow <clark@anyscale.com>
Contributor

ericl commented Jan 6, 2023

FWIW, getting rid of the first-block reading would also help with integrating fully streaming execution (right now it actually breaks streaming). How about we do that as a separate follow-up PR, though? This PR is already a pretty extensive change, and we should generally avoid mixing complex changes.

Contributor Author

c21 commented Jan 6, 2023

FWIW, getting rid of the first-block reading would also help with integrating fully streaming execution (right now it actually breaks streaming). How about we do that as a separate follow-up PR, though? This PR is already a pretty extensive change, and we should generally avoid mixing complex changes.

SGTM. @clarkzinzow - WDYT?

btw I will make the batch_predictor.predict change in this PR.

Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: Cheng Su <scnju13@gmail.com>
Contributor

clarkzinzow commented Jan 6, 2023

How about we do that as a separate PR followup from this though?

@ericl @c21 As long as it's done as a P0 follow-up PR that we're sure will get in before the next release, that sounds good to me! I would normally say that we shouldn't enable lazy execution by default in master until we have the fully lazy semantics, but since we're actively iterating on the execution model and we have a good bit of time before the release, we can be pragmatic here.

In hindsight, it probably would have been better to do the following sequence of PRs:

  1. Move semi-lazy reading from eager computation of the first block in read_datasource to a metadata-peeking and streaming-read optimization in the ExecutionPlan, which would keep the existing "eagerly compute first block" semantics for eager mode and would make lazy mode fully lazy.
  2. Enable lazy execution by default.
  3. Port metadata peeking optimization to plan optimization + streaming executor, since the policy of "only compute as many blocks as needed for operation" is really just a pushdown optimization rule + streaming execution.

Contributor

@clarkzinzow clarkzinzow left a comment

LGTM! Are we going to update documentation and/or improve the repr in this PR?

Contributor Author

c21 commented Jan 6, 2023

As long as it's done as a P0 follow-up PR that we're sure will get in before the next release, that sounds good to me! I would normally say that we shouldn't enable lazy execution by default in master until we have the fully lazy semantics, but since we're actively iterating on the execution model and we have a good bit of time before the release, we can be pragmatic here.

@clarkzinzow - yeah, agreed. The TODOs for this PR (1. remove the behavior of eagerly computing the first block for reads, 2. improve the str/repr of Dataset, 3. update the documentation) are definitely P0, and I will work on them immediately next week.

@ericl ericl merged commit 9cb9c0e into ray-project:master Jan 6, 2023
@c21 c21 deleted the lazy branch January 6, 2023 21:39
AmeerHajAli pushed a commit that referenced this pull request Jan 12, 2023
This PR fixes the issue found in #31286. Previously, we always eagerly cleared up non-lazy input blocks (plan._in_blocks) when executing the plan. This is not safe, as the input blocks might be used by downstream operations later.

Signed-off-by: Cheng Su <scnju13@gmail.com>
Co-authored-by: Clark Zinzow <clark@anyscale.com>
AmeerHajAli pushed a commit that referenced this pull request Jan 12, 2023
This PR is to enable lazy execution by default. See ray-project/enhancements#19 for motivation. The change includes:
* Change `Dataset` constructor: `Dataset.__init__(lazy: bool = True)`. Also remove `defer_execution` field, as it's no longer needed.
* `read_api.py:read_datasource()` returns a lazy `Dataset`, while still eagerly computing the first input block.
* Add `ds.fully_executed()` calls to required unit tests, to make sure they are passing.

TODO:
- [x] Fix all unit tests
- [x] #31459
- [x] #31460 
- [ ] Remove the behavior to eagerly compute first block for read
- [ ] #31417
- [ ] Update documentation
clarkzinzow pushed a commit that referenced this pull request Jan 19, 2023
…1460)

This is a followup from #31286 (comment); here we audit all data nightly tests to make sure they still work with lazy execution enabled by default.

Signed-off-by: Cheng Su <scnju13@gmail.com>
andreapiso pushed a commit to andreapiso/ray that referenced this pull request Jan 22, 2023
…y-project#31460)

This is a followup from ray-project#31286 (comment); here we audit all data nightly tests to make sure they still work with lazy execution enabled by default.

Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: Andrea Pisoni <andreapiso@gmail.com>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
This PR fixes the issue found in ray-project#31286. Previously, we always eagerly cleared up non-lazy input blocks (plan._in_blocks) when executing the plan. This is not safe, as the input blocks might be used by downstream operations later.

Signed-off-by: Cheng Su <scnju13@gmail.com>
Co-authored-by: Clark Zinzow <clark@anyscale.com>
Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
This PR is to enable lazy execution by default. See ray-project/enhancements#19 for motivation. The change includes:
* Change `Dataset` constructor: `Dataset.__init__(lazy: bool = True)`. Also remove `defer_execution` field, as it's no longer needed.
* `read_api.py:read_datasource()` returns a lazy `Dataset`, while still eagerly computing the first input block.
* Add `ds.fully_executed()` calls to required unit tests, to make sure they are passing.

TODO:
- [x] Fix all unit tests
- [x] ray-project#31459
- [x] ray-project#31460 
- [ ] Remove the behavior to eagerly compute first block for read
- [ ] ray-project#31417
- [ ] Update documentation

Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
This PR fixes the issue found in ray-project#31286. Previously, we always eagerly cleared up non-lazy input blocks (plan._in_blocks) when executing the plan. This is not safe, as the input blocks might be used by downstream operations later.

Signed-off-by: Cheng Su <scnju13@gmail.com>
Co-authored-by: Clark Zinzow <clark@anyscale.com>
Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023
This PR is to enable lazy execution by default. See ray-project/enhancements#19 for motivation. The change includes:
* Change `Dataset` constructor: `Dataset.__init__(lazy: bool = True)`. Also remove `defer_execution` field, as it's no longer needed.
* `read_api.py:read_datasource()` returns a lazy `Dataset`, while still eagerly computing the first input block.
* Add `ds.fully_executed()` calls to required unit tests, to make sure they are passing.

TODO:
- [x] Fix all unit tests
- [x] ray-project#31459
- [x] ray-project#31460 
- [ ] Remove the behavior to eagerly compute first block for read
- [ ] ray-project#31417
- [ ] Update documentation

Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
cassidylaidlaw pushed a commit to cassidylaidlaw/ray that referenced this pull request Mar 28, 2023
…y-project#31460)

This is a followup from ray-project#31286 (comment); here we audit all data nightly tests to make sure they still work with lazy execution enabled by default.

Signed-off-by: Cheng Su <scnju13@gmail.com>