# Getting started with Spark and Machine Learning
---

In this tutorial we will go through the whole process of using machine learning with Spark, starting from scratch. We will use Python (PySpark) as the language for our notebook. We will adopt an iterative approach, trying to improve our analysis and our knowledge of the data on each iteration.

Some of the activities we will see are:
* Get the data
* Prepare the data for Spark ML
* Visualize the data
* Data cleaning
* Feature engineering
* Train models using several ML algorithms
* Evaluate models
* Send the results to Kaggle (this is optional)

For this example, we are going to use the datasets from the ATLAS Higgs Boson Machine Learning Challenge. This challenge was published by the ATLAS experiment in May 2014 on the Kaggle platform. The challenge closed in September 2014, but we can still use the data for learning purposes. We can also submit our predictions to Kaggle to see how good they are.


## ATLAS Higgs Boson Machine Learning Challenge


The goal of the Higgs Boson Machine Learning Challenge was to explore the potential of advanced machine learning methods to improve the discovery significance of the experiment. Using simulated data with features characterizing events detected by ATLAS, your task is to classify events where there is a Higgs boson decay into two tau particles (known as "signal") versus events where this decay is not present (known as "background"). 

![Higgs Challenge](https://kaggle2.blob.core.windows.net/competitions/kaggle/3887/media/ATLASEXP_image.png)
link: https://www.kaggle.com/c/higgs-boson

### Dataset description

For this challenge we are provided with the following files:

* training.csv - Training set of 250000 events, with an ID column, 30 feature columns, a weight column and a label column.
* test.csv - Test set of 550000 events with an ID column and 30 feature columns
* random_submission - Sample submission file in the correct format. File format is described on https://www.kaggle.com/c/higgs-boson/details/evaluation
* HiggsBosonCompetition_AMSMetric - Python script to calculate the competition evaluation metric.
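
The evaluation metric computed by that script is the approximate median significance (AMS). The sketch below shows the formula as it is defined in the challenge documentation, to the best of our knowledge; the function name `ams` and the example numbers are only illustrative. Here `s` and `b` are the weighted sums of the signal and background events that the classifier labels as signal.

```python
import math

def ams(s, b, b_reg=10.0):
    """Approximate median significance.

    s: sum of the weights of signal events classified as signal
    b: sum of the weights of background events classified as signal
    b_reg: regularisation term (10 in the challenge)
    """
    radicand = 2.0 * ((s + b + b_reg) * math.log(1.0 + s / (b + b_reg)) - s)
    if radicand < 0:
        raise ValueError("radicand is negative")
    return math.sqrt(radicand)

# illustrative numbers, not taken from the dataset
print(ams(0.0, 100.0))
print(ams(10.0, 100.0))
```

Selecting more true signal for the same amount of background increases the score, which is why a better classifier obtains a higher AMS.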

Some additional details about the dataset:

* All variables are floating point, except PRI_jet_num which is integer
* Variables prefixed with PRI (for PRImitives) are “raw” quantities about the bunch collision as measured by the detector
* Variables prefixed with DER (for DERived) are quantities computed from the primitive features, which were selected by the physicists of ATLAS
* In some observations, some features are meaningless or cannot be computed. In these cases their value is −999.0, which is outside the normal range of all variables
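
To make the −999.0 convention concrete, here is a small sketch in plain Python that counts the placeholder values per column. The three sample rows are made up for illustration; only the column names come from the dataset, and `count_missing` is a hypothetical helper.

```python
MISSING = -999.0  # placeholder used by the dataset for undefined values

# made-up sample rows; only the column names are real
sample = """EventId,DER_mass_MMC,PRI_jet_num,PRI_jet_leading_pt
100000,138.47,2,67.435
100001,-999.0,0,-999.0
100002,125.157,1,46.226
"""

def count_missing(text):
    """Return {column: number of rows whose value equals MISSING}."""
    lines = text.strip().splitlines()
    header = lines[0].split(",")
    counts = dict((name, 0) for name in header)
    for line in lines[1:]:
        for name, value in zip(header, line.split(",")):
            if float(value) == MISSING:
                counts[name] += 1
    return counts

missing = count_missing(sample)
print(missing)
```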

## Environment setup and resources

There are several resources that are useful or required for this tutorial. 

The SWAN service provides on-demand notebooks that are already prepared for use with Spark. For this tutorial, you can get access to the SWAN notebooks by subscribing to the CERN e-group "hadoop-tutorials-2016-s1". You also need a CERNBox account to use the SWAN notebooks. 
*NOTE*: It is assumed that you will be using SWAN notebooks in this tutorial.

If you want to send the results to Kaggle for evaluation, you first need to sign up. This is optional for the tutorial.

In the following list you can find the links for the mentioned services:
* SWAN: https://swan.cern.ch/
* CERNBox: https://cernbox.cern.ch/
* CERN e-groups: https://e-groups.cern.ch/
* Github: https://github.com/cerndb/hadoop-tutorials
* Gitlab: https://gitlab.cern.ch/db/hadoop-tutorials-2016
* Kaggle: https://www.kaggle.com/

The following links contain useful documentation and reference for this tutorial:
* Spark MLlib documentation: http://spark.apache.org/docs/1.6.1/mllib-guide.html
* PySpark reference: http://spark.apache.org/docs/1.6.1/api/python/index.html

<style>
p.info
{
     color: #616E14;
     border: solid 1px #DDC82D;
     background-color: #FCF8D1;
     -moz-border-radius: 6px;
     -webkit-border-radius: 6px;
     border-radius: 6px;
     padding: 14px 20px;
     mc-auto-number-format: '{b}Note: {/b}';
}
</style>

---

#  Using PySpark for Machine Learning



## Get the data

First, we need to get the data for our analysis. The data can be downloaded from the Kaggle platform but for simplicity it is also available as a zip file shared in CERNBox.


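<p class=info><b>Info:</b> If the file is hosted externally, we can download it with Python, for example:</p>

```python
import urllib

# urlToMyFile and the destination path are placeholders: the first argument
# is the URL of the file, the second the local path where it will be saved
# (Python 2 API; in Python 3 the function lives in urllib.request)
urllib.urlretrieve(urlToMyFile, "/tmp/myFile")
```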

<br>
Let's create our working directory, copy the zip file into it and extract its contents:

In [None]:
import os

# declare a few convenient variables
home = os.environ['HOME']+"/"
wd = home+"spark-tutorial-ml/"
zipFilename = "kaggle-higgs.zip"
remoteFilename = "/eos/user/a/aromerom/Public/kaggle-higgs/kaggle-higgs.zip"
seed = 12345L

# configure pandas options
import pandas as pd
pd.options.display.max_columns = 35
pd.options.display.max_colwidth = 35

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [None]:
# create the working directory and change dir
%cd $home
# %rm -rf $wd # uncomment this line if you want to remove everything in your wd
%mkdir -p $wd
%cd $wd

# copy the data into your working directory
%cp $remoteFilename $wd
%ll $wd

In [None]:
import zipfile

# Extract the file
zip_ref = zipfile.ZipFile(file=wd+zipFilename, mode='r')
zip_ref.extractall(wd)
zip_ref.close()
%ll $wd

Now that we have all the data we can start working with Spark.

## Read datasets and prepare them for Spark ML

The first thing we have to do to use Spark is to create the SparkContext. We also need to create the SQLContext because we are going to work with DataFrames.
Once the context is created, we can read the train and test datasets.

<p class="info"><b>Note:</b> We add the package "spark-csv" before creating the SparkContext. This package provides utilities to create DataFrames from csv files and to save them back in csv format. If the spark-csv package is not available, you can always read the files using the traditional textFile function and create a DataFrame from the RDD:
</p>
```python

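# fallback without spark-csv: every column is read as a string and
# the header line is kept as a regular row
trainRDD = sc.textFile(wd+"training.csv").map(lambda line: line.split(","))
train = sqlContext.createDataFrame(trainRDD)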
```


In [None]:
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext

# add spark-csv packages
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

# create spark and sql context
sc = SparkContext()
sqlContext = SQLContext(sc)

If the Spark contexts have been properly created, we should now be able to read our datasets. We will use the utilities available in the `com.databricks.spark.csv` package.

In [None]:
# read train and test datasets
train = sqlContext.read.format('com.databricks.spark.csv')\
        .options(header='true', inferSchema='true').load(wd+"training.csv")
    
test = sqlContext.read.format('com.databricks.spark.csv')\
        .options(header='true', inferSchema='true').load(wd+"test.csv")

It is a good idea to check that the data was correctly parsed into a DataFrame. We can check the data types using `printSchema()`. We can print the content of a Spark DataFrame using `show()`.

<p class="info"><b>Note:</b> Some Spark functions like `show()` print plain text that can be difficult to read, depending on the number of columns of your DataFrame. In those cases, you can always convert to Pandas using `toPandas()`, which produces nicer output in Jupyter notebooks.
</p>

In [None]:
train.limit(1).show() # This will show the content in plain text
train.limit(5).toPandas() # this shows the content in a nice table

In [None]:
train.printSchema()

We have to shape our dataset before we can use the Spark ML algorithms with it. First, we need to create a new column containing a Vector with all the features. That vector will be the input used by the ML algorithm to train the model. 

We can do it using the VectorAssembler class:

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, IndexToString

# get the list of features
featureList = train.columns
featureList.remove('Label')
featureList.remove('EventId')
featureList.remove('Weight')

# the assembler will create a single column with vector of features
assembler = VectorAssembler(inputCols=featureList, outputCol="features")

In [None]:
assembler.transform(train.limit(5)).toPandas()

We also need to convert our labels (that is, "s" for signal and "b" for background) into indexes.

In [None]:
# this converts the label into an index, necessary for the machine learning
labelIndexer = StringIndexer(inputCol="Label", outputCol="indexedLabel").fit(train)

labelConverter = IndexToString(inputCol="prediction", 
                               outputCol="predictedLabel",
                               labels=labelIndexer.labels)
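
Conceptually, `StringIndexer` assigns index 0 to the most frequent label, 1 to the next one, and so on, while `IndexToString` applies the inverse mapping. The plain-Python sketch below mimics that behaviour on a made-up list of labels; it is only an illustration of the idea, not the Spark implementation.

```python
from collections import Counter

labels = ["b", "s", "b", "b", "s", "b"]  # made-up labels

# most frequent label first, mimicking the default ordering of StringIndexer
ordered = [label for label, _ in Counter(labels).most_common()]
label_to_index = dict((label, float(i)) for i, label in enumerate(ordered))

# forward mapping (StringIndexer) and inverse mapping (IndexToString)
indexed = [label_to_index[label] for label in labels]
restored = [ordered[int(i)] for i in indexed]

print(label_to_index)
print(indexed)
print(restored)
```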

We are now ready to apply some ML algorithms.

## Decision Trees

Decision Trees are a commonly used supervised machine learning algorithm for classification and regression. The goal of using decision trees is to create a model that predicts the value of a target variable by learning simple decision rules inferred from the data features.

In the following image we can see an example of a decision tree model showing the survival of passengers in the Titanic dataset ("sibsp" is the number of spouses or siblings aboard). The figures under the leaves show the probability of survival and the percentage of observations in the leaf.
![](https://upload.wikimedia.org/wikipedia/commons/f/f3/CART_tree_titanic_survivors.png)

The `spark.ml` implementation supports decision trees for binary and multiclass classification and for regression, using both continuous and categorical features. The implementation partitions data by rows, allowing distributed training with millions or even billions of instances.
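
To illustrate what a "simple decision rule" is, the sketch below finds the single threshold on one feature that best separates two classes according to the Gini impurity, which is the default impurity measure of `DecisionTreeClassifier`. The data and the helper names (`gini`, `best_split`) are made up for this example; a real tree repeats this search recursively over all the features.

```python
def gini(labels):
    """Gini impurity of a list of 0/1 labels."""
    if not labels:
        return 0.0
    p = sum(labels) / float(len(labels))
    return 2.0 * p * (1.0 - p)

def best_split(values, labels):
    """Return (threshold, weighted impurity) of the best rule 'value <= threshold'."""
    best = (None, float("inf"))
    n = float(len(values))
    for threshold in sorted(set(values))[:-1]:
        left = [l for v, l in zip(values, labels) if v <= threshold]
        right = [l for v, l in zip(values, labels) if v > threshold]
        impurity = len(left) / n * gini(left) + len(right) / n * gini(right)
        if impurity < best[1]:
            best = (threshold, impurity)
    return best

# made-up feature values with their class (0 = background, 1 = signal)
values = [20.0, 35.0, 50.0, 110.0, 125.0, 140.0]
labels = [0, 0, 0, 1, 1, 1]
threshold, impurity = best_split(values, labels)
print("split at %s with impurity %s" % (threshold, impurity))
```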

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

dtClassifier = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")

In machine learning, it is common to run a sequence of actions to process the data and train your models. Spark MLlib represents such a workflow as a Pipeline, which consists of a sequence of actions to be run in a specific order.

In our case, we are going to create a Pipeline that runs assembler -> labelIndexer -> decision tree classifier -> labelConverter.

In [None]:
from pyspark.ml import Pipeline

# use pipeline as a container for the actions
pipeline = Pipeline(stages=[assembler, labelIndexer, dtClassifier, labelConverter])

# Fit the pipeline to the training data.
dtModel = pipeline.fit(train)

dtPrediction = dtModel.transform(test)


We can have a look at the predictions made by our model:

In [None]:
dtPrediction.limit(5).select('EventId','features','rawPrediction','probability','prediction', 'predictedLabel')\
            .toPandas()

Now we can save our predictions in the format expected by Kaggle. We will create a function so it can be reused later.

In [None]:
# save a csv file with the format needed for submitting to Kaggle
def prepareForKaggle(dataset, path, overwrite=False):
    if overwrite:
        %rm -rf $path
    resultsFormatted = dataset.withColumn('RankOrder', dataset.EventId-349999)\
                              .select('EventId','RankOrder','predictedLabel')
    resultsFormatted = resultsFormatted.withColumnRenamed('predictedLabel','Class')
    resultsFormatted.coalesce(1).write.format("com.databricks.spark.csv")\
                    .option("header", "true").option("codec","org.apache.hadoop.io.compress.GzipCodec").save(path)


In [None]:
prepareForKaggle(dtPrediction, wd+"results-dt-1")

If you have an account in Kaggle you can upload the results and wait for the evaluation.

Question: What is the default value of the "maxDepth" parameter in the DecisionTreeClassifier? Does the result improve if you set a higher value?

# Data cleaning

Our first ML model was generated with the raw training dataset. Nevertheless, in many (if not all) cases a dedicated process is needed to clean the data and ensure its quality. This process varies a lot depending on the dataset.

In our example, we know that missing values are represented by the value -999.0. Using the `com.databricks.spark.csv` package, we can specify which value in the dataset corresponds to the null/missing value using the parameter `nullValue`. 

In [None]:
# the option value is passed as a string, which the reader accepts for every option
train = sqlContext.read.format('com.databricks.spark.csv')\
        .options(header='true', inferSchema='true', nullValue='-999.0').load(wd+"training.csv")
    
test = sqlContext.read.format('com.databricks.spark.csv')\
        .options(header='true', inferSchema='true', nullValue='-999.0').load(wd+"test.csv")

We can fill those null values using `na.fill`. Let's compare the dataset statistics with the null values left out and with the null values filled back with -999.0:

In [None]:
train.describe().toPandas()
train.na.fill(-999.0).describe().toPandas()

Let's check how many rows are left in our datasets after dropping those that contain null values:

In [None]:
train.na.drop().count()
test.na.drop().count()

In Pandas we can also check the null values easily:

In [None]:
trainpd = train.drop('EventId').drop('Weight').toPandas() # remove unnecessary columns
testpd = test.toPandas()

pd.isnull(trainpd).sum()
pd.isnull(testpd).sum()

We also know that some features are raw measurements (PRI) and others are derived by the experts (DER), so we can compare how good each group is for the prediction model:

In [None]:
featuresDER = ['DER_mass_MMC', 'DER_mass_transverse_met_lep', 'DER_mass_vis', 'DER_pt_h', 'DER_deltaeta_jet_jet', 
              'DER_mass_jet_jet', 'DER_prodeta_jet_jet', 'DER_deltar_tau_lep', 'DER_pt_tot', 'DER_sum_pt',
              'DER_pt_ratio_lep_tau', 'DER_met_phi_centrality', 'DER_lep_eta_centrality']

featuresPRI = ['PRI_tau_pt', 'PRI_tau_eta', 'PRI_tau_phi', 'PRI_lep_pt', 'PRI_lep_eta', 'PRI_lep_phi', 'PRI_met',
              'PRI_met_phi', 'PRI_met_sumet', 'PRI_jet_num', 'PRI_jet_leading_pt', 'PRI_jet_leading_eta',
              'PRI_jet_leading_phi', 'PRI_jet_subleading_pt', 'PRI_jet_subleading_eta', 'PRI_jet_subleading_phi',
              'PRI_jet_all_pt']

assemblerDER = VectorAssembler(inputCols=featuresDER, outputCol="features")
assemblerPRI = VectorAssembler(inputCols=featuresPRI, outputCol="features")

pipelineDER = Pipeline(stages=[assemblerDER, labelIndexer, dtClassifier, labelConverter])
pipelinePRI = Pipeline(stages=[assemblerPRI, labelIndexer, dtClassifier, labelConverter])

dtModelDER = pipelineDER.fit(train.na.fill(0))
dtModelPRI = pipelinePRI.fit(train.na.fill(0))

dtPredictionDER = dtModelDER.transform(test.na.fill(0))
dtPredictionPRI = dtModelPRI.transform(test.na.fill(0))

prepareForKaggle(dtPredictionDER, wd+"results-dt-DER", True) 
prepareForKaggle(dtPredictionPRI, wd+"results-dt-PRI", True)

In [None]:
dtPrediction.limit(5).toPandas()

# Visualization

Visualizing the data is an important way to get a better understanding of the dataset properties. Some characteristics that can be spotted quickly in a plot might be very difficult to see by looking at the raw values. Python has a good number of libraries for data visualization. When working with large datasets in Spark, it can be very useful to sample the data and draw some plots using Python libraries.

Since our dataset is not very big, we can just take the whole dataset for the plots. We are going to use the plotly library to plot a few things about our dataset.

NOTE: We need to install a couple of Python libraries. Open a SWAN terminal and execute the following commands:
`pip install --user plotly`
`pip install --user cufflinks`

In [None]:
%matplotlib notebook
import matplotlib.pyplot as plt
import numpy as np
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go
import cufflinks as cf

# connect to the notebook
cf.set_config_file(offline=True)
init_notebook_mode(connected=True)

# define the pandas datasets
trainpd = train.drop('EventId').drop('Weight').toPandas() # remove unnecessary columns

In [None]:
trainpd.DER_mass_transverse_met_lep.iplot(kind='histogram')


In [None]:
trainpd.iplot(kind='box')

One of the things we would like to check is the correlation between the features. Strongly correlated features carry largely redundant information, so they are usually good candidates to be discarded, keeping only the "independent" features (which will likely be better for the model training).

Remember that "correlation does not imply causation". Two events can consistently correlate with each other but not have any causal relationship. An example is the relationship between reading ability and shoe size across the whole population of a country. In that example, larger shoe sizes correlate with better reading ability, but this is caused by the fact that young children have small feet and have not yet (or only recently) been taught to read.

The correlation can be calculated directly in Spark using `stat.corr('column1','column2')`. Spark provides other statistical functions for DataFrames, like the covariance `stat.cov`, the cross tabulation `stat.crosstab` or the frequent items `stat.freqItems`.
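
For reference, the Pearson coefficient that these correlation functions compute can be sketched in a few lines of plain Python. The helper name `pearson` and the sample values are made up for the illustration.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long lists."""
    n = float(len(xs))
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

x = [1.0, 2.0, 3.0, 4.0]
r_up = pearson(x, [2.0, 4.0, 6.0, 8.0])    # y grows linearly with x
r_down = pearson(x, [8.0, 6.0, 4.0, 2.0])  # y decreases linearly with x
print(r_up)
print(r_down)
```

A coefficient close to 1 or -1 indicates a strong linear relationship, while a value close to 0 indicates that there is no linear relationship.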

In [None]:
trainpd.corr()

In [None]:
trainpd.corr().iplot(kind='heatmap', colorscale='spectral')

# Cross-Validation

* K-fold cross-validation repeats the construction of the model on different subsets of the available training data and evaluates each model only on data not seen during its construction.
* The process can be summarised as follows:

   
   * 1 - *The training data is randomly divided into k groups, or folds, of approximately equal size* 
 
   * 2 - *For each fold i = 1, ..., k*
      * 2.1 - *Set fold i aside as the validation set*
      * 2.2 - *Train the model using the remaining k − 1 folds* 
      * 2.3 - *Calculate the error on fold i, for example the mean squared error MSE(i)*
   
   * 3 - *The k-fold CV estimate is the average of [MSE(1), MSE(2), ..., MSE(k)]*
   <br />
   <br />
   <img src="http://genome.tugraz.at/proclassify/help/pages/images/xv_folds.gif">
   <br /><br />
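
The procedure can be sketched in plain Python as follows. This is only an illustration of the idea, not what `CrossValidator` does internally: the helpers `k_fold_indices` and `cross_validate` are hypothetical, and `dummy_score` stands in for a function that would train a model on the training indices and return its error on the validation indices.

```python
import random

def k_fold_indices(n_samples, k, seed=12345):
    """Shuffle the indices 0..n_samples-1 and split them into k folds."""
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    return [indices[i::k] for i in range(k)]

def cross_validate(n_samples, k, train_and_score):
    """Average the score returned by train_and_score over the k folds."""
    folds = k_fold_indices(n_samples, k)
    scores = []
    for i in range(k):
        validation = folds[i]
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        scores.append(train_and_score(training, validation))
    return sum(scores) / float(k)

# placeholder: returns the fraction of the data held out for validation
def dummy_score(training, validation):
    return len(validation) / float(len(training) + len(validation))

folds = k_fold_indices(10, 3)
cv_estimate = cross_validate(10, 3, dummy_score)
print([len(fold) for fold in folds])
print(cv_estimate)
```

Every sample is used for validation exactly once and for training k − 1 times.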

# Model Tuning

* Most of the models you will be using in machine learning have several parameters, and in most cases there is no analytical formula to calculate an appropriate value.
* Many of these parameters control the complexity of the model; poor choices result in critical problems such as bad performance, over-fitting, etc. 
<br />
<br />
<img src="ModelTuning.png">
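
A common tuning strategy is grid search: evaluate every combination of a few candidate values per parameter and keep the best one. The sketch below shows the idea in plain Python; `build_grid`, `grid_search` and `toy_score` are hypothetical names, and the scoring function is made up in place of a cross-validated metric.

```python
import itertools

def build_grid(param_values):
    """Return one dict per combination of the candidate values."""
    names = sorted(param_values)
    combos = itertools.product(*(param_values[name] for name in names))
    return [dict(zip(names, combo)) for combo in combos]

def grid_search(param_values, score):
    """Return the combination with the highest score."""
    return max(build_grid(param_values), key=score)

# made-up scoring function standing in for a cross-validated metric
def toy_score(params):
    return -abs(params["maxDepth"] - 5) - abs(params["maxBins"] - 10)

candidates = {"maxDepth": [3, 5, 7], "maxBins": [5, 10]}
grid = build_grid(candidates)
best = grid_search(candidates, toy_score)
print(len(grid))
print(best)
```

The number of combinations grows multiplicatively with each parameter added, so the grid is usually kept small when every combination has to be cross-validated.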

Now let's see how we can apply both concepts using `spark.ml`.

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# fill null values
train = train.na.fill(0)
test = test.na.fill(0)

# Set the model
dtClassifierCV = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")

# Set the pipeline; labelConverter adds the 'predictedLabel' column
# that prepareForKaggle expects
pipelineCV = Pipeline(stages=[assembler, labelIndexer, dtClassifierCV, labelConverter])


# Prepare the evaluator and the CV configuration
evaluatorCV = BinaryClassificationEvaluator(labelCol="indexedLabel")
paramGrid = ParamGridBuilder().addGrid(dtClassifierCV.maxBins, [5,10]).build()
crossValidator = CrossValidator(estimator=pipelineCV, estimatorParamMaps=paramGrid, evaluator=evaluatorCV, numFolds=3)

cvModel = crossValidator.fit(train)

# Print information about the best model (stage 2 is the decision tree)
dtBestModelCV = cvModel.bestModel.stages[2]

print(dtBestModelCV)

In [None]:
#Predicting using the best model
dtPredictionCV = cvModel.transform(test)

In [None]:
# Save the predictions in the format expected by Kaggle
prepareForKaggle(dtPredictionCV, wd+"results-dt-CV-03")
# Result: 2.41491

# Ensembles

* Ensemble methods are techniques that combine multiple models to generate the final output.
* Ensembles of classification or regression models usually obtain better accuracy than single models
    * They are able to model the different potential behaviours within the original dataset
* Ensembles have been used since the early 90s, but they have gained popularity in recent years due to the results obtained in different machine learning competitions 
    * Netflix Challenge
    * Kaggle
   
Some of the most common ways to generate ensembles:

### Voting and Averaging
* The outputs of the different models are combined using one of the following techniques
    * Voting (classification)
    * Weighted Voting (classification)
    * Averaging (Regression)
    * Weighted Averaging (Regression)
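
These combination rules are simple enough to sketch in plain Python. The predictions and weights below are made up, and the helper names are hypothetical.

```python
from collections import Counter

def majority_vote(predictions):
    """Return the label predicted by most of the models."""
    return Counter(predictions).most_common(1)[0][0]

def weighted_vote(predictions, weights):
    """Return the label with the largest total weight."""
    totals = {}
    for label, weight in zip(predictions, weights):
        totals[label] = totals.get(label, 0.0) + weight
    return max(totals, key=totals.get)

def weighted_average(values, weights):
    """Weighted mean of the numeric outputs of the models."""
    return sum(v * w for v, w in zip(values, weights)) / float(sum(weights))

votes = ["s", "b", "s"]  # labels predicted by three models for one event
plain = majority_vote(votes)
weighted = weighted_vote(votes, [0.2, 0.7, 0.1])
average = weighted_average([1.0, 2.0, 4.0], [1.0, 1.0, 2.0])
print(plain)
print(weighted)
print(average)
```

Plain averaging is the special case of weighted averaging in which all the weights are equal. Note how the weights can overturn the majority: two models vote "s", but the single model voting "b" carries more weight.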

### Stacking Models
* Another machine learning method is used to calculate the final output (model combination)
* The outputs of the members of the ensemble are used as input for the model that will calculate the final output

### Bagging (bootstrap aggregation)
* The method is fairly simple in structure and consists of the steps summarized in the following picture, which illustrates the bagging approach on a small sample containing n = 3 observations. Each bootstrap data set contains n observations, sampled with replacement from the original data set. Each bootstrap data set is used to obtain an estimate of α.
<img src="Bagging.png">
* Then each model in the ensemble generates a prediction (α), and these predictions are averaged to give the bagged model’s prediction.
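
A minimal sketch of bagging in plain Python, assuming the quantity to estimate is simply the mean of the sample: the data values and the helper names are made up, and in a real ensemble the `estimator` would train a model on each bootstrap data set.

```python
import random

def bootstrap_sample(data, rng):
    """Draw len(data) observations with replacement."""
    return [rng.choice(data) for _ in data]

def bagged_estimate(data, estimator, n_models=100, seed=12345):
    """Average the estimates obtained on n_models bootstrap data sets."""
    rng = random.Random(seed)
    estimates = [estimator(bootstrap_sample(data, rng)) for _ in range(n_models)]
    return sum(estimates) / float(n_models)

def mean(values):
    return sum(values) / float(len(values))

data = [4.3, 2.4, 5.3]  # made-up sample with n = 3 observations
estimate = bagged_estimate(data, mean)
print(estimate)
```

Because the sampling is done with replacement, a bootstrap data set can contain the same observation several times and miss others entirely.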

### Boosting
* Boosting works in a similar way to bagging, but in this case the models are grown sequentially: each model is trained using information from the previously trained models. 
* Boosting does not involve bootstrap sampling; instead each model is fit on a modified version of the original data set, which emphasizes the samples that were misclassified.
* Boosting tends to achieve better accuracy than bagging-based models, but it also increases the risk of over-fitting
* Let's explain this with a simple example:
  *  Suppose we have just 5 samples [1,2,3,4,5]
  *  Initially each sample has a probability of 1/5 of being sampled
  *  After the 1st round [1,2,3] are well classified while [4,5] are misclassified
      * Then the algorithm will modify the sampling probabilities
          * Decreasing them for [1,2,3]
          * Increasing them for [4,5]
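
The example does not say how much the probabilities change. As one possible rule, the sketch below applies an AdaBoost-style update to the five samples; the choice of AdaBoost and the helper name `reweight` are assumptions made for this illustration.

```python
import math

def reweight(weights, misclassified):
    """One AdaBoost-style update of the sampling probabilities.

    Assumes the weighted error is strictly between 0 and 1.
    """
    error = sum(w for w, bad in zip(weights, misclassified) if bad)
    alpha = 0.5 * math.log((1.0 - error) / error)
    updated = [w * math.exp(alpha if bad else -alpha)
               for w, bad in zip(weights, misclassified)]
    total = sum(updated)
    return [w / total for w in updated]  # normalise so they sum to 1

weights = [1.0 / 5] * 5                            # samples [1,2,3,4,5]
misclassified = [False, False, False, True, True]  # [4,5] were misclassified
new_weights = reweight(weights, misclassified)
print([round(w, 3) for w in new_weights])
```

With this rule the probability of samples 4 and 5 rises from 1/5 to 1/4 each, while the probability of samples 1, 2 and 3 drops to 1/6 each.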


In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Set the model
rfClassifierCV = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")

# Set the pipeline; labelConverter adds the 'predictedLabel' column
# that prepareForKaggle expects
rfPipelineCV = Pipeline(stages=[assembler, labelIndexer, rfClassifierCV, labelConverter])


# Prepare the evaluator and the CV configuration
evaluatorCV = BinaryClassificationEvaluator(labelCol="indexedLabel")
paramGrid = ParamGridBuilder().addGrid(rfClassifierCV.numTrees, [5,10,15]).build()
crossValidator = CrossValidator(estimator=rfPipelineCV, estimatorParamMaps=paramGrid, evaluator=evaluatorCV, numFolds=3)

cvModel = crossValidator.fit(train)

In [None]:
#Predicting using the best model
rfPredictionCV = cvModel.transform(test)

# Save the predictions in the format expected by Kaggle
prepareForKaggle(rfPredictionCV, wd+"results-rf-CV-01")
# Result: 2.65322