From 90f32a2dbaefe6395ab23b5a9047049417a2c448 Mon Sep 17 00:00:00 2001
From: Matthew Tovbin
Date: Mon, 1 Apr 2019 22:56:41 -0700
Subject: [PATCH 1/7] MLeap fixes: 1) construct dataframe with schema + meta
 2) use explicit models instead of reflection

---
 .../op/local/OpWorkflowModelLocal.scala      | 129 +++++++++++++++---
 .../op/local/OpWorkflowRunnerLocal.scala     |   5 +-
 .../op/local/OpWorkflowRunnerLocalTest.scala |  35 +++--
 3 files changed, 138 insertions(+), 31 deletions(-)

diff --git a/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala b/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala
index 5ce9a5b7ca..8793854b2b 100644
--- a/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala
+++ b/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala
@@ -30,19 +30,23 @@ package com.salesforce.op.local
+
 import java.nio.file.Paths
 import com.github.marschall.memoryfilesystem.MemoryFileSystemBuilder
 import com.salesforce.op.OpWorkflowModel
+import com.salesforce.op.features.FeatureSparkTypes
 import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams
 import com.salesforce.op.stages.{OPStage, OpTransformer}
-import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod
 import ml.combust.bundle.serializer.SerializationFormat
 import ml.combust.bundle.{BundleContext, BundleRegistry}
+import ml.combust.mleap.core.feature._
 import ml.combust.mleap.runtime.MleapContext
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.bundle.SparkBundleContext
-import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}

 import scala.collection.mutable

@@ -77,26 +81,34 @@ trait OpWorkflowModelLocal extends Serializable {
   (
     inputs: Array[String],
     output: String,
-    modelFn: Array[AnyRef] => AnyRef
+    modelFn: Array[Any] => Any
   )

   /**
    * Prepares a score function for local scoring
    *
+   * @param spark spark session needed for preparing scoring function,
+   *              Once scoring function is returned the session then can be shutdown as it's not used during scoring
    * @return score function for local scoring
    */
-  def scoreFunction: ScoreFunction = {
+  def scoreFunction(implicit spark: SparkSession): ScoreFunction = {
     // Prepare the stages for scoring
     val stagesWithIndex = model.stages.zipWithIndex
+
+    // Prepare an empty DataFrame with transformed schema & metadata (needed for loading MLeap models)
+    val transformedData = makeTransformedDataFrame(model)
+
     // Collect all OP stages
     val opStages = stagesWithIndex.collect { case (s: OpTransformer, i) => OPModel(s.getOutputFeatureName, s) -> i }
+
     // Collect all Spark wrapped stages
     val sparkStages = stagesWithIndex.filterNot(_._1.isInstanceOf[OpTransformer]).collect {
-      case (s: OPStage with SparkWrapperParams[_], i) if s.getSparkMlStage().isDefined =>
-        (s, s.getSparkMlStage().get.asInstanceOf[Transformer].copy(ParamMap.empty), i)
+      case (opStage: OPStage with SparkWrapperParams[_], i) if opStage.getSparkMlStage().isDefined =>
+        val sparkStage = opStage.getSparkMlStage().get.asInstanceOf[Transformer]
+        (opStage, sparkStage, i)
     }
     // Convert Spark wrapped stages into MLeap models
-    val mleapStages = toMLeapModels(sparkStages)
+    val mleapStages = toMLeapModels(sparkStages, transformedData)

     // Combine all stages and apply the original order
     val allStages = (opStages ++ mleapStages).sortBy(_._2).map(_._1)
@@ -114,22 +126,37 @@ trait OpWorkflowModelLocal
extends Serializable { case (row, MLeapModel(inputs, output, modelFn)) => val in = inputs.map(inputName => row.get(inputName) match { case None | Some(null) => null - case Some(v) => v.asInstanceOf[AnyRef] + case Some(v) => v }) row += output -> modelFn(in) } + // Only return the result features of the model transformedRow.filterKeys(resultFeatures.contains).toMap } } /** - * Convert Spark wrapped stages into MLeal local Models + * Prepares an empty DataFrame with transformed schema & metadata (needed for loading MLeap models) + */ + private def makeTransformedDataFrame(model: OpWorkflowModel)(implicit spark: SparkSession): DataFrame = { + val rawSchema = FeatureSparkTypes.toStructType(model.rawFeatures: _*) + val df = spark.emptyDataset[Row](RowEncoder(rawSchema)) + model.stages.collect { case t: Transformer => t }.foldLeft(df) { case (d, t) => t.transform(d) } + } + + /** + * Convert Spark wrapped stages into MLeap local Models * - * @param sparkStages stages to convert + * @param sparkStages stages to convert + * @param transformedData dataset with transformed schema & metadata (needed for loading MLeap models) * @return MLeap local stages */ - private def toMLeapModels(sparkStages: Seq[(OPStage, Transformer, Int)]): Seq[(MLeapModel, Int)] = { + private def toMLeapModels + ( + sparkStages: Seq[(OPStage, Transformer, Int)], + transformedData: DataFrame + ): Seq[(MLeapModel, Int)] = { // Setup a in-memory file system for MLeap model saving/loading val emptyPath = Paths.get("") val fs = MemoryFileSystemBuilder.newEmpty().build() @@ -138,15 +165,16 @@ trait OpWorkflowModelLocal extends Serializable { val mleapRegistry = BundleRegistry("ml.combust.mleap.registry.default") val sparkRegistry = BundleRegistry("ml.combust.mleap.spark.registry.default") - // TODO - consider defining an empty Dataset with correct schema, since some Spark stages might fail to convert val sparkBundleContext = BundleContext[SparkBundleContext]( - SparkBundleContext(dataset = None, sparkRegistry), SerializationFormat.Json, sparkRegistry, fs, emptyPath) + SparkBundleContext(Option(transformedData), sparkRegistry), + SerializationFormat.Json, sparkRegistry, fs, emptyPath + ) val mleapBundleContext = BundleContext[MleapContext]( MleapContext(mleapRegistry), SerializationFormat.Json, mleapRegistry, fs, emptyPath) for { (opStage, sparkStage, i) <- sparkStages - } yield { + } yield try { val model = { // Serialize Spark model using Spark registry val opModel = sparkRegistry.opForObj[SparkBundleContext, AnyRef, AnyRef](sparkStage) @@ -157,15 +185,74 @@ trait OpWorkflowModelLocal extends Serializable { val mleapLocalModel = mleapRegistry.model[MleapContext, AnyRef](op = serializedModel.op) mleapLocalModel.load(serializedModel)(mleapBundleContext) } - // Reflect the apply method on MLeap local model - val applyMethodMirror = reflectMethod(model, "apply") - val modelFn = (x: Array[AnyRef]) => applyMethodMirror.apply(x: _*).asInstanceOf[AnyRef] - val inputs = opStage.getTransientFeatures().map(_.name) - val output = opStage.getOutputFeatureName - - MLeapModel(inputs, output, modelFn) -> i + // Prepare and return MLeap model with inputs, output and model function + MLeapModel( + inputs = opStage.getTransientFeatures().map(_.name), + output = opStage.getOutputFeatureName, + modelFn = MLeapModelConverter.modelToFunction(model) + ) -> i + } catch { + case e: Exception => + throw new RuntimeException(s"Failed to convert stage '${opStage.uid}' to MLeap stage", e) } } + + + } + +} + +private case object MLeapModelConverter { + + /** + 
* Convert MLeap model instance to a model apply function + * + * @param model MLeap model + * @throws RuntimeException if model type is not supported + * @return runnable model apply function + */ + def modelToFunction(model: Any): Array[Any] => Any = model match { + case m: BinarizerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) + case m: BucketedRandomProjectionLSHModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: BucketizerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) + case m: ChiSqSelectorModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: CoalesceModel => x => m.apply(x: _*) + case m: CountVectorizerModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m: DCTModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: ElementwiseProductModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: FeatureHasherModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) + case m: HashingTermFrequencyModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) + case m: IDFModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: ImputerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) + case m: InteractionModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) + case m: MathBinaryModel => x => + m.apply( + x.headOption.map(_.asInstanceOf[Number].doubleValue()), + x.lastOption.map(_.asInstanceOf[Number].doubleValue()) + ) + case m: MathUnaryModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) + case m: MaxAbsScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: MinHashLSHModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: MinMaxScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: NGramModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m: NormalizerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: OneHotEncoderModel => x => m.apply(x(0).asInstanceOf[Vector].toArray) + case m: PcaModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: PolynomialExpansionModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: RegexIndexerModel => x => m.apply(x(0).toString) + case m: RegexTokenizerModel => x => m.apply(x(0).toString) + case m: ReverseStringIndexerModel => x => m.apply(x(0).asInstanceOf[Number].intValue()) + case m: StandardScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: StopWordsRemoverModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m: StringIndexerModel => x => m.apply(x(0)) + case m: StringMapModel => x => m.apply(x(0).toString) + case m: TokenizerModel => x => m.apply(x(0).toString) + case m: VectorAssemblerModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) + case m: VectorIndexerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: VectorSlicerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: WordLengthFilterModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m: WordToVectorModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m => throw new RuntimeException(s"Unsupported MLeap model: ${m.getClass.getName}") } } diff --git a/local/src/main/scala/com/salesforce/op/local/OpWorkflowRunnerLocal.scala b/local/src/main/scala/com/salesforce/op/local/OpWorkflowRunnerLocal.scala index d8b1a27b75..34834688d4 100644 --- a/local/src/main/scala/com/salesforce/op/local/OpWorkflowRunnerLocal.scala +++ b/local/src/main/scala/com/salesforce/op/local/OpWorkflowRunnerLocal.scala @@ -31,6 +31,7 @@ package com.salesforce.op.local import com.salesforce.op.{OpParams, OpWorkflow} +import org.apache.spark.sql.SparkSession /** @@ -52,9 +53,11 @@ 
class OpWorkflowRunnerLocal(val workflow: OpWorkflow) extends Serializable { * SparkSession.builder().getOrCreate().stop() * * @param params params to use during scoring + * @param spark spark session needed for preparing scoring function. + * Once scoring function is returned the session then can be shutdown as it's not used during scoring * @return score function for local scoring */ - def score(params: OpParams): ScoreFunction = { + def score(params: OpParams)(implicit spark: SparkSession): ScoreFunction = { require(params.modelLocation.isDefined, "Model location must be set in params") val model = workflow.loadModel(params.modelLocation.get) model.scoreFunction diff --git a/local/src/test/scala/com/salesforce/op/local/OpWorkflowRunnerLocalTest.scala b/local/src/test/scala/com/salesforce/op/local/OpWorkflowRunnerLocalTest.scala index 8cd9d6b424..d906727354 100644 --- a/local/src/test/scala/com/salesforce/op/local/OpWorkflowRunnerLocalTest.scala +++ b/local/src/test/scala/com/salesforce/op/local/OpWorkflowRunnerLocalTest.scala @@ -34,13 +34,13 @@ import java.nio.file.Paths import com.salesforce.op.features.types._ import com.salesforce.op.readers.DataFrameFieldNames._ -import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector -import com.salesforce.op.stages.impl.classification.BinaryClassificationModelsToTry._ +import com.salesforce.op.stages.impl.classification.{BinaryClassificationModelSelector, OpLogisticRegression} import com.salesforce.op.stages.impl.feature.StringIndexerHandleInvalid import com.salesforce.op.test.{PassengerSparkFixtureTest, TestCommon} import com.salesforce.op.utils.spark.RichDataset._ import com.salesforce.op.utils.spark.RichRow._ import com.salesforce.op.{OpParams, OpWorkflow} +import org.apache.spark.ml.tuning.ParamGridBuilder import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner @@ -54,26 +54,41 @@ class OpWorkflowRunnerLocalTest extends FlatSpec with PassengerSparkFixtureTest val features = Seq(height, weight, gender, description, age).transmogrify() val survivedNum = survived.occurs() + val indexed = description.indexed(handleInvalid = StringIndexerHandleInvalid.Keep) + // TODO - BUG! we seem to be loosing null label in OpStringIndexerNoFilterModel + // val indexed = description.indexed(handleInvalid = StringIndexerHandleInvalid.Keep) + // val indexedNoFilter = description.indexed() + + // TODO - BUG! might be the same problem! + // val indexed = description.indexed() + // val deindexed = indexed.deindexed(handleInvalid = IndexToStringHandleInvalid.Error) + + // TODO - try to reproduce the same bug as in E1 + // ? HOW ? 
+ + val logReg = BinaryClassificationModelSelector.Defaults.modelsAndParams.collect { + case (lg: OpLogisticRegression, _) => lg -> new ParamGridBuilder().build() + } + val prediction = BinaryClassificationModelSelector.withTrainValidationSplit( - splitter = None, modelTypesToUse = Seq(OpLogisticRegression) + modelsAndParameters = logReg, splitter = None ).setInput(survivedNum, features).getOutput() - val workflow = new OpWorkflow().setResultFeatures(prediction, survivedNum, indexed).setReader(dataReader) + val workflow = new OpWorkflow().setReader(dataReader) + .setResultFeatures(prediction, survivedNum, indexed) lazy val model = workflow.train() - lazy val modelLocation = { val path = Paths.get(tempDir.toString, "op-runner-local-test-model").toFile.getCanonicalFile.toString model.save(path) path } - lazy val rawData = dataReader.generateDataFrame(model.rawFeatures).sort(KeyFieldName).collect().map(_.toMap) - lazy val expectedScores = model.score().sort(KeyFieldName).collect(prediction, survivedNum, indexed) + Spec(classOf[OpWorkflowRunnerLocal]) should "produce scores without Spark" in { val params = new OpParams().withValues(modelLocation = Some(modelLocation)) val scoreFn = new OpWorkflowRunnerLocal(workflow).score(params) @@ -105,13 +120,15 @@ class OpWorkflowRunnerLocalTest extends FlatSpec with PassengerSparkFixtureTest ): Unit = { scores.length shouldBe expectedScores.length for { - (score, (predV, survivedV, indexedV)) <- scores.zip(expectedScores) + ((score, (predV, survivedV, indexedV)), i) <- scores.zip(expectedScores).zipWithIndex expected = Map( prediction.name -> predV.value, survivedNum.name -> survivedV.value.get, indexed.name -> indexedV.value.get ) - } score shouldBe expected + } withClue(s"Record index $i: ") { + score shouldBe expected + } } } From a77e53b3bc3fb26e35fc7e0bfe177866017c7cea Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Tue, 2 Apr 2019 19:59:07 -0700 Subject: [PATCH 2/7] Fixes --- .../impl/feature/OpStringIndexerTest.scala | 2 +- .../types/FeatureTypeSparkConverter.scala | 2 ++ local/README.md | 4 +-- .../op/local/OpWorkflowModelLocal.scala | 5 ++-- .../op/local/OpWorkflowRunnerLocal.scala | 7 +++--- .../op/local/OpWorkflowRunnerLocalTest.scala | 25 ++++++------------- 6 files changed, 20 insertions(+), 25 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/feature/OpStringIndexerTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/feature/OpStringIndexerTest.scala index ea39b81d30..f2ddc32eba 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/feature/OpStringIndexerTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/feature/OpStringIndexerTest.scala @@ -39,7 +39,7 @@ import org.scalatest.junit.JUnitRunner import com.salesforce.op.utils.spark.RichDataset._ @RunWith(classOf[JUnitRunner]) -class OpStringIndexerTest extends FlatSpec with TestSparkContext{ +class OpStringIndexerTest extends FlatSpec with TestSparkContext { val txtData = Seq("a", "b", "c", "a", "a", "c").map(_.toText) val (ds, txtF) = TestFeatureBuilder(txtData) diff --git a/features/src/main/scala/com/salesforce/op/features/types/FeatureTypeSparkConverter.scala b/features/src/main/scala/com/salesforce/op/features/types/FeatureTypeSparkConverter.scala index d99562d6a4..d7dfa31509 100644 --- a/features/src/main/scala/com/salesforce/op/features/types/FeatureTypeSparkConverter.scala +++ b/features/src/main/scala/com/salesforce/op/features/types/FeatureTypeSparkConverter.scala @@ -168,6 +168,7 @@ case object 
FeatureTypeSparkConverter {
       case null => None
       case v: Float => Some(v.toDouble)
       case v: Double => Some(v)
+      case v: Number => Some(v.doubleValue())
       case v => throw new IllegalArgumentException(s"RealNN type mapping is not defined for ${v.getClass}")
     }
   case wt if wt <:< weakTypeOf[t.Real] => (value: Any) =>
       case null => FeatureTypeDefaults.Real.value
       case v: Float => Some(v.toDouble)
       case v: Double => Some(v)
+      case v: Number => Some(v.doubleValue())
       case v => throw new IllegalArgumentException(s"Real type mapping is not defined for ${v.getClass}")
     }
   case wt if wt <:< weakTypeOf[t.Integral] => (value: Any) =>
diff --git a/local/README.md b/local/README.md
index aa0758a363..c9e4cb8c3b 100644
--- a/local/README.md
+++ b/local/README.md
@@ -30,9 +30,9 @@ val scores = rawData.map(scoreFn)

 Or using the local runner:

 ```scala
-val scoreFn = new OpWorkflowRunnerLocal(workflow).score(opParams)
+val scoreFn = new OpWorkflowRunnerLocal(workflow).scoreFunction(opParams)
 ```
-
+Note: Spark Session is only required for loading the model & preparing the scoring function. Once scoring function is returned the Spark Session can be shutdown since it's not required during local scoring.

 ## Performance Results

diff --git a/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala b/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala
index 8793854b2b..16905de5bc 100644
--- a/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala
+++ b/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala
@@ -87,8 +87,9 @@ trait OpWorkflowModelLocal extends Serializable {
   /**
    * Prepares a score function for local scoring
    *
-   * @param spark spark session needed for preparing scoring function,
-   *              Once scoring function is returned the session then can be shutdown as it's not used during scoring
+   * @param spark Spark Session needed for preparing the scoring function.
+   *              Once the scoring function is returned, the Spark Session can be shut down
+   *              since it's not required during local scoring.
    * @return score function for local scoring
    */
   def scoreFunction(implicit spark: SparkSession): ScoreFunction = {
diff --git a/local/src/main/scala/com/salesforce/op/local/OpWorkflowRunnerLocal.scala b/local/src/main/scala/com/salesforce/op/local/OpWorkflowRunnerLocal.scala
index 34834688d4..f52c63cd41 100644
--- a/local/src/main/scala/com/salesforce/op/local/OpWorkflowRunnerLocal.scala
+++ b/local/src/main/scala/com/salesforce/op/local/OpWorkflowRunnerLocal.scala
@@ -53,11 +53,12 @@ class OpWorkflowRunnerLocal(val workflow: OpWorkflow) extends Serializable {
    * SparkSession.builder().getOrCreate().stop()
    *
    * @param params params to use during scoring
-   * @param spark spark session needed for preparing scoring function.
-   *              Once scoring function is returned the session then can be shutdown as it's not used during scoring
+   * @param spark Spark Session needed for preparing the scoring function.
+   *              Once the scoring function is returned, the Spark Session can be shut down
+   *              since it's not required during local scoring.
* @return score function for local scoring */ - def score(params: OpParams)(implicit spark: SparkSession): ScoreFunction = { + def scoreFunction(params: OpParams)(implicit spark: SparkSession): ScoreFunction = { require(params.modelLocation.isDefined, "Model location must be set in params") val model = workflow.loadModel(params.modelLocation.get) model.scoreFunction diff --git a/local/src/test/scala/com/salesforce/op/local/OpWorkflowRunnerLocalTest.scala b/local/src/test/scala/com/salesforce/op/local/OpWorkflowRunnerLocalTest.scala index d906727354..35e15f5c54 100644 --- a/local/src/test/scala/com/salesforce/op/local/OpWorkflowRunnerLocalTest.scala +++ b/local/src/test/scala/com/salesforce/op/local/OpWorkflowRunnerLocalTest.scala @@ -56,17 +56,7 @@ class OpWorkflowRunnerLocalTest extends FlatSpec with PassengerSparkFixtureTest val survivedNum = survived.occurs() val indexed = description.indexed(handleInvalid = StringIndexerHandleInvalid.Keep) - - // TODO - BUG! we seem to be loosing null label in OpStringIndexerNoFilterModel - // val indexed = description.indexed(handleInvalid = StringIndexerHandleInvalid.Keep) - // val indexedNoFilter = description.indexed() - - // TODO - BUG! might be the same problem! - // val indexed = description.indexed() - // val deindexed = indexed.deindexed(handleInvalid = IndexToStringHandleInvalid.Error) - - // TODO - try to reproduce the same bug as in E1 - // ? HOW ? + val deindexed = indexed.deindexed() val logReg = BinaryClassificationModelSelector.Defaults.modelsAndParams.collect { case (lg: OpLogisticRegression, _) => lg -> new ParamGridBuilder().build() @@ -77,7 +67,7 @@ class OpWorkflowRunnerLocalTest extends FlatSpec with PassengerSparkFixtureTest ).setInput(survivedNum, features).getOutput() val workflow = new OpWorkflow().setReader(dataReader) - .setResultFeatures(prediction, survivedNum, indexed) + .setResultFeatures(prediction, survivedNum, indexed, deindexed) lazy val model = workflow.train() lazy val modelLocation = { @@ -86,12 +76,12 @@ class OpWorkflowRunnerLocalTest extends FlatSpec with PassengerSparkFixtureTest path } lazy val rawData = dataReader.generateDataFrame(model.rawFeatures).sort(KeyFieldName).collect().map(_.toMap) - lazy val expectedScores = model.score().sort(KeyFieldName).collect(prediction, survivedNum, indexed) + lazy val expectedScores = model.score().sort(KeyFieldName).collect(prediction, survivedNum, indexed, deindexed) Spec(classOf[OpWorkflowRunnerLocal]) should "produce scores without Spark" in { val params = new OpParams().withValues(modelLocation = Some(modelLocation)) - val scoreFn = new OpWorkflowRunnerLocal(workflow).score(params) + val scoreFn = new OpWorkflowRunnerLocal(workflow).scoreFunction(params) scoreFn shouldBe a[ScoreFunction] val scores = rawData.map(scoreFn) assert(scores, expectedScores) @@ -116,15 +106,16 @@ class OpWorkflowRunnerLocalTest extends FlatSpec with PassengerSparkFixtureTest private def assert( scores: Array[Map[String, Any]], - expectedScores: Array[(Prediction, RealNN, RealNN)] + expectedScores: Array[(Prediction, RealNN, RealNN, Text)] ): Unit = { scores.length shouldBe expectedScores.length for { - ((score, (predV, survivedV, indexedV)), i) <- scores.zip(expectedScores).zipWithIndex + ((score, (predV, survivedV, indexedV, deindexedV)), i) <- scores.zip(expectedScores).zipWithIndex expected = Map( prediction.name -> predV.value, survivedNum.name -> survivedV.value.get, - indexed.name -> indexedV.value.get + indexed.name -> indexedV.value.get, + deindexed.name -> deindexedV.value.orNull ) 
} withClue(s"Record index $i: ") { score shouldBe expected From 05f08300b537e3a388d5794e2708c14dff6397cd Mon Sep 17 00:00:00 2001 From: Matthew Tovbin Date: Tue, 2 Apr 2019 20:25:13 -0700 Subject: [PATCH 3/7] tests --- .../op/local/MLeapModelConverter.scala | 92 ++++++++++++++++++ .../op/local/OpWorkflowModelLocal.scala | 57 ----------- .../op/local/MLeapModelConverterTest.scala | 96 +++++++++++++++++++ 3 files changed, 188 insertions(+), 57 deletions(-) create mode 100644 local/src/main/scala/com/salesforce/op/local/MLeapModelConverter.scala create mode 100644 local/src/test/scala/com/salesforce/op/local/MLeapModelConverterTest.scala diff --git a/local/src/main/scala/com/salesforce/op/local/MLeapModelConverter.scala b/local/src/main/scala/com/salesforce/op/local/MLeapModelConverter.scala new file mode 100644 index 0000000000..09f0d1dadf --- /dev/null +++ b/local/src/main/scala/com/salesforce/op/local/MLeapModelConverter.scala @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +package com.salesforce.op.local + +import ml.combust.mleap.core.feature._ +import org.apache.spark.ml.linalg.Vector + +/** + * Converter of MLeap model instances to a model apply function + */ +case object MLeapModelConverter { + + /** + * Convert MLeap model instance to a model apply function + * + * @param model MLeap model + * @throws RuntimeException if model type is not supported + * @return runnable model apply function + */ + def modelToFunction(model: Any): Array[Any] => Any = model match { + case m: BinarizerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) + case m: BucketedRandomProjectionLSHModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: BucketizerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) + case m: ChiSqSelectorModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: CoalesceModel => x => m.apply(x: _*) + case m: CountVectorizerModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m: DCTModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: ElementwiseProductModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: FeatureHasherModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) + case m: HashingTermFrequencyModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) + case m: IDFModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: ImputerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) + case m: InteractionModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) + case m: MathBinaryModel => x => + m.apply( + x.headOption.map(_.asInstanceOf[Number].doubleValue()), + x.lastOption.map(_.asInstanceOf[Number].doubleValue()) + ) + case m: MathUnaryModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) + case m: MaxAbsScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: MinHashLSHModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: MinMaxScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: NGramModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m: NormalizerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: OneHotEncoderModel => x => m.apply(x(0).asInstanceOf[Vector].toArray) + case m: PcaModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: PolynomialExpansionModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: RegexIndexerModel => x => m.apply(x(0).toString) + case m: RegexTokenizerModel => x => m.apply(x(0).toString) + case m: ReverseStringIndexerModel => x => m.apply(x(0).asInstanceOf[Number].intValue()) + case m: StandardScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: StopWordsRemoverModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m: StringIndexerModel => x => m.apply(x(0)) + case m: StringMapModel => x => m.apply(x(0).toString) + case m: TokenizerModel => x => m.apply(x(0).toString) + case m: VectorAssemblerModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) + case m: VectorIndexerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: VectorSlicerModel => x => m.apply(x(0).asInstanceOf[Vector]) + case m: WordLengthFilterModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m: WordToVectorModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) + case m => throw new RuntimeException(s"Unsupported MLeap model: ${m.getClass.getName}") + } + +} diff --git a/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala b/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala index 16905de5bc..dde29c69b7 100644 --- 
a/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala +++ b/local/src/main/scala/com/salesforce/op/local/OpWorkflowModelLocal.scala @@ -40,11 +40,9 @@ import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams import com.salesforce.op.stages.{OPStage, OpTransformer} import ml.combust.bundle.serializer.SerializationFormat import ml.combust.bundle.{BundleContext, BundleRegistry} -import ml.combust.mleap.core.feature._ import ml.combust.mleap.runtime.MleapContext import org.apache.spark.ml.Transformer import org.apache.spark.ml.bundle.SparkBundleContext -import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -202,58 +200,3 @@ trait OpWorkflowModelLocal extends Serializable { } } - -private case object MLeapModelConverter { - - /** - * Convert MLeap model instance to a model apply function - * - * @param model MLeap model - * @throws RuntimeException if model type is not supported - * @return runnable model apply function - */ - def modelToFunction(model: Any): Array[Any] => Any = model match { - case m: BinarizerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) - case m: BucketedRandomProjectionLSHModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: BucketizerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) - case m: ChiSqSelectorModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: CoalesceModel => x => m.apply(x: _*) - case m: CountVectorizerModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) - case m: DCTModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: ElementwiseProductModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: FeatureHasherModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) - case m: HashingTermFrequencyModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) - case m: IDFModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: ImputerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) - case m: InteractionModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) - case m: MathBinaryModel => x => - m.apply( - x.headOption.map(_.asInstanceOf[Number].doubleValue()), - x.lastOption.map(_.asInstanceOf[Number].doubleValue()) - ) - case m: MathUnaryModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue()) - case m: MaxAbsScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: MinHashLSHModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: MinMaxScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: NGramModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) - case m: NormalizerModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: OneHotEncoderModel => x => m.apply(x(0).asInstanceOf[Vector].toArray) - case m: PcaModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: PolynomialExpansionModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: RegexIndexerModel => x => m.apply(x(0).toString) - case m: RegexTokenizerModel => x => m.apply(x(0).toString) - case m: ReverseStringIndexerModel => x => m.apply(x(0).asInstanceOf[Number].intValue()) - case m: StandardScalerModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: StopWordsRemoverModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) - case m: StringIndexerModel => x => m.apply(x(0)) - case m: StringMapModel => x => m.apply(x(0).toString) - case m: TokenizerModel => x => m.apply(x(0).toString) - case m: VectorAssemblerModel => x => m.apply(x(0).asInstanceOf[Seq[Any]]) - case m: VectorIndexerModel => x 
=> m.apply(x(0).asInstanceOf[Vector]) - case m: VectorSlicerModel => x => m.apply(x(0).asInstanceOf[Vector]) - case m: WordLengthFilterModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) - case m: WordToVectorModel => x => m.apply(x(0).asInstanceOf[Seq[String]]) - case m => throw new RuntimeException(s"Unsupported MLeap model: ${m.getClass.getName}") - } - -} diff --git a/local/src/test/scala/com/salesforce/op/local/MLeapModelConverterTest.scala b/local/src/test/scala/com/salesforce/op/local/MLeapModelConverterTest.scala new file mode 100644 index 0000000000..d0041aebca --- /dev/null +++ b/local/src/test/scala/com/salesforce/op/local/MLeapModelConverterTest.scala @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */
+
+package com.salesforce.op.local
+
+import com.salesforce.op.test.TestCommon
+import ml.combust.mleap.core.feature._
+import ml.combust.mleap.core.types.ScalarShape
+import org.apache.spark.ml.linalg.{DenseMatrix, Vectors}
+import org.junit.runner.RunWith
+import org.scalatest.PropSpec
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.prop.PropertyChecks
+
+@RunWith(classOf[JUnitRunner])
+class MLeapModelConverterTest extends PropSpec with PropertyChecks with TestCommon {
+
+  val mleapModels = Table("mleapModels",
+    BinarizerModel(0.0, ScalarShape()),
+    BucketedRandomProjectionLSHModel(Seq(), 0.0, 0),
+    BucketizerModel(Array.empty),
+    ChiSqSelectorModel(Seq(), 0),
+    CoalesceModel(Seq()),
+    CountVectorizerModel(Array.empty, false, 0.0),
+    DCTModel(false, 0),
+    ElementwiseProductModel(Vectors.zeros(0)),
+    FeatureHasherModel(0, Seq(), Seq(), Seq()),
+    HashingTermFrequencyModel(),
+    IDFModel(Vectors.zeros(0)),
+    ImputerModel(0.0, 0.0, ""),
+    InteractionModel(Array(), Seq()),
+    MathBinaryModel(BinaryOperation.Add),
+    MathUnaryModel(UnaryOperation.Log),
+    MaxAbsScalerModel(Vectors.zeros(0)),
+    MinHashLSHModel(Seq(), 0),
+    MinMaxScalerModel(Vectors.zeros(0), Vectors.zeros(0)),
+    NGramModel(0),
+    NormalizerModel(0.0, 0),
+    OneHotEncoderModel(Array()),
+    PcaModel(DenseMatrix.zeros(0, 0)),
+    PolynomialExpansionModel(0, 0),
+    RegexIndexerModel(Seq(), None),
+    RegexTokenizerModel(".*".r),
+    ReverseStringIndexerModel(Seq()),
+    StandardScalerModel(Some(Vectors.dense(Array(1.0))), Some(Vectors.dense(Array(1.0)))),
+    StopWordsRemoverModel(Seq(), false),
+    StringIndexerModel(Seq()),
+    StringMapModel(Map()),
+    TokenizerModel(),
+    VectorAssemblerModel(Seq()),
+    VectorIndexerModel(0, Map()),
+    VectorSlicerModel(Array(), Array(), 0),
+    WordLengthFilterModel(),
+    WordToVectorModel(Map("a" -> 1), Array(1))
+  )
+
+  property("convert mleap models to functions") {
+    forAll(mleapModels) { m =>
+      val fn = MLeapModelConverter.modelToFunction(m)
+      fn shouldBe a[Function[_, _]]
+    }
+  }
+
+  property("error on unsupported models") {
+    the[RuntimeException] thrownBy MLeapModelConverter.modelToFunction(model = "not a model") should have message
+      "Unsupported MLeap model: java.lang.String"
+  }
+
+}

From 5067d5a08ba84028aea6e415dc39fc2a9627a97a Mon Sep 17 00:00:00 2001
From: Matthew Tovbin
Date: Tue, 2 Apr 2019 20:32:34 -0700
Subject: [PATCH 4/7] Update README.md

---
 local/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/local/README.md b/local/README.md
index c9e4cb8c3b..26138aad6d 100644
--- a/local/README.md
+++ b/local/README.md
@@ -32,7 +32,7 @@ Or using the local runner:
 ```scala
 val scoreFn = new OpWorkflowRunnerLocal(workflow).scoreFunction(opParams)
 ```
-Note: Spark Session is only required for loading the model & preparing the scoring function. Once scoring function is returned the Spark Session can be shutdown since it's not required during local scoring.
+**Note**: *A Spark Session is only required for loading the model & preparing the scoring function.
Once the scoring function is returned, the Spark Session can be shut down since it's not required during local scoring.*

 ## Performance Results

From 4b13d4ddb9d765ebe12f95f3f9d79f648965998c Mon Sep 17 00:00:00 2001
From: Matthew Tovbin
Date: Wed, 3 Apr 2019 11:12:32 -0700
Subject: [PATCH 5/7] forgot to commit a test

---
 .../types/FeatureTypeSparkConverterTest.scala | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/features/src/test/scala/com/salesforce/op/features/types/FeatureTypeSparkConverterTest.scala b/features/src/test/scala/com/salesforce/op/features/types/FeatureTypeSparkConverterTest.scala
index bb1939924a..0577c7f31d 100644
--- a/features/src/test/scala/com/salesforce/op/features/types/FeatureTypeSparkConverterTest.scala
+++ b/features/src/test/scala/com/salesforce/op/features/types/FeatureTypeSparkConverterTest.scala
@@ -153,11 +153,23 @@ class FeatureTypeSparkConverterTest
       FeatureTypeSparkConverter.toSpark(rn.doubleValue().toReal) shouldEqual rn
     }
   }
-  property("raises error on invalid real numbers") {
+  property("converts natural numbers to Real feature type") {
+    forAll(naturalNumbers) { rn =>
+      FeatureTypeSparkConverter[Real]().fromSpark(rn) shouldBe rn.doubleValue().toReal
+      FeatureTypeSparkConverter.toSpark(rn.doubleValue().toReal) shouldEqual rn
+    }
+  }
+  property("converts natural numbers to RealNN feature type") {
     forAll(naturalNumbers) { rn =>
-      intercept[IllegalArgumentException](FeatureTypeSparkConverter[Real]().fromSpark(rn))
+      FeatureTypeSparkConverter[RealNN]().fromSpark(rn) shouldBe rn.doubleValue().toReal
+      FeatureTypeSparkConverter.toSpark(rn.doubleValue().toReal) shouldEqual rn
+    }
+  }
+  property("raises error on invalid real numbers") {
+    forAll(booleans) { b =>
+      intercept[IllegalArgumentException](FeatureTypeSparkConverter[Real]().fromSpark(b))
         .getMessage startsWith "Real type mapping is not defined"
-      intercept[IllegalArgumentException](FeatureTypeSparkConverter[RealNN]().fromSpark(rn))
+      intercept[IllegalArgumentException](FeatureTypeSparkConverter[RealNN]().fromSpark(b))
         .getMessage startsWith "RealNN type mapping is not defined"
     }
   }

From e5fc9298bdf3e28fbce28deaf917fcb2a17c599b Mon Sep 17 00:00:00 2001
From: Matthew Tovbin
Date: Thu, 4 Apr 2019 20:40:37 -0700
Subject: [PATCH 6/7] Update README.md

---
 local/README.md | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/local/README.md b/local/README.md
index 26138aad6d..f5124bda27 100644
--- a/local/README.md
+++ b/local/README.md
@@ -22,8 +22,20 @@ Then in your code you may load and score models as follows:
 ```scala
 import com.salesforce.op.local._

+// Spark Session needed for model loading & score function creation
+implicit val spark = SparkSession.builder().getOrCreate()
+
+// Create your workflow & load the model
+val workflow: OpWorkflow = ...
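+// (assumes a model previously saved at /path/to/model, e.g. via model.save(...))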
 val model = workflow.loadModel("/path/to/model")
+
+// Create score function once and use it indefinitely
+val scoreFn = model.scoreFunction
+
+// Spark Session can be stopped now since it's not required during local scoring
+spark.stop()
+
+// Compute scores with score function
 val rawData = Seq(Map("name" -> "Peter", "age" -> 18), Map("name" -> "John", "age" -> 23))
 val scores = rawData.map(scoreFn)
 ```

From b03e76c30d35fafaff9548d1ee0ba2551bd570a2 Mon Sep 17 00:00:00 2001
From: Matthew Tovbin
Date: Thu, 4 Apr 2019 20:43:48 -0700
Subject: [PATCH 7/7] Update README.md

---
 local/README.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/local/README.md b/local/README.md
index f5124bda27..6e2f34cafc 100644
--- a/local/README.md
+++ b/local/README.md
@@ -1,7 +1,7 @@
 # TransmogrifAI Local

+This module enables local scoring with TransmogrifAI models without the need for a Spark Session during scoring.
+Instead it implements local inference by applying TransmogrifAI's transformers and the [MLeap](https://github.com/combust/mleap) runtime on the JVM. It delivers unprecedented portability and performance of TransmogrifAI models, allowing scores to be served from any JVM process.

 ## Usage
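For reference, the README snippets introduced across this series combine into the following end-to-end local scoring flow. This is a consolidated sketch rather than part of any commit above: it assumes a trained `workflow: OpWorkflow` whose model was previously saved to `/path/to/model` (e.g. via `model.save(...)`), and uses only the APIs shown in the patches (`loadModel`, `scoreFunction`).

```scala
import com.salesforce.op.OpWorkflow
import com.salesforce.op.local._
import org.apache.spark.sql.SparkSession

// A Spark Session is required only while the model is loaded
// and the score function is prepared
implicit val spark = SparkSession.builder().getOrCreate()

val workflow: OpWorkflow = ??? // assumed: your trained workflow definition
val model = workflow.loadModel("/path/to/model")

// Create the score function once and use it indefinitely
val scoreFn = model.scoreFunction

// The session is no longer needed; scoring itself runs without Spark
spark.stop()

// Score plain Scala maps of raw feature values
val rawData = Seq(Map("name" -> "Peter", "age" -> 18), Map("name" -> "John", "age" -> 23))
val scores: Seq[Map[String, Any]] = rawData.map(scoreFn)
```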