## Overview

This notebook is a set of snippets, ordered to build a quick pipeline that demonstrates Vertex AI capabilities. The pipeline combines several technologies and frameworks: Dask and RAPIDS for GPU-accelerated training packaged in a Docker container, and Flask as the web server for online predictions.

## Set Variables

In [8]:
# Project / region settings. Every image URI below is derived from these, so
# changing PROJECT_ID or REGION here propagates everywhere.
PROJECT_ID = 'jchavezar-demo'
REGION = 'us-central1'
CUSTOM_TRAIN_NAME = 'gpu_custom_job'
PIPELINE_ROOT_PATH = 'gs://vtx-root-path'
MODEL_FILE_BUCKET = 'gs://vtx-pipe-models'

# Artifact Registry Docker host for REGION, e.g. us-central1-docker.pkg.dev
ART_REG = f'{REGION}-docker.pkg.dev'
TRAINING_REPOSITORY = 'trainings'
IMAGE_URI = f'{ART_REG}/{PROJECT_ID}/{TRAINING_REPOSITORY}/train_gpu_xgb:latest'
PREDICTION_REPOSITORY = 'predictions'
PREDICTION_IMAGE_URI = f'{ART_REG}/{PROJECT_ID}/{PREDICTION_REPOSITORY}/prediction:latest'

DATASET_DISPLAY_NAME = 'covertype-4Mr'
DATASET_SOURCE = 'gs://vtx-datasets-public/cover_type_4Mrows.csv'

## Create Folders

In [3]:
# -p: no error if the directory already exists, so the cell is safe to re-run.
!mkdir -p training

In [4]:
# -p: no error if the directory already exists, so the cell is safe to re-run.
!mkdir -p prediction

In [5]:
# -p: no error if the directory already exists, so the cell is safe to re-run.
!mkdir -p training/trainer

In [6]:
# Create an empty __init__.py so training/trainer is a regular Python package.
!touch training/trainer/__init__.py

## Utils for Storing Artifacts

In [7]:
%%writefile training/trainer/utils.py

from google.cloud import storage
import os
import logging

def save_model(args):
    """Upload the locally saved model artifacts to Google Cloud Storage.

    The training script writes its artifacts into ``/tmp/xgboost``. When
    ``args.job_dir`` is a ``gs://`` URI, every regular file in that directory
    is uploaded to ``gs://<bucket>/<prefix>/xgboost/<file>``. Otherwise the
    files stay on the local file system and a hint is printed.

    Args:
      args: parsed CLI namespace. Reads ``job_dir`` (may be None, e.g. when
        AIP_MODEL_DIR is unset) and ``model_name``.
    """
    scheme = 'gs://'
    local_path = os.path.join("/tmp", "xgboost")
    # Treat a missing job_dir like a non-GCS one instead of crashing on .startswith.
    job_dir = args.job_dir or ""

    if not job_dir.startswith(scheme):
        print(f"Saved model files at {os.path.join(local_path, args.model_name)}")
        print("To save model files in GCS bucket, please specify job_dir starting with gs://")
        return

    print(f"Reading input job_dir: {job_dir}")
    # "gs://bucket/a/b/".split("/") -> ["gs:", "", "bucket", "a", "b", ""]
    parts = job_dir.split("/")
    bucket_name = parts[2]
    object_prefix = "/".join(parts[3:]).rstrip("/")
    print(f"Reading object_prefix: {object_prefix}")

    model_path = f"{object_prefix}/xgboost" if object_prefix else "xgboost"
    print(f"The model path is {model_path}")

    bucket = storage.Client().bucket(bucket_name)
    files = [f for f in os.listdir(local_path) if os.path.isfile(os.path.join(local_path, f))]
    if not files:
        print(f"Warning: no files found in {local_path}; nothing uploaded")
    for file_name in files:
        local_file = os.path.join(local_path, file_name)
        bucket.blob(f"{model_path}/{file_name}").upload_from_filename(local_file)
        print(f"Uploaded {local_file}")
    print(f"Saved model files in gs://{bucket_name}/{model_path}")

Writing training/trainer/utils.py


## Training Code with Dask + CUDA (GPU)

In [9]:
%%writefile training/trainer/task.py

import argparse
import os
import logging
import dask_cudf
import xgboost as xgb
import pandas as pd
#import pickle
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Make logging.info() output visible (the root logger defaults to WARNING).
logging.basicConfig(level=logging.INFO)

parser = argparse.ArgumentParser()
parser.add_argument('--dataset_source', dest='dataset',
                    type=str,
                    help='Dataset.')
parser.add_argument(
    '--job-dir',
    default=os.getenv('AIP_MODEL_DIR'),
    help='GCS location to export models')
parser.add_argument(
    '--model-name',
    default="custom-train",
    help='The name of your saved model')

args = parser.parse_args()

logging.info(f"Importing dataset {args.dataset}")
df = dask_cudf.read_csv(args.dataset)

logging.info("Cleaning and standarizing dataset")
df = df.dropna()

logging.info(f"Splitting dataset")
df_train, df_eval = df.random_split([0.8, 0.2], random_state=123)

df_train_features = df_train.drop('Cover_Type', axis=1)
df_eval_features = df_eval.drop('Cover_Type', axis=1)

df_train_labels = df_train.pop('Cover_Type')
df_eval_labels = df_eval.pop('Cover_Type')

if __name__ == '__main__':
    # Found on sys.path because the script is launched as `python trainer/task.py`.
    import utils

    logging.info("Creating dask cluster")
    cluster = LocalCUDACluster()
    client = Client(cluster)

    logging.info(client)

    print(xgb.__version__)

    # X and y must be Dask dataframes or arrays
    logging.info("Dataset for dask")
    dtrain = xgb.dask.DaskDMatrix(client, df_train_features, df_train_labels)

    logging.info("Dataset for dask")
    dvalid = xgb.dask.DaskDMatrix(client, df_eval_features, df_eval_labels)

    logging.info("Training")
    output = xgb.dask.train(
        client,
        {
            "verbosity": 2,
            # NOTE(review): "gpu_hist" is deprecated in XGBoost >= 2.0 in favour of
            # {"tree_method": "hist", "device": "cuda"}; the image installs the latest xgboost.
            "tree_method": "gpu_hist",
            "objective": "multi:softprob",
            "eval_metric": ["mlogloss"],
            # XGBoost needs labels in [0, num_class); presumably Cover_Type is 1..7 — confirm.
            "num_class": 8
        },
        dtrain,
        # NOTE(review): with only 4 rounds, early_stopping_rounds=5 can never trigger.
        num_boost_round=4,
        evals=[(dvalid, "valid1")],
        early_stopping_rounds=5
    )

    # Saving models and exporting performance metrics
    # history["valid1"] maps each eval metric name to its per-round values.
    df_eval_metrics = pd.DataFrame(output["history"]["valid1"])
    model = output["booster"]
    # best_iteration is a 0-based round index and slicing is end-exclusive, so
    # +1 keeps the best round itself (and avoids an empty slice when it is 0).
    best_model = model[: model.best_iteration + 1]
    logging.info(f"Best model: {best_model}")

    # utils.save_model uploads every file found in /tmp/xgboost.
    temp_dir = "/tmp/xgboost"
    os.makedirs(temp_dir, exist_ok=True)
    best_model.save_model(os.path.join(temp_dir, args.model_name))
    df_eval_metrics.to_json(os.path.join(temp_dir, "all_results.json"))

    utils.save_model(args)

Writing training/trainer/task.py


## Wrapping Code (Custom Container)

In [11]:
%%writefile training/Dockerfile

FROM rapidsai/rapidsai-nightly:22.04-cuda11.2-base-ubuntu20.04-py3.9

# google-cloud-storage provides `from google.cloud import storage` used by trainer/utils.py
# (the PyPI name is google-cloud-storage, not google.cloud[storage]);
# gcsfs provides the gs:// filesystem used when reading the dataset.
# NOTE(review): xgboost is unpinned; pin it to the version the prediction image uses.
RUN pip install --no-cache-dir google-cloud-storage gcsfs \
  && pip install --no-cache-dir --upgrade xgboost

COPY trainer trainer/

ENTRYPOINT ["python", "trainer/task.py"]

Overwriting training/Dockerfile


## Create: Repositories and Push Containers

In [None]:
# --project keeps the repository in the same project that IMAGE_URI points at,
# instead of whatever project the active gcloud config happens to use.
!gcloud artifacts repositories create $TRAINING_REPOSITORY --project $PROJECT_ID --location $REGION --repository-format docker

In [236]:
# Build with Cloud Build in PROJECT_ID and push the result to IMAGE_URI.
!gcloud builds submit --project $PROJECT_ID --tag $IMAGE_URI training/.

Creating temporary tarball archive of 7 file(s) totalling 7.4 KiB before compression.
Uploading tarball of [training/.] to [gs://jchavezar-demo_cloudbuild/source/1659231973.564417-245115c3229644fb961308575895e0e4.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/jchavezar-demo/locations/global/builds/359a736d-7d39-4379-9cd8-2d47e0becfd0].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/359a736d-7d39-4379-9cd8-2d47e0becfd0?project=569083142710].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "359a736d-7d39-4379-9cd8-2d47e0becfd0"

FETCHSOURCE
Fetching storage object: gs://jchavezar-demo_cloudbuild/source/1659231973.564417-245115c3229644fb961308575895e0e4.tgz#1659231973772453
Copying gs://jchavezar-demo_cloudbuild/source/1659231973.564417-245115c3229644fb961308575895e0e4.tgz#1659231973772453...
/ [1 files][  2.6 KiB/  2.6 KiB]                                                
Operation completed over 1 ob

## Create: Vertex Pipelines Evaluation Component

In [1]:
from kfp.v2.dsl import component, Input, Output, Model, Metrics
from typing import NamedTuple


@component(
    packages_to_install=[
        "pandas",
        "gcsfs",
    ],
)
def get_train_job_details(
    model_dir: str,
    model_display_name: str,
    model: Output[Model],
    metrics: Output[Metrics],
    eval_metric_key: str
) -> NamedTuple("Outputs", [("eval_metric", float)]):
    """Read the training job's eval history, log it, and return the best score.

    Reads ``<model_dir>/model/xgboost/all_results.json``, whose top-level keys
    are metric names and whose values are ``{round: score}`` mappings. For each
    metric the minimum across rounds is logged to ``metrics``. The minimum of
    ``eval_metric_key`` is also stored in ``model.metadata`` and returned.

    Args:
      model_dir: base output directory of the training job (gs://...).
      model_display_name: currently unused; kept for interface compatibility.
      model: output Model artifact; only its metadata is populated.
      metrics: output Metrics artifact, one entry per metric.
      eval_metric_key: metric to return, e.g. "mlogloss".

    Returns:
      Outputs(eval_metric): minimum value of ``eval_metric_key`` over all rounds.

    Raises:
      ValueError: if ``eval_metric_key`` is not present in the results file.
    """
    # KFP lightweight components run in isolation, so imports live in the body.
    import logging
    from collections import namedtuple

    import pandas as pd

    logging.getLogger().setLevel(logging.INFO)

    metrics_uri = "{}/model/xgboost/all_results.json".format(model_dir.rstrip("/"))
    metrics_df = pd.read_json(metrics_uri, typ="series")

    best_scores = {}
    for name, per_round in metrics_df.items():
        logging.info(f"    {name} -> {per_round}")
        best = min(per_round.values())
        metrics.log_metric(name, best)
        best_scores[name] = best

    # A None here would not serialize as the declared float output; fail clearly instead.
    if eval_metric_key not in best_scores:
        raise ValueError(
            f"Metric {eval_metric_key!r} not found in {metrics_uri}; "
            f"available metrics: {sorted(best_scores)}"
        )
    eval_metric = best_scores[eval_metric_key]
    model.metadata[eval_metric_key] = eval_metric

    outputs = namedtuple("Outputs", ["eval_metric"])
    return outputs(eval_metric)

## Create: Vertex AI Prediction Custom Container (Flask)

In [2]:
%%writefile prediction/app.py

import os
import logging
import pandas as pd
import xgboost as xgb
from flask import Flask, request, Response, jsonify
from google.cloud import storage

# Make logging.info() output visible (the root logger defaults to WARNING).
logging.basicConfig(level=logging.INFO)

client = storage.Client(project=os.environ['PROJECT_ID'])

# Model Download from gcs
# AIP_STORAGE_URI is expected to be the gs:// directory that holds model.json.
fname = "model.json"

with open(fname, "wb") as model_file:
    client.download_blob_to_file(
        f"{os.environ['AIP_STORAGE_URI']}/{fname}", model_file
    )

# Loading model
print("Loading model from: {}".format(fname))
model = xgb.Booster(model_file=fname)

# Column names (in order) given to each request's instances. Built once at import
# time rather than on every request.
# NOTE(review): presumably these must match the training CSV columns minus Cover_Type — confirm.
_FEATURES = (
    [
        'Id', 'Elevation', 'Aspect', 'Slope',
        'Horizontal_Distance_To_Hydrology', 'Vertical_Distance_To_Hydrology',
        'Horizontal_Distance_To_Roadways', 'Hillshade_9am', 'Hillshade_Noon',
        'Hillshade_3pm', 'Horizontal_Distance_To_Fire_Points',
    ]
    + [f'Wilderness_Area{i}' for i in range(1, 5)]
    + [f'Soil_Type{i}' for i in range(1, 41)]
)

# Creation of the Flask app
app = Flask(__name__)


# Flask route for Liveness checks
@app.route(os.environ['AIP_HEALTH_ROUTE'])
def isalive():
    """Liveness probe: answers HTTP 200 (routes only exist after the model loads)."""
    return Response(status=200)


# Flask route for predictions
@app.route(os.environ['AIP_PREDICT_ROUTE'], methods=['GET', 'POST'])
def prediction():
    """Score the request's ``instances`` and return the top class of the first one.

    Expects a JSON body ``{"instances": [[...], ...]}`` whose rows follow
    ``_FEATURES``. Responds ``{"Cover Type": "<column index>"}`` for the first
    instance. Responds HTTP 400 when the body is not a JSON object or has no
    instances.
    """
    data = request.get_json(silent=True, force=True)
    instances = data.get("instances") if isinstance(data, dict) else None
    if not instances:
        return jsonify({"error": "Request body must be JSON with a non-empty 'instances' list"}), 400

    dmf = xgb.DMatrix(pd.DataFrame(instances, columns=_FEATURES))
    # One row per instance; presumably one column per class probability
    # (training uses multi:softprob).
    response = pd.DataFrame(model.predict(dmf))
    logging.info(f"Response: {response}")
    # NOTE(review): only the first instance's class is returned; response
    # shape kept as-is for existing clients.
    return jsonify({"Cover Type": str(response.idxmax(axis=1)[0])})


if __name__ == "__main__":
    # debug=False: the Werkzeug debugger permits arbitrary code execution and
    # must never be reachable on 0.0.0.0.
    app.run(debug=False, host='0.0.0.0', port=8080)

Writing prediction/app.py


### Preparing Docker Container Declarative Files

In [4]:
%%writefile prediction/requirements.txt

# NOTE(review): xgboost is unpinned here and the training image runs
# `pip install xgboost --upgrade`; pin both to the same version so this server
# can always load the model.json written by training.
google-cloud-storage
numpy
pandas
flask
xgboost

Overwriting prediction/requirements.txt


In [5]:
%%writefile prediction/Dockerfile

# NOTE(review): Python 3.7 is end-of-life; consider a newer base image.
FROM python:3.7-buster

# Install dependencies before copying app.py so this layer stays cached
# when only the application code changes.
COPY requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY app.py ./app.py

# Flask Env Variable
ENV FLASK_APP=app

# Expose port 8080 (must match containerPort in the model's containerSpec)
EXPOSE 8080

# Exec form: flask runs as PID 1 and receives SIGTERM directly.
CMD ["flask", "run", "--host=0.0.0.0", "--port=8080"]

Writing prediction/Dockerfile


### Push Container Image to Repository

In [None]:
# --project keeps the repository in the same project that PREDICTION_IMAGE_URI points at.
!gcloud artifacts repositories create $PREDICTION_REPOSITORY --project $PROJECT_ID --location $REGION --repository-format docker

In [341]:
# Build with Cloud Build in PROJECT_ID and push to PREDICTION_IMAGE_URI (timeout in seconds).
!gcloud builds submit --project $PROJECT_ID --tag $PREDICTION_IMAGE_URI prediction/. --timeout 3000

Creating temporary tarball archive of 5 file(s) totalling 131.4 KiB before compression.
Uploading tarball of [prediction/.] to [gs://jchavezar-demo_cloudbuild/source/1659361280.259642-01da885bed7246298da7751489b7144b.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/jchavezar-demo/locations/global/builds/dc057492-3f62-4528-b3ea-870fa07e3ba8].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/dc057492-3f62-4528-b3ea-870fa07e3ba8?project=569083142710].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "dc057492-3f62-4528-b3ea-870fa07e3ba8"

FETCHSOURCE
Fetching storage object: gs://jchavezar-demo_cloudbuild/source/1659361280.259642-01da885bed7246298da7751489b7144b.tgz#1659361280532657
Copying gs://jchavezar-demo_cloudbuild/source/1659361280.259642-01da885bed7246298da7751489b7144b.tgz#1659361280532657...
/ [1 files][ 34.8 KiB/ 34.8 KiB]                                                
Operation completed over 

## Create: Pipeline

In [14]:
from kfp.v2.dsl import pipeline, Condition
from google.cloud import aiplatform
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components import aiplatform as gcc
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp as custom_job
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from kfp.v2.components import importer_node


@pipeline(name='dask-gpu-1')
def pipeline(
    project_id: str,
    custom_train_name: str,
    region: str,
    eval_acc_threshold: float,
    eval_metric_key: str,
    model_file_bucket: str,
):
    """Train XGBoost on a GPU with Dask, evaluate it, and conditionally deploy it.

    Args:
      project_id: GCP project for the training job, model and endpoint.
      custom_train_name: display name of the Vertex custom training job.
      region: location in which the custom training job runs.
      eval_acc_threshold: deploy gate. The model is deployed only when the best
        value of ``eval_metric_key`` is strictly below this threshold.
      eval_metric_key: XGBoost eval metric to gate on, e.g. "mlogloss".
      model_file_bucket: base output directory of the training job.
    """
    worker_pool_specs = [
        {
            "machine_spec": {
                "machine_type": "a2-highgpu-1g",
                "accelerator_type": "NVIDIA_TESLA_A100",
                "accelerator_count": 1
            },
            "replica_count": "1",
            "container_spec": {
                "image_uri": IMAGE_URI,
                "env": [{"name": "AIP_TRAINING_DATA_URI", "value": 'test'}],
                "command": [
                    "python",
                    "trainer/task.py"
                ],
                "args": [
                    "--dataset_source", DATASET_SOURCE,
                    "--model-name", "model.json"
                ]
            }
        }
    ]
    train_with_gpu_task = custom_job(
        project=project_id,
        location=region,
        display_name=custom_train_name,
        worker_pool_specs=worker_pool_specs,
        base_output_directory=model_file_bucket
    )
    # Read metrics from the same directory the training job was told to write to.
    get_train_details_task = get_train_job_details(
        model_dir=model_file_bucket,
        model_display_name="xgboost-dask",
        eval_metric_key=eval_metric_key,  # e.g. mlogloss
    ).after(train_with_gpu_task)

    # get_train_job_details reports the *minimum* of the metric over rounds, i.e.
    # lower is better (as for mlogloss), so a model qualifies when it is BELOW the
    # threshold. (Previously `>` deployed only the worse models.)
    with Condition(
        get_train_details_task.outputs["eval_metric"] < eval_acc_threshold,
        name="model-deploy-decision",
    ):
        import_unmanaged_model_op = importer_node.importer(
            # Directory the training job uploads model.json into
            # (<base_output_directory>/model/xgboost).
            # NOTE(review): uses the compile-time constant, so this assumes the
            # pipeline runs with model_file_bucket == MODEL_FILE_BUCKET.
            artifact_uri=f"{MODEL_FILE_BUCKET}/model/xgboost",
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "containerSpec": {
                    "imageUri": PREDICTION_IMAGE_URI,
                    "env": [
                        {
                            "name": "PROJECT_ID",
                            "value": PROJECT_ID},
                    ],
                    "predictRoute": "/predict",
                    "healthRoute": "/health",
                    "ports": [
                        {
                            "containerPort": 8080
                        }
                    ]
                },
            },
        )
        custom_model_upload_job = gcc.ModelUploadOp(
            project=project_id,
            display_name="xgb-model",
            unmanaged_container_model=import_unmanaged_model_op.outputs["artifact"],
        ).after(import_unmanaged_model_op)

        endpoint_create_job = gcc.EndpointCreateOp(
            project=project_id,
            display_name="pipelines-created-endpoint",
        )

        custom_model_deploy_job = (gcc.ModelDeployOp(
            model=custom_model_upload_job.outputs["model"],
            endpoint=endpoint_create_job.outputs["endpoint"],
            deployed_model_display_name="xgboost_model_end",
            traffic_split={"0": "100"},
            dedicated_resources_machine_type="n1-standard-4",
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1
        )).set_caching_options(False)

## Compile Pipeline

In [15]:
import warnings

from kfp.v2 import compiler

# Suppress every warning for the rest of the session (not only KFP's).
warnings.filterwarnings('ignore')

# Serialize the pipeline definition to a JSON job spec consumed by PipelineJob.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='dask_cpu.json',
)

## Run Pipeline Job

In [16]:
import google.cloud.aiplatform as aip

# Runtime values for the pipeline's parameters (names must match its signature).
pipeline_parameters = {
    'project_id': PROJECT_ID,
    'custom_train_name': CUSTOM_TRAIN_NAME,
    'region': REGION,
    'eval_acc_threshold': 0.5,
    'eval_metric_key': 'mlogloss',
    'model_file_bucket': MODEL_FILE_BUCKET,
}

job = aip.PipelineJob(
    display_name="dask_cpu",
    template_path="dask_cpu.json",
    pipeline_root=PIPELINE_ROOT_PATH,
    parameter_values=pipeline_parameters,
)

# Submit asynchronously under the dedicated pipeline service account.
job.submit(service_account='vtx-pipe@jchavezar-demo.iam.gserviceaccount.com')

Creating PipelineJob
PipelineJob created. Resource name: projects/569083142710/locations/us-central1/pipelineJobs/dask-gpu-1-20220801130945
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/569083142710/locations/us-central1/pipelineJobs/dask-gpu-1-20220801130945')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/dask-gpu-1-20220801130945?project=569083142710


![Pipe](images/vertex-pipe-gpu.png)