The data contains anonymized information such as age, occupation, education, and work class. The goal is to train a binary classifier to predict income, which has two possible values: '>50K' and '<=50K'. The dataset has 48842 instances and 14 attributes, with a mix of categorical and numerical columns and some missing values.

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('BinaryClassification').getOrCreate()
df = spark.read.csv('adult', sep=',', inferSchema=True, header=True)

In [3]:
df.show(10)

+---+-----------------+--------+----------+---------------+--------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|  fnlwgt| education|educational-num|      marital-status|        occupation|  relationship|  race| gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+-----------------+--------+----------+---------------+--------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516.0| Bachelors|           13.0|       Never-married|      Adm-clerical| Not-in-family| White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|
| 50| Self-emp-not-inc| 83311.0| Bachelors|           13.0|  Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|         0.0|         0.0|          13.0| United-States| <=50K|
| 38|          Private|215646.0|   

In [4]:

cols = df.columns
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [5]:
df.count()

48842

Total observations in the dataset: 48842

In [6]:
from pyspark.ml import Pipeline
# OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.0 renamed it to OneHotEncoder.
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

Categorical Columns Staging 

In [7]:
categoricalColumns = ["workclass", "education", "marital-status", "occupation", "relationship", 
                      "race", "gender", "native-country"]

In [8]:
stages = []

In [9]:
for categoricalCol in categoricalColumns:
    # Map each category string to a numeric index, then one-hot encode that index.
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
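
To make the two stages above concrete, here is a minimal pure-Python sketch of what they compute for one column. It assumes the default settings: StringIndexer orders categories by descending frequency, and the one-hot encoder drops the last category, so that category is encoded as an all-zero vector. The helper names and the sample values are hypothetical, and the alphabetical tie-break is an assumption made only to keep the sketch deterministic.

```python
from collections import Counter


def fit_string_index(values):
    """Assign index 0 to the most frequent category, 1 to the next, and so on."""
    counts = Counter(values)
    # Assumption: ties are broken alphabetically to keep the result deterministic.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot encode an index, leaving the last category as all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


# Hypothetical sample of a single categorical column.
workclass = ["Private", "State-gov", "Private", "Self-emp-not-inc", "Private", "State-gov"]

mapping = fit_string_index(workclass)
encoded = [one_hot_drop_last(mapping[v], len(mapping)) for v in workclass]

print(mapping)
print(encoded[0], encoded[1], encoded[3])
```

With three categories the encoded vector has two slots, which is why the assembled feature vector ends up narrower than the total number of distinct category values.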

Label Indexing

In [10]:
label_stringIdx = StringIndexer(inputCol='income', outputCol='label')
stages += [label_stringIdx]

Numerical Columns

In [11]:
numericCols = ["age", "fnlwgt", "educational-num", "capital-gain", "capital-loss", "hours-per-week"]

In [12]:
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
print(assemblerInputs)

['workclassclassVec', 'educationclassVec', 'marital-statusclassVec', 'occupationclassVec', 'relationshipclassVec', 'raceclassVec', 'genderclassVec', 'native-countryclassVec', 'age', 'fnlwgt', 'educational-num', 'capital-gain', 'capital-loss', 'hours-per-week']


In [13]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
print(assembler)

VectorAssembler_50e1fbf664a3
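
Conceptually, the assembler concatenates its inputs in the order given, so each input occupies a fixed range of positions in the final vector. The sketch below illustrates that layout in plain Python. The block widths and values are hypothetical, and it uses dense lists where Spark may produce a sparse vector.

```python
def block_offsets(widths):
    """Return the starting position of each block in the concatenated vector."""
    offsets = []
    position = 0
    for width in widths:
        offsets.append(position)
        position += width
    return offsets


def assemble(blocks):
    """Concatenate per-column blocks into one flat feature list."""
    features = []
    for block in blocks:
        features.extend(block)
    return features


# Hypothetical row: two one-hot blocks followed by two numeric columns.
row_blocks = [[1.0, 0.0], [0.0, 0.0, 1.0], [39.0], [40.0]]

offsets = block_offsets([len(b) for b in row_blocks])
features = assemble(row_blocks)

print(offsets)
print(features)
```

Because the numeric columns are listed last in assemblerInputs, they land at the end of the vector. This matches the size-100 feature vectors shown in the transformed rows, where the highest indices hold values such as age and fnlwgt.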


In [14]:
stages += [assembler]

In [15]:
print(stages)

[StringIndexer_5e7c4813cdf0, OneHotEncoderEstimator_5d695f056a34, StringIndexer_76d734be0f04, OneHotEncoderEstimator_455a5e0cf10f, StringIndexer_297324e700b6, OneHotEncoderEstimator_5879eb2c4a77, StringIndexer_c8be966fdd33, OneHotEncoderEstimator_e42a15d82f3b, StringIndexer_d10abda033b9, OneHotEncoderEstimator_d6f6924e3e0c, StringIndexer_473d7bf16973, OneHotEncoderEstimator_80afe5fb08b9, StringIndexer_86e2c2eb3503, OneHotEncoderEstimator_a60e6632ed64, StringIndexer_64d4135c1807, OneHotEncoderEstimator_a58e8e927ce9, StringIndexer_157882b1a348, VectorAssembler_50e1fbf664a3]


Pipelining 

In [16]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)

In [17]:
selectedcols = ["label", "features"] + cols
df = df.select(selectedcols)
df.show(3)

+-----+--------------------+---+-----------------+--------+----------+---------------+-------------------+------------------+--------------+------+------+------------+------------+--------------+--------------+------+
|label|            features|age|        workclass|  fnlwgt| education|educational-num|     marital-status|        occupation|  relationship|  race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+-----+--------------------+---+-----------------+--------+----------+---------------+-------------------+------------------+--------------+------+------+------------+------------+--------------+--------------+------+
|  0.0|(100,[4,10,24,32,...| 39|        State-gov| 77516.0| Bachelors|           13.0|      Never-married|      Adm-clerical| Not-in-family| White|  Male|      2174.0|         0.0|          40.0| United-States| <=50K|
|  0.0|(100,[1,10,23,31,...| 50| Self-emp-not-inc| 83311.0| Bachelors|           13.0| Married-civ-spouse|   Exec-managerial|   

Training & Test Data 

In [18]:
train, test = df.randomSplit([0.7, 0.3], seed=100)
print(train.count())
print(test.count())

34238
14604
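
The two counts add up to 48842, but the training share is about 70.1% rather than exactly 70%, because the split is random per row and not an exact cut. The sketch below shows the idea in plain Python under that assumption. It is not Spark's implementation, the helper name is hypothetical, and it will not reproduce the exact counts above even with the same seed.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by drawing one uniform number per row."""
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for weight in weights:
        cumulative += weight / total
        bounds.append(cumulative)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_rows, test_rows = random_split(range(48842), [0.7, 0.3], seed=100)

print(len(train_rows), len(test_rows))
print(round(len(train_rows) / 48842, 3))
```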


# Logistic Regression

In [19]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)
lrModel = lr.fit(train)

In [20]:

predictions = lrModel.transform(test)
predictions.take(1)

[Row(label=0.0, features=SparseVector(100, {0: 1.0, 8: 1.0, 23: 1.0, 29: 1.0, 43: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 26.0, 95: 58426.0, 96: 9.0, 99: 50.0}), age=26, workclass=' Private', fnlwgt=58426.0, education=' HS-grad', educational-num=9.0, marital-status=' Married-civ-spouse', occupation=' Prof-specialty', relationship=' Husband', race=' White', gender=' Male', capital-gain=0.0, capital-loss=0.0, hours-per-week=50.0, native-country=' United-States', income=' <=50K', rawPrediction=DenseVector([0.5645, -0.4683, 0.2903, -0.3865]), probability=DenseVector([0.3996, 0.1423, 0.3038, 0.1544]), prediction=0.0)]
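
Note that rawPrediction and probability each have four entries, so the model was fitted as a multinomial regression over four label classes and not as a binary one. As a quick check, applying a softmax to the rounded rawPrediction values printed above gives back the printed probabilities to about three decimal places. This is a minimal sketch of that check, not the library's internal code.

```python
import math


def softmax(raw):
    """Convert raw class scores into probabilities that sum to 1."""
    shift = max(raw)  # subtract the max for numerical stability
    exps = [math.exp(x - shift) for x in raw]
    total = sum(exps)
    return [e / total for e in exps]


# Values copied from the printed row above (already rounded to 4 decimals).
raw_prediction = [0.5645, -0.4683, 0.2903, -0.3865]
reported = [0.3996, 0.1423, 0.3038, 0.1544]

computed = softmax(raw_prediction)
for c, r in zip(computed, reported):
    print(f"computed {c:.4f}  reported {r:.4f}")
```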

In [21]:
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show(10)

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  0.0|       0.0|[0.39959773075018...| 26| Prof-specialty|
|  0.0|       0.0|[0.39231673832589...| 30| Prof-specialty|
|  0.0|       0.0|[0.39419842744541...| 31| Prof-specialty|
|  0.0|       0.0|[0.38926267474880...| 32| Prof-specialty|
|  0.0|       0.0|[0.40537909769263...| 39| Prof-specialty|
|  0.0|       0.0|[0.37721610716484...| 47| Prof-specialty|
|  0.0|       0.0|[0.38566036472791...| 50| Prof-specialty|
|  0.0|       0.0|[0.38624026444156...| 51| Prof-specialty|
|  0.0|       0.0|[0.39983233242197...| 60| Prof-specialty|
|  0.0|       0.0|[0.38994746593588...| 61| Prof-specialty|
+-----+----------+--------------------+---+---------------+
only showing top 10 rows
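
The winning probabilities here are only around 0.39 because they are spread over four classes, which means the label indexer found four distinct income strings instead of two. Running df.groupBy('income').count().show() on the raw data would confirm which strings are present. One plausible cause, which the notebook does not confirm, is that some rows carry a trailing period (' <=50K.' and ' >50K.'), as in the original UCI test split. Under that assumption, the sketch below shows a normalization that collapses the variants to two labels. The helper name and the sample values are hypothetical.

```python
def normalize_income(raw):
    """Strip surrounding whitespace and a single trailing period."""
    label = raw.strip()
    if label.endswith("."):
        label = label[:-1]
    return label


# Hypothetical raw values; the leading space matches the strings printed in the rows above.
raw_labels = [" <=50K", " >50K", " <=50K.", " >50K."]

distinct_labels = sorted({normalize_income(value) for value in raw_labels})
print(distinct_labels)
```

In Spark the same cleanup could be applied to the income column with trim and regexp_replace from pyspark.sql.functions before the pipeline is fitted, so that the label indexer sees only two classes.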



Evaluate Logistic Regression Model

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Area Under ROC', evaluator.evaluate(predictions))

Area Under ROC 0.31253339454697826
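
An area under the ROC curve of about 0.31 should be read with caution. A score of 0.5 corresponds to random ranking, so a value well below 0.5 usually points to a setup problem and not to a genuinely poor model. Here the four-element rawPrediction vectors shown earlier indicate more than two label classes, which a binary evaluator is not designed for. The sketch below uses the pairwise-ranking definition of AUC on hypothetical scores to show how the metric behaves. It is not Spark's implementation.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs where the positive scores higher."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a win
    return wins / (len(positives) * len(negatives))


# Hypothetical labels and scores for illustration only.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]

auc = area_under_roc(labels, scores)
flipped_auc = area_under_roc(labels, [-s for s in scores])

print(auc, flipped_auc)
```

Negating the scores mirrors the metric around 0.5, which is why a value far below 0.5 suggests that the scores or labels are misaligned.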


In [23]:
evaluator.getMetricName()

'areaUnderROC'