Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
adding tests for map and join flows
  • Loading branch information
bradford committed Feb 7, 2010
1 parent e1187be commit 4bae516
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 11 deletions.
40 changes: 29 additions & 11 deletions src/clj/cascading/clojure/api.clj
Expand Up @@ -11,11 +11,11 @@
(cascading.pipe.cogroup InnerJoin)
(cascading.scheme Scheme)
(cascading.tap Hfs Lfs Tap)
(java.util Properties Map)
(java.util Properties Map UUID)
(cascading.clojure ClojureFilter ClojureMapcat ClojureMap
ClojureAggregator)
(clojure.lang Var)
(java.util UUID)))
(java.lang RuntimeException)))

(defn ns-fn-name-pair [v]
(let [m (meta v)]
Expand Down Expand Up @@ -124,29 +124,47 @@
(defn count [#^Pipe previous #^String count-fields]
(Every. previous (Count. (fields count-fields))))

(defn inner-join [[#^Pipe lhs #^Pipe rhs] [lhs-fields rhs-fields]]
(defn inner-join
([[#^Pipe lhs #^Pipe rhs] [lhs-fields rhs-fields]]
(CoGroup. lhs (fields lhs-fields) rhs (fields rhs-fields) (InnerJoin.)))
([[#^Pipe lhs #^Pipe rhs] [lhs-fields rhs-fields] declared-fields]
(CoGroup. lhs (fields lhs-fields) rhs (fields rhs-fields) (fields declared-fields) (InnerJoin.))))

(defn select [#^Pipe previous keep-fields]
(Each. previous (fields keep-fields) (Identity.)))

(defn text-line-scheme [field-names]
(TextLine. (fields field-names) (fields field-names)))
(defn text-line
([] (TextLine. Fields/FIRST))
([field-names]
(TextLine. (fields field-names) (fields field-names))))

(defn hfs-tap [#^Scheme scheme #^String path]
(Hfs. scheme path))
(defn path [x] (if (string? x) x (.getAbsolutePath x)))

(defn flow [jar-path config #^Map source-map #^Tap sink #^Pipe pipe]
(defn hfs-tap [#^Scheme scheme path-or-file]
(Hfs. scheme (path path-or-file)))

(defn lfs-tap [#^Scheme scheme path-or-file]
(Lfs. scheme (path path-or-file)))

(defn flow
([#^Map source-map #^Tap sink #^Pipe pipe]
(flow nil {} source-map sink pipe))
([jar-path config #^Map source-map #^Tap sink #^Pipe pipe]
(let [props (Properties.)]
(when jar-path
(FlowConnector/setApplicationJarPath props jar-path))
(doseq [[k v] config]
(.setProperty props k v))
(let [flow-connector (FlowConnector. props)]
(.connect flow-connector source-map sink pipe))))
(.connect flow-connector source-map sink pipe)))))

(defn write-dot [#^Flow flow #^String path]
(.writeDOT flow path))

(defn complete [#^Flow flow]
(.complete flow))
(defn exec [#^Flow flow]
(try
(doto flow .start .complete)
(catch cascading.flow.PlannerException e
(.writeDOT e "exception.dot")
(throw (RuntimeException. "see exception.dot to visualize your broken plan." e)))))

37 changes: 37 additions & 0 deletions src/clj/cascading/clojure/io.clj
@@ -0,0 +1,37 @@
(ns cascading.clojure.io
(:import java.io.File)
(:use clojure.contrib.java-utils)
(:use clojure.contrib.duck-streams))

(defn temp-path [sub-path]
(file (System/getProperty "java.io.tmpdir") sub-path))

(defn temp-dir
"1) creates a directory in System.getProperty(\"java.io.tmpdir\")
2) calls tempDir.deleteOn Exit() so the file is deleted by the jvm.
reference: ;http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4735419
deleteOnExit is last resort cleanup on jvm exit.
"
[sub-path]
(let [tmp-dir (temp-path sub-path)]
(if (not (.exists tmp-dir))
(.mkdir tmp-dir))
(.deleteOnExit tmp-dir)
tmp-dir))

(defn delete-all [bindings]
"delete-file-recursively is preemptive delete on exiting the code block for repl and tests run in the same process."
(doall (for [file (reverse
(map second
(partition 2 bindings)))
:when (.exists file)]
(delete-file-recursively file))))

(defmacro with-tmp-files [bindings & body]
`(let ~bindings
(try ~@body
(finally (delete-all ~bindings)))))

(defn write-lines-in [root filename lines]
(write-lines
(file (.getAbsolutePath root) filename) lines))
65 changes: 65 additions & 0 deletions test/cascading/clojure/join_tests.clj
@@ -0,0 +1,65 @@
(ns cascading.clojure.join-tests
(:use clojure.test)
(:use cascading.clojure.io)
(:import (cascading.tuple Fields)
(cascading.pipe Pipe)
(cascading.clojure Util ClojureMap))
(:require (cascading.clojure [api :as c])))

(def get-name (comp :name read-string))

;;TODO: this is some funky shit - surely there is a better way.
(defn as-vector
[entry]
(into []
(.split
(second
(iterator-seq
(.iterator (.getTuple entry))))
"\t")))

(deftest map-test
(with-tmp-files [source (temp-dir "source")
sink (temp-path "sink")]
(let [lines [{:name "foo" :a 1} {:name "bar" :b 2}]
_ (write-lines-in source "source.data" lines)
extract (c/map (c/pipe "m") ["name" #'get-name] ["name" "val"])
map-flow (c/flow
{"m" (c/lfs-tap (c/text-line ["val"]) source)}
(c/lfs-tap (c/text-line ["name" "val"]) sink)
extract)
out (.openSink (c/exec map-flow))]
(is (= ["foo" (pr-str {:name "foo" :a 1})]
(as-vector (.next out))))
(is (= ["bar" (pr-str {:name "bar" :b 2})]
(as-vector (.next out))))
(is (not (.hasNext out))))))

;;taps-map requires named pipes to be the same as key names identifying pipes
;;must be a way to encapsulated all this plumbing.
(deftest inner-join-test
(with-tmp-files [rhs (temp-dir "rhs")
lhs (temp-dir "lhs")
sink (temp-path "sink")]
(let [rhs-lines [{:name "foo" :a 1} {:name "bar" :b 2}]
lhs-lines [{:name "foo" :a 5} {:name "bar" :b 6}]
_ (write-lines-in rhs "rhs.data" rhs-lines)
_ (write-lines-in lhs "lhs.data" lhs-lines)
lhs-extract (c/map (c/pipe "lhs") ["name" #'get-name] ["name" "val"])
rhs-extract (c/map (c/pipe "rhs") ["name" #'get-name] ["name" "val"])
join-pipe (c/inner-join
[lhs-extract rhs-extract]
[["name"] ["name"]]
["name1" "val1" "name2" "val2"])
keep-only (c/select join-pipe ["val1" "val2"])
join-flow (c/flow
{"rhs" (c/lfs-tap (c/text-line ["val"]) rhs)
"lhs" (c/lfs-tap (c/text-line ["val"]) lhs)}
(c/lfs-tap (c/text-line ["val1" "val2"]) sink)
join-pipe)
out (.openSink (c/exec join-flow))]
(is (= [(pr-str {:name "bar" :b 6}) (pr-str {:name "bar" :b 2})]
(as-vector (.next out))))
(is (= [(pr-str {:name "foo" :a 5}) (pr-str {:name "foo" :a 1})]
(as-vector (.next out))))
(is (not (.hasNext out))))))

0 comments on commit 4bae516

Please sign in to comment.