Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Polarize actions based on impact vectors #332

Merged
merged 12 commits into from
Aug 31, 2020

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ActionListener;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.FlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.TimedFlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.google.common.annotations.VisibleForTesting;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* A grouping interface to provide different ways to group actions as needed by the {@link
* Collator}
*/
public interface ActionGrouper {

/**
* Groups the given list of actions by the nodes they impact.
*
* @param actions The list of actions that need to be grouped.
* @return A map of actions grouped by nodes they impact.
*/
@NonNull Map<NodeKey, List<Action>> groupByInstanceId(@NonNull final List<Action> actions);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Impact;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Decision;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.AnalysisGraph;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* Collator collects and prunes the candidate decisions from each decider so that their impacts are
* aligned.
*
* <p>Decisions can increase or decrease pressure on different key resources on an Elasticearch
* node. This is encapsulated in each Action via the {@link ImpactVector}. Since each decider
* independently evaluates its decision, it is possible to have conflicting ImpactVectors from
* actions across deciders.
*
* <p>The collator prunes them to ensure we only take actions that either increase, or decrease
* pressure on a particular node. To resolve conflicts, we prefer stability over performance. In
* order for the above guarantee to work, there should be only one collator instance in an {@link
* AnalysisGraph}.
*/
public class Collator extends Decider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can only be one collator node in a graph right ? Can we add that to the comments ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


public static final String NAME = "collator";

/* Deciders can choose to publish decisions at different frequencies based on the
* type of resources monitored and rca signals. The collator should however, not introduce any
* unnecessary delays. As soon as a decision is taken, it should be evaluated and published downstream.
*/
private static final int collatorFrequency = 1; // Measured in terms of number of evaluationIntervalPeriods

private static final int evalIntervalSeconds = 5;

private final List<Decider> deciders;

private final ActionGrouper actionGrouper;

public Collator(Decider... deciders) {
this(new SingleNodeImpactActionGrouper(), deciders);
}

public Collator(ActionGrouper actionGrouper, Decider... deciders) {
super(evalIntervalSeconds, collatorFrequency);
this.deciders = Arrays.asList(deciders);
this.actionGrouper = actionGrouper;
}

@Override
public String name() {
return NAME;
}

/**
* The collator uses an action grouping strategy to first group actions by instanceIds. Then, the
* collator polarizes the list of actions per instance to be in the same direction of pressure,
* i.e. all the polarized actions either increase pressure on a node, or decrease pressure on a
* node.
*
* <p>When there are conflicting actions suggested by the deciders for an instance, the
* polarization logic prefers pruning actions that decrease stability retaining only those that
* increase stability. </p>
*
* @return A {@link Decision} instance that contains the list of polarized actions.
*/
@Override
public Decision operate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be a good idea to add a javadoc comment for the operate() as this is the center piece of the RCAGraph node ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

final List<Action> proposedActions = getAllProposedActions();
final Map<NodeKey, List<Action>> actionsByNode = actionGrouper
.groupByInstanceId(proposedActions);
final List<Action> prunedActions = new ArrayList<>();
actionsByNode.forEach((nodeKey, actions) -> prunedActions.addAll(polarize(nodeKey, actions)));

final Decision finalDecision = new Decision(System.currentTimeMillis(), NAME);
finalDecision.addAllActions(prunedActions);
return finalDecision;
}

@NonNull
private List<Action> getAllProposedActions() {
final List<Action> proposedActions = new ArrayList<>();
if (deciders != null) {
for (final Decider decider : deciders) {
List<Decision> decisions = decider.getFlowUnits();
if (decisions != null) {
decisions.forEach(decision -> {
if (decision.getActions() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use decision.isEmpty() here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change in the next rev

proposedActions.addAll(decision.getActions());
}
});
}
}
}
return proposedActions;
}

private List<Action> polarize(final NodeKey nodeKey, final List<Action> actions) {
Copy link
Contributor

@vigyasharma vigyasharma Aug 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that this will need non-trivial changes when we have actions impacting multiple nodes. If an action is purged because of pressure conflict on one of the nodes, it must be removed from the list of all of the nodes.

Should polarize() then move to ActionGrouper? Is the overall above structure right for future use cases?


My ideas for solving this were more around compiling all impact vectors into a matrix and doing a second pass through it to accept or prune actions. Something on these lines:

Suppose the collator received 2 actions, moveShard from node A to node B, and moveShard from node B to node C

actions = [MoveShard: A -> B, MoveShard: B -> C]
Impact vectors:
	actions[0] :: MoveShard: A -> B
	impactedNodes = [A, B]
	A: [CPU:dec, Heap: dec, RAM: dec, NW: dec, Disk: dec]
	B: [CPU:inc, Heap: inc, RAM: inc, NW: inc, Disk: inc]

	actions[1] :: MoveShard: B -> C
	impactedNodes = [B, C]
	B: [CPU:dec, Heap: dec, RAM: dec, NW: dec, Disk: dec]
	C: [CPU:inc, Heap: inc, RAM: inc, NW: inc, Disk: inc]

First we create an impact matrix for the cluster:

  • node A has only decrease across each dim
  • B has 1 inc and 1 dec action,
  • C has 1 inc action for each dim
	       A 	B 	      C
cpu 	  i=0,d=1     i=1,d=1     i=1,d=0
heap 	  i=0,d=1     i=1,d=1     i=1,d=0
ram       i=0,d=1     i=1,d=1     i=1,d=0
nw        i=0,d=1     i=1,d=1     i=1,d=0
disk      i=0,d=1     i=1,d=1     i=1,d=0

Now we run the actions through this matrix again.

If an action is increasing pressure on a dim on a node, it is only allowed
if there is no other action trying to dec that dim on the node. i.e. d=0 for that node,dim.

When an action gets purged in this second pass, we remove it from the matrix by reducing the node,dim counters.

In above e.g. we process MoveShard(A->B) -

  • It cannot go through because d=1 for some dims on node B
  • We purge MoveShard(A->B)
  • Counters on A and B are updated. All counters in A become i=0, d=0. B becomes i=0,d=1

Now we process MoveShard(B->C) -

  • It increases pressure on C but there is no one decreasing pressure (d=0 for all dims on C), so it can go through
  • On B it is dec pressure anyway so it has a clear pass.

Hence MoveShard(B->C) gets picked.

This will also resolve cycles like A->B, B->C, C->A by picking either one or two of the actions (depending on order of 2nd pass).


It is okay if we handle multi-node actions in a separate PR later. You may also use different/better algorithms for polarizing impact vectors. Just wanted to check if the current Collator::polarize() and ActionGrouper::groupByInstanceId() is the right approach for future use cases, like the moveShard, splitShard etc..

It will also be good to at least handle the dimension level comparisons right away, as they help establish a structure for all use cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this sage advice, I had an unnecessarily complex system and I was able to rip it out until I got to something similar to what you've proposed here :) I've updated the PR to reflect these changes.

final List<Action> pressureIncreasingActions = new ArrayList<>();
final List<Action> pressureNonIncreasingActions = new ArrayList<>();

for (final Action action : actions) {
ImpactVector impactVector = action.impact().getOrDefault(nodeKey, new ImpactVector());

// Classify the action as pressure increasing action if the impact for any dimension is
// increasing pressure.
if (impactVector.getImpact()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This classifies an increase across any dimension as increasing pressure. Shouldn't we check on a per dimension basis? Otherwise there is no value in having different dimensions in the impact vector.

Suppose you have an action to offload some data from heap to a file on disk. This would reduce heap but increase disk pressure. If there is no other action trying to decrease disk pressure, then the overall disk on the node is not under contention and we should let this action go through.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I assumed for the first iteration we would not have such actions and had not thought of polarizing at a dimension level. I will change it.

.values()
.stream()
.anyMatch(impact -> impact == Impact.INCREASES_PRESSURE)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional programming !! yay !

pressureIncreasingActions.add(action);
} else {
pressureNonIncreasingActions.add(action);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given "If there are any actions that decrease pressure for a node, prefer that over list of actions that increase pressure.", should we break here when we find a pressureNonIncreasingActions ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there might be other pressure decreasing actions beyond the one that increases it, we need to add that as well, so we can't break early.

}
}

// If there are any actions that decrease pressure for a node, prefer that over list of
// actions that increase pressure.
if (pressureNonIncreasingActions.size() > 0) {
return pressureNonIncreasingActions;
}

// Return list of actions that increase pressure only if no decider has proposed an action
// that will relieve pressure for this node.
return pressureIncreasingActions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.NonNull;

public class SingleNodeImpactActionGrouper implements ActionGrouper {

/**
* Groups actions by instance they impact. This grouping method considers only those actions that
* impact a single node as valid candidates for grouping and as a result it filters out actions
* that impact multiple nodes.
*
* <p>Any action that impacts more than one node will need a more involved handling logic and
* this method is not to be used for grouping such actions.</p>
*
* @param actions The list of actions that need to be grouped.
* @return A map of actions grouped by instance they impact.
*/
@Override
@NonNull
public Map<NodeKey, List<Action>> groupByInstanceId(@NonNull List<Action> actions) {
final Map<NodeKey, List<Action>> actionsByNodeId = new HashMap<>();
actions.stream()
.filter(action -> action.impactedNodes().size() == 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, each action can be associated with 1 or 0 impacted Nodes ? Just wanted to clarify this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, is that not the understanding today? This class only considers those actions that impact exactly one node. The multi node impact action grouping will be done in the next PR.

@yojs brought up a similar point, do we create a single action with multiple nodes in the impacted nodes when we want to say decrease cache size for node1, node2, node3 or do we create multiple actions for each node we want to decrease the cache size for?

Copy link
Contributor

@khushbr khushbr Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I clarified this with others and we will have actions which impact a single node and then another set of action which are for multiple nodes.

Taking the above example, cache increase action will be per node but for cache decrease, we will have 1 action with multiple nodes in the impactedNodes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actions can impact a single node or multiple nodes - this depends on the type of action. For a given action type (a class implementing Action interface), this stays fixed.

@khushbr's example above is slightly incorrect. ModifyCacheAction impacts only a single node. If we want to increase or decrease caches across multiple nodes, the decider will have to create multiple such actions.

There will be other types of actions in future, like MoveShard, which can impact multiple nodes (src node and dest node) or SplitShard which impacts even more nodes. For those actions, impactedNodes will contain all the nodes impacted and impact() will return the impactVector for each node - e.g. for MoveShard impact() will indicate that pressure gets reduced on src node and increased on the destination node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment why are we filtering out the actions that ask for the same action to be performed on multiple nodes ? Is that rationale that we do it only one node per iteration ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting point, don't we create multiple actions in such cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are creating one action per node even if its the same action for multiple nodes, added a comment.

.forEach(action -> actionsByNodeId.computeIfAbsent(action.impactedNodes()
.get(0), k -> new ArrayList<>())
.add(action));
return actionsByNodeId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to turn on some checkstyle rukes, this should have caused an error ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the .get(0) is aligned with .impactedNodes() because they're chained calls, and similarly, .add(action) is aligned with .computeIfAbsent() because they're chained calls.

This is what the formatter did and I thought it was helpful 😅 I'm using the GoogleStyle scheme. Let me know if I should use another formatter.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import static com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.RcaConsts.RcaTagConstants.TAG_LOCUS;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.CacheHealthDecider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Publisher;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.QueueHealthDecider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.ShardStatsDerivedDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
Expand Down Expand Up @@ -103,7 +103,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -274,7 +273,7 @@ public void construct() {
//constructResourceHeatMapGraph();

// Collator - Collects actions from all deciders and aligns impact vectors
Collator collator = new Collator(EVALUATION_INTERVAL_SECONDS, queueHealthDecider, cacheHealthDecider);
Collator collator = new Collator(queueHealthDecider, cacheHealthDecider);
collator.addTag(TAG_LOCUS, LOCUS_MASTER_NODE);
collator.addAllUpstreams(Arrays.asList(queueHealthDecider, cacheHealthDecider));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,18 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ActionListener;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ImpactVector.Dimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.collator.Collator;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins.Plugin;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.store.rca.cluster.NodeKey;
import com.google.common.collect.Lists;

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;

import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
Expand Down
Loading