Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove free function aggregation with feature builders #311

Merged
merged 3 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import scala.reflect.runtime.universe.WeakTypeTag
* @param associativeFn associative function to combine values
* @tparam O type of feature
*/
case class CustomMonoidAggregator[O <: FeatureType]
abstract class CustomMonoidAggregator[O <: FeatureType]
(
zero: O#Value,
associativeFn: (O#Value, O#Value) => O#Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,26 +275,6 @@ final class FeatureBuilderWithExtract[I, O <: FeatureType]
var aggregator: MonoidAggregator[Event[O], _, O] = MonoidAggregatorDefaults.aggregatorOf[O](tto)
var aggregateWindow: Option[Duration] = None

/**
 * Feature aggregation function with zero value.
 *
 * Installs a [[CustomMonoidAggregator]] built from the supplied associative
 * function and explicit zero element as this feature's aggregator.
 * NOTE(review): this is the deleted side of the diff — PR #311 removes this
 * free-function overload in favor of passing a monoid aggregator directly.
 * NOTE(review): `fn` is presumably required to be associative with `zero` as
 * its identity, or aggregation results depend on event grouping — cannot be
 * verified from this snippet.
 *
 * @param zero a zero element for aggregation
 * @param fn aggregation function combining two feature values
 * @return this builder, to allow call chaining
 */
def aggregate(zero: O#Value, fn: (O#Value, O#Value) => O#Value): this.type = {
aggregator = new CustomMonoidAggregator[O](associativeFn = fn, zero = zero)(tto)
this
}

/**
 * Feature aggregation function with zero value of [[FeatureTypeDefaults.default[O].value]].
 *
 * Same contract as the two-argument overload, except the zero element is taken
 * from the feature type's declared default value.
 * NOTE(review): deleted side of the diff — PR #311 removes this free-function
 * overload; callers should supply a MonoidAggregator instead.
 *
 * @param fn aggregation function combining two feature values
 * @return this builder, to allow call chaining
 */
def aggregate(fn: (O#Value, O#Value) => O#Value): this.type = {
val zero = FeatureTypeDefaults.default[O](tto).value
aggregator = new CustomMonoidAggregator[O](associativeFn = fn, zero = zero)(tto)
this
}

/**
* Feature aggregation with a monoid aggregator
* @param monoid a monoid aggregator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,8 @@ class MonoidAggregatorDefaultsTest extends FlatSpec with TestCommon {
}

// NOTE(review): the next four lines are the OLD and NEW sides of the same diff
// hunk flattened together — only one val/assert pair exists in each revision.
Spec[CustomMonoidAggregator[_]] should "work" in {
// Old revision: direct instantiation (CustomMonoidAggregator was a case class).
val customAgg = new CustomMonoidAggregator[Real](zero = None, associativeFn = (r1, r2) => (r1 -> r2).map(_ + _))
assertAggr(customAgg, realTestSeq, Option(doubleBase.flatten.sum))
// New revision: trailing {} creates an anonymous subclass, required because
// the PR makes CustomMonoidAggregator an abstract class.
val customSum = new CustomMonoidAggregator[Real](zero = None, associativeFn = (r1, r2) => (r1 -> r2).map(_ + _)){}
assertAggr(customSum, realTestSeq, Option(doubleBase.flatten.sum))
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import java.util

import com.salesforce.op.aggregators._
import com.salesforce.op.features.types._
import com.salesforce.op.stages.FeatureGeneratorStage
import com.salesforce.op.test.{FeatureAsserts, Passenger, TestSparkContext}
import org.apache.spark.sql.{DataFrame, Row}
import org.joda.time.Duration
Expand Down Expand Up @@ -155,27 +154,4 @@ class FeatureBuilderTest extends FlatSpec with TestSparkContext with FeatureAsse
assertFeature[Passenger, Real](feature)(name = name, in = passenger, out = 1.toReal, aggregator = _ => MaxReal)
}

// NOTE(review): deleted test — it exercises the aggregate(fn) overload that
// PR #311 removes, so it is dropped along with the API.
it should "build an aggregated feature with a custom aggregate function" in {
// Aggregation function keeps the first value seen and ignores the second.
val feature =
FeatureBuilder.Real[Passenger]
.extract(p => Option(p.getAge).map(_.toDouble).toReal)
.aggregate((v1, _) => v1)
.asPredictor

// Asserts against the aggregator actually installed on the origin stage.
assertFeature[Passenger, Real](feature)(name = name, in = passenger, out = 1.toReal,
aggregator = _ => feature.originStage.asInstanceOf[FeatureGeneratorStage[Passenger, Real]].aggregator
)
}

// NOTE(review): deleted test — it exercises the aggregate(zero, fn) overload
// that PR #311 removes, so it is dropped along with the API.
it should "build an aggregated feature with a custom aggregate function with zero" in {
// Explicit zero element (Real.empty.v); the function keeps the first value seen.
val feature = FeatureBuilder.Real[Passenger]
.extract(p => Option(p.getAge).map(_.toDouble).toReal)
.aggregate(Real.empty.v, (v1, _) => v1)
.asPredictor

// Asserts against the aggregator actually installed on the origin stage.
assertFeature[Passenger, Real](feature)(name = name, in = passenger, out = 1.toReal,
aggregator = _ => feature.originStage.asInstanceOf[FeatureGeneratorStage[Passenger, Real]].aggregator
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,15 @@

package com.salesforce.op.test

import com.salesforce.op.aggregators.MaxReal
import com.salesforce.op.features.types._
import com.salesforce.op.features.{FeatureBuilder, OPFeature}
import com.salesforce.op.utils.tuples.RichTuple._
import org.joda.time.Duration


trait PassengerFeaturesTest {

val age = FeatureBuilder.Real[Passenger]
.extract(_.getAge.toReal)
.aggregate((l, r) => (l -> r).map(breeze.linalg.max(_, _)))
.asPredictor

val age = FeatureBuilder.Real[Passenger].extract(_.getAge.toReal).aggregate(MaxReal).asPredictor
val gender = FeatureBuilder.MultiPickList[Passenger].extract(p => Set(p.getGender).toMultiPickList).asPredictor
val genderPL = FeatureBuilder.PickList[Passenger].extract(p => p.getGender.toPickList).asPredictor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
package com.salesforce.op.readers

import com.salesforce.op.OpParams
import com.salesforce.op.aggregators.CutOffTime
import com.salesforce.op.aggregators.{CutOffTime, LogicalAnd}
import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._
import com.salesforce.op.test._
Expand Down Expand Up @@ -62,7 +62,7 @@ class DataReadersTest extends FlatSpec with PassengerSparkFixtureTest with TestC

val survivedResponse = FeatureBuilder.Binary[PassengerCaseClass]
.extract(_.survived.toBinary)
.aggregate(zero = Some(true), (l, r) => Some(l.getOrElse(false) && r.getOrElse(false)))
.aggregate(LogicalAnd)
.asResponse

val aggregateParameters = AggregateParams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@

package com.salesforce.op.readers

import com.salesforce.op.aggregators.CutOffTime
import com.salesforce.op.aggregators.{CutOffTime, MaxRealNN, MinRealNN}
import com.salesforce.op.features.types._
import com.salesforce.op.features.{FeatureBuilder, OPFeature}
import com.salesforce.op.test._
import com.salesforce.op.utils.spark.RichDataset._
import org.apache.spark.sql.Row
import org.joda.time.Duration
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}
import org.slf4j.LoggerFactory


Expand All @@ -51,13 +51,13 @@ class JoinedDataReaderDataGenerationTest extends FlatSpec with PassengerSparkFix
val newWeight =
FeatureBuilder.RealNN[PassengerCSV]
.extract(_.getWeight.toDouble.toRealNN)
.aggregate(zero = Some(Double.MaxValue), (a, b) => Some(math.min(a.v.getOrElse(0.0), b.v.getOrElse(0.0))))
.aggregate(MinRealNN)
.asPredictor

val newHeight =
FeatureBuilder.RealNN[PassengerCSV]
.extract(_.getHeight.toDouble.toRealNN)
.aggregate(zero = Some(0.0), (a, b) => Some(math.max(a.v.getOrElse(0.0), b.v.getOrElse(0.0))))
.aggregate(MaxRealNN)
.asPredictor

val recordTime = FeatureBuilder.DateTime[PassengerCSV].extract(_.getRecordDate.toLong.toDateTime).asPredictor
Expand Down Expand Up @@ -312,10 +312,10 @@ class JoinedDataReaderDataGenerationTest extends FlatSpec with PassengerSparkFix
Array(Date.empty, Date.empty, Date(1471046100L), Date(1471046400L), Date(1471046400L), Date(1471046600L))

aggregatedData.collect(newHeight) should contain theSameElementsAs
Seq(186.0, 168.0, 0.0, 0.0, 186.0, 172.0).toRealNN
Seq(186.0, 168.0, Double.NegativeInfinity, Double.NegativeInfinity, 186.0, 172.0).toRealNN

aggregatedData.collect(newWeight) should contain theSameElementsAs
Seq(96.0, 67.0, Double.MaxValue, Double.MaxValue, 76.0, 78.0).toRealNN
Seq(96.0, 67.0, Double.PositiveInfinity, Double.PositiveInfinity, 76.0, 78.0).toRealNN

aggregatedData.collect(recordTime) should contain theSameElementsAs
Array(DateTime(None), DateTime(None), DateTime(1471045900L), DateTime(1471046000L),
Expand Down