Apply DateToUnitCircleTransformer logic in raw feature transformations. (#130)
marcovivero authored and tovbinm committed Sep 21, 2018
1 parent a32f461 commit 3e84f4d
Showing 5 changed files with 138 additions and 54 deletions.
50 changes: 29 additions & 21 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -34,6 +34,7 @@ import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter, Summary}
import com.salesforce.op.readers.Reader
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.feature.TimePeriod
import com.salesforce.op.stages.impl.preparators.CorrelationType
import com.salesforce.op.stages.impl.selector.ModelSelector
import com.salesforce.op.utils.reflection.ReflectionUtils
@@ -488,24 +489,27 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
* Add a raw features filter to the workflow to look at fill rates and distributions of raw features and exclude
* features that do not meet specifications from modeling DAG
*
* @param trainingReader training reader to use in filter if not supplied will fall back to reader specified for
* workflow (note that this reader will take precedence over readers directly input to the
* workflow if both are supplied)
* @param scoringReader scoring reader to use in filter if not supplied will do the checks possible with only
* training data available
* @param bins number of bins to use in estimating feature distributions
* @param minFillRate minimum non-null fraction of instances that a feature should contain
* @param maxFillDifference maximum absolute difference in fill rate between scoring and training data for a feature
* @param maxFillRatioDiff maximum difference in fill ratio (symmetric) between scoring and training data for
* a feature
* @param maxJSDivergence maximum Jensen-Shannon divergence between the training and scoring distributions
* for a feature
* @param protectedFeatures list of features that should never be removed (features that are used to create them will
* also be protected)
* @param textBinsFormula formula to compute the text features bin size.
* Input arguments are [[Summary]] and number of bins to use in computing
* feature distributions (histograms for numerics, hashes for strings).
* Output is the bins for the text features.
* @param trainingReader training reader to use in filter if not supplied will fall back to reader specified for
* workflow (note that this reader will take precedence over readers directly input to the
* workflow if both are supplied)
* @param scoringReader scoring reader to use in filter if not supplied will do the checks possible with only
* training data available
* @param bins number of bins to use in estimating feature distributions
* @param minFillRate minimum non-null fraction of instances that a feature should contain
* @param maxFillDifference maximum absolute difference in fill rate between scoring and training data for a feature
* @param maxFillRatioDiff maximum difference in fill ratio (symmetric) between scoring and training data for
* a feature
* @param maxJSDivergence maximum Jensen-Shannon divergence between the training and scoring distributions
* for a feature
* @param protectedFeatures list of features that should never be removed (features that are used to create them will
* also be protected)
* @param protectedJSFeatures features that are protected from removal by JS divergence check
* @param textBinsFormula formula to compute the text features bin size.
* Input arguments are [[Summary]] and number of bins to use in computing
* feature distributions (histograms for numerics, hashes for strings).
* Output is the bins for the text features.
* @param timePeriod Time period used to apply the circular date transformation to date features; if not
* specified, the standard numeric feature transformation is used
* @tparam T Type of the data read in
*/
@Experimental
@@ -521,12 +525,15 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
maxCorrelation: Double = 0.95,
correlationType: CorrelationType = CorrelationType.Pearson,
protectedFeatures: Array[OPFeature] = Array.empty,
textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula
protectedJSFeatures: Array[OPFeature] = Array.empty,
textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula,
timePeriod: Option[TimePeriod] = None
): this.type = {
val training = trainingReader.orElse(reader).map(_.asInstanceOf[Reader[T]])
require(training.nonEmpty, "Reader for training data must be provided either in withRawFeatureFilter or directly " +
"as the reader for the workflow")
val protectedRawFeatures = protectedFeatures.flatMap(_.rawFeatures).map(_.name).toSet
val protectedRawJSFeatures = protectedJSFeatures.flatMap(_.rawFeatures).map(_.name).toSet
rawFeatureFilter = Option {
new RawFeatureFilter(
trainingReader = training.get,
Expand All @@ -539,8 +546,9 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
maxCorrelation = maxCorrelation,
correlationType = correlationType,
protectedFeatures = protectedRawFeatures,
textBinsFormula = textBinsFormula
)
jsDivergenceProtectedFeatures = protectedRawJSFeatures,
textBinsFormula = textBinsFormula,
timePeriod = timePeriod)
}
this
}
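For orientation, a minimal sketch of calling the extended API from a workflow. The reader and feature names (trainReader, scoreReader, checkedFeatures) are hypothetical stand-ins, not part of this change:

import com.salesforce.op.OpWorkflow
import com.salesforce.op.stages.impl.feature.TimePeriod

val workflow = new OpWorkflow()
  .setResultFeatures(checkedFeatures)        // hypothetical result features
  .withRawFeatureFilter(
    trainingReader = Option(trainReader),    // hypothetical readers
    scoringReader = Option(scoreReader),
    // New in this change: bin raw date features by calendar period
    // (here, day of week) instead of using the raw numeric timestamps.
    timePeriod = Option(TimePeriod.DayOfWeek)
  )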
core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala
@@ -30,9 +30,10 @@

package com.salesforce.op.filters


import com.salesforce.op.features.TransientFeature
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.feature.TextTokenizer
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TextTokenizer, TimePeriod}
import com.salesforce.op.utils.spark.RichRow._
import com.salesforce.op.utils.text.Language
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
@@ -125,17 +126,23 @@ private[filters] object PreparedFeatures {
* @param row data frame row
* @param responses transient features derived from responses
* @param predictors transient features derived from predictors
* @param timePeriod optional time period to use for raw date feature binning; otherwise the standard numeric
* transformation is applied
* @return set of prepared features
*/
def apply(row: Row, responses: Array[TransientFeature], predictors: Array[TransientFeature]): PreparedFeatures = {
def apply(
row: Row,
responses: Array[TransientFeature],
predictors: Array[TransientFeature],
timePeriod: Option[TimePeriod]): PreparedFeatures = {
val empty: Map[FeatureKey, ProcessedSeq] = Map.empty
val preparedResponses = responses.foldLeft(empty) { case (map, feature) =>
val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName)
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter))
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod)
}
val preparedPredictors = predictors.foldLeft(empty) { case (map, feature) =>
val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName)
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter))
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod)
}

PreparedFeatures(responses = preparedResponses, predictors = preparedPredictors)
@@ -146,24 +153,30 @@ private[filters] object PreparedFeatures {
*
* @param name feature name
* @param value feature value
* @param timePeriod optional time period to use for raw date feature binning; otherwise the standard numeric
* transformation is applied
* @tparam T type of the feature
* @return tuple containing whether the feature was empty and a sequence of either doubles or strings
*/
private def prepareFeature[T <: FeatureType](name: String, value: T): Map[FeatureKey, ProcessedSeq] =
private def prepareFeature[T <: FeatureType](
name: String,
value: T,
timePeriod: Option[TimePeriod]): Map[FeatureKey, ProcessedSeq] =
value match {
case v: Text => v.value
.map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s)))).getOrElse(Map.empty)
case v: OPNumeric[_] => v.toDouble
.map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d)))).getOrElse(Map.empty)
case SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq))
case SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq))
case ft@SomeValue(_) => ft match {
case v: Text => Map((name, None) -> Left(v.value.toSeq.flatMap(tokenize)))
case v: Date => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod)).toSeq))
case v: OPNumeric[_] => Map((name, None) -> Right(v.toDouble.toSeq))
case v: Geolocation => Map((name, None) -> Right(v.value))
case v: TextList => Map((name, None) -> Left(v.value))
case v: DateList => Map((name, None) -> Right(v.value.map(_.toDouble)))
case v: DateList => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod))))
case v: MultiPickList => Map((name, None) -> Left(v.value.toSeq))
case v: MultiPickListMap => v.value.map { case (k, e) => (name, Option(k)) -> Left(e.toSeq) }
case v: GeolocationMap => v.value.map{ case (k, e) => (name, Option(k)) -> Right(e) }
case v: DateMap =>
v.value.map { case (k, e) => (name, Option(k)) -> Right(Seq(prepareDateValue(e, timePeriod))) }
case v: OPMap[_] => v.value.map { case (k, e) => e match {
case d: Double => (name, Option(k)) -> Right(Seq(d))
// Do not need to distinguish between string map types, all text is tokenized for distribution calculation
@@ -183,4 +196,10 @@ private[filters] object PreparedFeatures {
* @return array of string tokens
*/
private def tokenize(s: String) = TextTokenizer.Analyzer.analyze(s, Language.Unknown)

private def prepareDateValue(timestamp: Long, timePeriod: Option[TimePeriod]): Double =
timePeriod match {
case Some(period) => DateToUnitCircle.convertToBin(timestamp, period)
case None => timestamp.toDouble
}
}
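The net effect: every date-typed raw value (Date, DateList, DateMap) now passes through prepareDateValue before entering the distribution computation. A standalone sketch of the two paths for an arbitrary timestamp (note that DateToUnitCircle is private[op], so this only compiles inside the com.salesforce.op package):

import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod}

val ts = 1537488000000L              // 2018-09-21T00:00:00Z, a Friday
val asNumeric: Double = ts.toDouble  // timePeriod = None: raw epoch value, as before this change
val asBin: Double =                  // timePeriod = Some(...): bin index within the period
  DateToUnitCircle.convertToBin(ts, TimePeriod.DayOfWeek)  // Friday -> 4.0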
core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala
@@ -34,6 +34,7 @@ import com.salesforce.op.OpParams
import com.salesforce.op.features.types._
import com.salesforce.op.features.{OPFeature, TransientFeature}
import com.salesforce.op.readers.{DataFrameFieldNames, Reader}
import com.salesforce.op.stages.impl.feature.TimePeriod
import com.salesforce.op.stages.impl.preparators.CorrelationType
import com.salesforce.op.utils.spark.RichRow._
import com.twitter.algebird.Monoid._
@@ -78,6 +79,8 @@ import scala.util.Failure
* Input arguments are [[Summary]] and number of bins to use in computing feature
* distributions (histograms for numerics, hashes for strings).
* Output is the bins for the text features.
* @param timePeriod Time period used to apply the circular date transformation to date features; if not
* specified, the regular numeric feature transformation is used
* @tparam T datatype of the reader
*/
class RawFeatureFilter[T]
@@ -93,7 +96,8 @@
val correlationType: CorrelationType = CorrelationType.Pearson,
val jsDivergenceProtectedFeatures: Set[String] = Set.empty,
val protectedFeatures: Set[String] = Set.empty,
val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula
val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula,
val timePeriod: Option[TimePeriod] = None
) extends Serializable {

assert(bins > 1 && bins <= FeatureDistribution.MaxBins, s"Invalid bin size $bins," +
@@ -139,7 +143,7 @@ class RawFeatureFilter[T]
(respOut, predOut)
}
val preparedFeatures: RDD[PreparedFeatures] =
data.rdd.map(PreparedFeatures(_, responses, predictors))
data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod))
// Have to use the training summaries to process scoring data for comparison
val (responseSummaries, predictorSummaries): (Map[FeatureKey, Summary], Map[FeatureKey, Summary]) =
allFeatureInfo.map(info => info.responseSummaries -> info.predictorSummaries)
core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala
@@ -35,7 +35,7 @@ import com.salesforce.op.utils.spark.OpVectorMetadata
import com.salesforce.op.{FeatureHistory, UID}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.{Param, Params}
import org.joda.time.{DateTime => JDateTime}
import org.joda.time.{DateTime => JDateTime, DateTimeZone}

import scala.reflect.runtime.universe.TypeTag

@@ -104,19 +104,27 @@ private[op] object DateToUnitCircle {

def metadataValues(timePeriod: TimePeriod): Seq[String] = Seq(s"x_$timePeriod", s"y_$timePeriod")

def convertToRandians(timestamp: Option[Long], timePeriodDesired: TimePeriod): Array[Double] = {
val datetime: Option[JDateTime] = timestamp.map(new JDateTime(_))
val (timePeriod, periodSize) = timePeriodDesired match {
case TimePeriod.DayOfMonth => (datetime.map(_.dayOfMonth().get() - 1), 31)
case TimePeriod.DayOfWeek => (datetime.map(_.dayOfWeek().get() - 1), 7)
case TimePeriod.DayOfYear => (datetime.map(_.dayOfYear().get() - 1), 366)
case TimePeriod.HourOfDay => (datetime.map(_.hourOfDay().get()), 24)
case TimePeriod.MonthOfYear => (datetime.map(_.monthOfYear().get() - 1), 12)
case TimePeriod.WeekOfMonth => (
datetime.map(x => x.weekOfWeekyear().get() - x.withDayOfMonth(1).weekOfWeekyear().get()), 6)
case TimePeriod.WeekOfYear => (datetime.map(_.weekOfWeekyear().get() - 1), 53)
def convertToBin(timestamp: Long, timePeriodDesired: TimePeriod): Double =
getPeriodWithSize(timestamp, timePeriodDesired)._1

def convertToRandians(timestamp: Option[Long], timePeriodDesired: TimePeriod): Array[Double] =
timestamp.map { ts =>
val (timePeriod, periodSize) = getPeriodWithSize(ts, timePeriodDesired)
val radians = (2 * math.Pi * timePeriod) / periodSize
Array(math.cos(radians), math.sin(radians))
}.getOrElse(Array(0.0, 0.0))

private def getPeriodWithSize(timestamp: Long, timePeriod: TimePeriod): (Double, Int) = {
val dt = new JDateTime(timestamp).withZone(DateTimeZone.UTC)
timePeriod match {
case TimePeriod.DayOfMonth => (dt.dayOfMonth.get.toDouble - 1, 31)
case TimePeriod.DayOfWeek => (dt.dayOfWeek.get.toDouble - 1, 7)
case TimePeriod.DayOfYear => (dt.dayOfYear.get.toDouble - 1, 366)
case TimePeriod.HourOfDay => (dt.hourOfDay.get.toDouble, 24)
case TimePeriod.MonthOfYear => (dt.monthOfYear.get.toDouble - 1, 12)
case TimePeriod.WeekOfMonth =>
((dt.weekOfWeekyear.get - dt.withDayOfMonth(1).weekOfWeekyear.get).toDouble, 6)
case TimePeriod.WeekOfYear => (dt.weekOfWeekyear.get.toDouble - 1, 53)
}
val radians = timePeriod.map(2 * math.Pi * _ / periodSize)
radians.map(r => Array(math.cos(r), math.sin(r))).getOrElse(Array(0.0, 0.0))
}
}
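The refactor splits the old convertToRandians body into a shared getPeriodWithSize, so RawFeatureFilter can reuse the calendar arithmetic for binning while the transformer keeps projecting bins onto the unit circle. A self-contained sketch of that projection for a single date, using joda-time as the code above does:

import org.joda.time.{DateTime, DateTimeZone}

// 2018-09-21 is a Friday: ISO day-of-week 5, hence bin (5 - 1) = 4 in a 7-day period.
val dt = new DateTime(2018, 9, 21, 0, 0, DateTimeZone.UTC)
val (bin, periodSize) = (dt.dayOfWeek.get.toDouble - 1, 7)
// Unit-circle projection performed by the transformer:
val radians = (2 * math.Pi * bin) / periodSize
val xy = Array(math.cos(radians), math.sin(radians))  // the x_ / y_ vector columns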
core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala
@@ -30,21 +30,22 @@

package com.salesforce.op.filters

import scala.math.round

import com.salesforce.op.features.types._
import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature}
import com.salesforce.op.stages.impl.feature.TimePeriod
import com.salesforce.op.stages.impl.preparators.CorrelationType
import com.salesforce.op.test.TestSparkContext
import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest}
import com.twitter.algebird.Monoid._
import com.twitter.algebird.Operators._
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class PreparedFeaturesTest extends FlatSpec with TestSparkContext {
class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest {

val responseKey1: FeatureKey = "Response1" -> None
val responseKey2: FeatureKey = "Response2" -> None
@@ -72,6 +73,7 @@ class PreparedFeaturesTest extends FlatSpec with TestSparkContext {
val allPredictorKeys1 = Array(predictorKey1, predictorKey2A, predictorKey2B)
val allPredictorKeys2 = Array(predictorKey1)


Spec[PreparedFeatures] should "produce correct summaries" in {
val (responseSummaries1, predictorSummaries1) = preparedFeatures1.summaries
val (responseSummaries2, predictorSummaries2) = preparedFeatures2.summaries
@@ -157,13 +159,56 @@ class PreparedFeaturesTest extends FlatSpec with TestSparkContext {
testCorrMatrix(allResponseKeys2, CorrelationType.Spearman, expected)
}

it should "transform dates for each period" in {
val expectedBins = Map(
TimePeriod.DayOfMonth -> 17.0,
TimePeriod.DayOfWeek -> 6.0,
TimePeriod.DayOfYear -> 17.0,
TimePeriod.HourOfDay -> 0.0,
TimePeriod.MonthOfYear -> 0.0,
TimePeriod.WeekOfMonth -> 2.0,
TimePeriod.WeekOfYear -> 2.0
)
expectedBins.keys should contain theSameElementsAs TimePeriod.values

val dateMap = FeatureBuilder.DateMap[Passenger]
.extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor

val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap)
val dateDataFrame: DataFrame = dataReader.generateDataFrame(dateFeatures).persist()

for {
(period, expectedBin) <- expectedBins
} {
def createExpectedDateMap(d: Double, aggregates: Int): Map[FeatureKey, ProcessedSeq] = Map(
(boarded.name, None) -> Right((0 until aggregates).map(_ => d).toList),
(boardedTime.name, None) -> Right(List(d)),
(boardedTimeAsDateTime.name, None) -> Right(List(d)),
(dateMap.name, Option("DTMap")) -> Right(List(d)))

val res = dateDataFrame.rdd
.map(PreparedFeatures(_, Array.empty, dateFeatures.map(TransientFeature(_)), Option(period)))
.map(_.predictors.mapValues(_.right.map(_.toList)))
.collect()

val expectedResults: Seq[Map[FeatureKey, ProcessedSeq]] =
// The first observation is expected to be aggregated twice
Seq(createExpectedDateMap(expectedBin, 2)) ++
Seq.fill(4)(expectedBin).map(createExpectedDateMap(_, 1)) ++
Seq(Map[FeatureKey, ProcessedSeq]())

withClue(s"Computed bin for $period period does not match:\n") {
res should contain theSameElementsAs expectedResults
}
}
}

def testCorrMatrix(
responseKeys: Array[FeatureKey],
correlationType: CorrelationType,
expectedResult: Seq[Array[Double]]
): Unit = {
val corrRDD =
sc.parallelize(allPreparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, allPredictorKeys1)))
val corrRDD = sc.parallelize(allPreparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, allPredictorKeys1)))
val corrMatrix = Statistics.corr(corrRDD, correlationType.sparkName)

corrMatrix.colIter.zipWithIndex.map { case(vec, idx) =>
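The expected bins in the new test are mutually consistent: they are exactly what a midnight timestamp on 1970-01-18 UTC (a Sunday) produces, which is presumably what the shared Passenger fixture's boarded dates resolve to. A quick standalone check of three of them:

import org.joda.time.{DateTime, DateTimeZone}

val dt = new DateTime(1970, 1, 18, 0, 0, DateTimeZone.UTC)
assert(dt.dayOfMonth.get - 1 == 17)     // DayOfMonth bin
assert(dt.dayOfWeek.get - 1 == 6)       // DayOfWeek bin (Sunday)
assert(dt.weekOfWeekyear.get - 1 == 2)  // WeekOfYear bin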
