ReduceOperations.hyperLogLog #245

Merged
merged 10 commits into from Feb 20, 2013

3 participants

@aaron-siegel

This adds a ReduceOperations.hyperLogLog method, which is just like ReduceOperations.approxUniques except that it leaves the field as type HLL rather than converting it to its estimated size.

Useful for output to persistent stores whose approximate counts might be amended at a later time.
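
The motivation above can be illustrated with a small stand-in. This is only a sketch: exact Scala sets play the role of HLL sketches (a real HLL is approximate), and the names `MergeVsEstimate`, `monday`, and `tuesday` are invented for illustration. It shows why a stored count cannot be amended later without double counting, while a stored mergeable structure can.

```scala
object MergeVsEstimate {
  // Exact sets stand in for HLL sketches here (assumption for illustration only).
  val monday: Set[String] = Set("alice", "bob", "carol")
  val tuesday: Set[String] = Set("bob", "carol", "dave")

  // If only the counts were persisted, a later amendment can only add them,
  // which double counts users seen on both days.
  val summedCounts: Int = monday.size + tuesday.size

  // If the mergeable structure is persisted, the amendment merges first
  // and estimates afterwards.
  val mergedCount: Int = (monday ++ tuesday).size
}
```

Here the summed counts overstate the number of distinct users, while the merged structure gives the distinct count across both days.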

@johnynek johnynek and 1 other commented on an outdated diff Dec 13, 2012
...ain/scala/com/twitter/scalding/ReduceOperations.scala
@@ -92,13 +93,22 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
* 0.25% error ~ 256kb
* }}}
*/
- def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ derivedHll(f, errPercent) { (monoid,hll) => monoid.estimateSize(hll) }
+ }
+
+ def hyperLogLog(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ derivedHll(f, errPercent) { (_,hll) => hll }
+ }
+
+ private[this] def derivedHll[T](f : (Fields, Fields), errPercent : Double)
+ (fn : (HyperLogLogMonoid,HLL) => T) = {
//bits = log(m) == 2 *log(104/errPercent) = 2log(104) - 2*log(errPercent)
def log2(x : Double) = scala.math.log(x)/scala.math.log(2.0)
val bits = 2 * scala.math.ceil(log2(104) - log2(errPercent)).toInt
implicit val hmm = new HyperLogLogMonoid(bits)
mapPlusMap(f) { (in : CTuple) => hmm.create(in.toString.getBytes("UTF-8")) }
@johnynek
johnynek Dec 13, 2012

I like this. What about adding an additional function parameter, toBytes: (U) => Array[Byte], and not making this private?

In hyperLogLog/approxUniques, you have U = CTuple and:

toBytes = { (in: CTuple) => in.toString.getBytes("UTF-8") }
@aaron-siegel
aaron-siegel Dec 13, 2012

So I assume we will also need a TupleConverter[U] parameter?

@aaron-siegel
aaron-siegel Dec 13, 2012

Scratch that - tuple conversion gets delegated to mapPlusMap, of course
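
The sizing arithmetic in the diff above can be checked in isolation. The sketch below copies the `log2` and `bits` expressions from the diff; the wrapper names `HllSizing`, `bitsFor`, and `registersFor` are hypothetical and not part of scalding or algebird.

```scala
object HllSizing {
  // Same arithmetic as the diff: bits = 2 * ceil(log2(104 / errPercent)).
  def log2(x: Double): Double = math.log(x) / math.log(2.0)

  def bitsFor(errPercent: Double): Int =
    2 * math.ceil(log2(104) - log2(errPercent)).toInt

  // Number of registers addressed by that many bits (hypothetical helper).
  def registersFor(errPercent: Double): Int = 1 << bitsFor(errPercent)

  def main(args: Array[String]): Unit =
    for (err <- Seq(1.0, 0.25))
      println(s"errPercent=$err bits=${bitsFor(err)} registers=${registersFor(err)}")
}
```

With the default errPercent of 1.0 this gives 14 bits. With 0.25 it gives 18 bits, or 262144 registers, which is consistent with the "0.25% error ~ 256kb" line in the doc comment if each register takes about one byte (an assumption, not something the diff states).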

@johnynek johnynek commented on an outdated diff Dec 14, 2012
...ain/scala/com/twitter/scalding/ReduceOperations.scala
@@ -92,13 +93,25 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
* 0.25% error ~ 256kb
* }}}
*/
- def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ hyperLogLogMap(f, errPercent) { (monoid,hll) => monoid.estimateSize(hll) }
+ { x: Any => x.toString.getBytes("UTF-8") }
+ }
+
+ def hyperLogLog(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ hyperLogLogMap(f, errPercent) { (_,hll) => hll }
+ { x: Any => x.toString.getBytes("UTF-8") }
@johnynek
johnynek Dec 14, 2012

I think you actually need CTuple, not Any here. The issue is if someone counts multiple fields (like age, height combinations) the arity will be 2, but the default TupleGetter has arity 1 (singleGetter). By using the CTuple type, scalding just passes you the original cascading tuple.

@johnynek johnynek and 1 other commented on an outdated diff Dec 14, 2012
...ain/scala/com/twitter/scalding/ReduceOperations.scala
@@ -92,13 +93,25 @@ trait ReduceOperations[+Self <: ReduceOperations[Self]] extends java.io.Serializ
* 0.25% error ~ 256kb
* }}}
*/
- def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ hyperLogLogMap(f, errPercent) { (monoid,hll) => monoid.estimateSize(hll) }
+ { x: Any => x.toString.getBytes("UTF-8") }
+ }
+
+ def hyperLogLog(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ hyperLogLogMap(f, errPercent) { (_,hll) => hll }
+ { x: Any => x.toString.getBytes("UTF-8") }
+ }
+
+ def hyperLogLogMap[T,U](f : (Fields, Fields), errPercent : Double = 1.0)
+ (fn : (HyperLogLogMonoid,HLL) => U)
+ (toBytes : (T) => Array[Byte]) = {
@johnynek
johnynek Dec 14, 2012

Can we move the toBytes before fn, since it logically happens before (T => toBytes => HLL => fn)?

@aaron-siegel
aaron-siegel Dec 18, 2012

I wish this could be an implicit hasher...

@aaron-siegel

I've made the suggested changes. However it's worth noting that the HLL refactoring in algebird opens up the possibility of deriving approxUniques from hyperLogLog ... so I'm not sure it's necessary to have a separate hyperLogLogMap method. I'm partial to moving toward something like the following:

def hyperLogLog[T](f : (Fields,Fields), errPercent : Double = 1.0)(implicit hasher : Hasher[T,Array[Byte]]) = {
  // ... (the usual code for sizing and creating the monoid)
  mapPlusMap(f) { (t : T) => hmm.create(hasher.hash(t)) } { hll => hll }
}

def approxUniques[T](f : (Fields,Fields), errPercent : Double = 1.0)(implicit hasher : Hasher[T,Array[Byte]]) = {
  hyperLogLog(f, errPercent)(hasher).map { hll : HLL => hll.estimatedSize }
}
@aaron-siegel

This is a semi-breaking change, since it converts hyperLogLog to a type-parameterized method. But the existing implementation of hyperLogLog is semi-broken anyway, since if the input field represents a Set (which isn't a farfetched use case at all) then it's possible for two equal sets to get hashed differently.
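
The Set problem can be reproduced without scalding. In this sketch, `naiveBytes` mirrors the toString-based encoding from the diff, while `canonicalBytes` is a hypothetical workaround (sorting before encoding) that is not part of this PR. `LinkedHashSet` is used only because its iteration order visibly depends on insertion order.

```scala
import scala.collection.mutable.LinkedHashSet

object SetHashing {
  // The toString-based encoding used as the default in this PR.
  def naiveBytes(x: Any): Array[Byte] = x.toString.getBytes("UTF-8")

  // Hypothetical alternative: put the elements in a canonical order first.
  def canonicalBytes(s: collection.Set[Int]): Array[Byte] =
    s.toSeq.sorted.mkString(",").getBytes("UTF-8")

  // Equal as sets, but built in a different insertion order.
  val a: LinkedHashSet[Int] = LinkedHashSet(1, 2, 3)
  val b: LinkedHashSet[Int] = LinkedHashSet(3, 2, 1)

  val equalAsSets: Boolean = a == b
  val naiveAgrees: Boolean =
    java.util.Arrays.equals(naiveBytes(a), naiveBytes(b))
  val canonicalAgrees: Boolean =
    java.util.Arrays.equals(canonicalBytes(a), canonicalBytes(b))
}
```

The two sets compare equal, yet their toString bytes differ, so an HLL fed the naive bytes would count them as two distinct values. Sorting is just one way to get a stable encoding; the thread leaves the real answer to a hashing API in algebird.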

@johnynek

I think this is an experimental method, and breaking changes are okay.

I agree with your point about a better API around hashing. I think we need to get that straight in algebird first. That should be a top priority.

@aaron-siegel

Ok. Switching from "Any" to "CTuple" on the kludge-hashers broke the unit tests due to some sort of type conversion failure in the test jobs. Rather than try to sort it out, I'm just going to set this aside until the hash methodology is cleaned up in algebird.

I agree it's important to arrive at robust and stable hash strategies. Stability is absolutely critical, since changes to the hash methodology will break backward compatibility with persisted HLL data. (I'm happy to lend a hand fleshing out a new hashing API if that would be helpful)

@sritchie
@aaron-siegel

Thanks! Does there exist a library of T -> Array[Byte] hashers for various T? (Or are there plans to build one?)

@johnynek
@johnynek johnynek commented on an outdated diff Dec 20, 2012
...ain/scala/com/twitter/scalding/ReduceOperations.scala
* }}}
*/
- def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ hyperLogLogMap(f, errPercent) { x: CTuple => x.toString.getBytes("UTF-8") }
+ { (monoid,hll) => monoid.estimateSize(hll) }
+
+ }
+
+ def hyperLogLog(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ hyperLogLogMap(f, errPercent) { x: CTuple => x.toString.getBytes("UTF-8") }
+ { (_,hll) => hll }
+ }
+
+ def hyperLogLogMap[T,U](f : (Fields, Fields), errPercent : Double = 1.0)
+ (toBytes : (T) => Array[Byte])
+ (fn : (HyperLogLogMonoid,HLL) => U) = {
@johnynek
johnynek Dec 20, 2012

You are going to need to add T: TupleConverter. Without that, this method can't see which TupleConverter to use, so it falls back to the singleConverter, which is just getObject(0).asInstanceOf[T]; that breaks because the internal object is not a CTuple.

Instead, the TupleConverter[CTuple] just passes the CTuple through, and the .toString does the "right thing" as long as all the internal items have a good toString method.

@johnynek

Aaron, can you fix this? If you do, I'll merge it and include it in scalding 0.8.2 which will probably go out soon.

@aaron-siegel

I'm not happy with this approach any more. I'd much prefer that these all be parameterized on an implicit Hashable[T,Array[Byte]], rather than an explicit toBytes function with a default toString kludge. So my preference is to hold off on this change until the hashing logic is fleshed out in algebird.

The approxUniques implementation can now be simplified also due to the HLL refactoring (as it's now possible to go from an HLL instance to its estimated size without having access to the monoid that created it).

@johnynek

Alternatively, you could pull an implicit Bijection[T, Array[Byte]] and the new Bijection library will be public soon (the jars are already on maven, sshhhh). And then we don't have to refactor all of HLL before this works.
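
The shape of the implicit-encoder idea discussed here can be sketched in plain Scala. The `ToBytes` typeclass and `encode` function below are hypothetical stand-ins; they are not the algebird Hasher or the Bijection library API, neither of which is shown in this thread.

```scala
import java.nio.ByteBuffer

object ImplicitEncoding {
  // Hypothetical typeclass standing in for the Hasher / Bijection idea.
  trait ToBytes[T] { def apply(t: T): Array[Byte] }

  object ToBytes {
    implicit val stringToBytes: ToBytes[String] = new ToBytes[String] {
      def apply(s: String): Array[Byte] = s.getBytes("UTF-8")
    }
    // Fixed-width big-endian encoding, independent of any toString format.
    implicit val longToBytes: ToBytes[Long] = new ToBytes[Long] {
      def apply(n: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(n).array()
    }
  }

  // Stand-in for the point where a value would be handed to hmm.create(...).
  def encode[T](t: T)(implicit enc: ToBytes[T]): Array[Byte] = enc(t)
}
```

With this design the caller never passes an explicit toBytes function, and a type with no instance in scope fails at compile time instead of silently falling back to toString.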

@aaron-siegel

Ping!

I updated this a while ago to use Bijections more gracefully.

@johnynek johnynek commented on an outdated diff Feb 8, 2013
...ain/scala/com/twitter/scalding/ReduceOperations.scala
* }}}
*/
- def approxUniques(f : (Fields, Fields), errPercent : Double = 1.0) = {
+ def approxUniques[T <% Array[Byte]](f : (Fields, Fields), errPercent : Double = 1.0) = {
@johnynek
johnynek Feb 8, 2013

Super sorry for the delay.

Will click merge right now if you send one that doesn't change old signatures (just adds methods); otherwise we can't put it in 0.8.3, which I want to release soon.

So, can you make approxUniques use the toString.getBytes("UTF-8") function?

Rename this new one something like: approxUniquesBytes or something?

@sritchie

@asiegel34, let's try and get this in for 0.8.3 -- are you good to get this change in?

@aaron-siegel aaron-siegel was assigned Feb 14, 2013
@aaron-siegel

Sorry for the slow response - I was away all last week.

Calling it "approxUniqueBytes" feels like falling into the same trap as sum vs. plus ... the signature that we really want is "already taken" by an inferior implementation, so we use a less suitable signature, which then gets enshrined...

How about calling it "approximateUniques" and preserving the old "approxUniques" (at least through 0.8.X) as a deprecated method?

@aaron-siegel

Even better, I think, is "approximateUniqueCount". I don't mind long method names when they're appropriately descriptive.

@johnynek

Rename is good. As soon as the test is green, merge, and we'll cut 0.8.3.

@sritchie sritchie merged commit cb68c46 into twitter:develop Feb 20, 2013