# Overview

This notebook is a sequence of snippets that builds a quick Vertex AI pipeline to demonstrate Vertex capabilities. The pipeline combines several technologies and frameworks: Dask and RAPIDS packaged in a Docker container for GPU-accelerated training, and Flask as the web server for online predictions.

## Set Variables

In [256]:
# Project-wide settings. Container URIs are derived from PROJECT_ID and REGION
# so that changing either one keeps every image reference consistent.
PROJECT_ID = 'jchavezar-demo'
REGION = 'us-central1'
CUSTOM_TRAIN_NAME = 'gpu_custom_job'
PIPELINE_ROOT_PATH = 'gs://vtx-root-path'
MODEL_FILE_BUCKET = 'gs://vtx-pipe-models'
# Artifact Registry Docker host for REGION (e.g. us-central1-docker.pkg.dev).
ART_REG = f'{REGION}-docker.pkg.dev'
TRAINING_REPOSITORY = 'trainings'
IMAGE_URI = f'{ART_REG}/{PROJECT_ID}/{TRAINING_REPOSITORY}/train_xgb_gpu:latest'
PREDICTION_REPOSITORY = 'predictions'
PREDICTION_IMAGE_URI = f'{ART_REG}/{PROJECT_ID}/{PREDICTION_REPOSITORY}/pred_xgb_cpu:latest'
DATASET_DISPLAY_NAME = 'covertype-4Mr'
DATASET_SOURCE = 'gs://vtx-datasets-public/cover_type_4Mrows.csv'

## Create Folders

In [257]:
# Recreate empty build folders for the training and prediction images.
!rm -fr training
!rm -fr prediction
!mkdir training
!mkdir prediction
!mkdir training/trainer
# Empty __init__.py marks trainer/ as a Python package.
!touch training/trainer/__init__.py

## Utils for Storing Artifacts

In [258]:
%%writefile training/trainer/utils.py

from google.cloud import storage
import os
import logging

def save_model(args):
    """Upload the model files staged in /tmp/xgboost to Google Cloud Storage.

    Every regular file in ``/tmp/xgboost`` is uploaded to
    ``<job_dir>/xgboost/`` when ``args.job_dir`` is a ``gs://`` URI.
    Otherwise the files stay on the local file system and a hint is printed.

    Args:
      args: namespace with ``job_dir`` (may be None, e.g. when AIP_MODEL_DIR
        is unset) and ``model_name`` (file name of the saved booster).
    """
    scheme = 'gs://'
    local_path = os.path.join("/tmp", "xgboost")
    # job_dir defaults to os.getenv(...), which is None outside Vertex AI.
    job_dir = args.job_dir or ""

    if not job_dir.startswith(scheme):
        print(f"Saved model files at {os.path.join(local_path, args.model_name)}")
        print(f"To save model files in GCS bucket, please specify job_dir starting with gs://")
        return

    print(f"Reading input job_dir: {job_dir}")
    # "gs://bucket/a/b/" -> ["gs:", "", "bucket", "a", "b", ""]
    parts = job_dir.split("/")
    bucket_name = parts[2]
    object_prefix = "/".join(parts[3:]).rstrip("/")
    print(f"Reading object_prefix: {object_prefix}")

    model_path = f"{object_prefix}/xgboost" if object_prefix else "xgboost"
    print(f"The model path is {model_path}")

    bucket = storage.Client().bucket(bucket_name)
    files = [f for f in os.listdir(local_path) if os.path.isfile(os.path.join(local_path, f))]
    for file_name in files:
        local_file = os.path.join(local_path, file_name)
        bucket.blob("/".join([model_path, file_name])).upload_from_filename(local_file)
        # Logged inside the loop so an empty folder cannot leave it unbound.
        print(local_file)
    print(f"gs://{bucket_name}/{model_path}")
    print(f"Saved model files in gs://{bucket_name}/{model_path}")

Writing training/trainer/utils.py


## Training Code with Dask + CUDA (GPU)

In [259]:
%%writefile training/trainer/task.py

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask.distributed import wait
from dask import array as da
import xgboost as xgb
import pandas as pd
from xgboost import dask as dxgb
from xgboost.dask import DaskDMatrix
import argparse
import time
import utils
import gcsfs
import dask_cudf
import os, json
import subprocess
import pandas as pd
from dask.utils import parse_bytes

# Command-line interface of the training task; parsed once at import time.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--dataset_source', 
    dest='dataset',
    type=str,
    help='Dataset.')
parser.add_argument(
    '--job-dir',
    dest='job_dir',
    type=str,
    # Vertex AI sets AIP_MODEL_DIR for custom jobs; None when run elsewhere.
    default=os.getenv('AIP_MODEL_DIR'),
    help='GCS location to export models')
parser.add_argument(
    '--model-name',
    dest='model_name',
    default="custom-train",
    help='The name of your saved model')
# Numeric options use type=int so CLI values have the same type as their
# int defaults and invalid input is rejected at parse time. The value is
# passed as n_workers to LocalCUDACluster.
parser.add_argument(
    '--num-gpu-per-worker', type=int, help='num of workers',
    default=2)
parser.add_argument(
    '--threads-per-worker', type=int, help='num of threads per worker',
    default=4)
args = parser.parse_args()


def using_quantile_device_dmatrix(client: Client, dataset_source, job_dir, model_name):
    """Train a multi-class XGBoost model on the Dask CUDA cluster and export it.

    Reads ``dataset_source`` with dask_cudf, drops rows with nulls, splits it
    80/20 into train/validation, and trains with ``gpu_hist`` and early
    stopping on the validation set. The best model and the validation
    history are written to ``/tmp/xgboost``, then uploaded via
    ``utils.save_model``.

    Args:
      client: Dask client connected to the local CUDA cluster.
      dataset_source: CSV path/URI; must contain a ``Cover_Type`` column.
      job_dir: export destination; a ``gs://`` URI triggers the upload.
      model_name: file name of the saved booster inside ``/tmp/xgboost``.
    """
    start_time = time.time()
    print(f"[INFO] ------ Importing dataset {dataset_source}")
    df = dask_cudf.read_csv(dataset_source)

    print("Cleaning and standarizing dataset")
    df = df.dropna() 

    print(f"[INFO] ------ Splitting dataset")
    df_train, df_eval = df.random_split([0.8, 0.2], random_state=123)
    df_train_features = df_train.drop('Cover_Type', axis=1)
    df_eval_features = df_eval.drop('Cover_Type', axis=1)
    df_train_labels = df_train.pop('Cover_Type')
    df_eval_labels = df_eval.pop('Cover_Type')

    print(xgb.__version__)

    print("[INFO] ------ Dataset for dask")
    dtrain = dxgb.DaskDeviceQuantileDMatrix(client, df_train_features, df_train_labels)
    
    print("[INFO] ------ Dataset for dask")
    dvalid = dxgb.DaskDeviceQuantileDMatrix(client, df_eval_features, df_eval_labels)
    print("[INFO]: ------ QuantileDMatrix is formed in {} seconds ---".format((time.time() - start_time)))

    # The DMatrix objects hold what training needs; release the frames early.
    del df_train_features
    del df_train_labels
    del df_eval_features
    del df_eval_labels
    
    start_time = time.time()
    print("Training")
    output = xgb.dask.train(
        client,
        {
            "verbosity": 2, 
            "tree_method": "gpu_hist", 
            "objective": "multi:softprob",
            "eval_metric": ["mlogloss"],
            "learning_rate": 0.1,
            "gamma": 0.9,
            "subsample": 0.5,
            "max_depth": 9,
            "num_class": 8
        },
        dtrain,
        num_boost_round=10,
        evals=[(dvalid, "valid1")],
        early_stopping_rounds=5
    ) 
    print("[INFO]: ------ Training is completed in {} seconds ---".format((time.time() - start_time)))

    # Saving models and exporting performance metrics
    df_eval_metrics = pd.DataFrame(output["history"]["valid1"])
    model = output["booster"]
    # best_iteration is 0-based: the slice end must be best_iteration + 1 to
    # keep the best round itself (and to avoid an empty slice when it is 0).
    best_model = model[: model.best_iteration + 1]
    print(f"[INFO] ------ Best model: {best_model}")
    temp_dir = "/tmp/xgboost"
    # exist_ok so a re-run in the same container does not fail here.
    os.makedirs(temp_dir, exist_ok=True)
    print(job_dir)
    best_model.save_model(os.path.join(temp_dir, model_name))
    df_eval_metrics.to_json(os.path.join(temp_dir, "all_results.json"))

    # Upload with this call's job_dir/model_name instead of the global CLI args.
    utils.save_model(argparse.Namespace(job_dir=job_dir, model_name=model_name))

def get_scheduler_info():
    """Return ``(ip, "ip:8786")`` for the Dask scheduler on this host.

    The IP is the first address printed by ``hostname --all-ip-addresses``.
    """
    raw_output = subprocess.check_output(['hostname', '--all-ip-addresses'])
    host_ip = raw_output.decode('UTF-8').split()[0]
    return host_ip, f"{host_ip}:8786"

if __name__ == '__main__':
    # Entry point: start a single-node Dask CUDA cluster bound to this host's
    # IP, connect a client, and run training on it.
    print("[INFO] ------ Creating dask cluster")
    sched_ip, sched_uri = get_scheduler_info()
    print(f"[INFO] ------ Sched_ip and Sched_uri, {sched_ip}, {sched_uri}")

    print("[INFO]: ------ LocalCUDACluster is being formed ")
    worker_count = int(args.num_gpu_per_worker)
    thread_count = int(args.threads_per_worker)
    with LocalCUDACluster(ip=sched_ip, n_workers=worker_count, threads_per_worker=thread_count) as cluster, \
            Client(cluster) as client:
        print('[INFO]: ------ Calling main function ')
        using_quantile_device_dmatrix(
            client,
            dataset_source=args.dataset,
            job_dir=args.job_dir,
            model_name=args.model_name,
        )

Writing training/trainer/task.py


## Wrapping Code (Custom Container)

In [None]:
%%writefile training/Dockerfile

FROM rapidsai/rapidsai-nightly:22.04-cuda11.2-base-ubuntu20.04-py3.9

# "google-cloud-storage" is the PyPI distribution that provides
# `from google.cloud import storage` (used by trainer/utils.py). The former
# spec "google.cloud[storage]" is normalized by pip to "google-cloud", a
# different package that does not provide the storage client.
RUN pip install --no-cache-dir google-cloud-storage gcsfs pandas

COPY trainer trainer/

ENTRYPOINT ["python", "trainer/task.py"]

Writing training/Dockerfile


## Create: Repositories and Push Containers

In [261]:
# Create the Artifact Registry Docker repository for training images
# (re-running reports ALREADY_EXISTS, as in the output below).
!gcloud artifacts repositories create $TRAINING_REPOSITORY --location $REGION --repository-format docker

ERROR: (gcloud.artifacts.repositories.create) ALREADY_EXISTS: the repository already exists


In [293]:
# Build the training image locally from the training/ folder, tagged IMAGE_URI.
!docker build -t $IMAGE_URI training/.

Sending build context to Docker daemon  10.75kB
    RUN pip install google-cloud-storage   && pip install gcsfs COPY trainer trainer/
Step 1/3 : FROM rapidsai/rapidsai-nightly:22.04-cuda11.2-base-ubuntu20.04-py3.9
 ---> 6f5057ed56a0
Step 2/3 : RUN pip install google-cloud-storage   && pip install gcsfs COPY trainer trainer/
 ---> Running in c07d92678834
Collecting google-cloud-storage
  Downloading google_cloud_storage-2.5.0-py2.py3-none-any.whl (106 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 107.0/107.0 KB 2.9 MB/s eta 0:00:00
Collecting google-resumable-media>=2.3.2
  Downloading google_resumable_media-2.3.3-py2.py3-none-any.whl (76 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 76.9/76.9 KB 12.7 MB/s eta 0:00:00
Collecting google-cloud-core<3.0dev,>=2.3.0
  Downloading google_cloud_core-2.3.2-py2.py3-none-any.whl (29 kB)
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5
  Downloading google_api_core-2.8.2-py3-none-any.whl (114 kB)
     ━━━━━━━

In [263]:
# Push the training image to Artifact Registry.
!docker push $IMAGE_URI

The push refers to repository [us-central1-docker.pkg.dev/jchavezar-demo/trainings/train_xgb_gpu]

91d07aa7: Preparing 
02f738af: Preparing 
d20a3982: Preparing 
177f7dba: Preparing 
e2396578: Preparing 
749f56f5: Preparing 
a57855d6: Preparing 
eca41527: Preparing 
32e4a10b: Preparing 
c89280f3: Preparing 
d1e3350d: Preparing 
8e8f7e67: Preparing 
0767a47c: Layer already exists
latest: digest: sha256:8fd54135b058de05c60ff36b497c6d19ed6d61427be2e4db45cea8b776931f0b size: 3063


## Create: Vertex Pipeline Evaluation Component

In [264]:
from kfp.v2.dsl import (component, Input, Output, Model, Metrics, Model)
from typing import NamedTuple

@component(
    packages_to_install=[
        "pandas",
        "gcsfs",
    ],
)
def get_train_job_details(
    model_dir: str,
    model_display_name: str,
    model: Output[Model],
    metrics: Output[Metrics],
    eval_metric_key: str
    ) -> NamedTuple(
    "Outputs", [("eval_metric", float)]
):
    """Log the training job's validation metrics and return the best one.

    Reads ``<model_dir>/model/xgboost/all_results.json``, where each metric
    maps to per-round values (``v.values()`` below). It logs the minimum of
    every metric to ``metrics``, then stores the minimum of
    ``eval_metric_key`` in the model metadata and returns it.

    Args:
      model_dir: base output directory of the training job (gs:// URI).
      model_display_name: currently unused.
      model: output Model artifact; its metadata receives the eval metric.
      metrics: output Metrics artifact.
      eval_metric_key: metric to return, e.g. "mlogloss".

    Returns:
      Outputs(eval_metric): minimum value of ``eval_metric_key``.

    Raises:
      ValueError: if ``eval_metric_key`` is not present in the results file.
    """
    # Imports stay inside the function: the component body is executed on
    # its own, without this notebook's globals.
    import pandas as pd
    import logging
    from collections import namedtuple

    metrics_uri = "{}/model/xgboost/all_results.json".format(model_dir)
    metrics_df = pd.read_json(metrics_uri, typ="series")

    best_values = {}
    for k, v in metrics_df.items():
        logging.info(f"    {k} -> {v}")
        best_values[k] = min(v.values())
        metrics.log_metric(k, best_values[k])

    # Fail loudly instead of returning None for a float output, which would
    # only break later in the deploy condition.
    if eval_metric_key not in best_values:
        raise ValueError(
            f"Metric '{eval_metric_key}' not found in {metrics_uri}; "
            f"available metrics: {sorted(best_values)}"
        )
    eval_metric = best_values[eval_metric_key]
    model.metadata[eval_metric_key] = eval_metric
    outputs = namedtuple("Outputs", ["eval_metric"])

    return outputs(eval_metric)

## Create: Prediction Custom Container (Vertex AI + Flask)

In [265]:
%%writefile prediction/app.py

import os
import logging
import pandas as pd
import xgboost as xgb
from flask import Flask, request, Response, jsonify
from google.cloud import storage

client = storage.Client(project=os.environ['PROJECT_ID'])

# Fetch the serialized booster from the model artifact folder that Vertex AI
# exposes through AIP_STORAGE_URI into the current working directory.
fname = "model.json"
with open(fname, "wb") as model:
    blob_uri = f"{os.environ['AIP_STORAGE_URI']}/{fname}"
    client.download_blob_to_file(blob_uri, model)

# Deserialize it into the Booster used by the prediction route.
print(f"[INFO] ------ Loading model from: {fname}")
model = xgb.Booster(model_file=fname)

# Creation of the Flask app
app = Flask(__name__)


# Liveness check, served on the route given by AIP_HEALTH_ROUTE.
@app.route(os.environ['AIP_HEALTH_ROUTE'])
def isalive():
    """Signal that the server is up by answering with an empty HTTP 200."""
    return Response(status=200)

# Flask route for predictions
@app.route(os.environ['AIP_PREDICT_ROUTE'],methods=['GET','POST'])
def prediction():
    """Predict the cover type for the instances in the JSON request body.

    Expects ``{"instances": [[...], ...]}`` with one row per instance, where
    the values are ordered as in ``_features``. Returns
    ``{"Cover Type": "<col>"}``: the index of the highest-scoring output
    column for the FIRST instance. A missing or unparseable body, or empty
    ``instances``, is answered with HTTP 400 instead of an unhandled error.
    """
    _features = ['Id','Elevation', 'Aspect', 'Slope', 'Horizontal_Distance_To_Hydrology', 'Vertical_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
                          'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm','Horizontal_Distance_To_Fire_Points', 'Wilderness_Area1', 'Wilderness_Area2', 'Wilderness_Area3', 
                          'Wilderness_Area4', 'Soil_Type1', 'Soil_Type2', 'Soil_Type3', 'Soil_Type4', 'Soil_Type5', 'Soil_Type6', 'Soil_Type7', 'Soil_Type8', 'Soil_Type9',
                          'Soil_Type10','Soil_Type11','Soil_Type12','Soil_Type13','Soil_Type14','Soil_Type15','Soil_Type16','Soil_Type17','Soil_Type18','Soil_Type19', 
                          'Soil_Type20', 'Soil_Type21', 'Soil_Type22', 'Soil_Type23', 'Soil_Type24', 'Soil_Type25', 'Soil_Type26', 'Soil_Type27', 'Soil_Type28', 'Soil_Type29',
                          'Soil_Type30', 'Soil_Type31', 'Soil_Type32', 'Soil_Type33', 'Soil_Type34', 'Soil_Type35', 'Soil_Type36', 'Soil_Type37', 'Soil_Type38', 'Soil_Type39', 'Soil_Type40']
    # silent=True makes get_json return None for an unparseable body; reject
    # that (and bodies without instances) as a client error, not a 500.
    data = request.get_json(silent=True, force=True)
    if not isinstance(data, dict) or not data.get("instances"):
        return jsonify({"error": "Request body must be JSON with a non-empty 'instances' list."}), 400
    dmf = xgb.DMatrix(pd.DataFrame(data["instances"], columns=_features))
    response = pd.DataFrame(model.predict(dmf))
    logging.info(f"Response: {response}")
    return jsonify({"Cover Type": str(response.idxmax(axis=1)[0])})

if __name__ == "__main__":
    # Local entry point only (the container image starts the app with
    # `flask run`). Debug mode serves the interactive Werkzeug debugger, which
    # allows arbitrary code execution, on 0.0.0.0, so it is opt-in via
    # FLASK_DEBUG=1.
    app.run(debug=os.environ.get("FLASK_DEBUG") == "1", host='0.0.0.0', port=8080)

Writing prediction/app.py


### Preparing Docker Container Declarative Files

In [266]:
%%writefile prediction/requirements.txt

google-cloud-storage
numpy
pandas
flask
xgboost

Writing prediction/requirements.txt


In [267]:
%%writefile prediction/Dockerfile

FROM python:3.7-buster

# Install dependencies before copying the app so that code-only changes
# reuse the cached pip layer.
COPY requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY app.py ./app.py

# Flask Env Variable: module that `flask run` loads.
ENV FLASK_APP=app

# Expose port 8080
EXPOSE 8080

# Exec form so flask runs as PID 1 and receives SIGTERM on shutdown.
CMD ["flask", "run", "--host=0.0.0.0", "--port=8080"]

Writing prediction/Dockerfile


### Push Container Image to Repository

In [268]:
# Create the Artifact Registry Docker repository for prediction images
# (re-running reports ALREADY_EXISTS, as in the output below).
!gcloud artifacts repositories create $PREDICTION_REPOSITORY --location $REGION --repository-format docker

ERROR: (gcloud.artifacts.repositories.create) ALREADY_EXISTS: the repository already exists


In [269]:
# Build the prediction image from prediction/ with Cloud Build and push it
# as PREDICTION_IMAGE_URI.
!gcloud builds submit --tag $PREDICTION_IMAGE_URI prediction/. --timeout 3000

Creating temporary tarball archive of 3 file(s) totalling 2.5 KiB before compression.
Uploading tarball of [prediction/.] to [gs://jchavezar-demo_cloudbuild/source/1659824719.677456-63972e551a1f492c91461731f87a4a24.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/jchavezar-demo/locations/global/builds/0edd44f7-1237-4962-a716-b814034efd25].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/0edd44f7-1237-4962-a716-b814034efd25?project=569083142710 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "0edd44f7-1237-4962-a716-b814034efd25"

FETCHSOURCE
Fetching storage object: gs://jchavezar-demo_cloudbuild/source/1659824719.677456-63972e551a1f492c91461731f87a4a24.tgz#1659824720077592
Copying gs://jchavezar-demo_cloudbuild/source/1659824719.677456-63972e551a1f492c91461731f87a4a24.tgz#1659824720077592...
/ [1 files][  1.2 KiB/  1.2 KiB]                                                
Operation completed over 

## Create: Pipeline

In [291]:
from kfp.v2.dsl import pipeline, Condition
from google.cloud import aiplatform
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components import aiplatform as gcc
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp as custom_job
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from kfp.v2.components import importer_node


@pipeline(name='dask-gpu-1')
def pipeline(
    project_id: str,
    custom_train_name: str,
    region: str,
    eval_acc_threshold: float,
    eval_metric_key: str,
    model_file_bucket: str,
):
    """Train XGBoost on GPUs, evaluate it and conditionally deploy it.

    The pipeline has three stages:
      1. A Vertex AI custom job runs the training image on 4x T4 GPUs.
      2. ``get_train_job_details`` reads the validation metrics that the job
         wrote under ``model_file_bucket``.
      3. Only if the best ``eval_metric_key`` value is below
         ``eval_acc_threshold``, the model is uploaded with the Flask
         prediction image, an endpoint is created, and the model is deployed.

    Args:
      project_id: GCP project for the Vertex AI resources.
      custom_train_name: display name of the custom training job.
      region: currently unused by the steps below.
      eval_acc_threshold: maximum acceptable value of the (loss) metric.
      eval_metric_key: metric compared against the threshold, e.g. "mlogloss".
      model_file_bucket: base output directory of the training job.
    """
    worker_pool_specs = [
    {
        "machine_spec": {
            "machine_type": "n1-standard-32",
            "accelerator_type": "NVIDIA_TESLA_T4",
            "accelerator_count": 4
        },
        "replica_count": "1",
        "container_spec": {
            "image_uri": IMAGE_URI,
            "env": [{"name": "AIP_TRAINING_DATA_URI", "value":'test'}],
            "command": [
                "python",
                "trainer/task.py"
            ],
            "args": [
                "--dataset_source", DATASET_SOURCE,
                "--model-name", "model.json",
                # One Dask worker per GPU, matching accelerator_count above.
                "--num-gpu-per-worker", "4",
                "--threads-per-worker", "4"
            ]
        }
    },
    ]
    train_with_gpu_task = custom_job(
        project=project_id,
        display_name=custom_train_name,
        worker_pool_specs=worker_pool_specs,
        base_output_directory=model_file_bucket
    )
    # Read metrics from the same base directory the training job wrote to.
    get_train_details_task = get_train_job_details(
        model_dir=model_file_bucket,
        model_display_name="xgboost-dask",
        eval_metric_key=eval_metric_key, # mlogloss
    ).after(train_with_gpu_task)

    # eval_metric is the MINIMUM of a loss (mlogloss): lower is better, so
    # deploy only when it is below the threshold ('>' deployed worse models).
    with Condition(
        get_train_details_task.outputs["eval_metric"] < eval_acc_threshold,
        name="model-deploy-decision",
    ):
        # Training uploads to <base_output_directory>/model/xgboost. The
        # importer URI is fixed at compile time, so it is built from the
        # MODEL_FILE_BUCKET constant (the value passed as model_file_bucket).
        import_unmanaged_model_op = importer_node.importer(
            artifact_uri=f"{MODEL_FILE_BUCKET}/model/xgboost",
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "containerSpec": {
                    "imageUri": PREDICTION_IMAGE_URI,
                    "env": [
                        {
                            "name": "PROJECT_ID",
                            "value": PROJECT_ID},
                    ],
                    "predictRoute": "/predict",
                    "healthRoute": "/health",
                    "ports": [
                        {
                            "containerPort": 8080
                        }
                    ]
                },
            },
        )
        custom_model_upload_job = gcc.ModelUploadOp(
            project=project_id,
            display_name="xgb-model",
            unmanaged_container_model=import_unmanaged_model_op.outputs["artifact"],
        ).after(import_unmanaged_model_op)
        
        endpoint_create_job = gcc.EndpointCreateOp(
            project=project_id,
            display_name="pipelines-created-endpoint",
        )
        
        custom_model_deploy_job = (gcc.ModelDeployOp(
            model=custom_model_upload_job.outputs["model"],
            endpoint=endpoint_create_job.outputs["endpoint"],
            deployed_model_display_name="xgboost_model_end",
            traffic_split={"0":"100"},
            dedicated_resources_machine_type="n1-standard-4",
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1
        )).set_caching_options(False)

## Compile Pipeline

In [289]:
from kfp.v2 import compiler
import warnings

# Suppress all Python warnings (e.g. SDK notices) emitted while compiling.
warnings.filterwarnings('ignore')

# Serialize the pipeline definition into the JSON spec submitted below.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='dask_cpu.json',
)

## Run Pipeline Job

In [290]:
import google.cloud.aiplatform as aip

# Runtime values for the compiled pipeline's parameters.
pipeline_params = {
    'project_id': PROJECT_ID,
    'custom_train_name': CUSTOM_TRAIN_NAME,
    'region': REGION,
    'eval_acc_threshold': 0.5,
    'eval_metric_key': 'mlogloss',
    'model_file_bucket': MODEL_FILE_BUCKET,
}

job = aip.PipelineJob(
    display_name="dask_cpu",
    template_path="dask_cpu.json",
    pipeline_root=PIPELINE_ROOT_PATH,
    parameter_values=pipeline_params,
    enable_caching=False,
)

# Submit the run under the dedicated pipeline service account.
job.submit(service_account='vtx-pipe@jchavezar-demo.iam.gserviceaccount.com')

Creating PipelineJob
PipelineJob created. Resource name: projects/569083142710/locations/us-central1/pipelineJobs/dask-gpu-1-20220806203310
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/569083142710/locations/us-central1/pipelineJobs/dask-gpu-1-20220806203310')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/dask-gpu-1-20220806203310?project=569083142710


![](images/vertex-pipe-gpu.PNG)