Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Introduce ClojureWrapper and associated internal changes; allows arbi…

…trary Clojure collections to be used as tuple values.
  • Loading branch information...
commit 1ee1b8ffc7dfcdd11eb00393b6f6174f62d7bc08 1 parent 891a233
@mmcgrana mmcgrana authored
View
3  project.clj
@@ -6,7 +6,8 @@
[org.clojure/clojure-contrib "1.1.0"]
[cascading/cascading "1.0.17-SNAPSHOT"
:exclusions [javax.mail/mail janino/janino]]
- [clj-json "0.2.0"]]
+ [clj-json "0.2.0"]
+ [clj-serializer "0.1.0"]]
:dev-dependencies [[org.clojars.mmcgrana/lein-javac "0.1.0"]
[swank-clojure "1.1.0"]]
:namespaces [cascading.clojure.api
View
21 src/clj/cascading/clojure/api.clj
@@ -92,8 +92,11 @@
([previous group-fields sort-fields reverse-order]
(GroupBy. (as-pipes previous) (fields group-fields) (fields sort-fields) reverse-order)))
-(defn first [#^Pipe previous]
- (Every. previous (First.)))
+(defn first
+ ([#^Pipe previous]
+ (Every. previous (First.)))
+ ([#^Pipe previous in-fields]
+ (Every. previous (fields in-fields) (First.))))
(defn count [#^Pipe previous #^String count-fields]
(Every. previous
@@ -178,10 +181,14 @@
json-vals (clojure.core/map json-map json-keys-arr)]
(Util/coerceToTuple json-vals)))
(sink [#^TupleEntry tuple-entry #^OutputCollector output-collector]
- (let [tuple (.selectTuple tuple-entry scheme-fields)
- json-map (areduce json-keys-arr i mem {}
- (assoc mem (aget json-keys-arr i)
- (.get tuple i)))
+ (let [elems (Util/coerceArrayFromTuple
+ (.selectTuple tuple-entry scheme-fields))
+ json-map (reduce
+ (fn [m i]
+ (assoc m (aget json-keys-arr i)
+ (aget elems i)))
+ {}
+ (range (alength json-keys-arr)))
json-str (json/generate-string json-map)]
(.collect output-collector nil (Tuple. json-str)))))))
@@ -205,6 +212,8 @@
(FlowConnector/setApplicationJarPath props jar-path))
(.setProperty props "mapred.used.genericoptionsparser" "true")
(.setProperty props "cascading.flow.job.pollinginterval" "200")
+ (.setProperty props "cascading.serialization.tokens"
+ "130=cascading.clojure.ClojureWrapper")
(doseq [[k v] config]
(.setProperty props k v))
(let [flow-connector (FlowConnector. props)]
View
46 src/jvm/cascading/clojure/ClojureWrapper.java
@@ -0,0 +1,46 @@
+package cascading.clojure;
+
+import clojure.lang.IPersistentCollection;
+import clojure.lang.Util;
+import org.apache.hadoop.io.Writable;
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.io.IOException;
+import clj_serializer.Serializer;
+
+public class ClojureWrapper implements Comparable, Writable {
+ private static final Object EOF = new Object();
+ private Object obj;
+
+ public ClojureWrapper() {
+ this.obj = null;
+ }
+
+ public ClojureWrapper(Object obj) {
+ this.obj = obj;
+ }
+
+ public Object toClojure() {
+ return this.obj;
+ }
+
+ public int hashCode() {
+ return this.obj.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ return ((ClojureWrapper)o).toClojure().equals(this.toClojure());
+ }
+
+ public int compareTo(Object o) {
+ return this.hashCode() - ((ClojureWrapper)o).hashCode();
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Serializer.serialize(out, this.obj);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.obj = Serializer.deserialize(in, EOF);
+ }
+}
View
33 src/jvm/cascading/clojure/Util.java
@@ -3,6 +3,7 @@
import clojure.lang.RT;
import clojure.lang.IFn;
import clojure.lang.ISeq;
+import clojure.lang.IPersistentCollection;
import clojure.lang.IteratorSeq;
import clojure.lang.ArraySeq;
import cascading.tuple.Tuple;
@@ -37,8 +38,22 @@ public static IFn bootFn(Object[] fn_spec) {
}
}
+ /**
+  * Copies the elements of a tuple into a new Object array, replacing each
+  * ClojureWrapper element with the value it wraps and leaving every other
+  * element as-is.
+  *
+  * @param tuple the tuple to read
+  * @return a new array holding one entry per tuple position
+  */
+ public static Object[] coerceArrayFromTuple(Tuple tuple) {
+   final int count = tuple.size();
+   final Object[] unwrapped = new Object[count];
+   for (int idx = 0; idx < count; idx++) {
+     final Comparable element = tuple.get(idx);
+     unwrapped[idx] = (element instanceof ClojureWrapper)
+         ? ((ClojureWrapper) element).toClojure()
+         : element;
+   }
+   return unwrapped;
+ }
+
public static ISeq coerceFromTuple(Tuple tuple) {
- return IteratorSeq.create(tuple.iterator());
+ return ArraySeq.create(coerceArrayFromTuple(tuple));
}
public static ISeq coerceFromTuple(TupleEntry tuple) {
@@ -47,10 +62,18 @@ public static ISeq coerceFromTuple(TupleEntry tuple) {
public static Tuple coerceToTuple(Object obj) {
if(obj instanceof Collection) {
- Object[] raw_arr = ((Collection)obj).toArray();
- Comparable[] arr = new Comparable[raw_arr.length];
- System.arraycopy(raw_arr, 0, arr, 0, raw_arr.length);
- return new Tuple(arr);
+ Object[] obj_elems = ((Collection)obj).toArray();
+ int s = obj_elems.length;
+ Comparable[] comp_elems = new Comparable[s];
+ for (int i = 0; i < s; i++) {
+ Object obj_elem = obj_elems[i];
+ if (obj_elem instanceof IPersistentCollection) {
+ comp_elems[i] = new ClojureWrapper((IPersistentCollection)obj_elem);
+ } else {
+ comp_elems[i] = (Comparable) obj_elem;
+ }
+ }
+ return new Tuple(comp_elems);
} else {
return new Tuple((Comparable) obj);
}
View
4 test/cascading/clojure/api_test.clj
@@ -31,6 +31,10 @@
f (Util/bootFn spec)]
(is (= [4] (f 1)))))
+(deftest test-flexible-tuples ; round trip through Util/coerceToTuple then Util/coerceFromTuple must return the same elements
+ (let [elems [1 "two" :three [4 5 6] {7 "eight"} `(9 10 11)]] ; scalars mixed with a nested vector, map and list
+ (is (= elems (Util/coerceFromTuple (Util/coerceToTuple elems))))))
+
(deftest test-1-field
(let [f1 (p/fields "foo")]
(is (instance? Fields f1))
View
20 test/cascading/clojure/flow_test.clj
@@ -54,21 +54,25 @@
[["bar" 6] ["bat" 3]]))
(defn transform
- {:fn> ["up-name" "inc-age"]}
- [name age]
- [(.toUpperCase name) (inc age)])
+ {:fn> ["up-name" "inc-age-data"]}
+ [name data]
+ [(.toUpperCase name) (update-in data ["age"] inc)])
(deftest json-map-line-test
(with-log-level :warn
(with-tmp-files [source (temp-dir "source")
sink (temp-path "sink")]
- (let [lines [{"name" "foo" "age" 23} {"name" "bar" "age" 14}]]
+ (let [lines [{"name" "foo" "age-data" {:age 23}}
+ {"name" "bar" "age-data" {:age 14}}]]
(write-lines-in source "source.data" (map json/generate-string lines))
- (let [trans (-> (c/pipe "j") (c/map #'transform :< ["name" "age"]))
+ (let [trans (-> (c/pipe "j")
+ (c/map #'transform :< ["name" "age-data"])
+ (c/group-by "up-name")
+ (c/first "inc-age-data"))
flow (c/flow
- {"j" (c/lfs-tap (c/json-map-line ["name" "age"]) source)}
- (c/lfs-tap (c/json-map-line ["up-name" "inc-age"]) sink)
+ {"j" (c/lfs-tap (c/json-map-line ["name" "age-data"]) source)}
+ (c/lfs-tap (c/json-map-line ["up-name" "inc-age-data"]) sink)
trans)]
(c/exec flow)
- (is (= "{\"inc-age\":24,\"up-name\":\"FOO\"}\n{\"inc-age\":15,\"up-name\":\"BAR\"}\n"
+ (is (= "{\"inc-age-data\":{\"age\":15},\"up-name\":\"BAR\"}\n{\"inc-age-data\":{\"age\":24},\"up-name\":\"FOO\"}\n"
(ds/slurp* (ju/file sink "part-00000")))))))))
Please sign in to comment.
Something went wrong with that request. Please try again.