Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add a cluster level collector for node config settings #298

Merged
merged 5 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.collector.NodeConfigCache;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.reader.ClusterDetailsEventProcessor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand All @@ -33,9 +34,13 @@
*/
public class AppContext {
private volatile ClusterDetailsEventProcessor clusterDetailsEventProcessor;
// instantiate a node config cache within each AppContext
// to store node config settings from ES
private final NodeConfigCache nodeConfigCache;

/**
 * Creates a context that starts with an empty {@link NodeConfigCache} and no
 * cluster details event processor attached.
 */
public AppContext() {
  this.nodeConfigCache = new NodeConfigCache();
  this.clusterDetailsEventProcessor = null;
}

public void setClusterDetailsEventProcessor(final ClusterDetailsEventProcessor clusterDetailsEventProcessor) {
Expand Down Expand Up @@ -104,4 +109,8 @@ public List<String> getPeerInstanceIps() {
.map(InstanceDetails::getInstanceIp)
.collect(Collectors.toList()));
}

/**
 * Returns the node config cache held by this context.
 *
 * @return the {@link NodeConfigCache} stored in this context
 */
public NodeConfigCache getNodeConfigCache() {
  return nodeConfigCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/**
* a flowunit type to carry ES node configurations (queue/cache capacities, etc.)
Expand Down Expand Up @@ -84,6 +87,14 @@ public double readConfig(Resource resource) {
return configSummary.getValue();
}

/**
 * Lists the config settings carried by this flowunit.
 *
 * @return a newly allocated list containing every key of the config map
 */
public List<Resource> getConfigList() {
  final List<Resource> configs = new ArrayList<>();
  configs.addAll(configMap.keySet());
  return configs;
}

@Override
public boolean isEmpty() {
return configMap.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ public void setAppContext(final AppContext appContext) {
this.appContext = appContext;
}

/**
 * Gives subclasses access to the application context set on this node.
 *
 * @return the current {@link AppContext}; whatever was last passed to setAppContext
 */
protected AppContext getAppContext() {
  return appContext;
}

public InstanceDetails getInstanceDetails() {
InstanceDetails ret = new InstanceDetails(AllMetrics.NodeRole.UNKNOWN);
if (this.appContext != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.collector;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.grpc.Resource;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.HashCodeBuilder;

/**
* we create a thread-safe unbounded cache instance in {@link com.amazon.opendistro.elasticsearch.performanceanalyzer.AppContext}
* to store the node config settings from each node. Any RCA vertex in RCA graph can read the node config directly from
* this cache instance. The key of this cache is NodeKey + Resource and value is the actual value of the config setting
* (i.e. size of write queue capacity)
*/
public class NodeConfigCache {

private static final int CACHE_TTL = 10;
private final Cache<NodeConfigKey, Double> nodeConfigCache;

//unbounded cache with TTL set to 10 mins
public NodeConfigCache() {
nodeConfigCache =
CacheBuilder.newBuilder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: is there a particular reason we picked an unbounded cache? We may not have a fixed number of entries to put in the cache, so couldn't leaving it unbounded cause out-of-memory errors? It should be fine if we are sure the number of entries is small.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maximum number of entries will be (number of nodes) * (number of node configs on each node), which is not that big considering that stale entries will be evicted once they reach the TTL.

.expireAfterWrite(CACHE_TTL, TimeUnit.MINUTES)
.build();
}

/**
* add config value into cache
* @param nodeKey the NodeKey of the node on which this config is collected
* @param config the config type
* @param value the config value
*/
public void put(NodeKey nodeKey, Resource config, double value) {
nodeConfigCache.put(new NodeConfigKey(nodeKey, config), value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to check here for Non-Null, Non-Negative values before putting them in HashMap?

Copy link
Contributor Author

@rguo-aws rguo-aws Jul 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We perform this check in the node-level NodeConfigCollector when collecting configs from the DB.

}

/**
* returns the config value that is associated with the nodeKey and config
* @param nodeKey the NodeKey of the node
* @param config the config type
* @return the config value
* @throws IllegalArgumentException throws an exception if the config does not exist in cache
*/
public double get(NodeKey nodeKey, Resource config) throws IllegalArgumentException {
Double ret = nodeConfigCache.getIfPresent(new NodeConfigKey(nodeKey, config));
khushbr marked this conversation as resolved.
Show resolved Hide resolved
if (ret == null) {
throw new IllegalArgumentException();
}
return ret;
}

/**
 * Composite cache key: the node a setting was collected from plus the setting's type.
 * Two keys are equal when both their node keys and their resources are equal.
 */
private static class NodeConfigKey {
  private final NodeKey nodeKey;
  private final Resource resource;

  public NodeConfigKey(final NodeKey nodeKey, final Resource resource) {
    this.nodeKey = nodeKey;
    this.resource = resource;
  }

  /** @return the node part of this key */
  public NodeKey getNodeKey() {
    return nodeKey;
  }

  /** @return the config-type part of this key */
  public Resource getResource() {
    return resource;
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof NodeConfigKey)) {
      return false;
    }
    final NodeConfigKey other = (NodeConfigKey) obj;
    // Compare the node first, then the resource, short-circuiting on the first mismatch.
    if (!nodeKey.equals(other.getNodeKey())) {
      return false;
    }
    return resource.equals(other.getResource());
  }

  @Override
  public int hashCode() {
    final HashCodeBuilder builder = new HashCodeBuilder(17, 37);
    builder.append(nodeKey.hashCode());
    builder.append(resource.hashCode());
    return builder.toHashCode();
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.collector;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.EmptyFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.flow_units.NodeConfigFlowUnit;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Cluster level node config collector that collects node config settings from each node and
* stores them into the {@link NodeConfigCache}
*/
public class NodeConfigClusterCollector extends NonLeafNode<EmptyFlowUnit> {

private static final Logger LOG = LogManager.getLogger(NodeConfigClusterCollector.class);
private final NodeConfigCollector nodeConfigCollector;

public NodeConfigClusterCollector(final NodeConfigCollector nodeConfigCollector) {
super(0, 5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constants here make the code unreadable; if we can add a variable whose name suggests what the constant represents, that would be helpful.

Additionally, later we should refactor to introduce a defaultIntervalPeriod and use it across all collectors, and similarly for RCAs as well.

Copy link
Contributor Author

@rguo-aws rguo-aws Jul 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can create a static const here. But since we will do another refactoring to remove the 5-second intervals from the super() constructor completely, would it be better to leave it as is for the moment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, please add an issue for this.

this.nodeConfigCollector = nodeConfigCollector;
}

/**
* read and parse the NodeConfigFlowUnit. retrieve the list of configs from the flowunit
* and update the cache entries that are associated with the NodeKey + config type
*/
private void addNodeLevelConfigs() {
khushbr marked this conversation as resolved.
Show resolved Hide resolved
List<NodeConfigFlowUnit> flowUnits = nodeConfigCollector.getFlowUnits();
for (NodeConfigFlowUnit flowUnit : flowUnits) {
if (flowUnit.isEmpty() || !flowUnit.hasResourceSummary()) {
continue;
}
HotNodeSummary nodeSummary = flowUnit.getSummary();
NodeKey nodeKey = new NodeKey(nodeSummary.getNodeID(), nodeSummary.getHostAddress());
NodeConfigCache nodeConfigCache = getAppContext().getNodeConfigCache();
flowUnit.getConfigList().forEach(resource -> {
double value = flowUnit.readConfig(resource);
if (!Double.isNaN(value)) {
nodeConfigCache.put(nodeKey, resource, value);
}
});
}
}

/**
 * Refreshes the node config cache from the latest flow units, then emits an empty
 * flow unit stamped with the current wall-clock time.
 *
 * @return an {@link EmptyFlowUnit} carrying the current time in milliseconds
 */
@Override
public EmptyFlowUnit operate() {
  addNodeLevelConfigs();
  final long now = System.currentTimeMillis();
  return new EmptyFlowUnit(now);
}

/**
 * Runs {@link #operate()} locally, recording any exception as an error stat and the
 * elapsed wall-clock time as a graph metric. Exceptions from operate are logged and
 * counted, not rethrown.
 *
 * @param args the operation arguments; not read by this implementation
 */
@Override
public void generateFlowUnitListFromLocal(FlowUnitOperationArgWrapper args) {
  LOG.debug("Collector: Executing fromLocal: {}", name());
  final long startedAt = System.currentTimeMillis();

  try {
    operate();
  } catch (Exception ex) {
    LOG.error("Collector: Exception in operate", ex);
    PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
        ExceptionsAndErrors.EXCEPTION_IN_OPERATE, name(), 1);
  }

  final long elapsedMillis = System.currentTimeMillis() - startedAt;
  PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(
      RcaGraphMetrics.GRAPH_NODE_OPERATE_CALL, name(), elapsedMillis);
}

/**
 * Intentionally does nothing: NodeConfigClusterCollector has no downstream nodes and
 * does not emit flow units, so there is nothing to persist.
 *
 * @param args the operation arguments; ignored
 */
@Override
public void persistFlowUnit(FlowUnitOperationArgWrapper args) {
  // no-op by design
}

/**
 * Intentionally does nothing; this collector performs no work for the from-wire path.
 *
 * @param args the operation arguments; ignored
 */
@Override
public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
  // no-op by design
}

/**
 * Intentionally does nothing; this collector takes no action when the node is muted.
 */
@Override
public void handleNodeMuted() {
  // no-op by design
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.remediation;
package com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.collector;

import static com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ThreadPoolDimension.THREAD_POOL_TYPE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.HotShardSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.GenericSummary;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.collector.NodeConfigCache;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -59,6 +61,11 @@ public void mockFlowUnits(List<ResourceFlowUnit<T>> flowUnitList) {
this.flowUnits = flowUnitList;
}

/**
 * Reads a node's config value from the node config cache of the app context.
 *
 * @param nodeKey the node whose config is requested
 * @param resource the config type
 * @return the value returned by the cache for this node and config
 * @throws IllegalArgumentException propagated from the cache lookup
 */
public double readConfig(NodeKey nodeKey, Resource resource) throws IllegalArgumentException {
  return getAppContext().getNodeConfigCache().get(nodeKey, resource);
}

/**
 * Replaces the clock held by this instance.
 *
 * @param clock the clock to store
 */
public void setClock(Clock clock) {
  this.clock = clock;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.store.collector;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.GradleTaskForRca;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.collector.NodeConfigCache;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(GradleTaskForRca.class)
public class NodeConfigCacheTest {

private NodeConfigCache nodeConfigCache;
private NodeKey nodeKey1;
private NodeKey nodeKey2;

/** Builds a fresh, empty cache and two distinct node keys before each test. */
@Before
public void init() {
  nodeKey1 = new NodeKey("node1", "127.0.0.1");
  nodeKey2 = new NodeKey("node2", "127.0.0.2");
  nodeConfigCache = new NodeConfigCache();
}

/** Reading from an empty cache must throw IllegalArgumentException. */
@Test(expected = IllegalArgumentException.class)
public void testNonExistentKey() {
  // Nothing has been written yet, so the lookup is expected to throw before fail() runs.
  nodeConfigCache.get(nodeKey1, ResourceUtil.WRITE_QUEUE_CAPACITY);
  Assert.fail();
}

/** A value stored under one config type must not be readable under a different one. */
@Test(expected = IllegalArgumentException.class)
public void testReadWrongKey() {
  nodeConfigCache.put(nodeKey1, ResourceUtil.WRITE_QUEUE_CAPACITY, 2.0);
  // Same node, different config type: the lookup is expected to throw before fail() runs.
  nodeConfigCache.get(nodeKey1, ResourceUtil.WRITE_QUEUE_REJECTION);
  Assert.fail();
}

@Test
public void testSetAndGetValue() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe 1 UT for updating the value of the key and asserting if the updated value is returned.

nodeConfigCache.put(nodeKey1, ResourceUtil.WRITE_QUEUE_CAPACITY, 3.0);
double val = nodeConfigCache.get(nodeKey1, ResourceUtil.WRITE_QUEUE_CAPACITY);
Assert.assertEquals(3.0, val, 0.01);

nodeConfigCache.put(nodeKey1, ResourceUtil.WRITE_QUEUE_CAPACITY, 4.0);
val = nodeConfigCache.get(nodeKey1, ResourceUtil.WRITE_QUEUE_CAPACITY);
Assert.assertEquals(4.0, val, 0.01);
}
}
Loading