## Training XGBoost4J-Spark with PySpark

#### Introduction to XGBoost

XGBoost stands for “eXtreme Gradient Boosting” and is an open-source machine learning library that implements optimized distributed gradient boosting algorithms. Its speed and scalability have made it one of the most popular ML libraries. One of its important features is portability: the same code runs on many distributed environments, such as Kubernetes, Hadoop, Dask, and MPI. Deploying distributed XGBoost on Spark is more complicated, however. It requires the XGBoost4J-Spark package, which can be difficult to integrate with Python and MLflow. To show how to solve these issues, this notebook starts with an overview of single-node XGBoost and MLflow. It then shows how to train with XGBoost4J-Spark and integrate it with MLflow, and finally how to use a PySpark wrapper around XGBoost4J-Spark.

#### How does XGBoost work?

Gradient boosted trees are a supervised learning method that combines the predictions of an ensemble of simple decision trees for classification or regression. XGBoost minimizes a regularized objective with two parts. The first is a training loss, which measures the difference between the labels and the predictions. The second is a regularization term, which penalizes model complexity, such as the number of leaves and the size of the leaf weights. Training is additive: each new tree is fitted using the first- and second-order gradients of the loss with respect to the current predictions. This moves the ensemble toward a model that is both accurate and simple. This is only a broad summary of the algorithm. For more detail, see the boosted-trees tutorial in the XGBoost documentation (https://xgboost.readthedocs.io/en/latest/tutorials/model.html).
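The objective described above can be written as obj = Σ_i l(y_i, ŷ_i) + Σ_k Ω(f_k), with Ω(f) = γT + ½λ Σ_j w_j². Here T is the number of leaves in a tree and w_j are its leaf weights, which is the formulation used in the XGBoost tutorial. The following is a minimal pure-Python sketch that assumes squared-error loss. It shows how the per-example gradients turn into an optimal leaf weight and a split gain. It is only an illustration of the formulas and is not XGBoost's implementation, which adds many optimizations on top.

```python
# Sketch of XGBoost's split scoring, ASSUMING squared-error loss l(y, p) = (y - p)^2 / 2,
# whose gradient is g = p - y and hessian is h = 1.
# lam is the L2 penalty on leaf weights (lambda); gamma is the penalty per leaf.

def grad_hess(labels, preds):
    """First- and second-order gradients of squared error for each example."""
    g = [p - y for y, p in zip(labels, preds)]
    h = [1.0] * len(labels)
    return g, h

def regularization(leaf_weights, lam=1.0, gamma=0.0):
    """Omega(f) = gamma * T + 0.5 * lambda * sum(w_j^2) for one tree with T leaves."""
    return gamma * len(leaf_weights) + 0.5 * lam * sum(w * w for w in leaf_weights)

def leaf_weight(G, H, lam=1.0):
    """Optimal weight of a leaf with gradient sum G and hessian sum H."""
    return -G / (H + lam)

def leaf_score(G, H, lam=1.0):
    """Score term G^2 / (H + lambda) contributed by a single leaf."""
    return G * G / (H + lam)

def split_gain(g, h, left_idx, lam=1.0, gamma=0.0):
    """Gain from splitting one leaf into a left child (left_idx) and a right child."""
    left = set(left_idx)
    right = [i for i in range(len(g)) if i not in left]
    GL = sum(g[i] for i in left)
    HL = sum(h[i] for i in left)
    GR = sum(g[i] for i in right)
    HR = sum(h[i] for i in right)
    return 0.5 * (leaf_score(GL, HL, lam) + leaf_score(GR, HR, lam)
                  - leaf_score(GL + GR, HL + HR, lam)) - gamma
```

For example, take labels [1, 1, 0, 0] with a starting prediction of 0.5 for every example, λ = 1, and γ = 0 (XGBoost's defaults for `lambda` and `gamma`). Splitting the first two examples from the last two gives a gain of 1/3 and leaf weights of +1/3 and -1/3. A split that mixes the two classes gives zero gain. With a large enough γ, the same split becomes unprofitable and is not made.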

#### MLflow

<img src="https://databricks.com/wp-content/uploads/2020/04/databricks-adds-access-control-to-mlflow-model-registry_01.jpg" alt="drawing" width="700"/>

MLflow is a “platform for managing the end-to-end machine learning lifecycle.” It lets you track experiments, deploy models to model-serving tools, register models and manage them from staging to production, and package models to ease collaboration. It supports multiple languages, including Python, Java, and R. MLflow is also a key component of the Databricks platform. There, its multi-language support combines with the collaboration features of Databricks to speed up development and make the ML lifecycle on Spark easier to follow.

#### XGBoost Example

The following example shows how to use the non-Spark XGBoost Python API to train a model on the iris dataset. It illustrates how to combine Spark data ingestion and MLflow tracking with single-node (non-distributed) XGBoost training.

```python
from pyspark.sql.types import LongType
from pyspark.sql.functions import col, udf

from sklearn.metrics import precision_score
from sklearn.model_selection import train_test_split

import xgboost as xgb
import mlflow
import mlflow.xgboost

# Read the iris dataset
data = (spark
        .read
        .format('csv')
        .option("header", "true")
        .option("inferSchema", "true")
        .load('/databricks-datasets/Rdatasets/data-001/csv/datasets/iris.csv')
       )

# UDF for converting species labels to integer class indexes
def cnvt_species(s):
    species = ['setosa', 'versicolor', 'virginica']
    return species.index(s)
cnvt_species_udf = udf(cnvt_species, LongType())

# Select and rename the feature columns; apply the UDF to the label column
data = (data.select(col("`Sepal.Length`").alias('sepal_length'),
                    col("`Sepal.Width`").alias('sepal_width'),
                    col("`Petal.Length`").alias('petal_length'),
                    col("`Petal.Width`").alias('petal_width'),
                    cnvt_species_udf(col("Species")).alias("species")))

feature_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

# Collect to pandas and split into training/test sets (75%/25% by default)
training_data, testing_data = train_test_split(data.toPandas(), random_state=42)

# Load the training data into a DMatrix
xgtrain = xgb.DMatrix(training_data[feature_cols].values, label=training_data['species'])

# Enable autologging before training so parameters and the trained model are logged
mlflow.xgboost.autolog()

# Start an MLflow training run
with mlflow.start_run():
  param = {'max_depth': 2,
           'objective': 'multi:softmax',
           'num_class': 3,
           'nthread': 8}
  bst = xgb.train(param, xgtrain, num_boost_round=10)

  # Load the testing data into a DMatrix
  dtest = xgb.DMatrix(testing_data[feature_cols].values)

  # Predict class indexes for the testing data (multi:softmax returns class labels)
  ypred = bst.predict(dtest)

  # Micro-averaged precision; for single-label multiclass data this equals accuracy
  p_score = precision_score(testing_data["species"], ypred, average='micro')

  # Log the precision score as a metric
  mlflow.log_metric("precision_score", p_score)
  print("XGBoost Model Precision Score:", p_score)
```
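The example above reports micro-averaged precision. For single-label multiclass predictions, this is the same number as accuracy. Every prediction counts as exactly one true positive or one false positive for its predicted class, so the pooled denominator equals the number of examples. The small standard-library sketch below checks this identity on made-up labels, without scikit-learn.

```python
from collections import Counter

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def micro_precision(y_true, y_pred):
    """Micro-averaged precision: pool true and false positives over all classes."""
    tp = Counter()  # true positives, keyed by predicted class
    fp = Counter()  # false positives, keyed by predicted class
    for t, p in zip(y_true, y_pred):
        if t == p:
            tp[p] += 1
        else:
            fp[p] += 1
    total_tp = sum(tp.values())
    return total_tp / (total_tp + sum(fp.values()))

# Hypothetical labels for three classes
y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 1, 1, 1, 2, 0]
```

Both functions return 4/6 for these labels. Macro-averaged precision, which averages per-class precision, generally does not equal accuracy.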


#### XGBoost4J-Spark Example

XGBoost4J-Spark does not have an official Python API, so this example uses Scala to train a distributed model without a Python wrapper. This approach is useful for Scala or multi-language ML pipelines.

```sql
CREATE TABLE IF NOT EXISTS iris (rowNum int, SepalLength double, SepalWidth double, PetalLength double, PetalWidth double, Species string)
USING CSV
OPTIONS (path "/databricks-datasets/Rdatasets/data-001/csv/datasets/iris.csv", header "true")
```

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

import org.mlflow.tracking.MlflowContext

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

// Set user to your Databricks username
val user = "stephen.offer@databricks.com"

// Read the dataset
val rawInput = spark.sql("select * from iris")

// Split the data into training (80%) and test (20%) sets with a fixed seed
val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123)

// Vector assembler stage: combine the feature columns into one vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("SepalLength", "SepalWidth", "PetalLength", "PetalWidth"))
  .setOutputCol("features")

// String indexer stage: map species names to numeric class indexes
val labelIndexer = new StringIndexer()
  .setInputCol("Species")
  .setOutputCol("classIndex")
  .fit(training)

// XGBoostClassifier, trained across 2 distributed workers
val booster = new XGBoostClassifier(
  Map("eta" -> 0.1f,
    "max_depth" -> 2,
    "objective" -> "multi:softprob",
    "num_class" -> 3,
    "num_round" -> 100,
    "num_workers" -> 2,
    "tree_method" -> "auto"
  )
)
booster.setFeaturesCol("features")
booster.setLabelCol("classIndex")

// Create the training pipeline
val pipeline = new Pipeline()
  .setStages(Array(assembler, labelIndexer, booster))

// Create the MLflow experiment if it does not exist, then start a run
val mlflowContext = new MlflowContext()
val experimentName = s"/Users/$user/xgboost4j-spark-quickstart"
val client = mlflowContext.getClient()
val experimentOpt = client.getExperimentByName(experimentName)

if (!experimentOpt.isPresent()) {
  client.createExperiment(experimentName)
}
mlflowContext.setExperimentName(experimentName)
val run = mlflowContext.startRun("run")

// Train the model
val model = pipeline.fit(training)

// Batch prediction
val prediction = model.transform(test)
prediction.show(true)

// Model evaluation; the evaluator defaults to "f1", so request accuracy explicitly
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("classIndex")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

// Calculate the accuracy of the model
val accuracy = evaluator.evaluate(prediction)
println("The model accuracy is: " + accuracy)

// Save the model and log its path as a parameter (logParam takes the key first, then the value)
val scratchDir = s"dbfs:/tmp/$user/xgboost-model"
model.write.overwrite().save(scratchDir)
run.logParam("model_path", scratchDir)

// Log the accuracy as a metric and end the run
run.logMetric("accuracy", accuracy)
run.endRun()
```
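Spark's `MulticlassClassificationEvaluator` reports weighted F1 (`metricName` "f1") unless `metricName` is set to "accuracy", and the two numbers can differ noticeably. The following standard-library Python sketch uses made-up labels to compute both, so the gap is visible. The weighted-F1 function follows the usual definition: per-class F1, weighted by each class's share of the true labels. It is meant as an illustration and is not Spark's source code.

```python
from collections import Counter

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def weighted_f1(y_true, y_pred):
    """Per-class F1 averaged with weights equal to each class's share of true labels."""
    n = len(y_true)
    total = 0.0
    for c, count in Counter(y_true).items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == c and p == c)
        predicted = sum(1 for p in y_pred if p == c)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += (count / n) * f1
    return total

# Hypothetical imbalanced labels: a model that always predicts class 0
y_true = [0, 0, 0, 0, 1, 1]
y_pred = [0, 0, 0, 0, 0, 0]
```

Here, accuracy is 2/3 but weighted F1 is 8/15, because class 1 is never predicted. This is why the evaluator's metric name should match the metric you log.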

#### XGBoost4J-Spark PySpark Example

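Although XGBoost4J-Spark has no official Python API, a PySpark wrapper can still be used to train an XGBoost4J-Spark model from Python. This example uses an unofficial wrapper, available at https://github.com/sllynn/spark-xgboost. Because the wrapper calls the XGBoost4J-Spark classes on the JVM, the XGBoost4J-Spark libraries must also be installed on the cluster, as in the Scala example.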

```sh
git clone https://github.com/sllynn/spark-xgboost.git
cd spark-xgboost
pip install -e .
```

```python
# Restart Python if importing the newly installed library raises an ImportError
dbutils.library.restartPython()
```

```python
# Adapted from https://github.com/sllynn/spark-xgboost/blob/master/examples/spark-xgboost_adultdataset.ipynb
from pprint import PrettyPrinter

import mlflow
import mlflow.spark
from sparkxgb import XGBoostClassifier, XGBoostRegressor

from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

pp = PrettyPrinter()

# The adult dataset has no header row, so supply the column names
col_names = [
  "age", "workclass", "fnlwgt",
  "education", "education-num",
  "marital-status", "occupation",
  "relationship", "race", "sex",
  "capital-gain", "capital-loss",
  "hours-per-week", "native-country",
  "label"
]

# Read the data and split it 80/20 into training and test sets
train_sdf, test_sdf = (
  spark.read.csv(
    path="/databricks-datasets/adult/adult.data",
    inferSchema=True
  )
  .toDF(*col_names)
  .repartition(200)
  .randomSplit([0.8, 0.2], seed=42)
)

# Index every string column; the last string column ("label") becomes the target
string_columns = [fld.name for fld in train_sdf.schema.fields if isinstance(fld.dataType, StringType)]
string_col_replacements = [fld + "_ix" for fld in string_columns]
string_column_map = list(zip(string_columns, string_col_replacements))
target = string_col_replacements[-1]
predictors = [fld.name for fld in train_sdf.schema.fields if not isinstance(fld.dataType, StringType)] + string_col_replacements[:-1]
pp.pprint(
  dict(
    string_column_map=string_column_map,
    target_variable=target,
    predictor_variables=predictors
  )
)

# Fit the indexers on all rows so that every category in the test set has an index
si = [StringIndexer(inputCol=fld[0], outputCol=fld[1]) for fld in string_column_map]
va = VectorAssembler(inputCols=predictors, outputCol="features")
pipeline = Pipeline(stages=[*si, va])
fitted_pipeline = pipeline.fit(train_sdf.union(test_sdf))

# Transform and cache the prepared data; count() forces the cache to materialize
train_sdf_prepared = fitted_pipeline.transform(train_sdf)
train_sdf_prepared.cache()
train_sdf_prepared.count()

test_sdf_prepared = fitted_pipeline.transform(test_sdf)
test_sdf_prepared.cache()
test_sdf_prepared.count()

xgbParams = dict(
  eta=0.1,
  maxDepth=2,
  missing=0.0,
  objective="binary:logistic",
  numRound=5,
  numWorkers=2
)

# Named xgb_classifier so it does not shadow the xgboost module imported earlier as xgb
xgb_classifier = (
  XGBoostClassifier(**xgbParams)
  .setFeaturesCol("features")
  .setLabelCol("label_ix")
)

bce = BinaryClassificationEvaluator(
  rawPredictionCol="rawPrediction",
  labelCol="label_ix"
)

# 3 x 3 grid over eta and maxDepth
param_grid = (
  ParamGridBuilder()
  .addGrid(xgb_classifier.eta, [1e-1, 1e-2, 1e-3])
  .addGrid(xgb_classifier.maxDepth, [2, 4, 8])
  .build()
)

cv = CrossValidator(
  estimator=xgb_classifier,
  estimatorParamMaps=param_grid,
  evaluator=bce,
  numFolds=5
)

spark_model_name = "best_model_spark"

with mlflow.start_run():
  model = cv.fit(train_sdf_prepared)
  best_params = dict(
    eta_best=model.bestModel.getEta(),
    maxDepth_best=model.bestModel.getMaxDepth()
  )
  mlflow.log_params(best_params)

  # Log the featurization pipeline and the cross-validated model
  mlflow.spark.log_model(fitted_pipeline, "featuriser")
  mlflow.spark.log_model(model, spark_model_name)

  # Area under the ROC curve (the evaluator's default metric) on the test set
  metrics = dict(
    roc_test=bce.evaluate(model.transform(test_sdf_prepared))
  )
  mlflow.log_metrics(metrics)
```
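The cross-validation in the cell above costs more than it might appear. `CrossValidator` trains one model per parameter combination per fold. It then refits the best combination on the full training data. The following standard-library sketch counts those training jobs for the 3 × 3 grid with 5 folds. Each job is a distributed XGBoost4J-Spark training run with `numWorkers=2`. The grid values are copied from the `ParamGridBuilder` above.

```python
from itertools import product

# Grid values and fold count copied from the ParamGridBuilder / CrossValidator above
eta_values = [1e-1, 1e-2, 1e-3]
max_depth_values = [2, 4, 8]
num_folds = 5

# One parameter map per (eta, maxDepth) combination, as ParamGridBuilder produces
param_maps = [dict(eta=e, maxDepth=d) for e, d in product(eta_values, max_depth_values)]

# Every parameter map is trained once per fold...
cv_fits = len(param_maps) * num_folds
# ...and the best parameter map is refitted once on the full training set
total_fits = cv_fits + 1
```

This gives 9 parameter maps, 45 cross-validation fits, and 46 training runs in total. Shrinking the grid or the number of folds is the most direct way to reduce tuning time.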
