diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java index e012f03b38..1ab782d74f 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java @@ -364,11 +364,19 @@ private void logAssignmentResult(String appId, int shuffleId, PartitionRangeAssi } private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) { + ServerStatus serverStatus = request.hasStatus() ? ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE; boolean isHealthy = true; if (request.hasIsHealthy()) { isHealthy = request.getIsHealthy().getValue(); + /** + * Compatible with older version + */ + if (isHealthy) { + serverStatus = ServerStatus.ACTIVE; + } else { + serverStatus = ServerStatus.UNHEALTHY; + } } - ServerStatus serverStatus = request.hasStatus() ? ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE; return new ServerNode(request.getServerId().getId(), request.getServerId().getIp(), request.getServerId().getPort(), @@ -377,7 +385,6 @@ private ServerNode toServerNode(ShuffleServerHeartBeatRequest request) { request.getAvailableMemory(), request.getEventNumInFlush(), Sets.newHashSet(request.getTagsList()), - isHealthy, serverStatus, StorageInfoUtils.fromProto(request.getStorageInfoMap()), request.getServerId().getNettyPort()); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java index 6cc720ede5..cafa8b544e 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java @@ -50,9 +50,8 @@ public ServerNode( long preAllocatedMemory, long availableMemory, int eventNumInFlush, - Set tags, - boolean isHealthy) { - this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, isHealthy, + Set tags) { + this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, ServerStatus.ACTIVE, Maps.newHashMap()); } @@ -65,9 +64,8 @@ public ServerNode( long availableMemory, int eventNumInFlush, Set tags, - boolean isHealthy, ServerStatus status) { - this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, isHealthy, + this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, status, Maps.newHashMap()); } @@ -80,10 +78,9 @@ public ServerNode( long availableMemory, int eventNumInFlush, Set tags, - boolean isHealthy, ServerStatus status, Map storageInfoMap) { - this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, isHealthy, + this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, eventNumInFlush, tags, status, storageInfoMap, -1); } @@ -96,7 +93,6 @@ public ServerNode( long availableMemory, int eventNumInFlush, Set tags, - boolean isHealthy, ServerStatus status, Map storageInfoMap, int nettyPort) { @@ -109,7 +105,7 @@ public ServerNode( this.eventNumInFlush = eventNumInFlush; this.timestamp = System.currentTimeMillis(); this.tags = tags; - this.status = isHealthy ? status : ServerStatus.UNHEALTHY; + this.status = status; this.storageInfo = storageInfoMap; if (nettyPort > 0) { this.nettyPort = nettyPort; @@ -156,10 +152,6 @@ public long getUsedMemory() { public Set getTags() { return tags; } - - public boolean isHealthy() { - return this.status != ServerStatus.UNHEALTHY; - } public ServerStatus getStatus() { return status; diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java index a11823ea88..6729e8f3c8 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java @@ -135,7 +135,7 @@ void nodesCheck() { sn.setStatus(ServerStatus.LOST); lostNodes.add(sn); unhealthyNodes.remove(sn); - } else if (!sn.isHealthy()) { + } else if (ServerStatus.UNHEALTHY.equals(sn.getStatus())) { LOG.warn("Found server {} was unhealthy, will not assign it.", sn); unhealthyNodes.add(sn); lostNodes.remove(sn); @@ -242,7 +242,7 @@ public List getServerList(Set requiredTags) { } if (!excludeNodes.contains(node.getId()) && node.getTags().containsAll(requiredTags) - && node.isHealthy()) { + && ServerStatus.ACTIVE.equals(node.getStatus())) { availableNodes.add(node); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java index 67eb584dbf..093709a334 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.common.config.Reconfigurable; import org.apache.uniffle.common.config.RssConf; import org.apache.uniffle.common.util.Constants; @@ -67,7 +68,8 @@ public AccessClusterLoadChecker(AccessManager accessManager) throws Exception { public AccessCheckResult check(AccessInfo accessInfo) { Set tags = accessInfo.getTags(); List servers = clusterManager.getServerList(tags); - int size = (int) servers.stream().filter(ServerNode::isHealthy).filter(this::checkMemory).count(); + int size = (int) servers.stream().filter(serverNode -> serverNode.getStatus() + .equals(ServerStatus.ACTIVE)).filter(this::checkMemory).count(); // If the hard constraint number exist, directly check it if (availableServerNumThreshold != -1 && size >= availableServerNumThreshold) { diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java index f60ff4978d..7fc4041a8c 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java @@ -40,11 +40,11 @@ public class ServerNodeTest { public void compareTest() { Set tags = Sets.newHashSet("test"); ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, tags, true); + 10, tags); ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, - 10, tags, true); + 10, tags); ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, tags, true); + 11, tags); List nodes = Lists.newArrayList(sn1, sn2, sn3); Collections.sort(nodes); assertEquals("sn2", nodes.get(0).getId()); @@ -55,7 +55,7 @@ public void compareTest() { @Test public void testStorageInfoOfServerNode() { Set tags = Sets.newHashSet("tag"); - ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags, true); + ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, 10, tags); // default constructor creates ServerNode with zero size of LocalStorage assertEquals(0, sn1.getStorageInfo().size()); Map localStorageInfo = Maps.newHashMap(); @@ -67,7 +67,7 @@ public void testStorageInfoOfServerNode() { StorageStatus.NORMAL); localStorageInfo.put("/mnt", info); ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags, - true, ServerStatus.ACTIVE, localStorageInfo); + ServerStatus.ACTIVE, localStorageInfo); assertEquals(1, sn2.getStorageInfo().size()); } @@ -76,10 +76,10 @@ public void testNettyPort() { Set tags = Sets.newHashSet("tag"); Map localStorageInfo = Maps.newHashMap(); ServerNode sn1 = new ServerNode("sn1", "ip", 1, 100L, 50L, 20, 10, tags, - true, ServerStatus.ACTIVE, localStorageInfo); + ServerStatus.ACTIVE, localStorageInfo); assertEquals(sn1.getNettyPort(), -1); ServerNode sn2 = new ServerNode("sn2", "ip", 1, 100L, 50L, 20, 10, tags, - true, ServerStatus.ACTIVE, localStorageInfo, 2); + ServerStatus.ACTIVE, localStorageInfo, 2); assertEquals(sn2.getNettyPort(), 2); } } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java index e08193f6bd..1457847f4e 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java @@ -85,11 +85,11 @@ public void getServerListTest() throws Exception { try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) { ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, grpcTags, true); + 10, grpcTags); ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, - 10, grpcTags, true); + 10, grpcTags); ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, grpcTags, true); + 11, grpcTags); clusterManager.add(sn1); clusterManager.add(sn2); clusterManager.add(sn3); @@ -100,11 +100,11 @@ public void getServerListTest() throws Exception { // tag changes sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, Sets.newHashSet("new_tag"), true); + 10, Sets.newHashSet("new_tag")); sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, - 10, Sets.newHashSet("test", "new_tag"), true); + 10, Sets.newHashSet("test", "new_tag")); ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20, - 10, grpcTags, true); + 10, grpcTags); clusterManager.add(sn1); clusterManager.add(sn2); clusterManager.add(sn4); @@ -137,11 +137,11 @@ public void getLostServerListTest() throws Exception { coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L); SimpleClusterManager clusterManager = new SimpleClusterManager(coordinatorConf, new Configuration()); ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, grpcTags, true); + 10, grpcTags); ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, - 10, grpcTags, true); + 10, grpcTags); ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, grpcTags, true); + 11, grpcTags); clusterManager.add(sn1); clusterManager.add(sn2); clusterManager.add(sn3); @@ -154,7 +154,7 @@ public void getLostServerListTest() throws Exception { ); // re-register sn3 sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, grpcTags, true); + 11, grpcTags); clusterManager.add(sn3); Set expectedIdsre = Sets.newHashSet("sn1", "sn2"); await().atMost(1, TimeUnit.SECONDS).until(() -> { @@ -173,13 +173,13 @@ public void getUnhealthyServerList() throws Exception { coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L); SimpleClusterManager clusterManager = new SimpleClusterManager(coordinatorConf, new Configuration()); ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, grpcTags, true); + 10, grpcTags); ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, - 10, grpcTags, true); + 10, grpcTags); ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, grpcTags, false); + 11, grpcTags, ServerStatus.UNHEALTHY); ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20, - 11, grpcTags, false); + 11, grpcTags, ServerStatus.UNHEALTHY); clusterManager.add(sn1); clusterManager.add(sn2); clusterManager.add(sn3); @@ -193,7 +193,7 @@ public void getUnhealthyServerList() throws Exception { }); // Register unhealthy node sn3 again sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, grpcTags, false); + 11, grpcTags, ServerStatus.UNHEALTHY); clusterManager.add(sn3); Set expectedIdsre = Sets.newHashSet("sn3"); await().atMost(1, TimeUnit.SECONDS).until(() -> { @@ -215,13 +215,13 @@ public void getServerListForNettyTest() throws Exception { try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) { ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, nettyTags, true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); + 10, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, - 10, nettyTags, true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); + 10, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, nettyTags, true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); + 11, nettyTags, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20, - 11, grpcTags, true); + 11, grpcTags); clusterManager.add(sn1); clusterManager.add(sn2); clusterManager.add(sn3); @@ -241,12 +241,12 @@ public void getServerListForNettyTest() throws Exception { // tag changes sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, Sets.newHashSet("new_tag"), true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); + 10, Sets.newHashSet("new_tag"), ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, 10, Sets.newHashSet("test", "new_tag"), - true, ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); + ServerStatus.ACTIVE, JavaUtils.newConcurrentMap(), 1); sn4 = new ServerNode("sn4", "ip", 0, 100L, 51L, 20, - 10, grpcTags, true); + 10, grpcTags); clusterManager.add(sn1); clusterManager.add(sn2); clusterManager.add(sn4); @@ -275,11 +275,11 @@ public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() t ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L); try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) { ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, testTags, false); + 10, testTags, ServerStatus.UNHEALTHY); ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, - 10, testTags, true); + 10, testTags); ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, testTags, true); + 11, testTags); clusterManager.add(sn1); clusterManager.add(sn2); clusterManager.add(sn3); @@ -303,7 +303,7 @@ public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() t } private void addNode(String id, SimpleClusterManager clusterManager) { - ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10, testTags, true); + ServerNode node = new ServerNode(id, "ip", 0, 100L, 50L, 30L, 10, testTags); LOG.info("Add node " + node.getId() + " " + node.getTimestamp()); clusterManager.add(node); } @@ -337,11 +337,11 @@ public void testGetCorrectServerNodesWhenOneNodeRemoved() throws Exception { ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L); try (SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration())) { ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, - 10, testTags, true); + 10, testTags); ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, - 10, testTags, true); + 10, testTags); ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, - 11, testTags, true); + 11, testTags); clusterManager.add(sn1); clusterManager.add(sn2); clusterManager.add(sn3); @@ -369,13 +369,13 @@ public void updateExcludeNodesTest() throws Exception { try (SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration())) { scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20, - 10, testTags, true)); + 10, testTags)); scm.add(new ServerNode("node2-1999", "ip", 0, 100L, 50L, 20, - 10, testTags, true)); + 10, testTags)); scm.add(new ServerNode("node3-1999", "ip", 0, 100L, 50L, 20, - 10, testTags, true)); + 10, testTags)); scm.add(new ServerNode("node4-1999", "ip", 0, 100L, 50L, 20, - 10, testTags, true)); + 10, testTags)); assertTrue(scm.getExcludeNodes().isEmpty()); final Set nodes = Sets.newHashSet("node1-1999", "node2-1999"); diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java index a48e69a464..eeb3b7d03d 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessClusterLoadCheckerTest.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.coordinator.AccessManager; import org.apache.uniffle.coordinator.ApplicationManager; import org.apache.uniffle.coordinator.ClusterManager; @@ -75,8 +76,7 @@ public void testAccessInfoRequiredShuffleServers() throws Exception { 20, 1000, 0, - null, - true); + null); ServerNode node2 = new ServerNode( "1", "1", @@ -85,8 +85,7 @@ public void testAccessInfoRequiredShuffleServers() throws Exception { 20, 1000, 0, - null, - true); + null); nodes.add(node1); nodes.add(node2); @@ -155,7 +154,7 @@ public void testWhenAvailableServerThresholdSpecified() throws Exception { 30, 0, null, - false); + ServerStatus.UNHEALTHY); serverNodeList.add(node1); final String filePath = Objects.requireNonNull( getClass().getClassLoader().getResource("coordinator.conf")).getFile(); @@ -178,8 +177,7 @@ public void testWhenAvailableServerThresholdSpecified() throws Exception { 40, 10, 0, - null, - true); + null); serverNodeList.add(node2); ServerNode node3 = new ServerNode( "1", @@ -189,8 +187,7 @@ public void testWhenAvailableServerThresholdSpecified() throws Exception { 25, 20, 0, - null, - true); + null); serverNodeList.add(node3); assertFalse(accessClusterLoadChecker.check(new AccessInfo("test")).isSuccess()); ServerNode node4 = new ServerNode( @@ -201,8 +198,7 @@ public void testWhenAvailableServerThresholdSpecified() throws Exception { 25, 25, 0, - null, - true); + null); serverNodeList.add(node4); assertTrue(accessClusterLoadChecker.check(new AccessInfo("test")).isSuccess()); } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java index 79aabb753a..4d61a50b0e 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessQuotaCheckerTest.java @@ -72,8 +72,7 @@ public void testAccessInfoRequiredShuffleServers() throws Exception { 20, 1000, 0, - null, - true); + null); ServerNode node2 = new ServerNode( "1", "1", @@ -82,8 +81,7 @@ public void testAccessInfoRequiredShuffleServers() throws Exception { 20, 1000, 0, - null, - true); + null); nodes.add(node1); nodes.add(node2); diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java index 03992edc06..4ffdd40767 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/BasicAssignmentStrategyTest.java @@ -68,7 +68,7 @@ public void tearDown() throws IOException { public void testAssign() { for (int i = 0; i < 20; ++i) { clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, - 20 - i, 0, tags, true)); + 20 - i, 0, tags)); } PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1); @@ -95,7 +95,7 @@ public void testAssign() { public void testRandomAssign() { for (int i = 0; i < 20; ++i) { clusterManager.add(new ServerNode(String.valueOf(i), "127.0.0." + i, 0, 0, 0, - 0, 0, tags, true)); + 0, 0, tags)); } PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1); SortedMap> assignments = pra.getAssignments(); @@ -118,11 +118,11 @@ public void testRandomAssign() { @Test public void testAssignWithDifferentNodeNum() { final ServerNode sn1 = new ServerNode("sn1", "", 0, 0, 0, - 20, 0, tags, true); + 20, 0, tags); final ServerNode sn2 = new ServerNode("sn2", "", 0, 0, 0, - 10, 0, tags, true); + 10, 0, tags); final ServerNode sn3 = new ServerNode("sn3", "", 0, 0, 0, - 0, 0, tags, true); + 0, 0, tags); clusterManager.add(sn1); PartitionRangeAssignment pra = strategy.assign(100, 10, 2, tags, -1, -1); @@ -161,7 +161,7 @@ public void testAssignmentShuffleNodesNum() { for (int i = 0; i < 20; ++i) { clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } /** @@ -231,7 +231,7 @@ public void testAssignmentShuffleNodesNum() { serverTags = Sets.newHashSet("tag-2"); for (int i = 0; i < shuffleNodesMax - 1; ++i) { clusterManager.add(new ServerNode("t2-" + i, "", 0, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } pra = strategy.assign(100, 10, 1, serverTags, shuffleNodesMax, -1); assertEquals( @@ -283,8 +283,7 @@ void updateServerResource(List resources) { 5L, resources.get(i), 5, - tags, - true); + tags); clusterManager.add(node); } } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java index f4dcb25992..19af678a7f 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategyTest.java @@ -194,8 +194,7 @@ void updateServerResource(List resources) { 5L, resources.get(i), 5, - tags, - true); + tags); clusterManager.add(node); } } @@ -206,7 +205,7 @@ public void testAssignmentShuffleNodesNum() { for (int i = 0; i < 20; ++i) { clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } /** @@ -276,7 +275,7 @@ public void testAssignmentShuffleNodesNum() { serverTags = Sets.newHashSet("tag-2"); for (int i = 0; i < shuffleNodesMax - 1; ++i) { clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 0, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } pra = strategy.assign(100, 1, 1, serverTags, shuffleNodesMax, -1); assertEquals( @@ -304,11 +303,11 @@ public void testAssignmentWithMustDiff() throws Exception { for (int i = 0; i < 5; ++i) { clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } for (int i = 0; i < 5; ++i) { clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, -1); pra.getAssignments().values().forEach((nodeList) -> { @@ -349,11 +348,11 @@ public void testAssignmentWithPreferDiff() throws Exception { for (int i = 0; i < 3; ++i) { clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } for (int i = 0; i < 2; ++i) { clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, -1); pra.getAssignments().values().forEach((nodeList) -> { @@ -364,11 +363,11 @@ public void testAssignmentWithPreferDiff() throws Exception { clusterManager = new SimpleClusterManager(ssc, new Configuration()); for (int i = 0; i < 3; ++i) { clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } for (int i = 0; i < 2; ++i) { clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } strategy = new PartitionBalanceAssignmentStrategy(clusterManager, ssc); pra = strategy.assign(100, 1, 3, serverTags, -1, -1); @@ -394,11 +393,11 @@ public void testAssignmentWithNone() throws Exception { for (int i = 0; i < 3; ++i) { clusterManager.add(new ServerNode("t1-" + i, "127.0.0." + i, 0, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } for (int i = 0; i < 2; ++i) { clusterManager.add(new ServerNode("t2-" + i, "127.0.0." + i, 1, 0, 0, - 20 - i, 0, serverTags, true)); + 20 - i, 0, serverTags)); } PartitionRangeAssignment pra = strategy.assign(100, 1, 5, serverTags, -1, -1); pra.getAssignments().values().forEach((nodeList) -> { diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java index 4d3dc215f2..806801ddc7 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionRangeAssignmentTest.java @@ -40,7 +40,7 @@ public void test() { for (int i = 0; i < 9; i = i + 3) { PartitionRange range = new PartitionRange(i, i + 2); List nodes = Collections.singletonList(new ServerNode( - String.valueOf(i), "127.0.0." + i, i / 3, 0, 0, 0, 0, Sets.newHashSet("test"), true)); + String.valueOf(i), "127.0.0." + i, i / 3, 0, 0, 0, 0, Sets.newHashSet("test"))); sortedMap.put(range, nodes); } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java index 26329d7d75..926153067f 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/partition/ContinuousSelectPartitionStrategyTest.java @@ -89,8 +89,7 @@ private List generateServerResource(List resources) { 5L, resources.get(i), 5, - tags, - true); + tags); serverNodes.add(node); } return serverNodes; diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java index 8338aa92a4..313a1a2b27 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java @@ -32,6 +32,7 @@ import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest; import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse; +import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.coordinator.CoordinatorConf; @@ -41,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase { @@ -113,7 +113,7 @@ public void healthCheckTest() throws Exception { coordinatorClient.getShuffleAssignments(request); assertFalse(response.getPartitionToServers().isEmpty()); for (ServerNode node : nodes) { - assertTrue(node.isHealthy()); + assertEquals(ServerStatus.ACTIVE, node.getStatus()); } byte[] bytes = new byte[writeDataSize]; new Random().nextBytes(bytes); @@ -125,7 +125,7 @@ public void healthCheckTest() throws Exception { nodes = coordinators.get(0).getClusterManager().list(); assertEquals(2, nodes.size()); for (ServerNode node : nodes) { - assertFalse(node.isHealthy()); + assertEquals(ServerStatus.UNHEALTHY, node.getStatus()); } nodes = coordinators.get(0).getClusterManager().getServerList(Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION)); assertEquals(0, nodes.size()); @@ -144,7 +144,7 @@ public void healthCheckTest() throws Exception { } } while (nodes.size() != 2); for (ServerNode node : nodes) { - assertTrue(node.isHealthy()); + assertEquals(ServerStatus.ACTIVE, node.getStatus()); } assertEquals(2, nodes.size()); response = diff --git a/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java similarity index 76% rename from server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java rename to integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java index f1bbd0c662..c0b8001351 100644 --- a/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckTest.java @@ -15,24 +15,29 @@ * limitations under the License. */ -package org.apache.uniffle.server; +package org.apache.uniffle.test; import java.util.Arrays; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Lists; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.apache.uniffle.common.ServerStatus; +import org.apache.uniffle.server.HealthCheck; +import org.apache.uniffle.server.HealthyMockChecker; +import org.apache.uniffle.server.LocalStorageChecker; +import org.apache.uniffle.server.ShuffleServerConf; +import org.apache.uniffle.server.ShuffleServerMetrics; +import org.apache.uniffle.server.UnHealthyMockChecker; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -public class HealthCheckTest { - +public class HealthCheckTest extends CoordinatorTestBase { @BeforeAll public static void setup() { ShuffleServerMetrics.register(); @@ -73,26 +78,26 @@ public void buildInCheckerTest() { } @Test - public void checkTest() { - AtomicBoolean healthy = new AtomicBoolean(false); + public void checkTest() throws Exception { ShuffleServerConf conf = new ShuffleServerConf(); + AtomicReference serverStatusAtomicReference = new AtomicReference(ServerStatus.ACTIVE); conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), HealthyMockChecker.class.getCanonicalName()); - HealthCheck checker = new HealthCheck(healthy, conf, Lists.newArrayList()); + HealthCheck checker = new HealthCheck(serverStatusAtomicReference, conf, Lists.newArrayList()); checker.check(); - assertTrue(healthy.get()); + assertEquals(ServerStatus.ACTIVE, checker.getServerStatus()); assertEquals(0, ShuffleServerMetrics.gaugeIsHealthy.get()); conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), UnHealthyMockChecker.class.getCanonicalName()); - checker = new HealthCheck(healthy, conf, Lists.newArrayList()); + checker = new HealthCheck(serverStatusAtomicReference, conf, Lists.newArrayList()); checker.check(); - assertFalse(healthy.get()); + assertEquals(ServerStatus.UNHEALTHY, checker.getServerStatus()); assertEquals(1, ShuffleServerMetrics.gaugeIsHealthy.get()); conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), UnHealthyMockChecker.class.getCanonicalName() + "," + HealthyMockChecker.class.getCanonicalName()); - checker = new HealthCheck(healthy, conf, Lists.newArrayList()); + checker = new HealthCheck(serverStatusAtomicReference, conf, Lists.newArrayList()); checker.check(); - assertFalse(healthy.get()); + assertEquals(ServerStatus.UNHEALTHY, checker.getServerStatus()); assertEquals(1, ShuffleServerMetrics.gaugeIsHealthy.get()); } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java index 4ecaf78f4a..08625404f3 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java @@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.protobuf.BoolValue; import com.google.protobuf.Empty; import io.grpc.ManagedChannel; import io.grpc.StatusRuntimeException; @@ -117,7 +116,6 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat( int eventNumInFlush, long timeout, Set tags, - boolean isHealthy, ServerStatus serverStatus, Map storageInfo, int nettyPort) { @@ -131,7 +129,6 @@ public ShuffleServerHeartBeatResponse doSendHeartBeat( .setAvailableMemory(availableMemory) .setEventNumInFlush(eventNumInFlush) .addAllTags(tags) - .setIsHealthy(BoolValue.newBuilder().setValue(isHealthy).build()) .setStatusValue(serverStatus.ordinal()) .putAllStorageInfo(StorageInfoUtils.toProto(storageInfo)) .build(); @@ -197,7 +194,6 @@ public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest request) { request.getEventNumInFlush(), request.getTimeout(), request.getTags(), - request.isHealthy(), request.getServerStatus(), request.getStorageInfo(), request.getNettyPort()); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java index afa5d0edaf..a9d2137715 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java @@ -35,7 +35,6 @@ public class RssSendHeartBeatRequest { private final int eventNumInFlush; private final Set tags; private final long timeout; - private final boolean isHealthy; private final ServerStatus serverStatus; private final Map storageInfo; private final int nettyPort; @@ -50,7 +49,6 @@ public RssSendHeartBeatRequest( int eventNumInFlush, long timeout, Set tags, - boolean isHealthy, ServerStatus serverStatus, Map storageInfo, int nettyPort) { @@ -63,7 +61,6 @@ public RssSendHeartBeatRequest( this.eventNumInFlush = eventNumInFlush; this.tags = tags; this.timeout = timeout; - this.isHealthy = isHealthy; this.serverStatus = serverStatus; this.storageInfo = storageInfo; this.nettyPort = nettyPort; @@ -105,10 +102,6 @@ public Set getTags() { return tags; } - public boolean isHealthy() { - return isHealthy; - } - public ServerStatus getServerStatus() { return serverStatus; } diff --git a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java index ecfa28b1d4..27a30851d3 100644 --- a/server/src/main/java/org/apache/uniffle/server/HealthCheck.java +++ b/server/src/main/java/org/apache/uniffle/server/HealthCheck.java @@ -20,15 +20,16 @@ import java.lang.reflect.Constructor; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.ServerStatus; + /** * HealthCheck will check every server whether it has the ability to process shuffle data. Currently, we only support * disk checker. If enough disks don't have enough disk space, server will become unhealthy, and only enough disks @@ -38,14 +39,17 @@ public class HealthCheck { private static final Logger LOG = LoggerFactory.getLogger(HealthCheck.class); - private final AtomicBoolean isHealthy; + private AtomicReference serverStatus; private final long checkIntervalMs; private final Thread thread; private volatile boolean isStop = false; private List checkers = Lists.newArrayList(); - public HealthCheck(AtomicBoolean isHealthy, ShuffleServerConf conf, List buildInCheckers) { - this.isHealthy = isHealthy; + public HealthCheck( + AtomicReference serverStatus, + ShuffleServerConf conf, + List buildInCheckers) { + this.serverStatus = serverStatus; this.checkIntervalMs = conf.getLong(ShuffleServerConf.HEALTH_CHECK_INTERVAL); List configuredCheckers = conf.get(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES); if (CollectionUtils.isEmpty(configuredCheckers) && buildInCheckers.isEmpty()) { @@ -78,17 +82,23 @@ public HealthCheck(AtomicBoolean isHealthy, ShuffleServerConf conf, List tags, - boolean isHealthy, ServerStatus serverStatus, Map localStorageInfo, int nettyPort) { @@ -116,7 +114,6 @@ boolean sendHeartBeat( eventNumInFlush, heartBeatInterval, tags, - isHealthy, serverStatus, localStorageInfo, nettyPort); diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index 212dd2d34d..13a4400995 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -22,7 +22,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -87,10 +87,10 @@ public class ShuffleServer { private StorageManager storageManager; private HealthCheck healthCheck; private Set tags = Sets.newHashSet(); - private AtomicBoolean isHealthy = new AtomicBoolean(true); private GRPCMetrics grpcMetrics; private MetricReporter metricReporter; - private volatile ServerStatus serverStatus = ServerStatus.ACTIVE; + + private AtomicReference serverStatus = new AtomicReference(ServerStatus.ACTIVE); private volatile boolean running; private ExecutorService executorService; private Future decommissionFuture; @@ -226,7 +226,7 @@ private void initialization() throws Exception { if (healthCheckEnable) { List builtInCheckers = Lists.newArrayList(); builtInCheckers.add(storageManager.getStorageChecker()); - healthCheck = new HealthCheck(isHealthy, shuffleServerConf, builtInCheckers); + healthCheck = new HealthCheck(serverStatus, shuffleServerConf, builtInCheckers); healthCheck.start(); } @@ -315,7 +315,11 @@ private void blockUntilShutdown() throws InterruptedException { } public ServerStatus getServerStatus() { - return serverStatus; + return serverStatus.get(); + } + + public void setServerStatus(ServerStatus serverStatus) { + this.serverStatus.set(serverStatus); } public synchronized void decommission() { @@ -323,11 +327,11 @@ public synchronized void decommission() { LOG.info("Shuffle Server is decommissioning. Nothing needs to be done."); return; } - if (!ServerStatus.ACTIVE.equals(serverStatus)) { + if (!ServerStatus.ACTIVE.equals(serverStatus.get())) { throw new InvalidRequestException( "Shuffle Server is processing other procedures, current status:" + serverStatus); } - serverStatus = ServerStatus.DECOMMISSIONING; + serverStatus.set(ServerStatus.DECOMMISSIONING); LOG.info("Shuffle Server is decommissioning."); if (executorService == null) { executorService = ThreadUtils.getDaemonSingleThreadExecutor("shuffle-server-decommission"); @@ -342,7 +346,7 @@ private void waitDecommissionFinish() { while (isDecommissioning()) { remainApplicationNum = shuffleTaskManager.getAppIds().size(); if (remainApplicationNum == 0) { - serverStatus = ServerStatus.DECOMMISSIONED; + serverStatus.set(ServerStatus.DECOMMISSIONED); LOG.info("All applications finished. Current status is " + serverStatus); if (shutdownAfterDecommission) { LOG.info("Exiting..."); @@ -374,11 +378,11 @@ public synchronized void cancelDecommission() { LOG.info("Shuffle server is not decommissioning. Nothing needs to be done."); return; } - if (ServerStatus.DECOMMISSIONED.equals(serverStatus)) { - serverStatus = ServerStatus.ACTIVE; + if (ServerStatus.DECOMMISSIONED.equals(serverStatus.get())) { + serverStatus.set(ServerStatus.ACTIVE); return; } - serverStatus = ServerStatus.ACTIVE; + serverStatus.set(ServerStatus.ACTIVE); if (decommissionFuture.cancel(true)) { LOG.info("Decommission canceled."); } else { @@ -453,13 +457,9 @@ public Set getTags() { return Collections.unmodifiableSet(tags); } - public boolean isHealthy() { - return isHealthy.get(); - } - @VisibleForTesting public void markUnhealthy() { - isHealthy.set(false); + serverStatus.set(ServerStatus.UNHEALTHY); } public GRPCMetrics getGrpcMetrics() { @@ -467,8 +467,8 @@ public GRPCMetrics getGrpcMetrics() { } public boolean isDecommissioning() { - return ServerStatus.DECOMMISSIONING.equals(serverStatus) - || ServerStatus.DECOMMISSIONED.equals(serverStatus); + return ServerStatus.DECOMMISSIONING.equals(serverStatus.get()) + || ServerStatus.DECOMMISSIONED.equals(serverStatus.get()); } @VisibleForTesting diff --git a/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java b/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java index 18b16bb8c7..1fce9cd3b6 100644 --- a/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java +++ b/server/src/test/java/org/apache/uniffle/server/HealthyMockChecker.java @@ -17,7 +17,7 @@ package org.apache.uniffle.server; -class HealthyMockChecker extends Checker { +public class HealthyMockChecker extends Checker { @SuppressWarnings("checkstyle:RedundantModifier") public HealthyMockChecker(ShuffleServerConf conf) { diff --git a/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java b/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java index 1629335e43..18e673fb43 100644 --- a/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java +++ b/server/src/test/java/org/apache/uniffle/server/UnHealthyMockChecker.java @@ -17,7 +17,7 @@ package org.apache.uniffle.server; -class UnHealthyMockChecker extends Checker { +public class UnHealthyMockChecker extends Checker { @SuppressWarnings("checkstyle:RedundantModifier") public UnHealthyMockChecker(ShuffleServerConf conf) {