Permalink
Browse files

refactorings, metrics API has it's own namespace.

  • Loading branch information...
1 parent 4a01197 commit a45098d7e32586484a611232cfe1727e008f46cb Jason Jackson committed Oct 31, 2012
@@ -6,7 +6,8 @@
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
- (:import [backtype.storm.metric MetricHolder IMetric])
+ (:import [backtype.storm.metric MetricHolder])
+ (:import [backtype.storm.metric.api IMetric])
(:require [backtype.storm [tuple :as tuple]])
(:require [backtype.storm.daemon [task :as task]])
)
@@ -1,5 +1,7 @@
package backtype.storm.metric;
+import backtype.storm.metric.api.IMetric;
+
public class MetricHolder {
private String _name;
private int _timeBucketIntervalInSecs;
@@ -1,5 +1,6 @@
package backtype.storm.metric;
+import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.Config;
import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
@@ -1,9 +1,9 @@
-package backtype.storm.metric;
+package backtype.storm.metric.api;
-public class FixedValueMetric implements IMetric {
+public class AssignableMetric implements IMetric {
Object _value;
- public FixedValueMetric(Object value) {
+ public AssignableMetric(Object value) {
_value = value;
}
@@ -1,9 +1,11 @@
-package backtype.storm.metric;
+package backtype.storm.metric.api;
-public class IncrementedMetric implements IMetric {
+import backtype.storm.metric.api.IMetric;
+
+public class CountMetric implements IMetric {
long _value = 0;
- public IncrementedMetric() {
+ public CountMetric() {
}
public void inc() {
@@ -1,4 +1,4 @@
-package backtype.storm.metric;
+package backtype.storm.metric.api;
public interface IMetric {
public Object getValueAndReset();
@@ -1,4 +1,4 @@
-package backtype.storm.metric;
+package backtype.storm.metric.api;
import backtype.storm.task.TopologyContext;
import java.util.Map;
@@ -1,4 +1,4 @@
-package backtype.storm.metric;
+package backtype.storm.metric.api;
public interface IReducer<T> {
T init();
@@ -1,4 +1,6 @@
-package backtype.storm.metric;
+package backtype.storm.metric.api;
+
+import backtype.storm.metric.api.IReducer;
class MeanReducerState {
public int count = 0;
@@ -1,4 +1,4 @@
-package backtype.storm.metric;
+package backtype.storm.metric.api;
public class ReducedMetric implements IMetric {
private IReducer _reducer;
@@ -4,8 +4,10 @@
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.IMetric;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
import backtype.storm.metric.MetricHolder;
+import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
@@ -195,7 +197,22 @@ public void addTaskHook(ITaskHook hook) {
return _hooks;
}
- public void registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
+ /*
+ * Register a IMetric instance.
+ * Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs
+ * and the returned value is sent to all metrics consumers.
+ * You must call this during IBolt::prepare or ISpout::open.
+ * @return The IMetric argument unchanged.
+ */
+ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
_registeredMetrics.add(new MetricHolder(name, metric, timeBucketSizeInSecs));
+ return metric;
+ }
+
+ /*
+ * Convinience method for registering ReducedMetric.
+ */
+ public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
+ return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
}
}

0 comments on commit a45098d

Please sign in to comment.