Using https://www.kaggle.com/wendykan/lending-club-loan-data to build a predictive model for dtermining whether a loan application will be charged off.

Our taget column is `loan_status`

In [1]:
import pyspark
from pyspark.sql import SQLContext
# from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.sql.functions import udf, col
# from pyspark.sql.types import ArrayType, DoubleType

In [2]:
sc = pyspark.SparkContext('local[*]')
sc.setCheckpointDir("data_source")
sqc = SQLContext(sc)

In [3]:
# Get data
df_raw = sqc.read.csv(
    "data_source/lending-club-loan-data/loan.csv.gz",
    header=True, inferSchema=True)
df_raw = df_raw.checkpoint(eager=True)
# Get shape
df_raw.count(), len(df_raw.columns)

(2260668, 145)

In [8]:
# Analyse the target column
df_t = df_raw.groupBy("loan_status").count().orderBy('count')
df_t.show()

+--------------------+-------+
|         loan_status|  count|
+--------------------+-------+
|            Oct-2015|      1|
|             Default|     31|
|Does not meet the...|    761|
|Does not meet the...|   1988|
|   Late (16-30 days)|   3737|
|     In Grace Period|   8952|
|  Late (31-120 days)|  21897|
|         Charged Off| 261654|
|             Current| 919695|
|          Fully Paid|1041952|
+--------------------+-------+



In [5]:
from pyspark.mllib.stat import Statistics
import pandas as pd

def corr(sdf_) -> pd.DataFrame:
    col_names = sdf_.columns
    features = sdf_.rdd.map(lambda row: row[0:])
    corr_mat=Statistics.corr(features, method="pearson")
    corr_df = pd.DataFrame(corr_mat)
    corr_df.index, corr_df.columns = col_names, col_names
    return corr_df
    

In [22]:
# Describe the data
df_desc = df_raw.describe()
df_desc_pd = df_desc.toPandas()
df_desc_pd

Unnamed: 0,summary,id,member_id,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,...,hardship_payoff_balance_amount,hardship_last_payment_amount,disbursement_method,debt_settlement_flag,debt_settlement_flag_date,settlement_status,settlement_date,settlement_amount,settlement_percentage,settlement_term
0,count,0.0,0.0,1303606.0,1303606.0,1303606.0,1303606,1303606.0,1303606.0,1303606,...,5336,5336,1303380,1303435,32080,32065,32045,32024,32010,32006
1,mean,,,14416.808280262594,14408.1236777063,14385.198832128604,,13.257222941598576,438.08512518353245,,...,11028.726163074049,184.57801686972817,,,,,,5052.1445137391,47.66803963862599,13.091844071399544
2,stddev,,,8699.544879428344,8695.474688987893,8697.986737787332,,4.760545049907237,261.0632118093469,,...,7466.642417336538,195.41304080997548,,,,,,3688.425616386922,7.322990784794977,8.280573985586306
3,min,,,500.0,500.0,0.0,36 months,5.31,4.93,A,...,10008.88,0.01,Cash,Cash,Apr-2013,ACTIVE,Apr-2013,1000,0.2,0
4,max,,,40000.0,40000.0,40000.0,60 months,30.99,1719.83,G,...,N,N,DirectPay,Y,Sep-2018,N,Sep-2018,N,N,N


In [6]:
# Select only rows whose `loan_status` value is in ["Fully Paid", "Charged Off"]
df_raw = df_raw.filter((df_raw["loan_status"] == "Fully Paid") | (df_raw["loan_status"] == "Charged Off"))

In [7]:
cols_to_drop_1 = ["id", "member_id", "pymnt_plan", "url"]

In [8]:
selected_columns_1 = [x for x in df_raw.columns if x not in cols_to_drop_1]

In [9]:
def is_null_column(df_: pd.DataFrame, column: str, num_rows_: int, threshold: float = 0.8) -> bool:
    """If column values has more than the threshold null"""
    _df = df_.groupBy(column).count().orderBy('count')
    _df = _df.where(col(column).isNull()).select("count")
    try:
        frequency = float(_df.collect()[0][0])
        return (frequency / num_rows_) > threshold
    except IndexError:
        return False

In [10]:
from pyspark.sql.functions import countDistinct

# d = df_raw.agg(countDistinct("desc").alias('desc')).collect()

In [11]:
# Reduce dataframe

In [12]:
# Drop null value rows and duplicates row-wise
na_thresh = int(0.7 * len(selected_columns_1)) # 70% threshold
df_raw = df_raw.drop(*cols_to_drop_1).dropna(thresh=na_thresh).dropDuplicates()

In [13]:
# cols_to_drop_2 = [col for col in df_raw.columns if is_null_column(df_raw, col, num_rows, 0.7)]
cols_to_drop_2 = ['desc', 'mths_since_last_record', 'next_pymnt_d', 'mths_since_last_major_derog', 'annual_inc_joint', 'dti_joint', 'verification_status_joint', 'mths_since_recent_bc_dlq', 'revol_bal_joint', 'sec_app_earliest_cr_line', 'sec_app_inq_last_6mths', 'sec_app_mort_acc', 'sec_app_open_acc', 'sec_app_revol_util', 'sec_app_open_act_il', 'sec_app_num_rev_accts', 'sec_app_chargeoff_within_12_mths', 'sec_app_collections_12_mths_ex_med', 'sec_app_mths_since_last_major_derog', 'hardship_type', 'hardship_reason', 'hardship_status', 'deferral_term', 'hardship_amount', 'hardship_start_date', 'hardship_end_date', 'payment_plan_start_date', 'hardship_length', 'hardship_dpd', 'hardship_loan_status', 'orig_projected_additional_accrued_interest', 'hardship_payoff_balance_amount', 'hardship_last_payment_amount', 'debt_settlement_flag_date', 'settlement_status', 'settlement_date', 'settlement_amount', 'settlement_percentage', 'settlement_term']

In [14]:
df_raw = df_raw.drop(*cols_to_drop_2)
df_raw = df_raw.checkpoint(eager=True)

# Get shape
# num_rows = df_raw4.count()
len(df_raw.columns)

102

In [11]:
df_raw.limit(5).toPandas()

Unnamed: 0,loan_amnt,funded_amnt,funded_amnt_inv,term,int_rate,installment,grade,sub_grade,emp_title,emp_length,...,percent_bc_gt_75,pub_rec_bankruptcies,tax_liens,tot_hi_cred_lim,total_bal_ex_mort,total_bc_limit,total_il_high_credit_limit,hardship_flag,disbursement_method,debt_settlement_flag
0,6950,6950,6950.0,36 months,5.32,209.3,A,A1,,,...,12.5,0,0,408936,38049,71200,22436,N,Cash,N
1,9600,9600,9600.0,36 months,12.79,322.5,C,C1,,,...,9.1,0,0,175600,21313,81900,0,N,Cash,N
2,10425,10425,10425.0,60 months,25.29,307.77,E,E5,,,...,25.0,0,0,43434,11803,14500,0,N,Cash,N
3,27675,27675,27675.0,36 months,18.25,1004.0,D,D3,,,...,40.0,0,0,48844,28458,24600,15000,N,Cash,Y
4,2000,2000,2000.0,36 months,15.49,69.82,C,C4,,,...,33.3,0,0,651850,78053,53000,68087,N,Cash,N


In [15]:
cols_to_drop_3 = ["emp_title", "emp_length"]
df_raw = df_raw.drop(*cols_to_drop_3)
len(df_raw.columns)

100

In [16]:
feature_columns = df_raw.columns[:-1]
"loan_status" not in feature_columns

False

In [17]:
# See distinct columns
str_columns_ = [col[0] for col in df_raw.dtypes if col[1] == "string"]
non_str_columns = [col[0] for col in df_raw.dtypes if col[1] != "string"]

In [18]:
import pandas as pd

cols_ = df_raw.columns
df_pd_uniq1 = pd.DataFrame(
    index=str_columns_,
    columns=["num_distinct"],
    data=(df_raw.select(col).distinct().count() for col in str_columns_)
)

In [18]:
# unqs = df_pd_uniq1.sort_values("num_distinct", axis=0, ascending=True).to_dict()["num_distinct"]
unqs = {"term":2,"policy_code":2,"out_prncp_inv":2,"out_prncp":2,"loan_status":2,"hardship_flag":3,"application_type":3,"disbursement_method":3,"initial_list_status":3,"debt_settlement_flag":3,"verification_status":3,"home_ownership":5,"acc_now_delinq":6,"grade":7,"inq_last_6mths":8,"collections_12_mths_ex_med":13,"purpose":14,"open_il_12m":20,"open_acc_6m":20,"delinq_2yrs":25,"open_rv_12m":28,"inq_fi":32,"open_il_24m":32,"pub_rec":32,"sub_grade":35,"last_credit_pull_d":41,"last_pymnt_d":45,"inq_last_12m":45,"open_rv_24m":47,"open_act_il":51,"addr_state":51,"total_cu_tl":55,"title":66,"issue_d":74,"open_acc":78,"total_acc":132,"mths_since_last_delinq":147,"all_util":171,"il_util":232,"mths_since_rcnt_il":334,"earliest_cr_line":696,"zip_code":914,"revol_util":1176,"bc_util":1276,"total_rec_late_fee":6764,"dti":6859,"tot_coll_amt":8815,"total_rev_hi_lim":11635,"max_bal_bc":26954,"annual_inc":29736,"recoveries":51743,"collection_recovery_fee":54244,"avg_cur_bal":61117,"revol_bal":62052,"total_rec_prncp":90304,"total_bal_il":115314,"tot_cur_bal":258722,"total_rec_int":276554,"last_pymnt_amnt":345672,"total_pymnt_inv":439404,"total_pymnt":475538}
unqs

{'term': 2,
 'policy_code': 2,
 'out_prncp_inv': 2,
 'out_prncp': 2,
 'loan_status': 2,
 'hardship_flag': 3,
 'application_type': 3,
 'disbursement_method': 3,
 'initial_list_status': 3,
 'debt_settlement_flag': 3,
 'verification_status': 3,
 'home_ownership': 5,
 'acc_now_delinq': 6,
 'grade': 7,
 'inq_last_6mths': 8,
 'collections_12_mths_ex_med': 13,
 'purpose': 14,
 'open_il_12m': 20,
 'open_acc_6m': 20,
 'delinq_2yrs': 25,
 'open_rv_12m': 28,
 'inq_fi': 32,
 'open_il_24m': 32,
 'pub_rec': 32,
 'sub_grade': 35,
 'last_credit_pull_d': 41,
 'last_pymnt_d': 45,
 'inq_last_12m': 45,
 'open_rv_24m': 47,
 'open_act_il': 51,
 'addr_state': 51,
 'total_cu_tl': 55,
 'title': 66,
 'issue_d': 74,
 'open_acc': 78,
 'total_acc': 132,
 'mths_since_last_delinq': 147,
 'all_util': 171,
 'il_util': 232,
 'mths_since_rcnt_il': 334,
 'earliest_cr_line': 696,
 'zip_code': 914,
 'revol_util': 1176,
 'bc_util': 1276,
 'total_rec_late_fee': 6764,
 'dti': 6859,
 'tot_coll_amt': 8815,
 'total_rev_hi_lim': 

In [19]:
str_col_to_double = [k for k,v in unqs.items() if (int(v) > 1000)]

In [20]:
category_cols = [k for k,v in unqs.items() if (int(v) <= 10 )]

In [21]:
category_cols

['term',
 'policy_code',
 'out_prncp_inv',
 'out_prncp',
 'loan_status',
 'hardship_flag',
 'application_type',
 'disbursement_method',
 'initial_list_status',
 'debt_settlement_flag',
 'verification_status',
 'home_ownership',
 'acc_now_delinq',
 'grade',
 'inq_last_6mths']

In [22]:
df_train = df_raw.selectExpr(*non_str_columns, *category_cols, *["cast({col} as float) {col}".format(col=col) for col in str_col_to_double])
df_train = df_train.checkpoint(eager=True)

In [24]:
len(df_train.columns)

73

In [25]:
"loan_status" in category_cols

True

In [26]:
# from pyspark.sql.functions import when

# df_train2 = df_train.withColumn("label", \
#               when(df_train["loan_status"] == "Charged Off", 1).otherwise(0))

In [29]:
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

df_corr = df_train.select(*df_train.columns)
for col_ in category_cols:
    indexer_ = StringIndexer(inputCol=col_, outputCol="indexed{}".format(col_))
    indexer_.setHandleInvalid("skip")
    model_ = indexer_.fit(df_corr)
    df_corr = model_.transform(df_corr)

corr_columns = non_str_columns + str_col_to_double + ["indexed{}".format(col_) for col_ in category_cols]
df_corr = df_corr.select(*corr_columns)  
corred_df = corr(df_corr)
corred_df

Unnamed: 0,loan_amnt,funded_amnt,funded_amnt_inv,int_rate,installment,acc_open_past_24mths,bc_open_to_buy,chargeoff_within_12_mths,delinq_amnt,mo_sin_old_il_acct,...,indexedhardship_flag,indexedapplication_type,indexeddisbursement_method,indexedinitial_list_status,indexeddebt_settlement_flag,indexedverification_status,indexedhome_ownership,indexedacc_now_delinq,indexedgrade,indexedinq_last_6mths
loan_amnt,1.000000,1.000000,0.999993,0.153055,0.954716,0.013086,,-0.004334,0.001007,,...,-0.001020,0.107950,0.006530,-0.066785,0.005112,0.078608,-0.122259,-0.000809,0.116410,-0.011623
funded_amnt,1.000000,1.000000,0.999993,0.153055,0.954716,0.013086,,-0.004334,0.001007,,...,-0.001020,0.107950,0.006530,-0.066785,0.005112,0.078608,-0.122259,-0.000809,0.116410,-0.011623
funded_amnt_inv,0.999993,0.999993,1.000000,0.153071,0.954625,0.013038,,-0.004345,0.001000,,...,-0.001019,0.107956,0.006499,-0.067611,0.005053,0.078589,-0.122287,-0.000831,0.116314,-0.011685
int_rate,0.153055,0.153055,0.153071,1.000000,0.194181,0.201701,,0.008882,0.003152,,...,-0.002187,0.053495,0.010670,0.154123,0.068833,0.108396,0.053858,0.005909,0.539518,0.191889
installment,0.954716,0.954716,0.954625,0.194181,1.000000,0.033065,,-0.002219,0.002017,,...,-0.001045,0.095152,0.004520,0.003629,0.009841,0.086530,-0.098394,0.001750,0.159332,0.012099
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
indexedverification_status,0.078608,0.078608,0.078589,0.108396,0.086530,0.049892,,0.005364,0.008051,,...,-0.001516,0.063065,0.015337,0.017394,0.020613,1.000000,-0.024063,0.013290,0.072646,0.028876
indexedhome_ownership,-0.122259,-0.122259,-0.122287,0.053858,-0.098394,-0.048185,,-0.003110,0.000595,,...,-0.001283,-0.066372,0.005588,0.032240,0.021992,-0.024063,1.000000,-0.007412,0.008554,0.000009
indexedacc_now_delinq,-0.000809,-0.000809,-0.000831,0.005909,0.001750,-0.007716,,0.037271,0.180353,,...,-0.000099,-0.005686,-0.003885,0.001364,0.000796,0.013290,-0.007412,1.000000,-0.004600,-0.002920
indexedgrade,0.116410,0.116410,0.116314,0.539518,0.159332,0.084502,,-0.004734,-0.002251,,...,0.000532,0.016400,0.042829,0.116577,0.037265,0.072646,0.008554,-0.004600,1.000000,0.089317


In [30]:
corred_df = corr(df_corr)

In [49]:
import math
d = corred_df["indexedloan_status"].to_dict()
cols_to_drop_4 = [k.replace("indexed", "") for k,v in d.items() if math.isnan(v)]
cols_to_drop_4

['bc_open_to_buy',
 'mo_sin_old_il_acct',
 'mths_since_recent_bc',
 'mths_since_recent_inq',
 'mths_since_recent_revol_delinq',
 'num_tl_120dpd_2m',
 'pct_tl_nvr_dlq',
 'percent_bc_gt_75',
 'revol_util',
 'bc_util',
 'dti',
 'max_bal_bc',
 'avg_cur_bal',
 'total_bal_il',
 'policy_code',
 'out_prncp_inv',
 'out_prncp']

In [63]:
df_train3 = df_train.drop(*cols_to_drop_4)
final_features = [x for x in df_train3.columns if x != "loan_status"]
len(final_features)

55

In [55]:
"loan_status" in final_features

False

In [57]:
df_train3.dtypes

[('loan_amnt', 'int'),
 ('funded_amnt', 'int'),
 ('funded_amnt_inv', 'double'),
 ('int_rate', 'double'),
 ('installment', 'double'),
 ('acc_open_past_24mths', 'int'),
 ('chargeoff_within_12_mths', 'double'),
 ('delinq_amnt', 'int'),
 ('mo_sin_old_rev_tl_op', 'int'),
 ('mo_sin_rcnt_rev_tl_op', 'int'),
 ('mo_sin_rcnt_tl', 'int'),
 ('mort_acc', 'int'),
 ('num_accts_ever_120_pd', 'int'),
 ('num_actv_bc_tl', 'int'),
 ('num_actv_rev_tl', 'int'),
 ('num_bc_sats', 'int'),
 ('num_bc_tl', 'int'),
 ('num_il_tl', 'int'),
 ('num_op_rev_tl', 'int'),
 ('num_rev_accts', 'int'),
 ('num_rev_tl_bal_gt_0', 'int'),
 ('num_sats', 'int'),
 ('num_tl_30dpd', 'int'),
 ('num_tl_90g_dpd_24m', 'int'),
 ('num_tl_op_past_12m', 'int'),
 ('pub_rec_bankruptcies', 'int'),
 ('tax_liens', 'int'),
 ('tot_hi_cred_lim', 'int'),
 ('total_bal_ex_mort', 'int'),
 ('total_bc_limit', 'int'),
 ('total_il_high_credit_limit', 'int'),
 ('term', 'string'),
 ('loan_status', 'string'),
 ('hardship_flag', 'string'),
 ('application_type', 

In [64]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

cols_to_index = [k for k,v in df_train3.dtypes if (v == "string" and k != "loan_status")]
non_to_index = [k for k,v in df_train3.dtypes if v != "string"]
df_trainx = df_train3.select(*df_train3.columns)
for col_ in cols_to_index:
    indexer_ = StringIndexer(inputCol=col_, outputCol="indexed{}".format(col_))
    indexer_.setHandleInvalid("skip")
    model_ = indexer_.fit(df_trainx)
    df_trainx = model_.transform(df_trainx)
    
columns = non_to_index + ["indexed{}".format(col_) for col_ in cols_to_index]
    
assembler = VectorAssembler(inputCols=columns, outputCol="features", handleInvalid="skip")
df_train3 = assembler.transform(df_trainx)


# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="loan_status", outputCol="indexedloan_status", handleInvalid="skip")
labelIndexer.setHandleInvalid("skip")
labelIndexer = labelIndexer.fit(df_train)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
     VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=len(cols_to_index), handleInvalid="skip")
featureIndexer.setHandleInvalid("skip")


# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df_train3.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(
    labelCol="indexedloan_status",
    featuresCol="indexedFeatures",
    predictionCol="prediction",
    numTrees=10)

# # Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "loan_status", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedloan_status", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

+--------------+-----------+--------------------+
|predictedLabel|loan_status|            features|
+--------------+-----------+--------------------+
|    Fully Paid| Fully Paid|[1000.0,1000.0,95...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|
|    Fully Paid| Fully Paid|[1000.0,1000.0,10...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,8,...|
|    Fully Paid| Fully Paid|[1000.0,1000.0,10...|
+--------------+-----------+--------------------+
only showing top 5 rows

Test Error = 0.01928
RandomForestClassificationModel (uid=RandomForestClassifier_9ff182238118) with 10 trees


In [65]:
predictions.select("predictedLabel", "loan_status", "features").show(50)

+--------------+-----------+--------------------+
|predictedLabel|loan_status|            features|
+--------------+-----------+--------------------+
|    Fully Paid| Fully Paid|[1000.0,1000.0,95...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|
|    Fully Paid| Fully Paid|[1000.0,1000.0,10...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,8,...|
|    Fully Paid| Fully Paid|[1000.0,1000.0,10...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|
|   Charged Off|Charged Off|(55,[0,1,2,3,4,5,...|
|    Fully Paid| Fully Paid|[1400.0,1400.0,14...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|
|   Charged Off|Charged Off|(55,[0,1,2,3,4,5,...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|
|    Fully Paid| Fully Paid|[1900.0,1900.0,18...|
|    Fully Paid| Fully Paid|[2000.0,2000.0,20...|
|    Fully Paid| Fully Paid|(55,[0,1,2,3,4,5,...|


In [66]:
cols_dropped = cols_to_drop_4 + cols_to_drop_3 + cols_to_drop_2 + cols_to_drop_1
cols_dropped

['bc_open_to_buy',
 'mo_sin_old_il_acct',
 'mths_since_recent_bc',
 'mths_since_recent_inq',
 'mths_since_recent_revol_delinq',
 'num_tl_120dpd_2m',
 'pct_tl_nvr_dlq',
 'percent_bc_gt_75',
 'revol_util',
 'bc_util',
 'dti',
 'max_bal_bc',
 'avg_cur_bal',
 'total_bal_il',
 'policy_code',
 'out_prncp_inv',
 'out_prncp',
 'emp_title',
 'emp_length',
 'desc',
 'mths_since_last_record',
 'next_pymnt_d',
 'mths_since_last_major_derog',
 'annual_inc_joint',
 'dti_joint',
 'verification_status_joint',
 'mths_since_recent_bc_dlq',
 'revol_bal_joint',
 'sec_app_earliest_cr_line',
 'sec_app_inq_last_6mths',
 'sec_app_mort_acc',
 'sec_app_open_acc',
 'sec_app_revol_util',
 'sec_app_open_act_il',
 'sec_app_num_rev_accts',
 'sec_app_chargeoff_within_12_mths',
 'sec_app_collections_12_mths_ex_med',
 'sec_app_mths_since_last_major_derog',
 'hardship_type',
 'hardship_reason',
 'hardship_status',
 'deferral_term',
 'hardship_amount',
 'hardship_start_date',
 'hardship_end_date',
 'payment_plan_start_da