Permalink
Browse files

1) added CombinedMetric

2) error if registerMetrics called after component was prepared/opened.
  • Loading branch information...
1 parent d4d8e9b commit d210aec9c6880f7c388f014b2a76563617f45575 Jason Jackson committed Nov 1, 2012
@@ -28,7 +28,8 @@
(:default-shared-resources worker)
(:user-shared-resources worker)
(:shared-executor-data executor-data)
- (:interval->task->registered-metrics executor-data))))
+ (:interval->task->registered-metrics executor-data)
+ (:open-or-prepare-was-called? executor-data))))
(defn system-topology-context [worker executor-data tid]
((mk-topology-context-builder
@@ -0,0 +1,21 @@
+package backtype.storm.metric.api;
+
+public class CombinedMetric implements IMetric {
+ private final ICombiner _combiner;
+ private Object _value;
+
+ public CombinedMetric(ICombiner combiner) {
+ _combiner = combiner;
+ _value = _combiner.identity();
+ }
+
+ public void update(Object value) {
+ _value = _combiner.combine(_value, value);
+ }
+
+ public Object getValueAndReset() {
+ Object ret = _value;
+ _value = _combiner.identity();
+ return ret;
+ }
+}
@@ -0,0 +1,6 @@
+package backtype.storm.metric.api;
+
+public interface ICombiner<T> {
+ public T identity();
+ public T combine(T a, T b);
+}
@@ -1,7 +1,7 @@
package backtype.storm.metric.api;
public class ReducedMetric implements IMetric {
- private IReducer _reducer;
+ private final IReducer _reducer;
private Object _accumulator;
public ReducedMetric(IReducer reducer) {
@@ -6,8 +6,10 @@
import backtype.storm.hooks.ITaskHook;
import backtype.storm.metric.api.IMetric;
import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.MetricHolder;
+import backtype.storm.metric.api.ICombiner;
import backtype.storm.metric.api.ReducedMetric;
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.MetricHolder;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
@@ -34,20 +36,23 @@
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
private Map<String, Object> _executorData;
private Map<Integer,Map<Integer, Collection<MetricHolder>>> _registeredMetrics;
+ private clojure.lang.Atom _openOrPrepareWasCalled;
public TopologyContext(StormTopology topology, Map stormConf,
Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
Map<String, Map<String, Fields>> componentToStreamToFields,
String stormId, String codeDir, String pidDir, Integer taskId,
Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
- Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics) {
+ Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics,
+ clojure.lang.Atom openOrPrepareWasCalled) {
super(topology, stormConf, taskToComponent, componentToSortedTasks,
componentToStreamToFields, stormId, codeDir, pidDir,
workerPort, workerTasks, defaultResources, userResources);
_taskId = taskId;
_executorData = executorData;
_registeredMetrics = registeredMetrics;
+ _openOrPrepareWasCalled = openOrPrepareWasCalled;
}
/**
@@ -205,6 +210,11 @@ public void addTaskHook(ITaskHook hook) {
* @return The IMetric argument unchanged.
*/
public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
+ if((Boolean)_openOrPrepareWasCalled.deref() == true) {
+ throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
+ "IBolt::prepare() or ISpout::open() method.");
+ }
+
Map m1 = _registeredMetrics;
if(!m1.containsKey(timeBucketSizeInSecs)) {
m1.put(timeBucketSizeInSecs, new HashMap());
@@ -227,4 +237,10 @@ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInS
public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
}
+ /*
+ * Convinience method for registering ReducedMetric.
+ */
+ public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
+ return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
+ }
}

0 comments on commit d210aec

Please sign in to comment.