Permalink
Browse files

merge.

  • Loading branch information...
2 parents 432f3dd + efe8a33 commit 49404627b8daf79eadb29612760fabe177027a86 @waratuman committed Mar 18, 2010
View
@@ -9,6 +9,7 @@
[clj-json "0.2.0"]
[clj-serializer "0.1.0"]]
:dev-dependencies [[org.clojars.mmcgrana/lein-javac "0.1.0"]
+ [lein-clojars "0.5.0"]
[swank-clojure "1.1.0"]]
:namespaces [cascading.clojure.api
cascading.clojure.testing])
@@ -15,6 +15,7 @@
LeftJoin RightJoin MixedJoin)
(cascading.scheme Scheme)
(cascading.tap Hfs Lfs Tap)
+ (cascading.cascade Cascade)
(org.apache.hadoop.io Text)
(org.apache.hadoop.mapred TextInputFormat TextOutputFormat
OutputCollector JobConf)
@@ -47,25 +48,39 @@
([#^String name]
(Pipe. name)))
-(defn filter [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Each. previous (:< opts)
+(defn ensure-pipe
+  "Returns `pipe` unchanged when it is an instance of Pipe; otherwise throws
+  an IllegalArgumentException whose message includes the offending value.
+  The :tag metadata type-hints the return value as Pipe for callers."
+  {:tag Pipe}
+  [pipe]
+  ;; Guard clause: reject anything that is not a Pipe before handing it back.
+  (when-not (instance? Pipe pipe)
+    (throw (IllegalArgumentException.
+             (str "Expected a pipe but got: " pipe))))
+  pipe)
+
+(defn filter
+  "Returns an Each over `previous` that applies a ClojureFilter built from
+  the parsed :fn-spec option, with the parsed :< option passed as the Each's
+  Fields argument. Throws IllegalArgumentException (via ensure-pipe) when
+  `previous` is not a Pipe."
+  [previous & args]
+  (let [source-pipe (ensure-pipe previous)
+        opts (parse-args args)
+        #^Fields in-fields (:< opts)]
+    (Each. source-pipe in-fields
(ClojureFilter. (:fn-spec opts)))))
-(defn mapcat [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Each. previous (:< opts)
- (ClojureMapcat. (:fn> opts) (:fn-spec opts)) (:> opts))))
+(defn mapcat
+  "Returns an Each over `previous` that runs a ClojureMapcat built from the
+  parsed :fn> and :fn-spec options. The parsed :< option is passed as the
+  Fields before the operation and the parsed :> option as the Fields after
+  it. Throws IllegalArgumentException (via ensure-pipe) when `previous` is
+  not a Pipe."
+  [previous & args]
+  (let [source-pipe (ensure-pipe previous)
+        opts (parse-args args)
+        #^Fields in-fields (:< opts)
+        operation (ClojureMapcat. (:fn> opts) (:fn-spec opts))
+        #^Fields out-fields (:> opts)]
+    (Each. source-pipe in-fields operation out-fields)))
-(defn map [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Each. previous (:< opts)
- (ClojureMap. (:fn> opts) (:fn-spec opts)) (:> opts))))
+(defn map
+  "Returns an Each over `previous` that runs a ClojureMap built from the
+  parsed :fn> and :fn-spec options. The parsed :< option is passed as the
+  Fields before the operation and the parsed :> option as the Fields after
+  it. Throws IllegalArgumentException (via ensure-pipe) when `previous` is
+  not a Pipe."
+  [previous & args]
+  (let [source-pipe (ensure-pipe previous)
+        opts (parse-args args)
+        #^Fields in-fields (:< opts)
+        operation (ClojureMap. (:fn> opts) (:fn-spec opts))
+        #^Fields out-fields (:> opts)]
+    (Each. source-pipe in-fields operation out-fields)))
-(defn extract [#^Pipe previous & args]
+(defn extract
"A map operation that extracts a new field, thus returning Fields/ALL."
+  ;; The parameter vector follows the docstring so the string is attached to
+  ;; the var as :doc metadata rather than evaluated as a discarded body form.
+  [previous & args]
- (let [opts (parse-args args)]
- (Each. previous (:< opts)
+  (let [previous (ensure-pipe previous)
+        opts (parse-args args)]
+    ;; Unlike the sibling operations, the final Fields argument is fixed to
+    ;; Fields/ALL instead of being read from a parsed :> option.
+    (Each. previous #^Fields (:< opts)
(ClojureMap. (:fn> opts) (:fn-spec opts)) Fields/ALL)))
(defn agg [f init]
@@ -74,15 +89,17 @@
([x] [x])
([x y] (f x y))))
-(defn aggregate [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Every. previous (:< opts)
- (ClojureAggregator. (:fn> opts) (:fn-spec opts)) (:> opts))))
+(defn aggregate
+  "Returns an Every over `previous` that runs a ClojureAggregator built from
+  the parsed :fn> and :fn-spec options. The parsed :< option is passed as
+  the Fields before the operation and the parsed :> option as the Fields
+  after it. Throws IllegalArgumentException (via ensure-pipe) when
+  `previous` is not a Pipe."
+  [previous & args]
+  (let [source-pipe (ensure-pipe previous)
+        opts (parse-args args)
+        #^Fields in-fields (:< opts)
+        operation (ClojureAggregator. (:fn> opts) (:fn-spec opts))
+        #^Fields out-fields (:> opts)]
+    (Every. source-pipe in-fields operation out-fields)))
-(defn buffer [#^Pipe previous & args]
- (let [opts (parse-args args)]
- (Every. previous (:< opts)
- (ClojureBuffer. (:fn> opts) (:fn-spec opts)) (:> opts))))
+(defn buffer
+  "Returns an Every over `previous` that runs a ClojureBuffer built from the
+  parsed :fn> and :fn-spec options. The parsed :< option is passed as the
+  Fields before the operation and the parsed :> option as the Fields after
+  it. Throws IllegalArgumentException (via ensure-pipe) when `previous` is
+  not a Pipe."
+  [previous & args]
+  (let [source-pipe (ensure-pipe previous)
+        opts (parse-args args)
+        #^Fields in-fields (:< opts)
+        operation (ClojureBuffer. (:fn> opts) (:fn-spec opts))
+        #^Fields out-fields (:> opts)]
+    (Every. source-pipe in-fields operation out-fields)))
(defn group-by
([previous group-fields]
@@ -229,3 +246,10 @@
(defn exec
  "Calls .start and then .complete on `flow`, and returns `flow` itself."
  [#^Flow flow]
  (.start flow)
  (.complete flow)
  flow)
+
+(defn cascade
+  "Connects the given flows with a fresh CascadeConnector and returns the
+  result of its .connect call.
+
+  args - zero or more cascading.flow.Flow instances."
+  [& args]
+  (let [casc (cascading.cascade.CascadeConnector.)]
+    ;; Build an explicitly typed Flow[]: an untyped into-array takes its
+    ;; component type from the first element, which yields Object[] for an
+    ;; empty argument list and fails when the flows are of different
+    ;; concrete Flow subclasses.
+    (.connect casc (into-array cascading.flow.Flow args))))
+
+(defn run-cascade
+  "Invokes .run on the Cascade `c` and returns whatever that call returns."
+  [#^Cascade c]
+  (. c run))
@@ -29,7 +29,8 @@
(ns-fn-name-pair (first v-or-c))
(next v-or-c))]
:else
- (throw (IllegalArgumentException. (str v-or-c)))))
+ (throw (IllegalArgumentException.
+ (str "Expected a var or coll, but got: " v-or-c)))))
(defn parse-fn-spec
"fn-var-or-coll => var or [var & params]
@@ -55,6 +55,12 @@
(is (instance? Pipe np))
(is (= "foo" (.getName np)))))
+(deftest test-ensure-pipe
+  ;; A real pipe must come back unchanged.
+  (let [foo-pipe (c/pipe "foo")]
+    (is (= foo-pipe (c/ensure-pipe foo-pipe))))
+  ;; A non-pipe argument must be rejected with the "Expected..." message.
+  (is (thrown-with-msg? IllegalArgumentException #"Expected.*"
+        (c/ensure-pipe "foo"))))
+
(deftest test-clojure-filter
(let [fil (ClojureFilter. (p/parse-fn-spec #'odd?))]
(is (= false (t/invoke-filter fil [1])))
@@ -78,3 +78,22 @@
(is (= "{\"inc-age-data\":{\"age\":15},\"up-name\":\"BAR\"}\n{\"inc-age-data\":{\"age\":24},\"up-name\":\"FOO\"}\n"
(ds/slurp* (ju/file sink "part-00000")))))))))
+(defn nested-transform
+  "Looks up \"bar\" by invoking `foo` on that key and returns a one-element
+  vector holding the upper-cased result. The :fn> metadata declares the
+  single output field name \"up-bar\"."
+  {:fn> ["up-bar"]}
+  [foo]
+  (let [bar-value (foo "bar")]
+    (vector (.toUpperCase bar-value))))
+
+(deftest nested-json-map-line-test
+  ;; Writes one JSON line with a nested map, runs it through a flow that maps
+  ;; nested-transform over the "foo" field, and checks the JSON written to
+  ;; the sink's part-00000 file.
+  (with-log-level :warn
+    (with-tmp-files [source (temp-dir "source")
+                     sink (temp-path "sink")]
+      (let [records [{"foo" {"bar" "baz"}}]
+            json-lines (map json/generate-string records)
+            ;; The input file must exist before the flow is built and run.
+            _ (write-lines-in source "source.data" json-lines)
+            trans (c/map (c/pipe "j") #'nested-transform :< ["foo"])
+            source-tap (c/lfs-tap (c/json-map-line ["foo"]) source)
+            sink-tap (c/lfs-tap (c/json-map-line ["up-bar"]) sink)
+            flow (c/flow {"j" source-tap} sink-tap trans)]
+        (c/exec flow)
+        (is (= "{\"up-bar\":\"BAZ\"}\n"
+               (ds/slurp* (ju/file sink "part-00000"))))))))
@@ -26,7 +26,8 @@
(parse-fn-spec [#'example 3]))))
;; Checks that parse-fn-spec rejects the bare symbol `example` (no #' var
;; quote) with an IllegalArgumentException whose message starts "Expected".
;; NOTE(review): presumably `example` evaluates to a function value rather
;; than a var or collection — confirm against the definition of `example`.
(deftest test-parse-fn-spec-invalid
- (is (thrown? IllegalArgumentException (parse-fn-spec example))))
+ (is (thrown-with-msg? IllegalArgumentException #"Expected.*"
+ (parse-fn-spec example))))
(deftest test-parse-args-everything
(is (= {:fn-spec ["cascading.clojure.parse-test" "example"]

0 comments on commit 4940462

Please sign in to comment.