In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Notebook Setup

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/experiments/comparing_pipeline_runs.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/experiments/comparing_pipeline_runs.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/experiments/comparing_pipeline_runs.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>                                                                                               
</table>

## Overview

As a data scientist, you probably start by running model experiments locally in a notebook. Whichever framework you use, you need to track parameters, training time series, and evaluation metrics for each experiment so that you can later explain and justify the modelling approach you choose.

### Dataset

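This notebook uses two datasets. The local training examples load the `imdb_reviews` dataset from [TensorFlow Datasets](https://www.tensorflow.org/datasets/catalog/imdb_reviews), which contains movie reviews labelled for sentiment. The pipeline example trains on the Iris sample files that are copied from `gs://cloud-samples-data/ai-platform/iris` into your Cloud Storage bucket.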


### Objective
* log the model parameters
* log the loss and metrics on every epoch to TensorBoard
* log the evaluation metrics
* compare two experiments


### Install additional packages

Install additional package dependencies not installed in your notebook environment, such as Vertex AI SDK and KFP SDK. Use the latest major GA version of each package.

In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements.
# DL_ANACONDA_HOME is only used as a truthy/falsy marker for that environment.
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME")
# NOTE(review): this flag is computed but not read anywhere in this cell.
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

# NOTE(review): the first command force-reinstalls the SDK from the `main`
# branch of a personal fork, and none of the three installs pins a version, so
# the environment is not reproducible. Consider pinning released versions once
# the required features are available in google-cloud-aiplatform.
!pip3 install {USER_FLAG} --force-reinstall git+https://github.com/sasha-gitg/python-aiplatform@main -q
!pip3 install {USER_FLAG} google-cloud-aiplatform[metadata] -q
!pip3 install {USER_FLAG} kfp -q

[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tfx-bsl 1.9.0 requires google-api-python-client<2,>=1.7.11, but you have google-api-python-client 2.52.0 which is incompatible.
tfx-bsl 1.9.0 requires pyarrow<6,>=1, but you have pyarrow 8.0.0 which is incompatible.
tensorflow 2.9.0rc2 requires tensorboard<2.10,>=2.9, but you have tensorboard 2.8.0 which is incompatible.
tensorflow-transform 1.9.0 requires pyarrow<6,>=1, but you have pyarrow 8.0.0 which is incompatible.
tensorflow-serving-api 2.9.0 requires tensorflow<3,>=2.9.0, but you have tensorflow 2.9.0rc2 which is incompatible.
google-cloud-recommendations-ai 0.2.0 requires google-api-core[grpc]<2.0.0dev,>=1.22.2, but you have google-api-core 2.8.2 which is incompatible.
cloud-tpu-client 0.10 requires google-api-python-client==1.8.0, but you have google-api-python-client 2.52.0 which is incompatible

### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
# Automatically restart kernel after installs
import os

# The restart is skipped when the IS_TESTING environment variable is set.
if not os.getenv("IS_TESTING"):
    import IPython

    # Ask the running kernel application to shut down; the True argument is
    # presumably the "restart" flag of do_shutdown() - confirm against ipykernel.
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [1]:
import os

PROJECT_ID = "hybrid-vertex" # TODO: replace with your own project ID
REGION = "us-central1"   # TODO: replace with the region you want to use

from datetime import datetime

# Timestamp string (YYYYmmddHHMMSS) captured at the moment this cell runs.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Project ID:  hybrid-vertex


In [5]:
# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

import os
import sys

# If on Vertex AI Workbench, then don't execute this code

# Set to True below only when the google.colab module is already loaded.
IS_COLAB = False

# Both Workbench markers (the env_version file and DL_ANACONDA_HOME) must be
# absent for any authentication step to run.
if not os.path.exists("/opt/deeplearning/metadata/env_version") and not os.getenv(
    "DL_ANACONDA_HOME"
):
    if "google.colab" in sys.modules:

        IS_COLAB = True

        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.

    # This branch is skipped when IS_TESTING is set.
    elif not os.getenv("IS_TESTING"):

        %env GOOGLE_APPLICATION_CREDENTIALS ''

In [6]:
# Name of the Cloud Storage bucket used by this notebook (without the gs:// prefix).
BUCKET_NAME = "test-us-central"  # TODO: replace with your own bucket name
# Full gs:// URI derived from the bucket name.
BUCKET_URI = f"gs://{BUCKET_NAME}"

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
# ! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [7]:
# List the bucket contents; this fails if the bucket is missing or inaccessible.
! gsutil ls -al $BUCKET_URI

                                 gs://test-us-central/tensorboard-logs/


### Download training dataset


In [8]:
# Public Cloud Storage location of the Iris sample files.
DATASET_URI = "gs://cloud-samples-data/ai-platform/iris"

In [9]:
# Recursively copy the sample files from DATASET_URI into your bucket.
!gsutil cp -r $DATASET_URI $BUCKET_URI

Copying gs://cloud-samples-data/ai-platform/iris/classification/evaluate.csv [Content-Type=text/csv]...
Copying gs://cloud-samples-data/ai-platform/iris/classification/train.csv [Content-Type=text/csv]...
Copying gs://cloud-samples-data/ai-platform/iris/iris_data.csv [Content-Type=application/octet-stream]...
Copying gs://cloud-samples-data/ai-platform/iris/iris_predict.csv [Content-Type=text/csv]...
- [4 files][  5.4 KiB/  5.4 KiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m cp ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying gs://cloud-samples-data/ai-platform/iris/iris_target.csv [Content-Type=application/octet-stream]...
Copying gs://cloud-samples-data/ai-platform/iris/iris_test.csv [Content-Type=text/csv]...
Copying gs://cloud-samples-data/ai-platform/iris/iris_training.cs

### Import libraries and define constants

In [10]:
import logging
# General
import os
import time

logger = logging.getLogger("logger")
logging.basicConfig(level=logging.INFO)

import kfp.v2.compiler as compiler

# Pipeline Experiments
import kfp.v2.dsl as dsl

# Vertex AI
from google.cloud import aiplatform as vertex_ai
from kfp.v2.dsl import Metrics, Model, Output, component

# Example 1 - Local 

**Create experiment for tracking training related metadata**
* First, create an experiment using the `init()` method and then initialize a run within the `experiment` using `start_run()`.

* `vertex_ai.init()` - Create an experiment instance
* `vertex_ai.start_run()` - Track a specific run within the experiment

In [122]:
# Name under which the runs of this first example are grouped.
EXPERIMENT_NAME = "test-experiment-name"

# Point the SDK at the project, region and staging bucket.
vertex_ai.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

# A second init() call selects (and, if needed, creates) the experiment.
vertex_ai.init(experiment=EXPERIMENT_NAME)

# Open the first run; its handle is the cell's displayed value.
vertex_ai.start_run("run-1")

Associating projects/934903580331/locations/us-central1/metadataStores/default/contexts/test-experiment-name-run-1 to Experiment: test-experiment-name


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/metadataStores/default/contexts/test-experiment-name-run-1 to Experiment: test-experiment-name


<google.cloud.aiplatform.metadata.experiment_run_resource.ExperimentRun at 0x7f1b7e0aad10>

### Log parameters for the experiment
Typically, an experiment is associated with a specific dataset and a model architecture. Within an experiment, you may have multiple training runs, where each run tries a different configuration. For example:

* Data feeding, such as:
> * Dataset split
> * Dataset sampling and boosting
* Metaparameters, such as:
> * Depth and width of layers
* Hyperparameters, such as:
> * batch size
> * learning rate


These configuration settings are referred to as parameters, which you store as key-value pairs using the method `log_params()`

In [123]:
# Metaparameters: settings that describe the model architecture.
metaparams = {"units": 128}
vertex_ai.log_params(metaparams)

# Hyperparameters: settings that steer the training procedure.
hyperparams = {"epochs": 100, "batch_size": 32, "learning_rate": 0.01}
vertex_ai.log_params(hyperparams)

### Log metrics for the experiment
At the completion or termination of a run within an experiment, you can log results that you use to compare runs. For example:

* Evaluation metrics
* Hyperparameter search selection
* Time to train the model
* Early stop trigger

In [124]:
# Summary metrics recorded once for the active run.
metrics = {"test_acc": 98.7, "train_acc": 99.3}
vertex_ai.log_metrics(metrics)

In [125]:
# Close the active run before reading the results back.
vertex_ai.end_run()

# Keep only the rows that belong to this experiment; show one column per run.
experiment_df = vertex_ai.get_experiment_df().loc[
    lambda runs_df: runs_df.experiment_name == EXPERIMENT_NAME
]
experiment_df.T

Unnamed: 0,0
experiment_name,test-experiment-name
run_name,run-1
run_type,system.ExperimentRun
state,COMPLETE
param.epochs,100.0
param.batch_size,32.0
param.learning_rate,0.01
param.units,128.0
metric.test_acc,98.7
metric.train_acc,99.3


### Start subsequent run in an experiment

In [126]:
# Open a second run inside the same experiment.
vertex_ai.start_run("run-2")

# Same settings as run-1 except for the number of units.
metaparams = {"units": 256}  # changed the value
vertex_ai.log_params(metaparams)

hyperparams = {"epochs": 100, "batch_size": 32, "learning_rate": 0.01}
vertex_ai.log_params(hyperparams)

# Both accuracy values differ from run-1.
metrics = {"test_acc": 98.8, "train_acc": 99.5}
vertex_ai.log_metrics(metrics)

Associating projects/934903580331/locations/us-central1/metadataStores/default/contexts/test-experiment-name-run-2 to Experiment: test-experiment-name


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/metadataStores/default/contexts/test-experiment-name-run-2 to Experiment: test-experiment-name


### Comparing runs in the same experiment
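Finally, you call the method `get_experiment_df()` to get the results of the runs as a pandas DataFrame, and keep only the rows whose `experiment_name` matches your experiment. Transposing the result with `.T` shows one column per run, which makes the runs easy to compare side by side.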

In [127]:
# Close run-2 so that both runs are complete before comparing them.
vertex_ai.end_run()

# Keep only the rows that belong to this experiment; show one column per run.
experiment_df = vertex_ai.get_experiment_df().loc[
    lambda runs_df: runs_df.experiment_name == EXPERIMENT_NAME
]
experiment_df.T

Unnamed: 0,0,1
experiment_name,test-experiment-name,test-experiment-name
run_name,run-2,run-1
run_type,system.ExperimentRun,system.ExperimentRun
state,COMPLETE,COMPLETE
param.units,256.0,128.0
param.epochs,100.0,100.0
param.learning_rate,0.01,0.01
param.batch_size,32.0,32.0
metric.train_acc,99.5,99.3
metric.test_acc,98.8,98.7


# Example 2 - compare locally trained models

> Run experiment and evaluate experiment runs using `with` statement

In [263]:
# Experiments: the name embeds a fresh timestamp taken when this cell runs.
TASK = "compare-local"
VERSION = 'v1'
MODEL_TYPE = "rnn"
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
EXPERIMENT_NAME = f"{TASK}-{VERSION}-{MODEL_TYPE}-{TIMESTAMP}"

# Training: local folders for the dataset and the TensorBoard logs.
DATA_DIR = "data"
LOG_DIR = "logs"

# Create a Tensorboard instance and attach it to the new experiment.
vertex_ai_tb = vertex_ai.Tensorboard.create()
vertex_ai.init(experiment=EXPERIMENT_NAME, experiment_tensorboard=vertex_ai_tb)

Creating Tensorboard


INFO:google.cloud.aiplatform.tensorboard.tensorboard_resource:Creating Tensorboard


Create Tensorboard backing LRO: projects/934903580331/locations/us-central1/tensorboards/7825760816556146688/operations/2378638994029150208


INFO:google.cloud.aiplatform.tensorboard.tensorboard_resource:Create Tensorboard backing LRO: projects/934903580331/locations/us-central1/tensorboards/7825760816556146688/operations/2378638994029150208


Tensorboard created. Resource name: projects/934903580331/locations/us-central1/tensorboards/7825760816556146688


INFO:google.cloud.aiplatform.tensorboard.tensorboard_resource:Tensorboard created. Resource name: projects/934903580331/locations/us-central1/tensorboards/7825760816556146688


To use this Tensorboard in another session:


INFO:google.cloud.aiplatform.tensorboard.tensorboard_resource:To use this Tensorboard in another session:


tb = aiplatform.Tensorboard('projects/934903580331/locations/us-central1/tensorboards/7825760816556146688')


INFO:google.cloud.aiplatform.tensorboard.tensorboard_resource:tb = aiplatform.Tensorboard('projects/934903580331/locations/us-central1/tensorboards/7825760816556146688')


In [None]:
# Create the data and log folders with mode 777; -p tolerates existing folders.
!mkdir -p -m 777 $DATA_DIR $LOG_DIR

### Helper functions for model, training, eval

In [264]:
import tensorflow_datasets as tfds

tfds.disable_progress_bar()
import tensorflow as tf


# Helpers ----------------------------------------------------------------------
def get_dataset(data_dir, buffer_size, batch_size):
    """
    Loads the "imdb_reviews" dataset and builds the train and test pipelines
    Args:
        data_dir: The directory passed to tfds.load as data_dir
        buffer_size: The shuffle buffer size applied to the training split
        batch_size: The batch size used for both splits
    Returns:
        A (train_dataset, test_dataset) tuple; the training split is shuffled,
        batched and prefetched, the test split is only batched and prefetched
    """
    splits = tfds.load(
        "imdb_reviews",
        download=True,
        data_dir=data_dir,
        with_info=False,
        as_supervised=True,
    )
    raw_train = splits["train"]
    raw_test = splits["test"]

    # Only the training split is shuffled.
    train_dataset = raw_train.shuffle(buffer_size)
    train_dataset = train_dataset.batch(batch_size)
    train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

    test_dataset = raw_test.batch(batch_size)
    test_dataset = test_dataset.prefetch(tf.data.AUTOTUNE)

    return train_dataset, test_dataset


def get_encoder(train_dataset, vocab_size):
    """
    Builds a TextVectorization layer and adapts it to the training text
    Args:
        train_dataset: A tf.data.Dataset object yielding (text, label) pairs
        vocab_size: The value passed to TextVectorization as max_tokens
    Returns:
        TextVectorization: The adapted TextVectorization layer
    """
    encoder = tf.keras.layers.TextVectorization(max_tokens=vocab_size)

    # The vocabulary is learned from the text only, so the labels are dropped.
    text_only = train_dataset.map(
        lambda text, label: text, num_parallel_calls=tf.data.AUTOTUNE
    )
    encoder.adapt(text_only)
    return encoder


def get_model(encoder, model_params, role):
    """
    Builds and compiles the classifier
    Args:
        encoder: A TextVectorization object used as the first layer
        model_params: A dictionary with the Adam settings "learning_rate",
            "beta_1", "beta_2" and "epsilon"
        role: "baseline" selects the single-LSTM model; any other value selects
            the stacked two-LSTM model with dropout
    Returns:
        tf.keras.Model: The compiled tf.keras.Sequential model
    """
    layers = tf.keras.layers

    model = tf.keras.Sequential()
    model.add(encoder)

    # The embedding table has one row per vocabulary entry of the encoder.
    vocabulary_size = len(encoder.get_vocabulary())
    model.add(
        layers.Embedding(input_dim=vocabulary_size, output_dim=64, mask_zero=True)
    )

    if role != "baseline":
        # Stacked variant: the first LSTM returns full sequences for the second.
        model.add(layers.Bidirectional(layers.LSTM(64, return_sequences=True)))
        model.add(layers.Bidirectional(layers.LSTM(32)))
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(1))
    else:
        model.add(layers.Bidirectional(layers.LSTM(64)))
        model.add(layers.Dense(64, activation="relu"))
        model.add(layers.Dense(1))

    # The final Dense(1) has no activation, hence from_logits=True.
    loss = tf.keras.losses.BinaryCrossentropy(from_logits=True)
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=model_params["learning_rate"],
        beta_1=model_params["beta_1"],
        beta_2=model_params["beta_2"],
        epsilon=model_params["epsilon"],
    )
    model.compile(loss=loss, optimizer=optimizer, metrics=["accuracy"])
    return model


def train(train_dataset, test_dataset, model, epochs, steps, log_dir=LOG_DIR):
    """
    Fits the model and writes TensorBoard logs
    Args:
        train_dataset: A tf.data.Dataset object containing the training data
        test_dataset: A tf.data.Dataset object used as validation data
        model: A compiled tf.keras.Model object
        epochs: The number of epochs
        steps: The number of validation steps
        log_dir: The location of tf training logs (defaults to the value LOG_DIR
            had when this function was defined)
    Returns:
        The History object returned by model.fit
    """
    callbacks = [tf.keras.callbacks.TensorBoard(log_dir=log_dir)]
    return model.fit(
        train_dataset,
        validation_data=test_dataset,
        epochs=epochs,
        validation_steps=steps,
        callbacks=callbacks,
    )

## Model 1: baseline

> Run experiment and evaluate experiment runs using `with` statement


Model training takes ~10 minutes

In [265]:
# Experiment Settings ----------------------------------------------------------
ID_1 = "exp-run-id-1"
BUFFER_SIZE = 10000
BATCH_SIZE = 64
VOCAB_SIZE = 1000
EPOCHS = 2
STEPS = 20
ROLE = "baseline"
LR = 1e-4
B_1 = 0.9
B_2 = 0.999
EPS = 1e-07

# Initialize the experiment
logging.info("Initialize the experiment.")
with vertex_ai.start_run(ID_1) as run:

    # Get the training and testing datasets
    logging.info("Get the training and testing datasets.")
    data_params = {"buffer_size": BUFFER_SIZE, "batch_size": BATCH_SIZE}
    train_dataset, test_dataset = get_dataset(
        "./data",
        buffer_size=data_params["buffer_size"],
        batch_size=data_params["batch_size"],
    )
    run.log_params(data_params)

    # Get the encoder
    logging.info("Get the encoder.")
    encoder_params = {"vocab_size": VOCAB_SIZE}
    encoder = get_encoder(
        train_dataset=train_dataset, vocab_size=encoder_params["vocab_size"]
    )
    run.log_params(encoder_params)

    # Get the model
    logging.info("Get the model.")
    run.log_params({"role": ROLE})
    model_params = {"learning_rate": LR, "beta_1": B_1, "beta_2": B_2, "epsilon": EPS}
    model = get_model(encoder=encoder, model_params=model_params, role=ROLE)
    run.log_params(model_params)

    # Train the model
    logging.info("Train the model.")
    history = train(
        model=model,
        train_dataset=train_dataset,
        test_dataset=test_dataset,
        epochs=EPOCHS,
        steps=STEPS,
    )

    run.log_params(history.params)
    for idx in range(0, history.params["epochs"]):
        run.log_time_series_metrics(
            {
                "train_loss": history.history["loss"][idx],
                "train_accuracy": history.history["accuracy"][idx],
            }
        )

    # Evaluate model
    logging.info("Evaluate model.")
    test_loss, test_accuracy = model.evaluate(test_dataset)
    run.log_metrics({"test_loss": test_loss, "test_accuracy": test_accuracy})

    # Exit the experiment
    logging.info("Exit the run.")

INFO:root:Initialize the experiment.


Associating projects/934903580331/locations/us-central1/metadataStores/default/contexts/compare-local-v1-rnn-20220729031913-exp-run-id-1 to Experiment: compare-local-v1-rnn-20220729031913


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/metadataStores/default/contexts/compare-local-v1-rnn-20220729031913-exp-run-id-1 to Experiment: compare-local-v1-rnn-20220729031913
INFO:root:Get the training and testing datasets.
INFO:absl:No config specified, defaulting to first: imdb_reviews/plain_text
INFO:absl:Load pre-computed DatasetInfo (eg: splits, num examples,...) from GCS: imdb_reviews/plain_text/1.0.0
INFO:absl:Load dataset info from /tmp/tmpicq86bxztfds
INFO:absl:Field info.config_name from disk and from code do not match. Keeping the one from code.
INFO:absl:Field info.config_description from disk and from code do not match. Keeping the one from code.
INFO:absl:Field info.citation from disk and from code do not match. Keeping the one from code.
INFO:absl:Field info.splits from disk and from code do not match. Keeping the one from code.
INFO:absl:Field info.module_name from disk and from code do not match. K

[1mDownloading and preparing dataset 80.23 MiB (download: 80.23 MiB, generated: Unknown size, total: 80.23 MiB) to ./data/imdb_reviews/plain_text/1.0.0...[0m


INFO:absl:Downloading http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz into data/downloads/ai.stanfor.edu_amaas_sentime_aclImdb_v1PaujRp-TxjBWz59jHXsMDm5WiexbxzaFQkEnXc3Tvo8.tar.gz.tmp.df15ca4b36de41d7804ddb6de3f06061...
INFO:absl:Done writing imdb_reviews-train.tfrecord. Number of examples: 25000 (shards: [25000])
INFO:absl:Done writing imdb_reviews-test.tfrecord. Number of examples: 25000 (shards: [25000])
INFO:absl:Done writing imdb_reviews-unsupervised.tfrecord. Number of examples: 50000 (shards: [50000])
INFO:absl:Constructing tf.data.Dataset imdb_reviews for split None, from ./data/imdb_reviews/plain_text/1.0.0
2022-07-29 03:20:42.551347: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


[1mDataset imdb_reviews downloaded and prepared to ./data/imdb_reviews/plain_text/1.0.0. Subsequent calls will reuse this data.[0m


INFO:root:Get the encoder.
INFO:root:Get the model.
INFO:root:Train the model.


Epoch 1/2


2022-07-29 03:39:18.993918: W tensorflow/core/kernels/data/cache_dataset_ops.cc:856] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


Epoch 2/2


2022-07-29 03:58:01.439140: W tensorflow/core/kernels/data/cache_dataset_ops.cc:856] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.
INFO:root:Evaluate model.




INFO:root:Exit the run.


### Evaluate baseline model

In [266]:
# Get experiment
logging.info("Get experiment status.")
# Summary frame for the experiment; displayed as the cell's result.
experiment_df = vertex_ai.get_experiment_df()
experiment_df

INFO:root:Get experiment status.


Unnamed: 0,experiment_name,run_name,run_type,state,param.role,param.epochs,param.epsilon,param.steps,param.vocab_size,param.batch_size,param.beta_2,param.buffer_size,param.beta_1,param.learning_rate,param.verbose,metric.test_accuracy,metric.test_loss,time_series_metric.train_loss,time_series_metric.train_accuracy
0,compare-local-v1-rnn-20220729031913,exp-run-id-1,system.ExperimentRun,COMPLETE,baseline,2.0,1e-07,391.0,1000.0,64.0,0.999,10000.0,0.9,0.0001,1.0,0.839,0.38272,0.426342,0.79668


In [267]:
# Get time series metrics
logging.info("Get time series metrics.")
# `run` is the handle bound by the `with vertex_ai.start_run(ID_1) as run:` cell.
ts_runs_df = run.get_time_series_data_frame()
ts_runs_df

INFO:root:Get time series metrics.


Unnamed: 0,step,wall_time,train_loss,train_accuracy
0,1,2022-07-29 03:58:39.023000+00:00,0.651169,0.54892
1,2,2022-07-29 03:58:39.094000+00:00,0.426342,0.79668


## Model 2: improvement

> Run experiment and evaluate experiment runs using `with` statement

Model training takes ~10 minutes

In [268]:
# Experiment Settings ----------------------------------------------------------
ID_2 = "exp-run-id-2"
ROLE = "stacked"
BUFFER_SIZE = 10000
BATCH_SIZE = 64
VOCAB_SIZE = 1000
EPOCHS = 2
STEPS = 20
LR = 1e-4
B_1 = 0.9
B_2 = 0.999
EPS = 1e-07

# Initialize the experiment
logging.info("Initialize the experiment.")
# The `with` block closes the run when it exits, so no end_run() call is needed.
with vertex_ai.start_run(ID_2) as run:

    # train_dataset, test_dataset and encoder are reused from the baseline cell.
    # BUFFER_SIZE, BATCH_SIZE and VOCAB_SIZE are therefore only logged here and
    # must match the baseline values for the logged parameters to be accurate.
    data_params = {"buffer_size": BUFFER_SIZE, "batch_size": BATCH_SIZE}
    run.log_params(data_params)

    encoder_params = {"vocab_size": VOCAB_SIZE}
    run.log_params(encoder_params)

    # Get the model
    logging.info("Get the model.")
    run.log_params({"role": ROLE})
    # Build the optimizer settings from this cell's constants, so that editing
    # LR/B_1/B_2/EPS above actually affects the model and the logged parameters
    # (previously the dict leaked in from the baseline cell).
    model_params = {"learning_rate": LR, "beta_1": B_1, "beta_2": B_2, "epsilon": EPS}
    model = get_model(encoder=encoder, model_params=model_params, role=ROLE)
    run.log_params(model_params)

    # Train the model
    logging.info("Train the model.")
    history = train(
        model=model,
        train_dataset=train_dataset,
        test_dataset=test_dataset,
        epochs=EPOCHS,
        steps=STEPS,
    )

    # Log through the run handle, as the baseline cell does.
    run.log_params(history.params)
    # Log one time-series point per recorded epoch.
    for train_loss, train_accuracy in zip(
        history.history["loss"], history.history["accuracy"]
    ):
        run.log_time_series_metrics(
            {
                "train_loss": train_loss,
                "train_accuracy": train_accuracy,
            }
        )

    # Evaluate model
    logging.info("Evaluate model.")
    test_loss, test_accuracy = model.evaluate(test_dataset)
    run.log_metrics({"test_loss": test_loss, "test_accuracy": test_accuracy})

    # Exit the experiment
    logging.info("Exit the experiment.")

INFO:logger:Initialize the experiment.


Associating projects/934903580331/locations/us-central1/metadataStores/default/contexts/compare-local-v1-rnn-20220729031913-exp-run-id-2 to Experiment: compare-local-v1-rnn-20220729031913


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/metadataStores/default/contexts/compare-local-v1-rnn-20220729031913-exp-run-id-2 to Experiment: compare-local-v1-rnn-20220729031913
INFO:root:Get the model.
INFO:root:Train the model.


Epoch 1/2
Epoch 2/2


INFO:root:Evaluate model.




INFO:root:Exit the experiment.


### Compare the baseline with the new model implementation

In [271]:
# Get experiment
logging.info("Get experiment status.")
# The frame has one row per run, so both runs can be compared side by side.
experiment_df = vertex_ai.get_experiment_df()
experiment_df

INFO:root:Get experiment status.


Unnamed: 0,experiment_name,run_name,run_type,state,param.epochs,param.learning_rate,param.epsilon,param.verbose,param.batch_size,param.beta_2,param.vocab_size,param.role,param.steps,param.buffer_size,param.beta_1,metric.test_accuracy,metric.test_loss,time_series_metric.train_loss,time_series_metric.train_accuracy
0,compare-local-v1-rnn-20220729031913,exp-run-id-2,system.ExperimentRun,COMPLETE,2.0,0.0001,1e-07,1.0,64.0,0.999,1000.0,stacked,391.0,10000.0,0.9,0.85356,0.354947,0.384967,0.83004
1,compare-local-v1-rnn-20220729031913,exp-run-id-1,system.ExperimentRun,COMPLETE,2.0,0.0001,1e-07,1.0,64.0,0.999,1000.0,stacked,391.0,10000.0,0.9,0.839,0.38272,0.426342,0.79668


In [273]:
# Get time series metrics
# Look the run up by name instead of relying on a variable left over from
# an earlier cell.
exp_run = vertex_ai.ExperimentRun(ID_2, experiment=EXPERIMENT_NAME)
logging.info("Get time series metrics.")
ts_runs_df = exp_run.get_time_series_data_frame()
# End with the bare expression so the frame renders as a rich table, matching
# the baseline time-series cell (print() only gives plain text).
ts_runs_df

INFO:root:Get time series metrics.


   step                        wall_time  train_loss  train_accuracy
0     1 2022-07-29 05:09:01.553000+00:00    0.624836         0.58660
1     2 2022-07-29 05:09:01.649000+00:00    0.384967         0.83004


### Clean-up

In [None]:
# # Delete experiment
# exp = vertex_ai.Experiment(EXPERIMENT_NAME)
# exp.delete(delete_backing_tensorboard_runs=True)

# # Delete Tensorboard
# vertex_ai_tb.delete()

Before you start running your pipeline experiments, you have to formalize your training as a pipeline component.

To do that, you use the `kfp.v2.dsl.component` decorator to convert your training task into a pipeline component.

In [234]:
@component(
    packages_to_install=[
        "numpy==1.18.5",
        "pandas==1.0.4",
        "scikit-learn==0.23.1",
        "xgboost==1.1.1",
    ]
)
def custom_trainer(
    train_uri: str,
    label_uri: str,
    max_depth: int,
    learning_rate: float,
    boost_rounds: int,
    model_uri: str,
    metrics: Output[Metrics],
    model_metadata: Output[Model],
):
    """
    Train an XGBoost model on CSV data, evaluate it and record the results.

    The feature and label CSV files are read through "/gcs/" paths (every
    "gs://" in the given URIs is rewritten), 20% of the rows are held out
    for evaluation, a booster is trained on the rest, and the model is
    saved in a new UUID-named folder below ``model_uri``.

    Args:
        train_uri: URI of the CSV file with the features.
        label_uri: URI of the CSV file with the labels.
        max_depth: Value of the XGBoost "max_depth" parameter.
        learning_rate: Value of the XGBoost "eta" parameter.
        boost_rounds: Number of boosting rounds.
        model_uri: Base URI under which the trained model is saved.
        metrics: Output artifact that receives the "accuracy" metric.
        model_metadata: Output artifact whose ``uri`` is set to ``model_uri``.
    """

    # import libraries (kept inside the function body, as in the original cell)
    import uuid
    from pathlib import Path as path
    from typing import Tuple

    import numpy as np
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    # variables
    gs_prefix = "gs://"
    gcsfuse_prefix = "/gcs/"
    train_path = train_uri.replace(gs_prefix, gcsfuse_prefix)
    label_path = label_uri.replace(gs_prefix, gcsfuse_prefix)
    model_path = model_uri.replace(gs_prefix, gcsfuse_prefix)

    def get_data(
        train_path: str, label_path: str
    ) -> Tuple[xgb.DMatrix, np.ndarray, np.ndarray]:
        """
        Get the data
        Args:
            train_path: the path of the train data
            label_path: the path of the label data
        Returns:
            the training DMatrix, the held-out test data and the test labels
        """
        # Load data into pandas, then use `.values` to get NumPy arrays
        # NOTE(review): read_csv treats the first row of each file as a header;
        # confirm that both CSV files really start with a header row.
        data = pd.read_csv(train_path).values
        labels = pd.read_csv(label_path).values

        # Convert one-column 2D array into 1D array for use with XGBoost
        labels = labels.reshape((labels.size,))
        # Fixed random_state keeps the 80/20 split identical across runs
        train_data, test_data, train_labels, test_labels = train_test_split(
            data, labels, test_size=0.2, random_state=7
        )

        # Load data into DMatrix object
        dtrain = xgb.DMatrix(train_data, label=train_labels)
        return dtrain, test_data, test_labels

    def train_model(
        max_depth: int, eta: float, boost_rounds: int, dtrain: xgb.DMatrix
    ):
        """
        Train the model
        Args:
            max_depth: the max depth of the model
            eta: the eta (learning rate) of the model
            boost_rounds: the boost rounds of the model
            dtrain: the train data
        Returns:
            the trained model
        """
        # Train XGBoost model
        param = {"max_depth": max_depth, "eta": eta}
        model = xgb.train(param, dtrain, num_boost_round=boost_rounds)
        return model

    def evaluate_model(model, test_data, test_labels):
        """
        Evaluate the model
        Args:
            model: the trained model
            test_data: the test data
            test_labels: the test labels
        Returns:
            the accuracy of the model
        """
        dtest = xgb.DMatrix(test_data)
        pred = model.predict(dtest)
        # Round each raw prediction to the nearest integer before comparing
        predictions = [round(value) for value in pred]
        # Evaluate predictions
        accuracy = accuracy_score(test_labels, predictions)
        return accuracy

    def save_model(model, model_path):
        """
        Save the model
        Args:
            model: the trained model
            model_path: the path of the model
        """
        # A fresh UUID folder keeps the runs from overwriting each other
        model_id = str(uuid.uuid1())
        model_path = f"{model_path}/{model_id}/model.bst"
        path(model_path).parent.mkdir(parents=True, exist_ok=True)
        model.save_model(model_path)

    # Main ----------------------------------------------

    dtrain, test_data, test_labels = get_data(train_path, label_path)
    model = train_model(max_depth, learning_rate, boost_rounds, dtrain)
    accuracy = evaluate_model(model, test_data, test_labels)
    save_model(model, model_path)

    # Metadata ------------------------------------------
    # The metric key was misspelled "accurancy"; use the correct name so the
    # experiment tables show a properly named column.
    metrics.log_metric("accuracy", accuracy)
    # NOTE(review): the artifact URI is the base model_uri, not the UUID folder
    # that save_model() wrote to - confirm that this is intended.
    model_metadata.uri = model_uri

## Build a pipeline

### Define your workflow using Kubeflow Pipelines DSL package

In [235]:
@dsl.pipeline(name="custom-training-pipeline")
def pipeline(
    train_uri: str,
    label_uri: str,
    max_depth: int,
    learning_rate: float,
    boost_rounds: int,
    model_uri: str,
):
    # Single-step pipeline: every pipeline parameter is forwarded unchanged to
    # the custom_trainer component.
    # NOTE(review): the arguments are passed positionally, so their order must
    # match custom_trainer's signature - confirm if that signature changes.

    custom_trainer(
        train_uri, label_uri, max_depth, learning_rate, boost_rounds, model_uri
    )

### Compile your pipeline into a JSON file

In [236]:
# Write the pipeline definition to pipeline.json, the template file that the
# PipelineJob submissions below load through template_path.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

## Submit and track pipeline runs

### Submit Pipeline runs

Now that you have the pipeline, define a training configuration for each run using the parameters the pipeline exposes. The example below shows how to submit several pipeline runs.

In [237]:
# One hyperparameter configuration per pipeline run.
_RUN_FIELDS = ("max_depth", "learning_rate", "boost_rounds")
runs = [
    dict(zip(_RUN_FIELDS, values))
    for values in (
        (4, 0.2, 10),
        (5, 0.3, 20),
        (3, 0.1, 30),
        (6, 0.5, 40),
        (5, 0.4, 30),
    )
]

In [238]:
# Submit one PipelineJob per configuration and associate it with the experiment.
for i, run in enumerate(runs):
    # Fixed data/model locations plus this run's hyperparameters.
    parameter_values = {
        "train_uri": TRAIN_URI,
        "label_uri": LABEL_URI,
        "model_uri": MODEL_URI,
        **run,
    }
    job = vertex_ai.PipelineJob(
        display_name=f"{EXPERIMENT_NAME}-pipeline-run-{i}",
        template_path="pipeline.json",
        pipeline_root=PIPELINE_URI,
        enable_caching=False,
        parameter_values=parameter_values,
    )
    job.submit(experiment=EXPERIMENT_NAME)

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024813


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024813


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024813')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024813')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024813?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024813?project=934903580331


Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024813 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024813 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024815


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024815


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024815')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024815')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024815?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024815?project=934903580331


Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024815 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024815 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024817


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024817


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024817')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024817')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024817?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024817?project=934903580331


Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024817 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024817 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024819


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024819


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024819')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024819')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024819?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024819?project=934903580331


Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024819 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024819 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024820


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024820


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024820')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024820')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024820?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024820?project=934903580331


Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024820 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024820 to Experiment: pipe-run-associated-experiment-v3-xgboost-20220729024800


### Check Pipeline run states

The Vertex AI SDK provides the `get_experiment_df` method to monitor the status of pipeline runs. You can use it on its own to return the parameters and metrics of the pipeline runs in the Vertex AI Experiment, or combine it with the `get` method of `PipelineJob` to retrieve the corresponding pipeline job in Vertex AI Pipelines.


In [239]:
# Show the experiment dataframe: one row per pipeline run, including its
# current state and the parameters it was submitted with.
vertex_ai.get_experiment_df(EXPERIMENT_NAME)

Unnamed: 0,experiment_name,run_name,run_type,state,param.model_uri,param.learning_rate,param.max_depth,param.train_uri,param.boost_rounds,param.label_uri
0,pipe-run-associated-experiment-v3-xgboost-2022...,custom-training-pipeline-20220729024820,system.PipelineRun,RUNNING,gs://test-us-central/model,0.4,5,gs://test-us-central/iris/iris_data.csv,30,gs://test-us-central/iris/iris_target.csv
1,pipe-run-associated-experiment-v3-xgboost-2022...,custom-training-pipeline-20220729024819,system.PipelineRun,RUNNING,gs://test-us-central/model,0.5,6,gs://test-us-central/iris/iris_data.csv,40,gs://test-us-central/iris/iris_target.csv
2,pipe-run-associated-experiment-v3-xgboost-2022...,custom-training-pipeline-20220729024817,system.PipelineRun,RUNNING,gs://test-us-central/model,0.1,3,gs://test-us-central/iris/iris_data.csv,30,gs://test-us-central/iris/iris_target.csv
3,pipe-run-associated-experiment-v3-xgboost-2022...,custom-training-pipeline-20220729024815,system.PipelineRun,RUNNING,gs://test-us-central/model,0.3,5,gs://test-us-central/iris/iris_data.csv,20,gs://test-us-central/iris/iris_target.csv
4,pipe-run-associated-experiment-v3-xgboost-2022...,custom-training-pipeline-20220729024813,system.PipelineRun,RUNNING,gs://test-us-central/model,0.2,4,gs://test-us-central/iris/iris_data.csv,10,gs://test-us-central/iris/iris_target.csv


In [240]:
# Poll the experiment once a minute until every pipeline run is COMPLETE,
# stopping early as soon as any run is FAILED.
while True:
    pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
    run_states = list(pipeline_experiments_df.state)

    # A failed run can never complete, so check for failure first.
    if any(pipeline_state == "FAILED" for pipeline_state in run_states):
        print("At least one Pipeline run failed")
        break

    # Require at least one row so an empty dataframe is not reported as done.
    if run_states and all(
        pipeline_state == "COMPLETE" for pipeline_state in run_states
    ):
        print("Pipeline experiment runs have completed")
        break

    print("Pipeline runs are still running...")
    time.sleep(60)

Pipeline runs are still running...
Pipeline experiment runs have completed


In [241]:
# Look up a PipelineJob by using an experiment run name as the job ID
pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
first_run_name = pipeline_experiments_df.run_name[0]
job = vertex_ai.PipelineJob.get(first_run_name)
print(job.resource_name)
# NOTE(review): _dashboard_uri() is an underscore-prefixed SDK method.
print(job._dashboard_uri())

projects/934903580331/locations/us-central1/pipelineJobs/custom-training-pipeline-20220729024820
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-training-pipeline-20220729024820?project=934903580331


# Example 4 - Associate pipeline run with experiment run
* explicitly start / stop experiment runs
* explicitly log params and metrics

In [255]:
# Pipeline root and data/model locations, all under the tutorial bucket
PIPELINE_URI = f"{BUCKET_URI}/pipelines"
TRAIN_URI = f"{BUCKET_URI}/iris/iris_data.csv"
LABEL_URI = f"{BUCKET_URI}/iris/iris_target.csv"
MODEL_URI = f"{BUCKET_URI}/model"

# Experiment name: task, version and model type plus a timestamp suffix
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
VERSION = "v26"
TASK = "explicit-logging"
MODEL_TYPE = "xgboost"
EXPERIMENT_NAME = f"pipe-run-{TASK}-{VERSION}-{MODEL_TYPE}-{TIMESTAMP}"

print(f"EXPERIMENT_NAME: {EXPERIMENT_NAME}")

EXPERIMENT_NAME: pipe-run-explicit-logging-v26-xgboost-20220729030143


In [257]:
@component(
    packages_to_install=[
        "numpy==1.18.5",
        "pandas==1.0.4",
        "scikit-learn==0.23.1",
        "xgboost==1.1.1",
        "google-cloud-aiplatform",
    ]
)
def custom_trainer(
    train_uri: str,
    label_uri: str,
    max_depth: int,
    learning_rate: float,
    epochs: int,
    batch_size: int,
    boost_rounds: int,
    model_uri: str,
    experiment_name: str,
    experiment_run: str,
    pipeline_name: str,
    metrics: Output[Metrics],
    model_metadata: Output[Model],
):
    """
    Train, evaluate and save an XGBoost model, logging to an experiment run.

    The component logs its inputs as parameters of an experiment run, trains
    and scores the model, saves it, and then logs the accuracy both to the
    `metrics` output and to the same experiment run.

    Args:
        train_uri: gs:// URI of the CSV file with the features
        label_uri: gs:// URI of the CSV file with the labels
        max_depth: the max depth of the model
        learning_rate: passed to XGBoost as `eta`
        epochs: logged as a run parameter only; not used for training
        batch_size: logged as a run parameter only; not used for training
        boost_rounds: the number of boosting rounds
        model_uri: gs:// URI of the folder under which the model is saved
        experiment_name: name of the experiment to log to
        experiment_run: name of the experiment run to create
        pipeline_name: not used by the component body
        metrics: output artifact that receives the accuracy metric
        model_metadata: output artifact whose uri is set to model_uri
    """

    # import libraries
    import logging
    import uuid
    from pathlib import Path as path

    import google.cloud.aiplatform as vertex_ai
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    # NOTE(review): project, location and staging bucket are hard-coded here;
    # running this component in another project requires editing these values.
    vertex_ai.init(
        project="hybrid-vertex",
        location="us-central1",
        staging_bucket="gs://test-us-central",
    )

    # variables
    # Rewrite gs:// URIs to the /gcs/ prefix so they can be used as file paths.
    gs_prefix = "gs://"
    gcsfuse_prefix = "/gcs/"
    train_path = train_uri.replace(gs_prefix, gcsfuse_prefix)
    label_path = label_uri.replace(gs_prefix, gcsfuse_prefix)
    model_path = model_uri.replace(gs_prefix, gcsfuse_prefix)

    metaparams = {
        "train_uri": train_uri,
        "label_uri": label_uri,
        "model_uri": model_uri,
        "experiment_name": experiment_name,
        "experiment_run": experiment_run,
    }

    hyperparams = {
        "epochs": epochs,
        "batch_size": batch_size,
        "learning_rate": learning_rate,
        "max_depth": max_depth,
        "boost_rounds": boost_rounds,
    }

    # ====================================================
    # Experiments
    # ====================================================

    logging.info(
        f"Creating run: {experiment_run}; for experiment: {experiment_name}"
    )

    # Select the experiment, then create the run and log its parameters.
    vertex_ai.init(experiment=experiment_name)

    with vertex_ai.start_run(experiment_run) as my_run:
        logging.info("logging metaparams")
        my_run.log_params(metaparams)

        logging.info("logging hyperparams")
        my_run.log_params(hyperparams)

    # ====================================================

    def get_data(train_path: str, label_path: str) -> tuple:
        """
        Get the data
        Args:
            train_path: the path of the train data
            label_path: the path of the label data
        Returns:
            a tuple (dtrain, test_data, test_labels): the training DMatrix and
            the NumPy arrays of the test split
        """
        # Load data into pandas, then use `.values` to get NumPy arrays
        data = pd.read_csv(train_path).values
        labels = pd.read_csv(label_path).values

        # Convert one-column 2D array into 1D array for use with XGBoost
        labels = labels.reshape((labels.size,))
        train_data, test_data, train_labels, test_labels = train_test_split(
            data, labels, test_size=0.2, random_state=7
        )

        # Load data into DMatrix object
        dtrain = xgb.DMatrix(train_data, label=train_labels)
        return dtrain, test_data, test_labels

    def train_model(
        max_depth: int, eta: float, boost_rounds: int, dtrain: xgb.DMatrix
    ):
        """
        Train the model
        Args:
            max_depth: the max depth of the model
            eta: the eta (learning rate) of the model
            boost_rounds: the boost rounds of the model
            dtrain: the train data
        Returns:
            the trained model
        """
        # Train XGBoost model
        param = {"max_depth": max_depth, "eta": eta}
        return xgb.train(param, dtrain, num_boost_round=boost_rounds)

    def evaluate_model(model, test_data, test_labels):
        """
        Evaluate the model
        Args:
            model: the trained model
            test_data: the test data
            test_labels: the test labels
        Returns:
            the accuracy of the model
        """
        dtest = xgb.DMatrix(test_data)
        pred = model.predict(dtest)
        # Apply the built-in round() to every raw prediction before scoring.
        predictions = [round(value) for value in pred]
        # Evaluate predictions
        return accuracy_score(test_labels, predictions)

    def save_model(model, model_path):
        """
        Save the model
        Args:
            model: the trained model
            model_path: the base folder; the model is written to
                <model_path>/<uuid1>/model.bst
        """
        model_id = str(uuid.uuid1())
        model_path = f"{model_path}/{model_id}/model.bst"
        path(model_path).parent.mkdir(parents=True, exist_ok=True)
        model.save_model(model_path)

    # Main ----------------------------------------------

    dtrain, test_data, test_labels = get_data(train_path, label_path)
    model = train_model(max_depth, learning_rate, boost_rounds, dtrain)
    accuracy = evaluate_model(model, test_data, test_labels)
    save_model(model, model_path)

    # Metadata ------------------------------------------
    logging.info("logging metrics")
    metrics.log_metric("accuracy", accuracy)
    model_metadata.uri = model_uri

    # Resume the run created above so the accuracy lands on the same run as
    # the parameters.
    with vertex_ai.start_run(experiment_run, resume=True) as my_run:
        logging.info(f"logging accuracy to experiment run {experiment_run}")
        my_run.log_metrics({"accuracy": accuracy})

In [258]:
@dsl.pipeline(name=f"custom-training-pipeline-{VERSION}")
def pipeline(
    train_uri: str,
    label_uri: str,
    max_depth: int,
    learning_rate: float,
    boost_rounds: int,
    model_uri: str,
    experiment_name: str,
    experiment_run: str,
    epochs: int,
    batch_size: int,
    pipeline_name: str,
):
    # Single-step pipeline: each pipeline parameter is handed to the
    # custom_trainer component under the same name.
    custom_trainer(
        train_uri=train_uri,
        label_uri=label_uri,
        max_depth=max_depth,
        learning_rate=learning_rate,
        epochs=epochs,
        batch_size=batch_size,
        boost_rounds=boost_rounds,
        model_uri=model_uri,
        experiment_name=experiment_name,
        experiment_run=experiment_run,
        pipeline_name=pipeline_name,
    )

In [259]:
# Compile the explicit-logging pipeline. This writes to the same pipeline.json
# path as the earlier compile cell, so that file is overwritten.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

In [260]:
# One configuration per pipeline run; run names are numbered from run-1.
runs = [
    {
        "experiment_name": f"{EXPERIMENT_NAME}",
        "run_name": f"run-{run_number}",
        "max_depth": depth,
        "learning_rate": rate,
        "boost_rounds": rounds,
        "epochs": n_epochs,
        "batch_size": size,
    }
    for run_number, (depth, rate, rounds, n_epochs, size) in enumerate(
        [
            (4, 0.2, 10, 10, 200),
            (5, 0.3, 20, 12, 250),
            (3, 0.1, 30, 14, 300),
            (6, 0.5, 40, 16, 350),
            (5, 0.4, 30, 18, 400),
        ],
        start=1,
    )
]

In [261]:
import google.cloud.aiplatform as vertex_ai

# NOTE(review): project, location and staging bucket are hard-coded here;
# running this in another project requires editing these values.
vertex_ai.init(
    experiment=EXPERIMENT_NAME,
    project="hybrid-vertex",
    location="us-central1",
    staging_bucket="gs://test-us-central",
)

for i, run in enumerate(runs):
    # The same string is the display name, the job ID and the pipeline_name
    # parameter of each run.
    job_name = f"{EXPERIMENT_NAME}-pipeline-run-{i}"
    parameter_values = {
        "train_uri": TRAIN_URI,
        "label_uri": LABEL_URI,
        "model_uri": MODEL_URI,
        "experiment_name": run["experiment_name"],
        "experiment_run": run["run_name"],
        "max_depth": run["max_depth"],
        "learning_rate": run["learning_rate"],
        "boost_rounds": run["boost_rounds"],
        "epochs": run["epochs"],
        "batch_size": run["batch_size"],
        "pipeline_name": job_name,
    }
    job = vertex_ai.PipelineJob(
        display_name=job_name,
        template_path="pipeline.json",
        pipeline_root=PIPELINE_URI,
        job_id=job_name,
        enable_caching=False,
        parameter_values=parameter_values,
    )
    # Submitted without experiment=...: the custom_trainer component starts
    # the experiment run itself and logs params and metrics to it.
    job.submit()

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-0


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-0


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-0')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-0')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-0?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-0?project=934903580331


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-1


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-1


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-1')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-1')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-1?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-1?project=934903580331


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-2


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-2


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-2')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-2')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-2?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-2?project=934903580331


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-3


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-3


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-3')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-3')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-3?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-3?project=934903580331


Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-4


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-4


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-4')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-4')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-4?project=934903580331


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipe-run-explicit-logging-v26-xgboost-20220729030143-pipeline-run-4?project=934903580331


In [262]:
# Show the experiment dataframe: one row per experiment run with the
# parameters and the accuracy metric logged by the component.
vertex_ai.get_experiment_df(EXPERIMENT_NAME)

Unnamed: 0,experiment_name,run_name,run_type,state,param.learning_rate,param.model_uri,param.max_depth,param.label_uri,param.batch_size,param.train_uri,param.epochs,param.experiment_name,param.experiment_run,param.boost_rounds,metric.accuracy
0,pipe-run-explicit-logging-v26-xgboost-20220729...,run-4,system.ExperimentRun,COMPLETE,0.5,gs://test-us-central/model,6.0,gs://test-us-central/iris/iris_target.csv,350.0,gs://test-us-central/iris/iris_data.csv,16.0,pipe-run-explicit-logging-v26-xgboost-20220729...,run-4,40.0,0.9
1,pipe-run-explicit-logging-v26-xgboost-20220729...,run-5,system.ExperimentRun,COMPLETE,0.4,gs://test-us-central/model,5.0,gs://test-us-central/iris/iris_target.csv,400.0,gs://test-us-central/iris/iris_data.csv,18.0,pipe-run-explicit-logging-v26-xgboost-20220729...,run-5,30.0,0.9
2,pipe-run-explicit-logging-v26-xgboost-20220729...,run-2,system.ExperimentRun,COMPLETE,0.3,gs://test-us-central/model,5.0,gs://test-us-central/iris/iris_target.csv,250.0,gs://test-us-central/iris/iris_data.csv,12.0,pipe-run-explicit-logging-v26-xgboost-20220729...,run-2,20.0,0.866667
3,pipe-run-explicit-logging-v26-xgboost-20220729...,run-3,system.ExperimentRun,COMPLETE,0.1,gs://test-us-central/model,3.0,gs://test-us-central/iris/iris_target.csv,300.0,gs://test-us-central/iris/iris_data.csv,14.0,pipe-run-explicit-logging-v26-xgboost-20220729...,run-3,30.0,0.9
4,pipe-run-explicit-logging-v26-xgboost-20220729...,run-1,system.ExperimentRun,COMPLETE,0.2,gs://test-us-central/model,4.0,gs://test-us-central/iris/iris_target.csv,200.0,gs://test-us-central/iris/iris_data.csv,10.0,pipe-run-explicit-logging-v26-xgboost-20220729...,run-1,10.0,0.9


In [None]:
# runs = [
#     {"experiment_name": f'{EXPERIMENT_NAME}', "max_depth": 4, "learning_rate": 0.2, "boost_rounds": 10, "epochs": 10, "batch_size": 200},
#     {"experiment_name": f'{EXPERIMENT_NAME}', "max_depth": 5, "learning_rate": 0.3, "boost_rounds": 20, "epochs": 12, "batch_size": 250},
#     {"experiment_name": f'{EXPERIMENT_NAME}', "max_depth": 3, "learning_rate": 0.1, "boost_rounds": 30, "epochs": 14, "batch_size": 300},
#     {"experiment_name": f'{EXPERIMENT_NAME}', "max_depth": 6, "learning_rate": 0.5, "boost_rounds": 40, "epochs": 16, "batch_size": 350},
#     {"experiment_name": f'{EXPERIMENT_NAME}', "max_depth": 5, "learning_rate": 0.4, "boost_rounds": 30, "epochs": 18, "batch_size": 400},
# ]

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

In [None]:
# Delete the pipeline jobs
# Jobs from the experiment-associated example: the run names stored in
# pipeline_experiments_df are the PipelineJob IDs.
for run_name in pipeline_experiments_df.run_name:
    pipeline_job = vertex_ai.PipelineJob.get(run_name)
    pipeline_job.delete()

# Jobs from the explicit-logging example are not listed in that dataframe;
# they were submitted with job_id f"{EXPERIMENT_NAME}-pipeline-run-{i}".
for i in range(len(runs)):
    pipeline_job = vertex_ai.PipelineJob.get(f"{EXPERIMENT_NAME}-pipeline-run-{i}")
    pipeline_job.delete()

In [None]:
# Delete experiment
exp = vertex_ai.Experiment(EXPERIMENT_NAME)
exp.delete()

In [None]:
# Set to True to also delete the Cloud Storage bucket and everything in it.
delete_bucket = False

# The bucket is removed when delete_bucket is True or when the IS_TESTING
# environment variable is set to a non-empty value.
if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -rf {BUCKET_URI}