CMS computes heavy hitters incorrectly when combining CMS instances with ++ #353

Closed
miguno opened this issue Oct 15, 2014 · 7 comments

miguno commented Oct 15, 2014

This is a follow-up on a conversation I had with @johnynek and @avibryant on Twitter. Many thanks to both of you for your help on narrowing this down. And please do let me know if I'm mistaken with what I'm writing here.

From what I understand today, I think I was partly right and partly wrong in my Twitter comments from yesterday:

  • I was right in the sense that there currently is a bug in Algebird's CMS that miscomputes heavy hitters under certain (data) conditions. I describe these conditions in detail below and also propose a fix, and this fix is exactly what @avibryant and I briefly talked about on Twitter.
  • I was (partly) wrong though when I said this is related to the merging of heavy hitters not being associative -- here I over-generalized. In fact, the merging of heavy hitters (once fixed) is associative for top-% CMS variants (the current, pre GH-345 implementation of CMS is such a variant; GH-345 is "A generic version of CountMinSketch[T] to decouple CMS from Long?") but it is not associative for top-N CMS variants. On the positive side, this means that once this fix is included, the current CMS variant works correctly. On the negative side, it means that a top-N CMS variant may still (and always?) have a problem when merging heavy hitters, and SketchMap might be similarly affected (and if SketchMap is not, which would be great, then we may have just learned we can't simply replace SketchMap with a top-N CMS variant).

In summary:

  • We seem to have a legitimate bug in the current (top-% based) CMS implementation that miscomputes heavy hitters under certain conditions. There's a fix for that, see below.
  • We seem to have a general limitation to correctly compute heavy hitters for any top-N based CMS.

Background

As part of my refactoring work in GH-345 I decoupled the counting/querying functionality of CMS from the heavy hitters functionality (as per @jnievelt's suggestion), and once I reached that point I could easily provide both top-% and top-N CMS variants (here, having a top-N CMS variant brings us one step closer to eventually retiring SketchMap). While testing the top-N variant I first discovered the heavy hitters issue I describe here.

Problem statement

In the discussion below I assume the two CMS instances we want to merge are called left and right, respectively.

The computation to combine two sets of heavy hitters is not correct right now when you use CMSInstance#++(other: CMS): CMS for combining two CMS instances. The problem itself is two-fold, though both aspects are related and can be fixed in one go.

First, the "left" and "right" heavy hitters are currently combined with a simple call to ++ of the HeavyHitters case class (same ++ name, but different class; see code). In other words, we don't post-process the combined heavy hitters to merge entries for the same item.

Here, heavy hitters elements for the same item are not summed/merged. The effect is that items that are elements of the intersection of left's and right's heavy hitters are not represented correctly.

Example REPL output:

    scala> import com.twitter.algebird._
    scala> import scala.collection.SortedSet
    scala> val s1 = SortedSet[HeavyHitter]()(HeavyHitter.ordering) + HeavyHitter(10L, 1) + HeavyHitter(20L, 2)
    s1: scala.collection.immutable.SortedSet[com.twitter.algebird.HeavyHitter] = TreeSet(HeavyHitter(10,1), HeavyHitter(20,2))

    scala> val s2 = SortedSet[HeavyHitter]()(HeavyHitter.ordering) + HeavyHitter(20L, 1)
    s2: scala.collection.immutable.SortedSet[com.twitter.algebird.HeavyHitter] = TreeSet(HeavyHitter(20,1))

    scala> val combinedHhs = s1 ++ s2
    combinedHhs: scala.collection.immutable.SortedSet[com.twitter.algebird.HeavyHitter] = TreeSet(HeavyHitter(10,1), HeavyHitter(20,1), HeavyHitter(20,2))

As you can see in this example the item 20L is actually represented twice in the combined heavy hitters combinedHhs -- HeavyHitter(20L, 1) and HeavyHitter(20L, 2) -- though it should be represented only once as a single HeavyHitter(20L, 3).

I think in summary the first problem has two consequences. a) Items that are in the left+right HH intersection will have lower effective counts than they should (max(individual counts) instead of sum(individual counts)), which means they are less likely to stay in the post-merge set of heavy hitters. This is the opposite of what one would intuitively expect for such intersection items. b) The cardinality of the heavy hitters is >= the cardinality of the items in the heavy hitters, because we have duplicate HH entries for some items. The memory footprint is therefore larger than it needs to be (at least until one or more subsequent "purging/dropping" operations are run on the heavy hitters), though this additional memory footprint may be of little concern in practice at the moment. If we had a truly parameterized CMS[K, V] at some point in the future, e.g. with V=HyperLogLog like Avi suggested, the additional memory footprint might become an issue.

This first problem could be fixed by properly merging the left and right sets (think: groupBy(_.item)). But fixing only this two-set-merge is not sufficient, because there's a second issue:
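
To make the groupBy idea concrete, here is a minimal sketch of such a two-set merge. It is an illustration only: the HeavyHitter case class and the mergeByItem helper are hypothetical local stand-ins, not Algebird's own types or API.

```scala
import scala.collection.immutable.SortedSet

object MergeByItemSketch {
  // Hypothetical stand-in for Algebird's HeavyHitter, kept local so the sketch is self-contained.
  final case class HeavyHitter(item: Long, count: Long)

  // Same ordering idea as in the issue: by count first, then by item.
  implicit val ordering: Ordering[HeavyHitter] =
    Ordering.by((hh: HeavyHitter) => (hh.count, hh.item))

  // Group all entries by item and sum their counts, so that each item appears exactly once.
  def mergeByItem(left: SortedSet[HeavyHitter], right: SortedSet[HeavyHitter]): SortedSet[HeavyHitter] = {
    val summed = (left.toList ++ right.toList)
      .groupBy(_.item)
      .map { case (item, entries) => HeavyHitter(item, entries.map(_.count).sum) }
    SortedSet.empty[HeavyHitter] ++ summed
  }
}
```

With the s1 and s2 values from the REPL example above, mergeByItem should yield a single entry for item 20 with count 3, whereas the plain ++ keeps two separate entries for it.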

Second, items that are elements of heavy hitters of either left or right ("left.heavyHitters and not right.heavyHitters", "right.heavyHitters and not left.heavyHitters") may not be represented correctly. For example, it may happen that item A is in left's heavy hitters and not in right's BUT it was seen by right and thus counted (A simply didn't make it into the heavy hitters). Here, in order to represent A correctly when combining left and right, we do not want to forget right's count of A when doing the merge.

Unit test that demonstrates the problem

You can see that problem by adding the following unit test to CountMinSketchTest.scala:

"should compute heavy hitters correctly (regression test of GH-353)" in {
      val monoid = new CountMinSketchMonoid(EPS, DELTA, SEED, 0.1)

      val data1 = Seq(1L, 1L, 1L, 2L, 2L, 3L)
      val data2 = Seq(3L, 4L, 4L, 4L, 5L, 5L)
      val data3 = Seq(3L, 6L, 6L, 6L, 7L, 7L)
      val data4 = Seq(3L, 8L, 8L, 8L, 9L, 9L)
      val singleData = data1 ++ data2 ++ data3 ++ data4

      /*
        Data sets from above shown in tabular view

        Item     1   2   3   4   total (= singleData)
        ----------------------------------------
        A (1L)   3   -   -   -   3
        B (2L)   2   -   -   -   2
        C (3L)   1   1   1   1   4  <<< C, with count 4, is the global top 1 heavy hitter
        D (4L)   -   3   -   -   3
        E (5L)   -   2   -   -   2
        F (6L)   -   -   3   -   3
        G (7L)   -   -   2   -   2
        H (8L)   -   -   -   3   3
        I (9L)   -   -   -   2   2

       */

      val cms1 = monoid.create(data1)
      val cms2 = monoid.create(data2)
      val cms3 = monoid.create(data3)
      val cms4 = monoid.create(data4)
      val aggregated = cms1 ++ cms2 ++ cms3 ++ cms4

      val single = monoid.create(singleData)
      aggregated.heavyHitters should be(single.heavyHitters)
      aggregated.heavyHitters should contain(3L) // C=3L is global top 1 heavy hitter
    }

Here, we would expect that the final heavy hitters in the "merged" aggregated CMS are identical to the heavy hitters in the single CMS. However, in the current CMS implementation they are not.

How to fix

The basic algorithm to correctly combine two sets of heavy hitters is as follows. We assume the two corresponding CMSs are called left and right, respectively.

  1. Create the union of the items in left's heavy hitters and right's heavy hitters. This union is the set of candidates that may become the final, combined heavy hitters.
  2. Combine the counting tables of left and right.
  3. For each item in the union from step 1, estimate the "combined" frequency from
    the combined counting table from step 2.
  4. Drop all items whose count is below the minimum threshold. The remaining items are the correct heavy hitters.
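
The four steps can be sketched independently of Algebird. The following is an illustration under loud assumptions: the Cms case class, create, and combine are hypothetical names, and an exact Map of counts stands in for the real counting table, so there is no approximation error here.

```scala
object CombineHeavyHittersSketch {
  // Hypothetical stand-in for a top-% CMS: exact counts replace the sketch's counting table.
  final case class Cms(counts: Map[Long, Long], totalCount: Long, heavyHitters: Set[Long])

  def create(data: Seq[Long], heavyHittersPct: Double): Cms = {
    val counts = data.groupBy(identity).map { case (item, xs) => item -> xs.size.toLong }
    val total = data.size.toLong
    val hhs = counts.collect { case (item, c) if c >= heavyHittersPct * total => item }.toSet
    Cms(counts, total, hhs)
  }

  def combine(left: Cms, right: Cms, heavyHittersPct: Double): Cms = {
    // Step 1: the union of both heavy hitter sets is the candidate set.
    val candidates = left.heavyHitters ++ right.heavyHitters
    // Step 2: combine the counting tables (here: sum the exact counts per item).
    val combinedCounts = (left.counts.keySet ++ right.counts.keySet).map { item =>
      item -> (left.counts.getOrElse(item, 0L) + right.counts.getOrElse(item, 0L))
    }.toMap
    val newTotal = left.totalCount + right.totalCount
    // Step 3: look up each candidate's frequency in the combined table.
    val estimates = candidates.map(item => item -> combinedCounts.getOrElse(item, 0L))
    // Step 4: drop candidates whose count is below the minimum threshold.
    val hhs = estimates.collect { case (item, c) if c >= heavyHittersPct * newTotal => item }
    Cms(combinedCounts, newTotal, hhs)
  }
}
```

Fed with data1 to data4 from the unit test above and heavyHittersPct = 0.1, folding combine over the four instances should give the same heavy hitters as a single instance created from all the data.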

Proof of concept fix for the current CMS implementation:

diff --git a/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala
index b87058e..c11560c 100644
--- a/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala
+++ b/algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala
@@ -258,9 +258,98 @@ case class CMSInstance(countsTable: CMSCountsTable, totalCount: Long,
       case other: CMSZero => this
       case other: CMSItem => this + other.item
       case other: CMSInstance => {
         val newTotalCount = totalCount + other.totalCount
-        val newHhs = (hhs ++ other.hhs).dropCountsBelow(params.heavyHittersPct * newTotalCount)
-        CMSInstance(countsTable ++ other.countsTable, newTotalCount, newHhs, params)
+        val allItems = hhs.items ++ other.hhs.items
+        val combinedCounts = countsTable ++ other.countsTable
+        val newHhs = {
+          def freq(table: CMSCountsTable, x: Long): Approximate[Long] = {
+            val estimates = table.counts.zipWithIndex.map {
+              case (row, i) =>
+                row(params.hashes(i)(x))
+            }
+            makeApprox(estimates.min)
+          }
+          val hitters: Set[HeavyHitter] = allItems.map { case i => HeavyHitter(i, freq(combinedCounts, i).estimate) }
+          val hhs = HeavyHitters(hitters.foldLeft(SortedSet[HeavyHitter]()(HeavyHitter.ordering))(_ + _))
+          hhs.dropCountsBelow(params.heavyHittersPct * newTotalCount)
+        }
+        CMSInstance(combinedCounts, newTotalCount, newHhs, params)
       }
     }
   }

Please note that the purpose of the patch above is to let me demonstrate the idea of the fix in a simple, short diff. A proper, cleaner variant of the fix would touch multiple places in the CMS code. (For instance, such a clean fix is already part of the GH-345 code.)

Limitation of this fix

The fix approach above does address the heavy hitters problem for top-% CMS variants, i.e. a CMS that computes heavy hitters based on the rule count >= heavyHittersPct * totalCount. The current (pre GH-345) CMS code in Algebird is such a top-% CMS variant, and can thus be fixed with the approach above.

However, the fix does not address the heavy hitters problem for top-N CMS variants, i.e. a CMS that computes heavy hitters by sorting items by count, and keeping only the top N items. In a nutshell, the reason is that there may be items that should be promoted to the "global" heavy hitters list (aka post-merge) but which will not be. What are the characteristics of these items? They happen to never make it into the heavy hitters of any of the CMS instances being merged (think: cms1 ++ cms2 ++ ...), and thus never make it into the union-based set of HH candidates (cf. step 1 in the "How to fix" section above). Put differently, this limitation is caused by the operation of merging "top-N" sets not being associative. (The latter may or may not be what @johnynek was alluding to in his Twitter comment that "SketchMap's approach is not strictly associative [...]".)

For the current top-% CMS implementation in comparison, every item that should make it into the global, post-merge set of heavy hitters will at some point become an element of the left+right heavy hitter union set aka the list of HH candidates, thanks to the count >= heavyHittersPct * totalCount rule.
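
The reasoning behind this can be written down as a short pigeonhole argument. The notation is an assumption made for this sketch: $c_i(x)$ is the true count of item $x$ in the $i$-th CMS instance, $n_i$ is that instance's totalCount, and $p$ is heavyHittersPct. The argument is stated for exact counts; it does not cover the approximation error of the sketch's estimates.

```latex
% Suppose x is a heavy hitter in none of the instances being merged:
c_i(x) < p \cdot n_i \quad \text{for all } i
% Summing over all instances gives
\sum_i c_i(x) < p \cdot \sum_i n_i
% so x is not a global heavy hitter either. By contraposition:
\sum_i c_i(x) \ge p \cdot \sum_i n_i
\;\Longrightarrow\;
\exists\, i : c_i(x) \ge p \cdot n_i
```

So an item that passes the global threshold must pass the threshold of at least one of the merged instances, and therefore shows up in the candidate union. No comparable guarantee holds for a "keep the top N" rule.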

Next steps?

What I say below assumes you agree there actually is an issue with the current CMS code. Please let me know if I'm mistaken, which may well be the case. :-)

My work in GH-345 already includes this fix in a "clean" way, and I should get open source approval by the end of this week. So depending on how pressing / important this issue is for you, we could opt to just wait until GH-345 is in.

Alternatively, we could patch the current CMS implementation -- either with the PoC fix in this issue, or with a cleaner variant (the PoC fix is intentionally not clean to better demonstrate the fix approach).

@miguno changed the title from "CMS computes heavy hitters incorrectly" to "CMS computes heavy hitters incorrectly when combining CMS instances with ++" on Oct 15, 2014
@jnievelt
Contributor

I'm fairly sure you're correct on both issues. Your fix seems to be what SketchMap already does (yet another reason that putting these together would be good); see SketchMapParams#updatedHeavyHitters etc.:

https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala#L159

But I believe @johnynek would be talking about this case (N=1):

(AAABBBB)(CCC) -> definitely B
(AAABB)(BBCCC) -> either A or C arbitrarily
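
The N=1 case can be reproduced with a small stand-alone sketch. Everything here is hypothetical and for illustration only: TopN, create, and combine are made-up names, exact counts stand in for the counting table, and ties are broken by item, which is just one arbitrary choice.

```scala
object TopNMergeSketch {
  // Hypothetical top-N structure: exact counts plus the N items currently tracked as heavy hitters.
  final case class TopN(counts: Map[Char, Long], heavyHitters: Set[Char])

  // Sort candidates by descending count (ties broken by item) and keep the first n.
  private def topN(candidates: Set[Char], counts: Map[Char, Long], n: Int): Set[Char] =
    candidates.toList.sortBy(c => (-counts.getOrElse(c, 0L), c)).take(n).toSet

  def create(data: String, n: Int): TopN = {
    val counts = data.toList.groupBy(identity).map { case (c, xs) => c -> xs.size.toLong }
    TopN(counts, topN(counts.keySet, counts, n))
  }

  // Candidates are only the union of both heavy hitter sets, as in the union-based merge.
  def combine(left: TopN, right: TopN, n: Int): TopN = {
    val counts = (left.counts.keySet ++ right.counts.keySet).map { c =>
      c -> (left.counts.getOrElse(c, 0L) + right.counts.getOrElse(c, 0L))
    }.toMap
    TopN(counts, topN(left.heavyHitters ++ right.heavyHitters, counts, n))
  }
}
```

Combining create("AAABB", 1) with create("BBCCC", 1) never considers B, because B is the top item of neither side, even though the combined counts put B at 4.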

@miguno
Author

miguno commented Oct 15, 2014

Thanks for your response, Joe.

Slightly off-topic: As a follow-up on your SketchMap code link, is there a specific reason that in the current CMS code heavyHitters returns Set[K], although the set being returned is a SortedSet[K]? For instance, would it make sense to return List[K], similar to what SketchMap does?

I am asking because if I understand correctly the CMS code already sorts heavy hitters behind the scenes because of its use of HeavyHitter.ordering:

case class CMSInstance(...) {
  def heavyHitters: Set[Long] = hhs.items
}

case class HeavyHitters(hhs: SortedSet[HeavyHitter] = SortedSet[HeavyHitter]()(HeavyHitter.ordering)) {
  def items: Set[Long] = hhs.map { _.item } // <<< this `map` actually returns a SortedSet
}

object HeavyHitter {
  val ordering = Ordering.by { hh: HeavyHitter => (hh.count, hh.item) }
}
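
A small stand-alone check of this behavior, using a hypothetical local HeavyHitter rather than Algebird's class. One thing it suggests, under these assumptions: after mapping to items the result is sorted by item, not by count, so the count-based order is not carried over.

```scala
import scala.collection.immutable.SortedSet

object SortedItemsSketch {
  // Hypothetical stand-in for Algebird's HeavyHitter.
  final case class HeavyHitter(item: Long, count: Long)

  val ordering: Ordering[HeavyHitter] =
    Ordering.by((hh: HeavyHitter) => (hh.count, hh.item))

  // Sorted by (count, item): item 20 (count 1) comes before item 10 (count 5).
  val hhs: SortedSet[HeavyHitter] =
    SortedSet(HeavyHitter(10L, 5L), HeavyHitter(20L, 1L))(ordering)

  // No expected type is given, so the mapped result is again a SortedSet,
  // now ordered by the natural ordering of Long (i.e. by item).
  val items = hhs.map(_.item)

  // Going through a List instead keeps the count-based order.
  val itemsByCount: List[Long] = hhs.toList.map(_.item)
}
```

Here items iterates as 10, 20 while itemsByCount is List(20, 10), which may be relevant when deciding between Set, SortedSet, and List at the interface level.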

@miguno
Author

miguno commented Oct 15, 2014

Updated original post to rename GH of @johnynek from "posco" to "johnynek". Why can't you use consistent names across systems, eh? ;-)

@miguno
Author

miguno commented Oct 15, 2014

> But I believe @johnynek would be talking about this case (N=1):
>
> (AAABBBB)(CCC) -> definitely B
> (AAABB)(BBCCC) -> either A or C arbitrarily

Yes, exactly.

This example is similar to what I had in mind when I was talking about merging being subject to "left-biased" mono-culture when folding left-to-right, i.e. the top-N CMS instances on the left get preferential treatment over instances on the right simply because they are being merged earlier -- a crude analogy would be preferential attachment from network theory.

My example:

(A)(A)(B)(B)(B) when folded left-to-right => A (after first merge A has count 2, meaning it would always win against the individual B=1's)
(A)(A)(B)(B)(B) when folded right-to-left => B (after first merges B has count 3, meaning it would always win against the individual A=1's)

Here, the correct behavior would be to always return B. In the example above, however, sometimes we incorrectly return A, and in the case where we do return B it is kinda by chance. ;-)

@miguno
Author

miguno commented Oct 15, 2014

I documented the known misbehavior of top-N CMS in a unit test example:
https://gist.github.com/miguno/02309debbc4f0754f0a1

I think that, for the very same reason, SketchMap will not correctly merge heavy hitters either. The relevant code (see source) is:

hitters.sorted(specificOrdering).take(heavyHittersCount).toList

The problem is not the code itself, but that merging top-N based heavy hitters is not an associative operation (the take() in the example above in combination with the heavy hitters merge operation).

@jnievelt
Contributor

> is there a specific reason that in the current CMS code heavyHitters returns Set[K], although the set being returned is a SortedSet[K]? For instance, would it make sense to return List[K], similar to what SketchMap does?

I don't know exactly, but it's no doubt related to the use of dropWhile which only works when you're iterating through lowest to highest. Having said that, I do believe that it's reasonable to return a SortedSet[K] at the interface level too (perhaps not done here to avoid changing the interface). Actually as of #277 SketchMap did this, but it seems to have been reverted (intentionally or unintentionally I'm not quite sure).

But yes, the top-N associativity issue is a fundamental limitation directly related to the take(N), not related to the CMS or SketchMap implementations (if we got rid of take(N) it's hard to argue that we're top-N).

@miguno
Author

miguno commented Oct 16, 2014

Addendum as FYI for future readers of this issue:

The top-N limitation only applies when adding CMS instances via ++ (i.e. cms1 ++ cms2). Heavy hitters are tracked correctly when adding/counting individual items via + (i.e. cms + item and cms + (item, count)).
