Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReduceOperations.hyperLogLog #245

Merged
merged 10 commits into from
Feb 20, 2013
35 changes: 28 additions & 7 deletions src/main/scala/com/twitter/scalding/ReduceOperations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.twitter.algebird.{
Moments,
SortedTakeListMonoid,
HyperLogLogMonoid,
HLL,
Aggregator
}

Expand Down Expand Up @@ -86,19 +87,39 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
* For each key:
* {{{
* 10% error ~ 256 bytes
* 5% error ~ 1kb
* 1% error ~ 8kb
* 0.5% error ~ 64kb
* 0.25% error ~ 256kb
* 5% error ~ 1kB
* 2% error ~ 4kB
* 1% error ~ 16kB
* 0.5% error ~ 64kB
* 0.25% error ~ 256kB
* }}}
*/
def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
def approximateUniqueCount[T <% Array[Byte] : TupleConverter]
  (f : (Fields, Fields), errPercent : Double = 1.0) = {
  // T only needs an implicit view to Array[Byte]; hyperLogLogMap does the sketching.
  // Here we emit just the cardinality estimate (a Double) of the resulting HLL.
  hyperLogLogMap[T, Double](f, errPercent) { (sketch : HLL) => sketch.estimatedSize }
}

/**
 * Same pipeline as approximateUniqueCount, but emits the [[HLL]] value itself
 * instead of reducing it to a size estimate.
 *
 * @tparam T input value type; requires an implicit view to Array[Byte] and a TupleConverter
 * @param f field pair passed straight through to hyperLogLogMap
 * @param errPercent error percentage used to size the sketch (defaults to 1.0)
 */
def hyperLogLog[T <% Array[Byte] : TupleConverter]
  (f : (Fields, Fields), errPercent : Double = 1.0) = {
  // The final stage is the identity: hand back the HLL unchanged.
  hyperLogLogMap[T, HLL](f, errPercent) { (sketch : HLL) => sketch }
}

@deprecated("use of approximateUniqueCount is preferred.", "0.8.3")
def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
  // Legacy entry point: the whole CTuple is hashed via the UTF-8 bytes of its
  // toString. hyperLogLogMap requires a view T => Array[Byte], so that one-way
  // conversion is supplied here as a local implicit rather than by the caller.
  implicit def tupleToUtf8Bytes(tuple : CTuple) : Array[Byte] =
    tuple.toString.getBytes("UTF-8")

  // Emit only the estimated cardinality of the resulting HLL.
  hyperLogLogMap[CTuple, Double](f, errPercent) { (sketch : HLL) => sketch.estimatedSize }
}

/**
 * Shared implementation behind approximateUniqueCount, hyperLogLog and approxUniques.
 *
 * Sizes a HyperLogLogMonoid from errPercent, turns each input value of type T into
 * an HLL through that monoid, and passes `fn` to mapPlusMap as the trailing function.
 *
 * @tparam T input value type; needs an implicit view to Array[Byte] and a TupleConverter
 * @tparam U type produced by `fn`; needs a TupleSetter
 * @param f field pair forwarded to mapPlusMap
 * @param errPercent error percentage used to choose the number of bits (defaults to 1.0)
 * @param fn conversion from the HLL to the emitted value
 */
private[this] def hyperLogLogMap[T <% Array[Byte] : TupleConverter, U : TupleSetter]
(f : (Fields, Fields), errPercent : Double = 1.0)(fn : HLL => U) = {
// With m = 2^bits, the formula below corresponds to errPercent ~ 104 / sqrt(m):
//bits = log(m) == 2 *log(104/errPercent) = 2log(104) - 2*log(errPercent)
def log2(x : Double) = scala.math.log(x)/scala.math.log(2.0)
// ceil is applied before doubling, so bits is always even
// (e.g. errPercent = 1.0 gives ceil(6.70...) = 7, hence 14 bits).
val bits = 2 * scala.math.ceil(log2(104) - log2(errPercent)).toInt
// Implicit so that mapPlusMap can pick it up — presumably as the monoid used to
// combine the per-value HLLs; TODO confirm against mapPlusMap's signature.
implicit val hmm = new HyperLogLogMonoid(bits)
// NOTE(review): the next two lines use the older CTuple/toString hashing and their
// result is not used by the final expression; they look like pre-change code left in
// by the diff rendering — confirm against the merged file.
mapPlusMap(f) { (in : CTuple) => hmm.create(in.toString.getBytes("UTF-8")) }
{ hmm.estimateSize(_) }
// hmm(t) relies on the T => Array[Byte] view from the signature; fn is the final stage.
mapPlusMap(f) { (t : T) => hmm(t) } (fn)
}

/**
Expand Down