diff --git a/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala new file mode 100644 index 000000000..ead3f8536 --- /dev/null +++ b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala @@ -0,0 +1,70 @@ +/* +Copyright 2014 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.algebird + +case class HLLSeries(bits: Int, rows: Vector[Map[Int, Long]]) { + def since(threshold: Long) = + HLLSeries( + bits, + rows.map{ _.filter{ case (j, ts) => ts >= threshold } }) + + def toHLL: HLL = + if (rows.isEmpty) + SparseHLL(bits, Map()) + else + rows.zipWithIndex.map{ + case (map, i) => + SparseHLL(bits, map.mapValues{ ts => Max((i + 1).toByte) }): HLL + }.reduce{ _ + _ } +} + +class HyperLogLogSeriesMonoid(val bits: Int) extends Monoid[HLLSeries] { + import HyperLogLog._ + + val zero = HLLSeries(bits, Vector()) + + def create(example: Array[Byte], timestamp: Long): HLLSeries = { + val hashed = hash(example) + val (j, rhow) = jRhoW(hashed, bits) + + val vector = Vector.fill(rhow - 1){ Map[Int, Long]() } ++ Vector(Map(j -> timestamp)) + HLLSeries(bits, vector) + } + + def plus(left: HLLSeries, right: HLLSeries): HLLSeries = { + if (left.rows.size > right.rows.size) + plus(right, left) + else { + val zipped = left.rows.zip(right.rows).map{ + case (l, r) => + combine(l, r) + } + HLLSeries( + bits, + zipped ++ right.rows.slice(left.rows.size, right.rows.size)) + } + } + + private def combine(left: Map[Int, Long], right: Map[Int, Long]): Map[Int, Long] = { + if (left.size > right.size) + combine(right, left) + else { + right ++ + left.map{ case (k, v) => k -> (right.getOrElse(k, 0L).max(v)) } + } + } +} diff --git a/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala new file mode 100644 index 000000000..6e2fa4ec3 --- /dev/null +++ b/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala @@ -0,0 +1,63 @@ +package com.twitter.algebird + +import org.scalatest._ + +import org.scalacheck.{ Gen, Arbitrary } + +import HyperLogLog._ //Get the implicit int2bytes, long2Bytes + +class HyperLogLogSeriesLaws extends CheckProperties { + import BaseProperties._ + + implicit val hllSeriesMonoid = new HyperLogLogSeriesMonoid(5) //5 bits + + implicit val hllSeriesGen = Arbitrary { + for ( + v <- Gen.choose(0, 10000) + ) yield (hllSeriesMonoid.create(v, v)) + } + + property("HyperLogLogSeries is a Monoid") { + monoidLawsEq[HLLSeries]{ _.toHLL == _.toHLL } + } +} + +class HyperLogLogSeriesTest extends WordSpec with Matchers { + def getHllCount[T <% Array[Byte]](it: Iterable[T], hll: HyperLogLogMonoid) = { + hll.sizeOf(hll.sum(it.map { hll(_) })).estimate.toDouble + } + + def aveErrorOf(bits: Int): Double = 1.04 / scala.math.sqrt(1 << bits) + + def testApproximatelyEqual(hllSeries: HLLSeries, hllCount: Double, bits: Int) = { + val seriesResult = hllSeries.toHLL.estimatedSize + assert(scala.math.abs(seriesResult - hllCount) / seriesResult < (3.5 * aveErrorOf(bits))) + } + + "HyperLogLogSeries" should { + "properly calculate .since" in { + val bits = 12 + val hllSeriesMonoid = new HyperLogLogSeriesMonoid(bits) + val hll = new HyperLogLogMonoid(bits) + + val timestamps = (1 to 100).map { _.toLong } + val r = new java.util.Random + val timestampedData = timestamps.map { t => (r.nextLong, t) } + + val series = timestampedData + .map{ case (value, timestamp) => hllSeriesMonoid.create(value, timestamp) } + .reduce{ hllSeriesMonoid.plus(_, _) } + + timestamps.foreach { timestamp => + val seriesResult = series.since(timestamp) + + val dataSinceTimestamp = timestampedData + .dropWhile { case (_, t) => t < timestamp } + .map { case (value, _) => value } + val expected = getHllCount(dataSinceTimestamp, hll) + + testApproximatelyEqual(seriesResult, expected, bits) + } + } + } +}