In [None]:
%%bash
# Do not change or modify this file.
# Installs pyspark, which every later cell of this notebook depends on.
# If pyspark is already installed, pip only prints a message saying it is already installed.
# NOTE(review): the version is unpinned, so reruns may pick up a different pyspark release.
pip install pyspark

In [None]:
# import statements
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

MAX_MEMORY = "12g"

# Session settings, applied to the builder in exactly this order.
# NOTE(review): MAX_MEMORY is used for the heap sizes AND the off-heap size, so the
# local process may claim more than MAX_MEMORY in total — confirm this is intended.
spark_settings = [
    ("spark.memory.fraction", 0.8),
    ("spark.executor.memory", MAX_MEMORY),
    ("spark.driver.memory", MAX_MEMORY),
    ("spark.memory.offHeap.enabled", 'true'),
    ("spark.memory.offHeap.size", MAX_MEMORY),
]

# Local session using every available core.
session_builder = SparkSession.builder.master("local[*]")
for setting_key, setting_value in spark_settings:
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()
sc = spark.sparkContext
# NOTE(review): SQLContext is deprecated in Spark 3.x; `spark` exposes the same SQL API.
sqlContext = SQLContext(sc)

In [None]:
# Load the training CSV with a header row. No schema inference is requested, so every
# column arrives as a string; numeric columns are cast explicitly later on.
train_df = spark.read.csv("train.csv", header=True)
train_df.head(5)

In [None]:
# NOTE(review): this wildcard import shadows Python builtins such as sum, max, min,
# round and abs with Spark column functions; `when` is the only name this notebook
# visibly uses from it — consider `from pyspark.sql.functions import when`.
from pyspark.sql.functions import *

# Drop the `id` column (presumably a row identifier rather than a predictor).
train_df = train_df.drop('id')
# Limit BEFORE converting: toPandas() on the whole DataFrame would collect every
# row to the driver just to display the first five.
display(train_df.limit(5).toPandas())

In [None]:
import seaborn as sns

# Pandas copy of the training data, reused by the count plots in the following cells.
train_pd_df = train_df.toPandas()

# Pass the frame and column by keyword: in current seaborn, countplot's only
# positional parameter is `data`, so a bare Series is not treated as `x`.
ax = sns.countplot(data=train_pd_df, x="Gender")
ax.set_title("Gender distribution");

In [None]:
# Rows per Vehicle_Age category. Column passed by keyword because current seaborn
# takes `data` (not `x`) as countplot's only positional argument.
ax = sns.countplot(data=train_pd_df, x="Vehicle_Age")
ax.set_title("Vehicle_Age distribution");

In [None]:
# Rows per Vehicle_Damage category. Column passed by keyword because current seaborn
# takes `data` (not `x`) as countplot's only positional argument.
ax = sns.countplot(data=train_pd_df, x="Vehicle_Damage")
ax.set_title("Vehicle_Damage distribution");

In [None]:
# Validate the raw categorical values instead of hand-encoding them here.
# The StringIndexerModel stages in the next cell map these columns to the same
# 0/1(/2) codes, and they need the ORIGINAL string values. If the columns were
# overwritten with integers first, the indexers would cast them to "0"/"1"/"2",
# which are not in their label lists, and the transform would fail
# (StringIndexer's default handleInvalid is "error").
EXPECTED_CATEGORIES = {
    "Gender": {"Female", "Male"},
    "Vehicle_Damage": {"No", "Yes"},
    "Vehicle_Age": {"< 1 Year", "1-2 Year", "> 2 Years"},
}

unexpected_categories = {}
for column_name, allowed_values in EXPECTED_CATEGORIES.items():
    observed_values = {row[0] for row in train_df.select(column_name).distinct().collect()}
    extra_values = observed_values - allowed_values  # includes None if the column has nulls
    if extra_values:
        unexpected_categories[column_name] = extra_values

assert not unexpected_categories, f"Unexpected category values: {unexpected_categories}"

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StringIndexerModel

# Fixed label lists: a label's position in its list becomes its numeric index, so
# the encoding is fixed up front rather than derived from the data.
indexer_1, indexer_2, indexer_3 = (
    StringIndexerModel.from_labels(labels, inputCol=column_name, outputCol=column_name + "_idx")
    for column_name, labels in (
        ("Gender", ['Female', 'Male']),
        ("Vehicle_Damage", ['No', 'Yes']),
        ("Vehicle_Age", ['< 1 Year', '1-2 Year', '> 2 Years']),
    )
)

# All stages are already-built models, so fit() only wraps them in a PipelineModel.
feature_engineering_pipe = Pipeline(stages=[indexer_1, indexer_2, indexer_3])
temp_df = feature_engineering_pipe.fit(train_df).transform(train_df)

# Replace each raw string column with its indexed version under the original name.
columns_to_drop = ['Gender', 'Vehicle_Damage', 'Vehicle_Age']
temp_df = temp_df.drop(*columns_to_drop)
train_df_new = temp_df
for column_name in columns_to_drop:
    train_df_new = train_df_new.withColumnRenamed(column_name + "_idx", column_name)

In [None]:
# Limit BEFORE converting so only five rows are collected to the driver,
# instead of the whole DataFrame.
display(train_df_new.limit(5).toPandas())

In [None]:
# Inspect column types. The CSV was loaded without schema inference, so the
# non-indexed columns are expected to still be strings at this point.
train_df_new.dtypes

In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType

# Columns read as strings from the CSV that the model needs as doubles
# (including the "Response" label). Cast in this order; each cast keeps the
# column's position because withColumn replaces an existing column in place.
numeric_columns = [
    "Age",
    "Driving_License",
    "Region_Code",
    "Previously_Insured",
    "Annual_Premium",
    "Policy_Sales_Channel",
    "Vintage",
    "Response",
]
for numeric_column in numeric_columns:
    train_df_new = train_df_new.withColumn(
        numeric_column, train_df_new[numeric_column].cast(DoubleType())
    )

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

# Assemble the predictor columns into the single "features" vector column that
# pyspark.ml estimators consume. The inputCols order is also the order of the
# entries in the fitted model's featureImportances vector.
vecAssembler = VectorAssembler(inputCols=['Gender', 'Age', 'Driving_License', 'Region_Code', 'Previously_Insured', 'Vehicle_Age', 'Vehicle_Damage', 'Annual_Premium', 'Policy_Sales_Channel', 'Vintage'], outputCol="features")

# Optional standardisation stage. Named `scaler` (previously `sc`) so it no longer
# shadows the SparkContext `sc` created alongside the Spark session.
# NOTE(review): this stage is not part of the random-forest pipeline; tree-based
# models typically do not need feature scaling.
scaler = StandardScaler(withMean=True, withStd=True, inputCol='features', outputCol='scaled_features')

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Scores a predictions DataFrame by area under the ROC curve, with "Response" as the label.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="Response",
)

In [None]:
# 70/30 train/test split. Without an explicit seed PySpark picks a random one per
# call, so the split (and every metric computed on it) would change on each run.
SPLIT_SEED = 42
training_df, testing_df = train_df_new.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [None]:
# Random forest classifier on the assembled feature vector.
from pyspark.ml.classification import RandomForestClassifier

# Explicit seed so bootstrap sampling / feature subsetting is reproducible;
# the estimator's default seed is not guaranteed to be stable across sessions.
RF_SEED = 42
rf = RandomForestClassifier(labelCol="Response", featuresCol="features", seed=RF_SEED)

# NOTE: despite the names, `rf_pipeline` is the fitted PipelineModel and
# `rf_model` is the DataFrame of predictions on the test split.
rf_pipeline = Pipeline(stages=[vecAssembler, rf]).fit(training_df)
rf_model = rf_pipeline.transform(testing_df)

In [None]:
# Area under ROC of the random forest on the held-out test split.
test_auc = evaluator.evaluate(rf_model)
print(test_auc)

In [None]:
# The last pipeline stage (after the VectorAssembler) is the fitted random-forest model.
rf_stage_model = rf_pipeline.stages[-1]
feature_importances = rf_stage_model.featureImportances
# Plain numpy array, one value per entry of the assembled "features" vector.
feature_imp_array = feature_importances.toArray()

In [None]:
# Raw importance values, positionally aligned with the assembler's input columns.
print(feature_imp_array)

In [None]:
# Feature importances are indexed by position in the assembled "features" vector,
# i.e. by VectorAssembler inputCols order. Take the names from the assembler itself
# so the labels cannot drift from a hand-copied list.
predictors = vecAssembler.getInputCols()

# zip() would silently truncate on a mismatch and mislabel importances; fail loudly instead.
assert len(predictors) == len(feature_imp_array), (
    f"{len(predictors)} predictor names vs {len(feature_imp_array)} importances"
)

feats = dict(zip(predictors, feature_imp_array))  # feature_name -> feature_importance

In [None]:
import pandas as pd

# One row per predictor (index), one 'Gini-importance' column with its importance.
importances = (
    pd.DataFrame.from_dict(feats, orient='index')
    .rename(columns={0: 'Gini-importance'})
)
# Bar chart ordered from least to most important.
importances_ascending = importances.sort_values(by='Gini-importance')
importances_ascending.plot(kind='bar', rot=90, title='RF feature importance')

In [None]:
from sklearn.metrics import precision_score, recall_score
from sklearn.metrics import classification_report

# Collect label and prediction together in ONE Spark job so each pair comes from
# the same row. Two separate collect() calls recompute the whole pipeline twice
# and rely on both jobs returning rows in the same order.
label_prediction_rows = rf_model.select("Response", "prediction").collect()

# Unwrap the Row objects into flat scalar lists; lists of Rows would reach
# sklearn as (n, 1) column arrays rather than 1-D targets.
y_true = [row["Response"] for row in label_prediction_rows]
y_pred = [row["prediction"] for row in label_prediction_rows]

print(classification_report(y_true, y_pred))