In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BDASProj').getOrCreate()

In [2]:
#reading it in
df = spark.read.csv('/home/ubuntu/BDASProj/usa_00007.csv',header=True)
df.columns

['YEAR',
 'SAMPLE',
 'SERIAL',
 'CBSERIAL',
 'HHWT',
 'STATEICP',
 'GQ',
 'PERNUM',
 'PERWT',
 'SEX',
 'AGE',
 'MARST',
 'RACE',
 'RACED',
 'EDUC',
 'EDUCD',
 'EMPSTAT',
 'EMPSTATD',
 'OCC',
 'INCWAGE']

In [3]:
#deleting unwanted columns, and setting up df schema
from pyspark.sql.types import DoubleType

#deleting columns and showing new df
df2 = df.select([c for c in df.columns if c not in {'YEAR','SAMPLE',"SERIAL","CBSERIAL","HHWT","STATEICP","GQ","PERNUM","PERWT","RACED","EDUCD","EMPSTAT","EMPSTATD"}])
df2.show()
df2.columns

+---+---+-----+----+----+----+-------+
|SEX|AGE|MARST|RACE|EDUC| OCC|INCWAGE|
+---+---+-----+----+----+----+-------+
|  2| 31|    6|   1|  10| 350|  38500|
|  2| 37|    4|   2|   6| 230|  18000|
|  1| 21|    6|   2|   7|4620|  15000|
|  1| 20|    6|   2|   7|4120|   1200|
|  1| 61|    2|   1|  10|1410| 160000|
|  1| 62|    1|   2|  10| 430| 100000|
|  2| 58|    1|   2|  10|4710| 344000|
|  1| 30|    6|   2|  10|4710| 120000|
|  1| 26|    6|   2|  10|4850|  50000|
|  2| 55|    6|   1|   7|4500|      0|
|  1| 54|    4|   1|  10|4840|  56000|
|  1| 36|    4|   1|   6|9130|  24700|
|  1| 50|    1|   1|  11|2200| 344000|
|  2| 47|    1|   1|   6|2340|  10000|
|  2| 47|    1|   2|   8|4510|  15000|
|  1| 53|    1|   2|   8|4760|  30000|
|  2| 26|    6|   1|   7|5220|  25000|
|  1| 22|    6|   2|   7|8965|  29500|
|  1| 48|    1|   1|   6|3740|  65000|
|  2| 49|    1|   1|  10|3600|  65000|
+---+---+-----+----+----+----+-------+
only showing top 20 rows



['SEX', 'AGE', 'MARST', 'RACE', 'EDUC', 'OCC', 'INCWAGE']

In [4]:
#change values within columns (all except OCC)
sexDict = {'1':'Male','2':'Female'} 
df2 = df2.na.replace(sexDict,1,"SEX")

mDict = {'1':'Married','2':'Married','3':'Separated','4':'Separated','5':'Widowed','6':'NeverMarried-Single'}
df2 = df2.na.replace(mDict,1,"MARST")

rDict = {'1': "White", '2': "Black", '3': "Native-IndianOrAlaskan", '4': "Chinese", '5': "Japanese", '6': "OtherAsianOrPacificIslander"}
df2 = df2.na.replace(rDict,1,"RACE")

eDict = {'0': "NoSchooling", '1': "NurseryToKinderGarten", '2': "MiddleSchool", '3': "HighSchool", '4': "HighSchool", '5': "HighSchool", '6': "HighSchool",'7': "1stYearCollege", '8': "2ndYearCollege", '9': "3rdYearCollege", '10': "4thYearCollege", '11': "5+YearsCollege"}
df2 = df2.na.replace(eDict,1,"EDUC")
df2.show()

#removing 7,8,9 from RACE (we dont consider those race codes)
df2 = df2.filter('RACE not in ("7","8","9")')
df2.select('RACE').distinct().show()

+------+---+-------------------+-----+--------------+----+-------+
|   SEX|AGE|              MARST| RACE|          EDUC| OCC|INCWAGE|
+------+---+-------------------+-----+--------------+----+-------+
|Female| 31|NeverMarried-Single|White|4thYearCollege| 350|  38500|
|Female| 37|          Separated|Black|    HighSchool| 230|  18000|
|  Male| 21|NeverMarried-Single|Black|1stYearCollege|4620|  15000|
|  Male| 20|NeverMarried-Single|Black|1stYearCollege|4120|   1200|
|  Male| 61|            Married|White|4thYearCollege|1410| 160000|
|  Male| 62|            Married|Black|4thYearCollege| 430| 100000|
|Female| 58|            Married|Black|4thYearCollege|4710| 344000|
|  Male| 30|NeverMarried-Single|Black|4thYearCollege|4710| 120000|
|  Male| 26|NeverMarried-Single|Black|4thYearCollege|4850|  50000|
|Female| 55|NeverMarried-Single|White|1stYearCollege|4500|      0|
|  Male| 54|          Separated|White|4thYearCollege|4840|  56000|
|  Male| 36|          Separated|White|    HighSchool|9130|  24

In [5]:
#then retaining only non-zero INCWAGEs
df2 = df2.filter(df2['INCWAGE']>0)

In [6]:
#Monumental Task: Reclassifying OCC
from pyspark.ml.feature import Bucketizer

#first convert stringtype of occrecode to doubletype
df2 = df2.withColumn("OCC", df2["OCC"].cast(DoubleType()))

#then reclassify
bucketizer = Bucketizer(splits=[0,741,951,1966,2060,2161,2551,2921,3656,3956,4651,4966,5941,6131,9751,9921],
                       inputCol="OCC",outputCol="OCCRecode")
df2_buck = bucketizer.setHandleInvalid("keep").transform(df2)

#showing the occrecode column, as well as the number of distinct values (Showing it Worked!)
df2_buck.show()
df2_buck.select("OCCRecode").distinct().show()

+------+---+-------------------+-----+--------------+------+-------+---------+
|   SEX|AGE|              MARST| RACE|          EDUC|   OCC|INCWAGE|OCCRecode|
+------+---+-------------------+-----+--------------+------+-------+---------+
|Female| 31|NeverMarried-Single|White|4thYearCollege| 350.0|  38500|      0.0|
|Female| 37|          Separated|Black|    HighSchool| 230.0|  18000|      0.0|
|  Male| 21|NeverMarried-Single|Black|1stYearCollege|4620.0|  15000|      9.0|
|  Male| 20|NeverMarried-Single|Black|1stYearCollege|4120.0|   1200|      9.0|
|  Male| 61|            Married|White|4thYearCollege|1410.0| 160000|      2.0|
|  Male| 62|            Married|Black|4thYearCollege| 430.0| 100000|      0.0|
|Female| 58|            Married|Black|4thYearCollege|4710.0| 344000|     10.0|
|  Male| 30|NeverMarried-Single|Black|4thYearCollege|4710.0| 120000|     10.0|
|  Male| 26|NeverMarried-Single|Black|4thYearCollege|4850.0|  50000|     10.0|
|  Male| 54|          Separated|White|4thYearCollege

In [7]:
#continuing...
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

t = {0.0:'Business',1.0:'Finance',2.0:'STEM',3.0:'PublicSector',4.0:'Law',
     5.0:'Education',6.0:'Media',7.0:'Healthcare',8.0:'PublicSector',9.0:
     'Hospitality',10.0:'Sales',11.0:'Administration',12.0:'STEM',13.0:'Trades',14.0:'PublicSector'}
udf_foo = udf(lambda x: t[x], StringType())
df3 = df2_buck.withColumn('OCCRecode',udf_foo("OCCRecode"))

#showing the new recoded occupations
df3.select("OCCRecode").distinct().show()
df3.show()

+--------------+
|     OCCRecode|
+--------------+
|     Education|
|         Sales|
|    Healthcare|
|  PublicSector|
|       Finance|
|         Media|
|          STEM|
|   Hospitality|
|Administration|
|           Law|
|      Business|
|        Trades|
+--------------+

+------+---+-------------------+-----+--------------+------+-------+--------------+
|   SEX|AGE|              MARST| RACE|          EDUC|   OCC|INCWAGE|     OCCRecode|
+------+---+-------------------+-----+--------------+------+-------+--------------+
|Female| 31|NeverMarried-Single|White|4thYearCollege| 350.0|  38500|      Business|
|Female| 37|          Separated|Black|    HighSchool| 230.0|  18000|      Business|
|  Male| 21|NeverMarried-Single|Black|1stYearCollege|4620.0|  15000|   Hospitality|
|  Male| 20|NeverMarried-Single|Black|1stYearCollege|4120.0|   1200|   Hospitality|
|  Male| 61|            Married|White|4thYearCollege|1410.0| 160000|          STEM|
|  Male| 62|            Married|Black|4thYearCollege| 4

In [8]:
#now we add a column for male median grouped by OCCRecode
df3 = df3.withColumn("M_MedWage",df3["OCCRecode"])
df3.show()

+------+---+-------------------+-----+--------------+------+-------+--------------+--------------+
|   SEX|AGE|              MARST| RACE|          EDUC|   OCC|INCWAGE|     OCCRecode|     M_MedWage|
+------+---+-------------------+-----+--------------+------+-------+--------------+--------------+
|Female| 31|NeverMarried-Single|White|4thYearCollege| 350.0|  38500|      Business|      Business|
|Female| 37|          Separated|Black|    HighSchool| 230.0|  18000|      Business|      Business|
|  Male| 21|NeverMarried-Single|Black|1stYearCollege|4620.0|  15000|   Hospitality|   Hospitality|
|  Male| 20|NeverMarried-Single|Black|1stYearCollege|4120.0|   1200|   Hospitality|   Hospitality|
|  Male| 61|            Married|White|4thYearCollege|1410.0| 160000|          STEM|          STEM|
|  Male| 62|            Married|Black|4thYearCollege| 430.0| 100000|      Business|      Business|
|Female| 58|            Married|Black|4thYearCollege|4710.0| 344000|         Sales|         Sales|
|  Male| 3

In [9]:
#then putting in values for median wage
medWageMap={
    'Administration':'32000',
    'Business':'80000',
    'Education':'47500',
    'Healthcare':'67000',
    'Hospitality':'18000',
    'Law':'85000',
    'Media':'35000',
    'PublicSector':'46850',
    'STEM':'73500',
    'Sales':'42000',
    'Trades':'36000',
    'Finance':'97000'
}
df3 = df3.na.replace(medWageMap,1,"M_MedWage")
df3 = df3.drop("OCC")
df3.show()

+------+---+-------------------+-----+--------------+-------+--------------+---------+
|   SEX|AGE|              MARST| RACE|          EDUC|INCWAGE|     OCCRecode|M_MedWage|
+------+---+-------------------+-----+--------------+-------+--------------+---------+
|Female| 31|NeverMarried-Single|White|4thYearCollege|  38500|      Business|    80000|
|Female| 37|          Separated|Black|    HighSchool|  18000|      Business|    80000|
|  Male| 21|NeverMarried-Single|Black|1stYearCollege|  15000|   Hospitality|    18000|
|  Male| 20|NeverMarried-Single|Black|1stYearCollege|   1200|   Hospitality|    18000|
|  Male| 61|            Married|White|4thYearCollege| 160000|          STEM|    73500|
|  Male| 62|            Married|Black|4thYearCollege| 100000|      Business|    80000|
|Female| 58|            Married|Black|4thYearCollege| 344000|         Sales|    42000|
|  Male| 30|NeverMarried-Single|Black|4thYearCollege| 120000|         Sales|    42000|
|  Male| 26|NeverMarried-Single|Black|4thYe

In [10]:
#deriving the pay gap ratio column
df3 = df3.withColumn("PayGapRatio",df3['INCWAGE']/df3['M_MedWage'])

#selecting only females
df3 = df3.filter(df3['SEX']=='Female')
df3.show()

+------+---+-------------------+-----+--------------+-------+--------------+---------+-------------------+
|   SEX|AGE|              MARST| RACE|          EDUC|INCWAGE|     OCCRecode|M_MedWage|        PayGapRatio|
+------+---+-------------------+-----+--------------+-------+--------------+---------+-------------------+
|Female| 31|NeverMarried-Single|White|4thYearCollege|  38500|      Business|    80000|            0.48125|
|Female| 37|          Separated|Black|    HighSchool|  18000|      Business|    80000|              0.225|
|Female| 58|            Married|Black|4thYearCollege| 344000|         Sales|    42000|   8.19047619047619|
|Female| 47|            Married|White|    HighSchool|  10000|     Education|    47500|0.21052631578947367|
|Female| 47|            Married|Black|2ndYearCollege|  15000|   Hospitality|    18000| 0.8333333333333334|
|Female| 26|NeverMarried-Single|White|1stYearCollege|  25000|Administration|    32000|            0.78125|
|Female| 49|            Married|White

In [11]:
#then reclassify
pGRatioBucket = Bucketizer(splits=[0, 0.33, 0.66, 0.99, 1.0, float('Inf')],
                       inputCol="PayGapRatio",outputCol="payGapClass")

In [12]:
df3_buck = pGRatioBucket.setHandleInvalid("keep").transform(df3)
df3_buck.select("payGapClass").distinct().show()

df3_buck.show()

+-----------+
|payGapClass|
+-----------+
|        0.0|
|        1.0|
|        4.0|
|        3.0|
|        2.0|
+-----------+

+------+---+-------------------+-----+--------------+-------+--------------+---------+-------------------+-----------+
|   SEX|AGE|              MARST| RACE|          EDUC|INCWAGE|     OCCRecode|M_MedWage|        PayGapRatio|payGapClass|
+------+---+-------------------+-----+--------------+-------+--------------+---------+-------------------+-----------+
|Female| 31|NeverMarried-Single|White|4thYearCollege|  38500|      Business|    80000|            0.48125|        1.0|
|Female| 37|          Separated|Black|    HighSchool|  18000|      Business|    80000|              0.225|        0.0|
|Female| 58|            Married|Black|4thYearCollege| 344000|         Sales|    42000|   8.19047619047619|        4.0|
|Female| 47|            Married|White|    HighSchool|  10000|     Education|    47500|0.21052631578947367|        0.0|
|Female| 47|            Married|Black|2n

In [13]:
#now that we have reclassified the pay gap ratio, and this is a classification task, we proceed

#onehotencoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [14]:
cat_cols = ["MARST","RACE","EDUC","OCCRecode"]
num_cols = ["AGE"]

In [15]:
#building indexers and encoders for cat variables
indexers = [StringIndexer(inputCol = c, outputCol="{0}_i".format(c)) for c in cat_cols]
encoders = [OneHotEncoder(inputCol = indexer.getOutputCol(), outputCol = "{0}_e".format(indexer.getOutputCol())) 
for indexer in indexers]

In [16]:
#assembler for cat
assemblerCat = VectorAssembler(inputCols = [encoder.getOutputCol() for encoder in encoders], outputCol = "cat")
#pipeline for cat
pipelineCat = Pipeline(stages = indexers + encoders + [assemblerCat])
df3_buck = pipelineCat.fit(df3_buck).transform(df3_buck)

In [17]:
from pyspark.sql.types import IntegerType
df3_buck = df3_buck.withColumn("AGE", df3_buck["AGE"].cast(IntegerType()))
df3_buck.printSchema

<bound method DataFrame.printSchema of DataFrame[SEX: string, AGE: int, MARST: string, RACE: string, EDUC: string, INCWAGE: string, OCCRecode: string, M_MedWage: string, PayGapRatio: double, payGapClass: double, EDUC_i: double, RACE_i: double, MARST_i: double, OCCRecode_i: double, EDUC_i_e: vector, RACE_i_e: vector, MARST_i_e: vector, OCCRecode_i_e: vector, cat: vector]>

In [18]:
#building assembler for num 
assemblerNum = VectorAssembler(inputCols = num_cols, outputCol = "num")
pipelineNum = Pipeline(stages = [assemblerNum])
df3_buck = pipelineNum.fit(df3_buck).transform(df3_buck)

In [19]:
#combining cat and num assemblers
assembler = VectorAssembler(inputCols = ["cat", "num"], outputCol = "features")

In [20]:
#into the pipeline
pipeline = Pipeline(stages = [assembler])

In [21]:
#creating new df with both cat and num as features
df_temp = pipeline.fit(df3_buck).transform(df3_buck)
df_f = df_temp.select("features","payGapClass")

In [22]:
#Combine all features into one vector named features (without Age added in)
assembler = VectorAssembler(
    inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

In [23]:
#splitting the training and test sets
train_data,test_data = df_f.randomSplit([0.7,0.3])

In [24]:
## The Classifiers
#importing relevant ones
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier

In [25]:
#use defaults to make the comparison fair
dtc = DecisionTreeClassifier(labelCol="payGapClass",featuresCol="features")
rfc = RandomForestClassifier(labelCol='payGapClass',featuresCol='features')
gbt = GBTClassifier(labelCol='payGapClass',featuresCol='features', maxIter=10)

In [27]:
rfc_model = rfc.fit(train_data)
dtc_model = dtc.fit(train_data)
gbt_model = gbt.fit(train_data)

Py4JJavaError: An error occurred while calling o300.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 88.0 failed 1 times, most recent failure: Lost task 0.0 in stage 88.0 (TID 852, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.classification.GBTClassifier$$anonfun$1.apply(GBTClassifier.scala:154)
	at org.apache.spark.ml.classification.GBTClassifier$$anonfun$1.apply(GBTClassifier.scala:152)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:112)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:105)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:125)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:291)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:53)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:167)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:60)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:96)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.classification.GBTClassifier$$anonfun$1.apply(GBTClassifier.scala:154)
	at org.apache.spark.ml.classification.GBTClassifier$$anonfun$1.apply(GBTClassifier.scala:152)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
rfc_predictions = rfc_model.transform(test_data)
dtc_predictions = dtc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

In [None]:
#then we evaluate using the multi class classification evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Select (prediction, true label) and compute test error. 
acc_evaluator = MulticlassClassificationEvaluator(labelCol="payGapClass", predictionCol="prediction", metricName="accuracy")

In [None]:
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

In [None]:
print('An ensemble Random Forest Classifier has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('A single Decision Tree Classifier has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('An ensemble Gradient Boosted Classifier has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))