Permalink
Browse files

get rid of unncessary code in metrics implementation, get rid of prin…

…tln, and improve signatures of registerMetric methods to return more specific IMetric types
  • Loading branch information...
1 parent dd78113 commit 40264637f4188e5a9c2eb72632aeb3b9df275eeb @nathanmarz committed Nov 25, 2012
@@ -208,7 +208,7 @@ public void addTaskHook(ITaskHook hook) {
* You must call this during IBolt::prepare or ISpout::open.
* @return The IMetric argument unchanged.
*/
- public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
+ public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
if((Boolean)_openOrPrepareWasCalled.deref() == true) {
throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
"IBolt::prepare() or ISpout::open() method.");
@@ -237,13 +237,13 @@ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInS
/*
* Convinience method for registering ReducedMetric.
*/
- public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
+ public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
}
/*
* Convinience method for registering CombinedMetric.
*/
- public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
+ public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
}
}
@@ -31,13 +31,13 @@ public int getPartitionIndex() {
return _topoContext.getThisTaskIndex();
}
- public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
+ public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
return _topoContext.registerMetric(name, metric, timeBucketSizeInSecs);
}
- public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
+ public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
return _topoContext.registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
}
- public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
+ public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
return _topoContext.registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
}
}
@@ -57,8 +57,8 @@
(or
(and (not= N 0) (nil? taskid->buckets))
(not-every? #(<= N %) (map (comp count second) taskid->buckets))))
- (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id
- "and metric name" metric-name)
+;; (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id
+;; "and metric name" metric-name)
(Thread/sleep 10)))
(defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
@@ -80,8 +80,6 @@
[cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
[{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
(let [feeder (feeder-spout ["field1"])
- tracker (AckFailMapTracker.)
- _ (.setAckFailDelegate feeder tracker)
topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec feeder)}
{"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})]
@@ -110,8 +108,6 @@
TOPOLOGY-STATS-SAMPLE-RATE 1.0
TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
(let [feeder (feeder-spout ["field1"])
- tracker (AckFailMapTracker.)
- _ (.setAckFailDelegate feeder tracker)
topology (thrift/mk-topology
{"myspout" (thrift/mk-spout-spec feeder)}
{"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} acking-bolt)})]

0 comments on commit 4026463

Please sign in to comment.