# Creating A Spark Session

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import gc

In [2]:
spark= SparkSession.builder.appName('loan_data').getOrCreate()

21/11/28 15:46:58 WARN Utils: Your hostname, amenemope-HP-250-G5-Notebook-PC resolves to a loopback address: 127.0.1.1; using 192.168.43.108 instead (on interface wlp2s0)
21/11/28 15:46:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/28 15:47:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

# Reading The Data

In [4]:
sparkData= spark.read.option('header', 'true').csv('loan_data.csv',header=True,inferSchema=True)

                                                                                

In [5]:
sparkData

DataFrame[id: int, year: int, issue_d: string, final_d: int, emp_length_int: double, home_ownership: string, home_ownership_cat: int, income_category: string, annual_inc: int, income_cat: int, loan_amount: int, term: string, term_cat: int, application_type: string, application_type_cat: int, purpose: string, purpose_cat: int, interest_payments: string, interest_payment_cat: int, loan_condition: string, loan_condition_cat: int, interest_rate: double, grade: string, grade_cat: int, dti: double, total_pymnt: double, total_rec_prncp: double, recoveries: double, installment: double, region: string]

# Exploratory Data Analysis

### check schema of each column

In [6]:
sparkData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- final_d: integer (nullable = true)
 |-- emp_length_int: double (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- home_ownership_cat: integer (nullable = true)
 |-- income_category: string (nullable = true)
 |-- annual_inc: integer (nullable = true)
 |-- income_cat: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- term_cat: integer (nullable = true)
 |-- application_type: string (nullable = true)
 |-- application_type_cat: integer (nullable = true)
 |-- purpose: string (nullable = true)
 |-- purpose_cat: integer (nullable = true)
 |-- interest_payments: string (nullable = true)
 |-- interest_payment_cat: integer (nullable = true)
 |-- loan_condition: string (nullable = true)
 |-- loan_condition_cat: integer (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- grade: string 

In [7]:
print(f'there are {len(sparkData.columns)} columns in the data')

there are 30 columns in the data


## summary of data

In [8]:
sparkData.describe().toPandas()

21/11/25 21:24:31 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,summary,id,year,issue_d,final_d,emp_length_int,home_ownership,home_ownership_cat,income_category,annual_inc,...,loan_condition_cat,interest_rate,grade,grade_cat,dti,total_pymnt,total_rec_prncp,recoveries,installment,region
0,count,3387379.0,3387379.0,3387379,3387379.0,3387379.0,3387379,3387379.0,3387379,3387379.0,...,3387379.0,3387379.0,3387379,3387379.0,3387379.0,3387379.0,3387379.0,3387379.0,3387379.0,3387379
1,mean,8762594.174506307,2010.1679251126016,,1060524.219081479,5.293106336709553,,2.176661660829804,,70650.9346161147,...,0.3788690311890107,13.283059160583116,,3.688483042493917,15.191611628469696,11015.24379417175,8721.105830526885,1209.328651923013,381.9545994323065,
2,stddev,18328366.136619914,2.470244249668531,,38256.85618165062,3.546021492706171,,0.9730323337491164,,57869.52130436754,...,0.4851055120968272,3.557012351788819,,1.8727577461600664,10.788933311941042,7713.8712688621135,6429.248612460843,2585.602588556803,212.96420493218605,
3,min,54734.0,2007.0,01/01/2008,1012008.0,-1.5949915008595716,ANY,1.0,High,0.0,...,0.0,5.318858162252189,A,1.0,-7.950813226178009,-5302.105831571138,-7805.787931009369,-7951.992350428402,-264.97064527010417,Northern-Irl
4,max,68617057.0,2015.0,01/12/2015,1122015.0,13.56897787805175,RENT,6.0,Medium,9500000.0,...,1.0,28.99,G,7.0,9999.0,57777.57987,35000.03,33520.27,1445.46,ulster


In [15]:
customers=sparkData.select(['id']).distinct().count()
print(f'There are {customers} customers')

[Stage 12:>                                                         (0 + 2) / 2]

There are 887379 customers


                                                                                

# Data PreProcessing

the strings; 'home_ownership','term', 'income_category', 'application_type', 'purpose', 'interest_payments', 'loan_condition', 'grade, columns have their ordinal encoded form tagged with _cat i.e home_ownership_cat, term_cat, etc

In [7]:
### dropping strings and id columns

In [5]:
dropString=sparkData.drop('id','home_ownership','term', 'income_category', 'application_type', 'purpose', 'interest_payments', 'loan_condition', 'grade')

In [6]:
dropString.limit(5).toPandas()

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

Unnamed: 0,year,issue_d,final_d,emp_length_int,home_ownership_cat,annual_inc,income_cat,loan_amount,term_cat,application_type_cat,...,interest_payment_cat,loan_condition_cat,interest_rate,grade_cat,dti,total_pymnt,total_rec_prncp,recoveries,installment,region
0,2011,01/12/2011,1012015,10.0,1,24000,1,5000,1,1,...,1,0,10.65,2,27.65,5861.071414,5000.0,0.0,162.87,munster
1,2011,01/12/2011,1042013,0.5,1,30000,1,2500,2,1,...,2,1,15.27,3,1.0,1008.71,456.46,117.08,59.83,leinster
2,2011,01/12/2011,1062014,10.0,1,12252,1,2400,1,1,...,2,0,15.96,3,8.72,3003.653644,2400.0,0.0,84.33,cannught
3,2011,01/12/2011,1012015,10.0,1,49200,1,10000,1,1,...,2,0,13.49,3,20.0,12226.30221,10000.0,0.0,339.31,ulster
4,2011,01/12/2011,1012016,1.0,1,80000,1,3000,2,1,...,1,0,12.69,2,17.94,3242.17,2233.1,0.0,67.79,ulster


#### the year column and the issue_d column speaks of the same year the loan was applied, the issue_d column is dropped

In [7]:
dropIssue=dropString.drop('issue_d')

In [8]:
del(dropString)
gc.collect()

18

In [9]:
dropIssue.limit(5).toPandas()

Unnamed: 0,year,final_d,emp_length_int,home_ownership_cat,annual_inc,income_cat,loan_amount,term_cat,application_type_cat,purpose_cat,interest_payment_cat,loan_condition_cat,interest_rate,grade_cat,dti,total_pymnt,total_rec_prncp,recoveries,installment,region
0,2011,1012015,10.0,1,24000,1,5000,1,1,1,1,0,10.65,2,27.65,5861.071414,5000.0,0.0,162.87,munster
1,2011,1042013,0.5,1,30000,1,2500,2,1,2,2,1,15.27,3,1.0,1008.71,456.46,117.08,59.83,leinster
2,2011,1062014,10.0,1,12252,1,2400,1,1,3,2,0,15.96,3,8.72,3003.653644,2400.0,0.0,84.33,cannught
3,2011,1012015,10.0,1,49200,1,10000,1,1,4,2,0,13.49,3,20.0,12226.30221,10000.0,0.0,339.31,ulster
4,2011,1012016,1.0,1,80000,1,3000,2,1,4,1,0,12.69,2,17.94,3242.17,2233.1,0.0,67.79,ulster


In [10]:
### extracting the year of final_d

In [11]:
from pyspark.sql.functions import col, udf
function =  udf (lambda x: str(x)[-4:])

dropIssue = dropIssue.withColumn('final_date', function(col('final_d')))
final_date=dropIssue.drop('final_d')


In [12]:
final_date.printSchema()

root
 |-- year: integer (nullable = true)
 |-- emp_length_int: double (nullable = true)
 |-- home_ownership_cat: integer (nullable = true)
 |-- annual_inc: integer (nullable = true)
 |-- income_cat: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- term_cat: integer (nullable = true)
 |-- application_type_cat: integer (nullable = true)
 |-- purpose_cat: integer (nullable = true)
 |-- interest_payment_cat: integer (nullable = true)
 |-- loan_condition_cat: integer (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- grade_cat: integer (nullable = true)
 |-- dti: double (nullable = true)
 |-- total_pymnt: double (nullable = true)
 |-- total_rec_prncp: double (nullable = true)
 |-- recoveries: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- region: string (nullable = true)
 |-- final_date: string (nullable = true)



In [13]:
del(dropIssue)
gc.collect()

23

In [14]:

### converting final_date to integer

In [15]:
final_date_int=final_date.withColumn("final_date",final_date["final_date"].cast("Integer"))
final_date_int.printSchema()

root
 |-- year: integer (nullable = true)
 |-- emp_length_int: double (nullable = true)
 |-- home_ownership_cat: integer (nullable = true)
 |-- annual_inc: integer (nullable = true)
 |-- income_cat: integer (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- term_cat: integer (nullable = true)
 |-- application_type_cat: integer (nullable = true)
 |-- purpose_cat: integer (nullable = true)
 |-- interest_payment_cat: integer (nullable = true)
 |-- loan_condition_cat: integer (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- grade_cat: integer (nullable = true)
 |-- dti: double (nullable = true)
 |-- total_pymnt: double (nullable = true)
 |-- total_rec_prncp: double (nullable = true)
 |-- recoveries: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- region: string (nullable = true)
 |-- final_date: integer (nullable = true)



In [16]:
del(final_date)
gc.collect()

32

In [17]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="region", outputCol="region_cat")
regionData = indexer.fit(final_date_int).transform(final_date_int).drop('region')

regionData.limit(5).toPandas()

                                                                                

Unnamed: 0,year,emp_length_int,home_ownership_cat,annual_inc,income_cat,loan_amount,term_cat,application_type_cat,purpose_cat,interest_payment_cat,loan_condition_cat,interest_rate,grade_cat,dti,total_pymnt,total_rec_prncp,recoveries,installment,final_date,region_cat
0,2011,10.0,1,24000,1,5000,1,1,1,1,0,10.65,2,27.65,5861.071414,5000.0,0.0,162.87,2015,4.0
1,2011,0.5,1,30000,1,2500,2,1,2,2,1,15.27,3,1.0,1008.71,456.46,117.08,59.83,2013,1.0
2,2011,10.0,1,12252,1,2400,1,1,3,2,0,15.96,3,8.72,3003.653644,2400.0,0.0,84.33,2014,3.0
3,2011,10.0,1,49200,1,10000,1,1,4,2,0,13.49,3,20.0,12226.30221,10000.0,0.0,339.31,2015,2.0
4,2011,1.0,1,80000,1,3000,2,1,4,1,0,12.69,2,17.94,3242.17,2233.1,0.0,67.79,2016,2.0


In [18]:
del(final_date_int)
gc.collect()

121

### k means on the categorical features

In [19]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator


columns = [cols for cols in regionData.columns if 'cat' in cols ]
print(columns)
assembler = VectorAssembler().setInputCols(columns).setOutputCol("features")

features= assembler.transform(regionData)

['home_ownership_cat', 'income_cat', 'term_cat', 'application_type_cat', 'purpose_cat', 'interest_payment_cat', 'loan_condition_cat', 'grade_cat', 'region_cat']


In [20]:
features.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|[1.0,1.0,1.0,1.0,...|
|[1.0,1.0,2.0,1.0,...|
|[1.0,1.0,1.0,1.0,...|
|[1.0,1.0,1.0,1.0,...|
|[1.0,1.0,2.0,1.0,...|
+--------------------+
only showing top 5 rows



In [21]:
###splitting the data into train and test

In [22]:
train, test= features.select("features").randomSplit(weights=[0.7, 0.3], seed=42)

Choosing The Number Of Clusters

In [55]:

import numpy as np
cost = np.zeros(7)
for k in range(2,7):
    kmeans = KMeans()\
            .setK(k)\
            .setSeed(1) \
            .setFeaturesCol("features")

    model = kmeans.fit(train)
    

    trainingCost = model.summary.trainingCost
    print("Training Cost " + str(trainingCost))
    
    cost[k] = trainingCost

21/11/28 13:59:16 WARN MemoryStore: Not enough space to cache rdd_824_2 in memory! (computed 44.3 MiB so far)
21/11/28 13:59:16 WARN BlockManager: Persisting block rdd_824_2 to disk instead.
21/11/28 13:59:28 WARN MemoryStore: Not enough space to cache rdd_824_3 in memory! (computed 29.5 MiB so far)
21/11/28 13:59:28 WARN BlockManager: Persisting block rdd_824_3 to disk instead.
                                                                                

Training Cost 30602622.562184054


21/11/28 14:04:32 WARN MemoryStore: Not enough space to cache rdd_859_2 in memory! (computed 44.3 MiB so far)
21/11/28 14:04:32 WARN BlockManager: Persisting block rdd_859_2 to disk instead.
                                                                                

Training Cost 21915401.34540926


21/11/28 14:09:49 WARN MemoryStore: Not enough space to cache rdd_916_5 in memory! (computed 44.3 MiB so far)
21/11/28 14:09:49 WARN BlockManager: Persisting block rdd_916_5 to disk instead.
                                                                                

Training Cost 19315363.56206024


21/11/28 14:15:37 WARN MemoryStore: Not enough space to cache rdd_969_5 in memory! (computed 19.7 MiB so far)
21/11/28 14:15:37 WARN BlockManager: Persisting block rdd_969_5 to disk instead.
                                                                                

Training Cost 17093278.985239677


21/11/28 14:20:26 WARN MemoryStore: Not enough space to cache rdd_1036_2 in memory! (computed 44.3 MiB so far)
21/11/28 14:20:26 WARN BlockManager: Persisting block rdd_1036_2 to disk instead.
21/11/28 14:21:43 WARN MemoryStore: Not enough space to cache rdd_1036_5 in memory! (computed 44.3 MiB so far)
21/11/28 14:21:43 WARN BlockManager: Persisting block rdd_1036_5 to disk instead.
                                                                                

Training Cost 16414509.725514505


In [56]:
cost[2:]

array([30602622.56218405, 21915401.34540926, 19315363.56206024,
       17093278.98523968, 16414509.72551451])

In [None]:
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import seaborn as sbs
from matplotlib.ticker import MaxNLocator

fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,7),cost[2:7])
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()

In [23]:
##setting the number of clusters to 3

kmeans = KMeans().setK(3).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(train)

21/11/28 15:54:50 WARN MemoryStore: Not enough space to cache rdd_48_2 in memory! (computed 44.3 MiB so far)
21/11/28 15:54:50 WARN BlockManager: Persisting block rdd_48_2 to disk instead.
                                                                                

In [24]:
# Make predictions
predictions = model.transform(test)

In [26]:
predictions.show()

[Stage 49:>                                                         (0 + 1) / 1]

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
|[1.0,1.0,1.0,1.0,...|         2|
+--------------------+----------+
only showing top 20 rows



                                                                                

In [None]:
# Evaluate clustering by computing Silhouette score
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

In [None]:
# Show up the centers.
centers = model.clusterCenters()
print("Cluster Centers: \n")
i=1
for center in centers:
    print(f'center {i} : {center} \n')
    i +=1

In [None]:
clustering_features.limit(5).toPandas()

In [27]:
features

DataFrame[year: int, emp_length_int: double, home_ownership_cat: int, annual_inc: int, income_cat: int, loan_amount: int, term_cat: int, application_type_cat: int, purpose_cat: int, interest_payment_cat: int, loan_condition_cat: int, interest_rate: double, grade_cat: int, dti: double, total_pymnt: double, total_rec_prncp: double, recoveries: double, installment: double, final_date: int, region_cat: double, features: vector]

### Linear Regression

In [28]:
columns = ['emp_length_int', 'home_ownership_cat', 'annual_inc', 'purpose_cat', 'dti', 'region_cat']
print(columns)
assembler = VectorAssembler().setInputCols(columns).setOutputCol("features")

features= assembler.transform(regionData)

['emp_length_int', 'home_ownership_cat', 'annual_inc', 'purpose_cat', 'dti', 'region_cat']


In [31]:
features.select('features').show()

+--------------------+
|            features|
+--------------------+
|[10.0,1.0,24000.0...|
|[0.5,1.0,30000.0,...|
|[10.0,1.0,12252.0...|
|[10.0,1.0,49200.0...|
|[1.0,1.0,80000.0,...|
|[3.0,1.0,36000.0,...|
|[8.0,1.0,47004.0,...|
|[9.0,1.0,48000.0,...|
|[4.0,2.0,40000.0,...|
|[0.5,1.0,15000.0,...|
|[5.0,2.0,72000.0,...|
|[10.0,2.0,75000.0...|
|[0.5,1.0,30000.0,...|
|[3.0,1.0,15000.0,...|
|[3.0,1.0,100000.0...|
|[0.5,1.0,28000.0,...|
|[4.0,1.0,42000.0,...|
|[10.0,3.0,110000....|
|[1.0,3.0,84000.0,...|
|[6.0,1.0,77385.0,...|
+--------------------+
only showing top 20 rows



In [35]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [36]:
data=features.select(["features", 'loan_amount'])

data= data.withColumnRenamed('loan_amount', 'label')

train, test= data.randomSplit(weights=[0.7, 0.3], seed=42)

In [38]:
model= DecisionTreeRegressor(featuresCol='features', labelCol='label')

grid= ParamGridBuilder().addGrid(model.maxDepth,[10,20,30,40]).build()
cv= CrossValidator(estimator=model, evaluator=RegressionEvaluator(), estimatorParamMaps=grid, numFolds=3)


In [None]:
cv=cv.fit(train)

21/11/28 16:24:40 WARN NettyRpcEnv: Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from 192.168.43.108:39647 in 10000 milliseconds
21/11/28 16:26:40 WARN DAGScheduler: Broadcasting large task binary with size 1145.3 KiB
21/11/28 16:26:46 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
21/11/28 16:26:53 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
21/11/28 16:27:06 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
21/11/28 16:27:20 WARN DAGScheduler: Broadcasting large task binary with size 1549.6 KiB
21/11/28 16:28:39 WARN DAGScheduler: Broadcasting large task binary with size 10.3 MiB

In [None]:
plt.plot([0.0001,0.001,0.01,0.1], cv.avgMetrics)
plt.xlabel('regularization parameter')
plt.ylabel('MSE')

In [49]:
pred=cv.bestModel.evaluate(test)

                                                                                

In [53]:
print(f'model coefficients: \n{cv.bestModel.coefficients}\n\nmodel intercept:\n{cv.bestModel.intercept}')

model coefficients: 
[71.27593720505661,165.1301689139723,9892.385911686355,3870.048362512404,-77.50899926922554,-187.08707509970984,-882.695564846995,-196.69262564676035,-36.042433077472815]

model intercept:
-2383.2991965715246


In [None]:
### Prediction on the test
pred= model.evaluate(test)

In [51]:
pred.predictions.show()

[Stage 109:>                                                        (0 + 1) / 1]

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[1.0,1.0,1.0,1.0,...| 1000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 1200|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 1400|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 1500|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 1500|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 1600|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 1600|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 1800|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 1925|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2000|11154.252483730568|
|[1.0,1.0,1.0,1.0,...| 2100|11154.252483

                                                                                

In [54]:
print(f'Mean Squared Error: {pred.meanSquaredError}\nMean Absolute Error: {pred.meanAbsoluteError}\nR Squared: {pred.r2}')

Mean Squared Error: 43050214.10995168
Mean Absolute Error: 5262.867349441982
R Squared: 0.1561349325749639
