# Machine learning with Spark in SQL Server 2019 Big Data Cluster
Spark is a unified big data compute engine that enables big data processing, machine learning, and AI.

Key Spark advantages are:
1. Distributed compute engine
2. Choice of language (Python, R, Scala, Java)
3. Single engine for batch and streaming jobs

In this tutorial we'll cover how to use Spark to create and deploy machine learning models. The example is a Python (PySpark) sample. The same can also be done with Scala or R (SparkR) in Spark.

<img src = "Train_Score_Export_with_Spark.jpg" style="float: center;" alt="drawing" width="900">

## Steps
1. Explore your Data
2. Data Prep and split Data as Training and Test set
3. Model Training
4. Model Scoring 
5. Persist as Spark Model
6. Persist as Portable Model

End-to-end (E2E) machine learning involves several additional steps, e.g. data exploration, feature selection, principal component analysis, and model selection. Many of these steps are omitted here for brevity.





## Step 1 - Explore your data
### Load the data
For this example we'll use the **AdultCensusIncome** data from [here](https://amldockerdatasets.azureedge.net/AdultCensusIncome.csv). From Azure Data Studio, connect to the HDFS/Spark gateway and create a directory called spark_data under HDFS.
Download [AdultCensusIncome.csv](https://amldockerdatasets.azureedge.net/AdultCensusIncome.csv) to your local machine and upload it to the spark_data directory you created.

### Exploratory Analysis
- Basic exploration of the data
- Labels & Features
1. **Label** - The value to be predicted, represented as a column in the data. Here the label is **income**.
2. **Features** - The characteristics used to make the prediction. Here the features are **age**, **hours_per_week**, and **education**.

Note: In practice, features are chosen by applying correlation techniques to understand which columns best characterize the label being predicted.
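
As a rough illustration of such a correlation technique, the sketch below computes the Pearson correlation coefficient in plain Python. The helper name `pearson` and the toy values are assumptions made for illustration; they are not taken from AdultCensusIncome.csv. On a Spark data frame, `DataFrame.stat.corr` computes the same coefficient.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (covariance numerator)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (variance numerators)
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Toy values, invented for illustration only
hours = [20, 30, 40, 45, 50, 60]
income_code = [0, 0, 0, 1, 1, 1]

r = pearson(hours, income_code)
print("correlation between hours and income_code: {:.3f}".format(r))
```

Unlike covariance, the coefficient always lies between -1 and 1, so values for different candidate features can be compared directly.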

### The Model we will build
AdultCensusIncome.csv contains several columns such as income range, age, hours-per-week, education, and occupation. We'll build a model that predicts whether the income range is >50K or <=50K.


In [3]:
datafile = "/spark_data/AdultCensusIncome.csv"

# Read the data into a Spark data frame.
data_all = spark.read.format('csv').options(header='true', inferSchema='true', ignoreLeadingWhiteSpace='true', ignoreTrailingWhiteSpace='true').load(datafile)
print("Number of rows: {},  Number of columns : {}".format(data_all.count(), len(data_all.columns)))
data_all.printSchema()

# Replace "-" with "_" in column names
columns_new = [col.replace("-", "_") for col in data_all.columns]
data_all = data_all.toDF(*columns_new)
data_all.printSchema()


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1582749467073_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


Number of rows: 32561,  Number of columns : 15
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: stri

In [4]:
# Basic data exploration
from pyspark.sql.functions import expr

# 1. Subset the data and print some important columns
print("Select few columns to see the data")
data_all.select(['income','age','hours_per_week', 'education']).show(10)

# 2. Find the distinct values of the label column
print("Number of distinct values for income")
ds_sub = data_all.select('income').distinct()
ds_sub.show()

# 3. Add a numeric column (income_code) derived from the income column
print("Added numeric column(income_code) derived from income column")
df_new = data_all.withColumn("income_code", expr("case \
                                            when income like '%<=50K%' then 0 \
                                            when income like '%>50K%' then 1 \
                                            else 2 end "))

df_new.select(['income', 'age', 'hours_per_week', 'education', 'income_code']).show(10)

# 4. Summary statistics on the data frame
print("Print a statistical summary of a few columns")
df_new.select(['income','age','hours_per_week', 'education','income_code']).describe().show()

# 5. Covariance between the numeric label code and candidate features
print("Calculate covariance between a few columns to understand features to use")
mycov = df_new.stat.cov('income_code','hours_per_week')
print("Covariance between income and hours_per_week is", round(mycov,1))

mycov = df_new.stat.cov('income_code','age')
print("Covariance between income and age is", round(mycov,1))



Select few columns to see the data
+------+---+--------------+---------+
|income|age|hours_per_week|education|
+------+---+--------------+---------+
| <=50K| 39|            40|Bachelors|
| <=50K| 50|            13|Bachelors|
| <=50K| 38|            40|  HS-grad|
| <=50K| 53|            40|     11th|
| <=50K| 28|            40|Bachelors|
| <=50K| 37|            40|  Masters|
| <=50K| 49|            16|      9th|
|  >50K| 52|            45|  HS-grad|
|  >50K| 31|            50|  Masters|
|  >50K| 42|            40|Bachelors|
+------+---+--------------+---------+
only showing top 10 rows

Number of distinct values for income
+------+
|income|
+------+
| <=50K|
|  >50K|
+------+

Added numeric column(income_code) derived from income column
+------+---+--------------+---------+-----------+
|income|age|hours_per_week|education|income_code|
+------+---+--------------+---------+-----------+
| <=50K| 39|            40|Bachelors|          0|
| <=50K| 50|            13|Bachelors|          0|
| <=

In [5]:
# Choose feature columns and the label column.
label = "income"
xvars = ["age", "hours_per_week", 'education'] #numeric and string

print("label = {}".format(label))
print("features = {}".format(xvars))

# Check label counts for class imbalance
print("Count of rows that are <=50K", data_all[data_all.income=="<=50K"].count())
print("Count of rows that are >50K", data_all[data_all.income==">50K"].count())


# Build the column list as a new list so that xvars itself is not modified
select_cols = xvars + [label]
data = data_all.select(select_cols)

label = income
features = ['age', 'hours_per_week', 'education']
Count of rows that are <=50K 24720
Count of rows that are >50K 7841
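
The two counts above show that the classes are imbalanced. As a quick arithmetic check, the sketch below derives the share of the majority class from those counts. A model that always predicted <=50K would be correct for roughly that share of rows, which is one reason to look at metrics other than plain accuracy. The variable names are illustrative.

```python
# Row counts printed by the cell above
n_low = 24720   # rows with income <=50K
n_high = 7841   # rows with income >50K

total = n_low + n_high
majority_share = n_low / total

print("total rows: {}".format(total))
print("share of <=50K rows: {:.4f}".format(majority_share))
```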

## Step 2 - Split as training and test set
We'll use 75% of the rows to train the model and the remaining 25% to evaluate it. Additionally, we persist the train and test data sets to HDFS storage. This step is not necessary, but it demonstrates persisting data in the ORC format. Other formats, e.g. Parquet, may also be used. After this step you should see two directories under /spark_ml named "AdultCensusIncomeTrain" and "AdultCensusIncomeTest".


In [6]:
train, test = data.randomSplit([0.75, 0.25], seed=123)

print("train ({}, {})".format(train.count(), len(train.columns)))
print("test ({}, {})".format(test.count(), len(test.columns)))

train_data_path = "/spark_ml/AdultCensusIncomeTrain"
test_data_path = "/spark_ml/AdultCensusIncomeTest"

train.write.mode('overwrite').orc(train_data_path)
test.write.mode('overwrite').orc(test_data_path)
print("train and test datasets saved to {} and {}".format(train_data_path, test_data_path))

train (24469, 4)
test (8092, 4)
train and test datasets saved to /spark_ml/AdultCensusIncomeTrain and /spark_ml/AdultCensusIncomeTest
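
Note that the split is only approximately 75/25: the output above shows 24469 of 32561 rows (about 75.1%) in the training set, because `randomSplit` assigns rows by random sampling rather than by cutting at an exact row count. The plain-Python sketch below illustrates the idea with a hypothetical helper `random_split`; it is an analogy, not Spark's implementation.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)  # fixed seed makes the split reproducible
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

# Stand-in for a data set: 10000 row ids
rows = list(range(10000))
train_rows, test_rows = random_split(rows, 0.75, seed=123)

print("train: {}, test: {}".format(len(train_rows), len(test_rows)))
```

Every row lands in exactly one of the two sets, and reusing the same seed reproduces the same split.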

## Step 3 - Train a model
[Spark ML pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html) let you sequence all steps as a workflow, which makes it easier to experiment with various algorithms and their parameters. The following code first constructs the stages and then puts them together in an ML pipeline. LogisticRegression is used to create the model.
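
For intuition about the final pipeline stage: a logistic regression model scores a row by passing a weighted sum of its features through the sigmoid function and thresholding the resulting probability. The sketch below shows that calculation in plain Python. The weights, intercept, and helper names are hypothetical values chosen for illustration; they are not the coefficients of the model fitted in this tutorial.

```python
import math

def sigmoid(z):
    """Logistic function: maps any real number into (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_proba(weights, intercept, features):
    """Probability of the positive class for one feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)

# Hypothetical coefficients for [age, hours_per_week]; NOT fitted values
weights = [0.03, 0.04]
intercept = -4.0

p = predict_proba(weights, intercept, [45, 50])
label = 1.0 if p > 0.5 else 0.0  # 0.5 is the usual default threshold

print("probability: {:.3f}, predicted label: {}".format(p, label))
```

The `regParam` setting used below adds a penalty on the size of the weights during training, which discourages overfitting.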

In [7]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

reg = 0.1
print("Using LogisticRegression model with Regularization Rate of {}.".format(reg))

# create a new Logistic Regression model.
lr = LogisticRegression(regParam=reg)

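# Map feature column name -> Spark type, excluding the label column
dtypes = dict(train.dtypes)
dtypes.pop(label)

si_xvars = []     # StringIndexer stages for string features
ohe_xvars = []    # one-hot encoder stages for the indexed features
featureCols = []  # input columns for the VectorAssembler
for key, dtype in dtypes.items():
    if dtype == "string":
        # String feature: index it, then one-hot encode the index
        featureCol = "-".join([key, "encoded"])
        featureCols.append(featureCol)

        tmpCol = "-".join([key, "tmp"])
        # handleInvalid="skip" drops rows whose value was not seen during fitting
        si_xvars.append(StringIndexer(inputCol=key, outputCol=tmpCol, handleInvalid="skip"))
        ohe_xvars.append(OneHotEncoderEstimator(inputCols=[tmpCol], outputCols=[featureCol]))
    else:
        # Numeric feature: use the column as-is
        featureCols.append(key)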

# string-index the label column into a column named "label"
si_label = StringIndexer(inputCol=label, outputCol='label')

# assemble the encoded feature columns into a column named "features"
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")


stages = []
stages.extend(si_xvars)
stages.extend(ohe_xvars)
stages.append(si_label)
stages.append(assembler)
stages.append(lr)
pipe = Pipeline(stages=stages)
print("Pipeline Created")

model = pipe.fit(train)
print("Model Trained")
print("Model is ", model)
print("Model Stages", model.stages)

Using LogisticRegression model with Regularization Rate of 0.1.
Pipeline Created
Model Trained
Model is  PipelineModel_dd10dea16fcc
Model Stages [StringIndexer_a72809f2344f, OneHotEncoderEstimator_bff5847e2bae, StringIndexer_e62df3c26c82, VectorAssembler_e84f4c37608c, LogisticRegressionModel: uid = LogisticRegression_96385e6ff190, numClasses = 2, numFeatures = 17]
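
The `numFeatures = 17` in the output above is consistent with the two numeric columns (age, hours_per_week) plus a 15-slot vector for education. By default StringIndexer assigns indices by descending frequency, and the one-hot encoder drops the last category, so k distinct values yield k - 1 slots. The plain-Python sketch below mimics those two defaults on a toy column. The helper names are hypothetical and this is not Spark's implementation.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent value first."""
    counts = Counter(values)
    ordered = [value for value, _ in counts.most_common()]
    return {value: index for index, value in enumerate(ordered)}

def one_hot_drop_last(index, num_categories):
    """One-hot vector with num_categories - 1 slots; the last category is all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector

# Toy column with no ties in frequency, invented for illustration
education = ["HS-grad", "Bachelors", "HS-grad", "Masters", "HS-grad", "Bachelors"]

index = fit_string_index(education)
encoded = [one_hot_drop_last(index[v], len(index)) for v in education]

for value, vector in zip(education, encoded):
    print(value, vector)
```

Dropping the last slot avoids a redundant column: the final category is already identified by all other slots being zero.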

## Step 4 - Model scoring

Predict using the model and evaluate the model accuracy.

The code below uses the test data set to predict the outcome with the model created in the previous step. We measure model quality with the areaUnderROC and areaUnderPR metrics.

In [8]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# make predictions
pred = model.transform(test)

# evaluate. note that BinaryClassificationEvaluator supports only these two metrics out of the box.
bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)
au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)

print("Area under ROC: {}".format(au_roc))
print("Area Under PR: {}".format(au_prc))

# show rows predicted as >50K (prediction == 1.0) next to their true label
pred.filter(pred.prediction == 1.0).select("income", "label", "prediction").show()

Area under ROC: 0.7964496884726682
Area Under PR: 0.5358180243123482
+------+-----+----------+
|income|label|prediction|
+------+-----+----------+
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
| <=50K|  0.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
| <=50K|  0.0|       1.0|
|  >50K|  1.0|       1.0|
+------+-----+----------+
only showing top 20 rows
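
To make the areaUnderROC figure above easier to interpret: it can be read as the probability that a randomly chosen positive row receives a higher score than a randomly chosen negative row. The plain-Python sketch below computes it with that pairwise definition on toy labels and scores. The helper name and data are assumptions for illustration, and Spark computes the metric differently internally.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Toy labels and model scores, invented for illustration
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]

auc = area_under_roc(labels, scores)
print("area under ROC: {}".format(auc))
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so the roughly 0.80 reported above sits between the two.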

## Step 5 - Persist the Spark Models
Finally we persist the model in HDFS for later use. After this step the model is saved as /spark_ml/AdultCensus.mml.



In [9]:
model_name = "AdultCensus.mml"
model_fs = "/spark_ml/" + model_name

model.write().overwrite().save(model_fs)
print("saved model to {}".format(model_fs))

# load the model file and check that it matches the in-memory model
model2 = PipelineModel.load(model_fs)
assert str(model2) == str(model)
print("Successfully loaded from {}".format(model_fs))

saved model to /spark_ml/AdultCensus.mml
Successfully loaded from /spark_ml/AdultCensus.mml

## Step 6 - Persist as Portable Model
Here we persist the model as a portable MLeap bundle for use outside Spark.

In [27]:
import os
from mleap.pyspark.spark_support import SimpleSparkSerializer
# serialize the model to a zip file in JSON format
model_name_export = "adult_census_pipeline.zip"
model_name_path = os.getcwd()
model_file = os.path.join(model_name_path, model_name_export)

# remove an old model file, if needed.
try:
    os.remove(model_file)
except OSError:
    pass

model_file_path = "jar:file:{}".format(model_file)
model.serializeToBundle(model_file_path, model.transform(train))

print("persist the mleap bundle from local to hdfs")

from subprocess import Popen, PIPE
proc = Popen(["/opt/hadoop/bin/hdfs", "dfs", "-put", "-f", model_file, os.path.join("/spark_ml", model_name_export)], stdout=PIPE, stderr=PIPE)
s_output, s_err = proc.communicate()


persist the mleap bundle from local to hdfs
/tmp/nm-local-dir/usercache/root/appcache/application_1582749467073_0001/container_1582749467073_0001_01_000001/adult_census_pipeline.zip
adult_census_pipeline.zip
/opt/hadoop/bin/hdfs/spark_ml/adult_census_pipeline.zip
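
The cell above captures the stdout and stderr of the `hdfs dfs -put` command but does not inspect them, so a failed copy would go unnoticed. A minimal sketch of checking the exit status follows. The helper `run_checked` is hypothetical, and the demonstration runs the Python interpreter instead of `hdfs` so that it works on any machine.

```python
import subprocess
import sys

def run_checked(command):
    """Run a command, return its stdout as text, and raise if it exits non-zero."""
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise RuntimeError("command failed ({}): {}".format(
            result.returncode, result.stderr.decode(errors="replace")))
    return result.stdout.decode(errors="replace")

# Demonstration with the Python interpreter; in the notebook the argument
# would be the same command list that is passed to Popen above.
output = run_checked([sys.executable, "-c", "print('ok')"])
print(output.strip())
```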