<a href="https://colab.research.google.com/github/xdderekchen/public_loan_data/blob/master/PubLoanPops.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1>ETL for public loan data from FNMA and FRED</h1>



# Introduction

This notebook presents a Python implementation for processing the public loan performance data published by Fannie Mae (FNMA) and Freddie Mac (referred to as FRED throughout this notebook).

Features:
  - handling data from FNMA and FRED
  - implemented in both pandas and pyspark
  - results can be saved to parquet or sqlite.

Data Sources:
- FNMA Loan Data can be accessed from https://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html
- FRED Loan Data can be accessed from http://www.freddiemac.com/research/datasets/sf_loanlevel_dataset.page

Ideally the source code would be packaged as a Python package; at this early stage of the implementation, however, it is easier to keep all of the code in this one notebook, split across several sections.

# Core Source Code

  ##  Utility Functions

In [0]:
import time
from datetime import datetime

def showtime(tstart):
    """
    Format the wall-clock time elapsed since ``tstart``.

    Parameters
    ----------
    tstart : float
        Start timestamp in seconds, as returned by ``time.time()``.

    Returns
    -------
    out : str
        Elapsed time truncated to whole milliseconds, e.g. ``"42 ms"``.
    """
    elapsed_ms = int((time.time() - tstart) * 1000)
    return "{} ms".format(elapsed_ms)

def decorator_time(method):
    """
    Decorator that reports how long each call of ``method`` takes.

    The wrapped function is called with the original arguments unchanged
    (including ``log_time`` / ``log_name`` when the caller supplies them, so
    the decorated function must accept them, e.g. through ``**kwargs``).

    Reporting
    ---------
    - If the caller passes ``log_time=<dict>``, the duration is stored in that
      dict as ``"<n> ms"`` under the key ``log_name`` (default: the upper-cased
      function name).
    - Otherwise the duration is printed.

    Returns
    -------
    out : callable
        Wrapper that returns whatever ``method`` returns.
    """
    import functools  # local import keeps this cell self-contained

    # functools.wraps keeps __name__/__doc__ of the decorated function, so
    # introspection and help() still show the real function.
    @functools.wraps(method)
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        elapsed_ms = (time.time() - ts) * 1000
        if 'log_time' in kw:
            name = kw.get('log_name', method.__name__.upper())
            kw['log_time'][name] = f"{int(elapsed_ms)} ms"
        else:
            print('%r  %2.2f ms' % (method.__name__, elapsed_ms))
        return result
    return timed


def compute_amortization(principals, monthly_rates, terms,  start_period = 0, end_period = None):
    """
    Compute the scheduled unpaid balance (UPB) of level-payment loans.

    Parameters
    ----------
    principals : scalar or array_like of shape (M,)
        Original principal of each loan.
    monthly_rates : scalar or array_like of shape (M,)
        Monthly interest rate of each loan as a fraction (annual % / 1200).
        Only one fixed rate per loan is supported; a rate of 0 is amortized
        linearly (principal / term per month).
    terms : scalar or array_like of shape (M,)
        Loan term in months.
    start_period : int, default 0
        Loan age (in months) of the first returned column.
    end_period : int or None, default None
        When given, at most ``max(1, end_period - start_period)`` months are
        returned; otherwise the longest term determines the number of months.

    Scalars and arrays may be mixed; they are broadcast against each other.

    Returns
    -------
    out : ndarray of shape (M, N)
        ``out[j, k]`` is the balance of loan ``j`` at age ``start_period + k``.
        At an age equal to the loan's term the balance is 0; at ages beyond
        the term the sentinel value -999 is stored.
    """
    past_term_marker = -999

    # np.asarray unwraps pandas Series as well as lists and scalars, so every
    # input ends up as a 1-D ndarray regardless of how it was passed.
    principals = np.atleast_1d(np.asarray(principals, dtype=float))
    monthly_rates = np.atleast_1d(np.asarray(monthly_rates, dtype=float))
    terms = np.atleast_1d(np.asarray(terms)).astype(np.int64)
    principals, monthly_rates, terms = np.broadcast_arrays(principals, monthly_rates, terms)

    num_loans = principals.shape[0]
    if num_loans == 0:
        return np.zeros((0, 0))

    num_month = int(terms.max())
    mini_term = int(terms.min())
    if end_period is not None:
        num_month = min(num_month, max(1, (end_period - start_period)))

    # Level payment of an annuity; the closed form is 0/0 for a zero rate, so
    # those loans fall back to straight-line repayment.
    with np.errstate(divide="ignore", invalid="ignore"):
        annuity_factor = monthly_rates / (1 - (1 + monthly_rates) ** (-terms))
        the_payment = np.where(monthly_rates == 0,
                               principals / terms,
                               principals * annuity_factor)

    # Roll the balance forward to the first requested period.
    current_upb = principals.astype(float)
    for _ in range(start_period):
        current_upb = current_upb - (the_payment - current_upb * monthly_rates)

    upb_matrix = np.zeros((num_month, num_loans))
    upb_matrix[0, :] = current_upb

    ages = range(start_period + 1, start_period + num_month)
    for i, t in enumerate(ages, start=1):
        previous = upb_matrix[i - 1, :]
        scheduled = previous - (the_payment - previous * monthly_rates)
        if t >= mini_term:
            # Loans at exactly their term are paid off (0); loans past their
            # term are flagged with the sentinel instead of a balance.
            scheduled = np.where(t > terms, past_term_marker,
                                 np.where(terms > t, scheduled, 0.0))
        upb_matrix[i, :] = scheduled
    return upb_matrix.T

  ## class Data_Schema

In [0]:
class Data_Schema(object):
    """
    Column layouts of the public loan files for one agency.

    Each schema is an ordered dict ``column name -> options``; the key order is
    the column order of the pipe-delimited source file.  Options:

    - ``dtype``   : "string", "int", "float", "double" or "date"
    - ``default`` : value substituted for missing entries (optional)
    - ``drop``    : True when the column is discarded after loading (optional)
    - ``format``  : ``datetime.strptime`` pattern of a date column (optional)
    - ``format2`` : the same pattern in Spark / Java notation (optional)

    ``format`` and ``format2`` describe the same text layout and must agree.

    Parameters
    ----------
    agency : str, default "FNMA"
        "FNMA" selects the Fannie Mae layouts; any other value selects the
        FRED layouts.
    """

    # schema of FNMA Acquisition Data
    _AcquisitionSchema_FNMA = {
        "LOAN_ID":        {"dtype": "string"},
        "ORIG_CHN":       {"dtype": "string"},
        "SellerName":     {"dtype": "string", "drop": True},
        "ORIG_RT":        {"dtype": "float"},
        "ORIG_AMT":       {"dtype": "double"},
        "ORIG_TRM":       {"dtype": "int"},
        "ORIG_DTE":       {"dtype": "date", "format": "%m/%Y", "format2": "MM/yyyy"},
        "FRST_DTE":       {"dtype": "date", "format": "%m/%Y", "format2": "MM/yyyy"},
        "OLTV":           {"dtype": "float", "default": 0},
        "OCLTV":          {"dtype": "float", "default": 0},
        "NUM_BO":         {"dtype": "int", "default": -1},
        "DTI":            {"dtype": "int", "default": -1},
        "CSCORE_B":       {"dtype": "float", "default": -1},
        "FTHB_FLG":       {"dtype": "string"},
        "PURPOSE":        {"dtype": "string"},
        "PROP_TYP":       {"dtype": "string"},
        "NUM_UNIT":       {"dtype": "int", "default": -1},
        "OCC_STAT":       {"dtype": "string"},
        "STATE":          {"dtype": "string"},
        "ZIP_3":          {"dtype": "string"},
        "MI_PCT":         {"dtype": "int", "default": 0},
        "Product_Type":   {"dtype": "string"},
        "CSCORE_C":       {"dtype": "int", "default": -1},
        "MI_TYPE":        {"dtype": "string", "default": "0"},
        "RELOCATION_FLG": {"dtype": "string"},
    }

    # schema of FNMA Performance Data
    _PerformanceSchema_FNMA = {
        "LOAN_ID":             {"dtype": "string"},
        # strptime pattern now matches format2 (month first); it used to be
        # "%d/%m/%Y", which swaps day and month.
        "ACT_DTE":             {"dtype": "date", "format": "%m/%d/%Y", "format2": "MM/dd/yyyy"},
        "SERVICER":            {"dtype": "string", "drop": True},
        "LAST_RT":             {"dtype": "float"},
        "LAST_UPB":            {"dtype": "double"},
        "LOAN_AGE":            {"dtype": "int"},
        "Months_To_Legal_Mat": {"dtype": "int", "default": -1},
        "Adj_Month_To_Mat":    {"dtype": "int", "default": -1},
        "Maturity_Date":       {"dtype": "date", "format2": "MM/yyyy"},
        "MSA":                 {"dtype": "string", "drop": True},
        "DLQ_STATUS":          {"dtype": "string"},
        "MOD_FLAG":            {"dtype": "string", "drop": True},
        "ZB_CODE":             {"dtype": "string", "drop": True},
        "ZB_DTE":              {"dtype": "date", "drop": True, "format2": "MM/yyyy"},
        "LPI_DTE":             {"dtype": "date", "drop": True, "format2": "MM/dd/yyyy"},
        "FCC_DTE":             {"dtype": "date", "drop": True, "format2": "MM/dd/yyyy"},
        "DISP_DTE":            {"dtype": "date", "drop": True, "format2": "MM/dd/yyyy"},
        "FCC_COST":            {"dtype": "float", "drop": True},
        "PP_COST":             {"dtype": "float", "drop": True},
        "AR_COST":             {"dtype": "float", "drop": True},
        "IE_COST":             {"dtype": "float", "drop": True},
        "TAX_COST":            {"dtype": "float", "drop": True},
        "NS_PROCS":            {"dtype": "float", "drop": True},
        "CE_PROCS":            {"dtype": "float", "drop": True},
        "RMW_PROCS":           {"dtype": "float", "drop": True},
        "O_PROCS":             {"dtype": "float", "drop": True},
        "NON_INT_UPB":         {"dtype": "float"},
        "PRIN_FORG_UPB_FHFA":  {"dtype": "float"},
        "REPCH_FLAG":          {"dtype": "string"},
        "PRIN_FORG_UPB_OTH":   {"dtype": "string"},
        "TRANSFER_FLG":        {"dtype": "string"},
    }

    # schema of FRED Acquisition Data
    # The strptime patterns were "%Y/%m"; they now match format2 ("yyyyMM").
    _AcquisitionSchema_FRED = {
        "CSCORE_B":     {"dtype": "float", "default": -1},
        "FRST_DTE":     {"dtype": "date", "format": "%Y%m", "format2": "yyyyMM"},
        "FTHB_FLG":     {"dtype": "string"},
        "MAT_DTE":      {"dtype": "date", "format": "%Y%m", "format2": "yyyyMM"},
        "MSA":          {"dtype": "string", "drop": True},
        "MI_PCT":       {"dtype": "int", "default": 0},
        "NUM_UNIT":     {"dtype": "int", "default": -1},
        "OCC_STAT":     {"dtype": "string"},
        "OCLTV":        {"dtype": "float", "default": 0},
        "DTI":          {"dtype": "int", "default": -1},
        "ORIG_AMT":     {"dtype": "double"},
        "OLTV":         {"dtype": "float", "default": 0},
        "ORIG_RT":      {"dtype": "float"},
        "ORIG_CHN":     {"dtype": "string"},
        "ppmt_pnlty":   {"dtype": "string"},
        "Product_Type": {"dtype": "string"},
        "STATE":        {"dtype": "string"},
        "PROP_TYP":     {"dtype": "string"},
        "ZIP_3":        {"dtype": "string"},
        "LOAN_ID":      {"dtype": "string"},
        "PURPOSE":      {"dtype": "string"},
        "ORIG_TRM":     {"dtype": "int"},
        "NUM_BO":       {"dtype": "int", "default": -1},
        "SellerName":   {"dtype": "string", "drop": True},
        "SERVICER":     {"dtype": "string", "drop": True},
        "flag_sc":      {"dtype": "string", "drop": True},
    }

    # schema of FRED Performance Data
    _PerformanceSchema_FRED = {
        "LOAN_ID":             {"dtype": "string"},
        # strptime pattern was "%Y/%m/%d"; it now matches format2 ("yyyyMM").
        "ACT_DTE":             {"dtype": "date", "format": "%Y%m", "format2": "yyyyMM"},
        "LAST_UPB":            {"dtype": "double"},
        "DLQ_STATUS":          {"dtype": "string"},
        "LOAN_AGE":            {"dtype": "int"},
        "Months_To_Legal_Mat": {"dtype": "int", "default": -1},
        "REPCH_FLAG":          {"dtype": "string"},
        "MOD_FLAG":            {"dtype": "string", "drop": True},
        "ZB_CODE":             {"dtype": "string", "drop": True},
        "ZB_DTE":              {"dtype": "date", "drop": True, "format2": "yyyyMM"},
        "LAST_RT":             {"dtype": "float"},
        "NON_INT_UPB":         {"dtype": "float"},
        "LPI_DTE":             {"dtype": "date", "drop": True, "format2": "yyyyMM"},
        "CE_PROCS":            {"dtype": "float", "drop": True},
        "NS_PROCS":            {"dtype": "float", "drop": True},
        "RMW_PROCS":           {"dtype": "float", "drop": True},
        "EXpenses":            {"dtype": "float", "drop": True},
        "LEGAL_COST":          {"dtype": "float", "drop": True},
        "MAINT_COST":          {"dtype": "float", "drop": True},
        "TAX_COST":            {"dtype": "float", "drop": True},
        "MISC_COST":           {"dtype": "float", "drop": True},
        "ACTUAL_LOSS":         {"dtype": "float", "drop": False},
        "MOD_LOSS":            {"dtype": "float", "drop": False},
        "STEPMOD_IND":         {"dtype": "string", "drop": True},
        "DPM_IND":             {"dtype": "string", "drop": True},
        "ELTV":                {"dtype": "float", "drop": True},
    }

    def __init__(self, agency="FNMA"):
        self.agency = agency

    def AcquisitionSchema(self):
        """Return the acquisition-file schema of the configured agency."""
        if self.agency == "FNMA":
            return self._AcquisitionSchema_FNMA
        return self._AcquisitionSchema_FRED

    def PerformanceSchema(self):
        """Return the performance-file schema of the configured agency."""
        if self.agency == "FNMA":
            return self._PerformanceSchema_FNMA
        return self._PerformanceSchema_FRED

    @staticmethod
    def columnType(v):
        """
        Translate one schema entry into the type used for reading with pandas.

        Parameters
        ----------
        v : dict
            Schema options of one column (see the class docstring).

        Returns
        -------
        out : numpy type, ``str``, or the marker string "date" / "other"
            "date" marks columns to be parsed as dates, "other" marks an
            unknown ``dtype``.
        """
        dtype = v.get('dtype')
        if dtype == "int":
            # An int column with a default may contain missing values, which
            # an integer dtype cannot hold while reading; use float32 there.
            return np.int32 if v.get('default') is None else np.float32
        simple_types = {
            "float": np.float32,
            "double": np.float64,
            "string": str,
            "date": "date",
        }
        return simple_types.get(dtype, "other")

## Class Agency_Loan
 pandas version

In [0]:
import numpy as np
import pandas as pd
import sqlalchemy as db

class Agency_Loan(object):
    '''
    Load and pre-process public loan data (pandas implementation).

    Parameters
    ----------
    agency : str
        Passed to ``Data_Schema`` to select the file layouts.
    acqYYYYQQ : str
        Acquisition quarter of the files, e.g. "2000Q1"; used as the ``ACQ``
        partition value when writing parquet.
    stageFolder : str, optional
        Folder holding the intermediate SQLite file written by
        ``read_data_performance_chunk``.
    acquisition_file, performance_file : str, optional
        Paths of the pipe-delimited source files.
    '''

    # SQLite file (inside stageFolder) filled by the chunked performance reader.
    _PERF_DB_FILE = "performance10.db"
    # Number of rows per pandas chunk when streaming the performance file.
    _PERF_CHUNK_SIZE = 10000

    def __init__(self, agency, acqYYYYQQ, stageFolder=None,  acquisition_file=None, performance_file=None):
        self.Data_Schema = Data_Schema(agency)
        self.acqYYYYQQ = acqYYYYQQ
        self.acquisition_file = acquisition_file
        self.performance_file = performance_file
        self.stageFolder = stageFolder
        self._Loan_Data = None
        self._Performance_Data = None

    @property
    def Loan_Data(self):
        '''Acquisition data loaded by ``read_data_acquisition`` (None until then).'''
        return self._Loan_Data

    @Loan_Data.setter
    def Loan_Data(self, x):
        self._Loan_Data = x

    @property
    def Performance_Data(self):
        '''Performance data loaded by ``read_data_performance`` (None until then).'''
        return self._Performance_Data

    @Performance_Data.setter
    def Performance_Data(self, x):
        self._Performance_Data = x

    @staticmethod
    def _csv_layout(schema):
        '''Split a schema dict into the names / dtype / parse_dates arguments of pd.read_csv.'''
        kinds = {name: Data_Schema.columnType(spec) for name, spec in schema.items()}
        col_names = list(kinds)
        col_dtype = {name: kind for name, kind in kinds.items()
                     if kind not in ("other", "date")}
        parse_dates = [name for name, kind in kinds.items() if kind == "date"]
        return col_names, col_dtype, parse_dates

    @staticmethod
    def _fill_defaults(df, schema):
        '''Replace missing values by the schema defaults, in place; int columns become int32.'''
        for name, spec in schema.items():
            value = spec.get('default')
            if value is None:
                continue
            filled = df[name].fillna(value)
            df[name] = filled.astype("int32") if spec.get("dtype") == "int" else filled
        return df

    def _output_folder(self):
        '''Folder used by SQLite2Parquet: an externally set ``resultFolder`` if present, else ``stageFolder``.'''
        # The constructor never sets resultFolder, so fall back to the folder
        # the chunked reader writes its SQLite file to.
        return getattr(self, "resultFolder", None) or self.stageFolder

    def read_data_acquisition(self, acquisition_file=None):
        '''
        Read and pre-process the acquisition data.

        The result is stored in ``Loan_Data``; missing ``OCLTV`` values are
        replaced by ``OLTV``.

        Parameters
        ----------
        acquisition_file : str, optional
            Overrides (and replaces) the path given to the constructor.

        Returns
        -------
        None
        '''
        if acquisition_file is not None:
            self.acquisition_file = acquisition_file

        col_names, col_dtype, parse_dates = self._csv_layout(self.Data_Schema.AcquisitionSchema())

        df = pd.read_csv(self.acquisition_file, delimiter='|', header=None,
                         names=col_names,
                         dtype=col_dtype,
                         parse_dates=parse_dates)
        df['OCLTV'] = df['OCLTV'].fillna(df['OLTV'])
        self.Loan_Data = df
        return None

    def read_data_performance(self, performance_file=None):
        '''
        Read the whole performance file into memory and pre-process it.

        The result, with schema defaults applied, is stored in
        ``Performance_Data``.

        Parameters
        ----------
        performance_file : str, optional
            Overrides (and replaces) the path given to the constructor.

        Returns
        -------
        None
        '''
        if performance_file is not None:
            self.performance_file = performance_file

        schema = self.Data_Schema.PerformanceSchema()
        col_names, col_dtype, parse_dates = self._csv_layout(schema)

        df = pd.read_csv(self.performance_file, delimiter='|', header=None,
                         names=col_names,
                         dtype=col_dtype,
                         parse_dates=parse_dates)

        self.Performance_Data = self._fill_defaults(df, schema)
        return None

    @decorator_time
    def read_data_performance_chunk(self, performance_file=None, **kwargs):
        '''
        Stream the performance file into a SQLite table, chunk by chunk.

        Each chunk gets the schema defaults applied and is written to table
        ``perf`` of ``<stageFolder>/performance10.db``; an existing table is
        replaced by the first chunk.  Nothing is kept in memory, so
        ``Performance_Data`` is not modified.

        Parameters
        ----------
        performance_file : str, optional
            Overrides (and replaces) the path given to the constructor.
        **kwargs
            Accepted so that the timing decorator's ``log_time`` / ``log_name``
            arguments can be passed through; not used here.

        Returns
        -------
        None
        '''
        import os

        if performance_file is not None:
            self.performance_file = performance_file

        schema = self.Data_Schema.PerformanceSchema()
        col_names, col_dtype, parse_dates = self._csv_layout(schema)

        # os.path.join instead of a hard-coded "\\": on Linux/Colab the
        # backslash would become part of the file name.
        db_path = os.path.join(self.stageFolder, self._PERF_DB_FILE)
        db_engine = db.create_engine('sqlite:///' + db_path)
        try:
            chunks = pd.read_csv(self.performance_file, delimiter='|', header=None,
                                 chunksize=self._PERF_CHUNK_SIZE, iterator=True,
                                 names=col_names,
                                 dtype=col_dtype,
                                 parse_dates=parse_dates)
            for chunk_no, df in enumerate(chunks):
                self._fill_defaults(df, schema)
                mode = 'replace' if chunk_no == 0 else 'append'
                df.to_sql("perf", db_engine, if_exists=mode)
        finally:
            db_engine.dispose()
        return None

    def SQLite2Parquet(self, dbFile, parquetFile):
        '''
        Copy table ``perf`` of a SQLite file into a parquet dataset partitioned by ``ACQ``.

        Parameters
        ----------
        dbFile : str
            SQLite file name inside the working folder.
        parquetFile : str
            Name of the parquet dataset created in the same folder.

        Returns
        -------
        True
        '''
        import os

        print("SQLite2Parquet")
        folder = self._output_folder()
        db_engine = db.create_engine('sqlite:///' + os.path.join(folder, dbFile))
        try:
            df = pd.read_sql_query("select * from  perf ", db_engine)
        finally:
            db_engine.dispose()
        print("finsing reading")
        df["ACQ"] = self.acqYYYYQQ
        df.to_parquet(os.path.join(folder, parquetFile), engine='pyarrow', partition_cols=['ACQ'])
        return True

    def save_as_parquet(self, resultFolder= None, Loan_Data =True, Performance_Data=True):
        '''
        Write the loaded data below ``<resultFolder>/output/<agency>/``.

        Loan data goes to ``Loan.parquet/ACQ=<acqYYYYQQ>`` and performance data
        to ``LoanPerformance.parquet/ACQ=<acqYYYYQQ>``; an existing partition
        for the same quarter is replaced.  Nothing happens when
        ``resultFolder`` is None or not an existing directory.  Errors while
        writing are printed, not raised.

        Parameters
        ----------
        resultFolder : str, optional
            Existing root directory for the output.
        Loan_Data, Performance_Data : bool, default True
            Select which of the two data sets are written.
        '''
        import os
        import shutil

        if resultFolder is None or not os.path.isdir(resultFolder):
            return

        selected = []
        if (Loan_Data == True) and (self.Loan_Data is not None):
            selected.append((self.Loan_Data, "Loan.parquet"))
        if (Performance_Data == True) and (self.Performance_Data is not None):
            selected.append((self.Performance_Data, "LoanPerformance.parquet"))

        # The agency of this object decides the sub-folder, so FRED data is
        # not written into the FNMA folder.
        base = resultFolder + "/output/" + self.Data_Schema.agency + "/"
        try:
            for frame, dataset in selected:
                root = base + dataset
                partition = root + "/ACQ=" + self.acqYYYYQQ
                if hasattr(frame, "write"):
                    # Spark DataFrame (e.g. set by a Spark-based subclass).
                    frame.write.mode('overwrite').parquet(partition)
                else:
                    # pandas DataFrame: drop the old partition first so that
                    # re-running overwrites instead of duplicating rows.
                    shutil.rmtree(partition, ignore_errors=True)
                    frame.assign(ACQ=self.acqYYYYQQ).to_parquet(
                        root, engine='pyarrow', partition_cols=['ACQ'])
        except Exception as err:
            print("save_as_parquet: Error:  {0}".format(err))

    def clear_data(self):
        '''Release the loaded loan and performance data.'''
        self._Loan_Data = None
        self._Performance_Data = None

    def compute_schd_upb(self, monthCount, outAsMatrix=True, loan_Pandas_Dataframe=None):
        '''
        Calculate the scheduled UPB from the columns "ORIG_AMT", "ORIG_RT" and "ORIG_TRM".

        Parameters
        ----------
        monthCount : int
            Number of months (starting at loan age 0) to compute.
        outAsMatrix : bool, default True
            True returns a matrix, anything else a long-format pandas DataFrame.
        loan_Pandas_Dataframe : pandas.DataFrame, optional
            Loans to use instead of ``Loan_Data``; needs the three columns
            above plus "LOAN_ID" for the DataFrame output.

        Returns
        -------
        out : ndarray (loanCount, monthCount)
              or DataFrame with columns LOAN_ID, LOAN_AGE, SCHD_UPB
        '''
        if loan_Pandas_Dataframe is None:
            sub_df = self._Loan_Data
        else:
            sub_df = loan_Pandas_Dataframe

        # ORIG_RT is an annual rate in percent: /100 -> fraction, /12 -> monthly.
        upb_matrix = compute_amortization(principals    = sub_df["ORIG_AMT"],
                                          monthly_rates = sub_df["ORIG_RT"] / 1200,
                                          terms         = sub_df["ORIG_TRM"],
                                          start_period  = 0,
                                          end_period    = monthCount)

        if outAsMatrix == True:
            return upb_matrix

        # Row-major flattening lists all ages of loan 0, then loan 1, ...;
        # the id and age columns are built in the same order.
        loan_count, month_count = upb_matrix.shape
        return pd.DataFrame({"LOAN_ID": np.repeat(sub_df["LOAN_ID"].values, month_count),
                             "LOAN_AGE": np.tile(np.arange(month_count), loan_count),
                             "SCHD_UPB": upb_matrix.flatten()})



## Test Driver for class Agency_Loan

In [0]:
#Case 1, FNMA loan
myobj = Agency_Loan(agency="FNMA", acqYYYYQQ="2000Q1",
                    stageFolder="/content/drive/My Drive/ML_Data/stock",
                    acquisition_file="/content/drive/My Drive/ML_Data/stock/Acquisition_2000Q1.txt",
                    performance_file="/content/drive/My Drive/ML_Data/stock/Performance_2000Q1.txt")

# read_data_acquisition returns None and keeps its result on the object,
# so the DataFrame is taken from the Loan_Data property.
myobj.read_data_acquisition()
loandf = myobj.Loan_Data

# The chunked reader writes to SQLite and does not set Performance_Data,
# so perfdf stays None here.
myobj.read_data_performance_chunk()
perfdf = myobj.Performance_Data

upb = myobj.compute_schd_upb(monthCount=12, outAsMatrix=False)


# Install PySpark and configure the environment

# Class PUBLIC_LOAN_FNMA

In [1]:
! git clone https://github.com/xdderekchen/public_loan_data.git

Cloning into 'public_loan_data'...
remote: Enumerating objects: 102, done.
remote: Counting objects:   0% (1/102)[Kremote: Counting objects:   1% (2/102)[Kremote: Counting objects:   2% (3/102)[Kremote: Counting objects:   3% (4/102)[Kremote: Counting objects:   4% (5/102)[Kremote: Counting objects:   5% (6/102)[Kremote: Counting objects:   6% (7/102)[Kremote: Counting objects:   7% (8/102)[Kremote: Counting objects:   8% (9/102)[Kremote: Counting objects:   9% (10/102)[Kremote: Counting objects:  10% (11/102)[Kremote: Counting objects:  11% (12/102)[Kremote: Counting objects:  12% (13/102)[Kremote: Counting objects:  13% (14/102)[Kremote: Counting objects:  14% (15/102)[Kremote: Counting objects:  15% (16/102)[Kremote: Counting objects:  16% (17/102)[Kremote: Counting objects:  17% (18/102)[Kremote: Counting objects:  18% (19/102)[Kremote: Counting objects:  19% (20/102)[Kremote: Counting objects:  20% (21/102)[Kremote: Counting objects:  2

In [0]:

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

#package to add PySpark to sys.path at runtime
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init() 

# Mount the Google Drive so we can use data storage for inputs and outputs

In [24]:
# Colab helper used to make Google Drive available as data storage for inputs and outputs.
from google.colab import drive
# Mount the Drive at /content/drive; the first run asks for an authorization code.
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


# Class PUBLIC_LOAN_FNMA_spark. 

## Re-implementation of  PUBLIC_LOAN_FNMA using **pyspark** API

In [0]:
from datetime import datetime
#cannot import name 'monotonically_increasing_id'
from pyspark.sql import functions as F,  Window
##from pyspark.sql.functions import col, udf, to_date, when, collect_list, lag,  max, row_number, monotonically_increasing_id
from pyspark.sql.types import DateType, StructType, StructField,  DoubleType, FloatType, IntegerType, LongType, StringType, DateType
import numpy as np
import time

class PUBLIC_LOAN_FNMA_spark(PUBLIC_LOAN_FNMA):
    '''processing public loan data from Fannie Mae '''
      #schema of Acquisition Data
    
    def __init__(self, acqYYYYQQ, stageFolder=None,  acquisition_file=None, performance_file=None):
         """
         Forward all arguments, unchanged and positionally, to the parent constructor.

         NOTE(review): PUBLIC_LOAN_FNMA is not defined in the visible part of
         this notebook -- confirm that its constructor takes these four
         arguments in this order.
         """
         super().__init__(acqYYYYQQ, stageFolder    , acquisition_file, performance_file)
         

    @staticmethod
    def convTypeToSpark(v):
        """
        Pick the Spark SQL type used to read one schema column from CSV.

        Parameters
        ----------
        v : dict
            Schema options of the column; only ``v['dtype']`` is inspected.

        Returns
        -------
        out : FloatType, DoubleType or IntegerType instance for "float",
              "double" and "int"; a StringType instance for everything else
              (dates are read as text and converted afterwards).
        """
        numeric_types = {
            "float": FloatType,
            "double": DoubleType,
            "int": IntegerType,
        }
        spark_type = numeric_types.get(v.get('dtype'), StringType)
        return spark_type()

    @staticmethod
    def save_to_hive(dbname, tablename, dataframe_src):
        """
        (Re)create a Hive table from a Spark table, view or DataFrame.

        The database is created when missing, an existing ``tablename`` is
        dropped, and the table is rebuilt with all rows of the source.
        Afterwards the tables of the database are listed.

        Parameters
        ----------
        dbname : str
            Hive database name.
        tablename : str
            Name of the table to (re)create.
        dataframe_src : str or Spark DataFrame
            Name of an existing table/view, or a DataFrame (which is
            registered as a temporary view first).

        Notes
        -----
        The names are interpolated into SQL text and must come from trusted
        code, not from user input.  Uses the notebook-global ``spark`` session.
        """
        if isinstance(dataframe_src, str):
            source_name = dataframe_src
        else:
            source_name = tablename + "_src_view"
            dataframe_src.createOrReplaceTempView(source_name)

        spark.sql('create database IF NOT EXISTS ' + dbname)
        spark.sql('use ' + dbname)
        spark.sql("drop table IF  EXISTS " + tablename)
        # The table was just dropped, so it has to be created from the source;
        # previously the rows were inserted into an unrelated table "ratings".
        spark.sql("create table " + tablename + " as select * from " + source_name)
        spark.sql("show tables").show()

    def compute_schd_upb(self, monthCount, outAsMatrix=True):
        """
        Calculate the scheduled UPB from Loan_Data["ORIG_AMT", "ORIG_RT", "ORIG_TRM"].

        The four needed columns are collected to pandas and handed to the
        parent implementation, which does the actual computation.

        Parameters
        ----------
        monthCount : int
            Number of months for which the UPB is computed.
        outAsMatrix : bool, default True
            True returns the parent's result unchanged; otherwise that result
            is converted with ``spark.createDataFrame``.

        Returns
        -------
        out : the parent's matrix result, or a Spark DataFrame
        """
        loan_terms_pdf = (
            self.Loan_Data
            .select("LOAN_ID", "ORIG_AMT", "ORIG_RT", "ORIG_TRM")
            .toPandas()
        )
        scheduled = super().compute_schd_upb(monthCount, outAsMatrix, loan_terms_pdf)

        if outAsMatrix != True:
            return spark.createDataFrame(scheduled)
        return scheduled
        


    def read_data_acquisition (self, acquisition_file=None):
        """
        Read and pre-process the acquisition data with Spark.

        Steps: read the pipe-delimited file with the schema column types,
        convert date columns using their ``format2`` pattern, replace nulls by
        the schema defaults, take OLTV where OCLTV is 0, and drop the columns
        flagged with ``drop``.  The result is stored in ``Loan_Data``.

        Parameters
        ----------
        acquisition_file : str, optional
            Overrides (and replaces) the stored file path.

        Returns
        -------
        None
        """
        if acquisition_file is not None:
            self.acquisition_file = acquisition_file

        # need to check file existing
        schema_items = list(self._AcquisitionSchema.items())
        csv_schema = StructType([
            StructField(name, PUBLIC_LOAN_FNMA_spark.convTypeToSpark(spec), True)
            for name, spec in schema_items
        ])
        reader = spark.read.format("csv").options(header='False', delimiter="|").schema(csv_schema)
        loans = reader.load(self.acquisition_file)

        for name, spec in schema_items:
            if spec.get('dtype') == "date":
                # Dates were read as text; parse them with the Spark pattern.
                loans = loans.withColumn(name, F.to_date(F.col(name), spec.get('format2')))
                continue
            fallback = spec.get('default')
            if fallback is not None:
                loans = loans.withColumn(
                    name, F.when(F.col(name).isNull(), fallback).otherwise(F.col(name)))

        # A missing OCLTV was set to its default 0 above; use OLTV in that case.
        loans = loans.withColumn('OCLTV', F.expr("case when OCLTV ==0 then OLTV else OCLTV end"))

        for name, spec in schema_items:
            if spec.get('drop') == True:
                loans = loans.drop(name)

        self.Loan_Data = loans
        return None

    def read_data_performance (self, performance_file=None):
          '''
          read and pre-process acqusition data
          '''
          if performance_file is not None:
             self.performance_file = performance_file

          # need to check file existing
          ColumnSchema = [StructField(k, PUBLIC_LOAN_FNMA_spark.convTypeToSpark(v), True) for k, v in self._PerformanceSchema.items()]
          ColumnSchema = StructType(ColumnSchema)                                

          #udf
          #str_to_date =  udf (lambda x: datetime.strptime(x, '%m/%Y'), DateType())   
          windowIncF12 =  Window().partitionBy("LOAN_ID").orderBy("ACT_DTE").rowsBetween(1, 12)
          windowIncF24 =  Window().partitionBy("LOAN_ID").orderBy("ACT_DTE").rowsBetween(1, 24)
          windowIncP12 =  Window().partitionBy("LOAN_ID").orderBy("ACT_DTE").rowsBetween(-12, -1)
          windowInc    =  Window().partitionBy("LOAN_ID").orderBy("ACT_DTE")

          ts = time.time()
          per_df = spark.read.format("csv").options(header='False', delimiter="|").schema(ColumnSchema).load(self.performance_file )
          #per_df.show(10)
          print("read cvs: ", showtime(ts))
        
          ts = time.time()
          # date columnd
          for k, v in self._PerformanceSchema.items():
             if v.get('dtype') == "date":
                per_df = per_df.withColumn(k,  F.to_date(F.col(k), v.get('format2')))

          per_df = per_df.withColumn('DLQ_STATUS', F.when(F.col('DLQ_STATUS').isNull(), -1).when(F.col('DLQ_STATUS') == "X", -2).otherwise(F.col('DLQ_STATUS').cast(IntegerType())))
          per_df = per_df.withColumn('DEFT_COST',  F.expr("FCC_COST + PP_COST + AR_COST + IE_COST + TAX_COST"))
          per_df = per_df.withColumn('DEFT_PROCS',  F.expr("NS_PROCS + CE_PROCS + RMW_PROCS + O_PROCS"))

          per_df = per_df.withColumn("DLQ_LAG", F.lag('DLQ_STATUS', 1).over(windowInc)) \
              .withColumn("DLQ_LAGs12",         F.collect_list('DLQ_STATUS').over(windowIncP12)) \
              .withColumn("DLQ_NEXT12MAX",      F.max('DLQ_STATUS').over(windowIncF12)) \
              .withColumn("DLQ_NEXT24MAX",      F.max('DLQ_STATUS').over(windowIncF24)) \
              .withColumn("ZBCODE_NEXT12",      F.max('ZB_CODE').over(windowIncF12)) \
              .withColumn("ZBCODE_NEXT24",      F.max('ZB_CODE').over(windowIncF24)) \
              .withColumn("MOD_NEXT12",         F.max('MOD_FLAG').over(windowIncF12)) \
              .withColumn("MOD_NEXT24",         F.max('MOD_FLAG').over(windowIncF24)) 

          print(per_df.printSchema())
          
          print("transform: ", showtime(ts))
        
          ts = time.time()
          Mod_DF = per_df.filter(F.col("MOD_FLAG")=='Y').withColumn("rn", F.row_number().over(windowInc)).where((F.col("rn") ==1)) \
                      .select("LOAN_ID", 
                              F.col("ACT_DTE").alias("MOD_DTE"), 
                              F.col("LOAN_AGE").alias("MOD_AGE"), 
                              F.col("DLQ_STATUS").alias("MOD_DLQ"),
                              F.col("DLQ_LAG").alias("MOD_DLQ_LAG"),
                              F.col("DLQ_LAGs12").alias("MOD_DLQ_LAGs12"),
                              F.col("DLQ_NEXT12MAX").alias("MOD_POST_MAXDLQ_12"),
                              F.col("DLQ_NEXT24MAX").alias("MOD_POST_MAXDLQ_24"),
                              F.col("ZBCODE_NEXT12").alias("MOD_POST_ZBCODE_12"),
                              F.col("ZBCODE_NEXT24").alias("MOD_POST_ZBCODE_24")
                              )
          print("Mod DF: ", showtime(ts))
        
          ts = time.time()
          F3Q_DF = per_df.filter(F.col("DLQ_STATUS")>2).withColumn("rn", F.row_number().over(windowInc)).where((F.col("rn") ==1)) \
                     .select("LOAN_ID", 
                              F.col("ACT_DTE").alias("F3Q_DTE"), 
                              F.col("LOAN_AGE").alias("F3Q_AGE"), 
                              F.col("DLQ_LAGs12").alias("F3Q_DLQ_LAGs12"),
                              F.col("DLQ_NEXT12MAX").alias("F3Q_POST_MAXDLQ_12"),
                              F.col("DLQ_NEXT24MAX").alias("F3Q_POST_MAXDLQ_24"),
                              F.col("ZBCODE_NEXT12").alias("F3Q_POST_ZBCODE_12"),
                              F.col("ZBCODE_NEXT24").alias("F3Q_POST_ZBCODE_24")
                              )
          print("F3Q DF: ", showtime(ts))
        
          ts = time.time()
          ZB_DF = per_df.filter(F.col("ZB_CODE").isNotNull()) \
                    .select("LOAN_ID", "ZB_CODE", "ZB_DTE",
                              F.col("LOAN_AGE").alias("ZB_AGE"), 
                              F.col("DLQ_LAGs12").alias("ZB_DLQ_LAGs12"),
                              F.col("DLQ_LAG").alias("ZB_DLQ_LAG"),
                              F.col("LAST_UPB").alias("ZB_LAST_UPB"),
                              "LPI_DTE",
                              "FCC_DTE",
                              "DISP_DTE", "DEFT_COST", "DEFT_PROCS"
                    )
          print("ZB DF: ", showtime(ts))
        
          ts = time.time()

          if self.Loan_Data is None:
             loanLevelDF =  Mod_DF.select("LOAN_ID") \
                     .union(F3Q_DF.select("LOAN_ID")) \
                     .union( ZB_DF.select("LOAN_ID")).distinct()
          
             loanLevelDF = loanLevelDF.join(Mod_DF, "LOAN_ID", how="left")
             loanLevelDF = loanLevelDF.join(F3Q_DF, "LOAN_ID", how="left")
             loanLevelDF = loanLevelDF.join(ZB_DF,  "LOAN_ID",  how="left") 
          else:
             self.Loan_Data = self._Loan_Data.join(Mod_DF, "LOAN_ID", how="left") \
                                         .join(F3Q_DF, "LOAN_ID", how="left") \
                                         .join(ZB_DF,  "LOAN_ID",  how="left").orderBy(["LOAN_ID"])
              
          print("join: ", showtime(ts))
          select_col = [k for k, v in self._PerformanceSchema.items() if v.get("drop", False) == False]
          Performance_Data = per_df.select(select_col)

          #calculate schd_upb
          age_max = 12  #Performance_Data.agg({"LOAN_AGE": "max"}).first()[0]
          ts = time.time()
          schd_upbData = self.compute_schd_upb(monthCount=age_max, outAsMatrix=False)
          print("compute upb: ", showtime(ts))
         
          ts = time.time()
          self.Performance_Data = Performance_Data.join(schd_upbData, on=["LOAN_ID", "LOAN_AGE"], how="left").orderBy(["LOAN_ID",  "ACT_DTE"])
          print("joining upb: ", showtime(ts))
          ts = time.time()
          self.Performance_Data = self.Performance_Data.filter(F.col("LOAN_AGE")>=0).withColumn("LAST_UPB",  F.expr("case when LAST_UPB is Null then round(SCHD_UPB,3) else LAST_UPB end") )
          self.Performance_Data=  self.Performance_Data.drop("SCHD_UPB")
          print("filtering out: ", showtime(ts))
           
          return None


def _test(acqYYYYQQ="2000Q1",
          stageFolder="/content/drive/My Drive/ML_Data/stock",
          resultFolder="/content/drive/My Drive/ML_Data"):
    """
    End-to-end smoke test of the Spark FNMA loader.

    Starts (or reuses) a Hive-enabled Spark session, reads the acquisition and
    performance files of one acquisition quarter and saves the result as
    parquet.

    Parameters
    ----------
    acqYYYYQQ : str, acquisition quarter, e.g. "2000Q1"; also used to build
        the input file names Acquisition_<quarter>.txt and
        Performance_<quarter>.txt
    stageFolder : str, folder that holds the two input files
    resultFolder : str, folder passed to save_as_parquet

    Returns
    -------
    out : the populated PUBLIC_LOAN_FNMA_spark object
    """
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # The loader reads the module-level name `spark` (e.g. spark.read in the
    # performance reader), so the session has to be published as a global.
    globs = globals()
    globs['sc'] = spark.sparkContext
    globs['spark'] = spark

    myobj = PUBLIC_LOAN_FNMA_spark(
        acqYYYYQQ=acqYYYYQQ,
        stageFolder=stageFolder,
        acquisition_file=f"{stageFolder}/Acquisition_{acqYYYYQQ}.txt",
        performance_file=f"{stageFolder}/Performance_{acqYYYYQQ}.txt")
    myobj.read_data_acquisition()
    myobj.read_data_performance()
    myobj.save_as_parquet(resultFolder=resultFolder)
    return myobj


# Script entry point: run the end-to-end smoke test, then preview the
# loan-level table (10 rows) and the performance table (100 rows).
if __name__ == "__main__":
    acqData = _test()
    acqData.Loan_Data.show(10)
    acqData.Performance_Data.show(100)
    # NOTE: the bare triple-quoted strings below are no-op expression
    # statements; they only park scratch code and design notes.
    '''
    per_df = spark.read.parquet("output/FNMA/LoanPerformance.parquet")
    wInc =  Window().partitionBy("LOAN_ID").orderBy(col("ACT_DTE"))
    postMod12 = per_df.filter(col("MOD_FLAG")=='Y').withColumn("rn", row_number().over(wInc)).where((col("rn") ==1))
    post3Q12 = per_df.filter(col("DLQ_STATUS")>2).withColumn("rn", row_number().over(wInc)).where((col("rn") ==1))
    postMod12.show(100)
    post3Q12.show(100)
 '''
    #wDesc = Window().partitionBy("LOAN_ID").orderBy(col("ACT_DTE").desc())
    #a = per_df.filter(col("MOD_FLAG")=='Y').withColumn("rn", row_number().over(w)).where((col("rn") <3))
    #per_df.createOrReplaceTempView("per_df_table")
    #a = spark.sql("select  * from per_df_table where DLQ_STATUS is  null")
    #a.show(100)
    #aa = spark.sql("select  * from per_df_table where LOAN_ID in ('254323574864', '254461287100') or DLQ_STATUS='X' ")
    #aa.show(300)
    '''
    1.Find the first 3D
    2.Find the first mod and type
    3.Find the next 12/24 months, MAX DLQ or ZB_CODE
    4.Given ZB_CODE is not null, what is the DLQ_STATUS at the previous month.
    '''
    #w = Window().partitionBy("LOAN_ID").orderBy(col("ACT_DTE").desc())
    #d = per_df.filter().withColumn("rn", row_number().over(w)).where((col("rn") == 1) | (col("rn") == 2)).select("LOAN_ID", "ACT_DTE", "rn")
    #d.show(100)

#w = Window().partitionBy("store_product_id").orderBy(col("time_create").desc())
#(df
#  .withColumn("rn", row_number().over(w))
#  .where(col("rn") == 1)
#  .select("store_product_id", "time_create", "state"))

    #per_df.createOrReplaceTempView("per_df_table")
    #a = spark.sql("select case when ORIG_CHN in ('R', 'C') then 'R1' when ORIG_CHN == 'B' then 'B1' else ORIG_CHN end as K, ORIG_CHN from acq_df_table where OCLTV is not null  ")
    #a = spark.sql("select  * from per_df_table where LPI_DTE is not null limit 100")
    #a.show()
#a = spark.sql("select * from per_df_table where ZB_CODE='09' limit 1000")
#a.show()
#a.printSchema()

In [0]:
# Sanity check: list performance rows whose LOAN_AGE is negative. The Spark
# performance reader keeps only LOAN_AGE >= 0, so an empty table is expected.
# NOTE(review): that reader also drops SCHD_UPB before returning; confirm this
# select still resolves on a fresh run.
acqData.Performance_Data.filter("LOAN_AGE < 0").select("LOAN_ID", "ACT_DTE","LOAN_AGE",  "LAST_UPB", "SCHD_UPB", "DLQ_STATUS").show(2000)
# Scratch check of round() to three decimals (the cell displays this value).
round(15.555533355, 3)

+-------+-------+--------+--------+--------+----------+
|LOAN_ID|ACT_DTE|LOAN_AGE|LAST_UPB|SCHD_UPB|DLQ_STATUS|
+-------+-------+--------+--------+--------+----------+
+-------+-------+--------+--------+--------+----------+



15.556

In [0]:
import pandas as pd
# Parquet file to inspect. The second assignment overwrites the first, so the
# LL_ file is the one that is read (presumably the first line is kept as a
# quick toggle - confirm).
pfile = "output/FNMA/LoanPerformance.parquet"
pfile = "output/FNMA/LL_LoanPerformance.parquet"
# Start timestamp; only consumed by the commented-out showtime() calls below.
ts = time.time()
data = pd.read_parquet(pfile)
#print(data)

#print(data.info())

#mdata = data[data.ZB_AGE  >0]

#mm = data.DLQ_LAGs12.to_numpy()
#print(mm)
#print(type(mm))
#print("transform: " , showtime(ts))
'''
ts = time.time()
per_df = spark.read.parquet(pfile)
per_df.printSchema()
#per_df.show(300)
print("transform: " , showtime(ts))
print(data.info())
my = per_df.select("ZB_DLQ_LAGs12").flat()

my.show(10)
'''


def convert_array_elements_as_dataframe(df, colname, requested_size):
    """
    Expand a column of variable-length sequences into fixed-width columns.

    Every sequence in ``df[colname]`` is right-aligned into
    ``requested_size`` slots: shorter sequences are left-padded with -1 and
    longer ones keep only their last ``requested_size`` elements. Missing
    entries are treated as empty sequences.

    Parameters
    ----------
    df : pandas DataFrame holding ``colname``; it is not modified
    colname : str, name of the column that holds the sequences
    requested_size : int, number of slots (output columns) per row

    Returns
    -------
    out : DataFrame with a fresh RangeIndex containing the columns of ``df``
          (missing ``colname`` entries replaced by ""), the original length
          of each sequence in ``<colname>_OrigSize``, and the slots as
          integer-named columns 0 .. requested_size - 1
    """
    # Work on a copy so the caller's frame is left untouched.
    out = df.copy()
    out[colname] = out[colname].fillna("")
    out[colname + "_OrigSize"] = out[colname].apply(len)

    def _fit(values):
        # Right-align `values` into exactly `requested_size` slots.
        values = list(values)
        shortfall = requested_size - len(values)
        if shortfall > 0:
            return [-1] * shortfall + values
        return values[len(values) - requested_size:]

    rows = out[colname].apply(_fit).tolist()
    if rows:
        matrix = np.vstack(rows)
    else:
        # np.vstack refuses an empty sequence; keep the expected width.
        matrix = np.empty((0, requested_size), dtype=int)

    # Attach the slots by position: rows of `matrix` are in the same order as
    # rows of `out`, so no key join (and no unique LOAN_ID) is needed.
    slots = pd.DataFrame(matrix)
    return pd.concat([out.reset_index(drop=True), slots], axis=1)
    

# Spread the F3Q_DLQ_LAGs12 history into 12 fixed columns ...
v = convert_array_elements_as_dataframe(data, "F3Q_DLQ_LAGs12", 12)

# ... and keep only the rows whose F3Q_AGE is below 12.
R = v.query("F3Q_AGE <12 ")
#data[ ((data["F3Q_AGE"] > 0) & (data["F3Q_AGE"] < 12)) ][["LOAN_ID", "F3Q_DLQ_LAGs12"]]
#mydata = data.loc[ ((data["F3Q_AGE"] > 0) & (data["F3Q_AGE"] < 12)) ,["LOAN_ID", "F3Q_DLQ_LAGs12"]]
#mydata["NumSize"] = mydata["F3Q_DLQ_LAGs12"].apply(lambda x: len(x))

#mydata["F3Q_DLQ_LAGs12"] = mydata["F3Q_DLQ_LAGs12"].apply(lambda x: (([-1]*(12-len(x))) + list(x)) if len(x) < 12 else x  )


#mydata[["F3Q_DLQ_LAGs12"]]



#dd = mydata[["F3Q_DLQ_LAGs12"]].to_numpy().flatten()
#dd = np.vstack(dd)
#print(dd.shape)
#m =[]
#for d in dd:
#   m.append(list(d))

#print(m.flatten())
#mm = np.concatenate(m)
#print("DDDDDDD")
#print(mm)
#print(mm.shape)

#a = mydata.to_numpy().apply(lambda x: pd.DataFrame({"A": x[0], "B": x[1]}), axis=1 ) #["LOAN_ID"], x["F3Q_DLQ_LAGs12"]))

#print(type(a))

#pa=pd.DataFrame({'a':[[1.,4.],  [2.],             [3.,4.,5.]]})
#Q = pa.to_numpy().flatten()
#print(Q)

R

In [0]:
!pip install numpy_financial

import pandas as pd
import numpy as np
import numpy_financial as npf
import time

# Acquisition parquet file loaded with pandas for the benchmarks in this cell.
pfile = "output/FNMA/LoanAcq.parquet"
ts = time.time()

#gapminder.assign(pop_in_millions=lambda x: x['pop']/1e6,
#                pop_in_billions=lambda x: x['pop_in_millions']/1e3).head()

# Restart the timer so that only the parquet read is measured.
ts = time.time()
data = pd.read_parquet(pfile)
print("read_parquet: " , showtime(ts))
# Peek at positions 1-9 of the ORIG_RT column.
print(data["ORIG_RT"][1:10])

#@timeit2
def test1(data):
    """
    Benchmark helper: build the scheduled-UPB matrix with numpy_financial.

    parameters
    ----------
    data: pandas DataFrame with the columns ORIG_RT, ORIG_TRM and ORIG_AMT
    Returns
    -------
    out : shape of the (loans x months) balance matrix
    """
    def compute_schd_upb(self, rates, num_months, principals, max_month = None):
        # `self` is an unused placeholder; the caller below passes 0 for it.
        print(f"{principals[0]} : {num_months[0]} : {rates[0]*1200}")
        print(f"{principals[1]} : {num_months[1]} : {rates[1]*1200}")

        loan_count = rates.count()
        # Horizon is the longest term unless the caller caps it explicitly.
        horizon = num_months.max() if max_month is None else max(1, max_month)

        balances = np.zeros((loan_count, horizon))
        balances[:, 0] = principals
        for period in range(1, horizon):
            previous = balances[:, period - 1]
            # ppmt returns the principal part as a negative cash flow.
            principal_paid = -npf.ppmt(rate=rates, per=period,
                                       nper=num_months, pv=principals)
            balances[:, period] = previous - principal_paid
        return balances

    monthly_rates = data["ORIG_RT"] / 1200
    schedule = compute_schd_upb(0,
                                rates=monthly_rates,
                                num_months=data["ORIG_TRM"],
                                principals=data["ORIG_AMT"])
    return schedule.shape

#@timeit2
def test2(data):
    """
    Benchmark helper: build the scheduled-UPB matrix from the closed-form
    level payment instead of numpy_financial.

    parameters
    ----------
    data: pandas DataFrame with the columns ORIG_RT, ORIG_TRM and ORIG_AMT
    Returns
    -------
    out : shape of the (loans x months) balance matrix
    """
    def compute_schd_upb(self, rates, num_months, principals, max_month = None):
        # `self` is an unused placeholder; the caller below passes 0 for it.
        loan_count = rates.count()
        # Horizon is the longest term unless the caller caps it explicitly.
        horizon = num_months.max() if max_month is None else max(1, max_month)

        balances = np.zeros((loan_count, horizon))
        balances[:, 0] = principals

        # Constant monthly payment of a fully amortizing loan.
        total_payments = principals * (rates / (1 - (1 + rates) ** (-num_months)))
        for period in range(1, horizon):
            previous = balances[:, period - 1]
            # Payment minus the interest accrued on the previous balance.
            principal_paid = total_payments - previous * rates
            balances[:, period] = previous - principal_paid
        return balances

    monthly_rates = data["ORIG_RT"] / 1200
    schedule = compute_schd_upb(0,
                                rates=monthly_rates,
                                num_months=data["ORIG_TRM"],
                                principals=data["ORIG_AMT"])
    return schedule.shape

def compute_amortization(principals, monthly_rates, terms,  start_period = 0, end_period = None):
    """
    Compute the scheduled unpaid balance (UPB) of level-payment loans.

    Parameters
    ----------
    principals : scalar or array_like of shape (M, )
        principals of the loans
    monthly_rates : scalar or array_like of shape (M, )
        monthly (not annual) rate as a fraction, constant over the life of
        each loan; a rate of 0 amortizes in equal instalments.
        Time-varying (ARM) rate paths are not implemented.
    terms : scalar or array_like of shape (M, )
        loan terms in months
    start_period : int
        loan age, in months, of the first output column
    end_period : int or None
        when given, at most max(1, end_period - start_period) columns
        are produced

    Scalars and arrays may be mixed; they are broadcast against each other.

    Returns
    -------
    out : ndarray (M, N)
        out[m, j] is the balance of loan m at age start_period + j.
        The balance is exactly 0 at the age equal to the term and -999
        for every age beyond the term.

    Raises
    ------
    ValueError if an input has more than one dimension.
    """
    principals = np.atleast_1d(np.asarray(principals, dtype=float))
    monthly_rates = np.atleast_1d(np.asarray(monthly_rates, dtype=float))
    terms = np.atleast_1d(np.asarray(terms, dtype=float))
    principals, monthly_rates, terms = np.broadcast_arrays(principals, monthly_rates, terms)
    if principals.ndim != 1:
        raise ValueError("principals, monthly_rates and terms must be scalars or 1-D")

    num_loans = principals.shape[0]
    if num_loans == 0:
        return np.zeros((0, 0))

    num_month = int(terms.max())
    mini_term = terms.min()
    if end_period is not None:
        num_month = min(num_month, max(1, (end_period - start_period)))

    # Level payment; the annuity formula is 0/0 at a zero rate, where the
    # payment is simply principal / term.
    with np.errstate(divide="ignore", invalid="ignore"):
        annuity = principals * (monthly_rates / (1 - (1 + monthly_rates) ** (-terms)))
        straight_line = principals / terms
    the_payment = np.where(monthly_rates == 0, straight_line, annuity)

    def _mask_matured(balances, age):
        # Before the shortest term no loan has matured, so nothing to mask.
        if age < mini_term:
            return balances
        # Within term: keep; exactly at term: 0; beyond term: -999 marker.
        return np.where(terms > age, balances,
                        np.where(terms < age, -999.0, 0.0))

    # Roll the balance forward to the first requested age.
    current_upb = principals
    for _ in range(start_period):
        current_upb = current_upb - (the_payment - current_upb * monthly_rates)

    upb_matrix = np.zeros((num_month, num_loans))
    upb_matrix[0, :] = _mask_matured(current_upb, start_period)

    ages = range(start_period + 1, start_period + num_month)
    for row, age in enumerate(ages, start=1):
        previous = upb_matrix[row - 1, :]
        # Principal repaid = payment minus interest on the previous balance.
        scheduled = previous - (the_payment - previous * monthly_rates)
        upb_matrix[row, :] = _mask_matured(scheduled, age)

    return upb_matrix.T


%timeit cal_amortization(principals    = data["ORIG_AMT"], \
                     monthly_rates = data["ORIG_RT"] / 1200, \
                     terms         = data["ORIG_TRM"])

for i, x in enumerate(r[1, :]):
  print(i, "  ", x)
#%timeit test2(data)
'''
 174         3,556.73     521.65     498.31      23.34       3,058.42
 175         3,058.42     521.65     501.58      20.07       2,556.84
 176         2,556.84     521.65     504.87      16.78       2,051.97
 177         2,051.97     521.65     508.18      13.47       1,543.79
 178         1,543.79     521.65     511.52      10.13       1,032.27
 179         1,032.27     521.65     514.88       6.77         517.39
 180           517.39     521.65     518.25       3.40          -0.86
       
  1         55,000.00     521.65     160.71     360.94      54,839.29
  2         54,839.29     521.65     161.77     359.88      54,677.52
  3         54,677.52     521.65     162.83     358.82      54,514.69
  4         54,514.69     521.65     163.90     357.75      54,350.79
  5      
'''

In [0]:
# Scratch check: a comparison mask times -999 gives -999 where the comparison
# holds and 0 elsewhere.
np.greater(4,[5,2]) * -999

array([   0, -999])

In [0]:
from collections.abc import Sequence 
# Scratch check of isinstance(..., np.ndarray) on a list of Series, on its
# np.array() conversion and on Series.values.
A =[pd.Series([1,2,3]),pd.Series([1,2,3])] 
K = isinstance(A, np.ndarray)

B = np.array(A)
K = isinstance(B, np.ndarray)
K = isinstance(pd.Series([1,2,3]).values, np.ndarray)
# K is overwritten each time, so only the last check is printed.
print(K)


True


**package: LoanPerformance**

1. Processing public loan performance (from FNMA and FRED)

2. Doing Amortization. (UPB)
 
 2.1 FRM

 2.2 ARM (flexible rate)
 https://files.consumerfinance.gov/f/201204_CFPB_ARMs-brochure.pdf

 * initial rate and payment
 * The adjustment period
 * The index
 * The margin
 * Interest-Rate Caps
 * Payment Caps

 * Type of ARMS
  * Hybrid
  * Interest-only
  * Payment-option
  * stepped
  



3. Rate incentive if IR is provided

4. MTMLTV if HP is provided

5. Predicting Prepayment, Default, DLQ12.




In [0]:
from __future__ import division, absolute_import, print_function
# Scratch computation of a level payment: 6% annual rate paid monthly,
# 240 months, principal 150000.
r = 0.06/12
n =240
s = 150000
# The denominator 1-(1+r)**n is negative, so p comes out negative.
p = r*s*(1+r)**(n) /(1-(1+r)**(n))
p


from decimal import *

def amortization_table(principal, rate, term):
    ''' Prints the amortization table for a loan.

    Prints the amortization table for a loan given
    the principal, the interest rate (as an APR in percent), and
    the term (in months). One row is printed per payment with the
    beginning balance, payment, principal part, interest part and
    ending balance. The interest of each period is rounded to cents;
    the payment itself is not rounded.'''

    payment = pmt(principal, rate, term)
    begBal = principal

    # Header row. The last piece ends the line so that the dashed rule
    # below starts on a line of its own.
    print('Pmt no'.rjust(6), ' ', 'Beg. bal.'.ljust(13), ' ', end='')
    print('Payment'.ljust(9), ' ', 'Principal'.ljust(9), ' ', end='')
    print('Interest'.ljust(9), ' ', 'End. bal.'.ljust(13))
    # Dashed rule under the header.
    print(''.rjust(6, '-'), ' ', ''.ljust(13, '-'), ' ', end='')
    print(''.rjust(9, '-'), ' ', ''.ljust(9, '-'), ' ', end='')
    print(''.rjust(9, '-'), ' ', ''.ljust(13, '-'), ' ')

    # One row per payment.
    for num in range(1, term + 1):
        interest = round(begBal * (rate / (12 * 100.0)), 2)
        applied = payment - interest
        endBal = begBal - applied

        print(f"{str(num).center(6)}  {begBal:13,.2f}  {payment:9,.2f}  "
              f"{applied:9,.2f}  {interest:9,.2f}  {endBal:13,.2f}")

        begBal = endBal
    
def pmt(principal, rate, term):
    '''Calculates the payment on a loan.

    Returns the (unrounded) periodic payment amount on a loan given
    the principal, the interest rate (as an APR in percent),
    and the term (in months). A rate of 0 yields equal
    instalments of principal / term.'''

    ratePerTwelve = rate / (12 * 100.0)

    # The annuity formula divides by zero at a zero rate.
    if ratePerTwelve == 0:
        return principal / term

    return principal * (ratePerTwelve / (1 - (1 + ratePerTwelve) ** (-term)))

# Demo: principal 55000, 7.875% APR, 180 monthly payments.
amortization_table(55000, 7.875, 180)
