# Model Building - Import the training and test data



In [1]:
import os
import pprint
import numpy as np
import os
import pprint
import numpy as np
import pandas as pd

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

train_data_path = "/spark_ml/AdultCensusIncomeTrain"
test_data_path = "/spark_ml/AdultCensusIncomeTest"

train = spark.read.orc(train_data_path)
test = spark.read.orc(test_data_path)

print("train: ({}, {})".format(train.count(), len(train.columns)))
print("test: ({}, {})".format(test.count(), len(test.columns)))

train.printSchema()


train: (24469, 3)
test: (8092, 3)
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- income: string (nullable = true)

# Model building - Encode features and Build Model

In [1]:
label = "income"
reg = 0.1
print("Regularization Rate is {}.".format(reg))

# create a new Logistic Regression model.
lr = LogisticRegression(regParam=reg)

dtypes = dict(train.dtypes)
dtypes.pop(label)

si_xvars = []
ohe_xvars = []
featureCols = []
for idx,key in enumerate(dtypes):
    if dtypes[key] == "string":
        featureCol = "-".join([key, "encoded"])
        featureCols.append(featureCol)
        
        tmpCol = "-".join([key, "tmp"])
        # string-index and one-hot encode the string column
        #https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
        #handleInvalid: Param for how to handle invalid data (unseen labels or NULL values). 
        #Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), 
        #or 'keep' (put invalid data in a special additional bucket, at index numLabels). Default: "error"
        si_xvars.append(StringIndexer(inputCol=key, outputCol=tmpCol, handleInvalid="skip")) #, handleInvalid="keep"
        ohe_xvars.append(OneHotEncoder(inputCol=tmpCol, outputCol=featureCol))
    else:
        featureCols.append(key)

# string-index the label column into a column named "label"
si_label = StringIndexer(inputCol=label, outputCol='label')

# assemble the encoded feature columns in to a column named "features"
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

# put together the pipeline
stages = []
stages.extend(si_xvars)
stages.extend(ohe_xvars)
stages.append(si_label)
stages.append(assembler)
stages.append(lr)
pipe = Pipeline(stages=stages)

# train the model
model = pipe.fit(train)
print(model)
model.stages


Regularization Rate is 0.1.
PipelineModel_49cfbacdb54dd44bcca2
[StringIndexer_4e5ab09117dc68a07eae, VectorAssembler_43b7be097576e3659c49, LogisticRegression_42b491b66df1978b6ebc]

# Model Building - Select the best model

In [1]:

regs = np.arange(0.0, 1.0, 0.2)

paramGrid = ParamGridBuilder().addGrid(lr.regParam, regs).build()
cv = CrossValidator(estimator=pipe, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid)

cvModel = cv.fit(train)

model = cvModel.bestModel

# Model Evaluation

In [1]:
# make prediction
pred = model.transform(test)
print(pd.DataFrame(pred.take(10)).to_string())

# evaluate. note only 2 metrics are supported out of the box by Spark ML.
bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)
au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)

print("Area under ROC: {}".format(au_roc))
print("Area Under PR: {}".format(au_prc))

    0   1      2    3             4                                          5                                           6    7
0  17   4  <=50K  0.0   [4.0, 17.0]    [3.984203061099825, -3.984203061099825]  [0.9817326384088789, 0.018267361591121044]  0.0
1  17   5  <=50K  0.0   [5.0, 17.0]    [3.935897389723122, -3.935897389723122]  [0.9808458778128771, 0.019154122187122896]  0.0
2  17   5  <=50K  0.0   [5.0, 17.0]    [3.935897389723122, -3.935897389723122]  [0.9808458778128771, 0.019154122187122896]  0.0
3  17   6  <=50K  0.0   [6.0, 17.0]  [3.8875917183464184, -3.8875917183464184]  [0.9799169513950979, 0.020083048604902023]  0.0
4  17   6  <=50K  0.0   [6.0, 17.0]  [3.8875917183464184, -3.8875917183464184]  [0.9799169513950979, 0.020083048604902023]  0.0
5  17   8  <=50K  0.0   [8.0, 17.0]  [3.7909803755930116, -3.7909803755930116]  [0.9779248519533819, 0.022075148046618136]  0.0
6  17   8  <=50K  0.0   [8.0, 17.0]  [3.7909803755930116, -3.7909803755930116]  [0.9779248519533819, 0.0

# Model Persistence

In [1]:
##NOTE: by default the model is saved to and loaded from path

model_name = "AdultCensus.mml"
model_fs = "/spark_ml/" + model_name

model.write().overwrite().save(model_fs)
print("saved model to {}".format(model_fs))


# load the model file (from dbfs)
model2 = PipelineModel.load(model_fs)
assert str(model2) == str(model)
print("loaded model from {}".format(model_fs))

saved model to /spark_ml/AdultCensus.mml
loaded model from /spark_ml/AdultCensus.mml