In [32]:
from pyspark.sql import SparkSession
import getpass
# The OS login name selects the per-user Hive warehouse directory below.
username = getpass.getuser()
warehouse_dir = f"/user/{username}/warehouse"

# Build (or reuse) a Hive-enabled Spark session that runs on YARN.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.sql.warehouse.dir", warehouse_dir)
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [33]:
# Points awarded per rating tier. They are stored as session conf entries so
# the SQL cells can reference them through ${spark.sql.<name>} substitution.
rated_pts_conf = {
    "spark.sql.unacceptable_rated_pts": 0,
    "spark.sql.very_bad_rated_pts": 100,
    "spark.sql.bad_rated_pts": 250,
    "spark.sql.good_rated_pts": 500,
    "spark.sql.very_good_rated_pts": 650,
    "spark.sql.excellent_rated_pts": 800,
}
for conf_key, conf_value in rated_pts_conf.items():
    spark.conf.set(conf_key, conf_value)

In [34]:
# Loan-score cut-offs used by the final grading query, stored as session conf
# entries so the SQL can reference them through ${spark.sql.<name>}.
grade_pts_conf = {
    "spark.sql.unacceptable_grade_pts": 750,
    "spark.sql.very_bad_grade_pts": 1000,
    "spark.sql.bad_grade_pts": 1500,
    "spark.sql.good_grade_pts": 2000,
    "spark.sql.very_good_grade_pts": 2500,
}
for conf_key, conf_value in grade_pts_conf.items():
    spark.conf.set(conf_key, conf_value)

The following tables are required to calculate the loan score:

customers_new ,
loans ,
loans_repayments ,
loans_defaulters_delinq_new ,
loans_defaulters_detail_rec_enq_new


In [35]:
# Load the bad-customer CSV (header row present, column types inferred).
# The member ids it contains are excluded by the scoring queries.
bad_customer_path = "/public/trendytech/lendingclubproject/bad/bad_customer_data_final"
bad_customer_data_final_df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load(bad_customer_path)
)

In [36]:
# Expose the bad-customer rows to Spark SQL as the temp view `bad_data_customer`,
# which the scoring queries use in their NOT IN filters.
bad_customer_data_final_df.createOrReplaceTempView("bad_data_customer")

In [37]:
# Criterion 1 - payment history (ph).
# Produces one row per (repayment, loan) pair for members not listed in
# bad_data_customer, with two point columns:
#   last_payment_pts  - last payment amount compared with the monthly installment
#   total_payment_pts - total payment received compared with the funded amount
# Point values come from the ${spark.sql.*_rated_pts} session conf entries.
#
# Changes versus the previous version of this cell:
#   * total_payment_pts now has an ELSE branch. Without it a negative or
#     non-comparable value (e.g. NULL funded_amount) produced NULL, which
#     nulls out the whole loan score further down the pipeline.
#   * member_id in the WHERE clause is qualified with the loans alias (c),
#     the same table the SELECT list reads it from.
#   * The dangling line-continuation backslash after the closing parenthesis
#     is gone.
# NOTE(review): this cell reads from itv006222_lending_club while the later
# cells read from itv005857_lending_club - confirm which schema is intended.
ph_df = spark.sql("""
    select c.member_id,
    case
        when p.last_payment_amount < (c.monthly_installment * 0.5) then ${spark.sql.very_bad_rated_pts}
        when p.last_payment_amount >= (c.monthly_installment * 0.5) and p.last_payment_amount < c.monthly_installment then ${spark.sql.bad_rated_pts}
        when (p.last_payment_amount = (c.monthly_installment)) then ${spark.sql.good_rated_pts}
        when p.last_payment_amount > (c.monthly_installment) and p.last_payment_amount <= (c.monthly_installment * 1.50) then ${spark.sql.very_good_rated_pts}
        when p.last_payment_amount > (c.monthly_installment * 1.50) then ${spark.sql.excellent_rated_pts}
        else ${spark.sql.unacceptable_rated_pts}
    end as last_payment_pts,
    case
        when p.total_payment_received >= (c.funded_amount * 0.50) then ${spark.sql.very_good_rated_pts}
        when p.total_payment_received < (c.funded_amount * 0.50) and p.total_payment_received > 0 then ${spark.sql.good_rated_pts}
        when p.total_payment_received = 0 or (p.total_payment_received) is null then ${spark.sql.unacceptable_rated_pts}
        else ${spark.sql.unacceptable_rated_pts}
    end as total_payment_pts
    from itv006222_lending_club.loans_repayments p
    inner join itv006222_lending_club.loans c on c.loan_id = p.loan_id
    where c.member_id NOT IN (select member_id from bad_data_customer)
""")

In [38]:
# Register the payment-history points as temp view `ph_pts` so the
# defaulters-history query can join against it.
ph_df.createOrReplaceTempView("ph_pts")

In [39]:
# Preview the payment-history points.
spark.sql("select * from ph_pts")

member_id,last_payment_pts,total_payment_pts
dcec9334e70f1cc95...,800,650
fc58ca61f51f9dcac...,500,650
2fb62a6ca51063b11...,500,650
488268a5531951622...,800,650
ade6026208e48f5f9...,500,650
7c8b0ca6acddfaeb1...,800,650
a707b7fe7c38bad65...,800,650
1df639cddea30c288...,800,650
22d67005e12d8726d...,500,650
009cf312bd46551b4...,500,650


Loan Score Calculation Criteria 2: Loan defaulters history (ldh)

In [40]:
# Criterion 2 - loan defaulters history (ldh).
# Carries over every ph_pts column and adds four point columns, each using the
# same tiers (0 -> excellent, 1-2 -> bad, 3-5 -> very bad, >5 or NULL -> unacceptable):
#   delinq_pts              - from d.delinq_2yrs
#   public_records_pts      - from l.pub_rec
#   public_bankruptcies_pts - from l.pub_rec_bankruptcies
#   enq_pts                 - from l.inq_last_6mths
# Members listed in bad_data_customer are excluded.
#
# Fixes versus the previous version of this cell:
#   * public_bankruptcies_pts NULL-checked l.inq_last_6mths instead of
#     l.pub_rec_bankruptcies (copy-paste slip), so a NULL bankruptcy count
#     fell through every branch and produced NULL points.
#   * public_records_pts awarded very_bad points for "> 5 or NULL", the same
#     as the 3-5 tier; it now awards unacceptable points like the other
#     three CASE expressions.
ldh_ph_df = spark.sql("""
    select p.*,
    CASE
        when d.delinq_2yrs = 0 then ${spark.sql.excellent_rated_pts}
        when d.delinq_2yrs between 1 and 2 then ${spark.sql.bad_rated_pts}
        when d.delinq_2yrs between 3 and 5 then ${spark.sql.very_bad_rated_pts}
        when d.delinq_2yrs > 5 or d.delinq_2yrs IS NULL then ${spark.sql.unacceptable_rated_pts}
    END AS delinq_pts,
    CASE
        when l.pub_rec = 0 then ${spark.sql.excellent_rated_pts}
        when l.pub_rec between 1 and 2 then ${spark.sql.bad_rated_pts}
        when l.pub_rec between 3 and 5 then ${spark.sql.very_bad_rated_pts}
        when l.pub_rec > 5 or l.pub_rec IS NULL then ${spark.sql.unacceptable_rated_pts}
    END AS public_records_pts,
    CASE
        when l.pub_rec_bankruptcies = 0 then ${spark.sql.excellent_rated_pts}
        when l.pub_rec_bankruptcies between 1 and 2 then ${spark.sql.bad_rated_pts}
        when l.pub_rec_bankruptcies between 3 and 5 then ${spark.sql.very_bad_rated_pts}
        when l.pub_rec_bankruptcies > 5 or l.pub_rec_bankruptcies IS NULL then ${spark.sql.unacceptable_rated_pts}
    END AS public_bankruptcies_pts,
    CASE
        when l.inq_last_6mths = 0 then ${spark.sql.excellent_rated_pts}
        when l.inq_last_6mths between 1 and 2 then ${spark.sql.bad_rated_pts}
        when l.inq_last_6mths between 3 and 5 then ${spark.sql.very_bad_rated_pts}
        when l.inq_last_6mths > 5 or l.inq_last_6mths IS NULL then ${spark.sql.unacceptable_rated_pts}
    END AS enq_pts
    from itv005857_lending_club.loans_defaulters_detail_rec_enq_new l
    inner join itv005857_lending_club.loans_defaulters_delinq_new d ON d.member_id = l.member_id
    inner join ph_pts p ON p.member_id = l.member_id
    where l.member_id NOT IN (select member_id from bad_data_customer)
""")

In [41]:
# Register the combined payment-history + defaulters-history points as temp
# view `ldh_ph_pts` for the financial-health query.
ldh_ph_df.createOrReplaceTempView("ldh_ph_pts")

In [42]:
# Preview the combined payment-history and defaulters-history points.
spark.sql("select * from ldh_ph_pts")

member_id,last_payment_pts,total_payment_pts,delinq_pts,public_records_pts,public_bankruptcies_pts,enq_pts
000c8875b71a6b47c...,800,650,250,800,800,800
003769d7f54c7859e...,500,500,250,800,800,800
003e1e6cbd2920bbb...,500,650,250,250,250,800
004017b21bd4d6271...,100,650,0,800,800,800
005b4c3db3fce07dc...,500,650,250,250,800,250
00710707c563c2119...,800,650,250,800,800,800
007da79904f69970d...,800,650,250,800,800,800
00f435a80d0440ece...,500,500,100,800,800,800
00fc2ae3ffb1213e4...,650,650,250,800,800,800
00fc8144cb210ba8c...,500,650,250,250,250,800


Loan Score Calculation Criteria 3: Financial health (fh)

In [43]:
# Criterion 3 - financial health (fh).
# Carries over every ldh_ph_pts column and adds four point columns:
#   loan_status_pts  - from l.loan_status (case-insensitive substring match)
#   home_pts         - from a.home_ownership (case-insensitive substring match)
#   credit_limit_pts - l.funded_amount as a fraction of a.total_high_credit_limit
#   grade_pts        - from a.grade / a.sub_grade; sub-grades 2..5 scale the
#                      grade's base points by 0.95 / 0.90 / 0.85 / 0.80
# Members listed in bad_data_customer are excluded.
#
# Fixes versus the previous version of this cell:
#   * home_pts had no ELSE branch, so any ownership value other than
#     own / rent / mortgage / any / NULL produced NULL points, and through
#     the later sums a NULL loan score and grade. Such values now get the
#     same very_bad points as the lowest existing tier.
#   * grade_pts had no ELSE branch, so a NULL grade or a grade/sub_grade pair
#     not listed below produced NULL points. These now get unacceptable
#     points, matching the ELSE of loan_status_pts and credit_limit_pts.
fh_ldh_ph_df = spark.sql("""
    select ldef.*,
    CASE
        when LOWER(l.loan_status) like '%fully paid%' then ${spark.sql.excellent_rated_pts}
        when LOWER(l.loan_status) like '%current%' then ${spark.sql.good_rated_pts}
        when LOWER(l.loan_status) like '%in grace period%' then ${spark.sql.bad_rated_pts}
        when LOWER(l.loan_status) like '%late (16-30 days)%' or LOWER(l.loan_status) like '%late (31-120 days)%' then ${spark.sql.very_bad_rated_pts}
        when LOWER(l.loan_status) like '%charged off%' then ${spark.sql.unacceptable_rated_pts}
        else ${spark.sql.unacceptable_rated_pts}
    END AS loan_status_pts,
    CASE
        when LOWER(a.home_ownership) like '%own%' then ${spark.sql.excellent_rated_pts}
        when LOWER(a.home_ownership) like '%rent%' then ${spark.sql.good_rated_pts}
        when LOWER(a.home_ownership) like '%mortgage%' then ${spark.sql.bad_rated_pts}
        when LOWER(a.home_ownership) like '%any%' or LOWER(a.home_ownership) IS NULL then ${spark.sql.very_bad_rated_pts}
        else ${spark.sql.very_bad_rated_pts}
    END AS home_pts,
    CASE
        when l.funded_amount <= (a.total_high_credit_limit * 0.10) then ${spark.sql.excellent_rated_pts}
        when l.funded_amount > (a.total_high_credit_limit * 0.10) and l.funded_amount <= (a.total_high_credit_limit * 0.20) then ${spark.sql.very_good_rated_pts}
        when l.funded_amount > (a.total_high_credit_limit * 0.20) and l.funded_amount <= (a.total_high_credit_limit * 0.30) then ${spark.sql.good_rated_pts}
        when l.funded_amount > (a.total_high_credit_limit * 0.30) and l.funded_amount <= (a.total_high_credit_limit * 0.50) then ${spark.sql.bad_rated_pts}
        when l.funded_amount > (a.total_high_credit_limit * 0.50) and l.funded_amount <= (a.total_high_credit_limit * 0.70) then ${spark.sql.very_bad_rated_pts}
        when l.funded_amount > (a.total_high_credit_limit * 0.70) then ${spark.sql.unacceptable_rated_pts}
        else ${spark.sql.unacceptable_rated_pts}
    END AS credit_limit_pts,
    CASE
        when (a.grade) = 'A' and (a.sub_grade) = 'A1' then ${spark.sql.excellent_rated_pts}
        when (a.grade) = 'A' and (a.sub_grade) = 'A2' then (${spark.sql.excellent_rated_pts} * 0.95)
        when (a.grade) = 'A' and (a.sub_grade) = 'A3' then (${spark.sql.excellent_rated_pts} * 0.90)
        when (a.grade) = 'A' and (a.sub_grade) = 'A4' then (${spark.sql.excellent_rated_pts} * 0.85)
        when (a.grade) = 'A' and (a.sub_grade) = 'A5' then (${spark.sql.excellent_rated_pts} * 0.80)
        when (a.grade) = 'B' and (a.sub_grade) = 'B1' then ${spark.sql.very_good_rated_pts}
        when (a.grade) = 'B' and (a.sub_grade) = 'B2' then (${spark.sql.very_good_rated_pts} * 0.95)
        when (a.grade) = 'B' and (a.sub_grade) = 'B3' then (${spark.sql.very_good_rated_pts} * 0.90)
        when (a.grade) = 'B' and (a.sub_grade) = 'B4' then (${spark.sql.very_good_rated_pts} * 0.85)
        when (a.grade) = 'B' and (a.sub_grade) = 'B5' then (${spark.sql.very_good_rated_pts} * 0.80)
        when (a.grade) = 'C' and (a.sub_grade) = 'C1' then ${spark.sql.good_rated_pts}
        when (a.grade) = 'C' and (a.sub_grade) = 'C2' then (${spark.sql.good_rated_pts} * 0.95)
        when (a.grade) = 'C' and (a.sub_grade) = 'C3' then (${spark.sql.good_rated_pts} * 0.90)
        when (a.grade) = 'C' and (a.sub_grade) = 'C4' then (${spark.sql.good_rated_pts} * 0.85)
        when (a.grade) = 'C' and (a.sub_grade) = 'C5' then (${spark.sql.good_rated_pts} * 0.80)
        when (a.grade) = 'D' and (a.sub_grade) = 'D1' then ${spark.sql.bad_rated_pts}
        when (a.grade) = 'D' and (a.sub_grade) = 'D2' then (${spark.sql.bad_rated_pts} * 0.95)
        when (a.grade) = 'D' and (a.sub_grade) = 'D3' then (${spark.sql.bad_rated_pts} * 0.90)
        when (a.grade) = 'D' and (a.sub_grade) = 'D4' then (${spark.sql.bad_rated_pts} * 0.85)
        when (a.grade) = 'D' and (a.sub_grade) = 'D5' then (${spark.sql.bad_rated_pts} * 0.80)
        when (a.grade) = 'E' and (a.sub_grade) = 'E1' then ${spark.sql.very_bad_rated_pts}
        when (a.grade) = 'E' and (a.sub_grade) = 'E2' then (${spark.sql.very_bad_rated_pts} * 0.95)
        when (a.grade) = 'E' and (a.sub_grade) = 'E3' then (${spark.sql.very_bad_rated_pts} * 0.90)
        when (a.grade) = 'E' and (a.sub_grade) = 'E4' then (${spark.sql.very_bad_rated_pts} * 0.85)
        when (a.grade) = 'E' and (a.sub_grade) = 'E5' then (${spark.sql.very_bad_rated_pts} * 0.80)
        when (a.grade) in ('F','G') then (${spark.sql.unacceptable_rated_pts})
        else ${spark.sql.unacceptable_rated_pts}
    END AS grade_pts
    from ldh_ph_pts ldef
    inner join itv005857_lending_club.loans l ON ldef.member_id = l.member_id
    inner join itv005857_lending_club.customers_new a ON a.member_id = ldef.member_id
    where l.member_id NOT IN (select member_id from bad_data_customer)
""")

In [44]:
# Register the points for all three criteria as temp view `fh_ldh_ph_pts`,
# the input of the weighted loan-score query.
fh_ldh_ph_df.createOrReplaceTempView("fh_ldh_ph_pts")

# Final loan score calculation: combine all 3 criteria with the following weights
1. Payment History = 20%
2. Loan Defaults = 45%
3. Financial Health = 35%

In [45]:
# Weighted component scores per row of fh_ldh_ph_pts:
#   payment_history_pts    = (last_payment_pts + total_payment_pts) * 0.20
#   defaulters_history_pts = (delinq_pts + public_records_pts
#                             + public_bankruptcies_pts + enq_pts) * 0.45
#   financial_health_pts   = (loan_status_pts + home_pts
#                             + credit_limit_pts + grade_pts) * 0.35
# A NULL in any input column makes the corresponding component NULL.
loan_score = spark.sql("select member_id, \
((last_payment_pts + total_payment_pts )*0.20) as payment_history_pts, \
((delinq_pts + public_records_pts + public_bankruptcies_pts + enq_pts)*0.45) as defaulters_history_pts, \
((loan_status_pts + home_pts + credit_limit_pts + grade_pts)* 0.35) as financial_health_pts \
 from fh_ldh_ph_pts")

In [46]:
# Preview the three weighted component scores.
loan_score

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts
000c8875b71a6b47c...,290.0,1192.5,885.5
003769d7f54c7859e...,200.0,1192.5,402.5
003e1e6cbd2920bbb...,230.0,697.5,766.5
004017b21bd4d6271...,150.0,1080.0,822.5
005b4c3db3fce07dc...,230.0,697.5,619.5
00710707c563c2119...,290.0,1192.5,829.5
007da79904f69970d...,290.0,1192.5,822.5
00f435a80d0440ece...,200.0,1125.0,708.75
00fc2ae3ffb1213e4...,260.0,1192.5,752.5
00fc8144cb210ba8c...,230.0,697.5,717.5


In [47]:
# Overall loan score = sum of the three weighted component scores.
total_pts = (
    loan_score["payment_history_pts"]
    + loan_score["defaulters_history_pts"]
    + loan_score["financial_health_pts"]
)
final_loan_score = loan_score.withColumn("loan_score", total_pts)

In [48]:
# Register the scored rows as temp view `loan_score_eval` for the grading query.
final_loan_score.createOrReplaceTempView("loan_score_eval")

In [49]:
# Preview the component scores together with the overall loan_score.
spark.sql("select * from loan_score_eval")

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts,loan_score
000c8875b71a6b47c...,290.0,1192.5,885.5,2368.0
003769d7f54c7859e...,200.0,1192.5,402.5,1795.0
003e1e6cbd2920bbb...,230.0,697.5,766.5,1694.0
004017b21bd4d6271...,150.0,1080.0,822.5,2052.5
005b4c3db3fce07dc...,230.0,697.5,619.5,1547.0
00710707c563c2119...,290.0,1192.5,829.5,2312.0
007da79904f69970d...,290.0,1192.5,822.5,2305.0
00f435a80d0440ece...,200.0,1125.0,708.75,2033.75
00fc2ae3ffb1213e4...,260.0,1192.5,752.5,2205.0
00fc8144cb210ba8c...,230.0,697.5,717.5,1645.0


In [50]:
# Map loan_score to a letter grade using the ${spark.sql.*_grade_pts} cut-offs:
#   A: score > very_good          B: good < score <= very_good
#   C: bad < score <= good        D: very_bad < score <= bad
#   E: unacceptable < score <= very_bad
#   F: score <= unacceptable
# There is no ELSE branch, so a NULL loan_score yields a NULL grade.
loan_score_final = spark.sql("select ls.*, \
case \
WHEN loan_score > ${spark.sql.very_good_grade_pts} THEN 'A' \
WHEN loan_score <= ${spark.sql.very_good_grade_pts} AND loan_score > ${spark.sql.good_grade_pts} THEN 'B' \
WHEN loan_score <= ${spark.sql.good_grade_pts} AND loan_score > ${spark.sql.bad_grade_pts} THEN 'C' \
WHEN loan_score <= ${spark.sql.bad_grade_pts} AND loan_score > ${spark.sql.very_bad_grade_pts} THEN 'D' \
WHEN loan_score <= ${spark.sql.very_bad_grade_pts} AND loan_score > ${spark.sql.unacceptable_grade_pts} THEN 'E' \
WHEN loan_score <= ${spark.sql.unacceptable_grade_pts} THEN 'F' \
end as loan_final_grade \
from loan_score_eval ls")

In [51]:
# Register the graded rows as temp view `loan_final_table` for inspection.
loan_score_final.createOrReplaceTempView("loan_final_table")

In [52]:
# Spot-check: show rows that received grade 'C'.
spark.sql("select * from loan_final_table where loan_final_grade in ('C')")

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts,loan_score,loan_final_grade
003769d7f54c7859e...,200.0,1192.5,402.5,1795.0,C
003e1e6cbd2920bbb...,230.0,697.5,766.5,1694.0,C
005b4c3db3fce07dc...,230.0,697.5,619.5,1547.0,C
00fc8144cb210ba8c...,230.0,697.5,717.5,1645.0,C
017ce564dc0d6f975...,200.0,945.0,591.5,1736.5,C
017d1fd3d6169ee29...,260.0,697.5,694.75,1652.25,C
01d0686210978f0a7...,200.0,1080.0,619.5,1899.5,C
01d0c48835e969a01...,200.0,1192.5,262.5,1655.0,C
02f69de1384fcf78c...,180.0,945.0,864.5,1989.5,C
032503cc8f86dea72...,200.0,945.0,647.5,1792.5,C


In [53]:
# Row count of the graded result.
spark.sql("select count(*) from loan_final_table")

count(1)
482842


In [None]:
# Persist the graded loan scores as parquet, replacing any previous output
# at the target location.
loan_score_output_path = "/public/trendytech/lendingclubproject/processed/loan_score"
(
    loan_score_final.write
    .format("parquet")
    .mode("overwrite")
    .option("path", loan_score_output_path)
    .save()
)

In [None]:
# Shut down the Spark session now that the pipeline has finished.
spark.stop()