Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

add debug logging to trace tuples

  • Loading branch information...
commit 9e68692335f433c5fdce8e0b38aa9b35baf0b27d 1 parent fbcbd93
@nathanmarz authored
View
2  src/clj/backtype/storm/daemon/executor.clj
@@ -200,7 +200,9 @@
(disruptor/handler [o seq-id batch-end?]
(let [^ArrayList alist (.getObject cached-emit)]
(.add alist o)
+ (log-message "Received " o " in batch-transfer->worker handler")
(when batch-end?
+ (log-message "Sending " alist " to worker-transfer-fn")
(worker-transfer-fn serializer alist)
(.setObject cached-emit (ArrayList.))
)))
View
5 src/clj/backtype/storm/daemon/worker.clj
@@ -87,6 +87,7 @@
local-transfer (:transfer-local-fn worker)
^DisruptorQueue transfer-queue (:transfer-queue worker)]
(fn [^KryoTupleSerializer serializer tuple-batch]
+ (log-message "In worker transfer-fn: " tuple-batch)
(let [local (ArrayList.)
remote (ArrayList.)]
(fast-list-iter [[task tuple :as pair] tuple-batch]
@@ -270,9 +271,12 @@
node+port->socket (:cached-node+port->socket worker)
task->node+port (:cached-task->node+port worker)
endpoint-socket-lock (:endpoint-socket-lock worker)
+ des (backtype.storm.serialization.KryoTupleDeserializer. (read-storm-config) (worker-context worker))
+
]
(disruptor/clojure-handler
(fn [packets _ batch-end?]
+ (log-message "Transferring " (dofor [[t msg] packets] (.deserialize des msg)))
(.addAll drainer packets)
(when batch-end?
(read-locked endpoint-socket-lock
@@ -297,6 +301,7 @@
(:port worker)
(:transfer-local-fn worker)
(-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
+ (worker-context worker)
:kill-fn (fn [t] (halt-process! 11))))
(defn- close-resources [worker]
View
13 src/clj/backtype/storm/messaging/loader.clj
@@ -1,5 +1,5 @@
(ns backtype.storm.messaging.loader
- (:use [backtype.storm util log])
+ (:use [backtype.storm util log config])
(:import [java.util ArrayList])
(:import [backtype.storm.utils DisruptorQueue MutableObject])
(:require [backtype.storm.messaging [local :as local] [protocol :as msg]])
@@ -16,18 +16,21 @@
(apply afn args)))
(defnk launch-receive-thread!
- [context storm-id port transfer-local-fn max-buffer-size
+ [context storm-id port transfer-local-fn max-buffer-size tcontext
:daemon true
:kill-fn (fn [t] (System/exit 1))
:priority Thread/NORM_PRIORITY]
(let [max-buffer-size (int max-buffer-size)
vthread (async-loop
(fn []
- (let [socket (msg/bind context storm-id port)]
+ (let [des (backtype.storm.serialization.KryoTupleDeserializer. (read-storm-config) tcontext)
+ socket (msg/bind context storm-id port)]
(fn []
(let [batched (ArrayList.)
init (msg/recv socket)]
(loop [[task msg :as packet] init]
+ (when (and msg (not= task -1))
+ (log-message "Receive thread " task " " (.deserialize des msg)))
(if (= task -1)
(do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
(.close socket)
@@ -36,7 +39,9 @@
(when packet (.add batched packet))
(if (and packet (< (.size batched) max-buffer-size))
(recur (msg/recv-with-flags socket 1))
- (do (transfer-local-fn batched)
+ (do
+ ;; (log-message "Receive thread sending local batch" (dofor [msg batched] (deserialize-tuple context msg)))
+ (transfer-local-fn batched)
0 )))))))))
:factory? true
:daemon daemon
View
1  src/clj/backtype/storm/testing.clj
@@ -336,6 +336,7 @@
(extend-type FixedTupleSpout
CompletableSpout
(exhausted? [this]
+ (println "EXHAUST: " (-> this .getSourceTuples count) " vs " (.getCompleted this))
(= (-> this .getSourceTuples count)
(.getCompleted this)))
(cleanup [this]
Please sign in to comment.
Something went wrong with that request. Please try again.