From 84f9bca57c9ea820c6bd871e5f71287e9d1a19c5 Mon Sep 17 00:00:00 2001 From: Nathan Marz Date: Sun, 29 Jul 2012 14:21:29 -0700 Subject: [PATCH] worker checks and warns for missing outbound connections from assignment, now drops messages for which doesn't have outbound connection --- src/clj/backtype/storm/daemon/worker.clj | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/clj/backtype/storm/daemon/worker.clj b/src/clj/backtype/storm/daemon/worker.clj index 4b5f3a18e..7e6162e3d 100644 --- a/src/clj/backtype/storm/daemon/worker.clj +++ b/src/clj/backtype/storm/daemon/worker.clj @@ -223,10 +223,11 @@ (select-keys outbound-tasks) (#(map-val endpoint->string %))) ;; we dont need a connection for the local tasks anymore - needed-connections (->> my-assignment - (filter-key (complement (-> worker :task-ids set))) - vals - set) + needed-assignment (->> my-assignment + (filter-key (complement (-> worker :task-ids set)))) + needed-connections (-> needed-assignment vals set) + needed-tasks (-> needed-assignment keys) + current-connections (set (keys @(:cached-node+port->socket worker))) new-connections (set/difference needed-connections current-connections) remove-connections (set/difference current-connections needed-connections)] @@ -252,7 +253,12 @@ (:cached-node+port->socket worker) #(HashMap. (dissoc (into {} %1) %&)) remove-connections) - ))))) + + (let [missing-tasks (->> needed-tasks + (filter (complement my-assignment)))] + (when-not (empty? missing-tasks) + (log-warn "Missing assignment for following tasks: " (pr-str missing-tasks)) + ))))))) (defn refresh-storm-active ([worker] @@ -286,9 +292,10 @@ (fast-list-iter [[task ser-tuple] drainer] ;; TODO: consider write a batch of tuples here to every target worker ;; group by node+port, do multipart send - (let [socket (get node+port->socket (get task->node+port task))] - (msg/send socket task ser-tuple) - )))) + (let [node-port (get task->node+port task)] + (when node-port + (msg/send (get node+port->socket node-port) task ser-tuple)) + )))) (.clear drainer)))))) (defn launch-receive-thread [worker]