In [230]:
import pandas as pd
import numpy as np
import numpy_financial as npf 


In [231]:
# Importing file
df = pd.read_excel("./data_lake/ModifiedAmortization Test.xlsx", "LoanTape")
print(df)
# Convert data to Dict type object
headers = [header for header in df.keys()]
json_obj = []

for obj in range(0, len(df)):
    data = {}
    for head in headers:
        data[head.strip()] = df[head][obj]
    
    json_obj.append(data)


   start_date  original_principal  amortization_term_months  \
0  2024-01-13               10000                       360   
1  2024-01-14               12000                       360   
2  2024-01-15               13000                       360   
3  2024-01-16               14000                       360   
4  2024-01-17               10000                       360   
5  2024-01-18               12000                       360   
6  2024-01-19               13000                       360   
7  2024-01-20               14000                       360   
8  2024-01-21               10000                       360   
9  2024-01-22               12000                       360   
10 2024-01-23               13000                        12   
11 2024-01-24               14000                        24   
12 2024-01-25               10000                        36   
13 2024-01-26               12000                        48   
14 2024-01-27               13000                      

In [232]:
# Calculated Values
COMPOUNDING_FREQ = {
    "Anually":1,
    "Semi-Annually":2,
    "Quarterly":4,
    "Monthly":12
}

PAYMENT_FREQ = {
    "Anually":{
        "ppy":1,
        "mo":12,
        "do":0
    },
    "Semi-Annually":{
        "ppy":2,
        "mo":6,
        "do":0
    },
    "Quarterly":{
        "ppy":4,
        "mo":3,
        "do":0
    },
    "Monthly":{
        "ppy":12,
        "mo":1,
        "do":0
    },
    "Bi-Monthly":{
        "ppy":6,
        "mo":2,
        "do":0
    },
    "Semi-Monthly":{
        "ppy":24,
        "mo":0,
        "do":15
    },
    "Bi-Weekly":{
        "ppy":26,
        "mo":0,
        "do":14
    },
    "Weekly":{
        "ppy":52,
        "mo":0,
        "do":7
    }
}

for instance in json_obj:
    compounding_period = COMPOUNDING_FREQ.get(instance["compounding_frequency"])
    payment_freq = PAYMENT_FREQ.get(instance["payment_frequency"])
    periods_per_year = payment_freq["ppy"]
    interest_rate_per_payment = np.float_power((1+(instance["interest_rate"]/compounding_period)),(compounding_period/periods_per_year))-1
    renewal_period = (instance["mortgage_term_months"]/12)*periods_per_year
    amortization_period = (instance["amortization_term_months"]/12)*periods_per_year
    payment_per_period = -1*npf.pmt(interest_rate_per_payment, amortization_period, instance["original_principal"])
    smm = instance["cpr"]/periods_per_year
    month_offset = payment_freq["mo"]
    day_offset = payment_freq["do"]

    additional_data = {
        "compounding_period":compounding_period,
        "periods_per_year":periods_per_year,
        "interest_rate_per_payment":interest_rate_per_payment,
        "renewal_period":int(renewal_period),
        "amortization_period":amortization_period,
        "payment_per_period":payment_per_period,
        "smm":smm,
        "month_offset":month_offset,
        "day_offset":day_offset,
    }
    instance.update(additional_data)
print(pd.DataFrame.from_dict(json_obj))



   start_date  original_principal  amortization_term_months  \
0  2024-01-13               10000                       360   
1  2024-01-14               12000                       360   
2  2024-01-15               13000                       360   
3  2024-01-16               14000                       360   
4  2024-01-17               10000                       360   
5  2024-01-18               12000                       360   
6  2024-01-19               13000                       360   
7  2024-01-20               14000                       360   
8  2024-01-21               10000                       360   
9  2024-01-22               12000                       360   
10 2024-01-23               13000                        12   
11 2024-01-24               14000                        24   
12 2024-01-25               10000                        36   
13 2024-01-26               12000                        48   
14 2024-01-27               13000                      

In [None]:
# Create periodic amortization schedule

from dateutil.relativedelta import relativedelta # type: ignore
from datetime import datetime

class GenerateAmortization:
    def __init__(self, amortization_type, loan_tape_json:dict=None, consolidated=False, to_csv=False):
        self.data = None
        if amortization_type not in ["periodic", "monthly", "daily"]:
            raise ValueError("amortization_type choices are periodic, monthly, daily")

        self.amortization_type = amortization_type
        
        if not loan_tape_json:
            raise ValueError("Required single input.")
        
        if loan_tape_json:
            self.data = loan_tape_json

        self.consolidated = consolidated
        self.to_csv = to_csv

    def check_by_renewal_period(self):
        if self.amortization_type == "daily":
            return self.payment_date.day == self.payment_day
        return True

    def get_payment(self, opening_balance, payment_per_period):
        if self.check_by_renewal_period():
            if opening_balance > 0:
                if payment_per_period < opening_balance:
                    payment=payment_per_period
                else:
                    payment = 0
            else:
                payment = 0
            return payment
        else: 
            return 0

    def get_interest(self, payment, interest_rate_per_payment):
        if self.check_by_renewal_period():
            if payment in ["", "-"," ", None, 0]:
                return 0
            return payment * interest_rate_per_payment
        else: 
            return 0

    def get_principal(self, payment, interest):
        if self.check_by_renewal_period():
            if payment in ["", "-"," ", None, 0]:
                return 0
            return payment-interest
        else: 
            return 0

    def get_prepayment(self, opening_balance, principal, smm):
        if self.check_by_renewal_period():
            if opening_balance - principal > 0:
                return opening_balance*smm
            return 0
        else: 
            return 0

    def get_maturity(self, period, renewal_period, opening_balance, principal, prepayment):
        if self.check_by_renewal_period():
            if period == renewal_period:
                return opening_balance - principal - prepayment
            return  0
        else: 
            return 0

    def get_closing_balance(self, opening_balance, principal, prepayment, new_origination, maturity, start_opening_balance):
        if self.check_by_renewal_period():
            return opening_balance - principal - prepayment + new_origination - maturity
        else :
            return opening_balance
    
    def get_new_origination(self, original_principal, period):
        if self.check_by_renewal_period():
            if period == 0:
                return original_principal
            else:
                return 0
        else:
                return 0
        
    def get_date(self, start_date, current_date, relative_date):
        date = start_date if not current_date else current_date+relative_date
        if self.amortization_type == "monthly":
            return datetime(date.year, date.month, 1)
        return date 
    
    def build_amortization_row(self, amortization_schedule, date, period, closing_balance, obj, show_maturity=False):
        opening_balance = closing_balance if closing_balance else 0
        payment = self.get_payment(opening_balance, obj["payment_per_period"])
        interest = self.get_interest(opening_balance, obj["interest_rate_per_payment"])
        principal = self.get_principal(payment, interest)
        prepayment = self.get_prepayment(opening_balance, principal, obj["smm"])
        new_origination = self.get_new_origination(obj["original_principal"], period) 
        maturity = self.get_maturity(period, obj["renewal_period"], opening_balance, principal, prepayment)
        closing_balance = self.get_closing_balance(opening_balance, principal, prepayment, new_origination, maturity, obj["original_principal"])

        amortization_schedule["date"] = date
        amortization_schedule["opening_balance"] = round(opening_balance, 2)
        amortization_schedule["payment"] = round(payment, 2)
        amortization_schedule["interest"] = round(interest, 2)
        amortization_schedule["principal"] = round(principal, 2)
        amortization_schedule["prepayment"] = round(prepayment, 2)
        amortization_schedule["new_origination"] = new_origination
        amortization_schedule["maturity"] = round(maturity, 2)
        amortization_schedule["closing_balance"] = round(closing_balance, 2)
        if show_maturity:
            return amortization_schedule, closing_balance, maturity
        return amortization_schedule, closing_balance

    @property
    def generate(self):
        if self.amortization_type == "daily":
            datas =  self.generate_daily_amortization()
        else:
            datas =  self.generate_amortization()

        if self.consolidated:
            amortization_lst = []
            for df in datas:
                for data in df.to_dict("records"):
                    amortization_lst.append(data)
            raw = pd.DataFrame.from_dict(amortization_lst)
            raw["date"] = pd.to_datetime(raw["date"])
            amortization = raw.sort_values(by="date", ascending=True)
        else:
            amortization = datas
        
        if self.to_csv:
            if self.consolidated: 
                # all_schedule = {}
                # amortization = amortization.to_dict("records")
                # for data in amortization:
                #     if data["date"] not in all_schedule:
                #         all_schedule[data["date"]] = []
                #     all_schedule[data["date"]].append(data)
                    
                # for key, schedule in all_schedule.items():
                #     agg_df = pd.DataFrame.from_dict(schedule)
                #     agg_df["date"] = pd.to_datetime(agg_df["date"])
                #     sorted_df = agg_df.sort_values(by="date", ascending=True)
                amortization.to_csv(
                    f"./consolidated_{self.amortization_type}/consolidation {self.amortization_type} amortization.csv", 
                    index=False
                )
            else:
                for key, schedule in enumerate(amortization, start=1):
                    schedule = schedule.to_dict("records")
                    agg_df = pd.DataFrame.from_dict(schedule)
                    agg_df["date"] = pd.to_datetime(agg_df["date"])
                    sorted_df = agg_df.sort_values(by="date", ascending=True)
                    sorted_df.to_csv(
                        f"./{self.amortization_type}_data_amortization/{self.amortization_type} of {key}.csv", 
                        index=False
                    )
            
        return amortization

    def generate_amortization(self):
        amotrizations_dataframes = []
        relative_date = relativedelta(months=1)

        for key, obj in enumerate(self.data, start=1):
            date = None
            closing_balance = None
            amortization_table = []
            for period in range(0, obj["renewal_period"]+1):

                amortization_schedule = {
                    "period":period,
                }
                
                date = self.get_date(obj["start_date"], date, relative_date)
                amortization_schedule, closing_balance =self.build_amortization_row(
                    amortization_schedule, 
                    date, 
                    period, 
                    closing_balance, 
                    obj, 
                    show_maturity=False
                )
                amortization_table.append(amortization_schedule)

            amortization_df = pd.DataFrame.from_dict(amortization_table)
            amortization_df["date"] = pd.to_datetime(amortization_df["date"])
            amortization_df =  amortization_df.sort_values(by="date", ascending=True)
            amotrizations_dataframes.append(amortization_df)
        return amotrizations_dataframes
    
    def generate_daily_amortization(self):
        amotrizations_dataframes = []
        relative_date = relativedelta(days=1)

        for key, obj in enumerate(self.data, start=1):
            date = None
            closing_balance = None
            amortization_table = []
            day = 0
            period = 0
            self.payment_day = obj["start_date"].day
            while True:
                    
                date = self.get_date(obj["start_date"], date, relative_date)
                self.payment_date = date
                amortization_schedule = {
                    "period":period,
                    "day":day,
                    "month-year":datetime(date.year, date.month, 1)
                }

                
                amortization_schedule, closing_balance, maturity =self.build_amortization_row(
                    amortization_schedule, 
                    date, 
                    period, 
                    closing_balance, 
                    obj, 
                    show_maturity=True
                )
                amortization_table.append(amortization_schedule)
                
                day += 1
                if int(maturity) != 0 and period > 0:
                    break

                if self.check_by_renewal_period():
                    period += 1

            amortization_df = pd.DataFrame.from_dict(amortization_table)
            amortization_df["date"] = pd.to_datetime(amortization_df["date"])
            amortization_df =  amortization_df.sort_values(by="date", ascending=True)
            amotrizations_dataframes.append(amortization_df)
        return amotrizations_dataframes

amortization = GenerateAmortization(
    amortization_type="monthly",
    loan_tape_json=json_obj, 
    consolidated=False, 
    to_csv=True
)
list_df = amortization.generate
