Permalink
Browse files

added parameter OutputCollector to IMetricsConsumer::prepare, so that…

… MetricsConsumer can reportError.
  • Loading branch information...
1 parent b375d5a commit d32701f554bee472f9e848194a8d2070ec31f879 Jason Jackson committed Nov 2, 2012
@@ -266,14 +266,15 @@
(.getThisWorkerPort worker-context)
(:component-id executor-data)
task-id
- (System/currentTimeMillis)
+ (long (/ (System/currentTimeMillis) 1000))
interval)
data-points (->> metric-holders
(map (fn [^MetricHolder mh]
(IMetricsConsumer$DataPoint. (.name mh)
(.getValueAndReset ^IMetric (.metric mh)))))
(into []))]]
- (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
+ (if data-points
+ (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))))
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
@@ -28,7 +28,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
throw new RuntimeException("Could not instantiate a class listed in config under section " +
Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
}
- _metricsConsumer.prepare(stormConf, _registrationArgument, context);
+ _metricsConsumer.prepare(stormConf, _registrationArgument, context, collector);
_collector = collector;
}
@@ -1,5 +1,6 @@
package backtype.storm.metric.api;
+import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import java.util.Collection;
import java.util.Map;
@@ -34,7 +35,7 @@ public String toString() {
public Object value;
}
- void prepare(Map stormConf, Object registrationArgument, TopologyContext context);
+ void prepare(Map stormConf, Object registrationArgument, TopologyContext context, OutputCollector outputCollector);
void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
void cleanup();
}

0 comments on commit d32701f

Please sign in to comment.