
Optimized Local Scheduling of splits in Presto #799

Merged
merged 2 commits into prestosql:master from garvit-gupta:prestosqlOLS on Jun 12, 2019

Conversation

3 participants
@garvit-gupta (Member) commented on May 21, 2019:

fixes #680

for (Split split : splits) {
if (split.isRemotelyAccessible() && (split.getAddresses().size() > 0)
&& split.getAddresses().stream().anyMatch(address -> !address.getHostText().equals("localhost"))
&& !(CatalogName.isInternalSystemConnector(split.getCatalogName()) || split.getCatalogName().toString().equalsIgnoreCase("jmx"))) {

@Praveen2112 (Member) commented on May 21, 2019:

Any specific reason why we check for the information schema connector or the jmx connector? It should not be required, since the splits created for these connectors are not remotely accessible, so split.isRemotelyAccessible() itself would return false and the if statement would be skipped.

@garvit-gupta (Author, Member) commented on May 22, 2019:

I have removed that check as per your suggestion. It was relevant for our use case but is redundant here as you pointed out. Thanks for the observation.

@Praveen2112 (Member) commented on May 22, 2019:

split.getAddresses().stream().anyMatch(address -> !address.getHostText().equals("localhost"))

Any specific reason for checking the addresses too? I guess this would be redundant as well, since selectExactNodes might return an empty list if the address is localhost. (Not sure, but I guess it should return an empty list.)

@garvit-gupta (Author, Member) commented on May 22, 2019:

This check has been taken from

return addresses.stream().anyMatch(address -> !address.getHostText().equals("localhost"));

It is to prevent splits from entering the locality-assignment stage (and correspondingly the redistribution stage) when Presto is configured with data that has no co-location with the worker nodes. In such a case, when data is read from S3, the splits have been seen to carry localhost by default in their addresses. This check makes sure those splits don't enter the first loop, so the assignment is the same as the assignment made by SimpleNodeSelector.

@Praveen2112 (Member) commented on May 24, 2019:

Oh okay, but instead of doing this check, I think we can return an empty list if they have localhost by default in their address. (That is not related to the current PR, though, so I think we don't need this check here.)

Review thread on ...n/src/main/java/io/prestosql/execution/scheduler/SimpleNodeSelector.java (resolved, outdated)

@cla-bot cla-bot bot added the cla-signed label May 22, 2019

&& split.getAddresses().stream().anyMatch(address -> !address.getHostText().equals("localhost"))) {
splitsToBeRedistributed = true;
List<InternalNode> candidateNodes;
candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);

@Praveen2112 (Member) commented on May 22, 2019:

I guess we can inline this statement

@garvit-gupta (Author, Member) commented on May 22, 2019:

Yes, thanks. I have made the change.

for (Split split : splits) {
if (split.isRemotelyAccessible() && (split.getAddresses().size() > 0)
&& split.getAddresses().stream().anyMatch(address -> !address.getHostText().equals("localhost"))) {
splitsToBeRedistributed = true;

@Praveen2112 (Member) commented on May 22, 2019:

splitsToBeRedistributed should be set to true only if a node is selected, right?

@garvit-gupta (Author, Member) commented on May 22, 2019:

We are setting this flag to true whenever splits enter the first stage (locality-based assignment). We wanted the third stage (redistribution) to be invoked only when splits enter the first stage, and that is what the splitsToBeRedistributed flag does.

If splits are entering the 1st stage, they will remain unassigned there only when their local node is filled up. If only a few nodes are filled up after the 1st and the 2nd stage, leaving some splits unassigned in the first stage (they would then have been assigned in the second stage), redistribution will ensure a uniform workload. If all nodes are filled up, redistribution cannot happen anyway.
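
(For reference, a rough sketch of the control flow being described, using names from the surrounding diff; the exact method shapes are assumptions for illustration, not the final code.)

    boolean splitsToBeRedistributed = false;

    // Stage 1: locality-based assignment for splits that carry real (non-localhost) addresses
    for (Split split : splits) {
        if (split.isRemotelyAccessible() && !split.getAddresses().isEmpty()) {
            // try to place the split on one of its local nodes via selectExactNodes(...);
            // splitsToBeRedistributed is set when splits go through this stage
            // (later in the review it is moved to where a node is actually chosen)
        }
    }

    // Stage 2: remaining splits are assigned exactly as in SimpleNodeSelector

    // Stage 3: rebalancing runs only if stage 1 was used
    if (splitsToBeRedistributed) {
        equateDistribution(assignment, assignmentStats);
    }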

@Praveen2112 (Member) commented on May 24, 2019:

they will remain unassigned in the 1st stage only when their local node is filled up

But they will remain unassigned even if their candidateNodes are empty?

@garvit-gupta (Author, Member) commented on May 24, 2019:

Thanks for the suggestion. I have moved the location where we set splitsToBeRedistributed, to catch the case where splits have some addresses but candidateNodes is empty.

@garvit-gupta garvit-gupta force-pushed the garvit-gupta:prestosqlOLS branch from ad80eb4 to e5ee93e May 27, 2019

@electrum electrum requested a review from dain May 29, 2019

@dain (Member) left a comment:

I really like these changes and I think they are a pure improvement to the existing SimpleNodeSelector, so I think we should add the new code into the existing SimpleNodeSelector instead of creating a new scheduler class. As a safety measure, I think we should include a kill switch around step 1 of the selector code, in case we have a bug or performance issue. In a few releases we can drop the kill switch.

BTW, I really dug into this because I want to make it the default, so I have a lot of comments. As a summary:

  • Just improve the SimpleNodeSelector!
  • I proposed a new redistribute algorithm that I think has a lower complexity cost (and may be simpler).
  • allNodes() should be avoided: it creates a new snapshot of an expensive global structure on each call.
  • I didn't review the tests, because I'm not sure how much will still be needed after these changes.

I'm also going to ask @martint to read over the redistribute algorithm to get more eyes on this hot-path code.


Optional<InternalNode> chosenNode = candidateNodes.stream()
.filter(ownerNode -> assignmentStats.getTotalSplitCount(ownerNode) < maxSplitsPerNode)
.min((node1, node2) -> Integer.compare(assignmentStats.getTotalSplitCount(node1), assignmentStats.getTotalSplitCount(node2)));

@dain (Member) commented on Jun 1, 2019:

.min(comparingInt(assignmentStats::getTotalSplitCount));
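
(For reference, the suggestion assumes a static import of java.util.Comparator.comparingInt, so the quoted expression would read roughly as follows.)

    import static java.util.Comparator.comparingInt;

    Optional<InternalNode> chosenNode = candidateNodes.stream()
            .filter(ownerNode -> assignmentStats.getTotalSplitCount(ownerNode) < maxSplitsPerNode)
            .min(comparingInt(assignmentStats::getTotalSplitCount));
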
* redistribute a Non-local split. This case is possible when there are multiple queries running simultaneously.
* If a Non-local split cannot be found in the maxNode, any split is selected and reassigned.
*/
void redistributeSingleSplit(Multimap<InternalNode, Split> assignment, InternalNode maxNode, InternalNode minNode)

@dain (Member) commented on Jun 1, 2019:

this can be static

int min = Integer.MAX_VALUE;
InternalNode minNode = null;
for (InternalNode node : allNodes()) {
if ((assignmentStats.getTotalSplitCount(node) > max && assignment.containsKey(node)) && assignment.get(node).size() > 0) {

@dain (Member) commented on Jun 1, 2019:

The last conjunct should be !assignment.get(node).isEmpty()
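
(Applied to the quoted condition above, the suggested fix would read roughly as follows.)

    if (assignmentStats.getTotalSplitCount(node) > max && assignment.containsKey(node) && !assignment.get(node).isEmpty()) {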

* redistribute a Non-local split. This case is possible when there are multiple queries running simultaneously.
* If a Non-local split cannot be found in the maxNode, any split is selected and reassigned.
*/
void redistributeSingleSplit(Multimap<InternalNode, Split> assignment, InternalNode maxNode, InternalNode minNode)

@dain (Member) commented on Jun 1, 2019:

Add @VisibleForTesting

@dain (Member) commented on Jun 1, 2019:

I'd change the signature of this to:

    static void moveSplitAssignment(Multimap<InternalNode, Split> assignment, InternalNode fromNode, InternalNode toNode)

I found the redistribute name really confusing
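
(For reference, a minimal sketch of a static helper with that signature, with imports for Iterator and Multimap omitted as in the other snippets here; illustrative only, since per the javadoc quoted above the real redistributeSingleSplit also prefers moving a non-local split first.)

    static void moveSplitAssignment(Multimap<InternalNode, Split> assignment, InternalNode fromNode, InternalNode toNode)
    {
        // assumes fromNode currently has at least one assigned split;
        // removing through the live view returned by Multimap.get also updates the multimap itself
        Iterator<Split> splits = assignment.get(fromNode).iterator();
        Split split = splits.next();
        splits.remove();
        assignment.put(toNode, split);
    }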

assertEquals(assignment.get(node2).size(), 10);

// Redistribute 1 split from Node 1 to Node 2
OptimizedLocalNodeSelector ns = (OptimizedLocalNodeSelector) nodeSelector;

@dain (Member) commented on Jun 1, 2019:

This won't be used once redistributeSingleSplit is static

* @param assignmentStats required to obtain info regarding splits assigned to a node outside the current batch of assignment
* with no changes if it is empty or if force-local-scheduling is enabled
*/
private void equateDistribution(Multimap<InternalNode, Split> assignment, NodeAssignmentStats assignmentStats)

@dain (Member) commented on Jun 1, 2019:

I'm concerned about the computational complexity of the algorithm here. For each loop iteration, all nodes are processed, which I think will result in something like number_of_splits_to_move * node_count. I think we might be able to simplify this by using a pair of PriorityQueues (heaps): a max one for the nodes where we have assignments, and a min one for all nodes. We can't use a MinMaxPriorityQueue because they contain different elements. Here is what I came up with; it has a different stopping condition (see my question below), but the tests seem to pass:

    private static void equateDistribution(Multimap<InternalNode, Split> assignment, NodeAssignmentStats assignmentStats, NodeMap nodeMap)
    {
        if (assignment.isEmpty()) {
            return;
        }

        Collection<InternalNode> allNodes = nodeMap.getNodesByHostAndPort().values();
        if (allNodes.size() < 2) {
            return;
        }

        PriorityQueue<InternalNode> maxNodes = new PriorityQueue<>(assignment.size(), comparingInt(assignmentStats::getTotalSplitCount).reversed());
        maxNodes.addAll(assignment.keySet());

        PriorityQueue<InternalNode> minNodes = new PriorityQueue<>(allNodes.size(), comparingInt(assignmentStats::getTotalSplitCount));
        minNodes.addAll(allNodes);

        while (true) {
            // fetch min and max node
            InternalNode maxNode = maxNodes.remove();
            InternalNode minNode = minNodes.remove();
            if (assignmentStats.getTotalSplitCount(maxNode) - assignmentStats.getTotalSplitCount(minNode) <= 1) {
                return;
            }

            // move split from max to min
            boolean minAlreadyHasAssignment = assignment.containsKey(minNode);
            moveSplit(assignment, maxNode, minNode);
            assignmentStats.removeAssignedSplit(maxNode);
            assignmentStats.addAssignedSplit(minNode);

            // add max back into maxNodes only if it still has assignments
            if (assignment.containsKey(maxNode)) {
                maxNodes.add(maxNode);
            }
            // add min node into maxNodes heap only if it was not already in the heap (i.e., already had assignments)
            if (!minAlreadyHasAssignment) {
                maxNodes.add(minNode);
            }
            // always add min back into minNodes
            minNodes.add(minNode);
        }
    }

if (candidateNodes.isEmpty()) {
continue;
}
splitsToBeRedistributed = true;

@dain (Member) commented on Jun 1, 2019:

I think this should be moved inside of the if statement below, since we only need to redistribute if we actually did a local assignment

// splitsToBeRedistributed remains false for internal splits or if force-local-scheduling is true
boolean splitsToBeRedistributed = false;

//prioritize assignment of splits to local nodes when the local nodes have slots available in the first pass

@dain (Member) commented on Jun 1, 2019:

missing space after //

&& split.getAddresses().stream().anyMatch(address -> !address.getHostText().equals("localhost"))) {
List<InternalNode> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);

if (candidateNodes.isEmpty()) {

@dain (Member) commented on Jun 1, 2019:

This isn't needed once splitsToBeRedistributed = true is moved (see next comment)

boolean splitsToBeRedistributed = false;

//prioritize assignment of splits to local nodes when the local nodes have slots available in the first pass
Set<Split> splitsToBeScheduled = new HashSet<>(splits);

@dain (Member) commented on Jun 1, 2019:

I think it would be easier to read if you started with an empty set and added the splits that need to be scheduled, instead of starting with all splits and then removing some. Also, this is where we would put the kill switch. Here is what I came up with after applying the comments below:

        Set<Split> remainingSplits;
        if (optimizeLocalAssignments) {
            remainingSplits = new HashSet<>();
            for (Split split : splits) {
                if (split.isRemotelyAccessible() && !split.getAddresses().isEmpty() && split.getAddresses().stream().anyMatch(address -> !address.getHostText().equals("localhost"))) {
                    List<InternalNode> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
    
                    Optional<InternalNode> chosenNode = candidateNodes.stream()
                            .filter(ownerNode -> assignmentStats.getTotalSplitCount(ownerNode) < maxSplitsPerNode)
                            .min(comparingInt(assignmentStats::getTotalSplitCount));
    
                    if (chosenNode.isPresent()) {
                        assignment.put(chosenNode.get(), split);
                        assignmentStats.addAssignedSplit(chosenNode.get());
                        splitsToBeRedistributed = true;
                        continue;
                    }
                }
                remainingSplits.add(split);
            }
        }
        else {
            remainingSplits = splits;
        }

@dain dain requested a review from martint Jun 1, 2019

@garvit-gupta garvit-gupta force-pushed the garvit-gupta:prestosqlOLS branch from e5ee93e to 90f0259 Jun 4, 2019

@garvit-gupta (Author, Member) commented on Jun 4, 2019:

@dain Thank you for the detailed review. We have updated the PR according to your suggestions.

@dain dain self-requested a review Jun 4, 2019

@dain (Member) left a comment:

Only one major comment, about IndexedPriorityQueue, which we talked about on Slack. Most of the other comments are about the tests.

@@ -35,6 +35,7 @@
private int maxSplitsPerNode = 100;
private int maxPendingSplitsPerTask = 10;
private String networkTopology = NetworkTopologyType.LEGACY;
private boolean optimizedLocalScheduling;

@dain (Member) commented on Jun 5, 2019:

Set the default to true
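
(For reference, the toggle in NodeSchedulerConfig would presumably look something like the sketch below; the property name and the airlift @Config wiring are assumptions, not taken from the PR.)

    private boolean optimizedLocalScheduling = true;

    public boolean getOptimizedLocalScheduling()
    {
        return optimizedLocalScheduling;
    }

    @Config("node-scheduler.optimized-local-scheduling")
    public NodeSchedulerConfig setOptimizedLocalScheduling(boolean optimizedLocalScheduling)
    {
        this.optimizedLocalScheduling = optimizedLocalScheduling;
        return this;
    }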

// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling) {
for (Split split : splits) {
if (split.isRemotelyAccessible() && !split.getAddresses().isEmpty() && split.getAddresses().stream().anyMatch(address -> !address.getHostText().equals("localhost"))) {

@dain (Member) commented on Jun 5, 2019:

I believe Hive was the only connector producing "localhost" splits, and @electrum recently changed the Hive connector to use an empty list instead of localhost.

@garvit-gupta (Author, Member) commented on Jun 6, 2019:

So if (split.isRemotelyAccessible() && !split.getAddresses().isEmpty()) should suffice?

return;
}

PriorityQueue<InternalNode> maxNodes = new PriorityQueue<>(assignment.keySet().size(), comparingInt(assignmentStats::getTotalSplitCount).reversed());

@dain (Member) commented on Jun 5, 2019:

Consider using IndexedPriorityQueue instead to avoid the issue where the min node might be in the max heap and we change its value without updating the heap. IndexedPriorityQueue has a nice addOrUpdate for this case.
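
(A rough sketch of the idea, assuming the maxNodes/minNodes heaps from the earlier snippet are switched to IndexedPriorityQueue and that it exposes addOrUpdate(element, priority) as described; the ordering direction is an assumption here.)

    // after moving one split from maxNode to minNode, re-key both nodes
    // so neither heap keeps an entry with an outdated split count
    maxNodes.addOrUpdate(maxNode, assignmentStats.getTotalSplitCount(maxNode));
    maxNodes.addOrUpdate(minNode, assignmentStats.getTotalSplitCount(minNode));
    minNodes.addOrUpdate(maxNode, assignmentStats.getTotalSplitCount(maxNode));
    minNodes.addOrUpdate(minNode, assignmentStats.getTotalSplitCount(minNode));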

splits.add(new Split(CATALOG_NAME, new TestSplitRemote(), Lifespan.taskWide()));

Multimap<InternalNode, Split> assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();
assertTrue(assignments.isEmpty());

@dain (Member) commented on Jun 5, 2019:

I'm confused why this ends with an assert when the test is expected to throw

@garvit-gupta (Author, Member) commented on Jun 6, 2019:

Will remove

Set<Split> splits = new HashSet<>();
splits.add(new Split(CATALOG_NAME, new TestSplitRemote(), Lifespan.taskWide()));

Multimap<InternalNode, Split> assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();

@dain (Member) commented on Jun 5, 2019:

Use something like this so you can verify the message and error code:

assertThatThrownBy(() -> doSomething())
        .isInstanceOf(PrestoException.class)
        .matches(e -> ((PrestoException) e).getErrorCode().equals(NOT_SUPPORTED.toErrorCode()))
        .hasMessageMatching("something failed .*");

@Override
public List<HostAddress> getAddresses()
{
return ImmutableList.of(HostAddress.fromString("127.0.0.1"));

@dain (Member) commented on Jun 5, 2019:

Leave the random port from TestNodeScheduler

assertTrue(redistributedSplit.iterator().next().getConnectorSplit() instanceof TestSplitRemote);
}

private static class TestSplitLocalToNode1

@dain (Member) commented on Jun 5, 2019:

We can also leave the name from TestNodeScheduler

assertTrue(assignments2.keySet().contains(node1));

// When optimized-local-scheduling is enabled, the split with node1 as local node should be assigned
int countLocalSplits = 0;

@dain (Member) commented on Jun 5, 2019:

The stream API for this is much easier to read... update the other counts in this class also.

long countLocalSplits = assignments2.values().stream()
        .map(Split::getConnectorSplit)
        .filter(TestSplitLocalToNode1.class::isInstance)
        .count();

splits.add(new Split(CATALOG_NAME, new TestSplitRemote(), Lifespan.taskWide()));
}
// computeAssignments just returns a mapping of nodes with splits to be assigned, it does not assign splits
Multimap<InternalNode, Split> assignments1 = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments();

@dain (Member) commented on Jun 5, 2019:

maybe name this initialAssignments

@Test
public void testPrioritizedAssignmentOfLocalSplit()
{
InternalNode node1 = new InternalNode("node1", URI.create("http://127.0.0.1"), NodeVersion.UNKNOWN, false);

@dain (Member) commented on Jun 5, 2019:

For the tests where there is only one node, just name the variable node

@garvit-gupta garvit-gupta force-pushed the garvit-gupta:prestosqlOLS branch 2 times, most recently from 7978b5a to f5399c1 Jun 7, 2019

@dain dain self-requested a review Jun 11, 2019

@dain (Member) left a comment:

I only had one question about using poll instead of peek + remove. If that change isn't needed, I think we are ready to merge.

Just let me know if you don't think we should change that, or push an updated version.

@dain (Member) commented on Jun 11, 2019:

For the question on the outdated commit:

So if (split.isRemotelyAccessible() && !split.getAddresses().isEmpty()) should suffice?

Yes I think so. BTW here is David's PR if you are interested: #869

@garvit-gupta garvit-gupta force-pushed the garvit-gupta:prestosqlOLS branch from f5399c1 to 8290140 Jun 12, 2019

@garvit-gupta garvit-gupta force-pushed the garvit-gupta:prestosqlOLS branch from 8290140 to 7433f36 Jun 12, 2019

@dain

dain approved these changes Jun 12, 2019

@dain dain merged commit a6556fe into prestosql:master Jun 12, 2019

1 check passed

Travis CI - Pull Request Build Passed
@dain (Member) commented on Jun 12, 2019:

Merged! Thanks!

@dain dain added this to the 315 milestone Jun 12, 2019

@dain dain referenced this pull request Jun 12, 2019

Closed

Release notes for 315 #948

@garvit-gupta garvit-gupta deleted the garvit-gupta:prestosqlOLS branch Jun 12, 2019
