# Vertex AI Pipelines

Great to have you here

This notebook is part of an article about Vertex AI Pipelines. For more background, head over to the article.

* 📖 Article: https://medium.com/google-cloud/google-vertex-ai-the-easiest-way-to-run-ml-pipelines-3a41c5ed153
* 🎥 Video: https://www.youtube.com/watch?v=gtVHw5YCRhE

Your feedback and questions are highly appreciated. <br>You can find me on Twitter [@HeyerSascha](https://twitter.com/HeyerSascha) or connect with me via [LinkedIn](https://www.linkedin.com/in/saschaheyer/). <br>Even better, subscribe to my [YouTube](https://www.youtube.com/channel/UC--Sm3D-rqCUeLXmraypdPQ) channel ❤️.

In [None]:
#@title
from IPython.display import HTML

HTML('<iframe width="560" height="315" src="https://www.youtube.com/embed/gtVHw5YCRhE" title="YouTube video player" frameborder="0" allowfullscreen></iframe>')

# Dependencies


In [None]:
# updated to the latest dependencies in February 2023
!pip install google-cloud-aiplatform==1.21.0 --upgrade
!pip install google-cloud-pipeline-components==1.0.27 --upgrade
!pip install kfp==1.8.16 --upgrade

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting google-cloud-aiplatform==1.21.0
  Downloading google_cloud_aiplatform-1.21.0-py2.py3-none-any.whl (2.3 MB)
Collecting shapely<2.0.0
  Downloading Shapely-1.8.5.post1-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (2.1 MB)
Installing collected packages: shapely, google-cloud-aiplatform
  Attempting uninstall: shapely
    Found existing installation: shapely 2.0.1
    Uninstalling shapely-2.0.1:
      Successfully uninstalled shapely-2.0.1
  Attempting uninstall: google-cloud-aiplatform
    Found existing installation: google-cloud-aiplatform 1.19.0
    Uninstalling google-cloud-aiplatform-1.19.0:
      Successfully uninstalled google-c



**Please restart your Colab runtime before importing the modules**

Otherwise you might get a version-conflict error.

In [None]:
import kfp

from typing import NamedTuple

from kfp.v2.dsl import pipeline
from kfp.v2.dsl import component
from kfp.v2.dsl import OutputPath
from kfp.v2.dsl import InputPath

from kfp.v2.dsl import Output
from kfp.v2.dsl import Metrics

from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient


from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from google_cloud_pipeline_components import aiplatform as gcc_aip

from google_cloud_pipeline_components.v1.model import ModelUploadOp

# Authentication

In [None]:
from google.colab import auth
auth.authenticate_user()

credentials = auth._check_adc()
print(credentials)

True


Set the project ID and the pipeline root.

In [None]:
PROJECT_ID = "sascha-playground-doit"
PIPELINE_ROOT = "gs://doit-vertex-demo/"
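
Later cells build each pipeline's root by concatenating `PIPELINE_ROOT` with a name, which only works because the value ends with a slash. A small helper can make that independent of the trailing slash. This is only a sketch: `pipeline_root_for` is a hypothetical name used for illustration and is not part of the notebook or of any SDK.

```python
def pipeline_root_for(root: str, pipeline_name: str) -> str:
    """Join a Cloud Storage root and a pipeline name with exactly one slash."""
    return root.rstrip("/") + "/" + pipeline_name.lstrip("/")


# Both spellings of the root give the same result.
with_slash = pipeline_root_for("gs://doit-vertex-demo/", "basic-pipeline")
without_slash = pipeline_root_for("gs://doit-vertex-demo", "basic-pipeline")
print(with_slash)
print(without_slash)
```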

# Clients

In [None]:
# use aiplatform.init instead of the deprecated AIPlatformClient
aiplatform.init(project=PROJECT_ID,
                location='us-central1')

# Pipeline Basic

## Components

In [None]:
@component() 
def concat(a: str, b: str) -> str:
  return a + b

In [None]:
@component
def reverse(a: str)->NamedTuple("outputs", [("before", str), ("after", str)]):
  return a, a[::-1]
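
Function-based components are plain Python functions underneath, so their logic can be checked locally before it is wrapped in `@component`. The sketch below is an undecorated copy of the two functions above. The names `concat_local`, `reverse_local` and `ReverseOutputs` are made up for this illustration and are not part of the notebook or of KFP. It only shows which values travel from one step to the next.

```python
from typing import NamedTuple


class ReverseOutputs(NamedTuple):
    """Local stand-in for the NamedTuple("outputs", ...) return type."""
    before: str
    after: str


def concat_local(a: str, b: str) -> str:
    # Same body as the `concat` component, without the decorator.
    return a + b


def reverse_local(a: str) -> ReverseOutputs:
    # Same body as the `reverse` component: the input and its reversal.
    return ReverseOutputs(before=a, after=a[::-1])


joined = concat_local("stres", "sed")
result = reverse_local(joined)
print(result.before, result.after)
```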

## Pipeline

In [None]:
@pipeline(name="basic-pipeline",
          pipeline_root=PIPELINE_ROOT + "basic-pipeline")
def basic_pipeline(a: str='stres', b: str='sed'):
    concat_task = concat(a, b)
    reverse_task = reverse(concat_task.output)

## Compile

The compiler takes our pipeline function and compiles it into a pipeline specification, stored as a JSON file. We can use this JSON file to create our pipeline in Vertex AI Pipelines.

In [None]:
compiler.Compiler().compile(
    pipeline_func=basic_pipeline, package_path="basic_pipeline.json"
)
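
The compiled file is ordinary JSON, so it can be inspected with the standard library before it is uploaded. The helper below is a hypothetical convenience (`top_level_keys` is not part of KFP). It makes no assumption about which keys the compiler writes and simply lists whatever is there.

```python
import json
from typing import List


def top_level_keys(spec_path: str) -> List[str]:
    """Return the sorted top-level keys of a JSON file."""
    with open(spec_path, "r", encoding="utf-8") as handle:
        spec = json.load(handle)
    return sorted(spec.keys())
```

After the compile step, `top_level_keys("basic_pipeline.json")` gives a quick overview of the generated specification.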



## Run

Create the run job using the API. 
You can also upload the pipeline JSON file directly in the Vertex AI UI.

In [None]:
job = pipeline_jobs.PipelineJob(
    display_name="basic-pipeline",
    template_path="basic_pipeline.json",
    parameter_values={"a": "stres", "b": "sed"}
)

In [None]:
job.run(sync=False)

Creating PipelineJob
PipelineJob created. Resource name: projects/234439745674/locations/us-central1/pipelineJobs/basic-pipeline-20230216075123
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/234439745674/locations/us-central1/pipelineJobs/basic-pipeline-20230216075123')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/basic-pipeline-20230216075123?project=234439745674


# Is There More? I Want to Learn the Honest Stuff.

## Component Specification (function based component)


In [None]:
@component(output_component_file="concat_component.yaml") 
def concat(a: str, b: str) -> str:
  return a + b

In [None]:
!cat ./concat_component.yaml

name: Concat
inputs:
- {name: a, type: String}
- {name: b, type: String}
outputs:
- {name: Output, type: String}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    - |2

      if ! [ -x "$(command -v pip)" ]; then
          python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
      fi

      PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'kfp==1.8.12' && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp -d)
      printf "%s" "$0" > "$program_path/ephemeral_component.py"
      python3 -m kfp.v2.components.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
    - |2+

      import kfp
      from kfp.v2 import dsl
      from kfp.v2.dsl import *
      from typing import *

      def concat(a: str, b: str) -> str:
        return a + b

    args:
    - --executor_inpu

## Share components
This example loads the component specification that was generated from the function-based component above.

In [None]:
@pipeline(name="share-component", 
          pipeline_root=PIPELINE_ROOT + "share-component-pipeline")
def share_component_pipeline(a: str='stres', b: str='sed'):
    #concat_op = concat(a, b)
    concat_component = kfp.components.load_component_from_file('./concat_component.yaml')

    concat_task = concat_component(a,b)
    reverse_task = reverse(concat_task.output)

In [None]:
# just the usual boilerplate code to run the pipeline
compiler.Compiler().compile(
  pipeline_func=share_component_pipeline, package_path="share_component_pipeline.json"
)

job = pipeline_jobs.PipelineJob(
    display_name="share-component-pipeline",
    template_path="share_component_pipeline.json",
    parameter_values={"a": "stres", "b": "sed"}
)

job.run(sync=False)

Creating PipelineJob




## Pipeline with GPU and machine type

The following example shows how to add a GPU to a component, for example a training component that needs access to accelerators.

In [None]:
@component(output_component_file="gpu_training.yaml", 
           base_image="gcr.io/deeplearning-platform-release/tf2-gpu.2-6") 
def gpuTrainingFunc() -> bool:
  import logging
  import tensorflow as tf

  gpus = tf.config.list_physical_devices('GPU')

  for gpu in gpus:
    logging.info('Name: {} Type: {}'.format(gpu.name, gpu.device_type))
  
  return True

In [None]:
@pipeline(name="gpu-pipeline",
          pipeline_root=PIPELINE_ROOT + "gpu-pipeline")
def gpu_pipeline():
    gpuTraining = gpuTrainingFunc().add_node_selector_constraint(
        label_name="cloud.google.com/gke-accelerator", 
        value="NVIDIA_TESLA_T4")

In [None]:
compiler.Compiler().compile(
  pipeline_func=gpu_pipeline, package_path="gpu_pipeline.json"
)

job = pipeline_jobs.PipelineJob(
   display_name="gpu-pipeline",
   template_path="gpu_pipeline.json"
)

job.run(sync=False)

Creating PipelineJob




## Schedule 
After creating the schedule, switch over to the Google Cloud console and check your Cloud Functions and Cloud Scheduler ;)

In [None]:
# Deprecated client, don't use it anymore; it only appears here for the scheduling example
from kfp.v2.google.client import AIPlatformClient
api_client = AIPlatformClient(project_id=PROJECT_ID,
                              region='us-central1')






In [None]:
api_client.create_schedule_from_job_spec(
    job_spec_path='basic_pipeline.json',
    schedule='*/10 * * * *' # every 10 minutes
)



RuntimeError: ignored
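
The `schedule` argument is a standard five-field cron expression (minute, hour, day of month, month, day of week). As a rough illustration of what `*/10` in the minute field means, here is a small standard-library sketch. `expand_minute_field` is a hypothetical helper written for this explanation. It handles only `*`, `*/n` and comma-separated numbers, and it is not how Cloud Scheduler parses expressions.

```python
from typing import List


def expand_minute_field(field: str) -> List[int]:
    """Expand a cron minute field into the minutes (0-59) it matches.

    Supports only '*', '*/n' and comma-separated integers.
    """
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be positive")
        return list(range(0, 60, step))
    minutes = sorted({int(part) for part in field.split(",")})
    if any(m < 0 or m > 59 for m in minutes):
        raise ValueError("minutes must be between 0 and 59")
    return minutes


# The first field of the schedule used above is the minute field.
minute_field = "*/10 * * * *".split()[0]
matched_minutes = expand_minute_field(minute_field)
print(matched_minutes)
```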

## Additional Packages

In [None]:
@component(packages_to_install = ["pandas==1.3.4"],) 
def additional_packages():
  import pandas
  print(pandas.__version__)

# fails for demonstration purposes
@component() 
def additional_packages_missing():
  import pandas
  print(pandas.__version__)
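
The second component fails because `pandas` is not available in the default base image unless it is listed in `packages_to_install`. A quick way to see whether a module can be imported in a given environment is `importlib.util.find_spec`. The sketch below wraps it in a hypothetical helper (`is_importable` is an illustration, not a KFP feature) and says nothing about how KFP itself installs packages.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if a top-level module can be found in this environment."""
    return importlib.util.find_spec(module_name) is not None


# `json` ships with Python; whether `pandas` is found depends on the environment.
for name in ("json", "pandas"):
    print(name, is_importable(name))
```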

In [None]:
@pipeline(name="additional-packages-pipeline",
          pipeline_root=PIPELINE_ROOT + "additional-packages-pipeline")
def additional_packages_pipeline():
    additional_packages_task = additional_packages()
    additional_packages_missing_task = additional_packages_missing()

compiler.Compiler().compile(
  pipeline_func=additional_packages_pipeline, package_path="additional_packages_pipeline.json"
)

job = pipeline_jobs.PipelineJob(
    display_name="additional-packages-pipeline",
    template_path="additional_packages_pipeline.json"
)

job.run(sync=False)

Creating PipelineJob




## Base Image

Compare the two component specification YAML files below and find the difference (hint: look at the `image` field).

https://cloud.google.com/vertex-ai/docs/training/pre-built-containers

In [None]:
@component(output_component_file="custom_base_image_component.yaml",
           base_image="gcr.io/deeplearning-platform-release/tf2-gpu.2-6"
) 
def custom_base_image():
  import tensorflow as tf
  print(tf.version.VERSION)

# fails for demonstration purposes
@component(output_component_file="default_base_image_component.yaml") 
def default_base_image():
  import tensorflow as tf
  print(tf.version.VERSION)

In [None]:
!cat ./default_base_image_component.yaml

name: Default base image
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    - |2

      if ! [ -x "$(command -v pip)" ]; then
          python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
      fi

      PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'kfp==1.8.12' && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp -d)
      printf "%s" "$0" > "$program_path/ephemeral_component.py"
      python3 -m kfp.v2.components.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
    - |2+

      import kfp
      from kfp.v2 import dsl
      from kfp.v2.dsl import *
      from typing import *

      def default_base_image():
        import tensorflow as tf
        print(tf.version.VERSION)

    args:
    - --executor_input
    - {executorInput: null}
    - --function_to_ex

In [None]:
!cat ./custom_base_image_component.yaml

name: Custom base image
implementation:
  container:
    image: gcr.io/deeplearning-platform-release/tf2-gpu.2-6
    command:
    - sh
    - -c
    - |2

      if ! [ -x "$(command -v pip)" ]; then
          python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
      fi

      PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'kfp==1.8.12' && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp -d)
      printf "%s" "$0" > "$program_path/ephemeral_component.py"
      python3 -m kfp.v2.components.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
    - |2+

      import kfp
      from kfp.v2 import dsl
      from kfp.v2.dsl import *
      from typing import *

      def custom_base_image():
        import tensorflow as tf
        print(tf.version.VERSION)

    args:
    - --executor_input
    - {executo

## Predefined Components

For a full list of pre-defined components see https://cloud.google.com/vertex-ai/docs/pipelines/gcpc-list

Which predefined components are useful depends on your use case.


# Pipeline end to end (XGBoost)

## Dependencies

In [None]:
from typing import NamedTuple

from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        Markdown)

from kfp.v2 import compiler

## Data

In [None]:
@component(
    packages_to_install = [
        "pandas==1.3.4",
        "scikit-learn==1.0.1",
    ],
)
def get_data(
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):
    
    from sklearn import datasets
    from sklearn.model_selection import train_test_split as tts
    import pandas as pd


    # dataset https://www.kaggle.com/uciml/breast-cancer-wisconsin-data
    data_raw = datasets.load_breast_cancer()
    data = pd.DataFrame(data_raw.data, columns=data_raw.feature_names)
    data["target"] = data_raw.target
    
    train, test = tts(data, test_size=0.3)
    
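    # index=False keeps the DataFrame index from being read back as an extra feature column
    train.to_csv(dataset_train.path, index=False)
    test.to_csv(dataset_test.path, index=False)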

## Training

In [None]:
@component(
    packages_to_install = [
        "pandas==1.3.4",
        "xgboost==1.5.1",
        "scikit-learn==1.0.1", # XGBClassifier (xgboost's scikit-learn API) requires scikit-learn
    ],
)
def train_model(
    dataset: Input[Dataset],
    model_artifact: Output[Model]
):
    
    from xgboost import XGBClassifier
    import pandas as pd
    
    data = pd.read_csv(dataset.path)

    model = XGBClassifier(
        objective="binary:logistic"
    )
    model.fit(
        data.drop(columns=["target"]),
        data.target,
    )

    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )

    model_artifact.metadata["train_score"] = float(score)
    model_artifact.metadata["framework"] = "XGBoost"

    print(model_artifact.path)
    
    model.save_model(model_artifact.path)

## Evaluation

In [None]:
@component(
    packages_to_install = [
        "pandas==1.3.4",
        "scikit-learn==1.0.1",
        "xgboost==1.5.1"
    ],
)
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics]
) -> NamedTuple("Outputs", [("deploy", str)]): 
    from xgboost import XGBClassifier
    import pandas as pd
    
    data = pd.read_csv(test_set.path)
    model = XGBClassifier()
    model.load_model(xgb_model.path)
    
    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )
    
    from sklearn.metrics import roc_curve
    y_scores =  model.predict_proba(data.drop(columns=["target"]))[:, 1]
    fpr, tpr, thresholds = roc_curve(
         y_true=data.target.to_numpy(), y_score=y_scores, pos_label=True
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())
    
    from sklearn.metrics import confusion_matrix
    y_pred = model.predict(data.drop(columns=["target"]))
    
    metrics.log_confusion_matrix(
       ["False", "True"],
       confusion_matrix(
           data.target, y_pred
       ).tolist()
    )
    
    xgb_model.metadata["test_score"] = float(score)
    smetrics.log_metric("score", float(score))


    deploy = "true"
    #compare threshold or to previous

    return (deploy,)
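
The component above always returns `"true"`. The comment in it hints at comparing the score against a threshold or against a previous model. A minimal sketch of such a gate follows. The function name `should_deploy` and the default threshold of `0.9` are placeholders chosen for illustration, not values from the article. It returns a string because the pipeline condition compares the output with `"true"`.

```python
from typing import Optional


def should_deploy(score: float,
                  threshold: float = 0.9,
                  previous_score: Optional[float] = None) -> str:
    """Return "true" if the new model clears the gate, else "false"."""
    # Reject models below the absolute quality bar.
    if score < threshold:
        return "false"
    # If a previous score is known, require a strict improvement.
    if previous_score is not None and score <= previous_score:
        return "false"
    return "true"


print(should_deploy(0.95))
print(should_deploy(0.95, previous_score=0.97))
```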

## Deployment

In [None]:
@component(packages_to_install=["google-cloud-aiplatform==1.3.0"])
def deploy(
    model: Input[Model],
    project: str,
    region: str,):
  
  import logging
  from google.cloud import aiplatform
  aiplatform.init(project=project, location=region)

  logging.basicConfig(level=logging.DEBUG)
  logging.debug(model)

  print(model)
  print(model.uri)

  import os
  path,file = os.path.split(model.uri)

  import datetime
  
  # datetime.datetime.now().strftime('%Y%m%d%H%M%S')
  # serving image https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers#xgboost
  deployed_model = aiplatform.Model.upload(
        display_name="xgboost-pipeline",
        artifact_uri = path,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-4:latest"
  )
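
`Model.upload` expects `artifact_uri` to point at a directory, which is why the component splits `model.uri` into its directory and file parts. The snippet below shows what `os.path.split` does with a Cloud Storage URI. The URI is a made-up example, not an actual path from this pipeline.

```python
import os

# Hypothetical artifact URI, for illustration only.
example_uri = "gs://example-bucket/pipeline-root/1234/train-model/model_artifact"

# Everything before the last slash is the directory, the rest is the file name.
directory, filename = os.path.split(example_uri)
print(directory)
print(filename)
```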


## Pipeline

In [None]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT + "xgboost-pipeline",
    # A name for the pipeline. Use to determine the pipeline Context.
    name="xgboost-pipeline-with-deployment",
)
def xgboost_pipeline():
    # named xgboost_pipeline so it does not shadow the `pipeline` decorator imported from kfp.v2.dsl
    dataset_op = get_data()
    training_op = train_model(dataset_op.outputs["dataset_train"])
    eval_op = eval_model(
        test_set=dataset_op.outputs["dataset_test"],
        xgb_model=training_op.outputs["model_artifact"]
    )

    with dsl.Condition(
        eval_op.outputs["deploy"] == "true",
        name="deploy",
    ):

      deploy_op = deploy(training_op.outputs["model_artifact"], 
                         PROJECT_ID,
                         'us-central1')

    # we need a solution for xgb models
    # it's here https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#aiplatform_deploy_model_custom_trained_model_sample-python
  

## Compile

In [None]:
compiler.Compiler().compile(
    pipeline_func=xgboost_pipeline,
    package_path='xgb_pipeline.json')



## Run Pipeline (old, soon deprecated)
For the deprecation notice, see https://github.com/kubeflow/pipelines/blob/56cf094fd3058b5c640077968ffb0f01e0511a57/sdk/python/kfp/v2/google/client/client.py#L169

In [None]:
response = api_client.create_run_from_job_spec(
    'xgb_pipeline.json',
    enable_caching=False
)

## Run Pipeline (new)

In [None]:
job = pipeline_jobs.PipelineJob(
    display_name="xgb-pipeline",
    template_path="xgb_pipeline.json"
)

job.run(sync=False)

Creating PipelineJob




# Pipeline end to end (Huggingface, Sentiment)
Work in progress

## Dependencies

In [None]:
import kfp

from typing import NamedTuple

from kfp.v2 import dsl
from kfp.v2.dsl import pipeline
from kfp.v2.dsl import OutputPath
from kfp.v2.dsl import InputPath
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        Markdown)

from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from google_cloud_pipeline_components import aiplatform as gcc_aip

from google_cloud_pipeline_components.v1.model import ModelUploadOp

In [None]:
PROJECT_ID = "sascha-playground-doit"
PIPELINE_ROOT = "gs://doit-vertex-demo/"

In [None]:
# use aiplatform.init instead of the deprecated AIPlatformClient
aiplatform.init(project=PROJECT_ID,
                location='us-central1')

## Train

In [None]:
@component(
    packages_to_install = [
        "transformers==4.1.1",
        "google-cloud-storage==1.35.0",
        "scikit-learn==0.24.0", 
        "pandas==1.1.5"
    ],
    base_image="gcr.io/deeplearning-platform-release/tf2-gpu.2-6"
)
def train_model(
    epochs: int,
    model_artifact: Output[Model],
    smetrics: Output[Metrics]
):
    
    from sklearn.model_selection import train_test_split
    from transformers import DistilBertTokenizerFast
    from transformers import TFDistilBertForSequenceClassification
    from google.cloud import storage

    import tensorflow as tf
    import json
    import pandas as pd
    import numpy as np
    from io import StringIO

    print('load distilbert')
    # load model and tokenizer
    model = TFDistilBertForSequenceClassification.from_pretrained(
        "distilbert-base-uncased")
    tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")


    print('start training')
    file = tf.io.gfile.GFile(
        'gs://machine-learning-samples/datasets/sentiment/imdb/csv/dataset.csv', mode='r').read()
    df = pd.read_csv(StringIO(file))

    #df = df.head()

    sentiments = df['sentiment'].values.tolist()
    reviews = df['review'].values.tolist()

    training_sentences, validation_sentences, training_labels, validation_labels = train_test_split(
        reviews,
        sentiments,
        test_size=.2)

    train_encodings = tokenizer(training_sentences,
                                truncation=True,
                                padding=True)
    val_encodings = tokenizer(validation_sentences,
                              truncation=True,
                              padding=True)

    train_dataset = tf.data.Dataset.from_tensor_slices((
        dict(train_encodings),
        training_labels
    ))

    val_dataset = tf.data.Dataset.from_tensor_slices((
        dict(val_encodings),
        validation_labels
    ))

    optimizer = tf.keras.optimizers.Adam(learning_rate=5e-5)

    model.compile(optimizer=optimizer,
                  loss=model.compute_loss,
                  metrics=['accuracy'])

    model.fit(train_dataset.shuffle(100).batch(16),
              epochs=epochs,
              batch_size=16,)

    model.save_pretrained(model_artifact.path)
    print(model_artifact.path)

    print('eval')
    evaluation = model.evaluate(val_dataset.shuffle(100).batch(16),
               batch_size=16)
    print(evaluation)

    smetrics.log_metric("accuracy", float(evaluation[1]))
    

## Serving Container
For the container code, see https://github.com/SaschaHeyer/serving-custom-container

In [None]:
@component(packages_to_install=[
    "google-cloud-build==3.8.3",
    "google-api-python-client"])
def build_serving_container(model_artifact: Input[Model]) -> NamedTuple("Outputs", [("container", str)]): 
    from google.cloud.devtools import cloudbuild
    from googleapiclient.discovery import build
    import time

    print('deploy.............')
    print(model_artifact.uri)

    client = cloudbuild.CloudBuildClient()
    build = cloudbuild.Build()


    # version is current timestamp 
    version = str(int(time.time()))
    container = "gcr.io/sascha-playground-doit/sentiment-fast-api-test:{}".format(version)


    #todo get the model from the pipeline folder
    build.steps = [{"name": "gcr.io/cloud-builders/git",
                    "args": ["clone", "https://github.com/SaschaHeyer/serving-custom-container"]}, 
                   {"name": "gcr.io/cloud-builders/gsutil",
                    "args": ["cp", "-r", "gs://doit-vertex-demo/models/sentiment", "./serving-custom-container"]}, 
                   {"name": "gcr.io/cloud-builders/docker",
                    "args": ["build", "-t", container, "serving-custom-container" ]},
                   {"name": "gcr.io/cloud-builders/docker",
                    "args": ["push", container]}]

    #build.substitutions = {"_VERSION": version}

    operation = client.create_build(project_id="sascha-playground-doit", build=build)
    # Print the in-progress operation
    print("IN PROGRESS:")
    print(operation.metadata)

    result = operation.result()
    # Print the completed status
    print("RESULT:", result.status)

    return (container,)

## Deploy

In [None]:
@component(packages_to_install=["google-cloud-aiplatform==1.15.0"])
def deploy_model(
    project: str,
    region: str,
    container: str
):
  from google.cloud import aiplatform
  aiplatform.init(project=project, location=region)

  ENDPOINT_NAME = "sentiment"
  DISPLAY_NAME  = "sentiment"

  MODEL_TYPE = "query"
  MODEL_NAME = f"{MODEL_TYPE}_model"  # Used by the deployment container.

  def create_endpoint():
    print('create endpoint')
    endpoints = aiplatform.Endpoint.list(
      filter='display_name="{}"'.format(ENDPOINT_NAME),
      order_by='create_time desc',
      project=project, 
      location=region,
    )

    if len(endpoints) > 0:
      endpoint = endpoints[0]  
    else:
      endpoint = aiplatform.Endpoint.create(
        display_name=ENDPOINT_NAME, project=project, location=region
      )

    return endpoint

  models = aiplatform.Model.list(filter=("display_name={}").format(DISPLAY_NAME))

  PORT = 80
  HEALTH_ROUTE = "/health"
  PREDICT_ROUTE = "/predict"


  if len(models) == 0:
    # upload the initial model
    model_uploaded = aiplatform.Model.upload(
          display_name = DISPLAY_NAME, 
          serving_container_image_uri = container,
          serving_container_health_route=HEALTH_ROUTE,
          serving_container_predict_route=PREDICT_ROUTE,   
          serving_container_ports=[PORT]
    )
  else: 
    # upload a new model version using the existing model resource ID
    parent_model = models[0].resource_name

    model_uploaded = aiplatform.Model.upload(
          parent_model = parent_model,
          display_name = DISPLAY_NAME, 
          serving_container_image_uri = container,
          serving_container_health_route=HEALTH_ROUTE,
          serving_container_predict_route=PREDICT_ROUTE,   
          serving_container_ports=[PORT]      
    )

  endpoint = create_endpoint() 
  
  model_deploy = model_uploaded.deploy(
        machine_type="n1-standard-4", 
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=DISPLAY_NAME,
  )

  # undeploy models without traffic for this specific endpoint
  for model in endpoint.list_models():
    print(model)
    if model.id not in endpoint.traffic_split:
      endpoint.undeploy(deployed_model_id = model.id)
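
The clean-up loop keeps only deployed models that still receive traffic. The same selection can be sketched on plain data. The IDs and the dictionary below are invented for illustration, and `ids_without_traffic` is a hypothetical helper rather than an SDK call. Note that the sketch also treats an entry with 0 percent as having no traffic, which is slightly stricter than the membership check above.

```python
from typing import Dict, List


def ids_without_traffic(deployed_ids: List[str],
                        traffic_split: Dict[str, int]) -> List[str]:
    """Return the deployed model IDs that receive no traffic."""
    return [model_id for model_id in deployed_ids
            if traffic_split.get(model_id, 0) == 0]


# Made-up deployed model IDs; only "333" receives traffic.
deployed = ["111", "222", "333"]
split = {"333": 100}
to_undeploy = ids_without_traffic(deployed, split)
print(to_undeploy)
```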


## Pipeline

In [None]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT + "sentiment-pipeline",
    # A name for the pipeline. Use to determine the pipeline Context.
    name="sentiment-pipeline",
)
def sentiment_pipeline(epochs: int):
    # named sentiment_pipeline so it does not shadow the `pipeline` decorator imported from kfp.v2.dsl
    train_op = train_model(epochs).add_node_selector_constraint(
        label_name="cloud.google.com/gke-accelerator", 
        value="NVIDIA_TESLA_T4").set_caching_options(False)
    
    # build custom serving container with latest model
    build_serving_container_op = build_serving_container(train_op.outputs["model_artifact"]).set_caching_options(False)

    # upload and deploy model to vertex ai
    deploy_op = deploy_model(PROJECT_ID, "us-central1", build_serving_container_op.outputs["container"])


## Compile

In [None]:
compiler.Compiler().compile(
    pipeline_func=sentiment_pipeline,
    package_path='sentiment_pipeline.json')



## Run

In [None]:
job = pipeline_jobs.PipelineJob(
    display_name="sentiment-pipeline",
    template_path="sentiment_pipeline.json",
    parameter_values={
        'epochs': 3
    }
)

job.submit(experiment="sentiment")

Creating PipelineJob
PipelineJob created. Resource name: projects/234439745674/locations/us-central1/pipelineJobs/sentiment-pipeline-20221123181836
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/234439745674/locations/us-central1/pipelineJobs/sentiment-pipeline-20221123181836')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/sentiment-pipeline-20221123181836?project=234439745674
Associating projects/234439745674/locations/us-central1/pipelineJobs/sentiment-pipeline-20221123181836 to Experiment: sentiment


# What about artifacts that are not a dataset or model?

For that we can use `OutputPath`, which, like the artifact types, provides a path on Google Cloud Storage where we can store data. In any case, you need to serialize the intermediate in-memory object to a file. The next component can then load the serialized file back into an in-memory object.

In [None]:
@component() 
def first(output_path: OutputPath()):

  # everything that can be serialized to a file can be stored
  # you are not limited to the artifact types like dataset or model

  # a common case is a TF-IDF vectorizer
 
  animals = ['cat', 'dog']

  import pickle

  # output_path is the path of the output file itself, so write to it directly
  with open(output_path, 'wb') as file:
    pickle.dump(animals, file)

In [None]:
@component() 
def second(input_path: InputPath()):

  import pickle

  # input_path points to the file written by the previous component
  with open(input_path, 'rb') as file:
    data = pickle.load(file)

  print(data)
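
The serialize-then-load pattern can be tried locally without a pipeline. In this sketch a temporary file stands in for the Cloud Storage path that KFP would inject. `save_object` and `load_object` are hypothetical helper names, not KFP functions.

```python
import os
import pickle
import tempfile
from typing import Any


def save_object(obj: Any, path: str) -> None:
    """Serialize an in-memory object to a file."""
    with open(path, "wb") as handle:
        pickle.dump(obj, handle)


def load_object(path: str) -> Any:
    """Load a previously serialized object back into memory."""
    with open(path, "rb") as handle:
        return pickle.load(handle)


# The temporary directory plays the role of the pipeline's storage location.
with tempfile.TemporaryDirectory() as tmp_dir:
    artifact_path = os.path.join(tmp_dir, "animals.pkl")
    save_object(["cat", "dog"], artifact_path)
    restored = load_object(artifact_path)

print(restored)
```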

In [None]:
@pipeline(name="output-path-pipeline",
          pipeline_root=PIPELINE_ROOT + "output-path-pipeline")
def output_pipeline():
    first_task = first()
    second_task = second(first_task.output)

In [None]:
compiler.Compiler().compile(
  pipeline_func=output_pipeline, package_path="output_pipeline.json"
)



In [None]:
job = pipeline_jobs.PipelineJob(
    display_name="output-pipeline",
    template_path="output_pipeline.json"
)

In [None]:
job.run(sync=False)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
