Permalink
Browse files

simplifies reduce step

  • Loading branch information...
1 parent 5c185ac commit f23004859ae67f84153426567087e9b2a38b82c0 @videlalvaro committed Aug 20, 2010
Showing with 35 additions and 32 deletions.
  1. +35 −32 run.clj
View
@@ -16,43 +16,44 @@
(defn dummy-reply []
(closerl/otp-list
(closerl/otp-tuple (closerl/otp-binary "a") (closerl/otp-long 1))))
-
-(defn map-fn [words]
- (do
- (println words)
- (map (fn [v] (closerl/otp-tuple (closerl/otp-binary v) (closerl/otp-long 1)))
- words)
- ))
-(defn remove-not-found [v]
- (try
- (= (first (first (second v))) "not_found")
- (catch Exception _ false)
- ))
-
-(defn reducer [vs]
- (let [the-vals (apply concat vs)]
- (do
- (println the-vals)
- (reduce (fn [m v]
- (assoc m (first v) (+ (get m (first v) 0) (second v)))) {}
- the-vals
- ))))
+(defn remove-not-found [items]
+ (remove (fn [v]
+ (try
+ (= (first (first (second v))) "not_found")
+ (catch Exception _ false)
+ )) items))
+
+(defn as-proplist [items key-fn val-fn]
+ (map (fn [k v] (closerl/otp-tuple (key-fn k) (val-fn v))) (keys items) (vals items)))
(defn red-fn [vs]
- (let [tm (reducer (remove remove-not-found vs))]
- (map (fn [k v] (closerl/otp-tuple (closerl/otp-binary k) (closerl/otp-long v))) (keys tm) (vals tm))))
+ (let [v1 (apply concat (remove-not-found vs))
+ v2 (reduce (fn [m v] (assoc m (first v) (+ (get m (first v) 0) (second v)))) {} v1)]
+ (as-proplist v2 closerl/otp-binary closerl/otp-long)))
+
+(defn error-reply [msg]
+ (closerl/otp-tuple (closerl/otp-atom "error") (closerl/otp-string msg)))
+
+(defn wrap-map-reply [reply]
+ (apply closerl/otp-list reply))
+
+(defn wrap-red-reply [reply]
+ (closerl/otp-tuple (closerl/otp-atom "struct") (apply closerl/otp-list reply)))
-(defn get-reply [command data]
+(defn get-reply [command data myfn]
(do
(println (str "got data: " data " command: " command))
- (if (= command "map")
- (apply closerl/otp-list (map-fn (re-seq #"\w+" (first data))))
- (closerl/otp-tuple (closerl/otp-atom "struct") (apply closerl/otp-list (red-fn data)))
- )))
+ (cond
+ (= command "map") (wrap-map-reply ((eval (read-string myfn)) data))
+ (= command "red") (wrap-red-reply (red-fn data))
+ :else (error-reply (str "Unknown command: " command)))))
(defn receive-riak-request [mbox]
- (closerl/proplist-to-map (closerl/otp-value (closerl/otp-receive mbox))))
+ (let [msg (closerl/otp-receive mbox)]
+ (do
+ (println (str "got message: " msg))
+ (closerl/proplist-to-map (closerl/otp-value msg)))))
(defn reply-to-riak [mbox pid reply]
(do
@@ -61,9 +62,11 @@
(closerl/otp-tuple (closerl/otp-atom "clj") (closerl/otp-list reply)))))
(defn mapred-server [mbox]
- (let [{pid :pid, command :command, r-value :r-value} (receive-riak-request mbox)]
- (reply-to-riak mbox pid (get-reply command r-value))
- (recur mbox)))
+ (let [{:keys [pid, command, r-value, clj-map-fn], :or {clj-map-fn "(fn [v] v)"}} (receive-riak-request mbox)]
+ (do
+ (println clj-map-fn)
+ (reply-to-riak mbox pid (get-reply command r-value clj-map-fn)))
+ (recur mbox)))
(println "running m/r server")
(mapred-server mbox)

0 comments on commit f230048

Please sign in to comment.