Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventuallyAggregator and variants #407

Merged
merged 2 commits into from
Apr 13, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Eventually.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,38 @@ class EventuallyRing[E, O](convert: O => E)(mustConvert: O => Boolean)(implicit
}

}

trait AbstractEventuallyAggregator[A, E, O, C]
extends Aggregator[A, Either[E, O], C] {
def prepare(a: A) = Right(rightAggregator.prepare(a))
def present(b: Either[E, O]) = b match {
case Right(o) => rightAggregator.present(o)
case Left(e) => presentLeft(e)
}

def presentLeft(e: E): C

def convert(o: O): E
def mustConvert(o: O): Boolean

def leftSemigroup: Semigroup[E]
def rightAggregator: Aggregator[A, O, C]
}

trait EventuallyAggregator[A, E, O, C]
extends AbstractEventuallyAggregator[A, E, O, C] {

//avoid init order issues and cyclical references
@transient lazy val semigroup =
new EventuallySemigroup[E, O](convert)(mustConvert)(leftSemigroup, rightAggregator.semigroup)
}

trait EventuallyMonoidAggregator[A, E, O, C]
extends AbstractEventuallyAggregator[A, E, O, C]
with MonoidAggregator[A, Either[E, O], C] {

def rightAggregator: MonoidAggregator[A, O, C]

@transient lazy val monoid =
new EventuallyMonoid[E, O](convert)(mustConvert)(leftSemigroup, rightAggregator.monoid)
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,15 @@ case class HyperLogLogAggregator(val hllMonoid: HyperLogLogMonoid) extends Monoi
def prepare(value: Array[Byte]) = monoid.create(value)
def present(hll: HLL) = hll
}

case class SetSizeAggregator[A](hllBits: Int, maxSetSize: Int = 10)(implicit toBytes: A => Array[Byte])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might make sense to make the default maxSetSize to be something that depends on hllBits. At the extreme you could serialize every item coming in and increment a byte counter (although the in-memory may be much less or more, but still), then when it exceeds 2^hllBits, switch over. I would think something like maxSetSize: Int = math.max(10, 1 << (hllBits - 5)) might be a decent default.

extends EventuallyMonoidAggregator[A, HLL, Set[A], Long] {

def presentLeft(hll: HLL) = hll.approximateSize.estimate

def mustConvert(set: Set[A]) = set.size > maxSetSize
def convert(set: Set[A]) = leftSemigroup.batchCreate(set.map(toBytes))

val leftSemigroup = new HyperLogLogMonoid(hllBits)
val rightAggregator = Aggregator.uniqueCount[A].andThenPresent { _.toLong }
}