[Datasets] Implement push-based shuffle #23758
Conversation
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
An initial run of
merge_tasks_assigned += 1
leftover_cpu_map[node_id] -= 1
merge_task_args = [
    {"scheduling_strategy": NodeAffinitySchedulingStrategy(node_id, soft=True)}
Should we also use NodeAffinitySchedulingStrategy to colocate merge_factor number of map tasks with the merge task?
I think it is actually better to let Ray schedule the map tasks in case there are other factors like data locality being used to schedule those. I'll make a note of this in the code!
# reducer.
merge_task_args = self._compute_merge_task_options(
    num_merge_tasks_per_round, merge_factor, cpu_map
)
Could we unit test the above topology configuration logic? Concretely, we could split it out into a helper method that returns a struct/dict, and then unit test the topology configured for various cluster scenarios. This would also help split the code up for readability.
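One hypothetical shape for such a helper (names and the proportional-assignment policy are illustrative, not the actual PR code): take the per-node CPU map and return a plain dict of merge task counts, which is trivial to unit test against different cluster scenarios.

```python
from typing import Dict


def compute_merge_tasks_per_node(
    cpu_map: Dict[str, int], num_merge_tasks: int
) -> Dict[str, int]:
    """Assign merge tasks to nodes proportionally to their CPU counts.

    Hypothetical sketch: returning a plain dict (rather than scheduling
    options directly) keeps the topology logic unit-testable.
    """
    total_cpus = sum(cpu_map.values())
    assignments = {
        node: num_merge_tasks * cpus // total_cpus
        for node, cpus in cpu_map.items()
    }
    # Hand out any remainder to the largest nodes first so that
    # every merge task gets assigned somewhere.
    leftover = num_merge_tasks - sum(assignments.values())
    for node in sorted(cpu_map, key=cpu_map.get, reverse=True):
        if leftover == 0:
            break
        assignments[node] += 1
        leftover -= 1
    return assignments
```

A test can then just assert on the returned dict for a given cluster shape, e.g. a skewed two-node cluster or a homogeneous cluster with a non-divisible task count.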
+1
"map": shuffle_map_metadata,
"merge": shuffle_merge_metadata,
"reduce": new_metadata,
}
Add some tests to test_stats.py?
This looks great! Some high level code structure suggestions.
@@ -28,6 +29,11 @@
# Whether to furthermore fuse prior map tasks with shuffle stages.
DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True

# Whether to use push-based shuffle by default.
DEFAULT_USE_PUSH_BASED_SHUFFLE = bool(
Do we need to set this env var on all nodes?
Ah right, good point... It only needs to be set on whichever process makes the Dataset shuffle call; after that, I think the context will get propagated through the usual mechanism. Do you think that's an issue?
cc @clarkzinzow for best practice/convention here
@stephanie-wang @scv119 I think that this is fine, since we already rely on the user being able to mutate the global context in the driver before doing Dataset operations and expect those default overrides to propagate to all tasks, so this is no different IMO.
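For reference, one defensive way to derive a boolean default from an env var in the driver (a sketch only; the PR's exact parsing may differ). The subtlety is that `bool()` on any non-empty string is `True`, so `RAY_DATASET_PUSH_BASED_SHUFFLE=0` would still enable the feature unless the value is parsed through `int()` first:

```python
import os

# Sketch: parse via int() so that "0" disables and "1" enables.
# A bare bool() on the raw string would treat "0" as truthy.
DEFAULT_USE_PUSH_BASED_SHUFFLE = bool(
    int(os.environ.get("RAY_DATASET_PUSH_BASED_SHUFFLE", "0"))
)
```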
# reducer.
merge_task_args = self._compute_merge_task_options(
    num_merge_tasks_per_round, merge_factor, cpu_map
)
+1
I think some tests are failing here. I reviewed this at a high level and am ok with merging this to unblock further testing, though we might want to revisit the code structure at a later point (could be after we're happy with the performance).
@@ -28,6 +29,11 @@
# Whether to furthermore fuse prior map tasks with shuffle stages.
DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True

# Whether to use push-based shuffle by default.
DEFAULT_USE_PUSH_BASED_SHUFFLE = bool(
cc @clarkzinzow for best practice/convention here
LGTM, same thoughts as @ericl: I have some ideas of how to improve the code structure here, but this looks good enough to merge, so let's unblock next steps and revisit after perf testing.
@@ -28,6 +29,11 @@
# Whether to furthermore fuse prior map tasks with shuffle stages.
DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True

# Whether to use push-based shuffle by default.
DEFAULT_USE_PUSH_BASED_SHUFFLE = bool(
@stephanie-wang @scv119 I think that this is fine, since we already rely on the user being able to mutate the global context in the driver before doing Dataset operations and expect those default overrides to propagate to all tasks, so this is no different IMO.
Awesome seeing this work going into production! I left a few questions inline.
One thing we should keep in mind is data skew. The current pipelining schedule will not work very well if the input partitions are too imbalanced.
) -> Tuple[BlockList, Dict[str, List[BlockMetadata]]]:
    logger.info("Using experimental push-based shuffle.")
    # TODO(swang): For jobs whose reduce work is heavier than the map work,
    # we should support fractional merge factors.
I don't understand what fractional merge factors mean. Like, how would a merge task merge 1.5x map outputs? And why is this related to the reduce/map workload ratio?
It just means you have more merge tasks than map tasks. So if you have map:merge = 1/2 and 10 map tasks per round, each merge task will output half a partition's worth of data. Presumably you'd also want many more reduce tasks than map tasks in this scenario.
But yeah, there's no evidence yet that we'd want to support this case; I haven't seen one yet where the reduce-side computation is heavier than the map.
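To make the arithmetic concrete (hypothetical helper and numbers, just to illustrate the ratio): if merge_factor is the number of map outputs consumed per merge task, a fractional factor below 1 yields more merge tasks than map tasks.

```python
def num_merge_tasks(num_map_tasks_per_round: int, merge_factor: float) -> int:
    # merge_factor = map outputs consumed per merge task.
    # A fractional factor (< 1) yields more merge tasks than map tasks,
    # each holding less than one map output's worth of data.
    return max(1, round(num_map_tasks_per_round / merge_factor))


# 10 map tasks, merge_factor 2   -> 5 merge tasks (usual case)
# 10 map tasks, merge_factor 0.5 -> 20 merge tasks (fractional case)
```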
This reverts commit c1054a0.
Redo for #23758 to fix CI.
Why are these changes needed?
The simple shuffle currently implemented in Datasets does not reliably scale beyond ~1000 partitions due to metadata and I/O overhead.
This PR adds an experimental shuffle implementation for a "push-based shuffle", as described in this paper draft. This algorithm should see better performance at larger data scales. The algorithm works by merging intermediate map outputs at the reducer side while other map tasks are executing. Then, a final reduce task merges these merged outputs.
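The three stages described above can be sketched as a toy, single-process pipeline (pure Python with illustrative names; the real implementation runs map and merge as concurrent Ray tasks over blocks, not lists):

```python
from collections import defaultdict
from typing import Dict, List


def map_task(block: List[int], num_reducers: int) -> Dict[int, List[int]]:
    # Partition one input block by destination reducer.
    out = defaultdict(list)
    for item in block:
        out[item % num_reducers].append(item)
    return dict(out)


def merge_task(map_outputs: List[Dict[int, List[int]]]) -> Dict[int, List[int]]:
    # Combine several map outputs into one merged output per reducer.
    # In the real algorithm this runs while other map tasks are still executing.
    merged = defaultdict(list)
    for out in map_outputs:
        for reducer_id, items in out.items():
            merged[reducer_id].extend(items)
    return dict(merged)


def reduce_task(
    reducer_id: int, merged_outputs: List[Dict[int, List[int]]]
) -> List[int]:
    # Final reduce: concatenate this reducer's merged partitions from all rounds.
    result = []
    for merged in merged_outputs:
        result.extend(merged.get(reducer_id, []))
    return sorted(result)
```

For example, shuffling two blocks `[1, 2, 3]` and `[4, 5, 6]` into two partitions runs both maps, one merge over their outputs, then one reduce per partition.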
Currently, the PR exposes this option through the DatasetContext. It can also be set through a hidden OS environment variable (RAY_DATASET_PUSH_BASED_SHUFFLE). Once we have more comprehensive benchmarks, we can better document this option and allow the algorithm to be chosen at run time.

Related issue number
Closes #23758.