# "[Spark] Spark GPU Cluster에서 XGBoost 분산 GPU 학습하기"
> Spark GPU Cluster에서 XGBoost GPU Regression 학습 테스트

- toc: true 
- badges: true
- comments: true
- categories: [Spark]
- tags: [spark, gpu, xgboost]

# Dependency
- CentOS 7
- Nvidia driver 510.73.05
- Spark 3.2.1
- Python 3.7
- jars
    - cudf-22.04.0-cuda11.jar
    - rapids-4-spark_2.12-22.04.0.jar
    - xgboost4j-spark_3.0-1.4.2-0.3.0.jar
    - xgboost4j_3.0-1.4.2-0.3.0.jar

# XGBoost4J-Spark-GPU 설정

[spark-rapids github](https://github.com/NVIDIA/spark-rapids)을 참고하여 설정 하였다. 설정이 매우 간단한 거 같지만 dependency 문제 때문에 생각보다 시간이 많이 소요되었다.

- 아래 jars들을 [Maven Repository](https://mvnrepository.com)에서 검색하여 `$SPARK_HOME/jars` 경로에 넣어 준다. 버전이 중요하다. 버전이 안 맞으면 온갖 exception에 시달린다.
    - cudf-22.04.0-cuda11.jar
    - rapids-4-spark_2.12-22.04.0.jar
    - xgboost4j-spark_3.0-1.4.2-0.3.0.jar
    - xgboost4j_3.0-1.4.2-0.3.0.jar

#### 아래는 GPU Resource가 있는 Spark Worker에서 진행
- [Spark 공식 github](https://github.com/apache/spark/blob/master/examples/src/main/scripts/getGpusResources.sh) 에 접속하여 `getGpusResources.sh` 스크립트를 다운 받는다.
- `getGpusResources.sh` 스크립트는 스파크가 GPU 자원이 있는지 인식할 수 있게 해준다.
- `getGpusResources.sh`를 `$SPARK_HOME/drivers` 경로에 넣어 주고(`$SPARK_HOME/drivers`가 아니여도 상관없고, 디렉토리가 없으면 만들어서 넣어줌), 실행 권한을 준다.

```sh
$ chmod +x $SPARK_HOME/drivers/getGpusResources.sh
```

- GPU가 있는 Spark Worker의 `$SPARK_HOME/conf/spark-env.sh` 를 열어 Spark Worker가 `getGpusResources.sh` 스크립트 위치를 알 수 있도록 아래 내용을 넣어준다.

```sh
$ vi $SPARK_HOME/conf/spark-env.sh
```

```sh
# $SPARK_HOME/conf/spark-env.sh
export SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=1 -Dspark.worker.resource.gpu.discoveryScript=/home/a/spark/drivers/getGpusResources.sh"
```

- `$SPARK_HOME/conf/spark-defaults.conf` 를 열어 아래 내용을 추가한다.

```sh
spark.kryo.registrator  com.nvidia.spark.rapids.GpuKryoRegistrator
```

# XGBoost 분산 GPU 학습 예제

In [23]:
spark.stop()

## 관련 라이브러리 import

In [24]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf().setAppName("Spark-XGBoost-GPU")
conf.set("spark.sql.repl.eagerEval.enabled", True)
# conf.set("spark.jars", "s3://gourav-bucket/gourav/gpu/cudf-0.9.2.jar,s3://gourav-bucket/gourav/gpu/rapids-4-spark_2.12-0.1.0.jar,s3://gourav-bucket/gourav/gpu/cudf-0.14-cuda10-1.jar")
# conf.set("spark.executor.instances", "20")
# conf.set("spark.executor.cores", "4")
conf.set("spark.task.cpus", "2")
# conf.set("spark.executor.memory", "4g")
# conf.set("spark.executor.memoryOverhead", "2G")
conf.set("spark.executor.extraJavaOptions", "-Dai.rapids.cudf.prefer-pinned=true")
conf.set("spark.executor.resource.gpu.amount", "1")
conf.set("spark.task.resource.gpu.amount", "0.25")
conf.set("spark.locality.wait", "0s")
conf.set("spark.executor.resource.gpu.discoveryScript", "/home/a/spark/drivers/getGpusResources.sh")
conf.set("spark.plugins", "com.nvidia.spark.SQLPlugin")
conf.set('spark.rapids.sql.enabled','true')
conf.set('spark.rapids.sql.explain', 'ALL') # ALL/NONE/NOT_ON_GPU
conf.set("spark.rapids.sql.concurrentGpuTasks", "3")
conf.set("spark.rapids.memory.pinnedPool.size", "1G")
# conf.set("spark.rapids.sql.hasNans", "false")
# conf.set('spark.rapids.sql.batchSizeBytes', '512M')
# conf.set('spark.rapids.sql.reader.batchSizeBytes', '768M')
# conf.set('spark.rapids.sql.variableFloatAgg.enabled', 'true')
# conf.set('spark.rapids.sql.incompatibleDateFormats.enabled', 'true')
# conf.set('spark.rapids.sql.udfCompiler.enabled', 'true')
# conf.set('spark.rapids.sql.csv.read.double.enabled', 'true')
# conf.set("spark.sql.adaptive.enabled", "false")
conf.set("spark.sql.files.maxPartitionBytes", "512m")

spark = SparkSession.builder.config(conf=conf)\
    .master("spark://lab100:7077").getOrCreate()

spark.sparkContext.addPyFile("/home/a/spark/jars/rapids-4-spark_2.12-22.04.0.jar")
spark.sparkContext.addPyFile("/home/a/spark/jars/xgboost4j-spark_3.0-1.4.2-0.3.0.jar")

In [25]:
import os
MINIO_ACCESS_KEY = os.environ['MINIO_ACCESS_KEY']
MINIO_SECRET_KEY = os.environ['MINIO_SECRET_KEY']

spark.sparkContext._jsc.hadoopConfiguration()\
    .set("fs.s3a.access.key", MINIO_ACCESS_KEY)
spark.sparkContext._jsc.hadoopConfiguration()\
    .set("fs.s3a.secret.key", MINIO_SECRET_KEY)
spark.sparkContext._jsc.hadoopConfiguration()\
    .set("fs.s3a.endpoint", "http://lab101:10170")
spark.sparkContext._jsc.hadoopConfiguration()\
    .set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc.hadoopConfiguration()\
    .set("fs.s3a.connection.ssl.enabled", "false")
spark.sparkContext._jsc.hadoopConfiguration()\
    .set("fs.s3a.path.style.access", "true")
spark.sparkContext._jsc.hadoopConfiguration()\
    .set("com.amazonaws.services.s3.enableV2", "true")
spark.sparkContext._jsc.hadoopConfiguration()\
    .set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

In [26]:
from ml.dmlc.xgboost4j.scala.spark import XGBoostRegressionModel, XGBoostRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, StructField, StructType
from time import time

## 데이터 로드 및 전처리

In [27]:
reader = spark.read

In [28]:
label = 'fare_amount'
schema = StructType([
    StructField('vendor_id', FloatType()),
    StructField('passenger_count', FloatType()),
    StructField('trip_distance', FloatType()),
    StructField('pickup_longitude', FloatType()),
    StructField('pickup_latitude', FloatType()),
    StructField('rate_code', FloatType()),
    StructField('store_and_fwd', FloatType()),
    StructField('dropoff_longitude', FloatType()),
    StructField('dropoff_latitude', FloatType()),
    StructField(label, FloatType()),
    StructField('hour', FloatType()),
    StructField('year', IntegerType()),
    StructField('month', IntegerType()),
    StructField('day', FloatType()),
    StructField('day_of_week', FloatType()),
    StructField('is_weekend', FloatType()),
])
features = [ x.name for x in schema if x.name != label ]

train_data = reader.schema(schema).option('header', True).csv('s3a://data/taxi-small/train')
trans_data  = reader.schema(schema).option('header', True).csv('s3a://data/taxi-small/trainWithEval/')

## XGBoostRegressor 생성
- 처음에 공식문서가 아닌 구글링을 기반으로 테스트하다가 이 부분 때문에 고생했다. setFeaturesCols은 GPU 버전에서만 사용할 수 있고, CPU 버전에서는 setFeaturesCols이 없고 대신 setFeaturesCol을 써야 한다. CPU 버전에서는 하나의 벡터를 만들어서 넣어줘야 한다.
- CPU 버전에서는 아래와 같이 전처리 한다.

```python
# CPU Version
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

def vectorize(data_frame):
    to_floats = [ col(x.name).cast(FloatType()) for x in data_frame.schema ]
    return (VectorAssembler()
        .setInputCols(features)
        .setOutputCol('features')
        .transform(data_frame.select(to_floats))
        .select(col('features'), col(label)))

train_data = vectorize(train_data)
trans_data = vectorize(trans_data)
```

- parmas의 treeMethod에서 GPU Version에서는 `gpu_hist`만 지원한다. `hist`는 CPU Version에서만 사용된다.

In [29]:
params = { 
    'eta': 0.05,
    'treeMethod': 'gpu_hist',
    'maxDepth': 8,
    'subsample': 0.8,
    'gamma': 1.0,
    'numRound': 100,
    'numWorkers': 1,
}

# GPU Version
regressor = XGBoostRegressor(**params).setLabelCol(label).setFeaturesCols(features)

# CPU Version
# regressor = XGBoostRegressor(**params).setLabelCol(label).setFeaturesCol('features')

## Train the Data with Benchmark

In [30]:
def with_benchmark(phrase, action):
    start = time()
    result = action()
    end = time()
    print('{} takes {} seconds'.format(phrase, round(end - start, 2)))
    return result
model = with_benchmark('Training', lambda: regressor.fit(train_data))

Training takes 10.44 seconds


## 모델 저장 및 재로드

In [31]:
model.write().overwrite().save('s3a://data/new-model-path')
loaded_model = XGBoostRegressionModel().load('s3a://data/new-model-path')

## 결과
- CPU Version에서는 `select` function을 쓸 수 없다.

In [32]:
def transform():
    result = loaded_model.transform(trans_data).cache()
    result.foreachPartition(lambda _: None)
    return result
result = with_benchmark('Transformation', transform)
result.select('vendor_id', 'passenger_count', 'trip_distance', label, 'prediction').show(5)

Transformation takes 2.22 seconds
+------------+---------------+-------------+-----------+-----------------+
|   vendor_id|passenger_count|trip_distance|fare_amount|       prediction|
+------------+---------------+-------------+-----------+-----------------+
|1.55973043E9|            1.0|          1.5|        6.0|8.171152114868164|
|1.55973043E9|            1.0|          2.1|        9.5|9.933568954467773|
|1.55973043E9|            1.0|          1.7|        9.5|8.162859916687012|
|1.55973043E9|            1.0|          0.8|        6.0|5.170645713806152|
|1.55973043E9|            1.0|          0.8|        5.5|5.057125091552734|
+------------+---------------+-------------+-----------+-----------------+
only showing top 5 rows



In [33]:
accuracy = with_benchmark(
    'Evaluation',
    lambda: RegressionEvaluator().setLabelCol(label).evaluate(result))
print('RMSE is ' + str(accuracy))

Evaluation takes 0.27 seconds
RMSE is 1.8364262858328744
