# NVtabular preprocessing pipeline prototype

This is a prototype of a Vertex Pipelines pipeline that uses NVIDIA NVTabular for feature engineering and data preprocessing. During prototyping, a Jupyter notebook is used for developing and submitting test runs. The final sample will be refactored into Python modules.

![NVT pipeline](images/nvt-pipeline.png)

In [1]:
import kfp
import json

from typing import NamedTuple
import numpy as np

from google.cloud import aiplatform

from kfp.v2 import compiler
from kfp.v2 import dsl

from kfp.v2.google.client import AIPlatformClient

from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath)

from typing import Optional

## Configure Vertex AI SDK

In [2]:
# Google Cloud project and region used for the Vertex AI SDK in this notebook.
PROJECT_ID = 'jk-mlops-dev'
REGION = 'us-central1'
# GCS bucket handed to the Vertex AI SDK as its staging location.
STAGING_BUCKET = 'gs://jk-vertex-us-central1'
# Service account passed to `pipeline_job.run(...)` when the pipeline is submitted.
VERTEX_SA = f'vertex-sa@{PROJECT_ID}.iam.gserviceaccount.com'

# Set SDK-wide defaults so later `aiplatform` calls do not have to repeat them.
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=STAGING_BUCKET
)

## Define KFP components

### Prepare a base image for NVTabular

Eventually, we will use the NGC Merlin image. In the interim we will use a custom image based on a GCP DL container.

#### Create a Dockerfile

In [3]:
%%writefile Dockerfile
# Base image: Google Cloud Deep Learning container (tag `base-cu110`, presumably the CUDA 11.0 variant - confirm).
FROM gcr.io/deeplearning-platform-release/base-cu110

WORKDIR /nvtabular

# Install NVTabular 0.5.3 together with pynvml, dask-cuda and cudatoolkit 11.0 from the listed conda channels.
RUN conda install -c nvidia -c rapidsai -c numba -c conda-forge pynvml dask-cuda nvtabular=0.5.3  cudatoolkit=11.0

# Add the CUDA and NVIDIA library directories to the dynamic linker search path.
ENV LD_LIBRARY_PATH /usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/local/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
                    

Overwriting Dockerfile


#### Build and push the image

In [4]:
# Container Registry URI of the custom NVTabular image; passed as `base_image` to the KFP components below.
BASE_IMAGE_NAME = f'gcr.io/{PROJECT_ID}/nvt_base_image'

In [None]:
# Build the image from the Dockerfile in the current directory and tag it with BASE_IMAGE_NAME.
!docker build -t {BASE_IMAGE_NAME} . 

In [None]:
# Push the tagged image to the registry named in BASE_IMAGE_NAME.
!docker push {BASE_IMAGE_NAME}

### Data ingestion component

In [11]:
@dsl.component(base_image=BASE_IMAGE_NAME)
def ingest_csv_op(
    train_files: list,
    valid_files: list,
    sep: str,
    schema: list,
    gpus: list,
    output_dataset: Output[Dataset]
):
    """Convert the training and validation CSV files to parquet.

    Each split is loaded with NVTabular's CSV engine and written to its own
    sub-folder (``train`` / ``valid``) under ``output_dataset.uri``. The list
    of split folder names is stored in the artifact metadata under the
    ``split_names`` key.

    Args:
        train_files: Paths of the CSV files forming the training split.
        valid_files: Paths of the CSV files forming the validation split.
        sep: Field separator used in the CSV files.
        schema: Sequence of ``[column_name, dtype]`` pairs, in column order.
        gpus: GPU device ids as strings; a Dask CUDA cluster is started only
            when more than one id is supplied.
        output_dataset: Output artifact that receives the parquet files.
    """
    import logging
    import nvtabular as nvt
    import numpy as np
    import os

    from dask_cuda import LocalCUDACluster
    from dask.distributed import Client

    TRAIN_SPLIT_FOLDER = 'train'
    VALID_SPLIT_FOLDER = 'valid'

    # Split folder name -> source CSV files, in the order they are processed.
    files_by_split = {
        TRAIN_SPLIT_FOLDER: train_files,
        VALID_SPLIT_FOLDER: valid_files,
    }

    # With zero or one GPU no Dask client is created and None is passed on.
    if len(gpus) > 1:
        logging.info('Creating a Dask CUDA cluster')
        client = Client(
            LocalCUDACluster(
                CUDA_VISIBLE_DEVICES=','.join(gpus),
                n_workers=len(gpus)
            )
        )
    else:
        client = None

    # Derive the column order and the per-column dtypes from the schema pairs.
    column_names = []
    column_dtypes = {}
    for feature in schema:
        column_names.append(feature[0])
        column_dtypes[feature[0]] = feature[1]

    for split_folder, split_files in files_by_split.items():
        csv_dataset = nvt.Dataset(
            path_or_source=split_files,
            engine='csv',
            names=column_names,
            sep=sep,
            dtypes=column_dtypes,
            client=client
        )

        # NOTE(review): os.makedirs acts on the local filesystem - confirm that
        # `output_dataset.uri` is a locally reachable path where this runs.
        split_path = os.path.join(output_dataset.uri, split_folder)
        os.makedirs(split_path, exist_ok=True)

        logging.info('Writing a parquet file to {}'.format(split_path))
        csv_dataset.to_parquet(
            output_path=split_path,
            preserve_files=True
        )

    # Record the available splits so downstream components can validate them.
    output_dataset.metadata['split_names'] = list(files_by_split)
    
    

In [12]:
@dsl.component(base_image=BASE_IMAGE_NAME)
def fit_workflow_op(
    dataset: Input[Dataset],
    fitted_workflow: Output[Artifact],
    gpus: list,
    part_mem_frac: Optional[float]=0.1,
    device_limit_frac: Optional[float]=0.7,
    device_pool_frac: Optional[float]=0.8,
    split_name: Optional[str]='train'
):
    """Fit an NVTabular preprocessing workflow on one split of a parquet dataset.

    The workflow categorifies columns ``C1``-``C26``, applies
    fill-missing / clip-at-zero / normalize to columns ``I1``-``I13`` and
    passes the ``label`` column through. The fitted workflow is saved to
    ``fitted_workflow.uri``.

    Args:
        dataset: Input artifact whose ``uri`` contains one sub-folder of
            ``*.parquet`` files per split and whose metadata lists the
            available splits under ``split_names``.
        fitted_workflow: Output artifact that receives the saved workflow.
        gpus: GPU device ids as strings; a Dask CUDA cluster is started only
            when more than one id is supplied.
        part_mem_frac: Fraction of total device memory used as the dataset
            partition size.
        device_limit_frac: Fraction of total device memory used as the
            per-worker device memory limit of the Dask CUDA cluster.
        device_pool_frac: Fraction of total device memory used for the RMM
            pool of the Dask CUDA cluster.
        split_name: Name of the split to fit the workflow on.

    Raises:
        RuntimeError: If ``split_name`` is not listed in
            ``dataset.metadata['split_names']``.
    """
    import logging
    import nvtabular as nvt

    from pathlib import Path
    from dask_cuda import LocalCUDACluster
    from dask.distributed import Client
    from nvtabular.utils import device_mem_size

    from nvtabular.ops import (
        Categorify,
        Clip,
        FillMissing,
        Normalize,
    )

    if split_name not in dataset.metadata['split_names']:
        raise RuntimeError('Dataset does not have {} split'.format(split_name))

    CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
    CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
    LABEL_COLUMNS = ["label"]

    # Size partitions and cluster memory limits relative to total device memory.
    device_size = device_mem_size(kind="total")
    part_size = int(part_mem_frac * device_size)
    device_limit = int(device_limit_frac * device_size)
    device_pool_size = int(device_pool_frac * device_size)

    # With zero or one GPU no Dask client is created and None is passed on.
    client = None
    if len(gpus) > 1:
        logging.info('Creating a Dask CUDA cluster')
        cluster = LocalCUDACluster(
            CUDA_VISIBLE_DEVICES=','.join(gpus),
            n_workers=len(gpus),
            device_memory_limit=device_limit,
            # Pool size is rounded down to a multiple of 256.
            rmm_pool_size=(device_pool_size // 256) * 256
        )
        client = Client(cluster)

    # The categorify op was previously referenced without being defined
    # (NameError); build it here, capping category cardinality at num_buckets.
    num_buckets = 10000000
    categorify_op = Categorify(max_size=num_buckets)

    cat_features = CATEGORICAL_COLUMNS >> categorify_op
    cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize()
    features = cat_features + cont_features + LABEL_COLUMNS

    workflow = nvt.Workflow(features, client=client)

    # NOTE(review): Path.glob searches the local filesystem - confirm that
    # `dataset.uri` is a locally reachable path where this runs.
    train_paths = [str(path) for path in Path(dataset.uri, split_name).glob('*.parquet')]
    train_dataset = nvt.Dataset(train_paths, engine="parquet", part_size=part_size)

    workflow.fit(train_dataset)
    workflow.save(fitted_workflow.uri)
        
    

In [6]:
class DatasetMock:
    """Minimal local stand-in for a pipeline artifact.

    Exposes only the two attributes the components read or write:
    ``uri`` and ``metadata``.
    """

    def __init__(self, uri):
        # Each instance gets its own, initially empty, metadata mapping.
        self.metadata = dict()
        self.uri = uri
        
# Local smoke test: drive fit_workflow_op with mock artifacts that point at
# directories on this machine.
input_dataset = DatasetMock(uri='/home/jupyter/output')
# fit_workflow_op checks the requested split against this metadata entry.
input_dataset.metadata['split_names'] = ['train', 'valid']

fitted_workflow = DatasetMock(uri='/home/jupyter/workflow')
# Two device ids, so the component takes its Dask CUDA cluster branch.
gpus = ['0','1']


fit_workflow_op(
    dataset=input_dataset, 
    fitted_workflow=fitted_workflow,
    gpus=gpus)

INFO:numba.cuda.cudadrv.driver:init
INFO:root:Creating a Dask CUDA cluster


<nvtabular.workflow.Workflow at 0x7fd59bd6e9d0>

In [15]:
PIPELINE_NAME = 'nvt-test-pipeline'


@dsl.pipeline(
    name=PIPELINE_NAME
)
def nvt_pipeline(
    train_files: list,
    valid_files: list,
    sep: str,
    gpus: list,
    schema: list,
):
    """Two-step pipeline: ingest CSV files to parquet, then fit a workflow.

    Args:
        train_files: CSV files of the training split, forwarded to ingestion.
        valid_files: CSV files of the validation split, forwarded to ingestion.
        sep: Field separator of the CSV files.
        gpus: GPU device ids forwarded to both steps.
        schema: ``[column_name, dtype]`` pairs forwarded to ingestion.
    """

    def _apply_resource_settings(task):
        # Both steps request the same CPU, memory and GPU limits and are
        # constrained to nodes with T4 accelerators.
        task.set_cpu_limit("48")
        task.set_memory_limit("312G")
        task.set_gpu_limit("4")
        task.add_node_selector_constraint('cloud.google.com/gke-accelerator', 'nvidia-tesla-t4')

    ingest_task = ingest_csv_op(
        train_files=train_files,
        valid_files=valid_files,
        sep=sep,
        gpus=gpus,
        schema=schema,
    )
    _apply_resource_settings(ingest_task)

    # The fit step consumes the parquet dataset produced by ingestion.
    fit_task = fit_workflow_op(
        dataset=ingest_task.outputs['output_dataset'],
        gpus=gpus
    )
    _apply_resource_settings(fit_task)
    
    

In [16]:
# File that receives the compiled pipeline definition; reused as
# `template_path` when the job is submitted.
package_path = 'nvt_pipeline.json'

# Compile the pipeline function into the JSON package above.
compiler.Compiler().compile(
    pipeline_func=nvt_pipeline,
    package_path=package_path)

In [17]:
job_name = 'test_pipeline_run'

# Schema as [column_name, dtype] pairs: the label first, then I1-I13 (int32)
# and C1-C26 (hex).
cont_features = [[name, "int32"] for name in ["I" + str(x) for x in range(1, 14)]]
cat_features = [[name, "hex"] for name in ["C" + str(x) for x in range(1, 27)]]
schema = [['label', 'int32']] + cont_features + cat_features
sep = "\t"
gpus = ['0','1']

train_files = ['/gcs/jk-criteo-bucket/criteo_orig/day_0']
valid_files = ['/gcs/jk-criteo-bucket/criteo_orig/day_1']

# List-valued parameters are passed as JSON strings. `sep` reuses the variable
# defined above instead of repeating the literal, so the two cannot drift apart.
params = {
    'train_files': json.dumps(train_files),
    'valid_files': json.dumps(valid_files),
    'sep': sep,
    'schema': json.dumps(schema),
    'gpus': json.dumps(gpus)
}

pipeline_job = aiplatform.PipelineJob(
    display_name=job_name,
    template_path=package_path,
    enable_caching=False,
    parameter_values=params,
)

# sync=False is passed so the call is not made in synchronous mode.
pipeline_job.run(
    service_account=VERTEX_SA,
    sync=False
)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/895222332033/locations/us-central1/pipelineJobs/nvt-test-pipeline-20210916051700
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/895222332033/locations/us-central1/pipelineJobs/nvt-test-pipeline-20210916051700')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/nvt-test-pipeline-20210916051700?project=895222332033
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/895222332033/locations/us-central1/pipelineJobs/nvt-test-pipeline-20210916051700 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/895222332033/locations/us-central1/pipelineJo