In [43]:
from pyspark.sql import SparkSession
import getpass 
username=getpass.getuser()
spark=SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    config('spark.shuffle.useOldFetchProtocol', 'true'). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

# Associating points to the grades in order to calculate the Loan Score

In [44]:
spark.conf.set("spark.sql.unacceptable_rated_pts", 0)
spark.conf.set("spark.sql.very_bad_rated_pts", 100)
spark.conf.set("spark.sql.bad_rated_pts", 250)
spark.conf.set("spark.sql.good_rated_pts", 500)
spark.conf.set("spark.sql.very_good_rated_pts", 650)
spark.conf.set("spark.sql.excellent_rated_pts", 800)

In [45]:
spark.conf.set("spark.sql.unacceptable_grade_pts", 750)
spark.conf.set("spark.sql.very_bad_grade_pts", 1000)
spark.conf.set("spark.sql.bad_grade_pts", 1500)
spark.conf.set("spark.sql.good_grade_pts", 2000)
spark.conf.set("spark.sql.very_good_grade_pts", 2500)

# The tables required to calculate the Loan Score
### customers_new

### loans

### loans_repayments

### loans_defaulters_delinq_new

### loans_defaulters_detail_rec_enq_new

## Loan Score Calculation Criteria 1: Payment History(ph)

In [47]:
bad_customer_data_final_df = spark.read \
.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.load("/user/itv007473/lendingclubproject/bad/bad_customer_data_final")

In [48]:
bad_customer_data_final_df.createOrReplaceTempView("bad_data_customer")

In [49]:
ph_df = spark.sql("""select c.member_id,
case 
   when p.last_payment_amount < (c.monthly_installment * 0.5) then ${spark.sql.very_bad_rated_pts} 
   when p.last_payment_amount >= (c.monthly_installment * 0.5) and p.last_payment_amount < c.monthly_installment then ${spark.sql.very_bad_rated_pts} 
   when (p.last_payment_amount = (c.monthly_installment)) then ${spark.sql.good_rated_pts} 
   when p.last_payment_amount > (c.monthly_installment) and p.last_payment_amount <= (c.monthly_installment * 1.50) then ${spark.sql.very_good_rated_pts} 
   when p.last_payment_amount > (c.monthly_installment * 1.50) then ${spark.sql.excellent_rated_pts} 
   else ${spark.sql.unacceptable_rated_pts} 
   end as last_payment_pts, 
case 
   when p.total_payment_received >= (c.funded_amount * 0.50) then ${spark.sql.very_good_rated_pts} 
   when p.total_payment_received < (c.funded_amount * 0.50) and p.total_payment_received > 0 then ${spark.sql.good_rated_pts} 
   when p.total_payment_received = 0 or (p.total_payment_received) is null then ${spark.sql.unacceptable_rated_pts} 
end as total_payment_pts 
   from itv007473_lending_club.loans_repayments p 
   inner join itv007473_lending_club.loans c on c.loan_id = p.loan_id where member_id NOT IN (select member_id from bad_data_customer)""")

In [51]:
ph_df

member_id,last_payment_pts,total_payment_pts
dcec9334e70f1cc95...,800,650
fc58ca61f51f9dcac...,500,650
2fb62a6ca51063b11...,500,650
488268a5531951622...,800,650
ade6026208e48f5f9...,500,650
7c8b0ca6acddfaeb1...,800,650
a707b7fe7c38bad65...,800,650
1df639cddea30c288...,800,650
22d67005e12d8726d...,500,650
009cf312bd46551b4...,500,650


In [53]:
ph_df.createOrReplaceTempView("ph_pts")

In [54]:
spark.sql("select * from ph_pts")

member_id,last_payment_pts,total_payment_pts
dcec9334e70f1cc95...,800,650
fc58ca61f51f9dcac...,500,650
2fb62a6ca51063b11...,500,650
488268a5531951622...,800,650
ade6026208e48f5f9...,500,650
7c8b0ca6acddfaeb1...,800,650
a707b7fe7c38bad65...,800,650
1df639cddea30c288...,800,650
22d67005e12d8726d...,500,650
009cf312bd46551b4...,500,650


## Loan Score Calculation Criteria 2: Loan Defaulters History(ldh)

In [10]:
ldh_ph_df = spark.sql(
    "select p.*, \
    CASE \
    WHEN d.delinq_2yrs = 0 THEN ${spark.sql.excellent_rated_pts} \
    WHEN d.delinq_2yrs BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
    WHEN d.delinq_2yrs BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
    WHEN d.delinq_2yrs > 5 OR d.delinq_2yrs IS NULL THEN ${spark.sql.unacceptable_grade_pts} \
    END AS delinq_pts, \
    CASE \
    WHEN l.pub_rec = 0 THEN ${spark.sql.excellent_rated_pts} \
    WHEN l.pub_rec BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
    WHEN l.pub_rec BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
    WHEN l.pub_rec > 5 OR l.pub_rec IS NULL THEN ${spark.sql.very_bad_rated_pts} \
    END AS public_records_pts, \
    CASE \
    WHEN l.pub_rec_bankruptcies = 0 THEN ${spark.sql.excellent_rated_pts} \
    WHEN l.pub_rec_bankruptcies BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
    WHEN l.pub_rec_bankruptcies BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
    WHEN l.pub_rec_bankruptcies > 5 OR l.pub_rec_bankruptcies IS NULL THEN ${spark.sql.very_bad_rated_pts} \
    END as public_bankruptcies_pts, \
    CASE \
    WHEN l.inq_last_6mths = 0 THEN ${spark.sql.excellent_rated_pts} \
    WHEN l.inq_last_6mths BETWEEN 1 AND 2 THEN ${spark.sql.bad_rated_pts} \
    WHEN l.inq_last_6mths BETWEEN 3 AND 5 THEN ${spark.sql.very_bad_rated_pts} \
    WHEN l.inq_last_6mths > 5 OR l.inq_last_6mths IS NULL THEN ${spark.sql.unacceptable_rated_pts} \
    END AS enq_pts \
    FROM itv007473_lending_club.loans_defaulters_detail_rec_enq_new l \
    INNER JOIN itv007473_lending_club.loans_defaulters_delinq_new d ON d.member_id = l.member_id  \
    INNER JOIN ph_pts p ON p.member_id = l.member_id where l.member_id NOT IN (select member_id from bad_data_customer)")

In [11]:
ldh_ph_df.createOrReplaceTempView("ldh_ph_pts")

In [12]:
spark.sql("select * from ldh_ph_pts")

member_id,last_payment_pts,total_payment_pts,delinq_pts,public_records_pts,public_bankruptcies_pts,enq_pts
000c8875b71a6b47c...,800,650,250,800,800,800
0012728d9f616bdf2...,500,500,800,800,800,800
00151ece27c7ca280...,800,650,800,800,800,250
003769d7f54c7859e...,500,500,250,800,800,800
0037bb910c0a758f5...,500,500,800,800,800,800
003e1e6cbd2920bbb...,500,650,250,250,250,800
004017b21bd4d6271...,100,650,750,800,800,800
005b4c3db3fce07dc...,500,650,250,250,800,250
00710707c563c2119...,800,650,250,800,800,800
007da79904f69970d...,800,650,250,800,800,800


In [13]:
fh_ldh_ph_df = spark.sql("select ldef.*, \
   CASE \
   WHEN LOWER(l.loan_status) LIKE '%fully paid%' THEN ${spark.sql.excellent_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%current%' THEN ${spark.sql.good_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%in grace period%' THEN ${spark.sql.bad_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%late (16-30 days)%' OR LOWER(l.loan_status) LIKE '%late (31-120 days)%' THEN ${spark.sql.very_bad_rated_pts} \
   WHEN LOWER(l.loan_status) LIKE '%charged off%' THEN ${spark.sql.unacceptable_rated_pts} \
   else ${spark.sql.unacceptable_rated_pts} \
   END AS loan_status_pts, \
   CASE \
   WHEN LOWER(a.home_ownership) LIKE '%own' THEN ${spark.sql.excellent_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%rent' THEN ${spark.sql.good_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%mortgage' THEN ${spark.sql.bad_rated_pts} \
   WHEN LOWER(a.home_ownership) LIKE '%any' OR LOWER(a.home_ownership) IS NULL THEN ${spark.sql.very_bad_rated_pts} \
   END AS home_pts, \
   CASE \
   WHEN l.funded_amount <= (a.total_high_credit_limit * 0.10) THEN ${spark.sql.excellent_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.10) AND l.funded_amount <= (a.total_high_credit_limit * 0.20) THEN ${spark.sql.very_good_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.20) AND l.funded_amount <= (a.total_high_credit_limit * 0.30) THEN ${spark.sql.good_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.30) AND l.funded_amount <= (a.total_high_credit_limit * 0.50) THEN ${spark.sql.bad_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.50) AND l.funded_amount <= (a.total_high_credit_limit * 0.70) THEN ${spark.sql.very_bad_rated_pts} \
   WHEN l.funded_amount > (a.total_high_credit_limit * 0.70) THEN ${spark.sql.unacceptable_rated_pts} \
   else ${spark.sql.unacceptable_rated_pts} \
   END AS credit_limit_pts, \
   CASE \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A1' THEN ${spark.sql.excellent_rated_pts} \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A2' THEN (${spark.sql.excellent_rated_pts} * 0.95) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A3' THEN (${spark.sql.excellent_rated_pts} * 0.90) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A4' THEN (${spark.sql.excellent_rated_pts} * 0.85) \
   WHEN (a.grade) = 'A' and (a.sub_grade)='A5' THEN (${spark.sql.excellent_rated_pts} * 0.80) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B1' THEN (${spark.sql.very_good_rated_pts}) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B2' THEN (${spark.sql.very_good_rated_pts} * 0.95) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B3' THEN (${spark.sql.very_good_rated_pts} * 0.90) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B4' THEN (${spark.sql.very_good_rated_pts} * 0.85) \
   WHEN (a.grade) = 'B' and (a.sub_grade)='B5' THEN (${spark.sql.very_good_rated_pts} * 0.80) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C1' THEN (${spark.sql.good_rated_pts}) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C2' THEN (${spark.sql.good_rated_pts} * 0.95) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C3' THEN (${spark.sql.good_rated_pts} * 0.90) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C4' THEN (${spark.sql.good_rated_pts} * 0.85) \
   WHEN (a.grade) = 'C' and (a.sub_grade)='C5' THEN (${spark.sql.good_rated_pts} * 0.80) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D1' THEN (${spark.sql.bad_rated_pts}) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D2' THEN (${spark.sql.bad_rated_pts} * 0.95) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D3' THEN (${spark.sql.bad_rated_pts} * 0.90) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D4' THEN (${spark.sql.bad_rated_pts} * 0.85) \
   WHEN (a.grade) = 'D' and (a.sub_grade)='D5' THEN (${spark.sql.bad_rated_pts} * 0.80) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E1' THEN (${spark.sql.very_bad_rated_pts}) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E2' THEN (${spark.sql.very_bad_rated_pts} * 0.95) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E3' THEN (${spark.sql.very_bad_rated_pts} * 0.90) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E4' THEN (${spark.sql.very_bad_rated_pts} * 0.85) \
   WHEN (a.grade) = 'E' and (a.sub_grade)='E5' THEN (${spark.sql.very_bad_rated_pts} * 0.80) \
   WHEN (a.grade) in ('F', 'G') THEN (${spark.sql.unacceptable_rated_pts}) \
   END AS grade_pts \
   FROM ldh_ph_pts ldef \
   INNER JOIN itv007473_lending_club.loans l ON ldef.member_id = l.member_id \
   INNER JOIN itv007473_lending_club.customers_new a ON a.member_id = ldef.member_id where ldef.member_id NOT IN (select member_id from bad_data_customer)") 

In [14]:
fh_ldh_ph_df.createOrReplaceTempView("fh_ldh_ph_pts")

## Final loan score calculation by considering all the 3 criterias with the following %**
### 1. Payment History = 20%
### 2. Loan Defaults = 45%
### 3. Financial Health = 35%

In [15]:
loan_score = spark.sql("SELECT member_id, \
((last_payment_pts+total_payment_pts)*0.20) as payment_history_pts, \
((delinq_pts + public_records_pts + public_bankruptcies_pts + enq_pts) * 0.45) as defaulters_history_pts, \
((loan_status_pts + home_pts + credit_limit_pts + grade_pts)*0.35) as financial_health_pts \
FROM fh_ldh_ph_pts")

In [16]:
loan_score

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts
000c8875b71a6b47c...,290.0,1192.5,885.5
0012728d9f616bdf2...,200.0,1440.0,766.5
00151ece27c7ca280...,290.0,1192.5,717.5
003769d7f54c7859e...,200.0,1192.5,402.5
0037bb910c0a758f5...,200.0,1440.0,787.5
003e1e6cbd2920bbb...,230.0,697.5,766.5
004017b21bd4d6271...,150.0,1417.5,822.5
005b4c3db3fce07dc...,230.0,697.5,619.5
00710707c563c2119...,290.0,1192.5,829.5
007da79904f69970d...,290.0,1192.5,822.5


In [17]:
final_loan_score = loan_score.withColumn('loan_score', loan_score.payment_history_pts + loan_score.defaulters_history_pts + loan_score.financial_health_pts)

In [18]:
final_loan_score.createOrReplaceTempView("loan_score_eval")

In [19]:
loan_score_final = spark.sql("select ls.*, \
case \
WHEN loan_score > ${spark.sql.very_good_grade_pts} THEN 'A' \
WHEN loan_score <= ${spark.sql.very_good_grade_pts} AND loan_score > ${spark.sql.good_grade_pts} THEN 'B' \
WHEN loan_score <= ${spark.sql.good_grade_pts} AND loan_score > ${spark.sql.bad_grade_pts} THEN 'C' \
WHEN loan_score <= ${spark.sql.bad_grade_pts} AND loan_score  > ${spark.sql.very_bad_grade_pts} THEN 'D' \
WHEN loan_score <= ${spark.sql.very_bad_grade_pts} AND loan_score > ${spark.sql.unacceptable_grade_pts} THEN 'E'  \
WHEN loan_score <= ${spark.sql.unacceptable_grade_pts} THEN 'F' \
end as loan_final_grade \
from loan_score_eval ls")

In [20]:
loan_score_final.createOrReplaceTempView("loan_final_table")

In [21]:
spark.sql("select * from loan_final_table where loan_final_grade in ('C')")

member_id,payment_history_pts,defaulters_history_pts,financial_health_pts,loan_score,loan_final_grade
003769d7f54c7859e...,200.0,1192.5,402.5,1795.0,C
003e1e6cbd2920bbb...,230.0,697.5,766.5,1694.0,C
005b4c3db3fce07dc...,230.0,697.5,619.5,1547.0,C
00fc8144cb210ba8c...,230.0,697.5,717.5,1645.0,C
0134d807fbc9e8ff8...,230.0,1192.5,553.0,1975.5,C
013630bb77d0f3c6a...,290.0,1192.5,399.0,1881.5,C
017ce564dc0d6f975...,200.0,945.0,591.5,1736.5,C
01b48af926b98a718...,200.0,1192.5,573.125,1965.625,C
01d0c48835e969a01...,200.0,1192.5,262.5,1655.0,C
024f077b5b4e47ea0...,230.0,1192.5,437.5,1860.0,C


In [22]:
spark.sql("select count(*) from loan_final_table")

count(1)
1102587


In [23]:
loan_score_final.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv007473/lendingclubproject/processed/loan_score") \
.save()