In [0]:
import json
import re
import pyspark.pandas as pd
import numpy as np
# import snowflake.connector
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
%sql
-- Input table for the origination-fee amortization in this notebook.
-- Uncomment the WHERE clause to preview only a handful of sample loans (the same ids used in the debug cells at the end).
select * from hive_metastore.analytics.ds_orig_fee_data_input_t
--where loan_id in ('62721387','54075652','61440049','62639335','65030935','64847700','54075652')

loan_id,originating_bank,ir,transaction_amount,term,origination_date,original_maturity_date,loan_amount,principal_balance,transaction_date,reversal_date,reversal_amount,early_paid_at,sold_date,transrefno,max_report_date,max_dwh_timestamp,dwh_flag
60120573,CBW,0.26094552,45.82,11,2021-01-05,2021-12-05,2337.31,0.0,2021-01-05T16:19:26.000+0000,,,,2021-06-23,12109011,2023-05-16,2023-05-17T17:23:03.250+0000,N
60430493,CBW,0.25191554,50.51,18,2021-01-27,2022-07-27,2576.17,0.0,2021-01-27T16:21:27.000+0000,,,,2021-06-23,12362206,2023-05-16,2023-05-17T17:23:03.250+0000,N
60748582,CBW,0.31391842,12.97,11,2021-02-18,2022-01-18,661.49,0.0,2021-02-18T16:20:43.000+0000,,,,2021-06-23,12631600,2023-05-16,2023-05-17T17:23:03.250+0000,N
60917026,CBW,0.23997292,33.33,11,2021-03-01,2022-02-01,1700.09,0.0,2021-03-01T16:23:45.000+0000,,,2021-12-27T00:00:00.000+0000,2021-06-23,12870184,2023-05-16,2023-05-17T17:23:03.250+0000,N
61048552,CBW,0.28623748,8.79,6,2021-03-11,2021-09-11,448.75,0.0,2021-03-11T16:27:06.000+0000,,,,2021-06-23,13012723,2023-05-16,2023-05-17T17:23:03.250+0000,N
61224963,CBW,0.25193133,59.19,11,2021-03-23,2022-02-23,3018.97,0.0,2021-03-23T17:21:06.000+0000,,,2022-02-22T00:00:00.000+0000,2021-06-23,13197554,2023-05-16,2023-05-17T17:23:03.250+0000,N
61237674,CBW,0.286151,4.61,6,2021-03-24,2021-09-24,235.6,0.0,2021-03-24T17:21:16.000+0000,,,2021-09-09T00:00:00.000+0000,2021-06-23,13212748,2023-05-16,2023-05-17T17:23:03.250+0000,N
61357335,CBW,0.28601139,5.19,6,2021-04-02,2021-10-02,265.03,0.0,2021-04-02T17:19:33.000+0000,,,2021-06-30T00:00:00.000+0000,2021-06-23,13345941,2023-05-16,2023-05-17T17:23:03.250+0000,N
61645246,CBW,0.28595343,25.78,6,2021-04-22,2021-10-22,1315.08,0.0,2021-04-22T17:26:19.000+0000,,,,2021-06-23,13683722,2023-05-16,2023-05-17T17:23:03.250+0000,N
61728313,CBW,0.29891309,7.99,11,2021-04-27,2022-03-27,407.79,0.0,2021-04-27T17:26:03.000+0000,,,,2021-06-23,13770103,2023-05-16,2023-05-17T17:23:03.250+0000,N


In [0]:
# load the fee input data from the Hive metastore table (same table as the %sql cell above)
# Read the table explicitly rather than through Databricks' implicit `_sqldf`
# (the result of whichever %sql cell ran last), so this cell does not depend
# on cell execution order.
df = spark.table('hive_metastore.analytics.ds_orig_fee_data_input_t')

In [0]:
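# Argument notes for the interest-payment calculation:
# pmt  -> origination fee + loan_amount
# nper -> original_loan_term / adjusted loan term
# pv   -> loan_amount * -1
# NOTE(review): the ipmt UDF call below actually passes rate = IR / 12, nper = TERM
# and pv = LOAN_AMOUNT (not negated), and computes pmt internally.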

In [0]:
# Upper-case every column name; the rest of the notebook refers to columns in upper case.
new_column_name_list = [column_name.upper() for column_name in df.columns]
df = df.toDF(*new_column_name_list)
print(new_column_name_list)

['LOAN_ID', 'ORIGINATING_BANK', 'IR', 'TRANSACTION_AMOUNT', 'TERM', 'ORIGINATION_DATE', 'ORIGINAL_MATURITY_DATE', 'LOAN_AMOUNT', 'PRINCIPAL_BALANCE', 'TRANSACTION_DATE', 'REVERSAL_DATE', 'REVERSAL_AMOUNT', 'EARLY_PAID_AT', 'SOLD_DATE', 'TRANSREFNO', 'MAX_REPORT_DATE', 'MAX_DWH_TIMESTAMP', 'DWH_FLAG']


In [0]:
# Normalise column types: the rate and money amounts -> double, TERM -> int, and
# EARLY_PAID_AT / TRANSACTION_DATE / REVERSAL_DATE -> date (any time-of-day part
# is dropped). Each cast replaces its column in place, applied in this order.
column_casts = {
    'IR': 'double',
    'TERM': 'int',
    'LOAN_AMOUNT': 'double',
    'TRANSACTION_AMOUNT': 'double',
    'REVERSAL_AMOUNT': 'double',
    'PRINCIPAL_BALANCE': 'double',
    'EARLY_PAID_AT': 'date',
    'TRANSACTION_DATE': 'date',
    'REVERSAL_DATE': 'date',
}
typed_df = df
for column_name, target_type in column_casts.items():
    typed_df = typed_df.withColumn(column_name, col(column_name).cast(target_type))
df = typed_df


In [0]:
# Assign an amortization type to each row; the first matching rule wins:
#   IR is null                                         -> 'non-amortizable'
#   REVERSAL_DATE is set                               -> 'reversal'
#   no EARLY_PAID_AT, or PRINCIPAL_BALANCE still > 0   -> 'baseline'
#   otherwise (paid early, balance not > 0 or null)    -> 'accelerated'
# Canceled loans are refunded at the canceled date (pbo = 0).
df = df.withColumn(
    'AMORT_TYPE',
    when(col('IR').isNull(), 'non-amortizable')
    .when(col('REVERSAL_DATE').isNotNull(), 'reversal')
    .when(col('EARLY_PAID_AT').isNull() | (col('PRINCIPAL_BALANCE') > 0), 'baseline')
    .otherwise('accelerated'),
)

In [0]:
# create python function that returns the interest paid in one payment period
# rate = periodic interest rate (the Spark call site passes IR / 12)
# per  = 1-based payment period number (MONTH_NUM)
# nper = total number of payment periods (TERM)
# pv   = loan amount (LOAN_AMOUNT, positive)
# MONTH_NUM / TERM can be null for loans that got no period rows from the left
# join on month numbers, so missing inputs must not crash the UDF.
def ipmt(rate, per, nper, pv):
    """Return the interest portion of payment ``per`` on a level-payment loan.

    Payments are at the end of each period and the loan amortizes to zero
    (fv = 0). For a positive ``pv`` the result is a positive amount, i.e. the
    negation of numpy-financial's ``ipmt(rate, per, nper, pv)``.

    Args:
        rate: interest rate per period (0 means an interest-free loan).
        per: 1-based period whose interest is wanted.
        nper: total number of payment periods.
        pv: principal at origination.

    Returns:
        The interest for period ``per`` as a float, or ``None`` when any input
        is missing or ``nper`` is 0. In the Spark UDF, ``None`` becomes a null
        instead of a TypeError / ZeroDivisionError that fails the whole job.
    """
    if rate is None or per is None or nper is None or pv is None or nper == 0:
        return None

    if rate != 0:
        # Level payment per period (negative for a positive pv).
        payment = (rate * (pv * (1 + rate) ** nper)) / (1 * (1 - (1 + rate) ** nper))
    else:
        payment = -1 * pv / nper

    # Interest accrued on the balance still outstanding after (per - 1) payments.
    return ((1 + rate) ** (per - 1)) * (pv * rate + payment) - payment

# Register ipmt as a Spark UDF; each period's interest comes back as a FloatType (32-bit) column.
udf_ipmt = udf(f=ipmt, returnType=FloatType())

In [0]:
# create dfs of consecutive integers used to explode loans into periods and days
# DAY_NUM 1..31: a monthly period (add_months to add_months) never spans more than 31 days.
day_nbs = spark.range(
    1, 32, 1
).toDF("DAY_NUM")
day_nbs = day_nbs.withColumn('DAY_NUM', day_nbs['DAY_NUM'].cast('int'))

# MONTH_NUM 1..max(TERM): sized from the data instead of a fixed 24, so loans
# with TERM > 24 are not silently cut off (which amortized their whole fee over
# only the first 24 months). The period join keeps only MONTH_NUM <= TERM, so
# extra numbers are harmless for shorter loans. `or 0` covers an empty input
# or an all-null TERM (empty range).
max_term = df.selectExpr('max(TERM) AS MAX_TERM').first()['MAX_TERM'] or 0
month_nbs = spark.range(
    1, max_term + 1, 1
).toDF("MONTH_NUM")
month_nbs = month_nbs.withColumn('MONTH_NUM', month_nbs['MONTH_NUM'].cast('int'))

In [0]:
# create new row for each period for each loan
# Non-equi left join: each loan row is repeated once per MONTH_NUM in 1..TERM;
# rows without a match (e.g. null TERM) are kept once with a null MONTH_NUM.
df = df.join(month_nbs, month_nbs['MONTH_NUM'].between(1, df['TERM']), how='left')

In [0]:
# monthly amortization schedule for loans with a positive rate:
#   IPMT              interest of period MONTH_NUM, using IR / 12 as the period rate
#   PERIOD_START_DATE ORIGINATION_DATE + (MONTH_NUM - 1) months
#   DAYS_OF_PERIOD    calendar days until the next period starts
period_start = add_months(col('ORIGINATION_DATE'), col('MONTH_NUM') - 1)
period_end = add_months(col('ORIGINATION_DATE'), col('MONTH_NUM'))
monthly_rate = col('IR') / 12

df_amort = (
    df.filter(col('IR') > 0)
    .withColumn('IPMT', udf_ipmt(monthly_rate, col('MONTH_NUM'), col('TERM'), col('LOAN_AMOUNT')))
    .withColumn('PERIOD_START_DATE', period_start)
    .withColumn('DAYS_OF_PERIOD', datediff(period_end, period_start))
)

In [0]:
# create new row for each day of each period for each loan
# IPMT_D spreads the period's interest evenly over its days; REPORT_DATE is the
# calendar date of DAY_NUM within the period.
df_amort = (
    df_amort
    .join(day_nbs, day_nbs['DAY_NUM'].between(1, df_amort['DAYS_OF_PERIOD']), how='left')
    .withColumn('IPMT_D', col('IPMT') / col('DAYS_OF_PERIOD'))
    .withColumn('REPORT_DATE', date_add(col('PERIOD_START_DATE'), col('DAY_NUM') - 1))
)

In [0]:
# calculate the daily amortization share for each fee transaction
# (LOAN_ID, TRANSACTION_DATE, AMORT_TYPE):
#   AMORT_SHARE   the day's IPMT_D as a fraction of the transaction's total IPMT_D
#   CUM_SHARE     running total of AMORT_SHARE by REPORT_DATE
#   CUM_SHARE_LAG CUM_SHARE of the previous day; 0.0 on the first day (instead of
#                 null) so the later "1 - CUM_SHARE_LAG" / "-CUM_SHARE_LAG"
#                 adjustments still work when a sale, early payoff or reversal
#                 falls on the very first report date
# Then days before TRANSACTION_DATE get no share, and the TRANSACTION_DATE row
# catches up everything accrued so far (its CUM_SHARE).
# NOTE(review): lit('0') is a string literal mixed with double values, so Spark
# coerces the AMORT_SHARE column type. Kept as-is so the output table schema is unchanged.
share_window = Window.partitionBy(col('LOAN_ID'), col('TRANSACTION_DATE'), col('AMORT_TYPE'))
running_window = share_window.orderBy(col('REPORT_DATE').asc())

df_amort = (
    df_amort
    .withColumn('AMORT_SHARE', col('IPMT_D') / sum(col('IPMT_D')).over(share_window))
    .withColumn('CUM_SHARE',
                sum(col('AMORT_SHARE')).over(running_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
    .withColumn('CUM_SHARE_LAG', lag(col('CUM_SHARE'), 1, 0.0).over(running_window))
    .withColumn('AMORT_SHARE',
                when(col('REPORT_DATE') < col('TRANSACTION_DATE'), lit('0'))
                .when(col('REPORT_DATE') == col('TRANSACTION_DATE'), col('CUM_SHARE'))
                .otherwise(col('AMORT_SHARE')))
)

In [0]:
# Sold loans: every day up to and including SOLD_DATE, plus the REVERSAL_DATE row (which may fall after the sale).
sold_amort = df_amort.where(
    col('SOLD_DATE').isNotNull()
    & ((col('REPORT_DATE') <= col('SOLD_DATE')) | (col('REPORT_DATE') == col('REVERSAL_DATE')))
)

In [0]:
# calculate amort share at sold date
# On SOLD_DATE the share is the remainder, 1 - share through the prior day; on
# the REVERSAL_DATE row it is zeroed (string '0', as elsewhere in this notebook).
on_sold_date = col('REPORT_DATE') == col('SOLD_DATE')
on_reversal_date = col('REPORT_DATE') == col('REVERSAL_DATE')
sold_amort = sold_amort.withColumn(
    'AMORT_SHARE',
    when(on_sold_date, 1 - col('CUM_SHARE_LAG'))
    .when(on_reversal_date, lit('0'))
    .otherwise(col('AMORT_SHARE')),
)

In [0]:
# calculate daily origination cost amortized, cumulative amortized cost, unearned balance, loan sale ending balance
# Sold loans:
#   ORIG_FEE_AMORT           daily amortization; 0 on SOLD_DATE and REVERSAL_DATE
#   CUM_ORIG_FEE_AMORT       running total of ORIG_FEE_AMORT
#   DEFERRED_ORIG_FEE        TRANSACTION_AMOUNT * (1 - CUM_SHARE); 0 on SOLD_DATE,
#                            REVERSAL_DATE, or while CUM_SHARE is 0
#   LOAN_SALE_ENDING_BAL     TRANSACTION_AMOUNT * AMORT_SHARE on SOLD_DATE (the remainder)
#   REVERSAL_AFTER_SOLD_LOAN -TRANSACTION_AMOUNT on REVERSAL_DATE
# The running total is partitioned per fee transaction (LOAN_ID, TRANSACTION_DATE,
# AMORT_TYPE), the grouping AMORT_SHARE was normalized over. As a result, two fee
# transactions of the same type on one loan keep separate running totals.
# NOTE(review): the lit('0') branches are string literals mixed with numeric
# values, so Spark coerces these column types. Left unchanged to keep the output
# table schema stable.
sold_running_window = (Window.partitionBy(col('LOAN_ID'), col('TRANSACTION_DATE'), col('AMORT_TYPE'))
                       .orderBy(col('REPORT_DATE').asc())
                       .rowsBetween(Window.unboundedPreceding, Window.currentRow))

sold_amort = (
    sold_amort
    .withColumn('ORIG_FEE_AMORT',
                when(col('REPORT_DATE') == col('SOLD_DATE'), 0)
                .when(col('REPORT_DATE') == col('REVERSAL_DATE'), lit('0'))
                .otherwise(col('TRANSACTION_AMOUNT') * col('AMORT_SHARE')))
    .withColumn('CUM_ORIG_FEE_AMORT',
                sum(col('ORIG_FEE_AMORT')).over(sold_running_window))
    .withColumn('DEFERRED_ORIG_FEE',
                when((col('REPORT_DATE') == col('SOLD_DATE')) | (col('CUM_SHARE') == 0), 0)
                .when(col('REPORT_DATE') == col('REVERSAL_DATE'), lit('0'))
                .otherwise(col('TRANSACTION_AMOUNT') * (1 - col('CUM_SHARE'))))
    .withColumn('LOAN_SALE_ENDING_BAL',
                when(col('REPORT_DATE') == col('SOLD_DATE'), col('TRANSACTION_AMOUNT') * col('AMORT_SHARE'))
                .when(col('REPORT_DATE') == col('REVERSAL_DATE'), lit('0'))
                .otherwise(0))
    .withColumn('REVERSAL_AFTER_SOLD_LOAN',
                when(col('REPORT_DATE') == col('REVERSAL_DATE'), -1 * col('TRANSACTION_AMOUNT'))
                .otherwise(lit('0')))
)

In [0]:
# reporting view: the report columns shared by all four branches, in the same order
sold_report = sold_amort.select(
    'LOAN_ID', 'ORIGINATING_BANK', 'REPORT_DATE', 'TRANSACTION_AMOUNT',
    'ORIG_FEE_AMORT', 'CUM_ORIG_FEE_AMORT', 'DEFERRED_ORIG_FEE',
    'LOAN_SALE_ENDING_BAL', 'REVERSAL_AFTER_SOLD_LOAN',
)

In [0]:
# Baseline loans with no SOLD_DATE.
base_amort = df_amort.where((col('AMORT_TYPE') == 'baseline') & col('SOLD_DATE').isNull())

In [0]:
# calculate amort share at last day
# On the last day of period TERM, book the remainder (1 - share through the prior
# day) so the shares through the final day add up to 1.
is_last_day_of_term = (col('MONTH_NUM') == col('TERM')) & (col('DAY_NUM') == col('DAYS_OF_PERIOD'))
base_amort = base_amort.withColumn(
    'AMORT_SHARE',
    when(is_last_day_of_term, 1 - col('CUM_SHARE_LAG')).otherwise(col('AMORT_SHARE')),
)

In [0]:
# calculate daily origination cost amortized, cumulative amortized cost, unearned balance, loan sale ending balance
# Baseline loans:
#   ORIG_FEE_AMORT      TRANSACTION_AMOUNT * AMORT_SHARE
#   CUM_ORIG_FEE_AMORT  running total, pinned to TRANSACTION_AMOUNT on the last day of the term
#   DEFERRED_ORIG_FEE   TRANSACTION_AMOUNT - CUM_ORIG_FEE_AMORT ('0' before TRANSACTION_DATE
#                       or while CUM_SHARE is 0)
#   LOAN_SALE_ENDING_BAL / REVERSAL_AFTER_SOLD_LOAN are constant 0 placeholders
# The running total is partitioned per fee transaction (LOAN_ID, TRANSACTION_DATE,
# AMORT_TYPE), the grouping AMORT_SHARE was normalized over. Several baseline fee
# transactions on one loan therefore keep separate running totals.
is_last_day_of_term = (col('MONTH_NUM') == col('TERM')) & (col('DAY_NUM') == col('DAYS_OF_PERIOD'))
base_running_window = (Window.partitionBy(col('LOAN_ID'), col('TRANSACTION_DATE'), col('AMORT_TYPE'))
                       .orderBy(col('REPORT_DATE').asc())
                       .rowsBetween(Window.unboundedPreceding, Window.currentRow))

base_amort = (
    base_amort
    .withColumn('ORIG_FEE_AMORT', col('TRANSACTION_AMOUNT') * col('AMORT_SHARE'))
    .withColumn('CUM_ORIG_FEE_AMORT',
                when(is_last_day_of_term, col('TRANSACTION_AMOUNT'))
                .otherwise(sum(col('ORIG_FEE_AMORT')).over(base_running_window)))
    .withColumn('DEFERRED_ORIG_FEE',
                when((col('CUM_SHARE') == 0) | (col('REPORT_DATE') < col('TRANSACTION_DATE')), lit('0'))
                .otherwise(col('TRANSACTION_AMOUNT') - col('CUM_ORIG_FEE_AMORT')))
    .withColumn('LOAN_SALE_ENDING_BAL', lit(0))
    .withColumn('REVERSAL_AFTER_SOLD_LOAN', lit('0'))
)

In [0]:
# reporting view: the report columns shared by all four branches, in the same order
base_report = base_amort.select(
    'LOAN_ID', 'ORIGINATING_BANK', 'REPORT_DATE', 'TRANSACTION_AMOUNT',
    'ORIG_FEE_AMORT', 'CUM_ORIG_FEE_AMORT', 'DEFERRED_ORIG_FEE',
    'LOAN_SALE_ENDING_BAL', 'REVERSAL_AFTER_SOLD_LOAN',
)

In [0]:
# Accelerated loans with no SOLD_DATE, up to and including EARLY_PAID_AT.
accel_amort = df_amort.where(
    (col('AMORT_TYPE') == 'accelerated')
    & col('SOLD_DATE').isNull()
    & (col('REPORT_DATE') <= col('EARLY_PAID_AT'))
)

In [0]:
# On EARLY_PAID_AT, book the remainder (1 - share through the prior day) so the
# fee is fully amortized by the payoff date.
accel_amort = accel_amort.withColumn(
    'AMORT_SHARE',
    when(col('REPORT_DATE') == col('EARLY_PAID_AT'), 1 - col('CUM_SHARE_LAG'))
    .otherwise(col('AMORT_SHARE')),
)

In [0]:
# calculate daily origination cost amortized, cumulative amortized cost, unearned balance, loan sale ending balance
# Accelerated loans:
#   ORIG_FEE_AMORT      TRANSACTION_AMOUNT * AMORT_SHARE
#   CUM_ORIG_FEE_AMORT  running total, pinned to TRANSACTION_AMOUNT on EARLY_PAID_AT
#   DEFERRED_ORIG_FEE   TRANSACTION_AMOUNT - CUM_ORIG_FEE_AMORT ('0' before TRANSACTION_DATE
#                       or while CUM_SHARE is 0)
#   LOAN_SALE_ENDING_BAL / REVERSAL_AFTER_SOLD_LOAN are constant 0 placeholders
# The running total is partitioned per fee transaction (LOAN_ID, TRANSACTION_DATE,
# AMORT_TYPE), the grouping AMORT_SHARE was normalized over, instead of by LOAN_ID
# alone. This stops a loan's separate fee transactions from being summed together,
# and stops tied REPORT_DATEs from making the running sum order-dependent.
accel_running_window = (Window.partitionBy(col('LOAN_ID'), col('TRANSACTION_DATE'), col('AMORT_TYPE'))
                        .orderBy(col('REPORT_DATE').asc())
                        .rowsBetween(Window.unboundedPreceding, Window.currentRow))

accel_amort = (
    accel_amort
    .withColumn('ORIG_FEE_AMORT', col('TRANSACTION_AMOUNT') * col('AMORT_SHARE'))
    .withColumn('CUM_ORIG_FEE_AMORT',
                when(col('REPORT_DATE') == col('EARLY_PAID_AT'), col('TRANSACTION_AMOUNT'))
                .otherwise(sum(col('ORIG_FEE_AMORT')).over(accel_running_window)))
    .withColumn('DEFERRED_ORIG_FEE',
                when((col('CUM_SHARE') == 0) | (col('REPORT_DATE') < col('TRANSACTION_DATE')), lit('0'))
                .otherwise(col('TRANSACTION_AMOUNT') - col('CUM_ORIG_FEE_AMORT')))
    .withColumn('LOAN_SALE_ENDING_BAL', lit(0))
    .withColumn('REVERSAL_AFTER_SOLD_LOAN', lit('0'))
)

In [0]:
# reporting view: the report columns shared by all four branches, in the same order
accel_report = accel_amort.select(
    'LOAN_ID', 'ORIGINATING_BANK', 'REPORT_DATE', 'TRANSACTION_AMOUNT',
    'ORIG_FEE_AMORT', 'CUM_ORIG_FEE_AMORT', 'DEFERRED_ORIG_FEE',
    'LOAN_SALE_ENDING_BAL', 'REVERSAL_AFTER_SOLD_LOAN',
)

In [0]:
# Reversed fee transactions with no SOLD_DATE, up to and including REVERSAL_DATE.
reverse_amort = df_amort.where(
    (col('AMORT_TYPE') == 'reversal')
    & col('SOLD_DATE').isNull()
    & (col('REPORT_DATE') <= col('REVERSAL_DATE'))
)

In [0]:
# Reversed fee transactions:
#  1. no share after the earlier of EARLY_PAID_AT / REVERSAL_DATE (Spark's least()
#     skips a null EARLY_PAID_AT);
#  2. if the loan was paid off before the reversal, book the remainder on EARLY_PAID_AT;
#  3. recompute the running share per (LOAN_ID, TRANSACTION_DATE), then on
#     REVERSAL_DATE book minus everything amortized through the prior day, so the
#     shares through REVERSAL_DATE net to zero.
# CUM_SHARE_LAG defaults to 0.0 on the first day (instead of null), so a reversal
# on the very first report date yields a zero share rather than a null one.
reverse_window = Window.partitionBy(col('LOAN_ID'), col('TRANSACTION_DATE')).orderBy(col('REPORT_DATE').asc())

reverse_amort = (
    reverse_amort
    .withColumn('AMORT_SHARE',
                when(col('REPORT_DATE') > least(col('EARLY_PAID_AT'), col('REVERSAL_DATE')), lit('0'))
                .when((col('REPORT_DATE') == col('EARLY_PAID_AT')) & (col('REVERSAL_DATE') > col('EARLY_PAID_AT')),
                      1 - col('CUM_SHARE_LAG'))
                .otherwise(col('AMORT_SHARE')))
    .withColumn('CUM_SHARE',
                sum(col('AMORT_SHARE')).over(reverse_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
    .withColumn('CUM_SHARE_LAG', lag(col('CUM_SHARE'), 1, 0.0).over(reverse_window))
    .withColumn('AMORT_SHARE',
                when(col('REPORT_DATE') == col('REVERSAL_DATE'), -col('CUM_SHARE_LAG'))
                .otherwise(col('AMORT_SHARE')))
)

In [0]:
# calculate daily origination cost amortized, cumulative amortized cost, unearned balance, loan sale ending balance
# Reversed fee transactions:
#   ORIG_FEE_AMORT       TRANSACTION_AMOUNT * AMORT_SHARE; on REVERSAL_DATE this uses the
#                        reversal share set in the previous cell (-CUM_SHARE_LAG), so it is non-positive
#   CUM_ORIG_FEE_AMORT   running total per (LOAN_ID, TRANSACTION_DATE), except on
#                        REVERSAL_DATE where it is set to TRANSACTION_AMOUNT
#   DEFERRED_ORIG_FEE    TRANSACTION_AMOUNT - CUM_ORIG_FEE_AMORT (hence 0 on REVERSAL_DATE);
#                        '0' before TRANSACTION_DATE or while CUM_SHARE is 0
#   LOAN_SALE_ENDING_BAL / REVERSAL_AFTER_SOLD_LOAN are constant 0 placeholders so the
#   columns line up with the sold-loan branch for the union.
# NOTE(review): on REVERSAL_DATE the running sum of ORIG_FEE_AMORT nets to ~0, yet
# CUM_ORIG_FEE_AMORT is reported as the full TRANSACTION_AMOUNT (apparently to force
# DEFERRED_ORIG_FEE to 0). Confirm this is the intended reporting for reversals.
reverse_amort = reverse_amort.withColumn('ORIG_FEE_AMORT', col('TRANSACTION_AMOUNT') * col('AMORT_SHARE'))\
                        .withColumn('CUM_ORIG_FEE_AMORT',
                                   when(col('REPORT_DATE') == col('REVERSAL_DATE'), col('TRANSACTION_AMOUNT'))\
                                    .otherwise(sum(col('ORIG_FEE_AMORT')).over(Window.partitionBy(col('LOAN_ID'),col('TRANSACTION_DATE')).orderBy(col('REPORT_DATE').asc()).rowsBetween(Window.unboundedPreceding, Window.currentRow))))\
                        .withColumn('DEFERRED_ORIG_FEE', 
                                    when((col('CUM_SHARE') == 0) |(col('REPORT_DATE') < col('TRANSACTION_DATE')),lit('0'))
                                    .otherwise(col('TRANSACTION_AMOUNT') - col('CUM_ORIG_FEE_AMORT')))\
                        .withColumn('LOAN_SALE_ENDING_BAL', lit(0))\
                        .withColumn('REVERSAL_AFTER_SOLD_LOAN',lit('0'))

In [0]:
# reporting view: the report columns shared by all four branches, in the same order
reverse_report = reverse_amort.select(
    'LOAN_ID', 'ORIGINATING_BANK', 'REPORT_DATE', 'TRANSACTION_AMOUNT',
    'ORIG_FEE_AMORT', 'CUM_ORIG_FEE_AMORT', 'DEFERRED_ORIG_FEE',
    'LOAN_SALE_ENDING_BAL', 'REVERSAL_AFTER_SOLD_LOAN',
)

In [0]:
# Combine the four branch reports and roll them up to one row per loan, bank and report date.
# unionByName matches columns by name instead of position, so the four separately
# maintained select lists cannot silently misalign.
# max/sum here are the pyspark.sql.functions aggregates (the star import shadows the builtins).
orig_fee_report = (
    sold_report
    .unionByName(base_report)
    .unionByName(accel_report)
    .unionByName(reverse_report)
)
orig_fee_report = orig_fee_report.groupBy('LOAN_ID', 'ORIGINATING_BANK', 'REPORT_DATE').agg(
    max('TRANSACTION_AMOUNT').alias('TRANSACTION_AMOUNT'),
    sum('ORIG_FEE_AMORT').alias('ORIG_FEE_AMORT'),
    sum('CUM_ORIG_FEE_AMORT').alias('CUM_ORIG_FEE_AMORT'),
    sum('DEFERRED_ORIG_FEE').alias('DEFERRED_ORIG_FEE'),
    sum('LOAN_SALE_ENDING_BAL').alias('LOAN_SALE_ENDING_BAL'),
    sum('REVERSAL_AFTER_SOLD_LOAN').alias('REVERSAL_AFTER_SOLD_LOAN'),
)

In [0]:
# Row-level detail from all four branches (every intermediate column), matched by
# column name rather than position so a column added or reordered in one branch
# cannot silently shift values into the wrong column.
orig_fee_amort = (
    sold_amort
    .unionByName(base_amort)
    .unionByName(accel_amort)
    .unionByName(reverse_amort)
)

In [0]:
# Persist the row-level amortization detail, replacing the table's previous contents.
orig_fee_amort.write.saveAsTable('hive_metastore.analytics.orig_fee_amort_raw_check', mode="overwrite")

In [0]:
# Persist the aggregated report, replacing the table's previous contents.
orig_fee_report.write.saveAsTable('hive_metastore.analytics.orig_fee_amort', mode="overwrite")

In [0]:
#baseline
#display(df1.filter(col('loan_id')=='64847700'))
#display(base_report.filter(col('loan_id')=='64847700'))

In [0]:
# reversal 
# accelerated + new transaction date
#display(df1.filter(col('loan_id')=='62721387'))
#display(reverse_report.filter(col('loan_id')=='62721387'))
##display(accel_report.filter(col('loan_id')=='62721387'))

In [0]:
#display(df1.filter(col('loan_id')=='61440049'))
#display(reverse_report.filter(col('loan_id')=='61440049'))
#display(accel_report.filter(col('loan_id')=='61440049'))

In [0]:
# sold case
#display(df1.filter(col('loan_id')=='62639335'))
#display(sold_report.filter(col('loan_id')=='62639335'))

In [0]:
#reversal
# baseline + new transaction date
#display(df1.filter(col('loan_id')=='65030935'))
#display(reverse_report.filter(col('loan_id')=='65030935'))
#display(base_report.filter(col('loan_id')=='65030935'))


In [0]:
# reverse > early paid at
#display(df1.filter(col('loan_id')=='54075652'))
#display(reverse_report.filter(col('loan_id')=='54075652'))
#display(accel_report.filter(col('loan_id')=='54075652'))


In [0]:
# sold loan with late reversal
#display(df1.filter(col('loan_id')=='65240701'))
#display(sold_report.filter(col('loan_id')=='65240701'))