# Getting started with MLlib - binary classification example


This tutorial is designed to get you started with Apache Spark MLlib. It investigates a binary classification problem - can you predict if an individual's income is greater than $50,000 based on demographic data? The dataset is from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult) and is provided with Databricks Runtime. This notebook demonstrates some of the capabilities available in MLlib, including tools for data preprocessing, machine learning pipelines, and several different machine learning algorithms.

This notebook includes the following steps:

1. Load the dataset
2. Feature preprocessing
3. Define the model
4. Build the pipeline
5. Evaluate the model
6. Hyperparameter tuning
7. Make predictions and evaluate model performance

## Requirements
Databricks Runtime 7.0 or above or Databricks Runtime 7.0 ML or above. If you are running Databricks Runtime 6.x or Databricks Runtime 6.x ML, see ([AWS](https://docs.databricks.com/getting-started/spark/machine-learning.html)|[Azure](https://docs.microsoft.com/azure/databricks/getting-started/spark/machine-learning/)) for the correct notebook.

## Step 1. Load the dataset

Use Databricks utilities to view the first few rows of the data.

In [0]:
%fs head --maxBytes=1024 databricks-datasets/adult/adult.data

Because the dataset does not include column names, create a schema to assign column names and datatypes. 

In [0]:
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""

dataset = spark.read.csv("/databricks-datasets/adult/adult.data", schema=schema)

Randomly split data into training and test sets, and set seed for reproducibility.

It's best to split the data before doing any preprocessing. This allows the test dataset to more closely simulate new data when we evaluate the model.

In [0]:
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
print(trainDF.cache().count()) # Cache because accessing training data multiple times
print(testDF.count())
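
To see why the seed matters, here is a plain-Python sketch of a seeded random split. It is only an illustration, not how Spark implements `randomSplit`, and the helper name `random_split` is hypothetical. Like `randomSplit`, it assigns each row independently, so the split sizes only approximate the requested weights.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent, seeded coin flip."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for the rows of a dataset
train_a, test_a = random_split(rows, 0.8, seed=42)
train_b, test_b = random_split(rows, 0.8, seed=42)

print(len(train_a), len(test_a))  # close to 800 and 200, but not exactly
print(train_a == train_b)         # the same seed reproduces the same split
```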

Let's review the data.

In [0]:
display(trainDF)

What's the distribution of the number of `hours_per_week`?

In [0]:
display(trainDF.select("hours_per_week").summary())

How about `education` status?

In [0]:
display(trainDF
        .groupBy("education")
        .count()
        .sort("count", ascending=False))

## Background: Transformers, estimators, and pipelines

Three important concepts in MLlib machine learning that are illustrated in this notebook are **Transformers**, **Estimators**, and **Pipelines**. 

- **Transformer**: Takes a DataFrame as input, and returns a new DataFrame. Transformers do not learn any parameters from the data and simply apply rule-based transformations to either prepare data for model training or generate predictions using a trained MLlib model. You call a transformer with a `.transform()` method.

- **Estimator**: Learns (or "fits") parameters from your DataFrame via a `.fit()` method and returns a Model, which is a transformer.

- **Pipeline**: Combines multiple steps into a single workflow that can be easily run. Creating a machine learning model typically involves setting up many different steps and iterating over them. Pipelines help you automate this process.

For more information:
[ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html#ml-pipelines)
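
The relationship between these three concepts can be sketched in plain Python. The classes below (`MeanImputer`, `Doubler`, `SimplePipeline`, and so on) are hypothetical stand-ins that operate on lists rather than DataFrames; they are not MLlib classes and only mirror the `.fit()` / `.transform()` pattern described above.

```python
class MeanImputer:
    """Estimator: learns the mean of the known values in fit()."""
    def fit(self, rows):
        known = [r for r in rows if r is not None]
        return MeanImputerModel(sum(known) / len(known))

class MeanImputerModel:
    """Model returned by the estimator; it is itself a transformer."""
    def __init__(self, mean):
        self.mean = mean
    def transform(self, rows):
        return [self.mean if r is None else r for r in rows]

class Doubler:
    """Transformer: applies a fixed rule, so there is nothing to fit."""
    def transform(self, rows):
        return [2 * r for r in rows]

class SimplePipeline:
    """Estimator that fits each stage in order on the output of the previous one."""
    def __init__(self, stages):
        self.stages = stages
    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # estimators are fitted first
                stage = stage.fit(rows)
            rows = stage.transform(rows)   # pass transformed data downstream
            fitted.append(stage)
        return SimplePipelineModel(fitted)

class SimplePipelineModel:
    """Fitted pipeline: a transformer that chains the fitted stages."""
    def __init__(self, stages):
        self.stages = stages
    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

model = SimplePipeline([MeanImputer(), Doubler()]).fit([1.0, None, 3.0])
result = model.transform([None, 5.0])
print(result)  # the missing value is filled with the training mean (2.0), then doubled
```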

## Step 2. Feature preprocessing 

The goal of this notebook is to build a model that predicts the `income` level from the features included in the dataset (education level, marital status, occupation, and so on). The first step is to manipulate, or preprocess, the features so they are in the format MLlib requires.

### Convert categorical variables to numeric

Some machine learning algorithms, such as linear and logistic regression, require numeric features. The Adult dataset includes categorical features such as education, occupation, and marital status. 

The following code block illustrates how to use `StringIndexer` and `OneHotEncoder` to convert categorical variables into a set of numeric variables that only take on values 0 and 1. 

- `StringIndexer` converts a column of string values to a column of label indexes. For example, it might convert the values "red", "blue", and "green" to 0, 1, and 2. 
- `OneHotEncoder` maps a column of category indices to a column of binary vectors, with at most one "1" in each row that indicates the category index for that row.

One-hot encoding in Spark is a two-step process. You first use the StringIndexer, followed by the OneHotEncoder. The following code block defines the StringIndexer and OneHotEncoder but does not apply them to any data yet.

For more information:   
[StringIndexer](http://spark.apache.org/docs/latest/ml-features.html#stringindexer)   
[OneHotEncoder](https://spark.apache.org/docs/latest/ml-features.html#onehotencoder)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categoricalCols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex"]

# The following two lines define estimators. Fitting an estimator returns a model (a transformer) that we later apply to the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]) 
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 

# The label column ("income") is also a string value - it has two possible values, "<=50K" and ">50K". 
# Convert it to a numeric value using StringIndexer.
labelToIndex = StringIndexer(inputCol="income", outputCol="label")
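
The two encoding steps can be worked through by hand on a toy column. The sketch below is plain Python, not MLlib code, and the helper names are hypothetical. It assumes the default settings: indices assigned by descending frequency (ties broken alphabetically) and the last category dropped from the one-hot vector, which is why a row can contain no "1" at all.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

colors = ["red", "blue", "red", "green", "red", "blue"]
mapping = fit_string_index(colors)
encoded = {c: one_hot(i, len(mapping)) for c, i in mapping.items()}

print(mapping)  # {'red': 0, 'blue': 1, 'green': 2}
print(encoded)  # {'red': [1.0, 0.0], 'blue': [0.0, 1.0], 'green': [0.0, 0.0]}
```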

In this notebook, we'll build a pipeline combining all of our feature engineering and modeling steps. But let's take a minute to look more closely at how estimators and transformers work by applying the `stringIndexer` estimator that we created in the previous code block.

You can call the `.fit()` method to return a `StringIndexerModel`, which you can then use to transform the dataset. 

The `.transform()` method of `StringIndexerModel` returns a new DataFrame with the new columns appended. Scroll right to see the new columns if necessary. 

For more information: [StringIndexerModel](https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/feature/StringIndexerModel.html)

In [0]:
stringIndexerModel = stringIndexer.fit(trainDF)
display(stringIndexerModel.transform(trainDF))

### Combine all feature columns into a single feature vector

Most MLlib algorithms require a single features column as input. Each row in this column contains a vector of data points corresponding to the set of features used for prediction. 

MLlib provides the `VectorAssembler` transformer to create a single vector column from a list of columns.

The following code block illustrates how to use VectorAssembler.

For more information: [VectorAssembler](https://spark.apache.org/docs/latest/ml-features.html#vectorassembler)

In [0]:
from pyspark.ml.feature import VectorAssembler

# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
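
Conceptually, assembling features means concatenating scalar columns and vector-valued columns, in order, into one flat vector per row. The following plain-Python sketch shows the idea; the `assemble` helper and the row values are made up for illustration and are not part of MLlib or the dataset.

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector-valued columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # vector column: append every element
        else:
            features.append(float(value))             # scalar column: append one element
    return features

# Hypothetical row with two one-hot encoded columns and two numeric columns.
row = {"sexOHE": [1.0], "raceOHE": [0.0, 1.0, 0.0, 0.0], "age": 39, "hours_per_week": 40}
features = assemble(row, ["sexOHE", "raceOHE", "age", "hours_per_week"])
print(features)  # [1.0, 0.0, 1.0, 0.0, 0.0, 39.0, 40.0]
```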

## Step 3. Define the model

This notebook uses a [logistic regression](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression) model. 

For the PySpark API reference, see [LogisticRegression](https://api-docs.databricks.com/python/pyspark/latest/api/pyspark.ml.classification.LogisticRegression.html).

In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)
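
As a reminder of what a fitted binary logistic regression model computes, the sketch below scores one feature vector in plain Python. The weights, intercept, and helper names are hypothetical; a real model learns the weights from the training data. The prediction uses a 0.5 threshold, which is assumed here to match the model's default.

```python
import math

def predict_probability(weights, intercept, features):
    """Probability of the positive class: sigmoid of the linear margin."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-margin))

def predict_label(probability, threshold=0.5):
    """Predict 1.0 only when the probability exceeds the threshold."""
    return 1.0 if probability > threshold else 0.0

weights = [1.0, -0.5]   # hypothetical coefficients
intercept = 0.0         # hypothetical intercept

p_positive = predict_probability(weights, intercept, [2.0, 0.0])  # margin = 2.0
p_neutral = predict_probability(weights, intercept, [2.0, 4.0])   # margin = 0.0

print(round(p_positive, 4), predict_label(p_positive))
print(p_neutral, predict_label(p_neutral))  # a margin of 0 gives probability 0.5
```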

## Step 4. Build the pipeline

A `Pipeline` is an ordered list of transformers and estimators. You can define a pipeline to automate and ensure repeatability of the transformations to be applied to a dataset. In this step, we define the pipeline and then apply it to the test dataset.

Similar to what we saw with `StringIndexer`, a `Pipeline` is an estimator. The `pipeline.fit()` method returns a `PipelineModel`, which is a transformer.

For more information:   
[Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html#ml-pipelines)  

In [0]:
from pyspark.ml import Pipeline

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])

# Define the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF)

Display the predictions from the model. The `features` column is a [sparse vector](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.linalg.SparseVector.html#pyspark.ml.linalg.SparseVector), which is often the case after one-hot encoding, because there are so many 0 values.

In [0]:
display(predDF.select("features", "label", "prediction", "probability"))
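
A sparse vector stores only the vector size plus the positions and values of the non-zero entries. The plain-Python sketch below converts between the two representations; the helper names are hypothetical and the example values are made up.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values

def to_dense(size, indices, values):
    """Rebuild the full vector from its sparse representation."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

dense = [0.0, 1.0, 0.0, 0.0, 39.0]
sparse = to_sparse(dense)
print(sparse)  # (5, [1, 4], [1.0, 39.0])
```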

## Step 5. Evaluate the model

The `display` command has a built-in ROC curve option.

For more information:  
[Receiver operating characteristic](https://en.wikipedia.org/wiki/Receiver_operating_characteristic)  
[Understanding AUC-ROC Curve](https://towardsdatascience.com/understanding-auc-roc-curve-68b2303cc9c5)

In [0]:
display(pipelineModel.stages[-1], predDF.drop("prediction", "rawPrediction", "probability"), "ROC")

To evaluate the model, we use the `BinaryClassificationEvaluator` to evaluate the area under the ROC curve and the `MulticlassClassificationEvaluator` to evaluate the accuracy.

For more information:  
[BinaryClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.BinaryClassificationEvaluator.html#binaryclassificationevaluator)  
[MulticlassClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.MulticlassClassificationEvaluator.html#multiclassclassificationevaluator)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")

mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")
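
To make the two metrics concrete, the sketch below computes them by hand on five made-up predictions. It uses the pairwise definition of the area under the ROC curve (the probability that a randomly chosen positive example scores higher than a randomly chosen negative one). The evaluators may arrive at their values differently, so treat this as a check on the definitions rather than on MLlib's implementation; the function names are hypothetical.

```python
def accuracy(labels, predictions):
    """Fraction of rows where the predicted label equals the true label."""
    return sum(l == p for l, p in zip(labels, predictions)) / len(labels)

def area_under_roc(labels, scores):
    """Share of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in positives
        for n in negatives
    )
    return wins / (len(positives) * len(negatives))

labels = [1, 0, 1, 0, 1]                 # made-up true labels
scores = [0.9, 0.4, 0.35, 0.2, 0.8]      # made-up positive-class probabilities
predictions = [1 if s > 0.5 else 0 for s in scores]

auc = area_under_roc(labels, scores)     # 5 of 6 pairs are ranked correctly
acc = accuracy(labels, predictions)      # 4 of 5 rows are classified correctly
print(auc, acc)
```

Note that the two metrics can disagree: accuracy depends on the chosen threshold, while the area under the ROC curve depends only on how the scores rank the examples.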

## Step 6. Hyperparameter tuning

MLlib provides methods to facilitate hyperparameter tuning and cross validation. 
- For hyperparameter tuning, `ParamGridBuilder` lets you define a grid search over a set of model hyperparameters.
- For cross validation, `CrossValidator` lets you specify an estimator (the pipeline to apply to the input dataset), an evaluator, a grid space of hyperparameters, and the number of folds to use for cross validation.
  
For more information:   
[Model selection using cross-validation](https://spark.apache.org/docs/latest/ml-tuning.html)  
[ParamGridBuilder](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html#paramgridbuilder)  
[CrossValidator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html#crossvalidator)   

Use `ParamGridBuilder` and `CrossValidator` to tune the model. This example uses three values for `regParam` and three for `elasticNetParam`, for a total of 3 x 3 = 9 hyperparameter combinations for `CrossValidator` to examine. 

`regParam` and `elasticNetParam` are the two hyperparameters that control regularization: `regParam` sets the overall strength of the penalty, and `elasticNetParam` sets the mix between the L1 and L2 penalties.

Regularization is a technique, used across mathematics, statistics, and machine learning, that constrains a model so that it favors "simpler" solutions. It is often used to obtain results for ill-posed problems or to prevent overfitting.  

[Regularization](https://runawayhorse001.github.io/LearningApacheSpark/reg.html)



  
The fixed regularization parameter (regParam in the code) defines the trade-off between the two goals of minimizing the loss (i.e., training error) and minimizing model complexity (i.e., to avoid overfitting). regParam prevents overfitting by penalizing models with extreme parameter values.  

[source](https://spark.apache.org/docs/1.5.2/mllib-linear-methods.html)

[example notebook](https://notebook.community/waichee/pyspark-ipython-notebook/spark-pyspark-mllib-101)

A problem with linear regression is that estimated coefficients of the model can become large, making the model sensitive to inputs and possibly unstable. This is particularly true for problems with few observations (samples) or fewer samples (n) than input predictors (p) or variables (so-called p >> n problems).

One approach to addressing the stability of regression models is to change the loss function to include additional costs for a model that has large coefficients. Linear regression models that use these modified loss functions during training are referred to collectively as penalized linear regression.

One popular penalty is to penalize a model based on the sum of the squared coefficient values. This is called an L2 penalty. An L2 penalty minimizes the size of all coefficients, although it prevents any coefficients from being removed from the model.

Another popular penalty is to penalize a model based on the sum of the absolute coefficient values. This is called the L1 penalty. An L1 penalty minimizes the size of all coefficients and allows some coefficients to be minimized to the value zero, which removes the predictor from the model.


Elastic net is a penalized linear regression model that includes both the L1 and L2 penalties during training.  

[source](https://machinelearningmastery.com/elastic-net-regression-in-python/)  

[elasticNetParam](https://runawayhorse001.github.io/LearningApacheSpark/reg.html)  

[Article explaining L1 and L2](https://towardsdatascience.com/l1-and-l2-regularization-methods-ce25e7fc831c)
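
As a worked example of how the two hyperparameters combine, the sketch below computes an elastic net penalty for a small coefficient vector. It assumes the formulation given in the Spark documentation, in which `regParam` scales the whole penalty and `elasticNetParam` (between 0 and 1) weights the L1 term against half the squared L2 term. The function name and coefficient values are hypothetical.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """reg_param * (alpha * L1 + (1 - alpha) / 2 * squared L2), with alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in coefficients)
    l2_squared = sum(w * w for w in coefficients)
    return reg_param * (
        elastic_net_param * l1 + (1.0 - elastic_net_param) * 0.5 * l2_squared
    )

coefficients = [3.0, -4.0]  # hypothetical model coefficients

ridge = elastic_net_penalty(coefficients, reg_param=0.5, elastic_net_param=0.0)  # L2 only
lasso = elastic_net_penalty(coefficients, reg_param=0.5, elastic_net_param=1.0)  # L1 only
mixed = elastic_net_penalty(coefficients, reg_param=0.5, elastic_net_param=0.5)  # both
print(ridge, lasso, mixed)  # 6.25 3.5 4.875
```

This penalty is added to the training loss, so a larger `regParam` pushes the optimizer toward smaller coefficients.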


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())
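
The grid is the Cartesian product of the candidate values. The following plain-Python sketch enumerates the same nine combinations with `itertools.product`; it uses string keys for readability, whereas the MLlib grid is keyed by the parameter objects themselves.

```python
from itertools import product

reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]

# One dictionary per hyperparameter combination.
grid = [
    {"regParam": reg, "elasticNetParam": alpha}
    for reg, alpha in product(reg_params, elastic_net_params)
]

print(len(grid))  # 9
print(grid[0])    # {'regParam': 0.01, 'elasticNetParam': 0.0}
```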

Whenever you call `CrossValidator` in MLlib, Databricks automatically tracks all of the runs using [MLflow](https://mlflow.org/). You can use the MLflow UI ([AWS](https://docs.databricks.com/applications/mlflow/index.html)|[Azure](https://docs.microsoft.com/azure/databricks/applications/mlflow/)|[GCP](https://docs.gcp.databricks.com/applications/mlflow/index.html)) to compare how each model performed.

In this example we use the pipeline we created as the estimator.

In [0]:
# import mlflow
# experiment_name = "/Shared/classification_experiment/"
# mlflow.set_experiment(experiment_name)
# mlflow.pyspark.ml.autolog()

In [0]:
# Create a 3-fold CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism=4)

# Run cross validations. This step takes a few minutes and returns the best model found from the cross validation.
cvModel = cv.fit(trainDF)
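
The sketch below illustrates the idea behind k-fold cross-validation in plain Python: each fold is held out once for validation while the remaining folds are used for training. The helper `k_fold_indices` is hypothetical and assigns rows to folds in a fixed round-robin order for readability, whereas a real implementation would typically assign them at random. The final count assumes that, after comparing all combinations, the best one is refit once on the full training set.

```python
def k_fold_indices(num_rows, num_folds):
    """Yield (training, validation) row indices, holding out each fold once."""
    folds = [list(range(i, num_rows, num_folds)) for i in range(num_folds)]
    for held_out in range(num_folds):
        validation = folds[held_out]
        training = [i for f, fold in enumerate(folds) if f != held_out for i in fold]
        yield training, validation

splits = list(k_fold_indices(num_rows=9, num_folds=3))
for training, validation in splits:
    print(sorted(training), validation)

num_param_maps = 9                                 # 3 x 3 hyperparameter grid
models_fit = num_param_maps * len(splits) + 1      # plus one refit of the best combination
print(models_fit)  # 28
```

This count is why the cross-validation step takes noticeably longer than fitting the single pipeline earlier in the notebook.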

## Step 7. Make predictions and evaluate model performance
Use the best model identified by the cross-validation to make predictions on the test dataset, and then evaluate the model's performance using the area under the ROC curve and the accuracy.

In [0]:
# Use the model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)

# Evaluate the model's performance based on area under the ROC curve and accuracy 
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")

Using SQL commands, you can also display predictions grouped by age and occupation. This requires creating a temporary view of the predictions dataset. 

In [0]:
cvPredDF.createOrReplaceTempView("finalPredictions")

In [0]:
%sql
SELECT occupation, prediction, count(*) AS count
FROM finalPredictions
GROUP BY occupation, prediction
ORDER BY occupation

In [0]:
%sql
SELECT age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY age, prediction
ORDER BY age