Option to calculate LOCO for dates/texts by Leaving Out Entire Vector. #418

Merged · 21 commits · Nov 5, 2019
Changes from 11 commits
@@ -64,9 +64,32 @@ trait RecordInsightsLOCOParams extends Params {
def setTopKStrategy(strategy: TopKStrategy): this.type = set(topKStrategy, strategy.entryName)
def getTopKStrategy: TopKStrategy = TopKStrategy.withName($(topKStrategy))

final val dateAggregationStrategy = new Param[String](parent = this, name = "dateAggregationStrategy",
doc = "How the vector for each time period (HourOfDay, DayOfWeek, etc.) is aggregated: " +
"1. LeaveOutVector strategy - calculate the LOCO by leaving out the entire vector, or " +
"2. Avg strategy - calculate the LOCO for each column of the vector and then average all the LOCOs."
)
def setDateAggregationStrategy(strategy: VectorAggregationStrategy): this.type =
set(dateAggregationStrategy, strategy.entryName)
def getDateAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName(
$(dateAggregationStrategy))

final val textAggregationStrategy = new Param[String](parent = this, name = "textAggregationStrategy",
doc = "How a text vector is aggregated: " +
"1. LeaveOutVector strategy - calculate the LOCO by leaving out the entire vector, or " +
"2. Avg strategy - calculate the LOCO for each column of the vector and then average all the LOCOs."
)
def setTextAggregationStrategy(strategy: VectorAggregationStrategy): this.type =
set(textAggregationStrategy, strategy.entryName)
def getTextAggregationStrategy: VectorAggregationStrategy = VectorAggregationStrategy.withName(
$(textAggregationStrategy))
Collaborator:
Do we really want/need to expose strategies for each feature type? I imagine there are other types of features we may eventually want to control as either average or leave-out-vector, and putting in individual settings for each seems excessive... maybe one parameter to control how ALL vector-treated features are handled?

Contributor Author:
Yeah, I agree. One parameter should be enough.

Contributor Author:
done
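
For illustration, here is a rough sketch of the single unified parameter the reviewer suggests (hypothetical names; not necessarily the shape the later commits took), following the same Param pattern as the two per-type params above:

// Hypothetical sketch: one strategy parameter governing all vector-treated features.
// The names vectorAggregationStrategy / setVectorAggregationStrategy are illustrative only.
final val vectorAggregationStrategy = new Param[String](parent = this, name = "vectorAggregationStrategy",
  doc = "How multi-column vectors (text hashes, date unit circles, etc.) are aggregated: " +
    "1. LeaveOutVector - compute the LOCO by leaving out the entire vector, or " +
    "2. Avg - compute the LOCO per column and average the results."
)
def setVectorAggregationStrategy(strategy: VectorAggregationStrategy): this.type =
  set(vectorAggregationStrategy, strategy.entryName)
def getVectorAggregationStrategy: VectorAggregationStrategy =
  VectorAggregationStrategy.withName($(vectorAggregationStrategy))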



setDefault(
topK -> 20,
topKStrategy -> TopKStrategy.Abs.entryName
topKStrategy -> TopKStrategy.Abs.entryName,
dateAggregationStrategy -> VectorAggregationStrategy.Avg.entryName,
textAggregationStrategy -> VectorAggregationStrategy.Avg.entryName
)
}

@@ -113,21 +136,22 @@ class RecordInsightsLOCO[T <: Model[T]]
private val dateMapTypes =
Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap])

// Indices of features derived from hashed Text(Map)Vectorizer
private lazy val textFeatureIndices: Seq[Int] = getIndicesOfFeatureType(textTypes ++ textMapTypes,
h => h.indicatorValue.isEmpty && h.descriptorValue.isEmpty)
private lazy val textFeaturesCount: Map[String, Int] = getFeatureCount(isTextIndex)
private lazy val dateFeaturesCount: Map[String, Int] = getFeatureCount(isDateIndex)

// Indices of features derived from unit Date(Map)Vectorizer
private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes, _.descriptorValue.isDefined)
private def getFeatureCount(predicate: OpVectorColumnHistory => Boolean): Map[String, Int] = histories
.filter(predicate)
.groupBy { h => getRawFeatureName(h).get }
.mapValues(_.length).view.toMap
Collaborator:
  1. What's the point of .view here? Either add it before filter or not add it at all.
  2. Add docs.
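
A sketch of the change being requested (hypothetical; the PR's final version may differ), with the lazy view applied before the filter and a doc comment added:

/**
 * Count of derived columns per raw feature name, for histories matching the predicate.
 * Used to average per-column LOCO diffs over the full width of a text/date vector.
 */
private def getFeatureCount(predicate: OpVectorColumnHistory => Boolean): Map[String, Int] =
  histories.view
    .filter(predicate)
    .groupBy(h => getRawFeatureName(h).get)
    .mapValues(_.length)
    .toMap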


/**
* Return the indices of features derived from given types.
* @return Seq[Int]
*/
private def getIndicesOfFeatureType(types: Set[String], predicate: OpVectorColumnHistory => Boolean): Seq[Int] =
histories.collect {
case h if h.parentFeatureType.exists(types.contains) && predicate(h) => h.index
}.distinct.sorted
private def isTextIndex(h: OpVectorColumnHistory): Boolean = {
h.parentFeatureType.exists((textTypes ++ textMapTypes).contains) &&
h.indicatorValue.isEmpty && h.descriptorValue.isEmpty
}

private def isDateIndex(h: OpVectorColumnHistory): Boolean = {
h.parentFeatureType.exists((dateTypes ++ dateMapTypes).contains) && h.descriptorValue.isDefined
}

private def computeDiff
(
@@ -168,82 +192,85 @@ class RecordInsightsLOCO[T <: Model[T]]
}
}

private def aggregateDiffs(
featureSparse: SparseVector,
aggIndices: Array[(Int, Int)],
strategy: VectorAggregationStrategy,
baseScore: Array[Double],
featureCount: Int
): Array[Double] = {
Contributor Author (@sanmitra, Nov 4, 2019):
The cost of computeDiff is O(n). Note that when VectorAggregationStrategy = Avg we call computeDiff m times, while when VectorAggregationStrategy = LeaveOutVector we call computeDiff only once. So the former computation is O(n*m) and the latter is O(n),

where n is the size of the entire feature vector (i.e. the one containing all the features) and m is the size of the individual text/date feature vector being aggregated.

@tovbinm ^^

strategy match {
case VectorAggregationStrategy.Avg =>
aggIndices
.map { case (i, oldInd) => computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore) }
.foldLeft(Array.empty[Double])(sumArrays)
.map( _ / featureCount)

case VectorAggregationStrategy.LeaveOutVector =>
val copyFeatureSparse = featureSparse.copy
aggIndices.foreach { case (i, oldInd) => copyFeatureSparse.updated(i, oldInd, 0.0) }
computeDiff(copyFeatureSparse, baseScore)
}
}
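
To make the cost comparison in the comment above concrete, here is a standalone toy sketch (hypothetical code, not this class's API) that counts model evaluations under each strategy:

// Toy stand-in for the model: each call to score is one evaluation, the O(n) unit above.
var evals = 0
def score(v: Array[Double]): Double = { evals += 1; v.sum }

val vector = Array(1.0, 2.0, 3.0, 4.0) // n = 4 columns overall
val aggIndices = Array(1, 2, 3)        // one feature spanning m = 3 columns
val base = score(vector)               // base score, computed once as in the PR

// Avg: zero out each column separately -> m evaluations.
evals = 0
val avgDiff = aggIndices.map { i =>
  val copy = vector.clone
  copy(i) = 0.0
  base - score(copy)
}.sum / aggIndices.length
println(s"Avg: $evals evaluations")            // Avg: 3 evaluations

// LeaveOutVector: zero out all columns at once -> 1 evaluation, independent of m.
evals = 0
val left = vector.clone
aggIndices.foreach(i => left(i) = 0.0)
val locoDiff = base - score(left)
println(s"LeaveOutVector: $evals evaluations") // LeaveOutVector: 1 evaluations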

private def returnTopPosNeg
(
featureSparse: SparseVector,
zeroCountByFeature: Map[String, Int],
featureSize: Int,
baseScore: Array[Double],
k: Int,
indexToExamine: Int
): Seq[LOCOValue] = {
val minMaxHeap = new MinMaxHeap(k)
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]

agggregateDiffs(featureSparse, indexToExamine, minMaxHeap, aggregationMap,
baseScore)

// Aggregation map contains aggregation of Unit Circle Dates and Hashed Text Features
// Adding LOCO results from aggregation map into heaps
for {(name, (indices, ar)) <- aggregationMap} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val zeroCounts = zeroCountByFeature.get(name).getOrElse(0)
val diffToExamine = ar.map(_ / (n + zeroCounts))
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}
// Map[FeatureName, Array[(SparseVectorIndex, ActualIndex)]]
val textActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]]
val dateActiveIndices = mutable.Map.empty[String, Array[(Int, Int)]]

private def agggregateDiffs(
featureVec: SparseVector,
indexToExamine: Int,
minMaxHeap: MinMaxHeap,
aggregationMap: mutable.Map[String, (Array[Int], Array[Double])],
baseScore: Array[Double]
): Unit = {
computeDiffs(featureVec, baseScore).foreach { case (i, oldInd, diffToExamine) =>
(0 until featureSparse.size, featureSparse.indices).zipped.foreach { case (i: Int, oldInd: Int) =>
val history = histories(oldInd)
history match {
// If indicator value and descriptor value of a derived text feature are empty, then it is
// a hashing tf output. We aggregate such features for each (rawFeatureName).
case h if (textFeatureIndices ++ dateFeatureIndices).contains(oldInd) =>
case h if isTextIndex(h) => {
for {name <- getRawFeatureName(h)} {
val indices = textActiveIndices.getOrElse(name, Array.empty[(Int, Int)])
textActiveIndices.update(name, indices :+ (i, oldInd))
}
}
case h if isDateIndex(h) => {
for {name <- getRawFeatureName(h)} {
val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double]))
aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine)))
val indices = dateActiveIndices.getOrElse(name, Array.empty[(Int, Int)])
dateActiveIndices.update(name, indices :+ (i, oldInd))
}
case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
case _ => {
val diffToExamine = computeDiff(featureSparse.copy.updated(i, oldInd, 0.0), baseScore)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
}
}
}

private def computeDiffs(
featureVec: SparseVector,
baseScore: Array[Double]
) = {
(0 until featureVec.size, featureVec.indices).zipped.map { case (i, oldInd) =>
(i, oldInd, computeDiff(featureVec.copy.updated(i, oldInd, 0.0), baseScore))
// Aggregate active indices of each text feature and date feature based on their respective strategy.
textActiveIndices.foreach {
case (name, aggIndices) =>
val diffToExamine = aggregateDiffs(featureSparse, aggIndices,
getTextAggregationStrategy, baseScore, textFeaturesCount.get(name).get)
minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine)
}
dateActiveIndices.foreach {
case (name, aggIndices) =>
val diffToExamine = aggregateDiffs(featureSparse, aggIndices,
getDateAggregationStrategy, baseScore, dateFeaturesCount.get(name).get)
minMaxHeap enqueue LOCOValue(aggIndices.head._1, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}

override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score
val featureSize = features.value.size

// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val featureIndexSet = featuresSparse.indices.toSet

// Besides non 0 values, we want to check the text/date features as well
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
Contributor:
How did you manage to remove the zero val indices logic?

Contributor Author:
From the featureSparse vector we know the count of active indices, but to calculate the average LOCO for each date/text field we needed the zero-val indices logic. We used to calculate the total count of indices for a date/text feature during each transformation of an individual record/row in the transformFn function. There is no need to do this; we can calculate the total count of indices per date/text feature just once, using OpVectorColumnHistory at the global level (i.e. outside the transformFn function), which is what I did in line 139:

private lazy val textFeaturesCount: Map[String, Int] = getFeatureCount(isTextIndex)
private lazy val dateFeaturesCount: Map[String, Int] = getFeatureCount(isDateIndex) 

.filterNot(featureIndexSet.contains)

// Count zeros by feature name
val zeroCountByFeature = zeroValIndices
.groupBy(i => getRawFeatureName(histories(i)).get)
.mapValues(_.length).view.toMap

val k = $(topK)
// Index where to examine the difference in the prediction vector
@@ -254,14 +281,14 @@
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
}
val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featuresSparse, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

val allIndices = featuresSparse.indices ++ zeroValIndices
val allIndices = featuresSparse.indices
top.map { case LOCOValue(i, _, diffs) =>
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
}.toMap.toTextMap
@@ -329,3 +356,14 @@ object TopKStrategy extends Enum[TopKStrategy] {
case object Abs extends TopKStrategy("abs")
case object PositiveNegative extends TopKStrategy("positive and negative")
}


sealed abstract class VectorAggregationStrategy(val name: String) extends EnumEntry with Serializable

object VectorAggregationStrategy extends Enum[VectorAggregationStrategy] {
val values = findValues
case object LeaveOutVector extends
VectorAggregationStrategy("calculate the LOCO by leaving out the entire vector")
case object Avg extends
VectorAggregationStrategy("calculate the LOCO for each column of the vector and then average all the LOCOs")
}
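
Finally, a hypothetical usage sketch. Only the strategy setters and the enum come from this diff; the constructor argument and the setInput/setTopK calls are assumptions about the surrounding stage API:

// Hypothetical wiring of the new strategy params on a RecordInsightsLOCO stage.
val insights = new RecordInsightsLOCO(fittedModel)
  .setInput(featureVector)
  .setTopK(20)
  .setTopKStrategy(TopKStrategy.Abs)
  .setDateAggregationStrategy(VectorAggregationStrategy.Avg)
  .setTextAggregationStrategy(VectorAggregationStrategy.LeaveOutVector)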