Skip to content

Commit

Permalink
Forecast Evaluator - fixes SMAPE, adds MASE and Seasonal Error metrics (
Browse files Browse the repository at this point in the history
  • Loading branch information
wsuchy authored and tovbinm committed Jul 2, 2019
1 parent 2d49809 commit ecc7f22
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ sealed abstract class ForecastEvalMetric
/**
 * Enumeration of the metrics reported for forecasting problems.
 * Backed by enumeratum; `findValues` is a macro that collects the case objects
 * declared in this object, so member declarations must stay inside this body.
 */
object ForecastEvalMetrics extends Enum[ForecastEvalMetric] {
// All members of this enum, populated by the enumeratum `findValues` macro.
val values: Seq[ForecastEvalMetric] = findValues
// Symmetric Mean Absolute Percentage Error (see the M4 competition guide).
case object SMAPE extends ForecastEvalMetric("smape", "symmetric mean absolute percentage error")
// Mean Absolute Scaled Error: mean absolute error scaled by the seasonal error.
case object MASE extends ForecastEvalMetric("mase", "mean absolute scaled error")
// Mean absolute difference between the label series and itself shifted by one season.
case object SeasonalError extends ForecastEvalMetric("seasonalError", "seasonal error")
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,63 +39,94 @@ import com.twitter.algebird.Operators._
import com.twitter.algebird.Semigroup
import com.twitter.algebird.macros.caseclass

import scala.collection.mutable

/**
 * Instance to evaluate Forecast metrics.
 * The metrics are SMAPE, MASE and Seasonal Error.
 * Default evaluation returns SMAPE.
 *
 * See: https://www.m4.unic.ac.cy/wp-content/uploads/2018/03/M4-Competitors-Guide.pdf
 *
 * @param seasonalWindow length of the season (e.g. 7 for daily data with weekly seasonality)
 * @param maxItems       max number of items to process (default: 10 years of hourly data)
 * @param name           name of default metric
 * @param isLargerBetter is metric better if larger
 * @param uid            uid for instance
 */
private[op] class OpForecastEvaluator
(
  val seasonalWindow: Int = 1,
  val maxItems: Int = 87660,
  override val name: EvalMetric = OpEvaluatorNames.Forecast,
  override val isLargerBetter: Boolean = false,
  override val uid: String = UID[OpForecastEvaluator]
) extends OpRegressionEvaluatorBase[ForecastMetrics](uid) {

  // Fixed messages: the checks require strictly positive values, but the old
  // messages claimed "must not be negative", which also allows zero.
  require(seasonalWindow > 0, "seasonalWindow must be positive")
  require(maxItems > 0, "maxItems must be positive")

  @transient private lazy val log = LoggerFactory.getLogger(this.getClass)

  // SMAPE is the default metric reported for this evaluator.
  def getDefaultMetric: ForecastMetrics => Double = _.SMAPE

  /**
   * Evaluates all forecast metrics on the given dataset.
   *
   * @param data dataset containing label and prediction columns
   * @return computed [[ForecastMetrics]]
   */
  override def evaluateAll(data: Dataset[_]): ForecastMetrics = {
    val dataUse = makeDataToUse(data, getLabelCol)
    val metrics = computeMetrics(dataUse, getLabelCol, getPredictionValueCol)
    log.info("Evaluated metrics: {}", metrics.toString)
    metrics
  }

  /**
   * Computes SMAPE, Seasonal Error and MASE in a single pass over up to `maxItems`
   * (label, prediction) pairs collected to the driver.
   *
   * NOTE(review): rows are materialized via `take(maxItems)` because the seasonal
   * error needs random access at offset `seasonalWindow`; assumes the dataset is
   * already ordered by time — confirm against callers.
   *
   * @param labelCol           name of the label column
   * @param predictionValueCol name of the prediction value column
   * @return computed [[ForecastMetrics]]; all metrics are 0.0 (never NaN) when
   *         the data is empty or the seasonal window exceeds the data length
   */
  protected def computeMetrics(data: Dataset[_], labelCol: String, predictionValueCol: String): ForecastMetrics = {

    val rows = data.select(labelCol, predictionValueCol).rdd
      .map(r => (r.getAs[Double](0), r.getAs[Double](1))).take(maxItems)

    val cnt = rows.length
    // Number of (y_t, y_{t+seasonalWindow}) pairs available; may be <= 0.
    val seasonalLimit = cnt - seasonalWindow
    var i = 0
    var (seasonalAbsDiff, absDiffSum, smapeSum) = (0.0, 0.0, 0.0)

    while (i < cnt) {
      val (y, yHat) = rows(i)

      if (i < seasonalLimit) {
        val (ySeasonal, _) = rows(i + seasonalWindow)
        seasonalAbsDiff += Math.abs(y - ySeasonal)
      }
      val absDiff = Math.abs(y - yHat)

      // Skip zero denominators so an all-zero series yields SMAPE 0.0 instead of NaN.
      val sumAbs = Math.abs(y) + Math.abs(yHat)
      if (sumAbs > 0) {
        smapeSum += absDiff / sumAbs
      }

      absDiffSum += absDiff
      i += 1
    }

    // Guard against seasonalLimit == 0 (seasonalWindow == cnt): the unguarded
    // expression `seasonalAbsDiff / seasonalLimit` evaluates 0.0 / 0 == NaN.
    val seasonalError = if (seasonalLimit > 0) seasonalAbsDiff / seasonalLimit else 0.0
    val maseDenominator = seasonalError * cnt

    ForecastMetrics(
      SMAPE = if (cnt > 0) 2 * smapeSum / cnt else 0.0,
      SeasonalError = seasonalError,
      MASE = if (maseDenominator > 0) absDiffSum / maseDenominator else 0.0
    )
  }
}

/**
 * Metrics of Forecasting Problem
 *
 * @param SMAPE         Symmetric Mean Absolute Percentage Error
 * @param SeasonalError Seasonal Error (mean absolute one-season difference of the label series)
 * @param MASE          Mean Absolute Scaled Error
 */
case class ForecastMetrics(SMAPE: Double, SeasonalError: Double, MASE: Double) extends EvaluationMetrics
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class OpForecastEvaluatorTest extends FlatSpec with TestSparkContext {

import spark.implicits._

val (ds, rawLabel, features) = TestFeatureBuilder[RealNN, OPVector](
Seq(
(12.0, Vectors.dense(1.0, 4.3, 1.3)),
Expand Down Expand Up @@ -87,23 +89,59 @@ class OpForecastEvaluatorTest extends FlatSpec with TestSparkContext {
val transformedData = model.setInput(label, features).transform(ds)
val metrics = testEvaluator.evaluateAll(transformedData).toMetadata()

metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.0075 +- 1e-4)
metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.08979 +- 1e-4)

}

// Verifies SMAPE computed from a single model's predictions.
it should "evaluate the metrics from a single model" in {
  val model = testEstimator2.fit(ds)
  val transformedData = model.setInput(label, features).transform(ds)
  val metrics = testEvaluator2.evaluateAll(transformedData).toMetadata()
  // Removed the stale duplicate assertion (0.0072) left over from before the SMAPE fix;
  // both assertions cannot hold at once.
  metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.09013 +- 1e-4)
}

// Empty input must produce 0.0 for every metric (not NaN). Replaces the removed
// "data is 0" test and its duplicated old lines; also uses an explicitly typed
// empty Seq — `Seq().map(...)` infers element type Nothing, which makes Spark's
// implicit Encoder resolution for toDF fragile.
it should "evaluate the metrics when data is empty" in {
  val data = Seq.empty[(Double, Map[String, Double])]
  val df = spark.sparkContext.parallelize(data).toDF("f1", "r1")
  val metrics = new OpForecastEvaluator().setLabelCol("f1").setPredictionCol("r1").evaluateAll(df).toMetadata()
  metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe 0.0
  metrics.getDouble(ForecastEvalMetrics.MASE.toString) shouldBe 0.0
  metrics.getDouble(ForecastEvalMetrics.SeasonalError.toString) shouldBe 0.0
}

// Checks all three metrics on a synthetic sine wave where the "prediction" is the
// label scaled by 1.2, with one full period over 100 points and a 25-point season.
it should "verify that Forecast Metrics works" in {
val dataLength = 100
val seasonalWindow = 25
val data = (0 until dataLength).map(x => {
// One full sine period across the dataset; prediction overshoots by 20%.
val y = Math.sin((x.toDouble / dataLength) * 2.0 * Math.PI)
(y, Map("prediction" -> y * 1.2))
})

val df = spark.sparkContext.parallelize(data).toDF("f1", "r1")
val metrics = new OpForecastEvaluator(seasonalWindow)
.setLabelCol("f1").setPredictionCol("r1").evaluateAll(df).toMetadata()

// NOTE(review): expected values below are empirical for this fixture — confirm
// against the metric formulas if the evaluator implementation changes.
metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.18 +- 1e-3)
metrics.getDouble(ForecastEvalMetrics.MASE.toString) shouldBe (0.16395 +- 1e-5)
metrics.getDouble(ForecastEvalMetrics.SeasonalError.toString) shouldBe (0.77634 +- 1e-5)

}

// When seasonalWindow exceeds the data length there are no seasonal pairs:
// SeasonalError and MASE must degrade to 0.0 while SMAPE is still computed.
it should "verify that Forecast Metrics works if seasonalWindow is too large" in {
val dataLength = 100
val seasonalWindow = 101
val data = (0 until dataLength).map(x => {
val y = Math.sin((x.toDouble / dataLength) * 2.0 * Math.PI)
(y, Map("prediction" -> y * 1.2))
})

val df = spark.sparkContext.parallelize(data).toDF("f1", "r1")
val metrics = new OpForecastEvaluator(seasonalWindow)
.setLabelCol("f1").setPredictionCol("r1").evaluateAll(df).toMetadata()

metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.18 +- 1e-3)
// Seasonal metrics fall back to 0.0 rather than NaN when no season fits the data.
metrics.getDouble(ForecastEvalMetrics.MASE.toString) shouldBe 0.0
metrics.getDouble(ForecastEvalMetrics.SeasonalError.toString) shouldBe 0.0
}

}

0 comments on commit ecc7f22

Please sign in to comment.