[data] streaming generator integration #37736

Merged 1 commit on Aug 11, 2023
12 changes: 8 additions & 4 deletions python/ray/data/_internal/execution/bulk_executor.py
@@ -98,16 +98,20 @@ def _naive_run_until_complete(op: PhysicalOperator) -> List[RefBundle]:
         The list of output ref bundles for the operator.
     """
     output = []
-    tasks = op.get_work_refs()
+    tasks = op.get_active_tasks()
     if tasks:
         bar = ProgressBar(op.name, total=op.num_outputs_total())
         while tasks:
+            waitable_to_tasks = {task.get_waitable(): task for task in tasks}
             done, _ = ray.wait(
-                tasks, num_returns=len(tasks), fetch_local=True, timeout=0.1
+                list(waitable_to_tasks.keys()),
+                num_returns=len(tasks),
+                fetch_local=True,
+                timeout=0.1,
             )
             for ready in done:
-                op.notify_work_completed(ready)
-                tasks = op.get_work_refs()
+                waitable_to_tasks[ready].on_waitable_ready()
+                tasks = op.get_active_tasks()
             while op.has_next():
                 bar.update(1)
                 output.append(op.get_next())
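The reworked wait loop above keys each waitable by its `OpTask` and routes readiness events through `task.on_waitable_ready()`, instead of calling back into the operator with a raw `ObjectRef`. A minimal, self-contained sketch of the same pattern (the `echo` task and `handlers` dict are hypothetical, for illustration only):

```python
import ray

ray.init()


@ray.remote
def echo(i: int) -> int:
    return i


# Analogous to: waitable_to_tasks = {task.get_waitable(): task for task in tasks}
handlers = {echo.remote(i): (lambda i=i: print(f"task {i} is ready")) for i in range(3)}
pending = list(handlers.keys())
while pending:
    done, pending = ray.wait(pending, num_returns=len(pending), timeout=0.1)
    for ref in done:
        handlers[ref]()  # analogous to: waitable_to_tasks[ready].on_waitable_ready()
```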
136 changes: 114 additions & 22 deletions python/ray/data/_internal/execution/interfaces/physical_operator.py
@@ -1,14 +1,118 @@
-from typing import Callable, Dict, List, Optional
+from abc import ABC, abstractmethod
+from typing import Callable, Dict, List, Optional, Union
 
 import ray
 from .ref_bundle import RefBundle
+from ray._raylet import StreamingObjectRefGenerator
 from ray.data._internal.execution.interfaces.execution_options import (
     ExecutionOptions,
     ExecutionResources,
 )
 from ray.data._internal.logical.interfaces import Operator
 from ray.data._internal.stats import StatsDict
 
+# TODO(hchen): Ray Core should have a common interface for these two types.
+Waitable = Union[ray.ObjectRef, StreamingObjectRefGenerator]
+
+
+class OpTask(ABC):
+    """Abstract class that represents a task that is created by a PhysicalOperator.
+
+    The task can be either a regular task or an actor task.
+    """
+
+    @abstractmethod
+    def get_waitable(self) -> Waitable:
+        """Return the ObjectRef or StreamingObjectRefGenerator to wait on."""
+        pass
+
+    @abstractmethod
+    def on_waitable_ready(self):
+        """Called when the waitable is ready.
+
+        This method may get called multiple times if the waitable is a
+        streaming generator.
+        """
+        pass
+
+
+class DataOpTask(OpTask):
+    """Represents an OpTask that handles Block data."""
+
+    def __init__(
+        self,
+        streaming_gen: StreamingObjectRefGenerator,
+        output_ready_callback: Callable[[RefBundle], None],
+        task_done_callback: Callable[[], None],
+    ):
+        """
+        Args:
+            streaming_gen: The streaming generator of this task. It should yield blocks.
+            output_ready_callback: The callback to call when a new RefBundle is output
+                from the generator.
+            task_done_callback: The callback to call when the task is done.
+        """
+        # TODO(hchen): Right now, the streaming generator is required to yield a Block
+        # and a BlockMetadata each time. We should unify task submission with a unified
+        # interface, so each individual operator doesn't need to take care of the
+        # BlockMetadata.
+        self._streaming_gen = streaming_gen
+        self._output_ready_callback = output_ready_callback
+        self._task_done_callback = task_done_callback
+
+    def get_waitable(self) -> StreamingObjectRefGenerator:
+        return self._streaming_gen
+
+    def on_waitable_ready(self):
+        # Handle all the available outputs of the streaming generator.
+        while True:
+            try:
+                block_ref = self._streaming_gen._next_sync(0)
+                if block_ref.is_nil():
+                    # The generator currently doesn't have new output.
+                    # And it's not stopped yet.
+                    return
+            except StopIteration:
+                self._task_done_callback()
+                return
+
+            try:
+                meta = ray.get(next(self._streaming_gen))
+            except StopIteration:
+                # The generator should always yield 2 values (block and metadata)
+                # each time. If we get a StopIteration here, it means an error
+                # happened in the task, and in this case the block_ref is the
+                # exception object.
+                # TODO(hchen): Ray Core should have a better interface for
+                # detecting and obtaining the exception.
+                ex = ray.get(block_ref)
+                self._task_done_callback()
+                raise ex
+            self._output_ready_callback(
+                RefBundle([(block_ref, meta)], owns_blocks=True)
+            )
+
+
+class MetadataOpTask(OpTask):
+    """Represents an OpTask that only handles metadata, instead of Block data."""
+
+    def __init__(
+        self, object_ref: ray.ObjectRef, task_done_callback: Callable[[], None]
+    ):
+        """
+        Args:
+            object_ref: The ObjectRef of the task.
+            task_done_callback: The callback to call when the task is done.
+        """
+        self._object_ref = object_ref
+        self._task_done_callback = task_done_callback
+
+    def get_waitable(self) -> ray.ObjectRef:
+        return self._object_ref
+
+    def on_waitable_ready(self):
+        self._task_done_callback()
+
+
 class PhysicalOperator(Operator):
     """Abstract class for physical operators.
@@ -63,7 +167,7 @@ def completed(self) -> bool:
"""
return (
self._inputs_complete
and len(self.get_work_refs()) == 0
and self.num_active_tasks() == 0
and not self.has_next()
) or self._dependents_complete

@@ -176,13 +280,16 @@ def get_next(self) -> RefBundle:
         """
         raise NotImplementedError
 
-    def get_work_refs(self) -> List[ray.ObjectRef]:
-        """Get a list of object references the executor should wait on.
+    def get_active_tasks(self) -> List[OpTask]:
+        """Get a list of the active tasks of this operator."""
+        return []
+
+    def num_active_tasks(self) -> int:
+        """Return the number of active tasks.
 
-        When a reference becomes ready, the executor must call
-        `notify_work_completed(ref)` to tell this operator of the state change.
+        Subclasses can override this as a performance optimization.
         """
-        return []
+        return len(self.get_active_tasks())
 
     def throttling_disabled(self) -> bool:
         """Whether to disable resource throttling for this operator.
@@ -193,28 +300,13 @@ def throttling_disabled(self) -> bool:
         """
         return False
 
-    def num_active_work_refs(self) -> int:
-        """Return the number of active work refs.
-
-        Subclasses can override this as a performance optimization.
-        """
-        return len(self.get_work_refs())
-
     def internal_queue_size(self) -> int:
         """If the operator has an internal input queue, return its size.
 
         This is used to report tasks pending submission to actor pools.
        """
         return 0
 
-    def notify_work_completed(self, work_ref: ray.ObjectRef) -> None:
-        """Executor calls this when the given work is completed and local.
-
-        This must be called as soon as the operator is aware that `work_ref` is
-        ready.
-        """
-        raise NotImplementedError
-
     def shutdown(self) -> None:
         """Abort execution and release all resources used by this operator.
 
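To make the new task interface concrete: `DataOpTask` wraps a streaming generator that must yield a block followed by its `BlockMetadata`, and the executor drives it via `get_waitable()`/`on_waitable_ready()`. Below is a hedged sketch of that wiring; `produce_blocks` and its toy outputs are hypothetical, while `DataOpTask` and `BlockMetadata` are the names introduced in this file:

```python
import ray
from ray.data._internal.execution.interfaces.physical_operator import DataOpTask
from ray.data.block import BlockMetadata

ray.init()


@ray.remote
def produce_blocks(n: int):
    # DataOpTask consumes outputs in (block, metadata) pairs, so the generator
    # must always yield a block and then its BlockMetadata.
    for i in range(n):
        yield [i]  # placeholder block; real operators yield Arrow/pandas blocks
        yield BlockMetadata(
            num_rows=1, size_bytes=8, schema=None, input_files=None, exec_stats=None
        )


gen = produce_blocks.options(num_returns="streaming").remote(3)
task = DataOpTask(
    streaming_gen=gen,
    output_ready_callback=lambda bundle: print("bundle ready:", bundle),
    task_done_callback=lambda: print("task done"),
)
# An executor ray.wait()s on task.get_waitable() and calls
# task.on_waitable_ready() each time the generator makes progress; the
# callbacks above then fire once per output bundle and once at completion.
```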
python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
@@ -1,9 +1,8 @@
 import collections
 from dataclasses import dataclass
-from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, Iterator, List, Optional, Union
 
 import ray
-from ray._raylet import ObjectRefGenerator
 from ray.data._internal.compute import ActorPoolStrategy
 from ray.data._internal.dataset_logger import DatasetLogger
 from ray.data._internal.execution.interfaces import (
@@ -14,11 +13,7 @@
     RefBundle,
     TaskContext,
 )
-from ray.data._internal.execution.operators.map_operator import (
-    MapOperator,
-    _map_task,
-    _TaskState,
-)
+from ray.data._internal.execution.operators.map_operator import MapOperator, _map_task
 from ray.data._internal.execution.util import locality_string
 from ray.data.block import Block, BlockMetadata, _CallableClassProtocol
 from ray.data.context import DataContext
@@ -83,11 +78,6 @@ def __init__(

         # Create autoscaling policy from compute strategy.
         self._autoscaling_policy = autoscaling_policy
-        # A map from task output futures to task state and the actor on which its
-        # running.
-        self._tasks: Dict[
-            ObjectRef[ObjectRefGenerator], Tuple[_TaskState, ray.actor.ActorHandle]
-        ] = {}
         # A pool of running actors on which we can execute mapper tasks.
         self._actor_pool = _ActorPool(autoscaling_policy._config.max_tasks_in_flight)
         # A queue of bundles awaiting dispatch to actors.
@@ -96,7 +86,6 @@
         self._cls = None
         # Whether no more submittable bundles will be added.
         self._inputs_done = False
-        self._next_task_idx = 0
 
     def get_init_fn(self) -> Callable[[], None]:
         return self._init_fn
@@ -143,7 +132,23 @@ def _start_actor(self):
         assert self._cls is not None
         ctx = DataContext.get_current()
         actor = self._cls.remote(ctx, src_fn_name=self.name, init_fn=self._init_fn)
-        self._actor_pool.add_pending_actor(actor, actor.get_location.remote())
+        res_ref = actor.get_location.remote()
+
+        def _task_done_callback(res_ref):
+            # res_ref is a future for a now-ready actor; move actor from pending to the
+            # active actor pool.
+            has_actor = self._actor_pool.pending_to_running(res_ref)
+            if not has_actor:
+                # Actor has already been killed.
+                return
+            # A new actor has started; try to dispatch queued tasks.
+            self._dispatch_tasks()
+
+        self._submit_metadata_task(
+            res_ref,
+            lambda: _task_done_callback(res_ref),
+        )
+        self._actor_pool.add_pending_actor(actor, res_ref)
 
     def _add_bundled_input(self, bundle: RefBundle):
         self._bundle_queue.append(bundle)
@@ -170,14 +175,25 @@ def _dispatch_tasks(self):
             # Submit the map task.
             bundle = self._bundle_queue.popleft()
             input_blocks = [block for block, _ in bundle.blocks]
-            ctx = TaskContext(task_idx=self._next_task_idx)
-            ref = actor.submit.options(num_returns="dynamic", name=self.name).remote(
+            ctx = TaskContext(task_idx=self._next_data_task_idx)
+            gen = actor.submit.options(num_returns="streaming", name=self.name).remote(
                 self._transform_fn_ref, ctx, *input_blocks
             )
-            self._next_task_idx += 1
-            task = _TaskState(bundle)
-            self._tasks[ref] = (task, actor)
-            self._handle_task_submitted(task)
+
+            def _task_done_callback(actor_to_return):
+                # Return the actor that was running the task to the pool.
+                self._actor_pool.return_actor(actor_to_return)
+                # Dispatch more tasks.
+                self._dispatch_tasks()
+
+            # For some reason, if we don't define a new variable `actor_to_return`,
+            # the following lambda won't capture the correct `actor` variable.
+            actor_to_return = actor
+            self._submit_data_task(
+                gen,
+                bundle,
+                lambda: _task_done_callback(actor_to_return),
+            )
 
         # Needed in the bulk execution path for triggering autoscaling. This is a
         # no-op in the streaming execution case.
@@ -212,28 +228,6 @@ def _scale_down_if_needed(self):
                 # break out of the scale-down loop.
                 break
 
-    def notify_work_completed(
-        self, ref: Union[ObjectRef[ObjectRefGenerator], ray.ObjectRef]
-    ):
-        # This actor pool MapOperator implementation has both task output futures AND
-        # worker started futures to handle here.
-        if ref in self._tasks:
-            # Get task state and set output.
-            task, actor = self._tasks.pop(ref)
-            task.output = self._map_ref_to_ref_bundle(ref)
-            self._handle_task_done(task)
-            # Return the actor that was running the task to the pool.
-            self._actor_pool.return_actor(actor)
-        else:
-            # ref is a future for a now-ready actor; move actor from pending to the
-            # active actor pool.
-            has_actor = self._actor_pool.pending_to_running(ref)
-            if not has_actor:
-                # Actor has already been killed.
-                return
-        # For either a completed task or ready worker, we try to dispatch queued tasks.
-        self._dispatch_tasks()
-
     def all_inputs_done(self):
         # Call base implementation to handle any leftover bundles. This may or may not
         # trigger task dispatch.
@@ -285,15 +279,6 @@ def shutdown(self):
"the parallelism when creating the Dataset."
)

def get_work_refs(self) -> List[ray.ObjectRef]:
# Work references that we wish the executor to wait on includes both task
# futures AND worker ready futures.
return list(self._tasks.keys()) + self._actor_pool.get_pending_actor_refs()

def num_active_work_refs(self) -> int:
# Active work references only includes running tasks, not pending actor starts.
return len(self._tasks)

def progress_str(self) -> str:
base = f"{self._actor_pool.num_running_actors()} actors"
pending = self._actor_pool.num_pending_actors()
Expand Down
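A side note on the `# For some reason...` comment in `_dispatch_tasks`: this is most likely Python's late-binding closures, where a lambda resolves free variables when it is called, not when it is defined, so a lambda created in a loop sees whatever the variable holds at call time. A standalone illustration (not PR code), with the usual default-argument fix:

```python
# Late-binding closures: all three lambdas share the loop variable, so by the
# time they run it holds the last value.
callbacks = [lambda: print(name) for name in ("a", "b", "c")]
for cb in callbacks:
    cb()  # prints "c" three times

# Binding the current value as a default argument freezes it per lambda.
callbacks = [lambda name=name: print(name) for name in ("a", "b", "c")]
for cb in callbacks:
    cb()  # prints "a", "b", "c"
```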