[data] Inherit block size from downstream ops #41019

Merged · 27 commits · Nov 29, 2023

Changes from 21 commits

Commits (27)
a6fc9f2  first test, use task metrics first (stephanie-wang, Oct 25, 2023)
ed87288  More .schema() tests (stephanie-wang, Oct 25, 2023)
50027a7  Collect obj store and actor stats (stephanie-wang, Oct 26, 2023)
1144218  Test .limit() (stephanie-wang, Oct 26, 2023)
582b431  Basic block split test (stephanie-wang, Oct 27, 2023)
65aadd5  Merge remote-tracking branch 'upstream/master' into data-metrics-testing (stephanie-wang, Nov 3, 2023)
9763f43  merge (stephanie-wang, Nov 3, 2023)
791e0a1  Merge remote-tracking branch 'upstream/master' into data-metrics-testing (stephanie-wang, Nov 3, 2023)
6697f85  test (stephanie-wang, Nov 4, 2023)
28e0605  Merge remote-tracking branch 'upstream/master' into data-metrics-testing (stephanie-wang, Nov 6, 2023)
7a9a8ce  Merge remote-tracking branch 'upstream/master' into data-metrics-testing (stephanie-wang, Nov 6, 2023)
2e229a6  threads (stephanie-wang, Nov 6, 2023)
237c142  ci (stephanie-wang, Nov 7, 2023)
b66a90e  Merge branch 'data-metrics-testing' of github.com:stephanie-wang/ray … (stephanie-wang, Nov 7, 2023)
2337708  test (stephanie-wang, Nov 7, 2023)
52362dd  unit test (stephanie-wang, Nov 7, 2023)
2200478  detect parallelism as part of read op (stephanie-wang, Nov 8, 2023)
30079df  inherit target block sizes (stephanie-wang, Nov 8, 2023)
b4a4639  Merge remote-tracking branch 'upstream/master' into propagate-block-size (stephanie-wang, Nov 21, 2023)
cfcfdba  resolve comments (stephanie-wang, Nov 21, 2023)
9d4eecb  oops (stephanie-wang, Nov 21, 2023)
56b46a2  Fix (stephanie-wang, Nov 22, 2023)
7032bee  lint (stephanie-wang, Nov 27, 2023)
97d6991  x (stephanie-wang, Nov 27, 2023)
89977e9  fix test (stephanie-wang, Nov 28, 2023)
6171a68  union zip (stephanie-wang, Nov 28, 2023)
fde58c9  time (stephanie-wang, Nov 29, 2023)
14 changes: 14 additions & 0 deletions python/ray/data/_internal/logical/operators/read_operator.py
@@ -20,3 +20,17 @@ def __init__(
        self._datasource_or_legacy_reader = datasource_or_legacy_reader
        self._parallelism = parallelism
        self._mem_size = mem_size
        self._detected_parallelism = None

    def set_detected_parallelism(self, parallelism: int):
        """
        Set the true parallelism that should be used during execution. This
        should be detected by the optimizer.
        """
        self._detected_parallelism = parallelism

    def get_detected_parallelism(self) -> int:
        """
        Get the true parallelism that should be used during execution.
        """
        return self._detected_parallelism
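
The split of responsibilities here is that the optimizer resolves the requested parallelism and the planner only reads the resolved value. A minimal sketch of that handshake, using a stand-in class rather than the real Read operator:

```python
# Sketch only: FakeRead stands in for the real Read logical operator.
class FakeRead:
    def __init__(self, requested_parallelism: int = -1):
        self._parallelism = requested_parallelism
        self._detected_parallelism = None

    def set_detected_parallelism(self, parallelism: int):
        self._detected_parallelism = parallelism

    def get_detected_parallelism(self) -> int:
        return self._detected_parallelism


read_op = FakeRead(requested_parallelism=-1)

# Optimizer pass: resolve -1 ("auto") to a concrete value, hard-coded here.
detected = 200 if read_op._parallelism == -1 else read_op._parallelism
read_op.set_detected_parallelism(detected)

# Planner: only reads the value back; it must already be set.
parallelism = read_op.get_detected_parallelism()
assert parallelism is not None, "optimizer must run before planning"
print(parallelism)  # 200
```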
10 changes: 6 additions & 4 deletions python/ray/data/_internal/logical/optimizers.py
@@ -10,11 +10,12 @@
    add_user_provided_logical_rules,
    add_user_provided_physical_rules,
)
from ray.data._internal.logical.rules.inherit_target_max_block_size import (
    InheritTargetMaxBlockSizeRule,
)
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule
from ray.data._internal.logical.rules.randomize_blocks import ReorderRandomizeBlocksRule
from ray.data._internal.logical.rules.split_read_output_blocks import (
    SplitReadOutputBlocksRule,
)
from ray.data._internal.logical.rules.set_read_parallelism import SetReadParallelismRule
from ray.data._internal.logical.rules.zero_copy_map_fusion import (
    EliminateBuildOutputBlocks,
)
@@ -25,7 +26,8 @@
]

DEFAULT_PHYSICAL_RULES = [
    SplitReadOutputBlocksRule,
    InheritTargetMaxBlockSizeRule,
    SetReadParallelismRule,
    OperatorFusionRule,
    EliminateBuildOutputBlocks,
]
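
Rules run in list order, so InheritTargetMaxBlockSizeRule and SetReadParallelismRule execute before OperatorFusionRule sees the plan. A small sketch of sequential rule application, with toy classes standing in for the real Rule and PhysicalPlan interfaces:

```python
from typing import List, Type


class ToyPlan:
    """Toy plan object; the real PhysicalPlan carries the operator DAG."""

    def __init__(self):
        self.applied = []


class ToyRule:
    def apply(self, plan: ToyPlan) -> ToyPlan:
        plan.applied.append(type(self).__name__)
        return plan


class InheritBlockSize(ToyRule):
    pass


class SetReadParallelism(ToyRule):
    pass


class OperatorFusion(ToyRule):
    pass


def optimize(plan: ToyPlan, rules: List[Type[ToyRule]]) -> ToyPlan:
    # Each rule sees the plan produced by the previous one, so order matters:
    # block sizes are inherited and read parallelism is set before fusion
    # decides what it can merge.
    for rule_cls in rules:
        plan = rule_cls().apply(plan)
    return plan


plan = optimize(ToyPlan(), [InheritBlockSize, SetReadParallelism, OperatorFusion])
print(plan.applied)  # ['InheritBlockSize', 'SetReadParallelism', 'OperatorFusion']
```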
30 changes: 30 additions & 0 deletions python/ray/data/_internal/logical/rules/inherit_target_max_block_size.py (new file)
@@ -0,0 +1,30 @@
from typing import Optional

from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.logical.interfaces import PhysicalPlan, Rule


class InheritTargetMaxBlockSizeRule(Rule):
    """For each op that has overridden the default target max block size,
    propagate to upstream ops until we reach an op that has also overridden the
    target max block size."""

    def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
        self._propagate_target_max_block_size_to_upstream_ops(plan.dag)
        return plan

    def _propagate_target_max_block_size_to_upstream_ops(
        self, dag: PhysicalOperator, target_max_block_size: Optional[int] = None
    ):
        if dag.target_max_block_size is not None:
            # Set the target block size to inherit for
            # upstream ops.
            target_max_block_size = dag.target_max_block_size
        elif target_max_block_size is not None:
            # Inherit from downstream op.
            dag.target_max_block_size = target_max_block_size

        for upstream_op in dag.input_dependencies:
            self._propagate_target_max_block_size_to_upstream_ops(
                upstream_op, target_max_block_size
            )
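
The recursion stops overriding as soon as it reaches an op with its own explicit target. A standalone sketch of the same walk on a toy read -> map -> sort chain (ToyOp is illustrative; the real PhysicalOperator carries far more state):

```python
from typing import List, Optional


class ToyOp:
    """Toy stand-in for PhysicalOperator, holding only what the rule touches."""

    def __init__(self, name: str, target_max_block_size: Optional[int] = None,
                 inputs: Optional[List["ToyOp"]] = None):
        self.name = name
        self.target_max_block_size = target_max_block_size
        self.input_dependencies = inputs or []


def propagate(op: ToyOp, inherited: Optional[int] = None) -> None:
    # Same walk as the rule: a downstream override flows upstream until an
    # op with its own override is reached.
    if op.target_max_block_size is not None:
        inherited = op.target_max_block_size
    elif inherited is not None:
        op.target_max_block_size = inherited
    for upstream in op.input_dependencies:
        propagate(upstream, inherited)


# read -> map -> sort, where only the sort op overrides the block size.
read = ToyOp("read")
map_ = ToyOp("map", inputs=[read])
sort = ToyOp("sort", target_max_block_size=128 * 1024 * 1024, inputs=[map_])

propagate(sort)
print(map_.target_max_block_size, read.target_max_block_size)
# Both now inherit 134217728 from the downstream sort op.
```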
41 changes: 4 additions & 37 deletions python/ray/data/_internal/logical/rules/operator_fusion.py
@@ -13,7 +13,6 @@
from ray.data._internal.execution.operators.base_physical_operator import (
    AllToAllOperator,
)
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.operators.map_operator import MapOperator
from ray.data._internal.execution.operators.task_pool_map_operator import (
    TaskPoolMapOperator,
@@ -79,8 +78,6 @@ def _fuse_map_operators_in_dag(self, dag: PhysicalOperator) -> MapOperator:
            dag = self._get_fused_map_operator(dag, upstream_ops[0])
            upstream_ops = dag.input_dependencies

        self._propagate_target_max_block_size_to_input(dag)

        # Done fusing back-to-back map operators together here,
        # move up the DAG to find the next map operators to fuse.
        dag._input_dependencies = [
@@ -105,23 +102,11 @@ def _fuse_all_to_all_operators_in_dag(
            len(upstream_ops) == 1
            and isinstance(dag, AllToAllOperator)
            and isinstance(upstream_ops[0], MapOperator)
            and self._can_fuse(dag, upstream_ops[0])
        ):
            if self._can_fuse(dag, upstream_ops[0]):
                # Fuse operator with its upstream op.
                dag = self._get_fused_all_to_all_operator(dag, upstream_ops[0])
                upstream_ops = dag.input_dependencies
            else:
                # Propagate target max block size to the upstream map op. This
                # is necessary even when fusion is not allowed, so that the map
                # op will produce the right block size for the shuffle op to
                # consume.
                map_op = upstream_ops[0]
                map_op._target_max_block_size = self._get_merged_target_max_block_size(
                    upstream_ops[0].target_max_block_size, dag.target_max_block_size
                )
                break

        self._propagate_target_max_block_size_to_input(dag)
            # Fuse operator with its upstream op.
            dag = self._get_fused_all_to_all_operator(dag, upstream_ops[0])
            upstream_ops = dag.input_dependencies

        # Done fusing MapOperator -> AllToAllOperator together here,
        # move up the DAG to find the next pair of operators to fuse.
@@ -262,24 +247,6 @@ def _get_merged_target_max_block_size(
            # blocks.
            return down_target_max_block_size

    def _propagate_target_max_block_size_to_input(self, dag):
        # Operator fusion will merge target block sizes for adjacent operators,
        # but if dag is the first op after a stage with read tasks, then we
        # also need to propagate the block size to the input data buffer.
        upstream_ops = dag.input_dependencies
        if (
            len(upstream_ops) == 1
            and isinstance(upstream_ops[0], InputDataBuffer)
            and self._can_merge_target_max_block_size(
                upstream_ops[0].target_max_block_size, dag.target_max_block_size
            )
        ):
            upstream_ops[
                0
            ]._target_max_block_size = self._get_merged_target_max_block_size(
                upstream_ops[0].target_max_block_size, dag.target_max_block_size
            )

    def _get_fused_map_operator(
        self, down_op: MapOperator, up_op: MapOperator
    ) -> MapOperator:
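
The fusion rule still merges target block sizes for the operators it does fuse. The body of _get_merged_target_max_block_size is collapsed above, but its visible tail returns the downstream value; a rough sketch of that assumed "downstream wins" policy (illustrative, not the actual implementation):

```python
from typing import Optional


def merged_target_max_block_size(up: Optional[int], down: Optional[int]) -> Optional[int]:
    # Assumed policy: if only one side overrides the target, use it; otherwise
    # the downstream value wins because the downstream op emits the final blocks.
    if up is not None and down is not None and up != down:
        raise ValueError("conflicting overrides; such ops should not be fused")
    if up is not None:
        return up
    return down


print(merged_target_max_block_size(None, 64 * 1024 * 1024))  # 67108864
```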
python/ray/data/_internal/logical/rules/set_read_parallelism.py (renamed from split_read_output_blocks.py)
@@ -71,19 +71,29 @@ def compute_additional_split_factor(
    return parallelism, reason, estimated_num_blocks, None


class SplitReadOutputBlocksRule(Rule):
class SetReadParallelismRule(Rule):
    """
    This rule sets the read op's task parallelism based on the target block
    size, the requested parallelism, the number of read files, and the
    available resources in the cluster.

    If the parallelism is lower than requested, this rule also sets a split
    factor to split the output blocks of the read task, so that the following
    stage will have the desired parallelism.
    """

    def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
        ops = [plan.dag]

        while len(ops) == 1 and not isinstance(ops[0], InputDataBuffer):
            logical_op = plan.op_map[ops[0]]
            if isinstance(logical_op, Read):
                self._split_read_op_if_needed(ops[0], logical_op)
                self._apply(ops[0], logical_op)
            ops = ops[0].input_dependencies

        return plan

    def _split_read_op_if_needed(self, op: PhysicalOperator, logical_op: Read):
    def _apply(self, op: PhysicalOperator, logical_op: Read):
        (
            detected_parallelism,
            reason,
@@ -96,7 +106,9 @@ def _split_read_op_if_needed(self, op: PhysicalOperator, logical_op: Read):
            op.actual_target_max_block_size,
            op._additional_split_factor,
        )

        if logical_op._parallelism == -1:
            logical_op._detected_parallelism = detected_parallelism
            assert reason != ""
            logger.get_logger().info(
                f"Using autodetected parallelism={detected_parallelism} "
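
For a rough sense of the arithmetic this rule performs: the real compute_additional_split_factor also weighs the number of read files and available cluster resources, so the helper and numbers below are illustrative only:

```python
from typing import Tuple


def autodetect_parallelism_sketch(
    requested_parallelism: int,
    mem_size_bytes: int,
    target_max_block_size: int,
    available_cpus: int,
) -> Tuple[int, int]:
    """Return (parallelism, split_factor). Illustrative sketch, not Ray's code."""
    if requested_parallelism > 0:
        parallelism = requested_parallelism
    else:
        # Enough tasks that each output block stays under the target size,
        # but at least a couple of tasks per CPU.
        by_block_size = max(1, mem_size_bytes // target_max_block_size)
        parallelism = max(by_block_size, 2 * available_cpus)

    # If the datasource cannot produce that many read tasks, the shortfall is
    # made up by splitting each read task's output into smaller blocks.
    max_read_tasks = 8  # e.g. only 8 input files (assumed number)
    if parallelism > max_read_tasks:
        split_factor = -(-parallelism // max_read_tasks)  # ceiling division
        return max_read_tasks, split_factor
    return parallelism, 1


print(autodetect_parallelism_sketch(-1, 800_000_000, 128 * 1024 * 1024, 4))
# (8, 1): 8 read tasks, no extra splitting needed
```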
2 changes: 1 addition & 1 deletion python/ray/data/_internal/plan.py
@@ -32,7 +32,7 @@
from ray.data._internal.lazy_block_list import LazyBlockList
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.rules.operator_fusion import _are_remote_args_compatible
from ray.data._internal.logical.rules.split_read_output_blocks import (
from ray.data._internal.logical.rules.set_read_parallelism import (
    compute_additional_split_factor,
)
from ray.data._internal.planner.plan_read_op import (
14 changes: 5 additions & 9 deletions python/ray/data/_internal/planner/plan_read_op.py
@@ -14,7 +14,7 @@
    MapTransformFn,
)
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.util import _autodetect_parallelism, _warn_on_high_parallelism
from ray.data._internal.util import _warn_on_high_parallelism
from ray.data.block import Block
from ray.data.context import DataContext
from ray.data.datasource.datasource import ReadTask
@@ -48,14 +48,10 @@ def plan_read_op(op: Read) -> PhysicalOperator:
"""

def get_input_data(target_max_block_size) -> List[RefBundle]:
parallelism, _, min_safe_parallelism, _ = _autodetect_parallelism(
op._parallelism,
target_max_block_size,
DataContext.get_current(),
op._datasource_or_legacy_reader,
op._mem_size,
)

parallelism = op.get_detected_parallelism()
assert (
parallelism is not None
), "Read parallelism must be set by the optimizer before execution"
read_tasks = op._datasource_or_legacy_reader.get_read_tasks(parallelism)
_warn_on_high_parallelism(parallelism, len(read_tasks))

70 changes: 55 additions & 15 deletions python/ray/data/tests/test_block_sizing.py
@@ -104,20 +104,41 @@ def test_map(shutdown_only, restore_data_context):
"shuffle_op",
SHUFFLE_ALL_TO_ALL_OPS,
)
def test_shuffle(ray_start_regular_shared, restore_data_context, shuffle_op):
def test_shuffle(shutdown_only, restore_data_context, shuffle_op):
ray.init(
_system_config={
"max_direct_call_object_size": 1000,
},
num_cpus=2,
object_store_memory=int(100e6),
)

# Test AllToAll and Map -> AllToAll Datasets. Check that Map inherits
# AllToAll's target block size.
ctx = DataContext.get_current()
ctx.min_parallelism = 1
ctx.target_min_block_size = 1
mem_size = 8000
mem_size = 800_000
shuffle_fn, kwargs, fusion_supported = shuffle_op

ctx.target_shuffle_max_block_size = 100 * 8
ctx.target_shuffle_max_block_size = 10_000 * 8
num_blocks_expected = mem_size // ctx.target_shuffle_max_block_size
# ds = shuffle_fn(ray.data.range(1000), **kwargs)
# assert ds.materialize().num_blocks() == num_blocks_expected
ds = shuffle_fn(ray.data.range(1000).map(lambda x: x), **kwargs)
block_size_expected = ctx.target_shuffle_max_block_size
last_snapshot = get_initial_core_execution_metrics_snapshot()

ds = shuffle_fn(ray.data.range(100_000), **kwargs).materialize()
assert num_blocks_expected <= ds.num_blocks() <= num_blocks_expected * 1.5
last_snapshot = assert_blocks_expected_in_plasma(
last_snapshot,
# Dataset.sort produces some empty intermediate blocks because the
# input range is already partially sorted.
num_blocks_expected**2,
# Data is written out once before map phase if fusion is disabled, once
# during map phase, once during reduce phase.
total_bytes_expected=mem_size * 2 + (0 if fusion_supported else mem_size),
)

ds = shuffle_fn(ray.data.range(100_000).map(lambda x: x), **kwargs).materialize()
if not fusion_supported:
# TODO(swang): For some reason BlockBuilder's estimated
# memory usage for range(1000)->map is 2x the actual memory usage.
@@ -126,23 +147,42 @@ def test_shuffle(ray_start_regular_shared, restore_data_context, shuffle_op):

    ctx.target_shuffle_max_block_size //= 2
    num_blocks_expected = mem_size // ctx.target_shuffle_max_block_size
    ds = shuffle_fn(ray.data.range(1000), **kwargs)
    assert ds.materialize().num_blocks() == num_blocks_expected
    ds = shuffle_fn(ray.data.range(1000).map(lambda x: x), **kwargs)
    block_size_expected = ctx.target_shuffle_max_block_size

    ds = shuffle_fn(ray.data.range(100_000), **kwargs).materialize()
    assert num_blocks_expected <= ds.num_blocks() <= num_blocks_expected * 1.5
    last_snapshot = assert_blocks_expected_in_plasma(
        last_snapshot,
        num_blocks_expected**2,
        total_bytes_expected=mem_size * 2 + (0 if fusion_supported else mem_size),
    )

    ds = shuffle_fn(ray.data.range(100_000).map(lambda x: x), **kwargs).materialize()
    if not fusion_supported:
        num_blocks_expected *= 2
    assert ds.materialize().num_blocks() == num_blocks_expected
        block_size_expected //= 2
    assert num_blocks_expected <= ds.num_blocks() <= num_blocks_expected * 1.5
    last_snapshot = assert_blocks_expected_in_plasma(
        last_snapshot,
        num_blocks_expected**2,
        total_bytes_expected=mem_size * 2 + (0 if fusion_supported else mem_size),
    )

    # Setting target max block size does not affect map ops when there is a
    # shuffle downstream.
    ctx.target_max_block_size = 200 * 8
    ctx.target_max_block_size = ctx.target_shuffle_max_block_size * 2
    num_blocks_expected = mem_size // ctx.target_shuffle_max_block_size
    ds = shuffle_fn(ray.data.range(1000), **kwargs)
    assert ds.materialize().num_blocks() == num_blocks_expected
    block_size_expected = ctx.target_shuffle_max_block_size
    if not fusion_supported:
        num_blocks_expected *= 2
    ds = shuffle_fn(ray.data.range(1000).map(lambda x: x), **kwargs)
    assert ds.materialize().num_blocks() == num_blocks_expected
        block_size_expected //= 2
    ds = shuffle_fn(ray.data.range(100_000).map(lambda x: x), **kwargs).materialize()
    assert num_blocks_expected <= ds.num_blocks() <= num_blocks_expected * 1.5
    last_snapshot = assert_blocks_expected_in_plasma(
        last_snapshot,
        num_blocks_expected**2,
        total_bytes_expected=mem_size * 2 + (0 if fusion_supported else mem_size),
    )


if __name__ == "__main__":
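
The expected block counts in the updated test follow directly from the configured sizes. A worked example of that arithmetic under the test's settings (approximate, since the test itself allows a 1.5x margin):

```python
mem_size = 800_000                           # bytes produced by range(100_000)
target_shuffle_max_block_size = 10_000 * 8   # 80_000 bytes

# Shuffle output should land in roughly mem_size / block_size blocks.
num_blocks_expected = mem_size // target_shuffle_max_block_size
print(num_blocks_expected)                   # 10

# The shuffle writes map outputs and reduce outputs to plasma, roughly 2x the
# dataset size; an unfused upstream map adds one more copy.
fusion_supported = False
total_bytes_expected = mem_size * 2 + (0 if fusion_supported else mem_size)
print(total_bytes_expected)                  # 2_400_000
```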