In [1]:
from pyspark.sql import SparkSession
# Get (or create) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('Project')
    .getOrCreate()
)

In [2]:
# Read the training CSV: the first row supplies column names and the
# column types are inferred from the data.
df = spark.read.csv(
    "/FileStore/tables/train_Loan.csv",
    inferSchema=True,
    header=True,
)

In [3]:
# Preview the freshly loaded training DataFrame.
df.show()

In [4]:
from pyspark.sql.functions import isnan, when, count, col

In [5]:
# Per-column count of missing entries: a value is counted when it is NaN or null.
missing_count_exprs = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in df.columns
]
df.select(missing_count_exprs).show()

In [6]:
#Removing nulls in Loan amount
from pyspark.sql.functions import mean, md5
# Compute the column mean once on the driver, then use it as the fill value
# for the missing LoanAmount entries.
mean_val = df.agg(mean(df['LoanAmount'])).collect()
first_row = mean_val[0]
mean_la = first_row[0]
df = df.na.fill(value=mean_la, subset=['LoanAmount'])

In [7]:
#Removing nulls in Loan amount term
# Impute missing Loan_Amount_Term values with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
LA_counts = (
    df.where(col('Loan_Amount_Term').isNotNull())
    .groupBy('Loan_Amount_Term')
    .count()
    .sort(col("count").desc(), col('Loan_Amount_Term'))
)
LA_counts.show()
# First row of the descending sort holds the most frequent value.
LA_mode = LA_counts.first()['Loan_Amount_Term']
df = df.na.fill(LA_mode, subset=['Loan_Amount_Term'])

In [8]:
#removing nulls in gender
# Impute missing Gender values with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
Gender_counts = (
    df.where(col('Gender').isNotNull())
    .groupBy('Gender')
    .count()
    .sort(col("count").desc(), col('Gender'))
)
Gender_counts.show()
# First row of the descending sort holds the most frequent value.
Gender_mode = Gender_counts.first()['Gender']
df = df.na.fill(Gender_mode, subset=['Gender'])

In [9]:
# Removing null in married by mode
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
Mar_counts = (
    df.where(col('Married').isNotNull())
    .groupBy('Married')
    .count()
    .sort(col("count").desc(), col('Married'))
)
Mar_counts.show()
# First row of the descending sort holds the most frequent value.
Mar_mode = Mar_counts.first()['Married']
df = df.na.fill(Mar_mode, subset=['Married'])

In [10]:
#removing null in dependents
# Impute missing Dependents values with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
Dep_counts = (
    df.where(col('Dependents').isNotNull())
    .groupBy('Dependents')
    .count()
    .sort(col("count").desc(), col('Dependents'))
)
Dep_counts.show()
# First row of the descending sort holds the most frequent value.
Dep_mode = Dep_counts.first()['Dependents']
df = df.na.fill(Dep_mode, subset=['Dependents'])

In [11]:
#removing null in self employed
# Impute missing Self_Employed values with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
emp_counts = (
    df.where(col('Self_Employed').isNotNull())
    .groupBy('Self_Employed')
    .count()
    .sort(col("count").desc(), col('Self_Employed'))
)
emp_counts.show()
# First row of the descending sort holds the most frequent value.
emp_mode = emp_counts.first()['Self_Employed']
df = df.na.fill(emp_mode, subset=['Self_Employed'])

In [12]:
#removing null in credit history
# Impute missing Credit_History values with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
ch_counts = (
    df.where(col('Credit_History').isNotNull())
    .groupBy('Credit_History')
    .count()
    .sort(col("count").desc(), col('Credit_History'))
)
ch_counts.show()
# First row of the descending sort holds the most frequent value.
ch_mode = ch_counts.first()['Credit_History']
df = df.na.fill(ch_mode, subset=['Credit_History'])

In [13]:

# Re-run the per-column missing-value count (NaN or null) on the filled DataFrame.
remaining_missing_exprs = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in df.columns
]
df.select(remaining_missing_exprs).show()

In [14]:
# Preview the training DataFrame after the fill steps in the preceding cells.
df.show()

In [15]:
#Adding new feature total income
# total_income = applicant income + co-applicant income, computed per row.
total_income_expr = col('ApplicantIncome') + col('CoapplicantIncome')
df_with_totalincome = df.withColumn('total_income', total_income_expr)
df_with_totalincome.show()

In [16]:
#Adding new feature ratio of total income to loan amount
# ratio = total_income / LoanAmount, computed per row.
ratio_expr = col('total_income') / col('LoanAmount')
df_with_ratio = df_with_totalincome.withColumn('ratio', ratio_expr)
df_with_ratio.show()

In [17]:
#final dataset after removing nulls
# Columns carried forward into encoding and modelling, in the order they are selected.
model_columns = [
    'Gender', 'Married', 'Dependents', 'Education', 'Self_Employed',
    'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term',
    'Credit_History', 'Property_Area',
    'Loan_Status', 'total_income', 'ratio',
]
selected_data = df_with_ratio.select(*model_columns)
selected_data.show()

In [18]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer


In [19]:
# Keep only rows where each of these categorical columns is non-null;
# the filters are applied one after another, in this order.
final = selected_data
for required_col in ('Gender', 'Married', 'Dependents', 'Education', 'Self_Employed'):
    final = final.where(col(required_col).isNotNull())
final.show()

In [20]:
#dummy coding gender
# Author's note on the mapping: Male =0 ; Female =1. The indexer derives the
# indices from the fitted data, so confirm against the output below.
gen_indexer = StringIndexer(inputCol="Gender", outputCol="_Gender_index")
gen_model = gen_indexer.fit(final)
gen_indexed = gen_model.transform(final)
gen_encoder = OneHotEncoder(inputCol="_Gender_index", outputCol="_Gender_vec")
# Spark 3.x turned OneHotEncoder into an Estimator that must be fitted before
# it can transform; on Spark 2.x it is a plain Transformer with no fit().
# Rebinding gen_encoder to the fitted model keeps later
# `gen_encoder.transform(...)` calls working on both versions.
if hasattr(gen_encoder, "fit"):
    gen_encoder = gen_encoder.fit(gen_indexed)
final1 = gen_encoder.transform(gen_indexed)
final1.show()

In [21]:
#Dummy coding Married
# Author's note on the mapping: Yes =0, No =1. The indexer derives the
# indices from the fitted data, so confirm against the output below.
mar_indexer = StringIndexer(inputCol="Married", outputCol="_Married_index")
mar_model = mar_indexer.fit(final1)
mar_indexed = mar_model.transform(final1)
mar_encoder = OneHotEncoder(inputCol="_Married_index", outputCol="_Married_vec")
# Spark 3.x turned OneHotEncoder into an Estimator that must be fitted before
# it can transform; on Spark 2.x it is a plain Transformer with no fit().
# Rebinding mar_encoder to the fitted model keeps later
# `mar_encoder.transform(...)` calls working on both versions.
if hasattr(mar_encoder, "fit"):
    mar_encoder = mar_encoder.fit(mar_indexed)
final2 = mar_encoder.transform(mar_indexed)
final2.show()

In [22]:
#Dummy coding Dependent
# Index the Dependents labels, then one-hot encode the resulting index.
dep_indexer = StringIndexer(inputCol="Dependents", outputCol="_Dependents_index")
dep_model = dep_indexer.fit(final2)
dep_indexed = dep_model.transform(final2)
dep_encoder = OneHotEncoder(inputCol="_Dependents_index", outputCol="_Dependents_vec")
# Spark 3.x turned OneHotEncoder into an Estimator that must be fitted before
# it can transform; on Spark 2.x it is a plain Transformer with no fit().
# Rebinding dep_encoder to the fitted model keeps later
# `dep_encoder.transform(...)` calls working on both versions.
if hasattr(dep_encoder, "fit"):
    dep_encoder = dep_encoder.fit(dep_indexed)
final3 = dep_encoder.transform(dep_indexed)
final3.show()

In [23]:
#Dummy coding Education
# Author's note on the mapping: graduate =0 ; not graduate =1. The indexer
# derives the indices from the fitted data, so confirm against the output below.
edu_indexer = StringIndexer(inputCol="Education", outputCol="_Education_index")
edu_model = edu_indexer.fit(final3)
edu_indexed = edu_model.transform(final3)
edu_encoder = OneHotEncoder(inputCol="_Education_index", outputCol="_Education_vec")
# Spark 3.x turned OneHotEncoder into an Estimator that must be fitted before
# it can transform; on Spark 2.x it is a plain Transformer with no fit().
# Rebinding edu_encoder to the fitted model keeps later
# `edu_encoder.transform(...)` calls working on both versions.
if hasattr(edu_encoder, "fit"):
    edu_encoder = edu_encoder.fit(edu_indexed)
final4 = edu_encoder.transform(edu_indexed)
final4.show()

In [24]:
#Dummy coding Self employed
# Author's note on the mapping: No =0 ; Yes= 1. The indexer derives the
# indices from the fitted data, so confirm against the output below.
emp_indexer = StringIndexer(inputCol="Self_Employed", outputCol="_Self_Employed_index")
emp_model = emp_indexer.fit(final4)
emp_indexed = emp_model.transform(final4)
emp_encoder = OneHotEncoder(inputCol="_Self_Employed_index", outputCol="_Self_Employed_vec")
# Spark 3.x turned OneHotEncoder into an Estimator that must be fitted before
# it can transform; on Spark 2.x it is a plain Transformer with no fit().
# Rebinding emp_encoder to the fitted model keeps later
# `emp_encoder.transform(...)` calls working on both versions.
if hasattr(emp_encoder, "fit"):
    emp_encoder = emp_encoder.fit(emp_indexed)
final5 = emp_encoder.transform(emp_indexed)
final5.show()

In [25]:
#Dummy coding Property Area
# Author's note on the mapping: Urban =1; Rural =0; semiurban =2.
# NOTE(review): the indexer derives the indices from the fitted data, so this
# mapping may not hold - verify against the output below.
area_indexer = StringIndexer(inputCol="Property_Area", outputCol="_Property_Area_index")
area_model = area_indexer.fit(final5)
area_indexed = area_model.transform(final5)
area_encoder = OneHotEncoder(inputCol="_Property_Area_index", outputCol="_Property_Area_vec")
# Spark 3.x turned OneHotEncoder into an Estimator that must be fitted before
# it can transform; on Spark 2.x it is a plain Transformer with no fit().
# Rebinding area_encoder to the fitted model keeps later
# `area_encoder.transform(...)` calls working on both versions.
if hasattr(area_encoder, "fit"):
    area_encoder = area_encoder.fit(area_indexed)
final6 = area_encoder.transform(area_indexed)
final6.show()

In [26]:
#Dummy coding Loan Status
# Author's note on the mapping: yes =0 ; No = 1. The indexer derives the
# indices from the fitted data, so confirm against the output below.
# `_Loan_Status_index` is the label column the classifiers train on.
loan_indexer = StringIndexer(inputCol="Loan_Status", outputCol="_Loan_Status_index")
loan_model = loan_indexer.fit(final6)
loan_indexed = loan_model.transform(final6)
# NOTE(review): `_Loan_Status_vec` is not referenced by any later cell shown
# here; it is kept so the columns of `final7` stay unchanged.
loan_encoder = OneHotEncoder(inputCol="_Loan_Status_index", outputCol="_Loan_Status_vec")
# Spark 3.x turned OneHotEncoder into an Estimator that must be fitted before
# it can transform; on Spark 2.x it is a plain Transformer with no fit().
if hasattr(loan_encoder, "fit"):
    loan_encoder = loan_encoder.fit(loan_indexed)
final7 = loan_encoder.transform(loan_indexed)
final7.show()

In [27]:
# List the column names of the fully indexed/encoded training DataFrame.
final7.columns

In [28]:
# Assemble the numeric columns followed by the one-hot vectors into a single
# 'features' vector column; the input order is significant and is preserved.
numeric_inputs = [
    'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount',
    'Loan_Amount_Term', 'Credit_History', 'total_income', 'ratio',
]
encoded_inputs = [
    '_Gender_vec', '_Married_vec', '_Dependents_vec',
    '_Education_vec', '_Self_Employed_vec', '_Property_Area_vec',
]
assembler = VectorAssembler(
    outputCol='features',
    inputCols=numeric_inputs + encoded_inputs,
)

In [29]:
# Add the assembled 'features' vector column to the encoded training DataFrame.
final8 = assembler.transform(final7)

In [30]:
# Preview the training DataFrame including the assembled 'features' column.
final8.show()

In [31]:
# Reduce to the two columns the classifiers consume: the feature vector and
# the indexed label.
model_input_columns = ["features", "_Loan_Status_index"]
new_final = final8.select(*model_input_columns)
new_final.show()

In [32]:
# 70/30 train/hold-out split. A fixed seed is passed so the split - and every
# metric computed from it in later cells - is repeatable across runs instead
# of changing on each execution.
train_data, test_data = new_final.randomSplit([0.7, 0.3], seed=42)

In [33]:
# Logistic regression baseline: fit on the training split (at most 10
# iterations) and score the held-out split.
train_data.printSchema()
lr = LogisticRegression(
    labelCol='_Loan_Status_index',
    featuresCol='features',
    maxIter=10,
)
lr_Model = lr.fit(train_data)
pred = lr_Model.transform(test_data)
pred.show()

In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [35]:
# Evaluator for the logistic-regression predictions; only the label column is
# set, so every other parameter (including the metric) keeps its default.
my_eval = BinaryClassificationEvaluator(labelCol = '_Loan_Status_index')

In [36]:
# Score the logistic-regression predictions on the held-out split.
my_eval.evaluate(pred)

In [37]:
#**** Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier
# Decision tree configured with the indexed label and assembled feature vector.
# (The earlier default-constructed instance was overwritten immediately and
# has been dropped.)
dtc = DecisionTreeClassifier(labelCol = "_Loan_Status_index", featuresCol = "features")

In [38]:
# Fit the decision tree on the training split, then score the held-out split.
dtc_model = dtc.fit(train_data)
dtc_pred = dtc_model.transform(test_data)
dtc_pred.show()

In [39]:
# Evaluator for the decision-tree predictions; only the label column is set,
# so every other parameter (including the metric) keeps its default.
my_eval2 = BinaryClassificationEvaluator(labelCol = '_Loan_Status_index')

In [40]:
# Score the decision-tree predictions on the held-out split.
my_eval2.evaluate(dtc_pred)

In [41]:
#Random forest
# Fit a random forest on the training split and score the held-out split.
# (The earlier default-constructed instance was overwritten immediately and
# has been dropped.)
rfc = RandomForestClassifier(labelCol = "_Loan_Status_index", featuresCol = "features")
rfc_model = rfc.fit(train_data)
rfc_pred = rfc_model.transform(test_data)
rfc_pred.show()

In [42]:
# Evaluate the random-forest predictions on the held-out split; only the label
# column is set, so the evaluator's default metric is used.
my_eval3 = BinaryClassificationEvaluator(labelCol = '_Loan_Status_index')
my_eval3.evaluate(rfc_pred)

In [43]:
#Gradient Boosting
# Fit gradient-boosted trees on the training split and score the held-out split.
# (The earlier `gbc = GBTClassifier` bound the class itself rather than an
# instance and was overwritten immediately; it has been dropped.)
gbc = GBTClassifier(labelCol = "_Loan_Status_index", featuresCol = "features")
gbc_model = gbc.fit(train_data)
gbc_pred = gbc_model.transform(test_data)
gbc_pred.show()

In [44]:
# Evaluate the gradient-boosting predictions on the held-out split; only the
# label column is set, so the evaluator's default metric is used.
my_eval4 = BinaryClassificationEvaluator(labelCol = '_Loan_Status_index')
my_eval4.evaluate(gbc_pred)

In [45]:
#Uploading the test dataset
# Same reader settings as the training file: header row plus inferred types.
test = spark.read.csv(
    "/FileStore/tables/test.csv",
    inferSchema=True,
    header=True,
)

In [46]:
# Preview the freshly loaded test DataFrame.
test.show()

In [47]:
# Per-column count of missing entries (NaN or null) in the test DataFrame.
test_missing_exprs = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in test.columns
]
test.select(test_missing_exprs).show()

In [48]:
#removing nulls in gender
# Impute missing Gender values in the test set with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
# NOTE(review): the mode is taken from the test set itself rather than reusing
# the training value - confirm this is intended.
Gender_counts = (
    test.where(col('Gender').isNotNull())
    .groupBy('Gender')
    .count()
    .sort(col("count").desc(), col('Gender'))
)
Gender_counts.show()
# First row of the descending sort holds the most frequent value.
Gender_mode = Gender_counts.first()['Gender']
test = test.na.fill(Gender_mode, subset=['Gender'])

In [49]:
#removing null in self employed
# Impute missing Self_Employed values in the test set with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
emp_counts = (
    test.where(col('Self_Employed').isNotNull())
    .groupBy('Self_Employed')
    .count()
    .sort(col("count").desc(), col('Self_Employed'))
)
emp_counts.show()
# First row of the descending sort holds the most frequent value.
emp_mode = emp_counts.first()['Self_Employed']
test = test.na.fill(emp_mode, subset=['Self_Employed'])

In [50]:
#removing null in dependents
# Impute missing Dependents values in the test set with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
Dep_counts = (
    test.where(col('Dependents').isNotNull())
    .groupBy('Dependents')
    .count()
    .sort(col("count").desc(), col('Dependents'))
)
Dep_counts.show()
# First row of the descending sort holds the most frequent value.
Dep_mode = Dep_counts.first()['Dependents']
test = test.na.fill(Dep_mode, subset=['Dependents'])

In [51]:
#removing null in credit history
# Impute missing Credit_History values in the test set with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
ch_counts = (
    test.where(col('Credit_History').isNotNull())
    .groupBy('Credit_History')
    .count()
    .sort(col("count").desc(), col('Credit_History'))
)
ch_counts.show()
# First row of the descending sort holds the most frequent value.
ch_mode = ch_counts.first()['Credit_History']
test = test.na.fill(ch_mode, subset=['Credit_History'])

In [52]:
#Removing nulls in Loan amount
from pyspark.sql.functions import mean, md5
# Compute the test set's LoanAmount mean once on the driver, then use it as
# the fill value for the missing entries.
mean_val = test.agg(mean(test['LoanAmount'])).collect()
first_row = mean_val[0]
mean_la = first_row[0]
test = test.na.fill(value=mean_la, subset=['LoanAmount'])


In [53]:
#removing null in loan amount term
# Impute missing Loan_Amount_Term values in the test set with the column's mode.
# Null rows are excluded before counting so the null group can never be
# picked as the "mode" (na.fill does not accept None as a fill value).
# The secondary sort key makes the choice deterministic when counts tie.
# Uses its own names instead of reusing the Credit_History `ch_*` variables.
term_counts = (
    test.where(col('Loan_Amount_Term').isNotNull())
    .groupBy('Loan_Amount_Term')
    .count()
    .sort(col("count").desc(), col('Loan_Amount_Term'))
)
term_counts.show()
# First row of the descending sort holds the most frequent value.
term_mode = term_counts.first()['Loan_Amount_Term']
test = test.na.fill(term_mode, subset=['Loan_Amount_Term'])

In [54]:
# total_income = applicant income + co-applicant income, computed per row.
test_total_income_expr = col('ApplicantIncome') + col('CoapplicantIncome')
test_with_totalincome = test.withColumn('total_income', test_total_income_expr)
test_with_totalincome.show()

In [55]:
# ratio = total_income / LoanAmount, computed per row.
test_ratio_expr = col('total_income') / col('LoanAmount')
test_with_ratio = test_with_totalincome.withColumn('ratio', test_ratio_expr)
test_with_ratio.show()

In [56]:
#final dataset after removing nulls
# Same column set as the training selection, minus Loan_Status, which is not
# selected here.
test_columns = [
    'Gender', 'Married', 'Dependents', 'Education', 'Self_Employed',
    'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term',
    'Credit_History', 'Property_Area',
    'total_income', 'ratio',
]
test_final = test_with_ratio.select(*test_columns)
test_final.show()

In [57]:
#Encoding features of test dataset
# Reuse the indexer models and encoders built on the training data, applying
# each (indexer, encoder) pair in the same order as the training cells.
encoding_stages = [
    (gen_model, gen_encoder),
    (mar_model, mar_encoder),
    (dep_model, dep_encoder),
    (edu_model, edu_encoder),
    (emp_model, emp_encoder),
    (area_model, area_encoder),
]
test_final6 = test_final
for indexer_model, encoder in encoding_stages:
    test_final6 = encoder.transform(indexer_model.transform(test_final6))
test_final6.columns

In [58]:
#Assembling columns and creating feature column for test dataset
# Reuses the `assembler` configured for the training data, so the test
# 'features' vector is built from the same input columns in the same order.
test_final7=assembler.transform(test_final6)

In [59]:
# Preview the test DataFrame including the assembled 'features' column.
test_final7.show()

In [60]:
#Building model on entire training dataset(without splitting)
#Random forest
# Refit the random forest on every labelled row (`new_final`) and predict on
# the assembled test set. This rebinds `rfc`, `rfc_model` and `rfc_pred` from
# the hold-out experiment. (The earlier default-constructed instance was
# overwritten immediately and has been dropped.)
rfc = RandomForestClassifier(labelCol = "_Loan_Status_index", featuresCol = "features")
rfc_model = rfc.fit(new_final)
rfc_pred = rfc_model.transform(test_final7)
rfc_pred.show()

In [61]:
#final predictions on test data
# Keep the feature vector together with the model's output columns.
prediction_columns = ['features', 'rawPrediction', 'probability', 'prediction']
predictions_dataset = rfc_pred.select(*prediction_columns)
predictions_dataset.show()

In [62]:
# Summary statistics for the test rows predicted as label index 1.
# The prediction column is compared against a numeric literal rather than the
# string "1", so the filter does not rely on an implicit string-to-number cast.
# NOTE(review): which Loan_Status value index 1 stands for is determined by the
# fitted label indexer - confirm against the indexed training output.
df1 = rfc_pred.filter(rfc_pred['prediction'] == 1.0).describe()

In [63]:
# Display the summary statistics computed for the rows predicted as index 1.
df1.show()

In [64]:
# Narrow the summary to the statistic name plus the numeric input columns.
summary_columns = [
    'summary',
    'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount',
    'Loan_Amount_Term', 'total_income', 'ratio',
]
df1.select(*summary_columns).show()