diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala
index 3cf80b0996..a74c22d59f 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala
@@ -37,18 +37,26 @@ import com.salesforce.op.stages.impl.selector.SelectedModel
 import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel
 import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._
 import com.salesforce.op.utils.spark.OpVectorMetadata
+import enumeratum.{Enum, EnumEntry}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.ml.Model
 import org.apache.spark.ml.linalg.Vectors
-import org.apache.spark.ml.param.IntParam
+import org.apache.spark.ml.param.{IntParam, Param}
 
 import scala.collection.mutable.PriorityQueue
 
 /**
  * Creates record level insights for model predictions. Takes the model to explain as a constructor argument.
  * The input feature is the feature vector fed into the model.
- * @param model model instance that you wish to explain
- * @param uid uid for instance
+ *
+ * The map's contents differ depending on the value of the topKStrategy param (only for Binary Classification
+ * and Regression):
+ * - If PositiveNegative, returns at most 2 * topK elements: the topK most positive and the topK most negative
+ *   derived features based on the LOCO insight. For MultiClassification, the value is from the predicted class
+ *   (i.e. the class having the highest probability).
+ * - If Abs, returns at most topK elements: the topK derived features having the highest absolute value of LOCO score.
+ * @param model model instance that you wish to explain
+ * @param uid uid for instance
  */
 @Experimental
 class RecordInsightsLOCO[T <: Model[T]]
@@ -61,10 +69,24 @@ class RecordInsightsLOCO[T <: Model[T]]
     parent = this, name = "topK",
     doc = "Number of insights to keep for each record"
   )
+
   def setTopK(value: Int): this.type = set(topK, value)
+
   def getTopK: Int = $(topK)
+
   setDefault(topK -> 20)
 
+  final val topKStrategy = new Param[String](parent = this, name = "topKStrategy",
+    doc = "Whether to return the topK insights by absolute value or the topK positives and negatives. For" +
+      " MultiClassification, the value is from the predicted class (i.e. the class having the highest probability)"
+  )
+
+  def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName)
+
+  def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy))
+
+  setDefault(topKStrategy, TopKStrategy.Abs.entryName)
+
   private val modelApply = model match {
     case m: SelectedModel => m.transformFn
     case m: OpPredictorWrapperModel[_] => m.transformFn
@@ -74,9 +96,55 @@ class RecordInsightsLOCO[T <: Model[T]]
 
   private lazy val featureInfo = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory().map(_.toJson(false))
 
+  private def computeDiffs(i: Int, oldInd: Int, oldVal: Double, featureArray: Array[(Int, Double)], featureSize: Int,
+    baseScore: Array[Double]): Array[Double] = {
+    featureArray.update(i, (oldInd, 0.0))
+    val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score
+    val diffs = baseScore.zip(score).map { case (b, s) => b - s }
+    featureArray.update(i, (oldInd, oldVal))
+    diffs
+  }
+
+  private def returnTopPosNeg(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int,
+    baseScore: Array[Double], k: Int, indexToExamine: Int): Seq[(Int, Double, Array[Double])] = {
+    // Heap that will contain the top K positive LOCO values
+    val positiveMaxHeap = PriorityQueue.empty(MinScore)
+    // Heap that will contain the top K negative LOCO values
+    val negativeMaxHeap = PriorityQueue.empty(MaxScore)
+    // Size of the positive heap
+    var positiveCount = 0
+    // Size of the negative heap
+    var negativeCount = 0
+    // Compute the LOCO diffs for each non-zero element of the feature vector
+    var i = 0
+    while (i < filledSize) {
+      val (oldInd, oldVal) = featureArray(i)
+      val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore)
+      val max = diffToExamine(indexToExamine)
+
+      if (max > 0.0) { // if the LOCO value is positive, add it to the positive heap
+        positiveMaxHeap.enqueue((i, max, diffToExamine))
+        positiveCount += 1
+        if (positiveCount > k) { // evict the lowest element once the heap size exceeds k
+          positiveMaxHeap.dequeue()
+        }
+      } else if (max < 0.0) { // if the LOCO value is negative, add it to the negative heap
+        negativeMaxHeap.enqueue((i, max, diffToExamine))
+        negativeCount += 1
+        if (negativeCount > k) { // evict the highest element once the heap size exceeds k
+          negativeMaxHeap.dequeue()
+        }
+      } // LOCO values equal to 0.0 are not kept
+      i += 1
+    }
+    val topPositive = positiveMaxHeap.dequeueAll
+    val topNegative = negativeMaxHeap.dequeueAll
+    topPositive ++ topNegative
+  }
+
   override def transformFn: OPVector => TextMap = (features) => {
-    val baseScore = modelApply(labelDummy, features).score
-    val maxHeap = PriorityQueue.empty(MinScore)
+    val baseResult = modelApply(labelDummy, features)
+    val baseScore = baseResult.score
 
     // TODO sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
     val featuresSparse = features.value.toSparse
@@ -85,27 +153,47 @@ class RecordInsightsLOCO[T <: Model[T]]
     val featureSize = featuresSparse.size
 
     val k = $(topK)
-    var i = 0
-    while (i < filledSize) {
-      val (oldInd, oldVal) = featureArray(i)
-      featureArray.update(i, (oldInd, 0))
-      val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score
-      val diffs = baseScore.zip(score).map{ case (b, s) => b - s }
-      val max = diffs.maxBy(math.abs)
-      maxHeap.enqueue((i, max, diffs))
-      if (i >= k) maxHeap.dequeue()
-      featureArray.update(i, (oldInd, oldVal))
-      i += 1
+    val indexToExamine = baseScore.length match {
+      case 0 => throw new RuntimeException("model does not produce scores for insights")
+      case 1 => 0
+      case 2 => 1
+      case n if n > 2 => baseResult.prediction.toInt
     }
+    val topPosNeg = returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, k, indexToExamine)
+    val top = getTopKStrategy match {
+      case TopKStrategy.Abs => topPosNeg.sortBy { case (_, v, _) => -math.abs(v) }.take(k)
+      case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case (_, v, _) => -v }
+    }
-    val top = maxHeap.dequeueAll
-    top.map{ case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) }
+    top.map { case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) }
       .toMap.toTextMap
   }
 }
 
-private[insights] object MinScore extends Ordering[(Int, Double, Array[Double])] {
+/**
+ * Ordering of the heap that removes the lowest score
+ */
+private object MinScore extends Ordering[(Int, Double, Array[Double])] {
+  def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int =
+    y._2 compare x._2
+}
+
+/**
+ * Ordering of the heap that removes the highest score
+ */
+private object MaxScore extends Ordering[(Int, Double, Array[Double])] {
   def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int =
-    math.abs(y._2) compare math.abs(x._2)
+    x._2 compare y._2
+}
+
+sealed abstract class TopKStrategy(val name: String) extends EnumEntry with Serializable
+
+object TopKStrategy extends Enum[TopKStrategy] {
+  val values = findValues
+
+  case object Abs extends TopKStrategy("abs")
+
+  case object PositiveNegative extends TopKStrategy("positive and negative")
+
 }
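Taken together, the changes above add a second selection strategy to the LOCO insights. A minimal usage sketch (illustrative only: `model`, `featureVector`, and `scoredData` are hypothetical placeholders, not part of this diff; the constructor, setters, and enum are the ones introduced above):

```scala
import com.salesforce.op.stages.impl.insights.{RecordInsightsLOCO, TopKStrategy}

// `model` is an already-fitted model and `featureVector` is the OPVector
// feature it consumes -- both are hypothetical placeholders for this sketch.
val insightsTransformer = new RecordInsightsLOCO(model)
  .setInput(featureVector)
  .setTopK(5)                                     // at most 5 insights per heap
  .setTopKStrategy(TopKStrategy.PositiveNegative) // so up to 5 positive + 5 negative

val insights = insightsTransformer.transform(scoredData)
```

The defaults (topK = 20, TopKStrategy.Abs) mean callers that don't touch the new param still get a single top-K list ranked by absolute LOCO score.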
SanityCheckDataTest("a", 32, 5.5, 0, 0.85, 0.5, 0), + SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.2, 0, 0.1), + SanityCheckDataTest("a", 32, 5.0, 0, 0.99, 0.5, 0), + SanityCheckDataTest("b", 32, 4.0, 0, 0.0, 0, 0.1), + SanityCheckDataTest("a", 32, 6.0, 1, 0.7, 0.5, 0), + SanityCheckDataTest("a", 32, 5.5, 0, 0.8, 0.5, 0), + SanityCheckDataTest("b", 32, 5.4, 1, 0.1, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), + SanityCheckDataTest("a", 32, 5.0, 0, 1, 0.5, 0), + SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), + SanityCheckDataTest("a", 32, 6.0, 1, 0.9, 0.5, 0), + SanityCheckDataTest("a", 32, 5.5, 0, 1, 0.5, 0), + SanityCheckDataTest("b", 32, 5.4, 1, 0.2, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.3, 0, 0.1), + SanityCheckDataTest("a", 32, 5.0, 0, 0.6, 0.5, 0), + SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), + SanityCheckDataTest("a", 32, 6.0, 0, 0.9, 0.5, 0), + SanityCheckDataTest("a", 32, 5.5, 0, 1, 0.5, 0), + SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.3, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), + SanityCheckDataTest("b", 32, 5.4, 1, 0.4, 0, 0.1) + ).map(data => + ( + data.name.toText, + data.height_null.toRealNN, + Seq(data.age, data.height, data.isBlueEyed, data.gender, data.testFeatNegCor).toOPVector + ) + ) + // scalastyle:on Spec[RecordInsightsLOCO[_]] should "work with randomly generated features and binary logistic regression" in { val features = RandomVector.sparse(RandomReal.normal(), 40).limit(1000) val labels = RandomIntegral.integrals(0, 2).limit(1000).map(_.value.get.toRealNN) @@ -63,6 +99,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { // val model = sparkModel.getSparkMlStage().get val insightsTransformer = new RecordInsightsLOCO(sparkModel).setInput(f1) val insights = insightsTransformer.transform(dfWithMeta).collect(insightsTransformer.getOutput()) + insights.foreach(_.value.size shouldBe 20) val parsed = insights.map(RecordInsightsParser.parseInsights) parsed.map( _.count{ case (_, v) => v.exists(_._1 == 1) } shouldBe 20 ) // number insights per pred column @@ -78,6 +115,7 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { val sparkModel = new OpRandomForestClassifier().setInput(l1r, f1).fit(df) val insightsTransformer = new RecordInsightsLOCO(sparkModel).setInput(f1).setTopK(2) + val insights = insightsTransformer.transform(dfWithMeta).collect(insightsTransformer.getOutput()) insights.foreach(_.value.size shouldBe 2) val parsed = insights.map(RecordInsightsParser.parseInsights) @@ -120,60 +158,40 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext { } it should "return the most predictive features" in { - - // scalastyle:off - val data = Seq( // name, age, height, height_null, isBlueEyed, gender, testFeatNegCor - SanityCheckDataTest("a", 32, 5.0, 0, 0.9, 0.5, 0), - SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), - SanityCheckDataTest("a", 32, 6.0, 0, 0.8, 0.5, 0), - SanityCheckDataTest("a", 32, 5.5, 0, 0.85, 0.5, 0), - SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.2, 0, 0.1), - SanityCheckDataTest("a", 32, 5.0, 0, 0.99, 0.5, 0), - SanityCheckDataTest("b", 32, 4.0, 0, 0.0, 0, 0.1), - SanityCheckDataTest("a", 32, 6.0, 1, 0.7, 0.5, 0), - SanityCheckDataTest("a", 32, 5.5, 0, 0.8, 0.5, 0), - SanityCheckDataTest("b", 32, 5.4, 1, 0.1, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), - 
SanityCheckDataTest("a", 32, 5.0, 0, 1, 0.5, 0), - SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), - SanityCheckDataTest("a", 32, 6.0, 1, 0.9, 0.5, 0), - SanityCheckDataTest("a", 32, 5.5, 0, 1, 0.5, 0), - SanityCheckDataTest("b", 32, 5.4, 1, 0.2, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.3, 0, 0.1), - SanityCheckDataTest("a", 32, 5.0, 0, 0.6, 0.5, 0), - SanityCheckDataTest("b", 32, 4.0, 1, 0.1, 0, 0.1), - SanityCheckDataTest("a", 32, 6.0, 0, 0.9, 0.5, 0), - SanityCheckDataTest("a", 32, 5.5, 0, 1, 0.5, 0), - SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.3, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.05, 0, 0.1), - SanityCheckDataTest("b", 32, 5.4, 1, 0.4, 0, 0.1) - ).map(data => - ( - data.name.toText, - data.height_null.toRealNN, - Seq(data.age, data.height, data.isBlueEyed, data.gender, data.testFeatNegCor).toOPVector - ) - ) - // scalastyle:on - val (testData, name, labelNoRes, featureVector) = TestFeatureBuilder("name", "label", "features", data) val label = labelNoRes.copy(isResponse = true) val testDataMeta = addMetaData(testData, "features", 5) val sparkModel = new OpLogisticRegression().setInput(label, featureVector).fit(testData) - val model = sparkModel.asInstanceOf[SparkWrapperParams[_]].getSparkMlStage().get - .asInstanceOf[LogisticRegressionModel] - val transformer = new RecordInsightsLOCO(model).setInput(featureVector) + val transformer = new RecordInsightsLOCO(sparkModel).setInput(featureVector) + + val insights = transformer.setTopK(1).transform(testDataMeta).collect(transformer.getOutput()) + val parsed = insights.map(RecordInsightsParser.parseInsights) + // the highest corr that value that is not zero should be the top feature + parsed.foreach { case in => + withClue(s"top features : ${in.map(_._1.columnName)}") { + Set("3_3_3_3", "1_1_1_1").contains(in.head._1.columnName) shouldBe true + // the scores should be the same but opposite in sign + math.abs(in.head._2(0)._2 + in.head._2(1)._2) < 0.00001 shouldBe true + } + } + } - val insights = transformer.setTopK(1).transform(testDataMeta) + it should "return the most predictive features when using top K Positives + top K negatives strat" in { + val (testData, name, labelNoRes, featureVector) = TestFeatureBuilder("name", "label", "features", data) + val label = labelNoRes.copy(isResponse = true) + val testDataMeta = addMetaData(testData, "features", 5) + val sparkModel = new OpLogisticRegression().setInput(label, featureVector).fit(testData) + val transformer = new RecordInsightsLOCO(sparkModel).setTopKStrategy(TopKStrategy.PositiveNegative) + .setInput(featureVector) + val insights = transformer.transform(testDataMeta) val parsed = insights.collect(name, transformer.getOutput()) .map { case (n, i) => n -> RecordInsightsParser.parseInsights(i) } - // the highest corr that value that is not zero should be the top feature - parsed.foreach { case (_, in) => Set("3_3_3_3", "1_1_1_1").contains(in.head._1.columnName) shouldBe true } - // the scores should be the same but opposite in sign - parsed.foreach { case (_, in) => math.abs(in.head._2(0)._2 + in.head._2(1)._2) < 0.00001 shouldBe true } + parsed.foreach { case (_, in) => + withClue(s"top features : ${in.map(_._1.columnName)}") { + in.head._1.columnName == "1_1_1_1" || in.last._1.columnName == "3_3_3_3" shouldBe true + } + } } it should "return the most predictive features for data generated with a strong relation to the label" in { @@ -218,9 +236,7 @@ class RecordInsightsLOCOTest extends FlatSpec with 
     // while currency can have either two (if it's null since the currency column will be filled with the mean) or just
     // one if it's not null.
     parsed.length shouldBe numRows
-    parsed.foreach(m => if (m.keySet.count(_.columnName.contains("currency_NullIndicatorValue")) > 0) {
-      m.size shouldBe 4
-    } else m.size shouldBe 3)
+    parsed.foreach(m => m.size <= 4 shouldBe true)
 
     // Want to check the average contribution strengths for each picklist response and compare them to the
     // average contribution strengths of the other features. We should have a very high contribution when choices
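For reviewers unfamiliar with LOCO (leave-one-covariate-out), here is a self-contained toy sketch of the idea behind computeDiffs and the PositiveNegative selection, using plain arrays instead of Spark vectors. None of these names exist in the codebase; `score` stands in for a fitted model's scoring function:

```scala
// Toy LOCO: zero out one feature at a time and record how much the score changes.
def loco(score: Array[Double] => Double, x: Array[Double]): Seq[(Int, Double)] = {
  val base = score(x)
  x.indices.map { i =>
    val saved = x(i)
    x(i) = 0.0                  // leave feature i "out"
    val diff = base - score(x)  // positive diff => feature i pushed the score up
    x(i) = saved                // restore before the next iteration
    i -> diff
  }
}

val k = 2
val diffs = loco(_.sum, Array(0.3, -0.2, 0.0, 0.5))
// PositiveNegative-style selection: top k positive and top k negative diffs,
// dropping diffs of exactly 0, then sorted descending as in the transformer.
val pos = diffs.filter(_._2 > 0.0).sortBy { case (_, d) => -d }.take(k)
val neg = diffs.filter(_._2 < 0.0).sortBy { case (_, d) => d }.take(k)
val top = (pos ++ neg).sortBy { case (_, d) => -d }
// ordering: (3, ~0.5), (0, ~0.3), (1, ~-0.2); feature 2 is dropped as a zero diff
```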