In [1]:
from pyspark.sql import SparkSession
import getpass
# OS-level user name; used below to place the Hive warehouse directory
# under /user/<username>/warehouse.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled Spark session that runs on YARN.
# Every setting is a plain key/value pair on the builder, so the order in
# which they are applied does not matter.
spark = (
    SparkSession.builder
    .master('yarn')
    .config('spark.app.name', 'itv012041_Loan_Defaulters_Cleanup')
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .enableHiveSupport()
    .getOrCreate()
)

In [2]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

In [3]:
# DDL-style schema for the raw loan-defaulters CSV: member_id is a string and
# every other column is read as float. The pieces below concatenate to the
# same multi-line string, line breaks included.
defaulters_schema = (
    "member_id string, delinq_2yrs float, delinq_amnt float,\n"
    "pub_rec float, pub_rec_bankruptcies float, inq_last_6mths float,\n"
    "total_rec_late_fee float, mths_since_last_delinq float, mths_since_last_record\n"
    "float"
)

In [4]:
# Load the raw loan-defaulters CSV using the explicit schema defined above;
# the first line of the input is treated as a header row.
loans_defaulters_raw_df = (
    spark.read
    .format("csv")
    .schema(defaulters_schema)
    .option("header", True)
    .load("/user/itv012041/lendingclubproject/raw/loan_defaulters_data_csv")
)

In [5]:
# Evaluate the raw DataFrame as the cell's last expression to preview it.
loans_defaulters_raw_df

member_id,delinq_2yrs,delinq_amnt,pub_rec,pub_rec_bankruptcies,inq_last_6mths,total_rec_late_fee,mths_since_last_delinq,mths_since_last_record
f90d0607c9b0fa739...,0.0,0.0,0.0,0.0,1.0,0.0,31.0,
99d92e92a800aa4ab...,1.0,0.0,0.0,0.0,0.0,0.0,6.0,
6fa139cac7b13feb8...,0.0,0.0,0.0,0.0,0.0,0.0,47.0,
e8f881a1a3858e248...,0.0,0.0,0.0,0.0,0.0,0.0,33.0,
c8655c7f8efef0342...,1.0,0.0,0.0,0.0,0.0,0.0,21.0,
8ddccb7d382d2bfa5...,0.0,0.0,0.0,0.0,0.0,0.0,,
5ee639ebe58af7ebd...,0.0,0.0,1.0,0.0,2.0,0.0,,71.0
0d85da60a31069c9d...,1.0,0.0,2.0,0.0,0.0,0.0,6.0,63.0
d98e23f7616bc19d2...,0.0,0.0,0.0,0.0,0.0,0.0,36.0,
5320d16780c93fe3d...,0.0,0.0,0.0,0.0,0.0,0.0,35.0,


In [6]:
# Print each column's name, type and nullability for the raw DataFrame.
loans_defaulters_raw_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- delinq_2yrs: float (nullable = true)
 |-- delinq_amnt: float (nullable = true)
 |-- pub_rec: float (nullable = true)
 |-- pub_rec_bankruptcies: float (nullable = true)
 |-- inq_last_6mths: float (nullable = true)
 |-- total_rec_late_fee: float (nullable = true)
 |-- mths_since_last_delinq: float (nullable = true)
 |-- mths_since_last_record: float (nullable = true)



In [7]:
# Register the raw DataFrame as the temp view "loan_defaulters" so it can be
# queried through spark.sql.
loans_defaulters_raw_df.createOrReplaceTempView("loan_defaulters")

In [8]:
# List the distinct delinq_2yrs values in the "loan_defaulters" view.
spark.sql("select distinct(delinq_2yrs) from loan_defaulters")

delinq_2yrs
20.04
18.53
18.0
26.24
6.52
9.0
21.72
17.17
58.0
5.0


In [9]:
# Frequency of each delinq_2yrs value, most common first (top 40 rows shown).
# The literal pieces concatenate to exactly the original query text.
(
    spark
    .sql(
        "select delinq_2yrs, count(*) "
        "            from loan_defaulters "
        "            group by 1 "
        "            order by 2 desc"
    )
    .show(40)
)

+-----------+--------+
|delinq_2yrs|count(1)|
+-----------+--------+
|        0.0| 1838878|
|        1.0|  281335|
|        2.0|   81285|
|        3.0|   29539|
|        4.0|   13179|
|        5.0|    6599|
|        6.0|    3717|
|        7.0|    2062|
|        8.0|    1223|
|        9.0|     818|
|       10.0|     556|
|       11.0|     363|
|       12.0|     264|
|       null|     261|
|       13.0|     165|
|       14.0|     120|
|       15.0|      87|
|       16.0|      55|
|       17.0|      30|
|       18.0|      30|
|       19.0|      23|
|       20.0|      17|
|       21.0|      12|
|       22.0|       5|
|       24.0|       4|
|       26.0|       3|
|       30.0|       2|
|       25.0|       2|
|       3.44|       2|
|       29.0|       2|
|       23.0|       2|
|       1.41|       1|
|       9.44|       1|
|      26.24|       1|
|      20.04|       1|
|      17.18|       1|
|      13.76|       1|
|       5.52|       1|
|       6.52|       1|
|       14.1|       1|
+----------

In [31]:
from pyspark.sql.functions import col
# Cast delinq_2yrs to integer, then replace nulls in that column with 0.
loans_defaulters_delinq_2yrs_processed_df = (
    loans_defaulters_raw_df
    .withColumn("delinq_2yrs", col("delinq_2yrs").cast("integer"))
    .na.fill(0, subset=["delinq_2yrs"])
)

In [32]:
# Cast pub_rec to integer, then replace nulls in that column with 0.
loans_defaulters_pub_rec_processed_df = (
    loans_defaulters_delinq_2yrs_processed_df
    .withColumn("pub_rec", col("pub_rec").cast("integer"))
    .na.fill(0, subset=["pub_rec"])
)

In [33]:
# Cast pub_rec_bankruptcies to integer, then replace nulls in that column with 0.
loans_defaulters_pub_rec_bankruptcies_processed_df = (
    loans_defaulters_pub_rec_processed_df
    .withColumn("pub_rec_bankruptcies", col("pub_rec_bankruptcies").cast("integer"))
    .na.fill(0, subset=["pub_rec_bankruptcies"])
)

In [34]:
# Cast inq_last_6mths to integer, then replace nulls in that column with 0.
loans_defaulters_inq_last_6mths_processed_df = (
    loans_defaulters_pub_rec_bankruptcies_processed_df
    .withColumn("inq_last_6mths", col("inq_last_6mths").cast("integer"))
    .na.fill(0, subset=["inq_last_6mths"])
)

In [35]:
# Point the "loan_defaulters" temp view at the processed DataFrame. This
# replaces whatever was registered under that name before (the raw DataFrame
# was registered under the same name earlier), so queries from here on see the
# integer-cast, null-filled columns.
loans_defaulters_inq_last_6mths_processed_df.createOrReplaceTempView("loan_defaulters")

In [36]:
# Preview every column of the current "loan_defaulters" view.
spark.sql("select * from loan_defaulters")

member_id,delinq_2yrs,delinq_amnt,pub_rec,pub_rec_bankruptcies,inq_last_6mths,total_rec_late_fee,mths_since_last_delinq,mths_since_last_record
f90d0607c9b0fa739...,0,0.0,0,0,1,0.0,31.0,
99d92e92a800aa4ab...,1,0.0,0,0,0,0.0,6.0,
6fa139cac7b13feb8...,0,0.0,0,0,0,0.0,47.0,
e8f881a1a3858e248...,0,0.0,0,0,0,0.0,33.0,
c8655c7f8efef0342...,1,0.0,0,0,0,0.0,21.0,
8ddccb7d382d2bfa5...,0,0.0,0,0,0,0.0,,
5ee639ebe58af7ebd...,0,0.0,1,0,2,0.0,,71.0
0d85da60a31069c9d...,1,0.0,2,0,0,0.0,6.0,63.0
d98e23f7616bc19d2...,0,0.0,0,0,0,0.0,36.0,
5320d16780c93fe3d...,0,0.0,0,0,0,0.0,35.0,


In [37]:
# Frequency of each delinq_2yrs value in the current view, most common first
# (up to 100 rows shown). The literal pieces concatenate to exactly the
# original query text.
(
    spark
    .sql(
        "select delinq_2yrs, count(*) "
        "            from loan_defaulters "
        "            group by 1 "
        "            order by 2 desc"
    )
    .show(100)
)

+-----------+--------+
|delinq_2yrs|count(1)|
+-----------+--------+
|          0| 1839141|
|          1|  281337|
|          2|   81285|
|          3|   29545|
|          4|   13180|
|          5|    6601|
|          6|    3719|
|          7|    2063|
|          8|    1226|
|          9|     821|
|         10|     558|
|         11|     363|
|         12|     266|
|         13|     167|
|         14|     123|
|         15|      90|
|         16|      56|
|         17|      33|
|         18|      32|
|         19|      24|
|         20|      19|
|         21|      16|
|         22|       7|
|         24|       6|
|         23|       5|
|         26|       4|
|         29|       2|
|         25|       2|
|         30|       2|
|         28|       1|
|         27|       1|
|         32|       1|
|         35|       1|
|         58|       1|
|         42|       1|
|         39|       1|
|         36|       1|
+-----------+--------+



In [38]:
# If a member has no delinquency history, there is no need to store their information in the loan defaulters data.

In [39]:
# Keep only members with a delinquency on record: at least one delinquency in
# the last two years, or a positive months-since-last-delinquency value.
# The integer cast is given an explicit alias so the output column is named
# mths_since_last_delinq by the query itself rather than by Spark's automatic
# naming of the cast expression; this name ends up in the written files.
# The WHERE clause still filters on the source column from the view.
loan_defaulters_actual_df = spark.sql(
    "select member_id, delinq_2yrs, delinq_amnt, "
    "int(mths_since_last_delinq) as mths_since_last_delinq from loan_defaulters "
    "where delinq_2yrs > 0 or mths_since_last_delinq > 0"
)

In [40]:
# Evaluate the filtered DataFrame as the cell's last expression to preview it.
loan_defaulters_actual_df

member_id,delinq_2yrs,delinq_amnt,mths_since_last_delinq
f90d0607c9b0fa739...,0,0.0,31
99d92e92a800aa4ab...,1,0.0,6
6fa139cac7b13feb8...,0,0.0,47
e8f881a1a3858e248...,0,0.0,33
c8655c7f8efef0342...,1,0.0,21
0d85da60a31069c9d...,1,0.0,6
d98e23f7616bc19d2...,0,0.0,36
5320d16780c93fe3d...,0,0.0,35
1a2ebfa028dd75ece...,0,0.0,30
2835469dd982ec6fd...,4,0.0,5


In [41]:
# Number of rows that passed the delinquency filter.
loan_defaulters_actual_df.count()

1106163

In [42]:
# Members with at least one public record, public-record bankruptcy, or
# enquiry in the last six months, keeping just those three columns and the id.
# The literal pieces concatenate to exactly the original query text.
loan_defaulters_with_records_or_enq_df = spark.sql(
    "select member_id, pub_rec, pub_rec_bankruptcies, "
    "inq_last_6mths "
    "from loan_defaulters where pub_rec > 0.0 or pub_rec_bankruptcies > 0.0 "
    "or inq_last_6mths > 0.0"
)

In [43]:
# Evaluate the records/enquiries DataFrame as the cell's last expression to preview it.
loan_defaulters_with_records_or_enq_df

member_id,pub_rec,pub_rec_bankruptcies,inq_last_6mths
f90d0607c9b0fa739...,0,0,1
5ee639ebe58af7ebd...,1,0,2
0d85da60a31069c9d...,2,0,0
610b771e1cd1a948d...,0,0,3
1a2ebfa028dd75ece...,0,0,2
64554289d0dcbfba2...,0,0,2
89f0699b7cf792107...,0,0,2
f403d14f8d8469b78...,0,0,1
879ac1d1414e07371...,2,0,2
44a488cca3a90ae44...,0,0,1


In [44]:
# Number of rows that passed the public-record / enquiry filter.
loan_defaulters_with_records_or_enq_df.count()

1070124

In [45]:
# Write the delinquency subset as CSV with a header row, overwriting any
# existing output at the target path.
(
    loan_defaulters_actual_df.write
    .format("csv")
    .mode("overwrite")
    .option("header", True)
    .option("path", "/user/itv012041/lendingclubproject/cleaned/loan_defaulters_csv")
    .save()
)

In [46]:
# Write the delinquency subset as Parquet, overwriting any existing output at
# the target path. The "header" option carried over from the CSV write is
# dropped here: it is a CSV data-source option and is not one of the Parquet
# data-source options, so setting it only misleads readers.
(
    loan_defaulters_actual_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "/user/itv012041/lendingclubproject/cleaned/loan_defaulters_parquet")
    .save()
)

In [47]:
# Write the public-record / enquiry subset as CSV with a header row,
# overwriting any existing output at the target path.
(
    loan_defaulters_with_records_or_enq_df.write
    .format("csv")
    .mode("overwrite")
    .option("header", True)
    .option("path", "/user/itv012041/lendingclubproject/cleaned/loan_defaulters_with_records_or_enq_csv")
    .save()
)

In [48]:
# Write the public-record / enquiry subset as Parquet, overwriting any existing
# output at the target path. The "header" option carried over from the CSV
# write is dropped here: it is a CSV data-source option and is not one of the
# Parquet data-source options, so setting it only misleads readers.
(
    loan_defaulters_with_records_or_enq_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "/user/itv012041/lendingclubproject/cleaned/loan_defaulters_with_records_or_enq_parquet")
    .save()
)

In [37]:
# Shut down the Spark session now that all outputs have been written.
spark.stop()