
[data] streaming generator integration #37736

Merged
merged 1 commit into ray-project:master on Aug 11, 2023

Conversation

@raulchen (Contributor) commented on Jul 24, 2023

Why are these changes needed?

This PR changes MapOperator to use streaming generators, and moves the common task-management code from TaskPoolMapOperator and ActorPoolMapOperator into the base MapOperator class.

Note: ideally, all operators should use streaming generators, but that requires a larger refactor, which will be done in follow-up PRs (see #37630).
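For readers unfamiliar with the pattern, below is a minimal, hypothetical sketch of a streaming-generator task (not the actual Ray Data code; the task name, the lambda, and the placeholder metadata dict are illustrative, and it assumes the num_returns="streaming" option available at the time of this PR). The task yields each output as soon as it is produced, and the caller consumes the resulting object refs incrementally:

```python
import ray

ray.init()

# Hypothetical map task declared as a streaming generator: outputs are
# yielded one at a time instead of returned all at once when the task ends.
@ray.remote(num_returns="streaming")
def map_task(blocks, fn):
    for block in blocks:
        out = fn(block)
        yield out                     # the output block
        yield {"num_rows": len(out)}  # placeholder metadata for the block

gen = map_task.remote([[1, 2], [3, 4]], lambda b: [x * 2 for x in b])
# Consume (block, metadata) ref pairs as they become ready.
block_ref, meta_ref = next(gen), next(gen)
print(ray.get(block_ref), ray.get(meta_ref))
```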

Related issue number

#36444

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@@ -332,6 +335,27 @@ def process_completed_tasks(topology: Topology) -> None:
op = active_tasks.pop(ref)
op.notify_work_completed(ref)

if active_streaming_gens:
ready, _ = ray.wait(
@ericl (Contributor) commented on Jul 25, 2023:

Probably need to ray.wait() these together with the active_tasks for efficiency, right?

@raulchen (Author) replied:

Yup. I plan to convert everything to streaming gen; then there will be only one ray.wait.

@raulchen (Author) added:

Found that ActorPoolMapOperator still needs to use regular ObjectRefs. Unified them in one single ray.wait.
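As a rough illustration of that unification (a sketch with illustrative names, not the merged code), regular task ObjectRefs and streaming generators can be passed to one ray.wait call, and each ready waitable is then dispatched back to its operator:

```python
# `active_tasks` maps ObjectRef -> operator; `active_streaming_gens` maps
# StreamingObjectRefGenerator -> operator. Both names are illustrative.
waitables = list(active_tasks.keys()) + list(active_streaming_gens.keys())
ready, _ = ray.wait(
    waitables,
    num_returns=len(waitables),  # return everything that is already ready
    fetch_local=False,
    timeout=0.1,
)
for waitable in ready:
    if waitable in active_streaming_gens:
        # For a streaming generator, "ready" means its next output can be
        # consumed without blocking; the generator stays active until exhausted.
        active_streaming_gens[waitable].on_waitable_ready()
    else:
        op = active_tasks.pop(waitable)
        op.notify_work_completed(waitable)
```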

@raulchen changed the title from "[data] streaming generator integration prototype" to "[wip][data] streaming generator integration" on Aug 2, 2023
@raulchen changed the title from "[wip][data] streaming generator integration" to "[data] streaming generator integration" on Aug 3, 2023
# The generator should always yield 2 values (block and metadata) each time.
# If we get a StopIteration here, it means an error happened in the task.
# And in this case, the block_ref is the exception object.
# TODO(hchen): Ray Core should have a better interface for detecting and
@raulchen (Author) commented:

@rkooo567 It looks like a bug that when retry_exceptions=True, gen._generator_task_exception is empty. So I have to use this workaround.
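For reference, a minimal sketch of the workaround being discussed (illustrative, not the exact merged code): the caller expects a (block, metadata) ref pair per output, and a StopIteration right after the first ref signals that the task failed, in which case block_ref holds the error:

```python
block_ref = next(gen)
try:
    meta_ref = next(gen)
except StopIteration:
    # Only one value came back: the task failed and block_ref points at the
    # error, so fetching it surfaces the task's exception to the caller.
    ray.get(block_ref)
```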

@stephanie-wang (Contributor) commented:

Should we put this under a feature flag? A bit worried about regressions like the retry_exceptions thing.

@raulchen (Author) commented on Aug 4, 2023:

> Should we put this under a feature flag? A bit worried about regressions like the retry_exceptions thing.

The change is a bit too large to put under a feature flag. I think this should be fine. For correctness issues like the retry_exceptions param, unit tests should cover them. For perf impact, I'll double-check the results of all benchmarks before merging.

@raulchen raulchen requested a review from a team as a code owner August 5, 2023 00:10
@aslonnie (Collaborator) left a comment:

(PR seems not related to CI or release test infra.)

@@ -165,6 +165,7 @@ def main(
or os.environ.get("BUILDKITE_SOURCE", "manual") == "schedule"
or buildkite_branch.startswith("releases/")
)
report = 1
A collaborator commented:

this will revert, right?

@raulchen (Author) replied:

Yes, I wanted to manually run the release tests and get all results. @can-anyscale said that only the master and release branches report results, so I had to temporarily change this.
BTW, is there a reason why we don't report results for other branches? It'd be great to avoid doing this manually.

@raulchen (Author) commented on Aug 6, 2023:

All benchmark results (buildkite link). The first test_time/throughput pair of columns is this branch; the second pair is master. tfrecords-images-100-2048 appears to have regressed; otherwise the results look okay. (Update: reran tfrecords-images-100-2048 and perf is the same as master, so the test itself likely has some fluctuation.)

| release_test_name | test_case_name | test_time (this PR) | throughput (this PR) | test_time (master) | throughput (master) |
| --- | --- | --- | --- | --- | --- |
| read_images_benchmark_single_node.aws | images-100-2048-rbg-jpg | 1.3745465278625500 | | 1.3416013717651400 | |
| read_images_benchmark_single_node.aws | images-100-2048-to-256-rbg-jpg | 1.0301158428192100 | | 1.0421686172485400 | |
| read_images_benchmark_single_node.aws | images-100-256-rbg-jpg | 2.507178783416750 | | 2.478651285171510 | |
| read_images_benchmark_single_node.aws | images-1000-mix | 0.45880886912345900 | | 0.4426524043083190 | |
| read_images_benchmark_single_node.aws | images-imagenet-1g | 17.029926300048800 | | 13.084310531616200 | |
| read_images_comparison_microbenchmark_single_node.aws | ray_data | | 243.68563842773400 | | 245.6484375 |
| read_images_comparison_microbenchmark_single_node.aws | ray_data_manual_load | | 332.1719055175780 | | 333.2369384765630 |
| read_images_comparison_microbenchmark_single_node.aws | ray_data+dummy_np_transform | | 202.9370574951170 | | 201.55650329589800 |
| read_images_comparison_microbenchmark_single_node.aws | ray_data+dummy_pyarrow_transform | | 223.16030883789100 | | 225.13258361816400 |
| read_images_comparison_microbenchmark_single_node.aws | ray_data+transform | | 115.82697296142600 | | 114.25790405273400 |
| read_images_comparison_microbenchmark_single_node.aws | ray_data+transform+zerocopy | | 123.05477142334000 | | 123.59640502929700 |
| read_parquet_benchmark_single_node.aws | read-many-parquet-files-s3-1000-gzip | 57.551910400390600 | | 57.0578727722168 | |
| read_parquet_benchmark_single_node.aws | read-parquet-downsampled-nyc-taxi-2009-1-False | 2.703848361968990 | | 3.088991403579710 | |
| read_parquet_benchmark_single_node.aws | read-parquet-downsampled-nyc-taxi-2009-1-True | 7.16847562789917 | | 6.88452672958374 | |
| read_parquet_benchmark_single_node.aws | read-parquet-downsampled-nyc-taxi-2009-2-False | 1.9989620447158800 | | 2.074420928955080 | |
| read_parquet_benchmark_single_node.aws | read-parquet-downsampled-nyc-taxi-2009-2-True | 1.5977108478546100 | | 1.6103323698043800 | |
| read_parquet_benchmark_single_node.aws | read-parquet-downsampled-nyc-taxi-2009-4-False | 1.3614615201950100 | | 1.4458932876586900 | |
| read_parquet_benchmark_single_node.aws | read-parquet-downsampled-nyc-taxi-2009-4-True | 1.9026415348053000 | | 2.0650837421417200 | |
| read_parquet_benchmark_single_node.aws | read-parquet-random-data-1024-gzip | 36.217491149902300 | | 35.28858947753910 | |
| read_parquet_benchmark_single_node.aws | read-parquet-random-data-1024-snappy | 29.211782455444300 | | 28.45860481262210 | |
| read_parquet_benchmark_single_node.aws | read-parquet-random-data-128-gzip | 9.592110633850100 | | 9.692739486694340 | |
| read_parquet_benchmark_single_node.aws | read-parquet-random-data-128-snappy | 7.027101993560790 | | 7.100552082061770 | |
| read_parquet_benchmark_single_node.aws | read-parquet-random-data-8-gzip | 4.916723728179930 | | 5.388830661773680 | |
| read_parquet_benchmark_single_node.aws | read-parquet-random-data-8-snappy | 3.794679641723630 | | 3.808880567550660 | |
| read_images_comparison_microbenchmark_single_node.aws | tf_data | | 202.03773498535200 | | 208.46630859375 |
| read_images_comparison_microbenchmark_single_node.aws | tf_data+transform | | 43.58201599121090 | | 42.46131134033200 |
| read_tfrecords_benchmark_single_node.aws | tfrecords-images-100-2048 | 2.122201442718510 | | 0.9023056030273440 | |
| read_tfrecords_benchmark_single_node.aws | tfrecords-images-100-256 | 0.1036926656961440 | | 0.1052628830075260 | |
| read_tfrecords_benchmark_single_node.aws | tfrecords-images-1000-mix | 0.18858177959919000 | | 0.1073770597577100 | |
| read_tfrecords_benchmark_single_node.aws | tfrecords-random-bytes-1g | 12.811789512634300 | | 12.847920417785600 | |
| read_tfrecords_benchmark_single_node.aws | tfrecords-random-float-1g | 127.39654541015600 | | 127.54745483398400 | |
| read_tfrecords_benchmark_single_node.aws | tfrecords-random-int-1g | 138.7222137451170 | | 137.96482849121100 | |
| read_images_comparison_microbenchmark_single_node.aws | torch | | 137.11990356445300 | | 142.5059356689450 |
| read_images_comparison_microbenchmark_single_node.aws | torch+transform | | 333.5290222167970 | | 335.42523193359400 |
| parquet_metadata_resolution.aws | | 33.86677169799810 | | 40.26236343383790 | |
| streaming_data_ingest_benchmark_1tb.aws | | 49.132667541503900 | | 45.40182876586910 | |
| streaming_data_ingest_benchmark_100gb_gpu_early_stop.aws | | 25.856122970581100 | | 22.30063247680660 | |
| streaming_data_ingest_benchmark_100gb_gpu.aws | | 36.999671936035200 | | 35.67235565185550 | |

@ericl (Contributor) left a comment:

Looks good to me at a high level, though I didn't do a detailed review. Consider breaking this down into a refactoring PR and the main change for streaming support.


@abstractmethod
def on_waitable_ready(self):
"""Called when the waitable is ready."""
A contributor commented:

This is called multiple times for the streaming refs right? Should clarify that in the comment.
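One possible wording for the clarified docstring (a sketch, not the merged text):

```python
@abstractmethod
def on_waitable_ready(self):
    """Called when the waitable is ready.

    If the waitable is a streaming generator, this is called once per
    output that becomes ready, i.e. potentially many times per task.
    """
```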

task = _TaskState(bundle)
self._tasks[ref] = (task, actor)
self._handle_task_submitted(task)
# Note, for some reaosn, if we don't define a new variable,
A contributor commented:

Suggested change
# Note, for some reaosn, if we don't define a new variable,
# Note, for some reason, if we don't define a new variable,

@@ -63,7 +69,12 @@ def __init__(
self._output_queue: _OutputQueue = None
# Output metadata, added to on get_next().
self._output_metadata: List[BlockMetadata] = []

# All active `DataOptasks`s.
A contributor commented:

Suggested change
# All active `DataOptasks`s.
# All active `DataOpTasks`s.


def _task_done_callback():
nonlocal task_index
nonlocal inputs
A contributor commented:

I think an alternate pattern to bind these variables correctly is the following:

def _task_done(task_index, inputs):
    ...
    
    
DataOpTask(..., lambda task_index=task_index, inputs=inputs: _task_done(task_index, inputs))

@raulchen (Author) replied:

Good suggestion!
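For context on why the default-argument trick helps: Python closures capture variables by reference, so a plain lambda created inside a loop sees the final loop values, while default arguments freeze the values at definition time. A generic illustration, independent of the Ray Data code:

```python
callbacks = []
for task_index in range(3):
    # Late binding: every lambda sees the *final* value of task_index.
    callbacks.append(lambda: task_index)
print([cb() for cb in callbacks])  # [2, 2, 2]

callbacks = []
for task_index in range(3):
    # Default-argument binding captures the value at definition time.
    callbacks.append(lambda task_index=task_index: task_index)
print([cb() for cb in callbacks])  # [0, 1, 2]
```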

@ericl added the @author-action-required label ("The PR author is responsible for the next step. Remove tag to send back to the reviewer.") on Aug 8, 2023
@raulchen (Author) commented on Aug 9, 2023:

While benchmarking a stress test, we found a race-condition bug in the streaming generator, which is being fixed by #38258.

@c21 (Contributor) left a comment:

LGTM

"""

@abstractmethod
def get_waitable(self) -> Union[ray.ObjectRef, StreamingObjectRefGenerator]:
A contributor commented:

nit: def get_waitable(self) -> Waitable?

Comment on lines +257 to +259
# 2. This method should only take a block-processing function as input,
# instead of a streaming generator. The logic of submitting ray tasks
# can also be encapsulated in the base class.
A contributor commented:

Yes, this makes more sense to me. Right now _submit_data_task is just doing bookkeeping here; the task has already been submitted when the gen is created.
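A rough sketch of the direction described in that TODO (hypothetical names and signature; the merged code still takes the generator directly):

```python
def _submit_data_task(self, map_fn, input_bundle):
    """Hypothetical future interface where the base class owns submission.

    `map_fn` processes the input blocks and yields output blocks; the base
    class wraps it in a streaming-generator Ray task and does the
    bookkeeping, so subclasses only supply the block-processing logic.
    """
    gen = self._map_task_remote_fn.options(num_returns="streaming").remote(
        map_fn, *input_bundle.blocks
    )
    self._record_task(gen, input_bundle)  # bookkeeping only (illustrative)
```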

Commit history (squashed into a single commit on merge; repeated "Signed-off-by: Hao Chen <chenh1024@gmail.com>" lines omitted for brevity):

- wip
- handle stop
- destroy
- wip
- runnable
- fix
- fix exception handling
- fix exception handling
- fix
- fix
- fix metrics
- optimize output queue memory
- fix exception handling
- reduce wait timeout
- fix destroy
- Revert "fix exception handling" (reverts commit d04e69b)
- fix metrics test
- fix bulk executor
- run_op_tasks_sync only_existing
- fix
- refine physcial_operator and map_operator
- fix
- refine actor map operator
- refine
- comment
- comment
- lint
- lint
- lint
- trace_allocation
- revert outqueue memory size
- handle all existing outputs
- lint
- fix streaming gen
- lambda
- comments
- Revert "fix streaming gen" (reverts commit a218cad0e9610e003c6b175951582527c574ade7)
- capture variable
- fix
- lint
@raulchen (Author) commented on Aug 11, 2023:

The CI failure (test_iter_batches_no_spilling_upon_prior_transformation) is an existing issue with DatasetPipeline, but somehow this PR makes it more likely to occur. I've figured out the fix; I'll merge this PR first and submit another PR to fix that issue today.

@raulchen raulchen merged commit 391dbbd into ray-project:master Aug 11, 2023
2 checks passed
@raulchen raulchen deleted the streaming-gen branch August 11, 2023 16:58
NripeshN pushed a commit to NripeshN/ray that referenced this pull request Aug 15, 2023
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
Labels
@author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer.
6 participants