Permalink
Browse files

merge w/ clj-sys/master

  • Loading branch information...
2 parents 467cd48 + 891a233 commit d17131965590a545eed403ebd0ae73f374313f13 @waratuman committed Mar 9, 2010
View
@@ -1,10 +1,8 @@
-cascading-clojure
-=================
+# cascading-clojure
A Clojure library for [Cascading](http://cascading.org).
-Hacking
--------
+## Hacking
Get [Leiningen](http://github.com/technomancy/leiningen) 1.1.0.
View
@@ -1,9 +0,0 @@
-foo 2
-bar 1
-bat 3
-foo 3
-bar 5
-bat 1
-biz 7
-bar 2
-foo 3
View
@@ -1,2 +0,0 @@
-foo bar bat
-bat biz bar
View
@@ -1 +0,0 @@
-bar biz foo
View
@@ -7,6 +7,8 @@
[cascading/cascading "1.0.17-SNAPSHOT"
:exclusions [javax.mail/mail janino/janino]]
[clj-serializer "0.1.0-SNAPSHOT"]
- [clj-json "0.2.0-SNAPSHOT"]]
- :dev-dependencies [[lein-javac "0.0.2-SNAPSHOT"]
- [org.clojure/swank-clojure "1.0"]])
+ [clj-json "0.2.0"]]
+ :dev-dependencies [[org.clojars.mmcgrana/lein-javac "0.1.0"]
+ [swank-clojure "1.1.0"]]
+ :namespaces [cascading.clojure.api
+ cascading.clojure.testing])
@@ -1,11 +1,13 @@
(ns cascading.clojure.api
(:refer-clojure :exclude (count first filter mapcat map))
- (:use [clojure.contrib.seq-utils :only [find-first indexed]])
- (:require (clj-json [core :as json]))
+ (:use [clojure.contrib.seq-utils :only (find-first indexed)]
+ (cascading.clojure parse))
+ (:require [clj-json.core :as json])
(:import (cascading.tuple Tuple TupleEntry Fields)
(cascading.scheme TextLine)
(cascading.flow Flow FlowConnector)
(cascading.operation Identity)
+ (cascading.operation.filter Limit)
(cascading.operation.regex RegexGenerator RegexFilter)
(cascading.operation.aggregator First Count)
(cascading.pipe Pipe Each Every GroupBy CoGroup)
@@ -23,106 +25,16 @@
(java.io File)
(java.lang RuntimeException)))
-(defn ns-fn-name-pair [v]
- (let [m (meta v)]
- [(str (:ns m)) (str (:name m))]))
-
-(defn fn-spec [v-or-coll]
- "v-or-coll => var or [var & params]
- Returns an Object array that is used to represent a Clojure function.
- If the argument is a var, the array represents that function.
- If the argument is a coll, the array represents the function returned
- by applying the first element, which should be a var, to the rest of the
- elements."
- (cond
- (var? v-or-coll)
- (into-array Object (ns-fn-name-pair v-or-coll))
- (coll? v-or-coll)
- (into-array Object
- (concat
- (ns-fn-name-pair (clojure.core/first v-or-coll))
- (next v-or-coll)))
- :else
- (throw (IllegalArgumentException. (str v-or-coll)))))
-
-(defn- collectify [obj]
- (if (sequential? obj) obj [obj]))
-
-(defn fields
- {:tag Fields}
- [obj]
- (if (or (nil? obj) (instance? Fields obj))
- obj
- (Fields. (into-array String (collectify obj)))))
-
-(defn fields-array
- [fields-seq]
- (into-array Fields (clojure.core/map fields fields-seq)))
-
(defn pipes-array
  "Copies the given collection of pipes into a Java array whose element
  type is Pipe."
  [pipes]
  (->> pipes
       (into-array Pipe)))
-(defn- fields-obj? [obj]
- "Returns true for a Fileds instance, a string, or an array of strings."
- (or
- (instance? Fields obj)
- (string? obj)
- (and (sequential? obj) (every? string? obj))))
-
-(defn- idx-of-first [coll pred]
- (clojure.core/first (find-first #(pred (last %)) (indexed coll))))
-
-; in-fields: subset of fields from incoming pipe that are passed to function
-; defaults to Fields/ALL
-; func-fields: fields declared to be returned by the function
-; defaults to Fields/ARGS
-; out-fields: subset of (union in-fields func-fields) that flow out of the pipe
-; defaults to Fields/RESULTS
-
-; Regarding the operation's fields declarations, if you resort to
-; Fields/UNKNOWN, you will lose all your field names. You should deafult the
-; fields declarations to Fields/ARGS. This means you emit the same fields that
-; you take in via the input selector - a common scenario. If you want to add new
-; fields or change them, you must explicitly declare them. Regarding input
-; selectors, Fields/All is the logical default. Chris says that - regarding the
-; outputselector - we might tend to use Fields/RESULTS more for each and
-; Fields/All more for every, but perhaps fields/RESULTS is the best fit for out
-; way of thinking.
-(defn- parse-func [func-obj]
- "func-obj =>
- #'func
- [#'func]
- [override-fields #'func]
- [#'func & params]
- [override-fields #'func & params]"
- (let [func-obj (collectify func-obj)
- i (idx-of-first func-obj var?)]
- (assert (<= i 1))
- (let [spec (fn-spec (drop i func-obj))
- func-var (nth func-obj i)
- func-fields (or (and (= i 1) (clojure.core/first func-obj))
- ((meta func-var) :fields))
- function-fields (if func-fields (fields func-fields) Fields/ARGS)]
- [function-fields spec])))
-
-(defn- parse-args
- "arr =>
- [func-obj]
- [in-fields func-obj]
- [in-fields func-obj out-fields]
- [func-obj out-fields]"
- [arr]
- (let [i (idx-of-first arr (complement fields-obj?))]
- (assert (<= i 1))
- (let [in-fields (if (= i 1)
- (fields (clojure.core/first arr))
- Fields/ALL)
- [func-fields spec] (parse-func (nth arr i))
- out-fields (if (< i (dec (clojure.core/count arr)))
- (fields (last arr))
- Fields/RESULTS)]
- [in-fields func-fields spec out-fields])))
+(defn as-pipes
+  "Accepts either a single Pipe or a collection of Pipes and returns a Pipe
+  array. A lone Pipe is wrapped in a one-element vector before being handed
+  to pipes-array; any other argument is handed over unchanged."
+  [pipe-or-pipes]
+  (let [pipe-coll (if-not (instance? Pipe pipe-or-pipes)
+                    pipe-or-pipes
+                    [pipe-or-pipes])]
+    (pipes-array pipe-coll)))
(defn- uuid
  "Returns the string form of a freshly generated random java.util.UUID."
  []
  (.toString (UUID/randomUUID)))
@@ -136,19 +48,25 @@
(Pipe. name)))
+;; filter: parses `args` with parse-args, then constructs an Each from
+;; `previous`, the parsed :< value, and a ClojureFilter built from the parsed
+;; :fn-spec. The :fn> and :> options are not used here.
(defn filter [#^Pipe previous & args]
-(let [[#^Fields in-fields _ spec _] (parse-args args)]
-(Each. previous in-fields
-(ClojureFilter. spec))))
+(let [opts (parse-args args)]
+(Each. previous (:< opts)
+(ClojureFilter. (:fn-spec opts)))))
+;; mapcat: parses `args` with parse-args, then constructs an Each from
+;; `previous`, the parsed :< value, a ClojureMapcat built from the parsed :fn>
+;; and :fn-spec values, and the parsed :> value.
(defn mapcat [#^Pipe previous & args]
-(let [[#^Fields in-fields func-fields spec #^Fields out-fields] (parse-args args)]
-(Each. previous in-fields
-(ClojureMapcat. func-fields spec) out-fields)))
+(let [opts (parse-args args)]
+(Each. previous (:< opts)
+(ClojureMapcat. (:fn> opts) (:fn-spec opts)) (:> opts))))
+;; map: parses `args` with parse-args, then constructs an Each from
+;; `previous`, the parsed :< value, a ClojureMap built from the parsed :fn>
+;; and :fn-spec values, and the parsed :> value.
(defn map [#^Pipe previous & args]
-(let [[#^Fields in-fields func-fields spec #^Fields out-fields] (parse-args args)]
-(Each. previous in-fields
-(ClojureMap. func-fields spec) out-fields)))
+(let [opts (parse-args args)]
+(Each. previous (:< opts)
+(ClojureMap. (:fn> opts) (:fn-spec opts)) (:> opts))))
+
+(defn extract
+  "A map operation that extracts a new field, thus returning Fields/ALL.
+  `args` are parsed with parse-args exactly as for map; the parsed :< value
+  selects the input and the parsed :fn> and :fn-spec values build the
+  ClojureMap. Any :> option is ignored: Fields/ALL is always passed as the
+  final argument to Each."
+  ;; The docstring must precede the parameter vector. Placed after it, the
+  ;; string is just a discarded body expression and never becomes :doc metadata.
+  [#^Pipe previous & args]
+  (let [opts (parse-args args)]
+    (Each. previous (:< opts)
+           (ClojureMap. (:fn> opts) (:fn-spec opts)) Fields/ALL)))
(defn agg [f init]
"A combinator that takes a fn and an init value and returns a reduce aggregator."
@@ -157,21 +75,22 @@
([x y] (f x y))))
+;; aggregate: parses `args` with parse-args, then constructs an Every from
+;; `previous`, the parsed :< value, a ClojureAggregator built from the parsed
+;; :fn> and :fn-spec values, and the parsed :> value.
(defn aggregate [#^Pipe previous & args]
-(let [[#^Fields in-fields func-fields specs #^Fields out-fields] (parse-args args)]
-(Every. previous in-fields
-(ClojureAggregator. func-fields specs) out-fields)))
+(let [opts (parse-args args)]
+(Every. previous (:< opts)
+(ClojureAggregator. (:fn> opts) (:fn-spec opts)) (:> opts))))
+;; buffer: parses `args` with parse-args, then constructs an Every from
+;; `previous`, the parsed :< value, a ClojureBuffer built from the parsed
+;; :fn> and :fn-spec values, and the parsed :> value.
(defn buffer [#^Pipe previous & args]
-(let [[#^Fields in-fields func-fields specs #^Fields out-fields] (parse-args args)]
-(Every. previous in-fields
-(ClojureBuffer. func-fields specs) out-fields)))
-
-(defn tuple-seq [it]
-"Takes Iterator<TupleEntry> and returns seq of tuples coerced to vectors."
-(clojure.core/map #(Util/coerceFromTuple (.getTuple %)) (iterator-seq it)))
-
-(defn group-by [#^Pipe previous group-fields]
-(GroupBy. previous (fields group-fields)))
+(let [opts (parse-args args)]
+(Every. previous (:< opts)
+(ClojureBuffer. (:fn> opts) (:fn-spec opts)) (:> opts))))
+
+(defn group-by
+  "Constructs a GroupBy. `previous` may be one Pipe or a collection of Pipes;
+  it is normalized to a Pipe array with as-pipes. group-fields, and
+  sort-fields when supplied, are coerced with fields. reverse-order, when
+  supplied, is passed to the GroupBy constructor unchanged."
+  ([previous group-fields]
+    (let [pipes    (as-pipes previous)
+          grouping (fields group-fields)]
+      (GroupBy. pipes grouping)))
+  ([previous group-fields sort-fields]
+    (let [pipes    (as-pipes previous)
+          grouping (fields group-fields)
+          sorting  (fields sort-fields)]
+      (GroupBy. pipes grouping sorting)))
+  ([previous group-fields sort-fields reverse-order]
+    (let [pipes    (as-pipes previous)
+          grouping (fields group-fields)
+          sorting  (fields sort-fields)]
+      (GroupBy. pipes grouping sorting reverse-order))))
(defn first
  "Constructs an Every from `previous`, the result of coercing in-fields with
  fields, and a new First instance."
  [#^Pipe previous in-fields]
  (let [selector (fields in-fields)]
    (Every. previous selector (First.))))
@@ -180,6 +99,10 @@
(Every. previous
(Count. (fields count-fields))))
+(defn- fields-array
+  "Coerces every element of fields-seq with fields and collects the results
+  into a Java array whose element type is Fields."
+  [fields-seq]
+  (->> fields-seq
+       (clojure.core/map fields)
+       (into-array Fields)))
+
(defn co-group
[pipes-seq fields-seq declared-fields joiner]
(CoGroup.
@@ -284,11 +207,10 @@
(let [props (Properties.)]
(when jar-path
(FlowConnector/setApplicationJarPath props jar-path))
+ (.setProperty props "mapred.used.genericoptionsparser" "true")
+ (.setProperty props "cascading.flow.job.pollinginterval" "200")
(doseq [[k v] config]
(.setProperty props k v))
- (.setProperty props "mapred.used.genericoptionsparser" "true")
- (.setProperty props "cascading.serialization.tokens"
- "130=cascading.clojure.ClojureWrapper")
(let [flow-connector (FlowConnector. props)]
(try
(.connect flow-connector source-map sink pipe)
@@ -0,0 +1,60 @@
+(ns cascading.clojure.parse
+ (:import (cascading.tuple Tuple TupleEntry Fields))
+ (:use [clojure.contrib.seq-utils :only [find-first indexed]]))
+
+(defn- collectify
+  "Returns obj unchanged when it is sequential; otherwise wraps it in a
+  one-element vector."
+  [obj]
+  (if-not (sequential? obj)
+    [obj]
+    obj))
+
+(defn- map-vals
+  "Returns a map with the same keys as m, where each value is the result of
+  applying f to the corresponding value in m."
+  [f m]
+  (into {}
+        (for [[k v] m]
+          [k (f v)])))
+
+(defn fields
+  "Coerces obj to a Fields instance. nil and existing Fields instances are
+  returned unchanged; anything else is passed through collectify, copied
+  into a String array, and handed to the Fields constructor."
+  {:tag Fields}
+  [obj]
+  (cond
+    (nil? obj)             obj
+    (instance? Fields obj) obj
+    :else                  (Fields. (into-array String (collectify obj)))))
+
+(defn- ns-fn-name-pair
+  "Returns a two-element vector holding the string forms of the :ns and :name
+  entries of fn-var's metadata."
+  [fn-var]
+  (let [{fn-ns :ns, fn-name :name} (meta fn-var)]
+    [(str fn-ns) (str fn-name)]))
+
+(defn- parse-fn-spec*
+  "Returns a pair [fn-var spec]. For a var argument, spec is its
+  ns-fn-name-pair. For a collection argument, fn-var is the collection's
+  first element and spec is that element's ns-fn-name-pair followed by the
+  remaining elements. Any other argument raises IllegalArgumentException
+  whose message is the argument's string form."
+  [v-or-c]
+  (if (var? v-or-c)
+    [v-or-c (ns-fn-name-pair v-or-c)]
+    (if (coll? v-or-c)
+      (let [fn-var (first v-or-c)
+            params (next v-or-c)]
+        [fn-var (concat (ns-fn-name-pair fn-var) params)])
+      (throw (IllegalArgumentException. (str v-or-c))))))
+
+(defn parse-fn-spec
+  "fn-var-or-coll => var or [var & params]
+  Produces the coll of Objects that serializably describes a function.
+  Given a Var, the coll describes that function itself. Given a coll, it
+  describes the function obtained by applying the coll's first element
+  (which should be a Var) to the remaining elements."
+  [fn-var-or-coll]
+  (let [[_ spec] (parse-fn-spec* fn-var-or-coll)]
+    spec))
+
+;; Default option values used by parse-args; they have the lowest priority in
+;; its merge, so var metadata and explicit options override them.
+;;   :<   (in-fields)   defaults to Fields/ALL
+;;   :fn> (func-fields) defaults to Fields/ARGS
+;;   :>   (out-fields)  defaults to Fields/RESULTS
+(def opt-defaults
+{:< Fields/ALL
+:fn> Fields/ARGS
+:> Fields/RESULTS})
+
+(defn parse-args
+  "arr => [fn-var-or-coll (:< in-fields)? (:fn> func-fields)? (:> out-fields)?]
+  The head of arr names the function (a var, or a coll starting with a var);
+  the remainder is read as alternating option keys and values. Options are
+  resolved by merging, lowest priority first: opt-defaults, the :fn> entry of
+  the var's metadata, then the explicit options. Every resolved value is
+  coerced with fields.
+  Returns a map with keys :fn-spec, :<, :fn>, and :>."
+  [arr]
+  (let [[fn-var-or-coll & opts] arr
+        [fn-var fn-spec]        (parse-fn-spec* fn-var-or-coll)
+        from-meta               (select-keys (meta fn-var) [:fn>])
+        explicit                (apply hash-map opts)
+        resolved                (merge opt-defaults from-meta explicit)]
+    (-> (map-vals fields resolved)
+        (assoc :fn-spec fn-spec))))
@@ -78,12 +78,12 @@
(read-string line))
+;; serialize-tuple: returns the pr-str rendering of `tuple`. The attr-map tags
+;; the var with :fn> "line" (previously :fields "line").
(defn- serialize-tuple
-{:fields "line"}
+{:fn> "line"}
[tuple]
(pr-str tuple))
+;; serialize-vals: collects its variadic arguments into a vector and returns
+;; the pr-str rendering of that vector. The attr-map tags the var with
+;; :fn> "line" (previously :fields "line").
(defn- serialize-vals
-{:fields "line"}
+{:fn> "line"}
[& vals]
(pr-str (vec vals)))
@@ -96,15 +96,14 @@
+;; in-pipe: creates a pipe named `in-label` with c/pipe and threads it through
+;; c/map with #'deserialize-tuple, passing `in-fields` as the :fn> option
+;; (previously passed as the leading element of a [fields var] vector).
(defn in-pipe [in-label in-fields]
(-> (c/pipe in-label)
- (c/map [in-fields
- #'deserialize-tuple])))
+ (c/map #'deserialize-tuple :fn> in-fields)))
+;; in-pipes: when `fields-spec` is not a map, returns a single in-pipe labeled
+;; "in" for it. When it is a map, each [in-label one-in-fields] entry is turned
+;; into an [in-label in-pipe] pair via `mash`.
+;; NOTE(review): `mash` is defined elsewhere; presumably it builds a map from
+;; the returned pairs -- confirm.
(defn in-pipes [fields-spec]
(if (not (map? fields-spec))
(in-pipe "in" fields-spec)
(mash (fn [[in-label one-in-fields]]
- [in-label (in-pipe in-label one-in-fields)])
- fields-spec)))
+ [in-label (in-pipe in-label one-in-fields)])
+ fields-spec)))
(defn in-tuples [tuples-spec]
(if (map? tuples-spec)
@@ -116,17 +115,17 @@
(with-tmp-files [source-dir-path (temp-dir "source")
sink-path (temp-path "sink")]
(doseq [[in-label in-tuples] in-tuples-spec]
- (write-lines-in source-dir-path in-label
- (map serialize-tuple in-tuples)))
- (let [assembly (-> in-pipes-spec
- assembler
- (c/map #'serialize-vals))
- source-tap-map (mash (fn [[in-label _]]
- [in-label
- (c/lfs-tap (c/text-line "line")
- (file source-dir-path in-label))])
- in-tuples-spec)
- sink-tap (c/lfs-tap (c/text-line "line") sink-path)
- flow (c/flow source-tap-map sink-tap assembly)
- out-tuples (line-sink-seq (.openSink (c/exec flow)))]
+ (write-lines-in source-dir-path in-label
+ (map serialize-tuple in-tuples)))
+ (let [assembly (-> in-pipes-spec
+ assembler
+ (c/map #'serialize-vals))
+ source-tap-map (mash (fn [[in-label _]]
+ [in-label
+ (c/lfs-tap (c/text-line "line")
+ (file source-dir-path in-label))])
+ in-tuples-spec)
+ sink-tap (c/lfs-tap (c/text-line "line") sink-path)
+ flow (c/flow source-tap-map sink-tap assembly)
+ out-tuples (line-sink-seq (.openSink (c/exec flow)))]
(is (= expected-out-tuples out-tuples))))))
Oops, something went wrong.

0 comments on commit d171319

Please sign in to comment.