In [2]:
from pyspark.sql import SparkSession
# Build (or reuse) a Hive-enabled Spark session that runs against YARN.
# NOTE(review): 'spark.ui.port' = '0' presumably lets Spark pick any free UI
# port — confirm against the Spark configuration docs.
spark = (
    SparkSession.builder
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", "/user/itv012713/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)



In [3]:
from pyspark.sql.functions import current_timestamp, when, col

## Generating a DataFrame with proper data types

In [4]:
# Explicit schema for the raw repayments CSV, written as a comma-separated
# "name type" string: amounts are float, loan_id and both dates are strings.
loan_repayments_schema = ",".join([
    'loan_id string',
    'total_principal_received float',
    'total_interest_received float',
    'total_late_fee_received float',
    'total_payment_received float',
    'last_payment_amount float',
    'last_payment_date string',
    'next_payment_date string',
])

In [5]:
# Read the raw repayments CSV (with a header row) using the explicit schema
# instead of letting Spark infer column types.
loans_repay_raw_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(loan_repayments_schema)
    .load("/user/itv012713/lendingclubproject/raw/loans_repayments_csv")
)

loans_repay_raw_df

## Inserting Ingestion Date for data processing

In [6]:
# Add an ingest_date column holding the current timestamp.
# NOTE(review): the previews further down show different ingest_date values
# (16:27, 16:28, 16:29), so the timestamp appears to be re-evaluated on each
# query rather than fixed once — confirm that is acceptable for the saved data.
loans_repay_df_ingestd = loans_repay_raw_df.withColumn(
    "ingest_date",
    current_timestamp(),
)

In [7]:
# Preview the DataFrame with the new ingest_date column.
loans_repay_df_ingestd

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
109653473,8582.03,4166.19,0.0,12748.22,198.22,Nov-2018,Apr-2019,2024-08-10 16:27:...
109677516,7736.13,2402.89,0.0,10139.02,482.57,Mar-2019,Apr-2019,2024-08-10 16:27:...
109910042,4000.0,226.8,0.0,4226.7983,2994.33,May-2018,,2024-08-10 16:27:...
109767343,16493.94,6995.31,0.0,23489.25,1070.93,Mar-2019,Apr-2019,2024-08-10 16:27:...
109858215,6407.11,2345.53,0.0,8752.64,398.94,Mar-2019,Apr-2019,2024-08-10 16:27:...
109740125,25100.0,555.41,0.0,25655.408,25711.26,Jun-2017,,2024-08-10 16:27:...
108257437,4800.0,433.18,0.0,5233.1846,115.71,Mar-2019,,2024-08-10 16:27:...
109898089,3086.26,3155.35,0.0,6241.61,296.95,Mar-2019,Apr-2019,2024-08-10 16:27:...
109253226,4254.25,3573.9,0.0,7828.15,357.25,Mar-2019,Apr-2019,2024-08-10 16:27:...
109848461,6400.0,51.74,0.0,6451.7383,6461.0,Jun-2017,,2024-08-10 16:27:...


## Dropping rows having null values

In [8]:
# Expose the ingested DataFrame to Spark SQL as the temp view "loans_repayments".
loans_repay_df_ingestd.createOrReplaceTempView("loans_repayments")

In [9]:
# Count rows where total_principal_received is null (only this one column is
# probed here; the drop below checks all five amount columns).
spark.sql("select count(*) from loans_repayments where total_principal_received is null")

count(1)
69


In [10]:
# Amount columns that must be non-null for a repayment row to be kept.
columns_to_check = [
    "total_principal_received",
    "total_interest_received",
    "total_late_fee_received",
    "total_payment_received",
    "last_payment_amount",
]

In [14]:
# Drop every row that has a null in any of the amount columns listed in
# columns_to_check (dropna is the DataFrame-level alias of na.drop).
loan_repay_filtered_df = loans_repay_df_ingestd.dropna(subset=columns_to_check)

In [15]:
# Number of rows remaining after the null rows were dropped.
loan_repay_filtered_df.count()

2260498

## Correcting the total_payment_received Column Where Value is 0

In [16]:
# Register the null-filtered DataFrame as the temp view "loan_repayments".
# Note the name differs by a single letter from the earlier "loans_repayments"
# view, which still points at the unfiltered data.
loan_repay_filtered_df.createOrReplaceTempView("loan_repayments")

In [17]:
# Inspect rows that report a zero total payment although principal was received.
# NOTE(review): the sample rows returned here carry 0.0 in last_payment_date and
# a date in next_payment_date, which suggests these records may be shifted by a
# column in the raw file — confirm before trusting the per-column values.
spark.sql("select * from loan_repayments where total_payment_received = 0.0 and total_principal_received != 0")

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
1064185,11600.98,11600.98,10000.0,0.0,0.0,0.0,Dec-2014,2024-08-10 16:28:...
516382,21890.229,21856.03,16000.0,0.0,0.0,0.0,Mar-2014,2024-08-10 16:28:...
528899,3045.0364,3019.64,2500.0,0.0,0.0,0.0,Jan-2013,2024-08-10 16:28:...
527598,2398.9092,2220.51,2200.0,0.0,0.0,0.0,Jul-2011,2024-08-10 16:28:...
525697,21797.86,19894.9,15750.0,0.0,0.0,0.0,Jun-2015,2024-08-10 16:28:...
522641,3146.8193,3146.82,3000.0,0.0,0.0,0.0,Sep-2011,2024-08-10 16:28:...
515655,29938.576,29905.75,22800.0,0.0,0.0,0.0,May-2013,2024-08-10 16:28:...
501234,15219.313,15155.9,12000.0,0.0,0.0,0.0,May-2013,2024-08-10 16:28:...
498194,11642.714,11031.47,10000.0,0.0,0.0,0.0,Jan-2013,2024-08-10 16:28:...
495171,11138.843,10024.96,10000.0,0.0,0.0,0.0,Apr-2013,2024-08-10 16:28:...


In [18]:
# Where a row reports a zero total payment but a non-zero principal, rebuild the
# total as principal + interest + late fee; all other rows keep their value.
# NOTE(review): the rows matched by this condition show 0.0 in last_payment_date
# in the inspection query output, so they may be column-shifted in the raw data —
# confirm that summing these three fields is meaningful for them.
loans_payments_fixed_df = loan_repay_filtered_df.withColumn(
    "total_payment_received",
    when(
        (col("total_payment_received") == 0.0)
        & (col("total_principal_received") != 0.0),
        col("total_principal_received")
        + col("total_interest_received")
        + col("total_late_fee_received"),
    ).otherwise(col("total_payment_received")),
)

In [19]:
# Preview the DataFrame after recomputing total_payment_received.
loans_payments_fixed_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
109653473,8582.03,4166.19,0.0,12748.22,198.22,Nov-2018,Apr-2019,2024-08-10 16:28:...
109677516,7736.13,2402.89,0.0,10139.02,482.57,Mar-2019,Apr-2019,2024-08-10 16:28:...
109910042,4000.0,226.8,0.0,4226.7983,2994.33,May-2018,,2024-08-10 16:28:...
109767343,16493.94,6995.31,0.0,23489.25,1070.93,Mar-2019,Apr-2019,2024-08-10 16:28:...
109858215,6407.11,2345.53,0.0,8752.64,398.94,Mar-2019,Apr-2019,2024-08-10 16:28:...
109740125,25100.0,555.41,0.0,25655.408,25711.26,Jun-2017,,2024-08-10 16:28:...
108257437,4800.0,433.18,0.0,5233.1846,115.71,Mar-2019,,2024-08-10 16:28:...
109898089,3086.26,3155.35,0.0,6241.61,296.95,Mar-2019,Apr-2019,2024-08-10 16:28:...
109253226,4254.25,3573.9,0.0,7828.15,357.25,Mar-2019,Apr-2019,2024-08-10 16:28:...
109848461,6400.0,51.74,0.0,6451.7383,6461.0,Jun-2017,,2024-08-10 16:28:...


## Removing rows where total_payment_received is still 0

In [20]:
# Keep only rows whose total_payment_received is non-zero after the fix-up.
loans_payments = loans_payments_fixed_df.where("total_payment_received != 0.0")

In [21]:
# Preview the DataFrame after removing zero-payment rows.
loans_payments

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
109653473,8582.03,4166.19,0.0,12748.22,198.22,Nov-2018,Apr-2019,2024-08-10 16:28:...
109677516,7736.13,2402.89,0.0,10139.02,482.57,Mar-2019,Apr-2019,2024-08-10 16:28:...
109910042,4000.0,226.8,0.0,4226.7983,2994.33,May-2018,,2024-08-10 16:28:...
109767343,16493.94,6995.31,0.0,23489.25,1070.93,Mar-2019,Apr-2019,2024-08-10 16:28:...
109858215,6407.11,2345.53,0.0,8752.64,398.94,Mar-2019,Apr-2019,2024-08-10 16:28:...
109740125,25100.0,555.41,0.0,25655.408,25711.26,Jun-2017,,2024-08-10 16:28:...
108257437,4800.0,433.18,0.0,5233.1846,115.71,Mar-2019,,2024-08-10 16:28:...
109898089,3086.26,3155.35,0.0,6241.61,296.95,Mar-2019,Apr-2019,2024-08-10 16:28:...
109253226,4254.25,3573.9,0.0,7828.15,357.25,Mar-2019,Apr-2019,2024-08-10 16:28:...
109848461,6400.0,51.74,0.0,6451.7383,6461.0,Jun-2017,,2024-08-10 16:28:...


## Replacing 0 with None (null) in last_payment_date and next_payment_date

In [22]:
# Count rows whose last_payment_date compares equal to 0.0.
# NOTE(review): the column is declared as string in the schema, so this relies
# on Spark coercing the comparison — confirm "0.0" is the only placeholder meant.
loans_payments.filter("last_payment_date=0.0").count()

48

In [23]:
# Count rows whose next_payment_date compares equal to 0.0.
# NOTE(review): string column compared with a numeric literal; relies on Spark
# coercing the comparison — confirm this matches only the intended placeholder.
loans_payments.filter("next_payment_date=0.0").count()

24

In [35]:
# Count rows where last_payment_date is already null before the placeholder fix.
loans_payments.filter("last_payment_date is null").count()

1477

In [36]:
# Count rows where next_payment_date is already null before the placeholder fix.
loans_payments.filter("next_payment_date is null").count()

1344240

In [24]:
# Replace the 0.0 placeholder in both date columns with None (null); any other
# value is carried over unchanged.
loans_payments_date_fixed_df = (
    loans_payments
    .withColumn(
        "last_payment_date",
        when(col("last_payment_date") == 0.0, None)
        .otherwise(col("last_payment_date")),
    )
    .withColumn(
        "next_payment_date",
        when(col("next_payment_date") == 0.0, None)
        .otherwise(col("next_payment_date")),
    )
)

In [25]:
# Preview the DataFrame after nulling out the placeholder dates.
loans_payments_date_fixed_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment_received,last_payment_amount,last_payment_date,next_payment_date,ingest_date
109653473,8582.03,4166.19,0.0,12748.22,198.22,Nov-2018,Apr-2019,2024-08-10 16:29:...
109677516,7736.13,2402.89,0.0,10139.02,482.57,Mar-2019,Apr-2019,2024-08-10 16:29:...
109910042,4000.0,226.8,0.0,4226.7983,2994.33,May-2018,,2024-08-10 16:29:...
109767343,16493.94,6995.31,0.0,23489.25,1070.93,Mar-2019,Apr-2019,2024-08-10 16:29:...
109858215,6407.11,2345.53,0.0,8752.64,398.94,Mar-2019,Apr-2019,2024-08-10 16:29:...
109740125,25100.0,555.41,0.0,25655.408,25711.26,Jun-2017,,2024-08-10 16:29:...
108257437,4800.0,433.18,0.0,5233.1846,115.71,Mar-2019,,2024-08-10 16:29:...
109898089,3086.26,3155.35,0.0,6241.61,296.95,Mar-2019,Apr-2019,2024-08-10 16:29:...
109253226,4254.25,3573.9,0.0,7828.15,357.25,Mar-2019,Apr-2019,2024-08-10 16:29:...
109848461,6400.0,51.74,0.0,6451.7383,6461.0,Jun-2017,,2024-08-10 16:29:...


## Writing the data back to HDFS in Parquet and CSV format for further use

In [27]:
# Save the cleaned repayments as CSV with a header row, overwriting any
# existing output at the target path.
(
    loans_payments_date_fixed_df.write
    .format("csv")
    .option("header", True)
    .mode("overwrite")
    .save("/user/itv012713/lendingclubproject/clean_data/loans_repayments_cleaned_csv")
)

In [28]:
# Save the cleaned repayments as Parquet, overwriting any existing output at
# the target path. No "header" option is set here: it is a CSV setting and has
# no effect on Parquet output.
(
    loans_payments_date_fixed_df.write
    .format("parquet")
    .mode("overwrite")
    .option("path", "/user/itv012713/lendingclubproject/clean_data/loans_repayments_cleaned")
    .save()
)