In [0]:
# Task 1 Data collection : Load the adult dataset" from https://archive.ics.uci.edu/ml/datasets/adult.

# Path at which the data file has been uploaded
filepath = "dbfs:/FileStore/shared_uploads/tilloop1@montclair.edu/adult.data"

# Looking at CSV, it was evident that '?' have been put in place of NULL values, hence using nullValue='?'
df = spark.read.csv(filepath,
    header=False,
    nullValue='?',
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    inferSchema=True)

# Renaming columns for better understandability - column names taken from data description
df = df.withColumnRenamed('_c0','Age').withColumnRenamed('_c1', 'Workclass').withColumnRenamed('_c2', 'Fnlwgt').withColumnRenamed('_c3', 'Education').withColumnRenamed('_c4', 'Education_num').withColumnRenamed('_c5', 'Marital_status').withColumnRenamed('_c6', 'Occupation').withColumnRenamed('_c7', 'Relationship').withColumnRenamed('_c8', 'Race').withColumnRenamed('_c9', 'Sex').withColumnRenamed('_c10', 'Capital_gain').withColumnRenamed('_c11', 'Capital_loss').withColumnRenamed('_c12', 'Hours_per_week').withColumnRenamed('_c13', 'Native_country').withColumnRenamed('_c14', 'Income')

df.show(5)
df.schema

+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|Age|       Workclass|Fnlwgt|Education|Education_num|    Marital_status|       Occupation| Relationship| Race|   Sex|Capital_gain|Capital_loss|Hours_per_week|Native_country|Income|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|          Divorced|Handlers-cleaners|Not-i

In [0]:
# Task 2 : Data cleaning : Handle the missing values.

from pyspark.sql.functions import col

# Queries to check how many rows contain NULL values

df.count()   
df.filter((col("Age").isNull()) | (col("Workclass").isNull()) | (col("Fnlwgt").isNull()) | (col("Education").isNull()) | (col("Education_num").isNull()) | (col("Marital_status").isNull()) | (col("Occupation").isNull()) | (col("Relationship").isNull()) | (col("Race").isNull()) | (col("Sex").isNull()) | (col("Capital_gain").isNull()) | (col("Capital_loss").isNull()) | (col("Hours_per_week").isNull()) | (col("Native_country").isNull()) |(col("Income").isNull())).count()

# 2399 out of 32561 rows contain NULL values. These will be dropped
df = df.na.drop()

# 1. Creating capital_return = gain (+ve) or loss (-ve). Drop 'Capital_gain' and 'Capital_loss' as not rqd
df = df.withColumn("Capital_Returns", col("Capital_gain") - col("Capital_loss"))

# 2. Final weights was used for data integration (as per adults.names) and is not useful for us. Hence, drop 'fntwgt'

# 3. 'Education_num' and 'Education' - both correspond to the same data. Drop redundant column 'Education_num'

# As per 1, 2 and 3 : Final list of columns to be dropped
cols_to_drop = ("Capital_loss","Capital_gain","Fnlwgt","Education_num")

df = df.drop(*cols_to_drop)

# Data cleaning is done now!
df.show(5)


+---+----------------+---------+------------------+-----------------+-------------+-----+------+--------------+--------------+------+---------------+
|Age|       Workclass|Education|    Marital_status|       Occupation| Relationship| Race|   Sex|Hours_per_week|Native_country|Income|Capital_Returns|
+---+----------------+---------+------------------+-----------------+-------------+-----+------+--------------+--------------+------+---------------+
| 39|       State-gov|Bachelors|     Never-married|     Adm-clerical|Not-in-family|White|  Male|            40| United-States| <=50K|           2174|
| 50|Self-emp-not-inc|Bachelors|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|            13| United-States| <=50K|              0|
| 38|         Private|  HS-grad|          Divorced|Handlers-cleaners|Not-in-family|White|  Male|            40| United-States| <=50K|              0|
| 53|         Private|     11th|Married-civ-spouse|Handlers-cleaners|      Husband|Black|  Male|    

In [0]:
# Task 3 : Feature engineering
# Distill the features (all columns except for income) and labels (income). 
# Transform the features into vectors.
# Use one-hot encoder to process categorical features. Split the dataset into a training and testing set with a ratio of 80% vs 20%

from pyspark.ml.feature import OneHotEncoder, StringIndexer, Bucketizer, VectorAssembler
import pyspark.sql.functions as func

simpleDF = df

# Workclass (Categorical)
lblIndxr = StringIndexer().setInputCol("Workclass").setOutputCol("Workclass_Ind")
simpleDF = lblIndxr.fit(simpleDF).transform(simpleDF)
ohe = OneHotEncoder().setInputCol("Workclass_Ind").setOutputCol("Workclass_Ohe")
simpleDF = ohe.fit(simpleDF).transform(simpleDF)

# Education (Categorical)
lblIndxr = StringIndexer().setInputCol("Education").setOutputCol("Education_Ind")
simpleDF = lblIndxr.fit(simpleDF).transform(simpleDF)
ohe = OneHotEncoder().setInputCol("Education_Ind").setOutputCol("Education_Ohe")
simpleDF = ohe.fit(simpleDF).transform(simpleDF)

# Marital_status (Categorical)
lblIndxr = StringIndexer().setInputCol("Marital_status").setOutputCol("Marital_status_Ind")
simpleDF = lblIndxr.fit(simpleDF).transform(simpleDF)
ohe = OneHotEncoder().setInputCol("Marital_status_Ind").setOutputCol("Marital_status_Ohe")
simpleDF = ohe.fit(simpleDF).transform(simpleDF)

# Occupation (Categorical)
lblIndxr = StringIndexer().setInputCol("Occupation").setOutputCol("Occupation_Ind")
simpleDF = lblIndxr.fit(simpleDF).transform(simpleDF)
ohe = OneHotEncoder().setInputCol("Occupation_Ind").setOutputCol("Occupation_Ohe")
simpleDF = ohe.fit(simpleDF).transform(simpleDF)

# Relationship (Categorial)
lblIndxr = StringIndexer().setInputCol("Relationship").setOutputCol("Relationship_Ind")
simpleDF = lblIndxr.fit(simpleDF).transform(simpleDF)
ohe = OneHotEncoder().setInputCol("Relationship_Ind").setOutputCol("Relationship_Ohe")
simpleDF = ohe.fit(simpleDF).transform(simpleDF)

# Race (Categorical)
lblIndxr = StringIndexer().setInputCol("Race").setOutputCol("Race_Ind")
simpleDF = lblIndxr.fit(simpleDF).transform(simpleDF)
ohe = OneHotEncoder().setInputCol("Race_Ind").setOutputCol("Race_Ohe")
simpleDF = ohe.fit(simpleDF).transform(simpleDF)

# Sex (Categorical)
lblIndxr = StringIndexer().setInputCol("Sex").setOutputCol("Sex_Ind")
simpleDF = lblIndxr.fit(simpleDF).transform(simpleDF)
ohe = OneHotEncoder().setInputCol("Sex_Ind").setOutputCol("Sex_Ohe")
simpleDF = ohe.fit(simpleDF).transform(simpleDF)

# Native_country (Categorical)
lblIndxr = StringIndexer().setInputCol("Native_country").setOutputCol("Native_country_Ind")
simpleDF = lblIndxr.fit(simpleDF).transform(simpleDF)
ohe = OneHotEncoder().setInputCol("Native_country_Ind").setOutputCol("Native_country_Ohe")
simpleDF = ohe.fit(simpleDF).transform(simpleDF)

# For continuous features

# Age (Continuous)
simpleDF = simpleDF.withColumn("Age",col("Age").cast('double'))
bucketBorders = [15.0, 25.0, 35.0, 45.0, 55.0, 65.0, 75.0, 85.0, 95.0]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("Age").setOutputCol("Age_bucket")
simpleDF = bucketer.transform(simpleDF)

# Capital_Returns (Continuous)
simpleDF = simpleDF.withColumn("Capital_Returns",col("Capital_Returns").cast('double'))
bucketBorders = [-100000,-50000,0,50000,100000]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("Capital_Returns").setOutputCol("Capital_Return_bucket")
simpleDF = bucketer.transform(simpleDF)

# Hours_per_week (Continuous)
simpleDF = simpleDF.withColumn("Hours_per_week",col("Hours_per_week").cast('double'))
bucketBorders = [0,10,20,30,40,50,60,70,80,90,100]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("Hours_per_week").setOutputCol("Hours_per_week_bucket")
simpleDF = bucketer.transform(simpleDF)

# Vector assembler for Double type columns

va = VectorAssembler(outputCol="Age_Cap_Hrs_Vector").setInputCols(["Age_bucket","Capital_Return_bucket","Hours_per_week_bucket"])
simpleDF = va.transform(simpleDF)

va = VectorAssembler(outputCol="features").setInputCols(["Workclass_Ohe","Education_Ohe","Marital_status_Ohe","Occupation_Ohe","Relationship_Ohe","Race_Ohe","Sex_Ohe","Native_country_Ohe","Age_Cap_Hrs_Vector"])

simpleDF = va.transform(simpleDF)

# For classification label : Income
simpleDF = simpleDF.withColumn("label", func.when((col("Income") == '>50K'), 1.0).otherwise(0.0))

cols_to_drop = ("Workclass","Education","Marital_status","Occupation","Relationship","Race","Sex","Native_country","Age","Capital_Returns","Hours_per_week","Income","Workclass_Ind","Education_Ind","Marital_status_Ind","Occupation_Ind","Relationship_Ind","Race_Ind","Sex_Ind","Native_country_Ind", "Age_bucket", "Capital_Return_bucket","Hours_per_week_bucket","Workclass_Ohe","Education_Ohe","Marital_status_Ohe","Occupation_Ohe","Relationship_Ohe","Race_Ohe","Sex_Ohe","Native_country_Ohe","Age_Cap_Hrs_Vector")

simpleDF = simpleDF.drop(*cols_to_drop)

# There were 7508 class '1' rows and 22654 class '0' rows. Below part addresses class imbalance: 

equalized_df = simpleDF.filter("label==0.0").limit(7508)
one_df = simpleDF.filter("label==1.0")

equalized_df = equalized_df.union(one_df)

# NOTE: This was done after doing some rounds of training. The imbalanced model's AreaUnderROC was 0.7
# After using balanced classes, the areaUnderROC improved to almost 0.8 (to be seen further below)

train, test = equalized_df.randomSplit([0.8,0.2])

equalized_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(93,[3,8,22,30,41...|  0.0|
|(93,[1,8,21,29,40...|  0.0|
|(93,[0,6,23,35,41...|  0.0|
|(93,[0,11,21,35,4...|  0.0|
|(93,[0,8,21,27,44...|  0.0|
+--------------------+-----+
only showing top 5 rows



In [0]:
# Task 4 : Build a logistic regression and a gradient-boosted tree model to fit the dataset.

from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.regression import GBTRegressor

lr = LogisticRegression(featuresCol='features',labelCol='label')

gbt = GBTClassifier(featuresCol='features',labelCol='label')


In [0]:
# Task 5 : Tuning and Evaluation : LOGISTIC REGRESSION
# Use grid parameter search and cross validation over 5 folds to find the parameters that yields the highest areaUnderROC on the training set.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Define a grid of hyperparameters to test
lr_params = ParamGridBuilder().addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).addGrid(lr.regParam, [0.1, 2.0]).build()  
    
# Using AreaUnderROC as the evaluation metric
lr_evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderROC").setRawPredictionCol("prediction").setLabelCol("label")

# CrossValidator that will do the model tuning
lr_cv = CrossValidator().setEstimator(lr).setEvaluator(lr_evaluator).setEstimatorParamMaps(lr_params).setNumFolds(5).setParallelism(2)

lr_trained_model = lr_cv.fit(train)




In [0]:
# Task 5 : Tuning and Evaluation : GRADIENT-BOOSTED TREE
# Use grid parameter search and cross validation over 5 folds to find the parameters that yields the highest areaUnderROC on the training set.

# Define a grid of hyperparameters to test
gbt_params = ParamGridBuilder().addGrid(gbt.maxDepth, [2, 5]).addGrid(gbt.maxIter, [5, 10]).build()

gbt_evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderROC").setLabelCol("label")

# CrossValidator that will do the model tuning
gbt_cv = CrossValidator().setEstimator(gbt).setEvaluator(gbt_evaluator).setEstimatorParamMaps(gbt_params).setNumFolds(5).setParallelism(2)

gbt_trained_model = gbt_cv.fit(train)


In [0]:
# Task 6 Prediction : LOGISTIC REGRESSION
# Make predictions on the testing set and display the areaUnderROC.

print(f"Area under ROC for Logistic Regression : {lr_evaluator.evaluate(lr_trained_model.transform(test))}\n")

prediction = lr_trained_model.transform(test)
result = prediction.select("features", "label", "probability", "prediction").collect()

i=0

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.probability, row.prediction))
    if(row["label"] == row["prediction"]):
            i+=1
            
print(f"\nCorrect predictions in test set : {i} out of {test.count()}");


Area under ROC for Logistic Regression : 0.8002910737386805

features=(93,[0,6,21,27,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0]), label=0.0 -> prob=[0.3039212983008007,0.6960787016991993], prediction=1.0
features=(93,[0,6,21,27,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,6.0]), label=0.0 -> prob=[0.20459775233219615,0.7954022476678039], prediction=1.0
features=(93,[0,6,21,27,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,2.0,1.0]), label=0.0 -> prob=[0.3672498256904832,0.6327501743095167], prediction=1.0
features=(93,[0,6,21,27,44,45,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,2.0,4.0]), label=0.0 -> prob=[0.29915712952437784,0.7008428704756222], prediction=1.0
features=(93,[0,6,21,28,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,3.0]), label=0.0 -> prob=[0.5218646146363566,0.4781353853636434], prediction=0.0
features=(93,[0,6,21,28,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0]), label=0.0 

In [0]:
# Task 6 Prediction : GRADIENT-BOOSTED TREE MODEL
# Make predictions on the testing set and display the areaUnderROC.

print(f"Area under ROC for Gradient-boosted tree model : {gbt_evaluator.evaluate(gbt_trained_model.transform(test))}\n")

prediction = gbt_trained_model.transform(test)
result = prediction.select("features", "label", "prediction").collect()

i=0

for row in result:
    print("features=%s, label=%s -> prediction=%s"
          % (row.features, row.label, row.prediction))
    if(row["label"] == row["prediction"]):
            i+=1
    
print(f"\nCorrect predictions in test set : {i} out of {test.count()}");

Area under ROC for Gradient-boosted tree model : 0.876797587171696

features=(93,[0,6,21,27,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0]), label=0.0 -> prediction=1.0
features=(93,[0,6,21,27,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,6.0]), label=0.0 -> prediction=1.0
features=(93,[0,6,21,27,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,2.0,1.0]), label=0.0 -> prediction=1.0
features=(93,[0,6,21,27,44,45,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,2.0,4.0]), label=0.0 -> prediction=1.0
features=(93,[0,6,21,28,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,3.0]), label=0.0 -> prediction=0.0
features=(93,[0,6,21,28,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0]), label=0.0 -> prediction=0.0
features=(93,[0,6,21,28,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0]), label=0.0 -> prediction=0.0
features=(93,[0,6,21,28,40,45,49,50,90,91,92],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.