In [1]:
from pyspark.sql import SparkSession
import getpass 
username = getpass.getuser()
spark = SparkSession. \
     builder. \
     config('spark.ui.port', '0'). \
     config("spark.sql.warehouse.dir", f"/user/itv009198/warehouse"). \
     config('spark.shuffle.useOldFetchProtocol', 'true'). \
     enableHiveSupport(). \
     master('yarn'). \
     getOrCreate()

In [2]:
spark.conf.set("spark.sql.unacceptable_rated_pts",0)
spark.conf.set("spark.sql.very_bad_rated_pts",100)
spark.conf.set("spark.sql.bad_rated_pts",250)
spark.conf.set("spark.sql.good_rated_pts",500)
spark.conf.set("spark.sql.very_good_rated_pts",650)
spark.conf.set("spark.sql.excellent_rated_pts",800)
  

In [3]:
spark.conf.set("spark.sql.unacceptable_grade_pts",750)
spark.conf.set("spark.sql.very_bad_grade_pts",1000)
spark.conf.set("spark.sql.bad_grade_pts",1500)
spark.conf.set("spark.sql.good_grade_pts",2000)
spark.conf.set("spark.sql.very_good_grade_pts",2500)

In [4]:
bad_customer_data_final_df = spark.read \
.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.load("/public/trendytech/lendingclubproject/bad/bad_customer_data_final")

In [5]:
bad_customer_data_final_df.createOrReplaceTempView("bad_data_customer")

In [6]:
ph_df = spark.sql("select c.member_id, \
 case \
when p.last_payment_amount < (c.monthly_installment * 0.5) then ${spark.sql.very_bad_rated_pts} \
when p.last_payment_amount >= (c.monthly_installment * 0.5) and p.last_payment_amount < c.monthly_installment then ${spark.sql.very_bad_rated_pts} \
when (p.last_payment_amount = (c.monthly_installment)) then ${spark.sql.good_rated_pts} \
when p.last_payment_amount > (c.monthly_installment) and p.last_payment_amount <= (c.monthly_installment * 1.50) then ${spark.sql.very_good_rated_pts} \
when p.last_payment_amount > (c.monthly_installment * 1.50) then ${spark.sql.excellent_rated_pts} \
else ${spark.sql.unacceptable_rated_pts} \
end as last_payment_pts, \
case \
when p.total_payment_received >= (c.funded_amount * 0.50) then ${spark.sql.very_good_rated_pts} \
when p.total_payment_received < (c.funded_amount * 0.50) and p.total_payment_received > 0 then ${spark.sql.good_rated_pts} \
when p.total_payment_received = 0 or (p.total_payment_received) is null then ${spark.sql.unacceptable_rated_pts} \
end as total_payment_pts \
from itv009198_lending_club.loans_repayments p \
inner join itv009198_lending_club.loans c on c.loan_id = p.loan_id where member_id NOT IN (select member_id from bad_data_customer)")

In [7]:
ph_df.createOrReplaceTempView("ph_pts")

In [8]:
spark.sql("select * from ph_pts")

member_id,last_payment_pts,total_payment_pts
dcec9334e70f1cc95...,800,650
fc58ca61f51f9dcac...,500,650
2fb62a6ca51063b11...,500,650
488268a5531951622...,800,650
ade6026208e48f5f9...,500,650
7c8b0ca6acddfaeb1...,800,650
a707b7fe7c38bad65...,800,650
1df639cddea30c288...,800,650
22d67005e12d8726d...,500,650
009cf312bd46551b4...,500,650


In [9]:
ldh_ph_df = spark.sql(
"select p.*, \
CASE \
WHEN d.delinq_2yrs = 0 THEN ${spark.sql.excellent_rated_pts} \
WHEN d.delinq_2yrs BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
WHEN d.delinq_2yrs BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
WHEN d.delinq_2yrs > 5 OR d.delinq_2yrs IS NULL THEN ${spark.sql.unacceptable_grade_pts} \
END AS delinq_pts, \
CASE \
WHEN l.pub_rec = 0 THEN ${spark.sql.excellent_rated_pts} \
WHEN l.pub_rec BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
WHEN l.pub_rec BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
WHEN l.pub_rec > 5 OR l.pub_rec IS NULL THEN ${spark.sql.very_bad_rated_pts} \
END AS public_records_pts, \
CASE \
WHEN l.pub_rec_bankruptcies = 0 THEN ${spark.sql.excellent_rated_pts} \
WHEN l.pub_rec_bankruptcies BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
WHEN l.pub_rec_bankruptcies BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
WHEN l.pub_rec_bankruptcies > 5 OR l.pub_rec_bankruptcies IS NULL THEN ${spark.sql.very_bad_rated_pts} \
END as public_bankruptcies_pts, \
CASE \
WHEN l.inq_last_6mths = 0 THEN ${spark.sql.excellent_rated_pts} \
WHEN l.inq_last_6mths BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
WHEN l.inq_last_6mths BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
WHEN l.inq_last_6mths > 5 OR l.inq_last_6mths IS NULL THEN ${spark.sql.unacceptable_rated_pts} \
END AS enq_pts \
FROM itv009198_lending_club.loans_defaulters_detail_records_enq_new l \
INNER JOIN itv009198_lending_club.loans_defaulters_delinq_new d ON d.member_id = l.member_id  \
INNER JOIN ph_pts p ON p.member_id = l.member_id where l.member_id NOT IN (select member_id from bad_data_customer)")

In [10]:
ldh_ph_df.createOrReplaceTempView("ldh_ph_pts")

In [11]:
spark.sql("select * from ldh_ph_pts")

member_id,last_payment_pts,total_payment_pts,delinq_pts,public_records_pts,public_bankruptcies_pts,enq_pts
000c8875b71a6b47c...,800,650,250,800,800,800
003769d7f54c7859e...,500,500,250,800,800,800
003e1e6cbd2920bbb...,500,650,250,250,250,800
004017b21bd4d6271...,100,650,750,800,800,800
005b4c3db3fce07dc...,500,650,250,250,800,250
00710707c563c2119...,800,650,250,800,800,800
007da79904f69970d...,800,650,250,800,800,800
00f435a80d0440ece...,500,500,100,800,800,800
00fc2ae3ffb1213e4...,650,650,250,800,800,800
00fc8144cb210ba8c...,500,650,250,250,250,800


In [12]:
fh_ldh_ph_df = spark.sql("SELECT ldef.*, \
CASE \
   WHEN LOWER(l.loan_status) LIKE '%fully paid%' THEN ${spark.sql.excellent_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%current%' THEN ${spark.sql.good_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%in grace period%' THEN ${spark.sql.bad_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%late (16-30 days)%' OR LOWER(l.loan_status) LIKE '%late (31-120 days)%' THEN ${spark.sql.very_bad_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%charged off%' THEN ${spark.sql.unacceptable_rated_pts} \
   else ${spark.sql.unacceptable_rated_pts} \
  END AS loan_status_pts, \
  CASE \
   WHEN LOWER(a.home_ownership) LIKE '%own' THEN ${spark.sql.excellent_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%rent' THEN ${spark.sql.good_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%mortgage' THEN ${spark.sql.bad_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%any' OR LOWER(a.home_ownership) IS NULL THEN ${spark.sql.very_bad_rated_pts} \
  END AS home_pts, \
  CASE \
   WHEN l.funded_amount <= (a.total_high_credit_limit * 0.10) THEN ${spark.sql.excellent_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.10) AND l.funded_amount <= (a.total_high_credit_limit * 0.20) THEN ${spark.sql.very_good_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.20) AND l.funded_amount <= (a.total_high_credit_limit * 0.30) THEN ${spark.sql.good_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.30) AND l.funded_amount <= (a.total_high_credit_limit * 0.50) THEN ${spark.sql.bad_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.50) AND l.funded_amount <= (a.total_high_credit_limit * 0.70) THEN ${spark.sql.very_bad_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.70) THEN ${spark.sql.unacceptable_rated_pts} \
   else ${spark.sql.unacceptable_rated_pts} \
   END AS credit_limit_pts, \
   CASE \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A1' THEN ${spark.sql.excellent_rated_pts} \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A2' THEN (${spark.sql.excellent_rated_pts} * 0.95) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A3' THEN (${spark.sql.excellent_rated_pts} * 0.90) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A4' THEN (${spark.sql.excellent_rated_pts} * 0.85) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A5' THEN (${spark.sql.excellent_rated_pts} * 0.80) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B1' THEN (${spark.sql.very_good_rated_pts}) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B2' THEN (${spark.sql.very_good_rated_pts} * 0.95) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B3' THEN (${spark.sql.very_good_rated_pts} * 0.90) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B4' THEN (${spark.sql.very_good_rated_pts} * 0.85) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B5' THEN (${spark.sql.very_good_rated_pts} * 0.80) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C1' THEN (${spark.sql.good_rated_pts}) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C2' THEN (${spark.sql.good_rated_pts} * 0.95) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C3' THEN (${spark.sql.good_rated_pts} * 0.90) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C4' THEN (${spark.sql.good_rated_pts} * 0.85) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C5' THEN (${spark.sql.good_rated_pts} * 0.80) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D1' THEN (${spark.sql.bad_rated_pts}) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D2' THEN (${spark.sql.bad_rated_pts} * 0.95) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D3' THEN (${spark.sql.bad_rated_pts} * 0.90) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D4' THEN (${spark.sql.bad_rated_pts} * 0.85) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D5' THEN (${spark.sql.bad_rated_pts} * 0.80) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E1' THEN (${spark.sql.very_bad_rated_pts}) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E2' THEN (${spark.sql.very_bad_rated_pts} * 0.95) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E3' THEN (${spark.sql.very_bad_rated_pts} * 0.90) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E4' THEN (${spark.sql.very_bad_rated_pts} * 0.85) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E5' THEN (${spark.sql.very_bad_rated_pts} * 0.80) \
   WHEN (a.grade) in ('F', 'G') THEN (${spark.sql.unacceptable_rated_pts}) \
  END AS grade_pts \
FROM ldh_ph_pts ldef \
INNER JOIN itv009198_lending_club.loans l ON ldef.member_id = l.member_id \
INNER JOIN itv009198_lending_club.customers_new a ON a.member_id = ldef.member_id where ldef.member_id NOT IN (select member_id from bad_data_customer)") 

In [13]:
fh_ldh_ph_df.createOrReplaceTempView("fh_ldh_ph_pts")

In [14]:
fh_ldh_ph_df

member_id,last_payment_pts,total_payment_pts,delinq_pts,public_records_pts,public_bankruptcies_pts,enq_pts,loan_status_pts,home_pts,credit_limit_pts,grade_pts
000c8875b71a6b47c...,800,650,250,800,800,800,800,250,800,680.0
003769d7f54c7859e...,500,500,250,800,800,800,0,250,800,100.0
003e1e6cbd2920bbb...,500,650,250,250,250,800,500,250,800,640.0
004017b21bd4d6271...,100,650,750,800,800,800,800,250,800,500.0
005b4c3db3fce07dc...,500,650,250,250,800,250,500,250,500,520.0
00710707c563c2119...,800,650,250,800,800,800,800,250,800,520.0
007da79904f69970d...,800,650,250,800,800,800,800,500,250,800.0
00f435a80d0440ece...,500,500,100,800,800,800,500,250,800,475.0
00fc2ae3ffb1213e4...,650,650,250,800,800,800,800,250,650,450.0
00fc8144cb210ba8c...,500,650,250,250,250,800,500,250,800,500.0


In [15]:
#### Final loan score calculation by considering all the 3 criterias with the following %**

In [16]:
#### 1. Payment History = 20%
#### 2. Loan Defaults = 45%
#### 3. Financial Health = 35%

In [17]:
loan_score = spark.sql("SELECT member_id, \
    ((last_payment_pts+total_payment_pts)*0.20) as payment_history_pts, \
    ((delinq_pts + public_records_pts + public_bankruptcies_pts + enq_pts) * 0.45) as defaulters_history_pts, \
    ((loan_status_pts + home_pts + credit_limit_pts + grade_pts)*0.35) as financial_health_pts \
    FROM fh_ldh_ph_pts")

In [18]:
loan_score

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts
000c8875b71a6b47c...,290.0,1192.5,885.5
003769d7f54c7859e...,200.0,1192.5,402.5
003e1e6cbd2920bbb...,230.0,697.5,766.5
004017b21bd4d6271...,150.0,1417.5,822.5
005b4c3db3fce07dc...,230.0,697.5,619.5
00710707c563c2119...,290.0,1192.5,829.5
007da79904f69970d...,290.0,1192.5,822.5
00f435a80d0440ece...,200.0,1125.0,708.75
00fc2ae3ffb1213e4...,260.0,1192.5,752.5
00fc8144cb210ba8c...,230.0,697.5,717.5


In [19]:
final_loan_score = loan_score.withColumn('loan_score',loan_score.payment_history_pts+loan_score.financial_health_pts)

In [20]:
final_loan_score

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts,loan_score
000c8875b71a6b47c...,290.0,1192.5,885.5,1175.5
003769d7f54c7859e...,200.0,1192.5,402.5,602.5
003e1e6cbd2920bbb...,230.0,697.5,766.5,996.5
004017b21bd4d6271...,150.0,1417.5,822.5,972.5
005b4c3db3fce07dc...,230.0,697.5,619.5,849.5
00710707c563c2119...,290.0,1192.5,829.5,1119.5
007da79904f69970d...,290.0,1192.5,822.5,1112.5
00f435a80d0440ece...,200.0,1125.0,708.75,908.75
00fc2ae3ffb1213e4...,260.0,1192.5,752.5,1012.5
00fc8144cb210ba8c...,230.0,697.5,717.5,947.5


In [21]:
final_loan_score.createOrReplaceTempView("loan_score_eval")

In [22]:
spark.sql("select * from loan_score_eval")

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts,loan_score
000c8875b71a6b47c...,290.0,1192.5,885.5,1175.5
003769d7f54c7859e...,200.0,1192.5,402.5,602.5
003e1e6cbd2920bbb...,230.0,697.5,766.5,996.5
004017b21bd4d6271...,150.0,1417.5,822.5,972.5
005b4c3db3fce07dc...,230.0,697.5,619.5,849.5
00710707c563c2119...,290.0,1192.5,829.5,1119.5
007da79904f69970d...,290.0,1192.5,822.5,1112.5
00f435a80d0440ece...,200.0,1125.0,708.75,908.75
00fc2ae3ffb1213e4...,260.0,1192.5,752.5,1012.5
00fc8144cb210ba8c...,230.0,697.5,717.5,947.5


In [23]:
loan_score_final = spark.sql("select ls.*, \
case \
WHEN loan_score > ${spark.sql.very_good_grade_pts} THEN 'A' \
WHEN loan_score <= ${spark.sql.very_good_grade_pts} AND loan_score > ${spark.sql.good_grade_pts} THEN 'B' \
WHEN loan_score <= ${spark.sql.good_grade_pts} AND loan_score > ${spark.sql.bad_grade_pts} THEN 'C' \
WHEN loan_score <= ${spark.sql.bad_grade_pts} AND loan_score  > ${spark.sql.very_bad_grade_pts} THEN 'D' \
WHEN loan_score <= ${spark.sql.very_bad_grade_pts} AND loan_score > ${spark.sql.unacceptable_grade_pts} THEN 'E'  \
WHEN loan_score <= ${spark.sql.unacceptable_grade_pts} THEN 'F' \
end as loan_final_grade \
from loan_score_eval ls")

In [24]:
loan_score_final.show(40)

+--------------------+-------------------+----------------------+--------------------+----------+----------------+
|           member_id|payment_history_pts|defaulters_history_pts|financial_health_pts|loan_score|loan_final_grade|
+--------------------+-------------------+----------------------+--------------------+----------+----------------+
|000c8875b71a6b47c...|             290.00|               1192.50|            885.5000| 1175.5000|               D|
|003769d7f54c7859e...|             200.00|               1192.50|            402.5000|  602.5000|               F|
|003e1e6cbd2920bbb...|             230.00|                697.50|            766.5000|  996.5000|               E|
|004017b21bd4d6271...|             150.00|               1417.50|            822.5000|  972.5000|               E|
|005b4c3db3fce07dc...|             230.00|                697.50|            619.5000|  849.5000|               E|
|00710707c563c2119...|             290.00|               1192.50|            829

In [25]:
loan_score_final.createOrReplaceTempView("loan_final_table")

In [26]:
result = spark.sql("SELECT * FROM loan_final_table WHERE loan_final_grade IN ('D')")
result.show()

+--------------------+-------------------+----------------------+--------------------+----------+----------------+
|           member_id|payment_history_pts|defaulters_history_pts|financial_health_pts|loan_score|loan_final_grade|
+--------------------+-------------------+----------------------+--------------------+----------+----------------+
|000c8875b71a6b47c...|             290.00|               1192.50|            885.5000| 1175.5000|               D|
|00710707c563c2119...|             290.00|               1192.50|            829.5000| 1119.5000|               D|
|007da79904f69970d...|             290.00|               1192.50|            822.5000| 1112.5000|               D|
|00fc2ae3ffb1213e4...|             260.00|               1192.50|            752.5000| 1012.5000|               D|
|01121d7f3f6f27495...|             290.00|               1192.50|            863.6250| 1153.6250|               D|
|018a82c47453bf02d...|             150.00|               1192.50|            948

In [27]:
spark.sql("select count (*) from loan_final_table")

count(1)
482842


In [28]:
loan_score_final.write \
.format("parquet") \
.mode("overwrite") \
.option("path","public/trendytech/lendingclubproject/processed/loan_score") \
.save()

In [29]:
spark.stop()