# Mortgage Workflow

## The Dataset
The dataset used with this workflow is derived from [Fannie Mae’s Single-Family Loan Performance Data](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html) with all rights reserved by Fannie Mae. This processed dataset is redistributed with permission and consent from Fannie Mae.

To acquire this dataset, please visit [RAPIDS Datasets Homepage](https://docs.rapids.ai/datasets/mortgage-data)

## Introduction
The Mortgage workflow is composed of three core phases:

1. ETL - Extract, Transform, Load
2. Data Conversion
3. ML - Training

### ETL
Data is 
1. Read in from storage
2. Transformed to emphasize key features
3. Loaded into volatile memory for conversion

### Data Conversion
Features are
1. Broken into (labels, data) pairs
2. Distributed across many workers
3. Converted into compressed sparse row (CSR) matrix format for XGBoost

### Machine Learning
The CSR data is fed into a distributed training session with `xgboost.dask`

---
If required, the notebook can be converted to a Python script for execution using tools like `nbconvert`:

```sh
$ jupyter nbconvert --to python mortgage_e2e.ipynb
$ python mortgage_e2e.py
```


#### Import statements

In [1]:
from utils.utils import (
    determine_dataset,
    get_data,
    memory_info,
)

from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
from dask.distributed import Client, wait
import rmm

import numpy as np

from collections import OrderedDict
import argparse
import gc
from glob import glob
import os
import subprocess
import time

#### Define functions to encapsulate the workflow into a single call

In [2]:
def run_dask_task(func, **kwargs):
    """
    Call `func` with the given keyword arguments and return whatever it returns.

    When `func` is a `dask.delayed` wrapper (as in `process_quarter_gpu`),
    the returned object is the lazy task rather than a computed value.
    """
    return func(**kwargs)


def process_quarter_gpu(
    year=2000, quarter=1, perf_file="", data_dir="", client=None, **kwargs
):
    """
    Submit the GPU ETL workflow for a single performance file to the cluster.

    Parameters
    ----------
    year, quarter : forwarded to `run_gpu_workflow`
    perf_file : path of the performance file to process
    data_dir : accepted but not forwarded to the workflow
    client : Dask client used to submit the task
    **kwargs : ignored

    Returns
    -------
    The result of `client.compute` on the delayed workflow.
    """
    lazy_workflow = delayed(run_gpu_workflow)
    etl_task = lazy_workflow(quarter=quarter, year=year, perf_file=perf_file)
    return client.compute(etl_task, optimize_graph=False, fifo_timeout="0ms")


def run_gpu_workflow(
    quarter=1, year=2000, perf_file="", acq_file="", names_file="", **kwargs
):
    """
    Run the full ETL for one performance file and its matching acquisition file.

    Parameters
    ----------
    quarter, year : select the acquisition file `acq/Acquisition_<year>Q<quarter>.txt`
        when `acq_file` is not given
    perf_file : path of the performance file to load
    acq_file : optional explicit path of the acquisition file; when empty the
        path is derived from the module-level `data_dir`, `year` and `quarter`
    names_file : optional explicit path of the seller-name mapping file; when
        empty `names.csv` under the module-level `data_dir` is used
    **kwargs : ignored

    Returns
    -------
    The output of `last_mile_cleaning` for the joined performance and
    acquisition data.
    """
    # `data_dir` is not a parameter: it is resolved from module scope (it is
    # assigned in the `__main__` block) and only consulted when an explicit
    # path was not supplied. os.path.join keeps the paths valid whether or not
    # `data_dir` ends with a separator.
    names_path = names_file or os.path.join(data_dir, "names.csv")
    acquisition_path = acq_file or os.path.join(
        data_dir, "acq", f"Acquisition_{year}Q{quarter}.txt"
    )

    names = gpu_load_names(col_names_path=names_path)
    names = hash_df_string_columns(names)
    acq_gdf = gpu_load_acquisition_csv(acquisition_path=acquisition_path)
    acq_gdf = hash_df_string_columns(acq_gdf)

    # Swap the raw (hashed) seller name for the mapped name from the names table.
    acq_gdf = acq_gdf.merge(names, how="left", on=["seller_name"])
    acq_gdf.drop_column("seller_name")
    acq_gdf["seller_name"] = acq_gdf["new"]
    acq_gdf.drop_column("new")

    perf_df_tmp = gpu_load_performance_csv(perf_file)
    perf_df_tmp = hash_df_string_columns(perf_df_tmp)
    gdf = perf_df_tmp

    # Per-loan "ever delinquent" flags and earliest-delinquency dates.
    everdf = create_ever_features(gdf)
    delinq_merge = create_delinq_features(gdf)
    everdf = join_ever_delinq_features(everdf, delinq_merge)
    del delinq_merge

    # 12-month window features, joined back onto the per-record table.
    joined_df = create_joined_df(gdf, everdf)
    testdf = create_12_mon_features(joined_df)
    joined_df = combine_joined_12_mon(joined_df, testdf)
    del testdf

    perf_df = final_performance_delinquency(gdf, joined_df)
    del (gdf, joined_df)

    final_gdf = join_perf_acq_gdfs(perf_df, acq_gdf)
    del perf_df
    del acq_gdf

    final_gdf = last_mile_cleaning(final_gdf)
    return final_gdf


def gpu_load_performance_csv(performance_path, **kwargs):
    """
    Read a pipe-delimited performance file with cuDF.

    The first row of the file is skipped and the columns are named and typed
    according to the schema below.

    Returns
    -------
    GPU DataFrame
    """
    # (column name, dtype) pairs in file column order.
    schema = (
        ("loan_id", "int64"),
        ("monthly_reporting_period", "date"),
        ("servicer", "str"),
        ("interest_rate", "float64"),
        ("current_actual_upb", "float64"),
        ("loan_age", "float64"),
        ("remaining_months_to_legal_maturity", "float64"),
        ("adj_remaining_months_to_maturity", "float64"),
        ("maturity_date", "date"),
        ("msa", "float64"),
        ("current_loan_delinquency_status", "int32"),
        ("mod_flag", "str"),
        ("zero_balance_code", "str"),
        ("zero_balance_effective_date", "date"),
        ("last_paid_installment_date", "date"),
        ("foreclosed_after", "date"),
        ("disposition_date", "date"),
        ("foreclosure_costs", "float64"),
        ("prop_preservation_and_repair_costs", "float64"),
        ("asset_recovery_costs", "float64"),
        ("misc_holding_expenses", "float64"),
        ("holding_taxes", "float64"),
        ("net_sale_proceeds", "float64"),
        ("credit_enhancement_proceeds", "float64"),
        ("repurchase_make_whole_proceeds", "float64"),
        ("other_foreclosure_proceeds", "float64"),
        ("non_interest_bearing_upb", "float64"),
        ("principal_forgiveness_upb", "float64"),
        ("repurchase_make_whole_proceeds_flag", "str"),
        ("foreclosure_principal_write_off_amount", "float64"),
        ("servicing_activity_indicator", "str"),
    )
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    return cudf.read_csv(
        performance_path,
        names=column_names,
        delimiter="|",
        dtype=column_dtypes,
        skiprows=1,
    )


def gpu_load_acquisition_csv(acquisition_path, **kwargs):
    """
    Read a pipe-delimited acquisition file with cuDF.

    The first row of the file is skipped and the columns are named and typed
    according to the schema below.

    Returns
    -------
    GPU DataFrame
    """
    # (column name, dtype) pairs in file column order.
    schema = (
        ("loan_id", "int64"),
        ("orig_channel", "str"),
        ("seller_name", "str"),
        ("orig_interest_rate", "float64"),
        ("orig_upb", "int64"),
        ("orig_loan_term", "int64"),
        ("orig_date", "date"),
        ("first_pay_date", "date"),
        ("orig_ltv", "float64"),
        ("orig_cltv", "float64"),
        ("num_borrowers", "float64"),
        ("dti", "float64"),
        ("borrower_credit_score", "float64"),
        ("first_home_buyer", "str"),
        ("loan_purpose", "str"),
        ("property_type", "str"),
        ("num_units", "int64"),
        ("occupancy_status", "str"),
        ("property_state", "str"),
        ("zip", "int64"),
        ("mortgage_insurance_percent", "float64"),
        ("product_type", "str"),
        ("coborrow_credit_score", "float64"),
        ("mortgage_insurance_type", "float64"),
        ("relocation_mortgage_indicator", "str"),
    )
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    return cudf.read_csv(
        acquisition_path,
        names=column_names,
        delimiter="|",
        dtype=column_dtypes,
        skiprows=1,
    )


def gpu_load_names(col_names_path="", **kwargs):
    """
    Read the pipe-delimited seller-name mapping file used for renaming the banks.

    The first row of the file is skipped; the two columns are read as strings
    named `seller_name` and `new`.

    Returns
    -------
    GPU DataFrame
    """
    # (column name, dtype) pairs in file column order.
    schema = (("seller_name", "str"), ("new", "str"))
    column_names = [name for name, _ in schema]
    column_dtypes = [dtype for _, dtype in schema]

    return cudf.read_csv(
        col_names_path,
        names=column_names,
        delimiter="|",
        dtype=column_dtypes,
        skiprows=1,
    )


def hash_df_string_columns(gdf):
    """
    Replace every string column of a cudf dataframe with its hashed values.

    The dataframe is modified in place and also returned.

    Returns
    -------
    Dataframe with all string columns replaced by hashed values for the strings
    """
    for name in gdf.columns:
        column = gdf[name]
        if not cudf.utils.dtypes.is_string_dtype(column):
            continue
        gdf[name] = column.hash_values()
    return gdf


def create_ever_features(gdf, **kwargs):
    """
    Builds per-loan flags denoting whether a loan_id has ever been delinquent
    for over 30, 90 and 180 days.

    The flags are derived from the maximum `current_loan_delinquency_status`
    seen for each loan_id (status >= 1, >= 3 and >= 6 respectively) and are
    stored as int8 columns `ever_30`, `ever_90` and `ever_180`.
    """
    status_col = "current_loan_delinquency_status"
    worst_status = gdf[["loan_id", status_col]].groupby(
        "loan_id", method="hash", as_index=False
    ).max()
    del gdf
    for flag_name, min_status in (("ever_30", 1), ("ever_90", 3), ("ever_180", 6)):
        worst_status[flag_name] = (worst_status[status_col] >= min_status).astype(
            "int8"
        )
    # Only the derived flags are kept; the raw maximum status is dropped.
    worst_status.drop_column(status_col)
    return worst_status


def create_delinq_features(gdf, **kwargs):
    """
    Computes, per loan_id, the earliest reporting period at which the loan
    was delinquent for more than 30, 90 and 180 days.

    The result has one row per loan_id that reached status >= 1, with columns
    `delinquency_30`, `delinquency_90` and `delinquency_180`; the latter two
    come from left joins and so may be null.
    """
    delinq_gdf = gdf[
        ["loan_id", "monthly_reporting_period", "current_loan_delinquency_status",]
    ]
    del gdf

    # (row filter, output column) for each delinquency level, mildest first.
    levels = (
        ("current_loan_delinquency_status >= 1", "delinquency_30"),
        ("current_loan_delinquency_status >= 3", "delinquency_90"),
        ("current_loan_delinquency_status >= 6", "delinquency_180"),
    )
    earliest_by_level = []
    for condition, feature in levels:
        earliest = (
            delinq_gdf.query(condition)[["loan_id", "monthly_reporting_period"]]
            .groupby("loan_id", method="hash", as_index=False)
            .min()
        )
        # Rename the aggregated date column to the feature name.
        earliest[feature] = earliest["monthly_reporting_period"]
        earliest.drop_column("monthly_reporting_period")
        earliest_by_level.append(earliest)
    del delinq_gdf

    over_30, over_90, over_180 = earliest_by_level
    combined = over_30.merge(over_90, how="left", on=["loan_id"], type="hash")
    combined = combined.merge(over_180, how="left", on=["loan_id"], type="hash")
    return combined


def join_ever_delinq_features(everdf_tmp, delinq_merge, **kwargs):
    """
    Left-joins the delinq features table onto the ever features table on loan_id
    """
    return everdf_tmp.merge(delinq_merge, on=["loan_id"], how="left", type="hash")


def create_joined_df(gdf, everdf, **kwargs):
    """
    Join a slimmed-down performance table with the features table
    (delinq and ever features) on loan_id.

    The performance columns are renamed along the way:
    `monthly_reporting_period` -> `timestamp` (plus `timestamp_month` and
    `timestamp_year`), `current_loan_delinquency_status` -> `delinquency_12`
    and `current_actual_upb` -> `upb_12`.
    """
    perf_subset = gdf[
        [
            "loan_id",
            "monthly_reporting_period",
            "current_loan_delinquency_status",
            "current_actual_upb",
        ]
    ]
    del gdf

    # Each rename is done as copy-then-drop.
    perf_subset["timestamp"] = perf_subset["monthly_reporting_period"]
    perf_subset.drop_column("monthly_reporting_period")
    report_date = perf_subset["timestamp"].dt
    perf_subset["timestamp_month"] = report_date.month
    perf_subset["timestamp_year"] = report_date.year
    perf_subset["delinquency_12"] = perf_subset["current_loan_delinquency_status"]
    perf_subset.drop_column("current_loan_delinquency_status")
    perf_subset["upb_12"] = perf_subset["current_actual_upb"]
    perf_subset.drop_column("current_actual_upb")

    joined_df = perf_subset.merge(everdf, how="left", on=["loan_id"], type="hash")

    for part in ("timestamp_year", "timestamp_month"):
        joined_df[part] = joined_df[part].astype("int32")

    return joined_df


def create_12_mon_features(joined_df, **kwargs):
    """
    Compute 12-month window features for every loan_id.

    For each of the 12 possible starting months, records are bucketed into
    consecutive 12 month windows. Within each (loan_id, window) bucket the
    maximum `delinquency_12` and minimum `upb_12` are taken; the resulting
    `delinquency_12` is 1 when that maximum is greater than 3, plus 1 when
    the minimum `upb_12` equals 0.

    The computation windows for each loan_id follow the pattern below
    Window 1: Jan 2000 - Jan 2001, Jan 2001 - Jan 2002
    Window 2: Feb 2000 - Feb 2001, Feb 2001 - Feb 2002

    Returns the concatenation of the 12 per-starting-month tables.
    """
    n_months = 12
    feature_cols = [
        "loan_id",
        "timestamp_year",
        "timestamp_month",
        "delinquency_12",
        "upb_12",
    ]
    window_frames = []
    for start_month in range(1, n_months + 1):
        window = joined_df[feature_cols]
        # Absolute month count, then the index of the 12-month bucket that
        # begins at `start_month` (24000 == 2000 * 12 is the reference point).
        window["josh_months"] = (
            window["timestamp_year"] * 12 + window["timestamp_month"]
        )
        months_since_start = (
            window["josh_months"].astype("float64") - 24000 - start_month
        )
        window["josh_mody_n"] = (months_since_start / 12).floor()

        window = window.groupby(
            ["loan_id", "josh_mody_n"], method="hash", as_index=False
        ).agg({"delinquency_12": "max", "upb_12": "min"})

        window["delinquency_12"] = (window["delinquency_12"] > 3).astype("int32")
        window["delinquency_12"] += (window["upb_12"] == 0).astype("int32")

        # Map the bucket index back to a calendar year; the month is the
        # starting month of this pass.
        bucket_months = (window["josh_mody_n"] * n_months) + 24000 + (start_month - 1)
        window["timestamp_year"] = (bucket_months / 12).floor().astype("int16")
        window["timestamp_month"] = np.int8(start_month)
        window.drop_column("josh_mody_n")
        window_frames.append(window)
    del joined_df

    return cudf.concat(window_frames)


def combine_joined_12_mon(joined_df, testdf, **kwargs):
    """
    Attaches the 12_mon features table to the ever_delinq features table.

    The per-record `delinquency_12` and `upb_12` columns of `joined_df` are
    dropped (in place) and the key columns are cast so they match the dtypes
    used in `testdf` before the left join.
    """
    for replaced in ("delinquency_12", "upb_12"):
        joined_df.drop_column(replaced)
    joined_df["timestamp_year"] = joined_df["timestamp_year"].astype("int16")
    joined_df["timestamp_month"] = joined_df["timestamp_month"].astype("int8")

    join_keys = ["loan_id", "timestamp_year", "timestamp_month"]
    return joined_df.merge(testdf, how="left", on=join_keys, type="hash")


def final_performance_delinquency(gdf, joined_df, **kwargs):
    """
    Left-joins the grouped features table onto the original Performance table
    on (loan_id, timestamp_year, timestamp_month).

    Note that `gdf` is used directly, not copied: the temporary
    `timestamp_month` / `timestamp_year` key columns are added to the caller's
    dataframe. They are dropped only from the merged result that is returned.
    """
    perf = gdf
    joined_df["timestamp_month"] = joined_df["timestamp_month"].astype("int8")
    joined_df["timestamp_year"] = joined_df["timestamp_year"].astype("int16")

    # Derive matching join keys from the reporting period.
    perf["timestamp_month"] = perf["monthly_reporting_period"].dt.month.astype("int8")
    perf["timestamp_year"] = perf["monthly_reporting_period"].dt.year.astype("int16")

    join_keys = ["loan_id", "timestamp_year", "timestamp_month"]
    result = perf.merge(joined_df, how="left", on=join_keys, type="hash")
    result.drop_column("timestamp_year")
    result.drop_column("timestamp_month")
    return result


def join_perf_acq_gdfs(perf, acq, **kwargs):
    """
    Left-joins the Acquisition table onto the Performance table on loan_id
    """
    merge_options = {"how": "left", "on": ["loan_id"], "type": "hash"}
    return perf.merge(acq, **merge_options)


def last_mile_cleaning(df, **kwargs):
    """
    Final cleanup to drop columns not passed to the XGBoost model for training.
    Convert all remaining features to numeric features.

    Every surviving column is cast to float32 (categorical columns are first
    replaced by their category codes), `delinquency_12` is turned into a 0/1
    int32 label, and remaining nulls are filled with -1 in each column's own
    dtype.

    Returns
    -------
    Arrow Table (Host memory)
    """
    drop_list = [
        "loan_id",
        "orig_date",
        "first_pay_date",
        "seller_name",
        "monthly_reporting_period",
        "last_paid_installment_date",
        "maturity_date",
        "ever_30",
        "ever_90",
        "ever_180",
        "delinquency_30",
        "delinquency_90",
        "delinquency_180",
        "upb_12",
        "zero_balance_effective_date",
        "foreclosed_after",
        "disposition_date",
        "timestamp",
    ]
    for column in drop_list:
        df.drop_column(column)
    # `.items()` instead of `.iteritems()`: pandas 2.0 removed
    # `Series.iteritems`, while `items` exists in old and new versions.
    # NOTE(review): assumes `df.dtypes` is a pandas Series, as cuDF returns.
    for col, dtype in df.dtypes.items():
        if str(dtype) == "category":
            df[col] = df[col].cat.codes
        df[col] = df[col].astype("float32")
    # Binary label: any positive 12-month delinquency count is 1, null is 0.
    df["delinquency_12"] = df["delinquency_12"] > 0
    df["delinquency_12"] = df["delinquency_12"].fillna(False).astype("int32")
    # -1 is the sentinel `xgb_training` passes to XGBoost as `missing`.
    for column in df.columns:
        df[column] = df[column].fillna(np.dtype(str(df[column].dtype)).type(-1))
    return df.to_arrow(preserve_index=False)


def prepare_data(arrow_input):
    """
    Load each arrow table onto the GPU and concatenate them into one dataframe

    Returns
    -------
    GPU Dataframe
    """
    frames = [cudf.DataFrame.from_arrow(table) for table in arrow_input]
    return cudf.concat(frames)


def xgb_training(arrow_dfs, client=None):
    """
    Convert the post ETL data to Dmatrix format for XGBoost training input.
    Train the XGBoost model.

    Parameters
    ----------
    arrow_dfs : non-empty list of ETL results to train on, as produced by `run_etl`
    client : Dask client connected to the cluster that holds `arrow_dfs`

    Returns
    -------
    The trained model and time taken for preparing, training data.

    Raises
    ------
    ValueError
        If `arrow_dfs` is empty.
    """
    # NOTE(review): XGBoost documents `learning_rate` as an alias of `eta`;
    # both are set to the same value here.
    dxgb_gpu_params = {
        "max_depth": 8,
        "max_leaves": 2 ** 8,
        "alpha": 0.9,
        "eta": 0.1,
        "gamma": 0.1,
        "learning_rate": 0.1,
        "subsample": 1,
        "reg_lambda": 1,
        "scale_pos_weight": 2,
        "min_child_weight": 30,
        "tree_method": "gpu_hist",
        "objective": "binary:logistic",
        "grow_policy": "lossguide",
    }
    NUM_BOOST_ROUND = 100

    part_count = len(arrow_dfs)
    if part_count == 0:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError("xgb_training requires at least one ETL result to train on")
    print(f"Preparing data for training with part count: {part_count}")
    t1 = time.time()

    # Group the parts by the first worker reported as holding each of them,
    # keeping first-seen worker order.
    parts_by_worker = OrderedDict()
    for arrow_df in arrow_dfs:
        worker = list(client.who_has(arrow_df).values())[0][0]
        parts_by_worker.setdefault(worker, []).append(arrow_df)
    del arrow_df, worker

    # One delayed concatenation task per worker.
    train_x_y = [
        delayed(prepare_data)(worker_parts)
        for worker_parts in parts_by_worker.values()
    ]
    del parts_by_worker

    # NOTE(review): presumably resolves to a worker holding the task's input
    # parts, so each task can be pinned next to its data - confirm.
    worker_list = OrderedDict()
    for task in train_x_y:
        worker_list[task] = list(client.who_has(task).values())[0][0]
    del task

    persisted_train_x_y = [
        client.persist(
            collections=pending,
            workers=worker_list[pending],
            optimize_graph=False,
            fifo_timeout="0ms",
        )
        for pending in train_x_y
    ]

    del arrow_dfs, train_x_y, worker_list

    wait(persisted_train_x_y)
    persisted_train_x_y = dask_cudf.from_delayed(persisted_train_x_y)

    # Every column except the label is a feature; -1 is the null sentinel
    # written by `last_mile_cleaning`.
    dmat = xgb.dask.DaskDMatrix(
        client=client,
        data=persisted_train_x_y[
            persisted_train_x_y.columns.difference(["delinquency_12"])
        ],
        label=persisted_train_x_y[["delinquency_12"]],
        missing=-1,
    )

    del persisted_train_x_y
    gc.collect()

    dmat_time = time.time() - t1
    print("Prepared data for XGB training")

    print("Training model")
    t1 = time.time()

    print("XGB training for part_count:{}".format(part_count))
    bst = xgb.dask.train(
        client, dxgb_gpu_params, dmat, num_boost_round=NUM_BOOST_ROUND,
    )

    train_time = time.time() - t1
    print("Training complete")
    return (bst, dmat_time, train_time)


def run_etl(start_year, end_year, data_dir, client):
    """
    Driver function for the ETL step

    Iterates through all performance files under `<data_dir>/perf` for every
    quarter between `start_year` and `end_year` (inclusive) and submits the
    ETL function for each file, then waits for all of them to finish.

    Parameters
    ----------
    start_year, end_year : first and last year (integers) to process
    data_dir : root directory of the dataset; a trailing separator is optional
    client : Dask client used to submit the per-file ETL tasks

    Returns
    -------
    Dask futures to arrow tables containing post ETL data for all processed files,
    and the elapsed ETL time in seconds.
    """
    print("Starting ETL")
    t1 = time.time()

    # os.path.join avoids depending on `data_dir` ending with a separator.
    perf_data_path = os.path.join(data_dir, "perf")

    gpu_dfs = []
    for year in range(start_year, end_year + 1):
        for quarter in range(1, 5):
            pattern = os.path.join(
                perf_data_path, f"Performance_{year}Q{quarter}*"
            )
            # Sorted so the order of the submitted parts is deterministic.
            for perf_file in sorted(glob(pattern)):
                gpu_dfs.append(
                    process_quarter_gpu(
                        year=year, quarter=quarter, perf_file=perf_file, client=client
                    )
                )
    print("ETL for start_year:{} and end_year:{}".format(start_year, end_year))
    wait(gpu_dfs)

    etl_time = time.time() - t1

    print("ETL done!")
    return (gpu_dfs, etl_time)

#### The cell below runs the workflow end to end including the ETL and XGBoost model training step

**Notes** 

The mortgage dataset for years 2000-2016 is about 200GB of data. There are two key factors that determine the `start_year`, `end_year`, `part_count` and `use_1GB_splits` params used in the notebook for processing this data. 

_Total GPU memory_: Determines the amount of data that can be trained using XGBoost (`part_count`). The ETL is performed on one part file at a time (per GPU) whereas XGBoost training requires all the training data to be loaded in GPU memory.

_Memory per GPU_: Determines the variation of the dataset to use (1GB vs 2GB splits). The 2GB splits version of the data results in larger partitions being processed per task, resulting in better utilization of the GPU, with the tradeoff of increased memory usage that can be handled by GPU cards with greater than `32GB` of memory.

The `determine_dataset` utility used below automatically queries these two parameters based on the machine and decides suitable values for `part_count` and consequently `start_year`, `end_year` (to ensure ETL is performed on enough parts for training), as well as the variation of the dataset (1GB split part files vs 2GB split part files) that should work on such systems.

If you'd like to use existing data that has already been downloaded to your own location, or manually adjust these parameters based on the amount of data needed for processing, you can change the parameters provided in the notebook by assigning new values to the variables or by setting the environment variables `MORTGAGE_DATA_DIR` and `part_count`. You can visit the [RAPIDS Datasets Homepage](https://docs.rapids.ai/datasets/mortgage-data) for more information on downloading the data manually.

In [3]:
# End-to-end driver: start a local CUDA cluster, pick and fetch the dataset,
# run the ETL, train XGBoost on a subset of the ETL output and report timings.
if __name__ == "__main__":

    # Imported here rather than at the top of the file; the functions above
    # reference `cudf`, `xgb` and `dask_cudf` as module-level names.
    import cudf
    import xgboost as xgb
    import dask_cudf

    # Use the first address reported by `hostname` as the cluster IP.
    cmd = "hostname --all-ip-addresses"
    process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
    output, error = process.communicate()
    IPADDR = str(output.decode()).split()[0]

    cluster = LocalCUDACluster(ip=IPADDR)
    client = Client(cluster)

    # Dataset root. An empty default makes the relative paths built from it
    # resolve against the current working directory. `run_gpu_workflow` reads
    # this name as a global, so it must stay at module level.
    data_dir = os.environ.get("MORTGAGE_DATA_DIR", "") # Default to current working directory
    # One `memory_info` result per worker.
    res = client.run(memory_info)
    # Total GPU memory on the system
    total_mem = sum(res.values()) 
    # Memory of a single GPU on the machine
    # If the machine has multiple GPUs of different sizes, this is the size of the smallest GPU
    min_mem = min(res.values()) 
 
    # Start year for processing mortgage data
    start_year = None
    # End year for processing mortgage data
    end_year = None
    # The number of part files to train against. 
    # If not provided, default to auto selection based on GPU memory available on the system
    # NOTE(review): when set through the environment this is a string - confirm
    # that `determine_dataset` converts it before it is used as a slice bound below.
    part_count = os.environ.get("part_count")

    start_year, end_year, part_count, use_1GB_splits = determine_dataset(
        total_mem=total_mem, min_mem=min_mem, part_count=part_count
    )

    # Download data based on these parameters
    # The 2GB split mortgage performance files are used if the system has 32GB GPUs.
    # On machines with GPUs less than 32GB we use the 1GB split files (to help reduce memory load)
    get_data(data_dir, start_year, end_year, use_1GB_splits)

    # Initialize a GPU pool allocating 90% of GPU memory for each worker
    client.run(rmm.reinitialize, pool_allocator=True, initial_pool_size=0.9 * min_mem)
    etl_result, etl_time = run_etl(start_year, end_year, data_dir, client)

    # Clear the existing RMM pool post-ETL to make space for GPU accelerated XGBoost
    # This makes space for XGBoost to operate since it doesn't have visibility into the cuDF memory pool
    client.run(rmm.reinitialize, pool_allocator=False)

    # Remember how many parts went through ETL before trimming to the training subset.
    total_file_count = len(etl_result)
    etl_result = etl_result[:part_count]  # Select subset for training
    model, dmat_time, train_time = xgb_training(etl_result, client)

    # Timing summary for each phase and for the whole run.
    print(
        f"\nTime taken to run ETL from {start_year} to {end_year}"
        f" ({total_file_count} parts) was {round(etl_time,4)} s"
    )
    print(
        f"Time taken to prepare {len(etl_result)} parts"
        f" for XGB training {round(dmat_time,4)} s"
    )
    print(f"Time taken to train XGB model {round(train_time, 4)} s")
    print(f"Total E2E time: {round(etl_time+dmat_time+train_time, 4)} s")
    # Shut down the client before the cluster it is connected to.
    client.close()
    cluster.close()

Downloading data for year 2000
Download complete
Decompressing and extracting data
Done extracting year 2000
Downloading data for year 2001
Download complete
Decompressing and extracting data
Done extracting year 2001
Downloading data for year 2002
Download complete
Decompressing and extracting data
Done extracting year 2002
Downloading data for year 2003
Download complete
Decompressing and extracting data
Done extracting year 2003
Downloading data for year 2004
Download complete
Decompressing and extracting data
Done extracting year 2004
Starting ETL
ETL for start_year:2000 and end_year:2004
ETL done!
Preparing data for training with part count: 12
Prepared data for XGB training
Training model
XGB training for part_count:12
Training complete

Time taken to run ETL from 2000 to 2004 (108 parts) was 68.7227 s
Time taken to prepare 12 parts for XGB training 3.3915 s
Time taken to train XGB model 87.521 s
Total E2E time: 159.6352 s
