Skip to content

Commit

Permalink
Aggregated LOCOs of SmartTextVectorizer outputs (#308)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelweilsalesforce authored and tovbinm committed Jun 3, 2019
1 parent 154e188 commit cc84919
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import org.apache.spark.ml.Model
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.{IntParam, Param}

import scala.collection.mutable.PriorityQueue
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* Creates record level insights for model predictions. Takes the model to explain as a constructor argument.
Expand All @@ -55,6 +56,7 @@ import scala.collection.mutable.PriorityQueue
* derived features based on the LOCO insight. For MultiClassification, the value is from the predicted class
* (i.e. the class having the highest probability)
* - If Abs, returns at most topK elements : the topK derived features having highest absolute value of LOCO score.
*
* @param model model instance that you wish to explain
* @param uid uid for instance
*/
Expand All @@ -71,20 +73,15 @@ class RecordInsightsLOCO[T <: Model[T]]
)

/**
* Sets the `topK` param.
*
* @param value new value of `topK`
* @return this instance, so that setters can be chained
*/
def setTopK(value: Int): this.type = set(topK, value)

/**
* @return the current value of the `topK` param
*/
def getTopK: Int = $(topK)

// `topK` is 20 unless explicitly set
setDefault(topK -> 20)

// String param holding the selection strategy; the setter below stores the strategy's `entryName`
final val topKStrategy = new Param[String](parent = this, name = "topKStrategy",
doc = "Whether returning topK based on absolute value or topK positives and negatives. For MultiClassification," +
" the value is from the predicted class (i.e. the class having the highest probability)"
)

/**
* Sets the `topKStrategy` param from a [[TopKStrategy]] entry (stored as its `entryName`).
*
* @param strategy strategy to use
* @return this instance, so that setters can be chained
*/
def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName)

/**
* @return the [[TopKStrategy]] entry whose name matches the stored param value
*/
def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy))

// The strategy is `TopKStrategy.Abs` unless explicitly set
setDefault(topKStrategy, TopKStrategy.Abs.entryName)

private val modelApply = model match {
Expand All @@ -93,107 +90,186 @@ class RecordInsightsLOCO[T <: Model[T]]
case m => toOPUnchecked(m).transformFn
}
// Constant label value handed to `modelApply` together with the feature vector
private val labelDummy = RealNN(0.0)
// Column histories read from the vector metadata of the first input (`in1`) in the input schema
private lazy val histories = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory()
// JSON rendering of each column history, in the same order as `histories`
private lazy val featureInfo = histories.map(_.toJson(false))

/**
* Names of the feature types whose derived features have their LOCO results aggregated.
* `textTypes` holds the plain text types and `textMapTypes` the text map types.
*/
private val textTypes =
Set(FeatureType.typeName[Text], FeatureType.typeName[TextArea], FeatureType.typeName[TextList])
private val textMapTypes =
Set(FeatureType.typeName[TextMap], FeatureType.typeName[TextAreaMap])

private lazy val featureInfo = OpVectorMetadata(getInputSchema()(in1.name)).getColumnHistory().map(_.toJson(false))
// Sorted, de-duplicated indices of the columns whose parent feature type is one of the
// text or text map types (i.e. features derived from Text(Map)Vectorizer)
private lazy val textFeatureIndices = {
  val aggregatedTypes = textTypes ++ textMapTypes
  histories
    .collect { case h if h.parentFeatureType.exists(aggregatedTypes.contains) => h.index }
    .distinct
    .sorted
}

private def computeDiffs(i: Int, oldInd: Int, oldVal: Double, featureArray: Array[(Int, Double)], featureSize: Int,
baseScore: Array[Double]): Array[Double] = {
featureArray.update(i, (oldInd, 0))
val score = modelApply(labelDummy, OPVector(Vectors.sparse(featureSize, featureArray))).score
/**
 * Computes the leave-one-out score difference for a single entry of the feature array.
 *
 * The entry at position `i` is temporarily replaced by a zero value, the model is scored on the
 * resulting sparse vector, and the entry is restored before returning.
 *
 * @param i            position of the entry inside `featureArray`
 * @param oldInd       vector index stored at that position
 * @param oldVal       value stored at that position, written back after scoring
 * @param featureArray (index, value) pairs of the vector; mutated in place and restored
 * @param featureSize  size of the sparse vector to build
 * @param baseScore    score of the unmodified vector
 * @return element-wise `baseScore - score` (as many elements as the shorter of the two arrays)
 */
private def computeDiffs
(
  i: Int,
  oldInd: Int,
  oldVal: Double,
  featureArray: Array[(Int, Double)],
  featureSize: Int,
  baseScore: Array[Double]
): Array[Double] = {
  // Leave the feature out by zeroing its value
  featureArray(i) = (oldInd, 0.0)
  val perturbed = Vectors.sparse(featureSize, featureArray).toOPVector
  val perturbedScore = modelApply(labelDummy, perturbed).score
  val delta = baseScore.zip(perturbedScore).map { case (base, changed) => base - changed }
  // Put the original value back so the next call sees the unmodified array
  featureArray(i) = (oldInd, oldVal)
  delta
}

private def returnTopPosNeg(filledSize: Int, featureArray: Array[(Int, Double)], featureSize: Int,
baseScore: Array[Double], k: Int, indexToExamine: Int): Seq[(Int, Double, Array[Double])] = {
// Heap that will contain the top K positive LOCO values
val positiveMaxHeap = PriorityQueue.empty(MinScore)
// Heap that will contain the top K negative LOCO values
val negativeMaxHeap = PriorityQueue.empty(MaxScore)
// for each element of the feature vector != 0.0
// Size of positive heap
var positiveCount = 0
// Size of negative heap
var negativeCount = 0
var i = 0
while (i < filledSize) {
/**
 * Element-wise sum of two arrays; the shorter array is treated as if padded with 0.0.
 *
 * @param left  first array
 * @param right second array
 * @return array whose length is the larger of the two input lengths
 */
private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] =
  Array.tabulate(math.max(left.length, right.length)) { idx =>
    val l = if (idx < left.length) left(idx) else 0.0
    val r = if (idx < right.length) right(idx) else 0.0
    l + r
  }

/**
* Computes the LOCO diffs for every entry of `featureArray` and collects them in a `MinMaxHeap(k)`.
*
* Entries whose index is in `textFeatureIndices` and whose history has empty indicator and descriptor
* values are not enqueued individually: their diffs are summed per parent feature name in an aggregation
* map, and each aggregate is enqueued once, divided by the number of entries that contributed to it.
*
* NOTE(review): this span appears to interleave removed-side lines of the commit diff (the ones using
* `positiveMaxHeap`, `negativeMaxHeap`, `positiveCount`, `negativeCount`, `i += 1`, `topPositive`,
* `topNegative`) with the added ones - confirm against the actual file before changing any logic here.
*
* @param featureArray   (index, value) pairs to examine
* @param featureSize    size of the feature vector, forwarded to `computeDiffs`
* @param baseScore      score of the unmodified vector, forwarded to `computeDiffs`
* @param k              capacity passed to the `MinMaxHeap`
* @param indexToExamine position in each diff array whose value is used as the LOCO value
* @return everything left in the heap after all entries and aggregates were enqueued
*/
private def returnTopPosNeg
(
featureArray: Array[(Int, Double)],
featureSize: Int,
baseScore: Array[Double],
k: Int,
indexToExamine: Int
): Seq[LOCOValue] = {

val minMaxHeap = new MinMaxHeap(k)
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]
for {i <- featureArray.indices} {
val (oldInd, oldVal) = featureArray(i)
val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore)
val max = diffToExamine(indexToExamine)
val history = histories(oldInd)

if (max > 0.0) { // if positive LOCO then add it to positive heap
positiveMaxHeap.enqueue((i, max, diffToExamine))
positiveCount += 1
if (positiveCount > k) { // remove the lowest element if the heap size goes from 5 to 6
positiveMaxHeap.dequeue()
// Let's check the indicator value and descriptor value
// If those values are empty, the field is likely to be a derived text feature (hashing tf output)
if (textFeatureIndices.contains(oldInd) && history.indicatorValue.isEmpty && history.descriptorValue.isEmpty) {
// Name of the field
val rawName = history.parentFeatureType match {
case s if s.exists(textMapTypes.contains) => {
val grouping = history.grouping
history.parentFeatureOrigins.headOption.map(_ + "_" + grouping.getOrElse(""))
}
case s if s.exists(textTypes.contains) => history.parentFeatureOrigins.headOption
case s => throw new Error(s"type should be Text or TextMap, here ${s.mkString(",")}")
}
// Update the aggregation map
for {name <- rawName} {
val key = name
val (indices, array) = aggregationMap.getOrElse(key, (Array.empty[Int], Array.empty[Double]))
aggregationMap.update(key, (indices :+ i, sumArrays(array, diffToExamine)))
}
} else if (max < 0.0) { // if negative LOCO then add it to negative heap
negativeMaxHeap.enqueue((i, max, diffToExamine))
negativeCount += 1
if (negativeCount > k) { // remove the highest element if the heap size goes from 5 to 6
negativeMaxHeap.dequeue()
} // Not keeping LOCOs with value 0
} else {
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
i += 1
}
val topPositive = positiveMaxHeap.dequeueAll
val topNegative = negativeMaxHeap.dequeueAll
(topPositive ++ topNegative)

// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}

/**
* Builds the record insights for one feature vector.
*
* Scores the vector with the model, gathers the active (index, value) entries of its sparse form plus the
* text feature indices that are not active (with value 0.0), picks the score position to examine from the
* number of scores, delegates to `returnTopPosNeg`, orders the result according to the configured
* `TopKStrategy`, and converts each kept entry to text with `RecordInsightsParser.insightToText`.
*
* Throws a RuntimeException when the model returns an empty score array.
*
* NOTE(review): this span appears to interleave removed-side lines of the commit diff with the added ones
* (two function headers, two `featureArray` and `topPosNeg` definitions, tuple-based and
* `LOCOValue`-based patterns) - confirm against the actual file before changing any logic here.
*/
override def transformFn: OPVector => TextMap = (features) => {
override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score

// TODO sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val featureArray = featuresSparse.indices.zip(featuresSparse.values)
val filledSize = featureArray.length
val res = ArrayBuffer.empty[(Int, Double)]
featuresSparse.foreachActive((i, v) => res += i -> v)
// Besides non 0 values, we want to check the text features as well
textFeatureIndices.foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0)
val featureArray = res.toArray
val featureSize = featuresSparse.size

val k = $(topK)
// Index where to examine the difference in the prediction vector
val indexToExamine = baseScore.length match {
case 0 => throw new RuntimeException("model does not produce scores for insights")
case 1 => 0
case 2 => 1
case n if (n > 2) => baseResult.prediction.toInt
// For MultiClassification, the value is from the predicted class (i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
}
val topPosNeg = returnTopPosNeg(filledSize, featureArray, featureSize, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case (_, v, _) => -math.abs(v) }.take(k)
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case (_, v, _) => -v }
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

top.map { case (k, _, v) => RecordInsightsParser.insightToText(featureInfo(featureArray(k)._1), v) }
.toMap.toTextMap
top.map { case LOCOValue(i, _, diffs) =>
RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs)
}.toMap.toTextMap
}

}

/**
 * Keeps at most `k` strictly positive and at most `k` strictly negative LOCO values.
 *
 * Positive values live in a queue ordered by [[MinScore]] and negative values in a queue ordered
 * by [[MaxScore]]; whenever a queue grows beyond `k` elements its head is dequeued.
 *
 * @param k number of values to keep on each side
 */
private class MinMaxHeap(k: Int) {

  // Queue of the retained positive LOCO values
  private val positives = mutable.PriorityQueue.empty(MinScore)

  // Queue of the retained negative LOCO values
  private val negatives = mutable.PriorityQueue.empty(MaxScore)

  // Adds the value to the given queue and drops the queue's head once it holds more than k elements
  private def pushBounded(heap: mutable.PriorityQueue[LOCOValue], loco: LOCOValue): Unit = {
    heap.enqueue(loco)
    if (heap.length > k) heap.dequeue()
  }

  /**
   * Offers a LOCO value to the heap. Values that are neither greater nor less than 0.0 are ignored.
   *
   * @param loco value to add
   */
  def enqueue(loco: LOCOValue): Unit =
    if (loco.value > 0.0) pushBounded(positives, loco)
    else if (loco.value < 0.0) pushBounded(negatives, loco)

  /**
   * Empties both queues.
   *
   * @return the retained positive values followed by the retained negative values
   */
  def dequeueAll: Seq[LOCOValue] = positives.dequeueAll ++ negatives.dequeueAll

}

/**
* LOCO value container
*
* @param i position of the examined entry in the feature array handed to `returnTopPosNeg`
*          (for aggregated text features, the position of the first contributing entry)
* @param value the element of `diffs` at the examined score index; this is what [[MinScore]] and
*              [[MaxScore]] compare
* @param diffs score differences for this entry, one element per score
*
* NOTE(review): `diffs` is an Array, so the generated case class equality compares it by reference.
*/
private case class LOCOValue(i: Int, value: Double, diffs: Array[Double])

/**
* Ordering of the heap that removes lowest score.
* The comparison swaps its operands (`y` compared to `x`), i.e. it is the reverse of the natural
* ordering on the value, so the element with the lowest value ranks highest.
*
* NOTE(review): the tuple-based header below looks like the removed side of the commit diff and the
* `LOCOValue`-based one like the added side - confirm against the actual file.
*/
private object MinScore extends Ordering[(Int, Double, Array[Double])] {
def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int =
y._2 compare x._2
private object MinScore extends Ordering[LOCOValue] {
def compare(x: LOCOValue, y: LOCOValue): Int = y.value compare x.value
}

/**
* Ordering of the heap that removes highest score.
* The comparison is the natural ordering on the value, so the element with the highest value
* ranks highest.
*
* NOTE(review): the tuple-based header below looks like the removed side of the commit diff and the
* `LOCOValue`-based one like the added side - confirm against the actual file.
*/
private object MaxScore extends Ordering[(Int, Double, Array[Double])] {
def compare(x: (Int, Double, Array[Double]), y: (Int, Double, Array[Double])): Int =
x._2 compare y._2
private object MaxScore extends Ordering[LOCOValue] {
def compare(x: LOCOValue, y: LOCOValue): Int = x.value compare y.value
}

/**
* Strategy used to select which LOCO insights are returned.
*
* @param name descriptive label of the strategy. NOTE(review): the visible code stores and looks up
*             strategies through `entryName` / `withName`, not through this field - confirm whether
*             `name` is read elsewhere.
*/
sealed abstract class TopKStrategy(val name: String) extends EnumEntry with Serializable

/**
* Available strategies.
*/
object TopKStrategy extends Enum[TopKStrategy] {
val values = findValues

// Rank by absolute LOCO value
case object Abs extends TopKStrategy("abs")

// Rank positive and negative LOCO values separately
case object PositiveNegative extends TopKStrategy("positive and negative")

}

0 comments on commit cc84919

Please sign in to comment.