# CLASSIFICATION MODEL IN SPARK APACHE

In [1]:
import os
import sys
 
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession\
.builder\
.getOrCreate()

In [3]:
df = spark.read.csv('data/insurance_data.csv',
                       header=True,inferSchema=True)

In [8]:
df.printSchema()

root
 |-- smoker_encoded: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- sex_encoded: double (nullable = true)
 |-- charges: double (nullable = true)
 |-- region_encoded: double (nullable = true)



In [4]:
df.take(5)

[Row(age=19, sex='female', bmi=27.9, children=0, smoker='yes', region='southwest', charges=16884.924),
 Row(age=18, sex='male', bmi=33.77, children=1, smoker='no', region='southeast', charges=1725.5523),
 Row(age=28, sex='male', bmi=33.0, children=3, smoker='no', region='southeast', charges=4449.462),
 Row(age=33, sex='male', bmi=22.705, children=0, smoker='no', region='northwest', charges=21984.47061),
 Row(age=32, sex='male', bmi=28.88, children=0, smoker='no', region='northwest', charges=3866.8552)]

In [5]:
for col in df.columns:
    print("no. of cells in column", col, "with null values:",
          df.filter(df[col].isNull()).count())

no. of cells in column age with null values: 0
no. of cells in column sex with null values: 0
no. of cells in column bmi with null values: 0
no. of cells in column children with null values: 0
no. of cells in column smoker with null values: 0
no. of cells in column region with null values: 0
no. of cells in column charges with null values: 0


In [6]:
#Label encoder
from pyspark.ml.feature import StringIndexer
indexed = df
for col in ["sex","smoker","region"]:
    stringIndexer = StringIndexer(inputCol=col, outputCol=col+"_encoded")
    model = stringIndexer.fit(indexed)
    indexed = model.transform(indexed)
indexed.show(3)

+---+------+-----+--------+------+---------+---------+-----------+--------------+--------------+
|age|   sex|  bmi|children|smoker|   region|  charges|sex_encoded|smoker_encoded|region_encoded|
+---+------+-----+--------+------+---------+---------+-----------+--------------+--------------+
| 19|female| 27.9|       0|   yes|southwest|16884.924|        1.0|           1.0|           1.0|
| 18|  male|33.77|       1|    no|southeast|1725.5523|        0.0|           0.0|           0.0|
| 28|  male| 33.0|       3|    no|southeast| 4449.462|        0.0|           0.0|           0.0|
+---+------+-----+--------+------+---------+---------+-----------+--------------+--------------+
only showing top 3 rows



In [7]:
df = indexed.select(indexed.smoker_encoded,indexed.age, indexed.bmi, indexed.children, 
                    indexed.sex_encoded,indexed.charges,indexed.region_encoded)
df.show(3)

+--------------+---+-----+--------+-----------+---------+--------------+
|smoker_encoded|age|  bmi|children|sex_encoded|  charges|region_encoded|
+--------------+---+-----+--------+-----------+---------+--------------+
|           1.0| 19| 27.9|       0|        1.0|16884.924|           1.0|
|           0.0| 18|33.77|       1|        0.0|1725.5523|           0.0|
|           0.0| 28| 33.0|       3|        0.0| 4449.462|           0.0|
+--------------+---+-----+--------+-----------+---------+--------------+
only showing top 3 rows



In [9]:
# Split the data into train and test sets
featurecols = df.columns[1:]

from pyspark.ml.feature import VectorAssembler, StandardScaler
assembler = VectorAssembler(inputCols=featurecols, 
                            outputCol="features")
df_feature_vec=assembler.transform(df)

In [10]:
# Split the data into train and test sets
train_data, test_data = df_feature_vec.randomSplit([.8,.2],seed=1)

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
scalerModel = scaler.fit(train_data)
scaledData = scalerModel.transform(train_data)
scaledData_test = scalerModel.transform(test_data)
scaledData.select("smoker_encoded","scaledFeatures").take(3)

[Row(smoker_encoded=0.0, scaledFeatures=DenseVector([-1.4914, -2.369, -0.8953, -0.9948, -0.9403, 1.3722])),
 Row(smoker_encoded=0.0, scaledFeatures=DenseVector([-1.4914, -1.5905, -0.8953, 1.0043, -0.9476, -1.2851])),
 Row(smoker_encoded=0.0, scaledFeatures=DenseVector([-1.4914, -1.4809, -0.8953, -0.9948, -0.9397, 1.3722]))]

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.mllib.util import MLUtils
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
#LOGISTIC REGRESSION MODEL

In [17]:
# Create ParamGrid for Cross Validation
from pyspark.ml.classification import LogisticRegression
evaluator=BinaryClassificationEvaluator(rawPredictionCol="scaledFeatures",labelCol="smoker_encoded")
lr = LogisticRegression(labelCol="smoker_encoded", featuresCol="scaledFeatures",maxIter=10)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder()\
    .addGrid(lr.aggregationDepth,[2,5,10])\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.fitIntercept,[False, True])\
    .addGrid(lr.maxIter,[10, 100, 1000])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
    .build()

In [18]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)
# Run cross validations
cvModel = cv.fit(scaledData)
predict_train=cvModel.transform(scaledData)
predict_test=cvModel.transform(scaledData_test)
print("The area under ROC for train set after CV  is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV  is {}".format(evaluator.evaluate(predict_test)))

The area under ROC for train set after CV  is 0.511649788880656
The area under ROC for test set after CV  is 0.47285409073771545


In [21]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol="scaledFeatures",\
                                        labelCol="smoker_encoded")
predict_test.select("smoker_encoded","rawPrediction","prediction",\
                    "probability").show(5)


+--------------+--------------------+----------+--------------------+
|smoker_encoded|       rawPrediction|prediction|         probability|
+--------------+--------------------+----------+--------------------+
|           0.0|[1.20585478779220...|       0.0|[0.76956467984901...|
|           0.0|[1.26482148171884...|       0.0|[0.77985498230114...|
|           0.0|[1.29860471228702...|       0.0|[0.78560006475281...|
|           0.0|[1.12134448782214...|       0.0|[0.75423801996374...|
|           0.0|[1.17427716603809...|       0.0|[0.76391726324050...|
+--------------+--------------------+----------+--------------------+
only showing top 5 rows



In [23]:
evaluator = MulticlassClassificationEvaluator(
        labelCol="smoker_encoded", predictionCol="prediction", metricName="accuracy")
accuracy=evaluator.evaluate(predict_test)
print(accuracy)
evaluator = MulticlassClassificationEvaluator(
        labelCol="smoker_encoded", predictionCol="prediction", metricName="weightedRecall")
recall=evaluator.evaluate(predict_test)
print(recall)
evaluator = MulticlassClassificationEvaluator(
        labelCol="smoker_encoded", predictionCol="prediction", metricName="weightedPrecision")
precision=evaluator.evaluate(predict_test)
print(precision)

0.9209621993127147
0.9209621993127147
0.9398983390607102


In [30]:
predict_test.select(['smoker_encoded','prediction', 'probability']).show(5)

+--------------+----------+--------------------+
|smoker_encoded|prediction|         probability|
+--------------+----------+--------------------+
|           0.0|       0.0|[0.76956467984901...|
|           0.0|       0.0|[0.77985498230114...|
|           0.0|       0.0|[0.78560006475281...|
|           0.0|       0.0|[0.75423801996374...|
|           0.0|       0.0|[0.76391726324050...|
+--------------+----------+--------------------+
only showing top 5 rows



In [28]:
predict_test.select('smoker_encoded','prediction').\
groupby('smoker_encoded','prediction').count().\
sort('prediction').sort('smoker_encoded').show()

+--------------+----------+-----+
|smoker_encoded|prediction|count|
+--------------+----------+-----+
|           0.0|       0.0|  195|
|           0.0|       1.0|   23|
|           1.0|       1.0|   73|
+--------------+----------+-----+



In [None]:
#RANDOM FOREST

In [31]:
# Create ParamGrid for Cross Validation
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

rf = RandomForestClassifier(labelCol="smoker_encoded", featuresCol="scaledFeatures")

paramGrid = (ParamGridBuilder()\
             .addGrid(rf.maxDepth, [2, 6])\
             .addGrid(rf.numTrees, [5, 20])\
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

cvModel = cv.fit(scaledData)

In [32]:
# Use test set here so we can measure the accuracy of our model on new data
predictions_rf = cvModel.transform(scaledData_test)
evaluator.evaluate(predictions_rf)

0.966692651537049

In [33]:
evaluator = MulticlassClassificationEvaluator(
        labelCol="smoker_encoded", predictionCol="prediction", metricName="accuracy")
accuracy=evaluator.evaluate(predictions_rf)
print(accuracy)
evaluator = MulticlassClassificationEvaluator(
        labelCol="smoker_encoded", predictionCol="prediction", metricName="weightedRecall")
recall=evaluator.evaluate(predictions_rf)
print(recall)
evaluator = MulticlassClassificationEvaluator(
        labelCol="smoker_encoded", predictionCol="prediction", metricName="weightedPrecision")
precision=evaluator.evaluate(predictions_rf)
print(precision)

0.9656357388316151
0.9656357388316151
0.966692651537049


In [34]:
predictions_rf.select(['smoker_encoded','prediction', 'probability']).show(5)

+--------------+----------+--------------------+
|smoker_encoded|prediction|         probability|
+--------------+----------+--------------------+
|           0.0|       0.0|       [0.992,0.008]|
|           0.0|       0.0|[0.99930232558139...|
|           0.0|       0.0|[0.98076923076923...|
|           0.0|       0.0|           [1.0,0.0]|
|           0.0|       0.0|           [1.0,0.0]|
+--------------+----------+--------------------+
only showing top 5 rows



In [35]:
predictions_rf.select('smoker_encoded','prediction').\
groupby('smoker_encoded','prediction').count().\
sort('prediction').sort('smoker_encoded').show()

+--------------+----------+-----+
|smoker_encoded|prediction|count|
+--------------+----------+-----+
|           0.0|       1.0|    7|
|           0.0|       0.0|  211|
|           1.0|       1.0|   70|
|           1.0|       0.0|    3|
+--------------+----------+-----+



In [36]:
spark.stop()