In [34]:
sc

# ML Package

We move to the ML part of Spark that operates
strictly on DataFrames. Also, according to the Spark documentation, the primary
machine learning API for Spark is now the DataFrame-based set of models contained
in the spark.ml package.

## Transformer

The Transformer class, like the name suggests, transforms your data by (normally)
appending a new column to your DataFrame.

There are many Transformers offered in the spark.ml.feature and we will briefly
describe them here (before we use some of them later in this chapter):

- Binarizer: Given a threshold, the method takes a continuous variable and transforms it into a binary one.
- Bucketizer: Similar to the Binarizer, this method takes a list of thresholds (the splits parameter) and transforms a continuous variable into a multinomial one.
- MinMaxScaler: This is similar to the MaxAbsScaler with the difference that it scales the data to be in the [0.0, 1.0] range.
- Normalizer: This method scales the data to be of unit norm using the p-norm value (by default, it is L2).
- OneHotEncoder: This method encodes a categorical column to a column of binary vectors.
- PCA: Performs the data reduction using principal component analysis.
- StandardScaler: Standardizes the column to have a 0 mean and standard deviation equal to 1.
- StopWordsRemover: Removes stop words (such as 'the' or 'a') from a tokenized text.

## Estimators

Estimators can be thought of as statistical models that need to be estimated to make
predictions or classify your observations. Estimators can be divided into three distinct groups of problems: Classification (Supervised), Regression (Supervised) and Clustering (Unsupervised).

## Pipeline

A Pipeline in PySpark ML is a concept of an end-to-end transformation-estimation
process (with distinct stages) that ingests some raw data (in a DataFrame form),
performs the necessary data carpentry (transformations), and finally estimates a
statistical model (estimator).

## Predicting the chances of infant survival with ML package

In [35]:
import pyspark.sql.types as typ

In [36]:
birthsFilePath = './data/births_transformed.csv.gz'

In [37]:
births = spark.read.csv(birthsFilePath,
                       header = True,
                       inferSchema = True)

In [38]:
births.printSchema()

root
 |-- INFANT_ALIVE_AT_REPORT: integer (nullable = true)
 |-- BIRTH_PLACE: integer (nullable = true)
 |-- MOTHER_AGE_YEARS: integer (nullable = true)
 |-- FATHER_COMBINED_AGE: integer (nullable = true)
 |-- CIG_BEFORE: integer (nullable = true)
 |-- CIG_1_TRI: integer (nullable = true)
 |-- CIG_2_TRI: integer (nullable = true)
 |-- CIG_3_TRI: integer (nullable = true)
 |-- MOTHER_HEIGHT_IN: integer (nullable = true)
 |-- MOTHER_PRE_WEIGHT: integer (nullable = true)
 |-- MOTHER_DELIVERY_WEIGHT: integer (nullable = true)
 |-- MOTHER_WEIGHT_GAIN: integer (nullable = true)
 |-- DIABETES_PRE: integer (nullable = true)
 |-- DIABETES_GEST: integer (nullable = true)
 |-- HYP_TENS_PRE: integer (nullable = true)
 |-- HYP_TENS_GEST: integer (nullable = true)
 |-- PREV_BIRTH_PRETERM: integer (nullable = true)



- Creating transformers:

In [39]:
import pyspark.ml.feature as ft

Categorize <i>BIRTH_PLACE</i> numeric variable:

In [40]:
encoder = ft.OneHotEncoder(inputCol = 'BIRTH_PLACE',
                          outputCol = 'BIRTH_PLACE_VEC')

Create a single column with all the features collated together:

In [41]:
featuresCreator = ft.VectorAssembler(
    inputCols = births.columns[2:] +\
    [encoder.getOutputCol()],
    outputCol = 'features'
)

- Creating an estimator:

In [42]:
import pyspark.ml.classification as cl

A simple Logistic Regression model:

In [43]:
logistic = cl.LogisticRegression(featuresCol = 'features',
            maxIter = 10,
            regParam = 0.01,
            labelCol = 'INFANT_ALIVE_AT_REPORT'
)

- Creating a Pipeline:

In [44]:
from pyspark.ml import Pipeline

Here is how the pipeline should look like conceptually:

<img src="imgs/pipeline.png">

In [45]:
pipeline = Pipeline(
    stages = [encoder,
             featuresCreator,
             logistic]
)

### Fitting the model

Split the data into training and test sets:

In [46]:
births_train, births_test = births.randomSplit(
    [0.7, 0.3], 42)

In [47]:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)

In [48]:
test_model.show(3)

+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+---------------+--------------------+--------------------+--------------------+----------+
|INFANT_ALIVE_AT_REPORT|BIRTH_PLACE|MOTHER_AGE_YEARS|FATHER_COMBINED_AGE|CIG_BEFORE|CIG_1_TRI|CIG_2_TRI|CIG_3_TRI|MOTHER_HEIGHT_IN|MOTHER_PRE_WEIGHT|MOTHER_DELIVERY_WEIGHT|MOTHER_WEIGHT_GAIN|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|BIRTH_PLACE_VEC|            features|       rawPrediction|         probability|prediction|
+----------------------+-----------+----------------+-------------------+----------+---------+---------+---------+----------------+-----------------+----------------------+------------------+------------+-------------+------------+-------------+------------------+---------------+----------

### Evaluating the performance of the model

In [49]:
import pyspark.ml.evaluation as ev

In [50]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol = 'probability',
    labelCol = 'INFANT_ALIVE_AT_REPORT'
)

In [51]:
print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderPR'}))

0.7426157878507577
0.7141632790662186


### Saving the Pipeline

PySpark allows you to save the Pipeline definition for later use. It not only saves
the pipeline structure, but also all the definitions of all the Transformers and
Estimators:

In [52]:
pipelinePath = './data/infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

Reading pipeline:

In [53]:
loadedPipeline = Pipeline.load(pipelinePath)

In [54]:
loadedPipeline \
 .fit(births_train)\
 .transform(births_test)\
 .take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE=1, MOTHER_AGE_YEARS=12, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=62, MOTHER_PRE_WEIGHT=145, MOTHER_DELIVERY_WEIGHT=152, MOTHER_WEIGHT_GAIN=7, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 12.0, 1: 99.0, 6: 62.0, 7: 145.0, 8: 152.0, 9: 7.0, 16: 1.0}), rawPrediction=DenseVector([0.9489, -0.9489]), probability=DenseVector([0.7209, 0.2791]), prediction=0.0)]

### Saving the Model

If you, however, want to save the estimated model, you can also do that; instead of
saving the Pipeline, you need to save the PipelineModel.

In [55]:
from pyspark.ml import PipelineModel

In [56]:
modelPath = './data/infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

Reading the model:

In [57]:
loadedPipelineModel = PipelineModel.load(modelPath)

In [58]:
loadedPipelineModel\
    .transform(births_test).take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE=1, MOTHER_AGE_YEARS=12, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=62, MOTHER_PRE_WEIGHT=145, MOTHER_DELIVERY_WEIGHT=152, MOTHER_WEIGHT_GAIN=7, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 12.0, 1: 99.0, 6: 62.0, 7: 145.0, 8: 152.0, 9: 7.0, 16: 1.0}), rawPrediction=DenseVector([0.9489, -0.9489]), probability=DenseVector([0.7209, 0.2791]), prediction=0.0)]

## Parameter hyper-tuning

A concept of parameter hyper-tuning is to find the best parameters of the model: for
example, the maximum number of iterations needed to properly estimate the logistic
regression model or maximum depth of a decision tree.

- Grid search:

Grid search is an exhaustive algorithm that loops through the list of defined
parameter values, estimates separate models, and chooses the best one given
some evaluation metric.

In [59]:
import pyspark.ml.tuning as tune

In [60]:
logistic = cl.LogisticRegression(
    labelCol = 'INFANT_ALIVE_AT_REPORT')
grid = tune.ParamGridBuilder()\
    .addGrid(logistic.maxIter, [2, 10, 50])\
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3])\
    .build()

In [61]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol = 'probability',
    labelCol = 'INFANT_ALIVE_AT_REPORT')

The CrossValidator needs the estimator, the estimatorParamMaps, and the
evaluator to do its job. The model loops through the grid of values, estimates
the models, and compares their performance using the evaluator

In [62]:
cv = tune.CrossValidator(
    estimator = logistic,
    estimatorParamMaps = grid,
    evaluator = evaluator
)

BIRTHS_PLACE still not encoded:

In [63]:
pipeline = Pipeline(stages = [encoder, featuresCreator])
data_transformer = pipeline.fit(births_train)

In [64]:
cvModel = cv.fit(data_transformer.transform(births_test))

We can now use it to see if it
performed better than our previous model:

In [65]:
data_train = data_transformer\
    .transform(births_test)
results = cvModel.transform(data_train)
print(evaluator.evaluate(results,
    {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results,
    {evaluator.metricName: 'areaUnderPR'}))

0.7434926434958825
0.7144290754256393


Best parameters:

In [66]:
results = [
 (
 [
 {key.name: paramValue}
 for key, paramValue
 in zip(
 params.keys(),
 params.values())
 ], metric
 )
 for params, metric
 in zip(
 cvModel.getEstimatorParamMaps(),
 cvModel.avgMetrics
 )
]
sorted(results,
 key=lambda el: el[1],
 reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.7411920462870111)

## Classification

In [69]:
import pyspark.sql.functions as fn

Cast label column as double type:

In [70]:
births = births.withColumn(
    'INFANT_ALIVE_AT_REPORT',
    fn.col('INFANT_ALIVE_AT_REPORT').cast(typ.DoubleType())
)
births_train, births_test = births.randomSplit(
    [0.7, 0.3], seed = 666
)

Pipeline:

In [80]:
classifier = cl.RandomForestClassifier(
    numTrees = 10,
    maxDepth = 5,
    labelCol = 'INFANT_ALIVE_AT_REPORT'
)
pipeline = Pipeline(
    stages = [
        encoder,
        featuresCreator,
        classifier]
)

Fitting and testing the model:

In [81]:
model = pipeline.fit(births_train)

In [82]:
test = model.transform(births_test)

Evaluating:

In [83]:
evaluator = ev.BinaryClassificationEvaluator(
    labelCol = 'INFANT_ALIVE_AT_REPORT'
)

In [84]:
print(evaluator.evaluate(test,
    {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test,
    {evaluator.metricName: 'areaUnderPR'}))

0.7647280689555164
0.7469692302466407


## Clustering

In [138]:
import pyspark.ml.clustering as clus

K-Means:

In [143]:
kmeans = clus.KMeans(k = 5,
                    featuresCol = 'features')
pipeline = Pipeline(stages = [
    encoder,
    featuresCreator,
    kmeans
])

In [144]:
model = pipeline.fit(births_train)

In [145]:
test = model.transform(births_test)

Differences between clusters:

In [149]:
clust = test.groupBy('prediction')\
    .agg({
        '*': 'count',
        'MOTHER_HEIGHT_IN': 'avg'
})
clust.show()

+----------+---------------------+--------+
|prediction|avg(MOTHER_HEIGHT_IN)|count(1)|
+----------+---------------------+--------+
|         1|    83.91154791154791|     407|
|         3|    66.64658634538152|     249|
|         4|    64.31597357170618|   10292|
|         2|    67.69473684210526|     475|
|         0|    64.43472584856397|    2298|
+----------+---------------------+--------+



## Regression

In [150]:
features = ['MOTHER_AGE_YEARS','MOTHER_HEIGHT_IN',
 'MOTHER_PRE_WEIGHT','DIABETES_PRE',
 'DIABETES_GEST','HYP_TENS_PRE',
 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',
 'CIG_BEFORE','CIG_1_TRI', 'CIG_2_TRI',
 'CIG_3_TRI'
 ]

In [151]:
featuresCreator = ft.VectorAssembler(
    inputCols = [col for col in features[1:]],
    outputCol = 'features'
)

Select only the top six most important features using Chi Square Test:

In [152]:
selector = ft.ChiSqSelector(
    numTopFeatures = 6,
    outputCol = 'selectedFeatures',
    labelCol = 'MOTHER_WEIGHT_GAIN'
)

We will use the gradient boosted trees regressor:

In [154]:
import pyspark.ml.regression as reg

In [155]:
regressor = reg.GBTRegressor(
    maxIter = 15,
    maxDepth = 3,
    labelCol = 'MOTHER_WEIGHT_GAIN'
)

In [156]:
pipeline = Pipeline(stages = [
    featuresCreator,
    selector,
    regressor
])

In [157]:
weightGain = pipeline.fit(births_train)

Evaluation:

In [158]:
evaluator = ev.RegressionEvaluator(
    predictionCol = 'prediction',
    labelCol = 'MOTHER_WEIGHT_GAIN'
)

In [165]:
print(evaluator.evaluate(
    weightGain.transform(births_test),
    {evaluator.metricName: 'r2'}))

0.4883122086226992
