# Imports

In [1]:
import kfp
from kfp.v2 import compiler
from kfp.components import create_component_from_func
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2.google import experimental

In [2]:
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component)
from kfp.v2.google.client import AIPlatformClient

In [3]:
# Notebook-wide configuration shared by every pipeline below.

# Google Cloud project and region the pipeline runs are submitted to.
project_id = "renatoleite-321218"
region = "us-central1"

# Cloud Storage location passed as pipeline_root to each pipeline and run.
pipeline_root_path = "gs://renatoleite-pipeline-samples/pipeline_root"

# Execute Vertex Training Job

In [None]:
@component
def test_nvt():
    """Minimal placeholder component: its body only prints the string 'test'."""
    print('test')

In [None]:
# Run as a Vertex training Job
@kfp.dsl.pipeline(
    name='pyfunc',
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    """Single-step pipeline whose step is executed as a Vertex AI custom job.

    Args:
        project_id: Declared as a pipeline parameter so a value can be passed
            at submission time; it is not referenced inside the pipeline body.
    """
    nvt_task = test_nvt()

    # Attach a custom-job spec to the task. The worker pool spec only names a
    # container image (no command/args).
    # NOTE(review): presumably the job then runs the image's own entrypoint
    # rather than the body of test_nvt - confirm.
    experimental.run_as_aiplatform_custom_job(
        nvt_task,
        worker_pool_specs=[
            {
                "containerSpec": {
                    "imageUri": "gcr.io/renatoleite-321218/test-nvt",
                },
                "replicaCount": "1",
                "machineSpec": {
                    "machineType": "n1-standard-16",
                    # Use the same camelCase REST field names as the rest of
                    # this spec, and pass the accelerator as its enum *name*
                    # ("NVIDIA_TESLA_T4"), which is the JSON form of the enum.
                    "acceleratorType": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_T4.name,
                    "acceleratorCount": 2,
                },
            }
        ],
    )

In [6]:
# Compile the currently defined `pipeline` function into a job spec file.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='pyfunc_training.json',
)

In [7]:
# Client bound to the project/region configured at the top of the notebook.
api_client = AIPlatformClient(
    project_id=project_id,
    region=region,
)

# Submit the compiled job spec, supplying the pipeline's `project_id` parameter.
response = api_client.create_run_from_job_spec(
    'pyfunc_training.json',
    pipeline_root=pipeline_root_path,
    parameter_values={'project_id': project_id},
)



# Execute Vertex Pipeline Step

In [6]:
# Image based on Deeplearning Container
# NOTE(review): a later cell defines another component with this same name
# (Merlin image); whichever of the two cells ran last is the one in effect.
@component(base_image='gcr.io/renatoleite-321218/test-nvt')
def test_nvt_selector():
    """Print a fixed message; declared with the test-nvt image as base image."""
    print('pass all - Deeplearning container')

In [16]:
# Image based on Merlin Training
# NOTE(review): the image name is spelled 'merlin-traning' - confirm it matches
# the name the image was actually pushed under before "fixing" the spelling.
@component(base_image='gcr.io/renatoleite-321218/merlin-traning')
def test_nvt_selector():
    """Import smoke test for the NVTabular / Dask-CUDA stack.

    Imports each required name once and then prints a success message, so the
    message is only reached if every import succeeded. The imported names are
    intentionally unused.
    """
    # Imports are kept inside the function body (presumably so that they are
    # executed inside the component's container rather than in the notebook).

    # External dependencies
    import numpy as np
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # NVTabular
    import nvtabular as nvt
    from nvtabular.ops import (
        Categorify,
        Clip,
        FillMissing,
        Normalize,
    )
    from nvtabular.utils import (
        _pynvml_mem_size,
        device_mem_size,
        download_file,
        get_rmm_size,
    )

    print('pass all - Merlin image')

In [15]:
# Load a component from the component.yaml file in the working directory.
custom_nvt_comp = kfp.components.load_component_from_file(
    'component.yaml'
)

In [5]:
# Run as a Vertex Pipeline step with GPU
@kfp.dsl.pipeline(
    name='pyfunc-selector-test',
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    """Single-step pipeline running `test_nvt_selector` with CPU/memory/GPU limits.

    Args:
        project_id: Declared as a pipeline parameter so a value can be passed
            at submission time; it is not referenced inside the pipeline body.
    """
    nvt_task = (
        test_nvt_selector()
        .set_cpu_limit('16')
        # A size string without a unit suffix is interpreted as bytes, so the
        # previous '32' requested 32 bytes; 'G' makes it 32 gigabytes.
        .set_memory_limit('32G')
        # Accelerator type is selected via the node-selector label; the GPU
        # count is set separately below.
        .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'nvidia-tesla-t4')
        .set_gpu_limit('2')
    )

In [16]:
# Run as a Vertex Pipeline step
@kfp.dsl.pipeline(
    name='pyfunc-selector-test',
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):
    """Single-step pipeline running the component loaded into `custom_nvt_comp`.

    Args:
        project_id: Declared as a pipeline parameter so a value can be passed
            at submission time; it is not referenced inside the pipeline body.
    """
    nvt_custom_task = (
        custom_nvt_comp()
        .set_cpu_limit('16')
        # A size string without a unit suffix is interpreted as bytes, so the
        # previous '32' requested 32 bytes; 'G' makes it 32 gigabytes.
        .set_memory_limit('32G')
        # Accelerator type is selected via the node-selector label; the GPU
        # count is set separately below.
        .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'nvidia-tesla-t4')
        .set_gpu_limit('2')
    )

In [17]:
# Compile the currently defined `pipeline` function into a job spec file.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='pyfunc-selector-test.json',
)

In [18]:
# Client bound to the project/region configured at the top of the notebook.
api_client = AIPlatformClient(
    project_id=project_id,
    region=region,
)

# Submit the compiled job spec, supplying the pipeline's `project_id` parameter.
response = api_client.create_run_from_job_spec(
    'pyfunc-selector-test.json',
    pipeline_root=pipeline_root_path,
    parameter_values={'project_id': project_id},
)

