### Tree Methods

In [8]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('722TreeMethods').getOrCreate()

# import other packages
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)

# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

 Load Data

In [2]:
# integrate the 2 sources
# code adapted from https://www.geeksforgeeks.org/merge-two-dataframes-in-pyspark/
import functools 
def unionAll(dfs):
    return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)

In [3]:
# create a variable with the correct structure
data_schema = [StructField('Total Household Income',IntegerType(),True), StructField('Region',StringType(),True),
              StructField('Total Food Expenditure',IntegerType(),True), StructField('Main Source of Income',StringType(),True),
              StructField('Agricultural Household indicator',IntegerType(),True), StructField('Bread and Cereals Expenditure',IntegerType(),True),
              StructField('Total Rice Expenditure',IntegerType(),True), StructField('Meat Expenditure',IntegerType(),True),
              StructField('Total Fish and  marine products Expenditure',IntegerType(),True), StructField('Fruit Expenditure',IntegerType(),True),
              StructField('Vegetables Expenditure',IntegerType(),True), StructField('Restaurant and hotels Expenditure',IntegerType(),True),
              StructField('Alcoholic Beverages Expenditure',IntegerType(),True), StructField('Tobacco Expenditure',IntegerType(),True),
              StructField('Clothing, Footwear and Other Wear Expenditure',IntegerType(),True), StructField('Housing and water Expenditure',IntegerType(),True),
              StructField('Imputed House Rental Value',IntegerType(),True), StructField('Medical Care Expenditure',IntegerType(),True),
              StructField('Transportation Expenditure',IntegerType(),True), StructField('Communication Expenditure',IntegerType(),True),
              StructField('Education Expenditure',IntegerType(),True), StructField('Miscellaneous Goods and Services Expenditure',IntegerType(),True),
              StructField('Special Occasions Expenditure',IntegerType(),True), StructField('Crop Farming and Gardening expenses',IntegerType(),True),
              StructField('Total Income from Entrepreneurial Acitivites',IntegerType(),True), StructField('Household Head Sex',StringType(),True),
              StructField('Household Head Age',IntegerType(),True), StructField('Household Head Marital Status',StringType(),True),
              StructField('Household Head Highest Grade Completed',StringType(),True), StructField('Household Head Job or Business Indicator',StringType(),True),
              StructField('Household Head Occupation',StringType(),True), StructField('Household Head Class of Worker',StringType(),True),
              StructField('Type of Household',StringType(),True), StructField('Total Number of Family members',IntegerType(),True),
              StructField('Members with age less than 5 year old',IntegerType(),True), StructField('Members with age 5 - 17 years old',IntegerType(),True),
              StructField('Total number of family members employed',IntegerType(),True), StructField('Type of Building/House',StringType(),True),
              StructField('Type of Roof',StringType(),True), StructField('Type of Walls',StringType(),True),
              StructField('House Floor Area',IntegerType(),True), StructField('House Age',IntegerType(),True),
              StructField('Number of bedrooms',IntegerType(),True), StructField('Tenure Status',StringType(),True),
              StructField('Toilet Facilities',StringType(),True), StructField('Electricity',IntegerType(),True),
              StructField('Main Source of Water Supply',StringType(),True), StructField('Number of Television',IntegerType(),True),
              StructField('Number of CD/VCD/DVD',IntegerType(),True), StructField('Number of Component/Stereo set',IntegerType(),True),
              StructField('Number of Refrigerator/Freezer',IntegerType(),True), StructField('Number of Washing Machine',IntegerType(),True),
              StructField('Number of Airconditioner',IntegerType(),True), StructField('Number of Car, Jeep, Van',IntegerType(),True),
              StructField('Number of Landline/wireless telephones',IntegerType(),True), StructField('Number of Cellular phone',IntegerType(),True),
              StructField('Number of Personal Computer',IntegerType(),True), StructField('Number of Stove with Oven/Gas Range',IntegerType(),True),
              StructField('Number of Motorized Banca',IntegerType(),True), StructField('Number of Motorcycle/Tricycle',IntegerType(),True)
              ]

final_struct = StructType(fields=data_schema)

In [4]:
# And now we can read in the data using that schema. The other fields are read as integer. 
df1 = spark.read.option("header", True).csv('dataset/data_ncr.csv', schema=final_struct)
df2 = spark.read.option("header", True).csv('dataset/data_therest.csv', schema=final_struct)
statsdata = unionAll([df1, df2])

print("Combined dataset with updated dtypes:", statsdata.count(), "x", len(statsdata.columns))
statsdata.printSchema()



Combined dataset with updated dtypes: 41544 x 60
root
 |-- Total Household Income: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Total Food Expenditure: integer (nullable = true)
 |-- Main Source of Income: string (nullable = true)
 |-- Agricultural Household indicator: integer (nullable = true)
 |-- Bread and Cereals Expenditure: integer (nullable = true)
 |-- Total Rice Expenditure: integer (nullable = true)
 |-- Meat Expenditure: integer (nullable = true)
 |-- Total Fish and  marine products Expenditure: integer (nullable = true)
 |-- Fruit Expenditure: integer (nullable = true)
 |-- Vegetables Expenditure: integer (nullable = true)
 |-- Restaurant and hotels Expenditure: integer (nullable = true)
 |-- Alcoholic Beverages Expenditure: integer (nullable = true)
 |-- Tobacco Expenditure: integer (nullable = true)
 |-- Clothing, Footwear and Other Wear Expenditure: integer (nullable = true)
 |-- Housing and water Expenditure: integer (nullable = true)
 |-- Impute

                                                                                

Clean Data

In [5]:
# Drop columns
Columns_To_Remove = ['Region', 'Number of Television', 'Number of CD/VCD/DVD', 'Number of Component/Stereo set', 
                     'Number of Refrigerator/Freezer', 'Number of Washing Machine', 'Number of Airconditioner', 
                     'Number of Car, Jeep, Van', 'Number of Landline/wireless telephones', 'Number of Cellular phone', 
                     'Number of Personal Computer', 'Number of Stove with Oven/Gas Range', 'Number of Motorized Banca', 
                     'Number of Motorcycle/Tricycle']
statsupdate = statsdata.drop(*Columns_To_Remove)

# Add New Column with Total Expenses
expenses_cols = ['Total Food Expenditure', 'Bread and Cereals Expenditure', 'Total Rice Expenditure', 'Meat Expenditure', 'Total Fish and  marine products Expenditure', 'Fruit Expenditure', 'Vegetables Expenditure', 'Restaurant and hotels Expenditure', 'Alcoholic Beverages Expenditure', 'Tobacco Expenditure', 'Clothing, Footwear and Other Wear Expenditure', 'Housing and water Expenditure', 'Medical Care Expenditure', 'Transportation Expenditure', 'Communication Expenditure', 'Education Expenditure', 'Miscellaneous Goods and Services Expenditure', 'Special Occasions Expenditure', 'Crop Farming and Gardening expenses']
statsupdate = statsupdate.withColumn('Total Expenditures', sum(statsupdate[col] for col in expenses_cols))

# Drop the other expenses columns, except Medical Care Expenditure
expenses_cols.remove('Medical Care Expenditure')
statsupdate = statsupdate.drop(*expenses_cols)
print("Updated dimensions after removing",len(expenses_cols)+len(Columns_To_Remove),"columns:",statsupdate.count(), "x", len(statsupdate.columns))

# remove na/nulls
statsupdate = statsupdate.na.fill('Not Specified')

# remove outliers
# calculate upper and lower bounds for outliers, adapted from internet
def calculate_bounds(df):
    bounds = {
        c: dict(
            zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
        )
        for c,d in zip(df.columns, df.dtypes) if d[1] == "int"
    }
    for c in bounds:
        iqr = bounds[c]['q3'] - bounds[c]['q1']
        bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
        bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)
    return bounds
bounds = calculate_bounds(statsupdate)

# remove outliers for "Total Household Income"
c = "Total Household Income"
newstats = statsupdate.filter(F.col(c).between(bounds[c]['min'], bounds[c]['max']))

# remove outliers for "Medical Care Expenditure"
c = "Medical Care Expenditure"
newstats = newstats.filter(F.col(c).between(bounds[c]['min'], bounds[c]['max']))

# rename misspelled column
newstats = newstats.withColumnRenamed("Total Income from Entrepreneurial Acitivites", "Total Income from Entrepreneurial Activities")

# categorize
#create a list of the columns that are string types
categoricalColumns = [item[0] for item in newstats.dtypes if item[1].startswith('string') ]

#define a list of stages in your pipeline. The string indexer will be one stage
stages = []
#iterate through all categorical values, create a string indexer, assign new column name with 'Index' at end
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + ' Index')
    stages += [stringIndexer]

pipeline = Pipeline(stages = stages)
newstats = pipeline.fit(newstats).transform(newstats)

newstats.printSchema()

Updated dimensions after removing 18 columns: 41544 x 29


                                                                                

root
 |-- Total Household Income: integer (nullable = true)
 |-- Main Source of Income: string (nullable = false)
 |-- Agricultural Household indicator: integer (nullable = true)
 |-- Imputed House Rental Value: integer (nullable = true)
 |-- Medical Care Expenditure: integer (nullable = true)
 |-- Total Income from Entrepreneurial Activities: integer (nullable = true)
 |-- Household Head Sex: string (nullable = false)
 |-- Household Head Age: integer (nullable = true)
 |-- Household Head Marital Status: string (nullable = false)
 |-- Household Head Highest Grade Completed: string (nullable = false)
 |-- Household Head Job or Business Indicator: string (nullable = false)
 |-- Household Head Occupation: string (nullable = false)
 |-- Household Head Class of Worker: string (nullable = false)
 |-- Type of Household: string (nullable = false)
 |-- Total Number of Family members: integer (nullable = true)
 |-- Members with age less than 5 year old: integer (nullable = true)
 |-- Members wit

### Run Tree Methods Classifiers

In [6]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier


Prepare Data Formats

In [48]:
assembler = VectorAssembler(
    inputCols=["Total Household Income", "Main Source of Income Index", 
            "Agricultural Household indicator","Imputed House Rental Value",
            "Total Income from Entrepreneurial Activities",
            "Household Head Age", "Total Number of Family members",
            "Members with age less than 5 year old","Members with age 5 - 17 years old",
            "Total number of family members employed", "House Floor Area",
             "House Age","Number of bedrooms",
             "Electricity","Total Expenditures",
             "Main Source of Income Index","Household Head Sex Index",
             "Household Head Marital Status Index","Household Head Highest Grade Completed Index",
             "Household Head Job or Business Indicator Index",
             "Household Head Class of Worker Index","Type of Household Index",
             "Type of Building/House Index","Type of Roof Index","Type of Walls Index",
             "Tenure Status Index","Toilet Facilities Index",
             "Main Source of Water Supply Index"],
    outputCol="features")
output = assembler.transform(newstats)

labelIndexer = StringIndexer(inputCol = "Medical Care Expenditure", outputCol = "Medical Care Expenditure Label")

output = labelIndexer.fit(output).transform(output)


In [49]:
# Let's select the two columns we want. Features (which contains vectors), and the predictor.
final_data = output.select("features",'Medical Care Expenditure Label')

In [50]:
final_data.printSchema()
final_data.head()

root
 |-- features: vector (nullable = true)
 |-- Medical Care Expenditure Label: double (nullable = false)



Row(features=SparseVector(28, {0: 301900.0, 3: 72000.0, 5: 73.0, 6: 4.0, 9: 1.0, 10: 25.0, 11: 15.0, 12: 2.0, 13: 1.0, 14: 381306.0, 16: 1.0, 17: 1.0, 18: 1.0, 19: 1.0, 20: 2.0, 21: 1.0}), Medical Care Expenditure Label=54.0)

In [51]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

Create and Train Models

In [57]:
# create all 3 models
# Use defaults to make the comparison "fair". This simplifies the comparison process.
dtc = DecisionTreeClassifier(labelCol='Medical Care Expenditure Label',featuresCol='features', maxBins=46)
rfc = RandomForestClassifier(labelCol='Medical Care Expenditure Label',featuresCol='features',  maxBins=46)
gbt = GBTClassifier(labelCol='Medical Care Expenditure Label',featuresCol='features',  maxBins=46)

In [56]:
dtc_model = dtc.fit(train_data)
dtc_model.numClasses

22/10/12 10:12:28 WARN DAGScheduler: Broadcasting large task binary with size 1532.0 KiB
22/10/12 10:12:33 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
22/10/12 10:12:39 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB
                                                                                

6725

In [None]:
# Train the models (it's three models, so it might take some time).
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [58]:
rfc_model = rfc.fit(train_data)


22/10/12 10:14:26 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
22/10/12 10:14:27 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
22/10/12 10:14:31 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB
22/10/12 10:14:33 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/10/12 10:14:38 WARN DAGScheduler: Broadcasting large task binary with size 8.0 MiB
22/10/12 10:14:40 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
22/10/12 10:14:44 WARN DAGScheduler: Broadcasting large task binary with size 9.5 MiB
22/10/12 10:14:46 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
22/10/12 10:14:49 WARN DAGScheduler: Broadcasting large task binary with size 10.9 MiB
22/10/12 10:14:51 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
22/10/12 10:14:55 WARN DAGScheduler: Broadcasting large task binary with size 9.5 MiB
22/10/12 10:14:57 WARN DAGScheduler: Broadcasting lar

In [59]:
gbt_model = gbt.fit(train_data)

22/10/12 10:16:25 ERROR Executor: Exception in task 0.0 in stage 149.0 (TID 297)
java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 6007.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:176)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:173)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)

Py4JJavaError: An error occurred while calling o1810.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 149.0 failed 1 times, most recent failure: Lost task 0.0 in stage 149.0 (TID 297) (ip-172-31-14-89.ec2.internal executor driver): java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 6007.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:176)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:173)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:119)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:333)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:61)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$1(GBTClassifier.scala:209)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:170)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:58)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 6007.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:176)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:173)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [60]:
# Model Comparison - compare each of these models
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
# gbt_predictions = gbt_model.transform(test_data)

In [63]:
# Evaluation metrics
# Let's start off with binary classification.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'Medical Care Expenditure Label')

In [70]:
print(type(my_binary_eval))

<class 'pyspark.ml.evaluation.BinaryClassificationEvaluator'>


In [64]:
# This is the area under the curve. This indicates that the data is highly seperable.
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

# RFC improves accuracy but also model complexity. RFC outperforms DTC in nearly every situation.
print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))


DTC


IllegalArgumentException: requirement failed: rawPredictionCol vectors must have length=2, but got 6725

In [65]:
# Let's import the evaluator.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [66]:
# Select (prediction, true label) and compute test error. 
acc_evaluator = MulticlassClassificationEvaluator(labelCol="Medical Care Expenditure Label", predictionCol="prediction", metricName="accuracy")

In [67]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
#gbt_acc = acc_evaluator.evaluate(gbt_predictions)

22/10/12 10:21:37 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
22/10/12 10:21:42 WARN DAGScheduler: Broadcasting large task binary with size 49.9 MiB
                                                                                

In [68]:
# Let's do something a bit more complex in terms of printing, just so it's formatted nicer. 
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*40)
#print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

Here are the results!
----------------------------------------
A single decision tree has an accuracy of: 3.90%
----------------------------------------
A random forest ensemble has an accuracy of: 3.96%
----------------------------------------


22/10/12 10:25:52 WARN NettyRpcEnv: Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from ip-172-31-14-89.ec2.internal:34423 in 10000 milliseconds
