Keep raw feature distributions calculated in raw feature filter (#76)
leahmcguire committed Aug 22, 2018
1 parent aac006a commit 54efd8c
Showing 13 changed files with 209 additions and 24 deletions.
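In short: the distributions that RawFeatureFilter computes during training are now kept on the workflow, attached to the surviving raw features, and round-tripped through model save/load. A minimal sketch of the resulting API; the workflow value `wf` is illustrative, while `train()` and `getRawFeatureDistributions()` both appear in the code and tests below:

  import com.salesforce.op.filters.FeatureDistribution

  // Assumes `wf` is an OpWorkflow configured via withRawFeatureFilter(...)
  val model = wf.train()
  val dists: Array[FeatureDistribution] = model.getRawFeatureDistributions()
  dists.foreach(d => println(s"${d.name} (key=${d.key}): count=${d.count}, nulls=${d.nulls}"))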
16 changes: 10 additions & 6 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -31,14 +31,13 @@
package com.salesforce.op

import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.RawFeatureFilter
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter}
import com.salesforce.op.readers.Reader
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.preparators.CorrelationType
import com.salesforce.op.stages.impl.selector.ModelSelector
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType}
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.reflection.ReflectionUtils
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.utils.stages.FitStagesUtil.{CutDAG, FittedDAG, Layer, StagesDAG}
import org.apache.spark.annotation.Experimental
@@ -107,8 +106,9 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
/**
* Will set the blacklisted features variable and, if the list is non-empty, remove the blacklisted
* features from the inputs of the workflow stages
* @param features list of features to blacklist
* @param distributions feature distributions calculated in raw feature filter
*/
private[op] def setBlacklist(features: Array[OPFeature]): Unit = {
private[op] def setBlacklist(features: Array[OPFeature], distributions: Seq[FeatureDistribution]): Unit = {
blacklistedFeatures = features
if (blacklistedFeatures.nonEmpty) {
val allBlacklisted: MList[OPFeature] = MList(getBlacklist(): _*)
@@ -125,7 +125,9 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
// for each stage remove anything blacklisted from the inputs and update any changed input features
initialStages.foreach { stg =>
val inFeatures = stg.getInputFeatures()
val blacklistRemoved = inFeatures.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
val blacklistRemoved = inFeatures
.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
.map{ f => if (f.isRaw) f.withDistributions(distributions.collect{ case d if d.name == f.name => d }) else f }
val inputsChanged = blacklistRemoved.map{ f => allUpdated.find(u => u.sameOrigin(f)).getOrElse(f) }
val oldOutput = stg.getOutput()
Try{
@@ -229,7 +231,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
}
checkReadersAndFeatures()
val filteredRawData = rf.generateFilteredRaw(rawFeatures, parameters)
setBlacklist(filteredRawData.featuresToDrop)
setRawFeatureDistributions(filteredRawData.featureDistributions.toArray)
setBlacklist(filteredRawData.featuresToDrop, filteredRawData.featureDistributions)
setBlacklistMapKeys(filteredRawData.mapKeysToDrop)
filteredRawData.cleanedData
}
@@ -341,6 +344,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
.setParameters(getParameters())
.setBlacklist(getBlacklist())
.setBlacklistMapKeys(getBlacklistMapKeys())
.setRawFeatureDistributions(getRawFeatureDistributions())

reader.map(model.setReader).getOrElse(model)
}
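A note on the setBlacklist change above: distributions are attached to a surviving raw feature by matching on name, and the result is a Seq because a map feature produces one FeatureDistribution per key, all sharing the feature's name. A hedged illustration with invented feature and key names (the six-argument constructor is the one exercised in the tests below):

  val distributions = Seq(
    FeatureDistribution("textMap", Some("subject"), 10L, 2L, Array(1.0), Array(1.0)),
    FeatureDistribution("textMap", Some("body"), 10L, 0L, Array(3.0), Array(2.0))
  )
  // Both keyed distributions match the single raw feature named "textMap":
  distributions.collect { case d if d.name == "textMap" => d }.size  // 2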
16 changes: 16 additions & 0 deletions core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala
@@ -34,6 +34,7 @@ import com.salesforce.op.utils.stages.FitStagesUtil._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.features.OPFeature
import com.salesforce.op.features.types.FeatureType
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.readers.{CustomReader, Reader, ReaderKey}
import com.salesforce.op.stages.{FeatureGeneratorStage, OPStage, OpTransformer}
import com.salesforce.op.utils.spark.RichDataset._
@@ -70,8 +71,12 @@ private[op] trait OpWorkflowCore {
// features that have been blacklisted from use in dag
private[op] var blacklistedFeatures: Array[OPFeature] = Array[OPFeature]()

// map keys that were blacklisted from use in dag
private[op] var blacklistedMapKeys: Map[String, Set[String]] = Map[String, Set[String]]()

// raw feature distributions calculated in raw feature filter
private[op] var rawFeatureDistributions: Array[FeatureDistribution] = Array[FeatureDistribution]()

// stages of the workflow
private[op] var stages: Array[OPStage] = Array[OPStage]()

@@ -88,6 +93,11 @@
this
}

private[op] final def setRawFeatureDistributions(distributions: Array[FeatureDistribution]): this.type = {
rawFeatureDistributions = distributions
this
}

/**
* :: Experimental ::
* Decides whether the cross-validation/train-validation-split will be done at workflow level
@@ -184,6 +194,12 @@
*/
final def getParameters(): OpParams = parameters

/**
* Get the raw feature distribution information computed during the raw feature filter step
* @return array of feature distribution information
*/
final def getRawFeatureDistributions(): Array[FeatureDistribution] = rawFeatureDistributions

/**
* Determine if any of the raw features do not have a matching reader
*/
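As a usage note, the new accessor exposes the training-time statistics for ad hoc inspection. An illustrative (not canonical) computation of per-feature null rates, using only fields of FeatureDistribution that appear in this commit:

  // `model` is a trained OpWorkflowModel
  val nullRates: Array[(String, Double)] = model.getRawFeatureDistributions()
    .map(d => d.name -> d.nulls.toDouble / math.max(d.count, 1L))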
12 changes: 12 additions & 0 deletions core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala
@@ -34,6 +34,8 @@ import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.{FeatureJsonHelper, OPFeature, TransientFeature}
import com.salesforce.op.stages.{OpPipelineStageReader, _}
import OpPipelineStageReadWriteShared._
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.utils.json.JsonUtils
import org.apache.spark.ml.util.MLReader
import org.json4s.JsonAST.{JArray, JNothing, JValue}
import org.json4s.jackson.JsonMethods.parse
@@ -86,11 +88,13 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflowModel] {
model <- Try(new OpWorkflowModel(uid = (json \ Uid.entryName).extract[String], trainParams))
(stages, resultFeatures) <- Try(resolveFeaturesAndStages(json, path))
blacklist <- Try(resolveBlacklist(json))
distributions <- resolveRawFeatureDistributions(json)
} yield model
.setStages(stages.filterNot(_.isInstanceOf[FeatureGeneratorStage[_, _]]))
.setFeatures(resultFeatures)
.setParameters(params)
.setBlacklist(blacklist)
.setRawFeatureDistributions(distributions)
}

private def resolveBlacklist(json: JValue): Array[OPFeature] = {
@@ -151,5 +155,13 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflowModel] {
}
}

private def resolveRawFeatureDistributions(json: JValue): Try[Array[FeatureDistribution]] = {
if ((json \ RawFeatureDistributions.entryName) != JNothing) { // for backwards compatibility
val distString = (json \ RawFeatureDistributions.entryName).extract[String]
JsonUtils.fromString[Array[FeatureDistribution]](distString)
} else {
Success(Array.empty[FeatureDistribution])
}
}

}
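The JNothing guard in resolveRawFeatureDistributions keeps previously saved models loadable: when the field is absent, the reader yields an empty array instead of failing. A hedged sketch of that contract in test form (the model path is hypothetical):

  val oldModel = wf.loadModel("/models/saved-before-this-change")  // hypothetical path
  oldModel.getRawFeatureDistributions() shouldBe empty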
core/src/main/scala/com/salesforce/op/OpWorkflowModelWriter.scala
@@ -32,6 +32,7 @@ package com.salesforce.op

import com.salesforce.op.features.FeatureJsonHelper
import com.salesforce.op.stages.{OpPipelineStageBase, OpPipelineStageWriter}
import com.salesforce.op.utils.json.JsonUtils
import enumeratum._
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.util.MLWriter
@@ -79,7 +80,9 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
(FN.Stages.entryName -> stagesJArray(path)) ~
(FN.AllFeatures.entryName -> allFeaturesJArray) ~
(FN.Parameters.entryName -> model.parameters.toJson(pretty = false)) ~
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false))
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) ~
(FN.RawFeatureDistributions.entryName -> JsonUtils.toJsonString(model.getRawFeatureDistributions(),
pretty = false))
}

private def resultFeaturesJArray(): JArray =
@@ -136,6 +139,7 @@ private[op] object OpWorkflowModelReadWriteShared {
case object AllFeatures extends FieldNames("allFeatures")
case object Parameters extends FieldNames("parameters")
case object TrainParameters extends FieldNames("trainParameters")
case object RawFeatureDistributions extends FieldNames("rawFeatureDistributions")
}

}
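The distributions are written as a single JSON-encoded string under the new rawFeatureDistributions field, mirrored by JsonUtils.fromString in the reader above. A sketch of the assumed round trip, using only the two calls that appear verbatim in this commit:

  import scala.util.Try

  val asString: String = JsonUtils.toJsonString(model.getRawFeatureDistributions(), pretty = false)
  val restored: Try[Array[FeatureDistribution]] = JsonUtils.fromString[Array[FeatureDistribution]](asString)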
core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala
@@ -30,6 +30,7 @@

package com.salesforce.op.filters

import com.salesforce.op.features.FeatureDistributionLike
import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer}
import com.twitter.algebird.Semigroup
import com.twitter.algebird.Monoid._
@@ -55,7 +56,7 @@ case class FeatureDistribution
nulls: Long,
distribution: Array[Double],
summaryInfo: Array[Double]
) {
) extends FeatureDistributionLike {

/**
* Get feature key associated to this distribution
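With FeatureDistribution now extending FeatureDistributionLike (a trait living in the features module), distributions can be attached directly to features. A minimal construction sketch; the feature value is illustrative, while the constructor and withDistributions both appear elsewhere in this commit:

  val d = FeatureDistribution("height", None, 100L, 5L, Array(0.2, 0.8), Array(1.0))
  val enriched = heightFeature.withDistributions(Seq(d))  // heightFeature: OPFeature, illustrative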
core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala
@@ -141,7 +141,6 @@ class RawFeatureFilter[T]
.reduce(_ + _) // NOTE: resolved semigroup is IndexedSeqSemigroup
val correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]] =
allFeatureInfo.map(_.correlationInfo).getOrElse {
val emptyCorr: Map[FeatureKey, Map[FeatureKey, Double]] = Map()
val responseKeys: Array[FeatureKey] = responseSummariesArr.map(_._1)
val predictorKeys: Array[FeatureKey] = predictorSummariesArr.map(_._1)
val corrRDD: RDD[Vector] = preparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, predictorKeys))
@@ -254,14 +253,13 @@
* @return cleaned dataframe with bad features and bad map keys removed, the list of features that should be
*         dropped from the DAG, and the feature distributions calculated from the training data
*/
// TODO return distribution information to attach to features that are kept
def generateFilteredRaw(rawFeatures: Array[OPFeature], parameters: OpParams)
(implicit spark: SparkSession): FilteredRawData = {

val trainData = trainingReader.generateDataFrame(rawFeatures, parameters).persist()
log.info("Loaded training data")
assert(trainData.count() > 0, "RawFeatureFilter cannot work with empty training data")
val trainingSummary = computeFeatureStats(trainData, rawFeatures) // TODO also response summaries??
val trainingSummary = computeFeatureStats(trainData, rawFeatures)
log.info("Computed summary stats for training features")
log.debug(trainingSummary.predictorDistributions.mkString("\n"))

@@ -276,7 +274,7 @@
}

val scoringSummary = scoreData.map{ sd =>
val ss = computeFeatureStats(sd, rawFeatures, Some(trainingSummary)) // TODO also response summaries??
val ss = computeFeatureStats(sd, rawFeatures, Some(trainingSummary))
log.info("Computed summary stats for scoring features")
log.debug(ss.predictorDistributions.mkString("\n"))
ss
@@ -307,7 +305,8 @@
trainData.unpersist()
scoreData.map(_.unpersist())

FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop)
FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop,
trainingSummary.responseDistributions ++ trainingSummary.predictorDistributions)
}
}

@@ -316,10 +315,12 @@
* @param cleanedData RFF cleaned data
* @param featuresToDrop raw features dropped by RFF
* @param mapKeysToDrop keys in map features dropped by RFF
* @param featureDistributions the feature distributions calculated from the training data
*/
case class FilteredRawData
(
cleanedData: DataFrame,
featuresToDrop: Array[OPFeature],
mapKeysToDrop: Map[String, Set[String]]
mapKeysToDrop: Map[String, Set[String]],
featureDistributions: Seq[FeatureDistribution]
)
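FilteredRawData now carries the computed distributions out of generateFilteredRaw, responses first and predictors second. A small consumption sketch, assuming rf, rawFeatures, and parameters are in scope as in generateFilteredRaw above, and that log is the class logger used there:

  val FilteredRawData(cleaned, toDrop, mapKeysToDrop, dists) =
    rf.generateFilteredRaw(rawFeatures, parameters)
  dists.foreach(d => log.debug(s"${d.name}: ${d.distribution.mkString(",")}"))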
core/src/test/scala/com/salesforce/op/OpWorkflowModelReaderWriterTest.scala
@@ -35,6 +35,7 @@ import java.io.File
import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.OPFeature
import com.salesforce.op.features.types.{Real, RealNN}
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.readers.{AggregateAvroReader, DataReaders}
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.sparkwrappers.generic.SwUnaryEstimator
@@ -83,11 +84,15 @@
aggregateParams = null
)

val distributions = Array(FeatureDistribution("a", None, 1L, 1L, Array(1.0), Array(1.0)),
FeatureDistribution("b", Option("b"), 2L, 2L, Array(2.0), Array(2.0)))

def makeDummyModel(wf: OpWorkflow): OpWorkflowModel = {
val model = new OpWorkflowModel(wf.uid, wf.parameters)
.setStages(wf.stages)
.setFeatures(wf.resultFeatures)
.setParameters(wf.parameters)
.setRawFeatureDistributions(distributions)

model.setReader(wf.reader.get)
}
@@ -105,6 +110,7 @@
.setReader(dummyReader)
.setResultFeatures(density)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

@@ -116,6 +122,7 @@
.setReader(dummyReader)
.setResultFeatures(density, weight2)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

@@ -124,6 +131,7 @@
.setReader(dummyReader)
.setResultFeatures(weight)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

@@ -140,6 +148,7 @@
.setParameters(workflowParams)
.setReader(dummyReader)
.setResultFeatures(scaled)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

@@ -262,7 +271,7 @@
compareWorkflowModels(wfM, wfMR)
}

it should "load a workflow model that has a RawFeatureFiler and a different workflow" in new VectorizedFlow {
it should "load a workflow model that has a RawFeatureFilter and a different workflow" in new VectorizedFlow {
val wfM = wf.loadModel(saveFlowPathStable)
wf.getResultFeatures().head.name shouldBe wfM.getResultFeatures().head.name
wf.getResultFeatures().head.history().originFeatures should contain theSameElementsAs
@@ -304,6 +313,7 @@
compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures)
compareFeatures(wf1.rawFeatures, wf2.rawFeatures)
compareStages(wf1.stages, wf2.stages)
compareDistributions(wf1.getRawFeatureDistributions(), wf2.getRawFeatureDistributions())
}

def compareWorkflowModels(wf1: OpWorkflowModel, wf2: OpWorkflowModel): Unit = {
@@ -314,13 +324,26 @@
compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures)
compareFeatures(wf1.rawFeatures, wf2.rawFeatures)
compareStages(wf1.stages, wf2.stages)
compareDistributions(wf1.getRawFeatureDistributions(), wf2.getRawFeatureDistributions())
}

def compareParams(p1: OpParams, p2: OpParams): Unit = {
p1.stageParams shouldBe p2.stageParams
p1.readerParams.toString() shouldBe p2.readerParams.toString()
p1.customParams shouldBe p2.customParams
}

def compareDistributions(d1: Array[FeatureDistribution], d2: Array[FeatureDistribution]): Unit = {
d1.zip(d2)
.foreach{ case (a, b) =>
a.name shouldEqual b.name
a.key shouldEqual b.key
a.count shouldEqual b.count
a.nulls shouldEqual b.nulls
a.distribution shouldEqual b.distribution
a.summaryInfo shouldEqual b.summaryInfo
}
}
}

trait UIDReset {
16 changes: 13 additions & 3 deletions core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala
@@ -140,15 +140,16 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {
Array(age, boarded, booleanMap, description, gender, height, numericMap, stringMap, survived, weight)

val blacklist: Array[OPFeature] = Array(age, gender, description, stringMap, numericMap)
wf.setBlacklist(blacklist)
wf.setBlacklist(blacklist, Seq.empty)
wf.getBlacklist() should contain theSameElementsAs blacklist
wf.rawFeatures should contain theSameElementsAs
Array(boarded, booleanMap, height, survived, weight)
wf.getResultFeatures().flatMap(_.rawFeatures).distinct.sortBy(_.name) should contain theSameElementsAs
Array(boarded, booleanMap, height, survived, weight)
}

it should "correctly allow you to interact with updated features when things are blacklisted" in {
it should "allow you to interact with updated features when things are blacklisted and" +
" features should have distributions" in {
val fv = Seq(age, gender, height, weight, description, boarded, stringMap, numericMap, booleanMap).transmogrify()
val survivedNum = survived.occurs()
val checked = survivedNum.sanityCheck(fv)
@@ -165,6 +166,15 @@
protectedFeatures = Array(height, weight))

val wfM = wf.train()
wf.rawFeatures.foreach{ f =>
f.distributions.nonEmpty shouldBe true
f.name shouldEqual f.distributions.head.name
}
wfM.rawFeatures.foreach{ f =>
f.distributions.nonEmpty shouldBe true
f.name shouldEqual f.distributions.head.name
}
wf.getRawFeatureDistributions().length shouldBe 13
val data = wfM.score()
data.schema.fields.size shouldBe 3
val Array(whyNotNormed2, prob2) = wfM.getUpdatedFeatures(Array(whyNotNormed, pred))
@@ -180,7 +190,7 @@
.withRawFeatureFilter(Option(dataReader), None)

val error = intercept[RuntimeException](
wf.setBlacklist(Array(age, gender, height, description, stringMap, numericMap))
wf.setBlacklist(Array(age, gender, height, description, stringMap, numericMap), Seq.empty)
)
error.getMessage.contains("creation of required result feature (height-weight_4-stagesApplied_Real")
}