Skip to content

Commit

Permalink
added decayed value and averaged value monoid.
Browse files Browse the repository at this point in the history
  • Loading branch information
sritchie committed Aug 2, 2012
1 parent 75e12cd commit acaa154
Showing 1 changed file with 86 additions and 0 deletions.
86 changes: 86 additions & 0 deletions src/main/scala/com/twitter/algebird/CommonMonoids.scala
Expand Up @@ -16,6 +16,92 @@ limitations under the License.

package com.twitter.algebird

// Represents a decayed value that is decayed of the form:
// \sum_i e^{-(t_i - t)} v_i
// 2^{-(t/th)} = exp(ln(2)(-t/th)) = exp(-t * (ln(2)/th))
// So time is measured in units of (half-life/ln(2)), so.
// t in seconds, 1 day half life means: t => t * ln(2)/(86400.0)

object DecayedValue extends java.io.Serializable {
def build[V <% Double](value : V, time : Double, halfLife : Double) = {
DecayedValue(value, time * scala.math.log(2.0)/halfLife)
}
val zero = DecayedValue(0.0, Double.NegativeInfinity)
def scale(newv : DecayedValue, oldv : DecayedValue, eps : Double) = {
val newValue = newv.value +
scala.math.exp(oldv.scaledTime - newv.scaledTime) * oldv.value
if( newValue > eps ) {
DecayedValue(newValue, newv.scaledTime)
}
else {
zero
}
}

def monoidWithEpsilon(eps : Double) = new Monoid[DecayedValue] {
override val zero = DecayedValue(0.0, Double.NegativeInfinity)
override def plus(left : DecayedValue, right : DecayedValue) = {
if (left < right) {
//left is older:
scale(right, left, eps)
}
else {
// right is older
scale(left, right, eps)
}
}
}
}

case class DecayedValue(value : Double, scaledTime : Double) extends Ordered[DecayedValue] {
def compare(that : DecayedValue) : Int = {
scaledTime.compareTo(that.scaledTime)
}
}

object AveragedValue {
def apply[V <% Double](v : V) = new AveragedValue(1L, v)
def apply[V <% Double](c : Long, v : V) = new AveragedValue(c, v)
}


case class AveragedValue(count : Long, value : Double)

object AveragedMonoid extends Monoid[AveragedValue] {
// When combining averages, if the counts sizes are too close we should use a different
// algorithm. This constant defines how close the ratio of the smaller to the total count
// can be:
private val STABILITY_CONSTANT = 0.1
/**
* uses a more stable online algorithm which should
* be suitable for large numbers of records
* similar to:
* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
*/
val zero = AveragedValue(0L, 0.0)
def plus(cntAve1 : AveragedValue, cntAve2 : AveragedValue) : AveragedValue = {
val (big, small) = if (cntAve1.count >= cntAve2.count)
(cntAve1, cntAve2)
else
(cntAve2, cntAve1)
val n = big.count
val k = small.count
if (k == 0L) {
// Handle zero without allocation
big
}
else {
val an = big.value
val ak = small.value
val newCnt = n+k
val scaling = k.toDouble/newCnt
// a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k
val newAve = if (scaling < STABILITY_CONSTANT) (an + (ak - an)*scaling) else (n*an + k*ak)/newCnt
new AveragedValue(newCnt, newAve)
}
}
}

/**
* This is an associative, but not commutative monoid
* Also, you must start on the right, with a value, and all subsequent RightFolded must
Expand Down

0 comments on commit acaa154

Please sign in to comment.