Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add listeners for publisher actions (#295)
Browse files Browse the repository at this point in the history
Add support for registering action listeners with the Publisher. These listeners provide for different ways to react to published actions.
  • Loading branch information
vigyasharma committed Jul 30, 2020
1 parent b603245 commit 1180718
Show file tree
Hide file tree
Showing 12 changed files with 507 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ public interface Action {
/** Time to wait since last recommendation, before suggesting this action again */
long coolOffPeriodInMillis();

/**
* Called when the action is invoked.
*
* <p>Specific implementation may include executing the action, or invoking downstream APIs
*/
void execute();

/** Returns a list of Elasticsearch nodes impacted by this action. */
List<NodeKey> impactedNodes();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions;

/**
* This listener is notified whenever an action suggestion is
* published by the decision maker Publisher
*/
public interface ActionListener {

/**
* Called when Publisher emits an action
*/
void actionPublished(Action action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,6 @@ public Map<NodeKey, ImpactVector> impact() {
return Collections.singletonMap(esNode, impactVector);
}

@Override
public void execute() {
// Making this a no-op for now
// TODO: Modify based on downstream CoS agent API calls
assert true;
}

@Override
public String summary() {
if (!isActionable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,6 @@ public Map<NodeKey, ImpactVector> impact() {
return Collections.singletonMap(esNode, impactVector);
}

@Override
public void execute() {
// Making this a no-op for now
// TODO: Modify based on downstream agent API calls
assert true;
}

@Override
public String summary() {
if (!isActionable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@

import com.amazon.opendistro.elasticsearch.performanceanalyzer.PerformanceAnalyzerApp;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ActionListener;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.FlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.TimedFlipFlopDetector;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.core.NonLeafNode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.ExceptionsAndErrors;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import com.google.common.annotations.VisibleForTesting;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -40,10 +45,12 @@ public class Publisher extends NonLeafNode<EmptyFlowUnit> {
private FlipFlopDetector flipFlopDetector;
private boolean isMuted = false;
private Map<String, Long> actionToExecutionTime;
private List<ActionListener> actionListeners;

public Publisher(int evalIntervalSeconds, Collator collator) {
super(0, evalIntervalSeconds);
this.collator = collator;
this.actionListeners = new ArrayList<>();
this.actionToExecutionTime = new HashMap<>();
// TODO please bring in guice so we can configure this with DI
this.flipFlopDetector = new TimedFlipFlopDetector(1, TimeUnit.HOURS);
Expand All @@ -53,8 +60,8 @@ public Publisher(int evalIntervalSeconds, Collator collator) {
/**
* Returns true if a given {@link Action}'s last execution time was >= {@link Action#coolOffPeriodInMillis()} ago
*
* <p>If this Publisher has never executed the action, the last execution time is defined as the time that the publisher
* object was constructed.
* <p>If this Publisher has never executed the action, the last execution time is defined as the time that the
* publisher object was constructed.
*
* @param action The {@link Action} to test
* @return true if a given {@link Action}'s last execution time was >= {@link Action#coolOffPeriodInMillis()} ago
Expand All @@ -67,7 +74,7 @@ public boolean isCooledOff(Action action) {
} else {
LOG.debug("Publisher: Action {} still has {} ms left in its cool off period",
action.name(),
action.coolOffPeriodInMillis() - elapsed);
action.coolOffPeriodInMillis() - elapsed);
return false;
}
}
Expand All @@ -78,10 +85,11 @@ public EmptyFlowUnit operate() {
Decision decision = collator.getFlowUnits().get(0);
for (Action action : decision.getActions()) {
if (isCooledOff(action) && !flipFlopDetector.isFlipFlop(action)) {
LOG.info("Publisher: Executing action: [{}]", action.name());
action.execute();
flipFlopDetector.recordAction(action);
actionToExecutionTime.put(action.name(), Instant.now().toEpochMilli());
for (ActionListener listener : actionListeners) {
listener.actionPublished(action);
}
}
}
return new EmptyFlowUnit(Instant.now().toEpochMilli());
Expand All @@ -105,6 +113,15 @@ public void generateFlowUnitListFromLocal(FlowUnitOperationArgWrapper args) {
RcaGraphMetrics.GRAPH_NODE_OPERATE_CALL, this.name(), duration);
}

/**
* Register an action listener with Publisher
*
* <p>The listener is notified whenever an action is published
*/
public void addActionListener(ActionListener listener) {
actionListeners.add(listener);
}

/**
* Publisher does not have downstream nodes and does not emit flow units
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins;

/**
* Allows adding custom extensions to the analysis graph.
*
* <p>RCA framework plugins can be installed to extend the analysis graph through custom
* metric nodes, rca nodes, deciders or action listeners. These can subscribe to flow
* units from existing nodes to add new functionality, or override existing graph nodes to
* customize for specific use cases.
*/
public abstract class Plugin {

public abstract String name();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ActionListener;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.Publisher;
import com.google.common.annotations.VisibleForTesting;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class PluginController {

private static final Logger LOG = LogManager.getLogger(PluginController.class);
private final Publisher publisher;
private List<Plugin> plugins;
private PluginControllerConfig pluginControllerConfig;

public PluginController(PluginControllerConfig pluginConfig, Publisher publisher) {
this.pluginControllerConfig = pluginConfig;
this.publisher = publisher;
this.plugins = new ArrayList<>();
}

public void initPlugins() {
loadFrameworkPlugins();
registerActionListeners();
}

private void loadFrameworkPlugins() {
for (Class<?> pluginClass : pluginControllerConfig.getFrameworkPlugins()) {
final Constructor<?>[] constructors = pluginClass.getConstructors();
if (constructors.length == 0) {
throw new IllegalStateException(
"no public constructor found for plugin class: [" + pluginClass.getName() + "]");
}
if (constructors.length > 1) {
throw new IllegalStateException(
"unique constructor expected for plugin class: [" + pluginClass.getName() + "]");
}
if (constructors[0].getParameterCount() != 0) {
throw new IllegalStateException(
"default constructor expected for plugin class: [" + pluginClass.getName() + "]");
}

try {
plugins.add((Plugin) constructors[0].newInstance());
LOG.info("loaded plugin: [{}]", plugins.get(plugins.size() - 1).name());
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
LOG.error("Failed to instantiate plugin", e);
throw new IllegalStateException("Failed to instantiate plugin: [" + pluginClass.getName() + "]", e);
}
}
}

private void registerActionListeners() {
for (Plugin plugin: plugins) {
if (ActionListener.class.isAssignableFrom(plugin.getClass())) {
publisher.addActionListener((ActionListener)plugin);
}
}
}

@VisibleForTesting
List<Plugin> getPlugins() {
return plugins;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins;

import java.util.ArrayList;
import java.util.List;

public class PluginControllerConfig {

private List<Class<? extends Plugin>> frameworkPlugins;

public PluginControllerConfig() {
frameworkPlugins = new ArrayList<>();
frameworkPlugins.add(PublisherEventsLogger.class);
}

/**
* Returns a list of entry point classes for internal framework plugins
*/
public List<Class<? extends Plugin>> getFrameworkPlugins() {
return frameworkPlugins;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.Action;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.actions.ActionListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* A simple listener that logs all actions published by the publisher
*/
public class PublisherEventsLogger extends Plugin implements ActionListener {

private static final Logger LOG = LogManager.getLogger(PublisherEventsLogger.class);
public static final String NAME = "publisher_events_logger_plugin";

@Override
public void actionPublished(Action action) {
LOG.info("Action: [{}] published by decision maker publisher.", action.name());
}

@Override
public String name() {
return NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.amazon.opendistro.elasticsearch.performanceanalyzer.decisionmaker.deciders.QueueHealthDecider;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metrics.AllMetrics.CommonDimension;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.metricsdb.MetricsDB;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins.PluginController;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.plugins.PluginControllerConfig;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.AnalysisGraph;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Metric;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.rca.framework.api.Rca;
Expand Down Expand Up @@ -214,6 +216,11 @@ public void construct() {
Publisher publisher = new Publisher(EVALUATION_INTERVAL_SECONDS, collator);
publisher.addTag(TAG_LOCUS, LOCUS_MASTER_NODE);
publisher.addAllUpstreams(Collections.singletonList(collator));

// TODO: Refactor using DI to move out of construct method
PluginControllerConfig pluginControllerConfig = new PluginControllerConfig();
PluginController pluginController = new PluginController(pluginControllerConfig, publisher);
pluginController.initPlugins();
}

private void constructShardResourceUsageGraph() {
Expand Down
Loading

0 comments on commit 1180718

Please sign in to comment.