Support consistent hashing in soft affinity node selection strategy #17115
Conversation
rongrong commented on Dec 16, 2021 (edited)
Based on earlier work in #14738
Force-pushed from 92e61ed to 708d014
Force-pushed from 708d014 to 986a913
Force-pushed from 986a913 to eebc906
cc @maobaolong
@@ -37,6 +37,8 @@
    private int maxPendingSplitsPerTask = 10;
    private int maxUnacknowledgedSplitsPerTask = 500;
    private String networkTopology = NetworkTopologyType.LEGACY;
    private NodeSelectionHashStrategy nodeSelectionHashStrategy = NodeSelectionHashStrategy.MODULAR;
nit: should we use consistent hashing as the default strategy if we didn't see any degradation in fb's staging & production?
Is there any reason for us to keep the modular strategy in the future?
We can delete the modular strategy later, once this is tested and rolled out fully. But to be able to test it safely we need both. And keeping the original behavior as the default makes sure the feature isn't accidentally on during deployment.
Agreed with Rongrong. Rolling out the new strategy will invalidate most of the existing local caches due to the shuffle of the nodes. We would like to roll it out in a more controlled manner and actively monitor the clusters after it is released.
presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java
.../src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java
@@ -257,6 +270,13 @@ public NodeSelector createNodeSelector(Session session, ConnectorId connectorId,
        .map(InternalNode::getNodeIdentifier)
        .collect(toImmutableSet());

    int weight = (int) ceil(1.0 * minVirtualNodeCount / activeNodes.size());
Where is the logic that computes the actual virtual node count based on minVirtualNodeCount and the physical node count?
"weight" is basically the number of virtual nodes created for each physical node. So the actual number of virtual node is weight * physicalNodeCount = ceil(minVirtualNodeCount / physicalNodeCount) * physicalNodeCount >= minVirtualNodeCount
. I figured it's better to make the number of virtual node configurable, but then the exact number is probably not important, mostly just the order of magnitudes. Directly configuring the weight is another option, but then the desired weight could be very different for a 10 node cluster vs a 500 node cluster.
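To make the derivation concrete, here is a minimal, self-contained sketch (the cluster size and minimum virtual node count are made-up numbers, not values from this PR):

import static java.lang.Math.ceil;

class VirtualNodeCountExample
{
    public static void main(String[] args)
    {
        int minVirtualNodeCount = 1000; // hypothetical config value
        int physicalNodeCount = 128;    // activeNodes.size()

        // "weight" = virtual nodes created per physical node
        int weight = (int) ceil(1.0 * minVirtualNodeCount / physicalNodeCount); // ceil(7.8125) = 8

        // total virtual nodes on the ring, always >= minVirtualNodeCount
        int totalVirtualNodes = weight * physicalNodeCount; // 8 * 128 = 1024
        System.out.printf("weight=%d, totalVirtualNodes=%d%n", weight, totalVirtualNodes);
    }
}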
Why don't we directly change the config to node-scheduler.consistent-hashing-virtual-node-count to represent the weight here? That way it would be a config independent of how many nodes a cluster has.
Because, as I mentioned above, if you set the weight directly, the actual number of virtual nodes on the hashing ring will depend on the number of physical nodes, which means it's hard to give a good default weight setting.
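To make that concrete with made-up numbers: if the minimum virtual node count is 1000, a 10-node cluster gets weight = ceil(1000 / 10) = 100 virtual nodes per physical node, while a 500-node cluster gets weight = ceil(1000 / 500) = 2. A single fixed default weight could not serve both cluster sizes well.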
presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeMap.java
.../src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java
int weight = (int) ceil(1.0 * minVirtualNodeCount / activeNodes.size());
for (InternalNode node : activeNodes) {
    for (int i = 0; i < weight; i++) {
        activeNodesByConsistentHashing.put(murmur3_32().hashString(format("%s%d", node.getNodeIdentifier(), i), UTF_8).asInt(), node);
    }
}
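For context, the lookup side of this ring presumably does a ceiling search on the map and wraps around at the end; a minimal sketch (not the actual ConsistentHashingNodeProvider code, using the same Guava murmur3_32 hash as above):

// Find the owning node for a split identifier on the consistent-hashing ring.
// activeNodesByConsistentHashing is the NavigableMap<Integer, InternalNode> built in the loop above.
int hash = murmur3_32().hashString(splitIdentifier, UTF_8).asInt();
Map.Entry<Integer, InternalNode> entry = activeNodesByConsistentHashing.ceilingEntry(hash);
if (entry == null) {
    entry = activeNodesByConsistentHashing.firstEntry(); // wrap around the ring
}
InternalNode owner = entry.getValue();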
We should move this logic inside ConsistentHashingNodeProvider, given it's specific to ConsistentHashingNodeProvider and NodeScheduler shouldn't be aware of it.
In that case the map will be created for every query. Here it's created only once when node map is updated.
That's a good point. Then should we actually have only one NodeProvider in the system instead of creating one for each query?
I thought about that. Basically we only need to create a NodeProvider when NodeMap is changed. Currently this is cumbersome to do because when we use soft affinity with modular hashing, we are using all nodes, rather than just active nodes, to reduce reshuffling due to temporary worker unavailability. Once we remove modular hashing and always use active nodes as candidate nodes, the NodeProvider logic can be moved into NodeMap entirely.
Force-pushed from eebc906 to 54972b5
Can we add a test for ConsistentHashingNodeProvider to verify the even distribution? We have a test example in the gateway, shared here: https://docs.google.com/document/d/1C_kMut0XS8T3pBYkbAE-xsef0_206XCmMpPzbEs9Q8w/edit?usp=sharing
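In case it helps, a rough sketch of such a distribution check (not the actual TestNodeScheduler code; createFakeNodes is a hypothetical helper and the tolerance is made up):

int nodeCount = 10;
int weight = 100;
List<InternalNode> nodes = createFakeNodes(nodeCount); // hypothetical helper producing nodes with distinct identifiers

// build the ring the same way as in NodeScheduler
NavigableMap<Integer, InternalNode> ring = new TreeMap<>();
for (InternalNode node : nodes) {
    for (int i = 0; i < weight; i++) {
        ring.put(murmur3_32().hashString(format("%s%d", node.getNodeIdentifier(), i), UTF_8).asInt(), node);
    }
}

// assign synthetic splits and count how many land on each node
Map<InternalNode, Integer> counts = new HashMap<>();
int splits = 100_000;
for (int split = 0; split < splits; split++) {
    int hash = murmur3_32().hashString("split" + split, UTF_8).asInt();
    Map.Entry<Integer, InternalNode> entry = ring.ceilingEntry(hash);
    if (entry == null) {
        entry = ring.firstEntry(); // wrap around
    }
    counts.merge(entry.getValue(), 1, Integer::sum);
}

// each node should receive roughly splits / nodeCount assignments
for (int count : counts.values()) {
    assertTrue(count > 0.5 * splits / nodeCount && count < 2.0 * splits / nodeCount);
}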
presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java
Force-pushed from 39415b3 to fd11fe8
Is the test stable enough? I'm hesitant about introducing non-deterministic tests into unit tests. Distribution is an attribute of the hashing function we choose, so it's not really testing our logic. Is it really necessary?
Force-pushed from fd11fe8 to 747d6f6
if (nodeSelectionHashStrategy == MODULAR_HASHING) {
    nodeProvider = new ModularHashingNodeProvider(nodeMap.getAllNodes());
}
I believe this part is the hacky override to make sure we're compatible with the old implementation? Can we add a comment or TODO here, just in case people get confused.
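For example, something along these lines (wording assumed, based on the explanation above about using all nodes to reduce reshuffling):

// TODO: MODULAR_HASHING keeps using all nodes (not just active ones) to preserve the legacy
// soft-affinity behavior and avoid reshuffling on temporary worker unavailability.
// Remove this override once consistent hashing is fully rolled out and modular hashing is deleted.
if (nodeSelectionHashStrategy == MODULAR_HASHING) {
    nodeProvider = new ModularHashingNodeProvider(nodeMap.getAllNodes());
}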
public NodeProvider getActiveNodeProvider(NodeSelectionHashStrategy nodeSelectionHashStrategy)
{
    switch (nodeSelectionHashStrategy) {
        case MODULAR_HASHING:
            return new ModularHashingNodeProvider(activeNodes);
        case CONSISTENT_HASHING:
            return new ConsistentHashingNodeProvider(activeNodesByConsistentHashing);
        default:
            throw new IllegalArgumentException(format("Unknown NodeSelectionHashStrategy: %s", nodeSelectionHashStrategy));
    }
}
Why do we create one for each query? We can move the creation into the NodeMap constructor, and getActiveNodeProvider would only return the corresponding instance. That way we can also move the node-specific logic into ConsistentHashingNodeProvider (the one we were discussing in NodeScheduler.java).
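Roughly what that could look like as a sketch (constructor shape and field names are assumed, not the actual NodeMap code):

// build both providers once per node-map refresh instead of once per query
public NodeMap(Set<InternalNode> activeNodes, NavigableMap<Integer, InternalNode> activeNodesByConsistentHashing)
{
    this.modularHashingNodeProvider = new ModularHashingNodeProvider(activeNodes);
    this.consistentHashingNodeProvider = new ConsistentHashingNodeProvider(activeNodesByConsistentHashing);
}

public NodeProvider getActiveNodeProvider(NodeSelectionHashStrategy nodeSelectionHashStrategy)
{
    // return the precomputed instance instead of constructing a new provider
    switch (nodeSelectionHashStrategy) {
        case MODULAR_HASHING:
            return modularHashingNodeProvider;
        case CONSISTENT_HASHING:
            return consistentHashingNodeProvider;
        default:
            throw new IllegalArgumentException(format("Unknown NodeSelectionHashStrategy: %s", nodeSelectionHashStrategy));
    }
}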
Sure, but this constructor is really cheap. It doesn't really matter.
{
    switch (nodeSelectionHashStrategy) {
        case MODULAR_HASHING:
            return new ModularHashingNodeProvider(activeNodes);
Why don't we just use allNodes to keep the original logic? Then we can also remove the hacky part (lines 166~168) in SimpleNodeSelector.java.
The original logic uses activeNodes for hard affinity, and allNodes only with soft affinity. activeNodes should be the right one to use; allNodes is the hack.
Force-pushed from 95705f6 to f60ea2b
I added a test to check the hash distribution. It's not very uniform. The node identifiers and split identifiers used in the test are not realistic, though. I don't know how "random" the actual values would be and whether those would generate a better distribution.
Force-pushed from f60ea2b to 6bc6bf7
Had a chat with @kewang1024 offline today. I addressed the comments. There are still 2 open questions and 1 follow up:
NavigableMap<Integer, InternalNode> activeNodesByConsistentHashing = new TreeMap<>();
for (InternalNode node : nodes) {
    for (int i = 0; i < weight; i++) {
        activeNodesByConsistentHashing.put(murmur3_32().hashString(format("%s%d", node.getNodeIdentifier(), i), UTF_8).asInt(), node);
Let's move HASH_FUNCTION.hashString(format("%s%d", key, i), UTF_8).asInt() to a helper function and reuse it for hashing the node and the value, so that they are always in sync.
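For example, something like (helper name assumed):

private static int hashVirtualNode(String key, int index)
{
    return HASH_FUNCTION.hashString(format("%s%d", key, index), UTF_8).asInt();
}

// placing virtual nodes on the ring then becomes:
activeNodesByConsistentHashing.put(hashVirtualNode(node.getNodeIdentifier(), i), node);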
lgtm, thanks!