From c101cfe8a931479724c15a228558bffcb4d28207 Mon Sep 17 00:00:00 2001 From: leahmcguire Date: Tue, 21 Aug 2018 09:37:09 -0700 Subject: [PATCH 1/4] passing back the rff distributions to be added to the features --- .../main/scala/com/salesforce/op/OpWorkflow.scala | 1 + .../salesforce/op/filters/RawFeatureFilter.scala | 13 +++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala index 380af0e0ae..4011f63260 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala @@ -228,6 +228,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { } checkReadersAndFeatures() val filteredRawData = rf.generateFilteredRaw(rawFeatures, parameters) + // Add distributions to raw features setBlacklist(filteredRawData.featuresToDrop) setBlacklistMapKeys(filteredRawData.mapKeysToDrop) filteredRawData.cleanedData diff --git a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala index cc5da5dcd4..892ac5d599 100644 --- a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala +++ b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala @@ -143,7 +143,6 @@ class RawFeatureFilter[T] .reduce(_ + _) // NOTE: resolved semigroup is IndexedSeqSemigroup val correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]] = allFeatureInfo.map(_.correlationInfo).getOrElse { - val emptyCorr: Map[FeatureKey, Map[FeatureKey, Double]] = Map() val responseKeys: Array[FeatureKey] = responseSummariesArr.map(_._1) val predictorKeys: Array[FeatureKey] = predictorSummariesArr.map(_._1) val corrRDD: RDD[Vector] = preparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, predictorKeys)) @@ -256,14 +255,13 @@ class RawFeatureFilter[T] * @return dataframe that has had bad features and bad map keys removed and a list of all features that should be * dropped from the DAG */ - // TODO return distribution information to attach to features that are kept def generateFilteredRaw(rawFeatures: Array[OPFeature], parameters: OpParams) (implicit spark: SparkSession): FilteredRawData = { val trainData = trainingReader.generateDataFrame(rawFeatures, parameters).persist() log.info("Loaded training data") assert(trainData.count() > 0, "RawFeatureFilter cannot work with empty training data") - val trainingSummary = computeFeatureStats(trainData, rawFeatures) // TODO also response summaries?? + val trainingSummary = computeFeatureStats(trainData, rawFeatures) log.info("Computed summary stats for training features") log.debug(trainingSummary.predictorDistributions.mkString("\n")) @@ -278,7 +276,7 @@ class RawFeatureFilter[T] } val scoringSummary = scoreData.map{ sd => - val ss = computeFeatureStats(sd, rawFeatures, Some(trainingSummary)) // TODO also response summaries?? + val ss = computeFeatureStats(sd, rawFeatures, Some(trainingSummary)) log.info("Computed summary stats for scoring features") log.debug(ss.predictorDistributions.mkString("\n")) ss @@ -309,7 +307,8 @@ class RawFeatureFilter[T] trainData.unpersist() scoreData.map(_.unpersist()) - FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop) + FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop, + trainingSummary.responseDistributions ++ trainingSummary.predictorDistributions) } } @@ -318,10 +317,12 @@ class RawFeatureFilter[T] * @param cleanedData RFF cleaned data * @param featuresToDrop raw features dropped by RFF * @param mapKeysToDrop keys in map features dropped by RFF + * @param featureDistributions the feature distributions calculated from the training data */ case class FilteredRawData ( cleanedData: DataFrame, featuresToDrop: Array[OPFeature], - mapKeysToDrop: Map[String, Set[String]] + mapKeysToDrop: Map[String, Set[String]], + featureDistributions: Seq[FeatureDistribution] ) From 1cd4f6312c79f3e264266a8b14e1268054d7bdf3 Mon Sep 17 00:00:00 2001 From: leahmcguire Date: Tue, 21 Aug 2018 13:55:31 -0700 Subject: [PATCH 2/4] added distributions calculated to workflow and raw features --- .../scala/com/salesforce/op/OpWorkflow.scala | 15 ++-- .../com/salesforce/op/OpWorkflowCore.scala | 16 +++++ .../op/filters/FeatureDistribution.scala | 3 +- .../com/salesforce/op/OpWorkflowTest.scala | 16 ++++- .../salesforce/op/features/FeaturesTest.scala | 20 ++++++ .../op/filters/RawFeatureFilterTest.scala | 5 +- .../com/salesforce/op/features/Feature.scala | 17 ++++- .../op/features/FeatureDistributionBase.scala | 68 +++++++++++++++++++ .../salesforce/op/features/FeatureLike.scala | 16 ++++- 9 files changed, 159 insertions(+), 17 deletions(-) create mode 100644 features/src/main/scala/com/salesforce/op/features/FeatureDistributionBase.scala diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala index c3ce9ab192..01bd65b865 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala @@ -31,14 +31,13 @@ package com.salesforce.op import com.salesforce.op.features.OPFeature -import com.salesforce.op.filters.RawFeatureFilter +import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter} import com.salesforce.op.readers.Reader import com.salesforce.op.stages.OPStage import com.salesforce.op.stages.impl.preparators.CorrelationType import com.salesforce.op.stages.impl.selector.ModelSelector -import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType} -import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.reflection.ReflectionUtils +import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.stages.FitStagesUtil import com.salesforce.op.utils.stages.FitStagesUtil.{CutDAG, FittedDAG, Layer, StagesDAG} import org.apache.spark.annotation.Experimental @@ -108,7 +107,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { * Will set the blacklisted features variable and if list is non-empty it will * @param features list of features to blacklist */ - private[op] def setBlacklist(features: Array[OPFeature]): Unit = { + private[op] def setBlacklist(features: Array[OPFeature], distributions: Seq[FeatureDistribution]): Unit = { blacklistedFeatures = features if (blacklistedFeatures.nonEmpty) { val allBlacklisted: MList[OPFeature] = MList(getBlacklist(): _*) @@ -125,7 +124,9 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { // for each stage remove anything blacklisted from the inputs and update any changed input features initialStages.foreach { stg => val inFeatures = stg.getInputFeatures() - val blacklistRemoved = inFeatures.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) } + val blacklistRemoved = inFeatures + .filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) } + .map{ f => if (f.isRaw) f.withDistributions(distributions.collect{ case d if d.name == f.name => d }) else f } val inputsChanged = blacklistRemoved.map{ f => allUpdated.find(u => u.sameOrigin(f)).getOrElse(f) } val oldOutput = stg.getOutput() Try{ @@ -229,8 +230,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { } checkReadersAndFeatures() val filteredRawData = rf.generateFilteredRaw(rawFeatures, parameters) - // Add distributions to raw features - setBlacklist(filteredRawData.featuresToDrop) + setRawFeatureDistributions(filteredRawData.featureDistributions.toArray) + setBlacklist(filteredRawData.featuresToDrop, filteredRawData.featureDistributions) setBlacklistMapKeys(filteredRawData.mapKeysToDrop) filteredRawData.cleanedData } diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala b/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala index 54e738dd92..d1cb60f1e9 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala @@ -34,6 +34,7 @@ import com.salesforce.op.utils.stages.FitStagesUtil._ import com.salesforce.op.utils.stages.FitStagesUtil import com.salesforce.op.features.OPFeature import com.salesforce.op.features.types.FeatureType +import com.salesforce.op.filters.FeatureDistribution import com.salesforce.op.readers.{CustomReader, Reader, ReaderKey} import com.salesforce.op.stages.{FeatureGeneratorStage, OPStage, OpTransformer} import com.salesforce.op.utils.spark.RichDataset._ @@ -70,8 +71,12 @@ private[op] trait OpWorkflowCore { // features that have been blacklisted from use in dag private[op] var blacklistedFeatures: Array[OPFeature] = Array[OPFeature]() + // map keys that were blacklisted from use in dag private[op] var blacklistedMapKeys: Map[String, Set[String]] = Map[String, Set[String]]() + // raw feature distributions calculated in raw feature filter + private[op] var rawFeatureDistributions: Array[FeatureDistribution] = Array[FeatureDistribution]() + // stages of the workflow private[op] var stages: Array[OPStage] = Array[OPStage]() @@ -88,6 +93,11 @@ private[op] trait OpWorkflowCore { this } + private[op] final def setRawFeatureDistributions(distributions: Array[FeatureDistribution]): this.type = { + rawFeatureDistributions = distributions + this + } + /** * :: Experimental :: * Decides whether the cross-validation/train-validation-split will be done at workflow level @@ -184,6 +194,12 @@ private[op] trait OpWorkflowCore { */ final def getParameters(): OpParams = parameters + /** + * Get raw feature distribution information computed during raw feature filter + * @return sequence of feature distribution information + */ + final def getRawFeatureDistributions(): Array[FeatureDistribution] = rawFeatureDistributions + /** * Determine if any of the raw features do not have a matching reader */ diff --git a/core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala b/core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala index 3797e9a214..3ad7fcc43b 100644 --- a/core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala +++ b/core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala @@ -30,6 +30,7 @@ package com.salesforce.op.filters +import com.salesforce.op.features.FeatureDistributionBase import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer} import com.twitter.algebird.Semigroup import com.twitter.algebird.Monoid._ @@ -55,7 +56,7 @@ case class FeatureDistribution nulls: Long, distribution: Array[Double], summaryInfo: Array[Double] -) { +) extends FeatureDistributionBase { /** * Get feature key associated to this distribution diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala index aebd227455..8b8cbd8d8c 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala @@ -140,7 +140,7 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest { Array(age, boarded, booleanMap, description, gender, height, numericMap, stringMap, survived, weight) val blacklist: Array[OPFeature] = Array(age, gender, description, stringMap, numericMap) - wf.setBlacklist(blacklist) + wf.setBlacklist(blacklist, Seq.empty) wf.getBlacklist() should contain theSameElementsAs blacklist wf.rawFeatures should contain theSameElementsAs Array(boarded, booleanMap, height, survived, weight) @@ -148,7 +148,8 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest { Array(boarded, booleanMap, height, survived, weight) } - it should "correctly allow you to interact with updated features when things are blacklisted" in { + it should "allow you to interact with updated features when things are blacklisted and" + + " features should have distributions" in { val fv = Seq(age, gender, height, weight, description, boarded, stringMap, numericMap, booleanMap).transmogrify() val survivedNum = survived.occurs() val checked = survivedNum.sanityCheck(fv) @@ -165,6 +166,15 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest { protectedFeatures = Array(height, weight)) val wfM = wf.train() + wf.rawFeatures.foreach{ f => + f.distributions.nonEmpty shouldBe true + f.name shouldEqual f.distributions.head.name + } + wfM.rawFeatures.foreach{ f => + f.distributions.nonEmpty shouldBe true + f.name shouldEqual f.distributions.head.name + } + wf.getRawFeatureDistributions().length shouldBe 13 val data = wfM.score() data.schema.fields.size shouldBe 3 val Array(whyNotNormed2, prob2) = wfM.getUpdatedFeatures(Array(whyNotNormed, pred)) @@ -180,7 +190,7 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest { .withRawFeatureFilter(Option(dataReader), None) val error = intercept[RuntimeException]( - wf.setBlacklist(Array(age, gender, height, description, stringMap, numericMap)) + wf.setBlacklist(Array(age, gender, height, description, stringMap, numericMap), Seq.empty) ) error.getMessage.contains("creation of required result feature (height-weight_4-stagesApplied_Real") } diff --git a/core/src/test/scala/com/salesforce/op/features/FeaturesTest.scala b/core/src/test/scala/com/salesforce/op/features/FeaturesTest.scala index 537596dad9..18479cebab 100644 --- a/core/src/test/scala/com/salesforce/op/features/FeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/features/FeaturesTest.scala @@ -191,6 +191,26 @@ class FeaturesTest extends WordSpec with PassengerFeaturesTest with TestCommon { |""".stripMargin } } + "with distributions" should { + "make a copy of the feature containing the specified distributions" in { + val distrib = new FeatureDistributionBase { + override val count: Long = 1L + override val nulls: Long = 1L + override val distribution: Array[Double] = Array(0.5) + override val summaryInfo: Array[Double] = Array(0.5) + override val name: String = age.name + override val key: Option[String] = None + } + val newAge = age.withDistributions(Seq(distrib)) + newAge.name shouldEqual age.name + newAge.isResponse shouldEqual age.isResponse + newAge.originStage shouldEqual age.originStage + newAge.parents shouldEqual age.parents + newAge.uid shouldEqual age.uid + newAge.distributions.length shouldEqual 1 + newAge.distributions.head shouldEqual distrib + } + } // TODO: test other feature methods } diff --git a/core/src/test/scala/com/salesforce/op/filters/RawFeatureFilterTest.scala b/core/src/test/scala/com/salesforce/op/filters/RawFeatureFilterTest.scala index 32c8d46fe4..fddc2adb51 100644 --- a/core/src/test/scala/com/salesforce/op/filters/RawFeatureFilterTest.scala +++ b/core/src/test/scala/com/salesforce/op/filters/RawFeatureFilterTest.scala @@ -263,9 +263,10 @@ class RawFeatureFilterTest extends FlatSpec with PassengerSparkFixtureTest with val params = new OpParams() val features: Array[OPFeature] = Array(survived, age, gender, height, weight, description, boarded, stringMap, numericMap, booleanMap) - val FilteredRawData(df, dropped, droppedKeyValue) = getFilter(maxCorrelation).generateFilteredRaw(features, params) + val FilteredRawData(df, dropped, droppedKeyValue, _) = + getFilter(maxCorrelation).generateFilteredRaw(features, params) - dropped should contain theSameElementsAs expectedDropped.toSeq + dropped should contain theSameElementsAs expectedDropped droppedKeyValue should contain theSameElementsAs expectedDroppedMapKeys df.schema.fields.map(_.name) should contain theSameElementsAs DataFrameFieldNames.KeyFieldName +: features.diff(dropped).map(_.name) diff --git a/features/src/main/scala/com/salesforce/op/features/Feature.scala b/features/src/main/scala/com/salesforce/op/features/Feature.scala index 55ca535211..af8aca4ea4 100644 --- a/features/src/main/scala/com/salesforce/op/features/Feature.scala +++ b/features/src/main/scala/com/salesforce/op/features/Feature.scala @@ -55,7 +55,8 @@ case class Feature[O <: FeatureType] private[op] isResponse: Boolean, originStage: OpPipelineStage[O], parents: Seq[OPFeature], - uid: String + uid: String, + distributions: Seq[FeatureDistributionBase] = Seq.empty )(implicit val wtt: WeakTypeTag[O]) extends FeatureLike[O] { def this( @@ -68,7 +69,8 @@ case class Feature[O <: FeatureType] private[op] isResponse = isResponse, originStage = originStage, parents = parents, - uid = FeatureUID(originStage.uid) + uid = FeatureUID(originStage.uid), + distributions = Seq.empty )(wtt) /** @@ -89,13 +91,22 @@ case class Feature[O <: FeatureType] private[op] val stage = stagesMap.getOrElse(f.originStage.uid, f.originStage).asInstanceOf[OpPipelineStage[T]] val newParents = f.parents.map(p => copy[T](p.asInstanceOf[FeatureLike[T]])) Feature[T]( - name = f.name, isResponse = f.isResponse, originStage = stage, parents = newParents, uid = f.uid + name = f.name, isResponse = f.isResponse, originStage = stage, parents = newParents, uid = f.uid, + distributions = f.distributions )(f.wtt) } copy(this) } + /** + * Takes an a sequence of feature distributions assocaited with the feature + * + * @param distributions Seq of the feature distributions for the feature + * @return A feature with the distributions assocated + */ + override private[op] def withDistributions(distributions: Seq[FeatureDistributionBase]) = + this.copy(distributions = distributions) } /** diff --git a/features/src/main/scala/com/salesforce/op/features/FeatureDistributionBase.scala b/features/src/main/scala/com/salesforce/op/features/FeatureDistributionBase.scala new file mode 100644 index 0000000000..496197aa5c --- /dev/null +++ b/features/src/main/scala/com/salesforce/op/features/FeatureDistributionBase.scala @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.features + + +/** + * Keeps the distribution information for features + */ +trait FeatureDistributionBase { + + /** + * name of the feature + */ + val name: String + + /** + * key map key associated with distribution (when the feature is a map) + */ + val key: Option[String] + + /** + * total count of feature seen + */ + val count: Long + + /** + * number of empties seen in feature + */ + val nulls: Long + + /** + * binned counts of feature values (hashed for strings, evently spaced bins for numerics) + */ + val distribution: Array[Double] + + /** + * either min and max number of tokens for text data, or number of splits used for bins for numeric data + */ + val summaryInfo: Array[Double] +} diff --git a/features/src/main/scala/com/salesforce/op/features/FeatureLike.scala b/features/src/main/scala/com/salesforce/op/features/FeatureLike.scala index 8a3c38b155..2b2025fe57 100644 --- a/features/src/main/scala/com/salesforce/op/features/FeatureLike.scala +++ b/features/src/main/scala/com/salesforce/op/features/FeatureLike.scala @@ -72,6 +72,11 @@ trait FeatureLike[O <: FeatureType] { */ val parents: Seq[OPFeature] + /** + * The distribution information of the feature (is a sequence because map features have distribution for each key) + */ + val distributions: Seq[FeatureDistributionBase] + /** * Weak type tag of the feature type O */ @@ -159,7 +164,8 @@ trait FeatureLike[O <: FeatureType] { val oid = Option(originStage).map(_.uid).orNull val pids = parents.map(_.uid).mkString("[", ",", "]") s"${this.getClass.getSimpleName}(" + - s"name = $name, uid = $uid, isResponse = $isResponse, originStage = $oid, parents = $pids)" + s"name = $name, uid = $uid, isResponse = $isResponse, originStage = $oid, parents = $pids," + + s" distributions = ${distributions})" } /** @@ -430,4 +436,12 @@ trait FeatureLike[O <: FeatureType] { */ private[op] def copyWithNewStages(stages: Array[OPStage]): FeatureLike[O] + /** + * Takes an a sequence of feature distributions assocaited with the feature + * + * @param distributions Seq of the feature distributions for the feature + * @return A feature with the distributions assocated + */ + private[op] def withDistributions(distributions: Seq[FeatureDistributionBase]): FeatureLike[O] + } From 15b0ccabe5a382d880db347598b1665f54948450 Mon Sep 17 00:00:00 2001 From: leahmcguire Date: Wed, 22 Aug 2018 12:05:41 -0700 Subject: [PATCH 3/4] addressed comments added ser and deser for distributions on workflow model --- .../scala/com/salesforce/op/OpWorkflow.scala | 2 ++ .../salesforce/op/OpWorkflowModelReader.scala | 12 +++++++++ .../salesforce/op/OpWorkflowModelWriter.scala | 6 ++++- .../op/filters/FeatureDistribution.scala | 4 +-- .../op/OpWorkflowModelReaderWriterTest.scala | 25 ++++++++++++++++++- .../salesforce/op/features/FeaturesTest.scala | 14 +++++------ .../com/salesforce/op/features/Feature.scala | 4 +-- ...se.scala => FeatureDistributionLike.scala} | 6 ++--- .../salesforce/op/features/FeatureLike.scala | 4 +-- 9 files changed, 59 insertions(+), 18 deletions(-) rename features/src/main/scala/com/salesforce/op/features/{FeatureDistributionBase.scala => FeatureDistributionLike.scala} (91%) diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala index 01bd65b865..3f65ac109a 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala @@ -106,6 +106,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { /** * Will set the blacklisted features variable and if list is non-empty it will * @param features list of features to blacklist + * @param distributions feature distributions calculated in raw feature filter */ private[op] def setBlacklist(features: Array[OPFeature], distributions: Seq[FeatureDistribution]): Unit = { blacklistedFeatures = features @@ -343,6 +344,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore { .setParameters(getParameters()) .setBlacklist(getBlacklist()) .setBlacklistMapKeys(getBlacklistMapKeys()) + .setRawFeatureDistributions(getRawFeatureDistributions()) reader.map(model.setReader).getOrElse(model) } diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala b/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala index b6e2dac7ec..70d711017b 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala @@ -34,6 +34,8 @@ import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._ import com.salesforce.op.features.{FeatureJsonHelper, OPFeature, TransientFeature} import com.salesforce.op.stages.{OpPipelineStageReader, _} import OpPipelineStageReadWriteShared._ +import com.salesforce.op.filters.FeatureDistribution +import com.salesforce.op.utils.json.JsonUtils import org.apache.spark.ml.util.MLReader import org.json4s.JsonAST.{JArray, JNothing, JValue} import org.json4s.jackson.JsonMethods.parse @@ -86,11 +88,13 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo model <- Try(new OpWorkflowModel(uid = (json \ Uid.entryName).extract[String], trainParams)) (stages, resultFeatures) <- Try(resolveFeaturesAndStages(json, path)) blacklist <- Try(resolveBlacklist(json)) + distributions <- resolveRawFeatureDistributions(json) } yield model .setStages(stages.filterNot(_.isInstanceOf[FeatureGeneratorStage[_, _]])) .setFeatures(resultFeatures) .setParameters(params) .setBlacklist(blacklist) + .setRawFeatureDistributions(distributions) } private def resolveBlacklist(json: JValue): Array[OPFeature] = { @@ -151,5 +155,13 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo } } + private def resolveRawFeatureDistributions(json: JValue): Try[Array[FeatureDistribution]] = { + if ((json \ RawFeatureDistributions.entryName) != JNothing) { // for backwards compatibility + val distString = (json \ RawFeatureDistributions.entryName).extract[String] + JsonUtils.fromString[Array[FeatureDistribution]](distString) + } else { + Try(Array.empty[FeatureDistribution]) + } + } } diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflowModelWriter.scala b/core/src/main/scala/com/salesforce/op/OpWorkflowModelWriter.scala index 6de4e1297b..259c922b6e 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflowModelWriter.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflowModelWriter.scala @@ -32,6 +32,7 @@ package com.salesforce.op import com.salesforce.op.features.FeatureJsonHelper import com.salesforce.op.stages.{OpPipelineStageBase, OpPipelineStageWriter} +import com.salesforce.op.utils.json.JsonUtils import enumeratum._ import org.apache.hadoop.fs.Path import org.apache.spark.ml.util.MLWriter @@ -79,7 +80,9 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter { (FN.Stages.entryName -> stagesJArray(path)) ~ (FN.AllFeatures.entryName -> allFeaturesJArray) ~ (FN.Parameters.entryName -> model.parameters.toJson(pretty = false)) ~ - (FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) + (FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) ~ + (FN.RawFeatureDistributions.entryName -> JsonUtils.toJsonString(model.getRawFeatureDistributions(), + pretty = false)) } private def resultFeaturesJArray(): JArray = @@ -136,6 +139,7 @@ private[op] object OpWorkflowModelReadWriteShared { case object AllFeatures extends FieldNames("allFeatures") case object Parameters extends FieldNames("parameters") case object TrainParameters extends FieldNames("trainParameters") + case object RawFeatureDistributions extends FieldNames("rawFeatureDistributions") } } diff --git a/core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala b/core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala index 3ad7fcc43b..8dba96dd55 100644 --- a/core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala +++ b/core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala @@ -30,7 +30,7 @@ package com.salesforce.op.filters -import com.salesforce.op.features.FeatureDistributionBase +import com.salesforce.op.features.FeatureDistributionLike import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer} import com.twitter.algebird.Semigroup import com.twitter.algebird.Monoid._ @@ -56,7 +56,7 @@ case class FeatureDistribution nulls: Long, distribution: Array[Double], summaryInfo: Array[Double] -) extends FeatureDistributionBase { +) extends FeatureDistributionLike { /** * Get feature key associated to this distribution diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowModelReaderWriterTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowModelReaderWriterTest.scala index c2fcd9664d..3e886b5709 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowModelReaderWriterTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowModelReaderWriterTest.scala @@ -35,6 +35,7 @@ import java.io.File import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._ import com.salesforce.op.features.OPFeature import com.salesforce.op.features.types.{Real, RealNN} +import com.salesforce.op.filters.FeatureDistribution import com.salesforce.op.readers.{AggregateAvroReader, DataReaders} import com.salesforce.op.stages.OPStage import com.salesforce.op.stages.sparkwrappers.generic.SwUnaryEstimator @@ -83,11 +84,15 @@ class OpWorkflowModelReaderWriterTest aggregateParams = null ) + val distributions = Array(FeatureDistribution("a", None, 1L, 1L, Array(1.0), Array(1.0)), + FeatureDistribution("b", Option("b"), 2L, 2L, Array(2.0), Array(2.0))) + def makeDummyModel(wf: OpWorkflow): OpWorkflowModel = { val model = new OpWorkflowModel(wf.uid, wf.parameters) .setStages(wf.stages) .setFeatures(wf.resultFeatures) .setParameters(wf.parameters) + .setRawFeatureDistributions(distributions) model.setReader(wf.reader.get) } @@ -105,6 +110,7 @@ class OpWorkflowModelReaderWriterTest .setReader(dummyReader) .setResultFeatures(density) .setParameters(workflowParams) + .setRawFeatureDistributions(distributions) val (wfM, jsonModel) = makeModelAndJson(wf) } @@ -116,6 +122,7 @@ class OpWorkflowModelReaderWriterTest .setReader(dummyReader) .setResultFeatures(density, weight2) .setParameters(workflowParams) + .setRawFeatureDistributions(distributions) val (wfM, jsonModel) = makeModelAndJson(wf) } @@ -124,6 +131,7 @@ class OpWorkflowModelReaderWriterTest .setReader(dummyReader) .setResultFeatures(weight) .setParameters(workflowParams) + .setRawFeatureDistributions(distributions) val (wfM, jsonModel) = makeModelAndJson(wf) } @@ -140,6 +148,7 @@ class OpWorkflowModelReaderWriterTest .setParameters(workflowParams) .setReader(dummyReader) .setResultFeatures(scaled) + .setRawFeatureDistributions(distributions) val (wfM, jsonModel) = makeModelAndJson(wf) } @@ -262,7 +271,7 @@ class OpWorkflowModelReaderWriterTest compareWorkflowModels(wfM, wfMR) } - it should "load a workflow model that has a RawFeatureFiler and a different workflow" in new VectorizedFlow { + it should "load a workflow model that has a RawFeatureFilter and a different workflow" in new VectorizedFlow { val wfM = wf.loadModel(saveFlowPathStable) wf.getResultFeatures().head.name shouldBe wfM.getResultFeatures().head.name wf.getResultFeatures().head.history().originFeatures should contain theSameElementsAs @@ -304,6 +313,7 @@ class OpWorkflowModelReaderWriterTest compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures) compareFeatures(wf1.rawFeatures, wf2.rawFeatures) compareStages(wf1.stages, wf2.stages) + compareDistributions(wf1.getRawFeatureDistributions(), wf2.getRawFeatureDistributions()) } def compareWorkflowModels(wf1: OpWorkflowModel, wf2: OpWorkflowModel): Unit = { @@ -314,6 +324,7 @@ class OpWorkflowModelReaderWriterTest compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures) compareFeatures(wf1.rawFeatures, wf2.rawFeatures) compareStages(wf1.stages, wf2.stages) + compareDistributions(wf1.getRawFeatureDistributions(), wf2.getRawFeatureDistributions()) } def compareParams(p1: OpParams, p2: OpParams): Unit = { @@ -321,6 +332,18 @@ class OpWorkflowModelReaderWriterTest p1.readerParams.toString() shouldBe p2.readerParams.toString() p1.customParams shouldBe p2.customParams } + + def compareDistributions(d1: Array[FeatureDistribution], d2: Array[FeatureDistribution]): Unit = { + d1.zip(d2) + .foreach{ case (a, b) => + a.name shouldEqual b.name + a.key shouldEqual b.key + a.count shouldEqual b.count + a.nulls shouldEqual b.nulls + a.distribution shouldEqual b.distribution + a.summaryInfo shouldEqual b.summaryInfo + } + } } trait UIDReset { diff --git a/core/src/test/scala/com/salesforce/op/features/FeaturesTest.scala b/core/src/test/scala/com/salesforce/op/features/FeaturesTest.scala index 18479cebab..d8e9fe6da0 100644 --- a/core/src/test/scala/com/salesforce/op/features/FeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/features/FeaturesTest.scala @@ -193,13 +193,13 @@ class FeaturesTest extends WordSpec with PassengerFeaturesTest with TestCommon { } "with distributions" should { "make a copy of the feature containing the specified distributions" in { - val distrib = new FeatureDistributionBase { - override val count: Long = 1L - override val nulls: Long = 1L - override val distribution: Array[Double] = Array(0.5) - override val summaryInfo: Array[Double] = Array(0.5) - override val name: String = age.name - override val key: Option[String] = None + val distrib = new FeatureDistributionLike { + val count: Long = 1L + val nulls: Long = 1L + val distribution: Array[Double] = Array(0.5) + val summaryInfo: Array[Double] = Array(0.5) + val name: String = age.name + val key: Option[String] = None } val newAge = age.withDistributions(Seq(distrib)) newAge.name shouldEqual age.name diff --git a/features/src/main/scala/com/salesforce/op/features/Feature.scala b/features/src/main/scala/com/salesforce/op/features/Feature.scala index af8aca4ea4..c40076eff8 100644 --- a/features/src/main/scala/com/salesforce/op/features/Feature.scala +++ b/features/src/main/scala/com/salesforce/op/features/Feature.scala @@ -56,7 +56,7 @@ case class Feature[O <: FeatureType] private[op] originStage: OpPipelineStage[O], parents: Seq[OPFeature], uid: String, - distributions: Seq[FeatureDistributionBase] = Seq.empty + distributions: Seq[FeatureDistributionLike] = Seq.empty )(implicit val wtt: WeakTypeTag[O]) extends FeatureLike[O] { def this( @@ -105,7 +105,7 @@ case class Feature[O <: FeatureType] private[op] * @param distributions Seq of the feature distributions for the feature * @return A feature with the distributions assocated */ - override private[op] def withDistributions(distributions: Seq[FeatureDistributionBase]) = + override private[op] def withDistributions(distributions: Seq[FeatureDistributionLike]) = this.copy(distributions = distributions) } diff --git a/features/src/main/scala/com/salesforce/op/features/FeatureDistributionBase.scala b/features/src/main/scala/com/salesforce/op/features/FeatureDistributionLike.scala similarity index 91% rename from features/src/main/scala/com/salesforce/op/features/FeatureDistributionBase.scala rename to features/src/main/scala/com/salesforce/op/features/FeatureDistributionLike.scala index 496197aa5c..7228f4eba8 100644 --- a/features/src/main/scala/com/salesforce/op/features/FeatureDistributionBase.scala +++ b/features/src/main/scala/com/salesforce/op/features/FeatureDistributionLike.scala @@ -34,7 +34,7 @@ package com.salesforce.op.features /** * Keeps the distribution information for features */ -trait FeatureDistributionBase { +trait FeatureDistributionLike { /** * name of the feature @@ -42,7 +42,7 @@ trait FeatureDistributionBase { val name: String /** - * key map key associated with distribution (when the feature is a map) + * map key associated with distribution (when the feature is a map) */ val key: Option[String] @@ -57,7 +57,7 @@ trait FeatureDistributionBase { val nulls: Long /** - * binned counts of feature values (hashed for strings, evently spaced bins for numerics) + * binned counts of feature values (hashed for strings, evenly spaced bins for numerics) */ val distribution: Array[Double] diff --git a/features/src/main/scala/com/salesforce/op/features/FeatureLike.scala b/features/src/main/scala/com/salesforce/op/features/FeatureLike.scala index 2b2025fe57..e3fcfde9af 100644 --- a/features/src/main/scala/com/salesforce/op/features/FeatureLike.scala +++ b/features/src/main/scala/com/salesforce/op/features/FeatureLike.scala @@ -75,7 +75,7 @@ trait FeatureLike[O <: FeatureType] { /** * The distribution information of the feature (is a sequence because map features have distribution for each key) */ - val distributions: Seq[FeatureDistributionBase] + val distributions: Seq[FeatureDistributionLike] /** * Weak type tag of the feature type O @@ -442,6 +442,6 @@ trait FeatureLike[O <: FeatureType] { * @param distributions Seq of the feature distributions for the feature * @return A feature with the distributions assocated */ - private[op] def withDistributions(distributions: Seq[FeatureDistributionBase]): FeatureLike[O] + private[op] def withDistributions(distributions: Seq[FeatureDistributionLike]): FeatureLike[O] } From 30c60f9e28dd084b6633cb29ede44eb33d975a7b Mon Sep 17 00:00:00 2001 From: leahmcguire Date: Wed, 22 Aug 2018 13:18:05 -0700 Subject: [PATCH 4/4] changed try to success --- .../main/scala/com/salesforce/op/OpWorkflowModelReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala b/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala index 70d711017b..964d14aec4 100644 --- a/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala +++ b/core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala @@ -160,7 +160,7 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo val distString = (json \ RawFeatureDistributions.entryName).extract[String] JsonUtils.fromString[Array[FeatureDistribution]](distString) } else { - Try(Array.empty[FeatureDistribution]) + Success(Array.empty[FeatureDistribution]) } }