diff --git a/build.gradle b/build.gradle index 3f0385aa53..ba80c2204b 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ configure(allProjs) { scalaCheckVersion = '1.14.0' junitVersion = '4.11' avroVersion = '1.7.7' - sparkVersion = '2.2.1' + sparkVersion = '2.3.2' sparkAvroVersion = '4.0.0' scalaGraphVersion = '1.12.5' scalafmtVersion = '1.5.1' @@ -69,7 +69,7 @@ configure(allProjs) { json4sVersion = '3.2.11' // matches Spark dependency version jodaTimeVersion = '2.9.4' jodaConvertVersion = '1.8.1' - algebirdVersion = '0.12.3' + algebirdVersion = '0.13.4' jacksonVersion = '2.7.3' luceneVersion = '7.3.0' enumeratumVersion = '1.4.12' @@ -77,12 +77,12 @@ configure(allProjs) { googleLibPhoneNumberVersion = '8.8.5' googleGeoCoderVersion = '2.82' googleCarrierVersion = '1.72' - chillAvroVersion = '0.8.0' + chillVersion = '0.8.4' reflectionsVersion = '0.9.11' collectionsVersion = '3.2.2' optimaizeLangDetectorVersion = '0.0.1' tikaVersion = '1.16' - sparkTestingBaseVersion = '2.2.0_0.8.0' + sparkTestingBaseVersion = '2.3.1_0.10.0' sourceCodeVersion = '0.1.3' pegdownVersion = '1.4.2' commonsValidatorVersion = '1.6' @@ -90,6 +90,8 @@ configure(allProjs) { scoveragePluginVersion = '1.3.1' hadrianVersion = '0.8.5' aardpfarkVersion = '0.1.0-SNAPSHOT' + xgboostVersion = '0.80' + akkaSlf4jVersion = '2.3.11' mainClassName = 'com.salesforce.Main' } diff --git a/core/build.gradle b/core/build.gradle index 7aafccbbc7..35b3dc40d3 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -22,4 +22,9 @@ dependencies { // Scopt compile "com.github.scopt:scopt_$scalaVersion:$scoptVersion" + + // XGBoost + compile "ml.dmlc:xgboost4j-spark:$xgboostVersion" + // Akka slfj4 logging (version matches XGBoost dependency) + testCompile "com.typesafe.akka:akka-slf4j_$scalaVersion:$akkaSlf4jVersion" } diff --git a/core/src/main/scala/com/salesforce/op/ModelInsights.scala b/core/src/main/scala/com/salesforce/op/ModelInsights.scala index bc7de3bc46..b585758b8a 100644 --- a/core/src/main/scala/com/salesforce/op/ModelInsights.scala +++ b/core/src/main/scala/com/salesforce/op/ModelInsights.scala @@ -46,6 +46,7 @@ import com.salesforce.op.utils.spark.RichMetadata._ import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} import com.salesforce.op.utils.table.Alignment._ import com.salesforce.op.utils.table.Table +import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassificationModel, XGBoostRegressionModel} import org.apache.spark.ml.classification._ import org.apache.spark.ml.regression._ import org.apache.spark.ml.{Model, PipelineStage, Transformer} @@ -631,43 +632,45 @@ case object ModelInsights { } private[op] def getModelContributions(model: Option[Model[_]]): Seq[Seq[Double]] = { - model.map { - case m: SparkWrapperParams[_] => m.getSparkMlStage() match { // TODO add additional models - case Some(m: LogisticRegressionModel) => m.coefficientMatrix.rowIter.toSeq.map(_.toArray.toSeq) - case Some(m: RandomForestClassificationModel) => Seq(m.featureImportances.toArray.toSeq) - case Some(m: NaiveBayesModel) => m.theta.rowIter.toSeq.map(_.toArray.toSeq) - case Some(m: DecisionTreeClassificationModel) => Seq(m.featureImportances.toArray.toSeq) - case Some(m: GBTClassificationModel) => Seq(m.featureImportances.toArray.toSeq) - case Some(m: LinearSVCModel) => Seq(m.coefficients.toArray.toSeq) - case Some(m: LinearRegressionModel) => Seq(m.coefficients.toArray.toSeq) - case Some(m: DecisionTreeRegressionModel) => Seq(m.featureImportances.toArray.toSeq) - case Some(m: RandomForestRegressionModel) => Seq(m.featureImportances.toArray.toSeq) - case Some(m: GBTRegressionModel) => Seq(m.featureImportances.toArray.toSeq) - case Some(m: GeneralizedLinearRegressionModel) => Seq(m.coefficients.toArray.toSeq) - case _ => Seq.empty[Seq[Double]] - } - case _ => Seq.empty[Seq[Double]] - }.getOrElse(Seq.empty[Seq[Double]]) + val stage = model.flatMap { + case m: SparkWrapperParams[_] => m.getSparkMlStage() + case _ => None + } + val contributions = stage.collect { + case m: LogisticRegressionModel => m.coefficientMatrix.rowIter.toSeq.map(_.toArray.toSeq) + case m: RandomForestClassificationModel => Seq(m.featureImportances.toArray.toSeq) + case m: NaiveBayesModel => m.theta.rowIter.toSeq.map(_.toArray.toSeq) + case m: DecisionTreeClassificationModel => Seq(m.featureImportances.toArray.toSeq) + case m: GBTClassificationModel => Seq(m.featureImportances.toArray.toSeq) + case m: LinearSVCModel => Seq(m.coefficients.toArray.toSeq) + case m: LinearRegressionModel => Seq(m.coefficients.toArray.toSeq) + case m: DecisionTreeRegressionModel => Seq(m.featureImportances.toArray.toSeq) + case m: RandomForestRegressionModel => Seq(m.featureImportances.toArray.toSeq) + case m: GBTRegressionModel => Seq(m.featureImportances.toArray.toSeq) + case m: GeneralizedLinearRegressionModel => Seq(m.coefficients.toArray.toSeq) + case m: XGBoostRegressionModel => Seq(m.nativeBooster.getFeatureScore().values.map(_.toDouble).toSeq) + case m: XGBoostClassificationModel => Seq(m.nativeBooster.getFeatureScore().values.map(_.toDouble).toSeq) + } + contributions.getOrElse(Seq.empty) } private def getModelInfo(model: Option[Model[_]]): Option[ModelSelectorSummary] = { model match { - case Some(m: SelectedModel) => Try(ModelSelectorSummary.fromMetadata(m.getMetadata().getSummaryMetadata())) - .toOption + case Some(m: SelectedModel) => + Try(ModelSelectorSummary.fromMetadata(m.getMetadata().getSummaryMetadata())).toOption case _ => None } } private def getStageInfo(stages: Array[OPStage]): Map[String, Any] = { - def getParams(stage: PipelineStage): Map[String, String] = - stage.extractParamMap().toSeq - .collect{ - case p if p.param.name == OpPipelineStageParamsNames.InputFeatures => - p.param.name -> p.value.asInstanceOf[Array[TransientFeature]].map(_.toJsonString()).mkString(", ") - case p if p.param.name != OpPipelineStageParamsNames.OutputMetadata && - p.param.name != OpPipelineStageParamsNames.InputSchema => p.param.name -> p.value.toString - }.toMap - + def getParams(stage: PipelineStage): Map[String, String] = { + stage.extractParamMap().toSeq.collect { + case p if p.param.name == OpPipelineStageParamsNames.InputFeatures => + p.param.name -> p.value.asInstanceOf[Array[TransientFeature]].map(_.toJsonString()).mkString(", ") + case p if p.param.name != OpPipelineStageParamsNames.OutputMetadata && + p.param.name != OpPipelineStageParamsNames.InputSchema => p.param.name -> p.value.toString + }.toMap + } stages.map { s => val params = s match { case m: Model[_] => getParams(if (m.hasParent) m.parent else m) // try for parent estimator so can get params diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index f544883818..545d71dad4 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -31,8 +31,8 @@ package com.salesforce.op.evaluators import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.salesforce.op.UID -import com.twitter.algebird.Monoid._ import com.twitter.algebird.Operators._ +import com.twitter.algebird.Tuple4Semigroup import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.DoubleType @@ -84,6 +84,7 @@ private[op] class OpBinScoreEvaluator // Finding stats per bin -> avg score, avg conv rate, // total num of data points and overall brier score. + implicit val sg = new Tuple4Semigroup[Double, Double, Long, Double]() val stats = scoreAndLabels.map { case (score, label) => (getBinIndex(score, minScore, maxScore), (score, label, 1L, math.pow(score - label, 2))) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala index e774723c4a..6d957a9c56 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpMultiClassificationEvaluator.scala @@ -34,6 +34,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.salesforce.op.UID import com.twitter.algebird.Monoid._ import com.twitter.algebird.Operators._ +import com.twitter.algebird.Tuple2Semigroup import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.{DoubleArrayParam, IntArrayParam} @@ -226,8 +227,8 @@ private[op] class OpMultiClassificationEvaluator .map(_ -> (new Array[Long](nThresholds), new Array[Long](nThresholds))) .toMap[Label, CorrIncorr] - val agg: MetricsMap = - data.treeAggregate[MetricsMap](zeroValue)(combOp = _ + _, seqOp = _ + computeMetrics(_)) + implicit val sgTuple2 = new Tuple2Semigroup[Array[Long], Array[Long]]() + val agg: MetricsMap = data.treeAggregate[MetricsMap](zeroValue)(combOp = _ + _, seqOp = _ + computeMetrics(_)) val nRows = data.count() ThresholdMetrics( diff --git a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala index 1f82ba4fc4..7f5d48d6b1 100644 --- a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala +++ b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala @@ -33,12 +33,15 @@ package com.salesforce.op.filters import com.salesforce.op.OpParams import com.salesforce.op.features.types._ import com.salesforce.op.features.{OPFeature, TransientFeature} +import com.salesforce.op.filters.FeatureDistribution._ +import com.salesforce.op.filters.Summary._ import com.salesforce.op.readers.{DataFrameFieldNames, Reader} import com.salesforce.op.stages.impl.feature.TimePeriod import com.salesforce.op.stages.impl.preparators.CorrelationType import com.salesforce.op.utils.spark.RichRow._ import com.twitter.algebird.Monoid._ import com.twitter.algebird.Operators._ +import com.twitter.algebird.Tuple2Semigroup import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.stat.Statistics import org.apache.spark.rdd.RDD @@ -139,17 +142,19 @@ class RawFeatureFilter[T] None } val predOut = allPredictors.map(TransientFeature(_)) - (respOut, predOut) } - val preparedFeatures: RDD[PreparedFeatures] = - data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod)) + val preparedFeatures: RDD[PreparedFeatures] = data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod)) + + implicit val sgTuple2Maps = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]() // Have to use the training summaries do process scoring for comparison val (responseSummaries, predictorSummaries): (Map[FeatureKey, Summary], Map[FeatureKey, Summary]) = allFeatureInfo.map(info => info.responseSummaries -> info.predictorSummaries) .getOrElse(preparedFeatures.map(_.summaries).reduce(_ + _)) val (responseSummariesArr, predictorSummariesArr): (Array[(FeatureKey, Summary)], Array[(FeatureKey, Summary)]) = (responseSummaries.toArray, predictorSummaries.toArray) + + implicit val sgTuple2Feats = new Tuple2Semigroup[Array[FeatureDistribution], Array[FeatureDistribution]]() val (responseDistributions, predictorDistributions): (Array[FeatureDistribution], Array[FeatureDistribution]) = preparedFeatures .map(_.getFeatureDistributions( @@ -157,8 +162,7 @@ class RawFeatureFilter[T] predictorSummaries = predictorSummariesArr, bins = bins, textBinsFormula = textBinsFormula - )) - .reduce(_ + _) // NOTE: resolved semigroup is IndexedSeqSemigroup + )).reduce(_ + _) val correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]] = allFeatureInfo.map(_.correlationInfo).getOrElse { val responseKeys: Array[FeatureKey] = responseSummariesArr.map(_._1) diff --git a/core/src/main/scala/com/salesforce/op/filters/Summary.scala b/core/src/main/scala/com/salesforce/op/filters/Summary.scala index d158e19347..3e299ab8e0 100644 --- a/core/src/main/scala/com/salesforce/op/filters/Summary.scala +++ b/core/src/main/scala/com/salesforce/op/filters/Summary.scala @@ -47,9 +47,10 @@ case object Summary { val empty: Summary = Summary(Double.PositiveInfinity, Double.NegativeInfinity, 0.0, 0.0) implicit val monoid: Monoid[Summary] = new Monoid[Summary] { - override def zero = empty - override def plus(l: Summary, r: Summary) = Summary(math.min(l.min, r.min), math.max(l.max, r.max), - l.sum + r.sum, l.count + r.count) + override def zero = Summary.empty + override def plus(l: Summary, r: Summary) = Summary( + math.min(l.min, r.min), math.max(l.max, r.max), l.sum + r.sum, l.count + r.count + ) } /** diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala index 0f3d9381ff..425d43a866 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLinearSVC.scala @@ -151,15 +151,15 @@ class OpLinearSVCModel ttov: TypeTag[Prediction#Value] ) extends OpPredictorWrapperModel[LinearSVCModel](uid = uid, operationName = operationName, sparkModel = sparkModel) { - @transient private lazy val predictRaw = reflectMethod(getSparkMlStage().get, "predictRaw") - @transient private lazy val predict = reflectMethod(getSparkMlStage().get, "predict") + @transient lazy private val predictRaw = reflectMethod(getSparkMlStage().get, "predictRaw") + @transient lazy private val predict = reflectMethod(getSparkMlStage().get, "predict") /** * Function used to convert input to output */ override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { - val raw = predictRaw.apply(features.value).asInstanceOf[Vector] - val pred = predict.apply(features.value).asInstanceOf[Double] + val raw = predictRaw(features.value).asInstanceOf[Vector] + val pred = predict(features.value).asInstanceOf[Double] Prediction(rawPrediction = raw, prediction = pred) } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLogisticRegression.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLogisticRegression.scala index 3b54cf9351..f0c33ad08a 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLogisticRegression.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpLogisticRegression.scala @@ -195,8 +195,8 @@ class OpLogisticRegression(uid: String = UID[OpLogisticRegression]) class OpLogisticRegressionModel ( sparkModel: LogisticRegressionModel, - operationName: String = classOf[LogisticRegression].getSimpleName, - uid: String = UID[OpLogisticRegressionModel] + uid: String = UID[OpLogisticRegressionModel], + operationName: String = classOf[LogisticRegression].getSimpleName )( implicit tti1: TypeTag[RealNN], tti2: TypeTag[OPVector], @@ -210,4 +210,3 @@ class OpLogisticRegressionModel @transient lazy val probability2predictionMirror = reflectMethod(getSparkMlStage().get, "probability2prediction") } - diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpMultilayerPerceptronClassifier.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpMultilayerPerceptronClassifier.scala index 21bf7f9c59..43a26a8ec1 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpMultilayerPerceptronClassifier.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpMultilayerPerceptronClassifier.scala @@ -33,7 +33,7 @@ package com.salesforce.op.stages.impl.classification import com.salesforce.op.UID import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.impl.CheckIsResponseValues -import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} +import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapper, OpProbabilisticClassifierModel} import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier, OpMultilayerPerceptronClassifierParams} import org.apache.spark.ml.linalg.Vector @@ -128,7 +128,6 @@ class OpMultilayerPerceptronClassifier(uid: String = UID[OpMultilayerPerceptronC * @param uid uid to give stage * @param operationName unique name of the operation this stage performs */ -// TODO in next release of spark this will be a probabilistic classifier class OpMultilayerPerceptronClassificationModel ( sparkModel: MultilayerPerceptronClassificationModel, @@ -139,9 +138,12 @@ class OpMultilayerPerceptronClassificationModel tti2: TypeTag[OPVector], tto: TypeTag[Prediction], ttov: TypeTag[Prediction#Value] -) extends OpPredictionModel[MultilayerPerceptronClassificationModel]( +) extends OpProbabilisticClassifierModel[MultilayerPerceptronClassificationModel]( sparkModel = sparkModel, uid = uid, operationName = operationName ) { - @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") + @transient lazy val predictRawMirror = reflectMethod(getSparkMlStage().get, "predictRaw") + @transient lazy val raw2probabilityMirror = reflectMethod(getSparkMlStage().get, "raw2probability") + @transient lazy val probability2predictionMirror = + reflectMethod(getSparkMlStage().get, "probability2prediction") } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala new file mode 100644 index 0000000000..bdfc3d5d41 --- /dev/null +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifier.scala @@ -0,0 +1,374 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.stages.impl.classification + +import com.salesforce.op.UID +import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} +import com.salesforce.op.stages.impl.CheckIsResponseValues +import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapper, OpProbabilisticClassifierModel} +import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod +import ml.dmlc.xgboost4j.scala.spark._ +import ml.dmlc.xgboost4j.scala.{DMatrix, EvalTrait, ObjectiveTrait} +import org.apache.spark.ml.linalg.Vectors + +import scala.reflect.runtime.universe._ + +/** + * Wrapper around XGBoost classifier [[XGBoostClassifier]] + */ +class OpXGBoostClassifier(uid: String = UID[OpXGBoostClassifier]) + extends OpPredictorWrapper[XGBoostClassifier, XGBoostClassificationModel]( + predictor = new XGBoostClassifier(), + uid = uid + ) with OpXGBoostClassifierParams { + + override protected def onSetInput(): Unit = { + super.onSetInput() + CheckIsResponseValues(in1, in2) + } + + /** + * Weight column name. If this is not set or empty, we treat all instance weights as 1.0. + */ + def setWeightCol(value: String): this.type = set(weightCol, value) + + /** + * Initial prediction (aka base margin) column name. + */ + def setBaseMarginCol(value: String): this.type = set(baseMarginCol, value) + + /** + * Number of classes + */ + def setNumClass(value: Int): this.type = set(numClass, value) + + // setters for general params + + /** + * Rabit tracker configurations. The parameter must be provided as an instance of the + * [[TrackerConf]] class, which has the following definition: + * + * case class TrackerConf(workerConnectionTimeout: Duration, trainingTimeout: Duration, trackerImpl: String) + * + * See below for detailed explanations. + * + * - trackerImpl: Select the implementation of Rabit tracker. + * default: "python" + * + * Choice between "python" or "scala". The former utilizes the Java wrapper of the + * Python Rabit tracker (in dmlc_core), and does not support timeout settings. + * The "scala" version removes Python components, and fully supports timeout settings. + * + * - workerConnectionTimeout: the maximum wait time for all workers to connect to the tracker. + * default: 0 millisecond (no timeout) + * + * The timeout value should take the time of data loading and pre-processing into account, + * due to the lazy execution of Spark's operations. Alternatively, you may force Spark to + * perform data transformation before calling XGBoost.train(), so that this timeout truly + * reflects the connection delay. Set a reasonable timeout value to prevent model + * training/testing from hanging indefinitely, possible due to network issues. + * Note that zero timeout value means to wait indefinitely (equivalent to Duration.Inf). + * Ignored if the tracker implementation is "python". + */ + def setTrackerConf(value: TrackerConf): this.type = set(trackerConf, value) + + /** + * The number of rounds for boosting + */ + def setNumRound(value: Int): this.type = set(numRound, value) + + /** + * Number of workers used to train xgboost model. default: 1 + */ + def setNumWorkers(value: Int): this.type = set(numWorkers, value) + + /** + * Number of threads used by per worker. default 1 + */ + def setNthread(value: Int): this.type = set(nthread, value) + + /** + * Whether to use external memory as cache. default: false + */ + def setUseExternalMemory(value: Boolean): this.type = set(useExternalMemory, value) + + /** + * 0 means printing running messages, 1 means silent mode. default: 0 + */ + def setSilent(value: Int): this.type = set(silent, value) + + /** + * The value treated as missing + */ + def setMissing(value: Float): this.type = set(missing, value) + + /** + * The maximum time to wait for the job requesting new workers. default: 30 minutes + */ + def setTimeoutRequestWorkers(value: Long): this.type = set(timeoutRequestWorkers, value) + + /** + * The hdfs folder to load and save checkpoint boosters. default: `empty_string` + */ + def setCheckpointPath(value: String): this.type = set(checkpointPath, value) + + /** + * Checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that + * the trained model will get checkpointed every 10 iterations. Note: `checkpoint_path` must + * also be set if the checkpoint interval is greater than 0. + */ + def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value) + + /** + * Random seed for the C++ part of XGBoost and train/test splitting. + */ + def setSeed(value: Long): this.type = set(seed, value) + + /** + * Step size shrinkage used in update to prevents overfitting. After each boosting step, we + * can directly get the weights of new features and eta actually shrinks the feature weights + * to make the boosting process more conservative. [default=0.3] range: [0,1] + */ + def setEta(value: Double): this.type = set(eta, value) + + /** + * Minimum loss reduction required to make a further partition on a leaf node of the tree. + * the larger, the more conservative the algorithm will be. [default=0] range: [0, + * Double.MaxValue] + */ + def setGamma(value: Double): this.type = set(gamma, value) + + /** + * Maximum depth of a tree, increase this value will make model more complex / likely to be + * overfitting. [default=6] range: [1, Int.MaxValue] + */ + def setMaxDepth(value: Int): this.type = set(maxDepth, value) + + /** + * Minimum sum of instance weight(hessian) needed in a child. If the tree partition step results + * in a leaf node with the sum of instance weight less than min_child_weight, then the building + * process will give up further partitioning. In linear regression mode, this simply corresponds + * to minimum number of instances needed to be in each node. The larger, the more conservative + * the algorithm will be. [default=1] range: [0, Double.MaxValue] + */ + def setMinChildWeight(value: Double): this.type = set(minChildWeight, value) + + /** + * Maximum delta step we allow each tree's weight estimation to be. If the value is set to 0, it + * means there is no constraint. If it is set to a positive value, it can help making the update + * step more conservative. Usually this parameter is not needed, but it might help in logistic + * regression when class is extremely imbalanced. Set it to value of 1-10 might help control the + * update. [default=0] range: [0, Double.MaxValue] + */ + def setMaxDeltaStep(value: Double): this.type = set(maxDeltaStep, value) + + /** + * Subsample ratio of the training instance. Setting it to 0.5 means that XGBoost randomly + * collected half of the data instances to grow trees and this will prevent overfitting. + * [default=1] range:(0,1] + */ + def setSubsample(value: Double): this.type = set(subsample, value) + + /** + * Subsample ratio of columns when constructing each tree. [default=1] range: (0,1] + */ + def setColsampleBytree(value: Double): this.type = set(colsampleBytree, value) + + /** + * Subsample ratio of columns for each split, in each level. [default=1] range: (0,1] + */ + def setColsampleBylevel(value: Double): this.type = set(colsampleBylevel, value) + + /** + * L2 regularization term on weights, increase this value will make model more conservative. + * [default=1] + */ + def setLambda(value: Double): this.type = set(lambda, value) + + /** + * L1 regularization term on weights, increase this value will make model more conservative. + * [default=0] + */ + def setAlpha(value: Double): this.type = set(alpha, value) + + /** + * The tree construction algorithm used in XGBoost. options: {'auto', 'exact', 'approx'} + * [default='auto'] + */ + def setTreeMethod(value: String): this.type = set(treeMethod, value) + + /** + * Growth policy for fast histogram algorithm + */ + def setGrowPolicy(value: String): this.type = set(growPolicy, value) + + /** + * Maximum number of bins in histogram + */ + def setMaxBins(value: Int): this.type = set(maxBins, value) + + /** + * This is only used for approximate greedy algorithm. + * This roughly translated into O(1 / sketch_eps) number of bins. Compared to directly select + * number of bins, this comes with theoretical guarantee with sketch accuracy. + * [default=0.03] range: (0, 1) + */ + def setSketchEps(value: Double): this.type = set(sketchEps, value) + + /** + * Control the balance of positive and negative weights, useful for unbalanced classes. A typical + * value to consider: sum(negative cases) / sum(positive cases). [default=1] + */ + def setScalePosWeight(value: Double): this.type = set(scalePosWeight, value) + + /** + * Parameter for Dart booster. + * Type of sampling algorithm. "uniform": dropped trees are selected uniformly. + * "weighted": dropped trees are selected in proportion to weight. [default="uniform"] + */ + def setSampleType(value: String): this.type = set(sampleType, value) + + /** + * Parameter of Dart booster. + * type of normalization algorithm, options: {'tree', 'forest'}. [default="tree"] + */ + def setNormalizeType(value: String): this.type = set(normalizeType, value) + + /** + * Parameter of Dart booster. + * dropout rate. [default=0.0] range: [0.0, 1.0] + */ + def setRateDrop(value: Double): this.type = set(rateDrop, value) + + /** + * Parameter of Dart booster. + * probability of skip dropout. If a dropout is skipped, new trees are added in the same manner + * as gbtree. [default=0.0] range: [0.0, 1.0] + */ + def setSkipDrop(value: Double): this.type = set(skipDrop, value) + + /** + * Parameter of linear booster + * L2 regularization term on bias, default 0(no L1 reg on bias because it is not important) + */ + def setLambdaBias(value: Double): this.type = set(lambdaBias, value) + + // setters for learning params + def setObjective(value: String): this.type = set(objective, value) + + /** + * Specify the learning task and the corresponding learning objective. + * options: reg:linear, reg:logistic, binary:logistic, binary:logitraw, count:poisson, + * multi:softmax, multi:softprob, rank:pairwise, reg:gamma. default: reg:linear + */ + def setBaseScore(value: Double): this.type = set(baseScore, value) + + /** + * Evaluation metrics for validation data, a default metric will be assigned according to + * objective(rmse for regression, and error for classification, mean average precision for + * ranking). options: rmse, mae, logloss, error, merror, mlogloss, auc, aucpr, ndcg, map, + * gamma-deviance + */ + def setEvalMetric(value: String): this.type = set(evalMetric, value) + + /** + * Fraction of training points to use for testing. + */ + def setTrainTestRatio(value: Double): this.type = set(trainTestRatio, value) + + /** + * If non-zero, the training will be stopped after a specified number + * of consecutive increases in any evaluation metric. + */ + def setNumEarlyStoppingRounds(value: Int): this.type = set(numEarlyStoppingRounds, value) + + /** + * Customized objective function provided by user. default: null + */ + def setCustomObj(value: ObjectiveTrait): this.type = set(customObj, value) + + /** + * Customized evaluation function provided by user. default: null + */ + def setCustomEval(value: EvalTrait): this.type = set(customEval, value) + +} + + +/** + * Class that takes in a spark [[XGBoostClassificationModel]] and wraps it into an OP model which returns a + * Prediction feature + * + * @param sparkModel model to wrap + * @param operationName unique name of the operation this stage performs + * @param uid uid to give stage + */ +class OpXGBoostClassificationModel +( + sparkModel: XGBoostClassificationModel, + uid: String = UID[OpXGBoostClassificationModel], + operationName: String = classOf[XGBoostClassifier].getSimpleName +)( + implicit tti1: TypeTag[RealNN], + tti2: TypeTag[OPVector], + tto: TypeTag[Prediction], + ttov: TypeTag[Prediction#Value] +) extends OpProbabilisticClassifierModel[XGBoostClassificationModel]( + sparkModel = sparkModel, uid = uid, operationName = operationName +) { + import OpXGBoost._ + + protected def predictRawMirror: MethodMirror = + throw new NotImplementedError( + "XGBoost-Spark does not support 'predictRaw'. This might change in upcoming releases.") + + protected def raw2probabilityMirror: MethodMirror = + throw new NotImplementedError( + "XGBoost-Spark does not support 'raw2probability'. This might change in upcoming releases.") + + @transient lazy val probability2predictionMirror = + reflectMethod(getSparkMlStage().get, "probability2prediction") + + private lazy val model = getSparkMlStage().get + private lazy val booster = model.nativeBooster + private lazy val treeLimit = model.getTreeLimit.toInt + private lazy val missing = model.getMissing + + override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { + val data = removeMissingValues(Iterator(features.value.asXGB), missing) + val dm = new DMatrix(dataIter = data) + val rawPred = booster.predict(dm, outPutMargin = true, treeLimit = treeLimit)(0).map(_.toDouble) + val prob = booster.predict(dm, outPutMargin = false, treeLimit = treeLimit)(0).map(_.toDouble) + val probability = if (model.numClasses == 2) Array(1.0 - prob(0), prob(0)) else prob + val prediction = probability2predictionMirror(Vectors.dense(probability)).asInstanceOf[Double] + Prediction(prediction = prediction, rawPrediction = rawPred, probability = probability) + } +} diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/PercentileCalibrator.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/PercentileCalibrator.scala index fe2f9bd326..f234adebc9 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/PercentileCalibrator.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/PercentileCalibrator.scala @@ -61,6 +61,7 @@ class PercentileCalibrator(uid: String = UID[PercentileCalibrator]) .setNumBuckets($(expectedNumBuckets)) .setRelativeError(0) .setInputCol(dataset.columns(0)) + .setOutputCol(dataset.columns(0) + "-out") val bucketizerModel = estimator.fit(dataset) diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGeneralizedLinearRegression.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGeneralizedLinearRegression.scala index 1ad09555ad..148d13e02a 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGeneralizedLinearRegression.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpGeneralizedLinearRegression.scala @@ -184,16 +184,15 @@ class OpGeneralizedLinearRegressionModel sparkModel = sparkModel) { @transient lazy private val predictLink = reflectMethod(getSparkMlStage().get, "predictLink") - @transient lazy private val predict = reflectMethod(getSparkMlStage().get, "predict") + @transient lazy private val predict = reflectMethod(getSparkMlStage().get, "predict", argsCount = Some(2)) /** * Function used to convert input to output */ override def transformFn: (RealNN, OPVector) => Prediction = (label, features) => { - val raw = predictLink.apply(features.value).asInstanceOf[Double] - val pred = predict.apply(features.value).asInstanceOf[Double] + val offset = 0.0 + val raw = predictLink(features.value, offset).asInstanceOf[Double] + val pred = predict(features.value, offset).asInstanceOf[Double] Prediction(prediction = pred, rawPrediction = raw) } } - - diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala new file mode 100644 index 0000000000..688f34f812 --- /dev/null +++ b/core/src/main/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressor.scala @@ -0,0 +1,346 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.stages.impl.regression + +import com.salesforce.op.UID +import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} +import com.salesforce.op.stages.impl.CheckIsResponseValues +import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictionModel, OpPredictorWrapper} +import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod +import ml.dmlc.xgboost4j.scala.{EvalTrait, ObjectiveTrait} +import ml.dmlc.xgboost4j.scala.spark.{OpXGBoostRegressorParams, TrackerConf, XGBoostRegressionModel, XGBoostRegressor} + +import scala.reflect.runtime.universe.TypeTag + +/** + * Wrapper around XGBoost regressor [[XGBoostRegressor]] + */ +class OpXGBoostRegressor(uid: String = UID[OpXGBoostRegressor]) + extends OpPredictorWrapper[XGBoostRegressor, XGBoostRegressionModel]( + predictor = new XGBoostRegressor(), + uid = uid + ) with OpXGBoostRegressorParams { + + override protected def onSetInput(): Unit = { + super.onSetInput() + CheckIsResponseValues(in1, in2) + } + + /** + * Weight column name. If this is not set or empty, we treat all instance weights as 1.0. + */ + def setWeightCol(value: String): this.type = set(weightCol, value) + + /** + * Initial prediction (aka base margin) column name. + */ + def setBaseMarginCol(value: String): this.type = set(baseMarginCol, value) + + /** + * Group column name + */ + def setGroupCol(value: String): this.type = set(groupCol, value) + + // setters for general params + + /** + * Rabit tracker configurations. The parameter must be provided as an instance of the + * [[TrackerConf]] class, which has the following definition: + * + * case class TrackerConf(workerConnectionTimeout: Duration, trainingTimeout: Duration, trackerImpl: String) + * + * See below for detailed explanations. + * + * - trackerImpl: Select the implementation of Rabit tracker. + * default: "python" + * + * Choice between "python" or "scala". The former utilizes the Java wrapper of the + * Python Rabit tracker (in dmlc_core), and does not support timeout settings. + * The "scala" version removes Python components, and fully supports timeout settings. + * + * - workerConnectionTimeout: the maximum wait time for all workers to connect to the tracker. + * default: 0 millisecond (no timeout) + * + * The timeout value should take the time of data loading and pre-processing into account, + * due to the lazy execution of Spark's operations. Alternatively, you may force Spark to + * perform data transformation before calling XGBoost.train(), so that this timeout truly + * reflects the connection delay. Set a reasonable timeout value to prevent model + * training/testing from hanging indefinitely, possible due to network issues. + * Note that zero timeout value means to wait indefinitely (equivalent to Duration.Inf). + * Ignored if the tracker implementation is "python". + */ + def setTrackerConf(value: TrackerConf): this.type = set(trackerConf, value) + + /** + * The number of rounds for boosting + */ + def setNumRound(value: Int): this.type = set(numRound, value) + + /** + * Number of workers used to train xgboost model. default: 1 + */ + def setNumWorkers(value: Int): this.type = set(numWorkers, value) + + /** + * Number of threads used by per worker. default 1 + */ + def setNthread(value: Int): this.type = set(nthread, value) + + /** + * Whether to use external memory as cache. default: false + */ + def setUseExternalMemory(value: Boolean): this.type = set(useExternalMemory, value) + + /** + * 0 means printing running messages, 1 means silent mode. default: 0 + */ + def setSilent(value: Int): this.type = set(silent, value) + + /** + * The value treated as missing + */ + def setMissing(value: Float): this.type = set(missing, value) + + /** + * The maximum time to wait for the job requesting new workers. default: 30 minutes + */ + def setTimeoutRequestWorkers(value: Long): this.type = set(timeoutRequestWorkers, value) + + /** + * The hdfs folder to load and save checkpoint boosters. default: `empty_string` + */ + def setCheckpointPath(value: String): this.type = set(checkpointPath, value) + + /** + * Checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that + * the trained model will get checkpointed every 10 iterations. Note: `checkpoint_path` must + * also be set if the checkpoint interval is greater than 0. + */ + def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value) + + /** + * Random seed for the C++ part of XGBoost and train/test splitting. + */ + def setSeed(value: Long): this.type = set(seed, value) + + /** + * Step size shrinkage used in update to prevents overfitting. After each boosting step, we + * can directly get the weights of new features and eta actually shrinks the feature weights + * to make the boosting process more conservative. [default=0.3] range: [0,1] + */ + def setEta(value: Double): this.type = set(eta, value) + + /** + * Minimum loss reduction required to make a further partition on a leaf node of the tree. + * the larger, the more conservative the algorithm will be. [default=0] range: [0, + * Double.MaxValue] + */ + def setGamma(value: Double): this.type = set(gamma, value) + + /** + * Maximum depth of a tree, increase this value will make model more complex / likely to be + * overfitting. [default=6] range: [1, Int.MaxValue] + */ + def setMaxDepth(value: Int): this.type = set(maxDepth, value) + + /** + * Minimum sum of instance weight(hessian) needed in a child. If the tree partition step results + * in a leaf node with the sum of instance weight less than min_child_weight, then the building + * process will give up further partitioning. In linear regression mode, this simply corresponds + * to minimum number of instances needed to be in each node. The larger, the more conservative + * the algorithm will be. [default=1] range: [0, Double.MaxValue] + */ + def setMinChildWeight(value: Double): this.type = set(minChildWeight, value) + + /** + * Maximum delta step we allow each tree's weight estimation to be. If the value is set to 0, it + * means there is no constraint. If it is set to a positive value, it can help making the update + * step more conservative. Usually this parameter is not needed, but it might help in logistic + * regression when class is extremely imbalanced. Set it to value of 1-10 might help control the + * update. [default=0] range: [0, Double.MaxValue] + */ + def setMaxDeltaStep(value: Double): this.type = set(maxDeltaStep, value) + + /** + * Subsample ratio of the training instance. Setting it to 0.5 means that XGBoost randomly + * collected half of the data instances to grow trees and this will prevent overfitting. + * [default=1] range:(0,1] + */ + def setSubsample(value: Double): this.type = set(subsample, value) + + /** + * Subsample ratio of columns when constructing each tree. [default=1] range: (0,1] + */ + def setColsampleBytree(value: Double): this.type = set(colsampleBytree, value) + + /** + * Subsample ratio of columns for each split, in each level. [default=1] range: (0,1] + */ + def setColsampleBylevel(value: Double): this.type = set(colsampleBylevel, value) + + /** + * L2 regularization term on weights, increase this value will make model more conservative. + * [default=1] + */ + def setLambda(value: Double): this.type = set(lambda, value) + + /** + * L1 regularization term on weights, increase this value will make model more conservative. + * [default=0] + */ + def setAlpha(value: Double): this.type = set(alpha, value) + + /** + * The tree construction algorithm used in XGBoost. options: {'auto', 'exact', 'approx'} + * [default='auto'] + */ + def setTreeMethod(value: String): this.type = set(treeMethod, value) + + /** + * Growth policy for fast histogram algorithm + */ + def setGrowPolicy(value: String): this.type = set(growPolicy, value) + + /** + * Maximum number of bins in histogram + */ + def setMaxBins(value: Int): this.type = set(maxBins, value) + + /** + * This is only used for approximate greedy algorithm. + * This roughly translated into O(1 / sketch_eps) number of bins. Compared to directly select + * number of bins, this comes with theoretical guarantee with sketch accuracy. + * [default=0.03] range: (0, 1) + */ + def setSketchEps(value: Double): this.type = set(sketchEps, value) + + /** + * Control the balance of positive and negative weights, useful for unbalanced classes. A typical + * value to consider: sum(negative cases) / sum(positive cases). [default=1] + */ + def setScalePosWeight(value: Double): this.type = set(scalePosWeight, value) + + /** + * Parameter for Dart booster. + * Type of sampling algorithm. "uniform": dropped trees are selected uniformly. + * "weighted": dropped trees are selected in proportion to weight. [default="uniform"] + */ + def setSampleType(value: String): this.type = set(sampleType, value) + + /** + * Parameter of Dart booster. + * type of normalization algorithm, options: {'tree', 'forest'}. [default="tree"] + */ + def setNormalizeType(value: String): this.type = set(normalizeType, value) + + /** + * Parameter of Dart booster. + * dropout rate. [default=0.0] range: [0.0, 1.0] + */ + def setRateDrop(value: Double): this.type = set(rateDrop, value) + + /** + * Parameter of Dart booster. + * probability of skip dropout. If a dropout is skipped, new trees are added in the same manner + * as gbtree. [default=0.0] range: [0.0, 1.0] + */ + def setSkipDrop(value: Double): this.type = set(skipDrop, value) + + /** + * Parameter of linear booster + * L2 regularization term on bias, default 0(no L1 reg on bias because it is not important) + */ + def setLambdaBias(value: Double): this.type = set(lambdaBias, value) + + // setters for learning params + def setObjective(value: String): this.type = set(objective, value) + + /** + * Specify the learning task and the corresponding learning objective. + * options: reg:linear, reg:logistic, binary:logistic, binary:logitraw, count:poisson, + * multi:softmax, multi:softprob, rank:pairwise, reg:gamma. default: reg:linear + */ + def setBaseScore(value: Double): this.type = set(baseScore, value) + + /** + * Evaluation metrics for validation data, a default metric will be assigned according to + * objective(rmse for regression, and error for classification, mean average precision for + * ranking). options: rmse, mae, logloss, error, merror, mlogloss, auc, aucpr, ndcg, map, + * gamma-deviance + */ + def setEvalMetric(value: String): this.type = set(evalMetric, value) + + /** + * Fraction of training points to use for testing. + */ + def setTrainTestRatio(value: Double): this.type = set(trainTestRatio, value) + + /** + * If non-zero, the training will be stopped after a specified number + * of consecutive increases in any evaluation metric. + */ + def setNumEarlyStoppingRounds(value: Int): this.type = set(numEarlyStoppingRounds, value) + + /** + * Customized objective function provided by user. default: null + */ + def setCustomObj(value: ObjectiveTrait): this.type = set(customObj, value) + + /** + * Customized evaluation function provided by user. default: null + */ + def setCustomEval(value: EvalTrait): this.type = set(customEval, value) + +} + + +/** + * Class that takes in a spark [[XGBoostRegressionModel]] and wraps it into an OP model which returns a + * Prediction feature + * @param sparkModel model to wrap + * @param uid uid to give stage + * @param operationName unique name of the operation this stage performs + */ +class OpXGBoostRegressionModel +( + sparkModel: XGBoostRegressionModel, + uid: String = UID[OpXGBoostRegressionModel], + operationName: String = classOf[XGBoostRegressor].getSimpleName +)( + implicit tti1: TypeTag[RealNN], + tti2: TypeTag[OPVector], + tto: TypeTag[Prediction], + ttov: TypeTag[Prediction#Value] +) extends OpPredictionModel[XGBoostRegressionModel]( + sparkModel = sparkModel, uid = uid, operationName = operationName +) { + @transient lazy val predictMirror = reflectMethod(getSparkMlStage().get, "predict") +} diff --git a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/SparkModelConverter.scala b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/SparkModelConverter.scala index bc47d01424..a15afaa750 100644 --- a/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/SparkModelConverter.scala +++ b/core/src/main/scala/com/salesforce/op/stages/sparkwrappers/specific/SparkModelConverter.scala @@ -34,6 +34,7 @@ import com.salesforce.op.features.types.{OPVector, Prediction, RealNN} import com.salesforce.op.stages.base.binary.OpTransformer2 import com.salesforce.op.stages.impl.classification._ import com.salesforce.op.stages.impl.regression._ +import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassificationModel, XGBoostRegressionModel} import org.apache.spark.ml.classification._ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.regression._ @@ -94,6 +95,8 @@ object SparkModelConverter { case m: GBTRegressionModel => new OpGBTRegressionModel(m, uid = uid) case m: DecisionTreeRegressionModel => new OpDecisionTreeRegressionModel(m, uid = uid) case m: GeneralizedLinearRegressionModel => new OpGeneralizedLinearRegressionModel(m, uid = uid) + case m: XGBoostClassificationModel => new OpXGBoostClassificationModel(m, uid = uid) + case m: XGBoostRegressionModel => new OpXGBoostRegressionModel(m, uid = uid) case m => throw new RuntimeException(s"model conversion not implemented for model $m") } } diff --git a/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala b/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala new file mode 100644 index 0000000000..06cd17a463 --- /dev/null +++ b/core/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostParams.scala @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package ml.dmlc.xgboost4j.scala.spark + +import ml.dmlc.xgboost4j.LabeledPoint +import ml.dmlc.xgboost4j.scala.spark.params.GeneralParams +import org.apache.log4j.{Level, Logger} +import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector} + +import scala.collection.mutable.ArrayBuffer + +/** + * Hack to access [[XGBoostClassifierParams]] + */ +trait OpXGBoostClassifierParams extends XGBoostClassifierParams with OpXGBoostGeneralParamsDefaults + +/** + * Hack to access [[XGBoostRegressorParams]] + */ +trait OpXGBoostRegressorParams extends XGBoostRegressorParams with OpXGBoostGeneralParamsDefaults + +/** + * XGBoost [[GeneralParams]] defaults + */ +trait OpXGBoostGeneralParamsDefaults { + self: GeneralParams => + setDefault(trackerConf -> OpXGBoost.DefaultTrackerConf) +} + +/** + * Helper trait to hush XGBoost annoying logging + */ +trait OpXGBoostQuietLogging { + Logger.getLogger("akka").setLevel(Level.WARN) + Logger.getLogger("XGBoostSpark").setLevel(Level.WARN) + Logger.getLogger(classOf[XGBoostClassifier]).setLevel(Level.WARN) + Logger.getLogger(classOf[XGBoostRegressor]).setLevel(Level.WARN) +} + +case object OpXGBoost { + val DefaultTrackerConf = TrackerConf(workerConnectionTimeout = 0L, "scala") + + implicit class RichMLVectorToXGBLabeledPoint(val v: Vector) extends AnyVal { + /** + * Converts a [[Vector]] to a data point with a dummy label. + * + * This is needed for constructing a [[ml.dmlc.xgboost4j.scala.DMatrix]] + * for prediction. + */ + def asXGB: LabeledPoint = v match { + case v: DenseVector => LabeledPoint(0.0f, null, v.values.map(_.toFloat)) + case v: SparseVector => LabeledPoint(0.0f, v.indices, v.values.map(_.toFloat)) + } + } + + /** + * Hack to access [[ml.dmlc.xgboost4j.scala.spark.XGBoost.removeMissingValues]] private method + */ + def removeMissingValues(xgbLabelPoints: Iterator[LabeledPoint], missing: Float): Iterator[LabeledPoint] = + XGBoost.removeMissingValues(xgbLabelPoints, missing) +} diff --git a/core/src/test/resources/RunnerParams.json b/core/src/test/resources/RunnerParams.json index 8ecfd9ce4b..cd9f3bfb26 100644 --- a/core/src/test/resources/RunnerParams.json +++ b/core/src/test/resources/RunnerParams.json @@ -19,9 +19,9 @@ "partitions": 1 } }, - "modelLocation": "resources/tmp/OpWorkflowRunnerTest/op-runner-test-model", - "writeLocation": "resources/tmp/OpWorkflowRunnerTest/op-runner-test-write", - "metricsLocation": "resources/tmp/OpWorkflowRunnerTest/op-runner-test-metrics", + "modelLocation": "", + "writeLocation": "", + "metricsLocation": "", "customParams" : {}, "customTagName": "myTag", "collectStageMetrics": true, diff --git a/core/src/test/resources/application.conf b/core/src/test/resources/application.conf new file mode 100644 index 0000000000..3f09796cad --- /dev/null +++ b/core/src/test/resources/application.conf @@ -0,0 +1,4 @@ +akka { + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "WARNING" +} diff --git a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala index 8a1df0e21b..a255955cd9 100644 --- a/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala +++ b/core/src/test/scala/com/salesforce/op/ModelInsightsTest.scala @@ -30,7 +30,6 @@ package com.salesforce.op -import com.salesforce.op.evaluators.{EvalMetric, EvaluationMetrics} import com.salesforce.op.features.Feature import com.salesforce.op._ import com.salesforce.op.features.types.{PickList, Real, RealNN} @@ -39,6 +38,7 @@ import com.salesforce.op.stages.impl.classification.{BinaryClassificationModelSe import com.salesforce.op.stages.impl.preparators._ import com.salesforce.op.stages.impl.regression.{OpLinearRegression, RegressionModelSelector} import com.salesforce.op.stages.impl.selector.ModelSelectorNames.EstimatorType +import com.salesforce.op.stages.impl.selector.SelectedModel import com.salesforce.op.stages.impl.selector.ValidationType._ import com.salesforce.op.stages.impl.selector.{ModelEvaluation, ProblemType, SelectedModel, ValidationType} import com.salesforce.op.stages.impl.tuning.{DataCutter, DataSplitter, SplitterSummary} @@ -66,8 +66,7 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest { implicit val doubleOptEquality = new Equality[Option[Double]] { def areEqual(a: Option[Double], b: Any): Boolean = b match { case None => a.isEmpty - case Some(s: Double) => (a.exists(_.isNaN) && s.isNaN) || - (a.nonEmpty && a.contains(s)) + case Some(d: Double) => (a.exists(_.isNaN) && d.isNaN) || a.contains(d) case _ => false } } diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala index b87b5bb945..23ef395cf9 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowRunnerTest.scala @@ -48,26 +48,22 @@ import org.scalatest.junit.JUnitRunner import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ -import scala.concurrent.Promise +import scala.concurrent.{Future, Promise} import scala.reflect.ClassTag @RunWith(classOf[JUnitRunner]) -class OpWorkflowRunnerTest extends AsyncFlatSpec - with PassengerSparkFixtureTest with TestSparkStreamingContext { +class OpWorkflowRunnerTest extends AsyncFlatSpec with PassengerSparkFixtureTest with TestSparkStreamingContext { val log = LoggerFactory.getLogger(this.getClass) - val thisDir = Paths.get("resources", "tmp", "OpWorkflowRunnerTest").toFile.getCanonicalFile - - override def beforeAll: Unit = try deleteRecursively(thisDir) finally super.beforeAll - override def afterAll: Unit = try deleteRecursively(thisDir) finally super.afterAll + lazy val testDir = Paths.get(tempDir.toString, "op-runner-test").toFile.getAbsoluteFile + lazy val modelLocation = Paths.get(testDir.toString, "model").toFile.getAbsoluteFile private val features = Seq(height, weight, gender, description, age).transmogrify() private val survivedNum = survived.occurs() - val pred = new OpLogisticRegression().setRegParam(0) - .setInput(survivedNum, features).getOutput() + val pred = new OpLogisticRegression().setRegParam(0).setInput(survivedNum, features).getOutput() private val workflow = new OpWorkflow().setResultFeatures(pred, survivedNum).setReader(dataReader) private val evaluator = Evaluators.BinaryClassification().setLabelCol(survivedNum).setPredictionCol(pred) @@ -86,7 +82,8 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec val invalidParamsLocation = Some(resourceFile(name = "RunnerParamsInvalid.json").getPath) val paramsLocation = Some(resourceFile(name = "RunnerParams.json").getPath) - val testConfig = OpWorkflowRunnerConfig(paramLocation = paramsLocation) + def testConfig: OpWorkflowRunnerConfig = OpWorkflowRunnerConfig(paramLocation = paramsLocation) + .copy(modelLocation = Some(modelLocation.toString)) Spec[OpWorkflowRunner] should "correctly determine if the command line options are valid for each run type" in { assertConf(OpWorkflowRunnerConfig(Train, modelLocation = Some("Test"))) @@ -127,12 +124,10 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "train a workflow and write the trained model" in { - lazy val modelLocation = new File(thisDir, "op-runner-test-model") - lazy val modelMetricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "train").toFile + val modelMetricsLocation = Paths.get(testDir.toString, "train-metrics").toFile.getCanonicalFile val runConfig = testConfig.copy( runType = Train, - modelLocation = Some(modelLocation.toString), metricsLocation = Some(modelMetricsLocation.toString) ) val res = doRun[TrainResult](runConfig, modelLocation, modelMetricsLocation) @@ -140,8 +135,8 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "score a dataset with a trained model" in { - val scoresLocation = Paths.get(thisDir.toString, "op-runner-test-write", "score").toFile - val scoringMetricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "score").toFile + val scoresLocation = Paths.get(testDir.toString, "score").toFile.getCanonicalFile + val scoringMetricsLocation = Paths.get(testDir.toString, "score-metrics").toFile.getCanonicalFile val runConfig = testConfig.copy( runType = Score, @@ -156,8 +151,8 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "streaming score a dataset with a trained model" in { - val readLocation = Paths.get(thisDir.toString, "op-runner-test-read", "streaming-score").toFile - val scoresLocation = Paths.get(thisDir.toString, "op-runner-test-write", "streaming-score").toFile + val readLocation = Paths.get(testDir.toString, "streaming-score-in").toFile.getCanonicalFile + val scoresLocation = Paths.get(testDir.toString, "streaming-score-out").toFile.getCanonicalFile // Prepare streaming input data FileUtils.forceMkdir(readLocation) @@ -179,18 +174,20 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "evaluate a dataset with a trained model" in { - val metricsLocation = Paths.get(thisDir.toString, "op-runner-test-metrics", "eval").toFile + val scoresLocation = Paths.get(testDir.toString, "eval-score").toFile.getCanonicalFile + val metricsLocation = Paths.get(testDir.toString, "eval-metrics").toFile.getCanonicalFile val runConfig = testConfig.copy( runType = Evaluate, + writeLocation = Some(scoresLocation.toString), metricsLocation = Some(metricsLocation.toString) ) - val res = doRun[EvaluateResult](runConfig, metricsLocation) + val res = doRun[EvaluateResult](runConfig, metricsLocation, scoresLocation) res.metrics shouldBe a[BinaryClassificationMetrics] } it should "compute features upto with a workflow" in { - lazy val featuresLocation = Paths.get(thisDir.toString, "op-runner-test-write", "features").toFile + lazy val featuresLocation = Paths.get(testDir.toString, "features").toFile.getCanonicalFile val runConfig = testConfig.copy( runType = Features, @@ -201,8 +198,10 @@ class OpWorkflowRunnerTest extends AsyncFlatSpec } it should "collect and report metrics on application end" in { - spark.stop() - metricsPromise.future.map { metrics => + for { + _ <- Future(spark.stop()) // stop spark to make sure metrics promise completes + metrics <- metricsPromise.future + } yield { metrics.appId.isEmpty shouldBe false OpWorkflowRunType.withNameInsensitiveOption(metrics.runType).isDefined shouldBe true metrics.appName shouldBe "op-test" diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala index 2c53192c3f..38689d7615 100644 --- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala +++ b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala @@ -35,14 +35,14 @@ import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature} import com.salesforce.op.stages.impl.feature.TimePeriod import com.salesforce.op.stages.impl.preparators.CorrelationType import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest} -import com.twitter.algebird.Monoid._ import com.twitter.algebird.Operators._ +import com.twitter.algebird.Tuple2Semigroup import org.apache.spark.mllib.stat.Statistics -import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner +import com.salesforce.op.filters.Summary._ @RunWith(classOf[JUnitRunner]) class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { @@ -66,6 +66,7 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { responses = Map(responseKey2 -> Right(Seq(-0.5))), predictors = Map(predictorKey2A -> Left(Seq("iv")))) val allPreparedFeatures = Seq(preparedFeatures1, preparedFeatures2, preparedFeatures3) + implicit val sgTuple2 = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]() val (allResponseSummaries, allPredictorSummaries) = allPreparedFeatures.map(_.summaries).reduce(_ + _) val allResponseKeys1 = Array(responseKey1, responseKey2) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelectorTest.scala index 1106375360..beef7c5e9c 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelectorTest.scala @@ -66,12 +66,12 @@ class BinaryClassificationModelSelectorTest extends FlatSpec with TestSparkConte // Generate positive observations following a distribution ~ N((0.0, 0.0, 0.0), I_3) val positiveData = - normalVectorRDD(spark.sparkContext, bigCount, 3, seed = seed) + normalVectorRDD(sc, bigCount, 3, seed = seed) .map(v => 1.0 -> Vectors.dense(v.toArray)) // Generate negative observations following a distribution ~ N((10.0, 10.0, 10.0), I_3) val negativeData = - normalVectorRDD(spark.sparkContext, smallCount, 3, seed = seed) + normalVectorRDD(sc, smallCount, 3, seed = seed) .map(v => 0.0 -> Vectors.dense(v.toArray.map(_ + 10.0))) val stageNames = Array("label_prediction", "label_rawPrediction", "label_probability") diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala index 97b390da86..2d91850a67 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelectorTest.scala @@ -66,17 +66,17 @@ class MultiClassificationModelSelectorTest extends FlatSpec with TestSparkContex // Generate observations of label 1 following a distribution ~ N((-100.0, -100.0, -100.0), I_3) val label0Data = - normalVectorRDD(spark.sparkContext, label0Count, 3, seed = seed) + normalVectorRDD(sc, label0Count, 3, seed = seed) .map(v => 0.0 -> Vectors.dense(v.toArray.map(_ - 100.0))) // Generate observations of label 0 following a distribution ~ N((0.0, 0.0, 0.0), I_3) val label1Data = - normalVectorRDD(spark.sparkContext, label1Count, 3, seed = seed) + normalVectorRDD(sc, label1Count, 3, seed = seed) .map(v => 1.0 -> Vectors.dense(v.toArray)) // Generate observations of label 2 following a distribution ~ N((100.0, 100.0, 100.0), I_3) val label2Data = - normalVectorRDD(spark.sparkContext, label2Count, 3, seed = seed) + normalVectorRDD(sc, label2Count, 3, seed = seed) .map(v => 2.0 -> Vectors.dense(v.toArray.map(_ + 100.0))) val stageNames = Array("label_prediction", "label_rawPrediction", "label_probability") diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala index f010566640..c5de4daabc 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpClassifierModelTest.scala @@ -34,16 +34,18 @@ import com.salesforce.op.features.types.{Prediction, RealNN} import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._ import com.salesforce.op.test._ import com.salesforce.op.testkit._ +import ml.dmlc.xgboost4j.scala.spark.{OpXGBoost, OpXGBoostQuietLogging, XGBoostClassifier} import org.apache.spark.ml.classification._ import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.DataFrame import org.junit.runner.RunWith +import org.scalactic.Equality import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) -class OpClassifierModelTest extends FlatSpec with TestSparkContext { +class OpClassifierModelTest extends FlatSpec with TestSparkContext with OpXGBoostQuietLogging { private val label = RandomIntegral.integrals(0, 2).limit(1000) .map{ v => RealNN(v.value.map(_.toDouble).getOrElse(0.0)) } @@ -51,9 +53,7 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext { private val data = label.zip(fv) - private val (rawDF, labelF, featureV) = - TestFeatureBuilder("label", "features", data) - + private val (rawDF, labelF, featureV) = TestFeatureBuilder("label", "features", data) Spec[OpDecisionTreeClassificationModel] should "produce the same values as the spark version" in { val spk = new DecisionTreeClassifier() @@ -62,11 +62,9 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } - Spec[OpLogisticRegressionModel] should "produce the same values as the spark version" in { val spk = new LogisticRegression() .setFamily("multinomial") @@ -75,7 +73,6 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } @@ -87,7 +84,6 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, uid = spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } @@ -98,7 +94,6 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } @@ -118,7 +113,6 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext { .setLabelCol(labelF.name) .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputsPred(spk.transform(rawDF), op.transform(rawDF), 3) } @@ -130,11 +124,37 @@ class OpClassifierModelTest extends FlatSpec with TestSparkContext { .setLabelCol(labelF.name) .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputsPred(spk.transform(rawDF), op.transform(rawDF), 2) + compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } - def compareOutputs(df1: DataFrame, df2: DataFrame): Unit = { + Spec[OpXGBoostClassifier] should "produce the same values as the spark version" in { + val cl = new XGBoostClassifier() + cl.set(cl.trackerConf, OpXGBoost.DefaultTrackerConf) + .setFeaturesCol(featureV.name) + .setLabelCol(labelF.name) + val spk = cl.fit(rawDF) + val op = toOP(spk, spk.uid).setInput(labelF, featureV) + + // ****************************************************** + // TODO: remove equality tolerance once XGBoost rounding bug in XGBoostClassifier.transform(probabilityUDF) is fixed + // TODO: ETA - will be added in XGBoost version 0.81 + implicit val doubleEquality = new Equality[Double] { + def areEqual(a: Double, b: Any): Boolean = b match { + case s: Double => (a.isNaN && s.isNaN) || math.abs(a - s) < 0.0000001 + case _ => false + } + } + implicit val doubleArrayEquality = new Equality[Array[Double]] { + def areEqual(a: Array[Double], b: Any): Boolean = b match { + case s: Array[_] if a.length == s.length => a.zip(s).forall(v => doubleEquality.areEqual(v._1, v._2)) + case _ => false + } + } + // ****************************************************** + compareOutputs(spk.transform(rawDF), op.transform(rawDF)) + } + def compareOutputs(df1: DataFrame, df2: DataFrame)(implicit arrayEquality: Equality[Array[Double]]): Unit = { def keysStartsWith(name: String, value: Map[String, Double]): Array[Double] = { val names = value.keys.filter(_.startsWith(name)).toArray.sorted names.map(value) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpDecisionTreeClassifierTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpDecisionTreeClassifierTest.scala index 7d80f22ad6..a7b864b2a5 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpDecisionTreeClassifierTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpDecisionTreeClassifierTest.scala @@ -45,6 +45,8 @@ class OpDecisionTreeClassifierTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[DecisionTreeClassificationModel], OpPredictorWrapper[DecisionTreeClassifier, DecisionTreeClassificationModel]] with PredictionEquality { + override def specName: String = Spec[OpDecisionTreeClassifier] + val (inputData, rawFeature1, feature2) = TestFeatureBuilder("label", "features", Seq[(RealNN, OPVector)]( 1.0.toRealNN -> Vectors.dense(12.0, 4.3, 1.3).toOPVector, @@ -71,7 +73,6 @@ class OpDecisionTreeClassifierTest extends OpEstimatorSpec[Prediction, Prediction(0.0, Array(4.0, 0.0), Array(1.0, 0.0)) ) - it should "allow the user to set the desired spark parameters" in { estimator .setMaxDepth(6) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpGBTClassifierTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpGBTClassifierTest.scala index f531f4790e..7d1fa11e43 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpGBTClassifierTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpGBTClassifierTest.scala @@ -44,6 +44,8 @@ import org.scalatest.junit.JUnitRunner class OpGBTClassifierTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[GBTClassificationModel], OpPredictorWrapper[GBTClassifier, GBTClassificationModel]] with PredictionEquality { + override def specName: String = Spec[OpGBTClassifier] + val (inputData, rawFeature1, feature2) = TestFeatureBuilder("label", "features", Seq[(RealNN, OPVector)]( 1.0.toRealNN -> Vectors.dense(12.0, 4.3, 1.3).toOPVector, diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpLinearSVCTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpLinearSVCTest.scala index 0724b62f73..c584bdd737 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpLinearSVCTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpLinearSVCTest.scala @@ -44,6 +44,8 @@ import org.scalatest.junit.JUnitRunner class OpLinearSVCTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[LinearSVCModel], OpPredictorWrapper[LinearSVC, LinearSVCModel]] with PredictionEquality { + override def specName: String = Spec[OpLinearSVC] + val (inputData, rawFeature1, feature2) = TestFeatureBuilder("label", "features", Seq[(RealNN, OPVector)]( 1.0.toRealNN -> Vectors.dense(12.0, 4.3, 1.3).toOPVector, diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpLogisticRegressionTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpLogisticRegressionTest.scala index 95e7375735..a9997d2300 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpLogisticRegressionTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpLogisticRegressionTest.scala @@ -44,6 +44,8 @@ import org.scalatest.junit.JUnitRunner class OpLogisticRegressionTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[LogisticRegressionModel], OpPredictorWrapper[LogisticRegression, LogisticRegressionModel]] with PredictionEquality { + override def specName: String = Spec[OpLogisticRegression] + val (inputData, rawFeature1, feature2) = TestFeatureBuilder("label", "features", Seq[(RealNN, OPVector)]( 1.0.toRealNN -> Vectors.dense(12.0, 4.3, 1.3).toOPVector, diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpMultilayerPerceptronClassifierTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpMultilayerPerceptronClassifierTest.scala index e932c7c023..f3486972a7 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpMultilayerPerceptronClassifierTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpMultilayerPerceptronClassifierTest.scala @@ -45,6 +45,8 @@ class OpMultilayerPerceptronClassifierTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[MultilayerPerceptronClassificationModel], OpPredictorWrapper[MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel]] with PredictionEquality { + override def specName: String = Spec[OpMultilayerPerceptronClassifier] + val (inputData, rawFeature1, feature2) = TestFeatureBuilder("label", "features", Seq[(RealNN, OPVector)]( 1.0.toRealNN -> Vectors.dense(12.0, 4.3, 1.3).toOPVector, @@ -62,26 +64,20 @@ class OpMultilayerPerceptronClassifierTest extends OpEstimatorSpec[Prediction, .setInput(feature1, feature2) .setLayers(Array(3, 5, 4, 2)) - val expectedResult = Seq( - Prediction(1.0), - Prediction(0.0), - Prediction(0.0), - Prediction(1.0), - Prediction(1.0), - Prediction(0.0), - Prediction(1.0), - Prediction(0.0) + Prediction(1.0, Array(-9.655814651428148, 9.202335441336952), Array(6.456683124562021E-9, 0.9999999935433168)), + Prediction(0.0, Array(9.475612761543069, -10.617525149157993), Array(0.9999999981221492, 1.877850786773977E-9)), + Prediction(0.0, Array(9.715293827870028, -10.885255922155942), Array(0.9999999988694366, 1.130563392364822E-9)), + Prediction(1.0, Array(-9.66776357765489, 9.215079716735316), Array(6.299199338896916E-9, 0.9999999937008006)), + Prediction(1.0, Array(-9.668041712561456, 9.215387575592239), Array(6.2955091287182745E-9, 0.9999999937044908)), + Prediction(0.0, Array(9.692904797559496, -10.860273756796797), Array(0.9999999988145918, 1.1854083109077814E-9)), + Prediction(1.0, Array(-9.667687253240183, 9.214995747770411), Array(6.300209139771467E-9, 0.9999999936997908)), + Prediction(0.0, Array(9.703097414537668, -10.872171694864653), Array(0.9999999988404908, 1.1595091005698914E-9)) ) - it should "allow the user to set the desired spark parameters" in { - estimator - .setMaxIter(50) - .setBlockSize(2) - .setSeed(42) + estimator.setMaxIter(50).setBlockSize(2).setSeed(42) estimator.fit(inputData) - estimator.predictor.getMaxIter shouldBe 50 estimator.predictor.getBlockSize shouldBe 2 estimator.predictor.getSeed shouldBe 42 diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpNaiveBayesTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpNaiveBayesTest.scala index aed8c4d3e8..7e24de31ab 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpNaiveBayesTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpNaiveBayesTest.scala @@ -44,6 +44,8 @@ import org.scalatest.junit.JUnitRunner class OpNaiveBayesTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[NaiveBayesModel], OpPredictorWrapper[NaiveBayes, NaiveBayesModel]] with PredictionEquality { + override def specName: String = Spec[OpNaiveBayes] + val (inputData, rawFeature1, feature2) = TestFeatureBuilder("label", "features", Seq[(RealNN, OPVector)]( 1.0.toRealNN -> Vectors.dense(12.0, 4.3, 1.3).toOPVector, @@ -70,12 +72,9 @@ class OpNaiveBayesTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperMod Prediction(0.0, Array(-4.54, -6.32), Array(0.85, 0.14)) ) - it should "allow the user to set the desired spark parameters" in { - estimator - .setSmoothing(2) + estimator.setSmoothing(2) estimator.fit(inputData) - estimator.predictor.getSmoothing shouldBe 2 } } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpRandomForestClassifierTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpRandomForestClassifierTest.scala index b2b25b5816..7c9e9d0277 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpRandomForestClassifierTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpRandomForestClassifierTest.scala @@ -44,6 +44,8 @@ class OpRandomForestClassifierTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[RandomForestClassificationModel], OpPredictorWrapper[RandomForestClassifier, RandomForestClassificationModel]] with PredictionEquality { + override def specName: String = Spec[OpRandomForestClassifier] + lazy val (inputData, rawLabelMulti, featuresMulti) = TestFeatureBuilder[RealNN, OPVector]("labelMulti", "featuresMulti", Seq( diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifierTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifierTest.scala new file mode 100644 index 0000000000..77760810db --- /dev/null +++ b/core/src/test/scala/com/salesforce/op/stages/impl/classification/OpXGBoostClassifierTest.scala @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.stages.impl.classification + +import com.salesforce.op.features.types._ +import com.salesforce.op.stages.impl.PredictionEquality +import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapper, OpPredictorWrapperModel} +import com.salesforce.op.test.{OpEstimatorSpec, TestFeatureBuilder} +import ml.dmlc.xgboost4j.scala.spark.{OpXGBoostQuietLogging, XGBoostClassificationModel, XGBoostClassifier} +import org.apache.spark.ml.linalg.Vectors +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + + +@RunWith(classOf[JUnitRunner]) +class OpXGBoostClassifierTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[XGBoostClassificationModel], + OpPredictorWrapper[XGBoostClassifier, XGBoostClassificationModel]] + with PredictionEquality with OpXGBoostQuietLogging { + + override def specName: String = Spec[OpXGBoostClassifier] + + val rawData = Seq( + 1.0 -> Vectors.dense(12.0, 4.3, 1.3), + 0.0 -> Vectors.dense(0.0, 0.3, 0.1), + 0.0 -> Vectors.dense(1.0, 3.9, 4.3), + 1.0 -> Vectors.dense(10.0, 1.3, 0.9), + 1.0 -> Vectors.dense(15.0, 4.7, 1.3), + 0.0 -> Vectors.dense(0.5, 0.9, 10.1), + 1.0 -> Vectors.dense(11.5, 2.3, 1.3), + 0.0 -> Vectors.dense(0.1, 3.3, 0.1) + ).map { case (l, v) => l.toRealNN -> v.toOPVector } + + val (inputData, label, features) = TestFeatureBuilder("label", "features", rawData) + + val estimator = new OpXGBoostClassifier().setInput(label.copy(isResponse = true), features) + estimator.setSilent(1) + + val expectedResult = Seq( + Prediction(1.0, Array(0.6200000047683716), Array(0.3799999952316284, 0.6200000047683716)), + Prediction(0.0, Array(0.3799999952316284), Array(0.6200000047683716, 0.3799999952316284)), + Prediction(0.0, Array(0.3799999952316284), Array(0.6200000047683716, 0.3799999952316284)), + Prediction(1.0, Array(0.6200000047683716), Array(0.3799999952316284, 0.6200000047683716)), + Prediction(1.0, Array(0.6200000047683716), Array(0.3799999952316284, 0.6200000047683716)), + Prediction(0.0, Array(0.3799999952316284), Array(0.6200000047683716, 0.3799999952316284)), + Prediction(1.0, Array(0.6200000047683716), Array(0.3799999952316284, 0.6200000047683716)), + Prediction(0.0, Array(0.3799999952316284), Array(0.6200000047683716, 0.3799999952316284)) + ) + + it should "allow the user to set the desired spark parameters" in { + estimator.setAlpha(0.872).setEta(0.99912) + estimator.fit(inputData) + estimator.predictor.getAlpha shouldBe 0.872 + estimator.predictor.getEta shouldBe 0.99912 + } +} diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericBucketizerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericBucketizerTest.scala index 263ade4d2f..1f29198606 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericBucketizerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/feature/DecisionTreeNumericBucketizerTest.scala @@ -171,11 +171,17 @@ class DecisionTreeNumericBucketizerTest extends OpEstimatorSpec[OPVector, val (ds, rawBinary, rawCurrency, rawER, label) = TestFeatureBuilder("binary", "currency", "expectedRevenue", "label", rawData) + // Spark changed their split algorithm in 2.3.0 to use the mean, so adjust our expected value here + // https://issues.apache.org/jira/browse/SPARK-16957 + val splitValue = expectedRevenueData + .filter(x => x.nonEmpty && x.value.get > 0.0) + .map(_.value.get).min / 2.0 + val out = rawER.autoBucketize(label.copy(isResponse = true), trackNulls = true, trackInvalid = true) assertBucketizer( bucketizer = out.originStage.asInstanceOf[DecisionTreeNumericBucketizer[_, _ <: OPNumeric[_]]], data = ds, shouldSplit = true, trackNulls = true, trackInvalid = true, - expectedSplits = Array(Double.NegativeInfinity, 0.0, Double.PositiveInfinity), + expectedSplits = Array(Double.NegativeInfinity, splitValue, Double.PositiveInfinity), expectedTolerance = 0.15 ) } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/feature/PercentileCalibratorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/feature/PercentileCalibratorTest.scala index 490cdd583d..48f3ad5746 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/feature/PercentileCalibratorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/feature/PercentileCalibratorTest.scala @@ -75,9 +75,10 @@ class PercentileCalibratorTest extends FlatSpec with TestSparkContext { val splits = trans.getMetadata().getSummaryMetadata().getStringArray(PercentileCalibrator.OrigSplitsKey) val scaled = trans.getMetadata().getSummaryMetadata().getStringArray(PercentileCalibrator.ScaledSplitsKey) - splits should contain theSameElementsAs - Array(Double.NegativeInfinity, 0.7231742029971469, 0.9908988967772393, Double.PositiveInfinity).map(_.toString) - scaled should contain theSameElementsAs Array(0.0, 50.0, 99.0, 99.0).map(_.toString) + splits shouldEqual Array( + Double.NegativeInfinity, 0.25329310557439133, 0.7231742029971469, 0.9908988967772393, + Double.PositiveInfinity).map(_.toString) + scaled shouldEqual Array(0.0, 33.0, 66.0, 99.0, 99.0).map(_.toString) } it should "return a maximum calibrated score of 99" in { diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/preparators/SanityCheckerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/preparators/SanityCheckerTest.scala index 426094722d..157ef808a2 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/preparators/SanityCheckerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/preparators/SanityCheckerTest.scala @@ -34,17 +34,15 @@ import com.salesforce.op._ import com.salesforce.op.features.FeatureLike import com.salesforce.op.features.types._ import com.salesforce.op.stages.MetadataParam -import com.salesforce.op.stages.impl.feature._ import com.salesforce.op.stages.base.binary.{BinaryEstimator, BinaryModel} -import com.salesforce.op.stages.impl.feature.{HashSpaceStrategy, RealNNVectorizer, SmartTextMapVectorizer} -import com.salesforce.op.test.{OpEstimatorSpec, TestFeatureBuilder, TestSparkContext} +import com.salesforce.op.stages.impl.feature.{HashSpaceStrategy, RealNNVectorizer, SmartTextMapVectorizer, _} +import com.salesforce.op.test.{OpEstimatorSpec, TestFeatureBuilder} import com.salesforce.op.utils.spark.RichMetadata._ import com.salesforce.op.utils.spark.{OpVectorColumnMetadata, OpVectorMetadata} -import org.apache.log4j.Level +import org.apache.spark.SparkException import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.sql.types.Metadata import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.SparkException import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -70,7 +68,9 @@ case class TextRawData @RunWith(classOf[JUnitRunner]) class SanityCheckerTest extends OpEstimatorSpec[OPVector, BinaryModel[RealNN, OPVector, OPVector], - BinaryEstimator[RealNN, OPVector, OPVector]] with TestSparkContext { + BinaryEstimator[RealNN, OPVector, OPVector]] { + + override def specName: String = Spec[SanityChecker] // loggingLevel(Level.INFO) @@ -87,13 +87,11 @@ class SanityCheckerTest extends OpEstimatorSpec[OPVector, BinaryModel[RealNN, OP TextRawData("9", 0.0, Map("beverage" -> "tea")), TextRawData("10", 0.0, Map("beverage" -> "coffee")), TextRawData("11", 0.0, Map("beverage" -> "water")) - ).map( textRawData => - ( - textRawData.id.toText, - textRawData.target.toRealNN, - textRawData.textMap.toTextMap - ) - ) + ).map( textRawData => ( + textRawData.id.toText, + textRawData.target.toRealNN, + textRawData.textMap.toTextMap + )) val (textData, id, target, textMap) = TestFeatureBuilder("id", "target", "textMap", textRawData) val targetResponse: FeatureLike[RealNN] = target.copy(isResponse = true) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressorTest.scala index 310d93baa0..0b4554f3a5 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpDecisionTreeRegressorTest.scala @@ -44,6 +44,8 @@ class OpDecisionTreeRegressorTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[DecisionTreeRegressionModel], OpPredictorWrapper[DecisionTreeRegressor, DecisionTreeRegressionModel]] with PredictionEquality { + override def specName: String = Spec[OpDecisionTreeRegressor] + val (inputData, rawLabel, features) = TestFeatureBuilder( Seq[(RealNN, OPVector)]( (10.0.toRealNN, Vectors.dense(1.0, 4.3, 1.3).toOPVector), diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressorTest.scala index 3022def8d5..24e896188a 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpGBTRegressorTest.scala @@ -43,6 +43,8 @@ import org.scalatest.junit.JUnitRunner class OpGBTRegressorTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[GBTRegressionModel], OpPredictorWrapper[GBTRegressor, GBTRegressionModel]] with PredictionEquality { + override def specName: String = Spec[OpGBTRegressor] + val (inputData, rawLabel, features) = TestFeatureBuilder( Seq[(RealNN, OPVector)]( (10.0.toRealNN, Vectors.dense(1.0, 4.3, 1.3).toOPVector), diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpGeneralizedLinearRegressionTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpGeneralizedLinearRegressionTest.scala index 4d313a45ee..c302ca9af2 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpGeneralizedLinearRegressionTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpGeneralizedLinearRegressionTest.scala @@ -44,6 +44,8 @@ class OpGeneralizedLinearRegressionTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[GeneralizedLinearRegressionModel], OpPredictorWrapper[GeneralizedLinearRegression, GeneralizedLinearRegressionModel]] with PredictionEquality { + override def specName: String = Spec[OpGeneralizedLinearRegression] + val (inputData, rawLabel, features) = TestFeatureBuilder( Seq[(RealNN, OPVector)]( (10.0.toRealNN, Vectors.dense(1.0, 4.3, 1.3).toOPVector), @@ -70,14 +72,14 @@ class OpGeneralizedLinearRegressionTest extends OpEstimatorSpec[Prediction, .setRegParam(0.1) .setFitIntercept(true) .setTol(1E-4) - .setSolver("normal") + .setSolver("irls") estimator.fit(inputData) estimator.predictor.getMaxIter shouldBe 10 estimator.predictor.getRegParam shouldBe 0.1 estimator.predictor.getFitIntercept shouldBe true estimator.predictor.getTol shouldBe 1E-4 - estimator.predictor.getSolver shouldBe "normal" + estimator.predictor.getSolver shouldBe "irls" } } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpLinearRegressionTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpLinearRegressionTest.scala index d21ca781dc..c7f346699e 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpLinearRegressionTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpLinearRegressionTest.scala @@ -44,6 +44,8 @@ import org.scalatest.junit.JUnitRunner class OpLinearRegressionTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[LinearRegressionModel], OpPredictorWrapper[LinearRegression, LinearRegressionModel]] with PredictionEquality { + override def specName: String = Spec[OpLinearRegression] + val (inputData, rawLabel, features) = TestFeatureBuilder( Seq[(RealNN, OPVector)]( (10.0.toRealNN, Vectors.dense(1.0, 4.3, 1.3).toOPVector), diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressorTest.scala index f989595c7c..b605a4a60a 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpRandomForestRegressorTest.scala @@ -44,6 +44,8 @@ class OpRandomForestRegressorTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[RandomForestRegressionModel], OpPredictorWrapper[RandomForestRegressor, RandomForestRegressionModel]] with PredictionEquality { + override def specName: String = Spec[OpRandomForestRegressor] + val (inputData, rawLabel, features) = TestFeatureBuilder( Seq[(RealNN, OPVector)]( (10.0.toRealNN, Vectors.dense(1.0, 4.3, 1.3).toOPVector), diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpRegressionModelTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpRegressionModelTest.scala index 79c0b6be0a..e3c0fc0703 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpRegressionModelTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpRegressionModelTest.scala @@ -34,6 +34,7 @@ import com.salesforce.op.features.types.{Prediction, RealNN} import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter.toOP import com.salesforce.op.test._ import com.salesforce.op.testkit._ +import ml.dmlc.xgboost4j.scala.spark.{OpXGBoost, OpXGBoostQuietLogging, XGBoostRegressor} import org.apache.spark.ml.regression._ import org.apache.spark.sql.DataFrame import org.junit.runner.RunWith @@ -41,7 +42,7 @@ import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) -class OpRegressionModelTest extends FlatSpec with TestSparkContext { +class OpRegressionModelTest extends FlatSpec with TestSparkContext with OpXGBoostQuietLogging { private val label = RandomIntegral.integrals(0, 2).limit(1000) .map{ v => RealNN(v.value.map(_.toDouble).getOrElse(0.0)) } @@ -58,11 +59,9 @@ class OpRegressionModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } - Spec[OpLinearRegressionModel] should "produce the same values as the spark version" in { val spk = new LinearRegression() .setFeaturesCol(featureV.name) @@ -70,7 +69,6 @@ class OpRegressionModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } @@ -81,7 +79,6 @@ class OpRegressionModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } @@ -92,7 +89,6 @@ class OpRegressionModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) - compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } @@ -103,7 +99,17 @@ class OpRegressionModelTest extends FlatSpec with TestSparkContext { .fit(rawDF) val op = toOP(spk, spk.uid).setInput(labelF, featureV) + compareOutputs(spk.transform(rawDF), op.transform(rawDF)) + } + Spec[OpXGBoostRegressionModel] should "produce the same values as the spark version" in { + val reg = new XGBoostRegressor() + reg.set(reg.trackerConf, OpXGBoost.DefaultTrackerConf) + .setFeaturesCol(featureV.name) + .setLabelCol(labelF.name) + val spk = reg.fit(rawDF) + + val op = toOP(spk, spk.uid).setInput(labelF, featureV) compareOutputs(spk.transform(rawDF), op.transform(rawDF)) } diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressorTest.scala new file mode 100644 index 0000000000..db4498638c --- /dev/null +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/OpXGBoostRegressorTest.scala @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.stages.impl.regression + +import com.salesforce.op.features.types._ +import com.salesforce.op.stages.impl.PredictionEquality +import com.salesforce.op.stages.sparkwrappers.specific.{OpPredictorWrapper, OpPredictorWrapperModel} +import com.salesforce.op.test._ +import ml.dmlc.xgboost4j.scala.spark.{OpXGBoostQuietLogging, XGBoostRegressionModel, XGBoostRegressor} +import org.apache.spark.ml.linalg.Vectors +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + + +@RunWith(classOf[JUnitRunner]) +class OpXGBoostRegressorTest extends OpEstimatorSpec[Prediction, OpPredictorWrapperModel[XGBoostRegressionModel], + OpPredictorWrapper[XGBoostRegressor, XGBoostRegressionModel]] + with PredictionEquality with OpXGBoostQuietLogging { + + override def specName: String = Spec[OpXGBoostRegressor] + + val rawData = Seq( + (10.0, Vectors.dense(1.0, 4.3, 1.3)), + (20.0, Vectors.dense(2.0, 0.3, 0.1)), + (30.0, Vectors.dense(3.0, 3.9, 4.3)), + (40.0, Vectors.dense(4.0, 1.3, 0.9)), + (50.0, Vectors.dense(5.0, 4.7, 1.3)) + ).map { case (l, v) => l.toRealNN -> v.toOPVector } + + val (inputData, label, features) = TestFeatureBuilder("label", "features", rawData) + + val estimator = new OpXGBoostRegressor().setInput(label.copy(isResponse = true), features) + estimator.setSilent(1) + + val expectedResult = Seq( + Prediction(1.9250000715255737), + Prediction(8.780000686645508), + Prediction(8.780000686645508), + Prediction(8.780000686645508), + Prediction(8.780000686645508) + ) + + it should "allow the user to set the desired spark parameters" in { + estimator.setMaxDepth(18).setBaseScore(0.12345).setSkipDrop(0.6234) + estimator.fit(inputData) + estimator.predictor.getMaxDepth shouldBe 18 + estimator.predictor.getBaseScore shouldBe 0.12345 + estimator.predictor.getSkipDrop shouldBe 0.6234 + + } +} diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala index 2cb2a1619b..53a14f35f5 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala @@ -72,7 +72,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext with Co .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0)) .addGrid(lr.maxIter, Array(10, 100)) .addGrid(lr.regParam, Array(0.0)) - .addGrid(lr.solver, Array("lbfgs")) + .addGrid(lr.solver, Array("l-bfgs")) .build() val rf = new OpRandomForestRegressor() diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala index ad1307386c..2c9dc927f8 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala @@ -66,12 +66,12 @@ class ModelSelectorTest extends OpEstimatorSpec[Prediction, SelectedModel, Model // Generate positive observations following a distribution ~ N((0.0, 0.0, 0.0), I_3) val positiveData = - normalVectorRDD(spark.sparkContext, bigCount, 3, seed = seed) + normalVectorRDD(sc, bigCount, 3, seed = seed) .map(v => 1.0 -> Vectors.dense(v.toArray)) // Generate negative observations following a distribution ~ N((10.0, 10.0, 10.0), I_3) val negativeData = - normalVectorRDD(spark.sparkContext, smallCount, 3, seed = seed) + normalVectorRDD(sc, smallCount, 3, seed = seed) .map(v => 0.0 -> Vectors.dense(v.toArray.map(_ + 10.0))) val data = positiveData.union(negativeData).toDF("label", "features") diff --git a/features/src/main/scala/com/salesforce/op/aggregators/ExtendedMultiset.scala b/features/src/main/scala/com/salesforce/op/aggregators/ExtendedMultiset.scala index 8718c94fb8..660fb74e23 100644 --- a/features/src/main/scala/com/salesforce/op/aggregators/ExtendedMultiset.scala +++ b/features/src/main/scala/com/salesforce/op/aggregators/ExtendedMultiset.scala @@ -41,6 +41,8 @@ import com.twitter.algebird._ * However, order does not matter, so {a, a, b} and {a, b, a} are the same multiset. */ trait ExtendedMultiset extends MapMonoid[String, Long] with Group[Map[String, Long]] { + override def negate(kv: Map[String, Long]): Map[String, Long] = kv.mapValues { v => -v } + override def minus(x: Map[String, Long], y: Map[String, Long]): Map[String, Long] = { val keys = x.keySet ++ y.keySet val kvPairs = keys map (k => k -> (x.getOrElse(k, 0L) - y.getOrElse(k, 0L))) filter (_._2 != 0L) diff --git a/features/src/main/scala/com/salesforce/op/features/types/Maps.scala b/features/src/main/scala/com/salesforce/op/features/types/Maps.scala index bc3e72b7f7..a285978fcb 100644 --- a/features/src/main/scala/com/salesforce/op/features/types/Maps.scala +++ b/features/src/main/scala/com/salesforce/op/features/types/Maps.scala @@ -343,6 +343,13 @@ class Prediction private[op](value: Map[String, Double]) extends RealMap(value) val probKeys = keysStartsWith(ProbabilityName) if (probKeys.nonEmpty) probKeys.map(value) else Array(value(PredictionName)) } + + override def toString: String = { + val rawPred = rawPrediction.mkString("Array(", ", ", ")") + val prob = probability.mkString("Array(", ", ", ")") + s"${getClass.getSimpleName}(prediction = $prediction, rawPrediction = $rawPred, probability = $prob)" + } + } object Prediction { object Keys { diff --git a/features/src/test/scala/com/salesforce/op/features/types/FeatureTypeTest.scala b/features/src/test/scala/com/salesforce/op/features/types/FeatureTypeTest.scala index 1efa062895..7fb921341f 100644 --- a/features/src/test/scala/com/salesforce/op/features/types/FeatureTypeTest.scala +++ b/features/src/test/scala/com/salesforce/op/features/types/FeatureTypeTest.scala @@ -211,12 +211,16 @@ class FeatureTypeTest extends PropSpec with PropertyChecks with TestCommon { property("toString should return a valid string") { forAll(featureTypesVals) { ft => val actual = ft.toString - val v = ft.value match { + val v = ft match { case _ if ft.isEmpty => "" - case Seq(lat: Double, lon: Double, acc: Double) if ft.isInstanceOf[Geolocation] => - f"$lat%.5f, $lon%.5f, ${GeolocationAccuracy.withValue(acc.toInt)}" - case t: TraversableOnce[_] => t.mkString(", ") - case x => x.toString + case g: Geolocation => + f"${g.lat}%.5f, ${g.lon}%.5f, ${g.accuracy}" + case p: Prediction => + val rawPred = p.rawPrediction.mkString("Array(", ", ", ")") + val prob = p.probability.mkString("Array(", ", ", ")") + s"prediction = ${p.prediction}, rawPrediction = $rawPred, probability = $prob" + case SomeValue(v: TraversableOnce[_]) => v.mkString(", ") + case t => t.value.toString } val expected = s"${ft.getClass.getSimpleName}($v)" diff --git a/features/src/test/scala/com/salesforce/op/features/types/PredictionTest.scala b/features/src/test/scala/com/salesforce/op/features/types/PredictionTest.scala index 48c423fe1d..d36e1f94aa 100644 --- a/features/src/test/scala/com/salesforce/op/features/types/PredictionTest.scala +++ b/features/src/test/scala/com/salesforce/op/features/types/PredictionTest.scala @@ -79,6 +79,14 @@ class PredictionTest extends FlatSpec with TestCommon { Prediction(1.0, Array(2.0, 3.0), Array.empty[Double]).score shouldBe Array(1.0) Prediction(1.0, Array.empty[Double], Array(2.0, 3.0)).score shouldBe Array(2.0, 3.0) } + it should "have a nice .toString method implementation" in { + Prediction(4.0).toString shouldBe + "Prediction(prediction = 4.0, rawPrediction = Array(), probability = Array())" + Prediction(1.0, Array(2.0, 3.0), Array.empty[Double]).toString shouldBe + "Prediction(prediction = 1.0, rawPrediction = Array(2.0, 3.0), probability = Array())" + Prediction(1.0, Array.empty[Double], Array(2.0, 3.0)).toString shouldBe + "Prediction(prediction = 1.0, rawPrediction = Array(), probability = Array(2.0, 3.0))" + } private def assertPredictionError(f: => Unit) = intercept[NonNullableEmptyException](f).getMessage shouldBe diff --git a/features/src/test/scala/com/salesforce/op/utils/spark/RichVectorTest.scala b/features/src/test/scala/com/salesforce/op/utils/spark/RichVectorTest.scala index 889a93cf3e..1528bba3ee 100644 --- a/features/src/test/scala/com/salesforce/op/utils/spark/RichVectorTest.scala +++ b/features/src/test/scala/com/salesforce/op/utils/spark/RichVectorTest.scala @@ -52,7 +52,7 @@ class RichVectorTest extends PropSpec with PropertyChecks with TestSparkContext import VectorGenerators._ import com.salesforce.op.utils.spark.RichVector._ - lazy val sparseVevtorsRDDGen = RDDGenerator.genRDD[Vector](spark.sparkContext)(sparseVectorGen) + lazy val sparseVevtorsRDDGen = RDDGenerator.genRDD[Vector](sc)(sparseVectorGen) property("Vectors should error on size mismatch") { forAll(sparseVectorGen) { sparse: SparseVector => diff --git a/readers/src/test/scala/com/salesforce/op/readers/DataGenerationTest.scala b/readers/src/test/scala/com/salesforce/op/readers/DataGenerationTest.scala index 0b152ff17b..a7a6065a96 100644 --- a/readers/src/test/scala/com/salesforce/op/readers/DataGenerationTest.scala +++ b/readers/src/test/scala/com/salesforce/op/readers/DataGenerationTest.scala @@ -94,7 +94,7 @@ class DataGenerationTest extends FlatSpec with PassengerSparkFixtureTest { Map("Female" -> "string"), Map("Female" -> 1.0), Map("Female" -> false)), Row("3", null, null, List("Male"), 186, 96, "this is a description", List(1471046600), Map("Male" -> "string"), Map("Male" -> 1.0), Map("Male" -> false)), - Row("4", false, 50, List("Male"), 363, 172, "this is a description stuff", List(1471046400, 1471046300), + Row("4", false, 50, List("Male"), 363, 172, "stuff this is a description", List(1471046300, 1471046400), Map("Male" -> "string string"), Map("Male" -> 2.0), Map("Male" -> false)), Row("5", null, 2, List("Female"), 0.0, 67, "", List(1471046100), Map("Female" -> "string"), Map("Female" -> 1.0), Map("Female" -> false)), @@ -142,8 +142,8 @@ class DataGenerationTest extends FlatSpec with PassengerSparkFixtureTest { Row("5", null, 2, List("Female"), 0.0, 67, "", List(1471046100), Map("Female" -> "string"), Map("Female" -> 1.0), Map("Female" -> false)), Row("6", true, null, null, 0.0, null, null, null, null, null, null), - Row("4", null, 50, List("Male"), 0.0, 248, "this is a description stuff stuff", - List(1471046400, 1471046400, 1471046300), Map("Male" -> "string string string"), + Row("4", null, 50, List("Male"), 0.0, 248, "stuff stuff this is a description", + List(1471046400, 1471046300, 1471046400), Map("Male" -> "string string string"), Map("Male" -> 3.0), Map("Male" -> false)) ) val passenger4 = dataSet.filter(_.get(0) == "4").head diff --git a/readers/src/test/scala/com/salesforce/op/readers/JoinedDataReaderDataGenerationTest.scala b/readers/src/test/scala/com/salesforce/op/readers/JoinedDataReaderDataGenerationTest.scala index 4d0c78ed03..91a43ffc3f 100644 --- a/readers/src/test/scala/com/salesforce/op/readers/JoinedDataReaderDataGenerationTest.scala +++ b/readers/src/test/scala/com/salesforce/op/readers/JoinedDataReaderDataGenerationTest.scala @@ -290,7 +290,7 @@ class JoinedDataReaderDataGenerationTest extends FlatSpec with PassengerSparkFix aggregatedData.collect(description) should contain theSameElementsAs Array(Text.empty, Text.empty, Text.empty, Text(""), - Text("this is a description stuff this is a description stuff this is a description stuff"), + Text("stuff this is a description stuff this is a description stuff this is a description"), Text("this is a description")) aggregatedData.collect(stringMap) should contain theSameElementsAs @@ -300,7 +300,7 @@ class JoinedDataReaderDataGenerationTest extends FlatSpec with PassengerSparkFix aggregatedData.collect(boarded) should contain theSameElementsAs Array(DateList.empty, DateList.empty, DateList(Array(1471046100L)), DateList(Array(1471046400L)), - DateList(Array(1471046400L, 1471046300L, 1471046400L, 1471046300L, 1471046400L, 1471046300L)), + DateList(Array(1471046300L, 1471046400L, 1471046300L, 1471046400L, 1471046300L, 1471046400L)), DateList(Array(1471046600L))) // height has a special integration window so this features tests that things included in other diff --git a/utils/build.gradle b/utils/build.gradle index 5f12c8accc..f18024ec72 100644 --- a/utils/build.gradle +++ b/utils/build.gradle @@ -16,8 +16,8 @@ dependencies { compile "com.twitter:algebird-core_$scalaVersion:$algebirdVersion" // Twitter Chill - compile ("com.twitter:chill-avro_$scalaVersion:$chillAvroVersion") { exclude group: "org.apache.avro", module: "avro" } - compile "com.twitter:chill-algebird_$scalaVersion:$chillAvroVersion" + compile ("com.twitter:chill-avro_$scalaVersion:$chillVersion") { exclude group: "org.apache.avro", module: "avro" } + compile "com.twitter:chill-algebird_$scalaVersion:$chillVersion" // Lucene - (geo location) compile "org.apache.lucene:lucene-spatial3d:$luceneVersion" diff --git a/utils/src/main/scala/com/salesforce/op/utils/reflection/ReflectionUtils.scala b/utils/src/main/scala/com/salesforce/op/utils/reflection/ReflectionUtils.scala index 0f8645c633..d317e35ef3 100644 --- a/utils/src/main/scala/com/salesforce/op/utils/reflection/ReflectionUtils.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/reflection/ReflectionUtils.scala @@ -136,43 +136,57 @@ object ReflectionUtils { /** * Find setter methods for the provided method name - * @param instance class to find method for - * @param setterName name of method to find - * @param classLoader class loader to use - * @tparam T type of instance to copy - * @return reflected method to set type + * @param instance class to find method for + * @param setterName name of method to find + * @param args argument values + * @param argsCount optional number of arguments to match + * @param classLoader class loader to use + * @tparam T type of instance to copy + * @return reflected method to set type */ def reflectSetterMethod[T: ClassTag]( instance: T, setterName: String, - inputs: Seq[Any], + args: Seq[Any], + argsCount: Option[Int] = None, classLoader: ClassLoader = defaultClassLoader ): Any = { - reflectMethod(instance, s"set$setterName", classLoader).apply(inputs: _*) + reflectMethod(instance, s"set$setterName", argsCount, classLoader).apply(args: _*) } /** * Find setter methods for the provided method name - * @param instance class to find method for - * @param methodName name of method to find - * @param classLoader class loader to use - * @tparam T type of instance to copy - * @return reflected method to set type + * @param instance class to find method for + * @param methodName name of method to find + * @param argsCount optional number of arguments to match + * @param classLoader class loader to use + * @tparam T type of instance to copy + * @return reflected method to set type */ def reflectMethod[T: ClassTag]( instance: T, methodName: String, + argsCount: Option[Int] = None, classLoader: ClassLoader = defaultClassLoader ): MethodMirror = { val klazz = instance.getClass val (runtimeMirror, classMirror) = mirrors(klazz, classLoader) val classType = runtimeMirror.classSymbol(klazz).toType val tMembers = classType.members - val methods = tMembers.collect { case m: MethodSymbol if m.isMethod && - termNameStr(m.name).compareToIgnoreCase(methodName) == 0 => m + val methodsWithParams = tMembers.collect { case m: MethodSymbol => m -> m.paramLists.flatten } + val methods = methodsWithParams.collect { + case (m: MethodSymbol, params) if m.isMethod && + termNameStr(m.name).compareToIgnoreCase(methodName) == 0 && + (argsCount.isEmpty || argsCount.contains(params.length)) => m -> params + }.toList.sortBy(-_._2.length).map(_._1) + + methods match { + case method :: _ => + val instanceMirror = runtimeMirror.reflect(instance) + instanceMirror.reflectMethod(method) + case Nil => + throw new RuntimeException(s"Method with name '$methodName' was not found on instance of type: $klazz") } - val instanceMirror = runtimeMirror.reflect(instance) - instanceMirror.reflectMethod(methods.head) } /** diff --git a/utils/src/main/scala/com/salesforce/op/utils/spark/OpSparkListener.scala b/utils/src/main/scala/com/salesforce/op/utils/spark/OpSparkListener.scala index 38404b87e5..2e3278af6c 100644 --- a/utils/src/main/scala/com/salesforce/op/utils/spark/OpSparkListener.scala +++ b/utils/src/main/scala/com/salesforce/op/utils/spark/OpSparkListener.scala @@ -211,7 +211,7 @@ object StageMetrics { def toMillis(ns: Long): Long = ns / 1000000 // some time values are in nanoseconds so we convert those StageMetrics( stageId = si.stageId, - attemptId = si.attemptId, + attemptId = si.attemptNumber, name = si.name, numTasks = si.numTasks, parentIds = si.parentIds, @@ -221,7 +221,7 @@ object StageMetrics { else if (si.completionTime.isDefined) "succeeded" else "running" }, - // TODO: consider also collection all the accumilables - might be costly + // TODO: consider also collecting all the accumilables - might be costly numAccumulables = si.accumulables.size, failureReason = si.failureReason, submissionTime = si.submissionTime, diff --git a/utils/src/test/scala/com/salesforce/op/utils/avro/RichGenericRecordTest.scala b/utils/src/test/scala/com/salesforce/op/utils/avro/RichGenericRecordTest.scala index a2ed6ea496..28396be373 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/avro/RichGenericRecordTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/avro/RichGenericRecordTest.scala @@ -39,16 +39,13 @@ import org.scalatest.{FlatSpec, Matchers} @RunWith(classOf[JUnitRunner]) -class RichGenericRecordTest extends FlatSpec - with Matchers - with TestSparkContext - with TestCommon { +class RichGenericRecordTest extends FlatSpec with Matchers with TestSparkContext with TestCommon { import com.salesforce.op.utils.avro.RichGenericRecord._ val dataPath = resourceFile(parent = "../test-data", name = s"PassengerData.avro").getPath val passengerData = AvroInOut.read[GenericRecord](dataPath).getOrElse(throw new Exception("Couldn't read data")) - val firstRow = passengerData.first + val firstRow = passengerData.sortBy(_.get("passengerId").toString.toInt).first Spec[RichGenericRecord] should "get value of Int" in { val id = firstRow.getValue[Int]("passengerId") diff --git a/utils/src/test/scala/com/salesforce/op/utils/io/avro/AvroInOutTest.scala b/utils/src/test/scala/com/salesforce/op/utils/io/avro/AvroInOutTest.scala index 601b5cabc8..b8ca7c42b1 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/io/avro/AvroInOutTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/io/avro/AvroInOutTest.scala @@ -47,7 +47,7 @@ class AvroInOutTest extends FlatSpec with TestSparkContext { val avroSchemaPath = s"$testDataDir/PassengerDataAll.avsc" val avroFilePath = s"$testDataDir/PassengerDataAll.avro" val avroFileRecordCount = 891 - val hdfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val hdfs: FileSystem = FileSystem.get(sc.hadoopConfiguration) lazy val avroTemp: String = tempDir + "/avro-inout-test" Spec(AvroInOut.getClass) should "creates RDD from an avro file" in { diff --git a/utils/src/test/scala/com/salesforce/op/utils/reflection/ReflectionUtilsTest.scala b/utils/src/test/scala/com/salesforce/op/utils/reflection/ReflectionUtilsTest.scala index 0ba3f955dd..5bd6c1ef53 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/reflection/ReflectionUtilsTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/reflection/ReflectionUtilsTest.scala @@ -80,6 +80,10 @@ class TestClassVar { } private def getValue: Int = 2 def getValuePerf: Int = 2 + + def boo(x: Int, y: Int): Int = boo(x + y) + def boo(x: Int): Int = x + def boo(): Int = boo(1) } @RunWith(classOf[JUnitRunner]) @@ -226,5 +230,28 @@ class ReflectionUtilsTest extends FlatSpec with Matchers { elapsedReflect should be <= 10 * actual } + it should "error on reflecting a non existent method" in { + val myClass = new TestClassVar() + val err = intercept[RuntimeException](ReflectionUtils.reflectMethod(myClass, "non_existent")) + err.getMessage shouldBe + s"Method with name 'non_existent' was not found on instance of type: ${myClass.getClass}" + } + + it should "reflect methods with largest number of arguments by default" in { + val myClass = new TestClassVar() + val boo = ReflectionUtils.reflectMethod(myClass, "boo", argsCount = None) + boo(2, 3) shouldBe 5 + } + + it should "reflect methods with various number of arguments" in { + val myClass = new TestClassVar() + val boo = ReflectionUtils.reflectMethod(myClass, "boo", argsCount = Some(0)) + val boo1 = ReflectionUtils.reflectMethod(myClass, "boo", argsCount = Some(1)) + val boo2 = ReflectionUtils.reflectMethod(myClass, "boo", argsCount = Some(2)) + boo() shouldBe 1 + boo1(2) shouldBe 2 + boo2(2, 3) shouldBe 5 + } + } diff --git a/utils/src/test/scala/com/salesforce/op/utils/spark/OpSparkListenerTest.scala b/utils/src/test/scala/com/salesforce/op/utils/spark/OpSparkListenerTest.scala index ca67ffb646..90794b82d5 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/spark/OpSparkListenerTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/spark/OpSparkListenerTest.scala @@ -47,7 +47,7 @@ class OpSparkListenerTest extends FlatSpec with TableDrivenPropertyChecks with T sparkAppender.setName("spark-appender") sparkAppender.setThreshold(Level.INFO) sparkAppender.setLayout(new org.apache.log4j.PatternLayout) - LogManager.getLogger("com.salesforce.op.utils.spark.OpSparkListener").setLevel(Level.INFO) + LogManager.getLogger(classOf[OpSparkListener]).setLevel(Level.INFO) Logger.getRootLogger.addAppender(sparkAppender) sparkAppender } @@ -84,18 +84,15 @@ class OpSparkListenerTest extends FlatSpec with TableDrivenPropertyChecks with T it should "log messages for listener initialization, stage completion, app completion" in { val firstStage = listener.metrics.stageMetrics.head val logPrefix = listener.logPrefix - val logs = sparkLogAppender.logs + val logs = sparkLogAppender.logs.map(_.getMessage.toString) val messages = Table("Spark Log Messages", - "Instantiated spark listener: com.salesforce.op.utils.spark.OpSparkListener. Log Prefix %s".format(logPrefix), + "Instantiated spark listener: %s. Log Prefix %s".format(classOf[OpSparkListener].getName, logPrefix), "%s,APP_TIME_MS:%s".format(logPrefix, listener.metrics.appEndTime - listener.metrics.appStartTime), "%s,STAGE:%s,MEMORY_SPILLED_BYTES:%s,GC_TIME_MS:%s,STAGE_TIME_MS:%s".format( logPrefix, firstStage.name, firstStage.memoryBytesSpilled, firstStage.jvmGCTime, firstStage.executorRunTime ) ) - - forAll(messages) { m => - logs.map(x => x.getMessage.toString).contains(m) shouldBe true - } + forAll(messages) { m => logs.contains(m) shouldBe true } } } diff --git a/utils/src/test/scala/com/salesforce/op/utils/spark/RichRDDTest.scala b/utils/src/test/scala/com/salesforce/op/utils/spark/RichRDDTest.scala index 0a35e7de0a..4d161ac7d8 100644 --- a/utils/src/test/scala/com/salesforce/op/utils/spark/RichRDDTest.scala +++ b/utils/src/test/scala/com/salesforce/op/utils/spark/RichRDDTest.scala @@ -48,7 +48,7 @@ import org.scalatest.prop.PropertyChecks class RichRDDTest extends PropSpec with PropertyChecks with TestSparkContext { import com.salesforce.op.utils.spark.RichRDD._ - val data = RDDGenerator.genRDD[(Int, Int)](spark.sparkContext)(Arbitrary.arbitrary[(Int, Int)]) + val data = RDDGenerator.genRDD[(Int, Int)](sc)(Arbitrary.arbitrary[(Int, Int)]) property("save as a text file") { forAll(data) { rdd =>