In [31]:
"""
Before running the notebook, open two terminal windows in an environment where Dask is installed. Run
'dask-scheduler' in one of them and 'dask-worker <scheduler address printed by dask-scheduler>'
in the other.
"""

import os

from dask.distributed import Client

# Address printed by `dask-scheduler`. Set the DASK_SCHEDULER_ADDRESS environment
# variable to connect to a scheduler somewhere other than the default local port.
SCHEDULER_ADDRESS = os.environ.get('DASK_SCHEDULER_ADDRESS', 'tcp://127.0.0.1:8786')
client = Client(SCHEDULER_ADDRESS)

In [38]:
"""
Helper functions that prepare our dataframe for training. Note that the input is intended to be a Dask dataframe.
"""
import matplotlib.pyplot as plt
import gc
global column_sets

def apply_restore_nan(df, column_sets):
    return df.apply(restore_nan_category, column_sets=column_sets, axis=1)

def restore_nan_category(series, column_sets):
    """Return a copy of one row with all-zero one-hot groups set to NaN.

    One-hot encoding a missing categorical value produces indicator columns
    that are all 0. For every group in ``column_sets`` whose columns are all 0
    in this row, those columns are set to NaN so that the missingness is
    restored. Groups are processed in order. An empty group counts as
    "all zero" but changes nothing.

    Parameters
    ----------
    series : pandas.Series
        One row of the one-hot encoded frame, as passed by
        ``DataFrame.apply(..., axis=1)``.
    column_sets : list of list of str
        Groups of indicator columns, one group per categorical variable.

    Returns
    -------
    pandas.Series
        A new Series. The input row is left unmodified.
    """
    # pandas does not support UDFs that mutate the object handed to apply(),
    # so write into a copy of the row instead of the row itself.
    restored = series.copy()
    for column_set in column_sets:
        if all(restored[col] == 0 for col in column_set):
            for col in column_set:
                restored[col] = float('NaN')
    return restored
        

# Categorical features of the dataset. They are one-hot encoded below and then,
# for now, excluded from the model's inputs.
CAT_VARS = ['B_30', 'B_38', 'D_114', 'D_116', 'D_117', 'D_120', 'D_126',
            'D_63', 'D_64', 'D_66', 'D_68']


def _dummy_column_sets(columns, cat_vars=CAT_VARS):
    """Group one-hot column names by the categorical variable they encode.

    get_dummies names each indicator column '<var>_<value>', so a column is
    assigned to ``var`` when it starts with ``'<var>_'``. Returns one list per
    entry of ``cat_vars``, in the same order.
    """
    return [[col for col in columns if col.startswith(var + '_')] for var in cat_vars]


def prepare_data(dfl0, test_time):
    """Turn raw rows into one numeric feature row per customer.

    Steps:
    1. One-hot encode the categorical variables in ``CAT_VARS``.
    2. Set all-zero indicator groups back to NaN (the category was missing).
    3. Index by ``customer_ID``.
    4. Keep only the non-categorical columns, excluding the datetime ``S_2``.
    5. Cast to float and take the per-customer mean of every column.

    Parameters
    ----------
    dfl0 : dask.dataframe.DataFrame
        Raw data. It must contain 'customer_ID', 'S_2' and every column in
        ``CAT_VARS``.
    test_time : bool
        Currently has no effect, because training and test data follow the
        same path. It is reserved for aligning the test-time one-hot columns
        with the training columns.

    Returns
    -------
    dask.dataframe.DataFrame
        Indexed by customer_ID, with one mean column per numeric feature.

    Raises
    ------
    ValueError
        If 'S_2' is not among the columns.
    """
    import dask.dataframe as dd

    # categorize() computes the category sets so get_dummies knows its output columns.
    categorized = dfl0.categorize(columns=CAT_VARS)
    onehot = dd.get_dummies(categorized, drop_first=False, columns=CAT_VARS)
    column_sets = _dummy_column_sets(onehot.columns)

    # TODO: at test time the one-hot columns must be aligned with the training
    # columns, see
    # https://stackoverflow.com/questions/41335718/keep-same-dummy-variable-in-training-and-testing-data
    # The model currently ignores categorical data, so `test_time` changes nothing yet.

    # Assign NaN back to the indicator columns of categorical values that were NaN originally.
    restored = onehot.map_partitions(apply_restore_nan, column_sets=column_sets)

    # Indexing by customer_ID lets the per-customer aggregation below group on the index.
    by_customer = restored.set_index('customer_ID')

    # Drop all categorical (indicator) columns for now. S_2 is a datetime and is
    # also dropped until the model becomes more sophisticated.
    dummy_columns = {col for column_set in column_sets for col in column_set}
    num_columns = [col for col in by_customer.columns if col not in dummy_columns]
    num_columns.remove('S_2')

    numeric = by_customer[num_columns].astype(float)

    # Simple features: the average value of each column for each customer.
    return numeric.groupby("customer_ID").agg({col: 'mean' for col in num_columns})

In [39]:
"""
Wrapper function to train the model. Uses https://xgboost.readthedocs.io/en/stable/tutorials/dask.html
"""


import xgboost as xgb
import dask.array as da
import dask.distributed
import pandas as pd
import dask.dataframe as dd


def train_xgboost(train_data_onehot, train_labels, n_boost_rounds, client, params=None):
    """Train an XGBoost model on the Dask cluster behind ``client``.

    Parameters
    ----------
    train_data_onehot : dask DataFrame or array
        Feature matrix.
    train_labels : dask Series or array
        Targets, partitioned in line with ``train_data_onehot``.
    n_boost_rounds : int
        Number of boosting rounds.
    client : dask.distributed.Client
        Client connected to the scheduler that runs training.
    params : dict, optional
        XGBoost training parameters. Defaults to
        ``{"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"}``.

    Returns
    -------
    dict
        The result of ``xgb.dask.train``. The trained model is under 'booster'
        (used for prediction later in this notebook). Per the xgboost.dask
        docs, the evaluation history is under 'history'.
    """
    if params is None:
        # NOTE(review): squared-error regression is kept as the default for
        # backward compatibility. If the target is binary (presumably),
        # 'binary:logistic' may be a better objective -- confirm.
        params = {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"}

    dtrain = xgb.dask.DaskDMatrix(client, train_data_onehot, train_labels)

    return xgb.dask.train(
        client,
        params,
        dtrain,
        num_boost_round=n_boost_rounds,
        # Evaluate on the training data after each round.
        evals=[(dtrain, "train")],
    )

In [34]:
"""
Loads a 10,000-row subset of the training data for quick testing. To train on the full dataset
instead, comment out the code in this cell and uncomment the 'train_data = dd.read_csv' line in the next cell.
"""

# Read the first 10,000 rows with pandas, then split them into 20 Dask partitions.
train_data = dd.from_pandas(
    pd.read_csv('./train_data.csv', nrows=10000),
    npartitions=20,
)

In [35]:
"""
Processing the data and training the model
"""

import pandas as pd
import dask.dataframe as dd

# Uncomment below if using the full training dataset (and skip the 10,000-row subset
# cell above). Additionally, experiment with increasing blocksize above 25e6.
# train_data = dd.read_csv('./train_data.csv', blocksize=25e6)
train_labels = pd.read_csv('./train_labels.csv', index_col='customer_ID')

train_data_prepared = prepare_data(train_data, False)

# Join features and labels on their shared customer_ID index so that the dask
# partitions of X_train and y_train line up. The index join is spelled out,
# because without `on` a merge would join on any column names the two frames share.
train_data_onehot_and_labels = dd.merge(
    train_data_prepared, train_labels, left_index=True, right_index=True
)
X_train = train_data_onehot_and_labels.drop(columns=['target'])
y_train = train_data_onehot_and_labels['target']

N_BOOST_ROUNDS = 20  # number of XGBoost boosting rounds
output = train_xgboost(X_train, y_train, N_BOOST_ROUNDS, client)


In [36]:
import pickle 

# pickle.dump( output, open( "./Models/model_trained_on_10k_subset.p", "wb" ) )

In [None]:
"""
ToDo: Now we start writing code to load the model trained above, and then ultimately output a submission .csv file
containing our predictions over the test dataset.
"""
import dask.dataframe as dd


import pickle

# NOTE(review): the commented-out pickle.dump in the cell above writes
# model_trained_on_10k_subset.p. Confirm which model file this should load.
model_path = './Models/model1.p'
# SECURITY: pickle.load can execute arbitrary code. Only load model files you created yourself.
with open(model_path, 'rb') as model_file:
    model_output = pickle.load(model_file)

# NOTE(review): this reads the *training* CSV. Switch to the test CSV when
# producing the submission file.
test_data = dd.read_csv('./train_data.csv', blocksize=25e6)
prepared_test_data = prepare_data(test_data, True)

dtest = xgb.dask.DaskDMatrix(client, prepared_test_data)
prediction = xgb.dask.predict(client, model_output['booster'], dtest)