Permalink
Browse files

starting on revamped clojure dsl

  • Loading branch information...
1 parent 195ddaf commit 04002f1d1c94685a68f459b298e3e1bdd080f003 @nathanmarz committed Mar 26, 2012
Showing with 91 additions and 12 deletions.
  1. +91 −12 src/clj/backtype/storm/clojure.clj
@@ -1,5 +1,6 @@
(ns backtype.storm.clojure
(:use [clojure.contrib.def :only [defnk defalias]])
+ (:use [clojure.contrib.seq-utils :only [find-first]])
(:use [backtype.storm bootstrap util])
(:import [backtype.storm StormSubmitter])
(:import [backtype.storm.generated StreamInfo])
@@ -41,24 +42,99 @@
(concat [name args] impl)
))
-(defmacro bolt [& body]
- (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
- fns (normalize-fns bolt-fns)]
- `(reify IBolt
+(defn mk-concise-reify [klass body]
+ (let [[reify-fns other-fns] (split-with #(not (symbol? %)) body)
+ fns (normalize-fns reify-fns)]
+ `(reify ~klass
~@fns
~@other-fns)))
-(defmacro bolt-execute [& body]
- `(bolt
- (~'execute ~@body)))
+(defmacro bolt [& body]
+ (mk-concise-reify 'IBolt body))
(defmacro spout [& body]
- (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
- fns (normalize-fns spout-fns)]
- `(reify ISpout
- ~@fns
- ~@other-fns)))
+ (mk-concise-reify 'ISpout body))
+{:keys [url service_description]
+ :or {service_description "Ganglia Web Frontend"}
+ :as options}
+
+(defn gen-prep-args [klass-sym prep-sym collector-sym]
+ (let [params (->> klass-sym
+ resolve
+ .getMethods
+ (filter #(= (.getName %) (str prep-sym)))
+ first
+ .getParameterTypes)]
+ (for [p params]
+ (if (.contains (.getSimpleName p) "Collector")
+ collector-sym
+ (gensym)
+ ))))
+
+(defn mk-delegate-class [name main-interface prepare-method delegate-maker committer?]
+ (let [methods (-> main-interface resolve .getMethods)
+ interfaces (if committer? [main-interface 'ICommitter] [main-interface])
+ prefix (str name "-")
+ implementations (for [m methods]
+ (let []
+ `(defn ~(str prefix (.getName m))
+ )))]
+ `(do
+ ;; TODO: Might need to auto-ns qualify name
+ ;; need a constructor whichs stores an object reference
+ ;; need to type hint the object as "main-interface" to avoid reflection
+ (gen-class
+ :name ~name
+ :implements ~interfaces
+ :prefix ~prefix)
+ ~@implementations
+ )))
+
+;; TODO: need something else that will auto-gen "bolt", "spout"
+;; want it to work for batchbolt, richbolt, richspout, basicbolt
+;; need to be able to add extra interfaces
+(defn mk-executor [main-interface ;; e.g., IRichBolt, IRichSpout
+ prepare-method
+ reify-sym
+ [name output-spec & [opts & impl :as all]]
+ & {:keys [default-method prepare-default]
+ :or [prepare-default true]
+ :as executor-opts}]
+ (if-not (map? opts)
+ (apply mk-executor main-interface prepare-method reify-sym
+ (concat [name output-spec {}] all)
+ (mapcat identity executor-opt))
+ (let [worker-name (symbol (str name "__"))
+ conf-fn-name (symbol (str name "__conf__"))
+ params (:params opts)
+ conf-code (:conf opts)
+ committer? (:committer opts) ;; kind of a hack...
+ prepare? (if (contains? opts :prepare) (:prepare opts) prepare-default)
+ fn-body (if prepare?
+ (cons 'fn impl)
+ (let [[args & impl-body] impl
+ coll-sym (last args)
+ args (butlast args)
+ prepargs (gen-prep-args main-interface prepare-method coll-sym)]
+ `(fn ~(vec prepargs) (~reify-sym (~default-method ~(vec args) ~@impl-body)))))
+ definer (if params
+ `(defn ~name [& args#]
+ (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
+ `(def ~name
+ (clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
+ )
+ ]
+ `(do
+ ;; TODO: need to do a gen-class for this object using "name"
+ (defn ~conf-fn-name ~(if params params [])
+ ~conf-code
+ )
+ (defn ~worker-name ~(if params params [])
+ ~fn-body
+ )
+ ~definer
+ ))))
(defmacro defbolt [name output-spec & [opts & impl :as all]]
(if-not (map? opts)
`(defbolt ~name ~output-spec {} ~@all)
@@ -71,6 +147,9 @@
(let [[args & impl-body] impl
coll-sym (nth args 1)
args (vec (take 1 args))
+ ;; unify by taking collector as last arg
+ ;; (batch bolt should force prepare...)
+ ;; need options to implement other interfaces
prepargs [(gensym "conf") (gensym "context") coll-sym]]
`(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
definer (if params

0 comments on commit 04002f1

Please sign in to comment.