# Bank credit scoring

Credit scoring is a system banks use to evaluate clients based on statistical methods. As a rule, it is an algorithm that takes a potential borrower's data as input and returns a result: whether it is worth granting them a loan.

The **task** is to implement the logic for training a model that assesses the trustworthiness of bank customers, using the accuracy metric for evaluation.

**The structure of the source data:**

| Field | Description |
|-------|-------------|
| client_id | Unique identifier of the client |
| age       | Client's age at the time of review |
| sex       | Gender of the client |
| married   | Marital status |
| salary    | Official and confirmed salary of the client |
| successfully_credit_completed | Number of repaid credits |
| credit_completed_amount | Total amount of repaid credits |
| active_credits | Number of active credits |
| active_credits_amount | Total amount of active credits |
| credit_amount | Amount of the credit |
| is_credit_closed | Target: 1 if the credit was successfully closed (repaid to the bank), 0 otherwise |


Import the Spark session, the Pipeline class, the StringIndexer and VectorAssembler transformers, and the MulticlassClassificationEvaluator from PySpark. IntegerType is used later to cast a boolean column to integers.

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import IntegerType

Creating a Spark Session.

In [2]:
spark = SparkSession.builder.appName("PySparkBank").getOrCreate()

22/04/12 10:47:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Displaying the session object shows that Spark is running in local mode: there is no cluster master, so the notebook machine's own resources are used.

In [3]:
spark

In [4]:
bank_df = spark.read.parquet('train.parquet')

                                                                                

Let's look at the data. The target feature is `is_credit_closed`, i.e. whether the credit issued to the client was successfully closed (repaid to the bank).

In [5]:
bank_df.show(5)

+--------------------+---+----+-------+------+-----------------------------+-----------------------+--------------+---------------------+-------------+----------------+
|           client_id|age| sex|married|salary|successfully_credit_completed|credit_completed_amount|active_credits|active_credits_amount|credit_amount|is_credit_closed|
+--------------------+---+----+-------+------+-----------------------------+-----------------------+--------------+---------------------+-------------+----------------+
|000070b1e6d44001b...| 29|male|  false| 11000|                            0|                      0|             0|                    0|       600000|               0|
|000561d9b7774179a...| 36|male|   true| 70000|                            0|                      0|             0|                    0|      4450000|               1|
|0005f6d032444a75a...| 37|male|  false| 70000|                            0|                      0|             0|                    0|      4250000|    

In [6]:
bank_df = bank_df.withColumn("is_married", bank_df.married.cast(IntegerType()))

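For the model to be able to use this data, we need to prepare it, starting with the categorical attributes. The boolean `married` column has already been cast to the integer column `is_married` above; the string column `sex` is encoded with StringIndexer, which maps each distinct value to a numeric index.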
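By default StringIndexer orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent value gets index 0.0; that is why `male` becomes 0.0 in the output below. A minimal pure-Python sketch of that mapping, assuming ties are broken alphabetically as documented for Spark 3.x; `string_index` is a hypothetical helper, not part of PySpark:

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to a float index, most frequent first.

    Hypothetical helper mimicking StringIndexer's default
    stringOrderType="frequencyDesc"; ties are broken alphabetically.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return mapping, [mapping[v] for v in values]


mapping, encoded = string_index(["male", "female", "male", "male"])
print(mapping)   # {'male': 0.0, 'female': 1.0}
print(encoded)   # [0.0, 1.0, 0.0, 0.0]
```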

In [7]:
sex_index = StringIndexer(inputCol='sex', outputCol="Sex_index")

Calling `fit` on our DataFrame returns a fitted model (a transformer), and its `transform` method adds the encoded column to the data.

In [8]:
bank_df = sex_index.fit(bank_df).transform(bank_df)

                                                                                

In [9]:
bank_df.show(5)

+--------------------+---+----+-------+------+-----------------------------+-----------------------+--------------+---------------------+-------------+----------------+----------+---------+
|           client_id|age| sex|married|salary|successfully_credit_completed|credit_completed_amount|active_credits|active_credits_amount|credit_amount|is_credit_closed|is_married|Sex_index|
+--------------------+---+----+-------+------+-----------------------------+-----------------------+--------------+---------------------+-------------+----------------+----------+---------+
|000070b1e6d44001b...| 29|male|  false| 11000|                            0|                      0|             0|                    0|       600000|               0|         0|      0.0|
|000561d9b7774179a...| 36|male|   true| 70000|                            0|                      0|             0|                    0|      4450000|               1|         1|      0.0|
|0005f6d032444a75a...| 37|male|  false| 70000|    

Next comes vectorization: Spark ML models expect all input features in a single vector column, so the feature columns must be combined into one vector before they can be passed to the models.

First, we list the columns to be used as features in model training and store the list in the `features` variable.

In [10]:
features = ['age', 'salary', 'successfully_credit_completed', 
            'credit_completed_amount', 'active_credits', 
            'active_credits_amount', 'credit_amount', 'is_married', 'Sex_index']

We create a VectorAssembler, passing the list of feature columns as `inputCols` and setting `outputCol` to "features", the column name that Spark ML classifiers read by default (their `featuresCol` parameter defaults to "features").

Since VectorAssembler is a transformer, it needs no fitting: we call its `transform` method directly on our DataFrame.

In [11]:
feature = VectorAssembler(inputCols=features, outputCol="features")
training_data = feature.transform(bank_df)
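Conceptually, VectorAssembler concatenates the listed columns of each row into one numeric vector. The prediction tables later print these vectors in Spark's sparse notation `(size, [indices], [values])`, which keeps only the non-zero entries (as far as we know, Spark stores whichever of the dense or sparse form is more compact). A pure-Python sketch of both steps; `assemble` and `to_sparse` are hypothetical helpers, and the sample row is the first row of the test set after encoding:

```python
FEATURES = ['age', 'salary', 'successfully_credit_completed',
            'credit_completed_amount', 'active_credits',
            'active_credits_amount', 'credit_amount', 'is_married', 'Sex_index']


def assemble(row, columns):
    """Concatenate the given columns of a row (a dict) into a list of floats."""
    return [float(row[c]) for c in columns]


def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return len(dense), indices, [dense[i] for i in indices]


# First test row after casting `married` and indexing `sex`
row = {'age': 40, 'salary': 90000, 'successfully_credit_completed': 0,
       'credit_completed_amount': 0, 'active_credits': 0,
       'active_credits_amount': 0, 'credit_amount': 4350000,
       'is_married': 1, 'Sex_index': 0.0}

print(to_sparse(assemble(row, FEATURES)))
# (9, [0, 1, 6, 7], [40.0, 90000.0, 4350000.0, 1.0])
```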

In [None]:
test_df = spark.read.parquet('test.parquet')

In [13]:
test_df = test_df.withColumn("is_married", test_df.married.cast(IntegerType()))

In [14]:
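# Fit the indexer on the training data (not the test data) so each sex value gets the same index as in training
test_df = sex_index.fit(bank_df).transform(test_df)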
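Fitting the indexer once on the training data and reusing that mapping keeps the encoding consistent; refitting it on another dataset with different label frequencies can assign different indices to the same string. A pure-Python sketch of the estimator/transformer split; `IndexerSketch`, `IndexerModelSketch` and the toy label lists are hypothetical:

```python
from collections import Counter


class IndexerSketch:
    """Hypothetical estimator: fit() learns a label -> index mapping."""

    def fit(self, labels):
        counts = Counter(labels)
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        return IndexerModelSketch({label: float(i) for i, label in enumerate(ordered)})


class IndexerModelSketch:
    """Hypothetical fitted transformer: applies a fixed mapping."""

    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, labels):
        return [self.mapping[label] for label in labels]


train_sex = ["male", "male", "female"]
test_sex = ["female", "female", "male"]

indexer_model = IndexerSketch().fit(train_sex)
print(indexer_model.transform(test_sex))                  # [1.0, 1.0, 0.0]  same mapping as training
print(IndexerSketch().fit(test_sex).transform(test_sex))  # [0.0, 0.0, 1.0]  indices flipped
```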

In [15]:
test_data = feature.transform(test_df)

# ML models

## LogisticRegression

We want to create a logistic regression model to solve the classification problem. Logistic regression suits us here because we predict only two classes (the loan is either repaid or not).
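Logistic regression turns a weighted sum of the features into a probability with the sigmoid function and predicts class 1 when that probability exceeds a threshold (Spark's default `threshold` is 0.5). A pure-Python sketch; the weights and bias below are made up for illustration, not learned from the bank data:

```python
import math


def sigmoid(z):
    """Logistic function mapping any real number into (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def predict(weights, bias, x, threshold=0.5):
    """Return (probability of class 1, predicted label) for one feature vector."""
    z = bias + sum(w * xi for w, xi in zip(weights, x))
    p = sigmoid(z)
    return p, 1.0 if p > threshold else 0.0


# Made-up weights for two features: z = 0.1 + 0.8*2.0 - 0.5*1.0 = 1.2
p, label = predict([0.8, -0.5], 0.1, [2.0, 1.0])
print(round(p, 3), label)  # 0.769 1.0
```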

To evaluate the models, we first create an evaluator (MulticlassClassificationEvaluator). In `labelCol` we specify the column that stores the true value of the feature we want to predict, so it can be used for comparison; in `predictionCol` we specify the column where the model stores its prediction. The metric of our choice is accuracy.
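Accuracy is the share of rows whose prediction equals the true label, and the error printed later is `1 - accuracy`. A pure-Python sketch using the first five logistic-regression rows shown below (four of the five are predicted correctly); `accuracy` is a hypothetical helper, not the evaluator's API:

```python
def accuracy(predictions, labels):
    """Fraction of rows where the predicted class equals the true class."""
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)


# `prediction` and `is_credit_closed` from the first five logistic-regression rows
preds = [1.0, 0.0, 1.0, 1.0, 0.0]
labels = [1, 0, 1, 1, 1]
acc = accuracy(preds, labels)
print(acc)  # 0.8
```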

In [16]:
evaluator = MulticlassClassificationEvaluator(labelCol="is_credit_closed", predictionCol="prediction", metricName="accuracy")

In [17]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="is_credit_closed", featuresCol="features")

lrModel = lr.fit(training_data)
lr_prediction = lrModel.transform(test_data)
lr_prediction.select("prediction", "is_credit_closed", "features").show(5)

22/04/12 10:55:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/04/12 10:55:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

+----------+----------------+--------------------+
|prediction|is_credit_closed|            features|
+----------+----------------+--------------------+
|       1.0|               1|(9,[0,1,6,7],[40....|
|       0.0|               0|(9,[0,1,6],[41.0,...|
|       1.0|               1|(9,[0,1,6,7],[52....|
|       1.0|               1|(9,[0,1,6,7],[44....|
|       0.0|               1|(9,[0,1,6],[46.0,...|
+----------+----------------+--------------------+
only showing top 5 rows



To score the model, we call the `evaluate` method of the evaluator created earlier on the DataFrame with predictions.

In [18]:
lr_accuracy = evaluator.evaluate(lr_prediction)
print("LogisticRegression [Accuracy] = %g"% (lr_accuracy))
print("LogisticRegression [Error] = %g " % (1.0 - lr_accuracy))

LogisticRegression [Accuracy] = 0.809644
LogisticRegression [Error] = 0.190356 


                                                                                

## DecisionTreeClassifier

Let's experiment with other models.

Next is DecisionTreeClassifier, a decision tree from the same `pyspark.ml.classification` package. We set the same parameters and train it in the same way.
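A decision tree grows by choosing, at each node, the split that most reduces impurity. Spark's DecisionTreeClassifier uses Gini impurity by default (`impurity="gini"`), and the `minInfoGain` parameter tuned later is the minimum reduction a split must achieve. A pure-Python sketch of scoring one candidate split; the toy label lists are hypothetical:

```python
from collections import Counter


def gini(labels):
    """Gini impurity: 1 - sum of squared class proportions."""
    n = len(labels)
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())


def info_gain(parent, left, right):
    """Impurity of the parent minus the size-weighted impurity of the children."""
    n = len(parent)
    weighted = len(left) / n * gini(left) + len(right) / n * gini(right)
    return gini(parent) - weighted


parent = [0, 0, 1, 1]
print(gini(parent))                       # 0.5
print(info_gain(parent, [0, 0], [1, 1]))  # 0.5  (perfect split)
print(info_gain(parent, [0, 1], [0, 1]))  # 0.0  (useless split)
```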

In [19]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="is_credit_closed", featuresCol="features")
dt_model = dt.fit(training_data)
dt_prediction = dt_model.transform(test_data)

dt_prediction.select("prediction", "is_credit_closed", "features").show(5)

                                                                                

+----------+----------------+--------------------+
|prediction|is_credit_closed|            features|
+----------+----------------+--------------------+
|       1.0|               1|(9,[0,1,6,7],[40....|
|       1.0|               0|(9,[0,1,6],[41.0,...|
|       1.0|               1|(9,[0,1,6,7],[52....|
|       1.0|               1|(9,[0,1,6,7],[44....|
|       1.0|               1|(9,[0,1,6],[46.0,...|
+----------+----------------+--------------------+
only showing top 5 rows



Evaluating the accuracy shows that this model is only slightly more accurate than logistic regression (0.8108 vs 0.8096).

In [20]:
dt_accuracy = evaluator.evaluate(dt_prediction)
print("DecisionTreeClassifier [Accuracy] = %g"% (dt_accuracy))
print("DecisionTreeClassifier [Error] = %g " % (1.0 - dt_accuracy))

DecisionTreeClassifier [Accuracy] = 0.810827
DecisionTreeClassifier [Error] = 0.189173 


                                                                                

## RandomForestClassifier
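A random forest trains many decision trees (Spark's default is `numTrees=20`), each on a bootstrap sample of the rows with a random subset of features considered at each split, and then combines them. As far as we know from the Spark source, the classifier averages the trees' class-probability estimates and predicts the class with the highest average rather than taking a hard majority vote. A pure-Python sketch of that combination step, with made-up per-tree probabilities; `forest_predict` is hypothetical:

```python
def forest_predict(tree_probabilities):
    """Average per-tree class probabilities; return (averages, predicted class)."""
    n_trees = len(tree_probabilities)
    n_classes = len(tree_probabilities[0])
    avg = [sum(tree[k] for tree in tree_probabilities) / n_trees for k in range(n_classes)]
    predicted = max(range(n_classes), key=lambda k: avg[k])
    return avg, float(predicted)


# Made-up [P(class 0), P(class 1)] from three trees for one client.
# Two of three trees lean to class 1, but the averages favour class 0.
trees = [[0.9, 0.1], [0.45, 0.55], [0.45, 0.55]]
avg, pred = forest_predict(trees)
print(pred)  # 0.0
```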

In [21]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="is_credit_closed", featuresCol="features")
rf_model = rf.fit(training_data)
rf_prediction = rf_model.transform(test_data)
rf_prediction.select("prediction", "is_credit_closed", "features").show(5)

                                                                                

+----------+----------------+--------------------+
|prediction|is_credit_closed|            features|
+----------+----------------+--------------------+
|       1.0|               1|(9,[0,1,6,7],[40....|
|       0.0|               0|(9,[0,1,6],[41.0,...|
|       1.0|               1|(9,[0,1,6,7],[52....|
|       1.0|               1|(9,[0,1,6,7],[44....|
|       1.0|               1|(9,[0,1,6],[46.0,...|
+----------+----------------+--------------------+
only showing top 5 rows



In [22]:
rf_accuracy = evaluator.evaluate(rf_prediction)
print("RandomForestClassifier [Accuracy] = %g"% (rf_accuracy))
print("RandomForestClassifier [Error] = %g" % (1.0 - rf_accuracy))

RandomForestClassifier [Accuracy] = 0.904116
RandomForestClassifier [Error] = 0.0958841


                                                                                

## Gradient-boosted tree classifier
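Gradient-boosted trees build the ensemble sequentially: each new tree is fitted to the errors (the negative gradient of the loss) of the ensemble so far, and its output is added with a small step size. Spark's GBTClassifier does this with log loss, full decision trees (`maxDepth=5` by default) and `stepSize=0.1` by default. The pure-Python sketch below is a simplified version of the same idea, not Spark's internal formulation: one-split trees (stumps) on a single numeric feature and the common `y - sigmoid(F(x))` residual for 0/1 labels. All function names and data are hypothetical:

```python
import math


def fit_stump(xs, residuals):
    """Fit a one-split regression tree: predict the mean residual on each side."""
    best = None
    for t in sorted(set(xs))[:-1]:  # thresholds that leave both sides non-empty
        left = [r for x, r in zip(xs, residuals) if x <= t]
        right = [r for x, r in zip(xs, residuals) if x > t]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = (sum((r - left_mean) ** 2 for r in left)
               + sum((r - right_mean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, t, left_mean, right_mean)
    _, t, left_mean, right_mean = best
    return lambda x: left_mean if x <= t else right_mean


def gradient_boost(xs, ys, max_iter=10, step_size=0.1):
    """Boost stumps on the log-loss residuals y - sigmoid(F(x)) for 0/1 labels."""
    stumps = []

    def margin(x):
        return sum(step_size * stump(x) for stump in stumps)

    for _ in range(max_iter):
        residuals = [y - 1.0 / (1.0 + math.exp(-margin(x))) for x, y in zip(xs, ys)]
        stumps.append(fit_stump(xs, residuals))

    # Class 1 when the boosted margin is positive, i.e. sigmoid(F(x)) > 0.5
    return lambda x: 1.0 if margin(x) > 0 else 0.0


gbt_predict = gradient_boost([1, 2, 3, 4], [0, 0, 1, 1])
print([gbt_predict(x) for x in [1, 2, 3, 4]])  # [0.0, 0.0, 1.0, 1.0]
```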

In [23]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="is_credit_closed", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(training_data)
gbt_prediction = gbt_model.transform(test_data)
gbt_prediction.select("prediction", "is_credit_closed", "features").show(5)

                                                                                

+----------+----------------+--------------------+
|prediction|is_credit_closed|            features|
+----------+----------------+--------------------+
|       1.0|               1|(9,[0,1,6,7],[40....|
|       0.0|               0|(9,[0,1,6],[41.0,...|
|       1.0|               1|(9,[0,1,6,7],[52....|
|       1.0|               1|(9,[0,1,6,7],[44....|
|       1.0|               1|(9,[0,1,6],[46.0,...|
+----------+----------------+--------------------+
only showing top 5 rows



Evaluating this model shows that it is the most accurate of the four (0.9096, compared with 0.9041 for the random forest).

In [24]:
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Gradient-boosted [Accuracy] = %g"% (gbt_accuracy))
print("Gradient-boosted [Error] = %g"% (1.0 - gbt_accuracy))

Gradient-boosted [Accuracy] = 0.909649
Gradient-boosted [Error] = 0.0903505


Swapping models is easy because all of Spark ML's out-of-the-box regression and classification models take the same vectorized `features` column as input. The data preparation stays the same for every model, so changing the model and experimenting takes very little code.
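Because every model shares the same fit/predict contract over the same prepared data, a comparison like the one above can be written as a single loop over estimators. A pure-Python sketch of that idea with two hypothetical toy classifiers (not Spark classes) and made-up data:

```python
from collections import Counter


class MajorityClass:
    """Toy classifier: always predicts the most frequent training label."""

    def fit(self, xs, ys):
        self.label = Counter(ys).most_common(1)[0][0]
        return self

    def predict(self, xs):
        return [self.label for _ in xs]


class Threshold:
    """Toy classifier: predicts 1 when the single feature exceeds a cut-off."""

    def __init__(self, cutoff):
        self.cutoff = cutoff

    def fit(self, xs, ys):
        return self  # nothing to learn in this toy

    def predict(self, xs):
        return [1 if x > self.cutoff else 0 for x in xs]


def accuracy(predictions, labels):
    return sum(p == y for p, y in zip(predictions, labels)) / len(labels)


train_x, train_y = [1, 2, 3, 4, 5], [0, 0, 1, 1, 1]
test_x, test_y = [1, 4, 5, 2], [0, 1, 1, 0]

# Same loop for every model: only the estimator object changes
results = {}
for name, estimator in [("majority", MajorityClass()), ("threshold", Threshold(2.5))]:
    fitted = estimator.fit(train_x, train_y)
    results[name] = accuracy(fitted.predict(test_x), test_y)
print(results)  # {'majority': 0.5, 'threshold': 1.0}
```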

## Save & Load Model

Once we have trained the model, we can save it. The model's `write()` method returns a writer whose `save(path)` stores the model on local disk or another file system; `overwrite()` replaces any existing copy at that path.
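Without `overwrite()`, Spark's writer raises an error if the target path already exists. A pure-Python sketch of that save/overwrite/load contract using a JSON file; `save_params` and `load_params` are hypothetical helpers, and Spark's real format is a directory of metadata and data files, not a single JSON file:

```python
import json
import os
import tempfile


def save_params(params, path, overwrite=False):
    """Hypothetical helper: write params as JSON, refusing to replace an
    existing file unless overwrite=True (like write().overwrite().save())."""
    if os.path.exists(path) and not overwrite:
        raise FileExistsError(f"Path {path} already exists; use overwrite=True")
    with open(path, "w") as f:
        json.dump(params, f)


def load_params(path):
    """Hypothetical helper: read the params back."""
    with open(path) as f:
        return json.load(f)


path = os.path.join(tempfile.mkdtemp(), "rf_model.json")
save_params({"numTrees": 20}, path)

refused = False
try:
    save_params({"numTrees": 50}, path)  # second save without overwrite
except FileExistsError:
    refused = True

save_params({"numTrees": 50}, path, overwrite=True)
print(refused, load_params(path))  # True {'numTrees': 50}
```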

In [25]:
rf_model.write().overwrite().save('rf_model')

In [26]:
from pyspark.ml.classification import RandomForestClassificationModel
type(RandomForestClassificationModel.load('rf_model'))

pyspark.ml.classification.RandomForestClassificationModel

## Pipeline

A Pipeline chains the data preparation stages and the model into a single estimator, which simplifies deploying the model in the target environment. Let's see how it's done.

In [27]:
from pyspark.ml.pipeline import PipelineModel

In [29]:
train = spark.read.parquet('train.parquet')

In [31]:
test = spark.read.parquet('test.parquet')

In [33]:
train = train.withColumn("is_married", train.married.cast(IntegerType()))
test = test.withColumn("is_married", test.married.cast(IntegerType()))

This time the StringIndexer and the VectorAssembler are only created, not applied to the data; the pipeline will apply them.

In [34]:
sex_index = StringIndexer(inputCol='sex', outputCol="Sex_index")

In [38]:
feature = VectorAssembler(inputCols=['age', 'salary', 'successfully_credit_completed',
                                     'credit_completed_amount', 'active_credits',
                                     'active_credits_amount', 'credit_amount', 'is_married', 'Sex_index'],
                          outputCol="features")

Next, we choose the model to train, specifying the column with the target feature we want to predict and the column with the feature vectors.

In [39]:
rf_classifier = RandomForestClassifier(labelCol="is_credit_closed", featuresCol="features")

After defining the model, we create a pipeline:

indexer for gender -> vector assembler to build the features -> model to be trained on this data.

In [41]:
pipeline = Pipeline(stages=[sex_index, feature, rf_classifier])
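`Pipeline.fit` walks through the stages in order: a transformer stage (like VectorAssembler) is applied as is, while an estimator stage (like StringIndexer or the classifier) is fitted first and its fitted model transforms the data passed on to the next stage. The resulting PipelineModel holds only fitted transformers and applies them in the same order in `transform`. A simplified pure-Python sketch of that logic with two hypothetical toy stages working on lists of dicts (a classifier would be one more estimator stage):

```python
from collections import Counter


class SexIndexer:
    """Toy estimator: learns a sex -> index mapping, most frequent value first."""

    def fit(self, rows):
        counts = Counter(r["sex"] for r in rows)
        ordered = sorted(counts, key=lambda s: (-counts[s], s))
        return SexIndexerModel({s: float(i) for i, s in enumerate(ordered)})


class SexIndexerModel:
    """Toy fitted transformer: adds a Sex_index field using the learned mapping."""

    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, rows):
        return [{**r, "Sex_index": self.mapping[r["sex"]]} for r in rows]


class Assembler:
    """Toy transformer: packs the chosen fields into a 'features' list."""

    def __init__(self, cols):
        self.cols = cols

    def transform(self, rows):
        return [{**r, "features": [float(r[c]) for c in self.cols]} for r in rows]


class PipelineSketch:
    """Simplified stand-in for Pipeline: fits estimators, chains transforms."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):     # estimator: learn from the current data
                stage = stage.fit(rows)
            rows = stage.transform(rows)  # pass transformed data to the next stage
            fitted.append(stage)
        return PipelineModelSketch(fitted)


class PipelineModelSketch:
    """Simplified stand-in for PipelineModel: applies fitted stages in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


toy_train = [{"sex": "male", "age": 29}, {"sex": "male", "age": 36},
             {"sex": "female", "age": 41}]
toy_test = [{"sex": "female", "age": 52}]

toy_model = PipelineSketch([SexIndexer(), Assembler(["age", "Sex_index"])]).fit(toy_train)
print(toy_model.transform(toy_test)[0]["features"])  # [52.0, 1.0]
```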

Once the pipeline is assembled, we can train it. Since a Pipeline is an estimator, we call its `fit` method and pass our training dataset to it.

In [42]:
p_model = pipeline.fit(train)

                                                                                

In [43]:
type(p_model)

pyspark.ml.pipeline.PipelineModel

In [44]:
p_model.write().overwrite().save('p_model')

In [45]:
model = PipelineModel.load('p_model')

                                                                                

Now we pass the test dataset to the fitted pipeline. It does not yet contain the columns the model needs (`Sex_index` and `features`); the pipeline model applies its stages one by one, adding these columns and finally the prediction columns, as the schema below shows.

In [46]:
prediction = p_model.transform(test)

In [47]:
test.show(5)

+--------------------+---+----+-------+------+-----------------------------+-----------------------+--------------+---------------------+-------------+----------------+----------+
|           client_id|age| sex|married|salary|successfully_credit_completed|credit_completed_amount|active_credits|active_credits_amount|credit_amount|is_credit_closed|is_married|
+--------------------+---+----+-------+------+-----------------------------+-----------------------+--------------+---------------------+-------------+----------------+----------+
|0000726b27784ee6b...| 40|male|   true| 90000|                            0|                      0|             0|                    0|      4350000|               1|         1|
|0001fe9b70a44611a...| 41|male|  false| 60000|                            0|                      0|             0|                    0|      3300000|               0|         0|
|000405b412d64b0bb...| 52|male|   true| 90000|                            0|                      0|

In [48]:
prediction.select("prediction", "is_credit_closed", "features").show(5)

+----------+----------------+--------------------+
|prediction|is_credit_closed|            features|
+----------+----------------+--------------------+
|       1.0|               1|(9,[0,1,6,7],[40....|
|       0.0|               0|(9,[0,1,6],[41.0,...|
|       1.0|               1|(9,[0,1,6,7],[52....|
|       1.0|               1|(9,[0,1,6,7],[44....|
|       1.0|               1|(9,[0,1,6],[46.0,...|
+----------+----------------+--------------------+
only showing top 5 rows



In [49]:
prediction.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- married: boolean (nullable = true)
 |-- salary: integer (nullable = true)
 |-- successfully_credit_completed: integer (nullable = true)
 |-- credit_completed_amount: integer (nullable = true)
 |-- active_credits: integer (nullable = true)
 |-- active_credits_amount: integer (nullable = true)
 |-- credit_amount: integer (nullable = true)
 |-- is_credit_closed: integer (nullable = true)
 |-- is_married: integer (nullable = true)
 |-- Sex_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [50]:
evaluator = MulticlassClassificationEvaluator(labelCol="is_credit_closed", predictionCol="prediction", metricName="accuracy")

In [51]:
p_accuracy = evaluator.evaluate(prediction)
print("Pipeline model [Accuracy] = %g"% (p_accuracy))
print("Pipeline model [Error] = %g " % (1.0 - p_accuracy))

Pipeline model [Accuracy] = 0.904116
Pipeline model [Error] = 0.0958841 


                                                                                

## Hyperparameter tuning

Next, we optimize the hyperparameters to improve the quality of the trained model. This requires ParamGridBuilder, which builds the grid of candidate hyperparameter values, and TrainValidationSplit, which trains a model for each combination and selects the best one according to the evaluator.


In [52]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

We need to define the grid of parameters from which the different combinations are built; a model is trained for each combination. For every hyperparameter we pass to `addGrid` the parameter of the estimator we want to optimize (here `rf_classifier`) and the list of values to try.

In [53]:
paramGrid = ParamGridBuilder()\
                  .addGrid(rf_classifier.maxDepth, [2, 3, 4])\
                  .addGrid(rf_classifier.maxBins, [4, 5, 6])\
                  .addGrid(rf_classifier.minInfoGain, [0.05, 0.1, 0.15])\
                  .build()
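ParamGridBuilder produces the Cartesian product of the listed values, so this grid yields 3 × 3 × 3 = 27 parameter combinations, and the pipeline is fitted once per combination during tuning. A pure-Python sketch of the same expansion with `itertools.product`; the dictionary keys simply reuse the Spark parameter names:

```python
from itertools import product

grid_values = {
    "maxDepth": [2, 3, 4],
    "maxBins": [4, 5, 6],
    "minInfoGain": [0.05, 0.1, 0.15],
}

# One dict per combination, analogous to the list of ParamMaps returned by build()
param_maps = [dict(zip(grid_values, combo)) for combo in product(*grid_values.values())]
print(len(param_maps))   # 27
print(param_maps[0])     # {'maxDepth': 2, 'maxBins': 4, 'minInfoGain': 0.05}
```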

We declare TrainValidationSplit, passing `estimator`, the model to be trained (here the whole pipeline); `estimatorParamMaps`, the grid from which the hyperparameter combinations are taken; `evaluator`, the evaluator we created earlier; and `trainRatio`, the fraction of the data used for training (the rest is used for validation).

In [54]:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)
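TrainValidationSplit evaluates each combination once: it splits the input into a training part (`trainRatio`, here 80%) and a validation part, fits the estimator with every parameter map on the training part, scores it on the validation part with the evaluator, and then refits the best combination on the full dataset. A pure-Python sketch of that loop; the toy threshold "estimator", its `cutoff` parameter and the data are hypothetical, and unlike Spark's random split, this sketch holds out every fifth row so the result is deterministic:

```python
def accuracy(predictions, labels):
    """Fraction of rows where prediction equals label."""
    return sum(p == y for p, y in zip(predictions, labels)) / len(labels)


def fit_threshold(rows, cutoff):
    """Hypothetical estimator with one hyperparameter, `cutoff`.

    This toy ignores the training rows; a real estimator would learn from them.
    The fitted 'model' predicts 1 when the feature exceeds the cutoff.
    """
    return lambda x: 1 if x > cutoff else 0


def train_validation_split(rows, param_maps, holdout_every=5):
    """Pick the param map with the best validation accuracy, then refit on all rows.

    Every `holdout_every`-th row goes to validation (an 80/20 split for 5).
    """
    train = [r for i, r in enumerate(rows) if i % holdout_every != holdout_every - 1]
    valid = [r for i, r in enumerate(rows) if i % holdout_every == holdout_every - 1]

    scores = []
    for params in param_maps:
        model = fit_threshold(train, **params)
        preds = [model(x) for x, _ in valid]
        scores.append(accuracy(preds, [y for _, y in valid]))

    best = max(range(len(param_maps)), key=lambda i: scores[i])
    # The winning combination is refitted on the full dataset
    return fit_threshold(rows, **param_maps[best]), param_maps[best], scores


data = [(x, 1 if x > 10 else 0) for x in range(20)]
best_model, best_params, scores = train_validation_split(
    data, [{"cutoff": 3}, {"cutoff": 10}, {"cutoff": 15}])
print(best_params, scores)  # {'cutoff': 10} [0.5, 1.0, 0.75]
```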

Now we can start training.

In [55]:
model = tvs.fit(train)

                                                                                

As a result we get a model of type TrainValidationSplitModel, i.e. an already fitted TrainValidationSplit. Its `transform` method uses the best model found during the search, but we want to work with that best model directly.

Let's access the `bestModel` attribute to get it.

In [56]:
type(model)

pyspark.ml.tuning.TrainValidationSplitModel

In [57]:
model.bestModel

PipelineModel_e20c465df05d

In [58]:
type(model.bestModel)

pyspark.ml.pipeline.PipelineModel

Let's see what hyperparameters were obtained in the optimization process.

In [59]:
# The last pipeline stage is the fitted random forest model
jo = model.bestModel.stages[-1]._java_obj
print('Max Depth: {}'.format(jo.getMaxDepth()))
print('Max Bins: {}'.format(jo.getMaxBins()))
print('Min Info Gain: {}'.format(jo.getMinInfoGain()))

Max Depth: 2
Max Bins: 4
Min Info Gain: 0.05
