In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tree_methods_adv').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/22 11:45:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
data = spark.read.csv("./Dataset/MegerdData.csv", header=True, inferSchema=True)

                                                                                

In [3]:
# Let's get an idea of what the data looks like. 
data.printSchema()

root
 |-- Patient ID: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- Systolic_BP: integer (nullable = true)
 |-- Diastolic_BP: integer (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Diabetes: integer (nullable = true)
 |-- Family History: integer (nullable = true)
 |-- Smoking: integer (nullable = true)
 |-- Obesity: integer (nullable = true)
 |-- Alcohol Consumption: integer (nullable = true)
 |-- Exercise Hours Per Week: double (nullable = true)
 |-- Diet: string (nullable = true)
 |-- Previous Heart Problems: integer (nullable = true)
 |-- Medication Use: integer (nullable = true)
 |-- Stress Level: string (nullable = true)
 |-- Sedentary Hours Per Day: double (nullable = true)
 |-- Income: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Triglycerides: integer (nullable = true)
 |-- Physical Activity Days Per Week: integer (nullable = true)
 |-- Slee

In [4]:
data.head()

24/05/22 11:45:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Row(Patient ID='BMW7812', Age=67, Sex='Male', Cholesterol=208, Systolic_BP=158, Diastolic_BP=88, Heart Rate=72, Diabetes=0, Family History=0, Smoking=1, Obesity=0, Alcohol Consumption=0, Exercise Hours Per Week=4.168188835, Diet='Average', Previous Heart Problems=0, Medication Use=0, Stress Level='99', Sedentary Hours Per Day=6.615001453, Income=261404, BMI=31.25123273, Triglycerides=286, Physical Activity Days Per Week=0, Sleep Hours Per Day=None, Country='Argentina', Continent='South America', Hemisphere='Southern Hemisphere', Heart Attack Risk=0)

In [5]:
# A few things we need to do before Spark can accept the data!
# It needs to be in the form of two columns: "label" and "features".

# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [6]:
# Let's visualise the columns to help with assembly. 
data.columns

['Patient ID',
 'Age',
 'Sex',
 'Cholesterol',
 'Systolic_BP',
 'Diastolic_BP',
 'Heart Rate',
 'Diabetes',
 'Family History',
 'Smoking',
 'Obesity',
 'Alcohol Consumption',
 'Exercise Hours Per Week',
 'Diet',
 'Previous Heart Problems',
 'Medication Use',
 'Stress Level',
 'Sedentary Hours Per Day',
 'Income',
 'BMI',
 'Triglycerides',
 'Physical Activity Days Per Week',
 'Sleep Hours Per Day',
 'Country',
 'Continent',
 'Hemisphere',
 'Heart Attack Risk']

In [7]:
# Combine all features into one vector named features.
assembler = VectorAssembler(
  inputCols=[
 'Age',
 
 'Cholesterol',
 'Systolic_BP',
 'Diastolic_BP',
 'Heart Rate',
 'Diabetes',
 'Family History',
 'Smoking',
 'Obesity',
 'Alcohol Consumption',
 'Exercise Hours Per Week',
 
 'Previous Heart Problems',
 'Medication Use',
 
 'Sedentary Hours Per Day',
 'Income',
 'BMI',
 'Triglycerides',
 'Physical Activity Days Per Week',
 'Sleep Hours Per Day',
 
 ],
              outputCol="features",handleInvalid="keep")


In [8]:
# Let's transform the data. 
output = assembler.transform(data)

In [9]:
# Let's import the string indexer (similar to the logistic regression exercises).
from pyspark.ml.feature import StringIndexer


In [10]:
indexer = StringIndexer(inputCol="Heart Attack Risk", outputCol="Heart Attack Risk Index")
output_fixed = indexer.fit(output).transform(output)

                                                                                

In [11]:
# Let's select the two columns we want. Features (which contains vectors), and the predictor.
final_data = output_fixed.select("features",'Heart Attack Risk Index')
final_data.printSchema()
display(final_data)
# Split the training and testing set.
train_data,test_data = final_data.randomSplit([0.8,0.2])

root
 |-- features: vector (nullable = true)
 |-- Heart Attack Risk Index: double (nullable = false)



DataFrame[features: vector, Heart Attack Risk Index: double]

In [12]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [13]:
dtc = DecisionTreeClassifier(labelCol='Heart Attack Risk Index',featuresCol='features')
rfc = RandomForestClassifier(labelCol='Heart Attack Risk Index',featuresCol='features')
gbt = GBTClassifier(labelCol='Heart Attack Risk Index',featuresCol='features')

In [14]:
# Train the models (it's three models, so it might take some time).
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

                                                                                

In [15]:
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

In [16]:
# Let's start off with binary classification.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'Heart Attack Risk Index')

In [17]:
# This is the area under the curve. This indicates that the data is highly seperable.
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

# RFC improves accuracy but also model complexity. RFC outperforms DTC in nearly every situation.
print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))

# We can't repeat these exact steps for GBT. If you print the schema of all three, you may be able to notice why.
# Instead, let's redefine the object:
my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='Heart Attack Risk Index', rawPredictionCol='prediction')
print("GBT")
print(my_binary_gbt_eval.evaluate(gbt_predictions))

# Interesting, GBT didn't perform as well as RFC or DTC. But that's because we left the model's settings as default. 
# In most cases, we should adjust these parameters. More trees may increase accuracy, but decrease precision and recall. 


DTC


                                                                                

0.5016571618037134
RFC


                                                                                

0.5032599469496013
GBT


24/05/22 11:54:03 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/05/22 11:54:04 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

0.49576259946949597


In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [19]:
# Select (prediction, true label) and compute test error. 
acc_evaluator = MulticlassClassificationEvaluator(labelCol="Heart Attack Risk Index", predictionCol="prediction", metricName="accuracy")


In [20]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)


                                                                                

In [21]:
# Let's do something a bit more complex in terms of printing, just so it's formatted nicer. 
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*40)
print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

Here are the results!
----------------------------------------
A single decision tree has an accuracy of: 62.43%
----------------------------------------
A random forest ensemble has an accuracy of: 64.09%
----------------------------------------
An ensemble using GBT has an accuracy of: 61.77%


24/05/22 12:02:59 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 121327 ms exceeds timeout 120000 ms
24/05/22 12:08:02 WARN SparkContext: Killing executors is not supported by current scheduler.
