# Machine Learning Example with PySpark

Now that you have a brief idea of Spark and SQLContext, you are ready to build your first machine learning program.

## Steps to build a Machine Learning program with PySpark

Step 1) Basic operation with PySpark

Step 2) Data preprocessing

Step 3) Build a data processing pipeline

Step 4) Build the classifier: logistic

Step 5) Train and evaluate the model

Step 6) Tune the hyperparameter

In this PySpark Machine Learning tutorial, we will use an HR employee dataset (hr_employee.csv). The purpose of this tutorial is to learn how to use PySpark.

Note that the dataset is small, so the computation may seem slow for its size. Spark is designed to process large amounts of data, and its performance advantage over other machine learning libraries grows as the dataset gets larger.

In [39]:
import findspark
findspark.init()
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [2]:
import pyspark
from pyspark import SparkContext
sc = SparkContext()

## Step 1) Basic operation with PySpark
First of all, you need to initialize the SQLContext if it has not been initiated yet.

In [3]:
from pyspark.sql import SQLContext
url = "hr_employee.csv"

from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

Then, you can read the CSV file with sqlContext.read.csv. Set inferSchema to True to tell Spark to guess the data type of each column automatically. By default, it is set to False.

In [4]:
df = sqlContext.read.csv(SparkFiles.get("hr_employee.csv"),header=True, inferSchema = True)

In [5]:
#Now let's take a look at the inferred schema

df.printSchema()

root
 |-- empid: integer (nullable = true)
 |-- satisfaction_level: double (nullable = true)
 |-- last_evaluation: double (nullable = true)
 |-- number_project: integer (nullable = true)
 |-- average_montly_hours: integer (nullable = true)
 |-- time_spend_company: integer (nullable = true)
 |-- Work_accident: integer (nullable = true)
 |-- promotion_last_5years: integer (nullable = true)
 |-- salary: string (nullable = true)
 |-- left: integer (nullable = true)



You can see the data with show.

In [6]:
df.show(5, truncate= False)

+-----+------------------+---------------+--------------+--------------------+------------------+-------------+---------------------+------+----+
|empid|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|promotion_last_5years|salary|left|
+-----+------------------+---------------+--------------+--------------------+------------------+-------------+---------------------+------+----+
|1    |0.38              |0.53           |2             |157                 |3                 |0            |0                    |low   |1   |
|2    |0.8               |0.86           |5             |262                 |6                 |0            |0                    |medium|1   |
|3    |0.11              |0.88           |7             |272                 |4                 |0            |0                    |medium|1   |
|4    |0.72              |0.87           |5             |223                 |5                 |0            |0            

If you don't set inferSchema to True, here is what happens to the types: they are all strings.

In [7]:
df_string = sqlContext.read.csv(SparkFiles.get("hr_employee.csv"),header=True, inferSchema = False)
df_string.printSchema()

root
 |-- empid: string (nullable = true)
 |-- satisfaction_level: string (nullable = true)
 |-- last_evaluation: string (nullable = true)
 |-- number_project: string (nullable = true)
 |-- average_montly_hours: string (nullable = true)
 |-- time_spend_company: string (nullable = true)
 |-- Work_accident: string (nullable = true)
 |-- promotion_last_5years: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- left: string (nullable = true)
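
The difference between the two schemas above can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark's actual inference logic: the sample data, `infer_type`, and `read_schema` are hypothetical names introduced only for illustration. It tries to parse each column as an integer, then as a double, and falls back to string.

```python
import csv
import io

# Hypothetical two-row sample that mimics a few columns of hr_employee.csv.
SAMPLE_CSV = """empid,satisfaction_level,salary
1,0.38,low
2,0.8,medium
"""

def infer_type(values):
    """Return 'integer', 'double' or 'string' for a list of raw CSV strings."""
    for type_name, parser in (("integer", int), ("double", float)):
        try:
            for value in values:
                parser(value)
            return type_name
        except ValueError:
            continue
    return "string"

def read_schema(text, infer_schema=False):
    """Map each column name to a type name, guessing types only on request."""
    rows = list(csv.DictReader(io.StringIO(text)))
    columns = rows[0].keys()
    if not infer_schema:
        # Without inference every column stays a string.
        return {name: "string" for name in columns}
    return {name: infer_type([row[name] for row in rows]) for name in columns}

schema_inferred = read_schema(SAMPLE_CSV, infer_schema=True)
schema_default = read_schema(SAMPLE_CSV)
```

With inference turned on, the sketch reports numeric types for the numeric columns; with it turned off, every column is reported as a string, which mirrors the two printSchema() outputs above.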



To convert the continuous variables to the right format, you can recast the columns. You can use withColumn to tell Spark which column to transform.

In [8]:
#Import all from 'sql.types'
from pyspark.sql.types import *

#Write a custom function to convert the data type of Dataframe columns
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    #Return only after every column has been cast
    return df

#List of numeric columns to cast (salary is categorical and stays a string)
CONTI_FEATURES = ['empid','satisfaction_level','last_evaluation','number_project','average_montly_hours','time_spend_company','Work_accident','promotion_last_5years','left']

#Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())

#Check the dataset
df_string.printSchema()

root
 |-- empid: float (nullable = true)
 |-- satisfaction_level: float (nullable = true)
 |-- last_evaluation: float (nullable = true)
 |-- number_project: float (nullable = true)
 |-- average_montly_hours: float (nullable = true)
 |-- time_spend_company: float (nullable = true)
 |-- Work_accident: float (nullable = true)
 |-- promotion_last_5years: float (nullable = true)
 |-- salary: string (nullable = true)
 |-- left: float (nullable = true)



In [9]:
from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = StringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

root
 |-- empid: integer (nullable = true)
 |-- satisfaction_level: double (nullable = true)
 |-- last_evaluation: double (nullable = true)
 |-- number_project: integer (nullable = true)
 |-- average_montly_hours: integer (nullable = true)
 |-- time_spend_company: integer (nullable = true)
 |-- Work_accident: integer (nullable = true)
 |-- promotion_last_5years: integer (nullable = true)
 |-- salary: string (nullable = true)
 |-- left: integer (nullable = true)



## Select columns

You can select and show the rows with select and the names of the features. 

Below, empid and satisfaction_level are selected.

In [10]:
df.select('empid','satisfaction_level').show(5)

+-----+------------------+
|empid|satisfaction_level|
+-----+------------------+
|    1|              0.38|
|    2|               0.8|
|    3|              0.11|
|    4|              0.72|
|    5|              0.37|
+-----+------------------+
only showing top 5 rows



## Count by group

If you want to count the number of occurrences by group, you can chain groupBy() and count(). In the PySpark example below, you count the number of rows for each value of time_spend_company.

In [11]:
df.groupby("time_spend_company").count().sort("count",ascending=True).show()

+------------------+-----+
|time_spend_company|count|
+------------------+-----+
|                 8|  162|
|                 7|  188|
|                10|  214|
|                 6|  718|
|                 5| 1473|
|                 4| 2557|
|                 2| 3244|
|                 3| 6443|
+------------------+-----+
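
What the chained call computes can be sketched in plain Python. This is only an illustration on a hypothetical list of seven values, not the way Spark executes the query.

```python
from collections import Counter

# Hypothetical values of time_spend_company for seven employees.
time_spend_company = [3, 2, 3, 4, 3, 2, 5]

# groupBy(...).count(): one counter entry per distinct value.
counts = Counter(time_spend_company)

# sort("count", ascending=True): order the groups from rarest to most common.
sorted_counts = sorted(counts.items(), key=lambda item: item[1])
```

Each tuple in `sorted_counts` pairs a group value with its row count, in the same ascending order as the table above.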



## Describe the data

To get summary statistics of the data, you can use describe(). It will compute the:

1. count

2. mean

3. standard deviation

4. min

5. max

In [12]:
df.describe().show()

+-------+----------------+------------------+-------------------+------------------+--------------------+------------------+-------------------+---------------------+------+-------------------+
|summary|           empid|satisfaction_level|    last_evaluation|    number_project|average_montly_hours|time_spend_company|      Work_accident|promotion_last_5years|salary|               left|
+-------+----------------+------------------+-------------------+------------------+--------------------+------------------+-------------------+---------------------+------+-------------------+
|  count|           14999|             14997|              14999|             14999|               14999|             14999|              14999|                14999| 14999|              14999|
|   mean|          7500.0|0.6128625725145038| 0.7161017401159978|  3.80305353690246|   201.0503366891126| 3.498233215547703| 0.1446096406427095| 0.021268084538969265|  null| 0.2380825388359224|
| stddev|4329.98267894919|0.24

If you want the summary statistic of only one column, add the name of the column inside describe()

In [13]:
df.describe('number_project').show()

+-------+------------------+
|summary|    number_project|
+-------+------------------+
|  count|             14999|
|   mean|  3.80305353690246|
| stddev|1.2325923553183513|
|    min|                 2|
|    max|                 7|
+-------+------------------+
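
The five statistics can be reproduced for a single column with the standard library. The values below are hypothetical, and the sketch assumes that the stddev row is the sample standard deviation (dividing by n - 1), which is our understanding of what describe() reports.

```python
import statistics

# Hypothetical number_project values for six employees.
number_project = [2, 5, 7, 5, 2, 3]

summary = {
    "count": len(number_project),
    "mean": statistics.mean(number_project),
    # Sample standard deviation (divides by n - 1).
    "stddev": statistics.stdev(number_project),
    "min": min(number_project),
    "max": max(number_project),
}
```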



## Crosstab computation
On some occasions, it can be interesting to see the descriptive statistics between two columns. For instance, you can count the number of employees for each combination of salary level and average monthly hours. This operation is called a crosstab.

In [14]:
#df.crosstab('salary','average_montly_hours').sort("salary_average_montly_hours").show()
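
A crosstab is a table of pair counts. The sketch below builds one in plain Python from hypothetical (salary, average_montly_hours) pairs; it illustrates the idea only and does not reproduce Spark's output format.

```python
from collections import Counter

# Hypothetical (salary, average_montly_hours) pairs for six employees.
rows = [("low", 157), ("medium", 262), ("low", 157),
        ("high", 262), ("low", 262), ("medium", 157)]

pair_counts = Counter(rows)
salary_levels = sorted({salary for salary, _ in rows})
hour_values = sorted({hours for _, hours in rows})

# One row per salary level, one column per average_montly_hours value.
crosstab = {
    salary: {hours: pair_counts[(salary, hours)] for hours in hour_values}
    for salary in salary_levels
}
```

Combinations that never occur get a count of 0, so every row of the table has the same columns.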

## Drop column
There are two intuitive APIs for dropping data:

1. drop(): Drop a column
2. dropna(): Drop rows that contain NA values

Below, you drop the column empid.

In [15]:
df.drop('empid').columns

['satisfaction_level',
 'last_evaluation',
 'number_project',
 'average_montly_hours',
 'time_spend_company',
 'Work_accident',
 'promotion_last_5years',
 'salary',
 'left']

## Filter data
You can use filter() to apply descriptive statistics to a subset of the data.

For instance, you can count the number of employees whose number_project is below 40.

In [16]:
df.filter(df.number_project < 40).count()

14999

## Descriptive statistics by group
Finally, you can group the data and compute statistical operations such as the mean for each group.

In [17]:
df.groupby('last_evaluation').agg({'average_montly_hours': 'mean'}).show()

+---------------+-------------------------+
|last_evaluation|avg(average_montly_hours)|
+---------------+-------------------------+
|           0.66|       194.07207207207207|
|           0.84|       224.50680272108843|
|           0.87|       224.03987730061348|
|           0.93|       223.51301115241637|
|           0.89|       220.09459459459458|
|           0.39|        177.1153846153846|
|           0.79|        218.9377593360996|
|           0.72|       199.13270142180096|
|            0.7|        197.4131455399061|
|           0.54|       173.74857142857144|
|           0.45|       157.66086956521738|
|           0.61|       198.05555555555554|
|           0.99|       215.01162790697674|
|            1.0|       220.70671378091873|
|            0.6|       196.89592760180994|
|           0.64|       200.84255319148937|
|            0.8|        222.6374501992032|
|           0.63|       198.47457627118644|
|           0.92|        222.4089219330855|
|           0.42|       175.5178

## Step 2) Data preprocessing
Data preprocessing is a critical step in machine learning. Once you remove garbage data, you can get meaningful insights.

In [18]:
df.printSchema()

root
 |-- empid: integer (nullable = true)
 |-- satisfaction_level: double (nullable = true)
 |-- last_evaluation: double (nullable = true)
 |-- number_project: integer (nullable = true)
 |-- average_montly_hours: integer (nullable = true)
 |-- time_spend_company: integer (nullable = true)
 |-- Work_accident: integer (nullable = true)
 |-- promotion_last_5years: integer (nullable = true)
 |-- salary: string (nullable = true)
 |-- left: integer (nullable = true)



### View the columns of a single row

In [19]:
# COLUMNS = ['','','',............columns according to your arrangement]
#df = df.select(COLUMNS)


df.first()

Row(empid=1, satisfaction_level=0.38, last_evaluation=0.53, number_project=2, average_montly_hours=157, time_spend_company=3, Work_accident=0, promotion_last_5years=0, salary='low', left=1)

## Step 3) Build a data processing pipeline
Similar to scikit-learn, PySpark has a pipeline API.

A pipeline is very convenient for maintaining the structure of the data. You push the data into the pipeline. Inside the pipeline, various operations are performed, and the output is used to feed the algorithm.

For instance, one universal transformation in machine learning consists of converting a string column with one-hot encoding, i.e., one column per group. A one-hot encoded matrix is usually mostly zeroes.

The steps to transform the data are very similar to scikit-learn. You need to:

1. Index the string to numeric
2. Create the one hot encoder
3. Transform the data

Two APIs do the job: StringIndexer, OneHotEncoder
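
The two steps can be sketched in plain Python on a hypothetical salary column. The sketch assumes labels are indexed by descending frequency, which is StringIndexer's default ordering; the alphabetical tie-break and the helper `one_hot` are assumptions of this illustration. The encoding keeps one column per label, which corresponds to dropLast=False.

```python
from collections import Counter

# Hypothetical salary column.
salary = ["low", "medium", "low", "high", "medium", "low"]

# Step 1: index the strings. Labels are ranked by descending frequency
# (ties broken alphabetically here), so the most common label gets index 0.
frequency = Counter(salary)
labels = sorted(frequency, key=lambda label: (-frequency[label], label))
index_of = {label: position for position, label in enumerate(labels)}
salary_index = [index_of[value] for value in salary]

# Step 2: one-hot encode. Each index becomes a vector with a single 1.
def one_hot(index, size):
    vector = [0] * size
    vector[index] = 1
    return vector

salary_vec = [one_hot(index, len(labels)) for index in salary_index]
```

Every row of `salary_vec` contains exactly one 1, which is why one-hot matrices are mostly zeroes.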



#### 1. Select the string column to index

The inputCol is the name of the column in the dataset. outputCol is the new name given to the transformed column.

In [20]:
StringIndexer(inputCol = "salary", outputCol="salary_encoded")

StringIndexer_8a17e52aaf52

#### 2. Fit the data and transform it

Import the required libraries.

In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [23]:
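#Fit an indexer instance (not the StringIndexer class itself), then transform
stringIndexer = StringIndexer(inputCol="salary", outputCol="salary_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)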

#### 3. Create the new columns based on the group

For instance, if there are 10 groups in the feature, the new matrix will have 10 columns, one for each group.

In [24]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

OneHotEncoder(dropLast=False, inputCol="salary_encoded", outputCol="salary_vec")

OneHotEncoder_c3679b9e9732

## Build the pipeline
You will build a pipeline to convert all the features prepared above and add them to the final dataset. The pipeline will have four operations, but feel free to add as many operations as you want.

1. Encode the categorical data

2. Index the label feature

3. Add continuous variable

4. Assemble the steps.

Each step is stored in a list named stages. This list tells the Pipeline which operations to perform and in what order.

#### 1. Encode the categorical data

This step is exactly the same as the above example, except that you loop over all the categorical features.

In [26]:
from pyspark.ml import Pipeline
CATE_FEATURES = ['salary']
stages = [] #stages in our pipeline

for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol= categoricalCol, outputCol= categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                    outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

#### 2. Index the label feature

Spark, like many other libraries, does not accept string values for the label. You convert the label feature with StringIndexer and add it to the list stages.

In [28]:
# Convert the label column (left) into label indices using StringIndexer

label_stringIdx = StringIndexer(inputCol="left", outputCol="newlabel")
stages += [label_stringIdx]

#### 3. Add continuous variable

The inputCols of the VectorAssembler is a list of columns. You can create a new list containing all the new columns. The code below populates the list with the encoded categorical features and the continuous features.

In [29]:
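# Skip the identifier, the raw string column and the label
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + [c for c in CONTI_FEATURES if c not in ('empid', 'salary', 'left')]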

#### 4. Assemble the steps.

Finally, you pass all the input columns to the VectorAssembler and add it to stages.

In [37]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
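
Conceptually, the assembler concatenates the listed columns of each row into one feature vector. The plain-Python sketch below illustrates this on two hypothetical rows; `assemble` is a made-up helper, and the real VectorAssembler produces Spark vector objects rather than Python lists.

```python
# Hypothetical rows after the encoding step.
rows = [
    {"salaryclassVec": [1, 0, 0], "satisfaction_level": 0.38, "number_project": 2},
    {"salaryclassVec": [0, 1, 0], "satisfaction_level": 0.80, "number_project": 5},
]
input_cols = ["salaryclassVec", "satisfaction_level", "number_project"]

def assemble(row, columns):
    """Flatten the listed columns of one row into a single feature list."""
    features = []
    for name in columns:
        value = row[name]
        if isinstance(value, list):
            features.extend(value)          # vector column: append every element
        else:
            features.append(float(value))   # scalar column: append one number
    return features

features = [assemble(row, input_cols) for row in rows]
```

The order of `input_cols` decides the position of each value in the assembled vector.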

Now that all the steps are ready, you push the data to the pipeline.

In [41]:
#Create a pipeline

#pipeline = Pipeline(stages = stages)
#pipelineModel = pipeline.fit(df)
#model = pipelineModel.transform(df)
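
How a pipeline chains its stages can be sketched in plain Python. The classes below (`AddOne`, `Center`, `SimplePipeline`) are hypothetical and work on lists of numbers. Unlike Spark's Pipeline, whose fit() returns a separate fitted model, this simplified version stores the fitted state on the stages themselves.

```python
class AddOne:
    """Hypothetical stage: adds 1 to every value."""
    def fit(self, data):
        return self  # nothing to learn

    def transform(self, data):
        return [x + 1 for x in data]

class Center:
    """Hypothetical stage: subtracts the mean learned during fit."""
    def fit(self, data):
        self.mean = sum(data) / len(data)
        return self

    def transform(self, data):
        return [x - self.mean for x in data]

class SimplePipeline:
    """Runs stages in order, fitting each on the output of the previous one."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        for stage in self.stages:
            data = stage.fit(data).transform(data)
        return self

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

demo_stages = []
demo_stages += [AddOne(), Center()]
result = SimplePipeline(demo_stages).fit([1, 2, 3]).transform([1, 2, 3])
```

The list order matters: `Center` learns its mean from the output of `AddOne`, just as a later stage in stages consumes the columns produced by earlier ones.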