Skip to content
This repository

Creating SketchMap #151

Merged
merged 5 commits into from about 1 year ago

2 participants

Wen-Hao Lue P. Oscar Boykin
Wen-Hao Lue
wlue commented

A SketchMap is a more generalized version of the Count-Min Sketch that accepts any time of Key (K), and a Value with an ordering/monoid.

algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((17 lines not shown))
  17
+package com.twitter.algebird
  18
+
  19
+/**
  20
+ * A Sketch Map is a generalized version of the Count-Min Sketch that is an
  21
+ * approximation of Map[K, V] that stores reference to top heavy hitters. The
  22
+ * Sketch Map can approximate the sums of any summable value that has a monoid.
  23
+ */
  24
+
  25
+/**
  26
+ * Responsible for creating instances of SketchMap.
  27
+ */
  28
+class SketchMapMonoid[K, V](eps: Double, delta: Double, seed: Int, heavyHittersCount: Int)
  29
+                           (implicit serialization: K => Array[Byte], valueOrdering: Ordering[V], monoid: Monoid[V])
  30
+extends Monoid[SketchMap[K, V]] {
  31
+
  32
+  val hashes: Seq[SketchMapHash[K]] = {
1
P. Oscar Boykin Collaborator
johnynek added a note

Can we change the type here to Seq[(K) => Int] which abstracts us from the internals a bit better (should just work to change that).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((190 lines not shown))
  190
+   * Returns a new set of sorted and concatenated heavy hitters given an
  191
+   * arbitrary list of keys.
  192
+   */
  193
+  private def updatedHeavyHitters(hitters: Seq[K], table: SketchMapValuesTable[V]): List[K] = {
  194
+    val mapping = calculateHeavyHittersMapping(hitters, table)
  195
+    val specificOrdering = Ordering.by[K, V] { mapping(_) } reverse
  196
+
  197
+    hitters.sorted(specificOrdering).take(params.heavyHittersCount).toList
  198
+  }
  199
+}
  200
+
  201
+
  202
+/**
  203
+ * Convenience class for holding constant parameters of a Sketch Map.
  204
+ */
  205
+case class SketchMapParams[K, V](hashes: Seq[SketchMapHash[K]], eps: Double, delta: Double, heavyHittersCount: Int) {
3
P. Oscar Boykin Collaborator
johnynek added a note

is V needed here?

Can we make this Seq[(K) => Int]?

P. Oscar Boykin Collaborator
johnynek added a note

let's just take width and depth here. Again it makes serialization easier. You can convert back at runtime with the SketchMap companion object.

Wen-Hao Lue
wlue added a note

I think the problem with storing width/depth is that the reverse conversion isn't accurate, since a width can map to many different eps values due to rounding.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((13 lines not shown))
  13
+See the License for the specific language governing permissions and
  14
+limitations under the License.
  15
+*/
  16
+
  17
+package com.twitter.algebird
  18
+
  19
+/**
  20
+ * A Sketch Map is a generalized version of the Count-Min Sketch that is an
  21
+ * approximation of Map[K, V] that stores reference to top heavy hitters. The
  22
+ * Sketch Map can approximate the sums of any summable value that has a monoid.
  23
+ */
  24
+
  25
+/**
  26
+ * Responsible for creating instances of SketchMap.
  27
+ */
  28
+class SketchMapMonoid[K, V](eps: Double, delta: Double, seed: Int, heavyHittersCount: Int)
1
P. Oscar Boykin Collaborator
johnynek added a note

Can we store the equivalent intergers to eps and delta? i.e. width and depth? Integers are easy to reason about exactness, etc...

You have the method in the companion object to create a monoid given eps/delta for that, you can just use the functions in the companion object: eps(Int), delta(Int).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((44 lines not shown))
  44
+  val params: SketchMapParams[K, V] = SketchMapParams[K, V](hashes, eps, delta, heavyHittersCount)
  45
+
  46
+  /**
  47
+   * A zero Sketch Map is one with zero elements.
  48
+   */
  49
+  val zero: SketchMap[K, V] = SketchMap[K, V](params, SketchMapValuesTable[V](params.depth, params.width), Nil, monoid.zero)
  50
+
  51
+  /**
  52
+   * We assume the Sketch Map on the left and right use the same hash functions.
  53
+   */
  54
+  def plus(left: SketchMap[K, V], right: SketchMap[K, V]): SketchMap[K, V] = left ++ right
  55
+
  56
+  /**
  57
+   * Create a Sketch Map sketch out of a single key.
  58
+   */
  59
+  def create(key: K, value: V): SketchMap[K, V] = zero + (key, value)
1
P. Oscar Boykin Collaborator
johnynek added a note

I think we should take kv: (K,V) here to be consistent with create, and note that the application becomes cleaner: zero + kv.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((259 lines not shown))
  259
+   * Updates the value of a single cell in the table.
  260
+   */
  261
+  def +(pos: (Int, Int), value: V): SketchMapValuesTable[V] = {
  262
+    val (row, col) = pos
  263
+    val currValue: V = getValue(pos)
  264
+    val newValues = values.updated(row, values(row).updated(col, Monoid.plus(currValue, value)))
  265
+
  266
+    SketchMapValuesTable[V](newValues)
  267
+  }
  268
+
  269
+  /**
  270
+   * Adds another values table to this one, through elementwise addition.
  271
+   */
  272
+  def ++(other: SketchMapValuesTable[V]): SketchMapValuesTable[V] = {
  273
+    assert((depth, width) == (other.depth, other.width), "Tables must have the same dimensions.")
  274
+
1
P. Oscar Boykin Collaborator
johnynek added a note

Please add a comment about the jank here: scala 2.10 is more strict on recursive implicit resolution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((228 lines not shown))
  228
+
  229
+
  230
+/**
  231
+ * The 2-dimensional table of values used in the Sketch Map.
  232
+ * Each row corresponds to a particular hash function.
  233
+ */
  234
+object SketchMapValuesTable {
  235
+  /**
  236
+   * Creates a new SketchMapValuesTable with counts initialized to all zeroes.
  237
+   */
  238
+  def apply[V](depth: Int, width: Int)(implicit monoid: Monoid[V]): SketchMapValuesTable[V] = {
  239
+    SketchMapValuesTable(AdaptiveVector.fill(depth)(AdaptiveVector.fill[V](width)(monoid.zero)))
  240
+  }
  241
+}
  242
+
  243
+case class SketchMapValuesTable[V](values: AdaptiveVector[AdaptiveVector[V]])(implicit monoid: Monoid[V]) {
1
P. Oscar Boykin Collaborator
johnynek added a note

Can you put this in a separate file and call it: AdaptiveMatrix[V] and remove the implicit monoid from the constructor?

You can add a companion object:

object AdaptiveMatrix {
def emptyV: AdaptiveMatrix[V]
implicit def monoid[V:Monoid]: Monoid[AdaptiveMatrix[V]] = {
// put the stuff here with the recursive AdaptiveVector wrapped in this type.
}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((246 lines not shown))
  246
+
  247
+  def depth: Int = values.size
  248
+  def width: Int = values(0).size
  249
+
  250
+  def getValue(pos: (Int, Int)): V = {
  251
+    val (row, col) = pos
  252
+
  253
+    assert(row < depth && col < width, "Position must be within the bounds of this table.")
  254
+
  255
+    values(row)(col)
  256
+  }
  257
+
  258
+  /**
  259
+   * Updates the value of a single cell in the table.
  260
+   */
  261
+  def +(pos: (Int, Int), value: V): SketchMapValuesTable[V] = {
1
P. Oscar Boykin Collaborator
johnynek added a note

Make this method take an implicit Monoid[V], don't make it global.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((257 lines not shown))
  257
+
  258
+  /**
  259
+   * Updates the value of a single cell in the table.
  260
+   */
  261
+  def +(pos: (Int, Int), value: V): SketchMapValuesTable[V] = {
  262
+    val (row, col) = pos
  263
+    val currValue: V = getValue(pos)
  264
+    val newValues = values.updated(row, values(row).updated(col, Monoid.plus(currValue, value)))
  265
+
  266
+    SketchMapValuesTable[V](newValues)
  267
+  }
  268
+
  269
+  /**
  270
+   * Adds another values table to this one, through elementwise addition.
  271
+   */
  272
+  def ++(other: SketchMapValuesTable[V]): SketchMapValuesTable[V] = {
1
P. Oscar Boykin Collaborator
johnynek added a note

Use the code in the monoid you make above rather than here (that monoid will already have the Monoid[V]).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((203 lines not shown))
  203
+ * Convenience class for holding constant parameters of a Sketch Map.
  204
+ */
  205
+case class SketchMapParams[K, V](hashes: Seq[SketchMapHash[K]], eps: Double, delta: Double, heavyHittersCount: Int) {
  206
+  assert(0 < eps && eps < 1, "eps must lie in (0, 1)")
  207
+  assert(0 < delta && delta < 1, "delta must lie in (0, 1)")
  208
+  assert(0 <= heavyHittersCount , "heavyHittersCount must be greater than 0")
  209
+
  210
+  val depth = SketchMap.depth(delta)
  211
+  val width = SketchMap.width(eps)
  212
+}
  213
+
  214
+
  215
+/**
  216
+ * Hashes an arbitrary key type to one that the Sketch Map can use.
  217
+ */
  218
+case class SketchMapHash[T](hasher: CMSHash, seed: Int)
1
P. Oscar Boykin Collaborator
johnynek added a note

Can this be a private inner class of SketchMapMonoid?

We've found the smaller the public API surface area is, the easier it is to keep things compatible.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/AdaptiveMatrix.scala
((30 lines not shown))
  30
+   * Use recursive AdaptiveVector monoid.
  31
+   */
  32
+  implicit def monoid[V:Monoid]: Monoid[AdaptiveMatrix[V]] = new Monoid[AdaptiveMatrix[V]] {
  33
+    // Scala 2.10.0 is more strict with recursive implicit resolution, so hint
  34
+    // it with the inner monoid.
  35
+    private implicit val innerMonoid: Monoid[AdaptiveVector[V]] = AdaptiveVector.monoid[V]
  36
+    private val matrixMonoid = AdaptiveVector.monoid[AdaptiveVector[V]]
  37
+
  38
+    override def zero: AdaptiveMatrix[V] = AdaptiveMatrix[V](matrixMonoid.zero)
  39
+    override def plus(left: AdaptiveMatrix[V], right: AdaptiveMatrix[V]): AdaptiveMatrix[V] = {
  40
+      AdaptiveMatrix[V](matrixMonoid.plus(left.contents, right.contents))
  41
+    }
  42
+  }
  43
+}
  44
+
  45
+case class AdaptiveMatrix[V](contents: AdaptiveVector[AdaptiveVector[V]]) {
1
P. Oscar Boykin Collaborator
johnynek added a note

Can you change context to "rowsByCols" or something that indicates whether the rows ares on the outer vector or the columns are?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((70 lines not shown))
  70
+
  71
+  /**
  72
+   * Create a Sketch Map sketch from a sequence of pairs.
  73
+   */
  74
+  def create(data: Seq[(K, V)]): SketchMap[K, V] = {
  75
+    data.foldLeft(zero) { case (acc, (key, value)) =>
  76
+      plus(acc, create(key, value))
  77
+    }
  78
+  }
  79
+}
  80
+
  81
+
  82
+/**
  83
+ * Convenience class for holding constant parameters of a Sketch Map.
  84
+ */
  85
+case class SketchMapParams[K, V](hashes: Seq[K => Int], width: Int, depth: Int, heavyHittersCount: Int) {
1
P. Oscar Boykin Collaborator
johnynek added a note

V is not needed here, right? Can't we remove the V type parameter?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
((75 lines not shown))
  75
+    data.foldLeft(zero) { case (acc, (key, value)) =>
  76
+      plus(acc, create(key, value))
  77
+    }
  78
+  }
  79
+}
  80
+
  81
+
  82
+/**
  83
+ * Convenience class for holding constant parameters of a Sketch Map.
  84
+ */
  85
+case class SketchMapParams[K, V](hashes: Seq[K => Int], width: Int, depth: Int, heavyHittersCount: Int) {
  86
+  assert(0 < width, "width must be greater than 0")
  87
+  assert(0 < depth, "depth must be greater than 0")
  88
+  assert(0 <= heavyHittersCount , "heavyHittersCount must be greater than 0")
  89
+
  90
+  val eps = SketchMap.eps(width)
1
P. Oscar Boykin Collaborator
johnynek added a note

Can we make this a def so we don't waste the memory on storing it? (same for delta).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
P. Oscar Boykin johnynek merged commit b75a6af into from
P. Oscar Boykin johnynek closed this
P. Oscar Boykin
Collaborator

Closes: #43

P. Oscar Boykin johnynek deleted the branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
57  algebird-core/src/main/scala/com/twitter/algebird/AdaptiveMatrix.scala
... ...
@@ -0,0 +1,57 @@
  1
+/*
  2
+Copyright 2012 Twitter, Inc.
  3
+
  4
+Licensed under the Apache License, Version 2.0 (the "License");
  5
+you may not use this file except in compliance with the License.
  6
+You may obtain a copy of the License at
  7
+
  8
+http://www.apache.org/licenses/LICENSE-2.0
  9
+
  10
+Unless required by applicable law or agreed to in writing, software
  11
+distributed under the License is distributed on an "AS IS" BASIS,
  12
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+See the License for the specific language governing permissions and
  14
+limitations under the License.
  15
+*/
  16
+
  17
+package com.twitter.algebird
  18
+
  19
+/**
  20
+ * A convenience class that composes AdaptiveVector[AdaptiveVector[V]] with
  21
+ * some useful functions.
  22
+ */
  23
+
  24
+object AdaptiveMatrix {
  25
+  def fill[V](rows: Int, columns: Int)(fill: V): AdaptiveMatrix[V] = {
  26
+    AdaptiveMatrix[V](AdaptiveVector.fill(rows)(AdaptiveVector.fill[V](columns)(fill)))
  27
+  }
  28
+
  29
+  /**
  30
+   * Use recursive AdaptiveVector monoid.
  31
+   */
  32
+  implicit def monoid[V:Monoid]: Monoid[AdaptiveMatrix[V]] = new Monoid[AdaptiveMatrix[V]] {
  33
+    // Scala 2.10.0 is more strict with recursive implicit resolution, so hint
  34
+    // it with the inner monoid.
  35
+    private implicit val innerMonoid: Monoid[AdaptiveVector[V]] = AdaptiveVector.monoid[V]
  36
+    private val matrixMonoid = AdaptiveVector.monoid[AdaptiveVector[V]]
  37
+
  38
+    override def zero: AdaptiveMatrix[V] = AdaptiveMatrix[V](matrixMonoid.zero)
  39
+    override def plus(left: AdaptiveMatrix[V], right: AdaptiveMatrix[V]): AdaptiveMatrix[V] = {
  40
+      AdaptiveMatrix[V](matrixMonoid.plus(left.rowsByColumns, right.rowsByColumns))
  41
+    }
  42
+  }
  43
+}
  44
+
  45
+case class AdaptiveMatrix[V](rowsByColumns: AdaptiveVector[AdaptiveVector[V]]) {
  46
+  /** Rows are the outer vectors, and columns are the inner vectors. */
  47
+  def rows: Int = rowsByColumns.size
  48
+  def columns: Int = rowsByColumns(0).size
  49
+
  50
+  def getValue(position: (Int, Int)): V = rowsByColumns(position._1)(position._2)
  51
+
  52
+  def updated(position: (Int, Int), value: V): AdaptiveMatrix[V] = {
  53
+    val (row, col) = position
  54
+    AdaptiveMatrix[V](rowsByColumns.updated(row, rowsByColumns(row).updated(col, value)))
  55
+  }
  56
+}
  57
+
2  algebird-core/src/main/scala/com/twitter/algebird/CountMinSketch.scala
@@ -413,7 +413,7 @@ case class CMSCountsTable(counts : Vector[Vector[Long]]) {
413 413
     val iil = Monoid.plus[IndexedSeq[IndexedSeq[Long]]](counts, other.counts)
414 414
     def toVector[V](is: IndexedSeq[V]): Vector[V] = {
415 415
       is match {
416  
-        case v: Vector[V] => v
  416
+        case v: Vector[_] => v
417 417
         case _ => Vector(is: _*)
418 418
       }
419 419
     }
227  algebird-core/src/main/scala/com/twitter/algebird/SketchMap.scala
... ...
@@ -0,0 +1,227 @@
  1
+/*
  2
+Copyright 2012 Twitter, Inc.
  3
+
  4
+Licensed under the Apache License, Version 2.0 (the "License");
  5
+you may not use this file except in compliance with the License.
  6
+You may obtain a copy of the License at
  7
+
  8
+http://www.apache.org/licenses/LICENSE-2.0
  9
+
  10
+Unless required by applicable law or agreed to in writing, software
  11
+distributed under the License is distributed on an "AS IS" BASIS,
  12
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+See the License for the specific language governing permissions and
  14
+limitations under the License.
  15
+*/
  16
+
  17
+package com.twitter.algebird
  18
+
  19
+/**
  20
+ * A Sketch Map is a generalized version of the Count-Min Sketch that is an
  21
+ * approximation of Map[K, V] that stores reference to top heavy hitters. The
  22
+ * Sketch Map can approximate the sums of any summable value that has a monoid.
  23
+ */
  24
+
  25
+
  26
+/**
  27
+ * Responsible for creating instances of SketchMap.
  28
+ */
  29
+class SketchMapMonoid[K, V](width: Int, depth: Int, seed: Int, heavyHittersCount: Int)
  30
+                           (implicit serialization: K => Array[Byte], valueOrdering: Ordering[V], monoid: Monoid[V])
  31
+extends Monoid[SketchMap[K, V]] {
  32
+  /**
  33
+   * Hashes an arbitrary key type to one that the Sketch Map can use.
  34
+   */
  35
+  private case class SketchMapHash(hasher: CMSHash, seed: Int) extends Function1[K, Int] {
  36
+    def apply(obj: K): Int = {
  37
+      val (first, second) = MurmurHash128(seed)(serialization(obj))
  38
+      hasher(first ^ second)
  39
+    }
  40
+  }
  41
+
  42
+  val hashes: Seq[K => Int] = {
  43
+    val r = new scala.util.Random(seed)
  44
+    val numHashes = depth
  45
+    val numCounters = width
  46
+    (0 to (numHashes - 1)).map { _ =>
  47
+      SketchMapHash(CMSHash(r.nextInt, 0, numCounters), seed)
  48
+    }
  49
+  }
  50
+
  51
+  /**
  52
+   * All Sketch Maps created with this monoid will have the same parameter configuration.
  53
+   */
  54
+  val params: SketchMapParams[K] = SketchMapParams[K](hashes, width, depth, heavyHittersCount)
  55
+
  56
+  /**
  57
+   * A zero Sketch Map is one with zero elements.
  58
+   */
  59
+  val zero: SketchMap[K, V] = SketchMap[K, V](params, AdaptiveMatrix.fill(params.depth, params.width)(monoid.zero), Nil, monoid.zero)
  60
+
  61
+  /**
  62
+   * We assume the Sketch Map on the left and right use the same hash functions.
  63
+   */
  64
+  def plus(left: SketchMap[K, V], right: SketchMap[K, V]): SketchMap[K, V] = left ++ right
  65
+
  66
+  /**
  67
+   * Create a Sketch Map sketch out of a single key/value pair.
  68
+   */
  69
+  def create(pair: (K, V)): SketchMap[K, V] = zero + pair
  70
+
  71
+  /**
  72
+   * Create a Sketch Map sketch from a sequence of pairs.
  73
+   */
  74
+  def create(data: Seq[(K, V)]): SketchMap[K, V] = {
  75
+    data.foldLeft(zero) { case (acc, (key, value)) =>
  76
+      plus(acc, create(key, value))
  77
+    }
  78
+  }
  79
+}
  80
+
  81
+
  82
+/**
  83
+ * Convenience class for holding constant parameters of a Sketch Map.
  84
+ */
  85
+case class SketchMapParams[K](hashes: Seq[K => Int], width: Int, depth: Int, heavyHittersCount: Int) {
  86
+  assert(0 < width, "width must be greater than 0")
  87
+  assert(0 < depth, "depth must be greater than 0")
  88
+  assert(0 <= heavyHittersCount , "heavyHittersCount must be greater than 0")
  89
+
  90
+  def eps = SketchMap.eps(width)
  91
+  def delta = SketchMap.delta(depth)
  92
+}
  93
+
  94
+
  95
+/**
  96
+ * Data structure representing an approximation of Map[K, V], where V has an
  97
+ * implicit ordering and monoid. This is a more generic version of
  98
+ * CountMinSketch.
  99
+ *
  100
+ * Values are stored in valuesTable, a 2D vector containing aggregated sums of
  101
+ * values inserted to the Sketch Map.
  102
+ *
  103
+ * The data structure stores top non-zero values, called Heavy Hitters. The
  104
+ * values are sorted by an implicit reverse ordering for the value, and the
  105
+ * number of heavy hitters stored is based on the heavyHittersCount set in
  106
+ * params.
  107
+ *
  108
+ * Use SketchMapMonoid to create instances of this class.
  109
+ */
  110
+
  111
+object SketchMap {
  112
+  /**
  113
+   * Functions to translate between (eps, delta) and (depth, width). The translation is:
  114
+   * depth = ceil(ln 1/delta)
  115
+   * width = ceil(e / eps)
  116
+   */
  117
+  def eps(width: Int): Double = scala.math.exp(1.0) / width
  118
+  def delta(depth: Int): Double = 1.0 / scala.math.exp(depth)
  119
+  def depth(delta: Double): Int = scala.math.ceil(scala.math.log(1.0 / delta)).toInt
  120
+  def width(eps: Double): Int = scala.math.ceil(scala.math.exp(1) / eps).toInt
  121
+
  122
+  /**
  123
+   * Generates a monoid used to create SketchMap instances. Requires a
  124
+   * serialization from K to Array[Byte] for hashing, an ordering for V, and a
  125
+   * monoid for V.
  126
+   */
  127
+  def monoid[K, V](eps: Double, delta: Double, seed: Int, heavyHittersCount: Int)
  128
+                  (implicit serialization: K => Array[Byte], valueOrdering: Ordering[V], monoid: Monoid[V]): SketchMapMonoid[K, V] = {
  129
+    new SketchMapMonoid(width(eps), depth(delta), seed, heavyHittersCount)(serialization, valueOrdering, monoid)
  130
+  }
  131
+}
  132
+
  133
+case class SketchMap[K, V](
  134
+  val params: SketchMapParams[K],
  135
+  val valuesTable: AdaptiveMatrix[V],
  136
+  val heavyHitterKeys: List[K],
  137
+  val totalValue: V
  138
+)(implicit ordering: Ordering[V], monoid: Monoid[V]) extends java.io.Serializable {
  139
+
  140
+  /**
  141
+   * All of the Heavy Hitter frequencies calculated all at once.
  142
+   */
  143
+  private val heavyHittersMapping: Map[K, V] = calculateHeavyHittersMapping(heavyHitterKeys, valuesTable)
  144
+
  145
+  /**
  146
+   * Ordering used to sort keys by its value. We use the reverse implicit
  147
+   * ordering on V because we want the hold the "largest" values.
  148
+   */
  149
+  private implicit val keyValueOrdering: Ordering[K] = Ordering.by[K, V] { heavyHittersMapping(_) } reverse
  150
+
  151
+  /**
  152
+   * These are not 100% accurate because of rounding.
  153
+   */
  154
+  def eps: Double = params.eps
  155
+  def delta: Double = params.delta
  156
+
  157
+  /**
  158
+   * Returns a sorted list of heavy hitter key/value tuples.
  159
+   */
  160
+  def heavyHitters: List[(K, V)] = heavyHitterKeys.map { item => (item, heavyHittersMapping(item)) }
  161
+
  162
+  /**
  163
+   * Calculates the frequencies for every heavy hitter.
  164
+   */
  165
+  private def calculateHeavyHittersMapping(keys: Iterable[K], table: AdaptiveMatrix[V]): Map[K, V] = {
  166
+    keys.map { item: K => (item, frequency(item, table)) } toMap
  167
+  }
  168
+
  169
+  /**
  170
+   * Calculates the frequency for a key given a values table.
  171
+   */
  172
+  private def frequency(key: K, table: AdaptiveMatrix[V]): V = {
  173
+    val estimates = table.rowsByColumns.zipWithIndex.map { case (row, i) =>
  174
+      row(params.hashes(i)(key))
  175
+    }
  176
+
  177
+    estimates.min
  178
+  }
  179
+
  180
+  /**
  181
+   * Calculates the approximate frequency for any key.
  182
+   */
  183
+  def frequency(key: K): V = {
  184
+    // If the key is a heavy hitter, then use the precalculated heavy hitters mapping.
  185
+    // Otherwise, calculate it normally.
  186
+    heavyHittersMapping.get(key).getOrElse(frequency(key, valuesTable))
  187
+  }
  188
+
  189
+  /**
  190
+   * Returns a new Sketch Map with a key value pair added.
  191
+   */
  192
+  def +(pair: (K, V)): SketchMap[K, V] = {
  193
+    val (key, value) = pair
  194
+
  195
+    val newHeavyHitters = key :: heavyHitterKeys
  196
+    val newValuesTable = (0 to (params.depth - 1)).foldLeft(valuesTable) { case (table, row) =>
  197
+      val pos = (row, params.hashes(row)(key))
  198
+      val currValue: V = table.getValue(pos)
  199
+      table.updated(pos, Monoid.plus(currValue, value))
  200
+    }
  201
+
  202
+    SketchMap(params, newValuesTable, updatedHeavyHitters(newHeavyHitters, newValuesTable), Monoid.plus(totalValue, value))
  203
+  }
  204
+
  205
+  /**
  206
+   * Returns a new Sketch Map summed with another Sketch Map. These should have
  207
+   * the same parameters, and be generated from the same monoid.
  208
+   */
  209
+  def ++(other: SketchMap[K, V]): SketchMap[K, V] = {
  210
+    val newValuesTable = Monoid.plus(valuesTable, other.valuesTable)
  211
+    val newHeavyHitters = (heavyHitterKeys ++ other.heavyHitterKeys).distinct
  212
+
  213
+    SketchMap(params, newValuesTable, updatedHeavyHitters(newHeavyHitters, newValuesTable), Monoid.plus(totalValue, other.totalValue))
  214
+  }
  215
+
  216
+  /**
  217
+   * Returns a new set of sorted and concatenated heavy hitters given an
  218
+   * arbitrary list of keys.
  219
+   */
  220
+  private def updatedHeavyHitters(hitters: Seq[K], table: AdaptiveMatrix[V]): List[K] = {
  221
+    val mapping = calculateHeavyHittersMapping(hitters, table)
  222
+    val specificOrdering = Ordering.by[K, V] { mapping(_) } reverse
  223
+
  224
+    hitters.sorted(specificOrdering).take(params.heavyHittersCount).toList
  225
+  }
  226
+}
  227
+
99  algebird-test/src/test/scala/com/twitter/algebird/SketchMapTest.scala
... ...
@@ -0,0 +1,99 @@
  1
+package com.twitter.algebird
  2
+
  3
+import org.specs._
  4
+
  5
+import org.scalacheck.Arbitrary
  6
+import org.scalacheck.Arbitrary.arbitrary
  7
+import org.scalacheck.Properties
  8
+import org.scalacheck.Gen.choose
  9
+import org.scalacheck.Prop.forAll
  10
+
  11
+object SketchMapTestImplicits {
  12
+  val DELTA = 1E-10
  13
+  val EPS = 0.001
  14
+  val SEED = 1
  15
+  val HEAVY_HITTERS_COUNT = 10
  16
+}
  17
+
  18
+
  19
+object SketchMapLaws extends Properties("SketchMap") {
  20
+  import BaseProperties._
  21
+  import SketchMapTestImplicits._
  22
+  import HyperLogLog.int2Bytes
  23
+
  24
+  implicit val smMonoid = SketchMap.monoid[Int, Long](EPS, DELTA, SEED, HEAVY_HITTERS_COUNT)
  25
+  implicit val smGen = Arbitrary {
  26
+    for (key: Int <- choose(0, 10000)) yield (smMonoid.create(key, 1L))
  27
+  }
  28
+
  29
+  property("SketchMap is a Monoid") = monoidLaws[SketchMap[Int, Long]]
  30
+}
  31
+
  32
+
  33
+class SketchMapTest extends Specification {
  34
+  import SketchMapTestImplicits._
  35
+  import HyperLogLog.int2Bytes
  36
+
  37
+  noDetailedDiffs()
  38
+
  39
+  val MONOID = SketchMap.monoid[Int, Long](EPS, DELTA, SEED, HEAVY_HITTERS_COUNT)
  40
+  val RAND = new scala.util.Random
  41
+
  42
+  "SketchMap" should {
  43
+    "count total number of elements in a stream" in {
  44
+      val totalCount = 1243
  45
+      val range = 234
  46
+      val data = (0 to (totalCount - 1)).map { _ => (RAND.nextInt(range), 1L) }
  47
+      val sm = MONOID.create(data)
  48
+
  49
+      sm.totalValue must be_==(totalCount)
  50
+    }
  51
+
  52
+    "exactly compute frequencies in a small stream" in {
  53
+      val one = MONOID.create(1, 1L)
  54
+      val two = MONOID.create(2, 1L)
  55
+      val sm = MONOID.plus(MONOID.plus(one, two), two)
  56
+
  57
+      sm.frequency(0) must be_==(0L)
  58
+      sm.frequency(1) must be_==(1L)
  59
+      sm.frequency(2) must be_==(2L)
  60
+
  61
+      val three = MONOID.create(1, 3L)
  62
+      three.frequency(1) must be_==(3L)
  63
+      val four = MONOID.create(1, 4L)
  64
+      four.frequency(1) must be_==(4L)
  65
+      val sm2 = MONOID.plus(four, three)
  66
+      sm2.frequency(1) must be_==(7L)
  67
+    }
  68
+
  69
+    "drop old heavy hitters when new heavy hitters replace them" in {
  70
+      val monoid = SketchMap.monoid[Int, Long](EPS, DELTA, SEED, 1)
  71
+
  72
+      val sm1 = monoid.create(Seq((1, 5L), (2, 4L)))
  73
+      sm1.heavyHitters must be_==(List((1, 5L)))
  74
+
  75
+      val sm2 = monoid.plus(sm1, monoid.create(2, 2L))
  76
+      sm2.heavyHitters must be_==(List((2, 6L)))
  77
+
  78
+      val sm3 = monoid.plus(sm2, monoid.create(1, 2L))
  79
+      sm3.heavyHitters must be_==(List((1, 7L)))
  80
+
  81
+      val sm4 = monoid.plus(sm3, monoid.create(0, 10L))
  82
+      sm4.heavyHitters must be_==(List((0, 10L)))
  83
+    }
  84
+
  85
+    "exactly compute heavy hitters in a small stream" in {
  86
+      val data = Seq((1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L))
  87
+
  88
+      val sm1 = SketchMap.monoid[Int, Long](EPS, DELTA, SEED, 5).create(data)
  89
+      val sm2 = SketchMap.monoid[Int, Long](EPS, DELTA, SEED, 3).create(data)
  90
+      val sm3 = SketchMap.monoid[Int, Long](EPS, DELTA, SEED, 1).create(data)
  91
+      val sm4 = SketchMap.monoid[Int, Long](EPS, DELTA, SEED, 0).create(data)
  92
+
  93
+      sm1.heavyHitterKeys must be_==(List(5, 4, 3, 2, 1))
  94
+      sm2.heavyHitterKeys must be_==(List(5, 4, 3))
  95
+      sm3.heavyHitterKeys must be_==(List(5))
  96
+      sm4.heavyHitterKeys must be_==(List.empty[Int])
  97
+    }
  98
+  }
  99
+}
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.