Permalink
Browse files

Throw with helpful error message when pipe argument is missing.

  • Loading branch information...
1 parent eeb4957 commit efe8a33f369f3aa02d2ef49b606490af5d40b0ae @mmcgrana mmcgrana committed Mar 17, 2010
Showing with 45 additions and 23 deletions.
  1. +39 −23 src/clj/cascading/clojure/api.clj
  2. +6 −0 test/cascading/clojure/api_test.clj
@@ -48,25 +48,39 @@
([#^String name]
(Pipe. name)))
-(defn filter [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Each. previous (:< opts)
+(defn ensure-pipe
+ "Ensures that the argument is a pipe and returns it, with an appropriate
+ type hint."
+ {:tag Pipe}
+ [pipe]
+ (if (instance? Pipe pipe)
+ pipe
+ (throw (IllegalArgumentException.
+ (str "Expected a pipe but got: " pipe)))))
+
+(defn filter [previous & args]
+ (let [previous (ensure-pipe previous)
+ opts (parse-args args)]
+ (Each. previous #^Fields (:< opts)
(ClojureFilter. (:fn-spec opts)))))
-(defn mapcat [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Each. previous (:< opts)
- (ClojureMapcat. (:fn> opts) (:fn-spec opts)) (:> opts))))
+(defn mapcat [previous & args]
+ (let [previous (ensure-pipe previous)
+ opts (parse-args args)]
+ (Each. previous #^Fields (:< opts)
+ (ClojureMapcat. (:fn> opts) (:fn-spec opts)) #^Fields (:> opts))))
-(defn map [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Each. previous (:< opts)
- (ClojureMap. (:fn> opts) (:fn-spec opts)) (:> opts))))
+(defn map [previous & args]
+ (let [previous (ensure-pipe previous)
+ opts (parse-args args)]
+ (Each. previous #^Fields (:< opts)
+ (ClojureMap. (:fn> opts) (:fn-spec opts)) #^Fields (:> opts))))
-(defn extract [#^Pipe previous & args]
+(defn extract [previous & args]
"A map operation that extracts a new field, thus returning Fields/ALL."
- (let [opts (parse-args args)]
- (Each. previous (:< opts)
+ (let [previous (ensure-pipe previous)
+ opts (parse-args args)]
+ (Each. previous #^Fields (:< opts)
(ClojureMap. (:fn> opts) (:fn-spec opts)) Fields/ALL)))
(defn agg [f init]
@@ -75,15 +89,17 @@
([x] [x])
([x y] (f x y))))
-(defn aggregate [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Every. previous (:< opts)
- (ClojureAggregator. (:fn> opts) (:fn-spec opts)) (:> opts))))
-
-(defn buffer [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Every. previous (:< opts)
- (ClojureBuffer. (:fn> opts) (:fn-spec opts)) (:> opts))))
+(defn aggregate [previous & args]
+ (let [previous (ensure-pipe previous)
+ opts (parse-args args)]
+ (Every. previous #^Fields (:< opts)
+ (ClojureAggregator. (:fn> opts) (:fn-spec opts)) #^Fields (:> opts))))
+
+(defn buffer [previous & args]
+ (let [previous (ensure-pipe previous)
+ opts (parse-args args)]
+ (Every. previous #^Fields (:< opts)
+ (ClojureBuffer. (:fn> opts) (:fn-spec opts)) #^Fields (:> opts))))
(defn group-by
([previous group-fields]
@@ -55,6 +55,12 @@
(is (instance? Pipe np))
(is (= "foo" (.getName np)))))
+(deftest test-ensure-pipe
+ (let [p (c/pipe "foo")]
+ (is (= p (c/ensure-pipe p)))
+ (is (thrown-with-msg? IllegalArgumentException #"Expected.*"
+ (c/ensure-pipe "foo")))))
+
(deftest test-clojure-filter
(let [fil (ClojureFilter. (p/parse-fn-spec #'odd?))]
(is (= false (t/invoke-filter fil [1])))

0 comments on commit efe8a33

Please sign in to comment.