From bbeead6eab041480cd2526a8118b3b7dbcffbc98 Mon Sep 17 00:00:00 2001 From: Jim Duey Date: Sun, 25 Sep 2011 12:40:14 -0500 Subject: [PATCH] Feature complete (probably) --- src/conduit/core.clj | 443 ++++++++++++------------------------- test/conduit/test/core.clj | 116 +++++----- 2 files changed, 196 insertions(+), 363 deletions(-) diff --git a/src/conduit/core.clj b/src/conduit/core.clj index 8ffc267..801325e 100644 --- a/src/conduit/core.clj +++ b/src/conduit/core.clj @@ -6,12 +6,7 @@ (defn merge-parts [ps] (apply merge-with merge - (map :parts ps))) - -(defn map-vals [f m] - (into {} (map (fn [[k v]] - [k (f v)]) - m))) + (map (comp :parts meta) ps))) (defn abort-c [c] (c [])) @@ -28,7 +23,7 @@ (defn conduit-seq [l] "create a stream processor that emits the contents of a list regardless of what is fed to it" - {:fn (conduit-seq-fn l)}) + (conduit-seq-fn l)) (defn a-run [f] "execute a stream processor function" @@ -41,172 +36,10 @@ (cons (first y) (a-run new-f)))))) -#_( -(defn comp-fn [f1 f2] - "Link two processor functions together so that the output - of the first is fed into the second. Returns a function." - (cond - (nil? f1) f2 - (nil? f2) f1 - :else (fn a-comp [x] - (let [[x1 new1] (f1 x) - [x2 new2] (if (empty? x1) - [x1 f2] - (f2 (first x1)))] - (cond - (or (not new1) (not new2)) - [x2 nil] - - (and (= f1 new1) (= f2 new2)) - [x2 a-comp] - - :else - [x2 (comp-fn new1 new2)]))))) - -(defn ensure-vec [x-vec] - (if (vector? x-vec) - x-vec - (vec x-vec))) - -(defn nth-fn [n f] - "creates a processor function that always applies a function to the - nth element of a given input vector, returning a new vector" - (fn a-nth [x] - (let [x (ensure-vec x) - [y new-f] (f (nth x n)) - new-x (if (empty? y) - y - [(assoc x n (first y))])] - (if (= f new-f) - [new-x a-nth] - [new-x (when new-f - (nth-fn n new-f))])))) - -(defn split-results [results] - (reduce (fn [[new-xs new-fs] [new-x new-f]] - [(conj new-xs new-x) - (conj new-fs new-f)]) - [[] []] - results)) - -(defn sg-par-fn [fs xs] - (let [result-fns (doall (map #(%1 %2) fs xs)) - [new-xs new-fs] (split-results (map #(%) result-fns)) - new-x (if (some empty? new-xs) - [] - (vector (apply concat new-xs)))] - [new-x - (when (every? boolean new-fs) - (partial sg-par-fn new-fs))])) - -;; TODO: put under unit test -(defn par-fn [fs xs] - (let [[new-xs new-fs] (split-results (map #(%1 %2) fs xs)) - new-x (if (some empty? new-xs) - [] - (vector (apply concat new-xs)))] - [new-x (partial par-fn new-fs)])) - -(defn sg-loop-fn - ([body-fn prev-x curr-x] - (let [gather-fn (body-fn [prev-x curr-x])] - (fn [] - (let [[new-x new-f] (gather-fn)] - [new-x - (cond - (nil? new-f) nil - (empty? new-x) (partial sg-loop-fn new-f prev-x) - :else (partial sg-loop-fn new-f (first new-x)))])))) - ([body-fn feedback-fn prev-x curr-x] - (let [gather-fn (body-fn [prev-x curr-x])] - (fn [] - (let [[new-x new-f] (gather-fn) - [fb-x new-fb-f] (if-not (empty? new-x) - (feedback-fn (first new-x)))] - [new-x - (cond - (nil? new-f) nil - (empty? new-x) (partial sg-loop-fn new-f feedback-fn prev-x) - (nil? new-fb-f) nil - (empty? fb-x) (partial sg-loop-fn new-f new-fb-f prev-x) - :else (partial sg-loop-fn new-f new-fb-f (first fb-x)))]))))) - -(defn no-reply-select-fn [selection-map [v x]] - (let [v (if (contains? selection-map v) - v - '_)] - (if-let [f (get selection-map v)] - (let [[new-x new-f] (f x)] - [new-x (partial no-reply-select-fn - (assoc selection-map v new-f))]) - [[] (partial no-reply-select-fn selection-map)]))) - -(defn scatter-gather-select-fn [selection-map [v x]] - (if-let [f (if (contains? selection-map v) - (get selection-map v) - (get selection-map '_))] - (let [gather-fn (f x)] - (fn [] - (let [[new-x new-f] (gather-fn)] - [new-x (when new-f - (partial scatter-gather-select-fn - (assoc selection-map v new-f)))]))) - [[] (partial scatter-gather-select-fn selection-map)])) - -;; TODO: make this execute in a future -(defn scatter-gather-comp [f sg x] - (let [[new-x new-f] (f x)] - (if (empty? new-x) - (fn [] - [[] (when new-f - (partial scatter-gather-comp - new-f - sg))]) - (let [gather-fn (sg (first new-x))] - (fn [] - (let [[newer-x new-sg] (gather-fn)] - [newer-x (when (and new-f new-sg) - (partial scatter-gather-comp - new-f - new-sg))])))))) - -(defn scatter-gather-nth [n sg x] - (let [x (ensure-vec x) - gather-fn (sg (nth x n))] - (fn [] - (let [[y new-sg] (gather-fn)] - (if (empty? y) - [[] (when new-sg - (partial scatter-gather-nth - n - new-sg))] - [[(assoc x n (first y))] - (when new-sg - (scatter-gather-nth n new-sg))]))))) - -(defn a-par-scatter-gather [sgs x] - (let [x (ensure-vec x) - gather-fns (doall - (map #(%1 %2) - sgs - x))] - - (fn [] - (let [[new-xs new-sgs] (split-results - (map #(%) gather-fns)) - new-x (if (some empty? new-xs) - [] - (vector (apply concat new-xs)))] - [new-x - (when (every? boolean new-sgs) - (partial a-par-scatter-gather - new-sgs))])))) - ) - -(defn wait-for-reply [{f :fn} x] +(defn wait-for-reply [f x] ((second (f x)) identity)) -(defn enqueue [{f :fn} & xs] +(defn enqueue [f & xs] (loop [[x & xs] xs f f] (when x @@ -227,44 +60,41 @@ fs)] [(when-not (some nil? new-fs) (comp-fn new-fs)) - (fn [c] - (when c - (new-c c)))]))) + new-c]))) (defn nth-fn [n f] (fn curr-fn [xs] (if (<= (count xs) n) [curr-fn abort-c] - (let [[new-f new-c] (f (nth xs n)) - y (new-c identity)] + (let [[new-f new-c] (f (nth xs n))] [(nth-fn n new-f) - (if (empty? y) - abort-c - (fn [c] - (when c - (c [(assoc xs n (first y))]))))])))) + (fn [c] + (if (nil? c) + (new-c nil) + (let [y (new-c identity)] + (if (empty? y) + (c []) + (c [(assoc xs n (first y))])))))])))) + +(defn gather-fn [[fs ys] [f y]] + [(conj fs f) (conj ys y)]) (defn par-fn [fs] (fn curr-fn [xs] - (if (not= (count xs) (count fs)) - [curr-fn abort-c] - (let [futs (doall - (map (fn [f x] - (future (let [[new-f c] (f x) - y (c identity)] - [new-f y]))) - fs xs)) - fs-and-ys (map deref futs) - [new-fs ys] (reduce (fn [[fs ys] [f y]] - [(conj fs f) (conj ys y)]) - [[] []] - fs-and-ys)] - [(par-fn new-fs) - (if (some empty? ys) - abort-c + (if (not= (count xs) (count fs)) + [curr-fn abort-c] + (let [[new-fs cs] (reduce gather-fn + [[] []] + (map #(%1 %2) fs xs))] + [(par-fn new-fs) (fn [c] - (when c - (c [(apply concat ys)]))))])))) + (if (nil? c) + (doseq [c cs] + (c nil)) + (let [ys (map #(% identity) cs)] + (if (some empty? ys) + (c []) + (c [(apply concat ys)])))))])))) (defn select-fn [selection-map] (fn curr-fn [[v x]] @@ -301,58 +131,66 @@ (defarrow conduit [a-arr (fn [f] - {:created-by :a-arr - :args f - :fn (fn a-arr [x] - (let [y (f x)] - [a-arr (fn [c] - (when c - (c [y])))]))}) + (with-meta + (fn a-arr [x] + (let [y (f x)] + [a-arr (fn [c] + (when c + (c [y])))])) + {:created-by :a-arr + :args f})) a-comp (fn [& ps] - {:parts (merge-parts ps) - :created-by :a-comp - :args ps - :fn (if (< (count ps) 2) - (:fn (first ps)) - (comp-fn (map :fn ps)))}) + (with-meta + (if (< (count ps) 2) + (first ps) + (comp-fn ps)) + {:parts (merge-parts ps) + :created-by :a-comp + :args ps})) a-nth (fn [n p] - {:parts (:parts p) - :created-by :a-nth - :args [n p] - :fn (nth-fn n (:fn p))}) + (with-meta + (nth-fn n p) + {:parts (:parts p) + :created-by :a-nth + :args [n p]})) a-par (fn [& ps] - {:created-by :a-par - :args ps - :parts (merge-parts ps) - :fn (par-fn (map :fn ps))}) + (with-meta + (par-fn ps) + {:created-by :a-par + :args ps + :parts (merge-parts ps)})) a-all (fn [& ps] - (assoc (a-comp (a-arr (partial repeat (count ps))) + (with-meta + (a-comp (a-arr (partial repeat (count ps))) (apply a-par ps)) - :created-by :a-all - :args ps)) + {:created-by :a-all + :args ps})) a-select (fn [& vp-pairs] (let [pair-map (apply hash-map vp-pairs)] - {:created-by :a-select - :args pair-map - :parts (merge-parts (vals pair-map)) - :fn (select-fn (map-vals :fn pair-map))})) + (with-meta + (select-fn pair-map) + {:created-by :a-select + :args pair-map + :parts (merge-parts (vals pair-map))}))) a-loop (fn ([p initial-value] - {:created-by :a-loop - :args [p initial-value] - :parts (:parts p) - :fn (loop-fn (:fn p) initial-value)}) + (with-meta + (loop-fn p initial-value) + {:created-by :a-loop + :args [p initial-value] + :parts (:parts p)})) ([p initial-value fb-p] - {:created-by :a-loop - :args [p initial-value fb-p] - :parts (:parts p) - :fn (loop-fn (:fn p) (:fn fb-p) initial-value)})) + (with-meta + (loop-fn p fb-p initial-value) + {:created-by :a-loop + :args [p initial-value fb-p] + :parts (:parts p)}))) ]) (def a-arr (conduit :a-arr)) @@ -366,8 +204,7 @@ (defn conduit-map [p l] (if (empty? l) l - (a-run (comp-fn [(:fn (conduit-seq l)) - (:fn p)])))) + (a-run (comp-fn [(conduit-seq l) p])))) (def pass-through (a-arr identity)) @@ -400,89 +237,97 @@ (let [[new-catch c] (catch-f [e x])] [(a-catch f new-catch) c]) (throw e))))))] - {:parts (:parts p) - :fn (a-catch (:fn p) - (:fn catch-p)) - :created-by :a-catch - :args [class p catch-p]}))) + (with-meta + (a-catch p catch-p) + {:parts (:parts p) + :created-by :a-catch + :args [class p catch-p]})))) (defn a-finally [p final-p] - (letfn [(a-finally [f final-f x] - (try - (let [[new-f c] (f x)] - [(a-finally f final-f) c]) - (finally - (final-f x))))] - {:parts (:parts p) - :fn (a-finally (:fn p) - (:fn final-p)) - :created-by :a-finally - :args [p final-p]})) + (letfn [(a-finally [f final-f] + (fn [x] + (try + (let [[new-f c] (f x)] + [(a-finally new-f final-f) c]) + (finally + (final-f x)))))] + (with-meta + (a-finally p final-p) + {:parts (:parts p) + :created-by :a-finally + :args [p final-p]}))) (defmacro def-arr [name args & body] `(def ~name (a-arr (fn ~name ~args ~@body)))) (defn a-filter [f] - {:create-by :a-filter - :args f - :fn (fn curr-fn [x] - (if (f x) - [curr-fn (fn [c] - (when c - (c [x])))]))}) + (with-meta + (fn curr-fn [x] + (if (f x) + [curr-fn (fn [c] + (when c + (c [x])))] + [curr-fn abort-c])) + {:create-by :a-filter + :args f})) + +(defn tap [p] + (fn [x] + (let [[new-f new-c] (p x)] + (new-c nil) + [new-f (fn [c] + (when c + (c [x])))]))) (defn disperse [p] - (let [reply-fn (fn disperse [xs] - (if (seq xs) - (let [new-x (a-run (comp-fn (conduit-seq-fn xs) - (:reply p)))] - [[new-x] disperse]) - [[[]] disperse]))] + (with-meta + (fn curr-fn [xs] + (if (empty? xs) + [curr-fn (fn [c] + (when c + (c [xs])))] + (let [[new-f cs] (reduce (fn [[new-f cs] x] + (let [[new-f c] (new-f x)] + [new-f (conj cs c)])) + [p []] + xs)] + [(disperse new-f) (fn [c] + (if (nil? c) + (doseq [c cs] + (c nil)) + (let [ys (map #(% identity) cs)] + (if (some empty? ys) + (c []) + (c [(apply concat ys)])))))]))) {:created-by :disperse :args p - :parts (:parts p) - :reply reply-fn - :no-reply (fn disperse-final [xs] - [[(doall (conduit-map p xs))] - disperse-final]) - :scatter-gather (fn [xs] - (partial reply-fn xs))})) - -#_( + :parts (:parts p)})) + (defn test-conduit [p] - (binding [*testing-conduit* true] + (let [args (:args (meta p))] (condp = (:created-by p) nil p - :conduit-proc (conduit-proc (:args p)) - :a-arr (a-arr (:args p)) - :a-comp (apply a-comp (map test-conduit (:args p))) - :a-par (apply a-par (map test-conduit (:args p))) - :a-all (apply a-all (map test-conduit (:args p))) + :a-arr (a-arr args) + :a-comp (apply a-comp (map test-conduit args)) + :a-nth (apply a-nth (map test-conduit args)) + :a-par (apply a-par (map test-conduit args)) + :a-all (apply a-all (map test-conduit args)) :a-select (apply a-select (mapcat (fn [[k v]] [k (test-conduit v)]) - (:args p))) - :a-loop (let [[bp iv fb] (:args p)] + args)) + :a-loop (let [[bp iv fb] args] (if fb (a-loop (test-conduit bp) iv (test-conduit fb)) (a-loop (test-conduit bp) iv))) - :a-except (apply a-except (map test-conduit (:args p))) - :a-catch (apply a-catch (first (:args p)) - (map test-conduit (rest (:args p)))) - :a-finally (apply a-finally (map test-conduit (:args p))) - :disperse (disperse (test-conduit (:args p)))))) + :a-catch (apply a-catch (first args) + (map test-conduit (rest args))) + :a-finally (apply a-finally (map test-conduit args)) + :a-filter p + :disperse (disperse (test-conduit args))))) (defn test-conduit-fn [p] - (comp first (:reply (test-conduit p)))) - -(defn debug-proc [label p] - (a-comp (a-arr (fn [x] - (println label "received:" x) - x)) - p - (a-arr (fn [x] - (println label "produced:" x) - x)))) - ) + (fn [x] + ((second (p x)) identity))) diff --git a/test/conduit/test/core.clj b/test/conduit/test/core.clj index 672077c..a10c1d1 100644 --- a/test/conduit/test/core.clj +++ b/test/conduit/test/core.clj @@ -13,33 +13,6 @@ [(test-list-iter (rest l)) (fn [c] (c (first l)))]))) -(deftest test-conduit-map - (is (empty? (conduit-map pl []))) - (is (empty? (conduit-map pl nil))) - (is (= (range 10) - (conduit-map pass-through - (range 10))))) - -#_( -(deftest test-test-conduit - (def-proc bogus [x] - [(inc x)]) - (def tf (test-conduit bogus))) - -(deftest test-disperse - (def make-and-dec (a-comp (a-arr range) - (disperse - (a-arr dec)))) - (is (= [[] - [-1] - [-1 0] - [-1 0 1] - [-1 0 1 2] - [-1 0 1 2 3]] - (conduit-map make-and-dec - (range 6))))) - ) - (deftest test-a-run (testing "a-run" (testing "should ignore empty values" @@ -59,17 +32,24 @@ (is (= [:a :b :c] (a-run (conduit-seq-fn [:a :b :c]))))) +(def pl (a-arr inc)) +(def t2 (a-arr (partial * 2))) +(def flt (fn this-fn [x] + (if (odd? x) + [this-fn abort-c] + [this-fn (fn [c] (c [x]))]))) + +(deftest test-conduit-map + (is (empty? (conduit-map pl []))) + (is (empty? (conduit-map pl nil))) + (is (= (range 10) + (conduit-map pass-through + (range 10))))) + (deftest test-conduit-seq (is (= [:a :b :c] (conduit-map (conduit-seq [:a :b :c]) (range 0 5))))) -(def pl (a-arr inc)) -(def t2 (a-arr (partial * 2))) -(def flt {:fn (fn this-fn [x] - (if (odd? x) - [this-fn abort-c] - [this-fn (fn [c] (c [x]))]))}) - (deftest test-a-arr (is (= (range 1 6) (conduit-map pl (range 5)))) @@ -88,9 +68,9 @@ (deftest test-a-nth (let [tf (a-nth 0 pl) - tn (a-nth 1 {:fn (fn this-fn [x] - [this-fn (fn [c] - (c [3]))])})] + tn (a-nth 1 (fn this-fn [x] + [this-fn (fn [c] + (c [3]))]))] (is (= [[4 5]] (conduit-map tf [[3 5]]))) @@ -108,7 +88,7 @@ tp1 (a-par (conduit-seq [:a :b :c]) pl - {:fn (test-list-iter [[1] [] [2]])})] + (test-list-iter [[1] [] [2]]))] (is (= [[:a 4 10] [:b 4 10] [:c 4 10]] (conduit-map tp [[99 3 5] [98 3 5] [97 3 5]]))) @@ -123,16 +103,16 @@ (deftest test-a-select (let [tc (a-select - :oops {:fn (fn [x] - [nil abort-c])} + :oops (fn [x] + [nil abort-c]) true pl false t2)] (is (= [9 6] (conduit-map tc [[:oops 83] [true 8] [:bogus 100] [false 3]])))) (let [tc (a-select - :oops {:fn (fn [x] - [nil abort-c])} + :oops (fn [x] + [nil abort-c]) true pl false t2 '_ pass-through)] @@ -231,41 +211,49 @@ (deftest test-a-finally (let [main-count (atom 0) - secondary-count (atom 0) finally-count (atom 0) te (a-arr (fn [x] (when (even? x) (throw (Exception. "An even int"))) (swap! main-count inc) (* 2 x))) - x (assoc te - :scatter-gather (fn this-fn [x] - (when (zero? (mod x 3)) - (swap! main-count inc) - (throw (Exception. "Div by 3"))) - (fn [] - (when (even? x) - (swap! secondary-count inc) - (throw (Exception. "Even!!!"))) - [[(* 10 x)] this-fn]))) - tx (a-finally te (a-arr (fn [x] - (swap! finally-count inc) - x))) - ty (a-finally x (a-arr (fn [x] - (swap! finally-count inc) - x))) - tf (a-except tx (a-arr (constantly nil))) - tz (a-except ty (a-arr (fn [[_ x]] x)))] + x (a-arr (fn this-fn [x] + (when (zero? (mod x 3)) + (swap! main-count inc) + (throw (Exception. "Div by 3"))) + (* 10 x))) + fin-fn (a-arr (fn [x] + (swap! finally-count inc))) + tx (a-finally te fin-fn) + ty (a-finally x fin-fn) + tf (a-catch tx (a-arr (constantly nil))) + tz (a-catch ty (a-arr (fn [[_ x]] x)))] (is (= [nil 2 nil 6 nil] (conduit-map tf (range 5)))) (is (= 2 @main-count)) (is (= 5 @finally-count)) (reset! main-count 0) - (reset! secondary-count 0) - (is (= [[0 0] [10 10] [2 2] [3 3] [4 4] [50 50]] + (is (= [[0 0] [10 10] [20 20] [3 3] [40 40] [50 50]] (conduit-map (a-comp (a-all tz tz) pass-through) - (range 6)))))) + (range 6)))) + (is (= 4 @main-count)))) + +(deftest test-disperse + (let [make-and-dec (a-comp (a-arr range) + (disperse + (a-arr dec)))] + (is (= [[] + [-1] + [-1 0] + [-1 0 1] + [-1 0 1 2] + [-1 0 1 2 3]] + (conduit-map make-and-dec + (range 6)))))) + +(deftest test-text-conduit-fn + (is (= 5 (first ((test-conduit-fn pl) 4))))) (run-tests)