### Resources Used

- https://docs.rapids.ai/api/cudf/stable/10min.html
- https://www.dataquest.io/blog/data-science-portfolio-machine-learning/
- https://docs.dask.org/en/latest/dataframe-best-practices.html
- https://docs.dask.org/en/latest/setup/single-distributed.html#localcluster
- https://distributed.dask.org/en/latest/memory.html
- Dataset: https://docs.rapids.ai/datasets/mortgage-data
- https://dask-cuda.readthedocs.io/en/latest/specializations.html
- https://docs.blazingdb.com/docs/using-blazingsql
- #!conda create -n rapids-0.17 -c rapidsai -c nvidia -c conda-forge -c defaults rapids-blazing=0.17 python=3.7 cudatoolkit=11.0 matplotlib=3.3.3 gcsfs=0.7.1


## Create conda enviornment with the following libraries
```
conda create -n rapids-0.17 -c rapidsai -c nvidia -c conda-forge -c defaults rapids-blazing=0.17 python=3.7 cudatoolkit=11.0 matplotlib=3.3.3 gcsfs=0.7.1
```


## Check Environment

In [None]:
%%bash
nvidia-smi
nvcc --version

In [None]:
import numpy as np; print('numpy Version:', np.__version__)
import pandas as pd; print('pandas Version:', pd.__version__)
import xgboost as xgb; print('XGBoost Version:', xgb.__version__)
import cudf; print('cudf Version:', cudf.__version__)
import cuml; print('cudf Version:', cuml.__version__)
import gcsfs; print('gcsfs Version:', gcsfs.__version__)
import time
import dask_cudf; print('dask_cudf Version:', gcsfs.__version__)
import dask; print('dask Version:', gcsfs.__version__)
import dask.dataframe as dask_df
import glob;
import matplotlib; print('matplotlib Version:', matplotlib.__version__)
from dask.diagnostics import ProgressBar
from dask.distributed import Client, progress, wait


## Set up Dask Cluster

In [None]:
import blazingsql
import dask_cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# num_workers=2
# processes=True
# threads_per_worker=4
# cluster = LocalCUDACluster(n_workers=num_workers, processes=processes, threads_per_worker=threads_per_worker)
cluster = LocalCUDACluster()
client = Client(cluster)
bc = blazingsql.BlazingContext(dask_client=client, network_interface='lo')
client

In [None]:
# client.restart()
# client

## Define Data Schema & Data Types

In [None]:
col_acq_names = ['LoanID','Channel','SellerName','OrInterestRate','OrUnpaidPrinc','OrLoanTerm',
        'OrDate','FirstPayment','OrLTV','OrCLTV','NumBorrow','DTIRat','CreditScore',
        'FTHomeBuyer','LoanPurpose','PropertyType','NumUnits','OccStatus','PropertyState',
        'Zip','MortInsPerc','ProductType','CoCreditScore','MortInsType','RelMortInd']

col_per_names = ['LoanID','MonthRep','Servicer','CurrInterestRate','CAUPB','LoanAge','MonthsToMaturity',
          'AdMonthsToMaturity','MaturityDate','MSA','CLDS','ModFlag','ZeroBalCode','ZeroBalDate',
          'LastInstallDate','ForeclosureDate','DispositionDate','PPRC','AssetRecCost','MHRC',
          'ATFHP','NetSaleProceeds','CreditEnhProceeds','RPMWP','OFP','NIBUPB','PFUPB','RMWPF',
          'FPWA','ServicingIndicator']

col_acq = ['LoanID','OrDate','Channel','SellerName','PropertyType','NumUnits','PropertyState']

col_per = ['LoanID','MonthRep','Servicer','ZeroBalCode','CLDS','ForeclosureDate']


parse_dates_acq =['OrDate','FirstPayment']
parse_dates_per =['MonthRep','MaturityDate','ZeroBalDate','LastInstallDate','ForeclosureDate','DispositionDate']

dtype_acq={ "LoanID":"int","Channel":"str","SellerName":"str","OrInterestRate":"float","OrUnpaidPrinc":"float","OrLoanTerm":"float","OrDate":"str",
   "FirstPayment":"str","OrLTV":"float","OrCLTV":"float",  "NumBorrow":"float", "DTIRat":"float", "CreditScore":"float", "FTHomeBuyer":"str",
   "LoanPurpose":"str", "PropertyType":"str", "NumUnits":"float", "OccStatus":"str",  "PropertyState":"str",  "Zip":"int", "MortInsPerc":"float",
   "ProductType":"str", "CoCreditScore":"float", "MortInsType":"float", "RelMortInd":"str"}

dtype_per={"LoanID":"int","MonthRep":"str","Servicer":"str", "CurrInterestRate":"float", "CAUPB":"float", "LoanAge":"float","MonthsToMaturity":"float",
   "AdMonthsToMaturity":"float", "MaturityDate":"str", "MSA":"float", "CLDS":"float", "ModFlag":"str", "ZeroBalCode":"float", "ZeroBalDate":"str",
    "LastInstallDate":"str",  "ForeclosureDate":"str", "DispositionDate":"str", "PPRC":"float", "AssetRecCost":"float", "MHRC":"float", "ATFHP":"float",
    "NetSaleProceeds":"float", "CreditEnhProceeds":"float","RPMWP":"float","OFP":"float","NIBUPB":"float", "PFUPB":"float", "RMWPF":"float",
   "FPWA":"str", "ServicingIndicator":"str"
}

## Test on small sample

In [None]:
# sample_acq_fnames='gs://mchrestkha-github-ml-examples/fannie_mae_loans/acq/Acquisition_2016Q1.txt'
# sample_perf_fnames='gs://mchrestkha-github-ml-examples/fannie_mae_loans/perf/Performance_2016Q1.txt'

# sample_df_acq = pd.read_csv(sample_acq_fnames, sep='|',  index_col=False, nrows=100)
# sample_df_per = pd.read_csv(sample_perf_fnames, sep='|', index_col=False, nrows=100)

# sample_df_acq = pd.read_csv(sample_acq_fnames, sep='|', names=col_acq_names, index_col=False, dtype=dtype_acq, parse_dates=parse_dates_acq, nrows=100)
# sample_df_per = pd.read_csv(sample_perf_fnames, sep='|', names=col_per_names, index_col=False, dtype=dtype_per, parse_dates=parse_dates_per, nrows=100)


In [None]:
#parquet_out='gs://mchrestkha-github-ml-examples/fannie_mae_loans/perf/parquet/'
#pdf_per.to_parquet(parquet_out,write_index=False)

## Run on full population

### Data Ingestion

In [None]:
csv_acq_fnames='gs://mchrestkha-github-ml-examples/fannie_mae_loans/acq/Acquisition_20*'
csv_perf_fnames='gs://mchrestkha-github-ml-examples/fannie_mae_loans/perf/Performance_20*'
csv_perf_fnames='gs://mchrestkha-github-ml-examples/fannie_mae_loans/perf/parquet/*'


# !gsutil du -sh 'gs://mchrestkha-github-ml-examples/fannie_mae_loans/acq/'
# !gsutil du -sh 'gs://mchrestkha-github-ml-examples/fannie_mae_loans/perf/'

In [None]:
%time df_acq = dask_cudf.read_csv(csv_acq_fnames, sep='|', names=col_acq_names, dtype=dtype_acq, columns=col_acq, parse_dates=parse_dates_acq)
%time df_per = dask_cudf.read_csv(csv_perf_fnames, sep='|', names=col_per_names, dtype=dtype_per, columns= col_per, parse_dates=parse_dates_per)
#%time df_per = dask_cudf.read_parquet(parquet_perf_fnames, sep='|', names=col_per, columns= col_per, dtype=dtype_per, parse_dates=parse_dates_per)

In [None]:
#print("Required Memory for df_acq:",df_acq.memory_usage().sum().compute()/(1024*1024*1024), 'GB')
#print("Required Memory for df_per:",df_per.memory_usage().sum().compute()/(1024*1024*1024), 'GB')

## Data Quality Check against Summary Statistics 
- Data Dictionary: https://loanperformancedata.fanniemae.com/lppub-docs/FNMA_SF_Loan_Performance_Glossary.pdf
- Sumary Statistics: https://loanperformancedata.fanniemae.com/lppub-docs/FNMA_SF_Loan_Performance_Stat_Summary_Primary.pdf
- Sample Data: https://docs.google.com/spreadsheets/d/1nCtusAE2naZlWHFKGRsQTxxusjfZYiBLdd5SF5AEGMA/edit



## Data Profiling & Exploration

#### ID
- LoanID

#### Dimensions
- MonthRep --> Monthly Reporting Period MMYYYY
- Channel ---> R= Retail, C=Correspondent, B=Broker 
- Servicer
- CLDS = Current Loan Delinquency Status 
- PropertyState
- ForeclosureDate
- CAUPB --> Current Actual Unpaid Balance

In [None]:
test=df_acq.persist()
progress(test)

In [None]:
df_ZeroBalCode = df_per.groupby('ZeroBalCode')['LoanID'].nunique()
df_ZeroBalCode = df_ZeroBalCode.compute()

In [None]:
df_CLDS = df_per.groupby('CLDS')['LoanID'].nunique().persist()
progress(df_CLDS)

In [None]:
df_ForeclosureDate = df_per.groupby('ForeclosureDate')['LoanID'].nunique().persist()
progress(df_ForeclosureDate)

In [None]:
print(df_ZeroBalCode.compute())
print(df_CLDS.compute())
print(df_CLDS.compute())

In [None]:
print(type(df_per))
print(df_per.npartitions)
#df_per=df_per.repartition(npartitions=10)
#print(df_per.npartitions)
print(df_per.shape)

In [None]:
df_per_summary = df_per.groupby('MonthRep',as_index=False).agg({'CAUPB': 'sum','LoanID': 'count'}).persist()
progress(df_per_summary)

In [None]:
df_per_summary=df_per_summary.compute()

In [None]:
df_per_summary

In [None]:
# test=df_per_summary.to_pandas().reset_index()
#test
#test['MonthRep']=pd.to_datetime(test['MonthRep'])
test.sort_values(by=['MonthRep'])
test.sort_values(by=['MonthRep']).plot.line(x='MonthRep', y='CAUPB')

In [None]:
df_per_latest=df_per.drop_duplicates(subset='LoanID', keep='last', inplace=False).persist()
progress(df_per_latest)



In [None]:
df_acq=df_acq.compute()
df_per_latest=df_per_latest.compute()
print(df_per_latest.shape)
print(df_acq.shape)

In [None]:
#joined=df_acq.merge(df_per_latest,on=['LoanID'],how='left')
joined['OrYr']=joined['OrDate'].str[-4:]
joined['CAUPB $M']=joined['CAUPB']/1000000
joined['OrUnpaidPrinc $M']=joined['OrUnpaidPrinc']/1000000
#joined['Active']=np.where(joined['MonthsToMaturity']>0,'Active','Not Active')
joined['Active']='Active'
joined['Active'].where(joined['MonthsToMaturity']>0,'Not Active')

In [None]:
joined[joined['Active']=='Active']

In [None]:
joined_summary = joined.groupby('OrYr').agg({'OrUnpaidPrinc $M': 'sum','LoanID': 'count'})
# df_per_latest_summary.compute().to_pandas().sort_values(by=['MonthRep'])

In [None]:
active loands by year-month

In [None]:
joined_summary.to_pandas().plot.line()

In [None]:
#del df_per_latest
del df_acq

In [None]:
joined[joined['OrYr']=='1999']

In [None]:
joined_summary[]

In [None]:
by year-month: # of defaults / active loands = default rates

## Appendix

In [None]:
rows=df_per.shape[0].persist() # start computation in the background
progress(rows)      # watch progress (takes ~3 min)

In [None]:
#rows=rows.compute()
cols=df_per.shape[1]
#print(rows,',', cols) 

In [None]:
df_memory=.persist()
progress(df_memory)