From 3664d3a302c499713f9001ca059bae460bb1eb1a Mon Sep 17 00:00:00 2001 From: marcovivero Date: Wed, 12 Sep 2018 16:06:32 -0700 Subject: [PATCH 01/11] Date unit circle binning for RFF --- .../op/filters/PreparedFeatures.scala | 63 ++++++++++++++-- .../op/filters/RawFeatureFilter.scala | 6 +- .../op/filters/PreparedFeaturesTest.scala | 71 ++++++++++++++++++- 3 files changed, 130 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala index 142fa02694..90a4b2294f 100644 --- a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala +++ b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala @@ -30,9 +30,14 @@ package com.salesforce.op.filters + +import java.time.{Instant, OffsetDateTime, ZoneOffset} +import java.time.temporal.WeekFields +import java.util.Locale + import com.salesforce.op.features.TransientFeature import com.salesforce.op.features.types._ -import com.salesforce.op.stages.impl.feature.TextTokenizer +import com.salesforce.op.stages.impl.feature.{TextTokenizer, TimePeriod} import com.salesforce.op.utils.spark.RichRow._ import com.salesforce.op.utils.text.Language import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} @@ -127,15 +132,19 @@ private[filters] object PreparedFeatures { * @param predictors transient features derived from predictors * @return set of prepared features */ - def apply(row: Row, responses: Array[TransientFeature], predictors: Array[TransientFeature]): PreparedFeatures = { + def apply( + row: Row, + responses: Array[TransientFeature], + predictors: Array[TransientFeature], + timePeriod: Option[TimePeriod]): PreparedFeatures = { val empty: Map[FeatureKey, ProcessedSeq] = Map.empty val preparedResponses = responses.foldLeft(empty) { case (map, feature) => val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName) - map ++ 
prepareFeature(feature.name, row.getFeatureType(feature)(converter)) + map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod) } val preparedPredictors = predictors.foldLeft(empty) { case (map, feature) => val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName) - map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter)) + map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod) } PreparedFeatures(responses = preparedResponses, predictors = preparedPredictors) @@ -149,10 +158,16 @@ private[filters] object PreparedFeatures { * @tparam T type of the feature * @return tuple containing whether the feature was empty and a sequence of either doubles or strings */ - private def prepareFeature[T <: FeatureType](name: String, value: T): Map[FeatureKey, ProcessedSeq] = + private def prepareFeature[T <: FeatureType]( + name: String, + value: T, + timePeriod: Option[TimePeriod]): Map[FeatureKey, ProcessedSeq] = value match { case v: Text => v.value .map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s)))).getOrElse(Map.empty) + case v: Date => v.value.map { timestamp => + Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(prepareDateValue(timestamp, timePeriod)))) + }.getOrElse(Map.empty) case v: OPNumeric[_] => v.toDouble .map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d)))).getOrElse(Map.empty) case SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq)) @@ -160,10 +175,12 @@ private[filters] object PreparedFeatures { case ft@SomeValue(_) => ft match { case v: Geolocation => Map((name, None) -> Right(v.value)) case v: TextList => Map((name, None) -> Left(v.value)) - case v: DateList => Map((name, None) -> Right(v.value.map(_.toDouble))) + case v: DateList => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod)))) case v: MultiPickList => Map((name, None) -> Left(v.value.toSeq)) case v: 
MultiPickListMap => v.value.map { case (k, e) => (name, Option(k)) -> Left(e.toSeq) } case v: GeolocationMap => v.value.map{ case (k, e) => (name, Option(k)) -> Right(e) } + case v: DateMap => + v.value.map { case (k, e) => (name, Option(k)) -> Right(Seq(prepareDateValue(e, timePeriod))) } case v: OPMap[_] => v.value.map { case (k, e) => e match { case d: Double => (name, Option(k)) -> Right(Seq(d)) // Do not need to distinguish between string map types, all text is tokenized for distribution calculation @@ -183,4 +200,38 @@ private[filters] object PreparedFeatures { * @return array of string tokens */ private def tokenize(s: String) = TextTokenizer.Analyzer.analyze(s, Language.Unknown) + + private def prepareDateValue(timestamp: Long, timePeriod: Option[TimePeriod]): Double = + timePeriod match { + case Some(period) => + val dt = Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC).toOffsetDateTime + val unproj = period match { + case TimePeriod.DayOfMonth => dt.getDayOfMonth.toDouble + case TimePeriod.DayOfWeek => dt.getDayOfWeek.getValue.toDouble + case TimePeriod.DayOfYear => dt.getDayOfYear.toDouble + case TimePeriod.HourOfDay => dt.getHour.toDouble + case TimePeriod.MonthOfYear => dt.getMonthValue.toDouble + case TimePeriod.WeekOfMonth => dt.get(WeekFields.of(Locale.US).weekOfMonth).toDouble + case TimePeriod.WeekOfYear => dt.get(WeekFields.of(Locale.US).weekOfYear).toDouble + } + + unproj % getTimePeriodMaxBins(period) + case None => timestamp.toDouble + } + + /** + * Utility for getting maximum number of bins to use given an input time period. 
+ * + * @param period input time period + * @return maximum number of bins associated to input time period + */ + private def getTimePeriodMaxBins(period: TimePeriod): Int = period match { + case TimePeriod.DayOfMonth => 31 + case TimePeriod.DayOfWeek => 7 + case TimePeriod.DayOfYear => 366 + case TimePeriod.HourOfDay => 24 + case TimePeriod.MonthOfYear => 12 + case TimePeriod.WeekOfMonth => 5 + case TimePeriod.WeekOfYear => 53 + } } diff --git a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala index 2bd190755e..c84d0808fc 100644 --- a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala +++ b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala @@ -34,6 +34,7 @@ import com.salesforce.op.OpParams import com.salesforce.op.features.types._ import com.salesforce.op.features.{OPFeature, TransientFeature} import com.salesforce.op.readers.{DataFrameFieldNames, Reader} +import com.salesforce.op.stages.impl.feature.TimePeriod import com.salesforce.op.stages.impl.preparators.CorrelationType import com.salesforce.op.utils.spark.RichRow._ import com.twitter.algebird.Monoid._ @@ -93,7 +94,8 @@ class RawFeatureFilter[T] val correlationType: CorrelationType = CorrelationType.Pearson, val jsDivergenceProtectedFeatures: Set[String] = Set.empty, val protectedFeatures: Set[String] = Set.empty, - val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula + val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula, + val timePeriod: Option[TimePeriod] = None ) extends Serializable { assert(bins > 1 && bins <= FeatureDistribution.MaxBins, s"Invalid bin size $bins," + @@ -139,7 +141,7 @@ class RawFeatureFilter[T] (respOut, predOut) } val preparedFeatures: RDD[PreparedFeatures] = - data.rdd.map(PreparedFeatures(_, responses, predictors)) + data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod)) // Have to use the 
training summaries do process scoring for comparison val (responseSummaries, predictorSummaries): (Map[FeatureKey, Summary], Map[FeatureKey, Summary]) = allFeatureInfo.map(info => info.responseSummaries -> info.predictorSummaries) diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala index 6ae18b080b..afb8ba26bf 100644 --- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala @@ -32,19 +32,24 @@ package com.salesforce.op.filters import scala.math.round +import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature} +import com.salesforce.op.features.types._ +import com.salesforce.op.readers._ +import com.salesforce.op.stages.impl.feature.TimePeriod import com.salesforce.op.stages.impl.preparators.CorrelationType -import com.salesforce.op.test.TestSparkContext +import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest} import com.twitter.algebird.Monoid._ import com.twitter.algebird.Operators._ import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row} import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) -class PreparedFeaturesTest extends FlatSpec with TestSparkContext { +class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { val responseKey1: FeatureKey = "Response1" -> None val responseKey2: FeatureKey = "Response2" -> None @@ -157,6 +162,68 @@ class PreparedFeaturesTest extends FlatSpec with TestSparkContext { testCorrMatrix(allResponseKeys2, CorrelationType.Spearman, expected) } + it should "correctly transform date features when time period is specified" in { + val dateMap = + 
FeatureBuilder.DateMap[Passenger].extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor + val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap) + val df: DataFrame = dataReader.generateDataFrame(dateFeatures) + + val ppRDD5: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.WeekOfMonth))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + val ppRDD7: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfWeek))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + val ppRDD12: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.MonthOfYear))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + val ppRDD24: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.HourOfDay))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + val ppRDD31: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfMonth))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + val ppRDD53: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.WeekOfYear))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + val ppRDD366: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfYear))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + val 
ppRDDNone: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), None)) + .map(_.predictors.mapValues(_.right.map(_.toList))) + + def createExpected(d: Double): Seq[(FeatureKey, ProcessedSeq)] = Seq( + (boarded.name, None) -> Right(List(d, d)), + (boarded.name, None) -> Right(List(d)), + (boardedTime.name, None) -> Right(List(d)), + (boardedTimeAsDateTime.name, None) -> Right(List(d)), + (dateMap.name, Option("DTMap")) -> Right(List(d))) + + val expected5 = Seq(4.0).map(createExpected) + val expected7 = Seq(0.0).map(createExpected) + val expected12 = Seq(1.0).map(createExpected) + val expected24 = Seq(0.0).map(createExpected) + val expected31 = Seq(18.0).map(createExpected) + val expected53 = Seq(4.0).map(createExpected) + val expected366 = Seq(18.0).map(createExpected) + + ppRDD5.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected5.flatMap(identity(_)) + ppRDD7.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected7.flatMap(identity(_)) + ppRDD12.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected12.flatMap(identity(_)) + ppRDD24.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected24.flatMap(identity(_)) + ppRDD53.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected53.flatMap(identity(_)) + ppRDD366.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected366.flatMap(identity(_)) + } + def testCorrMatrix( responseKeys: Array[FeatureKey], correlationType: CorrelationType, From f62c38d642a533ec6b5c0e00cec9953f7d17e9cc Mon Sep 17 00:00:00 2001 From: marcovivero Date: Wed, 12 Sep 2018 17:04:24 -0700 Subject: [PATCH 02/11] Add to OpWorkflow constructor --- core/src/main/scala/com/salesforce/op/OpWorkflow.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala 
b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala index af45e49a15..3faf34a567 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala @@ -34,6 +34,7 @@ import com.salesforce.op.features.OPFeature import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter, Summary} import com.salesforce.op.readers.Reader import com.salesforce.op.stages.OPStage +import com.salesforce.op.stages.impl.feature.TimePeriod import com.salesforce.op.stages.impl.preparators.CorrelationType import com.salesforce.op.stages.impl.selector.ModelSelector import com.salesforce.op.utils.reflection.ReflectionUtils @@ -521,12 +522,15 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { maxCorrelation: Double = 0.95, correlationType: CorrelationType = CorrelationType.Pearson, protectedFeatures: Array[OPFeature] = Array.empty, - textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula + protectedJSFeatures: Array[OPFeature] = Array.empty, + textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula, + timePeriod: Option[TimePeriod] = None ): this.type = { val training = trainingReader.orElse(reader).map(_.asInstanceOf[Reader[T]]) require(training.nonEmpty, "Reader for training data must be provided either in withRawFeatureFilter or directly" + "as the reader for the workflow") val protectedRawFeatures = protectedFeatures.flatMap(_.rawFeatures).map(_.name).toSet + val protectedRawJSFeatures = protectedJSFeatures.flatMap(_.rawFeatures).map(_.name).toSet rawFeatureFilter = Option { new RawFeatureFilter( trainingReader = training.get, @@ -539,8 +543,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { maxCorrelation = maxCorrelation, correlationType = correlationType, protectedFeatures = protectedRawFeatures, - textBinsFormula = textBinsFormula - ) + jsDivergenceProtectedFeatures = protectedRawJSFeatures, + textBinsFormula = 
textBinsFormula) } this } From c93d0835481eb570b0865caf76df4944db6ad93f Mon Sep 17 00:00:00 2001 From: marcovivero Date: Wed, 12 Sep 2018 17:04:24 -0700 Subject: [PATCH 03/11] Add to OpWorkflow constructor --- .../scala/com/salesforce/op/OpWorkflow.scala | 49 +++++++----- .../op/filters/PreparedFeatures.scala | 36 +-------- .../op/filters/RawFeatureFilter.scala | 2 + .../feature/DateToUnitCircleTransformer.scala | 36 +++++---- .../op/filters/PreparedFeaturesTest.scala | 80 +++++++------------ 5 files changed, 84 insertions(+), 119 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala index af45e49a15..a060de82d8 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala @@ -34,6 +34,7 @@ import com.salesforce.op.features.OPFeature import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter, Summary} import com.salesforce.op.readers.Reader import com.salesforce.op.stages.OPStage +import com.salesforce.op.stages.impl.feature.TimePeriod import com.salesforce.op.stages.impl.preparators.CorrelationType import com.salesforce.op.stages.impl.selector.ModelSelector import com.salesforce.op.utils.reflection.ReflectionUtils @@ -488,24 +489,27 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { * Add a raw features filter to the workflow to look at fill rates and distributions of raw features and exclude * features that do not meet specifications from modeling DAG * - * @param trainingReader training reader to use in filter if not supplied will fall back to reader specified for - * workflow (note that this reader will take precedence over readers directly input to the - * workflow if both are supplied) - * @param scoringReader scoring reader to use in filter if not supplied will do the checks possible with only - * training data available - * @param bins number of bins to use in 
estimating feature distributions - * @param minFillRate minimum non-null fraction of instances that a feature should contain - * @param maxFillDifference maximum absolute difference in fill rate between scoring and training data for a feature - * @param maxFillRatioDiff maximum difference in fill ratio (symmetric) between scoring and training data for - * a feature - * @param maxJSDivergence maximum Jensen-Shannon divergence between the training and scoring distributions - * for a feature - * @param protectedFeatures list of features that should never be removed (features that are used to create them will - * also be protected) - * @param textBinsFormula formula to compute the text features bin size. - * Input arguments are [[Summary]] and number of bins to use in computing - * feature distributions (histograms for numerics, hashes for strings). - * Output is the bins for the text features. + * @param trainingReader training reader to use in filter if not supplied will fall back to reader specified for + * workflow (note that this reader will take precedence over readers directly input to the + * workflow if both are supplied) + * @param scoringReader scoring reader to use in filter if not supplied will do the checks possible with only + * training data available + * @param bins number of bins to use in estimating feature distributions + * @param minFillRate minimum non-null fraction of instances that a feature should contain + * @param maxFillDifference maximum absolute difference in fill rate between scoring and training data for a feature + * @param maxFillRatioDiff maximum difference in fill ratio (symmetric) between scoring and training data for + * a feature + * @param maxJSDivergence maximum Jensen-Shannon divergence between the training and scoring distributions + * for a feature + * @param protectedFeatures list of features that should never be removed (features that are used to create them will + * also be protected) + * @param protectedJSFeatures 
features that are protected from removal by JS divergence check + * @param textBinsFormula formula to compute the text features bin size. + * Input arguments are [[Summary]] and number of bins to use in computing + * feature distributions (histograms for numerics, hashes for strings). + * Output is the bins for the text features. + * @param timePeriod Time period used to apply circular date transformation for date features, if not + * specified will use numeric feature transformation * @tparam T Type of the data read in */ @Experimental @@ -521,12 +525,15 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { maxCorrelation: Double = 0.95, correlationType: CorrelationType = CorrelationType.Pearson, protectedFeatures: Array[OPFeature] = Array.empty, - textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula + protectedJSFeatures: Array[OPFeature] = Array.empty, + textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula, + timePeriod: Option[TimePeriod] = None ): this.type = { val training = trainingReader.orElse(reader).map(_.asInstanceOf[Reader[T]]) require(training.nonEmpty, "Reader for training data must be provided either in withRawFeatureFilter or directly" + "as the reader for the workflow") val protectedRawFeatures = protectedFeatures.flatMap(_.rawFeatures).map(_.name).toSet + val protectedRawJSFeatures = protectedJSFeatures.flatMap(_.rawFeatures).map(_.name).toSet rawFeatureFilter = Option { new RawFeatureFilter( trainingReader = training.get, @@ -539,8 +546,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { maxCorrelation = maxCorrelation, correlationType = correlationType, protectedFeatures = protectedRawFeatures, - textBinsFormula = textBinsFormula - ) + jsDivergenceProtectedFeatures = protectedRawJSFeatures, + textBinsFormula = textBinsFormula) } this } diff --git a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala 
b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala index 90a4b2294f..9d11a98974 100644 --- a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala +++ b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala @@ -31,13 +31,9 @@ package com.salesforce.op.filters -import java.time.{Instant, OffsetDateTime, ZoneOffset} -import java.time.temporal.WeekFields -import java.util.Locale - import com.salesforce.op.features.TransientFeature import com.salesforce.op.features.types._ -import com.salesforce.op.stages.impl.feature.{TextTokenizer, TimePeriod} +import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TextTokenizer, TimePeriod} import com.salesforce.op.utils.spark.RichRow._ import com.salesforce.op.utils.text.Language import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} @@ -203,35 +199,7 @@ private[filters] object PreparedFeatures { private def prepareDateValue(timestamp: Long, timePeriod: Option[TimePeriod]): Double = timePeriod match { - case Some(period) => - val dt = Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC).toOffsetDateTime - val unproj = period match { - case TimePeriod.DayOfMonth => dt.getDayOfMonth.toDouble - case TimePeriod.DayOfWeek => dt.getDayOfWeek.getValue.toDouble - case TimePeriod.DayOfYear => dt.getDayOfYear.toDouble - case TimePeriod.HourOfDay => dt.getHour.toDouble - case TimePeriod.MonthOfYear => dt.getMonthValue.toDouble - case TimePeriod.WeekOfMonth => dt.get(WeekFields.of(Locale.US).weekOfMonth).toDouble - case TimePeriod.WeekOfYear => dt.get(WeekFields.of(Locale.US).weekOfYear).toDouble - } - - unproj % getTimePeriodMaxBins(period) + case Some(period) => DateToUnitCircle.convertToBin(timestamp, period) case None => timestamp.toDouble } - - /** - * Utility for getting maximum number of bins to use given an input time period. 
- * - * @param period input time period - * @return maximum number of bins associated to input time period - */ - private def getTimePeriodMaxBins(period: TimePeriod): Int = period match { - case TimePeriod.DayOfMonth => 31 - case TimePeriod.DayOfWeek => 7 - case TimePeriod.DayOfYear => 366 - case TimePeriod.HourOfDay => 24 - case TimePeriod.MonthOfYear => 12 - case TimePeriod.WeekOfMonth => 5 - case TimePeriod.WeekOfYear => 53 - } } diff --git a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala index c84d0808fc..7862e9353d 100644 --- a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala +++ b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala @@ -79,6 +79,8 @@ import scala.util.Failure * Input arguments are [[Summary]] and number of bins to use in computing feature * distributions (histograms for numerics, hashes for strings). * Output is the bins for the text features. 
+ * @param timePeriod Time period used to apply circular date transformation for date features, if + * not specified will use regular numeric feature transformation * @tparam T datatype of the reader */ class RawFeatureFilter[T] diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala index d995cc010d..233c2f07bf 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala @@ -100,23 +100,31 @@ class DateToUnitCircleTransformer[T <: Date] } } -private[op] object DateToUnitCircle { +object DateToUnitCircle { def metadataValues(timePeriod: TimePeriod): Seq[String] = Seq(s"x_$timePeriod", s"y_$timePeriod") - def convertToRandians(timestamp: Option[Long], timePeriodDesired: TimePeriod): Array[Double] = { - val datetime: Option[JDateTime] = timestamp.map(new JDateTime(_)) - val (timePeriod, periodSize) = timePeriodDesired match { - case TimePeriod.DayOfMonth => (datetime.map(_.dayOfMonth().get() - 1), 31) - case TimePeriod.DayOfWeek => (datetime.map(_.dayOfWeek().get() - 1), 7) - case TimePeriod.DayOfYear => (datetime.map(_.dayOfYear().get() - 1), 366) - case TimePeriod.HourOfDay => (datetime.map(_.hourOfDay().get()), 24) - case TimePeriod.MonthOfYear => (datetime.map(_.monthOfYear().get() - 1), 12) - case TimePeriod.WeekOfMonth => ( - datetime.map(x => x.weekOfWeekyear().get() - x.withDayOfMonth(1).weekOfWeekyear().get()), 6) - case TimePeriod.WeekOfYear => (datetime.map(_.weekOfWeekyear().get() - 1), 53) + def convertToBin(timestamp: Long, timePeriodDesired: TimePeriod): Double = + getPeriodWithSize(timestamp, timePeriodDesired)._1 + + def convertToRandians(timestamp: Option[Long], timePeriodDesired: TimePeriod): Array[Double] = + timestamp.map { ts => + val (timePeriod, periodSize) = 
getPeriodWithSize(ts, timePeriodDesired) + val radians = (2 * math.Pi * timePeriod) / periodSize + Array(math.cos(radians), math.sin(radians)) + }.getOrElse(Array(0.0, 0.0)) + + private def getPeriodWithSize(timestamp: Long, timePeriod: TimePeriod): (Double, Int) = { + val dt = new JDateTime(timestamp) + timePeriod match { + case TimePeriod.DayOfMonth => (dt.dayOfMonth.get.toDouble % 31, 31) + case TimePeriod.DayOfWeek => (dt.dayOfWeek.get.toDouble % 7, 7) + case TimePeriod.DayOfYear => (dt.dayOfYear.get.toDouble % 366, 366) + case TimePeriod.HourOfDay => (dt.hourOfDay.get.toDouble % 24, 24) + case TimePeriod.MonthOfYear => (dt.monthOfYear.get.toDouble % 12, 12) + case TimePeriod.WeekOfMonth => + ((dt.weekOfWeekyear.get - dt.withDayOfMonth(1).weekOfWeekyear.get).toDouble % 6, 6) + case TimePeriod.WeekOfYear => (dt.weekOfWeekyear.get.toDouble % 53, 53) } - val radians = timePeriod.map(2 * math.Pi * _ / periodSize) - radians.map(r => Array(math.cos(r), math.sin(r))).getOrElse(Array(0.0, 0.0)) } } diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala index afb8ba26bf..dfe75298be 100644 --- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala @@ -163,65 +163,45 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { } it should "correctly transform date features when time period is specified" in { + runDateToUnitCircleTest(TimePeriod.DayOfMonth, 17.0, 17.0, 17.0, 17.0, 17.0) + runDateToUnitCircleTest(TimePeriod.DayOfWeek, 6.0, 6.0, 6.0, 6.0, 6.0) + runDateToUnitCircleTest(TimePeriod.DayOfYear, 17.0, 17.0, 17.0, 17.0, 17.0) + runDateToUnitCircleTest(TimePeriod.HourOfDay, 16.0, 16.0, 16.0, 16.0, 16.0) + runDateToUnitCircleTest(TimePeriod.MonthOfYear, 1.0, 1.0, 1.0, 1.0, 1.0) + runDateToUnitCircleTest(TimePeriod.WeekOfMonth, 2.0, 2.0, 2.0, 2.0, 
2.0) + runDateToUnitCircleTest(TimePeriod.WeekOfYear, 3.0, 3.0, 3.0, 3.0, 3.0) + } + + def runDateToUnitCircleTest( + period: TimePeriod, + expected1: Double, + expected2: Double, + expected3: Double, + expected4: Double, + expected5: Double): Unit = { val dateMap = FeatureBuilder.DateMap[Passenger].extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap) val df: DataFrame = dataReader.generateDataFrame(dateFeatures) - val ppRDD5: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.WeekOfMonth))) - .map(_.predictors.mapValues(_.right.map(_.toList))) - val ppRDD7: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfWeek))) - .map(_.predictors.mapValues(_.right.map(_.toList))) - val ppRDD12: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.MonthOfYear))) - .map(_.predictors.mapValues(_.right.map(_.toList))) - val ppRDD24: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.HourOfDay))) - .map(_.predictors.mapValues(_.right.map(_.toList))) - val ppRDD31: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfMonth))) - .map(_.predictors.mapValues(_.right.map(_.toList))) - val ppRDD53: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.WeekOfYear))) - 
.map(_.predictors.mapValues(_.right.map(_.toList))) - val ppRDD366: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfYear))) - .map(_.predictors.mapValues(_.right.map(_.toList))) - val ppRDDNone: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), None)) - .map(_.predictors.mapValues(_.right.map(_.toList))) - - def createExpected(d: Double): Seq[(FeatureKey, ProcessedSeq)] = Seq( - (boarded.name, None) -> Right(List(d, d)), - (boarded.name, None) -> Right(List(d)), + def createExpectedDateMap(d: Double, aggregates: Int): Map[FeatureKey, ProcessedSeq] = Map( + (boarded.name, None) -> Right((0 until aggregates).map(_ => d).toList), (boardedTime.name, None) -> Right(List(d)), (boardedTimeAsDateTime.name, None) -> Right(List(d)), (dateMap.name, Option("DTMap")) -> Right(List(d))) - val expected5 = Seq(4.0).map(createExpected) - val expected7 = Seq(0.0).map(createExpected) - val expected12 = Seq(1.0).map(createExpected) - val expected24 = Seq(0.0).map(createExpected) - val expected31 = Seq(18.0).map(createExpected) - val expected53 = Seq(4.0).map(createExpected) - val expected366 = Seq(18.0).map(createExpected) - - ppRDD5.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected5.flatMap(identity(_)) - ppRDD7.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected7.flatMap(identity(_)) - ppRDD12.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected12.flatMap(identity(_)) - ppRDD24.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected24.flatMap(identity(_)) - ppRDD53.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected53.flatMap(identity(_)) - ppRDD366.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected366.flatMap(identity(_)) + val 
ppRDD: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + .map(PreparedFeatures( + _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(period))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + + val expected: Seq[Map[FeatureKey, ProcessedSeq]] = + // The first observation is expected to be aggregated twice + Seq(createExpectedDateMap(expected1, 2)) ++ + Seq(expected2, expected3, expected4, expected5).map(createExpectedDateMap(_, 1)) ++ + Seq(Map[FeatureKey, ProcessedSeq]()) + + ppRDD.collect should contain theSameElementsAs expected } def testCorrMatrix( From 37f4c4182454036dfec2ce92ce4d59612a5377cc Mon Sep 17 00:00:00 2001 From: marcovivero Date: Thu, 13 Sep 2018 11:40:51 -0700 Subject: [PATCH 04/11] Forgot some docs --- .../main/scala/com/salesforce/op/filters/PreparedFeatures.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala index 9d11a98974..72390a2fcd 100644 --- a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala +++ b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala @@ -151,6 +151,8 @@ private[filters] object PreparedFeatures { * * @param name feature name * @param value feature value + * @param timePeriod optional time period to use raw feature binning, otherwise standard numeric transformation + * is applied * @tparam T type of the feature * @return tuple containing whether the feature was empty and a sequence of either doubles or strings */ From 84f72ae39156da011f523f00791f883047805189 Mon Sep 17 00:00:00 2001 From: marcovivero Date: Thu, 13 Sep 2018 13:06:48 -0700 Subject: [PATCH 05/11] Fix tests --- .../impl/feature/DateToUnitCircleTransformer.scala | 14 +++++++------- .../op/filters/PreparedFeaturesTest.scala | 10 +++++----- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git 
a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala index 233c2f07bf..57fc91aacc 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala @@ -117,14 +117,14 @@ object DateToUnitCircle { private def getPeriodWithSize(timestamp: Long, timePeriod: TimePeriod): (Double, Int) = { val dt = new JDateTime(timestamp) timePeriod match { - case TimePeriod.DayOfMonth => (dt.dayOfMonth.get.toDouble % 31, 31) - case TimePeriod.DayOfWeek => (dt.dayOfWeek.get.toDouble % 7, 7) - case TimePeriod.DayOfYear => (dt.dayOfYear.get.toDouble % 366, 366) - case TimePeriod.HourOfDay => (dt.hourOfDay.get.toDouble % 24, 24) - case TimePeriod.MonthOfYear => (dt.monthOfYear.get.toDouble % 12, 12) + case TimePeriod.DayOfMonth => (dt.dayOfMonth.get.toDouble - 1, 31) + case TimePeriod.DayOfWeek => (dt.dayOfWeek.get.toDouble - 1, 7) + case TimePeriod.DayOfYear => (dt.dayOfYear.get.toDouble - 1, 366) + case TimePeriod.HourOfDay => (dt.hourOfDay.get.toDouble, 24) + case TimePeriod.MonthOfYear => (dt.monthOfYear.get.toDouble - 1, 12) case TimePeriod.WeekOfMonth => - ((dt.weekOfWeekyear.get - dt.withDayOfMonth(1).weekOfWeekyear.get).toDouble % 6, 6) - case TimePeriod.WeekOfYear => (dt.weekOfWeekyear.get.toDouble % 53, 53) + ((dt.weekOfWeekyear.get - dt.withDayOfMonth(1).weekOfWeekyear.get).toDouble, 6) + case TimePeriod.WeekOfYear => (dt.weekOfWeekyear.get.toDouble - 1, 53) } } } diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala index dfe75298be..8a0cfcc1a8 100644 --- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala +++ 
b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala @@ -163,13 +163,13 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { } it should "correctly transform date features when time period is specified" in { - runDateToUnitCircleTest(TimePeriod.DayOfMonth, 17.0, 17.0, 17.0, 17.0, 17.0) - runDateToUnitCircleTest(TimePeriod.DayOfWeek, 6.0, 6.0, 6.0, 6.0, 6.0) - runDateToUnitCircleTest(TimePeriod.DayOfYear, 17.0, 17.0, 17.0, 17.0, 17.0) + runDateToUnitCircleTest(TimePeriod.DayOfMonth, 16.0, 16.0, 16.0, 16.0, 16.0) + runDateToUnitCircleTest(TimePeriod.DayOfWeek, 5.0, 5.0, 5.0, 5.0, 5.0) + runDateToUnitCircleTest(TimePeriod.DayOfYear, 16.0, 16.0, 16.0, 16.0, 16.0) runDateToUnitCircleTest(TimePeriod.HourOfDay, 16.0, 16.0, 16.0, 16.0, 16.0) - runDateToUnitCircleTest(TimePeriod.MonthOfYear, 1.0, 1.0, 1.0, 1.0, 1.0) + runDateToUnitCircleTest(TimePeriod.MonthOfYear, 0.0, 0.0, 0.0, 0.0, 0.0) runDateToUnitCircleTest(TimePeriod.WeekOfMonth, 2.0, 2.0, 2.0, 2.0, 2.0) - runDateToUnitCircleTest(TimePeriod.WeekOfYear, 3.0, 3.0, 3.0, 3.0, 3.0) + runDateToUnitCircleTest(TimePeriod.WeekOfYear, 2.0, 2.0, 2.0, 2.0, 2.0) } def runDateToUnitCircleTest( From 9d6ea4412a8d4e69ec35b26282dc3d64414aab4e Mon Sep 17 00:00:00 2001 From: marcovivero Date: Thu, 13 Sep 2018 13:43:36 -0700 Subject: [PATCH 06/11] Split out time period test cases --- .../feature/DateToUnitCircleTransformer.scala | 2 +- .../op/filters/PreparedFeaturesTest.scala | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala index 57fc91aacc..ce34e768ca 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala @@ -35,7 +35,7 
@@ import com.salesforce.op.utils.spark.OpVectorMetadata import com.salesforce.op.{FeatureHistory, UID} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param.{Param, Params} -import org.joda.time.{DateTime => JDateTime} +import org.joda.time.{DateTime => JDateTime, DateTimeZone} import scala.reflect.runtime.universe.TypeTag diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala index 8a0cfcc1a8..12eea309ae 100644 --- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala @@ -162,13 +162,31 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { testCorrMatrix(allResponseKeys2, CorrelationType.Spearman, expected) } - it should "correctly transform date features when time period is specified" in { + it should "correctly transform date features when time period DayOfMonth is specified" in { runDateToUnitCircleTest(TimePeriod.DayOfMonth, 16.0, 16.0, 16.0, 16.0, 16.0) + } + + it should "correctly transform date features when time period DayOfWeek is specified" in { runDateToUnitCircleTest(TimePeriod.DayOfWeek, 5.0, 5.0, 5.0, 5.0, 5.0) + } + + it should "correctly transform date features when time period DayOfYear is specified" in { runDateToUnitCircleTest(TimePeriod.DayOfYear, 16.0, 16.0, 16.0, 16.0, 16.0) + } + + it should "correctly transform date features when time period HourOfDay is specified" in { runDateToUnitCircleTest(TimePeriod.HourOfDay, 16.0, 16.0, 16.0, 16.0, 16.0) + } + + it should "correctly transform date features when time period MonthOfYear is specified" in { runDateToUnitCircleTest(TimePeriod.MonthOfYear, 0.0, 0.0, 0.0, 0.0, 0.0) + } + + it should "correctly transform date features when time period WeekOfMonth is specified" in { runDateToUnitCircleTest(TimePeriod.WeekOfMonth, 2.0, 2.0, 2.0, 2.0, 2.0) + } + 
+ it should "correctly transform date features when time period WeekOfYear is specified" in { runDateToUnitCircleTest(TimePeriod.WeekOfYear, 2.0, 2.0, 2.0, 2.0, 2.0) } From bc39bfe123323a0efe2e4d586695f97c3fc3bd21 Mon Sep 17 00:00:00 2001 From: marcovivero Date: Thu, 13 Sep 2018 14:22:36 -0700 Subject: [PATCH 07/11] Add time zone + move out dataframe from function --- .../impl/feature/DateToUnitCircleTransformer.scala | 2 +- .../salesforce/op/filters/PreparedFeaturesTest.scala | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala index ce34e768ca..cfef3001d1 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala @@ -115,7 +115,7 @@ object DateToUnitCircle { }.getOrElse(Array(0.0, 0.0)) private def getPeriodWithSize(timestamp: Long, timePeriod: TimePeriod): (Double, Int) = { - val dt = new JDateTime(timestamp) + val dt = new JDateTime(timestamp).withZone(DateTimeZone.UTC) timePeriod match { case TimePeriod.DayOfMonth => (dt.dayOfMonth.get.toDouble - 1, 31) case TimePeriod.DayOfWeek => (dt.dayOfWeek.get.toDouble - 1, 7) diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala index 12eea309ae..31cdd3839d 100644 --- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala @@ -77,6 +77,11 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { val allPredictorKeys1 = Array(predictorKey1, predictorKey2A, predictorKey2B) val allPredictorKeys2 = Array(predictorKey1) + val dateMap = + 
FeatureBuilder.DateMap[Passenger].extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor + val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap) + lazy val dateDataFrame: DataFrame = dataReader.generateDataFrame(dateFeatures) + Spec[PreparedFeatures] should "produce correct summaries" in { val (responseSummaries1, predictorSummaries1) = preparedFeatures1.summaries val (responseSummaries2, predictorSummaries2) = preparedFeatures2.summaries @@ -197,18 +202,13 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { expected3: Double, expected4: Double, expected5: Double): Unit = { - val dateMap = - FeatureBuilder.DateMap[Passenger].extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor - val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap) - val df: DataFrame = dataReader.generateDataFrame(dateFeatures) - def createExpectedDateMap(d: Double, aggregates: Int): Map[FeatureKey, ProcessedSeq] = Map( (boarded.name, None) -> Right((0 until aggregates).map(_ => d).toList), (boardedTime.name, None) -> Right(List(d)), (boardedTimeAsDateTime.name, None) -> Right(List(d)), (dateMap.name, Option("DTMap")) -> Right(List(d))) - val ppRDD: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd + val ppRDD: RDD[Map[FeatureKey, ProcessedSeq]] = dateDataFrame.rdd .map(PreparedFeatures( _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(period))) .map(_.predictors.mapValues(_.right.map(_.toList))) From f255f874f6b7701f633405f5f74ee01603a53d19 Mon Sep 17 00:00:00 2001 From: marcovivero Date: Thu, 13 Sep 2018 14:59:29 -0700 Subject: [PATCH 08/11] mt/Clean up tests --- .../op/filters/PreparedFeaturesTest.scala | 106 +++++++----------- 1 file changed, 43 insertions(+), 63 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala 
b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala index 31cdd3839d..2c53192c3f 100644 --- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala @@ -30,20 +30,16 @@ package com.salesforce.op.filters -import scala.math.round - -import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature} import com.salesforce.op.features.types._ -import com.salesforce.op.readers._ +import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature} import com.salesforce.op.stages.impl.feature.TimePeriod import com.salesforce.op.stages.impl.preparators.CorrelationType import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest} import com.twitter.algebird.Monoid._ import com.twitter.algebird.Operators._ -import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.DataFrame import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @@ -77,10 +73,6 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { val allPredictorKeys1 = Array(predictorKey1, predictorKey2A, predictorKey2B) val allPredictorKeys2 = Array(predictorKey1) - val dateMap = - FeatureBuilder.DateMap[Passenger].extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor - val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap) - lazy val dateDataFrame: DataFrame = dataReader.generateDataFrame(dateFeatures) Spec[PreparedFeatures] should "produce correct summaries" in { val (responseSummaries1, predictorSummaries1) = preparedFeatures1.summaries @@ -167,59 +159,48 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { testCorrMatrix(allResponseKeys2, CorrelationType.Spearman, 
expected) } - it should "correctly transform date features when time period DayOfMonth is specified" in { - runDateToUnitCircleTest(TimePeriod.DayOfMonth, 16.0, 16.0, 16.0, 16.0, 16.0) - } - - it should "correctly transform date features when time period DayOfWeek is specified" in { - runDateToUnitCircleTest(TimePeriod.DayOfWeek, 5.0, 5.0, 5.0, 5.0, 5.0) - } - - it should "correctly transform date features when time period DayOfYear is specified" in { - runDateToUnitCircleTest(TimePeriod.DayOfYear, 16.0, 16.0, 16.0, 16.0, 16.0) - } - - it should "correctly transform date features when time period HourOfDay is specified" in { - runDateToUnitCircleTest(TimePeriod.HourOfDay, 16.0, 16.0, 16.0, 16.0, 16.0) - } - - it should "correctly transform date features when time period MonthOfYear is specified" in { - runDateToUnitCircleTest(TimePeriod.MonthOfYear, 0.0, 0.0, 0.0, 0.0, 0.0) - } - - it should "correctly transform date features when time period WeekOfMonth is specified" in { - runDateToUnitCircleTest(TimePeriod.WeekOfMonth, 2.0, 2.0, 2.0, 2.0, 2.0) - } - - it should "correctly transform date features when time period WeekOfYear is specified" in { - runDateToUnitCircleTest(TimePeriod.WeekOfYear, 2.0, 2.0, 2.0, 2.0, 2.0) - } - - def runDateToUnitCircleTest( - period: TimePeriod, - expected1: Double, - expected2: Double, - expected3: Double, - expected4: Double, - expected5: Double): Unit = { - def createExpectedDateMap(d: Double, aggregates: Int): Map[FeatureKey, ProcessedSeq] = Map( - (boarded.name, None) -> Right((0 until aggregates).map(_ => d).toList), - (boardedTime.name, None) -> Right(List(d)), - (boardedTimeAsDateTime.name, None) -> Right(List(d)), - (dateMap.name, Option("DTMap")) -> Right(List(d))) - - val ppRDD: RDD[Map[FeatureKey, ProcessedSeq]] = dateDataFrame.rdd - .map(PreparedFeatures( - _, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(period))) - .map(_.predictors.mapValues(_.right.map(_.toList))) - - val expected: 
Seq[Map[FeatureKey, ProcessedSeq]] = + it should "transform dates for each period" in { + val expectedBins = Map( + TimePeriod.DayOfMonth -> 17.0, + TimePeriod.DayOfWeek -> 6.0, + TimePeriod.DayOfYear -> 17.0, + TimePeriod.HourOfDay -> 0.0, + TimePeriod.MonthOfYear -> 0.0, + TimePeriod.WeekOfMonth -> 2.0, + TimePeriod.WeekOfYear -> 2.0 + ) + expectedBins.keys should contain theSameElementsAs TimePeriod.values + + val dateMap = FeatureBuilder.DateMap[Passenger] + .extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor + + val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap) + val dateDataFrame: DataFrame = dataReader.generateDataFrame(dateFeatures).persist() + + for { + (period, expectedBin) <- expectedBins + } { + def createExpectedDateMap(d: Double, aggregates: Int): Map[FeatureKey, ProcessedSeq] = Map( + (boarded.name, None) -> Right((0 until aggregates).map(_ => d).toList), + (boardedTime.name, None) -> Right(List(d)), + (boardedTimeAsDateTime.name, None) -> Right(List(d)), + (dateMap.name, Option("DTMap")) -> Right(List(d))) + + val res = dateDataFrame.rdd + .map(PreparedFeatures(_, Array.empty, dateFeatures.map(TransientFeature(_)), Option(period))) + .map(_.predictors.mapValues(_.right.map(_.toList))) + .collect() + + val expectedResults: Seq[Map[FeatureKey, ProcessedSeq]] = // The first observation is expected to be aggregated twice - Seq(createExpectedDateMap(expected1, 2)) ++ - Seq(expected2, expected3, expected4, expected5).map(createExpectedDateMap(_, 1)) ++ - Seq(Map[FeatureKey, ProcessedSeq]()) + Seq(createExpectedDateMap(expectedBin, 2)) ++ + Seq.fill(4)(expectedBin).map(createExpectedDateMap(_, 1)) ++ + Seq(Map[FeatureKey, ProcessedSeq]()) - ppRDD.collect should contain theSameElementsAs expected + withClue(s"Computed bin for $period period does not match:\n") { + res should contain theSameElementsAs expectedResults + } + } } def testCorrMatrix( @@ -227,8 +208,7 @@ class 
PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { correlationType: CorrelationType, expectedResult: Seq[Array[Double]] ): Unit = { - val corrRDD = - sc.parallelize(allPreparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, allPredictorKeys1))) + val corrRDD = sc.parallelize(allPreparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, allPredictorKeys1))) val corrMatrix = Statistics.corr(corrRDD, correlationType.sparkName) corrMatrix.colIter.zipWithIndex.map { case(vec, idx) => From 0ef87da26b08ea9a078048fb19081b6bda3431db Mon Sep 17 00:00:00 2001 From: marcovivero Date: Thu, 13 Sep 2018 15:10:17 -0700 Subject: [PATCH 09/11] Address last few comments --- .../com/salesforce/op/filters/PreparedFeatures.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala index 72390a2fcd..6ba44c0034 100644 --- a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala +++ b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala @@ -126,6 +126,8 @@ private[filters] object PreparedFeatures { * @param row data frame row * @param responses transient features derived from responses * @param predictors transient features derived from predictors + * @param timePeriod optional time period to use raw feature binning, otherwise standard numeric transformation + * is applied * @return set of prepared features */ def apply( @@ -161,16 +163,12 @@ private[filters] object PreparedFeatures { value: T, timePeriod: Option[TimePeriod]): Map[FeatureKey, ProcessedSeq] = value match { - case v: Text => v.value - .map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s)))).getOrElse(Map.empty) - case v: Date => v.value.map { timestamp => - Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(prepareDateValue(timestamp, timePeriod)))) - }.getOrElse(Map.empty) - 
case v: OPNumeric[_] => v.toDouble - .map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d)))).getOrElse(Map.empty) case SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq)) case SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq)) case ft@SomeValue(_) => ft match { + case v: Text => Map((name, None) -> Left(v.value.toSeq.flatMap(tokenize))) + case v: Date => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod)).toSeq)) + case v: OPNumeric[_] => Map((name, None) -> Right(v.toDouble.toSeq)) case v: Geolocation => Map((name, None) -> Right(v.value)) case v: TextList => Map((name, None) -> Left(v.value)) case v: DateList => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod)))) From cb63dd425da0debf9d0b121e8bfb385a79e685f1 Mon Sep 17 00:00:00 2001 From: marcovivero Date: Thu, 13 Sep 2018 15:22:44 -0700 Subject: [PATCH 10/11] Add back private[op] --- .../op/stages/impl/feature/DateToUnitCircleTransformer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala index cfef3001d1..58e678558d 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala @@ -100,7 +100,7 @@ class DateToUnitCircleTransformer[T <: Date] } } -object DateToUnitCircle { +private[op] object DateToUnitCircle { def metadataValues(timePeriod: TimePeriod): Seq[String] = Seq(s"x_$timePeriod", s"y_$timePeriod") From 95820022b2f78dc487aaaf1aac6f2eabb90afb98 Mon Sep 17 00:00:00 2001 From: marcovivero Date: Mon, 17 Sep 2018 22:13:14 -0700 Subject: [PATCH 11/11] Silly bug --- core/src/main/scala/com/salesforce/op/OpWorkflow.scala | 3 ++- 1 file changed, 2 
insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala index a060de82d8..d0c176f161 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala @@ -547,7 +547,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { correlationType = correlationType, protectedFeatures = protectedRawFeatures, jsDivergenceProtectedFeatures = protectedRawJSFeatures, - textBinsFormula = textBinsFormula) + textBinsFormula = textBinsFormula, + timePeriod = timePeriod) } this }