Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a presenting benchmark and optimizes it #427

Merged
merged 11 commits into from
Apr 1, 2015
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ matrix:
include:
- scala: 2.10.4
script: ./sbt ++$TRAVIS_SCALA_VERSION clean test

- scala: 2.11.5
script: ./sbt ++$TRAVIS_SCALA_VERSION clean coverage test
script: ./sbt ++$TRAVIS_SCALA_VERSION clean test
after_success: "./sbt coveralls"
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.twitter.bijection._
import java.util.concurrent.Executors
import com.twitter.algebird.util._
import com.google.caliper.{ Param, SimpleBenchmark }
import com.twitter.algebird.HyperLogLogMonoid
import java.nio.ByteBuffer

import scala.math._
Expand All @@ -19,7 +18,7 @@ class OldMonoid(bits: Int) extends HyperLogLogMonoid(bits) {
else {
val buffer = new Array[Byte](size)
items.foreach { _.updateInto(buffer) }
Some(DenseHLL(bits, buffer.toIndexedSeq))
Some(DenseHLL(bits, Bytes(buffer)))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.twitter.algebird.caliper

import com.google.caliper.{ SimpleBenchmark, Param }
import com.twitter.algebird.{ HyperLogLogMonoid, HLL }
import com.twitter.bijection._
import java.nio.ByteBuffer

/**
 * Caliper benchmark for "presenting" HyperLogLog sketches: it times how long it takes to
 * call `approximateSize` on a collection of pre-built [[HLL]] instances.
 *
 * Parameters (populated by Caliper through `@Param`; the `= 0` initializers are placeholders):
 *  - `bits`:   passed to `HyperLogLogMonoid` when the sketches are built
 *  - `max`:    how many random longs are drawn for each sketch (duplicates collapse in `toSet`)
 *  - `numHLL`: how many sketches are built in `setUp` and presented on every repetition
 */
class HLLPresentBenchmark extends SimpleBenchmark {
  @Param(Array("5", "10", "17", "20"))
  val bits: Int = 0

  @Param(Array("10", "100", "500", "1000", "10000"))
  val max: Int = 0

  @Param(Array("10", "20", "100"))
  val numHLL: Int = 0

  // Sketches to present; assigned in setUp before any timing starts.
  var data: IndexedSeq[HLL] = _

  // NOTE: do not add an explicit type annotation here without also dropping `implicit`.
  // With an annotated type this val becomes eligible inside its own initializer, so
  // `implicitly` could resolve to the not-yet-initialized field instead of the
  // Injection imported from bijection.
  implicit val byteEncoder = implicitly[Injection[Long, Array[Byte]]]

  /**
   * Builds `numHLL` sketches, each from up to `max` pseudo-random longs. The generator is
   * seeded with a fixed value so the generated inputs are the same on every run.
   */
  override def setUp(): Unit = {
    val hllMonoid = new HyperLogLogMonoid(bits)
    val r = new scala.util.Random(12345L)
    data = (0 until numHLL).map { _ =>
      val input = (0 until max).map(_ => r.nextLong).toSet
      hllMonoid.batchCreate(input)(byteEncoder.toFunction)
    }.toIndexedSeq
  }

  /**
   * Timed body: despite the name, this measures `approximateSize` (presentation), not
   * batch creation. The name is kept so existing benchmark reports stay comparable.
   *
   * Every estimate is folded into a sink that is returned, so the JIT cannot treat the
   * measured calls as dead code.
   *
   * NOTE(review): if HLL caches its estimate internally, repetitions after the first
   * mostly measure the cached path -- confirm that this is what should be measured.
   *
   * @param reps number of repetitions requested by Caliper
   * @return a value derived from all computed estimates (only meaningful as a DCE guard)
   */
  def timeBatchCreate(reps: Int): Int = {
    var sink = 0L
    var rep = 0
    while (rep < reps) {
      data.foreach { hll =>
        sink += hll.approximateSize.estimate.toLong
      }
      rep += 1
    }
    sink.toInt
  }
}
7 changes: 5 additions & 2 deletions algebird-core/src/main/scala/com/twitter/algebird/Bytes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,23 @@ final case class Bytes(array: Array[Byte]) extends java.io.Serializable {
* intentionally do not create a defensive, immutable copy because of performance considerations).
*/
override def equals(that: Any): Boolean = that match {
case Bytes(thatArray) => array sameElements thatArray
case Bytes(thatArray) => java.util.Arrays.equals(array, thatArray)
case _ => false
}

override def toString: String = array.map(_.toString).mkString("Bytes(", ",", ")")

def apply(idx: Int) = array.apply(idx)

def size = array.size
}

object Bytes {

private val HashSeed = 0

implicit val ordering: Ordering[Bytes] = new Ordering[Bytes] {
def compare(a: Bytes, b: Bytes): Int = ByteBuffer.wrap(a.array) compareTo ByteBuffer.wrap(b.array)
def compare(a: Bytes, b: Bytes): Int = ByteBuffer.wrap(a.array).compareTo(ByteBuffer.wrap(b.array))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class CMSMonoid[K: CMSHasher](eps: Double, delta: Double, seed: Int) extends Mon
/**
* Creates a sketch out of a single item.
*/
def create(item: K): CMS[K] = CMSItem[K](item, params)
def create(item: K): CMS[K] = CMSItem[K](item, 1L, params)

/**
* Creates a sketch out of multiple items.
Expand Down Expand Up @@ -426,7 +426,7 @@ case class CMSZero[K](override val params: CMSParams[K]) extends CMS[K](params)

override val totalCount: Long = 0L

override def +(item: K, count: Long): CMS[K] = CMSInstance[K](params) + (item, count)
override def +(item: K, count: Long): CMS[K] = CMSItem[K](item, count, params)

override def ++(other: CMS[K]): CMS[K] = other

Expand All @@ -439,9 +439,7 @@ case class CMSZero[K](override val params: CMSParams[K]) extends CMS[K](params)
/**
* Used for holding a single element, to avoid repeatedly adding elements from sparse counts tables.
*/
case class CMSItem[K](item: K, override val params: CMSParams[K]) extends CMS[K](params) {

override val totalCount: Long = 1L
case class CMSItem[K](item: K, override val totalCount: Long, override val params: CMSParams[K]) extends CMS[K](params) {

override def +(x: K, count: Long): CMS[K] = CMSInstance[K](params) + item + (x, count)

Expand Down
131 changes: 81 additions & 50 deletions algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ object HyperLogLog {
buf
}

def twopow(i: Int): Double = scala.math.pow(2.0, i)
@inline
def twopow(i: Int): Double = java.lang.Math.pow(2.0, i)

/**
* the value 'j' is equal to <w_0, w_1 ... w_(bits-1)>
Expand Down Expand Up @@ -135,27 +136,26 @@ object HyperLogLog {
}
buf

case DenseHLL(bits, v) => ((2: Byte) +: bits.toByte +: v).toArray
case DenseHLL(bits, v) =>
val bb = ByteBuffer.allocate(v.size + 2)
bb.put(2: Byte)
bb.put(bits.toByte)
bb.put(v.array)
bb.array
}
}

// Make sure to be reversible so fromBytes(toBytes(x)) == x
def fromBytes(bytes: Array[Byte]): HLL = {
val bb = ByteBuffer.wrap(bytes)
bb.get.toInt match {
case 2 => DenseHLL(bb.get, bytes.toIndexedSeq.tail.tail)
case 3 => sparseFromByteBuffer(bb)
case n => throw new Exception("Unrecognized HLL type: " + n)
}
}
def fromBytes(bytes: Array[Byte]): HLL =
fromByteBuffer(ByteBuffer.wrap(bytes))

def fromByteBuffer(bb: ByteBuffer): HLL = {
bb.get.toInt match {
case 2 =>
val bits = bb.get
val buf = new Array[Byte](bb.remaining)
bb.get(buf)
DenseHLL(bits, buf)
DenseHLL(bits, Bytes(buf))
case 3 => sparseFromByteBuffer(bb)
case n => throw new Exception("Unrecognized HLL type: " + n)
}
Expand Down Expand Up @@ -201,11 +201,15 @@ sealed abstract class HLL extends java.io.Serializable {

def approximateSize = asApprox(initialEstimate)

def estimatedSize = approximateSize.estimate.toDouble
def estimatedSize: Double = initialEstimate

private lazy val initialEstimate: Double = {

private def initialEstimate = {
val sizeDouble: Double = size.toDouble
val smallE = 5 * sizeDouble / 2.0
val factor = alpha(bits) * sizeDouble * sizeDouble

val e = factor * z
val e: Double = factor * z
// There are large and small value corrections from the paper.
// We stopped using the large value corrections since, when using Longs,
// there were pathologically bad results. See https://github.com/twitter/algebird/issues/284
Expand All @@ -225,10 +229,6 @@ sealed abstract class HLL extends java.io.Serializable {
Approximate(lowerBound, v.toLong, upperBound, prob3StdDev)
}

private def factor = alpha(bits) * size.toDouble * size.toDouble

private def smallE = 5 * size / 2.0

private def smallEstimate(e: Double): Double = {
if (zeroCnt == 0) {
e
Expand Down Expand Up @@ -329,31 +329,36 @@ case class SparseHLL(bits: Int, maxRhow: Map[Int, Max[Byte]]) extends HLL {
if (allMaxRhow.size * 16 <= size) {
SparseHLL(bits, allMaxRhow)
} else {
DenseHLL(bits, sparseMapToSequence(allMaxRhow))
DenseHLL(bits, sparseMapToArray(allMaxRhow))
}

case DenseHLL(bits, oldV) =>
assert(oldV.size == size, "Incompatible HLL size: " + oldV.size + " != " + size)
val newV = maxRhow.foldLeft(oldV) {
case (v, (j, rhow)) =>
if (rhow.get > v(j)) {
v.updated(j, rhow.get)
} else {
v
}
val newContents: Array[Byte] = oldV.array.clone
val siz = newContents.size

val iter: Iterator[(Int, Max[Byte])] = maxRhow.iterator
while (iter.hasNext) {
val (idx, maxB) = iter.next
val existing: Byte = newContents(idx)
val other: Byte = maxRhow(idx).get

if (other > existing)
newContents.update(idx, other)
}
DenseHLL(bits, newV)

DenseHLL(bits, Bytes(newContents))
}
}

def sparseMapToSequence(values: Map[Int, Max[Byte]]): IndexedSeq[Byte] = {
def sparseMapToArray(values: Map[Int, Max[Byte]]): Bytes = {
val array = Array.fill[Byte](size)(0: Byte)
values.foreach { case (j, rhow) => array.update(j, rhow.get) }
array.toIndexedSeq
Bytes(array)
}

lazy val toDenseHLL = DenseHLL(bits, sparseMapToSequence(maxRhow))
lazy val toDenseHLL = DenseHLL(bits, sparseMapToArray(maxRhow))

def updateInto(buffer: Array[Byte]): Unit = {
assert(buffer.length == size, "Length mismatch")
maxRhow.foreach {
Expand All @@ -378,17 +383,35 @@ case class SparseHLL(bits: Int, maxRhow: Map[Int, Max[Byte]]) extends HLL {
/**
* These are the individual instances which the Monoid knows how to add
*/
case class DenseHLL(bits: Int, v: IndexedSeq[Byte]) extends HLL {
case class DenseHLL(bits: Int, v: Bytes) extends HLL {

assert(v.size == (1 << bits), "Invalid size for dense vector: " + size + " != (1 << " + bits + ")")

def size = v.size

lazy val zeroCnt = v.count { _ == 0 }

// Named from the parameter in the paper, probably never useful to anyone
// except HyperLogLogMonoid
lazy val z = 1.0 / (v.map { mj => HyperLogLog.twopow(-mj) }.sum)

lazy val (zeroCnt, z) = {
var count: Int = 0
var res: Double = 0

// use a plain while loop (rather than a higher-order method) to avoid allocating a closure
val arr: Array[Byte] = v.array
val arrSize: Int = arr.size
var idx: Int = 0
while (idx < arrSize) {
val mj = arr(idx)
if (mj == 0) {
count += 1
res += 1.0
} else {
res += java.lang.Math.pow(2.0, -mj)
}
idx += 1
}
(count, 1.0 / res)
}

def +(other: HLL): HLL = {

Expand All @@ -398,21 +421,35 @@ case class DenseHLL(bits: Int, v: IndexedSeq[Byte]) extends HLL {

case DenseHLL(_, ov) =>
assert(ov.size == v.size, "Incompatible HLL size: " + ov.size + " != " + v.size)
DenseHLL(bits,
v
.view
.zip(ov)
.map { case (rhow, oRhow) => rhow max oRhow }
.toIndexedSeq)

val siz: Int = ov.size
val newContents: Array[Byte] = new Array[Byte](siz)

val other: Array[Byte] = ov.array
val thisArray: Array[Byte] = v.array

var indx: Int = 0
while (indx < siz) {
val rhow = thisArray(indx)
val oRhow = other(indx)
newContents.update(indx, rhow max oRhow)
indx += 1
}

DenseHLL(bits, Bytes(newContents))
}
}

val toDenseHLL = this
def updateInto(buffer: Array[Byte]): Unit = {
assert(buffer.length == size, "Length mismatch")
var idx = 0
v.foreach { maxb =>

val arr: Array[Byte] = v.array
val arrSize: Int = arr.size
var idx: Int = 0

while (idx < arrSize) {
val maxb = arr(idx)
buffer.update(idx, (buffer(idx)) max maxb)
idx += 1
}
Expand All @@ -426,16 +463,10 @@ case class DenseHLL(bits: Int, v: IndexedSeq[Byte]) extends HLL {
val newRhoW = reducedV(newJ)
reducedV.update(newJ, modifiedRhoW max newRhoW)
}
DenseHLL(reducedBits, new IndexedSeqArrayByte(reducedV))
DenseHLL(reducedBits, Bytes(reducedV))
}
}

class IndexedSeqArrayByte(buf: Array[Byte]) extends scala.collection.IndexedSeq[Byte] {
def length = buf.length
def apply(idx: Int): Byte = buf.apply(idx)
override def stringPrefix = "Array"
}

/*
* Error is about 1.04/sqrt(2^{bits}), so you want something like 12 bits for 1% error
* which means each HLLInstance is about 2^{12} = 4kb per instance.
Expand All @@ -458,7 +489,7 @@ class HyperLogLogMonoid(val bits: Int) extends Monoid[HLL] {
existing.updateInto(buffer)
iter.foreach { _.updateInto(buffer) }

DenseHLL(bits, new IndexedSeqArrayByte(buffer))
DenseHLL(bits, Bytes(buffer))
}

override def sumOption(items: TraversableOnce[HLL]): Option[HLL] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,14 @@ class AggregatorLaws extends CheckProperties {
}
property("MonoidAggregator.either is correct") {
forAll { (in: List[(Int, Int)], agl: MonoidAggregator[Int, Int, Int], agr: MonoidAggregator[Int, Int, Int]) =>
assert(agl.zip(agr).apply(in) ==
agl.either(agr).apply(in.flatMap { case (l, r) => List(Left(l), Right(r)) }))
agl.zip(agr).apply(in) ==
agl.either(agr).apply(in.flatMap { case (l, r) => List(Left(l), Right(r)) })
}
}

property("MonoidAggregator.filter is correct") {
forAll { (in: List[Int], ag: MonoidAggregator[Int, Int, Int], fn: Int => Boolean) =>
assert(ag.filterBefore(fn).apply(in) == ag.apply(in.filter(fn)))
ag.filterBefore(fn).apply(in) == ag.apply(in.filter(fn))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.twitter.algebird

import org.scalacheck.Gen
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.scalatest.{Matchers, WordSpec}
import org.scalatest.{ Matchers, WordSpec }

class BytesSpec extends WordSpec with Matchers with GeneratorDrivenPropertyChecks {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class CollectionSpecification extends CheckProperties {

property("Array Monoid laws") {
monoidLawsEq[Array[Int]]{
case (a,b) => a.deep == b.deep
case (a, b) => a.deep == b.deep
}
}

Expand Down
Loading