Correct conversion of Spark model stages into MLeap local models #261

Merged · 12 commits · Apr 6, 2019
@@ -39,7 +39,7 @@ import org.scalatest.junit.JUnitRunner
import com.salesforce.op.utils.spark.RichDataset._

@RunWith(classOf[JUnitRunner])
-class OpStringIndexerTest extends FlatSpec with TestSparkContext{
+class OpStringIndexerTest extends FlatSpec with TestSparkContext {

val txtData = Seq("a", "b", "c", "a", "a", "c").map(_.toText)
val (ds, txtF) = TestFeatureBuilder(txtData)
@@ -168,13 +168,15 @@ case object FeatureTypeSparkConverter {
        case null => None
        case v: Float => Some(v.toDouble)
        case v: Double => Some(v)
+       case v: Number => Some(v.doubleValue())
        case v => throw new IllegalArgumentException(s"RealNN type mapping is not defined for ${v.getClass}")
      }
    case wt if wt <:< weakTypeOf[t.Real] => (value: Any) =>
      value match {
        case null => FeatureTypeDefaults.Real.value
        case v: Float => Some(v.toDouble)
        case v: Double => Some(v)
+       case v: Number => Some(v.doubleValue())
        case v => throw new IllegalArgumentException(s"Real type mapping is not defined for ${v.getClass}")
      }
    case wt if wt <:< weakTypeOf[t.Integral] => (value: Any) =>
@@ -153,11 +153,23 @@ class FeatureTypeSparkConverterTest
       FeatureTypeSparkConverter.toSpark(rn.doubleValue().toReal) shouldEqual rn
     }
   }
-  property("raises error on invalid real numbers") {
+  property("converts natural numbers to Real feature type") {
     forAll(naturalNumbers) { rn =>
+      FeatureTypeSparkConverter[Real]().fromSpark(rn) shouldBe rn.doubleValue().toReal
+      FeatureTypeSparkConverter.toSpark(rn.doubleValue().toReal) shouldEqual rn
+    }
+  }
+  property("converts natural numbers to RealNN feature type") {
+    forAll(naturalNumbers) { rn =>
-      intercept[IllegalArgumentException](FeatureTypeSparkConverter[Real]().fromSpark(rn))
+      FeatureTypeSparkConverter[RealNN]().fromSpark(rn) shouldBe rn.doubleValue().toReal
+      FeatureTypeSparkConverter.toSpark(rn.doubleValue().toReal) shouldEqual rn
+    }
+  }
+  property("raises error on invalid real numbers") {
+    forAll(booleans) { b =>
+      intercept[IllegalArgumentException](FeatureTypeSparkConverter[Real]().fromSpark(b))
+        .getMessage startsWith "Real type mapping is not defined"
-      intercept[IllegalArgumentException](FeatureTypeSparkConverter[RealNN]().fromSpark(rn))
+      intercept[IllegalArgumentException](FeatureTypeSparkConverter[RealNN]().fromSpark(b))
+        .getMessage startsWith "RealNN type mapping is not defined"
     }
   }
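In effect, the new `case v: Number` branches mean any boxed Java numeric coming out of a Spark row (Integer, Long, Short, etc.) now converts to a Real-typed feature, while non-numeric values still raise. A test-style sketch of the expected behavior, using the same Matchers/property DSL as the suite above (the literal values are illustrative, not from the PR):

```scala
import com.salesforce.op.features.types._

// An Integer is a java.lang.Number, so it now maps to Real instead of throwing
FeatureTypeSparkConverter[Real]().fromSpark(java.lang.Integer.valueOf(5)) shouldBe 5.0.toReal
// Same for RealNN, per the new "converts natural numbers" properties
FeatureTypeSparkConverter[RealNN]().fromSpark(java.lang.Long.valueOf(7L)) shouldBe 7.0.toReal

// Booleans are not Numbers, so they still hit the error case
intercept[IllegalArgumentException](FeatureTypeSparkConverter[Real]().fromSpark(true))
```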
22 changes: 17 additions & 5 deletions local/README.md
@@ -1,7 +1,7 @@
# TransmogrifAI Local

-This module enables local scoring with TransmogrifAI models without the need for a Spark session.
-Instead it applies a combination of TransmogrifAI's transformer interface and [MLeap](https://github.com/combust/mleap) runtime on JVM. It delivers unprecedented portability and performance of TransmogrifAI models allowing the serving of scores from any JVM process.
+This module enables local scoring with TransmogrifAI models without the need for a Spark session during scoring.
+Instead it implements local inference by applying TransmogrifAI's transformers and the [MLeap](https://github.com/combust/mleap) runtime on the JVM. It delivers unprecedented portability and performance of TransmogrifAI models, allowing the serving of scores from any JVM process.

## Usage

@@ -22,17 +22,29 @@ Then in your code you may load and score models as follows:
```scala
import com.salesforce.op.local._

+// Spark Session needed for model loading & score function creation
+implicit val spark = SparkSession.builder().getOrCreate()
+
// Create your workflow & load the model
val workflow: OpWorkflow = ...
val model = workflow.loadModel("/path/to/model")
-val scoreFn = model.scoreFunction // create score function once and then use it indefinitely
+
+// Create score function once and use it indefinitely
+val scoreFn = model.scoreFunction
+
+// Spark Session can be stopped now since it's not required during local scoring
+spark.stop()

// Compute scores with score function
val rawData = Seq(Map("name" -> "Peter", "age" -> 18), Map("name" -> "John", "age" -> 23))
val scores = rawData.map(scoreFn)
```

Or using the local runner:
```scala
-val scoreFn = new OpWorkflowRunnerLocal(workflow).score(opParams)
+val scoreFn = new OpWorkflowRunnerLocal(workflow).scoreFunction(opParams)
```

+**Note**: *Spark Session is only required for loading the model & preparing the scoring function. Once the scoring function is returned, the Spark Session can be shut down since it's not required during local scoring.*

## Performance Results

@@ -0,0 +1,92 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.local

import ml.combust.mleap.core.feature._
import org.apache.spark.ml.linalg.Vector

/**
* Converter of MLeap model instances to a model apply function
*/
case object MLeapModelConverter {

/**
* Convert MLeap model instance to a model apply function
*
* @param model MLeap model
* @throws RuntimeException if model type is not supported
* @return runnable model apply function
*/
def modelToFunction(model: Any): Array[Any] => Any = model match {
case m: BinarizerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue())
case m: BucketedRandomProjectionLSHModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: BucketizerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue())
case m: ChiSqSelectorModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: CoalesceModel => x => m.apply(x: _*)
case m: CountVectorizerModel => x => m.apply(x(0).asInstanceOf[Seq[String]])
case m: DCTModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: ElementwiseProductModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: FeatureHasherModel => x => m.apply(x(0).asInstanceOf[Seq[Any]])
case m: HashingTermFrequencyModel => x => m.apply(x(0).asInstanceOf[Seq[Any]])
case m: IDFModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: ImputerModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue())
case m: InteractionModel => x => m.apply(x(0).asInstanceOf[Seq[Any]])
case m: MathBinaryModel => x =>
m.apply(
x.headOption.map(_.asInstanceOf[Number].doubleValue()),
x.lastOption.map(_.asInstanceOf[Number].doubleValue())
)
case m: MathUnaryModel => x => m.apply(x(0).asInstanceOf[Number].doubleValue())
case m: MaxAbsScalerModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: MinHashLSHModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: MinMaxScalerModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: NGramModel => x => m.apply(x(0).asInstanceOf[Seq[String]])
case m: NormalizerModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: OneHotEncoderModel => x => m.apply(x(0).asInstanceOf[Vector].toArray)
case m: PcaModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: PolynomialExpansionModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: RegexIndexerModel => x => m.apply(x(0).toString)
case m: RegexTokenizerModel => x => m.apply(x(0).toString)
case m: ReverseStringIndexerModel => x => m.apply(x(0).asInstanceOf[Number].intValue())
case m: StandardScalerModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: StopWordsRemoverModel => x => m.apply(x(0).asInstanceOf[Seq[String]])
case m: StringIndexerModel => x => m.apply(x(0))
case m: StringMapModel => x => m.apply(x(0).toString)
case m: TokenizerModel => x => m.apply(x(0).toString)
case m: VectorAssemblerModel => x => m.apply(x(0).asInstanceOf[Seq[Any]])
case m: VectorIndexerModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: VectorSlicerModel => x => m.apply(x(0).asInstanceOf[Vector])
case m: WordLengthFilterModel => x => m.apply(x(0).asInstanceOf[Seq[String]])
case m: WordToVectorModel => x => m.apply(x(0).asInstanceOf[Seq[String]])
case m => throw new RuntimeException(s"Unsupported MLeap model: ${m.getClass.getName}")

**Collaborator:** So every wrapped Spark stage has to be in this list? We should add that to the docs on wrapping...

**Collaborator (Author):** I currently added all the stages from the features package. We can also add models from the classification, regression and recommendation packages, but we already have the first two of them covered as our own OpTransformer stages, so I did not see much of a point in adding them.

**Collaborator (Author):** So, to your question: for right now I think we have everything covered, except recommenders, which I am planning to add once we are ready.

**Collaborator:** Can you add a todo with the classification and regression models? I don't know that this will be much use without them...

**Collaborator (Author):** Adding those is very easy. The thing is, we already have classification and regression models as OpTransformers, so MLeap won't be used to run them.

**Collaborator:** Good point :-)


}

}
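To make the intent concrete, here is a minimal sketch of how this converter is meant to be used: deserialize an MLeap local model (any of the types matched above), turn it into a plain function once, then invoke that function per row. The `mleapModel` value and its input are placeholders, not from the PR:

```scala
import com.salesforce.op.local.MLeapModelConverter

// Placeholder: any MLeap local model instance (e.g. a BinarizerModel)
// obtained from a deserialized bundle
val mleapModel: Any = ???

// Convert once; throws a RuntimeException for unsupported model types
val modelFn: Array[Any] => Any = MLeapModelConverter.modelToFunction(mleapModel)

// Apply per row: inputs arrive as an untyped array, e.g. a single Double
val output: Any = modelFn(Array(0.42))
```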
@@ -30,19 +30,21 @@

package com.salesforce.op.local


import java.nio.file.Paths

import com.github.marschall.memoryfilesystem.MemoryFileSystemBuilder
import com.salesforce.op.OpWorkflowModel
import com.salesforce.op.features.FeatureSparkTypes
import com.salesforce.op.stages.sparkwrappers.generic.SparkWrapperParams
import com.salesforce.op.stages.{OPStage, OpTransformer}
import com.salesforce.op.utils.reflection.ReflectionUtils.reflectMethod
import ml.combust.bundle.serializer.SerializationFormat
import ml.combust.bundle.{BundleContext, BundleRegistry}
import ml.combust.mleap.runtime.MleapContext
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable

@@ -77,26 +79,35 @@ trait OpWorkflowModelLocal extends Serializable {
(
inputs: Array[String],
output: String,
-modelFn: Array[AnyRef] => AnyRef
+modelFn: Array[Any] => Any
)

/**
* Prepares a score function for local scoring
*
+* @param spark Spark Session needed for preparing the scoring function.
+*              Once the scoring function is returned, the Spark Session can be shut down
+*              since it's not required during local scoring.
* @return score function for local scoring
*/
-def scoreFunction: ScoreFunction = {
+def scoreFunction(implicit spark: SparkSession): ScoreFunction = {
// Prepare the stages for scoring
val stagesWithIndex = model.stages.zipWithIndex

+// Prepare an empty DataFrame with transformed schema & metadata (needed for loading MLeap models)
+val transformedData = makeTransformedDataFrame(model)

// Collect all OP stages
val opStages = stagesWithIndex.collect { case (s: OpTransformer, i) => OPModel(s.getOutputFeatureName, s) -> i }

// Collect all Spark wrapped stages
val sparkStages = stagesWithIndex.filterNot(_._1.isInstanceOf[OpTransformer]).collect {
-case (s: OPStage with SparkWrapperParams[_], i) if s.getSparkMlStage().isDefined =>
-  (s, s.getSparkMlStage().get.asInstanceOf[Transformer].copy(ParamMap.empty), i)
+case (opStage: OPStage with SparkWrapperParams[_], i) if opStage.getSparkMlStage().isDefined =>
+  val sparkStage = opStage.getSparkMlStage().get.asInstanceOf[Transformer]
+  (opStage, sparkStage, i)
}
// Convert Spark wrapped stages into MLeap models
-val mleapStages = toMLeapModels(sparkStages)
+val mleapStages = toMLeapModels(sparkStages, transformedData)

// Combine all stages and apply the original order
val allStages = (opStages ++ mleapStages).sortBy(_._2).map(_._1)
@@ -114,22 +125,37 @@
case (row, MLeapModel(inputs, output, modelFn)) =>
val in = inputs.map(inputName => row.get(inputName) match {
case None | Some(null) => null
-case Some(v) => v.asInstanceOf[AnyRef]
+case Some(v) => v
})
row += output -> modelFn(in)
}

// Only return the result features of the model
transformedRow.filterKeys(resultFeatures.contains).toMap
}
}

 /**
- * Convert Spark wrapped stages into MLeal local Models
+ * Prepares an empty DataFrame with transformed schema & metadata (needed for loading MLeap models)
+ */
+private def makeTransformedDataFrame(model: OpWorkflowModel)(implicit spark: SparkSession): DataFrame = {
+  val rawSchema = FeatureSparkTypes.toStructType(model.rawFeatures: _*)
+  val df = spark.emptyDataset[Row](RowEncoder(rawSchema))
+  model.stages.collect { case t: Transformer => t }.foldLeft(df) { case (d, t) => t.transform(d) }
+}
+
+/**
+ * Convert Spark wrapped stages into MLeap local Models
  *
- * @param sparkStages stages to convert
+ * @param sparkStages     stages to convert
+ * @param transformedData dataset with transformed schema & metadata (needed for loading MLeap models)
  * @return MLeap local stages
  */
-private def toMLeapModels(sparkStages: Seq[(OPStage, Transformer, Int)]): Seq[(MLeapModel, Int)] = {
+private def toMLeapModels
+(
+  sparkStages: Seq[(OPStage, Transformer, Int)],
+  transformedData: DataFrame
+): Seq[(MLeapModel, Int)] = {
// Set up an in-memory file system for MLeap model saving/loading
val emptyPath = Paths.get("")
val fs = MemoryFileSystemBuilder.newEmpty().build()
@@ -138,15 +164,16 @@
val mleapRegistry = BundleRegistry("ml.combust.mleap.registry.default")
val sparkRegistry = BundleRegistry("ml.combust.mleap.spark.registry.default")

-// TODO - consider defining an empty Dataset with correct schema, since some Spark stages might fail to convert
val sparkBundleContext = BundleContext[SparkBundleContext](
-  SparkBundleContext(dataset = None, sparkRegistry), SerializationFormat.Json, sparkRegistry, fs, emptyPath)
+  SparkBundleContext(Option(transformedData), sparkRegistry),
+  SerializationFormat.Json, sparkRegistry, fs, emptyPath
+)
val mleapBundleContext = BundleContext[MleapContext](
MleapContext(mleapRegistry), SerializationFormat.Json, mleapRegistry, fs, emptyPath)

for {
(opStage, sparkStage, i) <- sparkStages
-} yield {
+} yield try {
val model = {
// Serialize Spark model using Spark registry
val opModel = sparkRegistry.opForObj[SparkBundleContext, AnyRef, AnyRef](sparkStage)
@@ -157,15 +184,19 @@
val mleapLocalModel = mleapRegistry.model[MleapContext, AnyRef](op = serializedModel.op)
mleapLocalModel.load(serializedModel)(mleapBundleContext)
}
-// Reflect the apply method on MLeap local model
-val applyMethodMirror = reflectMethod(model, "apply")
-val modelFn = (x: Array[AnyRef]) => applyMethodMirror.apply(x: _*).asInstanceOf[AnyRef]
-val inputs = opStage.getTransientFeatures().map(_.name)
-val output = opStage.getOutputFeatureName
-
-MLeapModel(inputs, output, modelFn) -> i
+// Prepare and return MLeap model with inputs, output and model function
+MLeapModel(
+  inputs = opStage.getTransientFeatures().map(_.name),
+  output = opStage.getOutputFeatureName,
+  modelFn = MLeapModelConverter.modelToFunction(model)
+) -> i
+} catch {
+  case e: Exception =>
+    throw new RuntimeException(s"Failed to convert stage '${opStage.uid}' to MLeap stage", e)
+}
}


}

}
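With the signature change above, the local scoring lifecycle reads as follows; a sketch assuming a workflow and a saved model path (both placeholders), consistent with the README usage:

```scala
import com.salesforce.op.OpWorkflow
import com.salesforce.op.local._
import org.apache.spark.sql.SparkSession

// Spark is required only while the score function is being prepared
implicit val spark: SparkSession = SparkSession.builder().getOrCreate()

val workflow: OpWorkflow = ??? // placeholder: your workflow definition
val model = workflow.loadModel("/path/to/model")
val scoreFn: ScoreFunction = model.scoreFunction // resolves the implicit SparkSession

// Safe to stop Spark now: scoring runs on plain JVM data structures
spark.stop()

val score = scoreFn(Map("name" -> "Peter", "age" -> 18))
```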
@@ -31,6 +31,7 @@
package com.salesforce.op.local

import com.salesforce.op.{OpParams, OpWorkflow}
+import org.apache.spark.sql.SparkSession


/**
@@ -52,9 +53,12 @@ class OpWorkflowRunnerLocal(val workflow: OpWorkflow) extends Serializable {
* SparkSession.builder().getOrCreate().stop()
*
* @param params params to use during scoring
+* @param spark  Spark Session needed for preparing the scoring function.
+*               Once the scoring function is returned, the Spark Session can be shut down
+*               since it's not required during local scoring.
* @return score function for local scoring
*/
-def score(params: OpParams): ScoreFunction = {
+def scoreFunction(params: OpParams)(implicit spark: SparkSession): ScoreFunction = {
require(params.modelLocation.isDefined, "Model location must be set in params")
val model = workflow.loadModel(params.modelLocation.get)
model.scoreFunction
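And the runner variant, for completeness. How `opParams` acquires its model location is elided here; the snippet only illustrates the require-guarded flow above, with placeholders marked:

```scala
import com.salesforce.op.{OpParams, OpWorkflow}
import com.salesforce.op.local._
import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder().getOrCreate()

val workflow: OpWorkflow = ??? // placeholder: your workflow definition
val opParams: OpParams = ??? // must have modelLocation set, or the require fails

val scoreFn = new OpWorkflowRunnerLocal(workflow).scoreFunction(opParams)
spark.stop() // no longer needed once the score function exists
```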