In [1]:
!pip install pyspark



In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan,when,mean,count,col,desc,year,quarter,when,lit,concat,avg,countDistinct
from pyspark.sql.types import StringType
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import StringIndexer
import pandas as pd


spark = SparkSession.builder.appName("P2P_Lending").getOrCreate()
sc = spark.sparkContext

In [3]:
df = spark.read.format("csv")\
    .option("inferSchema",True)\
    .option("header",True)\
    .option("sep",",")\
    .load("lending_club_loans.csv")


df.printSchema()

dfRows = df.count()
print("Total No of columns - ", len(df.columns))
print("Total No of rows - ", dfRows)

root
 |-- id: string (nullable = true)
 |-- member_id: integer (nullable = true)
 |-- loan_amnt: integer (nullable = true)
 |-- funded_amnt: integer (nullable = true)
 |-- funded_amnt_inv: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- pymnt_plan: string (nullable = true)
 |-- url: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: stri

# 1. Data Pre-Processing

**1.1 Cleaning the columns according to requirement**


***Removing all the columns which has more than 50% of the data empty***

In [4]:
# Find the Feature columns which has more than 50% empty data

def findMissingValueCols(df):
    
    missingValueColumns = []
    for column in df.columns:
        nullRows = df.where(col(column).isNull()).count()
        
        if nullRows > dfRows*0.5 : # i.e. if ALL values are NULL
            missingValueColumns.append(column)
    return missingValueColumns

missingValueColList = findMissingValueCols(df)

print("Total Loan Observations - ", df.count())
print("Total No of columns - ", len(df.columns))
#df.printSchema()

Total Loan Observations -  42542
Total No of columns -  115


In [5]:
# 1. Removing all the features which has more than 50% of the data empty 

df = df.drop(*missingValueColList)

print("Total No of columns - ", len(df.columns))

# 2. Removing all loan observations which has more than 50% of the data empty 

df = df.dropna(thresh=29) 

print("Total Loan Observations - ", df.count())

# 3. Cheking for duplicates and removing them

print("Count the no rows :{0}".format(df.count()))
print("Count the no of distinct rows:{0}".format(df.distinct().count()))

# As we can see there are no duplicates in our dataset

Total No of columns -  58
Total Loan Observations -  42534
Count the no rows :42534
Count the no of distinct rows:42534


***Removing those columns having 90% same value***

In [6]:
num_rows = df.count()
threshold = 0.9  # 90% threshold for same values

# Iterate over the columns and calculate the percentage of the most common value
for col_name in df.columns:
    mode_count = df.groupBy(col_name).agg(count("*").alias("count")).orderBy(col("count").desc()).collect()[0]["count"]
    pct_mode = float(mode_count) / num_rows
    #print(f"{col_name} - mode count: {pct_mode:.2f}")

    # Drop columns with a high percentage of the most common value
    if pct_mode > threshold:
        df = df.drop(col_name)

In [7]:
print("Total No of columns - ", len(df.columns))
print(df.columns)

Total No of columns -  44
['id', 'member_id', 'loan_amnt', 'funded_amnt', 'funded_amnt_inv', 'term', 'int_rate', 'installment', 'grade', 'sub_grade', 'emp_title', 'emp_length', 'home_ownership', 'annual_inc', 'verification_status', 'issue_d', 'loan_status', 'url', 'desc', 'purpose', 'title', 'zip_code', 'addr_state', 'dti', 'delinq_2yrs', 'earliest_cr_line', 'fico_range_low', 'fico_range_high', 'inq_last_6mths', 'open_acc', 'revol_bal', 'revol_util', 'total_acc', 'total_pymnt', 'total_pymnt_inv', 'total_rec_prncp', 'total_rec_int', 'recoveries', 'collection_recovery_fee', 'last_pymnt_d', 'last_pymnt_amnt', 'last_credit_pull_d', 'last_fico_range_high', 'last_fico_range_low']


***Removing columns based on some criteria***

In [8]:
#Reading data dictionary in which information regarding each column is given

loan_dict=spark.read.csv("LCDataDictionary.csv",header=True)

#loan_dict.show()

#We will remove the columns based on following criteria:-

#leaks information from the future (after the loan has already been funded),
#doesn’t affect the borrower’s ability to pay back the loan (e.g. a randomly generated ID value by Lending Club),
#is formatted poorly,
#requires more data or a lot of pre-processing to turn into useful a feature, or
#contains redundant information.

In [9]:
# removing those columns which does not give valueable insights to build model

df = df.drop('id','member_id','int_rate','emp_title','url','desc','zip_code','sub_grade')

# removing those columns which leaks data from the future

df = df.drop('funded_amnt','funded_amnt_inv','total_pymnt','total_pymnt_inv','total_rec_prncp','total_rec_int','recoveries','collection_recovery_fee','last_pymnt_d','last_pymnt_amnt')

print("Total No of columns - ", len(df.columns))

print(df.columns)

Total No of columns -  26
['loan_amnt', 'term', 'installment', 'grade', 'emp_length', 'home_ownership', 'annual_inc', 'verification_status', 'issue_d', 'loan_status', 'purpose', 'title', 'addr_state', 'dti', 'delinq_2yrs', 'earliest_cr_line', 'fico_range_low', 'fico_range_high', 'inq_last_6mths', 'open_acc', 'revol_bal', 'revol_util', 'total_acc', 'last_credit_pull_d', 'last_fico_range_high', 'last_fico_range_low']


***Investigate date columns and Removing column which are not useful***

'earliest_cr_line': The month the borrower's earliest reported credit line was opened; we do not need ths column for our analysis

'last_credit_pull_d': The most recent month LendingClub pulled credit for this loan; we do not need ths column for our analysis.

'issue_d': The date which the loan was funded. It means the date investor received full amount of money they need. We don't need this column; also it leak future information.


In [10]:
df = df.drop('earliest_cr_line', 'last_credit_pull_d','issue_d')

print("Total No of columns - ", len(df.columns))

Total No of columns -  23


***Investigating FICO Score Columns***

In [11]:
#Let’s get rid of the missing values,

print(df.filter(col('fico_range_high').isNull()).count())

#As there are no null values let's proceed further
#Let's take average of fico_range_low and fico_range_high
df = df.withColumn('fico_average', (col('fico_range_high') + col('fico_range_low')) / 2)
print("Total No of columns - ", len(df.columns))

#Let's look at result
df.select('fico_average', 'fico_range_high', 'fico_range_low').show()

#Now we have got mean value of FICO score which we can use further
#Now, we can go ahead and drop fico_range_low, fico_range_high, last_fico_range_low, and last_fico_range_high columns.

df = df.drop('fico_range_low', 'fico_range_high','last_fico_range_high', 'last_fico_range_low')
print("Total No of columns - ", len(df.columns))

0
Total No of columns -  24
+------------+---------------+--------------+
|fico_average|fico_range_high|fico_range_low|
+------------+---------------+--------------+
|       737.0|            739|           735|
|       742.0|            744|           740|
|       737.0|            739|           735|
|       692.0|            694|           690|
|       697.0|            699|           695|
|       732.0|            734|           730|
|       692.0|            694|           690|
|       662.0|            664|           660|
|       677.0|            679|           675|
|       727.0|            729|           725|
|       697.0|            699|           695|
|       677.0|            679|           675|
|       712.0|            714|           710|
|       707.0|            709|           705|
|       722.0|            724|           720|
|       667.0|            669|           665|
|       672.0|            674|           670|
|       762.0|            764|           760|
|     

**1.2 Converting target columns into binary values**

In [12]:
#Our main goal is predict who will pay off a loan and who will default
#loan_status is the only field in the main data set that describes a loan status, so let’s use this column as the target column.

#We’ll use the DataFrame method value_counts() to return the frequency of the unique values in the loan_status column.

df.groupBy("loan_status").count().show()


# Filter the DataFrame
df = df.filter(df.loan_status.isin(["Fully Paid", "Charged Off"]))

# Replace loan_status values with 1 and 0
df = df.withColumn("loan_status", when(df.loan_status == "Fully Paid", 1).otherwise(0))

df.groupBy("loan_status").count().show()

print("Count the no rows :{0}".format(df.count()))

+--------------------+-----+
|         loan_status|count|
+--------------------+-----+
|          Fully Paid|33586|
|     In Grace Period|   16|
|         Charged Off| 5652|
|  Late (31-120 days)|   12|
|             Current|  513|
|   Late (16-30 days)|    5|
|             Default|    1|
|Does not meet the...| 1988|
|Does not meet the...|  761|
+--------------------+-----+

+-----------+-----+
|loan_status|count|
+-----------+-----+
|          1|33586|
|          0| 5652|
+-----------+-----+

Count the no rows :39238


**1.3 Dropping missing values**

In [13]:
# Create an empty dictionary to store the null counts
null_counts = {}

# Loop through each column and get the count of null values
for c in df.columns:
    null_count = df.where(col(c).isNull()).count()
    null_counts[c] = null_count

# Print the null counts dictionary
print("Number of null values in each column:")
print(null_counts)

#Removing the rows having the null values
df = df.na.drop(how="any")

Number of null values in each column:
{'loan_amnt': 0, 'term': 0, 'installment': 0, 'grade': 0, 'emp_length': 0, 'home_ownership': 0, 'annual_inc': 0, 'verification_status': 0, 'loan_status': 0, 'purpose': 0, 'title': 9, 'addr_state': 0, 'dti': 0, 'delinq_2yrs': 0, 'inq_last_6mths': 0, 'open_acc': 71, 'revol_bal': 46, 'revol_util': 78, 'total_acc': 16, 'fico_average': 191}


**1.4 Encoding of Categorical Columns**

In [14]:
#Investigate Categorical Columns

# Filter dataframe columns by datatype 'string'
object_columns_df = df.select([col(c).alias(c) for c in df.columns if df.schema[c].dataType == StringType()])

# Print first row of the dataframe
print(object_columns_df.first())

Row(term=' 36 months', grade='B', emp_length='10+ years', home_ownership='RENT', verification_status='Verified', purpose='credit_card', title='Computer', addr_state='AZ', dti='27.65', delinq_2yrs='0', inq_last_6mths='1', open_acc='3', revol_bal='13648', revol_util='83.70%', total_acc='9')


***Label Encoding : Converting Ordinal categories column to numerical values***

In [15]:
#cleaning the 'term' and 'revol_util' columns

df = df.withColumn("term", regexp_replace("term", "months", ""))

df = df.withColumn("revol_util", regexp_replace("revol_util", "%", ""))

#### Converting Categorical feature "emp_length" and "grade" to continous feature


df = df.withColumn("emp_lengthIndex", when(col("emp_length") == "< 1 year", 0)
                                           .when(col("emp_length") == "1 year", 1)
                                           .when(col("emp_length") == "2 years", 2)
                                           .when(col("emp_length") == "3 years", 3)
                                           .when(col("emp_length") == "4 years", 4)
                                           .when(col("emp_length") == "5 years", 5)
                                           .when(col("emp_length") == "6 years", 6)
                                           .when(col("emp_length") == "7 years", 7)
                                           .when(col("emp_length") == "8 years", 8)
                                           .when(col("emp_length") == "9 years", 9)
                                           .when(col("emp_length") == "10+ years", 10)
                                           .otherwise(0))



df = df.withColumn("grade_Index", when(col("grade") == "A", 1)
                                           .when(col("grade") == "B", 2)
                                           .when(col("grade") == "C", 3)
                                           .when(col("grade") == "D", 4)
                                           .when(col("grade") == "E", 5)
                                           .when(col("grade") == "F", 6)
                                           .when(col("grade") == "G", 7)
                                           .otherwise(0))

In [16]:
print(df.first())

Row(loan_amnt=5000, term=' 36 ', installment=162.87, grade='B', emp_length='10+ years', home_ownership='RENT', annual_inc=24000.0, verification_status='Verified', loan_status=1, purpose='credit_card', title='Computer', addr_state='AZ', dti='27.65', delinq_2yrs='0', inq_last_6mths='1', open_acc='3', revol_bal='13648', revol_util='83.70', total_acc='9', fico_average=737.0, emp_lengthIndex=10, grade_Index=2)


***One Hot Encoding: Converting Nominal categories column to binary vectors*** 

Identify the categorical columns that need to be encoded.
Use StringIndexer to convert the categorical columns into numerical indexes.
Use OneHotEncoder to convert the numerical indexes into sparse vector representation of binary encoded columns.
Use VectorAssembler to combine the encoded columns with other numerical columns in the dataset.

In [17]:
#Applying One-Hot Encoding on Categorical columns


from pyspark.ml.feature import OneHotEncoder, StringIndexer

from pyspark.ml import Pipeline


nominal_columns = ["home_ownership", "verification_status", "purpose", "term"]

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index")    for column in nominal_columns]

encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_encoded")   for column in nominal_columns]

pipeline = Pipeline(stages=indexers + encoders)
df = pipeline.fit(df).transform(df)

***Converting string type columns into double type***

In [18]:

lst=['dti','delinq_2yrs','inq_last_6mths','open_acc','revol_bal','total_acc','revol_util']

for col_name in lst:
    df = df.withColumn(col_name, col(col_name).cast("double"))

**1.5 Data Balancing and Scaling**

***Data Balancing***

In [19]:
# Count the number of observations in each class
count_charged_off = df.filter(df.loan_status == 0).count()
count_fully_paid = df.filter(df.loan_status == 1).count()

# Compute the ratio of the class counts
ratio = count_charged_off / count_fully_paid

# Create a sample of fully paid observations with a fraction of (1-ratio) 
df_fully_paid = df.filter(df.loan_status == 1).sample(False, 1 - ratio, seed=42)

# Create a dataframe of the minority class 
df_charged_off = df.filter(df.loan_status == 0)

# Combine the two dataframes
df = df_fully_paid.union(df_charged_off)

# Verify the class counts
df.groupBy('loan_status').count().show()


+-----------+-----+
|loan_status|count|
+-----------+-----+
|          1|27912|
|          0| 5603|
+-----------+-----+



***Data Scaling***

In [20]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

# select the columns to normalize
cols_to_norm = ['loan_amnt', 'installment', 'annual_inc','dti','delinq_2yrs','inq_last_6mths','open_acc','revol_bal','total_acc','revol_util','fico_average']

# assemble the features into a vector
assembler = VectorAssembler(inputCols=cols_to_norm, outputCol='features',handleInvalid='skip')
df = assembler.transform(df)

# initialize the scaler
scaler = StandardScaler(inputCol='features', outputCol='scaled_features',withMean=True,withStd=True)

# fit and transform the data
df = scaler.fit(df).transform(df)

# show the first 5 rows of the scaled data
df.show(5)

# droping features cloumn
df = df.drop('features') 

+---------+----+-----------+-----+----------+--------------+----------+-------------------+-----------+------------------+--------------------+----------+-----+-----------+--------------+--------+---------+----------+---------+------------+---------------+-----------+--------------------+-------------------------+-------------+----------+----------------------+---------------------------+---------------+-------------+--------------------+--------------------+
|loan_amnt|term|installment|grade|emp_length|home_ownership|annual_inc|verification_status|loan_status|           purpose|               title|addr_state|  dti|delinq_2yrs|inq_last_6mths|open_acc|revol_bal|revol_util|total_acc|fico_average|emp_lengthIndex|grade_Index|home_ownership_index|verification_status_index|purpose_index|term_index|home_ownership_encoded|verification_status_encoded|purpose_encoded| term_encoded|            features|     scaled_features|
+---------+----+-----------+-----+----------+--------------+----------+-

In [21]:
print(df.first())

Row(loan_amnt=5000, term=' 36 ', installment=162.87, grade='B', emp_length='10+ years', home_ownership='RENT', annual_inc=24000.0, verification_status='Verified', loan_status=1, purpose='credit_card', title='Computer', addr_state='AZ', dti=27.65, delinq_2yrs=0.0, inq_last_6mths=1.0, open_acc=3.0, revol_bal=13648.0, revol_util=83.7, total_acc=9.0, fico_average=737.0, emp_lengthIndex=10, grade_Index=2, home_ownership_index=0.0, verification_status_index=1.0, purpose_index=1.0, term_index=0.0, home_ownership_encoded=SparseVector(4, {0: 1.0}), verification_status_encoded=SparseVector(2, {1: 1.0}), purpose_encoded=SparseVector(17, {1: 1.0}), term_encoded=SparseVector(1, {0: 1.0}), scaled_features=DenseVector([-0.8327, -0.7718, -0.7123, 2.1471, -0.2982, 0.1144, -1.4294, 0.0168, -1.1496, 1.2277, 0.5654]))


In [22]:
#dropping the original columns after one-hot encoding
df = df.drop('addr_state','title','grade','sub_grade','emp_length',"home_ownership", "verification_status", "purpose", "term","home_ownership_index","purpose_index","verification_status_index","purpose_index","term_index")


In [23]:
#checking if there are any string type columns left
df.printSchema()

root
 |-- loan_amnt: integer (nullable = true)
 |-- installment: double (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- loan_status: integer (nullable = false)
 |-- dti: double (nullable = true)
 |-- delinq_2yrs: double (nullable = true)
 |-- inq_last_6mths: double (nullable = true)
 |-- open_acc: double (nullable = true)
 |-- revol_bal: double (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- total_acc: double (nullable = true)
 |-- fico_average: double (nullable = true)
 |-- emp_lengthIndex: integer (nullable = false)
 |-- grade_Index: integer (nullable = false)
 |-- home_ownership_encoded: vector (nullable = true)
 |-- verification_status_encoded: vector (nullable = true)
 |-- purpose_encoded: vector (nullable = true)
 |-- term_encoded: vector (nullable = true)
 |-- scaled_features: vector (nullable = true)



**1.6 Vector Assembling**

In [24]:
col_list=['emp_lengthIndex','grade_Index','home_ownership_encoded','verification_status_encoded','purpose_encoded','term_encoded','scaled_features']

In [25]:
from pyspark.ml.feature import VectorAssembler

# set the input and output column names
#assembler = VectorAssembler(inputCols=[ *col_list ], outputCol="features",handleInvalid = "keep")
assembler = VectorAssembler(inputCols=col_list, outputCol="features",handleInvalid='skip')

df = assembler.transform( df)


df = df.select("loan_status", "features")

# 2. Machine Learning Algorithms

**2.1 Logistic Regression Model**

In [26]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression( labelCol="loan_status",featuresCol="features", maxIter=10, regParam=0.8)

from pyspark.sql.functions import col
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=100)

# Train the model on the training data
model =  lr.fit(trainingData)

# Use the model to make predictions on the training data
predictions_train = model.transform(trainingData)

# Use the model to make predictions on the testing data
predictions_test = model.transform(testData)

# Evaluate the performance of the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="loan_status")
print("Train Area Under ROC: " + str(evaluator.evaluate(predictions_train)))
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions_test)))

Train Area Under ROC: 0.6881617663835148
Test Area Under ROC: 0.6792552435554896


**2.2 Decision Tree Model**

In [27]:
from pyspark.ml.classification import DecisionTreeClassifier

# Define the Decision Tree Classifier model
dt = DecisionTreeClassifier(labelCol="loan_status", featuresCol="features")

# Train the model on the training data
model = dt.fit(trainingData)

# Use the model to make predictions on the training data
predictions_train = model.transform(trainingData)

# Use the model to make predictions on the testing data
predictions_test = model.transform(testData)

# Evaluate the performance of the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="loan_status")
accuracy_train = evaluator.evaluate(predictions_train)

accuracy_test = evaluator.evaluate(predictions_test)

print(accuracy_train)
print(accuracy_test)

0.6302612907396437
0.6214427679354979


**2.3 Random Forest Model**

In [28]:
from pyspark.ml.classification import RandomForestClassifier

# Define the Random Forest Classifier model
rf = RandomForestClassifier(labelCol="loan_status", featuresCol="features")

# Train the model on the training data
model = rf.fit(trainingData)

# Use the model to make predictions on the testing data
predictions_train = model.transform(trainingData)

# Use the model to make predictions on the testing data
predictions_test = model.transform(testData)

# Evaluate the performance of the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="loan_status")
accuracy_train = evaluator.evaluate(predictions_train)
accuracy_test = evaluator.evaluate(predictions_test)

print(accuracy_train)
print(accuracy_test)

0.7017075528428357
0.6831466618348274


**2.4 Gradient Boosting Model**

In [29]:
from pyspark.ml.classification import GBTClassifier

# Define the Gradient Boosting Machine Classifier model
gbt = GBTClassifier(labelCol="loan_status", featuresCol="features", maxIter=10)

# Train the model on the training data
model = gbt.fit(trainingData)

# Use the model to make predictions on the testing data
predictions_train = model.transform(trainingData)

# Use the model to make predictions on the testing data
predictions_test = model.transform(testData)

# Evaluate the performance of the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="loan_status")
accuracy_train = evaluator.evaluate(predictions_train)
accuracy_test = evaluator.evaluate(predictions_test)

print(accuracy_train)
print(accuracy_test)

0.7300635865860928
0.6856434206555232


**2.5 Support Vector Machine Model**

In [30]:
from pyspark.ml.classification import LinearSVC

# Define the Support Vector Machine Classifier model
svm = LinearSVC(labelCol="loan_status", featuresCol="features", maxIter=10, regParam=0.1)

# Train the model on the training data
model = svm.fit(trainingData)

# Use the model to make predictions on the testing data
predictions_train = model.transform(trainingData)

# Use the model to make predictions on the testing data
predictions_test = model.transform(testData)

# Evaluate the performance of the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="loan_status")
accuracy_train = evaluator.evaluate(predictions_train)
accuracy_test = evaluator.evaluate(predictions_test)

print(accuracy_train)
print(accuracy_test)

0.6486091409536533
0.6472642099162444
