Keep raw feature distributions calculated in raw feature filter #76

Merged: 7 commits, Aug 22, 2018
16 changes: 10 additions & 6 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -31,14 +31,13 @@
package com.salesforce.op

import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.RawFeatureFilter
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter}
import com.salesforce.op.readers.Reader
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.preparators.CorrelationType
import com.salesforce.op.stages.impl.selector.ModelSelector
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType}
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.reflection.ReflectionUtils
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.utils.stages.FitStagesUtil.{CutDAG, FittedDAG, Layer, StagesDAG}
import org.apache.spark.annotation.Experimental
@@ -107,8 +106,9 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
/**
* Will set the blacklisted features variable and, if the list is non-empty, update the workflow stages and raw features to remove the blacklisted features
* @param features list of features to blacklist
* @param distributions feature distributions calculated in raw feature filter
*/
private[op] def setBlacklist(features: Array[OPFeature]): Unit = {
private[op] def setBlacklist(features: Array[OPFeature], distributions: Seq[FeatureDistribution]): Unit = {
Contributor review comment: Please update doc.

blacklistedFeatures = features
if (blacklistedFeatures.nonEmpty) {
val allBlacklisted: MList[OPFeature] = MList(getBlacklist(): _*)
@@ -125,7 +125,9 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
// for each stage remove anything blacklisted from the inputs and update any changed input features
initialStages.foreach { stg =>
val inFeatures = stg.getInputFeatures()
val blacklistRemoved = inFeatures.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
val blacklistRemoved = inFeatures
.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
.map{ f => if (f.isRaw) f.withDistributions(distributions.collect{ case d if d.name == f.name => d }) else f }
val inputsChanged = blacklistRemoved.map{ f => allUpdated.find(u => u.sameOrigin(f)).getOrElse(f) }
val oldOutput = stg.getOutput()
Try{
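The key behavioral change: after blacklisted inputs are filtered out of a stage's inputs, every surviving raw feature gets the matching distributions attached via withDistributions. A standalone sketch of the name-matching step, with simplified stand-ins for OPFeature and FeatureDistribution rather than the real types:

// Illustrative sketch, not part of the diff: simplified stand-in types.
case class Distribution(name: String, count: Long)

case class RawFeature(name: String, isRaw: Boolean, distributions: Seq[Distribution] = Seq.empty) {
  def withDistributions(d: Seq[Distribution]): RawFeature = copy(distributions = d)
}

object AttachDistributionsDemo extends App {
  val features = Seq(RawFeature("age", isRaw = true), RawFeature("age_vectorized", isRaw = false))
  val dists = Seq(Distribution("age", 100L), Distribution("gender", 80L))
  // raw features keep only the distributions whose name matches their own
  val updated = features.map { f =>
    if (f.isRaw) f.withDistributions(dists.collect { case d if d.name == f.name => d }) else f
  }
  println(updated.head.distributions) // List(Distribution(age,100))
}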
@@ -229,7 +231,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
}
checkReadersAndFeatures()
val filteredRawData = rf.generateFilteredRaw(rawFeatures, parameters)
setBlacklist(filteredRawData.featuresToDrop)
setRawFeatureDistributions(filteredRawData.featureDistributions.toArray)
setBlacklist(filteredRawData.featuresToDrop, filteredRawData.featureDistributions)
setBlacklistMapKeys(filteredRawData.mapKeysToDrop)
filteredRawData.cleanedData
}
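generateFilteredRaw now returns the training distributions alongside the cleaned data, and the workflow both stores them and forwards them into setBlacklist. A rough sketch of the new control flow, with stand-in types in place of DataFrame, OPFeature, and the reader machinery:

// Illustrative sketch, not part of the diff.
case class FD(name: String)
case class FilteredRaw(cleanedData: String, featuresToDrop: Array[String], featureDistributions: Seq[FD])

object RunFilterFlowDemo extends App {
  var rawFeatureDistributions: Array[FD] = Array.empty
  var blacklisted: Array[String] = Array.empty

  val filtered = FilteredRaw("cleaned-df", Array("age"), Seq(FD("age"), FD("height"))) // stand-in for rf.generateFilteredRaw
  rawFeatureDistributions = filtered.featureDistributions.toArray // setRawFeatureDistributions
  blacklisted = filtered.featuresToDrop // setBlacklist(featuresToDrop, featureDistributions)
  println(s"stored ${rawFeatureDistributions.length} distributions; dropped ${blacklisted.length} feature(s)")
}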
@@ -341,6 +344,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
.setParameters(getParameters())
.setBlacklist(getBlacklist())
.setBlacklistMapKeys(getBlacklistMapKeys())
.setRawFeatureDistributions(getRawFeatureDistributions())

reader.map(model.setReader).getOrElse(model)
}
16 changes: 16 additions & 0 deletions core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala
@@ -34,6 +34,7 @@ import com.salesforce.op.utils.stages.FitStagesUtil._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.features.OPFeature
import com.salesforce.op.features.types.FeatureType
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.readers.{CustomReader, Reader, ReaderKey}
import com.salesforce.op.stages.{FeatureGeneratorStage, OPStage, OpTransformer}
import com.salesforce.op.utils.spark.RichDataset._
@@ -70,8 +71,12 @@ private[op] trait OpWorkflowCore {
// features that have been blacklisted from use in dag
private[op] var blacklistedFeatures: Array[OPFeature] = Array[OPFeature]()

// map keys that were blacklisted from use in dag
private[op] var blacklistedMapKeys: Map[String, Set[String]] = Map[String, Set[String]]()

// raw feature distributions calculated in raw feature filter
private[op] var rawFeatureDistributions: Array[FeatureDistribution] = Array[FeatureDistribution]()

// stages of the workflow
private[op] var stages: Array[OPStage] = Array[OPStage]()

@@ -88,6 +93,11 @@
this
}

private[op] final def setRawFeatureDistributions(distributions: Array[FeatureDistribution]): this.type = {
rawFeatureDistributions = distributions
this
}

/**
* :: Experimental ::
* Decides whether the cross-validation/train-validation-split will be done at workflow level
@@ -184,6 +194,12 @@
*/
final def getParameters(): OpParams = parameters

/**
* Get raw feature distribution information computed during raw feature filter
* @return array of feature distribution information
*/
final def getRawFeatureDistributions(): Array[FeatureDistribution] = rawFeatureDistributions
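The accessor pair is small, but returning this.type is what lets both OpWorkflow and OpWorkflowModel chain the setter without losing their concrete type. A self-contained sketch of the pattern (simplified stand-ins, not the real trait):

// Illustrative sketch, not part of the diff.
case class FD(name: String)

trait WorkflowCoreLike {
  private var rawFeatureDistributions: Array[FD] = Array.empty
  final def setRawFeatureDistributions(d: Array[FD]): this.type = {
    rawFeatureDistributions = d
    this
  }
  final def getRawFeatureDistributions(): Array[FD] = rawFeatureDistributions
}

class Workflow extends WorkflowCoreLike

object FluentSetterDemo extends App {
  // the chained call still types as Workflow, not as the base trait
  val wf: Workflow = new Workflow().setRawFeatureDistributions(Array(FD("age")))
  println(wf.getRawFeatureDistributions().map(_.name).mkString(","))
}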

/**
* Determine if any of the raw features do not have a matching reader
*/
core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala
@@ -34,6 +34,8 @@ import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.{FeatureJsonHelper, OPFeature, TransientFeature}
import com.salesforce.op.stages.{OpPipelineStageReader, _}
import OpPipelineStageReadWriteShared._
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.utils.json.JsonUtils
import org.apache.spark.ml.util.MLReader
import org.json4s.JsonAST.{JArray, JNothing, JValue}
import org.json4s.jackson.JsonMethods.parse
@@ -86,11 +88,13 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo
model <- Try(new OpWorkflowModel(uid = (json \ Uid.entryName).extract[String], trainParams))
(stages, resultFeatures) <- Try(resolveFeaturesAndStages(json, path))
blacklist <- Try(resolveBlacklist(json))
distributions <- resolveRawFeatureDistributions(json)
} yield model
.setStages(stages.filterNot(_.isInstanceOf[FeatureGeneratorStage[_, _]]))
.setFeatures(resultFeatures)
.setParameters(params)
.setBlacklist(blacklist)
.setRawFeatureDistributions(distributions)
}
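Model loading composes each resolution step as a Try inside the for-comprehension, so the first failure aborts the whole load while successful pieces flow into the yielded model. A toy version of the same composition:

// Illustrative sketch, not part of the diff.
import scala.util.{Success, Try}

object TryChainDemo extends App {
  def resolveBlacklist(): Try[Seq[String]] = Success(Seq("age"))
  def resolveRawFeatureDistributions(): Try[Seq[String]] = Success(Seq("age-distribution"))

  val loaded: Try[String] = for {
    blacklist <- resolveBlacklist()
    distributions <- resolveRawFeatureDistributions()
  } yield s"model(blacklist=$blacklist, distributions=$distributions)"

  println(loaded) // Success(model(...)) or the first Failure encountered
}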

private def resolveBlacklist(json: JValue): Array[OPFeature] = {
@@ -151,5 +155,13 @@ class OpWorkflowModelReader(val workflow: OpWorkflo
}
}

private def resolveRawFeatureDistributions(json: JValue): Try[Array[FeatureDistribution]] = {
if ((json \ RawFeatureDistributions.entryName) != JNothing) { // for backwards compatibility
val distString = (json \ RawFeatureDistributions.entryName).extract[String]
JsonUtils.fromString[Array[FeatureDistribution]](distString)
} else {
Try(Array.empty[FeatureDistribution])
}
}

}
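The backwards-compatibility probe is the important part here: models saved before this change have no rawFeatureDistributions entry, so the reader tests for JNothing before extracting and falls back to an empty array. A runnable json4s sketch of that probe (field name taken from the diff; payloads illustrative):

// Illustrative sketch, not part of the diff; assumes json4s-jackson on the classpath.
import org.json4s.{DefaultFormats, Formats, JNothing, JValue}
import org.json4s.jackson.JsonMethods.parse

object BackCompatReadDemo extends App {
  implicit val formats: Formats = DefaultFormats

  def distributionsJson(json: JValue): String =
    if ((json \ "rawFeatureDistributions") != JNothing) {
      (json \ "rawFeatureDistributions").extract[String] // the field holds a JSON-encoded array
    } else "[]" // default for models saved by older versions

  println(distributionsJson(parse("""{"uid": "wf_model_1"}"""))) // [] (old model, no field)
  println(distributionsJson(parse("""{"uid": "wf_model_2", "rawFeatureDistributions": "[]"}"""))) // []
}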
core/src/main/scala/com/salesforce/op/OpWorkflowModelWriter.scala
@@ -32,6 +32,7 @@ package com.salesforce.op

import com.salesforce.op.features.FeatureJsonHelper
import com.salesforce.op.stages.{OpPipelineStageBase, OpPipelineStageWriter}
import com.salesforce.op.utils.json.JsonUtils
import enumeratum._
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.util.MLWriter
@@ -79,7 +80,9 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
(FN.Stages.entryName -> stagesJArray(path)) ~
(FN.AllFeatures.entryName -> allFeaturesJArray) ~
(FN.Parameters.entryName -> model.parameters.toJson(pretty = false)) ~
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false))
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) ~
(FN.RawFeatureDistributions.entryName -> JsonUtils.toJsonString(model.getRawFeatureDistributions(),
pretty = false))
}

private def resultFeaturesJArray(): JArray =
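Note that the distributions are not nested as JSON objects: they are pre-serialized to a single string by JsonUtils and stored as one string field beside the other model metadata. A json4s sketch of that embedding (values illustrative):

// Illustrative sketch, not part of the diff; assumes json4s-jackson on the classpath.
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object WriterSketchDemo extends App {
  // stand-in for JsonUtils.toJsonString(model.getRawFeatureDistributions(), pretty = false)
  val distJson = """[{"name":"age","count":100,"nulls":1}]"""

  val modelJson =
    ("uid" -> "wf_model_1") ~
      ("rawFeatureDistributions" -> distJson)

  // the distributions arrive as an escaped string value, not as a nested array
  println(compact(render(modelJson)))
}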
@@ -136,6 +139,7 @@ private[op] object OpWorkflowModelReadWriteShared {
case object AllFeatures extends FieldNames("allFeatures")
case object Parameters extends FieldNames("parameters")
case object TrainParameters extends FieldNames("trainParameters")
case object RawFeatureDistributions extends FieldNames("rawFeatureDistributions")
}

}
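FieldNames is an enumeratum enum, so the new RawFeatureDistributions entry gets a stable entryName to use as the JSON key plus round-tripping via withName. A minimal reconstruction of the pattern showing only the new entry (assumes enumeratum on the classpath):

// Illustrative sketch, not part of the diff.
import enumeratum._

sealed abstract class FieldNames(override val entryName: String) extends EnumEntry

object FieldNames extends Enum[FieldNames] {
  val values = findValues
  case object RawFeatureDistributions extends FieldNames("rawFeatureDistributions")
}

object FieldNamesDemo extends App {
  println(FieldNames.RawFeatureDistributions.entryName) // rawFeatureDistributions
  println(FieldNames.withName("rawFeatureDistributions")) // RawFeatureDistributions
}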
core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala
@@ -30,6 +30,7 @@

package com.salesforce.op.filters

import com.salesforce.op.features.FeatureDistributionLike
import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer}
import com.twitter.algebird.Semigroup
import com.twitter.algebird.Monoid._
@@ -55,7 +56,7 @@ case class FeatureDistribution
nulls: Long,
distribution: Array[Double],
summaryInfo: Array[Double]
) {
) extends FeatureDistributionLike {

/**
* Get feature key associated to this distribution
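Extending FeatureDistributionLike exposes the distribution stats through a trait other modules can depend on without importing the concrete case class from the filters package. A sketch of the likely shape; the trait's exact members are an assumption here, mirrored from the case class fields above:

// Illustrative sketch, not part of the diff; the real trait lives in the features module.
trait FeatureDistributionLike {
  def name: String
  def key: Option[String]
  def count: Long
  def nulls: Long
  def distribution: Array[Double]
  def summaryInfo: Array[Double]
}

case class FeatureDistribution(
  name: String,
  key: Option[String],
  count: Long,
  nulls: Long,
  distribution: Array[Double],
  summaryInfo: Array[Double]
) extends FeatureDistributionLike

object TraitDemo extends App {
  val d: FeatureDistributionLike = FeatureDistribution("age", None, 100L, 1L, Array(0.5), Array(1.0))
  println(s"${d.name}: count=${d.count}, nulls=${d.nulls}")
}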
core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala
@@ -141,7 +141,6 @@ class RawFeatureFilter[T]
.reduce(_ + _) // NOTE: resolved semigroup is IndexedSeqSemigroup
val correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]] =
allFeatureInfo.map(_.correlationInfo).getOrElse {
val emptyCorr: Map[FeatureKey, Map[FeatureKey, Double]] = Map()
val responseKeys: Array[FeatureKey] = responseSummariesArr.map(_._1)
val predictorKeys: Array[FeatureKey] = predictorSummariesArr.map(_._1)
val corrRDD: RDD[Vector] = preparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, predictorKeys))
@@ -254,14 +253,13 @@
* @return dataframe that has had bad features and bad map keys removed and a list of all features that should be
* dropped from the DAG
*/
// TODO return distribution information to attach to features that are kept
def generateFilteredRaw(rawFeatures: Array[OPFeature], parameters: OpParams)
(implicit spark: SparkSession): FilteredRawData = {

val trainData = trainingReader.generateDataFrame(rawFeatures, parameters).persist()
log.info("Loaded training data")
assert(trainData.count() > 0, "RawFeatureFilter cannot work with empty training data")
val trainingSummary = computeFeatureStats(trainData, rawFeatures) // TODO also response summaries??
val trainingSummary = computeFeatureStats(trainData, rawFeatures)
log.info("Computed summary stats for training features")
log.debug(trainingSummary.predictorDistributions.mkString("\n"))

@@ -276,7 +274,7 @@
}

val scoringSummary = scoreData.map{ sd =>
val ss = computeFeatureStats(sd, rawFeatures, Some(trainingSummary)) // TODO also response summaries??
val ss = computeFeatureStats(sd, rawFeatures, Some(trainingSummary))
log.info("Computed summary stats for scoring features")
log.debug(ss.predictorDistributions.mkString("\n"))
ss
@@ -307,7 +305,8 @@
trainData.unpersist()
scoreData.map(_.unpersist())

FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop)
FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop,
trainingSummary.responseDistributions ++ trainingSummary.predictorDistributions)
}
}

@@ -316,10 +315,12 @@
* @param cleanedData RFF cleaned data
* @param featuresToDrop raw features dropped by RFF
* @param mapKeysToDrop keys in map features dropped by RFF
* @param featureDistributions the feature distributions calculated from the training data
*/
case class FilteredRawData
(
cleanedData: DataFrame,
featuresToDrop: Array[OPFeature],
mapKeysToDrop: Map[String, Set[String]]
mapKeysToDrop: Map[String, Set[String]],
featureDistributions: Seq[FeatureDistribution]
)
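FilteredRawData now carries the training-data distributions, responses first and predictors second, matching the responseDistributions ++ predictorDistributions concatenation above. A standalone usage sketch with strings standing in for DataFrame and OPFeature:

// Illustrative sketch, not part of the diff.
case class FD(name: String)
case class FilteredRawData(
  cleanedData: String,
  featuresToDrop: Array[String],
  mapKeysToDrop: Map[String, Set[String]],
  featureDistributions: Seq[FD]
)

object FilteredRawDataDemo extends App {
  val frd = FilteredRawData(
    cleanedData = "cleaned-df",
    featuresToDrop = Array("age"),
    mapKeysToDrop = Map("numericMap" -> Set("badKey")),
    featureDistributions = Seq(FD("survived"), FD("age")) // responses, then predictors
  )
  println(frd.featureDistributions.map(_.name).mkString(", "))
}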
core/src/test/scala/com/salesforce/op/OpWorkflowModelReaderWriterTest.scala
@@ -35,6 +35,7 @@ import java.io.File
import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.OPFeature
import com.salesforce.op.features.types.{Real, RealNN}
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.readers.{AggregateAvroReader, DataReaders}
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.sparkwrappers.generic.SwUnaryEstimator
@@ -83,11 +84,15 @@
aggregateParams = null
)

val distributions = Array(FeatureDistribution("a", None, 1L, 1L, Array(1.0), Array(1.0)),
FeatureDistribution("b", Option("b"), 2L, 2L, Array(2.0), Array(2.0)))

def makeDummyModel(wf: OpWorkflow): OpWorkflowModel = {
val model = new OpWorkflowModel(wf.uid, wf.parameters)
.setStages(wf.stages)
.setFeatures(wf.resultFeatures)
.setParameters(wf.parameters)
.setRawFeatureDistributions(distributions)

model.setReader(wf.reader.get)
}
@@ -105,6 +110,7 @@
.setReader(dummyReader)
.setResultFeatures(density)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

@@ -116,6 +122,7 @@
.setReader(dummyReader)
.setResultFeatures(density, weight2)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

@@ -124,6 +131,7 @@
.setReader(dummyReader)
.setResultFeatures(weight)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

@@ -140,6 +148,7 @@
.setParameters(workflowParams)
.setReader(dummyReader)
.setResultFeatures(scaled)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

@@ -262,7 +271,7 @@
compareWorkflowModels(wfM, wfMR)
}

it should "load a workflow model that has a RawFeatureFiler and a different workflow" in new VectorizedFlow {
it should "load a workflow model that has a RawFeatureFilter and a different workflow" in new VectorizedFlow {
val wfM = wf.loadModel(saveFlowPathStable)
wf.getResultFeatures().head.name shouldBe wfM.getResultFeatures().head.name
wf.getResultFeatures().head.history().originFeatures should contain theSameElementsAs
@@ -304,6 +313,7 @@
compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures)
compareFeatures(wf1.rawFeatures, wf2.rawFeatures)
compareStages(wf1.stages, wf2.stages)
compareDistributions(wf1.getRawFeatureDistributions(), wf2.getRawFeatureDistributions())
}

def compareWorkflowModels(wf1: OpWorkflowModel, wf2: OpWorkflowModel): Unit = {
@@ -314,13 +324,26 @@
compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures)
compareFeatures(wf1.rawFeatures, wf2.rawFeatures)
compareStages(wf1.stages, wf2.stages)
compareDistributions(wf1.getRawFeatureDistributions(), wf2.getRawFeatureDistributions())
}

def compareParams(p1: OpParams, p2: OpParams): Unit = {
p1.stageParams shouldBe p2.stageParams
p1.readerParams.toString() shouldBe p2.readerParams.toString()
p1.customParams shouldBe p2.customParams
}

def compareDistributions(d1: Array[FeatureDistribution], d2: Array[FeatureDistribution]): Unit = {
d1.zip(d2)
.foreach{ case (a, b) =>
a.name shouldEqual b.name
a.key shouldEqual b.key
a.count shouldEqual b.count
a.nulls shouldEqual b.nulls
a.distribution shouldEqual b.distribution
a.summaryInfo shouldEqual b.summaryInfo
}
}
}

trait UIDReset {
16 changes: 13 additions & 3 deletions core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala
@@ -140,15 +140,16 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {
Array(age, boarded, booleanMap, description, gender, height, numericMap, stringMap, survived, weight)

val blacklist: Array[OPFeature] = Array(age, gender, description, stringMap, numericMap)
wf.setBlacklist(blacklist)
wf.setBlacklist(blacklist, Seq.empty)
wf.getBlacklist() should contain theSameElementsAs blacklist
wf.rawFeatures should contain theSameElementsAs
Array(boarded, booleanMap, height, survived, weight)
wf.getResultFeatures().flatMap(_.rawFeatures).distinct.sortBy(_.name) should contain theSameElementsAs
Array(boarded, booleanMap, height, survived, weight)
}

it should "correctly allow you to interact with updated features when things are blacklisted" in {
it should "allow you to interact with updated features when things are blacklisted and" +
" features should have distributions" in {
val fv = Seq(age, gender, height, weight, description, boarded, stringMap, numericMap, booleanMap).transmogrify()
val survivedNum = survived.occurs()
val checked = survivedNum.sanityCheck(fv)
@@ -165,6 +166,15 @@
protectedFeatures = Array(height, weight))

val wfM = wf.train()
wf.rawFeatures.foreach{ f =>
f.distributions.nonEmpty shouldBe true
f.name shouldEqual f.distributions.head.name
}
wfM.rawFeatures.foreach{ f =>
f.distributions.nonEmpty shouldBe true
f.name shouldEqual f.distributions.head.name
}
wf.getRawFeatureDistributions().length shouldBe 13
val data = wfM.score()
data.schema.fields.size shouldBe 3
val Array(whyNotNormed2, prob2) = wfM.getUpdatedFeatures(Array(whyNotNormed, pred))
@@ -180,7 +190,7 @@
.withRawFeatureFilter(Option(dataReader), None)

val error = intercept[RuntimeException](
wf.setBlacklist(Array(age, gender, height, description, stringMap, numericMap))
wf.setBlacklist(Array(age, gender, height, description, stringMap, numericMap), Seq.empty)
)
error.getMessage.contains("creation of required result feature (height-weight_4-stagesApplied_Real")
}