# Using XGBoost on PySpark
XGBoost is a widely used model in industry. Unlike algorithms such as random forest, which are built into the PySpark library, XGBoost has to be set up by hand, but this is not difficult.

## 1. prepare the jars & wrappers

We need to download two jars and one Python wrapper. The links are listed below:
- xgboost4j: https://mvnrepository.com/artifact/ml.dmlc/xgboost4j/0.72?fireglass_rsn=true
- xgboost4j-spark: https://mvnrepository.com/artifact/ml.dmlc/xgboost4j-spark/0.72?fireglass_rsn=true
- XGBoost python wrapper: https://link.zhihu.com/?target=https%3A//github.com/dmlc/xgboost/files/2161553/sparkxgb.zip%3Ffireglass_rsn%3Dtrue

I will use the bank marketing dataset as an example:
dataset: https://link.zhihu.com/?target=https%3A//www.kaggle.com/janiobachmann/bank-marketing-dataset
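
Before starting Spark it can help to confirm that the downloaded jars sit where the code expects them. The following is a minimal sketch, assuming the jars are saved under the file names used in the import cell of this walkthrough; `build_submit_args` is a hypothetical helper, not part of PySpark or XGBoost.

```python
import os

def build_submit_args(jars, base_dir=""):
    """Return a PYSPARK_SUBMIT_ARGS value for the given jar file names.

    Raises FileNotFoundError if any of the jars is missing.
    """
    paths = [os.path.join(base_dir, j) for j in jars]
    missing = [p for p in paths if not os.path.isfile(p)]
    if missing:
        raise FileNotFoundError("missing jar(s): " + ", ".join(missing))
    # --jars takes a comma-separated list; "pyspark-shell" comes last
    return "--jars " + ",".join(paths) + " pyspark-shell"
```

With the jars in the working directory, `os.environ['PYSPARK_SUBMIT_ARGS'] = build_submit_args(["xgboost4j-spark-0.72.jar", "xgboost4j-0.72.jar"])` would produce the same value as the hard-coded string, and fail early if a download is missing.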

Import the packages:

In [None]:
import os
import re

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn import metrics

# The jars have to be registered before the Spark JVM starts,
# so set this before the SparkSession is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar pyspark-shell'
import findspark
findspark.init()

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml import Pipeline
# OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.x renamed it to OneHotEncoder.
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession\
        .builder\
        .appName("PySpark XGBOOST")\
        .master("local[*]")\
        .getOrCreate()

# Make the Python wrapper available, then import the estimator from it.
spark.sparkContext.addPyFile("sparkxgb.zip")
from sparkxgb import XGBoostEstimator

Load the data into Spark and rename the columns so that no name contains a '.', because Spark interprets a '.' in a column name as nested-field access unless the name is quoted with backticks.


In [None]:
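# inferSchema is needed so that numeric columns are read as int;
# without it every CSV column is loaded as a string
df_all = spark\
  .read\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .csv("bank.csv")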

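# replace runs of dots/whitespace with '_' and drop braces/parentheses
tran_tab = str.maketrans({x:None for x in list('{()}')})
df_all = df_all.toDF(*(re.sub(r'[\.\s]+', '_', c).translate(tran_tab) for c in df_all.columns))
 
# fill missing values with 0 (this only affects numeric columns)
df_all = df_all.na.fill(0)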
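
To see what the renaming step does without starting Spark, the same regular expression and translation table can be applied to a few sample names. This is an illustrative sketch: the sample column names are made up, and `clean_column_name` is a hypothetical helper that mirrors the expression used above.

```python
import re

# characters to delete outright: braces and parentheses
TRAN_TAB = str.maketrans({ch: None for ch in "{()}"})

def clean_column_name(name):
    """Replace runs of dots/whitespace with '_' and drop braces/parentheses."""
    return re.sub(r"[\.\s]+", "_", name).translate(TRAN_TAB)

# made-up column names, for illustration only
samples = ["emp.var.rate", "cons price idx", "balance(eur)", "job"]
cleaned = [clean_column_name(s) for s in samples]
print(cleaned)  # ['emp_var_rate', 'cons_price_idx', 'balanceeur', 'job']
```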

When training a model we first need to set up a pipeline. In the pipeline, the stages define the sequence of operations.
Categorical features have to be transformed before they can be used. Here we use StringIndexer and OneHotEncoderEstimator for that, while numerical features need no extra stage and go straight into the feature vector.
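
As a rough illustration of what the two transformations do to a categorical column, here is a plain-Python sketch. It only mimics the idea (the most frequent category gets index 0, then each index becomes a 0/1 vector). It is not Spark's implementation, the function names are hypothetical, and Spark's encoder additionally drops the last category by default and stores the result as a sparse vector.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent category first."""
    counts = Counter(values)
    # descending frequency, then alphabetical so ties are deterministic
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: i for i, v in enumerate(ordered)}

def one_hot(index, size):
    """Return a dense 0/1 list with a single 1 at position `index`."""
    return [1 if i == index else 0 for i in range(size)]

# made-up category values, for illustration only
jobs = ["admin", "technician", "admin", "services", "admin", "technician"]
mapping = fit_string_index(jobs)
encoded = [one_hot(mapping[j], len(mapping)) for j in jobs]

print(mapping)     # {'admin': 0, 'technician': 1, 'services': 2}
print(encoded[3])  # [0, 0, 1] for 'services'
```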

In [None]:
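# drop the columns we do not use
unused_col = ['day','month']
# use `c` rather than `col` as the loop variable to avoid confusion with pyspark's `col`
df_all = df_all.select([c for c in df_all.columns if c not in unused_col])
numeric_features = [t[0] for t in df_all.dtypes if t[1] == 'int']
cols = df_all.columns
 
 
# every non-int column except the label is treated as categorical
string_col = [t[0] for t in df_all.dtypes if t[1] != 'int']
string_col = [x for x in string_col if x!='deposit']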
 
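# build one StringIndexer and one one-hot encoder per categorical column;
# they are stored as module-level names: <column>Indexer and <column>classVecIndexer
for S in string_col:
    globals()[str(S)+'Indexer'] = StringIndexer()\
                                  .setInputCol(S)\
                                  .setOutputCol(str(S)+'Index')\
                                  .setHandleInvalid("keep")
    globals()[str(S)+'classVecIndexer'] = OneHotEncoderEstimator(
        inputCols=[globals()[str(S)+'Indexer'].getOutputCol()],
        outputCols=[str(S)+ "classVec"])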
 
 
# assemble all inputs into a single 'features' column
feature_col = [s+'Index' for s in string_col]
feature_col.extend([str(s)+ "classVec"  for s in string_col])
feature_col.extend(numeric_features)
 
vectorAssembler = VectorAssembler()\
  .setInputCols(feature_col)\
  .setOutputCol("features")
  
# index the label column
label_stringIdx = StringIndexer(inputCol = 'deposit', outputCol = 'label')
 
 
# define xgboost
xgboost = XGBoostEstimator(
    featuresCol="features", 
    labelCol="label", 
    predictionCol="prediction"
)


## 3. define pipeline and all the stages

In [None]:
feat_stage = [globals()[str(S)+'Indexer'] for S in string_col]
feat_stage.extend([globals()[str(s)+ "classVecIndexer"]  for s in string_col])
feat_stage.extend([vectorAssembler,label_stringIdx,xgboost])
xgb_pipeline = Pipeline().setStages(feat_stage)
 
# split train & test
trainDF, testDF = df_all.randomSplit([0.8, 0.2], seed=24)
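
The idea that stages run in the order they are listed can be sketched in plain Python. The classes below are hypothetical stand-ins, not PySpark's `Pipeline`; they only show that fitting walks through the stages in order and that each stage sees the output of the previous one.

```python
class AddOne:
    """Stateless stage: nothing to learn during fit."""
    def fit(self, data):
        return self

    def transform(self, data):
        return [x + 1 for x in data]


class Center:
    """Stateful stage: learns the mean in fit, subtracts it in transform."""
    def fit(self, data):
        self.mean = sum(data) / len(data)
        return self

    def transform(self, data):
        return [x - self.mean for x in data]


class TinyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        # fit each stage on the output of the stages before it
        for stage in self.stages:
            data = stage.fit(data).transform(data)
        return self

    def transform(self, data):
        # apply the already-fitted stages in the same order
        for stage in self.stages:
            data = stage.transform(data)
        return data


train, test = [1, 2, 3], [10]
model = TinyPipeline([AddOne(), Center()]).fit(train)
print(model.transform(test))  # [8.0]: (10 + 1) minus the training mean 3.0
```

The mean is learned from the training data only and then reused on the test data, which is also why the pipeline here is fitted on `trainDF` and only applied to `testDF`.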


## 4. model training & test

In [None]:
# train model
model = xgb_pipeline.fit(trainDF)
# predict 
pre   = model.transform(testDF)\
        .select(col("label"),col('probabilities'),col("prediction"))

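# to pandas df 
cx = pre.toPandas()
# for a DenseVector, .values is the underlying numpy array
cx["probabilities"] = cx["probabilities"].apply(lambda x: x.values)
# one column per class
cx[['prob_0','prob_1']] = pd.DataFrame(cx.probabilities.tolist(), index=cx.index)
# keep the label and the class-1 probability, highest probability first
cx = cx[["label",'prob_1']].sort_values(by=['prob_1'], ascending=False)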

## 5. evaluate results

In [None]:
# evaluate: AUC on the test set (computed once and reused for the plot)
y_pred_proba = cx.prob_1
auc = metrics.roc_auc_score(cx.label, y_pred_proba)
print(auc)
 
# plot ROC curve
fpr, tpr, _ = metrics.roc_curve(cx.label, y_pred_proba)
plt.plot(fpr, tpr, label="data 1, auc="+str(auc))
plt.legend(loc=4)
plt.show()
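
For intuition about the number that `roc_auc_score` reports: AUC equals the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, counting ties as one half. The sketch below computes that directly from pairs. It is a teaching aid with made-up labels and scores, `pairwise_auc` is a hypothetical helper, and it is far slower than scikit-learn on real data.

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # a tie counts as half
    return wins / (len(pos) * len(neg))

# made-up data, for illustration only
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(pairwise_auc(labels, scores))  # 0.75: three of the four pairs are ordered correctly
```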