Permalink
Browse files

Merged with bradford/multi-join

  • Loading branch information...
waratuman committed Feb 20, 2010
2 parents 6304ec8 + 4a40cbd commit 278121675f632632caff3b8d03b5b0587f2a4613
@@ -9,16 +9,16 @@
(cascading.operation.regex RegexGenerator RegexFilter)
(cascading.operation.aggregator First Count)
(cascading.pipe Pipe Each Every GroupBy CoGroup)
- (cascading.pipe.cogroup
- InnerJoin OuterJoin LeftJoin RightJoin MixedJoin)
+ (cascading.pipe.cogroup InnerJoin OuterJoin
+ LeftJoin RightJoin MixedJoin)
(cascading.scheme Scheme)
(cascading.tap Hfs Lfs Tap)
(org.apache.hadoop.io Text)
(org.apache.hadoop.mapred TextInputFormat TextOutputFormat
OutputCollector JobConf)
(java.util Properties Map UUID)
(cascading.clojure ClojureFilter ClojureMapcat ClojureMap
- ClojureAggregator Util)
+ ClojureAggregator ClojureBuffer Util)
(clojure.lang Var)
(java.io File)
(java.lang RuntimeException)))
@@ -74,13 +74,21 @@
(clojure.core/first (find-first #(pred (last %)) (indexed coll))))
; in-fields: subset of fields from incoming pipe that are passed to function
-; defaults to all
+; defaults to Fields/ALL
; func-fields: fields declared to be returned by the function
-; must be given in meta data or as [override-fields #'func-var]
-; no default, error if missing
+; defaults to Fields/ARGS
; out-fields: subset of (union in-fields func-fields) that flow out of the pipe
-; defaults to func-fields
-
+; defaults to Fields/RESULTS
+
+; Regarding the operation's fields declarations, if you resort to
+; Fields/UNKNOWN, you will lose all your field names. You should default the
+; fields declarations to Fields/ARGS. This means you emit the same fields that
+; you take in via the input selector - a common scenario. If you want to add new
+; fields or change them, you must explicitly declare them. Regarding input
+; selectors, Fields/ALL is the logical default. Chris says that - regarding the
+; output selector - we might tend to use Fields/RESULTS more for Each and
+; Fields/ALL more for Every, but perhaps Fields/RESULTS is the best fit for our
+; way of thinking.
(defn- parse-func [func-obj]
"func-obj =>
#'func
@@ -94,10 +102,9 @@
(let [spec (fn-spec (drop i func-obj))
func-var (nth func-obj i)
func-fields (or (and (= i 1) (clojure.core/first func-obj))
- ((meta func-var) :fields))]
- (when-not func-fields
- (throw (Exception. (str "no fields assocaiated with " func-obj))))
- [(fields func-fields) spec])))
+ ((meta func-var) :fields))
+ function-fields (if func-fields (fields func-fields) Fields/ARGS)]
+ [function-fields spec])))
(defn- parse-args
"arr =>
@@ -143,11 +150,26 @@
(Each. previous in-fields
(ClojureMap. func-fields spec) out-fields)))
+(defn agg
+  "A combinator that takes a fn and an init value and returns a reduce
+  aggregator: a fn of three arities.
+    ()    => init, the starting accumulator
+    (x)   => [x], the final accumulator wrapped in a vector
+    (x y) => (f x y), one reduction step"
+  [f init]
+  (fn ([] init)
+      ([x] [x])
+      ([x y] (f x y))))
+
(defn aggregate
  "Appends an Every pipe to `previous` that applies a ClojureAggregator.
  `args` are handed to parse-args, which yields
  [in-fields func-fields specs out-fields]."
  [#^Pipe previous & args]
  (let [[#^Fields in-fields func-fields specs #^Fields out-fields] (parse-args args)
        aggregator (ClojureAggregator. func-fields specs)]
    (Every. previous in-fields aggregator out-fields)))
+(defn buffer
+  "Appends an Every pipe to `previous` that applies a ClojureBuffer.
+  `args` are handed to parse-args, which yields
+  [in-fields func-fields specs out-fields]."
+  [#^Pipe previous & args]
+  (let [[#^Fields in-fields func-fields specs #^Fields out-fields] (parse-args args)
+        buffer-op (ClojureBuffer. func-fields specs)]
+    (Every. previous in-fields buffer-op out-fields)))
+
+(defn tuple-seq
+  "Takes Iterator<TupleEntry> and returns seq of tuples coerced to vectors.
+  Each entry's tuple is passed through Util/coerceFromTuple; the result is
+  built lazily on top of iterator-seq."
+  [it]
+  (clojure.core/map #(Util/coerceFromTuple (.getTuple %)) (iterator-seq it)))
+
(defn group-by
  "Returns a GroupBy pipe over `previous`, grouping on the Fields built
  from `group-fields`."
  [#^Pipe previous group-fields]
  (let [grouping (fields group-fields)]
    (GroupBy. previous grouping)))
@@ -161,17 +183,21 @@
(defn co-group
[pipes-seq fields-seq declared-fields joiner]
(CoGroup.
- (pipes-array pipes-seq)
- (fields-array fields-seq)
- (fields declared-fields)
- joiner))
+ (pipes-array pipes-seq)
+ (fields-array fields-seq)
+ (fields declared-fields)
+ joiner))
+
+; TODO: create join abstractions. http://en.wikipedia.org/wiki/Join_(SQL)
-;;TODO create join abstractions. http://en.wikipedia.org/wiki/Join_(SQL)
-;;"join and drop" is called a natural join - inner join, followed by select to remove duplicate join keys.
+; "join and drop" is called a natural join - inner join, followed by select to
+; remove duplicate join keys.
-;;another kind of join and dop is to drop all the join keys - for example, when you have extracted a specil join key jsut for grouping, you typicly want to get rid of it after the group operation.
+; another kind of join and drop is to drop all the join keys - for example, when
+; you have extracted a special join key just for grouping, you typically want to
+; get rid of it after the group operation.
-;;another kind of "join and drop" is an outer-join followed by dropping the nils
+; another kind of "join and drop" is an outer-join followed by dropping the nils
(defn inner-join
[pipes-seq fields-seq declared-fields]
@@ -192,16 +218,16 @@
(defn mixed-join
[pipes-seq fields-seq declared-fields inner-bools]
(co-group pipes-seq fields-seq declared-fields
- (MixedJoin. (into-array Boolean inner-bools))))
+ (MixedJoin. (into-array Boolean inner-bools))))
(defn join-into
"outer-joins all pipes into the leftmost pipe"
[pipes-seq fields-seq declared-fields]
(co-group pipes-seq fields-seq declared-fields
- (MixedJoin.
- (boolean-array (cons true
- (repeat (- (clojure.core/count pipes-seq)
- 1) false))))))
+ (MixedJoin.
+ (boolean-array (cons true
+ (repeat (- (clojure.core/count pipes-seq)
+ 1) false))))))
(defn select [#^Pipe previous keep-fields]
(Each. previous (fields keep-fields) (Identity.)))
@@ -275,4 +301,4 @@
(.writeDOT flow path))
(defn exec [#^Flow flow]
- (doto flow .start .complete))
+ (doto flow .start .complete))
@@ -30,20 +30,10 @@
(swap! out-atom conj (Util/coerceFromTuple tuple)))))
(defn- op-call []
- (let [args-atom (atom nil)
- out-atom (atom [])
- context-atom (atom nil)]
+ (let [out-atom (atom [])]
(proxy [ConcreteCall IPersistentCollection] []
- (setArguments [tuple]
- (swap! args-atom (constantly tuple)))
- (getArguments []
- @args-atom)
(getOutputCollector []
(output-collector out-atom))
- (setContext [context]
- (swap! context-atom (constantly context)))
- (getContext []
- @context-atom)
(seq []
(seq @out-atom)))))
@@ -73,6 +63,17 @@
(.cleanup a fp-null ag-call)
(op-call-results ag-call)))
+(defn invoke-buffer
+  "Runs buffer operation `a` once over `colls` outside of a flow and returns
+  the collected output via op-call-results. Each element of `colls` is
+  coerced to a Tuple and wrapped in a TupleEntry before being handed to the
+  operation as its arguments iterator. The operation is first passed through
+  `roundtrip` (presumably a serialization round trip - confirm)."
+  [a colls]
+  (let [buffer-op (roundtrip a)
+        call (op-call)
+        entries (map (fn [coll] (TupleEntry. (Util/coerceToTuple coll))) colls)]
+    (.prepare buffer-op FlowProcess/NULL call)
+    (.setArgumentsIterator call (.iterator entries))
+    (.operate buffer-op FlowProcess/NULL call)
+    (.cleanup buffer-op FlowProcess/NULL call)
+    (op-call-results call)))
+
+
(defn- deserialize-tuple
  "Parses one serialized line back into Clojure data with read-string."
  [line]
  (read-string line))
@@ -96,17 +97,17 @@
(defn in-pipe [in-label in-fields]
(-> (c/pipe in-label)
(c/map [in-fields
- #'deserialize-tuple])))
+ #'deserialize-tuple])))
(defn in-pipes [fields-spec]
(if (not (map? fields-spec))
(in-pipe "in" fields-spec)
(mash (fn [[in-label one-in-fields]]
- [in-label (in-pipe in-label one-in-fields)])
- fields-spec)))
+ [in-label (in-pipe in-label one-in-fields)])
+ fields-spec)))
(defn in-tuples [tuples-spec]
- (if (map? tuples-spec)
+ (if (map? tuples-spec)
tuples-spec
{"in" tuples-spec}))
@@ -115,17 +116,17 @@
(with-tmp-files [source-dir-path (temp-dir "source")
sink-path (temp-path "sink")]
(doseq [[in-label in-tuples] in-tuples-spec]
- (write-lines-in source-dir-path in-label
- (map serialize-tuple in-tuples)))
+ (write-lines-in source-dir-path in-label
+ (map serialize-tuple in-tuples)))
(let [assembly (-> in-pipes-spec
assembler
(c/map #'serialize-vals))
- source-tap-map (mash (fn [[in-label _]]
- [in-label
- (c/lfs-tap (c/text-line "line")
- (file source-dir-path in-label))])
- in-tuples-spec)
- sink-tap (c/lfs-tap (c/text-line "line") sink-path)
- flow (c/flow source-tap-map sink-tap assembly)
- out-tuples (line-sink-seq (.openSink (c/exec flow)))]
- (is (= expected-out-tuples out-tuples))))))
+ source-tap-map (mash (fn [[in-label _]]
+ [in-label
+ (c/lfs-tap (c/text-line "line")
+ (file source-dir-path in-label))])
+ in-tuples-spec)
+ sink-tap (c/lfs-tap (c/text-line "line") sink-path)
+ flow (c/flow source-tap-map sink-tap assembly)
+ out-tuples (line-sink-seq (.openSink (c/exec flow)))]
+ (is (= expected-out-tuples out-tuples))))))
@@ -0,0 +1,41 @@
+package cascading.clojure;
+
+import cascading.operation.BaseOperation;
+import cascading.operation.Buffer;
+import cascading.operation.OperationCall;
+import cascading.operation.BufferCall;
+import cascading.flow.FlowProcess;
+import cascading.tuple.TupleEntry;
+import cascading.tuple.TupleEntryCollector;
+import cascading.tuple.Tuple;
+import cascading.tuple.Fields;
+import clojure.lang.IFn;
+import clojure.lang.RT;
+import clojure.lang.ISeq;
+import java.util.Collection;
+
+/**
+ * Cascading Buffer that delegates each grouping to a Clojure function.
+ *
+ * The function is described by fn_spec and resolved with Util.bootFn in
+ * prepare(). In operate() it is invoked with the call's arguments iterator
+ * and must return a java.util.Collection of values that Util.coerceToTuple
+ * can turn into tuples; a null (nil) return is treated as "emit nothing".
+ */
+public class ClojureBuffer extends BaseOperation<Object>
+    implements Buffer<Object> {
+  // Serializable description of the Clojure fn; resolved in prepare().
+  private Object[] fn_spec;
+  // The resolved function; assigned in prepare().
+  private IFn fn;
+
+  /**
+   * @param out_fields fields declared by this operation
+   * @param fn_spec    spec handed to Util.bootFn to obtain the function
+   */
+  public ClojureBuffer(Fields out_fields, Object[] fn_spec) {
+    super(out_fields);
+    this.fn_spec = fn_spec;
+  }
+
+  /** Resolves the Clojure function from fn_spec before any operate() call. */
+  public void prepare(FlowProcess flow_process, OperationCall<Object> op_call) {
+    this.fn = Util.bootFn(fn_spec);
+  }
+
+  /**
+   * Invokes the Clojure function with the arguments iterator and adds every
+   * element of the returned collection to the output collector as a tuple.
+   *
+   * @throws RuntimeException if the function or tuple coercion fails; checked
+   *         exceptions are wrapped, runtime exceptions propagate unchanged
+   */
+  public void operate(FlowProcess flow_process, BufferCall<Object> buff_call) {
+    try {
+      Object result = this.fn.invoke(buff_call.getArgumentsIterator());
+      if (result == null) {
+        // nil is Clojure's empty seq (e.g. (seq [])): nothing to emit.
+        return;
+      }
+      // Fetch the collector once instead of on every iteration.
+      TupleEntryCollector collector = buff_call.getOutputCollector();
+      for (Object tup : (Collection) result) {
+        collector.add(Util.coerceToTuple(tup));
+      }
+    } catch (RuntimeException e) {
+      // Already unchecked: rethrow as-is rather than double-wrapping.
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
@@ -4,7 +4,7 @@
(:import (cascading.tuple Fields)
(cascading.pipe Pipe)
(cascading.clojure ClojureFilter ClojureMap ClojureMapcat
- ClojureAggregator Util)))
+ ClojureAggregator ClojureBuffer Util)))
(deftest test-ns-fn-name-pair
(let [[ns-name fn-name] (c/ns-fn-name-pair #'str)]
@@ -94,14 +94,19 @@
m2 (ClojureMapcat. (c/fields "num") (c/fn-spec #'iterate-inc))]
(are [m] (= [[2] [3] [4]] (t/invoke-function m [1])) m1 m2)))
-(defn sum
- ([]
- 0)
- ([mem v]
- (+ mem v))
- ([mem]
- [mem]))
+(def sum (c/agg + 0))
(deftest test-clojure-aggregator
(let [a (ClojureAggregator. (c/fields "sum") (c/fn-spec #'sum))]
(is (= [[6]] (t/invoke-aggregator a [[1] [2] [3]])))))
+
+(defn buff
+  "Buffer fn used by the tests: for every tuple read from iterator `it`,
+  emits a one-element tuple holding 1 plus the sum of that tuple's values."
+  [it]
+  (clojure.core/map (fn [tuple] [(apply + 1 tuple)])
+                    (c/tuple-seq it)))
+
+; TODO: notice Buffer expects a fn that takes an iterator and returns a seq of
+; tuples. If we want to return only a single tuple, then we need to wrap the
+; tuple in a seq.
+(deftest test-clojure-buffer ; buff turns each input tuple [n] into [(+ 1 n)], one output per input
+ (let [a (ClojureBuffer. (c/fields "sum") (c/fn-spec #'buff))]
+ (is (= [[2][3][4]] (t/invoke-buffer a [[1] [2] [3]])))))
@@ -0,0 +1,48 @@
+(ns cascading.clojure.buffer-test
+ (:use clojure.test
+ cascading.clojure.testing)
+ (:import (cascading.tuple Fields)
+ (cascading.pipe Pipe)
+ (cascading.clojure Util ClojureMap))
+ (:require [clj-json :as json])
+ (:require [clojure.contrib.duck-streams :as ds])
+ (:require [clojure.contrib.java-utils :as ju])
+ (:require (cascading.clojure [api :as c])))
+
+(defn- max-by
+  "Returns the element of `coll` whose (keyfn element) is largest.
+  On a tie the later element wins, because the running best is only kept
+  when its key is strictly greater than the candidate's."
+  [keyfn coll]
+  (reduce (fn [best candidate]
+            (if (> (keyfn best) (keyfn candidate))
+              best
+              candidate))
+          coll))
+
+(defn maxbuff
+  "Buffer fn: reads the tuples from iterator `it` and returns a one-element
+  list holding the tuple with the largest second value."
+  [it]
+  (let [winner (max-by second (c/tuple-seq it))]
+    (list winner)))
+
+(deftest buffer-max-for-each-group ; per "word" group, maxbuff emits only the tuple with the largest subcount
+ (test-flow
+ (in-pipes ["word" "subcount"])
+ (in-tuples [["bar" 1] ["bat" 7] ["bar" 3] ["bar" 2] ["bat" 4]])
+ (fn [in] (-> in
+ (c/group-by "word")
+ (c/buffer #'maxbuff)))
+ [["bar" 3] ["bat" 7]]))
+
+;;Note that you can not walk the tuple iterator more than once
+;;but you can hold on to the seq and walk that more than once.
+(defn maxpairs
+  "Buffer fn: finds the tuple with the largest second value among those read
+  from iterator `it`, then returns every other tuple with that max tuple
+  appended. Tuples equal to the max tuple are dropped from the output."
+  [it]
+  (let [rows (c/tuple-seq it)
+        top (max-by second rows)]
+    (for [row rows
+          :when (not (= row top))]
+      (concat row top))))
+
+(deftest buffer-max-and-pair ; per "word" group, each non-max tuple is emitted with the group's max tuple appended
+ (test-flow
+ (in-pipes ["word" "subcount"])
+ (in-tuples [["bar" 1] ["bat" 7] ["bar" 3] ["bar" 2] ["bat" 4]])
+ (fn [in] (-> in
+ (c/group-by "word")
+ (c/buffer [["word" "subcount" "maxword" "maxsubcount"]
+ #'maxpairs])))
+ [["bar" 1 "bar" 3]
+ ["bar" 2 "bar" 3]
+ ["bat" 4 "bat" 7]]))
@@ -28,8 +28,8 @@
(in-pipes ["x" "y" "foo"])
(in-tuples [[2 3 "blah"] [7 3 "blah"]])
(fn [in] (-> in (c/map ["x" "y"]
- ["sum" #'+]
- ["sum"])))
+ ["sum" #'+]
+ ["sum"])))
[[5] [10]]))
(defn extract-key
@@ -44,13 +44,7 @@
(fn [in] (-> in (c/map "val" #'extract-key ["key" "num"])))
[["bar" 1] ["ban" 2]]))
-(defn sum
- ([]
- 0)
- ([mem v]
- (+ mem v))
- ([mem]
- [mem]))
+(def sum (c/agg + 0))
(deftest aggreate-test
(test-flow
Oops, something went wrong.

0 comments on commit 2781216

Please sign in to comment.