In [136]:
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics, Dataset
from typing import NamedTuple
from google.cloud import aiplatform

In [137]:
project_id = 'qwiklabs-gcp-03-6e0d35a97dd4'
pipeline_root_path = 'gs://pipeline-tester3'

In [138]:
# INGEST THE DATA

In [139]:
@dsl.component(
    packages_to_install=['tensorflow==2.11.0', 'tensorflow-datasets', 'numpy==1.21.6']
)
def ingest_data() -> str:
    import tensorflow_datasets as tfds
    import numpy as np
    import tensorflow as tf

    #bucket = 'gs://tfds-dir3'
    #bucket = 'gs://pipeline-tester3/keras_model'
    bucket = 'gs://workbench-ron-tensor'

    (ds_train, ds_test), ds_info = tfds.load(
        'cifar10',
        split=['train', 'test'],
        shuffle_files=True,
        as_supervised=True,
        with_info=True,
    )

    def normalize_img(image, label):
        """Normalizes images: `uint8` -> `float32`."""
        return tf.cast(image, tf.float32) / 255., label

    ds_train = ds_train.map(
        normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
    ds_train = ds_train.cache()
    ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
    ds_train = ds_train.batch(128)
    ds_train = ds_train.prefetch(tf.data.AUTOTUNE)

    ds_test = ds_test.map(
        normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
    ds_test = ds_test.batch(128)
    ds_test = ds_test.cache()
    ds_test = ds_test.prefetch(tf.data.AUTOTUNE)


   # need the "self" parameter as their is an implicit argument in the custom_shard_func
   #  that gives an error saying one arg expected but two were given
    def custom_shard_func(self, element):
        return np.int64(0)

    ds_train.save(path=bucket + "/ds_train", shard_func=custom_shard_func)
    ds_test.save(path=bucket + "/ds_test", shard_func=custom_shard_func)

    return bucket


In [140]:
# CREATE THE MODEL

In [141]:
@dsl.component(
    packages_to_install=['tensorflow==2.11.0', 'keras'],
    base_image='gcr.io/deeplearning-platform-release/tf-gpu.2-11',
)
def create_model(text: str) -> str:
    import tensorflow as tf
    from keras import models, layers, datasets, applications

    #bucket = 'gs://tfds-dir3'
    #bucket = 'gs://pipeline-tester3/keras_model'
    bucket = 'gs://workbench-ron-tensor'

    # check for GPU:
    print('\n\n GPU name: ', tf.config.experimental.list_physical_devices('GPU'))
    print('\n\n')

    #Multi GPU strategy
    strategy = tf.distribute.MirroredStrategy()

    with strategy.scope():
        model = models.Sequential()
        model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(10))

    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    print('\n\n' + str(model.summary()) + '\n\n')
    model.save(bucket + "/untrained-model")
    return "model saved:" + bucket



In [142]:
# TRAIN THE MODEL

In [145]:
@dsl.component(
    packages_to_install=['tensorflow==2.11.0', 'tensorflow-datasets'],
    base_image='gcr.io/deeplearning-platform-release/tf-gpu.2-11',
)
def train_model(text: str) -> str:
    import tensorflow_datasets as tfds
    import tensorflow as tf


    # check for GPU:
    print('\n\n GPU name: ', tf.config.experimental.list_physical_devices('GPU'))
    print('\n\n')

    #Storage buckets
    # bucket = 'gs://tfds-dir3'
    # bucket = 'gs://pipeline-tester3/keras_model'
    bucket = 'gs://workbench-ron-tensor'

    #Multi GPU strategy
    strategy = tf.distribute.MirroredStrategy()

   

    # load data from gcs bucket
    ds_train = tf.data.Dataset.load(bucket + "/ds_train")
    ds_test = tf.data.Dataset.load(bucket + "/ds_test")

    print("\n\n finish loading training data")
    print("\n\n finish loading test data \n\n")

    # load model from gcs
    model = tf.keras.models.load_model(bucket + '/untrained-model')

    # Create and Train the model
    model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

    history = model.fit(
    ds_train,
    epochs=30,
    validation_data=ds_test,
    )

    print("\n\n history" + str(history))

    #Save the model
    model.save(bucket + "/keras_ model")

    return "model trained"


In [146]:
# CREATE THE PIPELINE

In [147]:
# RUNNING THE TRAIN_MODEL COMPONENT TWICE WITH **TWO DIFFERENT GPU CONFIGURATIONS** TO SHOW 
# THE DIFFERENCE IN TRAINING TIME THAT HARDWARE ACCELERATORS CAN GIVE


@dsl.pipeline(
    name='simple-pipeline',
    description='testing pipeline',
    pipeline_root=pipeline_root_path
)
def workbench_cnn_pipeline(    
    bucket: str = 'gs://workbench-ron-tensor',
    project: str = 'tensor-1-1',
):
    ingestion_task = ingest_data()
    create_model_task = create_model(text=ingestion_task.output).set_accelerator_type('NVIDIA_TESLA_V100').set_cpu_limit('4').set_memory_limit('16G').set_accelerator_limit(4)
    train_model_task_4_V100_GPUs = train_model(text=create_model_task.output)\
        .set_accelerator_type('NVIDIA_TESLA_V100')\
        .set_cpu_limit('4')\
        .set_memory_limit('16G')\
        .set_accelerator_limit(4)\
        .set_display_name('4 x V100 GPUS ')
    train_model_task_2_V100_GPUs = train_model(text=create_model_task.output)\
        .set_accelerator_type('NVIDIA_TESLA_V100')\
        .set_cpu_limit('4')\
        .set_memory_limit('16G')\
        .set_accelerator_limit(2)\
        .set_display_name('2 x V100 GPUS ')

In [148]:
# COMPILE THE PIPELINE TO CREATE THE PIPELINE JSON

In [158]:
if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=workbench_cnn_pipeline,
        package_path='/home/jupyter/tensor project/tensorflow/workbench_demo/workbench_cnn_pipeline.json'
    )

In [159]:
aiplatform.init

<bound method _Config.init of <google.cloud.aiplatform.initializer._Config object at 0x7fde091b5710>>

In [161]:

job = aiplatform.PipelineJob(display_name = 'workbench_cnn_pipeline',
                             template_path = '/home/jupyter/tensor project/tensorflow/workbench_demo/workbench_cnn_pipeline.json',
                             enable_caching=True,
                             )

job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/10566138111/locations/us-central1/pipelineJobs/simple-pipeline-20230424181553
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/10566138111/locations/us-central1/pipelineJobs/simple-pipeline-20230424181553')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/simple-pipeline-20230424181553?project=10566138111


In [162]:
# ADD MODEL TO MODEL REGISTRY

In [167]:
#Storage buckets
# bucket = 'gs://tfds-dir3'
# bucket = 'gs://pipeline-tester3'
bucket = 'gs://workbench-ron-tensor'
model_uri = bucket + "/keras_ model"
    
my_model = aiplatform.Model.upload(display_name='keras-model', 
                                   artifact_uri=model_uri, 
                                   serving_container_image_uri= 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-11:latest')

#Output models ID and modely registry uri


Creating Model
Create Model backing LRO: projects/10566138111/locations/us-central1/models/5538945955572744192/operations/8289288324389011456
Model created. Resource name: projects/10566138111/locations/us-central1/models/5538945955572744192@1
To use this Model in another session:
model = aiplatform.Model('projects/10566138111/locations/us-central1/models/5538945955572744192@1')


In [None]:
# DEPLOY MODEL TO ENDPOINT

In [168]:
# This aiplatform.Model("projects/{PROJECT_NUMBER}/locations/us-central1/models/{MODEL_ID}") 
#is the output from the above cmd

my_model = aiplatform.Model('projects/10566138111/locations/us-central1/models/5538945955572744192@1')


endpoint = my_model.deploy(
     deployed_model_display_name='keras-endpoint',
     traffic_split={"0": 100},
     machine_type="n1-standard-4",
     accelerator_count=0,
     min_replica_count=1,
     max_replica_count=1,
   )

#Outputs Endpoint name to use for predictions

Creating Endpoint
Create Endpoint backing LRO: projects/10566138111/locations/us-central1/endpoints/2186968210065063936/operations/2861324873500721152
Endpoint created. Resource name: projects/10566138111/locations/us-central1/endpoints/2186968210065063936
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/10566138111/locations/us-central1/endpoints/2186968210065063936')
Deploying model to Endpoint : projects/10566138111/locations/us-central1/endpoints/2186968210065063936
Deploy Endpoint model backing LRO: projects/10566138111/locations/us-central1/endpoints/2186968210065063936/operations/8135040037151571968
Endpoint model deployed. Resource name: projects/10566138111/locations/us-central1/endpoints/2186968210065063936


In [None]:
# GET PREDICTIONS FROM DEPLOYED LOCAL MODEL

In [172]:
from google.cloud import aiplatform

import numpy as np
from PIL import Image
import base64
from typing import List, Dict
import argparse
import io
import tensorflow as tf


path_image = "/home/jupyter/tensor project/tensorflow/workbench_demo/test_images/image_0_6.jpg"

img = tf.keras.utils.load_img(
    path_image, target_size=(32, 32))
img_array = tf.keras.utils.img_to_array(img)
img_array = tf.expand_dims(img_array, 0)

model = tf.keras.models.load_model(bucket + "/keras_ model")
predictions = model.predict(img_array)
score = tf.nn.softmax(predictions[0])

class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck']

print(
    "The local model predicts this image most likely belongs to {} with a {:.2f} percent confidence."
    .format(class_names[np.argmax(score)], 100 * np.max(score))
)

print(bucket)


The local model predicts this image most likely belongs to airplane with a 100.00 percent confidence.
gs://workbench-ron-tensor


In [None]:
# GET PREDICTIONS FROM MODEL DEPLOYED AT ENDPOINT

In [173]:
from google.cloud import aiplatform

import numpy as np
from PIL import Image
import base64
from typing import List, Dict
import argparse
import io
import tensorflow as tf

endpoint = aiplatform.Endpoint(
    endpoint_name='projects/10566138111/locations/us-central1/endpoints/2186968210065063936')

path_image = "/home/jupyter/tensor project/tensorflow/workbench_demo/test_images/image_0_3.jpg"

class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck']


image_data = [np.asarray(Image.open(path_image))]
test_image = [(image_data[0] / 255.0).astype(np.float32).tolist()]

endpoint_prediction = endpoint.predict(instances=test_image)
endpoint_score = tf.nn.softmax(predictions[0])

print("\n" + "\n")
print(
    "The Vertex AI Endpoint predicts this image most likely belongs to {} with a {:.2f} percent confidence."
    .format(class_names[np.argmax(endpoint_score)], 100 * np.max(endpoint_score))
)





The Vertex AI Endpoint predicts this image most likely belongs to airplane with a 100.00 percent confidence.
