# An end-to-end Vertex Pipeline Demonstration

First, check that the required packages are installed correctly. The KFP SDK version should be >= 1.6:

In [1]:
# Run a separate python3 process that imports kfp and prints its version string.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.2


In [20]:
import os
from functools import partial

import kfp
import pprint
import yaml
from jinja2 import Template
from kfp.v2 import dsl
from kfp.v2.compiler import compiler
from kfp.v2.dsl import Dataset
from kfp.v2.google.client import AIPlatformClient

In [21]:
# Google Cloud project used throughout the notebook. The project number is
# used further down to build the custom-job service account e-mail.
project_id = 'woven-rush-197905'
project_number = '297370817971'

In [22]:
# Location and repository name of the docker.pkg.dev registry that hosts the
# training and serving container images; both values are also rendered into
# the component templates.
af_registry_location = 'asia-southeast1'
af_registry_name = 'mlops-vertex-kit'

In [23]:
# Directory with one sub-folder per component, each holding a
# `component.yaml.jinja` template.
components_dir = '../components/'

In [24]:
def _load_custom_component(project_id: str,
                           af_registry_location: str,
                           af_registry_name: str,
                           components_dir: str,
                           component_name: str):
  """Render a component's Jinja template and load it as a KFP component.

  The template is read from
  `<components_dir>/<component_name>/component.yaml.jinja`.

  Args:
    project_id: Value substituted for `project_id` in the template.
    af_registry_location: Value substituted for `af_registry_location`.
    af_registry_name: Value substituted for `af_registry_name`.
    components_dir: Directory that contains the per-component sub-folders.
    component_name: Name of the sub-folder holding the component template.

  Returns:
    Whatever `kfp.components.load_component_from_text` returns for the
    rendered component text.
  """
  template_file = os.path.join(
      components_dir, component_name, 'component.yaml.jinja')
  with open(template_file, 'r') as handle:
    raw_template = handle.read()

  # Fill in the project and registry placeholders before handing the
  # component definition to KFP.
  rendered_spec = Template(raw_template).render(
      project_id=project_id,
      af_registry_location=af_registry_location,
      af_registry_name=af_registry_name)

  return kfp.components.load_component_from_text(rendered_spec)

# Pre-bind the project, registry and directory settings so that callers only
# have to supply `component_name`.
load_custom_component = partial(
    _load_custom_component,
    project_id=project_id,
    af_registry_location=af_registry_location,
    af_registry_name=af_registry_name,
    components_dir=components_dir,
)

In [25]:
# Load every component from its template. The component names in the tuple
# are listed in the same order as the operator variables they are bound to.
(preprocess_op,
 train_op,
 check_metrics_op,
 create_endpoint_op,
 deploy_model_op,
 monitor_model_op) = [
    load_custom_component(component_name=name)
    for name in ('data_preprocess',
                 'train_model',
                 'check_model_metrics',
                 'create_endpoint',
                 'deploy_model',
                 'monitor_model')
]

Then set the pipeline configuration values and define the pipeline using the following function:

In [42]:
# Region used when creating the API client, and the Cloud Storage location
# passed to the run as `pipeline_root`.
pipeline_region = 'asia-southeast1'
pipeline_root = 'gs://vertex_pipeline_demo_root/pipeline_root'

In [43]:
# Data settings that are forwarded to the pipeline as run parameters.
data_region = 'asia-southeast1'
input_dataset_uri = 'bq://woven-rush-197905.vertex_pipeline_demo.banknote_authentication'
gcs_data_output_folder = 'gs://vertex_pipeline_demo_root/datasets/training'
# Semicolon-separated `name:type` pairs.
training_data_schema = 'VWT:float;SWT:float;KWT:float;Entropy:float;Class:int'

data_pipeline_root = 'gs://vertex_pipeline_demo_root/compute_root'

In [44]:
# Container images are addressed as
# <location>-docker.pkg.dev/<project>/<repository>/<image>:latest.
training_container_image_uri = (
    f'{af_registry_location}-docker.pkg.dev/'
    f'{project_id}/{af_registry_name}/training:latest'
)
serving_container_image_uri = (
    f'{af_registry_location}-docker.pkg.dev/'
    f'{project_id}/{af_registry_name}/serving:latest'
)
# Service account e-mail derived from the project number.
custom_job_service_account = (
    f'{project_number}-compute@developer.gserviceaccount.com'
)

In [61]:
@dsl.pipeline(name='training-pipeline-template')
def pipeline(project_id: str,
             data_region: str,
             gcs_data_output_folder: str,
             input_dataset_uri: str,
             training_data_schema: str,
             data_pipeline_root: str,

             training_container_image_uri: str,
             serving_container_image_uri: str,
             custom_job_service_account: str,
             hptune_region: str,

             metrics_name: str,
             metrics_threshold: float,

             endpoint_machine_type: str,
             endpoint_min_replica_count: int,
             endpoint_max_replica_count: int,

             machine_type: str = "n1-standard-8",
             accelerator_count: int = 0,
             accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED',
             vpc_network: str = ""):
    """Preprocess data, train a model, check its metrics and deploy it.

    The steps are wired as: import dataset -> preprocess -> train ->
    check metrics, plus create endpoint and deploy model. The deploy step is
    explicitly ordered after the metrics check.

    Args:
      project_id: Forwarded as `project_id` to the preprocess, train,
        create-endpoint and deploy components.
      data_region: Forwarded as `data_region` to the same components.
      gcs_data_output_folder: Passed to the preprocess component as
        `gcs_output_folder`.
      input_dataset_uri: URI imported as a `Dataset` artifact and used as the
        preprocess input.
      training_data_schema: Passed to the train component as
        `input_data_schema`.
      data_pipeline_root: Forwarded as `data_pipeline_root` to the train,
        create-endpoint and deploy components.
      training_container_image_uri: Passed to the train component.
      serving_container_image_uri: Passed to the train component.
      custom_job_service_account: Passed to the train component.
      hptune_region: Passed to the train component.
      metrics_name: Passed to the metrics-check component.
      metrics_threshold: Passed to the metrics-check component.
      endpoint_machine_type: Passed to the deploy component as `machine_type`.
      endpoint_min_replica_count: Passed to the deploy component as
        `min_replica_count`.
      endpoint_max_replica_count: Passed to the deploy component as
        `max_replica_count`.
      machine_type: Passed to the train component.
      accelerator_count: Passed to the train component.
      accelerator_type: Passed to the train component.
      vpc_network: Passed to the train component.
    """
    # Bring the source dataset URI into the pipeline as a Dataset artifact.
    dataset_importer = dsl.importer(
      artifact_uri=input_dataset_uri,
      artifact_class=Dataset,
      reimport=False)

    preprocess_task = preprocess_op(
      project_id=project_id,
      data_region=data_region,
      gcs_output_folder=gcs_data_output_folder,
      gcs_output_format="CSV",
      input_dataset=dataset_importer.output)

    train_task = train_op(
      project_id=project_id,
      data_region=data_region,
      data_pipeline_root=data_pipeline_root,
      input_data_schema=training_data_schema,
      training_container_image_uri=training_container_image_uri,
      serving_container_image_uri=serving_container_image_uri,
      custom_job_service_account=custom_job_service_account,
      input_dataset=preprocess_task.outputs['output_dataset'],
      machine_type=machine_type,
      accelerator_count=accelerator_count,
      accelerator_type=accelerator_type,
      hptune_region=hptune_region,
      vpc_network=vpc_network)

    check_metrics_task = check_metrics_op(
      metrics_name=metrics_name,
      metrics_threshold=metrics_threshold,
      basic_metrics=train_task.outputs['basic_metrics'])

    create_endpoint_task = create_endpoint_op(
      project_id=project_id,
      data_region=data_region,
      data_pipeline_root=data_pipeline_root,
      display_name='endpoint-template',
      create_if_not_exists=True)

    deploy_model_task = deploy_model_op(
      project_id=project_id,
      data_region=data_region,
      data_pipeline_root=data_pipeline_root,
      machine_type=endpoint_machine_type,
      min_replica_count=endpoint_min_replica_count,
      max_replica_count=endpoint_max_replica_count,
      model=train_task.outputs['output_model'],
      endpoint=create_endpoint_task.outputs['endpoint'])

    # The deploy step consumes no output of the metrics check, so without an
    # explicit ordering it has no dependency on it and can run regardless of
    # the check. Make deployment wait for the check to complete.
    # NOTE(review): this assumes check_metrics_op fails its task when the
    # metric is below the threshold - confirm against the component.
    deploy_model_task.after(check_metrics_task)

### Compile and run the end-to-end ML pipeline
With our full pipeline defined, it's time to compile it:

In [62]:
# Compile the pipeline function into a job-spec JSON file.
compiler.Compiler().compile(
    package_path="training_pipeline_job.json",
    pipeline_func=pipeline,
)

Next, instantiate an API client:

In [63]:
# Client used below to submit the compiled pipeline job.
api_client = AIPlatformClient(region=pipeline_region, project_id=project_id)



Next, kick off a pipeline run:

In [64]:
# Values for the pipeline parameters that have no default in the pipeline
# signature; the training machine/accelerator/network settings keep their
# defaults.
pipeline_params = dict(
    # Data settings.
    project_id=project_id,
    data_region=data_region,
    gcs_data_output_folder=gcs_data_output_folder,
    input_dataset_uri=input_dataset_uri,
    training_data_schema=training_data_schema,
    data_pipeline_root=data_pipeline_root,

    # Training settings.
    training_container_image_uri=training_container_image_uri,
    serving_container_image_uri=serving_container_image_uri,
    custom_job_service_account=custom_job_service_account,
    hptune_region="",

    # Metrics check.
    metrics_name='au_prc',
    metrics_threshold=0.4,

    # Serving endpoint.
    endpoint_machine_type='n1-standard-4',
    endpoint_min_replica_count=1,
    endpoint_max_replica_count=1,
)

# Submit the compiled job spec as a new run with caching turned off.
response = api_client.create_run_from_job_spec(
    job_spec_path="training_pipeline_job.json",
    pipeline_root=pipeline_root,
    parameter_values=pipeline_params,
    enable_caching=False,
)