Browse files

converted relevant macros to functions w/ thunks.

  • Loading branch information...
1 parent df4692c commit 205d993ba7894d1998451f98c0e97d0442d324c3 @sritchie sritchie committed Jan 23, 2012
View
1 README.markdown
@@ -29,3 +29,4 @@ You must not remove this notice, or any other, from this software.
* Michael Montano ([@michaelmontano](http://twitter.com/michaelmontano))
* Dennis Zhuang ([@killme2008](https://github.com/killme2008))
* Trevor Smith ([@trevorsummerssmith](https://github.com/trevorsummerssmith))
+* Sam Ritchie ([@sritchie09](https://github.com/sritchie09))
View
157 src/clj/backtype/storm/testing.clj
@@ -35,37 +35,38 @@
(FileUtils/forceDelete (File. t))
))))
-(defmacro with-local-tmp [[& tmp-syms] & body]
- (let [tmp-paths (mapcat (fn [t] [t `(local-temp-path)]) tmp-syms)]
- `(let [~@tmp-paths]
- (try
- ~@body
- (finally
- (delete-all ~(vec tmp-syms)))
- ))
- ))
+(defn with-local-tmp* [n afn]
+ (let [tmp-paths (take n (repeatedly local-temp-path))]
+ (try (apply afn tmp-paths)
+ (finally
+ (delete-all tmp-paths)))))
+
+(defmacro with-local-tmp
+ [[& tmp-syms] & body]
+ `(with-local-tmp* ~(count tmp-syms)
+ (fn [~@tmp-syms] ~@body)))
(defn start-simulating-time! []
(Time/startSimulating))
(defn stop-simulating-time! []
(Time/stopSimulating))
+(defn simulated-time-call [f]
+ (do (start-simulating-time!)
+ (let [ret (f)]
+ (stop-simulating-time!)
+ ret)))
+
(defmacro with-simulated-time [& body]
- `(do
- (start-simulating-time!)
- (let [ret# (do ~@body)]
- (stop-simulating-time!)
- ret#
- )))
+ `(simulated-time-call (fn [] ~@body)))
(defn advance-time-ms! [ms]
(Time/advanceTime ms))
(defn advance-time-secs! [secs]
(advance-time-ms! (* (long secs) 1000)))
-
(defnk add-supervisor [cluster-map :ports 2 :conf {} :id nil]
(let [tmp-dir (local-temp-path)
port-ids (if (sequential? ports) ports (doall (repeatedly ports (:port-counter cluster-map))))
@@ -156,7 +157,6 @@
(rmr t)
))
-
(defn wait-until-cluster-waiting
"Wait until the cluster is idle. Should be used with time simulation."
[cluster-map]
@@ -185,31 +185,35 @@
(advance-cluster-time cluster-map secs 1)
))
+(defn with-local-cluster* [f & cluster-args]
+ (let [cluster (apply mk-local-storm-cluster cluster-args)]
+ (try (f cluster)
+ (catch Throwable t
+ (log-error t "Error in cluster"))
+ (finally
+ (kill-local-storm-cluster cluster)))))
+
(defmacro with-local-cluster [[cluster-sym & args] & body]
- `(let [~cluster-sym (mk-local-storm-cluster ~@args)]
- (try
- ~@body
- (catch Throwable t#
- (log-error t# "Error in cluster")
- )
- (finally
- (kill-local-storm-cluster ~cluster-sym)))
- ))
+ `(with-local-cluster*
+ (fn [~cluster-sym] ~@body)
+ ~@args))
(defmacro with-simulated-time-local-cluster [& args]
`(with-simulated-time
- (with-local-cluster ~@args)))
+ (with-local-cluster ~@args)))
;; TODO: should take in a port symbol and find available port automatically
+
+(defn with-inprocess-zookeeper* [f port]
+ (with-local-tmp [tmp]
+ (let [zks (zk/mk-inprocess-zookeeper tmp port)]
+ (try (f)
+ (finally (zk/shutdown-inprocess-zookeeper zks))))))
+
(defmacro with-inprocess-zookeeper [port & body]
- `(with-local-tmp [tmp#]
- (let [zks# (zk/mk-inprocess-zookeeper tmp# ~port)]
- (try
- ~@body
- (finally
- (zk/shutdown-inprocess-zookeeper zks#)
- ))
- )))
+ `(with-inprocess-zookeeper*
+ (fn [] ~@body)
+ ~port))
(defn submit-local-topology [nimbus storm-name conf topology]
(.submitTopology nimbus storm-name nil (to-json conf) topology))
@@ -248,21 +252,30 @@
(existing-fn conf supervisor-id worker-id worker-thread-pids-atom)
))))
+(defmacro capture-changed-workers* [f]
+ (let [launch-captured (atom {})
+ shutdown-captured (atom {})]
+ (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured)
+ supervisor/shutdown-worker (mk-capture-shutdown-fn
+ shutdown-captured)]
+ (f)
+ {:launched @launch-captured
+ :shutdown @shutdown-captured})))
+
+(def capture-launched-workers*
+ (comp :launched capture-changed-workers*))
+
+(def capture-shutdown-workers*
+ (comp :shutdown capture-changed-workers*))
+
(defmacro capture-changed-workers [& body]
- `(let [launch-captured# (atom {})
- shutdown-captured# (atom {})]
- (with-var-roots [supervisor/launch-worker (mk-capture-launch-fn launch-captured#)
- supervisor/shutdown-worker (mk-capture-shutdown-fn shutdown-captured#)]
- ~@body
- {:launched @launch-captured#
- :shutdown @shutdown-captured#}
- )))
+ `(capture-change-workers* (fn [] ~@body)))
(defmacro capture-launched-workers [& body]
- `(:launched (capture-changed-workers ~@body)))
+ `(capture-launched-workers* (fn [] ~@body)))
(defmacro capture-shutdown-workers [& body]
- `(:shutdown (capture-changed-workers ~@body)))
+ `(capture-shutdown-workers* (fn [] ~@body)))
(defnk aggregated-stat [cluster-map storm-name stat-key :component-ids nil]
(let [state (:storm-cluster-state cluster-map)
@@ -415,32 +428,36 @@
.get
))
+(defn with-tracked-cluster* [f & cluster-args]
+ (let [id (uuid)]
+ (RegisteredGlobalState/setState
+ id
+ (doto (ConcurrentHashMap.)
+ (.put "spout-emitted" (AtomicInteger. 0))
+ (.put "transferred" (AtomicInteger. 0))
+ (.put "processed" (AtomicInteger. 0))))
+ (with-var-roots [acker/mk-acker-bolt
+ (let [old acker/mk-acker-bolt]
+ (fn [& args]
+ (BoltTracker. (apply old args) id)))
+
+ worker/mk-transfer-fn
+ (let [old worker/mk-transfer-fn]
+ (fn [& args]
+ (let [transferrer (apply old args)]
+ (fn [& transfer-args]
+ ;; (log-message "Transferring: " transfer-args#)
+ (increment-global! id "transferred")
+ (apply transferrer transfer-args)))))]
+ (apply with-local-cluster*
+ (comp f #(assoc-track-id % id))
+ cluster-args))
+ (RegisteredGlobalState/clearState id)))
+
(defmacro with-tracked-cluster [[cluster-sym & cluster-args] & body]
- `(let [id# (uuid)]
- (RegisteredGlobalState/setState id#
- (doto (ConcurrentHashMap.)
- (.put "spout-emitted" (AtomicInteger. 0))
- (.put "transferred" (AtomicInteger. 0))
- (.put "processed" (AtomicInteger. 0))))
- (with-var-roots [acker/mk-acker-bolt (let [old# acker/mk-acker-bolt]
- (fn [& args#]
- (BoltTracker. (apply old# args#) id#)
- ))
- worker/mk-transfer-fn (let [old# worker/mk-transfer-fn]
- (fn [& args#]
- (let [transferrer# (apply old# args#)]
- (fn [& transfer-args#]
- ;; (log-message "Transferring: " transfer-args#)
- (increment-global! id# "transferred")
- (apply transferrer# transfer-args#)
- ))))
- ]
- (with-local-cluster [~cluster-sym ~@cluster-args]
- (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
- ~@body)
- ))
- (RegisteredGlobalState/clearState id#)
- ))
+ `(with-tracked-cluster*
+ (fn [~cluster-sym] ~@body)
+ ~@cluster-args))
(defn tracked-wait
"Waits until topology is idle and 'amt' more tuples have been emitted by spouts."
View
25 src/clj/backtype/storm/thrift.clj
@@ -55,19 +55,24 @@
(.open transport)
[client transport] ))
+(defn with-nimbus-connection* [f host port]
+ (let [[^Nimbus$Client client ^TTransport conn] (nimbus-client-and-conn host port)]
+ (try (f client)
+ (finally (.close conn)))))
+
+(defn with-configured-nimbus-onnection* [f]
+ `(let [conf (read-storm-config)
+ host (conf NIMBUS-HOST)
+ port (conf NIMBUS-THRIFT-PORT)]
+ (with-nimbus-connection* f host port)))
+
(defmacro with-nimbus-connection [[client-sym host port] & body]
- `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port)]
- (try
- ~@body
- (finally (.close conn#)))
- ))
+ `(with-nimbus-connection*
+ (fn [~client-sym] ~@body) ~host ~port))
(defmacro with-configured-nimbus-connection [client-sym & body]
- `(let [conf# (read-storm-config)
- host# (conf# NIMBUS-HOST)
- port# (conf# NIMBUS-THRIFT-PORT)]
- (with-nimbus-connection [~client-sym host# port#]
- ~@body )))
+ `(with-configured-nimbus-connection*
+ (fn [~client-sym] ~@body)))
(defn mk-component-common [component parallelism-hint]
(let [getter (OutputFieldsGetter.)
View
12 src/clj/backtype/storm/ui/core.clj
@@ -18,10 +18,16 @@
(def *STORM-CONF* (read-storm-config))
+(defn with-nimbus-connection* [f host port]
+ (let [[^Nimbus$Client client ^TTransport conn] (nimbus-client-and-conn host port)]
+ (try (f client)
+ (finally (.close conn)))))
+
+(defn with-nimbus* [f]
+ (thrift/with-nimbus-connection f "localhost" (*STORM-CONF* NIMBUS-THRIFT-PORT)))
+
(defmacro with-nimbus [nimbus-sym & body]
- `(thrift/with-nimbus-connection [~nimbus-sym "localhost" (*STORM-CONF* NIMBUS-THRIFT-PORT)]
- ~@body
- ))
+ `(with-nimbus* (fn [~nimbus-sym] ~@body)))
(defn get-filled-stats [summs]
(->> summs
View
17 src/clj/backtype/storm/ui/helpers.clj
@@ -10,18 +10,19 @@
[compojure.handler :as handler]))
(defn split-divide [val divider]
- [(int (/ val divider)) (mod val divider)]
- )
+ [(int (/ val divider)) (mod val divider)])
(def PRETTY-SEC-DIVIDERS
- [["s" 60]
- ["m" 60]
- ["h" 24]
- ["d" nil]])
+ [["s" 60]
+ ["m" 60]
+ ["h" 24]
+ ["d" nil]])
+
+
(def PRETTY-MS-DIVIDERS
- (cons ["ms" 1000]
- PRETTY-SEC-DIVIDERS))
+ (cons ["ms" 1000]
+ PRETTY-SEC-DIVIDERS))
(defn pretty-uptime-str* [val dividers]
(let [val (if (string? val) (Integer/parseInt val) val)
View
34 src/clj/backtype/storm/util.clj
@@ -49,17 +49,18 @@
true x))
s))
+(defn with-file-lock* [f path]
+ (let [file (doto (File. path)
+ (.createNewFile))
+ rf (RandomAccessFile. file "rw")
+ lock (-> rf .getChannel .lock)]
+ (try (f)
+ (finally
+ (.release lock)
+ (.close rf)))))
+
(defmacro with-file-lock [path & body]
- `(let [f# (File. ~path)
- _# (.createNewFile f#)
- rf# (RandomAccessFile. f# "rw")
- lock# (.. rf# (getChannel) (lock))]
- (try
- ~@body
- (finally
- (.release lock#)
- (.close rf#))
- )))
+ `(with-file-lock* (fn [] ~@body) ~path))
(defn tokenize-path [^String path]
(let [toks (.split path "/")]
@@ -470,6 +471,11 @@
(and (>= val lower)
(<= val upper)))
+(defmacro benchmark [f]
+ (time
+ (doseq [_ (range 1000000)]
+ (f))))
+
(defmacro benchmark [& body]
`(time
(doseq [i# (range 1000000)]
@@ -521,9 +527,13 @@
(defn bit-xor-vals [vals]
(reduce bit-xor 0 vals))
+(defn with-error-reaction* [try-fn error-fn]
+ (try (try-fn)
+ (catch Throwable t# (error-fn t#))))
+
(defmacro with-error-reaction [afn & body]
- `(try ~@body
- (catch Throwable t# (~afn t#))))
+ `(try (fn [] ~@body)
+ (catch Throwable t# (~afn t#))))
(defn container []
(Container.))
View
6 src/clj/zilch/mq.clj
@@ -16,10 +16,8 @@
(:refer-clojure :exclude [send])
)
-(defmacro zeromq-imports []
- '(do
- (import '[org.zeromq ZMQ ZMQ$Context ZMQ$Socket])
- ))
+(defn zeromq-imports []
+ (import '[org.zeromq ZMQ ZMQ$Context ZMQ$Socket]))
(zeromq-imports)

0 comments on commit 205d993

Please sign in to comment.