In [73]:
from pyspark.sql import *
import getpass
username = getpass.getuser()
spark = SparkSession.builder \
    .appName('loans_repayment') \
    .config('spark.ui.port', '0') \
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse") \
    .enableHiveSupport() \
    .master('yarn') \
    .getOrCreate()

In [72]:
spark.stop()

In [74]:
schema = "loan_id string, total_principal_received float, total_interest_received float, total_late_fee_received float, total_payment float, last_payment float,last_payment_date string, next_payment_dates string"

In [75]:
loan_df = spark.read.csv("/user/itv012667/lendingclub/raw/loans_repayments_csv",schema = schema,header=True)

In [76]:
loan_df.count()

2260701

In [77]:
from pyspark.sql.functions import *

# Ingest Time Stamp

In [78]:
loans_repay_ingest = loan_df.withColumn("ingest_time",current_timestamp())

In [79]:
loans_repay_ingest

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment,last_payment,last_payment_date,next_payment_dates,ingest_time
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019,2024-05-27 08:09:...
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019,2024-05-27 08:09:...
141357400,626.37,354.96,0.0,981.33,197.27,Mar-2019,Apr-2019,2024-05-27 08:09:...
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019,2024-05-27 08:09:...
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019,2024-05-27 08:09:...
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019,2024-05-27 08:09:...
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019,2024-05-27 08:09:...
141533932,585.29,640.53,15.0,1240.82,235.13,Mar-2019,Apr-2019,2024-05-27 08:09:...
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019,2024-05-27 08:09:...
141569080,1803.55,1110.59,0.0,2914.14,585.91,Mar-2019,Apr-2019,2024-05-27 08:09:...


In [80]:
loans_repay_ingest.count()

2260701

# Dropping Nulls 

In [81]:
check_columns = ["total_principal_received","total_interest_received","total_late_fee_received","total_payment","last_payment"]

In [82]:
loans_repay_ingest.filter("total_principal_received is null").count()

69

In [83]:
loans_check = loans_repay_ingest.na.drop(subset = check_columns)

In [84]:
loans_check.filter("total_principal_received is null").count()

0

# Ensure that total payment is always equal to sum of interest, latefee and principal fee payment

In [85]:
loans_new_df = loans_check.withColumn("total_payment",col("total_principal_received")+col("total_interest_received")+col("total_late_fee_received"))

In [86]:
loans_new_df

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment,last_payment,last_payment_date,next_payment_dates,ingest_time
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019,2024-05-27 08:09:...
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019,2024-05-27 08:09:...
141357400,626.37,354.96,0.0,981.32996,197.27,Mar-2019,Apr-2019,2024-05-27 08:09:...
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019,2024-05-27 08:09:...
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019,2024-05-27 08:09:...
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019,2024-05-27 08:09:...
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019,2024-05-27 08:09:...
141533932,585.29,640.53,15.0,1240.8201,235.13,Mar-2019,Apr-2019,2024-05-27 08:09:...
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019,2024-05-27 08:09:...
141569080,1803.55,1110.59,0.0,2914.1401,585.91,Mar-2019,Apr-2019,2024-05-27 08:09:...


# Ignore columns with total payment reveived is 0.0

In [87]:
loan_total_payment_no_impact = loans_new_df.filter("total_payment!=0.0")

In [88]:
loan_total_payment_no_impact

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment,last_payment,last_payment_date,next_payment_dates,ingest_time
141581221,1055.81,2591.7,0.0,3647.51,709.23,Mar-2019,Apr-2019,2024-05-27 08:09:...
141506948,1252.75,306.04,0.0,1558.79,312.63,Mar-2019,Apr-2019,2024-05-27 08:09:...
141357400,626.37,354.96,0.0,981.32996,197.27,Mar-2019,Apr-2019,2024-05-27 08:09:...
139445427,1118.16,297.36,0.0,1415.52,283.95,Mar-2019,Apr-2019,2024-05-27 08:09:...
141407409,1169.72,3605.3,0.0,4775.02,964.9,Mar-2019,Apr-2019,2024-05-27 08:09:...
141360802,2313.98,2512.88,0.0,4826.86,952.02,Mar-2019,Apr-2019,2024-05-27 08:09:...
141163960,4689.63,1994.93,0.0,6684.56,1342.57,Mar-2019,Apr-2019,2024-05-27 08:09:...
141533932,585.29,640.53,15.0,1240.8201,235.13,Mar-2019,Apr-2019,2024-05-27 08:09:...
141441276,2030.82,762.81,0.0,2793.63,477.62,Mar-2019,Apr-2019,2024-05-27 08:09:...
141569080,1803.55,1110.59,0.0,2914.1401,585.91,Mar-2019,Apr-2019,2024-05-27 08:09:...


# There are a few last payment and next payment dates as 0.0, change them to null

In [90]:
loan_total_payment_no_impact.filter("last_payment_date = 0.0")

loan_id,total_principal_received,total_interest_received,total_late_fee_received,total_payment,last_payment,last_payment_date,next_payment_dates,ingest_time
1064185,11600.98,11600.98,10000.0,33201.96,0.0,0.0,Dec-2014,2024-05-27 08:09:...
516382,21890.229,21856.03,16000.0,59746.258,0.0,0.0,Mar-2014,2024-05-27 08:09:...
529353,1.0,7288.0,47.0,7336.0,0.0,0.0,21221.5644086071,2024-05-27 08:09:...
528899,3045.0364,3019.64,2500.0,8564.676,0.0,0.0,Jan-2013,2024-05-27 08:09:...
527598,2398.9092,2220.51,2200.0,6819.419,0.0,0.0,Jul-2011,2024-05-27 08:09:...
525697,21797.86,19894.9,15750.0,57442.758,0.0,0.0,Jun-2015,2024-05-27 08:09:...
522641,3146.8193,3146.82,3000.0,9293.64,0.0,0.0,Sep-2011,2024-05-27 08:09:...
515655,29938.576,29905.75,22800.0,82644.33,0.0,0.0,May-2013,2024-05-27 08:09:...
500643,17215.0,91.6,14.0,17320.6,14166.7705,0.0,14135.97,2024-05-27 08:09:...
501234,15219.313,15155.9,12000.0,42375.215,0.0,0.0,May-2013,2024-05-27 08:09:...


In [91]:
loan_total_payment_no_impact.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment: float (nullable = true)
 |-- last_payment: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_dates: string (nullable = true)
 |-- ingest_time: timestamp (nullable = false)



In [92]:
loan_replacing_dates = loan_total_payment_no_impact.withColumn("last_payment_date",when(col("last_payment_date")=='0.0',None).otherwise(col("last_payment_date"))).withColumn("next_payment_dates",when(col("next_payment_dates")=='0.0',None).otherwise(col("next_payment_dates"))).drop("next_payment_date")

In [93]:
loan_replacing_dates.filter("next_payment_dates =0.0").count()

0

# write it to disk

In [None]:
loan_replacing_dates.repartition(1).write.mode("overwrite").option("header",True).option("path","/user/itv012667/lendingclub/cleaneddata/loan_repayment").save()

In [69]:
loan_replacing_dates.repartition(1).write.format("parquet").mode("overwrite").option("header",True).option("path","/user/itv012667/lendingclub/cleaneddata/loan_repayment").save()

Py4JJavaError: An error occurred while calling o466.save.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker$.metrics(BasicWriteStatsTracker.scala:175)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics(DataWritingCommand.scala:51)
	at org.apache.spark.sql.execution.command.DataWritingCommand.metrics$(DataWritingCommand.scala:51)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics$lzycompute(InsertIntoHadoopFsRelationCommand.scala:49)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.metrics(InsertIntoHadoopFsRelationCommand.scala:49)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.metrics(commands.scala:104)
	at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:63)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:101)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
