From 4bae51639c972ad30f6fc31c84022d72c7121134 Mon Sep 17 00:00:00 2001 From: Bradford Cross Date: Sat, 6 Feb 2010 22:07:06 -0800 Subject: [PATCH] adding tests for map and join flows --- src/clj/cascading/clojure/api.clj | 40 ++++++++++++----- src/clj/cascading/clojure/io.clj | 37 +++++++++++++++ test/cascading/clojure/join_tests.clj | 65 +++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 src/clj/cascading/clojure/io.clj create mode 100644 test/cascading/clojure/join_tests.clj diff --git a/src/clj/cascading/clojure/api.clj b/src/clj/cascading/clojure/api.clj index c392cf2..6f4ae35 100644 --- a/src/clj/cascading/clojure/api.clj +++ b/src/clj/cascading/clojure/api.clj @@ -11,11 +11,11 @@ (cascading.pipe.cogroup InnerJoin) (cascading.scheme Scheme) (cascading.tap Hfs Lfs Tap) - (java.util Properties Map) + (java.util Properties Map UUID) (cascading.clojure ClojureFilter ClojureMapcat ClojureMap ClojureAggregator) (clojure.lang Var) - (java.util UUID))) + (java.lang RuntimeException))) (defn ns-fn-name-pair [v] (let [m (meta v)] @@ -124,29 +124,47 @@ (defn count [#^Pipe previous #^String count-fields] (Every. previous (Count. (fields count-fields)))) -(defn inner-join [[#^Pipe lhs #^Pipe rhs] [lhs-fields rhs-fields]] +(defn inner-join + ([[#^Pipe lhs #^Pipe rhs] [lhs-fields rhs-fields]] (CoGroup. lhs (fields lhs-fields) rhs (fields rhs-fields) (InnerJoin.))) + ([[#^Pipe lhs #^Pipe rhs] [lhs-fields rhs-fields] declared-fields] + (CoGroup. lhs (fields lhs-fields) rhs (fields rhs-fields) (fields declared-fields) (InnerJoin.)))) (defn select [#^Pipe previous keep-fields] (Each. previous (fields keep-fields) (Identity.))) -(defn text-line-scheme [field-names] - (TextLine. (fields field-names) (fields field-names))) +(defn text-line + ([] (TextLine. Fields/FIRST)) + ([field-names] + (TextLine. (fields field-names) (fields field-names)))) -(defn hfs-tap [#^Scheme scheme #^String path] - (Hfs. scheme path)) +(defn path [x] (if (string? x) x (.getAbsolutePath x))) -(defn flow [jar-path config #^Map source-map #^Tap sink #^Pipe pipe] +(defn hfs-tap [#^Scheme scheme path-or-file] + (Hfs. scheme (path path-or-file))) + +(defn lfs-tap [#^Scheme scheme path-or-file] + (Lfs. scheme (path path-or-file))) + +(defn flow +([#^Map source-map #^Tap sink #^Pipe pipe] + (flow nil {} source-map sink pipe)) +([jar-path config #^Map source-map #^Tap sink #^Pipe pipe] (let [props (Properties.)] (when jar-path (FlowConnector/setApplicationJarPath props jar-path)) (doseq [[k v] config] (.setProperty props k v)) (let [flow-connector (FlowConnector. props)] - (.connect flow-connector source-map sink pipe)))) + (.connect flow-connector source-map sink pipe))))) (defn write-dot [#^Flow flow #^String path] (.writeDOT flow path)) -(defn complete [#^Flow flow] - (.complete flow)) +(defn exec [#^Flow flow] + (try + (doto flow .start .complete) + (catch cascading.flow.PlannerException e + (.writeDOT e "exception.dot") + (throw (RuntimeException. "see exception.dot to visualize your broken plan." e))))) + diff --git a/src/clj/cascading/clojure/io.clj b/src/clj/cascading/clojure/io.clj new file mode 100644 index 0000000..38c6aa7 --- /dev/null +++ b/src/clj/cascading/clojure/io.clj @@ -0,0 +1,37 @@ +(ns cascading.clojure.io + (:import java.io.File) + (:use clojure.contrib.java-utils) + (:use clojure.contrib.duck-streams)) + +(defn temp-path [sub-path] + (file (System/getProperty "java.io.tmpdir") sub-path)) + +(defn temp-dir +"1) creates a directory in System.getProperty(\"java.io.tmpdir\") + 2) calls tempDir.deleteOn Exit() so the file is deleted by the jvm. + reference: ;http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4735419 + deleteOnExit is last resort cleanup on jvm exit. +" + [sub-path] + (let [tmp-dir (temp-path sub-path)] + (if (not (.exists tmp-dir)) + (.mkdir tmp-dir)) + (.deleteOnExit tmp-dir) + tmp-dir)) + +(defn delete-all [bindings] +"delete-file-recursively is preemptive delete on exiting the code block for repl and tests run in the same process." + (doall (for [file (reverse + (map second + (partition 2 bindings))) + :when (.exists file)] + (delete-file-recursively file)))) + +(defmacro with-tmp-files [bindings & body] + `(let ~bindings + (try ~@body + (finally (delete-all ~bindings))))) + +(defn write-lines-in [root filename lines] + (write-lines + (file (.getAbsolutePath root) filename) lines)) diff --git a/test/cascading/clojure/join_tests.clj b/test/cascading/clojure/join_tests.clj new file mode 100644 index 0000000..4722470 --- /dev/null +++ b/test/cascading/clojure/join_tests.clj @@ -0,0 +1,65 @@ +(ns cascading.clojure.join-tests + (:use clojure.test) + (:use cascading.clojure.io) + (:import (cascading.tuple Fields) + (cascading.pipe Pipe) + (cascading.clojure Util ClojureMap)) + (:require (cascading.clojure [api :as c]))) + +(def get-name (comp :name read-string)) + +;;TODO: this is some funky shit - surely there is a better way. +(defn as-vector + [entry] + (into [] + (.split + (second + (iterator-seq + (.iterator (.getTuple entry)))) + "\t"))) + +(deftest map-test + (with-tmp-files [source (temp-dir "source") + sink (temp-path "sink")] + (let [lines [{:name "foo" :a 1} {:name "bar" :b 2}] + _ (write-lines-in source "source.data" lines) + extract (c/map (c/pipe "m") ["name" #'get-name] ["name" "val"]) + map-flow (c/flow + {"m" (c/lfs-tap (c/text-line ["val"]) source)} + (c/lfs-tap (c/text-line ["name" "val"]) sink) + extract) + out (.openSink (c/exec map-flow))] + (is (= ["foo" (pr-str {:name "foo" :a 1})] + (as-vector (.next out)))) + (is (= ["bar" (pr-str {:name "bar" :b 2})] + (as-vector (.next out)))) + (is (not (.hasNext out)))))) + +;;taps-map requires named pipes to be the same as key names identifying pipes +;;must be a way to encapsulated all this plumbing. +(deftest inner-join-test + (with-tmp-files [rhs (temp-dir "rhs") + lhs (temp-dir "lhs") + sink (temp-path "sink")] + (let [rhs-lines [{:name "foo" :a 1} {:name "bar" :b 2}] + lhs-lines [{:name "foo" :a 5} {:name "bar" :b 6}] + _ (write-lines-in rhs "rhs.data" rhs-lines) + _ (write-lines-in lhs "lhs.data" lhs-lines) + lhs-extract (c/map (c/pipe "lhs") ["name" #'get-name] ["name" "val"]) + rhs-extract (c/map (c/pipe "rhs") ["name" #'get-name] ["name" "val"]) + join-pipe (c/inner-join + [lhs-extract rhs-extract] + [["name"] ["name"]] + ["name1" "val1" "name2" "val2"]) + keep-only (c/select join-pipe ["val1" "val2"]) + join-flow (c/flow + {"rhs" (c/lfs-tap (c/text-line ["val"]) rhs) + "lhs" (c/lfs-tap (c/text-line ["val"]) lhs)} + (c/lfs-tap (c/text-line ["val1" "val2"]) sink) + join-pipe) + out (.openSink (c/exec join-flow))] + (is (= [(pr-str {:name "bar" :b 6}) (pr-str {:name "bar" :b 2})] + (as-vector (.next out)))) + (is (= [(pr-str {:name "foo" :a 5}) (pr-str {:name "foo" :a 1})] + (as-vector (.next out)))) + (is (not (.hasNext out))))))