Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep raw feature distributions calculated in raw feature filter #76

Merged
merged 7 commits into from
Aug 22, 2018
14 changes: 8 additions & 6 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@
package com.salesforce.op

import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.RawFeatureFilter
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter}
import com.salesforce.op.readers.Reader
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.preparators.CorrelationType
import com.salesforce.op.stages.impl.selector.ModelSelector
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType}
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.reflection.ReflectionUtils
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.utils.stages.FitStagesUtil.{CutDAG, FittedDAG, Layer, StagesDAG}
import org.apache.spark.annotation.Experimental
Expand Down Expand Up @@ -108,7 +107,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
* Will set the blacklisted features variable and, if the list is non-empty, will remove the
* blacklisted features from the raw feature list and update the inputs of the workflow stages
* @param features list of features to blacklist
* @param distributions feature distributions calculated in raw feature filter, attached to kept raw features
*/
private[op] def setBlacklist(features: Array[OPFeature]): Unit = {
private[op] def setBlacklist(features: Array[OPFeature], distributions: Seq[FeatureDistribution]): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update doc.

blacklistedFeatures = features
if (blacklistedFeatures.nonEmpty) {
val allBlacklisted: MList[OPFeature] = MList(getBlacklist(): _*)
Expand All @@ -125,7 +124,9 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
// for each stage remove anything blacklisted from the inputs and update any changed input features
initialStages.foreach { stg =>
val inFeatures = stg.getInputFeatures()
val blacklistRemoved = inFeatures.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
val blacklistRemoved = inFeatures
.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
.map{ f => if (f.isRaw) f.withDistributions(distributions.collect{ case d if d.name == f.name => d }) else f }
val inputsChanged = blacklistRemoved.map{ f => allUpdated.find(u => u.sameOrigin(f)).getOrElse(f) }
val oldOutput = stg.getOutput()
Try{
Expand Down Expand Up @@ -229,7 +230,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
}
checkReadersAndFeatures()
val filteredRawData = rf.generateFilteredRaw(rawFeatures, parameters)
setBlacklist(filteredRawData.featuresToDrop)
setRawFeatureDistributions(filteredRawData.featureDistributions.toArray)
setBlacklist(filteredRawData.featuresToDrop, filteredRawData.featureDistributions)
setBlacklistMapKeys(filteredRawData.mapKeysToDrop)
filteredRawData.cleanedData
}
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.salesforce.op.utils.stages.FitStagesUtil._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.features.OPFeature
import com.salesforce.op.features.types.FeatureType
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.readers.{CustomReader, Reader, ReaderKey}
import com.salesforce.op.stages.{FeatureGeneratorStage, OPStage, OpTransformer}
import com.salesforce.op.utils.spark.RichDataset._
Expand Down Expand Up @@ -70,8 +71,12 @@ private[op] trait OpWorkflowCore {
// features that have been blacklisted from use in dag
private[op] var blacklistedFeatures: Array[OPFeature] = Array[OPFeature]()

// map keys that were blacklisted from use in dag
private[op] var blacklistedMapKeys: Map[String, Set[String]] = Map[String, Set[String]]()

// raw feature distributions calculated in raw feature filter
private[op] var rawFeatureDistributions: Array[FeatureDistribution] = Array[FeatureDistribution]()

// stages of the workflow
private[op] var stages: Array[OPStage] = Array[OPStage]()

Expand All @@ -88,6 +93,11 @@ private[op] trait OpWorkflowCore {
this
}

private[op] final def setRawFeatureDistributions(distributions: Array[FeatureDistribution]): this.type = {
rawFeatureDistributions = distributions
this
}

/**
* :: Experimental ::
* Decides whether the cross-validation/train-validation-split will be done at workflow level
Expand Down Expand Up @@ -184,6 +194,12 @@ private[op] trait OpWorkflowCore {
*/
final def getParameters(): OpParams = parameters

/**
* Get raw feature distribution information computed during raw feature filter
* @return array of feature distribution information
*/
final def getRawFeatureDistributions(): Array[FeatureDistribution] = rawFeatureDistributions

/**
* Determine if any of the raw features do not have a matching reader
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

package com.salesforce.op.filters

import com.salesforce.op.features.FeatureDistributionBase
import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer}
import com.twitter.algebird.Semigroup
import com.twitter.algebird.Monoid._
Expand All @@ -55,7 +56,7 @@ case class FeatureDistribution
nulls: Long,
distribution: Array[Double],
summaryInfo: Array[Double]
) {
) extends FeatureDistributionBase {

/**
* Get feature key associated to this distribution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ class RawFeatureFilter[T]
.reduce(_ + _) // NOTE: resolved semigroup is IndexedSeqSemigroup
val correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]] =
allFeatureInfo.map(_.correlationInfo).getOrElse {
val emptyCorr: Map[FeatureKey, Map[FeatureKey, Double]] = Map()
val responseKeys: Array[FeatureKey] = responseSummariesArr.map(_._1)
val predictorKeys: Array[FeatureKey] = predictorSummariesArr.map(_._1)
val corrRDD: RDD[Vector] = preparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, predictorKeys))
Expand Down Expand Up @@ -254,14 +253,13 @@ class RawFeatureFilter[T]
* @return dataframe that has had bad features and bad map keys removed and a list of all features that should be
* dropped from the DAG
*/
// TODO return distribution information to attach to features that are kept
def generateFilteredRaw(rawFeatures: Array[OPFeature], parameters: OpParams)
(implicit spark: SparkSession): FilteredRawData = {

val trainData = trainingReader.generateDataFrame(rawFeatures, parameters).persist()
log.info("Loaded training data")
assert(trainData.count() > 0, "RawFeatureFilter cannot work with empty training data")
val trainingSummary = computeFeatureStats(trainData, rawFeatures) // TODO also response summaries??
val trainingSummary = computeFeatureStats(trainData, rawFeatures)
log.info("Computed summary stats for training features")
log.debug(trainingSummary.predictorDistributions.mkString("\n"))

Expand All @@ -276,7 +274,7 @@ class RawFeatureFilter[T]
}

val scoringSummary = scoreData.map{ sd =>
val ss = computeFeatureStats(sd, rawFeatures, Some(trainingSummary)) // TODO also response summaries??
val ss = computeFeatureStats(sd, rawFeatures, Some(trainingSummary))
log.info("Computed summary stats for scoring features")
log.debug(ss.predictorDistributions.mkString("\n"))
ss
Expand Down Expand Up @@ -307,7 +305,8 @@ class RawFeatureFilter[T]
trainData.unpersist()
scoreData.map(_.unpersist())

FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop)
FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop,
trainingSummary.responseDistributions ++ trainingSummary.predictorDistributions)
}
}

Expand All @@ -316,10 +315,12 @@ class RawFeatureFilter[T]
* @param cleanedData RFF cleaned data
* @param featuresToDrop raw features dropped by RFF
* @param mapKeysToDrop keys in map features dropped by RFF
* @param featureDistributions the feature distributions calculated from the training data
*/
case class FilteredRawData
(
cleanedData: DataFrame,
featuresToDrop: Array[OPFeature],
mapKeysToDrop: Map[String, Set[String]]
mapKeysToDrop: Map[String, Set[String]],
featureDistributions: Seq[FeatureDistribution]
)
16 changes: 13 additions & 3 deletions core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,16 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {
Array(age, boarded, booleanMap, description, gender, height, numericMap, stringMap, survived, weight)

val blacklist: Array[OPFeature] = Array(age, gender, description, stringMap, numericMap)
wf.setBlacklist(blacklist)
wf.setBlacklist(blacklist, Seq.empty)
wf.getBlacklist() should contain theSameElementsAs blacklist
wf.rawFeatures should contain theSameElementsAs
Array(boarded, booleanMap, height, survived, weight)
wf.getResultFeatures().flatMap(_.rawFeatures).distinct.sortBy(_.name) should contain theSameElementsAs
Array(boarded, booleanMap, height, survived, weight)
}

it should "correctly allow you to interact with updated features when things are blacklisted" in {
it should "allow you to interact with updated features when things are blacklisted and" +
" features should have distributions" in {
val fv = Seq(age, gender, height, weight, description, boarded, stringMap, numericMap, booleanMap).transmogrify()
val survivedNum = survived.occurs()
val checked = survivedNum.sanityCheck(fv)
Expand All @@ -165,6 +166,15 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {
protectedFeatures = Array(height, weight))

val wfM = wf.train()
wf.rawFeatures.foreach{ f =>
f.distributions.nonEmpty shouldBe true
f.name shouldEqual f.distributions.head.name
}
wfM.rawFeatures.foreach{ f =>
f.distributions.nonEmpty shouldBe true
f.name shouldEqual f.distributions.head.name
}
wf.getRawFeatureDistributions().length shouldBe 13
val data = wfM.score()
data.schema.fields.size shouldBe 3
val Array(whyNotNormed2, prob2) = wfM.getUpdatedFeatures(Array(whyNotNormed, pred))
Expand All @@ -180,7 +190,7 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest {
.withRawFeatureFilter(Option(dataReader), None)

val error = intercept[RuntimeException](
wf.setBlacklist(Array(age, gender, height, description, stringMap, numericMap))
wf.setBlacklist(Array(age, gender, height, description, stringMap, numericMap), Seq.empty)
)
error.getMessage.contains("creation of required result feature (height-weight_4-stagesApplied_Real")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,26 @@ class FeaturesTest extends WordSpec with PassengerFeaturesTest with TestCommon {
|""".stripMargin
}
}
"with distributions" should {
"make a copy of the feature containing the specified distributions" in {
val distrib = new FeatureDistributionBase {
override val count: Long = 1L
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

override is redundant.

override val nulls: Long = 1L
override val distribution: Array[Double] = Array(0.5)
override val summaryInfo: Array[Double] = Array(0.5)
override val name: String = age.name
override val key: Option[String] = None
}
val newAge = age.withDistributions(Seq(distrib))
newAge.name shouldEqual age.name
newAge.isResponse shouldEqual age.isResponse
newAge.originStage shouldEqual age.originStage
newAge.parents shouldEqual age.parents
newAge.uid shouldEqual age.uid
newAge.distributions.length shouldEqual 1
newAge.distributions.head shouldEqual distrib
}
}
// TODO: test other feature methods

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,10 @@ class RawFeatureFilterTest extends FlatSpec with PassengerSparkFixtureTest with
val params = new OpParams()
val features: Array[OPFeature] =
Array(survived, age, gender, height, weight, description, boarded, stringMap, numericMap, booleanMap)
val FilteredRawData(df, dropped, droppedKeyValue) = getFilter(maxCorrelation).generateFilteredRaw(features, params)
val FilteredRawData(df, dropped, droppedKeyValue, _) =
getFilter(maxCorrelation).generateFilteredRaw(features, params)

dropped should contain theSameElementsAs expectedDropped.toSeq
dropped should contain theSameElementsAs expectedDropped
droppedKeyValue should contain theSameElementsAs expectedDroppedMapKeys
df.schema.fields.map(_.name) should contain theSameElementsAs
DataFrameFieldNames.KeyFieldName +: features.diff(dropped).map(_.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ case class Feature[O <: FeatureType] private[op]
isResponse: Boolean,
originStage: OpPipelineStage[O],
parents: Seq[OPFeature],
uid: String
uid: String,
distributions: Seq[FeatureDistributionBase] = Seq.empty
)(implicit val wtt: WeakTypeTag[O]) extends FeatureLike[O] {

def this(
Expand All @@ -68,7 +69,8 @@ case class Feature[O <: FeatureType] private[op]
isResponse = isResponse,
originStage = originStage,
parents = parents,
uid = FeatureUID(originStage.uid)
uid = FeatureUID(originStage.uid),
distributions = Seq.empty
)(wtt)

/**
Expand All @@ -89,13 +91,22 @@ case class Feature[O <: FeatureType] private[op]
val stage = stagesMap.getOrElse(f.originStage.uid, f.originStage).asInstanceOf[OpPipelineStage[T]]
val newParents = f.parents.map(p => copy[T](p.asInstanceOf[FeatureLike[T]]))
Feature[T](
name = f.name, isResponse = f.isResponse, originStage = stage, parents = newParents, uid = f.uid
name = f.name, isResponse = f.isResponse, originStage = stage, parents = newParents, uid = f.uid,
distributions = f.distributions
)(f.wtt)
}

copy(this)
}

/**
* Takes a sequence of feature distributions associated with the feature
*
* @param distributions Seq of the feature distributions for the feature
* @return a copy of the feature with the distributions associated
*/
override private[op] def withDistributions(distributions: Seq[FeatureDistributionBase]) =
this.copy(distributions = distributions)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.features


/**
* Keeps the distribution information for features
*/
trait FeatureDistributionBase {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leahmcguire why do we need the trait? if we do, let's name it FeatureDistributionLike.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok - It seemed nicer to make the trait than to move all of the classes in raw feature filter here. though I am willing to hear counter arguments :-P

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I missed the part that it's located in a separate project. Makes sense then.
Let's rename it then to match our convention and also lets replace vals in the trait with defs.


/**
* name of the feature
*/
val name: String

/**
* map key associated with distribution (when the feature is a map)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key map key -> map key

*/
val key: Option[String]

/**
* total count of feature seen
*/
val count: Long

/**
* number of empties seen in feature
*/
val nulls: Long

/**
* binned counts of feature values (hashed for strings, evenly spaced bins for numerics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

evently -> evenly

*/
val distribution: Array[Double]

/**
* either min and max number of tokens for text data, or number of splits used for bins for numeric data
*/
val summaryInfo: Array[Double]
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ trait FeatureLike[O <: FeatureType] {
*/
val parents: Seq[OPFeature]

/**
* The distribution information of the feature (is a sequence because map features have a distribution for each key)
*/
val distributions: Seq[FeatureDistributionBase]

/**
* Weak type tag of the feature type O
*/
Expand Down Expand Up @@ -159,7 +164,8 @@ trait FeatureLike[O <: FeatureType] {
val oid = Option(originStage).map(_.uid).orNull
val pids = parents.map(_.uid).mkString("[", ",", "]")
s"${this.getClass.getSimpleName}(" +
s"name = $name, uid = $uid, isResponse = $isResponse, originStage = $oid, parents = $pids)"
s"name = $name, uid = $uid, isResponse = $isResponse, originStage = $oid, parents = $pids," +
s" distributions = ${distributions})"
}

/**
Expand Down Expand Up @@ -430,4 +436,12 @@ trait FeatureLike[O <: FeatureType] {
*/
private[op] def copyWithNewStages(stages: Array[OPStage]): FeatureLike[O]

/**
* Takes a sequence of feature distributions associated with the feature
*
* @param distributions Seq of the feature distributions for the feature
* @return a feature with the distributions associated
*/
private[op] def withDistributions(distributions: Seq[FeatureDistributionBase]): FeatureLike[O]

}