### Logistic Regression:
    classification problem, we are trying to predict results in a
    discrete output. In other words, we are trying to map input variables into
    discrete categories.
    
    Bank Loan Prediction

# Change Working Directory in Python


In [1]:
import os
import sys
 
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
#import Sparksession driver
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Classification of Bank Loan Prediction Dataset ") \
    .getOrCreate()

In [3]:
df = spark.read.csv('logistic/classificationspark.csv',inferSchema=True,header = True)
df.show()

+----+---+----------+------+-----+--------+--------+--------+--------+-----------+-----------+---------------+--------------------+------------+--------+------------+
| _c0|Age|Experience|Income|CCAvg|Mortgage|Family_2|Family_3|Family_4|Education_2|Education_3|Personal_Loan_1|Securities_Account_1|CD Account_1|Online_1|CreditCard_1|
+----+---+----------+------+-----+--------+--------+--------+--------+-----------+-----------+---------------+--------------------+------------+--------+------------+
|2764| 31|         5|    84|  2.9|     105|       0|       0|       0|          0|          1|              0|                   0|           0|       0|           1|
|4767| 35|         9|    45|  0.9|     101|       0|       1|       0|          0|          0|              0|                   1|           0|       0|           0|
|3814| 34|         9|    35|  1.3|       0|       0|       1|       0|          0|          0|              0|                   0|           0|       0|           0

In [4]:
#Check for missing values
for col in df.columns:
    print("no. of cells in column", col, "with null values:", df.filter(df[col].isNull()).count())

no. of cells in column _c0 with null values: 0
no. of cells in column Age with null values: 0
no. of cells in column Experience with null values: 0
no. of cells in column Income with null values: 0
no. of cells in column CCAvg with null values: 0
no. of cells in column Mortgage with null values: 0
no. of cells in column Family_2 with null values: 0
no. of cells in column Family_3 with null values: 0
no. of cells in column Family_4 with null values: 0
no. of cells in column Education_2 with null values: 0
no. of cells in column Education_3 with null values: 0
no. of cells in column Personal_Loan_1 with null values: 0
no. of cells in column Securities_Account_1 with null values: 0
no. of cells in column CD Account_1 with null values: 0
no. of cells in column Online_1 with null values: 0
no. of cells in column CreditCard_1 with null values: 0


In [5]:
#all the independent variables need to be packed into one column of vector type
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Age','Experience','Income','CCAvg','Mortgage','Family_2','Family_3','Family_4','Education_2',
                                       'Education_3','Securities_Account_1','CD Account_1','Online_1','CreditCard_1'], 
                            outputCol="features")
feature_vec=assembler.transform(df).select('features','Personal_Loan_1')
feature_vec.show(5)

+--------------------+---------------+
|            features|Personal_Loan_1|
+--------------------+---------------+
|(14,[0,1,2,3,4,9,...|              0|
|(14,[0,1,2,3,4,6,...|              0|
|(14,[0,1,2,3,6],[...|              0|
|(14,[0,1,2,3,4,12...|              0|
|(14,[0,1,2,3,4,6,...|              0|
+--------------------+---------------+
only showing top 5 rows



In [6]:
#Count of target classes
feature_vec.groupBy('Personal_Loan_1').count().show()
#there is data imbalance

+---------------+-----+
|Personal_Loan_1|count|
+---------------+-----+
|              1|  480|
|              0| 4520|
+---------------+-----+



In [7]:
# Split the data into train and test sets
train_data, test_data = feature_vec.randomSplit([.75,.25],seed=0)

In [8]:
from pyspark.ml.classification import RandomForestClassifier
#Grid Search
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
rf = RandomForestClassifier( labelCol='Personal_Loan_1',seed=0)
paramGrid = (ParamGridBuilder()\
             .addGrid(rf.maxDepth,[10,11,12])\
             .addGrid(rf.numTrees,[20,30,40])\
             .build())

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='Personal_Loan_1', metricName='f1')
# Create 4-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4)

cvModel = cv.fit(train_data)

In [9]:
#Best Model Params
score_params_list = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
max(score_params_list,key=lambda item:item[0])

(0.9838957430065822,
 {Param(parent='RandomForestClassifier_4bd98dd525c2f3495467', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 11,
  Param(parent='RandomForestClassifier_4bd98dd525c2f3495467', name='numTrees', doc='Number of trees to train (>= 1).'): 30})

In [10]:
predictions = cvModel.transform(test_data)
evaluator.evaluate(predictions) 

0.9806296571649721