# MLOps End to End Workflow (II)


## CICD:

3. Deployment of the Vertex AI Pipeline through a CI/CD process
4. Deployment of a Continuous Training pipeline that can be triggered via Pub/Sub and produces a model in the Model Registry
5. Deployment of the Inference Pipeline consisting of a Cloud Function that retrieves features from Feature Store and calls the model on a Vertex AI Endpoint
6. Deployment of the model to a Vertex AI Endpoint through a CI/CD process.

## Prediction:

7. Deploy the model to an endpoint
8. Create a test prediction


### Configuration

In [7]:
#%load_ext autoreload
#%autoreload 2

import os
import pandas as pd
import tensorflow as tf
import tensorflow_data_validation as tfdv
from google.cloud import bigquery
import matplotlib.pyplot as plt

from google.cloud import aiplatform as vertex_ai

import yaml
import os

In [8]:
# Load the project-wide configuration, then keep only the section for this use case.
with open('mainconfig.yaml') as config_file:
    all_configs = yaml.safe_load(config_file)

main_config = all_configs['creditcards']

In [9]:
# ---- GCP project settings (all read from mainconfig.yaml) ----
PROJECT = main_config['project']
REGION = main_config['region']
DOCKER_REPO = main_config['docker_repo']

SERVICE_ACCOUNT = main_config['service_account']

# ---- BigQuery and data locations ----
bq_config = main_config['bq']
BQ_SOURCE_TABLE = bq_config['source_table']  # raw input
ML_TABLE = bq_config['ml_table']             # table used for training
BQ_DATASET_NAME = bq_config['dataset']
BQ_LOCATION = bq_config['location']

VERTEX_DATASET_NAME = main_config['vertex_dataset_name']
RAW_SCHEMA_DIR = main_config['raw_schema_dir']
BUCKET = main_config['bucket']

# ---- TFX and model config ----
VERSION = main_config['version']  # model version; becomes part of MODEL_DISPLAY_NAME

MODEL_DISPLAY_NAME = '{}-classifier-{}'.format(VERTEX_DATASET_NAME, VERSION)
WORKSPACE = 'gs://{}/{}'.format(BUCKET, VERTEX_DATASET_NAME)

MLMD_SQLLITE = 'mlmd.sqllite'
ARTIFACT_STORE = os.path.join(WORKSPACE, 'tfx_artifacts_interactive')
MODEL_REGISTRY = os.path.join(WORKSPACE, 'model_registry')
PIPELINE_NAME = '{}-train-pipeline'.format(MODEL_DISPLAY_NAME)
PIPELINE_ROOT = os.path.join(ARTIFACT_STORE, PIPELINE_NAME)

ENDPOINT_DISPLAY_NAME = '{}-classifier'.format(VERTEX_DATASET_NAME)

FEATURESTORE_ID = main_config['featurestore_id']
CF_REGION = main_config['cloudfunction_region']

# ---- Dataflow networking and identity ----
dataflow_config = main_config['dataflow']
DATAFLOW_SUBNETWORK = (
    'https://www.googleapis.com/compute/v1/projects/{}/regions/{}/subnetworks/{}'
    .format(PROJECT, REGION, dataflow_config['subnet'])
)
DATAFLOW_SERVICE_ACCOUNT = dataflow_config['service_account']

CLOUDBUILD_SA = 'projects/{}/serviceAccounts/{}'.format(PROJECT, SERVICE_ACCOUNT)

LIMIT = main_config['limit']

In [11]:
for label, setting in (
    ("Project ID:", PROJECT),
    ("Region:", REGION),
    ("Service Account:", SERVICE_ACCOUNT),
):
    print(label, setting)

# Default project/location for every later Vertex AI SDK call in this notebook.
vertex_ai.init(project=PROJECT, location=REGION)

Project ID: student-mlops5
Region: us-central1
Service Account: 743451655808-compute@developer.gserviceaccount.com


## Unit Testing

In [12]:
# Environment for the pipeline code and the pytest runs below. The tests run as `!python`
# subprocesses and inherit os.environ; the config module is reloaded further down to pick these up.
# Each variable is set exactly once:
# - test artifacts go under .../e2e_tests;
# - the model registry stays at the workspace root (same value as MODEL_REGISTRY).
os.environ.update({
    "VERTEX_DATASET_NAME": VERTEX_DATASET_NAME,
    "MODEL_DISPLAY_NAME": MODEL_DISPLAY_NAME,
    "PIPELINE_NAME": PIPELINE_NAME,
    "PROJECT": PROJECT,
    "GOOGLE_CLOUD_PROJECT": PROJECT,
    "REGION": REGION,
    "GCS_LOCATION": f"gs://{BUCKET}/{VERTEX_DATASET_NAME}/e2e_tests",
    "MODEL_REGISTRY_URI": MODEL_REGISTRY,
    "TRAIN_LIMIT": "8000",
    "TEST_LIMIT": "8000",
    "BEAM_RUNNER": "DataflowRunner",
    "TRAINING_RUNNER": "vertex",
    "DATAFLOW_IMAGE_URI": f"{DOCKER_REPO}/dataflow:latest",
    "TFX_IMAGE_URI": f"{DOCKER_REPO}/vertex:latest",
    "ENABLE_CACHE": "1",
    "SUBNETWORK": DATAFLOW_SUBNETWORK,
    "SERVICE_ACCOUNT": DATAFLOW_SERVICE_ACCOUNT,
    "BQ_LOCATION": BQ_LOCATION,
    "BQ_DATASET_NAME": BQ_DATASET_NAME,
    "ML_TABLE": ML_TABLE,
})

In [13]:
# Overrides for a fast local end-to-end run: no model upload, Beam DirectRunner, local training.
os.environ.update({
    "UPLOAD_MODEL": "0",
    "ACCURACY_THRESHOLD": "-0.1",  # NB: a negative threshold is not a real gate - it lets every model through
    "BEAM_RUNNER": "DirectRunner",
    "TRAINING_RUNNER": "local",
})

In [14]:
import importlib

from src.tfx_pipelines import config

# Re-execute the config module; it presumably reads os.environ at import time
# (the printed GCS_LOCATION matches the env value set above).
config = importlib.reload(config)

for setting_name, setting_value in vars(config).items():
    if setting_name.isupper():
        print(f'{setting_name}: {setting_value}')

PROJECT: student-mlops5
REGION: us-central1
GCS_LOCATION: gs://student-mlops5/creditcards/e2e_tests
DOCKER_REPO_NAME: docker-repo
ARTIFACT_STORE_URI: gs://student-mlops5/creditcards/e2e_tests/tfx_artifacts
MODEL_REGISTRY_URI: gs://student-mlops5/creditcards/model_registry
VERTEX_DATASET_NAME: creditcards
MODEL_DISPLAY_NAME: creditcards-classifier-v02
PIPELINE_NAME: creditcards-classifier-v02-train-pipeline
ML_USE_COLUMN: ml_use
EXCLUDE_COLUMNS: trip_start_timestamp
TRAIN_LIMIT: 8000
TEST_LIMIT: 8000
SERVE_LIMIT: 0
NUM_TRAIN_SPLITS: 4
NUM_EVAL_SPLITS: 1
ACCURACY_THRESHOLD: -0.1
USE_KFP_SA: False
TFX_IMAGE_URI: us-central1-docker.pkg.dev/student-mlops5/creditcards/vertex:latest
DATAFLOW_IMAGE_URI: us-central1-docker.pkg.dev/student-mlops5/creditcards/dataflow:latest
BEAM_RUNNER: DirectRunner
SERVICE_ACCOUNT: 743451655808-compute@developer.gserviceaccount.com
SUBNETWORK: https://www.googleapis.com/compute/v1/projects/student-mlops5/regions/us-central1/subnetworks/${subnetwork}
BEAM_DIRECT

In [15]:
# Unit tests for the data-source utilities; the subprocess inherits os.environ, -s shows print output.
!python -m pytest src/tests/datasource_utils_tests.py -s

platform linux -- Python 3.7.12, pytest-7.1.2, pluggy-1.2.0
rootdir: /home/jupyter/rafal
plugins: anyio-3.7.1, typeguard-2.13.3
collected 2 items                                                              [0m[1m

src/tests/datasource_utils_tests.py BigQuery Source: student-mlops5.creditcards.creditcards_ml
[32m.[0mBigQuery Source: student-mlops5.creditcards.creditcards_ml
[32m.[0m

../../../opt/conda/lib/python3.7/site-packages/google/rpc/__init__.py:18
    import pkg_resources

  Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
    declare_namespace(pkg)

  Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
    declare_namespace(pkg)

../.local/lib/python3.7/si

In [16]:
# Model unit tests (subprocess inherits the os.environ settings above; -s disables output capture).
!python -m pytest src/tests/model_tests.py -s

platform linux -- Python 3.7.12, pytest-7.1.2, pluggy-1.2.0
rootdir: /home/jupyter/rafal
plugins: anyio-3.7.1, typeguard-2.13.3
collected 2 items                                                              [0m[1m

src/tests/model_tests.py [32m.[0m2024-02-23 08:52:04.202454: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/nccl2/lib:/usr/local/cuda/extras/CUPTI/lib64
2024-02-23 08:52:04.202507: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2024-02-23 08:52:04.202551: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (instance-20240206-073850): /proc/driver/nvidia/version does not exist
2024-02-23 08:52:04.203009: I tensorflow/core/platform/cpu_feature_guard.cc:151] This Tensor

#### End to end pipeline unit test

In [17]:
# End-to-end pipeline test using the local overrides above (DirectRunner, local training, no upload).
!python -m pytest src/tests/pipeline_deployment_tests.py::test_e2e_pipeline -s

platform linux -- Python 3.7.12, pytest-7.1.2, pluggy-1.2.0
rootdir: /home/jupyter/rafal
plugins: anyio-3.7.1, typeguard-2.13.3
collected 1 item                                                               [0m[1m

src/tests/pipeline_deployment_tests.py upload_model: 0
Pipeline e2e test artifacts stored in: gs://student-mlops5/creditcards/e2e_tests
ML metadata store is ready.
Using dataset creditcards
Excluding no splits because exclude_splits is not set.
Excluding no splits because exclude_splits is not set.
Labels for model: {"dataset_name": "creditcards", "pipeline_name": "creditcards-classifier-v02-train-pipeline", "pipeline_root": "gs://student-mlops5/creditcards/e2e_tests/tfx_artifacts/credit"}
Pipeline components: ['HyperparamsGen', 'TrainDataGen', 'TestDataGen', 'StatisticsGen', 'SchemaImporter', 'ExampleValidator', 'DataTransformer', 'WarmstartModelResolver', 'ModelTrainer', 'BaselineModelResolver', 'ModelEvaluator', 'GcsModelPusher']
Beam pipeline args: ['--project=student-

## Deploy to Vertex AI Pipelines

In [18]:
# Confirm the reloaded config picked up the local Beam runner override.
config.BEAM_RUNNER

'DirectRunner'

In [19]:
import importlib

from src.tfx_pipelines import config

# Reload once more so the printed settings reflect the current environment variables.
importlib.reload(config)

upper_settings = {k: v for k, v in config.__dict__.items() if k.isupper()}
for k, v in upper_settings.items():
    print(f'{k}: {v}')

PROJECT: student-mlops5
REGION: us-central1
GCS_LOCATION: gs://student-mlops5/creditcards/e2e_tests
DOCKER_REPO_NAME: docker-repo
ARTIFACT_STORE_URI: gs://student-mlops5/creditcards/e2e_tests/tfx_artifacts
MODEL_REGISTRY_URI: gs://student-mlops5/creditcards/model_registry
VERTEX_DATASET_NAME: creditcards
MODEL_DISPLAY_NAME: creditcards-classifier-v02
PIPELINE_NAME: creditcards-classifier-v02-train-pipeline
ML_USE_COLUMN: ml_use
EXCLUDE_COLUMNS: trip_start_timestamp
TRAIN_LIMIT: 8000
TEST_LIMIT: 8000
SERVE_LIMIT: 0
NUM_TRAIN_SPLITS: 4
NUM_EVAL_SPLITS: 1
ACCURACY_THRESHOLD: -0.1
USE_KFP_SA: False
TFX_IMAGE_URI: us-central1-docker.pkg.dev/student-mlops5/creditcards/vertex:latest
DATAFLOW_IMAGE_URI: us-central1-docker.pkg.dev/student-mlops5/creditcards/dataflow:latest
BEAM_RUNNER: DirectRunner
SERVICE_ACCOUNT: 743451655808-compute@developer.gserviceaccount.com
SUBNETWORK: https://www.googleapis.com/compute/v1/projects/student-mlops5/regions/us-central1/subnetworks/${subnetwork}
BEAM_DIRECT

### Create Repo for Images

In [20]:
# Repo should has been created in the Terraform automation stage
! gcloud artifacts repositories create {VERTEX_DATASET_NAME} --location={REGION} --repository-format=docker

Create request issued for: [creditcards]
Waiting for operation [projects/student-mlops5/locations/us-central1/operations
/64030d34-de2d-4809-96fa-e6fd9714b7fc] to complete...done.                     
Created repository [creditcards].


To take a quick anonymous survey, run:
  $ gcloud survey



### Build Dataflow Worker Image

In [21]:
# Make the Dataflow worker Dockerfile the build-context Dockerfile for the next Cloud Build.
# NOTE(review): the earlier comment offered build/Dockerfile.dataflow as an alternative for
# environments without Internet access, but that is the file copied here — confirm the intended Dockerfile.
!cp build/Dockerfile.dataflow Dockerfile

In [22]:
# Expose the Artifact Registry repository path (not an image URI) to shell commands,
# so that `$DOCKER_REPO/dataflow:latest` resolves to a valid image tag.
os.environ["DOCKER_REPO"] = DOCKER_REPO

In [23]:
# Build and push the Dataflow worker image with Cloud Build, using the Dockerfile copied above.
# IPython substitutes $PROJECT, $REGION and $DOCKER_REPO from the notebook's Python variables.
!gcloud builds submit --project=$PROJECT --billing-project=$PROJECT --region $REGION --tag $DOCKER_REPO/dataflow:latest . --timeout=15m --machine-type=e2-highcpu-8 --suppress-logs

Creating temporary tarball archive of 65 file(s) totalling 891.0 KiB before compression.
Some files were not included in the source upload.

Check the gcloud log [/home/jupyter/.config/gcloud/logs/2024.02.23/09.14.04.808060.log] to see which files and the contents of the
default gcloudignore file used (see `$ gcloud topic gcloudignore` to learn
more).

Uploading tarball of [.] to [gs://student-mlops5_cloudbuild/source/1708679645.095858-90c9fc32fda44a41afcd2c69da4008e9.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/student-mlops5/locations/us-central1/builds/e0e9b972-1427-4834-803c-8be0b629e87e].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds;region=us-central1/e0e9b972-1427-4834-803c-8be0b629e87e?project=743451655808 ].
ID                                    CREATE_TIME                DURATION  SOURCE                                                                                        IMAGES                                                     

### Build Vertex worker image

In [24]:
# In a top-to-bottom run TFX_IMAGE_URI is not yet a Python variable, so the shell reads it from os.environ.
!echo $TFX_IMAGE_URI

us-central1-docker.pkg.dev/student-mlops5/creditcards/vertex:latest


In [25]:
# Inspect PYTHONPATH in the shell environment (an empty line means it is unset or empty).
!echo $PYTHONPATH




In [26]:
# Use the Vertex/TFX worker Dockerfile for the next Cloud Build.
!cp build/Dockerfile.vertex Dockerfile

In [27]:
# Build and push the TFX/Vertex worker image, tagged TFX_IMAGE_URI.
!gcloud builds submit --project=$PROJECT --billing-project=$PROJECT --region $REGION --tag $TFX_IMAGE_URI . --timeout=15m --machine-type=e2-highcpu-8 --suppress-logs

Creating temporary tarball archive of 67 file(s) totalling 893.2 KiB before compression.
Some files were not included in the source upload.

Check the gcloud log [/home/jupyter/.config/gcloud/logs/2024.02.23/09.28.26.182880.log] to see which files and the contents of the
default gcloudignore file used (see `$ gcloud topic gcloudignore` to learn
more).

Uploading tarball of [.] to [gs://student-mlops5_cloudbuild/source/1708680506.363734-bd93d9afec1e46fcaae0dfdfee9049a6.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/student-mlops5/locations/us-central1/builds/0dfa5f9d-266e-4f35-a2aa-6bbad645b274].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds;region=us-central1/0dfa5f9d-266e-4f35-a2aa-6bbad645b274?project=743451655808 ].
ID                                    CREATE_TIME                DURATION  SOURCE                                                                                        IMAGES                                                     

### Compile the pipeline

In [28]:
import importlib, sys

# Reloading the namespace package alone does not re-execute its submodules. Reload the
# already-imported ones explicitly (config first, since runner presumably imports it) so
# code or environment changes are picked up before compiling.
for _module_name in ('src.tfx_pipelines.config', 'src.tfx_pipelines.runner'):
    if _module_name in sys.modules:
        importlib.reload(sys.modules[_module_name])
importlib.reload(sys.modules['src.tfx_pipelines'])

<module 'src.tfx_pipelines' (namespace)>

In [29]:
from src.tfx_pipelines import config, runner

# Compile the TFX training pipeline into a JSON spec file named after the pipeline;
# the next cell uploads that file to GCS.
pipeline_definition_file = '{}.json'.format(config.PIPELINE_NAME)
pipeline_definition = runner.compile_training_pipeline(pipeline_definition_file)

Labels for model: {"dataset_name": "creditcards", "pipeline_name": "creditcards-classifier-v02-train-pipeline", "pipeline_root": "gs://student-mlops5/creditcards/e2e_tests/tfx_artifacts/credit"}
Beam pipeline args: ['--project=student-mlops5', '--temp_location=gs://student-mlops5/creditcards/e2e_tests/temp']
running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying transformations.py -> build/lib
installing to /var/tmp/tmp6hicryi8
running install
running install_lib
copying build/lib/transformations.py -> /var/tmp/tmp6hicryi8
running install_egg_info
running egg_info
creating tfx_user_code_DataTransformer.egg-info
writing tfx_user_code_DataTransformer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_DataTransformer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_DataTransformer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_DataTransformer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_DataTra

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying defaults.py -> build/lib
copying runner.py -> build/lib
copying model.py -> build/lib
copying data.py -> build/lib
copying exporter.py -> build/lib
copying trainer.py -> build/lib
installing to /var/tmp/tmp4wxxi92x
running install
running install_lib
copying build/lib/exporter.py -> /var/tmp/tmp4wxxi92x
copying build/lib/runner.py -> /var/tmp/tmp4wxxi92x
copying build/lib/data.py -> /var/tmp/tmp4wxxi92x
copying build/lib/model.py -> /var/tmp/tmp4wxxi92x
copying build/lib/defaults.py -> /var/tmp/tmp4wxxi92x
copying build/lib/trainer.py -> /var/tmp/tmp4wxxi92x
running install_egg_info
running egg_info
creating tfx_user_code_ModelTrainer.egg-info
writing tfx_user_code_ModelTrainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_ModelTrainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_ModelTrainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Mod

!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer or other
        standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()


In [30]:
PIPELINES_STORE = f"gs://{BUCKET}/{VERTEX_DATASET_NAME}/compiled_pipelines/"
!gsutil cp {pipeline_definition_file} {PIPELINES_STORE}
PIPELINES_STORE

Copying file://creditcards-classifier-v02-train-pipeline.json [Content-Type=application/json]...
/ [1 files][ 24.6 KiB/ 24.6 KiB]                                                
Operation completed over 1 objects/24.6 KiB.                                     


'gs://student-mlops5/creditcards/compiled_pipelines/'

### Submit Vertex AI Pipelines run

In [31]:
# Local path of the compiled pipeline spec used as the job template below.
pipeline_definition_file

'creditcards-classifier-v02-train-pipeline.json'

In [32]:
# Service account the pipeline job will run as.
SERVICE_ACCOUNT

'743451655808-compute@developer.gserviceaccount.com'

In [33]:
from google.cloud.aiplatform import pipeline_jobs

# Training hyperparameters passed to the compiled pipeline as runtime parameter values.
training_params = {
    'learning_rate': 0.003,
    'batch_size': 512,
    'hidden_units': '128,128',
    'num_epochs': 15,
}

job = pipeline_jobs.PipelineJob(
    template_path=pipeline_definition_file,
    display_name=VERTEX_DATASET_NAME,
    enable_caching=False,
    parameter_values=training_params,
)

# sync=False returns immediately; the SDK keeps logging the job state in the background
# (which is why those log lines show up under later cells).
job.run(sync=False, service_account=SERVICE_ACCOUNT)

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


PipelineJob created. Resource name: projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802


To use this PipelineJob in another session:


INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:


pipeline_job = aiplatform.PipelineJob.get('projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802')


INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802')


View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/creditcards-classifier-v02-train-pipeline-20240223100802?project=743451655808


INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/creditcards-classifier-v02-train-pipeline-20240223100802?project=743451655808


PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


## Deploy Continuous Training Pipeline ("CI/CD")


### Build CI/CD image

In [34]:
# Image used by the Cloud Build CI/CD steps.
CICD_IMAGE_URI = '{}/cicd:latest'.format(DOCKER_REPO)
CICD_IMAGE_URI

'us-central1-docker.pkg.dev/student-mlops5/creditcards/cicd:latest'

In [None]:
# Build the CI/CD image from the build/ directory; the Dockerfile is copied into build/ so it sits in the build context.
!cp build/Dockerfile.cicd-tfx build/Dockerfile
!gcloud builds submit --project=$PROJECT --billing-project=$PROJECT --region $REGION --tag $CICD_IMAGE_URI build/. --timeout=15m --machine-type=e2-highcpu-8 --suppress-logs


Creating temporary tarball archive of 13 file(s) totalling 19.2 KiB before compression.
Uploading tarball of [build/.] to [gs://student-mlops5_cloudbuild/source/1708683606.770518-3aa3ac2e5cfa4204adb23b3aa7cd5e7b.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/student-mlops5/locations/us-central1/builds/45e85e98-b378-4a6d-b39e-b7583933f60b].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds;region=us-central1/45e85e98-b378-4a6d-b39e-b7583933f60b?project=743451655808 ].
PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/743451655808/locations/us-central1/pipelineJobs/creditcards-classifier-v02-train-pipeline-20240223100802 current state:
PipelineState.PIPELINE_STATE_RUNNING


### Automate the deployment of the Training Pipeline using Cloud Build
***Important:*** commit and push your code to the Git repo before running the next cells, because the build process checks out the code from the repo.

In [None]:
# Git repository the build checks out. Read straight from the config, because the
# REPO_URL variable is only assigned a few cells below (a bare `REPO_URL` fails on a fresh kernel).
main_config['git']['repo_url']

In [None]:
# NOTE(review): lower-case variable name; presumably read by a build/test script — confirm it is still needed.
os.environ.update({'project_id': PROJECT})

In [None]:
# Repository name (last URL segment without a trailing ".git"). Derived from the config here
# because REPO_NAME is only assigned in the next cell (a bare `REPO_NAME` fails on a fresh kernel).
_repo_tail = main_config['git']['repo_url'].rstrip('/').split('/')[-1]
_repo_tail[:-len('.git')] if _repo_tail.endswith('.git') else _repo_tail

In [None]:
REPO_URL = main_config['git']['repo_url']
BRANCH = main_config['git']['branch']


GCS_LOCATION = f"gs://{BUCKET}/{VERTEX_DATASET_NAME}/"
TEST_GCS_LOCATION = f"gs://{BUCKET}/{VERTEX_DATASET_NAME}/e2e_tests"
CI_TRAIN_LIMIT = 1000
CI_TEST_LIMIT = 100
CI_UPLOAD_MODEL = 0
CI_ACCURACY_THRESHOLD = -0.1 # again setting accuracy threshold to negative
BEAM_RUNNER = "DataflowRunner"
TRAINING_RUNNER = "vertex"
VERSION = 'latest'
PIPELINE_NAME = f'{MODEL_DISPLAY_NAME}-train-pipeline'
PIPELINES_STORE = f"{GCS_LOCATION}compiled_pipelines/"

TFX_IMAGE_URI = f"{DOCKER_REPO}/vertex:{VERSION}"
DATAFLOW_IMAGE_URI = f"{DOCKER_REPO}/dataflow:latest"

REPO_NAME = REPO_URL.split('/')[-1][:-4]
DESCR=f'"Deploy train pipeline to GCS from {BRANCH}"'


SUBSTITUTIONS=f"""\
_REPO_URL='{REPO_URL}',\
_BRANCH={BRANCH},\
_CICD_IMAGE_URI={CICD_IMAGE_URI},\
_PROJECT={PROJECT},\
_REGION={REGION},\
_GCS_LOCATION={GCS_LOCATION},\
_TEST_GCS_LOCATION={TEST_GCS_LOCATION},\
_BQ_LOCATION={BQ_LOCATION},\
_BQ_DATASET_NAME={BQ_DATASET_NAME},\
_ML_TABLE={ML_TABLE},\
_VERTEX_DATASET_NAME={VERTEX_DATASET_NAME},\
_MODEL_DISPLAY_NAME={MODEL_DISPLAY_NAME},\
_CI_TRAIN_LIMIT={CI_TRAIN_LIMIT},\
_CI_TEST_LIMIT={CI_TEST_LIMIT},\
_CI_UPLOAD_MODEL={CI_UPLOAD_MODEL},\
_CI_ACCURACY_THRESHOLD={CI_ACCURACY_THRESHOLD},\
_BEAM_RUNNER={BEAM_RUNNER},\
_TRAINING_RUNNER={TRAINING_RUNNER},\
_DATAFLOW_IMAGE_URI={DATAFLOW_IMAGE_URI},\
_TFX_IMAGE_URI={TFX_IMAGE_URI},\
_PIPELINE_NAME={PIPELINE_NAME},\
_PIPELINES_STORE={PIPELINES_STORE},\
_SUBNETWORK={DATAFLOW_SUBNETWORK},\
_GCS_BUCKET={BUCKET}/cloudbuild,\
_SERVICE_ACCOUNT={DATAFLOW_SERVICE_ACCOUNT},\
_WORKDIR={REPO_NAME}\
"""
!echo $SUBSTITUTIONS

In [None]:
# Run the training-pipeline deployment build (build/pipeline-deployment-tfx.yaml) once, manually,
# with build/known_hosts.github.zip as the source argument and the substitutions defined above.
!gcloud builds submit build/known_hosts.github.zip --config build/pipeline-deployment-tfx.yaml --substitutions {SUBSTITUTIONS} --project=$PROJECT --billing-project=$PROJECT --region $REGION --suppress-logs

### (Optional, for Cloud Source Repositories) Define the trigger that deploys the pipeline after a commit

In [None]:
!echo gcloud beta builds triggers create cloud-source-repositories --repo={REPO_NAME} --branch-pattern=^{BRANCH}$ --description={DESCR} --build-config=mlops-creditcard/build/pipeline-deployment-tfx.yaml --substitutions={SUBSTITUTIONS} --billing-project={PROJECT}  --service-account={TRIGGER_SA}

### Set up the trigger for the Training Pipeline


In [None]:
# Pub/Sub topic and Cloud Function that start the training pipeline, plus the GCS spec it runs.
PUBSUB_TOPIC = 'trigger-{}'.format(PIPELINE_NAME)
CLOUD_FUNCTION_NAME = 'trigger-{}-fn'.format(PIPELINE_NAME)
GCS_PIPELINE_FILE_LOCATION = os.path.join(PIPELINES_STORE, '{}.json'.format(PIPELINE_NAME))

#### Create Pub/Sub Topic

In [None]:
# Topic whose messages trigger the training pipeline via the Cloud Function deployed below.
!gcloud pubsub topics create {PUBSUB_TOPIC}

#### Deploy Cloud Function

In [None]:
# Cloud Functions region: honour the config value, with us-central1 only as a fallback
# (previously this silently overrode `cloudfunction_region` from mainconfig.yaml).
CF_REGION = main_config.get('cloudfunction_region', 'us-central1')

In [None]:
ENV_VARS=f"""\
PROJECT={PROJECT},\
REGION={REGION},\
GCS_PIPELINE_FILE_LOCATION={GCS_PIPELINE_FILE_LOCATION},\
SERVICE_ACCOUNT={SERVICE_ACCOUNT},\
PIPELINE_NAME={PIPELINE_NAME}
"""

!echo {ENV_VARS}

In [None]:
# Remove Jupyter checkpoint files so they are not uploaded with the function source.
!rm -rf src/pipeline_triggering/.ipynb_checkpoints

In [None]:
# Deploy the gen2 Cloud Function that runs `trigger_pipeline` for each message on PUBSUB_TOPIC.
# NOTE(review): python38 may be deprecated for Cloud Functions — confirm the runtime is still supported.
!gcloud functions deploy {CLOUD_FUNCTION_NAME} --gen2 \
    --region={CF_REGION} \
    --trigger-topic={PUBSUB_TOPIC} \
    --runtime=python38 \
    --source=src/pipeline_triggering\
    --entry-point=trigger_pipeline\
    --stage-bucket={BUCKET}\
    --ingress-settings=internal-only\
    --service-account={SERVICE_ACCOUNT}\
    --update-env-vars={ENV_VARS} 

#### Test triggering the pipeline with a Pub/Sub message

In [None]:
from google.cloud import pubsub
import json

publish_client = pubsub.PublisherClient()
topic = f'projects/{PROJECT}/topics/{PUBSUB_TOPIC}'
# Hyperparameters for the triggered run, sent as the JSON message body.
data = {
    'num_epochs': 7,
    'learning_rate': 0.0015,
    'batch_size': 512,
    'hidden_units': '256,126'  # NOTE(review): possibly a typo for '256,128' — confirm
}
message = json.dumps(data)

# publish() is asynchronous and returns a future. Block on it so publish errors surface here;
# the result is the server-assigned message ID.
message_id = publish_client.publish(topic, message.encode()).result()
message_id

## Deploy the model




### Preparation

#### Vertex AI Endpoint

In [None]:
# Display name of the Vertex AI endpoint used below.
ENDPOINT_DISPLAY_NAME

In [None]:
from build.utils import create_endpoint

# Create, via the project helper, the Vertex AI endpoint the model will be deployed to.
# NOTE(review): confirm whether create_endpoint reuses an existing endpoint with this display name.
endpoint = create_endpoint(PROJECT, REGION, ENDPOINT_DISPLAY_NAME)
endpoint

### Model Deployment Pipeline

#### Run the model artifact tests



In [None]:
# Environment presumably read by src/tests/model_deployment_tests.py (run as subprocesses below).
os.environ.update(
    PROJECT=PROJECT,
    REGION=REGION,
    SERVICE_ACCOUNT=SERVICE_ACCOUNT,
    ENDPOINT_DISPLAY_NAME=ENDPOINT_DISPLAY_NAME,
    MODEL_DISPLAY_NAME=MODEL_DISPLAY_NAME,
)

In [None]:
# Run the model-artifact test before deploying (see src/tests/model_deployment_tests.py).
!python -m pytest src/tests/model_deployment_tests.py::test_model_artifact -s

#### Deploy Model to Endpoint

In [None]:
# Deploy the MODEL_DISPLAY_NAME model to the endpoint via the build/utils.py CLI.
!python build/utils.py \
    --mode=deploy-model\
    --project={PROJECT}\
    --region={REGION}\
    --endpoint-display-name={ENDPOINT_DISPLAY_NAME}\
    --model-display-name={MODEL_DISPLAY_NAME}

#### Test model on Endpoint

In [None]:
# Test the deployed model on the endpoint.
!python -m pytest src/tests/model_deployment_tests.py::test_model_endpoint

#### Run the pipeline

In [None]:
REPO_URL = main_config['git']['repo_url']
BRANCH = main_config['git']['branch']
# Working directory = repository name without a trailing ".git", derived the same way as for
# the training-pipeline build (previously the suffix was kept here but stripped there).
REPO_NAME = REPO_URL.rstrip('/').split('/')[-1]
if REPO_NAME.endswith('.git'):
    REPO_NAME = REPO_NAME[:-len('.git')]
CICD_IMAGE_URI = f"{DOCKER_REPO}/cicd:latest"

In [None]:
# Cloud Build substitutions for build/model-deployment.yaml, joined into one comma-separated line.
_model_deploy_subs = [
    f"_REPO_URL='{REPO_URL}'",
    f"_BRANCH={BRANCH}",
    f"_CICD_IMAGE_URI={CICD_IMAGE_URI}",
    f"_PROJECT={PROJECT}",
    f"_REGION={REGION}",
    f"_MODEL_DISPLAY_NAME={MODEL_DISPLAY_NAME}",
    f"_ENDPOINT_DISPLAY_NAME={ENDPOINT_DISPLAY_NAME}",
    f"_GCS_BUCKET={BUCKET}/cloudbuild",
    f"_SERVICE_ACCOUNT={SERVICE_ACCOUNT}",
    f"_WORKDIR={REPO_NAME}",
]
SUBSTITUTIONS = ",".join(_model_deploy_subs)

SUBSTITUTIONS

### Test the build and define a manual trigger

In [None]:
# Print (not run) the command that submits the model-deployment build asynchronously.
!echo gcloud builds submit --no-source --config build/model-deployment.yaml --substitutions {SUBSTITUTIONS} --billing-project {PROJECT} --suppress-logs --async

In [None]:
# Quoted so the description survives as a single shell argument.
DESCR = '"Deploy model from branch {}"'.format(BRANCH)

In [None]:
# Print (not run) the command that creates a manual trigger for the model-deployment build.
!echo gcloud alpha builds triggers create manual --repo={REPO_URL} --repo-type=CLOUD_SOURCE_REPOSITORIES --branch={BRANCH} --description={DESCR} --build-config=mlops-creditcard/build/model-deployment.yaml --substitutions={SUBSTITUTIONS} --billing-project={PROJECT} --service-account={CLOUDBUILD_SA}

## Deploy Prediction Cloud Function



In [None]:
endpoints = vertex_ai.Endpoint.list(
    filter=f'display_name={ENDPOINT_DISPLAY_NAME}',
    order_by="update_time"
)

# Fail with a clear message instead of printing and then crashing on endpoints[-1].
if not endpoints:
    raise LookupError(f'No endpoints found with name {ENDPOINT_DISPLAY_NAME}')
# order_by sorts ascending, so the last entry is the most recently updated endpoint.
endpoint = endpoints[-1]

os.environ['ENDPOINT_NAME'] = endpoint.name

entity = 'user'
os.environ['ENTITY'] = entity
os.environ['FEATURESTORE_ID'] = FEATURESTORE_ID

PREDICT_CLOUD_FUNCTION_NAME = "predict-" + PIPELINE_NAME + "-fn"
PREDICT_CLOUD_FUNCTION_NAME

In [None]:
# Sample input instance shared with the model deployment tests.
from src.tests.model_deployment_tests import test_instance
test_instance

In [None]:
# Online prediction through the Vertex AI SDK with the single sample instance.
test_instances = [test_instance]
prediction_response = endpoint.predict(test_instances)
predictions = prediction_response.predictions

for single_prediction in predictions:
    print(single_prediction)


In [None]:
import json

# The same instances wrapped in the REST `:predict` request body.
request = dict(instances=test_instances)
REQ_JSON = json.dumps(request)
REQ_JSON

In [None]:
os.environ['PROJECT_ID'] = PROJECT
os.environ['REGION'] = REGION
os.environ['ENDPOINT_ID'] = endpoint.name
os.environ['INPUT_DATA_FILE'] = "INPUT-JSON"
os.environ['REQ_JSON'] = REQ_JSON
!echo ${REQ_JSON} > ${INPUT_DATA_FILE}
!curl -X POST -H "Authorization: Bearer $(gcloud auth print-access-token)"  -H "Content-Type: application/json"  https://${REGION}-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/europe-west4/endpoints/${ENDPOINT_ID}:predict  -d "@${INPUT_DATA_FILE}"

## Feature Store (Optional)

### Create Feature Store

In [None]:
from google.cloud.aiplatform_v1beta1 import FeaturestoreOnlineServingServiceClient, FeaturestoreServiceClient

In [None]:
from src.feature_store import feature_store as fs

In [None]:
# Create the Feature Store FEATURESTORE_ID in REGION using the project helper.
fs.create_fs(PROJECT, REGION, FEATURESTORE_ID, "Feature Store for credit card use case")

In [None]:
from google.api_core import operations_v1
from google.cloud.aiplatform_v1beta1 import FeaturestoreOnlineServingServiceClient, FeaturestoreServiceClient, FeatureSelector
from google.cloud.aiplatform_v1beta1.types import featurestore_online_service as featurestore_online_service_pb2
from google.cloud.aiplatform_v1beta1.types import entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import feature as feature_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.cloud.aiplatform_v1beta1.types import ListFeaturestoresRequest, CreateFeaturestoreRequest, Featurestore, ListEntityTypesRequest

from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud.aiplatform_v1beta1.types import featurestore_monitoring as featurestore_monitoring_pb2
from google.protobuf.duration_pb2 import Duration


# List the entity types in the Feature Store through the regional v1beta1 admin API.
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
admin_client = FeaturestoreServiceClient(client_options={"api_endpoint": API_ENDPOINT})
parent = admin_client.featurestore_path(PROJECT, REGION, FEATURESTORE_ID)
request = ListEntityTypesRequest(parent=parent)

page_result = admin_client.list_entity_types(request=request)

# Short IDs: the last segment of each entity type's resource name.
[entity_type.name.rsplit('/', 1)[-1] for entity_type in page_result]


In [None]:
# Full resource name of the Feature Store.
admin_client.featurestore_path(PROJECT, REGION, FEATURESTORE_ID)

#### Create an entity with features, generate some data and upload it

In [None]:
# Entity type to create in the Feature Store and the features it holds.
entity = 'user'
entity_descr = 'User ID'
features = ['v27', 'v28']

In [None]:
# Create the entity type with the listed features.
fs.create_entity(PROJECT, REGION, FEATURESTORE_ID, entity, entity_descr, features)

In [None]:
import random

filename = f'features_{entity}.csv'

# Synthetic feature data for user0..user99. The header is the entity ID column followed by
# one column per feature; each row gets one random value per feature, so rows always match
# the header (previously every row had exactly two values regardless of len(features)).
with open(filename, 'w') as f:
    f.write(','.join([entity, *features]) + '\n')
    for i in range(100):
        values = ','.join(str(random.random()) for _ in features)
        f.write(f'user{i},{values}\n')

In [None]:
# Show the last 20 lines of the generated CSV.
!tail -20 {filename}

In [None]:
# Bucket the CSV is uploaded to.
BUCKET

In [None]:
# Upload the CSV to the bucket root for ingestion.
!gsutil cp {filename} gs://{BUCKET}/{filename} 

In [None]:
# Ingest the uploaded CSV into the entity's features via the project helper.
gcs_uris = [f'gs://{BUCKET}/{filename}']

fs.ingest_entities_csv(PROJECT, REGION, FEATURESTORE_ID, entity, features, gcs_uris)

Test reading some features back

In [None]:
# Read features back for user90..user101.
# NOTE(review): only user0..user99 are generated above, so user100/user101 presumably
# come back empty on purpose (missing-entity check) — confirm intent.
features_data = {
    f'user{i}': fs.read_features(PROJECT, REGION, FEATURESTORE_ID, entity, features, f'user{i}')
    for i in range(90, 102)
}

features_data

### Deploy Prediction Cloud Function to use with Feature Store


#### Test the endpoint with the Feature Store

In [None]:
import copy

from src.tests.model_deployment_tests import test_instance

import base64

# Work on a copy. The imported `test_instance` is the module-level object that earlier
# cells also reference (via `test_instances`), so deleting keys in place would silently
# change those too.
test_instance = copy.deepcopy(test_instance)

# Drop V27/V28 and identify the user instead; presumably the prediction function looks these
# features up in the Feature Store by user ID — confirm against src/prediction_cf/main.py.
test_instance.pop('V27', None)
test_instance.pop('V28', None)
test_instance['userid'] = 'user99'

test_instance

In [None]:
from flask import Flask
from src.prediction_cf.main import predict

# Call the prediction Cloud Function handler locally, before deploying it, with a Flask
# test request whose JSON body is test_instance.
app = Flask('test')
ctx = app.test_request_context(json=test_instance)
request = ctx.request

pred_retval = predict(request)
pred_retval

In [None]:
GOOGLE_FUNCTION_SOURCE ='src/prediction_cf/main.py'

ENV_VARS=f"""\
PROJECT={PROJECT},\
REGION={REGION},\
ENDPOINT_NAME={endpoint.name},\
ENTITY={entity},\
FEATURESTORE_ID={FEATURESTORE_ID}
"""

!echo {ENV_VARS}

In [None]:
# Remove Jupyter checkpoint files so they are not uploaded with the function source.
!rm -rf src/prediction_cf/.ipynb_checkpoints

In [None]:
# Print (not run) the command that deploys the HTTP prediction Cloud Function.
# The source is the repo root, and GOOGLE_FUNCTION_SOURCE points the build at src/prediction_cf/main.py.
# NOTE(review): python38 may be deprecated for Cloud Functions — confirm the runtime is still supported.
!echo gcloud functions deploy {PREDICT_CLOUD_FUNCTION_NAME} --gen2 \
    --set-build-env-vars=GOOGLE_FUNCTION_SOURCE={GOOGLE_FUNCTION_SOURCE} \
    --region={CF_REGION} \
    --runtime=python38 \
    --trigger-http \
    --source=. \
    --entry-point=predict\
    --stage-bucket={BUCKET}\
    --ingress-settings=internal-only\
    --service-account={SERVICE_ACCOUNT}\
    --set-env-vars={ENV_VARS}      