# An end-to-end Vertex Training Pipeline Demonstration

Finally, check that you have correctly installed the packages. The KFP SDK version should be >=1.6:

In [1]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.18


In [5]:
import os
import json
from functools import partial

import kfp
import pprint
import yaml
from jinja2 import Template
from kfp.v2 import dsl
from kfp.v2.compiler import compiler
from kfp.v2.dsl import Dataset, Input, Metrics, Model, Output, component
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform, firestore
from datetime import datetime
from google_cloud_pipeline_components.experimental import hyperparameter_tuning_job
from google_cloud_pipeline_components.v1.hyperparameter_tuning_job import HyperparameterTuningJobRunOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.types import artifact_types
from kfp.v2.components import importer_node

In [6]:
project_id='hyu-ml-sandbox'
project_number='439762652'

In [7]:
af_registry_location='us-central1'
af_registry_name='mlops-vertex-kit'

In [8]:
components_dir='../components/'

In [9]:
def _load_custom_component(project_id: str,
                           af_registry_location: str,
                           af_registry_name: str,
                           components_dir: str,
                           component_name: str):
  component_path = os.path.join(components_dir,
                                component_name,
                                'component.yaml.jinja')
  with open(component_path, 'r') as f:
    component_text = Template(f.read()).render(
      project_id=project_id,
      af_registry_location=af_registry_location,
      af_registry_name=af_registry_name)

  return kfp.components.load_component_from_text(component_text)

load_custom_component = partial(_load_custom_component,
                                project_id=project_id,
                                af_registry_location=af_registry_location,
                                af_registry_name=af_registry_name,
                                components_dir=components_dir)

In [10]:
preprocess_op = load_custom_component(component_name='data_preprocess')
train_op = load_custom_component(component_name='train_model')

Then define the pipeline using the following function:

In [11]:
pipeline_region='us-central1'
pipeline_root='gs://vertex_pipeline_demo_root_hyu/pipeline_root'

In [12]:
data_region='us-central1'
input_dataset_uri='bq://hyu-ml-sandbox.vertex_pipeline_demo.banknote_authentication'
gcs_data_output_folder='gs://vertex_pipeline_demo_root_hyu/datasets/training'
training_data_schema='VWT:float;SWT:float;KWT:float;Entropy:float;Class:int'

data_pipeline_root='gs://vertex_pipeline_demo_root_hyu/compute_root'

In [13]:
training_container_image_uri=f'{af_registry_location}-docker.pkg.dev/{project_id}/{af_registry_name}/training:latest'
serving_container_image_uri=f'{af_registry_location}-docker.pkg.dev/{project_id}/{af_registry_name}/serving:latest'
hpo_container_image_uri=f'{af_registry_location}-docker.pkg.dev/{project_id}/{af_registry_name}/hpo:latest'
custom_job_service_account=f'{project_number}-compute@developer.gserviceaccount.com'


In [14]:
training_container_image_uri,serving_container_image_uri,custom_job_service_account,hpo_container_image_uri

('us-central1-docker.pkg.dev/hyu-ml-sandbox/mlops-vertex-kit/training:latest',
 'us-central1-docker.pkg.dev/hyu-ml-sandbox/mlops-vertex-kit/serving:latest',
 '439762652-compute@developer.gserviceaccount.com',
 'us-central1-docker.pkg.dev/hyu-ml-sandbox/mlops-vertex-kit/hpo:latest')

In [15]:
train_additional_args = json.dumps({
    'num_leaves_hp_param_min': 6,
    'num_leaves_hp_param_max': 11,
    'max_depth_hp_param_min': -1,
    'max_depth_hp_param_max': 4
})
train_additional_args

'{"num_leaves_hp_param_min": 6, "num_leaves_hp_param_max": 11, "max_depth_hp_param_min": -1, "max_depth_hp_param_max": 4}'

In [41]:
from kfp.v2.dsl import component
from kfp.v2.dsl import Dataset, Input, Metrics, Model, Output

@component
def worker_pool_specs(project_id: str,
    data_region: str,
    data_pipeline_root: str,
    hpo_container_image_uri: str,
    custom_job_service_account: str,
    input_dataset: Input[Dataset],
    input_data_schema: str) -> list:
    """
    Pass the preprocessed data uri to HPO as a worker pool argument. The vanilla HPO API 
    doesn't support 'input data' so it's done this way.
    
    data_preprocess -> dataset.uri -> CMDARGS -> worker_pool_specs -> HPO
    """
    
    task_type = 'training'
    display_name = 'hpo-pipeline-template'
    fields = [field.split(':')[0] for field in input_data_schema.split(';')]
    label = fields[-1]
    features = ','.join(fields[0:-1])
    CMDARGS = [
    "--training_data_uri="+str(input_dataset.uri),
    "--training_data_schema="+input_data_schema,
    "--label="+label,
    "--features="+features
]

    # The spec of the worker pools including machine type and Docker image
    worker_pool_specs = [
    {
        "machine_spec": {
            "machine_type": "n1-standard-4",
        },
        "replica_count": 1,
        "container_spec": {"image_uri": hpo_container_image_uri, "args": CMDARGS},
    }
    ]
    
    return worker_pool_specs

@component(packages_to_install=['google-cloud-firestore==2.3'])
def best_hpo_to_args(hpo_best: str,
                    project_id: str,
                    solution_name: str,
                    as_at_date: str) -> str:
    """
    Write the best HPO params to firestore. 
    We keep the output to chain this component to the hpo_completion step.
    """
    
    import json
    from google.cloud import firestore
    hpo_best = json.loads(hpo_best.replace("'", '"'))

    hpo_best_dict = {}
    
    for i in hpo_best['parameters']:
        hpo_best_dict.update({i['parameterId']: i['value']})
    
    for i in hpo_best['finalMeasurement']['metrics']:
        hpo_best_dict.update({i['metricId']: i['value']})
    
    db = firestore.Client(project=project_id)
    db.collection(solution_name+"_hpo").document("params").set(hpo_best_dict,merge=True)
    
    hpo_best_dict=str(hpo_best_dict).replace("'", '"')
    
    return hpo_best_dict

@component
def hpo_completion(hpo_flags: str) -> str:
    """
    This function doesn nothing but wait to merge all the async HPO jobs so as 
    to gurantee that the following training module gets the latest params from
    firestore for all warehouses.
    """
    return "true"

def hpo(project_id,
         data_region,
         data_pipeline_root,
         preprocess_task,
         display_name,
         metric_spec,
         parameter_spec,
         max_trial_count,
         parallel_trial_count,
         study_spec_algorithm,
         hpo_container_image_uri,
         training_data_schema):
    """
    This is not a component function. It's a normal function that generates combines hpo operations. 
    We return the pipeline operation to chain this component to the hpo_completion step. 
    """
    
    worker_pool_specs_op = worker_pool_specs(project_id=project_id,
    data_region=data_region,
    data_pipeline_root=data_pipeline_root,
    hpo_container_image_uri=hpo_container_image_uri,
    custom_job_service_account=custom_job_service_account,                               
    input_dataset=preprocess_task.outputs['output_dataset'],
    input_data_schema=training_data_schema
    )

    tuning_op = HyperparameterTuningJobRunOp(
    display_name=display_name,
    project=project_id,
    location=data_region,
    worker_pool_specs=worker_pool_specs_op.output,
    study_spec_metrics=metric_spec,
    study_spec_parameters=parameter_spec,
    max_trial_count=max_trial_count,
    parallel_trial_count=parallel_trial_count,
    base_output_directory=data_pipeline_root,
    study_spec_algorithm=study_spec_algorithm
    )
 
    trials_op = hyperparameter_tuning_job.GetTrialsOp(
        gcp_resources=tuning_op.outputs["gcp_resources"]
    )

    best_trial_op = hyperparameter_tuning_job.GetBestTrialOp(
        trials=trials_op.output, study_spec_metrics=metric_spec
    )
    
    best_hpo_to_args_op = best_hpo_to_args(best_trial_op.output,
                                          project_id=project_id,               
                                        as_at_date=datetime.now().strftime('%Y-%m-%d'),
                                          solution_name=display_name)
    return best_hpo_to_args_op

In [42]:
@dsl.pipeline(name='training-pipeline-template')
def pipeline(project_id: str,
             data_region: str,
             gcs_data_output_folder: str,
             input_dataset_uri: str,
             training_data_schema: str,
             data_pipeline_root: str,
             
             training_container_image_uri: str,
             hpo_container_image_uri: str,
             train_additional_args: str,
             serving_container_image_uri: str,
             custom_job_service_account: str,
             hptune_region: str,
             hp_config_suggestions_per_request: int,
             hp_config_max_trials: int,
             
             metrics_name: str,
             metrics_threshold: float,
             
             endpoint_machine_type: str,
             endpoint_min_replica_count: int,
             endpoint_max_replica_count: int,
             endpoint_test_instances: str,
             
             monitoring_user_emails: str,
             monitoring_log_sample_rate: float,
             monitor_interval: int,
             monitoring_default_threshold: float,
             monitoring_custom_skew_thresholds: str,
             monitoring_custom_drift_thresholds: str,
             
             machine_type: str = "n1-standard-4",
             accelerator_count: int = 0,
             accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED',
             vpc_network: str = "",
             enable_model_monitoring: str = 'False'):
    
    display_name = 'hpo-pipeline-template'
    metric_spec = hyperparameter_tuning_job.serialize_metrics({"au_roc": "maximize"})
    parameter_spec = hyperparameter_tuning_job.serialize_parameters(
    {
        "num_boost_round": aiplatform.hyperparameter_tuning.DiscreteParameterSpec(
            values=[100, 200], scale=None
        ),
        "min_data_in_leaf": aiplatform.hyperparameter_tuning.DiscreteParameterSpec(
            values=[5, 10], scale=None
        ),
    })
    max_trial_count=4
    parallel_trial_count=4
    study_spec_algorithm='GRID_SEARCH'
    

    dataset_importer = kfp.v2.dsl.importer(
      artifact_uri=input_dataset_uri,
      artifact_class=Dataset,
      reimport=False)

    preprocess_task = preprocess_op(
      project_id=project_id,
      data_region=data_region,
      gcs_output_folder=gcs_data_output_folder,
      gcs_output_format="CSV",
      input_dataset=dataset_importer.output)
    
    hpo_op = hpo(project_id,
             data_region,
             data_pipeline_root,
             preprocess_task,
             display_name,
             metric_spec,
             parameter_spec,
             max_trial_count,
             parallel_trial_count,
             study_spec_algorithm,
             hpo_container_image_uri,
             training_data_schema)
    
    hpo_completion_op = hpo_completion(str(hpo_op.output))
    
    with dsl.Condition(
         hpo_completion_op.output=="true",
        name="train_model"
    ):
        """
        We use the condition module to check if all HPO jobs for different warehouse are finished so as to
        kick off the training step at the right time.
        """

        train_task = train_op(
          project_id=project_id,
          data_region=data_region,
          data_pipeline_root=data_pipeline_root,
          input_data_schema=training_data_schema,
          training_container_image_uri=training_container_image_uri,
          train_additional_args=train_additional_args,
          serving_container_image_uri=serving_container_image_uri,
          custom_job_service_account=custom_job_service_account,
          input_dataset=preprocess_task.outputs['output_dataset'],
          machine_type=machine_type,
          accelerator_count=accelerator_count,
          accelerator_type=accelerator_type,
          hptune_region=hptune_region,
          hp_config_max_trials=hp_config_max_trials,
          hp_config_suggestions_per_request=hp_config_suggestions_per_request,
          vpc_network=vpc_network)

### Compile and run the end-to-end ML pipeline
With our full pipeline defined, it's time to compile it:

In [43]:
compiler.Compiler().compile(
    pipeline_func=pipeline, 
    package_path="hpo_pipeline_job.json"
)

Next, instantiate an API client:

In [44]:
api_client = AIPlatformClient(
    project_id=project_id,
    region=pipeline_region)

Next, kick off a pipeline run:

In [45]:
test_instances = json.dumps([
		{"VWT":3.6216,"SWT":8.6661,"KWT":-2.8073,"Entropy":-0.44699,"Class":"0"},
		{"VWT":4.5459,"SWT":8.1674,"KWT":-2.4586,"Entropy":-1.4621,"Class":"0"},
		{"VWT":3.866,"SWT":-2.6383,"KWT":1.9242,"Entropy":0.10645,"Class":"0"},
		{"VWT":-3.7503,"SWT":-13.4586,"KWT":17.5932,"Entropy":-2.7771,"Class":"1"},
		{"VWT":-3.5637,"SWT":-8.3827,"KWT":12.393,"Entropy":-1.2823,"Class":"1"},
		{"VWT":-2.5419,"SWT":-0.65804,"KWT":2.6842,"Entropy":1.1952,"Class":"1"}
		])
test_instances

'[{"VWT": 3.6216, "SWT": 8.6661, "KWT": -2.8073, "Entropy": -0.44699, "Class": "0"}, {"VWT": 4.5459, "SWT": 8.1674, "KWT": -2.4586, "Entropy": -1.4621, "Class": "0"}, {"VWT": 3.866, "SWT": -2.6383, "KWT": 1.9242, "Entropy": 0.10645, "Class": "0"}, {"VWT": -3.7503, "SWT": -13.4586, "KWT": 17.5932, "Entropy": -2.7771, "Class": "1"}, {"VWT": -3.5637, "SWT": -8.3827, "KWT": 12.393, "Entropy": -1.2823, "Class": "1"}, {"VWT": -2.5419, "SWT": -0.65804, "KWT": 2.6842, "Entropy": 1.1952, "Class": "1"}]'

In [55]:
pipeline_params = {
    'project_id': project_id,
    'data_region': data_region,
    'gcs_data_output_folder': gcs_data_output_folder,
    'input_dataset_uri': input_dataset_uri,
    'training_data_schema': training_data_schema,
    'data_pipeline_root': data_pipeline_root,
    
    'training_container_image_uri': training_container_image_uri,
    'hpo_container_image_uri': hpo_container_image_uri,
    'train_additional_args': train_additional_args,
    'serving_container_image_uri': serving_container_image_uri,
    'custom_job_service_account': custom_job_service_account,
    'hptune_region':"us-central1",
    'hp_config_suggestions_per_request': 5,
    'hp_config_max_trials': 30,
    
    'metrics_name': 'au_prc',
    'metrics_threshold': 0.4,
    
    'endpoint_machine_type': 'n1-standard-4',
    'endpoint_min_replica_count': 1,
    'endpoint_max_replica_count': 1,
    'endpoint_test_instances': test_instances,
    
    'monitoring_user_emails': 'simon19891101@google.com',
    'monitoring_log_sample_rate': 0.8,
    'monitor_interval': 3600,
    'monitoring_default_threshold': 0.3,
    'monitoring_custom_skew_thresholds': 'VWT:.5,SWT:.2,KWT:.7,Entropy:.4',
    'monitoring_custom_drift_thresholds': 'VWT:.5,SWT:.2,KWT:.7,Entropy:.4',
    'enable_model_monitoring': 'True'
}

response = api_client.create_run_from_job_spec(
    job_spec_path="hpo_pipeline_job.json", 
    pipeline_root=pipeline_root,
    parameter_values=pipeline_params,
    service_account=custom_job_service_account,
    enable_caching=True)

In [53]:
import hptune as hpt
hpt.report_hyperparameter_tuning_metric?

ModuleNotFoundError: No module named 'hptune'