In [1]:
import findspark
import os

# Locate the Spark installation. Prefer $SPARK_HOME when it is set so the
# notebook is not tied to one machine; otherwise fall back to the path the
# notebook was originally run with (Spark 2.1.1 / Hadoop 2.7).
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
findspark.init(SPARK_HOME)
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('lachlan_722').getOrCreate()

In [2]:
from pyspark.sql.types import (StructField,StringType,IntegerType,BinaryType,StructType)

In [3]:
# import data 1
df1 = spark.read.csv('./Data 1.csv',inferSchema=True,header=True)

In [4]:
# import dat 2
df2 = spark.read.csv('./Data 2.csv',inferSchema=True,header=True)

In [5]:
#join data 1 and data 2
df3 = df1.join(df2, on=['id'], how='left_outer')

In [6]:
#drop blood_type due to incompleteness
df4 = df3.drop("blood_type")

In [7]:
# replace null weights with average 
df5 = df4.na.fill(75, subset=['weight'])

In [8]:
# replace null weights with zero
df5a = df4.na.fill(0, subset=['weight'])

In [9]:
# remove extreme ap_hi and ap_lo values
df6 =df5.where("gender > 0")

In [10]:
df7 =df6.where("ap_hi < 400")

In [11]:
df8 =df7.where("ap_hi > 0")

In [12]:
df9 =df8.where("ap_lo < 400")

In [13]:
df10 =df9.where("ap_lo > 0")

In [14]:
# construct bmi column
df11 = df10.withColumn('height_in_meters',df10['height']/100)

In [15]:
df12 = df11.withColumn('height_squared',df11['height_in_meters']*df11['height_in_meters'])

In [16]:
df13 = df12.withColumn('bmi',df11['weight']/df12['height_squared'])

In [17]:
df14 = df13.drop("height_in_meters", "height_squared")

In [18]:
df15 = df14.drop("height", "weight")

In [19]:
# only include continuous
dfcontinuous = df15.drop("id", "gender", "eduation", "cholesterol", "gluc", "smoke", "alco", "active")

In [20]:
# only include categorical
dfcategorical = df15.drop("id", "age", "height", "weight", "job_current","ap_hi", "ap_lo", "heart_rate")

In [21]:
# only include most highly correlated
df16 = df15.drop("id", "height", "weight", "eduation", "job_current", "heart_rate")

In [22]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [23]:
# assemble all features
assembler = VectorAssembler(
  inputCols=[ 
         'id',
         'age',
         'gender',
         'height',
         'weight',
         'eduation',
         'job_current',
         'ap_hi',
         'ap_lo',
         'cholesterol',
         'gluc',
         'smoke',
         'alco',
         'active',
         'heart_rate',
         'bmi'],
              outputCol="features")

In [24]:
# height and weight included
assemblerA = VectorAssembler(
  inputCols=[ 
         'id',
         'age',
         'gender',
         'height',
         'weight',
         'eduation',
         'job_current',
         'ap_hi',
         'ap_lo',
         'cholesterol',
         'gluc',
         'smoke',
         'alco',
         'active',
         'heart_rate',
         ],
              outputCol="features")

In [25]:
# bmi included
assemblerB = VectorAssembler(
  inputCols=[ 
         'id',
         'age',
         'gender',
         'eduation',
         'job_current',
         'ap_hi',
         'ap_lo',
         'cholesterol',
         'gluc',
         'smoke',
         'alco',
         'active',
         'heart_rate',
         'bmi'],
              outputCol="features")

In [26]:
# assemble only continuous features
assemblerC = VectorAssembler(
  inputCols=[ 
         'age',
         'job_current',
         'ap_hi',
         'ap_lo',
         'heart_rate',
         'bmi'],
              outputCol="features")

In [27]:
# assemble only categorical features
assemblerD = VectorAssembler(
  inputCols=[ 
         'gender',
         'eduation',
         'cholesterol',
         'gluc',
         'smoke',
         'alco',
         'active',
         'bmi'],
              outputCol="features")

In [28]:
# assemble only top ten correlated features
assemblerE = VectorAssembler(
  inputCols=[ 
         'age',
         'gender',
         'ap_hi',
         'ap_lo',
         'cholesterol',
         'gluc',
         'smoke',
         'alco',
         'active',
         'bmi'],
              outputCol="features")

In [29]:
output = assemblerA.transform(df10)

In [30]:
output = assemblerB.transform(df15)

In [31]:
output = assemblerC.transform(dfcontinuous)

In [32]:
output = assemblerD.transform(dfcategorical)

In [33]:
outputUsed = assemblerE.transform(df16)

In [34]:
from pyspark.ml.feature import StringIndexer

In [35]:
# Encode the 'cardio' target as the numeric label column 'CardioIndex'.
# NOTE(review): StringIndexer assigns indices by label frequency (most frequent
# -> 0), so CardioIndex 0 does not necessarily mean cardio == 0. Confirm.
indexer = StringIndexer(outputCol="CardioIndex", inputCol="cardio")
indexer_model = indexer.fit(outputUsed)
output_fixed = indexer_model.transform(outputUsed)

In [36]:
from pyspark.ml.feature import StringIndexer, VectorIndexer

In [37]:
# recognise categorical features
vectorIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=8)
output_fixed2 = vectorIndexer.fit(output_fixed).transform(output_fixed)

In [38]:
final_data = output_fixed2.select("indexedFeatures",'CardioIndex')

In [39]:
# split data

In [40]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [41]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [42]:
# load preliminary models
dtc = DecisionTreeClassifier(labelCol='CardioIndex',featuresCol='indexedFeatures')
rfc = RandomForestClassifier(labelCol='CardioIndex',featuresCol='indexedFeatures')
gbt = GBTClassifier(labelCol='CardioIndex',featuresCol='indexedFeatures')

In [43]:
# train preliminary models
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

KeyboardInterrupt: 

In [None]:
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'CardioIndex')

In [None]:
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

In [None]:
print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))

In [None]:
my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='CardioIndex', rawPredictionCol='prediction')
print("GBT")
print(my_binary_gbt_eval.evaluate(gbt_predictions))

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="CardioIndex", predictionCol="prediction", metricName="accuracy")

In [None]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

In [None]:
# prediction accuracies for preliminary models
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*40)
print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

In [None]:
# Final Model

In [None]:
# split data for final model
train_data,test_data = final_data.randomSplit([0.8,0.2])

In [None]:
# load model and specify parameters
gbt = GBTClassifier(labelCol='CardioIndex',featuresCol='indexedFeatures', maxIter = 20, maxDepth = 10, maxBins = 32)

In [None]:
# train model (approx 30 seconds)
gbt_model = gbt.fit(train_data)

In [None]:
gbt_predictions = gbt_model.transform(test_data)

In [None]:
# example of predictions made
gbt_predictions.show()

In [None]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="CardioIndex", predictionCol="prediction", metricName="accuracy")

In [None]:
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

In [None]:
# prediction accuracy of final model
print('-'*40)
print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

In [None]:
# test error of final model
accuracy = acc_evaluator.evaluate(gbt_predictions)
print("Test Error = %g" % (1.0 - accuracy))