# Feature Engineering with cudf

In [None]:
from feature_engineering import feature_engineering
import cudf as dd

In [None]:
import gc

In [None]:
# Read every raw input table from Parquet with cudf
# (`dd` is the alias this notebook gives to cudf).
bureau_balance = dd.read_parquet('raw_data/bureau_balance.parquet')
bureau = dd.read_parquet('raw_data/bureau.parquet')
cc_balance = dd.read_parquet('raw_data/cc_balance.parquet')
payments = dd.read_parquet('raw_data/payments.parquet')
pc_balance = dd.read_parquet('raw_data/pc_balance.parquet')
prev = dd.read_parquet('raw_data/prev.parquet')
# train carries the TARGET column that is split off further down.
train = dd.read_parquet('raw_data/train.parquet')
test = dd.read_parquet('raw_data/test.parquet')

In [None]:
# Display the shape of the training table.
train.shape

In [None]:
# Display the shape of the test table.
test.shape

In [None]:
# Keep the labels aside, then stack train (without its label column) on top
# of test so both splits go through feature engineering together.
train_target = train['TARGET']
unified = dd.concat([train.drop(columns=['TARGET']), test])

In [None]:
# Display the shape of the combined train + test table.
unified.shape

In [None]:
# Drop the standalone train/test names now that `unified` has been built,
# then run a garbage-collection pass.
del train, test
gc.collect()

In [None]:
# Run the imported feature_engineering function over all raw tables plus the
# combined train/test frame.
unified_feat = feature_engineering(
    bureau_balance,
    bureau,
    cc_balance,
    payments,
    pc_balance,
    prev,
    unified,
)

In [None]:
# Release intermediate tables if they exist in the notebook namespace.
# Nothing above this cell binds these names (feature_engineering only
# returns `unified_feat`), so a plain `del` raises NameError on a clean
# top-to-bottom run; pop() with a default makes the cleanup a no-op then.
for _name in ('avg_bureau', 'sum_cc_balance', 'sum_payments',
              'sum_pc_balance', 'sum_prev'):
    globals().pop(_name, None)
gc.collect()

In [None]:
# `unified` was built as train rows followed by test rows, so the first
# len(train_target) rows are the training part. The split point is derived
# from the labels instead of a hard-coded row count, so a different training
# set does not silently mis-split.
# NOTE(review): assumes feature_engineering keeps the row order of `unified`
# -- confirm.
n_train = len(train_target)
# Re-attach the labels by index.
train_feats = unified_feat.iloc[:n_train].merge(train_target, how='left',
                                                left_index=True, right_index=True)

In [None]:
# Every row after the training rows belongs to the test split; the boundary
# is the number of training labels rather than a hard-coded row count.
# NOTE(review): assumes feature_engineering keeps the row order of `unified`
# -- confirm.
test_feats = unified_feat.iloc[len(train_target):]

In [None]:
# Sanity check: number of non-null CODE_GENDER entries per TARGET value.
train_feats[['TARGET', 'CODE_GENDER']].groupby('TARGET').count()

In [None]:
# Persist the engineered train/test feature tables as Parquet.
train_feats.to_parquet('data_eng/feats/train_feats.parquet')
test_feats.to_parquet('data_eng/feats/test_feats.parquet')

# Testing with cu-dask

In [1]:
# cudf load
from dask_cuda import LocalCUDACluster

# Two workers with six threads each. The UCX / memory options below are
# currently disabled:
#   protocol="ucx", enable_tcp_over_ucx=True, enable_nvlink=True,
#   rmm_pool_size="7GB", device_memory_limit="30GB"
cluster = LocalCUDACluster(n_workers=2, threads_per_worker=6)

NOTE: enter proxy/8787/status to get the Dask dashboard - issue with kernels / conda / Jupyter

In [2]:
from dask.distributed import Client

# Attach a client to the local CUDA cluster created above; the trailing
# expression renders the client summary in the notebook.
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:35911  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 12  Memory: 125.63 GiB


In [None]:
# Open question from the author: is extra teardown needed when the cluster
# uses UCX?
# Close this client.
client.close()

In [None]:
# Shut down the scheduler and workers this client is connected to.
client.shutdown()

In [3]:
import cudf as dd
import dask_cudf as dc
import gc

In [4]:
### Load Data
# Auxiliary tables are read through dask_cudf (`dc`).
bureau_balance = dc.read_parquet('raw_data/bureau_balance.parquet')
bureau = dc.read_parquet('raw_data/bureau.parquet')
cc_balance = dc.read_parquet('raw_data/cc_balance.parquet')
payments = dc.read_parquet('raw_data/payments.parquet')
pc_balance = dc.read_parquet('raw_data/pc_balance.parquet')
prev = dc.read_parquet('raw_data/prev.parquet')

# train/test are read with plain cudf (`dd`) first and converted afterwards.
train = dd.read_parquet('raw_data/train.parquet')
test = dd.read_parquet('raw_data/test.parquet')

# Remember which index labels belong to each split so the engineered
# features can be separated again later.
train_index = train.index.to_arrow().tolist()
test_index = test.index.to_arrow().tolist()

# Labels and the combined feature frame, each spread over two partitions.
train_target = dc.from_cudf(train['TARGET'], npartitions=2)
unified = dd.concat([train.drop(columns=['TARGET']), test])
unified_dask = dc.from_cudf(unified, npartitions=2)

# Freeing the cudf originals is currently disabled:
#del(train)
#del(test)
#del(unified)
#gc.collect()


DEBUG SUM PAYMENTS

In [None]:
# Aggregation names passed to the groupby `.agg(...)` in the payments debug cell.
agg_func = ['mean', 'max', 'min', 'sum', 'std']

In [None]:
# Debug version of the payments aggregation: derive per-instalment features,
# then aggregate every numeric column per SK_ID_CURR with each function in
# agg_func.

# Missing payment dates / amounts are replaced with 0 before deriving features.
for col in ('DAYS_ENTRY_PAYMENT', 'AMT_PAYMENT'):
    payments[col] = payments[col].fillna(0)

## Build Payments
sum_payments = payments.drop('SK_ID_PREV', axis=1)
sum_payments['PAYMENT_PERC'] = sum_payments['AMT_PAYMENT'] / sum_payments['AMT_INSTALMENT']
sum_payments['PAYMENT_PERC'] = sum_payments['PAYMENT_PERC'].fillna(0)
sum_payments['PAYMENT_DIFF'] = sum_payments['AMT_INSTALMENT'] - sum_payments['AMT_PAYMENT']
# DPD = entry date - instalment date; DBD is the same difference negated.
sum_payments['DPD'] = sum_payments['DAYS_ENTRY_PAYMENT'] - sum_payments['DAYS_INSTALMENT']
sum_payments['DBD'] = sum_payments['DAYS_INSTALMENT'] - sum_payments['DAYS_ENTRY_PAYMENT']

# turn negatives into 0
for col in ('DPD', 'DBD'):
    sum_payments[col] = sum_payments[col].map(lambda x: x if x > 0 else 0)

sum_payments = (
    sum_payments.select_dtypes('number')
    .groupby('SK_ID_CURR')
    .agg(agg_func)
)
# Flatten the (column, aggregation) pairs into "<column>_<aggregation>" names.
# Iterating the column index directly yields the same tuples as the former
# Index.ravel() call, which is deprecated/redundant in pandas.
sum_payments.columns = ["_".join(parts) for parts in sum_payments.columns]

# Replace missing std values (e.g. groups with a single row) with 0.
for base in ('NUM_INSTALMENT_VERSION', 'NUM_INSTALMENT_NUMBER',
             'DAYS_INSTALMENT', 'DAYS_ENTRY_PAYMENT', 'AMT_INSTALMENT',
             'AMT_PAYMENT', 'PAYMENT_PERC', 'PAYMENT_DIFF', 'DPD', 'DBD'):
    std_col = base + '_std'
    sum_payments[std_col] = sum_payments[std_col].fillna(0)

In [None]:
# Debug: write the aggregated payments table to the scratch path 'test'.
sum_payments.to_parquet('test')

### Debug

In [5]:
from feature_engineering_adv import *

In [6]:
# Aggregation names passed to the processing helpers below.
agg_func = ['mean', 'max', 'min', 'sum', 'std']
print("procecssing bureau balance")
# The result is handed to process_bureau in a later cell.
avg_bbalance = process_bureau_balance(bureau_balance, agg_func)

procecssing bureau balance


In [None]:
#avg_bbalance.columns

In [None]:
print("procecssing bureau")
## Build Avg Bureau table
# process_bureau receives the raw bureau table and the bureau-balance
# summary (avg_bbalance).
avg_bureau = process_bureau(bureau, avg_bbalance)

In [None]:
#avg_bureau.columns

In [None]:
print("procecssing cc balance")

# Run the credit-card balance helper with the shared aggregation list.
sum_cc_balance = process_cc_balance(cc_balance, agg_func)

In [None]:
print("procecssing payments")

# Run the payments helper with the shared aggregation list.
sum_payments = process_payments(payments, agg_func)

In [None]:
# Show the first 10 rows of the payments summary.
sum_payments.head(10)

In [7]:
print("process unified dataset")

# NOTE: this rebinds `unified` (previously the concatenated cudf frame) to
# the output of process_unified applied to the dask_cudf version.
unified = process_unified(unified_dask)

process unified dataset


In [None]:
# Left-join each summary table onto the unified frame, matching on index.
feats = unified.merge(avg_bureau, how='left', left_index=True, right_index=True)
feats = feats.merge(sum_cc_balance, how='left', left_index=True, right_index=True)
feats = feats.merge(sum_payments, how='left', left_index=True, right_index=True)

In [8]:
# Debug: cast categorical columns to object, then write to the scratch path
# 'test'.
tt = unified  # same object as `unified`, not a copy: the casts apply to both names
categorical_cols = tt.select_dtypes('category').columns.tolist()
for name in categorical_cols:
    tt[name] = tt[name].astype('object')

tt.to_parquet('test')

In [None]:
# Remove the temporary name and run a garbage-collection pass.
del tt
gc.collect()

In [None]:
print("procecssing cc balance")

# Same call as the earlier cc-balance cell, re-run here while debugging.
sum_cc_balance = process_cc_balance(cc_balance, agg_func)


In [None]:
# Missing payment dates / amounts are replaced with 0 before deriving features.
for col in ('DAYS_ENTRY_PAYMENT', 'AMT_PAYMENT'):
    payments[col] = payments[col].fillna(0)

## Build Payments
sum_payments = payments.drop('SK_ID_PREV', axis=1)
sum_payments['PAYMENT_PERC'] = sum_payments['AMT_PAYMENT'] / sum_payments['AMT_INSTALMENT']
sum_payments['PAYMENT_PERC'] = sum_payments['PAYMENT_PERC'].fillna(0)
sum_payments['PAYMENT_DIFF'] = sum_payments['AMT_INSTALMENT'] - sum_payments['AMT_PAYMENT']
# DPD = entry date - instalment date; DBD is the same difference negated.
sum_payments['DPD'] = sum_payments['DAYS_ENTRY_PAYMENT'] - sum_payments['DAYS_INSTALMENT']
sum_payments['DBD'] = sum_payments['DAYS_INSTALMENT'] - sum_payments['DAYS_ENTRY_PAYMENT']

# Negative differences are replaced by 0.
for col in ('DPD', 'DBD'):
    sum_payments[col] = sum_payments[col].map(lambda x: x if x > 0 else 0)

In [None]:
# Aggregate every numeric column per SK_ID_CURR with each function in agg_func.
sum_payments = (
    sum_payments.select_dtypes('number')
    .groupby('SK_ID_CURR')
    .agg(agg_func)
)

In [None]:
# Flatten the (column, aggregation) pairs into "<column>_<aggregation>" names.
# Iterating the column index directly yields the same tuples as the former
# Index.ravel() call, which is deprecated/redundant in pandas.
sum_payments.columns = ["_".join(parts) for parts in sum_payments.columns]

In [None]:
# Materialise the aggregated payments table.
# NOTE(review): this rebinds `test`, which earlier held the test-set
# dataframe -- confirm that is intended.
test = sum_payments.compute() #.head(10)

In [None]:
print("procecssing payments")

# Same call as the earlier payments cell, re-run here while debugging.
sum_payments = process_payments(payments, agg_func)

### End Break

In [None]:
# Unbind the current feature_engineering name; the following cell imports it
# again from feature_engineering_adv.
del feature_engineering

In [None]:
from feature_engineering_adv import feature_engineering

In [None]:
# Run the feature_engineering_adv pipeline on the dask_cudf inputs; the
# dask_cudf module itself (`dc`) is passed as an argument and checks are
# switched off.
unified_feat = feature_engineering(
    bureau_balance,
    bureau,
    cc_balance,
    payments,
    pc_balance,
    prev,
    unified_dask,
    dc,
    checks=False,
)

In [None]:
# Debug: write the dask_cudf unified frame to the scratch path 'test'.
unified_dask.to_parquet('test')

In [None]:
# Split the engineered features back into train and test rows using the
# index labels captured when the data was loaded. Re-attaching TARGET via a
# merge is currently commented out.
train_feats = unified_feat.loc[train_index] #.merge(train_target, how='left', left_index=True, right_index=True)
test_feats = unified_feat.loc[test_index]

In [None]:
# Display the engineered feature frame (the head(10) preview is disabled).
unified_feat #.head(10)

In [None]:
from dask import dataframe as daskd

In [None]:

# Optional dumps of the intermediate tables (currently disabled):
#avg_bureau.to_parquet(path='data_eng/avg_bureau')
#sum_cc_balance.to_parquet(path='data_eng/sum_cc_balance')
#sum_payments.to_parquet(path='data_eng/sum_payments')
#sum_pc_balance.to_parquet(path='data_eng/sum_pc_balance')
#sum_prev.to_parquet(path='data_eng/sum_prev')

# Persist the final train/test feature tables as Parquet.
train_feats.to_parquet('data_eng/feats/train_feats.parquet')
test_feats.to_parquet('data_eng/feats/test_feats.parquet')


In [None]:
# Shut down the scheduler and workers this client is connected to.
client.shutdown()