In [1]:
import os
# Restrict this kernel to a single physical GPU. The variable is exported
# here, in the first cell, ahead of the cell that imports the GPU libraries.
GPU_id = 2
os.environ.update(CUDA_VISIBLE_DEVICES=str(GPU_id))

In [21]:
import warnings
warnings.filterwarnings("ignore")
import cudf
import pandas as pd
import time
import nvstrings
import rmm
import ds_itr.ds_iterator as di
import sys
try:
    import cupy as cp
except ImportError:
    import numpy as cp

In [3]:
# Display the installed cuDF version so the recorded outputs can be tied to a specific build.
cudf.__version__

'0.11.0a+1941.g3efd22b.dirty'

In [4]:
!pwd

/home/nfs/rzamora/workspace/rapids-dl/ds-itr/examples


### Functions

In [5]:
def on_gpu(words,func,arg=None,dtype=cp.int32):
    """Call a method of ``words`` that writes its result into a device buffer.

    A device array with ``words.size()`` elements is allocated through
    ``rmm.device_array`` and its raw device pointer is handed to
    ``words.<func>`` as the last positional argument.

    Parameters
    ----------
    words : object
        Must provide ``size()`` and a method named ``func``
        (presumably an nvstrings object -- confirm against callers).
    func : str
        Name of the method to invoke on ``words``.
    arg : object, optional
        Extra value passed ahead of the output pointer. When ``None``
        (the default) only the pointer is passed.
    dtype : data-type, optional
        Element type of the output buffer; defaults to ``cp.int32``.

    Returns
    -------
    The array created by ``rmm.device_array``; the invoked method is
    expected to have filled it.

    Raises
    ------
    AttributeError
        If ``words`` has no attribute named ``func``.
    """
    res = rmm.device_array(words.size(), dtype=dtype)
    # Resolve the method by name rather than eval()-ing a formatted source
    # string: same dispatch, but `func` can only ever name an attribute and
    # can never smuggle in an arbitrary expression.
    method = getattr(words, func)
    out_ptr = res.device_ctypes_pointer.value
    if arg is None:
        method(out_ptr)
    else:
        method(arg, out_ptr)
    return res

In [6]:
def randomize_split(gdf, num_splits=5, save_path=''):
    """Write ``gdf`` to ``save_path`` as up to ``num_splits`` contiguous row batches.

    Each batch is a consecutive slice of the frame and is written with its
    own ``to_parquet(save_path)`` call. Batch sizes differ by at most one
    row, and empty batches are skipped, so a frame with fewer rows than
    ``num_splits`` is still written in full (one row per batch) instead of
    being silently dropped.

    NOTE(review): despite the name, nothing in this function shuffles rows
    explicitly; ``gdf.reindex()`` is called without arguments.

    Parameters
    ----------
    gdf : DataFrame-like
        Must support ``len()``, ``reindex()``, slice indexing and
        ``to_parquet``.
    num_splits : int, optional
        Maximum number of batches to write (default 5).
    save_path : str, optional
        Destination passed unchanged to every ``to_parquet`` call.

    Raises
    ------
    ValueError
        If ``num_splits`` is smaller than 1.
    """
    if num_splits < 1:
        raise ValueError('num_splits must be at least 1, got %r' % (num_splits,))
    n_rows = len(gdf)
    if n_rows == 0:
        # nothing to write
        return
    gdf = gdf.reindex()
    # Evenly spaced slice boundaries: exactly num_splits intervals covering
    # [0, n_rows), some of them empty when n_rows < num_splits.
    edges = [n_rows * k // num_splits for k in range(num_splits + 1)]
    for start, stop in zip(edges[:-1], edges[1:]):
        if stop > start:
            gdf[start:stop].to_parquet(save_path)

In [23]:
def merging(gdf1, gdf2_location, on, how='inner', file=False, gpu_memory_frac=0.5, gdf1_pri=False, clean_up=True):
    """Merge ``gdf1`` with a parquet dataset that is streamed in chunks.

    The parquet input is read through ``di.GPUDatasetIterator``; every chunk
    it yields is merged with ``gdf1`` and the per-chunk results are
    concatenated into one frame.

    Parameters
    ----------
    gdf1 : DataFrame
        Frame merged against every chunk.
    gdf2_location : str
        Path of a single parquet file when ``file`` is true; otherwise a
        directory whose entries ending in ``.parquet`` are used.
    on : str or list
        Forwarded to ``merge`` as ``on``.
    how : str, optional
        Forwarded to ``merge`` as ``how`` (default ``'inner'``).
    file : bool, optional
        Selects how ``gdf2_location`` is interpreted (see above).
    gpu_memory_frac : float, optional
        Forwarded to ``di.GPUDatasetIterator``.
    gdf1_pri : bool, optional
        When true ``gdf1`` is the calling (left) side of each merge;
        otherwise the chunk is.
    clean_up : bool, optional
        When true (the default!) every input parquet file is deleted with
        ``os.remove`` after merging.

    Returns
    -------
    DataFrame
        Concatenation of the non-empty per-chunk merge results. If every
        result is empty the last empty one is returned; if the iterator
        yields nothing an empty ``cudf.DataFrame`` is returned.
    """
    if file:
        r_fi = [gdf2_location]
    else:
        # os.path.join tolerates a directory given without a trailing
        # separator; sorting makes the chunk order deterministic.
        r_fi = sorted(
            os.path.join(gdf2_location, name)
            for name in os.listdir(gdf2_location)
            if name.endswith(".parquet")
        )
    itr = di.GPUDatasetIterator(r_fi, gpu_memory_frac=gpu_memory_frac, use_row_groups=True)
    merged_parts = []
    empty_part = None
    for item in itr:
        if gdf1_pri:
            part = gdf1.merge(item, on=on, how=how)
        else:
            part = item.merge(gdf1, on=on, how=how)
        del item
        # Test emptiness with len(): the truth value of a DataFrame is not a
        # reliable (or, in pandas-like libraries, even permitted) check.
        if len(part) > 0:
            merged_parts.append(part)
        else:
            empty_part = part
        del part
    del itr
    if clean_up:
        for fi in r_fi:
            os.remove(fi)
    if not merged_parts:
        return empty_part if empty_part is not None else cudf.DataFrame()
    if len(merged_parts) == 1:
        return merged_parts[0]
    # One concat at the end instead of re-copying the accumulated frame on
    # every iteration.
    return cudf.concat(merged_parts, axis=0)

### Read data

In [8]:
# Make sure the 'cache' output directory exists. exist_ok=True keeps the cell
# re-runnable and avoids the check-then-create race of exists() + mkdir().
os.makedirs('cache', exist_ok=True)

In [9]:
# Directory holding train.csv, test.csv and submission_popular.csv, which the next cell reads.
# NOTE(review): absolute, machine-specific location -- adjust for your environment.
path = '/datasets/trivago/data/'

### Read the CSV files with cuDF

In [10]:
%%time
# Load the train and test CSVs with cuDF's reader.
train  = cudf.read_csv('%s/train.csv'%path)
test = cudf.read_csv('%s/test.csv'%path)

print(train.shape, test.shape)
# Stack both frames into one so the following cells process them together,
# then drop the two separate frames.
data_gd = cudf.concat([train, test])
del train, test
# Third CSV from the same directory; a later cell checks its row count against data_gd.
submission_gd = cudf.read_csv('%s/submission_popular.csv'%path)

(15932992, 12) (3782335, 12)
CPU times: user 3.06 s, sys: 1.96 s, total: 5.02 s
Wall time: 5.6 s


In [11]:
%%time
# Compare every action_type string with 'clickout item' through on_gpu, which
# allocates an int32 result buffer sized to the column and passes it to `compare`.
data_gd['is_click_out'] = on_gpu(data_gd['action_type'].data,'compare',arg='clickout item')
data_gd['is_click_out'] = data_gd['is_click_out']==0 # 0 means string match
data_gd['is_click_out'] = data_gd['is_click_out'].astype('bool')
# Keep only the rows flagged as clickouts.
data_gd = data_gd[data_gd['is_click_out']]

# The helper column is not needed once the filter has been applied.
data_gd.drop_column('is_click_out')
print("# of clickouts:",data_gd.shape[0])
# Rows with a null reference are flagged as clickout_missing; the prints below call them "true test".
data_gd['clickout_missing'] = data_gd['reference'].isnull()

print('true test',data_gd[data_gd['clickout_missing']].shape)
# Sanity check: the number of flagged rows must equal the number of rows in submission_gd.
assert submission_gd.shape[0] == data_gd[data_gd['clickout_missing']].shape[0]
print('true test shape match submission shape')

# of clickouts: 2115365
true test (253573, 13)
true test shape match submission shape
CPU times: user 1.87 s, sys: 1.58 s, total: 3.45 s
Wall time: 4.06 s


In [12]:
%%time
# Number the remaining rows 0..n-1; the column is written out with the rest of
# the frame by the parquet export below.
data_gd['row_id'] = cp.arange(data_gd.shape[0])

CPU times: user 120 ms, sys: 8 ms, total: 128 ms
Wall time: 149 ms


### Exporting (over the 10 GB GPU memory limit)

In [14]:
%%time
# Write the filtered frame to the 'data_gd/' directory, then drop the in-memory copy;
# a later cell lists that directory for *.parquet files and streams them back in chunks.
# NOTE(review): if this call adds files to 'data_gd/' rather than replacing its contents,
# re-running the cell would leave stale parts for that listing to pick up -- confirm, or
# clear the directory first.
data_gd.to_parquet('data_gd/', chunk_size=100000)
del data_gd

CPU times: user 4.24 s, sys: 688 ms, total: 4.93 s
Wall time: 9.1 s


In [19]:
%%time
import os
# Collect every *.parquet entry found in 'data_gd/' and stream the files back
# through a GPUDatasetIterator capped at 1% of GPU memory per chunk request.
r_fi = [
    os.path.join('data_gd/', fname)
    for fname in os.listdir('data_gd/')
    if fname.endswith(".parquet")
]
data_itr = di.GPUDatasetIterator(
    r_fi,
    gpu_memory_frac=0.01,
    use_row_groups=True,
)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 592 µs


In [24]:
%%time
num_ones = 0  # never read in this cell; kept only in case a later cell refers to it
# For every chunk streamed from data_itr: explode the '|'-separated impressions
# and prices into one row per (clickout row, candidate item), join those rows
# back onto the chunk, label each pair, and write the train / valid / test
# pairs under cache/.
for data_gd in data_itr:
    # row_id is intentionally NOT regenerated here. It was numbered over the
    # whole frame before the parquet export; renumbering from 0 inside every
    # chunk would make the ids repeat across chunks in the files written below.
    # (Assumes the iterator returns the stored row_id column, as it does
    # clickout_missing, which is read further down.)
    #----------
    # pull apart impressions and prices
    #----------
    candidates_gd = data_gd['impressions'].data.split('|')
    prices_gd = data_gd['prices'].data.split('|')
    data_gd.drop_column('impressions')
    data_gd.drop_column('prices')
    data_gd_rec_list = data_gd[['row_id']].to_pandas()
    #----------
    # map prices to items
    #----------
    # one host-side column per candidate position: item_<i> / price_<i>
    for i in range(len(candidates_gd)):
        data_gd_rec_list['item_%d'%i] = candidates_gd[i].to_host()
        data_gd_rec_list['price_%d'%i] = prices_gd[i].to_host()
    del prices_gd, candidates_gd
    data_gd_rec_list = data_gd_rec_list.set_index('row_id')
    #----------
    # pull item specific information
    #----------
    # wide -> long: one row per (row_id, item_<i>) pair
    cols = [i for i in data_gd_rec_list.columns if i.startswith('item_')]
    items = data_gd_rec_list[cols].stack().reset_index()
    items.columns = ['row_id','candidate_order','item_id']
    #----------
    # get pricing info for items
    #----------
    cols = [i for i in data_gd_rec_list.columns if i.startswith('price_')]
    prices = data_gd_rec_list[cols].stack().reset_index()
    del data_gd_rec_list
    #----------
    # combine items and prices dataframes
    #----------
    prices.columns = ['row_id','candidate_order','price']
    # relies on items and prices sharing the same index after the two stack() calls
    items['price'] = prices['price'].astype(int)
    del prices
    # 'item_<i>' -> i
    items['candidate_order'] = items['candidate_order'].apply(lambda x:x.split('_')[1]).astype(int)
    # keep only rows whose row_id has more than one candidate
    count = items['row_id'].value_counts()
    items['row_id_count'] = items['row_id'].map(count)
    items = items[items['row_id_count']>1]
    #----------
    # export items dataframe to parquet
    #----------
    print('items export')
    items.to_parquet('items.parquet')
    del items
    data_gd['clickout_missing'] = data_gd['clickout_missing'].astype(int)
    print('merging with items...')
    #----------
    # merge main dataframe chunk with items
    #----------
    data_gd = merging(data_gd, 'items.parquet', 'row_id', how='left', file=True, gpu_memory_frac=0.10, clean_up=False)
    print('merging with items binary meta...')
    data_gd['item_id'] = data_gd['item_id'].astype(int)
    #----------
    # no further merge is performed here; data_pair_gd is only a second name
    # for the merged chunk, so `del data_pair_gd` below removes the name and
    # data_gd keeps the frame alive
    #----------
    data_pair_gd = data_gd
    print('done with concatenation')
    # label every (clickout, candidate) pair: target is 1 when the candidate
    # is the referenced item (item_id was already cast to int above)
    data_pair_gd['reference'] = data_pair_gd['reference'].astype(int)
    data_pair_gd['target'] = data_pair_gd['reference'] == data_pair_gd['item_id']
    data_pair_gd['target'] = data_pair_gd['target'].astype(int)
    print('splitting')

    train_pair_gd = data_pair_gd[data_pair_gd.clickout_missing==0]
    test_pair_gd = data_pair_gd[data_pair_gd.clickout_missing>0]
    del data_pair_gd
    # every row whose row_id is a multiple of 5 goes to validation
    train_pair_gd['is_va'] = train_pair_gd.row_id%5 == 0
    train_pair = train_pair_gd[train_pair_gd.is_va==0]
    valid_pair = train_pair_gd[train_pair_gd.is_va>0]
    del train_pair_gd
    train_pair = train_pair.drop(['is_va'])
    valid_pair = valid_pair.drop(['is_va'])
    print('exporting')
    print(train_pair.shape, valid_pair.shape, test_pair_gd.shape)
    # write each non-empty split through randomize_split
    if len(valid_pair)>0:
        randomize_split(valid_pair, save_path='cache/valid/')
        print('valid')
        sys.stdout.flush()
    if len(test_pair_gd)>0:
        print('test')
        sys.stdout.flush()
        randomize_split(test_pair_gd, save_path='cache/test/')
    if len(train_pair)>0:
        randomize_split(train_pair, save_path='cache/train/')
        print('train')
        sys.stdout.flush()
    del valid_pair, train_pair, test_pair_gd

items export
merging with items...
merging with items binary meta...
done with concatenation
splitting
exporting
(3672953, 17) (917655, 17) (0, 17)
valid
train
items export
merging with items...
merging with items binary meta...
done with concatenation
splitting
exporting
(3664951, 17) (916957, 17) (0, 17)
valid
train
items export
merging with items...
merging with items binary meta...
done with concatenation
splitting
exporting
(3669690, 17) (917480, 17) (0, 17)
valid
train
items export
merging with items...
merging with items binary meta...
done with concatenation
splitting
exporting
(3663781, 17) (915556, 17) (0, 17)
valid
train
items export
merging with items...
merging with items binary meta...
done with concatenation
splitting
exporting
(3670316, 17) (917173, 17) (0, 17)
valid
train
items export
merging with items...
merging with items binary meta...
done with concatenation
splitting
exporting
(3665316, 17) (915441, 17) (0, 17)
valid
train
items export
merging with items...
mergi