From d00f183bacf6174d9d0530588d1e59ff9383aca8 Mon Sep 17 00:00:00 2001 From: Avi Bryant Date: Sat, 5 Apr 2014 13:42:42 -0700 Subject: [PATCH 1/3] HyperLogLogSeries --- .../twitter/algebird/HyperLogLogSeries.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala diff --git a/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala new file mode 100644 index 000000000..44d39ec7f --- /dev/null +++ b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala @@ -0,0 +1,68 @@ +/* +Copyright 2014 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.algebird + +case class HLLSeries(bits: Int, rows: Vector[Map[Int,Long]]) { + def since(threshold: Long) = + HLLSeries( + bits, + rows.map{_.filter{case (j, ts) => ts >= threshold}}) + + def toHLL : HLL = + if(rows.isEmpty) + SparseHLL(bits, Map()) + else + rows.zipWithIndex.map{ case (map, i) => + SparseHLL(bits, map.mapValues{ts => Max((i+1).toByte)}) : HLL + }.reduce{_ + _} +} + +class HyperLogLogSeriesMonoid(val bits : Int) extends Monoid[HLLSeries] { + import HyperLogLog._ + + val zero = HLLSeries(bits, Vector()) + + def create(example : Array[Byte], timestamp: Long) : HLLSeries = { + val hashed = hash(example) + val (j,rhow) = jRhoW(hashed, bits) + + val vector = Vector.fill(rhow-1){Map[Int,Long]()} ++ Vector(Map(j -> timestamp)) + HLLSeries(bits, vector) + } + + def plus(left: HLLSeries, right: HLLSeries) : HLLSeries = { + if(left.rows.size > right.rows.size) + plus(right, left) + else { + val zipped = left.rows.zip(right.rows).map{case (l, r) => + combine(l, r) + } + HLLSeries( + bits, + zipped ++ right.rows.slice(left.rows.size, right.rows.size)) + } + } + + private def combine(left: Map[Int,Long], right: Map[Int,Long]) : Map[Int,Long]= { + if(left.size > right.size) + combine(right, left) + else { + right ++ + left.map{case (k,v) => k -> (right.getOrElse(k, 0L).max(v))} + } + } +} From 59c280c9de927cc4057eeaac01f23b1f9f2e08e7 Mon Sep 17 00:00:00 2001 From: Danielle Sucher Date: Fri, 10 Apr 2015 17:52:47 -0400 Subject: [PATCH 2/3] Add tests for HyperLogLogSeries --- .../algebird/HyperLogLogSeriesTest.scala | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala diff --git a/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala new file mode 100644 index 000000000..6e2fa4ec3 --- /dev/null +++ b/algebird-test/src/test/scala/com/twitter/algebird/HyperLogLogSeriesTest.scala @@ -0,0 +1,63 @@ +package com.twitter.algebird + +import org.scalatest._ + +import org.scalacheck.{ Gen, Arbitrary } + +import HyperLogLog._ //Get the implicit int2bytes, long2Bytes + +class HyperLogLogSeriesLaws extends CheckProperties { + import BaseProperties._ + + implicit val hllSeriesMonoid = new HyperLogLogSeriesMonoid(5) //5 bits + + implicit val hllSeriesGen = Arbitrary { + for ( + v <- Gen.choose(0, 10000) + ) yield (hllSeriesMonoid.create(v, v)) + } + + property("HyperLogLogSeries is a Monoid") { + monoidLawsEq[HLLSeries]{ _.toHLL == _.toHLL } + } +} + +class HyperLogLogSeriesTest extends WordSpec with Matchers { + def getHllCount[T <% Array[Byte]](it: Iterable[T], hll: HyperLogLogMonoid) = { + hll.sizeOf(hll.sum(it.map { hll(_) })).estimate.toDouble + } + + def aveErrorOf(bits: Int): Double = 1.04 / scala.math.sqrt(1 << bits) + + def testApproximatelyEqual(hllSeries: HLLSeries, hllCount: Double, bits: Int) = { + val seriesResult = hllSeries.toHLL.estimatedSize + assert(scala.math.abs(seriesResult - hllCount) / seriesResult < (3.5 * aveErrorOf(bits))) + } + + "HyperLogLogSeries" should { + "properly calculate .since" in { + val bits = 12 + val hllSeriesMonoid = new HyperLogLogSeriesMonoid(bits) + val hll = new HyperLogLogMonoid(bits) + + val timestamps = (1 to 100).map { _.toLong } + val r = new java.util.Random + val timestampedData = timestamps.map { t => (r.nextLong, t) } + + val series = timestampedData + .map{ case (value, timestamp) => hllSeriesMonoid.create(value, timestamp) } + .reduce{ hllSeriesMonoid.plus(_, _) } + + timestamps.foreach { timestamp => + val seriesResult = series.since(timestamp) + + val dataSinceTimestamp = timestampedData + .dropWhile { case (_, t) => t < timestamp } + .map { case (value, _) => value } + val expected = getHllCount(dataSinceTimestamp, hll) + + testApproximatelyEqual(seriesResult, expected, bits) + } + } + } +} From 6efc5907b784711840cca416aa8d9197bde00fe5 Mon Sep 17 00:00:00 2001 From: Danielle Sucher Date: Tue, 14 Apr 2015 17:59:55 -0400 Subject: [PATCH 3/3] autoformatting --- .../twitter/algebird/HyperLogLogSeries.scala | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala index 44d39ec7f..ead3f8536 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLogSeries.scala @@ -16,53 +16,55 @@ limitations under the License. package com.twitter.algebird -case class HLLSeries(bits: Int, rows: Vector[Map[Int,Long]]) { +case class HLLSeries(bits: Int, rows: Vector[Map[Int, Long]]) { def since(threshold: Long) = HLLSeries( bits, - rows.map{_.filter{case (j, ts) => ts >= threshold}}) + rows.map{ _.filter{ case (j, ts) => ts >= threshold } }) - def toHLL : HLL = - if(rows.isEmpty) + def toHLL: HLL = + if (rows.isEmpty) SparseHLL(bits, Map()) else - rows.zipWithIndex.map{ case (map, i) => - SparseHLL(bits, map.mapValues{ts => Max((i+1).toByte)}) : HLL - }.reduce{_ + _} + rows.zipWithIndex.map{ + case (map, i) => + SparseHLL(bits, map.mapValues{ ts => Max((i + 1).toByte) }): HLL + }.reduce{ _ + _ } } -class HyperLogLogSeriesMonoid(val bits : Int) extends Monoid[HLLSeries] { +class HyperLogLogSeriesMonoid(val bits: Int) extends Monoid[HLLSeries] { import HyperLogLog._ val zero = HLLSeries(bits, Vector()) - def create(example : Array[Byte], timestamp: Long) : HLLSeries = { + def create(example: Array[Byte], timestamp: Long): HLLSeries = { val hashed = hash(example) - val (j,rhow) = jRhoW(hashed, bits) + val (j, rhow) = jRhoW(hashed, bits) - val vector = Vector.fill(rhow-1){Map[Int,Long]()} ++ Vector(Map(j -> timestamp)) + val vector = Vector.fill(rhow - 1){ Map[Int, Long]() } ++ Vector(Map(j -> timestamp)) HLLSeries(bits, vector) } - def plus(left: HLLSeries, right: HLLSeries) : HLLSeries = { - if(left.rows.size > right.rows.size) - plus(right, left) - else { - val zipped = left.rows.zip(right.rows).map{case (l, r) => - combine(l, r) - } - HLLSeries( - bits, - zipped ++ right.rows.slice(left.rows.size, right.rows.size)) + def plus(left: HLLSeries, right: HLLSeries): HLLSeries = { + if (left.rows.size > right.rows.size) + plus(right, left) + else { + val zipped = left.rows.zip(right.rows).map{ + case (l, r) => + combine(l, r) } + HLLSeries( + bits, + zipped ++ right.rows.slice(left.rows.size, right.rows.size)) + } } - private def combine(left: Map[Int,Long], right: Map[Int,Long]) : Map[Int,Long]= { - if(left.size > right.size) - combine(right, left) + private def combine(left: Map[Int, Long], right: Map[Int, Long]): Map[Int, Long] = { + if (left.size > right.size) + combine(right, left) else { - right ++ - left.map{case (k,v) => k -> (right.getOrElse(k, 0L).max(v))} + right ++ + left.map{ case (k, v) => k -> (right.getOrElse(k, 0L).max(v)) } } } }