Adding binaryclassification bin score evaluator #119

Merged Sep 14, 2018 · 20 commits

Changes from 3 commits
@@ -0,0 +1,170 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.salesforce.op.evaluators

import com.salesforce.op.UID
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.DoubleType
import org.slf4j.LoggerFactory
import org.apache.spark.Partitioner

/**
*
* Evaluator for binary classification that provides statistics about the predicted scores.
* It splits the scores into the specified number of bins, computes the statistics for each bin,
* and returns a BinaryClassificationBinMetrics containing:
*
* - total number of data points per bin
* - average score per bin
* - average conversion rate per bin
* - center of each bin
*
* The Brier score for the overall dataset is also computed and is the default metric.
*
* @param name           name of default metric
* @param isLargerBetter whether the metric is better when larger
* @param uid            uid for instance
* @param numBins        number of bins to split the scores into
*/
Collaborator:
  1. Perhaps rename to OpBinScoreEvaluator?
  2. This evaluator also has to be added to the Evaluators factory object and to the BinaryClassEvalMetrics enum.

Contributor Author:
This evaluator returns 5 different values. Should there be 5 factory methods, or only one for the Brier score, which is the default metric?

Collaborator:
only the Brier Score. @leahmcguire wdyt?

Collaborator:
Yes, only for the Brier score. The other metrics are there to support the Brier score; it is the only metric that could be used for optimization.
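
For readers following the thread: the Brier score here is just the mean squared difference between the predicted probability and the 0/1 label, and lower values indicate better-calibrated scores. A tiny self-contained illustration with made-up numbers (not data from this PR):

// Toy Brier score: mean of (probability - label)^2 over the dataset.
val scoreAndLabel = Seq((0.9, 1.0), (0.2, 0.0), (0.6, 1.0))
val brier = scoreAndLabel.map { case (p, y) => math.pow(p - y, 2) }.sum / scoreAndLabel.size
// (0.01 + 0.04 + 0.16) / 3 = 0.07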

private[op] class OpBinaryClassifyBinEvaluator
(
override val name: EvalMetric = OpEvaluatorNames.Binary,
override val isLargerBetter: Boolean = true,
override val uid: String = UID[BinaryClassificationBinMetrics],
Collaborator:
The UID has to use the class of the evaluator itself, i.e. UID[OpBinaryClassifyBinEvaluator].

val numBins: Int = 100
) extends OpBinaryClassificationEvaluatorBase[BinaryClassificationBinMetrics](uid = uid) {

Collaborator:
Check numBins here using require: require(numBins > 0, "numBins must be positive").

@transient private lazy val log = LoggerFactory.getLogger(this.getClass)

def getDefaultMetric: BinaryClassificationBinMetrics => Double = _.BrierScore

override def evaluateAll(data: Dataset[_]): BinaryClassificationBinMetrics = {
val labelColName = getLabelCol
val dataUse = makeDataToUse(data, labelColName)

val (rawPredictionColName, predictionColName, probabilityColName) =
(getRawPredictionCol, getPredictionValueCol, getProbabilityCol)
log.debug(
"Evaluating metrics on columns :\n label : {}\n rawPrediction : {}\n prediction : {}\n probability : {}\n",
labelColName, rawPredictionColName, predictionColName, probabilityColName
)

import dataUse.sparkSession.implicits._
val rdd = dataUse.select(predictionColName, labelColName).as[(Double, Double)].rdd

if (rdd.isEmpty()) {
log.error("The dataset is empty")
BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double])
Collaborator:
Use Seq() or Seq.empty; no need to specify the element type here.

} else {
val scoreAndLabels =
dataUse.select(col(probabilityColName), col(labelColName).cast(DoubleType)).rdd.map {
case Row(prob: Vector, label: Double) => (prob(1), label)
case Row(prob: Double, label: Double) => (prob, label)
}

if (numBins == 0) {
log.error("numBins is set to 0. Returning empty metrics")
BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double])
} else {
// Find the significant digit to which the scores need to be rounded, based on numBins.
val significantDigitToRoundOff = math.log10(numBins).toInt + 1
val scoreAndLabelsRounded = for {i <- scoreAndLabels}
Collaborator:
I think you overcomplicated this. You can simply do it in a single scoreAndLabels map/reduce pass.

Collaborator:
I think my explanation in the doc may have made this more complicated than it needs to be.
The probabilities will be between 0 and 1, so use that information to compute the bins and their centers. Make a function binFn that takes in the probability and returns the bin. Then you can simply do:
scoreAndLabels.map { case (score, label) => (binFn(score), (score, label, 1L)) }.reduceByKey(_ + _).map { case (bin, (scoreSum, labelSum, count)) => (bin, scoreSum / count, labelSum / count, count) }
(you will need to import com.twitter.algebird.Operators._ for the reduceByKey to work without specifying everything)

yield (BigDecimal(i._1).setScale(significantDigitToRoundOff,
BigDecimal.RoundingMode.HALF_UP).toDouble, (i._1, i._2))
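
Expanding the reviewer's single-pass suggestion above into a compilable sketch (binFn, its equal-width binning, and the algebird import are taken from or assumed for that comment; this is not the code in this diff):

import com.twitter.algebird.Operators._ // provides + on tuples so reduceByKey(_ + _) compiles

// Hypothetical helper: map a probability in [0, 1] to one of numBins equal-width bins.
def binFn(score: Double): Int = math.min(numBins - 1, (score * numBins).toInt)

val binStats = scoreAndLabels
  .map { case (score, label) => (binFn(score), (score, label, 1L)) }
  .reduceByKey(_ + _) // per-bin sums of (score, label, count)
  .map { case (bin, (scoreSum, labelSum, count)) =>
    (bin, scoreSum / count, labelSum / count, count) // (bin, average score, conversion rate, count)
  }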

// Create `numBins` bins and place each score in its corresponding bin.
val binnedValues = scoreAndLabelsRounded.partitionBy(new OpBinPartitioner(numBins)).values

// compute the average score per bin
val averageScore = binnedValues.mapPartitions(scores => {
val (totalScore, count) = scores.foldLeft(0.0, 0)(
(r: (Double, Int), s: (Double, Double)) => (r._1 + s._1, r._2 + 1))
Iterator(if (count == 0) 0.0 else totalScore / count)
}).collect().toSeq

// compute the average conversion rate per bin. Conversion rate is the fraction of 1.0 labels in the bin.
val averageConversionRate = binnedValues.mapPartitions(scores => {
val (totalConversion, count) = scores.foldLeft(0.0, 0)(
(r: (Double, Int), s: (Double, Double)) => (r._1 + s._2, r._2 + 1))
Iterator(if (count == 0) 0.0 else totalConversion / count)
}).collect().toSeq

// compute total number of data points in each bin.
val numberOfDataPoints = binnedValues.mapPartitions(scores => Iterator(scores.length.toLong)).collect().toSeq

// binCenters is the center point in each bin.
// e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75].
val binCenters = (for {i <- 0 to numBins} yield ((i + 0.5) / numBins)).dropRight(1)

// Brier score of the entire dataset.
val brierScore = scoreAndLabels.map { case (score, label) => math.pow((score - label), 2) }.mean()

val metrics = BinaryClassificationBinMetrics(
BrierScore = brierScore,
BinCenters = binCenters,
NumberOfDataPoints = numberOfDataPoints,
AverageScore = averageScore,
AverageConversionRate = averageConversionRate
)

log.info("Evaluated metrics: {}", metrics.toString)
metrics
}
}
}
}

// Partitioner that assigns each score to one of numPartitions bins.
class OpBinPartitioner(override val numPartitions: Int) extends Partitioner {

// Computes the bin number (0-indexed) to which the data point is assigned.
// For a score of 1.0 the computed index falls past the last bin, so cap it with math.min(numPartitions - 1, computedIndex).
def getPartition(key: Any): Int = key match {
case score: Double => math.min(numPartitions - 1, (score * numPartitions).toInt)
}
}
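
To make the capping comment above concrete, here is a small illustration of how getPartition buckets scores when numPartitions = 4 (toy values, not part of the diff):

// With 4 partitions the bins are [0, 0.25), [0.25, 0.5), [0.5, 0.75), [0.75, 1.0].
val partitioner = new OpBinPartitioner(4)
Seq(0.0, 0.24, 0.25, 0.74, 0.99, 1.0).map(partitioner.getPartition) // List(0, 0, 1, 2, 3, 3)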

/**
* Metrics computed by the binary classification bin score evaluator
*
* @param BrierScore            Brier score for the overall dataset
* @param BinCenters            center of each bin
* @param NumberOfDataPoints    total number of data points in each bin
* @param AverageScore          average score in each bin
* @param AverageConversionRate average conversion rate in each bin
*/
case class BinaryClassificationBinMetrics
(
BrierScore: Double,
Collaborator:
Names should start with a lowercase letter: brierScore.

Collaborator:
So for some reason all of our metrics do not follow this convention @tovbinm. We should figure out what we want and then make them all consistent.

BinCenters: Seq[Double],
Collaborator:
JSON annotations have to be added on the sequences. See the other binary classification eval metrics class.

NumberOfDataPoints: Seq[Long],
AverageScore: Seq[Double],
AverageConversionRate: Seq[Double]
) extends EvaluationMetrics
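
Regarding the JSON annotation comment above, a sketch of what it likely refers to, assuming the Jackson @JsonDeserialize(contentAs = ...) pattern used on Seq fields in the existing binary classification metrics class (the exact annotations there are an assumption, not quoted from the repo):

import com.fasterxml.jackson.databind.annotation.JsonDeserialize

case class BinaryClassificationBinMetrics
(
  BrierScore: Double,
  @JsonDeserialize(contentAs = classOf[java.lang.Double])
  BinCenters: Seq[Double],
  @JsonDeserialize(contentAs = classOf[java.lang.Long])
  NumberOfDataPoints: Seq[Long],
  @JsonDeserialize(contentAs = classOf[java.lang.Double])
  AverageScore: Seq[Double],
  @JsonDeserialize(contentAs = classOf[java.lang.Double])
  AverageConversionRate: Seq[Double]
) extends EvaluationMetrics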
@@ -0,0 +1,122 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.evaluators

import com.salesforce.op.test.TestSparkContext
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class OpBinaryClassifyBinEvaluatorTest extends FlatSpec with TestSparkContext {

val labelName = "label"
Collaborator:
You can create a dataframe easily using TestFeatureBuilder. That way you don't need to define these strings here and create dataframes with Spark below in the tests.
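
A rough sketch of this suggestion, assuming the TestFeatureBuilder helper accepts a Seq of feature tuples and returns the DataFrame together with the generated features (the exact overloads, and the .toRealNN syntax, are assumptions about the test utilities rather than code from this PR):

import com.salesforce.op.features.types._
import com.salesforce.op.test.TestFeatureBuilder

// Hypothetical: build a (score, label) DataFrame without hand-maintained column name strings.
val (df, scoreFeature, labelFeature) = TestFeatureBuilder(
  Seq((0.99999.toRealNN, 1.0.toRealNN), (0.005.toRealNN, 0.0.toRealNN), (0.7.toRealNN, 0.0.toRealNN))
)
// scoreFeature.name and labelFeature.name could then be passed to setPredictionCol / setLabelCol.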

val predictionLabel = "pred"

val dataset_test = Seq(
(Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0),
Collaborator:
Use Prediction type instead

(Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0),
(Map("probability_1" -> 0.00541, "probability_0" -> 0.99560, "prediction" -> 1.0), 0.0),
(Map("probability_1" -> 0.70, "probability_0" -> 0.30, "prediction" -> 1.0), 0.0),
(Map("probability_1" -> 0.001, "probability_0" -> 0.999, "prediction" -> 0.0), 0.0)
)

val dataset_skewed = Seq(
(Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0),
(Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0),
(Map("probability_1" -> 0.9987, "probability_0" -> 0.001, "prediction" -> 1.0), 1.0),
(Map("probability_1" -> 0.946, "probability_0" -> 0.0541, "prediction" -> 1.0), 1.0)
)

val emptyDataSet = Seq.empty[(Map[String, Double], Double)]

Spec[OpBinaryClassifyBinEvaluator] should "return the bin metrics" in {
Collaborator:
should "evaluate the bin metrics"

val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName)

val metrics = new OpBinaryClassifyBinEvaluator(numBins = 4)
.setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df)

BigDecimal(metrics.BrierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098
metrics.BinCenters shouldBe Seq(0.125, 0.375, 0.625, 0.875)
metrics.NumberOfDataPoints shouldBe Seq(2, 0, 1, 2)
metrics.AverageScore shouldBe Seq(0.003205, 0.0, 0.7, 0.99999)
metrics.AverageConversionRate shouldBe Seq(0.0, 0.0, 0.0, 1.0)
}

it should "return the empty bin metrics for numBins == 0" in {
Collaborator:
it should "error on invalid num of bins"

val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName)

val metrics = new OpBinaryClassifyBinEvaluator(numBins = 0)
.setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df)

metrics.BrierScore shouldBe 0.0
Collaborator:
Case classes have equals implemented, so you can simply do metrics shouldBe BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq()) here and everywhere in the tests.

metrics.BinCenters shouldBe Seq.empty[Double]
metrics.NumberOfDataPoints shouldBe Seq.empty[Long]
metrics.AverageScore shouldBe Seq.empty[Double]
metrics.AverageConversionRate shouldBe Seq.empty[Double]
}

it should "return the empty bin metrics for empty data" in {
val df = spark.createDataFrame(emptyDataSet).toDF(predictionLabel, labelName)

val metrics = new OpBinaryClassifyBinEvaluator(numBins = 10)
.setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df)

metrics.BrierScore shouldBe 0.0
metrics.BinCenters shouldBe Seq.empty[Double]
metrics.NumberOfDataPoints shouldBe Seq.empty[Long]
metrics.AverageScore shouldBe Seq.empty[Double]
metrics.AverageConversionRate shouldBe Seq.empty[Double]
}

it should "return the bin metrics for skewed data" in {
Collaborator:
it should "evaluate bin metrics for skewed data"

val df = spark.createDataFrame(dataset_skewed).toDF(predictionLabel, labelName)

val metrics = new OpBinaryClassifyBinEvaluator(numBins = 5)
.setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df)

metrics.BrierScore shouldBe 7.294225500000013E-4
metrics.BinCenters shouldBe Seq(0.1, 0.3, 0.5, 0.7, 0.9)
metrics.NumberOfDataPoints shouldBe Seq(0, 0, 0, 0, 4)
metrics.AverageScore shouldBe Seq(0.0, 0.0, 0.0, 0.0, 0.98617)
metrics.AverageConversionRate shouldBe Seq(0.0, 0.0, 0.0, 0.0, 1.0)
}

it should "return the default metric as BrierScore" in {
Collaborator:
it should "evaluate the default metric as BrierScore"

val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName)

val evaluator = new OpBinaryClassifyBinEvaluator(numBins = 4)
.setLabelCol(labelName).setPredictionCol(predictionLabel)

val brierScore = evaluator.getDefaultMetric(evaluator.evaluateAll(df))
BigDecimal(brierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098
}
}