<img src="python-spark-xgb.png">

I will use data from the [Titanic: Machine Learning from Disaster](https://www.kaggle.com/c/titanic/data) competition, one of the many Kaggle competitions.

### Step 1: Download or build the XGBoost jars
The Python code needs two Scala jar dependencies in order to work. I downloaded them directly from Maven:

1. [xgboost4j](https://mvnrepository.com/artifact/ml.dmlc/xgboost4j/0.72)
2. [xgboost4j-spark](https://mvnrepository.com/artifact/ml.dmlc/xgboost4j-spark/0.72)
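If you would rather fetch the jars from a script than from the browser, the URLs follow the standard Maven repository layout: the group ID with dots turned into slashes, then the artifact ID, the version, and `<artifactId>-<version>.jar`. The sketch below assumes both artifacts are published on Maven Central under the `ml.dmlc` group, as the mvnrepository pages above suggest; `maven_jar_url` and `download_jar` are hypothetical helpers. The resulting file names match the ones passed to `--jars` later on.

```python
import urllib.request
from pathlib import Path

# Assumption: the jars are hosted on Maven Central, which uses the layout
# <repo>/<groupId with dots as slashes>/<artifactId>/<version>/<artifactId>-<version>.jar
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str,
                  repo: str = MAVEN_CENTRAL) -> str:
    """Build the URL of a jar in a Maven repository (hypothetical helper)."""
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


def download_jar(url: str, dest_dir: str = ".") -> Path:
    """Download the jar at `url` into `dest_dir` and return the local path."""
    target = Path(dest_dir) / url.rsplit("/", 1)[-1]
    urllib.request.urlretrieve(url, target)
    return target


if __name__ == "__main__":
    for artifact in ("xgboost4j", "xgboost4j-spark"):
        url = maven_jar_url("ml.dmlc", artifact, "0.72")
        print(url)
        # download_jar(url)  # uncomment to actually fetch the file
```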




### Step 2: Add the custom XGBoost jars to the Spark app
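Before starting Spark we need to add the jars we downloaded in Step 1. We do this with the `--jars` flag, which in a notebook is passed through the `PYSPARK_SUBMIT_ARGS` environment variable (see the first code cell below).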
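The `PYSPARK_SUBMIT_ARGS` value has a fixed shape: `--jars` takes a single comma-separated list with no spaces, the string must end with `pyspark-shell`, and the variable only takes effect if it is set before the JVM is launched (that is, before the first `SparkSession` is created). Here is a minimal sketch of a helper that builds the value; `submit_args` is a hypothetical name, and the jar file names are the ones used in this post.

```python
import os
from typing import Iterable


def submit_args(jars: Iterable[str], extra: str = "") -> str:
    """Build a PYSPARK_SUBMIT_ARGS value that ships the given jars.

    --jars takes one comma-separated list (no spaces), and the string
    must end with 'pyspark-shell' for PySpark to launch the JVM.
    """
    jar_list = ",".join(jars)
    parts = [f"--jars {jar_list}"] if jar_list else []
    if extra:
        parts.append(extra)  # e.g. other spark-submit options
    parts.append("pyspark-shell")
    return " ".join(parts)


# Must run before findspark/SparkSession start the JVM; later changes are ignored.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args(
    ["xgboost4j-spark-0.72.jar", "xgboost4j-0.72.jar"]
)
```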

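### Step 3: Integrate PySpark into the Jupyter notebook
The easiest way to make PySpark importable from the notebook is the findspark package, which locates the Spark installation (from `SPARK_HOME` if it is set) and adds PySpark to `sys.path`: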


### Please see the running instructions for details of the packages that need to be installed on OS X with Anaconda

```python
import os
# Ship the XGBoost jars with the application; this must be set before Spark starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar pyspark-shell'

# Locate the local Spark installation and make pyspark importable.
import findspark
findspark.init()

import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
```


### Step 4: Start the Spark session
We are now ready to start the Spark session. We create a Spark app that runs locally and uses as many worker threads as there are logical cores, which is what the master URL `local[*]` means:

```python
spark = SparkSession\
        .builder\
        .master("local[*]")\
        .appName("PySpark XGBOOST Titanic")\
        .getOrCreate()
```

### Step 5: Download the XGBoost Python wrapper
You can download the PySpark XGBoost wrapper code from [here](https://github.com/dmlc/xgboost/files/2161553/sparkxgb.zip). It is the interface between the Python code we write and the XGBoost Scala implementation.

### Step 6: Add the PySpark XGBoost wrapper code
Now that we have a Spark session, we can add the wrapper code we downloaded in Step 5:

```python
# Distribute the wrapper to the executors and make it importable on the driver
spark.sparkContext.addPyFile("../resource/sparkxgb.zip")

from sparkxgb import XGBoostEstimator
```
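`addPyFile` ships the zip to every executor and also inserts it into the driver's `sys.path`; Python's built-in zip import support then lets `import sparkxgb` find the package inside the archive. The standalone sketch below reproduces only that import mechanism, without Spark, using a throwaway package named `demo_wrapper` (hypothetical) written to a temporary directory.

```python
import sys
import tempfile
import zipfile
from pathlib import Path

# Build a zip containing a package, the same shape as sparkxgb.zip
# (hypothetical package name and contents, for illustration only).
tmp_dir = Path(tempfile.mkdtemp())
zip_path = tmp_dir / "demo_wrapper.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("demo_wrapper/__init__.py",
                "class DemoEstimator:\n"
                "    name = 'loaded from a zip'\n")

# Roughly what addPyFile does on the driver: put the archive on sys.path
# so the import system's zipimport support can search it.
sys.path.insert(1, str(zip_path))

from demo_wrapper import DemoEstimator  # resolved from inside the zip

print(DemoEstimator.name)
```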

### Step 7: Defining a schema
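Next we define the schema of the data we read from the CSV. This is usually better practice than letting Spark infer the schema: inference needs an extra pass over the data, while an explicit schema gives us full control over the column names and types. Because the schema is supplied explicitly, the column names come from it rather than from the file's header, which is why the label column can be called `Survival` here even though the Kaggle file names it `Survived`.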

```python
schema = StructType([
    StructField("PassengerId", DoubleType()),
    StructField("Survival", DoubleType()),
    StructField("Pclass", DoubleType()),
    StructField("Name", StringType()),
    StructField("Sex", StringType()),
    StructField("Age", DoubleType()),
    StructField("SibSp", DoubleType()),
    StructField("Parch", DoubleType()),
    StructField("Ticket", StringType()),
    StructField("Fare", DoubleType()),
    StructField("Cabin", StringType()),
    StructField("Embarked", StringType())
])
```

### Step 8: Read the CSV data into a DataFrame
We read the CSV into a DataFrame, setting the `header` option so that the first line of the file is treated as a header and not as data.

```python
df_raw = spark\
    .read\
    .option("header", "true")\
    .schema(schema)\
    .csv("../resource/titanic_train.csv")

df_raw.show(2)
```

```
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survival|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|        1.0|     0.0|   3.0|Braund, Mr. Owen ...|  male|22.0|  1.0|  0.0|A/5 21171|   7.25| null|       S|
|        2.0|     1.0|   1.0|Cumings, Mrs. Joh...|female|38.0|  1.0|  0.0| PC 17599|71.2833|  C85|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
only showing top 2 rows
```
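To make the schema-plus-header behaviour concrete, here is a plain-Python analogue built on the standard `csv` module (not Spark): the header line is skipped, each field is cast by position using the declared types, and empty fields become `None`, matching the `null` that Spark shows in the `Cabin` column above. The sample line mirrors the first row shown above; `read_with_schema` is a hypothetical helper.

```python
import csv
import io
from typing import Callable, List, Sequence, Tuple

# (column name, cast function) pairs, applied by position like a Spark schema.
SCHEMA: List[Tuple[str, Callable[[str], object]]] = [
    ("PassengerId", float), ("Survival", float), ("Pclass", float),
    ("Name", str), ("Sex", str), ("Age", float), ("SibSp", float),
    ("Parch", float), ("Ticket", str), ("Fare", float),
    ("Cabin", str), ("Embarked", str),
]


def read_with_schema(text: str,
                     schema: Sequence[Tuple[str, Callable[[str], object]]] = SCHEMA,
                     header: bool = True) -> List[dict]:
    """Parse CSV text, casting each field by position; empty fields become None."""
    reader = csv.reader(io.StringIO(text))
    if header:
        next(reader)  # header names are ignored: the schema supplies the names
    rows = []
    for record in reader:
        rows.append({name: (cast(raw) if raw != "" else None)
                     for (name, cast), raw in zip(schema, record)})
    return rows


sample = (
    "PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked\n"
    '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S\n'
)
print(read_with_schema(sample)[0])
```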



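### Step 9: Replace null values with 0
We replace the null values in the numeric columns with 0. `na.fill` with a numeric value only touches numeric columns, so the nulls in the string columns `Cabin` and `Embarked` stay in place; the `StringIndexer`s in the next step deal with those.

```python
# Fill null (and NaN) values in the numeric columns with 0
df = df_raw.na.fill(0)
```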
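A plain-Python model of what `na.fill(0)` does to a single row: with a numeric fill value, only `None`/NaN entries in numeric columns are replaced, and string columns are left untouched. `fill_numeric` is a hypothetical helper that mimics, rather than calls, the Spark behaviour; the row values are illustrative.

```python
import math
from typing import Dict, Optional

# Numeric (DoubleType) columns from the Step 7 schema.
NUMERIC_COLUMNS = {"PassengerId", "Survival", "Pclass", "Age",
                   "SibSp", "Parch", "Fare"}


def fill_numeric(row: Dict[str, Optional[object]],
                 value: float) -> Dict[str, Optional[object]]:
    """Mimic df.na.fill(value) with a numeric value, for one row."""
    filled = {}
    for name, v in row.items():
        missing = v is None or (isinstance(v, float) and math.isnan(v))
        if name in NUMERIC_COLUMNS and missing:
            filled[name] = float(value)  # double columns receive a double
        else:
            filled[name] = v  # strings and present values are left alone
    return filled


row = {"Age": None, "Fare": float("nan"), "Cabin": None, "Embarked": "S"}
print(fill_numeric(row, 0))
# {'Age': 0.0, 'Fare': 0.0, 'Cabin': None, 'Embarked': 'S'}
```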

### Step 10: Convert the nominal values to numeric
ML Pipelines are a set of high-level APIs built on top of DataFrames that make it easier to combine multiple algorithms into a single workflow. The main elements of a pipeline are the **Transformer** and the **Estimator**. A Transformer is an algorithm that transforms one DataFrame into another DataFrame, while an Estimator is an algorithm that is fitted on a DataFrame to produce a Transformer.

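To convert the nominal values into numeric ones we define a `StringIndexer` for each nominal column. Strictly speaking, `StringIndexer` is an Estimator: fitting it learns the mapping from labels to indices and returns a `StringIndexerModel`, which is the Transformer that actually converts the values.

For each indexer I define the input column and the output column that will hold the numeric index. `setHandleInvalid("keep")` makes the indexer assign null values and labels not seen during fitting to an extra index instead of raising an error.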

```python
sexIndexer = StringIndexer()\
  .setInputCol("Sex")\
  .setOutputCol("SexIndex")\
  .setHandleInvalid("keep")

cabinIndexer = StringIndexer()\
  .setInputCol("Cabin")\
  .setOutputCol("CabinIndex")\
  .setHandleInvalid("keep")

embarkedIndexer = StringIndexer()\
  .setInputCol("Embarked")\
  .setOutputCol("EmbarkedIndex")\
  .setHandleInvalid("keep")
```
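Fitting a `StringIndexer` counts the distinct non-null labels and numbers them by descending frequency (the default `stringOrderType="frequencyDesc"`), so the most common label gets index 0.0. With `handleInvalid="keep"`, nulls and labels not seen during fitting go to one extra index equal to the number of labels. Below is a pure-Python sketch of that logic; breaking frequency ties alphabetically is an assumption of this sketch, and `fit_string_indexer` is a hypothetical name.

```python
from collections import Counter
from typing import Callable, Iterable, Optional


def fit_string_indexer(
        values: Iterable[Optional[str]]) -> Callable[[Optional[str]], float]:
    """Return a label -> index function, like a fitted StringIndexerModel
    with handleInvalid="keep"."""
    counts = Counter(v for v in values if v is not None)  # nulls ignored when fitting
    # Most frequent label first; ties broken alphabetically (sketch assumption).
    labels = sorted(counts, key=lambda label: (-counts[label], label))
    index = {label: float(i) for i, label in enumerate(labels)}
    keep_index = float(len(labels))  # extra bucket for nulls and unseen labels

    def transform(value: Optional[str]) -> float:
        # None is never a key in `index`, so it also lands in the keep bucket.
        return index.get(value, keep_index)

    return transform


embarked = fit_string_indexer(["S", "C", "S", "Q", "S", "C", None])
print([embarked(v) for v in ["S", "C", "Q", None, "X"]])  # [0.0, 1.0, 2.0, 3.0, 3.0]
```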

### Step 11: Assemble the columns into a feature vector
I use another Transformer, `VectorAssembler`, to combine the columns that the XGBoost Estimator will use for classification into a single feature vector:

```python
vectorAssembler = VectorAssembler()\
                    .setInputCols(["Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare", "CabinIndex", "EmbarkedIndex"])\
                    .setOutputCol("features")
```
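Conceptually, `VectorAssembler` concatenates the listed columns, in the order given, into one numeric vector per row (Spark may store that vector in sparse form). By default it does not accept nulls in its input columns, which is one reason the numeric nulls are filled before this stage. A pure-Python sketch of the row-level behaviour, using a plain list as the vector; `assemble` is hypothetical and the row values are illustrative.

```python
from typing import Dict, List, Optional, Sequence

# Same order as setInputCols in the cell above.
FEATURE_COLS = ["Pclass", "SexIndex", "Age", "SibSp", "Parch",
                "Fare", "CabinIndex", "EmbarkedIndex"]


def assemble(row: Dict[str, Optional[float]],
             input_cols: Sequence[str] = FEATURE_COLS) -> List[float]:
    """Concatenate the input columns, in order, into one dense feature vector."""
    vector: List[float] = []
    for name in input_cols:
        value = row[name]
        if value is None:
            # VectorAssembler fails on nulls by default; this sketch does too.
            raise ValueError(f"null value in input column {name!r}")
        vector.append(float(value))
    return vector


# Illustrative row: raw numeric columns plus the indices from the indexers.
row = {"Pclass": 3.0, "SexIndex": 0.0, "Age": 22.0, "SibSp": 1.0,
       "Parch": 0.0, "Fare": 7.25, "CabinIndex": 5.0, "EmbarkedIndex": 0.0}
print(assemble(row))  # [3.0, 0.0, 22.0, 1.0, 0.0, 7.25, 5.0, 0.0]
```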

### Step 12: Defining the XGBoostEstimator
In this step I define the Estimator that will produce the model. Most parameters are left at their defaults: we only set the features column and the label column (both must match columns in our DataFrame) and the name of the new prediction column that will contain the output of the classifier.

```python
xgboost = XGBoostEstimator(
    featuresCol="features",
    labelCol="Survival",
    predictionCol="prediction"
)
```

### Step 13: Building the pipeline and the classifier
Now I can define the actual pipeline and the order of its stages. When the pipeline is fitted, the input DataFrame is transformed by each stage in turn, and the result is a model trained on our data.

```python
pipeline = Pipeline().setStages([sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgboost])
```
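The control flow of `Pipeline.fit` can be sketched in plain Python: each Estimator is fitted on the current data to get a Transformer, the data is passed through every stage that comes before the last Estimator, and the fitted Transformers are collected into a pipeline model whose `transform` replays them in order. The toy classes below (`Transformer`, `Estimator`, `MapColumn`, `MeanCenterer`, `Pipeline`, `PipelineModel`) are hypothetical stand-ins that work on lists of dicts instead of DataFrames; they are not the Spark API.

```python
from typing import Callable, Dict, List, Sequence, Union

Rows = List[Dict[str, float]]  # toy stand-in for a DataFrame


class Transformer:
    """Maps one dataset to another."""
    def transform(self, rows: Rows) -> Rows:
        raise NotImplementedError


class Estimator:
    """Is fitted on a dataset and returns a Transformer."""
    def fit(self, rows: Rows) -> Transformer:
        raise NotImplementedError


class MapColumn(Transformer):
    """Adds a derived column (a stand-in for something like VectorAssembler)."""
    def __init__(self, out_col: str, fn: Callable[[Dict[str, float]], float]):
        self.out_col, self.fn = out_col, fn

    def transform(self, rows: Rows) -> Rows:
        return [{**r, self.out_col: self.fn(r)} for r in rows]


class MeanCenterer(Estimator):
    """Learns a column mean; the fitted Transformer subtracts it."""
    def __init__(self, in_col: str, out_col: str):
        self.in_col, self.out_col = in_col, out_col

    def fit(self, rows: Rows) -> Transformer:
        col = self.in_col
        mean = sum(r[col] for r in rows) / len(rows)
        return MapColumn(self.out_col, lambda r: r[col] - mean)


class PipelineModel(Transformer):
    """Replays the fitted stages in order."""
    def __init__(self, stages: List[Transformer]):
        self.stages = stages

    def transform(self, rows: Rows) -> Rows:
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Pipeline(Estimator):
    def __init__(self, stages: Sequence[Union[Estimator, Transformer]]):
        self.stages = list(stages)

    def fit(self, rows: Rows) -> PipelineModel:
        # Data only needs to flow through stages that precede the last Estimator.
        last_est = max((i for i, s in enumerate(self.stages)
                        if isinstance(s, Estimator)), default=-1)
        fitted: List[Transformer] = []
        for i, stage in enumerate(self.stages):
            model = stage.fit(rows) if isinstance(stage, Estimator) else stage
            fitted.append(model)
            if i < last_est:
                rows = model.transform(rows)  # this output feeds the next stage
        return PipelineModel(fitted)


train = [{"x": 1.0, "y": 2.0}, {"x": 3.0, "y": 4.0}]
pipeline = Pipeline([MapColumn("s", lambda r: r["x"] + r["y"]),
                     MeanCenterer("s", "s_centered")])
model = pipeline.fit(train)
print(model.transform([{"x": 10.0, "y": 0.0}]))
# [{'x': 10.0, 'y': 0.0, 's': 10.0, 's_centered': 5.0}]
```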

### Step 14: Train the model and predict on new test data
I first split the data into a training set and a test set (about 80/20, with a fixed seed so the split is reproducible), then fit the pipeline on the training data, and finally look at the predictions obtained for each passenger in the test set:



```python
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=24)
model = pipeline.fit(trainDF)
model.transform(testDF).select(col("PassengerId"), col("prediction")).show()
```

```
+-----------+----------+
|PassengerId|prediction|
+-----------+----------+
|        1.0|       0.0|
|        4.0|       1.0|
|       14.0|       0.0|
|       15.0|       1.0|
|       20.0|       1.0|
|       28.0|       1.0|
|       34.0|       0.0|
|       38.0|       0.0|
|       50.0|       1.0|
|       52.0|       0.0|
|       59.0|       1.0|
|       60.0|       0.0|
|       82.0|       0.0|
|       94.0|       0.0|
|       96.0|       0.0|
|       99.0|       1.0|
|      104.0|       0.0|
|      105.0|       0.0|
|      107.0|       1.0|
|      116.0|       0.0|
+-----------+----------+
only showing top 20 rows
```
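Note that `randomSplit([0.8, 0.2], seed=24)` does not cut the data at exactly 80/20. The weights are normalized to sum to 1, and each row is assigned by a random draw against the cumulative weights, so the split sizes are only approximately proportional; fixing `seed` makes the split reproducible for the same data and partitioning. A pure-Python sketch of that idea follows. It is not Spark's actual sampler, so it will not reproduce Spark's split for the same seed, and `random_split` is a hypothetical name.

```python
import random
from itertools import accumulate
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def random_split(rows: Sequence[T], weights: Sequence[float],
                 seed: int) -> List[List[T]]:
    """Assign every row to exactly one split via a uniform draw against cumulative weights."""
    total = float(sum(weights))
    bounds = list(accumulate(w / total for w in weights))  # [0.8, 1.0] for [0.8, 0.2]
    rng = random.Random(seed)  # fixed seed -> reproducible assignment
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against rounding in the last bound
    return splits


train, test = random_split(list(range(1000)), [0.8, 0.2], seed=24)
print(len(train), len(test))  # close to 800 / 200, but usually not exact
```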

