In [49]:
import re
with open('data/adult.names.txt', 'r') as f:
    featureNames = [ line.split(':')[0] for line in f.readlines() if re.match(r'^[a-z\-]+:.*', line)]

print(featureNames)
columnNames = featureNames + ['income']
print(columnNames)

['age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country']
['age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'income']


In [50]:
inputDF = spark.createDataFrame(spark.read.csv('data/adult.data.bz2', inferSchema=True).rdd, variable_names)
inputDF.printSchema()
display(inputDF.limit(3))

root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,income
0,39,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
1,50,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
2,38,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K


In [51]:
from pyspark.sql.functions import trim
dataDF = inputDF.withColumn('label',trim(inputDF.income).startswith('>').cast('double'))
display(dataDF.limit(10))

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,income,label
0,39,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K,0.0
1,50,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K,0.0
2,38,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K,0.0
3,53,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K,0.0
4,28,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K,0.0
5,37,Private,284582.0,Masters,14.0,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K,0.0
6,49,Private,160187.0,9th,5.0,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K,0.0
7,52,Self-emp-not-inc,209642.0,HS-grad,9.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K,1.0
8,31,Private,45781.0,Masters,14.0,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K,1.0
9,42,Private,159449.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K,1.0


In [101]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import StringType

catrgoricalFeatures = [ f.name for f in dataDF.schema.fields if f.dataType == StringType() and f.name!='income']
print(catrgoricalFeatures)
indexerPipeline = Pipeline(stages = [ StringIndexer(inputCol=feature, 
                                            outputCol= "cat_%s"% feature, handleInvalid='skip') for feature in catrgoricalFeatures])

pipelineModel = indexerPipeline.fit(dataDF)
indexed_df = pipelineModel.transform(dataDF)
indexed_df.printSchema()

['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']
root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- label: double (nullable = true)
 |-- cat_workclass: double (nullable = true)
 |-- cat_education: double (nullable = true)
 |-- cat_marital-status: double (nullable = true)
 |-- cat_occupation: double (nullable = true)
 |-- cat_relationship: double (nullable = tru

In [99]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

categorical_assembler = Pipeline(stages = [
    VectorAssembler(inputCols = ["cat_%s"%c for c in catrgoricalFeatures ], outputCol='cat_vector'),                    
    VectorIndexer(inputCol='cat_vector', outputCol='cat_features')
])

categorical_assembler_model = categorical_assembler.fit(indexed_df)
cat_df = categorical_assembler_model.transform(indexed_df)
cat_df.printSchema()
cat_df.select('cat_features').show(5)

TypeError: __init__() got an unexpected keyword argument 'handleInvalid'

In [54]:
feature_assembler = VectorAssembler(inputCols = ['age', 'fnlwgt', 'hours-per-week', 'cat_features'] , outputCol='features')
feature_df = feature_assembler.transform(cat_df)

feature_df.printSchema()
feature_df.select('features').show()

root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- label: double (nullable = true)
 |-- cat_workclass: double (nullable = true)
 |-- cat_education: double (nullable = true)
 |-- cat_marital-status: double (nullable = true)
 |-- cat_occupation: double (nullable = true)
 |-- cat_relationship: double (nullable = true)
 |-- cat_race: double (nullable = true)
 |-- cat_sex: double (nullable = true)
 |-- cat_native-country: d

In [91]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

trainingDF = dataDF.sampleBy('label', fractions = {0.0: 0.7, 1.0:0.7}).cache()
testDF = dataDF.subtract(trainingDF).cache()

22891
9654


In [96]:
print(trainingDF.count())
print(trainingDF.where(col('label') == 1.0).count())
print(testDF.count())
print(testDF.where(col('label') == 1.0).count())

22891
5584
9654
2256


In [103]:
from pyspark.ml.classification import RandomForestClassifier


rfClassifier = RandomForestClassifier(featuresCol='features', labelCol='label', maxBins=50)

rfPipeline = Pipeline(stages = [indexerPipeline, categorical_assembler, feature_assembler, rfClassifier])


rfPipelineModel = rfPipeline.fit(trainingDF)



In [104]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(rfPipelineModel.transform(testDF))


0.8793251792704924

In [105]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='accuracy')
evaluator.evaluate(rfPipelineModel.transform(testDF))

0.8304154148969233

In [106]:
predictionDF = rfPipelineModel.transform(testDF).select('label','prediction').cache()

In [107]:
tpr = predictionDF.where((col('label') == 1.0) & (col('prediction') ==1.0)).count()
tnr = predictionDF.where((col('label') == 0.0) & (col('prediction') ==0.0)).count()
fpr = predictionDF.where((col('label') == 0.0) & (col('prediction') ==1.0)).count()
fnr = predictionDF.where((col('label') == 1.0) & (col('prediction') ==0.0)).count()

print(tnr, fpr)
print(fnr, tpr)

(6972, 425)
(1212, 1044)


In [111]:
from pyspark.ml.feature import OneHotEncoder

categorical_assembler = Pipeline(stages = [ OneHotEncoder(inputCol="cat_%s" % feature, 
                                            outputCol= "enc_%s"% feature) for feature in catrgoricalFeatures])

categorical_assembler_model = categorical_assembler.fit(indexed_df)
cat_df = categorical_assembler_model.transform(indexed_df)
cat_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- label: double (nullable = true)
 |-- cat_workclass: double (nullable = true)
 |-- cat_education: double (nullable = true)
 |-- cat_marital-status: double (nullable = true)
 |-- cat_occupation: double (nullable = true)
 |-- cat_relationship: double (nullable = true)
 |-- cat_race: double (nullable = true)
 |-- cat_sex: double (nullable = true)
 |-- cat_native-country: d

In [118]:
from pyspark.ml.classification import LogisticRegression

lrClassifier = LogisticRegression(featuresCol='features', labelCol='label')

enfFetureAssembler = VectorAssembler(inputCols = ['age', 'fnlwgt', 'hours-per-week'] +
                               ["enc_%s"% feature for feature in catrgoricalFeatures ]      
                                     , outputCol='features')

lrPipeline = Pipeline(stages = [indexerPipeline, categorical_assembler, enfFetureAssembler, lrClassifier])


lrPipeline = lrPipeline.fit(trainingDF)

In [119]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(lrPipeline.transform(testDF))


0.8852387205086987