Permalink
Browse files

Use the generated aggregators

  • Loading branch information...
johnynek committed Sep 18, 2013
1 parent 20165b2 commit d7ed7d747f38939ce16d8b5d7314e43762ada284
@@ -1,13 +1,13 @@
package com.twitter.algebird
+/**
+ * Aggregators compose well.
+ *
+ * To create a parallel aggregator that operates on a single
+ * input in parallel, use:
+ * GeneratedTupleAggregator.from2((agg1, agg2))
+ */
object Aggregator extends java.io.Serializable {
- /** We could autogenerate 22 of these, and if find ourselves using them we should.
- * in the mean time, if you need 3, then compose twice: ((C1,C2),C3) and
- * use andThenPresent to map to (C1, C2, C3)
- */
- def compose[A,B1,C1,B2,C2](agg1: Aggregator[A,B1,C1], agg2: Aggregator[A,B2,C2]): Aggregator[A,(B1,B2),(C1,C2)] =
- new Aggregator2(agg1, agg2)
-
/** Using Aggregator.prepare,present you can add to this aggregator
*/
def fromReduce[T](red: (T,T) => T): Aggregator[T,T,T] = new Aggregator[T,T,T] {
@@ -33,19 +33,6 @@ object Aggregator extends java.io.Serializable {
}
}
-/** We make a non-anonymous class with public access to the underlying aggregators as
- * they may be useful for a runtime optimization
- */
-class Aggregator2[A,B1,B2,C1,C2](val agg1: Aggregator[A,B1,C1], val agg2: Aggregator[A,B2,C2]) extends Aggregator[A,(B1,B2),(C1,C2)] {
- def prepare(input : A) = (agg1.prepare(input), agg2.prepare(input))
- def reduce(l : (B1,B2), r : (B1,B2)) = {
- val b1next = agg1.reduce(l._1, r._1)
- val b2next = agg2.reduce(l._2, r._2)
- (b1next, b2next)
- }
- def present(reduction : (B1,B2)) = (agg1.present(reduction._1), agg2.present(reduction._2))
-}
-
trait Aggregator[-A,B,+C] extends Function1[TraversableOnce[A], C] with java.io.Serializable { self =>
def prepare(input : A) : B
def reduce(l : B, r : B) : B
@@ -45,7 +45,7 @@ object AggregatorLaws extends Properties("Aggregator") {
}
property("composing two Aggregators is correct") = forAll { (in: List[Int], ag1: Aggregator[Int,Int,Int], ag2:
Aggregator[Int,Double,String]) =>
- val c = Aggregator.compose(ag1, ag2)
+ val c = GeneratedTupleAggregator.from2(ag1, ag2)
in.isEmpty || c(in) == (ag1(in), ag2(in))
}
}
@@ -3,7 +3,7 @@ package com.twitter.algebird
import org.specs._
class TupleAggregatorsTest extends Specification {
- import GeneratedTupleAggregators._
+ import GeneratedTupleAggregator._
val data = List(1, 3, 2, 0, 5, 6)
val MinAgg = new Aggregator[Int, Int, Int] {
@@ -8,9 +8,9 @@ object GenTupleAggregators {
IO.write(place,
"""package com.twitter.algebird
-object GeneratedTupleAggregators extends GeneratedTupleAggregators
+object GeneratedTupleAggregator extends GeneratedTupleAggregator
-trait GeneratedTupleAggregators {
+trait GeneratedTupleAggregator {
""" + genAggregators(22) + "\n" + "}")
Seq(place)
@@ -29,15 +29,15 @@ trait GeneratedTupleAggregators {
val tupleTs = "Tuple%d[%s]".format(i, ts)
"""
-implicit def Tuple%dAggregator[A, %s, %s](aggs: Tuple%d[%s]): Aggregator[A, %s, %s] = {
+implicit def from%d[A, %s, %s](aggs: Tuple%d[%s]): Aggregator[A, %s, %s] = {
new Aggregator[A, %s, %s] {
def prepare(v: A) = (%s)
def reduce(v1: %s, v2: %s) = (%s)
def present(v: %s) = (%s)
}
}""".format(i, vs, ts, i, aggs, tupleVs, tupleTs,
- tupleVs, tupleTs,
- prepares,
+ tupleVs, tupleTs,
+ prepares,
tupleVs, tupleVs, reduces,
tupleVs, present)
}).mkString("\n")

0 comments on commit d7ed7d7

Please sign in to comment.