# Developing a machine learning classification model with Spark MLlib

In this lab you will learn how to develop a machine learning classification model using Spark MLlib in the Azure Databricks environment. During the lab you will walk through the main phases of a machine learning workflow, from data gathering and cleaning, through feature engineering and modeling, to model inferencing.

## Lab scenario
You will develop a machine learning classification model to predict customer churn. The dataset used during the lab contains historical information about customers of a fictional telecommunication company. You will use the Azure Databricks unified analytics platform and the Spark MLlib library to implement the ML workflow resulting in a customer churn prediction model.

## MLlib Overview

### What is MLlib?

MLlib is a package, built on and included in Spark, that provides interfaces for
- gathering and cleaning data,
- feature engineering and feature selection,
- training and tuning large scale supervised and unsupervised machine learning models, 
- and using those models in production.

### MLlib Concepts

![MLlib](https://github.com/jakazmie/images-for-hands-on-labs/raw/master/MLlib.png)

## Gather, Analyze, and Preprocess data

### Load and review data

We begin by loading the historical customer churn data, which is stored in CSV format in Azure Blob Storage, and doing a rudimentary analysis of it.

In [4]:
# Reset the widgets
dbutils.widgets.removeAll()

In [5]:
# Set up notebook parameters
dbutils.widgets.text("STORAGE_ACCOUNT", "azureailabs")
dbutils.widgets.text("CONTAINER", "churn")
dbutils.widgets.text("ACCOUNT_KEY", "")

In [6]:
# Load data from Azure Blob
STORAGE_ACCOUNT = dbutils.widgets.get("STORAGE_ACCOUNT").strip()
CONTAINER = dbutils.widgets.get("CONTAINER").strip()
ACCOUNT_KEY = dbutils.widgets.get("ACCOUNT_KEY").strip()

if ACCOUNT_KEY != "":
  # Set up account access key
  conf_key = "fs.azure.account.key.{storage_acct}.blob.core.windows.net".format(storage_acct=STORAGE_ACCOUNT)
  spark.conf.set(conf_key, ACCOUNT_KEY)

source_str = "wasbs://{container}@{storage_acct}.blob.core.windows.net/".format(container=CONTAINER, storage_acct=STORAGE_ACCOUNT)
  
# Read the CSV data from the Azure Blob container, inferring column types
df = spark.read.option("header", True).option("inferSchema", True).csv(source_str)
display(df)


The `Churn` column indicates whether the customer changed providers. This is our `target` variable or `label`. The goal of our model is to predict this column on new examples.

The subset of other columns will be used as predictors or features.

Some of the columns - e.g. `customerid` and `callingnum` - are not good candidates for features. They don't capture much information about the customer profile and may *leak the target* in the model. We will remove them from the training dataset.

There are also some suspicious records. The first two records indicate that a 12-year-old makes over $160,000 a year. Although this is possible, it is highly improbable.

Let's drill down a little bit.

In [8]:
from pyspark.sql.functions import when

display(
  df.withColumn("agegroup", 
                when(df.age<=13, '0-13')
                .when((df.age>13) & (df.age<18), '14-17')
                .otherwise('>17'))
  .withColumn("incomegroup",
                  when(df.annualincome==0, '0')
                  .when((df.annualincome>0) & (df.annualincome<10000) , '<10K')
                  .otherwise('>10K'))
  .groupBy('agegroup', 'incomegroup').count()
)


There are 583 records of young children making more than $10,000. For the purposes of this lab, we will assume that these records are erroneous and remove them.

We will now create a DataFrame with an explicitly defined schema. We will also remove irrelevant columns and suspicious rows.

#### Cleanse data

In [11]:
from pyspark.sql.types import *

schema = StructType([
  StructField("age", DoubleType()),
  StructField("annualincome", DoubleType()),
  StructField("calldroprate", DoubleType()),
  StructField("callfailurerate", DoubleType()),
  StructField("callingnum", StringType()),
  StructField("customerid", StringType()),
  StructField("customersuspended",  StringType()),
  StructField("education",  StringType()),
  StructField("gender", StringType()),
  StructField("homeowner", StringType()),
  StructField("maritalstatus", StringType()),
  StructField("monthlybilledamount", DoubleType()),
  StructField("noadditionallines", StringType()),
  StructField("numberofcomplaints", DoubleType()),
  StructField("numberofmonthunpaid", DoubleType()),
  StructField("numdayscontractequipmentplanexpiring", DoubleType()),
  StructField("occupation", StringType()),
  StructField("penaltytoswitch", DoubleType()),
  StructField("state", StringType()),
  StructField("totalminsusedinlastmonth", DoubleType()),
  StructField("unpaidbalance", DoubleType()),
  StructField("usesinternetservice", StringType()),
  StructField("usesvoiceservice", StringType()),
  StructField("percentagecalloutsidenetwork", DoubleType()),
  StructField("totalcallduration", DoubleType()),
  StructField("avgcallduration", DoubleType()),
  StructField("churn", DoubleType()),
  StructField("year", DoubleType()),
  StructField("month", DoubleType())
])

df = (spark.read
     .option("header", True)
     .schema(schema)
     .csv(source_str))

display(df)

In [12]:
clean_df = (df.drop("customerid", "callingnum", "year", "month")
    .dropDuplicates()
    .filter(~ ((df.age<14) & (df.annualincome>10000))))
  
display(clean_df.groupBy("churn").count())

As shown by the last query, our dataset is unbalanced with respect to the class label. We will have to take this into account when training the model.

#### Split data into training and test sets

At this point we will split our dataset into separate training and test sets.

In [15]:
# Split the dataset randomly into 85% for training and 15% for testing.

train, test = clean_df.randomSplit([0.85, 0.15], 0)
print("We have {} training examples and {} test examples.".format(train.count(), test.count()))
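
To build intuition for what a weighted random split does, here is a minimal plain-Python sketch. It is not Spark's implementation: the function name `random_split` and the toy rows are assumptions made for illustration. Each row is assigned by one seeded random draw, so the split sizes are approximate rather than exact, which is also the case for `randomSplit`.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.85, 1.0] for weights [0.85, 0.15]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

# Hypothetical toy data: 1000 row identifiers
rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.85, 0.15], seed=0)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the assignment repeatable, which is why the lab passes a seed to `randomSplit`.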

#### Visualize our data

Now that we have preprocessed our features and prepared a training dataset, we can use visualizations to get more insights about the data.

Calling `display()` on a DataFrame in Databricks and clicking the plot icon below the table will let you draw and pivot various plots.  See the [Visualizations section of the Databricks Guide](https://docs.databricks.com/user-guide/visualizations/index.html) for more ideas.

In [17]:
display(train)

You can also use other visualization libraries.

In [19]:
import matplotlib.pyplot as plt
import pandas as pd

# Get a sample of data
sample = train.sample(False, 0.05, 42).toPandas()

ax = sample.plot.scatter(x='percentagecalloutsidenetwork', y='avgcallduration')
display()

#### Save training and testing data

At this point, we are going to save the datasets in Parquet format.

In [21]:
test.write.mode("overwrite").parquet("/datasets/churn_test_data")
train.write.mode("overwrite").parquet("/datasets/churn_train_data")

### Train a Machine Learning Pipeline

Now that we have understood our data and prepared it as a DataFrame with pre-processed data, we are ready to train an ML classifier. In this lab we will focus on a single algorithm, the gradient-boosted tree (GBT) classifier. In most cases, however, you should go through a more thorough model selection process to find an algorithm that best fits your scenario and training data. We will also demonstrate how to automate hyperparameter tuning using Spark ML validators.

To achieve it, we will put together a simple Spark ML Pipeline.

Most Spark ML algorithms, including GBT, expect the training data to be provided as a *numeric* column to represent the label and a column of type *Vector* to represent the features. 

The features in our datasets are a mix of *numeric* and *string* values. *String* columns represent categorical features. Most *numeric* columns are continuous features. Before we can configure the hyperparameter tuning and GBT classifier stages of our pipeline, we will need to add a few data transformation steps.

Our complete pipeline has the following stages:

* `StringIndexer`: Convert string columns to categorical features
* `VectorAssembler`: Assemble the feature columns into a feature vector.
* `VectorIndexer`: Identify columns which should be treated as categorical. This is done heuristically, identifying any column with a small number of distinct values as being categorical.  For us, this will include columns like `occupation` or `homeowner`.
* `Classifier`: This stage will train the classification algorithm.
* `CrossValidator`: The machine learning algorithms have several [hyperparameters](https://en.wikipedia.org/wiki/Hyperparameter_optimization), and tuning them to our data can improve performance of the model.  We will do this tuning using Spark's [Cross Validation](https://en.wikipedia.org/wiki/Cross-validation_&#40;statistics&#41;) framework, which automatically tests a grid of hyperparameters and chooses the best.

![Image of Pipeline](https://github.com/jakazmie/images-for-hands-on-labs/raw/master/pipeline.png)
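
As a rough illustration of the first two stages, the plain-Python sketch below mimics what string indexing and vector assembly produce. It is not the Spark implementation: `fit_string_index` and the toy values are hypothetical, and the alphabetical tie-break is an assumption. What it does reflect is that `StringIndexer` by default assigns index 0 to the most frequent value and emits the indices as doubles.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending frequency; break ties alphabetically (an assumption)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Hypothetical toy columns
homeowner = ["Yes", "No", "Yes", "Yes", "No"]
age = [34.0, 51.0, 27.0, 45.0, 62.0]

mapping = fit_string_index(homeowner)
homeowner_idx = [mapping[v] for v in homeowner]

# "Assembling" here is just placing the numeric and indexed values side by side
feature_vectors = [[a, h] for a, h in zip(age, homeowner_idx)]

print(mapping)
print(feature_vectors[0])
```

The mapping is learned during fitting and then reused unchanged on new data, which is why the indexers belong inside the Pipeline rather than being applied by hand.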

First, we define the feature processing stages of the Pipeline:
* Convert string columns to categorical features. 
* Assemble feature columns into a feature vector. 
* Identify categorical features, and index them.
![Image of feature processing](https://github.com/jakazmie/images-for-hands-on-labs/raw/master/features.png)

In [24]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer
from pyspark.sql.types import *
from pyspark.ml import Pipeline

# Create a list of string indexers - one for each string column
stringCols = [field.name for field in train.schema if field.dataType == StringType()]
stringIndexers = [StringIndexer().setInputCol(name).setOutputCol(name+"_idx") for name in stringCols]

# Get a list of all numeric columns
numericCols = [field.name for field in train.schema if field.dataType != StringType()]

# Remove a label column
numericCols.remove('churn')

# Create a list of all feature columns
featureCols = numericCols + [name + "_idx" for name in stringCols]

# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featureCols, outputCol="rawFeatures")

# This identifies categorical features and indexes them.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

# Create a pipeline
stages = stringIndexers + [vectorAssembler, vectorIndexer]
pipeline = Pipeline(stages=stages)

# Check the Pipeline operation
display(pipeline.fit(train).transform(train))
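
The `maxCategories` heuristic used by `VectorIndexer` can be sketched in plain Python as follows. This only illustrates the decision rule (a vector position is treated as categorical when it has at most `maxCategories` distinct values); the function name and the toy rows are assumptions, and the re-indexing of category values that Spark also performs is left out.

```python
def categorical_feature_indices(vectors, max_categories):
    """Return the vector positions that have at most max_categories distinct values."""
    num_features = len(vectors[0])
    categorical = []
    for j in range(num_features):
        distinct = {vec[j] for vec in vectors}
        if len(distinct) <= max_categories:
            categorical.append(j)
    return categorical

# Hypothetical toy rows: [age, homeowner_idx, monthlybilledamount]
raw_features = [
    [34.0, 0.0, 71.5],
    [51.0, 1.0, 12.0],
    [27.0, 0.0, 88.3],
    [45.0, 1.0, 40.1],
    [62.0, 0.0, 55.9],
]

print(categorical_feature_indices(raw_features, max_categories=4))
```

Because the rule looks only at the number of distinct values, a genuinely numeric column with very few distinct values would also be flagged as categorical, so it is worth reviewing the result when choosing `maxCategories`.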


Second, we define the model training stage of the Pipeline. `GBTClassifier` takes feature vectors and labels as input and learns to predict labels of new examples.
![GBT training image](https://github.com/jakazmie/images-for-hands-on-labs/raw/master/train.png)

In [26]:
from pyspark.ml.classification import GBTClassifier
# Takes the "features" column and learns to predict "churn"
classifier = GBTClassifier(labelCol="churn", featuresCol="features", maxBins=50)

Third, we wrap the model training stage within a `CrossValidator` stage.  `CrossValidator` knows how to call the classifier algorithm with different hyperparameter settings.  It will train multiple models and choose the best one according to an evaluation metric.  In this lab, our metric is *AUC* (area under the ROC curve), where higher is better.

![Crossvalidate](https://github.com/jakazmie/images-for-hands-on-labs/raw/master/tune.png)

In [28]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Define a grid of hyperparameters to test:
#  - maxDepth: max depth of each decision tree in the GBT ensemble
#  - maxIter: iterations, i.e., number of trees in each GBT ensemble
# In this example notebook, we keep these values small.  In practice, to get the highest accuracy, you would likely want to try deeper trees (10 or higher) and more trees in the ensemble (>100).
paramGrid = ParamGridBuilder()\
  .addGrid(classifier.maxDepth, [5, 7])\
  .addGrid(classifier.maxIter, [10, 50])\
  .build()
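# Define a binary evaluator (its default metric is areaUnderROC).
# Note: rawPredictionCol is set to the hard 0/1 "prediction" column, so AUC is computed from class predictions rather than from scores.
evaluator = BinaryClassificationEvaluator(labelCol=classifier.getLabelCol(), rawPredictionCol=classifier.getPredictionCol())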
# Declare the CrossValidator, which runs model tuning for us.
cv = CrossValidator(estimator=classifier, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3)

Finally, we can tie our feature processing and model training stages together into a single `Pipeline`.

![Image of Pipeline](https://github.com/jakazmie/images-for-hands-on-labs/raw/master/pipeline.png)

In [30]:
from pyspark.ml import Pipeline

stages = pipeline.getStages()
stages = stages + [cv]
pipeline.setStages(stages)
print(pipeline.getStages())

#### Train the Pipeline!

Now that we have set up our workflow, we can train the Pipeline.  A single call to `fit()` runs feature processing, model tuning, and training.  We get back a fitted Pipeline with the best model found.

***Note***: This next cell can take up to **10 minutes**.  This is because it is training *a lot* of trees:
* For each random sample of data in Cross Validation,
  * For each setting of the hyperparameters,
    * `CrossValidator` is training a separate GBT ensemble which contains many Decision Trees.
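
To see where the time goes, the following plain-Python sketch counts the model fits implied by the hyperparameter grid and the number of folds defined earlier in this lab. It uses no Spark APIs, and the variable names are illustrative. It assumes one tree per boosting iteration and one final refit of the best combination on the full training data.

```python
from itertools import product

# Grid values and fold count as defined in the CrossValidator cell of this lab
grid = {"maxDepth": [5, 7], "maxIter": [10, 50]}
num_folds = 3

# Every combination of hyperparameter values, like a built parameter grid
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

# One fit per combination per fold
cv_fits = len(param_maps) * num_folds
# After choosing the best combination, the model is refit once on all the data
total_fits = cv_fits + 1

# Trees built during cross-validation, assuming one tree per boosting iteration
cv_trees = sum(p["maxIter"] for p in param_maps) * num_folds

print(len(param_maps), cv_fits, total_fits, cv_trees)
```

Adding one more value to either hyperparameter, or one more fold, multiplies the work accordingly, which is worth keeping in mind before widening the grid.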
    
Since our training set is unbalanced, we will apply a technique called *undersampling*: we keep all instances of the minority class but select only a random sample of the majority class.

In [32]:
# Load training data
train = spark.read.parquet("/datasets/churn_train_data")

# Undersample majority class
stratified_train = train.sampleBy('Churn', fractions={0: 0.2, 1: 1.0}).cache()

display(stratified_train.groupby('Churn').count())
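
The sampling fraction for the majority class does not have to be guessed. As a minimal plain-Python sketch, assuming hypothetical class counts of 900 non-churners and 100 churners, the fraction that roughly balances the classes is the ratio of the minority count to the majority count. The `undersample` helper is illustrative only; like per-class sampling in Spark, it keeps each row independently with the probability set for its class, so the resulting counts are approximate.

```python
import random

def undersample(rows, label_of, fractions, seed):
    """Keep each row with the probability configured for its class label."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fractions[label_of(row)]]

# Hypothetical toy data: 900 non-churners (0) and 100 churners (1)
rows = [{"churn": 0}] * 900 + [{"churn": 1}] * 100
counts = {0: 900, 1: 100}

# Fraction of the majority class that would roughly balance the two classes
majority_fraction = counts[1] / counts[0]

sample = undersample(rows, lambda r: r["churn"],
                     {0: majority_fraction, 1: 1.0}, seed=42)
kept = {label: sum(1 for r in sample if r["churn"] == label) for label in (0, 1)}
print(majority_fraction, kept)
```

A fraction of 1.0 keeps every minority row, while the majority class shrinks to roughly the size of the minority class.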

Start training.

In [34]:
pipelineModel = pipeline.fit(stratified_train)

## Make predictions, and evaluate results

Our final step will be to use our fitted model to make predictions on new data.  We will use our held-out test set, but you could also use this model to make predictions on completely new data.  

We will also evaluate our predictions.  Computing evaluation metrics is important for understanding the quality of predictions, as well as for comparing models and tuning parameters.

Calling `transform()` on a new dataset passes that data through feature processing and uses the fitted model to make predictions.  We get back a DataFrame with a new column `prediction` (as well as intermediate results such as our `rawFeatures` column from feature processing).

In [37]:
# Load the held-out test data saved earlier
test = spark.read.parquet("/datasets/churn_test_data")

predictions = pipelineModel.transform(test).cache()

It is easier to view the results when we limit the columns displayed. The next cell selects:
* `churn`: the true churn indicator
* `prediction`: our predicted churn

You can add any of the original (human-readable) feature columns to the `select` call to inspect them alongside the prediction.

In [39]:
display(predictions.select("churn", "prediction"))

Calculate classification performance metrics.

![Confusion matrix](https://github.com/jakazmie/images-for-hands-on-labs/raw/master/confusion.png)
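
As a small illustration of how the cells of a confusion matrix turn into metrics, here is a plain-Python sketch. The `(churn, prediction)` pairs are made-up toy values, not results from this lab, and `confusion_counts` is a hypothetical helper.

```python
def confusion_counts(pairs):
    """Count TP, FP, TN, FN from (label, prediction) pairs, where 1.0 means churn."""
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for label, prediction in pairs:
        if prediction == 1.0:
            counts["tp" if label == 1.0 else "fp"] += 1
        else:
            counts["fn" if label == 1.0 else "tn"] += 1
    return counts

# Toy (churn, prediction) pairs for illustration only
pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0)]

c = confusion_counts(pairs)
precision = c["tp"] / (c["tp"] + c["fp"])   # share of predicted churners who churned
recall = c["tp"] / (c["tp"] + c["fn"])      # share of actual churners we caught
accuracy = (c["tp"] + c["tn"]) / len(pairs)

print(c, precision, recall, accuracy)
```

On an unbalanced dataset such as ours, accuracy alone can look good even when few churners are caught, so precision and recall are usually more informative.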

The metric we optimized was the area under the ROC curve (*AUC*).

![ROC](https://github.com/jakazmie/images-for-hands-on-labs/raw/master/roc.png)

In [41]:
# Calculate AUC
print("{} on our test set: {}".format(evaluator.getMetricName(), evaluator.evaluate(predictions, {})))
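
For intuition about what AUC measures, the plain-Python sketch below computes it as the fraction of (positive, negative) pairs in which the positive example receives the higher score, counting ties as one half. This is an illustrative calculation on made-up values, not Spark's implementation. It also shows why the input column matters: the evaluator in this lab is given the hard 0/1 `prediction` column, and hard predictions carry less ranking information than scores.

```python
def auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Toy labels and scores for illustration only
labels = [0, 0, 1, 1]
scores = [0.1, 0.6, 0.35, 0.8]

# Hard predictions obtained by thresholding the scores at 0.5
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]

auc_from_scores = auc(labels, scores)
auc_from_hard = auc(labels, hard)
print(auc_from_scores, auc_from_hard)
```

In this toy case the scores rank three of the four pairs correctly, while the thresholded predictions tie on two pairs and lose that information.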


## Persist the model

Spark MLlib supports model persistence. Key features of ML persistence include:
- Support of all language APIs in Spark: Scala, Java, Python & R
- Support for single models and full Pipelines, both unfitted (a "recipe") and fitted (a result)
- Distributed storage using an exchangeable format

In Azure Databricks, by default, the model is saved to and loaded from DBFS.

In [43]:
model_path = '/models/churn_classifier'
pipelineModel.write().overwrite().save(model_path)

In [44]:
%fs ls 'dbfs:/models/churn_classifier'

You will use the saved model during the deployment lab.