Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

updating buffer and buffer-test

  • Loading branch information...
commit f523b396aa32bee32700bb2bb2b1bd6ecac90a0b 1 parent c11dcb4
Bradford Cross authored
View
18 src/clj/cascading/clojure/api.clj
@@ -18,7 +18,7 @@
OutputCollector JobConf)
(java.util Properties Map UUID)
(cascading.clojure ClojureFilter ClojureMapcat ClojureMap
- ClojureAggregator Util)
+ ClojureAggregator ClojureBuffer Util)
(clojure.lang Var)
(java.io File)
(java.lang RuntimeException)))
@@ -143,11 +143,27 @@
(Each. previous in-fields
(ClojureMap. func-fields spec) out-fields)))
+(defn agg [f init]
+"a combinator that takes a fn and an init value and returns a reduce aggregator."
+ (fn ([] init)
+ ([x] [x])
+ ([x y] (f x y))))
+
(defn aggregate [#^Pipe previous & args]
(let [[#^Fields in-fields func-fields specs #^Fields out-fields] (parse-args args)]
(Every. previous in-fields
(ClojureAggregator. func-fields specs) out-fields)))
+(defn buffer [#^Pipe previous & args]
+ (let [[#^Fields in-fields func-fields specs #^Fields out-fields] (parse-args args)]
+ (Every. previous in-fields
+ (ClojureBuffer. func-fields specs) out-fields)))
+
+(defn tuple-seq [it]
+"takes Iterator<TupleEntry> and returns seq of tuples coerced to vectors."
+ (clojure.core/map #(Util/coerceFromTuple (.getTuple %))
+ (iterator-seq it)))
+
(defn group-by [#^Pipe previous group-fields]
(GroupBy. previous (fields group-fields)))
View
16 test/cascading/clojure/api_test.clj
@@ -90,22 +90,16 @@
m2 (ClojureMapcat. (c/fields "num") (c/fn-spec #'iterate-inc))]
(are [m] (= [[2] [3] [4]] (t/invoke-function m [1])) m1 m2)))
-(defn agg [f init]
- (fn ([] init)
- ([x] [x])
- ([x y] (f x y))))
-
-(def sum (agg + 0))
-
-(defn buff [it]
- (for [x (iterator-seq it)
- :let [t (Util/coerceFromTuple (.getTuple x))]]
- [(apply + 1 t)]))
+(def sum (c/agg + 0))
(deftest test-clojure-aggregator
(let [a (ClojureAggregator. (c/fields "sum") (c/fn-spec #'sum))]
(is (= [[6]] (t/invoke-aggregator a [[1] [2] [3]])))))
+(defn buff [it]
+ (for [x (c/tuple-seq it)]
+ [(apply + 1 x)]))
+
;;TODO: notice that Buffer expects a fn that takes an iterator and returns a seq of tuples. If we want to return only a single tuple, then we need to wrap the tuple in a seq.
(deftest test-clojure-buffer
(let [a (ClojureBuffer. (c/fields "sum") (c/fn-spec #'buff))]
View
30 test/cascading/clojure/buffer_test.clj
@@ -0,0 +1,30 @@
+(ns cascading.clojure.buffer-test
+ (:use clojure.test
+ clojure.contrib.java-utils
+ cascading.clojure.testing
+ cascading.clojure.io)
+ (:import (cascading.tuple Fields)
+ (cascading.pipe Pipe)
+ (cascading.clojure Util ClojureMap))
+ (:require [clj-json :as json])
+ (:require [clojure.contrib.duck-streams :as ds])
+ (:require [clojure.contrib.java-utils :as ju])
+ (:require (cascading.clojure [api :as c])))
+
+(defn maxbuff [it]
+ (letfn [(maxer [max-tuple next-tuple]
+ (if (> (second max-tuple) (second next-tuple))
+ max-tuple
+ next-tuple))]
+ [(reduce maxer (c/tuple-seq it))]))
+
+(deftest buffer-test
+ (test-flow
+ (in-pipes ["word" "subcount"])
+ (in-tuples [["bar" 1] ["bat" 7] ["bar" 3] ["bar" 2] ["bat" 4]])
+ (fn [in] (-> in
+ (c/group-by "word")
+
+ (c/buffer ["word" "subcount"] [["word1" "subcount1"] #'maxbuff]
+ ["word1" "subcount1"])))
+ [["bar" 3] ["bat" 7]]))
View
8 test/cascading/clojure/flow_test.clj
@@ -44,13 +44,7 @@
(fn [in] (-> in (c/map "val" #'extract-key ["key" "num"])))
[["bar" 1] ["ban" 2]]))
-(defn sum
- ([]
- 0)
- ([mem v]
- (+ mem v))
- ([mem]
- [mem]))
+(def sum (c/agg + 0))
(deftest aggreate-test
(test-flow
Please sign in to comment.
Something went wrong with that request. Please try again.