# Mortgage Risk

The example trains a model to perform home loan risk assessment using all of the loan data for the years 2000 to 2016 in the Fannie Mae loan performance dataset, consisting of roughly 400GB of data in memory.


<a id='contents'></a>
## Contents
__[Initialization](#initialize)__<br>
__[Pandas Dask DataFrame](#pandas_dataframe)__<br>
>__[Model Training (Multi-core)](#dask_pandas_model_training)__<br>

__[cuDF Dask DataFrame](#cudf_dataframe)__<br>
>__[Model Training (Multi-GPU)](#dask_cudf_model_training)__<br>


<a id='initialize'></a>
## Initialize

The mortgage dataset used for this demo: __[Fannie Mae Loan Dataset](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html)__


You can use these commands in a bash shell to download and decompress the 1TB dataset into the data folder.

In [None]:
# IPython shell escape: run nvidia-smi to list the GPUs on this machine.
!nvidia-smi
# Uncomment to print the CPU core count as well.
#!nproc

In [None]:
import numpy as np
import numba
import numba.cuda as cuda
import pandas as pd
import time
import xgboost as xgb
import cudf
import gc

#Dask 
import dask_xgboost as dxgb
import dask
import dask_cudf
from dask.delayed import delayed
from dask.distributed import Client, wait

In [None]:
# Range of years whose quarterly files are processed by dask_process_quarters.
START_YEAR = 2000
END_YEAR = 2002  # exclusive: processing stops when the year reaches END_YEAR

# Locations of the acquisition files, the performance files and the
# seller-name mapping file.
ACQ_DATA_PATH = "/rapids/data/mortgage/acq"
PERF_DATA_PATH = "/rapids/data/mortgage/perf"
COL_NAMES_PATH = "/rapids/data/mortgage/names.csv"

# Fraction of each frame's rows (taken from the top) used for training;
# the remaining rows are used for testing.
PERCENT_TRAIN = 0.8
# Number of boosting rounds passed to the XGBoost training call.
NUM_ROUNDS = 100

# Directory that contains the dask-setup.sh helper script.
DASK_UTILS_PATH = "/rapids/notebooks/utils"
# PART_COUNT is the row stride used to split every frame into PART_COUNT
# interleaved partitions before boosting (partition i takes rows i, i+PART_COUNT, ...).
# All rows are still processed; they are only spread over more, smaller chunks:
# 1: one partition holding every row (100% of the frame)
# 2: two partitions, each holding every 2nd row (50% of the frame)
# 3: three partitions, each holding every 3rd row (33% of the frame)
# etc
PART_COUNT = 2 


In [None]:
import subprocess
import numba.cuda as cuda
import multiprocessing

# Module-level state shared by dask_initialize() and dask_release():
# whether a cluster has been started, and the client connected to it.
_is_dask_initialized = False
_dask_client = None

def dask_initialize(nworkers=-1, ngpus=-1):
    """Start a Dask cluster with ``dask-setup.sh`` and return a connected client.

    The call is idempotent: once a cluster has been started, later calls
    return the cached client until ``dask_release()`` is called.

    Parameters
    ----------
    nworkers : int
        Number of workers; ``-1`` means one per CPU core. Ignored when
        ``ngpus`` ends up greater than zero (one worker per GPU is used).
    ngpus : int
        Number of GPUs; ``-1`` means every GPU numba can see, ``0`` means
        CPU-only workers.

    Returns
    -------
    dask.distributed.Client
        Client connected to the scheduler on port 8786 of this host.

    Raises
    ------
    subprocess.CalledProcessError
        If ``hostname`` or ``dask-setup.sh`` exits with a non-zero status.
    """
    global _is_dask_initialized
    global _dask_client

    if _is_dask_initialized:
        return _dask_client

    if nworkers == -1:
        nworkers = multiprocessing.cpu_count()

    if ngpus == -1:
        ngpus = len(cuda.gpus)

    # read in data files using 1 dask worker per gpu for multi-gpu processing
    if ngpus > 0:
        nworkers = ngpus

    # Assume all nodes have the same configuration; use the first address reported.
    host_addresses = subprocess.check_output(['hostname', '--all-ip-addresses'])
    ip_addr = host_addresses.decode('UTF-8').split()[0]

    # "0,0" is the same invocation dask_release() uses
    # (presumably it stops any cluster that is still running -- confirm in dask-setup.sh).
    bash_command_1 = "{0}/dask-setup.sh rapids 0,0".format(DASK_UTILS_PATH)
    bash_command_2 = "{0}/dask-setup.sh rapids {1},{2} 8786 8787 8790 {3} MASTER".format(DASK_UTILS_PATH, nworkers, ngpus, ip_addr)

    # check_output is kept (rather than check_call) so the script output stays captured.
    print(bash_command_1)
    subprocess.check_output(['bash', '-c', bash_command_1])
    print(bash_command_2)
    subprocess.check_output(['bash', '-c', bash_command_2])

    scheduler_address = ip_addr + ":8786"

    _dask_client = dask.distributed.Client(scheduler_address)
    _is_dask_initialized = True
    return _dask_client

def dask_release():
    """Disconnect the cached client and run the ``dask-setup.sh`` teardown call.

    After this call ``dask_initialize()`` will start a new cluster again.

    Raises
    ------
    subprocess.CalledProcessError
        If ``dask-setup.sh`` exits with a non-zero status.
    """
    global _is_dask_initialized
    global _dask_client

    try:
        # Close the connection explicitly instead of only dropping the
        # reference, so the client does not linger attached to a scheduler
        # that is about to be shut down.
        if _dask_client is not None:
            _dask_client.close()
    finally:
        # Always reset the cached state and run the teardown script, even if
        # closing the client failed.
        _is_dask_initialized = False
        _dask_client = None

        bash_command_1 = "{0}/dask-setup.sh rapids 0,0".format(DASK_UTILS_PATH)
        print(bash_command_1)
        subprocess.check_output(['bash', '-c', bash_command_1])

def dask_run_task(func, **kwargs):
    """Call ``func`` with the given keyword arguments and return its result.

    When ``func`` is a ``dask.delayed`` wrapper the result is the lazy task.
    """
    return func(**kwargs)


In [None]:
def rmm_initialize():
    """Enable the RMM pool allocator and initialize RMM through cuDF.

    Returns
    -------
    Whatever ``cudf._gdf.rmm_initialize()`` returns.
    """
    # The allocator mode is set on the config module before RMM is initialized.
    from librmm_cffi import librmm_config

    librmm_config.use_pool_allocator = True
    #librmm_config.initial_pool_size = 2<<30 # set to 2GiB. Default is 1/2 total GPU memory
    import cudf

    status = cudf._gdf.rmm_initialize()
    return status

def rmm_initialize_nopool():
    """Disable the RMM pool allocator and initialize RMM through cuDF.

    Returns
    -------
    Whatever ``cudf._gdf.rmm_initialize()`` returns.
    """
    # The allocator mode is set on the config module before RMM is initialized.
    from librmm_cffi import librmm_config

    librmm_config.use_pool_allocator = False
    #librmm_config.initial_pool_size = 2<<30 # set to 2GiB. Default is 1/2 total GPU memory
    import cudf

    status = cudf._gdf.rmm_initialize()
    return status

def rmm_release():
    """Finalize RMM through cuDF.

    Returns
    -------
    Whatever ``cudf._gdf.rmm_finalize()`` returns.
    """
    # Imported locally, like the rmm_initialize* helpers do.
    import cudf

    # The finalizer has to be *called*; returning the bare function object
    # (as before) released nothing.
    return cudf._gdf.rmm_finalize()

<a id='pandas_dataframe'></a>
## Pandas Dask Dataframe

### Data Preparation

In [None]:
from collections import OrderedDict

def load_performance_csv(performance_path, **kwargs):
    """ Loads performance data

    Reads a pipe-delimited file, skipping its first line, and replaces every
    column marked "category" in the schema with integer category codes
    (missing values become -1). Extra keyword arguments are accepted and ignored.

    Returns
    -------
    pandas DataFrame
    """

    # Column name -> intended dtype, in file order. Only the names and the
    # "category" markers are used here; the other dtypes are not applied.
    schema = OrderedDict([
        ("loan_id", "int64"),
        ("monthly_reporting_period", "date"),
        ("servicer", "category"),
        ("interest_rate", "float64"),
        ("current_actual_upb", "float64"),
        ("loan_age", "float64"),
        ("remaining_months_to_legal_maturity", "float64"),
        ("adj_remaining_months_to_maturity", "float64"),
        ("maturity_date", "date"),
        ("msa", "float64"),
        ("current_loan_delinquency_status", "int32"),
        ("mod_flag", "category"),
        ("zero_balance_code", "category"),
        ("zero_balance_effective_date", "date"),
        ("last_paid_installment_date", "date"),
        ("foreclosed_after", "date"),
        ("disposition_date", "date"),
        ("foreclosure_costs", "float64"),
        ("prop_preservation_and_repair_costs", "float64"),
        ("asset_recovery_costs", "float64"),
        ("misc_holding_expenses", "float64"),
        ("holding_taxes", "float64"),
        ("net_sale_proceeds", "float64"),
        ("credit_enhancement_proceeds", "float64"),
        ("repurchase_make_whole_proceeds", "float64"),
        ("other_foreclosure_proceeds", "float64"),
        ("non_interest_bearing_upb", "float64"),
        ("principal_forgiveness_upb", "float64"),
        ("repurchase_make_whole_proceeds_flag", "category"),
        ("foreclosure_principal_write_off_amount", "float64"),
        ("servicing_activity_indicator", "category")
    ])
    column_names = list(schema)

    frame = pd.read_csv(performance_path, names=column_names, delimiter='|', skiprows=1)

    # dtype is not passed to read_csv, so the categorical columns are
    # converted to integer codes afterwards.
    categorical_columns = [name for name, kind in schema.items() if kind == "category"]
    for name in categorical_columns:
        frame[name] = pd.Categorical(frame[name]).codes

    return frame

def load_acquisition_csv(acquisition_path, **kwargs):
    """ Loads acquisition data

    Reads a pipe-delimited file, skipping its first line, and replaces every
    column marked "category" in the schema with integer category codes
    (missing values become -1). Extra keyword arguments are accepted and ignored.

    Returns
    -------
    pandas DataFrame
    """

    # Column name -> intended dtype, in file order. Only the names and the
    # "category" markers are used here; the other dtypes are not applied.
    schema = OrderedDict([
        ("loan_id", "int64"),
        ("orig_channel", "category"),
        ("seller_name", "category"),
        ("orig_interest_rate", "float64"),
        ("orig_upb", "int64"),
        ("orig_loan_term", "int64"),
        ("orig_date", "date"),
        ("first_pay_date", "date"),
        ("orig_ltv", "float64"),
        ("orig_cltv", "float64"),
        ("num_borrowers", "float64"),
        ("dti", "float64"),
        ("borrower_credit_score", "float64"),
        ("first_home_buyer", "category"),
        ("loan_purpose", "category"),
        ("property_type", "category"),
        ("num_units", "int64"),
        ("occupancy_status", "category"),
        ("property_state", "category"),
        ("zip", "int64"),
        ("mortgage_insurance_percent", "float64"),
        ("product_type", "category"),
        ("coborrow_credit_score", "float64"),
        ("mortgage_insurance_type", "float64"),
        ("relocation_mortgage_indicator", "category")
    ])
    column_names = list(schema)

    frame = pd.read_csv(acquisition_path, names=column_names, delimiter='|', skiprows=1)

    # dtype is not passed to read_csv, so the categorical columns are
    # converted to integer codes afterwards.
    categorical_columns = [name for name, kind in schema.items() if kind == "category"]
    for name in categorical_columns:
        frame[name] = pd.Categorical(frame[name]).codes
    return frame

def load_names(col_names_path, **kwargs):
    """ Loads names used for renaming the banks

    Reads a pipe-delimited two-column file, skipping its first line, and
    replaces both columns with integer category codes (missing values
    become -1). Extra keyword arguments are accepted and ignored.

    Returns
    -------
    pandas DataFrame
    """

    # Column name -> intended dtype, in file order.
    schema = OrderedDict([
        ("seller_name", "category"),
        ("new", "category"),
    ])
    column_names = list(schema)

    frame = pd.read_csv(col_names_path, names=column_names, delimiter='|', skiprows=1)

    # dtype is not passed to read_csv, so the categorical columns are
    # converted to integer codes afterwards.
    categorical_columns = [name for name, kind in schema.items() if kind == "category"]
    for name in categorical_columns:
        frame[name] = pd.Categorical(frame[name]).codes

    return frame

ETL and Feature Engineering Functions

In [None]:
# Silence pandas' SettingWithCopyWarning: the ETL functions below add columns
# to frames obtained by column selection from another frame.
pd.options.mode.chained_assignment = None  # default='warn'

def null_workaround(df, **kwargs):
    """Replace missing values in numeric and categorical columns, in place.

    Categorical columns are replaced by their int32 category codes, in which
    a missing value is already encoded as -1. Integer and float columns have
    missing values filled with -1. Columns of any other dtype are untouched.
    Extra keyword arguments are accepted and ignored.

    Returns
    -------
    The same DataFrame object that was passed in.
    """
    numeric_dtypes = ('int8', 'int16', 'int32', 'int64', 'float32', 'float64')
    for column, data_type in df.dtypes.items():
        dtype_name = str(data_type)
        if dtype_name == "category":
            # A Series exposes its codes through the .cat accessor.
            df[column] = df[column].cat.codes.astype('int32')
        elif dtype_name in numeric_dtypes:
            # Assign the result back instead of calling fillna(inplace=True) on
            # df[column]: the chained in-place form does not modify df when
            # pandas Copy-on-Write is active.
            df[column] = df[column].fillna(-1)
    return df

def create_12_mon_features(joined_df, **kwargs):
    """Build the 12-month delinquency / UPB features, one frame per month offset.

    For every offset ``month`` in 1..12 the rows are bucketed by
    ``floor((timestamp_year * 12 + timestamp_month - 24000 - month) / 12)``
    (24000 equals 2000 * 12). Within each (loan_id, bucket) group the maximum
    ``delinquency_12`` and the minimum ``upb_12`` are taken, then:

    * ``delinquency_12`` = (max > 3) + (min upb == 0), as int32, so it can be 0, 1 or 2
    * ``upb_12`` = the group minimum
    * ``timestamp_year`` is derived back from the bucket number (int16)
    * ``timestamp_month`` = ``month`` (numpy int8 scalar)

    Returns
    -------
    pandas DataFrame
        The twelve per-offset frames concatenated, with columns loan_id,
        delinquency_12, upb_12, timestamp_year, timestamp_month.
        ``joined_df`` itself is not modified.
    """
    n_months = 12
    source_columns = ['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']
    monthly_frames = []

    for month in range(1, n_months + 1):
        window = joined_df[source_columns]
        window['temp_months'] = window['timestamp_year'] * 12 + window['timestamp_month']
        window['temp_mody_n'] = np.floor((window['temp_months'].astype('float64') - 24000 - month) / 12)

        grouped = window.groupby(['loan_id', 'temp_mody_n'])
        summary = grouped.agg({'delinquency_12': 'max', 'upb_12': 'min'}).reset_index()

        worst_status = summary['delinquency_12']
        lowest_upb = summary['upb_12']
        flag = (worst_status > 3).astype('int32') + (lowest_upb == 0).astype('int32')
        year = np.floor(((summary['temp_mody_n'] * n_months) + 24000 + (month - 1)) / 12).astype('int16')

        # Overwrite / append on the aggregated frame so the final column order
        # is loan_id, delinquency_12, upb_12, timestamp_year, timestamp_month.
        summary['delinquency_12'] = flag
        summary['upb_12'] = lowest_upb
        summary['timestamp_year'] = year
        summary['timestamp_month'] = np.int8(month)
        summary.drop('temp_mody_n', axis=1, inplace=True)

        monthly_frames.append(summary)

    return pd.concat(monthly_frames)

def create_ever_features(df, **kwargs):
    """Flag, per loan, whether its delinquency status ever reached 1, 3 or 6.

    Takes the maximum ``current_loan_delinquency_status`` per ``loan_id`` and
    derives int8 flags ``ever_30`` (max >= 1), ``ever_90`` (max >= 3) and
    ``ever_180`` (max >= 6).

    Returns
    -------
    pandas DataFrame
        One row per loan with columns loan_id, ever_30, ever_90, ever_180.
    """
    status_column = 'current_loan_delinquency_status'
    worst = df[['loan_id', status_column]].groupby('loan_id').max().reset_index()

    for flag_name, threshold in (('ever_30', 1), ('ever_90', 3), ('ever_180', 6)):
        worst[flag_name] = (worst[status_column] >= threshold).astype('int8')

    # Only the flags are kept; the per-loan maximum itself is dropped.
    worst.drop(status_column, axis=1, inplace=True, errors = 'ignore')
    return worst

def create_delinq_features(df, **kwargs):
    """Find, per loan, the smallest reporting period at each delinquency level.

    ``delinquency_30`` / ``delinquency_90`` / ``delinquency_180`` hold the
    minimum ``monthly_reporting_period`` among the loan's rows whose status is
    >= 1 / >= 3 / >= 6. Only loans with at least one status >= 1 appear in the
    result; a missing 90 or 180 value is filled with 1970-01-01.

    NOTE(review): the minimum is taken on the column as given -- if the period
    is still a string it is a lexicographic minimum, not a chronological one.

    Returns
    -------
    pandas DataFrame
        Columns loan_id, delinquency_30, delinquency_90, delinquency_180.
    """
    statuses = df[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]
    epoch = np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')

    def earliest_period(condition, feature_name):
        # Smallest reporting period per loan among the rows matching `condition`.
        matching = statuses.query(condition)[['loan_id', 'monthly_reporting_period']]
        earliest = matching.groupby('loan_id').min().reset_index()
        return earliest.rename(columns={'monthly_reporting_period': feature_name})

    delinq_30 = earliest_period('current_loan_delinquency_status >= 1', 'delinquency_30')
    delinq_90 = earliest_period('current_loan_delinquency_status >= 3', 'delinquency_90')
    delinq_180 = earliest_period('current_loan_delinquency_status >= 6', 'delinquency_180')

    merged = delinq_30.merge(delinq_90, how='left', on=['loan_id'])
    merged['delinquency_90'] = merged['delinquency_90'].fillna(epoch)
    merged = merged.merge(delinq_180, how='left', on=['loan_id'])
    merged['delinquency_180'] = merged['delinquency_180'].fillna(epoch)
    return merged

def join_ever_delinq_features(df, delinq_df, **kwargs):
    """Left-join the delinquency-date features onto the "ever" flags by loan_id.

    Loans without a match in ``delinq_df`` get 1970-01-01 in
    ``delinquency_30``, ``delinquency_90`` and ``delinquency_180``.

    Returns
    -------
    pandas DataFrame
        ``df`` with the three delinquency columns appended.
    """
    epoch = np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')
    combined = df.merge(delinq_df, on=['loan_id'], how='left')

    for column in ('delinquency_30', 'delinquency_90', 'delinquency_180'):
        combined[column] = combined[column].fillna(epoch)
    return combined

def create_joined_df(df, ever_df, **kwargs):
    """Build the per-row timestamp / delinquency / UPB frame joined with the loan flags.

    From ``df`` the function keeps ``loan_id``, parses
    ``monthly_reporting_period`` into ``timestamp`` (plus ``timestamp_month``
    and ``timestamp_year``), copies ``current_loan_delinquency_status`` to
    ``delinquency_12`` (missing -> -1) and ``current_actual_upb`` to
    ``upb_12`` (missing -> 999999999). The result is left-joined with
    ``ever_df`` on ``loan_id``; missing values in the ``ever_*`` and
    ``delinquency_30/90/180`` columns are filled with -1. ``df`` is not modified.

    Returns
    -------
    pandas DataFrame
        Columns loan_id, timestamp, timestamp_month, timestamp_year,
        delinquency_12, upb_12 followed by the columns of ``ever_df``;
        ``timestamp_year`` and ``timestamp_month`` are int32.
    """
    # infer_datetime_format is not passed: it is deprecated since pandas 2.0
    # and format inference is the default behaviour there.
    timestamp = pd.to_datetime(df['monthly_reporting_period'])

    # Build the frame directly (same column order as before) instead of adding
    # and dropping columns on a slice of df. fillna results are assigned
    # rather than applied with inplace=True on a selected column, which does
    # not update the frame under pandas Copy-on-Write.
    test = pd.DataFrame({
        'loan_id': df['loan_id'],
        'timestamp': timestamp,
        'timestamp_month': timestamp.dt.month,
        'timestamp_year': timestamp.dt.year,
        'delinquency_12': df['current_loan_delinquency_status'].fillna(-1),
        'upb_12': df['current_actual_upb'].fillna(999999999),
    })

    joined_df = test.merge(ever_df, how='left', on=['loan_id'])

    fill_columns = (
        'ever_30', 'ever_90', 'ever_180',
        'delinquency_30', 'delinquency_90', 'delinquency_180',
    )
    for column in fill_columns:
        joined_df[column] = joined_df[column].fillna(-1)

    joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int32')
    joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int32')

    return joined_df

def combine_joined_12_mon(joined_df, testdf, **kwargs):
    """Swap the raw delinquency_12 / upb_12 columns for the 12-month features.

    ``joined_df`` is modified in place: its ``delinquency_12`` and ``upb_12``
    columns are dropped and ``timestamp_year`` / ``timestamp_month`` are
    narrowed to int16 / int8 to match the key dtypes of ``testdf``.

    Returns
    -------
    pandas DataFrame
        Left join of ``joined_df`` with ``testdf`` on loan_id, timestamp_year
        and timestamp_month.
    """
    for replaced in ('delinquency_12', 'upb_12'):
        joined_df.drop(replaced, axis=1, inplace=True)

    for key, narrow_dtype in (('timestamp_year', 'int16'), ('timestamp_month', 'int8')):
        joined_df[key] = joined_df[key].astype(narrow_dtype)

    merge_keys = ['loan_id', 'timestamp_year', 'timestamp_month']
    return joined_df.merge(testdf, how='left', on=merge_keys)

def final_performance_delinquency(df, joined_df, **kwargs):
    """Attach the joined delinquency features to the performance rows.

    Both inputs are passed through ``null_workaround`` (which modifies them in
    place). The reporting period of ``df`` is parsed to derive temporary
    ``timestamp_year`` / ``timestamp_month`` keys, which are also added to
    ``df`` itself as a side effect. ``df`` is then left-joined with
    ``joined_df`` on loan_id, timestamp_year and timestamp_month, and the two
    key columns are removed from the result.

    Returns
    -------
    pandas DataFrame
        The merged frame without timestamp_year / timestamp_month.
    """
    merged_df = null_workaround(df)
    joined_df = null_workaround(joined_df)

    # Parse the dates once and reuse them for both keys. infer_datetime_format
    # is not passed: it is deprecated since pandas 2.0, where format inference
    # is the default behaviour.
    report_dates = pd.to_datetime(merged_df['monthly_reporting_period'])
    merged_df['timestamp_month'] = report_dates.dt.month.astype('int8')
    merged_df['timestamp_year'] = report_dates.dt.year.astype('int16')

    merged_df = merged_df.merge(joined_df, how='left', on=['loan_id', 'timestamp_year', 'timestamp_month'])
    merged_df.drop('timestamp_year', axis=1, inplace=True)
    merged_df.drop('timestamp_month', axis=1, inplace=True)
    return merged_df

def join_perf_acq_gdfs(perf_df, acq_df, **kwargs):
    """Left-join the acquisition columns onto the performance rows by loan_id.

    Both frames are first passed through ``null_workaround``, which modifies
    them in place.

    Returns
    -------
    pandas DataFrame
    """
    cleaned_perf = null_workaround(perf_df)
    cleaned_acq = null_workaround(acq_df)
    return cleaned_perf.merge(cleaned_acq, on=['loan_id'], how='left')

def last_mile_cleaning(df, **kwargs):
    """Drop non-feature columns and convert the frame to model-ready dtypes.

    The frame is modified in place: identifier, date and intermediate columns
    are dropped, categorical columns are replaced by their codes, every
    remaining column is cast to float32, and ``delinquency_12`` is turned into
    a 0/1 int32 label (1 when the value is > 0).

    Returns
    -------
    The same DataFrame object that was passed in.

    Raises
    ------
    KeyError
        If any column of the drop list is missing from ``df``.
    """
    drop_list = [
        'loan_id', 'orig_date', 'first_pay_date', 'seller_name',
        'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180',
        'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12',
        'zero_balance_effective_date','foreclosed_after', 'disposition_date','timestamp'
    ]
    df.drop(drop_list, axis=1, inplace=True)

    # Series.items() instead of iteritems(): iteritems was removed in
    # pandas 2.0, and null_workaround already iterates dtypes with items().
    for col, dtype in df.dtypes.items():
        if str(dtype) == 'category':
            df[col] = df[col].cat.codes
        df[col] = df[col].astype('float32')

    # Missing values compare as False, so they end up labelled 0.
    df['delinquency_12'] = df['delinquency_12'] > 0
    df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32')
    return df

In [None]:
import glob
from glob import glob
import os
import pandas as pd

def process_quarter(names_df, acq_df, perf_df):
    """Run the full ETL for one quarter and return the model-ready frame.

    Steps, in order: replace ``seller_name`` in the acquisition data with the
    ``new`` name from ``names_df``; derive the "ever" flags and delinquency
    dates from the performance data; build the 12-month features; join them
    back onto the performance rows; join the acquisition columns; and apply
    ``last_mile_cleaning``. ``perf_df`` is modified in place by the helpers.

    NOTE(review): when the frames come from load_acquisition_csv / load_names,
    ``seller_name`` holds category codes computed separately per file, so the
    merge on ``seller_name`` may not match by bank name -- verify. The column
    is dropped again in last_mile_cleaning.

    Returns
    -------
    pandas DataFrame
    """
    # Replace seller_name by the mapped name; the mapped column stays last.
    acq_df = acq_df.merge(names_df, how='left', on=['seller_name'])
    acq_df.drop('seller_name', axis=1, inplace=True)
    acq_df.rename(columns={'new': 'seller_name'}, inplace=True)

    # Per-loan flags and delinquency dates (flags are computed first).
    ever_df = join_ever_delinq_features(
        create_ever_features(perf_df),
        create_delinq_features(perf_df),
    )

    # Per-row features; the 12-month features are computed from the joined
    # frame before combine_joined_12_mon drops its raw columns.
    joined_df = create_joined_df(perf_df, ever_df)
    joined_df = combine_joined_12_mon(joined_df, create_12_mon_features(joined_df))

    perf_df = final_performance_delinquency(perf_df, joined_df)
    del(joined_df, ever_df)

    final_df = join_perf_acq_gdfs(perf_df, acq_df)
    del(perf_df, acq_df)

    return last_mile_cleaning(final_df)

def process_quarter_file(acquisition_file, perf_file):
    """Load one acquisition/performance file pair and run ``process_quarter`` on it.

    The seller-name mapping is read from ``COL_NAMES_PATH`` on every call.

    Returns
    -------
    pandas DataFrame
        The frame produced by ``process_quarter``.
    """
    print("Processing file: {0}".format(perf_file))

    # Load Data (same order as before: names, acquisition, performance)
    seller_names = load_names(COL_NAMES_PATH)
    acquisitions = load_acquisition_csv(acquisition_file)
    performance = load_performance_csv(perf_file)

    return process_quarter(seller_names, acquisitions, performance)


<a id='dask_pandas_model_training'></a>
### Model Training (Multi-core)
CPU XGBoost using the Dask *train* method

In [None]:
import glob
from glob import glob
import os

def dask_process_quarter(client, acquisition_file, perf_file):
    """Submit the ETL of one file pair to the Dask cluster.

    ``process_quarter_file`` is wrapped with ``delayed`` and handed to
    ``client.compute``; the value returned by that call is returned unchanged.
    """
    print("Processing file: {0}".format(perf_file))

    lazy_etl = delayed(process_quarter_file)
    task = dask_run_task(lazy_etl, acquisition_file=acquisition_file, perf_file=perf_file)

    return client.compute(task, optimize_graph=False, fifo_timeout="0ms")

# NOTE: The ETL calculates additional features which are then dropped before creating the XGBoost DMatrix.
# This can be optimized to avoid calculating the dropped features.
def dask_process_quarters(client, start_year, end_year):
    """Run the ETL for every quarter in [start_year, end_year) on the cluster.

    For each quarter the acquisition file is paired with every performance
    file matching ``Performance_<year>Q<quarter>*`` and submitted through
    ``dask_process_quarter``. Once all tasks have finished, the results are
    grouped by the value ``client.who_has`` reports for them (presumably the
    worker(s) holding each result -- confirm), and one lazy ``pd.concat`` is
    created per group.

    Parameters
    ----------
    client : dask.distributed.Client
    start_year : int
        First year to process.
    end_year : int
        Year at which to stop (exclusive). Nothing is submitted when
        ``start_year >= end_year``.

    Returns
    -------
    list
        One ``delayed(pd.concat)`` object per group.
    """
    frames = []
    year = start_year
    # "<" rather than "!=" so a start_year beyond end_year ends the loop
    # instead of counting upwards forever.
    while year < end_year:
        for quarter in range(1, 5):
            acquisition_file = ACQ_DATA_PATH + "/Acquisition_" + str(year) + "Q" + str(quarter) + ".txt"
            perf_pattern = PERF_DATA_PATH + "/Performance_" + str(year) + "Q" + str(quarter) + "*"
            for perf_file in glob(perf_pattern):
                frames.append(dask_process_quarter(client, acquisition_file, perf_file))
        year += 1

    wait(frames)

    print('Concatenating dataframes...')
    frames = [delayed(pd.DataFrame)(df) for df in frames]
    wait(frames)

    # Group the frames by the first value who_has reports for each of them,
    # keeping first-seen order of the groups.
    frames_by_holder = {}
    for frame in frames:
        holder = list(client.who_has(frame).values())[0]
        frames_by_holder.setdefault(holder, []).append(frame)

    return [delayed(pd.concat)(group) for group in frames_by_holder.values()]

In [None]:
%%time

# nworkers=-1 -> one worker per CPU core; ngpus=0 -> CPU-only cluster.
client = dask_initialize(-1,0)
# List of lazy per-group concatenations of the processed quarters.
df_train = dask_process_quarters(client, START_YEAR, END_YEAR)

#Optional
# NOTE(review): dfs_train is only defined in a later cell; at this point the
# list is called df_train.
#print("Frame Count: {0}".format(len(dfs_train)))

In [None]:
%%time
#filter the data by the desired columns
# Name of the label column.
PRED_VARS_Y ='delinquency_12'

# For every frame build a (labels, features) pair: the label column on its
# own, and every other column as the features.
dfs_xy = [(df[[PRED_VARS_Y]], df[delayed(list)(df.columns.difference([PRED_VARS_Y]))]) for df in df_train]
# Start computing both halves on the cluster and keep the results there.
dfs_xy = [(df[0].persist(), df[1].persist()) for df in dfs_xy]
gc.collect()
wait(dfs_xy)
#del (df_train) #Comment out for PART_COUNT benchmarking

In [None]:
%%time

#PART_COUNT=4 #Override the global part_count to increase DMatrices count which increases the CPU cores

#Add a rows_train calculation to the tuple
# Each entry becomes (labels, features, lazy row count of the features).
dfs_xyr = [(df[0], df[1], dask.delayed(len)(df[1])) for df in dfs_xy]
# The row count is computed here and scaled to the number of training rows.
dfs_xyr = [(df[0].persist(), df[1].persist(), int(df[2].compute()*PERCENT_TRAIN)) for df in dfs_xyr]
wait(dfs_xyr)
del (dfs_xy)

#Use PART_COUNT as a stride to partition the training rows:
#partition i takes rows i, i+PART_COUNT, ... below the training row count
dfs_train = []
for i in range(0,PART_COUNT):
    dfs_train += ([(df[0][i:df[2]:PART_COUNT], df[1][i:df[2]:PART_COUNT]) for df in dfs_xyr])
dfs_train = [(df[0].persist(), df[1].persist()) for df in dfs_train]

#Use PART_COUNT as a stride to partition the test rows:
#the same interleaving, applied to the rows from the training row count onwards
dfs_test = []
for i in range(0,PART_COUNT):
    dfs_test += ([(df[0][df[2]+i::PART_COUNT], df[1][df[2]+i::PART_COUNT]) for df in dfs_xyr])
dfs_test = [(df[0].persist(), df[1].persist()) for df in dfs_test]

wait(dfs_train)
wait(dfs_test)
del(dfs_xyr)

gc.collect()

#Optional: count the training rows per partition and in total
#rows_train = [dask.delayed(len)(df[1]) for df in dfs_train]
#rows_train = [df.compute() for df in rows_train]
#wait(rows_train)
#print(rows_train)
#rows_train_sum = sum(rows_train)
#print(rows_train_sum)

In [None]:
%%time
# One lazy DMatrix per training partition: features as data, labels as label.
dtrain = [dask.delayed(xgb.DMatrix)(df[1].values, label=df[0].values, nthread=-1) for df in dfs_train]
# Build the DMatrix objects on the cluster and keep them there.
dtrain = [dmatrix.persist() for dmatrix in dtrain]
wait(dtrain)

# The partitioned frames are no longer needed once the DMatrix objects exist.
del(dfs_train) 
gc.collect()

#Optional
print("DMatrix Count: {0}".format(len(dtrain)))
#print(dtrain)

In [None]:
%%time

#The number of processing tasks is determined by dask_initialize
NTHREADS_ALWAYS_1_IN_DASK=1

# Training parameters handed to dask-xgboost.
# NOTE(review): 'eta' and 'learning_rate' are both set to 0.1 (XGBoost documents
# them as aliases), and 'loss', 'max_features' and 'criterion' look like
# scikit-learn gradient-boosting options -- confirm XGBoost actually uses them.
dxgb_params = {
    'nround':            NUM_ROUNDS,
    'max_depth':         8,
    'max_leaves':        2**8,
    'alpha':             0.9,
    'eta':               0.1,
#    'gamma':             0.1, #breaks the dask multi-core training with these parameters
    'learning_rate':     0.1,
    'subsample':         1,
    'reg_lambda':        1,
    'scale_pos_weight':  2,
    'min_child_weight':  30,
    'tree_method':       'hist',
    'nthread':           NTHREADS_ALWAYS_1_IN_DASK,
    'distributed_dask':  True,
    'loss':              'ls',
    'objective':         'reg:linear',
    'max_features':      'auto',
    'criterion':         'friedman_mse',
    'grow_policy':       'lossguide',
    'verbose':           True
}

# Labels are already attached to the DMatrix objects, hence labels=None.
gbm = dxgb.train(client, dxgb_params,dtrain, labels=None,num_boost_round=NUM_ROUNDS)

### Evaluation
Accuracy is measured by the area under the ROC curve (AUC). What counts as an acceptable AUC is relative to the data and features rather than absolute. An area of 1 represents a perfect test; an area of 0.5 represents a poor or unreliable test. A suggested rough guide for classifying the accuracy of an XGBoost model (AUC expressed as a percentage):<br>
>90 - 100 = Excellent <br>
>80 - 90 = Good <br>
>70 - 80 = Fair <br>
>60 - 70 = Poor <br>
>50 - 60 = Fail <br>
>50 and Below = Unreliable

In [None]:
%%time
#make some room in memory
del dtrain

#create the testing datasets for prediction and auc
# Features of every test partition, wrapped in a DMatrix for gbm.predict.
dtest = [dask.dataframe.from_delayed(df[1]) for df in dfs_test]
dtest = [xgb.DMatrix(df.values, nthread=-1) for df in dtest]

# Labels of every test partition, computed into local memory.
y_test = [dask.dataframe.from_delayed(df[0]) for df in dfs_test]
y_test = [df.compute() for df in y_test]

# NOTE(review): y_test and dtest are already local objects at this point --
# confirm these waits are still needed.
wait(y_test)
wait(dtest)

del(dfs_test)


In [None]:
from sklearn import metrics
import xgboost as xgb

# Total number of test rows over all partitions, used to weight the scores.
y_row_count = float(sum([len(df) for df in y_test]))

weighted_auc = 0
weighted_roc = 0
print("Partitions:")
for i in range(0,len(dtest)):
    d = dtest[i]
    y = y_test[i]
    pred = gbm.predict(d)
    
    # Area under the ROC curve, computed two ways and scaled to a percentage:
    # from the explicit curve (auc) and with roc_auc_score (acc).
    fpr, tpr, thresholds = metrics.roc_curve(y, pred)
    auc = metrics.auc(fpr, tpr) * 100
    acc = metrics.roc_auc_score(y, pred) * 100

    # Weight each partition by its share of the test rows.
    scale = float(len(y))/y_row_count
    weighted_auc += auc * scale
    weighted_roc += acc * scale
    
    print("{0}: AUC: {1}, ROC AUC: {2}".format(i, auc, acc))

print("======================================================")
print("Weighted: ")
print("-- AUC: {0}, ROC AUC: {1}\n".format(weighted_auc, weighted_roc))


In [None]:
#cleanup for the next iteration

# Remove the training artifacts if they exist. dtrain has normally been
# deleted already by the evaluation cell, and a plain `del dtrain` here raised
# NameError, which skipped the rest of the cleanup.
for _name in ('dtrain', 'dtest', 'y_test', 'dxgb_params', 'gbm'):
    globals().pop(_name, None)

gc.collect()

In [None]:
# Tear the CPU cluster down before the next section starts.
dask_release()

<a id='cudf_dataframe'></a>
## cuDF Dask DataFrame

### Data Preparation

In [None]:
from collections import OrderedDict

def cudf_load_performance_csv(performance_path, **kwargs):
    """ Loads performance data

    Reads a pipe-delimited file with cuDF, skipping its first line and
    passing the schema's dtypes to the reader. Extra keyword arguments are
    accepted and ignored.

    Returns
    -------
    GPU DataFrame
    """

    # Column name -> dtype, in file order.
    #using categoricals to replace the string with a hashed int32
    schema = OrderedDict([
        ("loan_id", "int64"),
        ("monthly_reporting_period", "date"),
        ("servicer", "category"),
        ("interest_rate", "float64"),
        ("current_actual_upb", "float64"),
        ("loan_age", "float64"),
        ("remaining_months_to_legal_maturity", "float64"),
        ("adj_remaining_months_to_maturity", "float64"),
        ("maturity_date", "date"),
        ("msa", "float64"),
        ("current_loan_delinquency_status", "int32"),
        ("mod_flag", "category"),
        ("zero_balance_code", "category"),
        ("zero_balance_effective_date", "date"),
        ("last_paid_installment_date", "date"),
        ("foreclosed_after", "date"),
        ("disposition_date", "date"),
        ("foreclosure_costs", "float64"),
        ("prop_preservation_and_repair_costs", "float64"),
        ("asset_recovery_costs", "float64"),
        ("misc_holding_expenses", "float64"),
        ("holding_taxes", "float64"),
        ("net_sale_proceeds", "float64"),
        ("credit_enhancement_proceeds", "float64"),
        ("repurchase_make_whole_proceeds", "float64"),
        ("other_foreclosure_proceeds", "float64"),
        ("non_interest_bearing_upb", "float64"),
        ("principal_forgiveness_upb", "float64"),
        ("repurchase_make_whole_proceeds_flag", "category"),
        ("foreclosure_principal_write_off_amount", "float64"),
        ("servicing_activity_indicator", "category")
    ])
    column_names = list(schema.keys())
    column_dtypes = list(schema.values())

    return cudf.read_csv(performance_path, names=column_names, delimiter='|', dtype=column_dtypes, skiprows=1)

def cudf_load_acquisition_csv(acquisition_path, **kwargs):
    """ Loads acquisition data

    Reads a pipe-delimited file with cuDF, skipping its first line and
    passing the schema's dtypes to the reader. Extra keyword arguments are
    accepted and ignored.

    Returns
    -------
    GPU DataFrame
    """

    # Column name -> dtype, in file order.
    #using categoricals to replace the string with a hashed int32
    schema = OrderedDict([
        ("loan_id", "int64"),
        ("orig_channel", "category"),
        ("seller_name", "category"),
        ("orig_interest_rate", "float64"),
        ("orig_upb", "int64"),
        ("orig_loan_term", "int64"),
        ("orig_date", "date"),
        ("first_pay_date", "date"),
        ("orig_ltv", "float64"),
        ("orig_cltv", "float64"),
        ("num_borrowers", "float64"),
        ("dti", "float64"),
        ("borrower_credit_score", "float64"),
        ("first_home_buyer", "category"),
        ("loan_purpose", "category"),
        ("property_type", "category"),
        ("num_units", "int64"),
        ("occupancy_status", "category"),
        ("property_state", "category"),
        ("zip", "int64"),
        ("mortgage_insurance_percent", "float64"),
        ("product_type", "category"),
        ("coborrow_credit_score", "float64"),
        ("mortgage_insurance_type", "float64"),
        ("relocation_mortgage_indicator", "category")
    ])
    column_names = list(schema.keys())
    column_dtypes = list(schema.values())

    return cudf.read_csv(acquisition_path, names=column_names, delimiter='|', dtype=column_dtypes, skiprows=1)

def cudf_load_names(col_names_path, **kwargs):
    """ Loads names used for renaming the banks

    Reads a pipe-delimited two-column file with cuDF, skipping its first
    line. Extra keyword arguments are accepted and ignored.

    Returns
    -------
    GPU DataFrame
    """

    # Column name -> dtype, in file order.
    #using categoricals to replace the string with a hashed int32
    schema = OrderedDict([
        ("seller_name", "category"),
        ("new", "category"),
    ])
    column_names = list(schema.keys())
    column_dtypes = list(schema.values())

    return cudf.read_csv(col_names_path, names=column_names, delimiter='|', dtype=column_dtypes, skiprows=1)
    

ETL and Feature Engineering Functions

In [None]:

def cudf_null_workaround(df, **kwargs):
    """Fill missing values with -1 in categorical and numeric columns, in place.

    Categorical columns are cast to int32 before filling; integer and float
    columns are filled as they are. Columns of any other dtype are untouched.
    Extra keyword arguments are accepted and ignored.

    Returns
    -------
    The same DataFrame object that was passed in.
    """
    numeric_dtypes = ('int8', 'int16', 'int32', 'int64', 'float32', 'float64')
    for column, data_type in df.dtypes.items():
        dtype_name = str(data_type)
        if dtype_name == "category":
            df[column] = df[column].astype('int32').fillna(-1)
        elif dtype_name in numeric_dtypes:
            df[column] = df[column].fillna(-1)
    return df

def cudf_create_12_mon_features(joined_df, **kwargs):
    """Aggregate delinquency and UPB over 12-month windows, once per offset.

    For every offset ``1..12`` the rows of ``joined_df`` are bucketed per
    loan by ``floor((year * 12 + month - 24000 - offset) / 12)``. Within a
    bucket the maximum ``delinquency_12`` and minimum ``upb_12`` are taken,
    then:

    * ``delinquency_12`` = ``(max delinquency > 3) + (min upb == 0)`` (int32)
    * ``upb_12``         = the minimum upb of the bucket
    * ``timestamp_year`` = ``floor((bucket * 12 + 24000 + offset - 1) / 12)``
      (int16)
    * ``timestamp_month`` = the offset (int8)

    Parameters
    ----------
    joined_df : GPU DataFrame providing ``loan_id``, ``timestamp_year``,
        ``timestamp_month``, ``delinquency_12`` and ``upb_12``
    **kwargs : accepted but not used

    Returns
    -------
    GPU DataFrame: the concatenation of the 12 per-offset frames
    """
    months_per_year = 12
    source_cols = ['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']
    windows = []

    for offset in range(1, months_per_year + 1):
        window = joined_df[source_cols]

        # Absolute month index, then the 12-month bucket it falls in for
        # this offset. NOTE(review): 24000 looks like 2000 * 12 - confirm.
        window['temp_months'] = window['timestamp_year'] * 12 + window['timestamp_month']
        shifted_months = window['temp_months'].astype('float64') - 24000 - offset
        window['temp_mody_n'] = (shifted_months / 12).floor()

        window = window.groupby(['loan_id', 'temp_mody_n'], method='hash').agg(
            {'delinquency_12': 'max', 'upb_12': 'min'}
        )

        # The aggregates are read back under the max_/min_ prefixed names.
        window['delinquency_12'] = (window['max_delinquency_12'] > 3).astype('int32')
        window['delinquency_12'] += (window['min_upb_12'] == 0).astype('int32')
        window.drop_column('max_delinquency_12')
        window['upb_12'] = window['min_upb_12']
        window.drop_column('min_upb_12')

        bucket_start = (window['temp_mody_n'] * months_per_year) + 24000 + (offset - 1)
        window['timestamp_year'] = (bucket_start / 12).floor().astype('int16')
        window['timestamp_month'] = np.int8(offset)
        window.drop_column('temp_mody_n')

        windows.append(window)
        del window

    del joined_df
    return cudf.concat(windows)

def cudf_create_ever_features(df, **kwargs):
    """Flag, per loan, whether its delinquency status ever reached 1, 3 or 6.

    Groups by ``loan_id``, takes the maximum
    ``current_loan_delinquency_status`` and derives three int8 flags from
    it: ``ever_30`` (max >= 1), ``ever_90`` (max >= 3) and ``ever_180``
    (max >= 6). The helper maximum column is dropped before returning.

    Parameters
    ----------
    df : GPU DataFrame providing ``loan_id`` and
        ``current_loan_delinquency_status``
    **kwargs : accepted but not used

    Returns
    -------
    GPU DataFrame with one row per loan and the three ``ever_*`` flags
    """
    status_cols = ['loan_id', 'current_loan_delinquency_status']
    worst_by_loan = df[status_cols].groupby('loan_id', method='hash').max()
    del df

    # Name under which the grouped maximum is read back.
    worst_col = 'max_current_loan_delinquency_status'
    for flag, threshold in (('ever_30', 1), ('ever_90', 3), ('ever_180', 6)):
        worst_by_loan[flag] = (worst_by_loan[worst_col] >= threshold).astype('int8')
    worst_by_loan.drop_column(worst_col)
    return worst_by_loan

def cudf_create_delinq_features(df, **kwargs):
    """Find, per loan, the first reporting period at each delinquency level.

    For the status thresholds 1, 3 and 6 the rows at or above the threshold
    are grouped by ``loan_id`` and the minimum ``monthly_reporting_period``
    is stored as ``delinquency_30``, ``delinquency_90`` and
    ``delinquency_180`` respectively. The 90 and 180 frames are left-joined
    onto the 30 frame; loans with no match get 1970-01-01 in those columns.

    Parameters
    ----------
    df : GPU DataFrame providing ``loan_id``, ``monthly_reporting_period``
        and ``current_loan_delinquency_status``
    **kwargs : accepted but not used

    Returns
    -------
    GPU DataFrame keyed by ``loan_id`` with the three ``delinquency_*``
    columns
    """
    status_df = df[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]
    del df

    first_seen = {}
    for out_col, threshold in (('delinquency_30', 1), ('delinquency_90', 3), ('delinquency_180', 6)):
        hits = status_df.query('current_loan_delinquency_status >= %d' % threshold)
        earliest = hits[['loan_id', 'monthly_reporting_period']].groupby('loan_id', method='hash').min()
        earliest[out_col] = earliest['min_monthly_reporting_period']
        earliest.drop_column('min_monthly_reporting_period')
        first_seen[out_col] = earliest
    del status_df, hits, earliest

    # Placeholder written where a loan never reached the threshold.
    epoch = np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')

    merged = first_seen['delinquency_30']
    for out_col in ('delinquency_90', 'delinquency_180'):
        merged = merged.merge(first_seen[out_col], how='left', on=['loan_id'], type='hash')
        merged[out_col] = merged[out_col].fillna(epoch)
    del first_seen
    return merged

def cudf_join_ever_delinq_features(df, delinq_df, **kwargs):
    """Left-join the first-delinquency dates onto the per-loan ``ever`` flags.

    After the join on ``loan_id``, nulls in ``delinquency_30``,
    ``delinquency_90`` and ``delinquency_180`` are replaced with 1970-01-01.

    Parameters
    ----------
    df : GPU DataFrame keyed by ``loan_id`` (left side of the join)
    delinq_df : GPU DataFrame keyed by ``loan_id`` carrying the three
        ``delinquency_*`` columns
    **kwargs : accepted but not used

    Returns
    -------
    GPU DataFrame: the joined frame
    """
    combined = df.merge(delinq_df, on=['loan_id'], how='left', type='hash')
    del df, delinq_df

    epoch = np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]')
    for date_col in ('delinquency_30', 'delinquency_90', 'delinquency_180'):
        combined[date_col] = combined[date_col].fillna(epoch)
    return combined

def cudf_create_joined_df(df, ever_df, **kwargs):
    """Build the per-month frame and attach the per-loan features to it.

    From ``df`` only ``loan_id``, ``monthly_reporting_period``,
    ``current_loan_delinquency_status`` and ``current_actual_upb`` are
    kept. They are renamed to ``timestamp``, ``delinquency_12`` and
    ``upb_12``, and ``timestamp_month`` / ``timestamp_year`` are derived
    from ``timestamp``. Null ``upb_12`` becomes 999999999 and null
    ``delinquency_12`` becomes -1. ``ever_df`` is then left-joined on
    ``loan_id`` and nulls in its ``ever_*`` / ``delinquency_*`` columns are
    filled with -1.

    Parameters
    ----------
    df : GPU DataFrame with the performance columns listed above
    ever_df : GPU DataFrame keyed by ``loan_id`` carrying ``ever_30``,
        ``ever_90``, ``ever_180``, ``delinquency_30``, ``delinquency_90``
        and ``delinquency_180``
    **kwargs : accepted but not used

    Returns
    -------
    GPU DataFrame: the joined frame, with ``timestamp_year`` and
    ``timestamp_month`` cast to int32
    """
    monthly = df[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status', 'current_actual_upb']]
    del df

    monthly['timestamp'] = monthly['monthly_reporting_period']
    monthly.drop_column('monthly_reporting_period')
    monthly['timestamp_month'] = monthly['timestamp'].dt.month
    monthly['timestamp_year'] = monthly['timestamp'].dt.year

    # Rename by copy-then-drop, in the same order as the output columns.
    renames = (
        ('delinquency_12', 'current_loan_delinquency_status'),
        ('upb_12', 'current_actual_upb'),
    )
    for new_name, old_name in renames:
        monthly[new_name] = monthly[old_name]
        monthly.drop_column(old_name)

    monthly['upb_12'] = monthly['upb_12'].fillna(999999999)
    monthly['delinquency_12'] = monthly['delinquency_12'].fillna(-1)

    joined = monthly.merge(ever_df, how='left', on=['loan_id'], type='hash')
    del ever_df, monthly

    loan_level_cols = (
        'ever_30', 'ever_90', 'ever_180',
        'delinquency_30', 'delinquency_90', 'delinquency_180',
    )
    for col in loan_level_cols:
        joined[col] = joined[col].fillna(-1)

    for col in ('timestamp_year', 'timestamp_month'):
        joined[col] = joined[col].astype('int32')

    return joined

def cudf_combine_joined_12_mon(joined_df, testdf, **kwargs):
    """Swap the per-month delinquency/UPB columns for the 12-month ones.

    Drops ``delinquency_12`` and ``upb_12`` from ``joined_df`` (in place),
    narrows ``timestamp_year`` to int16 and ``timestamp_month`` to int8,
    and left-joins ``testdf`` on ``loan_id``, ``timestamp_year`` and
    ``timestamp_month``.

    Parameters
    ----------
    joined_df : GPU DataFrame, modified in place before the join
    testdf : GPU DataFrame providing the join keys
    **kwargs : accepted but not used

    Returns
    -------
    GPU DataFrame: the result of the left join
    """
    for stale_col in ('delinquency_12', 'upb_12'):
        joined_df.drop_column(stale_col)

    for key_col, narrow_type in (('timestamp_year', 'int16'), ('timestamp_month', 'int8')):
        joined_df[key_col] = joined_df[key_col].astype(narrow_type)

    join_keys = ['loan_id', 'timestamp_year', 'timestamp_month']
    return joined_df.merge(testdf, how='left', on=join_keys, type='hash')

def cudf_final_performance_delinquency(df, joined_df, **kwargs):
    """Left-join the engineered features onto the performance rows.

    Both inputs first go through ``cudf_null_workaround``. Temporary
    ``timestamp_month`` (int8) and ``timestamp_year`` (int16) keys are
    derived from ``monthly_reporting_period`` and, together with
    ``loan_id``, used for the join. The two temporary keys are dropped from
    the result.

    Parameters
    ----------
    df : GPU DataFrame with ``loan_id`` and ``monthly_reporting_period``
    joined_df : GPU DataFrame providing ``loan_id``, ``timestamp_year``
        and ``timestamp_month``
    **kwargs : accepted but not used

    Returns
    -------
    GPU DataFrame: the joined frame without the temporary key columns
    """
    perf = cudf_null_workaround(df)
    features = cudf_null_workaround(joined_df)

    perf['timestamp_month'] = perf['monthly_reporting_period'].dt.month.astype('int8')
    perf['timestamp_year'] = perf['monthly_reporting_period'].dt.year.astype('int16')

    join_keys = ['loan_id', 'timestamp_year', 'timestamp_month']
    perf = perf.merge(features, how='left', on=join_keys, type='hash')

    for temp_key in ('timestamp_year', 'timestamp_month'):
        perf.drop_column(temp_key)
    return perf

def cudf_join_perf_acq_gdfs(perf_df, acq_df, **kwargs):
    """Left-join the acquisition columns onto the performance rows.

    Both frames are passed through ``cudf_null_workaround`` (performance
    first) and then joined on ``loan_id``.

    Parameters
    ----------
    perf_df : GPU DataFrame, left side of the join
    acq_df : GPU DataFrame, right side of the join
    **kwargs : accepted but not used

    Returns
    -------
    GPU DataFrame: the joined frame
    """
    performance = cudf_null_workaround(perf_df)
    acquisition = cudf_null_workaround(acq_df)
    joined = performance.merge(acquisition, how='left', on=['loan_id'], type='hash')
    return joined

def cudf_last_mile_cleaning(df, **kwargs):
    """Reduce the joined frame to numeric model inputs plus a 0/1 label.

    Steps, all applied to ``df`` in place:

    1. Drop the identifier, date and intermediate feature columns listed
       in ``drop_list``.
    2. Replace every categorical column with its category codes.
    3. Cast every remaining column to ``float32``.
    4. Turn ``delinquency_12`` into an int32 label: 1 where the value is
       greater than 0, otherwise 0 (nulls count as 0).

    Parameters
    ----------
    df : GPU DataFrame containing every column named in ``drop_list`` and
        a ``delinquency_12`` column
    **kwargs : accepted but not used

    Returns
    -------
    The same ``df`` object that was passed in
    """
    drop_list = [
        'loan_id', 'orig_date', 'first_pay_date', 'seller_name',
        'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180',
        'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12',
        'zero_balance_effective_date','foreclosed_after', 'disposition_date','timestamp'
    ]
    for column in drop_list:
        df.drop_column(column)

    # .items() rather than .iteritems(): the latter was removed from pandas
    # Series in pandas 2.0, and cudf_null_workaround already iterates
    # df.dtypes with .items().
    for col, dtype in df.dtypes.items():
        if str(dtype) == 'category':
            df[col] = df[col].cat.codes
        df[col] = df[col].astype('float32')

    df['delinquency_12'] = df['delinquency_12'] > 0
    df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32')
    return df

In [None]:
def cudf_process_quarter(names_df, acq_df, perf_df):
    """Run the full ETL for one quarter and return the model-ready frame.

    The acquisition frame gets its ``seller_name`` replaced by the ``new``
    value from ``names_df``. The performance frame is then turned into
    per-loan and 12-month delinquency features, joined back onto the
    performance rows, joined with the acquisition columns and finally
    cleaned by ``cudf_last_mile_cleaning``.

    Parameters
    ----------
    names_df : GPU DataFrame with ``seller_name`` and ``new`` columns
    acq_df : GPU DataFrame of acquisition records
    perf_df : GPU DataFrame of performance records

    Returns
    -------
    GPU DataFrame produced by ``cudf_last_mile_cleaning``
    """
    # Replace seller_name with its mapped value from names_df.
    acq_df = acq_df.merge(names_df, how='left', on=['seller_name'])
    acq_df.drop_column('seller_name')
    acq_df['seller_name'] = acq_df['new']
    acq_df.drop_column('new')

    # Per-loan features: "ever delinquent" flags plus first-delinquency dates.
    loan_features = cudf_create_ever_features(perf_df)
    first_delinq = cudf_create_delinq_features(perf_df)
    loan_features = cudf_join_ever_delinq_features(loan_features, first_delinq)
    del first_delinq

    # Per-month frame, then its 12-month aggregates folded back in.
    monthly = cudf_create_joined_df(perf_df, loan_features)
    windowed = cudf_create_12_mon_features(monthly)
    monthly = cudf_combine_joined_12_mon(monthly, windowed)
    del windowed

    enriched_perf = cudf_final_performance_delinquency(perf_df, monthly)
    del perf_df, monthly

    combined = cudf_join_perf_acq_gdfs(enriched_perf, acq_df)
    del enriched_perf
    del acq_df

    return cudf_last_mile_cleaning(combined)

def cudf_process_quarter_file(acquisition_file, perf_file):
    """Load one quarter's files, run the ETL and return a pandas DataFrame.

    The seller-name mapping is read from ``COL_NAMES_PATH``; the
    acquisition and performance files are read with their respective
    loaders. The result of ``cudf_process_quarter`` is converted with
    ``to_pandas()`` before being returned.

    Parameters
    ----------
    acquisition_file : path of the acquisition file for the quarter
    perf_file : path of one performance file for the quarter

    Returns
    -------
    pandas DataFrame
    """
    names = cudf_load_names(COL_NAMES_PATH)
    acquisitions = cudf_load_acquisition_csv(acquisition_file)
    performance = cudf_load_performance_csv(perf_file)

    features = cudf_process_quarter(names, acquisitions, performance)
    del names, acquisitions, performance

    return features.to_pandas()


<a id='dask_cudf_model_training'></a>
### Model Training (Multi-GPU)
GPU accelerated XGBoost using the Dask *train* method

In [None]:
import glob
from glob import glob
import os

def dask_cudf_process_quarter(client, acquisition_file, perf_file):
    """Submit the ETL of one acquisition/performance file pair to Dask.

    Prints the performance file name, wraps ``cudf_process_quarter_file``
    with ``delayed``, hands it to ``dask_run_task`` with both paths as
    keyword arguments and submits the result via ``client.compute``.

    Parameters
    ----------
    client : Dask distributed client used to submit the task
    acquisition_file : path of the acquisition file
    perf_file : path of the performance file

    Returns
    -------
    Whatever ``client.compute`` returns for the submitted task
    """
    print("Processing file: {0}".format(perf_file))

    etl_step = delayed(cudf_process_quarter_file)
    task = dask_run_task(
        etl_step,
        acquisition_file=acquisition_file,
        perf_file=perf_file,
    )
    return client.compute(task, optimize_graph=False, fifo_timeout="0ms")

# NOTE: The ETL calculates additional features which are then dropped before creating the XGBoost DMatrix.
# This can be optimized to avoid calculating the dropped features.
def dask_cudf_process_quarters(client, start_year, end_year):
    """Run the ETL for every quarter in ``[start_year, end_year)`` on Dask.

    For each year and each quarter 1-4 the acquisition file name is built
    from ``ACQ_DATA_PATH`` and every file matching
    ``PERF_DATA_PATH/Performance_<year>Q<quarter>*`` is submitted through
    ``dask_cudf_process_quarter``. Once all submissions have finished, the
    results are grouped by the worker reported by ``client.who_has`` and
    one delayed ``pd.concat`` is created per worker.

    Parameters
    ----------
    client : Dask distributed client
    start_year : int, first year to process
    end_year : int, exclusive upper bound; nothing is processed when
        ``start_year >= end_year``

    Returns
    -------
    list of delayed pandas DataFrames, one per worker that holds results
    """
    frames = []
    # Bounded loops instead of `while year != end_year`, which never
    # terminated when start_year was greater than end_year.
    for year in range(start_year, end_year):
        for quarter in range(1, 5):
            suffix = str(year) + "Q" + str(quarter)
            acquisition_file = ACQ_DATA_PATH + "/Acquisition_" + suffix + ".txt"
            perf_files = glob(PERF_DATA_PATH + "/Performance_" + suffix + "*")
            for perf_file in perf_files:
                frames.append(dask_cudf_process_quarter(client, acquisition_file, perf_file))

    wait(frames)

    print('Concatenating dataframes...')
    # NOTE(review): these delayed wrappers are not persisted before the
    # second wait() - confirm that waiting on them is what is intended.
    frames = [delayed(pd.DataFrame)(df) for df in frames]
    wait(frames)

    # Group the frames by the first worker entry who_has reports for each,
    # so that each concat only combines data held by the same worker(s).
    frames_by_worker = {}
    for frame in frames:
        holder = list(client.who_has(frame).values())[0]
        frames_by_worker.setdefault(holder, []).append(frame)

    dfs = [delayed(pd.concat)(group) for group in frames_by_worker.values()]
    del frames, frames_by_worker

    return dfs

In [None]:
%%time

# Start Dask and run rmm_initialize on the workers before the ETL.
# NOTE(review): rmm_initialize / rmm_release / rmm_initialize_nopool are
# defined outside this section - presumably they set up and tear down the
# RAPIDS memory manager pool; confirm against their definitions.
client = dask_initialize()
client.run(rmm_initialize)

# One delayed pandas DataFrame per worker, covering START_YEAR up to (but
# not including) END_YEAR.
df_train = dask_cudf_process_quarters(client, START_YEAR, END_YEAR)

# Swap the memory-manager configuration now that the ETL has been submitted
# and waited on.
client.run(rmm_release)
client.run(rmm_initialize_nopool)
#Optional
#print("Frame Count: {0}".format(len(dfs_train)))

In [None]:
%%time
#filter the data by the desired columns

# Name of the label column; every other column is used as a feature.
PRED_VARS_Y ='delinquency_12'

# For each delayed frame build a (label frame, feature frame) pair. The
# feature column list is itself computed lazily via delayed(list).
dfs_xy = [(df[[PRED_VARS_Y]], df[delayed(list)(df.columns.difference([PRED_VARS_Y]))]) for df in df_train]
# Start computing both halves of every pair on the cluster and block until done.
dfs_xy = [(df[0].persist(), df[1].persist()) for df in dfs_xy]
gc.collect()
wait(dfs_xy)
#del (dfs_train)  #Comment out for PART_COUNT benchmarking

In [None]:
%%time

#PART_COUNT = 1 #Override the global part_count to increase DMatrices count and reduce dataset size to match the gpu framebuffer size

#Add a rows_train calculation to the tuple
# Each tuple becomes (labels, features, n_train) where
# n_train = int(len(features) * PERCENT_TRAIN). The length is computed
# eagerly (.compute()) for every partition.
dfs_xyr = [(df[0], df[1], dask.delayed(len)(df[1])) for df in dfs_xy]
dfs_xyr = [(df[0].persist(), df[1].persist(), int(df[2].compute()*PERCENT_TRAIN)) for df in dfs_xyr]
wait(dfs_xyr)
del (dfs_xy)

#Use PART_COUNT as a stride to partition the dataset for each GPU
# Training rows are rows [0, n_train) of each partition; offset i with
# stride PART_COUNT yields PART_COUNT interleaved slices per partition.
dfs_train = []
for i in range(0,PART_COUNT):
    dfs_train += ([(df[0][i:df[2]:PART_COUNT], df[1][i:df[2]:PART_COUNT]) for df in dfs_xyr])
dfs_train = [(df[0].persist(), df[1].persist()) for df in dfs_train]

#Use PART_COUNT as a stride to partition the dataset for each GPU
# Test rows are the remaining rows, from n_train to the end, sliced with
# the same offset/stride scheme.
dfs_test = []
for i in range(0,PART_COUNT):
    dfs_test += ([(df[0][df[2]+i::PART_COUNT], df[1][df[2]+i::PART_COUNT]) for df in dfs_xyr])
dfs_test = [(df[0].persist(), df[1].persist()) for df in dfs_test]

wait(dfs_train)
wait(dfs_test)
del(dfs_xyr)

gc.collect()

#Optional: 
#rows_train = [dask.delayed(len)(df[1]) for df in dfs_train]
#rows_train = [df.compute() for df in rows_train]
#wait(rows_train)
#print(rows_train)
#rows_train_sum = sum(rows_train)
#print(rows_train_sum)

In [None]:
%%time
# Build one delayed xgb.DMatrix per training slice: features (df[1]) are
# the data, labels (df[0]) are passed via `label`.
dtrain = [dask.delayed(xgb.DMatrix)(df[1].values, label=df[0].values, nthread=-1) for df in dfs_train]
# Materialize the DMatrices on the cluster and block until all are ready.
dtrain = [dmatrix.persist() for dmatrix in dtrain]
wait(dtrain)
# The source slices are no longer referenced from here on.
del(dfs_train)

gc.collect()

#Optional
print("DMatrix Count: {0}".format(len(dtrain)))
#print(dtrain)

In [None]:
%%time

# Per-worker thread and GPU counts handed to XGBoost; both fixed to 1 here.
NTHREADS_ALWAYS_1_IN_DASK=1
NGPUS_ALWAYS_1_IN_DASK=1

# Training parameters passed to dxgb.train below.
# NOTE(review): 'eta' and 'learning_rate' are both set to 0.1, and 'loss',
# 'max_features' and 'criterion' look like scikit-learn gradient-boosting
# keys rather than XGBoost ones - confirm how XGBoost treats them.
dxgb_params = {
    'nround':            NUM_ROUNDS,
    'max_depth':         8,
    'max_leaves':        2**8,
    'alpha':             0.9,
    'eta':               0.1,
    'gamma':             0.1,
    'learning_rate':     0.1,
    'subsample':         1,
    'reg_lambda':        1,
    'scale_pos_weight':  2,
    'min_child_weight':  30,
    'tree_method':       'gpu_hist',
    'nthread':           NTHREADS_ALWAYS_1_IN_DASK,
    'n_gpus':            NGPUS_ALWAYS_1_IN_DASK,
    'distributed_dask':  True,
    'loss':              'ls',
    'objective':         'gpu:reg:linear',
    'max_features':      'auto',
    'criterion':         'friedman_mse',
    'grow_policy':       'lossguide',
    'verbose':           True
}

# dtrain already carries the labels inside each DMatrix, hence labels=None.
# The round count is passed both in the params ('nround') and explicitly.
gbm = dxgb.train(client, dxgb_params,dtrain, labels=None,num_boost_round=NUM_ROUNDS)

### Evaluation
Accuracy is measured by the area under the ROC curve (AUC). What counts as an acceptable AUC is relative rather than absolute: it depends on the data and the features. An area of 1 represents a perfect test; an area of 0.5 represents a poor or unreliable test. The scores printed below are scaled by 100. A suggested rough guide for classifying the accuracy of an XGBoost model:<br>
>90 - 100 = Excellent <br>
>80 - 90 = Good <br>
>70 - 80 = Fair <br>
>60 - 70 = Poor <br>
>50 - 60 = Fail <br>
>50 and Below = Unreliable

In [None]:
%%time
#make some room in memory
del dtrain

#create the testing datasets for prediction and auc
# Features (df[1]) of every test slice become an xgb.DMatrix built on the
# client side.
# NOTE(review): `df.values` is taken from a dask DataFrame here - confirm
# xgb.DMatrix accepts it without an explicit compute().
dtest = [dask.dataframe.from_delayed(df[1]) for df in dfs_test]
dtest = [xgb.DMatrix(df.values, nthread=-1) for df in dtest]

# Labels (df[0]) of every test slice are pulled back as in-memory results.
y_test = [dask.dataframe.from_delayed(df[0]) for df in dfs_test]
y_test = [df.compute() for df in y_test]

# Both lists already hold concrete objects at this point (see above).
wait(y_test)
wait(dtest)

del(dfs_test)


In [None]:
%%time
from sklearn import metrics
import xgboost as xgb

#Total row count in test dataset
y_row_count = float(sum([len(df) for df in y_test]))

# Row-count-weighted averages of the per-partition scores.
weighted_auc = 0
weighted_roc = 0
print("Partitions:")
for i in range(0,len(dtest)):
    # dtest[i] and y_test[i] come from the same test slice.
    d = dtest[i]
    y = y_test[i]
    pred = gbm.predict(d)
    
    # The area under the ROC curve is computed two ways (roc_curve + auc,
    # and roc_auc_score), each scaled to 0-100.
    fpr, tpr, thresholds = metrics.roc_curve(y, pred)
    auc = metrics.auc(fpr, tpr) * 100
    acc = metrics.roc_auc_score(y, pred) * 100

    # Weight of this partition = its share of all test rows.
    scale = float(len(y))/y_row_count
    weighted_auc += auc * scale
    weighted_roc += acc * scale
    
    print("{0}: AUC: {1}, ROC AUC: {2}".format(i, auc, acc))

print("======================================================")
print("Weighted: ")
print("-- AUC: {0}, ROC AUC: {1}\n".format(weighted_auc, weighted_roc))


In [None]:
#cleanup for the next iteration
# Drop the notebook-level references to the test data, the parameters and
# the trained model, then ask the garbage collector to reclaim them.
del dtest
del y_test
del dxgb_params
del gbm

gc.collect()

In [None]:
# Run rmm_release on the workers, then drop this notebook's client reference.
# NOTE(review): rmm_release is defined outside this section - confirm what
# it tears down.
client.run(rmm_release)
del client

In [None]:
# NOTE(review): dask_release is defined outside this section - presumably
# the counterpart of dask_initialize(); confirm against its definition.
dask_release()

<a id='cleanup'></a>
## Cleanup / Finalize

In [None]:
# Final garbage-collection pass after all notebook-level references were dropped.
gc.collect()