# Introduction to XGBoost Spark with GPU

The goal of this notebook is to show how to train an XGBoost model with the Spark RAPIDS XGBoost library on GPUs. The dataset used in this notebook is derived from Fannie Mae’s Single-Family Loan Performance Data, with all rights reserved by Fannie Mae. This processed dataset is redistributed with permission and consent from Fannie Mae. The notebook uses XGBoost to train a 12-month mortgage loan delinquency prediction model.

A few libraries are required for this notebook:
  1. NumPy
  2. cudf jar
  3. xgboost4j jar
  4. xgboost4j-spark jar
  5. rapids-4-spark.jar

This notebook also illustrates how easy it is to port sample CPU-based Spark xgboost4j code to the GPU. Only one change is required to run Spark XGBoost on the GPU: replacing the CPU API `setFeaturesCol(feature)` with the new API `setFeaturesCols(features)`. This also eliminates the need for vectorization (assembling multiple feature columns into one column), since multiple columns can be read directly. In the Python API used below, this corresponds to passing a list of column names to `features_col`.
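With the `xgboost.spark` Python API used in this notebook, the CPU/GPU difference shows up in the keyword arguments passed to `SparkXGBClassifier`. The sketch below collects them in a hypothetical helper, `classifier_kwargs`, so the two configurations can be compared side by side. It only builds plain dictionaries and assumes, as in the CPU snippets of this notebook, that the assembled CPU vector column is named `features`.

```python
# Hypothetical helper (not part of xgboost): returns the keyword arguments
# this notebook passes to SparkXGBClassifier for each device.
def classifier_kwargs(features, label, use_gpu):
    if use_gpu:
        # GPU: the raw numeric feature column names are passed as a list,
        # so no VectorAssembler step is needed.
        return {"features_col": list(features), "label_col": label, "use_gpu": True}
    # CPU: the features must first be assembled into a single vector column,
    # assumed here to be named 'features'.
    return {"features_col": "features", "label_col": label}


features = ["orig_channel", "first_home_buyer", "loan_purpose"]
print(classifier_kwargs(features, "delinquency_12", use_gpu=True))
print(classifier_kwargs(features, "delinquency_12", use_gpu=False))
```

The result can be unpacked directly, e.g. `SparkXGBClassifier(**classifier_kwargs(features, label, use_gpu=True))`.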

Note: For PySpark-based XGBoost, please refer to the [Spark-RAPIDS-examples 22.04 branch](https://github.com/NVIDIA/spark-rapids-examples/tree/branch-22.04), which uses [NVIDIA’s Spark XGBoost version](https://repo1.maven.org/maven2/com/nvidia/xgboost4j-spark_3.0/1.4.2-0.3.0/).

#### Import All Libraries

```Python
from time import time
import os

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import *

from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel
```

Besides these, the CPU version requires two extra libraries:
```Python
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
```

#### Create Spark Session and Data Reader

```Python
spark = SparkSession.builder.getOrCreate()
```

#### Specify the Data Schema and Load the Data

```Python
label = 'delinquency_12'
schema = StructType([
    StructField('orig_channel', FloatType()),
    StructField('first_home_buyer', FloatType()),
    StructField('loan_purpose', FloatType()),
    StructField('property_type', FloatType()),
    StructField('occupancy_status', FloatType()),
    StructField('property_state', FloatType()),
    StructField('product_type', FloatType()),
    StructField('relocation_mortgage_indicator', FloatType()),
    StructField('seller_name', FloatType()),
    StructField('mod_flag', FloatType()),
    StructField('orig_interest_rate', FloatType()),
    StructField('orig_upb', DoubleType()),
    StructField('orig_loan_term', IntegerType()),
    StructField('orig_ltv', FloatType()),
    StructField('orig_cltv', FloatType()),
    StructField('num_borrowers', FloatType()),
    StructField('dti', FloatType()),
    StructField('borrower_credit_score', FloatType()),
    StructField('num_units', IntegerType()),
    StructField('zip', IntegerType()),
    StructField('mortgage_insurance_percent', FloatType()),
    StructField('current_loan_delinquency_status', IntegerType()),
    StructField('current_actual_upb', FloatType()),
    StructField('interest_rate', FloatType()),
    StructField('loan_age', FloatType()),
    StructField('msa', FloatType()),
    StructField('non_interest_bearing_upb', FloatType()),
    StructField(label, IntegerType()),
])

# Every column except the label is used as a feature.
features = [x.name for x in schema if x.name != label]

# Update these to your real paths, or set the DATA_ROOT environment variable.
dataRoot = os.getenv("DATA_ROOT", "/data")
train_path = dataRoot + "/mortgage/output/train"
eval_path = dataRoot + "/mortgage/output/eval"

train_data = spark.read.parquet(train_path)
trans_data = spark.read.parquet(eval_path)
```
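The cell above derives its paths from `DATA_ROOT` by string concatenation. A small helper can keep the train, eval, and model locations in one place; `mortgage_paths` is a hypothetical name, and the model location mirrors the one used by the save cell later in this notebook. Plain string joining is deliberate here: `pathlib` would collapse the `//` in URIs such as `hdfs://...`.

```python
import os


# Hypothetical helper: builds the dataset and model locations from DATA_ROOT.
def mortgage_paths(environ=None, default_root="/data"):
    environ = os.environ if environ is None else environ
    # Strip a trailing slash so joins never produce '//' inside the path.
    root = environ.get("DATA_ROOT", default_root).rstrip("/")
    return {
        "train": f"{root}/mortgage/output/train",
        "eval": f"{root}/mortgage/output/eval",
        "model": f"{root}/mortgage/model",
    }


print(mortgage_paths({"DATA_ROOT": "hdfs://namenode/data/"}))
```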

Note that on the CPU version, vectorization is required before fitting data to the classifier, which means you need to assemble all feature columns into one column.

```Python
def vectorize(data_frame):
    to_floats = [ col(x.name).cast(FloatType()) for x in data_frame.schema ]
    return (VectorAssembler()
        .setInputCols(features)
        .setOutputCol('features')
        .transform(data_frame.select(to_floats))
        .select(col('features'), col(label)))

train_data = vectorize(train_data)
trans_data = vectorize(trans_data)
```
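Conceptually, `VectorAssembler` turns each row's separate numeric feature columns into a single vector column, keeping the label alongside it. The pure-Python sketch below mimics that per-row transformation on a couple of hypothetical rows (plain dictionaries standing in for a Spark DataFrame); `assemble` is an illustrative name, not a Spark API.

```python
# Illustration only: dicts stand in for DataFrame rows.
def assemble(rows, feature_names, label):
    out = []
    for row in rows:
        # Cast each feature to float and pack them, in order, into one vector.
        vector = [float(row[name]) for name in feature_names]
        out.append({"features": vector, label: row[label]})
    return out


rows = [
    {"orig_channel": 1, "loan_age": 12, "delinquency_12": 0},
    {"orig_channel": 2, "loan_age": 30, "delinquency_12": 1},
]
print(assemble(rows, ["orig_channel", "loan_age"], "delinquency_12"))
# [{'features': [1.0, 12.0], 'delinquency_12': 0}, {'features': [2.0, 30.0], 'delinquency_12': 1}]
```

On the GPU path this step disappears because the classifier reads the individual columns directly.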

#### Create a SparkXGBClassifier

```Python
classifier = SparkXGBClassifier(
    features_col=features,
    label_col=label,
    use_gpu=True,
)
```

The CPU version of the classifier accepts only a single (vector) column name for its features, so the multiple feature columns must be vectorized first.
```Python
classifier = SparkXGBClassifier(features_col='features', label_col=label)
```

For the GPU version, the parameter `num_workers` should be set to the number of GPUs in the Spark cluster, while for the CPU version it is usually equal to the number of CPU cores.

As for the tree method, the GPU version currently supports only `gpu_hist`, while `hist` is used for CPU training.
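The two sizing rules above can be captured in a small hypothetical helper, `xgb_resources`, that picks `num_workers` and `tree_method` for either device. The one-worker-per-GPU and one-worker-per-core rules simply restate the guidance in the text; treat them as a starting point rather than tuned values.

```python
# Hypothetical helper: chooses num_workers and tree_method per the rules above.
def xgb_resources(use_gpu, num_gpus=0, num_cpu_cores=0):
    if use_gpu:
        if num_gpus < 1:
            raise ValueError("GPU training needs at least one GPU")
        # One XGBoost worker per GPU in the Spark cluster.
        return {"num_workers": num_gpus, "tree_method": "gpu_hist"}
    if num_cpu_cores < 1:
        raise ValueError("CPU training needs at least one CPU core")
    # One XGBoost worker per CPU core on the CPU path.
    return {"num_workers": num_cpu_cores, "tree_method": "hist"}


print(xgb_resources(True, num_gpus=4))
print(xgb_resources(False, num_cpu_cores=16))
```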

#### Train the Data with Benchmark

```Python
def with_benchmark(phrase, action):
    start = time()
    result = action()
    end = time()
    print('{} takes {} seconds'.format(phrase, round(end - start, 2)))
    return result

model = with_benchmark('Training', lambda: classifier.fit(train_data))
```

If features_cols param set, then features_col param is ignored.
You enabled use_gpu in spark local mode. Please make sure your local node has at least 1 GPUs
22/09/28 10:22:18 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
booster params: {'objective': 'binary:logistic', 'tree_method': 'gpu_hist', 'repartition_random_shuffle': True, 'nthread': 1, 'gpu_id': 0}
train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[10:22:24] task 0 got new rank 0
Training takes 16.75 seconds
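`with_benchmark` measures with `time()`, which follows the system clock and can jump if that clock is adjusted. A minimal alternative sketch uses the monotonic `time.perf_counter()` and a context manager, so a multi-statement block can be timed without wrapping it in a lambda. The name `benchmark` and the optional `timings` dictionary are hypothetical additions, not part of the original notebook.

```python
import time
from contextlib import contextmanager


@contextmanager
def benchmark(phrase, timings=None):
    start = time.perf_counter()  # monotonic, high-resolution clock
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        # Optionally record the timing so several runs can be compared later.
        if timings is not None:
            timings[phrase] = elapsed
        print('{} takes {} seconds'.format(phrase, round(elapsed, 2)))


timings = {}
with benchmark('Sleeping', timings):
    time.sleep(0.1)
```

In the notebook this would read `with benchmark('Training'): model = classifier.fit(train_data)`.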




#### Save and Reload the Model

```Python
model.write().overwrite().save(dataRoot + '/mortgage/model')
loaded_model = SparkXGBClassifierModel.load(dataRoot + '/mortgage/model')
```

If features_cols param set, then features_col param is ignored.
You enabled use_gpu in spark local mode. Please make sure your local node has at least 1 GPUs


#### Transformation and Show Result Sample

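```Python
def transform():
    result = loaded_model.transform(trans_data).cache()
    # Spark is lazy: run an action that touches every partition so the
    # transformation is actually executed (and cached) inside the benchmark.
    result.foreachPartition(lambda _: None)
    return result

result = with_benchmark('Transformation', transform)
result.select(label, 'rawPrediction', 'probability', 'prediction').show(5)
```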

#### Evaluation

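```Python
# MulticlassClassificationEvaluator defaults to the 'f1' metric,
# so accuracy has to be requested explicitly.
accuracy = with_benchmark(
    'Evaluation',
    lambda: (MulticlassClassificationEvaluator()
             .setLabelCol(label)
             .setMetricName('accuracy')
             .evaluate(result)))
print('Accuracy is ' + str(accuracy))
```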
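`MulticlassClassificationEvaluator` reports F1 unless its metric name is set to `accuracy`. Accuracy itself is just the fraction of rows whose predicted class equals the label. The pure-Python sketch below computes it over hypothetical `(label, prediction)` pairs; note that the label is an integer while Spark's `prediction` column holds doubles, and `0 == 0.0` compares equal in Python.

```python
# Illustration only: accuracy over (label, prediction) pairs.
def accuracy(pairs):
    """Fraction of pairs where the prediction matches the label."""
    pairs = list(pairs)
    if not pairs:
        raise ValueError("no rows to evaluate")
    correct = sum(1 for label, prediction in pairs if label == prediction)
    return correct / len(pairs)


pairs = [(0, 0.0), (0, 0.0), (1, 0.0), (1, 1.0)]
print(accuracy(pairs))  # 0.75
```

On a heavily imbalanced target such as loan delinquency, accuracy can look high even for a model that rarely predicts the minority class, which is one reason F1 is the evaluator's default.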

```Python
spark.stop()
```