Skip to content

Commit

Permalink
Merge pull request #407 from avibryant/avi-eventually-aggregator
Browse files Browse the repository at this point in the history
EventuallyAggregator and variants
  • Loading branch information
johnynek committed Apr 13, 2015
2 parents 0cc3b5a + 76ea354 commit 062f560
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 0 deletions.
35 changes: 35 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Eventually.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,38 @@ class EventuallyRing[E, O](convert: O => E)(mustConvert: O => Boolean)(implicit
}

}

trait AbstractEventuallyAggregator[A, E, O, C]
extends Aggregator[A, Either[E, O], C] {
def prepare(a: A) = Right(rightAggregator.prepare(a))
def present(b: Either[E, O]) = b match {
case Right(o) => rightAggregator.present(o)
case Left(e) => presentLeft(e)
}

def presentLeft(e: E): C

def convert(o: O): E
def mustConvert(o: O): Boolean

def leftSemigroup: Semigroup[E]
def rightAggregator: Aggregator[A, O, C]
}

trait EventuallyAggregator[A, E, O, C]
extends AbstractEventuallyAggregator[A, E, O, C] {

//avoid init order issues and cyclical references
@transient lazy val semigroup =
new EventuallySemigroup[E, O](convert)(mustConvert)(leftSemigroup, rightAggregator.semigroup)
}

trait EventuallyMonoidAggregator[A, E, O, C]
extends AbstractEventuallyAggregator[A, E, O, C]
with MonoidAggregator[A, Either[E, O], C] {

def rightAggregator: MonoidAggregator[A, O, C]

@transient lazy val monoid =
new EventuallyMonoid[E, O](convert)(mustConvert)(leftSemigroup, rightAggregator.monoid)
}
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,15 @@ case class HyperLogLogAggregator(val hllMonoid: HyperLogLogMonoid) extends Monoi
def prepare(value: Array[Byte]) = monoid.create(value)
def present(hll: HLL) = hll
}

case class SetSizeAggregator[A](hllBits: Int, maxSetSize: Int = 10)(implicit toBytes: A => Array[Byte])
extends EventuallyMonoidAggregator[A, HLL, Set[A], Long] {

def presentLeft(hll: HLL) = hll.approximateSize.estimate

def mustConvert(set: Set[A]) = set.size > maxSetSize
def convert(set: Set[A]) = leftSemigroup.batchCreate(set.map(toBytes))

val leftSemigroup = new HyperLogLogMonoid(hllBits)
val rightAggregator = Aggregator.uniqueCount[A].andThenPresent { _.toLong }
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,48 @@ class EventuallyTest extends WordSpec with Matchers {
}

}

class EventuallyAggregatorLaws extends PropSpec with PropertyChecks with Matchers {
implicit def aggregator[A, B, C](implicit prepare: Arbitrary[A => B],
sg: Semigroup[B],
present: Arbitrary[B => C]): Arbitrary[Aggregator[A, B, C]] = Arbitrary {
for {
pp <- prepare.arbitrary
ps <- present.arbitrary
} yield new Aggregator[A, B, C] {
def prepare(a: A) = pp(a)
def semigroup = sg
def present(b: B) = ps(b)
}
}

def eventuallyAggregator(rightAg: Aggregator[Int, Int, Int])(pred: (Int => Boolean)): EventuallyAggregator[Int, Double, Int, String] = {
new EventuallyAggregator[Int, Double, Int, String] {
def presentLeft(e: Double) = "Left"

def convert(o: Int) = o.toDouble
def mustConvert(o: Int) = pred(o)

val leftSemigroup = Semigroup.doubleSemigroup
def rightAggregator = rightAg.andThenPresent{ _ => "Right" }
}
}

def isConvertedCorrectly(s: String, semi: EventuallySemigroup[Double, Int], in: List[Right[Double, Int]]) = {
val isFromLeft = s == "Left"
val shouldBeFromLeft = semi.sumOption(in).get.isLeft

if (shouldBeFromLeft) isFromLeft else !isFromLeft
}

property("EventuallyAggregator converts correctly") {
forAll{ (in: List[Int], pred: (Int => Boolean), rightAg: Aggregator[Int, Int, Int]) =>
val eventuallyAg = eventuallyAggregator(rightAg)(pred)
val semi = eventuallyAg.semigroup
val rightIn = in.map { Right(_) }

assert(in.isEmpty || isConvertedCorrectly(eventuallyAg(in), semi, rightIn))
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,34 @@ class HyperLogLogTest extends WordSpec with Matchers {
fromByteBuffer(java.nio.ByteBuffer.wrap(toBytes(h))) shouldEqual h
}
}

"SetSizeAggregator" should {
"work as an Aggregator and return exact size when <= maxSetSize" in {
List(5, 7, 10).foreach(i => {
val maxSetSize = 10000
val aggregator = SetSizeAggregator[Int](i, maxSetSize)

val maxUniqueDataSize = maxSetSize / 2
val data = (0 to maxUniqueDataSize).map { _ => r.nextInt(1000) }
val exact = exactCount(data).toDouble
val result = aggregator(data)
assert(result == exact)
})
}

"work as an Aggregator and return approximate size when > maxSetSize" in {
List(5, 7, 10).foreach(i => {
val maxSetSize = 10000
val aggregator = SetSizeAggregator[Int](i, maxSetSize)

val maxUniqueDataSize = maxSetSize + i
val data = 0 to maxUniqueDataSize
val exact = exactCount(data).toDouble

val estimate = aggregator(data)
assert(estimate != exact)
assert(scala.math.abs(exact - estimate) / exact < 3.5 * aveErrorOf(i))
})
}
}
}

0 comments on commit 062f560

Please sign in to comment.