In [1]:
from datetime import datetime

from google.cloud import aiplatform

In [2]:
REGION = "us-central1"
PROJECT_ID = !(gcloud config get-value project)
PROJECT_ID = PROJECT_ID[0]

In [3]:
# Set `PATH` to include the directory containing KFP CLI
PATH = %env PATH
%env PATH=/home/jupyter/.local/bin:{PATH}

env: PATH=/home/jupyter/.local/bin:/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin:


In [4]:
%%writefile ./trainer_image_vertex2/Dockerfile
FROM gcr.io/deeplearning-platform-release/tf-gpu.2-8
RUN pip install -U fire cloudml-hypertune
WORKDIR /app
COPY model.py .

ENTRYPOINT ["python", "model.py"]

Overwriting ./trainer_image_vertex2/Dockerfile


In [18]:
%%writefile ./trainer_image_vertex2/model.py
import datetime
import os
import shutil
import pickle
import subprocess
import sys

import fire
import pandas as pd
import numpy as np
import tensorflow as tf
import hypertune
import numpy as np
from google.cloud import bigquery, storage
from google.oauth2 import credentials
from tensorflow.keras import Sequential
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.layers import (Conv1D, Dense, Dropout, Flatten, MaxPooling1D, Softmax)

AIP_MODEL_DIR = os.environ["AIP_MODEL_DIR"]
MODEL_FILENAME = "model.pkl"

def get_blob(blobs):
    for blob in blobs:
        yield blob
        
def get_image_paths(image_input_dir):
    # initialize the GCS client
    image_bucket = image_input_dir.split('/')[2]
    prefix_dir = '/'.join(image_input_dir.split('/')[3:])
    storage_client = storage.Client()
    # get the storage bucket
    bucket = storage_client.get_bucket(image_bucket)
   

    image_paths=[]
    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(image_bucket, prefix=prefix_dir)
    
    for blob in get_blob(blobs):
        if "output" in blob.name:
            image_paths.append('gs://spectrain_new/'+blob.name)
    return image_paths

def load_images(imagePath):
    # read the image from disk, decode it, convert the data type to
    # floating point, and resize it
    image = tf.io.read_file(imagePath)
    image = tf.image.decode_png(image, channels=1)
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    image = tf.image.resize(image, (256,256))
    # parse the class label from the file path
    label = tf.strings.split(imagePath, os.path.sep)[-2]
    if label=='positive':
        label=1
    else:
        label=0
    # return the image and the label
    return (image, label)

    # return the image and the label
    return (image, label)

def load_dataset(images_dir, batch_size, training):
    filePaths = get_image_paths(image_input_dir=images_dir)
    
    ds = tf.data.Dataset.from_tensor_slices(filePaths)
    ds = (ds
        .map(load_images)
        .cache()
        .shuffle(len(filePaths))
        .batch(batch_size)
    )

    if training:
        return ds.repeat()
    else:
        return ds

def build_model(filter_size_1, filter_size_2, kernel_size, pool_kernel_size, hidden_units_1, hidden_units_2):
    model = Sequential()
    model.add(Conv1D(filter_size_1, kernel_size=kernel_size, activation='relu', input_shape=(256, 256), padding='same'))
    model.add(Conv1D(filter_size_1, kernel_size=kernel_size, activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_kernel_size, padding='same'))
    model.add(Conv1D(filter_size_2, kernel_size=kernel_size,activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_kernel_size, padding='same'))
    model.add(Flatten())
    model.add(Dense(hidden_units_1, activation='relu'))
    model.add(Dense(hidden_units_1, activation='relu'))
    model.add(Dense(hidden_units_2, activation='relu'))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    
    
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['AUC'],
                  run_eagerly=True)
    
    return model

    
# Instantiate the HyperTune reporting object
hpt = hypertune.HyperTune()

# Reporting callback
class HPTCallback(tf.keras.callbacks.Callback):

    def on_epoch_end(self, epoch, logs=None):
        global hpt
        hpt.report_hyperparameter_tuning_metric(
            hyperparameter_metric_tag='auc',
            metric_value=logs['val_auc'],
            global_step=epoch)
        
        
def train_and_evaluate(train_data_path,
                    eval_data_path,                
                    filt_size1,
                    filt_size2, 
                    nnsize_1,
                    nnsize_2,batch_size, hptune):
    num_epochs=20
    train_examples=5000
    eval_steps=100
    filt_size1 = int(filt_size1)
    filt_size2 = int(filt_size2)
    ksize = 4
    pool_ksize = 2
    nnsize_1 = int(nnsize_1)
    nnsize_2 = int(nnsize_2)
    batch_size = int(batch_size)
    model = build_model(filter_size_1=filt_size1, filter_size_2=filt_size2, 
                        kernel_size=ksize, pool_kernel_size=pool_ksize
                        , hidden_units_1=nnsize_1, hidden_units_2=nnsize_2)

    trainds = load_dataset(train_data_path, batch_size, training=True)

    evalds = load_dataset(eval_data_path, batch_size, training=False)
    
    
    if eval_steps:
        evalds = evalds.take(count=eval_steps)

    num_batches = batch_size * num_epochs
    steps_per_epoch = train_examples // batch_size
   

    history = model.fit(
        trainds,
        validation_data=evalds,
        epochs=num_epochs,
        steps_per_epoch=steps_per_epoch,
        verbose=2,
        callbacks=[HPTCallback()])
    
    if not hptune:
        tf.saved_model.save(obj=model, export_dir=AIP_MODEL_DIR) 
        
        print("Exported trained model to {}".format(AIP_MODEL_DIR))
    

if __name__ == "__main__":
    fire.Fire(train_and_evaluate)

Overwriting ./trainer_image_vertex2/model.py


In [19]:
#!pip install google_cloud_pipeline_components

In [4]:
IMAGE_NAME = "trainer_image_spectrain_vertex"
TAG = "latest"
TRAINING_CONTAINER_IMAGE_URI = f"gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{TAG}"
TRAINING_CONTAINER_IMAGE_URI

'gcr.io/qwiklabs-asl-00-c812c3b423f2/trainer_image_spectrain_vertex:latest'

In [21]:
!gcloud builds submit --timeout 15m --tag $TRAINING_CONTAINER_IMAGE_URI trainer_image_vertex2

Creating temporary tarball archive of 4 file(s) totalling 10.3 KiB before compression.
Uploading tarball of [trainer_image_vertex2] to [gs://qwiklabs-asl-00-c812c3b423f2_cloudbuild/source/1686921239.40684-9b0ca264851b45bba3e1f461997af6ea.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-asl-00-c812c3b423f2/locations/global/builds/65634dc6-adfb-4c64-9e28-083a316ce004].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/65634dc6-adfb-4c64-9e28-083a316ce004?project=469700469475 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "65634dc6-adfb-4c64-9e28-083a316ce004"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-asl-00-c812c3b423f2_cloudbuild/source/1686921239.40684-9b0ca264851b45bba3e1f461997af6ea.tgz#1686921239670924
Copying gs://qwiklabs-asl-00-c812c3b423f2_cloudbuild/source/1686921239.40684-9b0ca264851b45bba3e1f461997af6ea.tgz#1686921239670924...
/ [1 files][  2.3 KiB/  2.3 KiB]           

In [5]:
SERVING_CONTAINER_IMAGE_URI = (
    "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest"
)

In [6]:
!rm ./pipeline_vertex/pipeline_prebuilt.py

In [7]:
BUCKET = !gcloud storage ls
BUCKET = BUCKET[-1].split("//")[-1]
BUCKET = BUCKET[:-1]

%env BUCKET={BUCKET}

env: BUCKET=spectrain_new


In [8]:
%%writefile ./pipeline_vertex/pipeline_prebuilt.py
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Kubeflow Covertype Pipeline."""
import os

from google.cloud.aiplatform import hyperparameter_tuning as hpt
from google_cloud_pipeline_components.aiplatform import (
    EndpointCreateOp,
    ModelDeployOp,
    ModelUploadOp,
)
from google_cloud_pipeline_components.experimental import (
    hyperparameter_tuning_job,
)
from google_cloud_pipeline_components.experimental.custom_job import (
    CustomTrainingJobOp,
)
from kfp.v2 import dsl

PIPELINE_ROOT = os.getenv("PIPELINE_ROOT")
PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")

TRAINING_CONTAINER_IMAGE_URI = os.getenv("TRAINING_CONTAINER_IMAGE_URI")
SERVING_CONTAINER_IMAGE_URI = os.getenv("SERVING_CONTAINER_IMAGE_URI")
SERVING_MACHINE_TYPE = os.getenv("SERVING_MACHINE_TYPE", "n1-standard-4")

TRAINING_FILE_PATH = os.getenv("TRAINING_FILE_PATH")
VALIDATION_FILE_PATH = os.getenv("VALIDATION_FILE_PATH")

MAX_TRIAL_COUNT = int(os.getenv("MAX_TRIAL_COUNT", "20"))
PARALLEL_TRIAL_COUNT = int(os.getenv("PARALLEL_TRIAL_COUNT", "5"))

PIPELINE_NAME = os.getenv("PIPELINE_NAME", "covertype")
BASE_OUTPUT_DIR = os.getenv("BASE_OUTPUT_DIR", PIPELINE_ROOT)
MODEL_DISPLAY_NAME = os.getenv("MODEL_DISPLAY_NAME", PIPELINE_NAME)



@dsl.pipeline(
    name=f"{PIPELINE_NAME}-kfp-pipeline",
    description="Kubeflow pipeline that tunes, trains, and deploys on Vertex",
    pipeline_root=PIPELINE_ROOT,
)
def create_pipeline():
    worker_pool_specs = [
        {
            "machine_spec": {
                "machine_type": "n1-standard-4",
                "accelerator_type": "NVIDIA_TESLA_V100",
                "accelerator_count": 1,
            },
            "replica_count": 1,
            "container_spec": {
                "image_uri": TRAINING_CONTAINER_IMAGE_URI,
                "args": [
                    f"--train_data_path={TRAINING_FILE_PATH}",
                    f"--eval_data_path={VALIDATION_FILE_PATH}",
                    # hptune
                    "--hptune",
                ],
            },
        }
    ]

    metric_spec = hyperparameter_tuning_job.serialize_metrics(
       {"auc": "maximize"}
   )

    parameter_spec = hyperparameter_tuning_job.serialize_parameters(
       {
           
           "filt_size1": hpt.DiscreteParameterSpec(
               values=[16, 32, 64], scale=None
           ),
           "filt_size2": hpt.DiscreteParameterSpec(
               values=[8, 16, 32], scale=None
           ),
           "nnsize_1": hpt.DiscreteParameterSpec(
               values=[128, 256, 512], scale=None
           ),
           "nnsize_2": hpt.DiscreteParameterSpec(
               values=[64, 128, 256], scale=None
           ),
           "batch_size": hpt.DiscreteParameterSpec(
               values=[32, 64], scale=None
           ),
       }
   )

    hp_tuning_task = hyperparameter_tuning_job.HyperparameterTuningJobRunOp(
       display_name=f"{PIPELINE_NAME}-kfp-tuning-job",
       project=PROJECT_ID,
       location=REGION,
       worker_pool_specs=worker_pool_specs,
       study_spec_metrics=metric_spec,
       study_spec_parameters=parameter_spec,
       max_trial_count=MAX_TRIAL_COUNT,
       parallel_trial_count=PARALLEL_TRIAL_COUNT,
       base_output_directory=PIPELINE_ROOT,
   )

    trials_task = hyperparameter_tuning_job.GetTrialsOp(
        gcp_resources=hp_tuning_task.outputs["gcp_resources"]
    )

    best_hyperparameters_task = (
        hyperparameter_tuning_job.GetBestHyperparametersOp(
            trials=trials_task.output, study_spec_metrics=metric_spec
        )
    )

    # Construct new worker_pool_specs and
    # train new model based on best hyperparameters
    worker_pool_specs_task = hyperparameter_tuning_job.GetWorkerPoolSpecsOp(
        best_hyperparameters=best_hyperparameters_task.output,
        worker_pool_specs=[
            {
                "machine_spec": {"machine_type": "n1-standard-4", "accelerator_type": "NVIDIA_TESLA_V100",
                "accelerator_count": 1},
                "replica_count": 1,
                "container_spec": {
                    "image_uri": TRAINING_CONTAINER_IMAGE_URI,
                    "args": [
                        f"--train_data_path={TRAINING_FILE_PATH}",
                        f"--eval_data_path={VALIDATION_FILE_PATH}",
                        "--nohptune",
                    ],
                },
            }
        ],
    )
    
    training_task = CustomTrainingJobOp(
        project=PROJECT_ID,
        location=REGION,
        display_name=f"{PIPELINE_NAME}-kfp-training-job",
        worker_pool_specs=worker_pool_specs_task.output,
        base_output_directory=BASE_OUTPUT_DIR,
    )

    
    model_upload_task = ModelUploadOp(
        project=PROJECT_ID,
        display_name=f"{PIPELINE_NAME}-kfp-model-upload-job",
        artifact_uri=f"{BASE_OUTPUT_DIR}/model",
        serving_container_image_uri=SERVING_CONTAINER_IMAGE_URI,
    )
    model_upload_task.after(training_task)

    endpoint_create_task = EndpointCreateOp(
        project=PROJECT_ID,
        display_name=f"{PIPELINE_NAME}-kfp-create-endpoint-job",
    )
    endpoint_create_task.after(model_upload_task)

    model_deploy_op = ModelDeployOp(  # pylint: disable=unused-variable
        model=model_upload_task.outputs["model"],
        endpoint=endpoint_create_task.outputs["endpoint"],
        deployed_model_display_name=MODEL_DISPLAY_NAME,
        dedicated_resources_machine_type=SERVING_MACHINE_TYPE,
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )


Writing ./pipeline_vertex/pipeline_prebuilt.py


In [9]:
TRAINING_CONTAINER_IMAGE_URI

'gcr.io/qwiklabs-asl-00-c812c3b423f2/trainer_image_spectrain_vertex:latest'

In [14]:
ARTIFACT_STORE = f"gs://{BUCKET}"
PIPELINE_ROOT = f"{ARTIFACT_STORE}/pipeline"

TRAINING_FILE_PATH = f"gs://{BUCKET}/bhavani/train_images"
VALIDATION_FILE_PATH = f"gs://{BUCKET}/bhavani/valid_images"

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
#TIMESTAMP = 20230616134122
BASE_OUTPUT_DIR = f"{ARTIFACT_STORE}/models/{TIMESTAMP}"

%env PIPELINE_ROOT={PIPELINE_ROOT}
%env PROJECT_ID={PROJECT_ID}
%env REGION={REGION}
%env SERVING_CONTAINER_IMAGE_URI={SERVING_CONTAINER_IMAGE_URI}
%env TRAINING_CONTAINER_IMAGE_URI={TRAINING_CONTAINER_IMAGE_URI}
%env TRAINING_FILE_PATH={TRAINING_FILE_PATH}
%env VALIDATION_FILE_PATH={VALIDATION_FILE_PATH}
%env BASE_OUTPUT_DIR={BASE_OUTPUT_DIR}

env: PIPELINE_ROOT=gs://spectrain_new/pipeline
env: PROJECT_ID=qwiklabs-asl-00-c812c3b423f2
env: REGION=us-central1
env: SERVING_CONTAINER_IMAGE_URI=us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest
env: TRAINING_CONTAINER_IMAGE_URI=gcr.io/qwiklabs-asl-00-c812c3b423f2/trainer_image_spectrain_vertex:latest
env: TRAINING_FILE_PATH=gs://spectrain_new/bhavani/train_images
env: VALIDATION_FILE_PATH=gs://spectrain_new/bhavani/valid_images
env: BASE_OUTPUT_DIR=gs://spectrain_new/models/20230616161314


In [15]:
!gsutil ls | grep ^{ARTIFACT_STORE}/$ || gsutil mb -l {REGION} {ARTIFACT_STORE}

gs://spectrain_new/


In [16]:
PIPELINE_JSON = "covertype_kfp_pipeline.json"

In [17]:
!dsl-compile-v2 --py pipeline_vertex/pipeline_prebuilt.py --output $PIPELINE_JSON



In [18]:
from kfp.v2 import compiler

from pipeline_vertex.pipeline_prebuilt import create_pipeline

compiler.Compiler().compile(
    pipeline_func=create_pipeline, 
    package_path=PIPELINE_JSON,
)



In [19]:
aiplatform.init(project=PROJECT_ID, location=REGION)

pipeline = aiplatform.PipelineJob(
    display_name="covertype_kfp_pipeline",
    template_path=PIPELINE_JSON,
    enable_caching=True,
)

pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/469700469475/locations/us-central1/pipelineJobs/covertype-kfp-pipeline-20230616161329
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/469700469475/locations/us-central1/pipelineJobs/covertype-kfp-pipeline-20230616161329')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/covertype-kfp-pipeline-20230616161329?project=469700469475
PipelineJob projects/469700469475/locations/us-central1/pipelineJobs/covertype-kfp-pipeline-20230616161329 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/469700469475/locations/us-central1/pipelineJobs/covertype-kfp-pipeline-20230616161329 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/469700469475/locations/us-central1/pipelineJobs/covertype-kfp-pipeline-20230616161329 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/46970046