Permalink
Browse files

Merge pull request #245 from asiegel34/hll-aggregate

ReduceOperations.hyperLogLog
  • Loading branch information...
2 parents 98505d4 + 79e6db3 commit cb68c466c3f0f4832f792b5deecf87f12b31347a @sritchie sritchie committed Feb 20, 2013
Showing with 28 additions and 7 deletions.
  1. +28 −7 src/main/scala/com/twitter/scalding/ReduceOperations.scala
@@ -24,6 +24,7 @@ import com.twitter.algebird.{
AveragedValue,
Moments,
HyperLogLogMonoid,
+ HLL,
Aggregator
}
@@ -89,19 +90,39 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
* For each key:
* {{{
* 10% error ~ 256 bytes
- * 5% error ~ 1kb
- * 1% error ~ 8kb
- * 0.5% error ~ 64kb
- * 0.25% error ~ 256kb
+ * 5% error ~ 1kB
+ * 2% error ~ 4kB
+ * 1% error ~ 16kB
+ * 0.5% error ~ 64kB
+ * 0.25% error ~ 256kB
* }}}
*/
- def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ def approximateUniqueCount[T <% Array[Byte] : TupleConverter]
+ (f : (Fields, Fields), errPercent : Double = 1.0) = {
+ hyperLogLogMap[T,Double](f, errPercent) { _.estimatedSize }
+ }
+
+ def hyperLogLog[T <% Array[Byte] : TupleConverter]
+ (f : (Fields, Fields), errPercent : Double = 1.0) = {
+ hyperLogLogMap[T,HLL](f, errPercent) { hll => hll }
+ }
+
+ @deprecated("use of approximateUniqueCount is preferred.", "0.8.3")
+ def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ // Legacy (pre-bijection) approximate unique count that uses in.toString.getBytes to
+ // obtain a long hash code. We specify the kludgy CTuple => Array[Byte] bijection
+ // explicitly.
+ implicit def kludgeHasher(in: CTuple) = in.toString.getBytes("UTF-8")
+ hyperLogLogMap[CTuple,Double](f, errPercent) { _.estimatedSize }
+ }
+
+ private[this] def hyperLogLogMap[T <% Array[Byte] : TupleConverter, U : TupleSetter]
+ (f : (Fields, Fields), errPercent : Double = 1.0)(fn : HLL => U) = {
//bits = log(m) == 2 *log(104/errPercent) = 2log(104) - 2*log(errPercent)
def log2(x : Double) = scala.math.log(x)/scala.math.log(2.0)
val bits = 2 * scala.math.ceil(log2(104) - log2(errPercent)).toInt
implicit val hmm = new HyperLogLogMonoid(bits)
- mapPlusMap(f) { (in : CTuple) => hmm.create(in.toString.getBytes("UTF-8")) }
- { hmm.estimateSize(_) }
+ mapPlusMap(f) { (t : T) => hmm(t) } (fn)
}
/**

0 comments on commit cb68c46

Please sign in to comment.