Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time based aggregators #167

Merged
merged 10 commits into from
Oct 31, 2018
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.aggregators

import com.salesforce.op.features.types._
import com.twitter.algebird.{Monoid, MonoidAggregator}

import scala.reflect.runtime.universe.WeakTypeTag

private[op] abstract class TimeBasedAggregator[T <: FeatureType]
(
compareFun: (Long, Long) => Boolean,
val timeZero: Long
)(implicit val ttag: WeakTypeTag[T]) extends MonoidAggregator[Event[T], (Long, T#Value), T] {

val ftFactory = FeatureTypeFactory[T]()

val monoid: Monoid[(Long, T#Value)] = new Monoid[(Long, T#Value)] {
val zero = timeZero -> FeatureTypeDefaults.default[T].value
def plus(l: (Long, T#Value), r: (Long, T#Value)): (Long, T#Value) = if (compareFun(l._1, r._1)) r else l
}

def prepare(input: Event[T]): (Long, T#Value) = input.date -> input.value.v

def present(reduction: (Long, T#Value)): T = ftFactory.newInstance(reduction._2)
}

/**
* Gives last (most recent) value of feature
* @param ttag feature type tag
* @tparam T type of feature
*/
abstract class LastAggregator[T <: FeatureType](implicit ttag: WeakTypeTag[T]) extends
TimeBasedAggregator(compareFun = (l: Long, r: Long) => l < r, timeZero = 0L)(ttag = ttag)


/**
* Gives the first value of feature
* @param ttag feature type tag
* @tparam T type of feature
*/
abstract class FirstAggregator[T <: FeatureType](implicit ttag: WeakTypeTag[T]) extends
TimeBasedAggregator(compareFun = (l: Long, r: Long) => l >= r, timeZero = Long.MaxValue)(ttag = ttag)


case object LastVector extends LastAggregator[OPVector]
case object FirstVector extends FirstAggregator[OPVector]

case object LastTextList extends LastAggregator[TextList]
case object FirstTextList extends FirstAggregator[TextList]

case object LastDateList extends LastAggregator[DateList]
case object FirstDateList extends FirstAggregator[DateList]

case object LastDateTimeList extends LastAggregator[DateTimeList]
case object FirstDateTimeList extends FirstAggregator[DateTimeList]

case object LastGeolocation extends LastAggregator[Geolocation]
case object FirstGeolocation extends FirstAggregator[Geolocation]

case object LastBase64Map extends LastAggregator[Base64Map]
case object FirstBase64Map extends FirstAggregator[Base64Map]

case object LastBinaryMap extends LastAggregator[BinaryMap]
case object FirstBinaryMap extends FirstAggregator[BinaryMap]

case object LastComboBoxMap extends LastAggregator[ComboBoxMap]
case object FirstComboBoxMap extends FirstAggregator[ComboBoxMap]

case object LastCurrencyMap extends LastAggregator[CurrencyMap]
case object FirstCurrencyMap extends FirstAggregator[CurrencyMap]

case object LastDateMap extends LastAggregator[DateMap]
case object FirstDateMap extends FirstAggregator[DateMap]

case object LastDateTimeMap extends LastAggregator[DateTimeMap]
case object FirstDateTimeMap extends FirstAggregator[DateTimeMap]

case object LastEmailMap extends LastAggregator[EmailMap]
case object FirstEmailMap extends FirstAggregator[EmailMap]

case object LastIDMap extends LastAggregator[IDMap]
case object FirstIDMap extends FirstAggregator[IDMap]

case object LastIntegralMap extends LastAggregator[IntegralMap]
case object FirstIntegralMap extends FirstAggregator[IntegralMap]

case object LastMultiPickListMap extends LastAggregator[MultiPickListMap]
case object FirstMultiPickListMap extends FirstAggregator[MultiPickListMap]

case object LastPercentMap extends LastAggregator[PercentMap]
case object FirstPercentMap extends FirstAggregator[PercentMap]

case object LastPhoneMap extends LastAggregator[PhoneMap]
case object FirstPhoneMap extends FirstAggregator[PhoneMap]

case object LastPickListMap extends LastAggregator[PickListMap]
case object FirstPickListMap extends FirstAggregator[PickListMap]

case object LastRealMap extends LastAggregator[RealMap]
case object FirstRealMap extends FirstAggregator[RealMap]

case object LastTextAreaMap extends LastAggregator[TextAreaMap]
case object FirstTextAreaMap extends FirstAggregator[TextAreaMap]

case object LastTextMap extends LastAggregator[TextMap]
case object FirstTextMap extends FirstAggregator[TextMap]

case object LastURLMap extends LastAggregator[URLMap]
case object FirstURLMap extends FirstAggregator[URLMap]

case object LastCountryMap extends LastAggregator[CountryMap]
case object FirstCountryMap extends FirstAggregator[CountryMap]

case object LastStateMap extends LastAggregator[StateMap]
case object FirstStateMap extends FirstAggregator[StateMap]

case object LastCityMap extends LastAggregator[CityMap]
case object FirstCityMap extends FirstAggregator[CityMap]

case object LastPostalCodeMap extends LastAggregator[PostalCodeMap]
case object FirstPostalCodeMap extends FirstAggregator[PostalCodeMap]

case object LastStreetMap extends LastAggregator[StreetMap]
case object FirstStreetMap extends FirstAggregator[StreetMap]

case object LastGeolocationMap extends LastAggregator[GeolocationMap]
case object FirstGeolocationMap extends FirstAggregator[GeolocationMap]

case object LastBinary extends LastAggregator[Binary]
case object FirstBinary extends FirstAggregator[Binary]

case object LastCurrency extends LastAggregator[Currency]
case object FirstCurrency extends FirstAggregator[Currency]

case object LastDate extends LastAggregator[Date]
case object FirstDate extends FirstAggregator[Date]

case object LastDateTime extends LastAggregator[DateTime]
case object FirstDateTime extends FirstAggregator[DateTime]

case object LastIntegral extends LastAggregator[Integral]
case object FirstIntegral extends FirstAggregator[Integral]

case object LastPercent extends LastAggregator[Percent]
case object FirstPercent extends FirstAggregator[Percent]

case object LastReal extends LastAggregator[Real]
case object FirstReal extends FirstAggregator[Real]

case object LastMultiPickList extends LastAggregator[MultiPickList]
case object FirstMultiPickList extends FirstAggregator[MultiPickList]

case object LastBase64 extends LastAggregator[Base64]
case object FirstBase64 extends FirstAggregator[Base64]

case object LastComboBox extends LastAggregator[ComboBox]
case object FirstComboBox extends FirstAggregator[ComboBox]

case object LastEmail extends LastAggregator[Email]
case object FirstEmail extends FirstAggregator[Email]

case object LastID extends LastAggregator[ID]
case object FirstID extends FirstAggregator[ID]

case object LastPhone extends LastAggregator[Phone]
case object FirstPhone extends FirstAggregator[Phone]

case object LastPickList extends LastAggregator[PickList]
case object FirstPickList extends FirstAggregator[Phone]

case object LastText extends LastAggregator[Text]
case object FirstText extends FirstAggregator[Text]

case object LastTextArea extends LastAggregator[TextArea]
case object FirstTextArea extends FirstAggregator[TextArea]

case object LastURL extends LastAggregator[URL]
case object FirstURL extends FirstAggregator[URL]

case object LastCountry extends LastAggregator[Country]
case object FirstCountry extends FirstAggregator[Country]

case object LastState extends LastAggregator[State]
case object FirstState extends FirstAggregator[State]

case object LastCity extends LastAggregator[City]
case object FirstCity extends FirstAggregator[City]

case object LastPostalCode extends LastAggregator[PostalCode]
case object FirstPostalCode extends FirstAggregator[PostalCode]

case object LastStreet extends LastAggregator[Street]
case object FirstStreet extends FirstAggregator[Street]



Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.aggregators

import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._
import com.salesforce.op.stages.FeatureGeneratorStage
import com.salesforce.op.test.TestCommon
import org.joda.time.Duration
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class TimeBasedAggregatorTest extends FlatSpec with TestCommon {

private val data = Seq(TimeBasedTest(100L, 1.0, "a", Map("a" -> "a")),
TimeBasedTest(200L, 2.0, "b", Map("b" -> "b")),
TimeBasedTest(300L, 3.0, "c", Map("c" -> "c")),
TimeBasedTest(400L, 4.0, "d", Map("d" -> "d")),
TimeBasedTest(500L, 5.0, "e", Map("e" -> "e")),
TimeBasedTest(600L, 6.0, "f", Map("f" -> "f"))
)

private val timeExt = Option((d: TimeBasedTest) => d.time)

Spec[LastAggregator[_]] should "return the most recent event" in {
val feature = FeatureBuilder.Real[TimeBasedTest].extract(_.real.toRealNN)
.aggregate(LastReal).asPredictor
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(data, timeExt, CutOffTime.NoCutoff())
extracted shouldBe Real(Some(6.0))
}

it should "return the most recent event within the time window" in {
val feature = FeatureBuilder.Text[TimeBasedTest].extract(_.string.toText)
.aggregate(LastText).asResponse
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(data, timeExt, CutOffTime.UnixEpoch(300L),
responseWindow = Option(new Duration(201L)))
extracted shouldBe Text(Some("e"))
}

it should "return the feature type empty value when no events are passed in" in {
val feature = FeatureBuilder.TextMap[TimeBasedTest].extract(_.map.toTextMap)
.aggregate(LastTextMap).asPredictor
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(Seq(), timeExt, CutOffTime.NoCutoff())
extracted shouldBe TextMap.empty
}

Spec[FirstAggregator[_]] should "return the first event" in {
val feature = FeatureBuilder.TextAreaMap[TimeBasedTest].extract(_.map.toTextAreaMap)
.aggregate(FirstTextAreaMap).asResponse
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(data, timeExt, CutOffTime.UnixEpoch(301L))
extracted shouldBe TextAreaMap(Map("d" -> "d"))
}

it should "return the first event within the time window" in {
val feature = FeatureBuilder.Currency[TimeBasedTest].extract(_.real.toCurrency)
.aggregate(FirstCurrency).asPredictor
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(data, timeExt, CutOffTime.UnixEpoch(400L),
predictorWindow = Option(new Duration(201L)))
extracted shouldBe Currency(Some(2.0))
}

it should "return the feature type empty value when no events are passed in" in {
val feature = FeatureBuilder.State[TimeBasedTest].extract(_.string.toState)
.aggregate(FirstState).asPredictor
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(Seq(), timeExt, CutOffTime.NoCutoff())
extracted shouldBe State.empty
}
}

case class TimeBasedTest(time: Long, real: Double, string: String, map: Map[String, String])