diff --git a/features/src/main/scala/com/salesforce/op/aggregators/CustomMonoidAggregator.scala b/features/src/main/scala/com/salesforce/op/aggregators/CustomMonoidAggregator.scala index 66db8768f6..02e527f39e 100644 --- a/features/src/main/scala/com/salesforce/op/aggregators/CustomMonoidAggregator.scala +++ b/features/src/main/scala/com/salesforce/op/aggregators/CustomMonoidAggregator.scala @@ -42,7 +42,7 @@ import scala.reflect.runtime.universe.WeakTypeTag * @param associativeFn associative function to combine values * @tparam O type of feature */ -case class CustomMonoidAggregator[O <: FeatureType] +abstract class CustomMonoidAggregator[O <: FeatureType] ( zero: O#Value, associativeFn: (O#Value, O#Value) => O#Value diff --git a/features/src/main/scala/com/salesforce/op/features/FeatureBuilder.scala b/features/src/main/scala/com/salesforce/op/features/FeatureBuilder.scala index 53b093ea1d..31e28905ae 100644 --- a/features/src/main/scala/com/salesforce/op/features/FeatureBuilder.scala +++ b/features/src/main/scala/com/salesforce/op/features/FeatureBuilder.scala @@ -275,26 +275,6 @@ final class FeatureBuilderWithExtract[I, O <: FeatureType] var aggregator: MonoidAggregator[Event[O], _, O] = MonoidAggregatorDefaults.aggregatorOf[O](tto) var aggregateWindow: Option[Duration] = None - /** - * Feature aggregation function with zero value - * @param zero a zero element for aggregation - * @param fn aggregation function - */ - def aggregate(zero: O#Value, fn: (O#Value, O#Value) => O#Value): this.type = { - aggregator = new CustomMonoidAggregator[O](associativeFn = fn, zero = zero)(tto) - this - } - - /** - * Feature aggregation function with zero value of [[FeatureTypeDefaults.default[O].value]] - * @param fn aggregation function - */ - def aggregate(fn: (O#Value, O#Value) => O#Value): this.type = { - val zero = FeatureTypeDefaults.default[O](tto).value - aggregator = new CustomMonoidAggregator[O](associativeFn = fn, zero = zero)(tto) - this - } - /** * Feature aggregation with a monoid aggregator * @param monoid a monoid aggregator diff --git a/features/src/test/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaultsTest.scala b/features/src/test/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaultsTest.scala index 3c6a1b4b34..bd14bd104c 100644 --- a/features/src/test/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaultsTest.scala +++ b/features/src/test/scala/com/salesforce/op/aggregators/MonoidAggregatorDefaultsTest.scala @@ -534,8 +534,8 @@ class MonoidAggregatorDefaultsTest extends FlatSpec with TestCommon { } Spec[CustomMonoidAggregator[_]] should "work" in { - val customAgg = new CustomMonoidAggregator[Real](zero = None, associativeFn = (r1, r2) => (r1 -> r2).map(_ + _)) - assertAggr(customAgg, realTestSeq, Option(doubleBase.flatten.sum)) + val customSum = new CustomMonoidAggregator[Real](zero = None, associativeFn = (r1, r2) => (r1 -> r2).map(_ + _)){} + assertAggr(customSum, realTestSeq, Option(doubleBase.flatten.sum)) } diff --git a/features/src/test/scala/com/salesforce/op/features/FeatureBuilderTest.scala b/features/src/test/scala/com/salesforce/op/features/FeatureBuilderTest.scala index 961c197c9e..61025fba4b 100644 --- a/features/src/test/scala/com/salesforce/op/features/FeatureBuilderTest.scala +++ b/features/src/test/scala/com/salesforce/op/features/FeatureBuilderTest.scala @@ -34,7 +34,6 @@ import java.util import com.salesforce.op.aggregators._ import com.salesforce.op.features.types._ -import com.salesforce.op.stages.FeatureGeneratorStage import com.salesforce.op.test.{FeatureAsserts, Passenger, TestSparkContext} import org.apache.spark.sql.{DataFrame, Row} import org.joda.time.Duration @@ -155,27 +154,4 @@ class FeatureBuilderTest extends FlatSpec with TestSparkContext with FeatureAsse assertFeature[Passenger, Real](feature)(name = name, in = passenger, out = 1.toReal, aggregator = _ => MaxReal) } - it should "build an aggregated feature with a custom aggregate function" in { - val feature = - FeatureBuilder.Real[Passenger] - .extract(p => Option(p.getAge).map(_.toDouble).toReal) - .aggregate((v1, _) => v1) - .asPredictor - - assertFeature[Passenger, Real](feature)(name = name, in = passenger, out = 1.toReal, - aggregator = _ => feature.originStage.asInstanceOf[FeatureGeneratorStage[Passenger, Real]].aggregator - ) - } - - it should "build an aggregated feature with a custom aggregate function with zero" in { - val feature = FeatureBuilder.Real[Passenger] - .extract(p => Option(p.getAge).map(_.toDouble).toReal) - .aggregate(Real.empty.v, (v1, _) => v1) - .asPredictor - - assertFeature[Passenger, Real](feature)(name = name, in = passenger, out = 1.toReal, - aggregator = _ => feature.originStage.asInstanceOf[FeatureGeneratorStage[Passenger, Real]].aggregator - ) - } - } diff --git a/readers/src/main/scala/com/salesforce/op/test/PassengerFeaturesTest.scala b/readers/src/main/scala/com/salesforce/op/test/PassengerFeaturesTest.scala index 2645f1cdf9..896320035e 100644 --- a/readers/src/main/scala/com/salesforce/op/test/PassengerFeaturesTest.scala +++ b/readers/src/main/scala/com/salesforce/op/test/PassengerFeaturesTest.scala @@ -30,19 +30,15 @@ package com.salesforce.op.test +import com.salesforce.op.aggregators.MaxReal import com.salesforce.op.features.types._ import com.salesforce.op.features.{FeatureBuilder, OPFeature} -import com.salesforce.op.utils.tuples.RichTuple._ import org.joda.time.Duration trait PassengerFeaturesTest { - val age = FeatureBuilder.Real[Passenger] - .extract(_.getAge.toReal) - .aggregate((l, r) => (l -> r).map(breeze.linalg.max(_, _))) - .asPredictor - + val age = FeatureBuilder.Real[Passenger].extract(_.getAge.toReal).aggregate(MaxReal).asPredictor val gender = FeatureBuilder.MultiPickList[Passenger].extract(p => Set(p.getGender).toMultiPickList).asPredictor val genderPL = FeatureBuilder.PickList[Passenger].extract(p => p.getGender.toPickList).asPredictor diff --git a/readers/src/test/scala/com/salesforce/op/readers/DataReadersTest.scala b/readers/src/test/scala/com/salesforce/op/readers/DataReadersTest.scala index 4cfcbb48e8..2f66c3dee9 100644 --- a/readers/src/test/scala/com/salesforce/op/readers/DataReadersTest.scala +++ b/readers/src/test/scala/com/salesforce/op/readers/DataReadersTest.scala @@ -31,7 +31,7 @@ package com.salesforce.op.readers import com.salesforce.op.OpParams -import com.salesforce.op.aggregators.CutOffTime +import com.salesforce.op.aggregators.{CutOffTime, LogicalAnd} import com.salesforce.op.features.FeatureBuilder import com.salesforce.op.features.types._ import com.salesforce.op.test._ @@ -62,7 +62,7 @@ class DataReadersTest extends FlatSpec with PassengerSparkFixtureTest with TestC val survivedResponse = FeatureBuilder.Binary[PassengerCaseClass] .extract(_.survived.toBinary) - .aggregate(zero = Some(true), (l, r) => Some(l.getOrElse(false) && r.getOrElse(false))) + .aggregate(LogicalAnd) .asResponse val aggregateParameters = AggregateParams( diff --git a/readers/src/test/scala/com/salesforce/op/readers/JoinedDataReaderDataGenerationTest.scala b/readers/src/test/scala/com/salesforce/op/readers/JoinedDataReaderDataGenerationTest.scala index 609a3552a7..26163e47f3 100644 --- a/readers/src/test/scala/com/salesforce/op/readers/JoinedDataReaderDataGenerationTest.scala +++ b/readers/src/test/scala/com/salesforce/op/readers/JoinedDataReaderDataGenerationTest.scala @@ -30,7 +30,7 @@ package com.salesforce.op.readers -import com.salesforce.op.aggregators.CutOffTime +import com.salesforce.op.aggregators.{CutOffTime, MaxRealNN, MinRealNN} import com.salesforce.op.features.types._ import com.salesforce.op.features.{FeatureBuilder, OPFeature} import com.salesforce.op.test._ @@ -38,8 +38,8 @@ import com.salesforce.op.utils.spark.RichDataset._ import org.apache.spark.sql.Row import org.joda.time.Duration import org.junit.runner.RunWith +import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner -import org.scalatest.{FlatSpec, Matchers} import org.slf4j.LoggerFactory @@ -51,13 +51,13 @@ class JoinedDataReaderDataGenerationTest extends FlatSpec with PassengerSparkFix val newWeight = FeatureBuilder.RealNN[PassengerCSV] .extract(_.getWeight.toDouble.toRealNN) - .aggregate(zero = Some(Double.MaxValue), (a, b) => Some(math.min(a.v.getOrElse(0.0), b.v.getOrElse(0.0)))) + .aggregate(MinRealNN) .asPredictor val newHeight = FeatureBuilder.RealNN[PassengerCSV] .extract(_.getHeight.toDouble.toRealNN) - .aggregate(zero = Some(0.0), (a, b) => Some(math.max(a.v.getOrElse(0.0), b.v.getOrElse(0.0)))) + .aggregate(MaxRealNN) .asPredictor val recordTime = FeatureBuilder.DateTime[PassengerCSV].extract(_.getRecordDate.toLong.toDateTime).asPredictor @@ -312,10 +312,10 @@ class JoinedDataReaderDataGenerationTest extends FlatSpec with PassengerSparkFix Array(Date.empty, Date.empty, Date(1471046100L), Date(1471046400L), Date(1471046400L), Date(1471046600L)) aggregatedData.collect(newHeight) should contain theSameElementsAs - Seq(186.0, 168.0, 0.0, 0.0, 186.0, 172.0).toRealNN + Seq(186.0, 168.0, Double.NegativeInfinity, Double.NegativeInfinity, 186.0, 172.0).toRealNN aggregatedData.collect(newWeight) should contain theSameElementsAs - Seq(96.0, 67.0, Double.MaxValue, Double.MaxValue, 76.0, 78.0).toRealNN + Seq(96.0, 67.0, Double.PositiveInfinity, Double.PositiveInfinity, 76.0, 78.0).toRealNN aggregatedData.collect(recordTime) should contain theSameElementsAs Array(DateTime(None), DateTime(None), DateTime(1471045900L), DateTime(1471046000L),