# Build a pipeline for continuous model training

## Objectives

The following steps cover this process:

1. Acquire and prepare the dataset in BigQuery.
2. Create and upload a custom training package that, when executed, reads data from the dataset and trains the model.
3. Build a Vertex AI pipeline that runs the custom training package, uploads the model to the Vertex AI Model Registry, runs batch prediction and an evaluation job, and sends an email notification.
4. Manually run the pipeline.
5. Create a Cloud Function with an Eventarc trigger that runs the pipeline whenever new data is inserted into the BigQuery dataset.
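Step 5 depends on the Cloud Function deciding whether an incoming event refers to the watched table. The sketch below is a minimal illustration of that check. It assumes the Eventarc trigger is built on Cloud Audit Logs for BigQuery, where `protoPayload.resourceName` for a table operation has the form `projects/{project}/datasets/{dataset}/tables/{table}`. The helpers `parse_table_resource` and `should_trigger` and the example event shape are hypothetical, and the code that actually submits the pipeline is left out.

```python
import re
from typing import Optional, Tuple

# Assumed audit-log resource name layout for a BigQuery table.
_TABLE_RE = re.compile(r"^projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)$")


def parse_table_resource(resource_name: str) -> Optional[Tuple[str, str, str]]:
    """Return (project, dataset, table), or None if the name is not a table."""
    match = _TABLE_RE.match(resource_name)
    return match.groups() if match else None


def should_trigger(event_data: dict, watched_table: str) -> bool:
    """Return True when the event refers to the watched `project.dataset.table`."""
    # Hypothetical event shape: {"protoPayload": {"resourceName": "..."}}
    resource_name = event_data.get("protoPayload", {}).get("resourceName", "")
    parsed = parse_table_resource(resource_name)
    return parsed is not None and ".".join(parsed) == watched_table


event = {"protoPayload": {"resourceName": "projects/hybrid-vertex/datasets/mlops/tables/chicago"}}
print(should_trigger(event, "hybrid-vertex.mlops.chicago"))  # True
```

Matching on the fully qualified table name keeps inserts into other tables in the same dataset from starting a training run.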

In [1]:
! python3 -c "from google.cloud import aiplatform; print('aiplatform SDK version: {}'.format(aiplatform.__version__))"
! python3 -c "from google.cloud import bigquery; print('bigquery SDK version: {}'.format(bigquery.__version__))"
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

aiplatform SDK version: 1.71.0
bigquery SDK version: 3.25.0
KFP SDK version: 2.7.0
google_cloud_pipeline_components version: 2.17.0


In [3]:
VERSION = "v2"

In [4]:
from google.cloud import aiplatform
from google.cloud import bigquery
from google.cloud import storage

from pprint import pprint
import time

print(f'aiplatform SDK version: {aiplatform.__version__}')
print(f'bigquery SDK version: {bigquery.__version__}')

PROJECT_ID = "hybrid-vertex"
REGION = "us-central1"
BUCKET_NAME = f"ct-pipeline-{VERSION}"
BUCKET_URI = f"gs://{BUCKET_NAME}"

bq_client = bigquery.Client(
    project=PROJECT_ID,
    location=REGION
)

# Set the project id
! gcloud config set project {PROJECT_ID}

aiplatform SDK version: 1.71.0
bigquery SDK version: 3.25.0
Updated property [core/project].


In [5]:
! gcloud storage buckets create $BUCKET_URI --location=$REGION --project=$PROJECT_ID

Creating gs://ct-pipeline-v2/...


In [6]:
! ls

README.md			    ct-training.yaml  requirements.txt
continuous_training_pipeline.ipynb  data_prep.ipynb   training_package


Next, package the training code and copy it to the bucket.

# Custom training package

### Training package directory

```
training_package
├── __init__.py
├── setup.py
└── trainer
    ├── __init__.py
    └── task.py
```
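The contents of `trainer/task.py` aren't shown here. The worker pool spec later in this notebook passes `--project-id`, `--data-dir`, `--training-dir`, and `--bq-source` to `trainer.task`, so the module presumably parses arguments along these lines. This sketch covers only argument parsing; `parse_args` is a hypothetical helper, the help strings are assumptions, and the BigQuery read and scikit-learn training are omitted.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the flags passed by the Vertex AI worker pool spec (assumed interface)."""
    parser = argparse.ArgumentParser(description="Taxi fare training task (sketch)")
    parser.add_argument("--project-id", required=True, help="Google Cloud project ID")
    parser.add_argument("--data-dir", required=True, help="Output directory for data splits (assumed)")
    parser.add_argument("--training-dir", required=True, help="Output directory for the model artifact (assumed)")
    parser.add_argument("--bq-source", required=True, help="BigQuery table as project.dataset.table")
    return parser.parse_args(argv)


args = parse_args([
    "--project-id", "hybrid-vertex",
    "--data-dir", "/gcs/ct-pipeline-v2/training-v2/run-20241105-131344/data",
    "--training-dir", "/gcs/ct-pipeline-v2/training-v2/run-20241105-131344/artifacts",
    "--bq-source", "hybrid-vertex.mlops.chicago",
])
print(args.bq_source)  # hybrid-vertex.mlops.chicago
```

`argparse` turns each dashed flag into an underscored attribute, so `--project-id` is read back as `args.project_id`.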

Run `setup.py` to create a source distribution for the training application.

In [7]:
! cd training_package && python setup.py sdist --formats=gztar && cd ..

running sdist
running egg_info
writing trainer.egg-info/PKG-INFO
writing dependency_links to trainer.egg-info/dependency_links.txt
writing requirements to trainer.egg-info/requires.txt
writing top-level names to trainer.egg-info/top_level.txt
reading manifest file 'trainer.egg-info/SOURCES.txt'
writing manifest file 'trainer.egg-info/SOURCES.txt'

running check
creating trainer-0.1
creating trainer-0.1/trainer
creating trainer-0.1/trainer.egg-info
copying files to trainer-0.1...
copying setup.py -> trainer-0.1
copying trainer/__init__.py -> trainer-0.1/trainer
copying trainer/task.py -> trainer-0.1/trainer
copying trainer.egg-info/PKG-INFO -> trainer-0.1/trainer.egg-info
copying trainer.egg-info/SOURCES.txt -> trainer-0.1/trainer.egg-info
copying trainer.egg-info/dependency_links.txt -> trainer-0.1/trainer.egg-info
copying trainer.egg-info/requires.txt -> trainer-0.1/trainer.egg-info
copying trainer.egg-info/top_level.txt -> trainer-0.1/trainer.egg-info
copying trainer.egg-info/SOURCES
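Because the worker pool spec runs `python_module: trainer.task`, the archive has to contain `trainer/task.py`. Before uploading, a quick standard-library check with `tarfile` can confirm this. `sdist_contains_module` is a hypothetical helper; the member-path handling assumes the `trainer-0.1/...` layout shown in the output above.

```python
import tarfile


def sdist_contains_module(archive_path: str, module: str = "trainer.task") -> bool:
    """Return True if the gzipped sdist tarball contains the source file for `module`."""
    # "trainer.task" -> "/trainer/task.py"; sdist members are prefixed with "<name>-<version>/".
    suffix = "/" + module.replace(".", "/") + ".py"
    with tarfile.open(archive_path, "r:gz") as tar:
        return any(name.endswith(suffix) for name in tar.getnames())


# Example usage against the archive built above:
# print(sdist_contains_module("training_package/dist/trainer-0.1.tar.gz"))
```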

In [8]:
! gcloud storage cp training_package/dist/trainer-0.1.tar.gz $BUCKET_URI/

Copying file://training_package/dist/trainer-0.1.tar.gz to gs://ct-pipeline-v2/trainer-0.1.tar.gz
  Completed files 1/1 | 3.2kiB/3.2kiB                                          


# Vertex AI Experiments

In [76]:
EXPERIMENT_NAME   = f'training-{VERSION}'

# Each invocation gets a new, timestamped run name
invoke_time       = time.strftime("%Y%m%d-%H%M%S")
RUN_NAME          = f'run-{invoke_time}'

CHECKPT_DIR       = f"{BUCKET_URI}/{EXPERIMENT_NAME}/chkpoint"
BASE_OUTPUT_DIR   = f"{BUCKET_URI}/{EXPERIMENT_NAME}/{RUN_NAME}"
LOG_DIR           = f"{BASE_OUTPUT_DIR}/logs"
DATA_DIR          = f"{BASE_OUTPUT_DIR}/data"
ARTIFACTS_DIR     = f"{BASE_OUTPUT_DIR}/artifacts"

print(f"EXPERIMENT_NAME   : {EXPERIMENT_NAME}")
print(f"RUN_NAME          : {RUN_NAME}\n")
print(f"CHECKPT_DIR       : {CHECKPT_DIR}")
print(f"BASE_OUTPUT_DIR   : {BASE_OUTPUT_DIR}")
print(f"LOG_DIR           : {LOG_DIR}")
print(f"DATA_DIR          : {DATA_DIR}")
print(f"ARTIFACTS_DIR     : {ARTIFACTS_DIR}")

EXPERIMENT_NAME   : training-v2
RUN_NAME          : run-20241105-131344

CHECKPT_DIR       : gs://ct-pipeline-v2/training-v2/chkpoint
BASE_OUTPUT_DIR   : gs://ct-pipeline-v2/training-v2/run-20241105-131344
LOG_DIR           : gs://ct-pipeline-v2/training-v2/run-20241105-131344/logs
DATA_DIR          : gs://ct-pipeline-v2/training-v2/run-20241105-131344/data
ARTIFACTS_DIR     : gs://ct-pipeline-v2/training-v2/run-20241105-131344/artifacts
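The run layout above is derived entirely from the bucket, the experiment name, and a timestamp. Wrapping it in a function is one way to let a later caller, such as the Cloud Function in step 5, produce fresh paths for every run instead of reusing this notebook's `RUN_NAME`. `run_paths` is a hypothetical helper that mirrors the f-strings in this cell.

```python
import time
from typing import Dict, Optional


def run_paths(bucket_uri: str, experiment_name: str, invoke_time: Optional[str] = None) -> Dict[str, str]:
    """Rebuild the per-run GCS layout used in this notebook."""
    invoke_time = invoke_time or time.strftime("%Y%m%d-%H%M%S")
    run_name = f"run-{invoke_time}"
    base = f"{bucket_uri}/{experiment_name}/{run_name}"
    return {
        "RUN_NAME": run_name,
        "CHECKPT_DIR": f"{bucket_uri}/{experiment_name}/chkpoint",  # shared across runs
        "BASE_OUTPUT_DIR": base,
        "LOG_DIR": f"{base}/logs",
        "DATA_DIR": f"{base}/data",
        "ARTIFACTS_DIR": f"{base}/artifacts",
    }


paths = run_paths("gs://ct-pipeline-v2", "training-v2", "20241105-131344")
print(paths["DATA_DIR"])  # gs://ct-pipeline-v2/training-v2/run-20241105-131344/data
```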


In [77]:
! gsutil -q cp requirements.txt $ARTIFACTS_DIR/requirements.txt
! gsutil -q cp requirements.txt $DATA_DIR/requirements.txt

! gsutil ls $BASE_OUTPUT_DIR

gs://ct-pipeline-v2/training-v2/run-20241105-131344/artifacts/
gs://ct-pipeline-v2/training-v2/run-20241105-131344/data/


In [78]:
from google.cloud import aiplatform

aiplatform.init(
    project=PROJECT_ID,
    staging_bucket=BUCKET_URI,
    location=REGION,
    experiment=EXPERIMENT_NAME
)

aiplatform.autolog()

# Vertex AI Pipeline

In [79]:
import os

EMAIL_RECIPIENTS = [ "jordantotten@google.com" ]

PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/chicago-taxi-pipe"
PIPELINE_NAME = "ct-training"
WORKING_DIR = f"{PIPELINE_ROOT}/mlops-trigger-tutorial"
os.environ['AIP_MODEL_DIR'] = ARTIFACTS_DIR

PIPELINE_FILE = PIPELINE_NAME + ".yaml"

print(f"WORKING_DIR     : {WORKING_DIR}")
print(f"PIPELINE_FILE   : {PIPELINE_FILE}")

WORKING_DIR     : gs://ct-pipeline-v2/pipeline_root/chicago-taxi-pipe/mlops-trigger-tutorial
PIPELINE_FILE   : ct-training.yaml


## Build pipeline

In [80]:
from kfp import dsl
from kfp.dsl import importer
from kfp.dsl import OneOf
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.model_evaluation import ModelEvaluationRegressionOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google.cloud import aiplatform
from google_cloud_pipeline_components.preview.model_evaluation.model_evaluation_import_component import model_evaluation_import as ModelImportEvaluationOp

# define the train-deploy pipeline
@dsl.pipeline(name=PIPELINE_NAME)
def custom_model_training_evaluation_pipeline(
    project: str,
    location: str,
    training_job_display_name: str,
    worker_pool_specs: list,
    base_output_dir: str,
    artifacts_dir: str,
    prediction_container_uri: str,
    model_display_name: str,
    batch_prediction_job_display_name: str,
    target_field_name: str,
    test_data_gcs_uri: list,
    ground_truth_gcs_source: list,
    batch_predictions_gcs_prefix: str,
    eval_display_name: str,
    batch_predictions_input_format: str="csv",
    batch_predictions_output_format: str="jsonl",
    ground_truth_format: str="csv",
    parent_model_resource_name: str=None,
    parent_model_artifact_uri: str=None,
    existing_model: bool=False
):
    # Notification task
    notify_task = VertexNotificationEmailOp(
        recipients=EMAIL_RECIPIENTS
    )
    
    with dsl.ExitHandler(notify_task, name='MLOps Continuous Training Pipeline'):
        # Train the model
        custom_job_task = CustomTrainingJobOp(
            project=project,
            display_name=training_job_display_name,
            worker_pool_specs=worker_pool_specs,
            base_output_directory=base_output_dir,
            location=location
        )

        # Import the unmanaged model
        import_unmanaged_model_task = importer(
            artifact_uri=artifacts_dir,
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "containerSpec": {
                    "imageUri": prediction_container_uri,
                },
            },
        ).after(custom_job_task)

        with dsl.If(existing_model == True):
            # Import the parent model to upload as a version
            import_registry_model_task = importer(
                artifact_uri=parent_model_artifact_uri,
                artifact_class=artifact_types.VertexModel,
                metadata={
                    "resourceName": parent_model_resource_name
                },
            ).after(import_unmanaged_model_task)
            
            # Upload the model as a version
            model_version_upload_op = ModelUploadOp(
                project=project,
                location=location,
                display_name=model_display_name,
                parent_model=import_registry_model_task.outputs["artifact"],
                unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
            )

        with dsl.Else():
            # Upload the model
            model_upload_op = ModelUploadOp(
                project=project,
                location=location,
                display_name=model_display_name,
                unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
            )
        
        # Get the model (or model version)
        model_resource = OneOf(
            model_version_upload_op.outputs["model"], 
            model_upload_op.outputs["model"]
        )

        # Batch prediction
        batch_predict_task = ModelBatchPredictOp(
            project=project,
            job_display_name=batch_prediction_job_display_name,
            model=model_resource,
            location=location,
            instances_format=batch_predictions_input_format,
            predictions_format=batch_predictions_output_format,
            gcs_source_uris=test_data_gcs_uri,
            gcs_destination_output_uri_prefix=batch_predictions_gcs_prefix,
            machine_type='n1-standard-4'
        )
        
        # Evaluation task
        evaluation_task = ModelEvaluationRegressionOp(
            project=project,
            target_field_name=target_field_name,
            location=location,
            model=model_resource,
            predictions_format=batch_predictions_output_format,
            predictions_gcs_source=batch_predict_task.outputs["gcs_output_directory"],
            ground_truth_format=ground_truth_format,
            ground_truth_gcs_source=ground_truth_gcs_source
        )
        
        # Import the evaluation result to Vertex AI.
        import_evaluation_task = ModelImportEvaluationOp(
            regression_metrics=evaluation_task.outputs['evaluation_metrics'],
            model=model_resource,
            dataset_type=batch_predictions_input_format,
            dataset_path="", # test_data_gcs_uri
            dataset_paths=ground_truth_gcs_source,
            display_name=eval_display_name,
        )
    return

## Compile pipeline

In [81]:
from kfp import dsl
from kfp import compiler

LOCAL_PIPELINE_YAML = f"{PIPELINE_NAME}.yaml"

compiler.Compiler().compile(
    pipeline_func=custom_model_training_evaluation_pipeline,
    package_path=LOCAL_PIPELINE_YAML,
    # package_path="{}.yaml".format(PIPELINE_NAME),
)

print(f"LOCAL_PIPELINE_YAML: {LOCAL_PIPELINE_YAML}")

LOCAL_PIPELINE_YAML: ct-training.yaml


In [82]:
! gsutil -q cp $LOCAL_PIPELINE_YAML $BASE_OUTPUT_DIR/$LOCAL_PIPELINE_YAML

! gsutil ls $BASE_OUTPUT_DIR

gs://ct-pipeline-v2/training-v2/run-20241105-131344/ct-training.yaml
gs://ct-pipeline-v2/training-v2/run-20241105-131344/artifacts/
gs://ct-pipeline-v2/training-v2/run-20241105-131344/data/


## Upload as pipeline template

In [83]:
REPO_NAME = "mlops"

# Create a repo in the artifact registry
# ! gcloud artifacts repositories create $REPO_NAME --location=$REGION --repository-format=KFP

In [84]:
from kfp.registry import RegistryClient

host = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}"
client = RegistryClient(host=host)

TEMPLATE_NAME, VERSION_NAME = client.upload_pipeline(
    file_name=PIPELINE_FILE,
    tags=[VERSION, "latest"],
    extra_headers={"description":"This is an example pipeline template."}
)

TEMPLATE_URI = f"https://{REGION}-kfp.pkg.dev/{PROJECT_ID}/{REPO_NAME}/{TEMPLATE_NAME}/latest"
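`TEMPLATE_URI` points at the `latest` tag, but the upload also applied the `VERSION` tag (`v2`). Pinning a run to a specific tag avoids picking up whatever `latest` happens to reference later, assuming Artifact Registry resolves any tag the same way it resolves `latest`. `template_uri` is a hypothetical helper that only formats the string used above, and the template name `ct-training` in the example is an assumption based on `PIPELINE_NAME`.

```python
def template_uri(region: str, project_id: str, repo_name: str, template_name: str, tag: str = "latest") -> str:
    """Format an Artifact Registry KFP template URI for a given tag (assumed layout)."""
    host = f"https://{region}-kfp.pkg.dev/{project_id}/{repo_name}"
    return f"{host}/{template_name}/{tag}"


print(template_uri("us-central1", "hybrid-vertex", "mlops", "ct-training", tag="v2"))
# https://us-central1-kfp.pkg.dev/hybrid-vertex/mlops/ct-training/v2
```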

## Manually run the pipeline

### Set parameters

In [85]:
DATASET_NAME = "mlops"
TABLE_NAME = "chicago"

worker_pool_specs = [
    {
        "machine_spec": {"machine_type": "e2-highmem-2"},
        "replica_count": 1,
        "python_package_spec": {
            "executor_image_uri": "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
            "package_uris": [f"{BUCKET_URI}/trainer-0.1.tar.gz"],
            "python_module": "trainer.task",
            "args":[
                "--project-id", PROJECT_ID,
                "--data-dir", f"/gcs/{BUCKET_NAME}/{EXPERIMENT_NAME}/{RUN_NAME}/data",
                "--training-dir", f"/gcs/{BUCKET_NAME}/{EXPERIMENT_NAME}/{RUN_NAME}/artifacts",
                "--bq-source", f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
                # "--experiment-name", EXPERIMENT_NAME,
            ]
        },
    }
]
pprint(worker_pool_specs)

[{'machine_spec': {'machine_type': 'e2-highmem-2'},
  'python_package_spec': {'args': ['--project-id',
                                   'hybrid-vertex',
                                   '--data-dir',
                                   '/gcs/ct-pipeline-v2/training-v2/run-20241105-131344/data',
                                   '--training-dir',
                                   '/gcs/ct-pipeline-v2/training-v2/run-20241105-131344/artifacts',
                                   '--bq-source',
                                   'hybrid-vertex.mlops.chicago'],
                          'executor_image_uri': 'us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest',
                          'package_uris': ['gs://ct-pipeline-v2/trainer-0.1.tar.gz'],
                          'python_module': 'trainer.task'},
  'replica_count': 1}]
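The `--data-dir` and `--training-dir` arguments use `/gcs/...` paths because Vertex AI custom training mounts Cloud Storage under `/gcs/` through Cloud Storage FUSE, so `gs://BUCKET/path` appears to the training code as `/gcs/BUCKET/path`. A small converter keeps these arguments in sync with `DATA_DIR` and `ARTIFACTS_DIR` instead of rebuilding the paths by hand. `to_fuse_path` is a hypothetical helper.

```python
def to_fuse_path(gcs_uri: str) -> str:
    """Convert gs://bucket/path to the /gcs/bucket/path mount used inside Vertex AI training."""
    prefix = "gs://"
    if not gcs_uri.startswith(prefix):
        raise ValueError(f"Expected a gs:// URI, got {gcs_uri!r}")
    return "/gcs/" + gcs_uri[len(prefix):]


print(to_fuse_path("gs://ct-pipeline-v2/training-v2/run-20241105-131344/data"))
# /gcs/ct-pipeline-v2/training-v2/run-20241105-131344/data
```

With this helper, the arguments could read `"--data-dir", to_fuse_path(DATA_DIR)` and `"--training-dir", to_fuse_path(ARTIFACTS_DIR)`.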


In [86]:
EXISTING_MODEL_BOOL=True # True | False
PARENT_MODEL_ID="192288091023605760"
PARENT_MODEL_URI="gs://ct-pipeline-v2/training-v2/run-20241105-123152/artifacts"

parameters = {
    "project": PROJECT_ID,
    "location": REGION,
    "training_job_display_name": "taxifare-prediction-training-job",
    "worker_pool_specs": worker_pool_specs,
    "base_output_dir": BASE_OUTPUT_DIR,
    "artifacts_dir": ARTIFACTS_DIR,
    "prediction_container_uri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
    "model_display_name": "taxifare-prediction-model",
    "batch_prediction_job_display_name": "taxifare-prediction-batch-job",
    "target_field_name": "fare",
    "test_data_gcs_uri": [f"{DATA_DIR}/test_no_target.csv"],
    "ground_truth_gcs_source": [f"{DATA_DIR}/test.csv"],
    "batch_predictions_gcs_prefix": f"{BUCKET_URI}/batch_predict_output",
    "eval_display_name": f'eval-{invoke_time}',
    "existing_model": EXISTING_MODEL_BOOL,
}

if EXISTING_MODEL_BOOL:
    parameters["parent_model_resource_name"] = f"projects/{PROJECT_ID}/locations/{REGION}/models/{PARENT_MODEL_ID}"
    parameters["parent_model_artifact_uri"] = PARENT_MODEL_URI


pprint(parameters)

{'artifacts_dir': 'gs://ct-pipeline-v2/training-v2/run-20241105-131344/artifacts',
 'base_output_dir': 'gs://ct-pipeline-v2/training-v2/run-20241105-131344',
 'batch_prediction_job_display_name': 'taxifare-prediction-batch-job',
 'batch_predictions_gcs_prefix': 'gs://ct-pipeline-v2/batch_predict_output',
 'eval_display_name': 'eval-20241105-131344',
 'existing_model': True,
 'ground_truth_gcs_source': ['gs://ct-pipeline-v2/training-v2/run-20241105-131344/data/test.csv'],
 'location': 'us-central1',
 'model_display_name': 'taxifare-prediction-model',
 'parent_model_artifact_uri': 'gs://ct-pipeline-v2/training-v2/run-20241105-123152/artifacts',
 'parent_model_resource_name': 'projects/hybrid-vertex/locations/us-central1/models/192288091023605760',
 'prediction_container_uri': 'us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest',
 'project': 'hybrid-vertex',
 'target_field_name': 'fare',
 'test_data_gcs_uri': ['gs://ct-pipeline-v2/training-v2/run-20241105-131344/data/test_no_ta
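When `existing_model` is `True`, the pipeline's `dsl.If` branch imports the parent model from `parent_model_resource_name` and `parent_model_artifact_uri`, both of which default to `None`. Checking for them before submitting catches a misconfigured run early instead of at the importer step. `check_parameters` is a hypothetical helper, and its resource-name pattern assumes the unversioned `projects/.../locations/.../models/...` form used in this cell.

```python
import re
from typing import List

# Unversioned model resource name, as built from PARENT_MODEL_ID above.
_MODEL_RE = re.compile(r"^projects/[^/]+/locations/[^/]+/models/[^/@]+$")


def check_parameters(params: dict) -> List[str]:
    """Return a list of problems; an empty list means the parameters look consistent."""
    problems = []
    if params.get("existing_model"):
        name = params.get("parent_model_resource_name")
        if not name or not _MODEL_RE.match(name):
            problems.append("parent_model_resource_name missing or malformed")
        uri = params.get("parent_model_artifact_uri")
        if not uri or not uri.startswith("gs://"):
            problems.append("parent_model_artifact_uri missing or not a gs:// URI")
    return problems


print(check_parameters({"existing_model": True}))
# ['parent_model_resource_name missing or malformed', 'parent_model_artifact_uri missing or not a gs:// URI']
```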

## Create and run the pipeline job

In [87]:
# Create a pipeline job
job = aiplatform.PipelineJob(
    display_name=f"{PIPELINE_NAME}-manual",
    template_path=TEMPLATE_URI,
    parameter_values=parameters,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
    failure_policy='fast',
)
# Submit the job without blocking the notebook;
# job.run() would instead wait for the run to finish.
job.submit(
    experiment=EXPERIMENT_NAME,
)

Creating PipelineJob
PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/ct-training-20241105131430
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/ct-training-20241105131430')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/ct-training-20241105131430?project=934903580331
Associating projects/934903580331/locations/us-central1/pipelineJobs/ct-training-20241105131430 to Experiment: training-v2


# Inspect registered models

In [69]:
my_model = aiplatform.Model(
    model_name="projects/934903580331/locations/us-central1/models/7404802894257455104@2"
)
my_model

<google.cloud.aiplatform.models.Model object at 0x7fce75dda140> 
resource name: projects/934903580331/locations/us-central1/models/7404802894257455104

In [70]:
my_evaluations = my_model.list_model_evaluations()
my_evaluations

[]

In [71]:
my_model.resource_name

'projects/934903580331/locations/us-central1/models/7404802894257455104'

In [72]:
from google.cloud.aiplatform import gapic

# metrics = {"logLoss": 1.4, "auPrc": 0.85}
# print(metrics)

# model_eval = gapic.ModelEvaluation(
#     display_name="eval",
#     metrics_schema_uri="gs://google-cloud-aiplatform/schema/modelevaluation/classification_metrics_1.0.0.yaml",
#     metrics=metrics,
# )


metrics = {
    "rootMeanSquaredError": 2.019573,
    "rSquared": 0.9765769,
    "meanAbsoluteError": 0.7664642,
    "meanAbsolutePercentageError": 5.1649256,
    "rootMeanSquaredLogError": 0.093754485,
}
print(metrics)

model_eval = gapic.ModelEvaluation(
    display_name="eval-sdk",
    metrics_schema_uri="gs://google-cloud-aiplatform/schema/modelevaluation/regression_metrics_1.0.0.yaml",
    metrics=metrics,
)

model_eval

{'rootMeanSquaredError': 2.019573, 'rSquared': 0.9765769, 'meanAbsoluteError': 0.7664642, 'meanAbsolutePercentageError': 5.1649256, 'rootMeanSquaredLogError': 0.093754485}


display_name: "eval-sdk"
metrics_schema_uri: "gs://google-cloud-aiplatform/schema/modelevaluation/regression_metrics_1.0.0.yaml"
metrics {
  struct_value {
    fields {
      key: "rootMeanSquaredLogError"
      value {
        number_value: 0.093754485
      }
    }
    fields {
      key: "rootMeanSquaredError"
      value {
        number_value: 2.019573
      }
    }
    fields {
      key: "rSquared"
      value {
        number_value: 0.9765769
      }
    }
    fields {
      key: "meanAbsolutePercentageError"
      value {
        number_value: 5.1649256
      }
    }
    fields {
      key: "meanAbsoluteError"
      value {
        number_value: 0.7664642
      }
    }
  }
}
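The metric keys above match those in `regression_metrics_1.0.0.yaml`. When the predictions are available locally, the same dictionary can be computed directly. The following is a minimal standard-library sketch using the textbook definitions. It assumes MAPE is reported as a percentage, which appears consistent with the value above, and that all targets and predictions are positive so the percentage and log errors are defined. Vertex AI's own evaluation may handle edge cases differently. `regression_metrics` is a hypothetical helper.

```python
import math
from typing import Dict, Sequence


def regression_metrics(y_true: Sequence[float], y_pred: Sequence[float]) -> Dict[str, float]:
    """Compute regression metrics keyed like the Vertex AI regression schema (sketch)."""
    n = len(y_true)
    if n == 0 or n != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and the same length")
    errors = [p - t for t, p in zip(y_true, y_pred)]
    mean_true = sum(y_true) / n
    ss_res = sum(e * e for e in errors)
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return {
        "rootMeanSquaredError": math.sqrt(ss_res / n),
        "meanAbsoluteError": sum(abs(e) for e in errors) / n,
        # Assumes no zero targets; expressed as a percentage.
        "meanAbsolutePercentageError": 100 * sum(abs(e) / abs(t) for e, t in zip(errors, y_true)) / n,
        # R^2 is undefined when all targets are equal (ss_tot == 0).
        "rSquared": 1 - ss_res / ss_tot if ss_tot else float("nan"),
        # log1p requires values greater than -1.
        "rootMeanSquaredLogError": math.sqrt(
            sum((math.log1p(p) - math.log1p(t)) ** 2 for t, p in zip(y_true, y_pred)) / n
        ),
    }


print(regression_metrics([2.0, 4.0], [1.0, 5.0]))
```

The returned dictionary has the same shape as `metrics` in the cell above, so it could be passed as `metrics=` to `gapic.ModelEvaluation` in the same way.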

### Upload the evaluation metrics to the Model Registry

In [73]:
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"
client = gapic.ModelServiceClient(client_options={"api_endpoint": API_ENDPOINT})
client

<google.cloud.aiplatform_v1.services.model_service.client.ModelServiceClient at 0x7fce75ddad10>

In [74]:
client.import_model_evaluation(parent=my_model.resource_name, model_evaluation=model_eval)

name: "projects/934903580331/locations/us-central1/models/7404802894257455104/evaluations/3687972264888737838"
display_name: "eval-sdk"
metrics_schema_uri: "gs://google-cloud-aiplatform/schema/modelevaluation/regression_metrics_1.0.0.yaml"
metrics {
  struct_value {
    fields {
      key: "rootMeanSquaredLogError"
      value {
        number_value: 0.093754485
      }
    }
    fields {
      key: "rootMeanSquaredError"
      value {
        number_value: 2.019573
      }
    }
    fields {
      key: "rSquared"
      value {
        number_value: 0.9765769
      }
    }
    fields {
      key: "meanAbsolutePercentageError"
      value {
        number_value: 5.1649256
      }
    }
    fields {
      key: "meanAbsoluteError"
      value {
        number_value: 0.7664642
      }
    }
  }
}

In [75]:
my_evaluations = my_model.list_model_evaluations()
my_evaluations

[]
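The import in In [74] succeeded, yet `list_model_evaluations()` still returns an empty list. One plausible explanation is that `my_model.resource_name` (In [71]) drops the `@2` version suffix. As a result, the evaluation may have been attached to the model's default version, which the `VersionInfo` output in the next cell shows is version 1, while `my_model` refers to version 2. The sketch below builds a version-qualified parent, assuming `import_model_evaluation` accepts the `models/{id}@{version}` form. The helpers are hypothetical, and the API call appears only as an unverified comment.

```python
from typing import Tuple


def split_model_version(resource_name: str) -> Tuple[str, str]:
    """Split 'projects/.../models/ID@VERSION' into (base name, version); version is '' if absent."""
    base, _, version = resource_name.partition("@")
    return base, version


def versioned_parent(resource_name: str, version_id: str) -> str:
    """Attach a version ID to a model resource name, replacing any existing suffix."""
    base, _ = split_model_version(resource_name)
    return f"{base}@{version_id}"


name = "projects/934903580331/locations/us-central1/models/7404802894257455104"
print(versioned_parent(name, "2"))
# projects/934903580331/locations/us-central1/models/7404802894257455104@2

# Assumed usage (not verified here):
# client.import_model_evaluation(
#     parent=versioned_parent(my_model.resource_name, my_model.version_id),
#     model_evaluation=model_eval,
# )
```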

In [67]:
model_registry = aiplatform.models.ModelRegistry(model="projects/934903580331/locations/us-central1/models/7404802894257455104")
model_version_info = model_registry.get_version_info(version="1")
model_version_info

Getting version 1 info for projects/934903580331/locations/us-central1/models/7404802894257455104


VersionInfo(version_id='1', version_create_time=DatetimeWithNanoseconds(2024, 11, 5, 3, 53, 2, 910944, tzinfo=datetime.timezone.utc), version_update_time=DatetimeWithNanoseconds(2024, 11, 5, 3, 53, 4, 865125, tzinfo=datetime.timezone.utc), model_display_name='taxifare-prediction-model', model_resource_name='projects/934903580331/locations/us-central1/models/7404802894257455104', version_aliases=['default'], version_description='')