Skip to content

Commit

Permalink
worker checks and warns for missing outbound connections from assignm…
Browse files Browse the repository at this point in the history
…ent, now drops messages for which doesn't have outbound connection
  • Loading branch information
Nathan Marz committed Jul 29, 2012
1 parent 57fb6ce commit 84f9bca
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions src/clj/backtype/storm/daemon/worker.clj
Expand Up @@ -223,10 +223,11 @@
(select-keys outbound-tasks)
(#(map-val endpoint->string %)))
;; we dont need a connection for the local tasks anymore
needed-connections (->> my-assignment
(filter-key (complement (-> worker :task-ids set)))
vals
set)
needed-assignment (->> my-assignment
(filter-key (complement (-> worker :task-ids set))))
needed-connections (-> needed-assignment vals set)
needed-tasks (-> needed-assignment keys)

current-connections (set (keys @(:cached-node+port->socket worker)))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
Expand All @@ -252,7 +253,12 @@
(:cached-node+port->socket worker)
#(HashMap. (dissoc (into {} %1) %&))
remove-connections)
)))))

(let [missing-tasks (->> needed-tasks
(filter (complement my-assignment)))]
(when-not (empty? missing-tasks)
(log-warn "Missing assignment for following tasks: " (pr-str missing-tasks))
)))))))

(defn refresh-storm-active
([worker]
Expand Down Expand Up @@ -286,9 +292,10 @@
(fast-list-iter [[task ser-tuple] drainer]
;; TODO: consider write a batch of tuples here to every target worker
;; group by node+port, do multipart send
(let [socket (get node+port->socket (get task->node+port task))]
(msg/send socket task ser-tuple)
))))
(let [node-port (get task->node+port task)]
(when node-port
(msg/send (get node+port->socket node-port) task ser-tuple))
))))
(.clear drainer))))))

(defn launch-receive-thread [worker]
Expand Down

0 comments on commit 84f9bca

Please sign in to comment.