
# Downgrading Java and Installing PySpark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

!pip install pyspark

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
     |████████████████████████████████| 204.7MB 58kB/s 
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 39.8MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=d1ee1adf794339d1a8122f18

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('supervised_ml').getOrCreate()

# Collecting and Exploring the Data

### Importing the Dataset:

In [None]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

Saving loan_classification_data.csv to loan_classification_data.csv
User uploaded file "loan_classification_data.csv" with length 2487678 bytes


In [None]:
df = spark.read.csv('loan_classification_data.csv', inferSchema=True,
                    header=True)

### Shape of the dataframe:

In [None]:
print((df.count(), len(df.columns)))

(46751, 12)


### Columns and datatypes of the dataset:

In [None]:
df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- loan_purpose: string (nullable = true)
 |-- is_first_loan: integer (nullable = true)
 |-- total_credit_card_limit: integer (nullable = true)
 |-- avg_percentage_credit_card_limit_used_last_year: double (nullable = true)
 |-- saving_amount: integer (nullable = true)
 |-- checking_amount: integer (nullable = true)
 |-- is_employed: integer (nullable = true)
 |-- yearly_salary: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- dependent_number: integer (nullable = true)
 |-- label: integer (nullable = true)



### Sample of the dataframe:

In [None]:
df.show(10)

+-------+------------+-------------+-----------------------+-----------------------------------------------+-------------+---------------+-----------+-------------+---+----------------+-----+
|loan_id|loan_purpose|is_first_loan|total_credit_card_limit|avg_percentage_credit_card_limit_used_last_year|saving_amount|checking_amount|is_employed|yearly_salary|age|dependent_number|label|
+-------+------------+-------------+-----------------------+-----------------------------------------------+-------------+---------------+-----------+-------------+---+----------------+-----+
|    A_1|    personal|            1|                   7900|                                            0.8|         1103|           6393|          1|        16400| 42|               4|    0|
|    A_2|    personal|            0|                   3300|                                           0.29|         2588|            832|          1|        75500| 56|               1|    0|
|    A_3|    personal|            0|    

### Number of customers who defaulted on their loans:

In [None]:
df.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|16201|
|    0|30550|
+-----+-----+



As you can see, more than one third of all customers (16,201 of 46,751, about 35%) have defaulted on their loans.
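
To put that share in numbers, here is a small sketch in plain Python that uses only the counts printed above. The variable names are illustrative, and the majority-class baseline is an added point of reference, not something computed in the original notebook.

```python
# Counts copied from the groupBy('label') output above.
label_counts = {1: 16201, 0: 30550}

total = sum(label_counts.values())
default_rate = label_counts[1] / total

# Accuracy of a trivial model that always predicts the majority class (0).
majority_baseline = max(label_counts.values()) / total

print(f"Total loans: {total}")
print(f"Default rate: {default_rate:.4f}")
print(f"Majority-class baseline accuracy: {majority_baseline:.4f}")
```

Because the classes are imbalanced, a classifier needs to beat roughly 65% accuracy before its accuracy says anything useful.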

### Loan purposes:

In [None]:
df.groupBy('loan_purpose').count().show()

+------------+-----+
|loan_purpose|count|
+------------+-----+
|      others| 6763|
|   emergency| 7562|
|    property|11388|
|  operations|10580|
|    personal|10458|
+------------+-----+



The most common loan purposes are property, operations, and personal, each with more than 10,000 loans.

# Data Transformation

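Apart from the `loan_id` identifier, which is not used as a feature, every variable in the dataframe is numerical except `loan_purpose`, so we must convert that column into numerical form. We first map each category to an index with `StringIndexer` and then one-hot encode the index with `OneHotEncoder`.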

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

loan_purpose_indexer = StringIndexer(inputCol='loan_purpose',
                                     outputCol='loan_index').fit(df)

df = loan_purpose_indexer.transform(df)

loan_encoder = OneHotEncoder(inputCol='loan_index',
                             outputCol='loan_purpose_vec')

df = loan_encoder.fit(df).transform(df)

In [None]:
df.select(['loan_purpose', 'loan_index', 'loan_purpose_vec']).show(12, False)
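
The output of this cell is not shown, so the following plain-Python sketch illustrates what the two transformers are expected to produce. It assumes the defaults of both classes: `StringIndexer` orders categories by descending frequency, and `OneHotEncoder` drops the last category (`dropLast=True`). The helper `one_hot_drop_last` is hypothetical and only mimics that behaviour; it is not part of PySpark.

```python
# Category counts copied from the groupBy('loan_purpose') output above.
purpose_counts = {
    "others": 6763,
    "emergency": 7562,
    "property": 11388,
    "operations": 10580,
    "personal": 10458,
}

# StringIndexer default ordering: the most frequent category gets index 0.
ordered = sorted(purpose_counts, key=purpose_counts.get, reverse=True)
index_of = {purpose: float(i) for i, purpose in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """Encode an index as a vector of length num_categories - 1.

    The last category is represented by all zeros, which mirrors
    OneHotEncoder's default dropLast=True.
    """
    vec = [0.0] * (num_categories - 1)
    if int(index) < num_categories - 1:
        vec[int(index)] = 1.0
    return vec


for purpose in ordered:
    idx = index_of[purpose]
    print(purpose, idx, one_hot_drop_last(idx, len(ordered)))
```

Under these assumptions `personal` maps to index 2 and the vector `[0.0, 0.0, 1.0, 0.0]`, which agrees with the last four entries of the feature vectors printed later for rows whose loan purpose is personal.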

Now that we have converted the original loan-purpose feature into vectorized form, we can use `VectorAssembler` to combine all input columns into a single feature vector for model training.

In [None]:
df_assembler = VectorAssembler(inputCols=['is_first_loan',
  'total_credit_card_limit', 'avg_percentage_credit_card_limit_used_last_year',
  'saving_amount', 'checking_amount', 'is_employed', 'yearly_salary', 'age',
  'dependent_number', 'loan_purpose_vec'], outputCol='features')

df = df_assembler.transform(df)
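
As a rough picture of what the assembler does, the sketch below concatenates scalar columns and a vector column into one flat list. The function `assemble` is a hypothetical stand-in written in plain Python, not the PySpark implementation, and the one-hot vector for `personal` is assumed to be `[0, 0, 1, 0]`.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


input_cols = [
    "is_first_loan", "total_credit_card_limit",
    "avg_percentage_credit_card_limit_used_last_year",
    "saving_amount", "checking_amount", "is_employed",
    "yearly_salary", "age", "dependent_number", "loan_purpose_vec",
]

# Values of the first row in the sample output (loan A_1, purpose 'personal').
row = {
    "is_first_loan": 1,
    "total_credit_card_limit": 7900,
    "avg_percentage_credit_card_limit_used_last_year": 0.8,
    "saving_amount": 1103,
    "checking_amount": 6393,
    "is_employed": 1,
    "yearly_salary": 16400,
    "age": 42,
    "dependent_number": 4,
    "loan_purpose_vec": [0, 0, 1, 0],  # assumed one-hot encoding of 'personal'
}

features = assemble(row, input_cols)
print(len(features), features)
```

The result has 13 entries: nine numeric columns followed by the four one-hot slots.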

In [None]:
df.select(['features', 'label']).show(10, False)

+--------------------------------------------------------------------+-----+
|features                                                            |label|
+--------------------------------------------------------------------+-----+
|[1.0,7900.0,0.8,1103.0,6393.0,1.0,16400.0,42.0,4.0,0.0,0.0,1.0,0.0] |0    |
|[0.0,3300.0,0.29,2588.0,832.0,1.0,75500.0,56.0,1.0,0.0,0.0,1.0,0.0] |0    |
|[0.0,7600.0,0.9,1651.0,8868.0,1.0,59000.0,46.0,1.0,0.0,0.0,1.0,0.0] |0    |
|[1.0,3400.0,0.38,1269.0,6863.0,1.0,26000.0,55.0,8.0,0.0,0.0,1.0,0.0]|0    |
|[0.0,2600.0,0.89,1310.0,3423.0,1.0,9700.0,41.0,4.0,0.0,0.0,0.0,1.0] |1    |
|[0.0,7600.0,0.51,1040.0,2406.0,1.0,22900.0,52.0,0.0,0.0,1.0,0.0,0.0]|0    |
|[1.0,6900.0,0.82,2408.0,5556.0,1.0,34800.0,48.0,4.0,0.0,1.0,0.0,0.0]|0    |
|[0.0,5700.0,0.56,1933.0,4139.0,1.0,32500.0,64.0,2.0,0.0,0.0,1.0,0.0]|0    |
|[1.0,3400.0,0.95,3866.0,4131.0,1.0,13300.0,23.0,3.0,0.0,0.0,1.0,0.0]|0    |
|[0.0,2900.0,0.91,88.0,2725.0,1.0,21100.0,52.0,1.0,0.0,0.0,1.0,0.0]  |1    |

We now create a new dataframe with just two columns, `features` and `label`:

In [None]:
model_df = df.select(['features', 'label'])

# Splitting into Train and Test Data

In [None]:
train, test = model_df.randomSplit([0.75, 0.25])
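
Note that `randomSplit` also accepts an optional `seed` argument. The call above does not set one, so the split, and therefore the metrics below, can differ from run to run. The following plain-Python sketch shows the general idea of a seeded random split in which each row is assigned independently; it is a conceptual illustration with a hypothetical `random_split` helper, not Spark's actual implementation.

```python
import random


def random_split(rows, train_weight, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train_rows, test_rows = [], []
    for row in rows:
        (train_rows if rng.random() < train_weight else test_rows).append(row)
    return train_rows, test_rows


rows = list(range(1000))  # stand-in for dataframe rows
train_a, test_a = random_split(rows, 0.75, seed=42)
train_b, test_b = random_split(rows, 0.75, seed=42)

# Split sizes are approximate because each row is drawn independently.
print(len(train_a), len(test_a))
# The same seed reproduces the same split.
print(train_a == train_b)
```

With this kind of row-by-row assignment, the 75/25 proportions hold only approximately.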

# Model Training

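First, we try logistic regression with default parameters. The metrics printed below come from the model's training summary, so they describe the fit on the training data: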

In [None]:
from pyspark.ml.classification import LogisticRegression

logreg = LogisticRegression().fit(train)

lr_summary = logreg.summary
print('Accuracy:', lr_summary.accuracy)
print('Area under ROC:', lr_summary.areaUnderROC)
print('Precision by label:', lr_summary.precisionByLabel)
print('Recall by label:', lr_summary.recallByLabel)

Accuracy: 0.894044262155003
Area under ROC: 0.9592178824818095
Precision by label: [0.9232266053422962, 0.8412418980555333]
Recall by label: [0.9132108486439195, 0.8582741448281492]


In [None]:
predictions = logreg.transform(test)
predictions.show(10)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13,[0,1,2,3,4,7]...|    0|[5.06418659849462...|[0.99372063090264...|       0.0|
|(13,[0,1,2,3,4,7,...|    1|[-5.1060047529926...|[0.00602374144419...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-6.4426416280280...|[0.00158966407769...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-6.8118823112913...|[0.00109940920997...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-6.4840701706600...|[0.00152525055266...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-6.4518139839248...|[0.00157517264245...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-5.2204719231625...|[0.00537572329424...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-6.1079316892328...|[0.00222020806256...|       1.0|
|(13,[0,1,2,3,4,7,...|    1|[-5.0247832041829...|[0.00653008899282...|       1.0|
|(13,[0,1,2,3,4,
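
The `rawPrediction` and `probability` columns are related by the logistic (sigmoid) function. The short check below uses plain Python and the first components of the first two rows exactly as displayed (the table truncates them, so agreement is only approximate). The 0.5 cut-off is assumed to be the default threshold.

```python
import math


def sigmoid(z):
    """Logistic function mapping a raw score to a probability."""
    return 1.0 / (1.0 + math.exp(-z))


# First components of rawPrediction and probability, as displayed
# (and truncated) in the first two rows of the table above.
displayed = [
    (5.06418659849462, 0.99372063090264),
    (-5.1060047529926, 0.00602374144419),
]

for raw, shown_probability in displayed:
    p0 = sigmoid(raw)  # probability of label 0
    predicted_label = 0.0 if p0 > 0.5 else 1.0
    print(f"raw={raw:+.4f}  sigmoid={p0:.6f}  shown={shown_probability:.6f}  "
          f"prediction={predicted_label}")
```

A large positive raw score for label 0 gives a probability near 1 and a prediction of 0.0, while a large negative one gives a prediction of 1.0, in line with the `prediction` column.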

In [None]:
# evaluate() scores the test set and returns a summary object with the metrics,
# so a separate transform() call is not needed here.
model_pred = logreg.evaluate(test)

In [None]:
print('Accuracy:', model_pred.accuracy)
print('Area under ROC:', model_pred.areaUnderROC)
print('Precision by label:', model_pred.precisionByLabel)
print('Recall by label:', model_pred.recallByLabel)

Accuracy: 0.8942621542690259
Area under ROC: 0.9580796607403271
Precision by label: [0.9244315941648048, 0.8373419290850483]
Recall by label: [0.9146944083224967, 0.854504048582996]
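
One way to read the area under the ROC curve is as the share of (positive, negative) pairs in which the positive example receives the higher score. The sketch below computes it that way in plain Python on invented toy data. It is a quadratic-time illustration with a hypothetical `area_under_roc` helper, not the method Spark uses internally.

```python
def area_under_roc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy data, invented for illustration only.
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
print(area_under_roc(toy_labels, toy_scores))
```

Here three of the four positive/negative pairs are ordered correctly, so the toy AUC is 0.75. A value near 0.96, as reported above, means almost every such pair is ordered correctly.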


# Hyperparameter Tuning

The baseline model reaches about 89% accuracy on the test data. Next we build a more sophisticated model, a random forest, which is an ensemble method that combines many decision trees and can often improve prediction accuracy.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier()
rf_model = rf.fit(train)

Using cross-validation, we now search for the best hyperparameters for this model. `BinaryClassificationEvaluator` scores each candidate by area under the ROC curve, which is its default metric.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
paramGrid = (ParamGridBuilder()
  .addGrid(rf.maxDepth, [5, 10, 20, 25, 30])
  .addGrid(rf.maxBins, [20, 30, 40])
  .addGrid(rf.numTrees, [5, 20, 50])
  .build())

In [None]:
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)
cv_model = cv.fit(train)
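
This search can take a long time, because every combination in the grid is trained once per fold. The plain-Python sketch below counts the work involved, using the same candidate values as the grid above; the names `grid_values` and `combinations` are illustrative only.

```python
from itertools import product

# Same candidate values as the ParamGridBuilder call above.
grid_values = {
    "maxDepth": [5, 10, 20, 25, 30],
    "maxBins": [20, 30, 40],
    "numTrees": [5, 20, 50],
}
num_folds = 5

# Every combination of one value per hyperparameter.
combinations = [dict(zip(grid_values, values))
                for values in product(*grid_values.values())]

cv_fits = len(combinations) * num_folds
print(f"{len(combinations)} combinations x {num_folds} folds = {cv_fits} fits")
print("First combination:", combinations[0])
```

After these cross-validation fits, `CrossValidator` refits the best combination once more on the full training set. Reducing the grid or the number of folds is the simplest way to shorten the run.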

# Best Model

In [None]:
best_rf_model = cv_model.bestModel
model_pred = best_rf_model.transform(test)

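# Count true positives, actual positives, and predicted positives on the test set.
true_pos = model_pred.filter((model_pred['label'] == 1) &
                             (model_pred['prediction'] == 1)).count()
actual_pos = model_pred.filter(model_pred['label'] == 1).count()
pred_pos = model_pred.filter(model_pred['prediction'] == 1).count()

# Recall: share of actual defaulters that the model flags.
recall_rate = float(true_pos) / actual_pos
print(recall_rate)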

With the best hyperparameters found by cross-validation, the random forest's recall rate improves on the baseline logistic regression model, whose recall for label 1 on the test data was about 0.85.
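
The cell above also counts the predicted positives (`pred_pos`) without using them; dividing the true positives by that count gives precision. The sketch below shows both metrics side by side in plain Python on invented toy data. The `precision_recall` helper is hypothetical and is not part of the notebook.

```python
def precision_recall(labels, predictions, positive=1):
    """Return (precision, recall) for the given positive class."""
    true_pos = sum(1 for y, p in zip(labels, predictions)
                   if y == positive and p == positive)
    actual_pos = sum(1 for y in labels if y == positive)
    pred_pos = sum(1 for p in predictions if p == positive)
    precision = true_pos / pred_pos if pred_pos else 0.0
    recall = true_pos / actual_pos if actual_pos else 0.0
    return precision, recall


# Toy labels and predictions, invented for illustration only.
toy_labels = [1, 1, 1, 0, 0, 0, 1, 0]
toy_predictions = [1, 0, 1, 0, 1, 0, 1, 0]

precision, recall = precision_recall(toy_labels, toy_predictions)
print(f"precision={precision:.2f} recall={recall:.2f}")
```

Reporting precision next to recall guards against a model that raises recall simply by flagging more loans as defaults.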

# Conclusion

This chapter reviewed some transformation techniques in PySpark and ways to compute summary statistics. You saw how to build a machine learning model from scratch and how to tune hyperparameters to choose the best parameters for a model.