In [1]:
# import SparkSession
from pyspark.sql import SparkSession

In [2]:
# Initiate the SparkSession through which the analysis is performed.
spark = (
    SparkSession.builder
    .appName("PySparkApp")
    .getOrCreate()
)

In [3]:
# Display the active SparkSession as this cell's output.
spark

In [4]:
# Read the CSV file: the first line is the header, column types are inferred.
df = spark.read.csv(
    "dataR2.csv",
    header=True,
    inferSchema=True,
)

In [5]:
# Count the number of rows in the DataFrame.
df.count()

116

In [6]:
# Show the first five rows of the table.
df.show(n=5)

+---+-----------+-------+-------+-----------+-------+-----------+--------+-------+--------------+
|Age|        BMI|Glucose|Insulin|       HOMA| Leptin|Adiponectin|Resistin|  MCP.1|Classification|
+---+-----------+-------+-------+-----------+-------+-----------+--------+-------+--------------+
| 48|       23.5|     70|  2.707|0.467408667| 8.8071|     9.7024| 7.99585|417.114|             1|
| 83|20.69049454|     92|  3.115|0.706897333| 8.8438|   5.429285| 4.06405|468.786|             1|
| 82|23.12467037|     91|  4.498|1.009651067|17.9393|   22.43204| 9.27715|554.697|             1|
| 68|21.36752137|     77|  3.226|0.612724933| 9.8827|    7.16956|  12.766| 928.22|             1|
| 86|21.11111111|     92|  3.549|  0.8053864| 6.6994|    4.81924|10.57635| 773.92|             1|
+---+-----------+-------+-------+-----------+-------+-----------+--------+-------+--------------+
only showing top 5 rows



In [7]:
# Project a single column and preview its first three values.
df.select("Glucose").show(n=3)

+-------+
|Glucose|
+-------+
|     70|
|     92|
|     91|
+-------+
only showing top 3 rows



In [8]:
# Project several columns and preview their first three rows.
df.select("Age", "BMI", "Glucose").show(n=3)

+---+-----------+-------+
|Age|        BMI|Glucose|
+---+-----------+-------+
| 48|       23.5|     70|
| 83|20.69049454|     92|
| 82|23.12467037|     91|
+---+-----------+-------+
only showing top 3 rows



In [9]:
# Print each column's name, type and nullability (types were inferred when the CSV was read).
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- Insulin: double (nullable = true)
 |-- HOMA: double (nullable = true)
 |-- Leptin: double (nullable = true)
 |-- Adiponectin: double (nullable = true)
 |-- Resistin: double (nullable = true)
 |-- MCP.1: double (nullable = true)
 |-- Classification: integer (nullable = true)



In [10]:
# List (column name, type) pairs for every column.
df.dtypes

[('Age', 'int'),
 ('BMI', 'double'),
 ('Glucose', 'int'),
 ('Insulin', 'double'),
 ('HOMA', 'double'),
 ('Leptin', 'double'),
 ('Adiponectin', 'double'),
 ('Resistin', 'double'),
 ('MCP.1', 'double'),
 ('Classification', 'int')]

In [11]:
# Summary statistics (count, mean, stddev, min, max) for Age and BMI.
df.select("Age", "BMI").describe().show()

+-------+------------------+------------------+
|summary|               Age|               BMI|
+-------+------------------+------------------+
|  count|               116|               116|
|   mean| 57.30172413793103|27.582110827413807|
| stddev|16.112765572452282| 5.020135768568436|
|    min|                24|             18.37|
|    max|                89|       38.57875854|
+-------+------------------+------------------+



In [12]:
# List the rows whose Insulin value is null.
df.where(df["Insulin"].isNull()).show()

+---+---+-------+-------+----+------+-----------+--------+-----+--------------+
|Age|BMI|Glucose|Insulin|HOMA|Leptin|Adiponectin|Resistin|MCP.1|Classification|
+---+---+-------+-------+----+------+-----------+--------+-----+--------------+
+---+---+-------+-------+----+------+-----------+--------+-----+--------------+



In [13]:
# Print the list of column names.
print(df.columns)

['Age', 'BMI', 'Glucose', 'Insulin', 'HOMA', 'Leptin', 'Adiponectin', 'Resistin', 'MCP.1', 'Classification']


In [14]:
# Rename the "MCP.1" column to "MCP".
# NOTE(review): presumably done because Spark parses a dot in a column name as
# nested-field access, which breaks later lookups by name — confirm.
df = df.withColumnRenamed("MCP.1", "MCP")

In [15]:
# Feature columns used for modelling. The last entry is "MCP" because the
# original "MCP.1" column has been renamed; the old name no longer exists in df.
features = ['Age', 'BMI', 'Glucose', 'Insulin', 'HOMA', 'Leptin', 'Adiponectin', 'Resistin', 'MCP']

In [16]:
# import the vector assembler
from pyspark.ml.feature import VectorAssembler

# Feature columns, in the order they are packed into the assembled vector.
features = ['Age', 'BMI', 'Glucose', 'Insulin', 'HOMA', 'Leptin', 'Adiponectin', 'Resistin', 'MCP']
# Initiate the assembler: it combines the feature columns into one vector column "Fvec".
vec_assembler = VectorAssembler(inputCols=features, outputCol="Fvec")
# Apply the assembler to the data. Any "Fvec" column left over from a previous
# run of this cell is dropped first (drop is a no-op when the column is absent),
# so the cell can be re-run without failing on an already existing output column.
df = vec_assembler.transform(df.drop("Fvec"))

In [17]:
# Show the first five rows, now including the assembled "Fvec" column.
df.show(5)

+---+-----------+-------+-------+-----------+-------+-----------+--------+-------+--------------+--------------------+
|Age|        BMI|Glucose|Insulin|       HOMA| Leptin|Adiponectin|Resistin|    MCP|Classification|                Fvec|
+---+-----------+-------+-------+-----------+-------+-----------+--------+-------+--------------+--------------------+
| 48|       23.5|     70|  2.707|0.467408667| 8.8071|     9.7024| 7.99585|417.114|             1|[48.0,23.5,70.0,2...|
| 83|20.69049454|     92|  3.115|0.706897333| 8.8438|   5.429285| 4.06405|468.786|             1|[83.0,20.69049454...|
| 82|23.12467037|     91|  4.498|1.009651067|17.9393|   22.43204| 9.27715|554.697|             1|[82.0,23.12467037...|
| 68|21.36752137|     77|  3.226|0.612724933| 9.8827|    7.16956|  12.766| 928.22|             1|[68.0,21.36752137...|
| 86|21.11111111|     92|  3.549|  0.8053864| 6.6994|    4.81924|10.57635| 773.92|             1|[86.0,21.11111111...|
+---+-----------+-------+-------+-----------+---

In [18]:
# Split the data into training and test sets (30% held out for testing).
# A fixed seed keeps the split, and therefore the model and its metrics,
# the same across re-runs of the notebook.
trainData, testData = df.randomSplit([0.7, 0.3], seed=42)

In [19]:
from pyspark.ml.classification import DecisionTreeClassifier

In [20]:
# Configure a decision tree that learns "Classification" from the "Fvec" vector.
dt = DecisionTreeClassifier(
    featuresCol="Fvec",
    labelCol="Classification",
)
# Fit the tree on the training split.
dt_model = dt.fit(trainData)
# Score the held-out split; this adds the rawPrediction, probability and prediction columns.
y_pred = dt_model.transform(testData)

In [21]:
# Print the column names of the prediction DataFrame.
print(y_pred.columns)

['Age', 'BMI', 'Glucose', 'Insulin', 'HOMA', 'Leptin', 'Adiponectin', 'Resistin', 'MCP', 'Classification', 'Fvec', 'rawPrediction', 'probability', 'prediction']


In [22]:
# Preview the true label next to the model's outputs for the test rows.
y_pred.select(
    'Classification',
    'Fvec',
    'rawPrediction',
    'probability',
    'prediction',
).show()

+--------------+--------------------+--------------+---------------+----------+
|Classification|                Fvec| rawPrediction|    probability|prediction|
+--------------+--------------------+--------------+---------------+----------+
|             1|[24.0,18.67,88.0,...|[0.0,11.0,0.0]|  [0.0,1.0,0.0]|       1.0|
|             1|[25.0,22.86,82.0,...|[0.0,11.0,0.0]|  [0.0,1.0,0.0]|       1.0|
|             1|[28.0,35.85581466...|[0.0,11.0,0.0]|  [0.0,1.0,0.0]|       1.0|
|             2|[34.0,24.24242424...|[0.0,11.0,0.0]|  [0.0,1.0,0.0]|       1.0|
|             2|[38.0,22.4996371,...|[0.0,11.0,0.0]|  [0.0,1.0,0.0]|       1.0|
|             1|[38.0,23.34,75.0,...|[0.0,11.0,0.0]|  [0.0,1.0,0.0]|       1.0|
|             2|[40.0,27.63605442...| [0.0,0.0,3.0]|  [0.0,0.0,1.0]|       2.0|
|             2|[42.0,21.35991456...|[0.0,11.0,0.0]|  [0.0,1.0,0.0]|       1.0|
|             2|[42.0,29.296875,9...| [0.0,0.0,3.0]|  [0.0,0.0,1.0]|       2.0|
|             2|[43.0,31.25,103.0...| [0

In [23]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [24]:
# Compare the predictions with the true labels and report accuracy on the test split.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="Classification",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(y_pred)
print(accuracy)

0.6410256410256411


In [25]:
# Re-evaluate the same predictions with the same evaluator
# (this repeats the computation of the previous cell).
accuracy = evaluator.evaluate(y_pred)
print(accuracy)

0.6410256410256411


In [26]:
# print model summary
# toDebugString is read as an attribute (not called); its value is the text
# description of the fitted tree's if/else rules, stored here despite the name.
# NOTE(review): "feature N" presumably indexes the assembled feature vector,
# i.e. position N in the `features` list — confirm.
treeModel = dt_model.toDebugString
print(treeModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_ce0e83122e3f, depth=5, numNodes=27, numClasses=3, numFeatures=9
  If (feature 2 <= 96.5)
   If (feature 0 <= 44.5)
    Predict: 1.0
   Else (feature 0 > 44.5)
    If (feature 0 <= 59.5)
     If (feature 7 <= 10.475764999999999)
      If (feature 7 <= 8.124324999999999)
       Predict: 2.0
      Else (feature 7 > 8.124324999999999)
       Predict: 1.0
     Else (feature 7 > 10.475764999999999)
      Predict: 2.0
    Else (feature 0 > 59.5)
     If (feature 7 <= 13.858215000000001)
      If (feature 4 <= 0.5879497335)
       Predict: 2.0
      Else (feature 4 > 0.5879497335)
       Predict: 1.0
     Else (feature 7 > 13.858215000000001)
      If (feature 1 <= 30.288408305)
       Predict: 2.0
      Else (feature 1 > 30.288408305)
       Predict: 1.0
  Else (feature 2 > 96.5)
   If (feature 1 <= 36.651401815)
    If (feature 8 <= 217.0245)
     If (feature 2 <= 113.0)
      Predict: 1.0
     Else (feature 2 > 113.0)
      Predict

In [27]:
# Count the rows for each Classification value.
df.groupBy("Classification").count().show()

+--------------+-----+
|Classification|count|
+--------------+-----+
|             1|   52|
|             2|   64|
+--------------+-----+

