## 1. Import Libraries

In [1]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import *

## 2. Load Data and Convert to Spark Data Frame

In [3]:
# All column names
sc.textFile("loan.csv").take(1)

[u'id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,emp_title,emp_length,home_ownership,annual_inc,verification_status,issue_d,loan_status,pymnt_plan,url,desc,purpose,title,zip_code,addr_state,dti,delinq_2yrs,earliest_cr_line,inq_last_6mths,mths_since_last_delinq,mths_since_last_record,open_acc,pub_rec,revol_bal,revol_util,total_acc,initial_list_status,out_prncp,out_prncp_inv,total_pymnt,total_pymnt_inv,total_rec_prncp,total_rec_int,total_rec_late_fee,recoveries,collection_recovery_fee,last_pymnt_d,last_pymnt_amnt,next_pymnt_d,last_credit_pull_d,collections_12_mths_ex_med,mths_since_last_major_derog,policy_code,application_type,annual_inc_joint,dti_joint,verification_status_joint,acc_now_delinq,tot_coll_amt,tot_cur_bal,open_acc_6m,open_il_6m,open_il_12m,open_il_24m,mths_since_rcnt_il,total_bal_il,il_util,open_rv_12m,open_rv_24m,max_bal_bc,all_util,total_rev_hi_lim,inq_fi,total_cu_tl,inq_last_12m']

In [4]:
# load data as dataframe
loan_df = spark.read.csv("loan.csv",header=True)
loan_df.rdd.getNumPartitions()
type(loan_df)

pyspark.sql.dataframe.DataFrame

Select useful columns to create smaller dataframe for future use

## 3. Create response variable and features

### 3.1 Remove some columns based on EDA results

In [5]:
loan_df1 = loan_df.drop('desc','mths_since_last_delinq','mths_since_last_record','next_pymnt_d',
                        'mths_since_last_major_derog','annual_inc_joint','dti_joint','verification_status_joint',
                        'open_acc_6m','open_il_6m','open_il_12m','open_il_24m','mths_since_rcnt_il','total_bal_il',
                        'il_util','open_rv_12m','open_rv_24m','max_bal_bc','all_util','inq_fi','total_cu_tl',
                        'inq_last_12m', # with a lot NA
                        'id','member_id','collection_recovery_fee','last_pymnt_amnt','last_pymnt_d','out_prncp','out_prncp_inv',
                        'pymnt_plan','recoveries','term','title','total_pymnt','total_pymnt_inv','total_rec_int',
                        'total_rec_late_fee','total_rec_prncp','url','verification_status', 'initial_list_status', 
                        'last_credit_pull_d','policy_code'# domain knowledge
                       )

In [6]:
loan_df1.head()

Row(loan_amnt=u'5000.0', funded_amnt=u'5000.0', funded_amnt_inv=u'4975.0', int_rate=u'10.65', installment=u'162.87', grade=u'B', sub_grade=u'B2', emp_title=None, emp_length=u'10+ years', home_ownership=u'RENT', annual_inc=u'24000.0', issue_d=u'Dec-2011', loan_status=u'Fully Paid', purpose=u'credit_card', zip_code=u'860xx', addr_state=u'AZ', dti=u'27.65', delinq_2yrs=u'0.0', earliest_cr_line=u'Jan-1985', inq_last_6mths=u'1.0', open_acc=u'3.0', pub_rec=u'0.0', revol_bal=u'13648.0', revol_util=u'83.7', total_acc=u'9.0', collections_12_mths_ex_med=u'0.0', application_type=u'INDIVIDUAL', acc_now_delinq=u'0.0', tot_coll_amt=None, tot_cur_bal=None, total_rev_hi_lim=None)

In [7]:
len(loan_df1.columns)

31

In [97]:
# Convert left columns into right format
loan_df2 = loan_df.select(
    loan_df.loan_amnt.cast("integer"),
    loan_df.funded_amnt.cast("integer"),
    loan_df.funded_amnt_inv.cast("integer"),
    loan_df.int_rate.cast("float"),
    loan_df.installment.cast("integer"),
    'grade',
    'sub_grade',
    'emp_title',
    'emp_length',
    'home_ownership',
    loan_df.annual_inc.cast("integer"),
    'issue_d',
    'loan_status', # response variable
    'purpose',
    'zip_code',
    'addr_state',
    loan_df.dti.cast("float"),
    loan_df.delinq_2yrs.cast("integer"),
    'earliest_cr_line',
    loan_df.inq_last_6mths.cast("integer"),
    loan_df.open_acc.cast("integer"),
    loan_df.pub_rec.cast("integer"),
    loan_df.revol_bal.cast("integer"),
    loan_df.revol_util.cast("float"),
    loan_df.total_acc.cast("integer"),
    loan_df.last_credit_pull_d.cast("integer"),
    'application_type',
    loan_df.acc_now_delinq.cast("integer"),
    loan_df.tot_coll_amt.cast("float"),
    loan_df.tot_cur_bal.cast("float"),
    loan_df.total_rev_hi_lim.cast("integer"),
    loan_df.collections_12_mths_ex_med.cast("integer"),
    'verification_status',
    'initial_list_status'
)


In [131]:
loan_df2.head()

Row(loan_amnt=5000, funded_amnt=5000, funded_amnt_inv=4975, int_rate=11, installment=163, grade=u'B', sub_grade=u'B2', emp_title=None, emp_length=u'10+ years', home_ownership=u'RENT', annual_inc=24000, issue_d=u'Dec-2011', loan_status=u'Fully Paid', purpose=u'credit_card', zip_code=u'860xx', addr_state=u'AZ', dti=27.649999618530273, delinq_2yrs=0, earliest_cr_line=u'Jan-1985', inq_last_6mths=1, open_acc=3, pub_rec=0, revol_bal=13648, revol_util=83.69999694824219, total_acc=9, last_credit_pull_d=None, application_type=u'INDIVIDUAL', acc_now_delinq=0, tot_coll_amt=None, tot_cur_bal=None, total_rev_hi_lim=None, collections_12_mths_ex_med=0, verification_status=u'Verified', initial_list_status=u'f')

### 3.2 Create response variable and remove rows with no valid response variable

In [98]:
def whetherpaid(x):
    if x in ['Default', 'Charged Off', 'Does not meet the credit policy. Status:Charged Off']:
        return 0
    elif x in ['Does not meet the credit policy. Status:Fully Paid', 'Fully Paid']:
        return 1
    else:
        return -1

In [99]:
paidflag = udf(lambda x: whetherpaid(x))

In [100]:
loan_df3 = loan_df2.withColumn('paid_flag',paidflag('loan_status')).where("paid_flag != -1").drop('loan_status')

### 3.3 Create features

#### 3.3.1 Creating a category feature for "Loan Purpose", "grade", "sub_grade", and "verification_status"

There are 14 types of loan purpose. StringIndexer encodes a string column of labels to a column of label indices, and most frequent label gets index 0.

In [133]:
indexer = StringIndexer(inputCol="purpose", outputCol="purposeIndex")
loan_df4 = indexer.fit(loan_df3).transform(loan_df3).drop('purpose')

convert_list = [      
               'grade',
               'sub_grade',
               'verification_status',
#                'zip_code',
#                'addr_state',
#                'initial_list_status'
                ]

for item in convert_list:
    indexer = StringIndexer(inputCol=item, outputCol=item + 'Index')    
    loan_df4 = indexer.fit(loan_df4).transform(loan_df4).drop(item)

####  3.3.2 Create a numeric feature for "emp_length"

In [192]:
import re
def convert_to_int(s):
    s = re.sub('\\D', '', s)
    
    try:
        return s
    except ValueError:
        return 'NaN'

def calculate_ratio(a, b):
    try:
        return a/float(b)
    except TypeError:
        return 'NaN'
    except ZeroDivisionError:
        return 'NaN'

def calculate_monthly_ratio(a, b):
    try:
        return a/(float(b)/12)
    except TypeError:
        return 'NaN'
    except ZeroDivisionError:
        return 'NaN'
    
emp_to_num = udf(convert_to_int)
ratio = udf(calculate_ratio)
monthly_ratio = udf(calculate_monthly_ratio)

loan_df5 = loan_df4.select('loan_amnt',
                           'int_rate',
                           'annual_inc',
                           'purposeIndex',
                           'installment',       
                           'gradeIndex',
                           'sub_gradeIndex',
                           'verification_statusIndex',
#                            'zip_code',
                           'dti',
                           'delinq_2yrs',
                           'inq_last_6mths',
                           'open_acc',
                           'pub_rec',
                           'revol_util',
                           'total_acc',
                           'acc_now_delinq',
#                            'addr_state',
#                            'initial_list_status',
                           'collections_12_mths_ex_med',
                           'tot_coll_amt',
                           'tot_cur_bal',
                           'total_rev_hi_lim',
                           ratio('loan_amnt','annual_inc').alias('loan_inc_ratio').cast('float'),
                           monthly_ratio('installment','annual_inc').alias('instal_inc_ratio').cast('float'),
                           emp_to_num('emp_length').alias('emp_length').cast('integer')
                           )

In [193]:
loan_df6 = loan_df5.fillna(0.0, ['tot_coll_amt','tot_cur_bal', 'total_rev_hi_lim'])

In [194]:
loan_df7 = loan_df6.dropna()

In [195]:
loan_df6.count()

256939

In [196]:
loan_df7.count()

246344

In [197]:
loan_df7.printSchema

<bound method DataFrame.printSchema of DataFrame[loan_amnt: int, int_rate: int, annual_inc: int, purposeIndex: double, installment: int, gradeIndex: double, sub_gradeIndex: double, verification_statusIndex: double, dti: float, delinq_2yrs: int, inq_last_6mths: int, open_acc: int, pub_rec: int, revol_util: float, total_acc: int, acc_now_delinq: int, collections_12_mths_ex_med: int, tot_coll_amt: float, tot_cur_bal: float, total_rev_hi_lim: int, loan_inc_ratio: float, instal_inc_ratio: float, emp_length: int]>