Skip to content

Commit

Permalink
addressed comments added ser and deser for distributions on workflow …
Browse files Browse the repository at this point in the history
…model
  • Loading branch information
leahmcguire committed Aug 22, 2018
1 parent 1cd4f63 commit 15b0cca
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 18 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
/**
* Will set the blacklisted features variable and if list is non-empty it will
* @param features list of features to blacklist
* @param distributions feature distributions calculated in raw feature filter
*/
private[op] def setBlacklist(features: Array[OPFeature], distributions: Seq[FeatureDistribution]): Unit = {
blacklistedFeatures = features
Expand Down Expand Up @@ -343,6 +344,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
.setParameters(getParameters())
.setBlacklist(getBlacklist())
.setBlacklistMapKeys(getBlacklistMapKeys())
.setRawFeatureDistributions(getRawFeatureDistributions())

reader.map(model.setReader).getOrElse(model)
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.{FeatureJsonHelper, OPFeature, TransientFeature}
import com.salesforce.op.stages.{OpPipelineStageReader, _}
import OpPipelineStageReadWriteShared._
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.utils.json.JsonUtils
import org.apache.spark.ml.util.MLReader
import org.json4s.JsonAST.{JArray, JNothing, JValue}
import org.json4s.jackson.JsonMethods.parse
Expand Down Expand Up @@ -86,11 +88,13 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo
model <- Try(new OpWorkflowModel(uid = (json \ Uid.entryName).extract[String], trainParams))
(stages, resultFeatures) <- Try(resolveFeaturesAndStages(json, path))
blacklist <- Try(resolveBlacklist(json))
distributions <- resolveRawFeatureDistributions(json)
} yield model
.setStages(stages.filterNot(_.isInstanceOf[FeatureGeneratorStage[_, _]]))
.setFeatures(resultFeatures)
.setParameters(params)
.setBlacklist(blacklist)
.setRawFeatureDistributions(distributions)
}

private def resolveBlacklist(json: JValue): Array[OPFeature] = {
Expand Down Expand Up @@ -151,5 +155,13 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo
}
}

/**
 * Reads the raw feature distributions entry from the workflow model json
 *
 * @param json workflow model json
 * @return the result of parsing the stored distributions, or an empty array
 *         when the json has no raw feature distributions entry
 */
private def resolveRawFeatureDistributions(json: JValue): Try[Array[FeatureDistribution]] = {
if ((json \ RawFeatureDistributions.entryName) != JNothing) { // for backwards compatibility
// the entry is extracted as a string first and that string is then parsed into the distributions
val distString = (json \ RawFeatureDistributions.entryName).extract[String]
JsonUtils.fromString[Array[FeatureDistribution]](distString)
} else {
// no entry in the json: fall back to no distributions
Try(Array.empty[FeatureDistribution])

This comment has been minimized.

Copy link
@tovbinm

tovbinm Aug 22, 2018

Collaborator

Success(Array.empty[FeatureDistribution])

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ package com.salesforce.op

import com.salesforce.op.features.FeatureJsonHelper
import com.salesforce.op.stages.{OpPipelineStageBase, OpPipelineStageWriter}
import com.salesforce.op.utils.json.JsonUtils
import enumeratum._
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.util.MLWriter
Expand Down Expand Up @@ -79,7 +80,9 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
(FN.Stages.entryName -> stagesJArray(path)) ~
(FN.AllFeatures.entryName -> allFeaturesJArray) ~
(FN.Parameters.entryName -> model.parameters.toJson(pretty = false)) ~
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false))
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) ~
(FN.RawFeatureDistributions.entryName -> JsonUtils.toJsonString(model.getRawFeatureDistributions(),
pretty = false))
}

private def resultFeaturesJArray(): JArray =
Expand Down Expand Up @@ -136,6 +139,7 @@ private[op] object OpWorkflowModelReadWriteShared {
case object AllFeatures extends FieldNames("allFeatures")
case object Parameters extends FieldNames("parameters")
case object TrainParameters extends FieldNames("trainParameters")
case object RawFeatureDistributions extends FieldNames("rawFeatureDistributions")
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

package com.salesforce.op.filters

import com.salesforce.op.features.FeatureDistributionBase
import com.salesforce.op.features.FeatureDistributionLike
import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer}
import com.twitter.algebird.Semigroup
import com.twitter.algebird.Monoid._
Expand All @@ -56,7 +56,7 @@ case class FeatureDistribution
nulls: Long,
distribution: Array[Double],
summaryInfo: Array[Double]
) extends FeatureDistributionBase {
) extends FeatureDistributionLike {

/**
* Get feature key associated to this distribution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import java.io.File
import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.OPFeature
import com.salesforce.op.features.types.{Real, RealNN}
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.readers.{AggregateAvroReader, DataReaders}
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.sparkwrappers.generic.SwUnaryEstimator
Expand Down Expand Up @@ -83,11 +84,15 @@ class OpWorkflowModelReaderWriterTest
aggregateParams = null
)

// Sample feature distributions attached to the workflows and models built in this test:
// one with `None` and one with a defined value as the optional second argument
val distributions: Array[FeatureDistribution] = Array(
  FeatureDistribution("a", None, 1L, 1L, Array(1.0), Array(1.0)),
  FeatureDistribution("b", Option("b"), 2L, 2L, Array(2.0), Array(2.0))
)

/**
 * Builds a workflow model by hand from the given workflow
 *
 * @param wf workflow supplying the uid, parameters, stages, result features and reader
 * @return model carrying the workflow's settings, the test feature distributions and the workflow's reader
 */
def makeDummyModel(wf: OpWorkflow): OpWorkflowModel =
  new OpWorkflowModel(wf.uid, wf.parameters)
    .setStages(wf.stages)
    .setFeatures(wf.resultFeatures)
    .setParameters(wf.parameters)
    .setRawFeatureDistributions(distributions)
    // the workflow must already have a reader set; `get` throws when it is empty
    .setReader(wf.reader.get)
Expand All @@ -105,6 +110,7 @@ class OpWorkflowModelReaderWriterTest
.setReader(dummyReader)
.setResultFeatures(density)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

Expand All @@ -116,6 +122,7 @@ class OpWorkflowModelReaderWriterTest
.setReader(dummyReader)
.setResultFeatures(density, weight2)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

Expand All @@ -124,6 +131,7 @@ class OpWorkflowModelReaderWriterTest
.setReader(dummyReader)
.setResultFeatures(weight)
.setParameters(workflowParams)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

Expand All @@ -140,6 +148,7 @@ class OpWorkflowModelReaderWriterTest
.setParameters(workflowParams)
.setReader(dummyReader)
.setResultFeatures(scaled)
.setRawFeatureDistributions(distributions)
val (wfM, jsonModel) = makeModelAndJson(wf)
}

Expand Down Expand Up @@ -262,7 +271,7 @@ class OpWorkflowModelReaderWriterTest
compareWorkflowModels(wfM, wfMR)
}

it should "load a workflow model that has a RawFeatureFiler and a different workflow" in new VectorizedFlow {
it should "load a workflow model that has a RawFeatureFilter and a different workflow" in new VectorizedFlow {
val wfM = wf.loadModel(saveFlowPathStable)
wf.getResultFeatures().head.name shouldBe wfM.getResultFeatures().head.name
wf.getResultFeatures().head.history().originFeatures should contain theSameElementsAs
Expand Down Expand Up @@ -304,6 +313,7 @@ class OpWorkflowModelReaderWriterTest
compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures)
compareFeatures(wf1.rawFeatures, wf2.rawFeatures)
compareStages(wf1.stages, wf2.stages)
compareDistributions(wf1.getRawFeatureDistributions(), wf2.getRawFeatureDistributions())
}

def compareWorkflowModels(wf1: OpWorkflowModel, wf2: OpWorkflowModel): Unit = {
Expand All @@ -314,13 +324,26 @@ class OpWorkflowModelReaderWriterTest
compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures)
compareFeatures(wf1.rawFeatures, wf2.rawFeatures)
compareStages(wf1.stages, wf2.stages)
compareDistributions(wf1.getRawFeatureDistributions(), wf2.getRawFeatureDistributions())
}

/**
 * Asserts that two sets of workflow parameters match
 *
 * @param p1 first parameters
 * @param p2 second parameters
 */
def compareParams(p1: OpParams, p2: OpParams): Unit = {
  p1.stageParams shouldBe p2.stageParams

  // reader params are compared through their string representations
  val firstReaderParams = p1.readerParams.toString()
  val secondReaderParams = p2.readerParams.toString()
  firstReaderParams shouldBe secondReaderParams

  p1.customParams shouldBe p2.customParams
}

/**
 * Asserts that two arrays of feature distributions hold the same entries in the same order
 *
 * @param d1 first array of distributions
 * @param d2 second array of distributions
 */
def compareDistributions(d1: Array[FeatureDistribution], d2: Array[FeatureDistribution]): Unit = {
  // zip stops at the shorter array, so without this check a missing (or entirely empty)
  // set of distributions on one side would pass the comparison unnoticed
  d1.length shouldEqual d2.length

  d1.zip(d2).foreach { case (a, b) =>
    a.name shouldEqual b.name
    a.key shouldEqual b.key
    a.count shouldEqual b.count
    a.nulls shouldEqual b.nulls
    a.distribution shouldEqual b.distribution
    a.summaryInfo shouldEqual b.summaryInfo
  }
}
}

trait UIDReset {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,13 @@ class FeaturesTest extends WordSpec with PassengerFeaturesTest with TestCommon {
}
"with distributions" should {
"make a copy of the feature containing the specified distributions" in {
val distrib = new FeatureDistributionBase {
override val count: Long = 1L
override val nulls: Long = 1L
override val distribution: Array[Double] = Array(0.5)
override val summaryInfo: Array[Double] = Array(0.5)
override val name: String = age.name
override val key: Option[String] = None
val distrib = new FeatureDistributionLike {
val count: Long = 1L
val nulls: Long = 1L
val distribution: Array[Double] = Array(0.5)
val summaryInfo: Array[Double] = Array(0.5)
val name: String = age.name
val key: Option[String] = None
}
val newAge = age.withDistributions(Seq(distrib))
newAge.name shouldEqual age.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ case class Feature[O <: FeatureType] private[op]
originStage: OpPipelineStage[O],
parents: Seq[OPFeature],
uid: String,
distributions: Seq[FeatureDistributionBase] = Seq.empty
distributions: Seq[FeatureDistributionLike] = Seq.empty
)(implicit val wtt: WeakTypeTag[O]) extends FeatureLike[O] {

def this(
Expand Down Expand Up @@ -105,7 +105,7 @@ case class Feature[O <: FeatureType] private[op]
* @param distributions Seq of the feature distributions for the feature
* @return A feature with the distributions associated
*/
override private[op] def withDistributions(distributions: Seq[FeatureDistributionBase]) =
override private[op] def withDistributions(distributions: Seq[FeatureDistributionLike]) =
this.copy(distributions = distributions)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ package com.salesforce.op.features
/**
* Keeps the distribution information for features
*/
trait FeatureDistributionBase {
trait FeatureDistributionLike {

/**
* name of the feature
*/
val name: String

/**
* key map key associated with distribution (when the feature is a map)
* map key associated with distribution (when the feature is a map)
*/
val key: Option[String]

Expand All @@ -57,7 +57,7 @@ trait FeatureDistributionBase {
val nulls: Long

/**
* binned counts of feature values (hashed for strings, evently spaced bins for numerics)
* binned counts of feature values (hashed for strings, evenly spaced bins for numerics)
*/
val distribution: Array[Double]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ trait FeatureLike[O <: FeatureType] {
/**
* The distribution information of the feature (is a sequence because map features have distribution for each key)
*/
val distributions: Seq[FeatureDistributionBase]
val distributions: Seq[FeatureDistributionLike]

/**
* Weak type tag of the feature type O
Expand Down Expand Up @@ -442,6 +442,6 @@ trait FeatureLike[O <: FeatureType] {
* @param distributions Seq of the feature distributions for the feature
* @return A feature with the distributions assocated
*/
private[op] def withDistributions(distributions: Seq[FeatureDistributionBase]): FeatureLike[O]
private[op] def withDistributions(distributions: Seq[FeatureDistributionLike]): FeatureLike[O]

}

0 comments on commit 15b0cca

Please sign in to comment.