In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port', '0'). \
config('spark.shuffle.useOldFetchProtocol', 'true'). \
config("spark.sql.warehouse.dir", f"/user/itv005857/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
loans_repay_schema = 'loan_id string, total_principal_received float, total_interest_received float, total_late_fee_received float, total_payment_received float, last_payment_amount float, last_payment_date string, next_payment_date string'

In [3]:
loans_repay_df = spark.read \
.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.load("/user/itv010110/lendingclubprojectJ/raw/loans_repayments_csv")

In [4]:
loans_repay_df

loan_id,total_rec_prncp,total_rec_int,total_rec_late_fee,total_pymnt,last_pymnt_amnt,last_pymnt_d,next_pymnt_d
76003861,6423.8,5607.85,0.0,12031.65,574.88,Jan-2018,
76263914,2400.0,323.18,0.0,2723.1800875923,905.18,Dec-2017,
75537401,12600.0,1392.76,0.0,13992.7563947386,3437.74,Aug-2018,
75038986,7805.03,7583.71,0.0,15388.74,440.72,Mar-2019,Apr-2019
76301424,2614.78,1080.09,0.0,3694.87,153.89,Apr-2018,
75333198,8664.09,3431.32,68.8,12164.21,343.9,Mar-2019,Apr-2019
76391453,35000.0,10266.01,0.0,45266.0062708179,18884.35,Feb-2019,
76363364,15000.0,2093.11,0.0,17093.108291696,4648.59,Aug-2018,
76272510,14463.21,11178.79,110.13,25752.13,734.18,Mar-2019,Apr-2019
76304116,4800.0,1051.65,0.0,5851.6523334103,1761.59,Dec-2017,


In [5]:
loans_repay_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_rec_prncp: string (nullable = true)
 |-- total_rec_int: string (nullable = true)
 |-- total_rec_late_fee: string (nullable = true)
 |-- total_pymnt: string (nullable = true)
 |-- last_pymnt_amnt: string (nullable = true)
 |-- last_pymnt_d: string (nullable = true)
 |-- next_pymnt_d: string (nullable = true)



In [6]:
loans_repay_raw_df = spark.read \
.format("csv") \
.option("header", True) \
.schema(loans_repay_schema) \
.load("/user/itv010110/lendingclubprojectJ/raw/loans_repayments_csv")

In [7]:
loans_repay_raw_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)



In [8]:
from pyspark.sql.functions import current_timestamp

In [9]:
loans_repay_df_ingestd = loans_repay_raw_df.withColumn("ingest_date", current_timestamp())

In [10]:
loans_repay_df_ingestd

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
76003861,6423.8,5607.85,0.0,12031.65,574.88,Jan-2018,,2024-05-15 11:41:...
76263914,2400.0,323.18,0.0,2723.1802,905.18,Dec-2017,,2024-05-15 11:41:...
75537401,12600.0,1392.76,0.0,13992.757,3437.74,Aug-2018,,2024-05-15 11:41:...
75038986,7805.03,7583.71,0.0,15388.74,440.72,Mar-2019,Apr-2019,2024-05-15 11:41:...
76301424,2614.78,1080.09,0.0,3694.87,153.89,Apr-2018,,2024-05-15 11:41:...
75333198,8664.09,3431.32,68.8,12164.21,343.9,Mar-2019,Apr-2019,2024-05-15 11:41:...
76391453,35000.0,10266.01,0.0,45266.008,18884.35,Feb-2019,,2024-05-15 11:41:...
76363364,15000.0,2093.11,0.0,17093.107,4648.59,Aug-2018,,2024-05-15 11:41:...
76272510,14463.21,11178.79,110.13,25752.13,734.18,Mar-2019,Apr-2019,2024-05-15 11:41:...
76304116,4800.0,1051.65,0.0,5851.6523,1761.59,Dec-2017,,2024-05-15 11:41:...


In [11]:
loans_repay_df_ingestd.createOrReplaceTempView("loan_repayments")

In [12]:
loans_repay_df_ingestd.count()

2260701

In [13]:
spark.sql("select count(*) from loan_repayments where total_principal_received is null")

count(1)
69


In [14]:
columns_to_check = ["total_principal_received","total_interest_received","total_late_fee_received","total_payment_received","last_payment_amount"]

In [15]:
loans_repay_filtered_df = loans_repay_df_ingestd.na.drop(subset = columns_to_check)

In [16]:
loans_repay_filtered_df.count()

2260498

In [17]:
loans_repay_filtered_df.createOrReplaceTempView("loan_repayments")

In [18]:
spark.sql("select * from loan_repayments where total_payment_received = 0.0 and total_principal_received != 0.0")

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
1064185,11600.98,11600.98,10000.0,0.0,0.0,0.0,Dec-2014,2024-05-15 11:41:...
516382,21890.229,21856.03,16000.0,0.0,0.0,0.0,Mar-2014,2024-05-15 11:41:...
528899,3045.0364,3019.64,2500.0,0.0,0.0,0.0,Jan-2013,2024-05-15 11:41:...
527598,2398.9092,2220.51,2200.0,0.0,0.0,0.0,Jul-2011,2024-05-15 11:41:...
525697,21797.86,19894.9,15750.0,0.0,0.0,0.0,Jun-2015,2024-05-15 11:41:...
522641,3146.8193,3146.82,3000.0,0.0,0.0,0.0,Sep-2011,2024-05-15 11:41:...
515655,29938.576,29905.75,22800.0,0.0,0.0,0.0,May-2013,2024-05-15 11:41:...
501234,15219.313,15155.9,12000.0,0.0,0.0,0.0,May-2013,2024-05-15 11:41:...
498194,11642.714,11031.47,10000.0,0.0,0.0,0.0,Jan-2013,2024-05-15 11:41:...
495171,11138.843,10024.96,10000.0,0.0,0.0,0.0,Apr-2013,2024-05-15 11:41:...


In [19]:
from pyspark.sql.functions import when, col

In [20]:
loans_payments_fixed_df = loans_repay_filtered_df.withColumn(
"total_payment_received",
    when(
        (col("total_principal_received")!=0.0)& 
        (col("total_payment_received")==0.0),
        col("total_principal_received") + col("total_principal_received") + col("total_late_fee_received")
    ). otherwise(col("total_principal_received"))
)

In [22]:
loans_payments_fixed_df.filter("loan_id =='1064185'")

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
1064185,11600.98,11600.98,10000.0,33201.96,0.0,0.0,Dec-2014,2024-05-15 11:42:...


In [28]:
loans_payments_fixed2_df = loans_payments_fixed_df.filter("total_payment_received != 0.0")

In [29]:
loans_payments_fixed2_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
76003861,6423.8,5607.85,0.0,6423.8,574.88,Jan-2018,,2024-05-15 11:45:...
76263914,2400.0,323.18,0.0,2400.0,905.18,Dec-2017,,2024-05-15 11:45:...
75537401,12600.0,1392.76,0.0,12600.0,3437.74,Aug-2018,,2024-05-15 11:45:...
75038986,7805.03,7583.71,0.0,7805.03,440.72,Mar-2019,Apr-2019,2024-05-15 11:45:...
76301424,2614.78,1080.09,0.0,2614.78,153.89,Apr-2018,,2024-05-15 11:45:...
75333198,8664.09,3431.32,68.8,8664.09,343.9,Mar-2019,Apr-2019,2024-05-15 11:45:...
76391453,35000.0,10266.01,0.0,35000.0,18884.35,Feb-2019,,2024-05-15 11:45:...
76363364,15000.0,2093.11,0.0,15000.0,4648.59,Aug-2018,,2024-05-15 11:45:...
76272510,14463.21,11178.79,110.13,14463.21,734.18,Mar-2019,Apr-2019,2024-05-15 11:45:...
76304116,4800.0,1051.65,0.0,4800.0,1761.59,Dec-2017,,2024-05-15 11:45:...


In [32]:
loans_payments_fixed2_df.filter("next_payment_date = 0.0").count()

5

In [33]:
loans_payments_fixed2_df.filter("last_payment_date = 0.0").count()

48

In [34]:
loans_payments_fixed2_df.filter("last_payment_date is null").count()

1

In [35]:
loans_payments_fixed2_df.filter("next_payment_date is null").count()

1342626

In [41]:
loans_payments_ldate_fixed_df = loans_payments_fixed2_df.withColumn(
"last_payment_date",
    when(
        (col("last_payment_date") ==0.0),
         None
        ). otherwise(col("last_payment_date"))
)

In [42]:
loans_payments_ndate_fixed_df = loans_payments_ldate_fixed_df.withColumn(
"next_payment_date",
    when(
        (col("next_payment_date") ==0.0),
         None
        ). otherwise(col("next_payment_date"))
)

In [43]:
loans_payments_ndate_fixed_df.filter("last_payment_date = 0.0").count()

0

In [45]:
loans_payments_ndate_fixed_df.filter("next_payment_date = 0.0").count()

0

In [46]:
loans_payments_ndate_fixed_df.write \
.option("header", True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv010110/lendingclubprojectJ/cleaned/loans_repayments_csv") \
.save()

In [47]:
loans_payments_ndate_fixed_df.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv010110/lendingclubprojectJ/cleaned/loans_repayments_parquet") \
.save()