Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Storm metrics system #390

Merged
merged 14 commits into from Jan 24, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/defaults.yaml
Expand Up @@ -87,6 +87,7 @@ topology.max.task.parallelism: null
topology.max.spout.pending: null topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60 topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05 topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true topology.fall.back.on.java.serialization: true
topology.worker.childopts: null topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 #batched topology.executor.receive.buffer.size: 1024 #batched
Expand All @@ -103,5 +104,4 @@ topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory" topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.trident.batch.emit.interval.millis: 500 topology.trident.batch.emit.interval.millis: 500



dev.zookeeper.path: "/tmp/dev-storm-zookeeper" dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
7 changes: 7 additions & 0 deletions conf/storm.yaml.example
Expand Up @@ -21,3 +21,10 @@
# drpc.servers: # drpc.servers:
# - "server1" # - "server1"
# - "server2" # - "server2"

## Metrics Consumers
# topology.metrics.consumers.register:
# - class: "org.mycompany.MyMetricsConsumer"
# argument:
# - endpoint: "metrics-collector.mycompany.org"
# parallelism.hint: 1
2 changes: 1 addition & 1 deletion project.clj
Expand Up @@ -3,7 +3,7 @@
(do (println (str "ERROR: requires Leiningen 1.x but you are using " lein-version)) (do (println (str "ERROR: requires Leiningen 1.x but you are using " lein-version))
(System/exit 1))) (System/exit 1)))


(defproject storm "0.8.2-wip15" (defproject storm "0.8.2-wip16"
:source-path "src/clj" :source-path "src/clj"
:test-path "test/clj" :test-path "test/clj"
:java-source-path "src/jvm" :java-source-path "src/jvm"
Expand Down
64 changes: 64 additions & 0 deletions src/clj/backtype/storm/daemon/builtin_metrics.clj
@@ -0,0 +1,64 @@
(ns backtype.storm.daemon.builtin-metrics
(:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer])
(:import [backtype.storm Config])
(:use [backtype.storm.stats :only [stats-rate]]))

(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count
^MultiReducedMetric complete-latency
^MultiCountMetric fail-count
^MultiCountMetric emit-count
^MultiCountMetric transfer-count])
(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count
^MultiReducedMetric process-latency
^MultiCountMetric fail-count
^MultiCountMetric execute-count
^MultiReducedMetric execute-latency
^MultiCountMetric emit-count
^MultiCountMetric transfer-count])

(defn make-data [executor-type]
(condp = executor-type
:spout (BuiltinSpoutMetrics. (MultiCountMetric.)
(MultiReducedMetric. (MeanReducer.))
(MultiCountMetric.)
(MultiCountMetric.)
(MultiCountMetric.))
:bolt (BuiltinBoltMetrics. (MultiCountMetric.)
(MultiReducedMetric. (MeanReducer.))
(MultiCountMetric.)
(MultiCountMetric.)
(MultiReducedMetric. (MeanReducer.))
(MultiCountMetric.)
(MultiCountMetric.))))

(defn register-all [builtin-metrics storm-conf topology-context]
(doseq [[kw imetric] builtin-metrics]
(.registerMetric topology-context (str "__" (name kw)) imetric
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))

(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]
(-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
(-> m .complete-latency (.scope stream) (.update latency-ms)))

(defn spout-failed-tuple! [^BuiltinSpoutMetrics m stats stream]
(-> m .fail-count (.scope stream) (.incrBy (stats-rate stats))))

(defn bolt-execute-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
(let [scope (str comp-id ":" stream)]
(-> m .execute-count (.scope scope) (.incrBy (stats-rate stats)))
(-> m .execute-latency (.scope scope) (.update latency-ms))))

(defn bolt-acked-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
(let [scope (str comp-id ":" stream)]
(-> m .ack-count (.scope scope) (.incrBy (stats-rate stats)))
(-> m .process-latency (.scope scope) (.update latency-ms))))

(defn bolt-failed-tuple! [^BuiltinBoltMetrics m stats comp-id stream]
(let [scope (str comp-id ":" stream)]
(-> m .fail-count (.scope scope) (.incrBy (stats-rate stats)))))

(defn emitted-tuple! [m stats stream]
(-> m :emit-count (.scope stream) (.incrBy (stats-rate stats))))

(defn transferred-tuple! [m stats stream num-out-tasks]
(-> m :transfer-count (.scope stream) (.incrBy (* num-out-tasks (stats-rate stats)))))
73 changes: 63 additions & 10 deletions src/clj/backtype/storm/daemon/common.clj
Expand Up @@ -23,6 +23,8 @@


(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID) (def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID) (def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)


;; the task id is the virtual port ;; the task id is the virtual port
;; node->host is here so that tasks know who to talk to just from assignment ;; node->host is here so that tasks know who to talk to just from assignment
Expand Down Expand Up @@ -206,27 +208,78 @@
(.put_to_bolts ret "__acker" acker-bolt) (.put_to_bolts ret "__acker" acker-bolt)
)) ))


(defn add-metric-streams! [^StormTopology topology]
(doseq [[_ component] (all-components topology)
:let [common (.get_common component)]]
(.put_to_streams common METRICS-STREAM-ID
(thrift/output-fields ["task-info" "data-points"]))))

(defn add-system-streams! [^StormTopology topology] (defn add-system-streams! [^StormTopology topology]
(doseq [[_ component] (all-components topology) (doseq [[_ component] (all-components topology)
:let [common (.get_common component)]] :let [common (.get_common component)]]
(.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"])) (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
;; TODO: consider adding a stats stream for stats aggregation
))
(defn map-occurrences [afn coll]
(->> coll
(reduce (fn [[counts new-coll] x]
(let [occurs (inc (get counts x 0))]
[(assoc counts x occurs) (cons (afn x occurs) new-coll)]))
[{} []])
(second)
(reverse)))

(defn number-duplicates [coll]
"(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
(map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll))

(defn metrics-consumer-register-ids [storm-conf]
"Generates a list of component ids for each metrics consumer
e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] "
(->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
(map #(get % "class"))
(number-duplicates)
(map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))

(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
(let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
{[comp-id METRICS-STREAM-ID] :shuffle})
(into {}))

mk-bolt-spec (fn [class arg p]
(thrift/mk-bolt-spec*
inputs
(backtype.storm.metric.MetricsConsumerBolt. class arg)
{} :p p :conf {TOPOLOGY-TASKS p}))]

(map
(fn [component-id register]
[component-id (mk-bolt-spec (get register "class")
(get register "argument")
(or (get register "parallelism.hint") 1))])

(metrics-consumer-register-ids storm-conf)
(get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))

(defn add-metric-components! [storm-conf ^StormTopology topology]
(doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
(.put_to_bolts topology comp-id bolt-spec)))


(defn add-system-components! [^StormTopology topology] (defn add-system-components! [^StormTopology topology]
(let [system-spout (thrift/mk-spout-spec* (let [system-spout (thrift/mk-spout-spec*
(NoOpSpout.) (NoOpSpout.)
{SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
} METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
:p 0 :p 0
:conf {TOPOLOGY-TASKS 0})] :conf {TOPOLOGY-TASKS 0})]
(.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout) (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
))


(defn system-topology! [storm-conf ^StormTopology topology] (defn system-topology! [storm-conf ^StormTopology topology]
(validate-basic! topology) (validate-basic! topology)
(let [ret (.deepCopy topology)] (let [ret (.deepCopy topology)]
(add-acker! storm-conf ret) (add-acker! storm-conf ret)
(add-metric-components! storm-conf ret)
(add-metric-streams! ret)
(add-system-streams! ret) (add-system-streams! ret)
(add-system-components! ret) (add-system-components! ret)
(validate-structure! ret) (validate-structure! ret)
Expand Down