Skip to content

Commit

Permalink
Merge branch 'master' of github.com:clj-sys/cascading-clojure
Browse files Browse the repository at this point in the history
  • Loading branch information
jared strate committed Mar 12, 2010
2 parents 8f6c0d7 + 1ee1b8f commit 20429b5
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 20 deletions.
3 changes: 2 additions & 1 deletion project.clj
Expand Up @@ -6,7 +6,8 @@
[org.clojure/clojure-contrib "1.1.0"]
[cascading/cascading "1.0.17-SNAPSHOT"
:exclusions [javax.mail/mail janino/janino]]
[clj-json "0.2.0"]]
[clj-json "0.2.0"]
[clj-serializer "0.1.0"]]
:dev-dependencies [[org.clojars.mmcgrana/lein-javac "0.1.0"]
[swank-clojure "1.1.0"]]
:namespaces [cascading.clojure.api
Expand Down
21 changes: 15 additions & 6 deletions src/clj/cascading/clojure/api.clj
Expand Up @@ -92,8 +92,11 @@
([previous group-fields sort-fields reverse-order]
(GroupBy. (as-pipes previous) (fields group-fields) (fields sort-fields) reverse-order)))

(defn first [#^Pipe previous]
(Every. previous (First.)))
;; Appends an Every pipe that applies a First aggregator to `previous`.
;; The two-argument arity additionally passes (fields in-fields) to Every,
;; presumably as the argument-field selector -- confirm against the
;; Cascading Every constructor documentation.
(defn first
([#^Pipe previous]
(Every. previous (First.)))
([#^Pipe previous in-fields]
(Every. previous (fields in-fields) (First.))))

(defn count [#^Pipe previous #^String count-fields]
(Every. previous
Expand Down Expand Up @@ -178,10 +181,14 @@
json-vals (clojure.core/map json-map json-keys-arr)]
(Util/coerceToTuple json-vals)))
(sink [#^TupleEntry tuple-entry #^OutputCollector output-collector]
(let [tuple (.selectTuple tuple-entry scheme-fields)
json-map (areduce json-keys-arr i mem {}
(assoc mem (aget json-keys-arr i)
(.get tuple i)))
(let [elems (Util/coerceArrayFromTuple
(.selectTuple tuple-entry scheme-fields))
json-map (reduce
(fn [m i]
(assoc m (aget json-keys-arr i)
(aget elems i)))
{}
(range (alength json-keys-arr)))
json-str (json/generate-string json-map)]
(.collect output-collector nil (Tuple. json-str)))))))

Expand All @@ -205,6 +212,8 @@
(FlowConnector/setApplicationJarPath props jar-path))
(.setProperty props "mapred.used.genericoptionsparser" "true")
(.setProperty props "cascading.flow.job.pollinginterval" "200")
(.setProperty props "cascading.serialization.tokens"
"130=cascading.clojure.ClojureWrapper")
(doseq [[k v] config]
(.setProperty props k v))
(let [flow-connector (FlowConnector. props)]
Expand Down
46 changes: 46 additions & 0 deletions src/jvm/cascading/clojure/ClojureWrapper.java
@@ -0,0 +1,46 @@
package cascading.clojure;

import clojure.lang.IPersistentCollection;
import clojure.lang.Util;
import org.apache.hadoop.io.Writable;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
import clj_serializer.Serializer;

/**
 * Holds a single object and exposes it through the {@link Comparable} and
 * {@link Writable} interfaces. Serialization is delegated to
 * {@code clj_serializer.Serializer}.
 *
 * <p>Ordering is defined purely by hash code, so {@code compareTo} returning
 * zero does NOT imply {@code equals}; two distinct values whose hash codes
 * collide compare as equal-ordered.
 */
public class ClojureWrapper implements Comparable, Writable {
  // Sentinel handed to Serializer.deserialize as its second argument.
  // NOTE(review): presumably the value returned on end-of-input -- confirm
  // against the clj-serializer API.
  private static final Object EOF = new Object();

  // The wrapped value; null until set by the constructor or readFields.
  private Object obj;

  /** Creates an empty wrapper; the value is expected to arrive via {@link #readFields}. */
  public ClojureWrapper() {
    this.obj = null;
  }

  /**
   * Creates a wrapper around the given value.
   *
   * @param obj the value to wrap (may be null)
   */
  public ClojureWrapper(Object obj) {
    this.obj = obj;
  }

  /**
   * @return the wrapped value, or null if none has been set
   */
  public Object toClojure() {
    return this.obj;
  }

  /**
   * @return the wrapped value's hash code, or 0 when no value is wrapped
   */
  @Override
  public int hashCode() {
    // An empty wrapper (no-arg constructor) must not blow up when hashed.
    return (this.obj == null) ? 0 : this.obj.hashCode();
  }

  /**
   * Two wrappers are equal when their wrapped values are equal (or both null).
   * Any argument that is not a {@code ClojureWrapper}, including null, is unequal.
   *
   * @param o the object to compare against
   * @return true if {@code o} wraps an equal value
   */
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ClojureWrapper)) {
      return false;
    }
    Object theirs = ((ClojureWrapper) o).toClojure();
    if (theirs == null) {
      return this.obj == null;
    }
    return theirs.equals(this.obj);
  }

  /**
   * Orders wrappers by the hash code of their wrapped values.
   *
   * @param o the wrapper to compare against; must be a {@code ClojureWrapper}
   * @return a negative, zero or positive value as this hash code is less than,
   *         equal to or greater than the other's
   * @throws ClassCastException if {@code o} is not a {@code ClojureWrapper}
   * @throws NullPointerException if {@code o} is null
   */
  public int compareTo(Object o) {
    int mine = this.hashCode();
    int theirs = ((ClojureWrapper) o).hashCode();
    // Explicit comparison: subtracting the two ints can overflow and flip
    // the sign, which would make the ordering inconsistent.
    return (mine < theirs) ? -1 : ((mine == theirs) ? 0 : 1);
  }

  /**
   * Writes the wrapped value to {@code out} via {@code Serializer.serialize}.
   *
   * @param out the destination
   * @throws IOException if serialization fails
   */
  public void write(DataOutput out) throws IOException {
    Serializer.serialize(out, this.obj);
  }

  /**
   * Replaces the wrapped value with the result of {@code Serializer.deserialize}.
   *
   * @param in the source to read from
   * @throws IOException if deserialization fails
   */
  public void readFields(DataInput in) throws IOException {
    this.obj = Serializer.deserialize(in, EOF);
  }
}
33 changes: 28 additions & 5 deletions src/jvm/cascading/clojure/Util.java
Expand Up @@ -3,6 +3,7 @@
import clojure.lang.RT;
import clojure.lang.IFn;
import clojure.lang.ISeq;
import clojure.lang.IPersistentCollection;
import clojure.lang.IteratorSeq;
import clojure.lang.ArraySeq;
import cascading.tuple.Tuple;
Expand Down Expand Up @@ -37,8 +38,22 @@ public static IFn bootFn(Object[] fn_spec) {
}
}

/**
 * Copies the elements of a tuple into a new array, replacing every
 * {@code ClojureWrapper} element with the value it wraps.
 *
 * @param tuple the tuple to read
 * @return a new array of the same size as {@code tuple}, in the same order
 */
public static Object[] coerceArrayFromTuple(Tuple tuple) {
  final int size = tuple.size();
  final Object[] unwrapped = new Object[size];
  for (int idx = 0; idx < size; idx++) {
    final Comparable element = tuple.get(idx);
    // Wrapped values are unpacked; everything else is passed through as-is.
    unwrapped[idx] = (element instanceof ClojureWrapper)
        ? ((ClojureWrapper) element).toClojure()
        : element;
  }
  return unwrapped;
}

public static ISeq coerceFromTuple(Tuple tuple) {
return IteratorSeq.create(tuple.iterator());
return ArraySeq.create(coerceArrayFromTuple(tuple));
}

public static ISeq coerceFromTuple(TupleEntry tuple) {
Expand All @@ -47,10 +62,18 @@ public static ISeq coerceFromTuple(TupleEntry tuple) {

public static Tuple coerceToTuple(Object obj) {
if(obj instanceof Collection) {
Object[] raw_arr = ((Collection)obj).toArray();
Comparable[] arr = new Comparable[raw_arr.length];
System.arraycopy(raw_arr, 0, arr, 0, raw_arr.length);
return new Tuple(arr);
Object[] obj_elems = ((Collection)obj).toArray();
int s = obj_elems.length;
Comparable[] comp_elems = new Comparable[s];
for (int i = 0; i < s; i++) {
Object obj_elem = obj_elems[i];
if (obj_elem instanceof IPersistentCollection) {
comp_elems[i] = new ClojureWrapper((IPersistentCollection)obj_elem);
} else {
comp_elems[i] = (Comparable) obj_elem;
}
}
return new Tuple(comp_elems);
} else {
return new Tuple((Comparable) obj);
}
Expand Down
4 changes: 4 additions & 0 deletions test/cascading/clojure/api_test.clj
Expand Up @@ -31,6 +31,10 @@
f (Util/bootFn spec)]
(is (= [4] (f 1)))))

;; Round-trips a vector holding a number, a string, a keyword, a vector, a map
;; and a list through Util/coerceToTuple and Util/coerceFromTuple, and expects
;; the result to be equal to the original elements.
(deftest test-flexible-tuples
(let [elems [1 "two" :three [4 5 6] {7 "eight"} `(9 10 11)]]
(is (= elems (Util/coerceFromTuple (Util/coerceToTuple elems))))))

(deftest test-1-field
(let [f1 (p/fields "foo")]
(is (instance? Fields f1))
Expand Down
20 changes: 12 additions & 8 deletions test/cascading/clojure/flow_test.clj
Expand Up @@ -54,23 +54,27 @@
[["bar" 6] ["bat" 3]]))

(defn transform
{:fn> ["up-name" "inc-age"]}
[name age]
[(.toUpperCase name) (inc age)])
{:fn> ["up-name" "inc-age-data"]}
[name data]
[(.toUpperCase name) (update-in data ["age"] inc)])

(deftest json-map-line-test
(with-log-level :warn
(with-tmp-files [source (temp-dir "source")
sink (temp-path "sink")]
(let [lines [{"name" "foo" "age" 23} {"name" "bar" "age" 14}]]
(let [lines [{"name" "foo" "age-data" {:age 23}}
{"name" "bar" "age-data" {:age 14}}]]
(write-lines-in source "source.data" (map json/generate-string lines))
(let [trans (-> (c/pipe "j") (c/map #'transform :< ["name" "age"]))
(let [trans (-> (c/pipe "j")
(c/map #'transform :< ["name" "age-data"])
(c/group-by "up-name")
(c/first "inc-age-data"))
flow (c/flow
{"j" (c/lfs-tap (c/json-map-line ["name" "age"]) source)}
(c/lfs-tap (c/json-map-line ["up-name" "inc-age"]) sink)
{"j" (c/lfs-tap (c/json-map-line ["name" "age-data"]) source)}
(c/lfs-tap (c/json-map-line ["up-name" "inc-age-data"]) sink)
trans)]
(c/exec flow)
(is (= "{\"inc-age\":24,\"up-name\":\"FOO\"}\n{\"inc-age\":15,\"up-name\":\"BAR\"}\n"
(is (= "{\"inc-age-data\":{\"age\":15},\"up-name\":\"BAR\"}\n{\"inc-age-data\":{\"age\":24},\"up-name\":\"FOO\"}\n"
(ds/slurp* (ju/file sink "part-00000")))))))))

(defn nested-transform
Expand Down

0 comments on commit 20429b5

Please sign in to comment.