# 0. Load in dataset

In [0]:
# File location and type
file_location = "/FileStore/tables/train.csv"
file_type = "csv"

df =spark.read.csv(file_location,header=True,inferSchema=True)
df.show()

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|
|  2|  Male| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|                26.0|    183|       0|
|  3|  Male| 47|              1|       28.0|                 0|  > 2 Years|           Yes|       38294.0|                26.0|     27|       1|
|  4|  Male| 21|              1|       11.0|                 1|   < 1 Year|            No|       28619.0|               152.0|    203|  

In [0]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Driving_License: integer (nullable = true)
 |-- Region_Code: double (nullable = true)
 |-- Previously_Insured: integer (nullable = true)
 |-- Vehicle_Age: string (nullable = true)
 |-- Vehicle_Damage: string (nullable = true)
 |-- Annual_Premium: double (nullable = true)
 |-- Policy_Sales_Channel: double (nullable = true)
 |-- Vintage: integer (nullable = true)
 |-- Response: integer (nullable = true)



In [0]:
print((df.count(), len(df.columns)))

(381109, 12)


# 1. Data preprocessing

## 1.1 Ordinal encoding

In [0]:
# Handle categorical features with Ordinal Encoder
from pyspark.ml.feature import StringIndexer

indexer=StringIndexer(inputCols=['Gender','Vehicle_Damage'],
                      outputCols=['Gender_indexed', 'Vehicle_Damage_indexed'])
df_r=indexer.fit(df).transform(df)
df_r.show()  

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+--------------+----------------------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|Gender_indexed|Vehicle_Damage_indexed|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+--------------+----------------------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|           0.0|                   0.0|
|  2|  Male| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|                26.0|    183|       0|           0.0|                   1.0|
|  3|  Male| 47|              1|       28.0|                 0|  > 2 Years|           Yes|

## 1.2 One hot encoding

In [0]:
# Get the 3 distinct values of the column: Vehicle_Age
distinct_values = df.select("Vehicle_Age")\
                    .distinct()\
                    .rdd\
                    .flatMap(lambda x: x).collect()
distinct_values

Out[5]: ['> 2 Years', '< 1 Year', '1-2 Year']

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

for distinct_value in distinct_values:
    function = udf(lambda item: 
                   1 if item == distinct_value else 0, 
                   IntegerType())
    new_column_name = "Vehicle_Age"+'_'+distinct_value
    df_r = df_r.withColumn(new_column_name, function(col("Vehicle_Age")))
df_r.show()

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+--------------+----------------------+---------------------+--------------------+--------------------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|Gender_indexed|Vehicle_Damage_indexed|Vehicle_Age_> 2 Years|Vehicle_Age_< 1 Year|Vehicle_Age_1-2 Year|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+--------------+----------------------+---------------------+--------------------+--------------------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|           0.0|                   0.0|                    1|                   0|                   0|
|  2|  Male| 76|

## 1.3 Rename the columns and select the features we need

In [0]:
df_r=df_r.withColumnRenamed('Vehicle_Age_< 1 Year', 'Vehicle_Age_lt_1_Year')
df_r=df_r.withColumnRenamed('Vehicle_Age_> 2 Years', 'Vehicle_Age_gt_2_Years')

df_r=df_r.select(['Gender_indexed','Age','Driving_License','Region_Code','Previously_Insured','Vehicle_Age_gt_2_Years','Vehicle_Age_lt_1_Year','Vehicle_Damage_indexed','Annual_Premium','Policy_Sales_Channel','Vintage','Response'])
df_r.show()

+--------------+---+---------------+-----------+------------------+----------------------+---------------------+----------------------+--------------+--------------------+-------+--------+
|Gender_indexed|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age_gt_2_Years|Vehicle_Age_lt_1_Year|Vehicle_Damage_indexed|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+--------------+---+---------------+-----------+------------------+----------------------+---------------------+----------------------+--------------+--------------------+-------+--------+
|           0.0| 44|              1|       28.0|                 0|                     1|                    0|                   0.0|       40454.0|                26.0|    217|       1|
|           0.0| 76|              1|        3.0|                 0|                     0|                    0|                   1.0|       33536.0|                26.0|    183|       0|
|           0.0| 47|              1|       28.0|       

## 1.4 Train test split

In [0]:
train_data,test_data=df_r.randomSplit([0.75,0.25])

print(f'The training dataset has {train_data.count()} rows.')
print(f'The testing dataset has {test_data.count()} rows.')

The training dataset has 285901 rows.
The testing dataset has 95208 rows.


## 1.5 Rescale numerical features using MinMaxScaler

In [0]:
from pyspark.ml.feature import VectorAssembler,MinMaxScaler

columns_to_scale = ['Age','Annual_Premium','Vintage']
assembler=VectorAssembler(inputCols=columns_to_scale,outputCol='mm_features')

temp_train=assembler.transform(train_data)
temp_test=assembler.transform(test_data)

mm_scaler=MinMaxScaler(inputCol='mm_features',outputCol='scaled')
mm_scaler=mm_scaler.fit(temp_train)

scaled_train_data=mm_scaler.transform(temp_train)
scaled_test_data=mm_scaler.transform(temp_test)

In [0]:
# Examine the min age in the training dataset
scaled_train_data.agg({'Age':'min'}).show()

+--------+
|min(Age)|
+--------+
|      20|
+--------+



## 1.6 Adjust the imbalanced dataset by down-sampling

In [0]:
train_0=scaled_train_data.filter(scaled_train_data['Response']==0)
train_1=scaled_train_data.filter(scaled_train_data['Response']==1)
print((train_0.count(), train_1.count()))

(267622, 37215)


In [0]:
# Sample the class 0 training dataset to make it the same length as class 1
sampled_train_0=train_0.sample(fraction=0.2,seed=1)
sampled_train_0.count()

Out[49]: 53373

In [0]:
# Concatenate the sampled class 0 dataset with the orginal class 1 dataset
df_train=sampled_train_0.unionAll(train_1)
df_train.count()

Out[50]: 90588

## 1.7 Assemble the independent features

In [0]:
from pyspark.ml.feature import VectorAssembler

featureAssembler=VectorAssembler(
               inputCols=['Driving_License','Region_Code','Previously_Insured','Policy_Sales_Channel','Gender_indexed','Vehicle_Age_gt_2_Years','Vehicle_Age_lt_1_Year','Vehicle_Damage_indexed','scaled'],
               outputCol='Independent Features')

# Assemble the training dataset
output=featureAssembler.transform(df_train)
output.select('Independent Features').show(truncate=False)

+-------------------------------------------------------------------------------+
|Independent Features                                                           |
+-------------------------------------------------------------------------------+
|(11,[0,1,3,6,9,10],[1.0,2.0,160.0,1.0,0.03388988623996577,0.41868512110726647])|
|(11,[0,1,3,6,9,10],[1.0,2.0,160.0,1.0,0.04368645762601505,0.9826989619377163]) |
|[1.0,2.0,1.0,160.0,0.0,0.0,1.0,0.0,0.0,0.04812151766861693,0.7820069204152249] |
|[1.0,2.0,1.0,160.0,0.0,0.0,1.0,1.0,0.0,0.0415749672114374,0.3044982698961938]  |
|(11,[0,1,3,6,9,10],[1.0,3.0,160.0,1.0,0.07359706809789131,0.1384083044982699]) |
|[1.0,3.0,0.0,160.0,0.0,0.0,1.0,1.0,0.0,0.05342907903671389,0.5294117647058824] |
|[1.0,3.0,1.0,160.0,0.0,0.0,1.0,1.0,0.0,0.044573841703330945,0.3806228373702422]|
|[1.0,3.0,1.0,160.0,0.0,0.0,1.0,1.0,0.0,0.06924572353428149,0.8166089965397925] |
|(11,[0,1,3,10],[1.0,8.0,157.0,0.4844290657439447])                             |
|(11,[0,1,3,6,9,

In [0]:
# Assemble the test dataset
output_test=featureAssembler.transform(scaled_test_data)
output_test.select('Independent Features').show(truncate=False)

+--------------------------------------------------------------------------------+
|Independent Features                                                            |
+--------------------------------------------------------------------------------+
|(11,[0,1,3,6,9,10],[1.0,2.0,160.0,1.0,0.054234607979015324,0.8581314878892734]) |
|(11,[0,1,3,6,9,10],[1.0,2.0,160.0,1.0,0.055255936822718524,0.43252595155709345])|
|(11,[0,1,3,6,9,10],[1.0,2.0,160.0,1.0,0.059173821239547196,0.07612456747404844])|
|(11,[0,1,3,6,7,10],[1.0,2.0,160.0,1.0,1.0,0.09688581314878893])                 |
|[1.0,2.0,1.0,160.0,0.0,0.0,1.0,1.0,0.0,0.050067437469188056,0.823529411764706]  |
|(11,[0,1,3,7,10],[1.0,3.0,156.0,1.0,0.411764705882353])                         |
|[1.0,3.0,0.0,160.0,0.0,0.0,1.0,1.0,0.0,0.042518161608081334,0.328719723183391]  |
|(11,[0,1,2,3,7,10],[1.0,3.0,1.0,156.0,1.0,0.05536332179930796])                 |
|[1.0,3.0,1.0,160.0,0.0,0.0,1.0,0.0,0.0,0.043930162687080845,0.12456747404844291]|
|[1.

**We would realize there are some erroneous values in our dataset. Let's try to fix it.**

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector

def sparse_to_array(v):
  v = DenseVector(v)
  new_array = list([float(x) for x in v])
  return new_array

sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))

In [0]:
# Convert the training dataset
train = output.withColumn('features_array', sparse_to_array_udf('Independent Features'))
train.select('features_array').show(truncate=False)

+------------------------------------------------------------------------+
|features_array                                                          |
+------------------------------------------------------------------------+
|[1.0, 2.0, 0.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.033889886, 0.4186851] |
|[1.0, 2.0, 0.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.043686457, 0.982699]  |
|[1.0, 2.0, 1.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.04812152, 0.7820069]  |
|[1.0, 2.0, 1.0, 160.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.041574966, 0.30449826]|
|[1.0, 3.0, 0.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.073597066, 0.1384083] |
|[1.0, 3.0, 0.0, 160.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.05342908, 0.5294118]  |
|[1.0, 3.0, 1.0, 160.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.044573843, 0.38062283]|
|[1.0, 3.0, 1.0, 160.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.069245726, 0.816609]  |
|[1.0, 8.0, 0.0, 157.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.48442906]        |
|[1.0, 8.0, 0.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.05121527, 0.84775084] |
|[1.0, 8.0, 0.0, 160.0, 0

In [0]:
# Convert the test dataset
test = output_test.withColumn('features_array', sparse_to_array_udf('Independent Features'))
test.select('features_array').show(truncate=False)

+-------------------------------------------------------------------------+
|features_array                                                           |
+-------------------------------------------------------------------------+
|[1.0, 2.0, 0.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.05423461, 0.85813147]  |
|[1.0, 2.0, 0.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.05525594, 0.43252596]  |
|[1.0, 2.0, 0.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.059173822, 0.076124564]|
|[1.0, 2.0, 0.0, 160.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.096885815]        |
|[1.0, 2.0, 1.0, 160.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.050067436, 0.8235294]  |
|[1.0, 3.0, 0.0, 156.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.4117647]          |
|[1.0, 3.0, 0.0, 160.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.04251816, 0.32871974]  |
|[1.0, 3.0, 1.0, 156.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.055363324]        |
|[1.0, 3.0, 1.0, 160.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.04393016, 0.12456747]  |
|[1.0, 3.0, 1.0, 160.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.054528542, 0.9204152]  |
|[1.0, 3.0, 

**Here we can see that we have solved the problem.**<br>
**After that we want to convert this matrix to Vector otherwise we will get error while fitting/transform the data**

In [0]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

train_with_vectors = train.select(
    train["Response"], 
    list_to_vector_udf(train["features_array"]).alias("features")
)

test_with_vectors = test.select(
    test["Response"], 
    list_to_vector_udf(test["features_array"]).alias("features")
)

train_with_vectors=train_with_vectors.withColumnRenamed('Response', 'label')
test_with_vectors=test_with_vectors.withColumnRenamed('Response', 'label')

In [0]:
train_with_vectors.show(truncate=False)

+-----+-------------------------------------------------------------------------------+
|label|features                                                                       |
+-----+-------------------------------------------------------------------------------+
|0    |[1.0,2.0,0.0,160.0,0.0,0.0,1.0,0.0,0.0,0.03388988623996577,0.41868512110726647]|
|0    |[1.0,2.0,0.0,160.0,0.0,0.0,1.0,0.0,0.0,0.04368645762601505,0.9826989619377163] |
|0    |[1.0,2.0,1.0,160.0,0.0,0.0,1.0,0.0,0.0,0.04812151766861693,0.7820069204152249] |
|0    |[1.0,2.0,1.0,160.0,0.0,0.0,1.0,1.0,0.0,0.0415749672114374,0.3044982698961938]  |
|0    |[1.0,3.0,0.0,160.0,0.0,0.0,1.0,0.0,0.0,0.07359706809789131,0.1384083044982699] |
|0    |[1.0,3.0,0.0,160.0,0.0,0.0,1.0,1.0,0.0,0.05342907903671389,0.5294117647058824] |
|0    |[1.0,3.0,1.0,160.0,0.0,0.0,1.0,1.0,0.0,0.044573841703330945,0.3806228373702422]|
|0    |[1.0,3.0,1.0,160.0,0.0,0.0,1.0,1.0,0.0,0.06924572353428149,0.8166089965397925] |
|0    |[1.0,8.0,0.0,157.0,0.0,0.

In [0]:
test_with_vectors.show(truncate=False)

+-----+--------------------------------------------------------------------------------+
|label|features                                                                        |
+-----+--------------------------------------------------------------------------------+
|0    |[1.0,2.0,0.0,160.0,0.0,0.0,1.0,0.0,0.0,0.054234607979015324,0.8581314878892734] |
|0    |[1.0,2.0,0.0,160.0,0.0,0.0,1.0,0.0,0.0,0.055255936822718524,0.43252595155709345]|
|0    |[1.0,2.0,0.0,160.0,0.0,0.0,1.0,0.0,0.0,0.059173821239547196,0.07612456747404844]|
|0    |[1.0,2.0,0.0,160.0,0.0,0.0,1.0,1.0,0.0,0.0,0.09688581314878893]                 |
|0    |[1.0,2.0,1.0,160.0,0.0,0.0,1.0,1.0,0.0,0.050067437469188056,0.823529411764706]  |
|0    |[1.0,3.0,0.0,156.0,0.0,0.0,0.0,1.0,0.0,0.0,0.411764705882353]                   |
|0    |[1.0,3.0,0.0,160.0,0.0,0.0,1.0,1.0,0.0,0.042518161608081334,0.328719723183391]  |
|0    |[1.0,3.0,1.0,156.0,0.0,0.0,0.0,1.0,0.0,0.0,0.05536332179930796]                 |
|0    |[1.0,3.0,1.0,1

# 2. Train the model

In [0]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score

def model_evaluation(predictions):
  predictionAndTarget = predictions.select("label", "prediction")
  predictionAndTargetNumpy = np.array((predictionAndTarget.collect()))

  acc = accuracy_score(predictionAndTargetNumpy[:,0], predictionAndTargetNumpy[:,1])
  f1 = f1_score(predictionAndTargetNumpy[:,0], predictionAndTargetNumpy[:,1])
  precision = precision_score(predictionAndTargetNumpy[:,0], predictionAndTargetNumpy[:,1])
  recall = recall_score(predictionAndTargetNumpy[:,0], predictionAndTargetNumpy[:,1])
  auc = roc_auc_score(predictionAndTargetNumpy[:,0], predictionAndTargetNumpy[:,1])
  print(f'Accuracy: {acc}.')
  print(f'Precision: {precision}.')
  print(f'Recall: {recall}.')
  print(f'f1: {f1}.')
  print(f'AUC: {auc}.')

## 2.1 Simple Logistic Regression

In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train_with_vectors)

In [0]:
lr_predictions = lrModel.transform(test_with_vectors)
model_evaluation(lr_predictions)

Accuracy: 0.7058422487937906.
Precision: 0.27844547166581063.
Recall: 0.8564507635597683.
f1: 0.42025839793281655.
AUC: 0.7704390182115897.


## 2.2 Random Forest

In [0]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train_with_vectors)

rf_predictions = rfModel.transform(df_test_with_vectors)
model_evaluation(rf_predictions)

Accuracy: 0.7060319387178468.
Precision: 0.28337910989040294.
Recall: 0.8972475515028706.
f1: 0.4307224642820955.
AUC: 0.7881129890252008.


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
 
bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC",rawPredictionCol="prediction")
print("Area under ROC curve:", bcEvaluator.evaluate(rf_predictions))
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Accuracy:", mcEvaluator.evaluate(rf_predictions))

Area under ROC curve: 0.7881129890252008
Accuracy: 0.7060319387178468


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create ParamGrid for Cross Validation
rfparamGrid = (ParamGridBuilder()
             #.addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
               .addGrid(rf.maxDepth, [2, 5, 10])
             #.addGrid(rf.maxBins, [10, 20, 40, 80, 100])
               .addGrid(rf.maxBins, [5, 10, 20])
             #.addGrid(rf.numTrees, [5, 20, 50, 100, 500])
               .addGrid(rf.numTrees, [5, 20, 50])
             .build())

# Create 5-fold CrossValidator
rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = bcEvaluator,
                      numFolds = 3)

# Run cross validations.
rfcvModel = rfcv.fit(train_with_vectors)

# Use test set here so we can measure the accuracy of our model on new data
rfpredictions = rfcvModel.transform(test_with_vectors)

In [0]:
model_evaluation(rfpredictions)

Accuracy: 0.6932139491046183.
Precision: 0.27429916103949253.
Recall: 0.9207521250107323.
f1: 0.42267899020554556.
AUC: 0.7911788725193207.


In [0]:
best_model=rfcvModel.bestModel
print(f'Best Max Depth: {best_model._java_obj.getMaxDepth()}')
print(f'Best Max Bins: {best_model._java_obj.getMaxBins()}')
print(f'Best Number of Trees: {best_model._java_obj.getNumTrees()}')

Best Max Depth: 5
Best Max Bins: 5
Best Number of Trees: 20


## 2.3 Gradient-boosted Tree

In [0]:
from pyspark.ml.classification import GBTClassifier

gb = GBTClassifier(labelCol = 'label', featuresCol = 'features')
gbModel = gb.fit(train_with_vectors)

gb_predictions = gbModel.transform(test_with_vectors)
model_evaluation(gb_predictions)

Accuracy: 0.7339438917837781.
Precision: 0.3005454813504349.
Recall: 0.8678586632609621.
f1: 0.44647393780113886.
AUC: 0.7914548571560066.
