# Introduction to XGBoost Spark with GPU

This notebook uses the Taxi dataset as an example of an XGBoost regressor. It shows how to load data, train an XGBoost model, and use the model to predict the "fare_amount" of a taxi trip. Compared to the original XGBoost Spark code, there are only two API differences.


## Load libraries
First, we load some common libraries that both the GPU and CPU versions of XGBoost use:

In [1]:
import ml.dmlc.xgboost4j.scala.spark.{XGBoostRegressor, XGBoostRegressionModel}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

The only new import for xgboost-spark users comes from the `rapids` package, which provides `GpuDataReader`:

In [2]:
import ml.dmlc.xgboost4j.scala.spark.rapids.{GpuDataReader, GpuDataset}

Some libraries needed by the CPU version are no longer needed by the GPU version. The extra libraries that the CPU version needs are listed below:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.FloatType
```

## Set your dataset path

In [3]:
// Set the paths of datasets for training and prediction
// You need to update them to your real paths!
val trainPath = "/data/taxi/csv/train/"
val trainWithEvalPath = "/data/taxi/csv/trainWithEval/"
val evalPath  = "/data/taxi/csv/eval/"

trainPath = /data/taxi/csv/train/
trainWithEvalPath = /data/taxi/csv/trainWithEval/
evalPath = /data/taxi/csv/eval/


/data/taxi/csv/eval/

## Set the schema of the dataset
For the Taxi example, the data has 16 columns: 15 features and 1 label. "fare_amount" is the label column. The schema is used later to load the data. We also define some basic XGBoost training parameters here.

In [4]:
lazy val schema =
  StructType(Array(
    StructField("vendor_id", DoubleType),
    StructField("passenger_count", DoubleType),
    StructField("trip_distance", DoubleType),
    StructField("pickup_longitude", DoubleType),
    StructField("pickup_latitude", DoubleType),
    StructField("rate_code", DoubleType),
    StructField("store_and_fwd", DoubleType),
    StructField("dropoff_longitude", DoubleType),
    StructField("dropoff_latitude", DoubleType),
    StructField(labelName, DoubleType),
    StructField("hour", DoubleType),
    StructField("year", IntegerType),
    StructField("month", IntegerType),
    StructField("day", DoubleType),
    StructField("day_of_week", DoubleType),
    StructField("is_weekend", DoubleType)
  ))

val labelName = "fare_amount"

lazy val paramMap = Map(
  "learning_rate" -> 0.05,
  "max_depth" -> 8,
  "subsample" -> 0.8,
  "gamma" -> 1,
  "num_round" -> 500
)

schema = <lazy>
labelName = fare_amount
paramMap = <lazy>


<lazy>

## Create a new spark session and load data
We must create a new Spark session to run any Spark operations. It is also used to initialize the `GpuDataReader`, which is a data reader powered by the GPU.

NOTE: for this notebook, the dependency jars were uploaded when the Toree kernel was installed. If they are not uploaded at installation time, they can also be added from within the notebook with the [%AddJar magic](https://toree.incubator.apache.org/docs/current/user/faq/). However, `%AddJar` has one restriction: the uploaded jar is only available if `%AddJar` is called after a new Spark session has been created. It must be used as below:

```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Taxi-GPU").getOrCreate
%AddJar file:/data/libs/cudf-0.9-cuda10.jar
%AddJar file:/data/libs/xgboost4j_2.11-1.0.0-Beta.jar
%AddJar file:/data/libs/xgboost4j-spark_2.11-1.0.0-Beta.jar
// ...
```

In [5]:
val spark = SparkSession.builder().appName("Taxi-GPU").getOrCreate

spark = org.apache.spark.sql.SparkSession@27d062ec


org.apache.spark.sql.SparkSession@27d062ec

Here is the first API difference: we now use `GpuDataReader` to load the dataset. Similar to the original Spark data loading API, `GpuDataReader` also uses chained calls of `option`, `schema`, and `csv`. For the CPU version, the data reader is created as below:

```scala
val dataReader = spark.read
```

`featureNames` is used to tell xgboost which columns are `features`, and which column is `label`.

In [6]:
val reader = new GpuDataReader(spark).option("header", true).schema(schema)
val featureNames = schema.filter(_.name != labelName).map(_.name)

reader = ml.dmlc.xgboost4j.scala.spark.rapids.GpuDataReader@57c9d28a
featureNames = List(vendor_id, passenger_count, trip_distance, pickup_longitude, pickup_latitude, rate_code, store_and_fwd, dropoff_longitude, dropoff_latitude, hour, year, month, day, day_of_week, is_weekend)


List(vendor_id, passenger_count, trip_distance, pickup_longitude, pickup_latitude, rate_code, store_and_fwd, dropoff_longitude, dropoff_latitude, hour, year, month, day, day_of_week, is_weekend)
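The same filtering idea can be checked on a small schema. The sketch below is only an illustration: `demoSchema`, `demoLabel`, and `demoFeatures` are hypothetical names, and the three-column schema stands in for the full 16-column one. It relies on `StructType` behaving as a `Seq[StructField]`, which is what makes `filter` and `map` available on it.

```scala
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Hypothetical three-column schema standing in for the full taxi schema.
val demoLabel = "fare_amount"
val demoSchema = StructType(Array(
  StructField("trip_distance", DoubleType),
  StructField(demoLabel, DoubleType),
  StructField("hour", DoubleType)
))

// Every column except the label is treated as a feature.
val demoFeatures = demoSchema.filter(_.name != demoLabel).map(_.name)

// Sanity checks: the label exists in the schema and never leaks into the features.
require(demoSchema.fieldNames.contains(demoLabel), s"label column $demoLabel is missing")
require(!demoFeatures.contains(demoLabel), "label column must not be a feature")
```

Checks like these may help catch a misspelled label name before training starts, since a wrong `labelName` would otherwise silently leave the label among the features.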

## Load the datasets
In the CPU version, all feature columns must first be vectorized into a single column with `VectorAssembler`, which takes some extra preparation code after reading the data.

CPU version:
```scala
object Vectorize {
  def apply(df: DataFrame, featureNames: Seq[String], labelName: String): DataFrame = {
    val toFloat = df.schema.map(f => col(f.name).cast(FloatType))
    new VectorAssembler()
      .setInputCols(featureNames.toArray)
      .setOutputCol("features")
      .transform(df.select(toFloat:_*))
      .select(col("features"), col(labelName))
  }
}
val reader = spark.read.option("header", true).schema(schema)
var trainSet = reader.csv(trainPath)
var evalSet = reader.csv(evalPath)
trainSet = Vectorize(trainSet, featureNames, labelName)
evalSet = Vectorize(evalSet, featureNames, labelName)
```

With `GpuDataReader`, `VectorAssembler` is no longer needed. We can simply read the data:

In [7]:
val trainSet = reader.csv(trainPath)
val trainWithEvalSet = reader.csv(trainWithEvalPath)
val evalSet = reader.csv(evalPath)

trainSet = ml.dmlc.xgboost4j.scala.spark.rapids.GpuDataset@40ff4c27
trainWithEvalSet = ml.dmlc.xgboost4j.scala.spark.rapids.GpuDataset@be125b7
evalSet = ml.dmlc.xgboost4j.scala.spark.rapids.GpuDataset@2b60bd9a


ml.dmlc.xgboost4j.scala.spark.rapids.GpuDataset@2b60bd9a

## Add XGBoost parameters for GPU version
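Another difference is that `num_workers` should be set to the number of machines with GPUs in the Spark cluster, while in the CPU version it can be set to the number of CPU cores. The tree method also changes from `hist` to `gpu_hist`. The corresponding CPU version parameters look like this:

```scala
// CPU version: the parameters that differ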
"tree_method" -> "hist",
"num_workers" -> 12
```

In [8]:
val xgbParamFinal = paramMap ++ Map("tree_method" -> "gpu_hist", "num_workers" -> 1)

xgbParamFinal = Map(learning_rate -> 0.05, num_workers -> 1, subsample -> 0.8, max_depth -> 8, num_round -> 500, tree_method -> gpu_hist, gamma -> 1)


Map(learning_rate -> 0.05, num_workers -> 1, subsample -> 0.8, max_depth -> 8, num_round -> 500, tree_method -> gpu_hist, gamma -> 1)
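Because `++` on a Scala `Map` lets the right-hand side override or extend the left-hand side, the CPU and GPU parameter sets can be derived from one shared base map. The following is a minimal sketch of that idea, not part of the notebook's API: `baseParams` and `withDevice` are hypothetical names, and the worker counts simply reuse the values shown above.

```scala
// Parameters shared by the CPU and GPU versions (same values as paramMap).
val baseParams: Map[String, Any] = Map(
  "learning_rate" -> 0.05,
  "max_depth" -> 8,
  "subsample" -> 0.8,
  "gamma" -> 1,
  "num_round" -> 500
)

// Hypothetical helper: adds only the device-specific parameters to the base map.
def withDevice(base: Map[String, Any], useGpu: Boolean, workers: Int): Map[String, Any] = {
  val overrides: Map[String, Any] = Map(
    "tree_method" -> (if (useGpu) "gpu_hist" else "hist"),
    "num_workers" -> workers
  )
  base ++ overrides // keys on the right win if they already exist on the left
}

val gpuParams = withDevice(baseParams, useGpu = true, workers = 1)
val cpuParams = withDevice(baseParams, useGpu = false, workers = 12)
```

Keeping the shared parameters in one place makes a CPU vs. GPU timing comparison fairer, since only `tree_method` and `num_workers` differ between the two runs.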

## Initialize XGBoostRegressor
The second API difference is `setFeaturesCol` in the CPU version vs. `setFeaturesCols` in the GPU version. `setFeaturesCol` accepts a String that names the single vectorized feature column, which requires `VectorAssembler` to combine all feature columns into one. `setFeaturesCols` accepts a list of strings, so `VectorAssembler` is no longer needed. The GPU version therefore reduces the preparation code needed before you train your XGBoost model.

CPU version:
```scala
val xgbRegressor = new XGBoostRegressor(xgbParamFinal)
      .setLabelCol(labelName)
      .setFeaturesCol("features")
```

In [9]:
val xgbRegressor = new XGBoostRegressor(xgbParamFinal)
  .setLabelCol(labelName)
  .setFeaturesCols(featureNames)

xgbRegressor = xgbr_2eee9ab3afd6


xgbr_2eee9ab3afd6

## Benchmark and train
The `Benchmark` object measures elapsed time. We use it to compare training time with the CPU version of XGBoost.

We also support training with evaluation sets in two ways, the same as in the CPU version:

* Call the `setEvalSets` API after initializing an XGBoostRegressor

```scala
xgbRegressor.setEvalSets(Map("eval" -> evalSet))
```

* Pass the `eval_sets` parameter when initializing an XGBoostRegressor

```scala
val paramMapWithEval = paramMap + ("eval_sets" -> Map("eval" -> evalSet))
val xgbRegressorWithEval = new XGBoostRegressor(paramMapWithEval)
```

In this notebook, we use the API method to set the evaluation set.

In [10]:
xgbRegressor.setEvalSets(Map("eval" -> trainWithEvalSet))

xgbr_2eee9ab3afd6

In [11]:
object Benchmark {
  def time[R](phase: String)(block: => R): (R, Float) = {
    val t0 = System.currentTimeMillis
    val result = block // call-by-name
    val t1 = System.currentTimeMillis
    println("Elapsed time [" + phase + "]: " + ((t1 - t0).toFloat / 1000) + "s")
    (result, (t1 - t0).toFloat / 1000)
  }
}

// start training
val (model, _) = Benchmark.time("train") {
  xgbRegressor.fit(trainSet)
}

Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=10.19.183.124, DMLC_TRACKER_PORT=9092, DMLC_NUM_WORKER=1}
Elapsed time [train]: 14.501s


defined object Benchmark
model = xgbr_2eee9ab3afd6


xgbr_2eee9ab3afd6
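`Benchmark.time` takes its block as a call-by-name parameter, so the block runs exactly once, between the two clock reads. As a sketch of the same pattern, the hypothetical `NanoBenchmark` below swaps in `System.nanoTime`, which is a monotonic clock and so is not affected by wall-clock adjustments. It is an alternative for illustration, not the notebook's own helper, and the workload is a cheap stand-in for model training.

```scala
// Hypothetical variant of Benchmark that uses the monotonic System.nanoTime clock.
object NanoBenchmark {
  def time[R](phase: String)(block: => R): (R, Double) = {
    val t0 = System.nanoTime()
    val result = block // call-by-name: evaluated exactly once, here
    val seconds = (System.nanoTime() - t0) / 1e9
    println(f"Elapsed time [$phase]: $seconds%.3fs")
    (result, seconds)
  }
}

// Usage with a cheap stand-in workload instead of model training.
val (total, elapsed) = NanoBenchmark.time("sum") {
  (1 to 1000000).map(_.toLong).sum
}
```

Returning the elapsed seconds alongside the result, as the original `Benchmark` also does, makes it possible to store timings for a later CPU vs. GPU comparison instead of only printing them.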

## Transformation and evaluation
We use `evalSet` to evaluate our model and show some key columns of our predictions. Finally, we use `RegressionEvaluator` to calculate the root-mean-square error (RMSE) of our predictions.

In [12]:
// start transform
val (prediction, _) = Benchmark.time("transform") {
  val ret = model.transform(evalSet).cache()
  ret.foreachPartition(_ => ())
  ret
}
prediction.select("vendor_id", "passenger_count", "trip_distance", labelName, "prediction").show(10)
val evaluator = new RegressionEvaluator().setLabelCol(labelName)
val (rmse, _) = Benchmark.time("evaluation") {
  evaluator.evaluate(prediction)
}
println(s"RMSE == $rmse")

Elapsed time [transform]: 2.416s
+-------------+---------------+-------------+-----------+------------------+
|    vendor_id|passenger_count|trip_distance|fare_amount|        prediction|
+-------------+---------------+-------------+-----------+------------------+
| 1.55973043E9|            1.0|          2.3|        7.7| 7.165289402008057|
| 1.55973043E9|            1.0|          1.7|        8.5| 7.477686405181885|
|-1.67996288E9|            2.0|          5.0|       13.3|13.023439407348633|
| 1.55973043E9|            1.0|          3.0|       11.7|11.613130569458008|
| 1.55973043E9|            2.0|          1.2|        5.8| 6.099410533905029|
| 1.55973043E9|            1.0|          1.7|        7.4|  7.53718376159668|
| 1.55973043E9|            1.0|          5.1|       15.4|14.448943138122559|
| 1.55973043E9|            1.0|          0.4|        4.6| 4.475068092346191|
| 1.55973043E9|            2.0|          2.1|        8.2| 8.562113761901855|
| 1.55973043E9|            1.0|          1.

prediction = [vendor_id: float, passenger_count: float ... 15 more fields]
evaluator = regEval_e3a5b4f23a9f
rmse = 0.6899989190851109


0.6899989190851109
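To make the metric concrete, RMSE is the square root of the mean squared difference between label and prediction. The sketch below computes it in plain Scala for the first five rows of the output above. `samples` and `rmseOf` are hypothetical names used only for illustration, and the result covers just these five rows, so it is not expected to match the full-dataset value reported by `RegressionEvaluator`.

```scala
// (fare_amount, prediction) pairs copied from the first five rows shown above.
val samples: Seq[(Double, Double)] = Seq(
  (7.7, 7.165289402008057),
  (8.5, 7.477686405181885),
  (13.3, 13.023439407348633),
  (11.7, 11.613130569458008),
  (5.8, 6.099410533905029)
)

// RMSE = sqrt(mean((label - prediction)^2))
def rmseOf(pairs: Seq[(Double, Double)]): Double = {
  require(pairs.nonEmpty, "need at least one (label, prediction) pair")
  val squaredErrors = pairs.map { case (label, pred) => math.pow(label - pred, 2) }
  math.sqrt(squaredErrors.sum / pairs.size)
}

val sampleRmse = rmseOf(samples)
println(s"RMSE over ${samples.size} sample rows == $sampleRmse")
```

Since RMSE is expressed in the same unit as the label, it can be read as a typical prediction error in fare units, with lower values being better.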

## Save the model to disk and load model
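We save the model to disk and then load it back into memory. We can use the loaded model to make new predictions. Unlike the previous cell, `transform` here is not followed by an action inside the timed block, so the elapsed time most likely covers only building the lazy query plan rather than computing the predictions.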

In [13]:
model.write.overwrite.save("/data/model/taxi")

val modelFromDisk = XGBoostRegressionModel.load("/data/model/taxi")
val (results2, _) = Benchmark.time("transform2") {
  modelFromDisk.transform(evalSet)
}
results2.select("vendor_id", "passenger_count", "trip_distance", labelName, "prediction").show(5)

Elapsed time [transform2]: 0.044s
+-------------+---------------+-------------+-----------+------------------+
|    vendor_id|passenger_count|trip_distance|fare_amount|        prediction|
+-------------+---------------+-------------+-----------+------------------+
| 1.55973043E9|            1.0|          2.3|        7.7| 7.165289402008057|
| 1.55973043E9|            1.0|          1.7|        8.5| 7.477686405181885|
|-1.67996288E9|            2.0|          5.0|       13.3|13.023439407348633|
| 1.55973043E9|            1.0|          3.0|       11.7|11.613130569458008|
| 1.55973043E9|            2.0|          1.2|        5.8| 6.099410533905029|
+-------------+---------------+-------------+-----------+------------------+
only showing top 5 rows



modelFromDisk = xgbr_2eee9ab3afd6
results2 = [vendor_id: float, passenger_count: float ... 15 more fields]


[vendor_id: float, passenger_count: float ... 15 more fields]

In [14]:
spark.close()