Skip to content

Commit

Permalink
Merge remote-tracking branch 'ooyala/0.8.0-better-hooks'
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Aug 1, 2012
2 parents d47da10 + 3368d34 commit 58ce226
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 14 deletions.
14 changes: 8 additions & 6 deletions src/clj/backtype/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -275,22 +275,24 @@
)))

(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta]
(let [^ISpout spout (:object task-data)]
(let [^ISpout spout (:object task-data)
task-id (:task-id task-data)]
;;TODO: need to throttle these when there's lots of failures
(log-message "Failing message " msg-id ": " tuple-info)
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id time-delta))
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
(stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
)))

(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta]
(let [storm-conf (:storm-conf executor-data)
^ISpout spout (:object task-data)]
^ISpout spout (:object task-data)
task-id (:task-id task-data)]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Acking message " msg-id))
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id time-delta))
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
(stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
)))
Expand Down Expand Up @@ -525,7 +527,7 @@
[root (bit-xor id ack-val)])
))
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple delta))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
Expand All @@ -538,7 +540,7 @@
ACKER-FAIL-STREAM-ID
[root]))
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple delta))
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when delta
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
Expand Down
7 changes: 4 additions & 3 deletions src/clj/backtype/storm/daemon/task.clj
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@
stream->component->grouper (:stream->component->grouper executor-data)
user-context (:user-context task-data)
executor-stats (:stats executor-data)
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
debug? (= true (storm-conf TOPOLOGY-DEBUG))
task-id (:task-id task-data)]
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
Expand All @@ -113,7 +114,7 @@
out-task-id (if grouping out-task-id)]
(when (and (not-nil? grouping) (not= :direct grouping))
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
(apply-hooks user-context .emit (EmitInfo. values stream [out-task-id]))
(apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
(when (emit-sampler)
(stats/emitted-tuple! executor-stats stream)
(if out-task-id
Expand All @@ -133,7 +134,7 @@
(.addAll out-tasks comp-tasks)
(.add out-tasks comp-tasks)
)))
(apply-hooks user-context .emit (EmitInfo. values stream out-tasks))
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(when (emit-sampler)
(stats/emitted-tuple! executor-stats stream)
(stats/transferred-tuples! executor-stats stream (count out-tasks)))
Expand Down
4 changes: 3 additions & 1 deletion src/jvm/backtype/storm/hooks/info/BoltAckInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

public class BoltAckInfo {
public Tuple tuple;
public int ackingTaskId;
public Long processLatencyMs; // null if it wasn't sampled

public BoltAckInfo(Tuple tuple, Long processLatencyMs) {
public BoltAckInfo(Tuple tuple, int ackingTaskId, Long processLatencyMs) {
this.tuple = tuple;
this.ackingTaskId = ackingTaskId;
this.processLatencyMs = processLatencyMs;
}
}
4 changes: 3 additions & 1 deletion src/jvm/backtype/storm/hooks/info/BoltFailInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

public class BoltFailInfo {
public Tuple tuple;
public int failingTaskId;
public Long failLatencyMs; // null if it wasn't sampled

public BoltFailInfo(Tuple tuple, Long failLatencyMs) {
public BoltFailInfo(Tuple tuple, int failingTaskId, Long failLatencyMs) {
this.tuple = tuple;
this.failingTaskId = failingTaskId;
this.failLatencyMs = failLatencyMs;
}
}
4 changes: 3 additions & 1 deletion src/jvm/backtype/storm/hooks/info/EmitInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
public class EmitInfo {
public List<Object> values;
public String stream;
public int taskId;
public Collection<Integer> outTasks;

public EmitInfo(List<Object> values, String stream, Collection<Integer> outTasks) {
public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) {
this.values = values;
this.stream = stream;
this.taskId = taskId;
this.outTasks = outTasks;
}
}
4 changes: 3 additions & 1 deletion src/jvm/backtype/storm/hooks/info/SpoutAckInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

public class SpoutAckInfo {
public Object messageId;
public int spoutTaskId;
public Long completeLatencyMs; // null if it wasn't sampled

public SpoutAckInfo(Object messageId, Long completeLatencyMs) {
public SpoutAckInfo(Object messageId, int spoutTaskId, Long completeLatencyMs) {
this.messageId = messageId;
this.spoutTaskId = spoutTaskId;
this.completeLatencyMs = completeLatencyMs;
}
}
4 changes: 3 additions & 1 deletion src/jvm/backtype/storm/hooks/info/SpoutFailInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

public class SpoutFailInfo {
public Object messageId;
public int spoutTaskId;
public Long failLatencyMs; // null if it wasn't sampled

public SpoutFailInfo(Object messageId, Long failLatencyMs) {
public SpoutFailInfo(Object messageId, int spoutTaskId, Long failLatencyMs) {
this.messageId = messageId;
this.spoutTaskId = spoutTaskId;
this.failLatencyMs = failLatencyMs;
}
}

0 comments on commit 58ce226

Please sign in to comment.