In [None]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# 1. Overview

In this notebook, we provide a tutorial on how to train a Wide & Deep Learning (WDL) model using the HugeCTR high-level Python API. We use the original Criteo dataset as training data.

1. Overview
2. Dataset Preprocessing
3. WDL Model Training
4. Save the Model Files

# 2. Dataset Preprocessing
## 2.1 Generate training and validation data folders

In [1]:
# define some data folder to store the original and preprocessed data
# Standard Libraries
import os
from time import time
import re
import shutil
import glob
import warnings
# Root folder that holds the raw text files and every preprocessed artifact.
BASE_DIR = "/wdl_train"
train_path, val_path = (os.path.join(BASE_DIR, split) for split in ("train", "val"))

# One worker per visible GPU (falls back to GPU 0 when the variable is unset).
CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
n_workers = CUDA_VISIBLE_DEVICES.count(",") + 1

frac_size = 0.15
allow_multi_gpu = False
use_rmm_pool = False
max_day = None  # (Optional) -- Limit the dataset to day 0-max_day for debugging

# Start from empty train/val folders: anything left over from a previous run is
# deleted before the folder is re-created.
for data_dir in (train_path, val_path):
    if os.path.isdir(data_dir):
        shutil.rmtree(data_dir)
    os.makedirs(data_dir)

In [2]:
ls -l $train_path

total 0


## 2.2 Download the Original Criteo Dataset

In [2]:
!apt-get install wget

Reading package lists... Done
Building dependency tree       
Reading state information... Done
wget is already the newest version (1.20.3-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 4 not upgraded.


In [None]:
!wget -P $train_path https://storage.googleapis.com/criteo-cail-datasets/day_0.gz

In [3]:
# Split the downloaded dataset into training and validation sets
#!gzip -d -c $train_path/day_0.gz > day_0
!head -n 45840617 day_0 > $train_path/train.txt
!tail -n 2000000 day_0 > $val_path/test.txt 

## 2.3 Preprocessing by NVTabular

In [45]:
%%writefile /wdl_train/preprocess.py
import os
import sys
import argparse
import glob
import time
from cudf.io.parquet import ParquetWriter
import numpy as np
import pandas as pd
import concurrent.futures as cf
from concurrent.futures import as_completed
import shutil

import dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask.utils import parse_bytes
from dask.delayed import delayed

import cudf
import rmm
import nvtabular as nvt
from nvtabular.io import Shuffle
from nvtabular.utils import device_mem_size
from nvtabular.ops import Categorify, Clip, FillMissing, HashBucket, LambdaOp, Normalize, Rename, Operator, get_embedding_sizes
#%load_ext memory_profiler

import logging
# Timestamped log lines; the root logger lets every level through, while the
# chatty numba/asyncio loggers are limited to warnings and above.
logging.basicConfig(format='%(asctime)s %(message)s')
logging.getLogger().setLevel(logging.NOTSET)
for noisy_logger in ('numba', 'asyncio'):
    logging.getLogger(noisy_logger).setLevel(logging.WARNING)

# Dataset schema: one label, 13 integer (dense) features and 26 categorical features.
NUM_INTEGER_COLUMNS = 13
NUM_CATEGORICAL_COLUMNS = 26
NUM_TOTAL_COLUMNS = 1 + NUM_INTEGER_COLUMNS + NUM_CATEGORICAL_COLUMNS

CATEGORICAL_COLUMNS = [f"C{idx}" for idx in range(1, NUM_CATEGORICAL_COLUMNS + 1)]
CONTINUOUS_COLUMNS = [f"I{idx}" for idx in range(1, NUM_INTEGER_COLUMNS + 1)]
LABEL_COLUMNS = ['label']

# Full column order of the raw tab-separated files: label, dense, categorical.
COLUMNS = [*LABEL_COLUMNS, *CONTINUOUS_COLUMNS, *CATEGORICAL_COLUMNS]
# /samples/criteo mode doesn't have dense features
criteo_COLUMN = [*LABEL_COLUMNS, *CATEGORICAL_COLUMNS]
# Names of the feature-cross columns; filled in at run time by process_NVT.
CROSS_COLUMNS = []


# Initialize RMM pool on ALL workers
def setup_rmm_pool(client, pool_size):
    """Re-initialise RMM with a pooled allocator on every worker of ``client``.

    Args:
        client: Dask distributed client whose ``run`` method executes a
            function on the workers.
        pool_size: Initial pool size handed to ``rmm.reinitialize``.

    Returns:
        None.
    """
    rmm_options = {"pool_allocator": True, "initial_pool_size": pool_size}
    client.run(rmm.reinitialize, **rmm_options)

#compute the partition size with GB
def bytesto(bytes, to, bsize=1024):
    """Convert a byte count into a larger unit.

    Args:
        bytes: Number of bytes; anything ``float()`` accepts.
        to: Target unit, one of ``'k'``, ``'m'``, ``'g'``, ``'t'``, ``'p'``, ``'e'``.
        bsize: Size of one unit step (1024 by default).

    Returns:
        The size expressed in the requested unit, as a float.

    Raises:
        KeyError: If ``to`` is not one of the supported unit letters.
    """
    exponents = {'k': 1, 'm': 2, 'g': 3, 't': 4, 'p': 5, 'e': 6}
    # The float conversion used to be computed and then thrown away; apply it so
    # the coercion actually takes part in the result.
    return float(bytes) / (bsize ** exponents[to])

class FeatureCross(Operator):
    """Operator that combines every selected column with one fixed partner column.

    For each input column the output holds ``gdf[col] + gdf[dependency]``,
    stored under the input column's name.
    """

    def __init__(self, dependency):
        # Name of the partner column added to each selected column.
        self.dependency = dependency

    def dependencies(self):
        """Return the extra column(s) this operator needs, as a one-element list."""
        return [self.dependency]

    def transform(self, columns, gdf):
        """Build a new frame of the same type as ``gdf`` holding the combined columns."""
        crossed = type(gdf)()
        for name in columns.names:
            crossed[name] = gdf[name] + gdf[self.dependency]
        return crossed

#process the data with NVTabular
def process_NVT(args):
    """Preprocess the Criteo train/validation text files with NVTabular.

    The function
      1. registers the requested feature-cross columns in ``CROSS_COLUMNS``,
      2. prepares empty temporary parquet folders under ``args.out_path``,
      3. starts a local Dask-CUDA cluster and client,
      4. converts the text files to parquet, fits the NVTabular workflow on the
         training data and writes the transformed train/validation sets,
      5. shuts the client and the cluster down (also when a step fails) and
         prints a run summary.

    Args:
        args: Namespace produced by ``parse_args``.
    """
    global LABEL_COLUMNS

    # Register the crossed column names once, even if this function is called again.
    feature_pairs = _parse_feature_pairs(args.feature_cross_list)
    for pair in feature_pairs:
        cross_name = pair[0] + '_' + pair[1]
        if cross_name not in CROSS_COLUMNS:
            CROSS_COLUMNS.append(cross_name)

    logging.info('NVTabular processing')
    train_output = os.path.join(args.out_path, "train")
    print("Training output data: "+train_output)
    val_output = os.path.join(args.out_path, "val")
    print("Validation output data: "+val_output)
    train_input = os.path.join(args.data_path, "train/train.txt")
    print("Training dataset: "+train_input)
    val_input = os.path.join(args.data_path, "val/test.txt")
    PREPROCESS_DIR_temp_train = os.path.join(args.out_path, 'train/temp-parquet-after-conversion')
    PREPROCESS_DIR_temp_val = os.path.join(args.out_path, "val/temp-parquet-after-conversion")

    # Make sure we have a clean parquet space for cudf conversion
    for one_path in (PREPROCESS_DIR_temp_train, PREPROCESS_DIR_temp_val):
        if os.path.exists(one_path):
            shutil.rmtree(one_path)
        os.makedirs(one_path)

    ## Get Dask Client

    # Deploy a Single-Machine Multi-GPU Cluster
    device_size = device_mem_size(kind="total")
    cluster_kwargs = dict(
        protocol=args.protocol,
        CUDA_VISIBLE_DEVICES=args.devices,
        n_workers=len(args.devices.split(",")),
        device_memory_limit=int(device_size * args.device_limit_frac),
        dashboard_address=":" + args.dashboard_port,
    )
    if args.protocol == "ucx":
        # Keep a user-provided UCX_TLS, otherwise fall back to the default transports.
        os.environ["UCX_TLS"] = os.environ.get("UCX_TLS", "tcp,cuda_copy,cuda_ipc,sockcm")
        cluster_kwargs["enable_nvlink"] = True
    cluster = LocalCUDACluster(**cluster_kwargs)

    try:
        # Create the distributed client
        client = Client(cluster)
        try:
            if args.device_pool_frac > 0.01:
                setup_rmm_pool(client, int(args.device_pool_frac*device_size))

            #calculate the total processing time
            runtime = time.time()

            #test dataset without the label feature
            if args.dataset_type == 'test':
                LABEL_COLUMNS = []

            _run_nvt_workflow(
                args,
                client,
                device_size,
                feature_pairs,
                train_input,
                val_input,
                PREPROCESS_DIR_temp_train,
                PREPROCESS_DIR_temp_val,
                train_output,
                val_output,
            )
        finally:
            ## Shutdown clusters
            client.close()
    finally:
        # The cluster owns the worker processes; close it so they do not linger.
        cluster.close()
    logging.info('NVTabular processing done')

    runtime = time.time() - runtime

    print("\nDask-NVTabular Criteo Preprocessing")
    print("--------------------------------------")
    print(f"data_path          | {args.data_path}")
    print(f"output_path        | {args.out_path}")
    print(f"partition size     | {'%.2f GB'%bytesto(int(args.part_mem_frac * device_size),'g')}")
    print(f"protocol           | {args.protocol}")
    print(f"device(s)          | {args.devices}")
    print(f"rmm-pool-frac      | {(args.device_pool_frac)}")
    print(f"out-files-per-proc | {args.out_files_per_proc}")
    print(f"num_io_threads     | {args.num_io_threads}")
    print(f"shuffle            | {args.shuffle}")
    print("======================================")
    print(f"Runtime[s]         | {runtime}")
    print("======================================\n")


def _parse_feature_pairs(feature_cross_list):
    """Split ``"C1_C2,C3_C4"`` into ``[["C1", "C2"], ["C3", "C4"]]``; blanks are ignored."""
    if not feature_cross_list:
        return []
    return [
        pair.strip().split("_")
        for pair in feature_cross_list.split(",")
        if pair.strip()
    ]


def _run_nvt_workflow(args, client, device_size, feature_pairs,
                      train_input, val_input, temp_train, temp_val,
                      train_output, val_output):
    """Convert text to parquet, fit the NVTabular workflow and write train/val outputs."""
    ##-----------------------------------##
    # Dask rapids converts txt to parquet
    # Dask cudf dataframe = ddf

    ## train/valid txt to parquet
    for txt_path, temp_output in ((train_input, temp_train), (val_input, temp_val)):

        ddf = dask_cudf.read_csv(txt_path,sep='\t',names=LABEL_COLUMNS + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS)

        ## Convert label col to FP32
        if args.parquet_format and args.dataset_type == 'train':
            ddf["label"] = ddf['label'].astype('float32')

        # Save it as parquet format for better memory usage
        ddf.to_parquet(temp_output,header=True)
        ##-----------------------------------##

    train_paths = glob.glob(os.path.join(temp_train, "*.parquet"))
    valid_paths = glob.glob(os.path.join(temp_val, "*.parquet"))

    categorify_op = Categorify(freq_threshold=args.freq_limit)
    cat_features = CATEGORICAL_COLUMNS >> categorify_op
    cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize()
    cross_cat_op = Categorify(freq_threshold=args.freq_limit)

    # NOTE(review): `features` starts as the LABEL_COLUMNS list itself; the `+=`
    # below is expected to produce a new NVTabular node rather than extend that
    # list in place -- confirm if the NVTabular version changes.
    features = LABEL_COLUMNS

    if args.criteo_mode == 0:
        features += cont_features
        for pair in feature_pairs:
            col0 = pair[0]
            col1 = pair[1]
            features += col0 >> FeatureCross(col1)  >> Rename(postfix="_"+col1) >> cross_cat_op

    features += cat_features

    workflow = nvt.Workflow(features, client=client)

    logging.info("Preprocessing")

    output_format = 'hugectr'
    if args.parquet_format:
        output_format = 'parquet'

    # just for /samples/criteo model
    train_ds_iterator = nvt.Dataset(train_paths, engine='parquet', part_size=int(args.part_mem_frac * device_size))
    valid_ds_iterator = nvt.Dataset(valid_paths, engine='parquet', part_size=int(args.part_mem_frac * device_size))

    # "NONE" (or any other value) leaves shuffling disabled.
    shuffle = None
    if args.shuffle == "PER_WORKER":
        shuffle = nvt.io.Shuffle.PER_WORKER
    elif args.shuffle == "PER_PARTITION":
        shuffle = nvt.io.Shuffle.PER_PARTITION

    logging.info('Train Datasets Preprocessing.....')

    # Output dtypes used when writing parquet.
    dict_dtypes = {}
    for col in CATEGORICAL_COLUMNS:
        dict_dtypes[col] = np.int64
    if not args.criteo_mode:
        for col in CONTINUOUS_COLUMNS:
            dict_dtypes[col] = np.float32
    for col in CROSS_COLUMNS:
        dict_dtypes[col] = np.int64
    for col in LABEL_COLUMNS:
        dict_dtypes[col] = np.float32

    conts = CONTINUOUS_COLUMNS if not args.criteo_mode else []

    # Statistics are fitted on the training data only and reused for validation.
    workflow.fit(train_ds_iterator)

    _write_dataset(args, workflow.transform(train_ds_iterator), train_output,
                   output_format, conts, dict_dtypes, shuffle)

    ###Getting slot size###
    _print_slot_sizes(categorify_op, cross_cat_op)

    logging.info('Valid Datasets Preprocessing.....')

    _write_dataset(args, workflow.transform(valid_ds_iterator), val_output,
                   output_format, conts, dict_dtypes, shuffle)

    _print_slot_sizes(categorify_op, cross_cat_op)


def _write_dataset(args, transformed, output_path, output_format, conts, dict_dtypes, shuffle):
    """Write a transformed dataset in HugeCTR or parquet format."""
    common = dict(
        cats=CATEGORICAL_COLUMNS + CROSS_COLUMNS,
        conts=conts,
        labels=LABEL_COLUMNS,
        output_path=output_path,
        shuffle=shuffle,
        out_files_per_proc=args.out_files_per_proc,
        num_threads=args.num_io_threads,
    )
    if output_format == 'hugectr':
        transformed.to_hugectr(**common)
    else:
        transformed.to_parquet(dtypes=dict_dtypes, **common)


def _print_slot_sizes(categorify_op, cross_cat_op):
    """Print the per-column sizes: categorical columns first, then cross columns."""
    embeddings_dict_cat = categorify_op.get_embedding_sizes(CATEGORICAL_COLUMNS)
    embeddings_dict_cross = cross_cat_op.get_embedding_sizes(CROSS_COLUMNS)
    embeddings = [embeddings_dict_cat[c][0] for c in CATEGORICAL_COLUMNS] + [embeddings_dict_cross[c][0] for c in CROSS_COLUMNS]

    print(embeddings)


def parse_args():
    """Parse the command-line options of the preprocessing script.

    Reads ``sys.argv``. ``--data_path`` and ``--out_path`` are mandatory; every
    other option has a default.

    Returns:
        argparse.Namespace with all parsed options plus ``n_workers``, the
        number of comma-separated entries in ``--devices``.
    """
    parser = argparse.ArgumentParser(description=("Multi-GPU Criteo Preprocessing"))

    #
    # System Options
    #

    # Both paths are documented as required, so let argparse enforce it instead
    # of failing later with a TypeError when the paths are joined.
    parser.add_argument("--data_path", type=str, required=True, help="Input dataset path (Required)")
    parser.add_argument("--out_path", type=str, required=True, help="Directory path to write output (Required)")
    parser.add_argument(
        "-d",
        "--devices",
        default=os.environ.get("CUDA_VISIBLE_DEVICES", "0"),
        type=str,
        help='Comma-separated list of visible devices (e.g. "0,1,2,3"). '
    )
    parser.add_argument(
        "-p",
        "--protocol",
        choices=["tcp", "ucx"],
        default="tcp",
        type=str,
        help="Communication protocol to use (Default 'tcp')",
    )
    parser.add_argument(
        "--device_limit_frac",
        default=0.5,
        type=float,
        help="Worker device-memory limit as a fraction of GPU capacity (Default 0.5). "
    )
    parser.add_argument(
        "--device_pool_frac",
        default=0.9,
        type=float,
        help="RMM pool size for each worker as a fraction of GPU capacity (Default 0.9). "
        "The RMM pool frac is the same for all GPUs, make sure each one has enough memory size",
    )
    parser.add_argument(
        "--num_io_threads",
        default=0,
        type=int,
        help="Number of threads to use when writing output data (Default 0). "
        "If 0 is specified, multi-threading will not be used for IO.",
    )

    #
    # Data-Decomposition Parameters
    #

    parser.add_argument(
        "--part_mem_frac",
        default=0.125,
        type=float,
        help="Maximum size desired for dataset partitions as a fraction "
        "of GPU capacity (Default 0.125)",
    )
    parser.add_argument(
        "--out_files_per_proc",
        default=1,
        type=int,
        help="Number of output files to write on each worker (Default 1)",
    )

    #
    # Preprocessing Options
    #

    parser.add_argument(
        "-f",
        "--freq_limit",
        default=0,
        type=int,
        help="Frequency limit for categorical encoding (Default 0)",
    )
    parser.add_argument(
        "-s",
        "--shuffle",
        choices=["PER_WORKER", "PER_PARTITION", "NONE"],
        default="PER_PARTITION",
        help="Shuffle algorithm to use when writing output data to disk (Default PER_PARTITION)",
    )

    parser.add_argument(
        "--feature_cross_list", default=None, type=str, help="List of feature crossing cols (e.g. C1_C2,C3_C4)"
    )

    #
    # Diagnostics Options
    #

    parser.add_argument(
        "--profile",
        metavar="PATH",
        default=None,
        type=str,
        help="Specify a file path to export a Dask profile report (E.g. dask-report.html). "
        "If this option is excluded from the command, no profile will be exported",
    )
    parser.add_argument(
        "--dashboard_port",
        default="8787",
        type=str,
        help="Specify the desired port of Dask's diagnostics-dashboard (Default `8787`). "
        "The dashboard will be hosted at http://<IP>:<PORT>/status",
    )

    #
    # Format
    #

    parser.add_argument('--criteo_mode', type=int, default=0)
    parser.add_argument('--parquet_format', type=int, default=1)
    parser.add_argument('--dataset_type', type=str, default='train')

    args = parser.parse_args()
    # One Dask worker per listed device.
    args.n_workers = len(args.devices.split(","))
    return args
# Script entry point: parse the command-line options, then run the whole
# NVTabular preprocessing pipeline with them.
if __name__ == '__main__':

    args = parse_args()

    process_NVT(args)

Overwriting /wdl_train/preprocess.py


In [42]:
import pandas as pd

In [46]:
!python3 /wdl_train/preprocess.py --data_path /wdl_train/ --out_path /wdl_train/ --freq_limit 6 --feature_cross_list C1_C2,C3_C4 --device_pool_frac 0.5  --devices "0" --num_io_threads 2

2021-11-29 05:26:15,779 NVTabular processing
Training output data: /wdl_train/train
Validation output data: /wdl_train/val
Training dataset: /wdl_train/train/train.txt
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2021-11-29 05:26:33,989 Preprocessing
2021-11-29 05:26:34,194 Train Datasets Preprocessing.....
[249058, 19561, 14212, 6890, 18592, 4, 6356, 1254, 52, 226170, 80508, 72308, 11, 2169, 7597, 61, 4, 923, 15, 249619, 168974, 243480, 68212, 9169, 75, 34, 278018, 415262]
2021-11-29 05:27:42,216 Valid Datasets Preprocessing.....
[249058, 19561, 14212, 6890, 18592, 4, 6356, 1254, 52, 226170, 80508, 72308, 11, 2169, 7597, 61, 4, 923, 15, 249619, 168974, 243480, 68212, 9169, 75, 34, 278018, 415262]
2021-11-29 05:27:44,134 NVTabular processing done

Dask-NVTabular Criteo Preprocessing
--------------------------------------
data_path          | /wdl_train/
output_path        | /wdl_train/
partition size     | 1.97 GB
protocol           | tcp
device(s)       

### 2.4 Check the preprocessed training data

In [47]:
!ls -ll /wdl_train/train

total 14542626
-rw-r--r-- 1 root root          34 Nov 29 05:27 _file_list.txt
-rw-r--r-- 1 root root     8554464 Nov 29 05:27 _hugectr.keyset
-rw-r--r-- 1 root root      471772 Nov 29 05:27 _metadata
-rw-r--r-- 1 root root        1510 Nov 29 05:27 _metadata.json
-rw-r--r-- 1 root root  3332773998 Nov 29 05:27 part_0.parquet
-rw-r--r-- 1 root root       21459 Nov 29 05:27 schema.pbtxt
drwxr-xr-x 2 root root        4096 Nov 29 05:26 temp-parquet-after-conversion
-rw-r--r-- 1 root root 11549710546 Nov 29 03:50 train.txt


In [1]:
import pandas as pd
df = pd.read_parquet("/wdl_train/train/part_0.parquet")
df.head(2)

Unnamed: 0,I1,I2,I3,I4,I5,I6,I7,I8,I9,I10,...,C18,C19,C20,C21,C22,C23,C24,C25,C26,label
0,-0.031058,0.350474,0.26416,-0.091675,0.023207,0.068484,-0.064249,-0.224423,-0.760031,1.386036,...,1,2,1,1,1,1,25,1,2,0.0
1,-0.059432,-0.217361,-0.594327,-0.157301,-0.147269,-0.206385,-0.064249,-0.279201,-0.760031,-0.470383,...,2,2,1,1,1,1,210,1,2,0.0


## 3. WDL Model Training

In [1]:
%%writefile './model.py'
# Wide & Deep (WDL) training script using the HugeCTR Python API.
# It builds the model graph layer by layer, trains it, saves a snapshot and
# exports the graph configuration to wdl.json.
import hugectr
# NOTE(review): MPI is not referenced below; presumably the import is needed
# for its initialisation side effect -- confirm before removing.
from mpi4py import MPI
# Solver: training/evaluation batch sizes, learning rate and device layout.
# vvgpu = [[2]] selects a single GPU with index 2; adjust it to a GPU that
# exists on the machine running this script.
solver = hugectr.CreateSolver(max_eval_batches = 4000,
                              batchsize_eval = 2720,
                              batchsize = 2720,
                              lr = 0.001,
                              vvgpu = [[2]],
                              repeat_dataset = True,
                              i64_input_key = True)

# Data reader: parquet files listed in the _file_list.txt files (paths are
# relative to the working directory). slot_size_array holds one size per slot:
# the two feature-cross slots first, then the 26 categorical slots, using the
# sizes printed by the preprocessing step.
reader = hugectr.DataReaderParams(data_reader_type = hugectr.DataReaderType_t.Parquet,
                                  source = ["./train/_file_list.txt"],
                                  eval_source = "./val/_file_list.txt",
                                  check_type = hugectr.Check_t.Non,
                                  slot_size_array = [278018, 415262,249058, 19561, 14212, 6890, 18592, 4, 6356, 1254, 52, 226170, 80508, 72308, 11, 2169, 7597, 61, 4, 923, 15, 249619, 168974, 243480, 68212, 9169, 75, 34])
# Adam optimizer, shared by the dense layers and both sparse embeddings.
optimizer = hugectr.CreateOptimizer(optimizer_type = hugectr.Optimizer_t.Adam,
                                    update_type = hugectr.Update_t.Global,
                                    beta1 = 0.9,
                                    beta2 = 0.999,
                                    epsilon = 0.0000001)
model = hugectr.Model(solver, reader, optimizer)

# Input layer: 1 label, 13 dense features and two sparse inputs --
# "wide_data" with 2 slots and "deep_data" with 26 slots.
# NOTE(review): the positional arguments of DataReaderSparseParam are presumably
# (name, nnz per slot, is-fixed-length flag, slot count) -- confirm against the
# HugeCTR API reference.
model.add(hugectr.Input(label_dim = 1, label_name = "label",
                        dense_dim = 13, dense_name = "dense",
                        data_reader_sparse_param_array = 
                        [hugectr.DataReaderSparseParam("wide_data", 1, True, 2),
                        hugectr.DataReaderSparseParam("deep_data", 2, False, 26)]))

# Wide branch embedding: one value per key (embedding_vec_size = 1).
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash, 
                            workspace_size_per_gpu_in_mb = 8,
                            embedding_vec_size = 1,
                            combiner = "sum",
                            sparse_embedding_name = "sparse_embedding2",
                            bottom_name = "wide_data",
                            optimizer = optimizer))
# Deep branch embedding: 16-dimensional vectors per key.
model.add(hugectr.SparseEmbedding(embedding_type = hugectr.Embedding_t.DistributedSlotSparseEmbeddingHash, 
                            workspace_size_per_gpu_in_mb = 135,
                            embedding_vec_size = 16,
                            combiner = "sum",
                            sparse_embedding_name = "sparse_embedding1",
                            bottom_name = "deep_data",
                            optimizer = optimizer))

# Flatten the deep embeddings: 26 slots x 16 dims = 416 values per sample.
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Reshape,
                            bottom_names = ["sparse_embedding1"],
                            top_names = ["reshape1"],
                            leading_dim=416))
# Flatten the wide embeddings: 2 slots x 1 dim = 2 values per sample.
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Reshape,
                            bottom_names = ["sparse_embedding2"],
                            top_names = ["reshape2"],
                            leading_dim=2))
# Reduce the wide values along axis 1 to form the wide branch output.
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReduceSum,
                            bottom_names = ["reshape2"],
                            top_names = ["wide_redn"],
                            axis = 1))
# Deep branch input: flattened embeddings concatenated with the dense features.
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Concat,
                            bottom_names = ["reshape1", "dense"],
                            top_names = ["concat1"]))
# Deep branch MLP: two blocks of InnerProduct(1024) -> ReLU -> Dropout(0.5),
# followed by a final InnerProduct with a single output.
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["concat1"],
                            top_names = ["fc1"],
                            num_output=1024))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc1"],
                            top_names = ["relu1"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Dropout,
                            bottom_names = ["relu1"],
                            top_names = ["dropout1"],
                            dropout_rate=0.5))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["dropout1"],
                            top_names = ["fc2"],
                            num_output=1024))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.ReLU,
                            bottom_names = ["fc2"],
                            top_names = ["relu2"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Dropout,
                            bottom_names = ["relu2"],
                            top_names = ["dropout2"],
                            dropout_rate=0.5))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.InnerProduct,
                            bottom_names = ["dropout2"],
                            top_names = ["fc3"],
                            num_output=1))
# Combine the deep output (fc3) with the wide output and apply the loss.
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.Add,
                            bottom_names = ["fc3", "wide_redn"],
                            top_names = ["add1"]))
model.add(hugectr.DenseLayer(layer_type = hugectr.Layer_t.BinaryCrossEntropyLoss,
                            bottom_names = ["add1", "label"],
                            top_names = ["loss"]))
model.compile()
model.summary()
# Train for 21000 iterations: log every 1000, evaluate every 4000 and write a
# snapshot with the "wdl" prefix at iteration 20000.
model.fit(max_iter = 21000, display = 1000, eval_interval = 4000, snapshot = 20000, snapshot_prefix = "wdl")
# Export the graph configuration that the inference script loads.
model.graph_to_json(graph_config_file = "wdl.json")

Overwriting ./model.py


In [2]:
!python ./model.py

HugeCTR Version: 3.2
[HUGECTR][10:15:54][INFO][RANK0]: Global seed is 1845651137
[HUGECTR][10:15:54][INFO][RANK0]: Device to NUMA mapping:
  GPU 2 ->  node 0

[HUGECTR][10:15:56][INFO][RANK0]: Start all2all warmup
[HUGECTR][10:15:56][INFO][RANK0]: End all2all warmup
[HUGECTR][10:15:56][INFO][RANK0]: Using All-reduce algorithm: NCCL
[HUGECTR][10:15:56][INFO][RANK0]: Device 2: Tesla V100-SXM2-16GB
[HUGECTR][10:15:56][INFO][RANK0]: num of DataReader workers: 1
[HUGECTR][10:15:56][INFO][RANK0]: Vocabulary size: 2138588
[HUGECTR][10:15:56][INFO][RANK0]: max_vocabulary_size_per_gpu_=2097152
[HUGECTR][10:15:56][INFO][RANK0]: max_vocabulary_size_per_gpu_=2211840
[HUGECTR][10:15:56][INFO][RANK0]: Graph analysis to resolve tensor dependency
[HUGECTR][10:16:05][INFO][RANK0]: gpu0 start to init embedding
[HUGECTR][10:16:05][INFO][RANK0]: gpu0 init embedding done
[HUGECTR][10:16:05][INFO][RANK0]: gpu0 start to init embedding
[HUGECTR][10:16:05][INFO][RANK0]: gpu0 init embedding done
[HUGECTR][10:16

[HUGECTR][10:16:48][INFO][RANK0]: Iter: 8000 Time(1000 iters): 4.850043s Loss: 0.137412 lr:0.001000
[HUGECTR][10:16:53][INFO][RANK0]: Evaluation, AUC: 0.764927
[HUGECTR][10:16:53][INFO][RANK0]: Eval Time for 4000 iters: 4.632716s
[HUGECTR][10:16:57][INFO][RANK0]: Iter: 9000 Time(1000 iters): 9.446955s Loss: 0.119597 lr:0.001000
[HUGECTR][10:17:02][INFO][RANK0]: Iter: 10000 Time(1000 iters): 4.802527s Loss: 0.125044 lr:0.001000
[HUGECTR][10:17:07][INFO][RANK0]: Iter: 11000 Time(1000 iters): 4.824831s Loss: 0.124651 lr:0.001000
[HUGECTR][10:17:12][INFO][RANK0]: Iter: 12000 Time(1000 iters): 4.800592s Loss: 0.138358 lr:0.001000
[HUGECTR][10:17:17][INFO][RANK0]: Evaluation, AUC: 0.766296
[HUGECTR][10:17:17][INFO][RANK0]: Eval Time for 4000 iters: 4.642774s
[HUGECTR][10:17:21][INFO][RANK0]: Iter: 13000 Time(1000 iters): 9.470000s Loss: 0.112785 lr:0.001000
[HUGECTR][10:17:26][INFO][RANK0]: Iter: 14000 Time(1000 iters): 4.792785s Loss: 0.132969 lr:0.001000
[HUGECTR][10:17:31][INFO][RANK0]: I

In [3]:
!ls -ll

total 48913925
-rw-r--r-- 1 38458 dip        66603 Nov 30 10:17 HugeCTR_WDL_Training.ipynb
drwxr-xr-x 2 root  root        4096 Nov 29 05:27 categories
drwxr-xr-x 3 root  root        4096 Nov 29 05:27 dask-worker-space
-rw-r--r-- 1 38458 dip  49769814062 Nov 29 03:34 day_0
-rw-r--r-- 1 root  root        2461 Nov 30 09:24 infer_test.csv
-rw-r--r-- 1 root  root        5400 Nov 30 10:15 model.py
-rw-r--r-- 1 root  root       14633 Nov 29 05:25 preprocess.py
drwxr-xr-x 3 root  root        4096 Nov 29 05:27 train
drwxr-xr-x 3 root  root        4096 Nov 29 05:27 val
-rw-r--r-- 1 root  root        3731 Nov 30 10:18 wdl.json
-rw-r--r-- 1 root  root    16777216 Nov 30 10:18 wdl0_opt_sparse_20000.model
drwxr-xr-x 2 root  root        4096 Nov 29 05:49 wdl0_sparse_20000.model
-rw-r--r-- 1 root  root   283115520 Nov 30 10:18 wdl1_opt_sparse_20000.model
drwxr-xr-x 2 root  root        4096 Nov 29 05:49 wdl1_sparse_20000.model
-rwxr-xrwx 1 root  root        2497 Nov 29 09:58 wdl2predict.

## 4. Inference Validation

In [2]:
!ls -l /wdl_train/val

total 645762
-rw-r--r-- 1 root root        32 Nov 29 05:27 _file_list.txt
-rw-r--r-- 1 root root   8554464 Nov 29 05:27 _hugectr.keyset
-rw-r--r-- 1 root root     22726 Nov 29 05:27 _metadata
-rw-r--r-- 1 root root      1509 Nov 29 05:27 _metadata.json
-rw-r--r-- 1 root root 142825257 Nov 29 05:27 part_0.parquet
-rw-r--r-- 1 root root     21459 Nov 29 05:27 schema.pbtxt
drwxr-xr-x 2 root root      4096 Nov 29 05:26 temp-parquet-after-conversion
-rw-r--r-- 1 root root 509766965 Nov 29 03:50 test.txt


In [3]:
import pandas as pd
df = pd.read_parquet("/wdl_train/val/part_0.parquet")

In [4]:
df.head()

Unnamed: 0,I1,I2,I3,I4,I5,I6,I7,I8,I9,I10,...,C18,C19,C20,C21,C22,C23,C24,C25,C26,label
0,-0.055886,-0.548824,-0.272394,-0.157301,-0.224758,-0.206385,-0.064249,0.096421,-0.543133,-0.470383,...,1,1,3856,4891,4119,143,50,1,1,0.0
1,-0.059432,-0.380376,-0.272394,5.629719,-0.224758,-0.206385,-0.064249,-0.279201,-0.253935,-0.470383,...,2,1,2,2,2,0,327,2,1,0.0
2,-0.059432,-0.539315,-0.594327,-0.142386,-0.193763,-0.206385,-0.064249,-0.023569,-0.687732,-0.470383,...,1,1,0,2439,41980,349,3549,6,1,1.0
3,-0.059432,-0.463242,-0.594327,-0.097641,-0.209261,-0.206385,-0.064249,-0.219206,-0.687732,-0.470383,...,1,1,4024,3677,4287,565,306,4,1,0.0
4,0.022145,-0.509429,-0.379705,-0.151335,-0.162767,-0.206385,-0.064249,-0.28181,-0.470833,-0.470383,...,2,3,40847,3862,41562,1066,132,2,1,0.0


In [2]:
df.head(10).to_csv('/wdl_train/infer_test.csv', sep=',', index=False,header=True)

In [4]:
%%writefile /wdl_train/wdl2predict.py
from hugectr.inference import InferenceParams, CreateInferenceSession
import hugectr
import pandas as pd
import numpy as np
import sys
from mpi4py import MPI
def wdl_inference(model_name, network_file, dense_file, embedding_file_list, data_file,enable_cache):
    """Run HugeCTR inference for the WDL model on the rows of a CSV file.

    Args:
        model_name: Name passed to ``InferenceParams``.
        network_file: Path of the graph configuration JSON (e.g. ``wdl.json``).
        dense_file: Path of the dense model file.
        embedding_file_list: List with the sparse model paths.
        data_file: Comma-separated CSV with a header row containing the
            ``I1..I13``, ``C1_C2``, ``C3_C4`` and ``C1..C26`` columns.
        enable_cache: Whether to use the GPU embedding cache.

    The prediction is printed; nothing is returned.
    """
    WIDE_COLUMNS = ["C1_C2", "C3_C4"]
    DEEP_COLUMNS = ["C" + str(x) for x in range(1, 27)]
    CATEGORICAL_COLUMNS = WIDE_COLUMNS + DEEP_COLUMNS
    CONTINUOUS_COLUMNS=["I" + str(x) for x in range(1, 14)]
    # Per-slot sizes, in the same order as CATEGORICAL_COLUMNS.
    emb_size = [278018, 415262,249058, 19561, 14212, 6890, 18592, 4, 6356, 1254, 52, 226170, 80508, 72308, 11, 2169, 7597, 61, 4, 923, 15, 249619, 168974, 243480, 68212, 9169, 75, 34]
    # Offset of each slot: the running total of the sizes of all earlier slots.
    shift = np.insert(np.cumsum(emb_size), 0, 0)[:-1]
    test_df=pd.read_csv(data_file,sep=',')

    # One key per slot and sample: row pointers run 0..(slots * samples) for the
    # wide table, followed by the same for the deep table. They were previously
    # hard-coded for exactly 10 samples.
    num_samples = len(test_df)
    row_ptrs = (list(range(0, len(WIDE_COLUMNS) * num_samples + 1))
                + list(range(0, len(DEEP_COLUMNS) * num_samples + 1)))
    dense_features =  list(test_df[CONTINUOUS_COLUMNS].values.flatten())
    # Keep the int64 cast (its result used to be discarded) so the keys match
    # i64_input_key = True below.
    categorical_keys = test_df[CATEGORICAL_COLUMNS].astype(np.int64)
    embedding_columns = list((categorical_keys+shift).values.flatten())

    # create parameter server, embedding cache and inference session
    # NOTE(review): max_batchsize is fixed at 64, so data_file presumably must
    # not hold more rows than that -- confirm.
    inference_params = InferenceParams(model_name = model_name,
                                max_batchsize = 64,
                                hit_rate_threshold = 0.9,
                                dense_model_file = dense_file,
                                sparse_model_files = embedding_file_list,
                                device_id = 0,
                                use_gpu_embedding_cache = enable_cache,
                                cache_size_percentage = 0.9,
                                i64_input_key = True,
                                use_mixed_precision = False
                                )
    inference_session = CreateInferenceSession(network_file, inference_params)
    output = inference_session.predict(dense_features, embedding_columns, row_ptrs)
    print("WDL multi-embedding table inference result is {}".format(output))
    
if __name__ == "__main__":
    # Positional arguments: model name, graph JSON, dense model file,
    # comma-separated sparse model paths, CSV file with the samples.
    model_name, network_file, dense_file = sys.argv[1], sys.argv[2], sys.argv[3]
    embedding_file_list = str(sys.argv[4]).split(',')
    print(embedding_file_list)
    data_file = sys.argv[5]

    # Inference runs with the GPU embedding cache enabled; pass
    # enable_cache=False to run without it.
    wdl_inference(
        model_name=model_name,
        network_file=network_file,
        dense_file=dense_file,
        embedding_file_list=embedding_file_list,
        data_file=data_file,
        enable_cache=True,
    )
    #wdl_inference(model_name, network_file, dense_file, embedding_file_list, data_file, False)


Overwriting /wdl_train/wdl2predict.py


In [5]:
!python /wdl_train/wdl2predict.py "wdl" "/wdl_train/wdl.json" "/wdl_train/wdl_dense_20000.model" "/wdl_train/wdl0_sparse_20000.model,/wdl_train/wdl1_sparse_20000.model" "/wdl_train/infer_test.csv"

['/wdl_train/wdl0_sparse_20000.model', '/wdl_train/wdl1_sparse_20000.model']
[HUGECTR][10:45:05][INFO][RANK0]: default_emb_vec_value is not specified using default: 0.000000
[HUGECTR][10:45:05][INFO][RANK0]: default_emb_vec_value is not specified using default: 0.000000
[HUGECTR][10:45:05][INFO][RANK0]: Creating ParallelHashMap CPU database backend...
[HUGECTR][10:45:05][INFO][RANK0]: Created parallel (16 partitions) blank database backend in local memory!
[HUGECTR][10:45:05][INFO][RANK0]: ParallelHashMap backend. Table: hctr_et.wdl.sparse_embedding2. Inserted 664320 / 664320 pairs.
[HUGECTR][10:45:05][INFO][RANK0]: Table: hctr_et.wdl.sparse_embedding2; cached 664320 / 664320 embeddings in CPU memory database!
[HUGECTR][10:45:05][INFO][RANK0]: ParallelHashMap backend. Table: hctr_et.wdl.sparse_embedding1. Inserted 1445304 / 1445304 pairs.
[HUGECTR][10:45:05][INFO][RANK0]: Table: hctr_et.wdl.sparse_embedding1; cached 1445304 / 1445304 embeddings in CPU memory database!
[HUGECTR][10:45:0