Skip to content
Browse files

merge

  • Loading branch information...
2 parents: d171319 + 1ee1b8f · commit 432f3dd8cddb6ae70c07b80d7a98731c63d66ef7 · @waratuman committed Mar 12, 2010
View
4 project.clj
@@ -6,8 +6,8 @@
[org.clojure/clojure-contrib "1.1.0-master-SNAPSHOT"]
[cascading/cascading "1.0.17-SNAPSHOT"
:exclusions [javax.mail/mail janino/janino]]
- [clj-serializer "0.1.0-SNAPSHOT"]
- [clj-json "0.2.0"]]
+ [clj-json "0.2.0"]
+ [clj-serializer "0.1.0"]]
:dev-dependencies [[org.clojars.mmcgrana/lein-javac "0.1.0"]
[swank-clojure "1.1.0"]]
:namespaces [cascading.clojure.api
View
9 src/clj/cascading/clojure/api.clj
@@ -92,8 +92,11 @@
([previous group-fields sort-fields reverse-order]
(GroupBy. (as-pipes previous) (fields group-fields) (fields sort-fields) reverse-order)))
-(defn first [#^Pipe previous in-fields]
- (Every. previous (fields in-fields) (First.)))
+(defn first
+ ([#^Pipe previous]
+ (Every. previous (First.)))
+ ([#^Pipe previous in-fields]
+ (Every. previous (fields in-fields) (First.))))
(defn count [#^Pipe previous #^String count-fields]
(Every. previous
@@ -209,6 +212,8 @@
(FlowConnector/setApplicationJarPath props jar-path))
(.setProperty props "mapred.used.genericoptionsparser" "true")
(.setProperty props "cascading.flow.job.pollinginterval" "200")
+ (.setProperty props "cascading.serialization.tokens"
+ "130=cascading.clojure.ClojureWrapper")
(doseq [[k v] config]
(.setProperty props k v))
(let [flow-connector (FlowConnector. props)]
View
2 src/jvm/cascading/clojure/ClojureWrapper.java
@@ -39,7 +39,7 @@ public int compareTo(Object o) {
public void write(DataOutput out) throws IOException {
Serializer.serialize(out, this.obj);
}
-
+
public void readFields(DataInput in) throws IOException {
this.obj = Serializer.deserialize(in, EOF);
}
View
31 test/cascading/clojure/flow_test.clj
@@ -54,26 +54,27 @@
[["bar" 6] ["bat" 3]]))
(defn transform
- {:fn> ["up-name" "inc-age"]}
- [name age]
- [(.toUpperCase name) (inc age)])
+ {:fn> ["up-name" "inc-age-data"]}
+ [name data]
+ [(.toUpperCase name) (update-in data ["age"] inc)])
(deftest json-map-line-test
(with-log-level :warn
(with-tmp-files [source (temp-dir "source")
sink (temp-path "sink")]
- (let [lines [{"name" "foo" "age" 23} {"name" "bar" "age" 14}]]
- (write-lines-in source "source.data" (map json/generate-string
- lines))
+ (let [lines [{"name" "foo" "age-data" {:age 23}}
+ {"name" "bar" "age-data" {:age 14}}]]
+ (write-lines-in source "source.data" (map json/generate-string lines))
(let [trans (-> (c/pipe "j")
- (c/map #'transform :< ["name" "age"]))
- flow (c/flow {"j" (c/lfs-tap (c/json-map-line ["name" "age"]) source)}
- (c/lfs-tap (c/json-map-line ["up-name" "inc-age"])
- sink)
- trans)]
- (c/exec flow)
- (is (= "{\"inc-age\":24,\"up-name\":\"FOO\"}\n{\"inc-age\":15,\"up-name\":\"BAR\"}\n"
- (ds/slurp* (ju/file sink "part-00000")))))))))
-
+ (c/map #'transform :< ["name" "age-data"])
+ (c/group-by "up-name")
+ (c/first "inc-age-data"))
+ flow (c/flow
+ {"j" (c/lfs-tap (c/json-map-line ["name" "age-data"]) source)}
+ (c/lfs-tap (c/json-map-line ["up-name" "inc-age-data"]) sink)
+ trans)]
+ (c/exec flow)
+ (is (= "{\"inc-age-data\":{\"age\":15},\"up-name\":\"BAR\"}\n{\"inc-age-data\":{\"age\":24},\"up-name\":\"FOO\"}\n"
+ (ds/slurp* (ju/file sink "part-00000")))))))))

0 comments on commit 432f3dd

Please sign in to comment.
Something went wrong with that request. Please try again.