### Introduction 

This project uses MLlib, Apache Spark's scalable machine learning library, to perform binary classification on US census data. The aim is to predict the income class of individuals, i.e. whether an individual earns <=50K or >50K a year, from a set of socio-economic variables.

### Dataset Review

The data derives from the US census (the UCI "Adult" dataset). The full dataset describes 48,842 individuals and their annual income. The adult.data file loaded below is the 32,561-row training portion. It contains both numeric and categorical variables.

Attribute Information:

    age: continuous
    workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
    fnlwgt: continuous
    education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc...
    education-num: continuous
    marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent...
    occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners...
    relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
    race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
    sex: Female, Male
    capital-gain: continuous
    capital-loss: continuous
    hours-per-week: continuous
    native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany...

Income: <=50K, >50K

### 1. Load data

In [202]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('adult.data').getOrCreate()

#data = '/home/william/Downloads/datasets/adult.data'

data = '/home/william/Downloads/adult.data'

df = spark.read.csv(data, inferSchema = True, header = True)

df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



In [203]:
# get data description pandas style

df.describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| age | 32561 | 38.58164675532078 | 13.640432553581356 | 17 | 90 |
| workclass | 32561 | | | ? | Without-pay |
| fnlwgt | 32561 | 189778.36651208502 | 105549.97769702227 | 12285.0 | 1484705.0 |
| education | 32561 | | | 10th | Some-college |
| education_num | 32561 | 10.0806793403151 | 2.572720332067397 | 1.0 | 16.0 |
| marital_status | 32561 | | | Divorced | Widowed |
| occupation | 32561 | | | ? | Transport-moving |
| relationship | 32561 | | | Husband | Wife |
| race | 32561 | | | Amer-Indian-Eskimo | White |


### Preprocessing of Data

MLlib's algorithms only work with numeric features. All categorical variables in the dataset must therefore be converted to numbers, using one of two methods.

#### 1. Category indexing

Assigns a numeric value to each category. This suits ordinal variables best, because it introduces an implicit ordering among the categories, e.g. Poor - 0, Average - 1, Good - 2.
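There is a caveat about the "implicit ordering": Spark's StringIndexer does not know the intended order of the categories. Under its default `stringOrderType="frequencyDesc"`, the most frequent value gets index 0.0, the next most frequent gets 1.0, and so on. Recent Spark versions document that frequency ties are broken alphabetically.

The plain-Python sketch below mimics that default so the mapping can be inspected without a Spark session. `frequency_index` is a hypothetical helper, not an MLlib function.

```python
from collections import Counter


def frequency_index(values):
    """Map each distinct string to a float index, most frequent first.

    Sketch of StringIndexer's default 'frequencyDesc' ordering; ties are
    broken alphabetically here.
    """
    counts = Counter(values)
    # Sort by descending count, then alphabetically so ties are deterministic
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}


ratings = ["Good", "Poor", "Average", "Good", "Average", "Good"]
mapping = frequency_index(ratings)
print(mapping)  # {'Good': 0.0, 'Average': 1.0, 'Poor': 2.0}
print([mapping[r] for r in ratings])
```

In this toy example the frequency ordering gives Good 0.0, Average 1.0 and Poor 2.0. That is the reverse of the intended Poor < Average < Good. For a genuinely ordinal variable, an explicit mapping is safer than relying on frequency order.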
    
    
#### 2. One-Hot encoding

Converts categories into binary vectors with at most one non-zero value. For example, a blue ball in the space of 3 possible colors R, B, G is encoded as [0, 1, 0].
    
One-hot encoding produces a SparseVector. For high-cardinality columns the feature space can grow very quickly, and you start fighting the curse of dimensionality. In such cases, one option is to follow one-hot encoding with PCA for dimensionality reduction.
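MLlib's one-hot encoder sets `dropLast=True` by default. A column with k categories therefore becomes a vector of length k - 1, and the last category is encoded as all zeros. This explains the "at most one non-zero value" above. It is also why `sexclassVec` in the transformed output further down is the one-element vector (1.0), and why `raceclassVec` has four slots for five races.

The plain-Python sketch below shows both variants on the R, B, G example. It stores vectors sparsely as (size, indices, values), in the spirit of MLlib's SparseVector. `one_hot` and `to_dense` are hypothetical helpers, not MLlib APIs.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a sparse one-hot vector as (size, indices, values).

    With drop_last=True the final category maps to the all-zero vector,
    mirroring the default of MLlib's encoder.
    """
    if not 0 <= index < num_categories:
        raise ValueError("category index out of range")
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [index], [1.0])
    # Only reachable for the last category when drop_last=True
    return (size, [], [])


def to_dense(sparse):
    """Expand a (size, indices, values) tuple into a plain list."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


colors = {"R": 0, "B": 1, "G": 2}
print(to_dense(one_hot(colors["B"], 3, drop_last=False)))  # [0.0, 1.0, 0.0]
print(to_dense(one_hot(colors["B"], 3)))                   # [0.0, 1.0]
print(to_dense(one_hot(colors["G"], 3)))                   # [0.0, 0.0]
```

Dropping the last slot loses no information: an all-zero vector can only mean the last category. It also avoids a perfectly collinear set of indicator columns in linear models such as logistic regression.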

In [204]:
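#imports

from pyspark.ml import Pipeline
# OneHotEncoderEstimator is the Spark 2.3/2.4 name; in Spark 3.x the same estimator is called OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler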

In [205]:
# Index each categorical column with StringIndexer, then convert the indexed categories
# into one-hot encoded vectors. These stages are appended to a stages list to be used later.

#specify categorical cols

categoricalCols = ["workclass", "education", "marital_status", "occupation", 
                   "relationship", "race", "sex", "native_country"]

stages = []
for col in categoricalCols:
    # Category indexing using StringIndexer
    stringIndexer = StringIndexer(inputCol=col, outputCol = col + "Index")
    
    # Use OneHotEncoderEstimator (which accepts multiple input columns) to convert the indices into binary sparse vectors
    encoder = OneHotEncoderEstimator(inputCols = [stringIndexer.getOutputCol()], outputCols=[col + "classVec"])
    
    # Add the StringIndexer and the encoder to stages
    stages += [stringIndexer, encoder]

In [206]:
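# Convert the income label into label indices using StringIndexer.
# With the default frequency ordering, the majority class '<=50K' becomes 0.0 and '>50K' becomes 1.0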

label_stringIndx = StringIndexer(inputCol="income", outputCol="label")

#add to stages
stages += [label_stringIndx]

In [207]:
# Check the stages list: 9 StringIndexers (the 8 categorical columns plus the income label)
# and 8 OneHotEncoderEstimators

stages

[StringIndexer_d76e9cbda328,
 OneHotEncoderEstimator_0f2fe5a7a921,
 StringIndexer_482226f0de3c,
 OneHotEncoderEstimator_d3cb2398abc8,
 StringIndexer_e9ecfae95231,
 OneHotEncoderEstimator_ec6aef9f3506,
 StringIndexer_7991eff73ce1,
 OneHotEncoderEstimator_3953a559de1d,
 StringIndexer_34d632e279e6,
 OneHotEncoderEstimator_097b98103d92,
 StringIndexer_683093b05b61,
 OneHotEncoderEstimator_3be8d579fdeb,
 StringIndexer_e1644936d6c9,
 OneHotEncoderEstimator_91c045a0be56,
 StringIndexer_d8098ebc772e,
 OneHotEncoderEstimator_2fa09ab3f0f5,
 StringIndexer_e9c3c42c74c5]

In [208]:
# Use a VectorAssembler to combine all the feature columns into a single vector column. 
# This is made up of both numeric cols and one-hot encoded binary vector cols

numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]

assemblerInputs = [c + "classVec" for c in categoricalCols] + numericCols

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol = "features")

stages += [assembler]
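The width of the assembled `features` vector follows from the encoding. Each one-hot column contributes k - 1 slots (because of `dropLast`), and each numeric column contributes one.

The arithmetic below assumes the category counts of the UCI Adult training file, counting the '?' placeholder as its own category. These counts are an assumption rather than something printed in this notebook. They do reproduce the `numFeatures = 100` reported for the tuned logistic regression model.

```python
# Assumed category counts per column in the UCI Adult training file,
# counting the '?' placeholder as its own category.
category_counts = {
    "workclass": 9,
    "education": 16,
    "marital_status": 7,
    "occupation": 15,
    "relationship": 6,
    "race": 5,
    "sex": 2,
    "native_country": 42,
}
numeric_cols = ["age", "fnlwgt", "education_num",
                "capital_gain", "capital_loss", "hours_per_week"]

# dropLast=True: each categorical column contributes k - 1 slots
one_hot_width = sum(k - 1 for k in category_counts.values())
total_width = one_hot_width + len(numeric_cols)
print(one_hot_width, total_width)  # 94 100
```

native_country alone accounts for 41 of the 100 slots. That gives a concrete picture of how high-cardinality columns drive dimensionality growth.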

In [209]:
# Run stages as a pipeline. This puts the data through all the feature transformations in a single call

#specify pipeline (equivalently: partialPipeline = Pipeline().setStages(stages))
partialPipeline = Pipeline(stages=stages)

#fit pipeline to data
pipelineModel = partialPipeline.fit(df)

#transform the data
preppedDataDF = pipelineModel.transform(df)

In [210]:
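# limit() before toPandas() avoids collecting the whole DataFrame to the driver just to show 4 rows
preppedDataDF.limit(4).toPandas()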

| | age | workclass | fnlwgt | education | education_num | marital_status | occupation | relationship | race | sex | ... | relationshipIndex | relationshipclassVec | raceIndex | raceclassVec | sexIndex | sexclassVec | native_countryIndex | native_countryclassVec | label | features |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | State-gov | 77516.0 | Bachelors | 13.0 | Never-married | Adm-clerical | Not-in-family | White | Male | ... | 1.0 | (0.0, 1.0, 0.0, 0.0, 0.0) | 0.0 | (1.0, 0.0, 0.0, 0.0) | 0.0 | (1.0) | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | 0.0 | (0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ... |
| 1 | 50 | Self-emp-not-inc | 83311.0 | Bachelors | 13.0 | Married-civ-spouse | Exec-managerial | Husband | White | Male | ... | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0) | 0.0 | (1.0, 0.0, 0.0, 0.0) | 0.0 | (1.0) | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | 0.0 | (0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |
| 2 | 38 | Private | 215646.0 | HS-grad | 9.0 | Divorced | Handlers-cleaners | Not-in-family | White | Male | ... | 1.0 | (0.0, 1.0, 0.0, 0.0, 0.0) | 0.0 | (1.0, 0.0, 0.0, 0.0) | 0.0 | (1.0) | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ... |
| 3 | 53 | Private | 234721.0 | 11th | 7.0 | Married-civ-spouse | Handlers-cleaners | Husband | Black | Male | ... | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0) | 1.0 | (0.0, 1.0, 0.0, 0.0) | 0.0 | (1.0) | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | 0.0 | (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... |


In [211]:
# Keep the relevant columns, i.e. "label", "features" and the original df columns, from preppedDataDF

cols = df.columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
dataset.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



In [259]:
# Randomly split the data into training (70%) and test (30%) sets; set a seed for reproducibility

(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

print(trainingData.count())
print(testData.count())

22838
9723


#### Logistic Regression

In [260]:
from pyspark.ml.classification import LogisticRegression

#create logistic regression model 
lr = LogisticRegression(featuresCol="features", labelCol="label")

#train model with training data
lrModel = lr.fit(trainingData)

In [261]:
#make predictions on test data using "transform" method. 
#LogisticRegression will only use the 'features' col

predictions = lrModel.transform(testData)

In [262]:
#View model's predictions and probabilities of each prediction class
# We can select any columns in the above schema to view as well

selected = predictions.select("label","prediction","probability","age","occupation","race")
selected.show(5)

+-----+----------+--------------------+---+---------------+------+
|label|prediction|         probability|age|     occupation|  race|
+-----+----------+--------------------+---+---------------+------+
|  0.0|       0.0|[0.69202851495175...| 26| Prof-specialty| White|
|  0.0|       0.0|[0.63269684263405...| 30| Prof-specialty| White|
|  0.0|       0.0|[0.66174975816368...| 31| Prof-specialty| White|
|  0.0|       0.0|[0.65972433569650...| 32| Prof-specialty| White|
|  0.0|       0.0|[0.62758081234794...| 39| Prof-specialty| White|
+-----+----------+--------------------+---+---------------+------+
only showing top 5 rows



In [263]:
# Let's check some results where label = 1 and the model also predicted 1

predictions.select("label","prediction","probability","age","occupation").filter("label >=1 and prediction =1").show()

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  1.0|       1.0|[0.13195626207502...| 41| Prof-specialty|
|  1.0|       1.0|[0.10873454851983...| 42| Prof-specialty|
|  1.0|       1.0|[0.11293855754228...| 52| Prof-specialty|
|  1.0|       1.0|[0.37579401228828...| 40| Prof-specialty|
|  1.0|       1.0|[0.45381853136164...| 34| Prof-specialty|
|  1.0|       1.0|[0.33909167088873...| 68| Prof-specialty|
|  1.0|       1.0|[0.35939719642615...| 32|   Craft-repair|
|  1.0|       1.0|[7.29120604407629...| 32|   Craft-repair|
|  1.0|       1.0|[0.18753132463830...| 39|   Craft-repair|
|  1.0|       1.0|[0.23060753694683...| 39|   Craft-repair|
|  1.0|       1.0|[0.36824810712030...| 42|   Craft-repair|
|  1.0|       1.0|[0.12986902302045...| 45|   Craft-repair|
|  1.0|       1.0|[0.15213216811005...| 49|   Craft-repair|
|  1.0|       1.0|[0.14467016280457...| 

In [264]:
# Decide whether to evaluate the models with the ROC curve or the Precision-Recall (PR) curve.

# ROC curves are a reasonable choice when the classes are roughly balanced.
 
# PR curves are more informative when there is a moderate to large class imbalance.

# Check the label imbalance

print("0 labels: {}".format(dataset.filter(dataset['label'] <= 0).count()))

print("1 labels: {}".format(dataset.filter(dataset['label'] >= 1).count()))



0 labels: 24720
1 labels: 7841
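These counts quantify the imbalance and give two reference points for the scores that follow. First, a classifier that always predicts the majority class (0, i.e. <=50K) would already reach about 76% accuracy. Second, the expected area under the PR curve of a random classifier is roughly the positive-class share, about 0.24. The short calculation below derives both from the printed counts.

```python
neg, pos = 24720, 7841  # label counts printed above
total = neg + pos

positive_rate = pos / total      # share of >50K earners
majority_baseline = neg / total  # accuracy of always predicting label 0

print(f"total={total}, positive rate={positive_rate:.3f}, "
      f"majority-class accuracy={majority_baseline:.3f}")
# total=32561, positive rate=0.241, majority-class accuracy=0.759
```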


In [293]:
#evaluate model
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# Even though this is a binary problem, MulticlassClassificationEvaluator can still be used for
# finer-grained metrics such as accuracy or weighted precision/recall, whereas
# BinaryClassificationEvaluator only reports the area under the ROC or PR curve

evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',   
                                             labelCol='label')

# The default metric for BinaryClassificationEvaluator is areaUnderROC, which suits roughly balanced labels.
# Our labels are imbalanced (about 76% / 24%), so we use the area under the PR curve instead

evaluator.setMetricName("areaUnderPR")

print(evaluator.getMetricName())

areaUnderPR = evaluator.evaluate(predictions)
areaUnderPR

areaUnderPR


0.7578113378461419

Now we will try tuning the model with ParamGridBuilder and CrossValidator.

In [None]:
#get an explanation of the parameters

print(lr.explainParams())

We use 3 values each for maxIter, regParam and elasticNetParam, so the grid has 3 x 3 x 3 = 27 parameter settings for CrossValidator to choose from. We will create a 5-fold cross validator.
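ParamGridBuilder forms the Cartesian product of the listed values. CrossValidator then trains one model per parameter combination per fold, and afterwards refits the winning combination on the full training set.

The plain-Python sketch below enumerates the same grid with `itertools.product` to make the training cost explicit. It only mirrors the counting and does not call Spark.

```python
from itertools import product

reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [1, 5, 10]
num_folds = 5

# Every combination of the three parameter lists, like ParamGridBuilder.build()
grid = [
    {"regParam": r, "elasticNetParam": e, "maxIter": m}
    for r, e, m in product(reg_params, elastic_net_params, max_iters)
]

cv_fits = len(grid) * num_folds  # one fit per (combination, fold)
total_fits = cv_fits + 1         # plus the final refit of the best combination
print(len(grid), cv_fits, total_fits)  # 27 135 136
```

Each extra grid value multiplies the cost, so coarse grids like this one are a practical starting point.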

In [295]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

#create ParamGrid for cross validation

paramGrid = (ParamGridBuilder() 
             .addGrid(lr.regParam,[0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam,[0, 0.5, 1.0])
             .addGrid(lr.maxIter,[1,5,10])
             .build())

In [296]:
# Create 5-fold CrossValidator
# K-fold cross validation performs model selection by splitting the dataset into k non-overlapping,
# randomly partitioned folds; each fold is used once as the validation set while the other k-1 folds are used for training

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
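The fold mechanics described in the comment above can be sketched in plain Python. Rows are shuffled and dealt into k disjoint folds. Each fold then serves once as the validation set, while the other k - 1 folds form the training set.

`k_fold_indices` is a hypothetical helper. Spark randomizes its fold assignment differently, so this illustrates the idea rather than reproducing MLlib's exact splits.

```python
import random


def k_fold_indices(n_rows, k, seed=0):
    """Yield (train_idx, val_idx) pairs for k-fold cross-validation."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    # Deal the shuffled rows round-robin into k non-overlapping folds
    folds = [idx[i::k] for i in range(k)]
    for i in range(k):
        val = sorted(folds[i])
        train = sorted(j for f, fold in enumerate(folds) if f != i for j in fold)
        yield train, val


for train, val in k_fold_indices(10, 5, seed=42):
    print(len(train), len(val), val)
```

Every row appears in exactly one validation fold. The cross-validated score is the average over all k rounds.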

In [297]:
# Generate predictions using test data set

predictions = cvModel.transform(testData)

In [298]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model

evaluator.evaluate(predictions)

0.7561892436534638

In [299]:
# Uncomment to print the best model's parameters: print(cvModel.bestModel.explainParams())

In [300]:
# View best model's predictions and probabilities of each prediction class

# show() prints the rows and returns None, so its result is not assigned
predictions.select("label", "prediction", "probability", "age", "occupation").show(5)

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  0.0|       0.0|[0.64155216164797...| 26| Prof-specialty|
|  0.0|       0.0|[0.59755712902274...| 30| Prof-specialty|
|  0.0|       0.0|[0.61999363679023...| 31| Prof-specialty|
|  0.0|       0.0|[0.61798613046987...| 32| Prof-specialty|
|  0.0|       0.0|[0.60105070352727...| 39| Prof-specialty|
+-----+----------+--------------------+---+---------------+
only showing top 5 rows



In [301]:
bestmodel = cvModel.bestModel

bestmodel

LogisticRegressionModel: uid = LogisticRegression_06de9eae696d, numClasses = 2, numFeatures = 100

#### Decision Trees
Decision trees handle categorical data well. They can also split directly on the one-hot encoded indicator features used here.

In [302]:
from pyspark.ml.classification import DecisionTreeClassifier

#create initial decision tree model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

#train model using Training data
dtModel = dt.fit(trainingData)

In [303]:
#check number of nodes and tree depth in model
print("Num of nodes = ", dtModel.numNodes)
print("Tree depth = ", dtModel.depth)

Num of nodes =  11
Tree depth =  3


In [304]:
#make predictions of test data
predictions = dtModel.transform(testData)

In [305]:
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [306]:
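# Evaluate the model using BinaryClassificationEvaluator.
# Note: a freshly created evaluator uses the default metric, areaUnderROC. This score (and the
# tree-based scores below) is therefore an area under the ROC curve, and is not directly
# comparable to the logistic regression's area under the PR curve above.

evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)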

0.7608600693350143

In [307]:
# Entropy and Gini impurity are the supported impurity measures for classification trees; Gini is the default.
# Switch to entropy on the estimator with dt.setImpurity("entropy").

dt.getImpurity()

'gini'
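For a node whose class proportions are p_i, Gini impurity is 1 - Σ p_i² and entropy is -Σ p_i log₂ p_i. Both are 0 for a pure node and largest for an evenly mixed one, which is why either can guide split selection. The sketch below evaluates both for a few illustrative two-class count vectors. `gini` and `entropy` are hypothetical helpers written for this example, not Spark functions.

```python
from math import log2


def gini(counts):
    """Gini impurity 1 - sum(p_i^2) for a node's class counts."""
    total = sum(counts)
    return 1.0 - sum((c / total) ** 2 for c in counts)


def entropy(counts):
    """Entropy -sum(p_i * log2(p_i)); empty classes contribute 0."""
    total = sum(counts)
    return sum(-(c / total) * log2(c / total) for c in counts if c > 0)


for counts in ([50, 50], [90, 10], [100, 0]):
    print(counts, round(gini(counts), 3), round(entropy(counts), 3))
```

An evenly mixed node scores 0.5 (Gini) and 1.0 (entropy), while a pure node scores 0 on both. In practice the two measures usually rank candidate splits similarly.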

In [308]:
#create paramGrid for cross validation
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth,[1,2,6,10])
             .addGrid(dt.maxBins,[20,40,80])
             .build())
    

In [309]:
#create 5-fold CrossValidator
cv = CrossValidator(estimator = dt, estimatorParamMaps = paramGrid, evaluator=evaluator, numFolds=5)

#run cross validation on training data
cvModel = cv.fit(trainingData)

In [310]:
print("Num of nodes = ", cvModel.bestModel.numNodes)
print("Tree Depth = ", cvModel.bestModel.depth)

Num of nodes =  451
Tree Depth =  10


In [311]:
#generate predictions using best model on test data
predictions = cvModel.transform(testData)

#evaluate the best model
evaluator.evaluate(predictions)

0.7693973010915965

In [312]:
#view best model's predictions and probabilities
selected = predictions.select("label","prediction","probability","age", "race")
selected.show(5)

+-----+----------+--------------------+---+------+
|label|prediction|         probability|age|  race|
+-----+----------+--------------------+---+------+
|  0.0|       1.0|           [0.4,0.6]| 26| White|
|  0.0|       0.0|         [0.75,0.25]| 30| White|
|  0.0|       0.0|         [0.75,0.25]| 31| White|
|  0.0|       0.0|         [0.75,0.25]| 32| White|
|  0.0|       0.0|[0.67791215786123...| 39| White|
+-----+----------+--------------------+---+------+
only showing top 5 rows



#### Random Forests
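A random forest trains an ensemble of decision trees. Each tree is built on a bootstrap sample of the rows and considers a random subset of features at each split, and the trees' outputs are then combined. This typically reduces variance and improves accuracy compared with a single tree.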
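How the trees' outputs are pooled matters. MLlib's random forest classifier is assumed here to combine trees by summing each tree's class-probability vector (soft voting) rather than by counting hard votes. Treat that as an assumption to check against your Spark version.

The plain-Python sketch below contrasts the two rules on made-up leaf probabilities from three trees. `hard_vote` and `soft_vote` are hypothetical helpers.

```python
def hard_vote(tree_probs):
    """Majority vote over each tree's predicted class (ties go to class 0)."""
    votes_for_1 = sum(1 for p in tree_probs if p[1] > p[0])
    return 1 if votes_for_1 > len(tree_probs) / 2 else 0


def soft_vote(tree_probs):
    """Average the per-tree class-probability vectors, then take the argmax."""
    n = len(tree_probs)
    avg = [sum(p[c] for p in tree_probs) / n for c in range(2)]
    return (1 if avg[1] > avg[0] else 0), avg


# Hypothetical leaf probabilities [P(<=50K), P(>50K)] from three trees
tree_probs = [[0.55, 0.45], [0.55, 0.45], [0.10, 0.90]]
print(hard_vote(tree_probs))  # 0: two of the three trees lean towards <=50K
print(soft_vote(tree_probs))
```

In this example, two trees lean weakly towards <=50K and one tree leans strongly towards >50K. A hard vote picks class 0, whereas averaging the probabilities (0.4 vs 0.6) picks class 1.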

In [313]:
from pyspark.ml.classification import RandomForestClassifier

#create initial RF model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

#train model using training data
rfModel = rf.fit(trainingData)

In [314]:
#make predictions on test data
predictions = rfModel.transform(testData)


In [315]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show(5)

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  0.0|       0.0|[0.63505114651070...| 26| Prof-specialty|
|  0.0|       0.0|[0.57220342364277...| 30| Prof-specialty|
|  0.0|       0.0|[0.57220342364277...| 31| Prof-specialty|
|  0.0|       0.0|[0.57220342364277...| 32| Prof-specialty|
|  0.0|       0.0|[0.57220342364277...| 39| Prof-specialty|
+-----+----------+--------------------+---+---------------+
only showing top 5 rows



In [316]:
# Evaluate model (metricName is not set, so this is the default areaUnderROC)
evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction')
evaluator.evaluate(predictions)

0.8831446294014879

In [317]:
#tune model by creating a ParamGrid for Cross Validation

paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth,[2, 4, 6])
            .addGrid(rf.maxBins,[20, 60])
            .addGrid(rf.numTrees,[5,20])
            .build())

In [318]:
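#create 5-fold crossvalidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validation on the training data only. Tuning on testData leaks the test set into
# model selection; the score printed in the next cell came from a run that called cv.fit(testData)
# and is therefore optimistic.
cvModel = cv.fit(trainingData)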

In [319]:
# Measure the tuned model's area under the ROC curve on testData
predictions = cvModel.transform(testData)

#evaluate best model
evaluator.evaluate(predictions)

0.8984244907314489

In [320]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show(4)

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  0.0|       0.0|[0.69588505643786...| 26| Prof-specialty|
|  0.0|       0.0|[0.65522581396495...| 30| Prof-specialty|
|  0.0|       0.0|[0.64821295826655...| 31| Prof-specialty|
|  0.0|       0.0|[0.64130717992890...| 32| Prof-specialty|
+-----+----------+--------------------+---+---------------+
only showing top 4 rows



#### Gradient Boosted Trees

In [322]:
from pyspark.ml.classification import GBTClassifier

#create initial GBT model
gbt = GBTClassifier(labelCol="label", featuresCol="features")

#train model using training data
gbtModel = gbt.fit(trainingData)

In [323]:
#make predictions on test data
predictions = gbtModel.transform(testData)

# View gbt model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show(5)

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  0.0|       0.0|[0.81230136501736...| 26| Prof-specialty|
|  0.0|       0.0|[0.75685837511868...| 30| Prof-specialty|
|  0.0|       0.0|[0.75685837511868...| 31| Prof-specialty|
|  0.0|       0.0|[0.75685837511868...| 32| Prof-specialty|
|  0.0|       0.0|[0.55038995052028...| 39| Prof-specialty|
+-----+----------+--------------------+---+---------------+
only showing top 5 rows



In [325]:
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [329]:
# Evaluate model (metricName is not set, so this is the default areaUnderROC)
evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='rawPrediction')
evaluator.evaluate(predictions)

0.9072755923559589

In [333]:
#tune model by creating a ParamGrid for Cross Validation

paramGrid = (ParamGridBuilder()
            .addGrid(gbt.maxDepth,[2, 5])
            .addGrid(gbt.maxBins,[20, 60])
            .addGrid(gbt.maxIter,[5,20])
            .build())

In [334]:
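#create 5-fold crossvalidator
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validation on the training data only. Tuning on testData leaks the test set into
# model selection; the score printed in the next cell came from a run that called cv.fit(testData)
# and is therefore optimistic.
cvModel = cv.fit(trainingData)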

In [335]:
# Measure the tuned model's area under the ROC curve on testData
predictions = cvModel.transform(testData)

#evaluate best model
evaluator.evaluate(predictions)

0.9297325731312439

In [344]:
# Even untuned, the GBT model gives the best area under the ROC curve of the tree-based models
# (0.907 vs 0.883 for the random forest). MulticlassClassificationEvaluator can report further metrics such as accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

gbt_eval = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction', metricName='accuracy')
gbt_eval.evaluate(predictions)

0.8733929857040008
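Accuracy alone is easy to over-read on imbalanced data. The accuracy above should be compared with the roughly 0.76 that always predicting <=50K would score on this class mix.

The plain-Python sketch below shows how accuracy, precision, recall (true positive rate) and F1 follow from the confusion-matrix counts. The label/prediction pairs are made up for illustration, and `binary_metrics` is a hypothetical helper. In PySpark, MulticlassClassificationEvaluator exposes related figures through metricName values such as "f1", "weightedPrecision" and "weightedRecall".

```python
def binary_metrics(labels, predictions, positive=1.0):
    """Accuracy, precision, recall (TPR) and F1 from paired labels/predictions."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    tn = len(pairs) - tp - fp - fn

    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)
    return {"accuracy": accuracy, "precision": precision,
            "recall": recall, "f1": f1}


# Illustrative labels/predictions, not taken from the notebook's output
labels      = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0]
predictions = [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0]
print(binary_metrics(labels, predictions))
```

Here accuracy is 0.7, while recall on the positive class is only 0.5. Accuracy hides this kind of gap when the positive class is the minority.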

#### Make Predictions on unlabeled data

As the GBT model gave the best results, we use its best cross-validated model to transform any incoming data.

In [433]:
# get best model
bestModel = cvModel.bestModel

In [440]:
# Use the best model to transform unlabeled data into final predictions. Here we simulate this on the whole dataset
finalPredictions = bestModel.transform(dataset)

In [460]:
#predictions.select("prediction", "probability", "age", "occupation")

# show() prints the rows and returns None, so its result is not assigned
finalPredictions.select("prediction", "probability", "age", "race").show(10)

+----------+--------------------+---+------+
|prediction|         probability|age|  race|
+----------+--------------------+---+------+
|       0.0|[0.87620310098616...| 39| White|
|       0.0|[0.80347916601378...| 50| White|
|       0.0|[0.93055511014988...| 38| White|
|       0.0|[0.84196220188525...| 53| Black|
|       1.0|[0.23929550745614...| 28| Black|
|       1.0|[0.18637832767941...| 37| White|
|       0.0|[0.94566177620039...| 49| Black|
|       0.0|[0.69287693140286...| 52| White|
|       1.0|[0.07047152314889...| 31| White|
|       1.0|[0.05546618411321...| 42| White|
+----------+--------------------+---+------+
only showing top 10 rows



In [396]:
#set up for SQL querying
finalPredictions.createOrReplaceTempView("finalPredictions")

In [442]:
# Query the predictions, e.g. for further analysis or lead targeting
query1 = spark.sql("SELECT occupation, prediction, count(*) AS Count FROM finalPredictions GROUP BY occupation, prediction")
query1.show()