New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic node assignment for grouped execution #11693

Merged
merged 7 commits into from Dec 2, 2018

Conversation

Projects
None yet
6 participants
@wenleix
Contributor

wenleix commented Oct 11, 2018

  • Make connector bucket to node map optional
  • Introduce BucketNodeMap
  • Support dynamic node assignment for grouped execution
  • Cleanup and Rebase on #11649

Minor fixes requested by the reviews:

  • Make FixedLifespanScheduler a top level class
  • Fix nodeScheduler.createNodeSelector(null)
  • Move newDriverGroupReady.set(null) out of critical section.
@arhimondr

This comment has been minimized.

Member

arhimondr commented Oct 11, 2018

Just curious. Why was it connector responsibility to assign nodes to the buckets? And what changed now that we can move it to the presto-main? It seems like the presto-main was the better place to do node assignments from the begging.

@dain

This comment has been minimized.

Contributor

dain commented Oct 11, 2018

@arhimondr The first use of this was Raptor for colocated joins. In the Raptor case the buckets are physically assigned to specific nodes, so the engine can not make this decision.

@wenleix wenleix force-pushed the wenleix:dyanmicb3 branch from 0dc5f47 to 10dd670 Oct 12, 2018

@wenleix

This comment has been minimized.

Contributor

wenleix commented Oct 12, 2018

I will do a rebase once #11649 is merged.

In the meanwhile, @dain, @haozhun It would be great to have an initial round of structural/design level review of this PR? Thanks!

@wenleix wenleix requested review from haozhun and dain Oct 12, 2018

@wenleix wenleix force-pushed the wenleix:dyanmicb3 branch from 33eb640 to 93f48f0 Oct 15, 2018

@wenleix wenleix force-pushed the wenleix:dyanmicb3 branch 2 times, most recently from 881ea1b to 3fc70fe Oct 15, 2018

@wenleix wenleix changed the title from [WIP] Support dynamic node assignment for grouped execution to Support dynamic node assignment for grouped execution Oct 15, 2018

if (plan.getSubStages().isEmpty() // no remote source
&& groupedExecutionForStage
// connector has no bucket to node map
&& !nodePartitioningManager.getBucketToNode(session, partitioningHandle).isPresent()) {

This comment has been minimized.

@wenleix

wenleix Oct 16, 2018

Contributor

I am also thinking to control this by a session property (e.g. dyanmic-schedule-for-grouped-execution).

Or, it might be a enum grouped-execution-lifespan-schedule:

FIXED,
DYNAMIC_IF_POSSIBLE,
DYNAMIC_AND_RECOVERABLE_IF_POSSIBLE

This comment has been minimized.

@dain

dain Oct 18, 2018

Contributor

I'm concerned about getBucketToNode being called multiple times. If possible, I suggest we restructure the code to only do this once.

@dain

I'd like @haozhun to review this also.

I'm a bit concerned about the number of abstractions we have in this area. Can we consolidate some of these abstractions? Can we reduce the surface area of the abstractions? It is fine if the answer is no, but I'd like you two to think about it.

Show resolved Hide resolved ...c/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java Outdated
{
Set<Node> nodes = nodeManager.getRequiredWorkerNodes();
checkState(!nodes.isEmpty(), "No TPCDS nodes available");
checkState(!nodes.isEmpty(), "No nodes available");

This comment has been minimized.

@dain

dain Oct 18, 2018

Contributor

Assuming you verify that the count is at least one in ConnectorNodePartitioningProvider, you don't need this here, and the other ones.

This comment has been minimized.

@haozhun

haozhun Oct 22, 2018

Contributor

Presto allows each worker to load a subset of plugins, and respect that when assigning splits. I'm not sure whether this is useful at all. But this is why the error message said "No TPCDS nodes available" in the first place. I don't think it should be changed.

I disagree with dain on that the assertion should be removed. It is a reasonable assertion to verify an assumption that subsequent code depends on, and it's cheap. As a result, I don't see a downside in keeping it here, close to where the assumption is depended on.

This comment has been minimized.

@wenleix

wenleix Oct 26, 2018

Contributor

Thanks for clarification. I think it makes sense to keep the original message.

This comment has been minimized.

@dain

dain Nov 2, 2018

Contributor

Actually, getRequiredWorkerNodes, will never return an empty set. Take a look the code.

Show resolved Hide resolved ...c/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java Outdated
Show resolved Hide resolved ...c/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java Outdated
Show resolved Hide resolved ...c/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java Outdated
Show resolved Hide resolved .../facebook/presto/execution/scheduler/group/DynamicLifespanScheduler.java Outdated
Show resolved Hide resolved .../facebook/presto/execution/scheduler/group/DynamicLifespanScheduler.java
Show resolved Hide resolved .../facebook/presto/execution/scheduler/group/DynamicLifespanScheduler.java
Show resolved Hide resolved .../facebook/presto/execution/scheduler/group/DynamicLifespanScheduler.java
Show resolved Hide resolved .../facebook/presto/execution/scheduler/group/DynamicLifespanScheduler.java Outdated
@haozhun

"Make connector bucket to node map optional"

Show resolved Hide resolved ...c/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java Outdated
Show resolved Hide resolved ...c/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java Outdated
Show resolved Hide resolved ...com/facebook/presto/spi/connector/ConnectorNodePartitioningProvider.java Outdated
{
Set<Node> nodes = nodeManager.getRequiredWorkerNodes();
checkState(!nodes.isEmpty(), "No TPCDS nodes available");
checkState(!nodes.isEmpty(), "No nodes available");

This comment has been minimized.

@haozhun

haozhun Oct 22, 2018

Contributor

Presto allows each worker to load a subset of plugins, and respect that when assigning splits. I'm not sure whether this is useful at all. But this is why the error message said "No TPCDS nodes available" in the first place. I don't think it should be changed.

I disagree with dain on that the assertion should be removed. It is a reasonable assertion to verify an assumption that subsequent code depends on, and it's cheap. As a result, I don't see a downside in keeping it here, close to where the assumption is depended on.

@wenleix wenleix assigned wenleix and unassigned dain and haozhun Oct 25, 2018

@wenleix wenleix force-pushed the wenleix:dyanmicb3 branch 8 times, most recently from 51e271c to b2b066c Oct 28, 2018

@haozhun

"Minor fix in SqlQueryScheduler" - looks good

@haozhun

"Move SettableFuture#set outside critical section" - looks good

@haozhun

"Add session property for grouped execution dynamic schedule" - comments

Show resolved Hide resolved ...src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java
assertQuery(colocatedAllGroupsAtOnceDynamic, joinThreeBucketedTable, expectedJoinQuery);
assertQuery(colocatedAllGroupsAtOnceDynamic, joinThreeMixedTable, expectedJoinQuery);
assertQuery(colocatedOneGroupAtATimeDynamic, joinThreeBucketedTable, expectedJoinQuery);
assertQuery(colocatedOneGroupAtATimeDynamic, joinThreeMixedTable, expectedJoinQuery);

This comment has been minimized.

@haozhun

haozhun Nov 8, 2018

Contributor

I feel like it's excessive to run every tests with all the session property combinations.

colocatedAllGroupsAtOnceDynamic is probably not particularly interesting in general. As a result, it probably only needs to be tested for a small number of queries.

colocatedOneGroupAtATimeDynamic is probably not particularly interesting for queries with remote exchanges. As a result, this property should probably only be tested on a small number of such queries.

@wenleix wenleix force-pushed the wenleix:dyanmicb3 branch 9 times, most recently from 64b26bf to 8129d3d Nov 14, 2018

@wenleix wenleix force-pushed the wenleix:dyanmicb3 branch 4 times, most recently from c7bee84 to 745710f Dec 2, 2018

wenleix added some commits Oct 11, 2018

Make connector bucket to node map optional
For shared-storage connectors (e.g. Hive connector), there is no
preference about which node to load the bucket.
Previously connector will randomly assign buckets to nodes.

Making this optional opens opportunities for engine to schedule bucket
execution in a more flexible way.
Introduce BucketNodeMap
Splits from the same buckets are scheduled to the same node to allow
bucketed execution. Previously, NodeSelector#computeAssignments takes
NodePartitionMap, which contains extra information.

This commit introduces BucketNodeMap to represent the mapping in a
separate class. This also opens opportunities for engine to schedule
bucket execution in a more flexible way.
Minor fix in SqlQueryScheduler
When bucket processing is enabled, the node selector should be created
with the connectorId since we need to schedule on nodes that have the
connector installed.
Move SettableFuture#set outside critical section
SettableFuture#set triggers callback, which should be avoided inside
critical section.

@wenleix wenleix force-pushed the wenleix:dyanmicb3 branch from 745710f to 942b564 Dec 2, 2018

@wenleix wenleix merged commit c2c5407 into prestodb:master Dec 2, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details

@wenleix wenleix deleted the wenleix:dyanmicb3 branch Dec 2, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment