In [12]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np

1. Data loading

In [13]:
#Read csv file to dataframe
#=====your code here==========
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
data= spark.read.format("csv").option("inferSchema",'true').load("adult.data")
#===============================
data.show(3)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/24 04:30:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

+---+-----------------+--------+----------+----+-------------------+------------------+--------------+------+-----+------+----+----+--------------+------+
|_c0|              _c1|     _c2|       _c3| _c4|                _c5|               _c6|           _c7|   _c8|  _c9|  _c10|_c11|_c12|          _c13|  _c14|
+---+-----------------+--------+----------+----+-------------------+------------------+--------------+------+-----+------+----+----+--------------+------+
| 39|        State-gov| 77516.0| Bachelors|13.0|      Never-married|      Adm-clerical| Not-in-family| White| Male|2174.0| 0.0|40.0| United-States| <=50K|
| 50| Self-emp-not-inc| 83311.0| Bachelors|13.0| Married-civ-spouse|   Exec-managerial|       Husband| White| Male|   0.0| 0.0|13.0| United-States| <=50K|
| 38|          Private|215646.0|   HS-grad| 9.0|           Divorced| Handlers-cleaners| Not-in-family| White| Male|   0.0| 0.0|40.0| United-States| <=50K|
+---+-----------------+--------+----------+----+-------------------+--

In [14]:
from functools import reduce

In [15]:
#change the column names of dataframe
df = data.withColumnRenamed('_c0','age').withColumnRenamed('_c1','workclass').withColumnRenamed('_c2','fnlwgt')\
.withColumnRenamed('_c3', 'education').withColumnRenamed('_c4', 'education_num')\
.withColumnRenamed('_c5','marital_status').withColumnRenamed('_c6', 'occupation').withColumnRenamed('_c7', 'relationship')\
.withColumnRenamed('_c8', 'race').withColumnRenamed('_c9', 'sex').withColumnRenamed('_c10', 'capital_gain')\
.withColumnRenamed('_c11', 'capital_loss').withColumnRenamed('_c12','hours_per_week')\
.withColumnRenamed('_c13', 'native_country').withColumnRenamed('_c14', 'income')

df.printSchema()
df.show(2)

dataset = df

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)

+---+-----------------+-------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+--------------+--------------+------+
|age|        workclass| fnlwgt| education|education_num|     marital_status|      occupation|  relationship|  race|  sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+-

2. Data preprocessing

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [17]:
#stages in our Pipeline
stages = []
categoricalColumns = ["workclass","education","marital_status","occupation","relationship","race","sex","native_country"]

In [18]:
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

In [19]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="income", outputCol="label")
stages += [label_stringIdx]

In [20]:
# Transform all features into a vector using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [21]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

                                                                                

In [22]:
preppedDataDF.take(3)

22/10/24 04:30:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Row(age=39, workclass=' State-gov', fnlwgt=77516.0, education=' Bachelors', education_num=13.0, marital_status=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174.0, capital_loss=0.0, hours_per_week=40.0, native_country=' United-States', income=' <=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), marital_statusIndex=1.0, marital_statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(41, {0: 1.0}), label=0.0, features=SparseVector(100, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 39.0, 95: 77516.0, 96: 13.0, 9

In [23]:
# Keep relevant columns
cols = dataset.columns
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)

DataFrame[label: double, features: vector, age: int, workclass: string, fnlwgt: double, education: string, education_num: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, native_country: string, income: string]

In [24]:
### Randomly split data into training and test sets. set seed for reproducibility
#=====your code here==========
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)


#===============================
print(trainingData.count())
print(testData.count())

                                                                                

22832


[Stage 35:>                                                         (0 + 1) / 1]

9729


                                                                                

3. Modeling

In [25]:
#test accuracy dict 
test_accuracy = {}

In [26]:
# Fit model to prepped data
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#LogisticRegression model, maxIter=10
#=====your code here==========
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01, elasticNetParam=0.05)
lrModel = lr.fit(trainingData)
trainingSummary = lrModel.summary


#===============================


# select example rows to display.
predictions = lrModel.transform(testData)
##predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
test_accuracy[lr] = accuracy


[Stage 52:>                                                         (0 + 1) / 1]

Test set accuracy = 0.8436632747456059


                                                                                

In [27]:
#Random Forest
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
rfModel = rf.fit(trainingData)

# select example rows to display.
predictions = rfModel.transform(testData)
#predictions.show()
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
test_accuracy[rf] = accuracy

[Stage 68:>                                                         (0 + 1) / 1]

Test set accuracy = 0.82310617740775


                                                                                

In [28]:
#NaiveBayes
from pyspark.ml.classification import NaiveBayes
#=====your code here==========
nb = NaiveBayes(smoothing=1,modelType="multinomial")
nbModel = nb.fit(trainingData)

#===============================


# select example rows to display.
predictions = nbModel.transform(testData)
#predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
test_accuracy[nb] = accuracy


[Stage 73:>                                                         (0 + 1) / 1]

Test set accuracy = 0.7796279165381849


                                                                                

In [29]:
#Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dtModel = dt.fit(trainingData)
# select example rows to display.
predictions = dtModel.transform(testData)
#predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
test_accuracy[dt] = accuracy

[Stage 89:>                                                         (0 + 1) / 1]

Test set accuracy = 0.8423270634186453


                                                                                

In [36]:
#Gradient Boosting Trees
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
gbtModel = gbt.fit(trainingData)

# select example rows to display.
predictions = gbtModel.transform(testData)
#predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

test_accuracy[gbt] = accuracy


                                                                                

Test set accuracy = 0.8452050570459451


                                                                                

In [37]:
# Multi-layer Perceptron
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [100,3,3,2]
mlp = MultilayerPerceptronClassifier(maxIter=10, layers=layers, blockSize=128, seed=1234)
MLPModel = mlp.fit(trainingData)

# select example rows to display.
predictions = MLPModel.transform(testData)
#predictions.show()

# compute accuracy on the test set
predictionAndLabels = predictions.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
test_accuracy[mlp] = accuracy

[Stage 377:>                                                        (0 + 1) / 1]

Test set accuracy = 0.7570151094665434


                                                                                

In [32]:
# Linear Support Vector Machine
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1)
lsvcModel = lsvc.fit(trainingData)

# select example rows to display.
predictions = lsvcModel.transform(testData)
#predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
test_accuracy[lsvc] = accuracy

[Stage 232:>                                                        (0 + 1) / 1]

Test set accuracy = 0.8348237228903279


                                                                                

In [33]:
# One-vs-Rest
from pyspark.ml.classification import LogisticRegression, OneVsRest
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)
ovr = OneVsRest(classifier=lr)
ovrModel = ovr.fit(trainingData)

# select example rows to display.
predictions = lsvcModel.transform(testData)
#predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
test_accuracy[ovr] = accuracy

[Stage 261:>                                                        (0 + 1) / 1]

Test set accuracy = 0.8348237228903279


                                                                                

4. Comparison and analysis

In [34]:
# Rank models according to Test set accuracy
#=====your code here==========
sorted_x = dict(sorted(test_accuracy.items(), key=lambda kv: kv[1]))
sorted_x
#===============================

{MultilayerPerceptronClassifier_5f2b55d9cb1d: 0.7570151094665434,
 NaiveBayes_8b3eaabe9208: 0.7796279165381849,
 RandomForestClassifier_2eb3d6818287: 0.82310617740775,
 LinearSVC_d44818d230d2: 0.8348237228903279,
 OneVsRest_853e8fb93518: 0.8348237228903279,
 DecisionTreeClassifier_1c0de9aa96f6: 0.8423270634186453,
 LogisticRegression_10afd4af5b79: 0.8436632747456059,
 GBTClassifier_3e885d0830d1: 0.8452050570459451}

*your analysis*

The highest accuracy is GBTclassifier and the lowest is MLP. GBT supports binary classification which is perfect for this task where the classification predicts the label between Yes or No (1 or 0). 

In [35]:
!export PATH=/Library/TeX/texbin:$PATH

5360.53s - pydevd: Sending message related to process being replaced timed-out after 5 seconds
