# Binary Classification
Data: https://archive.ics.uci.edu/ml/datasets/Adult <br>
This data derives from census data, and consists of information about 48842 individuals and their annual income. <br>
Objective: predict if an individual earns <=50K or >50k a year.

In [2]:
%fs ls databricks-datasets/adult/adult.data

path,name,size
dbfs:/databricks-datasets/adult/adult.data,adult.data,3974305


In [3]:
%fs head databricks-datasets/adult/adult.data

In [4]:
%sql DROP TABLE IF EXISTS adult

In [5]:
%sql
CREATE TABLE adult (
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING)
USING CSV
OPTIONS (path "/databricks-datasets/adult/adult.data", header "true")

In [6]:
dataset = spark.table("adult")
cols = dataset.columns

In [7]:
dataset.count()

In [8]:
dataset.columns

In [9]:
dataset.schema

In [10]:
display(dataset)

age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
37.0,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
52.0,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
31.0,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
42.0,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K
37.0,Private,280464.0,Some-college,10.0,Married-civ-spouse,Exec-managerial,Husband,Black,Male,0.0,0.0,80.0,United-States,>50K


**Preprocess Data** <br>
Convert the categorical variables in the dataset into numeric variables. There are 2 ways we can do this.
- Category Indexing
This is basically assigning a numeric value to each category from {0, 1, 2, ...numCategories-1}. 

- One-Hot Encoding
This converts categories into binary vectors with at most one nonzero value 

In this dataset, we have ordinal variables like education (Preschool - Doctorate), and also nominal variables like relationship (Wife, Husband, Own-child, etc). 
For simplicity's sake, we will use One-Hot Encoding to convert all categorical variables into binary vectors. It is possible here to improve prediction accuracy by converting each categorical column with an appropriate method.

Here, we will use a combination of StringIndexer and OneHotEncoderEstimator to convert the categorical variables. The OneHotEncoderEstimator will return a SparseVector.

Since we will have more than 1 stage of feature transformations, we use a Pipeline to tie the stages together.

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [13]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]

In [14]:
# Transform all features into a vector using VectorAssembler
# combine all the feature columns into a single vector column. This includes both the numeric columns and the one-hot encoded binary vector columns
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [15]:
from pyspark.ml.classification import LogisticRegression
  
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

In [16]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data
display(lrModel, preppedDataDF, "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.9999999999999976
0.0,0.0416666666666666,0.9999999999999976
0.0,0.0833333333333333,0.9999999999998648
0.0,0.125,0.99911131027025
0.0,0.1666666666666666,0.9598316100920976
0.0,0.2083333333333333,0.9457964835355642
0.0,0.25,0.9445961282754258
0.0,0.2916666666666667,0.9344544738455518
0.0,0.3333333333333333,0.8753462987200405
0.0,0.375,0.8258472681699973


In [17]:
display(lrModel, preppedDataDF)

fitted values,residuals
-0.0733332883807879,0.5183251104769154
1.9490800751172297,0.1246537012799594
-3.310574673803253,-0.0352101920719507
-6.129953876472403,-0.0021719537034928
-4.876329604559768,-0.0075672494154279
0.4145437541478481,-0.6021768837819196
-5.116082187818829,-0.0059637026288861
-5.601450649753476,-0.0036789188802701
-0.6912665609683211,-0.3337513797059621
-4.257238412151042,-0.0139636125963735


In [18]:
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

label,features,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,income
0.0,"List(0, 100, List(1, 10, 23, 31, 43, 48, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 50.0, 83311.0, 13.0, 13.0))",50.0,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
0.0,"List(0, 100, List(0, 8, 25, 38, 44, 48, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 38.0, 215646.0, 9.0, 40.0))",38.0,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
0.0,"List(0, 100, List(0, 13, 23, 38, 43, 49, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 53.0, 234721.0, 7.0, 40.0))",53.0,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
0.0,"List(0, 100, List(0, 10, 23, 29, 47, 49, 62, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 28.0, 338409.0, 13.0, 40.0))",28.0,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
0.0,"List(0, 100, List(0, 11, 23, 31, 47, 48, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.0, 284582.0, 14.0, 40.0))",37.0,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
0.0,"List(0, 100, List(0, 18, 28, 34, 44, 49, 64, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 49.0, 160187.0, 5.0, 16.0))",49.0,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
1.0,"List(0, 100, List(1, 8, 23, 31, 43, 48, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 52.0, 209642.0, 9.0, 45.0))",52.0,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
1.0,"List(0, 100, List(0, 11, 24, 29, 44, 48, 53, 94, 95, 96, 97, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 31.0, 45781.0, 14.0, 14084.0, 50.0))",31.0,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
1.0,"List(0, 100, List(0, 10, 23, 31, 43, 48, 52, 53, 94, 95, 96, 97, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 42.0, 159449.0, 13.0, 5178.0, 40.0))",42.0,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K
1.0,"List(0, 100, List(0, 9, 23, 31, 43, 49, 52, 53, 94, 95, 96, 99), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 37.0, 280464.0, 10.0, 80.0))",37.0,Private,280464.0,Some-college,10.0,Married-civ-spouse,Exec-managerial,Husband,Black,Male,0.0,0.0,80.0,United-States,>50K


In [19]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

# Fit and Evaluate Models
Try out some of the Binary Classification algorithms available in the Pipelines API.
- Decision Tree Classifier
- Random Forest Classifier

These are the general steps to build our models:
- Create initial model using the training set
- Tune parameters with a ParamGrid and 5-fold Cross Validation
- Evaluate the best model obtained from the Cross Validation using the test set

We use the BinaryClassificationEvaluator to evaluate our models, which uses areaUnderROC as the default metric.

In [21]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [22]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [23]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"List(1, 2, List(), List(0.6912640989186466, 0.3087359010813534))",26.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6213734865155065, 0.3786265134844935))",30.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6586287948600485, 0.3413712051399515))",31.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6589958510289854, 0.3410041489710147))",32.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6157704934546715, 0.38422950654532856))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5446870779706698, 0.4553129220293301))",47.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6048473508705535, 0.3951526491294466))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5944480951080502, 0.40555190489194975))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5875090690375251, 0.4124909309624749))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5980792791827193, 0.4019207208172807))",61.0,Prof-specialty


In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [25]:
evaluator.getMetricName()

In [26]:
# params are available for tuning, use explainParams() to print a list of all params and their definitions.
print(lr.explainParams())

In [27]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [28]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)

In [29]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [30]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [31]:
print('Model Intercept: ', cvModel.bestModel.intercept)

In [32]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

Feature Weight
-0.3013288247935353
-0.6509583616562696
-0.4113902711090522
-0.527199911880614
-0.5000892604470324
-0.0746052837500147
0.2166209376805166
-2.509659126178557
-0.5605378344344435
-0.2386845095846995


In [33]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"List(1, 2, List(), List(0.642611698912397, 0.35738830108760306))",26.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5993129972737805, 0.40068700272621943))",30.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6217105151737744, 0.37828948482622554))",31.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6200681370550873, 0.37993186294491277))",32.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6026369787567719, 0.39736302124322814))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5519481908787538, 0.44805180912124615))",47.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5896188295748096, 0.4103811704251903))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5841082558815177, 0.4158917441184823))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5870503528065624, 0.41294964719343763))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5889386452837937, 0.4110613547162063))",61.0,Prof-specialty


In [34]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

In [35]:
display(dtModel)

treeNode
"{""index"":5,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0],""feature"":23,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":8151.0,""categories"":null,""feature"":97,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":21.5,""categories"":null,""feature"":94,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":9,""featureType"":""continuous"",""prediction"":null,""threshold"":12.5,""categories"":null,""feature"":96,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":3448.0,""categories"":null,""feature"":97,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":8,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [36]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

In [37]:
# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

In [38]:
predictions.printSchema()

In [39]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",26.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",30.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",31.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",32.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",47.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7014420247204238, 0.2985579752795762))",61.0,Prof-specialty


In [40]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [41]:
dt.getImpurity()

In [42]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

In [43]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)

In [44]:
print("numNodes = ", cvModel.bestModel.numNodes)
print("depth = ", cvModel.bestModel.depth)

In [45]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [46]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [47]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"List(1, 2, List(), List(0.8429752066115702, 0.15702479338842976))",26.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.8429752066115702, 0.15702479338842976))",30.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.8429752066115702, 0.15702479338842976))",31.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.8429752066115702, 0.15702479338842976))",32.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6796426292278238, 0.32035737077217613))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6796426292278238, 0.32035737077217613))",47.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6796426292278238, 0.32035737077217613))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6796426292278238, 0.32035737077217613))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.9003322259136213, 0.09966777408637874))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.9003322259136213, 0.09966777408637874))",61.0,Prof-specialty


In [48]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [49]:
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)

In [50]:
predictions.printSchema()

In [51]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"List(1, 2, List(), List(0.7232420084097775, 0.2767579915902225))",26.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7058663828134752, 0.2941336171865247))",30.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7058663828134752, 0.2941336171865247))",31.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.7058663828134752, 0.2941336171865247))",32.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6815672496051896, 0.3184327503948104))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6815672496051896, 0.3184327503948104))",47.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6897944645322172, 0.31020553546778284))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6897944645322172, 0.31020553546778284))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6897944645322172, 0.31020553546778284))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6897944645322172, 0.31020553546778284))",61.0,Prof-specialty


In [52]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

In [53]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [54]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations.  This can take about 6 minutes since it is training over 20 trees!
cvModel = cv.fit(trainingData)

In [55]:
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [56]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [57]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

label,prediction,probability,age,occupation
0.0,0.0,"List(1, 2, List(), List(0.6833628666559673, 0.3166371333440327))",26.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6541827482182802, 0.34581725178171985))",30.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6541827482182802, 0.34581725178171985))",31.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6541827482182802, 0.34581725178171985))",32.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5869050286029408, 0.4130949713970592))",39.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5869050286029408, 0.4130949713970592))",47.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5976974927706207, 0.4023025072293794))",50.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.5976974927706207, 0.4023025072293794))",51.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6154647088693163, 0.38453529113068374))",60.0,Prof-specialty
0.0,0.0,"List(1, 2, List(), List(0.6154647088693163, 0.38453529113068374))",61.0,Prof-specialty


In [58]:
# As Random Forest gives us the best areaUnderROC value, we will use the bestModel obtained from Random Forest for deployment, and use it to generate predictions on new data. 
# we will simulate this by generating predictions on the entire dataset.
bestModel = cvModel.bestModel

In [59]:
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)

In [60]:
# Evaluate best model
evaluator.evaluate(finalPredictions)

In [61]:
# look into predictions grouped by age and occupation
finalPredictions.createOrReplaceTempView("finalPredictions")

In [62]:
%sql
SELECT occupation, prediction, count(*) AS count
FROM finalPredictions
GROUP BY occupation, prediction
ORDER BY occupation

occupation,prediction,count
?,0.0,1742
?,1.0,101
Adm-clerical,1.0,164
Adm-clerical,0.0,3605
Armed-Forces,1.0,1
Armed-Forces,0.0,8
Craft-repair,1.0,253
Craft-repair,0.0,3846
Exec-managerial,1.0,1454
Exec-managerial,0.0,2612


In [63]:
%sql
SELECT age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY age, prediction
ORDER BY age

age,prediction,count
17.0,0.0,395
18.0,0.0,550
19.0,0.0,712
20.0,0.0,753
21.0,0.0,720
22.0,1.0,1
22.0,0.0,764
23.0,0.0,873
23.0,1.0,4
24.0,1.0,8
