In [5]:
from __future__ import print_function
import findspark
findspark.init()
import pyspark

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [6]:
if __name__ == "__main__":
    spark = SparkSession.builder.appName('PySparkLogisticRegression').getOrCreate()

In [10]:
dataset = spark.read.csv('diabetes.csv', header=True)

In [12]:
dataset.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



In [13]:
dataset.printSchema() # All values are in string format.

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [14]:
from pyspark.sql.functions import col
new_data = dataset.select(*(col(c).cast('float').alias(c) for c in dataset.columns))

In [17]:
new_data.printSchema()# All values converted from String to float.

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [37]:
from pyspark.sql.functions import count, col, isnan, when

new_data.select([count(when(col(c).isNull(), c)).alias(c) for c in new_data.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



No Null Values to deal with Imputer

In [47]:
cols = new_data.columns
cols.remove("Outcome")
assembler = VectorAssembler(inputCols=cols, outputCol='features')

In [49]:
# Transforming the Dataset:

data = assembler.transform(new_data)
data.select('features', 'Outcome').show(truncate=False)

+-----------------------------------------------------------------------+-------+
|features                                                               |Outcome|
+-----------------------------------------------------------------------+-------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.35100001096725464,31.0]   |0.0    |
|[8.0,183.0,64.0,0.0,0.0,23.299999237060547,0.671999990940094,32.0]     |1.0    |
|[1.0,89.0,66.0,23.0,94.0,28.100000381469727,0.16699999570846558,21.0]  |0.0    |
|[0.0,137.0,40.0,35.0,168.0,43.099998474121094,2.2880001068115234,33.0] |1.0    |
|[5.0,116.0,74.0,0.0,0.0,25.600000381469727,0.20100000500679016,30.0]   |0.0    |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.24799999594688416,26.0]                |1.0    |
|[10.0,115.0,0.0,0.0,0.0,35.29999923706055,0.1340000033378601,29.0]     |0.0    |
|[2.0,197.0,70.0,45.0,543.0,30.5,0.15800000727176666,53.0]              |1.0    |
|[8.0,125.0,96.0

In [55]:
standardscaler = StandardScaler().setInputCol('features').setOutputCol('Scaled_features')
data = standardscaler.fit(data).transform(data)

In [57]:
assembled_data = data.select('Scaled_features', 'Outcome')
assembled_data.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|[1.78063837321943...|    1.0|
|[0.29677306220323...|    0.0|
|[2.37418449762590...|    1.0|
|[0.29677306220323...|    0.0|
|[0.0,4.2849165233...|    1.0|
|[1.48386531101619...|    0.0|
|[0.89031918660971...|    1.0|
|[2.96773062203238...|    0.0|
|[0.59354612440647...|    1.0|
|[2.37418449762590...|    1.0|
|[1.18709224881295...|    0.0|
|[2.96773062203238...|    1.0|
|[2.96773062203238...|    0.0|
|[0.29677306220323...|    1.0|
|[1.48386531101619...|    1.0|
|[2.07741143542266...|    1.0|
|[0.0,3.6906580274...|    1.0|
|[2.07741143542266...|    1.0|
|[0.29677306220323...|    0.0|
|[0.29677306220323...|    1.0|
+--------------------+-------+
only showing top 20 rows



In [58]:
# Splitting training and testing data 70:30

train, test = assembled_data.randomSplit([0.7, 0.3])

In [61]:
print(train)
print(test)

DataFrame[Scaled_features: vector, Outcome: float]
DataFrame[Scaled_features: vector, Outcome: float]


In [62]:
log_reg = LogisticRegression(labelCol='Outcome', featuresCol='Scaled_features', maxIter=10)
model = log_reg.fit(train)

In [63]:
model

LogisticRegressionModel: uid=LogisticRegression_49b97e75ac8f, numClasses=2, numFeatures=8

In [70]:
prediction_test = model.transform(test)
prediction_test.select("Outcome", "prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    1.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [71]:
# Computing row scores on the test set:

predictionAndLabels = prediction_test.select('Outcome', 'prediction').rdd.map(lambda row: row[0:])

In [76]:
metrics = BinaryClassificationMetrics(predictionAndLabels)



In [77]:
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.7625902992776058


In [83]:
spark.stop()