Skip to content

Commit

Permalink
[apache#956] refactor: Changes the Boolean flag that determines wheth…
Browse files Browse the repository at this point in the history
…er a Node is healthy to a state (apache#959)

### What changes were proposed in this pull request?

Change the ServerNode health status from the original Boolean judgment to the unhealthy state

### Why are the changes needed?

Unhealthy states should not be isolated

Fix: apache#956

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests
  • Loading branch information
yl09099 committed Jun 22, 2023
1 parent fbc0b22 commit 0e24225
Show file tree
Hide file tree
Showing 21 changed files with 150 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,19 @@ private void logAssignmentResult(String appId, int shuffleId, PartitionRangeAssi
}

private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
ServerStatus serverStatus = request.hasStatus() ? ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE;
boolean isHealthy = true;
if (request.hasIsHealthy()) {
isHealthy = request.getIsHealthy().getValue();
/**
* Compatible with older version
*/
if (isHealthy) {
serverStatus = ServerStatus.ACTIVE;
} else {
serverStatus = ServerStatus.UNHEALTHY;
}
}
ServerStatus serverStatus = request.hasStatus() ? ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE;
return new ServerNode(request.getServerId().getId(),
request.getServerId().getIp(),
request.getServerId().getPort(),
Expand All @@ -377,7 +385,6 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) {
request.getAvailableMemory(),
request.getEventNumInFlush(),
Sets.newHashSet(request.getTagsList()),
isHealthy,
serverStatus,
StorageInfoUtils.fromProto(request.getStorageInfoMap()),
request.getServerId().getNettyPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ public ServerNode(
long preAllocatedMemory,
long availableMemory,
int eventNumInFlush,
Set<String> tags,
boolean isHealthy) {
this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, isHealthy,
Set<String> tags) {
this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags,
ServerStatus.ACTIVE, Maps.newHashMap());
}

Expand All @@ -65,9 +64,8 @@ public ServerNode(
long availableMemory,
int eventNumInFlush,
Set<String> tags,
boolean isHealthy,
ServerStatus status) {
this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, isHealthy,
this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags,
status, Maps.newHashMap());
}

Expand All @@ -80,10 +78,9 @@ public ServerNode(
long availableMemory,
int eventNumInFlush,
Set<String> tags,
boolean isHealthy,
ServerStatus status,
Map<String, StorageInfo> storageInfoMap) {
this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, isHealthy,
this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags,
status, storageInfoMap, -1);
}

Expand All @@ -96,7 +93,6 @@ public ServerNode(
long availableMemory,
int eventNumInFlush,
Set<String> tags,
boolean isHealthy,
ServerStatus status,
Map<String, StorageInfo> storageInfoMap,
int nettyPort) {
Expand All @@ -109,7 +105,7 @@ public ServerNode(
this.eventNumInFlush = eventNumInFlush;
this.timestamp = System.currentTimeMillis();
this.tags = tags;
this.status = isHealthy ? status : ServerStatus.UNHEALTHY;
this.status = status;
this.storageInfo = storageInfoMap;
if (nettyPort > 0) {
this.nettyPort = nettyPort;
Expand Down Expand Up @@ -156,10 +152,6 @@ public long getUsedMemory() {
public Set<String> getTags() {
return tags;
}

public boolean isHealthy() {
return this.status != ServerStatus.UNHEALTHY;
}

public ServerStatus getStatus() {
return status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void nodesCheck() {
sn.setStatus(ServerStatus.LOST);
lostNodes.add(sn);
unhealthyNodes.remove(sn);
} else if (!sn.isHealthy()) {
} else if (ServerStatus.UNHEALTHY.equals(sn.getStatus())) {
LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
unhealthyNodes.add(sn);
lostNodes.remove(sn);
Expand Down Expand Up @@ -242,7 +242,7 @@ public List<ServerNode> getServerList(Set<String> requiredTags) {
}
if (!excludeNodes.contains(node.getId())
&& node.getTags().containsAll(requiredTags)
&& node.isHealthy()) {
&& ServerStatus.ACTIVE.equals(node.getStatus())) {
availableNodes.add(node);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.config.Reconfigurable;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
Expand Down Expand Up @@ -67,7 +68,8 @@ public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
public AccessCheckResult check(AccessInfo accessInfo) {
Set<String> tags = accessInfo.getTags();
List<ServerNode> servers = clusterManager.getServerList(tags);
int size = (int) servers.stream().filter(ServerNode::isHealthy).filter(this::checkMemory).count();
int size = (int) servers.stream().filter(serverNode -> serverNode.getStatus()
.equals(ServerStatus.ACTIVE)).filter(this::checkMemory).count();

// If the hard constraint number exist, directly check it
if (availableServerNumThreshold != -1 && size >= availableServerNumThreshold) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ public class ServerNodeTest {
public void compareTest() {
Set<String> tags = Sets.newHashSet("test");
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, tags, true);
10, tags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, tags, true);
10, tags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, tags, true);
11, tags);
List<ServerNode> nodes = Lists.newArrayList(sn1, sn2, sn3);
Collections.sort(nodes);
assertEquals("sn2", nodes.get(0).getId());
Expand All @@ -55,7 +55,7 @@ public void compareTest() {
@Test
public void testStorageInfoOfServerNode() {
Set<String> tags = Sets.newHashSet("tag");
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags, true);
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags);
// default constructor creates ServerNode with zero size of LocalStorage
assertEquals(0, sn1.getStorageInfo().size());
Map<String, StorageInfo> localStorageInfo = Maps.newHashMap();
Expand All @@ -67,7 +67,7 @@ public void testStorageInfoOfServerNode() {
StorageStatus.NORMAL);
localStorageInfo.put("/mnt", info);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags,
true, ServerStatus.ACTIVE, localStorageInfo);
ServerStatus.ACTIVE, localStorageInfo);
assertEquals(1, sn2.getStorageInfo().size());
}

Expand All @@ -76,10 +76,10 @@ public void testNettyPort() {
Set<String> tags = Sets.newHashSet("tag");
Map<String, StorageInfo> localStorageInfo = Maps.newHashMap();
ServerNode sn1 = new ServerNode("sn1", "ip", 1, 100L, 50L, 20, 10, tags,
true, ServerStatus.ACTIVE, localStorageInfo);
ServerStatus.ACTIVE, localStorageInfo);
assertEquals(sn1.getNettyPort(), -1);
ServerNode sn2 = new ServerNode("sn2", "ip", 1, 100L, 50L, 20, 10, tags,
true, ServerStatus.ACTIVE, localStorageInfo, 2);
ServerStatus.ACTIVE, localStorageInfo, 2);
assertEquals(sn2.getNettyPort(), 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ public void getServerListTest() throws Exception {
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {

ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, grpcTags, true);
10, grpcTags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, grpcTags, true);
10, grpcTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, grpcTags, true);
11, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
Expand All @@ -100,11 +100,11 @@ public void getServerListTest() throws Exception {

// tag changes
sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, Sets.newHashSet("new_tag"), true);
10, Sets.newHashSet("new_tag"));
sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, Sets.newHashSet("test", "new_tag"), true);
10, Sets.newHashSet("test", "new_tag"));
ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
10, grpcTags, true);
10, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn4);
Expand Down Expand Up @@ -137,11 +137,11 @@ public void getLostServerListTest() throws Exception {
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L);
SimpleClusterManager clusterManager = new SimpleClusterManager(coordinatorConf, new Configuration());
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, grpcTags, true);
10, grpcTags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, grpcTags, true);
10, grpcTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, grpcTags, true);
11, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
Expand All @@ -154,7 +154,7 @@ public void getLostServerListTest() throws Exception {
);
// re-register sn3
sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, grpcTags, true);
11, grpcTags);
clusterManager.add(sn3);
Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
await().atMost(1, TimeUnit.SECONDS).until(() -> {
Expand All @@ -173,13 +173,13 @@ public void getUnhealthyServerList() throws Exception {
coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L);
SimpleClusterManager clusterManager = new SimpleClusterManager(coordinatorConf, new Configuration());
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, grpcTags, true);
10, grpcTags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, grpcTags, true);
10, grpcTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, grpcTags, false);
11, grpcTags, ServerStatus.UNHEALTHY);
ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
11, grpcTags, false);
11, grpcTags, ServerStatus.UNHEALTHY);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
Expand All @@ -193,7 +193,7 @@ public void getUnhealthyServerList() throws Exception {
});
// Register unhealthy node sn3 again
sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, grpcTags, false);
11, grpcTags, ServerStatus.UNHEALTHY);
clusterManager.add(sn3);
Set<String> expectedIdsre = Sets.newHashSet("sn3");
await().atMost(1, TimeUnit.SECONDS).until(() -> {
Expand All @@ -215,13 +215,13 @@ public void getServerListForNettyTest() throws Exception {
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {

ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, nettyTags, true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
10, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, nettyTags, true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
10, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, nettyTags, true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
11, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
11, grpcTags, true);
11, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
Expand All @@ -241,12 +241,12 @@ public void getServerListForNettyTest() throws Exception {

// tag changes
sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, Sets.newHashSet("new_tag"), true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
10, Sets.newHashSet("new_tag"), ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, Sets.newHashSet("test", "new_tag"),
true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1);
sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20,
10, grpcTags, true);
10, grpcTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn4);
Expand Down Expand Up @@ -275,11 +275,11 @@ public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() t
ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, testTags, false);
10, testTags, ServerStatus.UNHEALTHY);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, testTags, true);
10, testTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, testTags, true);
11, testTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
Expand All @@ -303,7 +303,7 @@ public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() t
}

private void addNode(String id, SimpleClusterManager clusterManager) {
ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10, testTags, true);
ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10, testTags);
LOG.info("Add node " + node.getId() + " " + node.getTimestamp());
clusterManager.add(node);
}
Expand Down Expand Up @@ -337,11 +337,11 @@ public void testGetCorrectServerNodesWhenOneNodeRemoved() throws Exception {
ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) {
ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
10, testTags, true);
10, testTags);
ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
10, testTags, true);
10, testTags);
ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
11, testTags, true);
11, testTags);
clusterManager.add(sn1);
clusterManager.add(sn2);
clusterManager.add(sn3);
Expand Down Expand Up @@ -369,13 +369,13 @@ public void updateExcludeNodesTest() throws Exception {

try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration())) {
scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20,
10, testTags, true));
10, testTags));
scm.add(new ServerNode("node2-1999", "ip", 0, 100L, 50L, 20,
10, testTags, true));
10, testTags));
scm.add(new ServerNode("node3-1999", "ip", 0, 100L, 50L, 20,
10, testTags, true));
10, testTags));
scm.add(new ServerNode("node4-1999", "ip", 0, 100L, 50L, 20,
10, testTags, true));
10, testTags));
assertTrue(scm.getExcludeNodes().isEmpty());

final Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
Expand Down
Loading

0 comments on commit 0e24225

Please sign in to comment.