In [0]:
# Task 1: Data collection

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType

sch = StructType([
    StructField("age", FloatType(), True),
    StructField("workClass", StringType(), True),
    StructField("fnlwgt", FloatType(), True),
    StructField("education", StringType(), True),
    StructField("educationNumber", StringType(), True),
    StructField("maritalStatus", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capitalGain", FloatType(), True),
    StructField("capitalLoss", FloatType(), True),
    StructField("hoursPerWeek", FloatType(), True),
    StructField("nativeCountry", StringType(), True),
    StructField("income", StringType(), True),
])

df = spark.read.format("csv")\
    .option('ignoreLeadingWhiteSpace', True)\
    .option('ignoreTrailingWhiteSpace', True)\
    .schema(sch)\
    .load("/FileStore/tables/adult.data")\
    .dropna()

df.display()

age,workClass,fnlwgt,education,educationNumber,maritalStatus,occupation,relationship,race,sex,capitalGain,capitalLoss,hoursPerWeek,nativeCountry,income
39.0,State-gov,77516.0,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
50.0,Self-emp-not-inc,83311.0,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
38.0,Private,215646.0,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
53.0,Private,234721.0,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
28.0,Private,338409.0,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
37.0,Private,284582.0,Masters,14,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
49.0,Private,160187.0,9th,5,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
52.0,Self-emp-not-inc,209642.0,HS-grad,9,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
31.0,Private,45781.0,Masters,14,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
42.0,Private,159449.0,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K


In [0]:
# Task 2: Data cleaning

df = df.where(df.workClass != '?')\
    .where(df.occupation != '?')\
    .where(df.nativeCountry != '?')

df.display()

age,workClass,fnlwgt,education,educationNumber,maritalStatus,occupation,relationship,race,sex,capitalGain,capitalLoss,hoursPerWeek,nativeCountry,income
39.0,State-gov,77516.0,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
50.0,Self-emp-not-inc,83311.0,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
38.0,Private,215646.0,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
53.0,Private,234721.0,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
28.0,Private,338409.0,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K
37.0,Private,284582.0,Masters,14,Married-civ-spouse,Exec-managerial,Wife,White,Female,0.0,0.0,40.0,United-States,<=50K
49.0,Private,160187.0,9th,5,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0.0,0.0,16.0,Jamaica,<=50K
52.0,Self-emp-not-inc,209642.0,HS-grad,9,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,45.0,United-States,>50K
31.0,Private,45781.0,Masters,14,Never-married,Prof-specialty,Not-in-family,White,Female,14084.0,0.0,50.0,United-States,>50K
42.0,Private,159449.0,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,5178.0,0.0,40.0,United-States,>50K


In [0]:
# Task 3: Feature Engineering

from pyspark.ml.feature import RFormula

supervised = RFormula(formula='income ~ . - fnlwgt - educationNumber',
                      featuresCol='features',
                      labelCol='label')

fittedRF = supervised.fit(df)
prepDF = fittedRF.transform(df)

train, test = prepDF.randomSplit([0.8, 0.2])

train[['features', 'label']].display(), test[['features', 'label']].display() 

Out[117]: (None, None)

features,label
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 6, 12, 23, 31, 42, 47, 52, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1602.0, 40.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 17, 23, 33, 43, 47, 50, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 9.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 14, 23, 39, 43, 46, 52, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1602.0, 40.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 12, 23, 28, 43, 47, 50, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 15.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 12, 23, 39, 43, 46, 50, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 30.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 12, 23, 33, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 16.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 17, 23, 33, 43, 46, 50, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 45.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 12, 23, 29, 43, 46, 50, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 35.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 12, 23, 28, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 40.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 12, 23, 28, 43, 46, 53, 58), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 20.0, 1.0))",0.0


features,label
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 14, 23, 33, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 25.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 12, 23, 31, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 12.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 12, 23, 32, 43, 46, 50, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 20.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 3, 18, 23, 31, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 12.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 1, 14, 23, 32, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 12.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 1, 17, 23, 33, 43, 46, 50, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 16.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 1, 14, 23, 32, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 10.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 1, 12, 23, 31, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 23.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 1, 12, 23, 33, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 15.0, 1.0))",0.0
"Map(vectorType -> sparse, length -> 94, indices -> List(0, 1, 14, 23, 33, 43, 46, 53, 54), values -> List(17.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 6.0, 1.0))",0.0


In [0]:
# Task 4: Training

from pyspark.ml.classification import LogisticRegression, GBTClassifier

lr = LogisticRegression(labelCol='label', featuresCol='features')
gbt = GBTClassifier(labelCol='label', featuresCol='features')

lrModel = lr.fit(train)
gbtModel = gbt.fit(train)

print(f'LR Model summary: {lrModel}')
print(f'GBT Model summary: {gbtModel}')

LR Model summary: LogisticRegressionModel: uid=LogisticRegression_d2c8ac607334, numClasses=2, numFeatures=94
GBT Model summary: GBTClassificationModel: uid = GBTClassifier_72b7f3fc20eb, numTrees=20, numClasses=2, numFeatures=94


In [0]:
# Task 5: Tuning and Evaluation

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

lrPipeline = Pipeline().setStages([lrModel])
gbtPipeline = Pipeline().setStages([gbtModel])

lrParams = ParamGridBuilder()\
    .addGrid(lrModel.regParam, [0, 0.5, 1])\
    .addGrid(lrModel.elasticNetParam, [0, 0.5, 1])\
    .build()

gbtParams = ParamGridBuilder()\
    .addGrid(gbtModel.maxDepth, [2, 5, 10])\
    .addGrid(gbtModel.maxBins, [10, 20, 40])\
    .build()

bce = BinaryClassificationEvaluator()

lrCrossVal = CrossValidator(
    estimator=lrPipeline,
    estimatorParamMaps=lrParams,
    evaluator=bce,
    numFolds=5
)

gbtCrossVal = CrossValidator(
    estimator=gbtPipeline,
    estimatorParamMaps=gbtParams,
    evaluator=bce,
    numFolds=5
)

lrCVTrain = lrCrossVal.fit(train)
gbtCVTrain = gbtCrossVal.fit(train)

lrTrain = lrCVTrain.transform(train)
gbtTrain = gbtCVTrain.transform(train)

print(f'LR  Model training set (areaUnderROC): {round(bce.evaluate(lrTrain), 4)}')
print(f'GBT Model training set (areaUnderROC): {round(bce.evaluate(gbtTrain), 4)}')

LR  Model training set (areaUnderROC): 0.9059
GBT Model training set (areaUnderROC): 0.9152


In [0]:
# Task 6: Predictions

lrPred = lrCVTrain.transform(test)
gbtPred = gbtCVTrain.transform(test)

print(f'LR  Model test set prediction (areaUnderROC): {round(bce.evaluate(lrPred), 4)}')
print(f'GBT Model test set prediction (areaUnderROC): {round(bce.evaluate(gbtPred), 4)}')

LR  Model test set prediction (areaUnderROC): 0.9066
GBT Model test set prediction (areaUnderROC): 0.9097
