Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merging in Java-level changes from nm-fn-api: wrap the iterator at the Java level for buffers, and add a coerceFromTuple overload for TupleEntry.
  • Loading branch information...
commit 47a6f27de759a70d0842231d1eab36754cfad5aa 1 parent 896af96
Bradford Cross authored
View
4 src/clj/cascading/clojure/api.clj
@@ -179,10 +179,6 @@
(Every. previous in-fields
(ClojureBuffer. func-fields specs) out-fields)))
-(defn tuple-seq [it]
- "Takes Iterator<TupleEntry> and returns seq of tuples coerced to vectors."
- (clojure.core/map #(Util/coerceFromTuple (.getTuple %)) (iterator-seq it)))
-
(defn group-by
([previous group-fields]
(GroupBy. (as-pipes previous) (fields group-fields)))
View
44 src/jvm/cascading/clojure/ClojureBuffer.java
@@ -1,24 +1,45 @@
package cascading.clojure;
-
+
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.OperationCall;
import cascading.operation.BufferCall;
import cascading.flow.FlowProcess;
-import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
-import cascading.tuple.Tuple;
import cascading.tuple.Fields;
+import cascading.tuple.TupleEntry;
import clojure.lang.IFn;
-import clojure.lang.RT;
import clojure.lang.ISeq;
-import java.util.Collection;
+import clojure.lang.IteratorSeq;
+import java.util.Iterator;
+import clojure.lang.RT;
+
public class ClojureBuffer extends BaseOperation<Object>
implements Buffer<Object> {
private Object[] fn_spec;
private IFn fn;
+ protected static class TupleSeqConverter implements Iterator<ISeq> { // Iterator adapter: wraps an Iterator<TupleEntry> and yields each entry converted to an ISeq by Util.coerceFromTuple
+ private Iterator<TupleEntry> _tuples; // the wrapped entry iterator; every method below delegates to it
+
+ public TupleSeqConverter(Iterator<TupleEntry> tuples) { // stores the given iterator as-is (no copy, no null check)
+ _tuples = tuples;
+ }
+
+ public boolean hasNext() { // reports whether the wrapped iterator has more entries
+ return _tuples.hasNext();
+ }
+
+ public ISeq next() { // advances the wrapped iterator and returns that entry as an ISeq
+ return Util.coerceFromTuple(_tuples.next()); // NOTE(review): the seq is created from the entry's tuple iterator; if the upstream iterator reuses its TupleEntry/Tuple objects, a seq that is read later could observe changed values -- confirm
+ }
+
+ public void remove() { // forwards removal to the wrapped iterator
+ _tuples.remove();
+ }
+ }
+
public ClojureBuffer(Fields out_fields, Object[] fn_spec) {
super(out_fields);
this.fn_spec = fn_spec;
@@ -27,15 +48,18 @@ public ClojureBuffer(Fields out_fields, Object[] fn_spec) {
public void prepare(FlowProcess flow_process, OperationCall<Object> op_call) { // turns the stored fn_spec into a callable IFn via Util.bootFn; neither parameter is used here
this.fn = Util.bootFn(fn_spec); // operate() invokes this.fn, so it must be set before operate runs
}
-
+
public void operate(FlowProcess flow_process, BufferCall<Object> buff_call) { // invokes the Clojure fn on this call's arguments and adds each element of its result to the output collector as a Tuple
try {
- Collection coll = (Collection) this.fn.invoke(buff_call.getArgumentsIterator());
- for (Object tup : coll) {
- buff_call.getOutputCollector().add(Util.coerceToTuple(tup));
+ ISeq result_seq = RT.seq(this.fn.invoke(IteratorSeq.create(new TupleSeqConverter(buff_call.getArgumentsIterator())))); // fn now receives a seq of per-entry seqs instead of the raw iterator; its return value is viewed as a seq via RT.seq rather than cast to Collection
+ TupleEntryCollector collector = buff_call.getOutputCollector(); // fetched once and reused for every result element
+ while (result_seq != null) { // a null seq means there is nothing (more) to emit
+ Object obj = result_seq.first();
+ collector.add(Util.coerceToTuple(obj)); // each result element becomes one output Tuple
+ result_seq = result_seq.next(); // advance; the loop ends once this becomes null
}
} catch (Exception e) {
throw new RuntimeException(e); // rethrown unchecked, keeping the original exception as the cause
}
}
-}
+}
View
7 src/jvm/cascading/clojure/Util.java
@@ -6,6 +6,7 @@
import clojure.lang.IteratorSeq;
import clojure.lang.ArraySeq;
import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
import cascading.operation.OperationCall;
import java.util.Collection;
@@ -39,7 +40,11 @@ public static IFn bootFn(Object[] fn_spec) {
public static ISeq coerceFromTuple(Tuple tuple) { // returns an ISeq built from the tuple's iterator
return IteratorSeq.create(tuple.iterator()); // the seq is backed by tuple.iterator(); no explicit copy of the tuple is made here
}
-
+
+ public static ISeq coerceFromTuple(TupleEntry tuple) { // TupleEntry overload: unwraps the entry's Tuple and delegates to coerceFromTuple(Tuple)
+ return coerceFromTuple(tuple.getTuple());
+ }
+
public static Tuple coerceToTuple(Object obj) {
if(obj instanceof Collection) {
Object[] raw_arr = ((Collection)obj).toArray();
View
2  test/cascading/clojure/api_test.clj
@@ -97,7 +97,7 @@
(is (= [[6]] (t/invoke-aggregator a [[1] [2] [3]])))))
(defn buff [it] ; test buffer fn: produces one single-element vector per item of `it`
- (for [x (c/tuple-seq it)]
+ (for [x it] ; `it` is iterated directly; the c/tuple-seq wrapper is no longer applied here
[(apply + 1 x)])) ; 1 plus the sum of x's elements, wrapped in a vector
; TODO: notice Buffer expects a fn that takes an iterator and returns a seq of
View
4 test/cascading/clojure/buffer_test.clj
@@ -17,7 +17,7 @@
(reduce maxer coll)))
(defn maxbuff [it] ; test buffer fn: returns a one-element list holding the result of (max-by second ...) over the input
- (list (max-by second (c/tuple-seq it))))
+ (list (max-by second it))) ; `it` is passed to max-by directly, without c/tuple-seq
(deftest buffer-max-for-each-group
(test-flow
@@ -31,7 +31,7 @@
;;Note that you can not walk the tuple iterator more than once
;;but you can hold on to the seq and walk that more than once.
(defn maxpairs [it] ; test buffer fn: pairs every tuple that is not the maximum with the maximum tuple
- (let [tuples (c/tuple-seq it)
+ (let [tuples it ; `it` is bound as-is and then walked twice below (once by max-by, once by remove)
biggest (max-by second tuples)]
(map #(concat % biggest) (remove #(= % biggest) tuples)))) ; drop tuples equal to biggest, then append biggest's elements to each remaining tuple
Please sign in to comment.
Something went wrong with that request. Please try again.