# Using XGBoost with PySpark

This notebook shows an example of using XGBoost with PySpark.

The XGBoost version used here does not provide a PySpark API; it only provides Scala and other APIs. Hence we will be using a custom Python wrapper for XGBoost built by [sllynn here](https://github.com/sllynn/spark-xgboost/tree/master/sparkxgb).

We will be using Spark 2.4.5 with XGBoost 0.90, as it is one of the working version pairs.

The notebook uses dummy sales data.

***

<b>Spark 2.4.5</b> (with Python 3.7) has been used for this notebook.<br>
Refer to the [Spark documentation](https://spark.apache.org/docs/2.4.5/api/sql/index.html) for help with <b>data ops functions</b>.<br>
Refer to [this article](https://medium.com/analytics-vidhya/installing-and-using-pyspark-on-windows-machine-59c2d64af76e) to <b>install and use PySpark on a Windows machine</b> and [this article](https://patilvijay23.medium.com/installing-and-using-pyspark-on-linux-machine-e9f8dddc0c9a) to <b>install and use PySpark on a Linux machine</b>.

## Getting the required files
XGBoost 0.90 jar files:
- `wget https://repo1.maven.org/maven2/ml/dmlc/xgboost4j/0.90/xgboost4j-0.90.jar`
- `wget https://repo1.maven.org/maven2/ml/dmlc/xgboost4j-spark/0.90/xgboost4j-spark-0.90.jar`

OR

- `curl -o xgboost4j-0.90.jar https://repo1.maven.org/maven2/ml/dmlc/xgboost4j/0.90/xgboost4j-0.90.jar`
- `curl -o xgboost4j-spark-0.90.jar https://repo1.maven.org/maven2/ml/dmlc/xgboost4j-spark/0.90/xgboost4j-spark-0.90.jar`

Wrapper code:
- `git clone https://github.com/sllynn/spark-xgboost.git`

Setup:
- Place the `sparkxgb` folder from the git repo into your working directory.
- Copy the two jar files into the `sparkxgb` directory to have everything in one place.

### Building a Spark session
To create a SparkSession, use the following builder pattern:
 
`spark = SparkSession\
    .builder\
    .master("local")\
    .appName("Word Count")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()`

We will use `.config("spark.jars", "/path/jar1.jar,/path/jar2.jar")` to add the required XGBoost jars to the session.
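The `spark.jars` value is a single comma-separated string of jar paths. Below is a minimal sketch of a helper that builds this string and fails early if a jar file is missing. `build_spark_jars` is a hypothetical helper written for this illustration; it is not part of Spark or of the `sparkxgb` wrapper.

```python
import os


def build_spark_jars(jar_dir, jar_names):
    """Return a comma-separated string of absolute jar paths for `spark.jars`.

    Raises FileNotFoundError if any of the jars does not exist, so that a
    wrong path is caught before the Spark session is created.
    """
    paths = []
    for name in jar_names:
        path = os.path.abspath(os.path.join(jar_dir, name))
        if not os.path.isfile(path):
            raise FileNotFoundError("jar not found: " + path)
        paths.append(path)
    return ",".join(paths)


# Assumed usage with the folder layout described in the setup steps:
# jars = build_spark_jars("sparkxgb", ["xgboost4j-0.90.jar", "xgboost4j-spark-0.90.jar"])
# spark = SparkSession.builder.config("spark.jars", jars).getOrCreate()
```

Checking the paths up front is a design choice: a missing jar otherwise tends to show up only later, when the wrapper first tries to reach the JVM classes.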

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import FloatType

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import pandas as pd

In [2]:
# stop any existing Spark session so that the new config takes effect (skip this cell if `spark` is not defined yet)
spark.stop()

In [3]:
spark = SparkSession\
    .builder\
    .appName("xgboost")\
    .config("spark.executor.memory", "1536m")\
    .config("spark.driver.memory", "2g")\
    .config("spark.jars", "sparkxgb/xgboost4j-0.90.jar,sparkxgb/xgboost4j-spark-0.90.jar")\
    .getOrCreate()

In [4]:
spark

<b>Import the wrapper</b>

In [5]:
from sparkxgb import XGBoostClassifier, XGBoostRegressor

## Data prep

We will use the model training dataset created in the Rolling Window features notebook (3_rolling_window_features).

The dataset has two y-variables, one each for the classification and regression tasks. We will use them to build classification and regression models using XGBoost.

### Read input dataset

In [6]:
df_features = spark.read.parquet('./data/rw_features/')

### Dataset for modeling

<b>Sample one week_end per month</b>

In [7]:
df_wk_sample = df_features.select('week_end').withColumn('month', F.substring(F.col('week_end'), 1,7))
df_wk_sample = df_wk_sample.groupBy('month').agg(F.max('week_end').alias('week_end'))

df_wk_sample = df_wk_sample.repartition(1).persist()
df_wk_sample.count()

24

In [8]:
df_wk_sample.sort('week_end').show(5)

+-------+----------+
|  month|  week_end|
+-------+----------+
|2019-01|2019-01-26|
|2019-02|2019-02-23|
|2019-03|2019-03-30|
|2019-04|2019-04-27|
|2019-05|2019-05-25|
+-------+----------+
only showing top 5 rows
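The sampling logic above (group by month, keep the latest `week_end`) can be sketched in plain Python. This is only an illustration of the logic, not a replacement for the Spark code; `last_week_end_per_month` is a hypothetical name, and the two earlier dates in the input are assumed values added for the example.

```python
from datetime import date


def last_week_end_per_month(week_ends):
    """Map each 'YYYY-MM' month to the latest week_end date in that month."""
    latest = {}
    for d in week_ends:
        month = d.isoformat()[:7]  # first 7 characters, like F.substring(week_end, 1, 7)
        if month not in latest or d > latest[month]:
            latest[month] = d
    return latest


# 2019-01-19 and 2019-02-16 are assumed inputs for illustration
sample = last_week_end_per_month([
    date(2019, 1, 19), date(2019, 1, 26),
    date(2019, 2, 16), date(2019, 2, 23),
])
print(sample)
```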



In [9]:
# join back to filter df_features down to the sampled week_end dates
df_model = df_features.join(F.broadcast(df_wk_sample.select('week_end')), on=['week_end'], how='inner')

<b>Eligibility filter</b>: The customer should have been active in the year preceding the reference date

In [10]:
# use sales_52w for the eligibility filter
df_model = df_model.where(F.col('sales_52w')>0)

<b>Removing the latest 4 week_end dates</b>: As we have a look-forward period of 4 weeks, the latest 4 week_end dates in the data cannot be used for our model, as they do not have 4 weeks of data ahead of them for the y-variable.

In [11]:
# see latest week_end dates (in the dataframe prior to monthly sampling)
df_features.select('week_end').drop_duplicates().sort(F.col('week_end').desc()).show(5)

+----------+
|  week_end|
+----------+
|2020-12-05|
|2020-11-28|
|2020-11-21|
|2020-11-14|
|2020-11-07|
+----------+
only showing top 5 rows



In [12]:
# keep only week_end dates that have a full 4-week look-forward period
df_model = df_model.where(F.col('week_end')<'2020-11-14')
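The cutoff date in the filter above is hardcoded. As a rough sketch, it could also be derived from the distinct `week_end` values, assuming the data is weekly with no missing weeks. `lookforward_cutoff` is a hypothetical helper, and the input list reuses the five dates printed earlier.

```python
from datetime import date


def lookforward_cutoff(week_ends, n_weeks=4):
    """Return the earliest of the latest `n_weeks` distinct week_end dates.

    Rows with week_end strictly before this date have a full look-forward
    window, assuming one week_end per week with no gaps.
    """
    distinct = sorted(set(week_ends), reverse=True)
    if len(distinct) < n_weeks:
        raise ValueError("not enough distinct week_end dates")
    return distinct[n_weeks - 1]


latest = [date(2020, 12, 5), date(2020, 11, 28), date(2020, 11, 21),
          date(2020, 11, 14), date(2020, 11, 7)]
cutoff = lookforward_cutoff(latest)
print(cutoff)  # 2020-11-14
```

In the notebook, the distinct dates would first have to be collected from `df_features` before being passed to such a helper.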

In [13]:
# replace nulls in numeric columns with 0
df_model = df_model.fillna(0)

### Train-Test Split

80-20 split

In [14]:
train, test = df_model.randomSplit([0.8, 0.2], seed=125)

In [15]:
train.columns

['week_end',
 'customer_id',
 'min_week',
 'max_week',
 'sales',
 'orders',
 'units',
 'cat_A',
 'cat_B',
 'cat_C',
 'cat_D',
 'cat_E',
 'pay_cash',
 'pay_credit_card',
 'pay_debit_card',
 'pay_gift_card',
 'pay_others',
 'purchase_flag',
 'purchase_flag_next_4w',
 'sales_next_4w',
 'sales_4w',
 'sales_13w',
 'sales_26w',
 'sales_52w',
 'orders_4w',
 'orders_13w',
 'orders_26w',
 'orders_52w',
 'units_4w',
 'units_13w',
 'units_26w',
 'units_52w',
 'cat_A_4w',
 'cat_A_13w',
 'cat_A_26w',
 'cat_A_52w',
 'cat_B_4w',
 'cat_B_13w',
 'cat_B_26w',
 'cat_B_52w',
 'cat_C_4w',
 'cat_C_13w',
 'cat_C_26w',
 'cat_C_52w',
 'cat_D_4w',
 'cat_D_13w',
 'cat_D_26w',
 'cat_D_52w',
 'cat_E_4w',
 'cat_E_13w',
 'cat_E_26w',
 'cat_E_52w',
 'pay_cash_4w',
 'pay_cash_13w',
 'pay_cash_26w',
 'pay_cash_52w',
 'pay_credit_card_4w',
 'pay_credit_card_13w',
 'pay_credit_card_26w',
 'pay_credit_card_52w',
 'pay_debit_card_4w',
 'pay_debit_card_13w',
 'pay_debit_card_26w',
 'pay_debit_card_52w',
 'pay_gift_card_4w',

## Classification

### Model Dataset Summary
Let's look at the event rate for our dataset and also get a quick summary of all features.

The y-variable is balanced here because it is a dummy dataset. <mark>In most actual scenarios, this will not be balanced and the model build exercise will involve sampling for balancing.</mark>

In [16]:
train.groupBy('purchase_flag_next_4w').count().sort('purchase_flag_next_4w').show()

+---------------------+-----+
|purchase_flag_next_4w|count|
+---------------------+-----+
|                    0| 8328|
|                    1| 8312|
+---------------------+-----+



In [17]:
test.groupBy('purchase_flag_next_4w').count().sort('purchase_flag_next_4w').show()

+---------------------+-----+
|purchase_flag_next_4w|count|
+---------------------+-----+
|                    0| 2136|
|                    1| 2162|
+---------------------+-----+



### Pre-Processing pipeline
The pipeline below only does the pre-processing. It is saved so that it can be reused for scoring.

You can also add the model step to this pipeline to have a single pipeline instead of the two that I have created. However, having two pipelines makes it easier to iterate through just the model step during training.

In [18]:
# list of features: remove identifier columns and the y-var
col_list = df_model.drop('week_end','customer_id','min_week','max_week','purchase_flag_next_4w','sales_next_4w').columns

stages = []
assembler = VectorAssembler(inputCols=col_list, outputCol='features')
stages.append(assembler)

pipe = Pipeline(stages=stages)
pipe_model = pipe.fit(train)

pipe_model.write().overwrite().save('./files/model_objects/xgb_clf_pipe/')

In [19]:
pipe_model = PipelineModel.load('./files/model_objects/xgb_clf_pipe/')

<b>Apply the transformation pipeline</b>

Also keep the identifier columns and y-var in the transformed dataframe.

<mark>We are keeping both the classification and regression y-vars here as we will be re-using the same processed dataset for the regression section.</mark>

In [20]:
train_pr = pipe_model.transform(train)
train_pr = train_pr.select('customer_id','week_end','purchase_flag_next_4w','sales_next_4w','features')
train_pr = train_pr.persist()
train_pr.count()

16640

In [21]:
test_pr = pipe_model.transform(test)
test_pr = test_pr.select('customer_id','week_end','purchase_flag_next_4w','sales_next_4w','features')
test_pr = test_pr.persist()
test_pr.count()

4298

### Model Training
We will train one iteration of the XGBoost classification model as a showcase.

In an actual scenario, you will have to iterate through the training step multiple times for feature selection and model hyperparameter tuning to get a good final model.

In [22]:
train_pr.show(5)

+-----------+----------+---------------------+-------------+--------------------+
|customer_id|  week_end|purchase_flag_next_4w|sales_next_4w|            features|
+-----------+----------+---------------------+-------------+--------------------+
|         27|2019-01-26|                    1|          120|(102,[14,15,16,17...|
|         29|2019-01-26|                    0|            0|(102,[0,1,2,3,10,...|
|         40|2019-01-26|                    1|         1705|(102,[0,1,2,3,9,1...|
|         69|2019-01-26|                    1|         1100|(102,[14,15,16,17...|
|         70|2019-01-26|                    1|          400|(102,[0,1,2,3,10,...|
+-----------+----------+---------------------+-------------+--------------------+
only showing top 5 rows



In [23]:
# The XGBoost model needs to be wrapped in a Pipeline object for the save and load steps to work properly
stages = []

In [24]:
xgboost=XGBoostClassifier(
    featuresCol="features",
    labelCol="purchase_flag_next_4w",
    predictionCol="prediction",
    objective="binary:logistic", 
    evalMetric="logloss",
    maxDepth=15,
    missing=0.0,
    subsample=0.7,
    numRound=50,
    numWorkers=1)

TypeError: 'JavaPackage' object is not callable

In [None]:
from time import time
import numpy as np

stages.append(xgboost)
pipe = Pipeline(stages=stages)

# fit the pipeline (XGBoost stage only) and time the training step
start_time = time()
# model = xgboost.fit(train_pr)
model = pipe.fit(train_pr)
print('time elapsed: ', np.round(time()-start_time,2),'s',sep='')

# save the fitted pipeline
model.write().overwrite().save('s3://marketing-analytics-pal/dev/adhoc/vijay/xgb_classifier_09/')

# load it back
# model = XGBoostClassificationModel.load(path='s3://marketing-analytics-pal/dev/adhoc/vijay/xgb_classifier_09/')
model = PipelineModel.load(path='s3://marketing-analytics-pal/dev/adhoc/vijay/xgb_classifier_09/')

In [69]:
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel

# random forest classifier from pyspark.ml
model_params = {
    'labelCol': 'purchase_flag_next_4w',
    'numTrees': 128, # Spark default: 20
    'maxDepth': 12,  # Spark default: 5
    'featuresCol': 'features',
    'minInstancesPerNode': 25,
    'maxBins': 128,
    'minInfoGain': 0.0,
    'subsamplingRate': 0.7,
    'featureSubsetStrategy': '0.3',
    'impurity': 'gini',
    'seed': 125,
    'cacheNodeIds': False,
    'maxMemoryInMB': 256
    }

clf = RandomForestClassifier(**model_params)

In [70]:
trained_clf = clf.fit(train_pr)

### Feature Importance
We will save the feature importances as a CSV file.

In [71]:
# Feature importance
feature_importance_list = trained_clf.featureImportances
feature_list = pd.DataFrame(train_pr.schema['features'].metadata['ml_attr']['attrs']['numeric']).sort_values('idx')

feature_importance_list = pd.DataFrame(
    data=feature_importance_list.toArray(),
    columns=['relative_importance'],
    index=feature_list['name'])
feature_importance_list = feature_importance_list.sort_values('relative_importance', ascending=False)

feature_importance_list.to_csv('./files/rw_rf_feat_imp.csv')
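The key step above is lining up each importance value with its feature name through the vector index stored in the `ml_attr` metadata. A small plain-Python sketch of that alignment follows; `rank_features` is a hypothetical helper, and the attribute list and importance values are made-up inputs in the same `{'idx', 'name'}` shape as the metadata.

```python
def rank_features(attrs, importances):
    """Pair attribute names with importances by vector index, sorted descending."""
    by_idx = sorted(attrs, key=lambda a: a["idx"])
    pairs = [(a["name"], float(importances[a["idx"]])) for a in by_idx]
    return sorted(pairs, key=lambda p: p[1], reverse=True)


# made-up metadata entries and importance values, for illustration only
attrs = [{"idx": 1, "name": "orders"}, {"idx": 0, "name": "sales"}, {"idx": 2, "name": "units"}]
ranked = rank_features(attrs, [0.5, 0.2, 0.3])
print(ranked)  # [('sales', 0.5), ('units', 0.3), ('orders', 0.2)]
```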

### Predict on train and test

In [72]:
secondelement = F.udf(lambda v: float(v[1]), FloatType())

train_pred = trained_clf.transform(train_pr).withColumn('score',secondelement(F.col('probability')))
test_pred =  trained_clf.transform(test_pr).withColumn('score', secondelement(F.col('probability')))

In [73]:
test_pred.show(5)

+-----------+----------+---------------------+--------------------+--------------------+--------------------+----------+----------+
|customer_id|  week_end|purchase_flag_next_4w|            features|       rawPrediction|         probability|prediction|     score|
+-----------+----------+---------------------+--------------------+--------------------+--------------------+----------+----------+
|         15|2019-01-26|                    1|(102,[0,1,2,6,9,1...|[53.0784863119179...|[0.41467567431185...|       1.0|0.58532435|
|         27|2019-01-26|                    1|(102,[14,15,16,17...|[51.1393971441854...|[0.39952654018894...|       1.0|0.60047346|
|         28|2019-01-26|                    1|(102,[14,15,16,17...|[49.8725279055357...|[0.38962912426199...|       1.0| 0.6103709|
|        170|2019-01-26|                    0|(102,[0,1,2,7,10,...|[54.1789500442362...|[0.42327304722059...|       1.0|  0.576727|
|        192|2019-01-26|                    0|(102,[0,1,2,7,10,...|[57.53121

### Test Set Evaluation

In [74]:
evaluator = BinaryClassificationEvaluator(
        rawPredictionCol='rawPrediction',
        labelCol='purchase_flag_next_4w',
        metricName='areaUnderROC')

In [75]:
# areaUnderROC
evaluator.evaluate(train_pred)

0.8116811886255015

In [76]:
evaluator.evaluate(test_pred)

0.7412597272276923
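As a reminder of what `areaUnderROC` measures: it can be read as the probability that a randomly chosen positive record gets a higher score than a randomly chosen negative one, with ties counted as half. The sketch below computes that quantity directly on a tiny made-up example. It illustrates the meaning of the metric, not how Spark computes it; `pairwise_auc` is a hypothetical helper.

```python
def pairwise_auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# made-up labels and scores: 3 of the 4 positive/negative pairs are ordered correctly
print(pairwise_auc([1, 0, 1, 0], [0.9, 0.1, 0.4, 0.6]))  # 0.75
```

The pairwise loop is quadratic in the number of records, so it is only suitable for small illustrations like this one.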

In [77]:
# confusion matrix
test_pred.groupBy('purchase_flag_next_4w','prediction').count().sort('purchase_flag_next_4w','prediction').show()

+---------------------+----------+-----+
|purchase_flag_next_4w|prediction|count|
+---------------------+----------+-----+
|                    0|       0.0| 1655|
|                    0|       1.0|  450|
|                    1|       0.0|  937|
|                    1|       1.0| 1120|
+---------------------+----------+-----+



In [78]:
# accuracy
test_pred.where(F.col('purchase_flag_next_4w')==F.col('prediction')).count()/test_pred.count()

0.6667467563671312
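The accuracy above can be reproduced from the confusion matrix counts, along with precision and recall for the positive class. A minimal sketch, using the four counts printed in the confusion matrix; `binary_metrics` is a hypothetical helper.

```python
def binary_metrics(tn, fp, fn, tp):
    """Accuracy, precision and recall (positive class) from confusion matrix counts."""
    total = tn + fp + fn + tp
    return {
        "accuracy": (tp + tn) / total,
        "precision": tp / (tp + fp),
        "recall": tp / (tp + fn),
    }


# counts taken from the confusion matrix output
m = binary_metrics(tn=1655, fp=450, fn=937, tp=1120)
print(m)
```

Here accuracy is (1655 + 1120) / 4162, which matches the value computed from the DataFrame.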

### Save Model

In [79]:
trained_clf.write().overwrite().save('./files/model_objects/rw_rf_model/')

In [80]:
trained_clf = RandomForestClassificationModel.load('./files/model_objects/rw_rf_model/')

## Scoring
We will take the records for the latest week_end from df_features and score them using our trained model.

In [81]:
df_features = spark.read.parquet('./data/rw_features/')

In [82]:
max_we = df_features.selectExpr('max(week_end)').collect()[0][0]
max_we

datetime.date(2020, 12, 5)

In [88]:
df_scoring = df_features.where(F.col('week_end')==max_we)

In [89]:
df_scoring.count()

1000

In [90]:
# fillna
df_scoring = df_scoring.fillna(0)

# transformation pipeline (the one saved earlier in this notebook)
pipe_model = PipelineModel.load('./files/model_objects/xgb_clf_pipe/')

# apply
df_scoring = pipe_model.transform(df_scoring)
df_scoring = df_scoring.select('customer_id','week_end','features')

# random forest model
trained_clf = RandomForestClassificationModel.load('./files/model_objects/rw_rf_model/')

# apply: score is the predicted probability of class 1
secondelement = F.udf(lambda v: float(v[1]), FloatType())

df_scoring = trained_clf.transform(df_scoring).withColumn('score',secondelement(F.col('probability')))

In [91]:
df_scoring.show(5)

+-----------+----------+--------------------+--------------------+--------------------+----------+----------+
|customer_id|  week_end|            features|       rawPrediction|         probability|prediction|     score|
+-----------+----------+--------------------+--------------------+--------------------+----------+----------+
|        148|2020-12-05|[1200.0,1.0,3.0,1...|[68.4381452599104...|[0.53467300984305...|       0.0|  0.465327|
|        787|2020-12-05|(102,[15,16,17,19...|[96.4135634068059...|[0.75323096411567...|       0.0|0.24676904|
|        906|2020-12-05|(102,[16,17,20,21...|[90.4284145792717...|[0.70647198890056...|       0.0|0.29352802|
|        182|2020-12-05|(102,[16,17,20,21...|[90.9264612678324...|[0.71036297865494...|       0.0|0.28963703|
|        442|2020-12-05|(102,[16,17,20,21...|[105.838704191258...|[0.82686487649421...|       0.0|0.17313512|
+-----------+----------+--------------------+--------------------+--------------------+----------+----------+
only showing top 5 rows

In [92]:
# save scored output
df_scoring.repartition(8).write.parquet('./data/rw_scored/', mode='overwrite')

## Regression