In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

In [2]:
data = spark.read.csv("data.csv",inferSchema=True,header=True)

In [3]:
data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- diagnosis: string (nullable = true)
 |-- radius_mean: double (nullable = true)
 |-- texture_mean: double (nullable = true)
 |-- perimeter_mean: double (nullable = true)
 |-- area_mean: double (nullable = true)
 |-- smoothness_mean: double (nullable = true)
 |-- compactness_mean: double (nullable = true)
 |-- concavity_mean: double (nullable = true)
 |-- concave points_mean: double (nullable = true)
 |-- symmetry_mean: double (nullable = true)
 |-- fractal_dimension_mean: double (nullable = true)
 |-- radius_se: double (nullable = true)
 |-- texture_se: double (nullable = true)
 |-- perimeter_se: double (nullable = true)
 |-- area_se: double (nullable = true)
 |-- smoothness_se: double (nullable = true)
 |-- compactness_se: double (nullable = true)
 |-- concavity_se: double (nullable = true)
 |-- concave points_se: double (nullable = true)
 |-- symmetry_se: double (nullable = true)
 |-- fractal_dimension_se: double (nullable = true)
 |-- radi

In [4]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [5]:
data = data['diagnosis', 'radius_mean', 'perimeter_mean', 'area_mean',
       'compactness_mean', 'concavity_mean', 'concave points_mean',
       'radius_worst', 'perimeter_worst', 'area_worst', 'compactness_worst',
       'concavity_worst', 'concave points_worst']

In [6]:
data.columns

['diagnosis',
 'radius_mean',
 'perimeter_mean',
 'area_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'radius_worst',
 'perimeter_worst',
 'area_worst',
 'compactness_worst',
 'concavity_worst',
 'concave points_worst']

In [7]:
assembler = VectorAssembler(
  inputCols=['radius_mean', 'perimeter_mean', 'area_mean',
       'compactness_mean', 'concavity_mean', 'concave points_mean',
       'radius_worst', 'perimeter_worst', 'area_worst', 'compactness_worst',
       'concavity_worst', 'concave points_worst'],
              outputCol="features")

In [8]:
output = assembler.transform(data)

In [9]:
from pyspark.ml.feature import StringIndexer

In [10]:
indexer = StringIndexer(inputCol="diagnosis", outputCol="diagnosisIndex")
output_fixed = indexer.fit(output).transform(output)

In [11]:
final_data = output_fixed.select("features",'diagnosisIndex')

In [12]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [13]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [15]:
dtc = DecisionTreeClassifier(labelCol='diagnosisIndex',featuresCol='features')
rfc = RandomForestClassifier(labelCol='diagnosisIndex',featuresCol='features')
gbt = GBTClassifier(labelCol='diagnosisIndex',featuresCol='features')

In [14]:
dtc = DecisionTreeClassifier(labelCol='diagnosisIndex',featuresCol='features',maxDepth=10)
rfc = RandomForestClassifier(labelCol='diagnosisIndex',featuresCol='features',numTrees=30,maxDepth=10)
gbt = GBTClassifier(labelCol='diagnosisIndex',featuresCol='features',maxDepth=10)

In [15]:
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [16]:
dtc_model

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4f309fd779c3f9a3ddbb) of depth 8 with 37 nodes

In [17]:
rfc_model

RandomForestClassificationModel (uid=rfc_e12721c441c0) with 30 trees

In [18]:
gbt_model

GBTClassificationModel (uid=GBTClassifier_4022a54253bba2f764ce) with 20 trees

In [19]:
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'diagnosisIndex')

In [21]:
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))

my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='diagnosisIndex', rawPredictionCol='prediction')
print("GBT")
print(my_binary_gbt_eval.evaluate(gbt_predictions))

DTC
0.9334880561530623
RFC
0.969489414694894
GBT
0.9245443224272614


In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [23]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="diagnosisIndex", predictionCol="prediction", metricName="accuracy")

In [24]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

In [25]:
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*40)
print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

Here are the results!
----------------------------------------
A single decision tree has an accuracy of: 93.30%
----------------------------------------
A random forest ensemble has an accuracy of: 93.81%
----------------------------------------
An ensemble using GBT has an accuracy of: 93.30%
