In [1]:
!pip install -Uqq kfp
!pip install -Uqq google-cloud-aiplatform
!pip install -Uqq google-cloud-pipeline-components

In [3]:
import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2.dsl import component

In [4]:
#@title GCS
# Notebook-wide configuration (Colab form fields).
# GCP project that owns the Vertex AI resources and the pipeline bucket.
PROJECT_ID = 'lias-project'
# GCS location Vertex AI Pipelines uses as pipeline_root for run artifacts.
PIPELINE_ROOT_PATH = 'gs://vertexai-demo-pipeline'  #@param {type:"string"}
# Name compiled into the pipeline spec; submitted run IDs are prefixed with it.
PIPELINE_NAME = 'cifar10-pipeline-automl' #@param {type:"string"}
# Echo the project ID as the cell output.
PROJECT_ID

'lias-project'

In [14]:
from kfp.v2.dsl import Artifact, Output

@component(
    packages_to_install=["google-cloud-aiplatform", "google-cloud-pipeline-components"]
)
def get_dataset_id(project_id: str,
                   location: str,
                   dataset_name: str,
                   dataset_path: str,
                   dataset: Output[Artifact]) -> str:
    """Look up an existing Vertex AI image dataset by display name.

    Args:
        project_id: GCP project to search.
        location: Vertex AI region to search.
        dataset_name: Display name of the dataset to look for.
        dataset_path: Not used by this component; kept so the pipeline's
            call signature stays unchanged.
        dataset: Output artifact. When a dataset is found, its
            ``metadata['resourceName']`` is set to the dataset's resource name
            so downstream Vertex AI ops can consume it.

    Returns:
        The dataset's full resource name, or the literal string 'None' when no
        dataset with that display name exists (the pipeline branches on it).
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=location)

    # The filter value is quoted so display names containing '-' (or other
    # non-identifier characters) are treated as a single string literal.
    # Newest first, so a duplicated display name resolves deterministically.
    datasets = aiplatform.ImageDataset.list(
        filter=f'display_name="{dataset_name}"',
        order_by='create_time desc',
        project=project_id,
        location=location,
    )

    if not datasets:
        return 'None'

    # Use the resource name reported by the API instead of re-assembling it.
    resource_name = datasets[0].resource_name
    dataset.metadata['resourceName'] = resource_name
    return resource_name

In [15]:
from kfp.v2.dsl import Artifact, Output

@component(
    packages_to_install=["google-cloud-aiplatform", "google-cloud-pipeline-components"]
)
def get_model_id(project_id: str,
                 location: str,
                 model_name: str,
                 model: Output[Artifact]) -> str:
    """Look up an existing Vertex AI model by display name.

    Args:
        project_id: GCP project to search.
        location: Vertex AI region to search.
        model_name: Display name of the model to look for.
        model: Output artifact. When a model is found, its
            ``metadata['resourceName']`` is set to the model's resource name
            so it can be passed on (e.g. as an AutoML ``base_model``).

    Returns:
        The model's full resource name, or the literal string 'None' when no
        model with that display name exists (the pipeline branches on it).
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=location)

    # Quote the filter value (display names may contain '-'). Every pipeline
    # run registers another model with the same display name, so pick the
    # newest one explicitly instead of relying on list order.
    models = aiplatform.Model.list(
        filter=f'display_name="{model_name}"',
        order_by='create_time desc',
        project=project_id,
        location=location,
    )

    if not models:
        return 'None'

    # Use the resource name reported by the API instead of re-assembling it.
    resource_name = models[0].resource_name
    model.metadata['resourceName'] = resource_name
    return resource_name

In [16]:
from google.cloud.aiplatform.datasets.image_dataset import ImageDataset
from google_cloud_pipeline_components.types.artifact_types import VertexDataset

def _train_and_deploy(project_id, location, dataset, upstream_op,
                      base_model=None, traffic_split=None):
    """Add AutoML training, endpoint creation and deployment tasks to the pipeline.

    Must be called while the ``pipeline`` function is being built (optionally
    inside a ``kfp.dsl.Condition``) so the tasks are registered in that context.

    Args:
        project_id: Pipeline parameter holding the GCP project.
        location: Pipeline parameter holding the Vertex AI region.
        dataset: Dataset artifact output to train on.
        upstream_op: Task the training step must run after.
        base_model: Optional model artifact to warm-start AutoML training from.
        traffic_split: Optional ``traffic_split`` forwarded to ModelDeployOp.

    Returns:
        The ModelDeployOp task.
    """
    # Only forward the optional inputs when set, so branches that did not pass
    # them before still do not.
    training_extra = {} if base_model is None else {"base_model": base_model}
    training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
        project=project_id,
        location=location,
        display_name="train-cifar10-automl",
        prediction_type="classification",
        model_type="CLOUD",
        dataset=dataset,
        model_display_name="cifar10-model",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
        **training_extra,
    )
    training_job_run_op.after(upstream_op)

    create_endpoint_op = gcc_aip.EndpointCreateOp(
        project=project_id,
        location=location,
        display_name="cifar10-automl-endpoint",
    )
    create_endpoint_op.after(training_job_run_op)

    deploy_extra = {} if traffic_split is None else {"traffic_split": traffic_split}
    model_deploy_op = gcc_aip.ModelDeployOp(
        model=training_job_run_op.outputs["model"],
        endpoint=create_endpoint_op.outputs['endpoint'],
        automatic_resources_min_replica_count=1,
        automatic_resources_max_replica_count=1,
        **deploy_extra,
    )
    model_deploy_op.after(create_endpoint_op)
    return model_deploy_op


#define the workflow of the pipeline
@kfp.dsl.pipeline(
    name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT_PATH)
def pipeline(project_id: str,
             location: str,
             dataset_name: str,
             dataset_path: str,
             base_model_name: str):
    """CIFAR-10 AutoML image classification pipeline.

    * If no dataset named ``dataset_name`` exists, it is created from
      ``dataset_path``; a model is then trained and deployed to a new endpoint.
    * Otherwise ``dataset_path`` is imported into the existing dataset. A model
      is then trained, warm-started from the model named ``base_model_name``
      when one exists, and deployed to a new endpoint.

    ``location`` is passed to every Vertex AI op, so the resources are created
    in the same region the lookup components search.
    """
    # Step 1: look up an existing dataset. outputs['Output'] is its resource
    # name, or the string 'None' when there is none.
    dataset_op = get_dataset_id(project_id=project_id,
                                location=location,
                                dataset_name=dataset_name,
                                dataset_path=dataset_path)

    # Task objects such as ds_op only describe their outputs (placeholders);
    # they do not hold the values produced at execution time.
    with kfp.dsl.Condition(dataset_op.outputs['Output'] == 'None', name="create dataset"):
        ds_op = gcc_aip.ImageDatasetCreateOp(
            project=project_id,
            location=location,
            display_name=dataset_name,
            gcs_source=dataset_path,
            import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
        )
        ds_op.after(dataset_op)

        # Steps 2-4: train from scratch, create an endpoint, deploy.
        _train_and_deploy(project_id, location, ds_op.outputs["dataset"],
                          upstream_op=ds_op)

    with kfp.dsl.Condition(dataset_op.outputs['Output'] != 'None', name="update dataset"):
        ds_op = gcc_aip.ImageDatasetImportDataOp(
            project=project_id,
            location=location,
            dataset=dataset_op.outputs['dataset'],
            gcs_source=dataset_path,
            import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification
        )
        ds_op.after(dataset_op)

        model_op = get_model_id(
            project_id=project_id,
            location=location,
            model_name=base_model_name
        )
        model_op.after(ds_op)

        with kfp.dsl.Condition(model_op.outputs['Output'] != 'None', name='model exist'):
            _train_and_deploy(project_id, location, ds_op.outputs["dataset"],
                              upstream_op=model_op,
                              base_model=model_op.outputs['model'],
                              traffic_split={"0": 100})

        with kfp.dsl.Condition(model_op.outputs['Output'] == 'None', name='model not exist'):
            _train_and_deploy(project_id, location, ds_op.outputs["dataset"],
                              upstream_op=model_op)

In [17]:
# Local path of the compiled pipeline spec (JSON) written by the compiler cell.
pipeline_spec_file = "cifar10_classification_pipeline.json"

In [18]:
from kfp.v2 import compiler

# Compile the `pipeline` function into the JSON spec at `pipeline_spec_file`.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=pipeline, package_path=pipeline_spec_file)

In [19]:
#@title GCS
REGION = "us-central1" #@param {type:"string"}

!gsutil mb -l {REGION} {PIPELINE_ROOT_PATH}
!gsutil cp {pipeline_spec_file} {PIPELINE_ROOT_PATH}/

Creating gs://vertexai-demo-pipeline/...
ServiceException: 409 A Cloud Storage bucket named 'vertexai-demo-pipeline' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.
Copying file://cifar10_classification_pipeline.json [Content-Type=application/json]...
/ [1 files][ 84.0 KiB/ 84.0 KiB]                                                
Operation completed over 1 objects/84.0 KiB.                                     


In [11]:
import os
# Make the project discoverable by Google Cloud client libraries' defaults.
os.environ.update(GOOGLE_CLOUD_PROJECT=PROJECT_ID)

In [12]:
# Submit the compiled pipeline to Vertex AI Pipelines in the configured region.
# Derived from REGION so the job, the bucket and the pipeline's `location`
# parameter all agree (the original literal was never used).
location = REGION

job = aiplatform.PipelineJob(
    display_name="automl-image-training-v2",
    template_path=pipeline_spec_file,
    pipeline_root=PIPELINE_ROOT_PATH,
    project=PROJECT_ID,
    location=location,
    # get_dataset_id / get_model_id query live Vertex AI state. With caching on,
    # a re-run with the same parameters would reuse a stale cached result
    # (e.g. 'None'), creating a duplicate dataset or skipping the base model.
    enable_caching=False,
    parameter_values={
        'project_id': PROJECT_ID,
        'location': location,
        'dataset_name': 'my-cifar10-dataset-1012',
        'dataset_path': 'gs://demo-cifar10-dataset-annotations-1012/span-1/annotations.csv',
        'base_model_name': 'cifar10-model',
    }
)

job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/425772488260/locations/us-central1/pipelineJobs/cifar10-pipeline-automl-20221205175208
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/425772488260/locations/us-central1/pipelineJobs/cifar10-pipeline-automl-20221205175208')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/cifar10-pipeline-automl-20221205175208?project=425772488260
