Skip to content

Commit

Permalink
pre-get the user context
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed May 21, 2012
1 parent fd358a8 commit 4824212
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions src/clj/backtype/storm/daemon/executor.clj
Expand Up @@ -243,6 +243,7 @@
(let [^KryoTupleDeserializer deserializer (:deserializer executor-data) (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
report-error-and-die (:report-error-and-die executor-data)] report-error-and-die (:report-error-and-die executor-data)]
(fn [[task-id msg] sequence-id end-of-batch?] (fn [[task-id msg] sequence-id end-of-batch?]
;; TODO: optimize by batching emits onto the transfer queue
(with-error-reaction report-error-and-die (with-error-reaction report-error-and-die
;;(log-debug "Processing message " msg) ;;(log-debug "Processing message " msg)
(let [^Tuple tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))] (let [^Tuple tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
Expand Down Expand Up @@ -410,6 +411,7 @@
(doseq [[task-id task-data] task-datas (doseq [[task-id task-data] task-datas
:let [^IBolt bolt-obj (:object task-data) :let [^IBolt bolt-obj (:object task-data)
tasks-fn (:tasks-fn task-data) tasks-fn (:tasks-fn task-data)
user-context (:user-context task-data)
bolt-emit (fn [stream anchors values task] bolt-emit (fn [stream anchors values task]
(let [out-tasks (if task (let [out-tasks (if task
(tasks-fn task stream values) (tasks-fn task stream values)
Expand All @@ -433,7 +435,7 @@
(or out-tasks [])))]] (or out-tasks [])))]]
(.prepare bolt-obj (.prepare bolt-obj
storm-conf storm-conf
(:user-context task-data) user-context
(OutputCollector. (OutputCollector.
(reify IOutputCollector (reify IOutputCollector
(emit [this stream anchors values] (emit [this stream anchors values]
Expand All @@ -448,7 +450,7 @@
[root (bit-xor id ack-val)]) [root (bit-xor id ack-val)])
)) ))
(let [delta (tuple-time-delta! tuple-start-times tuple)] (let [delta (tuple-time-delta! tuple-start-times tuple)]
(task/apply-hooks (:user-context task-data) .boltAck (BoltAckInfo. tuple delta)) (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta))
(when (sampler) (when (sampler)
(stats/bolt-acked-tuple! executor-stats (stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple) (.getSourceComponent tuple)
Expand All @@ -462,7 +464,7 @@
ACKER-FAIL-STREAM-ID ACKER-FAIL-STREAM-ID
[root])) [root]))
(let [delta (tuple-time-delta! tuple-start-times tuple)] (let [delta (tuple-time-delta! tuple-start-times tuple)]
(task/apply-hooks (:user-context task-data) .boltFail (BoltFailInfo. tuple delta)) (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta))
(when (sampler) (when (sampler)
(stats/bolt-failed-tuple! executor-stats (stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple) (.getSourceComponent tuple)
Expand Down

0 comments on commit 4824212

Please sign in to comment.