In [1]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer
from pyspark.sql.types import FloatType, ArrayType
from pyspark.sql.functions import col, to_date, year, month, regexp_like, \
                                    regexp_extract, isnan, when, count, lit, udf, \
                                    pow as psf_pow, sum as psf_sum

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
# spark url: localhost:4040
spark = SparkSession \
    .builder \
    .appName("LoanEtl") \
    .getOrCreate()

In [4]:
df = spark \
    .read \
    .option('header', "true") \
    .option("inferSchema", "true") \
    .csv('data/loan_10%.csv')

rename some columns

In [7]:
find = [
    'mths_since_rcnt_il', 'mo_sin_rcnt_rev_tl_op', 'mo_sin_rcnt_tl', # rcnt and mo_sin_rcnt
    'mths_since_recent_bc_dlq', 'pct_tl_nvr_dlq', # dlq
    'mo_sin_old_il_acct', 'mo_sin_old_rev_tl_op', # mo_sin_old
]
replace = [
    'mths_since_recent_il', 'mths_since_recent_rev_tl_op', 'mths_since_recent_tl', # recent and mths_since
    'mths_since_recent_bc_delinq', 'percent_tl_nvr_deliq', # delinq
    'mths_since_old_il_acct', 'mths_since_old_rev_tl_op' # mths_since_old
]

for f, r in zip(find, replace):
    df = df.withColumnRenamed(f, r)

del find, replace

convert to datetime

In [8]:
# example

# original
print(df.select('issue_d').first())
# want-to-be
print(df.select(to_date(df['issue_d'], "MMM-yyyy")).first()[0])

Row(issue_d='Aug-2015')
2015-08-01


In [9]:
datetime_columns = ['issue_d', 'earliest_cr_line', 'last_pymnt_d', 'next_pymnt_d',
                    'last_credit_pull_d', 'sec_app_earliest_cr_line', 'hardship_start_date',
                    'hardship_end_date', 'payment_plan_start_date']

for dt_col in datetime_columns:
    df = df.withColumn(dt_col, to_date(df[dt_col], "MMM-yyyy"))

remove loan data issued before 1-1-2013

In [10]:
df = df.filter(year('issue_d') > 2012)

modify column int_rate and revol_util: changing from "xx.xx%" to xx.xx

In [11]:
df = df.withColumn('int_rate', regexp_extract('int_rate', r'(\d+\.\d+)', 1).cast('float'))

In [12]:
df.select('int_rate').first()

Row(int_rate=7.889999866485596)

In [13]:
df = df.withColumn('revol_util', regexp_extract('revol_util', r'(\d+\.\d+)', 1).cast("float"))

In [14]:
df.select('revol_util').first()

Row(revol_util=25.100000381469727)

create a column for deciding if it is matured loan; empty row = matured

In [15]:
df = df.withColumn('is_matured_loan', col('next_pymnt_d').isNull())

I only analyzed the loan data that is matured

In [16]:
df = df.filter(df['is_matured_loan'])

handle na / null data in dataframe

In [17]:
# turned into binary: missing = 0; existing = 1
mths_since_to_drop = [
    col_name for col_name in df.columns \
    if col_name not in ['mths_since_recent_rev_tl_op', 'mths_since_recent_tl'] \
    and 'mths_since_' in col_name
]

for col_name in mths_since_to_drop:
    df = df.withColumn(col_name,
                       when(col(col_name).isNull(), 0) \
                       .otherwise(1))

In [18]:
# filled by zero
df = df \
    .withColumn('deferral_term', when(col('deferral_term').isNull(), 0) \
    .otherwise(col('deferral_term')).cast("integer"))

In [19]:
# filled by mean
col_fill_by_mean = ['il_util', 'all_util', 'total_bal_il', 'max_bal_bc', 
                    'percent_bc_gt_75', 'bc_util', 'revol_util', 'dti',
                    'percent_tl_nvr_deliq', 'avg_cur_bal', 'bc_open_to_buy']

imputer = Imputer(strategy='mean', inputCols=col_fill_by_mean, outputCols=col_fill_by_mean)
imputer = imputer.fit(df)
df = imputer.transform(df)

In [20]:
# filled by mode (numeric)
col_fill_by_mode = ['open_acc_6m', 'open_act_il', 'open_il_12m', 'open_il_24m',
                    'open_rv_12m', 'open_rv_24m', 'inq_fi', 'total_cu_tl', 'inq_last_12m',
                    'num_tl_120dpd_2m', 'num_rev_accts', 'inq_last_6mths']

imputer = Imputer(strategy='mode', inputCols=col_fill_by_mode, outputCols=col_fill_by_mode)
imputer = imputer.fit(df)
df = imputer.transform(df)

In [21]:
# filled by mode (string)
for col_name in ['emp_length', 'zip_code', 'hardship_flag']:
    most_common = df.freqItems([col_name]).first()[0][0]
    df = df \
        .withColumn(col_name, when(col(col_name).isNull(), most_common) \
        .otherwise(col(col_name)))

In [22]:
# filled by empty
col_fill_by_empty_str = ['emp_title', 'title', 'purpose']

for col_name in col_fill_by_empty_str:
    df = df \
        .withColumn(col_name, when(col(col_name).isNull(), '') \
        .otherwise(col(col_name)))
    
# filled by df.na.fill('') for string columns (I guess)
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.fillna.html

create columns for modelling the probablilty of default

In [23]:
# scale some columns by annual_inc
for col_name in ['loan_amnt', 'delinq_amnt', 'revol_bal', 'tot_cur_bal', 
                 'max_bal_bc', 'avg_cur_bal', 'total_bal_ex_mort']:
    
    df = df.withColumn(col_name + '_to_' + 'annual_inc', 
                       col(col_name) / (col('annual_inc') + 1))
    
# ratio of the number of open credit lines to the number of 
# total credit lines in the borrower's credit file.
df = df.withColumn('open_cl_ratio', col('open_acc') / col('total_acc'))

create column: "expected"/realized installment

In [24]:
df = df \
    .withColumn('num_mths_to_pay',
                (year(col('last_pymnt_d')) - year(col('issue_d'))) * 12
                + month(col('last_pymnt_d')) - month(col('issue_d')))

In [25]:
# logic:
# if last_pymnt_d not null => no need to fill => mnths_diff
# else
#   if total pymnt > 0 => total_pymnt / installment => approx mnths_diff
#   else 0 => no payment made

df = df.withColumn(
    'num_mths_to_pay',
    when(
        col('last_pymnt_d').isNull(),
        when(
            col('total_pymnt') > 0, 
            col('total_pymnt') / col('installment')) \
        .otherwise(0)) \
    .otherwise(col('num_mths_to_pay')).cast("integer"))

In [26]:
# create a term integer column and drop soon
df = df.withColumn('term_int', regexp_extract('term', r'(\d\d)', 1).cast('integer'))

In [27]:
def calculate_installment(p, r, n):
    r = r / 100
    return p * (r / 12) * (1 + r / 12) ** n / ((1 + r / 12) ** n - 1)

df = df.withColumn(
    'calculated_installment',
    calculate_installment(col('funded_amnt'), col('int_rate'), col('term_int'))
)

In [28]:
# scale some installment columns by annual_inc
for col_name in ['installment', 'calculated_installment']:
    
    df = df.withColumn(col_name + '_to_' + 'annual_inc', 
                       col(col_name) / (col('annual_inc') + 1))

create column: ROI

In [29]:
# assume
depreciation_rate = 0.02

In [30]:
def calculate_avg_pymnt(total_pymnt, num_mths_to_pay, last_pymnt_amnt):
    return when(num_mths_to_pay == 0, last_pymnt_amnt) \
            .otherwise(col('total_pymnt') / col('num_mths_to_pay'))

In [31]:
df = df.withColumn(
    'avg_pymnt', 
    calculate_avg_pymnt(
        col('total_pymnt'),
        col('num_mths_to_pay'), 
        col('last_pymnt_amnt')))

In [32]:
def calculate_cdf(num, depreciation_rate=0.02):
    return sum([(1 + depreciation_rate / 12) ** (-t) for t in range(1, num + 1)])

udf_calculate_cdf = udf(calculate_cdf, returnType=FloatType())

In [33]:
df = df.withColumn('cdf', udf_calculate_cdf(col('num_mths_to_pay')))

In [34]:
df = df.withColumn('CDP', col('cdf') * col('avg_pymnt'))

In [35]:
df = df.withColumn(
    'CDP',
    when(
        (col('avg_pymnt') > 0)& \
        (col('num_mths_to_pay') == 0), 
        col('avg_pymnt')
    ).otherwise(col('CDP')))

In [36]:
def calculate_discounted_net_recoveries(depreciation_rate=0.02):
    return (
        (col('recoveries') - col('collection_recovery_fee')) \
        * 1 / (1 + depreciation_rate / 12)**(col('num_mths_to_pay') + 6)
    )
    
df = df.withColumn('CDP', col('CDP') + calculate_discounted_net_recoveries())

In [37]:
df = df.withColumn(
    'ROI', 
    (col('CDP') - col('funded_amnt')) \
    / col('funded_amnt'))

In [38]:
df.select('ROI').first()

Row(ROI=0.08976702591577829)

create column: IRR

In [39]:
df = df.withColumn(
    'num_mths_to_pay',
    when(
        (regexp_like('loan_status', lit(r'(Fully)')))& \
        (col('issue_d') == col('last_pymnt_d')),
        1
    ).otherwise(col('num_mths_to_pay')))

In [40]:
def make_list_of_cashflow(f, p, n):
    return [-f] + [p] * n

udf_make_list_of_cashflow = udf(make_list_of_cashflow, returnType=ArrayType(FloatType()))

In [41]:
# equivalently, it means 
# df.apply(lambda r: [-r['funded_amnt']] + [r['avg_pymnt']] * r['num_mths_to_pay'], axis=1)

df = df.withColumn(
    'lst_of_cf', 
    udf_make_list_of_cashflow(
        col('funded_amnt'), 
        col('avg_pymnt'), 
        col('num_mths_to_pay')))

In [42]:
df.select('lst_of_cf').first()

Row(lst_of_cf=[-8500.0, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656, 364.3417053222656])

In [43]:
def irr(cash_flow):
    """
    Solving this to get IRR:
    p0 + p1 * (1 + IRR)^(-1) + p2 * (1 + IRR)^(-2) + ... + pn * (1 + IRR)^(-n) = 0 _ (*)

    Let g = 1 + IRR and multiply (*) by g^n and you will get:
    p0 * g^n + p1 * g^(n-1) + p2 * g^(n-2) + ... + pn = 0

    Then solve the polynomial and get the positive roots
    IRR = g - 1

    :param: cash_flow: a stream of cash flow to pay and receive with a NPV = 0
    """
    cash_flow = np.array(cash_flow)
    same_sign = np.all(cash_flow > 0) if cash_flow[0] > 0 else np.all(cash_flow < 0)
    if same_sign:
        if len(cash_flow) == 1 and cash_flow[0] < 0:
            return -1
        
        raise ValueError("cash_flow must contain positive and negative values")

    g = np.roots(cash_flow)
    IRR = np.real(g[np.isreal(g)]) - 1

    # realistic IRR
    IRR = IRR[(IRR <= 1)&(IRR >= -1)]

    # if no real solution
    if len(IRR) == 0:
        return np.nan

    # if only one real solution
    if len(IRR) == 1:
        return IRR[0]

    # check sign of all IRR solutions
    same_sign = np.all(IRR > 0) if IRR[0] > 0 else np.all(IRR < 0)
    
    # if not the same, first filter potential IRR by comparing the total positive and negative cash flows
    if not same_sign:
        pos = sum(cash_flow[cash_flow>0])
        neg = sum(cash_flow[cash_flow<0])
        if pos >= neg:
            IRR = IRR[IRR >= 0]
        else:
            IRR = IRR[IRR < 0]
    
    # pick the smallest one in magnitude and return
    abs_IRR = np.abs(IRR)
    return IRR[np.argmin(abs_IRR)]

In [44]:
from pyspark.sql.types import StructType, StructField

In [45]:
irr_rdd = df.rdd.map(lambda x: (float(irr(x[-1])),))

In [46]:
schema = StructType([StructField("IRR", FloatType(), True)])
irr_df = irr_rdd.toDF(schema)

In [47]:
df = df.join(irr_df)

drop irrelevant columns

In [48]:
to_drop = [
    col for col in df.columns if 
        ('hardship' in col and 'hardship_flag' != col) or 
        ('mths_since_' in col and '_binary' not in col) or 
        ('sec_app' in col) or 
        ('joint' in col)
        
] + [
    'orig_projected_additional_accrued_interest', 
    'payment_plan_start_date', 'pymnt_plan',
    'issue_d_year', 'policy_code',
    'next_pymnt_d', 'earliest_cr_line',
    'url', 'last_credit_pull_d', 'last_pymnt_d'
    
] + mths_since_to_drop + [
    'lst_of_cf', 'cdf', 'CDP', 'is_matured_loan'
]

In [49]:
df = df.drop(*to_drop)

check na count 
- tried: count can run so long. so i collect it back work as a chkpt and turn it back to df (failed)
    - potential reason: can't handle such a large list

In [57]:
# not going to do now
# # Set a checkpoint directory
# spark.sparkContext.setCheckpointDir("hdfs://your_checkpoint_directory")

# # Perform checkpointing on a DataFrame
# df_checkpointed = df.checkpoint()

binning

In [58]:
def binning_int_rate(int_rate):
    if int_rate < 10:
        return 'less than 10%'
    for i in range(2, 6):
        if i*5 <= int_rate and (i+1)*5 > int_rate:
            return f'within {i*5}% and {i*5+5}%'
    return 'greater than or equal to 30%'

udf_binning_int_rate = udf(binning_int_rate)

In [70]:
df = df.withColumn('int_rate_bin', udf_binning_int_rate(col('int_rate')))

In [74]:
def binning_income(income):
    if income < 20000:
        return 'less than 20k'
    for i in range(2, 16):
        if i*10000 <= income and (i+1)*10000 > income:
            return f'within {i*10}k and {(i+1)*10}k'
    return f'greater than or equal to {(i+1)*10}k'

udf_binning_income = udf(binning_income)

In [75]:
df = df.withColumn('annual_inc_bin', udf_binning_income(col('annual_inc')))

load to local and save as csv

In [80]:
df.explain(mode="formatted")

== Physical Plan ==
Project (13)
+- BatchEvalPython (12)
   +- CartesianProduct Inner (11)
      :- Project (9)
      :  +- Project (8)
      :     +- BatchEvalPython (7)
      :        +- Project (6)
      :           +- Project (5)
      :              +- * Project (4)
      :                 +- * Project (3)
      :                    +- * Filter (2)
      :                       +- Scan csv  (1)
      +- * Scan ExistingRDD (10)


(1) Scan csv 
Output [96]: [id#17, loan_amnt#18, funded_amnt#19, funded_amnt_inv#20, term#21, int_rate#22, installment#23, grade#24, sub_grade#25, emp_title#26, emp_length#27, home_ownership#28, annual_inc#29, verification_status#30, issue_d#31, loan_status#32, purpose#35, title#36, zip_code#37, addr_state#38, dti#39, delinq_2yrs#40, fico_range_low#42, fico_range_high#43, inq_last_6mths#44, open_acc#47, pub_rec#48, revol_bal#49, revol_util#50, total_acc#51, initial_list_status#52, out_prncp#53, out_prncp_inv#54, total_pymnt#55, total_pymnt_inv#56, total_re

In [82]:
# Specify the path where you want to save the CSV file
csv_path = "data"

# Write DataFrame to CSV
df.limit(10) \
    .write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .csv(csv_path)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\User\anaconda3\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\User\anaconda3\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\User\anaconda3\Lib\socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 