In [0]:
# File location and type
file_location = "/FileStore/tables/credit_approval_csv.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

A1,A2,A3,A4,A5,A6,A7,A8,A9,A10,A11,A12,A13,A14,A15,class
b,30.83,0.0,u,g,w,v,1.25,t,t,1,f,g,202.0,0,+
a,58.67,4.46,u,g,q,h,3.04,t,t,6,f,g,43.0,560,+
a,24.5,0.5,u,g,q,h,1.5,t,f,0,f,g,280.0,824,+
b,27.83,1.54,u,g,w,v,3.75,t,t,5,t,g,100.0,3,+
b,20.17,5.625,u,g,w,v,1.71,t,f,0,f,s,120.0,0,+
b,32.08,4.0,u,g,m,v,2.5,t,f,0,t,g,360.0,0,+
b,33.17,1.04,u,g,r,h,6.5,t,f,0,t,g,164.0,31285,+
a,22.92,11.585,u,g,cc,v,0.04,t,f,0,f,g,80.0,1349,+
b,54.42,0.5,y,p,k,h,3.96,t,f,0,f,g,180.0,314,+
b,42.5,4.915,y,p,w,v,3.165,t,f,0,t,g,52.0,1442,+


### Pandas API on Spark

In [0]:
import pyspark.pandas as ps
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

In [0]:
p_df = df.pandas_api()

In [0]:
p_df.head(3)

Unnamed: 0,A1,A2,A3,A4,A5,A6,A7,A8,A9,A10,A11,A12,A13,A14,A15,class
0,b,30.83,0.0,u,g,w,v,1.25,t,t,1,f,g,202,0,+
1,a,58.67,4.46,u,g,q,h,3.04,t,t,6,f,g,43,560,+
2,a,24.5,0.5,u,g,q,h,1.5,t,f,0,f,g,280,824,+


In [0]:
p_df.shape

Out[46]: (690, 16)

In [0]:
p_df.info()

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 690 entries, 0 to 689
Data columns (total 16 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   A1      678 non-null    object
 1   A2      678 non-null    object
 2   A3      690 non-null    object
 3   A4      684 non-null    object
 4   A5      684 non-null    object
 5   A6      681 non-null    object
 6   A7      681 non-null    object
 7   A8      690 non-null    object
 8   A9      690 non-null    object
 9   A10     690 non-null    object
 10  A11     690 non-null    object
 11  A12     690 non-null    object
 12  A13     690 non-null    object
 13  A14     677 non-null    object
 14  A15     690 non-null    object
 15  class   690 non-null    object
dtypes: object(16)

### Data Wrangling

In [0]:
p_df['class'] = p_df['class'].replace({'+':'1', '-':'0'})

In [0]:
# 7 vars with nans, A1, A4, A5, A6, A7 cat, A2 and A14 num
p_df.isnull().sum()

Out[49]: A1       12
A2       12
A3        0
A4        6
A5        6
A6        9
A7        9
A8        0
A9        0
A10       0
A11       0
A12       0
A13       0
A14      13
A15       0
class     0
dtype: int64

In [0]:
p_df['class'] = p_df['class'].astype('int')

In [0]:
# A2, A3, A8, A14 float 
p_df[['A2', 'A3', 'A8', 'A14']] = p_df[['A2', 'A3', 'A8', 'A14']].astype('float')

In [0]:
p_df[['A11', 'A15']] = p_df[['A11', 'A15']].astype('int')

In [0]:
p_df['A2'].mean()

Out[53]: 31.56817109144546

In [0]:
p_df['A14'].mean()

Out[54]: 184.01477104874445

In [0]:
# Impute numerical values with the mean
p_df.fillna({'A2': 31.57, 'A14':184.014}, inplace=True)

In [0]:
# A14 was originally integer
p_df['A14'] = p_df['A14'].astype('int')

In [0]:
# dict comprenhension to take all the modes 
modes = {col: p_df[col].mode()[0] for col in p_df.select_dtypes(include=['object'])}
print(modes)

{'A1': 'b', 'A4': 'u', 'A5': 'g', 'A6': 'c', 'A7': 'v', 'A9': 't', 'A10': 'f', 'A12': 'f', 'A13': 'g'}


In [0]:
# impute categorical values with the mode
p_df.fillna({'A1': 'b', 'A4':'u', 'A5':'g', 'A6':'q', 'A7':'h'}, inplace=True)

### Pipeline Model

In [0]:
num_vars = p_df.dtypes[p_df.dtypes != 'object'].index.to_list()
cat_vars = p_df.dtypes[(p_df.dtypes == 'object')].index.to_list()

In [0]:
df_pyspark = p_df.to_spark(index_col='index')

In [0]:
train, test = df_pyspark.randomSplit([0.7,0.3])

In [0]:
# features 
feats = ['A2', 'A3', 'A8', 'A11', 'A14', 'A15', 'A1_enc', 'A4_enc', 'A5_enc', 'A6_enc', 'A7_enc', 'A9_enc', 'A10_enc', 'A12_enc', 'A13_enc']

In [0]:
# StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_enc").fit(df_pyspark) for column in cat_vars]

In [0]:
rfc = RandomForestClassifier(labelCol="class", featuresCol="featureVector")

In [0]:
# stage_1 = indexers

In [0]:
stage_2 = VectorAssembler(inputCols=feats, outputCol="featureVector")

In [0]:
stage_3 = rfc

In [0]:
pipeline = Pipeline(stages=indexers + [stage_2, stage_3])

### Cross validation | GridSearch

In [0]:
evaluator = BinaryClassificationEvaluator(labelCol="class", rawPredictionCol="prediction", metricName='areaUnderROC')

In [0]:
paramGrid = ParamGridBuilder().addGrid(rfc.numTrees,[30,40,50]).addGrid(rfc.maxDepth,[8,9,10]).build()

In [0]:
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

In [0]:
rf_model = cv.fit(train)

In [0]:
test_results=rf_model.transform(test)

In [0]:
test_results.head(2)

Out[74]: [Row(index=1, A1='a', A2=58.67, A3=4.46, A4='u', A5='g', A6='q', A7='h', A8=3.04, A9='t', A10='t', A11=6, A12='f', A13='g', A14=43, A15=560, class=1, A1_enc=1.0, A4_enc=0.0, A5_enc=0.0, A6_enc=1.0, A7_enc=1.0, A9_enc=0.0, A10_enc=1.0, A12_enc=0.0, A13_enc=0.0, featureVector=DenseVector([58.67, 4.46, 3.04, 6.0, 43.0, 560.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0]), rawPrediction=DenseVector([0.0104, 29.9896]), probability=DenseVector([0.0003, 0.9997]), prediction=1.0),
 Row(index=2, A1='a', A2=24.5, A3=0.5, A4='u', A5='g', A6='q', A7='h', A8=1.5, A9='t', A10='f', A11=0, A12='f', A13='g', A14=280, A15=824, class=1, A1_enc=1.0, A4_enc=0.0, A5_enc=0.0, A6_enc=1.0, A7_enc=1.0, A9_enc=0.0, A10_enc=0.0, A12_enc=0.0, A13_enc=0.0, featureVector=SparseVector(15, {0: 24.5, 1: 0.5, 2: 1.5, 4: 280.0, 5: 824.0, 6: 1.0, 9: 1.0, 10: 1.0}), rawPrediction=DenseVector([4.7452, 25.2548]), probability=DenseVector([0.1582, 0.8418]), prediction=1.0)]

### Evaluation

In [0]:
rf_auc=BinaryClassificationEvaluator(labelCol='class').evaluate(test_results)
print(rf_auc)

0.9390350877192976


In [0]:
rf_accuracy=MulticlassClassificationEvaluator(labelCol='class', metricName='accuracy').evaluate(test_results)
print(rf_accuracy)

0.9019607843137255


### Best Model and Feature Importance

In [0]:
rf_best = rf_model.bestModel

In [0]:
# Best model params
print("NumTrees: " + str(rf_best.stages[-1]._java_obj.getNumTrees()))
print("MaxDepth: " + str(rf_best.stages[-1]._java_obj.getMaxDepth()))

NumTrees: 30
MaxDepth: 10


In [0]:
feature_importance_list = list(zip(feats, rf_best.stages[-1].featureImportances.toArray()))
feature_importance_list.sort(key=lambda x: x[1], reverse=True)

In [0]:
feature_importance_list

Out[80]: [('A9_enc', 0.2919233451687551),
 ('A11', 0.10679584564543516),
 ('A8', 0.09736974861744427),
 ('A6_enc', 0.08995131183263554),
 ('A3', 0.0799580480774031),
 ('A2', 0.06927529709594832),
 ('A15', 0.05921262435835365),
 ('A14', 0.054603865769116966),
 ('A10_enc', 0.043516633404807444),
 ('A7_enc', 0.033403578468830686),
 ('A4_enc', 0.020601412051289087),
 ('A5_enc', 0.015755473600892404),
 ('A13_enc', 0.014318331290370247),
 ('A12_enc', 0.013270481538430285),
 ('A1_enc', 0.01004400308028765)]