Dataset source: https://archive.ics.uci.edu/ml/datasets/Adult <br/>
PySpark ML Classifier Reference: https://spark.apache.org/docs/2.3.0/ml-classification-regression.html#classification <br/>
<b> <i> Classifier Models Used: </i> </b> <br/>
<ul>
  <li> Logistic Regression </li>
  <li> Naive Bayes </li>
  <li> Decision Tree </li>
  <li> Gradient-boosted Tree </li>
  <li> Random Forest </li>
  <li> Multilayer Perceptron </li>
  <li> One-vs-All (Logistic Regression, Random Forest) </li>
</ul>
<hr/>

In [2]:
# Computational and Visualisation Packages
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from ggplot import *

# Pyspark Packages
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, trim
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, LogisticRegression, DecisionTreeClassifier, GBTClassifier, RandomForestClassifier, OneVsRest, MultilayerPerceptronClassifier
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [3]:
# Read the Adult CSV (no header option is given, so the columns are addressed
# as _c0 .. _c14 below) and the country_codes lookup table.
adult_gov_data = spark.read.csv('/databricks-datasets/adult/adult.data')
country_codes = spark.sql('SELECT * FROM country_codes')

# Target column names in file order; the flag marks columns cast to double.
adult_column_spec = [
    ('age', True),
    ('workclass', False),
    ('fnlwgt', True),
    ('education', False),
    ('education_num', True),
    ('marital_status', False),
    ('occupation', False),
    ('relationship', False),
    ('race', False),
    ('sex', False),
    ('capital_gain', True),
    ('capital_loss', True),
    ('hours_per_week', True),
    ('native_country', False),
    ('income', False),
]
renamed_columns = []
for position, (new_name, as_double) in enumerate(adult_column_spec):
    source_column = col('_c%d' % position)
    if as_double:
        source_column = source_column.cast('double')
    renamed_columns.append(source_column.alias(new_name))
adult_gov_data = adult_gov_data.select(*renamed_columns)

# Replace hyphens with spaces and trim the country name before it is used as
# the join key against country_codes.country.
native_country_clean = trim(F.regexp_replace(col('native_country'), '-', ' '))
adult_gov_data = adult_gov_data.withColumn('native_country', native_country_clean)

# Left join keeps every census row; only three lookup columns are retained.
expanded_cols = adult_gov_data.columns + ['alpha_3_code', 'latitude', 'longitude']
join_condition = adult_gov_data.native_country == country_codes.country
adult_gov_data_expanded = (
    adult_gov_data
    .join(country_codes, join_condition, how='left')
    .select(*expanded_cols)
)

In [4]:
# Show a ~1% sample (without replacement) of the joined dataset.
# NOTE(review): the trailing 250 is passed to display(), not to sample();
# if a 250-row cap was intended, confirm display() honours it or use .limit(250).
display (adult_gov_data_expanded.sample(False, 0.01), 250)

In [5]:
# Summary statistics for the joined dataset.
expanded_summary = adult_gov_data_expanded.describe()
display(expanded_summary)

In [6]:
# Total weekly hours summed per country code.
hours_by_country = (
    adult_gov_data_expanded
    .groupBy('alpha_3_code')
    .agg(F.sum('hours_per_week').alias('total_hours'))
)
display(hours_by_country)

In [7]:
# Build the feature pipeline: every categorical column is string-indexed and
# one-hot encoded, the income column becomes the numeric label, and all
# encoded vectors plus the numeric columns are assembled into `features`.
col_categorical = ['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'native_country']
pipeline_steps = []

for column in col_categorical:
    string_indexed = StringIndexer(inputCol=column, outputCol=column + "Index")
    one_hot_encoded = OneHotEncoderEstimator(inputCols=[string_indexed.getOutputCol()], outputCols=[column + "classVec"])
    pipeline_steps += [string_indexed, one_hot_encoded]

label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
pipeline_steps += [label_stringIdx]

numerical_columns = ['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
# The comprehension variable must not be called `col`: that name is the
# imported pyspark.sql.functions.col, and on Python 2 a comprehension variable
# leaks into the enclosing scope and would overwrite it.
assembler_col = [cat_name + "classVec" for cat_name in col_categorical] + numerical_columns
assemblerModel = VectorAssembler(inputCols=assembler_col, outputCol="features")
pipeline_steps += [assemblerModel]

# Applying the pipeline on the dataset
pipelineInst = Pipeline(stages=pipeline_steps)
pipelineModel = pipelineInst.fit(adult_gov_data_expanded)
adult_gov_data_processed = pipelineModel.transform(adult_gov_data_expanded).select(adult_gov_data_expanded.columns + ['label', 'features'])
# Fixed seed keeps the 75/25 train/test split reproducible across runs.
train, test = adult_gov_data_processed.randomSplit([.75, .25], seed=121)

In [8]:
# Summary statistics for the training split.
train_summary = train.describe()
display(train_summary)

In [9]:
# Columns shown when inspecting a model's predictions.
display_cols = [
    'label',
    'age',
    'occupation',
    'probability',
    'prediction',
]

In [10]:
# Baseline logistic regression fitted on the training split.
lrInst = LogisticRegression(labelCol='label', featuresCol='features', maxIter=50)
lrModel = lrInst.fit(train)
lrPredictions = lrModel.transform(test)

# BinaryClassificationEvaluator scores area under the ROC curve (its default
# metric), not accuracy, so the metric is named explicitly and the printed
# label says what is actually measured.
lrbceInst = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
print ("Model Area Under ROC = %.15f" % lrbceInst.evaluate(lrPredictions))

In [11]:
# Inspect the baseline logistic-regression predictions.
display(lrPredictions.select(*display_cols))

In [12]:
# Grid-search regularisation strength, elastic-net mixing and iteration count
# for logistic regression with 5-fold cross-validation (3 x 3 x 3 = 27 candidates).
lrpgInst = (ParamGridBuilder()
             .addGrid(lrInst.regParam, [0.01, 0.5, 2.0])
             .addGrid(lrInst.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lrInst.maxIter, [15, 30, 45])
             .build())
lrcvInst = CrossValidator(estimator=lrInst, estimatorParamMaps=lrpgInst, evaluator=lrbceInst, numFolds=5) #5 folds
lrcvModel = lrcvInst.fit(train)
lrcvPredictions = lrcvModel.transform(test)
print ('Best Model Score: %.15f' % lrbceInst.evaluate(lrcvPredictions))
print ('Best Model Intercept: %.15f' % lrcvModel.bestModel.intercept)

# One row per feature weight of the best model. toArray() gives a plain
# NumPy array whether the coefficient vector is dense or sparse, and the
# frame is built through `spark`, the session used by the rest of the notebook.
lr_best_weights = lrcvModel.bestModel.coefficients.toArray()
new_frame_lrweights = spark.createDataFrame([(float(w),) for w in lr_best_weights], ["Feature Weight"])

In [13]:
# Show the per-feature weights of the best cross-validated logistic regression.
display(new_frame_lrweights)

In [14]:
# Inspect the cross-validated logistic-regression predictions.
display(lrcvPredictions.select(*display_cols))

In [15]:
# Multinomial Naive Bayes baseline, scored by accuracy on the test split.
nbInst = NaiveBayes(
    featuresCol='features',
    labelCol='label',
    modelType="multinomial",
    smoothing=2.0,
)
nbModel = nbInst.fit(train)
nbPredictions = nbModel.transform(test)

nbmceInst = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
nb_accuracy = nbmceInst.evaluate(nbPredictions)
print("Model Accuracy = %.15f" % nb_accuracy)

In [16]:
# Inspect the baseline Naive Bayes predictions.
display(nbPredictions.select(*display_cols))

In [17]:
# Tune the Naive Bayes smoothing parameter with 5-fold cross-validation.
nb_grid_builder = ParamGridBuilder()
nb_grid_builder.addGrid(nbInst.smoothing, [1.0, 2.0, 3.0, 4.0, 5.0])
nbpgInst = nb_grid_builder.build()

nbcvInst = CrossValidator(
    estimator=nbInst,
    estimatorParamMaps=nbpgInst,
    evaluator=nbmceInst,
    numFolds=5,
)
nbcvModel = nbcvInst.fit(train)
nbcvBestPredictions = nbcvModel.transform(test)
nbcv_score = nbmceInst.evaluate(nbcvBestPredictions)
print('Best Model Score: %.15f' % nbcv_score)

In [18]:
# Show every column of the cross-validated Naive Bayes predictions.
display(nbcvBestPredictions)

In [19]:
# One-vs-Rest wrapper around a logistic regression that reuses the
# hyper-parameters of the best cross-validated LR model. The values are read
# through the model's underlying Java object.
lrPrevBestModel = lrcvModel.bestModel
best_lr_java = lrPrevBestModel._java_obj
ovalrInst = LogisticRegression(
    labelCol='label',
    featuresCol='features',
    fitIntercept=True,
    maxIter=best_lr_java.getMaxIter(),
    elasticNetParam=best_lr_java.getElasticNetParam(),
    regParam=best_lr_java.getRegParam(),
)
ovaInst = OneVsRest(classifier=ovalrInst)
ovaModel = ovaInst.fit(train)
ovaPredictions = ovaModel.transform(test)

ovamceInst = MulticlassClassificationEvaluator(metricName="accuracy")
ova_lr_accuracy = ovamceInst.evaluate(ovaPredictions)
print("Model Accuracy = %.15f" % ova_lr_accuracy)

In [20]:
# Show every column of the One-vs-Rest logistic-regression predictions.
display(ovaPredictions)

In [21]:
# Depth-4 decision tree baseline, scored with BinaryClassificationEvaluator
# using its default settings.
dtInst = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=4,
)
dtModel = dtInst.fit(train)
dtPredictions = dtModel.transform(test)
dtbceInst = BinaryClassificationEvaluator()
dt_score = dtbceInst.evaluate(dtPredictions)
print('Model Fit Score: %.15f' % dt_score)

In [22]:
# Inspect the baseline decision-tree predictions.
display(dtPredictions.select(*display_cols))

In [23]:
# Tune tree depth and bin count for the decision tree with 5-fold cross-validation.
dt_grid_builder = ParamGridBuilder()
dt_grid_builder.addGrid(dtInst.maxDepth, [1, 3, 8, 12])
dt_grid_builder.addGrid(dtInst.maxBins, [25, 50, 90])
dtpgInst = dt_grid_builder.build()

dtcvInst = CrossValidator(
    estimator=dtInst,
    estimatorParamMaps=dtpgInst,
    evaluator=dtbceInst,
    numFolds=5,
)
dtcvModel = dtcvInst.fit(train)
dtcvBestPredictions = dtcvModel.transform(test)
dtcv_score = dtbceInst.evaluate(dtcvBestPredictions)
print('Best Model Score: %.15f' % dtcv_score)

In [24]:
# Inspect the cross-validated decision-tree predictions.
display(dtcvBestPredictions.select(*display_cols))

In [25]:
# Gradient-boosted tree baseline (20 iterations, depth 5), scored by accuracy.
gbtInst = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=5,
    maxIter=20,
)
gbtModel = gbtInst.fit(train)
gbtPredictions = gbtModel.transform(test)
gbtmceInst = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
gbt_score = gbtmceInst.evaluate(gbtPredictions)
print('Model Fit Score: %.15f' % gbt_score)

In [26]:
# Inspect the baseline gradient-boosted tree predictions.
display(gbtPredictions.select(*display_cols))

In [27]:
# Tune depth and iteration count for the gradient-boosted trees with 5-fold cross-validation.
gbt_grid_builder = ParamGridBuilder()
gbt_grid_builder.addGrid(gbtInst.maxDepth, [3, 5, 7])
gbt_grid_builder.addGrid(gbtInst.maxIter, [25, 40, 50])
gbtpgInst = gbt_grid_builder.build()

gbtcvInst = CrossValidator(
    estimator=gbtInst,
    estimatorParamMaps=gbtpgInst,
    evaluator=gbtmceInst,
    numFolds=5,
)
gbtcvModel = gbtcvInst.fit(train)
gbtcvBestPredictions = gbtcvModel.transform(test)
gbtcv_score = gbtmceInst.evaluate(gbtcvBestPredictions)
print('Best Model Score: %.15f' % gbtcv_score)

In [28]:
# Show every column of the cross-validated gradient-boosted tree predictions.
display(gbtcvBestPredictions)

In [29]:
# Random forest baseline with default hyper-parameters, scored with
# BinaryClassificationEvaluator using its default settings.
rfInst = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
)
rfModel = rfInst.fit(train)
rfPredictions = rfModel.transform(test)
rfbceInst = BinaryClassificationEvaluator()
rf_score = rfbceInst.evaluate(rfPredictions)
print('Model Fit Score: %.15f' % rf_score)

In [30]:
# Inspect the baseline random-forest predictions.
display(rfPredictions.select(*display_cols))

In [31]:
# Tune depth, bin count and forest size for the random forest with 5-fold cross-validation.
rf_grid_builder = ParamGridBuilder()
rf_grid_builder.addGrid(rfInst.maxDepth, [1, 3, 8])
rf_grid_builder.addGrid(rfInst.maxBins, [25, 50])
rf_grid_builder.addGrid(rfInst.numTrees, [5, 20])
rfpgInst = rf_grid_builder.build()

rfcvInst = CrossValidator(
    estimator=rfInst,
    estimatorParamMaps=rfpgInst,
    evaluator=rfbceInst,
    numFolds=5,
)
rfcvModel = rfcvInst.fit(train)
rfcvBestPredictions = rfcvModel.transform(test)
rfcv_score = rfbceInst.evaluate(rfcvBestPredictions)
print('Best Model Score: %.15f' % rfcv_score)

In [32]:
# Show every column of the cross-validated random-forest predictions.
display(rfcvBestPredictions)

In [33]:
# One-vs-Rest wrapper around a random forest that reuses the hyper-parameters
# of the best cross-validated RF model. The values are read through the
# model's underlying Java object.
rfPrevBestModel = rfcvModel.bestModel
best_rf_java = rfPrevBestModel._java_obj
ova_rf_base = RandomForestClassifier(
    labelCol='label',
    featuresCol='features',
    maxDepth=best_rf_java.getMaxDepth(),
    maxBins=best_rf_java.getMaxBins(),
    numTrees=best_rf_java.getNumTrees(),
)
# ovarfInst ends up bound to the OneVsRest estimator, as before.
ovarfInst = OneVsRest(classifier=ova_rf_base)
ovarfModel = ovarfInst.fit(train)
ovarfPredictions = ovarfModel.transform(test)

ovamceInst = MulticlassClassificationEvaluator(metricName="accuracy")
ova_rf_accuracy = ovamceInst.evaluate(ovarfPredictions)
print("Model Accuracy = %.15f" % ova_rf_accuracy)

In [34]:
# Show every column of the One-vs-Rest random-forest predictions.
display(ovarfPredictions)

In [35]:
# Multilayer perceptron: the input layer width is read from the `features`
# column's ML attribute metadata, followed by hidden layers of 20 and 10
# units and a 2-unit output layer.
feature_count = train.schema["features"].metadata["ml_attr"]["num_attrs"]
layers = [feature_count, 20, 10, 2]
mpcInst = MultilayerPerceptronClassifier(
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=451,
)
mpcModel = mpcInst.fit(train)
mpcPredictions = mpcModel.transform(test)
# Despite the "Bce" suffix, this is a multiclass evaluator reporting accuracy.
mpcBce = MulticlassClassificationEvaluator(metricName="accuracy")
mpc_accuracy = mpcBce.evaluate(mpcPredictions)
print("Model Accuracy = %.15f" % mpc_accuracy)

In [36]:
# Show every column of the baseline multilayer-perceptron predictions.
display(mpcPredictions)

In [37]:
# Tune iteration count and block size for the perceptron with 5-fold cross-validation.
mpc_grid_builder = ParamGridBuilder()
mpc_grid_builder.addGrid(mpcInst.maxIter, [50, 100, 150])
mpc_grid_builder.addGrid(mpcInst.blockSize, [128, 256, 512])
mpcpgInst = mpc_grid_builder.build()

mpccvInst = CrossValidator(
    estimator=mpcInst,
    estimatorParamMaps=mpcpgInst,
    evaluator=mpcBce,
    numFolds=5,
)
mpccvModel = mpccvInst.fit(train)
mpccvBestPredictions = mpccvModel.transform(test)
mpccv_accuracy = mpcBce.evaluate(mpccvBestPredictions)
print("Best Model Accuracy Score = %.15f" % mpccv_accuracy)

In [38]:
# Show every column of the cross-validated multilayer-perceptron predictions.
display(mpccvBestPredictions)

In [39]:
# Apply the best cross-validated random forest to the complete processed dataset.
selectedModel = rfcvModel.bestModel
selectedPredictions = selectedModel.transform(adult_gov_data_processed)
# NOTE(review): adult_gov_data_processed contains the rows the model was
# trained on, so this score is optimistic compared with the test-split scores.
# Formatted with % like every other cell; print("...", value) would print a
# tuple on Python 2 and a doubled space on Python 3.
print ("Model Fit Score = %.15f" % rfbceInst.evaluate(selectedPredictions))

In [40]:
# Show the selected model's predictions over the complete processed dataset.
display(selectedPredictions)

In [41]:
# Second display of the same predictions frame (identical call to the previous cell).
display(selectedPredictions)

In [42]:
# Hand-entered summary of the best score obtained for each classifier,
# built in pandas and then converted to a Spark DataFrame for display.
comparison_pdf = pd.DataFrame({
    'classifiers': ['LR', 'NB', 'DT', 'GBT', 'RF', 'MLP', 'ovA-LR', 'ovA-RF'],
    'accuracy_scores': [90.1, 78.8, 76.7, 85.5, 89.1, 76.5, 84.8, 84.8],
})
classifier_comparison_df = spark.createDataFrame(comparison_pdf)
display(classifier_comparison_df)

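We implemented all the classifiers available in the PySpark ML module, and the RF classifier emerged with the best score for this dataset, 90.6%. Note that the scores are not all the same metric: the Logistic Regression, Decision Tree and Random Forest cells use <code>BinaryClassificationEvaluator</code> (area under the ROC curve by default), while the remaining cells report accuracy. We plan to implement XGBoost on the same data in future notebooks. <br/> 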

For the best models against each classifier, the below-mentioned accuracy scores were achieved: <br/>

<ul>
  <li> Logistic Regression - 90.1% </li>
  <li> Naive Bayes - 78.8% </li>
  <li> Decision Tree - 76.7% </li>
  <li> Gradient-boosted Tree - 85.5% </li>
  <li> Random Forest - 89.1% </li>
  <li> Multilayer Perceptron - 76.5%</li>
  <li> One-vs-All (Logistic Regression - 84.8%, Random Forest - 84.8%) </li>
</ul>

The published version of the notebook is available at - <br/> 
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3173713035751393/675963439015456/2308983777460038/latest.html