Skip to content

Commit

Permalink
Remove usage of deprecated "master" APIs (#513)
Browse files Browse the repository at this point in the history
All usages have been replaced with the "cluster manager" variants.

Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross committed Sep 5, 2023
1 parent 2046483 commit 7a6f789
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.SourcePrioritizedRunnable;
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.performanceanalyzer.OpenSearchResources;
Expand Down Expand Up @@ -82,7 +82,9 @@ public void collectMetrics(long startTime) {
try {
if (Objects.isNull(OpenSearchResources.INSTANCE.getClusterService())
|| Objects.isNull(
OpenSearchResources.INSTANCE.getClusterService().getMasterService())) {
OpenSearchResources.INSTANCE
.getClusterService()
.getClusterManagerService())) {
return;
}

Expand Down Expand Up @@ -186,7 +188,8 @@ void generateFinishMetrics(long startTime) {

// - Separated to have a unit test; and catch any code changes around this field
Field getClusterManagerServiceTPExecutorField() throws NoSuchFieldException {
Field threadPoolExecutorField = MasterService.class.getDeclaredField("threadPoolExecutor");
Field threadPoolExecutorField =
ClusterManagerService.class.getDeclaredField("threadPoolExecutor");
threadPoolExecutorField.setAccessible(true);
return threadPoolExecutorField;
}
Expand Down Expand Up @@ -223,8 +226,8 @@ Queue<Runnable> getClusterManagerServiceCurrentQueue()
throws NoSuchFieldException, IllegalAccessException {
if (clusterManagerServiceCurrentQueue == null) {
if (OpenSearchResources.INSTANCE.getClusterService() != null) {
MasterService clusterManagerService =
OpenSearchResources.INSTANCE.getClusterService().getMasterService();
ClusterManagerService clusterManagerService =
OpenSearchResources.INSTANCE.getClusterService().getClusterManagerService();

if (clusterManagerService != null) {
if (prioritizedOpenSearchThreadPoolExecutor == null) {
Expand Down Expand Up @@ -253,8 +256,8 @@ HashSet<Object> getClusterManagerServiceWorkers()
throws NoSuchFieldException, IllegalAccessException {
if (clusterManagerServiceWorkers == null) {
if (OpenSearchResources.INSTANCE.getClusterService() != null) {
MasterService clusterManagerService =
OpenSearchResources.INSTANCE.getClusterService().getMasterService();
ClusterManagerService clusterManagerService =
OpenSearchResources.INSTANCE.getClusterService().getClusterManagerService();

if (clusterManagerService != null) {
if (prioritizedOpenSearchThreadPoolExecutor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public String getMetricsPath(long startTime, String... keysPath) {
public void collectMetrics(long startTime) {
if (Objects.isNull(OpenSearchResources.INSTANCE.getClusterService())
|| Objects.isNull(
OpenSearchResources.INSTANCE.getClusterService().getMasterService())) {
OpenSearchResources.INSTANCE
.getClusterService()
.getClusterManagerService())) {
return;
}

Expand All @@ -72,7 +74,10 @@ public void collectMetrics(long startTime) {
* timeIn_queue: "86ms"
*/
List<PendingClusterTask> pendingTasks =
OpenSearchResources.INSTANCE.getClusterService().getMasterService().pendingTasks();
OpenSearchResources.INSTANCE
.getClusterService()
.getClusterManagerService()
.pendingTasks();
HashMap<String, Integer> pendingTaskCountPerTaskType = new HashMap<>();

pendingTasks.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.MetricStatus;
import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector;
Expand Down Expand Up @@ -67,7 +67,9 @@ public void collectMetrics(long startTime) {
}
if (Objects.isNull(OpenSearchResources.INSTANCE.getClusterService())
|| Objects.isNull(
OpenSearchResources.INSTANCE.getClusterService().getMasterService())) {
OpenSearchResources.INSTANCE
.getClusterService()
.getClusterManagerService())) {
return;
}

Expand Down Expand Up @@ -105,7 +107,7 @@ public void collectMetrics(long startTime) {
private boolean isClusterManagerThrottlingFeatureAvailable() {
try {
Class.forName(CLUSTER_MANAGER_THROTTLING_RETRY_LISTENER_PATH);
MasterService.class.getMethod(THROTTLED_PENDING_TASK_COUNT_METHOD_NAME);
ClusterManagerService.class.getMethod(THROTTLED_PENDING_TASK_COUNT_METHOD_NAME);
} catch (ClassNotFoundException | NoSuchMethodException e) {
return false;
}
Expand All @@ -114,9 +116,13 @@ private boolean isClusterManagerThrottlingFeatureAvailable() {

private long getTotalClusterManagerThrottledTaskCount()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Method method = MasterService.class.getMethod(THROTTLED_PENDING_TASK_COUNT_METHOD_NAME);
Method method =
ClusterManagerService.class.getMethod(THROTTLED_PENDING_TASK_COUNT_METHOD_NAME);
return (long)
method.invoke(OpenSearchResources.INSTANCE.getClusterService().getMasterService());
method.invoke(
OpenSearchResources.INSTANCE
.getClusterService()
.getClusterManagerService());
}

private long getRetryingPendingTaskCount()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void collectMetrics(long startTime) {
DiscoveryNodes discoveryNodes =
OpenSearchResources.INSTANCE.getClusterService().state().nodes();

DiscoveryNode clusterManagerNode = discoveryNodes.getMasterNode();
DiscoveryNode clusterManagerNode = discoveryNodes.getClusterManagerNode();

Iterator<DiscoveryNode> discoveryNodeIterator = discoveryNodes.iterator();
addMetricsToStringBuilder(discoveryNodes.getLocalNode(), value, "", clusterManagerNode);
Expand Down Expand Up @@ -124,7 +124,7 @@ private String getNodeRole(final DiscoveryNode node) {
final NodeRole role =
node.isDataNode()
? NodeRole.DATA
: node.isMasterNode() ? NodeRole.CLUSTER_MANAGER : NodeRole.UNKNOWN;
: node.isClusterManagerNode() ? NodeRole.CLUSTER_MANAGER : NodeRole.UNKNOWN;
return role.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testCollectMetrics() throws Exception {
.get(
OpenSearchResources.INSTANCE
.getClusterService()
.getMasterService());
.getClusterManagerService());
SourcePrioritizedRunnable runnable =
new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testWithMockClusterService() {
assertNull(jsonStr);

OpenSearchResources.INSTANCE.setClusterService(mockedClusterService);
when(mockedClusterService.getMasterService()).thenThrow(new RuntimeException());
when(mockedClusterService.getClusterManagerService()).thenThrow(new RuntimeException());
clusterManagerServiceMetrics.run();
jsonStr = readMetricsInJsonString(0);
assertNull(jsonStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ public void testNodeDetails() throws Exception {
discoBuilder = discoBuilder.add(node);
}

discoBuilder.masterNodeId(nodeId1);
discoBuilder.clusterManagerNodeId(nodeId1);
discoBuilder.localNodeId(nodeId2);

DiscoveryNodes discoveryNodes = discoBuilder.build();
Expand Down

0 comments on commit 7a6f789

Please sign in to comment.