Skip to content

Commit

Permalink
pre-get the user context
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed May 21, 2012
1 parent fd358a8 commit 4824212
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions src/clj/backtype/storm/daemon/executor.clj
Expand Up @@ -243,6 +243,7 @@
(let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
report-error-and-die (:report-error-and-die executor-data)]
(fn [[task-id msg] sequence-id end-of-batch?]
;; TODO: optimize by batching emits onto the transfer queue
(with-error-reaction report-error-and-die
;;(log-debug "Processing message " msg)
(let [^Tuple tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
Expand Down Expand Up @@ -410,6 +411,7 @@
(doseq [[task-id task-data] task-datas
:let [^IBolt bolt-obj (:object task-data)
tasks-fn (:tasks-fn task-data)
user-context (:user-context task-data)
bolt-emit (fn [stream anchors values task]
(let [out-tasks (if task
(tasks-fn task stream values)
Expand All @@ -433,7 +435,7 @@
(or out-tasks [])))]]
(.prepare bolt-obj
storm-conf
(:user-context task-data)
user-context
(OutputCollector.
(reify IOutputCollector
(emit [this stream anchors values]
Expand All @@ -448,7 +450,7 @@
[root (bit-xor id ack-val)])
))
(let [delta (tuple-time-delta! tuple-start-times tuple)]
(task/apply-hooks (:user-context task-data) .boltAck (BoltAckInfo. tuple delta))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta))
(when (sampler)
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
Expand All @@ -462,7 +464,7 @@
ACKER-FAIL-STREAM-ID
[root]))
(let [delta (tuple-time-delta! tuple-start-times tuple)]
(task/apply-hooks (:user-context task-data) .boltFail (BoltFailInfo. tuple delta))
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta))
(when (sampler)
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
Expand Down

0 comments on commit 4824212

Please sign in to comment.