In [1]:
from global_funcs import load_program_config
# Standard Libraries
import shutil
import pathlib

# External Dependencies
import cupy as cp
import numpy as np
import cudf
import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask.utils import parse_bytes
from dask.delayed import delayed
import rmm

# NVTabular
import nvtabular as nvt
import nvtabular.ops as ops
from nvtabular.io import Shuffle
from nvtabular.utils import device_mem_size


In [2]:
# Load the project configuration; later cells index it like a mapping
# (e.g. configs['output_dir']).
print("loading configs...")
configs = load_program_config()

loading configs...


In [3]:
print("setting variables...")

# Where outputs go and what the project is called
output_dir = configs['output_dir']
project_name = configs['project_name']

# Column names used by the NVTabular workflow
input_col_name = configs['input_col_name']
label_col_name = configs['label_col_name']
base_col_names = configs['base_col_names']

# Experiment settings
# NOTE(review): random_seed, data_splits and base_col_names are not used by
# any cell in this notebook; in particular no seed is ever applied, so the
# shuffled training output may not be reproducible — confirm intent.
random_seed = configs['random_seed']
data_splits = configs['data_splits']

setting variables...


In [4]:
# Prefix of the input parquet splits; cells below read "<prefix>_train",
# "<prefix>_val" and "<prefix>_test".
base_in_pathname = "{}/{}/data/{}".format(output_dir, project_name, project_name)
# Root directory for everything this notebook writes (Dask scratch/stats,
# the saved workflow and the transformed splits).
base_out_pathname = "{}/{}/nvtab".format(output_dir, project_name)

In [5]:
# Column -> numpy dtype mapping for the input and label columns.
# NOTE(review): dict_dtypes is never passed to a reader in this notebook
# (every nvt.Dataset is built without dtypes=...) — confirm whether it is needed.
dict_dtypes = {
    input_col_name: np.int64,
    label_col_name: np.float32,
}

In [6]:
# Scratch directory for the Dask workers and a stats directory, both under
# the NVTabular output root.
# NOTE(review): stats_path is created here but not referenced by any later cell.
dask_workdir = pathlib.Path(base_out_pathname, "dask", "workdir")
stats_path = pathlib.Path(base_out_pathname, "dask", "stats")

# Start from empty directories so nothing from a previous run is reused.
# Re-running this cell while the cluster from the next cell is alive deletes
# the workers' local directory underneath them — restart the cluster too.
for _fresh_dir in (dask_workdir, stats_path):
    if _fresh_dir.is_dir():
        shutil.rmtree(_fresh_dir)
    _fresh_dir.mkdir(parents=True)

# Total device memory as reported by NVTabular (presumably bytes, for the
# current GPU); used to size the spill threshold of the cluster.
capacity = device_mem_size(kind="total")

In [7]:

# Deploy a single-machine (multi-)GPU Dask cluster.
protocol = "tcp"  # Transport between workers: "tcp" or "ucx"
visible_devices = "0"  # Select the GPU ids to place workers on
# Spill GPU-worker memory to host once it exceeds this fraction of total
# device memory. Reduce it if spilling fails, to prevent device memory errors.
device_spill_frac = 0.5
# (Optional) set to an existing scheduler address to connect to it instead
# of starting a local cluster.
cluster = None

if cluster is None:
    cluster = LocalCUDACluster(
        CUDA_VISIBLE_DEVICES=visible_devices,
        protocol=protocol,
        device_memory_limit=capacity * device_spill_frac,
        local_directory=dask_workdir,
    )

# Connect a distributed client; leaving it as the last expression renders
# the cluster summary.
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:42453  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 1  Cores: 1  Memory: 31.21 GiB


In [8]:
def _rmm_pool():
    """Re-initialise RMM in the calling process with a pooled allocator.

    Meant to be shipped to every Dask worker via ``client.run``. Passing
    ``initial_pool_size=None`` leaves the initial pool size to RMM's default.
    """
    rmm.reinitialize(initial_pool_size=None, pool_allocator=True)


# client.run executes the function once on each worker (not in this kernel).
client.run(_rmm_pool)

{'tcp://127.0.0.1:36905': None}

In [9]:
# Encode the input column as integer category ids, then keep the first 150
# entries of each row's list, padding shorter rows with 0.
# NOTE(review): ListSlice implies input_col_name is a list column, and
# pad_value=0 presumably coincides with the id Categorify reserves for
# padding/nulls in this NVTabular version — confirm.
cat_features = (
    [input_col_name]
    >> nvt.ops.Categorify()
    >> ops.ListSlice(start=0, end=150, pad=True, pad_value=0)
)
# The label column is passed through untransformed (no Categorify).
output = cat_features + label_col_name


In [10]:
# Display the NVTabular version this notebook ran with; operator behaviour
# (e.g. Categorify ids, ListSlice padding) may differ across versions.
nvt.__version__

'1.1.1'

In [11]:
# Build the (not yet fitted) workflow from the operator graph in `output`.
workflow = nvt.Workflow(output)

In [12]:
# [input_col_name]  >> (lambda col: nvt.ops.Categorify(dtype=np.int64))

In [13]:
# Shuffle strategy for the training output only (rows shuffled within each
# partition); the val/test splits are written with shuffle=None.
shuffle = Shuffle.PER_PARTITION

In [14]:


# all_data = dask_cudf.read_parquet(base_in_pathname)
# all_iter = nvt.Dataset(all_data, part_size="100MB", engine='parquet')
# workflow.fit(all_iter)

In [15]:
# cat_features = [input_col_name] >> nvt.ops.Categorify() >>  ops.ListSlice(start=0, end=150, pad=True, pad_value=0)
# output = cat_features + label_col_name
# workflow = nvt.Workflow(output)
# Fit the workflow statistics (e.g. Categorify vocabularies) on the training
# split only; val/test are later transformed with these same statistics.
workflow.fit(
    nvt.Dataset("{}_train".format(base_in_pathname), engine="parquet")
)
# workflow.transform(nvt.Dataset(f"{base_in_pathname}_train", engine='parquet')).to_parquet(output_path=f"{base_out_pathname}/train")
# del workflow

<nvtabular.workflow.workflow.Workflow at 0x7f8d818ff090>

In [16]:
# cat_features = [input_col_name] >> nvt.ops.Categorify() >>  ops.ListSlice(start=0, end=150, pad=True, pad_value=0)
# output = cat_features + label_col_name
# workflow = nvt.Workflow(output)
# workflow.fit(nvt.Dataset(f"{base_in_pathname}_val", engine='parquet'))
# # workflow.transform(nvt.Dataset(f"{base_in_pathname}_val", engine='parquet')).to_parquet(output_path=f"{base_out_pathname}/val")
# # del workflow

In [17]:
# cat_features = [input_col_name] >> nvt.ops.Categorify() >>  ops.ListSlice(start=0, end=150, pad=True, pad_value=0)
# output = cat_features + label_col_name
# workflow = nvt.Workflow(output)
# workflow.fit(nvt.Dataset(f"{base_in_pathname}_test", engine='parquet'))
# # workflow.transform(nvt.Dataset(f"{base_in_pathname}_test", engine='parquet')).to_parquet(output_path=f"{base_out_pathname}/test")
# # del workflow

In [18]:
# workflow.fit(merlin.io.Dataset(TRAIN_PATH))

In [19]:
# # workflow = nvt.Workflow(output)

# train_data = dask_cudf.read_parquet(f"{base_in_pathname}_train")
# train_iter = nvt.Dataset(train_data, part_size="100MB", engine='parquet')
# # workflow.fit(train_iter)

# workflow.transform(train_iter).to_parquet(
#     output_path=f"{base_out_pathname}/train",
#     shuffle=shuffle,
#     # out_files_per_proc=out_files_per_proc,
#     cats=[input_col_name],
#     labels=[label_col_name],
# )

In [20]:
# # workflow = nvt.Workflow(output)

# val_data = dask_cudf.read_parquet(f"{base_in_pathname}_val")
# valid_iter = nvt.Dataset(val_data, part_size="100MB", engine='parquet')
# # workflow.fit(valid_iter)

# workflow.transform(valid_iter).to_parquet(
#     output_path=f"{base_out_pathname}/val",
#     # shuffle=shuffle,
#     # out_files_per_proc=out_files_per_proc,
#     cats=[input_col_name],
#     labels=[label_col_name],
# )

In [21]:
# # workflow = nvt.Workflow(output)

# test_data = dask_cudf.read_parquet(f"{base_in_pathname}_test")
# test_iter = nvt.Dataset(test_data, part_size="100MB", engine='parquet')
# # workflow.fit(test_iter)

# workflow.transform(test_iter).to_parquet(
#     output_path=f"{base_out_pathname}/test",
#     # shuffle=shuffle,
#     # out_files_per_proc=out_files_per_proc,
#     cats=[input_col_name],
#     labels=[label_col_name],
# )

In [22]:
# all_data = dask_cudf.read_parquet(base_in_pathname)
# all_iter = nvt.Dataset(all_data, part_size="100MB", engine='parquet')
# workflow.fit(all_iter)

In [23]:
# train_data = dask_cudf.read_parquet(f"{base_in_pathname}_train")
# train_iter = nvt.Dataset(train_data, part_size="100MB", engine='parquet')
# val_data = dask_cudf.read_parquet(f"{base_in_pathname}_val")
# valid_iter = nvt.Dataset(val_data, part_size="100MB", engine='parquet')
# test_data = dask_cudf.read_parquet(f"{base_in_pathname}_test")
# test_iter = nvt.Dataset(test_data, part_size="100MB", engine='parquet')

In [24]:
# shuffle = Shuffle.PER_WORKER

In [25]:
# out_files_per_proc = 4

In [26]:
# Persist the fitted workflow so the identical transformation can be
# reloaded later (e.g. with nvt.Workflow.load) without re-fitting.
workflow.save("{}/workflow".format(base_out_pathname))

In [27]:
# Transform the training split and write it as parquet, shuffled with the
# strategy held in `shuffle`. cats/labels record the column roles in the
# written dataset metadata.
workflow.transform(
    nvt.Dataset("{}_train".format(base_in_pathname), engine="parquet")
).to_parquet(
    output_path="{}/train".format(base_out_pathname),
    cats=[input_col_name],
    labels=[label_col_name],
    shuffle=shuffle,
)

In [28]:
# Transform the validation split with the train-fitted workflow. No shuffling
# is requested (shuffle=None, out_files_per_proc=None), presumably so the
# output keeps the input partition layout — verify for this NVTabular version.
workflow.transform(
    nvt.Dataset("{}_val".format(base_in_pathname), engine="parquet")
).to_parquet(
    output_path="{}/val".format(base_out_pathname),
    cats=[input_col_name],
    labels=[label_col_name],
    shuffle=None,
    out_files_per_proc=None,
)

In [29]:
# Transform the test split with the train-fitted workflow. No shuffling is
# requested (shuffle=None, out_files_per_proc=None), presumably so the output
# keeps the input partition layout — verify for this NVTabular version.
workflow.transform(
    nvt.Dataset("{}_test".format(base_in_pathname), engine="parquet")
).to_parquet(
    output_path="{}/test".format(base_out_pathname),
    cats=[input_col_name],
    labels=[label_col_name],
    shuffle=None,
    out_files_per_proc=None,
)

In [30]:
# Tear down Dask: disconnect this client first, then stop the cluster.
# Client.shutdown() would also terminate the scheduler and workers, which is
# wrong when `cluster` holds the address of a pre-existing (shared) scheduler,
# and a plain address string has no .close(); so only close what can be closed.
client.close()
if hasattr(cluster, "close"):
    cluster.close()

