In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()

spark= SparkSession. \
builder. \
config('spark.ui.port','0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
loans_repay_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.option("inferSchema", True) \
.load("/user/itv015703/lendingclubproject/raw/loans_repayments_data_csv")

In [3]:
loans_repay_schema = """loan_id string, total_principal_received float, 
total_interest_received float, total_late_fee_received float, total_payment_received float,
last_payment_amount float, last_payment_date string, next_payment_date string"""

In [5]:
loans_repay_raw_df = spark.read \
.format("csv") \
.option("header",True) \
.schema(loans_repay_schema) \
.load("/user/itv015703/lendingclubproject/raw/loans_repayments_data_csv")

In [11]:
from pyspark.sql.functions import *
loans_repay_df_ingestd  = loans_repay_raw_df.withColumn("ingest_date",current_timestamp())

In [12]:
loans_repay_df_ingestd.createOrReplaceTempView("loan_repayments")

In [13]:
spark.sql("select count(*) from loan_repayments where total_principal_received is null")

count(1)
69


In [14]:
columns_to_check = ["total_principal_received", "total_interest_received", "total_late_fee_received", "total_payment_received", "last_payment_amount"]

In [15]:
loans_repay_filtered_df  = loans_repay_df_ingestd.na.drop(subset = columns_to_check)

In [16]:
loans_repay_filtered_df.createOrReplaceTempView("loan_repayments")

In [17]:
spark.sql("select count(*) from loan_repayments where total_payment_received = 0.0")

count(1)
995


In [18]:
spark.sql("select count(*) from loan_repayments where total_payment_received = 0.0 and total_principal_received > 0.0")

count(1)
46


In [21]:
from pyspark.sql.functions import when, col

loans_payments_fixed_df = loans_repay_filtered_df.withColumn(
   "total_payment_received",
    when(
        (col("total_principal_received") != 0.0) &
        (col("total_payment_received") == 0.0),
        col("total_principal_received") + col("total_interest_received") + col("total_late_fee_received")
    ).otherwise(col("total_payment_received"))
)

In [22]:
loans_payments_fixed2_df = loans_payments_fixed_df.filter("total_payment_received != 0.0")

In [23]:
loans_payments_fixed2_df.filter("last_payment_date = 0.0").count()

48

In [24]:
loans_payments_fixed2_df.filter("next_payment_date = 0.0").count()

24

In [25]:
loans_payments_ldate_fixed_df = loans_payments_fixed2_df.withColumn(
  "last_payment_date",
   when(
       (col("last_payment_date") == 0.0),
       None
       ).otherwise(col("last_payment_date"))
)

In [26]:
loans_payments_ndate_fixed_df = loans_payments_ldate_fixed_df.withColumn(
  "last_payment_date",
   when(
       (col("next_payment_date") == 0.0),
       None
       ).otherwise(col("next_payment_date"))
)

In [27]:
loans_payments_ndate_fixed_df.filter("last_payment_date = 0.0").count()

0

In [28]:
loans_payments_ndate_fixed_df.filter("next_payment_date = 0.0").count()

24

In [30]:
loans_payments_ndate_fixed_df.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv015703/lendingclubproject/cleaned/loans_repayments_parquet") \
.save()

In [None]:
loans_payments_ndate_fixed_df.write \
.option("header", True) \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv015703/lendingclubproject/cleaned/loans_repayments_csv") \
.save()

In [None]:
spark.stop()