# Environment

In [None]:
%AddDeps ml.dmlc xgboost4j-spark_2.12 3.1.1 --transitive

In [None]:
%AddJar https://github.com/jpmml/jpmml-sparkml/releases/download/3.0.10/pmml-sparkml-example-executable-3.0.10.jar

# Dataset

In [None]:
import org.apache.spark.sql.DataFrame

/**
 * Loads a CSV file that has a header row into a DataFrame, letting Spark infer column types.
 *
 * The literal string "N/A" is read as a SQL null rather than as Double.NaN, so that
 * downstream stages can treat it as a genuinely missing value.
 *
 * @param name path of the CSV file to read
 * @return the parsed DataFrame
 */
def load_df(name: String): DataFrame =
    spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        // Missing values become null references, not Double.NaN values
        .option("nullValue", "N/A")
        .csv(name)

# Workflow

## Option A: without missing values

In [None]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.FloatType
import org.jpmml.sparkml.feature.VectorDensifier
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor

// Column roles for the Auto dataset
val labelCol = "mpg"
val catCols = Array("cylinders", "model_year", "origin")
val contCols = Array("acceleration", "displacement", "horsepower", "weight")

// Complete-case dataset; the label is cast to a 32-bit float
val df = {
    val raw = load_df("Auto.csv")
    raw.withColumn(labelCol, col(labelCol).cast(FloatType))
}

df.printSchema()

// Encode each categorical column as a numeric category index
val catIndexer = new StringIndexer()
    .setInputCols(catCols)
    .setOutputCols(catCols.map(c => s"${c}Indexed"))

// Categorical indices first, then continuous columns, in a single feature vector
val vecAssembler = new VectorAssembler()
    .setInputCols(catIndexer.getOutputCols ++ contCols)

// Turn the (possibly sparse) assembled vector into a dense one
val vecDensifier = new VectorDensifier()
    .setInputCol(vecAssembler.getOutputCol)
    .setOutputCol(s"${vecAssembler.getOutputCol}Dense")

// One XGBoost feature type per vector slot, in assembly order: "c" = categorical, "q" = quantitative
val featureTypes: Array[String] =
    Array.fill(catIndexer.getOutputCols.length)("c") ++ Array.fill(contCols.length)("q")

val regressor = new XGBoostRegressor()
    .setLabelCol(labelCol)
    .setFeaturesCol(vecDensifier.getOutputCol)
    .setFeatureTypes(featureTypes)
    .setMaxDepth(3)
    .setNumRound(11)

val pipeline = new Pipeline()
    .setStages(Array(catIndexer, vecAssembler, vecDensifier, regressor))

val pipelineModel = pipeline.fit(df)

## Option B: with missing values

In [None]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.FloatType
import org.jpmml.sparkml.feature.{InvalidCategoryTransformer, VectorDensifier}
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor

// Column roles for the Auto dataset
val labelCol = "mpg"
val catCols = Array("cylinders", "model_year", "origin")
val contCols = Array("acceleration", "displacement", "horsepower", "weight")

// Dataset with missing values ("N/A" cells are loaded as nulls by load_df); the label is cast to a 32-bit float
val df = load_df("AutoNA.csv")
    .withColumn(labelCol, col(labelCol).cast(FloatType))

df.printSchema()

val catIndexer = new StringIndexer()
    .setInputCols(catCols)
    .setOutputCols(catCols.map(_ + "Indexed"))
    // Pass null values through (as an extra "invalid" index) instead of failing the fit/transform
    .setHandleInvalid("keep")

// The "keep" mode above adds a pseudo-category index for invalid/null values.
// XGBoost would treat that index as a real category level and its category indices would be off,
// so drop it again before assembling the features.
// NOTE(review): presumably the dropped index ends up as a missing value in the vector — confirm with jpmml-sparkml docs.
val catIndexTransformer = new InvalidCategoryTransformer()
    .setInputCols(catIndexer.getOutputCols)
    .setOutputCols(catIndexer.getOutputCols.map(_ + "Fixed"))

val vecAssembler = new VectorAssembler()
    .setInputCols(catIndexTransformer.getOutputCols ++ contCols)
    // Allow passed-through null values (emitted as NaN) instead of failing
    .setHandleInvalid("keep")

val vecDensifier = new VectorDensifier()
    .setInputCol(vecAssembler.getOutputCol)
    .setOutputCol(vecAssembler.getOutputCol + "Dense")

// One XGBoost feature type per vector slot, in the same order as vecAssembler's inputs:
// "c" = categorical, "q" = quantitative. Derived from the columns actually assembled
// (the transformed category indices), so it stays in sync with the feature vector.
val featureTypes: Array[String] =
    catIndexTransformer.getOutputCols.map(_ => "c") ++ contCols.map(_ => "q")

val regressor = new XGBoostRegressor()
    .setLabelCol(labelCol)
    .setFeaturesCol(vecDensifier.getOutputCol)
    .setFeatureTypes(featureTypes)
    .setMaxDepth(3)
    .setNumRound(11)

val pipeline = new Pipeline()
    .setStages(Array(catIndexer, catIndexTransformer, vecAssembler, vecDensifier, regressor))

val pipelineModel = pipeline.fit(df)

# Export to PMML

In [None]:
import org.jpmml.sparkml.PMMLBuilder
import org.jpmml.sparkml.model.HasPredictionModelOptions
import org.jpmml.xgboost.HasXGBoostOptions

// Converter options, applied in order: (option key, option value)
val pmmlOptions: Seq[(String, Any)] = Seq(
    // Apache Spark options
    HasPredictionModelOptions.OPTION_KEEP_PREDICTIONCOL -> false,
    // XGBoost options
    HasXGBoostOptions.OPTION_INPUT_FLOAT -> true,
    HasXGBoostOptions.OPTION_COMPACT -> false
)

// Converter for the fitted pipeline, described against the training DataFrame's schema
val pmmlBuilder = pmmlOptions.foldLeft(new PMMLBuilder(df.schema, pipelineModel)) {
    case (builder, (key, value)) => builder.putOption(key, value)
}

val pmmlString = pmmlBuilder.buildString
println(pmmlString)