In [None]:
# An example of the Vertex AI Pipeline, serverless ML pipeline

# Vertex Pipelines: Model train, upload, and deploy using google-cloud-pipeline-components


### Install additional packages


In [None]:
import os

# On a Google Cloud Notebook (detected through its env_version marker file),
# pip is invoked with --user; everywhere else no extra flag is passed.
USER_FLAG = (
    "--user" if os.path.exists("/opt/deeplearning/metadata/env_version") else ""
)

In [None]:
# Upgrade the SDKs used in this notebook; $USER_FLAG is set in the previous cell.
# NOTE(review): versions are unpinned, while later cells import `kfp.v2` and
# `google_cloud_pipeline_components.experimental` -- confirm the latest
# releases still provide those modules, or pin known-good versions.
!pip3 install --upgrade google-cloud-aiplatform $USER_FLAG
!pip3 install --upgrade google-cloud-storage $USER_FLAG
!pip3 install --upgrade kfp $USER_FLAG
!pip3 install --upgrade google-cloud-pipeline-components $USER_FLAG

### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
# Restart the kernel so the freshly installed packages can be imported.
import os

# Skipped whenever the IS_TESTING environment variable is set to a non-empty value.
if not os.getenv("IS_TESTING"):
    import IPython

    # do_shutdown(True) requests a restart rather than a plain shutdown.
    IPython.Application.instance().kernel.do_shutdown(True)

Check the versions of the packages you installed.  The KFP SDK version should be >=1.6.

In [None]:
# Print the installed KFP SDK version from a fresh interpreter.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

In [None]:
import os

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Otherwise, set your project ID here.

In [None]:
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "google.com:ml-baguette-demos"  
    !gcloud config set project {PROJECT_ID}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a timestamp for each instance session, and append it onto the name of resources you create in this tutorial.

In [None]:
from datetime import datetime

# Second-resolution stamp (YYYYMMDDHHMMSS) appended to the names of resources
# created later in this notebook.
TIMESTAMP = format(datetime.now(), "%Y%m%d%H%M%S")

In [None]:
# TIMESTAMP = "20220404161754"

### Create a Cloud Storage bucket as necessary

You will need a Cloud Storage bucket for this example.  If you don't have one that you want to use, you can make one now.

In [None]:
# Cloud Storage bucket URI; leave the placeholder to have a name generated in the next cell.
BUCKET_NAME = "gs://[your-bucket-name]"  # @param {type:"string"}
# Region used when creating the bucket and as the training job location.
REGION = "us-central1"  # @param {type:"string"}

In [None]:
# Generate a per-session bucket name when none (or only the placeholder) was given.
if BUCKET_NAME in ("", None, "gs://[your-bucket-name]"):
    # The 'google.com:' prefix of a domain-scoped project ID is stripped from the name.
    BUCKET_NAME = ("gs://" + PROJECT_ID + "aip-" + TIMESTAMP).replace(
        'google.com:', ''
    )

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
# Create the bucket in REGION (`mb` = make bucket).
! gsutil mb -l $REGION $BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
# List the bucket contents to confirm that it exists and is accessible.
! gsutil ls -al $BUCKET_NAME

#### Service Account

**If you don't know your service account**, leave the placeholder in the first cell below and run the second cell, which looks it up with the `gcloud` command.

In [None]:
# Service account e-mail; leave the placeholder to have it looked up with gcloud in the next cell.
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your GCP project id from gcloud
    shell_output = !gcloud auth list 2>/dev/null
    SERVICE_ACCOUNT = shell_output[2].replace('*', '').strip()
    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

In [None]:
# Allow the service account to create objects in the bucket.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_NAME

# Allow the service account to read objects from the bucket.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_NAME

### Import libraries and define constants

Define some constants. 

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

PIPELINE_ROOT = "{}/pipeline_root/cifar_image_clf".format(BUCKET_NAME)
PIPELINE_ROOT

Do some imports:

In [None]:
import uuid
from typing import NamedTuple

import kfp
from kfp.v2.dsl import component
from kfp.v2.components import importer_node

import google.cloud.aiplatform as aip

from google_cloud_pipeline_components.experimental.custom_job import utils
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.custom_job import \
    CustomTrainingJobOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                          ModelDeployOp)
from google_cloud_pipeline_components.v1.model import ModelUploadOp

#### Set hardware accelerators
You can set hardware accelerators for both training and prediction.

In [None]:
# Accelerator type and count used for training.
TRAIN_GPU = aip.gapic.AcceleratorType.NVIDIA_TESLA_K80
TRAIN_NGPU = 1

# Accelerator type and count used for the deployed model.
DEPLOY_GPU = aip.gapic.AcceleratorType.NVIDIA_TESLA_K80
DEPLOY_NGPU = 1

#### Set pre-built containers

Vertex AI provides pre-built containers to run training and prediction.

For the latest list, see [Pre-built containers for training](https://cloud.google.com/ai-platform-unified/docs/training/pre-built-containers) and [Pre-built containers for prediction](https://cloud.google.com/ai-platform-unified/docs/predictions/pre-built-containers)

In [None]:
TRAIN_VERSION = "tf-gpu.2-4"
DEPLOY_VERSION = "tf2-gpu.2-4"

# TF2_GPU_IMAGE is the base image of the `train` component defined below;
# TRAIN_IMAGE is built here but not referenced by the cells in this notebook.
TF2_GPU_IMAGE = "tensorflow/tensorflow:latest-gpu"
TRAIN_IMAGE = f"gcr.io/cloud-aiplatform/training/{TRAIN_VERSION}:latest"
DEPLOY_IMAGE = f"gcr.io/cloud-aiplatform/prediction/{DEPLOY_VERSION}:latest"

print("Training:", TF2_GPU_IMAGE, TRAIN_GPU, TRAIN_NGPU)
print("Deployment:", DEPLOY_IMAGE, DEPLOY_GPU, DEPLOY_NGPU)

#### Set machine types

Next, set the machine types to use for training and prediction.

In [None]:
# Machine family and vCPU count shared by training and deployment.
MACHINE_TYPE = "n1-standard"
VCPU = "4"

# Machine type for training.
TRAIN_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Train machine type", TRAIN_COMPUTE)

# Machine type for serving predictions.
DEPLOY_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Deploy machine type", DEPLOY_COMPUTE)

#### Initialize the Vertex AI SDK:

In [None]:
# Initialize the Vertex AI SDK. Passing REGION keeps SDK-created resources
# (such as the pipeline job) in the same region as the bucket and training job.
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_NAME)

## Train a model

In [None]:
# Per-session names for the training job, its output directory and the model.
JOB_NAME = f"custom_job_{TIMESTAMP}"
MODEL_DIR = f"{BUCKET_NAME}/{JOB_NAME}"
MODEL_DISPLAY_NAME = f"cifar10-{TIMESTAMP}"

# Mirror across devices only when two or more training GPUs are requested.
TRAIN_STRATEGY = "mirror" if TRAIN_NGPU and TRAIN_NGPU >= 2 else "single"

EPOCHS = 10
STEPS = 100

print(MODEL_DIR)

#### Training script

In [None]:
# Training logic written as a plain function; the decorator converts it to a component.

@component(base_image=TF2_GPU_IMAGE,
           output_component_file='train_op.yaml',
           packages_to_install=['tensorflow-datasets'])
def train(
    lr: float,
    epochs: int,
    steps: int,
    distribute: str,
    model_uri: str,
):
    """Train a small CNN on CIFAR-10 and save it to ``model_uri``.

    Args:
        lr: Learning rate of the SGD optimizer.
        epochs: Number of epochs passed to ``model.fit``.
        steps: Steps per epoch passed to ``model.fit``.
        distribute: ``'single'`` (one device: GPU if available, else CPU) or
            ``'mirror'`` (``tf.distribute.MirroredStrategy``).
        model_uri: Destination passed to ``model.save``.

    Raises:
        ValueError: If ``distribute`` is neither ``'single'`` nor ``'mirror'``.
    """
    # Imports are kept inside the function body.
    import logging
    import os
    import sys

    import tensorflow as tf
    import tensorflow_datasets as tfds
    from tensorflow.python.client import device_lib

    logging.warning('Start custom execution')

    # Best effort: a failure to silence the progress bar must not stop training.
    try:
        tfds.disable_progress_bar()
    except Exception as e:
        print(f'Exception: {e}')

    print('Python Version = {}'.format(sys.version))
    print('TensorFlow Version = {}'.format(tf.__version__))
    print('TF_CONFIG = {}'.format(os.environ.get('TF_CONFIG', 'Not found')))
    print('DEVICES', device_lib.list_local_devices())

    # Single machine, single compute device
    if distribute == 'single':
        if tf.config.list_physical_devices('GPU'):
            strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
        else:
            strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
    # Single machine, multiple compute devices
    elif distribute == 'mirror':
        strategy = tf.distribute.MirroredStrategy()
    else:
        # Fail early with a clear message instead of an unbound `strategy` later.
        raise ValueError(
            "distribute must be 'single' or 'mirror', got {!r}".format(distribute)
        )

    # Preparing dataset
    BUFFER_SIZE = 10000
    BATCH_SIZE = 64

    def make_datasets_unbatched():
        """Load the CIFAR-10 train split, scaled, cached, shuffled and repeated."""

        # Scale pixel values by 1/255 and cast them to float32.
        def scale(image, label):
            image = tf.cast(image, tf.float32)
            image /= 255.0
            return image, label

        print('Make datasets unbatched')
        datasets, info = tfds.load(name='cifar10',
                                   with_info=True,
                                   as_supervised=True)
        return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE).repeat()

    def build_and_compile_cnn_model():
        """Build and compile the Keras CNN classifier (10 softmax outputs)."""
        print('Build model')
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(32, 32, 3)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Conv2D(32, 3, activation='relu'),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(10, activation='softmax')
        ])
        model.compile(
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            optimizer=tf.keras.optimizers.SGD(learning_rate=lr),
            metrics=['accuracy'])
        return model

    # Train the model
    train_dataset = make_datasets_unbatched().batch(BATCH_SIZE)

    with strategy.scope():
        # Model building/compiling happens inside `strategy.scope()`.
        model = build_and_compile_cnn_model()

    print('Train model')
    model.fit(x=train_dataset, epochs=epochs, steps_per_epoch=steps)
    print('Save model')
    model.save(model_uri)

In [None]:
# train(lr=0.01, epochs=1, steps=STEPS, distribute='single', model_uri=MODEL_DIR)

In [None]:
# Wrap the `train` component as a Vertex AI custom training job op.
# The machine type comes from TRAIN_COMPUTE so the value configured (and
# printed) in the machine-type cell is the one actually used.
custom_job_distributed_training_op = utils.create_custom_training_job_op_from_component(
    train,
    display_name=MODEL_DISPLAY_NAME,
    replica_count=1,
    machine_type=TRAIN_COMPUTE,
    accelerator_type=TRAIN_GPU,
    accelerator_count=TRAIN_NGPU,
)

In [None]:
@kfp.dsl.pipeline(name="train-endpoint-deploy" + str(uuid.uuid4()))
def pipeline(
    project: str = PROJECT_ID,
    model_display_name: str = MODEL_DISPLAY_NAME,
    lr: float = 0.01,
    epochs: int = EPOCHS,
    steps: int = STEPS,
    distribute: str = 'single'
):
    """Train the model, upload it, create an endpoint and deploy the model to it.

    Args:
        project: Google Cloud project in which the resources are created.
        model_display_name: Display name of the uploaded and deployed model.
        lr: Learning rate forwarded to the training component.
        epochs: Number of epochs forwarded to the training component.
        steps: Steps per epoch forwarded to the training component.
        distribute: Distribution strategy forwarded to the training component.
    """
    # Custom training job; the trained model is saved under MODEL_DIR.
    custom_producer_task = custom_job_distributed_training_op(
        model_uri=MODEL_DIR,
        lr=lr,
        epochs=epochs,
        steps=steps,
        distribute=distribute,
        project=project,
        location=REGION,
        base_output_directory=PIPELINE_ROOT,
    )

    # Import the saved model as an unmanaged container model artifact. There is
    # no data dependency on the training task, so ordering is enforced with
    # `.after`.
    import_unmanaged_model_task = importer_node.importer(
        artifact_uri=MODEL_DIR,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": DEPLOY_IMAGE,
            },
        },
    ).after(custom_producer_task)

    # Upload the model in the same region as the training job.
    model_upload_op = ModelUploadOp(
        project=project,
        location=REGION,
        display_name=model_display_name,
        unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
    )
    model_upload_op.after(import_unmanaged_model_task)

    # Create the endpoint in the same region as the model.
    endpoint_create_op = EndpointCreateOp(
        project=project,
        location=REGION,
        display_name="pipelines-created-endpoint",
    )

    # Deploy the uploaded model with all traffic routed to it; the machine type
    # comes from DEPLOY_COMPUTE, configured in the machine-type cell.
    model_deploy_op = ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        traffic_split={"0": 100},
        deployed_model_display_name=model_display_name,
        dedicated_resources_machine_type=DEPLOY_COMPUTE,
        dedicated_resources_accelerator_type=DEPLOY_GPU.name,
        dedicated_resources_accelerator_count=DEPLOY_NGPU,
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )

In [None]:
from kfp.v2 import compiler  # noqa: F811

# Compile the pipeline function into the job spec file used by the run cell.
compiler.Compiler().compile(
    package_path="train_upload_deploy.json",
    pipeline_func=pipeline,
)

The pipeline compilation generates the `train_upload_deploy.json` job spec file.

Then, you run the defined pipeline like this: 

In [None]:
DISPLAY_NAME = f"{MODEL_DISPLAY_NAME}-train-pipeline"

# Create the pipeline job from the compiled spec, with step caching enabled.
job = aip.PipelineJob(
    template_path="train_upload_deploy.json",
    pipeline_root=PIPELINE_ROOT,
    display_name=DISPLAY_NAME,
    enable_caching=True,
)

# Submit the job.
job.run()

## Make an online prediction request

Send an online prediction request to your deployed model.

### Get test data

Download images from the CIFAR dataset and preprocess them.

#### Download the test images

Download the provided set of images from the CIFAR dataset:

In [None]:
# Download the images
# Copy the sample CIFAR test images into the current working directory.
! gsutil -m cp -r gs://cloud-samples-data/ai-platform-unified/cifar_test_images .

#### Preprocess the images
Before you can run the data through the endpoint, you need to preprocess it to match the format that your custom model (defined in the `train` component above) expects.

`x_test`:
Normalize (rescale) the pixel data by dividing each pixel by 255. This replaces each single byte integer pixel with a 32-bit floating point number between 0 and 1.

`y_test`:
You can extract the labels from the image filenames. Each image's filename format is "image_{LABEL}_{IMAGE_NUMBER}.jpg"

In [None]:
import numpy as np
from PIL import Image

# Directory produced by the download cell above.
IMAGE_DIRECTORY = "cifar_test_images"

# Keep only the JPEG files.
image_files = [name for name in os.listdir(IMAGE_DIRECTORY) if name.endswith(".jpg")]

# Decode each JPEG into a numpy array.
image_data = [
    np.asarray(Image.open(os.path.join(IMAGE_DIRECTORY, name)))
    for name in image_files
]

# Divide by 255, cast to float32 and convert to nested lists for the request.
x_test = [(pixels / 255.0).astype(np.float32).tolist() for pixels in image_data]

# The label is the second underscore-separated token of the file name.
y_test = [int(name.split("_")[1]) for name in image_files]

### Send the prediction request

Now that you have test images, you can use them to send a prediction request. Use the `Endpoint` object's `predict` function, which takes the following parameters:

- `instances`: A list of image instances. According to your custom model, each image instance should be a 3-dimensional matrix of floats. This was prepared in the previous step.

The `predict` function returns a list in which each element corresponds to the image at the same position in the request. You will see in the output for each prediction:

- Confidence level for the prediction (`predictions`), between 0 and 1, for each of the ten classes.

You can then run a quick evaluation on the prediction results:
1. `np.argmax`: Convert each list of confidence levels to a label
2. Compare the predicted labels to the actual labels
3. Calculate `accuracy` as `correct/total`

In [None]:
# Get the endpoint resource name from the pipeline UI logs and update it here.
# NOTE(review): the value below is specific to one project/endpoint.
endpoint_resource_name = "projects/660199673046/locations/us-central1/endpoints/3099153842793611264"
endpoint = aip.Endpoint(endpoint_resource_name)

# One list of per-class confidences is returned per instance.
predictions = endpoint.predict(instances=x_test)
y_predicted = np.argmax(predictions.predictions, axis=1)

# Compare predicted labels with the labels taken from the file names.
correct = sum(y_predicted == np.array(y_test))
total = len(y_predicted)
accuracy = correct / total
print(
    f"Correct predictions = {correct}, Total predictions = {total}, Accuracy = {accuracy}"
)

## Cleaning up

You can delete the individual resources you created in this tutorial:
- Delete Cloud Storage objects that were created.  Uncomment and run the command in the cell below **only if you are not using the `PIPELINE_ROOT` path for any other purpose**.
- Delete your deployed model: first, undeploy it from its *endpoint*, then delete the model and endpoint.


In [None]:
# Warning: this command will delete ALL Cloud Storage objects under the PIPELINE_ROOT path.
# ! gsutil -m rm -r $PIPELINE_ROOT