Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

groupby with multiple pipes

  • Loading branch information...
commit 896af9683dc42eb693e0e814c5e3894fa5330aa2 1 parent 228a66f
Bradford Cross authored
View
21 src/clj/cascading/clojure/api.clj
@@ -6,6 +6,7 @@
(cascading.scheme TextLine)
(cascading.flow Flow FlowConnector)
(cascading.operation Identity)
+ (cascading.operation.filter Limit)
(cascading.operation.regex RegexGenerator RegexFilter)
(cascading.operation.aggregator First Count)
(cascading.pipe Pipe Each Every GroupBy CoGroup)
@@ -63,6 +64,12 @@
[pipes]
(into-array Pipe pipes))
+(defn as-pipes
+  "Coerces a single Pipe, or a collection of Pipes, into a Pipe array."
+  [pipe-or-pipes]
+  (into-array Pipe
+              (if (instance? Pipe pipe-or-pipes)
+                [pipe-or-pipes]
+                pipe-or-pipes)))
+
(defn- fields-obj? [obj]
"Returns true for a Fields instance, a string, or an array of strings."
(or
@@ -150,6 +157,12 @@
(Each. previous in-fields
(ClojureMap. func-fields spec) out-fields)))
+(defn extract
+  "A map operation that adds newly computed fields to each tuple.
+
+  `args` are handed to `parse-args` to obtain the input fields, the function
+  fields and the function spec. Any out-fields supplied in `args` are ignored:
+  the output selector is always Fields/ALL, so the incoming fields are kept
+  alongside the fields produced by the function."
+  [#^Pipe previous & args]
+  ;; The fourth element returned by parse-args (out-fields) is deliberately
+  ;; left unbound because the output selector is fixed below.
+  (let [[#^Fields in-fields func-fields spec] (parse-args args)]
+    (Each. previous in-fields
+           (ClojureMap. func-fields spec) Fields/ALL)))
+
(defn agg [f init]
"A combinator that takes a fn and an init value and returns a reduce aggregator."
(fn ([] init)
@@ -171,12 +184,12 @@
(clojure.core/map #(Util/coerceFromTuple (.getTuple %)) (iterator-seq it)))
;; `previous` may be a single Pipe or a collection of Pipes: every arity passes
;; it through `as-pipes`, so GroupBy always receives a Pipe array.
(defn group-by
- ([#^Pipe previous group-fields]
- (GroupBy. previous (fields group-fields)))
+ ([previous group-fields]
+ (GroupBy. (as-pipes previous) (fields group-fields)))
([previous group-fields sort-fields]
- (GroupBy. previous (fields group-fields) (fields sort-fields)))
+ (GroupBy. (as-pipes previous) (fields group-fields) (fields sort-fields)))
([previous group-fields sort-fields reverse-order]
- (GroupBy. previous (fields group-fields) (fields sort-fields) reverse-order)))
+ (GroupBy. (as-pipes previous) (fields group-fields) (fields sort-fields) reverse-order)))
(defn first
  "Returns an Every over `previous` that uses a First aggregator."
  [#^Pipe previous]
  (Every. previous (First.)))
View
22 test/cascading/clojure/scenarios.clj
@@ -65,4 +65,24 @@
["x1" "y1" "num1"
"x2" "y2" "num2"])]
(c/select joined ["num1" "num2"])))
- [[5 nil] [7 nil] [9 5] [9 6] [9 7] [6 nil]]))
+ [[5 nil] [7 nil] [9 5] [9 6] [9 7] [6 nil]]))
+
+(defn x-and-y
+  "Builds a key by concatenating the string forms of x and y; z is ignored."
+  [x y z]
+  (str x y))
+
+(defn y-and-z
+  "Builds a key by concatenating the string forms of y and z; x is ignored."
+  [x y z]
+  (str y z))
+
+(deftest merging-different-groups
+  ;; Each input pipe derives a "dualgroup" key from a different pair of its
+  ;; fields (x+y for p1, y+z for p2); both pipes are then fed to a single
+  ;; group-by on that key, with "z" as the sort field.
+  (test-flow
+    (in-pipes {"p1" ["x" "y" "z"]
+               "p2" ["x" "y" "z"]})
+    (in-tuples {"p1" [[0 1 5] [2 1 6] [0 1 7] [2 1 9]]
+                "p2" [[0 0 1] [2 2 1] [0 2 1] [3 0 1]]})
+    (fn [{left "p1" right "p2"}]
+      (c/group-by [(c/extract left ["dualgroup" #'x-and-y])
+                   (c/extract right ["dualgroup" #'y-and-z])]
+                  ["dualgroup"] ["z"]))
+    [[0 0 1 "01"] [3 0 1 "01"] [0 1 5 "01"] [0 1 7 "01"]
+     [2 2 1 "21"] [0 2 1 "21"] [2 1 6 "21"] [2 1 9 "21"]]))
Please sign in to comment.
Something went wrong with that request. Please try again.