Joe Nievelt committed Aug 5, 2015
commit dd67cd2
Expand Up @@ -720,6 +720,47 @@ case class TopCMSInstance[K](override val cms: CMS[K], hhs: HeavyHitters[K], par


class TopCMSMonoid[K](cms: CMS[K], logic: HeavyHittersLogic[K]) extends Monoid[TopCMS[K]] {

val params: TopCMSParams[K] = TopCMSParams(logic)

val zero: TopCMS[K] = TopCMSZero[K](cms, params)

* Combines the two sketches.
* The sketches must use the same hash functions.
def plus(left: TopCMS[K], right: TopCMS[K]): TopCMS[K] = {
require(left.cms.params.hashes == right.cms.params.hashes, "The sketches must use the same hash functions.")
left ++ right

* Creates a sketch out of a single item.
def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms + item, params)

* Creates a sketch out of multiple items.
def create(data: Seq[K]): TopCMS[K] = {
data.foldLeft(zero) { case (acc, x) => plus(acc, create(x)) }


class TopCMSAggregator[K](cmsMonoid: TopCMSMonoid[K])
extends MonoidAggregator[K, TopCMS[K], TopCMS[K]] {

def monoid = cmsMonoid

def prepare(value: K): TopCMS[K] = monoid.create(value)

def present(cms: TopCMS[K]): TopCMS[K] = cms


* Controls how a CMS that implements [[CMSHeavyHitters]] tracks heavy hitters.
Expand Down Expand Up @@ -851,38 +892,7 @@ case class HeavyHitter[K](item: K, count: Long) extends
* Which type K should you pick in practice? For domains that have less than `2^64` unique elements, you'd
* typically use [[Long]]. For larger domains you can try [[BigInt]], for example.
class TopPctCMSMonoid[K](cms: CMS[K], heavyHittersPct: Double = 0.01) extends Monoid[TopCMS[K]] {

val params: TopCMSParams[K] = {
val logic = new TopPctLogic[K](heavyHittersPct)

val zero: TopCMS[K] = TopCMSZero[K](cms, params)

* Combines the two sketches.
* The sketches must use the same hash functions.
def plus(left: TopCMS[K], right: TopCMS[K]): TopCMS[K] = {
require(left.cms.params.hashes == right.cms.params.hashes, "The sketches must use the same hash functions.")
left ++ right

* Creates a sketch out of a single item.
def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms + item, params)

* Creates a sketch out of multiple items.
def create(data: Seq[K]): TopCMS[K] = {
data.foldLeft(zero) { case (acc, x) => plus(acc, create(x)) }

class TopPctCMSMonoid[K](cms: CMS[K], heavyHittersPct: Double = 0.01) extends TopCMSMonoid[K](cms, TopPctLogic[K](heavyHittersPct))

object TopPctCMS {

Expand Down Expand Up @@ -915,16 +925,7 @@ object TopPctCMS {
* An Aggregator for [[TopPctCMS]]. Can be created using [[TopPctCMS.aggregator]].
case class TopPctCMSAggregator[K](cmsMonoid: TopPctCMSMonoid[K])
extends MonoidAggregator[K, TopCMS[K], TopCMS[K]] {

def monoid = cmsMonoid

def prepare(value: K): TopCMS[K] = monoid.create(value)

def present(cms: TopCMS[K]): TopCMS[K] = cms

case class TopPctCMSAggregator[K](cmsMonoid: TopPctCMSMonoid[K]) extends TopCMSAggregator(cmsMonoid)

* Monoid for top-N based [[TopCMS]] sketches. '''Use with care! (see warning below)'''
Expand Down Expand Up @@ -983,36 +984,7 @@ case class TopPctCMSAggregator[K](cmsMonoid: TopPctCMSMonoid[K])
* Which type K should you pick in practice? For domains that have less than `2^64` unique elements, you'd
* typically use [[Long]]. For larger domains you can try [[BigInt]], for example.
class TopNCMSMonoid[K](cms: CMS[K], heavyHittersN: Int = 100) extends Monoid[TopCMS[K]] {

val params: TopCMSParams[K] = {
val logic = new TopNLogic[K](heavyHittersN)

val zero: TopCMS[K] = TopCMSZero[K](cms, params)

* Combines the two sketches.
* The sketches must use the same hash functions.
def plus(left: TopCMS[K], right: TopCMS[K]): TopCMS[K] = {
require(left.cms.params.hashes == right.cms.params.hashes, "The sketches must use the same hash functions.")
left ++ right

* Creates a sketch out of a single item.
def create(item: K): TopCMS[K] = TopCMSItem[K](item, cms + item, params)

* Creates a sketch out of multiple items.
def create(data: Seq[K]): TopCMS[K] = data.foldLeft(zero) { case (acc, x) => plus(acc, create(x)) }

class TopNCMSMonoid[K](cms: CMS[K], heavyHittersN: Int = 100) extends TopCMSMonoid[K](cms, TopNLogic[K](heavyHittersN))

object TopNCMS {

Expand Down Expand Up @@ -1045,14 +1017,87 @@ object TopNCMS {
* An Aggregator for [[TopNCMS]]. Can be created using [[TopNCMS.aggregator]].
case class TopNCMSAggregator[K](cmsMonoid: TopNCMSMonoid[K])
extends MonoidAggregator[K, TopCMS[K], TopCMS[K]] {
case class TopNCMSAggregator[K](cmsMonoid: TopNCMSMonoid[K]) extends TopCMSAggregator(cmsMonoid)

val monoid = cmsMonoid
* K1 defines a scope for the CMS. For each k1, keep the top heavyHittersN
* associated k2 values.
case class ScopedTopNLogic[K1, K2](heavyHittersN: Int) extends HeavyHittersLogic[(K1, K2)] {

def prepare(value: K): TopCMS[K] = monoid.create(value)
require(heavyHittersN > 0, "heavyHittersN must be > 0")

def present(cms: TopCMS[K]): TopCMS[K] = cms
override def purgeHeavyHitters(cms: CMS[(K1, K2)])(hitters: HeavyHitters[(K1, K2)]): HeavyHitters[(K1, K2)] = {
val grouped = hitters.hhs.groupBy { hh => hh.item._1 }
val (underLimit, overLimit) = grouped.partition { _._2.size <= heavyHittersN }
val sorted = overLimit.mapValues { hhs => hhs.toSeq.sortBy { hh => hh.count } }
val purged = sorted.mapValues { hhs => hhs.takeRight(heavyHittersN) }
HeavyHitters[(K1, K2)](purged.values.flatten.toSet ++ underLimit.values.flatten.toSet)


* Monoid for Top-N values per key in an associative [[TopCMS]].
* Typical use case for this might be (Country, City) pairs. For a stream of such
* pairs, we might want to keep track of the most popular cities for each country.
* This can, of course, be achieved using a Map[Country, TopNCMS[City]], but this
* requires storing one CMS per distinct Country.
* Similarly, one could attempt to use a TopNCMS[(Country, City)], but less common
* countries may not make the cut if N is not "very large".
* ScopedTopNCMSMonoid[Country, City] will avoid having one Country drown others
* out, while still only using a single CMS.
* In general the eviction of K1 is not supported, and all distinct K1 values must
* be retained. Therefore it is important to only use this Monoid when the number
* of distinct K1 values is known to be reasonably bounded.
class ScopedTopNCMSMonoid[K1, K2](cms: CMS[(K1, K2)], heavyHittersN: Int = 100) extends TopCMSMonoid[(K1, K2)](cms, ScopedTopNLogic[K1, K2](heavyHittersN))

object ScopedTopNCMS {

def scopedHasher[K1: CMSHasher, K2: CMSHasher] = new CMSHasher[(K1, K2)] {
private val k1Hasher = implicitly[CMSHasher[K1]]
private val k2Hasher = implicitly[CMSHasher[K2]]

def hash(a: Int, b: Int, width: Int)(x: (K1, K2)): Int = {
val (k1, k2) = x
val xs = Seq(
k1Hasher.hash(a, b, width)(k1),
k2Hasher.hash(a, b, width)(k2),
(scala.util.hashing.MurmurHash3.seqHash(xs) & Int.MaxValue) % width

def monoid[K1: CMSHasher, K2: CMSHasher](eps: Double,
delta: Double,
seed: Int,
heavyHittersN: Int): ScopedTopNCMSMonoid[K1, K2] =
new ScopedTopNCMSMonoid[K1, K2](CMS(eps, delta, seed)(scopedHasher[K1, K2]), heavyHittersN)

def monoid[K1: CMSHasher, K2: CMSHasher](depth: Int,
width: Int,
seed: Int,
heavyHittersN: Int): ScopedTopNCMSMonoid[K1, K2] =
monoid(CMSFunctions.eps(width),, seed, heavyHittersN)

def aggregator[K1: CMSHasher, K2: CMSHasher](eps: Double,
delta: Double,
seed: Int,
heavyHittersN: Int): TopCMSAggregator[(K1, K2)] =
new TopCMSAggregator(monoid(eps, delta, seed, heavyHittersN))

def aggregator[K1: CMSHasher, K2: CMSHasher](depth: Int,
width: Int,
seed: Int,
heavyHittersN: Int): TopCMSAggregator[(K1, K2)] =
aggregator(CMSFunctions.eps(width),, seed, heavyHittersN)


Expand Up @@ -809,6 +809,64 @@ abstract class CMSTest[K: CMSHasher: FromIntLike] extends WordSpec with Matchers


"A Scoped Top-N Count-Min sketch implementing CMSHeavyHitters" should {

"create correct sketches out of a single item" in {
forAll{ (x: Int, y: Int) =>
val data = (x, y).toK[K]
val cmsMonoid = {
val heavyHittersN = 2
ScopedTopNCMS.monoid[K, K](EPS, DELTA, SEED, heavyHittersN)
val topCms = cmsMonoid.create(data)
topCms.totalCount should be(1)
topCms.cms.totalCount should be(1)
topCms.frequency((x, y).toK[K]).estimate should be(1)
// Poor man's way to come up with an item that is not x and that is very unlikely to hash to the same slot.
val otherItem = (x + 1, y)
topCms.frequency(otherItem.toK[K]).estimate should be(0)
// The following assert indirectly verifies whether the counting table is not all-zero (cf. GH-393).
topCms.innerProduct(topCms).estimate should be(1)

"(when adding CMS instances) keep all heavy hitters keys" in {
val heavyHittersN = 1
val monoid = ScopedTopNCMS.monoid[K, K](EPS, DELTA, SEED, heavyHittersN)
val cms1 = monoid.create(Seq((1, 1), (2, 3), (2, 3)).toK[K])
cms1.heavyHitters should be(Set((1, 1), (2, 3)).toK[K])
val cms2 = cms1 ++ monoid.create(Seq((3, 8), (3, 8), (3, 8)).toK[K])
cms2.heavyHitters should be(Set((1, 1), (2, 3), (3, 8)).toK[K])
val cms3 = cms2 ++ monoid.create(Seq((1, 1), (1, 1), (1, 1)).toK[K])
cms3.heavyHitters should be(Set((1, 1), (2, 3), (3, 8)).toK[K])
val cms4 = cms3 ++ monoid.create(Seq((6, 2), (6, 2), (6, 2), (6, 2), (6, 2), (6, 2)).toK[K])
cms4.heavyHitters should be(Set((1, 1), (2, 3), (3, 8), (6, 2)).toK[K])

"(when adding CMS instances) drop old heavy hitters for the same key when new heavy hitters replace them" in {
val heavyHittersN = 2
val monoid = ScopedTopNCMS.monoid[K, K](EPS, DELTA, SEED, heavyHittersN)
val cms1 = monoid.create(Seq((4, 1), (4, 2), (4, 2)).toK[K])
cms1.heavyHitters should be(Set((4, 1), (4, 2)).toK[K])
val cms2 = cms1 ++ monoid.create(Seq((4, 3), (4, 3), (4, 3)).toK[K])
cms2.heavyHitters should be(Set((4, 2), (4, 3)).toK[K])
val cms3 = cms2 ++ monoid.create(Seq((4, 1), (4, 1), (4, 1)).toK[K])
cms3.heavyHitters should be(Set((4, 3), (4, 1)).toK[K])
val cms4 = cms3 ++ monoid.create(Seq((4, 6), (4, 6), (4, 6), (4, 6), (4, 6), (4, 6)).toK[K])
cms4.heavyHitters should be(Set((4, 1), (4, 6)).toK[K])

"trim multiple keys at once" in {
val heavyHittersN = 2
val data =
Seq(1, 2, 2, 3, 3, 3, 6, 6, 6, 6, 6, 6).flatMap { i => Seq((4, i), (7, i + 2)) }.toK[K]
val monoid = ScopedTopNCMS.monoid[K, K](EPS, DELTA, SEED, heavyHittersN)
val cms = monoid.create(data)
cms.heavyHitters should be(Set((4, 3), (4, 6), (7, 5), (7, 8)).toK[K])



class CMSFunctionsSpec extends PropSpec with PropertyChecks with Matchers {
Expand Down Expand Up @@ -904,11 +962,26 @@ class CMSHasherBytesSpec extends CMSHasherSpec[Bytes]

abstract class CMSHasherSpec[K: CMSHasher: FromIntLike] extends PropSpec with PropertyChecks with Matchers {

property("returns positive hashes (i.e. slots) only") {
property("returns hashes (i.e. slots) in the range [0, width)") {
forAll { (a: Int, b: Int, width: Int, x: Int) =>
whenever (width > 0) {
val hash = CMSHash[K](a, b, width)
hash(x.toK[K]) should be >= 0
val hashValue = hash(x.toK[K])

hashValue should be >= 0
hashValue should be < width

property("returns scoped hashes in the range [0, width)") {
forAll { (a: Int, b: Int, width: Int, x: Int, y: Int) =>
whenever (width > 0) {
val hasher = ScopedTopNCMS.scopedHasher[K, K]
val hashValue = hasher.hash(a, b, width)((x, y).toK[K])

hashValue should be >= 0
hashValue should be < width
Expand Down Expand Up @@ -964,12 +1037,24 @@ object CmsTestImplicits {
def toK[T: FromIntLike]: T = implicitly[FromIntLike[T]].fromInt(x)

implicit class PairCast(x: (Int, Int)) {
def toK[T: FromIntLike]: (T, T) = (x._1.toK[T], x._2.toK[T])

implicit class SeqCast(xs: Seq[Int]) {
def toK[T: FromIntLike]: Seq[T] = xs map { _.toK[T] }

implicit class PairSeqCast(xs: Seq[(Int, Int)]) {
def toK[T: FromIntLike]: Seq[(T, T)] = xs map { _.toK[T] }

implicit class SetCast(xs: Set[Int]) {
def toK[T: FromIntLike]: Set[T] = xs map { _.toK[T] }

implicit class PairSetCast(xs: Set[(Int, Int)]) {
def toK[T: FromIntLike]: Set[(T, T)] = xs map { _.toK[T] }


