From 84e2387acdd793f5eec23793506d67c5820fd469 Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Wed, 3 Apr 2019 09:42:55 -0700 Subject: [PATCH 01/14] TopK Strategy Param --- .../impl/insights/RecordInsightsLOCO.scala | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 3cf80b0996..1814f77c78 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -37,10 +37,11 @@ import com.salesforce.op.stages.impl.selector.SelectedModel import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._ import com.salesforce.op.utils.spark.OpVectorMetadata +import enumeratum.{Enum, EnumEntry} import org.apache.spark.annotation.Experimental import org.apache.spark.ml.Model import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.ml.param.IntParam +import org.apache.spark.ml.param.{IntParam, Param} import scala.collection.mutable.PriorityQueue @@ -64,6 +65,13 @@ class RecordInsightsLOCO[T <: Model[T]] def setTopK(value: Int): this.type = set(topK, value) def getTopK: Int = $(topK) setDefault(topK -> 20) + + final val topKStrategy = new Param[String](parent = this, name = "topKStrategy", + doc = "Whether returning topK based on absolute value or topKpositive and negatives" + ) + def setTopKStrategy(strat: TopKStrategy): this.type = set(topKStrategy, strat.entryName) + + setDefault(topKStrategy, TopKStrategy.Abs.entryName) private val modelApply = model match { case m: SelectedModel => m.transformFn @@ -109,3 +117,14 @@ private[insights] object MinScore extends Ordering[(Int, Double, Array[Double])] def compare(x: (Int, Double, Array[Double]), y: 
(Int, Double, Array[Double])): Int = math.abs(y._2) compare math.abs(x._2) } + +sealed abstract class TopKStrategy(val name: String) extends EnumEntry with Serializable + +object TopKStrategy extends Enum[TopKStrategy] { + val values = findValues + + case object Abs extends TopKStrategy("abs") + + case object PositiveNegative extends TopKStrategy("positive and negative") + +} From 9fa76e5d638d37b56927382aece2a0885839203e Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Wed, 3 Apr 2019 10:56:09 -0700 Subject: [PATCH 02/14] Changing TransformFn --- .../impl/insights/RecordInsightsLOCO.scala | 107 +++++++++++++++--- 1 file changed, 92 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 1814f77c78..a5e882492a 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -33,7 +33,7 @@ package com.salesforce.op.stages.impl.insights import com.salesforce.op.UID import com.salesforce.op.features.types._ import com.salesforce.op.stages.base.unary.UnaryTransformer -import com.salesforce.op.stages.impl.selector.SelectedModel +import com.salesforce.op.stages.impl.selector.{ProblemType, SelectedModel} import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._ import com.salesforce.op.utils.spark.OpVectorMetadata @@ -65,7 +65,7 @@ class RecordInsightsLOCO[T <: Model[T]] def setTopK(value: Int): this.type = set(topK, value) def getTopK: Int = $(topK) setDefault(topK -> 20) - + final val topKStrategy = new Param[String](parent = this, name = "topKStrategy", doc = "Whether returning topK based on absolute value or topKpositive and negatives" ) @@ -82,6 +82,22 @@ class RecordInsightsLOCO[T <: 
Model[T]] private lazy val featureInfo = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory().map(_.toJson(false)) + private val vectorDummy = Array.fill(featureInfo.length)(0.0).toOPVector + + private val problemType = modelApply(labelDummy, vectorDummy).score.length match { + case 0 => ProblemType.Unknown + case 1 => ProblemType.Regression + case 2 => ProblemType.BinaryClassification + case n if(n > 2) => ProblemType.MultiClassification + } + + private def computeDiffs(i: Int, oldInd: Int, featureArray: Array[(Int, Double)], featureSize: Int, + baseScore: Array[Double]): Array[Double] = { + featureArray.update(i, (oldInd, 0)) + val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score + val diffs = baseScore.zip(score).map { case (b, s) => b - s } + diffs + } override def transformFn: OPVector => TextMap = (features) => { val baseScore = modelApply(labelDummy, features).score val maxHeap = PriorityQueue.empty(MinScore) @@ -94,30 +110,91 @@ class RecordInsightsLOCO[T <: Model[T]] val k = $(topK) var i = 0 - while (i < filledSize) { - val (oldInd, oldVal) = featureArray(i) - featureArray.update(i, (oldInd, 0)) - val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score - val diffs = baseScore.zip(score).map{ case (b, s) => b - s } - val max = diffs.maxBy(math.abs) - maxHeap.enqueue((i, max, diffs)) - if (i >= k) maxHeap.dequeue() - featureArray.update(i, (oldInd, oldVal)) - i += 1 + val top = $(topKStrategy) match { + case s if s == TopKStrategy.Abs.entryName => { + while (i < filledSize) { + val absoluteMaxHeap = PriorityQueue.empty(AbsScore) + var count = 0 + val (oldInd, oldVal) = featureArray(i) + featureArray.update(i, (oldInd, 0)) + val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score + val diffs = baseScore.zip(score).map { case (b, s) => b - s } + val max = diffs.maxBy(math.abs) + if (max != 0) { // Not keeping LOCOs with value 0.0 + 
absoluteMaxHeap.enqueue((i, max, diffs)) + count += 1 + if (count > k) absoluteMaxHeap.dequeue() + } + featureArray.update(i, (oldInd, oldVal)) + i += 1 + } + + val topAbs = maxHeap.dequeueAll + topAbs.sortBy { case (_, v, _) => -math.abs(v) } + + } + case s if s == TopKStrategy.PositiveNegative.entryName => { + // Heap that will contain the top K positive LOCO values + val positiveMaxHeap = PriorityQueue.empty(MinScore) + // Heap that will contain the top K negative LOCO values + val negativeMaxHeap = PriorityQueue.empty(MaxScore) + // for each element of the feature vector != 0.0 + // Size of positive heap + var positiveCount = 0 + // Size of negative heap + var negativeCount = 0 + for {i <- 0 until filledSize} { + val (oldInd, oldVal) = featureArray(i) + val diffs = computeDiffs(i, oldInd, featureArray, featureSize, baseScore) + val max = if (problemType == ProblemType.Regression) diffs(0) else diffs(1) + + if (max > 0.0) { // if positive LOCO then add it to positive heap + positiveMaxHeap.enqueue((i, max, diffs)) + positiveCount += 1 + if (positiveCount > k) { // remove the lowest element if the heap size goes from 5 to 6 + positiveMaxHeap.dequeue() + } + } else if (max < 0.0) { // if negative LOCO then add it to negative heap + negativeMaxHeap.enqueue((i, max, diffs)) + negativeCount += 1 + if (negativeCount > k) { // remove the highest element if the heap size goes from 5 to 6 + negativeMaxHeap.dequeue() + } // Not keeping LOCOs with value 0 + } + featureArray.update(i, (oldInd, oldVal)) + } + val topPositive = positiveMaxHeap.dequeueAll + val topNegative = negativeMaxHeap.dequeueAll + (topPositive ++ topNegative).sortBy { case (_, v, _) => -v } + } } - - val top = maxHeap.dequeueAll top.map{ case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) } .toMap.toTextMap } } -private[insights] object MinScore extends Ordering[(Int, Double, Array[Double])] { +private[insights] object AbsScore extends Ordering[(Int, Double, 
Array[Double])] { def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = math.abs(y._2) compare math.abs(x._2) } + /** + * Ordering of the heap that removes lowest score + */ + private object MinScore extends Ordering[(Int, Double, Array[Double])] { + def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = + y._2 compare x._2 + } + + /** + * Ordering of the heap that removes highest score + */ + private object MaxScore extends Ordering[(Int, Double, Array[Double])] { + def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = + x._2 compare y._2 + } + sealed abstract class TopKStrategy(val name: String) extends EnumEntry with Serializable object TopKStrategy extends Enum[TopKStrategy] { From 9bce44f0bfe81255f2cfca02e342a6aa606794f0 Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Wed, 3 Apr 2019 12:31:16 -0700 Subject: [PATCH 03/14] Fix tests --- .../impl/insights/RecordInsightsLOCO.scala | 57 +++++++++++-------- .../insights/RecordInsightsLOCOTest.scala | 4 +- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index a5e882492a..dc936c21a9 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -48,8 +48,9 @@ import scala.collection.mutable.PriorityQueue /** * Creates record level insights for model predictions. Takes the model to explain as a constructor argument. * The input feature is the feature vector fed into the model. 
- * @param model model instance that you wish to explain - * @param uid uid for instance + * + * @param model model instance that you wish to explain + * @param uid uid for instance */ @Experimental class RecordInsightsLOCO[T <: Model[T]] @@ -62,13 +63,17 @@ class RecordInsightsLOCO[T <: Model[T]] parent = this, name = "topK", doc = "Number of insights to keep for each record" ) + def setTopK(value: Int): this.type = set(topK, value) + def getTopK: Int = $(topK) + setDefault(topK -> 20) final val topKStrategy = new Param[String](parent = this, name = "topKStrategy", doc = "Whether returning topK based on absolute value or topKpositive and negatives" ) + def setTopKStrategy(strat: TopKStrategy): this.type = set(topKStrategy, strat.entryName) setDefault(topKStrategy, TopKStrategy.Abs.entryName) @@ -82,13 +87,13 @@ class RecordInsightsLOCO[T <: Model[T]] private lazy val featureInfo = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory().map(_.toJson(false)) - private val vectorDummy = Array.fill(featureInfo.length)(0.0).toOPVector + private lazy val vectorDummy = Array.fill(featureInfo.length)(0.0).toOPVector - private val problemType = modelApply(labelDummy, vectorDummy).score.length match { + private lazy val problemType = modelApply(labelDummy, vectorDummy).score.length match { case 0 => ProblemType.Unknown case 1 => ProblemType.Regression case 2 => ProblemType.BinaryClassification - case n if(n > 2) => ProblemType.MultiClassification + case n if (n > 2) => ProblemType.MultiClassification } private def computeDiffs(i: Int, oldInd: Int, featureArray: Array[(Int, Double)], featureSize: Int, @@ -98,9 +103,9 @@ class RecordInsightsLOCO[T <: Model[T]] val diffs = baseScore.zip(score).map { case (b, s) => b - s } diffs } + override def transformFn: OPVector => TextMap = (features) => { val baseScore = modelApply(labelDummy, features).score - val maxHeap = PriorityQueue.empty(MinScore) // TODO sparse implementation only works if changing values to zero - use 
dense vector to test effect of zeros val featuresSparse = features.value.toSparse @@ -112,9 +117,9 @@ class RecordInsightsLOCO[T <: Model[T]] var i = 0 val top = $(topKStrategy) match { case s if s == TopKStrategy.Abs.entryName => { + val absoluteMaxHeap = PriorityQueue.empty(AbsScore) + var count = 0 while (i < filledSize) { - val absoluteMaxHeap = PriorityQueue.empty(AbsScore) - var count = 0 val (oldInd, oldVal) = featureArray(i) featureArray.update(i, (oldInd, 0)) val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score @@ -129,7 +134,9 @@ class RecordInsightsLOCO[T <: Model[T]] i += 1 } - val topAbs = maxHeap.dequeueAll + val topAbs = absoluteMaxHeap.dequeueAll + + topAbs.sortBy { case (_, v, _) => -math.abs(v) } } @@ -168,7 +175,9 @@ class RecordInsightsLOCO[T <: Model[T]] (topPositive ++ topNegative).sortBy { case (_, v, _) => -v } } } - top.map{ case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) } + + + top.map { case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) } .toMap.toTextMap } } @@ -179,21 +188,21 @@ private[insights] object AbsScore extends Ordering[(Int, Double, Array[Double])] math.abs(y._2) compare math.abs(x._2) } - /** - * Ordering of the heap that removes lowest score - */ - private object MinScore extends Ordering[(Int, Double, Array[Double])] { - def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = - y._2 compare x._2 - } +/** + * Ordering of the heap that removes lowest score + */ +private object MinScore extends Ordering[(Int, Double, Array[Double])] { + def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = + y._2 compare x._2 +} - /** - * Ordering of the heap that removes highest score - */ - private object MaxScore extends Ordering[(Int, Double, Array[Double])] { - def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = - x._2 compare 
y._2 - } +/** + * Ordering of the heap that removes highest score + */ +private object MaxScore extends Ordering[(Int, Double, Array[Double])] { + def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = + x._2 compare y._2 +} sealed abstract class TopKStrategy(val name: String) extends EnumEntry with Serializable diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index d73dae904e..2d34898f1f 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -218,9 +218,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { // while currency can have either two (if it's null since the currency column will be filled with the mean) or just // one if it's not null. parsed.length shouldBe numRows - parsed.foreach(m => if (m.keySet.count(_.columnName.contains("currency_NullIndicatorValue")) > 0) { - m.size shouldBe 4 - } else m.size shouldBe 3) + parsed.foreach(m => m.size <= 4 shouldBe true) // Want to check the average contribution strengths for each picklist response and compare them to the // average contribution strengths of the other features. 
We should have a very high contribution when choices From 128f62ab9d8b0c27c7fe213c05ccc2f34c01b661 Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Wed, 3 Apr 2019 13:47:14 -0700 Subject: [PATCH 04/14] Add more tests --- .../impl/insights/RecordInsightsLOCO.scala | 17 ++- .../insights/RecordInsightsLOCOTest.scala | 114 +++++++++++------- 2 files changed, 82 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index dc936c21a9..13bc4a287e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -49,6 +49,11 @@ import scala.collection.mutable.PriorityQueue * Creates record level insights for model predictions. Takes the model to explain as a constructor argument. * The input feature is the feature vector fed into the model. * + * The map's contents are different regarding the value of the topKStrategy param (only for Binary Classification + * and Regression) : + * - If PositiveNegative, returns at most 2 * topK elements : the topK most positive and the topK most negative + * derived features based on the LOCO insight. + * - If Abs, returns at most topK elements : the topK derived features having highest absolute value of LOCO score. * @param model model instance that you wish to explain * @param uid uid for instance */ @@ -71,7 +76,8 @@ class RecordInsightsLOCO[T <: Model[T]] setDefault(topK -> 20) final val topKStrategy = new Param[String](parent = this, name = "topKStrategy", - doc = "Whether returning topK based on absolute value or topKpositive and negatives" + doc = "Whether returning topK based on absolute value or topK positives and negatives. Only for Binary " + + "Classification and Regression." 
) def setTopKStrategy(strat: TopKStrategy): this.type = set(topKStrategy, strat.entryName) @@ -89,11 +95,14 @@ class RecordInsightsLOCO[T <: Model[T]] private lazy val vectorDummy = Array.fill(featureInfo.length)(0.0).toOPVector - private lazy val problemType = modelApply(labelDummy, vectorDummy).score.length match { + private[insights] lazy val problemType = modelApply(labelDummy, vectorDummy).score.length match { case 0 => ProblemType.Unknown case 1 => ProblemType.Regression case 2 => ProblemType.BinaryClassification - case n if (n > 2) => ProblemType.MultiClassification + case n if (n > 2) => { + log.info("MultiClassification Problem : Top K LOCOs by absolute value") + ProblemType.MultiClassification + } } private def computeDiffs(i: Int, oldInd: Int, featureArray: Array[(Int, Double)], featureSize: Int, @@ -116,7 +125,7 @@ class RecordInsightsLOCO[T <: Model[T]] val k = $(topK) var i = 0 val top = $(topKStrategy) match { - case s if s == TopKStrategy.Abs.entryName => { + case s if s == TopKStrategy.Abs.entryName || problemType == ProblemType.MultiClassification => { val absoluteMaxHeap = PriorityQueue.empty(AbsScore) var count = 0 while (i < filledSize) { diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 2d34898f1f..44cfbcb6a6 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -33,14 +33,15 @@ package com.salesforce.op.stages.impl.insights import com.salesforce.op.{FeatureHistory, OpWorkflow} import com.salesforce.op.features.types._ import com.salesforce.op.stages.impl.classification.{OpLogisticRegression, OpRandomForestClassifier} +import com.salesforce.op.stages.impl.insights.TopKStrategy.PositiveNegative import 
com.salesforce.op.stages.impl.preparators.SanityCheckDataTest import com.salesforce.op.stages.impl.regression.OpLinearRegression +import com.salesforce.op.stages.impl.selector.ProblemType import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext} import com.salesforce.op.testkit.{RandomIntegral, RandomReal, RandomText, RandomVector} import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} -import org.apache.spark.ml.classification.LogisticRegressionModel import org.apache.spark.ml.regression.LinearRegressionModel import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType @@ -52,6 +53,42 @@ import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { + // scalastyle:off + val data = Seq( // name, age, height, height_null, isBlueEyed, gender, testFeatNegCor + SanityCheckDataTest("a", 32, 5.0, 0, 0.9, 0.5, 0), + SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), + SanityCheckDataTest("a", 32, 6.0, 0, 0.8, 0.5, 0), + SanityCheckDataTest("a", 32, 5.5, 0, 0.85, 0.5, 0), + SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.2, 0, 0.1), + SanityCheckDataTest("a", 32, 5.0, 0, 0.99, 0.5, 0), + SanityCheckDataTest("b", 32, 4.0, 0, 0.0, 0, 0.1), + SanityCheckDataTest("a", 32, 6.0, 1, 0.7, 0.5, 0), + SanityCheckDataTest("a", 32, 5.5, 0, 0.8, 0.5, 0), + SanityCheckDataTest("b", 32, 5.4, 1, 0.1, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), + SanityCheckDataTest("a", 32, 5.0, 0, 1, 0.5, 0), + SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), + SanityCheckDataTest("a", 32, 6.0, 1, 0.9, 0.5, 0), + SanityCheckDataTest("a", 32, 5.5, 0, 1, 0.5, 0), + SanityCheckDataTest("b", 32, 5.4, 1, 0.2, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.3, 0, 0.1), + 
SanityCheckDataTest("a", 32, 5.0, 0, 0.6, 0.5, 0), + SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), + SanityCheckDataTest("a", 32, 6.0, 0, 0.9, 0.5, 0), + SanityCheckDataTest("a", 32, 5.5, 0, 1, 0.5, 0), + SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.3, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.4, 0, 0.1) + ).map(data => + ( + data.name.toText, + data.height_null.toRealNN, + Seq(data.age, data.height, data.isBlueEyed, data.gender, data.testFeatNegCor).toOPVector + ) + ) + // scalastyle:on Spec[RecordInsightsLOCO[_]] should "work with randomly generated features and binary logistic regression" in { val features = RandomVector.sparse(RandomReal.normal(), 40).limit(1000) val labels = RandomIntegral.integrals(0, 2).limit(1000).map(_.value.get.toRealNN) @@ -63,6 +100,8 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { // val model = sparkModel.getSparkMlStage().get val insightsTransformer = new RecordInsightsLOCO(sparkModel).setInput(f1) val insights = insightsTransformer.transform(dfWithMeta).collect(insightsTransformer.getOutput()) + insightsTransformer.problemType shouldBe ProblemType.BinaryClassification + insights.foreach(_.value.size shouldBe 20) val parsed = insights.map(RecordInsightsParser.parseInsights) parsed.map( _.count{ case (_, v) => v.exists(_._1 == 1) } shouldBe 20 ) // number insights per pred column @@ -78,7 +117,10 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { val sparkModel = new OpRandomForestClassifier().setInput(l1r, f1).fit(df) val insightsTransformer = new RecordInsightsLOCO(sparkModel).setInput(f1).setTopK(2) + .setTopKStrategy(PositiveNegative) + val insights = insightsTransformer.transform(dfWithMeta).collect(insightsTransformer.getOutput()) + insightsTransformer.problemType shouldBe ProblemType.MultiClassification insights.foreach(_.value.size shouldBe 2) val parsed = 
insights.map(RecordInsightsParser.parseInsights) parsed.map( _.count{ case (_, v) => v.exists(_._1 == 5) } shouldBe 0 ) // no 6th column of insights @@ -102,6 +144,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { val insightsTransformer = new RecordInsightsLOCO(model).setInput(f1) val insights = insightsTransformer.transform(dfWithMeta).collect(insightsTransformer.getOutput()) + insightsTransformer.problemType shouldBe ProblemType.Regression insights.foreach(_.value.size shouldBe 20) val parsed = insights.map(RecordInsightsParser.parseInsights) parsed.foreach(_.values.foreach(i => i.foreach(v => v._1 shouldBe 0))) // has only one pred column @@ -121,59 +164,40 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { it should "return the most predictive features" in { - // scalastyle:off - val data = Seq( // name, age, height, height_null, isBlueEyed, gender, testFeatNegCor - SanityCheckDataTest("a", 32, 5.0, 0, 0.9, 0.5, 0), - SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), - SanityCheckDataTest("a", 32, 6.0, 0, 0.8, 0.5, 0), - SanityCheckDataTest("a", 32, 5.5, 0, 0.85, 0.5, 0), - SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.2, 0, 0.1), - SanityCheckDataTest("a", 32, 5.0, 0, 0.99, 0.5, 0), - SanityCheckDataTest("b", 32, 4.0, 0, 0.0, 0, 0.1), - SanityCheckDataTest("a", 32, 6.0, 1, 0.7, 0.5, 0), - SanityCheckDataTest("a", 32, 5.5, 0, 0.8, 0.5, 0), - SanityCheckDataTest("b", 32, 5.4, 1, 0.1, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), - SanityCheckDataTest("a", 32, 5.0, 0, 1, 0.5, 0), - SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), - SanityCheckDataTest("a", 32, 6.0, 1, 0.9, 0.5, 0), - SanityCheckDataTest("a", 32, 5.5, 0, 1, 0.5, 0), - SanityCheckDataTest("b", 32, 5.4, 1, 0.2, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.3, 0, 0.1), - SanityCheckDataTest("a", 32, 5.0, 0, 0.6, 0.5, 0), - SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), - 
SanityCheckDataTest("a", 32, 6.0, 0, 0.9, 0.5, 0), - SanityCheckDataTest("a", 32, 5.5, 0, 1, 0.5, 0), - SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.3, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.4, 0, 0.1) - ).map(data => - ( - data.name.toText, - data.height_null.toRealNN, - Seq(data.age, data.height, data.isBlueEyed, data.gender, data.testFeatNegCor).toOPVector - ) - ) - // scalastyle:on + val (testData, name, labelNoRes, featureVector) = TestFeatureBuilder("name", "label", "features", data) val label = labelNoRes.copy(isResponse = true) val testDataMeta = addMetaData(testData, "features", 5) val sparkModel = new OpLogisticRegression().setInput(label, featureVector).fit(testData) - val model = sparkModel.asInstanceOf[SparkWrapperParams[_]].getSparkMlStage().get - .asInstanceOf[LogisticRegressionModel] - val transformer = new RecordInsightsLOCO(model).setInput(featureVector) - val insights = transformer.setTopK(1).transform(testDataMeta) - val parsed = insights.collect(name, transformer.getOutput()) - .map { case (n, i) => n -> RecordInsightsParser.parseInsights(i) } + val transformer = new RecordInsightsLOCO(sparkModel).setInput(featureVector) + + val insights = transformer.setTopK(1).transform(testDataMeta).collect(transformer.getOutput()) + val parsed = insights.map(RecordInsightsParser.parseInsights) // the highest corr that value that is not zero should be the top feature - parsed.foreach { case (_, in) => Set("3_3_3_3", "1_1_1_1").contains(in.head._1.columnName) shouldBe true } + parsed.foreach { case in => Set("3_3_3_3", "1_1_1_1").contains(in.head._1.columnName) shouldBe true } // the scores should be the same but opposite in sign - parsed.foreach { case (_, in) => math.abs(in.head._2(0)._2 + in.head._2(1)._2) < 0.00001 shouldBe true } + parsed.foreach { case in => math.abs(in.head._2(0)._2 + in.head._2(1)._2) < 0.00001 shouldBe true } + } + + it 
should "return the most predictive features when using top K Positives + top K negatives strat" in { + val (testData, name, labelNoRes, featureVector) = TestFeatureBuilder("name", "label", "features", data) + val label = labelNoRes.copy(isResponse = true) + val testDataMeta = addMetaData(testData, "features", 5) + val sparkModel = new OpLogisticRegression().setInput(label, featureVector).fit(testData) + + + val transformer = new RecordInsightsLOCO(sparkModel).setTopKStrategy(TopKStrategy.PositiveNegative) + .setInput(featureVector) + + val insights = transformer.transform(testDataMeta) + val parsed = insights.collect(name, transformer.getOutput()) + .map { case (n, i) => n -> RecordInsightsParser.parseInsights(i) } + parsed.foreach { case (_, in) => + in.head._1.columnName == "1_1_1_1" || in.last._1.columnName == "3_3_3_3" shouldBe true + } } it should "return the most predictive features for data generated with a strong relation to the label" in { From 97bb4131493d6c34694e22d09b26f734193d5337 Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Thu, 4 Apr 2019 10:16:42 -0700 Subject: [PATCH 05/14] strat -> strategy --- .../salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 13bc4a287e..1ca932879e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -80,7 +80,7 @@ class RecordInsightsLOCO[T <: Model[T]] "Classification and Regression." 
) - def setTopKStrategy(strat: TopKStrategy): this.type = set(topKStrategy, strat.entryName) + def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName) setDefault(topKStrategy, TopKStrategy.Abs.entryName) From 0cec1c99c04d3d6ab4d6aa77a32d9f1e74a7a5db Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Thu, 4 Apr 2019 10:44:50 -0700 Subject: [PATCH 06/14] getTopKStrategy + method for each strat --- .../impl/insights/RecordInsightsLOCO.scala | 130 ++++++++++-------- 1 file changed, 70 insertions(+), 60 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 1ca932879e..dc375b328e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -82,6 +82,8 @@ class RecordInsightsLOCO[T <: Model[T]] def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName) + def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy)) + setDefault(topKStrategy, TopKStrategy.Abs.entryName) private val modelApply = model match { @@ -113,6 +115,69 @@ class RecordInsightsLOCO[T <: Model[T]] diffs } + private def returnTopAbs(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int, + baseScore: Array[Double], k: Int): Seq[(Int, Double, Array[Double])] = { + var i = 0 + val absoluteMaxHeap = PriorityQueue.empty(AbsScore) + var count = 0 + while (i < filledSize) { + val (oldInd, oldVal) = featureArray(i) + featureArray.update(i, (oldInd, 0)) + val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score + val diffs = baseScore.zip(score).map { case (b, s) => b - s } + val max = diffs.maxBy(math.abs) + if (max != 0) { // Not keeping LOCOs with value 0.0 + absoluteMaxHeap.enqueue((i, max, 
diffs)) + count += 1 + if (count > k) absoluteMaxHeap.dequeue() + } + featureArray.update(i, (oldInd, oldVal)) + i += 1 + } + + val topAbs = absoluteMaxHeap.dequeueAll + + + topAbs.sortBy { case (_, v, _) => -math.abs(v) } + + } + + private def returnTopPosNeg(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int, + baseScore: Array[Double], k: Int): Seq[(Int, Double, Array[Double])] = { + // Heap that will contain the top K positive LOCO values + val positiveMaxHeap = PriorityQueue.empty(MinScore) + // Heap that will contain the top K negative LOCO values + val negativeMaxHeap = PriorityQueue.empty(MaxScore) + // for each element of the feature vector != 0.0 + // Size of positive heap + var positiveCount = 0 + // Size of negative heap + var negativeCount = 0 + for {i <- 0 until filledSize} { + val (oldInd, oldVal) = featureArray(i) + val diffs = computeDiffs(i, oldInd, featureArray, featureSize, baseScore) + val max = if (problemType == ProblemType.Regression) diffs(0) else diffs(1) + + if (max > 0.0) { // if positive LOCO then add it to positive heap + positiveMaxHeap.enqueue((i, max, diffs)) + positiveCount += 1 + if (positiveCount > k) { // remove the lowest element if the heap size goes from 5 to 6 + positiveMaxHeap.dequeue() + } + } else if (max < 0.0) { // if negative LOCO then add it to negative heap + negativeMaxHeap.enqueue((i, max, diffs)) + negativeCount += 1 + if (negativeCount > k) { // remove the highest element if the heap size goes from 5 to 6 + negativeMaxHeap.dequeue() + } // Not keeping LOCOs with value 0 + } + featureArray.update(i, (oldInd, oldVal)) + } + val topPositive = positiveMaxHeap.dequeueAll + val topNegative = negativeMaxHeap.dequeueAll + (topPositive ++ topNegative).sortBy { case (_, v, _) => -v } + } + override def transformFn: OPVector => TextMap = (features) => { val baseScore = modelApply(labelDummy, features).score @@ -123,66 +188,11 @@ class RecordInsightsLOCO[T <: Model[T]] val featureSize = featuresSparse.size 
val k = $(topK) - var i = 0 - val top = $(topKStrategy) match { - case s if s == TopKStrategy.Abs.entryName || problemType == ProblemType.MultiClassification => { - val absoluteMaxHeap = PriorityQueue.empty(AbsScore) - var count = 0 - while (i < filledSize) { - val (oldInd, oldVal) = featureArray(i) - featureArray.update(i, (oldInd, 0)) - val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score - val diffs = baseScore.zip(score).map { case (b, s) => b - s } - val max = diffs.maxBy(math.abs) - if (max != 0) { // Not keeping LOCOs with value 0.0 - absoluteMaxHeap.enqueue((i, max, diffs)) - count += 1 - if (count > k) absoluteMaxHeap.dequeue() - } - featureArray.update(i, (oldInd, oldVal)) - i += 1 - } - - val topAbs = absoluteMaxHeap.dequeueAll - - - topAbs.sortBy { case (_, v, _) => -math.abs(v) } - - } - case s if s == TopKStrategy.PositiveNegative.entryName => { - // Heap that will contain the top K positive LOCO values - val positiveMaxHeap = PriorityQueue.empty(MinScore) - // Heap that will contain the top K negative LOCO values - val negativeMaxHeap = PriorityQueue.empty(MaxScore) - // for each element of the feature vector != 0.0 - // Size of positive heap - var positiveCount = 0 - // Size of negative heap - var negativeCount = 0 - for {i <- 0 until filledSize} { - val (oldInd, oldVal) = featureArray(i) - val diffs = computeDiffs(i, oldInd, featureArray, featureSize, baseScore) - val max = if (problemType == ProblemType.Regression) diffs(0) else diffs(1) - - if (max > 0.0) { // if positive LOCO then add it to positive heap - positiveMaxHeap.enqueue((i, max, diffs)) - positiveCount += 1 - if (positiveCount > k) { // remove the lowest element if the heap size goes from 5 to 6 - positiveMaxHeap.dequeue() - } - } else if (max < 0.0) { // if negative LOCO then add it to negative heap - negativeMaxHeap.enqueue((i, max, diffs)) - negativeCount += 1 - if (negativeCount > k) { // remove the highest element if the heap size goes from 
5 to 6 - negativeMaxHeap.dequeue() - } // Not keeping LOCOs with value 0 - } - featureArray.update(i, (oldInd, oldVal)) - } - val topPositive = positiveMaxHeap.dequeueAll - val topNegative = negativeMaxHeap.dequeueAll - (topPositive ++ topNegative).sortBy { case (_, v, _) => -v } - } + val top = getTopKStrategy match { + case s if s == TopKStrategy.Abs || problemType == ProblemType.MultiClassification => returnTopAbs(filledSize, + featureArray, featureSize, baseScore, k) + case s if s == TopKStrategy.PositiveNegative => returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, + k) } From 756cf0c6f4ca7b5311dd343d188608d17fe7860b Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Thu, 4 Apr 2019 10:46:35 -0700 Subject: [PATCH 07/14] use while - it's faster --- .../op/stages/impl/insights/RecordInsightsLOCO.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index dc375b328e..a3df89ba2b 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -153,7 +153,8 @@ class RecordInsightsLOCO[T <: Model[T]] var positiveCount = 0 // Size of negative heap var negativeCount = 0 - for {i <- 0 until filledSize} { + var i = 0 + while (i < filledSize) { val (oldInd, oldVal) = featureArray(i) val diffs = computeDiffs(i, oldInd, featureArray, featureSize, baseScore) val max = if (problemType == ProblemType.Regression) diffs(0) else diffs(1) @@ -172,6 +173,7 @@ class RecordInsightsLOCO[T <: Model[T]] } // Not keeping LOCOs with value 0 } featureArray.update(i, (oldInd, oldVal)) + i += 1 } val topPositive = positiveMaxHeap.dequeueAll val topNegative = negativeMaxHeap.dequeueAll From f3de950fa07c66dc6cf8bbb76650e98371adbeb3 Mon Sep 17 00:00:00 2001 From: 
mweilsalesforce Date: Thu, 4 Apr 2019 11:01:28 -0700 Subject: [PATCH 08/14] Removing redundant lines --- .../op/stages/impl/insights/RecordInsightsLOCOTest.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 44cfbcb6a6..a000d9d049 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -171,7 +171,6 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { val testDataMeta = addMetaData(testData, "features", 5) val sparkModel = new OpLogisticRegression().setInput(label, featureVector).fit(testData) - val transformer = new RecordInsightsLOCO(sparkModel).setInput(featureVector) val insights = transformer.setTopK(1).transform(testDataMeta).collect(transformer.getOutput()) @@ -187,11 +186,8 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { val label = labelNoRes.copy(isResponse = true) val testDataMeta = addMetaData(testData, "features", 5) val sparkModel = new OpLogisticRegression().setInput(label, featureVector).fit(testData) - - val transformer = new RecordInsightsLOCO(sparkModel).setTopKStrategy(TopKStrategy.PositiveNegative) .setInput(featureVector) - val insights = transformer.transform(testDataMeta) val parsed = insights.collect(name, transformer.getOutput()) .map { case (n, i) => n -> RecordInsightsParser.parseInsights(i) } From 19a9199767e972c12d9c5103c47110f3216948ae Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Thu, 4 Apr 2019 11:32:45 -0700 Subject: [PATCH 09/14] IndexToExamine --- .../impl/insights/RecordInsightsLOCO.scala | 45 +++++++++---------- .../insights/RecordInsightsLOCOTest.scala | 5 --- 2 files changed, 21 insertions(+), 29 deletions(-) diff --git 
a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index a3df89ba2b..2112188c1e 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -33,7 +33,7 @@ package com.salesforce.op.stages.impl.insights import com.salesforce.op.UID import com.salesforce.op.features.types._ import com.salesforce.op.stages.base.unary.UnaryTransformer -import com.salesforce.op.stages.impl.selector.{ProblemType, SelectedModel} +import com.salesforce.op.stages.impl.selector.SelectedModel import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._ import com.salesforce.op.utils.spark.OpVectorMetadata @@ -52,7 +52,8 @@ import scala.collection.mutable.PriorityQueue * The map's contents are different regarding the value of the topKStrategy param (only for Binary Classification * and Regression) : * - If PositiveNegative, returns at most 2 * topK elements : the topK most positive and the topK most negative - derived features based on the LOCO insight. + derived features based on the LOCO insight. For MultiClassification, the value is from the predicted class + (i.e. the class having the highest probability) * - If Abs, returns at most topK elements : the topK derived features having highest absolute value of LOCO score. * @param model model instance that you wish to explain * @param uid uid for instance @@ -76,8 +77,8 @@ class RecordInsightsLOCO[T <: Model[T]] setDefault(topK -> 20) final val topKStrategy = new Param[String](parent = this, name = "topKStrategy", - doc = "Whether returning topK based on absolute value or topK positives and negatives. Only for Binary " + - "Classification and Regression." 
+ doc = "Whether returning topK based on absolute value or topK positives and negatives. For MultiClassification," + + " the value is from the predicted class (i.e. the class having the highest probability)" ) def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName) @@ -95,18 +96,6 @@ class RecordInsightsLOCO[T <: Model[T]] private lazy val featureInfo = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory().map(_.toJson(false)) - private lazy val vectorDummy = Array.fill(featureInfo.length)(0.0).toOPVector - - private[insights] lazy val problemType = modelApply(labelDummy, vectorDummy).score.length match { - case 0 => ProblemType.Unknown - case 1 => ProblemType.Regression - case 2 => ProblemType.BinaryClassification - case n if (n > 2) => { - log.info("MultiClassification Problem : Top K LOCOs by absolute value") - ProblemType.MultiClassification - } - } - private def computeDiffs(i: Int, oldInd: Int, featureArray: Array[(Int, Double)], featureSize: Int, baseScore: Array[Double]): Array[Double] = { featureArray.update(i, (oldInd, 0)) @@ -143,7 +132,7 @@ class RecordInsightsLOCO[T <: Model[T]] } private def returnTopPosNeg(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int, - baseScore: Array[Double], k: Int): Seq[(Int, Double, Array[Double])] = { + baseScore: Array[Double], k: Int, indexToExamine: Int): Seq[(Int, Double, Array[Double])] = { // Heap that will contain the top K positive LOCO values val positiveMaxHeap = PriorityQueue.empty(MinScore) // Heap that will contain the top K negative LOCO values @@ -157,7 +146,7 @@ class RecordInsightsLOCO[T <: Model[T]] while (i < filledSize) { val (oldInd, oldVal) = featureArray(i) val diffs = computeDiffs(i, oldInd, featureArray, featureSize, baseScore) - val max = if (problemType == ProblemType.Regression) diffs(0) else diffs(1) + val max = diffs(indexToExamine) if (max > 0.0) { // if positive LOCO then add it to positive heap positiveMaxHeap.enqueue((i, 
max, diffs)) @@ -181,8 +170,9 @@ class RecordInsightsLOCO[T <: Model[T]] } override def transformFn: OPVector => TextMap = (features) => { - val baseScore = modelApply(labelDummy, features).score - + val baseResult = modelApply(labelDummy, features) + val baseScore = baseResult.score + modelApply(labelDummy, features).prediction // TODO sparse implementation only works if changing values to zero - use dense vector to test effect of zeros val featuresSparse = features.value.toSparse val featureArray = featuresSparse.indices.zip(featuresSparse.values) @@ -191,10 +181,17 @@ class RecordInsightsLOCO[T <: Model[T]] val k = $(topK) val top = getTopKStrategy match { - case s if s == TopKStrategy.Abs || problemType == ProblemType.MultiClassification => returnTopAbs(filledSize, - featureArray, featureSize, baseScore, k) - case s if s == TopKStrategy.PositiveNegative => returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, - k) + case s if s == TopKStrategy.Abs => returnTopAbs(filledSize, featureArray, featureSize, baseScore, k) + case s if s == TopKStrategy.PositiveNegative => { + val indexToExamine = baseScore.length match { + case 0 => throw new RuntimeException("model does not produce scores for insights") + case 1 => 0 + case 2 => 1 + case n if (n > 2) => baseResult.prediction.toInt + } + returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, + k, indexToExamine) + } } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index a000d9d049..6c52a189c1 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -36,7 +36,6 @@ import com.salesforce.op.stages.impl.classification.{OpLogisticRegression, OpRan import com.salesforce.op.stages.impl.insights.TopKStrategy.PositiveNegative 
import com.salesforce.op.stages.impl.preparators.SanityCheckDataTest import com.salesforce.op.stages.impl.regression.OpLinearRegression -import com.salesforce.op.stages.impl.selector.ProblemType import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext} import com.salesforce.op.testkit.{RandomIntegral, RandomReal, RandomText, RandomVector} @@ -100,7 +99,6 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { // val model = sparkModel.getSparkMlStage().get val insightsTransformer = new RecordInsightsLOCO(sparkModel).setInput(f1) val insights = insightsTransformer.transform(dfWithMeta).collect(insightsTransformer.getOutput()) - insightsTransformer.problemType shouldBe ProblemType.BinaryClassification insights.foreach(_.value.size shouldBe 20) val parsed = insights.map(RecordInsightsParser.parseInsights) @@ -117,10 +115,8 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { val sparkModel = new OpRandomForestClassifier().setInput(l1r, f1).fit(df) val insightsTransformer = new RecordInsightsLOCO(sparkModel).setInput(f1).setTopK(2) - .setTopKStrategy(PositiveNegative) val insights = insightsTransformer.transform(dfWithMeta).collect(insightsTransformer.getOutput()) - insightsTransformer.problemType shouldBe ProblemType.MultiClassification insights.foreach(_.value.size shouldBe 2) val parsed = insights.map(RecordInsightsParser.parseInsights) parsed.map( _.count{ case (_, v) => v.exists(_._1 == 5) } shouldBe 0 ) // no 6th column of insights @@ -144,7 +140,6 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { val insightsTransformer = new RecordInsightsLOCO(model).setInput(f1) val insights = insightsTransformer.transform(dfWithMeta).collect(insightsTransformer.getOutput()) - insightsTransformer.problemType shouldBe ProblemType.Regression insights.foreach(_.value.size shouldBe 20) val parsed = 
insights.map(RecordInsightsParser.parseInsights) parsed.foreach(_.values.foreach(i => i.foreach(v => v._1 shouldBe 0))) // has only one pred column From ea0069d17efce662d2f5d3cf880628723c2834fb Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Thu, 4 Apr 2019 13:21:05 -0700 Subject: [PATCH 10/14] posNeg then Absolute --- .../impl/insights/RecordInsightsLOCO.scala | 55 ++++--------------- 1 file changed, 10 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 2112188c1e..a73be73e21 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -104,33 +104,6 @@ class RecordInsightsLOCO[T <: Model[T]] diffs } - private def returnTopAbs(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int, - baseScore: Array[Double], k: Int): Seq[(Int, Double, Array[Double])] = { - var i = 0 - val absoluteMaxHeap = PriorityQueue.empty(AbsScore) - var count = 0 - while (i < filledSize) { - val (oldInd, oldVal) = featureArray(i) - featureArray.update(i, (oldInd, 0)) - val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score - val diffs = baseScore.zip(score).map { case (b, s) => b - s } - val max = diffs.maxBy(math.abs) - if (max != 0) { // Not keeping LOCOs with value 0.0 - absoluteMaxHeap.enqueue((i, max, diffs)) - count += 1 - if (count > k) absoluteMaxHeap.dequeue() - } - featureArray.update(i, (oldInd, oldVal)) - i += 1 - } - - val topAbs = absoluteMaxHeap.dequeueAll - - - topAbs.sortBy { case (_, v, _) => -math.abs(v) } - - } - private def returnTopPosNeg(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int, baseScore: Array[Double], k: Int, indexToExamine: Int): Seq[(Int, Double, Array[Double])] = { // Heap that will 
contain the top K positive LOCO values @@ -166,7 +139,7 @@ class RecordInsightsLOCO[T <: Model[T]] } val topPositive = positiveMaxHeap.dequeueAll val topNegative = negativeMaxHeap.dequeueAll - (topPositive ++ topNegative).sortBy { case (_, v, _) => -v } + (topPositive ++ topNegative) } override def transformFn: OPVector => TextMap = (features) => { @@ -180,32 +153,24 @@ class RecordInsightsLOCO[T <: Model[T]] val featureSize = featuresSparse.size val k = $(topK) + val indexToExamine = baseScore.length match { + case 0 => throw new RuntimeException("model does not produce scores for insights") + case 1 => 0 + case 2 => 1 + case n if (n > 2) => baseResult.prediction.toInt + } + val topPosNeg = returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, k, indexToExamine) val top = getTopKStrategy match { - case s if s == TopKStrategy.Abs => returnTopAbs(filledSize, featureArray, featureSize, baseScore, k) - case s if s == TopKStrategy.PositiveNegative => { - val indexToExamine = baseScore.length match { - case 0 => throw new RuntimeException("model does not produce scores for insights") - case 1 => 0 - case 2 => 1 - case n if (n > 2) => baseResult.prediction.toInt - } - returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, - k, indexToExamine) - } + case s if s == TopKStrategy.Abs => topPosNeg.sortBy { case (_, v, _) => -math.abs(v) }.take(k) + case s if s == TopKStrategy.PositiveNegative => topPosNeg.sortBy { case (_, v, _) => -v } } - top.map { case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) } .toMap.toTextMap } } -private[insights] object AbsScore extends Ordering[(Int, Double, Array[Double])] { - def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int = - math.abs(y._2) compare math.abs(x._2) -} - /** * Ordering of the heap that removes lowest score */ From fba47ef2942df30ac9f7d3f6c1465b517a31a97c Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Thu, 4 Apr 2019 13:33:06 
-0700 Subject: [PATCH 11/14] Adding withClue --- .../impl/insights/RecordInsightsLOCOTest.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala index 6c52a189c1..9d51d58b18 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala @@ -158,9 +158,6 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { } it should "return the most predictive features" in { - - - val (testData, name, labelNoRes, featureVector) = TestFeatureBuilder("name", "label", "features", data) val label = labelNoRes.copy(isResponse = true) val testDataMeta = addMetaData(testData, "features", 5) @@ -171,9 +168,13 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { val insights = transformer.setTopK(1).transform(testDataMeta).collect(transformer.getOutput()) val parsed = insights.map(RecordInsightsParser.parseInsights) // the highest corr that value that is not zero should be the top feature - parsed.foreach { case in => Set("3_3_3_3", "1_1_1_1").contains(in.head._1.columnName) shouldBe true } - // the scores should be the same but opposite in sign - parsed.foreach { case in => math.abs(in.head._2(0)._2 + in.head._2(1)._2) < 0.00001 shouldBe true } + parsed.foreach { case in => + withClue(s"top features : ${in.map(_._1.columnName)}") { + Set("3_3_3_3", "1_1_1_1").contains(in.head._1.columnName) shouldBe true + // the scores should be the same but opposite in sign + math.abs(in.head._2(0)._2 + in.head._2(1)._2) < 0.00001 shouldBe true + } + } } it should "return the most predictive features when using top K Positives + top K negatives strat" in { @@ -187,7 +188,9 @@ class RecordInsightsLOCOTest extends FlatSpec 
with TestSparkContext { val parsed = insights.collect(name, transformer.getOutput()) .map { case (n, i) => n -> RecordInsightsParser.parseInsights(i) } parsed.foreach { case (_, in) => - in.head._1.columnName == "1_1_1_1" || in.last._1.columnName == "3_3_3_3" shouldBe true + withClue(s"top features : ${in.map(_._1.columnName)}") { + in.head._1.columnName == "1_1_1_1" || in.last._1.columnName == "3_3_3_3" shouldBe true + } } } From 21a653f843018ab1adffcc026d0dde2908537626 Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Thu, 4 Apr 2019 15:21:29 -0700 Subject: [PATCH 12/14] diffToExamine --- .../op/stages/impl/insights/RecordInsightsLOCO.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index a73be73e21..5e169d70d4 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -118,17 +118,17 @@ class RecordInsightsLOCO[T <: Model[T]] var i = 0 while (i < filledSize) { val (oldInd, oldVal) = featureArray(i) - val diffs = computeDiffs(i, oldInd, featureArray, featureSize, baseScore) - val max = diffs(indexToExamine) + val diffToExamine = computeDiffs(i, oldInd, featureArray, featureSize, baseScore) + val max = diffToExamine(indexToExamine) if (max > 0.0) { // if positive LOCO then add it to positive heap - positiveMaxHeap.enqueue((i, max, diffs)) + positiveMaxHeap.enqueue((i, max, diffToExamine)) positiveCount += 1 if (positiveCount > k) { // remove the lowest element if the heap size goes from 5 to 6 positiveMaxHeap.dequeue() } } else if (max < 0.0) { // if negative LOCO then add it to negative heap - negativeMaxHeap.enqueue((i, max, diffs)) + negativeMaxHeap.enqueue((i, max, diffToExamine)) negativeCount += 1 if (negativeCount > k) { // 
remove the highest element if the heap size goes from 5 to 6 negativeMaxHeap.dequeue() From dac78ba5f42a65acf42b258cfc7c35f18dbf8647 Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Thu, 4 Apr 2019 15:35:42 -0700 Subject: [PATCH 13/14] Minor changes --- .../op/stages/impl/insights/RecordInsightsLOCO.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index 5e169d70d4..be707c13ff 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -96,11 +96,12 @@ class RecordInsightsLOCO[T <: Model[T]] private lazy val featureInfo = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory().map(_.toJson(false)) - private def computeDiffs(i: Int, oldInd: Int, featureArray: Array[(Int, Double)], featureSize: Int, + private def computeDiffs(i: Int, oldInd: Int, oldVal: Double, featureArray: Array[(Int, Double)], featureSize: Int, baseScore: Array[Double]): Array[Double] = { featureArray.update(i, (oldInd, 0)) val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score val diffs = baseScore.zip(score).map { case (b, s) => b - s } + featureArray.update(i, (oldInd, oldVal)) diffs } @@ -118,7 +119,7 @@ class RecordInsightsLOCO[T <: Model[T]] var i = 0 while (i < filledSize) { val (oldInd, oldVal) = featureArray(i) - val diffToExamine = computeDiffs(i, oldInd, featureArray, featureSize, baseScore) + val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore) val max = diffToExamine(indexToExamine) if (max > 0.0) { // if positive LOCO then add it to positive heap @@ -134,7 +135,6 @@ class RecordInsightsLOCO[T <: Model[T]] negativeMaxHeap.dequeue() } // Not keeping LOCOs with value 0 } - 
featureArray.update(i, (oldInd, oldVal)) i += 1 } val topPositive = positiveMaxHeap.dequeueAll @@ -145,7 +145,7 @@ class RecordInsightsLOCO[T <: Model[T]] override def transformFn: OPVector => TextMap = (features) => { val baseResult = modelApply(labelDummy, features) val baseScore = baseResult.score - modelApply(labelDummy, features).prediction + // TODO sparse implementation only works if changing values to zero - use dense vector to test effect of zeros val featuresSparse = features.value.toSparse val featureArray = featuresSparse.indices.zip(featuresSparse.values) From a137d61840ea7a742bb92cf5b25d63331a86273f Mon Sep 17 00:00:00 2001 From: mweilsalesforce Date: Fri, 5 Apr 2019 11:43:01 -0700 Subject: [PATCH 14/14] Nicer Pattern match --- .../op/stages/impl/insights/RecordInsightsLOCO.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala index be707c13ff..a74c22d59f 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala @@ -161,8 +161,8 @@ class RecordInsightsLOCO[T <: Model[T]] } val topPosNeg = returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, k, indexToExamine) val top = getTopKStrategy match { - case s if s == TopKStrategy.Abs => topPosNeg.sortBy { case (_, v, _) => -math.abs(v) }.take(k) - case s if s == TopKStrategy.PositiveNegative => topPosNeg.sortBy { case (_, v, _) => -v } + case TopKStrategy.Abs => topPosNeg.sortBy { case (_, v, _) => -math.abs(v) }.take(k) + case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case (_, v, _) => -v } } top.map { case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) }