# Build and deploy a diabetes classification model with Vertex AI

## Setup

### Install libraries

In [None]:
!pip3 install google-cloud-bigquery==3.25.0 -U
!pip install google-cloud-aiplatform==1.59.0
!pip uninstall -y shapely pygeos geopandas
!pip install shapely==1.8.5.post1 pygeos==0.12.0 geopandas>=0.12.2
# Install pydot and graphviz
!pip install pydot
!sudo apt install graphviz -y

Collecting google-cloud-bigquery==3.25.0
  Downloading google_cloud_bigquery-3.25.0-py2.py3-none-any.whl.metadata (8.9 kB)
Downloading google_cloud_bigquery-3.25.0-py2.py3-none-any.whl (239 kB)
Installing collected packages: google-cloud-bigquery
  Attempting uninstall: google-cloud-bigquery
    Found existing installation: google-cloud-bigquery 3.4.2
    Uninstalling google-cloud-bigquery-3.4.2:
      Successfully uninstalled google-cloud-bigquery-3.4.2
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
bigframes 1.9.0 requires google-cloud-resource-manager>=1.10.3, but you have google-cloud-resource-manager 1.8.1 which is incompatible.[0m[31m
[0mSuccessfully installed google-cloud-bigquery-3.25.0
Collecting google-cloud-aiplatform==1.59.0
  Downloading google_cloud_aiplatform-1.59.0-py2.py3-none-any.whl.metadata (31 kB)
Downloading google_cloud_aiplatfor

### Restart the kernel

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

### Define constants

In [None]:
# Add installed library dependencies to Python PATH variable.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin:/home/jupyter/.local/bin


In [None]:
# Retrieve and set PROJECT_ID and REGION environment variables.
PROJECT_ID = "qwiklabs-gcp-02-816e025829d7"
REGION = "us-central1"
GCS_BUCKET = f"gs://{PROJECT_ID}"

In [None]:
!gcloud storage buckets create -l $REGION $GCS_BUCKET

Creating gs://qwiklabs-gcp-02-816e025829d7/...


### Import libraries

In [None]:
import os
import shutil
import logging

# TensorFlow model building libraries.
import tensorflow as tf
from tensorflow.keras import layers

# Libraries for data and plot model training metrics.
import pandas as pd
import matplotlib.pyplot as plt

import numpy as np

# Import the Vertex AI Python SDK.
from google.cloud import aiplatform as vertexai


2025-01-20 11:35:17.227353: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


### Initialize Vertex AI Python SDK

Initialize the Vertex AI Python SDK with GCP Project, Region, and Google Cloud Storage Bucket.

In [None]:
vertexai.init(project=PROJECT_ID, location=REGION, staging_bucket=GCS_BUCKET)

## Build and train the model locally in a Vertex Notebook

### Load the dataset and read it into a pandas DataFrame

In [None]:
csv_file = 'data/diabetes_prediction_dataset.csv'
dataframe = pd.read_csv(csv_file)

Inspect the dataset by checking the first five rows of the DataFrame:

In [None]:
dataframe.head()

Unnamed: 0,gender,age,hypertension,heart_disease,smoking_history,bmi,HbA1c_level,blood_glucose_level,diabetes
0,Female,80.0,0,1,never,25.19,6.6,140,0
1,Female,54.0,0,0,No Info,27.32,6.6,80,0
2,Male,28.0,0,0,never,27.32,5.7,158,0
3,Female,36.0,0,0,current,23.45,5.0,155,0
4,Male,76.0,1,1,current,20.14,4.8,155,0


Create the target variable:

In [None]:
dataframe['target'] = dataframe.pop('diabetes')

### Split the DataFrame into training, validation, and test sets

The dataset is in a single pandas DataFrame. Split it into training, validation, and test sets using a, for example, 80:10:10 ratio, respectively:

In [None]:
train, val, test = np.split(dataframe.sample(frac=1), [int(0.8*len(dataframe)), int(0.9*len(dataframe))])

  return bound(*args, **kwds)


In [None]:
print(len(train), 'training examples')
print(len(val), 'validation examples')
print(len(test), 'test examples')

80000 training examples
10000 validation examples
10000 test examples


### Create an input pipeline using tf.data

Next, create a utility function that converts each training, validation, and test set DataFrame into a `tf.data.Dataset`, then shuffles and batches the data.

In [None]:
def df_to_dataset(dataframe, shuffle=True, batch_size=256):
  df = dataframe.copy()
  labels = df.pop('target')
  df = {key: value.to_numpy()[:,tf.newaxis] for key, value in dataframe.items()}
  ds = tf.data.Dataset.from_tensor_slices((dict(df), labels))
  if shuffle:
    ds = ds.shuffle(buffer_size=len(dataframe))
  ds = ds.batch(batch_size)
  ds = ds.prefetch(batch_size)
  return ds

### Apply the Keras preprocessing layers

The Keras preprocessing layers allow you to build Keras-native input processing pipelines, which can be used as independent preprocessing code in non-Keras workflows, combined directly with Keras models, and exported as part of a Keras SavedModel.

In this tutorial, you will use the following four preprocessing layers to demonstrate how to perform preprocessing, structured data encoding, and feature engineering:

- `tf.keras.layers.Normalization`: Performs feature-wise normalization of input features.
- `tf.keras.layers.CategoryEncoding`: Turns integer categorical features into one-hot, multi-hot, or <a href="https://en.wikipedia.org/wiki/Tf%E2%80%93idf" class="external">tf-idf</a>
dense representations.
- `tf.keras.layers.StringLookup`: Turns string categorical values into integer indices.
- `tf.keras.layers.IntegerLookup`: Turns integer categorical values into integer indices.

You can learn more about the available layers in the [Working with preprocessing layers](https://www.tensorflow.org/guide/keras/preprocessing_layers) guide.

- For _numerical features_ of the dataset, you will use a `tf.keras.layers.Normalization` layer to standardize the distribution of the data.
- For _categorical features_, you will transform them to multi-hot encoded tensors with `tf.keras.layers.CategoryEncoding`.

### Numerical columns

Define a new utility function that returns a layer which applies feature-wise normalization to numerical features using that Keras preprocessing layer:

In [None]:
def get_normalization_layer(name, dataset):
  # Create a Normalization layer for the feature.
  normalizer = layers.Normalization(axis=None)

  # Prepare a Dataset that only yields the feature.
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the statistics of the data.
  normalizer.adapt(feature_ds)

  return normalizer

### Categorical columns

Define another new utility function that returns a layer which maps values from a vocabulary to integer indices and multi-hot encodes the features using the `tf.keras.layers.StringLookup`, `tf.keras.layers.IntegerLookup`, and `tf.keras.CategoryEncoding` preprocessing layers:

In [None]:
def get_category_encoding_layer(name, dataset, dtype, max_tokens=None):
  # Create a layer that turns strings into integer indices.
  if dtype == 'string':
    index = layers.StringLookup(max_tokens=max_tokens)
  # Otherwise, create a layer that turns integer values into integer indices.
  else:
    index = layers.IntegerLookup(max_tokens=max_tokens)

  # Prepare a `tf.data.Dataset` that only yields the feature.
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the set of possible values and assign them a fixed integer index.
  index.adapt(feature_ds)

  # Encode the integer indices.
  encoder = layers.CategoryEncoding(num_tokens=index.vocabulary_size())

  # Apply multi-hot encoding to the indices. The lambda function captures the
  # layer, so you can use them, or include them in the Keras Functional model later.
  return lambda feature: encoder(index(feature))

### Preprocess selected features to train the model on

In [None]:
batch_size = 256
train_ds = df_to_dataset(train, batch_size=batch_size)
val_ds = df_to_dataset(val, shuffle=False, batch_size=batch_size)
test_ds = df_to_dataset(test, shuffle=False, batch_size=batch_size)

Normalize the numerical features, and add them to one list of inputs called `encoded_features`:

In [None]:
all_inputs = {}
encoded_features = []

# Numerical features.
for header in ['age','bmi','HbA1c_level','blood_glucose_level']:
  numeric_col = tf.keras.Input(shape=(1,), name=header)
  normalization_layer = get_normalization_layer(header, train_ds)
  encoded_numeric_col = normalization_layer(numeric_col)
  all_inputs[header] = numeric_col
  encoded_features.append(encoded_numeric_col)

Turn the integer categorical values from the dataset into integer indices, perform multi-hot encoding, and add the resulting feature inputs to `encoded_features`:

In [None]:
int_categorical_cols = ['hypertension', 'heart_disease']

for header in int_categorical_cols:
  int_categorical_col = tf.keras.Input(shape=(1,), name=header, dtype='int64')
  encoding_layer = get_category_encoding_layer(name=header,
                                             dataset=train_ds,
                                             dtype='int64',
                                             max_tokens=5)
  encoded_int_categorical_col = encoding_layer(int_categorical_col)
  all_inputs[header] = int_categorical_col
  encoded_features.append(encoded_int_categorical_col)

Repeat the same step for the string categorical values:

In [None]:
categorical_cols = ['gender', 'smoking_history']

for header in categorical_cols:
  categorical_col = tf.keras.Input(shape=(1,), name=header, dtype='string')
  encoding_layer = get_category_encoding_layer(name=header,
                                               dataset=train_ds,
                                               dtype='string',
                                               max_tokens=5)
  encoded_categorical_col = encoding_layer(categorical_col)
  all_inputs[header] = categorical_col
  encoded_features.append(encoded_categorical_col)

In [None]:
print(encoded_features)

[<KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'normalization')>, <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'normalization_1')>, <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'normalization_2')>, <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'normalization_3')>, <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'category_encoding')>, <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'category_encoding_1')>, <KerasTensor: shape=(None, 4) dtype=float32 (created by layer 'category_encoding_2')>, <KerasTensor: shape=(None, 5) dtype=float32 (created by layer 'category_encoding_3')>]


### Create, compile, and train the model


In [None]:
all_features = tf.keras.layers.concatenate(encoded_features)
x = tf.keras.layers.Dense(32, activation="relu")(all_features)
x = tf.keras.layers.Dropout(0.5)(x)
output = tf.keras.layers.Dense(1)(x)

model = tf.keras.Model(all_inputs, output)

Configure the model with Keras `Model.compile`:

In [None]:
model.compile(optimizer='adam',
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=["accuracy"],
              run_eagerly=True)

Next, train and test the model:

In [None]:
model.fit(train_ds, epochs=10, validation_data=val_ds)

Epoch 1/10


  inputs = self._flatten_to_reference_inputs(inputs)


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7f6ab8447d90>

In [None]:
result = model.evaluate(test_ds, return_dict=True)
print(result)

{'loss': 0.09882796555757523, 'accuracy': 0.9661999940872192}


## Containerize the model code

Next step is to train and deploy the model on Google Cloud's Vertex AI platform.

### 1. Write a `model.py` training script

First, tidy up the local TensorFlow model training code from above into a training script.

In [None]:
MODEL_DIR = "diabetes-prediction-model"
if not os.path.exists(MODEL_DIR):
    os.makedirs(MODEL_DIR)
    print(f"Created directory: {MODEL_DIR}")
trainer_dir = os.path.join(MODEL_DIR, "trainer")
if not os.path.exists(trainer_dir):
    os.makedirs(trainer_dir)
    print(f"Created directory: {trainer_dir}")

Created directory: diabetes-prediction-model
Created directory: diabetes-prediction-model/trainer


In [None]:
%%writefile {MODEL_DIR}/trainer/model.py
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers

def df_to_dataset(dataframe, shuffle=True, batch_size=32):
    """Convert a Pandas DataFrame into a TensorFlow Dataset."""
    df = dataframe.copy()
    labels = df.pop('target')
    # Convert each feature column to (batch, 1) shape
    df = {key: value.to_numpy()[:, tf.newaxis] for key, value in df.items()}
    ds = tf.data.Dataset.from_tensor_slices((dict(df), labels))

    if shuffle:
        ds = ds.shuffle(buffer_size=len(dataframe))
    ds = ds.batch(batch_size)
    ds = ds.prefetch(batch_size)
    return ds

def get_normalization_layer(name, dataset):
    """Create and adapt a normalization layer for a numerical feature."""
    normalizer = layers.Normalization(axis=None)
    feature_ds = dataset.map(lambda x, y: x[name])  # Extract a single feature
    normalizer.adapt(feature_ds)
    return normalizer

def get_category_encoding_layer(name, dataset, dtype, max_tokens=None):
    """Create and adapt a category encoding layer for a categorical feature."""
    # Create either a StringLookup or IntegerLookup based on dtype
    if dtype == 'string':
        index = layers.StringLookup(max_tokens=max_tokens)
    else:
        index = layers.IntegerLookup(max_tokens=max_tokens)

    # Adapt the index layer to the dataset
    feature_ds = dataset.map(lambda x, y: x[name])
    index.adapt(feature_ds)

    # Create a one-hot (multi-hot) encoding layer
    encoder = layers.CategoryEncoding(num_tokens=index.vocabulary_size())
    return lambda feature: encoder(index(feature))

def load_datasets(csv_file, batch_size=32):
    """
    Load the CSV file and split it into train, validation, and test sets.
    Returns tf.data.Datasets for each split.
    """
    dataframe = pd.read_csv(csv_file)

    # Rename 'diabetes' column to 'target'
    dataframe['target'] = dataframe.pop('diabetes')

    # Randomly shuffle and split data into train/val/test
    train, val, test = np.split(
        dataframe.sample(frac=1),
        [int(0.8 * len(dataframe)), int(0.9 * len(dataframe))]
    )

    train_ds = df_to_dataset(train, batch_size=batch_size)
    val_ds = df_to_dataset(val, shuffle=False, batch_size=batch_size)
    test_ds = df_to_dataset(test, shuffle=False, batch_size=batch_size)

    return train_ds, val_ds, test_ds

def build_feature_layers(train_ds):
    """
    Build input layers and corresponding preprocessing/encoding layers for each feature.
    Returns (dict_of_inputs, list_of_encoded_features).
    """
    all_inputs = {}
    encoded_features = []

    # Numerical features
    numeric_features = ['age', 'bmi', 'HbA1c_level', 'blood_glucose_level']
    for header in numeric_features:
        numeric_col = tf.keras.Input(shape=(1,), name=header)
        normalization_layer = get_normalization_layer(header, train_ds)
        encoded_numeric_col = normalization_layer(numeric_col)
        all_inputs[header] = numeric_col
        encoded_features.append(encoded_numeric_col)

    # Integer categorical features
    int_categorical_cols = ['hypertension', 'heart_disease']
    for header in int_categorical_cols:
        int_categorical_col = tf.keras.Input(shape=(1,), name=header, dtype='int32')
        encoding_layer = get_category_encoding_layer(
            name=header, dataset=train_ds, dtype='int', max_tokens=5
        )
        encoded_int_categorical_col = encoding_layer(int_categorical_col)
        all_inputs[header] = int_categorical_col
        encoded_features.append(encoded_int_categorical_col)

    # String categorical features
    categorical_cols = ['gender', 'smoking_history']
    for header in categorical_cols:
        categorical_col = tf.keras.Input(shape=(1,), name=header, dtype='string')
        encoding_layer = get_category_encoding_layer(
            name=header, dataset=train_ds, dtype='string', max_tokens=5
        )
        encoded_categorical_col = encoding_layer(categorical_col)
        all_inputs[header] = categorical_col
        encoded_features.append(encoded_categorical_col)

    return all_inputs, encoded_features

def build_model(all_inputs, encoded_features, dropout_rate=0.5):
    """
    Build and compile the TensorFlow model.
    """
    # Concatenate all encoded features
    all_features = layers.concatenate(encoded_features)
    x = layers.Dense(32, activation="relu")(all_features)
    x = layers.Dropout(dropout_rate)(x)
    outputs = layers.Dense(1)(x)

    model = tf.keras.Model(inputs=all_inputs, outputs=outputs)
    model.compile(
        optimizer='adam',
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        metrics=['accuracy']
    )
    return model

def train_evaluate(hparams):
    """
    Main training procedure: loads datasets, builds a model, trains, evaluates,
    and exports the model in SavedModel format.

    hparams(dict) is expected to have the following keys:
      - 'data-file': path to the CSV file
      - 'batch-size': batch size
      - 'epochs': number of epochs
      - 'dropout': dropout rate
      - 'model-dir': path where the model will be exported
    """
    # 1. Load the data
    csv_file = hparams['data-file']
    train_ds, val_ds, test_ds = load_datasets(csv_file, batch_size=hparams['batch-size'])

    # 2. Build feature layers and the model
    all_inputs, encoded_features = build_feature_layers(train_ds)
    model = build_model(all_inputs, encoded_features, dropout_rate=hparams['dropout'])

    # 3. Train the model
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=hparams['epochs']
    )

    # 4. Evaluate on the test set
    test_result = model.evaluate(test_ds, return_dict=True)
    print("Test evaluation:", test_result)

    # 5. Export the model
    export_dir = hparams['model-dir']
    model.save(export_dir)
    print(f"Model exported to: {export_dir}")

    return history

Writing diabetes-prediction-model/trainer/model.py


### 2. Write a `task.py` file as an entrypoint to the custom model container

In [None]:
%%writefile {MODEL_DIR}/trainer/task.py

import os
import argparse

from trainer import model  # Assume model.py is under trainer/ directory

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Vertex AI sets the model artifact path in AIP_MODEL_DIR,
    # but you can also provide a custom default or override it.
    parser.add_argument('--model-dir', dest='model-dir',
                        default=os.environ.get('AIP_MODEL_DIR', 'exported_model'),
                        type=str,
                        help='Path to export the trained model.')

    parser.add_argument('--data-file', dest='data-file',
                        default='data/diabetes_prediction_dataset.csv',
                        type=str,
                        help='Path to the CSV dataset file.')

    parser.add_argument('--batch-size', dest='batch-size',
                        default=32, type=int,
                        help='Batch size for training.')

    parser.add_argument('--epochs', dest='epochs',
                        default=5, type=int,
                        help='Number of training epochs.')

    parser.add_argument('--dropout', dest='dropout',
                        default=0.5, type=float,
                        help='Dropout rate.')

    args = parser.parse_args()
    hparams = args.__dict__

    model.train_evaluate(hparams)

Writing diabetes-prediction-model/trainer/task.py


### 3. Write a `Dockerfile` for the custom model container

The `Dockerfile` contains instructions to package the model code in `bert-sentiment-classifier` as well as specifies the model code's dependencies needed for execution together in a Docker container.

In [None]:
%%writefile {MODEL_DIR}/Dockerfile
# Specifies base image and tag.
# https://cloud.google.com/vertex-ai/docs/training/pre-built-containers
FROM us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-11:latest

# Sets the container working directory.
WORKDIR /root

# Copies the requirements.txt into the container to reduce network calls.
COPY requirements.txt .

# Installs additional packages.
RUN pip3 install -U -r requirements.txt

# b/203105209 Removes unneeded file from TF2.5 CPU image for python_module CustomJob training.
# Will be removed on subsequent public Vertex images.
RUN rm -rf /var/sitecustomize/sitecustomize.py

# Copies the trainer code to the docker image.
COPY . /trainer

# Sets the container working directory.
WORKDIR /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Writing diabetes-prediction-model/Dockerfile


### 4. Write a `requirements.txt` file to specify additional ML code dependencies

In [None]:
%%writefile {MODEL_DIR}/requirements.txt
tf-models-official==2.15.0

Writing diabetes-prediction-model/requirements.txt


## Use Cloud Build to build and submit the model container to Artifact Registry

Next, [Cloud Build](https://cloud.google.com/build) is used to build and upload the custom TensorFlow model container to [Google Cloud Artifact Registry](https://cloud.google.com/artifact-registry).

Cloud Build brings reusability and automation to ML experimentation by enabling building, testing, and deploying ML model code as part of a CI/CD workflow. Artifact Registry provides a centralized repository to store, manage, and secure ML container images. This will allow users to securely share ML work with others and reproduce experiment results.

### 1. Create Artifact Registry for custom container images

In [None]:
ARTIFACT_REGISTRY="diabetes-prediction-model"

In [None]:
# create a Docker Artifact Registry using the gcloud CLI
!gcloud artifacts repositories create $ARTIFACT_REGISTRY \
    --repository-format=docker \
    --location=$REGION \
    --description="Artifact Registry for the diabetes prediction model"

Create request issued for: [diabetes-prediction-model]
Waiting for operation [projects/qwiklabs-gcp-02-816e025829d7/locations/us-centr
al1/operations/d7cafca9-f35b-4981-8749-91061351d48b] to complete...done.       
Created repository [diabetes-prediction-model].


### 2. Create `cloudbuild.yaml` instructions

In [None]:
IMAGE_NAME="diabetes-prediction-model"
IMAGE_TAG="latest"
IMAGE_URI=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{ARTIFACT_REGISTRY}/{IMAGE_NAME}:{IMAGE_TAG}"

In [None]:
cloudbuild_yaml = f"""steps:
- name: 'gcr.io/cloud-builders/docker'
  args: [ 'build', '-t', '{IMAGE_URI}', '.' ]
images:
- '{IMAGE_URI}'"""

with open(f"{MODEL_DIR}/cloudbuild.yaml", "w") as fp:
    fp.write(cloudbuild_yaml)

### 3. Build and submit the container image to Artifact Registry using Cloud Build

In [None]:
# use Cloud Build to build and submit the custom model container to Artifact Registry.
!gcloud builds submit $MODEL_DIR \
    --config $MODEL_DIR/cloudbuild.yaml \
    --timeout=30m

Creating temporary archive of 5 file(s) totalling 8.2 KiB before compression.
Uploading tarball of [diabetes-prediction-model] to [gs://qwiklabs-gcp-02-816e025829d7_cloudbuild/source/1737373366.538596-2525e80fba04411881443e6c12dd98e0.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-gcp-02-816e025829d7/locations/global/builds/141100e6-cbfe-4c78-893d-6c57d785d97e].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/141100e6-cbfe-4c78-893d-6c57d785d97e?project=424228142313 ].
Waiting for build to complete. Polling interval: 1 second(s).
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "141100e6-cbfe-4c78-893d-6c57d785d97e"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-gcp-02-816e025829d7_cloudbuild/source/1737373366.538596-2525e80fba04411881443e6c12dd98e0.tgz#1737373367368273
Copying gs://qwiklabs-gcp-02-816e025829d7_cloudbuild/source/1737373366.538596-2525e80fba04411881443e6c12dd98e0.tgz#173

## Define a pipeline using the KFP V2 SDK

To get higher performing model into production to deliver value faster, a pipeline is defined using the [**Kubeflow Pipelines (KFP) V2 SDK**](https://www.kubeflow.org/docs/components/pipelines/sdk/v2/v2-compatibility) to orchestrate the training and deployment of the model on [**Vertex Pipelines**](https://cloud.google.com/vertex-ai/docs/pipelines) below.

In [None]:
import datetime
# google_cloud_pipeline_components includes pre-built KFP components for interfacing with Vertex AI services.
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import dsl

In [None]:
TIMESTAMP=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
DISPLAY_NAME = "diabetes-prediction-{}".format(TIMESTAMP)
GCS_BASE_OUTPUT_DIR= f"{GCS_BUCKET}/{MODEL_DIR}-{TIMESTAMP}"

USER = "ZhaoYufan"
PIPELINE_ROOT = "{}/pipeline_root/{}".format(GCS_BUCKET, USER)

print(f"Model display name: {DISPLAY_NAME}")
print(f"GCS dir for model training artifacts: {GCS_BASE_OUTPUT_DIR}")
print(f"GCS dir for pipeline artifacts: {PIPELINE_ROOT}")

Model display name: diabetes-prediction-20250120115236
GCS dir for model training artifacts: gs://qwiklabs-gcp-02-816e025829d7/diabetes-prediction-model-20250120115236
GCS dir for pipeline artifacts: gs://qwiklabs-gcp-02-816e025829d7/pipeline_root/ZhaoYufan


In [None]:
# Pre-built Vertex model serving container for deployment.
# https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
SERVING_IMAGE_URI = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-11:latest"

The pipeline consists of three components:

* `CustomContainerTrainingJobRunOp` [(documentation)](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.CustomContainerTrainingJobRunOp): trains the custom model container using Vertex Training.

*  `EndpointCreateOp` [(documentation)](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.EndpointCreateOp): Creates a Google Cloud Vertex Endpoint resource that maps physical machine resources with the model to enable it to serve online predictions. Online predictions have low latency requirements; providing resources to the model in advance reduces latency.

* `ModelDeployOp`[(documentation)](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.aiplatform.html#google_cloud_pipeline_components.aiplatform.ModelDeployOp): deploys the model to a Vertex Prediction Endpoint for online predictions.

In [None]:
@dsl.pipeline(name="diabetes-prediction", pipeline_root=PIPELINE_ROOT)
def pipeline(
    project: str = PROJECT_ID,
    location: str = REGION,
    staging_bucket: str = GCS_BUCKET,
    display_name: str = DISPLAY_NAME,
    container_uri: str = IMAGE_URI,
    model_serving_container_image_uri: str = SERVING_IMAGE_URI,
    base_output_dir: str = GCS_BASE_OUTPUT_DIR,
):

    # Add and configure the pre-built KFP CustomContainerTrainingJobRunOp component
    model_train_evaluate_op = gcc_aip.CustomContainerTrainingJobRunOp(
        # Vertex AI Python SDK authentication parameters.
        project=project,
        location=location,
        staging_bucket=staging_bucket,
        # WorkerPool arguments.
        replica_count=1,
        machine_type="e2-standard-4",
        display_name = display_name,
        container_uri = container_uri,
        model_serving_container_image_uri = model_serving_container_image_uri,
        base_output_dir = base_output_dir,
    )

    # Create a Vertex Endpoint resource in parallel with model training.
    endpoint_create_op = gcc_aip.EndpointCreateOp(
        # Vertex AI Python SDK authentication parameters.
        project=project,
        location=location,
        display_name=display_name

    )

    # Deploy the model to the created Endpoint resource for online predictions.
    model_deploy_op = gcc_aip.ModelDeployOp(
        # Link to model training component through output model artifact.
        model=model_train_evaluate_op.outputs["model"],
        # Link to the created Endpoint.
        endpoint=endpoint_create_op.outputs["endpoint"],
        # Define prediction request routing. {"0": 100} indicates 100% of traffic
        # to the ID of the current model being deployed.
        traffic_split={"0": 100},
        # WorkerPool arguments.
        dedicated_resources_machine_type="e2-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=2
    )

## Compile the pipeline

In [None]:
from kfp.v2 import compiler

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="diabetes-prediction.json"
)



## Run the pipeline on Vertex Pipelines

The `PipelineJob` is configured below and triggered through the `run()` method.

In [None]:
vertex_pipelines_job = vertexai.pipeline_jobs.PipelineJob(
    display_name="diabetes-prediction",
    template_path="diabetes-prediction.json",
    parameter_values={
        "project": PROJECT_ID,
        "location": REGION,
        "staging_bucket": GCS_BUCKET,
        "display_name": DISPLAY_NAME,
        "container_uri": IMAGE_URI,
        "model_serving_container_image_uri": SERVING_IMAGE_URI,
        "base_output_dir": GCS_BASE_OUTPUT_DIR},
    enable_caching=True,
)

In [None]:
vertex_pipelines_job.run()

## Query the deployed model using the Vertex endpoint

Finally, retrieve the `Endpoint` deployed by the pipeline and use it to query the model for online predictions.

In [None]:
# Retrieve the deployed Endpoint name from pipeline.
ENDPOINT_NAME = vertexai.Endpoint.list()[0].name

In [None]:
# Generate online predictions using Vertex Endpoint.
endpoint = vertexai.Endpoint(
    endpoint_name=ENDPOINT_NAME,
    project=PROJECT_ID,
    location=REGION
)

In [None]:
sample = {
    'gender': 'Female',
    'age': 56.0,
    'hypertension': 1,
    'heart_disease': 0,
    'smoking_history': 'never',
    'bmi': 27.53,
    'HbA1c_level': 8.1,
    'blood_glucose_level': 77,
}

In [None]:
predictions = endpoint.predict(instances=[sample])

In [None]:
print(predictions)

[[2.8715334]]


In [None]:
prob = tf.nn.sigmoid(predictions[0])

In [None]:
print(
    "This particular patient had a %.1f percent probability "
    "of being diabetic." % (100 * prob)
)

This particular patient had a 94.6 percent probability of being diabetic.
