In [1]:
# Copyright 2020 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

In [2]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "6"
from time import time
import re
import glob
import warnings

# tools for data preproc/loading
import torch
import rmm
import nvtabular as nvt
from nvtabular.ops import Normalize,  Categorify,  LogOp, FillMissing, Clip, LambdaClassOp, get_embedding_sizes
from nvtabular.loader.torch import TorchAsyncItr, DLDataLoader
from nvtabular.utils import device_mem_size
import cudf

# tools for training
from fastai.basics import Learner
from fastai.tabular.model import TabularModel
from fastai.tabular.data import TabularDataLoaders
from fastai.metrics import accuracy

In [3]:
# Data locations and loader settings; each path/size can be overridden via an
# environment variable of the same name.
INPUT_DATA_DIR = os.environ.get('INPUT_DATA_DIR', '/raid/criteo/tests/crit_int_pq')
# Destination for the processed (transformed) dataset.
OUTPUT_DATA_DIR = os.environ.get('OUTPUT_DATA_DIR', '/raid/criteo/tests/newmu')
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 32768))
PARTS_PER_CHUNK = int(os.environ.get('PARTS_PER_CHUNK', 10))
SHUFFLE = True
# Days 0 .. NUM_TRAIN_DAYS-1 are meant for training; the remaining days for validation.
NUM_TRAIN_DAYS = 23

# Dataset schema: 13 continuous columns (I1-I13), 26 categorical columns
# (C1-C26) and a single label column.
CONTINUOUS_COLUMNS = [f'I{idx}' for idx in range(1, 14)]
CATEGORICAL_COLUMNS = [f'C{idx}' for idx in range(1, 27)]
LABEL_COLUMNS = ['label']
COLUMNS = [*CONTINUOUS_COLUMNS, *CATEGORICAL_COLUMNS, *LABEL_COLUMNS]

In [4]:
fname = 'day_{}.parquet'
# Count the day files actually present. The dot is escaped and fullmatch is used
# so that only exact names like "day_7.parquet" are counted; the previous
# re.match(...) accepted any name that merely *starts* with such a pattern
# (e.g. "day_7.parquet.crc") and treated "." as a wildcard.
day_file_re = re.compile(r'day_[0-9]{1,2}\.parquet')
num_days = sum(1 for name in os.listdir(INPUT_DATA_DIR) if day_file_re.fullmatch(name))

# NOTE: each split is truncated to its first MAX_DAYS_PER_SPLIT day(s) for a quick
# test run. Set to None to train on all NUM_TRAIN_DAYS days and validate on every
# remaining day (assumes day files are numbered contiguously from 0).
MAX_DAYS_PER_SPLIT = 1
train_paths = [os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(NUM_TRAIN_DAYS)][:MAX_DAYS_PER_SPLIT]
valid_paths = [os.path.join(INPUT_DATA_DIR, fname.format(day)) for day in range(NUM_TRAIN_DAYS, num_days)][:MAX_DAYS_PER_SPLIT]
# Fail early with a readable message instead of deep inside nvt.Dataset.
assert valid_paths, f"no day files beyond day_{NUM_TRAIN_DAYS - 1} found in {INPUT_DATA_DIR} for validation"

In [5]:
class WritePartChunk:
    """
    Debug callback that records the size of one partition before it has been
    collated into a chunk.

    For every frame passed to ``_exec`` whose ``part_idx`` column holds a single
    value, one line ``"<part_idx> <num_rows> \\n"`` is appended to ``path``.
    Frames that mix several partitions (or are empty) are passed through untouched.
    """

    def __init__(self, path="./rando/part_chunk.txt"):
        # Log file location; the default is the previously hard-coded path.
        self.path = path

    def _exec(self, gdf):
        """
        Log ``gdf`` if exactly one partition is represented; always return ``gdf`` unchanged.
        """
        # An empty frame has no partition id to report (and min/max of an empty
        # column is NaN/NA, which must not reach the comparison below).
        if len(gdf) == 0:
            return gdf
        part_ids = gdf["part_idx"]
        if part_ids.min() == part_ids.max():
            # Positional access: after shuffling, the row *labelled* 0 may not exist.
            part_idx = part_ids.iloc[0]
            log_dir = os.path.dirname(self.path)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)
            # Open per call inside a context manager: the previous version reopened
            # the file on every call and only ever closed the last handle.
            with open(self.path, "a+") as log_file:
                log_file.write(f"{part_idx} {gdf.shape[0]} \n")
        return gdf
        
class WritePart:
    """
    Tag every chunk that passes through a workflow with a sequential id.

    Each ``_exec`` call stores the current counter in column ``col_name``,
    registers that column under the ``'label'`` key of
    ``columns_ctx["final"]["ctx"]`` (presumably so the workflow keeps it in its
    output — confirm), and advances the counter. ``in_dir``, ``group``,
    ``export`` and ``extra_cols`` are stored but not used by this class; it
    writes no files.
    """
    def __init__(self, in_dir, col_name="part_idx", group="batch", extra_cols=None, export=True):
        self.count, self.col_name, self.group = 0, col_name, group
        self.in_dir, self.export, self.extra_cols = in_dir, export, extra_cols

    def _exec(self, gdf, columns_ctx):
        gdf[self.col_name] = self.count
        # Create the 'label' list on first use, then register our column once.
        label_cols = columns_ctx["final"]["ctx"].setdefault('label', [])
        if self.col_name not in label_cols:
            label_cols.append(self.col_name)
        self.count += 1
        return gdf

class WriteCollectParts:
    """
    Debug callback that stamps each batch with a running counter and,
    optionally, dumps selected columns of that batch to parquet.

    Parameters
    ----------
    in_dir : str
        Directory receiving ``{group}_{count}.parquet`` files when ``export`` is true
        (created if missing).
    col_name : str
        Name of the counter column added to every frame.
    group : str
        File-name prefix of exported batches.
    export : bool
        Whether to write the parquet file.
    extra_cols : list of str, optional
        Columns exported alongside ``col_name``. The list is copied and never mutated.
    """
    def __init__(self, in_dir, col_name="part_idx", group="batch", export=True, extra_cols=None):
        self.count = 0
        self.col_name = col_name
        self.group = group
        self.in_dir = in_dir
        self.export = export
        # Copy: avoids a shared mutable default and never touches the caller's list.
        self.extra_cols = list(extra_cols) if extra_cols is not None else []

    def _exec(self, gdf):
        """Add the counter column, optionally export it, advance the counter and return ``gdf``."""
        gdf[self.col_name] = self.count
        if self.export:
            # Build a new list: ``list.append`` returns None, so the previous code
            # selected ``gdf[None]`` and also grew ``extra_cols`` on every call.
            cols = self.extra_cols + [self.col_name]
            os.makedirs(self.in_dir, exist_ok=True)
            file_path = os.path.join(self.in_dir, f"{self.group}_{self.count}.parquet")
            gdf[cols].to_parquet(file_path)
        self.count += 1
        return gdf

In [6]:
# `cb` is passed to nvt.Dataset as its (empty) callbacks mapping. `bb` tags each
# chunk with a running "part_origin" id; WritePart itself writes no files.
cb = dict()
bb = WritePart(
    "/raid/criteo/tests/dump",
    col_name="part_origin",
    group="origin",
    export=False,
)

In [7]:
proc = nvt.Workflow(cat_names=CATEGORICAL_COLUMNS, cont_names=CONTINUOUS_COLUMNS, label_name=LABEL_COLUMNS)

# Registered first: tag every chunk with its origin id via the WritePart op `bb`.
proc.add_feature(LambdaClassOp(bb))

# Continuous features: zero-fill missing values -> clip negatives -> log, then
# normalize. Normalizing before the log would not make sense. LogOp computes
# log(1 + x), not log(x), which is why zero-filling beforehand is safe.
cont_feature_ops = [FillMissing(), Clip(min_value=0), LogOp()]
proc.add_cont_feature(cont_feature_ops)
proc.add_cont_preprocess(Normalize())

# Categorical features: categorify with a frequency threshold of 15; the
# category statistics are written under OUTPUT_DATA_DIR.
categorify = Categorify(freq_threshold=15, columns=CATEGORICAL_COLUMNS, out_path=OUTPUT_DATA_DIR)
proc.add_cat_preprocess(categorify)

In [8]:
# Destination folders (with trailing separator) for the transformed train / validation splits.
output_train_dir, output_valid_dir = (os.path.join(OUTPUT_DATA_DIR, split + '/') for split in ('train', 'valid'))

In [9]:
# Both raw splits share the same reader settings (and the same callbacks mapping).
dataset_kwargs = dict(engine='parquet', part_mem_fraction=0.15, callbacks=cb)
train_dataset = nvt.Dataset(train_paths, **dataset_kwargs)
valid_dataset = nvt.Dataset(valid_paths, **dataset_kwargs)

In [12]:
%%time
proc.apply(train_dataset, shuffle=nvt.io.Shuffle.PER_PARTITION, output_path=output_train_dir, out_files_per_proc=40)

CPU times: user 44.1 s, sys: 43.6 s, total: 1min 27s
Wall time: 1min 28s


In [11]:
%%time
proc.apply(valid_dataset, record_stats=False, shuffle=nvt.io.Shuffle.PER_PARTITION, output_path=output_valid_dir, out_files_per_proc=40)

CPU times: user 25.6 s, sys: 31.6 s, total: 57.2 s
Wall time: 58 s


In [None]:
# Same folders as in the preprocessing section, redefined so the loading
# section (presumably) can run on its own.
_split_dirs = {split: os.path.join(OUTPUT_DATA_DIR, f'{split}/') for split in ('train', 'valid')}
output_train_dir = _split_dirs['train']
output_valid_dir = _split_dirs['valid']

In [None]:
# Pool allocator sized to ~30% of the currently free device memory (presumably
# bytes), truncated to a multiple of 256.
free_device_mem = device_mem_size(kind='free')
rmm.reinitialize(pool_allocator=True, initial_pool_size=int(0.3 * free_device_mem / 256) * 256)

In [None]:
# glob returns files in arbitrary, filesystem-dependent order; sort so the
# file/partition order handed to nvt.Dataset is reproducible across runs.
train_paths = sorted(glob.glob(os.path.join(output_train_dir, "*.parquet")))
valid_paths = sorted(glob.glob(os.path.join(output_valid_dir, "*.parquet")))
# Fail fast with a clear message if the preprocessing cells have not been run.
assert train_paths, f"no parquet files found in {output_train_dir}"
assert valid_paths, f"no parquet files found in {output_valid_dir}"

In [None]:
# Per-partition memory fraction is divided by PARTS_PER_CHUNK, since the loader
# is configured below to combine that many partitions per chunk.
loader_part_fraction = 0.04 / PARTS_PER_CHUNK
train_data = nvt.Dataset(train_paths, engine="parquet", part_mem_fraction=loader_part_fraction)
valid_data = nvt.Dataset(valid_paths, engine="parquet", part_mem_fraction=loader_part_fraction)

In [None]:
class WritePartChunk:
    """
    Debug callback that records the size of one partition before it has been
    collated into a chunk.

    For every frame passed to ``_exec`` whose ``part_idx`` column holds a single
    value, one line ``"<part_idx> <num_rows> \\n"`` is appended to ``path``.
    Frames that mix several partitions (or are empty) are passed through untouched.
    """

    def __init__(self, path="./rando/part_chunk.txt"):
        # Log file location; the default is the previously hard-coded path.
        self.path = path

    def _exec(self, gdf):
        """
        Log ``gdf`` if exactly one partition is represented; always return ``gdf`` unchanged.
        """
        # An empty frame has no partition id to report (and min/max of an empty
        # column is NaN/NA, which must not reach the comparison below).
        if len(gdf) == 0:
            return gdf
        part_ids = gdf["part_idx"]
        if part_ids.min() == part_ids.max():
            # Positional access: after shuffling, the row *labelled* 0 may not exist.
            part_idx = part_ids.iloc[0]
            log_dir = os.path.dirname(self.path)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)
            # Open per call inside a context manager: the previous version reopened
            # the file on every call and only ever closed the last handle.
            with open(self.path, "a+") as log_file:
                log_file.write(f"{part_idx} {gdf.shape[0]} \n")
        return gdf
        

# Loader debug callbacks; the same mapping (and the same callback instances)
# is shared by the train and validation iterators below.
callbacks = {
    "PART_CHUNK": [WritePartChunk()],
    "BATCH_GET": [WriteCollectParts("/raid/criteo/tests/dump", extra_cols=["part_origin"])],
}


def _make_itr(dataset):
    """Build a TorchAsyncItr over ``dataset`` using the notebook's shared loader settings."""
    return TorchAsyncItr(
        dataset,
        batch_size=BATCH_SIZE,
        cats=CATEGORICAL_COLUMNS,
        conts=CONTINUOUS_COLUMNS,
        labels=LABEL_COLUMNS,
        parts_per_chunk=PARTS_PER_CHUNK,
        callbacks=callbacks,
        shuffle=SHUFFLE,
    )


train_data_itrs = _make_itr(train_data)
valid_data_itrs = _make_itr(valid_data)

In [None]:
def gen_col(batch):
    """
    Collate function: pass the first two batch elements through unchanged and
    cast the third (presumably the label tensor — confirm against TorchAsyncItr)
    to int64, as CrossEntropyLoss expects integer class targets.

    Returns a 3-tuple; any elements beyond index 2 are dropped.
    """
    first, second = batch[0], batch[1]
    target = batch[2].long()
    return first, second, target

In [None]:
# Wrap the async iterators in DataLoaders. batch_size=None leaves batching to the
# iterators (which were built with batch_size=BATCH_SIZE); gen_col fixes label dtype.
loader_kwargs = dict(collate_fn=gen_col, batch_size=None, pin_memory=False, num_workers=0)
train_dataloader = DLDataLoader(train_data_itrs, **loader_kwargs)
valid_dataloader = DLDataLoader(valid_data_itrs, **loader_kwargs)
databunch = TabularDataLoaders(train_dataloader, valid_dataloader)

In [None]:
# (cardinality, embedding dim) for each of the 26 categorical features, in the
# order the model receives them (presumably C1..C26 — confirm). These are
# hard-coded rather than derived from the Categorify statistics, so they must be
# updated by hand if the preprocessing changes. The dims look consistent with
# min(16, round(1.6 * n**0.56)) — verify before relying on that.
embeddings = [
    (7599500, 16), (5345303, 16), (561810, 16), (242827, 16),
    (11, 6), (2209, 16), (10616, 16), (100, 16),
    (4, 3), (968, 16), (15, 7), (33521, 16),
    (7838519, 16), (2580502, 16), (6878028, 16), (298771, 16),
    (11951, 16), (97, 16), (35, 12), (17022, 16),
    (7339, 16), (20046, 16), (4, 3), (7068, 16),
    (1377, 16), (63, 16),
]

In [None]:
# MLP with hidden layers of 512 and 256 units and 2 output logits; the binary
# label is treated as a 2-class problem (hence CrossEntropyLoss + long targets).
model = TabularModel(
    emb_szs=embeddings,
    n_cont=len(CONTINUOUS_COLUMNS),
    out_sz=2,
    layers=[512, 256],
).cuda()
loss_fn = torch.nn.CrossEntropyLoss()
learn = Learner(databunch, model, loss_func=loss_fn, metrics=[accuracy])

In [None]:
from fastai.callback.schedule import fit_one_cycle

In [None]:
learning_rate, epochs = 1.32e-2, 1

# Time a one-cycle training run (wall clock) and print the elapsed seconds.
start = time()
fit_one_cycle(learn, n_epoch=epochs, lr_max=learning_rate)
t_final = time() - start
print(t_final)