In [1]:
#imports
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [6]:
# Read the insurance CSV, treating the first row as column names and letting
# Spark infer each column's type, then preview the first rows.
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("/PATH_TO_YOUR_FILE/insuranceCharges.csv")
)
df.show()


+---+------+------+--------+------+---------+-----------+
|age|   sex|   bmi|children|smoker|   region|    charges|
+---+------+------+--------+------+---------+-----------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|
| 33|  male|22.705|       0|    no|northwest|21984.47061|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|
| 31|female| 25.74|       0|    no|southeast|  3756.6216|
| 46|female| 33.44|       1|    no|southeast|  8240.5896|
| 37|female| 27.74|       3|    no|northwest|  7281.5056|
| 37|  male| 29.83|       2|    no|northeast|  6406.4107|
| 60|female| 25.84|       0|    no|northwest|28923.13692|
| 25|  male| 26.22|       0|    no|northeast|  2721.3208|
| 62|female| 26.29|       0|   yes|southeast| 27808.7251|
| 23|  male|  34.4|       0|    no|southwest|   1826.843|
| 56|female| 39.82|       0|    no|southeast| 11090.7178|
| 27|  male| 4

In [8]:
# Number of rows in the loaded DataFrame.
df.count()

1338

In [9]:
# Build the summary-statistics DataFrame for every column.
# NOTE(review): describe() returns a new DataFrame rather than printing it, so
# the notebook only displays its column schema here; use df.describe().show()
# to see the actual statistics.
df.describe()

DataFrame[summary: string, age: string, sex: string, bmi: string, children: string, smoker: string, region: string, charges: string]

In [10]:
# Print each column's name and type; the types were inferred at load time
# because the CSV was read with inferSchema = True.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [17]:
from pyspark.ml.classification import LogisticRegression

In [14]:

# Pack the numeric predictor columns into a single vector column named
# "features", the column the classifier is later told to read.
assembler = VectorAssembler(inputCols=["age", "bmi", "charges"], outputCol="features")
output = assembler.transform(df)

# Preview the assembled vectors next to the target column.
output.select("features", "smoker").show()

+--------------------+------+
|            features|smoker|
+--------------------+------+
|[19.0,27.9,16884....|   yes|
|[18.0,33.77,1725....|    no|
|[28.0,33.0,4449.462]|    no|
|[33.0,22.705,2198...|    no|
|[32.0,28.88,3866....|    no|
|[31.0,25.74,3756....|    no|
|[46.0,33.44,8240....|    no|
|[37.0,27.74,7281....|    no|
|[37.0,29.83,6406....|    no|
|[60.0,25.84,28923...|    no|
|[25.0,26.22,2721....|    no|
|[62.0,26.29,27808...|   yes|
|[23.0,34.4,1826.843]|    no|
|[56.0,39.82,11090...|    no|
|[27.0,42.13,39611...|   yes|
|[19.0,24.6,1837.237]|    no|
|[52.0,30.78,10797...|    no|
|[23.0,23.845,2395...|    no|
|[56.0,40.3,10602....|    no|
|[30.0,35.3,36837....|   yes|
+--------------------+------+
only showing top 20 rows



In [19]:
# Split the assembled data 70/30 into train and test sets.
# NOTE(review): `output` has no "label" column yet, and `train`/`test` are
# reassigned by the later split of `indexed` before the model is fitted, so
# the result of this cell is never used for training or evaluation.
train, test = output.randomSplit([0.7,0.3])

In [20]:
# Preview the first five assembled rows: the original columns plus the new
# "features" vector column.
output.show(5)

+---+------+------+--------+------+---------+-----------+--------------------+
|age|   sex|   bmi|children|smoker|   region|    charges|            features|
+---+------+------+--------+------+---------+-----------+--------------------+
| 19|female|  27.9|       0|   yes|southwest|  16884.924|[19.0,27.9,16884....|
| 18|  male| 33.77|       1|    no|southeast|  1725.5523|[18.0,33.77,1725....|
| 28|  male|  33.0|       3|    no|southeast|   4449.462|[28.0,33.0,4449.462]|
| 33|  male|22.705|       0|    no|northwest|21984.47061|[33.0,22.705,2198...|
| 32|  male| 28.88|       0|    no|northwest|  3866.8552|[32.0,28.88,3866....|
+---+------+------+--------+------+---------+-----------+--------------------+
only showing top 5 rows



In [21]:
# Keep only what the model needs: the target column ("smoker") and the
# assembled "features" vector.
features_labelDf = output.select("smoker", "features")

In [40]:
# Encode the string target column "smoker" as a numeric "label" column,
# which is the column the classifier is trained against.
indexer = StringIndexer(inputCol="smoker", outputCol="label")
indexed = (
    indexer
    .fit(features_labelDf)
    .transform(features_labelDf)
)
indexed.show()

+------+--------------------+-----+
|smoker|            features|label|
+------+--------------------+-----+
|   yes|[19.0,27.9,16884....|  1.0|
|    no|[18.0,33.77,1725....|  0.0|
|    no|[28.0,33.0,4449.462]|  0.0|
|    no|[33.0,22.705,2198...|  0.0|
|    no|[32.0,28.88,3866....|  0.0|
|    no|[31.0,25.74,3756....|  0.0|
|    no|[46.0,33.44,8240....|  0.0|
|    no|[37.0,27.74,7281....|  0.0|
|    no|[37.0,29.83,6406....|  0.0|
|    no|[60.0,25.84,28923...|  0.0|
|    no|[25.0,26.22,2721....|  0.0|
|   yes|[62.0,26.29,27808...|  1.0|
|    no|[23.0,34.4,1826.843]|  0.0|
|    no|[56.0,39.82,11090...|  0.0|
|   yes|[27.0,42.13,39611...|  1.0|
|    no|[19.0,24.6,1837.237]|  0.0|
|    no|[52.0,30.78,10797...|  0.0|
|    no|[23.0,23.845,2395...|  0.0|
|    no|[56.0,40.3,10602....|  0.0|
|   yes|[30.0,35.3,36837....|  1.0|
+------+--------------------+-----+
only showing top 20 rows



In [41]:
# Hold out roughly 30% of the labelled rows for evaluation and train on the
# rest. A fixed seed keeps the split, and therefore the reported metrics,
# repeatable across kernel restarts instead of changing on every run.
train, test = indexed.randomSplit([0.7, 0.3], seed=42)

In [42]:

# Logistic-regression estimator (not yet fitted): reads the assembled
# "features" vector, learns the indexed "label", capped at 10 iterations.
final_model = LogisticRegression(
    maxIter=10,
    labelCol="label",
    featuresCol="features",
)

In [43]:
# Train the logistic-regression estimator on the training split; the fitted
# model is what gets evaluated against the test split afterwards.
fit_final = final_model.fit(train)

In [44]:
# Evaluate the fitted model on the held-out test split.
# Despite its name, the result is a summary object: later cells read its
# `.predictions` DataFrame rather than iterating over (prediction, label) pairs.
predictions_and_labels = fit_final.evaluate(test)

In [45]:
# Preview five scored test rows: the input columns plus the model's
# rawPrediction, probability and prediction columns.
predictions_and_labels.predictions.show(5)

+------+--------------------+-----+--------------------+--------------------+----------+
|smoker|            features|label|       rawPrediction|         probability|prediction|
+------+--------------------+-----+--------------------+--------------------+----------+
|    no|[18.0,22.99,1704....|  0.0|[4.71906598009089...|[0.99115541534159...|       0.0|
|    no|[18.0,23.21,1121....|  0.0|[4.95640124008155...|[0.99301097906187...|       0.0|
|    no|[18.0,23.32,1711....|  0.0|[4.76403521893022...|[0.99154104934873...|       0.0|
|    no|[18.0,23.75,1705....|  0.0|[4.82751272589899...|[0.99205718279791...|       0.0|
|    no|[18.0,26.18,2304....|  0.0|[4.96407518590030...|[0.99306403659012...|       0.0|
+------+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [47]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the test-set predictions against the true "label" column and keep
# the resulting metric.
# NOTE(review): presumably area under the ROC curve (the evaluator's default
# metric, as the variable name suggests) - confirm for the Spark version used.
my_eval = BinaryClassificationEvaluator(labelCol="label")
scored_test_rows = predictions_and_labels.predictions
my_final_roc = my_eval.evaluate(scored_test_rows)
del scored_test_rows

In [48]:
# Report the evaluator's score for the test set. The call form of print is
# valid on both Python 2 and Python 3, whereas the bare `print x` statement
# is a SyntaxError on Python 3.
print(my_final_roc)

0.986049107143
