From ecc7f221df24f59129e700115d0074cc6a5db732 Mon Sep 17 00:00:00 2001 From: Christopher Suchanek Date: Tue, 2 Jul 2019 14:32:45 -0700 Subject: [PATCH] Forecast Evaluator - fixes SMAPE, adds MASE and Seasonal Error metrics (#342) --- .../op/evaluators/EvaluationMetrics.scala | 2 + .../op/evaluators/OpForecastEvaluator.scala | 81 +++++++++++++------ .../evaluators/OpForecastEvaluatorTest.scala | 50 ++++++++++-- 3 files changed, 102 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/EvaluationMetrics.scala b/core/src/main/scala/com/salesforce/op/evaluators/EvaluationMetrics.scala index 186a69a2cd..56b63577ff 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/EvaluationMetrics.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/EvaluationMetrics.scala @@ -190,6 +190,8 @@ sealed abstract class ForecastEvalMetric object ForecastEvalMetrics extends Enum[ForecastEvalMetric] { val values: Seq[ForecastEvalMetric] = findValues case object SMAPE extends ForecastEvalMetric("smape", "symmetric mean absolute percentage error") + case object MASE extends ForecastEvalMetric("mase", "mean absolute scaled error") + case object SeasonalError extends ForecastEvalMetric("seasonalError", "seasonal error") } diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpForecastEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpForecastEvaluator.scala index dcc8739f74..faf0e27c16 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpForecastEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpForecastEvaluator.scala @@ -39,22 +39,35 @@ import com.twitter.algebird.Operators._ import com.twitter.algebird.Semigroup import com.twitter.algebird.macros.caseclass +import scala.collection.mutable + /** + * + * Instance to evaluate Forecast metrics + * The metrics are SMAPE, MASE and SeasonalError + * Default evaluation returns SMAPE + * + * See: 
https://www.m4.unic.ac.cy/wp-content/uploads/2018/03/M4-Competitors-Guide.pdf + * + * @param seasonalWindow length of the season (e.g. 7 for daily data with weekly seasonality) + * @param maxItems max number of items to process (default: 10 years of hourly data) + * @param name name of default metric + * @param isLargerBetter is metric better if larger + * @param uid uid for instance */ + private[op] class OpForecastEvaluator ( + val seasonalWindow: Int = 1, + val maxItems: Int = 87660, override val name: EvalMetric = OpEvaluatorNames.Forecast, override val isLargerBetter: Boolean = false, override val uid: String = UID[OpForecastEvaluator] ) extends OpRegressionEvaluatorBase[ForecastMetrics](uid) { + require(seasonalWindow > 0, "seasonalWindow must be positive") + require(maxItems > 0, "maxItems must be positive") + @transient private lazy val log = LoggerFactory.getLogger(this.getClass) def getDefaultMetric: ForecastMetrics => Double = _.SMAPE @@ -62,40 +75,58 @@ private[op] class OpForecastEvaluator override def evaluateAll(data: Dataset[_]): ForecastMetrics = { val dataUse = makeDataToUse(data, getLabelCol) - val smape: Double = getSMAPE(dataUse, getLabelCol, getPredictionValueCol) - val metrics = ForecastMetrics(SMAPE = smape) - + val metrics = computeMetrics(dataUse, getLabelCol, getPredictionValueCol) log.info("Evaluated metrics: {}", metrics.toString) metrics - } - protected def getSMAPE(data: Dataset[_], labelCol: String, predictionValueCol: String): Double = { - data.select(labelCol, predictionValueCol).rdd - .map(r => SMAPEValue(r.getAs[Double](0), r.getAs[Double](1))) - .reduce(_ + _).value } -} -/** - * SMAPE value computation. 
See formula here: - * https://www.m4.unic.ac.cy/wp-content/uploads/2018/03/M4-Competitors-Guide.pdf - * - * @param SMAPE symmetric Mean Absolute Percentage Error - */ -object SMAPEValue { - def apply(y: Double, yHat: Double): SMAPEValue = { - SMAPEValue(2 * Math.abs(y - yHat), Math.abs(y) + Math.abs(yHat), 1L) - } - implicit val smapeSG: Semigroup[SMAPEValue] = caseclass.semigroup[SMAPEValue] -} + protected def computeMetrics(data: Dataset[_], labelCol: String, predictionValueCol: String): ForecastMetrics = { + + val rows = data.select(labelCol, predictionValueCol).rdd + .map(r => (r.getAs[Double](0), r.getAs[Double](1))).take(maxItems) + + val cnt = rows.length + val seasonalLimit = cnt - seasonalWindow + var i = 0 + var (seasonalAbsDiff, absDiffSum, smapeSum) = (0.0, 0.0, 0.0) + + while (i < cnt) { + val (y, yHat) = rows(i) + + if (i < seasonalLimit) { + val (ySeasonal, _) = rows(i + seasonalWindow) + seasonalAbsDiff += Math.abs(y - ySeasonal) + } + val absDiff = Math.abs(y - yHat) + + val sumAbs = Math.abs(y) + Math.abs(yHat) + if (sumAbs > 0) { + smapeSum += absDiff / sumAbs + } -case class SMAPEValue private (nominator: Double, denominator: Double, cnt: Long) { - def value: Double = if (denominator == 0.0) Double.NaN else (nominator / denominator) / cnt + absDiffSum += absDiff + i += 1 + } + + val seasonalError = seasonalAbsDiff / seasonalLimit + val maseDenominator = seasonalError * cnt + + ForecastMetrics( + SMAPE = if (cnt > 0) 2 * smapeSum / cnt else 0.0, + SeasonalError = seasonalError, + MASE = if (maseDenominator > 0) absDiffSum / maseDenominator else 0.0 + ) + + } } /** * Metrics of Forecasting Problem * - * @param SMAPE symmetric Mean Absolute Percentage Error + * @param SMAPE Symmetric Mean Absolute Percentage Error + * @param SeasonalError Seasonal Error + * @param MASE Mean Absolute Scaled Error + * */ -case class ForecastMetrics(SMAPE: Double) extends EvaluationMetrics +case class ForecastMetrics(SMAPE: Double, SeasonalError: Double, MASE: Double) 
extends EvaluationMetrics diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpForecastEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpForecastEvaluatorTest.scala index 7a5314b972..f6ccceb3c0 100644 --- a/core/src/test/scala/com/salesforce/op/evaluators/OpForecastEvaluatorTest.scala +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpForecastEvaluatorTest.scala @@ -44,6 +44,8 @@ import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) class OpForecastEvaluatorTest extends FlatSpec with TestSparkContext { + import spark.implicits._ + val (ds, rawLabel, features) = TestFeatureBuilder[RealNN, OPVector]( Seq( (12.0, Vectors.dense(1.0, 4.3, 1.3)), @@ -87,7 +89,7 @@ class OpForecastEvaluatorTest extends FlatSpec with TestSparkContext { val transformedData = model.setInput(label, features).transform(ds) val metrics = testEvaluator.evaluateAll(transformedData).toMetadata() - metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.0075 +- 1e-4) + metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.08979 +- 1e-4) } @@ -95,15 +97,51 @@ class OpForecastEvaluatorTest extends FlatSpec with TestSparkContext { val model = testEstimator2.fit(ds) val transformedData = model.setInput(label, features).transform(ds) val metrics = testEvaluator2.evaluateAll(transformedData).toMetadata() - metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.0072 +- 1e-4) + metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.09013 +- 1e-4) } - it should "evaluate the metrics when data is 0" in { - val data = Seq(0.0, 0.0, 0.0).map(x => (x, Map("prediction" -> x))) - import spark.implicits._ + it should "evaluate the metrics when data is empty" in { + val data = Seq().map(x => (x, Map("prediction" -> x))) val df = spark.sparkContext.parallelize(data).toDF("f1", "r1") val metrics = new OpForecastEvaluator().setLabelCol("f1").setPredictionCol("r1").evaluateAll(df).toMetadata() - 
metrics.getDouble(ForecastEvalMetrics.SMAPE.toString).isNaN shouldBe true + metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe 0.0 + metrics.getDouble(ForecastEvalMetrics.MASE.toString) shouldBe 0.0 + metrics.getDouble(ForecastEvalMetrics.SeasonalError.toString) shouldBe 0.0 + } + + it should "verify that Forecast Metrics works" in { + val dataLength = 100 + val seasonalWindow = 25 + val data = (0 until dataLength).map(x => { + val y = Math.sin((x.toDouble / dataLength) * 2.0 * Math.PI) + (y, Map("prediction" -> y * 1.2)) + }) + + val df = spark.sparkContext.parallelize(data).toDF("f1", "r1") + val metrics = new OpForecastEvaluator(seasonalWindow) + .setLabelCol("f1").setPredictionCol("r1").evaluateAll(df).toMetadata() + + metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.18 +- 1e-3) + metrics.getDouble(ForecastEvalMetrics.MASE.toString) shouldBe (0.16395 +- 1e-5) + metrics.getDouble(ForecastEvalMetrics.SeasonalError.toString) shouldBe (0.77634 +- 1e-5) + + } + + it should "verify that Forecast Metrics works if seasonalWindow is too large" in { + val dataLength = 100 + val seasonalWindow = 101 + val data = (0 until dataLength).map(x => { + val y = Math.sin((x.toDouble / dataLength) * 2.0 * Math.PI) + (y, Map("prediction" -> y * 1.2)) + }) + + val df = spark.sparkContext.parallelize(data).toDF("f1", "r1") + val metrics = new OpForecastEvaluator(seasonalWindow) + .setLabelCol("f1").setPredictionCol("r1").evaluateAll(df).toMetadata() + + metrics.getDouble(ForecastEvalMetrics.SMAPE.toString) shouldBe (0.18 +- 1e-3) + metrics.getDouble(ForecastEvalMetrics.MASE.toString) shouldBe 0.0 + metrics.getDouble(ForecastEvalMetrics.SeasonalError.toString) shouldBe 0.0 } }