# Big Data Assignment for Classification
## Assignment Week 13
## Use Apache Spark MLlib on Databricks

*   Name: Kurnia Cahya Febryanto
*   Class: Big Data A
*   Student ID: 5025201073
*   Abdul Munif, S.Kom., M.Sc.

## Source:
- https://docs.databricks.com/machine-learning/train-model/mllib/index.html

## Install and Setup

In [None]:
!java --version
!python --version

openjdk 11.0.19 2023-04-18
OpenJDK Runtime Environment (build 11.0.19+7-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.19+7-post-Ubuntu-0ubuntu120.04.1, mixed mode, sharing)
Python 3.10.11


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Initialize Apache Spark context

In [None]:
# Import Apache Spark SQL
from pyspark.sql import SparkSession

# Create Spark Session/Context
# We are using local machine with all the CPU cores [*]
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Hello Pyspark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Check spark session
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f1fd81e3820>


## Dataset Details and Dataset Review

The Adult dataset is publicly available at the UCI Machine Learning Repository. The data derives from census records and describes 48842 individuals and their annual income. You can use this information to predict whether an individual earns <=50K or >50K a year. The dataset contains both numeric and categorical variables.

Attribute Information:
*   age: continuous
*   workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
*   fnlwgt: continuous
*   education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc...
*   education-num: continuous
*   marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent...
*   occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners...
*   relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
*   race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
*   sex: Female, Male
*   capital-gain: continuous
*   capital-loss: continuous
*   hours-per-week: continuous
*   native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany...

Target/Label: - <=50K, >50K




## Load Data from Dataset

In [None]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
 
schema = StructType([
  StructField("age", DoubleType(), False),
  StructField("workclass", StringType(), False),
  StructField("fnlwgt", DoubleType(), False),
  StructField("education", StringType(), False),
  StructField("education_num", DoubleType(), False),
  StructField("marital_status", StringType(), False),
  StructField("occupation", StringType(), False),
  StructField("relationship", StringType(), False),
  StructField("race", StringType(), False),
  StructField("sex", StringType(), False),
  StructField("capital_gain", DoubleType(), False),
  StructField("capital_loss", DoubleType(), False),
  StructField("hours_per_week", DoubleType(), False),
  StructField("native_country", StringType(), False),
  StructField("income", StringType(), False)
])


In [None]:
dataset = spark.read.format("csv").schema(schema).load("adult.data")
cols = dataset.columns

In [None]:
dataset.show()

+----+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| age|        workclass|  fnlwgt|    education|education_num|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+----+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|39.0|        State-gov| 77516.0|    Bachelors|         13.0|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|
|50.0| Self-emp-not-inc| 83311.0|    Bachelors|         13.0|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|         0.0|         0.

In [None]:
display(dataset)

DataFrame[age: double, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string]

## Preprocess Data

To use algorithms like Logistic Regression, you must first convert the categorical variables in the dataset into numeric variables. There are two ways to do this.

*   Category Indexing: assigns each category a numeric value from {0, 1, 2, ..., numCategories-1}. This introduces an implicit ordering among the categories, so it is better suited to ordinal variables (e.g. Poor: 0, Average: 1, Good: 2).
*   One-Hot Encoding: converts each category into a binary vector with at most one nonzero value (e.g. Blue: [1, 0], Green: [0, 1], Red: [0, 0]).

This notebook uses a combination of StringIndexer and, depending on your Spark version, either OneHotEncoder or OneHotEncoderEstimator to convert the categorical variables. OneHotEncoder and OneHotEncoderEstimator return a SparseVector.

Since there is more than one stage of feature transformations, use a Pipeline to tie the stages together. This simplifies the code.
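
As a rough illustration of the two encodings described above, here is a plain-Python sketch (it is not Spark code). It assumes indices are assigned by descending frequency, which is the default ordering of StringIndexer, and that the last category is dropped from the one-hot vector, which is the default of OneHotEncoder (`dropLast=True`). The helper names `index_categories` and `one_hot` are hypothetical and exist only for this example.

```python
from collections import Counter

def index_categories(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: i for i, category in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list; with drop_last the final category maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0] * size
    if index < size:
        vec[index] = 1
    return vec

# Toy data, chosen so that Blue is the most frequent value and Red the least
colors = ["Blue", "Blue", "Blue", "Green", "Green", "Red"]
mapping = index_categories(colors)
encoded = {c: one_hot(i, len(mapping)) for c, i in mapping.items()}
print(mapping)   # {'Blue': 0, 'Green': 1, 'Red': 2}
print(encoded)   # {'Blue': [1, 0], 'Green': [0, 1], 'Red': [0, 0]}
```

Dropping the last category keeps the encoded columns from summing to a constant, which is why Red appears as an all-zero vector.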

In [61]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
  
categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]


In [62]:
stages = [] # stages in Pipeline

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [63]:
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]

In [64]:
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
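
The assembler concatenates the one-hot vectors and the numeric columns into a single `features` vector. A minimal sketch of how its length can be worked out, assuming each categorical column contributes one slot per category minus the dropped last one. The function `assembled_width` and the toy category counts are hypothetical, not values read from the Adult dataset.

```python
def assembled_width(category_counts, num_numeric, drop_last=True):
    """Length of the concatenated feature vector."""
    per_column = [n - 1 if drop_last else n for n in category_counts.values()]
    return sum(per_column) + num_numeric

# Hypothetical category counts, for illustration only
toy_counts = {"color": 3, "size": 4}
width = assembled_width(toy_counts, num_numeric=2)
print(width)  # 7 = (3 - 1) + (4 - 1) + 2
```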

In [65]:
# Drop NULL values
dataset = dataset.na.drop()

In [66]:
stages

[StringIndexer_a4d5cca76ed8,
 OneHotEncoder_5ad90adab2c0,
 StringIndexer_452aeec41e8e,
 OneHotEncoder_423e4b8ba655,
 StringIndexer_d80f52724e89,
 OneHotEncoder_12708597b97d,
 StringIndexer_20338bc4d6ad,
 OneHotEncoder_3144154a6897,
 StringIndexer_9ea269a88376,
 OneHotEncoder_effd1dfd0691,
 StringIndexer_cd9b1339cfc0,
 OneHotEncoder_343c2335a837,
 StringIndexer_6f52bd0af90e,
 OneHotEncoder_242919e8f87f,
 StringIndexer_6dd0ac577778,
 OneHotEncoder_b50f5569e405,
 StringIndexer_898eb09b0a53,
 VectorAssembler_18e0894aa041]

In [68]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

lr = LogisticRegression(featuresCol='features', labelCol='label')
lrModel = lr.fit(preppedDataDF)

In [69]:
preppedDataDF.show()

+----+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+--------------+-----------------+--------------+-----------------+-------------------+----------------------+---------------+------------------+-----------------+--------------------+---------+-------------+--------+-------------+-------------------+----------------------+-----+--------------------+
| age|        workclass|  fnlwgt|    education|education_num|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|native_country|income|workclassIndex|workclassclassVec|educationIndex|educationclassVec|marital_statusIndex|marital_statusclassVec|occupationIndex|occupationclassVec|relationshipIndex|relationshipclassVec|raceIndex| raceclassVec|sexIndex|  sexclassVec|native_countryIndex|native_countryclassVec|label|      

In [70]:
# Instantiate and fit Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')
lrModel = lr.fit(preppedDataDF)
 
# ROC for training data
# Please replace `display` function with the one that's appropriate in your environment.
display(lrModel, preppedDataDF, "ROC")

LogisticRegressionModel: uid=LogisticRegression_16d91ad784c7, numClasses=2, numFeatures=100

DataFrame[age: double, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string, workclassIndex: double, workclassclassVec: vector, educationIndex: double, educationclassVec: vector, marital_statusIndex: double, marital_statusclassVec: vector, occupationIndex: double, occupationclassVec: vector, relationshipIndex: double, relationshipclassVec: vector, raceIndex: double, raceclassVec: vector, sexIndex: double, sexclassVec: vector, native_countryIndex: double, native_countryclassVec: vector, label: double, features: vector]

'ROC'

In [71]:
display(lrModel, preppedDataDF)

LogisticRegressionModel: uid=LogisticRegression_16d91ad784c7, numClasses=2, numFeatures=100

DataFrame[age: double, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string, workclassIndex: double, workclassclassVec: vector, educationIndex: double, educationclassVec: vector, marital_statusIndex: double, marital_statusclassVec: vector, occupationIndex: double, occupationclassVec: vector, relationshipIndex: double, relationshipclassVec: vector, raceIndex: double, raceclassVec: vector, sexIndex: double, sexclassVec: vector, native_countryIndex: double, native_countryclassVec: vector, label: double, features: vector]

In [72]:
# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)
dataset.show()

DataFrame[label: double, features: vector, age: double, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string]

+-----+--------------------+----+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|label|            features| age|        workclass|  fnlwgt|    education|education_num|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+-----+--------------------+----+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|  0.0|(100,[4,10,24,32,...|39.0|        State-gov| 77516.0|    Bachelors|         13.0|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|
|  0.0|(100,[1,10,23,31,...|50.0| Self-emp-not-inc| 83311.0|

In [73]:
# Randomly split data into training and test sets; set a seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

18076
7699
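
The counts above are close to, but not exactly, a 70/30 split. A plausible reading is that each row is assigned to a split independently at random, so the weights are probabilities rather than exact quotas. The plain-Python sketch below illustrates that idea; `random_split` is a hypothetical helper and does not reproduce Spark's actual sampler or its seed behaviour.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point remainder
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(10_000), [0.7, 0.3], seed=100)
print(len(train), len(test))  # close to 7000 and 3000, but usually not exact
```

Using the same seed again yields the same assignment, which is what makes the split reproducible.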


## Fit and Evaluate Models
Now, try out some of the Binary Classification algorithms available in the Pipelines API.

Out of these algorithms, the ones below also support multiclass classification with the Python API:

- Decision Tree Classifier
- Random Forest Classifier

These are the general steps to build the models:

- Create an initial model using the training set
- Tune parameters with a ParamGrid and 5-fold Cross Validation
- Evaluate the best model obtained from the Cross Validation using the test set

Use the BinaryClassificationEvaluator to evaluate the models. It uses areaUnderROC as the default metric.

### Logistic Regression

In the Pipelines API, you can now perform Elastic-Net Regularization with Logistic Regression, as well as other linear methods.
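
For orientation, the elastic-net penalty mixes an L1 and an L2 term. The sketch below follows the form given in the Spark MLlib documentation as I understand it: regParam scales the whole penalty and elasticNetParam sets the L1 share. The function name `elastic_net_penalty` and the toy weights are hypothetical.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2), with alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared)

w = [0.5, -1.0, 0.0]  # toy coefficients
print(round(elastic_net_penalty(w, reg_param=0.3, elastic_net_param=0.0), 4))  # pure L2: 0.1875
print(round(elastic_net_penalty(w, reg_param=0.3, elastic_net_param=1.0), 4))  # pure L1: 0.45
```

With elasticNetParam at 0 the penalty is pure L2 (ridge); at 1 it is pure L1 (lasso), which tends to push coefficients to exactly zero.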

In [74]:
from pyspark.ml.classification import LogisticRegression
 
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
 
# Train model with Training Data
lrModel = lr.fit(trainingData)

Adding regParam=0.3 and elasticNetParam=0.8 made the evaluator result worse: areaUnderROC dropped to 0.5, which is the score of a model that cannot separate the two classes. I therefore left these parameters out to get the better result.

In [75]:
# Make predictions on test data using the transform() method.
# LogisticRegressionModel.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [76]:
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show()

+-----+----------+--------------------+----+---------------+
|label|prediction|         probability| age|     occupation|
+-----+----------+--------------------+----+---------------+
|  0.0|       0.0|[0.68727303172254...|26.0| Prof-specialty|
|  0.0|       0.0|[0.53626097020027...|33.0| Prof-specialty|
|  0.0|       0.0|[0.64353982988969...|33.0| Prof-specialty|
|  0.0|       0.0|[0.60002981673358...|42.0| Prof-specialty|
|  0.0|       0.0|[0.58369476739333...|46.0| Prof-specialty|
|  0.0|       0.0|[0.60042739796493...|51.0| Prof-specialty|
|  0.0|       0.0|[0.54613159252811...|56.0| Prof-specialty|
|  0.0|       0.0|[0.61495947641604...|36.0| Prof-specialty|
|  0.0|       0.0|[0.57232120563562...|38.0| Prof-specialty|
|  0.0|       0.0|[0.63176738867092...|50.0| Prof-specialty|
|  0.0|       0.0|[0.57346331898190...|39.0| Prof-specialty|
|  0.0|       0.0|[0.71223484384091...|20.0|   Craft-repair|
|  0.0|       1.0|[0.40706932293657...|26.0|   Craft-repair|
|  0.0|       0.0|[0.629
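
The `probability` column holds one entry per class, and the prediction follows from comparing the class-1 probability with a threshold (0.5 by default). In the table above, the row whose first probability is about 0.407 is predicted as 1.0 because the remaining probability, about 0.593, exceeds 0.5. The sketch below shows this relationship for a binary logistic model; the margins are hypothetical values, not taken from the fitted model.

```python
import math

def sigmoid(margin):
    """Logistic function: maps a real-valued margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-margin))

def predict(margin, threshold=0.5):
    """Return ([P(label=0), P(label=1)], predicted label) for a given margin."""
    p1 = sigmoid(margin)
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return probability, prediction

for margin in (-0.8, 0.4):  # hypothetical margins (intercept + weights . features)
    probability, prediction = predict(margin)
    print([round(p, 3) for p in probability], prediction)
```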

In [77]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

0.9080891057661741

In [78]:
evaluator.getMetricName()

'areaUnderROC'
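
One way to read areaUnderROC: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it by brute force over all pairs. This is an equivalent definition for illustration, not how the evaluator computes the metric internally, and `area_under_roc` is a hypothetical helper.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]  # toy scores for the positive class
print(area_under_roc(labels, scores))     # 0.75
print(area_under_roc(labels, [0.5] * 4))  # 0.5, constant scores carry no ranking information
```

A value of 1.0 means every positive outranks every negative, and 0.5 is what random or constant scores achieve.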

In [79]:
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [80]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
 
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [81]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
 
# Run cross validations
cvModel = cv.fit(trainingData)
# This will likely take a fair amount of time because of the number of models being created and tested
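
The running-time remark above follows from the size of the search. A small sketch that enumerates the same grid in plain Python and counts the fits, assuming one model is trained per parameter combination per fold (the final refit of the best combination on the full training set is not counted):

```python
from itertools import product

reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [1, 5, 10]
num_folds = 5

# One dictionary per parameter combination, mirroring what the grid builder enumerates
grid = [
    {"regParam": r, "elasticNetParam": e, "maxIter": m}
    for r, e, m in product(reg_params, elastic_net_params, max_iters)
]
print(len(grid))              # 27 parameter combinations
print(len(grid) * num_folds)  # 135 model fits during cross-validation
```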

In [82]:
# Use the test set to measure how well the model performs on new data
predictions = cvModel.transform(testData)

In [83]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

0.904508472713419

In [84]:
print('Model Intercept: ', cvModel.bestModel.intercept)

Model Intercept:  -7.148158357119041


In [85]:
print(cvModel.bestModel.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.5)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained opt

In [86]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])
weightsDF.show()

+--------------------+
|      Feature Weight|
+--------------------+
|                 0.0|
|-0.15184188981591537|
|                 0.0|
|-0.12344010488371085|
|                 0.0|
| 0.13232255216289374|
| 0.30802047184753334|
|                 0.0|
|                 0.0|
|                 0.0|
|0.032869197545820295|
| 0.08926126401463103|
|                 0.0|
|                 0.0|
|                 0.0|
|                 0.0|
|                 0.0|
| 0.31504426291670806|
|                 0.0|
|                 0.0|
+--------------------+
only showing top 20 rows
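
Many of the weights shown above are exactly zero. That is consistent with the best model's elasticNetParam of 0.5, since the L1 part of the penalty tends to zero out coefficients. A quick count over the 20 values printed above (copied from the output; the remaining weights are not shown and are not included here):

```python
# The 20 weights displayed in the output above
shown_weights = [
    0.0, -0.15184188981591537, 0.0, -0.12344010488371085, 0.0,
    0.13232255216289374, 0.30802047184753334, 0.0, 0.0, 0.0,
    0.032869197545820295, 0.08926126401463103, 0.0, 0.0, 0.0,
    0.0, 0.0, 0.31504426291670806, 0.0, 0.0,
]
num_zero = sum(1 for w in shown_weights if w == 0.0)
print(num_zero, "of", len(shown_weights), "shown weights are exactly zero")
# 13 of 20 shown weights are exactly zero
```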



In [87]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show()

+-----+----------+--------------------+----+---------------+
|label|prediction|         probability| age|     occupation|
+-----+----------+--------------------+----+---------------+
|  0.0|       0.0|[0.66132620562644...|26.0| Prof-specialty|
|  0.0|       0.0|[0.57084093492133...|33.0| Prof-specialty|
|  0.0|       0.0|[0.67298276046275...|33.0| Prof-specialty|
|  0.0|       0.0|[0.60293216716049...|42.0| Prof-specialty|
|  0.0|       0.0|[0.62166062676431...|46.0| Prof-specialty|
|  0.0|       0.0|[0.60735041500564...|51.0| Prof-specialty|
|  0.0|       0.0|[0.58175334891881...|56.0| Prof-specialty|
|  0.0|       0.0|[0.70485005269518...|36.0| Prof-specialty|
|  0.0|       0.0|[0.59494228846478...|38.0| Prof-specialty|
|  0.0|       0.0|[0.66642754922273...|50.0| Prof-specialty|
|  0.0|       0.0|[0.60085249730869...|39.0| Prof-specialty|
|  0.0|       0.0|[0.73449214976058...|20.0|   Craft-repair|
|  0.0|       0.0|[0.53747387052797...|26.0|   Craft-repair|
|  0.0|       0.0|[0.687

### Decision Trees

The Decision Trees algorithm is popular because it handles categorical data and works out of the box with multiclass classification tasks.

In [88]:
from pyspark.ml.classification import DecisionTreeClassifier
 
# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)
 
# Train model with Training Data
dtModel = dt.fit(trainingData)

In [89]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

numNodes =  11
depth =  3


In [90]:
display(dtModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_f76f42ff4173, depth=3, numNodes=11, numClasses=2, numFeatures=100

In [91]:
# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

In [92]:
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [93]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

DataFrame[label: double, prediction: double, probability: vector, age: double, occupation: string]

In [94]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

0.7284311159503848

In [95]:
dt.getImpurity()

'gini'
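
Gini impurity measures how mixed the classes are at a node: it is 0 for a pure node and largest when the classes are evenly split. A minimal sketch of the standard formula, 1 minus the sum of squared class proportions, with the hypothetical helper `gini_impurity` and toy class counts:

```python
def gini_impurity(class_counts):
    """Gini impurity of a node given the number of examples in each class."""
    total = sum(class_counts)
    if total == 0:
        return 0.0
    return 1.0 - sum((c / total) ** 2 for c in class_counts)

print(gini_impurity([50, 50]))   # 0.5, evenly mixed binary node
print(gini_impurity([100, 0]))   # 0.0, pure node
print(gini_impurity([75, 25]))   # 0.375
```

The tree chooses splits that reduce this impurity the most in the resulting child nodes.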

In [96]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

In [97]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
 
# Run cross validations
cvModel = cv.fit(trainingData)
# Takes ~5 minutes

In [98]:
print("numNodes = ", cvModel.bestModel.numNodes)
print("depth = ", cvModel.bestModel.depth)


numNodes =  399
depth =  10
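
To put the node counts in perspective: assuming every internal node has exactly two children, a tree with n nodes has (n + 1) / 2 leaves, and a tree of depth d can hold at most 2^(d+1) - 1 nodes. The sketch below applies this to the two trees reported in this notebook; the helper names are hypothetical.

```python
def leaf_count(num_nodes):
    """Leaves in a binary tree where every internal node has two children."""
    return (num_nodes + 1) // 2

def max_nodes(depth):
    """Node count of a completely filled binary tree of the given depth."""
    return 2 ** (depth + 1) - 1

for num_nodes, depth in [(11, 3), (399, 10)]:
    print(num_nodes, leaf_count(num_nodes), max_nodes(depth))
# 11 6 15
# 399 200 2047
```

Under that assumption, the cross-validated tree has 200 leaves and uses well under a fifth of the nodes a full depth-10 tree could have.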


In [99]:
# Use the test set to measure how well the model performs on new data
predictions = cvModel.transform(testData)

In [100]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

0.757322868455687

In [101]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

DataFrame[label: double, prediction: double, probability: vector, age: double, occupation: string]

### Random Forest


Random Forest uses an ensemble of decision trees to improve model accuracy.
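
A simplified picture of how an ensemble combines its trees: each tree reports class probabilities for an example, the forest sums them, normalizes the sum, and predicts the class with the highest combined probability. The sketch below illustrates that idea in plain Python. It is an assumption-level illustration rather than Spark's implementation, and both `forest_predict` and the per-tree probabilities are hypothetical.

```python
def forest_predict(tree_probabilities):
    """Combine per-tree class probabilities into a raw sum, a normalized vector, and a label."""
    num_classes = len(tree_probabilities[0])
    raw = [sum(tree[c] for tree in tree_probabilities) for c in range(num_classes)]
    total = sum(raw)
    probability = [r / total for r in raw]
    prediction = float(max(range(num_classes), key=lambda c: probability[c]))
    return raw, probability, prediction

# Hypothetical outputs of three trees for one example: [P(label=0), P(label=1)]
trees = [[0.9, 0.1], [0.4, 0.6], [0.7, 0.3]]
raw, probability, prediction = forest_predict(trees)
print([round(p, 3) for p in probability], prediction)  # [0.667, 0.333] 0.0
```

Although the second tree favours class 1, the other two outweigh it, which is how averaging dampens the errors of individual trees.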

In [102]:
from pyspark.ml.classification import RandomForestClassifier
 
# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
 
# Train model with Training Data
rfModel = rf.fit(trainingData)

In [103]:
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)

In [104]:
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [105]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

DataFrame[label: double, prediction: double, probability: vector, age: double, occupation: string]

In [106]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

0.8926434605070842

In [107]:
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
 
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

In [108]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
 
# Run cross validations.  This can take about 6 minutes since it is training over 20 trees!
cvModel = cv.fit(trainingData)

In [109]:
# Use the test set to measure how well the model performs on new data
predictions = cvModel.transform(testData)

In [110]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

0.9002739788547749

In [111]:
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

DataFrame[label: double, prediction: double, probability: vector, age: double, occupation: string]

## Make Predictions
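In this run, the tuned Random Forest reached an areaUnderROC of about 0.900 on the test set, slightly below Logistic Regression (about 0.908 before tuning and 0.905 after cross-validation). Because cvModel still holds the Random Forest cross-validation result at this point, the cell below uses its bestModel to generate predictions. Instead of new data, this example generates predictions on the entire dataset.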



In [112]:
bestModel = cvModel.bestModel

# Generate predictions for entire dataset
finalPredictions = bestModel.transform(dataset)

# Evaluate best model
evaluator.evaluate(finalPredictions)

0.8998333946659142

In [113]:
finalPredictions.show()

+-----+--------------------+----+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+--------------------+--------------------+----------+
|label|            features| age|        workclass|  fnlwgt|    education|education_num|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|native_country|income|       rawPrediction|         probability|prediction|
+-----+--------------------+----+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+--------------------+--------------------+----------+
|  0.0|(100,[4,10,24,32,...|39.0|        State-gov| 77516.0|    Bachelors|         13.0|       Never-married|      Adm-clerical| Not-in-

In [114]:
finalPredictions.createOrReplaceTempView("finalPredictions")