Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

merging in new parse-args from nm-fn-api

  • Loading branch information...
commit f480c9a090f980ee2ec5543e4ba9c8670bed4f59 1 parent 7d62384
Bradford Cross authored
View
98 src/clj/cascading/clojure/api.clj
@@ -1,6 +1,7 @@
(ns cascading.clojure.api
(:refer-clojure :exclude (count first filter mapcat map))
(:use [clojure.contrib.seq-utils :only [find-first indexed]])
+ (:use cascading.clojure.parse)
(:require (clj-json [core :as json]))
(:import (cascading.tuple Tuple TupleEntry Fields)
(cascading.scheme TextLine)
@@ -24,42 +25,6 @@
(java.io File)
(java.lang RuntimeException)))
-(defn ns-fn-name-pair [v]
- (let [m (meta v)]
- [(str (:ns m)) (str (:name m))]))
-
-(defn fn-spec [v-or-coll]
- "v-or-coll => var or [var & params]
- Returns an Object array that is used to represent a Clojure function.
- If the argument is a var, the array represents that function.
- If the argument is a coll, the array represents the function returned
- by applying the first element, which should be a var, to the rest of the
- elements."
- (cond
- (var? v-or-coll)
- (into-array Object (ns-fn-name-pair v-or-coll))
- (coll? v-or-coll)
- (into-array Object
- (concat
- (ns-fn-name-pair (clojure.core/first v-or-coll))
- (next v-or-coll)))
- :else
- (throw (IllegalArgumentException. (str v-or-coll)))))
-
-(defn- collectify [obj]
- (if (sequential? obj) obj [obj]))
-
-(defn fields
- {:tag Fields}
- [obj]
- (if (or (nil? obj) (instance? Fields obj))
- obj
- (Fields. (into-array String (collectify obj)))))
-
-(defn fields-array
- [fields-seq]
- (into-array Fields (clojure.core/map fields fields-seq)))
-
(defn pipes-array
[pipes]
(into-array Pipe pipes))
@@ -70,67 +35,6 @@
[pipe-or-pipes] pipe-or-pipes)]
(into-array Pipe pipes)))
-(defn- fields-obj? [obj]
- "Returns true for a Fileds instance, a string, or an array of strings."
- (or
- (instance? Fields obj)
- (string? obj)
- (and (sequential? obj) (every? string? obj))))
-
-(defn- idx-of-first [coll pred]
- (clojure.core/first (find-first #(pred (last %)) (indexed coll))))
-
-; in-fields: subset of fields from incoming pipe that are passed to function
-; defaults to Fields/ALL
-; func-fields: fields declared to be returned by the function
-; defaults to Fields/ARGS
-; out-fields: subset of (union in-fields func-fields) that flow out of the pipe
-; defaults to Fields/RESULTS
-
-; Regarding the operation's fields declarations, if you resort to
-; Fields/UNKNOWN, you will lose all your field names. You should default the
-; fields declarations to Fields/ARGS. This means you emit the same fields that
-; you take in via the input selector - a common scenario. If you want to add new
-; fields or change them, you must explicitly declare them. Regarding input
-; selectors, Fields/ALL is the logical default. Chris says that - regarding the
-; output selector - we might tend to use Fields/RESULTS more for each and
-; Fields/ALL more for every, but perhaps Fields/RESULTS is the best fit for our
-; way of thinking.
-(defn- parse-func [func-obj]
- "func-obj =>
- #'func
- [#'func]
- [override-fields #'func]
- [#'func & params]
- [override-fields #'func & params]"
- (let [func-obj (collectify func-obj)
- i (idx-of-first func-obj var?)]
- (assert (<= i 1))
- (let [spec (fn-spec (drop i func-obj))
- func-var (nth func-obj i)
- func-fields (or (and (= i 1) (clojure.core/first func-obj))
- ((meta func-var) :fields))
- function-fields (if func-fields (fields func-fields) Fields/ARGS)]
- [function-fields spec])))
-
-(defn- parse-args
- "arr =>
- [func-obj]
- [in-fields func-obj]
- [in-fields func-obj out-fields]
- [func-obj out-fields]"
- [arr]
- (let [i (idx-of-first arr (complement fields-obj?))]
- (assert (<= i 1))
- (let [in-fields (if (= i 1)
- (fields (clojure.core/first arr))
- Fields/ALL)
- [func-fields spec] (parse-func (nth arr i))
- out-fields (if (< i (dec (clojure.core/count arr)))
- (fields (last arr))
- Fields/RESULTS)]
- [in-fields func-fields spec out-fields])))
-
(defn- uuid []
(str (UUID/randomUUID)))
View
45 src/clj/cascading/clojure/parse.clj
@@ -19,7 +19,7 @@
(coll? v-or-coll)
(into-array Object
(concat
- (ns-fn-name-pair (clojure.core/first v-or-coll))
+ (ns-fn-name-pair (first v-or-coll))
(next v-or-coll)))
:else
(throw (IllegalArgumentException. (str v-or-coll)))))
@@ -36,7 +36,7 @@
(defn fields-array
[fields-seq]
- (into-array Fields (clojure.core/map fields fields-seq)))
+ (into-array Fields (map fields fields-seq)))
(defn- fields-obj? [obj]
"Returns true for a Fileds instance, a string, or an array of strings."
@@ -46,7 +46,7 @@
(and (sequential? obj) (every? string? obj))))
(defn- idx-of-first [aseq pred]
- (clojure.core/first (find-first #(pred (last %)) (indexed aseq))))
+ (first (find-first #(pred (last %)) (indexed aseq))))
(defn parse-args
"
@@ -57,16 +57,29 @@
([arr] (parse-args arr Fields/RESULTS))
([arr defaultout]
(let
- [func-args (clojure.core/first arr)
- varargs (rest arr)
- spec (fn-spec func-args)
- func-var (if (var? func-args) func-args (clojure.core/first func-args))
- first-elem (clojure.core/first varargs)
- [in-fields keyargs] (if (or (nil? first-elem)
- (keyword? first-elem))
- [Fields/ALL (apply hash-map varargs)]
- [(fields (clojure.core/first varargs))
- (apply hash-map (rest varargs))])
- options (merge {:fn> (:fields (meta func-var)) :> defaultout} keyargs)
- result [in-fields (fields (:fn> options)) spec (fields (:> options))]]
- result )))
+ [func-args (first arr)
+ varargs (rest arr)
+ spec (fn-spec func-args)
+ func-var (if (var? func-args)
+ func-args
+ (first func-args))
+ defaults {:fn> (or
+ (:fields (meta func-var))
+ Fields/ARGS)
+ :fn spec
+ :> defaultout}
+ first-elem (first varargs)
+ keyargs (if (or (nil? first-elem)
+ (keyword? first-elem))
+ (apply hash-map
+ (concat [:in Fields/ALL]
+ varargs))
+ (apply hash-map
+ (concat [:in (first varargs)]
+ (rest varargs))))
+ options (merge defaults keyargs)
+ fieldsized (into {} (for [[k v] options]
+ (if (= k :fn)
+ [k v]
+ [k (fields v)])))]
+ (vals fieldsized))))
View
3  src/clj/cascading/clojure/testing.clj
@@ -96,8 +96,7 @@
(defn in-pipe [in-label in-fields]
(-> (c/pipe in-label)
- (c/map [in-fields
- #'deserialize-tuple])))
+ (c/map #'deserialize-tuple :fn> in-fields)))
(defn in-pipes [fields-spec]
(if (not (map? fields-spec))
View
30 test/cascading/clojure/api_test.clj
@@ -1,13 +1,15 @@
(ns cascading.clojure.api-test
(:use clojure.test)
- (:require (cascading.clojure [api :as c] [testing :as t]))
+ (:require (cascading.clojure [api :as c]
+ [testing :as t]
+ [parse :as p]))
(:import (cascading.tuple Fields)
(cascading.pipe Pipe)
(cascading.clojure ClojureFilter ClojureMap ClojureMapcat
ClojureAggregator ClojureBuffer Util)))
(deftest test-ns-fn-name-pair
- (let [[ns-name fn-name] (c/ns-fn-name-pair #'str)]
+ (let [[ns-name fn-name] (p/ns-fn-name-pair #'str)]
(is (= "clojure.core" ns-name))
(is (= "str" fn-name))))
@@ -26,12 +28,12 @@
[(inc num1) (inc num2)])
(deftest test-fn-spec-simple
- (let [fs (c/fn-spec #'inc1)]
+ (let [fs (p/fn-spec #'inc1)]
(is (instance? obj-array-class fs))
(is (= '("cascading.clojure.api-test" "inc1") (seq fs)))))
(deftest test-fn-spec-hof
- (let [fs (c/fn-spec [#'incn 3])]
+ (let [fs (p/fn-spec [#'incn 3])]
(is (instance? obj-array-class fs))
(is (= `("cascading.clojure.api-test" "incn" 3) (seq fs)))))
@@ -46,12 +48,12 @@
(is (= [4] (f 1)))))
(deftest test-1-field
- (let [f1 (c/fields "foo")]
+ (let [f1 (p/fields "foo")]
(is (instance? Fields f1))
(is (= '("foo") (seq f1)))))
(deftest test-n-fields
- (let [f2 (c/fields ["foo" "bar"])]
+ (let [f2 (p/fields ["foo" "bar"])]
(is (instance? Fields f2))
(is (= `("foo" "bar") (seq f2)))))
@@ -66,17 +68,17 @@
(is (= "foo" (.getName np)))))
(deftest test-clojure-filter
- (let [fil (ClojureFilter. (c/fn-spec #'odd?))]
+ (let [fil (ClojureFilter. (p/fn-spec #'odd?))]
(is (= false (t/invoke-filter fil [1])))
(is (= true (t/invoke-filter fil [2])))))
(deftest test-clojure-map-one-field
- (let [m1 (ClojureMap. (c/fields "num") (c/fn-spec #'inc-wrapped))
- m2 (ClojureMap. (c/fields "num") (c/fn-spec #'inc))]
+ (let [m1 (ClojureMap. (p/fields "num") (p/fn-spec #'inc-wrapped))
+ m2 (ClojureMap. (p/fields "num") (p/fn-spec #'inc))]
(are [m] (= [[2]] (t/invoke-function m [1])) m1 m2)))
(deftest test-clojure-map-multiple-fields
- (let [m (ClojureMap. (c/fields ["num1" "num2"]) (c/fn-spec #'inc-both))]
+ (let [m (ClojureMap. (p/fields ["num1" "num2"]) (p/fn-spec #'inc-both))]
(is (= [[2 3]] (t/invoke-function m [1 2])))))
(defn iterate-inc-wrapped [num]
@@ -86,14 +88,14 @@
(list (+ num 1) (+ num 2) (+ num 3)))
(deftest test-clojure-mapcat-one-field
- (let [m1 (ClojureMapcat. (c/fields "num") (c/fn-spec #'iterate-inc-wrapped))
- m2 (ClojureMapcat. (c/fields "num") (c/fn-spec #'iterate-inc))]
+ (let [m1 (ClojureMapcat. (p/fields "num") (p/fn-spec #'iterate-inc-wrapped))
+ m2 (ClojureMapcat. (p/fields "num") (p/fn-spec #'iterate-inc))]
(are [m] (= [[2] [3] [4]] (t/invoke-function m [1])) m1 m2)))
(def sum (c/agg + 0))
(deftest test-clojure-aggregator
- (let [a (ClojureAggregator. (c/fields "sum") (c/fn-spec #'sum))]
+ (let [a (ClojureAggregator. (p/fields "sum") (p/fn-spec #'sum))]
(is (= [[6]] (t/invoke-aggregator a [[1] [2] [3]])))))
(defn buff [it]
@@ -104,5 +106,5 @@
; tuples. if we want to return only a single tuple, then we need to wrap the
; tuple in a seq.
(deftest test-clojure-buffer
- (let [a (ClojureBuffer. (c/fields "sum") (c/fn-spec #'buff))]
+ (let [a (ClojureBuffer. (p/fields "sum") (p/fn-spec #'buff))]
(is (= [[2][3][4]] (t/invoke-buffer a [[1] [2] [3]])))))
View
6 test/cascading/clojure/buffer_test.clj
@@ -40,8 +40,8 @@
(in-tuples [["bar" 1] ["bat" 7] ["bar" 3] ["bar" 2] ["bat" 4]])
(fn [in] (-> in
(c/group-by "word")
- (c/buffer [["word" "subcount" "maxword" "maxsubcount"]
- #'maxpairs])))
+ (c/buffer #'maxpairs
+ :fn> ["word" "subcount" "maxword" "maxsubcount"])))
[["bar" 1 "bar" 3]
["bar" 2 "bar" 3]
- ["bat" 4 "bat" 7]]))
+ ["bat" 4 "bat" 7]]))
View
10 test/cascading/clojure/flow_test.clj
@@ -27,9 +27,7 @@
(test-flow
(in-pipes ["x" "y" "foo"])
(in-tuples [[2 3 "blah"] [7 3 "blah"]])
- (fn [in] (-> in (c/map ["x" "y"]
- ["sum" #'+]
- ["sum"])))
+ (fn [in] (-> in (c/map #'+ ["x" "y"] :fn> "sum" :> "sum")))
[[5] [10]]))
(defn extract-key
@@ -41,7 +39,7 @@
(test-flow
(in-pipes ["val" "num"])
(in-tuples [["foo(bar)bat" 1] ["biz(ban)hat" 2]])
- (fn [in] (-> in (c/map "val" #'extract-key ["key" "num"])))
+ (fn [in] (-> in (c/map #'extract-key "val" :> ["key" "num"])))
[["bar" 1] ["ban" 2]]))
(def sum (c/agg + 0))
@@ -52,7 +50,7 @@
(in-tuples [["bar" 1] ["bat" 2] ["bar" 3] ["bar" 2] ["bat" 1]])
(fn [in] (-> in
(c/group-by "word")
- (c/aggregate ["subcount"] ["count" #'sum] ["word" "count"])))
+ (c/aggregate #'sum "subcount" :fn> "count" :> ["word" "count"])))
[["bar" 6] ["bat" 3]]))
(defn transform
@@ -66,7 +64,7 @@
sink (temp-path "sink")]
(let [lines [{"name" "foo" "age" 23} {"name" "bar" "age" 14}]]
(write-lines-in source "source.data" (map json/generate-string lines))
- (let [trans (-> (c/pipe "j") (c/map ["name" "age"] #'transform))
+ (let [trans (-> (c/pipe "j") (c/map #'transform ["name" "age"]))
flow (c/flow
{"j" (c/lfs-tap (c/json-map-line ["name" "age"]) source)}
(c/lfs-tap (c/json-map-line ["up-name" "inc-age"]) sink)
View
17 test/cascading/clojure/parse_test.clj
@@ -1,7 +1,7 @@
(ns cascading.clojure.parse-test
(:use clojure.test)
(:import (cascading.tuple Fields))
- (:use cascading.clojure.parse))
+ (:use [cascading.clojure parse testing]))
(defn example [x] x)
@@ -16,12 +16,21 @@
(deftest parse-everything
(is (= [(fields ["foo"])
- (fields ["bar"])
+ (fields ["bar"])
["cascading.clojure.parse-test" "example"]
(fields ["baz"])]
(extract-obj-array
(parse-args [#'example "foo" :fn> "bar" :> "baz"])))))
+(deftest parse-everything-multiple-ins
+ (is (= [(fields ["foo" "bat"])
+ (fields ["bar"])
+ ["cascading.clojure.parse-test" "example"]
+ (fields ["baz"])]
+ (extract-obj-array
+ (parse-args [#'example ["foo" "bat"]
+ :fn> "bar" :> "baz"])))))
+
(deftest parse-no-input-selectors
(is (= [Fields/ALL
(fields ["bar"])
@@ -40,8 +49,8 @@
(deftest parse-no-input-or-output-or-fn-selectors
(is (= [Fields/ALL
- nil
+ Fields/ARGS
["cascading.clojure.parse-test" "example"]
Fields/RESULTS]
(extract-obj-array
- (parse-args [#'example])))))
+ (parse-args [#'example])))))
View
4 test/cascading/clojure/scenarios.clj
@@ -80,8 +80,8 @@
(in-tuples {"p1" [[0 1 5] [2 1 6] [0 1 7] [2 1 9]]
"p2" [[0 0 1] [2 2 1] [0 2 1] [3 0 1]]})
(fn [{p1 "p1" p2 "p2"}]
- (let [new-left (c/extract p1 ["dualgroup" #'x-and-y])
- new-right (c/extract p2 ["dualgroup" #'y-and-z])]
+ (let [new-left (c/extract p1 #'x-and-y :fn> "dualgroup")
+ new-right (c/extract p2 #'y-and-z :fn> "dualgroup")]
(c/group-by [new-left new-right]
["dualgroup"] ["z"])))
[[0 0 1 "01"] [3 0 1 "01"] [0 1 5 "01"] [0 1 7 "01"]
Please sign in to comment.
Something went wrong with that request. Please try again.