Skip to content

Commit

Permalink
Merge pull request ReactiveX#344 from daveray/covariant-support-clj
Browse files Browse the repository at this point in the history
Update Clojure interop to support new OnSubscribeFunc with rx/fn.
  • Loading branch information
benjchristensen committed Sep 4, 2013
2 parents 80cd791 + 974856c commit 00b199a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
(let [f-name (gensym "rc")]
`(let [~f-name ~f]
(reify
; If they want Func1, give them onSubscribe as well so Observable/create can be
; used seemlessly with rx/fn.
~@(if (and (= prefix "rx.util.functions.Func")
(some #{1} arities))
`(rx.Observable$OnSubscribeFunc
(~'onSubscribe [~'this observer#]
(~f-name observer#))))

~@(mapcat (clojure.core/fn [n]
(let [ifc-sym (symbol (str prefix n))
arg-syms (map #(symbol (str "v" %)) (range n))]
Expand All @@ -31,6 +39,10 @@
If the f has the wrong arity, an ArityException will be thrown at runtime.
This will also implement rx.Observable$OnSubscribeFunc.onSubscribe for use with
Observable/create. In this case, the function must take an Observable as its single
argument and return a subscription object.
Example:
(.reduce my-numbers (rx/fn* +))
Expand Down Expand Up @@ -64,6 +76,15 @@
(.map my-observable (rx/fn [a] (* 2 a)))
or, to create an Observable:
(Observable/create (rx/fn [observer]
(.onNext observer 10)
(.onCompleted observer)
(Subscriptions/empty)))
See:
rx.lang.clojure.interop/fn*
"
[& fn-form]
; preserve metadata so type hints work
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
(deftest test-fn*
(testing "implements Func0-9"
(let [f (rx/fn* vector)]
(is (instance? rx.Observable$OnSubscribeFunc f))
(is (instance? rx.util.functions.Func0 f))
(is (instance? rx.util.functions.Func1 f))
(is (instance? rx.util.functions.Func2 f))
Expand Down Expand Up @@ -113,6 +114,14 @@

(deftest test-basic-usage

(testing "can create an observable"
(is (= 99
(-> (Observable/create (rx/fn [^rx.Observer o]
(.onNext o 99)
(.onCompleted o)
(rx.subscriptions.Subscriptions/empty)))
(BlockingObservable/single)))))

(testing "can pass rx/fn to map and friends"
(is (= (+ 1 4 9)
(-> (Observable/from [1 2 3])
Expand Down

0 comments on commit 00b199a

Please sign in to comment.