
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from time import time

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.storagelevel import StorageLevel

In [0]:
IS_SPARK_SUBMIT_CLI = True
if IS_SPARK_SUBMIT_CLI:
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

In [0]:
# Limit the log
spark.sparkContext.setLogLevel("WARN")

In [0]:
# File location and type
file_location = "/FileStore/tables/loan_sample5000.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,emp_title,emp_length,home_ownership,annual_inc,verification_status,issue_d,loan_status,pymnt_plan,url,desc,purpose,title,zip_code,addr_state,dti,delinq_2yrs,earliest_cr_line,fico_range_low,fico_range_high,inq_last_6mths,mths_since_last_delinq,mths_since_last_record,open_acc,pub_rec,revol_bal,revol_util,total_acc,initial_list_status,out_prncp,out_prncp_inv,total_pymnt,total_pymnt_inv,total_rec_prncp,total_rec_int,total_rec_late_fee,recoveries,collection_recovery_fee,last_pymnt_d,last_pymnt_amnt,next_pymnt_d,last_credit_pull_d,last_fico_range_high,last_fico_range_low,collections_12_mths_ex_med,mths_since_last_major_derog,policy_code,application_type,annual_inc_joint,dti_joint,verification_status_joint,acc_now_delinq,tot_coll_amt,tot_cur_bal,open_acc_6m,open_act_il,open_il_12m,open_il_24m,mths_since_rcnt_il,total_bal_il,il_util,open_rv_12m,open_rv_24m,max_bal_bc,all_util,total_rev_hi_lim,inq_fi,total_cu_tl,inq_last_12m,acc_open_past_24mths,avg_cur_bal,bc_open_to_buy,bc_util,chargeoff_within_12_mths,delinq_amnt,mo_sin_old_il_acct,mo_sin_old_rev_tl_op,mo_sin_rcnt_rev_tl_op,mo_sin_rcnt_tl,mort_acc,mths_since_recent_bc,mths_since_recent_bc_dlq,mths_since_recent_inq,mths_since_recent_revol_delinq,num_accts_ever_120_pd,num_actv_bc_tl,num_actv_rev_tl,num_bc_sats,num_bc_tl,num_il_tl,num_op_rev_tl,num_rev_accts,num_rev_tl_bal_gt_0,num_sats,num_tl_120dpd_2m,num_tl_30dpd,num_tl_90g_dpd_24m,num_tl_op_past_12m,pct_tl_nvr_dlq,percent_bc_gt_75,pub_rec_bankruptcies,tax_liens,tot_hi_cred_lim,total_bal_ex_mort,total_bc_limit,total_il_high_credit_limit,revol_bal_joint,sec_app_fico_range_low,sec_app_fico_range_high,sec_app_earliest_cr_line,sec_app_inq_last_6mths,sec_app_mort_acc,sec_app_open_acc,sec_app_revol_util,sec_app_open_act_il,sec_app_num_rev_accts,sec_app_chargeoff_within_12_mths,sec_app_collections_12_mths_ex_med,sec_app_mths_since_last_major_derog,hardship_flag,hardship_type,hardship_reason,hardship_status,deferral_term,hardship_amount,hardship_start_date,hardship_end_date,payment_plan_start_date,hardship_length,hardship_dpd,hardship_loan_status,orig_projected_additional_accrued_interest,hardship_payoff_balance_amount,hardship_last_payment_amount,disbursement_method,debt_settlement_flag,debt_settlement_flag_date,settlement_status,settlement_date,settlement_amount,settlement_percentage,settlement_term
68407277,,3600.0,3600.0,3600.0,36 months,13.99,123.03,C,C4,leadman,10+ years,MORTGAGE,55000.0,Not Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68407277,,debt_consolidation,Debt consolidation,190xx,PA,5.91,0.0,Aug-2003,675.0,679.0,1.0,30.0,,7.0,0.0,2765.0,29.7,13.0,w,0.0,0.0,4421.723916800001,4421.72,3600.0,821.72,0.0,0.0,0.0,Jan-2019,122.67,,Mar-2019,564.0,560.0,0.0,30.0,1.0,Individual,,,,0.0,722.0,144904.0,2.0,2.0,0.0,1.0,21.0,4981.0,36.0,3.0,3.0,722.0,34.0,9300.0,3.0,1.0,4.0,4.0,20701.0,1506.0,37.2,0.0,0.0,148.0,128.0,3.0,3.0,1.0,4.0,69.0,4.0,69.0,2.0,2.0,4.0,2.0,5.0,3.0,4.0,9.0,4.0,7.0,0.0,0.0,0.0,3.0,76.9,0.0,0.0,0.0,178050.0,7746.0,2400.0,13734.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
68355089,,24700.0,24700.0,24700.0,36 months,11.99,820.28,C,C1,Engineer,10+ years,MORTGAGE,65000.0,Not Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68355089,,small_business,Business,577xx,SD,16.06,1.0,Dec-1999,715.0,719.0,4.0,6.0,,22.0,0.0,21470.0,19.2,38.0,w,0.0,0.0,25679.66,25679.66,24700.0,979.66,0.0,0.0,0.0,Jun-2016,926.35,,Mar-2019,699.0,695.0,0.0,,1.0,Individual,,,,0.0,0.0,204396.0,1.0,1.0,0.0,1.0,19.0,18005.0,73.0,2.0,3.0,6472.0,29.0,111800.0,0.0,0.0,6.0,4.0,9733.0,57830.0,27.1,0.0,0.0,113.0,192.0,2.0,2.0,4.0,2.0,,0.0,6.0,0.0,5.0,5.0,13.0,17.0,6.0,20.0,27.0,5.0,22.0,0.0,0.0,0.0,2.0,97.4,7.7,0.0,0.0,314017.0,39475.0,79300.0,24667.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
68341763,,20000.0,20000.0,20000.0,60 months,10.78,432.66,B,B4,truck driver,10+ years,MORTGAGE,63000.0,Not Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68341763,,home_improvement,,605xx,IL,10.78,0.0,Aug-2000,695.0,699.0,0.0,,,6.0,0.0,7869.0,56.2,18.0,w,0.0,0.0,22705.9242938784,22705.92,20000.0,2705.92,0.0,0.0,0.0,Jun-2017,15813.3,,Mar-2019,704.0,700.0,0.0,,1.0,Joint App,71000.0,13.85,Not Verified,0.0,0.0,189699.0,0.0,1.0,0.0,4.0,19.0,10827.0,73.0,0.0,2.0,2081.0,65.0,14000.0,2.0,5.0,1.0,6.0,31617.0,2737.0,55.9,0.0,0.0,125.0,184.0,14.0,14.0,5.0,101.0,,10.0,,0.0,2.0,3.0,2.0,4.0,6.0,4.0,7.0,3.0,6.0,0.0,0.0,0.0,0.0,100.0,50.0,0.0,0.0,218418.0,18696.0,6200.0,14877.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
66310712,,35000.0,35000.0,35000.0,60 months,14.85,829.9,C,C5,Information Systems Officer,10+ years,MORTGAGE,110000.0,Source Verified,Dec-2015,Current,n,https://lendingclub.com/browse/loanDetail.action?loan_id=66310712,,debt_consolidation,Debt consolidation,076xx,NJ,17.06,0.0,Sep-2008,785.0,789.0,0.0,,,13.0,0.0,7802.0,11.6,17.0,w,15897.65,15897.65,31464.01,31464.01,19102.35,12361.66,0.0,0.0,0.0,Feb-2019,829.9,Apr-2019,Mar-2019,679.0,675.0,0.0,,1.0,Individual,,,,0.0,0.0,301500.0,1.0,1.0,0.0,1.0,23.0,12609.0,70.0,1.0,1.0,6987.0,45.0,67300.0,0.0,1.0,0.0,2.0,23192.0,54962.0,12.1,0.0,0.0,36.0,87.0,2.0,2.0,1.0,2.0,,,,0.0,4.0,5.0,8.0,10.0,2.0,10.0,13.0,5.0,13.0,0.0,0.0,0.0,1.0,100.0,0.0,0.0,0.0,381215.0,52226.0,62500.0,18000.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
68476807,,10400.0,10400.0,10400.0,60 months,22.45,289.91,F,F1,Contract Specialist,3 years,MORTGAGE,104433.0,Source Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68476807,,major_purchase,Major purchase,174xx,PA,25.37,1.0,Jun-1998,695.0,699.0,3.0,12.0,,12.0,0.0,21929.0,64.5,35.0,w,0.0,0.0,11740.5,11740.5,10400.0,1340.5,0.0,0.0,0.0,Jul-2016,10128.96,,Mar-2018,704.0,700.0,0.0,,1.0,Individual,,,,0.0,0.0,331730.0,1.0,3.0,0.0,3.0,14.0,73839.0,84.0,4.0,7.0,9702.0,78.0,34000.0,2.0,1.0,3.0,10.0,27644.0,4567.0,77.5,0.0,0.0,128.0,210.0,4.0,4.0,6.0,4.0,12.0,1.0,12.0,0.0,4.0,6.0,5.0,9.0,10.0,7.0,19.0,6.0,12.0,0.0,0.0,0.0,4.0,96.6,60.0,0.0,0.0,439570.0,95768.0,20300.0,88097.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
68426831,,11950.0,11950.0,11950.0,36 months,13.44,405.18,C,C3,Veterinary Tecnician,4 years,RENT,34000.0,Source Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68426831,,debt_consolidation,Debt consolidation,300xx,GA,10.2,0.0,Oct-1987,690.0,694.0,0.0,,,5.0,0.0,8822.0,68.4,6.0,w,0.0,0.0,13708.9485297572,13708.95,11950.0,1758.95,0.0,0.0,0.0,May-2017,7653.56,,May-2017,759.0,755.0,0.0,,1.0,Individual,,,,0.0,0.0,12798.0,0.0,1.0,0.0,0.0,338.0,3976.0,99.0,0.0,0.0,4522.0,76.0,12900.0,0.0,0.0,0.0,0.0,2560.0,844.0,91.0,0.0,0.0,338.0,54.0,32.0,32.0,0.0,36.0,,,,0.0,2.0,3.0,2.0,2.0,2.0,4.0,4.0,3.0,5.0,0.0,0.0,0.0,0.0,100.0,100.0,0.0,0.0,16900.0,12798.0,9400.0,4000.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
68476668,,20000.0,20000.0,20000.0,36 months,9.17,637.58,B,B2,Vice President of Recruiting Operations,10+ years,MORTGAGE,180000.0,Not Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68476668,,debt_consolidation,Debt consolidation,550xx,MN,14.67,0.0,Jun-1990,680.0,684.0,0.0,49.0,,12.0,0.0,87329.0,84.5,27.0,f,0.0,0.0,21393.800000011,21393.8,20000.0,1393.8,0.0,0.0,0.0,Nov-2016,15681.05,,Mar-2019,654.0,650.0,0.0,,1.0,Individual,,,,0.0,0.0,360358.0,0.0,2.0,0.0,2.0,18.0,29433.0,63.0,2.0,3.0,13048.0,74.0,94200.0,1.0,0.0,1.0,6.0,30030.0,0.0,102.9,0.0,0.0,142.0,306.0,10.0,10.0,4.0,12.0,,10.0,,0.0,4.0,6.0,4.0,5.0,7.0,9.0,16.0,6.0,12.0,0.0,0.0,0.0,2.0,96.3,100.0,0.0,0.0,388852.0,116762.0,31500.0,46452.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
67275481,,20000.0,20000.0,20000.0,36 months,8.49,631.26,B,B1,road driver,10+ years,MORTGAGE,85000.0,Not Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=67275481,,major_purchase,Major purchase,293xx,SC,17.61,1.0,Feb-1999,705.0,709.0,0.0,3.0,,8.0,0.0,826.0,5.7,15.0,w,0.0,0.0,21538.508976797,21538.51,20000.0,1538.51,0.0,0.0,0.0,Jan-2017,14618.23,,Mar-2019,674.0,670.0,0.0,3.0,1.0,Individual,,,,0.0,0.0,141601.0,0.0,3.0,0.0,4.0,13.0,27111.0,75.0,0.0,0.0,640.0,55.0,14500.0,1.0,0.0,2.0,4.0,17700.0,13674.0,5.7,0.0,0.0,149.0,55.0,32.0,13.0,3.0,32.0,,8.0,,1.0,2.0,2.0,3.0,3.0,9.0,3.0,3.0,2.0,8.0,0.0,0.0,1.0,0.0,93.3,0.0,0.0,0.0,193390.0,27937.0,14500.0,36144.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
68466926,,10000.0,10000.0,10000.0,36 months,6.49,306.45,A,A2,SERVICE MANAGER,6 years,RENT,85000.0,Not Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68466926,,credit_card,Credit card refinancing,160xx,PA,13.07,0.0,Apr-2002,685.0,689.0,1.0,,106.0,14.0,1.0,10464.0,34.5,23.0,w,0.0,0.0,10998.9715749644,10998.97,10000.0,998.97,0.0,0.0,0.0,Aug-2018,1814.48,,Mar-2019,719.0,715.0,0.0,,1.0,Individual,,,,0.0,8341.0,27957.0,2.0,1.0,0.0,0.0,35.0,17493.0,57.0,2.0,7.0,2524.0,46.0,30300.0,2.0,0.0,1.0,7.0,1997.0,8182.0,50.1,0.0,0.0,164.0,129.0,1.0,1.0,1.0,4.0,,1.0,,0.0,6.0,9.0,7.0,10.0,3.0,13.0,19.0,9.0,14.0,0.0,0.0,0.0,2.0,95.7,28.6,1.0,0.0,61099.0,27957.0,16400.0,30799.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,
68616873,,8000.0,8000.0,8000.0,36 months,11.48,263.74,B,B5,Vendor liaison,10+ years,MORTGAGE,42000.0,Not Verified,Dec-2015,Fully Paid,n,https://lendingclub.com/browse/loanDetail.action?loan_id=68616873,,credit_card,Credit card refinancing,029xx,RI,34.8,0.0,Nov-1994,700.0,704.0,0.0,75.0,,8.0,0.0,7034.0,39.1,18.0,w,0.0,0.0,8939.5805031401,8939.58,8000.0,939.58,0.0,0.0,0.0,Apr-2017,4996.24,,Nov-2018,679.0,675.0,0.0,75.0,1.0,Individual,,,,0.0,0.0,199696.0,0.0,2.0,2.0,3.0,10.0,106748.0,72.0,0.0,2.0,4725.0,49.0,18000.0,0.0,0.0,1.0,5.0,28528.0,9966.0,41.4,0.0,0.0,155.0,253.0,15.0,10.0,1.0,50.0,,10.0,,1.0,3.0,3.0,3.0,6.0,5.0,5.0,11.0,3.0,8.0,0.0,0.0,0.0,2.0,94.4,33.3,0.0,0.0,256513.0,113782.0,17000.0,135513.0,,,,,,,,,,,,,,N,,,,,,,,,,,,,,,Cash,N,,,,,,


In [0]:
df = spark.read.csv('/user/sbelurm/loan_sample1000.csv', inferSchema=True, header=True)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3585998245785486>:1[0m
[0;32m----> 1[0m df [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43mread[49m[38;5;241;43m.[39;49m[43mcsv[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43m/user/sbelurm/loan_sample1000.csv[39;49m[38;5;124;43m'[39;49m[43m,[49m[43m [49m[43minferSchema[49m[38;5;241;43m=[39;49m[38;5;28;43;01mTrue[39;49;00m[43m,[49m[43m [49m[43mheader[49m[38;5;241;43m=[39;49m[38;5;28;43;01mTrue[39;49;00m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([

In [0]:
print('count:', df.count())

count: 5000


In [0]:
df = df.select("loan_amnt", "int_rate", "installment", "annual_inc", "fico_range_low", "fico_range_high", "grade", "loan_status")
df.show(5)

+---------+--------+-----------+----------+--------------+---------------+-----+-----------+
|loan_amnt|int_rate|installment|annual_inc|fico_range_low|fico_range_high|grade|loan_status|
+---------+--------+-----------+----------+--------------+---------------+-----+-----------+
|   3600.0|   13.99|     123.03|   55000.0|         675.0|          679.0|    C| Fully Paid|
|  24700.0|   11.99|     820.28|   65000.0|         715.0|          719.0|    C| Fully Paid|
|  20000.0|   10.78|     432.66|   63000.0|         695.0|          699.0|    B| Fully Paid|
|  35000.0|   14.85|      829.9|  110000.0|         785.0|          789.0|    C|    Current|
|  10400.0|   22.45|     289.91|  104433.0|         695.0|          699.0|    F| Fully Paid|
+---------+--------+-----------+----------+--------------+---------------+-----+-----------+
only showing top 5 rows



In [0]:
df = df.withColumn("annual_inc", col("annual_inc").cast("double"))
df = df.withColumn("fico_range_low", col("fico_range_low").cast("double"))
df = df.withColumn("fico_range_high", col("fico_range_high").cast("double"))

df.printSchema()

root
 |-- loan_amnt: double (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- fico_range_low: double (nullable = true)
 |-- fico_range_high: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- loan_status: string (nullable = true)



In [0]:
grade_mapping = {"A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7}

df = df.withColumn("grade_numeric", when(df["grade"] == "A", 1)
                                    .when(df["grade"] == "B", 2)
                                    .when(df["grade"] == "C", 3)
                                    .when(df["grade"] == "D", 4)
                                    .when(df["grade"] == "E", 5)
                                    .when(df["grade"] == "F", 6)
                                    .when(df["grade"] == "G", 7)
                                    .otherwise(8))

df = df.withColumn("grade_numeric", df["grade_numeric"].cast(IntegerType()))

In [0]:
loan_amnt = df.agg({'loan_amnt': 'mean'}).collect()[0][0]
df = df.fillna(loan_amnt, subset = ["loan_amnt"])

int_rate = df.agg({'int_rate': 'mean'}).collect()[0][0]
df = df.fillna(int_rate, subset = ["int_rate"])

installment = df.agg({'installment': 'mean'}).collect()[0][0]
df = df.fillna(installment, subset = ["installment"])

annual_inc = df.agg({'annual_inc': 'mean'}).collect()[0][0]
df = df.fillna(annual_inc, subset = ["annual_inc"])

fico_range_low = df.agg({'fico_range_low': 'mean'}).collect()[0][0]
df = df.fillna(fico_range_low, subset = ["fico_range_low"])

fico_range_high = df.agg({'fico_range_high': 'mean'}).collect()[0][0]
df = df.fillna(fico_range_high, subset = ["fico_range_high"])

df.show(5)

+---------+--------+-----------+----------+--------------+---------------+-----+-----------+-------------+
|loan_amnt|int_rate|installment|annual_inc|fico_range_low|fico_range_high|grade|loan_status|grade_numeric|
+---------+--------+-----------+----------+--------------+---------------+-----+-----------+-------------+
|   3600.0|   13.99|     123.03|   55000.0|         675.0|          679.0|    C| Fully Paid|            3|
|  24700.0|   11.99|     820.28|   65000.0|         715.0|          719.0|    C| Fully Paid|            3|
|  20000.0|   10.78|     432.66|   63000.0|         695.0|          699.0|    B| Fully Paid|            2|
|  35000.0|   14.85|      829.9|  110000.0|         785.0|          789.0|    C|    Current|            3|
|  10400.0|   22.45|     289.91|  104433.0|         695.0|          699.0|    F| Fully Paid|            6|
+---------+--------+-----------+----------+--------------+---------------+-----+-----------+-------------+
only showing top 5 rows



In [0]:
df = df.na.replace("nan", None)
df = df.dropna()

In [0]:
from pyspark.sql.functions import when

df = df.withColumn("loan_status",
                   when(df["loan_status"].isin("Fully Paid", "Current"), "0")
                   .otherwise("1"))

In [0]:
df.show(5)

+---------+--------+-----------+----------+--------------+---------------+-----+-----------+-------------+
|loan_amnt|int_rate|installment|annual_inc|fico_range_low|fico_range_high|grade|loan_status|grade_numeric|
+---------+--------+-----------+----------+--------------+---------------+-----+-----------+-------------+
|   3600.0|   13.99|     123.03|   55000.0|         675.0|          679.0|    C|          0|            3|
|  24700.0|   11.99|     820.28|   65000.0|         715.0|          719.0|    C|          0|            3|
|  20000.0|   10.78|     432.66|   63000.0|         695.0|          699.0|    B|          0|            2|
|  35000.0|   14.85|      829.9|  110000.0|         785.0|          789.0|    C|          0|            3|
|  10400.0|   22.45|     289.91|  104433.0|         695.0|          699.0|    F|          0|            6|
+---------+--------+-----------+----------+--------------+---------------+-----+-----------+-------------+
only showing top 5 rows



In [0]:
df_0 = df.filter(col("loan_status") == 0)
df_1 = df.filter(col("loan_status") == 1)

count_1 = df_1.count()
df_0_sampled = df_0.sample(False, count_1 / df_0.count())

df = df_0_sampled.union(df_1)

In [0]:
num_zeros = df.filter(df.loan_status == 0).count()

# Print the result
print("Number of zeros in loan_status column:", num_zeros)

Number of zeros in loan_status column: 849


In [0]:
print(df.dtypes)

[('loan_amnt', 'double'), ('int_rate', 'double'), ('installment', 'double'), ('annual_inc', 'double'), ('fico_range_low', 'double'), ('fico_range_high', 'double'), ('grade', 'string'), ('loan_status', 'string'), ('grade_numeric', 'int')]


In [0]:
print('count:', df.count())

count: 1698


In [0]:
splits = df.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 1183  Testing Rows: 515


In [0]:
#numVect = VectorAssembler(inputCols = ["loan_amnt", "int_rate", "installment", "annual_inc", "fico_range_low", "fico_range_high"], outputCol="numFeatures")
numVect = VectorAssembler(inputCols = ["loan_amnt", "int_rate", "installment", "annual_inc", "fico_range_low", "fico_range_high", "grade_numeric"], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")

#grade_stringIdx = StringIndexer(inputCol = "grade", outputCol = "grade_index", stringOrderType= "alphabetAsc")

#featVect = VectorAssembler(inputCols=["numFeatures", "grade_index"], outputCol="features")
featVect = VectorAssembler(inputCols=["numFeatures"], outputCol="features")

label_stringIdx = StringIndexer(inputCol = "loan_status", outputCol = "label", stringOrderType= "alphabetAsc")

#### Logistic Regression

In [0]:
lr = LogisticRegression(labelCol="label",featuresCol="features")
#pipeline_lr = Pipeline(stages=[numVect, minMax, grade_stringIdx, featVect, label_stringIdx, lr])
pipeline_lr = Pipeline(stages=[numVect, minMax, featVect, label_stringIdx, lr])


In [0]:
paramGrid_lr = (ParamGridBuilder() \
             .addGrid(lr.regParam, [0.01, 0.5]) \
             .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
             .addGrid(lr.maxIter, [5, 15]) \
             .build())

In [0]:
start = time()

#cv_lr = CrossValidator(estimator=pipeline_lr, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid_lr)
#model_lr = cv_lr.fit(train)
tv_lr = TrainValidationSplit(estimator=pipeline_lr, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid_lr, trainRatio=0.7)
model_lr = tv_lr.fit(train)

end = time()
phrase = 'Logistic Regression testing'
print('{} takes {} seconds'.format(phrase, (end - start))) #round(end - start, 2)))

Logistic Regression testing takes 74.62507653236389 seconds


In [0]:
predictions_lr = model_lr.transform(test)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
auc = evaluator.evaluate(predictions_lr)

print("Logistic Regression")
print("AUC:", auc)

Logistic Regression
AUC: 0.6841025641025642


In [0]:
predictionAndLabels_lr = predictions_lr.select("prediction", "label").rdd
metrics_lr = MulticlassMetrics(predictionAndLabels_lr)
confusion_matrix_lr = metrics_lr.confusionMatrix().toArray()
print("Logistic Regression_Confusion matrix:")
print(confusion_matrix_lr)



Logistic Regression_Confusion matrix:
[[165.  95.]
 [ 98. 157.]]


In [0]:
predicted = predictions_lr.select("features", "prediction", "label")
#predicted.show(100, truncate=False)

In [0]:
tp = float(predicted.filter("prediction == 1.0 AND label == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND label == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND label == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND label == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", (tp + tn) / (tp + fp + tn + fn)),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])

metrics.show() 

+---------+-----------------+
|   metric|            value|
+---------+-----------------+
|       TP|            157.0|
|       FP|             95.0|
|       TN|            165.0|
|       FN|             98.0|
| Accuracy|0.625242718446602|
|Precision|0.623015873015873|
|   Recall|0.615686274509804|
+---------+-----------------+



#### Random Forest

In [0]:
rf = RandomForestClassifier()
pipeline_rf = Pipeline(stages=[numVect, minMax, featVect, label_stringIdx, rf])

In [0]:
paramGrid_rf = (ParamGridBuilder() \
             .addGrid(rf.maxDepth, [5, 15]) \
             .addGrid(rf.maxBins, [10, 20]) \
             .addGrid(rf.numTrees, [25, 50]) \
             .build())

In [0]:
start = time()

#cv_rf = CrossValidator(estimator=pipeline_rf, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid_rf)
#model_rf = cv_rf.fit(train)
tv_rf = TrainValidationSplit(estimator=pipeline_rf, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid_rf, trainRatio=0.7)
model_rf = tv_rf.fit(train)

end = time()
phrase = 'Random Forest testing'
print('{} takes {} seconds'.format(phrase, (end - start))) #round(end - start, 2)))

Random Forest testing takes 114.02939939498901 seconds


In [0]:
predictions_rf = model_rf.transform(test)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
auc = evaluator.evaluate(predictions_rf)

print("Random Forest")
print("AUC:", auc)

Random Forest
AUC: 0.6835746606334843


In [0]:
predictionAndLabels_rf = predictions_rf.select("prediction", "label").rdd
metrics_rf = MulticlassMetrics(predictionAndLabels_rf)
confusion_matrix_rf = metrics_rf.confusionMatrix().toArray()
print("Random Forest_Confusion matrix:")
print(confusion_matrix_rf)



Random Forest_Confusion matrix:
[[144. 116.]
 [ 80. 175.]]


In [0]:
predicted = predictions_rf.select("features", "prediction", "label")
#predicted.show(100, truncate=False)

In [0]:
tp = float(predicted.filter("prediction == 1.0 AND label == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND label == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND label == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND label == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", (tp + tn) / (tp + fp + tn + fn)),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])

metrics.show() 

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|             175.0|
|       FP|             116.0|
|       TN|             144.0|
|       FN|              80.0|
| Accuracy|0.6194174757281553|
|Precision|0.6013745704467354|
|   Recall|0.6862745098039216|
+---------+------------------+



#### Decision Tree

In [0]:
dt = DecisionTreeClassifier()
pipeline_dt = Pipeline(stages=[numVect, minMax, featVect, label_stringIdx, dt])

In [0]:
paramGrid_dt = (ParamGridBuilder()
             .addGrid(dt.impurity, ["gini", "entropy"])
             .addGrid(dt.maxDepth, [15, 25])
             .build())

In [0]:
start = time()

#cv_dt = CrossValidator(estimator=pipeline_dt, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid_dt)
#model_dt = cv_dt.fit(train)
tv_dt = TrainValidationSplit(estimator=pipeline_dt, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid_dt, trainRatio=0.7)
model_dt = tv_dt.fit(train)

end = time()
phrase = 'Decision Tree testing'
print('{} takes {} seconds'.format(phrase, (end - start))) #round(end - start, 2)))

Decision Tree testing takes 37.29272198677063 seconds


In [0]:
predictions_dt = model_dt.transform(test)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
auc = evaluator.evaluate(predictions_dt)

print("Decision Tree")
print("AUC:", auc)

Decision Tree
AUC: 0.5886349924585219


In [0]:
predictionAndLabels_dt = predictions_dt.select("prediction", "label").rdd
metrics_dt = MulticlassMetrics(predictionAndLabels_dt)
confusion_matrix_dt = metrics_dt.confusionMatrix().toArray()
print("Decision Tree_Confusion matrix:")
print(confusion_matrix_dt)

Decision Tree_Confusion matrix:
[[153. 107.]
 [113. 142.]]


In [0]:
predicted = predictions_dt.select("features", "prediction", "label")
#predicted.show(100, truncate=False)

In [0]:
tp = float(predicted.filter("prediction == 1.0 AND label == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND label == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND label == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND label == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", (tp + tn) / (tp + fp + tn + fn)),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])

metrics.show() 

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|             142.0|
|       FP|             107.0|
|       TN|             153.0|
|       FN|             113.0|
| Accuracy|0.5728155339805825|
|Precision| 0.570281124497992|
|   Recall|0.5568627450980392|
+---------+------------------+



#### GBT

In [0]:
gbt = GBTClassifier()
pipeline_gbt = Pipeline(stages=[numVect, minMax, featVect, label_stringIdx, gbt])

In [0]:
paramGrid_gbt = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [5, 15])
             .addGrid(gbt.maxBins, [10, 20])
             .addGrid(gbt.maxIter, [15])
             .build())

In [0]:
start = time()

#cv_gbt = CrossValidator(estimator=pipeline_gbt, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid_gbt)
#model_gbt = cv_gbt.fit(train)
tv_gbt = TrainValidationSplit(estimator=pipeline_gbt, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid_gbt, trainRatio=0.7)
model_gbt = tv_gbt.fit(train)

end = time()
phrase = 'Gradient-Boosted Tree Classifier testing'
print('{} takes {} seconds'.format(phrase, (end - start))) #round(end - start, 2)))

Gradient-Boosted Tree Classifier testing takes 328.6698832511902 seconds


In [0]:
predictions_gbt = model_gbt.transform(test)

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
auc = evaluator.evaluate(predictions_gbt)

print("GBT")
print("AUC:", auc)

GBT
AUC: 0.660739064856712


In [0]:
predictionAndLabels_gbt = predictions_gbt.select("prediction", "label").rdd
metrics_gbt = MulticlassMetrics(predictionAndLabels_gbt)
confusion_matrix_gbt = metrics_gbt.confusionMatrix().toArray()
print("GBT_Confusion matrix:")
print(confusion_matrix_gbt)

GBT_Confusion matrix:
[[151. 109.]
 [ 88. 167.]]


In [0]:
predicted = predictions_gbt.select("features", "prediction", "label")
#predicted.show(100, truncate=False)

In [0]:
tp = float(predicted.filter("prediction == 1.0 AND label == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND label == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND label == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND label == 1").count())
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Accuracy", (tp + tn) / (tp + fp + tn + fn)),
 ("Precision", tp / (tp + fp)),
 ("Recall", tp / (tp + fn))],["metric", "value"])

metrics.show() 

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|             167.0|
|       FP|             109.0|
|       TN|             151.0|
|       FN|              88.0|
| Accuracy|0.6174757281553398|
|Precision| 0.605072463768116|
|   Recall|0.6549019607843137|
+---------+------------------+

