Serialize blacklisted map keys with the model + updated access on workflow/model members #320

Merged 4 commits on May 15, 2019

3 changes: 2 additions & 1 deletion core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -455,7 +455,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
* needed to generate the features not included in the fitted model
*/
def withModelStages(model: OpWorkflowModel): this.type = {
val newResultFeatures = (resultFeatures ++ model.getResultFeatures()).map(_.copyWithNewStages(model.stages))
val newResultFeatures =
(resultFeatures ++ model.getResultFeatures()).map(_.copyWithNewStages(model.getStages()))
setResultFeatures(newResultFeatures: _*)
}

42 changes: 32 additions & 10 deletions core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala
@@ -57,31 +57,31 @@ private[op] trait OpWorkflowCore {
def uid: String

// whether the CV/TV is performed on the workflow level
private[op] var isWorkflowCV = false
protected var isWorkflowCVEnabled = false

// the data reader for the workflow or model
private[op] var reader: Option[Reader[_]] = None
protected var reader: Option[Reader[_]] = None

// final features from workflow, used to find stages of the workflow
private[op] var resultFeatures: Array[OPFeature] = Array[OPFeature]()
protected var resultFeatures: Array[OPFeature] = Array[OPFeature]()

// raw features generated after data is read in and aggregated
private[op] var rawFeatures: Array[OPFeature] = Array[OPFeature]()
protected var rawFeatures: Array[OPFeature] = Array[OPFeature]()

// features that have been blacklisted from use in dag
private[op] var blacklistedFeatures: Array[OPFeature] = Array[OPFeature]()
protected var blacklistedFeatures: Array[OPFeature] = Array[OPFeature]()

// map keys that were blacklisted from use in dag
private[op] var blacklistedMapKeys: Map[String, Set[String]] = Map[String, Set[String]]()
protected var blacklistedMapKeys: Map[String, Set[String]] = Map[String, Set[String]]()

// raw feature filter results calculated in raw feature filter
private[op] var rawFeatureFilterResults: RawFeatureFilterResults = RawFeatureFilterResults()
protected var rawFeatureFilterResults: RawFeatureFilterResults = RawFeatureFilterResults()

// stages of the workflow
private[op] var stages: Array[OPStage] = Array[OPStage]()
protected var stages: Array[OPStage] = Array[OPStage]()

// command line parameters for the workflow stages and readers
private[op] var parameters = new OpParams()
protected var parameters = new OpParams()

private[op] def setStages(value: Array[OPStage]): this.type = {
stages = value
@@ -102,10 +102,16 @@
*/
@Experimental
final def withWorkflowCV: this.type = {
isWorkflowCV = true
isWorkflowCVEnabled = true
this
}

/**
* Whether the cross-validation/train-validation-split will be done at workflow level
*
* @return true if the cross-validation will be done at workflow level, false otherwise
*/
final def isWorkflowCV: Boolean = isWorkflowCVEnabled

/**
* Set data reader that will be used to generate data frame for stages
@@ -119,6 +125,15 @@
this
}

/**
* Get data reader that will be used to generate data frame for stages
*
* @return reader for workflow
*/
final def getReader(): Reader[_] = {
reader.getOrElse(throw new IllegalArgumentException("Reader is not set"))
}

/**
* Set input dataset which contains columns corresponding to the raw features used in the workflow
* The type of the dataset (Dataset[T]) must match the type of the FeatureBuilders[T] used to generate
@@ -161,6 +176,13 @@
*/
final def getStages(): Array[OPStage] = stages

/**
* Get the raw features generated by the workflow
*
* @return raw features for workflow
*/
final def getRawFeatures(): Array[OPFeature] = rawFeatures
Reviewer comment (Contributor): woo! 👍

/**
* Get the final features generated by the workflow
*
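The access changes above replace the private[op] var members of OpWorkflowCore with protected vars exposed through public accessors (the new getReader(), getRawFeatures() and isWorkflowCV alongside the existing getStages(), getBlacklist(), getBlacklistMapKeys() and getResultFeatures()). As a minimal usage sketch, downstream code now reads workflow and model state only through those accessors; the describeModel helper below is hypothetical and not part of this PR.

import com.salesforce.op.OpWorkflowModel

// Hypothetical helper (not part of the PR): summarizes a fitted model using only
// the public accessors, since the backing vars are now protected.
object WorkflowAccessorSketch {
  def describeModel(model: OpWorkflowModel): String = {
    val numStages = model.getStages().length
    val numRawFeatures = model.getRawFeatures().length
    val blacklistedNames = model.getBlacklist().map(_.name).mkString(", ")
    val blacklistedMapKeys = model.getBlacklistMapKeys()
    s"stages=$numStages, rawFeatures=$numRawFeatures, " +
      s"blacklisted=[$blacklistedNames], blacklistedMapKeys=$blacklistedMapKeys, " +
      s"workflowCV=${model.isWorkflowCV}"
  }
}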
@@ -171,7 +171,7 @@ class OpWorkflowModel(val uid: String = UID[OpWorkflowModel], val trainingParams
val parentStageIds = feature.traverse[Set[String]](Set.empty[String])((s, f) => s + f.originStage.uid)
val modelStages = stages.filter(s => parentStageIds.contains(s.uid))
ModelInsights.extractFromStages(modelStages, rawFeatures, trainingParams,
blacklistedFeatures, blacklistedMapKeys, rawFeatureFilterResults)
getBlacklist(), getBlacklistMapKeys(), getRawFeatureFilterResults())
}
}

@@ -92,27 +92,36 @@ class OpWorkflowModelReader(val workflowOpt: Option[OpWorkflow]) extends MLReade
model <- Try(new OpWorkflowModel(uid = (json \ Uid.entryName).extract[String], trainParams))
(stages, resultFeatures) <- Try(resolveFeaturesAndStages(workflow, json, path))
blacklist <- Try(resolveBlacklist(workflow, json))
blacklistMapKeys <- Try(resolveBlacklistMapKeys(json))
results <- resolveRawFeatureFilterResults(json)
} yield model
.setStages(stages.filterNot(_.isInstanceOf[FeatureGeneratorStage[_, _]]))
.setFeatures(resultFeatures)
.setParameters(params)
.setBlacklist(blacklist)
.setBlacklistMapKeys(blacklistMapKeys)
.setRawFeatureFilterResults(results)
}

private def resolveBlacklist(workflow: OpWorkflow, json: JValue): Array[OPFeature] = {
if ((json \ BlacklistedFeaturesUids.entryName) != JNothing) { // for backwards compatibility
val blacklistIds = (json \ BlacklistedFeaturesUids.entryName).extract[JArray].arr
val allFeatures = workflow.rawFeatures ++ workflow.blacklistedFeatures ++
workflow.stages.flatMap(s => s.getInputFeatures()) ++
workflow.resultFeatures
val allFeatures = workflow.getRawFeatures() ++ workflow.getBlacklist() ++
workflow.getStages().flatMap(_.getInputFeatures()) ++
workflow.getResultFeatures()
blacklistIds.flatMap(uid => allFeatures.find(_.uid == uid.extract[String])).toArray
} else {
Array.empty[OPFeature]
}
}

private def resolveBlacklistMapKeys(json: JValue): Map[String, Set[String]] = {
(json \ BlacklistedMapKeys.entryName).extractOpt[Map[String, List[String]]] match {
case Some(blackMapKeys) => blackMapKeys.map { case (k, vs) => k -> vs.toSet }
case None => Map.empty
}
}

private def resolveFeaturesAndStages
(
workflow: OpWorkflow,
@@ -135,14 +144,14 @@
val recoveredStages = stagesJs.flatMap { j =>
val stageUidOpt = (j \ Uid.entryName).extractOpt[String]
stageUidOpt.map { stageUid =>
val originalStage = workflow.stages.find(_.uid == stageUid)
val originalStage = workflow.getStages().find(_.uid == stageUid)
originalStage match {
case Some(os) => new OpPipelineStageReader(os).loadFromJson(j, path = path).asInstanceOf[OPStage]
case None => throw new RuntimeException(s"Workflow does not contain a stage with uid: $stageUid")
}
}
}
val generators = workflow.rawFeatures.map(_.originStage)
val generators = workflow.getRawFeatures().map(_.originStage)
generators ++ recoveredStages
}

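The reader change above resolves the new blacklistedMapKeys field from the saved model JSON and falls back to an empty map when the field is absent, so models saved before this PR still load. Below is a standalone json4s sketch of that read logic; the readBlacklistedMapKeys function and the sample JSON strings are illustrative only, not code from the PR.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Illustrative sketch of resolveBlacklistMapKeys: extract the optional
// "blacklistedMapKeys" field and convert each value list into a Set.
object BlacklistMapKeysReadSketch extends App {
  implicit val formats: Formats = DefaultFormats

  def readBlacklistedMapKeys(modelJson: String): Map[String, Set[String]] =
    (parse(modelJson) \ "blacklistedMapKeys").extractOpt[Map[String, List[String]]] match {
      case Some(keys) => keys.map { case (k, vs) => k -> vs.toSet }
      case None => Map.empty // older models have no such field: nothing is blacklisted
    }

  // A model that blacklists the "Female" key of a numericMap feature
  println(readBlacklistedMapKeys("""{"blacklistedMapKeys":{"numericMap":["Female"]}}"""))
  // Map(numericMap -> Set(Female))

  // A model saved before this PR: field missing, so the map is empty
  println(readBlacklistedMapKeys("{}"))
  // Map()
}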
@@ -77,19 +77,23 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
(FN.Uid.entryName -> model.uid) ~
(FN.ResultFeaturesUids.entryName -> resultFeaturesJArray) ~
(FN.BlacklistedFeaturesUids.entryName -> blacklistFeaturesJArray()) ~
(FN.BlacklistedMapKeys.entryName -> blacklistMapKeys()) ~
(FN.Stages.entryName -> stagesJArray(path)) ~
(FN.AllFeatures.entryName -> allFeaturesJArray) ~
(FN.Parameters.entryName -> model.parameters.toJson(pretty = false)) ~
(FN.Parameters.entryName -> model.getParameters().toJson(pretty = false)) ~
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) ~
(FN.RawFeatureFilterResultsFieldName.entryName ->
RawFeatureFilterResults.toJson(model.getRawFeatureFilterResults()))
}

private def resultFeaturesJArray(): JArray =
JArray(model.resultFeatures.map(_.uid).map(JString).toList)
JArray(model.getResultFeatures().map(_.uid).map(JString).toList)

private def blacklistFeaturesJArray(): JArray =
JArray(model.blacklistedFeatures.map(_.uid).map(JString).toList)
JArray(model.getBlacklist().map(_.uid).map(JString).toList)

private def blacklistMapKeys(): JObject =
JObject(model.getBlacklistMapKeys().map { case (k, vs) => k -> JArray(vs.map(JString).toList) }.toList)

/**
* Serialize all the workflow model stages
@@ -98,7 +102,7 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
* @return array of serialized stages
*/
private def stagesJArray(path: String): JArray = {
val stages: Seq[OpPipelineStageBase] = model.stages
val stages: Seq[OpPipelineStageBase] = model.getStages()
val stagesJson: Seq[JObject] = stages
.map(_.write.asInstanceOf[OpPipelineStageWriter].writeToJson(path))
.filter(_.children.nonEmpty)
@@ -112,7 +116,9 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
* @return all features to be serialized
*/
private def allFeaturesJArray: JArray = {
val features = model.rawFeatures ++ model.stages.flatMap(s => s.getInputFeatures()) ++ model.resultFeatures
val features = model.getRawFeatures() ++
model.getStages().flatMap(_.getInputFeatures()) ++
model.getResultFeatures()
JArray(features.distinct.map(FeatureJsonHelper.toJson).toList)
}

@@ -137,6 +143,7 @@ private[op] object OpWorkflowModelReadWriteShared {
case object Uid extends FieldNames("uid")
case object ResultFeaturesUids extends FieldNames("resultFeaturesUids")
case object BlacklistedFeaturesUids extends FieldNames("blacklistedFeaturesUids")
case object BlacklistedMapKeys extends FieldNames("blacklistedMapKeys")
case object Stages extends FieldNames("stages")
case object AllFeatures extends FieldNames("allFeatures")
case object Parameters extends FieldNames("parameters")
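On the write side, blacklistMapKeys() turns each blacklisted key set into a JSON array of strings keyed by the map feature name, stored under the new BlacklistedMapKeys field ("blacklistedMapKeys"). Below is a standalone json4s sketch of that serialization; the object name and the sample value are illustrative, not code from the PR.

import org.json4s._
import org.json4s.jackson.JsonMethods.{compact, render}

// Illustrative sketch of blacklistMapKeys(): Map[String, Set[String]] -> JObject,
// with each Set rendered as a JSON array of strings.
object BlacklistMapKeysWriteSketch extends App {
  def blacklistMapKeysJson(keys: Map[String, Set[String]]): JObject =
    JObject(keys.map { case (k, vs) => k -> JArray(vs.map(JString).toList) }.toList)

  val json = JObject(List(
    "blacklistedMapKeys" -> blacklistMapKeysJson(Map("numericMap" -> Set("Female")))
  ))
  println(compact(render(json)))
  // {"blacklistedMapKeys":{"numericMap":["Female"]}}
}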
@@ -428,12 +428,12 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with Dou
it should "have feature insights for features that are removed by the raw feature filter" in {
val insights = modelWithRFF.modelInsights(predWithMaps)

modelWithRFF.blacklistedFeatures should contain theSameElementsAs Array(age, description, genderPL, weight)
modelWithRFF.getBlacklist() should contain theSameElementsAs Array(age, description, genderPL, weight)
val heightIn = insights.features.find(_.featureName == age.name).get
heightIn.derivedFeatures.size shouldBe 1
heightIn.derivedFeatures.head.excluded shouldBe Some(true)

modelWithRFF.blacklistedMapKeys should contain theSameElementsAs Map(numericMap.name -> Set("Female"))
modelWithRFF.getBlacklistMapKeys() should contain theSameElementsAs Map(numericMap.name -> Set("Female"))
val mapDerivedIn = insights.features.find(_.featureName == numericMap.name).get.derivedFeatures
val droppedMapDerivedIn = mapDerivedIn.filter(_.derivedFeatureName == "Female")
mapDerivedIn.size shouldBe 3
@@ -97,13 +97,12 @@ class OpWorkflowModelReaderWriterTest


def makeDummyModel(wf: OpWorkflow): OpWorkflowModel = {
val model = new OpWorkflowModel(wf.uid, wf.parameters)
.setStages(wf.stages)
.setFeatures(wf.resultFeatures)
.setParameters(wf.parameters)
new OpWorkflowModel(wf.uid, wf.getParameters())
.setStages(wf.getStages())
.setFeatures(wf.getResultFeatures())
.setParameters(wf.getParameters())
.setRawFeatureFilterResults(rawFeatureFilterResults)
Reviewer comment (Contributor): Do we have a getRawFeatureFilterResults()?

model.setReader(wf.reader.get)
.setReader(wf.getReader())
}

def makeModelAndJson(wf: OpWorkflow): (OpWorkflowModel, JValue) = {
@@ -253,14 +252,21 @@
compareWorkflowModels(model, wfMR)
}

trait VectorizedFlow extends UIDReset {
trait OldVectorizedFlow extends UIDReset {
val cat = Seq(gender, boarded, height, age, description).transmogrify()
val catHead = cat.map[Real](v => Real(v.value.toArray.headOption))
val wf = new OpWorkflow()
.setParameters(workflowParams)
.setResultFeatures(catHead)
}

trait VectorizedFlow extends UIDReset {
val catHead = rawFeatures.transmogrify().map[Real](v => Real(v.value.toArray.headOption))
val wf = new OpWorkflow()
.setParameters(workflowParams)
.setResultFeatures(catHead)
}

it should "load workflow model with vectorized feature" in new VectorizedFlow {
wf.setReader(dataReader)
val wfM = wf.train()
@@ -269,48 +275,56 @@
compareWorkflowModels(wfMR, wfM)
}

it should "save a workflow model that has a RawFeatureFilter" in new VectorizedFlow {
wf.withRawFeatureFilter(Some(dataReader), None, minFillRate = 0.8)
it should "save a workflow model that has a RawFeatureFilter with correct blacklists" in new VectorizedFlow {
wf.withRawFeatureFilter(trainingReader = Some(dataReader), scoringReader = Some(simpleReader),
bins = 10, minFillRate = 0.1, maxFillDifference = 0.1, maxFillRatioDiff = 2,
maxJSDivergence = 0.2, maxCorrelation = 0.9, minScoringRows = 0
)
val wfM = wf.train()
wfM.save(saveFlowPathStable)
wf.getBlacklist().map(_.name) should contain theSameElementsAs Array("age", "description")
wf.getBlacklist().map(_.name) should contain theSameElementsAs
Array("age", "boarded", "description", "gender", "height", "weight")
wf.getBlacklistMapKeys() shouldBe
Map("booleanMap" -> Set("Male"), "stringMap" -> Set("Male"), "numericMap" -> Set("Male"))

val wfMR = wf.loadModel(saveFlowPathStable)
wfMR.getBlacklist().map(_.name) should contain theSameElementsAs Array("age", "description")
compareWorkflowModels(wfM, wfMR)
}

it should "load a workflow model that has a RawFeatureFilter and a different workflow" in new VectorizedFlow {
val wfM = wf.loadModel(saveFlowPathStable)
wf.getResultFeatures().head.name shouldBe wfM.getResultFeatures().head.name
wf.getResultFeatures().head.history().originFeatures should contain theSameElementsAs
Array("age", "boarded", "description", "gender", "height")
Array("age", "boarded", "booleanMap", "description", "gender", "height", "numericMap",
"stringMap", "survived", "weight")
wfM.getResultFeatures().head.history().originFeatures should contain theSameElementsAs
Array("boarded", "gender", "height")
wfM.getBlacklist().map(_.name) should contain theSameElementsAs Array("age", "description")
Array("booleanMap", "numericMap", "stringMap", "survived")
wfM.getBlacklist().map(_.name) should contain theSameElementsAs
Array("age", "boarded", "description", "gender", "height", "weight")
}

it should "load model and allow copying it" in new VectorizedFlow {
val wfM = wf.loadModel(saveFlowPathStable)
val wfM = wf.loadModel(saveFlowPathStable).setReader(dataReader)
val copy = wfM.copy()
copy.uid shouldBe wfM.uid
copy.trainingParams.toString shouldBe wfM.trainingParams.toString
copy.isWorkflowCV shouldBe wfM.isWorkflowCV
copy.reader shouldBe wfM.reader
copy.resultFeatures shouldBe wfM.resultFeatures
copy.rawFeatures shouldBe wfM.rawFeatures
copy.blacklistedFeatures shouldBe wfM.blacklistedFeatures
copy.blacklistedMapKeys shouldBe wfM.blacklistedMapKeys
copy.rawFeatureFilterResults shouldBe wfM.rawFeatureFilterResults
copy.stages.map(_.uid) shouldBe wfM.stages.map(_.uid)
copy.parameters.toString shouldBe wfM.parameters.toString
copy.getReader() shouldBe wfM.getReader()
copy.getResultFeatures() shouldBe wfM.getResultFeatures()
copy.getRawFeatures() shouldBe wfM.getRawFeatures()
copy.getBlacklist() shouldBe wfM.getBlacklist()
copy.getBlacklistMapKeys() shouldBe wfM.getBlacklistMapKeys()
copy.getRawFeatureFilterResults() shouldBe wfM.getRawFeatureFilterResults()
copy.getStages().map(_.uid) shouldBe wfM.getStages().map(_.uid)
copy.getParameters().toString shouldBe wfM.getParameters().toString
}

it should "be able to load a old version of a saved model" in new VectorizedFlow {
it should "be able to load a old version of a saved model" in new OldVectorizedFlow {
val wfM = wf.loadModel("src/test/resources/OldModelVersion")
wfM.getBlacklist().isEmpty shouldBe true
}

it should "be able to load a old version of a saved model (v0.5.1)" in new VectorizedFlow {
it should "be able to load a old version of a saved model (v0.5.1)" in new OldVectorizedFlow {
// note: in these old models, raw feature filter config will be set to the config defaults
// but we never re-initialize raw feature filter when loading a model (only scoring, no training)
val wfM = wf.loadModel("src/test/resources/OldModelVersion_0_5_1")
@@ -347,22 +361,24 @@

def compareWorkflows(wf1: OpWorkflow, wf2: OpWorkflow): Unit = {
wf1.uid shouldBe wf2.uid
compareParams(wf1.parameters, wf2.parameters)
compareFeatures(wf1.resultFeatures, wf2.resultFeatures)
compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures)
compareFeatures(wf1.rawFeatures, wf2.rawFeatures)
compareStages(wf1.stages, wf2.stages)
compareParams(wf1.getParameters(), wf2.getParameters())
compareFeatures(wf1.getResultFeatures(), wf2.getResultFeatures())
compareFeatures(wf1.getBlacklist(), wf2.getBlacklist())
compareFeatures(wf1.getRawFeatures(), wf2.getRawFeatures())
compareStages(wf1.getStages(), wf2.getStages())
wf1.getBlacklistMapKeys() shouldBe wf2.getBlacklistMapKeys()
RawFeatureFilterResultsComparison.compare(wf1.getRawFeatureFilterResults(), wf2.getRawFeatureFilterResults())
}

def compareWorkflowModels(wf1: OpWorkflowModel, wf2: OpWorkflowModel): Unit = {
wf1.uid shouldBe wf2.uid
compareParams(wf1.trainingParams, wf2.trainingParams)
compareParams(wf1.parameters, wf2.parameters)
compareFeatures(wf1.resultFeatures, wf2.resultFeatures)
compareFeatures(wf1.blacklistedFeatures, wf2.blacklistedFeatures)
compareFeatures(wf1.rawFeatures, wf2.rawFeatures)
compareStages(wf1.stages, wf2.stages)
compareParams(wf1.getParameters(), wf2.getParameters())
compareFeatures(wf1.getResultFeatures(), wf2.getResultFeatures())
compareFeatures(wf1.getBlacklist(), wf2.getBlacklist())
compareFeatures(wf1.getRawFeatures(), wf2.getRawFeatures())
compareStages(wf1.getStages(), wf2.getStages())
wf1.getBlacklistMapKeys() shouldBe wf2.getBlacklistMapKeys()
RawFeatureFilterResultsComparison.compare(wf1.getRawFeatureFilterResults(), wf2.getRawFeatureFilterResults())
}
