Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

IMetricsConsumer handles data points in batches.

  • Loading branch information...
commit d4d8e9bd4696f128f454b45da57c23a8283b0fc7 1 parent a45098d
Jason Jackson authored
View
4 conf/defaults.yaml
@@ -103,5 +103,9 @@ topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.trident.batch.emit.interval.millis: 500
+### register classes used in implementation of metrics api.
+topology.kryo.register:
+ - backtype.storm.metric.api.IMetricsConsumer$TaskInfo
+ - backtype.storm.metric.api.IMetricsConsumer$DataPoint
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
View
4 src/clj/backtype/storm/daemon/common.clj
@@ -212,7 +212,7 @@
(doseq [[_ component] (all-components topology)
:let [common (.get_common component)]]
(.put_to_streams common METRICS-STREAM-ID
- (thrift/output-fields ["worker-host" "worker-port" "interval" "timestamp" "name" "value"]))))
+ (thrift/output-fields ["task-info" "data-points"]))))
(defn add-system-streams! [^StormTopology topology]
(doseq [[_ component] (all-components topology)
@@ -230,7 +230,7 @@
(reverse)))
(defn number-duplicates [coll]
- "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a2\"]"
+ "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
(map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll))
(defn metrics-consumer-register-ids [storm-conf]
View
35 src/clj/backtype/storm/daemon/executor.clj
@@ -7,7 +7,7 @@
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
(:import [backtype.storm.metric MetricHolder])
- (:import [backtype.storm.metric.api IMetric])
+ (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint])
(:require [backtype.storm [tuple :as tuple]])
(:require [backtype.storm.daemon [task :as task]])
)
@@ -214,7 +214,7 @@
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
:stats (mk-executor-stats <> (sampling-rate storm-conf))
- :registered-metrics (ArrayList.)
+ :interval->task->registered-metrics (HashMap.)
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id)
:report-error (throttled-report-error-fn <>)
@@ -244,8 +244,8 @@
:kill-fn (:report-error-and-die executor-data))))
(defn setup-metrics! [executor-data]
- (let [{:keys [storm-conf receive-queue worker-context registered-metrics]} executor-data
- distinct-time-bucket-intervals (->> registered-metrics (map #(.getTimeBucketIntervalInSecs %)) distinct)]
+ (let [{:keys [storm-conf receive-queue worker-context interval->task->registered-metrics]} executor-data
+ distinct-time-bucket-intervals (keys interval->task->registered-metrics)]
(doseq [interval distinct-time-bucket-intervals]
(schedule-recurring
(:user-timer (:worker executor-data))
@@ -257,18 +257,23 @@
[[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
- (let [{:keys [registered-metrics ^WorkerTopologyContext worker-context]} executor-data
+ (let [{:keys [interval->task->registered-metrics ^WorkerTopologyContext worker-context]} executor-data
interval (.getInteger tuple 0)]
- (doseq [^MetricHolder mh registered-metrics]
- (when (= interval (.getTimeBucketIntervalInSecs mh))
- (let [^IMetric metric (.getMetric mh)
- name (.getName mh)
- value (.getValueAndReset metric)
- timestamp (System/currentTimeMillis)
- worker-host (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
- worker-port (.getThisWorkerPort worker-context)]
- (doseq [[task-id task-data] task-datas]
- (task/send-unanchored task-data Constants/METRICS_STREAM_ID [worker-host worker-port interval timestamp name value])))))))
+ (doseq [[task-id task-data] task-datas
+ :let [metric-holders (-> interval->task->registered-metrics (get interval) (get task-id))
+ task-info (IMetricsConsumer$TaskInfo.
+ (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
+ (.getThisWorkerPort worker-context)
+ (:component-id executor-data)
+ task-id
+ (System/currentTimeMillis)
+ interval)
+ data-points (->> metric-holders
+ (map (fn [^MetricHolder mh]
+ (IMetricsConsumer$DataPoint. (.name mh)
+ (.getValueAndReset ^IMetric (.metric mh)))))
+ (into []))]]
+ (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
View
2  src/clj/backtype/storm/daemon/task.clj
@@ -28,7 +28,7 @@
(:default-shared-resources worker)
(:user-shared-resources worker)
(:shared-executor-data executor-data)
- (:registered-metrics executor-data))))
+ (:interval->task->registered-metrics executor-data))))
(defn system-topology-context [worker executor-data tid]
((mk-topology-context-builder
View
2  src/clj/backtype/storm/testing.clj
@@ -588,6 +588,6 @@
{}
{}
(HashMap.)
- (ArrayList.))]
+ (HashMap.))]
(TupleImpl. context values 1 stream)
))
View
16 src/jvm/backtype/storm/metric/MetricHolder.java
@@ -3,17 +3,11 @@
import backtype.storm.metric.api.IMetric;
public class MetricHolder {
- private String _name;
- private int _timeBucketIntervalInSecs;
- private IMetric _metric;
+ public String name;
+ public IMetric metric;
- public MetricHolder(String name, IMetric metric, int timeBucketIntervalInSecs) {
- _name = name;
- _timeBucketIntervalInSecs = timeBucketIntervalInSecs;
- _metric = metric;
+ public MetricHolder(String name, IMetric metric) {
+ this.name = name;
+ this.metric = metric;
}
-
- public String getName() { return _name; }
- public int getTimeBucketIntervalInSecs() { return _timeBucketIntervalInSecs; }
- public IMetric getMetric() { return _metric; }
}
View
13 src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
@@ -6,6 +6,7 @@
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
+import java.util.Collection;
import java.util.Map;
public class MetricsConsumerBolt implements IBolt {
@@ -33,17 +34,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
@Override
public void execute(Tuple input) {
- IMetricsConsumer.DataPoint d = new IMetricsConsumer.DataPoint();
- d.srcComponentId = input.getSourceComponent();
- d.srcTaskId = input.getSourceTask();
- d.srcWorkerHost = input.getString(0);
- d.srcWorkerPort = input.getInteger(1);
- d.updateIntervalSecs = input.getInteger(2);
- d.timestamp = input.getLong(3);
- d.name = input.getString(4);
- d.value = input.getValue(5);
-
- _metricsConsumer.handleDataPoint(d);
+ _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1));
_collector.ack(input);
}
View
25 src/jvm/backtype/storm/metric/api/IMetricsConsumer.java
@@ -1,21 +1,40 @@
package backtype.storm.metric.api;
import backtype.storm.task.TopologyContext;
+import java.util.Collection;
import java.util.Map;
public interface IMetricsConsumer {
- public static class DataPoint {
+ public static class TaskInfo {
+ public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) {
+ this.srcWorkerHost = srcWorkerHost;
+ this.srcWorkerPort = srcWorkerPort;
+ this.srcComponentId = srcComponentId;
+ this.srcTaskId = srcTaskId;
+ this.timestamp = timestamp;
+ this.updateIntervalSecs = updateIntervalSecs;
+ }
public String srcWorkerHost;
public int srcWorkerPort;
public String srcComponentId;
public int srcTaskId;
public long timestamp;
public int updateIntervalSecs;
+ }
+ public static class DataPoint {
+ public DataPoint(String name, Object value) {
+ this.name = name;
+ this.value = value;
+ }
+ @Override
+ public String toString() {
+ return "[" + name + " = " + value + "]";
+ }
public String name;
public Object value;
}
- void prepare(Map stormConf, Object registrationOptions, TopologyContext context);
- void handleDataPoint(DataPoint dataPoint);
+ void prepare(Map stormConf, Object registrationArgument, TopologyContext context);
+ void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
void cleanup();
}
View
18 src/jvm/backtype/storm/task/TopologyContext.java
@@ -33,7 +33,7 @@
private Map<String, Object> _taskData = new HashMap<String, Object>();
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
private Map<String, Object> _executorData;
- private List<MetricHolder> _registeredMetrics;
+ private Map<Integer,Map<Integer, Collection<MetricHolder>>> _registeredMetrics;
public TopologyContext(StormTopology topology, Map stormConf,
@@ -41,7 +41,7 @@ public TopologyContext(StormTopology topology, Map stormConf,
Map<String, Map<String, Fields>> componentToStreamToFields,
String stormId, String codeDir, String pidDir, Integer taskId,
Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
- Map<String, Object> userResources, Map<String, Object> executorData, List<MetricHolder> registeredMetrics) {
+ Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics) {
super(topology, stormConf, taskToComponent, componentToSortedTasks,
componentToStreamToFields, stormId, codeDir, pidDir,
workerPort, workerTasks, defaultResources, userResources);
@@ -205,7 +205,19 @@ public void addTaskHook(ITaskHook hook) {
* @return The IMetric argument unchanged.
*/
public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
- _registeredMetrics.add(new MetricHolder(name, metric, timeBucketSizeInSecs));
+ Map m1 = _registeredMetrics;
+ if(!m1.containsKey(timeBucketSizeInSecs)) {
+ m1.put(timeBucketSizeInSecs, new HashMap());
+ }
+
+ Map m2 = (Map)m1.get(timeBucketSizeInSecs);
+ if(!m2.containsKey(_taskId)) {
+ m2.put(_taskId, new ArrayList());
+ }
+
+ Collection c1 = (Collection)m2.get(_taskId);
+ c1.add(new MetricHolder(name, metric));
+
return metric;
}
Please sign in to comment.
Something went wrong with that request. Please try again.