<a href="https://colab.research.google.com/github/nhanwei/medium/blob/master/XGBoost_with_Spark_on%C2%A0GPU.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# XGBoost with Spark on GPU



In [None]:
# checking that we have GPU in the system
!nvidia-smi

Sun Aug 30 13:28:53 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.66       Driver Version: 418.67       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   51C    P8    10W /  70W |      0MiB / 15079MiB |      0%      Default |
|                               |                      |                 ERR! |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

Note that we are using **CUDA 10.1** on a **Tesla T4**. The GPU has 15,079 MiB of memory, all of which is currently free.
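
If you want to check the free memory in code rather than by eye, the memory column can be pulled out of the `nvidia-smi` text. This is only a minimal sketch: `parse_gpu_memory` is a hypothetical helper (not part of any library), and it assumes the row layout shown in the output above.

```python
import re

# Status row copied from the nvidia-smi output above.
SAMPLE_ROW = "| N/A   51C    P8    10W /  70W |      0MiB / 15079MiB |      0%      Default |"


def parse_gpu_memory(row):
    """Return (used_mib, total_mib) from one nvidia-smi status row, or None."""
    match = re.search(r"(\d+)MiB\s*/\s*(\d+)MiB", row)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


used, total = parse_gpu_memory(SAMPLE_ROW)
print(f"used={used} MiB, free={total - used} MiB of {total} MiB")
```

Running the same helper on the row printed after training gives the memory used by the model.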

In [None]:
# install a JDK: Spark runs on the JVM, so it needs a Java runtime
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download and unpack Spark
!wget -q https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
# findspark locates the Spark installation and makes pyspark importable
!pip install -q findspark

In [None]:
# the cuDF jar must match the CUDA version reported by nvidia-smi above (10.1)
!wget https://repo1.maven.org/maven2/ai/rapids/cudf/0.14/cudf-0.14-cuda10-1.jar
# download the XGBoost jars
!wget https://repo1.maven.org/maven2/com/nvidia/xgboost4j_3.0/1.0.0-0.1.0/xgboost4j_3.0-1.0.0-0.1.0.jar
!wget https://repo1.maven.org/maven2/com/nvidia/xgboost4j-spark_3.0/1.0.0-0.1.0/xgboost4j-spark_3.0-1.0.0-0.1.0.jar
# RAPIDS Accelerator for Apache Spark: NVIDIA's plugin for running Spark workloads on the GPU
!wget https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.1.0/rapids-4-spark_2.12-0.1.0.jar

--2020-08-30 13:29:35--  https://repo1.maven.org/maven2/ai/rapids/cudf/0.14/cudf-0.14-cuda10-1.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.52.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.52.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 145993287 (139M) [application/java-archive]
Saving to: ‘cudf-0.14-cuda10-1.jar’


2020-08-30 13:29:36 (193 MB/s) - ‘cudf-0.14-cuda10-1.jar’ saved [145993287/145993287]

--2020-08-30 13:29:36--  https://repo1.maven.org/maven2/com/nvidia/xgboost4j_3.0/1.0.0-0.1.0/xgboost4j_3.0-1.0.0-0.1.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.52.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.52.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 231556205 (221M) [application/java-archive]
Saving to: ‘xgboost4j_3.0-1.0.0-0.1.0.jar’


2020-08-30 13:29:37 (213 MB/s) - ‘xgboost4j_3.0-1.0.0-0.1.0.jar’ saved [231556205/231556205]

--2020-08-30 13:29:37--  htt

In [None]:
# set these before calling findspark.init()
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"  # Java home
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"  # Spark home: the directory we unpacked above
# ship the jar files to the cluster.
# these are usually passed with --jars on the command line when submitting a Spark job,
# but since we are working with Spark interactively, we set them through an environment variable
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /content/cudf-0.14-cuda10-1.jar,/content/xgboost4j_3.0-1.0.0-0.1.0.jar,/content/xgboost4j-spark_3.0-1.0.0-0.1.0.jar,/content/rapids-4-spark_2.12-0.1.0.jar pyspark-shell'
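
The long `--jars` string is easy to get wrong by hand. As a rough sketch, it could be assembled from a list, with a check that each jar was actually downloaded. `build_submit_args` is a hypothetical helper name; the paths are the ones used in the cell above.

```python
import os


def build_submit_args(jar_paths):
    """Join jar paths into the '--jars ... pyspark-shell' string used above."""
    return "--jars " + ",".join(jar_paths) + " pyspark-shell"


jars = [
    "/content/cudf-0.14-cuda10-1.jar",
    "/content/xgboost4j_3.0-1.0.0-0.1.0.jar",
    "/content/xgboost4j-spark_3.0-1.0.0-0.1.0.jar",
    "/content/rapids-4-spark_2.12-0.1.0.jar",
]

# Report jars that are not on disk, e.g. because a download failed.
missing = [path for path in jars if not os.path.isfile(path)]
if missing:
    print("Missing jars:", missing)

submit_args = build_submit_args(jars)
print(submit_args)
```

Assigning `submit_args` to `os.environ['PYSPARK_SUBMIT_ARGS']` then has the same effect as the hard-coded string.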

In [None]:
import findspark
findspark.init()  # locates Spark via SPARK_HOME and makes pyspark importable
from pyspark.sql import SparkSession
# master: "local[*]" runs Spark locally on all cores; pass a cluster URL to run on a cluster
# spark.plugins: loads the RAPIDS SQL plugin so that Spark can run on the GPU
# spark.rapids.memory.gpu.pooling.enabled: whether RMM acts as a pooling allocator for GPU memory
#   or passes allocations straight through to CUDA
spark = SparkSession.builder.master("local[*]").\
        config("spark.plugins", "com.nvidia.spark.SQLPlugin").\
        config("spark.rapids.memory.gpu.pooling.enabled", False).\
        getOrCreate()

# these jars also contain the Python wrapper modules that we import later
spark.sparkContext.addPyFile("/content/xgboost4j-spark_3.0-1.0.0-0.1.0.jar")
spark.sparkContext.addPyFile("/content/rapids-4-spark_2.12-0.1.0.jar")

## XGBoost model with Spark
The <a href='https://github.com/NVIDIA/spark-xgboost-examples/blob/spark-3/api-docs/python.md#xgboostclassifier'>PySpark XGBoost API</a> is a wrapper around the <a href='https://github.com/NVIDIA/spark-xgboost-examples/blob/spark-3/api-docs/scala.md#xgboostclassifier'>Scala XGBoostClassifier</a>.

All standard parameters are supported. See the <a href='https://xgboost.readthedocs.io/en/latest/parameter.html'>XGBoost parameter reference</a> and the <a href='https://xgboost.readthedocs.io/en/latest/jvm/scaladocs/xgboost4j-spark/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.html'>Scala XGBoostClassifier docs</a> for details.


In [None]:
# these XGBoost classes are wrappers around the Scala XGBoost classifier
# most standard parameters are supported
from ml.dmlc.xgboost4j.scala.spark import XGBoostClassificationModel, XGBoostClassifier
import pandas as pd
import time

## Downloading data
We use the **Covertype** dataset (`https://archive.ics.uci.edu/ml/datasets/covertype`). The task is to predict forest cover type from cartographic variables only. Srivatsan88 has converted the CSV files to Parquet files and uploaded them to GitHub, so let's use those.

Parquet is a columnar data format, which makes reads more efficient and saves storage space. If you usually need only **certain columns** of a large dataset (e.g. time series), consider storing the data as Parquet files to reduce I/O.
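
To see why a columnar layout reduces I/O, here is a toy comparison in plain Python. It is only an illustration of the idea, not how Parquet is implemented: the column names come from the dataset, but the values are made up.

```python
# Three records with three fields each (made-up values for illustration).
rows = [
    {"Elevation": 2596, "Aspect": 51, "Slope": 3},
    {"Elevation": 2590, "Aspect": 56, "Slope": 2},
    {"Elevation": 2804, "Aspect": 139, "Slope": 9},
]

# Row-oriented layout: the values of one record sit next to each other.
row_store = [value for row in rows for value in row.values()]

# Column-oriented layout: the values of one field sit next to each other.
column_store = {name: [row[name] for row in rows] for name in rows[0]}

# Reading one column from the row layout walks over every stored value ...
n_fields = len(rows[0])
slope_index = list(rows[0]).index("Slope")
slope_from_rows = [value for i, value in enumerate(row_store)
                   if i % n_fields == slope_index]
values_scanned_row_layout = len(row_store)

# ... while the columnar layout only touches that column's values.
slope_from_columns = column_store["Slope"]
values_scanned_column_layout = len(slope_from_columns)

print(slope_from_rows, values_scanned_row_layout)
print(slope_from_columns, values_scanned_column_layout)
```

Both layouts return the same column, but the columnar one reads a third of the values here; the gap grows with the number of columns.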

In [None]:
!wget https://raw.githubusercontent.com/srivatsan88/YoutubeLI/master/dataset/covtype_train.parquet
!wget https://raw.githubusercontent.com/srivatsan88/YoutubeLI/master/dataset/covtype_test.parquet

--2020-08-30 13:29:53--  https://raw.githubusercontent.com/srivatsan88/YoutubeLI/master/dataset/covtype_train.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6827427 (6.5M) [application/octet-stream]
Saving to: ‘covtype_train.parquet’


2020-08-30 13:29:54 (25.7 MB/s) - ‘covtype_train.parquet’ saved [6827427/6827427]

--2020-08-30 13:29:54--  https://raw.githubusercontent.com/srivatsan88/YoutubeLI/master/dataset/covtype_test.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1591040 (1.5M) [application/octet-stream]
Saving to: 

In [None]:
reader = spark.read
train_data = reader.parquet("/content/covtype_train.parquet")
test_data = reader.parquet("/content/covtype_test.parquet")
train_data.show()

+---------+------+-----+--------------------------------+------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+----------------+----------------+----------------+----------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------+
|Elevation|Aspect|Slope|Horizontal_Distance_To_Hydrology|Vertical_Distance_To_Hydrology|Horizontal_Distance_To_Roadways|Hillshade_9am|Hillshade_Noon|Hillshade_3pm|Horizontal_Distance_To_Fire_Points|Wilderness_Area1|Wilderness_Area2|Wilderness_Area3|Wilder

In [None]:
# getting the feature names
target = 'target'
features = [feat for feat in train_data.schema.names if feat != target]

In [None]:
# set the model parameters; they are documented in the links above
params = {'eta': 0.1, 'gamma': 0.1, 'missing': 0.0,
          'treeMethod': 'gpu_hist', 'maxDepth': 3, 
          'growPolicy': 'depthwise', 'lambda_': 1.0,
          'subsample': 1.0, 'numRound': 1000,
          'numWorkers': 1, 'verbosity': 1}

# create XGBoost model object
xgboost = XGBoostClassifier(**params).setLabelCol(target).setFeaturesCols(features)
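
The parameter names in the dictionary above are camelCase (`treeMethod`, `maxDepth`), while the XGBoost parameter reference lists them in snake_case (`tree_method`, `max_depth`). The sketch below converts between the two styles. `to_wrapper_name` is a hypothetical helper, and the mapping is a convention observed in this notebook rather than a documented rule, so names such as `numRound` should still be checked against the API docs.

```python
import keyword


def to_wrapper_name(native_name):
    """Convert a snake_case parameter name to the camelCase style used above."""
    head, *rest = native_name.split("_")
    camel = head + "".join(part.capitalize() for part in rest)
    # 'lambda' is a reserved word in Python and cannot be a keyword argument,
    # which is why the params dict above spells it 'lambda_'.
    return camel + "_" if keyword.iskeyword(camel) else camel


native_params = {"tree_method": "gpu_hist", "max_depth": 3,
                 "grow_policy": "depthwise", "lambda": 1.0, "eta": 0.1}
wrapper_params = {to_wrapper_name(name): value
                  for name, value in native_params.items()}
print(wrapper_params)
```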

In [None]:
start_time = time.time()

model = xgboost.fit(train_data)

print("GPU Training Time: %s seconds" % (str(time.time() - start_time)))

GPU Training Time: 11.793736219406128 seconds
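
If you time several steps (say, training and scoring), a small context manager keeps the bookkeeping out of the way. This is a sketch with a hypothetical helper named `timed`; `time.sleep` stands in for the call to `xgboost.fit(train_data)` so the example runs without Spark.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results=None):
    """Print the wall-clock time spent inside the with-block; optionally record it."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if results is not None:
            results[label] = elapsed
        print(f"{label}: {elapsed:.2f} seconds")


timings = {}
with timed("sleep demo", timings):
    time.sleep(0.05)  # stand-in for: model = xgboost.fit(train_data)
```

`time.perf_counter` is used instead of `time.time` because it is a monotonic clock meant for measuring intervals.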


In [None]:
!nvidia-smi

Sun Aug 30 13:30:29 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.66       Driver Version: 418.67       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   50C    P0    37W /  70W |   1319MiB / 15079MiB |      0%      Default |
|                               |                      |                 ERR! |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

Training took about 12 seconds and used 1,319 MiB of GPU memory.

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
                                  MulticlassClassificationEvaluator

accuracy = MulticlassClassificationEvaluator(labelCol=target, 
                                             predictionCol='prediction',
                                             metricName='accuracy')

precision = MulticlassClassificationEvaluator(labelCol=target, 
                                              predictionCol='prediction',
                                              metricName='weightedPrecision')

recall = MulticlassClassificationEvaluator(labelCol=target, 
                                           predictionCol='prediction',
                                           metricName='weightedRecall')

f1 = MulticlassClassificationEvaluator(labelCol=target, 
                                       predictionCol='prediction',
                                       metricName='f1')

areaROC = BinaryClassificationEvaluator(labelCol=target,
                                        rawPredictionCol='prediction',
                                        metricName='areaUnderROC')

areaPR = BinaryClassificationEvaluator(labelCol=target,
                                       rawPredictionCol='prediction',
                                       metricName='areaUnderPR')
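
To make the weighted metrics concrete, here is a plain-Python sketch that computes accuracy, weighted precision and weighted recall from two lists. It assumes the usual definition (per-class scores averaged with each class's share of the true labels as the weight) and is not Spark's implementation; the labels and predictions are made up.

```python
from collections import Counter


def weighted_metrics(labels, predictions):
    """Return (accuracy, weighted precision, weighted recall) for two equal-length lists."""
    n = len(labels)
    support = Counter(labels)          # number of true examples per class
    predicted = Counter(predictions)   # number of predictions per class
    correct = Counter(l for l, p in zip(labels, predictions) if l == p)

    accuracy = sum(correct.values()) / n
    precision = 0.0
    recall = 0.0
    for cls, count in support.items():
        weight = count / n
        # Per-class precision is taken as 0 when the class was never predicted.
        cls_precision = correct[cls] / predicted[cls] if predicted[cls] else 0.0
        cls_recall = correct[cls] / count
        precision += weight * cls_precision
        recall += weight * cls_recall
    return accuracy, precision, recall


# Made-up labels and predictions for three classes.
y_true = [0, 0, 0, 1, 1, 2]
y_pred = [0, 0, 1, 1, 1, 2]
acc, prec, rec = weighted_metrics(y_true, y_pred)
print(acc, prec, rec)
```

With these weights, weighted recall always equals accuracy, so the two numbers carry the same information.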

In [None]:
# score the held-out test set; the evaluators read its 'prediction' column
result = model.transform(test_data)

# the evaluation metrics
metrics = [accuracy, precision, recall, f1, areaROC, areaPR]
metric_labels = ['accuracy', 'precision', 'recall', 'f1', 'areaROC', 'areaPR']

# evaluate each metric and collect the values in a pandas Series
metric_vals = pd.Series({label: metric.evaluate(result)
                         for label, metric in zip(metric_labels, metrics)})

metric_vals