In [None]:
import certifi
import cudf
import cuml
import cupy as cp
import gcsfs
import json
import numpy as np
import os
import pandas as pd
import random
import seaborn as sns
import time
import uuid
import yaml

from collections import OrderedDict
from functools import partial
from math import cos, sin, asin, sqrt, pi
from tqdm import tqdm
from typing import Optional

import dask
import dask.array as da
import dask.dataframe as dd
import dask_cudf

from dask_kubernetes import KubeCluster, make_pod_from_dict
from dask.distributed import Client, WorkerPlugin, wait, progress, get_worker

from cuml import ForestInference


class SimpleTimer:
    """
    Context manager that records how long its ``with`` body took.

    After the block exits, ``start`` and ``end`` hold the raw
    ``time.perf_counter_ns()`` readings and ``elapsed`` holds their
    difference in nanoseconds. All three are ``None`` until used.
    Exceptions raised inside the block are not suppressed.
    """

    def __init__(self):
        self.start = self.end = self.elapsed = None

    def __enter__(self):
        self.start = time.perf_counter_ns()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        stop_ns = time.perf_counter_ns()
        self.end = stop_ns
        self.elapsed = stop_ns - self.start

        
def create_pod_from_yaml(yaml_file):
    """
    Build a pod object from a YAML pod specification on disk.

    The file is parsed with ``yaml.safe_load``, passed through
    ``dask.config.expand_environment_variables`` and then handed to
    ``make_pod_from_dict``, whose result is returned.
    """
    with open(yaml_file, 'r') as spec_file:
        raw_spec = yaml.safe_load(spec_file)

    expanded_spec = dask.config.expand_environment_variables(raw_spec)
    return make_pod_from_dict(expanded_spec)


def build_worker_and_scheduler_pods(sched_spec, worker_spec):
    """
    Create the scheduler and worker pods from their YAML spec files.

    Both paths must point at existing files (checked with ``assert``
    before anything is loaded).

    Returns
    -------
    (scheduler pod, worker pod), in that order.
    """
    for spec_path in (sched_spec, worker_spec):
        assert os.path.isfile(spec_path)

    # Tuple elements are evaluated left to right: scheduler first, then worker.
    return create_pod_from_yaml(sched_spec), create_pod_from_yaml(worker_spec)


# Dask / dask-kubernetes configuration, applied as soon as this cell runs:
#   - log levels are set to "info"
#   - the scheduler service type is "LoadBalancer" and the deploy mode is "remote"
#   - the idle / lease timeouts are set to None
#   - the service-wait and connect timeouts are set to 3600
#     (presumably seconds -- confirm against the Dask configuration reference)
#   - the TLS CA file is the bundle reported by certifi
dask.config.set({"logging.kubernetes": "info",
                 "logging.distributed": "info",
                 "kubernetes.scheduler-service-type": "LoadBalancer",
                 "kubernetes.idle-timeout": None,
                 "kubernetes.scheduler-service-wait-timeout": 3600,
                 "kubernetes.deploy-mode": "remote",
                 "kubernetes.logging": "info",
                 "distributed.logging": "info",
                 "distributed.scheduler.idle-timeout": None,
                 "distributed.scheduler.locks.lease-timeout": None,
                 "distributed.comm.timeouts.connect": 3600,
                 "distributed.comm.tls.ca-file": certifi.where()})

# User-supplied settings: replace every <placeholder> before running the notebook.
YOUR_GCP_PROJECT ='<project_id>'
# Path to the service account keyfile.
YOUR_GCP_TOKEN = '/etc/secrets/keyfile.json'
YOUR_MORTGAGE_DATA_PATH = 'gs://<path_to_file>'
# Passed as `storage_options` to every dask_cudf.read_csv call made by the loaders.
YOUR_STORAGE_OPTS = {
    'token': YOUR_GCP_TOKEN # Service account keyfile used as the access token
}

# Folder holding the full mortgage CSV data. The ETL driver reads
# `<dir>/names.csv`, `<dir>/acq/...` and `<dir>/perf/...` beneath it.
YOUR_MORTGAGE_DATA_DIR = "gs://<mortgage_folder_path>/mortgage_large_csv"

# Destination folder for the converted Parquet datasets.
YOUR_OUTPUT_PATH = "gs://<mortgage_out_folder_path>"

# Pod specifications for the Dask scheduler and the workers.
sched_spec_path = "./specs/sched-spec-custom.yaml"
worker_spec_path = "./specs/worker-spec-custom.yaml"

# Load both specs now; the resulting pods are used as templates when the cluster is created.
sched_pod, worker_pod = build_worker_and_scheduler_pods(sched_spec=sched_spec_path,
                                                        worker_spec=worker_spec_path)

In [None]:
# Create the Kubernetes-backed Dask cluster from the worker and scheduler
# pod templates built in the configuration cell.
cluster = KubeCluster(pod_template=worker_pod,
                      scheduler_pod_template=sched_pod)

# Client used by the ETL driver to persist collections on the cluster.
client = Client(cluster)
# Kept for reference; not read by any of the visible cells.
scheduler_address = cluster.scheduler_address

In [None]:
# Request 8 workers. `scale` takes the target worker count as its first
# argument; it does not accept a client or an `n_workers` keyword.
cluster.scale(8)

In [None]:
def load_names_csv(col_names_path, storage_opts):
    """
    Read the seller-name mapping file used to rename the banks.

    Parameters
    ----------
    col_names_path : path handed straight to ``dask_cudf.read_csv``
    storage_opts : dict forwarded as ``storage_options``

    Returns
    -------
    The frame produced by ``dask_cudf.read_csv`` with the two string
    columns ``seller_name`` and ``new``.
    """
    # (column name, dtype) pairs in file order. The first row of the file is
    # skipped and these names are applied instead.
    schema = (
        ("seller_name", "str"),
        ("new", "str"),
    )
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    return dask_cudf.read_csv(
        col_names_path,
        names=column_names,
        delimiter="|",
        dtype=column_dtypes,
        skiprows=1,
        storage_options=storage_opts,
    )

def load_acquisition_csv(fpath, storage_opts):
    """
    Read one pipe-delimited acquisition file.

    Parameters
    ----------
    fpath : path handed straight to ``dask_cudf.read_csv`` (also printed)
    storage_opts : dict forwarded as ``storage_options``

    Returns
    -------
    The frame produced by ``dask_cudf.read_csv`` for the fixed
    25-column schema below.
    """
    # (column name, dtype) pairs in file order. The first row of the file is
    # skipped and these names are applied instead.
    schema = (
        ("loan_id", "int64"),
        ("orig_channel", "str"),
        ("seller_name", "str"),
        ("orig_interest_rate", "float64"),
        ("orig_upb", "int64"),
        ("orig_loan_term", "int64"),
        ("orig_date", "date"),
        ("first_pay_date", "date"),
        ("orig_ltv", "float64"),
        ("orig_cltv", "float64"),
        ("num_borrowers", "float64"),
        ("dti", "float64"),
        ("borrower_credit_score", "float64"),
        ("first_home_buyer", "str"),
        ("loan_purpose", "str"),
        ("property_type", "str"),
        ("num_units", "int64"),
        ("occupancy_status", "str"),
        ("property_state", "str"),
        ("zip", "int64"),
        ("mortgage_insurance_percent", "float64"),
        ("product_type", "str"),
        ("coborrow_credit_score", "float64"),
        ("mortgage_insurance_type", "float64"),
        ("relocation_mortgage_indicator", "str"),
    )
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    print(fpath)
    return dask_cudf.read_csv(
        fpath,
        names=column_names,
        delimiter="|",
        dtype=column_dtypes,
        skiprows=1,
        storage_options=storage_opts,
    )

def load_performance_csv(fpath, storage_opts):
    """
    Read one pipe-delimited performance file.

    Parameters
    ----------
    fpath : path handed straight to ``dask_cudf.read_csv`` (also printed)
    storage_opts : dict forwarded as ``storage_options``

    Returns
    -------
    The frame produced by ``dask_cudf.read_csv`` for the fixed
    31-column schema below.
    """
    # (column name, dtype) pairs in file order. The first row of the file is
    # skipped and these names are applied instead.
    schema = (
        ("loan_id", "int64"),
        ("monthly_reporting_period", "date"),
        ("servicer", "str"),
        ("interest_rate", "float64"),
        ("current_actual_upb", "float64"),
        ("loan_age", "float64"),
        ("remaining_months_to_legal_maturity", "float64"),
        ("adj_remaining_months_to_maturity", "float64"),
        ("maturity_date", "date"),
        ("msa", "float64"),
        ("current_loan_delinquency_status", "int32"),
        ("mod_flag", "str"),
        ("zero_balance_code", "str"),
        ("zero_balance_effective_date", "date"),
        ("last_paid_installment_date", "date"),
        ("foreclosed_after", "date"),
        ("disposition_date", "date"),
        ("foreclosure_costs", "float64"),
        ("prop_preservation_and_repair_costs", "float64"),
        ("asset_recovery_costs", "float64"),
        ("misc_holding_expenses", "float64"),
        ("holding_taxes", "float64"),
        ("net_sale_proceeds", "float64"),
        ("credit_enhancement_proceeds", "float64"),
        ("repurchase_make_whole_proceeds", "float64"),
        ("other_foreclosure_proceeds", "float64"),
        ("non_interest_bearing_upb", "float64"),
        ("principal_forgiveness_upb", "float64"),
        ("repurchase_make_whole_proceeds_flag", "str"),
        ("foreclosure_principal_write_off_amount", "float64"),
        ("servicing_activity_indicator", "str"),
    )
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    print(fpath)
    return dask_cudf.read_csv(
        fpath,
        names=column_names,
        delimiter="|",
        dtype=column_dtypes,
        skiprows=1,
        storage_options=storage_opts,
    )

def process_acq(df_names, fpath, storage_opts):
    """
    Load one acquisition file and normalize its seller names.

    Parameters
    ----------
    df_names : frame with columns ``seller_name`` and ``new`` (the mapping
        from raw seller name to normalized name)
    fpath : acquisition file path, forwarded to ``load_acquisition_csv``
    storage_opts : storage options, forwarded to ``load_acquisition_csv``

    Returns
    -------
    The acquisition frame with ``seller_name`` replaced by the mapped value
    and the helper column ``new`` removed. Because the join is a left join,
    sellers without an entry in ``df_names`` end up with a null name.
    """
    df_acq = load_acquisition_csv(fpath, storage_opts)

    # Normalize seller names
    df_acq = df_acq.merge(df_names, how="left", on=["seller_name"])
    df_acq["seller_name"] = df_acq["new"]
    # drop() returns a new frame rather than mutating in place, so the result
    # has to be re-bound; otherwise "new" leaks into the output.
    df_acq = df_acq.drop(columns="new")

    return df_acq


def process_perf(df_acq, fpath, storage_opts):
    """
    Load one performance file.

    ``df_acq`` is accepted but not used; the function simply returns the
    result of ``load_performance_csv(fpath, storage_opts)``.
    """
    return load_performance_csv(fpath, storage_opts)

def ingest_and_convert(start_year, end_year, data_dir, client, filesystem, storage_opts,
                       output_path: Optional[str] = None):
    """
    Driver for the CSV -> Parquet conversion.

    For every year in ``[start_year, end_year]`` and every quarter 1-4 this
    loads ``{data_dir}/acq/Acquisition_{year}Q{quarter}.txt`` plus every file
    matching ``{data_dir}/perf/Performance_{year}Q{quarter}*``, concatenates
    the acquisition frames and the performance frames, persists each on the
    cluster and writes them out as two Parquet datasets.

    Parameters
    ----------
    start_year, end_year : int
        Inclusive range of years to convert.
    data_dir : str
        Folder containing ``names.csv``, ``acq/`` and ``perf/``.
    client : dask.distributed.Client
        Client used to list the workers and persist the collections.
    filesystem :
        Object exposing ``glob(pattern)``. The returned paths are prefixed
        with ``gs://`` before being read.
    storage_opts : dict
        Storage options used for every read and for both Parquet writes.
    output_path : str, optional
        Destination folder. Defaults to the notebook-level
        ``YOUR_OUTPUT_PATH``.

    Returns
    -------
    None. The results are ``{output_path}/mortgage_acq.parquet`` and
    ``{output_path}/mortgage_perf.parquet``.

    Raises
    ------
    ValueError
        If the year range is empty or no performance files were found.
    """
    if output_path is None:
        output_path = YOUR_OUTPUT_PATH

    print("Starting ETL")
    etl_start = time.perf_counter_ns()
    # Snapshot of the workers known right now; persisted data is pinned to them.
    workers = set(client.has_what().keys())

    path_names = f"{data_dir}/names.csv"
    path_perf = f"{data_dir}/perf"
    path_acq = f"{data_dir}/acq"

    # Pre-load name map
    df_names = load_names_csv(col_names_path=path_names, storage_opts=storage_opts)

    perf_dfs = []
    acq_dfs = []
    for year in range(start_year, end_year + 1):
        for quarter in range(1, 4 + 1):
            # Acquisition data for the current year/quarter
            acq = f"{path_acq}/Acquisition_{year}Q{quarter}.txt"
            acq_dfs.append(process_acq(df_names, acq, storage_opts))

            # All performance files for the current year/quarter
            globstr = f"{path_perf}/Performance_{year}Q{quarter}*"
            for file in filesystem.glob(globstr):
                # glob() results carry no protocol, so it is added back here.
                perf_dfs.append(process_perf(acq_dfs[-1], f"gs://{file}", storage_opts))

    # Fail with a clear message instead of an opaque error from concat([]).
    if not acq_dfs:
        raise ValueError(
            f"No acquisition data to convert: empty year range {start_year}-{end_year}"
        )
    if not perf_dfs:
        raise ValueError(
            f"No performance files found under {path_perf} for {start_year}-{end_year}"
        )

    df_acq = dd.concat(acq_dfs)
    df_perf = dd.concat(perf_dfs)

    with dask.annotate(workers=workers):
        df_acq = client.persist(collections=df_acq)

    wait(df_acq)
    df_acq.to_parquet(f"{output_path}/mortgage_acq.parquet", storage_options=storage_opts)
    # Drop the reference so the persisted acquisition data can be released
    # before the performance data is materialized.
    del df_acq

    with dask.annotate(workers=workers):
        df_perf = client.persist(collections=df_perf)

    wait(df_perf)
    df_perf.to_parquet(f"{output_path}/mortgage_perf.parquet", storage_options=storage_opts)
    del df_perf

    etl_elapsed_ns = time.perf_counter_ns() - etl_start
    print(f"Conversion took: {etl_elapsed_ns/1e9:0.2f} sec.")

In [None]:
# Filesystem handle used by the driver to glob the performance files.
# The project id comes from YOUR_GCP_PROJECT, defined in the configuration cell.
# NOTE(review): no token is passed here, so gcsfs uses its default credential
# lookup, while the CSV reads use YOUR_STORAGE_OPTS -- confirm both resolve to
# the same service account.
gcs_fs = gcsfs.core.GCSFileSystem(project=YOUR_GCP_PROJECT)

# Convert every quarter from 2000 through 2016 (inclusive) to Parquet.
ingest_and_convert(start_year=2000, end_year=2016, data_dir=YOUR_MORTGAGE_DATA_DIR, client=client,
                   filesystem=gcs_fs, storage_opts=YOUR_STORAGE_OPTS)