In [10]:
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.types import IntegerType

In [2]:
spark = SparkSession.builder \
        .master('local[*]') \
        .appName('first_spark_application') \
        .getOrCreate() #if there is a active session it will get or create one

In [3]:
# loading data 

health = spark.read.csv('./data/train.csv', header=True, inferSchema=True)

In [4]:
# Indexing categorical data
indexer1 = StringIndexer(inputCol='Vehicle_Age',
outputCol='Vehicle_Age_idx')

indexer2 = StringIndexer(inputCol='Gender',
outputCol='Gender_idx')

indexer3 = StringIndexer(inputCol='Vehicle_Damage',
outputCol='Vehicle_Damage_idx')


# now we need to combine all the features in a single list
# we are doing this because pyspark.ml what all the features in a list

assembler = VectorAssembler(inputCols=['Age', 'Driving_License', 
                                       'Region_Code', 'Previously_Insured', 
                                       'Annual_Premium', 
                                       'Policy_Sales_Channel', 'Vintage', 
                                       'Vehicle_Age_idx', 'Gender_idx','Vehicle_Damage_idx'],
                            outputCol='features')

pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, assembler])

In [44]:
# using pipeline to prepare data for models 
healthe = pipeline.fit(health).transform(health)

In [6]:
healthe.show(2)

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+---------------+----------+------------------+--------------------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|Vehicle_Age_idx|Gender_idx|Vehicle_Damage_idx|            features|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+---------------+----------+------------------+--------------------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|            2.0|       0.0|               0.0|[44.0,1.0,28.0,0....|
|  2|  Male| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|                26.0|    183|       0|            0

# preparing data for model fitting

- We are selecting the features column and response column to get that data needed for predictions

- we need to change the output column name to label for the models to work right in spark


In [46]:
# making the column names as pyspark wants
health_ml = healthe.select(['features','Response']).withColumnRenamed('Response', 'label')

In [20]:
health_ml.show(2)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[44.0,1.0,28.0,0....|    1|
|[76.0,1.0,3.0,0.0...|    0|
+--------------------+-----+
only showing top 2 rows



In [38]:
health_ml.groupby('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1| 46710|
|    0|334399|
+-----+------+



As you can see we have more values corrospoding to 0 class, so we need to do something to tackel this problem

In [21]:
sScaler = StandardScaler(
    withMean=True, withStd=True, inputCol="features", outputCol="features_scc"
        )

In [22]:
# using standard scaler to scale data 
health_ml = sScaler.fit(health_ml).transform(health_ml)

In [23]:
health_ml.show(5)

+--------------------+-----+--------------------+
|            features|label|        features_scc|
+--------------------+-----+--------------------+
|[44.0,1.0,28.0,0....|    1|[0.33377683521258...|
|[76.0,1.0,3.0,0.0...|    0|[2.39674759709345...|
|[47.0,1.0,28.0,0....|    1|[0.52718034413891...|
|[21.0,1.0,11.0,1....|    0|[-1.1489833998892...|
|[29.0,1.0,41.0,1....|    0|[-0.6332407094190...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [24]:
health_ml = health_ml.drop('features').withColumnRenamed('features_scc', 'features')
health_ml.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|[0.33377683521258...|
|    0|[2.39674759709345...|
|    1|[0.52718034413891...|
|    0|[-1.1489833998892...|
|    0|[-0.6332407094190...|
+-----+--------------------+
only showing top 5 rows



In [25]:
# Specify a seed for reproducibility
# we are spliting the data in test train

health_train, health_test = health_ml.randomSplit([0.8, 0.2], seed= 23 )

In [26]:
print(health_train.count(), health_test.count())

304788 76321


In [27]:
from pyspark.ml.classification import LogisticRegression

In [28]:
model = LogisticRegression(featuresCol='features', labelCol='label')

In [29]:
logistic = model.fit(health_train)

In [30]:
predection = logistic.transform(health_test)

In [31]:
predection.show(2)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|[-1.2134512361980...|[1.21186143675719...|[0.77062814310638...|       0.0|
|    0|[-1.2134512361980...|[3.59225988362292...|[0.97320188200154...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 2 rows



In [33]:
predection.groupBy("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9291|
|    0|       0.0|67030|
+-----+----------+-----+



In [40]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(health_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(health_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |0.0       |[0.8772294184810425,0.12277058151895744]|
|0    |0.0       |[0.8772294184810425,0.12277058151895744]|
|0    |0.0       |[0.8772294184810425,0.12277058151895744]|
|0    |0.0       |[0.8772294184810425,0.12277058151895744]|
|0    |0.0       |[0.8772294184810425,0.12277058151895744]|
+-----+----------+----------------------------------------+
only showing top 5 rows



In [41]:
prediction.groupBy("label", "prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 9291|
|    0|       0.0|67030|
+-----+----------+-----+



In [43]:
tree_model.explainParam

<bound method Params.explainParam of DecisionTreeClassificationModel: uid=DecisionTreeClassifier_44a355ac5c07, depth=0, numNodes=1, numClasses=2, numFeatures=10>