# Sandbox

In [1]:
import os
import re
import shutil
import warnings
import tensorflow as tf
import time

# External Dependencies
import numpy as np
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import rmm

from google.cloud import aiplatform as vertex_ai

# NVTabular
import nvtabular as nvt
from nvtabular.ops import (
    Categorify,
    Clip,
    FillMissing,
    Normalize,
)
from nvtabular.utils import _pynvml_mem_size, device_mem_size

2021-08-08 03:49:28.493363: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0


## Configure GCP settings

In [2]:
PROJECT = 'jk-mlops-dev'
REGION = 'us-central1'

STAGING_BUCKET = 'gs://jk-vertex-us-central1'

VERTEX_SA = f'vertex-sa@{PROJECT}.iam.gserviceaccount.com'

## Prepare a preprocessing container

In [26]:
TRAIN_IMAGE = f'gcr.io/{PROJECT}/merlin'

In [3]:
SCRIPT_FOLDER = 'preprocess'
if tf.io.gfile.exists(SCRIPT_FOLDER):
    tf.io.gfile.rmtree(SCRIPT_FOLDER)
tf.io.gfile.mkdir(SCRIPT_FOLDER)
file_path = os.path.join(SCRIPT_FOLDER, 'preprocess.py')

### Prepare a Dockerfile

In [42]:
%%writefile {SCRIPT_FOLDER}/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cu110

WORKDIR /nvtabular

RUN conda install -c nvidia -c rapidsai -c numba -c conda-forge pynvml dask-cuda nvtabular=0.5.3  cudatoolkit=11.0

ENV LD_LIBRARY_PATH /usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/local/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION python

COPY preprocess.py ./
#COPY entrypoint.sh ./
#RUN chmod +x ./entrypoint.sh


Overwriting preprocess/Dockerfile


### Prepare a preprocessing script

In [75]:
%%writefile {file_path}


# Copyright 2021 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#            http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

import argparse
import json
import logging
import os
import re
import shutil
import warnings
import time
import numpy as np

import nvtabular as nvt
import rmm

from datetime import datetime
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

from nvtabular.ops import (
    Categorify,
    Clip,
    FillMissing,
    Normalize,
)
from nvtabular.utils import _pynvml_mem_size, device_mem_size


BASE_DIR = '/tmp'
DASK_CLUSTER_PROTOCOL = 'tcp'
DASHBOARD_PORT = '8787'

# Criteo columns
CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
LABEL_COLUMNS = ["label"]


def get_args():
    """Defines and parse commandline arguments."""

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--output_path",
        default="/tmp",
        type=str,
    )
    
    parser.add_argument(
        "--training_data",
        default="/tmp/training",
        type=str,
    )
    
    parser.add_argument(
        "--validation_data",
        default="/tmp/validation",
        type=str,
    )
        
    parser.add_argument(
        "--gpus",
        default="0,1",
        type=str,
    )
        
    parser.add_argument(
        "--device_limit_frac",
        default=0.7,
        type=float,
    )
    
    parser.add_argument(
        "--device_pool_frac",
        default=0.8,
        type=float,
    )
    
    parser.add_argument(
        "--part_mem_frac",
        default=0.1,
        type=float,
    )

    return parser.parse_args()


def create_dask_cuda_cluster(
    gpus,
    device_size,
    device_limit_frac,
    device_pool_frac,
    dask_workdir,
):
    
    # Initialize RMM pool on ALL workers
    def _rmm_pool():
        rmm.reinitialize(
            pool_allocator=True,
            initial_pool_size=(device_pool_size // 256) * 256,  # Use default size
        )
    
    device_limit = int(device_limit_frac * device_size)
    device_pool_size = int(device_pool_frac * device_size)
    
    # Check if any device memory is already occupied
    for dev in gpus.split(","):
        fmem = _pynvml_mem_size(kind="free", index=int(dev))
        used = (device_size - fmem) / 1e9
        if used > 1.0:
            warnings.warn(f"BEWARE - {used} GB is already occupied on device {int(dev)}!")
            
    cluster = LocalCUDACluster(
        protocol=DASK_CLUSTER_PROTOCOL,
        n_workers=len(gpus.split(",")),
        CUDA_VISIBLE_DEVICES=gpus,
        device_memory_limit=device_limit,
        local_directory=dask_workdir
    )  
    
    client = Client(cluster)
    client.run(_rmm_pool)
    
    return client


def create_preprocessing_workflow(
    client,
    stats_path,
    num_buckets=10000000
):
    
    categorify_op = Categorify(out_path=stats_path, max_size=num_buckets)
    cat_features = CATEGORICAL_COLUMNS >> categorify_op
    cont_features = CONTINUOUS_COLUMNS >> FillMissing() >> Clip(min_value=0) >> Normalize()
    features = cat_features + cont_features + LABEL_COLUMNS
    workflow = nvt.Workflow(features, client=client)
    
    return workflow

def create_datasets(
    train_paths,
    valid_paths,
    part_mem_frac,
    device_size,
):
    
    dict_dtypes = {}

    for col in CATEGORICAL_COLUMNS:
        dict_dtypes[col] = np.int64

    for col in CONTINUOUS_COLUMNS:
        dict_dtypes[col] = np.float32

    for col in LABEL_COLUMNS:
        dict_dtypes[col] = np.float32
        
    part_size = int(part_mem_frac * device_size)
    train_dataset = nvt.Dataset(train_paths, engine="parquet", part_size=part_size)
    valid_dataset = nvt.Dataset(valid_paths, engine="parquet", part_size=part_size)
    
    return dict_dtypes, train_dataset, valid_dataset
    

def main():
    args = get_args()
    

    dask_workdir = os.path.join(BASE_DIR, "test_dask/workdir")
    stats_path = os.path.join(BASE_DIR, "test_dask/stats")

    # Make sure we have a clean worker space for Dask
    if os.path.isdir(dask_workdir):
        shutil.rmtree(dask_workdir)
    os.makedirs(dask_workdir)

    # Make sure we have a clean stats space for Dask
    if os.path.isdir(stats_path):
        shutil.rmtree(stats_path)
    os.mkdir(stats_path)
    

    fname = "day_{}.parquet"
    train_paths = [
        os.path.join(args.training_data, filename) for filename in os.listdir(args.training_data)]
    valid_paths = [
        os.path.join(args.validation_data, filename) for filename in os.listdir(args.validation_data)]
    
    logging.info(f"Training data path: {train_paths}")
    logging.info(f"Validation data path: {valid_paths}")
    
    logging.info("Creating Dask-Cuda cluster")
    device_size = device_mem_size(kind="total")
    client = create_dask_cuda_cluster(
        gpus=args.gpus,
        device_size=device_size,
        device_limit_frac=args.device_limit_frac,
        device_pool_frac=args.device_pool_frac,
        dask_workdir=dask_workdir,
    )
    logging.info("Cluster created")
    logging.info(str(client))
    
    logging.info("Creating workflow")
    workflow = create_preprocessing_workflow(
        client=client,
        stats_path=stats_path)
    logging.info("Workflow created")
    
    logging.info("Creating datasets")
    dict_dtypes, train_dataset, valid_dataset = create_datasets(
        train_paths=train_paths,
        valid_paths=train_paths,
        part_mem_frac=args.part_mem_frac,
        device_size=device_size,
    )
    logging.info("Datasets created")
    
    start_time = datetime.now()
    logging.info(f"Starting fitting the preprocessing workflow on a training dataset. Datetime: {start_time}")
    workflow.fit(train_dataset)
    end_time = datetime.now()
    logging.info('Fitting completed. Datetime: {}, Elapsed time: {}'.format(end_time, end_time-start_time))
    
    start_time = datetime.now()
    logging.info(f"Starting  the preprocessing workflow on a training dataset. Datetime: {start_time}")
    workflow.transform(train_dataset).to_parquet(
        output_path=f'{args.output_path}/train',
        shuffle=nvt.io.Shuffle.PER_PARTITION,
        dtypes=dict_dtypes,
        cats=CATEGORICAL_COLUMNS,
        conts=CONTINUOUS_COLUMNS,
        labels=LABEL_COLUMNS,
    )
    end_time = datetime.now()
    logging.info('Processing completed. Datetime: {}, Elapsed time: {}'.format(end_time, end_time-start_time))
    
    start_time = datetime.now()
    logging.info(f"Starting the preprocessing workflow on a validation datasets. Datetime: {start_time}")
    workflow.transform(valid_dataset).to_parquet(
        output_path=f'{args.output_path}/valid',
        shuffle=nvt.io.Shuffle.PER_PARTITION,
        dtypes=dict_dtypes,
        cats=CATEGORICAL_COLUMNS,
        conts=CONTINUOUS_COLUMNS,
        labels=LABEL_COLUMNS,
    )
    end_time = datetime.now()
    logging.info('Processing completed. Datetime: {}, Elapsed time: {}'.format(end_time, end_time-start_time))
    
    logging.info(f"Saving workflow to {args.output_path}")
    workflow.save(os.path.join(args.output_path, "workflow"))
    logging.info("Workflow saved")
    
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()

Overwriting preprocess/preprocess.py


### Build a container

In [76]:
TRAIN_IMAGE = f'gcr.io/{PROJECT}/merlin-preprocess'

! docker build -t {TRAIN_IMAGE} {SCRIPT_FOLDER}

Sending build context to Docker daemon  10.75kB
Step 1/6 : FROM gcr.io/deeplearning-platform-release/base-cu110
 ---> a88534d17a8b
Step 2/6 : WORKDIR /nvtabular
 ---> Using cache
 ---> 29b40fc1e76a
Step 3/6 : RUN conda install -c nvidia -c rapidsai -c numba -c conda-forge pynvml dask-cuda nvtabular=0.5.3  cudatoolkit=11.0
 ---> Using cache
 ---> d12c90589672
Step 4/6 : ENV LD_LIBRARY_PATH /usr/local/cuda/lib:/usr/local/cuda/lib64:/usr/local/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
 ---> Using cache
 ---> d67437c6dbc9
Step 5/6 : ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION python
 ---> Using cache
 ---> 323e76b8ce44
Step 6/6 : COPY preprocess.py ./
 ---> Using cache
 ---> 0b5fb7d6f6b7
Successfully built 0b5fb7d6f6b7
Successfully tagged gcr.io/jk-mlops-dev/merlin-preprocess:latest
INFO:google.cloud.aiplatform.jobs:CustomJob projects/895222332033/locations/us-central1/customJobs/2807180729060950016 current state:
JobState.JOB_STATE_RUNNING

In [77]:
! docker push {TRAIN_IMAGE}

Using default tag: latest
The push refers to repository [gcr.io/jk-mlops-dev/merlin-preprocess]

[1Bc9099694: Preparing 
[1Bb29a95c2: Preparing 
[1Baca5bb00: Preparing 
[1Bc6abb710: Preparing 
[1B119bf2cd: Preparing 
[1Bbdf9b557: Preparing 
[1Bdbc2b748: Preparing 
[1Bb8f29c2e: Preparing 
[1B7b2f7486: Preparing 
[1B97a3e6e4: Preparing 
[1Ba5e8117f: Preparing 
[1B8124ed57: Preparing 
[1B4704bb3d: Preparing 
[1B6ef24b4b: Preparing 
[1B113f67c8: Preparing 
[1B857a1d48: Preparing 
[1B97864c52: Preparing 
[1Bbaac3e32: Preparing 
[1Ba1af4c10: Preparing 
[1Ba468ca49: Preparing 
[1B205798d1: Preparing 
[13B7a3e6e4: Waiting g 
[1B55c89c2a: Preparing 
[14B5e8117f: Waiting g 
[14B124ed57: Waiting g 
[1B9ca3db46: Preparing 
[1B1a1930ab: Preparing 
[22Bbc2b748: Waiting g 
[29B9099694: Pushed lready exists 7kB[29A[2K[27A[2K[25A[2K[22A[2K[20A[2K[18A[2K[16A[2K[14A[2K[11A[2K[9A[2K[7A[2K[5A[2K[2A[2K[1A[2K[29A[2Klatest: digest: sha256:51798be2a915

## Submit Vertex job

In [66]:
vertex_ai.init(
    project=PROJECT,
    location=REGION,
    staging_bucket=STAGING_BUCKET
)

In [79]:
job_name = 'MERLIN_CONTAINER_TEST_{}'.format(time.strftime("%Y%m%d_%H%M%S"))
base_output_dir = '{}/jobs/{}/test.txt'.format(STAGING_BUCKET, job_name)

training_data = '/gcs/jk-vertex-us-central1/criteo-small-train'
validation_data = '/gcs/jk-vertex-us-central1/criteo-small-valid'
output_path = f'/gcs/jk-vertex-us-central1/merlin-testing/{job_name}'

worker_pool_specs =  [
    {
        "machine_spec": {
            "machine_type": "n1-standard-8",
            "accelerator_type": "NVIDIA_TESLA_T4",
            "accelerator_count": 2,
        },
        "replica_count": 1,
        "container_spec": {
            "image_uri": TRAIN_IMAGE,
            "command": ["python", "preprocess.py",],
            "args": [             
                '--training_data=' + training_data, 
                '--validation_data=' + validation_data,
                '--output_path=' + output_path,
            ],
        },
    }
]

print(worker_pool_specs)

[{'machine_spec': {'machine_type': 'n1-standard-8', 'accelerator_type': 'NVIDIA_TESLA_T4', 'accelerator_count': 2}, 'replica_count': 1, 'container_spec': {'image_uri': 'gcr.io/jk-mlops-dev/merlin-preprocess', 'command': ['python', 'preprocess.py'], 'args': ['--training_data=/gcs/jk-vertex-us-central1/criteo-small-train', '--validation_data=/gcs/jk-vertex-us-central1/criteo-small-valid', '--output_path=/gcs/jk-vertex-us-central1/merlin-testing/MERLIN_CONTAINER_TEST_20210808_232940']}}]


In [80]:
job = vertex_ai.CustomJob(
    display_name=job_name,
    worker_pool_specs=worker_pool_specs,
    staging_bucket=base_output_dir
)

job.run(sync=False, 
        service_account=VERTEX_SA,
#        tensorboard=TENSORBOARD
)

INFO:google.cloud.aiplatform.jobs:Creating CustomJob
INFO:google.cloud.aiplatform.jobs:CustomJob created. Resource name: projects/895222332033/locations/us-central1/customJobs/643201108109426688
INFO:google.cloud.aiplatform.jobs:To use this CustomJob in another session:
INFO:google.cloud.aiplatform.jobs:custom_job = aiplatform.CustomJob.get('projects/895222332033/locations/us-central1/customJobs/643201108109426688')
INFO:google.cloud.aiplatform.jobs:View Custom Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/643201108109426688?project=895222332033
INFO:google.cloud.aiplatform.jobs:CustomJob projects/895222332033/locations/us-central1/customJobs/643201108109426688 current state:
JobState.JOB_STATE_PENDING
INFO:google.cloud.aiplatform.jobs:CustomJob projects/895222332033/locations/us-central1/customJobs/643201108109426688 current state:
JobState.JOB_STATE_PENDING
INFO:google.cloud.aiplatform.jobs:CustomJob projects/895222332033/locations/us-central1/custo