Skip to content

Commit

Permalink
optimize map usage in tasks-fn by using HashMap
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed May 22, 2012
1 parent e882800 commit 11862aa
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 26 deletions.
7 changes: 4 additions & 3 deletions conf/defaults.yaml
Expand Up @@ -85,7 +85,8 @@ topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 16384
topology.executor.send.buffer.size: 16384
topology.transfer.buffer.size: 1048576
topology.executor.receive.buffer.size: 8192 #batched
topology.executor.send.buffer.size: 16384 #individual messages
topology.transfer.buffer.size: 32 # batched

dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
47 changes: 27 additions & 20 deletions src/clj/backtype/storm/daemon/executor.clj
Expand Up @@ -75,29 +75,35 @@
:direct
)))

(defn- outbound-groupings [^WorkerTopologyContext worker-context out-fields component->grouping]
(->> component->grouping
(filter-key #(-> worker-context
(.getComponentTasks %)
count
pos?))
(map (fn [[component tgrouping]]
[component
(mk-grouper worker-context
out-fields
tgrouping
(.getComponentTasks worker-context component)
)]))
(into {})
(HashMap.)))

(defn outbound-components
"Returns map of stream id to component id to grouper"
[^WorkerTopologyContext worker-context component-id]
(let [output-groupings (clojurify-structure (.getTargets worker-context component-id))]
(into {}
(for [[stream-id component->grouping] output-groupings
:let [out-fields (.getComponentOutputFields worker-context component-id stream-id)
component->grouping (filter-key #(-> worker-context
(.getComponentTasks %)
count
pos?)
component->grouping)]]
[stream-id
(into {}
(for [[component tgrouping] component->grouping]
[component (mk-grouper worker-context
out-fields
tgrouping
(.getComponentTasks worker-context component)
)]
))]))))


(->> (.getTargets worker-context component-id)
clojurify-structure
(map (fn [[stream-id component->grouping]]
[stream-id
(outbound-groupings
worker-context
(.getComponentOutputFields worker-context component-id stream-id)
component->grouping)]))
(into {})
(HashMap.)))

(defn executor-type [^WorkerTopologyContext context component-id]
(let [topology (.getRawTopology context)
Expand Down Expand Up @@ -278,6 +284,7 @@

pending (TimeCacheMap.
(int (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS))
2 ;; microoptimize for performance of .size method
(reify TimeCacheMap$ExpiredCallback
(expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
(let [time-delta (time-delta-ms start-time-ms)]
Expand Down
4 changes: 2 additions & 2 deletions src/clj/backtype/storm/daemon/task.clj
Expand Up @@ -96,7 +96,7 @@
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
(let [target-component (.getComponentId worker-context out-task-id)
component->grouping (stream->component->grouper stream)
component->grouping (get stream->component->grouper stream)
grouping (get component->grouping target-component)
out-task-id (if grouping out-task-id)]
(when (and (not-nil? grouping) (not= :direct grouping))
Expand All @@ -112,7 +112,7 @@
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "Emitting: " component-id " " stream " " values))
(let [out-tasks (ArrayList.)]
(doseq [[out-component grouper] (stream->component->grouper stream)]
(doseq [[out-component grouper] (get stream->component->grouper stream)]
(when (= :direct grouper)
;; TODO: this is wrong, need to check how the stream was declared
(throw (IllegalArgumentException. "Cannot do regular emit to direct stream")))
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/backtype/storm/utils/DisruptorQueue.java
Expand Up @@ -51,7 +51,7 @@ public void haltProcessing() {

public void shutdown() {
_disruptor.shutdown();
_executor.shutdown();
_executor.shutdownNow();
}

static class WaiterEventHandler implements EventHandler<MutableObject> {
Expand Down

0 comments on commit 11862aa

Please sign in to comment.